From 2ae2e4e860194ee8d2b5ec5c4a1375751f51dd98 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 17:17:38 +0200 Subject: full async DP stop support --- .../trex_control_plane/client/trex_async_client.py | 48 ++++++++++++++++------ .../client/trex_stateless_client.py | 9 +++- 2 files changed, 43 insertions(+), 14 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 31bec93f..adb91d97 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -8,6 +8,8 @@ except ImportError: import client.outer_packages from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage +from common.text_opts import * + import json import threading import time @@ -103,13 +105,9 @@ class TrexAsyncStatsManager(): return self.port_stats[str(port_id)] - def update (self, snapshot): - - if snapshot['name'] == 'trex-global': - self.__handle_snapshot(snapshot['data']) - else: - # for now ignore the rest - return + + def update (self, data): + self.__handle_snapshot(data) def __handle_snapshot (self, snapshot): @@ -151,10 +149,11 @@ class TrexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client self.raw_snapshot = {} @@ -165,14 +164,15 @@ class CTRexAsyncClient(): print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) self.active = True - self.t = threading.Thread(target = self._run) + self.t = threading.Thread(target = self.run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def _run (self): + + def run (self): # Socket to talk to server self.context = zmq.Context() @@ -185,10 +185,12 @@ class CTRexAsyncClient(): line = self.socket.recv_string(); msg = json.loads(line) - key = msg['name'] - self.raw_snapshot[key] = msg['data'] + name = msg['name'] + data = msg['data'] + type = msg['type'] + self.raw_snapshot[name] = data - self.stats.update(msg) + self.__dispatch(name, type, data) def get_stats (self): @@ -199,6 +201,26 @@ class CTRexAsyncClient(): return self.raw_snapshot + # dispatch the message to the right place + def __dispatch (self, name, type, data): + # stats + if name == "trex-global": + self.stats.update(data) + # events + elif name == "trex-event": + self.__handle_async_event(type, data) + else: + # ignore + pass + + def __handle_async_event (self, type, data): + # DP stopped + if (type == 0): + port_id = int(data['port_id']) + print format_text("\n[Event] - Port {0} Stopped".format(port_id), 'bold') + # call the handler + self.stateless_client.async_event_port_stopped(port_id) + def stop (self): self.active = False self.t.join() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 699f0af2..dd11fb67 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -320,6 +320,9 @@ class Port(object): return self.ok() + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS class CTRexStatelessClient(object): @@ -337,12 +340,16 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port) + self._async_client = CTRexAsyncClient(server, async_port, self) self.streams_db = CStreamsDB() self.connected = False + ################# events handler ###################### + def async_event_port_stopped (self, port_id): + self.ports[port_id].async_event_port_stopped() + ############# helper functions section ############## def validate_port_list(self, port_id_list): -- cgit 1.2.3-korg