summaryrefslogtreecommitdiffstats
path: root/scripts/automation
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py97
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_client.py78
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py1305
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py325
-rw-r--r--scripts/automation/trex_control_plane/console/line_parsing.py5
-rw-r--r--scripts/automation/trex_control_plane/console/old_console.py958
-rwxr-xr-xscripts/automation/trex_control_plane/console/parsing_opts.py193
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py884
-rw-r--r--scripts/automation/trex_control_plane/console/trex_status.py215
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_server.py45
10 files changed, 2640 insertions, 1465 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 49ef9506..adb91d97 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -8,6 +8,8 @@ except ImportError:
import client.outer_packages
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+from common.text_opts import *
+
import json
import threading
import time
@@ -25,6 +27,15 @@ class TrexAsyncStats(object):
self.current = {}
self.last_update_ts = datetime.datetime.now()
+ def __format_num (self, size, suffix = ""):
+
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+
+ return "NaN"
+
def update (self, snapshot):
#update
@@ -36,18 +47,25 @@ class TrexAsyncStats(object):
self.ref_point = self.current
- def get (self, field):
+ def get (self, field, format = False, suffix = ""):
if not field in self.current:
- return 0
+ return "N/A"
+
+ if not format:
+ return self.current[field]
+ else:
+ return self.__format_num(self.current[field], suffix)
- return self.current[field]
- def get_rel (self, field):
+ def get_rel (self, field, format = False, suffix = ""):
if not field in self.current:
- return 0
+ return "N/A"
- return self.current[field] - self.ref_point[field]
+ if not format:
+ return (self.current[field] - self.ref_point[field])
+ else:
+ return self.__format_num(self.current[field] - self.ref_point[field], suffix)
# return true if new data has arrived in the past 2 seconds
@@ -66,6 +84,8 @@ class TrexAsyncStatsPort(TrexAsyncStats):
def __init__ (self):
super(TrexAsyncStatsPort, self).__init__()
+ def get_stream_stats (self, stream_id):
+ return None
# stats manager
class TrexAsyncStatsManager():
@@ -80,19 +100,14 @@ class TrexAsyncStatsManager():
def get_port_stats (self, port_id):
- if not port_id in self.port_stats:
+ if not str(port_id) in self.port_stats:
return None
- return self.port_stats[port_id]
+ return self.port_stats[str(port_id)]
-
- def update (self, snapshot):
-
- if snapshot['name'] == 'trex-global':
- self.__handle_snapshot(snapshot['data'])
- else:
- # for now ignore the rest
- return
+
+ def update (self, data):
+ self.__handle_snapshot(data)
def __handle_snapshot (self, snapshot):
@@ -103,14 +118,16 @@ class TrexAsyncStatsManager():
for key, value in snapshot.iteritems():
# match a pattern of ports
- m = re.search('.*\-([0-8])', key)
+ m = re.search('(.*)\-([0-8])', key)
if m:
- port_id = m.group(1)
+
+ port_id = m.group(2)
+ field_name = m.group(1)
if not port_id in port_stats:
port_stats[port_id] = {}
- port_stats[port_id][key] = value
+ port_stats[port_id][field_name] = value
else:
# no port match - general stats
@@ -132,27 +149,30 @@ class TrexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, port):
+ def __init__ (self, server, port, stateless_client):
self.port = port
+ self.server = server
+ self.stateless_client = stateless_client
self.raw_snapshot = {}
self.stats = TrexAsyncStatsManager()
- self.tr = "tcp://localhost:{0}".format(self.port)
+ self.tr = "tcp://{0}:{1}".format(self.server, self.port)
print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
self.active = True
- self.t = threading.Thread(target = self._run)
+ self.t = threading.Thread(target = self.run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def _run (self):
+
+ def run (self):
# Socket to talk to server
self.context = zmq.Context()
@@ -162,12 +182,15 @@ class CTRexAsyncClient():
self.socket.setsockopt(zmq.SUBSCRIBE, '')
while self.active:
- msg = json.loads(self.socket.recv_string())
+ line = self.socket.recv_string();
+ msg = json.loads(line)
- key = msg['name']
- self.raw_snapshot[key] = msg['data']
+ name = msg['name']
+ data = msg['data']
+ type = msg['type']
+ self.raw_snapshot[name] = data
- self.stats.update(msg)
+ self.__dispatch(name, type, data)
def get_stats (self):
@@ -178,6 +201,26 @@ class CTRexAsyncClient():
return self.raw_snapshot
+ # dispatch the message to the right place
+ def __dispatch (self, name, type, data):
+ # stats
+ if name == "trex-global":
+ self.stats.update(data)
+ # events
+ elif name == "trex-event":
+ self.__handle_async_event(type, data)
+ else:
+ # ignore
+ pass
+
+ def __handle_async_event (self, type, data):
+ # DP stopped
+ if (type == 0):
+ port_id = int(data['port_id'])
+ print format_text("\n[Event] - Port {0} Stopped".format(port_id), 'bold')
+ # call the handler
+ self.stateless_client.async_event_port_stopped(port_id)
+
def stop (self):
self.active = False
self.t.join()
diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py
index c3677132..160abdec 100755
--- a/scripts/automation/trex_control_plane/client/trex_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_client.py
@@ -22,7 +22,7 @@ import time
import re
import copy
import binascii
-from collections import deque
+from collections import deque, OrderedDict
from json import JSONDecoder
from distutils.util import strtobool
@@ -497,6 +497,78 @@ class CTRexClient(object):
finally:
self.prompt_verbose_data()
+ def get_trex_daemon_log (self):
+ """
+ Get Trex daemon log.
+
+ :return:
+ String representation of TRex daemon log
+
+ :raises:
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case file could not be read.
+ + ProtocolError, in case of error in JSON-RPC protocol.
+
+ """
+ try:
+ return binascii.a2b_base64(self.server.get_trex_daemon_log())
+ except AppError as err:
+ self._handle_AppError_exception(err.args[0])
+ except ProtocolError:
+ raise
+ finally:
+ self.prompt_verbose_data()
+
+ def get_trex_log (self):
+ """
+ Get TRex CLI output log
+
+ :return:
+ String representation of TRex log
+
+ :raises:
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case file could not be fetched at server side.
+ + ProtocolError, in case of error in JSON-RPC protocol.
+
+ """
+ try:
+ return binascii.a2b_base64(self.server.get_trex_log())
+ except AppError as err:
+ self._handle_AppError_exception(err.args[0])
+ except ProtocolError:
+ raise
+ finally:
+ self.prompt_verbose_data()
+
+ def get_trex_version (self):
+ """
+ Get TRex version details.
+
+ :return:
+ Trex details (Version, User, Date, Uuid) as ordered dictionary
+
+ :raises:
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex version could not be determined.
+ + ProtocolError, in case of error in JSON-RPC protocol.
+ + General Exception is case one of the keys is missing in response
+ """
+
+ try:
+ version_dict = OrderedDict()
+ result_lines = binascii.a2b_base64(self.server.get_trex_version()).split('\n')
+ for line in result_lines:
+ key, value = line.strip().split(':', 1)
+ version_dict[key.strip()] = value.strip()
+ for key in ('Version', 'User', 'Date', 'Uuid'):
+ if key not in version_dict:
+ raise Exception('get_trex_version: got server response without key: {0}'.format(key))
+ return version_dict
+ except AppError as err:
+ self._handle_AppError_exception(err.args[0])
+ except ProtocolError:
+ raise
+ finally:
+ self.prompt_verbose_data()
+
def reserve_trex (self, user = None):
"""
Reserves the usage of TRex to a certain user.
@@ -650,8 +722,8 @@ class CTRexClient(object):
"""
if self.verbose:
print ('\n')
- print ("(*) JSON-RPC request: "+ self.history.request)
- print ("(*) JSON-RPC response: "+ self.history.response)
+ print ("(*) JSON-RPC request:", self.history.request)
+ print ("(*) JSON-RPC response:", self.history.response)
def __verbose_print(self, print_str):
"""
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index aeb25422..7bcbf2c7 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -6,12 +6,16 @@ try:
except ImportError:
# support import for Python 3
import client.outer_packages
+
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
from client_utils.packet_builder import CTRexPktBuilder
import json
from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from common.text_opts import *
+import parsing_opts
+import time
from trex_async_client import CTRexAsyncClient
@@ -24,107 +28,475 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
msg=self.msg,
stat="success" if self.success else "fail")
-# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
+# simple class to represent complex return value
+class RC:
+
+ def __init__ (self, rc = None, data = None):
+ self.rc_list = []
+
+ if (rc != None) and (data != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data'])
+ self.rc_list.append(tuple_rc(rc, data))
+
+ def add (self, rc):
+ self.rc_list += rc.rc_list
+
+ def good (self):
+ return all([x.rc for x in self.rc_list])
+
+ def bad (self):
+ return not self.good()
+
+ def data (self):
+ return all([x.data if x.rc else "" for x in self.rc_list])
+
+ def err (self):
+ return all([x.data if not x.rc else "" for x in self.rc_list])
+
+ def annotate (self, desc = None):
+ if desc:
+ print format_text('\n{:<40}'.format(desc), 'bold'),
+
+ if self.bad():
+ # print all the errors
+ print ""
+ for x in self.rc_list:
+ if not x.rc:
+ print format_text("\n{0}".format(x.data), 'bold')
+
+ print ""
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+
+ else:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+
+
+def RC_OK():
+ return RC(True, "")
+def RC_ERR (err):
+ return RC(False, err)
+
+
+LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
+
+# describes a stream DB
+class CStreamsDB(object):
+
+ def __init__(self):
+ self.stream_packs = {}
+
+ def load_yaml_file (self, filename):
+
+ stream_pack_name = filename
+ if stream_pack_name in self.get_loaded_streams_names():
+ self.remove_stream_packs(stream_pack_name)
+
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(filename)
+
+ try:
+ compiled_streams = stream_list.compile_streams()
+ rc = self.load_streams(stream_pack_name,
+ LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id, v.stream.dump())
+ for k, v in compiled_streams.items()]))
+
+ except Exception as e:
+ return None
+
+ return self.get_stream_pack(stream_pack_name)
+
+ def load_streams(self, name, LoadedStreamList_obj):
+ if name in self.stream_packs:
+ return False
+ else:
+ self.stream_packs[name] = LoadedStreamList_obj
+ return True
+
+ def remove_stream_packs(self, *names):
+ removed_streams = []
+ for name in names:
+ removed = self.stream_packs.pop(name)
+ if removed:
+ removed_streams.append(name)
+ return removed_streams
+
+ def clear(self):
+ self.stream_packs.clear()
+
+ def get_loaded_streams_names(self):
+ return self.stream_packs.keys()
+
+ def stream_pack_exists (self, name):
+ return name in self.get_loaded_streams_names()
+
+ def get_stream_pack(self, name):
+ if not self.stream_pack_exists(name):
+ return None
+ else:
+ return self.stream_packs.get(name)
+
+
+# describes a single port
+class Port(object):
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+
+ def __init__ (self, port_id, user, transmit):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.transmit = transmit
+ self.user = user
+
+ self.streams = {}
+
+ def err(self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ def ok(self):
+ return RC_OK()
+
+ # take the port
+ def acquire(self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+
+ # release the port
+ def release(self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_active(self):
+ return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def sync(self, sync_data):
+ self.handler = sync_data['handler']
+ port_state = sync_data['state'].upper()
+ if port_state == "DOWN":
+ self.state = self.STATE_DOWN
+ elif port_state == "IDLE":
+ self.state = self.STATE_IDLE
+ elif port_state == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif port_state == "TX":
+ self.state = self.STATE_TX
+ elif port_state == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return self.ok()
+
+
+ # return TRUE if write commands
+ def is_port_writable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ r = self.err(data)
+ print r.good()
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ return self.ok()
+
+ # remove all the streams
+ def remove_all_streams (self):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("remove_all_streams", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams = {}
+
+ return self.ok()
+
+ # get a specific stream
+ def get_stream (self, stream_id):
+ if stream_id in self.streams:
+ return self.streams[stream_id]
+ else:
+ return None
+
+ def get_all_streams (self):
+ return self.streams
+
+
+ # start traffic
+ def start (self, mul, duration):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul,
+ "duration": duration}
+
+ rc, data = self.transmit("start_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
+
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ def pause (self):
+
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("pause_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_PAUSE
+
+ return self.ok()
+
+ def resume (self):
+
+ if (self.state != self.STATE_PAUSE) :
+ return self.err("port is not in pause mode")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("resume_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ ################# events handler ######################
+ def async_event_port_stopped (self):
+ self.state = self.STATE_STREAMS
+
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
+ self.ports = []
self._conn_handler = {}
self._active_ports = set()
- self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
self._server_version = None
self.__err_log = None
- self._async_client = CTRexAsyncClient(async_port)
-
-
- # ----- decorator methods ----- #
- def force_status(owned=True, active_and_owned=False):
- def wrapper(func):
- def wrapper_f(self, *args, **kwargs):
- port_ids = kwargs.get("port_id")
- if not port_ids:
- port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if owned and not port_owned:
- bad_ids.add(port_id)
- elif active_and_owned: # stronger condition than just owned, hence gets precedence
- if port_owned and port_id in self._active_ports:
- continue
- else:
- bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
- "at allowed states".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
- return wrapper
-
- @property
- def system_info(self):
- if not self._system_info:
- rc, info = self.get_system_info()
- if rc:
- self._system_info = info
- else:
- self.__err_log = info
- return self._system_info if self._system_info else "Unknown"
-
- @property
- def server_version(self):
- if not self._server_version:
- rc, ver_info = self.get_version()
- if rc:
- self._server_version = ver_info
- else:
- self.__err_log = ver_info
- return self._server_version if self._server_version else "Unknown"
+ self._async_client = CTRexAsyncClient(server, async_port, self)
+
+ self.streams_db = CStreamsDB()
+
+ self.connected = False
+
+ ################# events handler ######################
+ def async_event_port_stopped (self, port_id):
+ self.ports[port_id].async_event_port_stopped()
+
+ ############# helper functions section ##############
+
+ def validate_port_list(self, port_id_list):
+ if not isinstance(port_id_list, list):
+ print type(port_id_list)
+ return False
+
+ # check each item of the sequence
+ return all([ (port_id >= 0) and (port_id < self.get_port_count())
+ for port_id in port_id_list ])
+
+ # some preprocessing for port argument
+ def __ports (self, port_id_list):
+
+ # none means all
+ if port_id_list == None:
+ return range(0, self.get_port_count())
+
+ # always list
+ if isinstance(port_id_list, int):
+ port_id_list = [port_id_list]
+
+ if not isinstance(port_id_list, list):
+ raise ValueError("bad port id list: {0}".format(port_id_list))
+
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
- def is_connected(self):
- return self.comm_link.is_connected
+ return port_id_list
- # ----- user-access methods ----- #
+ ############ boot up section ################
+
+ # connection sequence
def connect(self):
- rc, err = self.comm_link.connect()
+
+ self.connected = False
+
+ # connect
+ rc, data = self.comm_link.connect()
if not rc:
- return rc, err
- return self._init_sync()
+ return RC_ERR(data)
- def get_stats_async (self):
- return self._async_client.get_stats()
+ # version
+ rc, data = self.transmit("get_version")
+ if not rc:
+ return RC_ERR(data)
+
+ self.server_version = data
+
+ # cache system info
+ rc, data = self.transmit("get_system_info")
+ if not rc:
+ return RC_ERR(data)
+
+ self.system_info = data
+
+ # cache supported commands
+ rc, data = self.transmit("get_supported_cmds")
+ if not rc:
+ return RC_ERR(data)
+
+ self.supported_cmds = data
+
+ # create ports
+ for port_id in xrange(self.get_port_count()):
+ self.ports.append(Port(port_id, self.user, self.transmit))
+
+ # acquire all ports
+ rc = self.acquire()
+ if rc.bad():
+ return rc
+
+ rc = self.sync_with_server()
+ if rc.bad():
+ return rc
+
+ self.connected = True
+
+ return RC_OK()
+
+ def is_connected (self):
+ return self.connected and self.comm_link.is_connected
- def get_connection_port (self):
- return self.comm_link.port
def disconnect(self):
- return self.comm_link.disconnect()
+ self.connected = False
+ self.comm_link.disconnect()
+ return RC_OK()
- def ping(self):
- return self.transmit("ping")
+
+
+ ########### cached queries (no server traffic) ###########
def get_supported_cmds(self):
- return self.transmit("get_supported_cmds")
+ return self.supported_cmds
def get_version(self):
- return self.transmit("get_version")
+ return self.server_version
def get_system_info(self):
- return self.transmit("get_system_info")
+ return self.system_info
def get_port_count(self):
return self.system_info.get("port_count")
@@ -136,385 +508,489 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def sync_user(self, sync_streams=False):
- return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
+ def get_connection_ip (self):
+ return self.comm_link.server
def get_acquired_ports(self):
- return self._conn_handler.keys()
+ return [port.port_id for port in self.ports if port.is_acquired()]
+
def get_active_ports(self):
- return list(self._active_ports)
+ return [port.port_id for port in self.ports if port.is_active()]
def set_verbose(self, mode):
self.comm_link.set_verbose(mode)
self.verbose = mode
- def acquire(self, port_id, force=False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
- else:
- params = {"port_id": port_id,
- "user": self.user,
- "force": force}
- command = RpcCmdData("acquire", params)
- return self._handle_acquire_response(command,
- self.transmit(command.method, command.params),
- self.default_success_test)
-
- @force_status(owned=True)
- def release(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_release_response,
- success_test=self.ack_success_test)
- else:
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("release", params)
- return self._handle_release_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- @force_status(owned=True)
- def add_stream(self, stream_id, stream_obj, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- assert isinstance(stream_obj, CStream)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "stream": stream_obj.dump()}
- return self.transmit("add_stream", params)
-
- @force_status(owned=True)
- def add_stream_pack(self, port_id=None, *stream_packs):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
-
- # since almost every run contains more than one transaction with server, handle all as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = []
- for stream_pack in stream_packs:
- commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
- "handler": self._conn_handler.get(p_id),
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- )
- for p_id in port_ids]
- )
- res_ok, resp_list = self.transmit_batch(commands)
- if res_ok:
- return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
- success_test=self.ack_success_test)
-
- @force_status(owned=True)
- def remove_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("remove_stream", params)
-
- @force_status(owned=True)
- def remove_all_streams(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
- success_test=self.ack_success_test)
- else:
- params = {"port_id": port_id,
- "handler": self._conn_handler.get(port_id)}
- command = RpcCmdData("remove_all_streams", params)
- return self._handle_remove_streams_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
- pass
+ ############# server actions ################
+
+ # ping server
+ def ping(self):
+ rc, info = self.transmit("ping")
+ return RC(rc, info)
+
+
+ def sync_with_server(self, sync_streams=False):
+ rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ if not rc:
+ return RC_ERR(data)
+
+ for port_info in data:
+ rc = self.ports[port_info['port_id']].sync(port_info)
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+
+
+ ########## port commands ##############
+ # acquire ports, if port_list is none - get all
+ def acquire (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].acquire(force))
+
+ return rc
+
+ # release ports
+ def release (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].release(force))
+
+ return rc
+
+
+ def add_stream(self, stream_id, stream_obj, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_stream(stream_id, stream_obj))
+
+ return rc
+
+
+ def add_stream_pack(self, stream_pack_list, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for stream_pack in stream_pack_list:
+ rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
+
+ return rc
+
+
+
+ def remove_stream(self, stream_id, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_stream(stream_id))
+
+ return rc
+
+
+
+ def remove_all_streams(self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_all_streams())
+
+ return rc
+
+
+ def get_stream(self, stream_id, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_stream(stream_id)
+
+
+ def get_all_streams(self, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_all_streams()
+
+
+ def get_stream_id_list(self, port_id):
+
+ return self.ports[port_id].get_stream_id_list()
+
+
+ def start_traffic (self, multiplier, duration, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].start(multiplier, duration))
+
+ return rc
+
+
+ def resume_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].resume())
+
+ return rc
+
+ def pause_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].pause())
+
+ return rc
+
+ def stop_traffic (self, port_id_list = None, force = False):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].stop(force))
+
+ return rc
+
- @force_status(owned=True, active_and_owned=True)
- def get_stream_id_list(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("get_stream_list", params)
-
- @force_status(owned=True, active_and_owned=True)
- def get_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("get_stream_list", params)
-
- @force_status(owned=True)
- def start_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id, "mul": 1.0})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "mul": 1.0}
- command = RpcCmdData("start_traffic", params)
- return self._handle_start_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- @force_status(owned=False, active_and_owned=True)
- def stop_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("stop_traffic", params)
- return self._handle_start_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
-# def get_global_stats(self):
-# command = RpcCmdData("get_global_stats", {})
-# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
-# # return self.transmit("get_global_stats")
-
- @force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_port_stats", params)
- return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
+ pass
- @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_stream_stats", params)
- return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
-
- # ----- internal methods ----- #
- def _init_sync(self):
- # get server version and system info
- err = False
- if self.server_version == "Unknown" or self.system_info == "Unknown":
- self.disconnect()
- return False, self.__err_log
- # sync with previous session
- res_ok, port_info = self.sync_user()
- if not res_ok:
- self.disconnect()
- return False, port_info
- else:
- # handle sync data
- for port in port_info:
- self._conn_handler[port.get("port_id")] = port.get("handler")
- if port.get("state") == "transmitting":
- # port is active
- self._active_ports.add(port.get("port_id"))
- return True, ""
+ pass
def transmit(self, method_name, params={}):
return self.comm_link.transmit(method_name, params)
- def transmit_batch(self, batch_list):
- return self.comm_link.transmit_batch(batch_list)
-
- @staticmethod
- def _object_decoder(obj_type, obj_data):
- if obj_type == "global":
- return CGlobalStats(**obj_data)
- elif obj_type == "port":
- return CPortStats(**obj_data)
- elif obj_type == "stream":
- return CStreamStats(**obj_data)
- else:
- # Do not serialize the data into class
- return obj_data
- @staticmethod
- def default_success_test(result_obj):
- if result_obj.success:
- return True
- else:
- return False
- @staticmethod
- def ack_success_test(result_obj):
- if result_obj.success and result_obj.data == "ACK":
- return True
- else:
- return False
+ ######################### Console (high level) API #########################
+ def cmd_ping(self):
+ rc = self.ping()
+ rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
+ return rc
- # ----- handler internal methods ----- #
- def _handle_general_response(self, request, response, msg, success_test=None):
- port_id = request.params.get("port_id")
- if not success_test:
- success_test = self.default_success_test
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, msg)
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ def cmd_connect(self):
+ rc = self.connect()
+ rc.annotate()
+ return rc
+ def cmd_disconnect(self):
+ rc = self.disconnect()
+ rc.annotate()
+ return rc
- def _handle_acquire_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, "Acquired")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ # reset
+ def cmd_reset(self):
- def _handle_add_stream_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- stream_id = request.params.get("stream_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id))
- else:
- return RpcResponseStatus(False, port_id, response.data)
- def _handle_remove_streams_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Removed")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ # sync with the server
+ rc = self.sync_with_server()
+ rc.annotate("Syncing with the server:")
+ if rc.bad():
+ return rc
- def _handle_release_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- del self._conn_handler[port_id]
- return RpcResponseStatus(True, port_id, "Released")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ rc = self.acquire(force = True)
+ rc.annotate("Force acquiring all ports:")
+ if rc.bad():
+ return rc
- def _handle_start_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._active_ports.add(port_id)
- return RpcResponseStatus(True, port_id, "Traffic started")
- else:
- return RpcResponseStatus(False, port_id, response.data)
- def _handle_stop_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._active_ports.remove(port_id)
- return RpcResponseStatus(True, port_id, "Traffic stopped")
- else:
- return RpcResponseStatus(False, port_id, response.data)
+ # force stop all ports
+ rc = self.stop_traffic(self.get_port_ids(), True)
+ rc.annotate("Stop traffic on all ports:")
+ if rc.bad():
+ return rc
- def _handle_get_global_stats_response(self, request, response, success_test):
- if response.success:
- return CGlobalStats(**response.success)
- else:
- return False
- def _handle_get_port_stats_response(self, request, response, success_test):
- if response.success:
- return CPortStats(**response.success)
- else:
- return False
+ # remove all streams
+ rc = self.remove_all_streams(self.get_port_ids())
+ rc.annotate("Removing all streams from all ports:")
+ if rc.bad():
+ return rc
- def _handle_get_stream_stats_response(self, request, response, success_test):
- if response.success:
- return CStreamStats(**response.success)
- else:
- return False
+ # TODO: clear stats
+ return RC_OK()
+
+
+ # stop cmd
+ def cmd_stop (self, port_id_list):
+
+ # find the relveant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on porvided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.stop_traffic(active_ports)
+ rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+ # pause cmd
+ def cmd_pause (self, port_id_list):
+
+ # find the relveant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on porvided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.pause_traffic(active_ports)
+ rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+ def cmd_pause_line (self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "pause",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_pause(opts.ports)
+
+
+ # resume cmd
+ def cmd_resume (self, port_id_list):
+
+ # find the relveant ports
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if not active_ports:
+ msg = "No active traffic on porvided ports"
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+
+ rc = self.resume_traffic(active_ports)
+ rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+
+ def cmd_resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "resume",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_resume(opts.ports)
+
+
+ # start cmd
+ def cmd_start (self, port_id_list, stream_list, mult, force, duration):
+
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
+ if active_ports:
+ if not force:
+ msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports)
+ print format_text(msg, 'bold')
+ return RC_ERR(msg)
+ else:
+ rc = self.cmd_stop(active_ports)
+ if not rc:
+ return rc
+
+
+ rc = self.remove_all_streams(port_id_list)
+ rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+
+ rc = self.add_stream_pack(stream_list.compiled, port_id_list)
+ rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+
+ # finally, start the traffic
+ rc = self.start_traffic(mult, duration, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
+ ############## High Level API With Parser ################
+ def cmd_start_line (self, line):
+ '''Start selected traffic in specified ports on TRex\n'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "start",
+ self.cmd_start_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.FORCE,
+ parsing_opts.STREAM_FROM_PATH_OR_FILE,
+ parsing_opts.DURATION,
+ parsing_opts.MULTIPLIER)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ if opts.db:
+ stream_list = self.stream_db.get_stream_pack(opts.db)
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from DB):")
+ if rc.bad():
+ return RC_ERR("Failed to load stream pack")
- def _is_ports_valid(self, port_id):
- if isinstance(port_id, list) or isinstance(port_id, set):
- # check each item of the sequence
- return all([self._is_ports_valid(port)
- for port in port_id])
- elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
- return True
else:
- return False
+ # load streams from file
+ stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ rc = RC(stream_list != None)
+ rc.annotate("Load stream pack (from file):")
+ if stream_list == None:
+ return RC_ERR("Failed to load stream pack")
+
+
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+
+ def cmd_stop_line (self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "stop",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_stop(opts.ports)
+
+
+ def cmd_reset_line (self, line):
+ return self.cmd_reset()
+
+
+ def cmd_exit_line (self, line):
+ print format_text("Exiting\n", 'bold')
+ # a way to exit
+ return RC_ERR("exit")
+
- def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
- res_ok = True
- responses = []
- if isinstance(success_test, staticmethod):
- success_test = success_test.__func__
- for i, response in enumerate(resp_list):
- # run handler method with its params
- processed_response = handler_func(req_list[i], response, success_test)
- responses.append(processed_response)
- if not processed_response.success:
- res_ok = False
- # else:
- # res_ok = False # TODO: mark in this case somehow the bad result
- # print res_ok
- # print responses
- return res_ok, responses
+ def cmd_wait_line (self, line):
+ '''wait for a period of time\n'''
+ parser = parsing_opts.gen_parser(self,
+ "wait",
+ self.cmd_wait_line.__doc__,
+ parsing_opts.DURATION)
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ delay_sec = opts.duration if (opts.duration > 0) else 1
+
+ print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')
+ time.sleep(delay_sec)
+
+ return RC_OK()
+
+ # run a script of commands
+ def run_script_file (self, filename):
+
+ print format_text("\nRunning script file '{0}'...".format(filename), 'bold')
+
+ rc = self.cmd_connect()
+ if rc.bad():
+ return
+
+ with open(filename) as f:
+ script_lines = f.readlines()
+
+ cmd_table = {}
+
+ # register all the commands
+ cmd_table['start'] = self.cmd_start_line
+ cmd_table['stop'] = self.cmd_stop_line
+ cmd_table['reset'] = self.cmd_reset_line
+ cmd_table['wait'] = self.cmd_wait_line
+ cmd_table['exit'] = self.cmd_exit_line
+
+ for index, line in enumerate(script_lines, start = 1):
+ line = line.strip()
+ if line == "":
+ continue
+ if line.startswith("#"):
+ continue
+
+ sp = line.split(' ', 1)
+ cmd = sp[0]
+ if len(sp) == 2:
+ args = sp[1]
+ else:
+ args = ""
+
+ print format_text("Executing line {0} : '{1}'\n".format(index, line))
+
+ if not cmd in cmd_table:
+ print "\n*** Error at line {0} : '{1}'\n".format(index, line)
+ print format_text("unknown command '{0}'\n".format(cmd), 'bold')
+ return False
+
+ rc = cmd_table[cmd](args)
+ if rc.bad():
+ return False
+
+ print format_text("\n[Done]", 'bold')
+
+ return True
+
+ #################################
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
@@ -574,3 +1050,4 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
+
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index 58491aba..b826f02f 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -110,45 +110,45 @@ class JsonRpcClient(object):
return id, msg
- def invoke_rpc_method (self, method_name, params = {}, block = False):
+ def invoke_rpc_method (self, method_name, params = {}):
if not self.connected:
return False, "Not connected to server"
id, msg = self.create_jsonrpc_v2(method_name, params)
- return self.send_raw_msg(msg, block)
+ return self.send_raw_msg(msg)
# low level send of string message
- def send_raw_msg (self, msg, block = False):
+ def send_raw_msg (self, msg):
+
self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
- if block:
- self.socket.send(msg)
- else:
+ tries = 0
+ while True:
try:
- self.socket.send(msg, flags = zmq.NOBLOCK)
- except zmq.error.ZMQError as e:
- self.disconnect()
- return CmdResponse(False, "Failed To Get Send Message")
-
- got_response = False
+ self.socket.send(msg)
+ break
+ except zmq.Again:
+ sleep(0.1)
+ tries += 1
+ if tries > 10:
+ self.disconnect()
+ return CmdResponse(False, "Failed to send message to server")
+
+
+ tries = 0
+ while True:
+ try:
+ response = self.socket.recv()
+ break
+ except zmq.Again:
+ sleep(0.1)
+ tries += 1
+ if tries > 10:
+ self.disconnect()
+ return CmdResponse(False, "Failed to get server response")
- if block:
- response = self.socket.recv()
- got_response = True
- else:
- for i in xrange(0 ,10):
- try:
- response = self.socket.recv(flags = zmq.NOBLOCK)
- got_response = True
- break
- except zmq.Again:
- sleep(0.2)
-
- if not got_response:
- self.disconnect()
- return CmdResponse(False, "Failed To Get Server Response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
@@ -223,6 +223,8 @@ class JsonRpcClient(object):
except zmq.error.ZMQError as e:
return False, "ZMQ Error: Bad server or port name: " + str(e)
+ self.socket.setsockopt(zmq.SNDTIMEO, 5)
+ self.socket.setsockopt(zmq.RCVTIMEO, 5)
self.connected = True
@@ -248,272 +250,3 @@ class JsonRpcClient(object):
if hasattr(self, "context"):
self.context.destroy(linger=0)
-# MOVE THIS TO DAN'S FILE
-class TrexStatelessClient(JsonRpcClient):
-
- def __init__ (self, server, port, user):
-
- super(TrexStatelessClient, self).__init__(server, port)
-
- self.user = user
- self.port_handlers = {}
-
- self.supported_cmds = []
- self.system_info = None
- self.server_version = None
-
-
- def whoami (self):
- return self.user
-
- def ping_rpc_server(self):
-
- return self.invoke_rpc_method("ping", block = False)
-
- def get_rpc_server_version (self):
- return self.server_version
-
- def get_system_info (self):
- if not self.system_info:
- return {}
-
- return self.system_info
-
- def get_supported_cmds(self):
- if not self.supported_cmds:
- return {}
-
- return self.supported_cmds
-
- def get_port_count (self):
- if not self.system_info:
- return 0
-
- return self.system_info["port_count"]
-
- # sync the client with all the server required data
- def sync (self):
-
- # get server version
- rc, msg = self.invoke_rpc_method("get_version")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.server_version = msg
-
- # get supported commands
- rc, msg = self.invoke_rpc_method("get_supported_cmds")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.supported_cmds = [str(x) for x in msg if x]
-
- # get system info
- rc, msg = self.invoke_rpc_method("get_system_info")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.system_info = msg
-
- return True, ""
-
- def connect (self):
- rc, err = super(TrexStatelessClient, self).connect()
- if not rc:
- return rc, err
-
- return self.sync()
-
-
- # take ownership over ports
- def take_ownership (self, port_id_array, force = False):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- for port_id in port_id_array:
- batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
-
- def release_ports (self, port_id_array):
- batch = self.create_batch()
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("release", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers.pop(port_id_array[i])
-
- return True, resp_list
-
- def get_owned_ports (self):
- return self.port_handlers.keys()
-
- # fetch port stats
- def get_port_stats (self, port_id_array):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- # empty list means all
- if port_id_array == []:
- port_id_array = list([x for x in xrange(0, self.system_info["port_count"])])
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
-
- return rc, resp_list
-
- # snapshot will take a snapshot of all your owned ports for streams and etc.
- def snapshot(self):
-
-
- if len(self.get_owned_ports()) == 0:
- return {}
-
- snap = {}
-
- batch = self.create_batch()
-
- for port_id in self.get_owned_ports():
-
- batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
- batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- # split the list to 2s
- index = 0
- for port_id in self.get_owned_ports():
- if not resp_list[index] or not resp_list[index + 1]:
- snap[port_id] = None
- continue
-
- # fetch the first two
- stats = resp_list[index][1]
- stream_list = resp_list[index + 1][1]
-
- port = {}
- port['status'] = stats['status']
- port['stream_list'] = []
-
- # get all the streams
- if len(stream_list) > 0:
- batch = self.create_batch()
- for stream_id in stream_list:
- batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]})
-
- rc, stream_resp_list = batch.invoke()
- if not rc:
- port = {}
-
- port['streams'] = {}
- for i, resp in enumerate(stream_resp_list):
- if resp[0]:
- port['streams'][stream_list[i]] = resp[1]
-
- snap[port_id] = port
-
- # move to next one
- index += 2
-
-
- return snap
-
- # add stream
- # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]):
- # if not port_id in self.get_owned_ports():
- # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id)
- #
- # handler = self.port_handlers[port_id]
- #
- # stream = {}
- # stream['enabled'] = True
- # stream['self_start'] = True
- # stream['isg'] = isg
- # stream['next_stream_id'] = next_stream_id
- # stream['packet'] = {}
- # stream['packet']['binary'] = packet
- # stream['packet']['meta'] = ""
- # stream['vm'] = vm
- # stream['rx_stats'] = {}
- # stream['rx_stats']['enabled'] = False
- #
- # stream['mode'] = {}
- # stream['mode']['type'] = 'continuous'
- # stream['mode']['pps'] = 10.0
- #
- # params = {}
- # params['handler'] = handler
- # params['stream'] = stream
- # params['port_id'] = port_id
- # params['stream_id'] = stream_id
- #
- # print params
- # return self.invoke_rpc_method('add_stream', params = params)
-
- def add_stream(self, port_id_array, stream_pack_list):
- batch = self.create_batch()
-
- for port_id in port_id_array:
- for stream_pack in stream_pack_list:
- params = {"port_id": port_id,
- "handler": self.port_handlers[port_id],
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- batch.add("add_stream", params=params)
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- print "Stream {0} - {1}".format(i, rc[1])
- # self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
- # return self.invoke_rpc_method('add_stream', params = params)
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/console/line_parsing.py b/scripts/automation/trex_control_plane/console/line_parsing.py
deleted file mode 100644
index 34776424..00000000
--- a/scripts/automation/trex_control_plane/console/line_parsing.py
+++ /dev/null
@@ -1,5 +0,0 @@
-__author__ = 'danklei'
-
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/console/old_console.py b/scripts/automation/trex_control_plane/console/old_console.py
new file mode 100644
index 00000000..9d61a3a6
--- /dev/null
+++ b/scripts/automation/trex_control_plane/console/old_console.py
@@ -0,0 +1,958 @@
+
+# main console object
+class TRexConsole1(cmd.Cmd):
+ """Trex Console"""
+
+ def __init__(self, stateless_client, verbose):
+ cmd.Cmd.__init__(self)
+
+ self.stateless_client = stateless_client
+
+ self.do_connect("")
+
+ self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__)
+ self.intro += "\nType 'help' or '?' for supported actions\n"
+
+ self.verbose = False
+ self._silent = True
+
+ self.postcmd(False, "")
+
+ self.user_streams = {}
+ self.streams_db = CStreamsDB()
+
+
+ # a cool hack - i stole this function and added space
+ def completenames(self, text, *ignored):
+ dotext = 'do_'+text
+ return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+
+
+ # set verbose on / off
+ def do_verbose(self, line):
+ '''Shows or set verbose mode\n'''
+ if line == "":
+ print "\nverbose is " + ("on\n" if self.verbose else "off\n")
+
+ elif line == "on":
+ self.verbose = True
+ self.stateless_client.set_verbose(True)
+ print green("\nverbose set to on\n")
+
+ elif line == "off":
+ self.verbose = False
+ self.stateless_client.set_verbose(False)
+ print green("\nverbose set to off\n")
+
+ else:
+ print magenta("\nplease specify 'on' or 'off'\n")
+
+ # query the server for registered commands
+ def do_query_server(self, line):
+ '''query the RPC server for supported remote commands\n'''
+
+ res_ok, msg = self.stateless_client.get_supported_cmds()
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print "\nRPC server supports the following commands:\n"
+ for func in msg:
+ if func:
+ print func
+ print ''
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return
+
+ def do_ping(self, line):
+ '''Pings the RPC server\n'''
+
+ print "\n-> Pinging RPC server"
+
+ res_ok, msg = self.stateless_client.ping()
+ if res_ok:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ else:
+ print "\n*** " + msg + "\n"
+ return
+
+ def do_force_acquire(self, line):
+ '''Acquires ports by force\n'''
+
+ self.do_acquire(line, True)
+
+ def complete_force_acquire(self, text, line, begidx, endidx):
+ return self.port_auto_complete(text, line, begidx, endidx, acquired=False)
+
+ def extract_port_ids_from_line(self, line):
+ return {int(x) for x in line.split()}
+
+ def extract_port_ids_from_list(self, port_list):
+ return {int(x) for x in port_list}
+
+ def parse_ports_from_line (self, line):
+ port_list = set()
+ if line:
+ for port_id in line.split(' '):
+ if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.stateless_client.get_port_count()):
+ print "Please provide a list of ports separated by spaces between 0 and {0}".format(self.stateless_client.get_port_count() - 1)
+ return None
+
+ port_list.add(int(port_id))
+
+ port_list = list(port_list)
+
+ else:
+ port_list = [i for i in xrange(0, self.stateless_client.get_port_count())]
+
+ return port_list
+
+
+ def do_acquire(self, line, force=False):
+ '''Acquire ports\n'''
+
+ # make sure that the user wants to acquire all
+ args = line.split()
+ if len(args) < 1:
+ print magenta("Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports")
+ return
+
+ if args[0] == "all":
+ ask = ConfirmMenu('Are you sure you want to acquire all ports ? ')
+ rc = ask.show()
+ if rc == False:
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ port_list = self.stateless_client.get_port_ids()
+ else:
+ port_list = self.extract_port_ids_from_line(line)
+
+ # rc, resp_list = self.stateless_client.take_ownership(port_list, force)
+ try:
+ res_ok, log = self.stateless_client.acquire(port_list, force)
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ except ValueError as e:
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+
+ def port_auto_complete(self, text, line, begidx, endidx, acquired=True, active=False):
+ if acquired:
+ if not active:
+ ret_list = [x
+ for x in map(str, self.stateless_client.get_acquired_ports())
+ if x.startswith(text)]
+ else:
+ ret_list = [x
+ for x in map(str, self.stateless_client.get_active_ports())
+ if x.startswith(text)]
+ else:
+ ret_list = [x
+ for x in map(str, self.stateless_client.get_port_ids())
+ if x.startswith(text)]
+ ret_list.append("all")
+ return ret_list
+
+
+ def complete_acquire(self, text, line, begidx, endidx):
+ return self.port_auto_complete(text, line, begidx, endidx, acquired=False)
+
+ def do_release (self, line):
+ '''Release ports\n'''
+
+ # if line:
+ # port_list = self.parse_ports_from_line(line)
+ # else:
+ # port_list = self.stateless_client.get_owned_ports()
+ args = line.split()
+ if len(args) < 1:
+ print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports"
+ if args[0] == "all":
+ ask = ConfirmMenu('Are you sure you want to release all acquired ports? ')
+ rc = ask.show()
+ if rc == False:
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ port_list = self.stateless_client.get_acquired_ports()
+ else:
+ port_list = self.extract_port_ids_from_line(line)
+
+ try:
+ res_ok, log = self.stateless_client.release(port_list)
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ except ValueError as e:
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+
+ def complete_release(self, text, line, begidx, endidx):
+ return self.port_auto_complete(text, line, begidx, endidx)
+
+ def do_connect (self, line):
+ '''Connects to the server\n'''
+
+ if line == "":
+ res_ok, msg = self.stateless_client.connect()
+ else:
+ sp = line.split()
+ if (len(sp) != 2):
+ print "\n[usage] connect [server] [port] or without parameters\n"
+ return
+
+ res_ok, msg = self.stateless_client.connect(sp[0], sp[1])
+
+ if res_ok:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ else:
+ print "\n*** " + msg + "\n"
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+
+ self.supported_rpc = self.stateless_client.get_supported_cmds().data
+
+ # def do_rpc (self, line):
+ # '''Launches a RPC on the server\n'''
+ #
+ # if line == "":
+ # print "\nUsage: [method name] [param dict as string]\n"
+ # print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ # return
+ #
+ # sp = line.split(' ', 1)
+ # method = sp[0]
+ #
+ # params = None
+ # bad_parse = False
+ # if len(sp) > 1:
+ #
+ # try:
+ # params = ast.literal_eval(sp[1])
+ # if not isinstance(params, dict):
+ # bad_parse = True
+ #
+ # except ValueError as e1:
+ # bad_parse = True
+ # except SyntaxError as e2:
+ # bad_parse = True
+ #
+ # if bad_parse:
+ # print "\nValue should be a valid dict: '{0}'".format(sp[1])
+ # print "\nUsage: [method name] [param dict as string]\n"
+ # print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ # return
+ #
+ # res_ok, msg = self.stateless_client.transmit(method, params)
+ # if res_ok:
+ # print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n"
+ # else:
+ # print "\n*** " + msg + "\n"
+ # #print "Please try 'reconnect' to reconnect to server"
+ #
+ #
+ # def complete_rpc (self, text, line, begidx, endidx):
+ # return [x
+ # for x in self.supported_rpc
+ # if x.startswith(text)]
+
+ def do_status (self, line):
+ '''Shows a graphical console\n'''
+
+ if not self.stateless_client.is_connected():
+ print "Not connected to server\n"
+ return
+
+ self.do_verbose('off')
+ trex_status.show_trex_status(self.stateless_client)
+
+ def do_quit(self, line):
+ '''Exit the client\n'''
+ return True
+
+ def do_disconnect (self, line):
+ '''Disconnect from the server\n'''
+ if not self.stateless_client.is_connected():
+ print "Not connected to server\n"
+ return
+
+ res_ok, msg = self.stateless_client.disconnect()
+ if res_ok:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ else:
+ print msg + "\n"
+
+ def do_whoami (self, line):
+ '''Prints console user name\n'''
+ print "\n" + self.stateless_client.user + "\n"
+
+ def postcmd(self, stop, line):
+ if self.stateless_client.is_connected():
+ self.prompt = "TRex > "
+ else:
+ self.supported_rpc = None
+ self.prompt = "TRex (offline) > "
+
+ return stop
+
+ def default(self, line):
+ print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line)
+
+ # def do_help (self, line):
+ # '''Shows This Help Screen\n'''
+ # if line:
+ # try:
+ # func = getattr(self, 'help_' + line)
+ # except AttributeError:
+ # try:
+ # doc = getattr(self, 'do_' + line).__doc__
+ # if doc:
+ # self.stdout.write("%s\n"%str(doc))
+ # return
+ # except AttributeError:
+ # pass
+ # self.stdout.write("%s\n"%str(self.nohelp % (line,)))
+ # return
+ # func()
+ # return
+ #
+ # print "\nSupported Console Commands:"
+ # print "----------------------------\n"
+ #
+ # cmds = [x[3:] for x in self.get_names() if x.startswith("do_")]
+ # for cmd in cmds:
+ # if cmd == "EOF":
+ # continue
+ #
+ # try:
+ # doc = getattr(self, 'do_' + cmd).__doc__
+ # if doc:
+ # help = str(doc)
+ # else:
+ # help = "*** Undocumented Function ***\n"
+ # except AttributeError:
+ # help = "*** Undocumented Function ***\n"
+ #
+ # print "{:<30} {:<30}".format(cmd + " - ", help)
+
+ def do_stream_db_add(self, line):
+ '''Loads a YAML stream list serialization into user console \n'''
+ args = line.split()
+ if len(args) >= 2:
+ name = args[0]
+ yaml_path = args[1]
+ try:
+ multiplier = args[2]
+ except IndexError:
+ multiplier = 1
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(yaml_path, multiplier)
+ # print self.stateless_client.pretty_json(json.dumps(loaded_obj))
+ try:
+ compiled_streams = stream_list.compile_streams()
+ res_ok = self.streams_db.load_streams(name, LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id, v.stream.dump())
+ for k, v in compiled_streams.items()]))
+ if res_ok:
+ print green("Stream pack '{0}' loaded and added successfully\n".format(name))
+ else:
+ print magenta("Picked name already exist. Please pick another name.\n")
+ except Exception as e:
+ print "adding new stream failed due to the following error:\n", str(e)
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+ return
+ else:
+ print magenta("please provide load name and YAML path, separated by space.\n"
+ "Optionally, you may provide a third argument to specify multiplier.\n")
+
+ @staticmethod
+ def tree_autocomplete(text):
+ dir = os.path.dirname(text)
+ if dir:
+ path = dir
+ else:
+ path = "."
+ start_string = os.path.basename(text)
+ return [x
+ for x in os.listdir(path)
+ if x.startswith(start_string)]
+
+
+ def complete_stream_db_add(self, text, line, begidx, endidx):
+ arg_num = len(line.split()) - 1
+ if arg_num == 2:
+ return TRexConsole.tree_autocomplete(line.split()[-1])
+ else:
+ return [text]
+
+ def do_stream_db_show(self, line):
+ '''Shows the loaded stream list named [name] \n'''
+ args = line.split()
+ if args:
+ list_name = args[0]
+ try:
+ stream = self.streams_db.get_stream_pack(list_name)#user_streams[list_name]
+ if len(args) >= 2 and args[1] == "full":
+ print pretty_json(json.dumps(stream.compiled))
+ else:
+ print pretty_json(json.dumps(stream.loaded))
+ except KeyError as e:
+ print "Unknown stream list name provided"
+ else:
+ print "Available stream packs:\n{0}".format(', '.join(sorted(self.streams_db.get_loaded_streams_names())))
+
+ def complete_stream_db_show(self, text, line, begidx, endidx):
+ return [x
+ for x in self.streams_db.get_loaded_streams_names()
+ if x.startswith(text)]
+
+ def do_stream_db_remove(self, line):
+ '''Removes a single loaded stream packs from loaded stream pack repository\n'''
+ args = line.split()
+ if args:
+ removed_streams = self.streams_db.remove_stream_packs(*args)
+ if removed_streams:
+ print green("The following stream packs were removed:")
+ print bold(", ".join(sorted(removed_streams)))
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ else:
+ print red("No streams were removed. Make sure to provide valid stream pack names.")
+ else:
+ print magenta("Please provide stream pack name(s), separated with spaces.")
+
+ def do_stream_db_clear(self, line):
+ '''Clears all loaded stream packs from loaded stream pack repository\n'''
+ self.streams_db.clear()
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+
+
+ def complete_stream_db_remove(self, text, line, begidx, endidx):
+ return [x
+ for x in self.streams_db.get_loaded_streams_names()
+ if x.startswith(text)]
+
+
+ def do_attach(self, line):
+ '''Assign loaded stream pack into specified ports on TRex\n'''
+ args = line.split()
+ if len(args) >= 2:
+ stream_pack_name = args[0]
+ stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]]
+ if not stream_list:
+ print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name)
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ if args[1] == "all":
+ ask = ConfirmMenu('Are you sure you want to release all acquired ports? ')
+ rc = ask.show()
+ if rc == False:
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ port_list = self.stateless_client.get_acquired_ports()
+ else:
+ port_list = self.extract_port_ids_from_line(' '.join(args[1:]))
+ owned = set(self.stateless_client.get_acquired_ports())
+ try:
+ if set(port_list).issubset(owned):
+ res_ok, log = self.stateless_client.add_stream_pack(stream_list.compiled, port_id=port_list)
+ # res_ok, msg = self.stateless_client.add_stream(port_list, stream_list.compiled)
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return
+ else:
+ print "Not all desired ports are acquired.\n" \
+ "Acquired ports are: {acq}\n" \
+ "Requested ports: {req}\n" \
+ "Missing ports: {miss}".format(acq=list(owned),
+ req=port_list,
+ miss=list(set(port_list).difference(owned)))
+ print format_text("[FAILED]\n", 'red', 'bold')
+ except ValueError as e:
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+ else:
+ print magenta("Please provide list name and ports to attach to, "
+ "or specify 'all' to attach all owned ports.\n")
+
+ def complete_attach(self, text, line, begidx, endidx):
+ arg_num = len(line.split()) - 1
+ if arg_num == 1:
+ # return optional streams packs
+ if line.endswith(" "):
+ return self.port_auto_complete(text, line, begidx, endidx)
+ return [x
+ for x in self.streams_db.get_loaded_streams_names()
+ if x.startswith(text)]
+ elif arg_num >= 2:
+ # return optional ports to attach to
+ return self.port_auto_complete(text, line, begidx, endidx)
+ else:
+ return [text]
+
+ def prompt_response(self, response_obj):
+ resp_list = response_obj if isinstance(response_obj, list) else [response_obj]
+ def format_return_status(return_status):
+ if return_status:
+ return green("OK")
+ else:
+ return red("FAIL")
+
+ for response in resp_list:
+ response_str = "{id:^3} - {msg} ({stat})".format(id=response.id,
+ msg=response.msg,
+ stat=format_return_status(response.success))
+ print response_str
+ return
+
+ def do_remove_all_streams(self, line):
+ '''Acquire ports\n'''
+
+ # make sure that the user wants to acquire all
+ args = line.split()
+ if len(args) < 1:
+ print magenta("Please provide a list of ports separated by spaces, "
+ "or specify 'all' to remove from all acquired ports")
+ return
+ if args[0] == "all":
+ ask = ConfirmMenu('Are you sure you want to remove all stream packs from all acquired ports? ')
+ rc = ask.show()
+ if rc == False:
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ port_list = self.stateless_client.get_acquired_ports()
+ else:
+ port_list = self.extract_port_ids_from_line(line)
+
+ # rc, resp_list = self.stateless_client.take_ownership(port_list, force)
+ try:
+ res_ok, log = self.stateless_client.remove_all_streams(port_list)
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ except ValueError as e:
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+ def complete_remove_all_streams(self, text, line, begidx, endidx):
+ return self.port_auto_complete(text, line, begidx, endidx)
+
+ def do_start(self, line):
+ '''Start selected traffic in specified ports on TRex\n'''
+ # make sure that the user wants to acquire all
+ parser = parsing_opts.gen_parser("start", self.do_start.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.FORCE,
+ parsing_opts.STREAM_FROM_PATH_OR_FILE,
+ parsing_opts.DURATION,
+ parsing_opts.MULTIPLIER)
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ # avoid further processing in this command
+ return
+ # print opts
+ port_list = self.extract_port_list(opts)
+ # print port_list
+ if opts.force:
+ # stop all active ports, if any
+ res_ok = self.stop_traffic(set(self.stateless_client.get_active_ports()).intersection(port_list))
+ if not res_ok:
+ print yellow("[ABORTED]\n")
+ return
+ # remove all traffic from ports
+ res_ok = self.remove_all_streams(port_list)
+ if not res_ok:
+ print yellow("[ABORTED]\n")
+ return
+ # decide which traffic to use
+ stream_pack_name = None
+ if opts.db:
+ # use pre-loaded traffic
+ print format_text('{:<30}'.format("Load stream pack (from DB):"), 'bold'),
+ if opts.db not in self.streams_db.get_loaded_streams_names():
+ print format_text("[FAILED]\n", 'red', 'bold')
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ stream_pack_name = opts.db
+ else:
+ # try loading a YAML file
+ print format_text('{:<30}'.format("Load stream pack (from file):"), 'bold'),
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(opts.file[0])
+ # print self.stateless_client.pretty_json(json.dumps(loaded_obj))
+ try:
+ compiled_streams = stream_list.compile_streams()
+ res_ok = self.streams_db.load_streams(opts.file[1],
+ LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id, v.stream.dump())
+ for k, v in compiled_streams.items()]))
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ print yellow("[ABORTED]\n")
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ stream_pack_name = opts.file[1]
+ except Exception as e:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ print yellow("[ABORTED]\n")
+ res_ok = self.attach_to_port(stream_pack_name, port_list)
+ if not res_ok:
+ print yellow("[ABORTED]\n")
+ return
+ # finally, start the traffic
+ res_ok = self.start_traffic(opts.mult, port_list)
+ if not res_ok:
+ print yellow("[ABORTED]\n")
+ return
+ return
+
+ def help_start(self):
+ self.do_start("-h")
+
+ def do_stop(self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser("stop", self.do_stop.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ # avoid further processing in this command
+ return
+ port_list = self.extract_port_list(opts)
+ res_ok = self.stop_traffic(port_list)
+ return
+
+ def do_pause(self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser("stop", self.do_stop.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ # avoid further processing in this command
+ return
+ port_list = self.extract_port_list(opts)
+ res_ok = self.stop_traffic(port_list)
+ return
+
+
+ def help_stop(self):
+ self.do_stop("-h")
+
+
+ def do_debug(self, line):
+ '''Enter DEBUG mode of the console to invoke smaller building blocks with server'''
+ i = DebugTRexConsole(self)
+ i.prompt = self.prompt[:-3] + ':' + blue('debug') + ' > '
+ i.cmdloop()
+
+ # aliasing
+ do_exit = do_EOF = do_q = do_quit
+
+ # ----- utility methods ----- #
+
+ def start_traffic(self, multiplier, port_list):#, silent=True):
+ print format_text('{:<30}'.format("Start traffic:"), 'bold'),
+ try:
+ res_ok, log = self.stateless_client.start_traffic(multiplier, port_id=port_list)
+ if not self._silent:
+ print ''
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return False
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+ except ValueError as e:
+ print ''
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return False
+
+ def attach_to_port(self, stream_pack_name, port_list):
+ print format_text('{:<30}'.format("Attaching traffic to ports:"), 'bold'),
+ stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]]
+ if not stream_list:
+ print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name)
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ try:
+ res_ok, log = self.stateless_client.add_stream_pack(stream_list.compiled, port_id=port_list)
+ if not self._silent:
+ print ''
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return False
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+ except ValueError as e:
+ print ''
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return False
+
+ def stop_traffic(self, port_list):
+ print format_text('{:<30}'.format("Stop traffic:"), 'bold'),
+ try:
+ res_ok, log = self.stateless_client.stop_traffic(port_id=port_list)
+ if not self._silent:
+ print ''
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+ except ValueError as e:
+ print ''
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+ def remove_all_streams(self, port_list):
+ '''Remove all streams from given port_list'''
+ print format_text('{:<30}'.format("Remove all streams:"), 'bold'),
+ try:
+ res_ok, log = self.stateless_client.remove_all_streams(port_id=port_list)
+ if not self._silent:
+ print ''
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+ except ValueError as e:
+ print ''
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+
+
+
+
+ def extract_port_list(self, opts):
+ if opts.all_ports or "all" in opts.ports:
+ # handling all ports
+ port_list = self.stateless_client.get_acquired_ports()
+ else:
+ port_list = self.extract_port_ids_from_list(opts.ports)
+ return port_list
+
+ def decode_multiplier(self, opts_mult):
+ pass
+
+
+class DebugTRexConsole(cmd.Cmd):
+
+ def __init__(self, trex_main_console):
+ cmd.Cmd.__init__(self)
+ self.trex_console = trex_main_console
+ self.stateless_client = self.trex_console.stateless_client
+ self.streams_db = self.trex_console.streams_db
+ self.register_main_console_methods()
+ self.do_silent("on")
+ pass
+
+ # ----- super methods overriding ----- #
+ def completenames(self, text, *ignored):
+ dotext = 'do_'+text
+ return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+
+ def get_names(self):
+ result = cmd.Cmd.get_names(self)
+ result += self.trex_console.get_names()
+ return list(set(result))
+
+ def register_main_console_methods(self):
+ main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__)))
+ for name in main_names:
+ for prefix in 'do_', 'help_', 'complete_':
+ if name.startswith(prefix):
+ self.__dict__[name] = getattr(self.trex_console, name)
+
+ # if (name[:3] == 'do_') or (name[:5] == 'help_') or (name[:9] == 'complete_'):
+ # chosen.append(name)
+ # self.__dict__[name] = getattr(self.trex_console, name)
+ # # setattr(self, name, classmethod(getattr(self.trex_console, name)))
+
+ # print chosen
+ # self.get_names()
+
+ # return result
+
+
+ # ----- DEBUGGING methods ----- #
+ # set silent on / off
+ def do_silent(self, line):
+ '''Shows or set silent mode\n'''
+ if line == "":
+ print "\nsilent mode is " + ("on\n" if self.trex_console._silent else "off\n")
+
+ elif line == "on":
+ self.verbose = True
+ self.stateless_client.set_verbose(True)
+ print green("\nsilent set to on\n")
+
+ elif line == "off":
+ self.verbose = False
+ self.stateless_client.set_verbose(False)
+ print green("\nsilent set to off\n")
+
+ else:
+ print magenta("\nplease specify 'on' or 'off'\n")
+
+ def do_quit(self, line):
+ '''Exit the debug client back to main console\n'''
+ self.do_silent("off")
+ return True
+
+ def do_start_traffic(self, line):
+ '''Start pre-submitted traffic in specified ports on TRex\n'''
+ # make sure that the user wants to acquire all
+ parser = parsing_opts.gen_parser("start_traffic", self.do_start_traffic.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER)
+ opts = parser.parse_args(line.split())
+ # print opts
+ # return
+ if opts is None:
+ # avoid further processing in this command
+ return
+ try:
+ port_list = self.trex_console.extract_port_list(opts)
+ return self.trex_console.start_traffic(opts.mult, port_list)
+ except Exception as e:
+ print e
+ return
+
+ def do_stop_traffic(self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser("stop_traffic", self.do_stop_traffic.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+ opts = parser.parse_args(line.split())
+ # print opts
+ # return
+ if opts is None:
+ # avoid further processing in this command
+ return
+ try:
+ port_list = self.trex_console.extract_port_list(opts)
+ return self.trex_console.stop_traffic(port_list)
+ except Exception as e:
+ print e
+ return
+
+
+ def complete_stop_traffic(self, text, line, begidx, endidx):
+ return self.port_auto_complete(text, line, begidx, endidx, active=True)
+
+ # return
+ # # return
+ # # if not opts.port_list:
+ # # print magenta("Please provide a list of ports separated by spaces, "
+ # # "or specify 'all' to start traffic on all acquired ports")
+ # # return
+ #
+
+
+ return
+ args = line.split()
+ if len(args) < 1:
+ print magenta("Please provide a list of ports separated by spaces, "
+ "or specify 'all' to start traffic on all acquired ports")
+ return
+ if args[0] == "all":
+ ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ')
+ rc = ask.show()
+ if rc == False:
+ print yellow("[ABORTED]\n")
+ return
+ else:
+ port_list = self.stateless_client.get_acquired_ports()
+ else:
+ port_list = self.extract_port_ids_from_line(line)
+
+ try:
+ res_ok, log = self.stateless_client.start_traffic(1.0, port_id=port_list)
+ self.prompt_response(log)
+ if not res_ok:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ except ValueError as e:
+ print magenta(str(e))
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+ def complete_start_traffic(self, text, line, begidx, endidx):
+ # return self.port_auto_complete(text, line, begidx, endidx)
+ return [text]
+
+ def help_start_traffic(self):
+ self.do_start_traffic("-h")
+
+ def help_stop_traffic(self):
+ self.do_stop_traffic("-h")
+
+ # def do_help(self):
+
+ def do_rpc (self, line):
+ '''Launches a RPC on the server\n'''
+
+ if line == "":
+ print "\nUsage: [method name] [param dict as string]\n"
+ print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ return
+
+ sp = line.split(' ', 1)
+ method = sp[0]
+
+ params = None
+ bad_parse = False
+ if len(sp) > 1:
+
+ try:
+ params = ast.literal_eval(sp[1])
+ if not isinstance(params, dict):
+ bad_parse = True
+
+ except ValueError as e1:
+ bad_parse = True
+ except SyntaxError as e2:
+ bad_parse = True
+
+ if bad_parse:
+ print "\nValue should be a valid dict: '{0}'".format(sp[1])
+ print "\nUsage: [method name] [param dict as string]\n"
+ print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ return
+
+ res_ok, msg = self.stateless_client.transmit(method, params)
+ if res_ok:
+ print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n"
+ else:
+ print "\n*** " + msg + "\n"
+ #print "Please try 'reconnect' to reconnect to server"
+
+
+ def complete_rpc (self, text, line, begidx, endidx):
+ return [x
+ for x in self.trex_console.supported_rpc
+ if x.startswith(text)]
+
+ # aliasing
+ do_exit = do_EOF = do_q = do_quit
+
+#
diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py
new file mode 100755
index 00000000..d5c21af0
--- /dev/null
+++ b/scripts/automation/trex_control_plane/console/parsing_opts.py
@@ -0,0 +1,193 @@
+import argparse
+from collections import namedtuple
+import sys
+import re
+import os
+
+ArgumentPack = namedtuple('ArgumentPack', ['name_or_flags', 'options'])
+ArgumentGroup = namedtuple('ArgumentGroup', ['type', 'args', 'options'])
+
+
+# list of available parsing options
+MULTIPLIER = 1
+PORT_LIST = 2
+ALL_PORTS = 3
+PORT_LIST_WITH_ALL = 4
+FILE_PATH = 5
+FILE_FROM_DB = 6
+SERVER_IP = 7
+STREAM_FROM_PATH_OR_FILE = 8
+DURATION = 9
+FORCE = 10
+
+# list of ArgumentGroup types
+MUTEX = 1
+
+
+def match_time_unit(val):
+ '''match some val against time shortcut inputs '''
+ match = re.match("^(\d+)([m|h]?)$", val)
+ if match:
+ digit = int(match.group(1))
+ unit = match.group(2)
+ if not unit:
+ return digit
+ elif unit == 'm':
+ return digit*60
+ else:
+ return digit*60*60
+ else:
+ raise argparse.ArgumentTypeError("Duration should be passed in the following format: \n"
+ "-d 100 : in sec \n"
+ "-d 10m : in min \n"
+ "-d 1h : in hours")
+
+def match_multiplier(val):
+ '''match some val against multiplier shortcut inputs '''
+ match = re.match("^(\d+)(gb|kpps|%?)$", val)
+ if match:
+ digit = int(match.group(1))
+ unit = match.group(2)
+ if not unit:
+ return digit
+ elif unit == 'gb':
+ raise NotImplementedError("gb units are not supported yet")
+ else:
+ raise NotImplementedError("kpps units are not supported yet")
+ else:
+ raise argparse.ArgumentTypeError("Multiplier should be passed in the following format: \n"
+ "-m 100 : multiply stream file by this factor \n"
+ "-m 10gb : from graph calculate the maximum rate as this bandwidth (for each port)\n"
+ "-m 10kpps : from graph calculate the maximum rate as this pps (for each port)\n"
+ "-m 40% : from graph calculate the maximum rate as this percent from total port (for each port)")
+
+
+
+def is_valid_file(filename):
+ if not os.path.isfile(filename):
+ raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename)
+
+ return filename
+
+
+OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
+ {'help': "Set multiplier for stream",
+ 'dest': "mult",
+ 'default': 1.0,
+ 'type': match_multiplier}),
+
+ PORT_LIST: ArgumentPack(['--port'],
+ {"nargs": '+',
+ 'dest':'ports',
+ 'metavar': 'PORTS',
+ 'type': int,
+ 'help': "A list of ports on which to apply the command",
+ 'default': []}),
+
+ ALL_PORTS: ArgumentPack(['-a'],
+ {"action": "store_true",
+ "dest": "all_ports",
+ 'help': "Set this flag to apply the command on all available ports"}),
+ DURATION: ArgumentPack(['-d'],
+ {'action': "store",
+ 'metavar': 'TIME',
+ 'dest': 'duration',
+ 'type': match_time_unit,
+ 'default': -1.0,
+ 'help': "Set duration time for TRex."}),
+
+ FORCE: ArgumentPack(['--force'],
+ {"action": "store_true",
+ 'default': False,
+ 'help': "Set if you want to stop active ports before applying new TRex run on them."}),
+
+ FILE_PATH: ArgumentPack(['-f'],
+ {'metavar': 'FILE',
+ 'dest': 'file',
+ 'nargs': 1,
+ 'type': is_valid_file,
+ 'help': "File path to YAML file that describes a stream pack. "}),
+
+ FILE_FROM_DB: ArgumentPack(['--db'],
+ {'metavar': 'LOADED_STREAM_PACK',
+ 'help': "A stream pack which already loaded into console cache."}),
+
+ SERVER_IP: ArgumentPack(['--server'],
+ {'metavar': 'SERVER',
+ 'help': "server IP"}),
+
+ # advanced options
+ PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
+ ALL_PORTS],
+ {'required': True}),
+ STREAM_FROM_PATH_OR_FILE: ArgumentGroup(MUTEX, [FILE_PATH,
+ FILE_FROM_DB],
+ {'required': True})
+ }
+
+
+class CCmdArgParser(argparse.ArgumentParser):
+
+ def __init__(self, stateless_client, *args, **kwargs):
+ super(CCmdArgParser, self).__init__(*args, **kwargs)
+ self.stateless_client = stateless_client
+
+ def parse_args(self, args=None, namespace=None):
+ try:
+ opts = super(CCmdArgParser, self).parse_args(args, namespace)
+ if opts is None:
+ return None
+
+ if getattr(opts, "all_ports", None):
+ opts.ports = self.stateless_client.get_port_ids()
+
+ if getattr(opts, "ports", None):
+ for port in opts.ports:
+ if not self.stateless_client.validate_port_list([port]):
+ self.error("port id '{0}' is not a valid port id\n".format(port))
+
+ return opts
+
+ except SystemExit:
+ # recover from system exit scenarios, such as "help", or bad arguments.
+ return None
+
+
+def get_flags (opt):
+ return OPTIONS_DB[opt].name_or_flags
+
+def gen_parser(stateless_client, op_name, description, *args):
+ parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
+ description=description)
+ for param in args:
+ try:
+
+ if isinstance(param, int):
+ argument = OPTIONS_DB[param]
+ else:
+ argument = param
+
+ if isinstance(argument, ArgumentGroup):
+ if argument.type == MUTEX:
+ # handle as mutually exclusive group
+ group = parser.add_mutually_exclusive_group(**argument.options)
+ for sub_argument in argument.args:
+ group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
+ **OPTIONS_DB[sub_argument].options)
+ else:
+ # ignore invalid objects
+ continue
+ elif isinstance(argument, ArgumentPack):
+ parser.add_argument(*argument.name_or_flags,
+ **argument.options)
+ else:
+ # ignore invalid objects
+ continue
+ except KeyError as e:
+ cause = e.args[0]
+ raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param))
+ return parser
+
+
+if __name__ == "__main__":
+ pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index e707a9e1..c03f2a82 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -23,6 +23,7 @@ import json
import ast
import argparse
import random
+import readline
import string
import os
import sys
@@ -32,152 +33,173 @@ from common.trex_streams import *
from client.trex_stateless_client import CTRexStatelessClient
from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
-
import trex_status
-from collections import namedtuple
+import parsing_opts
-__version__ = "1.0"
-LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
+__version__ = "1.1"
-def readch(choices=[]):
+class TRexGeneralCmd(cmd.Cmd):
+ def __init__(self):
+ cmd.Cmd.__init__(self)
+ # configure history behaviour
+ self._history_file_dir = "/tmp/trex/console/"
+ self._history_file = self.get_history_file_full_path()
+ readline.set_history_length(100)
+ # load history, if any
+ self.load_console_history()
- fd = sys.stdin.fileno()
- old_settings = termios.tcgetattr(fd)
- try:
- tty.setraw(sys.stdin.fileno())
- while True:
- ch = sys.stdin.read(1)
- if (ord(ch) == 3) or (ord(ch) == 4):
- return None
- if ch in choices:
- return ch
- finally:
- termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
-
- return None
-
-class ConfirmMenu(object):
- def __init__ (self, caption):
- self.caption = "{cap} [confirm] : ".format(cap=caption)
-
- def show(self):
- sys.stdout.write(self.caption)
- input = user_input()
- if input:
- return False
- else:
- # user hit Enter
- return True
-
-class CStreamsDB(object):
+ def get_console_identifier(self):
+ return self.__class__.__name__
- def __init__(self):
- self.stream_packs = {}
+ def get_history_file_full_path(self):
+ return "{dir}{filename}.hist".format(dir=self._history_file_dir,
+ filename=self.get_console_identifier())
- def load_streams(self, name, LoadedStreamList_obj):
- if name in self.stream_packs:
- return False
- else:
- self.stream_packs[name] = LoadedStreamList_obj
- return True
+ def load_console_history(self):
+ if os.path.exists(self._history_file):
+ readline.read_history_file(self._history_file)
+ return
- def remove_stream_packs(self, *names):
- removed_streams = []
- for name in names:
- removed = self.stream_packs.pop(name)
- if removed:
- removed_streams.append(name)
- return removed_streams
+ def save_console_history(self):
+ if not os.path.exists(self._history_file_dir):
+ os.makedirs(self._history_file_dir)
+ # os.mknod(self._history_file)
+ readline.write_history_file(self._history_file)
+ return
- def clear(self):
- self.stream_packs.clear()
+ def emptyline(self):
+ """Called when an empty line is entered in response to the prompt.
- def get_loaded_streams_names(self):
- return self.stream_packs.keys()
+ This overriding is such that when empty line is passed, **nothing happens**.
+ """
+ return
- def get_stream_pack(self, name):
- return self.stream_packs.get(name)
+ def completenames(self, text, *ignored):
+ """
+ This overriding is such that a space is added to name completion.
+ """
+ dotext = 'do_'+text
+ return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+ def precmd(self, line):
+ # before doing anything, save history snapshot of the console
+ # this is done before executing the command in case of ungraceful application exit
+ self.save_console_history()
+ return line
-# multi level cmd menu
-class CmdMenu(object):
- def __init__ (self):
- self.menus = []
+#
+# main console object
+class TRexConsole(TRexGeneralCmd):
+ """Trex Console"""
- def add_menu (self, caption, options):
- menu = {}
- menu['caption'] = caption
- menu['options'] = options
- self.menus.append(menu)
+ def __init__(self, stateless_client, acquire_all_ports=True, verbose=False):
+ self.stateless_client = stateless_client
+ TRexGeneralCmd.__init__(self)
- def show (self):
- cur_level = 0
- print "\n"
- selected_path = []
- for menu in self.menus:
- # show all the options
- print "{0}\n".format(menu['caption'])
- for i, option in enumerate(menu['options']):
- print "{0}. {1}".format(i + 1, option)
+ self.verbose = verbose
+ self.acquire_all_ports = acquire_all_ports
- #print "\nPlease select an option: "
+ self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__)
+ self.intro += "\nType 'help' or '?' for supported actions\n"
- choices = range(0, len(menu['options']))
- choices = [ chr(x + 48) for x in choices]
+ self.postcmd(False, "")
- print ""
- ch = readch(choices)
- print ""
- if ch == None:
- return None
+ ################### internal section ########################
- selected_path.append(int(ch) - 1)
+ def get_console_identifier(self):
+ return "{context}_{server}".format(context=self.__class__.__name__,
+ server=self.stateless_client.get_system_info()['hostname'])
+
+ def register_main_console_methods(self):
+ main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__)))
+ for name in main_names:
+ for prefix in 'do_', 'help_', 'complete_':
+ if name.startswith(prefix):
+ self.__dict__[name] = getattr(self.trex_console, name)
- return selected_path
+ def postcmd(self, stop, line):
+ if self.stateless_client.is_connected():
+ self.prompt = "TRex > "
+ else:
+ self.supported_rpc = None
+ self.prompt = "TRex (offline) > "
+ return stop
-class AddStreamMenu(CmdMenu):
- def __init__ (self):
- super(AddStreamMenu, self).__init__()
- self.add_menu('Please select type of stream', ['a', 'b', 'c'])
- self.add_menu('Please select ISG', ['d', 'e', 'f'])
+ def default(self, line):
+ print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line)
-# main console object
-class TRexConsole(cmd.Cmd):
- """Trex Console"""
+ @staticmethod
+ def tree_autocomplete(text):
+ dir = os.path.dirname(text)
+ if dir:
+ path = dir
+ else:
+ path = "."
- def __init__(self, stateless_client, verbose):
- cmd.Cmd.__init__(self)
- self.stateless_client = stateless_client
+ start_string = os.path.basename(text)
+
+ targets = []
- self.do_connect("")
+ for x in os.listdir(path):
+ if x.startswith(start_string):
+ y = os.path.join(path, x)
+ if os.path.isfile(y):
+ targets.append(x + ' ')
+ elif os.path.isdir(y):
+ targets.append(x + '/')
- self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__)
- self.intro += "\nType 'help' or '?' for supported actions\n"
+ return targets
- self.verbose = False
+ # annotation method
+ @staticmethod
+ def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
+ print format_text('\n{:<40}'.format(desc), 'bold'),
+ if rc == None:
+ print "\n"
+ return
- self.postcmd(False, "")
+ if rc == False:
+ # do we have a complex log object ?
+ if isinstance(err_log, list):
+ print ""
+ for func in err_log:
+ if func:
+ print func
+ print ""
- self.user_streams = {}
- self.streams_db = CStreamsDB()
+ elif isinstance(err_log, str):
+ print "\n" + err_log + "\n"
+ print format_text("[FAILED]\n", 'red', 'bold')
+ if ext_err_msg:
+ print format_text(ext_err_msg + "\n", 'blue', 'bold')
- # a cool hack - i stole this function and added space
- def completenames(self, text, *ignored):
- dotext = 'do_'+text
- return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+ return False
+
+ else:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+
+
+ ####################### shell commands #######################
+ def do_ping (self, line):
+ '''Ping the server\n'''
+
+ rc = self.stateless_client.cmd_ping()
+ if rc.bad():
+ return
# set verbose on / off
- def do_verbose (self, line):
+ def do_verbose(self, line):
'''Shows or set verbose mode\n'''
if line == "":
print "\nverbose is " + ("on\n" if self.verbose else "off\n")
@@ -185,582 +207,147 @@ class TRexConsole(cmd.Cmd):
elif line == "on":
self.verbose = True
self.stateless_client.set_verbose(True)
- print green("\nverbose set to on\n")
+ print format_text("\nverbose set to on\n", 'green', 'bold')
elif line == "off":
self.verbose = False
self.stateless_client.set_verbose(False)
- print green("\nverbose set to off\n")
+ print format_text("\nverbose set to off\n", 'green', 'bold')
else:
- print magenta("\nplease specify 'on' or 'off'\n")
+ print format_text("\nplease specify 'on' or 'off'\n", 'bold')
- # query the server for registered commands
- def do_query_server(self, line):
- '''query the RPC server for supported remote commands\n'''
- res_ok, msg = self.stateless_client.get_supported_cmds()
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print "\nRPC server supports the following commands:\n"
- for func in msg:
- if func:
- print func
- print ''
- print format_text("[SUCCESS]\n", 'green', 'bold')
- return
-
- def do_ping (self, line):
- '''Pings the RPC server\n'''
-
- print "\n-> Pinging RPC server"
+ ############### connect
+ def do_connect (self, line):
+ '''Connects to the server\n'''
- res_ok, msg = self.stateless_client.ping()
- if res_ok:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- else:
- print "\n*** " + msg + "\n"
+ rc = self.stateless_client.cmd_connect()
+ if rc.bad():
return
- def do_force_acquire (self, line):
- '''Acquires ports by force\n'''
-
- self.do_acquire(line, True)
-
- def complete_force_acquire(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx, acquired=False)
-
- def extract_port_ids_from_line(self, line):
- return {int(x) for x in line.split()}
-
- def parse_ports_from_line (self, line):
- port_list = set()
- if line:
- for port_id in line.split(' '):
- if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.stateless_client.get_port_count()):
- print "Please provide a list of ports separated by spaces between 0 and {0}".format(self.stateless_client.get_port_count() - 1)
- return None
-
- port_list.add(int(port_id))
-
- port_list = list(port_list)
-
- else:
- port_list = [i for i in xrange(0, self.stateless_client.get_port_count())]
-
- return port_list
-
- def do_acquire(self, line, force=False):
- '''Acquire ports\n'''
-
- # make sure that the user wants to acquire all
- args = line.split()
- if len(args) < 1:
- print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports"
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to acquire all ports ? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_port_ids()
- else:
- port_list = self.extract_port_ids_from_line(line)
-
- # rc, resp_list = self.stateless_client.take_ownership(port_list, force)
- try:
- res_ok, log = self.stateless_client.acquire(port_list, force)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
+ def do_disconnect (self, line):
+ '''Disconnect from the server\n'''
- def port_auto_complete(self, text, line, begidx, endidx, acquired=True, active=False):
- if acquired:
- if not active:
- ret_list = [x
- for x in map(str, self.stateless_client.get_acquired_ports())
- if x.startswith(text)]
- else:
- ret_list = [x
- for x in map(str, self.stateless_client.get_active_ports())
- if x.startswith(text)]
- else:
- ret_list = [x
- for x in map(str, self.stateless_client.get_port_ids())
- if x.startswith(text)]
- ret_list.append("all")
- return ret_list
-
-
- def complete_acquire(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx, acquired=False)
-
- def do_release (self, line):
- '''Release ports\n'''
-
- # if line:
- # port_list = self.parse_ports_from_line(line)
- # else:
- # port_list = self.stateless_client.get_owned_ports()
- args = line.split()
- if len(args) < 1:
- print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports"
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to release all acquired ports? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_acquired_ports()
- else:
- port_list = self.extract_port_ids_from_line(line)
-
- try:
- res_ok, log = self.stateless_client.release(port_list)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
+ rc = self.stateless_client.cmd_disconnect()
+ if rc.bad():
return
- def complete_release(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx)
+
+ ############### start
- def do_connect (self, line):
- '''Connects to the server\n'''
+ def complete_start(self, text, line, begidx, endidx):
+ s = line.split()
+ l = len(s)
- if line == "":
- res_ok, msg = self.stateless_client.connect()
- else:
- sp = line.split()
- if (len(sp) != 2):
- print "\n[usage] connect [server] [port] or without parameters\n"
- return
+ file_flags = parsing_opts.get_flags(parsing_opts.FILE_PATH)
- res_ok, msg = self.stateless_client.connect(sp[0], sp[1])
-
- if res_ok:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- else:
- print "\n*** " + msg + "\n"
- print format_text("[FAILED]\n", 'red', 'bold')
- return
+ if (l > 1) and (s[l - 1] in file_flags):
+ return TRexConsole.tree_autocomplete("")
- self.supported_rpc = self.stateless_client.get_supported_cmds().data
+ if (l > 2) and (s[l - 2] in file_flags):
+ return TRexConsole.tree_autocomplete(s[l - 1])
- def do_rpc (self, line):
- '''Launches a RPC on the server\n'''
+ def do_start(self, line):
+ '''Start selected traffic in specified port(s) on TRex\n'''
- if line == "":
- print "\nUsage: [method name] [param dict as string]\n"
- print "Example: rpc test_add {'x': 12, 'y': 17}\n"
- return
+ self.stateless_client.cmd_start_line(line)
- sp = line.split(' ', 1)
- method = sp[0]
- params = None
- bad_parse = False
- if len(sp) > 1:
+ def help_start(self):
+ self.do_start("-h")
- try:
- params = ast.literal_eval(sp[1])
- if not isinstance(params, dict):
- bad_parse = True
+ ############# stop
+ def do_stop(self, line):
+ '''stops port(s) transmitting traffic\n'''
+ self.stateless_client.cmd_stop_line(line)
- except ValueError as e1:
- bad_parse = True
- except SyntaxError as e2:
- bad_parse = True
+ ############# stop
+ def do_pause(self, line):
+ '''pause port(s) transmitting traffic\n'''
+ self.stateless_client.cmd_pause_line(line)
- if bad_parse:
- print "\nValue should be a valid dict: '{0}'".format(sp[1])
- print "\nUsage: [method name] [param dict as string]\n"
- print "Example: rpc test_add {'x': 12, 'y': 17}\n"
- return
+ ############# stop
+ def do_resume(self, line):
+ '''resume port(s) transmitting traffic\n'''
+ self.stateless_client.cmd_resume_line(line)
- res_ok, msg = self.stateless_client.transmit(method, params)
- if res_ok:
- print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n"
- else:
- print "\n*** " + msg + "\n"
- #print "Please try 'reconnect' to reconnect to server"
+
+ def help_stop(self):
+ self.do_stop("-h")
- def complete_rpc (self, text, line, begidx, endidx):
- return [x
- for x in self.supported_rpc
- if x.startswith(text)]
+ ########## reset
+ def do_reset (self, line):
+ '''force stop all ports\n'''
+ self.stateless_client.cmd_reset()
- def do_status (self, line):
+
+ # tui
+ def do_tui (self, line):
'''Shows a graphical console\n'''
if not self.stateless_client.is_connected():
- print "Not connected to server\n"
+ print format_text("\nNot connected to server\n", 'bold')
return
self.do_verbose('off')
trex_status.show_trex_status(self.stateless_client)
+ # quit function
def do_quit(self, line):
'''Exit the client\n'''
return True
- def do_disconnect (self, line):
- '''Disconnect from the server\n'''
- if not self.stateless_client.is_connected():
- print "Not connected to server\n"
- return
-
- res_ok, msg = self.stateless_client.disconnect()
- if res_ok:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- else:
- print msg + "\n"
-
- def do_whoami (self, line):
- '''Prints console user name\n'''
- print "\n" + self.stateless_client.user + "\n"
-
- def postcmd(self, stop, line):
- if self.stateless_client.is_connected():
- self.prompt = "TRex > "
- else:
- self.supported_rpc = None
- self.prompt = "TRex (offline) > "
-
- return stop
-
- def default(self, line):
- print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line)
-
+
def do_help (self, line):
- '''Shows This Help Screen\n'''
- if line:
- try:
- func = getattr(self, 'help_' + line)
- except AttributeError:
- try:
- doc = getattr(self, 'do_' + line).__doc__
- if doc:
- self.stdout.write("%s\n"%str(doc))
- return
- except AttributeError:
- pass
- self.stdout.write("%s\n"%str(self.nohelp % (line,)))
- return
- func()
- return
-
- print "\nSupported Console Commands:"
- print "----------------------------\n"
-
- cmds = [x[3:] for x in self.get_names() if x.startswith("do_")]
- for cmd in cmds:
- if cmd == "EOF":
- continue
-
- try:
- doc = getattr(self, 'do_' + cmd).__doc__
- if doc:
- help = str(doc)
- else:
- help = "*** Undocumented Function ***\n"
- except AttributeError:
- help = "*** Undocumented Function ***\n"
-
- print "{:<30} {:<30}".format(cmd + " - ", help)
-
- def do_stream_db_add(self, line):
- '''Loads a YAML stream list serialization into user console \n'''
- args = line.split()
- if len(args) >= 2:
- name = args[0]
- yaml_path = args[1]
- try:
- multiplier = args[2]
- except IndexError:
- multiplier = 1
- stream_list = CStreamList()
- loaded_obj = stream_list.load_yaml(yaml_path, multiplier)
- # print self.stateless_client.pretty_json(json.dumps(loaded_obj))
- try:
- compiled_streams = stream_list.compile_streams()
- res_ok = self.streams_db.load_streams(name, LoadedStreamList(loaded_obj,
- [StreamPack(v.stream_id, v.stream.dump())
- for k, v in compiled_streams.items()]))
- if res_ok:
- print green("Stream pack '{0}' loaded and added successfully\n".format(name))
- else:
- print magenta("Picked name already exist. Please pick another name.\n")
- except Exception as e:
- print "adding new stream failed due to the following error:\n", str(e)
- print format_text("[FAILED]\n", 'red', 'bold')
-
- return
- else:
- print magenta("please provide load name and YAML path, separated by space.\n"
- "Optionally, you may provide a third argument to specify multiplier.\n")
-
- @staticmethod
- def tree_autocomplete(text):
- dir = os.path.dirname(text)
- if dir:
- path = dir
- else:
- path = "."
- start_string = os.path.basename(text)
- return [x
- for x in os.listdir(path)
- if x.startswith(start_string)]
+ '''Shows This Help Screen\n'''
+ if line:
+ try:
+ func = getattr(self, 'help_' + line)
+ except AttributeError:
+ try:
+ doc = getattr(self, 'do_' + line).__doc__
+ if doc:
+ self.stdout.write("%s\n"%str(doc))
+ return
+ except AttributeError:
+ pass
+ self.stdout.write("%s\n"%str(self.nohelp % (line,)))
+ return
+ func()
+ return
+
+ print "\nSupported Console Commands:"
+ print "----------------------------\n"
+
+ cmds = [x[3:] for x in self.get_names() if x.startswith("do_")]
+ for cmd in cmds:
+ if ( (cmd == "EOF") or (cmd == "q") or (cmd == "exit")):
+ continue
+
+ try:
+ doc = getattr(self, 'do_' + cmd).__doc__
+ if doc:
+ help = str(doc)
+ else:
+ help = "*** Undocumented Function ***\n"
+ except AttributeError:
+ help = "*** Undocumented Function ***\n"
+
+ print "{:<30} {:<30}".format(cmd + " - ", help)
+ do_exit = do_EOF = do_q = do_quit
- def complete_stream_db_add(self, text, line, begidx, endidx):
- arg_num = len(line.split()) - 1
- if arg_num == 2:
- return TRexConsole.tree_autocomplete(line.split()[-1])
- else:
- return [text]
-
- def do_stream_db_show(self, line):
- '''Shows the loaded stream list named [name] \n'''
- args = line.split()
- if args:
- list_name = args[0]
- try:
- stream = self.streams_db.get_stream_pack(list_name)#user_streams[list_name]
- if len(args) >= 2 and args[1] == "full":
- print pretty_json(json.dumps(stream.compiled))
- else:
- print pretty_json(json.dumps(stream.loaded))
- except KeyError as e:
- print "Unknown stream list name provided"
- else:
- print "Available stream packs:\n{0}".format(', '.join(sorted(self.streams_db.get_loaded_streams_names())))
-
- def complete_stream_db_show(self, text, line, begidx, endidx):
- return [x
- for x in self.streams_db.get_loaded_streams_names()
- if x.startswith(text)]
-
- def do_stream_db_remove(self, line):
- '''Removes a single loaded stream packs from loaded stream pack repository\n'''
- args = line.split()
- if args:
- removed_streams = self.streams_db.remove_stream_packs(*args)
- if removed_streams:
- print green("The following stream packs were removed:")
- print bold(", ".join(sorted(removed_streams)))
- print format_text("[SUCCESS]\n", 'green', 'bold')
- else:
- print red("No streams were removed. Make sure to provide valid stream pack names.")
- else:
- print magenta("Please provide stream pack name(s), separated with spaces.")
-
- def do_stream_db_clear(self, line):
- '''Clears all loaded stream packs from loaded stream pack repository\n'''
- self.streams_db.clear()
- print format_text("[SUCCESS]\n", 'green', 'bold')
-
-
- def complete_stream_db_remove(self, text, line, begidx, endidx):
- return [x
- for x in self.streams_db.get_loaded_streams_names()
- if x.startswith(text)]
-
-
- def do_attach(self, line):
- '''Assign loaded stream pack into specified ports on TRex\n'''
- args = line.split()
- if len(args) >= 2:
- stream_pack_name = args[0]
- stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]]
- if not stream_list:
- print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name)
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to release all acquired ports? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_acquired_ports()
- else:
- port_list = self.extract_port_ids_from_line(' '.join(args[1:]))
- owned = set(self.stateless_client.get_acquired_ports())
- try:
- if set(port_list).issubset(owned):
- res_ok, log = self.stateless_client.add_stream_pack(port_list, *stream_list.compiled)
- # res_ok, msg = self.stateless_client.add_stream(port_list, stream_list.compiled)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- return
- else:
- print "Not all desired ports are acquired.\n" \
- "Acquired ports are: {acq}\n" \
- "Requested ports: {req}\n" \
- "Missing ports: {miss}".format(acq=list(owned),
- req=port_list,
- miss=list(set(port_list).difference(owned)))
- print format_text("[FAILED]\n", 'red', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
- else:
- print magenta("Please provide list name and ports to attach to, "
- "or specify 'all' to attach all owned ports.\n")
-
- def complete_attach(self, text, line, begidx, endidx):
- arg_num = len(line.split()) - 1
- if arg_num == 1:
- # return optional streams packs
- if line.endswith(" "):
- return self.port_auto_complete(text, line, begidx, endidx)
- return [x
- for x in self.streams_db.get_loaded_streams_names()
- if x.startswith(text)]
- elif arg_num >= 2:
- # return optional ports to attach to
- return self.port_auto_complete(text, line, begidx, endidx)
- else:
- return [text]
-
- def prompt_response(self, response_obj):
- resp_list = response_obj if isinstance(response_obj, list) else [response_obj]
- def format_return_status(return_status):
- if return_status:
- return green("OK")
- else:
- return red("FAIL")
-
- for response in resp_list:
- response_str = "{id:^3} - {msg} ({stat})".format(id=response.id,
- msg=response.msg,
- stat=format_return_status(response.success))
- print response_str
- return
-
- def do_remove_all_streams(self, line):
- '''Acquire ports\n'''
-
- # make sure that the user wants to acquire all
- args = line.split()
- if len(args) < 1:
- print magenta("Please provide a list of ports separated by spaces, "
- "or specify 'all' to remove from all acquired ports")
- return
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to remove all stream packs from all acquired ports? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_acquired_ports()
- else:
- port_list = self.extract_port_ids_from_line(line)
-
- # rc, resp_list = self.stateless_client.take_ownership(port_list, force)
- try:
- res_ok, log = self.stateless_client.remove_all_streams(port_list)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
-
- def complete_remove_all_streams(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx)
- def do_start_traffic(self, line):
- '''Start pre-submitted traffic in specified ports on TRex\n'''
- # make sure that the user wants to acquire all
- args = line.split()
- if len(args) < 1:
- print magenta("Please provide a list of ports separated by spaces, "
- "or specify 'all' to start traffic on all acquired ports")
- return
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_acquired_ports()
- else:
- port_list = self.extract_port_ids_from_line(line)
-
- try:
- res_ok, log = self.stateless_client.start_traffic(port_list)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
+#
+def is_valid_file(filename):
+ if not os.path.isfile(filename):
+ raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename)
- def complete_start_traffic(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx)
+ return filename
- def do_stop_traffic(self, line):
- '''Stop active traffic in specified ports on TRex\n'''
- # make sure that the user wants to acquire all
- args = line.split()
- if len(args) < 1:
- print magenta("Please provide a list of ports separated by spaces, "
- "or specify 'all' to stop traffic on all acquired ports")
- return
- if args[0] == "all":
- ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ')
- rc = ask.show()
- if rc == False:
- print yellow("[ABORTED]\n")
- return
- else:
- port_list = self.stateless_client.get_active_ports()
- else:
- port_list = self.extract_port_ids_from_line(line)
-
- try:
- res_ok, log = self.stateless_client.stop_traffic(port_list)
- self.prompt_response(log)
- if not res_ok:
- print format_text("[FAILED]\n", 'red', 'bold')
- return
- print format_text("[SUCCESS]\n", 'green', 'bold')
- except ValueError as e:
- print magenta(str(e))
- print format_text("[FAILED]\n", 'red', 'bold')
-
- def complete_stop_traffic(self, text, line, begidx, endidx):
- return self.port_auto_complete(text, line, begidx, endidx, active=True)
-
- # aliasing
- do_exit = do_EOF = do_q = do_quit
def setParserOptions():
parser = argparse.ArgumentParser(prog="trex_console.py")
@@ -769,12 +356,13 @@ def setParserOptions():
default = "localhost",
type = str)
- parser.add_argument("-p", "--port", help = "TRex Server Port [default is 5050]\n",
- default = 5050,
+ parser.add_argument("-p", "--port", help = "TRex Server Port [default is 4501]\n",
+ default = 4501,
type = int)
- parser.add_argument("-z", "--pub", help = "TRex Async Publisher Port [default is 4500]\n",
+ parser.add_argument("--async_port", help = "TRex ASync Publisher Port [default is 4500]\n",
default = 4500,
+ dest='pub',
type = int)
parser.add_argument("-u", "--user", help = "User Name [default is currently logged in user]\n",
@@ -785,18 +373,38 @@ def setParserOptions():
action="store_true", help="Switch ON verbose option. Default is: OFF.",
default = False)
+
+ parser.add_argument("--no_acquire", dest="acquire",
+ action="store_false", help="Acquire all ports on connect. Default is: ON.",
+ default = True)
+
+ parser.add_argument("--batch", dest="batch",
+ nargs = 1,
+ type = is_valid_file,
+ help = "Run the console in a batch mode with file",
+ default = None)
+
return parser
+
def main():
parser = setParserOptions()
options = parser.parse_args()
# Stateless client connection
stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub)
+ rc = stateless_client.cmd_connect()
+ if rc.bad():
+ return
+ if options.batch:
+ cont = stateless_client.run_script_file(options.batch[0])
+ if not cont:
+ return
+
# console
try:
- console = TRexConsole(stateless_client, options.verbose)
+ console = TRexConsole(stateless_client, options.acquire, options.verbose)
console.cmdloop()
except KeyboardInterrupt as e:
print "\n\n*** Caught Ctrl + C... Exiting...\n\n"
diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py
index 4e73e0bb..869812a1 100644
--- a/scripts/automation/trex_control_plane/console/trex_status.py
+++ b/scripts/automation/trex_control_plane/console/trex_status.py
@@ -18,15 +18,6 @@ def percentage (a, total):
x = int ((float(a) / total) * 100)
return str(x) + "%"
-# simple float to human readable
-def float_to_human_readable (size, suffix = "bps"):
- for unit in ['','K','M','G','T']:
- if abs(size) < 1000.0:
- return "%3.2f %s%s" % (size, unit, suffix)
- size /= 1000.0
- return "NaN"
-
-
################### panels #################
# panel object
@@ -37,6 +28,8 @@ class TrexStatusPanel(object):
self.log = status_obj.log
self.stateless_client = status_obj.stateless_client
+
+ self.stats = status_obj.stats
self.general_stats = status_obj.general_stats
self.h = h
@@ -97,7 +90,7 @@ class ServerInfoPanel(TrexStatusPanel):
self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"]))
- ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports)
+ ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list)
if not ports_owned:
ports_owned = "None"
@@ -119,18 +112,23 @@ class GeneralInfoPanel(TrexStatusPanel):
self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util")))
- self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Tx. rate:",
- float_to_human_readable(self.general_stats.get("m_tx_bps")),
- float_to_human_readable(self.general_stats.get("m_tx_pps"), suffix = "pps")))
+ self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:",
+ self.general_stats.get("m_tx_bps", format = True, suffix = "bps"),
+ self.general_stats.get("m_tx_pps", format = True, suffix = "pps")))
+
+
+ self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:",
+ self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"),
+ self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts")))
- # missing RX field
- #self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Rx. rate:",
- # float_to_human_readable(self.general_stats.get("m_rx_bps")),
- # float_to_human_readable(self.general_stats.get("m_rx_pps"), suffix = "pps")))
+ self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:",
+ self.general_stats.get("m_rx_bps", format = True, suffix = "bps"),
+ self.general_stats.get("m_rx_pps", format = True, suffix = "pps")))
- self.getwin().addstr(7, 2, "{:<30} {:} / {:}".format("Total Tx:",
- float_to_human_readable(self.general_stats.get_rel("m_total_tx_bytes"), suffix = "B"),
- float_to_human_readable(self.general_stats.get_rel("m_total_tx_pkts"), suffix = "pkts")))
+
+ self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:",
+ self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"),
+ self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts")))
# all ports stats
class PortsStatsPanel(TrexStatusPanel):
@@ -142,44 +140,64 @@ class PortsStatsPanel(TrexStatusPanel):
def draw (self):
self.clear()
- return
- owned_ports = self.status_obj.owned_ports
+ owned_ports = self.status_obj.owned_ports_list
if not owned_ports:
self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports")
return
# table header
- self.getwin().addstr(3, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
- "Port ID", "Tx [pps]", "Tx [bps]", "Tx [bytes]", "Rx [pps]", "Rx [bps]", "Rx [bytes]"))
+ self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+ "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
+
- # port loop
- self.status_obj.stats.query_sync()
for i, port_index in enumerate(owned_ports):
port_stats = self.status_obj.stats.get_port_stats(port_index)
if port_stats:
- self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15,.2f} {:^15,.2f} {:^15,} {:^15,.2f} {:^15,.2f} {:^15,}".format(
+ self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
"{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
- port_stats["tx_pps"],
- port_stats["tx_bps"],
- port_stats["total_tx_bytes"],
- port_stats["rx_pps"],
- port_stats["rx_bps"],
- port_stats["total_rx_bytes"]))
-
+ "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+ port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
+
+ "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+ port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
+ "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
+ port_stats.get_rel("ibytes", format = True, suffix = "B"))))
+
else:
- self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+
+ self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
"{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
"N/A",
"N/A",
"N/A",
- "N/A",
- "N/A",
"N/A"))
+
+ # old format
+# if port_stats:
+# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
+# port_stats.get("m_total_tx_pps", format = True, suffix = "pps"),
+# port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+# port_stats.get_rel("obytes", format = True, suffix = "B"),
+# port_stats.get("m_total_rx_pps", format = True, suffix = "pps"),
+# port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+# port_stats.get_rel("ibytes", format = True, suffix = "B")))
+#
+# else:
+# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
+# "N/A",
+# "N/A",
+# "N/A",
+# "N/A",
+# "N/A",
+# "N/A"))
+
# control panel
class ControlPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj):
@@ -208,7 +226,7 @@ class SinglePortPanel(TrexStatusPanel):
self.clear()
- if not self.port_id in self.status_obj.stateless_client.get_owned_ports():
+ if not self.port_id in self.status_obj.owned_ports_list:
self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id))
return
@@ -222,16 +240,19 @@ class SinglePortPanel(TrexStatusPanel):
y += 2
# streams
- if 'streams' in self.status_obj.snapshot[self.port_id]:
- for stream_id, stream in self.status_obj.snapshot[self.port_id]['streams'].iteritems():
+
+ if 'streams' in self.status_obj.owned_ports[str(self.port_id)]:
+ stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams']
+
+ for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)):
self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
stream_id,
- ("True" if stream['stream']['enabled'] else "False"),
- stream['stream']['mode']['type'],
- ("True" if stream['stream']['self_start'] else "False"),
- stream['stream']['isg'],
- (stream['stream']['next_stream_id'] if stream['stream']['next_stream_id'] != -1 else "None"),
- ("{0} instr.".format(len(stream['stream']['vm'])) if stream['stream']['vm'] else "None")))
+ ("True" if stream['enabled'] else "False"),
+ stream['mode']['type'],
+ ("True" if stream['self_start'] else "False"),
+ stream['isg'],
+ (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"),
+ ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None")))
y += 1
@@ -241,37 +262,36 @@ class SinglePortPanel(TrexStatusPanel):
self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE)
y += 2
- self.status_obj.stats.query_sync()
- port_stats = self.status_obj.stats.get_port_stats(self.port_id)
- # table header
- self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
- "Port ID", "Tx [pps]", "Tx [bps]", "Tx [bytes]", "Rx [pps]", "Rx [bps]", "Rx [bytes]"))
+ # table header
+ self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+ "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
+
y += 2
- if port_stats:
- self.getwin().addstr(y, 2, "{:^15} {:^15,} {:^15,} {:^15,} {:^15,} {:^15,} {:^15,}".format(
- "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
- port_stats["tx_pps"],
- port_stats["tx_bps"],
- port_stats["total_tx_bytes"],
- port_stats["rx_pps"],
- port_stats["rx_bps"],
- port_stats["total_rx_bytes"]))
+ port_stats = self.status_obj.stats.get_port_stats(self.port_id)
+ if port_stats:
+ self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+ "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
+ "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+ port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
+
+ "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+ port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
+ "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
+ port_stats.get_rel("ibytes", format = True, suffix = "B"))))
+
else:
- self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+ self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
"{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
"N/A",
"N/A",
"N/A",
- "N/A",
- "N/A",
"N/A"))
- y += 2
################### main objects #################
@@ -371,23 +391,44 @@ class TrexStatus():
self.stateless_client = stateless_client
- self.log = TrexStatusLog()
+ self.log = TrexStatusLog()
self.cmds = TrexStatusCommands(self)
+ self.stats = stateless_client.get_stats_async()
self.general_stats = stateless_client.get_stats_async().get_general_stats()
# fetch server info
- rc, self.server_sys_info = self.stateless_client.get_system_info()
- if not rc:
- return
+ self.server_sys_info = self.stateless_client.get_system_info()
- rc, self.server_version = self.stateless_client.get_version()
- if not rc:
- return
+ self.server_version = self.stateless_client.get_version()
+
+ # list of owned ports
+ self.owned_ports_list = self.stateless_client.get_acquired_ports()
+
+ # data per port
+ self.owned_ports = {}
- self.owned_ports = self.stateless_client.get_acquired_ports()
+ for port_id in self.owned_ports_list:
+ self.owned_ports[str(port_id)] = {}
+ self.owned_ports[str(port_id)]['streams'] = {}
+ stream_list = self.stateless_client.get_all_streams(port_id)
+
+ self.owned_ports[str(port_id)] = stream_list
+
+ try:
+ curses.curs_set(0)
+ except:
+ pass
+
+ curses.use_default_colors()
+ self.stdscr.nodelay(1)
+ curses.nonl()
+ curses.noecho()
+
+ self.generate_layout()
+
def generate_layout (self):
self.max_y = self.stdscr.getmaxyx()[0]
@@ -423,17 +464,20 @@ class TrexStatus():
# main run entry point
def run (self):
- try:
- curses.curs_set(0)
- except:
- pass
- curses.use_default_colors()
- self.stdscr.nodelay(1)
- curses.nonl()
- curses.noecho()
+ # list of owned ports
+ self.owned_ports_list = self.stateless_client.get_acquired_ports()
- self.generate_layout()
+ # data per port
+ self.owned_ports = {}
+
+ for port_id in self.owned_ports_list:
+ self.owned_ports[str(port_id)] = {}
+ self.owned_ports[str(port_id)]['streams'] = {}
+
+ stream_list = self.stateless_client.get_all_streams(port_id)
+
+ self.owned_ports[str(port_id)] = stream_list
self.update_active = True
while (True):
@@ -455,8 +499,15 @@ class TrexStatus():
sleep(0.01)
+# global container
+trex_status = None
+
def show_trex_status_internal (stdscr, stateless_client):
- trex_status = TrexStatus(stdscr, stateless_client)
+ global trex_status
+
+ if trex_status == None:
+ trex_status = TrexStatus(stdscr, stateless_client)
+
trex_status.run()
def show_trex_status (stateless_client):
diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py
index e48f8963..7dee89e9 100755
--- a/scripts/automation/trex_control_plane/server/trex_server.py
+++ b/scripts/automation/trex_control_plane/server/trex_server.py
@@ -59,6 +59,7 @@ class CTRexServer(object):
self.__check_trex_path_validity()
self.__check_files_path_validity()
self.trex = CTRex()
+ self.trex_version = None
self.trex_host = trex_host
self.trex_daemon_port = trex_daemon_port
self.trex_zmq_port = trex_zmq_port
@@ -98,6 +99,7 @@ class CTRexServer(object):
logger.info("current working dir is: {0}".format(self.TREX_PATH) )
logger.info("current files dir is : {0}".format(self.trex_files_path) )
logger.debug("Starting TRex server. Registering methods to process.")
+ logger.info(self.get_trex_version(base64 = False))
self.server = SimpleJSONRPCServer( (self.trex_host, self.trex_daemon_port) )
except socket.error as e:
if e.errno == errno.EADDRINUSE:
@@ -116,6 +118,9 @@ class CTRexServer(object):
# set further functionality and peripherals to server instance
try:
self.server.register_function(self.add)
+ self.server.register_function(self.get_trex_log)
+ self.server.register_function(self.get_trex_daemon_log)
+ self.server.register_function(self.get_trex_version)
self.server.register_function(self.connectivity_check)
self.server.register_function(self.start_trex)
self.server.register_function(self.stop_trex)
@@ -140,6 +145,46 @@ class CTRexServer(object):
self.server.shutdown()
pass
+ # get files from Trex server and return their content (mainly for logs)
+ @staticmethod
+ def _pull_file(filepath):
+ try:
+ with open(filepath, 'rb') as f:
+ file_content = f.read()
+ return binascii.b2a_base64(file_content)
+ except Exception as e:
+ err_str = "Can't get requested file: {0}, possibly due to TRex that did not run".format(filepath)
+ logger.error('{0}, error: {1}'.format(err_str, e))
+ return Fault(-33, err_str)
+
+ # get Trex log /tmp/trex.txt
+ def get_trex_log(self):
+ logger.info("Processing get_trex_log() command.")
+ return self._pull_file('/tmp/trex.txt')
+
+ # get daemon log /var/log/trex/trex_daemon_server.log
+ def get_trex_daemon_log (self):
+ logger.info("Processing get_trex_daemon_log() command.")
+ return self._pull_file('/var/log/trex/trex_daemon_server.log')
+
+ # get Trex version from ./t-rex-64 --help (last 4 lines)
+ def get_trex_version (self, base64 = True):
+ try:
+ logger.info("Processing get_trex_version() command.")
+ if not self.trex_version:
+ help_print = subprocess.Popen(['./t-rex-64', '--help'], cwd = self.TREX_PATH, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ help_print.wait()
+ help_print_stdout = help_print.stdout.read()
+ self.trex_version = binascii.b2a_base64('\n'.join(help_print_stdout.split('\n')[-5:-1]))
+ if base64:
+ return self.trex_version
+ else:
+ return binascii.a2b_base64(self.trex_version)
+ except Exception as e:
+ err_str = "Can't get trex version, error: {0}".format(e)
+ logger.error(err_str)
+ return Fault(-33, err_str)
+
def stop_handler (self, signum, frame):
logger.info("Daemon STOP request detected.")
if self.is_running():