summaryrefslogtreecommitdiffstats
path: root/scripts/automation
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py95
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_client.py30
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py407
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py12
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/parsing_opts.py108
-rwxr-xr-xscripts/automation/trex_control_plane/common/text_opts.py44
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_streams.py19
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py46
-rwxr-xr-xscripts/automation/trex_control_plane/server/extended_daemon_runner.py8
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_daemon_server.py10
10 files changed, 589 insertions, 190 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 8b274134..8fdf7c9b 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -162,28 +162,94 @@ class CTRexAsyncClient():
self.stats = CTRexAsyncStatsManager()
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target= self.run)
+ self.alive = False
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def run(self):
+ self.connected = True
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.alive:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return False, "*** [subscriber] - no data flow from server at : " + self.tr
+
+ return True, ""
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+ # no data yet...
+ self.alive = False
+
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
while self.active:
- line = self.socket.recv_string()
+ try:
+
+ line = self.socket.recv_string()
+
+ if not self.alive:
+ self.stateless_client.on_async_alive()
+ self.alive = True
+
+ # got a timeout - mark as not alive and retry
+ except zmq.Again:
+
+ if self.alive:
+ self.stateless_client.on_async_dead()
+ self.alive = False
+
+ continue
+
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ self.alive = False
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -193,7 +259,12 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
- def get_stats(self):
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
+
+ def get_stats (self):
return self.stats
def get_raw_snapshot (self):
@@ -203,7 +274,6 @@ class CTRexAsyncClient():
def __dispatch (self, name, type, data):
# stats
if name == "trex-global":
- # self.stats.update(data)
self.stateless_client.handle_async_stats_update(data)
# events
elif name == "trex-event":
@@ -212,10 +282,3 @@ class CTRexAsyncClient():
pass
- def stop (self):
- self.active = False
- self.t.join()
-
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py
index 160abdec..77b11c37 100755
--- a/scripts/automation/trex_control_plane/client/trex_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_client.py
@@ -131,7 +131,9 @@ class CTRexClient(object):
raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.')
trex_cmd_options.update( {'f' : f, 'd' : d} )
-
+ if not trex_cmd_options.get('l'):
+ self.result_obj.latency_checked = False
+
self.result_obj.clear_results()
try:
issue_time = time.time()
@@ -767,6 +769,7 @@ class CTRexResult(object):
"""
self._history = deque(maxlen = max_history_size)
self.clear_results()
+ self.latency_checked = True
def __repr__(self):
return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) +
@@ -1032,18 +1035,19 @@ class CTRexResult(object):
self._done_warmup = True
# handle latency data
- latency_pre = "trex-latency"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
- # support old typo
- if self._max_latency is None:
- latency_pre = "trex-latecny"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
-
- self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
- self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
-
- avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
- self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
+ if self.latency_checked:
+ latency_pre = "trex-latency"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
+ # support old typo
+ if self._max_latency is None:
+ latency_pre = "trex-latecny"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
+
+ self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
+ self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
+
+ avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
+ self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts")
rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts")
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 748817da..2925e7eb 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -14,7 +14,6 @@ import json
from common.trex_streams import *
from collections import namedtuple
from common.text_opts import *
-# import trex_stats
from common import trex_stats
from client_utils import parsing_opts, text_tables
import time
@@ -52,14 +51,14 @@ class RC():
return not self.good()
def data (self):
- return all([x.data if x.rc else "" for x in self.rc_list])
+ return [x.data if x.rc else "" for x in self.rc_list]
def err (self):
- return all([x.data if not x.rc else "" for x in self.rc_list])
+ return [x.data if not x.rc else "" for x in self.rc_list]
def annotate (self, desc = None):
if desc:
- print format_text('\n{:<40}'.format(desc), 'bold'),
+ print format_text('\n{:<60}'.format(desc), 'bold'),
if self.bad():
# print all the errors
@@ -76,14 +75,29 @@ class RC():
print format_text("[SUCCESS]\n", 'green', 'bold')
-def RC_OK():
- return RC(True, "")
-def RC_ERR(err):
+def RC_OK(data = None):
+ return RC(True, data)
+def RC_ERR (err):
return RC(False, err)
-
LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
+########## utlity ############
+def mult_to_factor (mult, max_bps, max_pps, line_util):
+ if mult['type'] == 'raw':
+ return mult['value']
+
+ if mult['type'] == 'bps':
+ return mult['value'] / max_bps
+
+ if mult['type'] == 'pps':
+ return mult['value'] / max_pps
+
+ if mult['type'] == 'percentage':
+ return mult['value'] / line_util
+
+
+
# describes a stream DB
class CStreamsDB(object):
@@ -157,22 +171,26 @@ class Port(object):
STATE_PAUSE: "PAUSE"}
- def __init__ (self, port_id, speed, driver, user, transmit):
+ def __init__ (self, port_id, speed, driver, user, comm_link):
self.port_id = port_id
self.state = self.STATE_IDLE
self.handler = None
- self.transmit = transmit
+ self.comm_link = comm_link
+ self.transmit = comm_link.transmit
+ self.transmit_batch = comm_link.transmit_batch
self.user = user
self.driver = driver
self.speed = speed
self.streams = {}
+ self.profile = None
+
self.port_stats = trex_stats.CPortStats(self)
def err(self, msg):
return RC_ERR("port {0} : {1}".format(self.port_id, msg))
- def ok(self):
- return RC_OK()
+ def ok(self, data = None):
+ return RC_OK(data)
def get_speed_bps (self):
return (self.speed * 1000 * 1000 * 1000)
@@ -259,6 +277,33 @@ class Port(object):
return self.ok()
+ # add multiple streams
+ def add_streams (self, streams_list):
+ batch = []
+
+ for stream in streams_list:
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream.stream_id,
+ "stream": stream.stream}
+
+ cmd = RpcCmdData('add_stream', params)
+ batch.append(cmd)
+
+ rc, data = self.transmit_batch(batch)
+
+ if not rc:
+ return self.err(data)
+
+ # add the stream
+ for stream in streams_list:
+ self.streams[stream.stream_id] = stream.stream
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
# remove stream from port
def remove_stream (self, stream_id):
@@ -302,14 +347,6 @@ class Port(object):
def get_all_streams (self):
return self.streams
-
- def process_mul (self, mul):
- # if percentage - translate
- if mul['type'] == 'percentage':
- mul['type'] = 'max_bps'
- mul['max'] = self.get_speed_bps() * (mul['max'] / 100)
-
-
# start traffic
def start (self, mul, duration):
if self.state == self.STATE_DOWN:
@@ -321,8 +358,6 @@ class Port(object):
if self.state == self.STATE_TX:
return self.err("Unable to start traffic - port is already transmitting")
- self.process_mul(mul)
-
params = {"handler": self.handler,
"port_id": self.port_id,
"mul": mul,
@@ -395,8 +430,6 @@ class Port(object):
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
- self.process_mul(mul)
-
params = {"handler": self.handler,
"port_id": self.port_id,
"mul": mul}
@@ -407,6 +440,68 @@ class Port(object):
return self.ok()
+
+ def validate (self):
+
+ if (self.state == self.STATE_DOWN):
+ return self.err("port is down")
+
+ if (self.state == self.STATE_IDLE):
+ return self.err("no streams attached to port")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("validate", params)
+ if not rc:
+ return self.err(data)
+
+ self.profile = data
+
+ return self.ok()
+
+ def get_profile (self):
+ return self.profile
+
+
+ def print_profile (self, mult, duration):
+ if not self.get_profile():
+ return
+
+ rate = self.get_profile()['rate']
+ graph = self.get_profile()['graph']
+
+ print format_text("Profile Map Per Port\n", 'underline', 'bold')
+
+ factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util'])
+
+ print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"),
+ format_num(rate['max_bps'] * factor, suffix = "bps"))
+
+ print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
+ format_num(rate['max_pps'] * factor, suffix = "pps"),)
+
+ print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100),
+ format_percentage(rate['max_line_util'] * factor * 100))
+
+
+ # duration
+ exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
+ exp_time_factor_sec = exp_time_base_sec / factor
+
+ # user configured a duration
+ if duration > 0:
+ if exp_time_factor_sec > 0:
+ exp_time_factor_sec = min(exp_time_factor_sec, duration)
+ else:
+ exp_time_factor_sec = duration
+
+
+ print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
+ format_time(exp_time_factor_sec))
+ print "\n"
+
+
def get_port_state_name(self):
return self.STATES_MAP.get(self.state, "Unknown")
@@ -424,6 +519,7 @@ class Port(object):
def clear_stats(self):
return self.port_stats.clear_stats()
+
################# events handler ######################
def async_event_port_stopped (self):
self.state = self.STATE_STREAMS
@@ -435,12 +531,9 @@ class CTRexStatelessClient(object):
def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.system_info = None
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self.ports = {}
- # self._conn_handler = {}
- # self._active_ports = set()
self._connection_info = {"server": server,
"sync_port": sync_port,
"async_port": async_port}
@@ -448,7 +541,7 @@ class CTRexStatelessClient(object):
self.server_version = {}
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self)
self.streams_db = CStreamsDB()
self.global_stats = trex_stats.CGlobalStats(self._connection_info,
@@ -457,12 +550,26 @@ class CTRexStatelessClient(object):
self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats,
self.ports)
- self.connected = False
-
self.events = []
+ self.connected = False
+
################# events handler ######################
+ def add_event_log (self, msg, ev_type, show = False):
+
+ if ev_type == "server":
+ prefix = "[server]"
+ elif ev_type == "local":
+ prefix = "[local]"
+
+ ts = time.time()
+ st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
+ self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
+
+ if show:
+ print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))
+
def handle_async_stats_update(self, dump_data):
global_stats = {}
port_stats = {}
@@ -490,22 +597,22 @@ class CTRexStatelessClient(object):
for port_id, data in port_stats.iteritems():
self.ports[port_id].port_stats.update(data)
+
+
def handle_async_event (self, type, data):
# DP stopped
- ev = "[event] - "
-
show_event = False
# port started
if (type == 0):
port_id = int(data['port_id'])
- ev += "Port {0} has started".format(port_id)
+ ev = "Port {0} has started".format(port_id)
# port stopped
elif (type == 1):
port_id = int(data['port_id'])
- ev += "Port {0} has stopped".format(port_id)
+ ev = "Port {0} has stopped".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
@@ -513,14 +620,14 @@ class CTRexStatelessClient(object):
# server stopped
elif (type == 2):
- ev += "Server has stopped"
+ ev = "Server has stopped"
self.async_event_server_stopped()
show_event = True
# port finished traffic
elif (type == 3):
port_id = int(data['port_id'])
- ev += "Port {0} job done".format(port_id)
+ ev = "Port {0} job done".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
@@ -530,19 +637,16 @@ class CTRexStatelessClient(object):
# unknown event - ignore
return
- if show_event:
- print format_text("\n" + ev, 'bold')
- ts = time.time()
- st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
- self.events.append("{0} - ".format(st) + format_text(ev, 'bold'))
+ self.add_event_log(ev, 'server', show_event)
def async_event_port_stopped (self, port_id):
self.ports[port_id].async_event_port_stopped()
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -552,6 +656,24 @@ class CTRexStatelessClient(object):
############# helper functions section ##############
+ # measure time for functions
+ def timing(f):
+ def wrap(*args):
+ time1 = time.time()
+ ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
+ delta = time.time() - time1
+ print format_time(delta) + "\n"
+
+ return ret
+
+ return wrap
+
+
def validate_port_list(self, port_id_list):
if not isinstance(port_id_list, list):
print type(port_id_list)
@@ -586,13 +708,19 @@ class CTRexStatelessClient(object):
# connection sequence
def connect(self):
+ # clear this flag
self.connected = False
- # connect
+ # connect sync channel
rc, data = self.comm_link.connect()
if not rc:
return RC_ERR(data)
+ # connect async channel
+ rc, data = self.async_client.connect()
+ if not rc:
+ return RC_ERR(data)
+
# version
rc, data = self.transmit("get_version")
if not rc:
@@ -602,7 +730,6 @@ class CTRexStatelessClient(object):
self.global_stats.server_version = data
# cache system info
- # self.get_system_info(refresh=True)
rc, data = self.transmit("get_system_info")
if not rc:
return RC_ERR(data)
@@ -619,7 +746,9 @@ class CTRexStatelessClient(object):
for port_id in xrange(self.get_port_count()):
speed = self.system_info['ports'][port_id]['speed']
driver = self.system_info['ports'][port_id]['driver']
- self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit)
+
+ self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link)
+
# acquire all ports
rc = self.acquire()
@@ -639,11 +768,19 @@ class CTRexStatelessClient(object):
def disconnect(self):
- self.connected = False
self.comm_link.disconnect()
+ self.async_client.disconnect()
return RC_OK()
+ def on_async_dead (self):
+ if self.connected:
+ msg = 'lost connection to server'
+ self.add_event_log(msg, 'local', True)
+ self.connected = False
+
+ def on_async_alive (self):
+ pass
########### cached queries (no server traffic) ###########
@@ -666,8 +803,8 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def get_stats_async(self):
- return self._async_client.get_stats()
+ def get_stats_async (self):
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
@@ -750,15 +887,16 @@ class CTRexStatelessClient(object):
return rc
+
def add_stream_pack(self, stream_pack_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
- for stream_pack in stream_pack_list:
- rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
-
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_streams(stream_pack_list))
+
return rc
@@ -855,6 +993,17 @@ class CTRexStatelessClient(object):
return rc
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
pass
@@ -866,9 +1015,12 @@ class CTRexStatelessClient(object):
return self.comm_link.transmit(method_name, params)
+ def transmit_batch(self, batch_list):
+ return self.comm_link.transmit_batch(batch_list)
######################### Console (high level) API #########################
+ @timing
def cmd_ping(self):
rc = self.ping()
rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
@@ -924,7 +1076,7 @@ class CTRexStatelessClient(object):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
if not active_ports:
- msg = "No active traffic on porvided ports"
+ msg = "No active traffic on provided ports"
print format_text(msg, 'bold')
return RC_ERR(msg)
@@ -948,17 +1100,20 @@ class CTRexStatelessClient(object):
rc = self.update_traffic(mult, active_ports)
rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
- return RC_OK()
+ return rc
+ # clear stats
def cmd_clear(self, port_id_list):
+
for port_id in port_id_list:
self.ports[port_id].clear_stats()
+
self.global_stats.clear_stats()
+
return RC_OK()
+
# pause cmd
def cmd_pause (self, port_id_list):
@@ -972,10 +1127,8 @@ class CTRexStatelessClient(object):
rc = self.pause_traffic(active_ports)
rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
+ return rc
- return RC_OK()
# resume cmd
@@ -991,14 +1144,11 @@ class CTRexStatelessClient(object):
rc = self.resume_traffic(active_ports)
rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
-
- return RC_OK()
+ return rc
# start cmd
- def cmd_start (self, port_id_list, stream_list, mult, force, duration):
+ def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
@@ -1020,19 +1170,37 @@ class CTRexStatelessClient(object):
rc = self.add_stream_pack(stream_list.compiled, port_id_list)
- rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list))
+ rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
if rc.bad():
return rc
+ # when not on dry - start the traffic , otherwise validate only
+ if not dry:
+ rc = self.start_traffic(mult, duration, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
+
+ return rc
+ else:
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list))
+
+ if rc.bad():
+ return rc
+
+ # show a profile on one port for illustration
+ self.ports[port_id_list[0]].print_profile(mult, duration)
- # finally, start the traffic
- rc = self.start_traffic(mult, duration, port_id_list)
- rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
return rc
- return RC_OK()
+ # validate port(s) profile
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+ # stats
def cmd_stats(self, port_id_list, stats_mask=set()):
stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
@@ -1043,6 +1211,7 @@ class CTRexStatelessClient(object):
############## High Level API With Parser ################
+ @timing
def cmd_start_line (self, line):
'''Start selected traffic in specified ports on TRex\n'''
# define a parser
@@ -1054,13 +1223,18 @@ class CTRexStatelessClient(object):
parsing_opts.FORCE,
parsing_opts.STREAM_FROM_PATH_OR_FILE,
parsing_opts.DURATION,
- parsing_opts.MULTIPLIER)
+ parsing_opts.MULTIPLIER_STRICT,
+ parsing_opts.DRY_RUN)
opts = parser.parse_args(line.split())
if opts is None:
return RC_ERR("bad command line parameters")
+
+ if opts.dry:
+ print format_text("\n*** DRY RUN ***", 'bold')
+
if opts.db:
stream_list = self.streams_db.get_stream_pack(opts.db)
rc = RC(stream_list != None)
@@ -1078,12 +1252,13 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
- if opts.total and (mult['type'] != 'percentage'):
+ if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
- return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry)
+ @timing
def cmd_resume_line (self, line):
'''Resume active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1097,6 +1272,8 @@ class CTRexStatelessClient(object):
return self.cmd_resume(opts.ports)
+
+ @timing
def cmd_stop_line (self, line):
'''Stop active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1110,6 +1287,8 @@ class CTRexStatelessClient(object):
return self.cmd_stop(opts.ports)
+
+ @timing
def cmd_pause_line (self, line):
'''Pause active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1123,6 +1302,8 @@ class CTRexStatelessClient(object):
return self.cmd_pause(opts.ports)
+
+ @timing
def cmd_update_line (self, line):
'''Update port(s) speed currently active\n'''
parser = parsing_opts.gen_parser(self,
@@ -1139,14 +1320,15 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
return self.cmd_update(opts.ports, opts.mult)
-
+ @timing
def cmd_reset_line (self, line):
return self.cmd_reset()
+
def cmd_clear_line (self, line):
'''Clear cached local statistics\n'''
# define a parser
@@ -1161,6 +1343,7 @@ class CTRexStatelessClient(object):
return RC_ERR("bad command line parameters")
return self.cmd_clear(opts.ports)
+
def cmd_stats_line (self, line):
'''Fetch statistics from TRex server by port\n'''
# define a parser
@@ -1180,30 +1363,64 @@ class CTRexStatelessClient(object):
if not mask:
# set to show all stats if no filter was given
mask = trex_stats.ALL_STATS_OPTS
+
# get stats objects, as dictionary
stats = self.cmd_stats(opts.ports, mask)
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
- return
-
- # if opts.db:
- # stream_list = self.streams_db.get_stream_pack(opts.db)
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from DB):")
- # if rc.bad():
- # return RC_ERR("Failed to load stream pack")
- #
- # else:
- # # load streams from file
- # stream_list = self.streams_db.load_yaml_file(opts.file[0])
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from file):")
- # if stream_list == None:
- # return RC_ERR("Failed to load stream pack")
- #
- #
- # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+
+ return RC_OK()
+
+
+
+
+ @timing
+ def cmd_pause_line (self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "pause",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_pause(opts.ports)
+
+
+ @timing
+ def cmd_resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "resume",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_resume(opts.ports)
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ rc = self.cmd_validate(opts.ports)
+ return rc
+
def cmd_exit_line (self, line):
print format_text("Exiting\n", 'bold')
@@ -1280,6 +1497,7 @@ class CTRexStatelessClient(object):
return True
+
#################################
# ------ private methods ------ #
@staticmethod
@@ -1354,4 +1572,3 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
-
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index dd208da4..f55d7798 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -37,7 +37,7 @@ class BatchMessage(object):
msg = json.dumps(self.batch_list)
- rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False)
+ rc, resp_list = self.rpc_client.send_raw_msg(msg)
if len(self.batch_list) == 1:
return CmdResponse(True, [CmdResponse(rc, resp_list)])
else:
@@ -130,11 +130,10 @@ class JsonRpcClient(object):
self.socket.send(msg)
break
except zmq.Again:
- sleep(0.1)
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to send message to server")
+ return CmdResponse(False, "*** [RPC] - Failed to send message to server")
tries = 0
@@ -143,11 +142,10 @@ class JsonRpcClient(object):
response = self.socket.recv()
break
except zmq.Again:
- sleep(0.1)
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to get server response")
+ return CmdResponse(False, "*** [RPC] - Failed to get server response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
@@ -223,8 +221,8 @@ class JsonRpcClient(object):
except zmq.error.ZMQError as e:
return False, "ZMQ Error: Bad server or port name: " + str(e)
- self.socket.setsockopt(zmq.SNDTIMEO, 5)
- self.socket.setsockopt(zmq.RCVTIMEO, 5)
+ self.socket.setsockopt(zmq.SNDTIMEO, 1000)
+ self.socket.setsockopt(zmq.RCVTIMEO, 1000)
self.connected = True
diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
index 6c348467..7ac9e312 100755
--- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
@@ -10,22 +10,23 @@ ArgumentGroup = namedtuple('ArgumentGroup', ['type', 'args', 'options'])
# list of available parsing options
MULTIPLIER = 1
-PORT_LIST = 2
-ALL_PORTS = 3
-PORT_LIST_WITH_ALL = 4
-FILE_PATH = 5
-FILE_FROM_DB = 6
-SERVER_IP = 7
-STREAM_FROM_PATH_OR_FILE = 8
-DURATION = 9
-FORCE = 10
-
-TOTAL = 11
-
-GLOBAL_STATS = 12
-PORT_STATS = 13
-PORT_STATUS = 14
-STATS_MASK = 15
+MULTIPLIER_STRICT = 2
+PORT_LIST = 3
+ALL_PORTS = 4
+PORT_LIST_WITH_ALL = 5
+FILE_PATH = 6
+FILE_FROM_DB = 7
+SERVER_IP = 8
+STREAM_FROM_PATH_OR_FILE = 9
+DURATION = 10
+FORCE = 11
+DRY_RUN = 12
+TOTAL = 13
+
+GLOBAL_STATS = 14
+PORT_STATS = 15
+PORT_STATUS = 16
+STATS_MASK = 17
# list of ArgumentGroup types
MUTEX = 1
@@ -60,10 +61,15 @@ match_multiplier_help = """Multiplier should be passed in the following format:
will provide a percentage of the line rate. examples
: '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """
-def match_multiplier(val):
- '''match some val against multiplier shortcut inputs '''
+def match_multiplier_common(val, strict_abs = True):
- match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val)
+ # on strict absolute we do not allow +/-
+ if strict_abs:
+ match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val)
+ op = None
+ else:
+ match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)([\+\-])?$", val)
+ op = match.group(4)
result = {}
@@ -71,44 +77,53 @@ def match_multiplier(val):
value = float(match.group(1))
unit = match.group(3)
+
+
# raw type (factor)
if not unit:
result['type'] = 'raw'
- result['max'] = value
+ result['value'] = value
elif unit == 'bps':
- result['type'] = 'max_bps'
- result['max'] = value
+ result['type'] = 'bps'
+ result['value'] = value
elif unit == 'kbps':
- result['type'] = 'max_bps'
- result['max'] = value * 1000
+ result['type'] = 'bps'
+ result['value'] = value * 1000
elif unit == 'mbps':
- result['type'] = 'max_bps'
- result['max'] = value * 1000 * 1000
+ result['type'] = 'bps'
+ result['value'] = value * 1000 * 1000
elif unit == 'gbps':
- result['type'] = 'max_bps'
- result['max'] = value * 1000 * 1000 * 1000
+ result['type'] = 'bps'
+ result['value'] = value * 1000 * 1000 * 1000
elif unit == 'pps':
- result['type'] = 'max_pps'
- result['max'] = value
+ result['type'] = 'pps'
+ result['value'] = value
elif unit == "kpps":
- result['type'] = 'max_pps'
- result['max'] = value * 1000
+ result['type'] = 'pps'
+ result['value'] = value * 1000
elif unit == "mpps":
- result['type'] = 'max_pps'
- result['max'] = value * 1000 * 1000
+ result['type'] = 'pps'
+ result['value'] = value * 1000 * 1000
elif unit == "%":
- # will be translated by the port object
result['type'] = 'percentage'
- result['max'] = value
+ result['value'] = value
+
+
+ if op == "+":
+ result['op'] = "add"
+ elif op == "-":
+ result['op'] = "sub"
+ else:
+ result['op'] = "abs"
return result
@@ -116,6 +131,13 @@ def match_multiplier(val):
raise argparse.ArgumentTypeError(match_multiplier_help)
+def match_multiplier(val):
+ '''match some val against multiplier shortcut inputs '''
+ return match_multiplier_common(val, strict_abs = False)
+
+def match_multiplier_strict(val):
+ '''match some val against multiplier shortcut inputs '''
+ return match_multiplier_common(val, strict_abs = True)
def is_valid_file(filename):
if not os.path.isfile(filename):
@@ -127,9 +149,14 @@ def is_valid_file(filename):
OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'help': match_multiplier_help,
'dest': "mult",
- 'default': {'type':'raw', 'max':1},
+ 'default': {'type':'raw', 'value':1, 'op': 'abs'},
'type': match_multiplier}),
+ MULTIPLIER_STRICT: ArgumentPack(['-m', '--multiplier'],
+ {'help': match_multiplier_help,
+ 'dest': "mult",
+ 'default': {'type':'raw', 'value':1, 'op': 'abs'},
+ 'type': match_multiplier_strict}),
TOTAL: ArgumentPack(['-t', '--total'],
{'help': "traffic will be divided between all ports specified",
@@ -177,6 +204,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'metavar': 'SERVER',
'help': "server IP"}),
+ DRY_RUN: ArgumentPack(['-n', '--dry'],
+ {'action': 'store_true',
+ 'dest': 'dry',
+ 'default': False,
+ 'help': "Dry run - no traffic will be injected"}),
+
GLOBAL_STATS: ArgumentPack(['-g'],
{'action': 'store_true',
'help': "Fetch only global statistics"}),
@@ -189,6 +222,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'action': 'store_true',
'help': "Fetch only port status data"}),
+
# advanced options
PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
ALL_PORTS],
diff --git a/scripts/automation/trex_control_plane/common/text_opts.py b/scripts/automation/trex_control_plane/common/text_opts.py
index 06c2c056..5a86149c 100755
--- a/scripts/automation/trex_control_plane/common/text_opts.py
+++ b/scripts/automation/trex_control_plane/common/text_opts.py
@@ -19,6 +19,50 @@ TEXT_CODES = {'bold': {'start': '\x1b[1m',
'end': '\x1b[24m'}}
+def format_num (size, suffix = ""):
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+
+ return "NaN"
+
+def format_time (t_sec):
+ if t_sec < 0:
+ return "infinite"
+
+ if t_sec < 1:
+ # low numbers
+ for unit in ['ms', 'usec', 'ns']:
+ t_sec *= 1000.0
+ if t_sec >= 1.0:
+ return '{:,.2f} [{:}]'.format(t_sec, unit)
+
+ return "NaN"
+
+ else:
+ # seconds
+ if t_sec < 60.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'sec')
+
+ # minutes
+ t_sec /= 60.0
+ if t_sec < 60.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'minutes')
+
+ # hours
+ t_sec /= 60.0
+ if t_sec < 24.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'hours')
+
+ # days
+ t_sec /= 24.0
+ return '{:,.2f} [{:}]'.format(t_sec, 'days')
+
+
+def format_percentage (size):
+ return "%0.2f %%" % (size)
+
def bold(text):
return text_attribute(text, 'bold')
diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py
index bb4c72ca..89de7286 100755
--- a/scripts/automation/trex_control_plane/common/trex_streams.py
+++ b/scripts/automation/trex_control_plane/common/trex_streams.py
@@ -14,14 +14,26 @@ StreamPack = namedtuple('StreamPack', ['stream_id', 'stream'])
class CStreamList(object):
def __init__(self):
- self.streams_list = {}
+ self.streams_list = OrderedDict()
self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)),
"rpc_defaults.yaml"))
+ def generate_numbered_name (self, name):
+ prefix = name.rstrip('01234567890')
+ suffix = name[len(prefix):]
+ if suffix == "":
+ n = "_1"
+ else:
+ n = int(suffix) + 1
+ return prefix + str(n)
+
def append_stream(self, name, stream_obj):
assert isinstance(stream_obj, CStream)
- if name in self.streams_list:
- raise NameError("A stream with this name already exists on this list.")
+
+ # if name exists simply add numbered suffix to it
+ while name in self.streams_list:
+ name = self.generate_numbered_name(name)
+
self.streams_list[name]=stream_obj
return name
@@ -70,6 +82,7 @@ class CStreamList(object):
stream_ids = {}
for idx, stream_name in enumerate(self.streams_list):
stream_ids[stream_name] = idx
+
# next, iterate over the streams and transform them from working with names to ids.
# with that build a new dict with old stream_name as the key, and StreamPack as the stored value
compiled_streams = {}
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 9236ce98..e187c8c2 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -35,7 +35,8 @@ from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
from client_utils import parsing_opts
import trex_status
-
+import parsing_opts
+from functools import wraps
__version__ = "1.1"
@@ -129,6 +130,18 @@ class TRexConsole(TRexGeneralCmd):
################### internal section ########################
+ def verify_connected(f):
+ @wraps(f)
+ def wrap(*args):
+ inst = args[0]
+ if not inst.stateless_client.is_connected():
+ print format_text("\nNot connected to server\n", 'bold')
+ return
+
+ ret = f(*args)
+ return ret
+
+ return wrap
def get_console_identifier(self):
return "{context}_{server}".format(context=self.__class__.__name__,
@@ -142,6 +155,7 @@ class TRexConsole(TRexGeneralCmd):
self.__dict__[name] = getattr(self.trex_console, name)
def postcmd(self, stop, line):
+
if self.stateless_client.is_connected():
self.prompt = "TRex > "
else:
@@ -208,9 +222,9 @@ class TRexConsole(TRexGeneralCmd):
####################### shell commands #######################
+ @verify_connected
def do_ping (self, line):
'''Ping the server\n'''
-
rc = self.stateless_client.cmd_ping()
if rc.bad():
return
@@ -300,6 +314,7 @@ class TRexConsole(TRexGeneralCmd):
if (l > 2) and (s[l - 2] in file_flags):
return TRexConsole.tree_autocomplete(s[l - 1])
+ @verify_connected
def do_start(self, line):
'''Start selected traffic in specified port(s) on TRex\n'''
@@ -310,42 +325,60 @@ class TRexConsole(TRexGeneralCmd):
self.do_start("-h")
############# stop
+ @verify_connected
def do_stop(self, line):
'''stops port(s) transmitting traffic\n'''
+
self.stateless_client.cmd_stop_line(line)
def help_stop(self):
self.do_stop("-h")
############# update
+ @verify_connected
def do_update(self, line):
'''update speed of port(s)currently transmitting traffic\n'''
+
self.stateless_client.cmd_update_line(line)
def help_update (self):
self.do_update("-h")
############# pause
+ @verify_connected
def do_pause(self, line):
'''pause port(s) transmitting traffic\n'''
+
self.stateless_client.cmd_pause_line(line)
############# resume
+ @verify_connected
def do_resume(self, line):
'''resume port(s) transmitting traffic\n'''
+
self.stateless_client.cmd_resume_line(line)
########## reset
+ @verify_connected
def do_reset (self, line):
'''force stop all ports\n'''
- self.stateless_client.cmd_reset()
+ self.stateless_client.cmd_reset_line(line)
+
+
+ ######### validate
+ @verify_connected
+ def do_validate (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ self.stateless_client.cmd_validate_line(line)
+
def do_stats(self, line):
'''Fetch statistics from TRex server by port\n'''
self.stateless_client.cmd_stats_line(line)
- pass
+
def help_stats(self):
self.do_stats("-h")
@@ -387,13 +420,10 @@ class TRexConsole(TRexGeneralCmd):
print format_text("\n\nEvent log was cleared\n\n")
# tui
+ @verify_connected
def do_tui (self, line):
'''Shows a graphical console\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.do_verbose('off')
trex_status.show_trex_status(self.stateless_client)
diff --git a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py
index 734fa22e..7bc25aac 100755
--- a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py
+++ b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py
@@ -19,7 +19,6 @@ def daemonize_parser(parser_obj, action_funcs, help_menu):
parser_obj.usage = None
parser_obj.add_argument("action", choices=action_funcs,
action="store", help=help_menu)
- return
class ExtendedDaemonRunner(runner.DaemonRunner):
@@ -76,7 +75,12 @@ class ExtendedDaemonRunner(runner.DaemonRunner):
self.app = app
self.daemon_context = daemon.DaemonContext()
self.daemon_context.stdin = open(app.stdin_path, 'rt')
- self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ try:
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ except IOError as err:
+ # catch 'tty' error when launching server from remote location
+ app.stdout_path = "/dev/null"
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
self.daemon_context.stderr = open(app.stderr_path,
'a+t', buffering=0)
diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server.py b/scripts/automation/trex_control_plane/server/trex_daemon_server.py
index ec07cb8a..9784d42a 100755
--- a/scripts/automation/trex_control_plane/server/trex_daemon_server.py
+++ b/scripts/automation/trex_control_plane/server/trex_daemon_server.py
@@ -57,15 +57,7 @@ def main ():
print "Launching user must have sudo privileges in order to run TRex daemon.\nTerminating daemon process."
exit(-1)
- try:
- daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser)
- except IOError as err:
- # catch 'tty' error when launching server from remote location
- if err.errno == errno.ENXIO:
- trex_app.stdout_path = "/dev/null"
- daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser)
- else:
- raise
+ daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser)
#This ensures that the logger file handle does not get closed during daemonization
daemon_runner.daemon_context.files_preserve=[handler.stream]