summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rwxr-xr-x[-rw-r--r--]scripts/automation/trex_control_plane/client/trex_hltapi.py297
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py445
2 files changed, 537 insertions, 205 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py
index 46c283f8..92768ca4 100644..100755
--- a/scripts/automation/trex_control_plane/client/trex_hltapi.py
+++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py
@@ -2,16 +2,305 @@
import trex_root_path
from client_utils.packet_builder import CTRexPktBuilder
+from trex_stateless_client import CTRexStatelessClient
+from common.trex_streams import *
+from client_utils.general_utils import id_count_gen
+import dpkt
-print "done!"
class CTRexHltApi(object):
def __init__(self):
- pass
+ self.trex_client = None
+ self.connected = False
+ # self._stream_db = CStreamList()
+ self._port_data = {}
+
+ # ----- session functions ----- #
+
+ def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False):
+ ret_dict = {"status": 0}
+ self.trex_client = CTRexStatelessClient(username, device, port)
+ res_ok, msg = self.trex_client.connect()
+ if not res_ok:
+ self.trex_client = None
+ ret_dict.update({"log": msg})
+ return ret_dict
+ # arrived here, connection successfully created with server
+ # next, try acquiring ports of TRex
+ port_list = self.parse_port_list(port_list)
+ response = self.trex_client.acquire(port_list, force=break_locks)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ # TODO: should revert taken ports?!
+ return ret_dict
+ # arrived here, all desired ports were successfully acquired
+ print log
+ if reset:
+ # remove all port traffic configuration from TRex
+ response = self.trex_client.remove_all_streams(port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ return ret_dict
+ print log
+ port_handle = {device: {port: port # since only supporting single TRex at the moment, 1:1 map
+ for port in port_list}
+ }
+ ret_dict.update({"status": 1,
+ "log": None,
+ "port_handle": port_handle,
+ "offline": 0}) # ports are online
+ self.connected = True
+ self._port_data_init(port_list)
+ return ret_dict
+
+ def cleanup_session(self, port_list, maintain_lock=False):
+ ret_dict = {"status": 0}
+ if not maintain_lock:
+ # release taken ports
+ if port_list == "all":
+ port_list = self.trex_client.get_acquired_ports()
+ else:
+ port_list = self.parse_port_list(port_list)
+ response = self.trex_client.release(port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ print log
+ if not res_ok:
+ ret_dict.update({"log": log})
+ return ret_dict
+ ret_dict.update({"status": 1,
+ "log": None})
+ self.trex_client.disconnect()
+ self.trex_client = None
+ self.connected = False
+ return ret_dict
+
+ def interface_config(self, port_handle, mode="config"):
+ ALLOWED_MODES = ["config", "modify", "destroy"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ # pass this function for now...
+ return {"status": 1, "log": None}
+
+ # ----- traffic functions ----- #
+ def traffic_config(self, mode, port_handle,
+ l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00",
+ l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110,
+ transmit_mode="continuous", rate_pps=100,
+ **kwargs):
+ ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ if mode == "create":
+ # create a new stream with desired attributes, starting by creating packet
+ try:
+ packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length)
+ # set transmission attributes
+ tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs)
+ # set rx_stats
+ rx_stats = CRxStats() # defaults with disabled
+ # join the generated data into stream
+ stream_obj = CStream()
+ stream_obj_params = {"enabled": True,
+ "self_start": True,
+ "next_stream_id": -1,
+ "isg": 0.0,
+ "mode": tx_mode,
+ "rx_stats": rx_stats,
+ "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed
+ stream_obj.load_data(**stream_obj_params)
+ except Exception as e:
+ # some exception happened during the stream creation
+ return {"status": 0, "log": str(e)}
+ # try adding the stream, until free stream_id is found
+ port_data = self._port_data.get(port_handle)
+ id_candidate = None
+ # TODO: change this to better implementation
+ while True:
+ id_candidate = port_data["stream_id_gen"].next()
+ response = self.trex_client.add_stream(stream_id=id_candidate,
+ stream_obj=stream_obj,
+ port_id=port_handle)
+ res_ok, log = CTRexHltApi.process_response(port_handle, response)
+ if res_ok:
+ # found non-taken stream_id on server
+ # save it for modifying needs
+ port_data["streams"].update({id_candidate: stream_obj})
+ break
+ else:
+ # proceed to another iteration to use another id
+ continue
+ return {"status": 1,
+ "stream_id": id_candidate,
+ "log": None}
+ else:
+ raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode))
+
+ def traffic_control(self, action, port_handle):
+ ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"]
+ if action not in ALLOWED_ACTIONS:
+ raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS))
+ # ret_dict = {"status": 0, "stopped": 1}
+ port_list = self.parse_port_list(port_handle)
+ if action == "run":
+ response = self.trex_client.start_traffic(port_id=port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if res_ok:
+ return {"status": 1,
+ "stopped": 0,
+ "log": None}
+ elif action == "stop":
+ response = self.trex_client.stop_traffic(port_id=port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if res_ok:
+ return {"status": 1,
+ "stopped": 1,
+ "log": None}
+ else:
+ raise NotImplementedError("action '{0}' is not supported yet on TRex".format(action))
+
+ # if we arrived here, this means that operation FAILED!
+ return {"status": 0,
+ "log": log}
+
+
+ def traffic_stats(self, port_handle, mode):
+ ALLOWED_MODES = ["aggregate", "streams", "all"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ # pass this function for now...
+ if mode == "aggregate":
+ # create a new stream with desired attributes, starting by creating packet
+ try:
+ packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length)
+ # set transmission attributes
+ tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs)
+ # set rx_stats
+ rx_stats = CRxStats() # defaults with disabled
+ # join the generated data into stream
+ stream_obj = CStream()
+ stream_obj_params = {"enabled": True,
+ "self_start": True,
+ "next_stream_id": -1,
+ "isg": 0.0,
+ "mode": tx_mode,
+ "rx_stats": rx_stats,
+ "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed
+ stream_obj.load_data(**stream_obj_params)
+ except Exception as e:
+ # some exception happened during the stream creation
+ return {"status": 0, "log": str(e)}
+ # try adding the stream, until free stream_id is found
+ port_data = self._port_data.get(port_handle)
+ id_candidate = None
+ # TODO: change this to better implementation
+ while True:
+ id_candidate = port_data["stream_id_gen"].next()
+ response = self.trex_client.add_stream(stream_id=id_candidate,
+ stream_obj=stream_obj,
+ port_id=port_handle)
+ res_ok, log = CTRexHltApi.process_response(port_handle, response)
+ if res_ok:
+ # found non-taken stream_id on server
+ # save it for modifying needs
+ port_data["streams"].update({id_candidate: stream_obj})
+ break
+ else:
+ # proceed to another iteration to use another id
+ continue
+ return {"status": 1,
+ "stream_id": id_candidate,
+ "log": None}
+ else:
+ raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode))
+
+ def get_aggregate_port_stats(self, port_handle):
+ return self.traffic_stats(port_handle, mode="aggregate")
+
+ def get_stream_stats(self, port_handle):
+ return self.traffic_stats(port_handle, mode="streams")
+
+ # ----- internal functions ----- #
+ def _port_data_init(self, port_list):
+ for port in port_list:
+ self._port_data[port] = {"stream_id_gen": id_count_gen(),
+ "streams": {}}
+
+ @staticmethod
+ def process_response(port_list, response):
+ if isinstance(port_list, list):
+ res_ok, response = response
+ log = CTRexHltApi.join_batch_response(response)
+ else:
+ res_ok = response.success
+ log = str(response)
+ return res_ok, log
+
+ @staticmethod
+ def parse_port_list(port_list):
+ if isinstance(port_list, str):
+ return [int(port)
+ for port in port_list.split()]
+ elif isinstance(port_list, list):
+ return [int(port)
+ for port in port_list]
+ else:
+ return port_list
+
+ @staticmethod
+ def join_batch_response(responses):
+ return "\n".join([str(response)
+ for response in responses])
+
+ @staticmethod
+ def generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length):
+ ALLOWED_L3_PROTOCOL = {"ipv4": dpkt.ethernet.ETH_TYPE_IP,
+ "ipv6": dpkt.ethernet.ETH_TYPE_IP6,
+ "arp": dpkt.ethernet.ETH_TYPE_ARP}
+ ALLOWED_L4_PROTOCOL = {"tcp": dpkt.ip.IP_PROTO_TCP,
+ "udp": dpkt.ip.IP_PROTO_UDP,
+ "icmp": dpkt.ip.IP_PROTO_ICMP,
+ "icmpv6": dpkt.ip.IP_PROTO_ICMP6,
+ "igmp": dpkt.ip.IP_PROTO_IGMP,
+ "rtp": dpkt.ip.IP_PROTO_IRTP,
+ "isis": dpkt.ip.IP_PROTO_ISIS,
+ "ospf": dpkt.ip.IP_PROTO_OSPF}
+
+ pkt_bld = CTRexPktBuilder()
+ if l2_encap == "ethernet_ii":
+ pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet())
+ # set Ethernet layer attributes
+ pkt_bld.set_eth_layer_addr("l2", "src", mac_src)
+ pkt_bld.set_eth_layer_addr("l2", "dst", mac_dst)
+ else:
+ raise NotImplementedError("l2_encap does not support the desired encapsulation '{0}'".format(l2_encap))
+ # set l3 on l2
+ if l3_protocol not in ALLOWED_L3_PROTOCOL:
+ raise ValueError("l3_protocol must be one of the following: {0}".format(ALLOWED_L3_PROTOCOL))
+ pkt_bld.set_layer_attr("l2", "type", ALLOWED_L3_PROTOCOL[l3_protocol])
+
+ # set l3 attributes
+ if l3_protocol == "ipv4":
+ pkt_bld.add_pkt_layer("l3", dpkt.ip.IP())
+ pkt_bld.set_ip_layer_addr("l3", "src", ip_src_addr)
+ pkt_bld.set_ip_layer_addr("l3", "dst", ip_dst_addr)
+ pkt_bld.set_layer_attr("l3", "len", l3_length)
+ else:
+ raise NotImplementedError("l3_protocol '{0}' is not supported by TRex yet.".format(l3_protocol))
+
+ pkt_bld.dump_pkt_to_pcap("stream_test.pcap")
+ return pkt_bld
+
- def config_traffic(self):
- pass
if __name__ == "__main__":
pass
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 334496d1..4e861585 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -10,58 +10,98 @@ from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
from client_utils.packet_builder import CTRexPktBuilder
import json
from common.trex_stats import *
+from common.trex_streams import *
from collections import namedtuple
+RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+
+class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
+ __slots__ = ()
+ def __str__(self):
+ return "{id:^3} - {msg} ({stat})".format(id=self.id,
+ msg=self.msg,
+ stat="success" if self.success else "fail")
+
+# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
def __init__(self, username, server="localhost", port=5050, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual)
self._conn_handler = {}
self._active_ports = set()
self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
+ self._server_version = None
+ self.__err_log = None
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
def wrapper_f(self, *args, **kwargs):
port_ids = kwargs.get("port_id")
+ if not port_ids:
+ port_ids = args[0]
if isinstance(port_ids, int):
# make sure port_ids is a list
port_ids = [port_ids]
bad_ids = set()
for port_id in port_ids:
- port_owned = self._conn_handler.get(kwargs.get(port_id))
+ port_owned = self._conn_handler.get(port_id)
if owned and not port_owned:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
elif active_and_owned: # stronger condition than just owned, hence gets precedence
if port_owned and port_id in self._active_ports:
continue
else:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
else:
continue
if bad_ids:
# Some port IDs are not according to desires status
- raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not"
- "at allowed stated".format(func.__name__))
+ raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
+ "at allowed states".format(func.__name__, list(bad_ids)))
else:
- func(self, *args, **kwargs)
+ return func(self, *args, **kwargs)
return wrapper_f
return wrapper
@property
def system_info(self):
if not self._system_info:
- self._system_info = self.get_system_info()
- return self._system_info
+ rc, info = self.get_system_info()
+ if rc:
+ self._system_info = info
+ else:
+ self.__err_log = info
+ return self._system_info if self._system_info else "Unknown"
+
+ @property
+ def server_version(self):
+ if not self._server_version:
+ rc, ver_info = self.get_version()
+ if rc:
+ self._server_version = ver_info
+ else:
+ self.__err_log = ver_info
+ return self._server_version if self._server_version else "Unknown"
+
+ def is_connected(self):
+ return self.comm_link.is_connected
# ----- user-access methods ----- #
+ def connect(self):
+ rc, err = self.comm_link.connect()
+ if not rc:
+ return rc, err
+ return self._init_sync()
+
+ def disconnect(self):
+ return self.comm_link.disconnect()
+
def ping(self):
return self.transmit("ping")
@@ -77,24 +117,38 @@ class CTRexStatelessClient(object):
def get_port_count(self):
return self.system_info.get("port_count")
+ def get_port_ids(self, as_str=False):
+ port_ids = range(self.get_port_count())
+ if as_str:
+ return " ".join(str(p) for p in port_ids)
+ else:
+ return port_ids
+
+ def get_acquired_ports(self):
+ return self._conn_handler.keys()
+
+ def get_active_ports(self):
+ return list(self._active_ports)
+
def acquire(self, port_id, force=False):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
+ commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_acquire_response)
+ return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
else:
params = {"port_id": port_id,
"user": self.user,
"force": force}
- command = self.RpcCmdData("acquire", params)
- self._handle_acquire_response(command, self.transmit(command.method, command.params))
- return self._conn_handler.get(port_id)
+ command = RpcCmdData("acquire", params)
+ return self._handle_acquire_response(command,
+ self.transmit(command.method, command.params),
+ self.default_success_test)
@force_status(owned=True)
def release(self, port_id=None):
@@ -103,18 +157,20 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_release_response)
+ return self._process_batch_result(commands, resp_list, self._handle_release_response,
+ success_test=self.ack_success_test)
else:
self._conn_handler.pop(port_id)
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("release", params)
- self._handle_release_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("release", params)
+ return self._handle_release_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
@force_status(owned=True)
def add_stream(self, stream_id, stream_obj, port_id=None):
@@ -128,6 +184,27 @@ class CTRexStatelessClient(object):
return self.transmit("add_stream", params)
@force_status(owned=True)
+ def add_stream_pack(self, port_id=None, *stream_packs):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+
+ # since almost every run contains more than one transaction with server, handle all as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = []
+ for stream_pack in stream_packs:
+ commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
+ "handler": self._conn_handler.get(p_id),
+ "stream_id": stream_pack.stream_id,
+ "stream": stream_pack.stream}
+ )
+ for p_id in port_ids]
+ )
+ res_ok, resp_list = self.transmit_batch(commands)
+ if res_ok:
+ return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
+ success_test=self.ack_success_test)
+
+ @force_status(owned=True)
def remove_stream(self, stream_id, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -136,6 +213,28 @@ class CTRexStatelessClient(object):
"stream_id": stream_id}
return self.transmit("remove_stream", params)
+ @force_status(owned=True)
+ def remove_all_streams(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
+ success_test=self.ack_success_test)
+ else:
+ params = {"port_id": port_id,
+ "handler": self._conn_handler.get(port_id)}
+ command = RpcCmdData("remove_all_streams", params)
+ return self._handle_remove_streams_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
+ pass
+
@force_status(owned=True, active_and_owned=True)
def get_stream_id_list(self, port_id=None):
if not self._is_ports_valid(port_id):
@@ -160,17 +259,19 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_start_traffic_response)
+ return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
+ success_test=self.ack_success_test)
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("start_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("start_traffic", params)
+ return self._handle_start_traffic_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
@force_status(owned=False, active_and_owned=True)
def stop_traffic(self, port_id=None):
@@ -179,20 +280,22 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response)
+ return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
+ success_test=self.ack_success_test)
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("stop_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("stop_traffic", params)
+ return self._handle_start_traffic_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
def get_global_stats(self):
- command = self.RpcCmdData("get_global_stats", {})
+ command = RpcCmdData("get_global_stats", {})
return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
# return self.transmit("get_global_stats")
@@ -203,7 +306,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -211,7 +314,7 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_port_stats", params)
+ command = RpcCmdData("get_port_stats", params)
return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
@force_status(owned=True, active_and_owned=True)
@@ -221,7 +324,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -229,15 +332,23 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_stream_stats", params)
+ command = RpcCmdData("get_stream_stats", params)
return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
# ----- internal methods ----- #
+ def _init_sync(self):
+ # get server version and system info
+ err = False
+ if self.server_version == "Unknown" or self.system_info == "Unknown":
+ self.disconnect()
+ return False, self.__err_log
+ return True, ""
+
def transmit(self, method_name, params={}):
- return self.tx_link.transmit(method_name, params)
+ return self.comm_link.transmit(method_name, params)
def transmit_batch(self, batch_list):
- return self.tx_link.transmit_batch(batch_list)
+ return self.comm_link.transmit_batch(batch_list)
@staticmethod
def _object_decoder(obj_type, obj_data):
@@ -258,36 +369,86 @@ class CTRexStatelessClient(object):
else:
return False
+ @staticmethod
+ def ack_success_test(result_obj):
+ if result_obj.success and result_obj.data == "ACK":
+ return True
+ else:
+ return False
+
+
# ----- handler internal methods ----- #
- def _handle_acquire_response(self, request, response):
- if response.success:
- self._conn_handler[request.get("port_id")] = response.data
+ def _handle_general_response(self, request, response, msg, success_test=None):
+ port_id = request.params.get("port_id")
+ if not success_test:
+ success_test = self.default_success_test
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, msg)
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_release_response(self, request, response):
- if response.success:
- del self._conn_handler[request.get("port_id")]
- def _handle_start_traffic_response(self, request, response):
- if response.success:
- self._active_ports.add(request.get("port_id"))
+ def _handle_acquire_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, "Acquired")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_stop_traffic_response(self, request, response):
- if response.success:
- self._active_ports.remove(request.get("port_id"))
+ def _handle_add_stream_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ stream_id = request.params.get("stream_id")
+ if success_test(response):
+ return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id))
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_remove_streams_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ return RpcResponseStatus(True, port_id, "Removed")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_release_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ del self._conn_handler[port_id]
+ return RpcResponseStatus(True, port_id, "Released")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_get_global_stats_response(self, request, response):
+ def _handle_start_traffic_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._active_ports.add(port_id)
+ return RpcResponseStatus(True, port_id, "Traffic started")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_stop_traffic_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._active_ports.remove(port_id)
+ return RpcResponseStatus(True, port_id, "Traffic stopped")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_get_global_stats_response(self, request, response, success_test):
if response.success:
return CGlobalStats(**response.success)
else:
return False
- def _handle_get_port_stats_response(self, request, response):
+ def _handle_get_port_stats_response(self, request, response, success_test):
if response.success:
return CPortStats(**response.success)
else:
return False
- def _handle_get_stream_stats_response(self, request, response):
+ def _handle_get_stream_stats_response(self, request, response, success_test):
if response.success:
return CStreamStats(**response.success)
else:
@@ -298,32 +459,53 @@ class CTRexStatelessClient(object):
# check each item of the sequence
return all([self._is_ports_valid(port)
for port in port_id])
- elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()):
+ elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
return True
else:
return False
def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
+ res_ok = True
+ responses = []
+ if isinstance(success_test, staticmethod):
+ success_test = success_test.__func__
for i, response in enumerate(resp_list):
- # testing each result with success test so that a conclusion report could be deployed in future.
- if success_test(response):
- # run handler method with its params
- handler_func(req_list[i], response)
- else:
- continue # TODO: mark in this case somehow the bad result
+ # run handler method with its params
+ processed_response = handler_func(req_list[i], response, success_test)
+ responses.append(processed_response)
+ if not processed_response.success:
+ res_ok = False
+ # else:
+ # res_ok = False # TODO: mark in this case somehow the bad result
+ # print res_ok
+ # print responses
+ return res_ok, responses
# ------ private classes ------ #
- class CTxLink(object):
+ class CCommLink(object):
"""describes the connectivity of the stateless client method"""
def __init__(self, server="localhost", port=5050, virtual=False):
- super(CTRexStatelessClient.CTxLink, self).__init__()
+ super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
self.rpc_link = JsonRpcClient(self.server, self.port)
+
+ @property
+ def is_connected(self):
if not self.virtual:
- self.rpc_link.connect()
+ return self.rpc_link.connected
+ else:
+ return True
+
+ def connect(self):
+ if not self.virtual:
+ return self.rpc_link.connect()
+
+ def disconnect(self):
+ if not self.virtual:
+ return self.rpc_link.disconnect()
def transmit(self, method_name, params={}):
if self.virtual:
@@ -352,144 +534,5 @@ class CTRexStatelessClient(object):
port=self.port)
-class CStream(object):
- """docstring for CStream"""
- DEFAULTS = {"rx_stats": CRxStats,
- "mode": CTxMode,
- "isg": 5.0,
- "next_stream": -1,
- "self_start": True,
- "enabled": True}
-
- def __init__(self, **kwargs):
- super(CStream, self).__init__()
- for k, v in kwargs.items():
- setattr(self, k, v)
- # set default values to unset attributes, according to DEFAULTS dict
- set_keys = set(kwargs.keys())
- keys_to_set = [x
- for x in self.DEFAULTS
- if x not in set_keys]
- for key in keys_to_set:
- default = self.DEFAULTS.get(key)
- if type(default) == type:
- setattr(self, key, default())
- else:
- setattr(self, key, default)
-
- @property
- def packet(self):
- return self._packet
-
- @packet.setter
- def packet(self, packet_obj):
- assert isinstance(packet_obj, CTRexPktBuilder)
- self._packet = packet_obj
-
- @property
- def enabled(self):
- return self._enabled
-
- @enabled.setter
- def enabled(self, bool_value):
- self._enabled = bool(bool_value)
-
- @property
- def self_start(self):
- return self._self_start
-
- @self_start.setter
- def self_start(self, bool_value):
- self._self_start = bool(bool_value)
-
- @property
- def next_stream(self):
- return self._next_stream
-
- @next_stream.setter
- def next_stream(self, value):
- self._next_stream = int(value)
-
- def dump(self):
- pass
- return {"enabled": self.enabled,
- "self_start": self.self_start,
- "isg": self.isg,
- "next_stream": self.next_stream,
- "packet": self.packet.dump_pkt(),
- "mode": self.mode.dump(),
- "vm": self.packet.get_vm_data(),
- "rx_stats": self.rx_stats.dump()}
-
-class CRxStats(object):
-
- def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False):
- self._rx_dict = {"enabled": enabled,
- "seq_enabled": seq_enabled,
- "latency_enabled": latency_enabled}
-
- @property
- def enabled(self):
- return self._rx_dict.get("enabled")
-
- @enabled.setter
- def enabled(self, bool_value):
- self._rx_dict['enabled'] = bool(bool_value)
-
- @property
- def seq_enabled(self):
- return self._rx_dict.get("seq_enabled")
-
- @seq_enabled.setter
- def seq_enabled(self, bool_value):
- self._rx_dict['seq_enabled'] = bool(bool_value)
-
- @property
- def latency_enabled(self):
- return self._rx_dict.get("latency_enabled")
-
- @latency_enabled.setter
- def latency_enabled(self, bool_value):
- self._rx_dict['latency_enabled'] = bool(bool_value)
-
- def dump(self):
- return {k: v
- for k, v in self._rx_dict.items()
- if v
- }
-
-
-class CTxMode(object):
- """docstring for CTxMode"""
- def __init__(self, tx_mode, pps):
- super(CTxMode, self).__init__()
- if tx_mode not in ["continuous", "single_burst", "multi_burst"]:
- raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode))
- self._tx_mode = tx_mode
- self._fields = {'pps': float(pps)}
- if tx_mode == "single_burst":
- self._fields['total_pkts'] = 0
- elif tx_mode == "multi_burst":
- self._fields['pkts_per_burst'] = 0
- self._fields['ibg'] = 0.0
- self._fields['count'] = 0
- else:
- pass
-
- def set_tx_mode_attr(self, attr, val):
- if attr in self._fields:
- self._fields[attr] = type(self._fields.get(attr))(val)
- else:
- raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')".
- format(attr, self._tx_mode))
-
- def dump(self):
- dump = {"type": self._tx_mode}
- dump.update({k: v
- for k, v in self._fields.items()
- })
- return dump
-
-
if __name__ == "__main__":
pass