Commit 0f02c0f4 authored by vmasip's avatar vmasip
Browse files

pyddl project added with datasets. Dockerfile modified with right python version for compss

parent 59cbf255
pyeddl/
\ No newline at end of file
#pyeddl/
\ No newline at end of file
## Guide to execute with kubernetes
If you use a private Docker image repository, reuse the Docker repository configuration from Docker in Kubernetes (assuming it is configured in Docker) with the following script:
`cd kubernetes && bash create-secret.yaml`
Prepare kubernetes context if you need it:
```
kubectl get namespaces
kubectl config get-contexts
kubectl config set-context deephealth-bsc-context --namespace=compss-space
```
Or if you already have one you want to use:
`kubectl config use-context deephealth-bsc-context`
Prepare your image. The Dockerfile is in the docker folder. Run the Makefile whenever you need to build the image and push it to the Docker repository.
If you want to deploy k8s overriding existing one, first delete with command:
`kubectl delete -f compss_deephealth.yaml`
Go to kubernetes folder and execute:
`kubectl create -f compss_deephealth.yaml`
If you need anything from the pod, you can get its log:
`kubectl logs **POD NAME** -f`
You can also execute commands into pod's containers like this:
`kubectl exec -it **POD NAME** -- /bin/bash` (in case pod only has one container)
......@@ -46,14 +46,11 @@ RUN set -x && \
WORKDIR /root
RUN mkdir pyeddl && \
mkdir pyeddl/third_party && \
mkdir pyeddl/third_party/compss_runtime
RUN mkdir pyeddl
COPY pyeddl/third_party/compss_runtime/ pyeddl/third_party/compss_runtime/
# Creating environment and activating it for next dockerfile runs
RUN conda create --name pyeddl_pycompss_env
RUN conda create --name "pyeddl_pycompss_env" python=3.6.15
SHELL ["conda", "run", "-n", "pyeddl_pycompss_env", "/bin/bash", "-c"]
......@@ -64,24 +61,27 @@ RUN conda config --add channels dhealth && \
conda install pyeddl-cpu
# pycompss installation
RUN pip install pycompss
# Some useful utils for users
RUN apt-get update && \
apt-get install -y apt-utils zip unzip nano jq curl
RUN pip install pycompss dislib
# Few useful utils for users
# RUN apt-get update -y && \
# apt-get install -y apt-utils zip unzip nano jq curl
# Copy compss xml files and conf. scripts
COPY compss /root
# Prepare activation of pyeddl env. Works when user uses run exec. TODO: prepare entrypoint
# # Prepare activation of pyeddl env. Works when user uses run exec. TODO: prepare entrypoint
SHELL ["/bin/bash", "-c"]
RUN echo "export PATH="/opt/conda/bin:$PATH"" > ~/.bashrc
RUN echo "source activate pyeddl_pycompss_env" >> ~/.bashrc
ENV PATH /opt/conda/envs/env/bin:$PATH
RUN source ~/.bashrc
COPY pyeddl/ pyeddl/
EXPOSE 22
# Tried, but doesn't work:
# CMD ["/usr/sbin/sshd","-D"]
# ENTRYPOINT service ssh restart && /bin/bash
......
......@@ -19,4 +19,5 @@ echo "${API_SERVER}/api/v1/namespaces/${NAMESPACE}/endpoints"
Nodes=$(curl --cacert ${CACERT} --header "Authorization: Bearer ${TOKEN}" -X GET \
"${APISERVER}/api/v1/namespaces/${NAMESPACE}/endpoints/" | jq -rM ".items[].subsets[].addresses[].ip" | xargs echo)
echo "Pods IP's are:"
echo $Nodes
\ No newline at end of file
......@@ -6,4 +6,4 @@
#masterIP=(${Nodes[2]})
echo "MasterIP is:" $MY_POD_IP
cd pyeddl/third_party/compss_runtime/
conda run --no-capture-output -n pyeddl_pycompss_env runcompss -d --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_fit_compss.py
\ No newline at end of file
conda run --no-capture-output -n pyeddl_pycompss_env runcompss -d --lang=python --python_interpreter=python3 --project=/root/project.xml --resources=/root/resources.xml --master_name=$MY_POD_IP eddl_train_batch_compss_mnist_sync.py
\ No newline at end of file
import os
# Hyper-parameters for the SGD optimizer used by all COMPSs tasks:
# CVAR_SGD1 is the learning rate, CVAR_SGD2 the momentum.
CVAR_SGD1 = 0.01
CVAR_SGD2 = 0.9
# Paths are resolved relative to the process working directory, so the
# scripts must be launched from the project folder that contains datasets/.
CVAR_CURRENT_PATH = os.getcwd()
CVAR_PARENT_PATH = os.path.abspath(os.path.join(CVAR_CURRENT_PATH, os.pardir))
# Dataset files are expected under <cwd>/datasets/.
CVAR_DATASET_PATH = os.path.join(CVAR_CURRENT_PATH, "datasets/")
from typing import MutableSequence
import numpy as np
from dislib import utils
from dislib.data.array import Array
from pycompss.api.parameter import *
from pycompss.api.task import task
from pyeddl.tensor import Tensor as eddlT
from scipy.sparse import issparse
from pycompss.api.constraint import constraint
# noinspection PyProtectedMember
#@task(tensor={Type: COLLECTION_IN, Depth: 2}, returns=np.array)
@constraint(computing_units="${OMP_NUM_THREADS}")
@task(returns=np.array)
def _apply_to_tensor(func, tensor, *args, **kwargs):
    """COMPSs task: run *func* in place on *tensor* (given as dislib blocks)
    and return the mutated data as a numpy array."""
    merged = to_tensor(tensor)
    func(merged, *args, **kwargs)
    return eddlT.getdata(merged)
