Commit 1c0d65e2 authored by Rita Sousa's avatar Rita Sousa
Browse files

Updated Monitors to listen "global" topic and updated fakeWorker Docker images

parent cc8b4eb4
......@@ -96,4 +96,5 @@ app/.idea/
**/.idea/
app/*.txt
app/pidsToMonitor.txt
dataclay
......@@ -31,10 +31,15 @@ In this demo, a fake ElasticSystem is created. To have several fake Workers (Doc
cd app/fakeWorkers/
docker build -t fake_docker_workers .
```
or
```
docker pull rita09sousa/fake_docker_workers:2.0
docker image tag rita09sousa/fake_docker_workers:2.0 fake_docker_workers
```
Then, start some Workers with constant resource usage:
```
docker run -d --rm -it --name fakeworker<NUM_FAKEWORKER> fake_docker_workers fakeworker.py <NUM_FAKEWORKER>
docker run -d --rm -it -e FAKE_WORKER=<NUM_FAKEWORKER> --name fakeworker<NUM_FAKEWORKER> fake_docker_workers
```
where <NUM_FAKEWORKER> must be a number between 1 and 6.
......
FROM python:3.7-alpine
ENV fakeWorker 3
ENV FAKE_WORKER 3
WORKDIR /dockerFakeWorkers/
COPY ./fakeworker.py .
RUN chmod +x ./fakeworker.py
ENTRYPOINT ["./fakeworker.py"]
CMD ["${FAKE_WORKER}"]
CMD ["fakeworker.py ${fakeWorker}"]
ENTRYPOINT ["python3"]
########################## OLD AND HEAVY FAKEWORKERS ##########################
......
#!/usr/bin/python 3
import time
import sys
import threading
......@@ -30,13 +32,14 @@ def thread_function(name):
time.sleep(1)
if len(sys.argv) != 2:
print('Number of arguments wrong. Usage: {} <IdOfFakeworker>'.format(sys.argv[0]))
print("Number of arguments wrong. Usage: {} <IdOfFakeworker>".format(sys.argv[0]))
exit(1)
try:
idFakeWorker = int(sys.argv[1])
if idFakeWorker < 0 or idFakeWorker > 6:
print('Fakeworker ID wrong. It should be number between 1 and 6.')
print("Fakeworker ID wrong. It should be number between 1 and 6.")
exit(1)
except ValueError:
print("Argument is not an int")
......@@ -55,7 +58,7 @@ elif idFakeWorker == 5:
elif idFakeWorker == 6:
defineValues(500000, 2)
print(N, NUM_THREADS)
#print(N, NUM_THREADS)
if NUM_THREADS == 1:
thread_function(1)
......
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
/**
 * Periodic task that tries to bring an inactive Worker back into service.
 *
 * <p>Every {@link #PERIOD} milliseconds the task looks at the shared
 * {@link ActiveWorkersMap}:
 * <ul>
 *   <li>if no Worker is active, it asks the map to reactivate one with the
 *       {@code "NaW"} (no active Workers) dimension;</li>
 *   <li>otherwise it waits until the metrics history has been flagged as
 *       updated and, when every active Worker has usable metrics and the
 *       summed CPU usage is below the reactivation threshold, asks the map to
 *       reactivate one with the {@code "time"} dimension.</li>
 * </ul>
 *
 * <p>The loop ends when the executing thread is interrupted; the interrupt
 * status is left set so that code further up the stack can observe it.
 */
public class ActivationWorkersManager implements Runnable {

    /** Pause between two checks, in milliseconds. */
    private static final int PERIOD = 5000;

    // Kept for the (currently disabled) propagation of the new active-Workers
    // list through rm.updateActiveWorkersEverywhere().
    private final ResourceManager rm;
    private final ActiveWorkersMap activeWorkers;

    /** Total CPU usage below which another Worker is reactivated. */
    private final float cpuReactivationThreshold;

