diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 53 |
1 files changed, 45 insertions, 8 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..2bb0e9cd 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -155,6 +156,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barriers = [] self.connected = False @@ -188,17 +190,16 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + # send a barrier and wait for ack + rc = self.block_on_stats() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -284,10 +285,46 @@ class CTRexAsyncClient(): # stats if name == "trex-global": self.stateless_client.handle_async_stats_update(data) + # events elif name == "trex-event": self.stateless_client.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + + for b in self.async_barriers: + if b['key'] == type: + b['ack'] = True + + + # force update a new snapshot from the server + def block_on_stats(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + barrier = {'key': key, 'ack' : False} + + # add to the queue + self.async_barriers.append(barrier) + + rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + expr = time.time() + timeout + while not barrier['ack']: + time.sleep(0.001) + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + |