Commit 828e25c7 authored by Luís Nogueira

version with developments on CPU probes and dataclay model

parent 8afda64a
<factorypath>
<factorypathentry kind="VARJAR" id="M2_REPO/es/bsc/compss/nfrtool-dataclay-stubs/2.0/nfrtool-dataclay-stubs-2.0.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/es/bsc/dataclay/dataclay/2.1/dataclay-2.1.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/javax/annotation/javax.annotation-api/1.3.2/javax.annotation-api-1.3.2.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/io/grpc/grpc-netty-shaded/1.25.0/grpc-netty-shaded-1.25.0.jar" enabled="true" runInBatchMode="false"/>
......
......@@ -4,7 +4,11 @@
/*
 * Fake worker entry point: loops forever, printing its own PID and then
 * running a multiplication loop before pausing again.
 */
int main(){
for(;;){
printf("Worker %d is running...\n", getpid());
sleep(1);
/* The product below is never read afterwards; presumably it only exists to
 * generate CPU load for the monitors -- confirm. */
int n=500000, i;
unsigned long long fact = 1;
for (i = 1; i <= n; ++i) {
fact *= i;
}
/* NOTE(review): both sleep(1) above and usleep(1) here run on every
 * iteration as written -- confirm that both pauses are intended. */
usleep(1);
}
}
#!/bin/bash
# Battery/power probe: prints the battery charge percentage and the current
# power consumption as {charge:<n>,consumption:<value>}.
# NOTE(review): several reads and one echo appear twice below, as if two
# revisions of the script were interleaved -- confirm which lines should stay.

# Raw BAT0 readings; these run before the directory check further down.
energy_now=`cat /sys/class/power_supply/BAT0/energy_now`
energy_full=`cat /sys/class/power_supply/BAT0/energy_full`
power_now=`cat /sys/class/power_supply/BAT0/power_now`
# Set to true once the battery branch has produced a consumption value.
consumption_filled=false
# Charge as an integer percentage of energy_full.
charge=`echo $energy_now $energy_full | awk '{ printf("%d\n", $1/$2 * 100) }'`
#echo "Charge: "$charge"%"
# Battery present: derive charge and consumption from the BAT0 entries.
if [ -d "/sys/class/power_supply/BAT0" ]
then
energy_now=`cat /sys/class/power_supply/BAT0/energy_now`
energy_full=`cat /sys/class/power_supply/BAT0/energy_full`
# power_now scaled by 10^-6 and truncated to an integer
# (presumably microwatts to watts -- confirm units).
consumption=`echo $power_now | awk '{ printf("%d\n", $1*10^-6) }'`
#echo "Power consumption: "$consumption"W"
charge=`echo $energy_now $energy_full | awk '{ printf("%d\n", $1/$2 * 100)}'`
# First report line; a second report is printed at the end of the script.
echo "{charge:"$charge",consumption:"$consumption}
status=$(cat /sys/class/power_supply/BAT0/status)
# Only while discharging is power_now taken as the consumption figure.
if [ $status = "Discharging" ]
then
power_now=`cat /sys/class/power_supply/BAT0/power_now`
# Same scaling as above, with a "W" suffix appended to the value.
consumption=`echo $power_now | awk '{ printf("%dW\n", $1*10^-6) }'`
consumption_filled=true
fi
else
# No BAT0 directory: charge is reported as -1.
charge=`echo -1`
fi
# Fallback when the battery branch did not set a consumption value: take the
# numbers from the "Power" lines of likwid-powermeter and add the first three.
if [ "$consumption_filled" = false ]
then
CPU=`likwid-powermeter | grep Power | grep -Eo '[0-9]{1,}.[0-9]{1,}'`
# First, second and third whitespace-separated numbers respectively.
PKG=`echo ""$CPU"" | { read a _; echo "$a"; }`
PP0=`echo ""$CPU"" | { read _ a _; echo "$a"; }`
PP1=`echo ""$CPU"" | { read _ _ a _; echo "$a"; }`
consumption=`echo $PKG $PP0 $PP1 | awk '{printf("%dW", $1+$2+$3)}'`
fi
echo "{charge:"$charge",consumption:"$consumption"}"
# Memory-counter probe: counts the mem-loads and mem-stores perf events over a
# two-second window and prints them as {<name>:<count>,<name>:<count>}.
# The two commented commands below change kernel.perf_event_paranoid
# (persistently and for the running kernel); they are kept for reference only.
#sudo sh -c 'echo "kernel.perf_event_paranoid = -1" >> /etc/sysctl.conf'
#sudo sh -c 'echo -1 >/proc/sys/kernel/perf_event_paranoid'
# perf runs "sleep 2" as the measured command with comma-separated output;
# stdout and stderr of the subshell both end up in mem_counters.log.
(perf stat -e mem-loads,mem-stores -x, sleep 2) > mem_counters.log 2>&1
# Row 1 opens the object and row 2 closes it; field 3 is used as the key and
# field 1 as the integer value.
awk -F "," 'NR==1{printf "{%s:%d,",$3,$1} NR==2{printf "%s:%d}\n",$3,$1}' mem_counters.log
#!/bin/bash
# Prints the memory share (ps "pmem" column) of the process whose PID is passed
# as the first argument, multiplied by 100 and truncated to an integer.
# No trailing newline is written.
# The argument is quoted so it reaches ps as a single word.
ps -o pmem= -p "$1" | awk '{printf("%d",$1 * 100)}'
#!/bin/bash
# Prints {memutilization:<pct>} where <pct> is column 3 divided by column 2 of
# the "Mem" row of `free -t`, as a percentage with two decimals.
# The format string and its argument are passed inside one printf(...) call,
# and the row filter is done by awk itself instead of a separate grep.
memutilization=`free -t | awk '/Mem/ {printf("%.2f", $3/$2*100)}'`
echo "{memutilization:"$memutilization"}"
#!/bin/bash
# Prints one {pmem:<pct>,pid:<pid>,cmd:<word>} line per process for the first
# ten rows of `ps` output, sorted numerically by the pmem column in descending
# order. Only the third whitespace-separated field is printed as cmd, i.e. the
# first word of the command line.
ps -eo pmem,pid,cmd | sort -k1 -r -n | head -10|awk '{printf("{pmem:%.2f,pid:%d,cmd:%s}\n",$1,$2,$3)}'
This diff is collapsed.
#!/bin/bash
# Prints the fan readings reported by `sensors` as a single line of the form
# {fan1:<v1>,fan2:<v2>,...}. Keys are numbered from 1 in output order.
# When `sensors` reports no fan line the script prints {} so the consumer
# still receives a well-formed object (previously it printed {fan1:}).
fans=`sensors | grep fan | awk '{print $2}'`
json="{"
i=0
# $fans is intentionally unquoted: each whitespace-separated reading becomes
# one loop item.
for speed in $fans
do
    i=$((i+1))
    if [ "$i" -gt 1 ]; then
        json=$json","
    fi
    json=$json"fan$i:"$speed
