From b726b5682fca2b1e032380401457d1afb47e1713 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 20 Jan 2016 11:09:25 -0500 Subject: draft #3 --- .../trex_control_plane/client/trex_async_client.py | 52 +-- .../trex_control_plane/client/trex_port.py | 31 ++ .../client/trex_stateless_client.py | 409 ++++++++++++++------- .../client_utils/jsonrpc_client.py | 3 - .../trex_control_plane/common/trex_stats.py | 4 + .../trex_control_plane/common/trex_types.py | 6 +- .../trex_control_plane/console/trex_console.py | 50 ++- .../trex_control_plane/console/trex_tui.py | 6 +- scripts/stl_test_example.py | 19 +- 9 files changed, 391 insertions(+), 189 deletions(-) diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 9828c838..ef4c48f9 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,7 +159,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 - self.async_barriers = [] + self.async_barrier = None self.connected = False @@ -171,10 +171,6 @@ class CTRexAsyncClient(): self.tr = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - - self.logger.log(msg) - # Socket to talk to server self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) @@ -190,8 +186,7 @@ class CTRexAsyncClient(): self.connected = True - # send a barrier and wait for ack - rc = self.block_on_stats() + rc = self.barrier() if not rc: self.disconnect() return rc @@ -216,14 +211,14 @@ class CTRexAsyncClient(): # done self.connected = False + # thread function def _run (self): - # socket must be created on the same thread - self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + self.socket.connect(self.tr) got_data = False @@ -299,32 +294,37 @@ class CTRexAsyncClient(): # async barrier handling routine def handle_async_barrier (self, type, data): - - for b in self.async_barriers: - if b['key'] == type: - b['ack'] = True + if self.async_barrier['key'] == type: + self.async_barrier['ack'] = True - # force update a new snapshot from the server - def block_on_stats(self, timeout = 5): - + # block on barrier for async channel + def barrier(self, timeout = 5): + # set a random key key = random.getrandbits(32) - barrier = {'key': key, 'ack' : False} - - # add to the queue - self.async_barriers.append(barrier) - - rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) - if not rc: - return rc + self.async_barrier = {'key': key, 'ack': False} + # expr time expr = time.time() + timeout - while not barrier['ack']: - time.sleep(0.001) + + while not self.async_barrier['ack']: + + # inject + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + # fast loop + for i in xrange(0, 100): + if self.async_barrier['ack']: + break + time.sleep(0.001) + if time.time() > expr: return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) return RC_OK() + diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 66d87f9d..7e5942d4 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -198,6 +198,9 @@ class Port(object): # remove stream from port def remove_stream (self, stream_id): + if not self.is_acquired(): + return self.err("port is not owned") + if not stream_id in self.streams: return self.err("stream {0} does not exists".format(stream_id)) @@ -219,6 +222,9 @@ class Port(object): # remove all the streams def remove_all_streams (self): + if not self.is_acquired(): + return self.err("port is not owned") + params = {"handler": self.handler, "port_id": self.port_id} @@ -244,6 +250,10 @@ class Port(object): # start traffic def start (self, mul, duration): + + if not self.is_acquired(): + return self.err("port is not owned") + if self.state == self.STATE_DOWN: return self.err("Unable to start traffic - port is down") @@ -270,6 +280,9 @@ class Port(object): # with force ignores the cached state and sends the command def stop (self, force = False): + if not self.is_acquired(): + return self.err("port is not owned") + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): return self.err("port is not transmitting") @@ -287,6 +300,9 @@ class Port(object): def pause (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -305,6 +321,9 @@ class Port(object): def resume (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_PAUSE) : return self.err("port is not in pause mode") @@ -322,6 +341,10 @@ class Port(object): def update (self, mul): + + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -338,6 +361,9 @@ class Port(object): def validate (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state == self.STATE_DOWN): return self.err("port is down") @@ -413,6 +439,11 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + + def get_stats (self): + return self.port_stats.get_stats() + + def invalidate_stats(self): return self.port_stats.invalidate() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 28e55088..69cc9838 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -51,6 +51,12 @@ class STLStateError(STLError): self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state) +# port state error +class STLPortStateError(STLError): + def __init__ (self, port, op, state): + self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state) + + # raised when argument is not valid for operation class STLArgumentError(STLError): def __init__ (self, name, got, valid_values = None, extended = None): @@ -62,6 +68,12 @@ class STLArgumentError(STLError): self.msg += "\n{0}".format(extended) +class STLTimeoutError(STLError): + def __init__ (self, timeout): + self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout) + + + ############################ logger ############################# ############################ ############################# ############################ ############################# @@ -109,9 +121,38 @@ class LoggerApi(object): def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): self.log(msg, level, newline) - # annotates an action with a RC - writes to log the result - def annotate (self, rc, desc = None, show_status = True): - rc.annotate(self.log, desc, show_status) + + def pre_cmd (self, desc): + self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) + self.flush() + + def post_cmd (self, rc): + if rc: + self.log(format_text("[SUCCESS]\n", 'green', 'bold')) + else: + self.log(format_text("[FAILED]\n", 'red', 'bold')) + + + def log_cmd (self, desc): + self.pre_cmd(desc) + self.post_cmd(True) + + + # supress object getter + def supress (self): + class Supress(object): + def __init__ (self, logger): + self.logger = logger + + def __enter__ (self): + self.saved_level = self.logger.get_verbose() + self.logger.set_verbose(LoggerApi.VERBOSE_QUIET) + + def __exit__ (self, type, value, traceback): + self.logger.set_verbose(self.saved_level) + + return Supress(self) + # default logger - to stdout @@ -288,7 +329,7 @@ class AsyncEventHandler(object): def __async_event_port_forced_acquired (self, port_id): self.client.ports[port_id].async_event_forced_acquired() - self.client.read_only = True + def __async_event_server_stopped (self): self.client.connected = False @@ -386,7 +427,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - verbose_level = LoggerApi.VERBOSE_REGULAR, + verbose_level = LoggerApi.VERBOSE_QUIET, logger = None, virtual = False): @@ -398,7 +439,6 @@ class CTRexStatelessClient(object): self.server_version = {} self.system_info = {} self.session_id = random.getrandbits(32) - self.read_only = False self.connected = False # logger @@ -634,8 +674,7 @@ class CTRexStatelessClient(object): # connect to server - # mode can be RW - read / write, RWF - read write with force , RO - read only - def __connect(self, mode = "RW"): + def __connect(self): # first disconnect if already connected if self.is_connected(): @@ -645,12 +684,18 @@ class CTRexStatelessClient(object): self.connected = False # connect sync channel + self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self.comm_link.connect() + self.logger.post_cmd(rc) + if not rc: return rc # connect async channel + self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) rc = self.async_client.connect() + self.logger.post_cmd(rc) + if not rc: return rc @@ -694,32 +739,6 @@ class CTRexStatelessClient(object): if not rc: return rc - # acquire all ports - if mode == "RW": - rc = self.__acquire(force = False) - - # fallback to read only if failed - if not rc: - self.logger.annotate(rc, show_status = False) - self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) - - self.__release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False - - elif mode == "RWF": - rc = self.__acquire(force = True) - if not rc: - return rc - self.read_only = False - - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True - - self.connected = True return RC_OK() @@ -760,27 +779,36 @@ class CTRexStatelessClient(object): return rc + self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list)) rc = self.__remove_all_streams(port_id_list) - self.logger.annotate(rc, "Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): + self.logger.post_cmd(rc) + + if not rc: return rc + self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) rc = self.__add_stream_pack(stream_list, port_id_list) - self.logger.annotate(rc, "Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): + self.logger.post_cmd(rc) + + if not rc: return rc # when not on dry - start the traffic , otherwise validate only if not dry: + + self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list)) rc = self.__start_traffic(mult, duration, port_id_list) - self.logger.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) - + self.logger.post_cmd(rc) + return rc else: + + self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list)) rc = self.__validate(port_id_list) - self.logger.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) - + self.logger.post_cmd(rc) + + if rc.bad(): return rc @@ -801,8 +829,11 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + + self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list)) rc = self.__stop_traffic(active_ports) - self.logger.annotate(rc, "Stopping traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + if not rc: return rc @@ -819,8 +850,9 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list)) rc = self.__update_traffic(mult, active_ports) - self.logger.annotate(rc, "Updating traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) return rc @@ -836,8 +868,10 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list)) rc = self.__pause_traffic(active_ports) - self.logger.annotate(rc, "Pausing traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc @@ -852,24 +886,51 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) rc = self.__resume_traffic(active_ports) - self.logger.annotate(rc, "Resume traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc # clear stats - def __clear_stats(self, port_id_list): + def __clear_stats(self, port_id_list, clear_global): for port_id in port_id_list: self.ports[port_id].clear_stats() - self.global_stats.clear_stats() + if clear_global: + self.global_stats.clear_stats() + self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list)) rc = RC_OK() - self.logger.annotate(rc, "clearing stats on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return RC + # get stats + def __get_stats (self, port_id_list): + stats = {} + + stats['global'] = self.global_stats.get_stats() + + total = {} + for port_id in port_id_list: + port_stats = self.ports[port_id].get_stats() + stats["port {0}".format(port_id)] = port_stats + + for k, v in port_stats.iteritems(): + if not k in total: + total[k] = v + else: + total[k] += v + + stats['total'] = total + + return stats + + def __process_profiles (self, profiles, out): for profile in (profiles if isinstance(profiles, list) else [profiles]): @@ -883,7 +944,6 @@ class CTRexStatelessClient(object): stream_list = self.streams_db.load_yaml_file(profile) except Exception as e: rc = RC_ERR(str(e)) - self.logger.annotate(rc) return rc out.append(stream_list) @@ -895,31 +955,6 @@ class CTRexStatelessClient(object): return RC_OK() - - # stream list - if opts.db: - stream_list = self.streams_db.get_stream_pack(opts.db) - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from DB):") - if rc.bad(): - return RC_ERR("Failed to load stream pack") - - else: - # load streams from file - stream_list = None - try: - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - except Exception as e: - s = str(e) - rc=RC_ERR(s) - self.logger.annotate(rc) - return rc - - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from file):") - if stream_list == None: - return RC_ERR("Failed to load stream pack") - ############ functions used by other classes but not users ############## def _verify_port_id_list (self, port_id_list): @@ -938,7 +973,8 @@ class CTRexStatelessClient(object): return False # check each item of the sequence - return port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]) + return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) + # transmit request on the RPC link @@ -986,8 +1022,10 @@ class CTRexStatelessClient(object): # validate port(s) profile def cmd_validate (self, port_id_list): + self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) rc = self.__validate(port_id_list) - self.logger.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc @@ -1025,17 +1063,22 @@ class CTRexStatelessClient(object): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - # verify decorator - throws exception is client is disconnected - def __verify_connected(f): - def wrap(*args, **kwargs): - inst = args[0] - func_name = f.__name__ + # API decorator - double wrap because of argument + def __api_check(connected = True): + + def wrap (f): + def wrap2(*args, **kwargs): + client = args[0] - if not inst.is_connected(): - raise STLStateError(func_name, 'disconnected') + func_name = f.__name__ - ret = f(*args, **kwargs) - return ret + # check connection + if connected and not client.is_connected(): + raise STLStateError(func_name, 'disconnected') + + ret = f(*args, **kwargs) + return ret + return wrap2 return wrap @@ -1044,7 +1087,14 @@ class CTRexStatelessClient(object): ############################ API ############################# ############################ ############################# ############################ ############################# + def __enter__ (self): + self.connect(mode = "RWF") + return self + def __exit__ (self, type, value, traceback): + if self.get_active_ports(): + self.stop(self.get_active_ports()) + self.disconnect() ############################ Getters ############################# ############################ ############################# @@ -1056,8 +1106,8 @@ class CTRexStatelessClient(object): return self.logger.get_verbose() # is the client on read only mode ? - def is_read_only (self): - return self.read_only + def is_all_ports_acquired (self): + return not (self.get_all_ports() == self.get_acquired_ports()) # is the client connected ? def is_connected (self): @@ -1088,9 +1138,9 @@ class CTRexStatelessClient(object): # returns the port object def get_port (self, port_id): - port_id = self.ports.get(port_id, None) - if (port_id != None): - return port_id + port = self.ports.get(port_id, None) + if (port != None): + return port else: raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) @@ -1139,26 +1189,66 @@ class CTRexStatelessClient(object): # 'RO' - read only # 'RW' - read/write # 'RWF' - read write forced (take ownership) + @__api_check(False) def connect (self, mode = "RW"): modes = ['RO', 'RW', 'RWF'] if not mode in modes: raise STLArgumentError('mode', mode, modes) - rc = self.__connect(mode) - self.logger.annotate(rc) + rc = self.__connect() + if not rc: + raise STLError(rc) + + # acquire all ports for 'RW' or 'RWF' + if (mode == "RW") or (mode == "RWF"): + self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False) + + + + + # acquire ports + # this is not needed if connect was called with "RW" or "RWF" + # but for "RO" this might be needed + @__api_check(True) + def acquire (self, ports = None, force = False): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify ports + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify valid port id list + if force: + self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) + else: + self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) + + rc = self.__acquire(ports, force) + + self.logger.post_cmd(rc) if not rc: + self.__release(ports) raise STLError(rc) - return rc + + + # force connect syntatic sugar + @__api_check(False) + def fconnect (self): + self.connect(mode = "RWF") # disconnects from the server - def disconnect (self, annotate = True): + @__api_check(False) + def disconnect (self, log = True): rc = self.__disconnect() - if annotate: - self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], - self.connection_info['sync_port'])) + if log: + self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) if not rc: raise STLError(rc) @@ -1166,29 +1256,27 @@ class CTRexStatelessClient(object): # teardown - call after test is done + # NEVER throws an exception + @__api_check(False) def teardown (self, stop_traffic = True): - # stop traffic - if stop_traffic: - rc = self.stop() - if not rc: - raise STLError(rc) + # try to stop traffic + if stop_traffic and self.get_active_ports(): + try: + self.stop() + except STLError: + pass # disconnect - rc = self.__disconnect() - if not rc: - raise STLError(rc) + self.__disconnect() - - - return rc # pings the server on the RPC channel - @__verify_connected + @__api_check(True) def ping(self): rc = self.__ping() - self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], self.connection_info['sync_port'])) if not rc: @@ -1199,25 +1287,31 @@ class CTRexStatelessClient(object): # reset the server by performing # force acquire, stop, and remove all streams - @__verify_connected + @__api_check(True) def reset(self): + self.logger.pre_cmd("Force acquiring all ports:") rc = self.__acquire(force = True) - self.logger.annotate(rc, "Force acquiring all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) # force stop all ports + self.logger.pre_cmd("Stop traffic on all ports:") rc = self.__stop_traffic(self.get_all_ports(), True) - self.logger.annotate(rc,"Stop traffic on all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) # remove all streams + self.logger.pre_cmd("Removing all streams from all ports:") rc = self.__remove_all_streams(self.get_all_ports()) - self.logger.annotate(rc,"Removing all streams from all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) @@ -1226,7 +1320,7 @@ class CTRexStatelessClient(object): # start cmd - @__verify_connected + @__api_check(True) def start (self, profiles, ports = None, @@ -1246,7 +1340,6 @@ class CTRexStatelessClient(object): if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = False, @@ -1272,21 +1365,20 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - # dry run if dry: self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) # call private method to start + rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry) if not rc: raise STLError(rc) - return rc # stop traffic on ports - @__verify_connected + @__api_check(True) def stop (self, ports = None): # by default use all ports if ports == None: @@ -1301,11 +1393,10 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # update traffic - @__verify_connected + @__api_check(True) def update (self, ports = None, mult = "1", total = False): # by default use all ports @@ -1339,7 +1430,7 @@ class CTRexStatelessClient(object): # pause traffic on ports - @__verify_connected + @__api_check(True) def pause (self, ports = None): # by default use all ports if ports == None: @@ -1358,7 +1449,7 @@ class CTRexStatelessClient(object): # resume traffic on ports - @__verify_connected + @__api_check(True) def resume (self, ports = None): # by default use all ports if ports == None: @@ -1377,7 +1468,8 @@ class CTRexStatelessClient(object): # clear stats - def clear_stats (self, ports = None): + @__api_check(False) + def clear_stats (self, ports = None, clear_global = True): # by default use all ports if ports == None: ports = self.get_all_ports() @@ -1387,11 +1479,64 @@ class CTRexStatelessClient(object): if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - rc = self.__clear_stats(ports) + # verify clear global + if not type(clear_global) is bool: + raise STLArgumentError('clear_global', clear_global) + + + rc = self.__clear_stats(ports, clear_global) if not rc: raise STLError(rc) + # get stats + @__api_check(False) + def get_stats (self, ports = None, async_barrier = True): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # check async barrier + if not type(async_barrier) is bool: + raise STLArgumentError('async_barrier', async_barrier) + + + # if the user requested a barrier - use it + if async_barrier: + rc = self.async_client.barrier() + if not rc: + raise STLError(rc) + + return self.__get_stats(ports) + + + # wait while traffic is on, on timeout throw STLTimeoutError + @__api_check(True) + def wait_on_traffic (self, ports = None, timeout = 60): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + expr = time.time() + timeout + + # wait while any of the required ports are active + while set(self.get_active_ports()).intersection(ports): + time.sleep(0.01) + if time.time() > expr: + raise STLTimeoutError(timeout) + + + ############################ Line ############################# ############################ Commands ############################# @@ -1406,7 +1551,7 @@ class CTRexStatelessClient(object): try: rc = f(*args) except STLError as e: - client.logger.log(format_text("\n" + e.brief() + "\n", 'bold')) + client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return # don't want to print on error @@ -1567,19 +1712,19 @@ class CTRexStatelessClient(object): @__console - def show_stats_line (self, line): + def print_formatted_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", - self.cmd_stats_line.__doc__, + self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return None # determine stats mask mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) @@ -1587,7 +1732,9 @@ class CTRexStatelessClient(object): # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - stats = self.cmd_stats(opts.ports, mask) + + self.print_formatted_stats() + stats = self.get_stats(opts.ports, mask) # print stats to screen for stat_type, stat_data in stats.iteritems(): diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index e08f5d69..05a32bc4 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -204,9 +204,6 @@ class JsonRpcClient(object): # Socket to talk to server self.transport = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To RPC Server On {0}".format(self.transport) - self.logger.log(msg) - self.socket = self.context.socket(zmq.REQ) try: self.socket.connect(self.transport) diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index a6add4ac..3d6ece7c 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -314,6 +314,10 @@ class CTRexStats(object): self.last_update_ts = time.time() + def get_stats (self): + # copy and return + return dict(self.latest_stats) + def clear_stats(self): self.reference_stats = self.latest_stats diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index aada5bfc..a7ddacea 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -46,7 +46,11 @@ class RC(): return (e if len(e) != 1 else e[0]) def __str__ (self): - return str(self.data()) if self else str(self.err()) + s = "" + for x in self.rc_list: + if x.data: + s += format_text("\n{0}".format(x.data), 'bold') + return s def prn_func (self, msg, newline = True): if newline: diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 8f070959..3ecbca06 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient, LoggerApi +from client.trex_stateless_client import CTRexStatelessClient, LoggerApi, STLError from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -209,7 +209,7 @@ class TRexConsole(TRexGeneralCmd): print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') return - if inst.stateless_client.is_read_only(): + if inst.stateless_client.is_all_ports_acquired(): print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold') return @@ -253,7 +253,7 @@ class TRexConsole(TRexGeneralCmd): self.supported_rpc = None return stop - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): self.prompt = "TRex (read only) > " return stop @@ -670,25 +670,41 @@ def main(): # Stateless client connection logger = ConsoleLogger() - stateless_client = CTRexStatelessClient(options.user, - options.server, - options.port, - options.pub, - verbose_level, - logger) + stateless_client = CTRexStatelessClient(username = options.user, + server = options.server, + sync_port = options.port, + async_port = options.pub, + verbose_level = verbose_level, + logger = logger) # TUI or no acquire will give us READ ONLY mode - if options.tui or not options.acquire: - rc = stateless_client.connect("RO") - else: - rc = stateless_client.connect("RW") - - # unable to connect - bye - if not rc: - rc.annotate() + try: + stateless_client.connect("RO") + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return + if not options.tui and options.acquire: + try: + stateless_client.acquire() + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + logger.log(format_text("\nSwitching to read only mode - only few commands will be available", 'bold')) + + # if options.tui or not options.acquire: + # rc = stateless_client.connect("RO") + # else: + # try: + # rc = stateless_client.connect("RW") + # except STLError as e: + # logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + # + # with logger.supress(): + # rc = stateless_client.connect("RO") + + + # a script mode if options.batch: cont = stateless_client.run_script_file(options.batch[0]) diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py index dbbac02b..9e66a984 100644 --- a/scripts/automation/trex_control_plane/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -71,8 +71,7 @@ class TrexTUIDashBoard(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if len(self.stateless_client.get_transmitting_ports()) > 0: @@ -179,8 +178,7 @@ class TrexTUIPort(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if self.port.state == self.port.STATE_TX: diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index 9a296bec..5b36a9f6 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -10,15 +10,19 @@ from trex_stateless_client import CTRexStatelessClient, STLError c = CTRexStatelessClient() try: - c.connect() - c.stop() + for i in xrange(0, 100): + c.connect("RO") + c.disconnect() + + # + #c.stop() #before_ipackets = x.get_stats().get_rel('ipackets') - c.start(profiles = 'stl/imix_3pkt.yaml', ports = [0,1], mult = "1gbps") + #c.start(profiles = 'stl/imix_3pkt.yaml', ports = [0,1], mult = "1gbps") - for i in xrange(0, 10): - time.sleep(5) - c.update(ports = [0,1], mult = "1gbps+") + #for i in xrange(0, 10): + # time.sleep(5) + # c.update(ports = [0,1], mult = "1gbps+") #c.cmd_wait_on_traffic() #c.stop() @@ -26,5 +30,6 @@ try: except STLError as e: print e finally: - c.teardown() + pass + #c.teardown() -- cgit