# noinspection PyProtectedMember
def apply(func, x, *args, **kwargs):
    """Apply *func* to every row-block of dislib array *x* (as COMPSs tasks)
    and assemble the results into a new dense Array with the same layout."""
    result_blocks = [
        [_apply_to_tensor(func, blk._blocks, *args, **kwargs)]
        for blk in x._iterator()
    ]
    return Array(blocks=result_blocks,
                 top_left_shape=x._top_left_shape,
                 reg_shape=x._reg_shape,
                 shape=x.shape,
                 sparse=False)
# noinspection PyProtectedMember
def array(x, block_size):
    """Wrap *x* (a numpy array or an EDDL tensor) as a dislib Array split
    into row blocks of *block_size* rows each."""
    if not isinstance(x, np.ndarray):
        x = eddlT.getdata(x)
    # A scalar block_size means "rows per block"; extend it with the
    # remaining dimensions so blocks keep the full row width.
    if not isinstance(block_size, MutableSequence):
        block_size = (block_size, *x.shape[1:])
    rows_per_block = block_size[0]
    blocks = [
        [x[start: start + rows_per_block, :]]
        for start in range(0, x.shape[0], rows_per_block)
    ]
    arr = Array(blocks=blocks,
                top_left_shape=block_size,
                reg_shape=block_size,
                shape=x.shape,
                sparse=issparse(x))
    print("DisLib array: " + str(arr))
    return arr
# noinspection PyProtectedMember
def paired_partition(x, y):
    """Yield aligned (x_blocks, y_blocks) pairs for two dislib arrays."""
    for x_part, y_part in utils.base._paired_partition(x, y):
        yield x_part._blocks, y_part._blocks
def to_tensor(tensor):
    """Merge dislib blocks into a single ndarray and wrap it as an EDDL tensor."""
    return eddlT.fromarray(Array._merge_blocks(tensor))