done
echo $json"}"
#!/bin/bash
# Prints the per-core temperatures reported by `sensors` as a single line of
# the form {core0:<t0>,core1:<t1>,...}. Keys are numbered from 0 in output order.
# When no reading is found the script prints {} instead of an unterminated "{".
temperatures=`sensors | grep Core | awk '{print $3}'`
# Keep only the decimal number of each reading. The fractional part must be
# matched with [0-9]; the former [0.9] only accepted the characters 0, 9 and
# '.', so readings such as 47.5 were silently dropped.
temp=`echo $temperatures | grep -Eo '[0-9]+[.][0-9]+'`
json="{"
i=0
# $temp is intentionally unquoted: each reading becomes one loop item.
for value in $temp
do
    if [ "$i" -gt 0 ]; then
        json=$json","
    fi
    json=$json"core$i:"$value
    i=$((i+1))
done
echo $json"}"
package app;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
/**
 * Keeps the list of known workers together with the most recent metrics
 * collected for every worker that is currently active.
 *
 * <p>Metrics are stored per worker PID in a {@link ConcurrentHashMap}. The
 * wait/notify pair ({@link #waitForWorkersActivation()} and
 * {@link #notifyActivationWorkers()}) uses this object's own monitor.
 */
public class ActiveWorkersMap {

    private final List<Worker> workerList;
    private final Map<Integer, JSONObject> activeWorkersMetricsMap;

    /**
     * @param workerList workers to track; the list is used as-is (not copied)
     */
    ActiveWorkersMap(List<Worker> workerList) {
        this.workerList = workerList;
        this.activeWorkersMetricsMap = new ConcurrentHashMap<>();
    }

