Commit 2460555c authored by vgonzale's avatar vgonzale
Browse files

Merge branch 'ppc/container-merge-fix' into ppc/nuvla-rancher-integration

parents 778b305d 9851add4
......@@ -34,9 +34,9 @@
<properties>
<!-- BSC Components versions -->
<cepbatools-extrae.version>1.4</cepbatools-extrae.version>
<comm.version>1.6-16</comm.version>
<!--<conn.version>1.0-U</conn.version>-->
<conn.version>1.5-34</conn.version>
<comm.version>1.6-20</comm.version>
<!--<conn.version>1.5-35</conn.version>-->
<conn.version>1.0-U</conn.version>
<!-- ZK version -->
<zk.version>8.5.0</zk.version>
......
......@@ -24,6 +24,7 @@ import es.bsc.compss.types.resources.configuration.Configuration;
import es.bsc.compss.types.resources.jaxb.ResourcesExternalAdaptorProperties;
import es.bsc.compss.types.resources.jaxb.ResourcesPropertyAdaptorType;
import es.bsc.compss.types.uri.MultiURI;
import es.bsc.conn.types.StarterCommand;
import java.util.LinkedList;
import java.util.List;
......@@ -60,6 +61,11 @@ public class Adaptor implements CommAdaptor {
return new RemoteRESTAgent(ac);
}
@Override
public COMPSsWorker initWorker(Configuration config, String name, int port) {
return this.initWorker(config);
}
@Override
public void stop() {
// You cannot stop a remote agent.
......@@ -81,4 +87,11 @@ public class Adaptor implements CommAdaptor {
// You can't do that
}
@Override
public StarterCommand getStarterCommand(String workerName, int workerPort, String masterName, String workingDir,
String installDir, String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile,
int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks, String hostId, boolean container) {
return null;
}
}
......@@ -45,6 +45,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>es.bsc.conn</groupId>
<artifactId>commons-conn</artifactId>
<version>${conn.version}</version>
</dependency>
<!-- jUnit -->
<dependency>
<groupId>junit</groupId>
......
......@@ -35,6 +35,7 @@ import es.bsc.compss.util.Classpath;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.TraceEvent;
import es.bsc.compss.util.Tracer;
import es.bsc.conn.types.StarterCommand;
import es.bsc.distrostreamlib.client.DistroStreamClient;
import es.bsc.distrostreamlib.exceptions.DistroStreamClientInitException;
import es.bsc.distrostreamlib.requests.StopRequest;
......@@ -657,4 +658,37 @@ public class Comm {
}
}
/**
* Creates the WorkerStarterCommand.
*
* @param adaptorName name of the adaptor to be used
* @param workerName worker name
* @param workerPort worker Port number
* @param masterName master name
* @param workingDir worker working directory
* @param installDir worker COMPSs install directory
* @param appDir worker application install directory
* @param classpathFromFile worker classpath in projects.xml file
* @param pythonpathFromFile worker python path in projects.xml file
* @param libPathFromFile worker library path path in project.xml file
* @param totalCPU total CPU computing units
* @param totalGPU total GPU
* @param totalFPGA total FPGA
* @param limitOfTasks limit of tasks
* @param hostId tracing worker identifier
* @param container defines whether the worker will be run in a container
* @return WorkerStarterCommand
*/
public static StarterCommand getStarterCommand(String adaptorName, String workerName, int workerPort,
String masterName, String workingDir, String installDir, String appDir, String classpathFromFile,
String pythonpathFromFile, String libPathFromFile, int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks,
String hostId, boolean container) {
CommAdaptor adaptor = ADAPTORS.get(adaptorName);
return adaptor.getStarterCommand(workerName, workerPort, masterName, workingDir, installDir, appDir,
classpathFromFile, pythonpathFromFile, libPathFromFile, totalCPU, totalGPU, totalFPGA, limitOfTasks, hostId,
container);
}
}
......@@ -21,6 +21,7 @@ import es.bsc.compss.types.COMPSsWorker;
import es.bsc.compss.types.data.operation.DataOperation;
import es.bsc.compss.types.resources.configuration.Configuration;
import es.bsc.compss.types.uri.MultiURI;
import es.bsc.conn.types.StarterCommand;
import java.util.List;
import java.util.Map;
......@@ -55,6 +56,16 @@ public interface CommAdaptor {
*/
public COMPSsWorker initWorker(Configuration config);
/**
* Initializes a worker through an adaptor.
*
* @param config Adaptor configuration.
* @param name Name/IP of the worker
* @param port Port from which the worker listens
* @return A COMPSsWorker object representing the initialized worker.
*/
public COMPSsWorker initWorker(Configuration config, String name, int port);
/**
* Stops the Communication Adaptor.
*/
......@@ -79,4 +90,8 @@ public interface CommAdaptor {
*/
public void stopSubmittedJobs();
public StarterCommand getStarterCommand(String workerName, int workerPort, String masterName, String workingDir,
String installDir, String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile,
int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks, String hostId, boolean container);
}
package es.bsc.compss.types;
import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.types.execution.ThreadBinder;
import es.bsc.conn.types.StarterCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class WorkerStarterCommand implements StarterCommand {
// Logger
protected static final Logger LOGGER = LogManager.getLogger(Loggers.COMM);
// Static Environment variables
protected static final String LIB_SEPARATOR = ":";
protected static final String CLASSPATH_FROM_ENVIRONMENT = (System.getProperty(COMPSsConstants.WORKER_CP) != null
&& !System.getProperty(COMPSsConstants.WORKER_CP).isEmpty()) ? System.getProperty(COMPSsConstants.WORKER_CP)
: "";
protected static final String PYTHONPATH_FROM_ENVIRONMENT = (System.getProperty(COMPSsConstants.WORKER_PP) != null
&& !System.getProperty(COMPSsConstants.WORKER_PP).isEmpty()) ? System.getProperty(COMPSsConstants.WORKER_PP)
: "";
protected static final String LIBPATH_FROM_ENVIRONMENT = (System.getenv(COMPSsConstants.LD_LIBRARY_PATH) != null
&& !System.getenv(COMPSsConstants.LD_LIBRARY_PATH).isEmpty()) ? System.getenv(COMPSsConstants.LD_LIBRARY_PATH)
: "";
protected static final boolean IS_CPU_AFFINITY_DEFINED =
System.getProperty(COMPSsConstants.WORKER_CPU_AFFINITY) != null
&& !System.getProperty(COMPSsConstants.WORKER_CPU_AFFINITY).isEmpty();
protected static final String CPU_AFFINITY =
IS_CPU_AFFINITY_DEFINED ? System.getProperty(COMPSsConstants.WORKER_CPU_AFFINITY)
: ThreadBinder.BINDER_DISABLED;
protected static final boolean IS_GPU_AFFINITY_DEFINED =
System.getProperty(COMPSsConstants.WORKER_GPU_AFFINITY) != null
&& !System.getProperty(COMPSsConstants.WORKER_GPU_AFFINITY).isEmpty();
protected static final String GPU_AFFINITY =
IS_GPU_AFFINITY_DEFINED ? System.getProperty(COMPSsConstants.WORKER_GPU_AFFINITY)
: ThreadBinder.BINDER_DISABLED;
protected static final boolean IS_FPGA_AFFINITY_DEFINED =
System.getProperty(COMPSsConstants.WORKER_FPGA_AFFINITY) != null
&& !System.getProperty(COMPSsConstants.WORKER_FPGA_AFFINITY).isEmpty();
protected static final String FPGA_AFFINITY =
IS_FPGA_AFFINITY_DEFINED ? System.getProperty(COMPSsConstants.WORKER_FPGA_AFFINITY)
: ThreadBinder.BINDER_DISABLED;
protected static final String WORKER_APPDIR_FROM_ENVIRONMENT =
System.getProperty(COMPSsConstants.WORKER_APPDIR) != null
&& !System.getProperty(COMPSsConstants.WORKER_APPDIR).isEmpty()
? System.getProperty(COMPSsConstants.WORKER_APPDIR)
: "";
// Deployment ID
protected static final String DEPLOYMENT_ID = System.getProperty(COMPSsConstants.DEPLOYMENT_ID);
protected String workerName;
protected int workerPort;
protected String masterName;
protected String workingDir;
protected String installDir;
protected String appDir = "";
protected String workerClasspath = "";
protected String workerPythonpath = "";
protected String workerLibPath = "";
protected String[] jvmFlags;
protected String[] fpgaArgs;
protected String workerDebug;
protected String storageConf;
protected String executionType;
protected String workerPersistentC;
protected String pythonInterpreter;
protected String pythonVersion;
protected String pythonVirtualEnvironment;
protected String pythonPropagateVirtualEnvironment;
protected String pythonMpiWorker;
protected int totalCPU;
protected int totalGPU;
protected int totalFPGA;
protected int limitOfTasks;
protected String hostId;
protected String lang;
protected String jarName;
/**
* Creates the WorkerStarterCommand.
*
* @param workerName worker name
* @param workerPort worker Port number
* @param masterName master name
* @param workingDir worker working directory
* @param installDir worker COMPSs install directory
* @param appDir worker application install directory
* @param classpathFromFile worker classpath in projects.xml file
* @param pythonpathFromFile worker python path in projects.xml file
* @param libPathFromFile worker library path path in project.xml file
* @param totalCPU total CPU computing units
* @param totalGPU total GPU
* @param totalFPGA total FPGA
* @param limitOfTasks limit of tasks
* @param hostId tracing worker identifier
*/
public WorkerStarterCommand(String workerName, int workerPort, String masterName, String workingDir,
String installDir, String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile,
int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks, String hostId) {
this.workerName = workerName;
this.workerPort = workerPort;
this.masterName = masterName;
this.workingDir = workingDir;
this.installDir = installDir;
if (!appDir.isEmpty()) {
if (!WORKER_APPDIR_FROM_ENVIRONMENT.isEmpty()) {
LOGGER.warn("Path passed via appdir option and xml AppDir field."
+ "The path provided by the xml will be used");
}
this.appDir = appDir;
} else {
if (!WORKER_APPDIR_FROM_ENVIRONMENT.isEmpty()) {
this.appDir = WORKER_APPDIR_FROM_ENVIRONMENT;
}
}
// Merge command classpath and worker defined classpath
if (!classpathFromFile.isEmpty()) {
if (!CLASSPATH_FROM_ENVIRONMENT.isEmpty()) {
workerClasspath = classpathFromFile + LIB_SEPARATOR + CLASSPATH_FROM_ENVIRONMENT;
} else {
workerClasspath = classpathFromFile;
}
} else {
workerClasspath = CLASSPATH_FROM_ENVIRONMENT;
}
if (!pythonpathFromFile.isEmpty()) {
if (!PYTHONPATH_FROM_ENVIRONMENT.isEmpty()) {
workerPythonpath = pythonpathFromFile + LIB_SEPARATOR + PYTHONPATH_FROM_ENVIRONMENT;
} else {
workerPythonpath = pythonpathFromFile;
}
} else {
workerPythonpath = PYTHONPATH_FROM_ENVIRONMENT;
}
if (!libPathFromFile.isEmpty()) {
if (!LIBPATH_FROM_ENVIRONMENT.isEmpty()) {
workerLibPath = libPathFromFile + LIB_SEPARATOR + LIBPATH_FROM_ENVIRONMENT;
} else {
workerLibPath = libPathFromFile;
}
} else {
workerLibPath = LIBPATH_FROM_ENVIRONMENT;
}
// Get JVM Flags
String workerJVMflags = System.getProperty(COMPSsConstants.WORKER_JVM_OPTS);
jvmFlags = new String[0];
if (workerJVMflags != null && !workerJVMflags.isEmpty()) {
jvmFlags = workerJVMflags.split(",");
}
// Get FPGA reprogram args
String workerFPGAargs = System.getProperty(COMPSsConstants.WORKER_FPGA_REPROGRAM);
fpgaArgs = new String[0];
if (workerFPGAargs != null && !workerFPGAargs.isEmpty()) {
fpgaArgs = workerFPGAargs.split(" ");
}
// Configure worker debug level
workerDebug = Boolean.toString(LogManager.getLogger(Loggers.WORKER).isDebugEnabled());
// Configure storage
storageConf = System.getProperty(COMPSsConstants.STORAGE_CONF);
if (storageConf == null || storageConf.equals("") || storageConf.equals("null")) {
storageConf = "null";
}
executionType = System.getProperty(COMPSsConstants.TASK_EXECUTION);
if (executionType == null || executionType.equals("") || executionType.equals("null")) {
executionType = COMPSsConstants.TaskExecution.COMPSS.toString();
}
// configure persistent_worker_c execution
workerPersistentC = System.getProperty(COMPSsConstants.WORKER_PERSISTENT_C);
if (workerPersistentC == null || workerPersistentC.isEmpty() || workerPersistentC.equals("null")) {
workerPersistentC = COMPSsConstants.DEFAULT_PERSISTENT_C;
}
// Configure python interpreter
pythonInterpreter = System.getProperty(COMPSsConstants.PYTHON_INTERPRETER);
if (pythonInterpreter == null || pythonInterpreter.isEmpty() || pythonInterpreter.equals("null")) {
pythonInterpreter = COMPSsConstants.DEFAULT_PYTHON_INTERPRETER;
}
// Configure python version
pythonVersion = System.getProperty(COMPSsConstants.PYTHON_VERSION);
if (pythonVersion == null || pythonVersion.isEmpty() || pythonVersion.equals("null")) {
pythonVersion = COMPSsConstants.DEFAULT_PYTHON_VERSION;
}
// Configure python virtual environment
pythonVirtualEnvironment = System.getProperty(COMPSsConstants.PYTHON_VIRTUAL_ENVIRONMENT);
if (pythonVirtualEnvironment == null || pythonVirtualEnvironment.isEmpty()
|| pythonVirtualEnvironment.equals("null")) {
pythonVirtualEnvironment = COMPSsConstants.DEFAULT_PYTHON_VIRTUAL_ENVIRONMENT;
}
pythonPropagateVirtualEnvironment = System.getProperty(COMPSsConstants.PYTHON_PROPAGATE_VIRTUAL_ENVIRONMENT);
if (pythonPropagateVirtualEnvironment == null || pythonPropagateVirtualEnvironment.isEmpty()
|| pythonPropagateVirtualEnvironment.equals("null")) {
pythonPropagateVirtualEnvironment = COMPSsConstants.DEFAULT_PYTHON_PROPAGATE_VIRTUAL_ENVIRONMENT;
}
pythonMpiWorker = System.getProperty(COMPSsConstants.PYTHON_MPI_WORKER);
if (pythonMpiWorker == null || pythonMpiWorker.isEmpty() || pythonMpiWorker.equals("null")) {
pythonMpiWorker = COMPSsConstants.DEFAULT_PYTHON_MPI_WORKER;
}
this.lang = System.getProperty(COMPSsConstants.LANG);
this.totalCPU = totalCPU;
this.totalGPU = totalGPU;
this.totalFPGA = totalFPGA;
this.limitOfTasks = limitOfTasks;
this.hostId = hostId;
}
/**
* Generate the command to start the worker.
*
* @return Command as string array
* @throws Exception Error when generating the starter command
*/
public abstract String[] getStartCommand() throws Exception;
public abstract void setScriptName(String scriptName);
@Override
public void setWorkerName(String workerName) {
this.workerName = workerName;
}
@Override
public void setNodeId(String nodeId) {
this.hostId = nodeId;
}
}
......@@ -29,6 +29,7 @@ import es.bsc.compss.types.uri.MultiURI;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.RequestQueue;
import es.bsc.compss.util.ThreadPool;
import es.bsc.conn.types.StarterCommand;
import java.io.File;
import java.net.URISyntaxException;
......@@ -163,6 +164,12 @@ public class GATAdaptor implements CommAdaptor {
return node;
}
// GAT adaptor initializes the worker each time it sends a new job
@Override
public GATWorkerNode initWorker(Configuration config, String name, int port) {
return this.initWorker(config);
}
/**
* Adds the transfer context preferences to the Adaptor.
*
......@@ -232,4 +239,11 @@ public class GATAdaptor implements CommAdaptor {
return transferContext;
}
@Override
public StarterCommand getStarterCommand(String workerName, int workerPort, String masterName, String workingDir,
String installDir, String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile,
int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks, String hostId, boolean container) {
return null;
}
}
......@@ -66,6 +66,7 @@ import es.bsc.compss.types.resources.jaxb.ResourcesExternalAdaptorProperties;
import es.bsc.compss.types.resources.jaxb.ResourcesPropertyAdaptorType;
import es.bsc.compss.types.uri.MultiURI;
import es.bsc.compss.util.ErrorManager;
import es.bsc.conn.types.StarterCommand;
import java.io.File;
import java.util.HashMap;
......@@ -303,6 +304,17 @@ public class NIOAdaptor extends NIOAgent implements CommAdaptor {
return worker;
}
@Override
public COMPSsWorker initWorker(Configuration config, String name, int port) {
NIOConfiguration nioCfg = (NIOConfiguration) config;
LOGGER.debug("Init NIO Worker Node named " + nioCfg.getHost());
NIONode n = new NIONode(name, port);
NIOWorkerNode worker = new NIOWorkerNode(nioCfg, this, n);
NODES.add(worker);
return worker;
}
@Override
public boolean isPersistentCEnabled() {
return this.persistentC;
......@@ -849,4 +861,21 @@ public class NIOAdaptor extends NIOAgent implements CommAdaptor {
}
}
@Override
public StarterCommand getStarterCommand(String workerName, int workerPort, String masterName, String workingDir,
String installDir, String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile,
int totalCPU, int totalGPU, int totalFPGA, int limitOfTasks, String hostId, boolean container) {
/*
* if (container) { return new NIOContainerStarterCommand(workerName, workerPort, masterName, workingDir,
* installDir, appDir, classpathFromFile, pythonpathFromFile, libPathFromFile, totalCPU, totalGPU, totalFPGA,
* limitOfTasks, hostId); } else { return new NIOStarterCommand(workerName, workerPort, masterName, workingDir,
* installDir, appDir, classpathFromFile, pythonpathFromFile, libPathFromFile, totalCPU, totalGPU, totalFPGA,
* limitOfTasks, hostId); }
*/
return new NIOStarterCommand(workerName, workerPort, masterName, workingDir, installDir, appDir,
classpathFromFile, pythonpathFromFile, libPathFromFile, totalCPU, totalGPU, totalFPGA, limitOfTasks, hostId,
container);
}
}
package es.bsc.compss.nio.master;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.nio.NIOTracer;
import es.bsc.compss.types.WorkerStarterCommand;
import java.io.File;
import java.util.Arrays;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NIOStarterCommand extends WorkerStarterCommand {
// Logger
private static final Logger LOGGER = LogManager.getLogger(Loggers.COMM);
private static final String SCRIPT_PATH = "Runtime" + File.separator + "scripts" + File.separator + "system"
+ File.separator + "adaptors" + File.separator + "nio" + File.separator;
private static final String STARTER_SCRIPT_NAME = "persistent_worker.sh";
private String scriptName;
/**
* Creates the WorkerStarterCommand for NIO.
*
* @param workerName worker name
* @param workerPort worker Port number
* @param masterName master name
* @param workingDir worker working directory
* @param installDir worker COMPSs install directory
* @param appDir worker application install directory
* @param classpathFromFile worker classpath in projects.xml file
* @param pythonpathFromFile worker python path in projects.xml file
* @param libPathFromFile worker library path path in project.xml file
* @param totalCPU total CPU computing units
* @param totalGPU total GPU
* @param totalFPGA total FPGA
* @param limitOfTasks limit of tasks
* @param hostId tracing worker identifier
*/
public NIOStarterCommand(String workerName, int workerPort, String masterName, String workingDir, String installDir,
String appDir, String classpathFromFile, String pythonpathFromFile, String libPathFromFile, int totalCPU,
int totalGPU, int totalFPGA, int limitOfTasks, String hostId, boolean isContainer) {
super(workerName, workerPort, masterName, workingDir, installDir, appDir, classpathFromFile, pythonpathFromFile,
libPathFromFile, totalCPU, totalGPU, totalFPGA, limitOfTasks, hostId);
scriptName = installDir + (installDir.endsWith(File.separator) ? "" : File.separator) + SCRIPT_PATH
+ STARTER_SCRIPT_NAME;
if (isContainer) {
this.appDir = "/compss";
if ("python".equals(this.lang.toLowerCase())) {
this.workerPythonpath += LIB_SEPARATOR + appDir;
} else if ("java".equals(this.lang.toLowerCase())) {
String[] paths = CLASSPATH_FROM_ENVIRONMENT.split(LIB_SEPARATOR);
String jarName = paths[1].split("/")[paths[1].split("/").length - 1];
this.workerClasspath += LIB_SEPARATOR + appDir + LIB_SEPARATOR + appDir + "/" + jarName;
}
}
}
@Override
public String[] getStartCommand() throws Exception {
/*
* ************************************************************************************************************
* BUILD COMMAND
* ************************************************************************************************************
*/
String[] cmd = new String[NIOAdaptor.NUM_PARAMS_PER_WORKER_SH + NIOAdaptor.NUM_PARAMS_NIO_WORKER
+ jvmFlags.length + 1 + fpgaArgs.length];
/* SCRIPT ************************************************ */
cmd[0] = scriptName;
/* Values ONLY for persistent_worker.sh ****************** */
cmd[1] = workerLibPath.isEmpty() ? "null" : workerLibPath;
if (appDir.isEmpty()) {
LOGGER.warn("No path passed via appdir option neither xml AppDir field");
cmd[2] = "null";
} else {
cmd[2] = appDir;
}
cmd[3] = workerClasspath.isEmpty() ? "null" : workerClasspath;