diff options
Diffstat (limited to 'scripts')
34 files changed, 1881 insertions, 650 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py new file mode 100644 index 00000000..72cce5aa --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -0,0 +1,203 @@ +#!/router/bin/python + +try: + # support import for Python 2 + import outer_packages +except ImportError: + # support import for Python 3 + import client.outer_packages +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage + +import json +import threading +import time +import datetime +import zmq +import re + +from common.trex_stats import * +from common.trex_streams import * + +# basic async stats class +class TrexAsyncStats(object): + def __init__ (self): + self.ref_point = None + self.current = {} + self.last_update_ts = datetime.datetime.now() + + def __format_num (self, size, suffix = ""): + + for unit in ['','K','M','G','T','P']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 + + return "NaN" + + def update (self, snapshot): + + #update + self.last_update_ts = datetime.datetime.now() + + self.current = snapshot + + if self.ref_point == None: + self.ref_point = self.current + + + def get (self, field, format = False, suffix = ""): + + if not field in self.current: + return "N/A" + + if not format: + return self.current[field] + else: + return self.__format_num(self.current[field], suffix) + + + def get_rel (self, field, format = False, suffix = ""): + if not field in self.current: + return "N/A" + + if not format: + return (self.current[field] - self.ref_point[field]) + else: + return self.__format_num(self.current[field] - self.ref_point[field], suffix) + + + # return true if new data has arrived in the past 2 seconds + def is_online (self): + delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 + return (delta_ms < 2000) + +# describes the general stats provided by TRex +class TrexAsyncStatsGeneral(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsGeneral, self).__init__() + + +# per port stats +class TrexAsyncStatsPort(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsPort, self).__init__() + + def get_stream_stats (self, stream_id): + return None + +# stats manager +class TrexAsyncStatsManager(): + def __init__ (self): + + self.general_stats = TrexAsyncStatsGeneral() + self.port_stats = {} + + + def get_general_stats (self): + return self.general_stats + + def get_port_stats (self, port_id): + + if not str(port_id) in self.port_stats: + return None + + return self.port_stats[str(port_id)] + + def update (self, snapshot): + + if snapshot['name'] == 'trex-global': + self.__handle_snapshot(snapshot['data']) + else: + # for now ignore the rest + return + + def __handle_snapshot (self, snapshot): + + general_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in snapshot.iteritems(): + + # match a pattern of ports + m = re.search('(.*)\-([0-8])', key) + if m: + + port_id = m.group(2) + field_name = m.group(1) + + if not port_id in port_stats: + port_stats[port_id] = {} + + port_stats[port_id][field_name] = value + + else: + # no port match - general stats + general_stats[key] = value + + # update the general object with the snapshot + self.general_stats.update(general_stats) + + # update all ports + for port_id, data in port_stats.iteritems(): + + if not port_id in self.port_stats: + self.port_stats[port_id] = TrexAsyncStatsPort() + + self.port_stats[port_id].update(data) + + + + + +class CTRexAsyncClient(): + def __init__ (self, port): + + self.port = port + + self.raw_snapshot = {} + + self.stats = TrexAsyncStatsManager() + + + self.tr = "tcp://localhost:{0}".format(self.port) + print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + + self.active = True + self.t = threading.Thread(target = self._run) + + # kill this thread on exit and don't add it to the join list + self.t.setDaemon(True) + self.t.start() + + + def _run (self): + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + self.socket.connect(self.tr) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + + while self.active: + msg = json.loads(self.socket.recv_string()) + + key = msg['name'] + self.raw_snapshot[key] = msg['data'] + + self.stats.update(msg) + + + def get_stats (self): + return self.stats + + def get_raw_snapshot (self): + #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes')) + return self.raw_snapshot + + + def stop (self): + self.active = False + self.t.join() + diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index 46c283f8..92768ca4 100644..100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -2,16 +2,305 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder +from trex_stateless_client import CTRexStatelessClient +from common.trex_streams import * +from client_utils.general_utils import id_count_gen +import dpkt -print "done!" class CTRexHltApi(object): def __init__(self): - pass + self.trex_client = None + self.connected = False + # self._stream_db = CStreamList() + self._port_data = {} + + # ----- session functions ----- # + + def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False): + ret_dict = {"status": 0} + self.trex_client = CTRexStatelessClient(username, device, port) + res_ok, msg = self.trex_client.connect() + if not res_ok: + self.trex_client = None + ret_dict.update({"log": msg}) + return ret_dict + # arrived here, connection successfully created with server + # next, try acquiring ports of TRex + port_list = self.parse_port_list(port_list) + response = self.trex_client.acquire(port_list, force=break_locks) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if not res_ok: + self.trex_client.disconnect() + self.trex_client = None + ret_dict.update({"log": log}) + # TODO: should revert taken ports?! + return ret_dict + # arrived here, all desired ports were successfully acquired + print log + if reset: + # remove all port traffic configuration from TRex + response = self.trex_client.remove_all_streams(port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if not res_ok: + self.trex_client.disconnect() + self.trex_client = None + ret_dict.update({"log": log}) + return ret_dict + print log + port_handle = {device: {port: port # since only supporting single TRex at the moment, 1:1 map + for port in port_list} + } + ret_dict.update({"status": 1, + "log": None, + "port_handle": port_handle, + "offline": 0}) # ports are online + self.connected = True + self._port_data_init(port_list) + return ret_dict + + def cleanup_session(self, port_list, maintain_lock=False): + ret_dict = {"status": 0} + if not maintain_lock: + # release taken ports + if port_list == "all": + port_list = self.trex_client.get_acquired_ports() + else: + port_list = self.parse_port_list(port_list) + response = self.trex_client.release(port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + print log + if not res_ok: + ret_dict.update({"log": log}) + return ret_dict + ret_dict.update({"status": 1, + "log": None}) + self.trex_client.disconnect() + self.trex_client = None + self.connected = False + return ret_dict + + def interface_config(self, port_handle, mode="config"): + ALLOWED_MODES = ["config", "modify", "destroy"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + # pass this function for now... + return {"status": 1, "log": None} + + # ----- traffic functions ----- # + def traffic_config(self, mode, port_handle, + l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00", + l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110, + transmit_mode="continuous", rate_pps=100, + **kwargs): + ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + if mode == "create": + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} + else: + raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) + + def traffic_control(self, action, port_handle): + ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"] + if action not in ALLOWED_ACTIONS: + raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS)) + # ret_dict = {"status": 0, "stopped": 1} + port_list = self.parse_port_list(port_handle) + if action == "run": + response = self.trex_client.start_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 0, + "log": None} + elif action == "stop": + response = self.trex_client.stop_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 1, + "log": None} + else: + raise NotImplementedError("action '{0}' is not supported yet on TRex".format(action)) + + # if we arrived here, this means that operation FAILED! + return {"status": 0, + "log": log} + + + def traffic_stats(self, port_handle, mode): + ALLOWED_MODES = ["aggregate", "streams", "all"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + # pass this function for now... + if mode == "aggregate": + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} + else: + raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) + + def get_aggregate_port_stats(self, port_handle): + return self.traffic_stats(port_handle, mode="aggregate") + + def get_stream_stats(self, port_handle): + return self.traffic_stats(port_handle, mode="streams") + + # ----- internal functions ----- # + def _port_data_init(self, port_list): + for port in port_list: + self._port_data[port] = {"stream_id_gen": id_count_gen(), + "streams": {}} + + @staticmethod + def process_response(port_list, response): + if isinstance(port_list, list): + res_ok, response = response + log = CTRexHltApi.join_batch_response(response) + else: + res_ok = response.success + log = str(response) + return res_ok, log + + @staticmethod + def parse_port_list(port_list): + if isinstance(port_list, str): + return [int(port) + for port in port_list.split()] + elif isinstance(port_list, list): + return [int(port) + for port in port_list] + else: + return port_list + + @staticmethod + def join_batch_response(responses): + return "\n".join([str(response) + for response in responses]) + + @staticmethod + def generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length): + ALLOWED_L3_PROTOCOL = {"ipv4": dpkt.ethernet.ETH_TYPE_IP, + "ipv6": dpkt.ethernet.ETH_TYPE_IP6, + "arp": dpkt.ethernet.ETH_TYPE_ARP} + ALLOWED_L4_PROTOCOL = {"tcp": dpkt.ip.IP_PROTO_TCP, + "udp": dpkt.ip.IP_PROTO_UDP, + "icmp": dpkt.ip.IP_PROTO_ICMP, + "icmpv6": dpkt.ip.IP_PROTO_ICMP6, + "igmp": dpkt.ip.IP_PROTO_IGMP, + "rtp": dpkt.ip.IP_PROTO_IRTP, + "isis": dpkt.ip.IP_PROTO_ISIS, + "ospf": dpkt.ip.IP_PROTO_OSPF} + + pkt_bld = CTRexPktBuilder() + if l2_encap == "ethernet_ii": + pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet()) + # set Ethernet layer attributes + pkt_bld.set_eth_layer_addr("l2", "src", mac_src) + pkt_bld.set_eth_layer_addr("l2", "dst", mac_dst) + else: + raise NotImplementedError("l2_encap does not support the desired encapsulation '{0}'".format(l2_encap)) + # set l3 on l2 + if l3_protocol not in ALLOWED_L3_PROTOCOL: + raise ValueError("l3_protocol must be one of the following: {0}".format(ALLOWED_L3_PROTOCOL)) + pkt_bld.set_layer_attr("l2", "type", ALLOWED_L3_PROTOCOL[l3_protocol]) + + # set l3 attributes + if l3_protocol == "ipv4": + pkt_bld.add_pkt_layer("l3", dpkt.ip.IP()) + pkt_bld.set_ip_layer_addr("l3", "src", ip_src_addr) + pkt_bld.set_ip_layer_addr("l3", "dst", ip_dst_addr) + pkt_bld.set_layer_attr("l3", "len", l3_length) + else: + raise NotImplementedError("l3_protocol '{0}' is not supported by TRex yet.".format(l3_protocol)) + + pkt_bld.dump_pkt_to_pcap("stream_test.pcap") + return pkt_bld + - def config_traffic(self): - pass if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 334496d1..627c3365 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -10,58 +10,151 @@ from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage from client_utils.packet_builder import CTRexPktBuilder import json from common.trex_stats import * +from common.trex_streams import * from collections import namedtuple +from trex_async_client import CTRexAsyncClient + +RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + +class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): + __slots__ = () + def __str__(self): + return "{id:^3} - {msg} ({stat})".format(id=self.id, + msg=self.msg, + stat="success" if self.success else "fail") + +# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg']) class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) - def __init__(self, username, server="localhost", port=5050, virtual=False): + def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username - self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) + self.verbose = False self._conn_handler = {} self._active_ports = set() self._stats = CTRexStatsManager("port", "stream") self._system_info = None + self._server_version = None + self.__err_log = None + + self._async_client = CTRexAsyncClient(async_port) + # ----- decorator methods ----- # + def acquired(func): + def wrapper_f(self, *args, **kwargs): + # print func.__name__ + # print args + # print kwargs + port_ids = kwargs.get("port_id") + if not port_ids: + # print "FROM ARGS!" + # print args + port_ids = args[0] + if isinstance(port_ids, int): + # make sure port_ids is a list + port_ids = [port_ids] + bad_ids = set() + # print "=============" + # print port_ids + for port_id in port_ids: + port_owned = self._conn_handler.get(port_id) + if not port_owned: + bad_ids.add(port_id) + # elif active_and_owned: # stronger condition than just owned, hence gets precedence + # if port_owned and port_id in self._active_ports: + # continue + # else: + # bad_ids.add(port_id) + else: + continue + if bad_ids: + # Some port IDs are not according to desires status + raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not " + "at allowed states".format(func.__name__, list(bad_ids))) + else: + return func(self, *args, **kwargs) + return wrapper_f + def force_status(owned=True, active_and_owned=False): def wrapper(func): def wrapper_f(self, *args, **kwargs): + # print args + # print kwargs port_ids = kwargs.get("port_id") + if not port_ids: + #print "FROM ARGS!" + #print args + port_ids = args[0] if isinstance(port_ids, int): # make sure port_ids is a list port_ids = [port_ids] bad_ids = set() + # print "=============" + # print port_ids for port_id in port_ids: - port_owned = self._conn_handler.get(kwargs.get(port_id)) + port_owned = self._conn_handler.get(port_id) if owned and not port_owned: - bad_ids.add(port_ids) + bad_ids.add(port_id) elif active_and_owned: # stronger condition than just owned, hence gets precedence if port_owned and port_id in self._active_ports: continue else: - bad_ids.add(port_ids) + bad_ids.add(port_id) else: continue if bad_ids: # Some port IDs are not according to desires status - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" - "at allowed stated".format(func.__name__)) + raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not " + "at allowed states".format(func.__name__, list(bad_ids))) else: - func(self, *args, **kwargs) + return func(self, *args, **kwargs) return wrapper_f return wrapper @property def system_info(self): if not self._system_info: - self._system_info = self.get_system_info() - return self._system_info + rc, info = self.get_system_info() + if rc: + self._system_info = info + else: + self.__err_log = info + return self._system_info if self._system_info else "Unknown" + + @property + def server_version(self): + if not self._server_version: + rc, ver_info = self.get_version() + if rc: + self._server_version = ver_info + else: + self.__err_log = ver_info + return self._server_version if self._server_version else "Unknown" + + def is_connected(self): + return self.comm_link.is_connected # ----- user-access methods ----- # + def connect(self): + rc, err = self.comm_link.connect() + if not rc: + return rc, err + return self._init_sync() + + def get_stats_async (self): + return self._async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port + + def disconnect(self): + return self.comm_link.disconnect() + def ping(self): return self.transmit("ping") @@ -77,24 +170,45 @@ class CTRexStatelessClient(object): def get_port_count(self): return self.system_info.get("port_count") + def get_port_ids(self, as_str=False): + port_ids = range(self.get_port_count()) + if as_str: + return " ".join(str(p) for p in port_ids) + else: + return port_ids + + def sync_user(self, sync_streams=False): + return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) + + def get_acquired_ports(self): + return self._conn_handler.keys() + + def get_active_ports(self): + return list(self._active_ports) + + def set_verbose(self, mode): + self.comm_link.set_verbose(mode) + self.verbose = mode + def acquire(self, port_id, force=False): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) + commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_acquire_response) + return self._process_batch_result(commands, resp_list, self._handle_acquire_response) else: params = {"port_id": port_id, "user": self.user, "force": force} - command = self.RpcCmdData("acquire", params) - self._handle_acquire_response(command, self.transmit(command.method, command.params)) - return self._conn_handler.get(port_id) + command = RpcCmdData("acquire", params) + return self._handle_acquire_response(command, + self.transmit(command.method, command.params), + self.default_success_test) @force_status(owned=True) def release(self, port_id=None): @@ -103,18 +217,20 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_release_response) + return self._process_batch_result(commands, resp_list, self._handle_release_response, + success_test=self.ack_success_test) else: self._conn_handler.pop(port_id) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("release", params) - self._handle_release_response(command, self.transmit(command.method, command.params)) - return + command = RpcCmdData("release", params) + return self._handle_release_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): @@ -128,6 +244,27 @@ class CTRexStatelessClient(object): return self.transmit("add_stream", params) @force_status(owned=True) + def add_stream_pack(self, port_id=None, *stream_packs): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + + # since almost every run contains more than one transaction with server, handle all as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [] + for stream_pack in stream_packs: + commands.extend([RpcCmdData("add_stream", {"port_id": p_id, + "handler": self._conn_handler.get(p_id), + "stream_id": stream_pack.stream_id, + "stream": stream_pack.stream} + ) + for p_id in port_ids] + ) + res_ok, resp_list = self.transmit_batch(commands) + if res_ok: + return self._process_batch_result(commands, resp_list, self._handle_add_stream_response, + success_test=self.ack_success_test) + + @force_status(owned=True) def remove_stream(self, stream_id, port_id=None): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") @@ -136,7 +273,39 @@ class CTRexStatelessClient(object): "stream_id": stream_id} return self.transmit("remove_stream", params) - @force_status(owned=True, active_and_owned=True) + @force_status(owned=True) + def remove_all_streams(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response, + success_test=self.ack_success_test) + else: + params = {"port_id": port_id, + "handler": self._conn_handler.get(port_id)} + command = RpcCmdData("remove_all_streams", params) + return self._handle_remove_streams_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) + pass + + + @force_status(owned=True)#, active_and_owned=True) + def get_all_streams(self, port_id, get_pkt = False): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id, + "get_pkt": get_pkt} + return self.transmit("get_all_streams", params) + + @force_status(owned=True)#, active_and_owned=True) def get_stream_id_list(self, port_id=None): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") @@ -145,32 +314,38 @@ class CTRexStatelessClient(object): return self.transmit("get_stream_list", params) @force_status(owned=True, active_and_owned=True) - def get_stream(self, stream_id, port_id=None): + def get_stream(self, stream_id, port_id, get_pkt = False): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, - "stream_id": stream_id} + "stream_id": stream_id, + "get_pkt": get_pkt} return self.transmit("get_stream_list", params) - @force_status(owned=True) - def start_traffic(self, port_id=None): + @acquired + def start_traffic(self, multiplier, port_id=None): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), + "port_id": p_id, + "mul": multiplier}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_start_traffic_response) + return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response, + success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("start_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + "port_id": port_id, + "mul": multiplier} + command = RpcCmdData("start_traffic", params) + return self._handle_start_traffic_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): @@ -179,22 +354,24 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response) + return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response, + success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("stop_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + command = RpcCmdData("stop_traffic", params) + return self._handle_start_traffic_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) - def get_global_stats(self): - command = self.RpcCmdData("get_global_stats", {}) - return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) - # return self.transmit("get_global_stats") +# def get_global_stats(self): +# command = RpcCmdData("get_global_stats", {}) +# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) +# # return self.transmit("get_global_stats") @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): @@ -203,7 +380,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -211,7 +388,7 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("get_port_stats", params) + command = RpcCmdData("get_port_stats", params) return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) @force_status(owned=True, active_and_owned=True) @@ -221,7 +398,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -229,15 +406,36 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("get_stream_stats", params) + command = RpcCmdData("get_stream_stats", params) return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) # ----- internal methods ----- # + def _init_sync(self): + # get server version and system info + err = False + if self.server_version == "Unknown" or self.system_info == "Unknown": + self.disconnect() + return False, self.__err_log + # sync with previous session + res_ok, port_info = self.sync_user() + if not res_ok: + self.disconnect() + return False, port_info + else: + # handle sync data + for port in port_info: + self._conn_handler[port.get("port_id")] = port.get("handler") + if port.get("state") == "transmitting": + # port is active + self._active_ports.add(port.get("port_id")) + return True, "" + + def transmit(self, method_name, params={}): - return self.tx_link.transmit(method_name, params) + return self.comm_link.transmit(method_name, params) def transmit_batch(self, batch_list): - return self.tx_link.transmit_batch(batch_list) + return self.comm_link.transmit_batch(batch_list) @staticmethod def _object_decoder(obj_type, obj_data): @@ -258,36 +456,86 @@ class CTRexStatelessClient(object): else: return False + @staticmethod + def ack_success_test(result_obj): + if result_obj.success and result_obj.data == "ACK": + return True + else: + return False + + # ----- handler internal methods ----- # - def _handle_acquire_response(self, request, response): - if response.success: - self._conn_handler[request.get("port_id")] = response.data + def _handle_general_response(self, request, response, msg, success_test=None): + port_id = request.params.get("port_id") + if not success_test: + success_test = self.default_success_test + if success_test(response): + self._conn_handler[port_id] = response.data + return RpcResponseStatus(True, port_id, msg) + else: + return RpcResponseStatus(False, port_id, response.data) - def _handle_release_response(self, request, response): - if response.success: - del self._conn_handler[request.get("port_id")] - def _handle_start_traffic_response(self, request, response): - if response.success: - self._active_ports.add(request.get("port_id")) + def _handle_acquire_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + self._conn_handler[port_id] = response.data + return RpcResponseStatus(True, port_id, "Acquired") + else: + return RpcResponseStatus(False, port_id, response.data) - def _handle_stop_traffic_response(self, request, response): - if response.success: - self._active_ports.remove(request.get("port_id")) + def _handle_add_stream_response(self, request, response, success_test): + port_id = request.params.get("port_id") + stream_id = request.params.get("stream_id") + if success_test(response): + return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id)) + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_remove_streams_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + return RpcResponseStatus(True, port_id, "Removed") + else: + return RpcResponseStatus(False, port_id, response.data) - def _handle_get_global_stats_response(self, request, response): + def _handle_release_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + del self._conn_handler[port_id] + return RpcResponseStatus(True, port_id, "Released") + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_start_traffic_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + self._active_ports.add(port_id) + return RpcResponseStatus(True, port_id, "Traffic started") + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_stop_traffic_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + self._active_ports.remove(port_id) + return RpcResponseStatus(True, port_id, "Traffic stopped") + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_get_global_stats_response(self, request, response, success_test): if response.success: return CGlobalStats(**response.success) else: return False - def _handle_get_port_stats_response(self, request, response): + def _handle_get_port_stats_response(self, request, response, success_test): if response.success: return CPortStats(**response.success) else: return False - def _handle_get_stream_stats_response(self, request, response): + def _handle_get_stream_stats_response(self, request, response, success_test): if response.success: return CStreamStats(**response.success) else: @@ -298,32 +546,58 @@ class CTRexStatelessClient(object): # check each item of the sequence return all([self._is_ports_valid(port) for port in port_id]) - elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): + elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()): return True else: return False def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): + res_ok = True + responses = [] + if isinstance(success_test, staticmethod): + success_test = success_test.__func__ for i, response in enumerate(resp_list): - # testing each result with success test so that a conclusion report could be deployed in future. - if success_test(response): - # run handler method with its params - handler_func(req_list[i], response) - else: - continue # TODO: mark in this case somehow the bad result + # run handler method with its params + processed_response = handler_func(req_list[i], response, success_test) + responses.append(processed_response) + if not processed_response.success: + res_ok = False + # else: + # res_ok = False # TODO: mark in this case somehow the bad result + # print res_ok + # print responses + return res_ok, responses # ------ private classes ------ # - class CTxLink(object): + class CCommLink(object): """describes the connectivity of the stateless client method""" def __init__(self, server="localhost", port=5050, virtual=False): - super(CTRexStatelessClient.CTxLink, self).__init__() + super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port + self.verbose = False self.rpc_link = JsonRpcClient(self.server, self.port) + + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True + + def set_verbose(self, mode): + self.verbose = mode + return self.rpc_link.set_verbose(mode) + + def connect(self): + if not self.virtual: + return self.rpc_link.connect() + + def disconnect(self): if not self.virtual: - self.rpc_link.connect() + return self.rpc_link.disconnect() def transmit(self, method_name, params={}): if self.virtual: @@ -352,144 +626,5 @@ class CTRexStatelessClient(object): port=self.port) -class CStream(object): - """docstring for CStream""" - DEFAULTS = {"rx_stats": CRxStats, - "mode": CTxMode, - "isg": 5.0, - "next_stream": -1, - "self_start": True, - "enabled": True} - - def __init__(self, **kwargs): - super(CStream, self).__init__() - for k, v in kwargs.items(): - setattr(self, k, v) - # set default values to unset attributes, according to DEFAULTS dict - set_keys = set(kwargs.keys()) - keys_to_set = [x - for x in self.DEFAULTS - if x not in set_keys] - for key in keys_to_set: - default = self.DEFAULTS.get(key) - if type(default) == type: - setattr(self, key, default()) - else: - setattr(self, key, default) - - @property - def packet(self): - return self._packet - - @packet.setter - def packet(self, packet_obj): - assert isinstance(packet_obj, CTRexPktBuilder) - self._packet = packet_obj - - @property - def enabled(self): - return self._enabled - - @enabled.setter - def enabled(self, bool_value): - self._enabled = bool(bool_value) - - @property - def self_start(self): - return self._self_start - - @self_start.setter - def self_start(self, bool_value): - self._self_start = bool(bool_value) - - @property - def next_stream(self): - return self._next_stream - - @next_stream.setter - def next_stream(self, value): - self._next_stream = int(value) - - def dump(self): - pass - return {"enabled": self.enabled, - "self_start": self.self_start, - "isg": self.isg, - "next_stream": self.next_stream, - "packet": self.packet.dump_pkt(), - "mode": self.mode.dump(), - "vm": self.packet.get_vm_data(), - "rx_stats": self.rx_stats.dump()} - -class CRxStats(object): - - def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): - self._rx_dict = {"enabled": enabled, - "seq_enabled": seq_enabled, - "latency_enabled": latency_enabled} - - @property - def enabled(self): - return self._rx_dict.get("enabled") - - @enabled.setter - def enabled(self, bool_value): - self._rx_dict['enabled'] = bool(bool_value) - - @property - def seq_enabled(self): - return self._rx_dict.get("seq_enabled") - - @seq_enabled.setter - def seq_enabled(self, bool_value): - self._rx_dict['seq_enabled'] = bool(bool_value) - - @property - def latency_enabled(self): - return self._rx_dict.get("latency_enabled") - - @latency_enabled.setter - def latency_enabled(self, bool_value): - self._rx_dict['latency_enabled'] = bool(bool_value) - - def dump(self): - return {k: v - for k, v in self._rx_dict.items() - if v - } - - -class CTxMode(object): - """docstring for CTxMode""" - def __init__(self, tx_mode, pps): - super(CTxMode, self).__init__() - if tx_mode not in ["continuous", "single_burst", "multi_burst"]: - raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) - self._tx_mode = tx_mode - self._fields = {'pps': float(pps)} - if tx_mode == "single_burst": - self._fields['total_pkts'] = 0 - elif tx_mode == "multi_burst": - self._fields['pkts_per_burst'] = 0 - self._fields['ibg'] = 0.0 - self._fields['count'] = 0 - else: - pass - - def set_tx_mode_attr(self, attr, val): - if attr in self._fields: - self._fields[attr] = type(self._fields.get(attr))(val) - else: - raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". - format(attr, self._tx_mode)) - - def dump(self): - dump = {"type": self._tx_mode} - dump.update({k: v - for k, v in self._fields.items() - }) - return dump - - if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/client_utils/general_utils.py b/scripts/automation/trex_control_plane/client_utils/general_utils.py index 5488b9dd..69ad14b2 100755 --- a/scripts/automation/trex_control_plane/client_utils/general_utils.py +++ b/scripts/automation/trex_control_plane/client_utils/general_utils.py @@ -24,7 +24,7 @@ def user_input(): def get_current_user(): if pwd: - return pwd.getpwuid( os.geteuid() ).pw_name + return pwd.getpwuid(os.geteuid()).pw_name else: return getpass.getuser() @@ -75,6 +75,22 @@ def random_id_gen(length=8): return_id += random.choice(id_chars) yield return_id +def id_count_gen(): + """ + A generator for creating an increasing id for objects, starting from 0 + + :parameters: + None + + :return: + an id (unsigned int) with each next() request. + """ + return_id = 0 + while True: + yield return_id + return_id += 1 + + if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index ed14e6f8..58491aba 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -6,6 +6,9 @@ import json import general_utils import re from time import sleep +from collections import namedtuple + +CmdResponse = namedtuple('CmdResponse', ['success', 'data']) class bcolors: BLUE = '\033[94m' @@ -23,12 +26,12 @@ class BatchMessage(object): self.rpc_client = rpc_client self.batch_list = [] - def add (self, method_name, params = {}): + def add (self, method_name, params={}): id, msg = self.rpc_client.create_jsonrpc_v2(method_name, params, encode = False) self.batch_list.append(msg) - def invoke (self, block = False): + def invoke(self, block = False): if not self.rpc_client.connected: return False, "Not connected to server" @@ -36,9 +39,9 @@ class BatchMessage(object): rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False) if len(self.batch_list) == 1: - return True, [(rc, resp_list)] + return CmdResponse(True, [CmdResponse(rc, resp_list)]) else: - return rc, resp_list + return CmdResponse(rc, resp_list) # JSON RPC v2.0 client @@ -47,7 +50,7 @@ class JsonRpcClient(object): def __init__ (self, default_server, default_port): self.verbose = False self.connected = False - + # default values self.port = default_port self.server = default_server @@ -73,7 +76,7 @@ class JsonRpcClient(object): # float pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[1-9][0-9]*\.[0-9]+)',r'\1{0}\2{1}'.format(bcolors.MAGENTA, bcolors.ENDC), pretty_str) # strings - + pretty_str = re.sub(r'([ ]*:[ ]+)("[^"]*")',r'\1{0}\2{1}'.format(bcolors.RED, bcolors.ENDC), pretty_str) pretty_str = re.sub(r"('[^']*')", r'{0}\1{1}'.format(bcolors.MAGENTA, bcolors.RED), pretty_str) except : @@ -115,7 +118,7 @@ class JsonRpcClient(object): return self.send_raw_msg(msg, block) - + # low level send of string message def send_raw_msg (self, msg, block = False): self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") @@ -127,7 +130,7 @@ class JsonRpcClient(object): self.socket.send(msg, flags = zmq.NOBLOCK) except zmq.error.ZMQError as e: self.disconnect() - return False, "Failed To Get Send Message" + return CmdResponse(False, "Failed To Get Send Message") got_response = False @@ -145,7 +148,7 @@ class JsonRpcClient(object): if not got_response: self.disconnect() - return False, "Failed To Get Server Response" + return CmdResponse(False, "Failed To Get Server Response") self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -159,13 +162,13 @@ class JsonRpcClient(object): for single_response in response_json: rc, msg = self.process_single_response(single_response) - rc_list.append( (rc, msg) ) + rc_list.append( CmdResponse(rc, msg) ) - return True, rc_list + return CmdResponse(True, rc_list) else: rc, msg = self.process_single_response(response_json) - return rc, msg + return CmdResponse(rc, msg) def process_single_response (self, response_json): @@ -182,7 +185,7 @@ class JsonRpcClient(object): # if no error there should be a result if ("result" not in response_json): - return False, "Malfromed Response ({0})".format(str(response)) + return False, "Malformed Response ({0})".format(str(response)) return True, response_json["result"] @@ -200,7 +203,7 @@ class JsonRpcClient(object): else: return False, "Not connected to server" - def connect(self, server = None, port = None): + def connect(self, server=None, port=None): if self.connected: self.disconnect() @@ -511,3 +514,6 @@ class TrexStatelessClient(JsonRpcClient): return True, resp_list # return self.invoke_rpc_method('add_stream', params = params) + +if __name__ == "__main__": + pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/common/text_opts.py b/scripts/automation/trex_control_plane/common/text_opts.py new file mode 100755 index 00000000..06c2c056 --- /dev/null +++ b/scripts/automation/trex_control_plane/common/text_opts.py @@ -0,0 +1,100 @@ +import json +import re + +TEXT_CODES = {'bold': {'start': '\x1b[1m', + 'end': '\x1b[22m'}, + 'cyan': {'start': '\x1b[36m', + 'end': '\x1b[39m'}, + 'blue': {'start': '\x1b[34m', + 'end': '\x1b[39m'}, + 'red': {'start': '\x1b[31m', + 'end': '\x1b[39m'}, + 'magenta': {'start': '\x1b[35m', + 'end': '\x1b[39m'}, + 'green': {'start': '\x1b[32m', + 'end': '\x1b[39m'}, + 'yellow': {'start': '\x1b[33m', + 'end': '\x1b[39m'}, + 'underline': {'start': '\x1b[4m', + 'end': '\x1b[24m'}} + + +def bold(text): + return text_attribute(text, 'bold') + + +def cyan(text): + return text_attribute(text, 'cyan') + + +def blue(text): + return text_attribute(text, 'blue') + + +def red(text): + return text_attribute(text, 'red') + + +def magenta(text): + return text_attribute(text, 'magenta') + + +def green(text): + return text_attribute(text, 'green') + +def yellow(text): + return text_attribute(text, 'yellow') + +def underline(text): + return text_attribute(text, 'underline') + + +def text_attribute(text, attribute): + return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], + txt=text, + stop=TEXT_CODES[attribute]['end']) + + +FUNC_DICT = {'blue': blue, + 'bold': bold, + 'green': green, + 'yellow': yellow, + 'cyan': cyan, + 'magenta': magenta, + 'underline': underline, + 'red': red} + + +def format_text(text, *args): + return_string = text + for i in args: + func = FUNC_DICT.get(i) + if func: + return_string = func(return_string) + return return_string + +# pretty print for JSON +def pretty_json (json_str, use_colors = True): + pretty_str = json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True) + + if not use_colors: + return pretty_str + + try: + # int numbers + pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[1-9][0-9]*[^.])',r'\1{0}'.format(blue(r'\2')), pretty_str) + # float + pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[1-9][0-9]*\.[0-9]+)',r'\1{0}'.format(magenta(r'\2')), pretty_str) + # # strings + # + pretty_str = re.sub(r'([ ]*:[ ]+)("[^"]*")',r'\1{0}'.format(red(r'\2')), pretty_str) + pretty_str = re.sub(r"('[^']*')", r'{0}\1{1}'.format(TEXT_CODES['magenta']['start'], + TEXT_CODES['red']['start']), pretty_str) + except : + pass + + return pretty_str + + +if __name__ == "__main__": + pass diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index 783f2769..bb4c72ca 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -23,7 +23,7 @@ class CStreamList(object): if name in self.streams_list: raise NameError("A stream with this name already exists on this list.") self.streams_list[name]=stream_obj - return + return name def remove_stream(self, name): popped = self.streams_list.pop(name) @@ -48,6 +48,7 @@ class CStreamList(object): self.streams_list.clear() streams_data = load_yaml_to_obj(file_path) assert isinstance(streams_data, list) + new_streams_data = [] for stream in streams_data: stream_name = stream.get("name") raw_stream = stream.get("stream") @@ -58,10 +59,11 @@ class CStreamList(object): new_stream_data = self.yaml_loader.validate_yaml(raw_stream, "stream", multiplier= multiplier) + new_streams_data.append(new_stream_data) new_stream_obj = CStream() new_stream_obj.load_data(**new_stream_data) self.append_stream(stream_name, new_stream_obj) - return new_stream_data + return new_streams_data def compile_streams(self): # first, assign an id to each stream @@ -156,7 +158,6 @@ class CStream(object): """docstring for CStream""" FIELDS = ["enabled", "self_start", "next_stream_id", "isg", "mode", "rx_stats", "packet", "vm"] - # COMPILE_FIELDS = ["enabled", "self_start", "next_stream_id", "isg", "mode", "rx_stats", "packet", "vm"] def __init__(self): self.is_loaded = False @@ -183,6 +184,7 @@ class CStream(object): if isinstance(kwargs[k], CTRexPktBuilder): if "vm" not in kwargs: self.load_packet_obj(kwargs[k]) + break # vm field check is skipped else: raise ValueError("When providing packet object with a CTRexPktBuilder, vm parameter " "should not be supplied") @@ -226,8 +228,7 @@ class CStream(object): return - def dump(self, compilation=False): - # fields = CStream.COMPILE_FIELDS if compilation else CStream.FIELDS + def dump(self): if self.is_loaded: dump = {} for key in CStream.FIELDS: @@ -239,10 +240,6 @@ class CStream(object): else: raise RuntimeError("CStream object isn't loaded with data. Use 'load_data' method.") - def dump_compiled(self): - return self.dump(compilation=True) - - if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py new file mode 100755 index 00000000..c94a7461 --- /dev/null +++ b/scripts/automation/trex_control_plane/console/parsing_opts.py @@ -0,0 +1,103 @@ +import argparse +from collections import namedtuple +import sys + +ArgumentPack = namedtuple('ArgumentPack', ['name_or_flags', 'options']) +ArgumentGroup = namedtuple('ArgumentGroup', ['type', 'args', 'options']) + + +# list of available parsing options +MULTIPLIER = 1 +PORT_LIST = 2 +ALL_PORTS = 3 +PORT_LIST_WITH_ALL = 4 +FILE_PATH = 5 +FILE_FROM_DB = 6 +STREAM_FROM_PATH_OR_FILE = 7 + +# list of ArgumentGroup types +MUTEX = 1 + + + + +OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], + {'help': "Set multiplier for stream", 'dest': "mult", 'type': float}), + PORT_LIST: ArgumentPack(['--port'], + {"nargs": '+', + # "action": "store_" + 'help': "A list of ports on which to apply the command", + 'default': []}), + ALL_PORTS: ArgumentPack(['-a'], + {"action": "store_true", + "dest": "all", + 'help': "Set this flag to apply the command on all available ports"}), + + FILE_PATH: ArgumentPack(['-f'], + {'help': "File path to YAML file that describes a stream pack"}), + FILE_FROM_DB: ArgumentPack(['--db'], + {'help': "A stream pack which already loaded into console cache."}), + # advanced options + PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, + ALL_PORTS], + {'required': True}), + STREAM_FROM_PATH_OR_FILE: ArgumentGroup(MUTEX, [FILE_PATH, + FILE_FROM_DB], + {'required': True}) + } + + +class CCmdArgParser(argparse.ArgumentParser): + + def __init__(self, *args, **kwargs): + super(CCmdArgParser, self).__init__(*args, **kwargs) + pass + + # def error(self, message): + # try: + # super(CCmdArgParser, self).error(message) # this will trigger system exit! + # except SystemExit: + # return -1 + # + # # self.print_usage(sys.stderr) + # # print ('%s: error: %s\n') % (self.prog, message) + # # self.print_help() + # return + + def exit(self, status=0, message=None): + try: + super(CCmdArgParser, self).exit(status, message) # this will trigger system exit! + except SystemExit: + return -1 + return + +def gen_parser(op_name, description, *args): + parser = CCmdArgParser(prog=op_name, conflict_handler='resolve', + # add_help=False, + description=description) + for param in args: + try: + argument = OPTIONS_DB[param] + if isinstance(argument, ArgumentGroup): + if argument.type == MUTEX: + # handle as mutually exclusive group + group = parser.add_mutually_exclusive_group(**argument.options) + for sub_argument in argument.args: + group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, + **OPTIONS_DB[sub_argument].options) + else: + # ignore invalid objects + continue + elif isinstance(argument, ArgumentPack): + parser.add_argument(*argument.name_or_flags, + **argument.options) + else: + # ignore invalid objects + continue + except KeyError as e: + cause = e.args[0] + raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param)) + return parser + +if __name__ == "__main__": + pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index a9ac040b..a2c738ab 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -17,6 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. """ + import cmd import json import ast @@ -28,18 +29,20 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * - - -from client_utils.jsonrpc_client import TrexStatelessClient +from client.trex_stateless_client import CTRexStatelessClient +from common.text_opts import * +from client_utils.general_utils import user_input, get_current_user +import parsing_opts import trex_status from collections import namedtuple +__version__ = "1.0" + LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) -# -def readch (choices = []): - +def readch(choices=[]): + fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: @@ -55,22 +58,49 @@ def readch (choices = []): return None -class YesNoMenu(object): +class ConfirmMenu(object): def __init__ (self, caption): - self.caption = caption + self.caption = "{cap} [confirm] : ".format(cap=caption) - def show (self): - print "{0}".format(self.caption) - sys.stdout.write("[Y/y/N/n] : ") - ch = readch(choices = ['y', 'Y', 'n', 'N']) - if ch == None: - return None - - print "\n" - if ch == 'y' or ch == 'Y': - return True + def show(self): + sys.stdout.write(self.caption) + input = user_input() + if input: + return False else: + # user hit Enter + return True + + +class CStreamsDB(object): + + def __init__(self): + self.stream_packs = {} + + def load_streams(self, name, LoadedStreamList_obj): + if name in self.stream_packs: return False + else: + self.stream_packs[name] = LoadedStreamList_obj + return True + + def remove_stream_packs(self, *names): + removed_streams = [] + for name in names: + removed = self.stream_packs.pop(name) + if removed: + removed_streams.append(name) + return removed_streams + + def clear(self): + self.stream_packs.clear() + + def get_loaded_streams_names(self): + return self.stream_packs.keys() + + def get_stream_pack(self, name): + return self.stream_packs.get(name) + # multi level cmd menu class CmdMenu(object): @@ -119,31 +149,33 @@ class AddStreamMenu(CmdMenu): self.add_menu('Please select ISG', ['d', 'e', 'f']) # main console object -class TrexConsole(cmd.Cmd): +class TRexConsole(cmd.Cmd): """Trex Console""" - - def __init__(self, rpc_client): + + def __init__(self, stateless_client, verbose): cmd.Cmd.__init__(self) - self.rpc_client = rpc_client + self.stateless_client = stateless_client self.do_connect("") - self.intro = "\n-=TRex Console V1.0=-\n" - self.intro += "\nType 'help' or '?' for supported actions\n" + self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) + self.intro += "\nType 'help' or '?' for supported actions\n" self.verbose = False self.postcmd(False, "") self.user_streams = {} - + self.streams_db = CStreamsDB() + # a cool hack - i stole this function and added space def completenames(self, text, *ignored): dotext = 'do_'+text return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] + # set verbose on / off def do_verbose (self, line): '''Shows or set verbose mode\n''' @@ -152,40 +184,41 @@ class TrexConsole(cmd.Cmd): elif line == "on": self.verbose = True - self.rpc_client.set_verbose(True) - print "\nverbose set to on\n" + self.stateless_client.set_verbose(True) + print green("\nverbose set to on\n") elif line == "off": self.verbose = False - self.rpc_client.set_verbose(False) - print "\nverbose set to off\n" + self.stateless_client.set_verbose(False) + print green("\nverbose set to off\n") else: - print "\nplease specify 'on' or 'off'\n" + print magenta("\nplease specify 'on' or 'off'\n") # query the server for registered commands def do_query_server(self, line): '''query the RPC server for supported remote commands\n''' - rc, msg = self.rpc_client.query_rpc_server() - if not rc: - print "\n*** " + msg + "\n" + res_ok, msg = self.stateless_client.get_supported_cmds() + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') return - - print "\nRPC server supports the following commands: \n\n" + print "\nRPC server supports the following commands:\n" for func in msg: if func: print func - print "\n" + print '' + print format_text("[SUCCESS]\n", 'green', 'bold') + return def do_ping (self, line): '''Pings the RPC server\n''' print "\n-> Pinging RPC server" - rc, msg = self.rpc_client.ping_rpc_server() - if rc: - print "[SUCCESS]\n" + res_ok, msg = self.stateless_client.ping() + if res_ok: + print format_text("[SUCCESS]\n", 'green', 'bold') else: print "\n*** " + msg + "\n" return @@ -195,13 +228,21 @@ class TrexConsole(cmd.Cmd): self.do_acquire(line, True) + def complete_force_acquire(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx, acquired=False) + + def extract_port_ids_from_line(self, line): + return {int(x) for x in line.split()} + + def extract_port_ids_from_list(self, port_list): + return {int(x) for x in port_list} + def parse_ports_from_line (self, line): port_list = set() - if line: for port_id in line.split(' '): - if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.rpc_client.get_port_count()): - print "Please provide a list of ports seperated by spaces between 0 and {0}".format(self.rpc_client.get_port_count() - 1) + if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.stateless_client.get_port_count()): + print "Please provide a list of ports separated by spaces between 0 and {0}".format(self.stateless_client.get_port_count() - 1) return None port_list.add(int(port_id)) @@ -209,106 +250,122 @@ class TrexConsole(cmd.Cmd): port_list = list(port_list) else: - port_list = [i for i in xrange(0, self.rpc_client.get_port_count())] + port_list = [i for i in xrange(0, self.stateless_client.get_port_count())] return port_list - def do_acquire (self, line, force = False): + + def do_acquire(self, line, force=False): '''Acquire ports\n''' # make sure that the user wants to acquire all - if line == "": - ask = YesNoMenu('Do you want to acquire all ports ? ') + args = line.split() + if len(args) < 1: + print magenta("Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports") + return + + if args[0] == "all": + ask = ConfirmMenu('Are you sure you want to acquire all ports ? ') rc = ask.show() if rc == False: + print yellow("[ABORTED]\n") return - - port_list = self.parse_ports_from_line(line) - if not port_list: - return - - print "\nTrying to acquire ports: " + (" ".join(str(x) for x in port_list)) + "\n" - - rc, resp_list = self.rpc_client.take_ownership(port_list, force) - - if not rc: - print "\n*** " + resp_list + "\n" - return - - for i, rc in enumerate(resp_list): - if rc[0]: - print "Port {0} - Acquired".format(port_list[i]) else: - print "Port {0} - ".format(port_list[i]) + rc[1] - - print "\n" - - def do_release (self, line): - '''Release ports\n''' - - if line: - port_list = self.parse_ports_from_line(line) + port_list = self.stateless_client.get_port_ids() else: - port_list = self.rpc_client.get_owned_ports() - - if not port_list: - return - - rc, resp_list = self.rpc_client.release_ports(port_list) - - - print "\n" - - for i, rc in enumerate(resp_list): - if rc[0]: - print "Port {0} - Released".format(port_list[i]) + port_list = self.extract_port_ids_from_line(line) + + # rc, resp_list = self.stateless_client.take_ownership(port_list, force) + try: + res_ok, log = self.stateless_client.acquire(port_list, force) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') + return + print format_text("[SUCCESS]\n", 'green', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') + + + def port_auto_complete(self, text, line, begidx, endidx, acquired=True, active=False): + if acquired: + if not active: + ret_list = [x + for x in map(str, self.stateless_client.get_acquired_ports()) + if x.startswith(text)] else: - print "Port {0} - Failed to release port, probably not owned by you or port is under traffic" - - print "\n" - - def do_get_port_stats (self, line): - '''Get ports stats\n''' + ret_list = [x + for x in map(str, self.stateless_client.get_active_ports()) + if x.startswith(text)] + else: + ret_list = [x + for x in map(str, self.stateless_client.get_port_ids()) + if x.startswith(text)] + ret_list.append("all") + return ret_list - port_list = self.parse_ports_from_line(line) - if not port_list: - return - rc, resp_list = self.rpc_client.get_port_stats(port_list) + def complete_acquire(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx, acquired=False) - if not rc: - print "\n*** " + resp_list + "\n" - return + def do_release (self, line): + '''Release ports\n''' - for i, rc in enumerate(resp_list): - if rc[0]: - print "\nPort {0} stats:\n{1}\n".format(port_list[i], self.rpc_client.pretty_json(json.dumps(rc[1]))) + # if line: + # port_list = self.parse_ports_from_line(line) + # else: + # port_list = self.stateless_client.get_owned_ports() + args = line.split() + if len(args) < 1: + print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports" + if args[0] == "all": + ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') + rc = ask.show() + if rc == False: + print yellow("[ABORTED]\n") + return else: - print "\nPort {0} - ".format(i) + rc[1] + "\n" + port_list = self.stateless_client.get_acquired_ports() + else: + port_list = self.extract_port_ids_from_line(line) - print "\n" + try: + res_ok, log = self.stateless_client.release(port_list) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') + return + print format_text("[SUCCESS]\n", 'green', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') + return + def complete_release(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx) def do_connect (self, line): '''Connects to the server\n''' if line == "": - rc, msg = self.rpc_client.connect() + res_ok, msg = self.stateless_client.connect() else: sp = line.split() if (len(sp) != 2): print "\n[usage] connect [server] [port] or without parameters\n" return - rc, msg = self.rpc_client.connect(sp[0], sp[1]) + res_ok, msg = self.stateless_client.connect(sp[0], sp[1]) - if rc: - print "[SUCCESS]\n" + if res_ok: + print format_text("[SUCCESS]\n", 'green', 'bold') else: print "\n*** " + msg + "\n" + print format_text("[FAILED]\n", 'red', 'bold') return - self.supported_rpc = self.rpc_client.get_supported_cmds() + self.supported_rpc = self.stateless_client.get_supported_cmds().data def do_rpc (self, line): '''Launches a RPC on the server\n''' @@ -341,22 +398,28 @@ class TrexConsole(cmd.Cmd): print "Example: rpc test_add {'x': 12, 'y': 17}\n" return - rc, msg = self.rpc_client.invoke_rpc_method(method, params) - if rc: - print "\nServer Response:\n\n" + self.rpc_client.pretty_json(json.dumps(msg)) + "\n" + res_ok, msg = self.stateless_client.transmit(method, params) + if res_ok: + print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n" else: print "\n*** " + msg + "\n" #print "Please try 'reconnect' to reconnect to server" def complete_rpc (self, text, line, begidx, endidx): - return [x for x in self.supported_rpc if x.startswith(text)] + return [x + for x in self.supported_rpc + if x.startswith(text)] def do_status (self, line): '''Shows a graphical console\n''' + if not self.stateless_client.is_connected(): + print "Not connected to server\n" + return + self.do_verbose('off') - trex_status.show_trex_status(self.rpc_client) + trex_status.show_trex_status(self.stateless_client) def do_quit(self, line): '''Exit the client\n''' @@ -364,22 +427,22 @@ class TrexConsole(cmd.Cmd): def do_disconnect (self, line): '''Disconnect from the server\n''' - if not self.rpc_client.is_connected(): + if not self.stateless_client.is_connected(): print "Not connected to server\n" return - rc, msg = self.rpc_client.disconnect() - if rc: - print "[SUCCESS]\n" + res_ok, msg = self.stateless_client.disconnect() + if res_ok: + print format_text("[SUCCESS]\n", 'green', 'bold') else: print msg + "\n" def do_whoami (self, line): '''Prints console user name\n''' - print "\n" + self.rpc_client.whoami() + "\n" - + print "\n" + self.stateless_client.user + "\n" + def postcmd(self, stop, line): - if self.rpc_client.is_connected(): + if self.stateless_client.is_connected(): self.prompt = "TRex > " else: self.supported_rpc = None @@ -390,47 +453,47 @@ class TrexConsole(cmd.Cmd): def default(self, line): print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line) - def do_help (self, line): - '''Shows This Help Screen\n''' - if line: - try: - func = getattr(self, 'help_' + line) - except AttributeError: - try: - doc = getattr(self, 'do_' + line).__doc__ - if doc: - self.stdout.write("%s\n"%str(doc)) - return - except AttributeError: - pass - self.stdout.write("%s\n"%str(self.nohelp % (line,))) - return - func() - return - - print "\nSupported Console Commands:" - print "----------------------------\n" - - cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] - for cmd in cmds: - if cmd == "EOF": - continue - - try: - doc = getattr(self, 'do_' + cmd).__doc__ - if doc: - help = str(doc) - else: - help = "*** Undocumented Function ***\n" - except AttributeError: - help = "*** Undocumented Function ***\n" - - print "{:<30} {:<30}".format(cmd + " - ", help) - - def do_load_stream_list(self, line): + # def do_help (self, line): + # '''Shows This Help Screen\n''' + # if line: + # try: + # func = getattr(self, 'help_' + line) + # except AttributeError: + # try: + # doc = getattr(self, 'do_' + line).__doc__ + # if doc: + # self.stdout.write("%s\n"%str(doc)) + # return + # except AttributeError: + # pass + # self.stdout.write("%s\n"%str(self.nohelp % (line,))) + # return + # func() + # return + # + # print "\nSupported Console Commands:" + # print "----------------------------\n" + # + # cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] + # for cmd in cmds: + # if cmd == "EOF": + # continue + # + # try: + # doc = getattr(self, 'do_' + cmd).__doc__ + # if doc: + # help = str(doc) + # else: + # help = "*** Undocumented Function ***\n" + # except AttributeError: + # help = "*** Undocumented Function ***\n" + # + # print "{:<30} {:<30}".format(cmd + " - ", help) + + def do_stream_db_add(self, line): '''Loads a YAML stream list serialization into user console \n''' args = line.split() - if args >= 2: + if len(args) >= 2: name = args[0] yaml_path = args[1] try: @@ -439,23 +502,24 @@ class TrexConsole(cmd.Cmd): multiplier = 1 stream_list = CStreamList() loaded_obj = stream_list.load_yaml(yaml_path, multiplier) - # print self.rpc_client.pretty_json(json.dumps(loaded_obj)) - if name in self.user_streams: - print "Picked name already exist. Please pick another name." - else: - try: - compiled_streams = stream_list.compile_streams() - self.user_streams[name] = LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump_compiled()) - for k, v in compiled_streams.items()]) - - print "Stream '{0}' loaded successfully".format(name) - except Exception as e: - raise + # print self.stateless_client.pretty_json(json.dumps(loaded_obj)) + try: + compiled_streams = stream_list.compile_streams() + res_ok = self.streams_db.load_streams(name, LoadedStreamList(loaded_obj, + [StreamPack(v.stream_id, v.stream.dump()) + for k, v in compiled_streams.items()])) + if res_ok: + print green("Stream pack '{0}' loaded and added successfully\n".format(name)) + else: + print magenta("Picked name already exist. Please pick another name.\n") + except Exception as e: + print "adding new stream failed due to the following error:\n", str(e) + print format_text("[FAILED]\n", 'red', 'bold') + return else: - print "please provide load name and YAML path, separated by space.\n" \ - "Optionally, you may provide a third argument to specify multiplier." + print magenta("please provide load name and YAML path, separated by space.\n" + "Optionally, you may provide a third argument to specify multiplier.\n") @staticmethod def tree_autocomplete(text): @@ -470,92 +534,271 @@ class TrexConsole(cmd.Cmd): if x.startswith(start_string)] - def complete_load_stream_list(self, text, line, begidx, endidx): + def complete_stream_db_add(self, text, line, begidx, endidx): arg_num = len(line.split()) - 1 if arg_num == 2: - return TrexConsole.tree_autocomplete(line.split()[-1]) + return TRexConsole.tree_autocomplete(line.split()[-1]) else: return [text] - def do_show_stream_list(self, line): + def do_stream_db_show(self, line): '''Shows the loaded stream list named [name] \n''' args = line.split() if args: list_name = args[0] try: - stream = self.user_streams[list_name] + stream = self.streams_db.get_stream_pack(list_name)#user_streams[list_name] if len(args) >= 2 and args[1] == "full": - print self.rpc_client.pretty_json(json.dumps(stream.compiled)) + print pretty_json(json.dumps(stream.compiled)) else: - print self.rpc_client.pretty_json(json.dumps(stream.loaded)) + print pretty_json(json.dumps(stream.loaded)) except KeyError as e: print "Unknown stream list name provided" else: - print "\nAvailable stream lists:\n{0}".format(', '.join([x - for x in self.user_streams.keys()])) + print "Available stream packs:\n{0}".format(', '.join(sorted(self.streams_db.get_loaded_streams_names()))) - def complete_show_stream_list(self, text, line, begidx, endidx): + def complete_stream_db_show(self, text, line, begidx, endidx): return [x - for x in self.user_streams.keys() + for x in self.streams_db.get_loaded_streams_names() if x.startswith(text)] + def do_stream_db_remove(self, line): + '''Removes a single loaded stream packs from loaded stream pack repository\n''' + args = line.split() + if args: + removed_streams = self.streams_db.remove_stream_packs(*args) + if removed_streams: + print green("The following stream packs were removed:") + print bold(", ".join(sorted(removed_streams))) + print format_text("[SUCCESS]\n", 'green', 'bold') + else: + print red("No streams were removed. Make sure to provide valid stream pack names.") + else: + print magenta("Please provide stream pack name(s), separated with spaces.") + + def do_stream_db_clear(self, line): + '''Clears all loaded stream packs from loaded stream pack repository\n''' + self.streams_db.clear() + print format_text("[SUCCESS]\n", 'green', 'bold') + + + def complete_stream_db_remove(self, text, line, begidx, endidx): + return [x + for x in self.streams_db.get_loaded_streams_names() + if x.startswith(text)] + + def do_attach(self, line): + '''Assign loaded stream pack into specified ports on TRex\n''' args = line.split() - if len(args) >= 1: + if len(args) >= 2: + stream_pack_name = args[0] + stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]] + if not stream_list: + print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name) + print format_text("[FAILED]\n", 'red', 'bold') + return + if args[1] == "all": + ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') + rc = ask.show() + if rc == False: + print yellow("[ABORTED]\n") + return + else: + port_list = self.stateless_client.get_acquired_ports() + else: + port_list = self.extract_port_ids_from_line(' '.join(args[1:])) + owned = set(self.stateless_client.get_acquired_ports()) try: - stream_list = self.user_streams[args[0]] - port_list = self.parse_ports_from_line(' '.join(args[1:])) - owned = set(self.rpc_client.get_owned_ports()) if set(port_list).issubset(owned): - rc, resp_list = self.rpc_client.add_stream(port_list, stream_list.compiled) - if not rc: - print "\n*** " + resp_list + "\n" + res_ok, log = self.stateless_client.add_stream_pack(port_list, *stream_list.compiled) + # res_ok, msg = self.stateless_client.add_stream(port_list, stream_list.compiled) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') return + print format_text("[SUCCESS]\n", 'green', 'bold') + return else: - print "Not all desired ports are aquired.\n" \ + print "Not all desired ports are acquired.\n" \ "Acquired ports are: {acq}\n" \ "Requested ports: {req}\n" \ "Missing ports: {miss}".format(acq=list(owned), req=port_list, miss=list(set(port_list).difference(owned))) - except KeyError as e: - cause = e.args[0] - print "Provided stream list name '{0}' doesn't exists.".format(cause) + print format_text("[FAILED]\n", 'red', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') + else: + print magenta("Please provide list name and ports to attach to, " + "or specify 'all' to attach all owned ports.\n") + + def complete_attach(self, text, line, begidx, endidx): + arg_num = len(line.split()) - 1 + if arg_num == 1: + # return optional streams packs + if line.endswith(" "): + return self.port_auto_complete(text, line, begidx, endidx) + return [x + for x in self.streams_db.get_loaded_streams_names() + if x.startswith(text)] + elif arg_num >= 2: + # return optional ports to attach to + return self.port_auto_complete(text, line, begidx, endidx) else: - print "Please provide list name and ports to attach to, or leave empty to attach to all ports." + return [text] + def prompt_response(self, response_obj): + resp_list = response_obj if isinstance(response_obj, list) else [response_obj] + def format_return_status(return_status): + if return_status: + return green("OK") + else: + return red("FAIL") + for response in resp_list: + response_str = "{id:^3} - {msg} ({stat})".format(id=response.id, + msg=response.msg, + stat=format_return_status(response.success)) + print response_str + return + def do_remove_all_streams(self, line): + '''Acquire ports\n''' + # make sure that the user wants to acquire all + args = line.split() + if len(args) < 1: + print magenta("Please provide a list of ports separated by spaces, " + "or specify 'all' to remove from all acquired ports") + return + if args[0] == "all": + ask = ConfirmMenu('Are you sure you want to remove all stream packs from all acquired ports? ') + rc = ask.show() + if rc == False: + print yellow("[ABORTED]\n") + return + else: + port_list = self.stateless_client.get_acquired_ports() + else: + port_list = self.extract_port_ids_from_line(line) + + # rc, resp_list = self.stateless_client.take_ownership(port_list, force) + try: + res_ok, log = self.stateless_client.remove_all_streams(port_list) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') + return + print format_text("[SUCCESS]\n", 'green', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') + def complete_remove_all_streams(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx) + def do_start_traffic(self, line): + '''Start pre-submitted traffic in specified ports on TRex\n''' + # make sure that the user wants to acquire all + # parser = parsing_opts.gen_parser("start_traffic", self.do_start_traffic.__doc__, + # parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER) + # opts = parser.parse_args(line.split()) + # + # print opts + # return + # # return + # # if not opts.port_list: + # # print magenta("Please provide a list of ports separated by spaces, " + # # "or specify 'all' to start traffic on all acquired ports") + # # return + # + # if "all" in opts.port_list: + # ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') + # rc = ask.show() + # if rc == False: + # print yellow("[ABORTED]\n") + # return + # else: + # port_list = self.stateless_client.get_acquired_ports() + # else: + # try: + # port_list = self.extract_port_ids_from_list(opts.port_list) + # except ValueError as e: + # print magenta(e) + # return + args = line.split() + if len(args) < 1: + print magenta("Please provide a list of ports separated by spaces, " + "or specify 'all' to start traffic on all acquired ports") + return + if args[0] == "all": + ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') + rc = ask.show() + if rc == False: + print yellow("[ABORTED]\n") + return + else: + port_list = self.stateless_client.get_acquired_ports() + else: + port_list = self.extract_port_ids_from_line(line) + try: + res_ok, log = self.stateless_client.start_traffic(1.0, port_id=port_list) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') + return + print format_text("[SUCCESS]\n", 'green', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') + def complete_start_traffic(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx) + def help_start_traffic(self): + self.do_start_traffic("-h") - # adds a very simple stream - def do_add_simple_stream (self, line): - if line == "": - add_stream = AddStreamMenu() - add_stream.show() + def do_stop_traffic(self, line): + '''Stop active traffic in specified ports on TRex\n''' + # make sure that the user wants to acquire all + args = line.split() + if len(args) < 1: + print magenta("Please provide a list of ports separated by spaces, " + "or specify 'all' to stop traffic on all acquired ports") return + if args[0] == "all": + ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') + rc = ask.show() + if rc == False: + print yellow("[ABORTED]\n") + return + else: + port_list = self.stateless_client.get_active_ports() + else: + port_list = self.extract_port_ids_from_line(line) - params = line.split() - port_id = int(params[0]) - stream_id = int(params[1]) + try: + res_ok, log = self.stateless_client.stop_traffic(port_list) + self.prompt_response(log) + if not res_ok: + print format_text("[FAILED]\n", 'red', 'bold') + return + print format_text("[SUCCESS]\n", 'green', 'bold') + except ValueError as e: + print magenta(str(e)) + print format_text("[FAILED]\n", 'red', 'bold') - packet = [0xFF,0xFF,0xFF] - rc, msg = self.rpc_client.add_stream(port_id = port_id, stream_id = stream_id, isg = 1.1, next_stream_id = -1, packet = packet) - if rc: - print "\nServer Response:\n\n" + self.rpc_client.pretty_json(json.dumps(msg)) + "\n" - else: - print "\n*** " + msg + "\n" + def complete_stop_traffic(self, text, line, begidx, endidx): + return self.port_auto_complete(text, line, begidx, endidx, active=True) # aliasing do_exit = do_EOF = do_q = do_quit -def setParserOptions (): +def setParserOptions(): parser = argparse.ArgumentParser(prog="trex_console.py") parser.add_argument("-s", "--server", help = "TRex Server [default is localhost]", @@ -566,22 +809,30 @@ def setParserOptions (): default = 5050, type = int) - parser.add_argument("-u", "--user", help = "User Name [default is random generated]\n", - default = 'user_' + ''.join(random.choice(string.digits) for _ in range(5)), + parser.add_argument("-z", "--pub", help = "TRex Async Publisher Port [default is 4500]\n", + default = 4500, + type = int) + + parser.add_argument("-u", "--user", help = "User Name [default is currently logged in user]\n", + default = get_current_user(), type = str) + parser.add_argument("--verbose", dest="verbose", + action="store_true", help="Switch ON verbose option. Default is: OFF.", + default = False) + return parser -def main (): +def main(): parser = setParserOptions() - options = parser.parse_args(sys.argv[1:]) + options = parser.parse_args() - # RPC client - rpc_client = TrexStatelessClient(options.server, options.port, options.user) + # Stateless client connection + stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) # console try: - console = TrexConsole(rpc_client) + console = TRexConsole(stateless_client, options.verbose) console.cmdloop() except KeyboardInterrupt as e: print "\n\n*** Caught Ctrl + C... Exiting...\n\n" diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py index 2c5a648f..2b97d7d3 100644 --- a/scripts/automation/trex_control_plane/console/trex_status.py +++ b/scripts/automation/trex_control_plane/console/trex_status.py @@ -11,22 +11,27 @@ import datetime g_curses_active = False +################### utils ################# + # simple percetange show def percentage (a, total): x = int ((float(a) / total) * 100) return str(x) + "%" -# simple float to human readable -def float_to_human_readable (size, suffix = "bps"): - for unit in ['','K','M','G']: - if abs(size) < 1024.0: - return "%3.1f %s%s" % (size, unit, suffix) - size /= 1024.0 - return "NaN" +################### panels ################# # panel object class TrexStatusPanel(object): - def __init__ (self, h, l, y, x, headline): + def __init__ (self, h, l, y, x, headline, status_obj): + + self.status_obj = status_obj + + self.log = status_obj.log + self.stateless_client = status_obj.stateless_client + + self.stats = status_obj.stats + self.general_stats = status_obj.general_stats + self.h = h self.l = l self.y = y @@ -53,64 +58,26 @@ class TrexStatusPanel(object): return self.win -# total stats (ports + global) -class Stats(): - def __init__ (self, rpc_client, port_list, interval = 100): - - self.rpc_client = rpc_client - - self.port_list = port_list - self.port_stats = {} - - self.interval = interval - self.delay_count = 0 - - def get_port_stats (self, port_id): - if self.port_stats.get(port_id): - return self.port_stats[port_id] - else: - return None - - def query_sync (self): - self.delay_count += 1 - if self.delay_count < self.interval: - return - - self.delay_count = 0 - - # query global stats - - # query port stats - - rc, resp_list = self.rpc_client.get_port_stats(self.port_list) - if not rc: - return - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_stats[self.port_list[i]] = rc[1] - - # various kinds of panels # Server Info Panel class ServerInfoPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:") - - self.status_obj = status_obj + super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj) def draw (self): - if self.status_obj.server_version == None: + if not self.status_obj.server_version : + return + + if not self.status_obj.server_sys_info: return - self.clear() - connection_details = self.status_obj.rpc_client.get_connection_details() + self.clear() - self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(connection_details['port']))) + self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port()))) self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"])) self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", self.status_obj.server_version["build_date"] + " @ " + @@ -123,7 +90,7 @@ class ServerInfoPanel(TrexStatusPanel): self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"])) - ports_owned = " ".join(str(x) for x in self.status_obj.rpc_client.get_owned_ports()) + ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list) if not ports_owned: ports_owned = "None" @@ -134,92 +101,123 @@ class ServerInfoPanel(TrexStatusPanel): class GeneralInfoPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:") - - self.status_obj = status_obj + super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj) def draw (self): - pass + self.clear() + + if not self.general_stats.is_online(): + self.getwin().addstr(3, 2, "No Published Data From TRex Server") + return + + self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util"))) + + self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:", + self.general_stats.get("m_tx_bps", format = True, suffix = "bps"), + self.general_stats.get("m_tx_pps", format = True, suffix = "pps"))) + + + self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:", + self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"), + self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts"))) + + self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:", + self.general_stats.get("m_rx_bps", format = True, suffix = "bps"), + self.general_stats.get("m_rx_pps", format = True, suffix = "pps"))) + + + self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:", + self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"), + self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts"))) # all ports stats class PortsStatsPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:") + super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj) - self.status_obj = status_obj def draw (self): self.clear() - owned_ports = self.status_obj.rpc_client.get_owned_ports() + owned_ports = self.status_obj.owned_ports_list if not owned_ports: self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports") return # table header - self.getwin().addstr(3, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - "Port ID", "Tx [pps]", "Tx [bps]", "Tx [bytes]", "Rx [pps]", "Rx [bps]", "Rx [bytes]")) + self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format( + "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) + - # port loop - self.status_obj.stats.query_sync() for i, port_index in enumerate(owned_ports): port_stats = self.status_obj.stats.get_port_stats(port_index) if port_stats: - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15,.2f} {:^15,.2f} {:^15,} {:^15,.2f} {:^15,.2f} {:^15,}".format( + self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format( "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), - port_stats["tx_pps"], - port_stats["tx_bps"], - port_stats["total_tx_bytes"], - port_stats["rx_pps"], - port_stats["rx_bps"], - port_stats["total_rx_bytes"])) - + "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), + port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), + + "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), + port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), + "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), + port_stats.get_rel("ibytes", format = True, suffix = "B")))) + else: - self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( + self.getwin().addstr(5 + (i * 4), 2, 2, "{:^15} {:^30} {:^30} {:^30}".format( "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), "N/A", "N/A", "N/A", - "N/A", - "N/A", "N/A")) + + # old format +# if port_stats: +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +# port_stats.get("m_total_tx_pps", format = True, suffix = "pps"), +# port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), +# port_stats.get_rel("obytes", format = True, suffix = "B"), +# port_stats.get("m_total_rx_pps", format = True, suffix = "pps"), +# port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), +# port_stats.get_rel("ibytes", format = True, suffix = "B"))) +# +# else: +# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( +# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]), +# "N/A", +# "N/A", +# "N/A", +# "N/A", +# "N/A", +# "N/A")) + # control panel class ControlPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj): - super(ControlPanel, self).__init__(h, l, y, x, "") + super(ControlPanel, self).__init__(h, l, y, x, "", status_obj) - self.status_obj = status_obj def draw (self): self.clear() self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.status_obj.rpc_client.get_port_count() - 1)) + .format(self.status_obj.stateless_client.get_port_count() - 1)) - index = 3 - - cut = len(self.status_obj.log) - 4 - if cut < 0: - cut = 0 - - for l in self.status_obj.log[cut:]: - self.getwin().addstr(index, 2, l) - index += 1 + self.log.draw(self.getwin(), 2, 3) # specific ports panels class SinglePortPanel(TrexStatusPanel): def __init__ (self, h, l, y, x, status_obj, port_id): - super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id)) + super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj) - self.status_obj = status_obj self.port_id = port_id def draw (self): @@ -227,7 +225,7 @@ class SinglePortPanel(TrexStatusPanel): self.clear() - if not self.port_id in self.status_obj.rpc_client.get_owned_ports(): + if not self.port_id in self.status_obj.owned_ports_list: self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id)) return @@ -241,16 +239,19 @@ class SinglePortPanel(TrexStatusPanel): y += 2 # streams - if 'streams' in self.status_obj.snapshot[self.port_id]: - for stream_id, stream in self.status_obj.snapshot[self.port_id]['streams'].iteritems(): + + if 'streams' in self.status_obj.owned_ports[str(self.port_id)]: + stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams'] + + for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)): self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( stream_id, - ("True" if stream['stream']['enabled'] else "False"), - stream['stream']['mode']['type'], - ("True" if stream['stream']['self_start'] else "False"), - stream['stream']['isg'], - (stream['stream']['next_stream_id'] if stream['stream']['next_stream_id'] != -1 else "None"), - ("{0} instr.".format(len(stream['stream']['vm'])) if stream['stream']['vm'] else "None"))) + ("True" if stream['enabled'] else "False"), + stream['mode']['type'], + ("True" if stream['self_start'] else "False"), + stream['isg'], + (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"), + ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None"))) y += 1 @@ -260,128 +261,180 @@ class SinglePortPanel(TrexStatusPanel): self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE) y += 2 - self.status_obj.stats.query_sync() - port_stats = self.status_obj.stats.get_port_stats(self.port_id) - # table header - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( - "Port ID", "Tx [pps]", "Tx [bps]", "Tx [bytes]", "Rx [pps]", "Rx [bps]", "Rx [bytes]")) + # table header + self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( + "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]")) + y += 2 - if port_stats: - self.getwin().addstr(y, 2, "{:^15} {:^15,} {:^15,} {:^15,} {:^15,} {:^15,} {:^15,}".format( - "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), - port_stats["tx_pps"], - port_stats["tx_bps"], - port_stats["total_tx_bytes"], - port_stats["rx_pps"], - port_stats["rx_bps"], - port_stats["total_rx_bytes"])) + port_stats = self.status_obj.stats.get_port_stats(self.port_id) + if port_stats: + self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format( + "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), + "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"), + port_stats.get("m_total_tx_pps", format = True, suffix = "pps")), + + "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"), + port_stats.get("m_total_rx_pps", format = True, suffix = "pps")), + "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"), + port_stats.get_rel("ibytes", format = True, suffix = "B")))) + else: - self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format( + self.getwin().addstr(y + (i * 4), 2, 2, "{:^15} {:^30} {:^30} {:^30}".format( "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]), "N/A", "N/A", "N/A", - "N/A", - "N/A", "N/A")) - y += 2 -# status object -class TrexStatus(): - def __init__ (self, stdscr, rpc_client): - self.stdscr = stdscr +################### main objects ################# + +# status log +class TrexStatusLog(): + def __init__ (self): self.log = [] - self.rpc_client = rpc_client - self.snapshot = self.rpc_client.snapshot() + def add_event (self, msg): + self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) - # fetch server info - self.get_server_info() + def draw (self, window, x, y, max_lines = 4): + index = y + + cut = len(self.log) - max_lines + if cut < 0: + cut = 0 + + for msg in self.log[cut:]: + window.addstr(index, x, msg) + index += 1 + +# status commands +class TrexStatusCommands(): + def __init__ (self, status_object): - # create stats objects - self.stats = Stats(rpc_client, self.rpc_client.get_owned_ports()) + self.status_object = status_object + + self.stateless_client = status_object.stateless_client + self.log = self.status_object.log - # register actions self.actions = {} - self.actions[ord('q')] = self.action_quit - self.actions[ord('p')] = self.action_ping - self.actions[ord('f')] = self.action_freeze + self.actions[ord('q')] = self._quit + self.actions[ord('p')] = self._ping + self.actions[ord('f')] = self._freeze - self.actions[ord('g')] = self.action_show_ports_stats + self.actions[ord('g')] = self._show_ports_stats - for port_id in xrange(0, self.rpc_client.get_port_count()): - self.actions[ord('0') + port_id] = self.action_show_port_generator(port_id) + # register all the available ports shortcuts + for port_id in xrange(0, self.stateless_client.get_port_count()): + self.actions[ord('0') + port_id] = self._show_port_generator(port_id) + + + # handle a key pressed + def handle (self, ch): + if ch in self.actions: + return self.actions[ch]() + else: + self.log.add_event("Unknown key pressed, please see legend") + return True + + # show all ports + def _show_ports_stats (self): + self.log.add_event("Switching to all ports view") + self.status_object.stats_panel = self.status_object.ports_stats_panel - - # all ports stats - def action_show_ports_stats (self): - self.add_log_event("Switching to all ports view") - self.stats_panel = self.ports_stats_panel - return True - # function generator for different ports requests - def action_show_port_generator (self, port_id): - def action_show_port(): - self.add_log_event("Switching panel to port {0}".format(port_id)) - self.stats_panel = self.ports_panels[port_id] + + # function generator for different ports requests + def _show_port_generator (self, port_id): + def _show_port(): + self.log.add_event("Switching panel to port {0}".format(port_id)) + self.status_object.stats_panel = self.status_object.ports_panels[port_id] return True - return action_show_port + return _show_port - def action_freeze (self): - self.update_active = not self.update_active - self.add_log_event("Update continued" if self.update_active else "Update stopped") + def _freeze (self): + self.status_object.update_active = not self.status_object.update_active + self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped") return True - def action_quit(self): + def _quit(self): return False - def action_ping (self): - self.add_log_event("Pinging RPC server") + def _ping (self): + self.log.add_event("Pinging RPC server") - rc, msg = self.rpc_client.ping_rpc_server() + rc, msg = self.stateless_client.ping() if rc: - self.add_log_event("Server replied: '{0}'".format(msg)) + self.log.add_event("Server replied: '{0}'".format(msg)) else: - self.add_log_event("Failed to get reply") + self.log.add_event("Failed to get reply") return True - def get_server_info (self): +# status object +# +# +# +class TrexStatus(): + def __init__ (self, stdscr, stateless_client): + self.stdscr = stdscr - self.server_version = self.rpc_client.get_rpc_server_version() - self.server_sys_info = self.rpc_client.get_system_info() + self.stateless_client = stateless_client + self.log = TrexStatusLog() + self.cmds = TrexStatusCommands(self) - def add_log_event (self, msg): - self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + self.stats = stateless_client.get_stats_async() + self.general_stats = stateless_client.get_stats_async().get_general_stats() - # control panel - def update_control (self): - self.control_panel.clear() + # fetch server info + rc, self.server_sys_info = self.stateless_client.get_system_info() + if not rc: + return - self.control_panel.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit" - .format(self.rpc_client.get_port_count() - 1)) + rc, self.server_version = self.stateless_client.get_version() + if not rc: + return - index = 3 + # list of owned ports + self.owned_ports_list = self.stateless_client.get_acquired_ports() + + # data per port + self.owned_ports = {} - cut = len(self.log) - 4 - if cut < 0: - cut = 0 + for port_id in self.owned_ports_list: + self.owned_ports[str(port_id)] = {} + self.owned_ports[str(port_id)]['streams'] = {} - for l in self.log[cut:]: - self.control_panel.getwin().addstr(index, 2, l) - index += 1 + rc, stream_list = self.stateless_client.get_all_streams(port_id) + if not rc: + raise Exception("unable to get streams") + self.owned_ports[str(port_id)] = stream_list + + + try: + curses.curs_set(0) + except: + pass + + curses.use_default_colors() + self.stdscr.nodelay(1) + curses.nonl() + curses.noecho() + + self.generate_layout() + + def generate_layout (self): self.max_y = self.stdscr.getmaxyx()[0] self.max_x = self.stdscr.getmaxyx()[1] @@ -394,7 +447,7 @@ class TrexStatus(): self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self) self.ports_panels = {} - for i in xrange(0, self.rpc_client.get_port_count()): + for i in xrange(0, self.stateless_client.get_port_count()): self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i) # at start time we point to the main one @@ -411,28 +464,27 @@ class TrexStatus(): # no key , continue if ch == curses.ERR: return True - - # check for registered function - if ch in self.actions: - return self.actions[ch]() - else: - self.add_log_event("Unknown key pressed, please see legend") - - return True + + return self.cmds.handle(ch) # main run entry point def run (self): - try: - curses.curs_set(0) - except: - pass - curses.use_default_colors() - self.stdscr.nodelay(1) - curses.nonl() - curses.noecho() + # list of owned ports + self.owned_ports_list = self.stateless_client.get_acquired_ports() - self.generate_layout() + # data per port + self.owned_ports = {} + + for port_id in self.owned_ports_list: + self.owned_ports[str(port_id)] = {} + self.owned_ports[str(port_id)]['streams'] = {} + + rc, stream_list = self.stateless_client.get_all_streams(port_id) + if not rc: + raise Exception("unable to get streams") + + self.owned_ports[str(port_id)] = stream_list self.update_active = True while (True): @@ -454,14 +506,21 @@ class TrexStatus(): sleep(0.01) -def show_trex_status_internal (stdscr, rpc_client): - trex_status = TrexStatus(stdscr, rpc_client) +# global container +trex_status = None + +def show_trex_status_internal (stdscr, stateless_client): + global trex_status + + if trex_status == None: + trex_status = TrexStatus(stdscr, stateless_client) + trex_status.run() -def show_trex_status (rpc_client): +def show_trex_status (stateless_client): try: - curses.wrapper(show_trex_status_internal, rpc_client) + curses.wrapper(show_trex_status_internal, stateless_client) except KeyboardInterrupt: curses.endwin() diff --git a/scripts/automation/trex_control_plane/examples/stateless_example.py b/scripts/automation/trex_control_plane/examples/stateless_example.py new file mode 100755 index 00000000..bb0fe983 --- /dev/null +++ b/scripts/automation/trex_control_plane/examples/stateless_example.py @@ -0,0 +1,30 @@ +#!/router/bin/python + +import trex_root_path +from client.trex_hltapi import CTRexHltApi + +if __name__ == "__main__": + port_list = [1,2] + try: + hlt_client = CTRexHltApi() + con = hlt_client.connect("localhost", port_list, "danklei", break_locks=True, reset=True)#, port=6666) + print con + + res = hlt_client.traffic_config("create", 1)#, ip_src_addr="2000.2.2") + print res + res = hlt_client.traffic_config("create", 2)#, ip_src_addr="2000.2.2") + print res + + res = hlt_client.traffic_control("run", [1, 2])#, ip_src_addr="2000.2.2") + print res + + res = hlt_client.traffic_control("stop", [1, 2])#, ip_src_addr="2000.2.2") + print res + + + + except Exception as e: + raise + finally: + res = hlt_client.cleanup_session(port_list) + print res
\ No newline at end of file diff --git a/scripts/cap2/many_client_example.yaml b/scripts/cap2/many_client_example.yaml new file mode 100644 index 00000000..7908a5a4 --- /dev/null +++ b/scripts/cap2/many_client_example.yaml @@ -0,0 +1,42 @@ +- duration : 1.0 + generator : + distribution : "random" + clients_start : "20.0.0.1" + clients_end : "20.10.85.255" + servers_start : "90.0.0.1" + servers_end : "90.0.255.255" + clients_per_gb : 201 + min_clients : 101 + dual_port_mask : "1.0.0.0" + tcp_aging : 0 + udp_aging : 0 + generator_clients : + - name : "c1" + distribution : "random" + ip_start : "21.0.0.1" + ip_end : "21.10.255.255" + - name : "c2" + distribution : "random" + ip_start : "36.0.0.1" + ip_end : "36.0.1.254" + generator_servers : + - name : "s1" + distribution : "random" + ip_start : "22.0.0.1" + ip_end : "22.1.255.255" + track_ports : false + - name : "s2" + distribution : "random" + ip_start : "38.0.0.1" + ip_end : "38.0.3.255" + track_ports : false + mac : [0x0,0x0,0x0,0x1,0x0,0x00] + cap_info : + - name: cap2/http_get.pcap + client_pool: "c1" + server_pool: "s1" + cps : 1.0 + ipg : 100 + rtt : 10000 + w : 1 + diff --git a/scripts/exp/dns-0-ex.erf b/scripts/exp/dns-0-ex.erf Binary files differindex 5ffffcb3..fdb19009 100755 --- a/scripts/exp/dns-0-ex.erf +++ b/scripts/exp/dns-0-ex.erf diff --git a/scripts/exp/dns-0.erf b/scripts/exp/dns-0.erf Binary files differindex d8d601b7..fdb19009 100644 --- a/scripts/exp/dns-0.erf +++ b/scripts/exp/dns-0.erf diff --git a/scripts/exp/dns_e-0-ex.erf b/scripts/exp/dns_e-0-ex.erf Binary files differindex 7ebfd69a..e0de09fc 100755 --- a/scripts/exp/dns_e-0-ex.erf +++ b/scripts/exp/dns_e-0-ex.erf diff --git a/scripts/exp/dns_e-0.erf b/scripts/exp/dns_e-0.erf Binary files differindex 7ebfd69a..e0de09fc 100644 --- a/scripts/exp/dns_e-0.erf +++ b/scripts/exp/dns_e-0.erf diff --git a/scripts/exp/dns_flip-0-ex.erf b/scripts/exp/dns_flip-0-ex.erf Binary files differindex f6074ad7..774f0fdf 100755 --- a/scripts/exp/dns_flip-0-ex.erf +++ b/scripts/exp/dns_flip-0-ex.erf diff --git a/scripts/exp/dns_flip-0.erf b/scripts/exp/dns_flip-0.erf Binary files differindex f6074ad7..774f0fdf 100644 --- a/scripts/exp/dns_flip-0.erf +++ b/scripts/exp/dns_flip-0.erf diff --git a/scripts/exp/dns_ipv6-0-ex.erf b/scripts/exp/dns_ipv6-0-ex.erf Binary files differindex c47a6496..e0d33efc 100755 --- a/scripts/exp/dns_ipv6-0-ex.erf +++ b/scripts/exp/dns_ipv6-0-ex.erf diff --git a/scripts/exp/dns_ipv6-0.erf b/scripts/exp/dns_ipv6-0.erf Binary files differindex 53dee235..e0d33efc 100644 --- a/scripts/exp/dns_ipv6-0.erf +++ b/scripts/exp/dns_ipv6-0.erf diff --git a/scripts/exp/dns_one_server-0-ex.erf b/scripts/exp/dns_one_server-0-ex.erf Binary files differindex 0d3d447b..15323016 100755 --- a/scripts/exp/dns_one_server-0-ex.erf +++ b/scripts/exp/dns_one_server-0-ex.erf diff --git a/scripts/exp/dns_one_server-0.erf b/scripts/exp/dns_one_server-0.erf Binary files differindex ef76a69b..15323016 100644 --- a/scripts/exp/dns_one_server-0.erf +++ b/scripts/exp/dns_one_server-0.erf diff --git a/scripts/exp/dns_p-0-ex.erf b/scripts/exp/dns_p-0-ex.erf Binary files differindex ec313584..7d93c1d3 100755 --- a/scripts/exp/dns_p-0-ex.erf +++ b/scripts/exp/dns_p-0-ex.erf diff --git a/scripts/exp/dns_p-0.erf b/scripts/exp/dns_p-0.erf Binary files differindex ec313584..7d93c1d3 100644 --- a/scripts/exp/dns_p-0.erf +++ b/scripts/exp/dns_p-0.erf diff --git a/scripts/exp/dyn_pyld1-0-ex.erf b/scripts/exp/dyn_pyld1-0-ex.erf Binary files differindex 7d2089db..6a0028dc 100755 --- a/scripts/exp/dyn_pyld1-0-ex.erf +++ b/scripts/exp/dyn_pyld1-0-ex.erf diff --git a/scripts/exp/dyn_pyld1-0.erf b/scripts/exp/dyn_pyld1-0.erf Binary files differindex 175a810c..6a0028dc 100644 --- a/scripts/exp/dyn_pyld1-0.erf +++ b/scripts/exp/dyn_pyld1-0.erf diff --git a/scripts/exp/imix-0-ex.erf b/scripts/exp/imix-0-ex.erf Binary files differindex 233e6b31..07fb2ace 100755 --- a/scripts/exp/imix-0-ex.erf +++ b/scripts/exp/imix-0-ex.erf diff --git a/scripts/exp/imix-0.erf b/scripts/exp/imix-0.erf Binary files differindex c41a3006..07fb2ace 100644 --- a/scripts/exp/imix-0.erf +++ b/scripts/exp/imix-0.erf diff --git a/scripts/exp/imix_v6-0-ex.erf b/scripts/exp/imix_v6-0-ex.erf Binary files differindex 56412091..c5f247d1 100755 --- a/scripts/exp/imix_v6-0-ex.erf +++ b/scripts/exp/imix_v6-0-ex.erf diff --git a/scripts/exp/imix_v6-0.erf b/scripts/exp/imix_v6-0.erf Binary files differindex a85ed2b9..c5f247d1 100644 --- a/scripts/exp/imix_v6-0.erf +++ b/scripts/exp/imix_v6-0.erf diff --git a/scripts/exp/limit_single_pkt-0-ex.erf b/scripts/exp/limit_single_pkt-0-ex.erf Binary files differindex 3f7f0ff2..adc6fd46 100755 --- a/scripts/exp/limit_single_pkt-0-ex.erf +++ b/scripts/exp/limit_single_pkt-0-ex.erf diff --git a/scripts/exp/limit_single_pkt-0.erf b/scripts/exp/limit_single_pkt-0.erf Binary files differindex 548d2e3f..adc6fd46 100644 --- a/scripts/exp/limit_single_pkt-0.erf +++ b/scripts/exp/limit_single_pkt-0.erf diff --git a/scripts/exp/sfr2-0-ex.erf b/scripts/exp/sfr2-0-ex.erf Binary files differindex 5e2b791f..e5dfc4c3 100755 --- a/scripts/exp/sfr2-0-ex.erf +++ b/scripts/exp/sfr2-0-ex.erf diff --git a/scripts/exp/sfr2-0.erf b/scripts/exp/sfr2-0.erf Binary files differindex bf5ff3ef..e5dfc4c3 100644 --- a/scripts/exp/sfr2-0.erf +++ b/scripts/exp/sfr2-0.erf |