Commit e7cd2ff1 authored by esabate's avatar esabate
Browse files

Heuristics adapted with zip and removed overhead and dummy array

parent 33be70dc
......@@ -30,6 +30,9 @@ public class DAG {
// float matrix containing the data size transfers between tasks
private float[][] z;
// String matrix containing the ids of the transfers between tasks
private String[][] zid;
// boolean matrix indicating the precedence relationships between tasks
private boolean[][] succ;
......@@ -40,7 +43,7 @@ public class DAG {
private float safetyMargin;
public DAG(Map<String, float[]> C, float[][] z, boolean[][] succ, int n) {
public DAG(Map<String, float[]> C, float[][] z, String[][] zid, boolean[][] succ, int n) {
this.C = new HashMap<>();
this.origC = new HashMap<>();
for (String worker : C.keySet()) {
......@@ -49,6 +52,7 @@ public class DAG {
}
this.z = Arrays.copyOf(z, z.length);
this.zid = Arrays.copyOf(zid, zid.length);
this.succ = Arrays.copyOf(succ, succ.length);
this.n = n;
this.safetyMargin = 0.0f; // TODO: update it with the correct safetyMargin used
......@@ -74,6 +78,14 @@ public class DAG {
this.z = z;
}
public String[][] getZid() {
return this.zid;
}
public String getZid(int pred, int succ) {
return this.zid[pred][succ];
}
public boolean[][] getSucc() {
return this.succ;
}
......@@ -119,6 +131,14 @@ public class DAG {
System.out.println();
}
System.out.println("Zid:");
for (int i = 0; i < this.n; ++i) {
for (int j = 0; j < this.n; ++j) {
System.out.print(this.zid[i][j] + " ");
}
System.out.println();
}
System.out.println("C:");
for (String worker : this.C.keySet()) {
float[] times = this.C.get(worker);
......
......@@ -17,25 +17,20 @@
package es.bsc.compss.scheduler.heuristics;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.util.ErrorManager;
import org.apache.kafka.common.protocol.types.Field;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.PriorityQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.util.*;
public class LNS {
// Logger
protected static final Logger LOGGER = LogManager.getLogger(Loggers.TS_COMP);
// DAG containing C, z and succ
private DAG dag;
......@@ -52,6 +47,8 @@ public class LNS {
private ArrayList<Boolean> dummies;
private float networkMargin;
public LNS(List<String> workers) {
HeuristicsConfiguration.load();
......@@ -102,6 +99,21 @@ public class LNS {
z[i][j] = Float.parseFloat(st[j]);
}
}
br.readLine();
br.readLine();
String[][] zid = new String[n][n];
for (int i = 0; i < n; ++i) {
lines = br.readLine();
st = lines.trim().split("\\s+");
for (int j = 0; j < n; ++j) {
if (j == 0) {
int pos = st[0].indexOf('[');
st[0] = st[0].substring(pos + 1);
}
zid[i][j] = st[j];
}
}
br.readLine();
lines = br.readLine();
st = lines.trim().split("\\s+");
......@@ -141,31 +153,13 @@ public class LNS {
}
}
br.readLine();
lines = br.readLine();
st = lines.trim().split("\\s+");
this.dummies = new ArrayList<>(n);
for (int j = 0; j < n; j++) {
if (j == 0) {
int pos = st[j + 2].indexOf('[');
if (st[j + 2].substring(pos + 1).charAt(0) == '0') {
this.dummies.add(false);
} else {
this.dummies.add(true);
this.dummyNodes++;
}
} else {
if (st[j + 2].charAt(0) == '0') {
this.dummies.add(false);
} else {
this.dummies.add(true);
this.dummyNodes++;
}
}
}
this.dummies = new ArrayList<>(Arrays.asList(new Boolean[n]));
Collections.fill(this.dummies, Boolean.FALSE);
this.dummyNodes = 0;
lines = br.readLine();
st = lines.trim().split("\\s+");
float overhead = Float.parseFloat(st[2].substring(0, st[2].indexOf(';')));
this.networkMargin = Float.parseFloat(st[2].substring(0, st[2].indexOf(';')));
ArrayList<String> workersOrder = new ArrayList<>(m);
lines = br.readLine();
......@@ -179,7 +173,7 @@ public class LNS {
}
}
this.resources = new Resources(ibw, mfs, h, overhead, m);
this.resources = new Resources(ibw, mfs, h, m);
resources.setResourceNames(workers);
HashMap<String, float[]> mapC = new HashMap<>();
......@@ -191,9 +185,12 @@ public class LNS {
// mapC.put(workers.get(j), times);
mapC.put(workersOrder.get(j), times);
}
this.dag = new DAG(mapC, z, succ, n);
this.dag = new DAG(mapC, z, zid, succ, n);
} catch (IOException | StringIndexOutOfBoundsException | ArrayIndexOutOfBoundsException
| NullPointerException e) {
StringWriter error = new StringWriter();
e.printStackTrace(new PrintWriter(error));
LOGGER.error(error);
ErrorManager.fatal("ERROR WHILE PARSING INPUT FILE. FILE FORMAT NOT CORRECT");
}
}
......@@ -236,40 +233,41 @@ public class LNS {
// TODO: move it to a common folder/class inside mapTaskToResource
private ResourceTransferCost selectBestResource(Resources resources, ArrayList<Float> workload, DAG dag,
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime) {
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime, Set<String> tagsUsed) {
int resource = 0;
ArrayList<Float> overheads = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> transfers = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> completionTime = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> releaseTimes = new ArrayList<>(Collections.nCopies(this.m, rqe.getReleaseTime()));
Set<String> tags = new HashSet<>();
boolean[][] succ = dag.getSucc();
float[][] ibw = resources.getIBW();
float[][] z = dag.getZ();
Map<String, float[]> C = dag.getC();
float transfer;
float overhead;
for (int idxRes = 0; idxRes < this.m; ++idxRes) {
float maxTime = 0.0f;
for (int pred = 0; pred < rqe.getId(); ++pred) {
if (succ[pred][rqe.getId()]) {
String tag = dag.getZid(pred, rqe.getId());
if (!tag.equals("0") && !tags.contains(tag) && !tagsUsed.contains(tag)) {
tags.add(tag);
// TODO: check if workers are the same for both tasks receiving the same tag
}
float transferTime = (float) (ibw[computingNode.get(pred)][idxRes]
* (Math.ceil((z[pred][rqe.getId()] / resources.getMFS())) * resources.getH()
+ z[pred][rqe.getId()]));
float t = endTime.get(pred);// + transferTime;// + resources.getOverhead();
if (computingNode.get(pred) != idxRes
&& workload.get(idxRes) <= t + resources.getOverhead() + transferTime) {
if (t + resources.getOverhead() + transferTime > maxTime) {
if (computingNode.get(pred) != idxRes && workload.get(idxRes) <= t + transferTime) {
if (t + transferTime > maxTime) {
releaseTimes.set(idxRes, endTime.get(pred));
transfers.set(idxRes, transferTime);
maxTime = t + resources.getOverhead() + transferTime;
maxTime = t + transferTime;
}
overheads.set(idxRes, resources.getOverhead());
} else {
if (computingNode.get(pred) == idxRes) {
if (maxTime < endTime.get(pred)) {
transfers.set(idxRes, transferTime);
overheads.set(idxRes, 0.0f);
}
}
}
......@@ -295,39 +293,44 @@ public class LNS {
}
}
overhead = overheads.get(resource);
transfer = transfers.get(resource);
transfer += transfer * this.networkMargin;
for (String tag : tags) {
if (!tagsUsed.contains(tag))
tagsUsed.add(tag);
else
transfer = 0;
}
rqe.setReleaseTime(releaseTimes.get(resource));
return new ResourceTransferCost(resource, overhead, transfer);
return new ResourceTransferCost(resource, transfer, tagsUsed);
}
// TODO: move to a common class
public ResourceTransferCost mapTaskToResource(ReadyQueueElement rqe, Resources resources, DAG dag,
ArrayList<Float> workload, ArrayList<Integer> computingNode, ArrayList<Float> endTime) {
ArrayList<Float> workload, ArrayList<Integer> computingNode, ArrayList<Float> endTime, Set<String> tagsUsed) {
int taskId = rqe.getId();
ResourceTransferCost rtf;
if (checkDummy(taskId)) {
rtf = checkDummyTransferTime(dag, resources, workload, rqe, computingNode, endTime);
rtf = checkDummyTransferTime(dag, resources, workload, rqe, computingNode, endTime, tagsUsed);
} else {
rtf = selectBestResource(resources, workload, dag, rqe, computingNode, endTime);
rtf = selectBestResource(resources, workload, dag, rqe, computingNode, endTime, tagsUsed);
}
float releaseTime = rqe.getReleaseTime();
if (releaseTime + rtf.getTransfer() + rtf.getOverhead() < workload.get(rtf.getResource())) {
if (releaseTime + rtf.getTransfer() < workload.get(rtf.getResource())) {
rqe.setReleaseTime(workload.get(rtf.getResource()));
} else {
rqe.setReleaseTime(releaseTime + rtf.getTransfer() + rtf.getOverhead());
rqe.setReleaseTime(releaseTime + rtf.getTransfer());
}
return rtf;
}
private ResourceTransferCost checkDummyTransferTime(DAG dag, Resources resources, ArrayList<Float> workload,
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime) {
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime, Set<String> tagsUsed) {
boolean[][] succ = dag.getSucc();
float[][] ibw = resources.getIBW();
float[][] z = dag.getZ();
float overhead = 0;
float transfer = 0;
int resource = 0;
float maxTime = 0.0f;
......@@ -337,25 +340,24 @@ public class LNS {
* (Math.ceil((z[pred][rqe.getId()] / resources.getMFS())) * resources.getH()
+ z[pred][rqe.getId()]));
float t = endTime.get(pred);// + transferTime;// + resources.getOverhead();
if (computingNode.get(pred) != resource
&& workload.get(0) <= t + resources.getOverhead() + transferTime) {
if (t + resources.getOverhead() + transferTime > maxTime) {
if (computingNode.get(pred) != resource && workload.get(0) <= t + transferTime) {
if (t + transferTime > maxTime) {
rqe.setReleaseTime(endTime.get(pred));
transfer = transferTime;
maxTime = t + resources.getOverhead() + transferTime;
maxTime = t + transferTime;
}
overhead = resources.getOverhead();
} else {
if (computingNode.get(pred) == resource) {
if (maxTime < endTime.get(pred)) {
transfer = transferTime;
overhead = 0;
}
}
}
}
}
return new ResourceTransferCost(resource, overhead, transfer);
// TODO: check if tags present to remove transfer
transfer += transfer * this.networkMargin;
return new ResourceTransferCost(resource, transfer, tagsUsed);
}
private boolean checkDummy(int readyTask) {
......@@ -364,7 +366,7 @@ public class LNS {
for (String worker : C.keySet()) {
sum += C.get(worker)[readyTask];
}
return (sum == 0.0f);
return (sum < 0.0f);
}
// TODO: move it to a common class
......@@ -389,7 +391,7 @@ public class LNS {
if (ready) {
boolean[] visited = new boolean[n];
int nSucc = computeSuccessors(s, visited);
System.out.println("FOR TASK " + s + " NUM SUCC IS " + nSucc);
// System.out.println("FOR TASK " + s + " NUM SUCC IS " + nSucc);
ReadyQueueElement rqe = new ReadyQueueElement(s, releaseTime, nSucc);
rqe.addToQueue(readyQueue);
}
......@@ -404,6 +406,7 @@ public class LNS {
ArrayList<Float> endTime = new ArrayList<>(Collections.nCopies(this.n, 0.0f));
ArrayList<Integer> computingNode = new ArrayList<>(Collections.nCopies(this.n, 0));
ArrayList<Float> workload = new ArrayList<>(Collections.nCopies(this.m, 0.0f)); // end times for each worker
Set<String> tagsUsed = new HashSet<>();
int sourceId = 0; // dummy source node
......@@ -421,11 +424,16 @@ public class LNS {
while (!readyQueue.isEmpty()) {
ReadyQueueElement rqe = getNextReadyTask(readyQueue);
readyTask = rqe.getId();
if (checkDummy(readyTask)) {
this.dummies.set(readyTask, true);
this.dummyNodes++;
}
// System.out.println("Next ready task selected: " + (readyTask + 1));
// for the selected ready task, the bestResource is computed based on the minCT
rtf = mapTaskToResource(rqe, this.resources, this.dag, workload, computingNode, endTime);
rtf = mapTaskToResource(rqe, this.resources, this.dag, workload, computingNode, endTime, tagsUsed);
tagsUsed.addAll(rtf.getTagsUsed());
releaseTime = rqe.getReleaseTime();
// System.out.println("Resource to execute the ready task selected: " + rtf.getResource());
......@@ -595,15 +603,16 @@ public class LNS {
private int resource;
private float overhead;
private float transfer;
private Set<String> tagsUsed;
public ResourceTransferCost(int resource, float overhead, float transfer) {
public ResourceTransferCost(int resource, float transfer, Set<String> tagsUsed) {
this.resource = resource;
this.overhead = overhead;
this.transfer = transfer;
this.tagsUsed = new HashSet<>();
this.tagsUsed.addAll(tagsUsed);
}
public int getResource() {
......@@ -614,8 +623,8 @@ public class LNS {
return this.transfer;
}
public float getOverhead() {
return this.overhead;
public Set<String> getTagsUsed() {
return this.tagsUsed;
}
}
}
\ No newline at end of file
......@@ -17,24 +17,20 @@
package es.bsc.compss.scheduler.heuristics;
import es.bsc.compss.log.Loggers;
import es.bsc.compss.util.ErrorManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.PriorityQueue;
import java.io.*;
import java.util.*;
public class LNSNL {
// Logger
protected static final Logger LOGGER = LogManager.getLogger(Loggers.TS_COMP);
// DAG containing C, z and succ
private DAG dag;
......@@ -51,6 +47,8 @@ public class LNSNL {
private ArrayList<Boolean> dummies;
private float networkMargin;
public LNSNL(List<String> workers) {
HeuristicsConfiguration.load();
......@@ -101,6 +99,21 @@ public class LNSNL {
z[i][j] = Float.parseFloat(st[j]);
}
}
br.readLine();
br.readLine();
String[][] zid = new String[n][n];
for (int i = 0; i < n; ++i) {
lines = br.readLine();
st = lines.trim().split("\\s+");
for (int j = 0; j < n; ++j) {
if (j == 0) {
int pos = st[0].indexOf('[');
st[0] = st[0].substring(pos + 1);
}
zid[i][j] = st[j];
}
}
br.readLine();
lines = br.readLine();
st = lines.trim().split("\\s+");
......@@ -140,31 +153,13 @@ public class LNSNL {
}
}
br.readLine();
lines = br.readLine();
st = lines.trim().split("\\s+");
this.dummies = new ArrayList<>(n);
for (int j = 0; j < n; j++) {
if (j == 0) {
int pos = st[j + 2].indexOf('[');
if (st[j + 2].substring(pos + 1).charAt(0) == '0') {
this.dummies.add(false);
} else {
this.dummies.add(true);
this.dummyNodes++;
}
} else {
if (st[j + 2].charAt(0) == '0') {
this.dummies.add(false);
} else {
this.dummies.add(true);
this.dummyNodes++;
}
}
}
this.dummies = new ArrayList<>(Arrays.asList(new Boolean[n]));
Collections.fill(this.dummies, Boolean.FALSE);
this.dummyNodes = 0;
lines = br.readLine();
st = lines.trim().split("\\s+");
float overhead = Float.parseFloat(st[2].substring(0, st[2].indexOf(';')));
this.networkMargin = Float.parseFloat(st[2].substring(0, st[2].indexOf(';')));
ArrayList<String> workersOrder = new ArrayList<>(m);
lines = br.readLine();
......@@ -178,7 +173,7 @@ public class LNSNL {
}
}
this.resources = new Resources(ibw, mfs, h, overhead, m);
this.resources = new Resources(ibw, mfs, h, m);
resources.setResourceNames(workers);
HashMap<String, float[]> mapC = new HashMap<>();
......@@ -190,9 +185,12 @@ public class LNSNL {
// mapC.put(workers.get(j), times);
mapC.put(workersOrder.get(j), times);
}
this.dag = new DAG(mapC, z, succ, n);
this.dag = new DAG(mapC, z, zid, succ, n);
} catch (IOException | StringIndexOutOfBoundsException | ArrayIndexOutOfBoundsException
| NullPointerException e) {
StringWriter error = new StringWriter();
e.printStackTrace(new PrintWriter(error));
LOGGER.error(error);
ErrorManager.fatal("ERROR WHILE PARSING INPUT FILE. FILE FORMAT NOT CORRECT");
}
}
......@@ -234,40 +232,42 @@ public class LNSNL {
// TODO: move it to a common folder/class inside mapTaskToResource
private ResourceTransferCost selectBestResource(Resources resources, ArrayList<Float> workload, DAG dag,
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime) {
ReadyQueueElement rqe, ArrayList<Integer> computingNode, ArrayList<Float> endTime, Set<String> tagsUsed) {
int resource = 0;
ArrayList<Float> overheads = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> transfers = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> completionTime = new ArrayList<>(Collections.nCopies(this.m, 0.0f));
ArrayList<Float> releaseTimes = new ArrayList<>(Collections.nCopies(this.m, rqe.getReleaseTime()));
Set<String> tags = new HashSet<>();
boolean[][] succ = dag.getSucc();
float[][] ibw = resources.getIBW();
float[][] z = dag.getZ();
Map<String, float[]> C = dag.getC();
float transfer;
float overhead;
for (int idxRes = 0; idxRes < this.m; ++idxRes) {
float maxTime = 0.0f;
for (int pred = 0; pred < rqe.getId(); ++pred) {
if (succ[pred][rqe.getId()]) {
String tag = dag.getZid(pred, rqe.getId());
if (!tag.equals("0") && !tags.contains(tag) && !tagsUsed.contains(tag)) {
tags.add(tag);
// TODO: check if workers are the same for both tasks receiving the same tag
}
float transferTime = (float) (ibw[computingNode.get(pred)][idxRes]
* (Math.ceil((z[pred][rqe.getId()] / resources.getMFS())) * resources.getH()
+ z[pred][rqe.getId()]));
float t = endTime.get(pred);// + transferTime;// + resources.getOverhead();
if (computingNode.get(pred) != idxRes
&& workload.get(idxRes) <= t + resources.getOverhead() + transferTime) {
if (t + resources.getOverhead() + transferTime > maxTime) {
if (computingNode.get(pred) != idxRes && workload.get(idxRes) <= t + transferTime) {
if (t + transferTime > maxTime) {
releaseTimes.set(idxRes, endTime.get(pred));
transfers.set(idxRes, transferTime);
maxTime = t + resources.getOverhead() + transferTime;
maxTime = t + transferTime;
}