summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py1235
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py273
-rwxr-xr-xscripts/automation/trex_control_plane/console/parsing_opts.py9
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py237
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp12
5 files changed, 817 insertions, 949 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 168853b3..0df2ac5d 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -6,12 +6,16 @@ try:
except ImportError:
# support import for Python 3
import client.outer_packages
+
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
from client_utils.packet_builder import CTRexPktBuilder
import json
from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from common.text_opts import *
+import parsing_opts
+import time
from trex_async_client import CTRexAsyncClient
@@ -24,148 +28,417 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
msg=self.msg,
stat="success" if self.success else "fail")
-# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
+# simple class to represent complex return value
+class RC:
+
+ def __init__ (self, rc = None, data = None):
+ self.rc_list = []
+
+ if (rc != None) and (data != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data'])
+ self.rc_list.append(tuple_rc(rc, data))
+
+ def add (self, rc):
+ self.rc_list += rc.rc_list
+
+ def good (self):
+ return all([x.rc for x in self.rc_list])
+
+ def bad (self):
+ return not self.good()
+
+ def data (self):
+ return all([x.data if x.rc else "" for x in self.rc_list])
+
+ def err (self):
+ return all([x.data if not x.rc else "" for x in self.rc_list])
+
+ def annotate (self, desc = None):
+ if desc:
+ print format_text('\n{:<40}'.format(desc), 'bold'),
+
+ if self.bad():
+ # print all the errors
+ print ""
+ for x in self.rc_list:
+ if not x.rc:
+ print format_text("\n{0}".format(x.data), 'bold')
+
+ print ""
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+
+ else:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+
+
+def RC_OK():
+ return RC(True, "")
+def RC_ERR (err):
+ return RC(False, err)
+
+
+LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
+
+# describes a stream DB
+class CStreamsDB(object):
+
+ def __init__(self):
+ self.stream_packs = {}
+
+ def load_yaml_file (self, filename):
+
+ stream_pack_name = filename
+ if stream_pack_name in self.get_loaded_streams_names():
+ self.remove_stream_packs(stream_pack_name)
+
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(filename)
+
+ try:
+ compiled_streams = stream_list.compile_streams()
+ rc = self.load_streams(stream_pack_name,
+ LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id, v.stream.dump())
+ for k, v in compiled_streams.items()]))
+
+ except Exception as e:
+ return None
+
+ return self.get_stream_pack(stream_pack_name)
+
+ def load_streams(self, name, LoadedStreamList_obj):
+ if name in self.stream_packs:
+ return False
+ else:
+ self.stream_packs[name] = LoadedStreamList_obj
+ return True
+
+ def remove_stream_packs(self, *names):
+ removed_streams = []
+ for name in names:
+ removed = self.stream_packs.pop(name)
+ if removed:
+ removed_streams.append(name)
+ return removed_streams
+
+ def clear(self):
+ self.stream_packs.clear()
+
+ def get_loaded_streams_names(self):
+ return self.stream_packs.keys()
+
+ def stream_pack_exists (self, name):
+ return name in self.get_loaded_streams_names()
+
+ def get_stream_pack(self, name):
+ if not self.stream_pack_exists(name):
+ return None
+ else:
+ return self.stream_packs.get(name)
+
+
+# describes a single port
+class Port:
+
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+
+ def __init__ (self, port_id, user, transmit):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.transmit = transmit
+ self.user = user
+
+ self.streams = {}
+
+ def err (self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ def ok (self):
+ return RC_OK()
+
+ # take the port
+ def acquire (self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+
+ # release the port
+ def release (self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ def is_acquired (self):
+ return (self.handler != None)
+
+ def is_active (self):
+ return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def sync (self, sync_data):
+
+ self.handler = sync_data['handler']
+
+ if sync_data['state'] == "DOWN":
+ self.state = self.STATE_DOWN
+ elif sync_data['state'] == "IDLE":
+ self.state = self.STATE_IDLE
+ elif sync_data['state'] == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif sync_data['state'] == "TX":
+ self.state = self.STATE_TX
+ elif sync_data['state'] == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return self.ok()
+
+
+ # return TRUE if write commands
+ def is_port_writeable (self):
+ # operations on port can be done on state idle or state sreams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writeable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ r = self.err(data)
+ print r.good()
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ return self.ok()
+
+ # remove all the streams
+ def remove_all_streams (self):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("remove_all_streams", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams = {}
+
+ return self.ok()
+
+ # start traffic
+ def start (self, mul):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc, data = self.transmit("start_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
+
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
- self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
self._server_version = None
self.__err_log = None
self._async_client = CTRexAsyncClient(async_port)
+ self.streams_db = CStreamsDB()
- # ----- decorator methods ----- #
- def acquired(func):
- def wrapper_f(self, *args, **kwargs):
- # print func.__name__
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- # if not port_ids:
- # # print "FROM ARGS!"
- # # print args
- # port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if not port_owned:
- bad_ids.add(port_id)
- # elif active_and_owned: # stronger condition than just owned, hence gets precedence
- # if port_owned and port_id in self._active_ports:
- # continue
- # else:
- # bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} aren't "
- "acquired".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
-
- def force_status(owned=True, active_and_owned=False):
- def wrapper(func):
- def wrapper_f(self, *args, **kwargs):
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- if not port_ids:
- #print "FROM ARGS!"
- #print args
- port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if owned and not port_owned:
- bad_ids.add(port_id)
- elif active_and_owned: # stronger condition than just owned, hence gets precedence
- if port_owned and port_id in self._active_ports:
- continue
- else:
- bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
- "at allowed states".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
- return wrapper
-
- @property
- def system_info(self):
- if not self._system_info:
- rc, info = self.get_system_info()
- if rc:
- self._system_info = info
- else:
- self.__err_log = info
- return self._system_info if self._system_info else "Unknown"
-
- @property
- def server_version(self):
- if not self._server_version:
- rc, ver_info = self.get_version()
- if rc:
- self._server_version = ver_info
- else:
- self.__err_log = ver_info
- return self._server_version if self._server_version else "Unknown"
+ self.connected = False
+
+ ############# helper functions section ##############
+
+ def validate_port_list(self, port_id_list):
+ if not isinstance(port_id_list, list):
+ print type(port_id_list)
+ return False
+
+ # check each item of the sequence
+ return all([ (port_id >= 0) and (port_id < self.get_port_count())
+ for port_id in port_id_list ])
+
+ # some preprocessing for port argument
+ def __ports (self, port_id_list):
- def is_connected(self):
- return self.comm_link.is_connected
+ # none means all
+ if port_id_list == None:
+ return range(0, self.get_port_count())
- # ----- user-access methods ----- #
- def connect(self):
- rc, err = self.comm_link.connect()
+ # always list
+ if isinstance(port_id_list, int):
+ port_id_list = [port_id_list]
+
+ if not isinstance(port_id_list, list):
+ raise ValueError("bad port id list: {0}".format(port_id_list))
+
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
+
+ return port_id_list
+
+ ############ boot up section ################
+
+ # connection sequence
+ def connect (self):
+
+ self.connected = False
+
+ # connect
+ rc, data = self.comm_link.connect()
if not rc:
- return rc, err
- return self._init_sync()
+ return RC_ERR(data)
- def get_stats_async (self):
- return self._async_client.get_stats()
+ # cache system info
+ rc, data = self.transmit("get_system_info")
+ if not rc:
+ return RC_ERR(data)
+
+ self.system_info = data
+
+ # cache supported cmds
+ rc, data = self.transmit("get_supported_cmds")
+ if not rc:
+ return RC_ERR(data)
+
+ self.supported_cmds = data
+
+ # create ports
+ self.ports = []
+ for port_id in xrange(0, self.get_port_count()):
+ self.ports.append(Port(port_id, self.user, self.transmit))
+
+ # acquire all ports
+ rc = self.acquire()
+ if rc.bad():
+ return rc
+
+ rc = self.sync_with_server()
+ if rc.bad():
+ return rc
+
+ self.connected = True
+
+ return RC_OK()
+
+ def is_connected (self):
+ return self.connected
- def get_connection_port (self):
- return self.comm_link.port
def disconnect(self):
- return self.comm_link.disconnect()
+ self.connected = False
+ self.comm_link.disconnect()
+ return RC_OK()
- def ping(self):
- return self.transmit("ping")
+
+
+ ########### cached queries (no server traffic) ###########
def get_supported_cmds(self):
- return self.transmit("get_supported_cmds")
+ return self.supported_cmds
def get_version(self):
- return self.transmit("get_version")
+ return self.server_version
def get_system_info(self):
- return self.transmit("get_system_info")
+ return self.system_info
def get_port_count(self):
return self.system_info.get("port_count")
@@ -177,483 +450,406 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def sync_user(self, sync_streams=False):
- return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
+ def get_connection_ip (self):
+ return self.comm_link.server
def get_acquired_ports(self):
- return self._conn_handler.keys()
+ return [port.port_id for port in self.ports if port.is_acquired()]
+
def get_active_ports(self):
- return list(self._active_ports)
+ return [port.port_id for port in self.ports if port.is_active()]
def set_verbose(self, mode):
self.comm_link.set_verbose(mode)
self.verbose = mode
- def acquire(self, port_id, force=False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
- else:
- params = {"port_id": port_id,
- "user": self.user,
- "force": force}
- command = RpcCmdData("acquire", params)
- return self._handle_acquire_response(command,
- self.transmit(command.method, command.params),
- self.default_success_test)
-
- @force_status(owned=True)
- def release(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_release_response,
- success_test=self.ack_success_test)
- else:
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("release", params)
- return self._handle_release_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- @acquired
- def add_stream(self, stream_id, stream_obj, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- assert isinstance(stream_obj, CStream)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "stream": stream_obj.dump()}
- return self.transmit("add_stream", params)
+ ############# server actions ################
+
+ # ping server
+ def ping(self):
+ rc, info = self.transmit("ping")
+ return RC(rc, info)
+
+
+ def sync_with_server(self, sync_streams=False):
+ rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ if not rc:
+ return RC_ERR(data)
+
+ for port_info in data:
+
+ rc = self.ports[port_info['port_id']].sync(port_info)
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+
+
+ ########## port commands ##############
+ # acquire ports, if port_list is none - get all
+ def acquire (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].acquire(force))
+
+ return rc
+
+ # release ports
+ def release (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].release(force))
+
+ return rc
+
+
+ def add_stream(self, stream_id, stream_obj, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
- @acquired
- def add_stream_pack(self, stream_pack_list, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_stream(stream_id, stream_obj))
+
+ return rc
+
+
+ def add_stream_pack(self, stream_pack_list, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
- # since almost every run contains more than one transaction with server, handle all as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = []
for stream_pack in stream_pack_list:
- commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
- "handler": self._conn_handler.get(p_id),
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- )
- for p_id in port_ids]
- )
- res_ok, resp_list = self.transmit_batch(commands)
- if not res_ok:
- return res_ok, resp_list
-
- return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
- success_test=self.ack_success_test)
-
- @force_status(owned=True)
- def remove_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("remove_stream", params)
-
- def remove_all_streams(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
- success_test=self.ack_success_test)
- else:
- params = {"port_id": port_id,
- "handler": self._conn_handler.get(port_id)}
- command = RpcCmdData("remove_all_streams", params)
- return self._handle_remove_streams_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
- pass
+ rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
+
+ return rc
- @force_status(owned=True)#, active_and_owned=True)
- def get_all_streams(self, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "get_pkt": get_pkt}
- return self.transmit("get_all_streams", params)
-
- @force_status(owned=True)#, active_and_owned=True)
- def get_stream_id_list(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("get_stream_list", params)
-
- @force_status(owned=True, active_and_owned=True)
+
+ def remove_stream(self, stream_id, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_stream(stream_id))
+
+ return rc
+
+
+
+ def remove_all_streams(self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_all_streams())
+
+ return rc
+
+
def get_stream(self, stream_id, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "get_pkt": get_pkt}
- return self.transmit("get_stream_list", params)
-
- def start_traffic(self, multiplier, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id),
- "port_id": p_id,
- "mul": multiplier})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "mul": multiplier}
- command = RpcCmdData("start_traffic", params)
- return self._handle_start_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- def stop_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- if not port_ids:
- # don't invoke if port ids is empty
- return True, []
- commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("stop_traffic", params)
- return self._handle_stop_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
-# def get_global_stats(self):
-# command = RpcCmdData("get_global_stats", {})
-# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
-# # return self.transmit("get_global_stats")
-
- @force_status(owned=True, active_and_owned=True)
+
+ return self.ports[port_id].get_stream(stream_id)
+
+
+ def get_all_streams(self, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_all_streams()
+
+
+ def get_stream_id_list(self, port_id):
+
+ return self.ports[port_id].get_stream_id_list()
+
+
+ def start_traffic (self, multiplier, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].start(multiplier))
+
+ return rc
+
+
+
+ def stop_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].stop(force))
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_port_stats", params)
- return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
+ pass
- @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_stream_stats", params)
- return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
-
- # ----- internal methods ----- #
- def _init_sync(self):
- # get server version and system info
- err = False
- if self.server_version == "Unknown" or self.system_info == "Unknown":
- self.disconnect()
- return False, self.__err_log
- # sync with previous session
- res_ok, port_info = self.sync_user()
- if not res_ok:
- self.disconnect()
- return False, port_info
- else:
- # handle sync data
- for port in port_info:
- self._conn_handler[port.get("port_id")] = port.get("handler")
- if port.get("state") == "transmitting":
- # port is active
- self._active_ports.add(port.get("port_id"))
- return True, ""
+ pass
def transmit(self, method_name, params={}):
return self.comm_link.transmit(method_name, params)
- def transmit_batch(self, batch_list):
- return self.comm_link.transmit_batch(batch_list)
-
- @staticmethod
- def _object_decoder(obj_type, obj_data):
- if obj_type == "global":
- return CGlobalStats(**obj_data)
- elif obj_type == "port":
- return CPortStats(**obj_data)
- elif obj_type == "stream":
- return CStreamStats(**obj_data)
- else:
- # Do not serialize the data into class
- return obj_data
- @staticmethod
- def default_success_test(result_obj):
- if result_obj.success:
- return True
- else:
- return False
- @staticmethod
- def ack_success_test(result_obj):
- if result_obj.success and result_obj.data == "ACK":
- return True
- else:
- return False
+ ######################### Console (high level) API #########################
+ def cmd_ping (self):
+ rc = self.ping()
+ rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
+ return rc
- ######################### Console (high level) API #########################
+ def cmd_connect (self):
+ rc = self.connect()
+ rc.annotate()
+ return rc
+
+ def cmd_disconnect (self):
+ rc = self.disconnect()
+ rc.annotate()
+ return rc
# reset
- # acquire, stop, remove streams and clear stats
- #
- #
- def cmd_reset (self, annotate_func):
+ def cmd_reset (self):
- ports = self.get_port_ids()
# sync with the server
- rc, log = self._init_sync()
- annotate_func("Syncing with the server:", rc, log)
- if not rc:
- return False
+ rc = self.sync_with_server()
+ rc.annotate("Syncing with the server:")
+ if rc.bad():
+ return rc
+ rc = self.acquire(force = True)
+ rc.annotate("Force acquiring all ports:")
+ if rc.bad():
+ return rc
- # force acquire all ports
- rc, log = self.acquire(ports, force = True)
- annotate_func("Force acquiring all ports:", rc, log)
- if not rc:
- return False
- # force stop
- rc, log = self.stop_traffic(ports)
- annotate_func("Stop traffic on all ports:", rc, log)
- if not rc:
- return False
+ # force stop all ports
+ rc = self.stop_traffic(self.get_port_ids(), True)
+ rc.annotate("Stop traffic on all ports:")
+ if rc.bad():
+ return rc
+
# remove all streams
- rc, log = self.remove_all_streams(ports)
- annotate_func("Removing all streams from all ports:", rc, log)
- if not rc:
- return False
+ rc = self.remove_all_streams(self.get_port_ids())
+ rc.annotate("Removing all streams from all ports:")
+ if rc.bad():
+ return rc
# TODO: clear stats
- return True
+ return RC_OK()
# stop cmd
- def cmd_stop (self, ports, annotate_func):
+ def cmd_stop (self, port_id_list):
# find the relveant ports
- active_ports = set(self.get_active_ports()).intersection(ports)
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
if not active_ports:
- annotate_func("No active traffic on porivded ports")
- return True
+ msg = "No active traffic on porvided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
- rc, log = self.stop_traffic(active_ports)
- annotate_func("Stopping traffic on ports {0}:".format([port for port in active_ports]), rc, log)
- if not rc:
- return False
+ rc = self.stop_traffic(active_ports)
+ rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
- return True
+ return RC_OK()
# start cmd
- def cmd_start (self, ports, stream_list, mult, force, annotate_func):
+ def cmd_start (self, port_id_list, stream_list, mult, force):
- if force:
- rc = self.cmd_stop(ports, annotate_func)
- if not rc:
- return False
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
- rc, log = self.remove_all_streams(ports)
- annotate_func("Removing all streams from ports {0}:".format([port for port in ports]), rc, log,
- "Please either retry with --force or stop traffic")
- if not rc:
- return False
+ if active_ports:
+ if not force:
+ msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports)
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+ else:
+ rc = self.cmd_stop(active_ports)
+ if not rc:
+ return rc
- rc, log = self.add_stream_pack(stream_list.compiled, port_id= ports)
- annotate_func("Attaching streams to port {0}:".format([port for port in ports]), rc, log)
- if not rc:
- return False
- # finally, start the traffic
- rc, log = self.start_traffic(mult, ports)
- annotate_func("Starting traffic on ports {0}:".format([port for port in ports]), rc, log)
- if not rc:
- return False
+ rc = self.remove_all_streams(port_id_list)
+ rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
- return True
- # ----- handler internal methods ----- #
- def _handle_general_response(self, request, response, msg, success_test=None):
- port_id = request.params.get("port_id")
- if not success_test:
- success_test = self.default_success_test
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, msg)
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ rc = self.add_stream_pack(stream_list.compiled, port_id_list)
+ rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
- def _handle_acquire_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, "Acquired")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ # finally, start the traffic
+ rc = self.start_traffic(mult, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+ ############## High Level API With Parser ################
+ def cmd_start_line (self, line):
+ '''Start selected traffic in specified ports on TRex\n'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "start",
+ self.cmd_start_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.FORCE,
+ parsing_opts.STREAM_FROM_PATH_OR_FILE,
+ parsing_opts.DURATION,
+ parsing_opts.MULTIPLIER)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ if opts.db:
+ stream_list = self.stream_db.get_stream_pack(opts.db)
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from DB):")
+ if rc.bad():
+ return RC_ERR("Failed to load stream pack")
- def _handle_add_stream_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- stream_id = request.params.get("stream_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id))
else:
- return RpcResponseStatus(False, port_id, response.data)
+ # load streams from file
+ stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from file):")
+ if stream_list == None:
+ return RC_ERR("Failed to load stream pack")
- def _handle_remove_streams_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Removed")
- else:
- return RpcResponseStatus(False, port_id, response.data)
- def _handle_release_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- del self._conn_handler[port_id]
- return RpcResponseStatus(True, port_id, "Released")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force)
- def _handle_start_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._active_ports.add(port_id)
- return RpcResponseStatus(True, port_id, "Traffic started")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_stop_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- if port_id in self._active_ports:
- self._active_ports.remove(port_id)
- return RpcResponseStatus(True, port_id, "Traffic stopped")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ def cmd_stop_line (self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "stop",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
- def _handle_get_global_stats_response(self, request, response, success_test):
- if response.success:
- return CGlobalStats(**response.success)
- else:
- return False
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
- def _handle_get_port_stats_response(self, request, response, success_test):
- if response.success:
- return CPortStats(**response.success)
- else:
- return False
+ return self.cmd_stop(opts.ports)
- def _handle_get_stream_stats_response(self, request, response, success_test):
- if response.success:
- return CStreamStats(**response.success)
- else:
- return False
- def _is_ports_valid(self, port_id):
- if isinstance(port_id, list) or isinstance(port_id, set):
- # check each item of the sequence
- return all([self._is_ports_valid(port)
- for port in port_id])
- elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
- return True
- else:
- return False
+ def cmd_reset_line (self, line):
+ return self.cmd_reset()
+
+
+ def cmd_exit_line (self, line):
+ print format_text("Exiting\n", 'bold')
+ # a way to exit
+ return RC_ERR("exit")
+
+
+ def cmd_wait_line (self, line):
+ '''wait for a period of time\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "wait",
+ self.cmd_wait_line.__doc__,
+ parsing_opts.DURATION)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ delay_sec = opts.d if opts.d else 1
- def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
- res_ok = True
- responses = []
- if isinstance(success_test, staticmethod):
- success_test = success_test.__func__
- for i, response in enumerate(resp_list):
- # run handler method with its params
- processed_response = handler_func(req_list[i], response, success_test)
- responses.append(processed_response)
- if not processed_response.success:
- res_ok = False
- # else:
- # res_ok = False # TODO: mark in this case somehow the bad result
- # print res_ok
- # print responses
- return res_ok, responses
+ print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')
+ time.sleep(delay_sec)
+ return RC_OK()
+ # run a script of commands
+ def run_script_file (self, filename):
+
+ print format_text("\nRunning script file '{0}'...".format(filename), 'bold')
+
+ rc = self.cmd_connect()
+ if rc.bad():
+ return
+
+ with open(filename) as f:
+ script_lines = f.readlines()
+
+ cmd_table = {}
+
+ # register all the commands
+ cmd_table['start'] = self.cmd_start_line
+ cmd_table['stop'] = self.cmd_stop_line
+ cmd_table['reset'] = self.cmd_reset_line
+ cmd_table['wait'] = self.cmd_wait_line
+ cmd_table['exit'] = self.cmd_exit_line
+
+ for index, line in enumerate(script_lines):
+ line = line.strip()
+ if line == "":
+ continue
+ if line.startswith("#"):
+ continue
+
+ sp = line.split(' ', 1)
+ cmd = sp[0]
+ if len(sp) == 2:
+ args = sp[1]
+ else:
+ args = ""
+
+ print format_text("Executing line {0} : '{1}'\n".format(index, line))
+
+ if not cmd in cmd_table:
+ print "\n*** Error at line {0} : '{1}'\n".format(index, line)
+ print format_text("unknown command '{0}'\n".format(cmd), 'bold')
+ return False
+
+ rc = cmd_table[cmd](args)
+ if rc.bad():
+ return False
+
+ print format_text("\n[Done]", 'bold')
+
+ return True
+
+ #################################
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
@@ -713,3 +909,4 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
+
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index 58491aba..077c82ad 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -110,7 +110,7 @@ class JsonRpcClient(object):
return id, msg
- def invoke_rpc_method (self, method_name, params = {}, block = False):
+ def invoke_rpc_method (self, method_name, params = {}, block = True):
if not self.connected:
return False, "Not connected to server"
@@ -120,7 +120,7 @@ class JsonRpcClient(object):
# low level send of string message
- def send_raw_msg (self, msg, block = False):
+ def send_raw_msg (self, msg, block = True):
self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
if block:
@@ -248,272 +248,3 @@ class JsonRpcClient(object):
if hasattr(self, "context"):
self.context.destroy(linger=0)
-# MOVE THIS TO DAN'S FILE
-class TrexStatelessClient(JsonRpcClient):
-
- def __init__ (self, server, port, user):
-
- super(TrexStatelessClient, self).__init__(server, port)
-
- self.user = user
- self.port_handlers = {}
-
- self.supported_cmds = []
- self.system_info = None
- self.server_version = None
-
-
- def whoami (self):
- return self.user
-
- def ping_rpc_server(self):
-
- return self.invoke_rpc_method("ping", block = False)
-
- def get_rpc_server_version (self):
- return self.server_version
-
- def get_system_info (self):
- if not self.system_info:
- return {}
-
- return self.system_info
-
- def get_supported_cmds(self):
- if not self.supported_cmds:
- return {}
-
- return self.supported_cmds
-
- def get_port_count (self):
- if not self.system_info:
- return 0
-
- return self.system_info["port_count"]
-
- # sync the client with all the server required data
- def sync (self):
-
- # get server version
- rc, msg = self.invoke_rpc_method("get_version")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.server_version = msg
-
- # get supported commands
- rc, msg = self.invoke_rpc_method("get_supported_cmds")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.supported_cmds = [str(x) for x in msg if x]
-
- # get system info
- rc, msg = self.invoke_rpc_method("get_system_info")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.system_info = msg
-
- return True, ""
-
- def connect (self):
- rc, err = super(TrexStatelessClient, self).connect()
- if not rc:
- return rc, err
-
- return self.sync()
-
-
- # take ownership over ports
- def take_ownership (self, port_id_array, force = False):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- for port_id in port_id_array:
- batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
-
- def release_ports (self, port_id_array):
- batch = self.create_batch()
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("release", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers.pop(port_id_array[i])
-
- return True, resp_list
-
- def get_owned_ports (self):
- return self.port_handlers.keys()
-
- # fetch port stats
- def get_port_stats (self, port_id_array):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- # empty list means all
- if port_id_array == []:
- port_id_array = list([x for x in xrange(0, self.system_info["port_count"])])
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
-
- return rc, resp_list
-
- # snapshot will take a snapshot of all your owned ports for streams and etc.
- def snapshot(self):
-
-
- if len(self.get_owned_ports()) == 0:
- return {}
-
- snap = {}
-
- batch = self.create_batch()
-
- for port_id in self.get_owned_ports():
-
- batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
- batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- # split the list to 2s
- index = 0
- for port_id in self.get_owned_ports():
- if not resp_list[index] or not resp_list[index + 1]:
- snap[port_id] = None
- continue
-
- # fetch the first two
- stats = resp_list[index][1]
- stream_list = resp_list[index + 1][1]
-
- port = {}
- port['status'] = stats['status']
- port['stream_list'] = []
-
- # get all the streams
- if len(stream_list) > 0:
- batch = self.create_batch()
- for stream_id in stream_list:
- batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]})
-
- rc, stream_resp_list = batch.invoke()
- if not rc:
- port = {}
-
- port['streams'] = {}
- for i, resp in enumerate(stream_resp_list):
- if resp[0]:
- port['streams'][stream_list[i]] = resp[1]
-
- snap[port_id] = port
-
- # move to next one
- index += 2
-
-
- return snap
-
- # add stream
- # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]):
- # if not port_id in self.get_owned_ports():
- # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id)
- #
- # handler = self.port_handlers[port_id]
- #
- # stream = {}
- # stream['enabled'] = True
- # stream['self_start'] = True
- # stream['isg'] = isg
- # stream['next_stream_id'] = next_stream_id
- # stream['packet'] = {}
- # stream['packet']['binary'] = packet
- # stream['packet']['meta'] = ""
- # stream['vm'] = vm
- # stream['rx_stats'] = {}
- # stream['rx_stats']['enabled'] = False
- #
- # stream['mode'] = {}
- # stream['mode']['type'] = 'continuous'
- # stream['mode']['pps'] = 10.0
- #
- # params = {}
- # params['handler'] = handler
- # params['stream'] = stream
- # params['port_id'] = port_id
- # params['stream_id'] = stream_id
- #
- # print params
- # return self.invoke_rpc_method('add_stream', params = params)
-
- def add_stream(self, port_id_array, stream_pack_list):
- batch = self.create_batch()
-
- for port_id in port_id_array:
- for stream_pack in stream_pack_list:
- params = {"port_id": port_id,
- "handler": self.port_handlers[port_id],
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- batch.add("add_stream", params=params)
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- print "Stream {0} - {1}".format(i, rc[1])
- # self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
- # return self.invoke_rpc_method('add_stream', params = params)
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py
index f983d837..c154ce24 100755
--- a/scripts/automation/trex_control_plane/console/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/console/parsing_opts.py
@@ -135,12 +135,13 @@ class CCmdArgParser(argparse.ArgumentParser):
if opts is None:
return None
- if opts.all_ports:
+ if getattr(opts, "all_ports", None):
opts.ports = self.stateless_client.get_port_ids()
- for port in opts.ports:
- if not self.stateless_client._is_ports_valid(port):
- self.error("port id {0} is not a valid\n".format(port))
+ if getattr(opts, "ports", None):
+ for port in opts.ports:
+ if not self.stateless_client.validate_port_list([port]):
+ self.error("port id '{0}' is not a valid port id\n".format(port))
return opts
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 5470e694..88e8dede 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -30,72 +30,12 @@ import tty, termios
import trex_root_path
from common.trex_streams import *
from client.trex_stateless_client import CTRexStatelessClient
-from client.trex_stateless_client import RpcResponseStatus
from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
-import parsing_opts
import trex_status
-from collections import namedtuple
-__version__ = "1.0"
-
-LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
-
-class CStreamsDB(object):
-
- def __init__(self):
- self.stream_packs = {}
-
- def load_yaml_file (self, filename):
-
- stream_pack_name = filename
- if stream_pack_name in self.get_loaded_streams_names():
- self.remove_stream_packs(stream_pack_name)
-
- stream_list = CStreamList()
- loaded_obj = stream_list.load_yaml(filename)
- try:
- compiled_streams = stream_list.compile_streams()
- rc = self.load_streams(stream_pack_name,
- LoadedStreamList(loaded_obj,
- [StreamPack(v.stream_id, v.stream.dump())
- for k, v in compiled_streams.items()]))
-
- except Exception as e:
- return None
-
- return self.get_stream_pack(stream_pack_name)
-
- def load_streams(self, name, LoadedStreamList_obj):
- if name in self.stream_packs:
- return False
- else:
- self.stream_packs[name] = LoadedStreamList_obj
- return True
-
- def remove_stream_packs(self, *names):
- removed_streams = []
- for name in names:
- removed = self.stream_packs.pop(name)
- if removed:
- removed_streams.append(name)
- return removed_streams
-
- def clear(self):
- self.stream_packs.clear()
-
- def get_loaded_streams_names(self):
- return self.stream_packs.keys()
-
- def stream_pack_exists (self, name):
- return name in self.get_loaded_streams_names()
-
- def get_stream_pack(self, name):
- if not self.stream_pack_exists(name):
- return None
- else:
- return self.stream_packs.get(name)
+__version__ = "1.0"
#
@@ -111,15 +51,11 @@ class TRexConsole(cmd.Cmd):
self.verbose = verbose
self.acquire_all_ports = acquire_all_ports
- self.do_connect("")
-
self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__)
self.intro += "\nType 'help' or '?' for supported actions\n"
self.postcmd(False, "")
- self.streams_db = CStreamsDB()
-
################### internal section ########################
@@ -155,51 +91,23 @@ class TRexConsole(cmd.Cmd):
path = dir
else:
path = "."
- start_string = os.path.basename(text)
- return [x
- for x in os.listdir(path)
- if x.startswith(start_string)]
-
- ####################### shell commands #######################
- # set verbose on / off
- def do_verbose(self, line):
- '''Shows or set verbose mode\n'''
- if line == "":
- print "\nverbose is " + ("on\n" if self.verbose else "off\n")
-
- elif line == "on":
- self.verbose = True
- self.stateless_client.set_verbose(True)
- print green("\nverbose set to on\n")
-
- elif line == "off":
- self.verbose = False
- self.stateless_client.set_verbose(False)
- print green("\nverbose set to off\n")
- else:
- print magenta("\nplease specify 'on' or 'off'\n")
-
- ############### connect
- def do_connect (self, line):
- '''Connects to the server\n'''
-
- res_ok, msg = self.stateless_client.connect()
- if res_ok:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- else:
- print "\n*** " + msg + "\n"
- print format_text("[FAILED]\n", 'red', 'bold')
- return
+ start_string = os.path.basename(text)
+
+ targets = []
- self.supported_rpc = self.stateless_client.get_supported_cmds().data
+ for x in os.listdir(path):
+ if x.startswith(start_string):
+ y = os.path.join(path, x)
+ if os.path.isfile(y):
+ targets.append(x + ' ')
+ elif os.path.isdir(y):
+ targets.append(x + '/')
- if self.acquire_all_ports:
- res_ok, log = self.stateless_client.acquire(self.stateless_client.get_port_ids())
- if not res_ok:
- print "\n*** Failed to acquire all ports... exiting..."""
+ return targets
+ # annotation method
@staticmethod
def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
print format_text('\n{:<40}'.format(desc), 'bold'),
@@ -230,6 +138,54 @@ class TRexConsole(cmd.Cmd):
return True
+ ####################### shell commands #######################
+ def do_ping (self, line):
+ '''Ping the server\n'''
+
+ rc = self.stateless_client.cmd_ping()
+ if rc.bad():
+ return
+
+ def do_test (self, line):
+ print self.stateless_client.get_acquired_ports()
+
+ # set verbose on / off
+ def do_verbose(self, line):
+ '''Shows or set verbose mode\n'''
+ if line == "":
+ print "\nverbose is " + ("on\n" if self.verbose else "off\n")
+
+ elif line == "on":
+ self.verbose = True
+ self.stateless_client.set_verbose(True)
+ print format_text("\nverbose set to on\n", 'green', 'bold')
+
+ elif line == "off":
+ self.verbose = False
+ self.stateless_client.set_verbose(False)
+ print format_text("\nverbose set to off\n", 'green', 'bold')
+
+ else:
+ print format_text("\nplease specify 'on' or 'off'\n", 'bold')
+
+
+ ############### connect
+ def do_connect (self, line):
+ '''Connects to the server\n'''
+
+ rc = self.stateless_client.cmd_connect()
+ if rc.bad():
+ return
+
+
+ def do_disconnect (self, line):
+ '''Disconnect from the server\n'''
+
+ rc = self.stateless_client.cmd_disconnect()
+ if rc.bad():
+ return
+
+
############### start
def complete_start(self, text, line, begidx, endidx):
@@ -247,37 +203,7 @@ class TRexConsole(cmd.Cmd):
def do_start(self, line):
'''Start selected traffic in specified ports on TRex\n'''
- # make sure that the user wants to acquire all
- parser = parsing_opts.gen_parser(self.stateless_client,
- "start",
- self.do_start.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.FORCE,
- parsing_opts.STREAM_FROM_PATH_OR_FILE,
- parsing_opts.DURATION,
- parsing_opts.MULTIPLIER)
-
- opts = parser.parse_args(line.split())
-
- if opts is None:
- return
-
- if opts.db:
- stream_list = self.stream_db.get_stream_pack(opts.db)
- self.annotate("Load stream pack (from DB):", (stream_list != None))
- if stream_list == None:
- return
-
- else:
- # load streams from file
- stream_list = self.streams_db.load_yaml_file(opts.file[0])
- self.annotate("Load stream pack (from file):", (stream_list != None))
- if stream_list == None:
- return
-
-
- self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force, self.annotate)
- return
+ self.stateless_client.cmd_start_line(line)
def help_start(self):
@@ -285,18 +211,9 @@ class TRexConsole(cmd.Cmd):
############# stop
def do_stop(self, line):
- '''Stop active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self.stateless_client,
- "stop",
- self.do_stop.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
+ self.stateless_client.cmd_stop_line(line)
- self.stateless_client.cmd_stop(opts.ports, self.annotate)
- return
+
def help_stop(self):
self.do_stop("-h")
@@ -304,7 +221,7 @@ class TRexConsole(cmd.Cmd):
########## reset
def do_reset (self, line):
'''force stop all ports\n'''
- self.stateless_client.cmd_reset(self.annotate)
+ self.stateless_client.cmd_reset()
# tui
@@ -312,7 +229,7 @@ class TRexConsole(cmd.Cmd):
'''Shows a graphical console\n'''
if not self.stateless_client.is_connected():
- print "Not connected to server\n"
+ print format_text("\nNot connected to server\n", 'bold')
return
self.do_verbose('off')
@@ -364,6 +281,14 @@ class TRexConsole(cmd.Cmd):
do_exit = do_EOF = do_q = do_quit
+#
+def is_valid_file(filename):
+ if not os.path.isfile(filename):
+ raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename)
+
+ return filename
+
+
def setParserOptions():
parser = argparse.ArgumentParser(prog="trex_console.py")
@@ -393,6 +318,12 @@ def setParserOptions():
action="store_false", help="Acquire all ports on connect. Default is: ON.",
default = True)
+ parser.add_argument("--batch", dest="batch",
+ nargs = 1,
+ type = is_valid_file,
+ help = "Run the console in a batch mode with file",
+ default = None)
+
return parser
@@ -402,7 +333,15 @@ def main():
# Stateless client connection
stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub)
+ rc = stateless_client.cmd_connect()
+ if rc.bad():
+ return
+ if options.batch:
+ cont = stateless_client.run_script_file(options.batch[0])
+ if not cont:
+ return
+
# console
try:
console = TRexConsole(stateless_client, options.acquire, options.verbose)
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 907b9cf4..7f2382d3 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -186,22 +186,22 @@ TrexStatelessPort::get_state_as_string() const {
switch (get_state()) {
case PORT_STATE_DOWN:
- return "down";
+ return "DOWN";
case PORT_STATE_IDLE:
- return "no streams";
+ return "IDLE";
case PORT_STATE_STREAMS:
- return "with streams, idle";
+ return "STREAMS";
case PORT_STATE_TX:
- return "transmitting";
+ return "TX";
case PORT_STATE_PAUSE:
- return "paused";
+ return "PAUSE";
}
- return "unknown";
+ return "UNKNOWN";
}
void