    /** Wakes every thread blocked in {@link #waitForWorkersActivation()}. */
    public synchronized void notifyActivationWorkers() {
        notifyAll();
    }

    /**
     * Blocks the caller until {@link #notifyActivationWorkers()} is invoked.
     *
     * <p>Only this object's monitor is held while waiting; no other lock is
     * kept during the wait. If the thread is interrupted, the method returns
     * with the interrupt flag set again so callers that poll
     * {@code Thread.interrupted()} can still observe it.
     */
    public synchronized void waitForWorkersActivation() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /** @return number of known workers, active or not */
    public int allWorkersSize() {
        return workerList.size();
    }

    /**
     * @return address of the first known worker
     * @throws IndexOutOfBoundsException if there are no workers
     */
    public String getWorkersAddress() {
        return workerList.get(0).getAddress();
    }

    /**
     * Marks every worker with the given PID as inactive.
     *
     * @param pidWorker PID of the worker(s) to deactivate
     */
    public void inactiveWorker(int pidWorker) {
        for (Worker worker : workerList) {
            if (worker.getPid() == pidWorker) {
                worker.setActive(false);
            }
        }
    }

    /** @return number of workers that currently have a metrics entry */
    public int size() {
        return activeWorkersMetricsMap.size();
    }

    /** @return live view of the PIDs that currently have a metrics entry */
    public Set<Integer> workerPids() {
        return activeWorkersMetricsMap.keySet();
    }

    /** @return live view of the stored metrics objects */
    public Collection<JSONObject> allMetrics() {
        return activeWorkersMetricsMap.values();
    }

    /** Replaces the metrics of every tracked PID with a fresh, empty object. */
    public void clearAllMetrics() {
        activeWorkersMetricsMap.replaceAll((pid, previous) -> new JSONObject());
    }

