summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-02-08 10:55:20 -0500
committerimarom <imarom@cisco.com>2016-02-08 10:55:20 -0500
commit6107c1ca4aa485c5971ff3326513b8f4934f7ac1 (patch)
treea109d80501bd087e3219f68c186fb55bc17e090a /scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
parentf5a5e50bfe046148a20f6ce578d6082119dec2c0 (diff)
huge refactor - again
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py322
1 files changed, 322 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
new file mode 100644
index 00000000..410482b9
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
@@ -0,0 +1,322 @@
+#!/router/bin/python
+
+import json
+import threading
+import time
+import datetime
+import zmq
+import re
+import random
+
+from trex_stl_jsonrpc_client import JsonRpcClient, BatchMessage
+
+from utils.text_opts import *
+from trex_stl_stats import *
+from trex_stl_types import *
+
+# basic async stats class
+class CTRexAsyncStats(object):
+ def __init__ (self):
+ self.ref_point = None
+ self.current = {}
+ self.last_update_ts = datetime.datetime.now()
+
+ def update (self, snapshot):
+
+ #update
+ self.last_update_ts = datetime.datetime.now()
+
+ self.current = snapshot
+
+ if self.ref_point == None:
+ self.ref_point = self.current
+
+ def clear(self):
+ self.ref_point = self.current
+
+
+ def get(self, field, format=False, suffix=""):
+
+ if not field in self.current:
+ return "N/A"
+
+ if not format:
+ return self.current[field]
+ else:
+ return format_num(self.current[field], suffix)
+
+ def get_rel (self, field, format=False, suffix=""):
+ if not field in self.current:
+ return "N/A"
+
+ if not format:
+ return (self.current[field] - self.ref_point[field])
+ else:
+ return format_num(self.current[field] - self.ref_point[field], suffix)
+
+
+ # return true if new data has arrived in the past 2 seconds
+ def is_online (self):
+ delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000
+ return (delta_ms < 2000)
+
+# describes the general stats provided by TRex
+class CTRexAsyncStatsGeneral(CTRexAsyncStats):
+ def __init__ (self):
+ super(CTRexAsyncStatsGeneral, self).__init__()
+
+
+# per port stats
+class CTRexAsyncStatsPort(CTRexAsyncStats):
+ def __init__ (self):
+ super(CTRexAsyncStatsPort, self).__init__()
+
+ def get_stream_stats (self, stream_id):
+ return None
+
+# stats manager
+class CTRexAsyncStatsManager():
+ def __init__ (self):
+
+ self.general_stats = CTRexAsyncStatsGeneral()
+ self.port_stats = {}
+
+
+ def get_general_stats(self):
+ return self.general_stats
+
+ def get_port_stats (self, port_id):
+
+ if not str(port_id) in self.port_stats:
+ return None
+
+ return self.port_stats[str(port_id)]
+
+
+ def update(self, data):
+ self.__handle_snapshot(data)
+
+ def __handle_snapshot(self, snapshot):
+
+ general_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in snapshot.iteritems():
+
+ # match a pattern of ports
+ m = re.search('(.*)\-([0-8])', key)
+ if m:
+
+ port_id = m.group(2)
+ field_name = m.group(1)
+
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+
+ port_stats[port_id][field_name] = value
+
+ else:
+ # no port match - general stats
+ general_stats[key] = value
+
+ # update the general object with the snapshot
+ self.general_stats.update(general_stats)
+
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+
+ if not port_id in self.port_stats:
+ self.port_stats[port_id] = CTRexAsyncStatsPort()
+
+ self.port_stats[port_id].update(data)
+
+
+
+
+
+class CTRexAsyncClient():
+ def __init__ (self, server, port, stateless_client):
+
+ self.port = port
+ self.server = server
+
+ self.stateless_client = stateless_client
+
+ self.event_handler = stateless_client.event_handler
+ self.logger = self.stateless_client.logger
+
+ self.raw_snapshot = {}
+
+ self.stats = CTRexAsyncStatsManager()
+
+ self.last_data_recv_ts = 0
+ self.async_barrier = None
+
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
+
+ self.tr = "tcp://{0}:{1}".format(self.server, self.port)
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
+ self.active = True
+ self.t = threading.Thread(target = self._run)
+
+ # kill this thread on exit and don't add it to the join list
+ self.t.setDaemon(True)
+ self.t.start()
+
+ self.connected = True
+
+ rc = self.barrier()
+ if not rc:
+ self.disconnect()
+ return rc
+
+ return RC_OK()
+
+
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+
+ # thread function
+ def _run (self):
+
+ # socket must be created on the same thread
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ self.socket.connect(self.tr)
+
+ got_data = False
+
+ while self.active:
+ try:
+
+ line = self.socket.recv_string()
+ self.last_data_recv_ts = time.time()
+
+ # signal once
+ if not got_data:
+ self.event_handler.on_async_alive()
+ got_data = True
+
+
+ # got a timeout - mark as not alive and retry
+ except zmq.Again:
+
+ # signal once
+ if got_data:
+ self.event_handler.on_async_dead()
+ got_data = False
+
+ continue
+
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ break
+
+ msg = json.loads(line)
+
+ name = msg['name']
+ data = msg['data']
+ type = msg['type']
+ self.raw_snapshot[name] = data
+
+ self.__dispatch(name, type, data)
+
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
+
+ # did we get info for the last 3 seconds ?
+ def is_alive (self):
+ if self.last_data_recv_ts == None:
+ return False
+
+ return ( (time.time() - self.last_data_recv_ts) < 3 )
+
+ def get_stats (self):
+ return self.stats
+
+ def get_raw_snapshot (self):
+ return self.raw_snapshot
+
+ # dispatch the message to the right place
+ def __dispatch (self, name, type, data):
+ # stats
+ if name == "trex-global":
+ self.event_handler.handle_async_stats_update(data)
+
+ # events
+ elif name == "trex-event":
+ self.event_handler.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
+ else:
+ pass
+
+
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+ if self.async_barrier['key'] == type:
+ self.async_barrier['ack'] = True
+
+
+ # block on barrier for async channel
+ def barrier(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ self.async_barrier = {'key': key, 'ack': False}
+
+ # expr time
+ expr = time.time() + timeout
+
+ while not self.async_barrier['ack']:
+
+ # inject
+ rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ # fast loop
+ for i in xrange(0, 100):
+ if self.async_barrier['ack']:
+ break
+ time.sleep(0.001)
+
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
+