diff options
author | 2016-02-08 06:08:14 -0500 | |
---|---|---|
committer | 2016-02-08 06:08:14 -0500 | |
commit | 995267db77f5554d5228697b8b2a862b51859fe6 (patch) | |
tree | 1a44007a59d8cabacab0690da515a68c3c25e7ac /scripts/automation/trex_control_plane/client | |
parent | 69e5a5c6b94175ece07b247af1b5ca6c0cfdf0e9 (diff) |
first refactor
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
4 files changed, 0 insertions, 3307 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py deleted file mode 100644 index ef4c48f9..00000000 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ /dev/null @@ -1,330 +0,0 @@ -#!/router/bin/python - -try: - # support import for Python 2 - import outer_packages -except ImportError: - # support import for Python 3 - import client.outer_packages -from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage - -from common.text_opts import * - -import json -import threading -import time -import datetime -import zmq -import re -import random - -from common.trex_stats import * -from common.trex_streams import * -from common.trex_types import * - -# basic async stats class -class CTRexAsyncStats(object): - def __init__ (self): - self.ref_point = None - self.current = {} - self.last_update_ts = datetime.datetime.now() - - def update (self, snapshot): - - #update - self.last_update_ts = datetime.datetime.now() - - self.current = snapshot - - if self.ref_point == None: - self.ref_point = self.current - - def clear(self): - self.ref_point = self.current - - - def get(self, field, format=False, suffix=""): - - if not field in self.current: - return "N/A" - - if not format: - return self.current[field] - else: - return format_num(self.current[field], suffix) - - def get_rel (self, field, format=False, suffix=""): - if not field in self.current: - return "N/A" - - if not format: - return (self.current[field] - self.ref_point[field]) - else: - return format_num(self.current[field] - self.ref_point[field], suffix) - - - # return true if new data has arrived in the past 2 seconds - def is_online (self): - delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 - return (delta_ms < 2000) - -# describes the general stats provided by TRex -class CTRexAsyncStatsGeneral(CTRexAsyncStats): - def __init__ (self): - super(CTRexAsyncStatsGeneral, self).__init__() - - -# per port stats -class CTRexAsyncStatsPort(CTRexAsyncStats): - def __init__ (self): - super(CTRexAsyncStatsPort, self).__init__() - - def get_stream_stats (self, stream_id): - return None - -# stats manager -class CTRexAsyncStatsManager(): - def __init__ (self): - - self.general_stats = CTRexAsyncStatsGeneral() - self.port_stats = {} - - - def get_general_stats(self): - return self.general_stats - - def get_port_stats (self, port_id): - - if not str(port_id) in self.port_stats: - return None - - return self.port_stats[str(port_id)] - - - def update(self, data): - self.__handle_snapshot(data) - - def __handle_snapshot(self, snapshot): - - general_stats = {} - port_stats = {} - - # filter the values per port and general - for key, value in snapshot.iteritems(): - - # match a pattern of ports - m = re.search('(.*)\-([0-8])', key) - if m: - - port_id = m.group(2) - field_name = m.group(1) - - if not port_id in port_stats: - port_stats[port_id] = {} - - port_stats[port_id][field_name] = value - - else: - # no port match - general stats - general_stats[key] = value - - # update the general object with the snapshot - self.general_stats.update(general_stats) - - # update all ports - for port_id, data in port_stats.iteritems(): - - if not port_id in self.port_stats: - self.port_stats[port_id] = CTRexAsyncStatsPort() - - self.port_stats[port_id].update(data) - - - - - -class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): - - self.port = port - self.server = server - - self.stateless_client = stateless_client - - self.event_handler = stateless_client.event_handler - self.logger = self.stateless_client.logger - - self.raw_snapshot = {} - - self.stats = CTRexAsyncStatsManager() - - self.last_data_recv_ts = 0 - self.async_barrier = None - - self.connected = False - - # connects the async channel - def connect (self): - - if self.connected: - self.disconnect() - - self.tr = "tcp://{0}:{1}".format(self.server, self.port) - - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) - - - # before running the thread - mark as active - self.active = True - self.t = threading.Thread(target = self._run) - - # kill this thread on exit and don't add it to the join list - self.t.setDaemon(True) - self.t.start() - - self.connected = True - - rc = self.barrier() - if not rc: - self.disconnect() - return rc - - return RC_OK() - - - - - # disconnect - def disconnect (self): - if not self.connected: - return - - # signal that the context was destroyed (exit the thread loop) - self.context.term() - - # mark for join and join - self.active = False - self.t.join() - - # done - self.connected = False - - - # thread function - def _run (self): - - # socket must be created on the same thread - self.socket.setsockopt(zmq.SUBSCRIBE, '') - self.socket.setsockopt(zmq.RCVTIMEO, 5000) - self.socket.connect(self.tr) - - got_data = False - - while self.active: - try: - - line = self.socket.recv_string() - self.last_data_recv_ts = time.time() - - # signal once - if not got_data: - self.event_handler.on_async_alive() - got_data = True - - - # got a timeout - mark as not alive and retry - except zmq.Again: - - # signal once - if got_data: - self.event_handler.on_async_dead() - got_data = False - - continue - - except zmq.ContextTerminated: - # outside thread signaled us to exit - break - - msg = json.loads(line) - - name = msg['name'] - data = msg['data'] - type = msg['type'] - self.raw_snapshot[name] = data - - self.__dispatch(name, type, data) - - - # closing of socket must be from the same thread - self.socket.close(linger = 0) - - - # did we get info for the last 3 seconds ? - def is_alive (self): - if self.last_data_recv_ts == None: - return False - - return ( (time.time() - self.last_data_recv_ts) < 3 ) - - def get_stats (self): - return self.stats - - def get_raw_snapshot (self): - return self.raw_snapshot - - # dispatch the message to the right place - def __dispatch (self, name, type, data): - # stats - if name == "trex-global": - self.event_handler.handle_async_stats_update(data) - - # events - elif name == "trex-event": - self.event_handler.handle_async_event(type, data) - - # barriers - elif name == "trex-barrier": - self.handle_async_barrier(type, data) - else: - pass - - - # async barrier handling routine - def handle_async_barrier (self, type, data): - if self.async_barrier['key'] == type: - self.async_barrier['ack'] = True - - - # block on barrier for async channel - def barrier(self, timeout = 5): - - # set a random key - key = random.getrandbits(32) - self.async_barrier = {'key': key, 'ack': False} - - # expr time - expr = time.time() + timeout - - while not self.async_barrier['ack']: - - # inject - rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) - if not rc: - return rc - - # fast loop - for i in xrange(0, 100): - if self.async_barrier['ack']: - break - time.sleep(0.001) - - if time.time() > expr: - return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) - - return RC_OK() - - - diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py deleted file mode 100644 index eaf64ac2..00000000 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ /dev/null @@ -1,512 +0,0 @@ - -from collections import namedtuple, OrderedDict -from common.trex_types import * -from common import trex_stats -from client_utils import packet_builder - -StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata']) - -########## utlity ############ -def mult_to_factor (mult, max_bps_l2, max_pps, line_util): - if mult['type'] == 'raw': - return mult['value'] - - if mult['type'] == 'bps': - return mult['value'] / max_bps_l2 - - if mult['type'] == 'pps': - return mult['value'] / max_pps - - if mult['type'] == 'percentage': - return mult['value'] / line_util - - -# describes a single port -class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 - PortState = namedtuple('PortState', ['state_id', 'state_name']) - STATES_MAP = {STATE_DOWN: "DOWN", - STATE_IDLE: "IDLE", - STATE_STREAMS: "IDLE", - STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} - - - def __init__ (self, port_id, speed, driver, user, comm_link, session_id): - self.port_id = port_id - self.state = self.STATE_IDLE - self.handler = None - self.comm_link = comm_link - self.transmit = comm_link.transmit - self.transmit_batch = comm_link.transmit_batch - self.user = user - self.driver = driver - self.speed = speed - self.streams = {} - self.profile = None - self.session_id = session_id - - self.port_stats = trex_stats.CPortStats(self) - - - def err(self, msg): - return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - - def ok(self, data = ""): - return RC_OK(data) - - def get_speed_bps (self): - return (self.speed * 1000 * 1000 * 1000) - - # take the port - def acquire(self, force = False): - params = {"port_id": self.port_id, - "user": self.user, - "session_id": self.session_id, - "force": force} - - command = RpcCmdData("acquire", params) - rc = self.transmit(command.method, command.params) - if rc.good(): - self.handler = rc.data() - return self.ok() - else: - return self.err(rc.err()) - - # release the port - def release(self): - params = {"port_id": self.port_id, - "handler": self.handler} - - command = RpcCmdData("release", params) - rc = self.transmit(command.method, command.params) - self.handler = None - - if rc.good(): - return self.ok() - else: - return self.err(rc.err()) - - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def is_transmitting (self): - return (self.state == self.STATE_TX) - - def is_paused (self): - return (self.state == self.STATE_PAUSE) - - - def sync(self): - params = {"port_id": self.port_id} - - command = RpcCmdData("get_port_status", params) - rc = self.transmit(command.method, command.params) - if rc.bad(): - return self.err(rc.err()) - - # sync the port - port_state = rc.data()['state'] - - if port_state == "DOWN": - self.state = self.STATE_DOWN - elif port_state == "IDLE": - self.state = self.STATE_IDLE - elif port_state == "STREAMS": - self.state = self.STATE_STREAMS - elif port_state == "TX": - self.state = self.STATE_TX - elif port_state == "PAUSE": - self.state = self.STATE_PAUSE - else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) - - # TODO: handle syncing the streams into stream_db - - return self.ok() - - - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - - # add streams - def add_streams (self, streams_list): - - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - - batch = [] - for stream in (streams_list if isinstance(streams_list, list) else [streams_list]): - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream.get_id(), - "stream": stream.to_json()} - - cmd = RpcCmdData('add_stream', params) - batch.append(cmd) - - # meta data for show streams - self.streams[stream.get_id()] = StreamOnPort(stream.to_json(), - Port._generate_stream_metadata(stream.get_id(), stream.to_json())) - - rc = self.transmit_batch(batch) - if not rc: - return self.err(rc.err()) - - - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - - - # remove stream from port - def remove_streams (self, stream_id_list): - - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to remove streams") - - # single element to list - stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list] - - # verify existance - if not all([stream_id in self.streams for stream_id in stream_id_list]): - return self.err("stream {0} does not exists".format(stream_id)) - - batch = [] - - for stream_id in stream_id_list: - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id} - - cmd = RpcCmdData('remove_stream', params) - batch.append(cmd) - - del self.streams[stream_id] - - - rc = self.transmit_batch(batch) - if not rc: - return self.err(rc.err()) - - self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE - - return self.ok() - - - # remove all the streams - def remove_all_streams (self): - - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to remove streams") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc = self.transmit("remove_all_streams", params) - if not rc: - return self.err(rc.err()) - - self.streams = {} - - self.state = self.STATE_IDLE - - return self.ok() - - # get a specific stream - def get_stream (self, stream_id): - if stream_id in self.streams: - return self.streams[stream_id] - else: - return None - - def get_all_streams (self): - return self.streams - - # start traffic - def start (self, mul, duration, force): - if not self.is_acquired(): - return self.err("port is not owned") - - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") - - if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "duration": duration, - "force": force} - - rc = self.transmit("start_traffic", params) - if rc.bad(): - return self.err(rc.err()) - - self.state = self.STATE_TX - - return self.ok() - - # stop traffic - # with force ignores the cached state and sends the command - def stop (self, force = False): - - if not self.is_acquired(): - return self.err("port is not owned") - - # port is already stopped - if not force: - if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS): - return self.ok() - - - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc = self.transmit("stop_traffic", params) - if rc.bad(): - return self.err(rc.err()) - - # only valid state after stop - self.state = self.STATE_STREAMS - - return self.ok() - - def pause (self): - - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc = self.transmit("pause_traffic", params) - if rc.bad(): - return self.err(rc.err()) - - # only valid state after stop - self.state = self.STATE_PAUSE - - return self.ok() - - - def resume (self): - - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state != self.STATE_PAUSE) : - return self.err("port is not in pause mode") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc = self.transmit("resume_traffic", params) - if rc.bad(): - return self.err(rc.err()) - - # only valid state after stop - self.state = self.STATE_TX - - return self.ok() - - - def update (self, mul, force): - - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "force": force} - - rc = self.transmit("update_traffic", params) - if rc.bad(): - return self.err(rc.err()) - - return self.ok() - - - def validate (self): - - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state == self.STATE_DOWN): - return self.err("port is down") - - if (self.state == self.STATE_IDLE): - return self.err("no streams attached to port") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc = self.transmit("validate", params) - if rc.bad(): - return self.err(rc.err()) - - self.profile = rc.data() - - return self.ok() - - def get_profile (self): - return self.profile - - - def print_profile (self, mult, duration): - if not self.get_profile(): - return - - rate = self.get_profile()['rate'] - graph = self.get_profile()['graph'] - - print format_text("Profile Map Per Port\n", 'underline', 'bold') - - factor = mult_to_factor(mult, rate['max_bps_l2'], rate['max_pps'], rate['max_line_util']) - - print "Profile max BPS L2 (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps_l2'], suffix = "bps"), - format_num(rate['max_bps_l2'] * factor, suffix = "bps")) - - print "Profile max BPS L1 (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps_l1'], suffix = "bps"), - format_num(rate['max_bps_l1'] * factor, suffix = "bps")) - - print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), - format_num(rate['max_pps'] * factor, suffix = "pps"),) - - print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util']), - format_percentage(rate['max_line_util'] * factor)) - - - # duration - exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) - exp_time_factor_sec = exp_time_base_sec / factor - - # user configured a duration - if duration > 0: - if exp_time_factor_sec > 0: - exp_time_factor_sec = min(exp_time_factor_sec, duration) - else: - exp_time_factor_sec = duration - - - print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), - format_time(exp_time_factor_sec)) - print "\n" - - - def get_port_state_name(self): - return self.STATES_MAP.get(self.state, "Unknown") - - ################# stats handler ###################### - def generate_port_stats(self): - return self.port_stats.generate_stats() - - def generate_port_status(self): - return {"type": self.driver, - "maximum": "{speed} Gb/s".format(speed=self.speed), - "status": self.get_port_state_name() - } - - def clear_stats(self): - return self.port_stats.clear_stats() - - - def get_stats (self): - return self.port_stats.get_stats() - - - def invalidate_stats(self): - return self.port_stats.invalidate() - - ################# stream printout ###################### - def generate_loaded_streams_sum(self, stream_id_list): - if self.state == self.STATE_DOWN or self.state == self.STATE_STREAMS: - return {} - streams_data = {} - - if not stream_id_list: - # if no mask has been provided, apply to all streams on port - stream_id_list = self.streams.keys() - - - streams_data = {stream_id: self.streams[stream_id].metadata.get('stream_sum', ["N/A"] * 6) - for stream_id in stream_id_list - if stream_id in self.streams} - - - return {"referring_file" : "", - "streams" : streams_data} - - @staticmethod - def _generate_stream_metadata(stream_id, compiled_stream_obj): - meta_dict = {} - # create packet stream description - pkt_bld_obj = packet_builder.CTRexPktBuilder() - pkt_bld_obj.load_from_stream_obj(compiled_stream_obj) - # generate stream summary based on that - - next_stream = "None" if compiled_stream_obj['next_stream_id']==-1 else compiled_stream_obj['next_stream_id'] - - meta_dict['stream_sum'] = OrderedDict([("id", stream_id), - ("packet_type", "/".join(pkt_bld_obj.get_packet_layers())), - ("length", pkt_bld_obj.get_packet_length()), - ("mode", compiled_stream_obj['mode']['type']), - ("rate_pps", compiled_stream_obj['mode']['pps']), - ("next_stream", next_stream) - ]) - return meta_dict - - ################# events handler ###################### - def async_event_port_stopped (self): - self.state = self.STATE_STREAMS - - - def async_event_port_started (self): - self.state = self.STATE_TX - - - def async_event_port_paused (self): - self.state = self.STATE_PAUSE - - - def async_event_port_resumed (self): - self.state = self.STATE_TX - - def async_event_forced_acquired (self): - self.handler = None - diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py deleted file mode 100755 index 95fd2a69..00000000 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ /dev/null @@ -1,2035 +0,0 @@ -#!/router/bin/python - -try: - # support import for Python 2 - import outer_packages -except ImportError: - # support import for Python 3 - import client.outer_packages - -from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage -from client_utils import general_utils -from client_utils.packet_builder import CTRexPktBuilder -import json - -from common.trex_streams import * -from collections import namedtuple -from common.text_opts import * -from common import trex_stats -from client_utils import parsing_opts, text_tables -import time -import datetime -import re -import random -from trex_port import Port -from common.trex_types import * -from common.trex_stl_exceptions import * -from trex_async_client import CTRexAsyncClient -from yaml import YAMLError - - - -############################ logger ############################# -############################ ############################# -############################ ############################# - -# logger API for the client -class LoggerApi(object): - # verbose levels - VERBOSE_QUIET = 0 - VERBOSE_REGULAR = 1 - VERBOSE_HIGH = 2 - - def __init__(self): - self.level = LoggerApi.VERBOSE_REGULAR - - # implemented by specific logger - def write(self, msg, newline = True): - raise Exception("implement this") - - # implemented by specific logger - def flush(self): - raise Exception("implement this") - - def set_verbose (self, level): - if not level in xrange(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1): - raise ValueError("bad value provided for logger") - - self.level = level - - def get_verbose (self): - return self.level - - - def check_verbose (self, level): - return (self.level >= level) - - - # simple log message with verbose - def log (self, msg, level = VERBOSE_REGULAR, newline = True): - if not self.check_verbose(level): - return - - self.write(msg, newline) - - # logging that comes from async event - def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): - self.log(msg, level, newline) - - - def pre_cmd (self, desc): - self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) - self.flush() - - def post_cmd (self, rc): - if rc: - self.log(format_text("[SUCCESS]\n", 'green', 'bold')) - else: - self.log(format_text("[FAILED]\n", 'red', 'bold')) - - - def log_cmd (self, desc): - self.pre_cmd(desc) - self.post_cmd(True) - - - # supress object getter - def supress (self): - class Supress(object): - def __init__ (self, logger): - self.logger = logger - - def __enter__ (self): - self.saved_level = self.logger.get_verbose() - self.logger.set_verbose(LoggerApi.VERBOSE_QUIET) - - def __exit__ (self, type, value, traceback): - self.logger.set_verbose(self.saved_level) - - return Supress(self) - - - -# default logger - to stdout -class DefaultLogger(LoggerApi): - - def __init__ (self): - super(DefaultLogger, self).__init__() - - def write (self, msg, newline = True): - if newline: - print msg - else: - print msg, - - def flush (self): - sys.stdout.flush() - - -############################ async event hander ############################# -############################ ############################# -############################ ############################# - -# handles different async events given to the client -class AsyncEventHandler(object): - - def __init__ (self, client): - self.client = client - self.logger = self.client.logger - - self.events = [] - - # public functions - - def get_events (self): - return self.events - - - def clear_events (self): - self.events = [] - - - def on_async_dead (self): - if self.client.connected: - msg = 'lost connection to server' - self.__add_event_log(msg, 'local', True) - self.client.connected = False - - - def on_async_alive (self): - pass - - - # handles an async stats update from the subscriber - def handle_async_stats_update(self, dump_data): - global_stats = {} - port_stats = {} - - # filter the values per port and general - for key, value in dump_data.iteritems(): - # match a pattern of ports - m = re.search('(.*)\-([0-8])', key) - if m: - port_id = int(m.group(2)) - field_name = m.group(1) - if self.client.ports.has_key(port_id): - if not port_id in port_stats: - port_stats[port_id] = {} - port_stats[port_id][field_name] = value - else: - continue - else: - # no port match - general stats - global_stats[key] = value - - # update the general object with the snapshot - self.client.global_stats.update(global_stats) - - # update all ports - for port_id, data in port_stats.iteritems(): - self.client.ports[port_id].port_stats.update(data) - - - # dispatcher for server async events (port started, port stopped and etc.) - def handle_async_event (self, type, data): - # DP stopped - - show_event = False - - # port started - if (type == 0): - port_id = int(data['port_id']) - ev = "Port {0} has started".format(port_id) - self.__async_event_port_started(port_id) - - # port stopped - elif (type == 1): - port_id = int(data['port_id']) - ev = "Port {0} has stopped".format(port_id) - - # call the handler - self.__async_event_port_stopped(port_id) - - - # port paused - elif (type == 2): - port_id = int(data['port_id']) - ev = "Port {0} has paused".format(port_id) - - # call the handler - self.__async_event_port_paused(port_id) - - # port resumed - elif (type == 3): - port_id = int(data['port_id']) - ev = "Port {0} has resumed".format(port_id) - - # call the handler - self.__async_event_port_resumed(port_id) - - # port finished traffic - elif (type == 4): - port_id = int(data['port_id']) - ev = "Port {0} job done".format(port_id) - - # call the handler - self.__async_event_port_stopped(port_id) - show_event = True - - # port was stolen... - elif (type == 5): - session_id = data['session_id'] - - # false alarm, its us - if session_id == self.client.session_id: - return - - port_id = int(data['port_id']) - who = data['who'] - - ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) - - # call the handler - self.__async_event_port_forced_acquired(port_id) - show_event = True - - # server stopped - elif (type == 100): - ev = "Server has stopped" - self.__async_event_server_stopped() - show_event = True - - - else: - # unknown event - ignore - return - - - self.__add_event_log(ev, 'server', show_event) - - - # private functions - - def __async_event_port_stopped (self, port_id): - self.client.ports[port_id].async_event_port_stopped() - - - def __async_event_port_started (self, port_id): - self.client.ports[port_id].async_event_port_started() - - - def __async_event_port_paused (self, port_id): - self.client.ports[port_id].async_event_port_paused() - - - def __async_event_port_resumed (self, port_id): - self.client.ports[port_id].async_event_port_resumed() - - - def __async_event_port_forced_acquired (self, port_id): - self.client.ports[port_id].async_event_forced_acquired() - - - def __async_event_server_stopped (self): - self.client.connected = False - - - # add event to log - def __add_event_log (self, msg, ev_type, show = False): - - if ev_type == "server": - prefix = "[server]" - elif ev_type == "local": - prefix = "[local]" - - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - - if show: - self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - - - - - -############################ RPC layer ############################# -############################ ############################# -############################ ############################# - -class CCommLink(object): - """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): - self.virtual = virtual - self.server = server - self.port = port - self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - @property - def is_connected(self): - if not self.virtual: - return self.rpc_link.connected - else: - return True - - def get_server (self): - return self.server - - def get_port (self): - return self.port - - def connect(self): - if not self.virtual: - return self.rpc_link.connect() - - def disconnect(self): - if not self.virtual: - return self.rpc_link.disconnect() - - def transmit(self, method_name, params={}): - if self.virtual: - self._prompt_virtual_tx_msg() - _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) - print msg - return - else: - return self.rpc_link.invoke_rpc_method(method_name, params) - - def transmit_batch(self, batch_list): - if self.virtual: - self._prompt_virtual_tx_msg() - print [msg - for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] - else: - batch = self.rpc_link.create_batch() - for command in batch_list: - batch.add(command.method, command.params) - # invoke the batch - return batch.invoke() - - def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, - port=self.port) - - - -############################ client ############################# -############################ ############################# -############################ ############################# - -class STLClient(object): - """docstring for STLClient""" - - def __init__(self, - username = general_utils.get_current_user(), - server = "localhost", - sync_port = 4501, - async_port = 4500, - verbose_level = LoggerApi.VERBOSE_QUIET, - logger = None, - virtual = False): - - - self.username = username - - # init objects - self.ports = {} - self.server_version = {} - self.system_info = {} - self.session_id = random.getrandbits(32) - self.connected = False - - # logger - self.logger = DefaultLogger() if not logger else logger - - # initial verbose - self.logger.set_verbose(verbose_level) - - # low level RPC layer - self.comm_link = CCommLink(server, - sync_port, - virtual, - self.logger) - - # async event handler manager - self.event_handler = AsyncEventHandler(self) - - # async subscriber level - self.async_client = CTRexAsyncClient(server, - async_port, - self) - - - - - # stats - self.connection_info = {"username": username, - "server": server, - "sync_port": sync_port, - "async_port": async_port, - "virtual": virtual} - - - self.global_stats = trex_stats.CGlobalStats(self.connection_info, - self.server_version, - self.ports) - - self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, - self.ports) - - - - ############# private functions - used by the class itself ########### - - # some preprocessing for port argument - def __ports (self, port_id_list): - - # none means all - if port_id_list == None: - return range(0, self.get_port_count()) - - # always list - if isinstance(port_id_list, int): - port_id_list = [port_id_list] - - if not isinstance(port_id_list, list): - raise ValueError("bad port id list: {0}".format(port_id_list)) - - for port_id in port_id_list: - if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): - raise ValueError("bad port id {0}".format(port_id)) - - return port_id_list - - - # sync ports - def __sync_ports (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].sync()) - - return rc - - # acquire ports, if port_list is none - get all - def __acquire (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].acquire(force)) - - return rc - - # release ports - def __release (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].release()) - - return rc - - - def __add_streams(self, stream_list, port_id_list = None): - - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].add_streams(stream_list)) - - return rc - - - - def __remove_streams(self, stream_id_list, port_id_list = None): - - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_streams(stream_id_list)) - - return rc - - - - def __remove_all_streams(self, port_id_list = None): - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_all_streams()) - - return rc - - - def __get_stream(self, stream_id, port_id, get_pkt = False): - - return self.ports[port_id].get_stream(stream_id) - - - def __get_all_streams(self, port_id, get_pkt = False): - - return self.ports[port_id].get_all_streams() - - - def __get_stream_id_list(self, port_id): - - return self.ports[port_id].get_stream_id_list() - - - def __start (self, multiplier, duration, port_id_list = None, force = False): - - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].start(multiplier, duration, force)) - - return rc - - - def __resume (self, port_id_list = None, force = False): - - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].resume()) - - return rc - - def __pause (self, port_id_list = None, force = False): - - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].pause()) - - return rc - - - def __stop (self, port_id_list = None, force = False): - - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].stop(force)) - - return rc - - - def __update (self, mult, port_id_list = None, force = False): - - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].update(mult, force)) - - return rc - - - def __validate (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) - - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].validate()) - - return rc - - - - # connect to server - def __connect(self): - - # first disconnect if already connected - if self.is_connected(): - self.__disconnect() - - # clear this flag - self.connected = False - - # connect sync channel - self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) - rc = self.comm_link.connect() - self.logger.post_cmd(rc) - - if not rc: - return rc - - # version - rc = self._transmit("get_version") - if not rc: - return rc - - - self.server_version = rc.data() - self.global_stats.server_version = rc.data() - - # cache system info - rc = self._transmit("get_system_info") - if not rc: - return rc - - self.system_info = rc.data() - - # cache supported commands - rc = self._transmit("get_supported_cmds") - if not rc: - return rc - - self.supported_cmds = rc.data() - - # create ports - for port_id in xrange(self.system_info["port_count"]): - speed = self.system_info['ports'][port_id]['speed'] - driver = self.system_info['ports'][port_id]['driver'] - - self.ports[port_id] = Port(port_id, - speed, - driver, - self.username, - self.comm_link, - self.session_id) - - - # sync the ports - rc = self.__sync_ports() - if not rc: - return rc - - - # connect async channel - self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) - rc = self.async_client.connect() - self.logger.post_cmd(rc) - - if not rc: - return rc - - self.connected = True - - return RC_OK() - - - # disconenct from server - def __disconnect(self, release_ports = True): - # release any previous acquired ports - if self.is_connected() and release_ports: - self.__release(self.get_acquired_ports()) - - self.comm_link.disconnect() - self.async_client.disconnect() - - self.connected = False - - return RC_OK() - - - # clear stats - def __clear_stats(self, port_id_list, clear_global): - - for port_id in port_id_list: - self.ports[port_id].clear_stats() - - if clear_global: - self.global_stats.clear_stats() - - self.logger.log_cmd("clearing stats on port(s) {0}:".format(port_id_list)) - - return RC - - - # get stats - def __get_stats (self, port_id_list): - stats = {} - - stats['global'] = self.global_stats.get_stats() - - total = {} - for port_id in port_id_list: - port_stats = self.ports[port_id].get_stats() - stats[port_id] = port_stats - - for k, v in port_stats.iteritems(): - if not k in total: - total[k] = v - else: - total[k] += v - - stats['total'] = total - - return stats - - - ############ functions used by other classes but not users ############## - - def _verify_port_id_list (self, port_id_list): - # check arguments - if not isinstance(port_id_list, list): - return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list))) - - # all ports are valid ports - if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]): - return RC_ERR("") - - return RC_OK() - - def _validate_port_list(self, port_id_list): - if not isinstance(port_id_list, list): - return False - - # check each item of the sequence - return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) - - - - # transmit request on the RPC link - def _transmit(self, method_name, params={}): - return self.comm_link.transmit(method_name, params) - - # transmit batch request on the RPC link - def _transmit_batch(self, batch_list): - return self.comm_link.transmit_batch(batch_list) - - # stats - def _get_formatted_stats(self, port_id_list, stats_mask=set()): - stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) - - stats_obj = {} - for stats_type in stats_opts: - stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) - - return stats_obj - - def _get_streams(self, port_id_list, streams_mask=set()): - - streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask) - - return streams_obj - - - def _invalidate_stats (self, port_id_list): - for port_id in port_id_list: - self.ports[port_id].invalidate_stats() - - self.global_stats.invalidate() - - return RC_OK() - - - - - - ################################# - # ------ private methods ------ # - @staticmethod - def __get_mask_keys(ok_values={True}, **kwargs): - masked_keys = set() - for key, val in kwargs.iteritems(): - if val in ok_values: - masked_keys.add(key) - return masked_keys - - @staticmethod - def __filter_namespace_args(namespace, ok_values): - return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - - - # API decorator - double wrap because of argument - def __api_check(connected = True): - - def wrap (f): - def wrap2(*args, **kwargs): - client = args[0] - - func_name = f.__name__ - - # check connection - if connected and not client.is_connected(): - raise STLStateError(func_name, 'disconnected') - - ret = f(*args, **kwargs) - return ret - return wrap2 - - return wrap - - - - ############################ API ############################# - ############################ ############################# - ############################ ############################# - def __enter__ (self): - self.connect() - self.acquire(force = True) - self.reset() - return self - - def __exit__ (self, type, value, traceback): - if self.get_active_ports(): - self.stop(self.get_active_ports()) - self.disconnect() - - ############################ Getters ############################# - ############################ ############################# - ############################ ############################# - - - # return verbose level of the logger - def get_verbose (self): - return self.logger.get_verbose() - - # is the client on read only mode ? - def is_all_ports_acquired (self): - return not (self.get_all_ports() == self.get_acquired_ports()) - - # is the client connected ? - def is_connected (self): - return self.connected and self.comm_link.is_connected - - - # get connection info - def get_connection_info (self): - return self.connection_info - - - # get supported commands by the server - def get_server_supported_cmds(self): - return self.supported_cmds - - # get server version - def get_server_version(self): - return self.server_version - - # get server system info - def get_server_system_info(self): - return self.system_info - - # get port count - def get_port_count(self): - return len(self.ports) - - - # returns the port object - def get_port (self, port_id): - port = self.ports.get(port_id, None) - if (port != None): - return port - else: - raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) - - - # get all ports as IDs - def get_all_ports (self): - return self.ports.keys() - - # get all acquired ports - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] - - # get all active ports (TX or pause) - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] - - # get paused ports - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] - - # get all TX ports - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] - - - # get stats - def get_stats (self, ports = None, async_barrier = True): - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - else: - ports = self.__ports(ports) - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # check async barrier - if not type(async_barrier) is bool: - raise STLArgumentError('async_barrier', async_barrier) - - - # if the user requested a barrier - use it - if async_barrier: - rc = self.async_client.barrier() - if not rc: - raise STLError(rc) - - return self.__get_stats(ports) - - # return all async events - def get_events (self): - return self.event_handler.get_events() - - ############################ Commands ############################# - ############################ ############################# - ############################ ############################# - - - """ - Sets verbose level - - :parameters: - level : str - "high" - "low" - "normal" - - :raises: - None - - """ - def set_verbose (self, level): - modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH} - - if not level in modes.keys(): - raise STLArgumentError('level', level) - - self.logger.set_verbose(modes[level]) - - - """ - Connects to the TRex server - - :parameters: - None - - :raises: - + :exc:`STLError` - - """ - @__api_check(False) - def connect (self): - rc = self.__connect() - if not rc: - raise STLError(rc) - - - """ - Disconnects from the server - - :parameters: - stop_traffic : bool - tries to stop traffic before disconnecting - release_ports : bool - tries to release all the acquired ports - - """ - @__api_check(False) - def disconnect (self, stop_traffic = True, release_ports = True): - - # try to stop ports but do nothing if not possible - if stop_traffic: - try: - self.stop() - except STLError: - pass - - - self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], - self.connection_info['sync_port'])) - rc = self.__disconnect(release_ports) - self.logger.post_cmd(rc) - - - - """ - Acquires ports for executing commands - - :parameters: - ports : list - ports to execute the command - force : bool - force acquire the ports - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def acquire (self, ports = None, force = False): - # by default use all ports - if ports == None: - ports = self.get_all_ports() - - # verify ports - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # verify valid port id list - if force: - self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) - else: - self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) - - rc = self.__acquire(ports, force) - - self.logger.post_cmd(rc) - - if not rc: - # cleanup - self.__release(ports) - raise STLError(rc) - - - """ - Release ports - - :parameters: - ports : list - ports to execute the command - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def release (self, ports = None): - # by default use all acquired ports - if ports == None: - ports = self.get_acquired_ports() - - # verify ports - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.logger.pre_cmd("Releasing ports {0}:".format(ports)) - rc = self.__release(ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - """ - Pings the server - - :parameters: - None - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def ping(self): - self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], - self.connection_info['sync_port'])) - rc = self._transmit("ping") - - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - - """ - force acquire ports, stop the traffic, remove all streams and clear stats - - :parameters: - ports : list - ports to execute the command - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def reset(self, ports = None): - - # by default use all ports - if ports == None: - ports = self.get_all_ports() - - # verify ports - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.acquire(ports, force = True) - self.stop(ports) - self.remove_all_streams(ports) - self.clear_stats(ports) - - - """ - remove all streams from port(s) - - :parameters: - ports : list - ports to execute the command - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def remove_all_streams (self, ports = None): - - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports)) - rc = self.__remove_all_streams(ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - """ - add a list of streams to port(s) - - :parameters: - ports : list - ports to execute the command - streams: list - streams to attach - - :returns: - list of stream IDs in order of the stream list - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def add_streams (self, streams, ports = None): - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # transform single stream - if not isinstance(streams, list): - streams = [streams] - - # check streams - if not all([isinstance(stream, STLStream) for stream in streams]): - raise STLArgumentError('streams', streams) - - self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports)) - rc = self.__add_streams(streams, ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - return [stream.get_id() for stream in streams] - - - """ - remove a list of streams from ports - - :parameters: - ports : list - ports to execute the command - stream_id_list: list - stream id list to remove - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def remove_streams (self, stream_id_list, ports = None): - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # transform single stream - if not isinstance(stream_id_list, list): - stream_id_list = [stream_id_list] - - # check streams - if not all([isinstance(stream_id, long) for stream_id in stream_id_list]): - raise STLArgumentError('stream_id_list', stream_id_list) - - # remove streams - self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports)) - rc = self.__remove_streams(stream_id_list, ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - """ - load a profile file to port(s) - - :parameters: - filename : str - filename to load - ports : list - ports to execute the command - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def load_profile (self, filename, ports = None): - - # check filename - if not os.path.isfile(filename): - raise STLError("file '{0}' does not exists".format(filename)) - - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - - streams = None - - # try YAML - try: - streams_db = CStreamsDB() - stream_list = streams_db.load_yaml_file(filename) - # convert to new style stream object - streams = [HACKSTLStream(stream) for stream in stream_list.compiled] - except YAMLError: - # try python loader - try: - basedir = os.path.dirname(filename) - - sys.path.append(basedir) - file = os.path.basename(filename).split('.')[0] - module = __import__(file, globals(), locals(), [], -1) - reload(module) # reload the update - - streams = module.register().get_streams() - - except Exception as e : - print str(e); - traceback.print_exc(file=sys.stdout) - raise STLError("Unexpected error: '{0}'".format(filename)) - - - self.add_streams(streams, ports) - - - - """ - start traffic on port(s) - - :parameters: - ports : list - ports to execute command - - mult : str - multiplier in a form of pps, bps, or line util in % - examples: "5kpps", "10gbps", "85%", "32mbps" - - force : bool - imply stopping the port of active and also - forces a profile that exceeds the L1 BW - - duration : int - limit the run for time in seconds - -1 means unlimited - - total : bool - should the B/W be divided by the ports - or duplicated for each - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def start (self, - ports = None, - mult = "1", - force = False, - duration = -1, - total = False): - - - # by default use all ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # verify multiplier - mult_obj = parsing_opts.decode_multiplier(mult, - allow_update = False, - divide_count = len(ports) if total else 1) - if not mult_obj: - raise STLArgumentError('mult', mult) - - # some type checkings - - if not type(force) is bool: - raise STLArgumentError('force', force) - - if not isinstance(duration, (int, float)): - raise STLArgumentError('duration', duration) - - if not type(total) is bool: - raise STLArgumentError('total', total) - - - # verify ports are stopped or force stop them - active_ports = list(set(self.get_active_ports()).intersection(ports)) - if active_ports: - if not force: - raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports)) - else: - rc = self.stop(active_ports) - if not rc: - raise STLError(rc) - - - # start traffic - self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports)) - rc = self.__start(mult_obj, duration, ports, force) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - - - """ - stop port(s) - - :parameters: - ports : list - ports to execute the command - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def stop (self, ports = None): - - # by default the user means all the active ports - if ports == None: - ports = self.get_active_ports() - if not ports: - return - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports)) - rc = self.__stop(ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - - """ - update traffic on port(s) - - :parameters: - ports : list - ports to execute command - - mult : str - multiplier in a form of pps, bps, or line util in % - and also with +/- - examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+" - - force : bool - forces a profile that exceeds the L1 BW - - total : bool - should the B/W be divided by the ports - or duplicated for each - - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def update (self, ports = None, mult = "1", total = False, force = False): - - # by default the user means all the active ports - if ports == None: - ports = self.get_active_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # verify multiplier - mult_obj = parsing_opts.decode_multiplier(mult, - allow_update = True, - divide_count = len(ports) if total else 1) - if not mult_obj: - raise STLArgumentError('mult', mult) - - # verify total - if not type(total) is bool: - raise STLArgumentError('total', total) - - - # call low level functions - self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports)) - rc = self.__update(mult, ports, force) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - - """ - pause traffic on port(s) - - :parameters: - ports : list - ports to execute command - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def pause (self, ports = None): - - # by default the user means all the TX ports - if ports == None: - ports = self.get_transmitting_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports)) - rc = self.__pause(ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - - """ - resume traffic on port(s) - - :parameters: - ports : list - ports to execute command - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def resume (self, ports = None): - - # by default the user means all the paused ports - if ports == None: - ports = self.get_paused_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports)) - rc = self.__resume(ports) - self.logger.post_cmd(rc) - - if not rc: - raise STLError(rc) - - - """ - validate port(s) configuration - - :parameters: - ports : list - ports to execute command - - mult : str - multiplier in a form of pps, bps, or line util in % - examples: "5kpps", "10gbps", "85%", "32mbps" - - duration : int - limit the run for time in seconds - -1 means unlimited - - total : bool - should the B/W be divided by the ports - or duplicated for each - - :raises: - + :exc:`STLError` - - """ - @__api_check(True) - def validate (self, ports = None, mult = "1", duration = "-1", total = False): - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # verify multiplier - mult_obj = parsing_opts.decode_multiplier(mult, - allow_update = True, - divide_count = len(ports) if total else 1) - if not mult_obj: - raise STLArgumentError('mult', mult) - - - if not isinstance(duration, (int, float)): - raise STLArgumentError('duration', duration) - - - self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports)) - rc = self.__validate(ports) - self.logger.post_cmd(rc) - - - for port in ports: - self.ports[port].print_profile(mult_obj, duration) - - - """ - clear stats on port(s) - - :parameters: - ports : list - ports to execute command - - clear_global : bool - clear the global stats - - :raises: - + :exc:`STLError` - - """ - @__api_check(False) - def clear_stats (self, ports = None, clear_global = True): - - # by default use all ports - if ports == None: - ports = self.get_all_ports() - else: - ports = self.__ports(ports) - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - # verify clear global - if not type(clear_global) is bool: - raise STLArgumentError('clear_global', clear_global) - - - rc = self.__clear_stats(ports, clear_global) - if not rc: - raise STLError(rc) - - - - - - """ - block until specify port(s) traffic has ended - - :parameters: - ports : list - ports to execute command - - timeout : int - timeout in seconds - - :raises: - + :exc:`STLTimeoutError` - in case timeout has expired - + :exe:'STLError' - - """ - @__api_check(True) - def wait_on_traffic (self, ports = None, timeout = 60): - - # by default use all acquired ports - if ports == None: - ports = self.get_acquired_ports() - - # verify valid port id list - rc = self._validate_port_list(ports) - if not rc: - raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - - expr = time.time() + timeout - - # wait while any of the required ports are active - while set(self.get_active_ports()).intersection(ports): - time.sleep(0.01) - if time.time() > expr: - raise STLTimeoutError(timeout) - - - """ - clear all events - - :parameters: - None - - :raises: - None - - """ - def clear_events (self): - self.event_handler.clear_events() - - - ############################ Line ############################# - ############################ Commands ############################# - ############################ ############################# - - # console decorator - def __console(f): - def wrap(*args): - client = args[0] - - time1 = time.time() - - try: - rc = f(*args) - except STLError as e: - client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) - return - - # if got true - print time - if rc: - delta = time.time() - time1 - client.logger.log(format_time(delta) + "\n") - - - return wrap - - - @__console - def connect_line (self, line): - '''Connects to the TRex server''' - # define a parser - parser = parsing_opts.gen_parser(self, - "connect", - self.connect_line.__doc__, - parsing_opts.FORCE) - - opts = parser.parse_args(line.split()) - - if opts is None: - return - - # call the API - self.connect() - self.acquire(force = opts.force) - - # true means print time - return True - - @__console - def disconnect_line (self, line): - self.disconnect() - - - - @__console - def reset_line (self, line): - self.reset() - - # true means print time - return True - - - @__console - def start_line (self, line): - '''Start selected traffic in specified ports on TRex\n''' - # define a parser - parser = parsing_opts.gen_parser(self, - "start", - self.start_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.TOTAL, - parsing_opts.FORCE, - parsing_opts.STREAM_FROM_PATH_OR_FILE, - parsing_opts.DURATION, - parsing_opts.MULTIPLIER_STRICT, - parsing_opts.DRY_RUN) - - opts = parser.parse_args(line.split()) - - - if opts is None: - return - - - active_ports = list(set(self.get_active_ports()).intersection(opts.ports)) - - if active_ports: - if not opts.force: - msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports) - self.logger.log(format_text(msg, 'bold')) - return - else: - self.stop(active_ports) - - - # remove all streams - self.remove_all_streams(opts.ports) - - # pack the profile - self.load_profile(opts.file[0], opts.ports) - - if opts.dry: - self.validate(opts.ports, opts.mult, opts.duration, opts.total) - else: - self.start(opts.ports, - opts.mult, - opts.force, - opts.duration, - opts.total) - - # true means print time - return True - - - - @__console - def stop_line (self, line): - '''Stop active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "stop", - self.stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return - - # find the relevant ports - ports = list(set(self.get_active_ports()).intersection(opts.ports)) - - if not ports: - self.logger.log(format_text("No active traffic on provided ports\n", 'bold')) - return - - self.stop(ports) - - # true means print time - return True - - - @__console - def update_line (self, line): - '''Update port(s) speed currently active\n''' - parser = parsing_opts.gen_parser(self, - "update", - self.update_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.MULTIPLIER, - parsing_opts.TOTAL, - parsing_opts.FORCE) - - opts = parser.parse_args(line.split()) - if opts is None: - return - - # find the relevant ports - ports = list(set(self.get_active_ports()).intersection(opts.ports)) - - if not ports: - self.logger.log(format_text("No ports in valid state to update\n", 'bold')) - return - - self.update(ports, opts.mult, opts.total, opts.force) - - # true means print time - return True - - - @__console - def pause_line (self, line): - '''Pause active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "pause", - self.pause_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return - - # find the relevant ports - ports = list(set(self.get_transmitting_ports()).intersection(opts.ports)) - - if not ports: - self.logger.log(format_text("No ports in valid state to pause\n", 'bold')) - return - - self.pause(ports) - - # true means print time - return True - - - @__console - def resume_line (self, line): - '''Resume active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "resume", - self.resume_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return - - # find the relevant ports - ports = list(set(self.get_paused_ports()).intersection(opts.ports)) - - if not ports: - self.logger.log(format_text("No ports in valid state to resume\n", 'bold')) - return - - return self.resume(ports) - - # true means print time - return True - - - @__console - def clear_stats_line (self, line): - '''Clear cached local statistics\n''' - # define a parser - parser = parsing_opts.gen_parser(self, - "clear", - self.clear_stats_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - - if opts is None: - return - - self.clear_stats(opts.ports) - - - - - @__console - def show_stats_line (self, line): - '''Fetch statistics from TRex server by port\n''' - # define a parser - parser = parsing_opts.gen_parser(self, - "stats", - self.show_stats_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.STATS_MASK) - - opts = parser.parse_args(line.split()) - - if opts is None: - return - - # determine stats mask - mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) - if not mask: - # set to show all stats if no filter was given - mask = trex_stats.ALL_STATS_OPTS - - stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask) - - stats = self._get_formatted_stats(opts.ports, mask) - - - # print stats to screen - for stat_type, stat_data in stats.iteritems(): - text_tables.print_table_with_header(stat_data.text_table, stat_type) - - - @__console - def show_streams_line(self, line): - '''Fetch streams statistics from TRex server by port\n''' - # define a parser - parser = parsing_opts.gen_parser(self, - "streams", - self.show_streams_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.STREAMS_MASK) - - opts = parser.parse_args(line.split()) - - if opts is None: - return - - streams = self._get_streams(opts.ports, set(opts.streams)) - if not streams: - self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) - - else: - # print stats to screen - for stream_hdr, port_streams_data in streams.iteritems(): - text_tables.print_table_with_header(port_streams_data.text_table, - header= stream_hdr.split(":")[0] + ":", - untouched_header= stream_hdr.split(":")[1]) - - - - - @__console - def validate_line (self, line): - '''validates port(s) stream configuration\n''' - - parser = parsing_opts.gen_parser(self, - "validate", - self.validate_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return - - self.validate(opts.ports) - - - -
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_sim.py b/scripts/automation/trex_control_plane/client/trex_stateless_sim.py deleted file mode 100644 index 1452cdd1..00000000 --- a/scripts/automation/trex_control_plane/client/trex_stateless_sim.py +++ /dev/null @@ -1,430 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -Itay Marom -Cisco Systems, Inc. - -Copyright (c) 2015-2015 Cisco Systems, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -try: - # support import for Python 2 - import outer_packages -except ImportError: - # support import for Python 3 - import client.outer_packages - -from common.trex_stl_exceptions import STLError -from yaml import YAMLError -from common.trex_streams import * -from client_utils import parsing_opts - -import re -import json - - - -import argparse -import tempfile -import subprocess -import os -from dpkt import pcap -from operator import itemgetter - -class BpSimException(Exception): - pass - -def merge_cap_files (pcap_file_list, out_filename, delete_src = False): - - out_pkts = [] - if not all([os.path.exists(f) for f in pcap_file_list]): - print "failed to merge cap file list...\nnot all files exist\n" - return - - # read all packets to a list - for src in pcap_file_list: - f = open(src, 'r') - reader = pcap.Reader(f) - pkts = reader.readpkts() - out_pkts += pkts - f.close() - if delete_src: - os.unlink(src) - - # sort by the timestamp - out_pkts = sorted(out_pkts, key=itemgetter(0)) - - - out = open(out_filename, 'w') - out_writer = pcap.Writer(out) - - for ts, pkt in out_pkts: - out_writer.writepkt(pkt, ts) - - out.close() - - - -# stateless simulation -class STLSim(object): - def __init__ (self, bp_sim_path = None, handler = 0, port_id = 0): - - if not bp_sim_path: - # auto find scripts - m = re.match(".*/trex-core", os.getcwd()) - if not m: - raise STLError('cannot find BP sim path, please provide it') - - self.bp_sim_path = os.path.join(m.group(0), 'scripts') - - else: - self.bp_sim_path = bp_sim_path - - # dummies - self.handler = handler - self.port_id = port_id - - - def load_input_file (self, input_file): - # try YAML - try: - streams_db = CStreamsDB() - stream_list = streams_db.load_yaml_file(input_file) - - # convert to new style stream object - return [HACKSTLStream(stream) for stream in stream_list.compiled] - except YAMLError: - pass - - # try python - try: - basedir = os.path.dirname(input_file) - sys.path.append(basedir) - - file = os.path.basename(input_file).split('.')[0] - module = __import__(file, globals(), locals(), [], -1) - - return module.register().get_streams() - - except (AttributeError, ImportError) as e: - print "specific error: {0}".format(e) - - raise STLError("bad format input file '{0}'".format(input_file)) - - - def generate_start_cmd (self, mult = "1", force = True, duration = -1): - return {"id":1, - "jsonrpc": "2.0", - "method": "start_traffic", - "params": {"handler": self.handler, - "force": force, - "port_id": self.port_id, - "mul": parsing_opts.decode_multiplier(mult), - "duration": duration} - } - - - - # run command - # input_list - a list of streams or YAML files - # outfile - pcap file to save output, if None its a dry run - # dp_core_count - how many DP cores to use - # dp_core_index - simulate only specific dp core without merging - # is_debug - debug or release image - # pkt_limit - how many packets to simulate - # mult - multiplier - # mode - can be 'valgrind, 'gdb', 'json' or 'none' - def run (self, - input_list, - outfile = None, - dp_core_count = 1, - dp_core_index = None, - is_debug = True, - pkt_limit = 5000, - mult = "1", - duration = -1, - mode = 'none'): - - if not mode in ['none', 'gdb', 'valgrind', 'json']: - raise STLArgumentError('mode', mode) - - # listify - input_list = input_list if isinstance(input_list, list) else [input_list] - - # check streams arguments - if not all([isinstance(i, (STLStream, str)) for i in input_list]): - raise STLArgumentError('input_list', input_list) - - # split to two type - input_files = [x for x in input_list if isinstance(x, str)] - stream_list = [x for x in input_list if isinstance(x, STLStream)] - - # handle YAMLs - for input_file in input_files: - stream_list += self.load_input_file(input_file) - - - # load streams - cmds_json = [] - for stream in stream_list: - cmd = {"id":1, - "jsonrpc": "2.0", - "method": "add_stream", - "params": {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream.get_id(), - "stream": stream.to_json()} - } - - cmds_json.append(cmd) - - # generate start command - cmds_json.append(self.generate_start_cmd(mult = mult, - force = True, - duration = duration)) - - if mode == 'json': - print json.dumps(cmds_json, indent = 4, separators=(',', ': '), sort_keys = True) - return - - # start simulation - self.outfile = outfile - self.dp_core_count = dp_core_count - self.dp_core_index = dp_core_index - self.is_debug = is_debug - self.pkt_limit = pkt_limit - self.mult = mult - self.duration = duration, - self.mode = mode - - self.__run(cmds_json) - - - # internal run - def __run (self, cmds_json): - - # write to temp file - f = tempfile.NamedTemporaryFile(delete = False) - f.write(json.dumps(cmds_json)) - f.close() - - # launch bp-sim - try: - self.execute_bp_sim(f.name) - finally: - os.unlink(f.name) - - - - def execute_bp_sim (self, json_filename): - if self.is_debug: - exe = os.path.join(self.bp_sim_path, 'bp-sim-64-debug') - else: - exe = os.path.join(self.bp_sim_path, 'bp-sim-64') - - if not os.path.exists(exe): - raise STLError("'{0}' does not exists, please build it before calling the simulation".format(exe)) - - - cmd = [exe, - '--pcap', - '--sl', - '--cores', - str(self.dp_core_count), - '--limit', - str(self.pkt_limit), - '-f', - json_filename] - - # out or dry - if not self.outfile: - cmd += ['--dry'] - cmd += ['-o', '/dev/null'] - else: - cmd += ['-o', self.outfile] - - if self.dp_core_index != None: - cmd += ['--core_index', str(self.dp_core_index)] - - if self.mode == 'valgrind': - cmd = ['valgrind', '--leak-check=full', '--error-exitcode=1'] + cmd - - elif self.mode == 'gdb': - cmd = ['/bin/gdb', '--args'] + cmd - - print "executing command: '{0}'".format(" ".join(cmd)) - rc = subprocess.call(cmd) - if rc != 0: - raise STLError('simulation has failed with error code {0}'.format(rc)) - - self.merge_results() - - - def merge_results (self): - if not self.outfile: - return - - if self.dp_core_count == 1: - return - - if self.dp_core_index != None: - return - - - print "Mering cores output to a single pcap file...\n" - inputs = ["{0}-{1}".format(self.outfile, index) for index in xrange(0, self.dp_core_count)] - merge_cap_files(inputs, self.outfile, delete_src = True) - - - - -def is_valid_file(filename): - if not os.path.isfile(filename): - raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename) - - return filename - - -def unsigned_int (x): - x = int(x) - if x < 0: - raise argparse.ArgumentTypeError("argument must be >= 0") - - return x - -def setParserOptions(): - parser = argparse.ArgumentParser(prog="stl_sim.py") - - parser.add_argument("-f", - dest ="input_file", - help = "input file in YAML or Python format", - type = is_valid_file, - required=True) - - parser.add_argument("-o", - dest = "output_file", - default = None, - help = "output file in ERF format") - - - parser.add_argument("-c", "--cores", - help = "DP core count [default is 1]", - dest = "dp_core_count", - default = 1, - type = int, - choices = xrange(1, 9)) - - parser.add_argument("-n", "--core_index", - help = "Record only a specific core", - dest = "dp_core_index", - default = None, - type = int) - - parser.add_argument("-r", "--release", - help = "runs on release image instead of debug [default is False]", - action = "store_true", - default = False) - - - parser.add_argument("-l", "--limit", - help = "limit test total packet count [default is 5000]", - default = 5000, - type = unsigned_int) - - parser.add_argument('-m', '--multiplier', - help = parsing_opts.match_multiplier_help, - dest = 'mult', - default = "1", - type = parsing_opts.match_multiplier_strict) - - parser.add_argument('-d', '--duration', - help = "run duration", - dest = 'duration', - default = -1, - type = float) - - - group = parser.add_mutually_exclusive_group() - - group.add_argument("-x", "--valgrind", - help = "run under valgrind [default is False]", - action = "store_true", - default = False) - - group.add_argument("-g", "--gdb", - help = "run under GDB [default is False]", - action = "store_true", - default = False) - - group.add_argument("--json", - help = "generate JSON output only to stdout [default is False]", - action = "store_true", - default = False) - - return parser - - -def validate_args (parser, options): - - if options.dp_core_index: - if not options.dp_core_index in xrange(0, options.dp_core_count): - parser.error("DP core index valid range is 0 to {0}".format(options.dp_core_count - 1)) - - # zero is ok - no limit, but other values must be at least as the number of cores - if (options.limit != 0) and options.limit < options.dp_core_count: - parser.error("limit cannot be lower than number of DP cores") - - -def main (): - parser = setParserOptions() - options = parser.parse_args() - - validate_args(parser, options) - - - - if options.valgrind: - mode = 'valgrind' - elif options.gdb: - mode = 'gdb' - elif options.json: - mode = 'json' - else: - mode = 'none' - - try: - r = STLSim() - r.run(input_list = options.input_file, - outfile = options.output_file, - dp_core_count = options.dp_core_count, - dp_core_index = options.dp_core_index, - is_debug = (not options.release), - pkt_limit = options.limit, - mult = options.mult, - duration = options.duration, - mode = mode) - - except KeyboardInterrupt as e: - print "\n\n*** Caught Ctrl + C... Exiting...\n\n" - exit(1) - - except STLError as e: - print e - exit(1) - - exit(0) - -if __name__ == '__main__': - main() - - |