From 0c2b3c83f9cc0c25277c39660dce132aad55c3d7 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Wed, 28 Oct 2015 07:28:37 +0200 Subject: updated more HLTAPI functionality and fixed found bugs. Working: Start/stop traffic, traffic config (semi), connect, clean Missing: stats Next: boost console --- .../trex_control_plane/client/trex_hltapi.py | 158 ++++++++++++++++++--- .../client/trex_stateless_client.py | 36 +++-- .../client_utils/general_utils.py | 16 +++ .../trex_control_plane/common/trex_streams.py | 3 +- .../trex_control_plane/console/trex_console.py | 4 +- .../examples/stateless_example.py | 31 +++- 6 files changed, 213 insertions(+), 35 deletions(-) mode change 100644 => 100755 scripts/automation/trex_control_plane/examples/stateless_example.py diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py index cc7005e8..92768ca4 100755 --- a/scripts/automation/trex_control_plane/client/trex_hltapi.py +++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py @@ -3,6 +3,8 @@ import trex_root_path from client_utils.packet_builder import CTRexPktBuilder from trex_stateless_client import CTRexStatelessClient +from common.trex_streams import * +from client_utils.general_utils import id_count_gen import dpkt @@ -11,8 +13,8 @@ class CTRexHltApi(object): def __init__(self): self.trex_client = None self.connected = False - - pass + # self._stream_db = CStreamList() + self._port_data = {} # ----- session functions ----- # @@ -50,11 +52,12 @@ class CTRexHltApi(object): port_handle = {device: {port: port # since only supporting single TRex at the moment, 1:1 map for port in port_list} } - ret_dict.update({"status": 1, "log": None, "port_handle": port_handle, "offline": 0}) # ports are online + self.connected = True + self._port_data_init(port_list) return ret_dict def cleanup_session(self, port_list, maintain_lock=False): @@ -75,6 +78,7 @@ class CTRexHltApi(object): "log": None}) self.trex_client.disconnect() self.trex_client = None + self.connected = False return ret_dict def interface_config(self, port_handle, mode="config"): @@ -86,37 +90,150 @@ class CTRexHltApi(object): # ----- traffic functions ----- # def traffic_config(self, mode, port_handle, - l2_encap, mac_src, mac_dst, - l3_protocol, ip_src_addr, ip_dst_addr, l3_length, - transmit_mode, rate_pps): + l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00", + l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110, + transmit_mode="continuous", rate_pps=100, + **kwargs): ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"] if mode not in ALLOWED_MODES: raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) if mode == "create": - # create a new stream with desired attributes - stream = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, - l3_protocol, ip_src_addr, ip_dst_addr, l3_length, - transmit_mode, rate_pps) - - - - pass + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} else: raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) def traffic_control(self, action, port_handle): - pass + ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"] + if action not in ALLOWED_ACTIONS: + raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS)) + # ret_dict = {"status": 0, "stopped": 1} + port_list = self.parse_port_list(port_handle) + if action == "run": + response = self.trex_client.start_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 0, + "log": None} + elif action == "stop": + response = self.trex_client.stop_traffic(port_id=port_list) + res_ok, log = CTRexHltApi.process_response(port_list, response) + if res_ok: + return {"status": 1, + "stopped": 1, + "log": None} + else: + raise NotImplementedError("action '{0}' is not supported yet on TRex".format(action)) + + # if we arrived here, this means that operation FAILED! + return {"status": 0, + "log": log} + def traffic_stats(self, port_handle, mode): - pass + ALLOWED_MODES = ["aggregate", "streams", "all"] + if mode not in ALLOWED_MODES: + raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES)) + # pass this function for now... + if mode == "aggregate": + # create a new stream with desired attributes, starting by creating packet + try: + packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst, + l3_protocol, ip_src_addr, ip_dst_addr, l3_length) + # set transmission attributes + tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs) + # set rx_stats + rx_stats = CRxStats() # defaults with disabled + # join the generated data into stream + stream_obj = CStream() + stream_obj_params = {"enabled": True, + "self_start": True, + "next_stream_id": -1, + "isg": 0.0, + "mode": tx_mode, + "rx_stats": rx_stats, + "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed + stream_obj.load_data(**stream_obj_params) + except Exception as e: + # some exception happened during the stream creation + return {"status": 0, "log": str(e)} + # try adding the stream, until free stream_id is found + port_data = self._port_data.get(port_handle) + id_candidate = None + # TODO: change this to better implementation + while True: + id_candidate = port_data["stream_id_gen"].next() + response = self.trex_client.add_stream(stream_id=id_candidate, + stream_obj=stream_obj, + port_id=port_handle) + res_ok, log = CTRexHltApi.process_response(port_handle, response) + if res_ok: + # found non-taken stream_id on server + # save it for modifying needs + port_data["streams"].update({id_candidate: stream_obj}) + break + else: + # proceed to another iteration to use another id + continue + return {"status": 1, + "stream_id": id_candidate, + "log": None} + else: + raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode)) def get_aggregate_port_stats(self, port_handle): return self.traffic_stats(port_handle, mode="aggregate") def get_stream_stats(self, port_handle): - return self.traffic_stats(port_handle, mode="stream") + return self.traffic_stats(port_handle, mode="streams") # ----- internal functions ----- # + def _port_data_init(self, port_list): + for port in port_list: + self._port_data[port] = {"stream_id_gen": id_count_gen(), + "streams": {}} + @staticmethod def process_response(port_list, response): if isinstance(port_list, list): @@ -145,8 +262,7 @@ class CTRexHltApi(object): @staticmethod def generate_stream(l2_encap, mac_src, mac_dst, - l3_protocol, ip_src_addr, ip_dst_addr, l3_length, - transmit_mode, rate_pps): + l3_protocol, ip_src_addr, ip_dst_addr, l3_length): ALLOWED_L3_PROTOCOL = {"ipv4": dpkt.ethernet.ETH_TYPE_IP, "ipv6": dpkt.ethernet.ETH_TYPE_IP6, "arp": dpkt.ethernet.ETH_TYPE_ARP} @@ -177,9 +293,13 @@ class CTRexHltApi(object): pkt_bld.add_pkt_layer("l3", dpkt.ip.IP()) pkt_bld.set_ip_layer_addr("l3", "src", ip_src_addr) pkt_bld.set_ip_layer_addr("l3", "dst", ip_dst_addr) + pkt_bld.set_layer_attr("l3", "len", l3_length) else: raise NotImplementedError("l3_protocol '{0}' is not supported by TRex yet.".format(l3_protocol)) + pkt_bld.dump_pkt_to_pcap("stream_test.pcap") + return pkt_bld + if __name__ == "__main__": diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 3079ee5e..bbdcddbe 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -62,7 +62,7 @@ class CTRexStatelessClient(object): continue if bad_ids: # Some port IDs are not according to desires status - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not " "at allowed stated".format(func.__name__, list(bad_ids))) else: return func(self, *args, **kwargs) @@ -229,13 +229,15 @@ class CTRexStatelessClient(object): for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_start_traffic_response) + return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response, + success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} command = RpcCmdData("start_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + return self._handle_start_traffic_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): @@ -248,13 +250,15 @@ class CTRexStatelessClient(object): for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response) + return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response, + success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} command = RpcCmdData("stop_traffic", params) - self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) - return + return self._handle_start_traffic_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) def get_global_stats(self): command = RpcCmdData("get_global_stats", {}) @@ -375,12 +379,20 @@ class CTRexStatelessClient(object): return RpcResponseStatus(False, port_id, response.data) def _handle_start_traffic_response(self, request, response, success_test): - if response.success: - self._active_ports.add(request.get("port_id")) + port_id = request.params.get("port_id") + if success_test(response): + self._active_ports.add(port_id) + return RpcResponseStatus(True, port_id, "Traffic started") + else: + return RpcResponseStatus(False, port_id, response.data) - def _handle_stop_traffic_response(self, request, response): - if response.success: - self._active_ports.remove(request.get("port_id")) + def _handle_stop_traffic_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + self._active_ports.remove(port_id) + return RpcResponseStatus(True, port_id, "Traffic stopped") + else: + return RpcResponseStatus(False, port_id, response.data) def _handle_get_global_stats_response(self, request, response, success_test): if response.success: diff --git a/scripts/automation/trex_control_plane/client_utils/general_utils.py b/scripts/automation/trex_control_plane/client_utils/general_utils.py index 5488b9dd..3c025608 100755 --- a/scripts/automation/trex_control_plane/client_utils/general_utils.py +++ b/scripts/automation/trex_control_plane/client_utils/general_utils.py @@ -75,6 +75,22 @@ def random_id_gen(length=8): return_id += random.choice(id_chars) yield return_id +def id_count_gen(): + """ + A generator for creating an increasing id for objects, starting from 0 + + :parameters: + None + + :return: + an id (unsigned int) with each next() request. + """ + return_id = 0 + while True: + yield return_id + return_id += 1 + + if __name__ == "__main__": pass diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index 750da198..bb4c72ca 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -23,7 +23,7 @@ class CStreamList(object): if name in self.streams_list: raise NameError("A stream with this name already exists on this list.") self.streams_list[name]=stream_obj - return + return name def remove_stream(self, name): popped = self.streams_list.pop(name) @@ -184,6 +184,7 @@ class CStream(object): if isinstance(kwargs[k], CTRexPktBuilder): if "vm" not in kwargs: self.load_packet_obj(kwargs[k]) + break # vm field check is skipped else: raise ValueError("When providing packet object with a CTRexPktBuilder, vm parameter " "should not be supplied") diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 4f9743f4..2ea29473 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -449,10 +449,10 @@ class TRexConsole(cmd.Cmd): try: compiled_streams = stream_list.compile_streams() self.user_streams[name] = LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump_compiled()) + [StreamPack(v.stream_id, v.stream.dump()) for k, v in compiled_streams.items()]) - print "Stream '{0}' loaded successfully".format(name) + print "Stream list '{0}' loaded successfully".format(name) except Exception as e: raise return diff --git a/scripts/automation/trex_control_plane/examples/stateless_example.py b/scripts/automation/trex_control_plane/examples/stateless_example.py old mode 100644 new mode 100755 index f97bce4c..bb0fe983 --- a/scripts/automation/trex_control_plane/examples/stateless_example.py +++ b/scripts/automation/trex_control_plane/examples/stateless_example.py @@ -1 +1,30 @@ -__author__ = 'danklei' +#!/router/bin/python + +import trex_root_path +from client.trex_hltapi import CTRexHltApi + +if __name__ == "__main__": + port_list = [1,2] + try: + hlt_client = CTRexHltApi() + con = hlt_client.connect("localhost", port_list, "danklei", break_locks=True, reset=True)#, port=6666) + print con + + res = hlt_client.traffic_config("create", 1)#, ip_src_addr="2000.2.2") + print res + res = hlt_client.traffic_config("create", 2)#, ip_src_addr="2000.2.2") + print res + + res = hlt_client.traffic_control("run", [1, 2])#, ip_src_addr="2000.2.2") + print res + + res = hlt_client.traffic_control("stop", [1, 2])#, ip_src_addr="2000.2.2") + print res + + + + except Exception as e: + raise + finally: + res = hlt_client.cleanup_session(port_list) + print res \ No newline at end of file -- cgit 1.2.3-korg