diff options
Diffstat (limited to 'scripts/automation')
10 files changed, 589 insertions, 190 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8b274134..8fdf7c9b 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -162,28 +162,94 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target= self.run) + self.alive = False + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def run(self): + self.connected = True - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.alive: + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return False, "*** [subscriber] - no data flow from server at : " + self.tr + + return True, "" + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + # no data yet... + self.alive = False + + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 5000) while self.active: - line = self.socket.recv_string() + try: + + line = self.socket.recv_string() + + if not self.alive: + self.stateless_client.on_async_alive() + self.alive = True + + # got a timeout - mark as not alive and retry + except zmq.Again: + + if self.alive: + self.stateless_client.on_async_dead() + self.alive = False + + continue + + except zmq.ContextTerminated: + # outside thread signaled us to exit + self.alive = False + break + msg = json.loads(line) name = msg['name'] @@ -193,7 +259,12 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) - def get_stats(self): + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + + + def get_stats (self): return self.stats def get_raw_snapshot (self): @@ -203,7 +274,6 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - # self.stats.update(data) self.stateless_client.handle_async_stats_update(data) # events elif name == "trex-event": @@ -212,10 +282,3 @@ class CTRexAsyncClient(): pass - def stop (self): - self.active = False - self.t.join() - - -if __name__ == "__main__": - pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 160abdec..77b11c37 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -131,7 +131,9 @@ class CTRexClient(object): raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) - + if not trex_cmd_options.get('l'): + self.result_obj.latency_checked = False + self.result_obj.clear_results() try: issue_time = time.time() @@ -767,6 +769,7 @@ class CTRexResult(object): """ self._history = deque(maxlen = max_history_size) self.clear_results() + self.latency_checked = True def __repr__(self): return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) + @@ -1032,18 +1035,19 @@ class CTRexResult(object): self._done_warmup = True # handle latency data - latency_pre = "trex-latency" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC - # support old typo - if self._max_latency is None: - latency_pre = "trex-latecny" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") - - self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC - self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) - - avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") - self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) + if self.latency_checked: + latency_pre = "trex-latency" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC + # support old typo + if self._max_latency is None: + latency_pre = "trex-latecny" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") + + self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC + self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) + + avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") + self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts") rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts") diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 748817da..2925e7eb 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -14,7 +14,6 @@ import json from common.trex_streams import * from collections import namedtuple from common.text_opts import * -# import trex_stats from common import trex_stats from client_utils import parsing_opts, text_tables import time @@ -52,14 +51,14 @@ class RC(): return not self.good() def data (self): - return all([x.data if x.rc else "" for x in self.rc_list]) + return [x.data if x.rc else "" for x in self.rc_list] def err (self): - return all([x.data if not x.rc else "" for x in self.rc_list]) + return [x.data if not x.rc else "" for x in self.rc_list] def annotate (self, desc = None): if desc: - print format_text('\n{:<40}'.format(desc), 'bold'), + print format_text('\n{:<60}'.format(desc), 'bold'), if self.bad(): # print all the errors @@ -76,14 +75,29 @@ class RC(): print format_text("[SUCCESS]\n", 'green', 'bold') -def RC_OK(): - return RC(True, "") -def RC_ERR(err): +def RC_OK(data = None): + return RC(True, data) +def RC_ERR (err): return RC(False, err) - LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) +########## utlity ############ +def mult_to_factor (mult, max_bps, max_pps, line_util): + if mult['type'] == 'raw': + return mult['value'] + + if mult['type'] == 'bps': + return mult['value'] / max_bps + + if mult['type'] == 'pps': + return mult['value'] / max_pps + + if mult['type'] == 'percentage': + return mult['value'] / line_util + + + # describes a stream DB class CStreamsDB(object): @@ -157,22 +171,26 @@ class Port(object): STATE_PAUSE: "PAUSE"} - def __init__ (self, port_id, speed, driver, user, transmit): + def __init__ (self, port_id, speed, driver, user, comm_link): self.port_id = port_id self.state = self.STATE_IDLE self.handler = None - self.transmit = transmit + self.comm_link = comm_link + self.transmit = comm_link.transmit + self.transmit_batch = comm_link.transmit_batch self.user = user self.driver = driver self.speed = speed self.streams = {} + self.profile = None + self.port_stats = trex_stats.CPortStats(self) def err(self, msg): return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - def ok(self): - return RC_OK() + def ok(self, data = None): + return RC_OK(data) def get_speed_bps (self): return (self.speed * 1000 * 1000 * 1000) @@ -259,6 +277,33 @@ class Port(object): return self.ok() + # add multiple streams + def add_streams (self, streams_list): + batch = [] + + for stream in streams_list: + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream.stream_id, + "stream": stream.stream} + + cmd = RpcCmdData('add_stream', params) + batch.append(cmd) + + rc, data = self.transmit_batch(batch) + + if not rc: + return self.err(data) + + # add the stream + for stream in streams_list: + self.streams[stream.stream_id] = stream.stream + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + # remove stream from port def remove_stream (self, stream_id): @@ -302,14 +347,6 @@ class Port(object): def get_all_streams (self): return self.streams - - def process_mul (self, mul): - # if percentage - translate - if mul['type'] == 'percentage': - mul['type'] = 'max_bps' - mul['max'] = self.get_speed_bps() * (mul['max'] / 100) - - # start traffic def start (self, mul, duration): if self.state == self.STATE_DOWN: @@ -321,8 +358,6 @@ class Port(object): if self.state == self.STATE_TX: return self.err("Unable to start traffic - port is already transmitting") - self.process_mul(mul) - params = {"handler": self.handler, "port_id": self.port_id, "mul": mul, @@ -395,8 +430,6 @@ class Port(object): if (self.state != self.STATE_TX) : return self.err("port is not transmitting") - self.process_mul(mul) - params = {"handler": self.handler, "port_id": self.port_id, "mul": mul} @@ -407,6 +440,68 @@ class Port(object): return self.ok() + + def validate (self): + + if (self.state == self.STATE_DOWN): + return self.err("port is down") + + if (self.state == self.STATE_IDLE): + return self.err("no streams attached to port") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("validate", params) + if not rc: + return self.err(data) + + self.profile = data + + return self.ok() + + def get_profile (self): + return self.profile + + + def print_profile (self, mult, duration): + if not self.get_profile(): + return + + rate = self.get_profile()['rate'] + graph = self.get_profile()['graph'] + + print format_text("Profile Map Per Port\n", 'underline', 'bold') + + factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) + + print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), + format_num(rate['max_bps'] * factor, suffix = "bps")) + + print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), + format_num(rate['max_pps'] * factor, suffix = "pps"),) + + print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), + format_percentage(rate['max_line_util'] * factor * 100)) + + + # duration + exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) + exp_time_factor_sec = exp_time_base_sec / factor + + # user configured a duration + if duration > 0: + if exp_time_factor_sec > 0: + exp_time_factor_sec = min(exp_time_factor_sec, duration) + else: + exp_time_factor_sec = duration + + + print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), + format_time(exp_time_factor_sec)) + print "\n" + + def get_port_state_name(self): return self.STATES_MAP.get(self.state, "Unknown") @@ -424,6 +519,7 @@ class Port(object): def clear_stats(self): return self.port_stats.clear_stats() + ################# events handler ###################### def async_event_port_stopped (self): self.state = self.STATE_STREAMS @@ -435,12 +531,9 @@ class CTRexStatelessClient(object): def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username - self.system_info = None self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) self.verbose = False self.ports = {} - # self._conn_handler = {} - # self._active_ports = set() self._connection_info = {"server": server, "sync_port": sync_port, "async_port": async_port} @@ -448,7 +541,7 @@ class CTRexStatelessClient(object): self.server_version = {} self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self) self.streams_db = CStreamsDB() self.global_stats = trex_stats.CGlobalStats(self._connection_info, @@ -457,12 +550,26 @@ class CTRexStatelessClient(object): self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats, self.ports) - self.connected = False - self.events = [] + self.connected = False + ################# events handler ###################### + def add_event_log (self, msg, ev_type, show = False): + + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" + + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) + + if show: + print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))) + def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -490,22 +597,22 @@ class CTRexStatelessClient(object): for port_id, data in port_stats.iteritems(): self.ports[port_id].port_stats.update(data) + + def handle_async_event (self, type, data): # DP stopped - ev = "[event] - " - show_event = False # port started if (type == 0): port_id = int(data['port_id']) - ev += "Port {0} has started".format(port_id) + ev = "Port {0} has started".format(port_id) # port stopped elif (type == 1): port_id = int(data['port_id']) - ev += "Port {0} has stopped".format(port_id) + ev = "Port {0} has stopped".format(port_id) # call the handler self.async_event_port_stopped(port_id) @@ -513,14 +620,14 @@ class CTRexStatelessClient(object): # server stopped elif (type == 2): - ev += "Server has stopped" + ev = "Server has stopped" self.async_event_server_stopped() show_event = True # port finished traffic elif (type == 3): port_id = int(data['port_id']) - ev += "Port {0} job done".format(port_id) + ev = "Port {0} job done".format(port_id) # call the handler self.async_event_port_stopped(port_id) @@ -530,19 +637,16 @@ class CTRexStatelessClient(object): # unknown event - ignore return - if show_event: - print format_text("\n" + ev, 'bold') - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{0} - ".format(st) + format_text(ev, 'bold')) + self.add_event_log(ev, 'server', show_event) def async_event_port_stopped (self, port_id): self.ports[port_id].async_event_port_stopped() def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -552,6 +656,24 @@ class CTRexStatelessClient(object): ############# helper functions section ############## + # measure time for functions + def timing(f): + def wrap(*args): + time1 = time.time() + ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + + delta = time.time() - time1 + print format_time(delta) + "\n" + + return ret + + return wrap + + def validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): print type(port_id_list) @@ -586,13 +708,19 @@ class CTRexStatelessClient(object): # connection sequence def connect(self): + # clear this flag self.connected = False - # connect + # connect sync channel rc, data = self.comm_link.connect() if not rc: return RC_ERR(data) + # connect async channel + rc, data = self.async_client.connect() + if not rc: + return RC_ERR(data) + # version rc, data = self.transmit("get_version") if not rc: @@ -602,7 +730,6 @@ class CTRexStatelessClient(object): self.global_stats.server_version = data # cache system info - # self.get_system_info(refresh=True) rc, data = self.transmit("get_system_info") if not rc: return RC_ERR(data) @@ -619,7 +746,9 @@ class CTRexStatelessClient(object): for port_id in xrange(self.get_port_count()): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit) + + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link) + # acquire all ports rc = self.acquire() @@ -639,11 +768,19 @@ class CTRexStatelessClient(object): def disconnect(self): - self.connected = False self.comm_link.disconnect() + self.async_client.disconnect() return RC_OK() + def on_async_dead (self): + if self.connected: + msg = 'lost connection to server' + self.add_event_log(msg, 'local', True) + self.connected = False + + def on_async_alive (self): + pass ########### cached queries (no server traffic) ########### @@ -666,8 +803,8 @@ class CTRexStatelessClient(object): else: return port_ids - def get_stats_async(self): - return self._async_client.get_stats() + def get_stats_async (self): + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port @@ -750,15 +887,16 @@ class CTRexStatelessClient(object): return rc + def add_stream_pack(self, stream_pack_list, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() - for stream_pack in stream_pack_list: - rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) - + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack_list)) + return rc @@ -855,6 +993,17 @@ class CTRexStatelessClient(object): return rc + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + + def get_port_stats(self, port_id=None): pass @@ -866,9 +1015,12 @@ class CTRexStatelessClient(object): return self.comm_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) ######################### Console (high level) API ######################### + @timing def cmd_ping(self): rc = self.ping() rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) @@ -924,7 +1076,7 @@ class CTRexStatelessClient(object): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) if not active_ports: - msg = "No active traffic on porvided ports" + msg = "No active traffic on provided ports" print format_text(msg, 'bold') return RC_ERR(msg) @@ -948,17 +1100,20 @@ class CTRexStatelessClient(object): rc = self.update_traffic(mult, active_ports) rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - return RC_OK() + return rc + # clear stats def cmd_clear(self, port_id_list): + for port_id in port_id_list: self.ports[port_id].clear_stats() + self.global_stats.clear_stats() + return RC_OK() + # pause cmd def cmd_pause (self, port_id_list): @@ -972,10 +1127,8 @@ class CTRexStatelessClient(object): rc = self.pause_traffic(active_ports) rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + return rc - return RC_OK() # resume cmd @@ -991,14 +1144,11 @@ class CTRexStatelessClient(object): rc = self.resume_traffic(active_ports) rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - return RC_OK() + return rc # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration): + def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) @@ -1020,19 +1170,37 @@ class CTRexStatelessClient(object): rc = self.add_stream_pack(stream_list.compiled, port_id_list) - rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list)) + rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) if rc.bad(): return rc + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.start_traffic(mult, duration, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) + + return rc + else: + rc = self.validate(port_id_list) + rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) + + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) - # finally, start the traffic - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): return rc - return RC_OK() + # validate port(s) profile + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + return rc + + + # stats def cmd_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) @@ -1043,6 +1211,7 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ + @timing def cmd_start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser @@ -1054,13 +1223,18 @@ class CTRexStatelessClient(object): parsing_opts.FORCE, parsing_opts.STREAM_FROM_PATH_OR_FILE, parsing_opts.DURATION, - parsing_opts.MULTIPLIER) + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) opts = parser.parse_args(line.split()) if opts is None: return RC_ERR("bad command line parameters") + + if opts.dry: + print format_text("\n*** DRY RUN ***", 'bold') + if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) @@ -1078,12 +1252,13 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear - if opts.total and (mult['type'] != 'percentage'): + if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + @timing def cmd_resume_line (self, line): '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1097,6 +1272,8 @@ class CTRexStatelessClient(object): return self.cmd_resume(opts.ports) + + @timing def cmd_stop_line (self, line): '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1110,6 +1287,8 @@ class CTRexStatelessClient(object): return self.cmd_stop(opts.ports) + + @timing def cmd_pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1123,6 +1302,8 @@ class CTRexStatelessClient(object): return self.cmd_pause(opts.ports) + + @timing def cmd_update_line (self, line): '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, @@ -1139,14 +1320,15 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) return self.cmd_update(opts.ports, opts.mult) - + @timing def cmd_reset_line (self, line): return self.cmd_reset() + def cmd_clear_line (self, line): '''Clear cached local statistics\n''' # define a parser @@ -1161,6 +1343,7 @@ class CTRexStatelessClient(object): return RC_ERR("bad command line parameters") return self.cmd_clear(opts.ports) + def cmd_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser @@ -1180,30 +1363,64 @@ class CTRexStatelessClient(object): if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS + # get stats objects, as dictionary stats = self.cmd_stats(opts.ports, mask) # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return - - # if opts.db: - # stream_list = self.streams_db.get_stream_pack(opts.db) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from DB):") - # if rc.bad(): - # return RC_ERR("Failed to load stream pack") - # - # else: - # # load streams from file - # stream_list = self.streams_db.load_yaml_file(opts.file[0]) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from file):") - # if stream_list == None: - # return RC_ERR("Failed to load stream pack") - # - # - # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + + return RC_OK() + + + + + @timing + def cmd_pause_line (self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "pause", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_pause(opts.ports) + + + @timing + def cmd_resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "resume", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_resume(opts.ports) + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + rc = self.cmd_validate(opts.ports) + return rc + def cmd_exit_line (self, line): print format_text("Exiting\n", 'bold') @@ -1280,6 +1497,7 @@ class CTRexStatelessClient(object): return True + ################################# # ------ private methods ------ # @staticmethod @@ -1354,4 +1572,3 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass - diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index dd208da4..f55d7798 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -37,7 +37,7 @@ class BatchMessage(object): msg = json.dumps(self.batch_list) - rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False) + rc, resp_list = self.rpc_client.send_raw_msg(msg) if len(self.batch_list) == 1: return CmdResponse(True, [CmdResponse(rc, resp_list)]) else: @@ -130,11 +130,10 @@ class JsonRpcClient(object): self.socket.send(msg) break except zmq.Again: - sleep(0.1) tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to send message to server") + return CmdResponse(False, "*** [RPC] - Failed to send message to server") tries = 0 @@ -143,11 +142,10 @@ class JsonRpcClient(object): response = self.socket.recv() break except zmq.Again: - sleep(0.1) tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to get server response") + return CmdResponse(False, "*** [RPC] - Failed to get server response") self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -223,8 +221,8 @@ class JsonRpcClient(object): except zmq.error.ZMQError as e: return False, "ZMQ Error: Bad server or port name: " + str(e) - self.socket.setsockopt(zmq.SNDTIMEO, 5) - self.socket.setsockopt(zmq.RCVTIMEO, 5) + self.socket.setsockopt(zmq.SNDTIMEO, 1000) + self.socket.setsockopt(zmq.RCVTIMEO, 1000) self.connected = True diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py index 6c348467..7ac9e312 100755 --- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py @@ -10,22 +10,23 @@ ArgumentGroup = namedtuple('ArgumentGroup', ['type', 'args', 'options']) # list of available parsing options MULTIPLIER = 1 -PORT_LIST = 2 -ALL_PORTS = 3 -PORT_LIST_WITH_ALL = 4 -FILE_PATH = 5 -FILE_FROM_DB = 6 -SERVER_IP = 7 -STREAM_FROM_PATH_OR_FILE = 8 -DURATION = 9 -FORCE = 10 - -TOTAL = 11 - -GLOBAL_STATS = 12 -PORT_STATS = 13 -PORT_STATUS = 14 -STATS_MASK = 15 +MULTIPLIER_STRICT = 2 +PORT_LIST = 3 +ALL_PORTS = 4 +PORT_LIST_WITH_ALL = 5 +FILE_PATH = 6 +FILE_FROM_DB = 7 +SERVER_IP = 8 +STREAM_FROM_PATH_OR_FILE = 9 +DURATION = 10 +FORCE = 11 +DRY_RUN = 12 +TOTAL = 13 + +GLOBAL_STATS = 14 +PORT_STATS = 15 +PORT_STATUS = 16 +STATS_MASK = 17 # list of ArgumentGroup types MUTEX = 1 @@ -60,10 +61,15 @@ match_multiplier_help = """Multiplier should be passed in the following format: will provide a percentage of the line rate. examples : '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """ -def match_multiplier(val): - '''match some val against multiplier shortcut inputs ''' +def match_multiplier_common(val, strict_abs = True): - match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) + # on strict absolute we do not allow +/- + if strict_abs: + match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val) + op = None + else: + match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)([\+\-])?$", val) + op = match.group(4) result = {} @@ -71,44 +77,53 @@ def match_multiplier(val): value = float(match.group(1)) unit = match.group(3) + + # raw type (factor) if not unit: result['type'] = 'raw' - result['max'] = value + result['value'] = value elif unit == 'bps': - result['type'] = 'max_bps' - result['max'] = value + result['type'] = 'bps' + result['value'] = value elif unit == 'kbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 elif unit == 'mbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 * 1000 elif unit == 'gbps': - result['type'] = 'max_bps' - result['max'] = value * 1000 * 1000 * 1000 + result['type'] = 'bps' + result['value'] = value * 1000 * 1000 * 1000 elif unit == 'pps': - result['type'] = 'max_pps' - result['max'] = value + result['type'] = 'pps' + result['value'] = value elif unit == "kpps": - result['type'] = 'max_pps' - result['max'] = value * 1000 + result['type'] = 'pps' + result['value'] = value * 1000 elif unit == "mpps": - result['type'] = 'max_pps' - result['max'] = value * 1000 * 1000 + result['type'] = 'pps' + result['value'] = value * 1000 * 1000 elif unit == "%": - # will be translated by the port object result['type'] = 'percentage' - result['max'] = value + result['value'] = value + + + if op == "+": + result['op'] = "add" + elif op == "-": + result['op'] = "sub" + else: + result['op'] = "abs" return result @@ -116,6 +131,13 @@ def match_multiplier(val): raise argparse.ArgumentTypeError(match_multiplier_help) +def match_multiplier(val): + '''match some val against multiplier shortcut inputs ''' + return match_multiplier_common(val, strict_abs = False) + +def match_multiplier_strict(val): + '''match some val against multiplier shortcut inputs ''' + return match_multiplier_common(val, strict_abs = True) def is_valid_file(filename): if not os.path.isfile(filename): @@ -127,9 +149,14 @@ def is_valid_file(filename): OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'help': match_multiplier_help, 'dest': "mult", - 'default': {'type':'raw', 'max':1}, + 'default': {'type':'raw', 'value':1, 'op': 'abs'}, 'type': match_multiplier}), + MULTIPLIER_STRICT: ArgumentPack(['-m', '--multiplier'], + {'help': match_multiplier_help, + 'dest': "mult", + 'default': {'type':'raw', 'value':1, 'op': 'abs'}, + 'type': match_multiplier_strict}), TOTAL: ArgumentPack(['-t', '--total'], {'help': "traffic will be divided between all ports specified", @@ -177,6 +204,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'metavar': 'SERVER', 'help': "server IP"}), + DRY_RUN: ArgumentPack(['-n', '--dry'], + {'action': 'store_true', + 'dest': 'dry', + 'default': False, + 'help': "Dry run - no traffic will be injected"}), + GLOBAL_STATS: ArgumentPack(['-g'], {'action': 'store_true', 'help': "Fetch only global statistics"}), @@ -189,6 +222,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'action': 'store_true', 'help': "Fetch only port status data"}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], diff --git a/scripts/automation/trex_control_plane/common/text_opts.py b/scripts/automation/trex_control_plane/common/text_opts.py index 06c2c056..5a86149c 100755 --- a/scripts/automation/trex_control_plane/common/text_opts.py +++ b/scripts/automation/trex_control_plane/common/text_opts.py @@ -19,6 +19,50 @@ TEXT_CODES = {'bold': {'start': '\x1b[1m', 'end': '\x1b[24m'}} +def format_num (size, suffix = ""): + for unit in ['','K','M','G','T','P']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 + + return "NaN" + +def format_time (t_sec): + if t_sec < 0: + return "infinite" + + if t_sec < 1: + # low numbers + for unit in ['ms', 'usec', 'ns']: + t_sec *= 1000.0 + if t_sec >= 1.0: + return '{:,.2f} [{:}]'.format(t_sec, unit) + + return "NaN" + + else: + # seconds + if t_sec < 60.0: + return '{:,.2f} [{:}]'.format(t_sec, 'sec') + + # minutes + t_sec /= 60.0 + if t_sec < 60.0: + return '{:,.2f} [{:}]'.format(t_sec, 'minutes') + + # hours + t_sec /= 60.0 + if t_sec < 24.0: + return '{:,.2f} [{:}]'.format(t_sec, 'hours') + + # days + t_sec /= 24.0 + return '{:,.2f} [{:}]'.format(t_sec, 'days') + + +def format_percentage (size): + return "%0.2f %%" % (size) + def bold(text): return text_attribute(text, 'bold') diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index bb4c72ca..89de7286 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -14,14 +14,26 @@ StreamPack = namedtuple('StreamPack', ['stream_id', 'stream']) class CStreamList(object): def __init__(self): - self.streams_list = {} + self.streams_list = OrderedDict() self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)), "rpc_defaults.yaml")) + def generate_numbered_name (self, name): + prefix = name.rstrip('01234567890') + suffix = name[len(prefix):] + if suffix == "": + n = "_1" + else: + n = int(suffix) + 1 + return prefix + str(n) + def append_stream(self, name, stream_obj): assert isinstance(stream_obj, CStream) - if name in self.streams_list: - raise NameError("A stream with this name already exists on this list.") + + # if name exists simply add numbered suffix to it + while name in self.streams_list: + name = self.generate_numbered_name(name) + self.streams_list[name]=stream_obj return name @@ -70,6 +82,7 @@ class CStreamList(object): stream_ids = {} for idx, stream_name in enumerate(self.streams_list): stream_ids[stream_name] = idx + # next, iterate over the streams and transform them from working with names to ids. # with that build a new dict with old stream_name as the key, and StreamPack as the stored value compiled_streams = {} diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 9236ce98..e187c8c2 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -35,7 +35,8 @@ from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts import trex_status - +import parsing_opts +from functools import wraps __version__ = "1.1" @@ -129,6 +130,18 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## + def verify_connected(f): + @wraps(f) + def wrap(*args): + inst = args[0] + if not inst.stateless_client.is_connected(): + print format_text("\nNot connected to server\n", 'bold') + return + + ret = f(*args) + return ret + + return wrap def get_console_identifier(self): return "{context}_{server}".format(context=self.__class__.__name__, @@ -142,6 +155,7 @@ class TRexConsole(TRexGeneralCmd): self.__dict__[name] = getattr(self.trex_console, name) def postcmd(self, stop, line): + if self.stateless_client.is_connected(): self.prompt = "TRex > " else: @@ -208,9 +222,9 @@ class TRexConsole(TRexGeneralCmd): ####################### shell commands ####################### + @verify_connected def do_ping (self, line): '''Ping the server\n''' - rc = self.stateless_client.cmd_ping() if rc.bad(): return @@ -300,6 +314,7 @@ class TRexConsole(TRexGeneralCmd): if (l > 2) and (s[l - 2] in file_flags): return TRexConsole.tree_autocomplete(s[l - 1]) + @verify_connected def do_start(self, line): '''Start selected traffic in specified port(s) on TRex\n''' @@ -310,42 +325,60 @@ class TRexConsole(TRexGeneralCmd): self.do_start("-h") ############# stop + @verify_connected def do_stop(self, line): '''stops port(s) transmitting traffic\n''' + self.stateless_client.cmd_stop_line(line) def help_stop(self): self.do_stop("-h") ############# update + @verify_connected def do_update(self, line): '''update speed of port(s)currently transmitting traffic\n''' + self.stateless_client.cmd_update_line(line) def help_update (self): self.do_update("-h") ############# pause + @verify_connected def do_pause(self, line): '''pause port(s) transmitting traffic\n''' + self.stateless_client.cmd_pause_line(line) ############# resume + @verify_connected def do_resume(self, line): '''resume port(s) transmitting traffic\n''' + self.stateless_client.cmd_resume_line(line) ########## reset + @verify_connected def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset() + self.stateless_client.cmd_reset_line(line) + + + ######### validate + @verify_connected + def do_validate (self, line): + '''validates port(s) stream configuration\n''' + + self.stateless_client.cmd_validate_line(line) + def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' self.stateless_client.cmd_stats_line(line) - pass + def help_stats(self): self.do_stats("-h") @@ -387,13 +420,10 @@ class TRexConsole(TRexGeneralCmd): print format_text("\n\nEvent log was cleared\n\n") # tui + @verify_connected def do_tui (self, line): '''Shows a graphical console\n''' - if not self.stateless_client.is_connected(): - print format_text("\nNot connected to server\n", 'bold') - return - self.do_verbose('off') trex_status.show_trex_status(self.stateless_client) diff --git a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py index 734fa22e..7bc25aac 100755 --- a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py +++ b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py @@ -19,7 +19,6 @@ def daemonize_parser(parser_obj, action_funcs, help_menu): parser_obj.usage = None
parser_obj.add_argument("action", choices=action_funcs,
action="store", help=help_menu)
- return
class ExtendedDaemonRunner(runner.DaemonRunner):
@@ -76,7 +75,12 @@ class ExtendedDaemonRunner(runner.DaemonRunner): self.app = app
self.daemon_context = daemon.DaemonContext()
self.daemon_context.stdin = open(app.stdin_path, 'rt')
- self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ try:
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ except IOError as err:
+ # catch 'tty' error when launching server from remote location
+ app.stdout_path = "/dev/null"
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
self.daemon_context.stderr = open(app.stderr_path,
'a+t', buffering=0)
diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server.py b/scripts/automation/trex_control_plane/server/trex_daemon_server.py index ec07cb8a..9784d42a 100755 --- a/scripts/automation/trex_control_plane/server/trex_daemon_server.py +++ b/scripts/automation/trex_control_plane/server/trex_daemon_server.py @@ -57,15 +57,7 @@ def main (): print "Launching user must have sudo privileges in order to run TRex daemon.\nTerminating daemon process." exit(-1) - try: - daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) - except IOError as err: - # catch 'tty' error when launching server from remote location - if err.errno == errno.ENXIO: - trex_app.stdout_path = "/dev/null" - daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) - else: - raise + daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) #This ensures that the logger file handle does not get closed during daemonization daemon_runner.daemon_context.files_preserve=[handler.stream] |