import numpy as np
import pyeddl.eddl as eddl
from pycompss.api.constraint import constraint
from pycompss.api.parameter import *
from pycompss.api.task import task
from pyeddl.tensor import Tensor as eddlT
from cvars import *
from eddl_array import to_tensor
from net_utils import net_parametersToNumpy
from net_utils import net_parametersToTensor
class Eddl_Compss_Distributed:
    """Container of PyCOMPSs tasks for distributed training/evaluation of a
    PyEDDL network.

    The network travels between master and workers as an ONNX string
    (``self.model``) because COMPSs must be able to serialize the object.
    Every task rebuilds the network locally (fixed SGD(CVAR_SGD1, CVAR_SGD2)
    optimizer on CPU) before injecting the parameters received from the
    master.
    """

    def __init__(self):
        # ONNX-serialized network; populated by the replicated build() task.
        self.model = None

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(serialized_model=IN, optimizer=IN, losses=IN, metrics=IN, compserv=IN, is_replicated=True)
    def build(self, serialized_model, optimizer, losses, metrics, compserv):
        """Replicated task: build the received ONNX model on every worker and
        cache it in ``self.model``.

        NOTE(review): the ``optimizer`` and ``compserv`` arguments are
        currently ignored — a hard-coded SGD optimizer and CS_CPU computing
        service are used instead; confirm whether they should be honored.
        """
        print("Arranca build task en worker")
        # Deserialize the received model
        model = eddl.import_net_from_onnx_string(serialized_model)
        # print(eddl.summary(model))
        # Build the model in this very node
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            losses,
            metrics,
            eddl.CS_CPU(mem="full_mem"),
            False
        )
        # Save the model. We have to serialize it to a string so COMPSs is able to serialize and deserialize from disk
        self.model = eddl.serialize_net_to_onnx_string(model, False)
        print("Finaliza build task en worker")

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(
        x_train={Type: COLLECTION_IN, Depth: 2},
        y_train={Type: COLLECTION_IN, Depth: 2},
        initial_parameters=IN,
        num_images_per_worker=IN,
        num_epochs_for_param_sync=IN,
        workers_batch_size=IN,
        target_direction=IN)
    def train_batch(self,
                    x_train,
                    y_train,
                    initial_parameters,
                    num_images_per_worker,
                    num_epochs_for_param_sync,
                    workers_batch_size):
        """Synchronous training task.

        Trains the worker's copy of the model on its data partition for
        ``num_epochs_for_param_sync`` epochs and returns the resulting
        weights converted to numpy arrays (so COMPSs can serialize them).
        """
        # Convert data to tensors
        x_train = to_tensor(x_train)
        y_train = to_tensor(y_train)
        print("Entrando en train batch task en worker")
        import sys
        sys.stdout.flush()
        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)
        print("Modelo deserializado de disco")
        sys.stdout.flush()
        # The model needs to be built after deserializing and before injecting the parameters
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            eddl.CS_CPU(mem="full_mem"),
            False
        )
        print("Modelo built")
        sys.stdout.flush()
        # Set the parameters sent from master to the model
        eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
        print("Parametros seteados")
        sys.stdout.flush()
        # print(eddl.summary(model))
        print("Build completed in train batch task")
        sys.stdout.flush()
        num_batches = int(num_images_per_worker / workers_batch_size)
        eddl.reset_loss(model)
        print("Num batches: ", num_batches)
        sys.stdout.flush()
        for i in range(num_epochs_for_param_sync):
            print("Epoch %d/%d (%d batches)" % (i + 1, num_epochs_for_param_sync, num_batches))
            for j in range(num_batches):
                # FIX: batch j must cover samples [j*batch_size, (j+1)*batch_size).
                # The previous code strided by num_batches (the batch COUNT)
                # and subtracted 1 from the range end, selecting the wrong
                # samples and silently dropping one per batch.
                indices = list(range(j * workers_batch_size, (j + 1) * workers_batch_size))
                print("Empieza batch")
                sys.stdout.flush()
                eddl.train_batch(model, [x_train], [y_train], indices)
                print("Finaliza batch")
                sys.stdout.flush()
                eddl.print_loss(model, j)
                print()
        print("Train batch individual completed in train batch task")
        sys.stdout.flush()
        # Get parameters from the model and convert them to numpy so COMPSS can serialize them
        final_parameters = net_parametersToNumpy(eddl.get_parameters(model))
        return final_parameters

    @task(
        x_train={Type: COLLECTION_IN, Depth: 2},
        y_train={Type: COLLECTION_IN, Depth: 2},
        initial_parameters=IN,
        num_images_per_worker=IN,
        num_epochs_for_param_sync=IN,
        workers_batch_size=IN,
        target_direction=IN)
    def train_batch_async(self,
                          x_train,
                          y_train,
                          initial_parameters,
                          num_images_per_worker,
                          num_epochs_for_param_sync,
                          workers_batch_size):
        """Asynchronous variant of :meth:`train_batch`.

        NOTE(review): unlike train_batch, this version does not convert
        x_train/y_train with to_tensor() before calling eddl.train_batch —
        confirm the caller already passes tensors.
        """
        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)
        # The model needs to be built after deserializing
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            eddl.CS_CPU(mem="full_mem"),
            False
        )
        # Set the parameters sent from master to the model.
        # FIX: moved after eddl.build — the sync task documents that
        # parameters must be injected into an already-built network
        # (build re-initializes weights); previously they were set before
        # building and therefore discarded.
        eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
        # print(eddl.summary(model))
        print("Build completed in train batch task")
        num_batches = int(num_images_per_worker / workers_batch_size)
        eddl.reset_loss(model)
        print("Num batches: ", num_batches)
        for i in range(num_epochs_for_param_sync):
            print("Epoch %d/%d (%d batches)" % (i + 1, num_epochs_for_param_sync, num_batches))
            for j in range(num_batches):
                # FIX: same wrong-stride/off-by-one as in train_batch —
                # batch j spans [j*batch_size, (j+1)*batch_size).
                indices = list(range(j * workers_batch_size, (j + 1) * workers_batch_size))
                eddl.train_batch(model, [x_train], [y_train], indices)
                eddl.print_loss(model, j)
                print()
        print("Train batch individual completed in train batch task")
        # Get parameters from the model and convert them to numpy so COMPSS can serialize them
        final_parameters = net_parametersToNumpy(eddl.get_parameters(model, False, True))
        return final_parameters

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(accumulated_parameters=COMMUTATIVE, parameters_to_aggregate=IN, mult_factor=IN, target_direction=IN)
    def aggregate_parameters_async(self, accumulated_parameters, parameters_to_aggregate, mult_factor):
        """Commutative task: fold a worker's parameters into the accumulator
        by per-layer averaging, keeping float32 dtype.

        NOTE(review): ``mult_factor`` is currently unused (see the commented
        weighted variant) — confirm whether weighting should apply.
        """
        for i in range(0, len(accumulated_parameters)):
            for j in range(0, len(accumulated_parameters[i])):
                # accumulated_parameters[i][j] += (parameters_to_aggregate[i][j] * mult_factor).astype(np.float32)
                accumulated_parameters[i][j] = (
                    (accumulated_parameters[i][j] + parameters_to_aggregate[i][j]) / 2).astype(np.float32)
        return accumulated_parameters

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(initial_parameters=IN, train_test_flag=IN, target_direction=IN)
    def evaluate(self, initial_parameters, train_test_flag):
        """Evaluate the model with ``initial_parameters`` against the full
        train or test set loaded from CVAR_DATASET_PATH (filenames are
        expected to come from the ``cvars`` star-import). Returns 1.
        """
        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)
        # The model needs to be built after deserializing and before injecting the parameters
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            eddl.CS_CPU(mem="full_mem"),
            False
        )
        print("Build completed in evaluate task")
        # Set the parameters sent from master to the model.
        # FIX: use the module-level eddl.set_parameters API as every other
        # task does (model.setParameters no longer exists in pyeddl), and
        # inject the parameters after building so build() does not
        # re-initialize them.
        eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
        # print(eddl.summary(model))
        if train_test_flag == "train":
            x = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_X_TRN)
            y = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_Y_TRN)
        else:
            x = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_X_TST)
            y = eddlT.load(CVAR_DATASET_PATH + CVAR_DATASET_Y_TST)
        # Normalize pixel values to [0, 1]
        eddlT.div_(x, 255.0)
        print("Evaluating model against " + train_test_flag + " set")
        eddl.evaluate(model, [x], [y])
        return 1

    @constraint(computing_units="${OMP_NUM_THREADS}")
    @task(initial_parameters=IN, x_test={Type: COLLECTION_IN, Depth: 2}, y_test={Type: COLLECTION_IN, Depth: 2}, num_images_per_worker=IN, workers_batch_size=IN, target_direction=IN)
    def eval_batch(self, x_test, y_test, initial_parameters, num_images_per_worker, workers_batch_size):
        """Evaluate the model on this worker's test partition.

        Returns ``[last_loss, last_metric]`` taken from the model after
        ``eddl.evaluate`` runs over the whole partition.
        """
        print("Entrando en eval batch task en worker")
        import sys
        sys.stdout.flush()
        # Deserialize from disk
        model = eddl.import_net_from_onnx_string(self.model)
        print("Modelo deserializado de disco")
        sys.stdout.flush()
        # The model needs to be built after deserializing and before injecting the parameters
        eddl.build(
            model,
            eddl.sgd(CVAR_SGD1, CVAR_SGD2),
            ["soft_cross_entropy"],
            ["categorical_accuracy"],
            eddl.CS_CPU(mem="full_mem"),
            False
        )
        print("Modelo built")
        sys.stdout.flush()
        # Set the parameters sent from master to the model
        eddl.set_parameters(model, net_parametersToTensor(initial_parameters))
        print("Parametros seteados")
        sys.stdout.flush()
        # print(eddl.summary(model))
        print("Build completed in eval batch task")
        sys.stdout.flush()
        # Convert data to tensors
        x_test = to_tensor(x_test)
        y_test = to_tensor(y_test)
        print("Lenes: ", x_test.shape)
        sys.stdout.flush()
        # num_batches is computed for logging; evaluation runs in one call.
        num_batches = int(num_images_per_worker / workers_batch_size)
        eddl.reset_loss(model)
        print("Num batches: ", num_batches)
        sys.stdout.flush()
        eddl.evaluate(model, [x_test], [y_test])
        print("Eval batch individual completed in eval batch task")
        sys.stdout.flush()
        # These final results should be a call to getAcc() y getLoss()
        final_results = [eddl.get_losses(model)[-1], eddl.get_metrics(model)[-1]]
        print("Final results: ", final_results)
        sys.stdout.flush()
        return final_results
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment