summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_stateless_client.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-10-29 14:37:21 +0200
committerimarom <imarom@cisco.com>2015-10-29 14:37:21 +0200
commitcd64f9efd72f1502d2b1369da7f73eec15542c7c (patch)
tree869ceecf36ee4b498b3cbf6568d88ed525b11bb3 /scripts/automation/trex_control_plane/client/trex_stateless_client.py
parent3978adceba8ce3861097747868da22bce379edd2 (diff)
parentd78150a66de591a77df2496e5de828d3232a931a (diff)
Merge branch 'dan_stateless' of csi-sceasr-b45:/auto/proj-pcube-b/apps/PL-b/tools/repo//trex-core into rpc_intg1
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py445
1 files changed, 244 insertions, 201 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 334496d1..4e861585 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -10,58 +10,98 @@ from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
from client_utils.packet_builder import CTRexPktBuilder
import json
from common.trex_stats import *
+from common.trex_streams import *
from collections import namedtuple
+RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+
+class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
+ __slots__ = ()
+ def __str__(self):
+ return "{id:^3} - {msg} ({stat})".format(id=self.id,
+ msg=self.msg,
+ stat="success" if self.success else "fail")
+
+# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
def __init__(self, username, server="localhost", port=5050, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual)
self._conn_handler = {}
self._active_ports = set()
self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
+ self._server_version = None
+ self.__err_log = None
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
def wrapper_f(self, *args, **kwargs):
port_ids = kwargs.get("port_id")
+ if not port_ids:
+ port_ids = args[0]
if isinstance(port_ids, int):
# make sure port_ids is a list
port_ids = [port_ids]
bad_ids = set()
for port_id in port_ids:
- port_owned = self._conn_handler.get(kwargs.get(port_id))
+ port_owned = self._conn_handler.get(port_id)
if owned and not port_owned:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
elif active_and_owned: # stronger condition than just owned, hence gets precedence
if port_owned and port_id in self._active_ports:
continue
else:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
else:
continue
if bad_ids:
# Some port IDs are not according to desires status
- raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not"
- "at allowed stated".format(func.__name__))
+ raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
+ "at allowed states".format(func.__name__, list(bad_ids)))
else:
- func(self, *args, **kwargs)
+ return func(self, *args, **kwargs)
return wrapper_f
return wrapper
@property
def system_info(self):
if not self._system_info:
- self._system_info = self.get_system_info()
- return self._system_info
+ rc, info = self.get_system_info()
+ if rc:
+ self._system_info = info
+ else:
+ self.__err_log = info
+ return self._system_info if self._system_info else "Unknown"
+
+ @property
+ def server_version(self):
+ if not self._server_version:
+ rc, ver_info = self.get_version()
+ if rc:
+ self._server_version = ver_info
+ else:
+ self.__err_log = ver_info
+ return self._server_version if self._server_version else "Unknown"
+
+ def is_connected(self):
+ return self.comm_link.is_connected
# ----- user-access methods ----- #
+ def connect(self):
+ rc, err = self.comm_link.connect()
+ if not rc:
+ return rc, err
+ return self._init_sync()
+
+ def disconnect(self):
+ return self.comm_link.disconnect()
+
def ping(self):
return self.transmit("ping")
@@ -77,24 +117,38 @@ class CTRexStatelessClient(object):
def get_port_count(self):
return self.system_info.get("port_count")
+ def get_port_ids(self, as_str=False):
+ port_ids = range(self.get_port_count())
+ if as_str:
+ return " ".join(str(p) for p in port_ids)
+ else:
+ return port_ids
+
+ def get_acquired_ports(self):
+ return self._conn_handler.keys()
+
+ def get_active_ports(self):
+ return list(self._active_ports)
+
def acquire(self, port_id, force=False):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
+ commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_acquire_response)
+ return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
else:
params = {"port_id": port_id,
"user": self.user,
"force": force}
- command = self.RpcCmdData("acquire", params)
- self._handle_acquire_response(command, self.transmit(command.method, command.params))
- return self._conn_handler.get(port_id)
+ command = RpcCmdData("acquire", params)
+ return self._handle_acquire_response(command,
+ self.transmit(command.method, command.params),
+ self.default_success_test)
@force_status(owned=True)
def release(self, port_id=None):
@@ -103,18 +157,20 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_release_response)
+ return self._process_batch_result(commands, resp_list, self._handle_release_response,
+ success_test=self.ack_success_test)
else:
self._conn_handler.pop(port_id)
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("release", params)
- self._handle_release_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("release", params)
+ return self._handle_release_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
@force_status(owned=True)
def add_stream(self, stream_id, stream_obj, port_id=None):
@@ -128,6 +184,27 @@ class CTRexStatelessClient(object):
return self.transmit("add_stream", params)
@force_status(owned=True)
+ def add_stream_pack(self, port_id=None, *stream_packs):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+
+ # since almost every run contains more than one transaction with server, handle all as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = []
+ for stream_pack in stream_packs:
+ commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
+ "handler": self._conn_handler.get(p_id),
+ "stream_id": stream_pack.stream_id,
+ "stream": stream_pack.stream}
+ )
+ for p_id in port_ids]
+ )
+ res_ok, resp_list = self.transmit_batch(commands)
+ if res_ok:
+ return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
+ success_test=self.ack_success_test)
+
+ @force_status(owned=True)
def remove_stream(self, stream_id, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -136,6 +213,28 @@ class CTRexStatelessClient(object):
"stream_id": stream_id}
return self.transmit("remove_stream", params)
+ @force_status(owned=True)
+ def remove_all_streams(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
+ success_test=self.ack_success_test)
+ else:
+ params = {"port_id": port_id,
+ "handler": self._conn_handler.get(port_id)}
+ command = RpcCmdData("remove_all_streams", params)
+ return self._handle_remove_streams_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
+ pass
+
@force_status(owned=True, active_and_owned=True)
def get_stream_id_list(self, port_id=None):
if not self._is_ports_valid(port_id):
@@ -160,17 +259,19 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_start_traffic_response)
+ return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
+ success_test=self.ack_success_test)
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("start_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("start_traffic", params)
+ return self._handle_start_traffic_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
@force_status(owned=False, active_and_owned=True)
def stop_traffic(self, port_id=None):
@@ -179,20 +280,22 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response)
+ return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
+ success_test=self.ack_success_test)
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("stop_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ command = RpcCmdData("stop_traffic", params)
+ return self._handle_start_traffic_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
def get_global_stats(self):
- command = self.RpcCmdData("get_global_stats", {})
+ command = RpcCmdData("get_global_stats", {})
return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
# return self.transmit("get_global_stats")
@@ -203,7 +306,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -211,7 +314,7 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_port_stats", params)
+ command = RpcCmdData("get_port_stats", params)
return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
@force_status(owned=True, active_and_owned=True)
@@ -221,7 +324,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -229,15 +332,23 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_stream_stats", params)
+ command = RpcCmdData("get_stream_stats", params)
return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
# ----- internal methods ----- #
+ def _init_sync(self):
+ # get server version and system info
+ err = False
+ if self.server_version == "Unknown" or self.system_info == "Unknown":
+ self.disconnect()
+ return False, self.__err_log
+ return True, ""
+
def transmit(self, method_name, params={}):
- return self.tx_link.transmit(method_name, params)
+ return self.comm_link.transmit(method_name, params)
def transmit_batch(self, batch_list):
- return self.tx_link.transmit_batch(batch_list)
+ return self.comm_link.transmit_batch(batch_list)
@staticmethod
def _object_decoder(obj_type, obj_data):
@@ -258,36 +369,86 @@ class CTRexStatelessClient(object):
else:
return False
+ @staticmethod
+ def ack_success_test(result_obj):
+ if result_obj.success and result_obj.data == "ACK":
+ return True
+ else:
+ return False
+
+
# ----- handler internal methods ----- #
- def _handle_acquire_response(self, request, response):
- if response.success:
- self._conn_handler[request.get("port_id")] = response.data
+ def _handle_general_response(self, request, response, msg, success_test=None):
+ port_id = request.params.get("port_id")
+ if not success_test:
+ success_test = self.default_success_test
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, msg)
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_release_response(self, request, response):
- if response.success:
- del self._conn_handler[request.get("port_id")]
- def _handle_start_traffic_response(self, request, response):
- if response.success:
- self._active_ports.add(request.get("port_id"))
+ def _handle_acquire_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, "Acquired")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_stop_traffic_response(self, request, response):
- if response.success:
- self._active_ports.remove(request.get("port_id"))
+ def _handle_add_stream_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ stream_id = request.params.get("stream_id")
+ if success_test(response):
+ return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id))
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_remove_streams_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ return RpcResponseStatus(True, port_id, "Removed")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_release_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ del self._conn_handler[port_id]
+ return RpcResponseStatus(True, port_id, "Released")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
- def _handle_get_global_stats_response(self, request, response):
+ def _handle_start_traffic_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._active_ports.add(port_id)
+ return RpcResponseStatus(True, port_id, "Traffic started")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_stop_traffic_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._active_ports.remove(port_id)
+ return RpcResponseStatus(True, port_id, "Traffic stopped")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_get_global_stats_response(self, request, response, success_test):
if response.success:
return CGlobalStats(**response.success)
else:
return False
- def _handle_get_port_stats_response(self, request, response):
+ def _handle_get_port_stats_response(self, request, response, success_test):
if response.success:
return CPortStats(**response.success)
else:
return False
- def _handle_get_stream_stats_response(self, request, response):
+ def _handle_get_stream_stats_response(self, request, response, success_test):
if response.success:
return CStreamStats(**response.success)
else:
@@ -298,32 +459,53 @@ class CTRexStatelessClient(object):
# check each item of the sequence
return all([self._is_ports_valid(port)
for port in port_id])
- elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()):
+ elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
return True
else:
return False
def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
+ res_ok = True
+ responses = []
+ if isinstance(success_test, staticmethod):
+ success_test = success_test.__func__
for i, response in enumerate(resp_list):
- # testing each result with success test so that a conclusion report could be deployed in future.
- if success_test(response):
- # run handler method with its params
- handler_func(req_list[i], response)
- else:
- continue # TODO: mark in this case somehow the bad result
+ # run handler method with its params
+ processed_response = handler_func(req_list[i], response, success_test)
+ responses.append(processed_response)
+ if not processed_response.success:
+ res_ok = False
+ # else:
+ # res_ok = False # TODO: mark in this case somehow the bad result
+ # print res_ok
+ # print responses
+ return res_ok, responses
# ------ private classes ------ #
- class CTxLink(object):
+ class CCommLink(object):
"""describes the connectivity of the stateless client method"""
def __init__(self, server="localhost", port=5050, virtual=False):
- super(CTRexStatelessClient.CTxLink, self).__init__()
+ super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
self.rpc_link = JsonRpcClient(self.server, self.port)
+
+ @property
+ def is_connected(self):
if not self.virtual:
- self.rpc_link.connect()
+ return self.rpc_link.connected
+ else:
+ return True
+
+ def connect(self):
+ if not self.virtual:
+ return self.rpc_link.connect()
+
+ def disconnect(self):
+ if not self.virtual:
+ return self.rpc_link.disconnect()
def transmit(self, method_name, params={}):
if self.virtual:
@@ -352,144 +534,5 @@ class CTRexStatelessClient(object):
port=self.port)
-class CStream(object):
- """docstring for CStream"""
- DEFAULTS = {"rx_stats": CRxStats,
- "mode": CTxMode,
- "isg": 5.0,
- "next_stream": -1,
- "self_start": True,
- "enabled": True}
-
- def __init__(self, **kwargs):
- super(CStream, self).__init__()
- for k, v in kwargs.items():
- setattr(self, k, v)
- # set default values to unset attributes, according to DEFAULTS dict
- set_keys = set(kwargs.keys())
- keys_to_set = [x
- for x in self.DEFAULTS
- if x not in set_keys]
- for key in keys_to_set:
- default = self.DEFAULTS.get(key)
- if type(default) == type:
- setattr(self, key, default())
- else:
- setattr(self, key, default)
-
- @property
- def packet(self):
- return self._packet
-
- @packet.setter
- def packet(self, packet_obj):
- assert isinstance(packet_obj, CTRexPktBuilder)
- self._packet = packet_obj
-
- @property
- def enabled(self):
- return self._enabled
-
- @enabled.setter
- def enabled(self, bool_value):
- self._enabled = bool(bool_value)
-
- @property
- def self_start(self):
- return self._self_start
-
- @self_start.setter
- def self_start(self, bool_value):
- self._self_start = bool(bool_value)
-
- @property
- def next_stream(self):
- return self._next_stream
-
- @next_stream.setter
- def next_stream(self, value):
- self._next_stream = int(value)
-
- def dump(self):
- pass
- return {"enabled": self.enabled,
- "self_start": self.self_start,
- "isg": self.isg,
- "next_stream": self.next_stream,
- "packet": self.packet.dump_pkt(),
- "mode": self.mode.dump(),
- "vm": self.packet.get_vm_data(),
- "rx_stats": self.rx_stats.dump()}
-
-class CRxStats(object):
-
- def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False):
- self._rx_dict = {"enabled": enabled,
- "seq_enabled": seq_enabled,
- "latency_enabled": latency_enabled}
-
- @property
- def enabled(self):
- return self._rx_dict.get("enabled")
-
- @enabled.setter
- def enabled(self, bool_value):
- self._rx_dict['enabled'] = bool(bool_value)
-
- @property
- def seq_enabled(self):
- return self._rx_dict.get("seq_enabled")
-
- @seq_enabled.setter
- def seq_enabled(self, bool_value):
- self._rx_dict['seq_enabled'] = bool(bool_value)
-
- @property
- def latency_enabled(self):
- return self._rx_dict.get("latency_enabled")
-
- @latency_enabled.setter
- def latency_enabled(self, bool_value):
- self._rx_dict['latency_enabled'] = bool(bool_value)
-
- def dump(self):
- return {k: v
- for k, v in self._rx_dict.items()
- if v
- }
-
-
-class CTxMode(object):
- """docstring for CTxMode"""
- def __init__(self, tx_mode, pps):
- super(CTxMode, self).__init__()
- if tx_mode not in ["continuous", "single_burst", "multi_burst"]:
- raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode))
- self._tx_mode = tx_mode
- self._fields = {'pps': float(pps)}
- if tx_mode == "single_burst":
- self._fields['total_pkts'] = 0
- elif tx_mode == "multi_burst":
- self._fields['pkts_per_burst'] = 0
- self._fields['ibg'] = 0.0
- self._fields['count'] = 0
- else:
- pass
-
- def set_tx_mode_attr(self, attr, val):
- if attr in self._fields:
- self._fields[attr] = type(self._fields.get(attr))(val)
- else:
- raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')".
- format(attr, self._tx_mode))
-
- def dump(self):
- dump = {"type": self._tx_mode}
- dump.update({k: v
- for k, v in self._fields.items()
- })
- return dump
-
-
if __name__ == "__main__":
pass