summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
authorDan Klein <danklei@cisco.com>2015-10-27 10:00:18 +0200
committerDan Klein <danklei@cisco.com>2015-10-27 10:00:18 +0200
commita39e4416cd78fc3b147695465c4de1c896b3face (patch)
tree4d5f1fd6737bbe6159303fe21b0518a539c0a665 /scripts/automation/trex_control_plane/client
parentb44239e4c6019f10fa7cf4fe0fef8c3726435033 (diff)
more hltapi progress
connect working
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_hltapi.py72
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py185
2 files changed, 212 insertions, 45 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py
index 7453d8ec..6194d376 100755
--- a/scripts/automation/trex_control_plane/client/trex_hltapi.py
+++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py
@@ -4,14 +4,54 @@ import trex_root_path
from client_utils.packet_builder import CTRexPktBuilder
from trex_stateless_client import CTRexStatelessClient
-print "done!"
class CTRexHltApi(object):
def __init__(self):
+ self.trex_client = None
+ self.connected = False
+
+ pass
+
+ # ----- session functions ----- #
+
+ def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False):
+ ret_dict = {"status": 0}
+ self.trex_client = CTRexStatelessClient(username, device, port)
+ res_ok, msg = self.trex_client.connect()
+ if not res_ok:
+ self.trex_client = None
+ ret_dict.update({"log": msg})
+ return ret_dict
+ # arrived here, connection successfully created with server
+ # next, try acquiring ports of TRex
+ port_list = self.parse_port_list(port_list)
+ response = self.trex_client.acquire(port_list, force=break_locks)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ # TODO: should revert taken ports?!
+ return ret_dict
+ # arrived here, all desired ports were successfully acquired
+ print log
+ if reset:
+ # remove all port traffic configuration from TRex
+ response = self.trex_client.remove_all_streams(port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ return ret_dict
+ print log
+ ret_dict.update({"status": 1})
+ self.trex_client.disconnect()
+
pass
- def connect(self, device, port_list, username, reset=False, break_locks=False):
+ def cleanup_session(self, port_list, maintain_lock=False):
pass
def interface_config(self, port_handle, mode="config"):
@@ -41,8 +81,32 @@ class CTRexHltApi(object):
def get_stream_stats(self, port_handle):
return self.traffic_stats(port_handle, mode="stream")
-
-
+ # ----- internal functions ----- #
+ @staticmethod
+ def process_response(port_list, response):
+ if isinstance(port_list, list):
+ res_ok, response = response
+ log = CTRexHltApi.join_batch_response(response)
+ else:
+ res_ok = response.success
+ log = str(response)
+ return res_ok, log
+
+ @staticmethod
+ def parse_port_list(port_list):
+ if isinstance(port_list, str):
+ return [int(port)
+ for port in port_list.split()]
+ elif isinstance(port_list, list):
+ return [int(port)
+ for port in port_list]
+ else:
+ return port_list
+
+ @staticmethod
+ def join_batch_response(responses):
+ return "\n".join([str(response)
+ for response in responses])
if __name__ == "__main__":
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index b976db66..97d3ec0a 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -13,10 +13,19 @@ from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+
+class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
+ __slots__ = ()
+ def __str__(self):
+ return "{id:^3} - {msg} ({stat})".format(id=self.id,
+ msg=self.msg,
+ stat="success" if self.success else "fail")
+
+# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
def __init__(self, username, server="localhost", port=5050, virtual=False):
super(CTRexStatelessClient, self).__init__()
@@ -26,48 +35,69 @@ class CTRexStatelessClient(object):
self._active_ports = set()
self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
+ self._server_version = None
+ self.__err_log = None
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
def wrapper_f(self, *args, **kwargs):
port_ids = kwargs.get("port_id")
+ if not port_ids:
+ port_ids = args[0]
if isinstance(port_ids, int):
# make sure port_ids is a list
port_ids = [port_ids]
bad_ids = set()
for port_id in port_ids:
- port_owned = self._conn_handler.get(kwargs.get(port_id))
+ port_owned = self._conn_handler.get(port_id)
if owned and not port_owned:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
elif active_and_owned: # stronger condition than just owned, hence gets precedence
if port_owned and port_id in self._active_ports:
continue
else:
- bad_ids.add(port_ids)
+ bad_ids.add(port_id)
else:
continue
if bad_ids:
# Some port IDs are not according to desires status
raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not"
- "at allowed stated".format(func.__name__))
+ "at allowed stated".format(func.__name__, list(bad_ids)))
else:
- func(self, *args, **kwargs)
+ return func(self, *args, **kwargs)
return wrapper_f
return wrapper
@property
def system_info(self):
if not self._system_info:
- self._system_info = self.get_system_info()
- return self._system_info
+ rc, info = self.get_system_info()
+ if rc:
+ self._system_info = info
+ else:
+ self.__err_log = info
+ return self._system_info if self._system_info else "Unknown"
+
+ @property
+ def server_version(self):
+ if not self._server_version:
+ rc, ver_info = self.get_version()
+ if rc:
+ self._server_version = ver_info
+ else:
+ self.__err_log = ver_info
+ return self._server_version if self._server_version else "Unknown"
# ----- user-access methods ----- #
def connect(self):
- self.comm_link.connect()
+ rc, err = self.comm_link.connect()
+ if not rc:
+ return rc, err
+ return self._init_sync()
def disconnect(self):
- self.comm_link.disconnect()
+ return self.comm_link.disconnect()
def ping(self):
return self.transmit("ping")
@@ -90,18 +120,19 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
+ commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
- self._process_batch_result(commands, resp_list, self._handle_acquire_response)
+ return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
else:
params = {"port_id": port_id,
"user": self.user,
"force": force}
- command = self.RpcCmdData("acquire", params)
- self._handle_acquire_response(command, self.transmit(command.method, command.params))
- return self._conn_handler.get(port_id)
+ command = RpcCmdData("acquire", params)
+ return self._handle_acquire_response(command,
+ self.transmit(command.method, command.params),
+ self.default_success_test)
@force_status(owned=True)
def release(self, port_id=None):
@@ -110,7 +141,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -119,7 +150,7 @@ class CTRexStatelessClient(object):
self._conn_handler.pop(port_id)
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("release", params)
+ command = RpcCmdData("release", params)
self._handle_release_response(command, self.transmit(command.method, command.params))
return
@@ -143,6 +174,28 @@ class CTRexStatelessClient(object):
"stream_id": stream_id}
return self.transmit("remove_stream", params)
+ @force_status(owned=True)
+ def remove_all_streams(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
+ success_test=self.ack_success_test)
+ else:
+ params = {"port_id": port_id,
+ "handler": self._conn_handler.get(port_id)}
+ command = RpcCmdData("remove_all_streams", params)
+ return self._handle_remove_streams_response(command,
+ self.transmit(command.method, command.params),
+ self.ack_success_test)
+ pass
+
@force_status(owned=True, active_and_owned=True)
def get_stream_id_list(self, port_id=None):
if not self._is_ports_valid(port_id):
@@ -167,7 +220,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -175,7 +228,7 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("start_traffic", params)
+ command = RpcCmdData("start_traffic", params)
self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
return
@@ -186,7 +239,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -194,12 +247,12 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("stop_traffic", params)
+ command = RpcCmdData("stop_traffic", params)
self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
return
def get_global_stats(self):
- command = self.RpcCmdData("get_global_stats", {})
+ command = RpcCmdData("get_global_stats", {})
return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
# return self.transmit("get_global_stats")
@@ -210,7 +263,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -218,7 +271,7 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_port_stats", params)
+ command = RpcCmdData("get_port_stats", params)
return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
@force_status(owned=True, active_and_owned=True)
@@ -228,7 +281,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -236,10 +289,18 @@ class CTRexStatelessClient(object):
else:
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
- command = self.RpcCmdData("get_stream_stats", params)
+ command = RpcCmdData("get_stream_stats", params)
return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
# ----- internal methods ----- #
+ def _init_sync(self):
+ # get server version and system info
+ err = False
+ if self.server_version == "Unknown" or self.system_info == "Unknown":
+ self.disconnect()
+ return False, self.__err_log
+ return True, ""
+
def transmit(self, method_name, params={}):
return self.comm_link.transmit(method_name, params)
@@ -265,16 +326,46 @@ class CTRexStatelessClient(object):
else:
return False
+ @staticmethod
+ def ack_success_test(result_obj):
+ if result_obj.success and result_obj.data == "ACK":
+ return True
+ else:
+ return False
+
+
# ----- handler internal methods ----- #
- def _handle_acquire_response(self, request, response):
- if response.success:
- self._conn_handler[request.get("port_id")] = response.data
+ def _handle_general_response(self, request, response, msg, success_test=None):
+ port_id = request.params.get("port_id")
+ if not success_test:
+ success_test = self.default_success_test
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, msg)
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
- def _handle_release_response(self, request, response):
+ def _handle_acquire_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ self._conn_handler[port_id] = response.data
+ return RpcResponseStatus(True, port_id, "Acquired")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_remove_streams_response(self, request, response, success_test):
+ port_id = request.params.get("port_id")
+ if success_test(response):
+ return RpcResponseStatus(True, port_id, "Removed")
+ else:
+ return RpcResponseStatus(False, port_id, response.data)
+
+ def _handle_release_response(self, request, response, success_test):
if response.success:
del self._conn_handler[request.get("port_id")]
- def _handle_start_traffic_response(self, request, response):
+ def _handle_start_traffic_response(self, request, response, success_test):
if response.success:
self._active_ports.add(request.get("port_id"))
@@ -282,19 +373,19 @@ class CTRexStatelessClient(object):
if response.success:
self._active_ports.remove(request.get("port_id"))
- def _handle_get_global_stats_response(self, request, response):
+ def _handle_get_global_stats_response(self, request, response, success_test):
if response.success:
return CGlobalStats(**response.success)
else:
return False
- def _handle_get_port_stats_response(self, request, response):
+ def _handle_get_port_stats_response(self, request, response, success_test):
if response.success:
return CPortStats(**response.success)
else:
return False
- def _handle_get_stream_stats_response(self, request, response):
+ def _handle_get_stream_stats_response(self, request, response, success_test):
if response.success:
return CStreamStats(**response.success)
else:
@@ -311,13 +402,21 @@ class CTRexStatelessClient(object):
return False
def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
+ res_ok = True
+ responses = []
+ if isinstance(success_test, staticmethod):
+ success_test = success_test.__func__
for i, response in enumerate(resp_list):
- # testing each result with success test so that a conclusion report could be deployed in future.
- if success_test(response):
- # run handler method with its params
- handler_func(req_list[i], response)
- else:
- continue # TODO: mark in this case somehow the bad result
+ # run handler method with its params
+ processed_response = handler_func(req_list[i], response, success_test)
+ responses.append(processed_response)
+ if not processed_response.success:
+ res_ok = False
+ # else:
+ # res_ok = False # TODO: mark in this case somehow the bad result
+ # print res_ok
+ # print responses
+ return res_ok, responses
# ------ private classes ------ #
@@ -332,7 +431,11 @@ class CTRexStatelessClient(object):
def connect(self):
if not self.virtual:
- self.rpc_link.connect()
+ return self.rpc_link.connect()
+
+ def disconnect(self):
+ if not self.virtual:
+ self.rpc_link.disconnect()
def transmit(self, method_name, params={}):
if self.virtual: