#!/router/bin/python

# for API usage the path name must be full
from .trex_stl_exceptions import *
from .trex_stl_streams import *

from .trex_stl_jsonrpc_client import JsonRpcClient, BatchMessage
from . import trex_stl_stats

from .trex_stl_port import Port
from .trex_stl_types import *
from .trex_stl_async_client import CTRexAsyncClient

from .utils import parsing_opts, text_tables, common
from .utils.common import list_intersect, list_difference, is_sub_list
from .utils.text_opts import *
from functools import wraps

from collections import namedtuple
from yaml import YAMLError
import time
import datetime
import re
import random
import json
import sys
import traceback

############################     logger     #############################
############################                #############################
############################                #############################

# logger API for the client
class LoggerApi(object):
    """Base logging interface for the client.

    Level filtering and the command pre/post helpers live here; a concrete
    logger only has to implement :meth:`write` and :meth:`flush`.
    """

    # verbose levels
    VERBOSE_QUIET   = 0
    VERBOSE_REGULAR = 1
    VERBOSE_HIGH    = 2

    def __init__(self):
        self.level = LoggerApi.VERBOSE_REGULAR

    # implemented by specific logger
    def write(self, msg, newline = True):
        raise Exception("Implement this")

    # implemented by specific logger
    def flush(self):
        raise Exception("Implement this")

    def set_verbose (self, level):
        """Set the verbose level; raises ValueError if it is not one of the VERBOSE_* values."""
        if level not in range(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1):
            raise ValueError("Bad value provided for logger")

        self.level = level

    def get_verbose (self):
        return self.level

    def check_verbose (self, level):
        """Return True when a message of the given level should be emitted."""
        return (self.level >= level)

    # simple log message with verbose
    def log (self, msg, level = VERBOSE_REGULAR, newline = True):
        if not self.check_verbose(level):
            return

        self.write(msg, newline)

    # logging that comes from async event
    def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
        self.log(msg, level, newline)

    def pre_cmd (self, desc):
        """Print the (left aligned, bold) command description without a newline."""
        self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
        self.flush()

    def post_cmd (self, rc):
        """Print the command verdict according to the truthiness of rc."""
        if rc:
            self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
        else:
            self.log(format_text("[FAILED]\n", 'red', 'bold'))

    def log_cmd (self, desc):
        self.pre_cmd(desc)
        self.post_cmd(True)

    # suppress object getter (the method name keeps its historical spelling)
    def supress (self):
        """Return a context manager that silences this logger inside a 'with' block."""

        class Supress(object):
            def __init__ (self, logger):
                self.logger = logger

            def __enter__ (self):
                self.saved_level = self.logger.get_verbose()
                self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)

            def __exit__ (self, type, value, traceback):
                self.logger.set_verbose(self.saved_level)

        return Supress(self)


# default logger - to stdout
class DefaultLogger(LoggerApi):
    """Logger that writes to the process standard output."""

    def __init__ (self):
        super(DefaultLogger, self).__init__()

    def write (self, msg, newline = True):
        """Write msg to stdout, with or without a terminating newline."""
        if newline:
            print(msg)
        else:
            # 'print (msg),' only suppresses the newline on Python 2; on
            # Python 3 it builds a tuple and still prints a newline.
            # Writing directly behaves the same on both. The trailing space
            # mirrors the separator the Python 2 print statement produced.
            sys.stdout.write('{0} '.format(msg))

    def flush (self):
        sys.stdout.flush()


############################     async event handler     #############################
############################                             #############################
############################                             #############################

# an event
class Event(object):
    """A single logged event: origin, type, message and creation timestamp."""

    def __init__ (self, origin, ev_type, msg):
        self.origin  = origin
        self.ev_type = ev_type
        self.msg     = msg

        # wall clock time at creation, already formatted for display
        self.ts = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

    def __str__ (self):
        prefix = "[{:^}][{:^}]".format(self.origin, self.ev_type)

        return "{:<10} - {:18} - {:}".format(self.ts, prefix, format_text(self.msg, 'bold'))


# handles different async events given to the client
class EventsHandler(object):
    """Collects events and routes async notifications to the client objects."""

    # '<field name>-<port id>' keys of the async stats dump; compiled once
    # and written as a raw string so the backslashes reach the regex engine
    _PORT_STAT_RE = re.compile(r'(.*)\-(\d+)')

    def __init__ (self, client):
        self.client = client
        self.logger = self.client.logger

        self.events = []

    # public functions

    def get_events (self, ev_type_filter = None):
        """Return the logged events, optionally limited to the given type(s).

        ev_type_filter may be a single event type or a list of them; a
        falsy value returns a copy of all the events.
        """
        if ev_type_filter:
            # normalize the filter once instead of once per event
            wanted = listify(ev_type_filter)
            return [ev for ev in self.events if ev.ev_type in wanted]
        else:
            return list(self.events)

    def clear_events (self):
        self.events = []

    def log_warning (self, msg, show = True):
        self.__add_event_log('local', 'warning', msg, show)

    # events called internally

    def on_async_dead (self):
        # report the loss only once - on the transition from connected
        if self.client.connected:
            msg = 'Lost connection to server'
            self.__add_event_log('local', 'info', msg, True)
            self.client.connected = False

    def on_async_alive (self):
        pass

    def on_async_rx_stats_event (self, data, baseline):
        self.client.flow_stats.update(data, baseline)

    def on_async_latency_stats_event (self, data, baseline):
        self.client.latency_stats.update(data, baseline)

    # handles an async stats update from the subscriber
    def on_async_stats_update(self, dump_data, baseline):
        """Split a stats dump into global and per port values and apply them.

        Keys matching '<name>-<port id>' go to the port with that id (values
        of ports unknown to the client are dropped); every other key is a
        global statistic.
        """
        global_stats = {}
        port_stats = {}

        # filter the values per port and general
        for key, value in dump_data.items():
            m = self._PORT_STAT_RE.search(key)
            if not m:
                # no port match - general stats
                global_stats[key] = value
                continue

            port_id = int(m.group(2))
            if port_id in self.client.ports:
                port_stats.setdefault(port_id, {})[m.group(1)] = value

        # update the general object with the snapshot
        self.client.global_stats.update(global_stats, baseline)

        # update all ports
        for port_id, data in port_stats.items():
            self.client.ports[port_id].port_stats.update(data, baseline)

    # dispatcher for server async events (port started, port stopped and etc.)
# NOTE: the definitions below are methods of EventsHandler - the class
# header is on an earlier line of the file.

