diff options
author | 2016-01-24 13:27:11 +0200 | |
---|---|---|
committer | 2016-01-24 13:27:11 +0200 | |
commit | 951dd56abfd78d5669f0f57d840b2fe623ded2cd (patch) | |
tree | 60857060a198512b0d629e04daf63ca2168f09b3 | |
parent | 2d27d1df02328d7148ac1c4ed029ecdaf1853c1e (diff) | |
parent | 6f4a51c126b7a78ee8e37d396ed2b61b05fa506c (diff) |
Merge from origin
30 files changed, 2477 insertions, 1531 deletions
diff --git a/api/stl/examples/stl_simple_burst.py b/api/stl/examples/stl_simple_burst.py new file mode 100644 index 00000000..7efb574a --- /dev/null +++ b/api/stl/examples/stl_simple_burst.py @@ -0,0 +1,53 @@ +import sys +sys.path.insert(0, "../") + +import trex_stl_api + +from trex_stl_api import STLClient, STLError + +import time + +# define a simple burst test +def simple_burst (): + + passed = True + + try: + with STLClient() as c: + + # activate this for some logging information + #c.logger.set_verbose(c.logger.VERBOSE_REGULAR) + + # repeat for 5 times + for i in xrange(1, 6): + + # read the stats before + before_ipackets = c.get_stats()['total']['ipackets'] + + # inject burst profile on two ports and block until done + c.start(profiles = '../profiles/burst.yaml', ports = [0, 1], mult = "1gbps") + c.wait_on_traffic(ports = [0, 1]) + + after_ipackets = c.get_stats()['total']['ipackets'] + + print "Test iteration {0} - Packets Received: {1} ".format(i, (after_ipackets - before_ipackets)) + + # we have 600 packets in the burst and two ports + if (after_ipackets - before_ipackets) != (600 * 2): + passed = False + + # error handling + except STLError as e: + passed = False + print e + + + + if passed: + print "\nTest has passed :-)\n" + else: + print "\nTest has failed :-(\n" + + +simple_burst() + diff --git a/api/stl/examples/udp_64B.pcap b/api/stl/examples/udp_64B.pcap Binary files differnew file mode 100644 index 00000000..699b9c80 --- /dev/null +++ b/api/stl/examples/udp_64B.pcap diff --git a/api/stl/profiles/burst.yaml b/api/stl/profiles/burst.yaml new file mode 100644 index 00000000..dbd348c7 --- /dev/null +++ b/api/stl/profiles/burst.yaml @@ -0,0 +1,39 @@ +### Single stream UDP packet, 64B ### +##################################### +- name: stream0 + stream: + self_start: True + next_stream_id: stream1 + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 100 + rx_stats: [] + vm: [] + +- name: stream1 + stream: + self_start: False + next_stream_id: stream2 + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 200 + rx_stats: [] + vm: [] + +- name: stream2 + stream: + self_start: False + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 300 + rx_stats: [] + vm: [] diff --git a/api/stl/trex_stl_api.py b/api/stl/trex_stl_api.py new file mode 100644 index 00000000..aad39916 --- /dev/null +++ b/api/stl/trex_stl_api.py @@ -0,0 +1,17 @@ +import os +import sys +import time + + +# update the import path to include the stateless client +root_path = os.path.dirname(os.path.abspath(__file__)) + +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client/')) +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/')) +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/')) + +# aliasing +import trex_stateless_client +STLClient = trex_stateless_client.STLClient +STLError = trex_stateless_client.STLError + diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 1a96e127..f098e193 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -86,6 +86,7 @@ def options(opt): opt.load('compiler_cc') opt.add_option('--pkg-dir', '--pkg_dir', dest='pkg_dir', default=False, action='store', help="Destination folder for 'pkg' option.") opt.add_option('--pkg-file', '--pkg_file', dest='pkg_file', default=False, action='store', help="Destination filename for 'pkg' option.") + opt.add_option('--publish-commit', '--publish_commit', dest='publish_commit', default=False, action='store', help="Specify commit id for 'publish_both' option (Please make sure it's good!)") def configure(conf): conf.load('g++') @@ -887,11 +888,14 @@ class Env(object): s= Env().get_env('TREX_EX_WEB_SRV'); return s; + @staticmethod + def get_trex_regression_workspace(): + return Env().get_env('TREX_REGRESSION_WORKSPACE') + + def check_release_permission(): if os.getenv('USER') not in USERS_ALLOWED_TO_RELEASE: - print 'You are not allowed to release TRex. Please contact Hanoch.' - return False - return True + raise Exception('You are not allowed to release TRex. Please contact Hanoch.') # build package in parent dir. can provide custom name and folder with --pkg-dir and --pkg-file def pkg(self): @@ -920,9 +924,8 @@ def release(bld, custom_dir = None): """ release to local folder """ if custom_dir: exec_p = custom_dir - elif not check_release_permission(): - return else: + check_release_permission() exec_p = Env().get_release_path() print "copy images and libs" os.system(' mkdir -p '+exec_p); @@ -947,34 +950,77 @@ def release(bld, custom_dir = None): os.system("mv %s/%s.tar.gz %s" % (os.getcwd(),rel,exec_p)); -def publish(bld): - if not check_release_permission(): - return +def publish(bld, custom_source = None): + check_release_permission() exec_p = Env().get_release_path() rel=get_build_num () release_name ='%s.tar.gz' % (rel); - from_ = exec_p+'/'+release_name; + if custom_source: + from_ = custom_source + else: + from_ = exec_p+'/'+release_name; os.system("rsync -av %s %s:%s/%s " %(from_,Env().get_local_web_server(),Env().get_remote_release_path (), release_name)) os.system("ssh %s 'cd %s;rm be_latest; ln -P %s be_latest' " %(Env().get_local_web_server(),Env().get_remote_release_path (),release_name)) #os.system("ssh %s 'cd %s;rm latest; ln -P %s latest' " %(Env().get_local_web_server(),Env().get_remote_release_path (),release_name)) -def publish_ext(bld): - if not check_release_permission(): - return +def publish_ext(bld, custom_source = None): + check_release_permission() exec_p = Env().get_release_path() rel=get_build_num () release_name ='%s.tar.gz' % (rel); - from_ = exec_p+'/'+release_name; + if custom_source: + from_ = custom_source + else: + from_ = exec_p+'/'+release_name; os.system('rsync -avz -e "ssh -i %s" --rsync-path=/usr/bin/rsync %s %s@%s:%s/release/%s' % (Env().get_trex_ex_web_key(),from_, Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path() ,release_name) ) os.system("ssh -i %s -l %s %s 'cd %s/release/;rm be_latest; ln -P %s be_latest' " %(Env().get_trex_ex_web_key(),Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path(),release_name)) #os.system("ssh -i %s -l %s %s 'cd %s/release/;rm latest; ln -P %s latest' " %(Env().get_trex_ex_web_key(),Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path(),release_name)) -#WIP -def release_successful(self): - print 'Not implemented' +# publish latest passed regression package (or custom commit from --publish_commit option) as be_latest to trex-tgn.cisco.com and internal wiki +def publish_both(self): + check_release_permission() + packages_dir = Env().get_env('TREX_LOCAL_PUBLISH_PATH') + '/experiment/packages' + publish_commit = self.options.publish_commit + if publish_commit: + package_file = '%s/%s.tar.gz' % (packages_dir, publish_commit) + else: + last_passed_commit_file = Env().get_trex_regression_workspace() + '/reports/last_passed_commit' + with open(last_passed_commit_file) as f: + last_passed_commit = f.read().strip() + package_file = '%s/%s.tar.gz' % (packages_dir, last_passed_commit) + publish(self, custom_source = package_file) + publish_ext(self, custom_source = package_file) + +# print detailed latest passed regression commit + brief info of 5 commits before it +def show(bld): + last_passed_commit_file = Env().get_trex_regression_workspace() + '/reports/last_passed_commit' + with open(last_passed_commit_file) as f: + last_passed_commit = f.read().strip() + + # last passed nightly + command = 'timeout 10 git show %s --quiet' % last_passed_commit + result, output = commands.getstatusoutput(command) + if result == 0: + print 'Last passed regression commit:\n%s\n' % output + else: + raise Exception('Error getting commit info with command: %s' % command) + + # brief list of 5 commits before passed + result, output = commands.getstatusoutput('git --version') + if result != 0 or output.startswith('git version 1'): + # old format, no color etc. + command = "timeout 10 git log --no-merges -n 5 --pretty=format:'%%h %%an %%ci %%s' %s^@" % last_passed_commit + else: + # new format, with color, padding, truncating etc. + command = "timeout 10 git log --no-merges -n 5 --pretty=format:'%%C(auto)%%h%%Creset %%<(10,trunc)%%an %%ci %%<(100,trunc)%%s' %s^@ " % last_passed_commit + result, output = commands.getstatusoutput(command) + if result == 0: + print output + else: + raise Exception('Error getting commits info with command: %s' % command) def test (bld): r=commands.getstatusoutput("git log --pretty=format:'%H' -n 1") diff --git a/linux_dpdk/wscript b/linux_dpdk/wscript index 67434a19..981e4b92 100755 --- a/linux_dpdk/wscript +++ b/linux_dpdk/wscript @@ -43,17 +43,16 @@ def publish_ext(bld): def publish_web(bld): ws_main.publish_web(bld) -def release_successful(bld): - ws_main.release_successful(bld) - def sync(bld): ws_main.sync(bld) def test(bld): ws_main.test(bld) +def show(bld): + ws_main.show(bld) - - +def publish_both(bld): + ws_main.publish_both(bld) diff --git a/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py index 96393d1e..b8831c04 100755 --- a/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py +++ b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py @@ -212,7 +212,7 @@ class CTRexPktBuilder_Test(pkt_bld_general_test.CGeneralPktBld_Test): # finally, set IP header len with relation to payload data self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip'))) - filepath = "reports/test.pcap" + filepath = 'reports/test%s.pcap' % os.getenv('SETUP_DIR', '') self.pkt_bld.dump_pkt_to_pcap(filepath) assert os.path.isfile(filepath) # remove pcap after ensuring it exists diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..ef4c48f9 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -143,18 +144,22 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client, prn_func = None): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client - self.prn_func = prn_func + + self.event_handler = stateless_client.event_handler + self.logger = self.stateless_client.logger self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barrier = None self.connected = False @@ -166,13 +171,6 @@ class CTRexAsyncClient(): self.tr = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - - if self.prn_func: - self.prn_func(msg) - else: - print msg - # Socket to talk to server self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) @@ -188,17 +186,15 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + rc = self.barrier() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -215,14 +211,14 @@ class CTRexAsyncClient(): # done self.connected = False + # thread function def _run (self): - # socket must be created on the same thread - self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + self.socket.connect(self.tr) got_data = False @@ -234,7 +230,7 @@ class CTRexAsyncClient(): # signal once if not got_data: - self.stateless_client.on_async_alive() + self.event_handler.on_async_alive() got_data = True @@ -243,7 +239,7 @@ class CTRexAsyncClient(): # signal once if got_data: - self.stateless_client.on_async_dead() + self.event_handler.on_async_dead() got_data = False continue @@ -283,11 +279,52 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - self.stateless_client.handle_async_stats_update(data) + self.event_handler.handle_async_stats_update(data) + # events elif name == "trex-event": - self.stateless_client.handle_async_event(type, data) + self.event_handler.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + if self.async_barrier['key'] == type: + self.async_barrier['ack'] = True + + + # block on barrier for async channel + def barrier(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + self.async_barrier = {'key': key, 'ack': False} + + # expr time + expr = time.time() + timeout + + while not self.async_barrier['ack']: + + # inject + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + # fast loop + for i in xrange(0, 100): + if self.async_barrier['ack']: + break + time.sleep(0.001) + + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + + diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index 848d5a9e..c25c73cb 100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -2,7 +2,7 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder -from trex_stateless_client import CTRexStatelessClient +from trex_stateless_client import STLClient from common.trex_streams import * from client_utils.general_utils import id_count_gen import dpkt @@ -20,7 +20,7 @@ class CTRexHltApi(object): # sync = RPC, async = ZMQ def connect(self, device, port_list, username, sync_port = 4501, async_port = 4500, reset=False, break_locks=False): ret_dict = {"status": 0} - self.trex_client = CTRexStatelessClient(username, device, sync_port, async_port) + self.trex_client = STLClient(username, device, sync_port, async_port) rc = self.trex_client.connect() if rc.bad(): diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 66d87f9d..94240f2a 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -56,7 +56,7 @@ class Port(object): def err(self, msg): return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - def ok(self, data = "ACK"): + def ok(self, data = ""): return RC_OK(data) def get_speed_bps (self): @@ -198,6 +198,9 @@ class Port(object): # remove stream from port def remove_stream (self, stream_id): + if not self.is_acquired(): + return self.err("port is not owned") + if not stream_id in self.streams: return self.err("stream {0} does not exists".format(stream_id)) @@ -219,6 +222,9 @@ class Port(object): # remove all the streams def remove_all_streams (self): + if not self.is_acquired(): + return self.err("port is not owned") + params = {"handler": self.handler, "port_id": self.port_id} @@ -244,6 +250,10 @@ class Port(object): # start traffic def start (self, mul, duration): + + if not self.is_acquired(): + return self.err("port is not owned") + if self.state == self.STATE_DOWN: return self.err("Unable to start traffic - port is down") @@ -270,8 +280,15 @@ class Port(object): # with force ignores the cached state and sends the command def stop (self, force = False): - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") + if not self.is_acquired(): + return self.err("port is not owned") + + # port is already stopped + if not force: + if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS): + return self.ok() + + params = {"handler": self.handler, "port_id": self.port_id} @@ -287,6 +304,9 @@ class Port(object): def pause (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -305,6 +325,9 @@ class Port(object): def resume (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_PAUSE) : return self.err("port is not in pause mode") @@ -322,6 +345,10 @@ class Port(object): def update (self, mul): + + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -338,6 +365,9 @@ class Port(object): def validate (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state == self.STATE_DOWN): return self.err("port is down") @@ -413,6 +443,11 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + + def get_stats (self): + return self.port_stats.get_stats() + + def invalidate_stats(self): return self.port_stats.invalidate() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index dc39bee6..c1a4d1d1 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -8,6 +8,7 @@ except ImportError: import client.outer_packages from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage +from client_utils import general_utils from client_utils.packet_builder import CTRexPktBuilder import json @@ -22,86 +23,185 @@ import re import random from trex_port import Port from common.trex_types import * - from trex_async_client import CTRexAsyncClient +# basic error for API +class STLError(Exception): + def __init__ (self, msg): + self.msg = str(msg) + + def __str__ (self): + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + + + s = "\n******\n" + s += "Error at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') + s += "specific error:\n\n{0}\n".format(format_text(self.msg, 'bold')) + + return s + + def brief (self): + return self.msg + + +# raised when the client state is invalid for operation +class STLStateError(STLError): + def __init__ (self, op, state): + self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state) + + +# port state error +class STLPortStateError(STLError): + def __init__ (self, port, op, state): + self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state) + + +# raised when argument is not valid for operation +class STLArgumentError(STLError): + def __init__ (self, name, got, valid_values = None, extended = None): + self.msg = "Argument: '{0}' invalid value: '{1}'".format(name, got) + if valid_values: + self.msg += " - valid values are '{0}'".format(valid_values) + + if extended: + self.msg += "\n{0}".format(extended) -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" +# raised when timeout occurs +class STLTimeoutError(STLError): + def __init__ (self, timeout): + self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout) + + +############################ logger ############################# +############################ ############################# +############################ ############################# + +# logger API for the client +class LoggerApi(object): # verbose levels - VERBOSE_QUIET = 0 + VERBOSE_QUIET = 0 VERBOSE_REGULAR = 1 VERBOSE_HIGH = 2 - - def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False): - super(CTRexStatelessClient, self).__init__() - self.user = username + def __init__(self): + self.level = LoggerApi.VERBOSE_REGULAR + + # implemented by specific logger + def write(self, msg, newline = True): + raise Exception("implement this") + + # implemented by specific logger + def flush(self): + raise Exception("implement this") + + def set_verbose (self, level): + if not level in xrange(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1): + raise ValueError("bad value provided for logger") + + self.level = level + + def get_verbose (self): + return self.level + + + def check_verbose (self, level): + return (self.level >= level) + + + # simple log message with verbose + def log (self, msg, level = VERBOSE_REGULAR, newline = True): + if not self.check_verbose(level): + return + + self.write(msg, newline) + + # logging that comes from async event + def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) - # default verbose level - if not quiet: - self.verbose = self.VERBOSE_REGULAR + def pre_cmd (self, desc): + self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) + self.flush() + + def post_cmd (self, rc): + if rc: + self.log(format_text("[SUCCESS]\n", 'green', 'bold')) else: - self.verbose = self.VERBOSE_QUIET + self.log(format_text("[FAILED]\n", 'red', 'bold')) - self.ports = {} - self._connection_info = {"server": server, - "sync_port": sync_port, - "async_port": async_port} - self.system_info = {} - self.server_version = {} - self.__err_log = None - self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) + def log_cmd (self, desc): + self.pre_cmd(desc) + self.post_cmd(True) - self.streams_db = CStreamsDB() - self.global_stats = trex_stats.CGlobalStats(self._connection_info, - self.server_version, - self.ports) - self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, - self.ports) - self.events = [] + # supress object getter + def supress (self): + class Supress(object): + def __init__ (self, logger): + self.logger = logger - self.session_id = random.getrandbits(32) - self.read_only = False - self.connected = False - self.prompt_redraw_cb = None + def __enter__ (self): + self.saved_level = self.logger.get_verbose() + self.logger.set_verbose(LoggerApi.VERBOSE_QUIET) + def __exit__ (self, type, value, traceback): + self.logger.set_verbose(self.saved_level) - # returns the port object - def get_port (self, port_id): - return self.ports.get(port_id, None) + return Supress(self) - # connection server ip - def get_server_ip (self): - return self.comm_link.get_server() - # connection server port - def get_server_port (self): - return self.comm_link.get_port() +# default logger - to stdout +class DefaultLogger(LoggerApi): + def write (self, msg, newline = True): + if newline: + print msg + else: + print msg, + def flush (self): + sys.stdout.flush() - ################# events handler ###################### - def add_event_log (self, msg, ev_type, show = False): - if ev_type == "server": - prefix = "[server]" - elif ev_type == "local": - prefix = "[local]" +############################ async event hander ############################# +############################ ############################# +############################ ############################# - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) +# handles different async events given to the client +class AsyncEventHandler(object): - if show: - self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))), redraw_console = True) - + def __init__ (self, client): + self.client = client + self.logger = self.client.logger + + self.events = [] + # public functions + + def get_events (self): + return self.events + + + def clear_events (self): + self.events = [] + + + def on_async_dead (self): + if self.client.connected: + msg = 'lost connection to server' + self.__add_event_log(msg, 'local', True) + self.client.connected = False + + + def on_async_alive (self): + pass + + + # handles an async stats update from the subscriber def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -113,7 +213,7 @@ class CTRexStatelessClient(object): if m: port_id = int(m.group(2)) field_name = m.group(1) - if self.ports.has_key(port_id): + if self.client.ports.has_key(port_id): if not port_id in port_stats: port_stats[port_id] = {} port_stats[port_id][field_name] = value @@ -124,13 +224,14 @@ class CTRexStatelessClient(object): global_stats[key] = value # update the general object with the snapshot - self.global_stats.update(global_stats) + self.client.global_stats.update(global_stats) + # update all ports for port_id, data in port_stats.iteritems(): - self.ports[port_id].port_stats.update(data) - + self.client.ports[port_id].port_stats.update(data) + # dispatcher for server async events (port started, port stopped and etc.) def handle_async_event (self, type, data): # DP stopped @@ -140,7 +241,7 @@ class CTRexStatelessClient(object): if (type == 0): port_id = int(data['port_id']) ev = "Port {0} has started".format(port_id) - self.async_event_port_started(port_id) + self.__async_event_port_started(port_id) # port stopped elif (type == 1): @@ -148,8 +249,8 @@ class CTRexStatelessClient(object): ev = "Port {0} has stopped".format(port_id) # call the handler - self.async_event_port_stopped(port_id) - + self.__async_event_port_stopped(port_id) + # port paused elif (type == 2): @@ -157,7 +258,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has paused".format(port_id) # call the handler - self.async_event_port_paused(port_id) + self.__async_event_port_paused(port_id) # port resumed elif (type == 3): @@ -165,7 +266,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has resumed".format(port_id) # call the handler - self.async_event_port_resumed(port_id) + self.__async_event_port_resumed(port_id) # port finished traffic elif (type == 4): @@ -173,7 +274,7 @@ class CTRexStatelessClient(object): ev = "Port {0} job done".format(port_id) # call the handler - self.async_event_port_stopped(port_id) + self.__async_event_port_stopped(port_id) show_event = True # port was stolen... @@ -181,7 +282,7 @@ class CTRexStatelessClient(object): session_id = data['session_id'] # false alarm, its us - if session_id == self.session_id: + if session_id == self.client.session_id: return port_id = int(data['port_id']) @@ -190,13 +291,13 @@ class CTRexStatelessClient(object): ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) # call the handler - self.async_event_port_forced_acquired(port_id) + self.__async_event_port_forced_acquired(port_id) show_event = True # server stopped elif (type == 100): ev = "Server has stopped" - self.async_event_server_stopped() + self.__async_event_server_stopped() show_event = True @@ -205,338 +306,242 @@ class CTRexStatelessClient(object): return - self.add_event_log(ev, 'server', show_event) - - - def async_event_port_stopped (self, port_id): - self.ports[port_id].async_event_port_stopped() - - - def async_event_port_started (self, port_id): - self.ports[port_id].async_event_port_started() - - - def async_event_port_paused (self, port_id): - self.ports[port_id].async_event_port_paused() - + self.__add_event_log(ev, 'server', show_event) - def async_event_port_resumed (self, port_id): - self.ports[port_id].async_event_port_resumed() + # private functions - def async_event_port_forced_acquired (self, port_id): - self.ports[port_id].async_event_forced_acquired() - self.read_only = True + def __async_event_port_stopped (self, port_id): + self.client.ports[port_id].async_event_port_stopped() - def async_event_server_stopped (self): - self.connected = False + def __async_event_port_started (self, port_id): + self.client.ports[port_id].async_event_port_started() - def get_events (self): - return self.events - def clear_events (self): - self.events = [] + def __async_event_port_paused (self, port_id): + self.client.ports[port_id].async_event_port_paused() - ############# helper functions section ############## - # measure time for functions - def timing(f): - def wrap(*args): - time1 = time.time() - ret = f(*args) + def __async_event_port_resumed (self, port_id): + self.client.ports[port_id].async_event_port_resumed() - # don't want to print on error - if ret.bad(): - return ret - delta = time.time() - time1 - print format_time(delta) + "\n" + def __async_event_port_forced_acquired (self, port_id): + self.client.ports[port_id].async_event_forced_acquired() - return ret - return wrap + def __async_event_server_stopped (self): + self.client.connected = False - def validate_port_list(self, port_id_list): - if not isinstance(port_id_list, list): - print type(port_id_list) - return False - - # check each item of the sequence - return all([ (port_id >= 0) and (port_id < self.get_port_count()) - for port_id in port_id_list ]) - - # some preprocessing for port argument - def __ports (self, port_id_list): - - # none means all - if port_id_list == None: - return range(0, self.get_port_count()) - - # always list - if isinstance(port_id_list, int): - port_id_list = [port_id_list] - - if not isinstance(port_id_list, list): - raise ValueError("bad port id list: {0}".format(port_id_list)) - - for port_id in port_id_list: - if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): - raise ValueError("bad port id {0}".format(port_id)) - - return port_id_list - - ############ boot up section ################ - - # connection sequence - - # mode can be RW - read / write, RWF - read write with force , RO - read only - def connect(self, mode = "RW"): - - if self.is_connected(): - self.disconnect() - - # clear this flag - self.connected = False - - # connect sync channel - rc = self.comm_link.connect() - if rc.bad(): - return rc - - # connect async channel - rc = self.async_client.connect() - if rc.bad(): - return rc - - # version - rc = self.transmit("get_version") - if rc.bad(): - return rc - - self.server_version = rc.data() - self.global_stats.server_version = rc.data() - - # cache system info - rc = self.transmit("get_system_info") - if rc.bad(): - return rc - - self.system_info = rc.data() - - # cache supported commands - rc = self.transmit("get_supported_cmds") - if rc.bad(): - return rc - - self.supported_cmds = rc.data() - - # create ports - for port_id in xrange(self.get_port_count()): - speed = self.system_info['ports'][port_id]['speed'] - driver = self.system_info['ports'][port_id]['driver'] - - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + # add event to log + def __add_event_log (self, msg, ev_type, show = False): + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" - # sync the ports - rc = self.sync_ports() - if rc.bad(): - return rc + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - # acquire all ports - if mode == "RW": - rc = self.acquire(force = False) + if show: + self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - # fallback to read only if failed - if rc.bad(): - rc.annotate(show_status = False) - print format_text("Switching to read only mode - only few commands will be available", 'bold') - self.release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False + - elif mode == "RWF": - rc = self.acquire(force = True) - if rc.bad(): - return rc - self.read_only = False - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True +############################ RPC layer ############################# +############################ ############################# +############################ ############################# +class CCommLink(object): + """describes the connectivity of the stateless client method""" + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): + self.virtual = virtual + self.server = server + self.port = port + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - self.connected = True - return RC_OK() - + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True - def is_read_only (self): - return self.read_only + def get_server (self): + return self.server - def is_connected (self): - return self.connected and self.comm_link.is_connected + def get_port (self): + return self.port + def connect(self): + if not self.virtual: + return self.rpc_link.connect() def disconnect(self): - # release any previous acquired ports - if self.is_connected(): - self.release(self.get_acquired_ports()) - - self.comm_link.disconnect() - self.async_client.disconnect() - - self.connected = False - - return RC_OK() - - - def on_async_dead (self): - if self.connected: - msg = 'lost connection to server' - self.add_event_log(msg, 'local', True) - self.connected = False - - def on_async_alive (self): - pass - - ########### cached queries (no server traffic) ########### + if not self.virtual: + return self.rpc_link.disconnect() - def get_supported_cmds(self): - return self.supported_cmds + def transmit(self, method_name, params={}): + if self.virtual: + self._prompt_virtual_tx_msg() + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + print msg + return + else: + return self.rpc_link.invoke_rpc_method(method_name, params) - def get_version(self): - return self.server_version + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() - def get_system_info(self): - return self.system_info + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) - def get_port_count(self): - return self.system_info.get("port_count") - def get_port_ids(self, as_str=False): - port_ids = range(self.get_port_count()) - if as_str: - return " ".join(str(p) for p in port_ids) - else: - return port_ids - def get_stats_async (self): - return self.async_client.get_stats() +############################ client ############################# +############################ ############################# +############################ ############################# - def get_connection_port (self): - return self.comm_link.port +class STLClient(object): + """docstring for STLClient""" - def get_connection_ip (self): - return self.comm_link.server + def __init__(self, + username = general_utils.get_current_user(), + server = "localhost", + sync_port = 4501, + async_port = 4500, + verbose_level = LoggerApi.VERBOSE_QUIET, + logger = None, + virtual = False): - def get_all_ports (self): - return [port_id for port_id, port_obj in self.ports.iteritems()] - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] + self.username = username + + # init objects + self.ports = {} + self.server_version = {} + self.system_info = {} + self.session_id = random.getrandbits(32) + self.connected = False - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] + # logger + self.logger = DefaultLogger() if not logger else logger - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] - - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] + # initial verbose + self.logger.set_verbose(verbose_level) - def set_verbose(self, mode): + # low level RPC layer + self.comm_link = CCommLink(server, + sync_port, + virtual, + self.logger) - # on high - enable link verbose - if mode == self.VERBOSE_HIGH: - self.comm_link.set_verbose(True) - else: - self.comm_link.set_verbose(False) + # async event handler manager + self.event_handler = AsyncEventHandler(self) - self.verbose = mode + # async subscriber level + self.async_client = CTRexAsyncClient(server, + async_port, + self) + + - def check_verbose (self, level): - return (self.verbose >= level) + # stats + self.connection_info = {"username": username, + "server": server, + "sync_port": sync_port, + "async_port": async_port, + "virtual": virtual} - def get_verbose (self): - return self.verbose + + self.global_stats = trex_stats.CGlobalStats(self.connection_info, + self.server_version, + self.ports) - def prn_func (self, msg, level = VERBOSE_REGULAR, redraw_console = False): - if not self.check_verbose(level): - return + self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, + self.ports) - if redraw_console and self.prompt_redraw_cb: - print "\n" + msg + "\n" - self.prompt_redraw_cb() - else: - print msg + # stream DB + self.streams_db = CStreamsDB() - sys.stdout.flush() + + + ############# private functions - used by the class itself ########### - def set_prompt_redraw_cb(self, cb): - self.prompt_redraw_cb = cb + # some preprocessing for port argument + def __ports (self, port_id_list): - ############# server actions ################ + # none means all + if port_id_list == None: + return range(0, self.get_port_count()) - # ping server - def ping(self): - return self.transmit("ping") + # always list + if isinstance(port_id_list, int): + port_id_list = [port_id_list] + if not isinstance(port_id_list, list): + raise ValueError("bad port id list: {0}".format(port_id_list)) + for port_id in port_id_list: + if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): + raise ValueError("bad port id {0}".format(port_id)) - def get_global_stats(self): - return self.transmit("get_global_stats") + return port_id_list - ########## port commands ############## - def sync_ports (self, port_id_list = None, force = False): + # sync ports + def __sync_ports (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].sync()) - + return rc # acquire ports, if port_list is none - get all - def acquire (self, port_id_list = None, force = False): + def __acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].acquire(force)) - + return rc - + # release ports - def release (self, port_id_list = None): + def __release (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].release()) - + return rc - - def add_stream(self, stream_id, stream_obj, port_id_list = None): + + def __add_stream(self, stream_id, stream_obj, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -544,12 +549,12 @@ class CTRexStatelessClient(object): for port_id in port_id_list: rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - + return rc - - def add_stream_pack(self, stream_pack, port_id_list = None): + + def __add_stream_pack(self, stream_pack, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -562,45 +567,45 @@ class CTRexStatelessClient(object): - def remove_stream(self, stream_id, port_id_list = None): + def __remove_stream(self, stream_id, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].remove_stream(stream_id)) - + return rc - def remove_all_streams(self, port_id_list = None): + def __remove_all_streams(self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].remove_all_streams()) - + return rc - - def get_stream(self, stream_id, port_id, get_pkt = False): + + def __get_stream(self, stream_id, port_id, get_pkt = False): return self.ports[port_id].get_stream(stream_id) - def get_all_streams(self, port_id, get_pkt = False): + def __get_all_streams(self, port_id, get_pkt = False): return self.ports[port_id].get_all_streams() - def get_stream_id_list(self, port_id): + def __get_stream_id_list(self, port_id): return self.ports[port_id].get_stream_id_list() - def start_traffic (self, multiplier, duration, port_id_list = None): + def __start_traffic (self, multiplier, duration, port_id_list = None): port_id_list = self.__ports(port_id_list) @@ -608,11 +613,11 @@ class CTRexStatelessClient(object): for port_id in port_id_list: rc.add(self.ports[port_id].start(multiplier, duration)) - + return rc - def resume_traffic (self, port_id_list = None, force = False): + def __resume_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() @@ -622,7 +627,7 @@ class CTRexStatelessClient(object): return rc - def pause_traffic (self, port_id_list = None, force = False): + def __pause_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() @@ -632,280 +637,937 @@ class CTRexStatelessClient(object): return rc - def stop_traffic (self, port_id_list = None, force = False): + + def __stop_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].stop(force)) - + return rc - def update_traffic (self, mult, port_id_list = None, force = False): + def __update_traffic (self, mult, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].update(mult)) - + return rc - def validate (self, port_id_list = None): + def __validate_traffic (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: rc.add(self.ports[port_id].validate()) - + return rc - def get_port_stats(self, port_id=None): - pass - def get_stream_stats(self, port_id=None): - pass + # connect to server + def __connect(self): - def transmit(self, method_name, params={}): - return self.comm_link.transmit(method_name, params) + # first disconnect if already connected + if self.is_connected(): + self.__disconnect() + # clear this flag + self.connected = False - def transmit_batch(self, batch_list): - return self.comm_link.transmit_batch(batch_list) + # connect sync channel + self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) + rc = self.comm_link.connect() + self.logger.post_cmd(rc) - ######################### Console (high level) API ######################### + if not rc: + return rc - @timing - def cmd_ping(self): - rc = self.ping() - rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) - return rc + # connect async channel + self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) + rc = self.async_client.connect() + self.logger.post_cmd(rc) - def cmd_connect(self, mode = "RW"): - rc = self.connect(mode) - rc.annotate() - return rc + if not rc: + return rc - def cmd_disconnect(self): - rc = self.disconnect() - rc.annotate() - return rc + # version + rc = self._transmit("get_version") + if not rc: + return rc - # reset - def cmd_reset(self): - #self.release(self.get_acquired_ports()) + self.server_version = rc.data() + self.global_stats.server_version = rc.data() - rc = self.acquire(force = True) - rc.annotate("Force acquiring all ports:") - if rc.bad(): + # cache system info + rc = self._transmit("get_system_info") + if not rc: return rc + self.system_info = rc.data() - # force stop all ports - rc = self.stop_traffic(self.get_port_ids(), True) - rc.annotate("Stop traffic on all ports:") - if rc.bad(): + # cache supported commands + rc = self._transmit("get_supported_cmds") + if not rc: return rc + self.supported_cmds = rc.data() - # remove all streams - rc = self.remove_all_streams(self.get_port_ids()) - rc.annotate("Removing all streams from all ports:") - if rc.bad(): + # create ports + for port_id in xrange(self.system_info["port_count"]): + speed = self.system_info['ports'][port_id]['speed'] + driver = self.system_info['ports'][port_id]['driver'] + + self.ports[port_id] = Port(port_id, + speed, + driver, + self.username, + self.comm_link, + self.session_id) + + + # sync the ports + rc = self.__sync_ports() + if not rc: return rc - # TODO: clear stats - return RC_OK() + self.connected = True + return RC_OK() - # stop cmd - def cmd_stop (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + # disconenct from server + def __disconnect(self): + # release any previous acquired ports + if self.is_connected(): + self.__release(self.get_acquired_ports()) - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + self.comm_link.disconnect() + self.async_client.disconnect() - rc = self.stop_traffic(active_ports) - rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + self.connected = False return RC_OK() - # update cmd - def cmd_update (self, port_id_list, mult): - # find the relevant ports + # ping server + def __ping (self): + return self._transmit("ping") + + + # start command + def __start (self, port_id_list, stream_list, mult, force, duration, dry): + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + self.logger.log(format_text(msg, 'bold')) + return RC_ERR(msg) + else: + rc = self.__stop(active_ports) + if not rc: + return rc + - rc = self.update_traffic(mult, active_ports) - rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) + self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list)) + rc = self.__remove_all_streams(port_id_list) + self.logger.post_cmd(rc) - return rc + if not rc: + return rc + - # clear stats - def cmd_clear(self, port_id_list): + self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + rc = self.__add_stream_pack(stream_list, port_id_list) + self.logger.post_cmd(rc) - for port_id in port_id_list: - self.ports[port_id].clear_stats() + if not rc: + return rc + + # when not on dry - start the traffic , otherwise validate only + if not dry: - self.global_stats.clear_stats() + self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list)) + rc = self.__start_traffic(mult, duration, port_id_list) + self.logger.post_cmd(rc) - return RC_OK() + return rc + else: + self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list)) + rc = self.__validate(port_id_list) + self.logger.post_cmd(rc) + - def cmd_invalidate (self, port_id_list): - for port_id in port_id_list: - self.ports[port_id].invalidate_stats() + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc - self.global_stats.invalidate() - return RC_OK() + # stop cmd + def __stop (self, port_id_list): - # pause cmd - def cmd_pause (self, port_id_list): + self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list)) + rc = self.__stop_traffic(port_id_list) + self.logger.post_cmd(rc) - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + if not rc: + return rc + + return RC_OK() - if not active_ports: - msg = "No active traffic on provided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) + #update cmd + def __update (self, port_id_list, mult): + + self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list)) + rc = self.__update_traffic(mult, port_id_list) + self.logger.post_cmd(rc) - rc = self.pause_traffic(active_ports) - rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) return rc + # pause cmd + def __pause (self, port_id_list): + + self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list)) + rc = self.__pause_traffic(port_id_list) + self.logger.post_cmd(rc) + + return rc + # resume cmd - def cmd_resume (self, port_id_list): + def __resume (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) + rc = self.__resume_traffic(port_id_list) + self.logger.post_cmd(rc) + + return rc - if not active_ports: - msg = "No active traffic on porvided ports" - print format_text(msg, 'bold') - return RC_ERR(msg) - rc = self.resume_traffic(active_ports) - rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) + # validate port(s) profile + def __validate (self, port_id_list): + self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) + rc = self.__validate_traffic(port_id_list) + self.logger.post_cmd(rc) + return rc - # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): + # clear stats + def __clear_stats(self, port_id_list, clear_global): - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + for port_id in port_id_list: + self.ports[port_id].clear_stats() - if active_ports: - if not force: - msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) - print format_text(msg, 'bold') - return RC_ERR(msg) - else: - rc = self.cmd_stop(active_ports) - if not rc: + if clear_global: + self.global_stats.clear_stats() + + self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list)) + rc = RC_OK() + self.logger.post_cmd(rc) + + return RC + + + # get stats + def __get_stats (self, port_id_list): + stats = {} + + stats['global'] = self.global_stats.get_stats() + + total = {} + for port_id in port_id_list: + port_stats = self.ports[port_id].get_stats() + stats["port {0}".format(port_id)] = port_stats + + for k, v in port_stats.iteritems(): + if not k in total: + total[k] = v + else: + total[k] += v + + stats['total'] = total + + return stats + + + def __process_profiles (self, profiles, out): + + for profile in (profiles if isinstance(profiles, list) else [profiles]): + # filename + if isinstance(profile, str): + + if not os.path.isfile(profile): + return RC_ERR("file '{0}' does not exists".format(profile)) + + try: + stream_list = self.streams_db.load_yaml_file(profile) + except Exception as e: + rc = RC_ERR(str(e)) return rc + out.append(stream_list) - rc = self.remove_all_streams(port_id_list) - rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + else: + return RC_ERR("unknown profile '{0}'".format(profile)) - rc = self.add_stream_pack(stream_list, port_id_list) - rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): - return rc + return RC_OK() - # when not on dry - start the traffic , otherwise validate only - if not dry: - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - return rc - else: - rc = self.validate(port_id_list) - rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) + ############ functions used by other classes but not users ############## - if rc.bad(): - return rc + def _verify_port_id_list (self, port_id_list): + # check arguments + if not isinstance(port_id_list, list): + return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list))) - # show a profile on one port for illustration - self.ports[port_id_list[0]].print_profile(mult, duration) + # all ports are valid ports + if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]): + return RC_ERR("") - return rc + return RC_OK() + + def _validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + return False + # check each item of the sequence + return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) - # validate port(s) profile - def cmd_validate (self, port_id_list): - rc = self.validate(port_id_list) - rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) - return rc + # transmit request on the RPC link + def _transmit(self, method_name, params={}): + return self.comm_link.transmit(method_name, params) + + # transmit batch request on the RPC link + def _transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) + # stats - def cmd_stats(self, port_id_list, stats_mask=set()): + def _get_formatted_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) stats_obj = {} for stats_type in stats_opts: stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) + return stats_obj - def cmd_streams(self, port_id_list, streams_mask=set()): + def _get_streams(self, port_id_list, streams_mask=set()): streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask) return streams_obj - ############## High Level API With Parser ################ + def _invalidate_stats (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + + return RC_OK() + + + + + + ################################# + # ------ private methods ------ # + @staticmethod + def __get_mask_keys(ok_values={True}, **kwargs): + masked_keys = set() + for key, val in kwargs.iteritems(): + if val in ok_values: + masked_keys.add(key) + return masked_keys + + @staticmethod + def __filter_namespace_args(namespace, ok_values): + return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + + + # API decorator - double wrap because of argument + def __api_check(connected = True): + + def wrap (f): + def wrap2(*args, **kwargs): + client = args[0] + + func_name = f.__name__ + + # check connection + if connected and not client.is_connected(): + raise STLStateError(func_name, 'disconnected') + + ret = f(*args, **kwargs) + return ret + return wrap2 + + return wrap + + + + ############################ API ############################# + ############################ ############################# + ############################ ############################# + def __enter__ (self): + self.connect(mode = "RWF") + self.reset() + return self + + def __exit__ (self, type, value, traceback): + if self.get_active_ports(): + self.stop(self.get_active_ports()) + self.disconnect() + + ############################ Getters ############################# + ############################ ############################# + ############################ ############################# + + + # return verbose level of the logger + def get_verbose (self): + return self.logger.get_verbose() + + # is the client on read only mode ? + def is_all_ports_acquired (self): + return not (self.get_all_ports() == self.get_acquired_ports()) + + # is the client connected ? + def is_connected (self): + return self.connected and self.comm_link.is_connected + + + # get connection info + def get_connection_info (self): + return self.connection_info + + + # get supported commands by the server + def get_server_supported_cmds(self): + return self.supported_cmds + + # get server version + def get_server_version(self): + return self.server_version + + # get server system info + def get_server_system_info(self): + return self.system_info + + # get port count + def get_port_count(self): + return len(self.ports) + + + # returns the port object + def get_port (self, port_id): + port = self.ports.get(port_id, None) + if (port != None): + return port + else: + raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) + + + # get all ports as IDs + def get_all_ports (self): + return self.ports.keys() + + # get all acquired ports + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + # get all active ports (TX or pause) + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + # get paused ports + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + # get all TX ports + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + + # get stats + def get_stats (self, ports = None, async_barrier = True): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # check async barrier + if not type(async_barrier) is bool: + raise STLArgumentError('async_barrier', async_barrier) + + + # if the user requested a barrier - use it + if async_barrier: + rc = self.async_client.barrier() + if not rc: + raise STLError(rc) + + return self.__get_stats(ports) + + # return all async events + def get_events (self): + return self.event_handler.get_events() + + ############################ Commands ############################# + ############################ ############################# + ############################ ############################# + + + # set the log on verbose level + def set_verbose (self, level): + self.logger.set_verbose(level) + + + # connects to the server + # mode can be: + # 'RO' - read only + # 'RW' - read/write + # 'RWF' - read write forced (take ownership) + @__api_check(False) + def connect (self, mode = "RW"): + modes = ['RO', 'RW', 'RWF'] + if not mode in modes: + raise STLArgumentError('mode', mode, modes) + + rc = self.__connect() + if not rc: + raise STLError(rc) + + # acquire all ports for 'RW' or 'RWF' + if (mode == "RW") or (mode == "RWF"): + self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False) + + + + + # acquire ports + # this is not needed if connect was called with "RW" or "RWF" + # but for "RO" this might be needed + @__api_check(True) + def acquire (self, ports = None, force = False): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify ports + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify valid port id list + if force: + self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) + else: + self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) + + rc = self.__acquire(ports, force) + + self.logger.post_cmd(rc) + + if not rc: + self.__release(ports) + raise STLError(rc) + + + + # force connect syntatic sugar + @__api_check(False) + def fconnect (self): + self.connect(mode = "RWF") + + + # disconnects from the server + @__api_check(False) + def disconnect (self, log = True): + rc = self.__disconnect() + if log: + self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) + if not rc: + raise STLError(rc) + + + + # teardown - call after test is done + # NEVER throws an exception + @__api_check(False) + def teardown (self, stop_traffic = True): + + # try to stop traffic + if stop_traffic and self.get_active_ports(): + try: + self.stop() + except STLError: + pass + + # disconnect + self.__disconnect() + + + + # pings the server on the RPC channel + @__api_check(True) + def ping(self): + self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.connection_info['sync_port'])) + rc = self.__ping() + + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + + # reset the server by performing + # force acquire, stop, and remove all streams + @__api_check(True) + def reset(self): + + self.logger.pre_cmd("Force acquiring all ports:") + rc = self.__acquire(force = True) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + # force stop all ports + self.logger.pre_cmd("Stop traffic on all ports:") + rc = self.__stop_traffic(self.get_all_ports(), True) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + # remove all streams + self.logger.pre_cmd("Removing all streams from all ports:") + rc = self.__remove_all_streams(self.get_all_ports()) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + self.clear_stats() + + + # start cmd + @__api_check(True) + def start (self, + profiles, + ports = None, + mult = "1", + force = False, + duration = -1, + dry = False, + total = False): + + + # by default use all ports + if ports == None: + ports = self.get_acquired_ports() - def cmd_connect_line (self, line): + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = False, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # some type checkings + + if not type(force) is bool: + raise STLArgumentError('force', force) + + if not isinstance(duration, (int, float)): + raise STLArgumentError('duration', duration) + + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # process profiles + stream_list = [] + rc = self.__process_profiles(profiles, stream_list) + if not rc: + raise STLError(rc) + + # dry run + if dry: + self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) + + # call private method to start + + rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry) + if not rc: + raise STLError(rc) + + + + # stop traffic on ports + @__api_check(True) + def stop (self, ports = None): + + # by default the user means all the active ports + if ports == None: + ports = self.get_active_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__stop(ports) + if not rc: + raise STLError(rc) + + + + # update traffic + @__api_check(True) + def update (self, ports = None, mult = "1", total = False): + + # by default the user means all the active ports + if ports == None: + ports = self.get_active_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = True, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # verify total + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # call low level functions + rc = self.__update(ports, mult_obj) + if not rc: + raise STLError(rc) + + + + # pause traffic on ports + @__api_check(True) + def pause (self, ports = None): + + # by default the user means all the TX ports + if ports == None: + ports = self.get_transmitting_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__pause(ports) + if not rc: + raise STLError(rc) + + + + # resume traffic on ports + @__api_check(True) + def resume (self, ports = None): + + # by default the user means all the paused ports + if ports == None: + ports = self.get_paused_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__resume(ports) + if not rc: + raise STLError(rc) + + + @__api_check(True) + def validate (self, ports = None): + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__validate(ports) + if not rc: + raise STLError(rc) + + + # clear stats + @__api_check(False) + def clear_stats (self, ports = None, clear_global = True): + + # by default use all ports + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify clear global + if not type(clear_global) is bool: + raise STLArgumentError('clear_global', clear_global) + + + rc = self.__clear_stats(ports, clear_global) + if not rc: + raise STLError(rc) + + + + + + # wait while traffic is on, on timeout throw STLTimeoutError + @__api_check(True) + def wait_on_traffic (self, ports = None, timeout = 60): + + # by default use all acquired ports + if ports == None: + ports = self.get_acquired_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + expr = time.time() + timeout + + # wait while any of the required ports are active + while set(self.get_active_ports()).intersection(ports): + time.sleep(0.01) + if time.time() > expr: + raise STLTimeoutError(timeout) + + + # clear all async events + def clear_events (self): + self.event_handler.clear_events() + + ############################ Line ############################# + ############################ Commands ############################# + ############################ ############################# + # console decorator + def __console(f): + def wrap(*args): + client = args[0] + + time1 = time.time() + + try: + rc = f(*args) + except STLError as e: + client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + return + + # if got true - print time + if rc: + delta = time.time() - time1 + client.logger.log(format_time(delta) + "\n") + + + return wrap + + + @__console + def connect_line (self, line): '''Connects to the TRex server''' # define a parser parser = parsing_opts.gen_parser(self, "connect", - self.cmd_connect_line.__doc__, + self.connect_line.__doc__, parsing_opts.FORCE) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return - if opts.force: - rc = self.cmd_connect(mode = "RWF") - else: - rc = self.cmd_connect(mode = "RW") + # call the API + self.connect("RWF" if opts.force else "RW") - @timing - def cmd_start_line (self, line): + # true means print time + return True + + @__console + def disconnect_line (self, line): + self.disconnect() + + + + @__console + def reset_line (self, line): + self.reset() + + # true means print time + return True + + + @__console + def start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser parser = parsing_opts.gen_parser(self, "start", - self.cmd_start_line.__doc__, + self.start_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.TOTAL, parsing_opts.FORCE, @@ -918,361 +1580,220 @@ class CTRexStatelessClient(object): if opts is None: - return RC_ERR("bad command line parameters") - - - if opts.dry: - print format_text("\n*** DRY RUN ***", 'bold') - - if opts.db: - stream_list = self.streams_db.get_stream_pack(opts.db) - rc = RC(stream_list != None) - rc.annotate("Load stream pack (from DB):") - if rc.bad(): - return RC_ERR("Failed to load stream pack") + return - else: - # load streams from file - stream_list = None - try: - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - except Exception as e: - s = str(e) - rc=RC_ERR(s) - rc.annotate() - return rc + # pack the profile + profiles = [opts.file[0]] - rc = RC(stream_list != None) - rc.annotate("Load stream pack (from file):") - if stream_list == None: - return RC_ERR("Failed to load stream pack") + self.start(profiles, + opts.ports, + opts.mult, + opts.force, + opts.duration, + opts.dry, + opts.total) + # true means print time + return True - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) - @timing - def cmd_resume_line (self, line): - '''Resume active traffic in specified ports on TRex\n''' + @__console + def stop_line (self, line): + '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, - "resume", - self.cmd_stop_line.__doc__, + "stop", + self.stop_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) - return self.cmd_resume(opts.ports) + if not ports: + self.logger.log(format_text("No active traffic on provided ports\n", 'bold')) + return + self.stop(ports) - @timing - def cmd_stop_line (self, line): - '''Stop active traffic in specified ports on TRex\n''' + # true means print time + return True + + + @__console + def update_line (self, line): + '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, - "stop", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) + "update", + self.update_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.MULTIPLIER, + parsing_opts.TOTAL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No ports in valid state to update\n", 'bold')) + return + + self.update(ports, opts.mult, opts.total) - return self.cmd_stop(opts.ports) + # true means print time + return True - @timing - def cmd_pause_line (self, line): + @__console + def pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, "pause", - self.cmd_stop_line.__doc__, + self.pause_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + # find the relevant ports + ports = list(set(self.get_transmitting_ports()).intersection(opts.ports)) - return self.cmd_pause(opts.ports) + if not ports: + self.logger.log(format_text("No ports in valid state to pause\n", 'bold')) + return + self.pause(ports) - @timing - def cmd_update_line (self, line): - '''Update port(s) speed currently active\n''' + # true means print time + return True + + + @__console + def resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, - "update", - self.cmd_update_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.MULTIPLIER, - parsing_opts.TOTAL) + "resume", + self.resume_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") + return - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) + # find the relevant ports + ports = list(set(self.get_paused_ports()).intersection(opts.ports)) - return self.cmd_update(opts.ports, opts.mult) + if not ports: + self.logger.log(format_text("No ports in valid state to resume\n", 'bold')) + return - @timing - def cmd_reset_line (self, line): - return self.cmd_reset() + return self.resume(ports) + # true means print time + return True - def cmd_clear_line (self, line): + + @__console + def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", - self.cmd_clear_line.__doc__, + self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + self.clear_stats(opts.ports) - return self.cmd_clear(opts.ports) - def cmd_stats_line (self, line): + + @__console + def show_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", - self.cmd_stats_line.__doc__, + self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return # determine stats mask - mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) + mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - stats = self.cmd_stats(opts.ports, mask) + stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask) + + stats = self._get_formatted_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return RC_OK() - def cmd_streams_line(self, line): + @__console + def show_streams_line(self, line): '''Fetch streams statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "streams", - self.cmd_streams_line.__doc__, + self.show_streams_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.STREAMS_MASK)#, - #parsing_opts.FULL_OUTPUT) + parsing_opts.STREAMS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return - streams = self.cmd_streams(opts.ports, set(opts.streams)) + streams = self._get_streams(opts.ports, set(opts.streams)) if not streams: - # we got no streams running + self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) - print format_text("No streams found with desired filter.\n", "bold", "magenta") - return RC_ERR("No streams found with desired filter.") else: # print stats to screen for stream_hdr, port_streams_data in streams.iteritems(): text_tables.print_table_with_header(port_streams_data.text_table, header= stream_hdr.split(":")[0] + ":", untouched_header= stream_hdr.split(":")[1]) - return RC_OK() - @timing - def cmd_validate_line (self, line): + @__console + def validate_line (self, line): '''validates port(s) stream configuration\n''' parser = parsing_opts.gen_parser(self, "validate", - self.cmd_validate_line.__doc__, + self.validate_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") - - rc = self.cmd_validate(opts.ports) - return rc - - - def cmd_exit_line (self, line): - print format_text("Exiting\n", 'bold') - # a way to exit - return RC_ERR("exit") - - - def cmd_wait_line (self, line): - '''wait for a period of time\n''' - - parser = parsing_opts.gen_parser(self, - "wait", - self.cmd_wait_line.__doc__, - parsing_opts.DURATION) - - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") - - delay_sec = opts.duration if (opts.duration > 0) else 1 - - print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold') - time.sleep(delay_sec) - - return RC_OK() - - # run a script of commands - def run_script_file (self, filename): - - print format_text("\nRunning script file '{0}'...".format(filename), 'bold') - - rc = self.cmd_connect() - if rc.bad(): return - with open(filename) as f: - script_lines = f.readlines() - - cmd_table = {} - - # register all the commands - cmd_table['start'] = self.cmd_start_line - cmd_table['stop'] = self.cmd_stop_line - cmd_table['reset'] = self.cmd_reset_line - cmd_table['wait'] = self.cmd_wait_line - cmd_table['exit'] = self.cmd_exit_line - - for index, line in enumerate(script_lines, start = 1): - line = line.strip() - if line == "": - continue - if line.startswith("#"): - continue - - sp = line.split(' ', 1) - cmd = sp[0] - if len(sp) == 2: - args = sp[1] - else: - args = "" - - print format_text("Executing line {0} : '{1}'\n".format(index, line)) - - if not cmd in cmd_table: - print "\n*** Error at line {0} : '{1}'\n".format(index, line) - print format_text("unknown command '{0}'\n".format(cmd), 'bold') - return False - - rc = cmd_table[cmd](args) - if rc.bad(): - return False - - print format_text("\n[Done]", 'bold') - - return True - - - ################################# - # ------ private methods ------ # - @staticmethod - def _get_mask_keys(ok_values={True}, **kwargs): - masked_keys = set() - for key, val in kwargs.iteritems(): - if val in ok_values: - masked_keys.add(key) - return masked_keys - - @staticmethod - def _filter_namespace_args(namespace, ok_values): - return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - - - ################################# - # ------ private classes ------ # - class CCommLink(object): - """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): - super(CTRexStatelessClient.CCommLink, self).__init__() - self.virtual = virtual - self.server = server - self.port = port - self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - @property - def is_connected(self): - if not self.virtual: - return self.rpc_link.connected - else: - return True - - def get_server (self): - return self.server - - def get_port (self): - return self.port - - def set_verbose(self, mode): - self.verbose = mode - return self.rpc_link.set_verbose(mode) - - def connect(self): - if not self.virtual: - return self.rpc_link.connect() - - def disconnect(self): - if not self.virtual: - return self.rpc_link.disconnect() - - def transmit(self, method_name, params={}): - if self.virtual: - self._prompt_virtual_tx_msg() - _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) - print msg - return - else: - return self.rpc_link.invoke_rpc_method(method_name, params) - - def transmit_batch(self, batch_list): - if self.virtual: - self._prompt_virtual_tx_msg() - print [msg - for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] - else: - batch = self.rpc_link.create_batch() - for command in batch_list: - batch.add(command.method, command.params) - # invoke the batch - return batch.invoke() + self.validate(opts.ports) - def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, - port=self.port) -if __name__ == "__main__": - pass +
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index bdae7bd9..05a32bc4 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -42,8 +42,8 @@ class BatchMessage(object): # JSON RPC v2.0 client class JsonRpcClient(object): - def __init__ (self, default_server, default_port, prn_func = None): - self.verbose = False + def __init__ (self, default_server, default_port, logger): + self.logger = logger self.connected = False # default values @@ -51,7 +51,6 @@ class JsonRpcClient(object): self.server = default_server self.id_gen = general_utils.random_id_gen() - self.prn_func = prn_func def get_connection_details (self): rc = {} @@ -82,10 +81,7 @@ class JsonRpcClient(object): return pretty_str def verbose_msg (self, msg): - if not self.verbose: - return - - print "[verbose] " + msg + self.logger.log("[verbose] " + msg, level = self.logger.VERBOSE_HIGH) # batch messages @@ -128,7 +124,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -140,9 +136,9 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() - return RC_ERR("*** [RPC] - Failed to get server response") + return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport)) self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -177,16 +173,14 @@ class JsonRpcClient(object): else: return RC_ERR(response_json["error"]["message"]) + # if no error there should be a result if ("result" not in response_json): return RC_ERR("Malformed Response ({0})".format(str(response_json))) return RC_OK(response_json["result"]) - - def set_verbose(self, mode): - self.verbose = mode def disconnect (self): if self.connected: @@ -198,7 +192,7 @@ class JsonRpcClient(object): return RC_ERR("Not connected to server") - def connect(self, server = None, port = None, prn_func = None): + def connect(self, server = None, port = None): if self.connected: self.disconnect() @@ -210,12 +204,6 @@ class JsonRpcClient(object): # Socket to talk to server self.transport = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To RPC Server On {0}".format(self.transport) - if self.prn_func: - self.prn_func(msg) - else: - print msg - self.socket = self.context.socket(zmq.REQ) try: self.socket.connect(self.transport) @@ -245,7 +233,7 @@ class JsonRpcClient(object): return self.connected def __del__(self): - print "Shutting down RPC client\n" + self.logger.log("Shutting down RPC client\n") if hasattr(self, "context"): self.context.destroy(linger=0) diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index 3735a45b..ba60c191 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -69,10 +69,19 @@ match_multiplier_help = """Multiplier should be passed in the following format: will provide a percentage of the line rate. examples : '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """ -def match_multiplier_common(val, strict_abs = True): - # on strict absolute we do not allow +/- - if strict_abs: +# decodes multiplier +# if allow_update - no +/- is allowed +# divide states between how many entities the +# value should be divided +def decode_multiplier(val, allow_update = False, divide_count = 1): + + # must be string + if not isinstance(val, str): + return None + + # do we allow updates ? +/- + if not allow_update: match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) op = None else: @@ -136,19 +145,32 @@ def match_multiplier_common(val, strict_abs = True): else: result['op'] = "abs" + if result['op'] != 'percentage': + result['value'] = result['value'] / divide_count + return result else: - raise argparse.ArgumentTypeError(match_multiplier_help) + return None def match_multiplier(val): '''match some val against multiplier shortcut inputs ''' - return match_multiplier_common(val, strict_abs = False) + result = decode_multiplier(val, allow_update = True) + if not result: + raise argparse.ArgumentTypeError(match_multiplier_help) + + return val + def match_multiplier_strict(val): '''match some val against multiplier shortcut inputs ''' - return match_multiplier_common(val, strict_abs = True) + result = decode_multiplier(val, allow_update = False) + if not result: + raise argparse.ArgumentTypeError(match_multiplier_help) + + return val + def is_valid_file(filename): if not os.path.isfile(filename): @@ -230,6 +252,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'default': False, 'help': "Starts TUI in xterm window"}), + FULL_OUTPUT: ArgumentPack(['--full'], {'action': 'store_true', 'help': "Prompt full info in a JSON format"}), @@ -284,12 +307,12 @@ class CCmdArgParser(argparse.ArgumentParser): # if all ports are marked or if (getattr(opts, "all_ports", None) == True) or (getattr(opts, "ports", None) == []): - opts.ports = self.stateless_client.get_port_ids() + opts.ports = self.stateless_client.get_all_ports() # so maybe we have ports configured - elif (getattr(opts, "ports", None) == []): + elif getattr(opts, "ports", None): for port in opts.ports: - if not self.stateless_client.validate_port_list([port]): + if not self.stateless_client._validate_port_list([port]): self.error("port id '{0}' is not a valid port id\n".format(port)) return opts diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index a6add4ac..3f64310f 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -59,7 +59,7 @@ def calculate_diff_raw (samples): class CTRexInfoGenerator(object): """ This object is responsible of generating stats and information from objects maintained at - CTRexStatelessClient and the ports. + STLClient and the ports. """ def __init__(self, global_stats_ref, ports_dict_ref): @@ -260,7 +260,7 @@ class CTRexStats(object): def __init__(self): self.reference_stats = None - self.latest_stats = {} + self.latest_stats = None self.last_update_ts = time.time() self.history = deque(maxlen = 10) @@ -314,9 +314,11 @@ class CTRexStats(object): self.last_update_ts = time.time() + def clear_stats(self): self.reference_stats = self.latest_stats + def invalidate (self): self.latest_stats = {} @@ -333,6 +335,10 @@ class CTRexStats(object): return "N/A" if not format: + if not field in self.reference_stats: + print "REF: " + str(self.reference_stats) + print "BASE: " + str(self.latest_stats) + return (self.latest_stats[field] - self.reference_stats[field]) else: return format_num(self.latest_stats[field] - self.reference_stats[field], suffix) @@ -399,6 +405,24 @@ class CGlobalStats(CTRexStats): self.server_version = server_version self._ports_dict = ports_dict_ref + def get_stats (self): + stats = {} + + # absolute + stats['cpu_util'] = self.get("m_cpu_util") + stats['tx_bps'] = self.get("m_tx_bps") + stats['tx_pps'] = self.get("m_tx_pps") + + stats['rx_bps'] = self.get("m_rx_bps") + stats['rx_pps'] = self.get("m_rx_pps") + stats['rx_drop_bps'] = self.get("m_rx_drop_bps") + + # relatives + stats['queue_full'] = self.get_rel("m_total_queue_full") + + return stats + + def generate_stats(self): return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"), port=self.connection_info.get("sync_port"))), @@ -453,6 +477,9 @@ class CPortStats(CTRexStats): raise TypeError("cannot add non stats object to stats") # main stats + if not self.latest_stats: + self.latest_stats = {} + self.__merge_dicts(self.latest_stats, x.latest_stats) # reference stats @@ -471,6 +498,23 @@ class CPortStats(CTRexStats): return self + # for port we need to do something smarter + def get_stats (self): + stats = {} + + stats['opackets'] = self.get_rel("opackets") + stats['ipackets'] = self.get_rel("ipackets") + stats['obytes'] = self.get_rel("obytes") + stats['ibytes'] = self.get_rel("ibytes") + stats['oerrors'] = self.get_rel("oerrors") + stats['ierrors'] = self.get_rel("ierrors") + stats['tx_bps'] = self.get("m_total_tx_bps") + stats['tx_pps'] = self.get("m_total_tx_pps") + stats['rx_bps'] = self.get("m_total_rx_bps") + stats['rx_pps'] = self.get("m_total_rx_pps") + + return stats + def generate_stats(self): diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index 800b6d49..ea3d71d1 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -210,7 +210,6 @@ class CStream(object): setattr(self, k, kwargs[k]) # TODO: load to _pkt_bld_obj also when passed as byte array! elif isinstance(binary, str) and binary.endswith(".pcap"): - # self.load_packet_from_pcap(binary, kwargs[k]["meta"]) self._pkt_bld_obj.load_packet_from_pcap(binary) self._pkt_bld_obj.metadata = kwargs[k]["meta"] self.packet = self._pkt_bld_obj.dump_pkt() diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 7c3f04c5..a7ddacea 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -14,12 +14,16 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] # simple class to represent complex return value class RC(): - def __init__ (self, rc = None, data = None): + def __init__ (self, rc = None, data = None, is_warn = False): self.rc_list = [] - if (rc != None) and (data != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) + if (rc != None): + tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn']) + self.rc_list.append(tuple_rc(rc, data, is_warn)) + + def __nonzero__ (self): + return self.good() + def add (self, rc): self.rc_list += rc.rc_list @@ -30,39 +34,62 @@ class RC(): def bad (self): return not self.good() + def warn (self): + return any([x.is_warn for x in self.rc_list]) + def data (self): d = [x.data if x.rc else "" for x in self.rc_list] - return (d if len(d) > 1 else d[0]) + return (d if len(d) != 1 else d[0]) def err (self): e = [x.data if not x.rc else "" for x in self.rc_list] - return (e if len(e) > 1 else e[0]) + return (e if len(e) != 1 else e[0]) + + def __str__ (self): + s = "" + for x in self.rc_list: + if x.data: + s += format_text("\n{0}".format(x.data), 'bold') + return s + + def prn_func (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def annotate (self, log_func = None, desc = None, show_status = True): + + if not log_func: + log_func = self.prn_func - def annotate (self, desc = None, show_status = True): if desc: - print format_text('\n{:<60}'.format(desc), 'bold'), + log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) else: - print "" + log_func("") if self.bad(): # print all the errors print "" for x in self.rc_list: if not x.rc: - print format_text("\n{0}".format(x.data), 'bold') + log_func(format_text("\n{0}".format(x.data), 'bold')) print "" if show_status: - print format_text("[FAILED]\n", 'red', 'bold') + log_func(format_text("[FAILED]\n", 'red', 'bold')) else: if show_status: - print format_text("[SUCCESS]\n", 'green', 'bold') + log_func(format_text("[SUCCESS]\n", 'green', 'bold')) def RC_OK(data = ""): return RC(True, data) + def RC_ERR (err): return RC(False, err) +def RC_WARN (warn): + return RC(True, warn, is_warn = True) diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2672665c..88ff45dc 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient +from client.trex_stateless_client import STLClient, LoggerApi, STLError from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -39,6 +39,28 @@ from functools import wraps __version__ = "1.1" +# console custom logger +class ConsoleLogger(LoggerApi): + def __init__ (self): + self.prompt_redraw = None + + def write (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def flush (self): + sys.stdout.flush() + + # override this for the prompt fix + def async_log (self, msg, level = LoggerApi.VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) + if self.prompt_redraw: + self.prompt_redraw() + self.flush() + + def set_window_always_on_top (title): # we need the GDK module, if not available - ignroe this command try: @@ -133,9 +155,9 @@ class TRexGeneralCmd(cmd.Cmd): class TRexConsole(TRexGeneralCmd): """Trex Console""" - def __init__(self, stateless_client, verbose=False): + def __init__(self, stateless_client, verbose = False): + self.stateless_client = stateless_client - self.stateless_client.set_prompt_redraw_cb(self.prompt_redraw) TRexGeneralCmd.__init__(self) @@ -153,7 +175,10 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## def prompt_redraw (self): - sys.stdout.write(self.prompt + readline.get_line_buffer()) + self.postcmd(False, "") + sys.stdout.write("\n" + self.prompt + readline.get_line_buffer()) + sys.stdout.flush() + def verify_connected(f): @wraps(f) @@ -185,7 +210,7 @@ class TRexConsole(TRexGeneralCmd): print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') return - if inst.stateless_client.is_read_only(): + if inst.stateless_client.is_all_ports_acquired(): print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold') return @@ -197,7 +222,7 @@ class TRexConsole(TRexGeneralCmd): def get_console_identifier(self): return "{context}_{server}".format(context=self.__class__.__name__, - server=self.stateless_client.get_server_ip()) + server=self.stateless_client.get_connection_info()['server']) def register_main_console_methods(self): main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) @@ -229,7 +254,7 @@ class TRexConsole(TRexGeneralCmd): self.supported_rpc = None return stop - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): self.prompt = "TRex (read only) > " return stop @@ -264,44 +289,12 @@ class TRexConsole(TRexGeneralCmd): return targets - # annotation method - @staticmethod - def annotate (desc, rc = None, err_log = None, ext_err_msg = None): - print format_text('\n{:<40}'.format(desc), 'bold'), - if rc == None: - print "\n" - return - - if rc == False: - # do we have a complex log object ? - if isinstance(err_log, list): - print "" - for func in err_log: - if func: - print func - print "" - - elif isinstance(err_log, str): - print "\n" + err_log + "\n" - - print format_text("[FAILED]\n", 'red', 'bold') - if ext_err_msg: - print format_text(ext_err_msg + "\n", 'blue', 'bold') - - return False - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - ####################### shell commands ####################### @verify_connected def do_ping (self, line): '''Ping the server\n''' - rc = self.stateless_client.cmd_ping() - if rc.bad(): - return + self.stateless_client.ping() # set verbose on / off @@ -312,12 +305,12 @@ class TRexConsole(TRexGeneralCmd): elif line == "on": self.verbose = True - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_HIGH) + self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_HIGH) print format_text("\nverbose set to on\n", 'green', 'bold') elif line == "off": self.verbose = False - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_REGULAR) + self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_REGULAR) print format_text("\nverbose set to off\n", 'green', 'bold') else: @@ -361,13 +354,13 @@ class TRexConsole(TRexGeneralCmd): def do_connect (self, line): '''Connects to the server\n''' - self.stateless_client.cmd_connect_line(line) + self.stateless_client.connect_line(line) def do_disconnect (self, line): '''Disconnect from the server\n''' - self.stateless_client.cmd_disconnect() + self.stateless_client.disconnect_line(line) ############### start @@ -388,7 +381,7 @@ class TRexConsole(TRexGeneralCmd): def do_start(self, line): '''Start selected traffic in specified port(s) on TRex\n''' - self.stateless_client.cmd_start_line(line) + self.stateless_client.start_line(line) @@ -401,7 +394,7 @@ class TRexConsole(TRexGeneralCmd): def do_stop(self, line): '''stops port(s) transmitting traffic\n''' - self.stateless_client.cmd_stop_line(line) + self.stateless_client.stop_line(line) def help_stop(self): self.do_stop("-h") @@ -411,7 +404,7 @@ class TRexConsole(TRexGeneralCmd): def do_update(self, line): '''update speed of port(s)currently transmitting traffic\n''' - self.stateless_client.cmd_update_line(line) + self.stateless_client.update_line(line) def help_update (self): self.do_update("-h") @@ -421,14 +414,14 @@ class TRexConsole(TRexGeneralCmd): def do_pause(self, line): '''pause port(s) transmitting traffic\n''' - self.stateless_client.cmd_pause_line(line) + self.stateless_client.pause_line(line) ############# resume @verify_connected_and_rw def do_resume(self, line): '''resume port(s) transmitting traffic\n''' - self.stateless_client.cmd_resume_line(line) + self.stateless_client.resume_line(line) @@ -436,7 +429,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected_and_rw def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset_line(line) + self.stateless_client.reset_line(line) ######### validate @@ -444,13 +437,13 @@ class TRexConsole(TRexGeneralCmd): def do_validate (self, line): '''validates port(s) stream configuration\n''' - self.stateless_client.cmd_validate_line(line) + self.stateless_client.validate_line(line) @verify_connected def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' - self.stateless_client.cmd_stats_line(line) + self.stateless_client.show_stats_line(line) def help_stats(self): @@ -459,7 +452,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_streams(self, line): '''Fetch statistics from TRex server by port\n''' - self.stateless_client.cmd_streams_line(line) + self.stateless_client.show_streams_line(line) def help_streams(self): @@ -468,7 +461,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_clear(self, line): '''Clear cached local statistics\n''' - self.stateless_client.cmd_clear_line(line) + self.stateless_client.clear_stats_line(line) def help_clear(self): @@ -520,20 +513,17 @@ class TRexConsole(TRexGeneralCmd): if opts.xterm: - exe = './trex-console -t -q -s {0} -p {1}'.format(self.stateless_client.get_server_ip(), self.stateless_client.get_server_port()) + info = self.stateless_client.get_connection_info() + + exe = './trex-console -t -q -s {0} -p {1} --async_port {2}'.format(info['server'], info['sync_port'], info['async_port']) cmd = ['xterm', '-geometry', '111x42', '-sl', '0', '-title', 'trex_tui', '-e', exe] self.terminal = subprocess.Popen(cmd) return - set_window_always_on_top('trex_tui') - - save_verbose = self.stateless_client.get_verbose() - - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_QUIET) - self.tui.show() - self.stateless_client.set_verbose(save_verbose) + with self.stateless_client.logger.supress(): + self.tui.show() def help_tui (self): @@ -605,6 +595,49 @@ class TRexConsole(TRexGeneralCmd): do_h = do_history +# run a script of commands +def run_script_file (self, filename, stateless_client): + + self.logger.log(format_text("\nRunning script file '{0}'...".format(filename), 'bold')) + + with open(filename) as f: + script_lines = f.readlines() + + cmd_table = {} + + # register all the commands + cmd_table['start'] = stateless_client.start_line + cmd_table['stop'] = stateless_client.stop_line + cmd_table['reset'] = stateless_client.reset_line + + for index, line in enumerate(script_lines, start = 1): + line = line.strip() + if line == "": + continue + if line.startswith("#"): + continue + + sp = line.split(' ', 1) + cmd = sp[0] + if len(sp) == 2: + args = sp[1] + else: + args = "" + + stateless_client.logger.log(format_text("Executing line {0} : '{1}'\n".format(index, line))) + + if not cmd in cmd_table: + print "\n*** Error at line {0} : '{1}'\n".format(index, line) + stateless_client.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold')) + return False + + cmd_table[cmd](args) + + stateless_client.logger.log(format_text("\n[Done]", 'bold')) + + return True + + # def is_valid_file(filename): if not os.path.isfile(filename): @@ -613,6 +646,7 @@ def is_valid_file(filename): return filename + def setParserOptions(): parser = argparse.ArgumentParser(prog="trex_console.py") @@ -633,7 +667,7 @@ def setParserOptions(): default = get_current_user(), type = str) - parser.add_argument("--verbose", dest="verbose", + parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Switch ON verbose option. Default is: OFF.", default = False) @@ -665,34 +699,50 @@ def main(): options = parser.parse_args() # Stateless client connection - stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet) + if options.quiet: + verbose_level = LoggerApi.VERBOSE_QUIET + elif options.verbose: + verbose_level = LoggerApi.VERBOSE_HIGH + else: + verbose_level = LoggerApi.VERBOSE_REGULAR - if not options.quiet: - print "\nlogged as {0}".format(format_text(options.user, 'bold')) + # Stateless client connection + logger = ConsoleLogger() + stateless_client = STLClient(username = options.user, + server = options.server, + sync_port = options.port, + async_port = options.pub, + verbose_level = verbose_level, + logger = logger) # TUI or no acquire will give us READ ONLY mode - if options.tui or not options.acquire: - rc = stateless_client.connect("RO") - else: - rc = stateless_client.connect("RW") - - # unable to connect - bye - if rc.bad(): - rc.annotate() + try: + stateless_client.connect("RO") + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return + if not options.tui and options.acquire: + try: + stateless_client.acquire() + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + logger.log(format_text("\nSwitching to read only mode - only few commands will be available", 'bold')) + # a script mode if options.batch: - cont = stateless_client.run_script_file(options.batch[0]) + cont = run_script_file(options.batch[0], stateless_client) if not cont: return # console - try: console = TRexConsole(stateless_client, options.verbose) + logger.prompt_redraw = console.prompt_redraw + if options.tui: + set_window_always_on_top('trex_tui') console.do_tui("") else: console.start() @@ -701,7 +751,7 @@ def main(): print "\n\n*** Caught Ctrl + C... Exiting...\n\n" finally: - stateless_client.disconnect() + stateless_client.teardown(stop_traffic = False) if __name__ == '__main__': diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py index cdf3fb69..45769693 100644 --- a/scripts/automation/trex_control_plane/console/trex_status.py +++ b/scripts/automation/trex_control_plane/console/trex_status.py @@ -1,525 +1,525 @@ -from time import sleep - -import os - -import curses -from curses import panel -import random -import collections -import operator -import datetime - -g_curses_active = False - -################### utils ################# - -# simple percetange show -def percentage (a, total): - x = int ((float(a) / total) * 100) - return str(x) + "%" - -################### panels ################# - -# panel object -class TrexStatusPanel(object): - def __init__ (self, h, l, y, x, headline, status_obj): - - self.status_obj = status_obj - - self.log = status_obj.log - self.stateless_client = status_obj.stateless_client - - self.stats = status_obj.stats - self.general_stats = status_obj.general_stats - - self.h = h - self.l = l - self.y = y - self.x = x - self.headline = headline - - self.win = curses.newwin(h, l, y, x) - self.win.erase() - self.win.box() - - self.win.addstr(1, 2, headline, curses.A_UNDERLINE) - self.win.refresh() - - panel.new_panel(self.win) - self.panel = panel.new_panel(self.win) - self.panel.top() - - def clear (self): - self.win.erase() - self.win.box() - self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE) - - def getwin (self): - return self.win - - -# various kinds of panels - -# Server Info Panel -class ServerInfoPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) - - def draw (self): - - if not self.status_obj.server_version : - return - - if not self.status_obj.server_sys_info: - return - - - self.clear() - - self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) - self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) - self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", - self.status_obj.server_version["build_date"] + " @ " + - self.status_obj.server_version["build_time"] + " by " + - self.status_obj.server_version["built_by"])) - - self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"])) - self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) + - " cores", self.status_obj.server_sys_info["core_type"])) - - self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) - - ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list) - - if not ports_owned: - ports_owned = "None" - - self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned)) - -# general info panel -class GeneralInfoPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) - - def draw (self): - self.clear() - - if not self.general_stats.is_online(): - self.getwin().addstr(3, 2, "No Published Data From TRex Server") - return - - self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) - - self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", - self.general_stats.get("m_tx_bps", format = True, suffix = "bps"), - self.general_stats.get("m_tx_pps", format = True, suffix = "pps"))) - - - self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:", - self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"), - self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts"))) - - self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", - self.general_stats.get("m_rx_bps", format = True, suffix = "bps"), - self.general_stats.get("m_rx_pps", format = True, suffix = "pps"))) - - - self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:", - self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"), - self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts"))) - -# all ports stats -class PortsStatsPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) - - - def draw (self): - - self.clear() - - owned_ports = self.status_obj.owned_ports_list - if not owned_ports: - self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") - return - - # table header - self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) - - - - for i, port_index in enumerate(owned_ports): - - port_stats = self.status_obj.stats.get_port_stats(port_index) - - if port_stats: - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), - "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), - - "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), - "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), - port_stats.get_rel("ibytes", format = True, suffix = "B")))) - - else: - - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), - "N/A", - "N/A", - "N/A", - "N/A")) - - - # old format +#from time import sleep +# +#import os +# +#import curses +#from curses import panel +#import random +#import collections +#import operator +#import datetime +# +#g_curses_active = False +# +#################### utils ################# +# +## simple percetange show +#def percentage (a, total): +# x = int ((float(a) / total) * 100) +# return str(x) + "%" +# +#################### panels ################# +# +## panel object +#class TrexStatusPanel(object): +# def __init__ (self, h, l, y, x, headline, status_obj): +# +# self.status_obj = status_obj +# +# self.log = status_obj.log +# self.stateless_client = status_obj.stateless_client +# +# self.stats = status_obj.stats +# self.general_stats = status_obj.general_stats +# +# self.h = h +# self.l = l +# self.y = y +# self.x = x +# self.headline = headline +# +# self.win = curses.newwin(h, l, y, x) +# self.win.erase() +# self.win.box() +# +# self.win.addstr(1, 2, headline, curses.A_UNDERLINE) +# self.win.refresh() +# +# panel.new_panel(self.win) +# self.panel = panel.new_panel(self.win) +# self.panel.top() +# +# def clear (self): +# self.win.erase() +# self.win.box() +# self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE) +# +# def getwin (self): +# return self.win +# +# +## various kinds of panels +# +## Server Info Panel +#class ServerInfoPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) +# +# def draw (self): +# +# if not self.status_obj.server_version : +# return +# +# if not self.status_obj.server_sys_info: +# return +# +# +# self.clear() +# +# self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) +# self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) +# self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", +# self.status_obj.server_version["build_date"] + " @ " + +# self.status_obj.server_version["build_time"] + " by " + +# self.status_obj.server_version["built_by"])) +# +# self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"])) +# self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) + +# " cores", self.status_obj.server_sys_info["core_type"])) +# +# self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) +# +# ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list) +# +# if not ports_owned: +# ports_owned = "None" +# +# self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned)) +# +## general info panel +#class GeneralInfoPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) +# +# def draw (self): +# self.clear() +# +# if not self.general_stats.is_online(): +# self.getwin().addstr(3, 2, "No Published Data From TRex Server") +# return +# +# self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) +# +# self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", +# self.general_stats.get("m_tx_bps", format = True, suffix = "bps"), +# self.general_stats.get("m_tx_pps", format = True, suffix = "pps"))) +# +# +# self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:", +# self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"), +# self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts"))) +# +# self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", +# self.general_stats.get("m_rx_bps", format = True, suffix = "bps"), +# self.general_stats.get("m_rx_pps", format = True, suffix = "pps"))) +# +# +# self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:", +# self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"), +# self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts"))) +# +## all ports stats +#class PortsStatsPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) +# +# +# def draw (self): +# +# self.clear() +# +# owned_ports = self.status_obj.owned_ports_list +# if not owned_ports: +# self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") +# return +# +# # table header +# self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) +# +# +# +# for i, port_index in enumerate(owned_ports): +# +# port_stats = self.status_obj.stats.get_port_stats(port_index) +# # if port_stats: -# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( # "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), -# port_stats.get("m_total_tx_pps", format = True, suffix = "pps"), -# port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), -# port_stats.get_rel("obytes", format = True, suffix = "B"), -# port_stats.get("m_total_rx_pps", format = True, suffix = "pps"), -# port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), -# port_stats.get_rel("ibytes", format = True, suffix = "B"))) +# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), +# +# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), +# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), +# port_stats.get_rel("ibytes", format = True, suffix = "B")))) # # else: -# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( # "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), # "N/A", # "N/A", # "N/A", -# "N/A", -# "N/A", # "N/A")) - -# control panel -class ControlPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) - - - def draw (self): - self.clear() - - self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.status_obj.stateless_client.get_port_count() - 1)) - - self.log.draw(self.getwin(), 2, 3) - -# specific ports panels -class SinglePortPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj, port_id): - - super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) - - self.port_id = port_id - - def draw (self): - y = 3 - - self.clear() - - if not self.port_id in self.status_obj.owned_ports_list: - self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) - return - - # streams - self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE) - y += 2 - - # stream table header - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM")) - y += 2 - - # streams - - if 'streams' in self.status_obj.owned_ports[str(self.port_id)]: - stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams'] - - for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)): - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - stream_id, - ("True" if stream['enabled'] else "False"), - stream['mode']['type'], - ("True" if stream['self_start'] else "False"), - stream['isg'], - (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"), - ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None"))) - - y += 1 - - # new section - traffic - y += 2 - - self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE) - y += 2 - - - - # table header - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) - - - y += 2 - - port_stats = self.status_obj.stats.get_port_stats(self.port_id) - - if port_stats: - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), - "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), - - "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), - "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), - port_stats.get_rel("ibytes", format = True, suffix = "B")))) - - else: - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), - "N/A", - "N/A", - "N/A", - "N/A")) - - -################### main objects ################# - -# status log -class TrexStatusLog(): - def __init__ (self): - self.log = [] - - def add_event (self, msg): - self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) - - def draw (self, window, x, y, max_lines = 4): - index = y - - cut = len(self.log) - max_lines - if cut < 0: - cut = 0 - - for msg in self.log[cut:]: - window.addstr(index, x, msg) - index += 1 - -# status commands -class TrexStatusCommands(): - def __init__ (self, status_object): - - self.status_object = status_object - - self.stateless_client = status_object.stateless_client - self.log = self.status_object.log - - self.actions = {} - self.actions[ord('q')] = self._quit - self.actions[ord('p')] = self._ping - self.actions[ord('f')] = self._freeze - - self.actions[ord('g')] = self._show_ports_stats - - # register all the available ports shortcuts - for port_id in xrange(0, self.stateless_client.get_port_count()): - self.actions[ord('0') + port_id] = self._show_port_generator(port_id) - - - # handle a key pressed - def handle (self, ch): - if ch in self.actions: - return self.actions[ch]() - else: - self.log.add_event("Unknown key pressed, please see legend") - return True - - # show all ports - def _show_ports_stats (self): - self.log.add_event("Switching to all ports view") - self.status_object.stats_panel = self.status_object.ports_stats_panel - - return True - - - # function generator for different ports requests - def _show_port_generator (self, port_id): - def _show_port(): - self.log.add_event("Switching panel to port {0}".format(port_id)) - self.status_object.stats_panel = self.status_object.ports_panels[port_id] - - return True - - return _show_port - - def _freeze (self): - self.status_object.update_active = not self.status_object.update_active - self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") - - return True - - def _quit(self): - return False - - def _ping (self): - self.log.add_event("Pinging RPC server") - - rc, msg = self.stateless_client.ping() - if rc: - self.log.add_event("Server replied: '{0}'".format(msg)) - else: - self.log.add_event("Failed to get reply") - - return True - -# status object -# -# -# -class CTRexStatus(): - def __init__ (self, stdscr, stateless_client): - self.stdscr = stdscr - - self.stateless_client = stateless_client - - self.log = TrexStatusLog() - self.cmds = TrexStatusCommands(self) - - self.stats = stateless_client.get_stats_async() - self.general_stats = stateless_client.get_stats_async().get_general_stats() - - # fetch server info - self.server_sys_info = self.stateless_client.get_system_info() - - self.server_version = self.stateless_client.get_version() - - # list of owned ports - self.owned_ports_list = self.stateless_client.get_acquired_ports() - - # data per port - self.owned_ports = {} - - for port_id in self.owned_ports_list: - self.owned_ports[str(port_id)] = {} - self.owned_ports[str(port_id)]['streams'] = {} - - stream_list = self.stateless_client.get_all_streams(port_id) - - self.owned_ports[str(port_id)] = stream_list - - - try: - curses.curs_set(0) - except: - pass - - curses.use_default_colors() - self.stdscr.nodelay(1) - curses.nonl() - curses.noecho() - - self.generate_layout() - - - def generate_layout (self): - self.max_y = self.stdscr.getmaxyx()[0] - self.max_x = self.stdscr.getmaxyx()[1] - - self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self) - self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self) - self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self) - - # those can be switched on the same place - self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) - - self.ports_panels = {} - for i in xrange(0, self.stateless_client.get_port_count()): - self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) - - # at start time we point to the main one - self.stats_panel = self.ports_stats_panel - self.stats_panel.panel.top() - - panel.update_panels(); self.stdscr.refresh() - return - - - def wait_for_key_input (self): - ch = self.stdscr.getch() - - # no key , continue - if ch == curses.ERR: - return True - - return self.cmds.handle(ch) - - # main run entry point - def run (self): - - # list of owned ports - self.owned_ports_list = self.stateless_client.get_acquired_ports() - - # data per port - self.owned_ports = {} - - for port_id in self.owned_ports_list: - self.owned_ports[str(port_id)] = {} - self.owned_ports[str(port_id)]['streams'] = {} - - stream_list = self.stateless_client.get_all_streams(port_id) - - self.owned_ports[str(port_id)] = stream_list - - self.update_active = True - while (True): - - rc = self.wait_for_key_input() - if not rc: - break - - self.server_info_panel.draw() - self.general_info_panel.draw() - self.control_panel.draw() - - # can be different kinds of panels - self.stats_panel.panel.top() - self.stats_panel.draw() - - panel.update_panels() - self.stdscr.refresh() - sleep(0.01) - - -# global container -trex_status = None - -def show_trex_status_internal (stdscr, stateless_client): - global trex_status - - if trex_status == None: - trex_status = CTRexStatus(stdscr, stateless_client) - - trex_status.run() - -def show_trex_status (stateless_client): - - try: - curses.wrapper(show_trex_status_internal, stateless_client) - except KeyboardInterrupt: - curses.endwin() - -def cleanup (): - try: - curses.endwin() - except: - pass - +# +# +# # old format +## if port_stats: +## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +## port_stats.get("m_total_tx_pps", format = True, suffix = "pps"), +## port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +## port_stats.get_rel("obytes", format = True, suffix = "B"), +## port_stats.get("m_total_rx_pps", format = True, suffix = "pps"), +## port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +## port_stats.get_rel("ibytes", format = True, suffix = "B"))) +## +## else: +## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +## "N/A", +## "N/A", +## "N/A", +## "N/A", +## "N/A", +## "N/A")) +# +## control panel +#class ControlPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) +# +# +# def draw (self): +# self.clear() +# +# self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" +# .format(self.status_obj.stateless_client.get_port_count() - 1)) +# +# self.log.draw(self.getwin(), 2, 3) +# +## specific ports panels +#class SinglePortPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj, port_id): +# +# super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) +# +# self.port_id = port_id +# +# def draw (self): +# y = 3 +# +# self.clear() +# +# if not self.port_id in self.status_obj.owned_ports_list: +# self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) +# return +# +# # streams +# self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE) +# y += 2 +# +# # stream table header +# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM")) +# y += 2 +# +# # streams +# +# if 'streams' in self.status_obj.owned_ports[str(self.port_id)]: +# stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams'] +# +# for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)): +# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# stream_id, +# ("True" if stream['enabled'] else "False"), +# stream['mode']['type'], +# ("True" if stream['self_start'] else "False"), +# stream['isg'], +# (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"), +# ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None"))) +# +# y += 1 +# +# # new section - traffic +# y += 2 +# +# self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE) +# y += 2 +# +# +# +# # table header +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) +# +# +# y += 2 +# +# port_stats = self.status_obj.stats.get_port_stats(self.port_id) +# +# if port_stats: +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), +# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), +# +# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), +# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), +# port_stats.get_rel("ibytes", format = True, suffix = "B")))) +# +# else: +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), +# "N/A", +# "N/A", +# "N/A", +# "N/A")) +# +# +#################### main objects ################# +# +## status log +#class TrexStatusLog(): +# def __init__ (self): +# self.log = [] +# +# def add_event (self, msg): +# self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) +# +# def draw (self, window, x, y, max_lines = 4): +# index = y +# +# cut = len(self.log) - max_lines +# if cut < 0: +# cut = 0 +# +# for msg in self.log[cut:]: +# window.addstr(index, x, msg) +# index += 1 +# +## status commands +#class TrexStatusCommands(): +# def __init__ (self, status_object): +# +# self.status_object = status_object +# +# self.stateless_client = status_object.stateless_client +# self.log = self.status_object.log +# +# self.actions = {} +# self.actions[ord('q')] = self._quit +# self.actions[ord('p')] = self._ping +# self.actions[ord('f')] = self._freeze +# +# self.actions[ord('g')] = self._show_ports_stats +# +# # register all the available ports shortcuts +# for port_id in xrange(0, self.stateless_client.get_port_count()): +# self.actions[ord('0') + port_id] = self._show_port_generator(port_id) +# +# +# # handle a key pressed +# def handle (self, ch): +# if ch in self.actions: +# return self.actions[ch]() +# else: +# self.log.add_event("Unknown key pressed, please see legend") +# return True +# +# # show all ports +# def _show_ports_stats (self): +# self.log.add_event("Switching to all ports view") +# self.status_object.stats_panel = self.status_object.ports_stats_panel +# +# return True +# +# +# # function generator for different ports requests +# def _show_port_generator (self, port_id): +# def _show_port(): +# self.log.add_event("Switching panel to port {0}".format(port_id)) +# self.status_object.stats_panel = self.status_object.ports_panels[port_id] +# +# return True +# +# return _show_port +# +# def _freeze (self): +# self.status_object.update_active = not self.status_object.update_active +# self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") +# +# return True +# +# def _quit(self): +# return False +# +# def _ping (self): +# self.log.add_event("Pinging RPC server") +# +# rc, msg = self.stateless_client.ping() +# if rc: +# self.log.add_event("Server replied: '{0}'".format(msg)) +# else: +# self.log.add_event("Failed to get reply") +# +# return True +# +## status object +## +## +## +#class CTRexStatus(): +# def __init__ (self, stdscr, stateless_client): +# self.stdscr = stdscr +# +# self.stateless_client = stateless_client +# +# self.log = TrexStatusLog() +# self.cmds = TrexStatusCommands(self) +# +# self.stats = stateless_client.get_stats_async() +# self.general_stats = stateless_client.get_stats_async().get_general_stats() +# +# # fetch server info +# self.server_sys_info = self.stateless_client.get_system_info() +# +# self.server_version = self.stateless_client.get_server_version() +# +# # list of owned ports +# self.owned_ports_list = self.stateless_client.get_acquired_ports() +# +# # data per port +# self.owned_ports = {} +# +# for port_id in self.owned_ports_list: +# self.owned_ports[str(port_id)] = {} +# self.owned_ports[str(port_id)]['streams'] = {} +# +# stream_list = self.stateless_client.get_all_streams(port_id) +# +# self.owned_ports[str(port_id)] = stream_list +# +# +# try: +# curses.curs_set(0) +# except: +# pass +# +# curses.use_default_colors() +# self.stdscr.nodelay(1) +# curses.nonl() +# curses.noecho() +# +# self.generate_layout() +# +# +# def generate_layout (self): +# self.max_y = self.stdscr.getmaxyx()[0] +# self.max_x = self.stdscr.getmaxyx()[1] +# +# self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self) +# self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self) +# self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self) +# +# # those can be switched on the same place +# self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) +# +# self.ports_panels = {} +# for i in xrange(0, self.stateless_client.get_port_count()): +# self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) +# +# # at start time we point to the main one +# self.stats_panel = self.ports_stats_panel +# self.stats_panel.panel.top() +# +# panel.update_panels(); self.stdscr.refresh() +# return +# +# +# def wait_for_key_input (self): +# ch = self.stdscr.getch() +# +# # no key , continue +# if ch == curses.ERR: +# return True +# +# return self.cmds.handle(ch) +# +# # main run entry point +# def run (self): +# +# # list of owned ports +# self.owned_ports_list = self.stateless_client.get_acquired_ports() +# +# # data per port +# self.owned_ports = {} +# +# for port_id in self.owned_ports_list: +# self.owned_ports[str(port_id)] = {} +# self.owned_ports[str(port_id)]['streams'] = {} +# +# stream_list = self.stateless_client.get_all_streams(port_id) +# +# self.owned_ports[str(port_id)] = stream_list +# +# self.update_active = True +# while (True): +# +# rc = self.wait_for_key_input() +# if not rc: +# break +# +# self.server_info_panel.draw() +# self.general_info_panel.draw() +# self.control_panel.draw() +# +# # can be different kinds of panels +# self.stats_panel.panel.top() +# self.stats_panel.draw() +# +# panel.update_panels() +# self.stdscr.refresh() +# sleep(0.01) +# +# +## global container +#trex_status = None +# +#def show_trex_status_internal (stdscr, stateless_client): +# global trex_status +# +# if trex_status == None: +# trex_status = CTRexStatus(stdscr, stateless_client) +# +# trex_status.run() +# +#def show_trex_status (stateless_client): +# +# try: +# curses.wrapper(show_trex_status_internal, stateless_client) +# except KeyboardInterrupt: +# curses.endwin() +# +#def cleanup (): +# try: +# curses.endwin() +# except: +# pass +# diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py index dbbac02b..1e22b005 100644 --- a/scripts/automation/trex_control_plane/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -8,6 +8,7 @@ from client_utils import text_tables from collections import OrderedDict import datetime from cStringIO import StringIO +from client.trex_stateless_client import STLError class SimpleBar(object): def __init__ (self, desc, pattern): @@ -60,7 +61,7 @@ class TrexTUIDashBoard(TrexTUIPanel): def show (self): - stats = self.stateless_client.cmd_stats(self.ports, trex_stats.COMPACT) + stats = self.stateless_client._get_formatted_stats(self.ports, trex_stats.COMPACT) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @@ -71,8 +72,7 @@ class TrexTUIDashBoard(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if len(self.stateless_client.get_transmitting_ports()) > 0: @@ -89,64 +89,44 @@ class TrexTUIDashBoard(TrexTUIPanel): ######### actions def action_pause (self): - rc = self.stateless_client.pause_traffic(self.mng.ports) + try: + rc = self.stateless_client.pause(ports = self.mng.ports) + except STLError: + pass - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + return "" - if len(ports_succeeded) > 0: - return "paused traffic on port(s): {0}".format(ports_succeeded) - else: - return "" def action_resume (self): - rc = self.stateless_client.resume_traffic(self.mng.ports) - - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + try: + self.stateless_client.resume(ports = self.mng.ports) + except STLError: + pass - if len(ports_succeeded) > 0: - return "resumed traffic on port(s): {0}".format(ports_succeeded) - else: - return "" + return "" def action_raise (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'add'} - rc = self.stateless_client.update_traffic(mul, self.mng.ports) + try: + self.stateless_client.update(mult = "5%+", ports = self.mng.ports) + except STLError: + pass - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + return "" - if len(ports_succeeded) > 0: - return "raised B/W by %5 on port(s): {0}".format(ports_succeeded) - else: - return "" def action_lower (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} - rc = self.stateless_client.update_traffic(mul, self.mng.ports) - - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + try: + self.stateless_client.update(mult = "5%-", ports = self.mng.ports) + except STLError: + pass - if len(ports_succeeded) > 0: - return "lowered B/W by %5 on port(s): {0}".format(ports_succeeded) - else: - return "" + return "" def action_clear (self): - self.stateless_client.cmd_clear(self.mng.ports) + self.stateless_client.clear_stats(self.mng.ports) return "cleared all stats" @@ -168,7 +148,7 @@ class TrexTUIPort(TrexTUIPanel): def show (self): - stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT) + stats = self.stateless_client._get_formatted_stats([self.port_id], trex_stats.COMPACT) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @@ -179,8 +159,7 @@ class TrexTUIPort(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if self.port.state == self.port.STATE_TX: @@ -196,39 +175,44 @@ class TrexTUIPort(TrexTUIPanel): # actions def action_pause (self): - rc = self.stateless_client.pause_traffic([self.port_id]) - if rc.good(): - return "port {0}: paused traffic".format(self.port_id) - else: - return "" + try: + self.stateless_client.pause(ports = [self.port_id]) + except STLError: + pass + + return "" def action_resume (self): - rc = self.stateless_client.resume_traffic([self.port_id]) - if rc.good(): - return "port {0}: resumed traffic".format(self.port_id) - else: - return "" + try: + self.stateless_client.resume(ports = [self.port_id]) + except STLError: + pass + + return "" + def action_raise (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'add'} - rc = self.stateless_client.update_traffic(mul, [self.port_id]) + mult = {'type': 'percentage', 'value': 5, 'op': 'add'} - if rc.good(): - return "port {0}: raised B/W by 5%".format(self.port_id) - else: - return "" + try: + self.stateless_client.update(mult = mult, ports = [self.port_id]) + except STLError: + pass + + return "" def action_lower (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} - rc = self.stateless_client.update_traffic(mul, [self.port_id]) + mult = {'type': 'percentage', 'value': 5, 'op': 'sub'} - if rc.good(): - return "port {0}: lowered B/W by 5%".format(self.port_id) - else: - return "" + try: + self.stateless_client.update(mult = mult, ports = [self.port_id]) + except STLError: + pass + + return "" def action_clear (self): - self.stateless_client.cmd_clear([self.port_id]) + self.stateless_client.clear_stats([self.port_id]) return "port {0}: cleared stats".format(self.port_id) # log @@ -425,7 +409,7 @@ class TrexTUI(): if self.state == self.STATE_ACTIVE: # if no connectivity - move to lost connecitivty if not self.stateless_client.async_client.is_alive(): - self.stateless_client.cmd_invalidate(self.pm.ports) + self.stateless_client._invalidate_stats(self.pm.ports) self.state = self.STATE_LOST_CONT @@ -440,11 +424,10 @@ class TrexTUI(): # restored connectivity - try to reconnect elif self.state == self.STATE_RECONNECT: - rc = self.stateless_client.connect("RO") - if rc.good(): + try: + self.stateless_client.connect("RO") self.state = self.STATE_ACTIVE - else: - # maybe we lost it again + except STLError: self.state = self.STATE_LOST_CONT diff --git a/scripts/automation/trex_control_plane/examples/interactive_stateless.py b/scripts/automation/trex_control_plane/examples/interactive_stateless.py index e64b4755..f6ada17d 100644 --- a/scripts/automation/trex_control_plane/examples/interactive_stateless.py +++ b/scripts/automation/trex_control_plane/examples/interactive_stateless.py @@ -25,7 +25,7 @@ class InteractiveStatelessTRex(cmd.Cmd): self.verbose = verbose self.virtual = virtual - self.trex = CTRexStatelessClient(trex_host, trex_port, self.virtual) + self.trex = STLClient(trex_host, trex_port, self.virtual) self.DEFAULT_RUN_PARAMS = dict(m=1.5, nc=True, p=True, diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 3ae49da8..f8bc10d5 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -110,6 +110,7 @@ public: virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0; virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0; virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const = 0; + virtual void publish_async_data_now(uint32_t key) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual ~TrexPlatformApi() {} @@ -127,6 +128,7 @@ public: void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const; + void publish_async_data_now(uint32_t key) const; uint8_t get_dp_core_count() const; }; @@ -146,6 +148,7 @@ public: speed = SPEED_INVALID; } + void publish_async_data_now(uint32_t key) const {} uint8_t get_dp_core_count() const; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index fe6c6bdb..d40c4c8b 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2609,8 +2609,12 @@ private: bool is_all_cores_finished(); public: + + void publish_async_data(); + void publish_async_barrier(uint32_t key); + void dump_stats(FILE *fd, - std::string & json,CGlobalStats::DumpFormat format); + CGlobalStats::DumpFormat format); void dump_template_info(std::string & json); bool sanity_check(); void update_stats(void); @@ -2649,6 +2653,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; TrexPublisher m_zmq_publisher; + CGlobalStats m_stats; public: TrexStateless *m_trex_stateless; @@ -3448,11 +3453,11 @@ void CGlobalTRex::dump_template_info(std::string & json){ json+="]}" ; } -void CGlobalTRex::dump_stats(FILE *fd,std::string & json, - CGlobalStats::DumpFormat format){ - CGlobalStats stats; +void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){ + update_stats(); - get_stats(stats); + get_stats(m_stats); + if (format==CGlobalStats::dmpTABLE) { if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){ switch (m_io_modes.m_pp_mode ){ @@ -3461,11 +3466,11 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::ppTABLE: fprintf(fd,"\n-Per port stats table \n"); - stats.Dump(fd,CGlobalStats::dmpTABLE); + m_stats.Dump(fd,CGlobalStats::dmpTABLE); break; case CTrexGlobalIoMode::ppSTANDARD: fprintf(fd,"\n-Per port stats - standard\n"); - stats.Dump(fd,CGlobalStats::dmpSTANDARD); + m_stats.Dump(fd,CGlobalStats::dmpSTANDARD); break; }; @@ -3475,22 +3480,62 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::apENABLE: fprintf(fd,"\n-Global stats enabled \n"); - stats.DumpAllPorts(fd); + m_stats.DumpAllPorts(fd); break; }; } }else{ /* at exit , always need to dump it in standartd mode for scripts*/ - stats.Dump(fd,format); - stats.DumpAllPorts(fd); + m_stats.Dump(fd,format); + m_stats.DumpAllPorts(fd); } - stats.dump_json(json); + +} + + +void +CGlobalTRex::publish_async_data() { + std::string json; + + m_stats.dump_json(json); + m_zmq_publisher.publish_json(json); + + /* generator json , all cores are the same just sample the first one */ + m_fl.m_threads_info[0]->m_node_gen.dump_json(json); + m_zmq_publisher.publish_json(json); + + + if ( !get_is_stateless() ){ + dump_template_info(json); + m_zmq_publisher.publish_json(json); + } + + if ( get_is_rx_check_mode() ) { + m_mg.rx_check_dump_json(json ); + m_zmq_publisher.publish_json(json); + } + + /* backward compatible */ + m_mg.dump_json(json ); + m_zmq_publisher.publish_json(json); + + /* more info */ + m_mg.dump_json_v2(json ); + m_zmq_publisher.publish_json(json); + + /* stateless info - nothing for now */ + //m_trex_stateless->generate_publish_snapshot(json); + //m_zmq_publisher.publish_json(json); } +void +CGlobalTRex::publish_async_barrier(uint32_t key) { + m_zmq_publisher.publish_barrier(key); +} -int CGlobalTRex::run_in_master(){ +int CGlobalTRex::run_in_master() { - std::string json; + bool was_stopped=false; if ( get_is_stateless() ) { @@ -3530,7 +3575,7 @@ int CGlobalTRex::run_in_master(){ m_io_modes.DumpHelp(stdout); } - dump_stats(stdout,json,CGlobalStats::dmpTABLE); + dump_stats(stdout,CGlobalStats::dmpTABLE); if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { fprintf (stdout," current time : %.1f sec \n",now_sec()); @@ -3542,16 +3587,6 @@ int CGlobalTRex::run_in_master(){ fprintf (stdout," test duration : %.1f sec \n",d); } - m_zmq_publisher.publish_json(json); - - /* generator json , all cores are the same just sample the first one */ - m_fl.m_threads_info[0]->m_node_gen.dump_json(json); - m_zmq_publisher.publish_json(json); - - if ( !get_is_stateless() ){ - dump_template_info(json); - m_zmq_publisher.publish_json(json); - } if ( !CGlobalInfo::m_options.is_latency_disabled() ){ m_mg.update(); @@ -3591,24 +3626,12 @@ int CGlobalTRex::run_in_master(){ } - if ( get_is_rx_check_mode() ) { - m_mg.rx_check_dump_json(json ); - m_zmq_publisher.publish_json(json); - } - - /* backward compatible */ - m_mg.dump_json(json ); - m_zmq_publisher.publish_json(json); - - /* more info */ - m_mg.dump_json_v2(json ); - m_zmq_publisher.publish_json(json); + } - /* stateless info */ - m_trex_stateless->generate_publish_snapshot(json); - m_zmq_publisher.publish_json(json); + /* publish data */ + publish_async_data(); /* check from messages from DP */ check_for_dp_messages(); @@ -3679,11 +3702,10 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ int CGlobalTRex::stop_master(){ delay(1000); - std::string json; fprintf(stdout," ==================\n"); fprintf(stdout," interface sum \n"); fprintf(stdout," ==================\n"); - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); fprintf(stdout," ==================\n"); fprintf(stdout," \n\n"); @@ -3724,7 +3746,7 @@ int CGlobalTRex::stop_master(){ m_mg.DumpRxCheckVerification(stdout,total_tx_rx_check); } - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); dump_post_test_stats(stdout); m_fl.Delete(); @@ -4903,3 +4925,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, driver_name = CTRexExtendedDriverDb::Ins()->get_driver_name(); speed = CTRexExtendedDriverDb::Ins()->get_drv()->get_driver_speed(); } + +void +TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const { + g_trex.publish_async_data(); + g_trex.publish_async_barrier(key); +} + diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp index 416c4b69..7cacd96c 100644 --- a/src/mock/trex_platform_api_mock.cpp +++ b/src/mock/trex_platform_api_mock.cpp @@ -51,3 +51,4 @@ void TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const { cores_id_list.push_back(std::make_pair(0, 0)); } + diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp index 35653069..f56d56df 100644 --- a/src/publisher/trex_publisher.cpp +++ b/src/publisher/trex_publisher.cpp @@ -94,6 +94,21 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { publish_json(s); } +void +TrexPublisher::publish_barrier(uint32_t key) { + Json::FastWriter writer; + Json::Value value; + std::string s; + + value["name"] = "trex-barrier"; + value["type"] = key; + value["data"] = Json::objectValue; + + s = writer.write(value); + publish_json(s); +} + + /** * error handling * diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index 52978476..f086babb 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -53,8 +53,20 @@ public: }; + /** + * publishes an async event + * + */ virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue); + /** + * publishes a barrier requested by the user + * + * @author imarom (17-Jan-16) + * + */ + virtual void publish_barrier(uint32_t key); + private: void show_zmq_last_error(const std::string &err); private: diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index a701f6db..66999144 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -317,3 +317,19 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } +/** + * publish async data now (fast flush) + * + */ +trex_rpc_cmd_rc_e +TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) { + TrexStateless *main = get_stateless_obj(); + + uint32_t key = parse_uint32(params, "key", result); + + main->get_platform_api()->publish_async_data_now(key); + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); + +} diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index b1750053..081398d1 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -56,6 +56,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false); * general cmds */ TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false); +TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false); diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 6e5fbfc6..aee92539 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -51,6 +51,8 @@ TrexRpcServerAsync::_prepare() { */ void TrexRpcServerAsync::_rpc_thread_cb() { +/* disabled, using the main publisher */ +#if 0 std::stringstream ss; /* create a socket based on the configuration */ @@ -105,6 +107,7 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* must be closed from the same thread */ zmq_close(m_socket); +#endif } void diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 82c723b7..5218cd0a 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -34,6 +34,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { /* general */ register_command(new TrexRpcCmdPing()); + register_command(new TrexRpcPublishNow()); register_command(new TrexRpcCmdGetCmds()); register_command(new TrexRpcCmdGetVersion()); register_command(new TrexRpcCmdGetSysInfo()); diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index 215315e0..13f264cf 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -97,6 +97,10 @@ public: } } + virtual void publish_async_data_now(uint32_t key) const { + + } + private: int m_dp_core_count; }; |