    /**
     * @param rm            resource manager of the node (not used while the
     *                      propagation call is disabled)
     * @param activeWorkers shared view of the Workers and their metrics
     * @param cpuThreshold  CPU threshold; the reactivation threshold is set to
     *                      half of this value
     */
    ActivationWorkersManager(ResourceManager rm, ActiveWorkersMap activeWorkers, float cpuThreshold) {
        this.rm = rm;
        this.activeWorkers = activeWorkers;
        this.cpuReactivationThreshold = cpuThreshold / 2;
    }

    /** Runs the periodic reactivation check until the thread is interrupted. */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            // This sleep is here temporarily (or not) because of the time of system initialization
            try {
                Thread.sleep(PERIOD);
            } catch (InterruptedException e) {
                System.out.println("Thread " + Thread.currentThread().getName() + " was interrupted. Aborting...");
                // sleep() cleared the flag; restore it before leaving.
                Thread.currentThread().interrupt();
                break;
            }

            if (activeWorkers.size() == 0) {
                System.out.println("No active Workers. Let's activate another Worker.");
                // On success rm.updateActiveWorkersEverywhere() used to be called here (disabled).
                if (!activeWorkers.reactivateWorker("NaW")) { // No active Workers
                    System.out.println("1- There are no healthy workers to reactivate! No workers were activated!");
                }
                continue;
            }

            activeWorkers.isHistoryUpdated(); // which also means that all metrics are filled

            // Check for missing metrics BEFORE summing: getTotalCPUUsage() reads
            // "task-clock" from every active Worker and fails on an entry that
            // does not carry it yet.
            if (activeWorkers.hasNan()) {
                continue;
            }

            float actualCPUUsage = activeWorkers.getTotalCPUUsage();
            if (actualCPUUsage < cpuReactivationThreshold) {
                System.out.printf("Current CPU usage of Node is %.2f. Let's activate another Worker.\n", actualCPUUsage);
                // On success rm.updateActiveWorkersEverywhere() used to be called here (disabled).
                if (!activeWorkers.reactivateWorker("time")) {
                    System.out.println("2- There are no healthy workers to reactivate! No workers were activated!");
                }
            }
        }
    }
}
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONArray;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
import es.bsc.compss.nfr.model.Node;
/**
 * Shared state describing the Workers of a node and the latest metrics
 * received for the active ones.
 *
 * <p>Two maps, both keyed by Worker PID, are kept:
 * <ul>
 *   <li>the most recent metrics object of each active Worker, and</li>
 *   <li>a short history of metrics objects per active Worker.</li>
 * </ul>
 * Both maps are {@link ConcurrentHashMap}s. In addition, three boolean flags
 * guarded by this object's monitor are used as hand-off signals between the
 * monitoring threads (energy update, maps update, history update).
 */
public class ActiveWorkersMap {

    private static final String CPU_KEY = "task-clock";

    private final List<Worker> workerList;

    // This map records the time and energy metrics received from the probes
    private final Map<Integer, JSONObject> activeWorkerMetrics;

    // This map records small history of the metrics and it is used by
    // DataclayWritingManager to write average metric in Dataclay
    private final Map<Integer, JSONArray> activeWorkersMetricsHistory;

    private boolean updateEnergy = false;
    private boolean historyMetricsFilled = false;
    private boolean mapsUpdated = false;

    private final int COMPUTING_UNITS = Runtime.getRuntime().availableProcessors();

    /**
     * @param workerList all Workers of the node, active or not; the list is
     *                   used as-is (not copied)
     */
    ActiveWorkersMap(List<Worker> workerList) {
        this.workerList = workerList;
        this.activeWorkerMetrics = new ConcurrentHashMap<>();
        this.activeWorkersMetricsHistory = new ConcurrentHashMap<>();
    }

    /** @return number of Workers known to this map, active or not */
    public int allWorkersSize() {
        return workerList.size();
    }

    /**
     * @return IP of the first Worker in the list
     * @throws IndexOutOfBoundsException if the Worker list is empty
     */
    public String getWorkersAddress() {
        return workerList.get(0).getIp();
    }

    /**
     * Marks every Worker with the given PID as inactive. The metrics maps are
     * not touched here; see {@link #updateActiveWorkersMetricsMap()}.
     */
    public void inactiveWorker(int pidWorker) {
        for (Worker worker : workerList) {
            if (worker.getPid() == pidWorker) {
                worker.setActive(false);
            }
        }
    }

