Commit 0ed10926 authored by salbiach's avatar salbiach
Browse files

shuffling in worker and asynchronous train

parent 2359ddbe
......@@ -37,7 +37,10 @@ Run:
`bash configure_compss.sh`
Modify runcompss.sh to your needs, for example, if it is desired that some epochs are performed asynchronously the following parameter should be added to the runcompss call:
`--num_async_epochs=5`
`--num_async_epochs=5`.
If a complete asynchronous execution is desired, the flag `--asynchronous` should be added. That way, the workers will aggregate the parameters asynchronously.
Run:
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
TAG = latest-compss-2.8-eddlgpu-1.1-python-3.6-v2
TAG = latest-compss-2.8-eddlgpu-1.1-python-3.6-v3
PREFIX = registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss
IMAGE = compss-eddl
......
......@@ -6,4 +6,4 @@
#masterIP=(${Nodes[2]})
echo "MasterIP is:" $MY_POD_IP
cd pyeddl
conda run --no-capture-output -n pyeddl_pycompss_env runcompss --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_train_batch.py --dataset="cifar10" --network="lenet" --num_epochs=10 --num_workers=4
conda run --no-capture-output -n pyeddl_pycompss_env runcompss --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_master_train_batch.py --dataset="cifar10" --network="lenet" --num_epochs=10 --num_workers=4
import numpy as np
import pyeddl.eddl as eddl
from pycompss.api.constraint import constraint
from pycompss.api.parameter import *
from pycompss.api.task import task
from pyeddl.tensor import Tensor as eddlT
from cvars import *
from eddl_array import to_tensor
from net_utils import net_parametersToNumpy
from net_utils import net_parametersToTensor
from models import SIMPLE_MNIST, LeNet, VGG16
class Eddl_Compss_Distributed:
    """COMPSs-distributed wrapper around an EDDL network.

    ``build`` runs as a replicated task so every worker node instantiates and
    builds its own copy of the network; ``train_batch`` runs on one worker,
    training that worker's shard of the data for a parameter-sync round and
    returning the resulting weights as numpy arrays.
    """

    def __init__(self):
        # ONNX-serialized network (a plain string), set by build(). A string
        # is used so COMPSs can serialize/deserialize the model to disk.
        self.model = None

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(dataset=IN, network=IN, use_gpu=IN, is_replicated=True)
    def build(self, dataset, network, use_gpu):
        """Instantiate and build the requested network on this node.

        :param dataset: dataset name ("mnist" or "cifar10"); selects the input layer.
        :param network: network name ("simple-mnist", "lenet" or "vgg16").
        :param use_gpu: True -> CS_GPU computing service, False -> CS_CPU.
        """
        # Dictionary relating the dataset with its number of classes and the
        # first layer of the associated network.
        dataset_network_dict = {
            "mnist": [10, {
                "simple-mnist": eddl.Input([784])
            }],
            "cifar10": [10, {
                "lenet": eddl.Input([3, 32, 32]),
                "vgg16": eddl.Input([3, 32, 32])
            }]
        }

        # Dictionary relating the network argument with the function that implements it.
        network_functions_dict = {"simple-mnist": SIMPLE_MNIST, "lenet": LeNet, "vgg16": VGG16}

        # Obtain the number of classes and the function that implements the network.
        num_classes = dataset_network_dict.get(dataset)[0]
        network_function = network_functions_dict.get(network)

        # Define the model.
        in_ = dataset_network_dict.get(dataset)[1].get(network)
        out = network_function(in_, num_classes)
        net = eddl.Model([in_], [out])

        # Define the computing service to use.
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model in this very node.
        eddl.build(
            net,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Save the model. We have to serialize it to a string so COMPSs is
        # able to serialize and deserialize from disk.
        self.model = eddl.serialize_net_to_onnx_string(net, False)

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(
        x_train={Type: COLLECTION_IN, Depth: 2},
        y_train={Type: COLLECTION_IN, Depth: 2},
        initial_parameters=IN,
        num_images_per_worker=IN,
        num_epochs_for_param_sync=IN,
        workers_batch_size=IN,
        target_direction=IN, use_gpu=IN)
    def train_batch(self,
                    x_train,
                    y_train,
                    initial_parameters,
                    num_images_per_worker,
                    num_epochs_for_param_sync,
                    workers_batch_size, use_gpu):
        """Train this worker's data shard and return the updated weights.

        :param x_train: collection of data blocks for this worker.
        :param y_train: collection of label blocks for this worker.
        :param initial_parameters: weights (numpy) to start the round from.
        :param num_images_per_worker: number of samples in this worker's shard.
        :param num_epochs_for_param_sync: local epochs before returning weights.
        :param workers_batch_size: minibatch size used by eddl.train_batch.
        :param use_gpu: True -> CS_GPU computing service, False -> CS_CPU.
        :return: the trained weights converted to numpy arrays (so COMPSs can
            serialize them back to the master).
        """
        # Convert data to tensors.
        x_train = to_tensor(x_train)
        y_train = to_tensor(y_train)

        # Deserialize the replicated model from its ONNX string.
        model = eddl.import_net_from_onnx_string(self.model)

        # Define the computing service to use.
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model after deserializing and before injecting the parameters.
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Set the parameters sent from master to the model.
        eddl.set_parameters(model, net_parametersToTensor(initial_parameters))

        # Define the number of minibatches to compute (floor division: a
        # trailing partial batch, if any, is skipped).
        num_mini_batches = num_images_per_worker // workers_batch_size

        eddl.reset_loss(model)
        for i in range(num_epochs_for_param_sync):
            for j in range(num_mini_batches):
                # BUG FIX: the original computed
                # range(j * num_mini_batches, (j + 1) * num_mini_batches - 1),
                # which (a) strides by the number of minibatches instead of the
                # batch size, training on only a fraction of the shard, and
                # (b) drops the last index of every slice (range's end is
                # already exclusive). Slice by workers_batch_size instead.
                start = j * workers_batch_size
                indices = list(range(start, start + workers_batch_size))
                eddl.train_batch(model, [x_train], [y_train], indices)
            eddl.print_loss(model, num_mini_batches)
            print("\nTrain batch individual completed in worker's train batch task\n")

        # Get parameters from the model and convert them to numpy so COMPSS can serialize them.
        final_parameters = net_parametersToNumpy(eddl.get_parameters(model))

        return final_parameters
"""\
EDDL DISTRIBUTED API IMPLEMENTATION.
"""
from pycompss.api.api import compss_wait_on
from pycompss.api.api import compss_barrier
from eddl_array import paired_partition
from eddl_compss_distributed import *
from net_utils import net_aggregateParameters
from net_utils import net_aggregateResults
from net_utils import net_parametersToNumpy
from shuffle import block_shuffle_async
compss_object: Eddl_Compss_Distributed = None
def build(net, dataset, network, use_gpu):
    """Build *net* on the master and replicate the build on every worker.

    :param net: the eddl.Model to build on the master process.
    :param dataset: dataset name forwarded to the workers' build task.
    :param network: network name forwarded to the workers' build task.
    :param use_gpu: True -> CS_GPU computing service, False -> CS_CPU.
    """
    global compss_object

    # Fresh distributed wrapper that holds the per-worker model state.
    compss_object = Eddl_Compss_Distributed()

    # Computing service for the master-side build.
    computing_service = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

    # Build the model in the master.
    eddl.build(
        net,
        eddl.sgd(CVAR_SGD1, CVAR_SGD2),
        ["soft_cross_entropy"],
        ["categorical_accuracy"],
        computing_service,
        True
    )

    # Build the model in each distributed computing unit (replicated task),
    # then block until every node has finished.
    compss_object.build(dataset, network, use_gpu)

    print("Building the model in distributed computing units...")
    compss_barrier()
    print("Building done!")
def train_batch(model, x_train, y_train, num_workers, num_epochs_for_param_sync, workers_batch_size, use_gpu):
    """Run one synchronous parameter-synchronization round across all workers.

    Each worker trains its shard for num_epochs_for_param_sync epochs starting
    from the master's current weights; the returned weights are aggregated and
    written back into *model*.

    :param model: the master's eddl.Model (updated in place at the end).
    :param x_train: distributed training data (one block per worker).
    :param y_train: distributed training labels (one block per worker).
    :param num_workers: number of computing units sharing the dataset.
    :param num_epochs_for_param_sync: local epochs per synchronization round.
    :param workers_batch_size: minibatch size used on each worker.
    :param use_gpu: True -> CS_GPU computing service, False -> CS_CPU.
    """
    global compss_object

    # Every worker starts the round from the master's current weights.
    initial_parameters = net_parametersToNumpy(eddl.get_parameters(model))

    # One future per worker will be stored here.
    recv_weights = [[] for _ in range(num_workers)]

    # Per-worker share of the training set.
    total_samples = x_train.shape[0]
    num_images_per_worker = int(total_samples / num_workers)

    # Divide the dataset among the computing units and train the epoch range.
    for worker_idx, (block_x, block_y) in enumerate(paired_partition(x_train, y_train)):
        recv_weights[worker_idx] = compss_object.train_batch(
            block_x,
            block_y,
            initial_parameters,
            num_images_per_worker,
            num_epochs_for_param_sync,
            workers_batch_size, use_gpu)

    # Wait until every computing unit finishes its training (synchronous training).
    recv_weights = compss_wait_on(recv_weights)

    # Aggregate the per-worker parameters and push the result into the model.
    final_weights = net_aggregateParameters(recv_weights)
    eddl.set_parameters(model, net_parametersToTensor(final_weights))
"""\
SYNCHRONOUS TRAIN_BATCH IMPLEMENTATION.
"""
import argparse
from itertools import combinations_with_replacement
import sys
from timeit import default_timer as timer
import pyeddl.eddl as eddl
from pyeddl.tensor import Tensor as eddlT
import eddl_compss_distributed_api as compss_api
from cvars import *
from eddl_array import array
from shuffle import global_shuffle
from models import LeNet, VGG16, SIMPLE_MNIST
def main(args):
    """Entry point: validate arguments, build the model on master and workers,
    distribute the dataset, train with periodic parameter synchronization, and
    evaluate on the test set.

    :param args: argparse.Namespace produced by the __main__ parser (dataset,
        network, num_workers, num_epochs, num_async_epochs,
        max_num_async_epochs, workers_batch_size, gpu).
    """

    ##########################
    ##### INITIALIZATION #####
    ##########################

    # Process arguments
    num_workers = args.num_workers
    num_epochs = args.num_epochs
    workers_batch_size = args.workers_batch_size
    num_async_epochs = args.num_async_epochs
    # NOTE(review): max_num_async_epochs is parsed but never used in this
    # function — presumably consumed by the asynchronous variant; confirm.
    max_num_async_epochs = args.max_num_async_epochs
    dataset = args.dataset
    network = args.network
    use_gpu = args.gpu

    # Define available datasets and network implementations
    dataset_list = ["mnist", "cifar10"]
    network_list = ["simple-mnist","lenet", "vgg16"]
    combination_list = [("mnist", "simple-mnist"),("cifar10", "lenet"),("cifar10", "vgg16")]

    # Dictionary relating the dataset with its number of classes and the first layer of the associated network
    dataset_network_dict = {"mnist": [10, {
        "simple-mnist": eddl.Input([784])
    }],
        "cifar10": [10, {
            "lenet": eddl.Input([3, 32, 32]),
            "vgg16": eddl.Input([3, 32, 32])
        }]
    }

    # Dictionary relating the network argument with the function that implements it
    network_functions_dict = {"simple-mnist": SIMPLE_MNIST, "lenet": LeNet, "vgg16": VGG16}

    # Check that the dataset is downloaded, the network is implemented and the combination is valid
    if (dataset not in dataset_list):
        print("The required dataset is not available.")
    elif (network not in network_list):
        print("The required network is not available.")
    elif ((dataset, network) not in combination_list):
        print("The required dataset-network combination is not implemented.")
    else:
        # Obtain the number of classes and the function that implements the network
        num_classes = dataset_network_dict.get(dataset)[0]
        network_function = network_functions_dict.get(network)

        # Define the model
        in_ = dataset_network_dict.get(dataset)[1].get(network)
        out = network_function(in_, num_classes)
        net = eddl.Model([in_], [out])

        ##########################
        ##### MODEL BUILDING #####
        ##########################

        # Builds on the master and, via a replicated task, on every worker
        compss_api.build(net, dataset, network, use_gpu)
        eddl.summary(net)

        ###########################
        ##### DATASET LOADING #####
        ###########################

        x_train = eddlT.load(CVAR_DATASET_PATH + dataset+"_trX.bin")
        y_train = eddlT.load(CVAR_DATASET_PATH + dataset+"_trY.bin")
        x_test = eddlT.load(CVAR_DATASET_PATH + dataset+"_tsX.bin")
        y_test = eddlT.load(CVAR_DATASET_PATH + dataset+"_tsY.bin")

        # Normalize pixel values into [0, 1]
        eddlT.div_(x_train, 255.0)
        eddlT.div_(x_test, 255.0)

        # Prepare data for distribution: one block of equal size per worker
        train_images_per_worker = int(eddlT.getShape(x_train)[0] / num_workers)
        x_train_dist = array(x_train, train_images_per_worker)
        y_train_dist = array(y_train, train_images_per_worker)

        ####################
        ##### TRAINING #####
        ####################

        start_time = timer()

        print("MODEL TRAINING...")
        print("Num workers: ", num_workers)
        print("Number of epochs: ", num_epochs)
        print("Number of asynchronous epochs: ", num_async_epochs)

        # For every epoch taking into account parameter synchronization step:
        # workers run num_async_epochs local epochs between weight aggregations
        for i in range(1, num_epochs+1, num_async_epochs):
            start_epoch = str(i)
            end_epoch = str(i+num_async_epochs-1)
            print("\nTraining epochs: ", start_epoch, " to " + end_epoch + "\n")
            epoch_start_time = timer()

            # Shuffle dataset and train the epoch range
            x_train_dist, y_train_dist = global_shuffle(x_train_dist, y_train_dist)
            compss_api.train_batch(
                net,
                x_train_dist,
                y_train_dist,
                num_workers,
                num_async_epochs,
                workers_batch_size, use_gpu)

            epoch_end_time = timer()
            epoch_final_time = epoch_end_time - epoch_start_time
            print(f"Elapsed time for epoch range ("+ start_epoch + "-" + end_epoch + "): ", str(round(epoch_final_time,2)), " seconds")

        end_time = timer()
        final_time = end_time - start_time
        print("Total elapsed time: ", str(round(final_time,2)), " seconds")

        ######################
        ##### EVALUATION #####
        ######################

        print("Evaluating model against test set")
        eddl.evaluate(net, [x_test], [y_test])
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--dataset", type=str, metavar="STR", default="mnist") # The dataset to work with
parser.add_argument("--network", type=str, metavar="STR", default="simple-mnist") # The network that will be trained
parser.add_argument("--num_workers", type=int, metavar="INT", default=4) # Number of computing units to divide the execution
parser.add_argument("--num_epochs", type=int, metavar="INT", default=10) # Number of epochs to run the training
parser.add_argument("--num_async_epochs", type=int, metavar="INT", default=1) # Number of epochs until parameter synchonization
parser.add_argument("--max_num_async_epochs", type=int, metavar="INT", default=1) # Number of maximum epochs until parameter synchronization in async mode
parser.add_argument("--workers_batch_size", type=int, metavar="INT", default=250) # Size of each batch of the training phase
parser.add_argument("--gpu", type=bool, metavar="BOOL", default=True) # True: Use GPU as CS --- False: Use CPU as CS
main(parser.parse_args(sys.argv[1:]))
......@@ -28,7 +28,7 @@ spec:
type: DirectoryOrCreate
containers:
- name: dh-compss-master
image: registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss/compss-eddl:latest-compss-2.8-eddlgpu-1.1-python-3.6-v2
image: registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss/compss-eddl:latest-compss-2.8-eddlgpu-1.1-python-3.6-v3
resources:
limits:
nvidia.com/gpu: 1 # requesting 1 GPU
......@@ -84,7 +84,7 @@ spec:
type: DirectoryOrCreate
containers:
- name: dh-compss-worker
image: registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss/compss-eddl:latest-compss-2.8-eddlgpu-1.1-python-3.6-v2
image: registry.gitlab.bsc.es/ppc-bsc/software/deep-health-compss/compss-eddl:latest-compss-2.8-eddlgpu-1.1-python-3.6-v3
resources:
limits:
nvidia.com/gpu: 1 # requesting 1 GPU
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment