Commit 5f1e2792 authored by salbiach
Browse files

Simplified code

parent ea6b73a6
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
TAG = latest-compss-2.8-eddlgpu-1.1-python-3.6
TAG = latest-compss-2.8-eddlgpu-1.1-python-3.6-v2
PREFIX = registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss
IMAGE = compss-eddl
......
......@@ -6,4 +6,4 @@
#masterIP=(${Nodes[2]})
echo "MasterIP is:" $MY_POD_IP
cd pyeddl
conda run --no-capture-output -n pyeddl_pycompss_env runcompss --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_train_batch_compss_mnist_sync.py
conda run --no-capture-output -n pyeddl_pycompss_env runcompss --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_train_batch.py --dataset="cifar10" --network="lenet" --num_epochs=10 --num_workers=4
......@@ -10,6 +10,8 @@ from eddl_array import to_tensor
from net_utils import net_parametersToNumpy
from net_utils import net_parametersToTensor
from models import SIMPLE_MNIST, LeNet, VGG16
class Eddl_Compss_Distributed:
......@@ -17,30 +19,47 @@ class Eddl_Compss_Distributed:
self.model = None
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(serialized_model=IN, optimizer=IN, losses=IN, metrics=IN, compserv=IN, is_replicated=True)
def build(self, serialized_model, optimizer, losses, metrics, compserv):
#print("Arranca build task en worker")
# Deserialize the received model
model = eddl.import_net_from_onnx_string(serialized_model)
#print(eddl.summary(model))
@task(dataset=IN,network=IN, use_gpu=IN, is_replicated=True)
def build(self, dataset, network, use_gpu):
# Dictionary relating the dataset with its number of classes and the first layer of the associated network
dataset_network_dict = {"mnist": [10, {
"simple-mnist": eddl.Input([784])
}],
"cifar10": [10, {
"lenet": eddl.Input([3, 32, 32]),
"vgg16": eddl.Input([3, 32, 32])
}]
}
# Dictionary relating the network argument with the function that implements it
network_functions_dict = {"simple-mnist": SIMPLE_MNIST, "lenet": LeNet, "vgg16": VGG16}
# Obtain the number of classes and the function that implements the network
num_classes = dataset_network_dict.get(dataset)[0]
network_function = network_functions_dict.get(network)
# Define the model
in_ = dataset_network_dict.get(dataset)[1].get(network)
out = network_function(in_, num_classes)
net = eddl.Model([in_], [out])
# Define the computing service to use
CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()
# Build the model in this very node
eddl.build(
model,
net,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
losses,
metrics,
eddl.CS_GPU(),
["soft_cross_entropy"],
["categorical_accuracy"],
CS,
False
)
# Save the model. We have to serialize it to a string so COMPSs is able to serialize and deserialize from disk
self.model = eddl.serialize_net_to_onnx_string(model, False)
self.model = eddl.serialize_net_to_onnx_string(net, False)
#print("Finaliza build task en worker")
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(
......@@ -50,150 +69,58 @@ class Eddl_Compss_Distributed:
num_images_per_worker=IN,
num_epochs_for_param_sync=IN,
workers_batch_size=IN,
target_direction=IN)
target_direction=IN, use_gpu=IN)
def train_batch(self,
x_train,
y_train,
initial_parameters,
num_images_per_worker,
num_epochs_for_param_sync,
workers_batch_size):
workers_batch_size, use_gpu):
# Convert data to tensors
x_train = to_tensor(x_train)
y_train = to_tensor(y_train)
#print("Entrando en train batch task en worker")
import sys
sys.stdout.flush()
# Deserialize from disk
model = eddl.import_net_from_onnx_string(self.model)
#print("Modelo deserializado de disco")
sys.stdout.flush()
# Define the computing service to use
CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()
# The model needs to be built after deserializing and before injecting the parameters
# Build the model after deserializing and before injecting the parameters
eddl.build(
model,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
["soft_cross_entropy"],
["categorical_accuracy"],
eddl.CS_GPU(),
CS,
False
)
#print("Modelo built")
#sys.stdout.flush()
# Set the parameters sent from master to the model
#model.setParameters(net_parametersToTensor(initial_parameters))
eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
#print("Parametros seteados")
#sys.stdout.flush()
#print(eddl.summary(model))
#print("Build completed in train batch task")
#sys.stdout.flush()
num_batches = int(num_images_per_worker / workers_batch_size)
# Define the number of minibatches to compute
num_mini_batches = int(num_images_per_worker / workers_batch_size)
eddl.reset_loss(model)
#print("Num batches: ", num_batches)
#sys.stdout.flush()
for i in range(num_epochs_for_param_sync):
#print("Epoch %d/%d (%d batches)" % (i + 1, num_epochs_for_param_sync, num_batches))
for j in range(num_batches):
indices = list(range(j * num_batches, (j + 1) * num_batches - 1))
#print("Empieza batch")
#sys.stdout.flush()
for j in range(num_mini_batches):
indices = list(range(j * num_mini_batches, (j + 1) * num_mini_batches - 1))
eddl.train_batch(model, [x_train], [y_train], indices)
#print("Finaliza batch")
#sys.stdout.flush()
eddl.print_loss(model, num_batches) # previously j because it was inside the loop
print()
print("Train batch individual completed in worker's train batch task")
print()
sys.stdout.flush()
# Random noise time sleep for experimentation purposes
# import time
# import random
# time.sleep(random.randint(1, 180))
eddl.print_loss(model, num_mini_batches)
print("\nTrain batch individual completed in worker's train batch task\n")
# Get parameters from the model and convert them to numpy so COMPSS can serialize them
final_parameters = net_parametersToNumpy(eddl.get_parameters(model))
return final_parameters
@task(
x_train={Type: COLLECTION_IN, Depth: 2},
y_train={Type: COLLECTION_IN, Depth: 2},
initial_parameters=IN,
num_images_per_worker=IN,
num_epochs_for_param_sync=IN,
workers_batch_size=IN,
target_direction=IN)
def train_batch_async(self,
x_train,
y_train,
initial_parameters,
num_images_per_worker,
num_epochs_for_param_sync,
workers_batch_size):
# Deserialize from disk
model = eddl.import_net_from_onnx_string(self.model)
import sys
# Set the parameters sent from master to the model
#model.setParameters(net_parametersToTensor(initial_parameters))
eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
# print(eddl.summary(model))
# The model needs to be built after deserializing
eddl.build(
model,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
["soft_cross_entropy"],
["categorical_accuracy"],
eddl.CS_GPU(),
False
)
#print("Build completed in train batch task")
num_batches = int(num_images_per_worker / workers_batch_size)
eddl.reset_loss(model)
#print("Num batches: ", num_batches)
for i in range(num_epochs_for_param_sync):
#print("Epoch %d/%d (%d batches)" % (i + 1, num_epochs_for_param_sync, num_batches))
for j in range(num_batches):
indices = list(range(j * num_batches, (j + 1) * num_batches - 1))
eddl.train_batch(model, [x_train], [y_train], indices)
eddl.print_loss(model, num_batches)
print()
print("Train batch individual completed in worker's train batch task")
print()
sys.stdout.flush()
# Get parameters from the model and convert them to numpy so COMPSS can serialize them
#final_parameters = net_parametersToNumpy(model.getParameters())
final_parameters = net_parametersToNumpy(eddl.get_parameters(model, False, True))
return final_parameters
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(accumulated_parameters=COMMUTATIVE, parameters_to_aggregate=IN, mult_factor=IN, target_direction=IN)
......@@ -201,132 +128,7 @@ class Eddl_Compss_Distributed:
for i in range(0, len(accumulated_parameters)):
for j in range(0, len(accumulated_parameters[i])):
# accumulated_parameters[i][j] += (parameters_to_aggregate[i][j] * mult_factor).astype(np.float32)
accumulated_parameters[i][j] = (
(accumulated_parameters[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)
return accumulated_parameters
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(initial_parameters=IN, train_test_flag=IN, target_direction=IN)
def evaluate(self, initial_parameters, train_test_flag):
# Deserialize from disk
model = eddl.import_net_from_onnx_string(self.model)
# Set the parameters sent from master to the model
model.setParameters(net_parametersToTensor(initial_parameters))
# print(eddl.summary(model))
# The model needs to be built after deserializing
eddl.build(
model,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
["soft_cross_entropy"],
["categorical_accuracy"],
eddl.CS_GPU(),
False
)
#print("Build completed in evaluate task")
if train_test_flag == "train":
x = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_X_TRN)
y = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_Y_TRN)
else:
x = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_X_TST)
y = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_Y_TST)
eddlT.div_(x, 255.0)
print("Evaluating model against " + train_test_flag + " set")
eddl.evaluate(model, [x], [y])
return 1
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(initial_parameters=IN, x_test={Type: COLLECTION_IN, Depth: 2}, y_test={Type: COLLECTION_IN, Depth: 2}, num_images_per_worker=IN, workers_batch_size=IN, target_direction=IN)
def eval_batch(self, x_test, y_test, initial_parameters, num_images_per_worker, workers_batch_size):
#import pickle
#import codecs
#initial_parameters = pickle.loads(codecs.decode(initial_parameters.encode(), "base64"))
print("Entrando en eval batch task en worker")
import sys
sys.stdout.flush()
# Deserialize from disk
model = eddl.import_net_from_onnx_string(self.model)
print("Modelo deserializado de disco")
sys.stdout.flush()
# The model needs to be built after deserializing and before injecting the parameters
eddl.build(
model,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
["soft_cross_entropy"],
["categorical_accuracy"],
eddl.CS_GPU(),
False
)
print("Modelo built")
sys.stdout.flush()
# Set the parameters sent from master to the model
eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
print("Parametros seteados")
sys.stdout.flush()
#print(eddl.summary(model))
print("Build completed in eval batch task")
sys.stdout.flush()
# Convert data to tensors
x_test = to_tensor(x_test)
y_test = to_tensor(y_test)
print("Lenes: ", x_test.shape)
sys.stdout.flush()
num_batches = int(num_images_per_worker / workers_batch_size)
eddl.reset_loss(model)
print("Num batches: ", num_batches)
sys.stdout.flush()
'''for i in range(num_batches):
indices = list(range(0, x_test.shape[0]))
eddl.eval_batch(model, [x_test], [y_test], indices)'''
eddl.evaluate(model, [x_test], [y_test])
print("Eval batch individual completed in eval batch task")
sys.stdout.flush()
# Random noise time sleep for experimentation purposes
#import time
#import random
#time.sleep(random.randint(1, 180))
# Get parameters from the model and convert them to numpy so COMPSS can serialize them
#final_parameters = net_parametersToNumpy(model.getParameters())
# These final results should be a call to getAcc() and getLoss()
final_results = [eddl.get_losses(model)[-1], eddl.get_metrics(model)[-1]]
print("Final results: ", final_results)
sys.stdout.flush()
return final_results
return accumulated_parameters
\ No newline at end of file
#from pyeddl import _core
"""\
EDDL DISTRIBUTED API IMPLEMENTATION.
"""
from pycompss.api.api import compss_wait_on
from pycompss.api.api import compss_barrier
from eddl_array import paired_partition
from eddl_compss_distributed import *
from net_utils import net_aggregateParameters
from net_utils import net_aggregateResults
from net_utils import net_parametersToNumpy
from shuffle import local_shuffle, block_shuffle_async
from shuffle import block_shuffle_async
compss_object: Eddl_Compss_Distributed = None
def build(model, optimizer, losses, metrics, compserv, random_weights):
def build(net, dataset, network, use_gpu):
# Initialize the compss object
global compss_object
compss_object = Eddl_Compss_Distributed()
#print("Type3333", type(optimizer))
#print("Type2222: ", isinstance(optimizer, _core.sgd))
# This call to build is a safety measure that initializes the parameters to random values
# Define the computing service to use
CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()
# Build the model in the master
eddl.build(
model,
optimizer,
losses,
metrics,
compserv,
random_weights
net,
eddl.sgd(CVAR_SGD1, CVAR_SGD2),
["soft_cross_entropy"],
["categorical_accuracy"],
CS,
True
)
# Serialize the model so it can be sent through the network
serialized_model = eddl.serialize_net_to_onnx_string(model, False)
# Build the model in each distributed computing unit
compss_object.build(dataset, network, use_gpu)
# Build the model in all the nodes synchronously
compss_object.build(serialized_model, "", losses, metrics, "")
# Wait until the models are created in each computing unit
print("Building the model in distributed computing units...")
compss_barrier()
print("Building done!")
def train_batch(model, x_train, y_train, num_workers, num_epochs_for_param_sync, workers_batch_size):
def train_batch(model, x_train, y_train, num_workers, num_epochs_for_param_sync, workers_batch_size, use_gpu):
global compss_object
# Initial parameters that every node will take in order to begin the training
# Initial parameters that every computing unit will take in order to begin the training
initial_parameters = net_parametersToNumpy(eddl.get_parameters(model))
# print("Aqui si se muestran: ", initial_parameters)
# Array to store the weights of each computing unit
recv_weights = [list() for i in range(0, num_workers)]
s = x_train.shape
num_images_per_worker = int(s[0] / num_workers)
#print("Num workers: ", num_workers)
#print("Num images per worker: ", num_images_per_worker)
#print("Workers batch size: ", workers_batch_size)
# Pickle test
# import pickle
# filename = "dogs"
# outfile = open(filename,'wb')
# pickle.dump(initial_parameters, outfile)
# Define the number of images corresponding to each computing unit
num_total_samples = x_train.shape[0]
num_images_per_worker = int(num_total_samples / num_workers)
# print("Pasa el test de pickle")
# infile = open(filename,'rb')
# new_dict = pickle.load(infile)
# infile.close()
# print("Pasa el test de deserialization pickle")
#
# import codecs
# initial_parameters = codecs.encode(pickle.dumps(initial_parameters), "base64").decode()
# Divide the dataset among the computing units and train the epoch range
for i, (block_x, block_y) in enumerate(paired_partition(x_train, y_train)):
recv_weights[i] = compss_object.train_batch(
block_x,
block_y,
initial_parameters,
num_images_per_worker,
num_epochs_for_param_sync,
workers_batch_size)
# COMPSS barrier to force waiting until every node finishes its training (synchronous training)
block_x,
block_y,
initial_parameters,
num_images_per_worker,
num_epochs_for_param_sync,
workers_batch_size, use_gpu)
# Wait until every computing unit finishes its training (synchronous training)
recv_weights = compss_wait_on(recv_weights)
# Parameters aggregation
# Aggregate parameters
final_weights = net_aggregateParameters(recv_weights)
# Print final weights
# print("Los final weights al final de train_batch son estos: ", final_weights)
# Set trained and aggregated parameters to the model
# Set the parameters of the model to the aggregated parameters
eddl.set_parameters(model, net_parametersToTensor(final_weights))
def train_batch_async(model, x_train, y_train, num_workers, num_epochs_for_param_sync, workers_batch_size):
global compss_object
s = x_train.shape
num_images_per_worker = int(s[0] / num_workers)
#print("Num workers: ", num_workers)
#print("Num images per worker: ", num_images_per_worker)
#print("Workers batch size: ", workers_batch_size)
# Array of final parameters whose initial value is initial parameters
#accumulated_parameters = net_parametersToNumpy(model.getParameters())
accumulated_parameters = net_parametersToNumpy(eddl.get_parameters(model))
# accumulated_parameters = net_numpyParametersFill(accumulated_parameters, 0)
#workers_parameters = [net_parametersToNumpy(model.getParameters()) for i in range(0, num_workers)]
workers_parameters = [net_parametersToNumpy(eddl.get_parameters(model)) for i in range(0, num_workers)]
for j, (block_x, block_y) in enumerate(paired_partition(x_train, y_train)):
workers_parameters[j] = compss_object.train_batch(
block_x,
block_y,
workers_parameters[j],
num_images_per_worker,
num_epochs_for_param_sync,
workers_batch_size)
workers_parameters[j] = compss_object.aggregate_parameters_async(
accumulated_parameters,
workers_parameters[j],
1 / num_workers)
accumulated_parameters = compss_wait_on(accumulated_parameters)
# workers_parameters = compss_wait_on(workers_parameters)
#print("Workers parameters: ", workers_parameters)
#print("Final accumulated parameters: ", accumulated_parameters)
# Set trained and aggregated parameters to the model
#model.setParameters(net_parametersToTensor(accumulated_parameters))
eddl.set_parameters(model, net_parametersToTensor(accumulated_parameters))
def fit_async(model, x_train, y_train, num_workers, num_epochs_for_param_sync, max_num_async_epochs,
workers_batch_size):
global compss_object
s = x_train.shape
num_images_per_worker = int(s[0] / num_workers)
#print("Num workers: ", num_workers)
#print("Num images per worker: ", num_images_per_worker)
#print("Workers batch size: ", workers_batch_size)
# Define the number of images corresponding to each computing unit
num_total_samples = x_train.shape[0]
num_images_per_worker = int(num_total_samples / num_workers)
# Array of final parameters whose initial value is initial parameters
#accumulated_parameters = net_parametersToNumpy(model.getParameters())
# accumulated_parameters = net_numpyParametersFill(accumulated_parameters, 0)
# Variable where parameters will be aggregated asynchronously
accumulated_parameters = net_parametersToNumpy(eddl.get_parameters(model))
#workers_parameters = [net_parametersToNumpy(model.getParameters()) for i in range(0, num_workers)]
# Define the parameters for each worker
workers_parameters = [net_parametersToNumpy(eddl.get_parameters(model)) for i in range(0, num_workers)]
x_blocks = [x[0] for x in paired_partition(x_train, y_train)]
y_blocks = [x[1] for x in paired_partition(x_train, y_train)]
# Until the maximum number of asynchronous epochs is reached
for i in range(0, max_num_async_epochs):
# Train and aggregate the parameters asynchronously for each distributed computing unit
for j in range(0, num_workers):
shuffled_x, shuffled_y = block_shuffle_async(
x_blocks[j],
y_blocks[j],
workers_parameters[j])