Commit cc8b4eb4 authored by Rita Sousa's avatar Rita Sousa

Added the necessary changes to be the GRM to create a fake ELASTIC system....

Added the necessary changes to be the GRM to create a fake ELASTIC system. Added lighter fake Workers.
parent c94f41d7
......@@ -95,4 +95,5 @@ app/.idea/
*/.idea/
**/.idea/
app/*.txt
dataclay
......@@ -32,18 +32,11 @@ cd app/fakeWorkers/
docker build -t fake_docker_workers .
```
Then, there are two options:
1. If you want to execute a specific fake Worker, you must send the number of the fakeworker as a parameter:
Then, start some Workers with constant resource usage:
```
docker run -e fakeWorker=NUM_FAKEWORKER -d --rm -it --name fakeworkerNUM_FAKEWORKER fake_docker_workers
docker run -d --rm -it --name fakeworker<NUM_FAKEWORKER> fake_docker_workers fakeworker.py <NUM_FAKEWORKER>
```
where NUM_FAKEWORKER must be a number between 1 and 6.
2. But if you simply run:
```
docker run -d --rm -it --name fakeworker fake_docker_workers
```
you will create a Worker with a low CPU consumption, between 20-30%.
where <NUM_FAKEWORKER> must be a number between 1 and 6.
After creating the desired fake Workers (attention to not overload the system), execute in nfrtool-time-and-energy/app directory
```
......
......@@ -31,4 +31,4 @@ VOLUME ${DC_SHARED_VOLUME}
ENTRYPOINT ["./dataclay_init.sh"]
CMD mvn clean compile exec:java -Dexec.cleanupDaemonThreads=false \
-Dexec.mainClass="app.NFRTool" \
-Dexec.args="wordcount --mode demo --ip 192.168.60.68 --agx" \
-Dexec.args="applicationName --mode demo --ip 192.168.60.68 --nvidia" \
Account=cister
Account=xavier-rit
Password=defaultPass
DataSets=defaultDS
DataSetForStore=defaultDS
......
FROM gcc:4.9
FROM python:3.7-alpine
ENV fakeWorker 3
COPY . /DockerFakeWorkers
WORKDIR /DockerFakeWorkers/
#RUN gcc -o fakeworker1 fakeworker.c
# fakeworker1 N=500000
# fakeworker2 N=50000
# fakeworker3 N=5000
RUN sed -i 's/500000/50000/' fakeworker.c
RUN gcc fakeworker.c -o fakeworker2
RUN sed -i 's/50000/5000/' fakeworker.c
RUN gcc fakeworker.c -o fakeworker3
RUN sed -i 's/5000/500000/' fakeworker.c
RUN gcc fakeworker.c -o fakeworker1
# fakeworker4 NUM_THREADS 6
# fakeworker5 NUM_THREADS 4
# fakeworker6 NUM_THREADS 2
RUN sed -i 's/6/4/' fakethreadworker.c
RUN gcc fakethreadworker.c -o fakeworker5 -lpthread
RUN sed -i 's/4/2/' fakethreadworker.c
RUN gcc fakethreadworker.c -o fakeworker6 -lpthread
RUN sed -i 's/2/6/' fakethreadworker.c
RUN gcc fakethreadworker.c -o fakeworker4 -lpthread
RUN [ "sh", "-c", "echo ${fakeWorker}" ]
CMD ["sh", "-c", "./fakeworker${fakeWorker}"]
WORKDIR /dockerFakeWorkers/
COPY ./fakeworker.py .
CMD ["fakeworker.py ${fakeWorker}"]
ENTRYPOINT ["python3"]
########################## OLD AND HEAVY FAKEWORKERS ##########################
# FROM gcc:4.9
# ENV fakeWorker 3
# COPY . /DockerFakeWorkers
# WORKDIR /DockerFakeWorkers/
# #RUN gcc -o fakeworker1 fakeworker.c
# # fakeworker1 N=500000
# # fakeworker2 N=50000
# # fakeworker3 N=5000
# RUN sed -i 's/500000/50000/' fakeworker.c
# RUN gcc fakeworker.c -o fakeworker2
# RUN sed -i 's/50000/5000/' fakeworker.c
# RUN gcc fakeworker.c -o fakeworker3
# RUN sed -i 's/5000/500000/' fakeworker.c
# RUN gcc fakeworker.c -o fakeworker1
# # fakeworker4 NUM_THREADS 6
# # fakeworker5 NUM_THREADS 4
# # fakeworker6 NUM_THREADS 2
# RUN sed -i 's/6/4/' fakethreadworker.c
# RUN gcc fakethreadworker.c -o fakeworker5 -lpthread
# RUN sed -i 's/4/2/' fakethreadworker.c
# RUN gcc fakethreadworker.c -o fakeworker6 -lpthread
# RUN sed -i 's/2/6/' fakethreadworker.c
# RUN gcc fakethreadworker.c -o fakeworker4 -lpthread
# RUN [ "sh", "-c", "echo ${fakeWorker}" ]
# CMD ["sh", "-c", "./fakeworker${fakeWorker}"]
import time
import sys
import threading
from functools import partial
# fakeworker1 N=500000 NUM_THREADS 1
# fakeworker2 N=50000 NUM_THREADS 1
# fakeworker3 N=5000 NUM_THREADS 1
# fakeworker4 N=500000 NUM_THREADS 6
# fakeworker5 N=500000 NUM_THREADS 4
# fakeworker6 N=500000 NUM_THREADS 2
# fakeworker3 is default
N = 5000
NUM_THREADS = 1
def defineValues(n, num_threads):
global N
N=n
global NUM_THREADS
NUM_THREADS=num_threads
#print(N, NUM_THREADS)
def thread_function(name):
while True:
i=0
fact = 1
for i in range(1,N,1):
fact *= i
time.sleep(1)
if len(sys.argv) != 2:
print('Number of arguments wrong. Usage: {} <IdOfFakeworker>'.format(sys.argv[0]))
exit(1)
try:
idFakeWorker = int(sys.argv[1])
if idFakeWorker < 0 or idFakeWorker > 6:
print('Fakeworker ID wrong. It should be number between 1 and 6.')
exit(1)
except ValueError:
print("Argument is not an int")
exit(1)
if idFakeWorker == 1:
defineValues(500000, 1)
elif idFakeWorker == 2:
defineValues(50000, 1)
elif idFakeWorker == 3:
defineValues(5000, 1)
elif idFakeWorker == 4:
defineValues(500000, 6)
elif idFakeWorker == 5:
defineValues(500000, 4)
elif idFakeWorker == 6:
defineValues(500000, 2)
print(N, NUM_THREADS)
if NUM_THREADS == 1:
thread_function(1)
else:
threads = list()
for index in range(NUM_THREADS-1):
print(index)
x = threading.Thread(target=thread_function, args=(index,))
threads.append(x)
x.start()
# for index, thread in enumerate(threads):
# thread.join()
......@@ -6,7 +6,7 @@ current_arch=`uname -m`
if [ $current_arch = "aarch64" ];
then
java -jar target/nfrtool-demo-2.1.jar testApp --mode demo --ip 127.0.0.1 --agx
java -jar target/nfrtool-demo-2.1.jar testApp --mode demo --ip 127.0.0.1 --nvidia
else
java -jar target/nfrtool-demo-2.1.jar testApp --mode demo --ip 127.0.0.1 --etr 19.0 --ttr 0.34
fi
......@@ -88,26 +88,7 @@ public class ActiveWorkersMap {
return null;
}
public String getViolationMessage(Node n, String dimension) {
//int workerPid = getFirstActiveWorkerPid();
//Worker w = getWorkerByPid(workerPid);
// FIXME: Find better way
String nodeIP = "localhost";
if(!n.getIpEth().equals("") || n.getIpEth() != null){
nodeIP = n.getIpEth();
} else if(!n.getIpLte().equals("") || n.getIpLte() != null){
nodeIP = n.getIpLte();
} else if(!n.getIpWifi().equals("") || n.getIpWifi() != null){
nodeIP = n.getIpWifi();
}
//String appUuid = w.getApplication().getUuid();
JSONObject obj = new JSONObject();
obj.put("dimension",dimension);
obj.put("nodeIP",nodeIP);
//obj.put("COMPSsAppUuid",appUuid);
return obj.toString();
}
public int getMonitoringPeriodByWorkerPid(int workerPid){
return getWorkerByPid(workerPid).getApplication().getMonitoringPeriod();
}
......
This diff is collapsed.
......@@ -27,7 +27,6 @@ import app.service.SendService;
public class ResourceManager {
private Node node;
private ActiveWorkersMap activeWorkers;
private float energyThreshold;
private float cpuThreshold;
private SendService sender;
......@@ -35,9 +34,8 @@ public class ResourceManager {
private final String URI = "amqps://ieeqvisb:zSPkZtMpktLAPStm-eL_6MPSKu3N1_Qe@chinook.rmq.cloudamqp.com/ieeqvisb";
private final static String QUEUE_NAME = "violations";
ResourceManager(Node node, ActiveWorkersMap activeWorkers) {
ResourceManager(Node node) {
this.node = node;
this.activeWorkers = activeWorkers;
this.energyThreshold = (float) 0.0;
this.cpuThreshold = (float) 0.0;
try{
......@@ -75,7 +73,7 @@ public class ResourceManager {
if(nodeMetrics.has("load")){
float thresholdFloat = nodeMetrics.getInt("capacity") * cpuThreshold;
if(nodeMetrics.getInt("load") > thresholdFloat){
String violationMessageStr = activeWorkers.getViolationMessage(node, topic);
String violationMessageStr = Utils.getViolationMessage(node, topic);
try {
sender.send(violationMessageStr);
} catch (Exception e) {
......@@ -94,7 +92,7 @@ public class ResourceManager {
if(nodeMetrics.has("used")){
float thresholdFloat = (float) nodeMetrics.getInt("capacity");
if(nodeMetrics.getInt("used") > thresholdFloat){
String violationMessageStr = activeWorkers.getViolationMessage(node, topic);
String violationMessageStr = Utils.getViolationMessage(node, topic);
try {
sender.send(violationMessageStr);
} catch (Exception e) {
......
/*
* Copyright 2020 Instituto Superior de Engenharia do Porto
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package app;
import java.util.List;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Enumeration;
import es.bsc.compss.nfr.model.*;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
public class Utils {
/**
* Returns list of containers PIDs to monitor
* @return List of processes ID.
*/
private static List<Integer> getWorkersPid() {
List<Integer> pidsToMonitor = new ArrayList<>();
try {
Process process = Runtime.getRuntime().exec("cat pidsToMonitor.txt");
InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String pids = null;
while ((pids = bufferedReader.readLine()) != null) {
//System.out.println(pids);
for( String s : pids.split(" ")){
pidsToMonitor.add(Integer.parseInt(s));
}
}
process.waitFor();
bufferedReader.close();
process.destroy();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return pidsToMonitor;
}
/**
* Returns list of containers IPs address to monitor
* @return List of Workers IPs address.
*/
private static List<String> getWorkersIp() {
List<String> ipsToMonitor = new ArrayList<>();
try {
Process process = Runtime.getRuntime().exec("cat ipsToMonitor.txt");
InputStreamReader inputStreamReader = new InputStreamReader(process.getInputStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String ips = null;
while ((ips = bufferedReader.readLine()) != null) {
//System.out.println(ips);
for( String s : ips.split(" ")){
ipsToMonitor.add(s.trim());
}
}
process.waitFor();
bufferedReader.close();
process.destroy();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return ipsToMonitor;
}
/**
* Adds Workers to ELASTIC System
* @param node node where Workers will be added
* @param app COMPSsApp where Workers will be added
* @param localIP IP address of given Node
*/
public static void addWorkersToSystem(Node node, COMPSsApplication app, String localIP) {
List<Integer> pidsToMonitor = getWorkersPid();
List<String> ipsToMonitor = getWorkersIp();
// FIXME: This is temporary so I don't need to be constantly cleaning the
// dataclay.
for (Worker w : node.getWorkers()) {
node.removeWorker(w);
}
app.removeWorkers();
//////////////////////////////////////////////////////////////////////////////////
// Create workers
float cpuUsage = 0.0f, energyUsage = 0.0f, communicationCost = 0.0f;
int computingUnits = Runtime.getRuntime().availableProcessors();
String ip = localIP;
List<String> deactivationReason = new ArrayList<>();
deactivationReason.add("");
if(pidsToMonitor.size() <= 0){
System.out.println("No Workers to Monitor!!!");
}
for(int i = 0; i < ipsToMonitor.size(); i++){
if(ipsToMonitor.get(i)!="null"){
Worker worker = new Worker(node, pidsToMonitor.get(i), true, app, cpuUsage, energyUsage, computingUnits,
ipsToMonitor.get(i), communicationCost, deactivationReason);
// Add worker to application
app.addWorker(worker);
}
}
}
/**
* Prints current status for a given ELASTIC system
* @param system Elastic system
*/
public static void getCurrentElasticSystem(ElasticSystem system) {
System.out.println();
System.out.println("Deployed ELASTIC system:");
System.out.println("Nodes:");
system.getNodes().forEach(node -> {
System.out.println(node.toString());
});
system.getApplications().forEach(app -> {
System.out.println(String.format("App: %s", app.getName()));
if(app.getMaster() != null && app.getMaster().getNode() != null){
System.out.println(String.format("\tMaster in %s with PID %d",
(app.getMaster().getNode().getIpWifi() == null ? app.getMaster().getNode().getIpEth()
: app.getMaster().getNode().getIpWifi()),
app.getMaster().getPid()));
}
System.out.println("\tWorkers:");
app.getWorkers().forEach(w -> System.out
.println(String.format("\t\t- %s (PID: %d), active = %b", w.getIp(), w.getPid(), w.isActive())));
System.out.println();
});
}
/**
* Returns list with Workers of a given Node and ELASTIC system
* @param system elasticSystem
* @param localIP IP address of Node
* @return list of Workers of a specific Worker
*/
public static List<Worker> getWorkersToMonitor(/*Node node*/ ElasticSystem system, String localIP) {
List<Worker> workerList = new ArrayList<Worker>();
//system.getApplications().forEach(app -> {
system.getNodes().forEach(node -> {
node.getWorkers().forEach(worker -> {
//if (worker.getIp().equals(localIP)) {
workerList.add(worker);
//}
});
});
//});
return workerList;
}
/**
* Get IP addresses
* @return list of IP addresses
*/
public static List<String> getLocalIP() {
List<String> ips = new ArrayList<>();
String localIP, publicIP;
String ipWifi, ipEth;
try {
localIP = InetAddress.getLocalHost().getHostAddress();
System.out.println("NFRTool current local IP address: " + localIP);
} catch (UnknownHostException e) {
e.printStackTrace();
localIP = "localhost";
}
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("google.com", 80));
publicIP = socket.getLocalAddress().getHostAddress();
ips.add(publicIP);
System.out.println("NFRTool current public IP address: " + publicIP);
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
// Get IP address of network interfaces
try {
for (Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces(); nis.hasMoreElements(); ) {
NetworkInterface ni = nis.nextElement();
if (ni.getName().startsWith("e")) {
Enumeration<InetAddress> nifAddresses = ni.getInetAddresses();
//nifAddresses.nextElement();
ipEth = nifAddresses.nextElement().getHostName();
//System.out.println("IP eth: " + ipEth);
ips.add(ipEth);
}else if (ni.getName().startsWith("w")) {
Enumeration<InetAddress> nifAddresses = ni.getInetAddresses();
nifAddresses.nextElement();
ipWifi = nifAddresses.nextElement().getHostName();
//System.out.println("IP wifi: " + ipWifi);
ips.add(ipWifi);
}
// FIXME: Get Network Interface for LTE
}
} catch (SocketException e) {
e.printStackTrace();
System.out.println("[ERROR] Couldn't retrieve network interface list");
System.exit(1);
}
return ips;
}
/**
* Builds JSON format message to send to Global Resource Manager
* @param n Node
* @param dimension Time, Energy, Communication or Security string dimension
* @return string with JSON format
*/
public static String getViolationMessage(Node n, String dimension) {
//int workerPid = getFirstActiveWorkerPid();
//Worker w = getWorkerByPid(workerPid);
// FIXME: Find better way
String nodeIP = "localhost";
if(!n.getIpEth().equals("") || n.getIpEth() != null){
nodeIP = n.getIpEth();
} else if(!n.getIpLte().equals("") || n.getIpLte() != null){
nodeIP = n.getIpLte();
} else if(!n.getIpWifi().equals("") || n.getIpWifi() != null){
nodeIP = n.getIpWifi();
}
//String appUuid = w.getApplication().getUuid();
JSONObject obj = new JSONObject();
obj.put("dimension",dimension);
obj.put("nodeIP",nodeIP);
//obj.put("COMPSsAppUuid",appUuid);
return obj.toString();
}
}
\ No newline at end of file
......@@ -14,7 +14,7 @@ services:
ports:
- "2127:2127"
environment:
- DATASERVICE_HOST=192.168.60.18
- DATASERVICE_HOST=192.168.60.68
- DATASERVICE_NAME=DS1
- DATASERVICE_JAVA_PORT_TCP=2127
- LOGICMODULE_PORT_TCP=11034
......
......@@ -29,7 +29,7 @@ services:
ports:
- "3127:3127"
environment:
- DATASERVICE_HOST=192.168.60.18
- DATASERVICE_HOST=192.168.60.68
- DATASERVICE_NAME=DS2
- DATASERVICE_JAVA_PORT_TCP=3127
- LOGICMODULE_PORT_TCP=11034
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment