summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
authorYaroslav Brustinov <ybrustin@cisco.com>2015-12-13 17:18:02 +0200
committerYaroslav Brustinov <ybrustin@cisco.com>2015-12-13 17:18:02 +0200
commit9738e267d806223ee25e013b5959ccac26c1a14a (patch)
tree590c8f329f2ab68c7da3f1f8f4c55f81243a08bc /scripts/automation/trex_control_plane/client
parenta573adc6395c9ad8d96978508a07a654ef48c7a9 (diff)
parent301341ddb1bf17387d7fea19667bedd40fce4509 (diff)
Merge branch 'master' into get_logs_and_version
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py284
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_client.py30
-rwxr-xr-x[-rw-r--r--]scripts/automation/trex_control_plane/client/trex_hltapi.py297
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py408
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py1500
5 files changed, 2108 insertions, 411 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
new file mode 100644
index 00000000..8fdf7c9b
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -0,0 +1,284 @@
+#!/router/bin/python
+
+try:
+ # support import for Python 2
+ import outer_packages
+except ImportError:
+ # support import for Python 3
+ import client.outer_packages
+from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+
+from common.text_opts import *
+
+import json
+import threading
+import time
+import datetime
+import zmq
+import re
+
+from common.trex_stats import *
+from common.trex_streams import *
+
+# basic async stats class
+class CTRexAsyncStats(object):
+ def __init__ (self):
+ self.ref_point = None
+ self.current = {}
+ self.last_update_ts = datetime.datetime.now()
+
+ @staticmethod
+ def format_num (size, suffix = ""):
+
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+
+ return "NaN"
+
+ def update (self, snapshot):
+
+ #update
+ self.last_update_ts = datetime.datetime.now()
+
+ self.current = snapshot
+
+ if self.ref_point == None:
+ self.ref_point = self.current
+
+ def clear(self):
+ self.ref_point = self.current
+
+
+ def get(self, field, format=False, suffix=""):
+
+ if not field in self.current:
+ return "N/A"
+
+ if not format:
+ return self.current[field]
+ else:
+ return self.format_num(self.current[field], suffix)
+
+ def get_rel (self, field, format=False, suffix=""):
+ if not field in self.current:
+ return "N/A"
+
+ if not format:
+ return (self.current[field] - self.ref_point[field])
+ else:
+ return self.format_num(self.current[field] - self.ref_point[field], suffix)
+
+
+ # return true if new data has arrived in the past 2 seconds
+ def is_online (self):
+ delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000
+ return (delta_ms < 2000)
+
+# describes the general stats provided by TRex
+class CTRexAsyncStatsGeneral(CTRexAsyncStats):
+ def __init__ (self):
+ super(CTRexAsyncStatsGeneral, self).__init__()
+
+
+# per port stats
+class CTRexAsyncStatsPort(CTRexAsyncStats):
+ def __init__ (self):
+ super(CTRexAsyncStatsPort, self).__init__()
+
+ def get_stream_stats (self, stream_id):
+ return None
+
+# stats manager
+class CTRexAsyncStatsManager():
+ def __init__ (self):
+
+ self.general_stats = CTRexAsyncStatsGeneral()
+ self.port_stats = {}
+
+
+ def get_general_stats(self):
+ return self.general_stats
+
+ def get_port_stats (self, port_id):
+
+ if not str(port_id) in self.port_stats:
+ return None
+
+ return self.port_stats[str(port_id)]
+
+
+ def update(self, data):
+ self.__handle_snapshot(data)
+
+ def __handle_snapshot(self, snapshot):
+
+ general_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in snapshot.iteritems():
+
+ # match a pattern of ports
+ m = re.search('(.*)\-([0-8])', key)
+ if m:
+
+ port_id = m.group(2)
+ field_name = m.group(1)
+
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+
+ port_stats[port_id][field_name] = value
+
+ else:
+ # no port match - general stats
+ general_stats[key] = value
+
+ # update the general object with the snapshot
+ self.general_stats.update(general_stats)
+
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+
+ if not port_id in self.port_stats:
+ self.port_stats[port_id] = CTRexAsyncStatsPort()
+
+ self.port_stats[port_id].update(data)
+
+
+
+
+
+class CTRexAsyncClient():
+ def __init__ (self, server, port, stateless_client):
+
+ self.port = port
+ self.server = server
+ self.stateless_client = stateless_client
+
+ self.raw_snapshot = {}
+
+ self.stats = CTRexAsyncStatsManager()
+
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
+
+ self.tr = "tcp://{0}:{1}".format(self.server, self.port)
+ print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
+ self.active = True
+ self.alive = False
+ self.t = threading.Thread(target = self._run)
+
+ # kill this thread on exit and don't add it to the join list
+ self.t.setDaemon(True)
+ self.t.start()
+
+ self.connected = True
+
+
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.alive:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return False, "*** [subscriber] - no data flow from server at : " + self.tr
+
+ return True, ""
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+ # no data yet...
+ self.alive = False
+
+ # socket must be created on the same thread
+ self.socket.connect(self.tr)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+
+ while self.active:
+ try:
+
+ line = self.socket.recv_string()
+
+ if not self.alive:
+ self.stateless_client.on_async_alive()
+ self.alive = True
+
+ # got a timeout - mark as not alive and retry
+ except zmq.Again:
+
+ if self.alive:
+ self.stateless_client.on_async_dead()
+ self.alive = False
+
+ continue
+
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ self.alive = False
+ break
+
+ msg = json.loads(line)
+
+ name = msg['name']
+ data = msg['data']
+ type = msg['type']
+ self.raw_snapshot[name] = data
+
+ self.__dispatch(name, type, data)
+
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
+
+ def get_stats (self):
+ return self.stats
+
+ def get_raw_snapshot (self):
+ return self.raw_snapshot
+
+ # dispatch the message to the right place
+ def __dispatch (self, name, type, data):
+ # stats
+ if name == "trex-global":
+ self.stateless_client.handle_async_stats_update(data)
+ # events
+ elif name == "trex-event":
+ self.stateless_client.handle_async_event(type, data)
+ else:
+ pass
+
+
diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py
index 160abdec..77b11c37 100755
--- a/scripts/automation/trex_control_plane/client/trex_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_client.py
@@ -131,7 +131,9 @@ class CTRexClient(object):
raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.')
trex_cmd_options.update( {'f' : f, 'd' : d} )
-
+ if not trex_cmd_options.get('l'):
+ self.result_obj.latency_checked = False
+
self.result_obj.clear_results()
try:
issue_time = time.time()
@@ -767,6 +769,7 @@ class CTRexResult(object):
"""
self._history = deque(maxlen = max_history_size)
self.clear_results()
+ self.latency_checked = True
def __repr__(self):
return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) +
@@ -1032,18 +1035,19 @@ class CTRexResult(object):
self._done_warmup = True
# handle latency data
- latency_pre = "trex-latency"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
- # support old typo
- if self._max_latency is None:
- latency_pre = "trex-latecny"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
-
- self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
- self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
-
- avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
- self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
+ if self.latency_checked:
+ latency_pre = "trex-latency"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
+ # support old typo
+ if self._max_latency is None:
+ latency_pre = "trex-latecny"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
+
+ self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
+ self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
+
+ avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
+ self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts")
rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts")
diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py
index 46c283f8..92768ca4 100644..100755
--- a/scripts/automation/trex_control_plane/client/trex_hltapi.py
+++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py
@@ -2,16 +2,305 @@
import trex_root_path
from client_utils.packet_builder import CTRexPktBuilder
+from trex_stateless_client import CTRexStatelessClient
+from common.trex_streams import *
+from client_utils.general_utils import id_count_gen
+import dpkt
-print "done!"
class CTRexHltApi(object):
def __init__(self):
- pass
+ self.trex_client = None
+ self.connected = False
+ # self._stream_db = CStreamList()
+ self._port_data = {}
+
+ # ----- session functions ----- #
+
+ def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False):
+ ret_dict = {"status": 0}
+ self.trex_client = CTRexStatelessClient(username, device, port)
+ res_ok, msg = self.trex_client.connect()
+ if not res_ok:
+ self.trex_client = None
+ ret_dict.update({"log": msg})
+ return ret_dict
+ # arrived here, connection successfully created with server
+ # next, try acquiring ports of TRex
+ port_list = self.parse_port_list(port_list)
+ response = self.trex_client.acquire(port_list, force=break_locks)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ # TODO: should revert taken ports?!
+ return ret_dict
+ # arrived here, all desired ports were successfully acquired
+ print log
+ if reset:
+ # remove all port traffic configuration from TRex
+ response = self.trex_client.remove_all_streams(port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if not res_ok:
+ self.trex_client.disconnect()
+ self.trex_client = None
+ ret_dict.update({"log": log})
+ return ret_dict
+ print log
+ port_handle = {device: {port: port # since only supporting single TRex at the moment, 1:1 map
+ for port in port_list}
+ }
+ ret_dict.update({"status": 1,
+ "log": None,
+ "port_handle": port_handle,
+ "offline": 0}) # ports are online
+ self.connected = True
+ self._port_data_init(port_list)
+ return ret_dict
+
+ def cleanup_session(self, port_list, maintain_lock=False):
+ ret_dict = {"status": 0}
+ if not maintain_lock:
+ # release taken ports
+ if port_list == "all":
+ port_list = self.trex_client.get_acquired_ports()
+ else:
+ port_list = self.parse_port_list(port_list)
+ response = self.trex_client.release(port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ print log
+ if not res_ok:
+ ret_dict.update({"log": log})
+ return ret_dict
+ ret_dict.update({"status": 1,
+ "log": None})
+ self.trex_client.disconnect()
+ self.trex_client = None
+ self.connected = False
+ return ret_dict
+
+ def interface_config(self, port_handle, mode="config"):
+ ALLOWED_MODES = ["config", "modify", "destroy"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ # pass this function for now...
+ return {"status": 1, "log": None}
+
+ # ----- traffic functions ----- #
+ def traffic_config(self, mode, port_handle,
+ l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00",
+ l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110,
+ transmit_mode="continuous", rate_pps=100,
+ **kwargs):
+ ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ if mode == "create":
+ # create a new stream with desired attributes, starting by creating packet
+ try:
+ packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length)
+ # set transmission attributes
+ tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs)
+ # set rx_stats
+ rx_stats = CRxStats() # defaults with disabled
+ # join the generated data into stream
+ stream_obj = CStream()
+ stream_obj_params = {"enabled": True,
+ "self_start": True,
+ "next_stream_id": -1,
+ "isg": 0.0,
+ "mode": tx_mode,
+ "rx_stats": rx_stats,
+ "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed
+ stream_obj.load_data(**stream_obj_params)
+ except Exception as e:
+ # some exception happened during the stream creation
+ return {"status": 0, "log": str(e)}
+ # try adding the stream, until free stream_id is found
+ port_data = self._port_data.get(port_handle)
+ id_candidate = None
+ # TODO: change this to better implementation
+ while True:
+ id_candidate = port_data["stream_id_gen"].next()
+ response = self.trex_client.add_stream(stream_id=id_candidate,
+ stream_obj=stream_obj,
+ port_id=port_handle)
+ res_ok, log = CTRexHltApi.process_response(port_handle, response)
+ if res_ok:
+ # found non-taken stream_id on server
+ # save it for modifying needs
+ port_data["streams"].update({id_candidate: stream_obj})
+ break
+ else:
+ # proceed to another iteration to use another id
+ continue
+ return {"status": 1,
+ "stream_id": id_candidate,
+ "log": None}
+ else:
+ raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode))
+
+ def traffic_control(self, action, port_handle):
+ ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"]
+ if action not in ALLOWED_ACTIONS:
+ raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS))
+ # ret_dict = {"status": 0, "stopped": 1}
+ port_list = self.parse_port_list(port_handle)
+ if action == "run":
+ response = self.trex_client.start_traffic(port_id=port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if res_ok:
+ return {"status": 1,
+ "stopped": 0,
+ "log": None}
+ elif action == "stop":
+ response = self.trex_client.stop_traffic(port_id=port_list)
+ res_ok, log = CTRexHltApi.process_response(port_list, response)
+ if res_ok:
+ return {"status": 1,
+ "stopped": 1,
+ "log": None}
+ else:
+ raise NotImplementedError("action '{0}' is not supported yet on TRex".format(action))
+
+ # if we arrived here, this means that operation FAILED!
+ return {"status": 0,
+ "log": log}
+
+
+ def traffic_stats(self, port_handle, mode):
+ ALLOWED_MODES = ["aggregate", "streams", "all"]
+ if mode not in ALLOWED_MODES:
+ raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
+ # pass this function for now...
+ if mode == "aggregate":
+ # create a new stream with desired attributes, starting by creating packet
+ try:
+ packet = CTRexHltApi.generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length)
+ # set transmission attributes
+ tx_mode = CTxMode(transmit_mode, rate_pps, **kwargs)
+ # set rx_stats
+ rx_stats = CRxStats() # defaults with disabled
+ # join the generated data into stream
+ stream_obj = CStream()
+ stream_obj_params = {"enabled": True,
+ "self_start": True,
+ "next_stream_id": -1,
+ "isg": 0.0,
+ "mode": tx_mode,
+ "rx_stats": rx_stats,
+ "packet": packet} # vm is excluded from this list since CTRexPktBuilder obj is passed
+ stream_obj.load_data(**stream_obj_params)
+ except Exception as e:
+ # some exception happened during the stream creation
+ return {"status": 0, "log": str(e)}
+ # try adding the stream, until free stream_id is found
+ port_data = self._port_data.get(port_handle)
+ id_candidate = None
+ # TODO: change this to better implementation
+ while True:
+ id_candidate = port_data["stream_id_gen"].next()
+ response = self.trex_client.add_stream(stream_id=id_candidate,
+ stream_obj=stream_obj,
+ port_id=port_handle)
+ res_ok, log = CTRexHltApi.process_response(port_handle, response)
+ if res_ok:
+ # found non-taken stream_id on server
+ # save it for modifying needs
+ port_data["streams"].update({id_candidate: stream_obj})
+ break
+ else:
+ # proceed to another iteration to use another id
+ continue
+ return {"status": 1,
+ "stream_id": id_candidate,
+ "log": None}
+ else:
+ raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode))
+
+ def get_aggregate_port_stats(self, port_handle):
+ return self.traffic_stats(port_handle, mode="aggregate")
+
+ def get_stream_stats(self, port_handle):
+ return self.traffic_stats(port_handle, mode="streams")
+
+ # ----- internal functions ----- #
+ def _port_data_init(self, port_list):
+ for port in port_list:
+ self._port_data[port] = {"stream_id_gen": id_count_gen(),
+ "streams": {}}
+
+ @staticmethod
+ def process_response(port_list, response):
+ if isinstance(port_list, list):
+ res_ok, response = response
+ log = CTRexHltApi.join_batch_response(response)
+ else:
+ res_ok = response.success
+ log = str(response)
+ return res_ok, log
+
+ @staticmethod
+ def parse_port_list(port_list):
+ if isinstance(port_list, str):
+ return [int(port)
+ for port in port_list.split()]
+ elif isinstance(port_list, list):
+ return [int(port)
+ for port in port_list]
+ else:
+ return port_list
+
+ @staticmethod
+ def join_batch_response(responses):
+ return "\n".join([str(response)
+ for response in responses])
+
+ @staticmethod
+ def generate_stream(l2_encap, mac_src, mac_dst,
+ l3_protocol, ip_src_addr, ip_dst_addr, l3_length):
+ ALLOWED_L3_PROTOCOL = {"ipv4": dpkt.ethernet.ETH_TYPE_IP,
+ "ipv6": dpkt.ethernet.ETH_TYPE_IP6,
+ "arp": dpkt.ethernet.ETH_TYPE_ARP}
+ ALLOWED_L4_PROTOCOL = {"tcp": dpkt.ip.IP_PROTO_TCP,
+ "udp": dpkt.ip.IP_PROTO_UDP,
+ "icmp": dpkt.ip.IP_PROTO_ICMP,
+ "icmpv6": dpkt.ip.IP_PROTO_ICMP6,
+ "igmp": dpkt.ip.IP_PROTO_IGMP,
+ "rtp": dpkt.ip.IP_PROTO_IRTP,
+ "isis": dpkt.ip.IP_PROTO_ISIS,
+ "ospf": dpkt.ip.IP_PROTO_OSPF}
+
+ pkt_bld = CTRexPktBuilder()
+ if l2_encap == "ethernet_ii":
+ pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet())
+ # set Ethernet layer attributes
+ pkt_bld.set_eth_layer_addr("l2", "src", mac_src)
+ pkt_bld.set_eth_layer_addr("l2", "dst", mac_dst)
+ else:
+ raise NotImplementedError("l2_encap does not support the desired encapsulation '{0}'".format(l2_encap))
+ # set l3 on l2
+ if l3_protocol not in ALLOWED_L3_PROTOCOL:
+ raise ValueError("l3_protocol must be one of the following: {0}".format(ALLOWED_L3_PROTOCOL))
+ pkt_bld.set_layer_attr("l2", "type", ALLOWED_L3_PROTOCOL[l3_protocol])
+
+ # set l3 attributes
+ if l3_protocol == "ipv4":
+ pkt_bld.add_pkt_layer("l3", dpkt.ip.IP())
+ pkt_bld.set_ip_layer_addr("l3", "src", ip_src_addr)
+ pkt_bld.set_ip_layer_addr("l3", "dst", ip_dst_addr)
+ pkt_bld.set_layer_attr("l3", "len", l3_length)
+ else:
+ raise NotImplementedError("l3_protocol '{0}' is not supported by TRex yet.".format(l3_protocol))
+
+ pkt_bld.dump_pkt_to_pcap("stream_test.pcap")
+ return pkt_bld
+
- def config_traffic(self):
- pass
if __name__ == "__main__":
pass
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
new file mode 100644
index 00000000..5c5702dd
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_port.py
@@ -0,0 +1,408 @@
+
+from collections import namedtuple
+from common.trex_types import *
+from common import trex_stats
+
+# describes a single port
+class Port(object):
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+ PortState = namedtuple('PortState', ['state_id', 'state_name'])
+ STATES_MAP = {STATE_DOWN: "DOWN",
+ STATE_IDLE: "IDLE",
+ STATE_STREAMS: "IDLE",
+ STATE_TX: "ACTIVE",
+ STATE_PAUSE: "PAUSE"}
+
+
+ def __init__ (self, port_id, speed, driver, user, comm_link):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.comm_link = comm_link
+ self.transmit = comm_link.transmit
+ self.transmit_batch = comm_link.transmit_batch
+ self.user = user
+ self.driver = driver
+ self.speed = speed
+ self.streams = {}
+ self.profile = None
+
+ self.port_stats = trex_stats.CPortStats(self)
+
+
+ def err(self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ def ok(self, data = "ACK"):
+ return RC_OK(data)
+
+ def get_speed_bps (self):
+ return (self.speed * 1000 * 1000 * 1000)
+
+ # take the port
+ def acquire(self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ # release the port
+ def release(self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ self.handler = None
+
+ if rc.success:
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_active(self):
+ return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def is_transmitting (self):
+ return (self.state == self.STATE_TX)
+
+ def is_paused (self):
+ return (self.state == self.STATE_PAUSE)
+
+
+ def sync(self):
+ params = {"port_id": self.port_id}
+
+ command = RpcCmdData("get_port_status", params)
+ rc = self.transmit(command.method, command.params)
+ if not rc.success:
+ return self.err(rc.data)
+
+ # sync the port
+ port_state = rc.data['state']
+
+ if port_state == "DOWN":
+ self.state = self.STATE_DOWN
+ elif port_state == "IDLE":
+ self.state = self.STATE_IDLE
+ elif port_state == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif port_state == "TX":
+ self.state = self.STATE_TX
+ elif port_state == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return self.ok()
+
+
+ # return TRUE if write commands
+ def is_port_writable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ r = self.err(data)
+ print r.good()
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # add multiple streams
+ def add_streams (self, streams_list):
+ batch = []
+
+ for stream in streams_list:
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream.stream_id,
+ "stream": stream.stream}
+
+ cmd = RpcCmdData('add_stream', params)
+ batch.append(cmd)
+
+ rc, data = self.transmit_batch(batch)
+
+ if not rc:
+ return self.err(data)
+
+ # add the stream
+ for stream in streams_list:
+ self.streams[stream.stream_id] = stream.stream
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE
+
+ return self.ok()
+
+ # remove all the streams
+ def remove_all_streams (self):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("remove_all_streams", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams = {}
+
+ self.state = self.STATE_IDLE
+
+ return self.ok()
+
+ # get a specific stream
+ def get_stream (self, stream_id):
+ if stream_id in self.streams:
+ return self.streams[stream_id]
+ else:
+ return None
+
+ def get_all_streams (self):
+ return self.streams
+
+ # start traffic
+ def start (self, mul, duration):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul,
+ "duration": duration}
+
+ rc, data = self.transmit("start_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
+
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ def pause (self):
+
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("pause_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_PAUSE
+
+ return self.ok()
+
+
+ def resume (self):
+
+ if (self.state != self.STATE_PAUSE) :
+ return self.err("port is not in pause mode")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("resume_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+
+ def update (self, mul):
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc, data = self.transmit("update_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ return self.ok()
+
+
+ def validate (self):
+
+ if (self.state == self.STATE_DOWN):
+ return self.err("port is down")
+
+ if (self.state == self.STATE_IDLE):
+ return self.err("no streams attached to port")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("validate", params)
+ if not rc:
+ return self.err(data)
+
+ self.profile = data
+
+ return self.ok()
+
+ def get_profile (self):
+ return self.profile
+
+
+ def print_profile (self, mult, duration):
+ if not self.get_profile():
+ return
+
+ rate = self.get_profile()['rate']
+ graph = self.get_profile()['graph']
+
+ print format_text("Profile Map Per Port\n", 'underline', 'bold')
+
+ factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util'])
+
+ print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"),
+ format_num(rate['max_bps'] * factor, suffix = "bps"))
+
+ print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
+ format_num(rate['max_pps'] * factor, suffix = "pps"),)
+
+ print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100),
+ format_percentage(rate['max_line_util'] * factor * 100))
+
+
+ # duration
+ exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
+ exp_time_factor_sec = exp_time_base_sec / factor
+
+ # user configured a duration
+ if duration > 0:
+ if exp_time_factor_sec > 0:
+ exp_time_factor_sec = min(exp_time_factor_sec, duration)
+ else:
+ exp_time_factor_sec = duration
+
+
+ print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
+ format_time(exp_time_factor_sec))
+ print "\n"
+
+
+ def get_port_state_name(self):
+ return self.STATES_MAP.get(self.state, "Unknown")
+
+ ################# stats handler ######################
+ def generate_port_stats(self):
+ return self.port_stats.generate_stats()
+ pass
+
+ def generate_port_status(self):
+ return {"port-type": self.driver,
+ "maximum": "{speed} Gb/s".format(speed=self.speed),
+ "port-status": self.get_port_state_name()
+ }
+
+ def clear_stats(self):
+ return self.port_stats.clear_stats()
+
+
+ ################# events handler ######################
+ def async_event_port_stopped (self):
+ self.state = self.STATE_STREAMS
+
+
+ def async_event_port_started (self):
+ self.state = self.STATE_TX
+
+
+ def async_event_port_paused (self):
+ self.state = self.STATE_PAUSE
+
+
+ def async_event_port_resumed (self):
+ self.state = self.STATE_TX
+
+ def async_event_forced_acquired (self):
+ self.handler = None
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 334496d1..a2b1f6d9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -6,324 +6,1175 @@ try:
except ImportError:
# support import for Python 3
import client.outer_packages
+
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
from client_utils.packet_builder import CTRexPktBuilder
import json
-from common.trex_stats import *
+
+from common.trex_streams import *
from collections import namedtuple
+from common.text_opts import *
+from common import trex_stats
+from client_utils import parsing_opts, text_tables
+import time
+import datetime
+import re
+import random
+from trex_port import Port
+from common.trex_types import *
+
+from trex_async_client import CTRexAsyncClient
+
+
+########## utlity ############
+def mult_to_factor (mult, max_bps, max_pps, line_util):
+ if mult['type'] == 'raw':
+ return mult['value']
+
+ if mult['type'] == 'bps':
+ return mult['value'] / max_bps
+
+ if mult['type'] == 'pps':
+ return mult['value'] / max_pps
+
+ if mult['type'] == 'percentage':
+ return mult['value'] / line_util
+
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
- def __init__(self, username, server="localhost", port=5050, virtual=False):
+ # verbose levels
+ VERBOSE_SILENCE = 0
+ VERBOSE_REGULAR = 1
+ VERBOSE_HIGH = 2
+
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
+
self.user = username
- self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
- self._conn_handler = {}
- self._active_ports = set()
- self._stats = CTRexStatsManager("port", "stream")
- self._system_info = None
-
- # ----- decorator methods ----- #
- def force_status(owned=True, active_and_owned=False):
- def wrapper(func):
- def wrapper_f(self, *args, **kwargs):
- port_ids = kwargs.get("port_id")
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- for port_id in port_ids:
- port_owned = self._conn_handler.get(kwargs.get(port_id))
- if owned and not port_owned:
- bad_ids.add(port_ids)
- elif active_and_owned: # stronger condition than just owned, hence gets precedence
- if port_owned and port_id in self._active_ports:
- continue
- else:
- bad_ids.add(port_ids)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not"
- "at allowed stated".format(func.__name__))
+
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
+
+ # default verbose level
+ self.verbose = self.VERBOSE_REGULAR
+
+ self.ports = {}
+ self._connection_info = {"server": server,
+ "sync_port": sync_port,
+ "async_port": async_port}
+ self.system_info = {}
+ self.server_version = {}
+ self.__err_log = None
+
+ self.async_client = CTRexAsyncClient(server, async_port, self)
+
+ self.streams_db = CStreamsDB()
+ self.global_stats = trex_stats.CGlobalStats(self._connection_info,
+ self.server_version,
+ self.ports)
+ self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats,
+ self.ports)
+
+ self.events = []
+
+
+ self.read_only = False
+ self.connected = False
+
+
+
+ # returns the port object
+ def get_port (self, port_id):
+ return self.ports.get(port_id, None)
+
+
+ def get_server (self):
+ return self.comm_link.get_server()
+
+ ################# events handler ######################
+ def add_event_log (self, msg, ev_type, show = False):
+
+ if ev_type == "server":
+ prefix = "[server]"
+ elif ev_type == "local":
+ prefix = "[local]"
+
+ ts = time.time()
+ st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
+ self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
+
+ if show and self.check_verbose(self.VERBOSE_REGULAR):
+ print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))
+
+
+ def handle_async_stats_update(self, dump_data):
+ global_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in dump_data.iteritems():
+ # match a pattern of ports
+ m = re.search('(.*)\-([0-8])', key)
+ if m:
+ port_id = int(m.group(2))
+ field_name = m.group(1)
+ if self.ports.has_key(port_id):
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+ port_stats[port_id][field_name] = value
else:
- func(self, *args, **kwargs)
- return wrapper_f
- return wrapper
+ continue
+ else:
+ # no port match - general stats
+ global_stats[key] = value
- @property
- def system_info(self):
- if not self._system_info:
- self._system_info = self.get_system_info()
- return self._system_info
+ # update the general object with the snapshot
+ self.global_stats.update(global_stats)
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+ self.ports[port_id].port_stats.update(data)
- # ----- user-access methods ----- #
- def ping(self):
- return self.transmit("ping")
+
+
+ def handle_async_event (self, type, data):
+ # DP stopped
+
+ show_event = False
+
+ # port started
+ if (type == 0):
+ port_id = int(data['port_id'])
+ ev = "Port {0} has started".format(port_id)
+ self.async_event_port_started(port_id)
+
+ # port stopped
+ elif (type == 1):
+ port_id = int(data['port_id'])
+ ev = "Port {0} has stopped".format(port_id)
+
+ # call the handler
+ self.async_event_port_stopped(port_id)
+
+
+ # port paused
+ elif (type == 2):
+ port_id = int(data['port_id'])
+ ev = "Port {0} has paused".format(port_id)
+
+ # call the handler
+ self.async_event_port_paused(port_id)
+
+ # port resumed
+ elif (type == 3):
+ port_id = int(data['port_id'])
+ ev = "Port {0} has resumed".format(port_id)
+
+ # call the handler
+ self.async_event_port_resumed(port_id)
+
+ # port finished traffic
+ elif (type == 4):
+ port_id = int(data['port_id'])
+ ev = "Port {0} job done".format(port_id)
+
+ # call the handler
+ self.async_event_port_stopped(port_id)
+ show_event = True
+
+ # port was stolen...
+ elif (type == 5):
+ port_id = int(data['port_id'])
+ ev = "Port {0} was forcely taken".format(port_id)
+
+ # call the handler
+ self.async_event_port_forced_acquired(port_id)
+ show_event = True
+
+ # server stopped
+ elif (type == 100):
+ ev = "Server has stopped"
+ self.async_event_server_stopped()
+ show_event = True
+
+
+ else:
+ # unknown event - ignore
+ return
+
+
+ self.add_event_log(ev, 'server', show_event)
+
+
+ def async_event_port_stopped (self, port_id):
+ self.ports[port_id].async_event_port_stopped()
+
+
+ def async_event_port_started (self, port_id):
+ self.ports[port_id].async_event_port_started()
+
+
+ def async_event_port_paused (self, port_id):
+ self.ports[port_id].async_event_port_paused()
+
+
+ def async_event_port_resumed (self, port_id):
+ self.ports[port_id].async_event_port_resumed()
+
+
+ def async_event_port_forced_acquired (self, port_id):
+ self.ports[port_id].async_event_forced_acquired()
+ self.read_only = True
+
+ def async_event_server_stopped (self):
+ self.connected = False
+
+
+ def get_events (self):
+ return self.events
+
+ def clear_events (self):
+ self.events = []
+
+ ############# helper functions section ##############
+
+ # measure time for functions
+ def timing(f):
+ def wrap(*args):
+ time1 = time.time()
+ ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
+ delta = time.time() - time1
+ print format_time(delta) + "\n"
+
+ return ret
+
+ return wrap
+
+
+ def validate_port_list(self, port_id_list):
+ if not isinstance(port_id_list, list):
+ print type(port_id_list)
+ return False
+
+ # check each item of the sequence
+ return all([ (port_id >= 0) and (port_id < self.get_port_count())
+ for port_id in port_id_list ])
+
+ # some preprocessing for port argument
+ def __ports (self, port_id_list):
+
+ # none means all
+ if port_id_list == None:
+ return range(0, self.get_port_count())
+
+ # always list
+ if isinstance(port_id_list, int):
+ port_id_list = [port_id_list]
+
+ if not isinstance(port_id_list, list):
+ raise ValueError("bad port id list: {0}".format(port_id_list))
+
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
+
+ return port_id_list
+
+ ############ boot up section ################
+
+ # connection sequence
+
+ # mode can be RW - read / write, RWF - read write with force , RO - read only
+ def connect(self, mode = "RW"):
+
+ if self.is_connected():
+ self.disconnect()
+
+ # clear this flag
+ self.connected = False
+
+ # connect sync channel
+ rc, data = self.comm_link.connect()
+ if not rc:
+ return RC_ERR(data)
+
+ # connect async channel
+ rc, data = self.async_client.connect()
+ if not rc:
+ return RC_ERR(data)
+
+ # version
+ rc, data = self.transmit("get_version")
+ if not rc:
+ return RC_ERR(data)
+
+ self.server_version = data
+ self.global_stats.server_version = data
+
+ # cache system info
+ rc, data = self.transmit("get_system_info")
+ if not rc:
+ return RC_ERR(data)
+ self.system_info = data
+
+ # cache supported commands
+ rc, data = self.transmit("get_supported_cmds")
+ if not rc:
+ return RC_ERR(data)
+
+ self.supported_cmds = data
+
+ # create ports
+ for port_id in xrange(self.get_port_count()):
+ speed = self.system_info['ports'][port_id]['speed']
+ driver = self.system_info['ports'][port_id]['driver']
+
+ self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link)
+
+
+ # sync the ports
+ rc = self.sync_ports()
+ if rc.bad():
+ return rc
+
+ # acquire all ports
+ if mode == "RW":
+ rc = self.acquire(force = False)
+
+ # fallback to read only if failed
+ if rc.bad():
+ rc.annotate(show_status = False)
+ print format_text("Switching to read only mode - only few commands will be available", 'bold')
+
+ self.release(self.get_acquired_ports())
+ self.read_only = True
+ else:
+ self.read_only = False
+
+ elif mode == "RWF":
+ rc = self.acquire(force = True)
+ if rc.bad():
+ return rc
+ self.read_only = False
+
+ elif mode == "RO":
+ # no acquire on read only
+ rc = RC_OK()
+ self.read_only = True
+
+
+
+ self.connected = True
+ return RC_OK()
+
+
+ def is_read_only (self):
+ return self.read_only
+
+ def is_connected (self):
+ return self.connected and self.comm_link.is_connected
+
+
+ def disconnect(self):
+ # release any previous acquired ports
+ if self.is_connected():
+ self.release(self.get_acquired_ports())
+
+ self.comm_link.disconnect()
+ self.async_client.disconnect()
+
+ self.connected = False
+
+ return RC_OK()
+
+
+ def on_async_dead (self):
+ if self.connected:
+ msg = 'lost connection to server'
+ self.add_event_log(msg, 'local', True)
+ self.connected = False
+
+ def on_async_alive (self):
+ pass
+
+ ########### cached queries (no server traffic) ###########
def get_supported_cmds(self):
- return self.transmit("get_supported_cmds")
+ return self.supported_cmds
def get_version(self):
- return self.transmit("get_version")
+ return self.server_version
def get_system_info(self):
- return self.transmit("get_system_info")
+ return self.system_info
def get_port_count(self):
return self.system_info.get("port_count")
- def acquire(self, port_id, force=False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_acquire_response)
- else:
- params = {"port_id": port_id,
- "user": self.user,
- "force": force}
- command = self.RpcCmdData("acquire", params)
- self._handle_acquire_response(command, self.transmit(command.method, command.params))
- return self._conn_handler.get(port_id)
-
- @force_status(owned=True)
- def release(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_release_response)
+ def get_port_ids(self, as_str=False):
+ port_ids = range(self.get_port_count())
+ if as_str:
+ return " ".join(str(p) for p in port_ids)
else:
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = self.RpcCmdData("release", params)
- self._handle_release_response(command, self.transmit(command.method, command.params))
- return
+ return port_ids
- @force_status(owned=True)
- def add_stream(self, stream_id, stream_obj, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- assert isinstance(stream_obj, CStream)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "stream": stream_obj.dump()}
- return self.transmit("add_stream", params)
-
- @force_status(owned=True)
- def remove_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("remove_stream", params)
-
- @force_status(owned=True, active_and_owned=True)
- def get_stream_id_list(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("get_stream_list", params)
-
- @force_status(owned=True, active_and_owned=True)
- def get_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("get_stream_list", params)
-
- @force_status(owned=True)
- def start_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_start_traffic_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = self.RpcCmdData("start_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ def get_stats_async (self):
+ return self.async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
+ def get_connection_ip (self):
+ return self.comm_link.server
- @force_status(owned=False, active_and_owned=True)
- def stop_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response)
+ def get_all_ports (self):
+ return [port_id for port_id, port_obj in self.ports.iteritems()]
+
+ def get_acquired_ports(self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_acquired()]
+
+ def get_active_ports(self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_active()]
+
+ def get_paused_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_paused()]
+
+ def get_transmitting_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_transmitting()]
+
+ def set_verbose(self, mode):
+
+ # on high - enable link verbose
+ if mode == self.VERBOSE_HIGH:
+ self.comm_link.set_verbose(True)
else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = self.RpcCmdData("stop_traffic", params)
- self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
- return
+ self.comm_link.set_verbose(False)
+
+ self.verbose = mode
+
+
+ def check_verbose (self, level):
+ return (self.verbose >= level)
+
+ def get_verbose (self):
+ return self.verbose
+
+ ############# server actions ################
+
+ # ping server
+ def ping(self):
+ rc, info = self.transmit("ping")
+ return RC(rc, info)
+
+
def get_global_stats(self):
- command = self.RpcCmdData("get_global_stats", {})
- return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
- # return self.transmit("get_global_stats")
+ rc, info = self.transmit("get_global_stats")
+ return RC(rc, info)
+
+
+ ########## port commands ##############
+ def sync_ports (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].sync())
+
+ return rc
+
+ # acquire ports, if port_list is none - get all
+ def acquire (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].acquire(force))
+
+ return rc
+
+ # release ports
+ def release (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].release())
+
+ return rc
+
+
+ def add_stream(self, stream_id, stream_obj, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_stream(stream_id, stream_obj))
+
+ return rc
+
+
+
+ def add_stream_pack(self, stream_pack_list, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_streams(stream_pack_list))
+
+ return rc
+
+
+
+ def remove_stream(self, stream_id, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_stream(stream_id))
+
+ return rc
+
+
+
+ def remove_all_streams(self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_all_streams())
+
+ return rc
+
+
+ def get_stream(self, stream_id, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_stream(stream_id)
+
+
+ def get_all_streams(self, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_all_streams()
+
+
+ def get_stream_id_list(self, port_id):
+
+ return self.ports[port_id].get_stream_id_list()
+
+
+ def start_traffic (self, multiplier, duration, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].start(multiplier, duration))
+
+ return rc
+
+
+ def resume_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].resume())
+
+ return rc
+
+ def pause_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].pause())
+
+ return rc
+
+ def stop_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].stop(force))
+
+ return rc
+
+
+ def update_traffic (self, mult, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].update(mult))
+
+ return rc
+
+
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
- @force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = self.RpcCmdData("get_port_stats", params)
- return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
+ pass
- @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = self.RpcCmdData("get_stream_stats", params)
- return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
+ pass
+
- # ----- internal methods ----- #
def transmit(self, method_name, params={}):
- return self.tx_link.transmit(method_name, params)
+ return self.comm_link.transmit(method_name, params)
+
def transmit_batch(self, batch_list):
- return self.tx_link.transmit_batch(batch_list)
+ return self.comm_link.transmit_batch(batch_list)
- @staticmethod
- def _object_decoder(obj_type, obj_data):
- if obj_type == "global":
- return CGlobalStats(**obj_data)
- elif obj_type == "port":
- return CPortStats(**obj_data)
- elif obj_type == "stream":
- return CStreamStats(**obj_data)
- else:
- # Do not serialize the data into class
- return obj_data
+ ######################### Console (high level) API #########################
- @staticmethod
- def default_success_test(result_obj):
- if result_obj.success:
- return True
- else:
- return False
+ @timing
+ def cmd_ping(self):
+ rc = self.ping()
+ rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
+ return rc
- # ----- handler internal methods ----- #
- def _handle_acquire_response(self, request, response):
- if response.success:
- self._conn_handler[request.get("port_id")] = response.data
+ def cmd_connect(self, mode = "RW"):
+ rc = self.connect(mode)
+ rc.annotate()
+ return rc
- def _handle_release_response(self, request, response):
- if response.success:
- del self._conn_handler[request.get("port_id")]
+ def cmd_disconnect(self):
+ rc = self.disconnect()
+ rc.annotate()
+ return rc
- def _handle_start_traffic_response(self, request, response):
- if response.success:
- self._active_ports.add(request.get("port_id"))
+ # reset
+ def cmd_reset(self):
- def _handle_stop_traffic_response(self, request, response):
- if response.success:
- self._active_ports.remove(request.get("port_id"))
- def _handle_get_global_stats_response(self, request, response):
- if response.success:
- return CGlobalStats(**response.success)
- else:
- return False
+ rc = self.acquire(force = True)
+ rc.annotate("Force acquiring all ports:")
+ if rc.bad():
+ return rc
+
+
+ # force stop all ports
+ rc = self.stop_traffic(self.get_port_ids(), True)
+ rc.annotate("Stop traffic on all ports:")
+ if rc.bad():
+ return rc
+
+
+ # remove all streams
+ rc = self.remove_all_streams(self.get_port_ids())
+ rc.annotate("Removing all streams from all ports:")
+ if rc.bad():
+ return rc
+
+ # TODO: clear stats
+ return RC_OK()
+
+
+ # stop cmd
+ def cmd_stop (self, port_id_list):
+
+ # find the relveant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on provided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
- def _handle_get_port_stats_response(self, request, response):
- if response.success:
- return CPortStats(**response.success)
+ rc = self.stop_traffic(active_ports)
+ rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+ # update cmd
+ def cmd_update (self, port_id_list, mult):
+
+ # find the relevant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on provided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.update_traffic(mult, active_ports)
+ rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list))
+
+ return rc
+
+ # clear stats
+ def cmd_clear(self, port_id_list):
+
+ for port_id in port_id_list:
+ self.ports[port_id].clear_stats()
+
+ self.global_stats.clear_stats()
+
+ return RC_OK()
+
+
+ # pause cmd
+ def cmd_pause (self, port_id_list):
+
+ # find the relevant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on provided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.pause_traffic(active_ports)
+ rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+
+ # resume cmd
+ def cmd_resume (self, port_id_list):
+
+ # find the relveant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on porvided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.resume_traffic(active_ports)
+ rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+ # start cmd
+ def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry):
+
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if active_ports:
+ if not force:
+ msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports)
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+ else:
+ rc = self.cmd_stop(active_ports)
+ if not rc:
+ return rc
+
+
+ rc = self.remove_all_streams(port_id_list)
+ rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+
+ rc = self.add_stream_pack(stream_list.compiled, port_id_list)
+ rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
+ if rc.bad():
+ return rc
+
+ # when not on dry - start the traffic , otherwise validate only
+ if not dry:
+ rc = self.start_traffic(mult, duration, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
+
+ return rc
else:
- return False
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list))
+
+ if rc.bad():
+ return rc
+
+ # show a profile on one port for illustration
+ self.ports[port_id_list[0]].print_profile(mult, duration)
+
+ return rc
+
+
+ # validate port(s) profile
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+ # stats
+ def cmd_stats(self, port_id_list, stats_mask=set()):
+ stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
+
+ stats_obj = {}
+ for stats_type in stats_opts:
+ stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))
+ return stats_obj
+
+
+ ############## High Level API With Parser ################
+
+ def cmd_connect_line (self, line):
+ '''Connects to the TRex server'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "connect",
+ self.cmd_connect_line.__doc__,
+ parsing_opts.FORCE)
- def _handle_get_stream_stats_response(self, request, response):
- if response.success:
- return CStreamStats(**response.success)
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ if opts.force:
+ rc = self.cmd_connect(mode = "RWF")
else:
- return False
+ rc = self.cmd_connect(mode = "RW")
+
+ @timing
+ def cmd_start_line (self, line):
+ '''Start selected traffic in specified ports on TRex\n'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "start",
+ self.cmd_start_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.TOTAL,
+ parsing_opts.FORCE,
+ parsing_opts.STREAM_FROM_PATH_OR_FILE,
+ parsing_opts.DURATION,
+ parsing_opts.MULTIPLIER_STRICT,
+ parsing_opts.DRY_RUN)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+
+ if opts.dry:
+ print format_text("\n*** DRY RUN ***", 'bold')
+
+ if opts.db:
+ stream_list = self.streams_db.get_stream_pack(opts.db)
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from DB):")
+ if rc.bad():
+ return RC_ERR("Failed to load stream pack")
- def _is_ports_valid(self, port_id):
- if isinstance(port_id, list) or isinstance(port_id, set):
- # check each item of the sequence
- return all([self._is_ports_valid(port)
- for port in port_id])
- elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()):
- return True
else:
- return False
+ # load streams from file
+ stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from file):")
+ if stream_list == None:
+ return RC_ERR("Failed to load stream pack")
+
+
+ # total has no meaning with percentage - its linear
+ if opts.total and (opts.mult['type'] != 'percentage'):
+ # if total was set - divide it between the ports
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
+
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry)
+
+ @timing
+ def cmd_resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "resume",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ return self.cmd_resume(opts.ports)
+
+
+ @timing
+ def cmd_stop_line (self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "stop",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ return self.cmd_stop(opts.ports)
+
+
+ @timing
+ def cmd_pause_line (self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "pause",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ return self.cmd_pause(opts.ports)
+
+
+ @timing
+ def cmd_update_line (self, line):
+ '''Update port(s) speed currently active\n'''
+ parser = parsing_opts.gen_parser(self,
+ "update",
+ self.cmd_update_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.MULTIPLIER,
+ parsing_opts.TOTAL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ # total has no meaning with percentage - its linear
+ if opts.total and (opts.mult['type'] != 'percentage'):
+ # if total was set - divide it between the ports
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
+
+ return self.cmd_update(opts.ports, opts.mult)
+
+ @timing
+ def cmd_reset_line (self, line):
+ return self.cmd_reset()
+
+
+ def cmd_clear_line (self, line):
+ '''Clear cached local statistics\n'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "clear",
+ self.cmd_clear_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+ return self.cmd_clear(opts.ports)
+
+
+ def cmd_stats_line (self, line):
+ '''Fetch statistics from TRex server by port\n'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "stats",
+ self.cmd_stats_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.STATS_MASK)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ # determine stats mask
+ mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS))
+ if not mask:
+ # set to show all stats if no filter was given
+ mask = trex_stats.ALL_STATS_OPTS
+
+ stats = self.cmd_stats(opts.ports, mask)
+
+ # print stats to screen
+ for stat_type, stat_data in stats.iteritems():
+ text_tables.print_table_with_header(stat_data.text_table, stat_type)
+
+
+ return RC_OK()
+
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ rc = self.cmd_validate(opts.ports)
+ return rc
+
+
+ def cmd_exit_line (self, line):
+ print format_text("Exiting\n", 'bold')
+ # a way to exit
+ return RC_ERR("exit")
+
- def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
- for i, response in enumerate(resp_list):
- # testing each result with success test so that a conclusion report could be deployed in future.
- if success_test(response):
- # run handler method with its params
- handler_func(req_list[i], response)
+ def cmd_wait_line (self, line):
+ '''wait for a period of time\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "wait",
+ self.cmd_wait_line.__doc__,
+ parsing_opts.DURATION)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ delay_sec = opts.duration if (opts.duration > 0) else 1
+
+ print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')
+ time.sleep(delay_sec)
+
+ return RC_OK()
+
+ # run a script of commands
+ def run_script_file (self, filename):
+
+ print format_text("\nRunning script file '{0}'...".format(filename), 'bold')
+
+ rc = self.cmd_connect()
+ if rc.bad():
+ return
+
+ with open(filename) as f:
+ script_lines = f.readlines()
+
+ cmd_table = {}
+
+ # register all the commands
+ cmd_table['start'] = self.cmd_start_line
+ cmd_table['stop'] = self.cmd_stop_line
+ cmd_table['reset'] = self.cmd_reset_line
+ cmd_table['wait'] = self.cmd_wait_line
+ cmd_table['exit'] = self.cmd_exit_line
+
+ for index, line in enumerate(script_lines, start = 1):
+ line = line.strip()
+ if line == "":
+ continue
+ if line.startswith("#"):
+ continue
+
+ sp = line.split(' ', 1)
+ cmd = sp[0]
+ if len(sp) == 2:
+ args = sp[1]
else:
- continue # TODO: mark in this case somehow the bad result
+ args = ""
+
+ print format_text("Executing line {0} : '{1}'\n".format(index, line))
+ if not cmd in cmd_table:
+ print "\n*** Error at line {0} : '{1}'\n".format(index, line)
+ print format_text("unknown command '{0}'\n".format(cmd), 'bold')
+ return False
+ rc = cmd_table[cmd](args)
+ if rc.bad():
+ return False
+
+ print format_text("\n[Done]", 'bold')
+
+ return True
+
+
+ #################################
+ # ------ private methods ------ #
+ @staticmethod
+ def _get_mask_keys(ok_values={True}, **kwargs):
+ masked_keys = set()
+ for key, val in kwargs.iteritems():
+ if val in ok_values:
+ masked_keys.add(key)
+ return masked_keys
+
+ @staticmethod
+ def _filter_namespace_args(namespace, ok_values):
+ return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
+
+
+ #################################
# ------ private classes ------ #
- class CTxLink(object):
+ class CCommLink(object):
"""describes the connectivity of the stateless client method"""
def __init__(self, server="localhost", port=5050, virtual=False):
- super(CTRexStatelessClient.CTxLink, self).__init__()
+ super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
+ self.verbose = False
self.rpc_link = JsonRpcClient(self.server, self.port)
+
+ @property
+ def is_connected(self):
+ if not self.virtual:
+ return self.rpc_link.connected
+ else:
+ return True
+
+ def get_server (self):
+ return self.server
+
+ def set_verbose(self, mode):
+ self.verbose = mode
+ return self.rpc_link.set_verbose(mode)
+
+ def connect(self):
+ if not self.virtual:
+ return self.rpc_link.connect()
+
+ def disconnect(self):
if not self.virtual:
- self.rpc_link.connect()
+ return self.rpc_link.disconnect()
def transmit(self, method_name, params={}):
if self.virtual:
@@ -352,144 +1203,5 @@ class CTRexStatelessClient(object):
port=self.port)
-class CStream(object):
- """docstring for CStream"""
- DEFAULTS = {"rx_stats": CRxStats,
- "mode": CTxMode,
- "isg": 5.0,
- "next_stream": -1,
- "self_start": True,
- "enabled": True}
-
- def __init__(self, **kwargs):
- super(CStream, self).__init__()
- for k, v in kwargs.items():
- setattr(self, k, v)
- # set default values to unset attributes, according to DEFAULTS dict
- set_keys = set(kwargs.keys())
- keys_to_set = [x
- for x in self.DEFAULTS
- if x not in set_keys]
- for key in keys_to_set:
- default = self.DEFAULTS.get(key)
- if type(default) == type:
- setattr(self, key, default())
- else:
- setattr(self, key, default)
-
- @property
- def packet(self):
- return self._packet
-
- @packet.setter
- def packet(self, packet_obj):
- assert isinstance(packet_obj, CTRexPktBuilder)
- self._packet = packet_obj
-
- @property
- def enabled(self):
- return self._enabled
-
- @enabled.setter
- def enabled(self, bool_value):
- self._enabled = bool(bool_value)
-
- @property
- def self_start(self):
- return self._self_start
-
- @self_start.setter
- def self_start(self, bool_value):
- self._self_start = bool(bool_value)
-
- @property
- def next_stream(self):
- return self._next_stream
-
- @next_stream.setter
- def next_stream(self, value):
- self._next_stream = int(value)
-
- def dump(self):
- pass
- return {"enabled": self.enabled,
- "self_start": self.self_start,
- "isg": self.isg,
- "next_stream": self.next_stream,
- "packet": self.packet.dump_pkt(),
- "mode": self.mode.dump(),
- "vm": self.packet.get_vm_data(),
- "rx_stats": self.rx_stats.dump()}
-
-class CRxStats(object):
-
- def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False):
- self._rx_dict = {"enabled": enabled,
- "seq_enabled": seq_enabled,
- "latency_enabled": latency_enabled}
-
- @property
- def enabled(self):
- return self._rx_dict.get("enabled")
-
- @enabled.setter
- def enabled(self, bool_value):
- self._rx_dict['enabled'] = bool(bool_value)
-
- @property
- def seq_enabled(self):
- return self._rx_dict.get("seq_enabled")
-
- @seq_enabled.setter
- def seq_enabled(self, bool_value):
- self._rx_dict['seq_enabled'] = bool(bool_value)
-
- @property
- def latency_enabled(self):
- return self._rx_dict.get("latency_enabled")
-
- @latency_enabled.setter
- def latency_enabled(self, bool_value):
- self._rx_dict['latency_enabled'] = bool(bool_value)
-
- def dump(self):
- return {k: v
- for k, v in self._rx_dict.items()
- if v
- }
-
-
-class CTxMode(object):
- """docstring for CTxMode"""
- def __init__(self, tx_mode, pps):
- super(CTxMode, self).__init__()
- if tx_mode not in ["continuous", "single_burst", "multi_burst"]:
- raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode))
- self._tx_mode = tx_mode
- self._fields = {'pps': float(pps)}
- if tx_mode == "single_burst":
- self._fields['total_pkts'] = 0
- elif tx_mode == "multi_burst":
- self._fields['pkts_per_burst'] = 0
- self._fields['ibg'] = 0.0
- self._fields['count'] = 0
- else:
- pass
-
- def set_tx_mode_attr(self, attr, val):
- if attr in self._fields:
- self._fields[attr] = type(self._fields.get(attr))(val)
- else:
- raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')".
- format(attr, self._tx_mode))
-
- def dump(self):
- dump = {"type": self._tx_mode}
- dump.update({k: v
- for k, v in self._fields.items()
- })
- return dump
-
-
if __name__ == "__main__":
pass