    /** @return textual form of the latest-metrics map */
    @Override
    public String toString() {
        return activeWorkerMetrics.toString();
    }

    /** @return number of Workers that currently have an entry in the latest-metrics map */
    public int size() {
        return activeWorkerMetrics.size();
    }

    /** @return live key view of the latest-metrics map (PIDs of active Workers) */
    public Set<Integer> workerPids() {
        return activeWorkerMetrics.keySet();
    }

    /** @return the first Worker with the given PID, or {@code null} if none matches */
    public Worker getWorkerByPid(int workerPid) {
        for (Worker worker : workerList) {
            if (worker.getPid() == workerPid) {
                return worker;
            }
        }
        return null;
    }

    /**
     * @return monitoring period of the application of the given Worker
     * @throws NullPointerException if no Worker has this PID
     */
    public int getMonitoringPeriodByWorkerPid(int workerPid) {
        return getWorkerByPid(workerPid).getApplication().getMonitoringPeriod();
    }

    /**
     * Stores freshly received metrics for an active Worker.
     *
     * <p>The "task-clock" value is divided by the number of available
     * processors so that it expresses usage of the whole system instead of a
     * single core, then the metrics object becomes the Worker's latest entry
     * and is appended to its history. The history is trimmed from the front so
     * that it never grows beyond the Worker's monitoring period.
     *
     * <p>If the Worker is unknown, has no application, has no history entry
     * (i.e. it is not registered as active) or {@code metrics} is
     * {@code null}, a warning is printed and nothing is stored.
     *
     * @param workerPid PID of the Worker the metrics belong to
     * @param metrics   metrics object; must contain a numeric "task-clock"
     */
    public void put(int workerPid, JSONObject metrics) {
        Worker worker = getWorkerByPid(workerPid);
        JSONArray history = activeWorkersMetricsHistory.get(workerPid);
        if (metrics == null || history == null || worker == null || worker.getApplication() == null) {
            String state = (worker == null) ? "unknown" : String.valueOf(worker.isActive());
            System.out.println("!!WARNING!! Worker with PID " + workerPid + " has active=" + state);
            return;
        }

        // Correct the CPU usage by process to get in the whole "system", not by core.
        // Done before touching the history so that a malformed metrics object
        // cannot cost us an existing history record.
        float cpuUsageSystem = metrics.getFloat(CPU_KEY) / COMPUTING_UNITS;
        metrics.put(CPU_KEY, cpuUsageSystem);

        // Control the length of the history array (5 seconds = 5 metrics records)
        int actualMonitoringPeriod = worker.getApplication().getMonitoringPeriod();
        while (history.length() > 0 && history.length() >= actualMonitoringPeriod) {
            history.remove(0);
        }

        // Add metrics received to calculate other values with CPU usage,
        // e.g. energy consumed
        activeWorkerMetrics.put(workerPid, metrics);

        // Add metrics received to the history array
        history.put(metrics);
    }

    /** @return whether the given PID has an entry in the latest-metrics map */
    public boolean isWorkerActive(int workerPid) {
        return activeWorkerMetrics.containsKey(workerPid);
    }

    /**
     * @return the "task-clock" value of the Worker's latest metrics
     * @throws NullPointerException if the PID has no entry in the latest-metrics map
     */
    public float getWorkerCPUUsage(int workerPid) {
        return activeWorkerMetrics.get(workerPid).getFloat(CPU_KEY);
    }

    /** @return PID of the first entry of the latest-metrics map, or 0 when the map is empty */
    public int getFirstActiveWorkerPid() {
        // Iterating instead of size()+next() avoids failing when the map is
        // emptied by another thread between the two calls.
        for (Integer pid : activeWorkerMetrics.keySet()) {
            return pid;
        }
        return 0;
    }

