diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
3 files changed, 50 insertions, 17 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8fdf7c9b..00304886 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -152,16 +152,19 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + self.connected = False # connects the async channel @@ -171,7 +174,13 @@ class CTRexAsyncClient(): self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg # Socket to talk to server self.context = zmq.Context() @@ -180,7 +189,6 @@ class CTRexAsyncClient(): # before running the thread - mark as active self.active = True - self.alive = False self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list @@ -192,7 +200,7 @@ class CTRexAsyncClient(): # wait for data streaming from the server timeout = time.time() + 5 - while not self.alive: + while not self.is_alive(): time.sleep(0.01) if time.time() > timeout: self.disconnect() @@ -219,35 +227,38 @@ class CTRexAsyncClient(): # thread function def _run (self): - # no data yet... - self.alive = False # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + got_data = False + while self.active: try: line = self.socket.recv_string() + self.last_data_recv_ts = time.time() - if not self.alive: + # signal once + if not got_data: self.stateless_client.on_async_alive() - self.alive = True + got_data = True + # got a timeout - mark as not alive and retry except zmq.Again: - if self.alive: + # signal once + if got_data: self.stateless_client.on_async_dead() - self.alive = False + got_data = False continue except zmq.ContextTerminated: # outside thread signaled us to exit - self.alive = False break msg = json.loads(line) @@ -264,6 +275,13 @@ class CTRexAsyncClient(): self.socket.close(linger = 0) + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + def get_stats (self): return self.stats diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 5c5702dd..4f82e86a 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -387,6 +387,9 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + def invalidate_stats(self): + return self.port_stats.invalidate() + ################# events handler ###################### def async_event_port_stopped (self): diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index a2b1f6d9..899805cf 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -55,7 +55,7 @@ class CTRexStatelessClient(object): self.user = username - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) # default verbose level self.verbose = self.VERBOSE_REGULAR @@ -68,7 +68,7 @@ class CTRexStatelessClient(object): self.server_version = {} self.__err_log = None - self.async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) self.streams_db = CStreamsDB() self.global_stats = trex_stats.CGlobalStats(self._connection_info, @@ -105,8 +105,8 @@ class CTRexStatelessClient(object): st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - if show and self.check_verbose(self.VERBOSE_REGULAR): - print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))) + if show: + self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) def handle_async_stats_update(self, dump_data): @@ -473,6 +473,10 @@ class CTRexStatelessClient(object): def get_verbose (self): return self.verbose + def prn_func (self, msg, level = VERBOSE_REGULAR): + if self.check_verbose(level): + print msg + ############# server actions ################ # ping server @@ -754,6 +758,14 @@ class CTRexStatelessClient(object): return RC_OK() + def cmd_invalidate (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + + return RC_OK() + # pause cmd def cmd_pause (self, port_id_list): @@ -1146,13 +1158,13 @@ class CTRexStatelessClient(object): # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port) + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) @property def is_connected(self): |