def on_async_event (self, type, data):
    """Handle a server async event: update port state and log the event.

    'type' is the numeric event code and 'data' the event payload.
    Unknown event codes are ignored and nothing is logged for them.
    """

    # events whose payload is only a port id:
    #   code -> (message template, state handler or None, show to the user)
    port_events = {
        0: ("Port {0} has started", self.__async_event_port_started,  False),
        1: ("Port {0} has stopped", self.__async_event_port_stopped,  False),
        2: ("Port {0} has paused",  self.__async_event_port_paused,   False),
        3: ("Port {0} has resumed", self.__async_event_port_resumed,  False),
        4: ("Port {0} job done",    self.__async_event_port_job_done, True),
        7: ("port {0} job failed",  None,                             True),
    }

    if type in port_events:
        template, handler, show_event = port_events[type]

        port_id = int(data['port_id'])
        ev = template.format(port_id)

        if handler is not None:
            handler(port_id)

    # port was acquired - maybe stolen...
    elif type == 5:
        session_id = data['session_id']
        port_id    = int(data['port_id'])
        who        = data['who']
        force      = data['force']

        by_this_session = (session_id == self.client.session_id)

        # show it only if we hold the port and somebody else took it
        show_event = (port_id in self.client.get_acquired_ports()) and not by_this_session

        # format the thief/us...
        if by_this_session:
            user = 'you'
        elif who == self.client.username:
            user = 'another session of you'
        else:
            user = "'{0}'".format(who)

        template = "Port {0} was forcely taken by {1}" if force else "Port {0} was taken by {1}"
        ev = template.format(port_id, user)

        # call the handler in case its not this session
        if not by_this_session:
            self.__async_event_port_acquired(port_id, who)

    # port was released
    elif type == 6:
        port_id    = int(data['port_id'])
        who        = data['who']
        session_id = data['session_id']

        by_this_session = (session_id == self.client.session_id)
        show_event = False

        if by_this_session:
            user = 'you'
        elif who == self.client.username:
            user = 'another session of you'
        else:
            user = "'{0}'".format(who)

        ev = "Port {0} was released by {1}".format(port_id, user)

        # call the handler in case its not this session
        if not by_this_session:
            self.__async_event_port_released(port_id)

    # server stopped
    elif type == 100:
        ev = "Server has stopped"
        self.__async_event_server_stopped()
        show_event = True

    else:
        # unknown event - ignore
        return

    self.__add_event_log('server', 'info', ev, show_event)


# private functions

# on rare cases events may come on a non existent port
# (server was re-run with different config)
def __async_event_port_job_done (self, port_id):
    port = self.client.ports.get(port_id)
    if port is not None:
        port.async_event_port_job_done()

def __async_event_port_stopped (self, port_id):
    port = self.client.ports.get(port_id)
    if port is not None:
        port.async_event_port_stopped()

def __async_event_port_started (self, port_id):
    port = self.client.ports.get(port_id)
    if port is not None:
        port.async_event_port_started()

def __async_event_port_paused (self, port_id):
    port = self.client.ports.get(port_id)
    if port is not None:
        port.async_event_port_paused()

def __async_event_port_resumed (self, port_id):
    port = self.client.ports.get(port_id)
    if port is not None:
        port.async_event_port_resumed()
# NOTE: the four definitions below are the remaining methods of
# EventsHandler - the class header is on an earlier line of the file.

def __async_event_port_acquired (self, port_id, who):
    if port_id in self.client.ports:
        self.client.ports[port_id].async_event_acquired(who)

def __async_event_port_released (self, port_id):
    if port_id in self.client.ports:
        self.client.ports[port_id].async_event_released()

def __async_event_server_stopped (self):
    self.client.connected = False

# add event to log
def __add_event_log (self, origin, ev_type, msg, show = False):
    """Record an event and, when 'show' is set, print it through the logger."""
    event = Event(origin, ev_type, msg)
    self.events.append(event)
    if show:
        self.logger.async_log("\n\n{0}".format(str(event)))


############################     RPC layer     #############################
############################                   #############################
############################                   #############################

class CCommLink(object):
    """Describes the connectivity of the stateless client method"""

    def __init__(self, server="localhost", port=5050, virtual=False, client = None):
        self.virtual = virtual
        self.server = server
        self.port = port
        self.rpc_link = JsonRpcClient(self.server, self.port, client)

    @property
    def is_connected(self):
        # a virtual link never opens a connection and always reports connected
        if not self.virtual:
            return self.rpc_link.connected
        else:
            return True

    def get_server (self):
        return self.server

    def get_port (self):
        return self.port

    def connect(self):
        # nothing to do (returns None) for a virtual link
        if not self.virtual:
            return self.rpc_link.connect()

    def disconnect(self):
        # nothing to do (returns None) for a virtual link
        if not self.virtual:
            return self.rpc_link.disconnect()

    def transmit(self, method_name, params = None, api_class = 'core'):
        """Invoke one RPC method; a virtual link only prints the request and returns None."""
        if self.virtual:
            self._prompt_virtual_tx_msg()
            _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params, api_class)
            print(msg)
            return
        else:
            return self.rpc_link.invoke_rpc_method(method_name, params, api_class)

    def transmit_batch(self, batch_list):
        """Invoke a list of commands as one batch; a virtual link only prints them."""
        if self.virtual:
            self._prompt_virtual_tx_msg()
            print([msg
                   for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params, command.api_class)
                                  for command in batch_list]])
        else:
            batch = self.rpc_link.create_batch()
            for command in batch_list:
                batch.add(command.method, command.params, command.api_class)

            # invoke the batch
            return batch.invoke()

    def _prompt_virtual_tx_msg(self):
        print("Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
                                                                        port=self.port))


############################     client     #############################
############################                #############################
############################                #############################

