diff options
author | 2015-12-20 00:07:44 +0200 | |
---|---|---|
committer | 2015-12-20 00:07:44 +0200 | |
commit | 4ca24cf31919870a684fe78f17c856e0d220e6d5 (patch) | |
tree | f40ab95e52adca3ac713d61eb9fa3fd0d136e4ea /scripts/automation/trex_control_plane/client | |
parent | 1895d21485621c3428d045fa0f5b9daf165c8260 (diff) | |
parent | 5cef472bcdc6c0b7e20e5cc42485ed5570c10f8c (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
4 files changed, 965 insertions, 571 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8b274134..459d6915 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -19,6 +19,7 @@ import re from common.trex_stats import * from common.trex_streams import * +from common.trex_types import * # basic async stats class class CTRexAsyncStats(object): @@ -152,38 +153,115 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target= self.run) + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def run(self): + self.connected = True - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.is_alive(): + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + + return RC_OK() + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 5000) + + got_data = False while self.active: - line = self.socket.recv_string() + try: + + line = self.socket.recv_string() + self.last_data_recv_ts = time.time() + + # signal once + if not got_data: + self.stateless_client.on_async_alive() + got_data = True + + + # got a timeout - mark as not alive and retry + except zmq.Again: + + # signal once + if got_data: + self.stateless_client.on_async_dead() + got_data = False + + continue + + except zmq.ContextTerminated: + # outside thread signaled us to exit + break + msg = json.loads(line) name = msg['name'] @@ -193,7 +271,19 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) - def get_stats(self): + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + + + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + + def get_stats (self): return self.stats def get_raw_snapshot (self): @@ -203,7 +293,6 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - # self.stats.update(data) self.stateless_client.handle_async_stats_update(data) # events elif name == "trex-event": @@ -212,10 +301,3 @@ class CTRexAsyncClient(): pass - def stop (self): - self.active = False - self.t.join() - - -if __name__ == "__main__": - pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 160abdec..5709b7a5 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -88,7 +88,7 @@ class CTRexClient(object): finally: self.prompt_verbose_data() - def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options): + def start_trex (self, f, d, block_to_success = True, timeout = 40, user = None, trex_development = False, **trex_cmd_options): """ Request to start a TRex run on server. @@ -104,7 +104,7 @@ class CTRexClient(object): timeout : int maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' - default value: **30** + default value: **40** user : str the identity of the the run issuer. trex_cmd_options : key, val @@ -125,13 +125,17 @@ class CTRexClient(object): user = user or self.__default_user try: d = int(d) - if d < 30: # specify a test should take at least 30 seconds long. + if d < 30 and not trex_development: # test duration should be at least 30 seconds, unless trex_development flag is specified. raise ValueError except ValueError: raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) - + if not trex_cmd_options.get('l'): + self.result_obj.latency_checked = False + if 'k' in trex_cmd_options: + timeout += int(trex_cmd_options['k']) # during 'k' seconds TRex stays in 'Starting' state + self.result_obj.clear_results() try: issue_time = time.time() @@ -544,7 +548,7 @@ class CTRexClient(object): Get TRex version details. :return: - Trex details (Version, User, Date, Uuid) as ordered dictionary + Trex details (Version, User, Date, Uuid, Git SHA) as ordered dictionary :raises: + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex version could not be determined. @@ -556,9 +560,11 @@ class CTRexClient(object): version_dict = OrderedDict() result_lines = binascii.a2b_base64(self.server.get_trex_version()).split('\n') for line in result_lines: + if not line: + continue key, value = line.strip().split(':', 1) version_dict[key.strip()] = value.strip() - for key in ('Version', 'User', 'Date', 'Uuid'): + for key in ('Version', 'User', 'Date', 'Uuid', 'Git SHA'): if key not in version_dict: raise Exception('get_trex_version: got server response without key: {0}'.format(key)) return version_dict @@ -767,6 +773,7 @@ class CTRexResult(object): """ self._history = deque(maxlen = max_history_size) self.clear_results() + self.latency_checked = True def __repr__(self): return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) + @@ -1032,18 +1039,19 @@ class CTRexResult(object): self._done_warmup = True # handle latency data - latency_pre = "trex-latency" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC - # support old typo - if self._max_latency is None: - latency_pre = "trex-latecny" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") - - self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC - self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) - - avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") - self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) + if self.latency_checked: + latency_pre = "trex-latency" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC + # support old typo + if self._max_latency is None: + latency_pre = "trex-latecny" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") + + self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC + self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) + + avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") + self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts") rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts") diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py new file mode 100644 index 00000000..54b4945e --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -0,0 +1,430 @@ + +from collections import namedtuple +from common.trex_types import * +from common import trex_stats + + +########## utlity ############ +def mult_to_factor (mult, max_bps, max_pps, line_util): + if mult['type'] == 'raw': + return mult['value'] + + if mult['type'] == 'bps': + return mult['value'] / max_bps + + if mult['type'] == 'pps': + return mult['value'] / max_pps + + if mult['type'] == 'percentage': + return mult['value'] / line_util + + +# describes a single port +class Port(object): + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + PortState = namedtuple('PortState', ['state_id', 'state_name']) + STATES_MAP = {STATE_DOWN: "DOWN", + STATE_IDLE: "IDLE", + STATE_STREAMS: "IDLE", + STATE_TX: "ACTIVE", + STATE_PAUSE: "PAUSE"} + + + def __init__ (self, port_id, speed, driver, user, comm_link, session_id): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.comm_link = comm_link + self.transmit = comm_link.transmit + self.transmit_batch = comm_link.transmit_batch + self.user = user + self.driver = driver + self.speed = speed + self.streams = {} + self.profile = None + self.session_id = session_id + + self.port_stats = trex_stats.CPortStats(self) + + + def err(self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + def ok(self, data = "ACK"): + return RC_OK(data) + + def get_speed_bps (self): + return (self.speed * 1000 * 1000 * 1000) + + # take the port + def acquire(self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "session_id": self.session_id, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.good(): + self.handler = rc.data() + return self.ok() + else: + return self.err(rc.err()) + + # release the port + def release(self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + self.handler = None + + if rc.good(): + return self.ok() + else: + return self.err(rc.err()) + + def is_acquired(self): + return (self.handler != None) + + def is_active(self): + return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def is_transmitting (self): + return (self.state == self.STATE_TX) + + def is_paused (self): + return (self.state == self.STATE_PAUSE) + + + def sync(self): + params = {"port_id": self.port_id} + + command = RpcCmdData("get_port_status", params) + rc = self.transmit(command.method, command.params) + if rc.bad(): + return self.err(rc.err()) + + # sync the port + port_state = rc.data()['state'] + + if port_state == "DOWN": + self.state = self.STATE_DOWN + elif port_state == "IDLE": + self.state = self.STATE_IDLE + elif port_state == "STREAMS": + self.state = self.STATE_STREAMS + elif port_state == "TX": + self.state = self.STATE_TX + elif port_state == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return self.ok() + + + # return TRUE if write commands + def is_port_writable (self): + # operations on port can be done on state idle or state streams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj} + + rc = self.transmit("add_stream", params) + if rc.bad(): + return self.err(rc.err()) + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # add multiple streams + def add_streams (self, streams_list): + batch = [] + + for stream in streams_list: + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream.stream_id, + "stream": stream.stream} + + cmd = RpcCmdData('add_stream', params) + batch.append(cmd) + + rc = self.transmit_batch(batch) + if rc.bad(): + return self.err(rc.err()) + + # validate that every action succeeded + + # add the stream + for stream in streams_list: + self.streams[stream.stream_id] = stream.stream + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc = self.transmit("remove_stream", params) + if rc.bad(): + return self.err(rc.err()) + + self.streams[stream_id] = None + + self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE + + return self.ok() + + # remove all the streams + def remove_all_streams (self): + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("remove_all_streams", params) + if rc.bad(): + return self.err(rc.err()) + + self.streams = {} + + self.state = self.STATE_IDLE + + return self.ok() + + # get a specific stream + def get_stream (self, stream_id): + if stream_id in self.streams: + return self.streams[stream_id] + else: + return None + + def get_all_streams (self): + return self.streams + + # start traffic + def start (self, mul, duration): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul, + "duration": duration} + + rc = self.transmit("start_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + self.state = self.STATE_TX + + return self.ok() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): + + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("stop_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_STREAMS + + return self.ok() + + def pause (self): + + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("pause_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_PAUSE + + return self.ok() + + + def resume (self): + + if (self.state != self.STATE_PAUSE) : + return self.err("port is not in pause mode") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("resume_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + # only valid state after stop + self.state = self.STATE_TX + + return self.ok() + + + def update (self, mul): + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc = self.transmit("update_traffic", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + + def validate (self): + + if (self.state == self.STATE_DOWN): + return self.err("port is down") + + if (self.state == self.STATE_IDLE): + return self.err("no streams attached to port") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc = self.transmit("validate", params) + if rc.bad(): + return self.err(rc.err()) + + self.profile = rc.data() + + return self.ok() + + def get_profile (self): + return self.profile + + + def print_profile (self, mult, duration): + if not self.get_profile(): + return + + rate = self.get_profile()['rate'] + graph = self.get_profile()['graph'] + + print format_text("Profile Map Per Port\n", 'underline', 'bold') + + factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) + + print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), + format_num(rate['max_bps'] * factor, suffix = "bps")) + + print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), + format_num(rate['max_pps'] * factor, suffix = "pps"),) + + print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), + format_percentage(rate['max_line_util'] * factor * 100)) + + + # duration + exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) + exp_time_factor_sec = exp_time_base_sec / factor + + # user configured a duration + if duration > 0: + if exp_time_factor_sec > 0: + exp_time_factor_sec = min(exp_time_factor_sec, duration) + else: + exp_time_factor_sec = duration + + + print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), + format_time(exp_time_factor_sec)) + print "\n" + + + def get_port_state_name(self): + return self.STATES_MAP.get(self.state, "Unknown") + + ################# stats handler ###################### + def generate_port_stats(self): + return self.port_stats.generate_stats() + pass + + def generate_port_status(self): + return {"port-type": self.driver, + "maximum": "{speed} Gb/s".format(speed=self.speed), + "port-status": self.get_port_state_name() + } + + def clear_stats(self): + return self.port_stats.clear_stats() + + def invalidate_stats(self): + return self.port_stats.invalidate() + + + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS + + + def async_event_port_started (self): + self.state = self.STATE_TX + + + def async_event_port_paused (self): + self.state = self.STATE_PAUSE + + + def async_event_port_resumed (self): + self.state = self.STATE_TX + + def async_event_forced_acquired (self): + self.handler = None + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 748817da..58fa53c9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -14,433 +14,40 @@ import json from common.trex_streams import * from collections import namedtuple from common.text_opts import * -# import trex_stats from common import trex_stats from client_utils import parsing_opts, text_tables import time import datetime import re +import random +from trex_port import Port +from common.trex_types import * from trex_async_client import CTRexAsyncClient -RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) -class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): - __slots__ = () - def __str__(self): - return "{id:^3} - {msg} ({stat})".format(id=self.id, - msg=self.msg, - stat="success" if self.success else "fail") - -# simple class to represent complex return value -class RC(): - - def __init__ (self, rc = None, data = None): - self.rc_list = [] - - if (rc != None) and (data != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) - - def add (self, rc): - self.rc_list += rc.rc_list - - def good (self): - return all([x.rc for x in self.rc_list]) - - def bad (self): - return not self.good() - - def data (self): - return all([x.data if x.rc else "" for x in self.rc_list]) - - def err (self): - return all([x.data if not x.rc else "" for x in self.rc_list]) - - def annotate (self, desc = None): - if desc: - print format_text('\n{:<40}'.format(desc), 'bold'), - - if self.bad(): - # print all the errors - print "" - for x in self.rc_list: - if not x.rc: - print format_text("\n{0}".format(x.data), 'bold') - - print "" - print format_text("[FAILED]\n", 'red', 'bold') - - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - - -def RC_OK(): - return RC(True, "") -def RC_ERR(err): - return RC(False, err) - - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) - -# describes a stream DB -class CStreamsDB(object): - - def __init__(self): - self.stream_packs = {} - - def load_yaml_file(self, filename): - - stream_pack_name = filename - if stream_pack_name in self.get_loaded_streams_names(): - self.remove_stream_packs(stream_pack_name) - - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(filename) - - try: - compiled_streams = stream_list.compile_streams() - rc = self.load_streams(stream_pack_name, - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - - except Exception as e: - return None - - return self.get_stream_pack(stream_pack_name) - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def stream_pack_exists (self, name): - return name in self.get_loaded_streams_names() - - def get_stream_pack(self, name): - if not self.stream_pack_exists(name): - return None - else: - return self.stream_packs.get(name) - - -# describes a single port -class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 - PortState = namedtuple('PortState', ['state_id', 'state_name']) - STATES_MAP = {STATE_DOWN: "DOWN", - STATE_IDLE: "IDLE", - STATE_STREAMS: "STREAMS", - STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} - - - def __init__ (self, port_id, speed, driver, user, transmit): - self.port_id = port_id - self.state = self.STATE_IDLE - self.handler = None - self.transmit = transmit - self.user = user - self.driver = driver - self.speed = speed - self.streams = {} - self.port_stats = trex_stats.CPortStats(self) - - def err(self, msg): - return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - - def ok(self): - return RC_OK() - - def get_speed_bps (self): - return (self.speed * 1000 * 1000 * 1000) - - # take the port - def acquire(self, force = False): - params = {"port_id": self.port_id, - "user": self.user, - "force": force} - - command = RpcCmdData("acquire", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - # release the port - def release(self): - params = {"port_id": self.port_id, - "handler": self.handler} - - command = RpcCmdData("release", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def sync(self, sync_data): - self.handler = sync_data['handler'] - port_state = sync_data['state'].upper() - if port_state == "DOWN": - self.state = self.STATE_DOWN - elif port_state == "IDLE": - self.state = self.STATE_IDLE - elif port_state == "STREAMS": - self.state = self.STATE_STREAMS - elif port_state == "TX": - self.state = self.STATE_TX - elif port_state == "PAUSE": - self.state = self.STATE_PAUSE - else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) - - return self.ok() - - - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - # add stream to the port - def add_stream (self, stream_id, stream_obj): - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id, - "stream": stream_obj} - - rc, data = self.transmit("add_stream", params) - if not rc: - r = self.err(data) - print r.good() - - # add the stream - self.streams[stream_id] = stream_obj - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - # remove stream from port - def remove_stream (self, stream_id): - - if not stream_id in self.streams: - return self.err("stream {0} does not exists".format(stream_id)) - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id} - - - rc, data = self.transmit("remove_stream", params) - if not rc: - return self.err(data) - - self.streams[stream_id] = None - - return self.ok() - - # remove all the streams - def remove_all_streams (self): - - params = {"handler": self.handler, - "port_id": self.port_id} +class CTRexStatelessClient(object): + """docstring for CTRexStatelessClient""" - rc, data = self.transmit("remove_all_streams", params) - if not rc: - return self.err(data) + # verbose levels + VERBOSE_QUIET = 0 + VERBOSE_REGULAR = 1 + VERBOSE_HIGH = 2 + + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False): + super(CTRexStatelessClient, self).__init__() - self.streams = {} + self.user = username - return self.ok() + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) - # get a specific stream - def get_stream (self, stream_id): - if stream_id in self.streams: - return self.streams[stream_id] + # default verbose level + if not quiet: + self.verbose = self.VERBOSE_REGULAR else: - return None - - def get_all_streams (self): - return self.streams - - - def process_mul (self, mul): - # if percentage - translate - if mul['type'] == 'percentage': - mul['type'] = 'max_bps' - mul['max'] = self.get_speed_bps() * (mul['max'] / 100) - - - # start traffic - def start (self, mul, duration): - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") - - if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "duration": duration} - - rc, data = self.transmit("start_traffic", params) - if not rc: - return self.err(data) - - self.state = self.STATE_TX - - return self.ok() - - # stop traffic - # with force ignores the cached state and sends the command - def stop (self, force = False): - - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("stop_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_STREAMS - - return self.ok() - - def pause (self): - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} + self.verbose = self.VERBOSE_QUIET - rc, data = self.transmit("pause_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_PAUSE - - return self.ok() - - - def resume (self): - - if (self.state != self.STATE_PAUSE) : - return self.err("port is not in pause mode") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("resume_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_TX - - return self.ok() - - - def update (self, mul): - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul} - - rc, data = self.transmit("update_traffic", params) - if not rc: - return self.err(data) - - return self.ok() - - def get_port_state_name(self): - return self.STATES_MAP.get(self.state, "Unknown") - - ################# stats handler ###################### - def generate_port_stats(self): - return self.port_stats.generate_stats() - pass - - def generate_port_status(self): - return {"port-type": self.driver, - "maximum": "{speed} Gb/s".format(speed=self.speed), - "port-status": self.get_port_state_name() - } - - def clear_stats(self): - return self.port_stats.clear_stats() - - ################# events handler ###################### - def async_event_port_stopped (self): - self.state = self.STATE_STREAMS - - -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" - - def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): - super(CTRexStatelessClient, self).__init__() - self.user = username - self.system_info = None - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) - self.verbose = False self.ports = {} - # self._conn_handler = {} - # self._active_ports = set() self._connection_info = {"server": server, "sync_port": sync_port, "async_port": async_port} @@ -448,7 +55,7 @@ class CTRexStatelessClient(object): self.server_version = {} self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) self.streams_db = CStreamsDB() self.global_stats = trex_stats.CGlobalStats(self._connection_info, @@ -457,12 +64,44 @@ class CTRexStatelessClient(object): self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats, self.ports) + self.events = [] + + self.session_id = random.getrandbits(32) + self.read_only = False self.connected = False - self.events = [] + + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, None) + + + # connection server ip + def get_server_ip (self): + return self.comm_link.get_server() + + # connection server port + def get_server_port (self): + return self.comm_link.get_port() + ################# events handler ###################### - + def add_event_log (self, msg, ev_type, show = False): + + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" + + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) + + if show: + self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) + + def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -490,59 +129,108 @@ class CTRexStatelessClient(object): for port_id, data in port_stats.iteritems(): self.ports[port_id].port_stats.update(data) + + def handle_async_event (self, type, data): # DP stopped - ev = "[event] - " - show_event = False # port started if (type == 0): port_id = int(data['port_id']) - ev += "Port {0} has started".format(port_id) + ev = "Port {0} has started".format(port_id) + self.async_event_port_started(port_id) # port stopped elif (type == 1): port_id = int(data['port_id']) - ev += "Port {0} has stopped".format(port_id) + ev = "Port {0} has stopped".format(port_id) # call the handler self.async_event_port_stopped(port_id) - # server stopped + # port paused elif (type == 2): - ev += "Server has stopped" - self.async_event_server_stopped() - show_event = True + port_id = int(data['port_id']) + ev = "Port {0} has paused".format(port_id) - # port finished traffic + # call the handler + self.async_event_port_paused(port_id) + + # port resumed elif (type == 3): port_id = int(data['port_id']) - ev += "Port {0} job done".format(port_id) + ev = "Port {0} has resumed".format(port_id) + + # call the handler + self.async_event_port_resumed(port_id) + + # port finished traffic + elif (type == 4): + port_id = int(data['port_id']) + ev = "Port {0} job done".format(port_id) # call the handler self.async_event_port_stopped(port_id) show_event = True + # port was stolen... + elif (type == 5): + session_id = data['session_id'] + + # false alarm, its us + if session_id == self.session_id: + return + + port_id = int(data['port_id']) + who = data['who'] + + ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) + + # call the handler + self.async_event_port_forced_acquired(port_id) + show_event = True + + # server stopped + elif (type == 100): + ev = "Server has stopped" + self.async_event_server_stopped() + show_event = True + + else: # unknown event - ignore return - if show_event: - print format_text("\n" + ev, 'bold') - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{0} - ".format(st) + format_text(ev, 'bold')) + self.add_event_log(ev, 'server', show_event) def async_event_port_stopped (self, port_id): self.ports[port_id].async_event_port_stopped() + + def async_event_port_started (self, port_id): + self.ports[port_id].async_event_port_started() + + + def async_event_port_paused (self, port_id): + self.ports[port_id].async_event_port_paused() + + + def async_event_port_resumed (self, port_id): + self.ports[port_id].async_event_port_resumed() + + + def async_event_port_forced_acquired (self, port_id): + self.ports[port_id].async_event_forced_acquired() + self.read_only = True + def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -552,6 +240,24 @@ class CTRexStatelessClient(object): ############# helper functions section ############## + # measure time for functions + def timing(f): + def wrap(*args): + time1 = time.time() + ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + + delta = time.time() - time1 + print format_time(delta) + "\n" + + return ret + + return wrap + + def validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): print type(port_id_list) @@ -584,66 +290,120 @@ class CTRexStatelessClient(object): ############ boot up section ################ # connection sequence - def connect(self): + # mode can be RW - read / write, RWF - read write with force , RO - read only + def connect(self, mode = "RW"): + + if self.is_connected(): + self.disconnect() + + # clear this flag self.connected = False - # connect - rc, data = self.comm_link.connect() - if not rc: - return RC_ERR(data) + # connect sync channel + rc = self.comm_link.connect() + if rc.bad(): + return rc + + # connect async channel + rc = self.async_client.connect() + if rc.bad(): + return rc # version - rc, data = self.transmit("get_version") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_version") + if rc.bad(): + return rc - self.server_version = data - self.global_stats.server_version = data + self.server_version = rc.data() + self.global_stats.server_version = rc.data() # cache system info - # self.get_system_info(refresh=True) - rc, data = self.transmit("get_system_info") - if not rc: - return RC_ERR(data) - self.system_info = data + rc = self.transmit("get_system_info") + if rc.bad(): + return rc + + self.system_info = rc.data() # cache supported commands - rc, data = self.transmit("get_supported_cmds") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_supported_cmds") + if rc.bad(): + return rc - self.supported_cmds = data + self.supported_cmds = rc.data() # create ports for port_id in xrange(self.get_port_count()): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit) - # acquire all ports - rc = self.acquire() - if rc.bad(): - return rc + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + - rc = self.sync_with_server() + # sync the ports + rc = self.sync_ports() if rc.bad(): return rc - self.connected = True + # acquire all ports + if mode == "RW": + rc = self.acquire(force = False) + + # fallback to read only if failed + if rc.bad(): + rc.annotate(show_status = False) + print format_text("Switching to read only mode - only few commands will be available", 'bold') + + self.release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.acquire(force = True) + if rc.bad(): + return rc + self.read_only = False + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True + + + + self.connected = True return RC_OK() + + def is_read_only (self): + return self.read_only + def is_connected (self): return self.connected and self.comm_link.is_connected def disconnect(self): - self.connected = False + # release any previous acquired ports + if self.is_connected(): + self.release(self.get_acquired_ports()) + self.comm_link.disconnect() + self.async_client.disconnect() + + self.connected = False + return RC_OK() + def on_async_dead (self): + if self.connected: + msg = 'lost connection to server' + self.add_event_log(msg, 'local', True) + self.connected = False + + def on_async_alive (self): + pass ########### cached queries (no server traffic) ########### @@ -666,8 +426,8 @@ class CTRexStatelessClient(object): else: return port_ids - def get_stats_async(self): - return self._async_client.get_stats() + def get_stats_async (self): + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port @@ -675,6 +435,9 @@ class CTRexStatelessClient(object): def get_connection_ip (self): return self.comm_link.server + def get_all_ports (self): + return [port_id for port_id, port_obj in self.ports.iteritems()] + def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() @@ -685,36 +448,60 @@ class CTRexStatelessClient(object): for port_id, port_obj in self.ports.iteritems() if port_obj.is_active()] + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + def set_verbose(self, mode): - self.comm_link.set_verbose(mode) + + # on high - enable link verbose + if mode == self.VERBOSE_HIGH: + self.comm_link.set_verbose(True) + else: + self.comm_link.set_verbose(False) + self.verbose = mode + + def check_verbose (self, level): + return (self.verbose >= level) + + def get_verbose (self): + return self.verbose + + def prn_func (self, msg, level = VERBOSE_REGULAR): + if self.check_verbose(level): + print msg + ############# server actions ################ # ping server def ping(self): - rc, info = self.transmit("ping") - return RC(rc, info) + return self.transmit("ping") - def sync_with_server(self, sync_streams=False): - rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) - if not rc: - return RC_ERR(data) - - for port_info in data: - rc = self.ports[port_info['port_id']].sync(port_info) - if rc.bad(): - return rc - - return RC_OK() def get_global_stats(self): - rc, info = self.transmit("get_global_stats") - return RC(rc, info) + return self.transmit("get_global_stats") ########## port commands ############## + def sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) + + return rc + # acquire ports, if port_list is none - get all def acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) @@ -733,7 +520,7 @@ class CTRexStatelessClient(object): rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release(force)) + rc.add(self.ports[port_id].release()) return rc @@ -750,15 +537,16 @@ class CTRexStatelessClient(object): return rc + def add_stream_pack(self, stream_pack_list, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() - for stream_pack in stream_pack_list: - rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) - + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack_list)) + return rc @@ -855,6 +643,17 @@ class CTRexStatelessClient(object): return rc + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + + def get_port_stats(self, port_id=None): pass @@ -866,16 +665,19 @@ class CTRexStatelessClient(object): return self.comm_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) ######################### Console (high level) API ######################### + @timing def cmd_ping(self): rc = self.ping() rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) return rc - def cmd_connect(self): - rc = self.connect() + def cmd_connect(self, mode = "RW"): + rc = self.connect(mode) rc.annotate() return rc @@ -886,13 +688,7 @@ class CTRexStatelessClient(object): # reset def cmd_reset(self): - - - # sync with the server - rc = self.sync_with_server() - rc.annotate("Syncing with the server:") - if rc.bad(): - return rc + #self.release(self.get_acquired_ports()) rc = self.acquire(force = True) rc.annotate("Force acquiring all ports:") @@ -924,7 +720,7 @@ class CTRexStatelessClient(object): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) if not active_ports: - msg = "No active traffic on porvided ports" + msg = "No active traffic on provided ports" print format_text(msg, 'bold') return RC_ERR(msg) @@ -948,15 +744,26 @@ class CTRexStatelessClient(object): rc = self.update_traffic(mult, active_ports) rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - return RC_OK() + return rc + # clear stats def cmd_clear(self, port_id_list): + for port_id in port_id_list: self.ports[port_id].clear_stats() + self.global_stats.clear_stats() + + return RC_OK() + + + def cmd_invalidate (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + return RC_OK() # pause cmd @@ -972,10 +779,8 @@ class CTRexStatelessClient(object): rc = self.pause_traffic(active_ports) rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + return rc - return RC_OK() # resume cmd @@ -991,14 +796,11 @@ class CTRexStatelessClient(object): rc = self.resume_traffic(active_ports) rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - return RC_OK() + return rc # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration): + def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) @@ -1020,19 +822,37 @@ class CTRexStatelessClient(object): rc = self.add_stream_pack(stream_list.compiled, port_id_list) - rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list)) + rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) if rc.bad(): return rc + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.start_traffic(mult, duration, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - # finally, start the traffic - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): return rc + else: + rc = self.validate(port_id_list) + rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) - return RC_OK() + if rc.bad(): + return rc + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc + + + # validate port(s) profile + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + return rc + + + # stats def cmd_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) @@ -1043,6 +863,26 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ + + def cmd_connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.cmd_connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + if opts.force: + rc = self.cmd_connect(mode = "RWF") + else: + rc = self.cmd_connect(mode = "RW") + + @timing def cmd_start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser @@ -1054,13 +894,19 @@ class CTRexStatelessClient(object): parsing_opts.FORCE, parsing_opts.STREAM_FROM_PATH_OR_FILE, parsing_opts.DURATION, - parsing_opts.MULTIPLIER) + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) opts = parser.parse_args(line.split()) + if opts is None: return RC_ERR("bad command line parameters") + + if opts.dry: + print format_text("\n*** DRY RUN ***", 'bold') + if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) @@ -1070,7 +916,15 @@ class CTRexStatelessClient(object): else: # load streams from file - stream_list = self.streams_db.load_yaml_file(opts.file[0]) + stream_list = None; + try: + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + except Exception as e: + s = str(e) + rc=RC_ERR(s) + rc.annotate() + return rc + rc = RC(stream_list != None) rc.annotate("Load stream pack (from file):") if stream_list == None: @@ -1078,12 +932,13 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear - if opts.total and (mult['type'] != 'percentage'): + if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + @timing def cmd_resume_line (self, line): '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1097,6 +952,8 @@ class CTRexStatelessClient(object): return self.cmd_resume(opts.ports) + + @timing def cmd_stop_line (self, line): '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1110,6 +967,8 @@ class CTRexStatelessClient(object): return self.cmd_stop(opts.ports) + + @timing def cmd_pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1123,6 +982,8 @@ class CTRexStatelessClient(object): return self.cmd_pause(opts.ports) + + @timing def cmd_update_line (self, line): '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, @@ -1139,14 +1000,15 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) return self.cmd_update(opts.ports, opts.mult) - + @timing def cmd_reset_line (self, line): return self.cmd_reset() + def cmd_clear_line (self, line): '''Clear cached local statistics\n''' # define a parser @@ -1161,6 +1023,7 @@ class CTRexStatelessClient(object): return RC_ERR("bad command line parameters") return self.cmd_clear(opts.ports) + def cmd_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser @@ -1180,30 +1043,34 @@ class CTRexStatelessClient(object): if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - # get stats objects, as dictionary + stats = self.cmd_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return - - # if opts.db: - # stream_list = self.streams_db.get_stream_pack(opts.db) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from DB):") - # if rc.bad(): - # return RC_ERR("Failed to load stream pack") - # - # else: - # # load streams from file - # stream_list = self.streams_db.load_yaml_file(opts.file[0]) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from file):") - # if stream_list == None: - # return RC_ERR("Failed to load stream pack") - # - # - # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + + + return RC_OK() + + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + rc = self.cmd_validate(opts.ports) + return rc + def cmd_exit_line (self, line): print format_text("Exiting\n", 'bold') @@ -1280,6 +1147,7 @@ class CTRexStatelessClient(object): return True + ################################# # ------ private methods ------ # @staticmethod @@ -1294,17 +1162,18 @@ class CTRexStatelessClient(object): def _filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + ################################# # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port) + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) @property def is_connected(self): @@ -1313,6 +1182,12 @@ class CTRexStatelessClient(object): else: return True + def get_server (self): + return self.server + + def get_port (self): + return self.port + def set_verbose(self, mode): self.verbose = mode return self.rpc_link.set_verbose(mode) @@ -1354,4 +1229,3 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass - |