Commit 2d2d05f2 authored by vmasip's avatar vmasip
Browse files

no dataclay

parent a07c2c45
......@@ -4,4 +4,5 @@ __pycache__
*.so
stubs
*.in
*.log
\ No newline at end of file
*.log
*.out
\ No newline at end of file
......@@ -86,17 +86,21 @@ RUN git clone https://gitlab.bsc.es/ppc-bsc/software/tracker.git -b dev && \
cmake .. -DWITH_MATPLOTLIB=OFF && \
make -j8
RUN git clone https://gitlab.bsc.es/ppc-bsc/software/smartcity-compss.git
# # Tracker class project
# RUN git clone https://github.com/class-euproject/tracker_CLASS.git -b bsc && \
# cd tracker_CLASS && \
# git submodule update --init --recursive && \
# mkdir build && \
# cd build && \
# cmake .. -DWITH_MATPLOTLIB=OFF && \
# make -j8
RUN cp /root/tracker/build/track.cpython-36m-aarch64-linux-gnu.so smartcity-compss/lib
RUN echo "dummy"
RUN git clone https://gitlab.bsc.es/ppc-bsc/software/smartcity-compss.git -b dev
# Tracker class project
RUN git clone https://github.com/class-euproject/tracker_CLASS.git -b bsc && \
cd tracker_CLASS && \
git submodule update --init --recursive && \
mkdir build && \
cd build && \
cmake .. -DWITH_MATPLOTLIB=OFF && \
make -j8
# RUN cp /root/tracker/build/track.cpython-36m-aarch64-linux-gnu.so smartcity-compss/lib
RUN cp /root/tracker_CLASS/build/track.cpython-36m-aarch64-linux-gnu.so smartcity-compss/lib
#RUN cp /root/deduplicator/build/deduplicator.cpython-36m-aarch64-linux-gnu.so smartcity-compss/lib
# cp /root/deduplicator/build/deduplicator.cpython-36m-x86_64-linux-gnu.so . && \
......
......@@ -6,4 +6,14 @@
<WorkingDir>/tmp/COMPSsWorker/</WorkingDir>
<User>flo01</User>
</ComputeNode>
<!-- <ComputeNode Name="192.168.121.247">
<InstallDir>/opt/COMPSs/</InstallDir>
<WorkingDir>/tmp/COMPSsWorker/</WorkingDir>
<User>flo01</User>
</ComputeNode>
<ComputeNode Name="192.168.121.246">
<InstallDir>/opt/COMPSs/</InstallDir>
<WorkingDir>/tmp/COMPSsWorker/</WorkingDir>
<User>flo01</User>
</ComputeNode> -->
</Project>
\ No newline at end of file
......@@ -30,4 +30,65 @@
</Adaptor>
</Adaptors>
</ComputeNode>
<!-- <ComputeNode Name="192.168.121.247">
<Processor Name="MainProcessor">
<ComputingUnits>4</ComputingUnits>
</Processor>
<Adaptors>
<Adaptor Name="es.bsc.compss.nio.master.NIOAdaptor">
<SubmissionSystem>
<Interactive/>
</SubmissionSystem>
<Ports>
<MinPort>43002</MinPort>
<MaxPort>43003</MaxPort>
</Ports>
<Properties>
<Property>
<Name>Engine</Name>
<Value>docker</Value>
</Property>
<Property>
<Name>ImageName</Name>
<Value>bscppc/smartcity-compss:2.7-nfr2-3.6</Value>
</Property>
<Property>
<Name>PullIfNeeded</Name>
<Value>true</Value>
</Property>
</Properties>
</Adaptor>
</Adaptors>
</ComputeNode>
<ComputeNode Name="192.168.121.246">
<Processor Name="MainProcessor">
<ComputingUnits>4</ComputingUnits>
</Processor>
<Adaptors>
<Adaptor Name="es.bsc.compss.nio.master.NIOAdaptor">
<SubmissionSystem>
<Interactive/>
</SubmissionSystem>
<Ports>
<MinPort>43002</MinPort>
<MaxPort>43003</MaxPort>
</Ports>
<Properties>
<Property>
<Name>Engine</Name>
<Value>docker</Value>
</Property>
<Property>
<Name>ImageName</Name>
<Value>bscppc/smartcity-compss:2.7-nfr2-3.6</Value>
</Property>
<Property>
<Name>PullIfNeeded</Name>
<Value>true</Value>
</Property>
</Properties>
</Adaptor>
</Adaptors>
</ComputeNode> -->
</ResourcesList>
def main():
from dataclay.tool.functions import get_stubs
import subprocess
import time
print('HOLAAAAAAAAAAAAAAAAAA')
# from dataclay.tool.functions import get_stubs
# import subprocess
# import time
# Env variables
dataclay_jar_path = "dataclay/dataclay.jar"
user = "defaultUser"
password = "defaultPass"
namespace = "CityNS"
stubspath = "./stubs"
# # Env variables
# dataclay_jar_path = "dataclay/dataclay.jar"
# user = "defaultUser"
# password = "defaultPass"
# namespace = "CityNS"
# stubspath = "./stubs"
# Connection to get contract_id
contract_id = None
while contract_id is None:
try:
contract_id = subprocess.check_output(f"timeout 10 java -cp {dataclay_jar_path} es.bsc.dataclay.tool.AccessNamespace {user} {password} {namespace} | tail -1", shell=True, stderr=subprocess.DEVNULL)[:-1].decode()
print(f"CONTRACT ID IS {contract_id}")
if contract_id == "":
contract_id = None
except:
contract_id = None
print(f"Waiting for contract_id to be set and ready from dataclay in entrypoint.py...")
time.sleep(1)
# # Connection to get contract_id
# contract_id = None
# while contract_id is None:
# try:
# contract_id = subprocess.check_output(f"timeout 10 java -cp {dataclay_jar_path} es.bsc.dataclay.tool.AccessNamespace {user} {password} {namespace} | tail -1", shell=True, stderr=subprocess.DEVNULL)[:-1].decode()
# print(f"CONTRACT ID IS {contract_id}")
# if contract_id == "":
# contract_id = None
# except:
# contract_id = None
# print(f"Waiting for contract_id to be set and ready from dataclay in entrypoint.py...")
# time.sleep(1)
# Get stubs
print(f"CONTRACT ID OUTSIDE WHILE IS {contract_id}")
get_stubs(user, password, contract_id, stubspath)
# # Get stubs
# print(f"CONTRACT ID OUTSIDE WHILE IS {contract_id}")
# get_stubs(user, password, contract_id, stubspath)
if __name__ == "__main__":
......
......@@ -16,6 +16,7 @@ done
runcompss -d --python_interpreter=python3 --lang=python --master_name=192.168.121.248 --master_port=43001 \
--scheduler="es.bsc.compss.scheduler.fifo.FIFOScheduler" \
--project=config/project.xml --resources=config/resources.xml \
--classpath=/root/smartcity-compss/dataclay/dataclay.jar \
--storage_conf=/root/smartcity-compss/cfgfiles/session.properties tracker.py 10.50.100.3:8887 --with_dataclay
\ No newline at end of file
--project=config/project.xml --resources=config/resources.xml tracker.py 10.50.100.2:8887 10.50.100.2:8886 --with_dataclay
# --classpath=/root/smartcity-compss/dataclay/dataclay.jar \
# --storage_conf=/root/smartcity-compss/cfgfiles/session.properties \
# tracker.py 10.50.100.3:8887 10.50.100.3:8886 --with_dataclay
\ No newline at end of file
......@@ -26,7 +26,7 @@ from geopandas import GeoDataFrame
from shapely import geometry
import numpy as np
NUM_ITERS = 7500
NUM_ITERS = 3
NUM_ITERS_POLLUTION = 25
SNAP_PER_FEDERATION = 15
N = 5
......@@ -72,8 +72,7 @@ def getRoi(roi_path):
@task(returns=3, list_boxes=IN, trackers=IN, cur_index=IN, init_point=IN)
def execute_tracking(list_boxes, trackers, cur_index, init_point):
a, b, c = track.track2(list_boxes, trackers, cur_index, init_point)
return a, b, c
return track.track2(list_boxes, trackers, cur_index, init_point)
@task(returns=7,)
......@@ -81,12 +80,12 @@ def receive_boxes(socket_ip, dummy):
import struct
import time
import traceback
print("WAiting ........")
socket_port = 5559
if ":" in socket_ip:
socket_ip, socket_port = socket_ip.split(":")
socket_port = int(socket_port)
print("WAiting ........2")
message = b""
cam_id = None
timestamp = None
......@@ -95,7 +94,7 @@ def receive_boxes(socket_ip, dummy):
init_point = None
no_read = True
frame_number = -1
print("WAiting ........3")
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
serverSocket.sendto(b"A", (socket_ip, socket_port))
......@@ -105,9 +104,10 @@ def receive_boxes(socket_ip, dummy):
while no_read:
try:
print("WAiting ........4")
no_read = False
message, address = serverSocket.recvfrom(16000)
print("WAiting ........5")
flag = len(message) > 0
# This flag serves to know if the video has ended
cam_id = struct.unpack_from("i", message[1:1 + int_size])[0]
......@@ -127,6 +127,7 @@ def receive_boxes(socket_ip, dummy):
# print(range(1 + int_size + unsigned_long_size + double_size * 2, len(message),
# double_size * 10 + int_size + 1 + float_size * 4))
init_point = (lat, lon)
print("WAiting ........6")
for offset in range(1 + int_size + unsigned_long_size + double_size * 2, len(message),
double_size * 10 + int_size + 1 + float_size * 4):
north, east, frame_number, obj_class = struct.unpack_from('ddIc', message[
......@@ -141,10 +142,11 @@ def receive_boxes(socket_ip, dummy):
float_size * 4:])
box_coords.append((lat_ur, lon_ur, lat_lr, lon_lr, lat_ll, lon_ll, lat_ul, lon_ul))
except socket.error as e:
no_read = True
traceback.print_exc()
print("WAiting ........7")
serverSocket.close()
# box_coords: 4 esquinas coordenadas
# boxes: track.obj_m -> modules.cpp de tracker_CLASS/bsc
......@@ -157,26 +159,26 @@ def deduplicate(trackers_list, cam_ids, foo_dedu, frames):
return_message = dd.compute_deduplicator(trackers_list, cam_ids, frames,False)
return return_message, foo_dedu
@task(returns = 1, kb = IN)
def getResistenzaStatus(kb):
@task(returns = 1,)
def getResistenzaStatus():
if (kb.traffic_lights[(11.1831603045325, 43.75873372968)].status == 'Red'):
vehLights = {'G3': False} # RED
else:
vehLights = {'G3': True} # GREEN, YELLOW
# if (kb.traffic_lights[(11.1831603045325, 43.75873372968)].status == 'Red'):
# vehLights = {'G3': False} # RED
# else:
# vehLights = {'G3': True} # GREEN, YELLOW
if (kb.traffic_lights[(11.1828671784974, 43.758755396189)].status == 'Red'):
pedLights = {'G1': False} # RED
else:
pedLights = {'G1': True} # GREEN, YELLOW
# if (kb.traffic_lights[(11.1828671784974, 43.758755396189)].status == 'Red'):
# pedLights = {'G1': False} # RED
# else:
# pedLights = {'G1': True} # GREEN, YELLOW
if (kb.traffic_lights[(11.1829760293935, 43.7588197766266)].status == 'Red'):
pedLights.update({'G2': False}) # RED
else:
pedLights.update({'G2': True}) # GREEN, YELLOW
# if (kb.traffic_lights[(11.1829760293935, 43.7588197766266)].status == 'Red'):
# pedLights.update({'G2': False}) # RED
# else:
# pedLights.update({'G2': True}) # GREEN, YELLOW
return {'pedLights': pedLights, 'vehLights': vehLights, 'tramApproach':True}
return {'pedLights': {'G1': True, 'G2': True,'G3': True}, 'vehLights': {'G3': True}, 'tramApproach':True}
def getResistenzaStatus2(current_frame):
......@@ -189,10 +191,10 @@ def getResistenzaStatus2(current_frame):
pedLights = {'G1': q['pedLightG1'].values, 'G2': q['pedLightG2'].values} # RED
return {'pedLights': pedLights, 'vehLights': vehLights, 'tramApproach':q['tramState'].values[0]}
@task(returns=1,id_cam=IN, timestamp = IN, info_for_deduplicator=IN,polys=IN, kb=IN, areaState=IN)
def semantic_analysis(id_cam,timestamp, info_for_deduplicator, polys, kb, areaState):
@task(returns=2,id_cam=IN, timestamp = IN, info_for_deduplicator=IN,polys=IN, areaState=IN)
def semantic_analysis(id_cam,timestamp, info_for_deduplicator, polys, areaState):
# Alert list will contains binary alert value for inserting into csv
from CityNS.classes import Alert
# from CityNS.classes import Alert
alertList = []
alertInfo = []
alarmTime = 10000
......@@ -293,47 +295,48 @@ def semantic_analysis(id_cam,timestamp, info_for_deduplicator, polys, kb, areaSt
description = 'Vehicle queue of length ' + str(carQueueLength)
alertQueueFlag = True
data = carQueueLength
alertList[-1] = 5
alertInfo[-1] = [5, alert_category, severity, description, str(id_cam) + '_' + str(trackId)]
# Any alertFlag send Alert.
if (alertFlag):
print(f'**ALERT: cam id:{id_cam}'\
f' -- {area} {severity} {description}'\
f' -- {info_for_deduplicator[i][0]}, {info_for_deduplicator[i][1]} | timeLapse = {alarmTime}')
alert = Alert( id = str(id_cam) + '_' + str(trackId),
source = id_cam,
alert_category = alert_category,
severity= severity,
longitude = info_for_deduplicator[i][1],
latitude = info_for_deduplicator[i][0],
area = area,
description = description,
timestamp = datetime.utcfromtimestamp(timestamp / 1000),
valid_from = datetime.utcfromtimestamp(timestamp / 1000),
valid_to = datetime.utcfromtimestamp((timestamp + alarmTime) / 1000))
alert.make_persistent()
alert.send_to_mqtt()
# alert = Alert( id = str(id_cam) + '_' + str(trackId),
# source = id_cam,
# alert_category = alert_category,
# severity= severity,
# longitude = info_for_deduplicator[i][1],
# latitude = info_for_deduplicator[i][0],
# area = area,
# description = description,
# timestamp = datetime.utcfromtimestamp(timestamp / 1000),
# valid_from = datetime.utcfromtimestamp(timestamp / 1000),
# valid_to = datetime.utcfromtimestamp((timestamp + alarmTime) / 1000))
# alert.make_persistent()
# alert.send_to_mqtt()
# AlertQueueFlag only triggers Alarm when all the boxes has been computed
if (alertQueueFlag):
print(f'**ALERT: cam id: trafficJam_ {str(data)}'\
f' -- {area} {severity} {description}'\
f' -- 11.1831603045325, 43.75873372968 | timeLapse = {alarmTime}')
alert = Alert( id = 'trafficJam_' + str(data) ,
source = id_cam,
alert_category = alert_category,
severity= severity,
longitude = 11.1831603045325, # posicion semaforo vehiculos
latitude = 43.75873372968,
area = area,
description = description,
#data = data,
timestamp = datetime.utcfromtimestamp(timestamp / 1000),
valid_from = datetime.utcfromtimestamp(timestamp / 1000),
valid_to = datetime.utcfromtimestamp((timestamp + alarmTime) / 1000))
alert.make_persistent()
alert.send_to_mqtt()
alert.send_to_kafka("dataclay", "resistenza")
alertList[-1] = 5
alertInfo[-1] = [5, alert_category, severity, description, str(id_cam) + '_' + str(trackId)]
# alert = Alert( id = 'trafficJam_' + str(data) ,
# source = id_cam,
# alert_category = alert_category,
# severity= severity,
# longitude = 11.1831603045325, # posicion semaforo vehiculos
# latitude = 43.75873372968,
# area = area,
# description = description,
# #data = data,
# timestamp = datetime.utcfromtimestamp(timestamp / 1000),
# valid_from = datetime.utcfromtimestamp(timestamp / 1000),
# valid_to = datetime.utcfromtimestamp((timestamp + alarmTime) / 1000))
# alert.make_persistent()
# alert.send_to_mqtt()
# alert.send_to_kafka("dataclay", "resistenza")
return alertList,alertInfo
......@@ -571,7 +574,7 @@ def boxes_and_track(socket_ip, trackers_list, tracker_indexes, cur_index):
return execute_tracking(list_boxes, trackers_list, tracker_indexes, cur_index)
def execute_trackers(socket_ips, with_dataclay, kb):
def execute_trackers(socket_ips, with_dataclay):
import uuid
import time
import sys
......@@ -617,10 +620,10 @@ def execute_trackers(socket_ips, with_dataclay, kb):
foo_dedu = foo = None
while i < NUM_ITERS:
print(i)
print(f'Iteration: {i}')
# readScenarioState: trafficLights, tram NGAP
areaState = getResistenzaStatus(kb)
areaState = getResistenzaStatus()
for index, socket_ip in enumerate(socket_ips):
......@@ -630,10 +633,9 @@ def execute_trackers(socket_ips, with_dataclay, kb):
trackers_list[index],
cur_index[index],
init_point)
# areaState = getResistenzaStatus2(frames[index])
# info_deduplicated, foo_dedu = deduplicate(info_for_deduplicator, cam_ids, foo_dedu, frames)
alertsList,alertInfo = semantic_analysis(cam_ids[index], timestamps[index], info_for_deduplicator[index] , polys, kb, areaState)
alertsList,alertInfo = semantic_analysis(cam_ids[index], timestamps[index], info_for_deduplicator[index] , polys, areaState)
dump(cam_ids[index], timestamps[index], frames[index], info_for_deduplicator[index], alertsList,initTime)
dump_state(cam_ids[index],frames[index], areaState,initTime)
dump_alerts(cam_ids[index],frames[index], alertInfo, initTime)
......@@ -680,7 +682,7 @@ def main():
import sys
import time
import zmq
print('000000000000000000')
# Parse arguments to accept variable number of "IPs:Ports"
parser = argparse.ArgumentParser()
parser.add_argument("tkdnn_ips", nargs='+')
......@@ -688,11 +690,11 @@ def main():
parser.add_argument("--with_dataclay", nargs='?', const=True, type=str2bool, default=False) # True as default
args = parser.parse_args()
if (args.with_dataclay):
# Initialize dataclay
init()
# Load dataclay DKB class
from CityNS.classes import DKB
# if (args.with_dataclay):
# # Initialize dataclay
# init()
# # Load dataclay DKB class
# from CityNS.classes import DKB
# initialize all computing units in all workers
# num_cus = 8
......@@ -702,17 +704,17 @@ def main():
print(f"Init task completed {datetime.now()}")
#input("Press enter to continue...")
if (args.with_dataclay):
#Dataclay KB generation
try:
print('kb initiated right')
kb = DKB.get_by_alias("DKB")
except DataClayException:
kb = DKB()
kb.make_persistent("DKB")
else:
kb = None
# if (args.with_dataclay):
# #Dataclay KB generation
# try:
# print('kb initiated right')
# kb = DKB.get_by_alias("DKB")
# except DataClayException:
# kb = DKB()
# kb.make_persistent("DKB")
# else:
# kb = None
print('Dataclay ended')
### ACK TO START WORKFLOW AT tkDNN ###
for socket_ip in args.tkdnn_ips:
if ":" not in socket_ip:
......@@ -723,8 +725,8 @@ def main():
sink.send_string("")
sink.close()
context.term()
execute_trackers(args.tkdnn_ips, args.with_dataclay, kb)
print('Executing trackers function')
execute_trackers(args.tkdnn_ips, args.with_dataclay)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment