Commit 3244ba97 authored by Rita Sousa's avatar Rita Sousa

Updated Time and Energy monitoring with periodic writing on Dataclay

parent d4179d4c
Go to /dataclay directory and initialize dataclay
(Start dataClay)
$ ./start_dataclay.sh
(Start dataClay)
```
./start_dataclay.sh
```
(Register model located in `model` folder into started dataClay instance)
$ ./register_model.sh
```
./register_model.sh
```
(Get stubs of your registered model)
$ ./get_stubs.sh
```
./get_stubs.sh
```
Go to /app directory
(To be able to compile the app outside a container, install the stubs in the local maven repository)
$ ./install_mvn_stubs.sh
(To be able to compile the app outside a container, install the stubs in the local maven repository)
```
./install_mvn_stubs.sh
```
Finnaly, run the demo which includes Time and Energy requirements
$ ./run_nfrtool.sh
\ No newline at end of file
```
./run_nfrtool.sh
```
OR
Simply run located on /app directory:
```
cd ../dataclay/ ; ./stop_dataclay.sh ; ./clean.sh ; ./start_dataclay.sh ; ./register_model.sh ; ./get_stubs.sh ; cd ../app/ ; ./install_mvn_stubs.sh
./run_nfrtool.sh
```
Warnings:
The Java version has to be greater than 8
See https://docs.oracle.com/javase/9/docs/api/java/lang/Process.html#pid--
The metrics are measured through perf_event_open so the appropriate package must be installed depending on the environment. You should try to use perf from the command line and see what packages are required and then install them.
Before run NFRTool, check the value in /proc/sys/kernel/perf_event_paranoid and change to -1
```
cat /proc/sys/kernel/perf_event_paranoid
sudo sysctl -w kernel.perf_event_paranoid=-1
```
<factorypath>
<factorypathentry kind="VARJAR" id="M2_REPO/es/bsc/compss/nfrtool-dataclay-stubs/2.0/nfrtool-dataclay-stubs-2.0.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/es/bsc/dataclay/dataclay/2.1/dataclay-2.1.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/javax/annotation/javax.annotation-api/1.3.2/javax.annotation-api-1.3.2.jar" enabled="true" runInBatchMode="false"/>
<factorypathentry kind="VARJAR" id="M2_REPO/io/grpc/grpc-netty-shaded/1.25.0/grpc-netty-shaded-1.25.0.jar" enabled="true" runInBatchMode="false"/>
......
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
pthread_t tid[4];
void *function(void *var)
{
pthread_t id = pthread_self();
for(;;){
int n=500000, i;
unsigned long long fact = 1;
for (i = 1; i <= n; ++i) {
fact *= i;
}
//printf("%lu\n",id);
usleep(1);
}
}
int main()
{
int i;
for (i = 0; i < 4; i++)
{
pthread_create(&(tid[i]), NULL, &function, NULL);
}
for (i = 0; i < 4; i++)
{
pthread_join((tid[i]), NULL);
}
return 0;
}
#!/bin/bash
loadavg=`cat /proc/loadavg`
pastminute=`echo $loadavg|awk '{printf $1 * 100}'`
past5minutes=`echo $loadavg|awk '{printf $2 * 100}'`
past15minutes=`echo $loadavg|awk '{printf $3 * 100}'`
echo "{cpuload:"$pastminute"}"
#!/bin/bash
ps -o pcpu= -p $1 | awk '{printf("%d",$1 * 100)}'
#!/bin/bash
ps -eo pcpu,pid,cmd | sort -k1 -r -n | head -10|awk '{printf("{pcpu:%.2f,pid:%d,cmd:%s}\n",$1,$2,$3)}'
#!/bin/bash
consumption_filled=false
if [ -d "/sys/class/power_supply/BAT0" ]
then
energy_now=`cat /sys/class/power_supply/BAT0/energy_now`
energy_full=`cat /sys/class/power_supply/BAT0/energy_full`
charge=`echo $energy_now $energy_full | awk '{ printf("%d\n", $1/$2 * 100)}'`
status=$(cat /sys/class/power_supply/BAT0/status)
if [ $status = "Discharging" ]
then
power_now=`cat /sys/class/power_supply/BAT0/power_now`
consumption=`echo $power_now | awk '{ printf("%dW\n", $1*10^-6) }'`
consumption_filled=true
fi
else
charge=`echo -1`
fi
if [ "$consumption_filled" = false ]
then
CPU=`likwid-powermeter | grep Power | grep -Eo '[0-9]{1,}.[0-9]{1,}'`
PKG=`echo ""$CPU"" | { read a _; echo "$a"; }`
PP0=`echo ""$CPU"" | { read _ a _; echo "$a"; }`
PP1=`echo ""$CPU"" | { read _ _ a _; echo "$a"; }`
consumption=`echo $PKG $PP0 $PP1 | awk '{printf("%dW", $1+$2+$3)}'`
fi
echo "{charge:"$charge",consumption:"$consumption"}"
#sudo sh -c 'echo "kernel.perf_event_paranoid = -1" >> /etc/sysctl.conf'
#sudo sh -c 'echo -1 >/proc/sys/kernel/perf_event_paranoid'
(perf stat -e mem-loads,mem-stores -x, sleep 2) > mem_counters.log 2>&1
awk -F "," 'NR==1{printf "{%s:%d,",$3,$1} NR==2{printf "%s:%d}\n",$3,$1}' mem_counters.log
#!/bin/bash
ps -o pmem= -p $1 | awk '{printf("%d",$1 * 100)}'
#!/bin/bash
memutilization=`free -t | grep Mem | awk '{printf("%.2f"), $3/$2*100}'`
echo "{memutilization:"$memutilization"}"
#!/bin/bash
ps -eo pmem,pid,cmd | sort -k1 -r -n | head -10|awk '{printf("{pmem:%.2f,pid:%d,cmd:%s}\n",$1,$2,$3)}'
#!/bin/bash
pid=$1
time=$2
(perf stat -x , -p $pid sleep $time) > metrics$pid.log 2>&1
awk -v pide="$pid" -v time="$time" -F "," 'BEGIN { printf "{pid:%d,",pide}
NR==1{printf "%s:%.3f,",$4,($1/(1000*time))}
NR==2{printf "%s:%d,",$3,$1}
NR==3{printf "%s:%d,",$3,$1}
NR==4{printf "%s:%d,",$3,$1}
NR==5{printf "%s:%d,",$3,$1}
NR==6{printf "%s:%d,",$3,$1}
$3 ~ /instructions/{Ins=$1} $3 ~ /cycles/ {Cy=$1} END {printf "insn-per-cycle:%.3f,", Ins/Cy}
NR==7{printf "%s:%d,",$3,$1}
NR==8{printf "%s:%d,",$3,$1}
$3 ~ /branch-misses/{Bm=$1} $3 ~ /branches/ {B=$1} END {printf "branch-missed-perc:%.2f}\n", Bm/B*100}' metrics$pid.log
rm metrics$pid.log
\ No newline at end of file
No preview for this file type
This diff is collapsed.
No preview for this file type
......@@ -13,6 +13,7 @@
#include <linux/perf_event.h>
#include <asm/unistd.h>
// Each PID will have one associated structure
// The struct attributes will assist the processes monitoring
// pid - process ID
......@@ -44,7 +45,7 @@ void createThreadsForNewExistingPids();
/****************************************************************************************************
**************************************** Monitoring Metrics ****************************************
****************************************************************************************************/
#define PERIOD 5 // Period of metrics measurement
#define PERIOD 1 // Period of metrics measurement
#define NUM_EVENTS 10 // Number of metrics measured
enum
{
......@@ -69,22 +70,41 @@ struct read_format
uint64_t value[NUM_EVENTS];
};
void *sendMedtricsToServer(void *pos);
void *sendMetricsToServer(void *pos);
int countingEvents(int *counters_fd, int pid);
static int setup_cntr(int *const fd, uint64_t type, uint64_t event, int pid, int leader);
int _perf_event_open(struct perf_event_attr *attr, pid_t pid, int cpu, int group_fd, unsigned long flags);
int force_use_script = 0;
void *sendMetricsToServerScript(void *pos);
int main(int argc, char **argv)
{
int ret = EXIT_FAILURE;
// int option;
// // Decide if uses perf stat (script) or perf_event_open (system call)
// while ((option = getopt(argc, argv, ":if:sc")) != -1)
// {
// switch (option)
// {
// case 's':
// force_use_script = 1;
// break;
// case 'c':
// break;
// }
// }
if(strcmp(argv[3],"script")==0){
force_use_script = 1;
}
// Connect to the server
int serv_port = 0;
struct sockaddr_in serv_addr;
if (argc != 3)
if (argc != 4)
{
fprintf(stderr, "ERROR! Usage: %s <addressServer> <portServer>\n", argv[0]);
fprintf(stderr, "ERROR! Usage: %s <addressServer> <portServer> <typeMeasure>\n", argv[0]);
return 0;
}
......@@ -149,7 +169,14 @@ int main(int argc, char **argv)
printf("Thread %d com pid %d e esta por ativar 2=%d\n", arr_pid[i].array_position, arr_pid[i].pid, arr_pid[i].active);
int *position = malloc(sizeof(arr_pid[i].array_position));
*position = arr_pid[i].array_position;
pthread_create(&t[i], NULL, sendMedtricsToServer, (void *)position);
if (force_use_script == 0)
{
pthread_create(&t[i], NULL, sendMetricsToServer, (void *)position);
}
else
{
pthread_create(&t[i], NULL, sendMetricsToServerScript, (void *)position);
}
//arr_pid[i].t = t[i];
}
......@@ -271,7 +298,14 @@ void createThreadsForNewExistingPids()
*arg = arr_pid[i].array_position;
pthread_t thread;
pthread_create(&thread, NULL, sendMedtricsToServer, arg);
if (force_use_script == 0)
{
pthread_create(&thread, NULL, sendMetricsToServer, arg);
}
else
{
pthread_create(&thread, NULL, sendMetricsToServerScript, arg);
}
}
}
updateMonitoringPIDs();
......@@ -319,7 +353,6 @@ static int setup_cntr(int *const fd, uint64_t type, uint64_t event, int pid, int
return (*fd < 0);
}
// Usar cpu-clock ou task-clock para dar metrica de cpu usage
int countingEvents(int *counters_fd, int pid)
{
int ret;
......@@ -342,7 +375,7 @@ int countingEvents(int *counters_fd, int pid)
}
// Thread responsible to monitorize one process and send this information to the server
void *sendMedtricsToServer(void *pos)
void *sendMetricsToServer(void *pos)
{
int i = *((int *)pos);
free(pos);
......@@ -396,8 +429,42 @@ void *sendMedtricsToServer(void *pos)
pthread_exit(NULL);
}
void *sendMetricsToServerScript(void *pos)
{
int i = *((int *)pos);
free(pos);
arr_pid[i].active = 1;
char command[50] = {0};
memset(&command, 0, sizeof command);
sprintf(command, "./probes/perf_probe.sh %d %d", arr_pid[i].pid, PERIOD);
printf("Path of %d: %s\n", arr_pid[i].pid, command);
// Start monitoring
do
{
FILE *fp = popen(command, "r");
if (fp == NULL)
printf("Failed to run perf_probe.sh\n");
char buffer[1024] = {0};
memset(&buffer, 0, sizeof buffer);
fgets(buffer, sizeof(buffer), fp);
printf("Send buffer of %d: %s\n", arr_pid[i].pid, buffer);
int n = write(sock_cli, buffer, strlen(buffer));
if (n < 0)
perror("ERROR! Writing to server!!!\n");
pclose(fp);
} while (arr_pid[i].active != 0);
printf("Thread KILLED with PID=%d (0=%d)\n", arr_pid[i].pid, arr_pid[i].active);
pthread_exit(NULL);
}
// [1] Power and Performance Software Analysis and Optimization Jim Kukunas (Chapter 8)
/* The
vast majority of events are nonarchitectural, that is, their behavior can vary from
/* The vast majority of events are nonarchitectural, that is, their behavior can vary from
one processor architecture to the next. */
\ No newline at end of file
#!/bin/bash
fans=`sensors | grep fan | awk '{print $2}'`
len=$((`echo $fans| tr -d -c ' ' | wc -c`+1))
json="{"
for i in $(seq 1 $len)
do
label=`echo "fan$i:"$(echo $fans | cut -d ' ' -f$i)`
if [ "$i" -eq "$len" ]; then
json=$json$label"}"
break
fi
json=$json$label","
done
echo $json
#!/bin/bash
temperatures=`sensors | grep Core | awk '{print $3}'`
temp=`echo $temperatures | grep -Eo '[0-9]+[.][0.9]+'`
len=`echo $temp | tr -d -c '.' | wc -c`
json="{"
for i in $(seq 1 $len)
do
label=`echo "core$(($i-1)):"$(echo $temp | cut -d' ' -f$i)`
if [ "$i" -eq "$len" ]; then
json=$json$label"}"
break
fi
json=$json$label","
done
echo $json
#!/bin/bash
mvn clean package
java -jar target/nfrtool-demo-2.1.jar elastic --node elasticnode 192.168.1.1 100 80 --master elasticnode 111 --worker elasticnode 222 127.0.0.1
java -jar target/nfrtool-demo-2.1.jar testApp --mode demo --ip 127.0.0.1
......@@ -21,9 +21,10 @@ public class ActivationWorkersManager implements Runnable {
private ResourceManager rm;
private ActiveWorkersMap activeWorkers;
private final int PERIOD = 15000; // milliseconds
private final float CPU_REACTIVATION_THRESHOLD = 0.5f;
private final int PERIOD = 10000;
// FIXME: Half of CPU_THRESHOLD
private final float CPU_REACTIVATION_THRESHOLD = 0.1f;
ActivationWorkersManager(ResourceManager rm, ActiveWorkersMap activeWorkers) {
this.rm = rm;
......@@ -44,18 +45,20 @@ public class ActivationWorkersManager implements Runnable {
if (activeWorkers.size() == 0){
System.out.println("No active Workers. Let's activate another Worker.");
activeWorkers.reactivateWorker();
rm.updateActiveWorkersEverywhere();
continue;
if(activeWorkers.reactivateWorker()){
rm.updateActiveWorkersEverywhere();
continue;
}
}
activeWorkers.energyCanBeUpdated(); // which also means that all Active Workers has CPU usage filled
activeWorkers.isHistoryUpdated(); // which also means all metrics are filled
float actualCPUUsage = activeWorkers.getTotalCPUUsage();
if(actualCPUUsage < CPU_REACTIVATION_THRESHOLD){
System.out.printf("Current CPU usage of Node is %.2f. Let's activate another Worker.\n",actualCPUUsage);
activeWorkers.reactivateWorker();
rm.updateActiveWorkersEverywhere();
if(activeWorkers.reactivateWorker()){
rm.updateActiveWorkersEverywhere();
}
}
}
}
......
This diff is collapsed.
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
public class DataclayWritingManager implements Runnable {
private int monitoringPeriod;
private List<Worker> workerList;
private ActiveWorkersMap activeWorkers;
DataclayWritingManager(int monitoringPeriod, List<Worker> workerList, ActiveWorkersMap activeWorkers) {
this.monitoringPeriod = monitoringPeriod;
this.workerList = workerList;
this.activeWorkers = activeWorkers;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
Thread.sleep(monitoringPeriod);
} catch (InterruptedException e) {
System.out.println("Thread " + Thread.currentThread().getName() + " was interrupted. Aborting...");
break;
}
activeWorkers.isHistoryUpdated();
// Add/Update CPU mean usage in DataClay Worker
Map<Integer, JSONArray> activeWorkersMetricsHistory = activeWorkers.getActiveWorkersHistory();
for (Integer pidWorker : activeWorkersMetricsHistory.keySet()) {
JSONArray metricsHistory = activeWorkersMetricsHistory.get(pidWorker);
float sumCpuUsage = 0.0f;
float sumEnergyUsage = 0.0f;
int lengthHistory = metricsHistory.length();
for (int i = 0; i < metricsHistory.length(); i++) {
JSONObject metrics = metricsHistory.getJSONObject(i);
if (metrics.isEmpty() || (!metrics.has("task-clock")) || (!metrics.has("power"))) {
lengthHistory--;
continue;
}
sumCpuUsage += metrics.getFloat("task-clock");
sumEnergyUsage += metrics.getFloat("power");
}
float meanCpuUsage = sumCpuUsage / lengthHistory;
float meanEnergyUsage = sumEnergyUsage / lengthHistory;
for (Worker w : workerList) {
if (w.getPid() == pidWorker) {
w.setCpuUsage(meanCpuUsage);
w.setEnergyUsage(meanEnergyUsage);
System.out.printf(
"Worker %d writes on Dataclay its average CPU and Energy usage\t|%.2f|\t|%.2f|\n",
pidWorker, meanCpuUsage, meanEnergyUsage);
break;
}
}
}
}
}
}
......@@ -28,7 +28,7 @@ import org.json.JSONObject;
import es.bsc.compss.nfr.model.Worker;
public class EnergyMonitor implements Runnable {
private ResourceManager rm;
private NFRViolationQueue queue;
private ActiveWorkersMap activeWorkers;
......@@ -40,11 +40,11 @@ public class EnergyMonitor implements Runnable {
EnergyMonitor(ResourceManager rm, NFRViolationQueue queue, ActiveWorkersMap activeWorkersMetrics) {
this.rm = rm;
this.queue = queue;
this.activeWorkers = activeWorkersMetrics;
this.activeWorkers = activeWorkersMetrics;
energyThreshold = (float) 0.0;
}
private void getNodeEnergyCapacity(){
private void getNodeEnergyCapacity() {
if (activeWorkers.size() > 0) {
int workerPid = activeWorkers.getFirstActiveWorkerPid();
Worker worker = activeWorkers.getWorkerByPid(workerPid);
......@@ -63,8 +63,8 @@ public class EnergyMonitor implements Runnable {
command[0] = "probes/probeEnergy";
command[1] = String.valueOf(activeWorkers.getWorkersAddress());
command[2] = String.valueOf(SERVER_ENERGY_PORT);
Process p = Runtime.getRuntime().exec(command);
// Start execution of Time Monitor Probe
Runtime.getRuntime().exec(command);
System.out.println(threadName + " Probe starts running");
......@@ -75,7 +75,7 @@ public class EnergyMonitor implements Runnable {
System.out.println(threadName + " Probe is connected!");
// Send Time's Socket to thread Resource Manager for future updates in Time
// Send Energy's Socket to thread Resource Manager for future updates in Energy
// Monitor Probe
rm.addSocketEnergy(socketEnergy);
......@@ -83,37 +83,49 @@ public class EnergyMonitor implements Runnable {
System.out.println(threadName + " waiting for metrics...");
String inputLine = "";
JSONObject energyMetricsReceived;
float totalEnergyConsumed = 0.0f;
BufferedReader in = new BufferedReader(new InputStreamReader(socketEnergy.getInputStream()));
while ((inputLine = in.readLine()) != null) {
JSONObject energy = new JSONObject(inputLine);
// New data has arrived, so history metrics must not be accessed
activeWorkers.historyWillBeUpdated();
// Check for activated Workers
if (activeWorkers.size() == 0) {
activeWorkers.waitForWorkersActivation();
// in = new BufferedReader(new InputStreamReader(socketTime.getInputStream()));
continue;
}
energyMetricsReceived = new JSONObject(inputLine);
// EnergyMonitor has to wait for TimeMonitor
activeWorkers.energyWait();
float totalEnergyConsumed = 0.0f;
// This and next verification are necessary as the probe may have already sent metrics
// against Workers that have been disabled in the meantime
if(activeWorkers.size() > 0) {
activeWorkers.waitingToUpdateEnergy();
for (Integer pid : activeWorkers.workerPids()) {
if (activeWorkers.isWorkerActive(pid)) {
float cpuUsage = activeWorkers.getWorkerCPUUsage(pid);
totalEnergyConsumed = energy.getFloat("pkg");
System.out.println(totalEnergyConsumed);
// TODO: Ver melhor como é o threashold para energia
activeWorkers.addEnergyConsumed(pid, "power",cpuUsage*100*totalEnergyConsumed);
}
// if (activeWorkers.size() > 0) {
// While TimeMonitor does not save the metrics, more specifically CPU usage, it
// is not possible to determine the consumptions spent by Worker.
activeWorkers.waitingToUpdateEnergy();
// After the time metrics are filled, the energy metrics can be filled
for (Integer pid : activeWorkers.workerPids()) {
// This verification is necessary as the probe may have already sent
// metrics against Workers that have been disabled in the meantime
if (activeWorkers.isWorkerActive(pid)) {
float cpuUsage = activeWorkers.getWorkerCPUUsage(pid);
totalEnergyConsumed = energyMetricsReceived.getFloat("pkg");
activeWorkers.addEnergyConsumed(pid, "power", cpuUsage * totalEnergyConsumed);
}
}
// After the time and energy metrics are filled, the history can be accessed
activeWorkers.historyIsUpdated();
// If so, check the Node capacity
getNodeEnergyCapacity();
System.out.printf("\nNode Energy consumed/capacity : %.2f / %.2f\n\n", totalEnergyConsumed,
System.out.printf("[EnergyMonitor]\tNode Energy consumed/capacity : %.2f / %.2f\n", totalEnergyConsumed,
energyThreshold);
if (totalEnergyConsumed > energyThreshold) {
......@@ -121,8 +133,7 @@ public class EnergyMonitor implements Runnable {
NFRViolation violation = new NFRViolation("energy", "energyload", totalEnergyConsumed);
queue.addNFRViolation(violation);
}
}
//}
}
} catch (IOException e) {
......
This diff is collapsed.
......@@ -29,7 +29,7 @@ public class ResourceManager implements Runnable {
private Socket socketTime;
private Socket socketEnergy;
private final int PERIOD = 100; // milliseconds
private final int PERIOD = 500; // milliseconds
ResourceManager(NFRViolationQueue queue, ActiveWorkersMap activeWorkers) {
this.queue = queue;
......@@ -48,14 +48,21 @@ public class ResourceManager implements Runnable {
System.out.println(
Thread.currentThread().getName() + " acting on detected NFR violation: " + violation.getAttribute());
// If we have any workers running locally
// If we have any workers
if (activeWorkers.allWorkersSize() > 0) {
// If there are active workers left
if (activeWorkers.size() > 0) {
int minCPUUsageWorkerPid = activeWorkers.getMinCPUUsageWorkerPid();
activeWorkers.inactiveWorker(minCPUUsageWorkerPid);
System.out.println("Worker " + minCPUUsageWorkerPid + " is now inactive...");
updateActiveWorkersEverywhere();
int computingUnits = activeWorkers.getWorkerByPid(minCPUUsageWorkerPid).getComputingUnits();
if(computingUnits > 0){
activeWorkers.getWorkerByPid(minCPUUsageWorkerPid).setComputingUnits(computingUnits-1);
System.out.println("[ResourceManager]\tWorker " + minCPUUsageWorkerPid + " should run in " + computingUnits + " computing units...");
} else {
activeWorkers.inactiveWorker(minCPUUsageWorkerPid);
System.out.println("[ResourceManager]\tWorker " + minCPUUsageWorkerPid + " is now inactive...");
updateActiveWorkersEverywhere();
}
} else {
System.out.println("No more active workers. There is nothing I can do :(");
}
......@@ -65,6 +72,7 @@ public class ResourceManager implements Runnable {
public void updateActiveWorkersEverywhere() {
// Update the map
activeWorkers.updateActiveWorkersMetricsMap();
// Update the probes
PrintWriter writerTime;
PrintWriter writerEnergy;
......
......@@ -36,7 +36,7 @@ public class TimeMonitor implements Runnable {