summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-10-07 13:47:18 +0300
committerDan Klein <danklein10@gmail.com>2015-10-07 13:47:18 +0300
commit4f286bfefa6bbb0be4cdcf1fb004c82fc334c21f (patch)
treea01524843895e9f272974835d5d20390fae40170 /scripts
parentbafc3ec4b2686cdec4ac1c33f69f7607f368d4ce (diff)
progress in TRexStatelessClient module
mainly at batching support
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py202
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py6
2 files changed, 139 insertions, 69 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 3a48c612..412bdc09 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -17,17 +17,18 @@ class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
- def __init__(self, server="localhost", port=5050, virtual=False):
+ def __init__(self, username, server="localhost", port=5050, virtual=False):
super(CTRexStatelessClient, self).__init__()
+ self.user = username
self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
self._conn_handler = {}
self._active_ports = set()
self._port_stats = CTRexStatsManager()
self._stream_stats = CTRexStatsManager()
-
+ self._system_info = None
# ----- decorator methods ----- #
- def force_status(owned=True, active=False):
+ def force_status(owned=True, active_and_owned=False):
def wrapper(func):
def wrapper_f(self, *args, **kwargs):
port_ids = kwargs.get("port_id")
@@ -36,52 +37,79 @@ class CTRexStatelessClient(object):
port_ids = [port_ids]
bad_ids = set()
for port_id in port_ids:
- if not self._conn_handler.get(kwargs.get(port_id)):
+ port_owned = self._conn_handler.get(kwargs.get(port_id))
+ if owned and not port_owned:
bad_ids.add(port_ids)
+ elif active_and_owned: # stronger condition than just owned, hence gets precedence
+ if port_owned and port_id in self._active_ports:
+ continue
+ else:
+ bad_ids.add(port_ids)
+ else:
+ continue
if bad_ids:
# Some port IDs are not according to desires status
- own_str = "owned" if owned else "not-owned"
- act_str = "active" if active else "non-active"
- raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \
- "{2} and {3}".format(func.__name__,
- bad_ids,
- own_str,
- act_str))
+ raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" \
+ "at allowed stated".format(func.__name__))
else:
func(self, *args, **kwargs)
return wrapper_f
return wrapper
- # def owned(func):
- # def wrapper(self, *args, **kwargs):
- # if self._conn_handler.get(kwargs.get("port_id")):
- # return func(self, *args, **kwargs)
- # else:
- # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned".
- # format(func.__name__))
- # return wrapper
+ @property
+ def system_info(self):
+ if not self._system_info:
+ self._system_info = self.get_system_info()
+ return self._system_info
# ----- user-access methods ----- #
- def acquire(self, port_id, username, force=False):
+ def ping(self):
+ return self.transmit("ping")
+
+ def get_supported_cmds(self):
+ return self.transmit("get_supported_cmds")
+
+ def get_version(self):
+ return self.transmit("get_version")
+
+ def get_system_info(self):
+ return self.transmit("get_system_info")
+
+ def get_port_count(self):
+ return self.system_info.get("port_count")
+
+ def acquire(self, port_id, force=False):
+ if not CTRexStatelessClient._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("acquire", {"port_id":p_id, "user":username, "force":force})
+ commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
else:
params = {"port_id": port_id,
- "user": username,
+ "user": self.user,
"force": force}
self._conn_handler[port_id] = self.transmit("acquire", params)
return self._conn_handler[port_id]
@force_status(owned=True)
def release(self, port_id=None):
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("release", params)
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
+ else:
+ self._conn_handler.pop(port_id)
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("release", params)
@force_status(owned=True)
def add_stream(self, stream_id, stream_obj, port_id=None):
@@ -99,45 +127,77 @@ class CTRexStatelessClient(object):
"stream_id": stream_id}
return self.transmit("remove_stream", params)
- @force_status(owned=True,active=)
+ @force_status(owned=True, active_and_owned=True)
def get_stream_id_list(self, port_id=None):
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
return self.transmit("get_stream_list", params)
- @owned
+ @force_status(owned=True, active_and_owned=True)
def get_stream(self, stream_id, port_id=None):
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id,
"stream_id": stream_id}
return self.transmit("get_stream_list", params)
- @owned
+ @force_status(owned=True)
def start_traffic(self, port_id=None):
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("start_traffic", params)
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("start_traffic", params)
- @owned
+ @force_status(owned=False, active_and_owned=True)
def stop_traffic(self, port_id=None):
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("stop_traffic", params)
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("stop_traffic", params)
def get_global_stats(self):
return self.transmit("get_global_stats")
- @owned
+ @force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
- params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed
- "port_id": port_id}
- return self.transmit("get_port_stats", params)
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("get_port_stats", params)
- @owned
+ @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
- params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed
- "port_id": port_id}
- return self.transmit("get_stream_stats", params)
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ # TODO: further processing here
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("get_stream_stats", params)
# ----- internal methods ----- #
def transmit(self, method_name, params={}):
@@ -148,18 +208,25 @@ class CTRexStatelessClient(object):
@staticmethod
def _object_decoder(obj_type, obj_data):
- if obj_type=="global":
+ if obj_type == "global":
return CGlobalStats(**obj_data)
- elif obj_type=="port":
+ elif obj_type == "port":
return CPortStats(**obj_data)
- elif obj_type=="stream":
+ elif obj_type == "stream":
return CStreamStats(**obj_data)
else:
# Do not serialize the data into class
return obj_data
-
-
+ def _is_ports_valid(self, port_id):
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # check each item of the sequence
+ return all([CTRexStatelessClient._is_ports_valid(port)
+ for port in port_id])
+ elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()):
+ return True
+ else:
+ return False
# ------ private classes ------ #
@@ -196,12 +263,12 @@ class CTRexStatelessClient(object):
# invoke the batch
return batch.invoke()
-
def _prompt_virtual_tx_msg(self):
print "Transmitting virtually over tcp://{server}:{port}".format(
server=self.server,
port=self.port)
+
class CStream(object):
"""docstring for CStream"""
DEFAULTS = {"rx_stats": CRxStats,
@@ -217,7 +284,9 @@ class CStream(object):
setattr(self, k, v)
# set default values to unset attributes, according to DEFAULTS dict
set_keys = set(kwargs.keys())
- keys_to_set = [x for x in self.DEFAULTS if x not in set_keys]
+ keys_to_set = [x
+ for x in self.DEFAULTS
+ if x not in set_keys]
for key in keys_to_set:
default = self.DEFAULTS.get(key)
if type(default)==type:
@@ -260,21 +329,21 @@ class CStream(object):
def dump(self):
pass
- return {"enabled":self.enabled,
- "self_start":self.self_start,
- "isg":self.isg,
- "next_stream":self.next_stream,
- "packet":self.packet.dump_pkt(),
- "mode":self.mode.dump(),
- "vm":self.packet.get_vm_data(),
- "rx_stats":self.rx_stats.dump()}
+ return {"enabled": self.enabled,
+ "self_start": self.self_start,
+ "isg": self.isg,
+ "next_stream": self.next_stream,
+ "packet": self.packet.dump_pkt(),
+ "mode": self.mode.dump(),
+ "vm": self.packet.get_vm_data(),
+ "rx_stats": self.rx_stats.dump()}
class CRxStats(object):
def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False):
- self._rx_dict = {"enabled" : enabled,
- "seq_enabled" : seq_enabled,
- "latency_enabled" : latency_enabled}
+ self._rx_dict = {"enabled": enabled,
+ "seq_enabled": seq_enabled,
+ "latency_enabled": latency_enabled}
@property
def enabled(self):
@@ -301,11 +370,12 @@ class CRxStats(object):
self._rx_dict['latency_enabled'] = bool(bool_value)
def dump(self):
- return {k:v
+ return {k: v
for k,v in self._rx_dict.items()
if v
}
+
class CTxMode(object):
"""docstring for CTxMode"""
def __init__(self, tx_mode, pps):
@@ -313,7 +383,7 @@ class CTxMode(object):
if tx_mode not in ["continuous", "single_burst", "multi_burst"]:
raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode))
self._tx_mode = tx_mode
- self._fields = {'pps':float(pps)}
+ self._fields = {'pps': float(pps)}
if tx_mode == "single_burst":
self._fields['total_pkts'] = 0
elif tx_mode == "multi_burst":
@@ -331,8 +401,8 @@ class CTxMode(object):
format(attr, self._tx_mode))
def dump(self):
- dump = {"type":self._tx_mode}
- dump.update({k:v
+ dump = {"type": self._tx_mode}
+ dump.update({k: v
for k, v in self._fields.items()
})
return dump
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index db5ddc51..b8b1734d 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -171,7 +171,7 @@ class JsonRpcClient(object):
def process_single_response (self, response_json):
if (response_json.get("jsonrpc") != "2.0"):
- return False, "Malfromed Response ({0})".format(str(response))
+ return False, "Malformed Response ({0})".format(str(response))
# error reported by server
if ("error" in response_json):
@@ -182,7 +182,7 @@ class JsonRpcClient(object):
# if no error there should be a result
if ("result" not in response_json):
- return False, "Malfromed Response ({0})".format(str(response))
+ return False, "Malformed Response ({0})".format(str(response))
return True, response_json["result"]
@@ -191,7 +191,7 @@ class JsonRpcClient(object):
def set_verbose(self, mode):
self.verbose = mode
- def disconnect (self):
+ def disconnect(self):
if self.connected:
self.socket.close(linger = 0)
self.context.destroy(linger = 0)