    /** @return {@code true} when no stored metrics object is empty */
    public boolean areAllMetricsFilled() {
        for (JSONObject metrics : allMetrics()) {
            if (metrics.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param workerPid PID to look up
     * @return the first worker with that PID, or {@code null} if none matches
     */
    public Worker getWorkerByPid(int workerPid) {
        for (Worker worker : workerList) {
            if (worker.getPid() == workerPid) {
                return worker;
            }
        }
        return null;
    }

    /**
     * Stores the metrics for a worker.
     *
     * @return the metrics previously stored for that PID, or {@code null}
     */
    public JSONObject put(int workerPid, JSONObject metrics) {
        return activeWorkersMetricsMap.put(workerPid, metrics);
    }

    /** @return {@code true} when the PID currently has a metrics entry */
    public boolean isWorkerActive(int workerPid) {
        return activeWorkersMetricsMap.containsKey(workerPid);
    }

    /**
     * @param workerPid PID of a tracked worker
     * @return the stored {@code "cpu-clock"} value multiplied by 100
     * @throws NullPointerException if the PID has no metrics entry
     */
    public float getWorkerCPUUsage(int workerPid) {
        return activeWorkersMetricsMap.get(workerPid).getFloat("cpu-clock") * 100;
    }

    /**
     * @return a PID that currently has a metrics entry (the first one the map
     *         iterates), or {@code 0} when the map is empty
     */
    public int getFirstActiveWorkerPid() {
        // Iterating instead of size()+next() avoids failing when the last
        // entry is removed between the two calls.
        for (Integer pid : activeWorkersMetricsMap.keySet()) {
            return pid;
        }
        return 0;
    }

    /**
     * @return PID of the tracked worker with the lowest CPU usage (the first
     *         one encountered on ties), or {@code 0} when no worker is tracked
     */
    public int getMinCPUUsageWorkerPid() {
        Integer minPid = null;
        float minUsage = 0;
        // Entries are read directly so each usage value is fetched once and a
        // concurrent removal cannot turn into a null lookup.
        for (Map.Entry<Integer, JSONObject> entry : activeWorkersMetricsMap.entrySet()) {
            float usage = entry.getValue().getFloat("cpu-clock") * 100;
            if (minPid == null || usage < minUsage) {
                minPid = entry.getKey();
                minUsage = usage;
            }
        }
        return minPid == null ? 0 : minPid;
    }

    /** @return sum over all tracked workers of {@code "cpu-clock"} times 100 */
    public float getTotalCPUUsage() {
        float sum = 0;
        for (JSONObject metrics : activeWorkersMetricsMap.values()) {
            sum += metrics.getFloat("cpu-clock") * 100;
        }
        return sum;
    }

    /** @return PIDs of the workers in the list whose active flag is set */
    private List<Integer> getActualActiveWorkersPid() {
        List<Integer> activeWorkersPids = new ArrayList<>();
        for (Worker worker : workerList) {
            if (worker.isActive()) {
                activeWorkersPids.add(worker.getPid());
            }
        }
        return activeWorkersPids;
    }

    /**
     * Synchronises the metrics map with the workers' active flags: entries of
     * workers that are no longer active are dropped, newly active workers get
     * an empty metrics object, and existing metrics are left untouched.
     *
     * <p>If the map was empty when the method started, waiting threads are
     * notified after the map has been updated, so they observe the new
     * entries when they wake up.
     */
    public void updateActiveWorkersMetricsMap() {
        List<Integer> activeWorkersPids = getActualActiveWorkersPid();
        boolean wasEmpty = activeWorkersMetricsMap.isEmpty();

        activeWorkersMetricsMap.keySet().retainAll(activeWorkersPids);
        for (Integer pid : activeWorkersPids) {
            activeWorkersMetricsMap.computeIfAbsent(pid, key -> new JSONObject());
        }

        if (wasEmpty) {
            notifyActivationWorkers();
        }
    }
}
\ No newline at end of file
......@@ -24,18 +24,33 @@ import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
public class EnergyMonitor implements Runnable {
private NFRViolationQueue queue;
private List<Worker> workerList;
private int energyCapacity;
//private static final int SERVER_ENERGY_PORT = 8687;
EnergyMonitor(NFRViolationQueue queue, List<Worker> workerList) {
this.queue = queue;
this.workerList = workerList;
energyCapacity = 0;
}
/**
 * Computes the integer mean of the given values.
 *
 * @param list values to average; must not contain {@code null} elements
 * @return the sum divided by the element count using integer division,
 *         or {@code 0} when the list is empty
 */
private int calculateAverage(List<Integer> list) {
    if (list.isEmpty()) {
        return 0;
    }
    // A primitive long accumulator avoids re-boxing on every iteration and
    // cannot overflow for any list of int values.
    long sum = 0;
    for (int value : list) {
        sum += value;
    }
    return (int) (sum / list.size());
}
private int getCharge(){
int charge = 0;
......@@ -57,6 +72,57 @@ public class EnergyMonitor implements Runnable {
return charge;
}
/**
 * Runs the temperatures probe script and collects its values.
 *
 * @return one integer per key of the probe's JSON output; empty when the
 *         probe fails or prints nothing
 */
private List<Integer> getTemperatures(){
    return readProbeValues("probes/temp_fan/temperaturesprobe.sh");
}

/**
 * Runs the fans probe script and collects its values.
 *
 * @return one integer per key of the probe's JSON output; empty when the
 *         probe fails or prints nothing
 */
private List<Integer> getFans(){
    return readProbeValues("probes/temp_fan/fansprobe.sh");
}

/**
 * Executes a probe script, parses the first line of its standard output as a
 * JSON object and returns every value of that object as an int.
 *
 * <p>Failures are reported on standard error and never propagate; values
 * read before a failure are still returned.
 *
 * @param probeScript command to execute
 * @return the values found, possibly empty
 */
private List<Integer> readProbeValues(String probeScript) {
    List<Integer> values = new ArrayList<>();
    try {
        Process p = Runtime.getRuntime().exec(probeScript);
        // try-with-resources closes the pipe to the child process even when
        // parsing fails, instead of leaving it open until garbage collection.
        try (BufferedReader stdInput = new BufferedReader(new
                InputStreamReader(p.getInputStream()))) {
            String output = stdInput.readLine();
            if (output == null) {
                // The probe printed nothing; report it instead of failing
                // inside the JSON parser.
                System.err.println("Probe produced no output: " + probeScript);
                return values;
            }
            JSONObject jsonObject = new JSONObject(output);
            Iterator<String> keys = jsonObject.keys();
            while (keys.hasNext()) {
                values.add(jsonObject.getInt(keys.next()));
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return values;
}
private void getNodeEnergyCapacity(){
if(workerList.size() > 0){
Worker worker = workerList.get(0);
......@@ -68,16 +134,29 @@ public class EnergyMonitor implements Runnable {
getNodeEnergyCapacity();
int charge = 0;
int charge, meanTemperature, meanFan = 0;
while (!Thread.interrupted()) {
charge = getCharge();
if(charge < energyCapacity){
meanTemperature = calculateAverage(getTemperatures());
meanFan = calculateAverage(getFans());
if (charge == -1) {
System.out.println("Powered by eletric current");
} else if(charge < energyCapacity){
NFRViolation violation = new NFRViolation("energy", "charge", charge);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + charge);
}
} /*else if(meanTemperature < energyCapacity){{
NFRViolation violation = new NFRViolation("energy", "temperature", meanTemperature);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + meanTemperature);
} else if(meanFan < energyCapacity){{
NFRViolation violation = new NFRViolation("energy", "fan", meanFan);
queue.addNFRViolation(violation);
System.out.println(Thread.currentThread().getName() + " added energy violation to queue: " + meanFan);
}*/
try {
......@@ -89,4 +168,4 @@ public class EnergyMonitor implements Runnable {
}
}
}
\ No newline at end of file
}
......@@ -22,7 +22,6 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import es.bsc.dataclay.api.DataClay;
import es.bsc.compss.nfr.model.*;
......@@ -31,6 +30,10 @@ import es.bsc.dataclay.exceptions.metadataservice.ObjectNotRegisteredException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONObject;
......@@ -39,6 +42,9 @@ import java.util.List;
*/
public class NFRTool {
private static final int CPU_CAPACITY = 80;
private static final int ENERGY_CAPACITY = 80;
public static ElasticSystem createNewElasticSystem(String localIP) throws Exception{
ElasticSystem system = new ElasticSystem();
......@@ -51,7 +57,7 @@ public class NFRTool {
// Create node1 and persist it
String nodeName = localIP;
Node localhost = new Node(nodeName, 100, 80, 30);
Node localhost = new Node(nodeName, CPU_CAPACITY, ENERGY_CAPACITY, 30);
localhost.makePersistent(nodeName);
system.addNode(localhost);
......@@ -67,29 +73,35 @@ public class NFRTool {
// Set master of application
app.setMaster(master);
int pid1 = 0, pid2 = 0;
// TODO: DESCOMENTAR
// int pid1 = 0, pid2 = 0, pid3 = 0;
// Execute fake workers to monitor
try {
// // Execute fake workers to monitor
// try {
Process p1 = Runtime.getRuntime().exec("fake_worker/fakeworker");
pid1 = (int) p1.pid();
// Process p1 = Runtime.getRuntime().exec("fake_worker/fakeworker");
// pid1 = (int) p1.pid();
Process p2 = Runtime.getRuntime().exec("fake_worker/fakeworker");
pid2 = (int) p2.pid();
// Process p2 = Runtime.getRuntime().exec("fake_worker/fakeworker");
// pid2 = (int) p2.pid();
// Process p3 = Runtime.getRuntime().exec("fake_worker/fakeworker");
// pid3 = (int) p3.pid();
} catch (Exception e) {
e.printStackTrace();
}
// } catch (Exception e) {
// e.printStackTrace();
// }
// Create workers
Worker worker1 = new Worker(node, pid1 ,true, app);
Worker worker2 = new Worker(node, pid2 ,true, app);
// Add worker to application
app.addWorker(worker1);
app.addWorker(worker2);
// // Create workers
// Worker worker1 = new Worker(node, pid1 ,true, app);
// Worker worker2 = new Worker(node, pid2 ,true, app);
// Worker worker3 = new Worker(node, pid3 ,true, app);
// // Add worker to application
// app.addWorker(worker1);
// app.addWorker(worker2);
// app.addWorker(worker3);
// Print application information
System.out.println("=== Application information ===");
......@@ -159,31 +171,80 @@ public class NFRTool {
system = createNewElasticSystem(localIP);
}
// TODO: APAGAR
int pid1 = 0, pid2 = 0, pid3 = 0;
// Execute fake workers to monitor
try {
Process p1 = Runtime.getRuntime().exec("fake_worker/fakeworker1");
pid1 = (int) p1.pid();
Process p2 = Runtime.getRuntime().exec("fake_worker/fakeworker2");
pid2 = (int) p2.pid();
Process p3 = Runtime.getRuntime().exec("fake_worker/fakeworker3");
pid3 = (int) p3.pid();
} catch (Exception e) {
e.printStackTrace();
}
Node node = system.getNodes().get(0);
COMPSsApplication app = system.getApplications().get(0);
// Create workers
Worker worker1 = new Worker(node, pid1 ,true, app);
Worker worker2 = new Worker(node, pid2 ,true, app);
Worker worker3 = new Worker(node, pid3 ,true, app);
for (Worker w : app.getWorkers()) {
w.setActive(false);
}
// Add worker to application
app.addWorker(worker1);
app.addWorker(worker2);
app.addWorker(worker3);
////////////////////////////////////////////
List<Worker> workerList = getWorkersToMonitor(system,localIP);
System.out.println("Workers running locally:");
workerList.forEach(worker -> System.out.println("Worker: " + worker.getPid()));
workerList.forEach(worker -> System.out.println("Worker: " + worker.getPid()
+ (worker.isActive()? " active":" inactive")));
NFRViolationQueue queue = new NFRViolationQueue();
ResourceManager resourceManager = new ResourceManager(queue, workerList);
Thread rmThread = new Thread(resourceManager,"Resource Manager");
ActiveWorkersMap metricsActiveWorkersList = new ActiveWorkersMap(workerList);
TimeMonitor timeMonitor = new TimeMonitor(queue, workerList);
ResourceManager resourceManager = new ResourceManager(queue, metricsActiveWorkersList);
Thread rmThread = new Thread(resourceManager,"Resource Manager");
TimeMonitor timeMonitor = new TimeMonitor(resourceManager, queue, metricsActiveWorkersList);
Thread tmThread = new Thread(timeMonitor,"Time Monitor");
EnergyMonitor energyMonitor = new EnergyMonitor(queue, workerList);
Thread emThread = new Thread(energyMonitor,"Energy Monitor");
// EnergyMonitor energyMonitor = new EnergyMonitor(activeWorkersManager, queue, workerList);
// Thread emThread = new Thread(energyMonitor,"Energy Monitor");
rmThread.start();
tmThread.start();
emThread.start();
//emThread.start();
Thread.sleep(25000);
System.out.println("\nWorker " + worker1.getPid()+" will be activated, for some reason...");
worker1.setActive(true);
resourceManager.updateActiveWorkersEverywhere();
Thread.sleep(10000);
System.out.println("\nWorker " + worker1.getPid()+" will be activated, for some reason...");
worker1.setActive(true);
resourceManager.updateActiveWorkersEverywhere();
rmThread.join();
tmThread.join();
emThread.join();
//emThread.join();
exitGracefully();
}
......
......@@ -20,9 +20,9 @@ package app;
public class NFRViolation{
private String dimension;
private String attribute;
private int value;
private float value;
NFRViolation(String dimension, String attribute, int value){
NFRViolation(String dimension, String attribute, float value){
this.dimension = dimension;
this.attribute = attribute;
this.value = value;
......@@ -36,7 +36,7 @@ public class NFRViolation{
return this.attribute;
}
public int getValue(){
public float getValue(){
return this.value;
}
......
......@@ -17,112 +17,108 @@
package app;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import es.bsc.compss.nfr.model.Worker;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class ResourceManager implements Runnable {
private NFRViolationQueue queue;
private List<Worker> workerList;
private ActiveWorkersMap activeWorkers;
private Socket socketTime;
private Socket socketEnergy;
ResourceManager(NFRViolationQueue queue, List<Worker> workerList) {
private final int PERIOD = 100; // milliseconds
ResourceManager(NFRViolationQueue queue, ActiveWorkersMap activeWorkers) {
this.queue = queue;
this.workerList = workerList;
// this.workerList = workerList;
this.activeWorkers = activeWorkers;
}
private List<Worker> getActiveWorkers(List<Worker> workerList){
ArrayList<Worker> activeWorkers = new ArrayList<Worker>(0);
workerList.forEach(w -> {
if(w.isActive())
activeWorkers.add(w);
});
public synchronized void addSocketTime(Socket s) {
this.socketTime = s;
}
return activeWorkers;
public synchronized void addSocketEnergy(Socket s) {
this.socketEnergy = s;
}
private int getWorkerCPUUSage(Worker w){
int pcpu = 0;
private void actOnNFRViolation(NFRViolation violation) {
System.out.println(
Thread.currentThread().getName() + " acting on detected NFR violation: " + violation.getAttribute());
try {
String[] command = new String[2];