diff options
Diffstat (limited to 'scripts/automation')
17 files changed, 1982 insertions, 1672 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8b274134..459d6915 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -19,6 +19,7 @@ import re from common.trex_stats import * from common.trex_streams import * +from common.trex_types import * # basic async stats class class CTRexAsyncStats(object): @@ -152,38 +153,115 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target= self.run) + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def run(self): + self.connected = True - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.is_alive(): + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + + return RC_OK() + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 5000) + + got_data = False while self.active: - line = self.socket.recv_string() + try: + + line = self.socket.recv_string() + self.last_data_recv_ts = time.time() + + # signal once + if not got_data: + self.stateless_client.on_async_alive() + got_data = True + + + # got a timeout - mark as not alive and retry + except zmq.Again: + + # signal once + if got_data: + self.stateless_client.on_async_dead() + got_data = False + + continue + + except zmq.ContextTerminated: + # outside thread signaled us to exit + break + msg = json.loads(line) name = msg['name'] @@ -193,7 +271,19 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) - def get_stats(self): + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + + + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + + def get_stats (self): return self.stats def get_raw_snapshot (self): @@ -203,7 +293,6 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - # self.stats.update(data) self.stateless_client.handle_async_stats_update(data) # events elif name == "trex-event": @@ -212,10 +301,3 @@ class CTRexAsyncClient(): pass - def stop (self): - self.active = False - self.t.join() - - -if __name__ == "__main__": - pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 160abdec..5709b7a5 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -88,7 +88,7 @@ class CTRexClient(object): finally: self.prompt_verbose_data() - def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options): + def start_trex (self, f, d, block_to_success = True, timeout = 40, user = None, trex_development = False, **trex_cmd_options): """ Request to start a TRex run on server. @@ -104,7 +104,7 @@ class CTRexClient(object): timeout : int maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' - default value: **30** + default value: **40** user : str the identity of the the run issuer. trex_cmd_options : key, val @@ -125,13 +125,17 @@ class CTRexClient(object): user = user or self.__default_user try: d = int(d) - if d < 30: # specify a test should take at least 30 seconds long. + if d < 30 and not trex_development: # test duration should be at least 30 seconds, unless trex_development flag is specified. raise ValueError except ValueError: raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) - + if not trex_cmd_options.get('l'): + self.result_obj.latency_checked = False + if 'k' in trex_cmd_options: + timeout += int(trex_cmd_options['k']) # during 'k' seconds TRex stays in 'Starting' state + self.result_obj.clear_results() try: issue_time = time.time() @@ -544,7 +548,7 @@ class CTRexClient(object): Get TRex version details. :return: - Trex details (Version, User, Date, Uuid) as ordered dictionary + Trex details (Version, User, Date, Uuid, Git SHA) as ordered dictionary :raises: + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex version could not be determined. @@ -556,9 +560,11 @@ class CTRexClient(object): version_dict = OrderedDict() result_lines = binascii.a2b_base64(self.server.get_trex_version()).split('\n') for line in result_lines: + if not line: + continue key, value = line.strip().split(':', 1) version_dict[key.strip()] = value.strip() - for key in ('Version', 'User', 'Date', 'Uuid'): + for key in ('Version', 'User', 'Date', 'Uuid', 'Git SHA'): if key not in version_dict: raise Exception('get_trex_version: got server response without key: {0}'.format(key)) return version_dict @@ -767,6 +773,7 @@ class CTRexResult(object): """ self._history = deque(maxlen = max_history_size) self.clear_results() + self.latency_checked = True def __repr__(self): return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) + @@ -1032,18 +1039,19 @@ class CTRexResult(object): self._done_warmup = True # handle latency data - latency_pre = "trex-latency" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC - # support old typo - if self._max_latency is None: - latency_pre = "trex-latecny" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") - - self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC - self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) - - avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") - self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) + if self.latency_checked: + latency_pre = "trex-latency" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC + # support old typo + if self._max_latency is None: + latency_pre = "trex-latecny" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") + + self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC + self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) + + avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") + self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts") rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts") diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py new file mode 100644 index 00000000..54b4945e --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -0,0 +1,430 @@ + +from collections import namedtuple +from common.trex_types import * +from common import trex_stats + + +########## utlity ############ +def mult_to_factor (mult, max_bps, max_pps, line_util): + if mult['type'] == 'raw': + return mult['value'] + + if mult['type'] == 'bps': + return mult['value'] / max_bps + + if mult['type'] == 'pps': + return mult['value'] / max_pps + + if mult['type'] == 'percentage': + return mult['value'] / line_util + + +# describes a single port +class Port(object): + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + PortState = namedtuple('PortState', ['state_id', 'state_name']) + STATES_MAP = {STATE_DOWN: "DOWN", + STATE_IDLE: "IDLE", + STATE_STREAMS: "IDLE", + STATE_TX: "ACTIVE", + STATE_PAUSE: "PAUSE"} + + + def __init__ (self, port_id, speed, driver, user, comm_link, session_id): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.comm_link = comm_link + self.transmit = comm_link.transmit + self.transmit_batch = comm_link.transmit_batch + self.user = user + self.driver = driver + self.speed = speed + self.streams = {} + self.profile = None + self.session_id = session_id + + self.port_stats = trex_stats.CPortStats(self) + + + def err(self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + def ok(self, data = "ACK"): + return RC_OK(data) + + def get_speed_bps (self): + return (self.speed * 1000 * 1000 * 1000) + + # take the port + def acquire(self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "session_id": self.session_id, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.good(): + self.handler = rc.data() + return self.ok() + else: + return self.err(rc.err()) + + # release the port + def release(self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + self.handler = None + + if rc.good(): + return self.ok() + else: + return self.err(rc.err()) + + def is_acquired(self): + return (self.handler != None) + + def is_active(self): + return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def is_transmitting (self): + return (self.state == self.STATE_TX) + + def is_paused (self): + return (self.state == self.STATE_PAUSE) + + + def sync(self): + params = {"port_id": self.port_id} + + command = RpcCmdData("get_port_status", params) + rc = self.transmit(command.method, command.params) + if rc.bad(): + return self.err(rc.err()) + + # sync the port + port_state = rc.data()['state'] + + if port_state == "DOWN": + self.state = self.STATE_DOWN + elif port_state == "IDLE": + self.state = self.STATE_IDLE + elif port_state == "STREAMS": + self.state = self.STATE_STREAMS + elif port_state == "TX": + self.state = self.STATE_TX + elif port_state == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return self.ok() + + + # return TRUE if write commands + def is_port_writable (self): + # operations on port can be done on state idle or state streams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj} + + rc = self.transmit("add_stream", params) + if rc.bad(): + return self.err(rc.err()) + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # add multiple streams + def add_streams (self, streams_list): + batch = [] + + for stream in streams_list: + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream.stream_id, + "stream": stream.stream} + + cmd = RpcCmdData('add_stream', params) + batch.append(cmd) + + rc = self.transmit_batch(batch) + if rc.bad(): + return self.err(rc.err()) + + # validate that every action succeeded + + # add the stream + for stream in streams_list: + self.streams[stream.stream_id] = stream.stream + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc = self.transmit("remove_stream", params) + if rc.bad(): + return self.err(rc.err()) + + self.streams[stream_id] = None + + self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE + + return self.ok() + + # remove all the streams + def remove_all_streams (self): + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("remove_all_streams", params) + if rc.bad(): + return self.err(rc.err()) + + self.streams = {} + + self.state = self.STATE_IDLE + + return self.ok() + + # get a specific stream + def get_stream (self, stream_id): + if stream_id in self.streams: + return self.streams[stream_id] + else: + return None + + def get_all_streams (self): + return self.streams + + # start traffic + def start (self, mul, duration): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul, + "duration": duration} + + rc = self.transmit("start_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + self.state = self.STATE_TX + + return self.ok() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): + + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("stop_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_STREAMS + + return self.ok() + + def pause (self): + + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("pause_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_PAUSE + + return self.ok() + + + def resume (self): + + if (self.state != self.STATE_PAUSE) : + return self.err("port is not in pause mode") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("resume_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_TX + + return self.ok() + + + def update (self, mul): + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc = self.transmit("update_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + + def validate (self): + + if (self.state == self.STATE_DOWN): + return self.err("port is down") + + if (self.state == self.STATE_IDLE): + return self.err("no streams attached to port") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("validate", params) + if rc.bad(): + return self.err(rc.err()) + + self.profile = rc.data() + + return self.ok() + + def get_profile (self): + return self.profile + + + def print_profile (self, mult, duration): + if not self.get_profile(): + return + + rate = self.get_profile()['rate'] + graph = self.get_profile()['graph'] + + print format_text("Profile Map Per Port\n", 'underline', 'bold') + + factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) + + print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), + format_num(rate['max_bps'] * factor, suffix = "bps")) + + print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), + format_num(rate['max_pps'] * factor, suffix = "pps"),) + + print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), + format_percentage(rate['max_line_util'] * factor * 100)) + + + # duration + exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) + exp_time_factor_sec = exp_time_base_sec / factor + + # user configured a duration + if duration > 0: + if exp_time_factor_sec > 0: + exp_time_factor_sec = min(exp_time_factor_sec, duration) + else: + exp_time_factor_sec = duration + + + print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), + format_time(exp_time_factor_sec)) + print "\n" + + + def get_port_state_name(self): + return self.STATES_MAP.get(self.state, "Unknown") + + ################# stats handler ###################### + def generate_port_stats(self): + return self.port_stats.generate_stats() + pass + + def generate_port_status(self): + return {"port-type": self.driver, + "maximum": "{speed} Gb/s".format(speed=self.speed), + "port-status": self.get_port_state_name() + } + + def clear_stats(self): + return self.port_stats.clear_stats() + + def invalidate_stats(self): + return self.port_stats.invalidate() + + + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS + + + def async_event_port_started (self): + self.state = self.STATE_TX + + + def async_event_port_paused (self): + self.state = self.STATE_PAUSE + + + def async_event_port_resumed (self): + self.state = self.STATE_TX + + def async_event_forced_acquired (self): + self.handler = None + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 748817da..58fa53c9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -14,433 +14,40 @@ import json from common.trex_streams import * from collections import namedtuple from common.text_opts import * -# import trex_stats from common import trex_stats from client_utils import parsing_opts, text_tables import time import datetime import re +import random +from trex_port import Port +from common.trex_types import * from trex_async_client import CTRexAsyncClient -RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) -class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): - __slots__ = () - def __str__(self): - return "{id:^3} - {msg} ({stat})".format(id=self.id, - msg=self.msg, - stat="success" if self.success else "fail") - -# simple class to represent complex return value -class RC(): - - def __init__ (self, rc = None, data = None): - self.rc_list = [] - - if (rc != None) and (data != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) - - def add (self, rc): - self.rc_list += rc.rc_list - - def good (self): - return all([x.rc for x in self.rc_list]) - - def bad (self): - return not self.good() - - def data (self): - return all([x.data if x.rc else "" for x in self.rc_list]) - - def err (self): - return all([x.data if not x.rc else "" for x in self.rc_list]) - - def annotate (self, desc = None): - if desc: - print format_text('\n{:<40}'.format(desc), 'bold'), - - if self.bad(): - # print all the errors - print "" - for x in self.rc_list: - if not x.rc: - print format_text("\n{0}".format(x.data), 'bold') - - print "" - print format_text("[FAILED]\n", 'red', 'bold') - - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - - -def RC_OK(): - return RC(True, "") -def RC_ERR(err): - return RC(False, err) - - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) - -# describes a stream DB -class CStreamsDB(object): - - def __init__(self): - self.stream_packs = {} - - def load_yaml_file(self, filename): - - stream_pack_name = filename - if stream_pack_name in self.get_loaded_streams_names(): - self.remove_stream_packs(stream_pack_name) - - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(filename) - - try: - compiled_streams = stream_list.compile_streams() - rc = self.load_streams(stream_pack_name, - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - - except Exception as e: - return None - - return self.get_stream_pack(stream_pack_name) - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def stream_pack_exists (self, name): - return name in self.get_loaded_streams_names() - - def get_stream_pack(self, name): - if not self.stream_pack_exists(name): - return None - else: - return self.stream_packs.get(name) - - -# describes a single port -class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 - PortState = namedtuple('PortState', ['state_id', 'state_name']) - STATES_MAP = {STATE_DOWN: "DOWN", - STATE_IDLE: "IDLE", - STATE_STREAMS: "STREAMS", - STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} - - - def __init__ (self, port_id, speed, driver, user, transmit): - self.port_id = port_id - self.state = self.STATE_IDLE - self.handler = None - self.transmit = transmit - self.user = user - self.driver = driver - self.speed = speed - self.streams = {} - self.port_stats = trex_stats.CPortStats(self) - - def err(self, msg): - return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - - def ok(self): - return RC_OK() - - def get_speed_bps (self): - return (self.speed * 1000 * 1000 * 1000) - - # take the port - def acquire(self, force = False): - params = {"port_id": self.port_id, - "user": self.user, - "force": force} - - command = RpcCmdData("acquire", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - # release the port - def release(self): - params = {"port_id": self.port_id, - "handler": self.handler} - - command = RpcCmdData("release", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def sync(self, sync_data): - self.handler = sync_data['handler'] - port_state = sync_data['state'].upper() - if port_state == "DOWN": - self.state = self.STATE_DOWN - elif port_state == "IDLE": - self.state = self.STATE_IDLE - elif port_state == "STREAMS": - self.state = self.STATE_STREAMS - elif port_state == "TX": - self.state = self.STATE_TX - elif port_state == "PAUSE": - self.state = self.STATE_PAUSE - else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) - - return self.ok() - - - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - # add stream to the port - def add_stream (self, stream_id, stream_obj): - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id, - "stream": stream_obj} - - rc, data = self.transmit("add_stream", params) - if not rc: - r = self.err(data) - print r.good() - - # add the stream - self.streams[stream_id] = stream_obj - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - # remove stream from port - def remove_stream (self, stream_id): - - if not stream_id in self.streams: - return self.err("stream {0} does not exists".format(stream_id)) - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id} - - - rc, data = self.transmit("remove_stream", params) - if not rc: - return self.err(data) - - self.streams[stream_id] = None - - return self.ok() - - # remove all the streams - def remove_all_streams (self): - - params = {"handler": self.handler, - "port_id": self.port_id} +class CTRexStatelessClient(object): + """docstring for CTRexStatelessClient""" - rc, data = self.transmit("remove_all_streams", params) - if not rc: - return self.err(data) + # verbose levels + VERBOSE_QUIET = 0 + VERBOSE_REGULAR = 1 + VERBOSE_HIGH = 2 + + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False): + super(CTRexStatelessClient, self).__init__() - self.streams = {} + self.user = username - return self.ok() + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) - # get a specific stream - def get_stream (self, stream_id): - if stream_id in self.streams: - return self.streams[stream_id] + # default verbose level + if not quiet: + self.verbose = self.VERBOSE_REGULAR else: - return None - - def get_all_streams (self): - return self.streams - - - def process_mul (self, mul): - # if percentage - translate - if mul['type'] == 'percentage': - mul['type'] = 'max_bps' - mul['max'] = self.get_speed_bps() * (mul['max'] / 100) - - - # start traffic - def start (self, mul, duration): - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") - - if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "duration": duration} - - rc, data = self.transmit("start_traffic", params) - if not rc: - return self.err(data) - - self.state = self.STATE_TX - - return self.ok() - - # stop traffic - # with force ignores the cached state and sends the command - def stop (self, force = False): - - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("stop_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_STREAMS - - return self.ok() - - def pause (self): - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} + self.verbose = self.VERBOSE_QUIET - rc, data = self.transmit("pause_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_PAUSE - - return self.ok() - - - def resume (self): - - if (self.state != self.STATE_PAUSE) : - return self.err("port is not in pause mode") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("resume_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_TX - - return self.ok() - - - def update (self, mul): - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul} - - rc, data = self.transmit("update_traffic", params) - if not rc: - return self.err(data) - - return self.ok() - - def get_port_state_name(self): - return self.STATES_MAP.get(self.state, "Unknown") - - ################# stats handler ###################### - def generate_port_stats(self): - return self.port_stats.generate_stats() - pass - - def generate_port_status(self): - return {"port-type": self.driver, - "maximum": "{speed} Gb/s".format(speed=self.speed), - "port-status": self.get_port_state_name() - } - - def clear_stats(self): - return self.port_stats.clear_stats() - - ################# events handler ###################### - def async_event_port_stopped (self): - self.state = self.STATE_STREAMS - - -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" - - def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): - super(CTRexStatelessClient, self).__init__() - self.user = username - self.system_info = None - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) - self.verbose = False self.ports = {} - # self._conn_handler = {} - # self._active_ports = set() self._connection_info = {"server": server, "sync_port": sync_port, "async_port": async_port} @@ -448,7 +55,7 @@ class CTRexStatelessClient(object): self.server_version = {} self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) self.streams_db = CStreamsDB() self.global_stats = trex_stats.CGlobalStats(self._connection_info, @@ -457,12 +64,44 @@ class CTRexStatelessClient(object): self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats, self.ports) + self.events = [] + + self.session_id = random.getrandbits(32) + self.read_only = False self.connected = False - self.events = [] + + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, None) + + + # connection server ip + def get_server_ip (self): + return self.comm_link.get_server() + + # connection server port + def get_server_port (self): + return self.comm_link.get_port() + ################# events handler ###################### - + def add_event_log (self, msg, ev_type, show = False): + + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" + + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) + + if show: + self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) + + def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -490,59 +129,108 @@ class CTRexStatelessClient(object): for port_id, data in port_stats.iteritems(): self.ports[port_id].port_stats.update(data) + + def handle_async_event (self, type, data): # DP stopped - ev = "[event] - " - show_event = False # port started if (type == 0): port_id = int(data['port_id']) - ev += "Port {0} has started".format(port_id) + ev = "Port {0} has started".format(port_id) + self.async_event_port_started(port_id) # port stopped elif (type == 1): port_id = int(data['port_id']) - ev += "Port {0} has stopped".format(port_id) + ev = "Port {0} has stopped".format(port_id) # call the handler self.async_event_port_stopped(port_id) - # server stopped + # port paused elif (type == 2): - ev += "Server has stopped" - self.async_event_server_stopped() - show_event = True + port_id = int(data['port_id']) + ev = "Port {0} has paused".format(port_id) - # port finished traffic + # call the handler + self.async_event_port_paused(port_id) + + # port resumed elif (type == 3): port_id = int(data['port_id']) - ev += "Port {0} job done".format(port_id) + ev = "Port {0} has resumed".format(port_id) + + # call the handler + self.async_event_port_resumed(port_id) + + # port finished traffic + elif (type == 4): + port_id = int(data['port_id']) + ev = "Port {0} job done".format(port_id) # call the handler self.async_event_port_stopped(port_id) show_event = True + # port was stolen... + elif (type == 5): + session_id = data['session_id'] + + # false alarm, its us + if session_id == self.session_id: + return + + port_id = int(data['port_id']) + who = data['who'] + + ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) + + # call the handler + self.async_event_port_forced_acquired(port_id) + show_event = True + + # server stopped + elif (type == 100): + ev = "Server has stopped" + self.async_event_server_stopped() + show_event = True + + else: # unknown event - ignore return - if show_event: - print format_text("\n" + ev, 'bold') - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{0} - ".format(st) + format_text(ev, 'bold')) + self.add_event_log(ev, 'server', show_event) def async_event_port_stopped (self, port_id): self.ports[port_id].async_event_port_stopped() + + def async_event_port_started (self, port_id): + self.ports[port_id].async_event_port_started() + + + def async_event_port_paused (self, port_id): + self.ports[port_id].async_event_port_paused() + + + def async_event_port_resumed (self, port_id): + self.ports[port_id].async_event_port_resumed() + + + def async_event_port_forced_acquired (self, port_id): + self.ports[port_id].async_event_forced_acquired() + self.read_only = True + def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -552,6 +240,24 @@ class CTRexStatelessClient(object): ############# helper functions section ############## + # measure time for functions + def timing(f): + def wrap(*args): + time1 = time.time() + ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + + delta = time.time() - time1 + print format_time(delta) + "\n" + + return ret + + return wrap + + def validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): print type(port_id_list) @@ -584,66 +290,120 @@ class CTRexStatelessClient(object): ############ boot up section ################ # connection sequence - def connect(self): + # mode can be RW - read / write, RWF - read write with force , RO - read only + def connect(self, mode = "RW"): + + if self.is_connected(): + self.disconnect() + + # clear this flag self.connected = False - # connect - rc, data = self.comm_link.connect() - if not rc: - return RC_ERR(data) + # connect sync channel + rc = self.comm_link.connect() + if rc.bad(): + return rc + + # connect async channel + rc = self.async_client.connect() + if rc.bad(): + return rc # version - rc, data = self.transmit("get_version") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_version") + if rc.bad(): + return rc - self.server_version = data - self.global_stats.server_version = data + self.server_version = rc.data() + self.global_stats.server_version = rc.data() # cache system info - # self.get_system_info(refresh=True) - rc, data = self.transmit("get_system_info") - if not rc: - return RC_ERR(data) - self.system_info = data + rc = self.transmit("get_system_info") + if rc.bad(): + return rc + + self.system_info = rc.data() # cache supported commands - rc, data = self.transmit("get_supported_cmds") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_supported_cmds") + if rc.bad(): + return rc - self.supported_cmds = data + self.supported_cmds = rc.data() # create ports for port_id in xrange(self.get_port_count()): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit) - # acquire all ports - rc = self.acquire() - if rc.bad(): - return rc + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + - rc = self.sync_with_server() + # sync the ports + rc = self.sync_ports() if rc.bad(): return rc - self.connected = True + # acquire all ports + if mode == "RW": + rc = self.acquire(force = False) + + # fallback to read only if failed + if rc.bad(): + rc.annotate(show_status = False) + print format_text("Switching to read only mode - only few commands will be available", 'bold') + + self.release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.acquire(force = True) + if rc.bad(): + return rc + self.read_only = False + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True + + + + self.connected = True return RC_OK() + + def is_read_only (self): + return self.read_only + def is_connected (self): return self.connected and self.comm_link.is_connected def disconnect(self): - self.connected = False + # release any previous acquired ports + if self.is_connected(): + self.release(self.get_acquired_ports()) + self.comm_link.disconnect() + self.async_client.disconnect() + + self.connected = False + return RC_OK() + def on_async_dead (self): + if self.connected: + msg = 'lost connection to server' + self.add_event_log(msg, 'local', True) + self.connected = False + + def on_async_alive (self): + pass ########### cached queries (no server traffic) ########### @@ -666,8 +426,8 @@ class CTRexStatelessClient(object): else: return port_ids - def get_stats_async(self): - return self._async_client.get_stats() + def get_stats_async (self): + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port @@ -675,6 +435,9 @@ class CTRexStatelessClient(object): def get_connection_ip (self): return self.comm_link.server + def get_all_ports (self): + return [port_id for port_id, port_obj in self.ports.iteritems()] + def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() @@ -685,36 +448,60 @@ class CTRexStatelessClient(object): for port_id, port_obj in self.ports.iteritems() if port_obj.is_active()] + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + def set_verbose(self, mode): - self.comm_link.set_verbose(mode) + + # on high - enable link verbose + if mode == self.VERBOSE_HIGH: + self.comm_link.set_verbose(True) + else: + self.comm_link.set_verbose(False) + self.verbose = mode + + def check_verbose (self, level): + return (self.verbose >= level) + + def get_verbose (self): + return self.verbose + + def prn_func (self, msg, level = VERBOSE_REGULAR): + if self.check_verbose(level): + print msg + ############# server actions ################ # ping server def ping(self): - rc, info = self.transmit("ping") - return RC(rc, info) + return self.transmit("ping") - def sync_with_server(self, sync_streams=False): - rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) - if not rc: - return RC_ERR(data) - - for port_info in data: - rc = self.ports[port_info['port_id']].sync(port_info) - if rc.bad(): - return rc - - return RC_OK() def get_global_stats(self): - rc, info = self.transmit("get_global_stats") - return RC(rc, info) + return self.transmit("get_global_stats") ########## port commands ############## + def sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) + + return rc + # acquire ports, if port_list is none - get all def acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) @@ -733,7 +520,7 @@ class CTRexStatelessClient(object): rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release(force)) + rc.add(self.ports[port_id].release()) return rc @@ -750,15 +537,16 @@ class CTRexStatelessClient(object): return rc + def add_stream_pack(self, stream_pack_list, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() - for stream_pack in stream_pack_list: - rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) - + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack_list)) + return rc @@ -855,6 +643,17 @@ class CTRexStatelessClient(object): return rc + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + + def get_port_stats(self, port_id=None): pass @@ -866,16 +665,19 @@ class CTRexStatelessClient(object): return self.comm_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) ######################### Console (high level) API ######################### + @timing def cmd_ping(self): rc = self.ping() rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) return rc - def cmd_connect(self): - rc = self.connect() + def cmd_connect(self, mode = "RW"): + rc = self.connect(mode) rc.annotate() return rc @@ -886,13 +688,7 @@ class CTRexStatelessClient(object): # reset def cmd_reset(self): - - - # sync with the server - rc = self.sync_with_server() - rc.annotate("Syncing with the server:") - if rc.bad(): - return rc + #self.release(self.get_acquired_ports()) rc = self.acquire(force = True) rc.annotate("Force acquiring all ports:") @@ -924,7 +720,7 @@ class CTRexStatelessClient(object): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) if not active_ports: - msg = "No active traffic on porvided ports" + msg = "No active traffic on provided ports" print format_text(msg, 'bold') return RC_ERR(msg) @@ -948,15 +744,26 @@ class CTRexStatelessClient(object): rc = self.update_traffic(mult, active_ports) rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - return RC_OK() + return rc + # clear stats def cmd_clear(self, port_id_list): + for port_id in port_id_list: self.ports[port_id].clear_stats() + self.global_stats.clear_stats() + + return RC_OK() + + + def cmd_invalidate (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + return RC_OK() # pause cmd @@ -972,10 +779,8 @@ class CTRexStatelessClient(object): rc = self.pause_traffic(active_ports) rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + return rc - return RC_OK() # resume cmd @@ -991,14 +796,11 @@ class CTRexStatelessClient(object): rc = self.resume_traffic(active_ports) rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - return RC_OK() + return rc # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration): + def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) @@ -1020,19 +822,37 @@ class CTRexStatelessClient(object): rc = self.add_stream_pack(stream_list.compiled, port_id_list) - rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list)) + rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) if rc.bad(): return rc + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.start_traffic(mult, duration, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - # finally, start the traffic - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): return rc + else: + rc = self.validate(port_id_list) + rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) - return RC_OK() + if rc.bad(): + return rc + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc + + + # validate port(s) profile + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + return rc + + + # stats def cmd_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) @@ -1043,6 +863,26 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ + + def cmd_connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.cmd_connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + if opts.force: + rc = self.cmd_connect(mode = "RWF") + else: + rc = self.cmd_connect(mode = "RW") + + @timing def cmd_start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser @@ -1054,13 +894,19 @@ class CTRexStatelessClient(object): parsing_opts.FORCE, parsing_opts.STREAM_FROM_PATH_OR_FILE, parsing_opts.DURATION, - parsing_opts.MULTIPLIER) + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) opts = parser.parse_args(line.split()) + if opts is None: return RC_ERR("bad command line parameters") + + if opts.dry: + print format_text("\n*** DRY RUN ***", 'bold') + if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) @@ -1070,7 +916,15 @@ class CTRexStatelessClient(object): else: # load streams from file - stream_list = self.streams_db.load_yaml_file(opts.file[0]) + stream_list = None; + try: + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + except Exception as e: + s = str(e) + rc=RC_ERR(s) + rc.annotate() + return rc + rc = RC(stream_list != None) rc.annotate("Load stream pack (from file):") if stream_list == None: @@ -1078,12 +932,13 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear - if opts.total and (mult['type'] != 'percentage'): + if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + @timing def cmd_resume_line (self, line): '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1097,6 +952,8 @@ class CTRexStatelessClient(object): return self.cmd_resume(opts.ports) + + @timing def cmd_stop_line (self, line): '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1110,6 +967,8 @@ class CTRexStatelessClient(object): return self.cmd_stop(opts.ports) + + @timing def cmd_pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1123,6 +982,8 @@ class CTRexStatelessClient(object): return self.cmd_pause(opts.ports) + + @timing def cmd_update_line (self, line): '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, @@ -1139,14 +1000,15 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) return self.cmd_update(opts.ports, opts.mult) - + @timing def cmd_reset_line (self, line): return self.cmd_reset() + def cmd_clear_line (self, line): '''Clear cached local statistics\n''' # define a parser @@ -1161,6 +1023,7 @@ class CTRexStatelessClient(object): return RC_ERR("bad command line parameters") return self.cmd_clear(opts.ports) + def cmd_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser @@ -1180,30 +1043,34 @@ class CTRexStatelessClient(object): if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - # get stats objects, as dictionary + stats = self.cmd_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return - - # if opts.db: - # stream_list = self.streams_db.get_stream_pack(opts.db) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from DB):") - # if rc.bad(): - # return RC_ERR("Failed to load stream pack") - # - # else: - # # load streams from file - # stream_list = self.streams_db.load_yaml_file(opts.file[0]) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from file):") - # if stream_list == None: - # return RC_ERR("Failed to load stream pack") - # - # - # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + + + return RC_OK() + + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + rc = self.cmd_validate(opts.ports) + return rc + def cmd_exit_line (self, line): print format_text("Exiting\n", 'bold') @@ -1280,6 +1147,7 @@ class CTRexStatelessClient(object): return True + ################################# # ------ private methods ------ # @staticmethod @@ -1294,17 +1162,18 @@ class CTRexStatelessClient(object): def _filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + ################################# # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port) + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) @property def is_connected(self): @@ -1313,6 +1182,12 @@ class CTRexStatelessClient(object): else: return True + def get_server (self): + return self.server + + def get_port (self): + return self.port + def set_verbose(self, mode): self.verbose = mode return self.rpc_link.set_verbose(mode) @@ -1354,4 +1229,3 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass - diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index dd208da4..bdae7bd9 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -7,8 +7,7 @@ import general_utils import re from time import sleep from collections import namedtuple - -CmdResponse = namedtuple('CmdResponse', ['success', 'data']) +from common.trex_types import * class bcolors: BLUE = '\033[94m' @@ -33,21 +32,17 @@ class BatchMessage(object): def invoke(self, block = False): if not self.rpc_client.connected: - return False, "Not connected to server" + return RC_ERR("Not connected to server") msg = json.dumps(self.batch_list) - rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False) - if len(self.batch_list) == 1: - return CmdResponse(True, [CmdResponse(rc, resp_list)]) - else: - return CmdResponse(rc, resp_list) + return self.rpc_client.send_raw_msg(msg) # JSON RPC v2.0 client class JsonRpcClient(object): - def __init__ (self, default_server, default_port): + def __init__ (self, default_server, default_port, prn_func = None): self.verbose = False self.connected = False @@ -56,6 +51,8 @@ class JsonRpcClient(object): self.server = default_server self.id_gen = general_utils.random_id_gen() + self.prn_func = prn_func + def get_connection_details (self): rc = {} rc['server'] = self.server @@ -112,7 +109,7 @@ class JsonRpcClient(object): def invoke_rpc_method (self, method_name, params = {}): if not self.connected: - return False, "Not connected to server" + return RC_ERR("Not connected to server") id, msg = self.create_jsonrpc_v2(method_name, params) @@ -130,11 +127,10 @@ class JsonRpcClient(object): self.socket.send(msg) break except zmq.Again: - sleep(0.1) tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to send message to server") + return RC_ERR("*** [RPC] - Failed to send message to server") tries = 0 @@ -143,11 +139,10 @@ class JsonRpcClient(object): response = self.socket.recv() break except zmq.Again: - sleep(0.1) tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to get server response") + return RC_ERR("*** [RPC] - Failed to get server response") self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -158,36 +153,35 @@ class JsonRpcClient(object): response_json = json.loads(response) if isinstance(response_json, list): - rc_list = [] + rc_batch = RC() for single_response in response_json: - rc, msg = self.process_single_response(single_response) - rc_list.append( CmdResponse(rc, msg) ) + rc = self.process_single_response(single_response) + rc_batch.add(rc) - return CmdResponse(True, rc_list) + return rc_batch else: - rc, msg = self.process_single_response(response_json) - return CmdResponse(rc, msg) + return self.process_single_response(response_json) def process_single_response (self, response_json): if (response_json.get("jsonrpc") != "2.0"): - return False, "Malformed Response ({0})".format(str(response_json)) + return RC_ERR("Malformed Response ({0})".format(str(response_json))) # error reported by server if ("error" in response_json): if "specific_err" in response_json["error"]: - return False, response_json["error"]["specific_err"] + return RC_ERR(response_json["error"]["specific_err"]) else: - return False, response_json["error"]["message"] + return RC_ERR(response_json["error"]["message"]) # if no error there should be a result if ("result" not in response_json): - return False, "Malformed Response ({0})".format(str(response_json)) + return RC_ERR("Malformed Response ({0})".format(str(response_json))) - return True, response_json["result"] + return RC_OK(response_json["result"]) @@ -199,11 +193,12 @@ class JsonRpcClient(object): self.socket.close(linger = 0) self.context.destroy(linger = 0) self.connected = False - return True, "" + return RC_OK() else: - return False, "Not connected to server" + return RC_ERR("Not connected to server") - def connect(self, server=None, port=None): + + def connect(self, server = None, port = None, prn_func = None): if self.connected: self.disconnect() @@ -215,20 +210,24 @@ class JsonRpcClient(object): # Socket to talk to server self.transport = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To RPC Server On {0}".format(self.transport) + msg = "\nConnecting To RPC Server On {0}".format(self.transport) + if self.prn_func: + self.prn_func(msg) + else: + print msg self.socket = self.context.socket(zmq.REQ) try: self.socket.connect(self.transport) except zmq.error.ZMQError as e: - return False, "ZMQ Error: Bad server or port name: " + str(e) + return RC_ERR("ZMQ Error: Bad server or port name: " + str(e)) - self.socket.setsockopt(zmq.SNDTIMEO, 5) - self.socket.setsockopt(zmq.RCVTIMEO, 5) + self.socket.setsockopt(zmq.SNDTIMEO, 1000) + self.socket.setsockopt(zmq.RCVTIMEO, 1000) self.connected = True - return True, "" + return RC_OK() def reconnect(self): @@ -236,7 +235,7 @@ class JsonRpcClient(object): return self.connect() if not self.connected: - return False, "Not connected to server" + return RC_ERR("Not connected to server") # reconnect return self.connect(self.server, self.port) diff --git a/scripts/automation/trex_control_plane/client_utils/packet_builder.py b/scripts/automation/trex_control_plane/client_utils/packet_builder.py index 3aeb6a34..d8070c74 100755 --- a/scripts/automation/trex_control_plane/client_utils/packet_builder.py +++ b/scripts/automation/trex_control_plane/client_utils/packet_builder.py @@ -301,6 +301,30 @@ class CTRexPktBuilder(object): break return + def load_packet_from_pcap(self, pcap_path): + """ + This method loads a pcap file into a parsed packet builder object. + + :parameters: + pcap_path: str + a path to a pcap file, containing a SINGLE packet. + + :raises: + + :exc:`IOError`, in case provided path doesn't exists. + + """ + with open(pcap_path, 'r') as f: + pcap = dpkt.pcap.Reader(f) + first_packet = True + for _, buf in pcap: + # this is an iterator, can't evaluate the number of files in advance + if first_packet: + self.load_packet(dpkt.ethernet.Ethernet(buf)) + else: + raise ValueError("Provided pcap file contains more than single packet.") + # arrive here ONLY if pcap contained SINGLE packet + return + def get_packet(self, get_ptr=False): """ This method provides access to the built packet, as an instance or as a pointer to packet itself. diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index 6c348467..6f9b4c6d 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -10,22 +10,24 @@ ArgumentGroup = namedtuple('ArgumentGroup', ['type', 'args', 'options']) # list of available parsing options MULTIPLIER = 1 -PORT_LIST = 2 -ALL_PORTS = 3 -PORT_LIST_WITH_ALL = 4 -FILE_PATH = 5 -FILE_FROM_DB = 6 -SERVER_IP = 7 -STREAM_FROM_PATH_OR_FILE = 8 -DURATION = 9 -FORCE = 10 - -TOTAL = 11 - -GLOBAL_STATS = 12 -PORT_STATS = 13 -PORT_STATUS = 14 -STATS_MASK = 15 +MULTIPLIER_STRICT = 2 +PORT_LIST = 3 +ALL_PORTS = 4 +PORT_LIST_WITH_ALL = 5 +FILE_PATH = 6 +FILE_FROM_DB = 7 +SERVER_IP = 8 +STREAM_FROM_PATH_OR_FILE = 9 +DURATION = 10 +FORCE = 11 +DRY_RUN = 12 +XTERM = 13 +TOTAL = 14 + +GLOBAL_STATS = 50 +PORT_STATS = 51 +PORT_STATUS = 52 +STATS_MASK = 53 # list of ArgumentGroup types MUTEX = 1 @@ -60,10 +62,15 @@ match_multiplier_help = """Multiplier should be passed in the following format: will provide a percentage of the line rate. examples : '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """ -def match_multiplier(val): - '''match some val against multiplier shortcut inputs ''' +def match_multiplier_common(val, strict_abs = True): - match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) + # on strict absolute we do not allow +/- + if strict_abs: + match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) + op = None + else: + match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)([\+\-])?$", val) + op = match.group(4) result = {} @@ -71,44 +78,53 @@ def match_multiplier(val): value = float(match.group(1)) unit = match.group(3) + + # raw type (factor) if not unit: result['type'] = 'raw' - result['max'] = value + result['value'] = value elif unit == 'bps': - result['type'] = 'max_bps' - result['max'] = value + result['type'] = 'bps' + result['value'] = value elif unit == 'kbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 elif unit == 'mbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 * 1000 elif unit == 'gbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 * 1000 * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 * 1000 * 1000 elif unit == 'pps': - result['type'] = 'max_pps' - result['max'] = value + result['type'] = 'pps' + result['value'] = value elif unit == "kpps": - result['type'] = 'max_pps' - result['max'] = value * 1000 + result['type'] = 'pps' + result['value'] = value * 1000 elif unit == "mpps": - result['type'] = 'max_pps' - result['max'] = value * 1000 * 1000 + result['type'] = 'pps' + result['value'] = value * 1000 * 1000 elif unit == "%": - # will be translated by the port object result['type'] = 'percentage' - result['max'] = value + result['value'] = value + + + if op == "+": + result['op'] = "add" + elif op == "-": + result['op'] = "sub" + else: + result['op'] = "abs" return result @@ -116,6 +132,13 @@ def match_multiplier(val): raise argparse.ArgumentTypeError(match_multiplier_help) +def match_multiplier(val): + '''match some val against multiplier shortcut inputs ''' + return match_multiplier_common(val, strict_abs = False) + +def match_multiplier_strict(val): + '''match some val against multiplier shortcut inputs ''' + return match_multiplier_common(val, strict_abs = True) def is_valid_file(filename): if not os.path.isfile(filename): @@ -127,9 +150,14 @@ def is_valid_file(filename): OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'help': match_multiplier_help, 'dest': "mult", - 'default': {'type':'raw', 'max':1}, + 'default': {'type':'raw', 'value':1, 'op': 'abs'}, 'type': match_multiplier}), + MULTIPLIER_STRICT: ArgumentPack(['-m', '--multiplier'], + {'help': match_multiplier_help, + 'dest': "mult", + 'default': {'type':'raw', 'value':1, 'op': 'abs'}, + 'type': match_multiplier_strict}), TOTAL: ArgumentPack(['-t', '--total'], {'help': "traffic will be divided between all ports specified", @@ -177,6 +205,19 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'metavar': 'SERVER', 'help': "server IP"}), + DRY_RUN: ArgumentPack(['-n', '--dry'], + {'action': 'store_true', + 'dest': 'dry', + 'default': False, + 'help': "Dry run - no traffic will be injected"}), + + + XTERM: ArgumentPack(['-x', '--xterm'], + {'action': 'store_true', + 'dest': 'xterm', + 'default': False, + 'help': "Starts TUI in xterm window"}), + GLOBAL_STATS: ArgumentPack(['-g'], {'action': 'store_true', 'help': "Fetch only global statistics"}), @@ -189,6 +230,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'action': 'store_true', 'help': "Fetch only port status data"}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], diff --git a/scripts/automation/trex_control_plane/common/text_opts.py b/scripts/automation/trex_control_plane/common/text_opts.py index 06c2c056..5a86149c 100755 --- a/scripts/automation/trex_control_plane/common/text_opts.py +++ b/scripts/automation/trex_control_plane/common/text_opts.py @@ -19,6 +19,50 @@ TEXT_CODES = {'bold': {'start': '\x1b[1m', 'end': '\x1b[24m'}} +def format_num (size, suffix = ""): + for unit in ['','K','M','G','T','P']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 + + return "NaN" + +def format_time (t_sec): + if t_sec < 0: + return "infinite" + + if t_sec < 1: + # low numbers + for unit in ['ms', 'usec', 'ns']: + t_sec *= 1000.0 + if t_sec >= 1.0: + return '{:,.2f} [{:}]'.format(t_sec, unit) + + return "NaN" + + else: + # seconds + if t_sec < 60.0: + return '{:,.2f} [{:}]'.format(t_sec, 'sec') + + # minutes + t_sec /= 60.0 + if t_sec < 60.0: + return '{:,.2f} [{:}]'.format(t_sec, 'minutes') + + # hours + t_sec /= 60.0 + if t_sec < 24.0: + return '{:,.2f} [{:}]'.format(t_sec, 'hours') + + # days + t_sec /= 24.0 + return '{:,.2f} [{:}]'.format(t_sec, 'days') + + +def format_percentage (size): + return "%0.2f %%" % (size) + def bold(text): return text_attribute(text, 'bold') diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index 1f9d59e3..9562f1f5 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -5,12 +5,15 @@ from common.text_opts import format_text from client.trex_async_client import CTRexAsyncStats import copy import datetime +import time import re GLOBAL_STATS = 'g' PORT_STATS = 'p' PORT_STATUS = 'ps' ALL_STATS_OPTS = {GLOBAL_STATS, PORT_STATS, PORT_STATUS} +COMPACT = {GLOBAL_STATS, PORT_STATS} + ExportableStats = namedtuple('ExportableStats', ['raw_data', 'text_table']) @@ -54,15 +57,24 @@ class CTRexStatsGenerator(object): return_stats_data = {} per_field_stats = OrderedDict([("owner", []), - ("active", []), + ("state", []), + ("--", []), + ("opackets", []), + ("obytes", []), + ("ipackets", []), + ("ibytes", []), + ("ierrors", []), + ("oerrors", []), ("tx-bytes", []), ("rx-bytes", []), ("tx-pkts", []), ("rx-pkts", []), - ("tx-errors", []), - ("rx-errors", []), - ("tx-BW", []), - ("rx-BW", []) + ("---", []), + ("Tx bps", []), + ("Rx bps", []), + ("----", []), + ("Tx pps", []), + ("Rx pps", []) ] ) @@ -76,6 +88,9 @@ class CTRexStatsGenerator(object): stats_table = text_tables.TRexTextTable() stats_table.set_cols_align(["l"] + ["r"]*len(relevant_ports)) + stats_table.set_cols_width([10] + [20] * len(relevant_ports)) + stats_table.set_cols_dtype(['t'] + ['t'] * len(relevant_ports)) + stats_table.add_rows([[k] + v for k, v in per_field_stats.iteritems()], header=False) @@ -106,6 +121,8 @@ class CTRexStatsGenerator(object): stats_table = text_tables.TRexTextTable() stats_table.set_cols_align(["l"] + ["c"]*len(relevant_ports)) + stats_table.set_cols_width([10] + [20] * len(relevant_ports)) + stats_table.add_rows([[k] + v for k, v in per_field_status.iteritems()], header=False) @@ -118,7 +135,8 @@ class CTRexStatsGenerator(object): # fetch owned ports ports = [port_obj for _, port_obj in self._ports_dict.iteritems() - if port_obj.is_acquired() and port_obj.port_id in port_id_list] + if port_obj.port_id in port_id_list] + # display only the first FOUR options, by design if len(ports) > 4: print format_text("[WARNING]: ", 'magenta', 'bold'), format_text("displaying up to 4 ports", 'magenta') @@ -139,7 +157,7 @@ class CTRexStats(object): def __init__(self): self.reference_stats = None self.latest_stats = {} - self.last_update_ts = datetime.datetime.now() + self.last_update_ts = time.time() def __getitem__(self, item): @@ -176,6 +194,9 @@ class CTRexStats(object): @staticmethod def format_num(size, suffix = ""): + if type(size) == str: + return "N/A" + for unit in ['','K','M','G','T','P']: if abs(size) < 1000.0: return "%3.2f %s%s" % (size, unit, suffix) @@ -188,16 +209,22 @@ class CTRexStats(object): def update(self, snapshot): # update - self.last_update_ts = datetime.datetime.now() - self.latest_stats = snapshot - if self.reference_stats == None: + diff_time = time.time() - self.last_update_ts + + # 3 seconds is too much - this is the new reference + if (self.reference_stats == None) or (diff_time > 3): self.reference_stats = self.latest_stats + self.last_update_ts = time.time() + def clear_stats(self): self.reference_stats = self.latest_stats + def invalidate (self): + self.latest_stats = {} + def get(self, field, format=False, suffix=""): if not field in self.latest_stats: return "N/A" @@ -209,6 +236,7 @@ class CTRexStats(object): def get_rel(self, field, format=False, suffix=""): if not field in self.latest_stats: return "N/A" + if not format: return (self.latest_stats[field] - self.reference_stats[field]) else: @@ -216,7 +244,6 @@ class CTRexStats(object): class CGlobalStats(CTRexStats): - pass def __init__(self, connection_info, server_version, ports_dict_ref): super(CGlobalStats, self).__init__() @@ -242,7 +269,6 @@ class CGlobalStats(CTRexStats): ) class CPortStats(CTRexStats): - pass def __init__(self, port_obj): super(CPortStats, self).__init__() @@ -250,15 +276,26 @@ class CPortStats(CTRexStats): def generate_stats(self): return {"owner": self._port_obj.user, - "active": "YES" if self._port_obj.is_active() else "NO", + "state": self._port_obj.get_port_state_name(), + "--": "", + "opackets" : self.get_rel("opackets"), + "obytes" : self.get_rel("obytes"), + "ipackets" : self.get_rel("ipackets"), + "ibytes" : self.get_rel("ibytes"), + "ierrors" : self.get_rel("ierrors"), + "oerrors" : self.get_rel("oerrors"), + "tx-bytes": self.get_rel("obytes", format = True, suffix = "B"), "rx-bytes": self.get_rel("ibytes", format = True, suffix = "B"), "tx-pkts": self.get_rel("opackets", format = True, suffix = "pkts"), "rx-pkts": self.get_rel("ipackets", format = True, suffix = "pkts"), - "tx-errors": self.get_rel("oerrors", format = True), - "rx-errors": self.get_rel("ierrors", format = True), - "tx-BW": self.get("m_total_tx_bps", format = True, suffix = "bps"), - "rx-BW": self.get("m_total_rx_bps", format = True, suffix = "bps") + + "---": "", + "Tx bps": self.get("m_total_tx_bps", format = True, suffix = "bps"), + "Rx bps": self.get("m_total_rx_bps", format = True, suffix = "bps"), + "----": "", + "Tx pps": self.get("m_total_tx_pps", format = True, suffix = "pps"), + "Rx pps": self.get("m_total_rx_pps", format = True, suffix = "pps"), } diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index bb4c72ca..86eee1f4 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -10,18 +10,31 @@ import copy import os StreamPack = namedtuple('StreamPack', ['stream_id', 'stream']) +LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) class CStreamList(object): def __init__(self): - self.streams_list = {} + self.streams_list = OrderedDict() self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)), "rpc_defaults.yaml")) + def generate_numbered_name (self, name): + prefix = name.rstrip('01234567890') + suffix = name[len(prefix):] + if suffix == "": + n = "_1" + else: + n = int(suffix) + 1 + return prefix + str(n) + def append_stream(self, name, stream_obj): assert isinstance(stream_obj, CStream) - if name in self.streams_list: - raise NameError("A stream with this name already exists on this list.") + + # if name exists simply add numbered suffix to it + while name in self.streams_list: + name = self.generate_numbered_name(name) + self.streams_list[name]=stream_obj return name @@ -70,6 +83,7 @@ class CStreamList(object): stream_ids = {} for idx, stream_name in enumerate(self.streams_list): stream_ids[stream_name] = idx + # next, iterate over the streams and transform them from working with names to ids. # with that build a new dict with old stream_name as the key, and StreamPack as the stored value compiled_streams = {} @@ -241,5 +255,61 @@ class CStream(object): raise RuntimeError("CStream object isn't loaded with data. Use 'load_data' method.") -if __name__ == "__main__": - pass + +# describes a stream DB +class CStreamsDB(object): + + def __init__(self): + self.stream_packs = {} + + def load_yaml_file(self, filename): + + stream_pack_name = filename + if stream_pack_name in self.get_loaded_streams_names(): + self.remove_stream_packs(stream_pack_name) + + stream_list = CStreamList() + loaded_obj = stream_list.load_yaml(filename) + + try: + compiled_streams = stream_list.compile_streams() + rc = self.load_streams(stream_pack_name, + LoadedStreamList(loaded_obj, + [StreamPack(v.stream_id, v.stream.dump()) + for k, v in compiled_streams.items()])) + + except Exception as e: + return None + + return self.get_stream_pack(stream_pack_name) + + def load_streams(self, name, LoadedStreamList_obj): + if name in self.stream_packs: + return False + else: + self.stream_packs[name] = LoadedStreamList_obj + return True + + def remove_stream_packs(self, *names): + removed_streams = [] + for name in names: + removed = self.stream_packs.pop(name) + if removed: + removed_streams.append(name) + return removed_streams + + def clear(self): + self.stream_packs.clear() + + def get_loaded_streams_names(self): + return self.stream_packs.keys() + + def stream_pack_exists (self, name): + return name in self.get_loaded_streams_names() + + def get_stream_pack(self, name): + if not self.stream_pack_exists(name): + return None + else: + return self.stream_packs.get(name) + diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py new file mode 100644 index 00000000..7c3f04c5 --- /dev/null +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -0,0 +1,68 @@ + +from collections import namedtuple +from common.text_opts import * + +RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + +class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): + __slots__ = () + def __str__(self): + return "{id:^3} - {msg} ({stat})".format(id=self.id, + msg=self.msg, + stat="success" if self.success else "fail") + +# simple class to represent complex return value +class RC(): + + def __init__ (self, rc = None, data = None): + self.rc_list = [] + + if (rc != None) and (data != None): + tuple_rc = namedtuple('RC', ['rc', 'data']) + self.rc_list.append(tuple_rc(rc, data)) + + def add (self, rc): + self.rc_list += rc.rc_list + + def good (self): + return all([x.rc for x in self.rc_list]) + + def bad (self): + return not self.good() + + def data (self): + d = [x.data if x.rc else "" for x in self.rc_list] + return (d if len(d) > 1 else d[0]) + + def err (self): + e = [x.data if not x.rc else "" for x in self.rc_list] + return (e if len(e) > 1 else e[0]) + + def annotate (self, desc = None, show_status = True): + if desc: + print format_text('\n{:<60}'.format(desc), 'bold'), + else: + print "" + + if self.bad(): + # print all the errors + print "" + for x in self.rc_list: + if not x.rc: + print format_text("\n{0}".format(x.data), 'bold') + + print "" + if show_status: + print format_text("[FAILED]\n", 'red', 'bold') + + + else: + if show_status: + print format_text("[SUCCESS]\n", 'green', 'bold') + + +def RC_OK(data = ""): + return RC(True, data) +def RC_ERR (err): + return RC(False, err) + diff --git a/scripts/automation/trex_control_plane/console/old_console.py b/scripts/automation/trex_control_plane/console/old_console.py deleted file mode 100644 index 9d61a3a6..00000000 --- a/scripts/automation/trex_control_plane/console/old_console.py +++ /dev/null @@ -1,958 +0,0 @@ - -# main console object -class TRexConsole1(cmd.Cmd): - """Trex Console""" - - def __init__(self, stateless_client, verbose): - cmd.Cmd.__init__(self) - - self.stateless_client = stateless_client - - self.do_connect("") - - self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) - self.intro += "\nType 'help' or '?' for supported actions\n" - - self.verbose = False - self._silent = True - - self.postcmd(False, "") - - self.user_streams = {} - self.streams_db = CStreamsDB() - - - # a cool hack - i stole this function and added space - def completenames(self, text, *ignored): - dotext = 'do_'+text - return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] - - - # set verbose on / off - def do_verbose(self, line): - '''Shows or set verbose mode\n''' - if line == "": - print "\nverbose is " + ("on\n" if self.verbose else "off\n") - - elif line == "on": - self.verbose = True - self.stateless_client.set_verbose(True) - print green("\nverbose set to on\n") - - elif line == "off": - self.verbose = False - self.stateless_client.set_verbose(False) - print green("\nverbose set to off\n") - - else: - print magenta("\nplease specify 'on' or 'off'\n") - - # query the server for registered commands - def do_query_server(self, line): - '''query the RPC server for supported remote commands\n''' - - res_ok, msg = self.stateless_client.get_supported_cmds() - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print "\nRPC server supports the following commands:\n" - for func in msg: - if func: - print func - print '' - print format_text("[SUCCESS]\n", 'green', 'bold') - return - - def do_ping(self, line): - '''Pings the RPC server\n''' - - print "\n-> Pinging RPC server" - - res_ok, msg = self.stateless_client.ping() - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print "\n*** " + msg + "\n" - return - - def do_force_acquire(self, line): - '''Acquires ports by force\n''' - - self.do_acquire(line, True) - - def complete_force_acquire(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, acquired=False) - - def extract_port_ids_from_line(self, line): - return {int(x) for x in line.split()} - - def extract_port_ids_from_list(self, port_list): - return {int(x) for x in port_list} - - def parse_ports_from_line (self, line): - port_list = set() - if line: - for port_id in line.split(' '): - if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.stateless_client.get_port_count()): - print "Please provide a list of ports separated by spaces between 0 and {0}".format(self.stateless_client.get_port_count() - 1) - return None - - port_list.add(int(port_id)) - - port_list = list(port_list) - - else: - port_list = [i for i in xrange(0, self.stateless_client.get_port_count())] - - return port_list - - - def do_acquire(self, line, force=False): - '''Acquire ports\n''' - - # make sure that the user wants to acquire all - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports") - return - - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to acquire all ports ? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_port_ids() - else: - port_list = self.extract_port_ids_from_line(line) - - # rc, resp_list = self.stateless_client.take_ownership(port_list, force) - try: - res_ok, log = self.stateless_client.acquire(port_list, force) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - - def port_auto_complete(self, text, line, begidx, endidx, acquired=True, active=False): - if acquired: - if not active: - ret_list = [x - for x in map(str, self.stateless_client.get_acquired_ports()) - if x.startswith(text)] - else: - ret_list = [x - for x in map(str, self.stateless_client.get_active_ports()) - if x.startswith(text)] - else: - ret_list = [x - for x in map(str, self.stateless_client.get_port_ids()) - if x.startswith(text)] - ret_list.append("all") - return ret_list - - - def complete_acquire(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, acquired=False) - - def do_release (self, line): - '''Release ports\n''' - - # if line: - # port_list = self.parse_ports_from_line(line) - # else: - # port_list = self.stateless_client.get_owned_ports() - args = line.split() - if len(args) < 1: - print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports" - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - try: - res_ok, log = self.stateless_client.release(port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - return - - def complete_release(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx) - - def do_connect (self, line): - '''Connects to the server\n''' - - if line == "": - res_ok, msg = self.stateless_client.connect() - else: - sp = line.split() - if (len(sp) != 2): - print "\n[usage] connect [server] [port] or without parameters\n" - return - - res_ok, msg = self.stateless_client.connect(sp[0], sp[1]) - - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print "\n*** " + msg + "\n" - print format_text("[FAILED]\n", 'red', 'bold') - return - - self.supported_rpc = self.stateless_client.get_supported_cmds().data - - # def do_rpc (self, line): - # '''Launches a RPC on the server\n''' - # - # if line == "": - # print "\nUsage: [method name] [param dict as string]\n" - # print "Example: rpc test_add {'x': 12, 'y': 17}\n" - # return - # - # sp = line.split(' ', 1) - # method = sp[0] - # - # params = None - # bad_parse = False - # if len(sp) > 1: - # - # try: - # params = ast.literal_eval(sp[1]) - # if not isinstance(params, dict): - # bad_parse = True - # - # except ValueError as e1: - # bad_parse = True - # except SyntaxError as e2: - # bad_parse = True - # - # if bad_parse: - # print "\nValue should be a valid dict: '{0}'".format(sp[1]) - # print "\nUsage: [method name] [param dict as string]\n" - # print "Example: rpc test_add {'x': 12, 'y': 17}\n" - # return - # - # res_ok, msg = self.stateless_client.transmit(method, params) - # if res_ok: - # print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n" - # else: - # print "\n*** " + msg + "\n" - # #print "Please try 'reconnect' to reconnect to server" - # - # - # def complete_rpc (self, text, line, begidx, endidx): - # return [x - # for x in self.supported_rpc - # if x.startswith(text)] - - def do_status (self, line): - '''Shows a graphical console\n''' - - if not self.stateless_client.is_connected(): - print "Not connected to server\n" - return - - self.do_verbose('off') - trex_status.show_trex_status(self.stateless_client) - - def do_quit(self, line): - '''Exit the client\n''' - return True - - def do_disconnect (self, line): - '''Disconnect from the server\n''' - if not self.stateless_client.is_connected(): - print "Not connected to server\n" - return - - res_ok, msg = self.stateless_client.disconnect() - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print msg + "\n" - - def do_whoami (self, line): - '''Prints console user name\n''' - print "\n" + self.stateless_client.user + "\n" - - def postcmd(self, stop, line): - if self.stateless_client.is_connected(): - self.prompt = "TRex > " - else: - self.supported_rpc = None - self.prompt = "TRex (offline) > " - - return stop - - def default(self, line): - print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line) - - # def do_help (self, line): - # '''Shows This Help Screen\n''' - # if line: - # try: - # func = getattr(self, 'help_' + line) - # except AttributeError: - # try: - # doc = getattr(self, 'do_' + line).__doc__ - # if doc: - # self.stdout.write("%s\n"%str(doc)) - # return - # except AttributeError: - # pass - # self.stdout.write("%s\n"%str(self.nohelp % (line,))) - # return - # func() - # return - # - # print "\nSupported Console Commands:" - # print "----------------------------\n" - # - # cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] - # for cmd in cmds: - # if cmd == "EOF": - # continue - # - # try: - # doc = getattr(self, 'do_' + cmd).__doc__ - # if doc: - # help = str(doc) - # else: - # help = "*** Undocumented Function ***\n" - # except AttributeError: - # help = "*** Undocumented Function ***\n" - # - # print "{:<30} {:<30}".format(cmd + " - ", help) - - def do_stream_db_add(self, line): - '''Loads a YAML stream list serialization into user console \n''' - args = line.split() - if len(args) >= 2: - name = args[0] - yaml_path = args[1] - try: - multiplier = args[2] - except IndexError: - multiplier = 1 - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(yaml_path, multiplier) - # print self.stateless_client.pretty_json(json.dumps(loaded_obj)) - try: - compiled_streams = stream_list.compile_streams() - res_ok = self.streams_db.load_streams(name, LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - if res_ok: - print green("Stream pack '{0}' loaded and added successfully\n".format(name)) - else: - print magenta("Picked name already exist. Please pick another name.\n") - except Exception as e: - print "adding new stream failed due to the following error:\n", str(e) - print format_text("[FAILED]\n", 'red', 'bold') - - return - else: - print magenta("please provide load name and YAML path, separated by space.\n" - "Optionally, you may provide a third argument to specify multiplier.\n") - - @staticmethod - def tree_autocomplete(text): - dir = os.path.dirname(text) - if dir: - path = dir - else: - path = "." - start_string = os.path.basename(text) - return [x - for x in os.listdir(path) - if x.startswith(start_string)] - - - def complete_stream_db_add(self, text, line, begidx, endidx): - arg_num = len(line.split()) - 1 - if arg_num == 2: - return TRexConsole.tree_autocomplete(line.split()[-1]) - else: - return [text] - - def do_stream_db_show(self, line): - '''Shows the loaded stream list named [name] \n''' - args = line.split() - if args: - list_name = args[0] - try: - stream = self.streams_db.get_stream_pack(list_name)#user_streams[list_name] - if len(args) >= 2 and args[1] == "full": - print pretty_json(json.dumps(stream.compiled)) - else: - print pretty_json(json.dumps(stream.loaded)) - except KeyError as e: - print "Unknown stream list name provided" - else: - print "Available stream packs:\n{0}".format(', '.join(sorted(self.streams_db.get_loaded_streams_names()))) - - def complete_stream_db_show(self, text, line, begidx, endidx): - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - - def do_stream_db_remove(self, line): - '''Removes a single loaded stream packs from loaded stream pack repository\n''' - args = line.split() - if args: - removed_streams = self.streams_db.remove_stream_packs(*args) - if removed_streams: - print green("The following stream packs were removed:") - print bold(", ".join(sorted(removed_streams))) - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print red("No streams were removed. Make sure to provide valid stream pack names.") - else: - print magenta("Please provide stream pack name(s), separated with spaces.") - - def do_stream_db_clear(self, line): - '''Clears all loaded stream packs from loaded stream pack repository\n''' - self.streams_db.clear() - print format_text("[SUCCESS]\n", 'green', 'bold') - - - def complete_stream_db_remove(self, text, line, begidx, endidx): - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - - - def do_attach(self, line): - '''Assign loaded stream pack into specified ports on TRex\n''' - args = line.split() - if len(args) >= 2: - stream_pack_name = args[0] - stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]] - if not stream_list: - print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name) - print format_text("[FAILED]\n", 'red', 'bold') - return - if args[1] == "all": - ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(' '.join(args[1:])) - owned = set(self.stateless_client.get_acquired_ports()) - try: - if set(port_list).issubset(owned): - res_ok, log = self.stateless_client.add_stream_pack(stream_list.compiled, port_id=port_list) - # res_ok, msg = self.stateless_client.add_stream(port_list, stream_list.compiled) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - return - else: - print "Not all desired ports are acquired.\n" \ - "Acquired ports are: {acq}\n" \ - "Requested ports: {req}\n" \ - "Missing ports: {miss}".format(acq=list(owned), - req=port_list, - miss=list(set(port_list).difference(owned))) - print format_text("[FAILED]\n", 'red', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - else: - print magenta("Please provide list name and ports to attach to, " - "or specify 'all' to attach all owned ports.\n") - - def complete_attach(self, text, line, begidx, endidx): - arg_num = len(line.split()) - 1 - if arg_num == 1: - # return optional streams packs - if line.endswith(" "): - return self.port_auto_complete(text, line, begidx, endidx) - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - elif arg_num >= 2: - # return optional ports to attach to - return self.port_auto_complete(text, line, begidx, endidx) - else: - return [text] - - def prompt_response(self, response_obj): - resp_list = response_obj if isinstance(response_obj, list) else [response_obj] - def format_return_status(return_status): - if return_status: - return green("OK") - else: - return red("FAIL") - - for response in resp_list: - response_str = "{id:^3} - {msg} ({stat})".format(id=response.id, - msg=response.msg, - stat=format_return_status(response.success)) - print response_str - return - - def do_remove_all_streams(self, line): - '''Acquire ports\n''' - - # make sure that the user wants to acquire all - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, " - "or specify 'all' to remove from all acquired ports") - return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to remove all stream packs from all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - # rc, resp_list = self.stateless_client.take_ownership(port_list, force) - try: - res_ok, log = self.stateless_client.remove_all_streams(port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - def complete_remove_all_streams(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx) - - def do_start(self, line): - '''Start selected traffic in specified ports on TRex\n''' - # make sure that the user wants to acquire all - parser = parsing_opts.gen_parser("start", self.do_start.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.FORCE, - parsing_opts.STREAM_FROM_PATH_OR_FILE, - parsing_opts.DURATION, - parsing_opts.MULTIPLIER) - opts = parser.parse_args(line.split()) - if opts is None: - # avoid further processing in this command - return - # print opts - port_list = self.extract_port_list(opts) - # print port_list - if opts.force: - # stop all active ports, if any - res_ok = self.stop_traffic(set(self.stateless_client.get_active_ports()).intersection(port_list)) - if not res_ok: - print yellow("[ABORTED]\n") - return - # remove all traffic from ports - res_ok = self.remove_all_streams(port_list) - if not res_ok: - print yellow("[ABORTED]\n") - return - # decide which traffic to use - stream_pack_name = None - if opts.db: - # use pre-loaded traffic - print format_text('{:<30}'.format("Load stream pack (from DB):"), 'bold'), - if opts.db not in self.streams_db.get_loaded_streams_names(): - print format_text("[FAILED]\n", 'red', 'bold') - print yellow("[ABORTED]\n") - return - else: - stream_pack_name = opts.db - else: - # try loading a YAML file - print format_text('{:<30}'.format("Load stream pack (from file):"), 'bold'), - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(opts.file[0]) - # print self.stateless_client.pretty_json(json.dumps(loaded_obj)) - try: - compiled_streams = stream_list.compile_streams() - res_ok = self.streams_db.load_streams(opts.file[1], - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - print yellow("[ABORTED]\n") - return - print format_text("[SUCCESS]\n", 'green', 'bold') - stream_pack_name = opts.file[1] - except Exception as e: - print format_text("[FAILED]\n", 'red', 'bold') - print yellow("[ABORTED]\n") - res_ok = self.attach_to_port(stream_pack_name, port_list) - if not res_ok: - print yellow("[ABORTED]\n") - return - # finally, start the traffic - res_ok = self.start_traffic(opts.mult, port_list) - if not res_ok: - print yellow("[ABORTED]\n") - return - return - - def help_start(self): - self.do_start("-h") - - def do_stop(self, line): - '''Stop active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser("stop", self.do_stop.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - opts = parser.parse_args(line.split()) - if opts is None: - # avoid further processing in this command - return - port_list = self.extract_port_list(opts) - res_ok = self.stop_traffic(port_list) - return - - def do_pause(self, line): - '''Pause active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser("stop", self.do_stop.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - opts = parser.parse_args(line.split()) - if opts is None: - # avoid further processing in this command - return - port_list = self.extract_port_list(opts) - res_ok = self.stop_traffic(port_list) - return - - - def help_stop(self): - self.do_stop("-h") - - - def do_debug(self, line): - '''Enter DEBUG mode of the console to invoke smaller building blocks with server''' - i = DebugTRexConsole(self) - i.prompt = self.prompt[:-3] + ':' + blue('debug') + ' > ' - i.cmdloop() - - # aliasing - do_exit = do_EOF = do_q = do_quit - - # ----- utility methods ----- # - - def start_traffic(self, multiplier, port_list):#, silent=True): - print format_text('{:<30}'.format("Start traffic:"), 'bold'), - try: - res_ok, log = self.stateless_client.start_traffic(multiplier, port_id=port_list) - if not self._silent: - print '' - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return False - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - except ValueError as e: - print '' - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - return False - - def attach_to_port(self, stream_pack_name, port_list): - print format_text('{:<30}'.format("Attaching traffic to ports:"), 'bold'), - stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]] - if not stream_list: - print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name) - print format_text("[FAILED]\n", 'red', 'bold') - return - try: - res_ok, log = self.stateless_client.add_stream_pack(stream_list.compiled, port_id=port_list) - if not self._silent: - print '' - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return False - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - except ValueError as e: - print '' - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - return False - - def stop_traffic(self, port_list): - print format_text('{:<30}'.format("Stop traffic:"), 'bold'), - try: - res_ok, log = self.stateless_client.stop_traffic(port_id=port_list) - if not self._silent: - print '' - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - except ValueError as e: - print '' - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - def remove_all_streams(self, port_list): - '''Remove all streams from given port_list''' - print format_text('{:<30}'.format("Remove all streams:"), 'bold'), - try: - res_ok, log = self.stateless_client.remove_all_streams(port_id=port_list) - if not self._silent: - print '' - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - except ValueError as e: - print '' - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - - - - - def extract_port_list(self, opts): - if opts.all_ports or "all" in opts.ports: - # handling all ports - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_list(opts.ports) - return port_list - - def decode_multiplier(self, opts_mult): - pass - - -class DebugTRexConsole(cmd.Cmd): - - def __init__(self, trex_main_console): - cmd.Cmd.__init__(self) - self.trex_console = trex_main_console - self.stateless_client = self.trex_console.stateless_client - self.streams_db = self.trex_console.streams_db - self.register_main_console_methods() - self.do_silent("on") - pass - - # ----- super methods overriding ----- # - def completenames(self, text, *ignored): - dotext = 'do_'+text - return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] - - def get_names(self): - result = cmd.Cmd.get_names(self) - result += self.trex_console.get_names() - return list(set(result)) - - def register_main_console_methods(self): - main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) - for name in main_names: - for prefix in 'do_', 'help_', 'complete_': - if name.startswith(prefix): - self.__dict__[name] = getattr(self.trex_console, name) - - # if (name[:3] == 'do_') or (name[:5] == 'help_') or (name[:9] == 'complete_'): - # chosen.append(name) - # self.__dict__[name] = getattr(self.trex_console, name) - # # setattr(self, name, classmethod(getattr(self.trex_console, name))) - - # print chosen - # self.get_names() - - # return result - - - # ----- DEBUGGING methods ----- # - # set silent on / off - def do_silent(self, line): - '''Shows or set silent mode\n''' - if line == "": - print "\nsilent mode is " + ("on\n" if self.trex_console._silent else "off\n") - - elif line == "on": - self.verbose = True - self.stateless_client.set_verbose(True) - print green("\nsilent set to on\n") - - elif line == "off": - self.verbose = False - self.stateless_client.set_verbose(False) - print green("\nsilent set to off\n") - - else: - print magenta("\nplease specify 'on' or 'off'\n") - - def do_quit(self, line): - '''Exit the debug client back to main console\n''' - self.do_silent("off") - return True - - def do_start_traffic(self, line): - '''Start pre-submitted traffic in specified ports on TRex\n''' - # make sure that the user wants to acquire all - parser = parsing_opts.gen_parser("start_traffic", self.do_start_traffic.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER) - opts = parser.parse_args(line.split()) - # print opts - # return - if opts is None: - # avoid further processing in this command - return - try: - port_list = self.trex_console.extract_port_list(opts) - return self.trex_console.start_traffic(opts.mult, port_list) - except Exception as e: - print e - return - - def do_stop_traffic(self, line): - '''Stop active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser("stop_traffic", self.do_stop_traffic.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - opts = parser.parse_args(line.split()) - # print opts - # return - if opts is None: - # avoid further processing in this command - return - try: - port_list = self.trex_console.extract_port_list(opts) - return self.trex_console.stop_traffic(port_list) - except Exception as e: - print e - return - - - def complete_stop_traffic(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, active=True) - - # return - # # return - # # if not opts.port_list: - # # print magenta("Please provide a list of ports separated by spaces, " - # # "or specify 'all' to start traffic on all acquired ports") - # # return - # - - - return - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, " - "or specify 'all' to start traffic on all acquired ports") - return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - try: - res_ok, log = self.stateless_client.start_traffic(1.0, port_id=port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - def complete_start_traffic(self, text, line, begidx, endidx): - # return self.port_auto_complete(text, line, begidx, endidx) - return [text] - - def help_start_traffic(self): - self.do_start_traffic("-h") - - def help_stop_traffic(self): - self.do_stop_traffic("-h") - - # def do_help(self): - - def do_rpc (self, line): - '''Launches a RPC on the server\n''' - - if line == "": - print "\nUsage: [method name] [param dict as string]\n" - print "Example: rpc test_add {'x': 12, 'y': 17}\n" - return - - sp = line.split(' ', 1) - method = sp[0] - - params = None - bad_parse = False - if len(sp) > 1: - - try: - params = ast.literal_eval(sp[1]) - if not isinstance(params, dict): - bad_parse = True - - except ValueError as e1: - bad_parse = True - except SyntaxError as e2: - bad_parse = True - - if bad_parse: - print "\nValue should be a valid dict: '{0}'".format(sp[1]) - print "\nUsage: [method name] [param dict as string]\n" - print "Example: rpc test_add {'x': 12, 'y': 17}\n" - return - - res_ok, msg = self.stateless_client.transmit(method, params) - if res_ok: - print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n" - else: - print "\n*** " + msg + "\n" - #print "Please try 'reconnect' to reconnect to server" - - - def complete_rpc (self, text, line, begidx, endidx): - return [x - for x in self.trex_console.supported_rpc - if x.startswith(text)] - - # aliasing - do_exit = do_EOF = do_q = do_quit - -# diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 9236ce98..a3ea6693 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. """ - +import subprocess import cmd import json import ast @@ -34,8 +34,8 @@ from client.trex_stateless_client import CTRexStatelessClient from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts -import trex_status - +import trex_tui +from functools import wraps __version__ = "1.1" @@ -114,13 +114,13 @@ class TRexGeneralCmd(cmd.Cmd): class TRexConsole(TRexGeneralCmd): """Trex Console""" - def __init__(self, stateless_client, acquire_all_ports=True, verbose=False): + def __init__(self, stateless_client, verbose=False): self.stateless_client = stateless_client TRexGeneralCmd.__init__(self) + self.tui = trex_tui.TrexTUI(stateless_client) self.verbose = verbose - self.acquire_all_ports = acquire_all_ports self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" @@ -130,9 +130,49 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## + def verify_connected(f): + @wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ + if func_name.startswith("do_"): + func_name = func_name[3:] + + if not inst.stateless_client.is_connected(): + print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') + return + + ret = f(*args) + return ret + + return wrap + + # TODO: remove this ugly duplication + def verify_connected_and_rw (f): + @wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ + if func_name.startswith("do_"): + func_name = func_name[3:] + + if not inst.stateless_client.is_connected(): + print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') + return + + if inst.stateless_client.is_read_only(): + print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold') + return + + rc = f(*args) + return rc + + return wrap + + def get_console_identifier(self): return "{context}_{server}".format(context=self.__class__.__name__, - server=self.stateless_client.get_system_info()['hostname']) + server=self.stateless_client.get_server_ip()) def register_main_console_methods(self): main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) @@ -142,11 +182,18 @@ class TRexConsole(TRexGeneralCmd): self.__dict__[name] = getattr(self.trex_console, name) def postcmd(self, stop, line): - if self.stateless_client.is_connected(): - self.prompt = "TRex > " - else: - self.supported_rpc = None + + if not self.stateless_client.is_connected(): self.prompt = "TRex (offline) > " + self.supported_rpc = None + return stop + + if self.stateless_client.is_read_only(): + self.prompt = "TRex (read only) > " + return stop + + + self.prompt = "TRex > " return stop @@ -208,9 +255,9 @@ class TRexConsole(TRexGeneralCmd): ####################### shell commands ####################### + @verify_connected def do_ping (self, line): '''Ping the server\n''' - rc = self.stateless_client.cmd_ping() if rc.bad(): return @@ -224,12 +271,12 @@ class TRexConsole(TRexGeneralCmd): elif line == "on": self.verbose = True - self.stateless_client.set_verbose(True) + self.stateless_client.set_verbose(self.stateless_client.VERBOSE_HIGH) print format_text("\nverbose set to on\n", 'green', 'bold') elif line == "off": self.verbose = False - self.stateless_client.set_verbose(False) + self.stateless_client.set_verbose(self.stateless_client.VERBOSE_REGULAR) print format_text("\nverbose set to off\n", 'green', 'bold') else: @@ -273,17 +320,13 @@ class TRexConsole(TRexGeneralCmd): def do_connect (self, line): '''Connects to the server\n''' - rc = self.stateless_client.cmd_connect() - if rc.bad(): - return + self.stateless_client.cmd_connect_line(line) def do_disconnect (self, line): '''Disconnect from the server\n''' - rc = self.stateless_client.cmd_disconnect() - if rc.bad(): - return + self.stateless_client.cmd_disconnect() ############### start @@ -300,56 +343,79 @@ class TRexConsole(TRexGeneralCmd): if (l > 2) and (s[l - 2] in file_flags): return TRexConsole.tree_autocomplete(s[l - 1]) + @verify_connected_and_rw def do_start(self, line): '''Start selected traffic in specified port(s) on TRex\n''' self.stateless_client.cmd_start_line(line) + + def help_start(self): self.do_start("-h") ############# stop + @verify_connected_and_rw def do_stop(self, line): '''stops port(s) transmitting traffic\n''' + self.stateless_client.cmd_stop_line(line) def help_stop(self): self.do_stop("-h") ############# update + @verify_connected_and_rw def do_update(self, line): '''update speed of port(s)currently transmitting traffic\n''' + self.stateless_client.cmd_update_line(line) def help_update (self): self.do_update("-h") ############# pause + @verify_connected_and_rw def do_pause(self, line): '''pause port(s) transmitting traffic\n''' + self.stateless_client.cmd_pause_line(line) ############# resume + @verify_connected_and_rw def do_resume(self, line): '''resume port(s) transmitting traffic\n''' + self.stateless_client.cmd_resume_line(line) ########## reset + @verify_connected_and_rw def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset() + self.stateless_client.cmd_reset_line(line) + + ######### validate + @verify_connected + def do_validate (self, line): + '''validates port(s) stream configuration\n''' + + self.stateless_client.cmd_validate_line(line) + + + @verify_connected def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' self.stateless_client.cmd_stats_line(line) - pass + def help_stats(self): self.do_stats("-h") + @verify_connected def do_clear(self, line): '''Clear cached local statistics\n''' self.stateless_client.cmd_clear_line(line) @@ -386,16 +452,43 @@ class TRexConsole(TRexGeneralCmd): self.stateless_client.clear_events() print format_text("\n\nEvent log was cleared\n\n") + # tui + @verify_connected def do_tui (self, line): '''Shows a graphical console\n''' - if not self.stateless_client.is_connected(): - print format_text("\nNot connected to server\n", 'bold') + parser = parsing_opts.gen_parser(self, + "tui", + self.do_tui.__doc__, + parsing_opts.XTERM) + + opts = parser.parse_args(line.split()) + if opts is None: + return + + if opts.xterm: + exe = '' + if os.path.isfile('/usr/bin/wmctrl'): + exe += '/usr/bin/wmctrl -r trex_tui -b add,above;' + + exe += './trex-console -t -q -s {0} -p {1}'.format(self.stateless_client.get_server_ip(), self.stateless_client.get_server_port()) + + cmd = ['xterm', '-geometry', '105x40', '-title', 'trex_tui', '-e', exe] + subprocess.Popen(cmd) + return - self.do_verbose('off') - trex_status.show_trex_status(self.stateless_client) + save_verbose = self.stateless_client.get_verbose() + + self.stateless_client.set_verbose(self.stateless_client.VERBOSE_QUIET) + self.tui.show() + self.stateless_client.set_verbose(save_verbose) + + + def help_tui (self): + do_tui("-h") + # quit function def do_quit(self, line): @@ -487,6 +580,15 @@ def setParserOptions(): help = "Run the console in a batch mode with file", default = None) + parser.add_argument("-t", "--tui", dest="tui", + action="store_true", help="Starts with TUI mode", + default = False) + + + parser.add_argument("-q", "--quiet", dest="quiet", + action="store_true", help="Starts with all outputs suppressed", + default = False) + return parser @@ -495,11 +597,24 @@ def main(): options = parser.parse_args() # Stateless client connection - stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) - rc = stateless_client.cmd_connect() + stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet) + + if not options.quiet: + print "\nlogged as {0}".format(format_text(options.user, 'bold')) + + # TUI or no acquire will give us READ ONLY mode + if options.tui or not options.acquire: + rc = stateless_client.connect("RO") + else: + rc = stateless_client.connect("RW") + + # unable to connect - bye if rc.bad(): + rc.annotate() return + + # a script mode if options.batch: cont = stateless_client.run_script_file(options.batch[0]) if not cont: @@ -507,11 +622,17 @@ def main(): # console try: - console = TRexConsole(stateless_client, options.acquire, options.verbose) - console.cmdloop() + console = TRexConsole(stateless_client, options.verbose) + if options.tui: + console.do_tui("") + else: + console.cmdloop() + except KeyboardInterrupt as e: print "\n\n*** Caught Ctrl + C... Exiting...\n\n" - return + + finally: + stateless_client.disconnect() if __name__ == '__main__': main() diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py new file mode 100644 index 00000000..febe62f4 --- /dev/null +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -0,0 +1,469 @@ +import termios +import sys +import os +import time +from common.text_opts import * +from common import trex_stats +from client_utils import text_tables +from collections import OrderedDict +import datetime + +class SimpleBar(object): + def __init__ (self, desc, pattern): + self.desc = desc + self.pattern = pattern + self.pattern_len = len(pattern) + self.index = 0 + + def show (self): + if self.desc: + print format_text("{0} {1}".format(self.desc, self.pattern[self.index]), 'bold') + else: + print format_text("{0}".format(self.pattern[self.index]), 'bold') + + self.index = (self.index + 1) % self.pattern_len + + +# base type of a panel +class TrexTUIPanel(object): + def __init__ (self, mng, name): + + self.mng = mng + self.name = name + self.stateless_client = mng.stateless_client + + def show (self): + raise NotImplementedError("must implement this") + + def get_key_actions (self): + raise NotImplementedError("must implement this") + + def get_name (self): + return self.name + + +# dashboard panel +class TrexTUIDashBoard(TrexTUIPanel): + def __init__ (self, mng): + super(TrexTUIDashBoard, self).__init__(mng, "dashboard") + + self.key_actions = OrderedDict() + + self.key_actions['c'] = {'action': self.action_clear, 'legend': 'clear', 'show': True} + self.key_actions['p'] = {'action': self.action_pause, 'legend': 'pause', 'show': True} + self.key_actions['r'] = {'action': self.action_resume, 'legend': 'resume', 'show': True} + self.key_actions['+'] = {'action': self.action_raise, 'legend': 'up 5%', 'show': True} + self.key_actions['-'] = {'action': self.action_lower, 'legend': 'low 5%', 'show': True} + + self.ports = self.stateless_client.get_all_ports() + + + def show (self): + stats = self.stateless_client.cmd_stats(self.ports, trex_stats.COMPACT) + # print stats to screen + for stat_type, stat_data in stats.iteritems(): + text_tables.print_table_with_header(stat_data.text_table, stat_type) + + + def get_key_actions (self): + allowed = {} + + allowed['c'] = self.key_actions['c'] + + # thats it for read only + if self.stateless_client.is_read_only(): + return allowed + + if len(self.stateless_client.get_transmitting_ports()) > 0: + allowed['p'] = self.key_actions['p'] + allowed['+'] = self.key_actions['+'] + allowed['-'] = self.key_actions['-'] + + + if len(self.stateless_client.get_paused_ports()) > 0: + allowed['r'] = self.key_actions['r'] + + return allowed + + + ######### actions + def action_pause (self): + rc = self.stateless_client.pause_traffic(self.mng.ports) + + ports_succeeded = [] + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): + if rc_single.rc: + ports_succeeded.append(port_id) + + if len(ports_succeeded) > 0: + return "paused traffic on port(s): {0}".format(ports_succeeded) + else: + return "" + + + def action_resume (self): + rc = self.stateless_client.resume_traffic(self.mng.ports) + + ports_succeeded = [] + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): + if rc_single.rc: + ports_succeeded.append(port_id) + + if len(ports_succeeded) > 0: + return "resumed traffic on port(s): {0}".format(ports_succeeded) + else: + return "" + + + def action_raise (self): + mul = {'type': 'percentage', 'value': 5, 'op': 'add'} + rc = self.stateless_client.update_traffic(mul, self.mng.ports) + + ports_succeeded = [] + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): + if rc_single.rc: + ports_succeeded.append(port_id) + + if len(ports_succeeded) > 0: + return "raised B/W by %5 on port(s): {0}".format(ports_succeeded) + else: + return "" + + def action_lower (self): + mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} + rc = self.stateless_client.update_traffic(mul, self.mng.ports) + + ports_succeeded = [] + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): + if rc_single.rc: + ports_succeeded.append(port_id) + + if len(ports_succeeded) > 0: + return "lowered B/W by %5 on port(s): {0}".format(ports_succeeded) + else: + return "" + + + def action_clear (self): + self.stateless_client.cmd_clear(self.mng.ports) + return "cleared all stats" + + +# port panel +class TrexTUIPort(TrexTUIPanel): + def __init__ (self, mng, port_id): + super(TrexTUIPort, self).__init__(mng, "port {0}".format(port_id)) + + self.port_id = port_id + self.port = self.mng.stateless_client.get_port(port_id) + + self.key_actions = OrderedDict() + + self.key_actions['c'] = {'action': self.action_clear, 'legend': 'clear', 'show': True} + self.key_actions['p'] = {'action': self.action_pause, 'legend': 'pause', 'show': True} + self.key_actions['r'] = {'action': self.action_resume, 'legend': 'resume', 'show': True} + self.key_actions['+'] = {'action': self.action_raise, 'legend': 'up 5%', 'show': True} + self.key_actions['-'] = {'action': self.action_lower, 'legend': 'low 5%', 'show': True} + + + def show (self): + stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT) + # print stats to screen + for stat_type, stat_data in stats.iteritems(): + text_tables.print_table_with_header(stat_data.text_table, stat_type) + + def get_key_actions (self): + + allowed = {} + + allowed['c'] = self.key_actions['c'] + + # thats it for read only + if self.stateless_client.is_read_only(): + return allowed + + if self.port.state == self.port.STATE_TX: + allowed['p'] = self.key_actions['p'] + allowed['+'] = self.key_actions['+'] + allowed['-'] = self.key_actions['-'] + + elif self.port.state == self.port.STATE_PAUSE: + allowed['r'] = self.key_actions['r'] + + + return allowed + + # actions + def action_pause (self): + rc = self.stateless_client.pause_traffic([self.port_id]) + if rc.good(): + return "port {0}: paused traffic".format(self.port_id) + else: + return "" + + def action_resume (self): + rc = self.stateless_client.resume_traffic([self.port_id]) + if rc.good(): + return "port {0}: resumed traffic".format(self.port_id) + else: + return "" + + def action_raise (self): + mul = {'type': 'percentage', 'value': 5, 'op': 'add'} + rc = self.stateless_client.update_traffic(mul, [self.port_id]) + + if rc.good(): + return "port {0}: raised B/W by 5%".format(self.port_id) + else: + return "" + + def action_lower (self): + mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} + rc = self.stateless_client.update_traffic(mul, [self.port_id]) + + if rc.good(): + return "port {0}: lowered B/W by 5%".format(self.port_id) + else: + return "" + + def action_clear (self): + self.stateless_client.cmd_clear([self.port_id]) + return "port {0}: cleared stats".format(self.port_id) + +# log +class TrexTUILog(): + def __init__ (self): + self.log = [] + + def add_event (self, msg): + self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + + def show (self, max_lines = 4): + + cut = len(self.log) - max_lines + if cut < 0: + cut = 0 + + print format_text("\nLog:", 'bold', 'underline') + + for msg in self.log[cut:]: + print msg + + +# Panels manager (contains server panels) +class TrexTUIPanelManager(): + def __init__ (self, tui): + self.tui = tui + self.stateless_client = tui.stateless_client + self.ports = self.stateless_client.get_all_ports() + + + self.panels = {} + self.panels['dashboard'] = TrexTUIDashBoard(self) + + self.key_actions = OrderedDict() + self.key_actions['q'] = {'action': self.action_quit, 'legend': 'quit', 'show': True} + self.key_actions['g'] = {'action': self.action_show_dash, 'legend': 'dashboard', 'show': True} + + for port_id in self.ports: + self.key_actions[str(port_id)] = {'action': self.action_show_port(port_id), 'legend': 'port {0}'.format(port_id), 'show': False} + self.panels['port {0}'.format(port_id)] = TrexTUIPort(self, port_id) + + # start with dashboard + self.main_panel = self.panels['dashboard'] + + # log object + self.log = TrexTUILog() + + self.generate_legend() + + self.conn_bar = SimpleBar('status: ', ['|','/','-','\\']) + self.dis_bar = SimpleBar('status: ', ['X', ' ']) + self.show_log = False + + + def generate_legend (self): + self.legend = "\n{:<12}".format("browse:") + + for k, v in self.key_actions.iteritems(): + if v['show']: + x = "'{0}' - {1}, ".format(k, v['legend']) + self.legend += "{:}".format(x) + + self.legend += "'0-{0}' - port display".format(len(self.ports) - 1) + + + self.legend += "\n{:<12}".format(self.main_panel.get_name() + ":") + for k, v in self.main_panel.get_key_actions().iteritems(): + if v['show']: + x = "'{0}' - {1}, ".format(k, v['legend']) + self.legend += "{:}".format(x) + + + def print_connection_status (self): + if self.tui.get_state() == self.tui.STATE_ACTIVE: + self.conn_bar.show() + else: + self.dis_bar.show() + + def print_legend (self): + print format_text(self.legend, 'bold') + + + # on window switch or turn on / off of the TUI we call this + def init (self, show_log = False): + self.show_log = show_log + self.generate_legend() + + def show (self): + self.main_panel.show() + self.print_connection_status() + self.print_legend() + + if self.show_log: + self.log.show() + + + def handle_key (self, ch): + # check for the manager registered actions + if ch in self.key_actions: + msg = self.key_actions[ch]['action']() + + # check for main panel actions + elif ch in self.main_panel.get_key_actions(): + msg = self.main_panel.get_key_actions()[ch]['action']() + + else: + msg = "" + + self.generate_legend() + + if msg == None: + return False + else: + if msg: + self.log.add_event(msg) + return True + + + # actions + + def action_quit (self): + return None + + def action_show_dash (self): + self.main_panel = self.panels['dashboard'] + self.init(self.show_log) + return "" + + def action_show_port (self, port_id): + def action_show_port_x (): + self.main_panel = self.panels['port {0}'.format(port_id)] + self.init() + return "" + + return action_show_port_x + + + +# shows a textual top style window +class TrexTUI(): + + STATE_ACTIVE = 0 + STATE_LOST_CONT = 1 + STATE_RECONNECT = 2 + + def __init__ (self, stateless_client): + self.stateless_client = stateless_client + + self.pm = TrexTUIPanelManager(self) + + + + def handle_key_input (self): + # try to read a single key + ch = os.read(sys.stdin.fileno(), 1) + if ch != None and len(ch) > 0: + return (self.pm.handle_key(ch), True) + + else: + return (True, False) + + + def clear_screen (self): + #os.system('clear') + # maybe this is faster ? + sys.stderr.write("\x1b[2J\x1b[H") + + + + def show (self, show_log = False): + # init termios + old_settings = termios.tcgetattr(sys.stdin) + new_settings = termios.tcgetattr(sys.stdin) + new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags + new_settings[6][termios.VMIN] = 0 # cc + new_settings[6][termios.VTIME] = 0 # cc + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings) + + self.pm.init(show_log) + + self.state = self.STATE_ACTIVE + self.draw_policer = 0 + + try: + while True: + # draw and handle user input + cont, force_draw = self.handle_key_input() + self.draw_screen(force_draw) + if not cont: + break + time.sleep(0.1) + + # regular state + if self.state == self.STATE_ACTIVE: + # if no connectivity - move to lost connecitivty + if not self.stateless_client.async_client.is_alive(): + self.stateless_client.cmd_invalidate(self.pm.ports) + self.state = self.STATE_LOST_CONT + + + # lost connectivity + elif self.state == self.STATE_LOST_CONT: + # got it back + if self.stateless_client.async_client.is_alive(): + # move to state reconnect + self.state = self.STATE_RECONNECT + + + # restored connectivity - try to reconnect + elif self.state == self.STATE_RECONNECT: + + rc = self.stateless_client.connect("RO") + if rc.good(): + self.state = self.STATE_ACTIVE + else: + # maybe we lost it again + self.state = self.STATE_LOST_CONT + + + finally: + # restore + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) + + print "" + + + # draw once + def draw_screen (self, force_draw = False): + + if (self.draw_policer >= 5) or (force_draw): + self.clear_screen() + self.pm.show() + self.draw_policer = 0 + else: + self.draw_policer += 1 + + def get_state (self): + return self.state + diff --git a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py index 734fa22e..7bc25aac 100755 --- a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py +++ b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py @@ -19,7 +19,6 @@ def daemonize_parser(parser_obj, action_funcs, help_menu): parser_obj.usage = None
parser_obj.add_argument("action", choices=action_funcs,
action="store", help=help_menu)
- return
class ExtendedDaemonRunner(runner.DaemonRunner):
@@ -76,7 +75,12 @@ class ExtendedDaemonRunner(runner.DaemonRunner): self.app = app
self.daemon_context = daemon.DaemonContext()
self.daemon_context.stdin = open(app.stdin_path, 'rt')
- self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ try:
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ except IOError as err:
+ # catch 'tty' error when launching server from remote location
+ app.stdout_path = "/dev/null"
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
self.daemon_context.stderr = open(app.stderr_path,
'a+t', buffering=0)
diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server.py b/scripts/automation/trex_control_plane/server/trex_daemon_server.py index ec07cb8a..9784d42a 100755 --- a/scripts/automation/trex_control_plane/server/trex_daemon_server.py +++ b/scripts/automation/trex_control_plane/server/trex_daemon_server.py @@ -57,15 +57,7 @@ def main (): print "Launching user must have sudo privileges in order to run TRex daemon.\nTerminating daemon process." exit(-1) - try: - daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) - except IOError as err: - # catch 'tty' error when launching server from remote location - if err.errno == errno.ENXIO: - trex_app.stdout_path = "/dev/null" - daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) - else: - raise + daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) #This ensures that the logger file handle does not get closed during daemonization daemon_runner.daemon_context.files_preserve=[handler.stream] diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py index 7dee89e9..bf788d35 100755 --- a/scripts/automation/trex_control_plane/server/trex_server.py +++ b/scripts/automation/trex_control_plane/server/trex_server.py @@ -26,6 +26,7 @@ from trex_launch_thread import AsynchronousTRexSession from zmq_monitor_thread import ZmqMonitorSession from argparse import ArgumentParser, RawTextHelpFormatter from json import JSONEncoder +import re # setup the logger @@ -167,15 +168,17 @@ class CTRexServer(object): logger.info("Processing get_trex_daemon_log() command.") return self._pull_file('/var/log/trex/trex_daemon_server.log') - # get Trex version from ./t-rex-64 --help (last 4 lines) + # get Trex version from ./t-rex-64 --help (last lines starting with "Version : ...") def get_trex_version (self, base64 = True): try: logger.info("Processing get_trex_version() command.") if not self.trex_version: help_print = subprocess.Popen(['./t-rex-64', '--help'], cwd = self.TREX_PATH, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - help_print.wait() - help_print_stdout = help_print.stdout.read() - self.trex_version = binascii.b2a_base64('\n'.join(help_print_stdout.split('\n')[-5:-1])) + (stdout, stderr) = help_print.communicate() + search_result = re.search('\n\s*(Version\s*:.+)', stdout, re.DOTALL) + if not search_result: + raise Exception('Could not determine version from ./t-rex-64 --help') + self.trex_version = binascii.b2a_base64(search_result.group(1)) if base64: return self.trex_version else: @@ -256,7 +259,7 @@ class CTRexServer(object): return False - def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 30): + def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 40): with self.start_lock: logger.info("Processing start_trex() command.") if self.is_reserved(): @@ -340,11 +343,12 @@ class CTRexServer(object): Parameters ---------- - trex_cmd_options : str - Defines the exact command to run on the t-rex - Example: "-c 2 -m 0.500000 -d 100 -f cap2/sfr.yaml --nc -p -l 1000" + kwargs: dictionary + Dictionary of parameters for trex. For example: (c=1, nc=True, l_pkt_mode=3). + Notice that when sending command line parameters that has -, you need to replace it with _. + for example, to have on command line "--l-pkt-mode 3", you need to send l_pkt_mode=3 export_path : str - a full system path to which the results of the trex-run will be logged. + Full system path to which the results of the trex-run will be logged. """ if 'results_file_path' in kwargs: |