    /**
     * Reactivates one inactive Worker, if a suitable one exists.
     *
     * <p>Candidates are the inactive Workers whose deactivation reasons contain
     * neither "security" nor "communication". Among them, the Worker with the
     * lowest CPU usage is chosen for the "time" and "NaW" dimensions, the one
     * with the lowest energy usage for "energy"; for any other dimension the
     * first candidate is taken. The chosen Worker is marked active, gets all
     * available processors as computing units and its deactivation reasons
     * are cleared.
     *
     * @param dimension "time", "NaW" or "energy"
     * @return {@code true} if a Worker was reactivated, {@code false} if there
     *         was no candidate
     */
    public boolean reactivateWorker(String dimension) {
        // Keyed by PID, as before: a later Worker with the same PID replaces an earlier one.
        Map<Integer, Worker> inactiveWorkers = new HashMap<>();
        for (Worker w : workerList) {
            boolean blocked = w.getDeactivationReasons().contains("security")
                    || w.getDeactivationReasons().contains("communication");
            if (!w.isActive() && !blocked) {
                inactiveWorkers.put(w.getPid(), w);
            }
        }
        if (inactiveWorkers.isEmpty()) {
            return false;
        }

        Worker chosen = inactiveWorkers.values().iterator().next();
        if (dimension.equals("time") || dimension.equals("NaW")) {
            float minCpuUsage = chosen.getCpuUsage();
            for (Worker candidate : inactiveWorkers.values()) {
                if (candidate.getCpuUsage() < minCpuUsage) {
                    chosen = candidate;
                    minCpuUsage = candidate.getCpuUsage();
                }
            }
        } else if (dimension.equals("energy")) {
            float minEnergyUsage = chosen.getEnergyUsage();
            for (Worker candidate : inactiveWorkers.values()) {
                if (candidate.getEnergyUsage() < minEnergyUsage) {
                    chosen = candidate;
                    minEnergyUsage = candidate.getEnergyUsage();
                }
            }
        }

        chosen.setActive(true);
        chosen.setComputingUnits(COMPUTING_UNITS);
        chosen.setDeactivationReasons(new ArrayList<>());
        return true;
    }

    /**
     * @return PID of the active Worker with the lowest "task-clock" value
     * @throws NullPointerException if there is no active Worker
     */
    public int getMinCPUUsageWorkerPid() {
        int minCPUUsageWorkerPid = getFirstActiveWorkerPid();
        float minCPUUsage = getWorkerCPUUsage(minCPUUsageWorkerPid);
        for (Integer workerPid : workerPids()) {
            float usage = getWorkerCPUUsage(workerPid);
            if (usage < minCPUUsage) {
                minCPUUsageWorkerPid = workerPid;
                minCPUUsage = usage;
            }
        }
        return minCPUUsageWorkerPid;
    }

