Commit 90a9f434 authored by Unai's avatar Unai 🇵🇲

Implemented new MQTT-based communication model between Manager and

Monitor. Useless files removed and project refactored and restructured.
parent cafc6ac8
/.metadata/
**/target
.idea
**/*.iml
**/stubs/
<?xml version="1.0" encoding="UTF-8"?>
<!-- Eclipse classpath description; every entry is flagged maven.pomderived. -->
<classpath>
  <!-- JRE container (JavaSE-1.8). -->
  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <!-- Main sources, compiled into target/classes. -->
  <classpathentry kind="src" output="target/classes" path="src/main/java">
    <attributes>
      <attribute name="optional" value="true"/>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <!-- Test sources, compiled into target/test-classes. -->
  <classpathentry kind="src" output="target/test-classes" path="src/test/java">
    <attributes>
      <attribute name="test" value="true"/>
      <attribute name="optional" value="true"/>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <!-- m2e Maven classpath container. -->
  <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
    <attributes>
      <attribute name="maven.pomderived" value="true"/>
    </attributes>
  </classpathentry>
  <!-- Default output folder. -->
  <classpathentry kind="output" path="target/classes"/>
</classpath>
/target/
**/target
**/.idea
**/*.iml
**/stubs/
<?xml version="1.0" encoding="UTF-8"?>
<!-- Eclipse project description for the "app" project. -->
<projectDescription>
  <name>app</name>
  <comment></comment>
  <projects>
  </projects>
  <!-- Builders listed for the project: JDT Java builder, then the m2e Maven builder. -->
  <buildSpec>
    <buildCommand>
      <name>org.eclipse.jdt.core.javabuilder</name>
      <arguments>
      </arguments>
    </buildCommand>
    <buildCommand>
      <name>org.eclipse.m2e.core.maven2Builder</name>
      <arguments>
      </arguments>
    </buildCommand>
  </buildSpec>
  <!-- Project natures: Maven (m2e) and Java (JDT). -->
  <natures>
    <nature>org.eclipse.m2e.core.maven2Nature</nature>
    <nature>org.eclipse.jdt.core.javanature</nature>
  </natures>
</projectDescription>
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding/<project>=UTF-8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
HOST=0.0.0.0
HOST=localhost
TCPPORT=11034
"""Print traffic counters and an ICMP round-trip time for one network interface.

The interface name is taken from the first command-line argument. Every
metric is written to stdout as a ``name:value`` line, in this order:
rx_MB, rx_packets, rx_lost_packets, tx_MB, tx_packets, tx_lost_packets, rtt_ms.
"""
import os
import re
import sys

# Extra search path appended before psutil is imported.
# NOTE(review): machine-specific Windows location -- confirm it is still needed.
_EXTRA_LIB_DIR = "C:\\Apps\\Python\\Python38\\Lib"
if _EXTRA_LIB_DIR not in sys.path:
    sys.path.append(_EXTRA_LIB_DIR)

import psutil

info = {}
pll_stats = psutil.net_io_counters(pernic=True)
pernic_addr = psutil.net_if_addrs()
iface = sys.argv[1]
ninfo = {}


def _emit(metric):
    """Write one collected metric to stdout as ``metric:value``."""
    print(metric + ':' + str(ninfo[metric]))


# Total/dropped packets and data volume, first received then sent.
nic = pll_stats[iface]
for prefix, byte_count, packet_count, drop_count in (
        ('rx', nic.bytes_recv, nic.packets_recv, nic.dropin),
        ('tx', nic.bytes_sent, nic.packets_sent, nic.dropout)):
    ninfo[prefix + '_MB'] = byte_count / 1000000
    _emit(prefix + '_MB')
    ninfo[prefix + '_packets'] = packet_count
    _emit(prefix + '_packets')
    ninfo[prefix + '_lost_packets'] = drop_count
    _emit(prefix + '_lost_packets')

# RTT to a fixed endpoint via a single ICMP echo request.
# Interface-bound alternative kept for reference:
#   command = "ping -c 1 -w 1 -W 1 -I {} {}".format(iface, '8.8.8.8')
command = "ping -c 1 -w 1 8.8.8.8"
ping_output = os.popen(command).read()

# Alternative pattern kept for reference:
#   re.search('=\\s(\\d+\\.\\d+)', ping_output)
# NOTE(review): 'Media =' presumably matches the average line of a
# Spanish-locale ping summary -- confirm against the target platform.
rtt_match = re.search('Media =\\s(\\d+)', ping_output)
if rtt_match is None:
    # No RTT figure found in the ping output.
    ninfo['rtt_ms'] = "ICMP request timeout"
