diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 82 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 143 |
2 files changed, 167 insertions, 58 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 6793a4ca..e38c6ca7 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,39 +159,94 @@ class CTRexAsyncClient(): self.stats = TrexAsyncStatsManager() + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target = self.run) + self.alive = False + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() + self.connected = True - def run (self): + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.alive: + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return False, "*** [subscriber] - no data flow from server at : " + self.tr - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + return True, "" + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + # no data yet... + self.alive = False + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') - self.socket.setsockopt(zmq.RCVTIMEO, 3000) + self.socket.setsockopt(zmq.RCVTIMEO, 5000) while self.active: try: line = self.socket.recv_string(); - self.stateless_client.on_async_alive() + if not self.alive: + self.stateless_client.on_async_alive() + self.alive = True + + # got a timeout - mark as not alive and retry except zmq.Again: - self.stateless_client.on_async_dead() + + if self.alive: + self.stateless_client.on_async_dead() + self.alive = False + continue + except zmq.ContextTerminated: + # outside thread signaled us to exit + self.alive = False + break + msg = json.loads(line) name = msg['name'] @@ -201,6 +256,10 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + def get_stats (self): return self.stats @@ -220,8 +279,3 @@ class CTRexAsyncClient(): else: pass - - def stop (self): - self.active = False - self.t.join() - diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 4436be75..3a9d74a9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -414,6 +414,17 @@ class Port(object): return self.ok() + + def validate (self): + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("validate", params) + if not rc: + return self.err(data) + + return self.ok() + ################# events handler ###################### def async_event_port_stopped (self): self.state = self.STATE_STREAMS @@ -434,14 +445,13 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self) self.streams_db = CStreamsDB() - self.connected = False - self.events = [] + self.connected = False ################# events handler ###################### def add_event_log (self, msg, ev_type, show = False): @@ -504,7 +514,8 @@ class CTRexStatelessClient(object): self.ports[port_id].async_event_port_stopped() def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -519,6 +530,11 @@ class CTRexStatelessClient(object): def wrap(*args): time1 = time.time() ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + delta = time.time() - time1 for unit in ['sec','ms','usec']: @@ -566,13 +582,19 @@ class CTRexStatelessClient(object): # connection sequence def connect(self): + # clear this flag self.connected = False - # connect + # connect sync channel rc, data = self.comm_link.connect() if not rc: return RC_ERR(data) + # connect async channel + rc, data = self.async_client.connect() + if not rc: + return RC_ERR(data) + # version rc, data = self.transmit("get_version") if not rc: @@ -618,24 +640,19 @@ class CTRexStatelessClient(object): def disconnect(self): - self.connected = False self.comm_link.disconnect() + self.async_client.disconnect() return RC_OK() def on_async_dead (self): - if self.is_connected(): + if self.connected: msg = 'lost connection to server' self.add_event_log(msg, 'local', True) - - self.disconnect() + self.connected = False def on_async_alive (self): - if not self.is_connected(): - msg = 'server connection restored' - self.add_event_log(msg, 'local', True) - - self.cmd_connect() + pass ########### cached queries (no server traffic) ########### @@ -659,7 +676,7 @@ class CTRexStatelessClient(object): return port_ids def get_stats_async (self): - return self._async_client.get_stats() + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port @@ -842,6 +859,17 @@ class CTRexStatelessClient(object): return rc + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + + def get_port_stats(self, port_id=None): pass @@ -962,19 +990,6 @@ class CTRexStatelessClient(object): return RC_OK() - def cmd_pause_line (self, line): - '''Pause active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "pause", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line paramters") - - return self.cmd_pause(opts.ports) - # resume cmd def cmd_resume (self, port_id_list): @@ -995,19 +1010,6 @@ class CTRexStatelessClient(object): return RC_OK() - def cmd_resume_line (self, line): - '''Resume active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self, - "resume", - self.cmd_stop_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return RC_ERR("bad command line paramters") - - return self.cmd_resume(opts.ports) - # start cmd def cmd_start (self, port_id_list, stream_list, mult, force, duration): @@ -1043,6 +1045,15 @@ class CTRexStatelessClient(object): return RC_OK() + + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + return RC_OK() + ############## High Level API With Parser ################ @timing def cmd_start_line (self, line): @@ -1126,7 +1137,53 @@ class CTRexStatelessClient(object): def cmd_reset_line (self, line): return self.cmd_reset() - + + @timing + def cmd_pause_line (self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "pause", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_pause(opts.ports) + + + @timing + def cmd_resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "resume", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_resume(opts.ports) + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_validate(opts.ports) + + def cmd_exit_line (self, line): print format_text("Exiting\n", 'bold') # a way to exit @@ -1262,5 +1319,3 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass - - |