diff options
author | 2016-11-16 17:40:01 +0200 | |
---|---|---|
committer | 2016-11-16 17:40:01 +0200 | |
commit | 11c216470c30e4c200e46e1b51d721a549f440d6 (patch) | |
tree | 03ad20c67decf7da4d52e5679c2fd22dd8f5eaa6 /scripts/automation/trex_control_plane/stl/trex_stl_lib | |
parent | e46e3f598e52112b9db21d6faabde7a5c87341cb (diff) | |
parent | efba1bd2c32391c443a9dbaf0fffa6468bb681c6 (diff) |
Merge branch 'master' to rx_features phase 0.5
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib')
4 files changed, 100 insertions, 37 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py index 2c95844b..11e87592 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py @@ -137,6 +137,10 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): + THREAD_STATE_ACTIVE = 1 + THREAD_STATE_ZOMBIE = 2 + THREAD_STATE_DEAD = 3 + def __init__ (self, server, port, stateless_client): self.port = port @@ -159,7 +163,10 @@ class CTRexAsyncClient(): self.connected = False self.zipped = ZippedMsg() - + + self.t_state = self.THREAD_STATE_DEAD + + # connects the async channel def connect (self): @@ -173,8 +180,8 @@ class CTRexAsyncClient(): self.socket = self.context.socket(zmq.SUB) - # before running the thread - mark as active - self.active = True + # before running the thread - mark as active + self.t_state = self.THREAD_STATE_ACTIVE self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list @@ -198,26 +205,26 @@ class CTRexAsyncClient(): return RC_OK() - - # disconnect def disconnect (self): if not self.connected: return # mark for join - self.active = False - - # signal that the context was destroyed (exit the thread loop) + self.t_state = self.THREAD_STATE_DEAD self.context.term() - - # join self.t.join() + # done self.connected = False + # set the thread as a zombie (in case of server death) + def set_as_zombie (self): + self.last_data_recv_ts = None + self.t_state = self.THREAD_STATE_ZOMBIE + # thread function def _run (self): @@ -231,12 +238,19 @@ class CTRexAsyncClient(): self.monitor.reset() - while self.active: + while self.t_state != self.THREAD_STATE_DEAD: try: with self.monitor: line = self.socket.recv() + # last data recv. + self.last_data_recv_ts = time.time() + + # if thread was marked as zomibe - it does nothing besides fetching messages + if self.t_state == self.THREAD_STATE_ZOMBIE: + continue + self.monitor.on_recv_msg(line) # try to decomrpess @@ -246,7 +260,6 @@ class CTRexAsyncClient(): line = line.decode() - self.last_data_recv_ts = time.time() # signal once if not got_data: @@ -259,13 +272,14 @@ class CTRexAsyncClient(): # signal once if got_data: self.event_handler.on_async_dead() + self.set_as_zombie() got_data = False continue except zmq.ContextTerminated: # outside thread signaled us to exit - assert(not self.active) + assert(self.t_state != self.THREAD_STATE_ACTIVE) break msg = json.loads(line) @@ -283,16 +297,29 @@ class CTRexAsyncClient(): # closing of socket must be from the same thread self.socket.close(linger = 0) - def is_thread_alive (self): - return self.t.is_alive() - - # did we get info for the last 3 seconds ? + + # return True if the subscriber got data in the last 3 seconds + # even if zombie - will return true if got data def is_alive (self): + + # maybe the thread has exited with exception + if not self.t.is_alive(): + return False + + # simply no data if self.last_data_recv_ts == None: return False + # timeout of data return ( (time.time() - self.last_data_recv_ts) < 3 ) + + # more granular than active - it means that thread state is active we get info + # zomibes will return false + def is_active (self): + return self.is_alive() and self.t_state == self.THREAD_STATE_ACTIVE + + def get_stats (self): return self.stats diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 9290acbf..cf328d2e 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -177,8 +177,8 @@ class EventsHandler(object): def on_async_dead (self): if self.client.connected: msg = 'Lost connection to server' - self.__add_event_log('local', 'info', msg, True) self.client.connected = False + self.__add_event_log('local', 'info', msg, True) def on_async_alive (self): @@ -346,6 +346,8 @@ class EventsHandler(object): # server stopped elif (event_type == 100): ev = "Server has stopped" + # to avoid any new messages on async + self.client.async_client.set_as_zombie() self.__async_event_server_stopped() show_event = True @@ -2518,7 +2520,7 @@ class STLClient(object): slave = port ^ 0x1 if slave in ports: - raise STLError("dual mode: cannot provide adjacent ports ({0}, {1}) in a batch".format(master, slave)) + raise STLError("dual mode: please specify only one of adjacent ports ({0}, {1}) in a batch".format(master, slave)) if not slave in self.get_acquired_ports(): raise STLError("dual mode: adjacent port {0} must be owned during dual mode".format(slave)) @@ -2567,7 +2569,7 @@ class STLClient(object): self.logger.post_cmd(RC_ERR(e)) raise - all_ports = ports + [p ^ 0x1 for p in ports] + all_ports = ports + [p ^ 0x1 for p in ports if profile_b] self.remove_all_streams(ports = all_ports) @@ -2576,7 +2578,8 @@ class STLClient(object): slave = port ^ 0x1 self.add_streams(profile_a.get_streams(), master) - self.add_streams(profile_b.get_streams(), slave) + if profile_b: + self.add_streams(profile_b.get_streams(), slave) return self.start(ports = all_ports, duration = duration) @@ -2738,7 +2741,7 @@ class STLClient(object): while set(self.get_active_ports()).intersection(ports): # make sure ASYNC thread is still alive - otherwise we will be stuck forever - if not self.async_client.is_thread_alive(): + if not self.async_client.is_active(): raise STLError("subscriber thread is dead") time.sleep(0.01) @@ -3521,21 +3524,28 @@ class STLClient(object): @__console def push_line (self, line): '''Push a pcap file ''' + args = [self, + "push", + self.push_line.__doc__, + parsing_opts.REMOTE_FILE, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.COUNT, + parsing_opts.DURATION, + parsing_opts.IPG, + parsing_opts.SPEEDUP, + parsing_opts.FORCE, + parsing_opts.DUAL] + + parser = parsing_opts.gen_parser(*(args + [parsing_opts.FILE_PATH_NO_CHECK])) + opts = parser.parse_args(line.split(), verify_acquired = True) - parser = parsing_opts.gen_parser(self, - "push", - self.push_line.__doc__, - parsing_opts.FILE_PATH, - parsing_opts.REMOTE_FILE, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.COUNT, - parsing_opts.DURATION, - parsing_opts.IPG, - parsing_opts.SPEEDUP, - parsing_opts.FORCE, - parsing_opts.DUAL) + if not opts: + return opts + + if not opts.remote: + parser = parsing_opts.gen_parser(*(args + [parsing_opts.FILE_PATH])) + opts = parser.parse_args(line.split(), verify_acquired = True) - opts = parser.parse_args(line.split(), verify_acquired = True) if not opts: return opts diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py index e63f9125..aa797773 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py @@ -1046,6 +1046,17 @@ class STLProfile(object): else: pkts_a, pkts_b = PCAPReader(pcap_file).read_all(split_mode = split_mode) + # swap the packets if a is empty, or the ts of first packet in b is earlier + if not pkts_a: + pkts_a, pkts_b = pkts_b, pkts_a + elif (ipg_usec is None) and pkts_b: + meta = pkts_a[0][1] + start_time_a = meta[0] * 1e6 + meta[1] + meta = pkts_b[0][1] + start_time_b = meta[0] * 1e6 + meta[1] + if start_time_b < start_time_a: + pkts_a, pkts_b = pkts_b, pkts_a + profile_a = STLProfile.__pkts_to_streams(pkts_a, ipg_usec, speedup, @@ -1073,6 +1084,8 @@ class STLProfile(object): def __pkts_to_streams (pkts, ipg_usec, speedup, loop_count, vm, packet_hook, start_delay_usec = 0): streams = [] + if speedup == 0: + raise STLError('Speedup should not be 0') # 10 ms delay before starting the PCAP last_ts_usec = -(start_delay_usec) @@ -1084,7 +1097,10 @@ class STLProfile(object): for i, (cap, meta) in enumerate(pkts, start = 1): # IPG - if not provided, take from cap if ipg_usec == None: - ts_usec = (meta[0] * 1e6 + meta[1]) / float(speedup) + packet_time = meta[0] * 1e6 + meta[1] + if i == 1: + base_time = packet_time + ts_usec = (packet_time - base_time) / float(speedup) else: ts_usec = (ipg_usec * i) / float(speedup) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index 97c9035a..e7f04546 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -43,7 +43,7 @@ CORE_MASK = 26 DUAL = 27 FLOW_CTRL = 28 SUPPORTED = 29 -RX_FILTER_MODE = 30 +FILE_PATH_NO_CHECK = 30 OUTPUT_FILENAME = 31 ALL_FILES = 32 @@ -54,6 +54,8 @@ IPV4 = 35 DEST = 36 RETRIES = 37 +RX_FILTER_MODE = 38 + GLOBAL_STATS = 50 PORT_STATS = 51 PORT_STATUS = 52 @@ -440,6 +442,14 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'type': is_valid_file, 'help': "File path to load"}), + FILE_PATH_NO_CHECK: ArgumentPack(['-f'], + {'metavar': 'FILE', + 'dest': 'file', + 'nargs': 1, + 'required': True, + 'type': str, + 'help': "File path to load"}), + FILE_FROM_DB: ArgumentPack(['--db'], {'metavar': 'LOADED_STREAM_PACK', 'help': "A stream pack which already loaded into console cache."}), |