else:
    rtt = float(rtt_match.group(1))
    ninfo['rtt_ms'] = float(f'{rtt:.2f}')
_emit('rtt_ms')

info[iface] = ninfo
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f01fb3b2401, pid=13209, tid=13224
#
# JRE version: OpenJDK Runtime Environment (11.0.7+10) (build 11.0.7+10-post-Ubuntu-2ubuntu218.04)
# Java VM: OpenJDK 64-Bit Server VM (11.0.7+10-post-Ubuntu-2ubuntu218.04, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# C [libjimage.so+0x2401]
#
# Core dump will be written. Default location: Core dumps may be processed with "/usr/share/apport/apport %p %s %c %d %P %E" (or dumping to /home/jjvmelastic/src/nfrtool-comms/app/core.13209)
#
# If you would like to submit a bug report, please visit:
# https://bugs.launchpad.net/ubuntu/+source/openjdk-lts
#
--------------- S U M M A R Y ------------
Command Line: -agentlib:jdwp=transport=dt_socket,suspend=y,address=localhost:41387 -javaagent:/home/jjvmelastic/eclipse/jee/configuration/org.eclipse.osgi/405/0/.cp/lib/javaagent-shaded.jar -Dfile.encoding=UTF-8 nfrTool.NFRTool
Host: Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz, 2 cores, 3G,
\ No newline at end of file
......@@ -22,23 +22,35 @@
<dependency>
<groupId>es.bsc.dataclay</groupId>
<artifactId>dataclay</artifactId>
<version>2.1</version>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.10.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.3</version>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
<build>
<finalName>nfrtool</finalName>
......@@ -61,6 +73,16 @@
<mainClass>nfrTool.NFRTool</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
......@@ -77,4 +99,4 @@
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
</project>
package netMonitor;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.BiMap;
import es.bsc.compss.nfr.model.CommunicationLink;
import es.bsc.compss.nfr.model.ElasticSystem;
import es.bsc.compss.nfr.model.Node;
import resourceManager.ResourceManager;
import utils.HardwareInfo;
/**
 * Resource manager that refreshes the round-trip time stored on the
 * communication links whose first endpoint is one of this host's own
 * interface addresses.
 */
public class NetMonitor extends ResourceManager {

    HardwareInfo hardwareInfo;

    /**
     * Creates a monitor bound to the given elastic system.
     *
     * @param elasticSystem system model handed to the parent resource manager
     */
    public NetMonitor(ElasticSystem elasticSystem) {
        super(elasticSystem);
        // Interface-specific alternative: new HardwareInfo("enp0s3")
        this.hardwareInfo = new HardwareInfo();
        // TODO: get IP as I should, e.g. this.node = getNodeByIP("10.0.2.15")
    }

    /** Creates a monitor that is not bound to any elastic system. */
    public NetMonitor() {
        this.hardwareInfo = new HardwareInfo();
    }

    /**
     * Measures and stores the RTT of every communication link of
     * {@code node} whose first endpoint IP belongs to a local interface.
     *
     * @param node node whose communication links are updated
     * @throws RuntimeException wrapping the {@link SocketException} raised
     *         while listing the local interfaces
     */
    public void setLinksRTT(Node node) {
        try {
            final BiMap<String, String> localIfaces = getSelfIfaces();

            // Select first, measure afterwards: only links that start at a local address.
            // Earlier variant matched on the node's Ethernet IP only:
            //   cl.getIpNode1().equals(node.getIpEth())
            final List<CommunicationLink> localLinks = node.getCommunicationLinks().stream()
                    .filter(link -> localIfaces.containsValue(link.getIpNode1()))
                    .collect(Collectors.toList());

            for (CommunicationLink link : localLinks) {
                // The inverse view maps the local IP back to its interface name.
                link.setDelayRtt(hardwareInfo.getLinkRTT(link.getIpNode2(),
                        localIfaces.inverse().get(link.getIpNode1())));
            }
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
        // hardwareInfo.getLinksRTT();
    }
}
package nfrTool;
import java.net.SocketException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.TimerTask;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Node;
import netMonitor.NetMonitor;
import resourceManager.ResourceManagerComms;
import utils.Constants;
import utils.FileUtils;
import utils.HardwareInfo;
/**
 * Timer task that, on every run, reloads the configuration, refreshes the
 * link RTTs of the local node and then asks the resource manager for its costs.
 */
public class CommsMonitoringTask extends TimerTask {

    ResourceManagerComms resourceManager;
    NetMonitor netMonitor;
    FileUtils fileUtils;
    HardwareInfo hardWareInfo;

    /**
     * @param resourceManager manager used for configuration, node lookup and costs
     * @param netMonitor      monitor used to refresh link RTTs
     */
    public CommsMonitoringTask(ResourceManagerComms resourceManager, NetMonitor netMonitor) {
        this.resourceManager = resourceManager;
        this.netMonitor = netMonitor;
        this.fileUtils = new FileUtils();
        this.hardWareInfo = new HardwareInfo();
    }

    /**
     * Executes one monitoring cycle.
     *
     * @throws RuntimeException wrapping any {@link SocketException} raised
     *         during the cycle
     */
    @Override
    public void run() {
        try {
            this.resourceManager.getConfig(Constants.CONFIG_FILE_PATH);
            // The local node is looked up afresh for each of the two steps.
            this.netMonitor.setLinksRTT(findLocalNode());
            this.resourceManager.getCosts(findLocalNode());
            System.out.println();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
        /*
         * Map<String, Integer> intResultsMap = this.resourceManager
         * .checkThresholds(hardWareInfo.getTelemetry(), Arrays.asList("devLevel",
         * "rttmax"), Arrays.asList("devLevel", "pllmax"));
         *
         * intResultsMap.forEach((k,v) -> {
         *
         * switch(v) { case 1: System.out.
         * println("[-] Could not obtain comms. attributes for Interface: \" + k");
         * break; case 2:
         * System.out.println("[?] Comms. requirements not satisfied for Interface: " +
         * k); Map<String, Double> cost = this.resourceManager.getCost((JSONObject)
         * hardWareInfo.getTelemetry().query("/net_interfaces"));
         * System.out.println("\tApplication costs: " + cost.toString()); double maxCost
         * = Collections.max(cost.values()); System.out.println(maxCost); break; case 0:
         * System.out.println("[+] Interface " + k + " up and Running"); break; } });
         */
    }

    /**
     * Resolves the node registered under the address of the "eth0" interface.
     * (An "enp0s3" variant was used previously.)
     */
    private Node findLocalNode() throws SocketException {
        return this.resourceManager.getNodeByIP(this.resourceManager.getSelfIfaces().get("eth0"));
    }
}
......@@ -2,38 +2,47 @@ package nfrTool;
import java.util.Timer;
import es.bsc.compss.nfr.model.ElasticSystem;
import es.bsc.dataclay.api.DataClay;
import es.bsc.dataclay.api.DataClayException;
import netMonitor.NetMonitor;
import resourceManager.ResourceManagerComms;
import org.eclipse.paho.client.mqttv3.MqttException;
import resourceManager.ResourceManagerCommsTask;
public class NFRMonitor {
ResourceManagerComms resourceManager;
NetMonitor netMonitor;
long period;

/**
 * @param resourceManager manager handed to the monitoring task
 * @param netMonitor      network monitor handed to the monitoring task
 * @param period          value passed as the period argument of Timer.scheduleAtFixedRate
 */
public NFRMonitor(ResourceManagerComms resourceManager, NetMonitor netMonitor, long period) {
    this.resourceManager = resourceManager;
    this.netMonitor = netMonitor;
    this.period = period;
}

/**
 * Schedules a CommsMonitoringTask at a fixed rate with no initial delay and
 * registers a JVM shutdown hook that stops it.
 */
public void runMonitor() {
    final Timer timer = new Timer();
    timer.scheduleAtFixedRate(new CommsMonitoringTask(this.resourceManager, this.netMonitor), 0, this.period);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> stopMonitoring(timer)));
}

/** Cancels the timer and finishes the DataClay session; a DataClay failure is only printed. */
private static void stopMonitoring(Timer timer) {
    try {
        timer.cancel();
        DataClay.finish();
    } catch (DataClayException e) {
        System.out.println(e.getMessage());
    }
}
private final ElasticSystem system;
private final long period;

/**
 * @param system elastic system handed to the monitoring task
 * @param period value passed as the period argument of Timer.scheduleAtFixedRate
 */
public NFRMonitor(ElasticSystem system, long period) {
    this.system = system;
    this.period = period;
}

/**
 * Registers a shutdown hook, then creates a ResourceManagerCommsTask and
 * schedules it at a fixed rate with no initial delay. If the task cannot
 * be created, the failure is reported on stderr and the JVM exits with
 * status 1.
 */
public void runMonitor() {
    final Timer timer = new Timer();
    try {
        // The hook is installed before the task is built, so it also runs on the exit paths below.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutDown(timer)));
        timer.scheduleAtFixedRate(new ResourceManagerCommsTask(this.system), 0, this.period);
    } catch (MqttException e) {
        abort("A problem occurred with the MQTT connection", e);
    } catch (ResourceManagerCommsTask.NodeNotFoundException e) {
        abort("This node could not be found in the ElasticSystem", e);
    }
}

/** Announces the exit, cancels the timer and finishes DataClay; DataClay errors are deliberately ignored. */
private static void shutDown(Timer timer) {
    try {
        System.out.println("Exiting NFRTool");
        timer.cancel();
        DataClay.finish();
    } catch (DataClayException e) {
        // e.printStackTrace();
        // Do nothing
    }
}

/** Prints the message and the stack trace to stderr, then terminates the JVM with status 1. */
private static void abort(String message, Exception cause) {
    System.err.println(message);
    cause.printStackTrace();
    System.exit(1);
}
}
This diff is collapsed.
......@@ -5,79 +5,74 @@ import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Set;
import java.util.TimerTask;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import es.bsc.compss.nfr.model.COMPSsApplication;
import es.bsc.compss.nfr.model.ElasticSystem;
import es.bsc.compss.nfr.model.Node;
import es.bsc.compss.nfr.model.Worker;
public abstract class ResourceManager {
protected ElasticSystem elasticSystem;
protected ArrayList<COMPSsApplication> apps;
protected ArrayList<Node> nodes;
protected ArrayList<Worker> workers;
/*
* Public Getters & Setters
*/
/**
 * Binds the manager to an elastic system and takes the application and
 * node lists from it.
 *
 * @param elasticSystem system model to manage
 */
public ResourceManager(ElasticSystem elasticSystem) {
this.elasticSystem = elasticSystem;
this.apps = this.elasticSystem.getApplications();
this.nodes = elasticSystem.getNodes();
}
/** Creates a manager with no elastic system, applications or nodes set. */
public ResourceManager() {
}
/** @return the elastic system currently held by this manager */
public ElasticSystem getElasticSystem() {
return elasticSystem;
}
/**
 * Replaces the elastic system reference only; the application and node
 * lists are not refreshed by this call.
 */
public void setElasticSystem(ElasticSystem elasticSystem) {
this.elasticSystem = elasticSystem;
}
/** @return the application list held by this manager */
public ArrayList<COMPSsApplication> getApps() {
return apps;
}
/** @param apps application list to store */
public void setApps(ArrayList<COMPSsApplication> apps) {
this.apps = apps;
}
/** @return the node list held by this manager */
public ArrayList<Node> getNodes() {
return nodes;
}
/**
 * Builds a bidirectional map from interface name to IPv4 address for the
 * network interfaces of this machine. Non-IPv4 addresses are ignored.
 *
 * @return newly created interface-name to IPv4-address map
 * @throws SocketException if the interfaces cannot be enumerated
 */
public BiMap<String, String> getSelfIfaces() throws SocketException {
    final BiMap<String, String> addressByIface = HashBiMap.create();
    final Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
    while (ifaces.hasMoreElements()) {
        final NetworkInterface iface = ifaces.nextElement();
        final Enumeration<InetAddress> ifaceAddresses = iface.getInetAddresses();
        while (ifaceAddresses.hasMoreElements()) {
            final InetAddress address = ifaceAddresses.nextElement();
            if (!(address instanceof Inet4Address)) {
                continue;
            }
            addressByIface.put(iface.getName(), address.getHostAddress());
        }
    }
    return addressByIface;
}
/**
 * Looks up the first node of the elastic system whose Ethernet IP equals
 * the given address.
 *
 * @param ip address to search for
 * @return the first matching node, or {@code null} when none matches
 */
public Node getNodeByIP(String ip) {
    for (Node candidate : elasticSystem.getNodes()) {
        if (candidate.getIpEth().equals(ip)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Base class for periodic resource-management tasks working on an
 * {@link ElasticSystem}. It holds the system's applications and nodes and
 * can locate the node that represents the local machine.
 */
public abstract class ResourceManager extends TimerTask {

    protected ElasticSystem elasticSystem;
    protected ArrayList<COMPSsApplication> apps;
    protected ArrayList<Node> nodes;
    // Result of the most recent getOwnInterfaces() call; null until the first call.
    private BiMap<String, String> ownInterfaces;

    /**
     * Binds the manager to an elastic system and takes the application and
     * node lists from it.
     *
     * @param elasticSystem system model to manage
     */
    public ResourceManager(ElasticSystem elasticSystem) {
        this.elasticSystem = elasticSystem;
        this.apps = this.elasticSystem.getApplications();
        this.nodes = elasticSystem.getNodes();
        this.ownInterfaces = null;
    }

    /**
     * Rescans the local network interfaces and rebuilds the interface-name
     * to IPv4-address map. Non-IPv4 addresses are ignored.
     *
     * @return the freshly built map (also stored in this instance)
     * @throws SocketException if the interfaces cannot be enumerated
     */
    protected BiMap<String, String> getOwnInterfaces() throws SocketException {
        this.ownInterfaces = HashBiMap.create();
        Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
        for (NetworkInterface netint : Collections.list(nets)) {
            Enumeration<InetAddress> addresses = netint.getInetAddresses();
            for (InetAddress addr : Collections.list(addresses)) {
                // NOTE(review): an interface with several IPv4 addresses keeps only
                // the last one put here -- confirm that this is intended.
                if (addr instanceof Inet4Address)
                    this.ownInterfaces.put(netint.getName(), addr.getHostAddress());
            }
        }
        return this.ownInterfaces;
    }

    /**
     * Finds the node of the elastic system that has at least one of its
     * Ethernet, LTE or WiFi IPs among the local IPv4 addresses.
     *
     * @return a node matching one of the local addresses
     * @throws NodeNotFoundException if no node matches, or if the local
     *         interfaces cannot be scanned (the {@link SocketException} is
     *         then attached as the cause)
     */
    protected Node getSelf() throws NodeNotFoundException {
        try {
            final Set<String> localIps = this.getOwnInterfaces().values();
            System.out.println("Local IPs: " + localIps);
            return elasticSystem.getNodes()
                    .stream()
                    .filter(n -> !Collections.disjoint(localIps, Arrays.asList(n.getIpEth(), n.getIpLte(), n.getIpWifi())))
                    .findAny()
                    .orElseThrow(() -> new NodeNotFoundException(localIps));
        } catch (SocketException e) {
            System.err.println("An error occurred when scanning local network interfaces: " + e.getMessage());
            // Chain the original failure so its stack trace is not lost.
            throw new NodeNotFoundException(e);
        }
    }

    /** Raised when the local machine cannot be matched to a node of the elastic system. */
    public static class NodeNotFoundException extends Exception {

        private static final long serialVersionUID = 1L;

        /** @param ips local IPs that were compared against the known nodes */
        public NodeNotFoundException(Set<String> ips) {
            super("I could not find myself in DataClay using the following detected set of IPs: " + ips);
        }

        /** Generic failure with no further detail. */
        public NodeNotFoundException() {
            super("An error occurred and the node could not be found");
        }

        /** @param cause underlying failure that prevented the lookup */
        public NodeNotFoundException(Throwable cause) {
            super("An error occurred and the node could not be found", cause);
        }
    }

    /** @param nodes node list to store */
    public void setNodes(ArrayList<Node> nodes) {
        this.nodes = nodes;
    }
}
package resourceManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.json.JSONObject;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.nodes.MappingNode;
import es.bsc.compss.nfr.model.CommunicationLink;
import es.bsc.compss.nfr.model.ElasticSystem;
import es.bsc.compss.nfr.model.Node;
import es.bsc.compss.nfr.model.Worker;
import utils.Constants;
import utils.FileUtils;
public class ResourceManagerComms extends ResourceManager {
// Helper used by this class to read scalar values from YAML files.
FileUtils fileUtils;
/**
 * Creates a communications resource manager bound to an elastic system.
 *
 * @param elasticSystem system model handed to the parent resource manager
 */
public ResourceManagerComms(ElasticSystem elasticSystem) {
super(elasticSystem);
this.fileUtils = new FileUtils();
}
/** Creates a communications resource manager that is not bound to any elastic system. */
public ResourceManagerComms() {
this.fileUtils = new FileUtils();
}
/*
* public Map<String, Integer> checkThresholds(JSONObject telemetry,
* List<String> ymlAttrPathRtt, List<String> ymlAttrPathPll) {
*
* Map<String, Integer> intTelemetryMap = new HashMap<>(); JSONObject
* netInterfaces = (JSONObject) telemetry.query("/net_interfaces");
* Iterator<String> keys = netInterfaces.keys();
*
* while(keys.hasNext()) { String key = keys.next(); if (netInterfaces.get(key)
* instanceof JSONObject) { JSONObject o = (JSONObject) netInterfaces.get(key);
* intTelemetryMap.put(key, checkThreshold(o, ymlAttrPathRtt, ymlAttrPathPll));
* } }
*
* return intTelemetryMap; }
*/
/**
 * Deactivates the worker when at least one of its application
 * communication links has an RTT at or above the "rttmax" value
 * configured for the application's information nature.
 *
 * @param worker worker whose links are checked; left untouched when all
 *               links are below the limit
 */
public void checkThresholds(Worker worker) {
    final List<CommunicationLink> links = worker.getCommunicationLinksForApplication();

    // The RTT limit is read from the attributes file under <infoNature>/rttmax.
    final String infoNature = worker.getApplication().getInfoNature();
    final float rttmax = Float.valueOf(
            this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, Arrays.asList(infoNature, "rttmax")));

    // Stops at the first link that reaches the limit.
    final boolean limitReached = links.stream().anyMatch(link -> link.getDelayRtt() >= rttmax);
    if (limitReached) {
        worker.setActive(false); // Is inactive true or false?
    }
}
/**
 * Compares the telemetry of one interface with the configured limits.
 *
 * @param telemetry      telemetry object queried for /rtt_ms, /rx_packets,
 *                       /tx_packets, /rx_lost_packets and /tx_lost_packets
 * @param ymlAttrPathRtt YAML path of the RTT limit
 * @param ymlAttrPathPll YAML path of the packet-loss limit
 * @return 2 when the RTT or the lost/total packet ratio reaches its limit;
 *         1 when a value is not numeric and /rtt_ms reads
 *         "ICMP request timeout"; 0 in every other case
 */
private int checkThreshold(JSONObject telemetry, List<String> ymlAttrPathRtt, List<String> ymlAttrPathPll) {
    final float maxRtt = Float.valueOf(this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, ymlAttrPathRtt));
    final float maxLossRatio = Float.valueOf(this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, ymlAttrPathPll));
    try {
        final float rtt = telemetryFloat(telemetry, "/rtt_ms");
        final float packets = telemetryFloat(telemetry, "/rx_packets")
                + telemetryFloat(telemetry, "/tx_packets");
        final float lostPackets = telemetryFloat(telemetry, "/rx_lost_packets")
                + telemetryFloat(telemetry, "/tx_lost_packets");
        final float lossRatio = lostPackets / packets;
        if (rtt >= maxRtt || lossRatio >= maxLossRatio) {
            return 2;
        }
    } catch (NumberFormatException e) {
        // A non-numeric value: report 1 only for the explicit timeout marker.
        final String rttText = (String) telemetry.query("/rtt_ms");
        if (rttText.equals("ICMP request timeout")) {
            return 1;
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
    return 0;
}

/** Reads the string found at the given pointer of the telemetry object and parses it as a float. */
private static float telemetryFloat(JSONObject telemetry, String pointer) {
    return Float.valueOf((String) telemetry.query(pointer));
}
public double getCost(Worker worker) {
List<CommunicationLink> workerLinks = worker.getCommunicationLinksForApplication();
// We need to get the rtt threshold from file, depending on infoNature. So we
// need infoNature
String infoNature = worker.getApplication().getInfoNature();
/*
* float balance = Float.valueOf(
* this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH,
* Arrays.asList(infoNature, "balance")));
*/
float priority = Float.valueOf(
this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, Arrays.asList(infoNature, "priority")));
float norm = Float
.valueOf(this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, Arrays.asList(infoNature, "norm")));
float rttmax = Float.valueOf(
this.fileUtils.getYmlNodeScalar(Constants.APP_ATTRIB_PATH, Arrays.asList(infoNature, "rttmax")));
// We need to get the RTT for each Link in workerLinks from dataClay
double commsCost = 0;
for (int ll = 0; ll < workerLinks.size(); ll++) {
// float monitoredRTT = workerLinks.get(ll).getDelayRtt();
if (workerLinks.get(ll).getDelayRtt() >= rttmax) {
worker.setActive(false);