modified readme

...@@ -36,7 +36,8 @@ Run: ...@@ -36,7 +36,8 @@ Run:
`bash` `bash`
Modify to your needs Modify to your needs, for example, if it is desired that some epochs are perfomed asynchronously the following parameter should be added to the runcompss call:
Run: Run:
...@@ -119,16 +119,3 @@ class Eddl_Compss_Distributed: ...@@ -119,16 +119,3 @@ class Eddl_Compss_Distributed:
final_parameters = net_parametersToNumpy(eddl.get_parameters(model)) final_parameters = net_parametersToNumpy(eddl.get_parameters(model))
return final_parameters return final_parameters
@task(accumulated_parameters=COMMUTATIVE, parameters_to_aggregate=IN, mult_factor=IN, target_direction=IN)
def aggregate_parameters_async(self, accumulated_parameters, parameters_to_aggregate, mult_factor):
for i in range(0, len(accumulated_parameters)):
for j in range(0, len(accumulated_parameters[i])):
accumulated_parameters[i][j] = (
(accumulated_parameters[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)
return accumulated_parameters
...@@ -74,50 +74,3 @@ def train_batch(model, x_train, y_train, num_workers, num_epochs_for_param_sync, ...@@ -74,50 +74,3 @@ def train_batch(model, x_train, y_train, num_workers, num_epochs_for_param_sync,
# Set the parameters of the model to the aggregated parameters # Set the parameters of the model to the aggregated parameters
eddl.set_parameters(model, net_parametersToTensor(final_weights)) eddl.set_parameters(model, net_parametersToTensor(final_weights))
def fit_async(model, x_train, y_train, num_workers, num_epochs_for_param_sync, max_num_async_epochs,
global compss_object
# Define the number of images corresponding to each computing unit
num_total_samples = x_train.shape[0]
num_images_per_worker = int(num_total_samples / num_workers)
# Variable where parameters will be aggregated asynchornously
accumulated_parameters = net_parametersToNumpy(eddl.get_parameters(model))
# Define the parameters for each worker
workers_parameters = [net_parametersToNumpy(eddl.get_parameters(model)) for i in range(0, num_workers)]
x_blocks = [x[0] for x in paired_partition(x_train, y_train)]
y_blocks = [x[1] for x in paired_partition(x_train, y_train)]
# Until the maximum number of asynchrnous epochs is reached
for i in range(0, max_num_async_epochs):
# Train and aggregate the parameters asynchronously for each distributed computing unit
for j in range(0, num_workers):
shuffled_x, shuffled_y = block_shuffle_async(
x_blocks[j], y_blocks[j] = [shuffled_x], [shuffled_y]
workers_parameters[j] = compss_object.train_batch(
workers_parameters[j] = compss_object.aggregate_parameters_async(
1 / num_workers)
# Wait until every computing unit has aggregated its parameters
accumulated_parameters = compss_wait_on(accumulated_parameters)
# Set the model parameters to the aggregated parameters
eddl.set_parameters(model, net_parametersToTensor(accumulated_parameters))
