First commit

parent a94d47f9
FROM python:3.7-alpine
RUN apk update && apk upgrade && apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt ./
RUN python3 -m pip install -r requirements.txt
COPY telemetry.py config.yml ./
CMD ["python3", "-u", "./telemetry.py"]
FROM python:3.7-alpine
RUN apk update && apk upgrade && apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt ./
RUN python3 -m pip install -r requirements.txt
COPY commscost.py commscost-config.yml app-info.yml app-attr.yml ./
CMD ["python3", "-u", "./commscost.py"]
devLevel:
rttmax: 25
pllmax: 1e-6
VoIP:
norm: 10
priority: 2
balance: 0.25
rttmax: 100
pllmax: 1e-2
iface: enp0s3
videoCall:
norm: 10
priority: 4
balance: 0.25
rttmax: 150
pllmax: 1e-3
iface: enp0s3
onlineGame:
norm: 10
priority: 3
balance: 0.25
rttmax: 50
pllmax: 1e-3
iface: enp0s3
videoStream:
norm: 10
priority: 5
balance: 0.25
rttmax: 300
pllmax: 1e-6
iface: enp0s3
criticalPTT:
norm: 10
priority: 0.7
balance: 0.25
rttmax: 75
pllmax: 1e-2
iface: enp0s3
nonCriticalPTT:
norm: 10
priority: 2
balance: 0.25
rttmax: 100
pllmax: 1e-2
iface: enp0s3
IMS:
norm: 10
priority: 1
balance: 0.25
rttmax: 100
pllmax: 1e-6
iface: enp0s3
TCPservices:
norm: 10
priority: 6
balance: 0.25
rttmax: 300
pllmax: 1e-6
iface: enp0s3
interactive:
norm: 10
priority: 7
balance: 0.25
rttmax: 100
pllmax: 1e-3
iface: enp0s3
delaySensitive:
norm: 10
priority: 0.5
balance: 0.25
rttmax: 60
pllmax: 1e-6
iface: enp0s3
criticalData:
norm: 10
priority: 5.5
balance: 0.25
rttmax: 200
pllmax: 1e-6
iface: enp0s3
yolo:
pid: 1234
infoNature: videoStream
active: True
crewcall:
pid: 5678
infoNature: VoIP
active: True
monitor:
interval: 1
netinterfaces:
- enp0s3
costparams:
norm: 10
import:
file:
enabled: yes
path: ./telemetry-data/telemetry-output
mqtt:
enabled: no
hostname: emq.konnekt.ikerlan.es
port: 1883
username: dev
password:
topic: konnekt/v2/demo/telemetry
tls:
enabled: no
ca:
cert:
key:
export:
screen:
enabled: yes
file:
enabled: yes
path: ./commsCost-output
mqtt:
enabled: no
hostname: emq.konnekt.ikerlan.es
port: 1883
username: dev
password:
topic: konnekt/v2/demo/commsCost
tls:
enabled: no
ca:
cert:
key:
#!/usr/bin/python3
"""KonnektBox Manager
Obtains the communication cost of an application
Copyright (C) 2020 IKERLAN S.Coop
"""
# Standard imports
import argparse
import os
#import re
import sys
import time
import json
import math
#import socket
# Installed packages
import yaml
#import psutil
#import cpuinfo
#import paho.mqtt.publish as publish
# Constants
DEF_CONFIG_PATH = './commscost-config.yml'
DEF_APPINFO_PATH = './app-info.yml'
DEF_APPATTR_PATH = './app-attr.yml'
def get_params():
""" Get console input parameters """
parser = argparse.ArgumentParser(description='KonnektBox telemetry daemon.')
parser.add_argument('-c', '--config', metavar='path', help='path of the config file')
args = parser.parse_args()
return args
def get_config(config, path):
""" Return CONFIG read from CONFIG file (same CONFIG if file not changed) """
if 'timestamp' in config:
if config['timestamp'] == os.path.getmtime(path):
return config
with open(path, 'r') as stream:
try:
config = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
sys.exit()
config['timestamp'] = os.path.getmtime(path)
return config
def get_telemetry(config):
""" Return telemetry_data read from output file (TODO: get data via MQTT) """
if config['import']['file']['enabled']:
with open(config['import']['file']['path'], 'r') as stream:
try:
telemetry_data = json.load(stream)
except ValueError as exc:
print(exc)
sys.exit()
return telemetry_data
def get_appinfo(path):
""" Load info/attributes from a YAML file"""
with open(path, 'r') as stream:
try:
appinfo = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
sys.exit()
return appinfo
def get_cost(ninfo, app_attr, app_info):
""" Return the communication cost for every application """
cost = {}
for app in app_info:
infonat = app_info[app]['infoNature']
iface = app_attr[infonat]['iface']
bal = float(app_attr[infonat]['balance'])
rttmax = float(app_attr[infonat]['rttmax'])
pllmax = float(app_attr[infonat]['pllmax'])
try:
rtt = float(ninfo[iface]['rtt_ms'])
total_packets = float(ninfo[iface]['rx_packets'] + ninfo[iface]['tx_packets'])
total_lost = float(ninfo[iface]['rx_lost_packets'] + ninfo[iface]['tx_lost_packets'])
pll = total_lost / total_packets
except yaml.YAMLError as exc:
print(exc)
sys.exit()
try:
cost[app] = ((float(app_attr[infonat]['priority']) / float(app_attr[infonat]['norm']))
- (bal*math.log(max(0, (1-(rtt/rttmax))))
+ (1-bal)*math.log(max(0, (1-(pll/pllmax))))))
except ValueError as exc:
print(exc)
cost[app] = math.inf
return -1
print(cost)
return cost
def check_thres(ninfo, app_attr):
""" Check if comms thresholds are satisfied at device level """
devrttmax = float(app_attr['devLevel']['rttmax'])
devpllmax = float(app_attr['devLevel']['pllmax'])
for iface in ninfo:
if ninfo[iface]['rtt_ms'] == 'ICMP request timeout':
return -1
try:
rtt = float(ninfo[iface]['rtt_ms'])
total_packets = float(ninfo[iface]['rx_packets'] + ninfo[iface]['tx_packets'])
total_lost = float(ninfo[iface]['rx_lost_packets'] + ninfo[iface]['tx_lost_packets'])
pll = total_lost / total_packets
except yaml.YAMLError as exc:
print(exc)
sys.exit()
if ((rtt >= devrttmax) or (pll >= devpllmax)):
return 1
return 0
def main():
""" Main app """
params = get_params()
config_path = DEF_CONFIG_PATH
if params.config:
config_path = params.config
conf = {}
conf = get_config(conf, config_path)
apps = get_appinfo(DEF_APPINFO_PATH)
attr = get_appinfo(DEF_APPATTR_PATH)
while True:
conf = get_config(conf, config_path) # Refresh config each interval
data = get_telemetry(conf)
# print(json.dumps(data, sort_keys=True, indent=4))
thres_violation = check_thres(data['net_interfaces'], attr)
if thres_violation == -1:
print('Could not obtain comms. attributes')
elif thres_violation == 1:
print('Comms. requirements not satisfied')
costs = get_cost(data['net_interfaces'], attr, apps)
print(costs)
markedapp = max(costs)
apps[markedapp]['active'] = False
print(markedapp)
else:
print('Up and running')
time.sleep(int(conf['monitor']['interval']))
main()
monitor:
interval: 1
netinterfaces:
- enp0s3
endpoint:
icmp_ping: 8.8.8.8
http_ping: http://www.konnekt.ikerlan.es
export:
screen:
enabled: no
file:
enabled: yes
path: ./telemetry-data/telemetry-output
mqtt:
enabled: no
hostname: emq.konnekt.ikerlan.es
port: 1883
username: dev
password:
topic: konnekt/v2/demo/telemetry
tls:
enabled: no
ca:
cert:
key:
#-------------------------------------------------------------#
# - Docker Compose file for the KonnektBox Telemetry Daemon - #
#-------------------------------------------------------------#
version: '3'
services:
telemetry:
build:
context: ./
dockerfile: ./DockerfileKBTD
image: konnektbox-telemetry
container_name: telem
network_mode: "host"
volumes:
- telemetry-data:/telemetry-data/
commscost:
build:
context: ./
dockerfile: ./DockerfileNFRComms
image: nfr-comms
container_name: commscost
volumes:
- telemetry-data:/telemetry-data/
depends_on:
- telemetry
volumes:
telemetry-data:
#!/usr/bin/python3
"""KonnektBox Manager
Manages KonnektBox telemetry
Copyright (C) 2020 IKERLAN S.Coop
"""
# Standard imports
import argparse
import os
import re
import sys
import time
import json
import socket
# Installed packages
import yaml
import psutil
import cpuinfo
import paho.mqtt.publish as publish
# Constants
DEF_CONFIG_PATH = './config.yml'
def get_cpu_info(config):
""" Gets CPU info """
cinfo = {}
# Information to be provided at monitoring start up only:
if config['isfirst']:
cinfo['model'] = cpuinfo.get_cpu_info()['brand']
cinfo['cores_physical'] = psutil.cpu_count(logical=False)
cinfo['cores_logical'] = psutil.cpu_count()
# Get cpu usage in percentage:
cinfo['cpu_usage'] = psutil.cpu_percent()
cinfo['cpu_freq'] = psutil.cpu_freq()
return cinfo
def get_disk_usage():
""" Get disk usage in percentage """
return psutil.disk_usage('/')[3]
def get_mem_info():
""" Get mem usage and availability """
minfo = {}
mem_attr = psutil.virtual_memory()
minfo['available_mem_MB'] = mem_attr.available/1000000
minfo['mem_usage_MB'] = mem_attr.used/1000000
return minfo
def get_netinterfaces_info(config):
""" Gets information from network interfaces """
info = {}
pll_stats = psutil.net_io_counters(pernic=True)
pernic_addr = psutil.net_if_addrs()
for iface in config['monitor']['netinterfaces']:
try:
ninfo = {}
if config['isfirst']:
for addr in pernic_addr[iface]:
# Get IP address:
if addr.family == socket.AF_INET:
ninfo['ip'] = addr.address
# Get MAC address:
elif addr.family == psutil.AF_LINK:
ninfo['mac'] = addr.address
# Get total/dropped packets and data volume
ninfo['rx_MB'] = pll_stats[iface].bytes_recv/1000000
ninfo['rx_packets'] = pll_stats[iface].packets_recv
ninfo['rx_lost_packets'] = pll_stats[iface].dropin
ninfo['tx_MB'] = pll_stats[iface].bytes_sent/1000000
ninfo['tx_packets'] = pll_stats[iface].packets_sent
ninfo['tx_lost_packets'] = pll_stats[iface].dropout
# Get RTT to a certain endpoint via ICMP
command = "ping -c 1 -w 1 -W 1 -I {} {}".format(iface, config['endpoint']['icmp_ping'])
request = os.popen(command).read()
request = re.search('=\\s(\\d+\\.\\d+)', request)
if request is None:
ninfo['rtt_ms'] = "ICMP request timeout"
else:
rtt = float(request.group(1))
ninfo['rtt_ms'] = float('{0:.2f}'.format(rtt))
info[iface] = ninfo
except psutil.Error as exc:
print("Error reading interface {}: {}".format(iface, exc))
return info
def get_config(config, path):
""" Return CONFIG read from CONFIG file (same CONFIG if file not changed) """
if 'timestamp' in config:
if config['timestamp'] == os.path.getmtime(path):
return config
with open(path, 'r') as stream:
try:
config = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
sys.exit()
config['timestamp'] = os.path.getmtime(path)
return config
def get_params():
""" Get console input parameters """
parser = argparse.ArgumentParser(description='KonnektBox telemetry daemon.')
parser.add_argument('-c', '--config', metavar='path', help='path of the config file')
args = parser.parse_args()
return args
def telemetry_get(config):
""" Gets telemetry data """
# Build JSON with telemetry data
telemetry_data = {}
telemetry_data['net_interfaces'] = get_netinterfaces_info(config)
telemetry_data['cpu_info'] = get_cpu_info(config)
telemetry_data['disk_usage'] = get_disk_usage()
telemetry_data['mem_usage'] = get_mem_info()
return telemetry_data
def telemetry_export(config, telemetry_data):
""" Exports telemetry data """
if config['export']['screen']['enabled']:
print(json.dumps(telemetry_data, sort_keys=True, indent=4))
if config['export']['mqtt']['enabled']:
tls_cfg = None
if config['export']['mqtt']['tls']['enabled']:
tls_cfg = {'ca_certs':config['export']['mqtt']['tls']['ca'],
'certfile':config['export']['mqtt']['tls']['cert'],
'keyfile':config['export']['mqtt']['tls']['key']}
publish.single(config['export']['mqtt']['topic'],
json.dumps(telemetry_data),
auth={
'username':config['export']['mqtt']['username'],
'password':config['export']['mqtt']['password']},
hostname=config['export']['mqtt']['hostname'],
tls=tls_cfg)
if config['export']['file']['enabled']:
with open(config['export']['file']['path'], 'w') as myfile:
# myfile.write('{},{}\n'.format(time.time(), json.dumps(telemetry_data)))
myfile.write('{}\n'.format(json.dumps(telemetry_data)))
def main():
""" Main app """
params = get_params()
config_path = DEF_CONFIG_PATH
if params.config:
config_path = params.config
conf = {}
conf = get_config(conf, config_path)
conf['isfirst'] = True
while True:
conf = get_config(conf, config_path) # Refresh config each interval
data = telemetry_get(conf)
telemetry_export(conf, data)
conf['isfirst'] = False
time.sleep(int(conf['monitor']['interval']))
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment