Commit 0b018475, authored Jan 26, 2022 by salbiach
Commit message: renamed files
Parent: 7f477b07
Changes: 9
docker/pyeddl/eddl_master_distributed_api.py (new file, mode 100644)
"""
\
EDDL DISTRIBUTED API IMPLEMENTATION.
"""
from
pycompss.api.api
import
compss_wait_on
from
pycompss.api.api
import
compss_barrier
from
eddl_array
import
paired_partition
from
eddl_worker_distributed
import
*
from
net_utils
import
net_aggregateParameters
from
net_utils
import
net_aggregateResults
from
net_utils
import
net_parametersToNumpy
from
shuffle
import
block_shuffle_async
,
global_shuffle
from
timeit
import
default_timer
as
timer
compss_object
:
Eddl_Compss_Distributed
=
None
def
build
(
net
,
dataset
,
network
,
use_gpu
):
# Initialize the compss object
global
compss_object
compss_object
=
Eddl_Compss_Distributed
()
# Define the computing service to use
CS
=
eddl
.
CS_GPU
()
if
use_gpu
else
eddl
.
CS_CPU
()
# Build the model in the master
eddl
.
build
(
net
,
eddl
.
sgd
(
CVAR_SGD1
,
CVAR_SGD2
),
[
"soft_cross_entropy"
],
[
"categorical_accuracy"
],
CS
,
True
)
# Build the model in each distributed computing unit
compss_object
.
build
(
dataset
,
network
,
use_gpu
)
# Wait until the models are created in each computing unit
print
(
"Building the model in distributed computing units..."
)
compss_barrier
()
print
(
"Building done!"
)
def
fit_sync
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
):
"""
Synchronization every epoch
"""
print
(
"Training epochs synchronously..."
)
global
compss_object
# Define the number of images corresponding to each computing unit
num_total_samples
=
x_train_dist
.
shape
[
0
]
num_images_per_worker
=
int
(
num_total_samples
/
num_workers
)
# Array to store the weights of each computing unit
worker_params
=
[
list
()
for
i
in
range
(
0
,
num_workers
)]
# For every epoch taking into account parameter synchronization step
for
i
in
range
(
0
,
num_epochs
):
print
(
"Training epoch: "
+
str
(
i
+
1
))
epoch_start_time
=
timer
()
# Shuffle dataset and train the epoch range
x_train_dist
,
y_train_dist
=
global_shuffle
(
x_train_dist
,
y_train_dist
)
# x_train_dist is a dislib array already divided into the number of workers, so it is iterating over num_workers
for
j
,
(
block_x
,
block_y
)
in
enumerate
(
paired_partition
(
x_train_dist
,
y_train_dist
)):
worker_params
[
j
]
=
compss_object
.
train_batch
(
block_x
,
block_y
,
model_params
,
num_images_per_worker
,
workers_batch_size
,
use_gpu
)
# Wait until every computing unit finishes its training (synchronous training)
worker_params
=
compss_wait_on
(
worker_params
)
# Aggregate parameters
model_params
=
net_aggregateParameters
(
worker_params
)
epoch_end_time
=
timer
()
epoch_final_time
=
epoch_end_time
-
epoch_start_time
print
(
"Elapsed time for epoch "
+
str
(
i
+
1
)
+
": "
+
str
(
round
(
epoch_final_time
,
2
))
+
" seconds"
)
return
model_params
def
fit_async
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
):
"""
Partial parameter aggregation after every worker completion
"""
print
(
"Training epochs asynchronously... (epochs information not printed)"
)
global
compss_object
# Define the number of images corresponding to each computing unit
num_total_samples
=
x_train_dist
.
shape
[
0
]
num_images_per_worker
=
int
(
num_total_samples
/
num_workers
)
# Define the parameters for each worker
worker_params
=
[
net_parametersToNumpy
(
model_params
)
for
i
in
range
(
0
,
num_workers
)]
# Train and aggregate the parameters asynchronously for each distributed computing unit
for
i
in
range
(
0
,
num_epochs
):
# x_train_dist is a dislib array already divided into the number of workers, so it is iterating over num_workers
for
j
,
(
block_x
,
block_y
)
in
enumerate
(
paired_partition
(
x_train_dist
,
y_train_dist
)):
shuffled_x
,
shuffled_y
=
block_shuffle_async
(
block_x
,
block_y
)
block_x
,
block_y
=
[
shuffled_x
],
[
shuffled_y
]
worker_params
[
j
]
=
compss_object
.
train_batch
(
block_x
,
block_y
,
worker_params
[
j
],
num_images_per_worker
,
workers_batch_size
,
use_gpu
)
# model_params is COMMUTATIVE therefore it is updating in each call
worker_params
[
j
]
=
compss_object
.
aggregate_parameters_async
(
model_params
,
worker_params
[
j
],
(
1
/
num_workers
))
# Wait until every computing unit has aggregated its parameters
model_params
=
compss_wait_on
(
model_params
)
return
model_params
def
fit_full_async
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
):
"""
Parameter aggregation at the end of num_epochs only
"""
print
(
"Training epochs fully asynchronous... (epochs information not printed)"
)
global
compss_object
# Define the number of images corresponding to each computing unit
num_total_samples
=
x_train_dist
.
shape
[
0
]
num_images_per_worker
=
int
(
num_total_samples
/
num_workers
)
# Array to store the weights of each computing unit
worker_params
=
[
net_parametersToNumpy
(
model_params
)
for
i
in
range
(
0
,
num_workers
)]
# Shuffle the dataset
x_train_dist
,
y_train_dist
=
global_shuffle
(
x_train_dist
,
y_train_dist
)
# For every epoch
for
i
in
range
(
0
,
num_epochs
):
# x_train_dist is a dislib array already divided into the number of workers, so it is iterating over num_workers
for
j
,
(
block_x
,
block_y
)
in
enumerate
(
paired_partition
(
x_train_dist
,
y_train_dist
)):
shuffled_x
,
shuffled_y
=
block_shuffle_async
(
block_x
,
block_y
)
block_x
,
block_y
=
[
shuffled_x
],
[
shuffled_y
]
worker_params
[
j
]
=
compss_object
.
train_batch
(
block_x
,
block_y
,
worker_params
[
j
],
num_images_per_worker
,
workers_batch_size
,
use_gpu
)
# Wait until every computing unit finishes its training (synchronous training)
worker_params
=
compss_wait_on
(
worker_params
)
# Aggregate parameters
model_params
=
net_aggregateParameters
(
worker_params
)
return
model_params
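The helpers net_aggregateParameters, net_parametersToNumpy and the shuffle utilities are imported from modules that are not part of this commit. As a rough sketch only, the aggregation step used by fit_sync and fit_full_async presumably reduces the per-worker parameter lists to an element-wise mean, along these lines (the helper name and the exact weighting are assumptions, not the actual net_utils implementation):

import numpy as np

def net_aggregateParameters_sketch(worker_params):
    # worker_params: one entry per worker, each a [layer][tensor] list of numpy arrays
    num_workers = len(worker_params)
    aggregated = [[np.zeros_like(t) for t in layer] for layer in worker_params[0]]
    for params in worker_params:
        for i, layer in enumerate(params):
            for j, tensor in enumerate(layer):
                aggregated[i][j] += tensor / num_workers
    return [[t.astype(np.float32) for t in layer] for layer in aggregated]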
docker/pyeddl/eddl_master_train_batch.py (new file, mode 100644)
"""
\
SYNCHRONOUS TRAIN_BATCH IMPLEMENTATION.
"""
import
argparse
from
itertools
import
combinations_with_replacement
import
sys
from
timeit
import
default_timer
as
timer
import
pyeddl.eddl
as
eddl
from
pyeddl.tensor
import
Tensor
as
eddlT
import
eddl_master_distributed_api
as
compss_api
from
cvars
import
*
from
eddl_array
import
array
from
models
import
LeNet
,
VGG16
,
SIMPLE_MNIST
from
net_utils
import
net_parametersToNumpy
,
net_parametersToTensor
def
main
(
args
):
##########################
##### INITIALIZATION #####
##########################
# Process arguments
num_workers
=
args
.
num_workers
num_epochs
=
args
.
num_epochs
workers_batch_size
=
args
.
workers_batch_size
dataset
=
args
.
dataset
network
=
args
.
network
use_gpu
=
args
.
gpu
sync_type
=
args
.
sync_type
# Define available datasets and network implementations
dataset_list
=
[
"mnist"
,
"cifar10"
]
network_list
=
[
"simple-mnist"
,
"lenet"
,
"vgg16"
]
combination_list
=
[(
"mnist"
,
"simple-mnist"
),(
"cifar10"
,
"lenet"
),(
"cifar10"
,
"vgg16"
)]
# Dictionary relating the dataset with its number of classes and the first layer of the associated network
dataset_network_dict
=
{
"mnist"
:
[
10
,
{
"simple-mnist"
:
eddl
.
Input
([
784
])
}],
"cifar10"
:
[
10
,
{
"lenet"
:
eddl
.
Input
([
3
,
32
,
32
]),
"vgg16"
:
eddl
.
Input
([
3
,
32
,
32
])
}]
}
# Dictionary relating the network argument with the function that implements it
network_functions_dict
=
{
"simple-mnist"
:
SIMPLE_MNIST
,
"lenet"
:
LeNet
,
"vgg16"
:
VGG16
}
# Check that the dataset is downloaded, the network is implemented and the combination is valid
if
(
dataset
not
in
dataset_list
):
print
(
"The required dataset is not available."
)
elif
(
network
not
in
network_list
):
print
(
"The required network is not available."
)
elif
((
dataset
,
network
)
not
in
combination_list
):
print
(
"The required dataset-network combination is not implemented."
)
else
:
# Obtain the number of classes and the function that implements the network
num_classes
=
dataset_network_dict
.
get
(
dataset
)[
0
]
network_function
=
network_functions_dict
.
get
(
network
)
# Define the model
in_
=
dataset_network_dict
.
get
(
dataset
)[
1
].
get
(
network
)
out
=
network_function
(
in_
,
num_classes
)
net
=
eddl
.
Model
([
in_
],
[
out
])
##########################
##### MODEL BUILDING #####
##########################
compss_api
.
build
(
net
,
dataset
,
network
,
use_gpu
)
eddl
.
summary
(
net
)
###########################
##### DATASET LOADING #####
###########################
x_train
=
eddlT
.
load
(
CVAR_DATASET_PATH
+
dataset
+
"_trX.bin"
)
y_train
=
eddlT
.
load
(
CVAR_DATASET_PATH
+
dataset
+
"_trY.bin"
)
x_test
=
eddlT
.
load
(
CVAR_DATASET_PATH
+
dataset
+
"_tsX.bin"
)
y_test
=
eddlT
.
load
(
CVAR_DATASET_PATH
+
dataset
+
"_tsY.bin"
)
# Normalize
eddlT
.
div_
(
x_train
,
255.0
)
eddlT
.
div_
(
x_test
,
255.0
)
# Prepare data for distribution
train_images_per_worker
=
int
(
eddlT
.
getShape
(
x_train
)[
0
]
/
num_workers
)
x_train_dist
=
array
(
x_train
,
train_images_per_worker
)
y_train_dist
=
array
(
y_train
,
train_images_per_worker
)
####################
##### TRAINING #####
####################
start_time
=
timer
()
print
(
"MODEL TRAINING..."
)
print
(
"Num workers: "
,
num_workers
)
print
(
"Number of epochs: "
,
num_epochs
)
# Initial parameters that every computing unit will take in order to begin the training
model_params
=
net_parametersToNumpy
(
eddl
.
get_parameters
(
net
))
if
(
sync_type
==
0
):
model_params
=
compss_api
.
fit_sync
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
)
elif
(
sync_type
==
1
):
model_params
=
compss_api
.
fit_async
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
)
elif
(
sync_type
==
2
):
model_params
=
compss_api
.
fit_full_async
(
model_params
,
x_train_dist
,
y_train_dist
,
num_workers
,
num_epochs
,
workers_batch_size
,
use_gpu
)
else
:
print
(
"No such sync type option available"
)
# Set the parameters of the model to the aggregated parameters
eddl
.
set_parameters
(
net
,
net_parametersToTensor
(
model_params
))
end_time
=
timer
()
final_time
=
end_time
-
start_time
print
(
"Total elapsed time: "
,
str
(
round
(
final_time
,
2
)),
" seconds"
)
######################
##### EVALUATION #####
######################
print
(
"Evaluating model against test set"
)
eddl
.
evaluate
(
net
,
[
x_test
],
[
y_test
])
if
__name__
==
"__main__"
:
parser
=
argparse
.
ArgumentParser
(
description
=
__doc__
)
parser
.
add_argument
(
"--dataset"
,
type
=
str
,
metavar
=
"STR"
,
default
=
"mnist"
)
# The dataset to work with
parser
.
add_argument
(
"--network"
,
type
=
str
,
metavar
=
"STR"
,
default
=
"simple-mnist"
)
# The network that will be trained
parser
.
add_argument
(
"--num_workers"
,
type
=
int
,
metavar
=
"INT"
,
default
=
4
)
# Number of computing units to divide the execution
parser
.
add_argument
(
"--num_epochs"
,
type
=
int
,
metavar
=
"INT"
,
default
=
10
)
# Number of epochs to run the training
parser
.
add_argument
(
"--workers_batch_size"
,
type
=
int
,
metavar
=
"INT"
,
default
=
250
)
# Size of each batch of the training phase
parser
.
add_argument
(
"--gpu"
,
type
=
bool
,
metavar
=
"BOOL"
,
default
=
True
)
# True: Use GPU as CS --- False: Use CPU as CS
parser
.
add_argument
(
"--sync_type"
,
type
=
int
,
metavar
=
"INT"
,
default
=
0
)
# 0: synchronous --- 1: asynchronous --- 2: fully asynchronous
main
(
parser
.
parse_args
(
sys
.
argv
[
1
:]))
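A minimal sketch of driving this script programmatically with the same defaults the argparse block above defines (the Namespace fields mirror the declared arguments; actually running it still requires the COMPSs runtime and the pyeddl environment set up elsewhere in this repository):

import argparse
import eddl_master_train_batch as train_script

args = argparse.Namespace(
    dataset="mnist",
    network="simple-mnist",
    num_workers=4,
    num_epochs=10,
    workers_batch_size=250,
    gpu=False,      # use CS_CPU instead of the default CS_GPU
    sync_type=0,    # 0: synchronous, 1: asynchronous, 2: fully asynchronous
)
train_script.main(args)

Note that --gpu is declared with type=bool, so on the command line any non-empty value (including the string "False") parses to True; passing a real boolean as above, or switching the argument to an int or a store_true flag, avoids that pitfall.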
docker/pyeddl/eddl_worker_distributed.py (new file, mode 100644)
import numpy as np
import random

import pyeddl.eddl as eddl
from pycompss.api.constraint import constraint
from pycompss.api.parameter import *
from pycompss.api.task import task
from pyeddl.tensor import Tensor as eddlT

from cvars import *
from eddl_array import to_tensor
from net_utils import net_parametersToNumpy
from net_utils import net_parametersToTensor
from models import SIMPLE_MNIST, LeNet, VGG16


class Eddl_Compss_Distributed:

    def __init__(self):
        self.model = None

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(dataset=IN, network=IN, use_gpu=IN, is_replicated=True)
    def build(self, dataset, network, use_gpu):
        # Dictionary relating the dataset with its number of classes and the first layer of the associated network
        dataset_network_dict = {
            "mnist": [10, {"simple-mnist": eddl.Input([784])}],
            "cifar10": [10, {"lenet": eddl.Input([3, 32, 32]), "vgg16": eddl.Input([3, 32, 32])}]
        }

        # Dictionary relating the network argument with the function that implements it
        network_functions_dict = {"simple-mnist": SIMPLE_MNIST, "lenet": LeNet, "vgg16": VGG16}

        # Obtain the number of classes and the function that implements the network
        num_classes = dataset_network_dict.get(dataset)[0]
        network_function = network_functions_dict.get(network)

        # Define the model
        in_ = dataset_network_dict.get(dataset)[1].get(network)
        out = network_function(in_, num_classes)
        net = eddl.Model([in_], [out])

        # Define the computing service to use
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model in this very node
        eddl.build(
            net,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Save the model. We have to serialize it to a string so COMPSs is able to serialize and deserialize from disk
        self.model = eddl.serialize_net_to_onnx_string(net, False)

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(
        x_train={Type: COLLECTION_IN, Depth: 2},
        y_train={Type: COLLECTION_IN, Depth: 2},
        model_params=IN,
        num_images_per_worker=IN,
        workers_batch_size=IN,
        use_gpu=IN,
        target_direction=IN
    )
    def train_batch(self, x_train, y_train, model_params, num_images_per_worker, workers_batch_size, use_gpu):
        # Convert data to tensors
        x_train = to_tensor(x_train)
        y_train = to_tensor(y_train)

        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)

        # Define the computing service to use
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model after deserializing and before injecting the parameters
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Set the parameters sent from master to the model and reset loss
        eddl.set_parameters(model, net_parametersToTensor(model_params))
        eddl.reset_loss(model)

        # Define the number of minibatches to compute
        num_mini_batches = int(num_images_per_worker / workers_batch_size)

        for j in range(num_mini_batches):
            # Select the mini-batch indices
            mini_batch_indices = list(range(j * workers_batch_size, (j + 1) * workers_batch_size - 1))
            eddl.train_batch(model, [x_train], [y_train], mini_batch_indices)

        eddl.print_loss(model, num_mini_batches)
        print("\nTrain batch individual completed in worker's train batch task\n")

        # Get parameters from the model and convert them to numpy so COMPSS can serialize them
        local_model_parameters = net_parametersToNumpy(eddl.get_parameters(model))

        return local_model_parameters

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(model_params=COMMUTATIVE, parameters_to_aggregate=IN, mult_factor=IN, target_direction=IN)
    def aggregate_parameters_async(self, model_params, parameters_to_aggregate, mult_factor):
        for i in range(0, len(model_params)):
            for j in range(0, len(model_params[i])):
                model_params[i][j] = ((model_params[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)

        return model_params
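A standalone illustration of the update rule that aggregate_parameters_async applies each time a worker's parameters arrive: the accumulated model_params are simply averaged element-wise with the newly received ones (note that mult_factor is accepted but never used in the body above). Small stand-in arrays are used here instead of real network parameters:

import numpy as np

model_params = [[np.ones((2, 2), dtype=np.float32)]]                  # accumulated parameters (one layer, one tensor)
parameters_to_aggregate = [[np.full((2, 2), 3.0, dtype=np.float32)]]  # one worker's freshly trained parameters

for i in range(len(model_params)):
    for j in range(len(model_params[i])):
        model_params[i][j] = ((model_params[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)

print(model_params[0][0])  # every entry is 2.0, the element-wise mean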
docker/pyeddl/eddl_worker_sequential.py (new file, mode 100644)
import numpy as np
import random

import pyeddl.eddl as eddl
from pyeddl.tensor import Tensor as eddlT

from cvars import *
from eddl_array import to_tensor
from net_utils import net_parametersToNumpy
from net_utils import net_parametersToTensor
from models import SIMPLE_MNIST, LeNet, VGG16


class Eddl_Compss_Distributed:

    def __init__(self):
        self.model = None

    def build(self, dataset, network, use_gpu):
        # Dictionary relating the dataset with its number of classes and the first layer of the associated network
        dataset_network_dict = {
            "mnist": [10, {"simple-mnist": eddl.Input([784])}],
            "cifar10": [10, {"lenet": eddl.Input([3, 32, 32]), "vgg16": eddl.Input([3, 32, 32])}]
        }

        # Dictionary relating the network argument with the function that implements it
        network_functions_dict = {"simple-mnist": SIMPLE_MNIST, "lenet": LeNet, "vgg16": VGG16}

        # Obtain the number of classes and the function that implements the network
        num_classes = dataset_network_dict.get(dataset)[0]
        network_function = network_functions_dict.get(network)

        # Define the model
        in_ = dataset_network_dict.get(dataset)[1].get(network)
        out = network_function(in_, num_classes)
        net = eddl.Model([in_], [out])

        # Define the computing service to use
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model in this very node
        eddl.build(
            net,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Save the model. We have to serialize it to a string so COMPSs is able to serialize and deserialize from disk
        self.model = eddl.serialize_net_to_onnx_string(net, False)

    def train_batch(self, x_train, y_train, model_params, num_images_per_worker, workers_batch_size, use_gpu):
        # Convert data to tensors
        x_train = to_tensor(x_train)
        y_train = to_tensor(y_train)

        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)

        # Define the computing service to use
        CS = eddl.CS_GPU() if use_gpu else eddl.CS_CPU()

        # Build the model after deserializing and before injecting the parameters
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            CS,
            False
        )

        # Set the parameters sent from master to the model and reset loss
        eddl.set_parameters(model, net_parametersToTensor(model_params))
        eddl.reset_loss(model)

        # Define the number of minibatches to compute
        num_mini_batches = int(num_images_per_worker / workers_batch_size)

        for j in range(num_mini_batches):
            # Select the mini-batch indices
            mini_batch_indices = list(range(j * workers_batch_size, (j + 1) * workers_batch_size - 1))
            eddl.train_batch(model, [x_train], [y_train], mini_batch_indices)

        eddl.print_loss(model, num_mini_batches)
        print("\nTrain batch individual completed in worker's train batch task\n")

        # Get parameters from the model and convert them to numpy so COMPSS can serialize them
        local_model_parameters = net_parametersToNumpy(eddl.get_parameters(model))

        return local_model_parameters

    def aggregate_parameters_async(self, model_params, parameters_to_aggregate, mult_factor):
        for i in range(0, len(model_params)):
            for j in range(0, len(model_params[i])):
                model_params[i][j] = ((model_params[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)

        return model_params
docker/pyeddl/simple_examples/runcompss.sh (new file, mode 100644)
#!/bin/bash

# If you want to execute the test python example:
#runcompss --resources=/var/local/compss_conf/resources.xml --project=/var/local/compss_conf/project.xml --master_name=10.1.54-38 /tutorial_apps-stable/python/simple/src/simple.py 5

# If you want to execute pyeddl in k8s:
#source get_pods_ip.sh
#masterIP=(${Nodes[2]})

echo "MasterIP is:" $MY_POD_IP

conda run --no-capture-output -n pyeddl_pycompss_env runcompss --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP simple1.py
docker/pyeddl/simple_examples/simple1.py (new file, mode 100644)
"""
\
SIMPLE EXAMPLE
"""
from
pycompss.api.api
import
compss_wait_on
from
pycompss.api.task
import
task