diff options
author | 2016-01-18 06:59:36 -0500 | |
---|---|---|
committer | 2016-01-21 10:11:54 -0500 | |
commit | 9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 (patch) | |
tree | 28c577725377131eafbd54319407066dcbd385a8 /scripts/automation/trex_control_plane/client | |
parent | 11d328d3e40b04540489eec83ac484d5b06254bb (diff) |
async publish now
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 53 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 11 |
2 files changed, 53 insertions, 11 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..2bb0e9cd 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -155,6 +156,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barriers = [] self.connected = False @@ -188,17 +190,16 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + # send a barrier and wait for ack + rc = self.block_on_stats() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -284,10 +285,46 @@ class CTRexAsyncClient(): # stats if name == "trex-global": self.stateless_client.handle_async_stats_update(data) + # events elif name == "trex-event": self.stateless_client.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + + for b in self.async_barriers: + if b['key'] == type: + b['ack'] = True + + + # force update a new snapshot from the server + def block_on_stats(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + barrier = {'key': key, 'ack' : False} + + # add to the queue + self.async_barriers.append(barrier) + + rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + expr = time.time() + timeout + while not barrier['ack']: + time.sleep(0.001) + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b7700c7c..105c4d01 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -55,12 +55,16 @@ class LoggerApi(object): return (self.level >= level) + # simple log message with verbose def log (self, msg, level = VERBOSE_REGULAR, newline = True): if not self.check_verbose(level): return self.write(msg, newline) + # annotates an action with a RC - writes to log the result + def annotate (self, rc, desc = None, show_status = True): + rc.annotate(self.log, desc, show_status) # default logger - to stdout class DefaultLogger(LoggerApi): @@ -82,7 +86,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - quiet = False, + verbose_level = LoggerApi.VERBOSE_REGULAR, virtual = False, logger = None): @@ -90,13 +94,14 @@ class CTRexStatelessClient(object): self.user = username + # logger if not logger: self.logger = DefaultLogger() else: self.logger = logger - if quiet: - self.logger.set_verbose(self.logger.VERBOSE_QUIET) + # initial verbose + self.logger.set_verbose(verbose_level) self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) |