class STLClient(object):
    """TRex Stateless client object - gives operations per TRex/user"""

    def __init__(self,
                 username = common.get_current_user(),
                 server = "localhost",
                 sync_port = 4501,
                 async_port = 4500,
                 verbose_level = LoggerApi.VERBOSE_QUIET,
                 logger = None,
                 virtual = False):
        """
        Configure the connection settings

        :parameters:
             username : string
                the user name, for example imarom

              server  : string
                the server name or ip

              sync_port : int
                the RPC port

              async_port : int
                the ASYNC port

        .. code-block:: python
            :caption: Example

            # connect to local TRex server
            c = STLClient()

            # connect to remote server trex-remote-server
            c = STLClient(server = "trex-remote-server" )

            c = STLClient(server = "10.0.0.10" )

            # verbose mode
            c = STLClient(server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )

            # change user name
            c = STLClient(username = "root",server = "10.0.0.10", verbose_level = LoggerApi.VERBOSE_HIGH )

            c.connect()

            c.disconnect()

        """

        self.username   = username

        # init objects
        # NOTE: self.ports is handed by reference to the stats objects
        #       below - it must be mutated in place, never rebound
        self.ports = {}
        self.server_version = {}
        self.system_info = {}
        self.session_id = random.getrandbits(32)
        self.connected = False

        # API classes
        self.api_vers = [ {'type': 'core', 'major': 1, 'minor':2 } ]
        self.api_h = {'core': None}

        # logger
        self.logger = DefaultLogger() if not logger else logger

        # initial verbose
        self.logger.set_verbose(verbose_level)

        # low level RPC layer
        self.comm_link = CCommLink(server,
                                   sync_port,
                                   virtual,
                                   self)

        # async event handler manager
        self.event_handler = EventsHandler(self)

        # async subscriber level
        self.async_client = CTRexAsyncClient(server,
                                             async_port,
                                             self)

        # stats
        self.connection_info = {"username":   username,
                                "server":     server,
                                "sync_port":  sync_port,
                                "async_port": async_port,
                                "virtual":    virtual}

        self.global_stats = trex_stl_stats.CGlobalStats(self.connection_info,
                                                        self.server_version,
                                                        self.ports,
                                                        self.event_handler)

        self.flow_stats = trex_stl_stats.CRxStats(self.ports)

        self.latency_stats = trex_stl_stats.CLatencyStats(self.ports)

        self.stats_generator = trex_stl_stats.CTRexInfoGenerator(self.global_stats,
                                                                 self.ports,
                                                                 self.flow_stats,
                                                                 self.latency_stats,
                                                                 self.async_client.monitor)

    ############# private functions - used by the class itself ###########

    # some preprocessing for port argument
    def __ports (self, port_id_list):
        """Normalize a port argument to a list of valid port ids.

        None means all the ports, a single int becomes a one element list.
        Raises ValueError for anything that is not a list of ints in the
        range [0, port count).
        """
        port_count = self.get_port_count()

        # none means all
        if port_id_list is None:
            return list(range(0, port_count))

        # always list
        if isinstance(port_id_list, int):
            port_id_list = [port_id_list]

        if not isinstance(port_id_list, list):
            raise ValueError("Bad port id list: {0}".format(port_id_list))

        for port_id in port_id_list:
            # port ids are 0 .. port_count - 1, so port_count itself is invalid
            if not isinstance(port_id, int) or (port_id < 0) or (port_id >= port_count):
                raise ValueError("Bad port id {0}".format(port_id))

        return port_id_list

    # The helpers below apply one Port operation to every requested port
    # and collect the per port results in a single RC object.

    # sync ports
    def __sync_ports (self, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].sync())

        return rc

    # acquire ports, if port_list is none - get all
    def __acquire (self, port_id_list = None, force = False, sync_streams = True):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].acquire(force, sync_streams))

        return rc

    # release ports
    def __release (self, port_id_list = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].release())

        return rc

    def __add_streams(self, stream_list, port_id_list = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].add_streams(stream_list))

        return rc

    def __remove_streams(self, stream_id_list, port_id_list = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].remove_streams(stream_id_list))

        return rc

    def __remove_all_streams(self, port_id_list = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].remove_all_streams())

        return rc

    # get_pkt is accepted for interface compatibility but is not used
    def __get_stream(self, stream_id, port_id, get_pkt = False):
        return self.ports[port_id].get_stream(stream_id)

    # get_pkt is accepted for interface compatibility but is not used
    def __get_all_streams(self, port_id, get_pkt = False):
        return self.ports[port_id].get_all_streams()

    def __get_stream_id_list(self, port_id):
        return self.ports[port_id].get_stream_id_list()

    def __start (self, multiplier, duration, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].start(multiplier, duration, force))

        return rc

    def __resume (self, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].resume())

        return rc

    def __pause (self, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].pause())

        return rc

    def __stop (self, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].stop(force))

        return rc

    def __update (self, mult, port_id_list = None, force = False):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].update(mult, force))

        return rc

    def __push_remote (self, pcap_filename, port_id_list, ipg_usec, speedup, count, duration):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].push_remote(pcap_filename, ipg_usec, speedup, count, duration))

        return rc

    def __validate (self, port_id_list = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].validate())

        return rc

    def __set_port_attr (self, port_id_list = None, attr_dict = None):
        port_id_list = self.__ports(port_id_list)

        rc = RC()
        for port_id in port_id_list:
            rc.add(self.ports[port_id].set_attr(attr_dict))

        return rc

    # connect to server
    def __connect(self):
        """Connect both channels and rebuild the cached server state.

        Returns the first failing RC, or RC_OK() once the RPC channel, the
        cached server info, the ports and the async channel are all set.
        """

        # first disconnect if already connected
        if self.is_connected():
            self.__disconnect()

        # clear this flag
        self.connected = False

        # connect sync channel
        self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
        rc = self.comm_link.connect()
        self.logger.post_cmd(rc)

        if not rc:
            return rc

        # API sync
        rc = self._transmit("api_sync", params = {'api_vers': self.api_vers}, api_class = None)
        if not rc:
            return rc

        # decode
        for api in rc.data()['api_vers']:
            self.api_h[ api['type'] ] = api['api_h']

        # version
        rc = self._transmit("get_version")
        if not rc:
            return rc

        self.server_version = rc.data()
        self.global_stats.server_version = rc.data()

        # cache system info
        rc = self._transmit("get_system_info")
        if not rc:
            return rc

        self.system_info = rc.data()

        # cache supported commands
        rc = self._transmit("get_supported_cmds")
        if not rc:
            return rc

        self.supported_cmds = rc.data()

        # create ports
        # drop ports left from a previous connection - the server may have
        # been re-run with fewer ports. clear in place: the stats objects
        # hold a reference to this very dict
        self.ports.clear()
        for port_id in range(self.system_info["port_count"]):
            info = self.system_info['ports'][port_id]

            self.ports[port_id] = Port(port_id,
                                       self.username,
                                       self.comm_link,
                                       self.session_id,
                                       info)

        # sync the ports
        rc = self.__sync_ports()
        if not rc:
            return rc

        # connect async channel
        self.logger.pre_cmd("Connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
        rc = self.async_client.connect()
        self.logger.post_cmd(rc)

        if not rc:
            return rc

        self.connected = True

        return RC_OK()

    # disconnect from server
    def __disconnect(self, release_ports = True):
        # release any previous acquired ports
        if self.is_connected() and release_ports:
            self.__release(self.get_acquired_ports())

        self.comm_link.disconnect()
        self.async_client.disconnect()

        self.connected = False

        return RC_OK()

    # clear stats
    def __clear_stats(self, port_id_list, clear_global, clear_flow_stats, clear_latency_stats):
        """Clear the stats of the given ports and of the selected global objects."""

        # we must be sync with the server
        self.async_client.barrier()

        for port_id in port_id_list:
            self.ports[port_id].clear_stats()

        if clear_global:
            self.global_stats.clear_stats()

        if clear_flow_stats:
            self.flow_stats.clear_stats()

        if clear_latency_stats:
            self.latency_stats.clear_stats()

        self.logger.log_cmd("Clearing stats on port(s) {0}:".format(port_id_list))

        # return a result object like every other helper (the RC class
        # itself used to be returned here)
        return RC_OK()

    # get stats
    def __get_stats (self, port_id_list):
        """Build the stats dict: 'global', one entry per port, 'total', 'flow_stats', 'latency'."""
        stats = {}
        stats['global'] = self.global_stats.get_stats()

        total = {}
        for port_id in port_id_list:
            port_stats = self.ports[port_id].get_stats()
            stats[port_id] = port_stats

            # 'total' is the per key sum over the requested ports
            for k, v in port_stats.items():
                if k not in total:
                    total[k] = v
                else:
                    total[k] += v

        stats['total'] = total

        stats['flow_stats'] = self.flow_stats.get_stats()
        stats['latency'] = self.latency_stats.get_stats()

        return stats

    ############ functions used by other classes but not users ##############

    def _validate_port_list (self, port_id_list):
        """Return port_id_list as a non empty list of existing port ids.

        Raises STLTypeError if it is neither an int nor a list, and STLError
        if it is empty or holds an unknown port id.
        """
        # listify single int
        if isinstance(port_id_list, int):
            port_id_list = [port_id_list]

        # should be a list
        if not isinstance(port_id_list, list):
            raise STLTypeError('port_id_list', type(port_id_list), list)

        if not port_id_list:
            raise STLError('No ports provided')

        valid_ports = self.get_all_ports()
        for port_id in port_id_list:
            if port_id not in valid_ports:
                raise STLError("Port ID '{0}' is not a valid port ID - valid values: {1}".format(port_id, valid_ports))

        return port_id_list

    # transmit request on the RPC link
    def _transmit(self, method_name, params = None, api_class = 'core'):
        return self.comm_link.transmit(method_name, params, api_class)

    # transmit batch request on the RPC link
    def _transmit_batch(self, batch_list):
        return self.comm_link.transmit_batch(batch_list)

    # stats
    def _get_formatted_stats(self, port_id_list, stats_mask = trex_stl_stats.COMPACT):
        stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, stats_mask)

        stats_obj = OrderedDict()
        for stats_type in stats_opts:
            stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))

        return stats_obj

    def _get_streams(self, port_id_list, streams_mask=None):
        # a fresh set per call instead of one shared mutable default
        if streams_mask is None:
            streams_mask = set()

        streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask)

        return streams_obj

    def _invalidate_stats (self, port_id_list):
        for port_id in port_id_list:
            self.ports[port_id].invalidate_stats()

        self.global_stats.invalidate()
        self.flow_stats.invalidate()

        return RC_OK()

    # remove all RX filters in a safe manner
    def _remove_rx_filters (self, ports, rx_delay_ms):
        """Wait for the RX delay of every RX enabled port to expire, then remove its filters."""

        # get the enabled RX ports
        rx_ports = [port_id for port_id in ports if self.ports[port_id].has_rx_enabled()]

        if not rx_ports:
            return RC_OK()

        # block while any RX configured port has not yet have it's delay expired
        while any([not self.ports[port_id].has_rx_delay_expired(rx_delay_ms) for port_id in rx_ports]):
            time.sleep(0.01)

        # remove RX filters
        rc = RC()
        for port_id in rx_ports:
            rc.add(self.ports[port_id].remove_rx_filters())

        return rc

    #################################
    # ------ private methods ------ #
    @staticmethod
    def __get_mask_keys(ok_values={True}, **kwargs):
        # names of the keyword arguments whose value is one of ok_values
        masked_keys = set()
        for key, val in kwargs.items():
            if val in ok_values:
                masked_keys.add(key)
        return masked_keys

    @staticmethod
    def __filter_namespace_args(namespace, ok_values):
        return {k: v for k, v in namespace.__dict__.items() if k in ok_values}

    # API decorator - double wrap because of argument
    def __api_check(connected = True):
        """Decorator factory for the public API methods.

        With connected=True the wrapped method raises STLStateError when the
        client is not connected. A KeyboardInterrupt raised by the wrapped
        method is converted to STLError.
        """

        def wrap (f):
            @wraps(f)
            def wrap2(*args, **kwargs):
                client = args[0]

                func_name = f.__name__

                # check connection
                if connected and not client.is_connected():
                    raise STLStateError(func_name, 'disconnected')

                try:
                    ret = f(*args, **kwargs)
                except KeyboardInterrupt:
                    raise STLError("Test was interrupted by a keyboard signal (probably ctrl + c)")

                return ret
            return wrap2

        return wrap

    ############################     API
#############################
############################                #############################
############################                #############################

# NOTE: the definitions below are methods of STLClient - the class header
# is on an earlier line of the file.

def __enter__ (self):
    """Context manager entry: connect, force-acquire all ports and reset."""
    self.connect()
    self.acquire(force = True)
    self.reset()
    return self

def __exit__ (self, type, value, traceback):
    """Context manager exit: stop any active (owned) ports, then disconnect."""
    if self.get_active_ports():
        self.stop(self.get_active_ports())
    self.disconnect()

############################   Getters   #############################
############################             #############################
############################             #############################

# return verbose level of the logger
def get_verbose (self):
    """
        Get the verbose mode

        :parameters:
          none

        :return:
            The current verbose level of the client logger

        :raises:
          None

    """
    logger = self.logger
    return logger.get_verbose()

# are all the ports of the server acquired by this client ?
def is_all_ports_acquired (self):
    """
     is_all_ports_acquired

    :parameters:
      None

    :return:
        Returns True if all ports are acquired

    :raises:
      None

    """
    all_ports = self.get_all_ports()
    acquired_ports = self.get_acquired_ports()

    return (all_ports == acquired_ports)

# is the client connected ?
def is_connected (self): """ :parameters: None :return: is_connected :raises: None """ return self.connected and self.comm_link.is_connected # get connection info def get_connection_info (self): """ :parameters: None :return: Connection dict :raises: None """ return self.connection_info # get supported commands by the server def get_server_supported_cmds(self): """ :parameters: None :return: Connection dict :raises: None """ return self.supported_cmds # get server version def get_server_version(self): """ :parameters: None :return: Connection dict :raises: None """ return self.server_version # get server system info def get_server_system_info(self): """ :parameters: None :return: Connection dict :raises: None """ return self.system_info # get port count def get_port_count(self): """ :parameters: None :return: Connection dict :raises: None """ return len(self.ports) # returns the port object def get_port (self, port_id): port = self.ports.get(port_id, None) if (port != None): return port else: raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) # get all ports as IDs def get_all_ports (self): """ :parameters: None :return: Connection dict :raises: None """ return list(self.ports) # get all acquired ports def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_acquired()] # get all active ports (TX or pause) def get_active_ports(self, owned = True): if owned: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_active() and port_obj.is_acquired()] else: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_active()] # get paused ports def get_paused_ports (self, owned = True): if owned: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_paused() and port_obj.is_acquired()] else: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_paused()] # get all TX ports def get_transmitting_ports (self, owned = 
True): if owned: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_transmitting() and port_obj.is_acquired()] else: return [port_id for port_id, port_obj in self.ports.items() if port_obj.is_transmitting()] # get stats def get_stats (self, ports = None, sync_now = True): """ Return dictionary containing statistics information gathered from the server. :parameters: ports - List of ports to retreive stats on. If None, assume the request is for all acquired ports. sync_now - Boolean - If true, create a call to the server to get latest stats, and wait for result to arrive. Otherwise, return last stats saved in client cache. Downside of putting True is a slight delay (few 10th msecs) in getting the result. For practical uses, value should be True. :return: Statistics dictionary of dictionaries with the following format: =============================== =============== key Meaning =============================== =============== :ref:`numbers (0,1,..` Statistcs per port number :ref:`total ` Sum of port statistics :ref:`flow_stats ` Per flow statistics :ref:`global ` Global statistics :ref:`latency ` Per flow statistics regarding flow latency =============================== =============== Below is description of each of the inner dictionaries. .. _total: **total** and per port statistics contain dictionary with following format. Most of the bytes counters (unless specified otherwise) are in L2 layer, including the Ethernet FCS. e.g. 
minimum packet size is 64 bytes =============================== =============== key Meaning =============================== =============== ibytes Number of input bytes ierrors Number of input errors ipackets Number of input packets obytes Number of output bytes oerrors Number of output errors opackets Number of output packets rx_bps Receive bytes per second rate (L2 layer) rx_pps Receive packet per second rate tx_bps Transmit bytes per second rate (L2 layer) tx_pps Transmit packet per second rate =============================== =============== .. _flow_stats: **flow_stats** contains :ref:`global dictionary `, and dictionaries per packet group id (pg id). See structures below. **per pg_id flow stat** dictionaries have following structure: ================= =============== key Meaning ================= =============== rx_bps Received bytes per second rate rx_bps_l1 Received bytes per second rate, including layer one rx_bytes Total number of received bytes rx_pkts Total number of received packets rx_pps Received packets per second tx_bps Transmit bytes per second rate tx_bps_l1 Transmit bytes per second rate, including layer one tx_bytes Total number of sent bytes tx_pkts Total number of sent packets tx_pps Transmit packets per second rate ================= =============== .. _flow_stats_global: **global flow stats** dictionary has the following structure: ================= =============== key Meaning ================= =============== rx_err Number of flow statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic ` rx_delay_ms parameter for details. tx_err Number of flow statistics packets transmitted that we could not associate to any pg_id. This is never expected. If you see this different than 0, please report. ================= =============== .. 
_global: **global** ================= =============== key Meaning ================= =============== bw_per_core Estimated byte rate Trex can support per core. This is calculated by extrapolation of current rate and load on transmitting cores. cpu_util Estimate of the average utilization percentage of the transimitting cores queue_full Total number of packets transmitted while the NIC TX queue was full. The packets will be transmitted, eventually, but will create high CPU%due to polling the queue. This usually indicates that the rate we trying to transmit is too high for this port. rx_cpu_util Estimate of the utilization percentage of the core handling RX traffic. Too high value of this CPU utilization could cause drop of latency streams. rx_drop_bps Received bytes per second drop rate rx_bps Received bytes per second rate rx_pps Received packets per second rate tx_bps Transmit bytes per second rate tx_pps Transmit packets per second rate ================= =============== .. _latency: **latency** contains :ref:`global dictionary `, and dictionaries per packet group id (pg id). Each one with the following structure. **per pg_id latency stat** dictionaries have following structure: =========================== =============== key Meaning =========================== =============== :ref:`err_cntrs` Counters describing errors that occured with this pg id :ref:`latency` Information regarding packet latency =========================== =============== Following are the inner dictionaries of latency .. _err-cntrs: **err-cntrs** ================= =============== key Meaning (see better explanation below the table) ================= =============== dropped How many packets were dropped (estimation) dup How many packets were duplicated. out_of_order How many packets we received out of order. seq_too_high How many events of packet with sequence number too high we saw. seq_too_low How many events of packet with sequence number too low we saw. 
================= =============== For calculating packet error events, we add sequence number to each packet's payload. We decide what went wrong only according to sequence number of last packet received and that of the previous packet. 'seq_too_low' and 'seq_too_high' count events we see. 'dup', 'out_of_order' and 'dropped' are heuristics we apply to try and understand what happened. They will be accurate in common error scenarios. We describe few scenarios below to help understand this. Scenario 1: Received packet with seq num 10, and another one with seq num 10. We increment 'dup' and 'seq_too_low' by 1. Scenario 2: Received pacekt with seq num 10 and then packet with seq num 15. We assume 4 packets were dropped, and increment 'dropped' by 4, and 'seq_too_high' by 1. We expect next packet to arrive with sequence number 16. Scenario 2 continue: Received packet with seq num 11. We increment 'seq_too_low' by 1. We increment 'out_of_order' by 1. We *decrement* 'dropped' by 1. (We assume here that one of the packets we considered as dropped before, actually arrived out of order). .. _lat_inner: **latency** ================= =============== key Meaning ================= =============== average Average latency over the stream lifetime (usec).Low pass filter is applied to the last window average.It is computed each sampling period by following formula: = /2 + /2 histogram Dictionary describing logarithmic distribution histogram of packet latencies. Keys in the dictionary represent range of latencies (in usec). Values are the total number of packets received in this latency range. For example, an entry {100:13} would mean that we saw 13 packets with latency in the range between 100 and 200 usec. jitter Jitter of latency samples, computed as described in :rfc:`3550#appendix-A.8` last_max Maximum latency measured between last two data reads from server (0.5 sec window). total_max Maximum latency measured over the stream lifetime (in usec). 
total_min Minimum latency measured over the stream lifetime (in usec). ================= =============== .. _lat_stats_global: **global latency stats** dictionary has the following structure: ================= =============== key Meaning ================= =============== old_flow Number of latency statistics packets received that we could not associate to any pg_id. This can happen if latency on the used setup is large. See :ref:`wait_on_traffic <wait_on_traffic>` rx_delay_ms parameter for details. bad_hdr Number of latency packets received with bad latency data. This can happen because of garbage packets in the network, or if the DUT causes packet corruption. ================= =============== :raises: + :exc:`STLArgumentError` - 'sync_now' is not a bool + :exc:`STLError` - the async barrier failed """
        # by default use all acquired ports
        ports = ports if ports is not None else self.get_acquired_ports()
        ports = self._validate_port_list(ports)

        # check async barrier
        if not type(sync_now) is bool:
            raise STLArgumentError('sync_now', sync_now)

        # if the user requested a barrier - use it
        if sync_now:
            rc = self.async_client.barrier()
            if not rc:
                raise STLError(rc)

        return self.__get_stats(ports)

    def get_events (self, ev_type_filter = None):
        """
            Returns the logged events, optionally filtered by event type

            :parameters:
                ev_type_filter - 'info', 'warning' or a list of those
                                 default: no filter (all events)

            :return:
                list of logged events

            :raises:
                None

        """
        return self.event_handler.get_events(ev_type_filter)

    def get_warnings (self):
        """
            Returns all the logged events of type 'warning'

            :parameters:
                None

            :return:
                warning logged events

            :raises:
                None

        """
        return self.get_events(ev_type_filter = 'warning')

    def get_info (self):
        """
            Returns all the logged events of type 'info'

            :parameters:
                None

            :return:
                info logged events

            :raises:
                None

        """
        return self.get_events(ev_type_filter = 'info')

    # get port(s) info as a list of dicts
    @__api_check(True)
    def get_port_info (self, ports = None):
        """
            Returns the info of each requested port

            :parameters:
                ports : list
                    Ports to query
                    default: all ports

            :return:
                list with one 'get_info()' result per port,
                in the order of the validated port list

        """
        ports = ports if ports is not None else self.get_all_ports()
        ports = self._validate_port_list(ports)

        return [self.ports[port_id].get_info() for port_id in ports]
############################ Commands ############################# ############################ ############################# ############################ ############################# def set_verbose (self, level): """ Sets verbose level :parameters: level : str "high" "low" "normal" :raises: None """ modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH} if not level in modes.keys(): raise STLArgumentError('level', level) self.logger.set_verbose(modes[level]) @__api_check(False) def connect (self): """ def connect(self): Connects to the TRex server :parameters: None :raises: + :exc:`STLError` """ rc = self.__connect() if not rc: raise STLError(rc) @__api_check(False) def disconnect (self, stop_traffic = True, release_ports = True): """ Disconnects from the server :parameters: stop_traffic : bool Attempts to stop traffic before disconnecting. release_ports : bool Attempts to release all the acquired ports. """ # try to stop ports but do nothing if not possible if stop_traffic: try: self.stop() except STLError: pass self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self.__disconnect(release_ports) self.logger.post_cmd(rc) @__api_check(True) def acquire (self, ports = None, force = False, sync_streams = True): """ Acquires ports for executing commands :parameters: ports : list Ports on which to execute the command force : bool Force acquire the ports. 
sync_streams: bool sync with the server about the configured streams :raises: + :exc:`STLError` """ # by default use all ports ports = ports if ports is not None else self.get_all_ports() ports = self._validate_port_list(ports) if force: self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) else: self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) rc = self.__acquire(ports, force, sync_streams) self.logger.post_cmd(rc) if not rc: # cleanup self.__release(ports) raise STLError(rc) @__api_check(True) def release (self, ports = None): """ Release ports :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) self.logger.pre_cmd("Releasing ports {0}:".format(ports)) rc = self.__release(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def ping(self): """ Pings the server :parameters: None :raises: + :exc:`STLError` """ self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self._transmit("ping", api_class = None) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def get_active_pgids(self): """ Get active group IDs :parameters: None :raises: + :exc:`STLError` """ self.logger.pre_cmd( "Getting active packet group ids") rc = self._transmit("get_active_pgids") self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def reset(self, ports = None): """ Force acquire ports, stop the traffic, remove all streams and clear stats :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_all_ports() ports = self._validate_port_list(ports) # force take the port and ignore any streams on it self.acquire(ports, force = True, sync_streams = False) self.stop(ports, rx_delay_ms = 0) 
self.remove_all_streams(ports) self.clear_stats(ports) @__api_check(True) def remove_all_streams (self, ports = None): """ remove all streams from port(s) :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports)) rc = self.__remove_all_streams(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def add_streams (self, streams, ports = None): """ Add a list of streams to port(s) :parameters: ports : list Ports on which to execute the command streams: list Streams to attach (or profile) :returns: List of stream IDs in order of the stream list :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) if isinstance(streams, STLProfile): streams = streams.get_streams() # transform single stream if not isinstance(streams, list): streams = [streams] # check streams if not all([isinstance(stream, STLStream) for stream in streams]): raise STLArgumentError('streams', streams) self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports)) rc = self.__add_streams(streams, ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) # return the stream IDs return rc.data() @__api_check(True) def remove_streams (self, stream_id_list, ports = None): """ Remove a list of streams from ports :parameters: ports : list Ports on which to execute the command stream_id_list: list Stream id list to remove :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) # transform single stream if not isinstance(stream_id_list, list): stream_id_list = [stream_id_list] # check streams for stream_id in stream_id_list: validate_type('stream_id', stream_id, int) # remove 
streams self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports)) rc = self.__remove_streams(stream_id_list, ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def start (self, ports = None, mult = "1", force = False, duration = -1, total = False): """ Start traffic on port(s) :parameters: ports : list Ports on which to execute the command mult : str Multiplier in a form of pps, bps, or line util in % Examples: "5kpps", "10gbps", "85%", "32mbps" force : bool If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start. True: Force start False: Do not force start duration : int Limit the run time (seconds) -1 = unlimited total : bool Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port. True: Divide bandwidth among the ports False: Duplicate :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) validate_type('mult', mult, basestring) validate_type('force', force, bool) validate_type('duration', duration, (int, float)) validate_type('total', total, bool) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = False, divide_count = len(ports) if total else 1) if not mult_obj: raise STLArgumentError('mult', mult) # verify ports are stopped or force stop them active_ports = list(set(self.get_active_ports()).intersection(ports)) if active_ports: if not force: raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports)) else: rc = self.stop(active_ports) if not rc: raise STLError(rc) # start traffic self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports)) rc = self.__start(mult_obj, duration, ports, force) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def stop (self, 
ports = None, rx_delay_ms = 10): """ Stop port(s) :parameters: ports : list Ports on which to execute the command rx_delay_ms : int time to wait until RX filters are removed this value should reflect the time it takes packets which were transmitted to arrive to the destination. after this time the RX filters will be removed :raises: + :exc:`STLError` """ if ports is None: ports = self.get_active_ports() if not ports: return ports = self._validate_port_list(ports) self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports)) rc = self.__stop(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) # remove any RX filters rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms) if not rc: raise STLError(rc) @__api_check(True) def update (self, ports = None, mult = "1", total = False, force = False): """ Update traffic on port(s) :parameters: ports : list Ports on which to execute the command mult : str Multiplier in a form of pps, bps, or line util in % Can also specify +/- Examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+" force : bool If the ports are not in stopped mode or do not have sufficient bandwidth for the traffic, determines whether to stop the current traffic and force start. True: Force start False: Do not force start total : bool Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port. 
True: Divide bandwidth among the ports False: Duplicate :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_active_ports() ports = self._validate_port_list(ports) validate_type('mult', mult, basestring) validate_type('force', force, bool) validate_type('total', total, bool) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = True, divide_count = len(ports) if total else 1) if not mult_obj: raise STLArgumentError('mult', mult) # call low level functions self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports)) rc = self.__update(mult_obj, ports, force) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def pause (self, ports = None): """ Pause traffic on port(s). Works only for ports that are active, and only if all streams are in Continuous mode. :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_transmitting_ports() ports = self._validate_port_list(ports) self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports)) rc = self.__pause(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def resume (self, ports = None): """ Resume traffic on port(s) :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_paused_ports() ports = self._validate_port_list(ports) self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports)) rc = self.__resume(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def push_remote (self, pcap_filename, ports = None, ipg_usec = None, speedup = 1.0, count = 1, duration = -1): """ Push a remote server-reachable PCAP file the path must be fullpath accessible to the server :parameters: pcap_filename : str PCAP file name in full path and accessible to the server ports : list Ports on which to execute the 
command ipg_usec : float Inter-packet gap in microseconds speedup : float A factor to adjust IPG. effectively IPG = IPG / speedup count: int How many times to transmit the cap duration: float Limit runtime by duration in seconds :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) validate_type('pcap_filename', pcap_filename, str) validate_type('ipg_usec', ipg_usec, (float, int, type(None))) validate_type('speedup', speedup, (float, int)) validate_type('count', count, int) validate_type('duration', duration, (float, int)) self.logger.pre_cmd("Pushing remote PCAP on port(s) {0}:".format(ports)) rc = self.__push_remote(pcap_filename, ports, ipg_usec, speedup, count, duration) self.logger.post_cmd(rc) if not rc: raise STLError(rc) @__api_check(True) def push_pcap (self, pcap_filename, ports = None, ipg_usec = None, speedup = 1.0, count = 1, duration = -1, force = False, vm = None, packet_hook = None): """ Push a local PCAP to the server This is equivalent to loading a PCAP file to a profile and attaching the profile to port(s) file size is limited to 1MB :parameters: pcap_filename : str PCAP filename (accessible locally) ports : list Ports on which to execute the command ipg_usec : float Inter-packet gap in microseconds speedup : float A factor to adjust IPG. 
effectively IPG = IPG / speedup count: int How many times to transmit the cap duration: float Limit runtime by duration in seconds force: bool Ignore file size limit - push any file size to the server vm: list of VM instructions VM instructions to apply for every packet packet_hook : Callable or function Will be applied to every packet :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) validate_type('pcap_filename', pcap_filename, str) validate_type('ipg_usec', ipg_usec, (float, int, type(None))) validate_type('speedup', speedup, (float, int)) validate_type('count', count, int) validate_type('duration', duration, (float, int)) validate_type('vm', vm, (list, type(None))) # no support for > 1MB PCAP - use push remote if not force and os.path.getsize(pcap_filename) > (1024 * 1024): raise STLError("PCAP size of {:} is too big for local push - consider using remote push or provide 'force'".format(format_num(os.path.getsize(pcap_filename), suffix = 'B'))) self.remove_all_streams(ports = ports) profile = STLProfile.load_pcap(pcap_filename, ipg_usec, speedup, count, vm = vm, packet_hook = packet_hook) id_list = self.add_streams(profile.get_streams(), ports) return self.start(ports = ports, duration = duration) @__api_check(True) def validate (self, ports = None, mult = "1", duration = -1, total = False): """ Validate port(s) configuration :parameters: ports : list Ports on which to execute the command mult : str Multiplier in a form of pps, bps, or line util in % Examples: "5kpps", "10gbps", "85%", "32mbps" duration : int Limit the run time (seconds) -1 = unlimited total : bool Determines whether to divide the configured bandwidth among the ports, or to duplicate the bandwidth for each port. 
True: Divide bandwidth among the ports False: Duplicate :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) validate_type('mult', mult, basestring) validate_type('duration', duration, (int, float)) validate_type('total', total, bool) # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = True, divide_count = len(ports) if total else 1) if not mult_obj: raise STLArgumentError('mult', mult) self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports)) rc = self.__validate(ports) self.logger.post_cmd(rc) if not rc: raise STLError(rc) for port in ports: self.ports[port].print_profile(mult_obj, duration) @__api_check(False) def clear_stats (self, ports = None, clear_global = True, clear_flow_stats = True, clear_latency_stats = True): """ Clear stats on port(s) :parameters: ports : list Ports on which to execute the command clear_global : bool Clear the global stats clear_flow_stats : bool Clear the flow stats clear_latency_stats : bool Clear the latency stats :raises: + :exc:`STLError` """ ports = ports if ports is not None else self.get_all_ports() ports = self._validate_port_list(ports) # verify clear global if not type(clear_global) is bool: raise STLArgumentError('clear_global', clear_global) rc = self.__clear_stats(ports, clear_global, clear_flow_stats, clear_latency_stats) if not rc: raise STLError(rc) @__api_check(True) def is_traffic_active (self, ports = None): """ Return if specified port(s) have traffic :parameters: ports : list Ports on which to execute the command :raises: + :exc:`STLTimeoutError` - in case timeout has expired + :exe:'STLError' """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) return set(self.get_active_ports()).intersection(ports) @__api_check(True) def wait_on_traffic (self, ports = None, timeout = 60, rx_delay_ms = 10): """ .. 
_wait_on_traffic: Block until traffic on specified port(s) has ended :parameters: ports : list Ports on which to execute the command timeout : int timeout in seconds rx_delay_ms : int Time to wait (in milliseconds) after last packet was sent, until RX filters used for measuring flow statistics and latency are removed. This value should reflect the time it takes packets which were transmitted to arrive to the destination. After this time, RX filters will be removed, and packets arriving for per flow statistics feature and latency flows will be counted as errors. :raises: + :exc:`STLTimeoutError` - in case timeout has expired + :exe:'STLError' """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) expr = time.time() + timeout # wait while any of the required ports are active while set(self.get_active_ports()).intersection(ports): # make sure ASYNC thread is still alive - otherwise we will be stuck forever if not self.async_client.is_thread_alive(): raise STLError("subscriber thread is dead") time.sleep(0.01) if time.time() > expr: raise STLTimeoutError(timeout) # remove any RX filters rc = self._remove_rx_filters(ports, rx_delay_ms = rx_delay_ms) if not rc: raise STLError(rc) @__api_check(True) def set_port_attr (self, ports = None, promiscuous = None): """ Set port attributes :parameters: promiscuous - True or False :raises: None """ ports = ports if ports is not None else self.get_acquired_ports() ports = self._validate_port_list(ports) # check arguments validate_type('promiscuous', promiscuous, (bool, type(None))) # build attributes attr_dict = {} if promiscuous is not None: attr_dict['promiscuous'] = {'enabled': bool(promiscuous)} # no attributes to set if not attr_dict: return self.logger.pre_cmd("Applying attributes on port(s) {0}:".format(ports)) rc = self.__set_port_attr(ports, attr_dict) self.logger.post_cmd(rc) if not rc: raise STLError(rc) def clear_events (self): """ Clear all events :parameters: None 
:raises: None """ self.event_handler.clear_events() ############################ Line ############################# ############################ Commands ############################# ############################ ############################# # console decorator def __console(f): @wraps(f) def wrap(*args): client = args[0] time1 = time.time() try: rc = f(*args) except STLError as e: client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return # if got true - print time if rc: delta = time.time() - time1 client.logger.log(format_time(delta) + "\n") return wrap @__console def ping_line (self, line): '''pings the server''' self.ping() return True @__console def connect_line (self, line): '''Connects to the TRex server and acquire ports''' parser = parsing_opts.gen_parser(self, "connect", self.connect_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.FORCE) opts = parser.parse_args(line.split(), default_ports = self.get_all_ports()) if opts is None: return self.connect() self.acquire(ports = opts.ports, force = opts.force) # true means print time return True @__console def acquire_line (self, line): '''Acquire ports\n''' # define a parser parser = parsing_opts.gen_parser(self, "acquire", self.acquire_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.FORCE) opts = parser.parse_args(line.split(), default_ports = self.get_all_ports()) if opts is None: return # filter out all the already owned ports ports = list_difference(opts.ports, self.get_acquired_ports()) if not ports: self.logger.log("acquire - all port(s) {0} are already acquired".format(opts.ports)) return self.acquire(ports = ports, force = opts.force) # true means print time return True # @__console def release_line (self, line): '''Release ports\n''' parser = parsing_opts.gen_parser(self, "release", self.release_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports()) if opts is None: return ports = 
list_intersect(opts.ports, self.get_acquired_ports()) if not ports: if not opts.ports: self.logger.log("release - no acquired ports") return else: self.logger.log("release - none of port(s) {0} are acquired".format(opts.ports)) return self.release(ports = ports) # true means print time return True @__console def reacquire_line (self, line): '''reacquire all the ports under your username which are not acquired by your session''' parser = parsing_opts.gen_parser(self, "reacquire", self.reacquire_line.__doc__) opts = parser.parse_args(line.split()) if opts is None: return # find all the on-owned ports under your name my_unowned_ports = list_difference([k for k, v in self.ports.items() if v.get_owner() == self.username], self.get_acquired_ports()) if not my_unowned_ports: self.logger.log("reacquire - no unowned ports under '{0}'".format(self.username)) return self.acquire(ports = my_unowned_ports, force = True) return True @__console def disconnect_line (self, line): self.disconnect() @__console def reset_line (self, line): '''Reset ports - if no ports are provided all acquired ports will be reset''' parser = parsing_opts.gen_parser(self, "reset", self.reset_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if opts is None: return self.reset(ports = opts.ports) # true means print time return True @__console def start_line (self, line): '''Start selected traffic on specified ports on TRex\n''' # define a parser parser = parsing_opts.gen_parser(self, "start", self.start_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.TOTAL, parsing_opts.FORCE, parsing_opts.FILE_PATH, parsing_opts.DURATION, parsing_opts.TUNABLES, parsing_opts.MULTIPLIER_STRICT, parsing_opts.DRY_RUN) opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if opts is None: return active_ports = list_intersect(self.get_active_ports(), opts.ports) 
if active_ports: if not opts.force: msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports) self.logger.log(format_text(msg, 'bold')) return else: self.stop(active_ports) # default value for tunables (empty) tunables = [{}] * len(opts.ports) # process tunables if opts.tunables: # for one tunable - duplicate for all ports if len(opts.tunables) == 1: tunables = opts.tunables * len(opts.ports) else: # must be exact if len(opts.ports) != len(opts.tunables): self.logger.log('tunables section count must be 1 or exactly as the number of ports: got {0}'.format(len(opts.tunables))) return tunables = opts.tunables # remove all streams self.remove_all_streams(opts.ports) # pack the profile try: for port, t in zip(opts.ports, tunables): profile = STLProfile.load(opts.file[0], direction = t.get('direction', port % 2), port_id = port, **t) self.add_streams(profile.get_streams(), ports = port) except STLError as e: self.logger.log(format_text("\nError while loading profile '{0}'\n".format(opts.file[0]), 'bold')) self.logger.log(e.brief() + "\n") return if opts.dry: self.validate(opts.ports, opts.mult, opts.duration, opts.total) else: self.start(opts.ports, opts.mult, opts.force, opts.duration, opts.total) # true means print time return True @__console def stop_line (self, line): '''Stop active traffic on specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "stop", self.stop_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True) if opts is None: return # find the relevant ports ports = list_intersect(opts.ports, self.get_active_ports()) if not ports: if not opts.ports: self.logger.log('stop - no active ports') else: self.logger.log('stop - no active traffic on ports {0}'.format(opts.ports)) return # call API self.stop(ports) # true means print time return True @__console def update_line (self, line): '''Update port(s) speed currently 
active\n''' parser = parsing_opts.gen_parser(self, "update", self.update_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER, parsing_opts.TOTAL, parsing_opts.FORCE) opts = parser.parse_args(line.split(), default_ports = self.get_active_ports(), verify_acquired = True) if opts is None: return # find the relevant ports ports = list_intersect(opts.ports, self.get_active_ports()) if not ports: if not opts.ports: self.logger.log('update - no active ports') else: self.logger.log('update - no active traffic on ports {0}'.format(opts.ports)) return self.update(ports, opts.mult, opts.total, opts.force) # true means print time return True @__console def pause_line (self, line): '''Pause active traffic on specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "pause", self.pause_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split(), default_ports = self.get_transmitting_ports(), verify_acquired = True) if opts is None: return # check for already paused case if opts.ports and is_sub_list(opts.ports, self.get_paused_ports()): self.logger.log('pause - all of port(s) {0} are already paused'.format(opts.ports)) return # find the relevant ports ports = list_intersect(opts.ports, self.get_transmitting_ports()) if not ports: if not opts.ports: self.logger.log('pause - no transmitting ports') else: self.logger.log('pause - none of ports {0} are transmitting'.format(opts.ports)) return self.pause(ports) # true means print time return True @__console def resume_line (self, line): '''Resume active traffic on specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "resume", self.resume_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split(), default_ports = self.get_paused_ports(), verify_acquired = True) if opts is None: return # find the relevant ports ports = list_intersect(opts.ports, self.get_paused_ports()) if not ports: if not opts.ports: self.logger.log('resume - no paused 
ports') else: self.logger.log('resume - none of ports {0} are paused'.format(opts.ports)) return self.resume(ports) # true means print time return True @__console def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return self.clear_stats(opts.ports) @__console def show_stats_line (self, line): '''Get statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: return # determine stats mask mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stl_stats.ALL_STATS_OPTS)) if not mask: # set to show all stats if no filter was given mask = trex_stl_stats.COMPACT stats_opts = common.list_intersect(trex_stl_stats.ALL_STATS_OPTS, mask) stats = self._get_formatted_stats(opts.ports, mask) # print stats to screen for stat_type, stat_data in stats.items(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @__console def show_streams_line(self, line): '''Get stream statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "streams", self.show_streams_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STREAMS_MASK) opts = parser.parse_args(line.split()) if opts is None: return streams = self._get_streams(opts.ports, set(opts.streams)) if not streams: self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) else: # print stats to screen for stream_hdr, port_streams_data in streams.items(): text_tables.print_table_with_header(port_streams_data.text_table, header= stream_hdr.split(":")[0] + ":", untouched_header= stream_hdr.split(":")[1]) @__console def validate_line (self, 
line): '''Validates port(s) stream configuration\n''' parser = parsing_opts.gen_parser(self, "validate", self.validate_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: return self.validate(opts.ports) @__console def push_line (self, line): '''Push a pcap file ''' parser = parsing_opts.gen_parser(self, "push", self.push_line.__doc__, parsing_opts.FILE_PATH, parsing_opts.REMOTE_FILE, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.COUNT, parsing_opts.DURATION, parsing_opts.IPG, parsing_opts.SPEEDUP, parsing_opts.FORCE) opts = parser.parse_args(line.split()) if opts is None: return active_ports = list(set(self.get_active_ports()).intersection(opts.ports)) if active_ports: if not opts.force: msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports) self.logger.log(format_text(msg, 'bold')) return else: self.stop(active_ports) if opts.remote: self.push_remote(opts.file[0], ports = opts.ports, ipg_usec = opts.ipg_usec, speedup = opts.speedup, count = opts.count, duration = opts.duration) else: self.push_pcap(opts.file[0], ports = opts.ports, ipg_usec = opts.ipg_usec, speedup = opts.speedup, count = opts.count, duration = opts.duration, force = opts.force) return True @__console def set_port_attr_line (self, line): '''Sets port attributes ''' parser = parsing_opts.gen_parser(self, "port_attr", self.set_port_attr_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.PROMISCUOUS_SWITCH) opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if opts is None: return # if no attributes - fall back to printing the status if opts.prom is None: self.show_stats_line("--ps --port {0}".format(' '.join(str(port) for port in opts.ports))) return self.set_port_attr(opts.ports, opts.prom) @__console def show_profile_line (self, line): '''Shows profile information''' parser = parsing_opts.gen_parser(self, "port", self.show_profile_line.__doc__, 
parsing_opts.FILE_PATH) opts = parser.parse_args(line.split()) if opts is None: return info = STLProfile.get_info(opts.file[0]) self.logger.log(format_text('\nProfile Information:\n', 'bold')) # general info self.logger.log(format_text('\nGeneral Information:', 'underline')) self.logger.log('Filename: {:^12}'.format(opts.file[0])) self.logger.log('Stream count: {:^12}'.format(info['stream_count'])) # specific info profile_type = info['type'] self.logger.log(format_text('\nSpecific Information:', 'underline')) if profile_type == 'python': self.logger.log('Type: {:^12}'.format('Python Module')) self.logger.log('Tunables: {:^12}'.format(['{0} = {1}'.format(k ,v) for k, v in info['tunables'].items()])) elif profile_type == 'yaml': self.logger.log('Type: {:^12}'.format('YAML')) elif profile_type == 'pcap': self.logger.log('Type: {:^12}'.format('PCAP file')) self.logger.log("") @__console def get_events_line (self, line): '''shows events recieved from server\n''' x = [parsing_opts.ArgumentPack(['-c','--clear'], {'action' : "store_true", 'default': False, 'help': "clear the events log"}), parsing_opts.ArgumentPack(['-i','--info'], {'action' : "store_true", 'default': False, 'help': "show info events"}), parsing_opts.ArgumentPack(['-w','--warn'], {'action' : "store_true", 'default': False, 'help': "show warning events"}), ] parser = parsing_opts.gen_parser(self, "events", self.get_events_line.__doc__, *x) opts = parser.parse_args(line.split()) if opts is None: return ev_type_filter = [] if opts.info: ev_type_filter.append('info') if opts.warn: ev_type_filter.append('warning') if not ev_type_filter: ev_type_filter = None events = self.get_events(ev_type_filter) for ev in events: self.logger.log(ev) if opts.clear: self.clear_events()