    /**
     * Tells whether some active Worker still lacks usable metrics.
     *
     * <p>A metrics object is usable when it has a non-null "task-clock" and a
     * non-null value for at least one of "pkg" or "SOCpower".
     * {@code JSONObject.isNull} is true both for an absent key and for an
     * explicit JSON null, so it covers both cases.
     *
     * @return {@code true} if at least one active Worker has unusable metrics
     */
    public boolean hasNan() {
        for (JSONObject metrics : activeWorkerMetrics.values()) {
            // TODO Edit this when new metrics become relevant
            boolean cpuMissing = metrics.isNull(CPU_KEY);
            boolean energyMissing = metrics.isNull("pkg") && metrics.isNull("SOCpower");
            if (cpuMissing || energyMissing) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return sum of the "task-clock" values of all active Workers; fails if
     *         an entry has no numeric "task-clock" (see {@link #hasNan()})
     */
    public float getTotalCPUUsage() {
        float sum = 0;
        for (JSONObject metrics : activeWorkerMetrics.values()) {
            sum += metrics.getFloat(CPU_KEY);
        }
        return sum;
    }

    /**
     * Waits once on this object's monitor without propagating interruption.
     * Must only be called from the synchronized waiting loops below.
     *
     * @return {@code true} if the wait was interrupted
     */
    private synchronized boolean waitSwallowingInterrupt() {
        try {
            wait();
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    /*************************************************************************
     **************************** Energy Monitor *****************************
     *************************************************************************
     * Energy Monitor is responsible to receive the metrics from the Energy probe.
     * Since the energy metrics are measured by Node, and not by Worker, the
     * EnergyMonitor thread has to wait for the TimeMonitor thread (which saves the
     * CPU usage about each Worker) then fill the energy per Worker
     ************************************************************************/

    /** Clears the "energy can be updated" flag. */
    public synchronized void energyWait() {
        updateEnergy = false;
    }

    /** Sets the "energy can be updated" flag and wakes every waiting thread. */
    public synchronized void energyCanBeUpdated() {
        updateEnergy = true;
        notifyAll();
    }

    /**
     * Blocks until the "energy can be updated" flag is set. An interruption
     * does not end the wait; the interrupt status is restored on return.
     */
    public synchronized void waitingToUpdateEnergy() {
        boolean interrupted = false;
        while (!updateEnergy) {
            interrupted |= waitSwallowingInterrupt();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds (or overwrites) one value in the latest metrics of a Worker.
     *
     * @throws NullPointerException if the PID has no entry in the latest-metrics map
     */
    public void addEnergyConsumed(int workerPid, String key, float metric) {
        activeWorkerMetrics.get(workerPid).put(key, metric);
    }

    /*******************************************************************************
     ******************************* Resource Manager ******************************
     *******************************************************************************
     * Resource Manager is responsible to check NFRViolationQueue and maintain the
     * Active Workers list updated in the Server and in the Probes. This method only
     * update the list in the Server but the Resource Manager thread after then send
     * the new list to the Probes through sockets
     ******************************************************************************/

    /**
     * Blocks until at least one Worker has an entry in the latest-metrics map.
     * An interruption does not end the wait; the interrupt status is restored
     * on return.
     */
    public synchronized void waitForWorkersActivation() {
        boolean interrupted = false;
        while (size() == 0) {
            interrupted |= waitSwallowingInterrupt();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Re-synchronises both metrics maps with the active flags of the Workers:
     * entries of Workers that are no longer active are dropped and empty
     * entries are created for Workers that became active. Also resets the
     * "maps updated" flag and, when the map was empty on entry, wakes threads
     * blocked in {@link #waitForWorkersActivation()}.
     */
    public synchronized void updateActiveWorkersMetricsMap() {
        mapsWillBeUpdated();
        List<Integer> pidsToAdd = getActualActiveWorkersPid();
        if (size() == 0) {
            // Waiters only resume once this synchronized method returns,
            // i.e. after the maps below have been filled.
            notifyAll();
        }

        // The key set belongs to a ConcurrentHashMap, so entries may be
        // removed while it is being iterated.
        for (Integer pid : workerPids()) {
            if (pidsToAdd.contains(pid)) {
                // pid is an Integer: this is remove(Object), not remove(index).
                pidsToAdd.remove(pid);
            } else {
                activeWorkerMetrics.remove(pid);
                activeWorkersMetricsHistory.remove(pid);
            }
        }

        for (Integer pid : pidsToAdd) {
            activeWorkerMetrics.put(pid, new JSONObject());
            activeWorkersMetricsHistory.put(pid, new JSONArray());
        }
    }

    /** @return PIDs of all Workers currently flagged as active, in list order */
    private List<Integer> getActualActiveWorkersPid() {
        List<Integer> activeWorkersPids = new ArrayList<>();
        for (Worker w : workerList) {
            if (w.isActive()) {
                activeWorkersPids.add(w.getPid());
            }
        }
        return activeWorkersPids;
    }

    /** Clears the "maps updated" flag. */
    public synchronized void mapsWillBeUpdated() {
        mapsUpdated = false;
    }

    /**
     * Processes one line received while an update of the maps is pending.
     *
     * @param inputLine line received; "UPDATED" acknowledges the update
     * @return {@code true} while an update was pending when the call was made
     *         (the flag is set if the line was the acknowledgement),
     *         {@code false} if no update was pending
     */
    public synchronized boolean areMapsUpdated(String inputLine) {
        boolean acknowledged = inputLine.equals("UPDATED");
        if (mapsUpdated) {
            return false;
        }
        if (acknowledged) {
            mapsUpdated = true;
            System.out.println("Active Workers Updated");
        } else {
            System.out.println("Wainting Active Workers Update");
        }
        return true;
    }

    /*************************************************************************
     ************************ Dataclay Writing Manager ***********************
     *************************************************************************
     * Dataclay Writing Manager is responsible to write the metrics needed in
     * Dataclay periodically (according to COMPSsApplication monitoringPeriod)
     ************************************************************************/

    /** Clears the "history filled" flag. */
    public synchronized void historyWillBeUpdated() {
        historyMetricsFilled = false;
    }

    /** Sets the "history filled" flag and wakes every waiting thread. */
    public synchronized void historyIsUpdated() {
        historyMetricsFilled = true;
        notifyAll();
    }

    /**
     * Blocks until the "history filled" flag is set. An interruption does not
     * end the wait; the interrupt status is restored on return.
     */
    public synchronized void isHistoryUpdated() {
        boolean interrupted = false;
        while (!historyMetricsFilled) {
            interrupted |= waitSwallowingInterrupt();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** @return the live history map (PID to array of metrics objects); not a copy */
    public Map<Integer, JSONArray> getActiveWorkersHistory() {
        return activeWorkersMetricsHistory;
    }
}
......@@ -38,14 +38,12 @@ import java.net.HttpURLConnection;
public class DataclayWritingManager implements Runnable{
private List<Worker> workerList;
private ActiveWorkersMap activeWorkers;
private List<String> idContainers;
private static final String BASE_GET_URL = "http://localhost/containers/";
private static final String USER_AGENT = "Mozilla/5.0";
DataclayWritingManager(List<Worker> workerList, ActiveWorkersMap activeWorkers) {
DataclayWritingManager(List<Worker> workerList) {
this.workerList = workerList;
this.activeWorkers = activeWorkers;
}
@Override
......@@ -260,7 +258,7 @@ public class DataclayWritingManager implements Runnable{
}
private void insertEnergyMetrics(JSONObject metricsJson){
// FIXME: Check Slack
}
private boolean isJSONValid(String test) {
......
......@@ -21,13 +21,14 @@ import app.mqtt_callback.MqttCallBack;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class EnergyMonitor implements Runnable {
public class Monitor implements Runnable {
private static final String TOPIC = "nuvlabox-status";
private ResourceManager rm;
EnergyMonitor(ResourceManager rm) {
Monitor(ResourceManager rm) {
this.rm = rm;
}
......@@ -39,12 +40,9 @@ public class EnergyMonitor implements Runnable {
MqttClient subscriber = null;
try{
subscriber = new MqttClient("tcp://datagateway", MqttClient.generateClientId());
subscriber.setCallback( new MqttCallBack("energy",rm) );
subscriber.setCallback( new MqttCallBack(rm) );
subscriber.connect();
// TODO: Get right topic for energy dimension
String[] topics = new String[] { "power-consumption" };
subscriber.subscribe(topics);
subscriber.subscribe(TOPIC);
} catch(MqttException e){
System.out.println("[ERROR] Creation of MQTT Energy Subscriber failed!!!");
e.printStackTrace();
......@@ -54,4 +52,4 @@ public class EnergyMonitor implements Runnable {
System.out.println(threadName + " Probe is connected!");
}
}
}
\ No newline at end of file
......@@ -49,7 +49,7 @@ public class NFRTool {
// FIXME: Threshold values for demo purposes only
private static final float CPU_THRESHOLD = 0.35f; // 0.8f;
private static final float ENERGY_THRESHOLD = 20.0f; // Watts
private static final float ENERGY_THRESHOLD = 0.5f; // Watts
private static float cpuThreshold;
private static float energyThreshold;
......@@ -182,33 +182,18 @@ public class NFRTool {
+" IP address=" + worker.getIp()
+" state=" + (worker.isActive() ? "active" : "inactive")));
ActiveWorkersMap metricsActiveWorkersList = new ActiveWorkersMap(workerList);
ResourceManager resourceManager = new ResourceManager(node);
//Thread rmThread = new Thread(resourceManager, "Resource Manager");