From 11d328d3e40b04540489eec83ac484d5b06254bb Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 17 Jan 2016 04:28:55 -0500 Subject: draft of test API for stateless --- scripts/automation/trex_control_plane/console/trex_console.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2672665c..2ae92fb6 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -153,7 +153,9 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## def prompt_redraw (self): - sys.stdout.write(self.prompt + readline.get_line_buffer()) + sys.stdout.write("\n" + self.prompt + readline.get_line_buffer()) + sys.stdout.flush() + def verify_connected(f): @wraps(f) @@ -312,12 +314,12 @@ class TRexConsole(TRexGeneralCmd): elif line == "on": self.verbose = True - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_HIGH) + self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_HIGH) print format_text("\nverbose set to on\n", 'green', 'bold') elif line == "off": self.verbose = False - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_REGULAR) + self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_REGULAR) print format_text("\nverbose set to off\n", 'green', 'bold') else: @@ -531,7 +533,7 @@ class TRexConsole(TRexGeneralCmd): save_verbose = self.stateless_client.get_verbose() - self.stateless_client.set_verbose(self.stateless_client.VERBOSE_QUIET) + self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_QUIET) self.tui.show() self.stateless_client.set_verbose(save_verbose) -- cgit 1.2.3-korg From 9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 18 Jan 2016 06:59:36 -0500 Subject: async publish now --- .../trex_control_plane/client/trex_async_client.py | 53 ++++++++-- .../client/trex_stateless_client.py | 11 +- .../client_utils/jsonrpc_client.py | 7 +- .../trex_control_plane/common/trex_types.py | 18 +++- .../trex_control_plane/console/trex_console.py | 49 +++------ scripts/stl_test_api/__init__.py | 38 +++++-- scripts/stl_test_example.py | 54 ++++++---- src/internal_api/trex_platform_api.h | 3 + src/main_dpdk.cpp | 113 +++++++++++++-------- src/mock/trex_platform_api_mock.cpp | 1 + src/publisher/trex_publisher.cpp | 15 +++ src/publisher/trex_publisher.h | 12 +++ src/rpc-server/commands/trex_rpc_cmd_general.cpp | 16 +++ src/rpc-server/commands/trex_rpc_cmds.h | 1 + src/rpc-server/trex_rpc_async_server.cpp | 3 + src/rpc-server/trex_rpc_cmds_table.cpp | 1 + src/sim/trex_sim_stateless.cpp | 4 + 17 files changed, 280 insertions(+), 119 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..2bb0e9cd 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -155,6 +156,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barriers = [] self.connected = False @@ -188,17 +190,16 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + # send a barrier and wait for ack + rc = self.block_on_stats() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -284,10 +285,46 @@ class CTRexAsyncClient(): # stats if name == "trex-global": self.stateless_client.handle_async_stats_update(data) + # events elif name == "trex-event": self.stateless_client.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + + for b in self.async_barriers: + if b['key'] == type: + b['ack'] = True + + + # force update a new snapshot from the server + def block_on_stats(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + barrier = {'key': key, 'ack' : False} + + # add to the queue + self.async_barriers.append(barrier) + + rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + expr = time.time() + timeout + while not barrier['ack']: + time.sleep(0.001) + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b7700c7c..105c4d01 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -55,12 +55,16 @@ class LoggerApi(object): return (self.level >= level) + # simple log message with verbose def log (self, msg, level = VERBOSE_REGULAR, newline = True): if not self.check_verbose(level): return self.write(msg, newline) + # annotates an action with a RC - writes to log the result + def annotate (self, rc, desc = None, show_status = True): + rc.annotate(self.log, desc, show_status) # default logger - to stdout class DefaultLogger(LoggerApi): @@ -82,7 +86,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - quiet = False, + verbose_level = LoggerApi.VERBOSE_REGULAR, virtual = False, logger = None): @@ -90,13 +94,14 @@ class CTRexStatelessClient(object): self.user = username + # logger if not logger: self.logger = DefaultLogger() else: self.logger = logger - if quiet: - self.logger.set_verbose(self.logger.VERBOSE_QUIET) + # initial verbose + self.logger.set_verbose(verbose_level) self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index a5e01340..e08f5d69 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -124,7 +124,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -136,9 +136,9 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() - return RC_ERR("*** [RPC] - Failed to get server response") + return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport)) self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -173,6 +173,7 @@ class JsonRpcClient(object): else: return RC_ERR(response_json["error"]["message"]) + # if no error there should be a result if ("result" not in response_json): return RC_ERR("Malformed Response ({0})".format(str(response_json))) diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 21deffd2..5c29f59b 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -17,7 +17,7 @@ class RC(): def __init__ (self, rc = None, data = None): self.rc_list = [] - if (rc != None) and (data != None): + if (rc != None): tuple_rc = namedtuple('RC', ['rc', 'data']) self.rc_list.append(tuple_rc(rc, data)) @@ -35,16 +35,26 @@ class RC(): def data (self): d = [x.data if x.rc else "" for x in self.rc_list] - return (d if len(d) > 1 else d[0]) + return (d if len(d) != 1 else d[0]) def err (self): e = [x.data if not x.rc else "" for x in self.rc_list] - return (e if len(e) > 1 else e[0]) + return (e if len(e) != 1 else e[0]) def __str__ (self): return str(self.data()) if self else str(self.err()) - def annotate (self, log_func, desc = None, show_status = True): + def prn_func (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def annotate (self, log_func = None, desc = None, show_status = True): + + if not log_func: + log_func = self.prn_func + if desc: log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) else: diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2ae92fb6..f086c208 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient +from client.trex_stateless_client import CTRexStatelessClient, LoggerApi from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -266,36 +266,6 @@ class TRexConsole(TRexGeneralCmd): return targets - # annotation method - @staticmethod - def annotate (desc, rc = None, err_log = None, ext_err_msg = None): - print format_text('\n{:<40}'.format(desc), 'bold'), - if rc == None: - print "\n" - return - - if rc == False: - # do we have a complex log object ? - if isinstance(err_log, list): - print "" - for func in err_log: - if func: - print func - print "" - - elif isinstance(err_log, str): - print "\n" + err_log + "\n" - - print format_text("[FAILED]\n", 'red', 'bold') - if ext_err_msg: - print format_text(ext_err_msg + "\n", 'blue', 'bold') - - return False - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - ####################### shell commands ####################### @verify_connected @@ -667,10 +637,19 @@ def main(): options = parser.parse_args() # Stateless client connection - stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet) + if options.quiet: + verbose_level = LoggerApi.VERBOSE_QUIET + elif options.verbose: + verbose_level = LoggerApi.VERBOSE_HIGH + else: + verbose_level = LoggerApi.VERBOSE_REGULAR - if not options.quiet: - print "\nlogged as {0}".format(format_text(options.user, 'bold')) + # Stateless client connection + stateless_client = CTRexStatelessClient(options.user, + options.server, + options.port, + options.pub, + verbose_level) # TUI or no acquire will give us READ ONLY mode if options.tui or not options.acquire: @@ -679,7 +658,7 @@ def main(): rc = stateless_client.connect("RW") # unable to connect - bye - if rc.bad(): + if not rc: rc.annotate() return diff --git a/scripts/stl_test_api/__init__.py b/scripts/stl_test_api/__init__.py index 6df72136..7381d88e 100644 --- a/scripts/stl_test_api/__init__.py +++ b/scripts/stl_test_api/__init__.py @@ -1,3 +1,9 @@ +# +# TRex stateless API +# provides a layer for communicating with TRex +# using Python API +# + import sys import os import time @@ -43,24 +49,29 @@ class BasicTestAPI(object): # main object def __init__ (self, server = "localhost", sync_port = 4501, async_port = 4500): self.logger = BasicTestAPI.Logger() - self.client = CTRexStatelessClient(logger = self.logger, sync_port = sync_port, async_port = async_port) + self.client = CTRexStatelessClient(logger = self.logger, + sync_port = sync_port, + async_port = async_port, + verbose_level = LoggerApi.VERBOSE_REGULAR) + + self.__invalid_stats = True + # connect to the stateless client def connect (self): rc = self.client.connect(mode = "RWF") self.__verify_rc(rc) + # disconnect from the stateless client def disconnect (self): self.client.disconnect() - def __verify_rc (self, rc): - if rc.bad(): - raise self.Failure(rc) - def inject_profile (self, filename, rate = "1", duration = None): - cmd = "-f {0} -m {1}".format(filename, rate) + self.__invalid_stats = True + + cmd = "--total -f {0} -m {1}".format(filename, rate) if duration: cmd += " -d {0}".format(duration) @@ -74,9 +85,24 @@ class BasicTestAPI(object): def get_stats (self): + if self.__invalid_stats: + # send a barrier + rc = self.client.block_on_stats() + self.__verify_rc(rc) + self.__invalid_stats = False + total_stats = trex_stats.CPortStats(None) for port in self.client.ports.values(): total_stats += port.port_stats return total_stats + + + # some internal methods + + # verify RC return value + def __verify_rc (self, rc): + if not rc: + raise self.Failure(rc) + diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index 689ed316..7974758d 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -1,24 +1,42 @@ -from stl_test_api import BasicTestAPI -x = BasicTestAPI() +# simple test that uses simple API with stateless TRex +#from stl_test_api import BasicTestAPI +api_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/')) -try: - x.connect() - - s = x.get_stats() - - print "input packets before test: %s" % s['ipackets'] - - x.inject_profile("stl/imix_1pkt.yaml", rate = "5gbps", duration = 1) - x.wait_while_traffic_on() - - s = x.get_stats() - - print "input packets after test: %s" % s['ipackets'] - - print "Test passed :-)\n" +from trex_stateless_client import CTRexStatelessClient, LoggerApi +c = CTRexStatelessClient() +try: + c.connect() + #before_ipackets = x.get_stats().get_rel('ipackets') + c.cmd_start_line("-f stl/imix_1pkt.yaml -m 5mpps -d 1") + c.cmd_wait_on_traffic() finally: - x.disconnect() + c.disconnect() + +#x = BasicTestAPI() +# +#try: +# x.connect() +# +# before_ipackets = x.get_stats().get_rel('ipackets') +# +# print "input packets before test: %s" % before_ipackets +# +# x.inject_profile("stl/imix_1pkt.yaml", rate = "5mpps", duration = 1) +# x.wait_while_traffic_on() +# +# after_ipackets = x.get_stats().get_rel('ipackets') +# +# print "input packets after test: %s" % after_ipackets +# +# if (after_ipackets - before_ipackets) == 5000000: +# print "Test passed :-)\n" +# else: +# print "Test failed :-(\n" +# +#finally: +# x.disconnect() diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 3ae49da8..f8bc10d5 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -110,6 +110,7 @@ public: virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0; virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0; virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const = 0; + virtual void publish_async_data_now(uint32_t key) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual ~TrexPlatformApi() {} @@ -127,6 +128,7 @@ public: void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const; + void publish_async_data_now(uint32_t key) const; uint8_t get_dp_core_count() const; }; @@ -146,6 +148,7 @@ public: speed = SPEED_INVALID; } + void publish_async_data_now(uint32_t key) const {} uint8_t get_dp_core_count() const; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 7c25b2e2..a5e87172 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2609,8 +2609,12 @@ private: bool is_all_cores_finished(); public: + + void publish_async_data(); + void publish_async_barrier(uint32_t key); + void dump_stats(FILE *fd, - std::string & json,CGlobalStats::DumpFormat format); + CGlobalStats::DumpFormat format); void dump_template_info(std::string & json); bool sanity_check(); void update_stats(void); @@ -2649,6 +2653,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; TrexPublisher m_zmq_publisher; + CGlobalStats m_stats; public: TrexStateless *m_trex_stateless; @@ -3448,11 +3453,11 @@ void CGlobalTRex::dump_template_info(std::string & json){ json+="]}" ; } -void CGlobalTRex::dump_stats(FILE *fd,std::string & json, - CGlobalStats::DumpFormat format){ - CGlobalStats stats; +void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){ + update_stats(); - get_stats(stats); + get_stats(m_stats); + if (format==CGlobalStats::dmpTABLE) { if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){ switch (m_io_modes.m_pp_mode ){ @@ -3461,11 +3466,11 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::ppTABLE: fprintf(fd,"\n-Per port stats table \n"); - stats.Dump(fd,CGlobalStats::dmpTABLE); + m_stats.Dump(fd,CGlobalStats::dmpTABLE); break; case CTrexGlobalIoMode::ppSTANDARD: fprintf(fd,"\n-Per port stats - standard\n"); - stats.Dump(fd,CGlobalStats::dmpSTANDARD); + m_stats.Dump(fd,CGlobalStats::dmpSTANDARD); break; }; @@ -3475,22 +3480,62 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::apENABLE: fprintf(fd,"\n-Global stats enabled \n"); - stats.DumpAllPorts(fd); + m_stats.DumpAllPorts(fd); break; }; } }else{ /* at exit , always need to dump it in standartd mode for scripts*/ - stats.Dump(fd,format); - stats.DumpAllPorts(fd); + m_stats.Dump(fd,format); + m_stats.DumpAllPorts(fd); } - stats.dump_json(json); + +} + + +void +CGlobalTRex::publish_async_data() { + std::string json; + + m_stats.dump_json(json); + m_zmq_publisher.publish_json(json); + + /* generator json , all cores are the same just sample the first one */ + m_fl.m_threads_info[0]->m_node_gen.dump_json(json); + m_zmq_publisher.publish_json(json); + + + if ( !get_is_stateless() ){ + dump_template_info(json); + m_zmq_publisher.publish_json(json); + } + + if ( get_is_rx_check_mode() ) { + m_mg.rx_check_dump_json(json ); + m_zmq_publisher.publish_json(json); + } + + /* backward compatible */ + m_mg.dump_json(json ); + m_zmq_publisher.publish_json(json); + + /* more info */ + m_mg.dump_json_v2(json ); + m_zmq_publisher.publish_json(json); + + /* stateless info - nothing for now */ + //m_trex_stateless->generate_publish_snapshot(json); + //m_zmq_publisher.publish_json(json); } +void +CGlobalTRex::publish_async_barrier(uint32_t key) { + m_zmq_publisher.publish_barrier(key); +} -int CGlobalTRex::run_in_master(){ +int CGlobalTRex::run_in_master() { - std::string json; + bool was_stopped=false; if ( get_is_stateless() ) { @@ -3530,7 +3575,7 @@ int CGlobalTRex::run_in_master(){ m_io_modes.DumpHelp(stdout); } - dump_stats(stdout,json,CGlobalStats::dmpTABLE); + dump_stats(stdout,CGlobalStats::dmpTABLE); if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { fprintf (stdout," current time : %.1f sec \n",now_sec()); @@ -3542,16 +3587,6 @@ int CGlobalTRex::run_in_master(){ fprintf (stdout," test duration : %.1f sec \n",d); } - m_zmq_publisher.publish_json(json); - - /* generator json , all cores are the same just sample the first one */ - m_fl.m_threads_info[0]->m_node_gen.dump_json(json); - m_zmq_publisher.publish_json(json); - - if ( !get_is_stateless() ){ - dump_template_info(json); - m_zmq_publisher.publish_json(json); - } if ( !CGlobalInfo::m_options.is_latency_disabled() ){ m_mg.update(); @@ -3591,24 +3626,12 @@ int CGlobalTRex::run_in_master(){ } - if ( get_is_rx_check_mode() ) { - m_mg.rx_check_dump_json(json ); - m_zmq_publisher.publish_json(json); - } - - /* backward compatible */ - m_mg.dump_json(json ); - m_zmq_publisher.publish_json(json); - - /* more info */ - m_mg.dump_json_v2(json ); - m_zmq_publisher.publish_json(json); + } - /* stateless info */ - m_trex_stateless->generate_publish_snapshot(json); - m_zmq_publisher.publish_json(json); + /* publish data */ + publish_async_data(); /* check from messages from DP */ check_for_dp_messages(); @@ -3679,11 +3702,10 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ int CGlobalTRex::stop_master(){ delay(1000); - std::string json; fprintf(stdout," ==================\n"); fprintf(stdout," interface sum \n"); fprintf(stdout," ==================\n"); - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); fprintf(stdout," ==================\n"); fprintf(stdout," \n\n"); @@ -3724,7 +3746,7 @@ int CGlobalTRex::stop_master(){ m_mg.DumpRxCheckVerification(stdout,total_tx_rx_check); } - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); dump_post_test_stats(stdout); m_fl.Delete(); @@ -4899,3 +4921,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, driver_name = CTRexExtendedDriverDb::Ins()->get_driver_name(); speed = CTRexExtendedDriverDb::Ins()->get_drv()->get_driver_speed(); } + +void +TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const { + g_trex.publish_async_data(); + g_trex.publish_async_barrier(key); +} + diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp index 416c4b69..7cacd96c 100644 --- a/src/mock/trex_platform_api_mock.cpp +++ b/src/mock/trex_platform_api_mock.cpp @@ -51,3 +51,4 @@ void TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector> &cores_id_list) const { cores_id_list.push_back(std::make_pair(0, 0)); } + diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp index 35653069..f56d56df 100644 --- a/src/publisher/trex_publisher.cpp +++ b/src/publisher/trex_publisher.cpp @@ -94,6 +94,21 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { publish_json(s); } +void +TrexPublisher::publish_barrier(uint32_t key) { + Json::FastWriter writer; + Json::Value value; + std::string s; + + value["name"] = "trex-barrier"; + value["type"] = key; + value["data"] = Json::objectValue; + + s = writer.write(value); + publish_json(s); +} + + /** * error handling * diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index 52978476..f086babb 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -53,8 +53,20 @@ public: }; + /** + * publishes an async event + * + */ virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue); + /** + * publishes a barrier requested by the user + * + * @author imarom (17-Jan-16) + * + */ + virtual void publish_barrier(uint32_t key); + private: void show_zmq_last_error(const std::string &err); private: diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index a701f6db..66999144 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -317,3 +317,19 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } +/** + * publish async data now (fast flush) + * + */ +trex_rpc_cmd_rc_e +TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) { + TrexStateless *main = get_stateless_obj(); + + uint32_t key = parse_uint32(params, "key", result); + + main->get_platform_api()->publish_async_data_now(key); + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); + +} diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index b1750053..081398d1 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -56,6 +56,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false); * general cmds */ TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false); +TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false); diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 6e5fbfc6..aee92539 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -51,6 +51,8 @@ TrexRpcServerAsync::_prepare() { */ void TrexRpcServerAsync::_rpc_thread_cb() { +/* disabled, using the main publisher */ +#if 0 std::stringstream ss; /* create a socket based on the configuration */ @@ -105,6 +107,7 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* must be closed from the same thread */ zmq_close(m_socket); +#endif } void diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 82c723b7..5218cd0a 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -34,6 +34,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { /* general */ register_command(new TrexRpcCmdPing()); + register_command(new TrexRpcPublishNow()); register_command(new TrexRpcCmdGetCmds()); register_command(new TrexRpcCmdGetVersion()); register_command(new TrexRpcCmdGetSysInfo()); diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index 215315e0..13f264cf 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -97,6 +97,10 @@ public: } } + virtual void publish_async_data_now(uint32_t key) const { + + } + private: int m_dp_core_count; }; -- cgit 1.2.3-korg From 2d9d5e147b8f15a8308dad46711390f3b168ec56 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 18 Jan 2016 11:27:10 -0500 Subject: highly draft - just backing up --- .../trex_control_plane/client/trex_async_client.py | 22 +- .../client/trex_stateless_client.py | 1286 ++++++++++++-------- .../client_utils/parsing_opts.py | 4 +- .../trex_control_plane/common/trex_types.py | 1 + .../trex_control_plane/console/trex_console.py | 48 +- .../trex_control_plane/console/trex_status.py | 1032 ++++++++-------- scripts/stl_test_example.py | 43 +- 7 files changed, 1348 insertions(+), 1088 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 2bb0e9cd..9828c838 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -144,12 +144,15 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client, prn_func = None): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client - self.prn_func = prn_func + + self.event_handler = stateless_client.event_handler + self.logger = self.stateless_client.logger self.raw_snapshot = {} @@ -170,10 +173,7 @@ class CTRexAsyncClient(): msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - if self.prn_func: - self.prn_func(msg) - else: - print msg + self.logger.log(msg) # Socket to talk to server self.context = zmq.Context() @@ -235,7 +235,7 @@ class CTRexAsyncClient(): # signal once if not got_data: - self.stateless_client.on_async_alive() + self.event_handler.on_async_alive() got_data = True @@ -244,7 +244,7 @@ class CTRexAsyncClient(): # signal once if got_data: - self.stateless_client.on_async_dead() + self.event_handler.on_async_dead() got_data = False continue @@ -284,11 +284,11 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - self.stateless_client.handle_async_stats_update(data) + self.event_handler.handle_async_stats_update(data) # events elif name == "trex-event": - self.stateless_client.handle_async_event(type, data) + self.event_handler.handle_async_event(type, data) # barriers elif name == "trex-barrier": @@ -315,7 +315,7 @@ class CTRexAsyncClient(): # add to the queue self.async_barriers.append(barrier) - rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) if not rc: return rc diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 105c4d01..43912e55 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -25,6 +25,26 @@ from trex_port import Port from common.trex_types import * from trex_async_client import CTRexAsyncClient +############################ logger ############################# +############################ ############################# +############################ ############################# + +class STLFailure(Exception): + def __init__ (self, rc_or_str): + self.msg = str(rc_or_str) + + def __str__ (self): + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + + + s = "\n******\n" + s += "Error reported at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') + s += "specific error:\n\n'{0}'\n".format(format_text(self.msg, 'bold')) + + return s + + # logger API for the client class LoggerApi(object): # verbose levels @@ -35,9 +55,11 @@ class LoggerApi(object): def __init__(self): self.level = LoggerApi.VERBOSE_REGULAR + # implemented by specific logger def write(self, msg, newline = True): raise Exception("implement this") + # implemented by specific logger def flush(self): raise Exception("implement this") @@ -62,10 +84,15 @@ class LoggerApi(object): self.write(msg, newline) + # logging that comes from async event + def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) + # annotates an action with a RC - writes to log the result def annotate (self, rc, desc = None, show_status = True): rc.annotate(self.log, desc, show_status) + # default logger - to stdout class DefaultLogger(LoggerApi): def write (self, msg, newline = True): @@ -78,90 +105,41 @@ class DefaultLogger(LoggerApi): sys.stdout.flush() -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" - - def __init__(self, - username = general_utils.get_current_user(), - server = "localhost", - sync_port = 4501, - async_port = 4500, - verbose_level = LoggerApi.VERBOSE_REGULAR, - virtual = False, - logger = None): - - super(CTRexStatelessClient, self).__init__() - - self.user = username - - # logger - if not logger: - self.logger = DefaultLogger() - else: - self.logger = logger - - # initial verbose - self.logger.set_verbose(verbose_level) - - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) - - self.ports = {} - self._connection_info = {"server": server, - "sync_port": sync_port, - "async_port": async_port} - self.system_info = {} - self.server_version = {} +############################ async event hander ############################# +############################ ############################# +############################ ############################# - self.async_client = CTRexAsyncClient(server, async_port, self, self.logger.log) +# handles different async events given to the client +class AsyncEventHandler(object): - self.streams_db = CStreamsDB() - self.global_stats = trex_stats.CGlobalStats(self._connection_info, - self.server_version, - self.ports) - self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, - self.ports) + def __init__ (self, client): + self.client = client + self.logger = self.client.logger self.events = [] - self.session_id = random.getrandbits(32) - self.read_only = False - self.connected = False - self.prompt_redraw_cb = None - - - # returns the port object - def get_port (self, port_id): - return self.ports.get(port_id, None) - + # public functions - # connection server ip - def get_server_ip (self): - return self.comm_link.get_server() + def get_events (self): + return self.events - # connection server port - def get_server_port (self): - return self.comm_link.get_port() + def clear_events (self): + self.events = [] - ################# events handler ###################### - def add_event_log (self, msg, ev_type, show = False): - if ev_type == "server": - prefix = "[server]" - elif ev_type == "local": - prefix = "[local]" + def on_async_dead (self): + if self.client.connected: + msg = 'lost connection to server' + self.__add_event_log(msg, 'local', True) + self.client.connected = False - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - if show: - self.logger.log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - if self.prompt_redraw_cb and self.logger.check_verbose(self.logger.VERBOSE_REGULAR): - self.prompt_redraw_cb() + def on_async_alive (self): + pass - + # handles an async stats update from the subscriber def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -173,7 +151,7 @@ class CTRexStatelessClient(object): if m: port_id = int(m.group(2)) field_name = m.group(1) - if self.ports.has_key(port_id): + if self.client.ports.has_key(port_id): if not port_id in port_stats: port_stats[port_id] = {} port_stats[port_id][field_name] = value @@ -184,13 +162,14 @@ class CTRexStatelessClient(object): global_stats[key] = value # update the general object with the snapshot - self.global_stats.update(global_stats) + self.client.global_stats.update(global_stats) + # update all ports for port_id, data in port_stats.iteritems(): - self.ports[port_id].port_stats.update(data) - + self.client.ports[port_id].port_stats.update(data) + # dispatcher for server async events (port started, port stopped and etc.) def handle_async_event (self, type, data): # DP stopped @@ -200,7 +179,7 @@ class CTRexStatelessClient(object): if (type == 0): port_id = int(data['port_id']) ev = "Port {0} has started".format(port_id) - self.async_event_port_started(port_id) + self.__async_event_port_started(port_id) # port stopped elif (type == 1): @@ -208,8 +187,8 @@ class CTRexStatelessClient(object): ev = "Port {0} has stopped".format(port_id) # call the handler - self.async_event_port_stopped(port_id) - + self.__async_event_port_stopped(port_id) + # port paused elif (type == 2): @@ -217,7 +196,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has paused".format(port_id) # call the handler - self.async_event_port_paused(port_id) + self.__async_event_port_paused(port_id) # port resumed elif (type == 3): @@ -225,7 +204,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has resumed".format(port_id) # call the handler - self.async_event_port_resumed(port_id) + self.__async_event_port_resumed(port_id) # port finished traffic elif (type == 4): @@ -233,7 +212,7 @@ class CTRexStatelessClient(object): ev = "Port {0} job done".format(port_id) # call the handler - self.async_event_port_stopped(port_id) + self.__async_event_port_stopped(port_id) show_event = True # port was stolen... @@ -241,7 +220,7 @@ class CTRexStatelessClient(object): session_id = data['session_id'] # false alarm, its us - if session_id == self.session_id: + if session_id == self.client.session_id: return port_id = int(data['port_id']) @@ -250,13 +229,13 @@ class CTRexStatelessClient(object): ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) # call the handler - self.async_event_port_forced_acquired(port_id) + self.__async_event_port_forced_acquired(port_id) show_event = True # server stopped elif (type == 100): ev = "Server has stopped" - self.async_event_server_stopped() + self.__async_event_server_stopped() show_event = True @@ -265,70 +244,186 @@ class CTRexStatelessClient(object): return - self.add_event_log(ev, 'server', show_event) + self.__add_event_log(ev, 'server', show_event) - def async_event_port_stopped (self, port_id): - self.ports[port_id].async_event_port_stopped() + # private functions + def __async_event_port_stopped (self, port_id): + self.client.ports[port_id].async_event_port_stopped() - def async_event_port_started (self, port_id): - self.ports[port_id].async_event_port_started() - - def async_event_port_paused (self, port_id): - self.ports[port_id].async_event_port_paused() + def __async_event_port_started (self, port_id): + self.client.ports[port_id].async_event_port_started() - def async_event_port_resumed (self, port_id): - self.ports[port_id].async_event_port_resumed() + def __async_event_port_paused (self, port_id): + self.client.ports[port_id].async_event_port_paused() - def async_event_port_forced_acquired (self, port_id): - self.ports[port_id].async_event_forced_acquired() - self.read_only = True + def __async_event_port_resumed (self, port_id): + self.client.ports[port_id].async_event_port_resumed() - def async_event_server_stopped (self): - self.connected = False + def __async_event_port_forced_acquired (self, port_id): + self.client.ports[port_id].async_event_forced_acquired() + self.client.read_only = True - def get_events (self): - return self.events + def __async_event_server_stopped (self): + self.client.connected = False - def clear_events (self): - self.events = [] - ############# helper functions section ############## + # add event to log + def __add_event_log (self, msg, ev_type, show = False): - # measure time for functions - def timing(f): - def wrap(*args): - - time1 = time.time() - ret = f(*args) + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" - # don't want to print on error - if ret.bad(): - return ret + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - delta = time.time() - time1 + if show: + self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - client = args[0] - client.logger.log(format_time(delta) + "\n") - return ret + - return wrap +############################ RPC layer ############################# +############################ ############################# +############################ ############################# - def validate_port_list(self, port_id_list): - if not isinstance(port_id_list, list): - print type(port_id_list) - return False +class CCommLink(object): + """describes the connectivity of the stateless client method""" + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): + self.virtual = virtual + self.server = server + self.port = port + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) + + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True + + def get_server (self): + return self.server + + def get_port (self): + return self.port + + def connect(self): + if not self.virtual: + return self.rpc_link.connect() + + def disconnect(self): + if not self.virtual: + return self.rpc_link.disconnect() + + def transmit(self, method_name, params={}): + if self.virtual: + self._prompt_virtual_tx_msg() + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + print msg + return + else: + return self.rpc_link.invoke_rpc_method(method_name, params) + + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() + + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) - # check each item of the sequence - return all([ (port_id >= 0) and (port_id < self.get_port_count()) - for port_id in port_id_list ]) + + +############################ client ############################# +############################ ############################# +############################ ############################# + +class CTRexStatelessClient(object): + """docstring for CTRexStatelessClient""" + + def __init__(self, + username = general_utils.get_current_user(), + server = "localhost", + sync_port = 4501, + async_port = 4500, + verbose_level = LoggerApi.VERBOSE_REGULAR, + logger = None, + virtual = False): + + + self.username = username + + # init objects + self.ports = {} + self.server_version = {} + self.system_info = {} + self.session_id = random.getrandbits(32) + self.read_only = False + self.connected = False + + # logger + self.logger = DefaultLogger() if not logger else logger + + # initial verbose + self.logger.set_verbose(verbose_level) + + # low level RPC layer + self.comm_link = CCommLink(server, + sync_port, + virtual, + self.logger) + + # async event handler manager + self.event_handler = AsyncEventHandler(self) + + # async subscriber level + self.async_client = CTRexAsyncClient(server, + async_port, + self) + + + + + # stats + self.connection_info = {"username": username, + "server": server, + "sync_port": sync_port, + "async_port": async_port, + "virtual": virtual} + + + self.global_stats = trex_stats.CGlobalStats(self.connection_info, + self.server_version, + self.ports) + + self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, + self.ports) + + # stream DB + self.streams_db = CStreamsDB() + + + + ############# private functions - used by the class itself ########### # some preprocessing for port argument def __ports (self, port_id_list): @@ -350,423 +445,456 @@ class CTRexStatelessClient(object): return port_id_list - ############ boot up section ################ + # sync ports + def __sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) - # mode can be RW - read / write, RWF - read write with force , RO - read only - def connect(self, mode = "RW"): - - if self.is_connected(): - self.disconnect() - - # clear this flag - self.connected = False - + rc = RC() - # connect sync channel - rc = self.comm_link.connect() - if rc.bad(): - return rc + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) - # connect async channel - rc = self.async_client.connect() - if rc.bad(): - return rc + return rc - # version - rc = self.transmit("get_version") - if rc.bad(): - return rc + # acquire ports, if port_list is none - get all + def __acquire (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) - self.server_version = rc.data() - self.global_stats.server_version = rc.data() + rc = RC() - # cache system info - rc = self.transmit("get_system_info") - if rc.bad(): - return rc + for port_id in port_id_list: + rc.add(self.ports[port_id].acquire(force)) - self.system_info = rc.data() + return rc - # cache supported commands - rc = self.transmit("get_supported_cmds") - if rc.bad(): - return rc + # release ports + def __release (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) - self.supported_cmds = rc.data() + rc = RC() - # create ports - for port_id in xrange(self.get_port_count()): - speed = self.system_info['ports'][port_id]['speed'] - driver = self.system_info['ports'][port_id]['driver'] + for port_id in port_id_list: + rc.add(self.ports[port_id].release()) - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + return rc - # sync the ports - rc = self.sync_ports() - if rc.bad(): - return rc + def __add_stream(self, stream_id, stream_obj, port_id_list = None): - # acquire all ports - if mode == "RW": - rc = self.acquire(force = False) + port_id_list = self.__ports(port_id_list) - # fallback to read only if failed - if rc.bad(): - self.annotate(rc, show_status = False) - self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + rc = RC() - self.release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False + for port_id in port_id_list: + rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - elif mode == "RWF": - rc = self.acquire(force = True) - if rc.bad(): - return rc - self.read_only = False + return rc - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True - - self.connected = True - return RC_OK() + def __add_stream_pack(self, stream_pack, port_id_list = None): + port_id_list = self.__ports(port_id_list) - def is_read_only (self): - return self.read_only + rc = RC() - def is_connected (self): - return self.connected and self.comm_link.is_connected + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack)) + return rc - def disconnect(self): - # release any previous acquired ports - if self.is_connected(): - self.release(self.get_acquired_ports()) - self.comm_link.disconnect() - self.async_client.disconnect() - self.connected = False + def __remove_stream(self, stream_id, port_id_list = None): + port_id_list = self.__ports(port_id_list) - return RC_OK() + rc = RC() + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_stream(stream_id)) - def on_async_dead (self): - if self.connected: - msg = 'lost connection to server' - self.add_event_log(msg, 'local', True) - self.connected = False + return rc - def on_async_alive (self): - pass - ########### cached queries (no server traffic) ########### - def get_supported_cmds(self): - return self.supported_cmds + def __remove_all_streams(self, port_id_list = None): + port_id_list = self.__ports(port_id_list) - def get_version(self): - return self.server_version + rc = RC() - def get_system_info(self): - return self.system_info + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_all_streams()) - def get_port_count(self): - return self.system_info.get("port_count") + return rc - def get_port_ids(self, as_str=False): - port_ids = range(self.get_port_count()) - if as_str: - return " ".join(str(p) for p in port_ids) - else: - return port_ids - def get_stats_async (self): - return self.async_client.get_stats() + def __get_stream(self, stream_id, port_id, get_pkt = False): - def get_connection_port (self): - return self.comm_link.port + return self.ports[port_id].get_stream(stream_id) - def get_connection_ip (self): - return self.comm_link.server - def get_all_ports (self): - return [port_id for port_id, port_obj in self.ports.iteritems()] + def __get_all_streams(self, port_id, get_pkt = False): - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] + return self.ports[port_id].get_all_streams() - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] + def __get_stream_id_list(self, port_id): - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] + return self.ports[port_id].get_stream_id_list() - def set_verbose (self, level): - self.logger.set_verbose(level) - def get_verbose (self): - return self.logger.get_verbose() + def __start_traffic (self, multiplier, duration, port_id_list = None): - def set_prompt_redraw_cb(self, cb): - self.prompt_redraw_cb = cb + port_id_list = self.__ports(port_id_list) + rc = RC() - def annotate (self, rc, desc = None, show_status = True): + for port_id in port_id_list: + rc.add(self.ports[port_id].start(multiplier, duration)) - rc.annotate(self.logger.log, desc, show_status) + return rc - ############# server actions ################ - # ping server - def ping(self): - return self.transmit("ping") + def __resume_traffic (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + rc = RC() + for port_id in port_id_list: + rc.add(self.ports[port_id].resume()) - def get_global_stats(self): - return self.transmit("get_global_stats") + return rc + def __pause_traffic (self, port_id_list = None, force = False): - ########## port commands ############## - def sync_ports (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) - rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].sync()) - + rc.add(self.ports[port_id].pause()) + return rc - # acquire ports, if port_list is none - get all - def acquire (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) + def __stop_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].acquire(force)) - + rc.add(self.ports[port_id].stop(force)) + return rc - - # release ports - def release (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) + + def __update_traffic (self, mult, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release()) - + rc.add(self.ports[port_id].update(mult)) + return rc - - def add_stream(self, stream_id, stream_obj, port_id_list = None): + def __validate (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - + rc.add(self.ports[port_id].validate()) + return rc - - def add_stream_pack(self, stream_pack, port_id_list = None): - port_id_list = self.__ports(port_id_list) - rc = RC() + # connect to server + # mode can be RW - read / write, RWF - read write with force , RO - read only + def __connect(self, mode = "RW"): - for port_id in port_id_list: - rc.add(self.ports[port_id].add_streams(stream_pack)) + # first disconnect if already connected + if self.is_connected(): + self.__disconnect() - return rc + # clear this flag + self.connected = False + + # connect sync channel + rc = self.comm_link.connect() + if not rc: + return rc + # connect async channel + rc = self.async_client.connect() + if not rc: + return rc + # version + rc = self._transmit("get_version") + if not rc: + return rc - def remove_stream(self, stream_id, port_id_list = None): - port_id_list = self.__ports(port_id_list) + self.server_version = rc.data() + self.global_stats.server_version = rc.data() - rc = RC() + # cache system info + rc = self._transmit("get_system_info") + if not rc: + return rc - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_stream(stream_id)) - - return rc + self.system_info = rc.data() + # cache supported commands + rc = self._transmit("get_supported_cmds") + if not rc: + return rc + self.supported_cmds = rc.data() - def remove_all_streams(self, port_id_list = None): - port_id_list = self.__ports(port_id_list) + # create ports + for port_id in xrange(self.system_info["port_count"]): + speed = self.system_info['ports'][port_id]['speed'] + driver = self.system_info['ports'][port_id]['driver'] + + self.ports[port_id] = Port(port_id, + speed, + driver, + self.username, + self.comm_link, + self.session_id) + + + # sync the ports + rc = self.__sync_ports() + if not rc: + return rc + + # acquire all ports + if mode == "RW": + rc = self.__acquire(force = False) + + # fallback to read only if failed + if not rc: + self.logger.annotate(rc, show_status = False) + self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + + self.__release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.__acquire(force = True) + if not rc: + return rc + self.read_only = False + + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_all_streams()) - return rc + self.connected = True + return RC_OK() - - def get_stream(self, stream_id, port_id, get_pkt = False): - return self.ports[port_id].get_stream(stream_id) + # disconenct from server + def __disconnect(self): + # release any previous acquired ports + if self.is_connected(): + self.__release(self.get_acquired_ports()) + self.comm_link.disconnect() + self.async_client.disconnect() - def get_all_streams(self, port_id, get_pkt = False): + self.connected = False - return self.ports[port_id].get_all_streams() + return RC_OK() - def get_stream_id_list(self, port_id): + # ping server + def __ping (self): + return self._transmit("ping") - return self.ports[port_id].get_stream_id_list() + # start command + def __start (self, port_id_list, stream_list, mult, force, duration, dry): - def start_traffic (self, multiplier, duration, port_id_list = None): + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - port_id_list = self.__ports(port_id_list) + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + self.logger.log(format_text(msg, 'bold')) + return RC_ERR(msg) + else: + rc = self.cmd_stop(active_ports) + if not rc: + return rc - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].start(multiplier, duration)) - - return rc + rc = self.__remove_all_streams(port_id_list) + self.logger.annotate(rc,"Removing all streams from port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + rc = self.__add_stream_pack(stream_list, port_id_list) + self.logger.annotate(rc,"Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + if rc.bad(): + return rc + + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.__start_traffic(mult, duration, port_id_list) + self.logger.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) + + return rc + else: + rc = self.__validate(port_id_list) + self.logger.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) + + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc - def resume_traffic (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].resume()) + def __verify_port_id_list (self, port_id_list): + # check arguments + if not isinstance(port_id_list, list): + return RC_ERR("ports should be an instance of 'list'") - return rc + # all ports are valid ports + if not all([port_id in self.get_all_ports() for port_id in port_id_list]): + return RC_ERR("Port IDs valid values are '{0}' but provided '{1}'".format(self.get_all_ports(), port_id_list)) + + return RC_OK() - def pause_traffic (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - rc = RC() + def __verify_mult (self, mult, strict): + if not isinstance(mult, dict): + return RC_ERR("mult should be an instance of dict") - for port_id in port_id_list: - rc.add(self.ports[port_id].pause()) + types = ["raw", "bps", "pps", "percentage"] + if not mult.get('type', None) in types: + return RC_ERR("mult should contain 'type' field of one of '{0}'".format(types)) + + if strict: + ops = ["abs"] + else: + ops = ["abs", "add", "sub"] + if not mult.get('op', None) in ops: + return RC_ERR("mult should contain 'op' field of one of '{0}'".format(ops)) - return rc + return RC_OK() - def stop_traffic (self, port_id_list = None, force = False): + def __process_profiles (self, profiles, out): - port_id_list = self.__ports(port_id_list) - rc = RC() + for profile in (profiles if isinstance(profiles, list) else [profiles]): + # filename + if isinstance(profile, str): - for port_id in port_id_list: - rc.add(self.ports[port_id].stop(force)) - - return rc + if not os.path.isfile(profile): + return RC_ERR("file '{0}' does not exists".format(profile)) + try: + stream_list = self.streams_db.load_yaml_file(profile) + except Exception as e: + rc = RC_ERR(str(e)) + self.logger.annotate(rc) + return rc - def update_traffic (self, mult, port_id_list = None, force = False): + out += stream_list + else: + return RC_ERR("unknown profile '{0}'".format(profile)) - port_id_list = self.__ports(port_id_list) - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].update(mult)) - - return rc + return RC_OK() - def validate (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) - rc = RC() + # stream list + if opts.db: + stream_list = self.streams_db.get_stream_pack(opts.db) + rc = RC(stream_list != None) + self.logger.annotate(rc,"Load stream pack (from DB):") + if rc.bad(): + return RC_ERR("Failed to load stream pack") - for port_id in port_id_list: - rc.add(self.ports[port_id].validate()) - - return rc + else: + # load streams from file + stream_list = None + try: + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + except Exception as e: + s = str(e) + rc=RC_ERR(s) + self.logger.annotate(rc) + return rc + + rc = RC(stream_list != None) + self.logger.annotate(rc,"Load stream pack (from file):") + if stream_list == None: + return RC_ERR("Failed to load stream pack") + ############ functions used by other classes but not users ############## - def get_port_stats(self, port_id=None): - pass + def _validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + print type(port_id_list) + return False - def get_stream_stats(self, port_id=None): - pass + # check each item of the sequence + return all([ (port_id >= 0) and (port_id < self.get_port_count()) + for port_id in port_id_list ]) - def transmit(self, method_name, params={}): + # transmit request on the RPC link + def _transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) - - def transmit_batch(self, batch_list): + # transmit batch request on the RPC link + def _transmit_batch(self, batch_list): return self.comm_link.transmit_batch(batch_list) - ######################### Console (high level) API ######################### - - @timing - def cmd_ping(self): - rc = self.ping() - self.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) - return rc + ############# helper functions section ############## - def cmd_connect(self, mode = "RW"): - rc = self.connect(mode) - self.annotate(rc) - return rc + # measure time for functions + def timing(f): + def wrap(*args): + + time1 = time.time() + ret = f(*args) - def cmd_disconnect(self): - rc = self.disconnect() - self.annotate(rc) - return rc + # don't want to print on error + if ret.bad(): + return ret - # reset - def cmd_reset(self): + delta = time.time() - time1 - rc = self.acquire(force = True) - self.annotate(rc, "Force acquiring all ports:") - if rc.bad(): - return rc + client = args[0] + client.logger.log(format_time(delta) + "\n") + return ret - # force stop all ports - rc = self.stop_traffic(self.get_port_ids(), True) - self.annotate(rc,"Stop traffic on all ports:") - if rc.bad(): - return rc + return wrap - # remove all streams - rc = self.remove_all_streams(self.get_port_ids()) - self.annotate(rc,"Removing all streams from all ports:") - if rc.bad(): - return rc - # TODO: clear stats - return RC_OK() - + ########## port commands ############## + + ######################### Console (high level) API ######################### # stop cmd def cmd_stop (self, port_id_list): @@ -779,8 +907,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.stop_traffic(active_ports) - self.annotate(rc,"Stopping traffic on port(s) {0}:".format(port_id_list)) + rc = self.__stop_traffic(active_ports) + self.logger.annotate(rc,"Stopping traffic on port(s) {0}:".format(port_id_list)) if rc.bad(): return rc @@ -797,8 +925,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.update_traffic(mult, active_ports) - self.annotate(rc,"Updating traffic on port(s) {0}:".format(port_id_list)) + rc = self.__update_traffic(mult, active_ports) + self.logger.annotate(rc,"Updating traffic on port(s) {0}:".format(port_id_list)) return rc @@ -832,8 +960,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.pause_traffic(active_ports) - self.annotate(rc,"Pausing traffic on port(s) {0}:".format(port_id_list)) + rc = self.__pause_traffic(active_ports) + self.logger.annotate(rc,"Pausing traffic on port(s) {0}:".format(port_id_list)) return rc @@ -849,61 +977,18 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.resume_traffic(active_ports) - self.annotate(rc,"Resume traffic on port(s) {0}:".format(port_id_list)) + rc = self.__resume_traffic(active_ports) + self.logger.annotate(rc,"Resume traffic on port(s) {0}:".format(port_id_list)) return rc - # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): - - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if active_ports: - if not force: - msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) - else: - rc = self.cmd_stop(active_ports) - if not rc: - return rc - - - rc = self.remove_all_streams(port_id_list) - self.annotate(rc,"Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - - rc = self.add_stream_pack(stream_list, port_id_list) - self.annotate(rc,"Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): - return rc - - # when not on dry - start the traffic , otherwise validate only - if not dry: - rc = self.start_traffic(mult, duration, port_id_list) - self.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) - - return rc - else: - rc = self.validate(port_id_list) - self.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) - - if rc.bad(): - return rc - - # show a profile on one port for illustration - self.ports[port_id_list[0]].print_profile(mult, duration) - - return rc + # validate port(s) profile def cmd_validate (self, port_id_list): - rc = self.validate(port_id_list) - self.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) + rc = self.__validate(port_id_list) + self.logger.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) return rc @@ -925,23 +1010,6 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ - def cmd_connect_line (self, line): - '''Connects to the TRex server''' - # define a parser - parser = parsing_opts.gen_parser(self, - "connect", - self.cmd_connect_line.__doc__, - parsing_opts.FORCE) - - opts = parser.parse_args(line.split()) - - if opts is None: - return RC_ERR("bad command line parameters") - - if opts.force: - rc = self.cmd_connect(mode = "RWF") - else: - rc = self.cmd_connect(mode = "RW") @timing def cmd_start_line (self, line): @@ -971,7 +1039,7 @@ class CTRexStatelessClient(object): if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) - self.annotate(rc,"Load stream pack (from DB):") + self.logger.annotate(rc,"Load stream pack (from DB):") if rc.bad(): return RC_ERR("Failed to load stream pack") @@ -983,11 +1051,11 @@ class CTRexStatelessClient(object): except Exception as e: s = str(e) rc=RC_ERR(s) - self.annotate(rc) + self.logger.annotate(rc) return rc rc = RC(stream_list != None) - self.annotate(rc,"Load stream pack (from file):") + self.logger.annotate(rc,"Load stream pack (from file):") if stream_list == None: return RC_ERR("Failed to load stream pack") @@ -1253,65 +1321,249 @@ class CTRexStatelessClient(object): def _filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + def __verify_connected(f): + #@wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ - ################################# - # ------ private classes ------ # - class CCommLink(object): - """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): - super(CTRexStatelessClient.CCommLink, self).__init__() - self.virtual = virtual - self.server = server - self.port = port - self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - @property - def is_connected(self): - if not self.virtual: - return self.rpc_link.connected - else: - return True + if not inst.stateless_client.is_connected(): + return RC_ERR("cannot execute '{0}' while client is disconnected".format(func_name)) - def get_server (self): - return self.server + ret = f(*args) + return ret - def get_port (self): - return self.port + return wrap - def connect(self): - if not self.virtual: - return self.rpc_link.connect() - def disconnect(self): - if not self.virtual: - return self.rpc_link.disconnect() - def transmit(self, method_name, params={}): - if self.virtual: - self._prompt_virtual_tx_msg() - _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) - print msg - return - else: - return self.rpc_link.invoke_rpc_method(method_name, params) - - def transmit_batch(self, batch_list): - if self.virtual: - self._prompt_virtual_tx_msg() - print [msg - for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] - else: - batch = self.rpc_link.create_batch() - for command in batch_list: - batch.add(command.method, command.params) - # invoke the batch - return batch.invoke() + ############################ API ############################# + ############################ ############################# + ############################ ############################# + + + ############################ Getters ############################# + ############################ ############################# + ############################ ############################# + + + # return verbose level of the logger + def get_verbose (self): + return self.logger.get_verbose() + + # is the client on read only mode ? + def is_read_only (self): + return self.read_only + + # is the client connected ? + def is_connected (self): + return self.connected and self.comm_link.is_connected + + + # get connection info + def get_connection_info (self): + return self.connection_info + + + # get supported commands by the server + def get_server_supported_cmds(self): + return self.supported_cmds + + # get server version + def get_server_version(self): + return self.server_version + + # get server system info + def get_server_system_info(self): + return self.system_info + + # get port count + def get_port_count(self): + return len(self.ports) + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, RC_ERR("invalid port id")) + + # get all ports as IDs + def get_all_ports (self): + return self.ports.keys() + + # get all acquired ports + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + # get all active ports (TX or pause) + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + # get paused ports + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + # get all TX ports + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + + ############################ Commands ############################# + ############################ ############################# + ############################ ############################# + + + # set the log on verbose level + def set_verbose (self, level): + self.logger.set_verbose(level) + + + # connects to the server + # mode can be: + # 'RO' - read only + # 'RW' - read/write + # 'RWF' - read write forced (take ownership) + def connect (self, mode = "RW"): + modes = ['RO', 'RW', 'RWF'] + if not mode in modes: + return RC_ERR("invalid mode '{0}'".format(mode)) + + rc = self.__connect(mode) + self.logger.annotate(rc) + + if not rc: + raise STLFailure(rc) + + return rc + + + # disconnects from the server + def disconnect (self, annotate = True): + rc = self.__disconnect() + if annotate: + self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) + if not rc: + raise STLFailure(rc) + + return rc + + + # teardown - call after test is done + def teardown (self): + # for now, its only disconnect + rc = self.__disconnect() + if not rc: + raise STLFailure(rc) + + return rc + + + # pings the server on the RPC channel + def ping(self): + rc = self.__ping() + self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.connection_info['sync_port'])) + + if not rc: + raise STLFailure(rc) + + return rc + + + # reset the server by performing + # force acquire, stop, and remove all streams + def reset(self): + + rc = self.__acquire(force = True) + self.logger.annotate(rc, "Force acquiring all ports:") + if not rc: + raise STLFailure(rc) + + + # force stop all ports + rc = self.__stop_traffic(self.get_all_ports(), True) + self.logger.annotate(rc,"Stop traffic on all ports:") + if not rc: + raise STLFailure(rc) + + + # remove all streams + rc = self.__remove_all_streams(self.get_all_ports()) + self.logger.annotate(rc,"Removing all streams from all ports:") + if not rc: + raise STLFailure(rc) + + # TODO: clear stats + return RC_OK() + + # start cmd + def start (self, + profiles, + ports = None, + mult = "1", + force = False, + duration = -1, + dry = False): + + + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self.__verify_port_id_list(ports) + if not rc: + raise STLFailure(rc) + + + # verify multiplier + try: + result = parsing_opts.match_multiplier_common(mult) + except argparse.ArgumentTypeError: + raise STLFailure("bad format for multiplier: {0}".format(mult)) + + # process profiles + stream_list = [] + rc = self.__process_profiles(profiles, stream_list) + if not rc: + raise STLFailure(rc) + + + + + ############################ Line ############################# + ############################ Commands ############################# + ############################ ############################# + + def connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + # call the API + if opts.force: + rc = self.connect(mode = "RWF") + else: + rc = self.connect(mode = "RW") - def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, - port=self.port) + def disconnect_line (self, line): + return self.disconnect() -if __name__ == "__main__": - pass + def reset_line (self, line): + return self.reset() diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index 3735a45b..c1afda26 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -284,12 +284,12 @@ class CCmdArgParser(argparse.ArgumentParser): # if all ports are marked or if (getattr(opts, "all_ports", None) == True) or (getattr(opts, "ports", None) == []): - opts.ports = self.stateless_client.get_port_ids() + opts.ports = self.stateless_client.get_all_ports() # so maybe we have ports configured elif (getattr(opts, "ports", None) == []): for port in opts.ports: - if not self.stateless_client.validate_port_list([port]): + if not self.stateless_client._validate_port_list([port]): self.error("port id '{0}' is not a valid port id\n".format(port)) return opts diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 5c29f59b..337f0a70 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -79,6 +79,7 @@ class RC(): def RC_OK(data = ""): return RC(True, data) + def RC_ERR (err): return RC(False, err) diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index f086c208..72cdcb0d 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -39,6 +39,28 @@ from functools import wraps __version__ = "1.1" +# console custom logger +class ConsoleLogger(LoggerApi): + def __init__ (self): + self.prompt_redraw = None + + def write (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def flush (self): + sys.stdout.flush() + + # override this for the prompt fix + def async_log (self, msg, level = LoggerApi.VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) + if self.prompt_redraw: + self.prompt_redraw() + self.flush() + + def set_window_always_on_top (title): # we need the GDK module, if not available - ignroe this command try: @@ -133,9 +155,9 @@ class TRexGeneralCmd(cmd.Cmd): class TRexConsole(TRexGeneralCmd): """Trex Console""" - def __init__(self, stateless_client, verbose=False): + def __init__(self, stateless_client, verbose = False): + self.stateless_client = stateless_client - self.stateless_client.set_prompt_redraw_cb(self.prompt_redraw) TRexGeneralCmd.__init__(self) @@ -199,7 +221,7 @@ class TRexConsole(TRexGeneralCmd): def get_console_identifier(self): return "{context}_{server}".format(context=self.__class__.__name__, - server=self.stateless_client.get_server_ip()) + server=self.stateless_client.get_connection_info()['server']) def register_main_console_methods(self): main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) @@ -271,7 +293,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_ping (self, line): '''Ping the server\n''' - rc = self.stateless_client.cmd_ping() + rc = self.stateless_client.ping() if rc.bad(): return @@ -333,13 +355,13 @@ class TRexConsole(TRexGeneralCmd): def do_connect (self, line): '''Connects to the server\n''' - self.stateless_client.cmd_connect_line(line) + self.stateless_client.connect_line(line) def do_disconnect (self, line): '''Disconnect from the server\n''' - self.stateless_client.cmd_disconnect() + self.stateless_client.disconnect_line(line) ############### start @@ -408,7 +430,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected_and_rw def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset_line(line) + self.stateless_client.reset_line(line) ######### validate @@ -492,7 +514,9 @@ class TRexConsole(TRexGeneralCmd): if opts.xterm: - exe = './trex-console -t -q -s {0} -p {1}'.format(self.stateless_client.get_server_ip(), self.stateless_client.get_server_port()) + info = self.stateless_client.get_connection_info() + + exe = './trex-console -t -q -s {0} -p {1} --async_port {2}'.format(info['server'], info['sync_port'], info['async_port']) cmd = ['xterm', '-geometry', '111x42', '-sl', '0', '-title', 'trex_tui', '-e', exe] self.terminal = subprocess.Popen(cmd) @@ -645,11 +669,13 @@ def main(): verbose_level = LoggerApi.VERBOSE_REGULAR # Stateless client connection + logger = ConsoleLogger() stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, - verbose_level) + verbose_level, + logger) # TUI or no acquire will give us READ ONLY mode if options.tui or not options.acquire: @@ -673,6 +699,8 @@ def main(): try: console = TRexConsole(stateless_client, options.verbose) + logger.prompt_redraw = console.prompt_redraw + if options.tui: console.do_tui("") else: @@ -682,7 +710,7 @@ def main(): print "\n\n*** Caught Ctrl + C... Exiting...\n\n" finally: - stateless_client.disconnect() + stateless_client.teardown() if __name__ == '__main__': diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py index cdf3fb69..45769693 100644 --- a/scripts/automation/trex_control_plane/console/trex_status.py +++ b/scripts/automation/trex_control_plane/console/trex_status.py @@ -1,525 +1,525 @@ -from time import sleep - -import os - -import curses -from curses import panel -import random -import collections -import operator -import datetime - -g_curses_active = False - -################### utils ################# - -# simple percetange show -def percentage (a, total): - x = int ((float(a) / total) * 100) - return str(x) + "%" - -################### panels ################# - -# panel object -class TrexStatusPanel(object): - def __init__ (self, h, l, y, x, headline, status_obj): - - self.status_obj = status_obj - - self.log = status_obj.log - self.stateless_client = status_obj.stateless_client - - self.stats = status_obj.stats - self.general_stats = status_obj.general_stats - - self.h = h - self.l = l - self.y = y - self.x = x - self.headline = headline - - self.win = curses.newwin(h, l, y, x) - self.win.erase() - self.win.box() - - self.win.addstr(1, 2, headline, curses.A_UNDERLINE) - self.win.refresh() - - panel.new_panel(self.win) - self.panel = panel.new_panel(self.win) - self.panel.top() - - def clear (self): - self.win.erase() - self.win.box() - self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE) - - def getwin (self): - return self.win - - -# various kinds of panels - -# Server Info Panel -class ServerInfoPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) - - def draw (self): - - if not self.status_obj.server_version : - return - - if not self.status_obj.server_sys_info: - return - - - self.clear() - - self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) - self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) - self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", - self.status_obj.server_version["build_date"] + " @ " + - self.status_obj.server_version["build_time"] + " by " + - self.status_obj.server_version["built_by"])) - - self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"])) - self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) + - " cores", self.status_obj.server_sys_info["core_type"])) - - self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) - - ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list) - - if not ports_owned: - ports_owned = "None" - - self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned)) - -# general info panel -class GeneralInfoPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) - - def draw (self): - self.clear() - - if not self.general_stats.is_online(): - self.getwin().addstr(3, 2, "No Published Data From TRex Server") - return - - self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) - - self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", - self.general_stats.get("m_tx_bps", format = True, suffix = "bps"), - self.general_stats.get("m_tx_pps", format = True, suffix = "pps"))) - - - self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:", - self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"), - self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts"))) - - self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", - self.general_stats.get("m_rx_bps", format = True, suffix = "bps"), - self.general_stats.get("m_rx_pps", format = True, suffix = "pps"))) - - - self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:", - self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"), - self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts"))) - -# all ports stats -class PortsStatsPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) - - - def draw (self): - - self.clear() - - owned_ports = self.status_obj.owned_ports_list - if not owned_ports: - self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") - return - - # table header - self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) - - - - for i, port_index in enumerate(owned_ports): - - port_stats = self.status_obj.stats.get_port_stats(port_index) - - if port_stats: - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), - "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), - - "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), - "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), - port_stats.get_rel("ibytes", format = True, suffix = "B")))) - - else: - - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), - "N/A", - "N/A", - "N/A", - "N/A")) - - - # old format +#from time import sleep +# +#import os +# +#import curses +#from curses import panel +#import random +#import collections +#import operator +#import datetime +# +#g_curses_active = False +# +#################### utils ################# +# +## simple percetange show +#def percentage (a, total): +# x = int ((float(a) / total) * 100) +# return str(x) + "%" +# +#################### panels ################# +# +## panel object +#class TrexStatusPanel(object): +# def __init__ (self, h, l, y, x, headline, status_obj): +# +# self.status_obj = status_obj +# +# self.log = status_obj.log +# self.stateless_client = status_obj.stateless_client +# +# self.stats = status_obj.stats +# self.general_stats = status_obj.general_stats +# +# self.h = h +# self.l = l +# self.y = y +# self.x = x +# self.headline = headline +# +# self.win = curses.newwin(h, l, y, x) +# self.win.erase() +# self.win.box() +# +# self.win.addstr(1, 2, headline, curses.A_UNDERLINE) +# self.win.refresh() +# +# panel.new_panel(self.win) +# self.panel = panel.new_panel(self.win) +# self.panel.top() +# +# def clear (self): +# self.win.erase() +# self.win.box() +# self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE) +# +# def getwin (self): +# return self.win +# +# +## various kinds of panels +# +## Server Info Panel +#class ServerInfoPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) +# +# def draw (self): +# +# if not self.status_obj.server_version : +# return +# +# if not self.status_obj.server_sys_info: +# return +# +# +# self.clear() +# +# self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) +# self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) +# self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", +# self.status_obj.server_version["build_date"] + " @ " + +# self.status_obj.server_version["build_time"] + " by " + +# self.status_obj.server_version["built_by"])) +# +# self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"])) +# self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) + +# " cores", self.status_obj.server_sys_info["core_type"])) +# +# self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) +# +# ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list) +# +# if not ports_owned: +# ports_owned = "None" +# +# self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned)) +# +## general info panel +#class GeneralInfoPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) +# +# def draw (self): +# self.clear() +# +# if not self.general_stats.is_online(): +# self.getwin().addstr(3, 2, "No Published Data From TRex Server") +# return +# +# self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) +# +# self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", +# self.general_stats.get("m_tx_bps", format = True, suffix = "bps"), +# self.general_stats.get("m_tx_pps", format = True, suffix = "pps"))) +# +# +# self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:", +# self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"), +# self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts"))) +# +# self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", +# self.general_stats.get("m_rx_bps", format = True, suffix = "bps"), +# self.general_stats.get("m_rx_pps", format = True, suffix = "pps"))) +# +# +# self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:", +# self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"), +# self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts"))) +# +## all ports stats +#class PortsStatsPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) +# +# +# def draw (self): +# +# self.clear() +# +# owned_ports = self.status_obj.owned_ports_list +# if not owned_ports: +# self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") +# return +# +# # table header +# self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) +# +# +# +# for i, port_index in enumerate(owned_ports): +# +# port_stats = self.status_obj.stats.get_port_stats(port_index) +# # if port_stats: -# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( # "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), -# port_stats.get("m_total_tx_pps", format = True, suffix = "pps"), -# port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), -# port_stats.get_rel("obytes", format = True, suffix = "B"), -# port_stats.get("m_total_rx_pps", format = True, suffix = "pps"), -# port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), -# port_stats.get_rel("ibytes", format = True, suffix = "B"))) +# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), +# +# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), +# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), +# port_stats.get_rel("ibytes", format = True, suffix = "B")))) # # else: -# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( # "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), # "N/A", # "N/A", # "N/A", -# "N/A", -# "N/A", # "N/A")) - -# control panel -class ControlPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj): - - super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) - - - def draw (self): - self.clear() - - self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.status_obj.stateless_client.get_port_count() - 1)) - - self.log.draw(self.getwin(), 2, 3) - -# specific ports panels -class SinglePortPanel(TrexStatusPanel): - def __init__ (self, h, l, y, x, status_obj, port_id): - - super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) - - self.port_id = port_id - - def draw (self): - y = 3 - - self.clear() - - if not self.port_id in self.status_obj.owned_ports_list: - self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) - return - - # streams - self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE) - y += 2 - - # stream table header - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM")) - y += 2 - - # streams - - if 'streams' in self.status_obj.owned_ports[str(self.port_id)]: - stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams'] - - for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)): - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - stream_id, - ("True" if stream['enabled'] else "False"), - stream['mode']['type'], - ("True" if stream['self_start'] else "False"), - stream['isg'], - (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"), - ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None"))) - - y += 1 - - # new section - traffic - y += 2 - - self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE) - y += 2 - - - - # table header - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) - - - y += 2 - - port_stats = self.status_obj.stats.get_port_stats(self.port_id) - - if port_stats: - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), - "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), - - "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), - port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), - "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), - port_stats.get_rel("ibytes", format = True, suffix = "B")))) - - else: - self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( - "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), - "N/A", - "N/A", - "N/A", - "N/A")) - - -################### main objects ################# - -# status log -class TrexStatusLog(): - def __init__ (self): - self.log = [] - - def add_event (self, msg): - self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) - - def draw (self, window, x, y, max_lines = 4): - index = y - - cut = len(self.log) - max_lines - if cut < 0: - cut = 0 - - for msg in self.log[cut:]: - window.addstr(index, x, msg) - index += 1 - -# status commands -class TrexStatusCommands(): - def __init__ (self, status_object): - - self.status_object = status_object - - self.stateless_client = status_object.stateless_client - self.log = self.status_object.log - - self.actions = {} - self.actions[ord('q')] = self._quit - self.actions[ord('p')] = self._ping - self.actions[ord('f')] = self._freeze - - self.actions[ord('g')] = self._show_ports_stats - - # register all the available ports shortcuts - for port_id in xrange(0, self.stateless_client.get_port_count()): - self.actions[ord('0') + port_id] = self._show_port_generator(port_id) - - - # handle a key pressed - def handle (self, ch): - if ch in self.actions: - return self.actions[ch]() - else: - self.log.add_event("Unknown key pressed, please see legend") - return True - - # show all ports - def _show_ports_stats (self): - self.log.add_event("Switching to all ports view") - self.status_object.stats_panel = self.status_object.ports_stats_panel - - return True - - - # function generator for different ports requests - def _show_port_generator (self, port_id): - def _show_port(): - self.log.add_event("Switching panel to port {0}".format(port_id)) - self.status_object.stats_panel = self.status_object.ports_panels[port_id] - - return True - - return _show_port - - def _freeze (self): - self.status_object.update_active = not self.status_object.update_active - self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") - - return True - - def _quit(self): - return False - - def _ping (self): - self.log.add_event("Pinging RPC server") - - rc, msg = self.stateless_client.ping() - if rc: - self.log.add_event("Server replied: '{0}'".format(msg)) - else: - self.log.add_event("Failed to get reply") - - return True - -# status object -# -# -# -class CTRexStatus(): - def __init__ (self, stdscr, stateless_client): - self.stdscr = stdscr - - self.stateless_client = stateless_client - - self.log = TrexStatusLog() - self.cmds = TrexStatusCommands(self) - - self.stats = stateless_client.get_stats_async() - self.general_stats = stateless_client.get_stats_async().get_general_stats() - - # fetch server info - self.server_sys_info = self.stateless_client.get_system_info() - - self.server_version = self.stateless_client.get_version() - - # list of owned ports - self.owned_ports_list = self.stateless_client.get_acquired_ports() - - # data per port - self.owned_ports = {} - - for port_id in self.owned_ports_list: - self.owned_ports[str(port_id)] = {} - self.owned_ports[str(port_id)]['streams'] = {} - - stream_list = self.stateless_client.get_all_streams(port_id) - - self.owned_ports[str(port_id)] = stream_list - - - try: - curses.curs_set(0) - except: - pass - - curses.use_default_colors() - self.stdscr.nodelay(1) - curses.nonl() - curses.noecho() - - self.generate_layout() - - - def generate_layout (self): - self.max_y = self.stdscr.getmaxyx()[0] - self.max_x = self.stdscr.getmaxyx()[1] - - self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self) - self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self) - self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self) - - # those can be switched on the same place - self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) - - self.ports_panels = {} - for i in xrange(0, self.stateless_client.get_port_count()): - self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) - - # at start time we point to the main one - self.stats_panel = self.ports_stats_panel - self.stats_panel.panel.top() - - panel.update_panels(); self.stdscr.refresh() - return - - - def wait_for_key_input (self): - ch = self.stdscr.getch() - - # no key , continue - if ch == curses.ERR: - return True - - return self.cmds.handle(ch) - - # main run entry point - def run (self): - - # list of owned ports - self.owned_ports_list = self.stateless_client.get_acquired_ports() - - # data per port - self.owned_ports = {} - - for port_id in self.owned_ports_list: - self.owned_ports[str(port_id)] = {} - self.owned_ports[str(port_id)]['streams'] = {} - - stream_list = self.stateless_client.get_all_streams(port_id) - - self.owned_ports[str(port_id)] = stream_list - - self.update_active = True - while (True): - - rc = self.wait_for_key_input() - if not rc: - break - - self.server_info_panel.draw() - self.general_info_panel.draw() - self.control_panel.draw() - - # can be different kinds of panels - self.stats_panel.panel.top() - self.stats_panel.draw() - - panel.update_panels() - self.stdscr.refresh() - sleep(0.01) - - -# global container -trex_status = None - -def show_trex_status_internal (stdscr, stateless_client): - global trex_status - - if trex_status == None: - trex_status = CTRexStatus(stdscr, stateless_client) - - trex_status.run() - -def show_trex_status (stateless_client): - - try: - curses.wrapper(show_trex_status_internal, stateless_client) - except KeyboardInterrupt: - curses.endwin() - -def cleanup (): - try: - curses.endwin() - except: - pass - +# +# +# # old format +## if port_stats: +## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +## port_stats.get("m_total_tx_pps", format = True, suffix = "pps"), +## port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +## port_stats.get_rel("obytes", format = True, suffix = "B"), +## port_stats.get("m_total_rx_pps", format = True, suffix = "pps"), +## port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +## port_stats.get_rel("ibytes", format = True, suffix = "B"))) +## +## else: +## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +## "N/A", +## "N/A", +## "N/A", +## "N/A", +## "N/A", +## "N/A")) +# +## control panel +#class ControlPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj): +# +# super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) +# +# +# def draw (self): +# self.clear() +# +# self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" +# .format(self.status_obj.stateless_client.get_port_count() - 1)) +# +# self.log.draw(self.getwin(), 2, 3) +# +## specific ports panels +#class SinglePortPanel(TrexStatusPanel): +# def __init__ (self, h, l, y, x, status_obj, port_id): +# +# super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) +# +# self.port_id = port_id +# +# def draw (self): +# y = 3 +# +# self.clear() +# +# if not self.port_id in self.status_obj.owned_ports_list: +# self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) +# return +# +# # streams +# self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE) +# y += 2 +# +# # stream table header +# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM")) +# y += 2 +# +# # streams +# +# if 'streams' in self.status_obj.owned_ports[str(self.port_id)]: +# stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams'] +# +# for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)): +# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# stream_id, +# ("True" if stream['enabled'] else "False"), +# stream['mode']['type'], +# ("True" if stream['self_start'] else "False"), +# stream['isg'], +# (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"), +# ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None"))) +# +# y += 1 +# +# # new section - traffic +# y += 2 +# +# self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE) +# y += 2 +# +# +# +# # table header +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) +# +# +# y += 2 +# +# port_stats = self.status_obj.stats.get_port_stats(self.port_id) +# +# if port_stats: +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), +# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), +# +# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), +# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), +# port_stats.get_rel("ibytes", format = True, suffix = "B")))) +# +# else: +# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( +# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), +# "N/A", +# "N/A", +# "N/A", +# "N/A")) +# +# +#################### main objects ################# +# +## status log +#class TrexStatusLog(): +# def __init__ (self): +# self.log = [] +# +# def add_event (self, msg): +# self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) +# +# def draw (self, window, x, y, max_lines = 4): +# index = y +# +# cut = len(self.log) - max_lines +# if cut < 0: +# cut = 0 +# +# for msg in self.log[cut:]: +# window.addstr(index, x, msg) +# index += 1 +# +## status commands +#class TrexStatusCommands(): +# def __init__ (self, status_object): +# +# self.status_object = status_object +# +# self.stateless_client = status_object.stateless_client +# self.log = self.status_object.log +# +# self.actions = {} +# self.actions[ord('q')] = self._quit +# self.actions[ord('p')] = self._ping +# self.actions[ord('f')] = self._freeze +# +# self.actions[ord('g')] = self._show_ports_stats +# +# # register all the available ports shortcuts +# for port_id in xrange(0, self.stateless_client.get_port_count()): +# self.actions[ord('0') + port_id] = self._show_port_generator(port_id) +# +# +# # handle a key pressed +# def handle (self, ch): +# if ch in self.actions: +# return self.actions[ch]() +# else: +# self.log.add_event("Unknown key pressed, please see legend") +# return True +# +# # show all ports +# def _show_ports_stats (self): +# self.log.add_event("Switching to all ports view") +# self.status_object.stats_panel = self.status_object.ports_stats_panel +# +# return True +# +# +# # function generator for different ports requests +# def _show_port_generator (self, port_id): +# def _show_port(): +# self.log.add_event("Switching panel to port {0}".format(port_id)) +# self.status_object.stats_panel = self.status_object.ports_panels[port_id] +# +# return True +# +# return _show_port +# +# def _freeze (self): +# self.status_object.update_active = not self.status_object.update_active +# self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") +# +# return True +# +# def _quit(self): +# return False +# +# def _ping (self): +# self.log.add_event("Pinging RPC server") +# +# rc, msg = self.stateless_client.ping() +# if rc: +# self.log.add_event("Server replied: '{0}'".format(msg)) +# else: +# self.log.add_event("Failed to get reply") +# +# return True +# +## status object +## +## +## +#class CTRexStatus(): +# def __init__ (self, stdscr, stateless_client): +# self.stdscr = stdscr +# +# self.stateless_client = stateless_client +# +# self.log = TrexStatusLog() +# self.cmds = TrexStatusCommands(self) +# +# self.stats = stateless_client.get_stats_async() +# self.general_stats = stateless_client.get_stats_async().get_general_stats() +# +# # fetch server info +# self.server_sys_info = self.stateless_client.get_system_info() +# +# self.server_version = self.stateless_client.get_server_version() +# +# # list of owned ports +# self.owned_ports_list = self.stateless_client.get_acquired_ports() +# +# # data per port +# self.owned_ports = {} +# +# for port_id in self.owned_ports_list: +# self.owned_ports[str(port_id)] = {} +# self.owned_ports[str(port_id)]['streams'] = {} +# +# stream_list = self.stateless_client.get_all_streams(port_id) +# +# self.owned_ports[str(port_id)] = stream_list +# +# +# try: +# curses.curs_set(0) +# except: +# pass +# +# curses.use_default_colors() +# self.stdscr.nodelay(1) +# curses.nonl() +# curses.noecho() +# +# self.generate_layout() +# +# +# def generate_layout (self): +# self.max_y = self.stdscr.getmaxyx()[0] +# self.max_x = self.stdscr.getmaxyx()[1] +# +# self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self) +# self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self) +# self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self) +# +# # those can be switched on the same place +# self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) +# +# self.ports_panels = {} +# for i in xrange(0, self.stateless_client.get_port_count()): +# self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) +# +# # at start time we point to the main one +# self.stats_panel = self.ports_stats_panel +# self.stats_panel.panel.top() +# +# panel.update_panels(); self.stdscr.refresh() +# return +# +# +# def wait_for_key_input (self): +# ch = self.stdscr.getch() +# +# # no key , continue +# if ch == curses.ERR: +# return True +# +# return self.cmds.handle(ch) +# +# # main run entry point +# def run (self): +# +# # list of owned ports +# self.owned_ports_list = self.stateless_client.get_acquired_ports() +# +# # data per port +# self.owned_ports = {} +# +# for port_id in self.owned_ports_list: +# self.owned_ports[str(port_id)] = {} +# self.owned_ports[str(port_id)]['streams'] = {} +# +# stream_list = self.stateless_client.get_all_streams(port_id) +# +# self.owned_ports[str(port_id)] = stream_list +# +# self.update_active = True +# while (True): +# +# rc = self.wait_for_key_input() +# if not rc: +# break +# +# self.server_info_panel.draw() +# self.general_info_panel.draw() +# self.control_panel.draw() +# +# # can be different kinds of panels +# self.stats_panel.panel.top() +# self.stats_panel.draw() +# +# panel.update_panels() +# self.stdscr.refresh() +# sleep(0.01) +# +# +## global container +#trex_status = None +# +#def show_trex_status_internal (stdscr, stateless_client): +# global trex_status +# +# if trex_status == None: +# trex_status = CTRexStatus(stdscr, stateless_client) +# +# trex_status.run() +# +#def show_trex_status (stateless_client): +# +# try: +# curses.wrapper(show_trex_status_internal, stateless_client) +# except KeyboardInterrupt: +# curses.endwin() +# +#def cleanup (): +# try: +# curses.endwin() +# except: +# pass +# diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index 7974758d..e9202ca6 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -1,42 +1,21 @@ +import os +import sys -# simple test that uses simple API with stateless TRex -#from stl_test_api import BasicTestAPI api_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/')) +sys.path.insert(0, os.path.join(api_path, 'automation/trex_control_plane/client/')) -from trex_stateless_client import CTRexStatelessClient, LoggerApi +from trex_stateless_client import CTRexStatelessClient, STLFailure c = CTRexStatelessClient() + try: c.connect() #before_ipackets = x.get_stats().get_rel('ipackets') - c.cmd_start_line("-f stl/imix_1pkt.yaml -m 5mpps -d 1") - c.cmd_wait_on_traffic() -finally: - c.disconnect() - -#x = BasicTestAPI() -# -#try: -# x.connect() -# -# before_ipackets = x.get_stats().get_rel('ipackets') -# -# print "input packets before test: %s" % before_ipackets -# -# x.inject_profile("stl/imix_1pkt.yaml", rate = "5mpps", duration = 1) -# x.wait_while_traffic_on() -# -# after_ipackets = x.get_stats().get_rel('ipackets') -# -# print "input packets after test: %s" % after_ipackets -# -# if (after_ipackets - before_ipackets) == 5000000: -# print "Test passed :-)\n" -# else: -# print "Test failed :-(\n" -# -#finally: -# x.disconnect() + c.start(profiles = 'stl/imix_3pkt.yaml', ports = [1]) + #c.cmd_wait_on_traffic() +except STLFailure as e: + print e +finally: + c.teardown() -- cgit 1.2.3-korg From c93acc26bf2517c872da716198e76bcf566b836a Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 19 Jan 2016 08:49:31 -0500 Subject: draft #2 --- .../client/trex_stateless_client.py | 1118 +++++++++++--------- .../client_utils/parsing_opts.py | 36 +- .../trex_control_plane/common/trex_types.py | 12 +- .../trex_control_plane/console/trex_console.py | 18 +- scripts/stl_test_example.py | 15 +- 5 files changed, 694 insertions(+), 505 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 43912e55..28e55088 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -25,13 +25,10 @@ from trex_port import Port from common.trex_types import * from trex_async_client import CTRexAsyncClient -############################ logger ############################# -############################ ############################# -############################ ############################# - -class STLFailure(Exception): - def __init__ (self, rc_or_str): - self.msg = str(rc_or_str) +# basic error for API +class STLError(Exception): + def __init__ (self, msg): + self.msg = str(msg) def __str__ (self): exc_type, exc_obj, exc_tb = sys.exc_info() @@ -39,11 +36,35 @@ class STLFailure(Exception): s = "\n******\n" - s += "Error reported at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') - s += "specific error:\n\n'{0}'\n".format(format_text(self.msg, 'bold')) + s += "Error at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') + s += "specific error:\n\n{0}\n".format(format_text(self.msg, 'bold')) return s + def brief (self): + return self.msg + + +# raised when the client state is invalid for operation +class STLStateError(STLError): + def __init__ (self, op, state): + self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state) + + +# raised when argument is not valid for operation +class STLArgumentError(STLError): + def __init__ (self, name, got, valid_values = None, extended = None): + self.msg = "Argument: '{0}' invalid value: '{1}'".format(name, got) + if valid_values: + self.msg += " - valid values are '{0}'".format(valid_values) + + if extended: + self.msg += "\n{0}".format(extended) + + +############################ logger ############################# +############################ ############################# +############################ ############################# # logger API for the client class LoggerApi(object): @@ -734,19 +755,19 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) else: - rc = self.cmd_stop(active_ports) + rc = self.__stop(active_ports) if not rc: return rc rc = self.__remove_all_streams(port_id_list) - self.logger.annotate(rc,"Removing all streams from port(s) {0}:".format(port_id_list)) + self.logger.annotate(rc, "Removing all streams from port(s) {0}:".format(port_id_list)) if rc.bad(): return rc rc = self.__add_stream_pack(stream_list, port_id_list) - self.logger.annotate(rc,"Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + self.logger.annotate(rc, "Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) if rc.bad(): return rc @@ -769,35 +790,85 @@ class CTRexStatelessClient(object): return rc + # stop cmd + def __stop (self, port_id_list): - def __verify_port_id_list (self, port_id_list): - # check arguments - if not isinstance(port_id_list, list): - return RC_ERR("ports should be an instance of 'list'") + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - # all ports are valid ports - if not all([port_id in self.get_all_ports() for port_id in port_id_list]): - return RC_ERR("Port IDs valid values are '{0}' but provided '{1}'".format(self.get_all_ports(), port_id_list)) + if not active_ports: + msg = "No active traffic on provided ports" + self.logger.log(format_text(msg, 'bold')) + return RC_WARN(msg) + + rc = self.__stop_traffic(active_ports) + self.logger.annotate(rc, "Stopping traffic on port(s) {0}:".format(port_id_list)) + if not rc: + return rc return RC_OK() + #update cmd + def __update (self, port_id_list, mult): + + # find the relevant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on provided ports" + self.logger.log(format_text(msg, 'bold')) + return RC_WARN(msg) + + rc = self.__update_traffic(mult, active_ports) + self.logger.annotate(rc, "Updating traffic on port(s) {0}:".format(port_id_list)) - def __verify_mult (self, mult, strict): - if not isinstance(mult, dict): - return RC_ERR("mult should be an instance of dict") + return rc - types = ["raw", "bps", "pps", "percentage"] - if not mult.get('type', None) in types: - return RC_ERR("mult should contain 'type' field of one of '{0}'".format(types)) - if strict: - ops = ["abs"] - else: - ops = ["abs", "add", "sub"] - if not mult.get('op', None) in ops: - return RC_ERR("mult should contain 'op' field of one of '{0}'".format(ops)) + # pause cmd + def __pause (self, port_id_list): + + # find the relevant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on provided ports" + self.logger.log(format_text(msg, 'bold')) + return RC_WARN(msg) + + rc = self.__pause_traffic(active_ports) + self.logger.annotate(rc, "Pausing traffic on port(s) {0}:".format(port_id_list)) + return rc + + + # resume cmd + def __resume (self, port_id_list): + + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on porvided ports" + self.logger.log(format_text(msg, 'bold')) + return RC_WARN(msg) + + rc = self.__resume_traffic(active_ports) + self.logger.annotate(rc, "Resume traffic on port(s) {0}:".format(port_id_list)) + return rc + + + # clear stats + def __clear_stats(self, port_id_list): + + for port_id in port_id_list: + self.ports[port_id].clear_stats() + + self.global_stats.clear_stats() + + rc = RC_OK() + self.logger.annotate(rc, "clearing stats on port(s) {0}:".format(port_id_list)) + return RC - return RC_OK() def __process_profiles (self, profiles, out): @@ -815,7 +886,8 @@ class CTRexStatelessClient(object): self.logger.annotate(rc) return rc - out += stream_list + out.append(stream_list) + else: return RC_ERR("unknown profile '{0}'".format(profile)) @@ -850,14 +922,23 @@ class CTRexStatelessClient(object): ############ functions used by other classes but not users ############## + def _verify_port_id_list (self, port_id_list): + # check arguments + if not isinstance(port_id_list, list): + return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list))) + + # all ports are valid ports + if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]): + return RC_ERR("") + + return RC_OK() + def _validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): - print type(port_id_list) return False # check each item of the sequence - return all([ (port_id >= 0) and (port_id < self.get_port_count()) - for port_id in port_id_list ]) + return port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]) # transmit request on the RPC link @@ -870,65 +951,12 @@ class CTRexStatelessClient(object): ############# helper functions section ############## - # measure time for functions - def timing(f): - def wrap(*args): - - time1 = time.time() - ret = f(*args) - - # don't want to print on error - if ret.bad(): - return ret - - delta = time.time() - time1 - - client = args[0] - client.logger.log(format_time(delta) + "\n") - - return ret - - return wrap - - - + ########## port commands ############## ######################### Console (high level) API ######################### - # stop cmd - def cmd_stop (self, port_id_list): - - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) - - rc = self.__stop_traffic(active_ports) - self.logger.annotate(rc,"Stopping traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - return RC_OK() - - # update cmd - def cmd_update (self, port_id_list, mult): - - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) - - rc = self.__update_traffic(mult, active_ports) - self.logger.annotate(rc,"Updating traffic on port(s) {0}:".format(port_id_list)) - - return rc + # clear stats def cmd_clear(self, port_id_list): @@ -949,40 +977,11 @@ class CTRexStatelessClient(object): return RC_OK() - # pause cmd - def cmd_pause (self, port_id_list): - - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) - - rc = self.__pause_traffic(active_ports) - self.logger.annotate(rc,"Pausing traffic on port(s) {0}:".format(port_id_list)) - return rc - - - - # resume cmd - def cmd_resume (self, port_id_list): - - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on porvided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) + - rc = self.__resume_traffic(active_ports) - self.logger.annotate(rc,"Resume traffic on port(s) {0}:".format(port_id_list)) - return rc - + # validate port(s) profile @@ -1011,150 +1010,564 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ - @timing - def cmd_start_line (self, line): - '''Start selected traffic in specified ports on TRex\n''' - # define a parser - parser = parsing_opts.gen_parser(self, - "start", - self.cmd_start_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.TOTAL, - parsing_opts.FORCE, - parsing_opts.STREAM_FROM_PATH_OR_FILE, - parsing_opts.DURATION, - parsing_opts.MULTIPLIER_STRICT, - parsing_opts.DRY_RUN) + ################################# + # ------ private methods ------ # + @staticmethod + def __get_mask_keys(ok_values={True}, **kwargs): + masked_keys = set() + for key, val in kwargs.iteritems(): + if val in ok_values: + masked_keys.add(key) + return masked_keys - opts = parser.parse_args(line.split()) + @staticmethod + def __filter_namespace_args(namespace, ok_values): + return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - if opts is None: - return RC_ERR("bad command line parameters") + # verify decorator - throws exception is client is disconnected + def __verify_connected(f): + def wrap(*args, **kwargs): + inst = args[0] + func_name = f.__name__ + if not inst.is_connected(): + raise STLStateError(func_name, 'disconnected') - if opts.dry: - self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) + ret = f(*args, **kwargs) + return ret - if opts.db: - stream_list = self.streams_db.get_stream_pack(opts.db) - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from DB):") - if rc.bad(): - return RC_ERR("Failed to load stream pack") + return wrap - else: - # load streams from file - stream_list = None - try: - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - except Exception as e: - s = str(e) - rc=RC_ERR(s) - self.logger.annotate(rc) - return rc - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from file):") - if stream_list == None: - return RC_ERR("Failed to load stream pack") + ############################ API ############################# + ############################ ############################# + ############################ ############################# - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + ############################ Getters ############################# + ############################ ############################# + ############################ ############################# - @timing - def cmd_resume_line (self, line): - '''Resume active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "resume", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") + # return verbose level of the logger + def get_verbose (self): + return self.logger.get_verbose() - return self.cmd_resume(opts.ports) + # is the client on read only mode ? + def is_read_only (self): + return self.read_only + # is the client connected ? + def is_connected (self): + return self.connected and self.comm_link.is_connected - @timing - def cmd_stop_line (self, line): - '''Stop active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "stop", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") + # get connection info + def get_connection_info (self): + return self.connection_info - return self.cmd_stop(opts.ports) + # get supported commands by the server + def get_server_supported_cmds(self): + return self.supported_cmds - @timing - def cmd_pause_line (self, line): - '''Pause active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "pause", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) + # get server version + def get_server_version(self): + return self.server_version - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") + # get server system info + def get_server_system_info(self): + return self.system_info - return self.cmd_pause(opts.ports) + # get port count + def get_port_count(self): + return len(self.ports) - @timing - def cmd_update_line (self, line): - '''Update port(s) speed currently active\n''' - parser = parsing_opts.gen_parser(self, - "update", - self.cmd_update_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, + # returns the port object + def get_port (self, port_id): + port_id = self.ports.get(port_id, None) + if (port_id != None): + return port_id + else: + raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) + + + # get all ports as IDs + def get_all_ports (self): + return self.ports.keys() + + # get all acquired ports + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + # get all active ports (TX or pause) + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + # get paused ports + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + # get all TX ports + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + + ############################ Commands ############################# + ############################ ############################# + ############################ ############################# + + + # set the log on verbose level + def set_verbose (self, level): + self.logger.set_verbose(level) + + + # connects to the server + # mode can be: + # 'RO' - read only + # 'RW' - read/write + # 'RWF' - read write forced (take ownership) + def connect (self, mode = "RW"): + modes = ['RO', 'RW', 'RWF'] + if not mode in modes: + raise STLArgumentError('mode', mode, modes) + + rc = self.__connect(mode) + self.logger.annotate(rc) + + if not rc: + raise STLError(rc) + + return rc + + + # disconnects from the server + def disconnect (self, annotate = True): + rc = self.__disconnect() + if annotate: + self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) + if not rc: + raise STLError(rc) + + return rc + + + # teardown - call after test is done + def teardown (self, stop_traffic = True): + + # stop traffic + if stop_traffic: + rc = self.stop() + if not rc: + raise STLError(rc) + + # disconnect + rc = self.__disconnect() + if not rc: + raise STLError(rc) + + + + return rc + + + # pings the server on the RPC channel + @__verify_connected + def ping(self): + rc = self.__ping() + self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.connection_info['sync_port'])) + + if not rc: + raise STLError(rc) + + return rc + + + # reset the server by performing + # force acquire, stop, and remove all streams + @__verify_connected + def reset(self): + + rc = self.__acquire(force = True) + self.logger.annotate(rc, "Force acquiring all ports:") + if not rc: + raise STLError(rc) + + + # force stop all ports + rc = self.__stop_traffic(self.get_all_ports(), True) + self.logger.annotate(rc,"Stop traffic on all ports:") + if not rc: + raise STLError(rc) + + + # remove all streams + rc = self.__remove_all_streams(self.get_all_ports()) + self.logger.annotate(rc,"Removing all streams from all ports:") + if not rc: + raise STLError(rc) + + # TODO: clear stats + return RC_OK() + + + # start cmd + @__verify_connected + def start (self, + profiles, + ports = None, + mult = "1", + force = False, + duration = -1, + dry = False, + total = False): + + + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = False, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # some type checkings + + if not type(force) is bool: + raise STLArgumentError('force', force) + + if not isinstance(duration, (int, float)): + raise STLArgumentError('duration', duration) + + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # process profiles + stream_list = [] + rc = self.__process_profiles(profiles, stream_list) + if not rc: + raise STLError(rc) + + + # dry run + if dry: + self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) + + # call private method to start + rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry) + if not rc: + raise STLError(rc) + + return rc + + + # stop traffic on ports + @__verify_connected + def stop (self, ports = None): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__stop(ports) + if not rc: + raise STLError(rc) + + return rc + + + # update traffic + @__verify_connected + def update (self, ports = None, mult = "1", total = False): + + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + + # verify multiplier + mult_obj = parsing_opts.decode_multiplier(mult, + allow_update = True, + divide_count = len(ports) if total else 1) + if not mult_obj: + raise STLArgumentError('mult', mult) + + # verify total + if not type(total) is bool: + raise STLArgumentError('total', total) + + + # call low level functions + rc = self.__update(ports, mult_obj) + if not rc: + raise STLError(rc) + + return rc + + + # pause traffic on ports + @__verify_connected + def pause (self, ports = None): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__pause(ports) + if not rc: + raise STLError(rc) + + return rc + + + # resume traffic on ports + @__verify_connected + def resume (self, ports = None): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__resume(ports) + if not rc: + raise STLError(rc) + + return rc + + + # clear stats + def clear_stats (self, ports = None): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + rc = self.__clear_stats(ports) + if not rc: + raise STLError(rc) + + + + ############################ Line ############################# + ############################ Commands ############################# + ############################ ############################# + # console decorator + def __console(f): + def wrap(*args): + client = args[0] + + time1 = time.time() + + try: + rc = f(*args) + except STLError as e: + client.logger.log(format_text("\n" + e.brief() + "\n", 'bold')) + return + + # don't want to print on error + if not rc or rc.warn(): + return rc + + delta = time.time() - time1 + + + client.logger.log(format_time(delta) + "\n") + + return rc + + return wrap + + + @__console + def connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return + + # call the API + mode = "RWF" if opts.force else "RW" + self.connect(mode) + + + @__console + def disconnect_line (self, line): + self.disconnect() + + + @__console + def reset_line (self, line): + self.reset() + + + @__console + def start_line (self, line): + '''Start selected traffic in specified ports on TRex\n''' + # define a parser + parser = parsing_opts.gen_parser(self, + "start", + self.start_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.TOTAL, + parsing_opts.FORCE, + parsing_opts.STREAM_FROM_PATH_OR_FILE, + parsing_opts.DURATION, + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) + + opts = parser.parse_args(line.split()) + + + if opts is None: + return + + # pack the profile + profiles = [opts.file[0]] + + self.start(profiles, + opts.ports, + opts.mult, + opts.force, + opts.duration, + opts.dry, + opts.total) + + + + @__console + def stop_line (self, line): + '''Stop active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "stop", + self.stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return + + self.stop(opts.ports) + + + @__console + def update_line (self, line): + '''Update port(s) speed currently active\n''' + parser = parsing_opts.gen_parser(self, + "update", + self.update_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER, parsing_opts.TOTAL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") + return + + self.update(opts.ports, opts.mult, opts.total) + + + @__console + def pause_line (self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "pause", + self.pause_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return - # total has no meaning with percentage - its linear - if opts.total and (opts.mult['type'] != 'percentage'): - # if total was set - divide it between the ports - opts.mult['value'] = opts.mult['value'] / len(opts.ports) + self.pause(opts.ports) - return self.cmd_update(opts.ports, opts.mult) - @timing - def cmd_reset_line (self, line): - return self.cmd_reset() + @__console + def resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "resume", + self.resume_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return + return self.resume(opts.ports) - def cmd_clear_line (self, line): + + @__console + def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", - self.cmd_clear_line.__doc__, + self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return + + self.clear_stats(opts.ports) - return self.cmd_clear(opts.ports) - def cmd_stats_line (self, line): + @__console + def show_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, @@ -1169,7 +1582,7 @@ class CTRexStatelessClient(object): return RC_ERR("bad command line parameters") # determine stats mask - mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) + mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS @@ -1214,7 +1627,7 @@ class CTRexStatelessClient(object): - @timing + @__console def cmd_validate_line (self, line): '''validates port(s) stream configuration\n''' @@ -1306,264 +1719,3 @@ class CTRexStatelessClient(object): return True - - ################################# - # ------ private methods ------ # - @staticmethod - def _get_mask_keys(ok_values={True}, **kwargs): - masked_keys = set() - for key, val in kwargs.iteritems(): - if val in ok_values: - masked_keys.add(key) - return masked_keys - - @staticmethod - def _filter_namespace_args(namespace, ok_values): - return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - - def __verify_connected(f): - #@wraps(f) - def wrap(*args): - inst = args[0] - func_name = f.__name__ - - if not inst.stateless_client.is_connected(): - return RC_ERR("cannot execute '{0}' while client is disconnected".format(func_name)) - - ret = f(*args) - return ret - - return wrap - - - - ############################ API ############################# - ############################ ############################# - ############################ ############################# - - - ############################ Getters ############################# - ############################ ############################# - ############################ ############################# - - - # return verbose level of the logger - def get_verbose (self): - return self.logger.get_verbose() - - # is the client on read only mode ? - def is_read_only (self): - return self.read_only - - # is the client connected ? - def is_connected (self): - return self.connected and self.comm_link.is_connected - - - # get connection info - def get_connection_info (self): - return self.connection_info - - - # get supported commands by the server - def get_server_supported_cmds(self): - return self.supported_cmds - - # get server version - def get_server_version(self): - return self.server_version - - # get server system info - def get_server_system_info(self): - return self.system_info - - # get port count - def get_port_count(self): - return len(self.ports) - - # returns the port object - def get_port (self, port_id): - return self.ports.get(port_id, RC_ERR("invalid port id")) - - # get all ports as IDs - def get_all_ports (self): - return self.ports.keys() - - # get all acquired ports - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] - - # get all active ports (TX or pause) - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] - - # get paused ports - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] - - # get all TX ports - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] - - - ############################ Commands ############################# - ############################ ############################# - ############################ ############################# - - - # set the log on verbose level - def set_verbose (self, level): - self.logger.set_verbose(level) - - - # connects to the server - # mode can be: - # 'RO' - read only - # 'RW' - read/write - # 'RWF' - read write forced (take ownership) - def connect (self, mode = "RW"): - modes = ['RO', 'RW', 'RWF'] - if not mode in modes: - return RC_ERR("invalid mode '{0}'".format(mode)) - - rc = self.__connect(mode) - self.logger.annotate(rc) - - if not rc: - raise STLFailure(rc) - - return rc - - - # disconnects from the server - def disconnect (self, annotate = True): - rc = self.__disconnect() - if annotate: - self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], - self.connection_info['sync_port'])) - if not rc: - raise STLFailure(rc) - - return rc - - - # teardown - call after test is done - def teardown (self): - # for now, its only disconnect - rc = self.__disconnect() - if not rc: - raise STLFailure(rc) - - return rc - - - # pings the server on the RPC channel - def ping(self): - rc = self.__ping() - self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], - self.connection_info['sync_port'])) - - if not rc: - raise STLFailure(rc) - - return rc - - - # reset the server by performing - # force acquire, stop, and remove all streams - def reset(self): - - rc = self.__acquire(force = True) - self.logger.annotate(rc, "Force acquiring all ports:") - if not rc: - raise STLFailure(rc) - - - # force stop all ports - rc = self.__stop_traffic(self.get_all_ports(), True) - self.logger.annotate(rc,"Stop traffic on all ports:") - if not rc: - raise STLFailure(rc) - - - # remove all streams - rc = self.__remove_all_streams(self.get_all_ports()) - self.logger.annotate(rc,"Removing all streams from all ports:") - if not rc: - raise STLFailure(rc) - - # TODO: clear stats - return RC_OK() - - # start cmd - def start (self, - profiles, - ports = None, - mult = "1", - force = False, - duration = -1, - dry = False): - - - # by default use all ports - if ports == None: - ports = self.get_all_ports() - - # verify valid port id list - rc = self.__verify_port_id_list(ports) - if not rc: - raise STLFailure(rc) - - - # verify multiplier - try: - result = parsing_opts.match_multiplier_common(mult) - except argparse.ArgumentTypeError: - raise STLFailure("bad format for multiplier: {0}".format(mult)) - - # process profiles - stream_list = [] - rc = self.__process_profiles(profiles, stream_list) - if not rc: - raise STLFailure(rc) - - - - - ############################ Line ############################# - ############################ Commands ############################# - ############################ ############################# - - def connect_line (self, line): - '''Connects to the TRex server''' - # define a parser - parser = parsing_opts.gen_parser(self, - "connect", - self.connect_line.__doc__, - parsing_opts.FORCE) - - opts = parser.parse_args(line.split()) - - if opts is None: - return RC_ERR("bad command line parameters") - - # call the API - if opts.force: - rc = self.connect(mode = "RWF") - else: - rc = self.connect(mode = "RW") - - - def disconnect_line (self, line): - return self.disconnect() - - def reset_line (self, line): - return self.reset() diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index c1afda26..3cc32b1d 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -69,10 +69,19 @@ match_multiplier_help = """Multiplier should be passed in the following format: will provide a percentage of the line rate. examples : '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """ -def match_multiplier_common(val, strict_abs = True): - # on strict absolute we do not allow +/- - if strict_abs: +# decodes multiplier +# if allow_update - no +/- is allowed +# divide states between how many entities the +# value should be divided +def decode_multiplier(val, allow_update = False, divide_count = 1): + + # must be string + if not isinstance(val, str): + return None + + # do we allow updates ? +/- + if not allow_update: match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) op = None else: @@ -136,19 +145,32 @@ def match_multiplier_common(val, strict_abs = True): else: result['op'] = "abs" + if result['op'] != 'percentage': + result['value'] = result['value'] / divide_count + return result else: - raise argparse.ArgumentTypeError(match_multiplier_help) + return None def match_multiplier(val): '''match some val against multiplier shortcut inputs ''' - return match_multiplier_common(val, strict_abs = False) + result = decode_multiplier(val, allow_update = False) + if not result: + raise argparse.ArgumentTypeError(match_multiplier_help) + + return val + def match_multiplier_strict(val): '''match some val against multiplier shortcut inputs ''' - return match_multiplier_common(val, strict_abs = True) + result = decode_multiplier(val, allow_update = True) + if not result: + raise argparse.ArgumentTypeError(match_multiplier_help) + + return val + def is_valid_file(filename): if not os.path.isfile(filename): @@ -287,7 +309,7 @@ class CCmdArgParser(argparse.ArgumentParser): opts.ports = self.stateless_client.get_all_ports() # so maybe we have ports configured - elif (getattr(opts, "ports", None) == []): + elif getattr(opts, "ports", None): for port in opts.ports: if not self.stateless_client._validate_port_list([port]): self.error("port id '{0}' is not a valid port id\n".format(port)) diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 337f0a70..aada5bfc 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -14,16 +14,17 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] # simple class to represent complex return value class RC(): - def __init__ (self, rc = None, data = None): + def __init__ (self, rc = None, data = None, is_warn = False): self.rc_list = [] if (rc != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) + tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn']) + self.rc_list.append(tuple_rc(rc, data, is_warn)) def __nonzero__ (self): return self.good() + def add (self, rc): self.rc_list += rc.rc_list @@ -33,6 +34,9 @@ class RC(): def bad (self): return not self.good() + def warn (self): + return any([x.is_warn for x in self.rc_list]) + def data (self): d = [x.data if x.rc else "" for x in self.rc_list] return (d if len(d) != 1 else d[0]) @@ -83,3 +87,5 @@ def RC_OK(data = ""): def RC_ERR (err): return RC(False, err) +def RC_WARN (warn): + return RC(True, warn, is_warn = True) diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 72cdcb0d..8f070959 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -382,7 +382,7 @@ class TRexConsole(TRexGeneralCmd): def do_start(self, line): '''Start selected traffic in specified port(s) on TRex\n''' - self.stateless_client.cmd_start_line(line) + self.stateless_client.start_line(line) @@ -395,7 +395,7 @@ class TRexConsole(TRexGeneralCmd): def do_stop(self, line): '''stops port(s) transmitting traffic\n''' - self.stateless_client.cmd_stop_line(line) + self.stateless_client.stop_line(line) def help_stop(self): self.do_stop("-h") @@ -405,7 +405,7 @@ class TRexConsole(TRexGeneralCmd): def do_update(self, line): '''update speed of port(s)currently transmitting traffic\n''' - self.stateless_client.cmd_update_line(line) + self.stateless_client.update_line(line) def help_update (self): self.do_update("-h") @@ -415,14 +415,14 @@ class TRexConsole(TRexGeneralCmd): def do_pause(self, line): '''pause port(s) transmitting traffic\n''' - self.stateless_client.cmd_pause_line(line) + self.stateless_client.pause_line(line) ############# resume @verify_connected_and_rw def do_resume(self, line): '''resume port(s) transmitting traffic\n''' - self.stateless_client.cmd_resume_line(line) + self.stateless_client.resume_line(line) @@ -444,7 +444,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' - self.stateless_client.cmd_stats_line(line) + self.stateless_client.stats_line(line) def help_stats(self): @@ -453,7 +453,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_streams(self, line): '''Fetch statistics from TRex server by port\n''' - self.stateless_client.cmd_streams_line(line) + self.stateless_client.show_streams_line(line) def help_streams(self): @@ -462,7 +462,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_clear(self, line): '''Clear cached local statistics\n''' - self.stateless_client.cmd_clear_line(line) + self.stateless_client.clear_stats_line(line) def help_clear(self): @@ -710,7 +710,7 @@ def main(): print "\n\n*** Caught Ctrl + C... Exiting...\n\n" finally: - stateless_client.teardown() + stateless_client.teardown(stop_traffic = False) if __name__ == '__main__': diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index e9202ca6..9a296bec 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -1,20 +1,29 @@ import os import sys +import time api_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(api_path, 'automation/trex_control_plane/client/')) -from trex_stateless_client import CTRexStatelessClient, STLFailure +from trex_stateless_client import CTRexStatelessClient, STLError c = CTRexStatelessClient() try: c.connect() + c.stop() #before_ipackets = x.get_stats().get_rel('ipackets') - c.start(profiles = 'stl/imix_3pkt.yaml', ports = [1]) + + c.start(profiles = 'stl/imix_3pkt.yaml', ports = [0,1], mult = "1gbps") + + for i in xrange(0, 10): + time.sleep(5) + c.update(ports = [0,1], mult = "1gbps+") + #c.cmd_wait_on_traffic() + #c.stop() -except STLFailure as e: +except STLError as e: print e finally: c.teardown() -- cgit 1.2.3-korg From b726b5682fca2b1e032380401457d1afb47e1713 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 20 Jan 2016 11:09:25 -0500 Subject: draft #3 --- .../trex_control_plane/client/trex_async_client.py | 52 +-- .../trex_control_plane/client/trex_port.py | 31 ++ .../client/trex_stateless_client.py | 409 ++++++++++++++------- .../client_utils/jsonrpc_client.py | 3 - .../trex_control_plane/common/trex_stats.py | 4 + .../trex_control_plane/common/trex_types.py | 6 +- .../trex_control_plane/console/trex_console.py | 50 ++- .../trex_control_plane/console/trex_tui.py | 6 +- scripts/stl_test_example.py | 19 +- 9 files changed, 391 insertions(+), 189 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 9828c838..ef4c48f9 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,7 +159,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 - self.async_barriers = [] + self.async_barrier = None self.connected = False @@ -171,10 +171,6 @@ class CTRexAsyncClient(): self.tr = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - - self.logger.log(msg) - # Socket to talk to server self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) @@ -190,8 +186,7 @@ class CTRexAsyncClient(): self.connected = True - # send a barrier and wait for ack - rc = self.block_on_stats() + rc = self.barrier() if not rc: self.disconnect() return rc @@ -216,14 +211,14 @@ class CTRexAsyncClient(): # done self.connected = False + # thread function def _run (self): - # socket must be created on the same thread - self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + self.socket.connect(self.tr) got_data = False @@ -299,32 +294,37 @@ class CTRexAsyncClient(): # async barrier handling routine def handle_async_barrier (self, type, data): - - for b in self.async_barriers: - if b['key'] == type: - b['ack'] = True + if self.async_barrier['key'] == type: + self.async_barrier['ack'] = True - # force update a new snapshot from the server - def block_on_stats(self, timeout = 5): - + # block on barrier for async channel + def barrier(self, timeout = 5): + # set a random key key = random.getrandbits(32) - barrier = {'key': key, 'ack' : False} - - # add to the queue - self.async_barriers.append(barrier) - - rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) - if not rc: - return rc + self.async_barrier = {'key': key, 'ack': False} + # expr time expr = time.time() + timeout - while not barrier['ack']: - time.sleep(0.001) + + while not self.async_barrier['ack']: + + # inject + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + # fast loop + for i in xrange(0, 100): + if self.async_barrier['ack']: + break + time.sleep(0.001) + if time.time() > expr: return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) return RC_OK() + diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 66d87f9d..7e5942d4 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -198,6 +198,9 @@ class Port(object): # remove stream from port def remove_stream (self, stream_id): + if not self.is_acquired(): + return self.err("port is not owned") + if not stream_id in self.streams: return self.err("stream {0} does not exists".format(stream_id)) @@ -219,6 +222,9 @@ class Port(object): # remove all the streams def remove_all_streams (self): + if not self.is_acquired(): + return self.err("port is not owned") + params = {"handler": self.handler, "port_id": self.port_id} @@ -244,6 +250,10 @@ class Port(object): # start traffic def start (self, mul, duration): + + if not self.is_acquired(): + return self.err("port is not owned") + if self.state == self.STATE_DOWN: return self.err("Unable to start traffic - port is down") @@ -270,6 +280,9 @@ class Port(object): # with force ignores the cached state and sends the command def stop (self, force = False): + if not self.is_acquired(): + return self.err("port is not owned") + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): return self.err("port is not transmitting") @@ -287,6 +300,9 @@ class Port(object): def pause (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -305,6 +321,9 @@ class Port(object): def resume (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_PAUSE) : return self.err("port is not in pause mode") @@ -322,6 +341,10 @@ class Port(object): def update (self, mul): + + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -338,6 +361,9 @@ class Port(object): def validate (self): + if not self.is_acquired(): + return self.err("port is not owned") + if (self.state == self.STATE_DOWN): return self.err("port is down") @@ -413,6 +439,11 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + + def get_stats (self): + return self.port_stats.get_stats() + + def invalidate_stats(self): return self.port_stats.invalidate() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 28e55088..69cc9838 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -51,6 +51,12 @@ class STLStateError(STLError): self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state) +# port state error +class STLPortStateError(STLError): + def __init__ (self, port, op, state): + self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state) + + # raised when argument is not valid for operation class STLArgumentError(STLError): def __init__ (self, name, got, valid_values = None, extended = None): @@ -62,6 +68,12 @@ class STLArgumentError(STLError): self.msg += "\n{0}".format(extended) +class STLTimeoutError(STLError): + def __init__ (self, timeout): + self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout) + + + ############################ logger ############################# ############################ ############################# ############################ ############################# @@ -109,9 +121,38 @@ class LoggerApi(object): def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): self.log(msg, level, newline) - # annotates an action with a RC - writes to log the result - def annotate (self, rc, desc = None, show_status = True): - rc.annotate(self.log, desc, show_status) + + def pre_cmd (self, desc): + self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) + self.flush() + + def post_cmd (self, rc): + if rc: + self.log(format_text("[SUCCESS]\n", 'green', 'bold')) + else: + self.log(format_text("[FAILED]\n", 'red', 'bold')) + + + def log_cmd (self, desc): + self.pre_cmd(desc) + self.post_cmd(True) + + + # supress object getter + def supress (self): + class Supress(object): + def __init__ (self, logger): + self.logger = logger + + def __enter__ (self): + self.saved_level = self.logger.get_verbose() + self.logger.set_verbose(LoggerApi.VERBOSE_QUIET) + + def __exit__ (self, type, value, traceback): + self.logger.set_verbose(self.saved_level) + + return Supress(self) + # default logger - to stdout @@ -288,7 +329,7 @@ class AsyncEventHandler(object): def __async_event_port_forced_acquired (self, port_id): self.client.ports[port_id].async_event_forced_acquired() - self.client.read_only = True + def __async_event_server_stopped (self): self.client.connected = False @@ -386,7 +427,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - verbose_level = LoggerApi.VERBOSE_REGULAR, + verbose_level = LoggerApi.VERBOSE_QUIET, logger = None, virtual = False): @@ -398,7 +439,6 @@ class CTRexStatelessClient(object): self.server_version = {} self.system_info = {} self.session_id = random.getrandbits(32) - self.read_only = False self.connected = False # logger @@ -634,8 +674,7 @@ class CTRexStatelessClient(object): # connect to server - # mode can be RW - read / write, RWF - read write with force , RO - read only - def __connect(self, mode = "RW"): + def __connect(self): # first disconnect if already connected if self.is_connected(): @@ -645,12 +684,18 @@ class CTRexStatelessClient(object): self.connected = False # connect sync channel + self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port'])) rc = self.comm_link.connect() + self.logger.post_cmd(rc) + if not rc: return rc # connect async channel + self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port'])) rc = self.async_client.connect() + self.logger.post_cmd(rc) + if not rc: return rc @@ -694,32 +739,6 @@ class CTRexStatelessClient(object): if not rc: return rc - # acquire all ports - if mode == "RW": - rc = self.__acquire(force = False) - - # fallback to read only if failed - if not rc: - self.logger.annotate(rc, show_status = False) - self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) - - self.__release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False - - elif mode == "RWF": - rc = self.__acquire(force = True) - if not rc: - return rc - self.read_only = False - - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True - - self.connected = True return RC_OK() @@ -760,27 +779,36 @@ class CTRexStatelessClient(object): return rc + self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list)) rc = self.__remove_all_streams(port_id_list) - self.logger.annotate(rc, "Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): + self.logger.post_cmd(rc) + + if not rc: return rc + self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) rc = self.__add_stream_pack(stream_list, port_id_list) - self.logger.annotate(rc, "Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): + self.logger.post_cmd(rc) + + if not rc: return rc # when not on dry - start the traffic , otherwise validate only if not dry: + + self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list)) rc = self.__start_traffic(mult, duration, port_id_list) - self.logger.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) - + self.logger.post_cmd(rc) + return rc else: + + self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list)) rc = self.__validate(port_id_list) - self.logger.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) - + self.logger.post_cmd(rc) + + if rc.bad(): return rc @@ -801,8 +829,11 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + + self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list)) rc = self.__stop_traffic(active_ports) - self.logger.annotate(rc, "Stopping traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + if not rc: return rc @@ -819,8 +850,9 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list)) rc = self.__update_traffic(mult, active_ports) - self.logger.annotate(rc, "Updating traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) return rc @@ -836,8 +868,10 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list)) rc = self.__pause_traffic(active_ports) - self.logger.annotate(rc, "Pausing traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc @@ -852,24 +886,51 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_WARN(msg) + self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) rc = self.__resume_traffic(active_ports) - self.logger.annotate(rc, "Resume traffic on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc # clear stats - def __clear_stats(self, port_id_list): + def __clear_stats(self, port_id_list, clear_global): for port_id in port_id_list: self.ports[port_id].clear_stats() - self.global_stats.clear_stats() + if clear_global: + self.global_stats.clear_stats() + self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list)) rc = RC_OK() - self.logger.annotate(rc, "clearing stats on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return RC + # get stats + def __get_stats (self, port_id_list): + stats = {} + + stats['global'] = self.global_stats.get_stats() + + total = {} + for port_id in port_id_list: + port_stats = self.ports[port_id].get_stats() + stats["port {0}".format(port_id)] = port_stats + + for k, v in port_stats.iteritems(): + if not k in total: + total[k] = v + else: + total[k] += v + + stats['total'] = total + + return stats + + def __process_profiles (self, profiles, out): for profile in (profiles if isinstance(profiles, list) else [profiles]): @@ -883,7 +944,6 @@ class CTRexStatelessClient(object): stream_list = self.streams_db.load_yaml_file(profile) except Exception as e: rc = RC_ERR(str(e)) - self.logger.annotate(rc) return rc out.append(stream_list) @@ -895,31 +955,6 @@ class CTRexStatelessClient(object): return RC_OK() - - # stream list - if opts.db: - stream_list = self.streams_db.get_stream_pack(opts.db) - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from DB):") - if rc.bad(): - return RC_ERR("Failed to load stream pack") - - else: - # load streams from file - stream_list = None - try: - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - except Exception as e: - s = str(e) - rc=RC_ERR(s) - self.logger.annotate(rc) - return rc - - rc = RC(stream_list != None) - self.logger.annotate(rc,"Load stream pack (from file):") - if stream_list == None: - return RC_ERR("Failed to load stream pack") - ############ functions used by other classes but not users ############## def _verify_port_id_list (self, port_id_list): @@ -938,7 +973,8 @@ class CTRexStatelessClient(object): return False # check each item of the sequence - return port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]) + return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])) + # transmit request on the RPC link @@ -986,8 +1022,10 @@ class CTRexStatelessClient(object): # validate port(s) profile def cmd_validate (self, port_id_list): + self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) rc = self.__validate(port_id_list) - self.logger.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) + self.logger.post_cmd(rc) + return rc @@ -1025,17 +1063,22 @@ class CTRexStatelessClient(object): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} - # verify decorator - throws exception is client is disconnected - def __verify_connected(f): - def wrap(*args, **kwargs): - inst = args[0] - func_name = f.__name__ + # API decorator - double wrap because of argument + def __api_check(connected = True): + + def wrap (f): + def wrap2(*args, **kwargs): + client = args[0] - if not inst.is_connected(): - raise STLStateError(func_name, 'disconnected') + func_name = f.__name__ - ret = f(*args, **kwargs) - return ret + # check connection + if connected and not client.is_connected(): + raise STLStateError(func_name, 'disconnected') + + ret = f(*args, **kwargs) + return ret + return wrap2 return wrap @@ -1044,7 +1087,14 @@ class CTRexStatelessClient(object): ############################ API ############################# ############################ ############################# ############################ ############################# + def __enter__ (self): + self.connect(mode = "RWF") + return self + def __exit__ (self, type, value, traceback): + if self.get_active_ports(): + self.stop(self.get_active_ports()) + self.disconnect() ############################ Getters ############################# ############################ ############################# @@ -1056,8 +1106,8 @@ class CTRexStatelessClient(object): return self.logger.get_verbose() # is the client on read only mode ? - def is_read_only (self): - return self.read_only + def is_all_ports_acquired (self): + return not (self.get_all_ports() == self.get_acquired_ports()) # is the client connected ? def is_connected (self): @@ -1088,9 +1138,9 @@ class CTRexStatelessClient(object): # returns the port object def get_port (self, port_id): - port_id = self.ports.get(port_id, None) - if (port_id != None): - return port_id + port = self.ports.get(port_id, None) + if (port != None): + return port else: raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports()) @@ -1139,26 +1189,66 @@ class CTRexStatelessClient(object): # 'RO' - read only # 'RW' - read/write # 'RWF' - read write forced (take ownership) + @__api_check(False) def connect (self, mode = "RW"): modes = ['RO', 'RW', 'RWF'] if not mode in modes: raise STLArgumentError('mode', mode, modes) - rc = self.__connect(mode) - self.logger.annotate(rc) + rc = self.__connect() + if not rc: + raise STLError(rc) + + # acquire all ports for 'RW' or 'RWF' + if (mode == "RW") or (mode == "RWF"): + self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False) + + + + + # acquire ports + # this is not needed if connect was called with "RW" or "RWF" + # but for "RO" this might be needed + @__api_check(True) + def acquire (self, ports = None, force = False): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify ports + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # verify valid port id list + if force: + self.logger.pre_cmd("Force acquiring ports {0}:".format(ports)) + else: + self.logger.pre_cmd("Acquiring ports {0}:".format(ports)) + + rc = self.__acquire(ports, force) + + self.logger.post_cmd(rc) if not rc: + self.__release(ports) raise STLError(rc) - return rc + + + # force connect syntatic sugar + @__api_check(False) + def fconnect (self): + self.connect(mode = "RWF") # disconnects from the server - def disconnect (self, annotate = True): + @__api_check(False) + def disconnect (self, log = True): rc = self.__disconnect() - if annotate: - self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], - self.connection_info['sync_port'])) + if log: + self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) if not rc: raise STLError(rc) @@ -1166,29 +1256,27 @@ class CTRexStatelessClient(object): # teardown - call after test is done + # NEVER throws an exception + @__api_check(False) def teardown (self, stop_traffic = True): - # stop traffic - if stop_traffic: - rc = self.stop() - if not rc: - raise STLError(rc) + # try to stop traffic + if stop_traffic and self.get_active_ports(): + try: + self.stop() + except STLError: + pass # disconnect - rc = self.__disconnect() - if not rc: - raise STLError(rc) + self.__disconnect() - - - return rc # pings the server on the RPC channel - @__verify_connected + @__api_check(True) def ping(self): rc = self.__ping() - self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], self.connection_info['sync_port'])) if not rc: @@ -1199,25 +1287,31 @@ class CTRexStatelessClient(object): # reset the server by performing # force acquire, stop, and remove all streams - @__verify_connected + @__api_check(True) def reset(self): + self.logger.pre_cmd("Force acquiring all ports:") rc = self.__acquire(force = True) - self.logger.annotate(rc, "Force acquiring all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) # force stop all ports + self.logger.pre_cmd("Stop traffic on all ports:") rc = self.__stop_traffic(self.get_all_ports(), True) - self.logger.annotate(rc,"Stop traffic on all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) # remove all streams + self.logger.pre_cmd("Removing all streams from all ports:") rc = self.__remove_all_streams(self.get_all_ports()) - self.logger.annotate(rc,"Removing all streams from all ports:") + self.logger.post_cmd(rc) + if not rc: raise STLError(rc) @@ -1226,7 +1320,7 @@ class CTRexStatelessClient(object): # start cmd - @__verify_connected + @__api_check(True) def start (self, profiles, ports = None, @@ -1246,7 +1340,6 @@ class CTRexStatelessClient(object): if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = False, @@ -1272,21 +1365,20 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - # dry run if dry: self.logger.log(format_text("\n*** DRY RUN ***", 'bold')) # call private method to start + rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry) if not rc: raise STLError(rc) - return rc # stop traffic on ports - @__verify_connected + @__api_check(True) def stop (self, ports = None): # by default use all ports if ports == None: @@ -1301,11 +1393,10 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # update traffic - @__verify_connected + @__api_check(True) def update (self, ports = None, mult = "1", total = False): # by default use all ports @@ -1339,7 +1430,7 @@ class CTRexStatelessClient(object): # pause traffic on ports - @__verify_connected + @__api_check(True) def pause (self, ports = None): # by default use all ports if ports == None: @@ -1358,7 +1449,7 @@ class CTRexStatelessClient(object): # resume traffic on ports - @__verify_connected + @__api_check(True) def resume (self, ports = None): # by default use all ports if ports == None: @@ -1377,7 +1468,8 @@ class CTRexStatelessClient(object): # clear stats - def clear_stats (self, ports = None): + @__api_check(False) + def clear_stats (self, ports = None, clear_global = True): # by default use all ports if ports == None: ports = self.get_all_ports() @@ -1387,11 +1479,64 @@ class CTRexStatelessClient(object): if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - rc = self.__clear_stats(ports) + # verify clear global + if not type(clear_global) is bool: + raise STLArgumentError('clear_global', clear_global) + + + rc = self.__clear_stats(ports, clear_global) if not rc: raise STLError(rc) + # get stats + @__api_check(False) + def get_stats (self, ports = None, async_barrier = True): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # check async barrier + if not type(async_barrier) is bool: + raise STLArgumentError('async_barrier', async_barrier) + + + # if the user requested a barrier - use it + if async_barrier: + rc = self.async_client.barrier() + if not rc: + raise STLError(rc) + + return self.__get_stats(ports) + + + # wait while traffic is on, on timeout throw STLTimeoutError + @__api_check(True) + def wait_on_traffic (self, ports = None, timeout = 60): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + expr = time.time() + timeout + + # wait while any of the required ports are active + while set(self.get_active_ports()).intersection(ports): + time.sleep(0.01) + if time.time() > expr: + raise STLTimeoutError(timeout) + + + ############################ Line ############################# ############################ Commands ############################# @@ -1406,7 +1551,7 @@ class CTRexStatelessClient(object): try: rc = f(*args) except STLError as e: - client.logger.log(format_text("\n" + e.brief() + "\n", 'bold')) + client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return # don't want to print on error @@ -1567,19 +1712,19 @@ class CTRexStatelessClient(object): @__console - def show_stats_line (self, line): + def print_formatted_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "stats", - self.cmd_stats_line.__doc__, + self.show_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.STATS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return None # determine stats mask mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) @@ -1587,7 +1732,9 @@ class CTRexStatelessClient(object): # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - stats = self.cmd_stats(opts.ports, mask) + + self.print_formatted_stats() + stats = self.get_stats(opts.ports, mask) # print stats to screen for stat_type, stat_data in stats.iteritems(): diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index e08f5d69..05a32bc4 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -204,9 +204,6 @@ class JsonRpcClient(object): # Socket to talk to server self.transport = "tcp://{0}:{1}".format(self.server, self.port) - msg = "\nConnecting To RPC Server On {0}".format(self.transport) - self.logger.log(msg) - self.socket = self.context.socket(zmq.REQ) try: self.socket.connect(self.transport) diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index a6add4ac..3d6ece7c 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -314,6 +314,10 @@ class CTRexStats(object): self.last_update_ts = time.time() + def get_stats (self): + # copy and return + return dict(self.latest_stats) + def clear_stats(self): self.reference_stats = self.latest_stats diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index aada5bfc..a7ddacea 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -46,7 +46,11 @@ class RC(): return (e if len(e) != 1 else e[0]) def __str__ (self): - return str(self.data()) if self else str(self.err()) + s = "" + for x in self.rc_list: + if x.data: + s += format_text("\n{0}".format(x.data), 'bold') + return s def prn_func (self, msg, newline = True): if newline: diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 8f070959..3ecbca06 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient, LoggerApi +from client.trex_stateless_client import CTRexStatelessClient, LoggerApi, STLError from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -209,7 +209,7 @@ class TRexConsole(TRexGeneralCmd): print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') return - if inst.stateless_client.is_read_only(): + if inst.stateless_client.is_all_ports_acquired(): print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold') return @@ -253,7 +253,7 @@ class TRexConsole(TRexGeneralCmd): self.supported_rpc = None return stop - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): self.prompt = "TRex (read only) > " return stop @@ -670,25 +670,41 @@ def main(): # Stateless client connection logger = ConsoleLogger() - stateless_client = CTRexStatelessClient(options.user, - options.server, - options.port, - options.pub, - verbose_level, - logger) + stateless_client = CTRexStatelessClient(username = options.user, + server = options.server, + sync_port = options.port, + async_port = options.pub, + verbose_level = verbose_level, + logger = logger) # TUI or no acquire will give us READ ONLY mode - if options.tui or not options.acquire: - rc = stateless_client.connect("RO") - else: - rc = stateless_client.connect("RW") - - # unable to connect - bye - if not rc: - rc.annotate() + try: + stateless_client.connect("RO") + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return + if not options.tui and options.acquire: + try: + stateless_client.acquire() + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + logger.log(format_text("\nSwitching to read only mode - only few commands will be available", 'bold')) + + # if options.tui or not options.acquire: + # rc = stateless_client.connect("RO") + # else: + # try: + # rc = stateless_client.connect("RW") + # except STLError as e: + # logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + # + # with logger.supress(): + # rc = stateless_client.connect("RO") + + + # a script mode if options.batch: cont = stateless_client.run_script_file(options.batch[0]) diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py index dbbac02b..9e66a984 100644 --- a/scripts/automation/trex_control_plane/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -71,8 +71,7 @@ class TrexTUIDashBoard(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if len(self.stateless_client.get_transmitting_ports()) > 0: @@ -179,8 +178,7 @@ class TrexTUIPort(TrexTUIPanel): allowed['c'] = self.key_actions['c'] - # thats it for read only - if self.stateless_client.is_read_only(): + if self.stateless_client.is_all_ports_acquired(): return allowed if self.port.state == self.port.STATE_TX: diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index 9a296bec..5b36a9f6 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -10,15 +10,19 @@ from trex_stateless_client import CTRexStatelessClient, STLError c = CTRexStatelessClient() try: - c.connect() - c.stop() + for i in xrange(0, 100): + c.connect("RO") + c.disconnect() + + # + #c.stop() #before_ipackets = x.get_stats().get_rel('ipackets') - c.start(profiles = 'stl/imix_3pkt.yaml', ports = [0,1], mult = "1gbps") + #c.start(profiles = 'stl/imix_3pkt.yaml', ports = [0,1], mult = "1gbps") - for i in xrange(0, 10): - time.sleep(5) - c.update(ports = [0,1], mult = "1gbps+") + #for i in xrange(0, 10): + # time.sleep(5) + # c.update(ports = [0,1], mult = "1gbps+") #c.cmd_wait_on_traffic() #c.stop() @@ -26,5 +30,6 @@ try: except STLError as e: print e finally: - c.teardown() + pass + #c.teardown() -- cgit 1.2.3-korg From cc75f3f75e026bc17e526577c463ab5b524ebf22 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 21 Jan 2016 10:05:18 -0500 Subject: phase 4 --- .../trex_control_plane/client/trex_port.py | 10 +- .../client/trex_stateless_client.py | 404 +++++++++------------ .../client_utils/parsing_opts.py | 5 +- .../trex_control_plane/common/trex_stats.py | 45 ++- .../trex_control_plane/console/trex_console.py | 76 ++-- .../trex_control_plane/console/trex_tui.py | 127 +++---- 6 files changed, 325 insertions(+), 342 deletions(-) (limited to 'scripts/automation/trex_control_plane/console') diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 7e5942d4..94240f2a 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -56,7 +56,7 @@ class Port(object): def err(self, msg): return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - def ok(self, data = "ACK"): + def ok(self, data = ""): return RC_OK(data) def get_speed_bps (self): @@ -283,8 +283,12 @@ class Port(object): if not self.is_acquired(): return self.err("port is not owned") - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") + # port is already stopped + if not force: + if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS): + return self.ok() + + params = {"handler": self.handler, "port_id": self.port_id} diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 69cc9838..c5d7e053 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -67,7 +67,7 @@ class STLArgumentError(STLError): if extended: self.msg += "\n{0}".format(extended) - +# raised when timeout occurs class STLTimeoutError(STLError): def __init__ (self, timeout): self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout) @@ -660,7 +660,7 @@ class CTRexStatelessClient(object): return rc - def __validate (self, port_id_list = None): + def __validate_traffic (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() @@ -821,17 +821,8 @@ class CTRexStatelessClient(object): # stop cmd def __stop (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_WARN(msg) - - self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list)) - rc = self.__stop_traffic(active_ports) + rc = self.__stop_traffic(port_id_list) self.logger.post_cmd(rc) if not rc: @@ -842,16 +833,8 @@ class CTRexStatelessClient(object): #update cmd def __update (self, port_id_list, mult): - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_WARN(msg) - self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list)) - rc = self.__update_traffic(mult, active_ports) + rc = self.__update_traffic(mult, port_id_list) self.logger.post_cmd(rc) return rc @@ -860,16 +843,8 @@ class CTRexStatelessClient(object): # pause cmd def __pause (self, port_id_list): - # find the relevant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if not active_ports: - msg = "No active traffic on provided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_WARN(msg) - self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list)) - rc = self.__pause_traffic(active_ports) + rc = self.__pause_traffic(port_id_list) self.logger.post_cmd(rc) return rc @@ -878,16 +853,17 @@ class CTRexStatelessClient(object): # resume cmd def __resume (self, port_id_list): - # find the relveant ports - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) + rc = self.__resume_traffic(port_id_list) + self.logger.post_cmd(rc) - if not active_ports: - msg = "No active traffic on porvided ports" - self.logger.log(format_text(msg, 'bold')) - return RC_WARN(msg) + return rc - self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list)) - rc = self.__resume_traffic(active_ports) + + # validate port(s) profile + def __validate (self, port_id_list): + self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) + rc = self.__validate_traffic(port_id_list) self.logger.post_cmd(rc) return rc @@ -985,52 +961,8 @@ class CTRexStatelessClient(object): def _transmit_batch(self, batch_list): return self.comm_link.transmit_batch(batch_list) - ############# helper functions section ############## - - - ########## port commands ############## - - ######################### Console (high level) API ######################### - - - - # clear stats - def cmd_clear(self, port_id_list): - - for port_id in port_id_list: - self.ports[port_id].clear_stats() - - self.global_stats.clear_stats() - - return RC_OK() - - - def cmd_invalidate (self, port_id_list): - for port_id in port_id_list: - self.ports[port_id].invalidate_stats() - - self.global_stats.invalidate() - - return RC_OK() - - - - - - - - - # validate port(s) profile - def cmd_validate (self, port_id_list): - self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list)) - rc = self.__validate(port_id_list) - self.logger.post_cmd(rc) - - return rc - - # stats - def cmd_stats(self, port_id_list, stats_mask=set()): + def _get_formatted_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) stats_obj = {} @@ -1038,15 +970,24 @@ class CTRexStatelessClient(object): stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) return stats_obj - def cmd_streams(self, port_id_list, streams_mask=set()): + def _get_streams(self, port_id_list, streams_mask=set()): streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask) return streams_obj - ############## High Level API With Parser ################ + def _invalidate_stats (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + + return RC_OK() + + + ################################# # ------ private methods ------ # @@ -1089,6 +1030,7 @@ class CTRexStatelessClient(object): ############################ ############################# def __enter__ (self): self.connect(mode = "RWF") + self.reset() return self def __exit__ (self, type, value, traceback): @@ -1174,6 +1116,34 @@ class CTRexStatelessClient(object): if port_obj.is_transmitting()] + # get stats + def get_stats (self, ports = None, async_barrier = True): + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self._validate_port_list(ports) + if not rc: + raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) + + # check async barrier + if not type(async_barrier) is bool: + raise STLArgumentError('async_barrier', async_barrier) + + + # if the user requested a barrier - use it + if async_barrier: + rc = self.async_client.barrier() + if not rc: + raise STLError(rc) + + return self.__get_stats(ports) + + # return all async events + def get_events (self): + return self.event_handler.get_events() + ############################ Commands ############################# ############################ ############################# ############################ ############################# @@ -1252,7 +1222,6 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # teardown - call after test is done @@ -1282,7 +1251,6 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # reset the server by performing @@ -1315,8 +1283,7 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - # TODO: clear stats - return RC_OK() + self.clear_stats() # start cmd @@ -1333,7 +1300,7 @@ class CTRexStatelessClient(object): # by default use all ports if ports == None: - ports = self.get_all_ports() + ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) @@ -1380,9 +1347,10 @@ class CTRexStatelessClient(object): # stop traffic on ports @__api_check(True) def stop (self, ports = None): - # by default use all ports + + # by default the user means all the active ports if ports == None: - ports = self.get_all_ports() + ports = self.get_active_ports() # verify valid port id list rc = self._validate_port_list(ports) @@ -1399,16 +1367,15 @@ class CTRexStatelessClient(object): @__api_check(True) def update (self, ports = None, mult = "1", total = False): - # by default use all ports + # by default the user means all the active ports if ports == None: - ports = self.get_all_ports() + ports = self.get_active_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - # verify multiplier mult_obj = parsing_opts.decode_multiplier(mult, allow_update = True, @@ -1426,15 +1393,15 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # pause traffic on ports @__api_check(True) def pause (self, ports = None): - # by default use all ports + + # by default the user means all the TX ports if ports == None: - ports = self.get_all_ports() + ports = self.get_transmitting_ports() # verify valid port id list rc = self._validate_port_list(ports) @@ -1445,15 +1412,15 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc # resume traffic on ports @__api_check(True) def resume (self, ports = None): - # by default use all ports + + # by default the user means all the paused ports if ports == None: - ports = self.get_all_ports() + ports = self.get_paused_ports() # verify valid port id list rc = self._validate_port_list(ports) @@ -1464,63 +1431,55 @@ class CTRexStatelessClient(object): if not rc: raise STLError(rc) - return rc - - # clear stats - @__api_check(False) - def clear_stats (self, ports = None, clear_global = True): - # by default use all ports + @__api_check(True) + def validate (self, ports = None): if ports == None: - ports = self.get_all_ports() + ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - # verify clear global - if not type(clear_global) is bool: - raise STLArgumentError('clear_global', clear_global) - - - rc = self.__clear_stats(ports, clear_global) + rc = self.__validate(ports) if not rc: raise STLError(rc) - # get stats + # clear stats @__api_check(False) - def get_stats (self, ports = None, async_barrier = True): + def clear_stats (self, ports = None, clear_global = True): + # by default use all ports if ports == None: - ports = self.get_all_ports() + ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) if not rc: raise STLArgumentError('ports', ports, valid_values = self.get_all_ports()) - # check async barrier - if not type(async_barrier) is bool: - raise STLArgumentError('async_barrier', async_barrier) + # verify clear global + if not type(clear_global) is bool: + raise STLArgumentError('clear_global', clear_global) - # if the user requested a barrier - use it - if async_barrier: - rc = self.async_client.barrier() - if not rc: - raise STLError(rc) + rc = self.__clear_stats(ports, clear_global) + if not rc: + raise STLError(rc) - return self.__get_stats(ports) + + # wait while traffic is on, on timeout throw STLTimeoutError @__api_check(True) def wait_on_traffic (self, ports = None, timeout = 60): - # by default use all ports + + # by default use all acquired ports if ports == None: - ports = self.get_all_ports() + ports = self.get_acquired_ports() # verify valid port id list rc = self._validate_port_list(ports) @@ -1536,7 +1495,9 @@ class CTRexStatelessClient(object): raise STLTimeoutError(timeout) - + # clear all async events + def clear_events (self): + self.event_handler.clear_events() ############################ Line ############################# ############################ Commands ############################# @@ -1554,16 +1515,11 @@ class CTRexStatelessClient(object): client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) return - # don't want to print on error - if not rc or rc.warn(): - return rc + # if got true - print time + if rc: + delta = time.time() - time1 + client.logger.log(format_time(delta) + "\n") - delta = time.time() - time1 - - - client.logger.log(format_time(delta) + "\n") - - return rc return wrap @@ -1583,19 +1539,24 @@ class CTRexStatelessClient(object): return # call the API - mode = "RWF" if opts.force else "RW" - self.connect(mode) + self.connect("RWF" if opts.force else "RW") + # true means print time + return True @__console def disconnect_line (self, line): self.disconnect() + @__console def reset_line (self, line): self.reset() + # true means print time + return True + @__console def start_line (self, line): @@ -1629,6 +1590,9 @@ class CTRexStatelessClient(object): opts.dry, opts.total) + # true means print time + return True + @__console @@ -1643,7 +1607,17 @@ class CTRexStatelessClient(object): if opts is None: return - self.stop(opts.ports) + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No active traffic on provided ports\n", 'bold')) + return + + self.stop(ports) + + # true means print time + return True @__console @@ -1660,7 +1634,17 @@ class CTRexStatelessClient(object): if opts is None: return - self.update(opts.ports, opts.mult, opts.total) + # find the relevant ports + ports = list(set(self.get_active_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No ports in valid state to update\n", 'bold')) + return + + self.update(ports, opts.mult, opts.total) + + # true means print time + return True @__console @@ -1675,7 +1659,17 @@ class CTRexStatelessClient(object): if opts is None: return - self.pause(opts.ports) + # find the relevant ports + ports = list(set(self.get_transmitting_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No ports in valid state to pause\n", 'bold')) + return + + self.pause(ports) + + # true means print time + return True @__console @@ -1690,7 +1684,17 @@ class CTRexStatelessClient(object): if opts is None: return - return self.resume(opts.ports) + # find the relevant ports + ports = list(set(self.get_paused_ports()).intersection(opts.ports)) + + if not ports: + self.logger.log(format_text("No ports in valid state to resume\n", 'bold')) + return + + return self.resume(ports) + + # true means print time + return True @__console @@ -1711,8 +1715,9 @@ class CTRexStatelessClient(object): + @__console - def print_formatted_stats_line (self, line): + def show_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, @@ -1724,7 +1729,7 @@ class CTRexStatelessClient(object): opts = parser.parse_args(line.split()) if opts is None: - return None + return # determine stats mask mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) @@ -1732,137 +1737,60 @@ class CTRexStatelessClient(object): # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - - self.print_formatted_stats() - stats = self.get_stats(opts.ports, mask) + stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask) + + stats = self._get_formatted_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return RC_OK() - def cmd_streams_line(self, line): + @__console + def show_streams_line(self, line): '''Fetch streams statistics from TRex server by port\n''' # define a parser parser = parsing_opts.gen_parser(self, "streams", - self.cmd_streams_line.__doc__, + self.show_streams_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.STREAMS_MASK)#, - #parsing_opts.FULL_OUTPUT) + parsing_opts.STREAMS_MASK) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line parameters") + return - streams = self.cmd_streams(opts.ports, set(opts.streams)) + streams = self._get_streams(opts.ports, set(opts.streams)) if not streams: - # we got no streams running - self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta")) - return RC_ERR("No streams found with desired filter.") + else: # print stats to screen for stream_hdr, port_streams_data in streams.iteritems(): text_tables.print_table_with_header(port_streams_data.text_table, header= stream_hdr.split(":")[0] + ":", untouched_header= stream_hdr.split(":")[1]) - return RC_OK() @__console - def cmd_validate_line (self, line): + def validate_line (self, line): '''validates port(s) stream configuration\n''' parser = parsing_opts.gen_parser(self, "validate", - self.cmd_validate_line.__doc__, + self.validate_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) if opts is None: - return RC_ERR("bad command line paramters") - - rc = self.cmd_validate(opts.ports) - return rc - - - def cmd_exit_line (self, line): - self.logger.log(format_text("Exiting\n", 'bold')) - # a way to exit - return RC_ERR("exit") - - - def cmd_wait_line (self, line): - '''wait for a period of time\n''' - - parser = parsing_opts.gen_parser(self, - "wait", - self.cmd_wait_line.__doc__, - parsing_opts.DURATION) - - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line parameters") - - delay_sec = opts.duration if (opts.duration > 0) else 1 - - self.logger.log(format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')) - time.sleep(delay_sec) - - return RC_OK() - - # run a script of commands - def run_script_file (self, filename): - - self.logger.log(format_text("\nRunning script file '{0}'...".format(filename), 'bold')) - - rc = self.cmd_connect() - if rc.bad(): return - with open(filename) as f: - script_lines = f.readlines() - - cmd_table = {} - - # register all the commands - cmd_table['start'] = self.cmd_start_line - cmd_table['stop'] = self.cmd_stop_line - cmd_table['reset'] = self.cmd_reset_line - cmd_table['wait'] = self.cmd_wait_line - cmd_table['exit'] = self.cmd_exit_line - - for index, line in enumerate(script_lines, start = 1): - line = line.strip() - if line == "": - continue - if line.startswith("#"): - continue - - sp = line.split(' ', 1) - cmd = sp[0] - if len(sp) == 2: - args = sp[1] - else: - args = "" - - self.logger.log(format_text("Executing line {0} : '{1}'\n".format(index, line))) - - if not cmd in cmd_table: - print "\n*** Error at line {0} : '{1}'\n".format(index, line) - self.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold')) - return False + self.validate(opts.ports) - rc = cmd_table[cmd](args) - if rc.bad(): - return False - - self.logger.log(format_text("\n[Done]", 'bold')) - return True + \ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index 3cc32b1d..ba60c191 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -156,7 +156,7 @@ def decode_multiplier(val, allow_update = False, divide_count = 1): def match_multiplier(val): '''match some val against multiplier shortcut inputs ''' - result = decode_multiplier(val, allow_update = False) + result = decode_multiplier(val, allow_update = True) if not result: raise argparse.ArgumentTypeError(match_multiplier_help) @@ -165,7 +165,7 @@ def match_multiplier(val): def match_multiplier_strict(val): '''match some val against multiplier shortcut inputs ''' - result = decode_multiplier(val, allow_update = True) + result = decode_multiplier(val, allow_update = False) if not result: raise argparse.ArgumentTypeError(match_multiplier_help) @@ -252,6 +252,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'default': False, 'help': "Starts TUI in xterm window"}), + FULL_OUTPUT: ArgumentPack(['--full'], {'action': 'store_true', 'help': "Prompt full info in a JSON format"}), diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index 3d6ece7c..9c2cd7f1 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -260,7 +260,7 @@ class CTRexStats(object): def __init__(self): self.reference_stats = None - self.latest_stats = {} + self.latest_stats = None self.last_update_ts = time.time() self.history = deque(maxlen = 10) @@ -314,13 +314,11 @@ class CTRexStats(object): self.last_update_ts = time.time() - def get_stats (self): - # copy and return - return dict(self.latest_stats) def clear_stats(self): self.reference_stats = self.latest_stats + def invalidate (self): self.latest_stats = {} @@ -337,6 +335,10 @@ class CTRexStats(object): return "N/A" if not format: + if not field in self.reference_stats: + print "REF: " + str(self.reference_stats) + print "BASE: " + str(self.latest_stats) + return (self.latest_stats[field] - self.reference_stats[field]) else: return format_num(self.latest_stats[field] - self.reference_stats[field], suffix) @@ -403,6 +405,24 @@ class CGlobalStats(CTRexStats): self.server_version = server_version self._ports_dict = ports_dict_ref + def get_stats (self): + stats = {} + + # absolute + stats['cpu_util'] = self.get("m_cpu_util") + stats['tx_bps'] = self.get("m_tx_bps") + stats['tx_pps'] = self.get("m_tx_pps") + + stats['rx_bps'] = self.get("m_rx_bps") + stats['rx_pps'] = self.get("m_rx_pps") + stats['rx_drop_bps'] = self.get("m_rx_drop_bps") + + # relatives + stats['queue_full'] = self.get_rel("m_total_queue_full") + + return stats + + def generate_stats(self): return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"), port=self.connection_info.get("sync_port"))), @@ -475,6 +495,23 @@ class CPortStats(CTRexStats): return self + # for port we need to do something smarter + def get_stats (self): + stats = {} + + stats['opackets'] = self.get_rel("opackets") + stats['ipackets'] = self.get_rel("ipackets") + stats['obytes'] = self.get_rel("obytes") + stats['ibytes'] = self.get_rel("ibytes") + stats['oerrors'] = self.get_rel("oerrors") + stats['ierrors'] = self.get_rel("ierrors") + stats['tx_bps'] = self.get("m_total_tx_bps") + stats['tx_pps'] = self.get("m_total_tx_pps") + stats['rx_bps'] = self.get("m_total_rx_bps") + stats['rx_pps'] = self.get("m_total_rx_pps") + + return stats + def generate_stats(self): diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 3ecbca06..34494561 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -438,13 +438,13 @@ class TRexConsole(TRexGeneralCmd): def do_validate (self, line): '''validates port(s) stream configuration\n''' - self.stateless_client.cmd_validate_line(line) + self.stateless_client.validate_line(line) @verify_connected def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' - self.stateless_client.stats_line(line) + self.stateless_client.show_stats_line(line) def help_stats(self): @@ -523,13 +523,8 @@ class TRexConsole(TRexGeneralCmd): return - set_window_always_on_top('trex_tui') - - save_verbose = self.stateless_client.get_verbose() - - self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_QUIET) - self.tui.show() - self.stateless_client.set_verbose(save_verbose) + with self.stateless_client.logger.supress(): + self.tui.show() def help_tui (self): @@ -601,6 +596,51 @@ class TRexConsole(TRexGeneralCmd): do_h = do_history +# run a script of commands +def run_script_file (self, filename, stateless_client): + + self.logger.log(format_text("\nRunning script file '{0}'...".format(filename), 'bold')) + + with open(filename) as f: + script_lines = f.readlines() + + cmd_table = {} + + # register all the commands + cmd_table['start'] = stateless_client.start_line + cmd_table['stop'] = stateless_client.stop_line + cmd_table['reset'] = stateless_client.reset_line + + for index, line in enumerate(script_lines, start = 1): + line = line.strip() + if line == "": + continue + if line.startswith("#"): + continue + + sp = line.split(' ', 1) + cmd = sp[0] + if len(sp) == 2: + args = sp[1] + else: + args = "" + + stateless_client.logger.log(format_text("Executing line {0} : '{1}'\n".format(index, line))) + + if not cmd in cmd_table: + print "\n*** Error at line {0} : '{1}'\n".format(index, line) + stateless_client.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold')) + return False + + rc = cmd_table[cmd](args) + if rc.bad(): + return False + + stateless_client.logger.log(format_text("\n[Done]", 'bold')) + + return True + + # def is_valid_file(filename): if not os.path.isfile(filename): @@ -609,6 +649,7 @@ def is_valid_file(filename): return filename + def setParserOptions(): parser = argparse.ArgumentParser(prog="trex_console.py") @@ -691,33 +732,20 @@ def main(): logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) logger.log(format_text("\nSwitching to read only mode - only few commands will be available", 'bold')) - - # if options.tui or not options.acquire: - # rc = stateless_client.connect("RO") - # else: - # try: - # rc = stateless_client.connect("RW") - # except STLError as e: - # logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) - # - # with logger.supress(): - # rc = stateless_client.connect("RO") - - # a script mode if options.batch: - cont = stateless_client.run_script_file(options.batch[0]) + cont = run_script_file(options.batch[0], stateless_client) if not cont: return # console - try: console = TRexConsole(stateless_client, options.verbose) logger.prompt_redraw = console.prompt_redraw if options.tui: + set_window_always_on_top('trex_tui') console.do_tui("") else: console.start() diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py index 9e66a984..1e22b005 100644 --- a/scripts/automation/trex_control_plane/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -8,6 +8,7 @@ from client_utils import text_tables from collections import OrderedDict import datetime from cStringIO import StringIO +from client.trex_stateless_client import STLError class SimpleBar(object): def __init__ (self, desc, pattern): @@ -60,7 +61,7 @@ class TrexTUIDashBoard(TrexTUIPanel): def show (self): - stats = self.stateless_client.cmd_stats(self.ports, trex_stats.COMPACT) + stats = self.stateless_client._get_formatted_stats(self.ports, trex_stats.COMPACT) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @@ -88,64 +89,44 @@ class TrexTUIDashBoard(TrexTUIPanel): ######### actions def action_pause (self): - rc = self.stateless_client.pause_traffic(self.mng.ports) + try: + rc = self.stateless_client.pause(ports = self.mng.ports) + except STLError: + pass - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + return "" - if len(ports_succeeded) > 0: - return "paused traffic on port(s): {0}".format(ports_succeeded) - else: - return "" def action_resume (self): - rc = self.stateless_client.resume_traffic(self.mng.ports) - - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + try: + self.stateless_client.resume(ports = self.mng.ports) + except STLError: + pass - if len(ports_succeeded) > 0: - return "resumed traffic on port(s): {0}".format(ports_succeeded) - else: - return "" + return "" def action_raise (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'add'} - rc = self.stateless_client.update_traffic(mul, self.mng.ports) + try: + self.stateless_client.update(mult = "5%+", ports = self.mng.ports) + except STLError: + pass - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + return "" - if len(ports_succeeded) > 0: - return "raised B/W by %5 on port(s): {0}".format(ports_succeeded) - else: - return "" def action_lower (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} - rc = self.stateless_client.update_traffic(mul, self.mng.ports) - - ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.ports): - if rc_single.rc: - ports_succeeded.append(port_id) + try: + self.stateless_client.update(mult = "5%-", ports = self.mng.ports) + except STLError: + pass - if len(ports_succeeded) > 0: - return "lowered B/W by %5 on port(s): {0}".format(ports_succeeded) - else: - return "" + return "" def action_clear (self): - self.stateless_client.cmd_clear(self.mng.ports) + self.stateless_client.clear_stats(self.mng.ports) return "cleared all stats" @@ -167,7 +148,7 @@ class TrexTUIPort(TrexTUIPanel): def show (self): - stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT) + stats = self.stateless_client._get_formatted_stats([self.port_id], trex_stats.COMPACT) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) @@ -194,39 +175,44 @@ class TrexTUIPort(TrexTUIPanel): # actions def action_pause (self): - rc = self.stateless_client.pause_traffic([self.port_id]) - if rc.good(): - return "port {0}: paused traffic".format(self.port_id) - else: - return "" + try: + self.stateless_client.pause(ports = [self.port_id]) + except STLError: + pass + + return "" def action_resume (self): - rc = self.stateless_client.resume_traffic([self.port_id]) - if rc.good(): - return "port {0}: resumed traffic".format(self.port_id) - else: - return "" + try: + self.stateless_client.resume(ports = [self.port_id]) + except STLError: + pass + + return "" + def action_raise (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'add'} - rc = self.stateless_client.update_traffic(mul, [self.port_id]) + mult = {'type': 'percentage', 'value': 5, 'op': 'add'} - if rc.good(): - return "port {0}: raised B/W by 5%".format(self.port_id) - else: - return "" + try: + self.stateless_client.update(mult = mult, ports = [self.port_id]) + except STLError: + pass + + return "" def action_lower (self): - mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} - rc = self.stateless_client.update_traffic(mul, [self.port_id]) + mult = {'type': 'percentage', 'value': 5, 'op': 'sub'} - if rc.good(): - return "port {0}: lowered B/W by 5%".format(self.port_id) - else: - return "" + try: + self.stateless_client.update(mult = mult, ports = [self.port_id]) + except STLError: + pass + + return "" def action_clear (self): - self.stateless_client.cmd_clear([self.port_id]) + self.stateless_client.clear_stats([self.port_id]) return "port {0}: cleared stats".format(self.port_id) # log @@ -423,7 +409,7 @@ class TrexTUI(): if self.state == self.STATE_ACTIVE: # if no connectivity - move to lost connecitivty if not self.stateless_client.async_client.is_alive(): - self.stateless_client.cmd_invalidate(self.pm.ports) + self.stateless_client._invalidate_stats(self.pm.ports) self.state = self.STATE_LOST_CONT @@ -438,11 +424,10 @@ class TrexTUI(): # restored connectivity - try to reconnect elif self.state == self.STATE_RECONNECT: - rc = self.stateless_client.connect("RO") - if rc.good(): + try: + self.stateless_client.connect("RO") self.state = self.STATE_ACTIVE - else: - # maybe we lost it again + except STLError: self.state = self.STATE_LOST_CONT -- cgit 1.2.3-korg From 6f4a51c126b7a78ee8e37d396ed2b61b05fa506c Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 24 Jan 2016 04:01:30 -0500 Subject: added example --- api/stl/examples/stl_simple_burst.py | 53 ++++++++++ api/stl/examples/udp_64B.pcap | Bin 0 -> 104 bytes api/stl/profiles/burst.yaml | 39 ++++++++ api/stl/trex_stl_api.py | 17 ++++ .../trex_control_plane/client/trex_hltapi.py | 4 +- .../client/trex_stateless_client.py | 11 ++- .../trex_control_plane/common/trex_stats.py | 5 +- .../trex_control_plane/common/trex_streams.py | 1 - .../trex_control_plane/console/trex_console.py | 25 +++-- .../examples/interactive_stateless.py | 2 +- scripts/stl_test_api/__init__.py | 108 --------------------- scripts/stl_test_api/trex_stl_api.py | 93 ------------------ 12 files changed, 134 insertions(+), 224 deletions(-) create mode 100644 api/stl/examples/stl_simple_burst.py create mode 100644 api/stl/examples/udp_64B.pcap create mode 100644 api/stl/profiles/burst.yaml create mode 100644 api/stl/trex_stl_api.py delete mode 100644 scripts/stl_test_api/__init__.py delete mode 100644 scripts/stl_test_api/trex_stl_api.py (limited to 'scripts/automation/trex_control_plane/console') diff --git a/api/stl/examples/stl_simple_burst.py b/api/stl/examples/stl_simple_burst.py new file mode 100644 index 00000000..7efb574a --- /dev/null +++ b/api/stl/examples/stl_simple_burst.py @@ -0,0 +1,53 @@ +import sys +sys.path.insert(0, "../") + +import trex_stl_api + +from trex_stl_api import STLClient, STLError + +import time + +# define a simple burst test +def simple_burst (): + + passed = True + + try: + with STLClient() as c: + + # activate this for some logging information + #c.logger.set_verbose(c.logger.VERBOSE_REGULAR) + + # repeat for 5 times + for i in xrange(1, 6): + + # read the stats before + before_ipackets = c.get_stats()['total']['ipackets'] + + # inject burst profile on two ports and block until done + c.start(profiles = '../profiles/burst.yaml', ports = [0, 1], mult = "1gbps") + c.wait_on_traffic(ports = [0, 1]) + + after_ipackets = c.get_stats()['total']['ipackets'] + + print "Test iteration {0} - Packets Received: {1} ".format(i, (after_ipackets - before_ipackets)) + + # we have 600 packets in the burst and two ports + if (after_ipackets - before_ipackets) != (600 * 2): + passed = False + + # error handling + except STLError as e: + passed = False + print e + + + + if passed: + print "\nTest has passed :-)\n" + else: + print "\nTest has failed :-(\n" + + +simple_burst() + diff --git a/api/stl/examples/udp_64B.pcap b/api/stl/examples/udp_64B.pcap new file mode 100644 index 00000000..699b9c80 Binary files /dev/null and b/api/stl/examples/udp_64B.pcap differ diff --git a/api/stl/profiles/burst.yaml b/api/stl/profiles/burst.yaml new file mode 100644 index 00000000..dbd348c7 --- /dev/null +++ b/api/stl/profiles/burst.yaml @@ -0,0 +1,39 @@ +### Single stream UDP packet, 64B ### +##################################### +- name: stream0 + stream: + self_start: True + next_stream_id: stream1 + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 100 + rx_stats: [] + vm: [] + +- name: stream1 + stream: + self_start: False + next_stream_id: stream2 + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 200 + rx_stats: [] + vm: [] + +- name: stream2 + stream: + self_start: False + packet: + binary: udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 300 + rx_stats: [] + vm: [] diff --git a/api/stl/trex_stl_api.py b/api/stl/trex_stl_api.py new file mode 100644 index 00000000..aad39916 --- /dev/null +++ b/api/stl/trex_stl_api.py @@ -0,0 +1,17 @@ +import os +import sys +import time + + +# update the import path to include the stateless client +root_path = os.path.dirname(os.path.abspath(__file__)) + +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client/')) +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/')) +sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/')) + +# aliasing +import trex_stateless_client +STLClient = trex_stateless_client.STLClient +STLError = trex_stateless_client.STLError + diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index 848d5a9e..c25c73cb 100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -2,7 +2,7 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder -from trex_stateless_client import CTRexStatelessClient +from trex_stateless_client import STLClient from common.trex_streams import * from client_utils.general_utils import id_count_gen import dpkt @@ -20,7 +20,7 @@ class CTRexHltApi(object): # sync = RPC, async = ZMQ def connect(self, device, port_list, username, sync_port = 4501, async_port = 4500, reset=False, break_locks=False): ret_dict = {"status": 0} - self.trex_client = CTRexStatelessClient(username, device, sync_port, async_port) + self.trex_client = STLClient(username, device, sync_port, async_port) rc = self.trex_client.connect() if rc.bad(): diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index c5d7e053..c1a4d1d1 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -419,8 +419,8 @@ class CCommLink(object): ############################ ############################# ############################ ############################# -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" +class STLClient(object): + """docstring for STLClient""" def __init__(self, username = general_utils.get_current_user(), @@ -968,6 +968,7 @@ class CTRexStatelessClient(object): stats_obj = {} for stats_type in stats_opts: stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) + return stats_obj def _get_streams(self, port_id_list, streams_mask=set()): @@ -1244,9 +1245,11 @@ class CTRexStatelessClient(object): # pings the server on the RPC channel @__api_check(True) def ping(self): - rc = self.__ping() self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], - self.connection_info['sync_port'])) + self.connection_info['sync_port'])) + rc = self.__ping() + + self.logger.post_cmd(rc) if not rc: raise STLError(rc) diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index 9c2cd7f1..3f64310f 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -59,7 +59,7 @@ def calculate_diff_raw (samples): class CTRexInfoGenerator(object): """ This object is responsible of generating stats and information from objects maintained at - CTRexStatelessClient and the ports. + STLClient and the ports. """ def __init__(self, global_stats_ref, ports_dict_ref): @@ -477,6 +477,9 @@ class CPortStats(CTRexStats): raise TypeError("cannot add non stats object to stats") # main stats + if not self.latest_stats: + self.latest_stats = {} + self.__merge_dicts(self.latest_stats, x.latest_stats) # reference stats diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index 800b6d49..ea3d71d1 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -210,7 +210,6 @@ class CStream(object): setattr(self, k, kwargs[k]) # TODO: load to _pkt_bld_obj also when passed as byte array! elif isinstance(binary, str) and binary.endswith(".pcap"): - # self.load_packet_from_pcap(binary, kwargs[k]["meta"]) self._pkt_bld_obj.load_packet_from_pcap(binary) self._pkt_bld_obj.metadata = kwargs[k]["meta"] self.packet = self._pkt_bld_obj.dump_pkt() diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 34494561..88ff45dc 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient, LoggerApi, STLError +from client.trex_stateless_client import STLClient, LoggerApi, STLError from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -175,6 +175,7 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## def prompt_redraw (self): + self.postcmd(False, "") sys.stdout.write("\n" + self.prompt + readline.get_line_buffer()) sys.stdout.flush() @@ -293,9 +294,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_ping (self, line): '''Ping the server\n''' - rc = self.stateless_client.ping() - if rc.bad(): - return + self.stateless_client.ping() # set verbose on / off @@ -632,9 +631,7 @@ def run_script_file (self, filename, stateless_client): stateless_client.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold')) return False - rc = cmd_table[cmd](args) - if rc.bad(): - return False + cmd_table[cmd](args) stateless_client.logger.log(format_text("\n[Done]", 'bold')) @@ -670,7 +667,7 @@ def setParserOptions(): default = get_current_user(), type = str) - parser.add_argument("--verbose", dest="verbose", + parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Switch ON verbose option. Default is: OFF.", default = False) @@ -711,12 +708,12 @@ def main(): # Stateless client connection logger = ConsoleLogger() - stateless_client = CTRexStatelessClient(username = options.user, - server = options.server, - sync_port = options.port, - async_port = options.pub, - verbose_level = verbose_level, - logger = logger) + stateless_client = STLClient(username = options.user, + server = options.server, + sync_port = options.port, + async_port = options.pub, + verbose_level = verbose_level, + logger = logger) # TUI or no acquire will give us READ ONLY mode try: diff --git a/scripts/automation/trex_control_plane/examples/interactive_stateless.py b/scripts/automation/trex_control_plane/examples/interactive_stateless.py index e64b4755..f6ada17d 100644 --- a/scripts/automation/trex_control_plane/examples/interactive_stateless.py +++ b/scripts/automation/trex_control_plane/examples/interactive_stateless.py @@ -25,7 +25,7 @@ class InteractiveStatelessTRex(cmd.Cmd): self.verbose = verbose self.virtual = virtual - self.trex = CTRexStatelessClient(trex_host, trex_port, self.virtual) + self.trex = STLClient(trex_host, trex_port, self.virtual) self.DEFAULT_RUN_PARAMS = dict(m=1.5, nc=True, p=True, diff --git a/scripts/stl_test_api/__init__.py b/scripts/stl_test_api/__init__.py deleted file mode 100644 index 7381d88e..00000000 --- a/scripts/stl_test_api/__init__.py +++ /dev/null @@ -1,108 +0,0 @@ -# -# TRex stateless API -# provides a layer for communicating with TRex -# using Python API -# - -import sys -import os -import time -import cStringIO - -api_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/')) - -from trex_stateless_client import CTRexStatelessClient, LoggerApi -from common import trex_stats - -# a basic test object -class BasicTestAPI(object): - - # exception class - class Failure(Exception): - def __init__ (self, rc): - self.msg = str(rc) - - def __str__ (self): - s = "\n\n******\n" - s += "Test has failed.\n\n" - s += "Error reported:\n\n {0}\n".format(self.msg) - return s - - - # logger for test - class Logger(LoggerApi): - def __init__ (self): - LoggerApi.__init__(self) - self.buffer = cStringIO.StringIO() - - def write (self, msg, newline = True): - line = str(msg) + ("\n" if newline else "") - self.buffer.write(line) - - #print line - - def flush (self): - pass - - - # main object - def __init__ (self, server = "localhost", sync_port = 4501, async_port = 4500): - self.logger = BasicTestAPI.Logger() - self.client = CTRexStatelessClient(logger = self.logger, - sync_port = sync_port, - async_port = async_port, - verbose_level = LoggerApi.VERBOSE_REGULAR) - - - self.__invalid_stats = True - - # connect to the stateless client - def connect (self): - rc = self.client.connect(mode = "RWF") - self.__verify_rc(rc) - - - # disconnect from the stateless client - def disconnect (self): - self.client.disconnect() - - - def inject_profile (self, filename, rate = "1", duration = None): - self.__invalid_stats = True - - cmd = "--total -f {0} -m {1}".format(filename, rate) - if duration: - cmd += " -d {0}".format(duration) - - rc = self.client.cmd_start_line(cmd) - self.__verify_rc(rc) - - - def wait_while_traffic_on (self, timeout = None): - while self.client.get_active_ports(): - time.sleep(0.1) - - - def get_stats (self): - if self.__invalid_stats: - # send a barrier - rc = self.client.block_on_stats() - self.__verify_rc(rc) - self.__invalid_stats = False - - total_stats = trex_stats.CPortStats(None) - - for port in self.client.ports.values(): - total_stats += port.port_stats - - return total_stats - - - # some internal methods - - # verify RC return value - def __verify_rc (self, rc): - if not rc: - raise self.Failure(rc) - diff --git a/scripts/stl_test_api/trex_stl_api.py b/scripts/stl_test_api/trex_stl_api.py deleted file mode 100644 index 6d85ee9f..00000000 --- a/scripts/stl_test_api/trex_stl_api.py +++ /dev/null @@ -1,93 +0,0 @@ -import sys -import os -import time -import cStringIO - -api_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/')) - -from trex_stateless_client import CTRexStatelessClient, LoggerApi - -class BasicTest(object): - - # exception class - class Failure(Exception): - def __init__ (self, rc): - self.rc = rc - - def __str__ (self): - s = "\n******\n" - s += "Test has failed.\n\n" - s += "Error reported:\n\n {0}\n".format(str(self.rc.err())) - return s - - - # logger - class Logger(LoggerApi): - def __init__ (self): - LoggerApi.__init__(self) - self.buffer = cStringIO.StringIO() - - def write (self, msg, newline = True): - self.buffer.write(str(msg)) - - if newline: - self.buffer.write("\n") - - def flush (self): - pass - - - def __init__ (self): - self.logger = BasicTest.Logger() - self.client = CTRexStatelessClient(logger = self.logger) - - def connect (self): - rc = self.client.connect(mode = "RWF") - __verify_rc(rc) - - - def disconnect (self): - self.client.disconnect() - - - def __verify_rc (self, rc): - if rc.bad(): - raise self.Failure(rc) - - def inject_profile (self, filename, rate = "1", duration = None): - cmd = "-f {0} -m {1}".format(filename, rate) - if duration: - cmd += " -d {0}".format(duration) - - print cmd - rc = self.client.cmd_start_line(cmd) - self.__verify_rc(rc) - - - def wait_while_traffic_on (self, timeout = None): - while self.client.get_active_ports(): - time.sleep(0.1) - - -def start (): - test = BasicTest() - - try: - test.connect() - test.inject_profile("stl/imix_1pkt.yaml", rate = "5gbps", duration = 10) - test.wait_while_traffic_on() - finally: - test.disconnect() - - -def main (): - try: - start() - print "\nTest has passed :)\n" - except BasicTest.Failure as e: - print e - - - -#main() -- cgit 1.2.3-korg