Added violation messages for the GRM, fixed issue with setDelayRtt

parent fef07e26
......@@ -24,12 +24,7 @@ import es.bsc.compss.nfr.model.CommunicationLink;
import es.bsc.compss.nfr.model.ElasticSystem;
import es.bsc.compss.nfr.model.Node;
import es.bsc.compss.nfr.model.Worker;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import utils.Constants;
import utils.FileUtils;
......@@ -38,8 +33,10 @@ public class ResourceManagerCommsTask extends ResourceManager implements MqttCal
private static final String PUBLISH_TOPIC = "konnekt/v3/service/konnektbox-telemetry/peers";
private static final String SUBSCRIBE_TOPIC = "konnekt/v3/service/konnektbox-telemetry/net";
private static final String GRM_TOPIC = "violations";
private MqttClient mqttClient;
private MqttClient extMQClient;
private final ObjectMapper mapper;
private final FileUtils fileUtils;
......@@ -59,14 +56,20 @@ public class ResourceManagerCommsTask extends ResourceManager implements MqttCal
private void initializeMqttClient() throws MqttException {
final String broker = Optional.ofNullable(System.getenv("MQTT_BROKER_URL"))
.orElse("tcp://localhost:1883");
final String extBroker = Optional.ofNullable(System.getenv("CENTRAL_MQ_URL"))
.orElse("tcp://localhost:1883");
final String clientId = UUID.randomUUID().toString();
final String extClientId = UUID.randomUUID().toString();
final MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(true); // Totally optional
this.mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
this.extMQClient = new MqttClient(extBroker, extClientId, new MemoryPersistence());
this.mqttClient.setCallback(this);
this.mqttClient.connect(connectOptions);
connectOptions.setAutomaticReconnect(true);
this.extMQClient.connect(connectOptions);
this.mqttClient.subscribe(SUBSCRIBE_TOPIC);
}
......@@ -172,10 +175,19 @@ public class ResourceManagerCommsTask extends ResourceManager implements MqttCal
links.stream()
// Filter the CommunicationLinks so that only ip1->ip2 link(s) remain
.filter(link -> link.getIpNode1().equals(ip1) && link.getIpNode2().equals(ip2))
.findAny() // If any, that is
.ifPresent(link -> { // If there's any, then store the rtt in DataClay
.forEach(link -> { // If there's any, then store the rtt in DataClay
System.out.printf("Updating RTT of link %s->%s to %f\n", ip1, ip2, rtt);
link.setDelayRtt(rtt);
if (rtt>rttmax){
try {
final String jsonMessageContent = String.format("{\"workerIP\": %s, \"workerID\": %s, \"dimension\": \"comms\"}", worker.getIp(), worker.getPid());
final MqttMessage violationMsg = new MqttMessage(jsonMessageContent.getBytes());
extMQClient.publish(GRM_TOPIC, violationMsg);
} catch (MqttException e) {
System.err.println("An exception occurred with the MQTT publish");
e.printStackTrace();
}
}
});
})
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment