From 9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 18 Jan 2016 06:59:36 -0500 Subject: async publish now --- .../trex_control_plane/client/trex_async_client.py | 53 ++++++++++++++++++---- .../client/trex_stateless_client.py | 11 +++-- .../client_utils/jsonrpc_client.py | 7 +-- .../trex_control_plane/common/trex_types.py | 18 ++++++-- .../trex_control_plane/console/trex_console.py | 49 ++++++-------------- 5 files changed, 85 insertions(+), 53 deletions(-) (limited to 'scripts/automation') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..2bb0e9cd 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -155,6 +156,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barriers = [] self.connected = False @@ -188,17 +190,16 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + # send a barrier and wait for ack + rc = self.block_on_stats() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -284,10 +285,46 @@ class CTRexAsyncClient(): # stats if name == "trex-global": self.stateless_client.handle_async_stats_update(data) + # events elif name == "trex-event": self.stateless_client.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + + for b in self.async_barriers: + if b['key'] == type: + b['ack'] = True + + + # force update a new snapshot from the server + def block_on_stats(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + barrier = {'key': key, 'ack' : False} + + # add to the queue + self.async_barriers.append(barrier) + + rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + expr = time.time() + timeout + while not barrier['ack']: + time.sleep(0.001) + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b7700c7c..105c4d01 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -55,12 +55,16 @@ class LoggerApi(object): return (self.level >= level) + # simple log message with verbose def log (self, msg, level = VERBOSE_REGULAR, newline = True): if not self.check_verbose(level): return self.write(msg, newline) + # annotates an action with a RC - writes to log the result + def annotate (self, rc, desc = None, show_status = True): + rc.annotate(self.log, desc, show_status) # default logger - to stdout class DefaultLogger(LoggerApi): @@ -82,7 +86,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - quiet = False, + verbose_level = LoggerApi.VERBOSE_REGULAR, virtual = False, logger = None): @@ -90,13 +94,14 @@ class CTRexStatelessClient(object): self.user = username + # logger if not logger: self.logger = DefaultLogger() else: self.logger = logger - if quiet: - self.logger.set_verbose(self.logger.VERBOSE_QUIET) + # initial verbose + self.logger.set_verbose(verbose_level) self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index a5e01340..e08f5d69 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -124,7 +124,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -136,9 +136,9 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() - return RC_ERR("*** [RPC] - Failed to get server response") + return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport)) self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -173,6 +173,7 @@ class JsonRpcClient(object): else: return RC_ERR(response_json["error"]["message"]) + # if no error there should be a result if ("result" not in response_json): return RC_ERR("Malformed Response ({0})".format(str(response_json))) diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 21deffd2..5c29f59b 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -17,7 +17,7 @@ class RC(): def __init__ (self, rc = None, data = None): self.rc_list = [] - if (rc != None) and (data != None): + if (rc != None): tuple_rc = namedtuple('RC', ['rc', 'data']) self.rc_list.append(tuple_rc(rc, data)) @@ -35,16 +35,26 @@ class RC(): def data (self): d = [x.data if x.rc else "" for x in self.rc_list] - return (d if len(d) > 1 else d[0]) + return (d if len(d) != 1 else d[0]) def err (self): e = [x.data if not x.rc else "" for x in self.rc_list] - return (e if len(e) > 1 else e[0]) + return (e if len(e) != 1 else e[0]) def __str__ (self): return str(self.data()) if self else str(self.err()) - def annotate (self, log_func, desc = None, show_status = True): + def prn_func (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def annotate (self, log_func = None, desc = None, show_status = True): + + if not log_func: + log_func = self.prn_func + if desc: log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) else: diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2ae92fb6..f086c208 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient +from client.trex_stateless_client import CTRexStatelessClient, LoggerApi from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -266,36 +266,6 @@ class TRexConsole(TRexGeneralCmd): return targets - # annotation method - @staticmethod - def annotate (desc, rc = None, err_log = None, ext_err_msg = None): - print format_text('\n{:<40}'.format(desc), 'bold'), - if rc == None: - print "\n" - return - - if rc == False: - # do we have a complex log object ? - if isinstance(err_log, list): - print "" - for func in err_log: - if func: - print func - print "" - - elif isinstance(err_log, str): - print "\n" + err_log + "\n" - - print format_text("[FAILED]\n", 'red', 'bold') - if ext_err_msg: - print format_text(ext_err_msg + "\n", 'blue', 'bold') - - return False - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - ####################### shell commands ####################### @verify_connected @@ -667,10 +637,19 @@ def main(): options = parser.parse_args() # Stateless client connection - stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet) + if options.quiet: + verbose_level = LoggerApi.VERBOSE_QUIET + elif options.verbose: + verbose_level = LoggerApi.VERBOSE_HIGH + else: + verbose_level = LoggerApi.VERBOSE_REGULAR - if not options.quiet: - print "\nlogged as {0}".format(format_text(options.user, 'bold')) + # Stateless client connection + stateless_client = CTRexStatelessClient(options.user, + options.server, + options.port, + options.pub, + verbose_level) # TUI or no acquire will give us READ ONLY mode if options.tui or not options.acquire: @@ -679,7 +658,7 @@ def main(): rc = stateless_client.connect("RW") # unable to connect - bye - if rc.bad(): + if not rc: rc.annotate() return -- cgit 1.2.3-korg