diff options
author | 2015-12-13 17:18:02 +0200 | |
---|---|---|
committer | 2015-12-13 17:18:02 +0200 | |
commit | 9738e267d806223ee25e013b5959ccac26c1a14a (patch) | |
tree | 590c8f329f2ab68c7da3f1f8f4c55f81243a08bc /scripts/automation/trex_control_plane/client | |
parent | a573adc6395c9ad8d96978508a07a654ef48c7a9 (diff) | |
parent | 301341ddb1bf17387d7fea19667bedd40fce4509 (diff) |
Merge branch 'master' into get_logs_and_version
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 284 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_client.py | 30 | ||||
-rwxr-xr-x[-rw-r--r--] | scripts/automation/trex_control_plane/client/trex_hltapi.py | 297 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_port.py | 408 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 1500 |
5 files changed, 2108 insertions, 411 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py new file mode 100644 index 00000000..8fdf7c9b --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -0,0 +1,284 @@ +#!/router/bin/python + +try: + # support import for Python 2 + import outer_packages +except ImportError: + # support import for Python 3 + import client.outer_packages +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage + +from common.text_opts import * + +import json +import threading +import time +import datetime +import zmq +import re + +from common.trex_stats import * +from common.trex_streams import * + +# basic async stats class +class CTRexAsyncStats(object): + def __init__ (self): + self.ref_point = None + self.current = {} + self.last_update_ts = datetime.datetime.now() + + @staticmethod + def format_num (size, suffix = ""): + + for unit in ['','K','M','G','T','P']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 + + return "NaN" + + def update (self, snapshot): + + #update + self.last_update_ts = datetime.datetime.now() + + self.current = snapshot + + if self.ref_point == None: + self.ref_point = self.current + + def clear(self): + self.ref_point = self.current + + + def get(self, field, format=False, suffix=""): + + if not field in self.current: + return "N/A" + + if not format: + return self.current[field] + else: + return self.format_num(self.current[field], suffix) + + def get_rel (self, field, format=False, suffix=""): + if not field in self.current: + return "N/A" + + if not format: + return (self.current[field] - self.ref_point[field]) + else: + return self.format_num(self.current[field] - self.ref_point[field], suffix) + + + # return true if new data has arrived in the past 2 seconds + def is_online (self): + delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 + return (delta_ms < 2000) + +# describes the general stats provided by TRex +class CTRexAsyncStatsGeneral(CTRexAsyncStats): + def __init__ (self): + super(CTRexAsyncStatsGeneral, self).__init__() + + +# per port stats +class CTRexAsyncStatsPort(CTRexAsyncStats): + def __init__ (self): + super(CTRexAsyncStatsPort, self).__init__() + + def get_stream_stats (self, stream_id): + return None + +# stats manager +class CTRexAsyncStatsManager(): + def __init__ (self): + + self.general_stats = CTRexAsyncStatsGeneral() + self.port_stats = {} + + + def get_general_stats(self): + return self.general_stats + + def get_port_stats (self, port_id): + + if not str(port_id) in self.port_stats: + return None + + return self.port_stats[str(port_id)] + + + def update(self, data): + self.__handle_snapshot(data) + + def __handle_snapshot(self, snapshot): + + general_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in snapshot.iteritems(): + + # match a pattern of ports + m = re.search('(.*)\-([0-8])', key) + if m: + + port_id = m.group(2) + field_name = m.group(1) + + if not port_id in port_stats: + port_stats[port_id] = {} + + port_stats[port_id][field_name] = value + + else: + # no port match - general stats + general_stats[key] = value + + # update the general object with the snapshot + self.general_stats.update(general_stats) + + # update all ports + for port_id, data in port_stats.iteritems(): + + if not port_id in self.port_stats: + self.port_stats[port_id] = CTRexAsyncStatsPort() + + self.port_stats[port_id].update(data) + + + + + +class CTRexAsyncClient(): + def __init__ (self, server, port, stateless_client): + + self.port = port + self.server = server + self.stateless_client = stateless_client + + self.raw_snapshot = {} + + self.stats = CTRexAsyncStatsManager() + + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() + + self.tr = "tcp://{0}:{1}".format(self.server, self.port) + print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active + self.active = True + self.alive = False + self.t = threading.Thread(target = self._run) + + # kill this thread on exit and don't add it to the join list + self.t.setDaemon(True) + self.t.start() + + self.connected = True + + + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.alive: + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return False, "*** [subscriber] - no data flow from server at : " + self.tr + + return True, "" + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + # no data yet... + self.alive = False + + # socket must be created on the same thread + self.socket.connect(self.tr) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 5000) + + while self.active: + try: + + line = self.socket.recv_string() + + if not self.alive: + self.stateless_client.on_async_alive() + self.alive = True + + # got a timeout - mark as not alive and retry + except zmq.Again: + + if self.alive: + self.stateless_client.on_async_dead() + self.alive = False + + continue + + except zmq.ContextTerminated: + # outside thread signaled us to exit + self.alive = False + break + + msg = json.loads(line) + + name = msg['name'] + data = msg['data'] + type = msg['type'] + self.raw_snapshot[name] = data + + self.__dispatch(name, type, data) + + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + + + def get_stats (self): + return self.stats + + def get_raw_snapshot (self): + return self.raw_snapshot + + # dispatch the message to the right place + def __dispatch (self, name, type, data): + # stats + if name == "trex-global": + self.stateless_client.handle_async_stats_update(data) + # events + elif name == "trex-event": + self.stateless_client.handle_async_event(type, data) + else: + pass + + diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 160abdec..77b11c37 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -131,7 +131,9 @@ class CTRexClient(object): raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) - + if not trex_cmd_options.get('l'): + self.result_obj.latency_checked = False + self.result_obj.clear_results() try: issue_time = time.time() @@ -767,6 +769,7 @@ class CTRexResult(object): """ self._history = deque(maxlen = max_history_size) self.clear_results() + self.latency_checked = True def __repr__(self): return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) + @@ -1032,18 +1035,19 @@ class CTRexResult(object): self._done_warmup = True # handle latency data - latency_pre = "trex-latency" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC - # support old typo - if self._max_latency is None: - latency_pre = "trex-latecny" - self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") - - self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC - self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) - - avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") - self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) + if self.latency_checked: + latency_pre = "trex-latency" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC + # support old typo + if self._max_latency is None: + latency_pre = "trex-latecny" + self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-") + + self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC + self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency) + + avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-") + self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list) tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts") rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts") diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index 46c283f8..92768ca4 100644..100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -2,16 +2,305 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder +from trex_stateless_client import CTRexStatelessClient +from common.trex_streams import * +from client_utils.general_utils import id_count_gen +import dpkt -print "done!" class CTRexHltApi(object): def __init__(self): - pass + self.trex_client = None + self.connected = False + # self._stream_db = CStreamList() + self._port_data = {} + + # ----- session functions ----- # + + def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False): + ret_dict = {"status": 0} + self.trex_client = CTRexStatelessClient(username, device, port) + res_ok, msg = self.trex_client.connect() + if not res_ok: + self.trex_client = None + ret_dict.update({"log": msg}) + return ret_dict + # arrived here, connection successfully created with server + # next, try acquiring ports of TRex + port_list = self.parse_port_list(port_list) + response = self.trex_client.acquire(port_list, force=break_locks) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if not res_ok: + self.trex_client.disconnect() + self.trex_client = None + ret_dict.update({"log": log}) + # TODO: should revert taken ports?! + return ret_dict + # arrived here, all desired ports were successfully acquired + print log + if reset: + # remove all port traffic configuration from TRex + response = self.trex_client.remove_all_streams(port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if not res_ok: + self.trex_client.disconnect() + self.trex_client = None + ret_dict.update({"log": log}) + return ret_dict + print log + port_handle = {device: {port: port # since only supporting single TRex at the moment, 1:1 map + for port in port_list} + } + ret_dict.update({"status": 1, + "log": None, + "port_handle": port_handle, + "offline": 0}) # ports are online + self.connected = True + self._port_data_init(port_list) + return ret_dict + + def cleanup_session(self, port_list, maintain_lock=False): + ret_dict = {"status": 0} + if not maintain_lock: + # release taken ports + if port_list == "all": + port_list = self.trex_client.get_acquired_ports() + else: + port_list = self.parse_port_list(port_list) + response = self.trex_client.release(port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + print log + if not res_ok: + ret_dict.update({"log": log}) + return ret_dict + ret_dict.update({"status": 1, + "log": None}) + self.trex_client.disconnect() + self.trex_client = None + self.connected = False + return ret_dict + + def interface_config(self, port_handle, mode="config"): + ALLOWED_MODES = ["config", "modify", "destroy"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + # pass this function for now... + return {"status": 1, "log": None} + + # ----- traffic functions ----- # + def traffic_config(self, mode, port_handle, + l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00", + l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110, + transmit_mode="continuous", rate_pps=100, + **kwargs): + ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + if mode == "create": + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} + else: + raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) + + def traffic_control(self, action, port_handle): + ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"] + if action not in ALLOWED_ACTIONS: + raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS)) + # ret_dict = {"status": 0, "stopped": 1} + port_list = self.parse_port_list(port_handle) + if action == "run": + response = self.trex_client.start_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 0, + "log": None} + elif action == "stop": + response = self.trex_client.stop_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 1, + "log": None} + else: + raise NotImplementedError("action '{0}' is not supported yet on TRex".format(action)) + + # if we arrived here, this means that operation FAILED! + return {"status": 0, + "log": log} + + + def traffic_stats(self, port_handle, mode): + ALLOWED_MODES = ["aggregate", "streams", "all"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + # pass this function for now... + if mode == "aggregate": + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} + else: + raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) + + def get_aggregate_port_stats(self, port_handle): + return self.traffic_stats(port_handle, mode="aggregate") + + def get_stream_stats(self, port_handle): + return self.traffic_stats(port_handle, mode="streams") + + # ----- internal functions ----- # + def _port_data_init(self, port_list): + for port in port_list: + self._port_data[port] = {"stream_id_gen": id_count_gen(), + "streams": {}} + + @staticmethod + def process_response(port_list, response): + if isinstance(port_list, list): + res_ok, response = response + log = CTRexHltApi.join_batch_response(response) + else: + res_ok = response.success + log = str(response) + return res_ok, log + + @staticmethod + def parse_port_list(port_list): + if isinstance(port_list, str): + return [int(port) + for port in port_list.split()] + elif isinstance(port_list, list): + return [int(port) + for port in port_list] + else: + return port_list + + @staticmethod + def join_batch_response(responses): + return "\n".join([str(response) + for response in responses]) + + @staticmethod + def generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length): + ALLOWED_L3_PROTOCOL = {"ipv4": dpkt.ethernet.ETH_TYPE_IP, + "ipv6": dpkt.ethernet.ETH_TYPE_IP6, + "arp": dpkt.ethernet.ETH_TYPE_ARP} + ALLOWED_L4_PROTOCOL = {"tcp": dpkt.ip.IP_PROTO_TCP, + "udp": dpkt.ip.IP_PROTO_UDP, + "icmp": dpkt.ip.IP_PROTO_ICMP, + "icmpv6": dpkt.ip.IP_PROTO_ICMP6, + "igmp": dpkt.ip.IP_PROTO_IGMP, + "rtp": dpkt.ip.IP_PROTO_IRTP, + "isis": dpkt.ip.IP_PROTO_ISIS, + "ospf": dpkt.ip.IP_PROTO_OSPF} + + pkt_bld = CTRexPktBuilder() + if l2_encap == "ethernet_ii": + pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet()) + # set Ethernet layer attributes + pkt_bld.set_eth_layer_addr("l2", "src", mac_src) + pkt_bld.set_eth_layer_addr("l2", "dst", mac_dst) + else: + raise NotImplementedError("l2_encap does not support the desired encapsulation '{0}'".format(l2_encap)) + # set l3 on l2 + if l3_protocol not in ALLOWED_L3_PROTOCOL: + raise ValueError("l3_protocol must be one of the following: {0}".format(ALLOWED_L3_PROTOCOL)) + pkt_bld.set_layer_attr("l2", "type", ALLOWED_L3_PROTOCOL[l3_protocol]) + + # set l3 attributes + if l3_protocol == "ipv4": + pkt_bld.add_pkt_layer("l3", dpkt.ip.IP()) + pkt_bld.set_ip_layer_addr("l3", "src", ip_src_addr) + pkt_bld.set_ip_layer_addr("l3", "dst", ip_dst_addr) + pkt_bld.set_layer_attr("l3", "len", l3_length) + else: + raise NotImplementedError("l3_protocol '{0}' is not supported by TRex yet.".format(l3_protocol)) + + pkt_bld.dump_pkt_to_pcap("stream_test.pcap") + return pkt_bld + - def config_traffic(self): - pass if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py new file mode 100644 index 00000000..5c5702dd --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -0,0 +1,408 @@ + +from collections import namedtuple +from common.trex_types import * +from common import trex_stats + +# describes a single port +class Port(object): + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + PortState = namedtuple('PortState', ['state_id', 'state_name']) + STATES_MAP = {STATE_DOWN: "DOWN", + STATE_IDLE: "IDLE", + STATE_STREAMS: "IDLE", + STATE_TX: "ACTIVE", + STATE_PAUSE: "PAUSE"} + + + def __init__ (self, port_id, speed, driver, user, comm_link): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.comm_link = comm_link + self.transmit = comm_link.transmit + self.transmit_batch = comm_link.transmit_batch + self.user = user + self.driver = driver + self.speed = speed + self.streams = {} + self.profile = None + + self.port_stats = trex_stats.CPortStats(self) + + + def err(self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + def ok(self, data = "ACK"): + return RC_OK(data) + + def get_speed_bps (self): + return (self.speed * 1000 * 1000 * 1000) + + # take the port + def acquire(self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return self.ok() + else: + return self.err(rc.data) + + # release the port + def release(self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + self.handler = None + + if rc.success: + return self.ok() + else: + return self.err(rc.data) + + def is_acquired(self): + return (self.handler != None) + + def is_active(self): + return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def is_transmitting (self): + return (self.state == self.STATE_TX) + + def is_paused (self): + return (self.state == self.STATE_PAUSE) + + + def sync(self): + params = {"port_id": self.port_id} + + command = RpcCmdData("get_port_status", params) + rc = self.transmit(command.method, command.params) + if not rc.success: + return self.err(rc.data) + + # sync the port + port_state = rc.data['state'] + + if port_state == "DOWN": + self.state = self.STATE_DOWN + elif port_state == "IDLE": + self.state = self.STATE_IDLE + elif port_state == "STREAMS": + self.state = self.STATE_STREAMS + elif port_state == "TX": + self.state = self.STATE_TX + elif port_state == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return self.ok() + + + # return TRUE if write commands + def is_port_writable (self): + # operations on port can be done on state idle or state streams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj} + + rc, data = self.transmit("add_stream", params) + if not rc: + r = self.err(data) + print r.good() + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # add multiple streams + def add_streams (self, streams_list): + batch = [] + + for stream in streams_list: + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream.stream_id, + "stream": stream.stream} + + cmd = RpcCmdData('add_stream', params) + batch.append(cmd) + + rc, data = self.transmit_batch(batch) + + if not rc: + return self.err(data) + + # add the stream + for stream in streams_list: + self.streams[stream.stream_id] = stream.stream + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc, data = self.transmit("remove_stream", params) + if not rc: + return self.err(data) + + self.streams[stream_id] = None + + self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE + + return self.ok() + + # remove all the streams + def remove_all_streams (self): + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("remove_all_streams", params) + if not rc: + return self.err(data) + + self.streams = {} + + self.state = self.STATE_IDLE + + return self.ok() + + # get a specific stream + def get_stream (self, stream_id): + if stream_id in self.streams: + return self.streams[stream_id] + else: + return None + + def get_all_streams (self): + return self.streams + + # start traffic + def start (self, mul, duration): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul, + "duration": duration} + + rc, data = self.transmit("start_traffic", params) + if not rc: + return self.err(data) + + self.state = self.STATE_TX + + return self.ok() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): + + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("stop_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_STREAMS + + return self.ok() + + def pause (self): + + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("pause_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_PAUSE + + return self.ok() + + + def resume (self): + + if (self.state != self.STATE_PAUSE) : + return self.err("port is not in pause mode") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("resume_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_TX + + return self.ok() + + + def update (self, mul): + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc, data = self.transmit("update_traffic", params) + if not rc: + return self.err(data) + + return self.ok() + + + def validate (self): + + if (self.state == self.STATE_DOWN): + return self.err("port is down") + + if (self.state == self.STATE_IDLE): + return self.err("no streams attached to port") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("validate", params) + if not rc: + return self.err(data) + + self.profile = data + + return self.ok() + + def get_profile (self): + return self.profile + + + def print_profile (self, mult, duration): + if not self.get_profile(): + return + + rate = self.get_profile()['rate'] + graph = self.get_profile()['graph'] + + print format_text("Profile Map Per Port\n", 'underline', 'bold') + + factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) + + print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), + format_num(rate['max_bps'] * factor, suffix = "bps")) + + print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), + format_num(rate['max_pps'] * factor, suffix = "pps"),) + + print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), + format_percentage(rate['max_line_util'] * factor * 100)) + + + # duration + exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) + exp_time_factor_sec = exp_time_base_sec / factor + + # user configured a duration + if duration > 0: + if exp_time_factor_sec > 0: + exp_time_factor_sec = min(exp_time_factor_sec, duration) + else: + exp_time_factor_sec = duration + + + print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), + format_time(exp_time_factor_sec)) + print "\n" + + + def get_port_state_name(self): + return self.STATES_MAP.get(self.state, "Unknown") + + ################# stats handler ###################### + def generate_port_stats(self): + return self.port_stats.generate_stats() + pass + + def generate_port_status(self): + return {"port-type": self.driver, + "maximum": "{speed} Gb/s".format(speed=self.speed), + "port-status": self.get_port_state_name() + } + + def clear_stats(self): + return self.port_stats.clear_stats() + + + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS + + + def async_event_port_started (self): + self.state = self.STATE_TX + + + def async_event_port_paused (self): + self.state = self.STATE_PAUSE + + + def async_event_port_resumed (self): + self.state = self.STATE_TX + + def async_event_forced_acquired (self): + self.handler = None diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 334496d1..a2b1f6d9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -6,324 +6,1175 @@ try: except ImportError: # support import for Python 3 import client.outer_packages + from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage from client_utils.packet_builder import CTRexPktBuilder import json -from common.trex_stats import * + +from common.trex_streams import * from collections import namedtuple +from common.text_opts import * +from common import trex_stats +from client_utils import parsing_opts, text_tables +import time +import datetime +import re +import random +from trex_port import Port +from common.trex_types import * + +from trex_async_client import CTRexAsyncClient + + +########## utlity ############ +def mult_to_factor (mult, max_bps, max_pps, line_util): + if mult['type'] == 'raw': + return mult['value'] + + if mult['type'] == 'bps': + return mult['value'] / max_bps + + if mult['type'] == 'pps': + return mult['value'] / max_pps + + if mult['type'] == 'percentage': + return mult['value'] / line_util + class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) - def __init__(self, username, server="localhost", port=5050, virtual=False): + # verbose levels + VERBOSE_SILENCE = 0 + VERBOSE_REGULAR = 1 + VERBOSE_HIGH = 2 + + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() + self.user = username - self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) - self._conn_handler = {} - self._active_ports = set() - self._stats = CTRexStatsManager("port", "stream") - self._system_info = None - - # ----- decorator methods ----- # - def force_status(owned=True, active_and_owned=False): - def wrapper(func): - def wrapper_f(self, *args, **kwargs): - port_ids = kwargs.get("port_id") - if isinstance(port_ids, int): - # make sure port_ids is a list - port_ids = [port_ids] - bad_ids = set() - for port_id in port_ids: - port_owned = self._conn_handler.get(kwargs.get(port_id)) - if owned and not port_owned: - bad_ids.add(port_ids) - elif active_and_owned: # stronger condition than just owned, hence gets precedence - if port_owned and port_id in self._active_ports: - continue - else: - bad_ids.add(port_ids) - else: - continue - if bad_ids: - # Some port IDs are not according to desires status - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" - "at allowed stated".format(func.__name__)) + + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) + + # default verbose level + self.verbose = self.VERBOSE_REGULAR + + self.ports = {} + self._connection_info = {"server": server, + "sync_port": sync_port, + "async_port": async_port} + self.system_info = {} + self.server_version = {} + self.__err_log = None + + self.async_client = CTRexAsyncClient(server, async_port, self) + + self.streams_db = CStreamsDB() + self.global_stats = trex_stats.CGlobalStats(self._connection_info, + self.server_version, + self.ports) + self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats, + self.ports) + + self.events = [] + + + self.read_only = False + self.connected = False + + + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, None) + + + def get_server (self): + return self.comm_link.get_server() + + ################# events handler ###################### + def add_event_log (self, msg, ev_type, show = False): + + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" + + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) + + if show and self.check_verbose(self.VERBOSE_REGULAR): + print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))) + + + def handle_async_stats_update(self, dump_data): + global_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in dump_data.iteritems(): + # match a pattern of ports + m = re.search('(.*)\-([0-8])', key) + if m: + port_id = int(m.group(2)) + field_name = m.group(1) + if self.ports.has_key(port_id): + if not port_id in port_stats: + port_stats[port_id] = {} + port_stats[port_id][field_name] = value else: - func(self, *args, **kwargs) - return wrapper_f - return wrapper + continue + else: + # no port match - general stats + global_stats[key] = value - @property - def system_info(self): - if not self._system_info: - self._system_info = self.get_system_info() - return self._system_info + # update the general object with the snapshot + self.global_stats.update(global_stats) + # update all ports + for port_id, data in port_stats.iteritems(): + self.ports[port_id].port_stats.update(data) - # ----- user-access methods ----- # - def ping(self): - return self.transmit("ping") + + + def handle_async_event (self, type, data): + # DP stopped + + show_event = False + + # port started + if (type == 0): + port_id = int(data['port_id']) + ev = "Port {0} has started".format(port_id) + self.async_event_port_started(port_id) + + # port stopped + elif (type == 1): + port_id = int(data['port_id']) + ev = "Port {0} has stopped".format(port_id) + + # call the handler + self.async_event_port_stopped(port_id) + + + # port paused + elif (type == 2): + port_id = int(data['port_id']) + ev = "Port {0} has paused".format(port_id) + + # call the handler + self.async_event_port_paused(port_id) + + # port resumed + elif (type == 3): + port_id = int(data['port_id']) + ev = "Port {0} has resumed".format(port_id) + + # call the handler + self.async_event_port_resumed(port_id) + + # port finished traffic + elif (type == 4): + port_id = int(data['port_id']) + ev = "Port {0} job done".format(port_id) + + # call the handler + self.async_event_port_stopped(port_id) + show_event = True + + # port was stolen... + elif (type == 5): + port_id = int(data['port_id']) + ev = "Port {0} was forcely taken".format(port_id) + + # call the handler + self.async_event_port_forced_acquired(port_id) + show_event = True + + # server stopped + elif (type == 100): + ev = "Server has stopped" + self.async_event_server_stopped() + show_event = True + + + else: + # unknown event - ignore + return + + + self.add_event_log(ev, 'server', show_event) + + + def async_event_port_stopped (self, port_id): + self.ports[port_id].async_event_port_stopped() + + + def async_event_port_started (self, port_id): + self.ports[port_id].async_event_port_started() + + + def async_event_port_paused (self, port_id): + self.ports[port_id].async_event_port_paused() + + + def async_event_port_resumed (self, port_id): + self.ports[port_id].async_event_port_resumed() + + + def async_event_port_forced_acquired (self, port_id): + self.ports[port_id].async_event_forced_acquired() + self.read_only = True + + def async_event_server_stopped (self): + self.connected = False + + + def get_events (self): + return self.events + + def clear_events (self): + self.events = [] + + ############# helper functions section ############## + + # measure time for functions + def timing(f): + def wrap(*args): + time1 = time.time() + ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + + delta = time.time() - time1 + print format_time(delta) + "\n" + + return ret + + return wrap + + + def validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + print type(port_id_list) + return False + + # check each item of the sequence + return all([ (port_id >= 0) and (port_id < self.get_port_count()) + for port_id in port_id_list ]) + + # some preprocessing for port argument + def __ports (self, port_id_list): + + # none means all + if port_id_list == None: + return range(0, self.get_port_count()) + + # always list + if isinstance(port_id_list, int): + port_id_list = [port_id_list] + + if not isinstance(port_id_list, list): + raise ValueError("bad port id list: {0}".format(port_id_list)) + + for port_id in port_id_list: + if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): + raise ValueError("bad port id {0}".format(port_id)) + + return port_id_list + + ############ boot up section ################ + + # connection sequence + + # mode can be RW - read / write, RWF - read write with force , RO - read only + def connect(self, mode = "RW"): + + if self.is_connected(): + self.disconnect() + + # clear this flag + self.connected = False + + # connect sync channel + rc, data = self.comm_link.connect() + if not rc: + return RC_ERR(data) + + # connect async channel + rc, data = self.async_client.connect() + if not rc: + return RC_ERR(data) + + # version + rc, data = self.transmit("get_version") + if not rc: + return RC_ERR(data) + + self.server_version = data + self.global_stats.server_version = data + + # cache system info + rc, data = self.transmit("get_system_info") + if not rc: + return RC_ERR(data) + self.system_info = data + + # cache supported commands + rc, data = self.transmit("get_supported_cmds") + if not rc: + return RC_ERR(data) + + self.supported_cmds = data + + # create ports + for port_id in xrange(self.get_port_count()): + speed = self.system_info['ports'][port_id]['speed'] + driver = self.system_info['ports'][port_id]['driver'] + + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link) + + + # sync the ports + rc = self.sync_ports() + if rc.bad(): + return rc + + # acquire all ports + if mode == "RW": + rc = self.acquire(force = False) + + # fallback to read only if failed + if rc.bad(): + rc.annotate(show_status = False) + print format_text("Switching to read only mode - only few commands will be available", 'bold') + + self.release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.acquire(force = True) + if rc.bad(): + return rc + self.read_only = False + + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True + + + + self.connected = True + return RC_OK() + + + def is_read_only (self): + return self.read_only + + def is_connected (self): + return self.connected and self.comm_link.is_connected + + + def disconnect(self): + # release any previous acquired ports + if self.is_connected(): + self.release(self.get_acquired_ports()) + + self.comm_link.disconnect() + self.async_client.disconnect() + + self.connected = False + + return RC_OK() + + + def on_async_dead (self): + if self.connected: + msg = 'lost connection to server' + self.add_event_log(msg, 'local', True) + self.connected = False + + def on_async_alive (self): + pass + + ########### cached queries (no server traffic) ########### def get_supported_cmds(self): - return self.transmit("get_supported_cmds") + return self.supported_cmds def get_version(self): - return self.transmit("get_version") + return self.server_version def get_system_info(self): - return self.transmit("get_system_info") + return self.system_info def get_port_count(self): return self.system_info.get("port_count") - def acquire(self, port_id, force=False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_acquire_response) - else: - params = {"port_id": port_id, - "user": self.user, - "force": force} - command = self.RpcCmdData("acquire", params) - self._handle_acquire_response(command, self.transmit(command.method, command.params)) - return self._conn_handler.get(port_id) - - @force_status(owned=True) - def release(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_release_response) + def get_port_ids(self, as_str=False): + port_ids = range(self.get_port_count()) + if as_str: + return " ".join(str(p) for p in port_ids) else: - self._conn_handler.pop(port_id) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("release", params) - self._handle_release_response(command, self.transmit(command.method, command.params)) - return + return port_ids - @force_status(owned=True) - def add_stream(self, stream_id, stream_obj, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - assert isinstance(stream_obj, CStream) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id, - "stream": stream_obj.dump()} - return self.transmit("add_stream", params) - - @force_status(owned=True) - def remove_stream(self, stream_id, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id} - return self.transmit("remove_stream", params) - - @force_status(owned=True, active_and_owned=True) - def get_stream_id_list(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("get_stream_list", params) - - @force_status(owned=True, active_and_owned=True) - def get_stream(self, stream_id, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id} - return self.transmit("get_stream_list", params) - - @force_status(owned=True) - def start_traffic(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_start_traffic_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("start_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + def get_stats_async (self): + return self.async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port + + def get_connection_ip (self): + return self.comm_link.server - @force_status(owned=False, active_and_owned=True) - def stop_traffic(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response) + def get_all_ports (self): + return [port_id for port_id, port_obj in self.ports.iteritems()] + + def get_acquired_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_acquired()] + + def get_active_ports(self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_active()] + + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + + def set_verbose(self, mode): + + # on high - enable link verbose + if mode == self.VERBOSE_HIGH: + self.comm_link.set_verbose(True) else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("stop_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + self.comm_link.set_verbose(False) + + self.verbose = mode + + + def check_verbose (self, level): + return (self.verbose >= level) + + def get_verbose (self): + return self.verbose + + ############# server actions ################ + + # ping server + def ping(self): + rc, info = self.transmit("ping") + return RC(rc, info) + + def get_global_stats(self): - command = self.RpcCmdData("get_global_stats", {}) - return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) - # return self.transmit("get_global_stats") + rc, info = self.transmit("get_global_stats") + return RC(rc, info) + + + ########## port commands ############## + def sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) + + return rc + + # acquire ports, if port_list is none - get all + def acquire (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].acquire(force)) + + return rc + + # release ports + def release (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].release()) + + return rc + + + def add_stream(self, stream_id, stream_obj, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) + + return rc + + + + def add_stream_pack(self, stream_pack_list, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack_list)) + + return rc + + + + def remove_stream(self, stream_id, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_stream(stream_id)) + + return rc + + + + def remove_all_streams(self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_all_streams()) + + return rc + + + def get_stream(self, stream_id, port_id, get_pkt = False): + + return self.ports[port_id].get_stream(stream_id) + + + def get_all_streams(self, port_id, get_pkt = False): + + return self.ports[port_id].get_all_streams() + + + def get_stream_id_list(self, port_id): + + return self.ports[port_id].get_stream_id_list() + + + def start_traffic (self, multiplier, duration, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].start(multiplier, duration)) + + return rc + + + def resume_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].resume()) + + return rc + + def pause_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].pause()) + + return rc + + def stop_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].stop(force)) + + return rc + + + def update_traffic (self, mult, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].update(mult)) + + return rc + + + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + - @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("get_port_stats", params) - return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) + pass - @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = self.RpcCmdData("get_stream_stats", params) - return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) + pass + - # ----- internal methods ----- # def transmit(self, method_name, params={}): - return self.tx_link.transmit(method_name, params) + return self.comm_link.transmit(method_name, params) + def transmit_batch(self, batch_list): - return self.tx_link.transmit_batch(batch_list) + return self.comm_link.transmit_batch(batch_list) - @staticmethod - def _object_decoder(obj_type, obj_data): - if obj_type == "global": - return CGlobalStats(**obj_data) - elif obj_type == "port": - return CPortStats(**obj_data) - elif obj_type == "stream": - return CStreamStats(**obj_data) - else: - # Do not serialize the data into class - return obj_data + ######################### Console (high level) API ######################### - @staticmethod - def default_success_test(result_obj): - if result_obj.success: - return True - else: - return False + @timing + def cmd_ping(self): + rc = self.ping() + rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) + return rc - # ----- handler internal methods ----- # - def _handle_acquire_response(self, request, response): - if response.success: - self._conn_handler[request.get("port_id")] = response.data + def cmd_connect(self, mode = "RW"): + rc = self.connect(mode) + rc.annotate() + return rc - def _handle_release_response(self, request, response): - if response.success: - del self._conn_handler[request.get("port_id")] + def cmd_disconnect(self): + rc = self.disconnect() + rc.annotate() + return rc - def _handle_start_traffic_response(self, request, response): - if response.success: - self._active_ports.add(request.get("port_id")) + # reset + def cmd_reset(self): - def _handle_stop_traffic_response(self, request, response): - if response.success: - self._active_ports.remove(request.get("port_id")) - def _handle_get_global_stats_response(self, request, response): - if response.success: - return CGlobalStats(**response.success) - else: - return False + rc = self.acquire(force = True) + rc.annotate("Force acquiring all ports:") + if rc.bad(): + return rc + + + # force stop all ports + rc = self.stop_traffic(self.get_port_ids(), True) + rc.annotate("Stop traffic on all ports:") + if rc.bad(): + return rc + + + # remove all streams + rc = self.remove_all_streams(self.get_port_ids()) + rc.annotate("Removing all streams from all ports:") + if rc.bad(): + return rc + + # TODO: clear stats + return RC_OK() + + + # stop cmd + def cmd_stop (self, port_id_list): + + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on provided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) - def _handle_get_port_stats_response(self, request, response): - if response.success: - return CPortStats(**response.success) + rc = self.stop_traffic(active_ports) + rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + return RC_OK() + + # update cmd + def cmd_update (self, port_id_list, mult): + + # find the relevant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on provided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) + + rc = self.update_traffic(mult, active_ports) + rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) + + return rc + + # clear stats + def cmd_clear(self, port_id_list): + + for port_id in port_id_list: + self.ports[port_id].clear_stats() + + self.global_stats.clear_stats() + + return RC_OK() + + + # pause cmd + def cmd_pause (self, port_id_list): + + # find the relevant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on provided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) + + rc = self.pause_traffic(active_ports) + rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) + return rc + + + + # resume cmd + def cmd_resume (self, port_id_list): + + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on porvided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) + + rc = self.resume_traffic(active_ports) + rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) + return rc + + + # start cmd + def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): + + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + print format_text(msg, 'bold') + return RC_ERR(msg) + else: + rc = self.cmd_stop(active_ports) + if not rc: + return rc + + + rc = self.remove_all_streams(port_id_list) + rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + + rc = self.add_stream_pack(stream_list.compiled, port_id_list) + rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) + if rc.bad(): + return rc + + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.start_traffic(mult, duration, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) + + return rc else: - return False + rc = self.validate(port_id_list) + rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) + + if rc.bad(): + return rc + + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc + + + # validate port(s) profile + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + return rc + + + # stats + def cmd_stats(self, port_id_list, stats_mask=set()): + stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) + + stats_obj = {} + for stats_type in stats_opts: + stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type)) + return stats_obj + + + ############## High Level API With Parser ################ + + def cmd_connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.cmd_connect_line.__doc__, + parsing_opts.FORCE) - def _handle_get_stream_stats_response(self, request, response): - if response.success: - return CStreamStats(**response.success) + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + if opts.force: + rc = self.cmd_connect(mode = "RWF") else: - return False + rc = self.cmd_connect(mode = "RW") + + @timing + def cmd_start_line (self, line): + '''Start selected traffic in specified ports on TRex\n''' + # define a parser + parser = parsing_opts.gen_parser(self, + "start", + self.cmd_start_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.TOTAL, + parsing_opts.FORCE, + parsing_opts.STREAM_FROM_PATH_OR_FILE, + parsing_opts.DURATION, + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + + if opts.dry: + print format_text("\n*** DRY RUN ***", 'bold') + + if opts.db: + stream_list = self.streams_db.get_stream_pack(opts.db) + rc = RC(stream_list != None) + rc.annotate("Load stream pack (from DB):") + if rc.bad(): + return RC_ERR("Failed to load stream pack") - def _is_ports_valid(self, port_id): - if isinstance(port_id, list) or isinstance(port_id, set): - # check each item of the sequence - return all([self._is_ports_valid(port) - for port in port_id]) - elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): - return True else: - return False + # load streams from file + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + rc = RC(stream_list != None) + rc.annotate("Load stream pack (from file):") + if stream_list == None: + return RC_ERR("Failed to load stream pack") + + + # total has no meaning with percentage - its linear + if opts.total and (opts.mult['type'] != 'percentage'): + # if total was set - divide it between the ports + opts.mult['value'] = opts.mult['value'] / len(opts.ports) + + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + + @timing + def cmd_resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "resume", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line parameters") + + return self.cmd_resume(opts.ports) + + + @timing + def cmd_stop_line (self, line): + '''Stop active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "stop", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line parameters") + + return self.cmd_stop(opts.ports) + + + @timing + def cmd_pause_line (self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "pause", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line parameters") + + return self.cmd_pause(opts.ports) + + + @timing + def cmd_update_line (self, line): + '''Update port(s) speed currently active\n''' + parser = parsing_opts.gen_parser(self, + "update", + self.cmd_update_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.MULTIPLIER, + parsing_opts.TOTAL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + # total has no meaning with percentage - its linear + if opts.total and (opts.mult['type'] != 'percentage'): + # if total was set - divide it between the ports + opts.mult['value'] = opts.mult['value'] / len(opts.ports) + + return self.cmd_update(opts.ports, opts.mult) + + @timing + def cmd_reset_line (self, line): + return self.cmd_reset() + + + def cmd_clear_line (self, line): + '''Clear cached local statistics\n''' + # define a parser + parser = parsing_opts.gen_parser(self, + "clear", + self.cmd_clear_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + return self.cmd_clear(opts.ports) + + + def cmd_stats_line (self, line): + '''Fetch statistics from TRex server by port\n''' + # define a parser + parser = parsing_opts.gen_parser(self, + "stats", + self.cmd_stats_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.STATS_MASK) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + # determine stats mask + mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS)) + if not mask: + # set to show all stats if no filter was given + mask = trex_stats.ALL_STATS_OPTS + + stats = self.cmd_stats(opts.ports, mask) + + # print stats to screen + for stat_type, stat_data in stats.iteritems(): + text_tables.print_table_with_header(stat_data.text_table, stat_type) + + + return RC_OK() + + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + rc = self.cmd_validate(opts.ports) + return rc + + + def cmd_exit_line (self, line): + print format_text("Exiting\n", 'bold') + # a way to exit + return RC_ERR("exit") + - def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): - for i, response in enumerate(resp_list): - # testing each result with success test so that a conclusion report could be deployed in future. - if success_test(response): - # run handler method with its params - handler_func(req_list[i], response) + def cmd_wait_line (self, line): + '''wait for a period of time\n''' + + parser = parsing_opts.gen_parser(self, + "wait", + self.cmd_wait_line.__doc__, + parsing_opts.DURATION) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line parameters") + + delay_sec = opts.duration if (opts.duration > 0) else 1 + + print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold') + time.sleep(delay_sec) + + return RC_OK() + + # run a script of commands + def run_script_file (self, filename): + + print format_text("\nRunning script file '{0}'...".format(filename), 'bold') + + rc = self.cmd_connect() + if rc.bad(): + return + + with open(filename) as f: + script_lines = f.readlines() + + cmd_table = {} + + # register all the commands + cmd_table['start'] = self.cmd_start_line + cmd_table['stop'] = self.cmd_stop_line + cmd_table['reset'] = self.cmd_reset_line + cmd_table['wait'] = self.cmd_wait_line + cmd_table['exit'] = self.cmd_exit_line + + for index, line in enumerate(script_lines, start = 1): + line = line.strip() + if line == "": + continue + if line.startswith("#"): + continue + + sp = line.split(' ', 1) + cmd = sp[0] + if len(sp) == 2: + args = sp[1] else: - continue # TODO: mark in this case somehow the bad result + args = "" + + print format_text("Executing line {0} : '{1}'\n".format(index, line)) + if not cmd in cmd_table: + print "\n*** Error at line {0} : '{1}'\n".format(index, line) + print format_text("unknown command '{0}'\n".format(cmd), 'bold') + return False + rc = cmd_table[cmd](args) + if rc.bad(): + return False + + print format_text("\n[Done]", 'bold') + + return True + + + ################################# + # ------ private methods ------ # + @staticmethod + def _get_mask_keys(ok_values={True}, **kwargs): + masked_keys = set() + for key, val in kwargs.iteritems(): + if val in ok_values: + masked_keys.add(key) + return masked_keys + + @staticmethod + def _filter_namespace_args(namespace, ok_values): + return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + + + ################################# # ------ private classes ------ # - class CTxLink(object): + class CCommLink(object): """describes the connectivity of the stateless client method""" def __init__(self, server="localhost", port=5050, virtual=False): - super(CTRexStatelessClient.CTxLink, self).__init__() + super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port + self.verbose = False self.rpc_link = JsonRpcClient(self.server, self.port) + + @property + def is_connected(self): + if not self.virtual: + return self.rpc_link.connected + else: + return True + + def get_server (self): + return self.server + + def set_verbose(self, mode): + self.verbose = mode + return self.rpc_link.set_verbose(mode) + + def connect(self): + if not self.virtual: + return self.rpc_link.connect() + + def disconnect(self): if not self.virtual: - self.rpc_link.connect() + return self.rpc_link.disconnect() def transmit(self, method_name, params={}): if self.virtual: @@ -352,144 +1203,5 @@ class CTRexStatelessClient(object): port=self.port) -class CStream(object): - """docstring for CStream""" - DEFAULTS = {"rx_stats": CRxStats, - "mode": CTxMode, - "isg": 5.0, - "next_stream": -1, - "self_start": True, - "enabled": True} - - def __init__(self, **kwargs): - super(CStream, self).__init__() - for k, v in kwargs.items(): - setattr(self, k, v) - # set default values to unset attributes, according to DEFAULTS dict - set_keys = set(kwargs.keys()) - keys_to_set = [x - for x in self.DEFAULTS - if x not in set_keys] - for key in keys_to_set: - default = self.DEFAULTS.get(key) - if type(default) == type: - setattr(self, key, default()) - else: - setattr(self, key, default) - - @property - def packet(self): - return self._packet - - @packet.setter - def packet(self, packet_obj): - assert isinstance(packet_obj, CTRexPktBuilder) - self._packet = packet_obj - - @property - def enabled(self): - return self._enabled - - @enabled.setter - def enabled(self, bool_value): - self._enabled = bool(bool_value) - - @property - def self_start(self): - return self._self_start - - @self_start.setter - def self_start(self, bool_value): - self._self_start = bool(bool_value) - - @property - def next_stream(self): - return self._next_stream - - @next_stream.setter - def next_stream(self, value): - self._next_stream = int(value) - - def dump(self): - pass - return {"enabled": self.enabled, - "self_start": self.self_start, - "isg": self.isg, - "next_stream": self.next_stream, - "packet": self.packet.dump_pkt(), - "mode": self.mode.dump(), - "vm": self.packet.get_vm_data(), - "rx_stats": self.rx_stats.dump()} - -class CRxStats(object): - - def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): - self._rx_dict = {"enabled": enabled, - "seq_enabled": seq_enabled, - "latency_enabled": latency_enabled} - - @property - def enabled(self): - return self._rx_dict.get("enabled") - - @enabled.setter - def enabled(self, bool_value): - self._rx_dict['enabled'] = bool(bool_value) - - @property - def seq_enabled(self): - return self._rx_dict.get("seq_enabled") - - @seq_enabled.setter - def seq_enabled(self, bool_value): - self._rx_dict['seq_enabled'] = bool(bool_value) - - @property - def latency_enabled(self): - return self._rx_dict.get("latency_enabled") - - @latency_enabled.setter - def latency_enabled(self, bool_value): - self._rx_dict['latency_enabled'] = bool(bool_value) - - def dump(self): - return {k: v - for k, v in self._rx_dict.items() - if v - } - - -class CTxMode(object): - """docstring for CTxMode""" - def __init__(self, tx_mode, pps): - super(CTxMode, self).__init__() - if tx_mode not in ["continuous", "single_burst", "multi_burst"]: - raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) - self._tx_mode = tx_mode - self._fields = {'pps': float(pps)} - if tx_mode == "single_burst": - self._fields['total_pkts'] = 0 - elif tx_mode == "multi_burst": - self._fields['pkts_per_burst'] = 0 - self._fields['ibg'] = 0.0 - self._fields['count'] = 0 - else: - pass - - def set_tx_mode_attr(self, attr, val): - if attr in self._fields: - self._fields[attr] = type(self._fields.get(attr))(val) - else: - raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". - format(attr, self._tx_mode)) - - def dump(self): - dump = {"type": self._tx_mode} - dump.update({k: v - for k, v in self._fields.items() - }) - return dump - - if __name__ == "__main__": pass |