Commit d4179d4c authored by Luís Nogueira's avatar Luís Nogueira

updated to the newest version of the dataclay model

parent 828e25c7
This diff is collapsed.
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
public class ActivationWorkersManager implements Runnable {
private ResourceManager rm;
private ActiveWorkersMap activeWorkers;
private final int PERIOD = 15000; // milliseconds
private final float CPU_REACTIVATION_THRESHOLD = 0.5f;
ActivationWorkersManager(ResourceManager rm, ActiveWorkersMap activeWorkers) {
this.rm = rm;
this.activeWorkers = activeWorkers;
}
public void run() {
while (!Thread.interrupted()) {
// This sleep is here temporarily (or not) because of the time of system initialization
try {
Thread.sleep(PERIOD);
} catch (InterruptedException e) {
System.out.println("Thread " + Thread.currentThread().getName() + " was interrupted. Aborting...");
break;
}
if (activeWorkers.size() == 0){
System.out.println("No active Workers. Let's activate another Worker.");
activeWorkers.reactivateWorker();
rm.updateActiveWorkersEverywhere();
continue;
}
activeWorkers.energyCanBeUpdated(); // which also means that all Active Workers has CPU usage filled
float actualCPUUsage = activeWorkers.getTotalCPUUsage();
if(actualCPUUsage < CPU_REACTIVATION_THRESHOLD){
System.out.printf("Current CPU usage of Node is %.2f. Let's activate another Worker.\n",actualCPUUsage);
activeWorkers.reactivateWorker();
rm.updateActiveWorkersEverywhere();
}
}
}
}
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
import java.util.ArrayList;
......@@ -15,17 +32,13 @@ public class ActiveWorkersMap {
private List<Worker> workerList;
private Map<Integer,JSONObject> activeWorkersMetricsMap;
private boolean updateEnergy = false;
ActiveWorkersMap(List<Worker> workerList) {
this.workerList = workerList;
activeWorkersMetricsMap = new ConcurrentHashMap<>();
}
public synchronized void notifyActivationWorkers() {
notifyAll();
}
public synchronized void waitForWorkersActivation() {
synchronized(activeWorkersMetricsMap){
try {
......@@ -51,7 +64,34 @@ public class ActiveWorkersMap {
}
public String getWorkersAddress(){
return workerList.get(0).getAddress();
return workerList.get(0).getIp();
}
public synchronized void activateWorker(int pidWorker){
for (Worker worker : workerList) {
if (worker.getPid() == pidWorker) {
worker.setActive(true);
}
}
}
public synchronized void energyWait() {
updateEnergy = false;
}
public synchronized void energyCanBeUpdated() {
updateEnergy = true;
notifyAll();
}
public synchronized void waitingToUpdateEnergy() {
while (!updateEnergy) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void inactiveWorker(int pidWorker){
......@@ -102,12 +142,17 @@ public class ActiveWorkersMap {
return activeWorkersMetricsMap.put(workerPid, metrics);
}
public void addEnergyConsumed(int workerPid, String key, float metric){
activeWorkersMetricsMap.get(workerPid).put(key, metric);
}
public boolean isWorkerActive(int workerPid){
return activeWorkersMetricsMap.containsKey(workerPid);
}
public float getWorkerCPUUsage(int workerPid){
return activeWorkersMetricsMap.get(workerPid).getFloat("cpu-clock")*100;
return activeWorkersMetricsMap.get(workerPid).getFloat("task-clock");
}
public int getFirstActiveWorkerPid(){
......@@ -118,6 +163,24 @@ public class ActiveWorkersMap {
return 0;
}
public boolean reactivateWorker(){
if(allWorkersSize() > 0){
int workerToActivate = 0;
float minCPUUsage = workerList.get(0).getCpuUsage();
for (int i = 1; i < allWorkersSize(); i++) {
if(!(workerList.get(i).isActive())
&& workerList.get(i).getCpuUsage() < minCPUUsage){
workerToActivate = i;
minCPUUsage = workerList.get(i).getCpuUsage();
}
}
workerList.get(workerToActivate).setActive(true);
return true;
}
return false;
}
public int getMinCPUUsageWorkerPid(){
int minCPUUsageWorkerPid = getFirstActiveWorkerPid();
......@@ -135,7 +198,7 @@ public class ActiveWorkersMap {
public float getTotalCPUUsage(){
float sum = 0;
for (JSONObject metrics : activeWorkersMetricsMap.values()) {
sum += metrics.getFloat("cpu-clock")*100;
sum += metrics.getFloat("task-clock");
}
return sum;
}
......@@ -165,11 +228,13 @@ public class ActiveWorkersMap {
// return true;
// }
public void updateActiveWorkersMetricsMap() {
public synchronized void updateActiveWorkersMetricsMap() {
List<Integer> activeWorkersPids = getActualActiveWorkersPid();
if(size() == 0){
notifyActivationWorkers();
notifyAll();
}
workerPids().forEach(pid -> {
if(!activeWorkersPids.contains(pid)){
activeWorkersMetricsMap.remove(pid);
......
......@@ -18,153 +18,125 @@
package app;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
public class EnergyMonitor implements Runnable {
private ResourceManager rm;
private NFRViolationQueue queue;
private List<Worker> workerList;
private int energyCapacity;
private ActiveWorkersMap activeWorkers;
private float energyThreshold;
private Socket socketEnergy;
//private static final int SERVER_ENERGY_PORT = 8687;
private static final int SERVER_ENERGY_PORT = 8687;
EnergyMonitor(NFRViolationQueue queue, List<Worker> workerList) {
EnergyMonitor(ResourceManager rm, NFRViolationQueue queue, ActiveWorkersMap activeWorkersMetrics) {
this.rm = rm;
this.queue = queue;
this.workerList = workerList;
energyCapacity = 0;
}
private int calculateAverage(List<Integer> list) {
Integer sum = 0;
if(!list.isEmpty()) {
for (Integer value : list) {
sum += value;
}
return (int) sum/list.size();
}
return sum;
}
private int getCharge(){
int charge = 0;
try {
Process p = Runtime.getRuntime().exec("probes/energy/energyprobe.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
String output = stdInput.readLine();
JSONObject jsonObject = new JSONObject(output);
charge = jsonObject.getInt("charge");
} catch (Exception e) {
e.printStackTrace();
}
return charge;
}
private List<Integer> getTemperatures(){
List<Integer> temperatures = new ArrayList<>();
try {
Process p = Runtime.getRuntime().exec("probes/temp_fan/temperaturesprobe.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
String output = stdInput.readLine();
JSONObject jsonObject = new JSONObject(output);
Iterator<String> keys = jsonObject.keys();
while(keys.hasNext()) {
String key = keys.next();
temperatures.add(jsonObject.getInt(key));
}
} catch (Exception e) {
e.printStackTrace();
}
return temperatures;
}
private List<Integer> getFans(){
List<Integer> fans = new ArrayList<>();
try {
Process p = Runtime.getRuntime().exec("probes/temp_fan/fansprobe.sh");
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
String output = stdInput.readLine();
JSONObject jsonObject = new JSONObject(output);
Iterator<String> keys = jsonObject.keys();
while(keys.hasNext()) {
String key = keys.next();
fans.add(jsonObject.getInt(key));
}
} catch (Exception e) {
e.printStackTrace();
}
return fans;
this.activeWorkers = activeWorkersMetrics;
energyThreshold = (float) 0.0;
}
private void getNodeEnergyCapacity(){
if(workerList.size() > 0){
Worker worker = workerList.get(0);
energyCapacity = worker.getNode().getEnergyCapacity();
if (activeWorkers.size() > 0) {
int workerPid = activeWorkers.getFirstActiveWorkerPid();
Worker worker = activeWorkers.getWorkerByPid(workerPid);
energyThreshold = worker.getNode().getEnergyThreshold();
}
}
public void run() {
String threadName = Thread.currentThread().getName();
getNodeEnergyCapacity();
int charge, meanTemperature, meanFan = 0;
try (ServerSocket serverSocket = new ServerSocket(SERVER_ENERGY_PORT)) {
while (!Thread.interrupted()) {
System.out.println(threadName + " Server is listening on port " + SERVER_ENERGY_PORT);
charge = getCharge();
meanTemperature = calculateAverage(getTemperatures());
meanFan = calculateAverage(getFans());
String[] command = new String[3];
command[0] = "probes/probeEnergy";
command[1] = String.valueOf(activeWorkers.getWorkersAddress());
command[2] = String.valueOf(SERVER_ENERGY_PORT);
if (charge == -1) {
System.out.println("Powered by eletric current");
} else if(charge < energyCapacity){
NFRViolation violation = new NFRViolation("energy", "charge", charge);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + charge);
} /*else if(meanTemperature < energyCapacity){{
NFRViolation violation = new NFRViolation("energy", "temperature", meanTemperature);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + meanTemperature);
} else if(meanFan < energyCapacity){{
NFRViolation violation = new NFRViolation("energy", "fan", meanFan);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + meanFan);
}*/
Process p = Runtime.getRuntime().exec(command);
System.out.println(threadName + " Probe starts running");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted. Aborting...");
break;
System.out.println(threadName + " Server waiting" + " for the connection of " + threadName + " Probe");
socketEnergy = serverSocket.accept();
System.out.println(threadName + " Probe is connected!");
// Send Time's Socket to thread Resource Manager for future updates in Time
// Monitor Probe
rm.addSocketEnergy(socketEnergy);
try {
System.out.println(threadName + " waiting for metrics...");
String inputLine = "";
BufferedReader in = new BufferedReader(new InputStreamReader(socketEnergy.getInputStream()));
while ((inputLine = in.readLine()) != null) {
JSONObject energy = new JSONObject(inputLine);
activeWorkers.energyWait();
float totalEnergyConsumed = 0.0f;
// This and next verification are necessary as the probe may have already sent metrics
// against Workers that have been disabled in the meantime
if(activeWorkers.size() > 0) {
activeWorkers.waitingToUpdateEnergy();
for (Integer pid : activeWorkers.workerPids()) {
if (activeWorkers.isWorkerActive(pid)) {
float cpuUsage = activeWorkers.getWorkerCPUUsage(pid);
totalEnergyConsumed = energy.getFloat("pkg");
System.out.println(totalEnergyConsumed);
// TODO: Ver melhor como é o threashold para energia
activeWorkers.addEnergyConsumed(pid, "power",cpuUsage*100*totalEnergyConsumed);
}
}
// If so, check the Node capacity
getNodeEnergyCapacity();
System.out.printf("\nNode Energy consumed/capacity : %.2f / %.2f\n\n", totalEnergyConsumed,
energyThreshold);
if (totalEnergyConsumed > energyThreshold) {
System.out.println(threadName + " added time violation to queue: " + totalEnergyConsumed);
NFRViolation violation = new NFRViolation("energy", "energyload", totalEnergyConsumed);
queue.addNFRViolation(violation);
}
}
}
} catch (IOException e) {
System.out.println(threadName + " failed reading from Time Monitor Probe");
e.printStackTrace();
}
} catch (Exception e) {
socketEnergy.close();
System.out.println(threadName + " Socket exception: " + e.getMessage());
e.printStackTrace();
}
} catch (IOException e) {
System.out.println(threadName + " Server exception: " + e.getMessage());
e.printStackTrace();
}
}
......
......@@ -30,10 +30,6 @@ import es.bsc.dataclay.exceptions.metadataservice.ObjectNotRegisteredException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONObject;
......@@ -42,8 +38,8 @@ import org.json.JSONObject;
*/
public class NFRTool {
private static final int CPU_CAPACITY = 80;
private static final int ENERGY_CAPACITY = 80;
private static final float CPU_THRESHOLD = 0.8f;
private static final float ENERGY_THRESHOLD = 20.0f; //Watts
public static ElasticSystem createNewElasticSystem(String localIP) throws Exception{
......@@ -52,21 +48,25 @@ public class NFRTool {
String applicationName = "testApp";
// Create application
COMPSsApplication app = new COMPSsApplication(applicationName,"1.0");
int monitoringPeriod = 3000;
COMPSsApplication app = new COMPSsApplication(applicationName,"1.0",monitoringPeriod);
system.addApplication(app);
// Create node1 and persist it
String nodeName = localIP;
Node localhost = new Node(nodeName, CPU_CAPACITY, ENERGY_CAPACITY, 30);
String ipWifi = "localhost", ipEth = "localhost", ipLte = "localhost";
float signalWifi = (float) 0.0;
int numCores = Runtime.getRuntime().availableProcessors();
Node localhost = new Node(ipWifi, ipEth, ipLte, CPU_THRESHOLD, ENERGY_THRESHOLD, signalWifi, numCores);
localhost.makePersistent(nodeName);
system.addNode(localhost);
// Get node by alias
Node node = Node.getByAlias(nodeName);
// Create master
Master master = new Master(node, 111);
Master master = new Master(node, 111, localIP);
master.setApplication(app);
master.makePersistent();
......@@ -115,9 +115,9 @@ public class NFRTool {
System.out.println("Deployed ELASTIC system:");
system.getApplications().forEach(app -> {
System.out.println(String.format("App: %s", app.getName()));
System.out.println(String.format("\tMaster: %s with PID %d", app.getMaster().getNode().getName(), app.getMaster().getPid()));
System.out.println(String.format("\tMaster in %s with PID %d", app.getMaster().getNode().getIpWifi(), app.getMaster().getPid()));
System.out.println("\tWorkers:");
app.getWorkers().forEach(w -> System.out.println(String.format("\t\t- %s (PID: %d), active = %b", w.getAddress(), w.getPid(), w.isActive())));
app.getWorkers().forEach(w -> System.out.println(String.format("\t\t- %s (PID: %d), active = %b", w.getIp(), w.getPid(), w.isActive())));
System.out.println();
});
......@@ -131,7 +131,7 @@ public class NFRTool {
system.getApplications().forEach(app -> {
app.getWorkers().forEach(worker -> {
if(worker.getAddress().equals(localIP)){
if(worker.getIp().equals(localIP)){
workerList.add(worker);
}
});
......@@ -194,9 +194,9 @@ public class NFRTool {
COMPSsApplication app = system.getApplications().get(0);
// Create workers
Worker worker1 = new Worker(node, pid1 ,true, app);
Worker worker2 = new Worker(node, pid2 ,true, app);
Worker worker3 = new Worker(node, pid3 ,true, app);
Worker worker1 = new Worker(node, pid1 ,true, app,0,0,localIP,0.0f);
Worker worker2 = new Worker(node, pid2 ,true, app,0,0,localIP,0.0f);
Worker worker3 = new Worker(node, pid3 ,true, app,0,0,localIP,0.0f);
for (Worker w : app.getWorkers()) {
w.setActive(false);
......@@ -207,7 +207,6 @@ public class NFRTool {
app.addWorker(worker3);
////////////////////////////////////////////
List<Worker> workerList = getWorkersToMonitor(system,localIP);
System.out.println("Workers running locally:");
workerList.forEach(worker -> System.out.println("Worker: " + worker.getPid()
......@@ -220,31 +219,26 @@ public class NFRTool {
ResourceManager resourceManager = new ResourceManager(queue, metricsActiveWorkersList);
Thread rmThread = new Thread(resourceManager,"Resource Manager");
ActivationWorkersManager activationWorkersManager = new ActivationWorkersManager(resourceManager, metricsActiveWorkersList);
Thread awmThread = new Thread(activationWorkersManager,"Activation Workers Manager");
TimeMonitor timeMonitor = new TimeMonitor(resourceManager, queue, metricsActiveWorkersList);
Thread tmThread = new Thread(timeMonitor,"Time Monitor");
// EnergyMonitor energyMonitor = new EnergyMonitor(activeWorkersManager, queue, workerList);
// Thread emThread = new Thread(energyMonitor,"Energy Monitor");
EnergyMonitor energyMonitor = new EnergyMonitor(resourceManager, queue, metricsActiveWorkersList);
Thread emThread = new Thread(energyMonitor,"Energy Monitor");
rmThread.start();
tmThread.start();
//emThread.start();
Thread.sleep(25000);
System.out.println("\nWorker " + worker1.getPid()+" will be activated, for some reason...");
worker1.setActive(true);
resourceManager.updateActiveWorkersEverywhere();
Thread.sleep(10000);
System.out.println("\nWorker " + worker1.getPid()+" will be activated, for some reason...");
worker1.setActive(true);
resourceManager.updateActiveWorkersEverywhere();
emThread.start();
awmThread.start();
rmThread.join();
tmThread.join();
//emThread.join();
emThread.join();
awmThread.join();
exitGracefully();
}
......
......@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class ResourceManager implements Runnable {
private NFRViolationQueue queue;
......@@ -34,7 +33,6 @@ public class ResourceManager implements Runnable {
ResourceManager(NFRViolationQueue queue, ActiveWorkersMap activeWorkers) {
this.queue = queue;
// this.workerList = workerList;
this.activeWorkers = activeWorkers;
}
......@@ -64,7 +62,7 @@ public class ResourceManager implements Runnable {
}
}
public void updateActiveWorkersEverywhere() { //TODO: mudar para private
public void updateActiveWorkersEverywhere() {
// Update the map
activeWorkers.updateActiveWorkersMetricsMap();
// Update the probes
......@@ -76,13 +74,11 @@ public class ResourceManager implements Runnable {
writerTime = new PrintWriter(socketTime.getOutputStream(), true);
writerTime.printf("%d,%s", activeWorkers.size(), cleanList);
System.out.printf("Active Workers PID list -> %s\n", cleanList);
/*
* writerEnergy = new PrintWriter(socketEnergy.getOutputStream(), true);
* writerEnergy.printf("%d,%s", activeWorkers.size(), cleanList);
* System.out.println("List updated in probe energy");
*/
writerEnergy = new PrintWriter(socketEnergy.getOutputStream(), true);
writerEnergy.printf("%d,%s", activeWorkers.size(), cleanList);
System.out.printf("Active Workers PID list -> %s\n", cleanList);
} catch (IOException e) {
System.out.println(Thread.currentThread().getName() + " failed updating PID list!!");
......@@ -91,7 +87,7 @@ public class ResourceManager implements Runnable {
}
public void run() {
// As Resource Manager changes the active workers, it is also responsible to send the
// As Resource Manager changes the active workers, it is also responsible to send the
// actual active workers to probes so, RM waits for TM and EM to get the sockets
do {
try {
......@@ -99,11 +95,8 @@ public class ResourceManager implements Runnable {
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(Thread.currentThread().getName() + " waiting for the Time
// and Energy Monitor Socket");
System.out.println(Thread.currentThread().getName() + " waiting for the Time Monitor Socket");
} while (this.socketTime == null/* && !socketEnergy.isConnected() */);
System.out.println(Thread.currentThread().getName() + " waiting for the Time and Energy Monitor Socket");
} while (this.socketTime == null || this.socketEnergy == null);
updateActiveWorkersEverywhere();
......
......@@ -22,19 +22,17 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
import es.bsc.compss.nfr.model.*;
public class TimeMonitor implements Runnable {
private ResourceManager rm;
private NFRViolationQueue queue;
private ActiveWorkersMap activeWorkers;
private int cpuCapacity;
private float cpuThreshold;
private Socket socketTime;
private static final int SERVER_TIME_PORT = 8686;
......@@ -43,14 +41,14 @@ public class TimeMonitor implements Runnable {
this.rm = rm;
this.queue = queue;
this.activeWorkers = activeWorkersMetrics;
cpuCapacity = 0;
cpuThreshold = (float) 0.0;
}
private void getNodeCpuCapacity() {
private void getNodeCpuThreshold() {
if (activeWorkers.size() > 0) {
int workerPid = activeWorkers.getFirstActiveWorkerPid();
Worker worker = activeWorkers.getWorkerByPid(workerPid);
cpuCapacity = worker.getNode().getCPUCapacity();
cpuThreshold = worker.getNode().getCPUThreshold();
}
}
......@@ -61,14 +59,14 @@ public class TimeMonitor implements Runnable {
System.out.println(threadName + " Server is listening on port " + SERVER_TIME_PORT);
/* String[] command = new String[3];
String[] command = new String[3];
command[0] = "probes/probeTime";
command[1] = String.valueOf(activeWorkers.getWorkersAddress());
command[2] = String.valueOf(SERVER_TIME_PORT);
Process p = Runtime.getRuntime().exec(command);
System.out.println(threadName + " Probe starts running"); */
System.out.println(threadName + " Probe starts running");