From 2d9d5e147b8f15a8308dad46711390f3b168ec56 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 18 Jan 2016 11:27:10 -0500 Subject: highly draft - just backing up --- .../trex_control_plane/client/trex_async_client.py | 22 +- .../client/trex_stateless_client.py | 1286 ++++++++++++-------- 2 files changed, 780 insertions(+), 528 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 2bb0e9cd..9828c838 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -144,12 +144,15 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client, prn_func = None): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client - self.prn_func = prn_func + + self.event_handler = stateless_client.event_handler + self.logger = self.stateless_client.logger self.raw_snapshot = {} @@ -170,10 +173,7 @@ class CTRexAsyncClient(): msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) - if self.prn_func: - self.prn_func(msg) - else: - print msg + self.logger.log(msg) # Socket to talk to server self.context = zmq.Context() @@ -235,7 +235,7 @@ class CTRexAsyncClient(): # signal once if not got_data: - self.stateless_client.on_async_alive() + self.event_handler.on_async_alive() got_data = True @@ -244,7 +244,7 @@ class CTRexAsyncClient(): # signal once if got_data: - self.stateless_client.on_async_dead() + self.event_handler.on_async_dead() got_data = False continue @@ -284,11 +284,11 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - self.stateless_client.handle_async_stats_update(data) + self.event_handler.handle_async_stats_update(data) # events elif name == "trex-event": - self.stateless_client.handle_async_event(type, data) + self.event_handler.handle_async_event(type, data) # barriers elif name == "trex-barrier": @@ -315,7 +315,7 @@ class CTRexAsyncClient(): # add to the queue self.async_barriers.append(barrier) - rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) if not rc: return rc diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 105c4d01..43912e55 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -25,6 +25,26 @@ from trex_port import Port from common.trex_types import * from trex_async_client import CTRexAsyncClient +############################ logger ############################# +############################ ############################# +############################ ############################# + +class STLFailure(Exception): + def __init__ (self, rc_or_str): + self.msg = str(rc_or_str) + + def __str__ (self): + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + + + s = "\n******\n" + s += "Error reported at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold') + s += "specific error:\n\n'{0}'\n".format(format_text(self.msg, 'bold')) + + return s + + # logger API for the client class LoggerApi(object): # verbose levels @@ -35,9 +55,11 @@ class LoggerApi(object): def __init__(self): self.level = LoggerApi.VERBOSE_REGULAR + # implemented by specific logger def write(self, msg, newline = True): raise Exception("implement this") + # implemented by specific logger def flush(self): raise Exception("implement this") @@ -62,10 +84,15 @@ class LoggerApi(object): self.write(msg, newline) + # logging that comes from async event + def async_log (self, msg, level = VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) + # annotates an action with a RC - writes to log the result def annotate (self, rc, desc = None, show_status = True): rc.annotate(self.log, desc, show_status) + # default logger - to stdout class DefaultLogger(LoggerApi): def write (self, msg, newline = True): @@ -78,90 +105,41 @@ class DefaultLogger(LoggerApi): sys.stdout.flush() -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" - - def __init__(self, - username = general_utils.get_current_user(), - server = "localhost", - sync_port = 4501, - async_port = 4500, - verbose_level = LoggerApi.VERBOSE_REGULAR, - virtual = False, - logger = None): - - super(CTRexStatelessClient, self).__init__() - - self.user = username - - # logger - if not logger: - self.logger = DefaultLogger() - else: - self.logger = logger - - # initial verbose - self.logger.set_verbose(verbose_level) - - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) - - self.ports = {} - self._connection_info = {"server": server, - "sync_port": sync_port, - "async_port": async_port} - self.system_info = {} - self.server_version = {} +############################ async event hander ############################# +############################ ############################# +############################ ############################# - self.async_client = CTRexAsyncClient(server, async_port, self, self.logger.log) +# handles different async events given to the client +class AsyncEventHandler(object): - self.streams_db = CStreamsDB() - self.global_stats = trex_stats.CGlobalStats(self._connection_info, - self.server_version, - self.ports) - self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, - self.ports) + def __init__ (self, client): + self.client = client + self.logger = self.client.logger self.events = [] - self.session_id = random.getrandbits(32) - self.read_only = False - self.connected = False - self.prompt_redraw_cb = None - - - # returns the port object - def get_port (self, port_id): - return self.ports.get(port_id, None) - + # public functions - # connection server ip - def get_server_ip (self): - return self.comm_link.get_server() + def get_events (self): + return self.events - # connection server port - def get_server_port (self): - return self.comm_link.get_port() + def clear_events (self): + self.events = [] - ################# events handler ###################### - def add_event_log (self, msg, ev_type, show = False): - if ev_type == "server": - prefix = "[server]" - elif ev_type == "local": - prefix = "[local]" + def on_async_dead (self): + if self.client.connected: + msg = 'lost connection to server' + self.__add_event_log(msg, 'local', True) + self.client.connected = False - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - if show: - self.logger.log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - if self.prompt_redraw_cb and self.logger.check_verbose(self.logger.VERBOSE_REGULAR): - self.prompt_redraw_cb() + def on_async_alive (self): + pass - + # handles an async stats update from the subscriber def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -173,7 +151,7 @@ class CTRexStatelessClient(object): if m: port_id = int(m.group(2)) field_name = m.group(1) - if self.ports.has_key(port_id): + if self.client.ports.has_key(port_id): if not port_id in port_stats: port_stats[port_id] = {} port_stats[port_id][field_name] = value @@ -184,13 +162,14 @@ class CTRexStatelessClient(object): global_stats[key] = value # update the general object with the snapshot - self.global_stats.update(global_stats) + self.client.global_stats.update(global_stats) + # update all ports for port_id, data in port_stats.iteritems(): - self.ports[port_id].port_stats.update(data) - + self.client.ports[port_id].port_stats.update(data) + # dispatcher for server async events (port started, port stopped and etc.) def handle_async_event (self, type, data): # DP stopped @@ -200,7 +179,7 @@ class CTRexStatelessClient(object): if (type == 0): port_id = int(data['port_id']) ev = "Port {0} has started".format(port_id) - self.async_event_port_started(port_id) + self.__async_event_port_started(port_id) # port stopped elif (type == 1): @@ -208,8 +187,8 @@ class CTRexStatelessClient(object): ev = "Port {0} has stopped".format(port_id) # call the handler - self.async_event_port_stopped(port_id) - + self.__async_event_port_stopped(port_id) + # port paused elif (type == 2): @@ -217,7 +196,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has paused".format(port_id) # call the handler - self.async_event_port_paused(port_id) + self.__async_event_port_paused(port_id) # port resumed elif (type == 3): @@ -225,7 +204,7 @@ class CTRexStatelessClient(object): ev = "Port {0} has resumed".format(port_id) # call the handler - self.async_event_port_resumed(port_id) + self.__async_event_port_resumed(port_id) # port finished traffic elif (type == 4): @@ -233,7 +212,7 @@ class CTRexStatelessClient(object): ev = "Port {0} job done".format(port_id) # call the handler - self.async_event_port_stopped(port_id) + self.__async_event_port_stopped(port_id) show_event = True # port was stolen... @@ -241,7 +220,7 @@ class CTRexStatelessClient(object): session_id = data['session_id'] # false alarm, its us - if session_id == self.session_id: + if session_id == self.client.session_id: return port_id = int(data['port_id']) @@ -250,13 +229,13 @@ class CTRexStatelessClient(object): ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) # call the handler - self.async_event_port_forced_acquired(port_id) + self.__async_event_port_forced_acquired(port_id) show_event = True # server stopped elif (type == 100): ev = "Server has stopped" - self.async_event_server_stopped() + self.__async_event_server_stopped() show_event = True @@ -265,70 +244,186 @@ class CTRexStatelessClient(object): return - self.add_event_log(ev, 'server', show_event) + self.__add_event_log(ev, 'server', show_event) - def async_event_port_stopped (self, port_id): - self.ports[port_id].async_event_port_stopped() + # private functions + def __async_event_port_stopped (self, port_id): + self.client.ports[port_id].async_event_port_stopped() - def async_event_port_started (self, port_id): - self.ports[port_id].async_event_port_started() - - def async_event_port_paused (self, port_id): - self.ports[port_id].async_event_port_paused() + def __async_event_port_started (self, port_id): + self.client.ports[port_id].async_event_port_started() - def async_event_port_resumed (self, port_id): - self.ports[port_id].async_event_port_resumed() + def __async_event_port_paused (self, port_id): + self.client.ports[port_id].async_event_port_paused() - def async_event_port_forced_acquired (self, port_id): - self.ports[port_id].async_event_forced_acquired() - self.read_only = True + def __async_event_port_resumed (self, port_id): + self.client.ports[port_id].async_event_port_resumed() - def async_event_server_stopped (self): - self.connected = False + def __async_event_port_forced_acquired (self, port_id): + self.client.ports[port_id].async_event_forced_acquired() + self.client.read_only = True - def get_events (self): - return self.events + def __async_event_server_stopped (self): + self.client.connected = False - def clear_events (self): - self.events = [] - ############# helper functions section ############## + # add event to log + def __add_event_log (self, msg, ev_type, show = False): - # measure time for functions - def timing(f): - def wrap(*args): - - time1 = time.time() - ret = f(*args) + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" - # don't want to print on error - if ret.bad(): - return ret + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) - delta = time.time() - time1 + if show: + self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) - client = args[0] - client.logger.log(format_time(delta) + "\n") - return ret + - return wrap +############################ RPC layer ############################# +############################ ############################# +############################ ############################# - def validate_port_list(self, port_id_list): - if not isinstance(port_id_list, list): - print type(port_id_list) - return False +class CCommLink(object): + """describes the connectivity of the stateless client method""" + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): + self.virtual = virtual + self.server = server + self.port = port + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) + + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True + + def get_server (self): + return self.server + + def get_port (self): + return self.port + + def connect(self): + if not self.virtual: + return self.rpc_link.connect() + + def disconnect(self): + if not self.virtual: + return self.rpc_link.disconnect() + + def transmit(self, method_name, params={}): + if self.virtual: + self._prompt_virtual_tx_msg() + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + print msg + return + else: + return self.rpc_link.invoke_rpc_method(method_name, params) + + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() + + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) - # check each item of the sequence - return all([ (port_id >= 0) and (port_id < self.get_port_count()) - for port_id in port_id_list ]) + + +############################ client ############################# +############################ ############################# +############################ ############################# + +class CTRexStatelessClient(object): + """docstring for CTRexStatelessClient""" + + def __init__(self, + username = general_utils.get_current_user(), + server = "localhost", + sync_port = 4501, + async_port = 4500, + verbose_level = LoggerApi.VERBOSE_REGULAR, + logger = None, + virtual = False): + + + self.username = username + + # init objects + self.ports = {} + self.server_version = {} + self.system_info = {} + self.session_id = random.getrandbits(32) + self.read_only = False + self.connected = False + + # logger + self.logger = DefaultLogger() if not logger else logger + + # initial verbose + self.logger.set_verbose(verbose_level) + + # low level RPC layer + self.comm_link = CCommLink(server, + sync_port, + virtual, + self.logger) + + # async event handler manager + self.event_handler = AsyncEventHandler(self) + + # async subscriber level + self.async_client = CTRexAsyncClient(server, + async_port, + self) + + + + + # stats + self.connection_info = {"username": username, + "server": server, + "sync_port": sync_port, + "async_port": async_port, + "virtual": virtual} + + + self.global_stats = trex_stats.CGlobalStats(self.connection_info, + self.server_version, + self.ports) + + self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats, + self.ports) + + # stream DB + self.streams_db = CStreamsDB() + + + + ############# private functions - used by the class itself ########### # some preprocessing for port argument def __ports (self, port_id_list): @@ -350,423 +445,456 @@ class CTRexStatelessClient(object): return port_id_list - ############ boot up section ################ + # sync ports + def __sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) - # mode can be RW - read / write, RWF - read write with force , RO - read only - def connect(self, mode = "RW"): - - if self.is_connected(): - self.disconnect() - - # clear this flag - self.connected = False - + rc = RC() - # connect sync channel - rc = self.comm_link.connect() - if rc.bad(): - return rc + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) - # connect async channel - rc = self.async_client.connect() - if rc.bad(): - return rc + return rc - # version - rc = self.transmit("get_version") - if rc.bad(): - return rc + # acquire ports, if port_list is none - get all + def __acquire (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) - self.server_version = rc.data() - self.global_stats.server_version = rc.data() + rc = RC() - # cache system info - rc = self.transmit("get_system_info") - if rc.bad(): - return rc + for port_id in port_id_list: + rc.add(self.ports[port_id].acquire(force)) - self.system_info = rc.data() + return rc - # cache supported commands - rc = self.transmit("get_supported_cmds") - if rc.bad(): - return rc + # release ports + def __release (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) - self.supported_cmds = rc.data() + rc = RC() - # create ports - for port_id in xrange(self.get_port_count()): - speed = self.system_info['ports'][port_id]['speed'] - driver = self.system_info['ports'][port_id]['driver'] + for port_id in port_id_list: + rc.add(self.ports[port_id].release()) - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + return rc - # sync the ports - rc = self.sync_ports() - if rc.bad(): - return rc + def __add_stream(self, stream_id, stream_obj, port_id_list = None): - # acquire all ports - if mode == "RW": - rc = self.acquire(force = False) + port_id_list = self.__ports(port_id_list) - # fallback to read only if failed - if rc.bad(): - self.annotate(rc, show_status = False) - self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + rc = RC() - self.release(self.get_acquired_ports()) - self.read_only = True - else: - self.read_only = False + for port_id in port_id_list: + rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - elif mode == "RWF": - rc = self.acquire(force = True) - if rc.bad(): - return rc - self.read_only = False + return rc - elif mode == "RO": - # no acquire on read only - rc = RC_OK() - self.read_only = True - - self.connected = True - return RC_OK() + def __add_stream_pack(self, stream_pack, port_id_list = None): + port_id_list = self.__ports(port_id_list) - def is_read_only (self): - return self.read_only + rc = RC() - def is_connected (self): - return self.connected and self.comm_link.is_connected + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack)) + return rc - def disconnect(self): - # release any previous acquired ports - if self.is_connected(): - self.release(self.get_acquired_ports()) - self.comm_link.disconnect() - self.async_client.disconnect() - self.connected = False + def __remove_stream(self, stream_id, port_id_list = None): + port_id_list = self.__ports(port_id_list) - return RC_OK() + rc = RC() + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_stream(stream_id)) - def on_async_dead (self): - if self.connected: - msg = 'lost connection to server' - self.add_event_log(msg, 'local', True) - self.connected = False + return rc - def on_async_alive (self): - pass - ########### cached queries (no server traffic) ########### - def get_supported_cmds(self): - return self.supported_cmds + def __remove_all_streams(self, port_id_list = None): + port_id_list = self.__ports(port_id_list) - def get_version(self): - return self.server_version + rc = RC() - def get_system_info(self): - return self.system_info + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_all_streams()) - def get_port_count(self): - return self.system_info.get("port_count") + return rc - def get_port_ids(self, as_str=False): - port_ids = range(self.get_port_count()) - if as_str: - return " ".join(str(p) for p in port_ids) - else: - return port_ids - def get_stats_async (self): - return self.async_client.get_stats() + def __get_stream(self, stream_id, port_id, get_pkt = False): - def get_connection_port (self): - return self.comm_link.port + return self.ports[port_id].get_stream(stream_id) - def get_connection_ip (self): - return self.comm_link.server - def get_all_ports (self): - return [port_id for port_id, port_obj in self.ports.iteritems()] + def __get_all_streams(self, port_id, get_pkt = False): - def get_acquired_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_acquired()] + return self.ports[port_id].get_all_streams() - def get_active_ports(self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_active()] - def get_paused_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_paused()] + def __get_stream_id_list(self, port_id): - def get_transmitting_ports (self): - return [port_id - for port_id, port_obj in self.ports.iteritems() - if port_obj.is_transmitting()] + return self.ports[port_id].get_stream_id_list() - def set_verbose (self, level): - self.logger.set_verbose(level) - def get_verbose (self): - return self.logger.get_verbose() + def __start_traffic (self, multiplier, duration, port_id_list = None): - def set_prompt_redraw_cb(self, cb): - self.prompt_redraw_cb = cb + port_id_list = self.__ports(port_id_list) + rc = RC() - def annotate (self, rc, desc = None, show_status = True): + for port_id in port_id_list: + rc.add(self.ports[port_id].start(multiplier, duration)) - rc.annotate(self.logger.log, desc, show_status) + return rc - ############# server actions ################ - # ping server - def ping(self): - return self.transmit("ping") + def __resume_traffic (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + rc = RC() + for port_id in port_id_list: + rc.add(self.ports[port_id].resume()) - def get_global_stats(self): - return self.transmit("get_global_stats") + return rc + def __pause_traffic (self, port_id_list = None, force = False): - ########## port commands ############## - def sync_ports (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) - rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].sync()) - + rc.add(self.ports[port_id].pause()) + return rc - # acquire ports, if port_list is none - get all - def acquire (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) + def __stop_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].acquire(force)) - + rc.add(self.ports[port_id].stop(force)) + return rc - - # release ports - def release (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) + + def __update_traffic (self, mult, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release()) - + rc.add(self.ports[port_id].update(mult)) + return rc - - def add_stream(self, stream_id, stream_obj, port_id_list = None): + def __validate (self, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - + rc.add(self.ports[port_id].validate()) + return rc - - def add_stream_pack(self, stream_pack, port_id_list = None): - port_id_list = self.__ports(port_id_list) - rc = RC() + # connect to server + # mode can be RW - read / write, RWF - read write with force , RO - read only + def __connect(self, mode = "RW"): - for port_id in port_id_list: - rc.add(self.ports[port_id].add_streams(stream_pack)) + # first disconnect if already connected + if self.is_connected(): + self.__disconnect() - return rc + # clear this flag + self.connected = False + + # connect sync channel + rc = self.comm_link.connect() + if not rc: + return rc + # connect async channel + rc = self.async_client.connect() + if not rc: + return rc + # version + rc = self._transmit("get_version") + if not rc: + return rc - def remove_stream(self, stream_id, port_id_list = None): - port_id_list = self.__ports(port_id_list) + self.server_version = rc.data() + self.global_stats.server_version = rc.data() - rc = RC() + # cache system info + rc = self._transmit("get_system_info") + if not rc: + return rc - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_stream(stream_id)) - - return rc + self.system_info = rc.data() + # cache supported commands + rc = self._transmit("get_supported_cmds") + if not rc: + return rc + self.supported_cmds = rc.data() - def remove_all_streams(self, port_id_list = None): - port_id_list = self.__ports(port_id_list) + # create ports + for port_id in xrange(self.system_info["port_count"]): + speed = self.system_info['ports'][port_id]['speed'] + driver = self.system_info['ports'][port_id]['driver'] + + self.ports[port_id] = Port(port_id, + speed, + driver, + self.username, + self.comm_link, + self.session_id) + + + # sync the ports + rc = self.__sync_ports() + if not rc: + return rc + + # acquire all ports + if mode == "RW": + rc = self.__acquire(force = False) + + # fallback to read only if failed + if not rc: + self.logger.annotate(rc, show_status = False) + self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold')) + + self.__release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.__acquire(force = True) + if not rc: + return rc + self.read_only = False + + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_all_streams()) - return rc + self.connected = True + return RC_OK() - - def get_stream(self, stream_id, port_id, get_pkt = False): - return self.ports[port_id].get_stream(stream_id) + # disconenct from server + def __disconnect(self): + # release any previous acquired ports + if self.is_connected(): + self.__release(self.get_acquired_ports()) + self.comm_link.disconnect() + self.async_client.disconnect() - def get_all_streams(self, port_id, get_pkt = False): + self.connected = False - return self.ports[port_id].get_all_streams() + return RC_OK() - def get_stream_id_list(self, port_id): + # ping server + def __ping (self): + return self._transmit("ping") - return self.ports[port_id].get_stream_id_list() + # start command + def __start (self, port_id_list, stream_list, mult, force, duration, dry): - def start_traffic (self, multiplier, duration, port_id_list = None): + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - port_id_list = self.__ports(port_id_list) + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + self.logger.log(format_text(msg, 'bold')) + return RC_ERR(msg) + else: + rc = self.cmd_stop(active_ports) + if not rc: + return rc - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].start(multiplier, duration)) - - return rc + rc = self.__remove_all_streams(port_id_list) + self.logger.annotate(rc,"Removing all streams from port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + rc = self.__add_stream_pack(stream_list, port_id_list) + self.logger.annotate(rc,"Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + if rc.bad(): + return rc + + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.__start_traffic(mult, duration, port_id_list) + self.logger.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) + + return rc + else: + rc = self.__validate(port_id_list) + self.logger.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) + + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc - def resume_traffic (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].resume()) + def __verify_port_id_list (self, port_id_list): + # check arguments + if not isinstance(port_id_list, list): + return RC_ERR("ports should be an instance of 'list'") - return rc + # all ports are valid ports + if not all([port_id in self.get_all_ports() for port_id in port_id_list]): + return RC_ERR("Port IDs valid values are '{0}' but provided '{1}'".format(self.get_all_ports(), port_id_list)) + + return RC_OK() - def pause_traffic (self, port_id_list = None, force = False): - port_id_list = self.__ports(port_id_list) - rc = RC() + def __verify_mult (self, mult, strict): + if not isinstance(mult, dict): + return RC_ERR("mult should be an instance of dict") - for port_id in port_id_list: - rc.add(self.ports[port_id].pause()) + types = ["raw", "bps", "pps", "percentage"] + if not mult.get('type', None) in types: + return RC_ERR("mult should contain 'type' field of one of '{0}'".format(types)) + + if strict: + ops = ["abs"] + else: + ops = ["abs", "add", "sub"] + if not mult.get('op', None) in ops: + return RC_ERR("mult should contain 'op' field of one of '{0}'".format(ops)) - return rc + return RC_OK() - def stop_traffic (self, port_id_list = None, force = False): + def __process_profiles (self, profiles, out): - port_id_list = self.__ports(port_id_list) - rc = RC() + for profile in (profiles if isinstance(profiles, list) else [profiles]): + # filename + if isinstance(profile, str): - for port_id in port_id_list: - rc.add(self.ports[port_id].stop(force)) - - return rc + if not os.path.isfile(profile): + return RC_ERR("file '{0}' does not exists".format(profile)) + try: + stream_list = self.streams_db.load_yaml_file(profile) + except Exception as e: + rc = RC_ERR(str(e)) + self.logger.annotate(rc) + return rc - def update_traffic (self, mult, port_id_list = None, force = False): + out += stream_list + else: + return RC_ERR("unknown profile '{0}'".format(profile)) - port_id_list = self.__ports(port_id_list) - rc = RC() - for port_id in port_id_list: - rc.add(self.ports[port_id].update(mult)) - - return rc + return RC_OK() - def validate (self, port_id_list = None): - port_id_list = self.__ports(port_id_list) - rc = RC() + # stream list + if opts.db: + stream_list = self.streams_db.get_stream_pack(opts.db) + rc = RC(stream_list != None) + self.logger.annotate(rc,"Load stream pack (from DB):") + if rc.bad(): + return RC_ERR("Failed to load stream pack") - for port_id in port_id_list: - rc.add(self.ports[port_id].validate()) - - return rc + else: + # load streams from file + stream_list = None + try: + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + except Exception as e: + s = str(e) + rc=RC_ERR(s) + self.logger.annotate(rc) + return rc + + rc = RC(stream_list != None) + self.logger.annotate(rc,"Load stream pack (from file):") + if stream_list == None: + return RC_ERR("Failed to load stream pack") + ############ functions used by other classes but not users ############## - def get_port_stats(self, port_id=None): - pass + def _validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + print type(port_id_list) + return False - def get_stream_stats(self, port_id=None): - pass + # check each item of the sequence + return all([ (port_id >= 0) and (port_id < self.get_port_count()) + for port_id in port_id_list ]) - def transmit(self, method_name, params={}): + # transmit request on the RPC link + def _transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) - - def transmit_batch(self, batch_list): + # transmit batch request on the RPC link + def _transmit_batch(self, batch_list): return self.comm_link.transmit_batch(batch_list) - ######################### Console (high level) API ######################### - - @timing - def cmd_ping(self): - rc = self.ping() - self.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) - return rc + ############# helper functions section ############## - def cmd_connect(self, mode = "RW"): - rc = self.connect(mode) - self.annotate(rc) - return rc + # measure time for functions + def timing(f): + def wrap(*args): + + time1 = time.time() + ret = f(*args) - def cmd_disconnect(self): - rc = self.disconnect() - self.annotate(rc) - return rc + # don't want to print on error + if ret.bad(): + return ret - # reset - def cmd_reset(self): + delta = time.time() - time1 - rc = self.acquire(force = True) - self.annotate(rc, "Force acquiring all ports:") - if rc.bad(): - return rc + client = args[0] + client.logger.log(format_time(delta) + "\n") + return ret - # force stop all ports - rc = self.stop_traffic(self.get_port_ids(), True) - self.annotate(rc,"Stop traffic on all ports:") - if rc.bad(): - return rc + return wrap - # remove all streams - rc = self.remove_all_streams(self.get_port_ids()) - self.annotate(rc,"Removing all streams from all ports:") - if rc.bad(): - return rc - # TODO: clear stats - return RC_OK() - + ########## port commands ############## + + ######################### Console (high level) API ######################### # stop cmd def cmd_stop (self, port_id_list): @@ -779,8 +907,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.stop_traffic(active_ports) - self.annotate(rc,"Stopping traffic on port(s) {0}:".format(port_id_list)) + rc = self.__stop_traffic(active_ports) + self.logger.annotate(rc,"Stopping traffic on port(s) {0}:".format(port_id_list)) if rc.bad(): return rc @@ -797,8 +925,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.update_traffic(mult, active_ports) - self.annotate(rc,"Updating traffic on port(s) {0}:".format(port_id_list)) + rc = self.__update_traffic(mult, active_ports) + self.logger.annotate(rc,"Updating traffic on port(s) {0}:".format(port_id_list)) return rc @@ -832,8 +960,8 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.pause_traffic(active_ports) - self.annotate(rc,"Pausing traffic on port(s) {0}:".format(port_id_list)) + rc = self.__pause_traffic(active_ports) + self.logger.annotate(rc,"Pausing traffic on port(s) {0}:".format(port_id_list)) return rc @@ -849,61 +977,18 @@ class CTRexStatelessClient(object): self.logger.log(format_text(msg, 'bold')) return RC_ERR(msg) - rc = self.resume_traffic(active_ports) - self.annotate(rc,"Resume traffic on port(s) {0}:".format(port_id_list)) + rc = self.__resume_traffic(active_ports) + self.logger.annotate(rc,"Resume traffic on port(s) {0}:".format(port_id_list)) return rc - # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): - - active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - - if active_ports: - if not force: - msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) - self.logger.log(format_text(msg, 'bold')) - return RC_ERR(msg) - else: - rc = self.cmd_stop(active_ports) - if not rc: - return rc - - - rc = self.remove_all_streams(port_id_list) - self.annotate(rc,"Removing all streams from port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - - rc = self.add_stream_pack(stream_list, port_id_list) - self.annotate(rc,"Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) - if rc.bad(): - return rc - - # when not on dry - start the traffic , otherwise validate only - if not dry: - rc = self.start_traffic(mult, duration, port_id_list) - self.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list)) - - return rc - else: - rc = self.validate(port_id_list) - self.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list)) - - if rc.bad(): - return rc - - # show a profile on one port for illustration - self.ports[port_id_list[0]].print_profile(mult, duration) - - return rc + # validate port(s) profile def cmd_validate (self, port_id_list): - rc = self.validate(port_id_list) - self.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) + rc = self.__validate(port_id_list) + self.logger.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list)) return rc @@ -925,23 +1010,6 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ - def cmd_connect_line (self, line): - '''Connects to the TRex server''' - # define a parser - parser = parsing_opts.gen_parser(self, - "connect", - self.cmd_connect_line.__doc__, - parsing_opts.FORCE) - - opts = parser.parse_args(line.split()) - - if opts is None: - return RC_ERR("bad command line parameters") - - if opts.force: - rc = self.cmd_connect(mode = "RWF") - else: - rc = self.cmd_connect(mode = "RW") @timing def cmd_start_line (self, line): @@ -971,7 +1039,7 @@ class CTRexStatelessClient(object): if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) - self.annotate(rc,"Load stream pack (from DB):") + self.logger.annotate(rc,"Load stream pack (from DB):") if rc.bad(): return RC_ERR("Failed to load stream pack") @@ -983,11 +1051,11 @@ class CTRexStatelessClient(object): except Exception as e: s = str(e) rc=RC_ERR(s) - self.annotate(rc) + self.logger.annotate(rc) return rc rc = RC(stream_list != None) - self.annotate(rc,"Load stream pack (from file):") + self.logger.annotate(rc,"Load stream pack (from file):") if stream_list == None: return RC_ERR("Failed to load stream pack") @@ -1253,65 +1321,249 @@ class CTRexStatelessClient(object): def _filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + def __verify_connected(f): + #@wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ - ################################# - # ------ private classes ------ # - class CCommLink(object): - """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): - super(CTRexStatelessClient.CCommLink, self).__init__() - self.virtual = virtual - self.server = server - self.port = port - self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) - - @property - def is_connected(self): - if not self.virtual: - return self.rpc_link.connected - else: - return True + if not inst.stateless_client.is_connected(): + return RC_ERR("cannot execute '{0}' while client is disconnected".format(func_name)) - def get_server (self): - return self.server + ret = f(*args) + return ret - def get_port (self): - return self.port + return wrap - def connect(self): - if not self.virtual: - return self.rpc_link.connect() - def disconnect(self): - if not self.virtual: - return self.rpc_link.disconnect() - def transmit(self, method_name, params={}): - if self.virtual: - self._prompt_virtual_tx_msg() - _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) - print msg - return - else: - return self.rpc_link.invoke_rpc_method(method_name, params) - - def transmit_batch(self, batch_list): - if self.virtual: - self._prompt_virtual_tx_msg() - print [msg - for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] - else: - batch = self.rpc_link.create_batch() - for command in batch_list: - batch.add(command.method, command.params) - # invoke the batch - return batch.invoke() + ############################ API ############################# + ############################ ############################# + ############################ ############################# + + + ############################ Getters ############################# + ############################ ############################# + ############################ ############################# + + + # return verbose level of the logger + def get_verbose (self): + return self.logger.get_verbose() + + # is the client on read only mode ? + def is_read_only (self): + return self.read_only + + # is the client connected ? + def is_connected (self): + return self.connected and self.comm_link.is_connected + + + # get connection info + def get_connection_info (self): + return self.connection_info + + + # get supported commands by the server + def get_server_supported_cmds(self): + return self.supported_cmds + + # get server version + def get_server_version(self): + return self.server_version + + # get server system info + def get_server_system_info(self): + return self.system_info + + # get port count + def get_port_count(self): + return len(self.ports) + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, RC_ERR("invalid port id")) + + # get all ports as IDs + def get_all_ports (self): + return self.ports.keys() + + # get all acquired ports + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + # get all active ports (TX or pause) + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + # get paused ports + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + # get all TX ports + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + + ############################ Commands ############################# + ############################ ############################# + ############################ ############################# + + + # set the log on verbose level + def set_verbose (self, level): + self.logger.set_verbose(level) + + + # connects to the server + # mode can be: + # 'RO' - read only + # 'RW' - read/write + # 'RWF' - read write forced (take ownership) + def connect (self, mode = "RW"): + modes = ['RO', 'RW', 'RWF'] + if not mode in modes: + return RC_ERR("invalid mode '{0}'".format(mode)) + + rc = self.__connect(mode) + self.logger.annotate(rc) + + if not rc: + raise STLFailure(rc) + + return rc + + + # disconnects from the server + def disconnect (self, annotate = True): + rc = self.__disconnect() + if annotate: + self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'], + self.connection_info['sync_port'])) + if not rc: + raise STLFailure(rc) + + return rc + + + # teardown - call after test is done + def teardown (self): + # for now, its only disconnect + rc = self.__disconnect() + if not rc: + raise STLFailure(rc) + + return rc + + + # pings the server on the RPC channel + def ping(self): + rc = self.__ping() + self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], + self.connection_info['sync_port'])) + + if not rc: + raise STLFailure(rc) + + return rc + + + # reset the server by performing + # force acquire, stop, and remove all streams + def reset(self): + + rc = self.__acquire(force = True) + self.logger.annotate(rc, "Force acquiring all ports:") + if not rc: + raise STLFailure(rc) + + + # force stop all ports + rc = self.__stop_traffic(self.get_all_ports(), True) + self.logger.annotate(rc,"Stop traffic on all ports:") + if not rc: + raise STLFailure(rc) + + + # remove all streams + rc = self.__remove_all_streams(self.get_all_ports()) + self.logger.annotate(rc,"Removing all streams from all ports:") + if not rc: + raise STLFailure(rc) + + # TODO: clear stats + return RC_OK() + + # start cmd + def start (self, + profiles, + ports = None, + mult = "1", + force = False, + duration = -1, + dry = False): + + + # by default use all ports + if ports == None: + ports = self.get_all_ports() + + # verify valid port id list + rc = self.__verify_port_id_list(ports) + if not rc: + raise STLFailure(rc) + + + # verify multiplier + try: + result = parsing_opts.match_multiplier_common(mult) + except argparse.ArgumentTypeError: + raise STLFailure("bad format for multiplier: {0}".format(mult)) + + # process profiles + stream_list = [] + rc = self.__process_profiles(profiles, stream_list) + if not rc: + raise STLFailure(rc) + + + + + ############################ Line ############################# + ############################ Commands ############################# + ############################ ############################# + + def connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + # call the API + if opts.force: + rc = self.connect(mode = "RWF") + else: + rc = self.connect(mode = "RW") - def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, - port=self.port) + def disconnect_line (self, line): + return self.disconnect() -if __name__ == "__main__": - pass + def reset_line (self, line): + return self.reset() -- cgit 1.2.3-korg