Commit b3766355 authored by Unai Perez's avatar Unai Perez
Browse files

Added support for Extrae on Docker containers

parent a56d534a
......@@ -364,7 +364,6 @@ public class NIOAdaptor extends NIOAgent implements CommAdaptor {
long time = System.currentTimeMillis();
int taskId = job.getTaskId();
taskStartTimes.put(taskId, time);
System.out.println("NIO Job " + taskId + " start time " + time);
worker.submitTask(job, obsoleteRenamings);
}
......@@ -438,7 +437,6 @@ public class NIOAdaptor extends NIOAgent implements CommAdaptor {
NIOProfile p = tr.getProfile();
p.setStartTimeMaster(taskStartTimes.get(taskId));
p.setEndTimeMaster(time);
System.out.println("NIO Job " + taskId + " end time " + time);
//nj.taskFinished(successful, e, tr.getProfile());
nj.taskFinished(successful, e, p);
......
......@@ -4,12 +4,15 @@ import es.bsc.comm.nio.NIONode;
import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.exceptions.InitNodeException;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.nio.NIOTracer;
import es.bsc.compss.nio.master.NIOAdaptor;
import es.bsc.compss.nio.master.NIOWorkerNode;
import es.bsc.compss.nio.master.handlers.Ender;
import es.bsc.compss.util.Tracer;
import org.apache.logging.log4j.LogManager;
import java.io.File;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
......@@ -45,8 +48,8 @@ public abstract class ContainerStarter extends Starter {
}
@Override
protected String[] getStartCommand(int workerPort, String masterName) throws InitNodeException {
private String[] useContainer(int workerPort, String masterName) throws InitNodeException {
String workerLibPath = "";
String libPathFromFile = nw.getLibPath();
if (!libPathFromFile.isEmpty()) {
......@@ -58,6 +61,7 @@ public abstract class ContainerStarter extends Starter {
} else {
workerLibPath = LIBPATH_FROM_ENVIRONMENT;
}
workerLibPath += ":/opt/COMPSs/Dependencies/extrae/lib";
String appDir = nw.getAppDir();
if (appDir == null || appDir.isEmpty()) {
......@@ -209,12 +213,12 @@ public abstract class ContainerStarter extends Starter {
cmd.add("/usr/bin/java");
cmd.addAll(Arrays.asList(jvmFlags));
cmd.addAll(Arrays.asList("-XX:+PerfDisableSharedMem", "-XX:-UsePerfData", "-XX:+UseG1GC",
"-XX:+UseThreadPriorities", "-XX:ThreadPriorityPolicy=42",
"-Dlog4j.configurationFile=" + installDir + "/Runtime/configuration/log/" + itlog4jFile,
"-Dcompss.python.interpreter=" + pythonInterpreter, "-Dcompss.python.version=" + pythonVersion,
"-Dcompss.python.virtualenvironment=" + pythonVirtualEnvironment,
"-Dcompss.python.propagate_virtualenvironment=" + pythonPropagateVirtualEnvironment,
"-Dcompss.worker.removeWD=false", "-Djava.library.path=" + workerLibPath));
"-Dcompss.worker.removeWD=false", "-Dcompss.streaming=NONE",
"-Djava.library.path=" + workerLibPath));
cmd.addAll(Arrays.asList("-cp", workerClasspath));
cmd.add(NIO_WORKER_CLASS_NAME);
cmd.add(Boolean.toString(debug)); // 0
......@@ -255,6 +259,231 @@ public abstract class ContainerStarter extends Starter {
return cmd.toArray(new String[0]);
}
/**
 * Builds the command line that launches the worker through the starter script
 * (referred to as persistent_worker.sh in the section comments below).
 *
 * The returned array holds, in order: the script path, the script-only values, the JVM flags
 * (preceded by their count), the FPGA reprogram arguments (preceded by their count) and the
 * NIOWorker arguments.
 *
 * NOTE(review): the trailing numbers on the NIOWorker entries are the original positional
 * annotations; the streaming port entry carries none, so confirm them against the worker's
 * argument parsing before relying on them.
 *
 * @param workerPort port on which the worker will listen
 * @param masterName name of the master node, passed to the worker
 * @return the full command, one array position per argument
 * @throws InitNodeException if the number of filled positions does not match the array length
 */
private String[] usePersistentWorker(int workerPort, String masterName) throws InitNodeException {
    final String workingDir = this.nw.getWorkingDir();
    final String installDir = this.nw.getInstallDir();

    // Worker-defined paths go first, followed by the ones defined in the environment
    final String workerClasspath = mergePathLists(this.nw.getClasspath(), CLASSPATH_FROM_ENVIRONMENT);
    final String workerPythonpath = mergePathLists(this.nw.getPythonpath(), PYTHONPATH_FROM_ENVIRONMENT);
    final String workerLibPath = mergePathLists(this.nw.getLibPath(), LIBPATH_FROM_ENVIRONMENT);

    // Get JVM Flags (comma-separated)
    String workerJVMflags = System.getProperty(COMPSsConstants.WORKER_JVM_OPTS);
    String[] jvmFlags = new String[0];
    if (workerJVMflags != null && !workerJVMflags.isEmpty()) {
        jvmFlags = workerJVMflags.split(",");
    }

    // Get FPGA reprogram args (space-separated)
    String workerFPGAargs = System.getProperty(COMPSsConstants.WORKER_FPGA_REPROGRAM);
    String[] fpgaArgs = new String[0];
    if (workerFPGAargs != null && !workerFPGAargs.isEmpty()) {
        fpgaArgs = workerFPGAargs.split(" ");
    }

    // Configure worker debug level
    final String workerDebug = Boolean.toString(LogManager.getLogger(Loggers.WORKER).isDebugEnabled());

    // Every property below falls back to its default when it is unset, empty or the literal "null"
    final String storageConf = systemPropertyOrDefault(COMPSsConstants.STORAGE_CONF, "null");
    final String executionType = systemPropertyOrDefault(COMPSsConstants.TASK_EXECUTION,
        COMPSsConstants.TaskExecution.COMPSS.toString());
    final String workerPersistentC =
        systemPropertyOrDefault(COMPSsConstants.WORKER_PERSISTENT_C, COMPSsConstants.DEFAULT_PERSISTENT_C);
    final String pythonInterpreter =
        systemPropertyOrDefault(COMPSsConstants.PYTHON_INTERPRETER, COMPSsConstants.DEFAULT_PYTHON_INTERPRETER);
    final String pythonVersion =
        systemPropertyOrDefault(COMPSsConstants.PYTHON_VERSION, COMPSsConstants.DEFAULT_PYTHON_VERSION);
    final String pythonVirtualEnvironment = systemPropertyOrDefault(COMPSsConstants.PYTHON_VIRTUAL_ENVIRONMENT,
        COMPSsConstants.DEFAULT_PYTHON_VIRTUAL_ENVIRONMENT);
    final String pythonPropagateVirtualEnvironment =
        systemPropertyOrDefault(COMPSsConstants.PYTHON_PROPAGATE_VIRTUAL_ENVIRONMENT,
            COMPSsConstants.DEFAULT_PYTHON_PROPAGATE_VIRTUAL_ENVIRONMENT);
    final String pythonMpiWorker =
        systemPropertyOrDefault(COMPSsConstants.PYTHON_MPI_WORKER, COMPSsConstants.DEFAULT_PYTHON_MPI_WORKER);

    /*
     * ************************************************************************************************************
     * BUILD COMMAND
     * ************************************************************************************************************
     */
    // The extra "+ 1" is the slot that holds the number of FPGA arguments
    String[] cmd = new String[NIOAdaptor.NUM_PARAMS_PER_WORKER_SH + NIOAdaptor.NUM_PARAMS_NIO_WORKER
        + jvmFlags.length + 1 + fpgaArgs.length];

    /* SCRIPT ************************************************ */
    cmd[0] = installDir + (installDir.endsWith(File.separator) ? "" : File.separator) + STARTER_SCRIPT_PATH
        + STARTER_SCRIPT_NAME; // 0 script directory

    /* Values ONLY for persistent_worker.sh ****************** */
    cmd[1] = workerLibPath.isEmpty() ? "null" : workerLibPath; // 1
    cmd[2] = DEFAULT_CONTAINER_APP_DIR;
    cmd[3] = workerClasspath.isEmpty() ? "null" : workerClasspath; // 3
    cmd[4] = Comm.getStreamingBackend().name();
    cmd[5] = String.valueOf(jvmFlags.length);

    int nextPosition = NIOAdaptor.NUM_PARAMS_PER_WORKER_SH;
    for (String jvmFlag : jvmFlags) {
        cmd[nextPosition++] = jvmFlag;
    }

    cmd[nextPosition++] = String.valueOf(fpgaArgs.length);
    for (String fpgaArg : fpgaArgs) {
        cmd[nextPosition++] = fpgaArg;
    }

    /* Values for NIOWorker ********************************** */
    cmd[nextPosition++] = workerDebug; // 0

    // Internal parameters
    cmd[nextPosition++] = String.valueOf(NIOAdaptor.MAX_SEND_WORKER); // 1
    cmd[nextPosition++] = String.valueOf(NIOAdaptor.MAX_RECEIVE_WORKER); // 2
    cmd[nextPosition++] = this.nw.getName(); // 3
    cmd[nextPosition++] = String.valueOf(workerPort); // 4
    cmd[nextPosition++] = masterName; // 5
    cmd[nextPosition++] = String.valueOf(NIOAdaptor.MASTER_PORT); // 6
    cmd[nextPosition++] = String.valueOf(Comm.getStreamingPort());

    // Worker parameters
    cmd[nextPosition++] = String.valueOf(this.nw.getTotalComputingUnits()); // 7
    cmd[nextPosition++] = String.valueOf(this.nw.getTotalGPUs()); // 8
    cmd[nextPosition++] = String.valueOf(this.nw.getTotalFPGAs()); // 9

    // Use the worker's own cpu_affinity property when it is defined; otherwise use the global value.
    // (Both branches used to write the global value, so the property was silently ignored.)
    String cpuAffinity = this.nw.getConfiguration().getProperty("cpu_affinity"); // 10
    cmd[nextPosition++] = (cpuAffinity != null) ? cpuAffinity : String.valueOf(CPU_AFFINITY);
    cmd[nextPosition++] = String.valueOf(GPU_AFFINITY); // 11
    cmd[nextPosition++] = String.valueOf(FPGA_AFFINITY); // 12
    cmd[nextPosition++] = String.valueOf(this.nw.getLimitOfTasks()); // 13

    // Application parameters
    cmd[nextPosition++] = DEPLOYMENT_ID; // 14
    cmd[nextPosition++] = System.getProperty(COMPSsConstants.LANG); // 15
    cmd[nextPosition++] = workingDir; // 16
    cmd[nextPosition++] = installDir; // 17
    cmd[nextPosition++] = cmd[2]; // 18 (same application directory given to the script)
    cmd[nextPosition++] = workerLibPath.isEmpty() ? "null" : workerLibPath; // 19
    cmd[nextPosition++] = workerClasspath.isEmpty() ? "null" : workerClasspath; // 20
    cmd[nextPosition++] = workerPythonpath.isEmpty() ? "null" : workerPythonpath; // 21

    // Tracing parameters
    cmd[nextPosition++] = String.valueOf(NIOTracer.getLevel()); // 22
    cmd[nextPosition++] = NIOTracer.getExtraeFile(); // 23
    if (Tracer.extraeEnabled()) {
        // NumSlots per host is ignored --> 0
        Integer hostId = NIOTracer.registerHost(this.nw.getName(), 0);
        cmd[nextPosition++] = hostId.toString(); // 24
    } else {
        cmd[nextPosition++] = "NoTracinghostID"; // 24
    }

    // Storage parameters
    cmd[nextPosition++] = storageConf; // 25
    cmd[nextPosition++] = executionType; // 26

    // persistent_c parameter
    cmd[nextPosition++] = workerPersistentC; // 27

    // Python interpreter parameter
    cmd[nextPosition++] = pythonInterpreter; // 28
    // Python interpreter version
    cmd[nextPosition++] = pythonVersion; // 29
    // Python virtual environment parameter
    cmd[nextPosition++] = pythonVirtualEnvironment; // 30
    // Python propagate virtual environment parameter
    cmd[nextPosition++] = pythonPropagateVirtualEnvironment; // 31
    // Python use MPI worker parameter
    cmd[nextPosition++] = pythonMpiWorker; // 32

    // Sanity check: every slot of the pre-sized array must have been filled
    if (cmd.length != nextPosition) {
        throw new InitNodeException(
            "ERROR: Incorrect number of parameters. Expected: " + cmd.length + ". Got: " + nextPosition);
    }

    return cmd;
}

/**
 * Joins a worker-defined path list with the one taken from the environment.
 *
 * @param fromFile path list defined for the worker
 * @param fromEnvironment path list defined in the environment
 * @return {@code fromFile + LIB_SEPARATOR + fromEnvironment} when both are non-empty; otherwise
 *         whichever one is non-empty ({@code fromEnvironment} when {@code fromFile} is empty)
 */
private String mergePathLists(String fromFile, String fromEnvironment) {
    if (fromFile.isEmpty()) {
        return fromEnvironment;
    }
    if (fromEnvironment.isEmpty()) {
        return fromFile;
    }
    return fromFile + LIB_SEPARATOR + fromEnvironment;
}

/**
 * Reads a system property, replacing missing values by a default.
 *
 * @param key name of the system property
 * @param fallback value returned when the property is unset, empty or the literal "null"
 * @return the property value, or {@code fallback}
 */
private String systemPropertyOrDefault(String key, String fallback) {
    String value = System.getProperty(key);
    if (value == null || value.isEmpty() || value.equals("null")) {
        return fallback;
    }
    return value;
}
/**
 * Selects how the worker is started: through the persistent worker command when tracing is
 * activated, and through the plain container command otherwise.
 *
 * @param workerPort port on which the worker will listen
 * @param masterName name of the master node
 * @return the start command, one array position per argument
 * @throws InitNodeException if the selected command cannot be built
 */
@Override
protected String[] getStartCommand(int workerPort, String masterName) throws InitNodeException {
    return Tracer.isActivated()
        ? usePersistentWorker(workerPort, masterName)
        : useContainer(workerPort, masterName);
}
private boolean isSameNetwork(String[] ip1, String[] ip2, int amount) {
if (amount == 0) {
return true;
......
......@@ -60,6 +60,8 @@ usage() {
container directories. The format is: <host dir>:<cont. dir>,...
This will also work with volumes.
--env Environmental variable to set inside the container.
-- After this flag, the input will not be parsed and it will be
considered as the command with which to initialize the
container.
......@@ -96,6 +98,7 @@ FAIL_IF_PULL=false
SSH_USER=`whoami`
REUSE_EXISTING=false
CHECK_IMAGE=true
ENV_VARS=""
while [ "$1" != "" ]; do
case $1 in
-h | --help)
......@@ -134,6 +137,12 @@ while [ "$1" != "" ]; do
--range)
PORT_RANGE="$2"
shift;;
--env)
    # Collect repeated --env flags into one comma-separated list; the comma is
    # only inserted when a previous value is already stored.
    ENV_VARS="${ENV_VARS:+${ENV_VARS},}$2"
    shift;;
--)
shift
break;;
......@@ -226,4 +235,4 @@ if [ -n "${PORT_RANGE}" ]; then
fi
fi
ssh -o BatchMode=yes -o StrictHostKeyChecking=no ${SSH_USER}@${WORKER_ADDRESS} "/bin/sh -s" < ${COMPSS_HOME:-/opt/COMPSs}/Runtime/scripts/system/adaptors/nio/docker/docker_worker.sh "${IMAGE_NAME}" "${IMAGE_ID-null}" "${CONTAINER_NAME}" "${PULL_IMAGE}" "${CONTAINER_PORTS:-null}" "${PORT_RANGE:-null}" "${CONTAINER_VOLUMES:-null}" "${REPOSITORY:-null}" "${REUSE_EXISTING}" "${FAIL_IF_PULL}" "${LAUNCH_COMMAND}"
\ No newline at end of file
ssh -o BatchMode=yes -o StrictHostKeyChecking=no ${SSH_USER}@${WORKER_ADDRESS} "/bin/sh -s" < ${COMPSS_HOME:-/opt/COMPSs}/Runtime/scripts/system/adaptors/nio/docker/docker_worker.sh "${IMAGE_NAME}" "${IMAGE_ID-null}" "${CONTAINER_NAME}" "${PULL_IMAGE}" "${CONTAINER_PORTS:-null}" "${PORT_RANGE:-null}" "${CONTAINER_VOLUMES:-null}" "${REPOSITORY:-null}" "${REUSE_EXISTING}" "${FAIL_IF_PULL}" "${ENV_VARS:-null}" "${LAUNCH_COMMAND}"
\ No newline at end of file
......@@ -25,8 +25,9 @@ CONTAINER_VOLUMES="$7"
REPOSITORY="$8"
REUSE_EXISTING="$9"
FAIL_IF_PULL="${10}"
ENV_VARS="${11}"
shift 10
shift 11
LAUNCH_COMMAND=""
for ARG in "$@"
do
......@@ -70,6 +71,18 @@ else
CONTAINER_VOLUMES="-v `echo ${CONTAINER_VOLUMES} | sed -r 's/,/ -v /g'`"
fi
# Turn the comma-separated variable list received as argument into "-e <var>"
# options for docker run. The launcher sends the literal "null" when no --env
# flag was given, so that value is treated like an empty list.
if [ -z "${ENV_VARS}" ] || [ "${ENV_VARS}" = "null" ]; then
    unset ENV_VARS
else
    L_ENV_VARS=""
    for VAR in `echo ${ENV_VARS} | sed -r 's/,/ /g'`; do
        if [ -n "`echo ${VAR} | cut -d"=" -f1`" -a -n "`echo ${VAR} | cut -d"=" -f2`" ]; then
            # Append to the options gathered so far; assigning without the
            # previous content kept only the last variable.
            L_ENV_VARS="${L_ENV_VARS}-e ${VAR} "
        fi
    done
    ENV_VARS="${L_ENV_VARS}"
fi
IMAGE_LIST=$(${DOCKER} images --format "{{.Repository}}:{{.Tag}}" | grep "${IMAGE_NAME}")
if [ "$PULL_IMAGE" = "true" ]; then
if [ "$IMAGE_ID" != "null" -a -z "$(${DOCKER} images --format "{{.ID}}" | grep ${IMAGE_ID})" ] || [ -z "${IMAGE_LIST}" ]; then
......@@ -198,7 +211,7 @@ fi
# }
# }" \
# http://v`${DOCKER} version --format "{{.Server.APIVersion}}"`/containers/create?name=${CONTAINER_NAME} > /dev/null
${DOCKER} run -d -t `echo ${CONTAINER_PORTS} | sed 's/,/ /g' | xargs printf -- "-p %s"` ${CONTAINER_VOLUMES} --name ${CONTAINER_NAME} ${IMAGE_NAME} /bin/sh -c "${LAUNCH_COMMAND}" > /dev/null &
${DOCKER} run -d -t `echo ${CONTAINER_PORTS} | sed 's/,/ /g' | xargs printf -- "-p %s"` ${CONTAINER_VOLUMES} ${ENV_VARS} --name ${CONTAINER_NAME} ${IMAGE_NAME} /bin/sh -c "${LAUNCH_COMMAND}" > /dev/null &
#${DOCKER} exec -t -d ${CONTAINER_NAME} /bin/sh -c "${LAUNCH_COMMAND}"
CREATION_FAILED=$?
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment