Commit 7173ca82 authored by Unai Perez's avatar Unai Perez
Browse files

First working version of DiscoveryThread

parent 1af4e70a
......@@ -61,6 +61,8 @@ public final class Loggers {
public static final String CONNECTORS = IT + ".Connectors";
public static final String CONNECTORS_UTILS = IT + ".ConnectorsUtils";
public static final String DISCOVERY = IT + ".Discovery";
// Worker
public static final String WORKER = IT + ".Worker";
public static final String WORKER_EXEC_MANAGER = WORKER + ".ExecManager";
......
......@@ -530,7 +530,7 @@ public class COMPSsRuntimeImpl implements COMPSsRuntime, LoaderAPI, FatalErrorHa
}
if (DiscoveryThread.isEnabled()) {
LOGGER.debug("Stopping Discovery Thread...");
ResourceLoader.discoveryThread.cease();
DiscoveryThread.getInstance().stop();
}
// Stop runtime components
......
package es.bsc.compss.util;
//import es.bsc.compss.agent.rest.types.NIOAdaptorResource;
//import es.bsc.compss.agent.rest.types.RESTResource;;
import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.exceptions.ConstructConfigurationException;
//import es.bsc.compss.nio.NIOAgent;
//import es.bsc.compss.nio.master.NIOAdaptor;
//import es.bsc.compss.nio.master.NIOWorkerNode;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.types.project.ProjectFile;
import es.bsc.compss.types.resources.DynamicMethodWorker;
import es.bsc.compss.types.resources.MethodResourceDescription;
import es.bsc.compss.types.resources.MethodWorker;
import es.bsc.compss.types.resources.ResourcesFile;
import es.bsc.compss.types.resources.Worker;
import es.bsc.compss.types.resources.WorkerResourceDescription;
import es.bsc.compss.types.resources.configuration.MethodConfiguration;
import es.bsc.compss.types.resources.jaxb.ComputeNodeType;
import es.bsc.compss.types.resources.jaxb.ResourcesNIOAdaptorProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class DiscoveryThread extends Thread {
public class DiscoveryThread {
private ProjectFile project;
private ResourcesFile resources;
private Map<String, es.bsc.compss.types.project.jaxb.ComputeNodeType> projectMap;
private Map<String, ComputeNodeType> resourcesMap;
private Set<String> resourceAttempts = new HashSet<>();
private Map<String, Thread> pendingResources;
private Boolean cease = false;
private final Logger LOGGER = LogManager.getLogger(Loggers.DISCOVERY);
private static boolean enabled = false;
private static DiscoveryThread INSTANCE = null;
public DiscoveryThread(ProjectFile project, ResourcesFile resources) {
this.project = project;
this.resources = resources;
public static DiscoveryThread start(ProjectFile project, ResourcesFile resources) {
return Optional.ofNullable(INSTANCE).orElse(new DiscoveryThread(project, resources));
}
private DiscoveryThread(ProjectFile project, ResourcesFile resources) {
this.projectMap = new HashMap<>();
this.resourcesMap = new HashMap<>();
this.pendingResources = new HashMap<>();
INSTANCE = this;
load(project, resources);
// Runtime.getRuntime().addShutdownHook(new Thread(this::cease));
}
private void load(ProjectFile project, ResourcesFile resources) {
List<Object> resourceList = resources.getResources().getSharedDiskOrDataNodeOrComputeNode();
for (Object o : resourceList) {
if (o instanceof ComputeNodeType) {
ComputeNodeType cnResource = (ComputeNodeType) o;
List<es.bsc.compss.types.project.jaxb.ComputeNodeType> projectList = project.getComputeNodes_list();
Optional<es.bsc.compss.types.project.jaxb.ComputeNodeType> cnProject =
projectList.stream().filter(o2 -> cnResource.getName().equals(o2.getName())).findFirst();
if (cnProject.isPresent()) {
resourcesMap.put(cnResource.getName(), cnResource);
projectMap.put(cnResource.getName(), cnProject.get());
// LOGGER.debug("Resource {} added to discoverable nodes", cnResource.getName());
System.out.println("Resource " + cnResource.getName() + " added to discoverable nodes");
} else {
// LOGGER.debug("Resource {} not found in project file", cnResource.getName());
System.out.println("Resource " + cnResource.getName() + " not found in project file");
}
}
}
}
DiscoveryThread.enabled = true;
public void stop() {
this.pendingResources.forEach((k, v) -> v.interrupt());
}
Runtime.getRuntime().addShutdownHook(new Thread(this::cease));
public static DiscoveryThread getInstance() {
return INSTANCE;
}
public static boolean isEnabled() {
return enabled;
return INSTANCE != null;
}
public void cease() {
System.out.println("Ceasing discovery");
synchronized (this) {
this.cease = true;
enabled = false;
private void tryToAddResource(String nodeName) {
try {
// LOGGER.debug("Trying to add dynamic discoverable resource {}", nodeName);
System.out.println("Trying to add dynamic discoverable resource " + nodeName);
ComputeNodeType cnResource = resourcesMap.get(nodeName);
es.bsc.compss.types.project.jaxb.ComputeNodeType cnProject = projectMap.get(nodeName);
MethodWorker worker = ResourceLoader.createMethodWorker(cnProject, cnResource);
MethodResourceDescription mrd = worker.getDescription();
final String adaptor = System.getProperty(COMPSsConstants.COMM_ADAPTOR);
MethodConfiguration config = ResourceLoader.createConfiguration(cnProject, cnResource, mrd, adaptor);
DynamicMethodWorker w = new DynamicMethodWorker(worker.getName(), mrd,
Comm.getAdaptor(adaptor).initWorker(config), worker.getMaxCPUTaskCount(), worker.getMaxGPUTaskCount(),
worker.getMaxFPGATaskCount(), worker.getMaxOthersTaskCount(), new HashMap<>());
ResourceManager.addDynamicWorker(w, mrd);
// LOGGER.debug("{} added as dynamic worker", w.getName());
System.out.println(w.getName() + " added as dynamic worker");
} catch (Exception e) {
e.printStackTrace();
System.out.println("This exception was caught by the DiscoveryThread");
}
}
@Override
public void run() {
System.out.println("Starting discovery");
while (!this.cease) {
/*
* @Override public void run() { System.out.println("Starting discovery"); while (!this.cease) { try {
* Iterator<String> nodeIterator = discoverableNodes.iterator(); while (nodeIterator.hasNext()) { String nodeName =
* nodeIterator.next(); try { ComputeNodeType cnResource = resourcesMap.get(nodeName);
* es.bsc.compss.types.project.jaxb.ComputeNodeType cnProject = projectMap.get(nodeName); MethodWorker worker =
* ResourceLoader.createMethodWorker(cnProject, cnResource); MethodResourceDescription mrd =
* worker.getDescription(); final String adaptor = System.getProperty(COMPSsConstants.COMM_ADAPTOR);
* MethodConfiguration config = ResourceLoader.createConfiguration(cnProject, cnResource, mrd, adaptor);
* DynamicMethodWorker w = new DynamicMethodWorker(worker.getName(), mrd,
* Comm.getAdaptor(adaptor).initWorker(config), worker.getMaxCPUTaskCount(), worker.getMaxGPUTaskCount(),
* worker.getMaxFPGATaskCount(), worker.getMaxOthersTaskCount(), new HashMap<>()); synchronized (this) { if
* (!this.cease) ResourceManager.addDynamicWorker(w, mrd); } // LOGGER.debug("{} added as dynamic worker",
* w.getName()); System.out.println(w.getName() + " added as dynamic worker"); } catch (Exception e) {
* e.printStackTrace(); } // resourceAttempts.add(cnResource.getName()); } // LOGGER.debug("Waiting 15 seconds...");
* System.out.println("Waiting 15 seconds..."); Thread.sleep(15000); } catch (InterruptedException e) {
* e.printStackTrace(); } } }
*/
public void addDiscoverableNode(String name) {
// this.discoverableNodes.add(name);
// LOGGER.debug("Added discoverable node {}", name);
int delay = 15000;
System.out.println("Added discoverable node " + name);
setTimeout(() -> tryToAddResource(name), name, delay);
}
private void setTimeout(Runnable runnable, String name, long delay) {
Thread timeout = new Thread(() -> {
try {
List<Object> resourceList = resources.getResources().getSharedDiskOrDataNodeOrComputeNode();
for (Object o : resourceList) {
if (o instanceof ComputeNodeType) {
ComputeNodeType cnResource = (ComputeNodeType) o;
if (ResourceManager.getAllWorkers().stream().noneMatch(w -> w.getName().equals(cnResource.getName()))
&& !resourceAttempts.contains(cnResource.getName())) {
List<es.bsc.compss.types.project.jaxb.ComputeNodeType> projectList = project.getComputeNodes_list();
Optional<es.bsc.compss.types.project.jaxb.ComputeNodeType> cnProject = projectList.stream()
.filter(o2 -> cnResource.getName().equals(o2.getName()))
.findFirst();
if (cnProject.isPresent()) {
try {
MethodWorker worker = ResourceLoader.createMethodWorker(cnProject.get(), cnResource);
MethodResourceDescription mrd = worker.getDescription();
final String adaptor = System.getProperty(COMPSsConstants.COMM_ADAPTOR);
MethodConfiguration config = ResourceLoader.createConfiguration(cnProject.get(), cnResource, mrd, adaptor);
DynamicMethodWorker w = new DynamicMethodWorker(worker.getName(),
mrd,
Comm.getAdaptor(adaptor).initWorker(config),
worker.getMaxCPUTaskCount(),
worker.getMaxGPUTaskCount(),
worker.getMaxFPGATaskCount(),
worker.getMaxOthersTaskCount(),
new HashMap<>());
synchronized (this) {
if (!this.cease) ResourceManager.addDynamicWorker(w, mrd);
}
System.out.println(w.getName() + " added as dynamic worker");
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Resource not found in project file");
}
resourceAttempts.add(cnResource.getName());
} else {
System.out.println(String.format("Worker %s in WorkerPool: %b. Worker in Set: %b",
cnResource.getName(),
ResourceManager.getAllWorkers().stream().anyMatch(w -> w.getName().equals(cnResource.getName())),
resourceAttempts.contains(cnResource.getName())));
}
}
}
System.out.println("Waiting 5 seconds...");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.sleep(delay);
runnable.run();
} catch (Exception e) {
// Do nothing
} finally {
pendingResources.remove(name);
}
}
});
pendingResources.put(name, timeout);
timeout.start();
}
public void threadDiscovery() throws Exception {
......@@ -165,7 +193,7 @@ public class DiscoveryThread extends Thread {
* @param networkDevices when networkDevices
*/
// TODO: Other exception class?
public static void discover(List<DiscoverNetworkDevice> networkDevices) throws Exception {
public void discover(List<DiscoverNetworkDevice> networkDevices) throws Exception {
List<DiscoverNetworkDevice> discoveredDevices =
networkDevices.parallelStream().filter(DiscoverNetworkDevice::ddiscover).collect(Collectors.toList());
for (DiscoverNetworkDevice device : discoveredDevices) {
......@@ -174,7 +202,7 @@ public class DiscoveryThread extends Thread {
}
public static class DiscoverNetworkDevice {
private static class DiscoverNetworkDevice {
private String hostIp;
private String hostName;
......@@ -216,7 +244,7 @@ public class DiscoveryThread extends Thread {
* @throws Exception could not create a configuration to start using this resource
*/
// TODO: Choose another Exception class
public static void addResources(String workerName) throws Exception {
public void addResources(String workerName) throws Exception {
MethodResourceDescription description = new MethodResourceDescription();
DynamicMethodWorker worker = ResourceManager.getDynamicResource(workerName);
......@@ -225,7 +253,7 @@ public class DiscoveryThread extends Thread {
} else {
String adaptor = "es.bsc.compss.nio.master.NIOAdaptor";
Map<String, Object> projectConf = new HashMap<>();
// projectConf.put("Properties", null);
// projectMap.put("Properties", null);
Map<String, Object> resourcesConf = new HashMap<>();
......@@ -234,7 +262,7 @@ public class DiscoveryThread extends Thread {
resourceProperties.setMaxPort(43002);
resourcesConf.put("Ports", resourceProperties);
// registerWorker(workerName, description, adaptor, projectConf, resourcesConf);
// registerWorker(workerName, description, adaptor, projectMap, resourcesMap);
}
}
......@@ -243,7 +271,7 @@ public class DiscoveryThread extends Thread {
* es.bsc.compss.types.project.jaxb.ComputeNodeType projComputeNode) throws Exception { if (mrd == null) { mrd = new
* MethodResourceDescription(); }
*
* MethodConfiguration mc = (MethodConfiguration) Comm.constructConfiguration(adaptor, projectConf, resourcesConf);
* MethodConfiguration mc = (MethodConfiguration) Comm.constructConfiguration(adaptor, projectMap, resourcesMap);
*
* mc.setLimitOfTasks(Math.min(mc.getLimitOfTasks(), mrd.getTotalCPUComputingUnits()));
* mc.setTotalGPUComputingUnits(Math.min(mc.getTotalGPUComputingUnits(), mrd.getTotalGPUComputingUnits()));
......@@ -277,7 +305,7 @@ public class DiscoveryThread extends Thread {
* @param reduction description of the resources to stop using.
* @throws Exception the worker was not set up for the agent.
*/
public static void removeResources(String workerName, MethodResourceDescription reduction) throws Exception {
public void removeResources(String workerName, MethodResourceDescription reduction) throws Exception {
DynamicMethodWorker worker = ResourceManager.getDynamicResource(workerName);
if (worker != null) {
ResourceManager.requestWorkerReduction(worker, reduction);
......
......@@ -88,8 +88,6 @@ public class ResourceLoader {
private static ResourcesFile resources;
private static ProjectFile project;
public static DiscoveryThread discoveryThread;
// Logger
private static final Logger LOGGER = LogManager.getLogger(Loggers.RM_COMP);
......@@ -111,9 +109,8 @@ public class ResourceLoader {
init(resourcesXML, resourcesXSD, projectXML, projectXSD);
if ((ResourceLoader.resources_XML != null) && (ResourceLoader.project_XML != null)) {
loadFiles();
discoveryThread = new DiscoveryThread(ResourceLoader.project, ResourceLoader.resources);
DiscoveryThread.start(ResourceLoader.project, ResourceLoader.resources);
loadRuntime();
discoveryThread.start();
} else {
LOGGER.warn("No resource/project file detected. Starting runtime without computing resources.");
}
......
......@@ -282,6 +282,7 @@ public class ResourceManager {
for (int coreId = 0; coreId < maxTaskCount.length; ++coreId) {
poolCoreMaxConcurrentTasks[coreId] -= maxTaskCount[coreId];
}
DiscoveryThread.getInstance().addDiscoverableNode(r.getName());
}
/**
......
......@@ -154,7 +154,8 @@ public class StartWorkerAction<T extends WorkerResourceDescription> extends Allo
rd.reduce(rd);
// Update the CE and Implementations that can run (none)
this.worker.getResource().updatedFeatures();
this.ts.updateWorker(this.worker.getResource(), new PendingReduction<>(this.worker.getResource().getDescription()));
this.ts.updateWorker(this.worker.getResource(),
new PendingReduction<>(this.worker.getResource().getDescription()));
}
@Override
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment