diff options
author | 2016-01-24 13:27:11 +0200 | |
---|---|---|
committer | 2016-01-24 13:27:11 +0200 | |
commit | 951dd56abfd78d5669f0f57d840b2fe623ded2cd (patch) | |
tree | 60857060a198512b0d629e04daf63ca2168f09b3 /scripts/automation/trex_control_plane/client | |
parent | 2d27d1df02328d7148ac1c4ed029ecdaf1853c1e (diff) | |
parent | 6f4a51c126b7a78ee8e37d396ed2b61b05fa506c (diff) |
Merge from origin
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
4 files changed, 1346 insertions, 753 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..ef4c48f9 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -143,18 +144,22 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client, prn_func = None): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client - self.prn_func = prn_func + + self.event_handler = stateless_client.event_handler + self.logger = self.stateless_client.logger self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barrier = None self.connected = False @@ -166,13 +171,6 @@ class CTRexAsyncClient(): self.tr = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - - if self.prn_func: - self.prn_func(msg) - else: - print msg - # Socket to talk to server self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) @@ -188,17 +186,15 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + rc = self.barrier() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -215,14 +211,14 @@ class CTRexAsyncClient(): # done self.connected = False + # thread function def _run (self): - # socket must be created on the same thread - self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + self.socket.connect(self.tr) got_data = False @@ -234,7 +230,7 @@ class CTRexAsyncClient(): # signal once if not got_data: - self.stateless_client.on_async_alive() + self.event_handler.on_async_alive() got_data = True @@ -243,7 +239,7 @@ class CTRexAsyncClient(): # signal once if got_data: - self.stateless_client.on_async_dead() + self.event_handler.on_async_dead() got_data = False continue @@ -283,11 +279,52 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - self.stateless_client.handle_async_stats_update(data) + self.event_handler.handle_async_stats_update(data) + # events elif name == "trex-event": - self.stateless_client.handle_async_event(type, data) + self.event_handler.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + if self.async_barrier['key'] == type: + self.async_barrier['ack'] = True + + + # block on barrier for async channel + def barrier(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + self.async_barrier = {'key': key, 'ack': False} + + # expr time + expr = time.time() + timeout + + while not self.async_barrier['ack']: + + # inject + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + # fast loop + for i in xrange(0, 100): + if self.async_barrier['ack']: + break + time.sleep(0.001) + + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + + diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index 848d5a9e..c25c73cb 100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -2,7 +2,7 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder -from trex_stateless_client import CTRexStatelessClient +from trex_stateless_client import STLClient from common.trex_streams import * from client_utils.general_utils import id_count_gen import dpkt @@ -20,7 +20,7 @@ class CTRexHltApi(object): # sync = RPC, async = ZMQ def connect(self, device, port_list, username, sync_port = 4501, async_port = 4500, reset=False, break_locks=False): ret_dict = {"status": 0} - self.trex_client = CTRexStatelessClient(username, device, sync_port, async_port) + self.trex_client = STLClient(username, device, sync_port, async_port) rc = self.trex_client.connect() if rc.bad(): diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 66d87f9d..94240f2a 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -56,7 +56,7 @@ class Port(object): def err(self, msg): return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - def ok(self, data = "ACK"): + def ok(self, data = ""): return RC_OK(data) def get_speed_bps (self): @@ -198,6 +198,9 @@ class Port(object): # remove stream from port def remove_stream (self, stream_id): + if not self.is_acquired(): + return self.err("port is not owned") + if not stream_id in self.streams: return self.err("stream {0} does not exists".format(stream_id)) @@ -219,6 +222,9 @@ class Port(object): # remove all the streams def remove_all_streams (self): + if not self.is_acquired(): + return self.err("port is not owned") + params = {"handler": self.handler, "port_id": self.port_id} @@ -244,6 +250,10 @@ class Port(object): # start traffic def start (self, mul, duration): + + if not self.is_acquired(): + return self.err("port is not owned") + if self.state == self.STATE_DOWN: return self.err("Unable to start traffic - port is down") @@ -270,8 +280,15 @@ class Port(object): # with force ignores the cached state and sends the command def stop (self, force = False): - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") + if not self.is_acquired(): + return self.err("port is not owned") + + # port is already stopped + if not force: + if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS): + return self.ok() + + params = {"handler": self.handler, "port_id": self.port_id} @@ -287,6 +304,9 @@ class Port(object): def pause (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -305,6 +325,9 @@ class Port(object): def resume (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_PAUSE) : return self.err("port is not in pause mode") @@ -322,6 +345,10 @@ class Port(object): def update (self, mul): + + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -338,6 +365,9 @@ class Port(object): def validate (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state == self.STATE_DOWN): return self.err("port is down") @@ -413,6 +443,11 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + + def get_stats (self): + return self.port_stats.get_stats() + + def invalidate_stats(self): return self.port_stats.invalidate() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index dc39bee6..c1a4d1d1 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -8,6 +8,7 @@ except ImportError: import client.outer_packages from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage +from client_utils import general_utils from client_utils.packet_builder import CTRexPktBuilder import json @@ -22,86 +23,185 @@ import re import random from trex_port import Port from common.trex_types import * - from trex_async_client import CTRexAsyncClient +# basic error for API +class STLError(Exception): + def __init__ (self, msg): + self.msg = str(msg) + + def __str__ (self): + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + + + s = "\n******\n" + s += "Error at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') + s += "specific error:\n\n{0}\n".format(format_text(self.msg, 'bold')) + + return s + + def brief (self): + return self.msg + + +# raised when the client state is invalid for operation +class STLStateError(STLError): + def __init__ (self, op, state): + self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state) + + +# port state error +class STLPortStateError(STLError): + def __init__ (self, port, op, state): + self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state) + + +# raised when argument is not valid for operation +class STLArgumentError(STLError): + def __init__ (self, name, got, valid_values = None, extended = None): + self.msg = "Argument: '{0}' invalid value: '{1}'".format(name, got) + if valid_values: + self.msg += " - valid values are '{0}'".format(valid_values) + + if extended: + self.msg += "\n{0}".format(extended) -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" +# raised when timeout occurs +class STLTimeoutError(STLError): + def __init__ (self, timeout): + self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout) + + +############################ logger ############################# +############################ ############################# +############################ ############################# + +# logger API for the client +class LoggerApi(object): # verbose levels - VERBOSE_QUIET = 0 + VERBOSE_QUIET = 0 VERBOSE_REGULAR = 1 VERBOSE_HIGH = 2 - - def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False): - super(CTRexStatelessClient, self).__init__() - self.user = username + def __init__(self): + self.level = LoggerApi.VERBOSE_REGULAR + + # implemented by specific logger + def write(self, msg, newline = True): + raise Exception("implement this") + + # implemented by specific logger + def flush(self): + raise Exception("implement this") + + def set_verbose (self, level): + if not level in xrange(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1): + raise ValueError("bad value provided for logger") + + self.level = level + + def get_verbose (self): + return self.level + + + def check_verbose (self, level): + return (self.level >= level) + + + # simple log message with verbose + def log (self, msg, level = VERBOSE_REGULAR, newline = True): + if not self.check_verbose(level): + return + + self.write(msg, newline) + + # logging that comes from async event + def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) - # default verbose level - if not quiet: - self.verbose = self.VERBOSE_REGULAR + def pre_cmd (self, desc): + self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) + self.flush() + + def post_cmd (self, rc): + if rc: + self.log(format_text("[SUCCESS]\n", 'green', 'bold')) else: - self.verbose = self.VERBOSE_QUIET + self.log(format_text("[FAILED]\n", 'red', 'bold')) - self.ports = {} - self._connection_info = {"server": server, - "sync_port": sync_port, - "async_port": async_port} - self.system_info = {} - self.server_version = {} - self.__err_log = None - self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) + def log_cmd (self, desc): + self.pre_cmd(desc) + self.post_cmd(True) - self.streams_db = CStreamsDB() - self.global_stats = trex_stats.CGlobalStats(self._connection_info, - self.server_version, - self.ports) - self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, - self.ports) - self.events = [] + # supress object getter + def supress (self): + class Supress(object): + def __init__ (self, logger): + self.logger = logger - self.session_id = random.getrandbits(32) - self.read_only = False - self.connected = False - self.prompt_redraw_cb = None + def __enter__ (self): + self.saved_level = self.logger.get_verbose() + self.logger.set_verbose(LoggerApi.VERBOSE_QUIET) + def __exit__ (self, type, value, traceback): + self.logger.set_verbose(self.saved_level) - # returns the port object - def get_port (self, port_id): - return self.ports.get(port_id, None) + return Supress(self) - # connection server ip - def get_server_ip (self): - return self.comm_link.get_server() - # connection server port - def get_server_port (self): - return self.comm_link.get_port() +# default logger - to stdout +class DefaultLogger(LoggerApi): + def write (self, msg, newline = True): + if newline: + print msg + else: + print msg, + def flush (self): + sys.stdout.flush() - ################# events handler ###################### - def add_event_log (self, msg, ev_type, show = False): - if ev_type == "server": - prefix = "[server]" - elif ev_type == "local": - prefix = "[local]" +############################ async event hander ############################# +############################ ############################# +############################ ############################# - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) +# handles different async events given to the client +class AsyncEventHandler(object): - if show: - self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))), redraw_console = True) - + def __init__ (self, client): + self.client = client + self.logger = self.client.logger + + self.events = [] + # public functions + + def get_events (self): + return self.events + + + def clear_events (self): + self.events = [] + + + def on_async_dead (self): + if self.client.connected: + msg = 'lost connection to server' + self.__add_event_log(msg, 'local', True) + self.client.connected = False + + + def on_async_alive (self): + pass + + + # handles an async stats update from the subscriber def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -113,7 +213,7 @@ class CTRexStatelessClient(object): if m: port_id = int(m.group(2)) field_name = m.group(1) - if self.ports.has_key(port_id): + if self.client.ports.has_key(port_id): if not port_id in port_stats: port_stats[port_id] = {} port_stats[port_id][field_name] = value @@ -124,13 +224,14 @@ class CTRexStatelessClient(object): global_stats[key] = value # update the general object with the snapshot - self.global_stats.update(global_stats) + self.client.global_stats.update(global_stats) + # update all ports for port_id, data in port_stats.iteritems(): - self.ports[port_id].port_stats.update(data) - + self.client.ports[port_id].port_stats.update(data) + # dispatcher for server async events (port started, port stopped and etc.) def handle_async_event (self, type, data): # DP stopped @@ -140,7 +241,7 @@ class CTRexStatelessClient(object): if (type == 0): port_id = int(data['port_id']) ev = "Port {0} has started".format(port_id) - self.async_event_port_started(port_id) + self.__async_event_port_started(port_id) # port stopped elif (type == 1): @@ -148,8 +249,8 @@ class CTRexStatelessClient(object): ev = "Port {0} has stopped".format(port_id) # call the handler - self.async_event_port_stopped(port_id) - + self.__async_event_port_stopped(port_id) + # port paused elif (type == 2): @@ -157,7 +258,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has paused".format(port_id) # call the handler - self.async_event_port_paused(port_id) + self.__async_event_port_paused(port_id) # port resumed elif (type == 3): @@ -165,7 +266,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has resumed".format(port_id) # call the handler - self.async_event_port_resumed(port_id) + self.__async_event_port_resumed(port_id) # port finished traffic elif (type == 4): @@ -173,7 +274,7 @@ class CTRexStatelessClient(object): ev = "Port {0} job done".format(port_id) # call the handler - self.async_event_port_stopped(port_id) + self.__async_event_port_stopped(port_id) show_event = True # port was stolen... @@ -181,7 +282,7 @@ class CTRexStatelessClient(object): session_id = data['session_id'] # false alarm, its us - if session_id == self.session_id: + if session_id == self.client.session_id: return port_id = int(data['port_id']) @@ -190,13 +291,13 @@ class CTRexStatelessClient(object): ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) # call the handler - self.async_event_port_forced_acquired(port_id) + self.__async_event_port_forced_acquired(port_id) show_event = True # server stopped elif (type == 100): ev = "Server has stopped" - self.async_event_server_stopped() + self.__async_event_server_stopped() show_event = True @@ -205,338 +306,242 @@ class CTRexStatelessClient(object): return - self.add_event_log(ev, 'server', show_event) - - - def async_event_port_stopped (self, port_id): - self.ports[port_id].async_event_port_stopped() - - - def async_event_port_started (self, port_id): - self.ports[port_id].async_event_port_started() - - - def async_event_port_paused (self, port_id): - self.ports[port_id].async_event_port_paused() - + self.__add_event_log(ev, 'server', show_event) - def async_event_port_resumed (self, port_id): - self.ports[port_id].async_event_port_resumed() + # private functions - def async_event_port_forced_acquired (self, port_id): - self.ports[port_id].async_event_forced_acquired() - self.read_only = True + def __async_event_port_stopped (self, port_id): + self.client.ports[port_id].async_event_port_stopped() - def async_event_server_stopped (self): - self.connected = False + def __async_event_port_started (self, port_id): + self.client.ports[port_id].async_event_port_started() - def get_events (self): - return self.events - def clear_events (self): - self.events = [] + def __async_event_port_paused (self, port_id): + self.client.ports[port_id].async_event_port_paused() - ############# helper functions section ############## - # measure time for functions - def timing(f): - def wrap(*args): - time1 = time.time() - ret = f(*args) + def __async_event_port_resumed (self, port_id): + self.client.ports[port_id].async_event_port_resumed() - # don't want to print on error - if ret.bad(): - return ret - delta = time.time() - time1 - print format_time(delta) + "\n" + def __async_event_port_forced_acquired (self, port_id): + self.client.ports[port_id].async_event_forced_acquired() - return ret - return wrap + def __async_event_server_stopped (self): + self.client.connected = False - def validate_port_list(self, port_id_list): - if not isinstance(port_id_list, list): - print type(port_id_list) - return False - - # check each item of the sequence - return all([ (port_id >= 0) and (port_id < self.get_port_count()) - for port_id in port_id_list ]) - - # some preprocessing for port argument - def __ports (self, port_id_list): - - # none means all - if port_id_list == None: - return range(0, self.get_port_count()) - - # always list - if isinstance(port_id_list, int): - port_id_list = [port_id_list] - - if not isinstance(port_id_list, list): - raise ValueError("bad port id list: {0}".format(port_id_list)) - - for port_id in port_id_list: - if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): - raise ValueError("bad port id {0}".format(port_id)) - - return port_id_list - - ############ boot up section ################ - - # connection sequence - - # mode can be RW - read / write, RWF - read write with force , RO - read only - def connect(self, mode = "RW"): - - if self.is_connected(): - self.disconnect() - - # clear this flag - self.connected = False - - # connect sync channel - rc = self.comm_link.connect() - if rc.bad(): - return rc - - # connect async channel - rc = self.async_client.connect() - if rc.bad(): - return rc - - # version - rc = self.transmit("get_version") - if rc.bad(): - return rc - - self.server_version = rc.data() - self.global_stats.server_version = rc.data() - - # cache system info - rc = self.transmit("get_system_info") - if rc.bad(): - return rc - - self.system_info = rc.data() - - # cache supported commands - rc = self.transmit("get_supported_cmds") - if rc.bad(): - return rc - - self.supported_cmds = rc.data() - - # create ports - for port_id in xrange(self.get_port_count()): - speed = self.system_info['ports'][port_id]['speed'] - driver = self.system_info['ports'][port_id]['driver'] - - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + # add event to log + def __add_event_log (self, msg, ev_type, show = False): + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" - # sync the ports - rc = self.sync_ports() - if rc.bad(): - return rc + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - # acquire all ports - if mode == "RW": - rc = self.acquire(force = False) + if show: + self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - # fallback to read only if failed - if rc.bad(): - rc.annotate(show_status = False) - print format_text("Switching to read only mode - only few commands will be available", 'bold') - self.release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False + - elif mode == "RWF": - rc = self.acquire(force = True) - if rc.bad(): - return rc - self.read_only = False - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True +############################ RPC layer ############################# +############################ ############################# +############################ ############################# +class CCommLink(object): + """describes the connectivity of the stateless client method""" + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): + self.virtual = virtual + self.server = server + self.port = port + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - self.connected = True - return RC_OK() - + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True - def is_read_only (self): - return self.read_only + def get_server (self): + return self.server - def is_connected (self): - return self.connected and self.comm_link.is_connected + def get_port (self): + return self.port + def connect(self): + if not self.virtual: + return self.rpc_link.connect() def disconnect(self): - # release any previous acquired ports - if self.is_connected(): - self.release(self.get_acquired_ports()) - - self.comm_link.disconnect() - self.async_client.disconnect() - - self.connected = False - - return RC_OK() - - - def on_async_dead (self): - if self.connected: - msg = 'lost connection to server' - self.add_event_log(msg, 'local', True) - self.connected = False - - def on_async_alive (self): - pass - - ########### cached queries (no server traffic) ########### + if not self.virtual: + return self.rpc_link.disconnect() - def get_supported_cmds(self): - return self.supported_cmds + def transmit(self, method_name, params={}): + if self.virtual: + self._prompt_virtual_tx_msg() + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + print msg + return + else: + return self.rpc_link.invoke_rpc_method(method_name, params) - def get_version(self): - return self.server_version + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() - def get_system_info(self): - return self.system_info + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) - def get_port_count(self): - return self.system_info.get("port_count") - def get_port_ids(self, as_str=False): - port_ids = range(self.get_port_count()) - if as_str: - return " ".join(str(p) for p in port_ids) - else: - return port_ids - def get_stats_async (self): - return self.async_client.get_stats() +############################ client ############################# +############################ ############################# +############################ ############################# - def get_connection_port (self): - return self.comm_link.port +class STLClient(object): + """docstring for STLClient""" - def get_connection_ip (self): - return self.comm_link.server + def __init__(self, + username = general_utils.get_current_user(), + server = "localhost", + sync_port = 4501, + async_port = 4500, + verbose_level = LoggerApi.VERBOSE_QUIET, + logger = None, + virtual = False): - def get_all_ports (self): - return [port_id for port_id, port_obj in self.ports.iteritems()] - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] + self.username = username + + # init objects + self.ports = {} + self.server_version = {} + self.system_info = {} + self.session_id = random.getrandbits(32) + self.connected = False - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] + # logger + self.logger = DefaultLogger() if not logger else logger - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] - - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] + # initial verbose + self.logger.set_verbose(verbose_level) - def set_verbose(self, mode): + # low level RPC layer + self.comm_link = CCommLink(server, + sync_port, + virtual, + self.logger) - # on high - enable link verbose - if mode == self.VERBOSE_HIGH: - self.comm_link.set_verbose(True) - else: - self.comm_link.set_verbose(False) + # async event handler manager + self.event_handler = AsyncEventHandler(self) - self.verbose = mode + # async subscriber level + self.async_client = CTRexAsyncClient(server, + async_port, + self) + + - def check_verbose (self, level): - return (self.verbose >= level) + # stats + self.connection_info = {"username": username, + "server": server, + "sync_port": sync_port, + "async_port": async_port, + "virtual": virtual} - def get_verbose (self): - return self.verbose + + self.global_stats = trex_stats.CGlobalStats(self.connection_info, + self.server_version, + self.ports) - def prn_func (self, msg, level = VERBOSE_REGULAR, redraw_console = False): - if not self.check_verbose(level): - return + self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, + self.ports) - if redraw_console and self.prompt_redraw_cb: - print "\n" + msg + "\n" - self.prompt_redraw_cb() - else: - print msg + # stream DB + self.streams_db = CStreamsDB() - sys.stdout.flush() + + + ############# private functions - used by the class itself ########### - def set_prompt_redraw_cb(self, cb): - self.prompt_redraw_cb = cb + # some preprocessing for port argument + def __ports (self, port_id_list): - ############# server actions ################ + # none means all + if port_id_list == None: + return range(0, self.get_port_count()) - # ping server - def ping(self): - return self.transmit("ping") + # always list + if isinstance(port_id_list, int): + port_id_list = [port_id_list] + if not isinstance(port_id_list, list): + raise ValueError("bad port id list: {0}".format(port_id_list)) + for port_id in port_id_list: + if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): + raise ValueError("bad port id {0}".format(port_id)) - def get_global_stats(self): - return self.transmit("get_global_stats") + return port_id_list - ########## port commands ############## - def sync_ports (self, port_id_list = None, force = False): + # sync ports + def __sync_ports (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].sync()) - + return rc # acquire ports, if port_list is none - get all - def acquire (self, port_id_list = None, force = False): + def __acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].acquire(force)) - + return rc - + # release ports - def release (self, port_id_list = None): + def __release (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].release()) - + return rc - - def add_stream(self, stream_id, stream_obj, port_id_list = None): + + def __add_stream(self, stream_id, stream_obj, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -544,12 +549,12 @@ class CTRexStatelessClient(object): for port_id in port_id_list: rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - + return rc - - def add_stream_pack(self, stream_pack, port_id_list = None): + + def __add_stream_pack(self, stream_pack, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -562,45 +567,45 @@ class CTRexStatelessClient(object): - def remove_stream(self, stream_id, port_id_list = None): + def __remove_stream(self, stream_id, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].remove_stream(stream_id)) - + return rc - def remove_all_streams(self, port_id_list = None): + def __remove_all_streams(self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].remove_all_streams()) - + return rc - - def get_stream(self, stream_id, port_id, get_pkt = False): + + def __get_stream(self, stream_id, port_id, get_pkt = False): return self.ports[port_id].get_stream(stream_id) - def get_all_streams(self, port_id, get_pkt = False): + def __get_all_streams(self, port_id, get_pkt = False): return self.ports[port_id].get_all_streams() - def get_stream_id_list(self, port_id): + def __get_stream_id_list(self, port_id): return self.ports[port_id].get_stream_id_list() - def start_traffic (self, multiplier, duration, port_id_list = None): + def __start_traffic (self, multiplier, duration, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -608,11 +613,11 @@ class CTRexStatelessClient(object): for port_id in port_id_list: rc.add(self.ports[port_id].start(multiplier, duration)) - + return rc - def resume_traffic (self, port_id_list = None, force = False): + def __resume_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() @@ -622,7 +627,7 @@ class CTRexStatelessClient(object): return rc - def pause_traffic (self, port_id_list = None, force = False): + def __pause_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() @@ -632,280 +637,937 @@ class CTRexStatelessClient(object): return rc - def stop_traffic (self, port_id_list = None, force = False): + + def __stop_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].stop(force)) - + return rc - def update_traffic (self, mult, port_id_list = None, force = False): + def __update_traffic (self, mult, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].update(mult)) - + return rc - def validate (self, port_id_list = None): + def __validate_traffic (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].validate()) - + return rc - def get_port_stats(self, port_id=None): - pass - def get_stream_stats(self, port_id=None): - pass + # connect to server + def __connect(self): - def transmit(self, method_name, params={}): - return self.comm_link.transmit(method_name, params) + # first disconnect if already connected + if self.is_connected(): + self.__disconnect() + # clear this flag + self.connected = False - def transmit_batch(self, batch_list): - return self.comm_link.transmit_batch(batch_list) + # connect sync channel + self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) + rc = self.comm_link.connect() + self.logger.post_cmd(rc) - ######################### Console (high level) API ######################### + if not rc: + return rc - @timing - def cmd_ping(self): - rc = self.ping() - rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) - return rc + # connect async channel + self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) + rc = self.async_client.connect() + self.logger.post_cmd(rc) - def cmd_connect(self, mode = "RW"): - rc = self.connect(mode) - rc.annotate() - return rc + if not rc: + return rc - def cmd_disconnect(self): - rc = self.disconnect() - rc.annotate() - return rc + # version + rc = self._transmit("get_version") + if not rc: + return rc - # reset - def cmd_reset(self): - #self.release(self.get_acquired_ports()) + self.server_version = rc.data() + self.global_stats.server_version = rc.data() - rc = self.acquire(force = True) - rc.annotate("Force acquiring all ports:") - if rc.bad(): + # cache system info + rc = self._transmit("get_system_info") + if not rc: return rc + self.system_info = rc.data() - # force stop all ports - rc = self.stop_traffic(self.get_port_ids(), True) - rc.annotate("Stop traffic on all ports:") - if rc.bad(): + # cache supported commands + rc = self._transmit("get_supported_cmds") + if not rc: return rc + self.supported_cmds = rc.data() - # remove all streams - rc = self.remove_all_streams(self.get_port_ids()) - rc.annotate("Removing all streams from all ports:") - if rc.bad(): + # create ports + for port_id in xrange(self.system_info["port_count"]): + speed = self.system_info['ports'][port_id]['speed'] + driver = self.system_info['ports'][port_id]['driver'] + + self.ports[port_id] = Port(port_id, + speed, + driver, + self.username, + self.comm_link, + self.session_id) + + + # sync the ports + rc = self.__sync_ports() + if not rc: return rc - # TODO: clear stats - return RC_OK() + self.connected = True + return RC_OK() - # stop cmd - def cmd_stop (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + # disconenct from server + def __disconnect(self): + # release any previous acquired ports + if self.is_connected(): + self.__release(self.get_acquired_ports()) - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + self.comm_link.disconnect() + self.async_client.disconnect() - rc = self.stop_traffic(active_ports) - rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + self.connected = False return RC_OK() - # update cmd - def cmd_update (self, port_id_list, mult): - # find the relevant ports + # ping server + def __ping (self): + return self._transmit("ping") + + + # start command + def __start (self, port_id_list, stream_list, mult, force, duration, dry): + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + self.logger.log(format_text(msg, 'bold')) + return RC_ERR(msg) + else: + rc = self.__stop(active_ports) + if not rc: + return rc + - rc = self.update_traffic(mult, active_ports) - rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) + self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list)) + rc = self.__remove_all_streams(port_id_list) + self.logger.post_cmd(rc) - return rc + if not rc: + return rc + - # clear stats - def cmd_clear(self, port_id_list): + self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + rc = self.__add_stream_pack(stream_list, port_id_list) + self.logger.post_cmd(rc) - for port_id in port_id_list: - self.ports[port_id].clear_stats() + if not rc: + return rc + + # when not on dry - start the traffic , otherwise validate only + if not dry: - self.global_stats.clear_stats() + self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list)) + rc = self.__start_traffic(mult, duration, port_id_list) + self.logger.post_cmd(rc) - return RC_OK() + return rc + else: + self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list)) + rc = self.__validate(port_id_list) + self.logger.post_cmd(rc) + - def cmd_invalidate (self, port_id_list): - for port_id in port_id_list: - self.ports[port_id].invalidate_stats() + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc - self.global_stats.invalidate() - return RC_OK() + # stop cmd + def __stop (self, port_id_list): - # pause cmd - def cmd_pause (self, port_id_list): + self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list)) + rc = self.__stop_traffic(port_id_list) + self.logger.post_cmd(rc) - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + if not rc: + return rc + + return RC_OK() - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + #update cmd + def __update (self, port_id_list, mult): + + self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list)) + rc = self.__update_traffic(mult, port_id_list) + self.logger.post_cmd(rc) - rc = self.pause_traffic(active_ports) - rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) return rc + # pause cmd + def __pause (self, port_id_list): + + self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list)) + rc = self.__pause_traffic(port_id_list) + self.logger.post_cmd(rc) + + return rc + # resume cmd - def cmd_resume (self, port_id_list): + def __resume (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) + rc = self.__resume_traffic(port_id_list) + self.logger.post_cmd(rc) + + return rc - if not active_ports: - msg = "No active traffic on porvided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) - rc = self.resume_traffic(active_ports) - rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) + # validate port(s) profile + def __validate (self, port_id_list): + self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) + rc = self.__validate_traffic(port_id_list) + self.logger.post_cmd(rc) + return rc - # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): + # clear stats + def __clear_stats(self, port_id_list, clear_global): - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + for port_id in port_id_list: + self.ports[port_id].clear_stats() - if active_ports: - if not force: - msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) - print format_text(msg, 'bold') - return RC_ERR(msg) - else: - rc = self.cmd_stop(active_ports) - if not rc: + if clear_global: + self.global_stats.clear_stats() + + self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list)) + rc = RC_OK() + self.logger.post_cmd(rc) + + return RC + + + # get stats + def __get_stats (self, port_id_list): + stats = {} + + stats['global'] = self.global_stats.get_stats() + + total = {} + for port_id in port_id_list: + port_stats = self.ports[port_id].get_stats() + stats["port {0}".format(port_id)] = port_stats + + for k, v in port_stats.iteritems(): + if not k in total: + total[k] = v + else: + total[k] += v + + stats['total'] = total + + return stats + + + def __process_profiles (self, profiles, out): + + for profile in (profiles if isinstance(profiles, list) else [profiles]): + # filename + if isinstance(profile, str): + + if not os.path.isfile(profile): + return RC_ERR("file '{0}' does not exists".format(profile)) + + try: + stream_list = self.streams_db.load_yaml_file(profile) + except Exception as e: + rc = RC_ERR(str(e)) return rc + out.append(stream_list) - rc = self.remove_all_streams(port_id_list) - rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + else: + return RC_ERR("unknown profile '{0}'".format(profile)) - rc = self.add_stream_pack(stream_list, port_id_list) - rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): - return rc + return RC_OK() - # when not on dry - start the traffic , otherwise validate only - if not dry: - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - return rc - else: - rc = self.validate(port_id_list) - rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) + ############ functions used by other classes but not users ############## - if rc.bad(): - return rc + def _verify_port_id_list (self, port_id_list): + # check arguments + if not isinstance(port_id_list, list): + return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list))) - # show a profile on one port for illustration - self.ports[port_id_list[0]].print_profile(mult, duration) + # all ports are valid ports + if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]): + return RC_ERR("") - return rc + return RC_OK() + + def _validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + return False + # check each item of the sequence + return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) - # validate port(s) profile - def cmd_validate (self, port_id_list): - rc = self.validate(port_id_list) - rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) - return rc + # transmit request on the RPC link + def _transmit(self, method_name, params={}): + return self.comm_link.transmit(method_name, params) + + # transmit batch request on the RPC link + def _transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) + # stats - def cmd_stats(self, port_id_list, stats_mask=set()): + def _get_formatted_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) stats_obj = {} for stats_type in stats_opts: stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) + return stats_obj - def cmd_streams(self, port_id_list, streams_mask=set()): + def _get_streams(self, port_id_list, streams_mask=set()): streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask) return streams_obj - ############## High Level API With Parser ################ + def _invalidate_stats (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + + return RC_OK() + + + + + + ################################# + # ------ private methods ------ # + @staticmethod + def __get_mask_keys(ok_values={True}, **kwargs): + masked_keys = set() + for key, val in kwargs.iteritems(): + if val in ok_values: + masked_keys.add(key) + return masked_keys + + @staticmethod + def __filter_namespace_args(namespace, ok_values): + return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + + + # API decorator - double wrap because of argument + def __api_check(connected = True): + + def wrap (f): + def wrap2(*args, **kwargs): + client = args[0] + + func_name = f.__name__ + + # check connection + if connected and not client.is_connected(): + raise STLStateError(func_name, 'disconnected') + + ret = f(*args, **kwargs) + return ret + return wrap2 + + return wrap + + + + ############################ API ############################# + ############################ ############################# + ############################ ############################# + def __enter__ (self): + self.connect(mode = "RWF") + self.reset() + return self + + def __exit__ (self, type, value, traceback): + if self.get_active_ports(): + self.stop(self.get_active_ports()) + self.disconnect() + + ############################ Getters ############################# + ############################ ############################# + ############################ ############################# + + + # return verbose level of the logger + def get_verbose (self): + return self.logger.get_verbose() + + # is the client on read only mode ? + def is_all_ports_acquired (self): + return not (self.get_all_ports() == self.get_acquired_ports()) + + # is the client connected ? + def is_connected (self): + return self.connected and self.comm_link.is_connected + + + # get connection info + def get_connection_info (self): + return self.connection_info + + + # get supported commands by the server + def get_server_supported_cmds(self): + return self.supported_cmds + + # get server version + def get_server_version(self): + return self.server_version + + # get server system info + def get_server_system_info(self): + return self.system_info + + # get port count + def get_port_count(self): + return len(self.ports) + + + # returns the port object + def get_port (self, port_id): + port = self.ports.get(port_id, None) + if (port != None): + return port + else: + raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) + + + # get all ports as IDs + def get_all_ports (self): + return self.ports.keys() + + # get all acquired ports + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + # get all active ports (TX or pause) + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + # get paused ports + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + # get all TX ports + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + + # get stats + def get_stats (self, ports = None, async_barrier = True): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # check async barrier + if not type(async_barrier) is bool: + raise STLArgumentError('async_barrier', async_barrier) + + + # if the user requested a barrier - use it + if async_barrier: + rc = self.async_client.barrier() + if not rc: + raise STLError(rc) + + return self.__get_stats(ports) + + # return all async events + def get_events (self): + return self.event_handler.get_events() + + ############################ Commands ############################# + ############################ ############################# + ############################ ############################# + + + # set the log on verbose level + def set_verbose (self, level): + self.logger.set_verbose(level) + + + # connects to the server + # mode can be: + # 'RO' - read only + # 'RW' - read/write + # 'RWF' - read write forced (take ownership) + @__api_check(False) + def connect (self, mode = "RW"): + modes = ['RO', 'RW', 'RWF'] + if not mode in modes: + raise STLArgumentError('mode', mode, modes) + + rc = self.__connect() + if not rc: + raise STLError(rc) + + # acquire all ports for 'RW' or 'RWF' + if (mode == "RW") or (mode == "RWF"): + self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False) + + + + + # acquire ports + # this is not needed if connect was called with "RW" or "RWF" + # but for "RO" this might be needed + @__api_check(True) + def acquire (self, ports = None, force = False): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify ports + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify valid port id list + if force: + self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) + else: + self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) + + rc = self.__acquire(ports, force) + + self.logger.post_cmd(rc) + + if not rc: + self.__release(ports) + raise STLError(rc) + + + + # force connect syntatic sugar + @__api_check(False) + def fconnect (self): + self.connect(mode = "RWF") + + + # disconnects from the server + @__api_check(False) + def disconnect (self, log = True): + rc = self.__disconnect() + if log: + self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) + if not rc: + raise STLError(rc) + + + + # teardown - call after test is done + # NEVER throws an exception + @__api_check(False) + def teardown (self, stop_traffic = True): + + # try to stop traffic + if stop_traffic and self.get_active_ports(): + try: + self.stop() + except STLError: + pass + + # disconnect + self.__disconnect() + + + + # pings the server on the RPC channel + @__api_check(True) + def ping(self): + self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.connection_info['sync_port'])) + rc = self.__ping() + + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + + # reset the server by performing + # force acquire, stop, and remove all streams + @__api_check(True) + def reset(self): + + self.logger.pre_cmd("Force acquiring all ports:") + rc = self.__acquire(force = True) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + # force stop all ports + self.logger.pre_cmd("Stop traffic on all ports:") + rc = self.__stop_traffic(self.get_all_ports(), True) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + # remove all streams + self.logger.pre_cmd("Removing all streams from all ports:") + rc = self.__remove_all_streams(self.get_all_ports()) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + self.clear_stats() + + + # start cmd + @__api_check(True) + def start (self, + profiles, + ports = None, + mult = "1", + force = False, + duration = -1, + dry = False, + total = False): + + + # by default use all ports + if ports == None: + ports = self.get_acquired_ports() - def cmd_connect_line (self, line): + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = False, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # some type checkings + + if not type(force) is bool: + raise STLArgumentError('force', force) + + if not isinstance(duration, (int, float)): + raise STLArgumentError('duration', duration) + + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # process profiles + stream_list = [] + rc = self.__process_profiles(profiles, stream_list) + if not rc: + raise STLError(rc) + + # dry run + if dry: + self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) + + # call private method to start + + rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry) + if not rc: + raise STLError(rc) + + + + # stop traffic on ports + @__api_check(True) + def stop (self, ports = None): + + # by default the user means all the active ports + if ports == None: + ports = self.get_active_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__stop(ports) + if not rc: + raise STLError(rc) + + + + # update traffic + @__api_check(True) + def update (self, ports = None, mult = "1", total = False): + + # by default the user means all the active ports + if ports == None: + ports = self.get_active_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = True, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # verify total + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # call low level functions + rc = self.__update(ports, mult_obj) + if not rc: + raise STLError(rc) + + + + # pause traffic on ports + @__api_check(True) + def pause (self, ports = None): + + # by default the user means all the TX ports + if ports == None: + ports = self.get_transmitting_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__pause(ports) + if not rc: + raise STLError(rc) + + + + # resume traffic on ports + @__api_check(True) + def resume (self, ports = None): + + # by default the user means all the paused ports + if ports == None: + ports = self.get_paused_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__resume(ports) + if not rc: + raise STLError(rc) + + + @__api_check(True) + def validate (self, ports = None): + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__validate(ports) + if not rc: + raise STLError(rc) + + + # clear stats + @__api_check(False) + def clear_stats (self, ports = None, clear_global = True): + + # by default use all ports + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify clear global + if not type(clear_global) is bool: + raise STLArgumentError('clear_global', clear_global) + + + rc = self.__clear_stats(ports, clear_global) + if not rc: + raise STLError(rc) + + + + + + # wait while traffic is on, on timeout throw STLTimeoutError + @__api_check(True) + def wait_on_traffic (self, ports = None, timeout = 60): + + # by default use all acquired ports + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + expr = time.time() + timeout + + # wait while any of the required ports are active + while set(self.get_active_ports()).intersection(ports): + time.sleep(0.01) + if time.time() > expr: + raise STLTimeoutError(timeout) + + + # clear all async events + def clear_events (self): + self.event_handler.clear_events() + + ############################ Line ############################# + ############################ Commands ############################# + ############################ ############################# + # console decorator + def __console(f): + def wrap(*args): + client = args[0] + + time1 = time.time() + + try: + rc = f(*args) + except STLError as e: + client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + return + + # if got true - print time + if rc: + delta = time.time() - time1 + client.logger.log(format_time(delta) + "\n") + + + return wrap + + + @__console + def connect_line (self, line): '''Connects to the TRex server''' # define a parser parser = parsing_opts.gen_parser(self, "connect", - self.cmd_connect_line.__doc__, + self.connect_line.__doc__, parsing_opts.FORCE) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return - if opts.force: - rc = self.cmd_connect(mode = "RWF") - else: - rc = self.cmd_connect(mode = "RW") + # call the API + self.connect("RWF" if opts.force else "RW") - @timing - def cmd_start_line (self, line): + # true means print time + return True + + @__console + def disconnect_line (self, line): + self.disconnect() + + + + @__console + def reset_line (self, line): + self.reset() + + # true means print time + return True + + + @__console + def start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser parser = parsing_opts.gen_parser(self, "start", - self.cmd_start_line.__doc__, + self.start_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.TOTAL, parsing_opts.FORCE, @@ -918,361 +1580,220 @@ class CTRexStatelessClient(object): if opts is None: - return RC_ERR("bad command line parameters") - - - if opts.dry: - print format_text("\n*** DRY RUN ***", 'bold') - - if opts.db: - stream_list = self.streams_db.get_stream_pack(opts.db) - rc = RC(stream_list != None) - rc.annotate("Load stream pack (from DB):") - if rc.bad(): - return RC_ERR("Failed to load stream pack") + return - else: - # load streams from file - stream_list = None - try: - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - except Exception as e: - s = str(e) - rc=RC_ERR(s) - rc.annotate() - return rc + # pack the profile + profiles = [opts.file[0]] - rc = RC(stream_list != None) - rc.annotate("Load stream pack (from file):") - if stream_list == None: - return RC_ERR("Failed to load stream pack") + self.start(profiles, + opts.ports, + opts.mult, + opts.force, + opts.duration, + opts.dry, + opts.total) + # true means print time + return True - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) - @timing - def cmd_resume_line (self, line): - '''Resume active traffic in specified ports on TRex\n''' + @__console + def stop_line (self, line): + '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, - "resume", - self.cmd_stop_line.__doc__, + "stop", + self.stop_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) - return self.cmd_resume(opts.ports) + if not ports: + self.logger.log(format_text("No active traffic on provided ports\n", 'bold')) + return + self.stop(ports) - @timing - def cmd_stop_line (self, line): - '''Stop active traffic in specified ports on TRex\n''' + # true means print time + return True + + + @__console + def update_line (self, line): + '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, - "stop", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) + "update", + self.update_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.MULTIPLIER, + parsing_opts.TOTAL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No ports in valid state to update\n", 'bold')) + return + + self.update(ports, opts.mult, opts.total) - return self.cmd_stop(opts.ports) + # true means print time + return True - @timing - def cmd_pause_line (self, line): + @__console + def pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "pause", - self.cmd_stop_line.__doc__, + self.pause_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_transmitting_ports()).intersection(opts.ports)) - return self.cmd_pause(opts.ports) + if not ports: + self.logger.log(format_text("No ports in valid state to pause\n", 'bold')) + return + self.pause(ports) - @timing - def cmd_update_line (self, line): - '''Update port(s) speed currently active\n''' + # true means print time + return True + + + @__console + def resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, - "update", - self.cmd_update_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.MULTIPLIER, - parsing_opts.TOTAL) + "resume", + self.resume_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") + return - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) + # find the relevant ports + ports = list(set(self.get_paused_ports()).intersection(opts.ports)) - return self.cmd_update(opts.ports, opts.mult) + if not ports: + self.logger.log(format_text("No ports in valid state to resume\n", 'bold')) + return - @timing - def cmd_reset_line (self, line): - return self.cmd_reset() + return self.resume(ports) + # true means print time + return True - def cmd_clear_line (self, line): + + @__console + def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", - self.cmd_clear_line.__doc__, + self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + self.clear_stats(opts.ports) - return self.cmd_clear(opts.ports) - def cmd_stats_line (self, line): + + @__console + def show_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", - self.cmd_stats_line.__doc__, + self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return # determine stats mask - mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) + mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - stats = self.cmd_stats(opts.ports, mask) + stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask) + + stats = self._get_formatted_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return RC_OK() - def cmd_streams_line(self, line): + @__console + def show_streams_line(self, line): '''Fetch streams statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "streams", - self.cmd_streams_line.__doc__, + self.show_streams_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.STREAMS_MASK)#, - #parsing_opts.FULL_OUTPUT) + parsing_opts.STREAMS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return - streams = self.cmd_streams(opts.ports, set(opts.streams)) + streams = self._get_streams(opts.ports, set(opts.streams)) if not streams: - # we got no streams running + self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) - print format_text("No streams found with desired filter.\n", "bold", "magenta") - return RC_ERR("No streams found with desired filter.") else: # print stats to screen for stream_hdr, port_streams_data in streams.iteritems(): text_tables.print_table_with_header(port_streams_data.text_table, header= stream_hdr.split(":")[0] + ":", untouched_header= stream_hdr.split(":")[1]) - return RC_OK() - @timing - def cmd_validate_line (self, line): + @__console + def validate_line (self, line): '''validates port(s) stream configuration\n''' parser = parsing_opts.gen_parser(self, "validate", - self.cmd_validate_line.__doc__, + self.validate_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") - - rc = self.cmd_validate(opts.ports) - return rc - - - def cmd_exit_line (self, line): - print format_text("Exiting\n", 'bold') - # a way to exit - return RC_ERR("exit") - - - def cmd_wait_line (self, line): - '''wait for a period of time\n''' - - parser = parsing_opts.gen_parser(self, - "wait", - self.cmd_wait_line.__doc__, - parsing_opts.DURATION) - - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") - - delay_sec = opts.duration if (opts.duration > 0) else 1 - - print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold') - time.sleep(delay_sec) - - return RC_OK() - - # run a script of commands - def run_script_file (self, filename): - - print format_text("\nRunning script file '{0}'...".format(filename), 'bold') - - rc = self.cmd_connect() - if rc.bad(): return - with open(filename) as f: - script_lines = f.readlines() - - cmd_table = {} - - # register all the commands - cmd_table['start'] = self.cmd_start_line - cmd_table['stop'] = self.cmd_stop_line - cmd_table['reset'] = self.cmd_reset_line - cmd_table['wait'] = self.cmd_wait_line - cmd_table['exit'] = self.cmd_exit_line - - for index, line in enumerate(script_lines, start = 1): - line = line.strip() - if line == "": - continue - if line.startswith("#"): - continue - - sp = line.split(' ', 1) - cmd = sp[0] - if len(sp) == 2: - args = sp[1] - else: - args = "" - - print format_text("Executing line {0} : '{1}'\n".format(index, line)) - - if not cmd in cmd_table: - print "\n*** Error at line {0} : '{1}'\n".format(index, line) - print format_text("unknown command '{0}'\n".format(cmd), 'bold') - return False - - rc = cmd_table[cmd](args) - if rc.bad(): - return False - - print format_text("\n[Done]", 'bold') - - return True - - - ################################# - # ------ private methods ------ # - @staticmethod - def _get_mask_keys(ok_values={True}, **kwargs): - masked_keys = set() - for key, val in kwargs.iteritems(): - if val in ok_values: - masked_keys.add(key) - return masked_keys - - @staticmethod - def _filter_namespace_args(namespace, ok_values): - return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - - - ################################# - # ------ private classes ------ # - class CCommLink(object): - """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): - super(CTRexStatelessClient.CCommLink, self).__init__() - self.virtual = virtual - self.server = server - self.port = port - self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - @property - def is_connected(self): - if not self.virtual: - return self.rpc_link.connected - else: - return True - - def get_server (self): - return self.server - - def get_port (self): - return self.port - - def set_verbose(self, mode): - self.verbose = mode - return self.rpc_link.set_verbose(mode) - - def connect(self): - if not self.virtual: - return self.rpc_link.connect() - - def disconnect(self): - if not self.virtual: - return self.rpc_link.disconnect() - - def transmit(self, method_name, params={}): - if self.virtual: - self._prompt_virtual_tx_msg() - _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) - print msg - return - else: - return self.rpc_link.invoke_rpc_method(method_name, params) - - def transmit_batch(self, batch_list): - if self.virtual: - self._prompt_virtual_tx_msg() - print [msg - for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] - else: - batch = self.rpc_link.create_batch() - for command in batch_list: - batch.add(command.method, command.params) - # invoke the batch - return batch.invoke() + self.validate(opts.ports) - def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, - port=self.port) -if __name__ == "__main__": - pass +
\ No newline at end of file |