summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py330
1 files changed, 0 insertions, 330 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
deleted file mode 100644
index ef4c48f9..00000000
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ /dev/null
@@ -1,330 +0,0 @@
-#!/router/bin/python
-
-try:
- # support import for Python 2
- import outer_packages
-except ImportError:
- # support import for Python 3
- import client.outer_packages
-from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
-
-from common.text_opts import *
-
-import json
-import threading
-import time
-import datetime
-import zmq
-import re
-import random
-
-from common.trex_stats import *
-from common.trex_streams import *
-from common.trex_types import *
-
-# basic async stats class
-class CTRexAsyncStats(object):
- def __init__ (self):
- self.ref_point = None
- self.current = {}
- self.last_update_ts = datetime.datetime.now()
-
- def update (self, snapshot):
-
- #update
- self.last_update_ts = datetime.datetime.now()
-
- self.current = snapshot
-
- if self.ref_point == None:
- self.ref_point = self.current
-
- def clear(self):
- self.ref_point = self.current
-
-
- def get(self, field, format=False, suffix=""):
-
- if not field in self.current:
- return "N/A"
-
- if not format:
- return self.current[field]
- else:
- return format_num(self.current[field], suffix)
-
- def get_rel (self, field, format=False, suffix=""):
- if not field in self.current:
- return "N/A"
-
- if not format:
- return (self.current[field] - self.ref_point[field])
- else:
- return format_num(self.current[field] - self.ref_point[field], suffix)
-
-
- # return true if new data has arrived in the past 2 seconds
- def is_online (self):
- delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000
- return (delta_ms < 2000)
-
-# describes the general stats provided by TRex
-class CTRexAsyncStatsGeneral(CTRexAsyncStats):
- def __init__ (self):
- super(CTRexAsyncStatsGeneral, self).__init__()
-
-
-# per port stats
-class CTRexAsyncStatsPort(CTRexAsyncStats):
- def __init__ (self):
- super(CTRexAsyncStatsPort, self).__init__()
-
- def get_stream_stats (self, stream_id):
- return None
-
-# stats manager
-class CTRexAsyncStatsManager():
- def __init__ (self):
-
- self.general_stats = CTRexAsyncStatsGeneral()
- self.port_stats = {}
-
-
- def get_general_stats(self):
- return self.general_stats
-
- def get_port_stats (self, port_id):
-
- if not str(port_id) in self.port_stats:
- return None
-
- return self.port_stats[str(port_id)]
-
-
- def update(self, data):
- self.__handle_snapshot(data)
-
- def __handle_snapshot(self, snapshot):
-
- general_stats = {}
- port_stats = {}
-
- # filter the values per port and general
- for key, value in snapshot.iteritems():
-
- # match a pattern of ports
- m = re.search('(.*)\-([0-8])', key)
- if m:
-
- port_id = m.group(2)
- field_name = m.group(1)
-
- if not port_id in port_stats:
- port_stats[port_id] = {}
-
- port_stats[port_id][field_name] = value
-
- else:
- # no port match - general stats
- general_stats[key] = value
-
- # update the general object with the snapshot
- self.general_stats.update(general_stats)
-
- # update all ports
- for port_id, data in port_stats.iteritems():
-
- if not port_id in self.port_stats:
- self.port_stats[port_id] = CTRexAsyncStatsPort()
-
- self.port_stats[port_id].update(data)
-
-
-
-
-
-class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
-
- self.port = port
- self.server = server
-
- self.stateless_client = stateless_client
-
- self.event_handler = stateless_client.event_handler
- self.logger = self.stateless_client.logger
-
- self.raw_snapshot = {}
-
- self.stats = CTRexAsyncStatsManager()
-
- self.last_data_recv_ts = 0
- self.async_barrier = None
-
- self.connected = False
-
- # connects the async channel
- def connect (self):
-
- if self.connected:
- self.disconnect()
-
- self.tr = "tcp://{0}:{1}".format(self.server, self.port)
-
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
-
-
- # before running the thread - mark as active
- self.active = True
- self.t = threading.Thread(target = self._run)
-
- # kill this thread on exit and don't add it to the join list
- self.t.setDaemon(True)
- self.t.start()
-
- self.connected = True
-
- rc = self.barrier()
- if not rc:
- self.disconnect()
- return rc
-
- return RC_OK()
-
-
-
-
- # disconnect
- def disconnect (self):
- if not self.connected:
- return
-
- # signal that the context was destroyed (exit the thread loop)
- self.context.term()
-
- # mark for join and join
- self.active = False
- self.t.join()
-
- # done
- self.connected = False
-
-
- # thread function
- def _run (self):
-
- # socket must be created on the same thread
- self.socket.setsockopt(zmq.SUBSCRIBE, '')
- self.socket.setsockopt(zmq.RCVTIMEO, 5000)
- self.socket.connect(self.tr)
-
- got_data = False
-
- while self.active:
- try:
-
- line = self.socket.recv_string()
- self.last_data_recv_ts = time.time()
-
- # signal once
- if not got_data:
- self.event_handler.on_async_alive()
- got_data = True
-
-
- # got a timeout - mark as not alive and retry
- except zmq.Again:
-
- # signal once
- if got_data:
- self.event_handler.on_async_dead()
- got_data = False
-
- continue
-
- except zmq.ContextTerminated:
- # outside thread signaled us to exit
- break
-
- msg = json.loads(line)
-
- name = msg['name']
- data = msg['data']
- type = msg['type']
- self.raw_snapshot[name] = data
-
- self.__dispatch(name, type, data)
-
-
- # closing of socket must be from the same thread
- self.socket.close(linger = 0)
-
-
- # did we get info for the last 3 seconds ?
- def is_alive (self):
- if self.last_data_recv_ts == None:
- return False
-
- return ( (time.time() - self.last_data_recv_ts) < 3 )
-
- def get_stats (self):
- return self.stats
-
- def get_raw_snapshot (self):
- return self.raw_snapshot
-
- # dispatch the message to the right place
- def __dispatch (self, name, type, data):
- # stats
- if name == "trex-global":
- self.event_handler.handle_async_stats_update(data)
-
- # events
- elif name == "trex-event":
- self.event_handler.handle_async_event(type, data)
-
- # barriers
- elif name == "trex-barrier":
- self.handle_async_barrier(type, data)
- else:
- pass
-
-
- # async barrier handling routine
- def handle_async_barrier (self, type, data):
- if self.async_barrier['key'] == type:
- self.async_barrier['ack'] = True
-
-
- # block on barrier for async channel
- def barrier(self, timeout = 5):
-
- # set a random key
- key = random.getrandbits(32)
- self.async_barrier = {'key': key, 'ack': False}
-
- # expr time
- expr = time.time() + timeout
-
- while not self.async_barrier['ack']:
-
- # inject
- rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
- if not rc:
- return rc
-
- # fast loop
- for i in xrange(0, 100):
- if self.async_barrier['ack']:
- break
- time.sleep(0.001)
-
- if time.time() > expr:
- return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
-
- return RC_OK()
-
-
-