summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py116
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_client.py44
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py430
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py946
4 files changed, 965 insertions, 571 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 8b274134..459d6915 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -19,6 +19,7 @@ import re
from common.trex_stats import *
from common.trex_streams import *
+from common.trex_types import *
# basic async stats class
class CTRexAsyncStats(object):
@@ -152,38 +153,115 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
+ def __init__ (self, server, port, stateless_client, prn_func = None):
self.port = port
self.server = server
self.stateless_client = stateless_client
+ self.prn_func = prn_func
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
+ self.last_data_recv_ts = 0
+
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ if self.prn_func:
+ self.prn_func(msg)
+ else:
+ print msg
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target= self.run)
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def run(self):
+ self.connected = True
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.is_alive():
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+
+ got_data = False
while self.active:
- line = self.socket.recv_string()
+ try:
+
+ line = self.socket.recv_string()
+ self.last_data_recv_ts = time.time()
+
+ # signal once
+ if not got_data:
+ self.stateless_client.on_async_alive()
+ got_data = True
+
+
+ # got a timeout - mark as not alive and retry
+ except zmq.Again:
+
+ # signal once
+ if got_data:
+ self.stateless_client.on_async_dead()
+ got_data = False
+
+ continue
+
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -193,7 +271,19 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
- def get_stats(self):
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
+
+ # did we get info for the last 3 seconds ?
+ def is_alive (self):
+ if self.last_data_recv_ts == None:
+ return False
+
+ return ( (time.time() - self.last_data_recv_ts) < 3 )
+
+ def get_stats (self):
return self.stats
def get_raw_snapshot (self):
@@ -203,7 +293,6 @@ class CTRexAsyncClient():
def __dispatch (self, name, type, data):
# stats
if name == "trex-global":
- # self.stats.update(data)
self.stateless_client.handle_async_stats_update(data)
# events
elif name == "trex-event":
@@ -212,10 +301,3 @@ class CTRexAsyncClient():
pass
- def stop (self):
- self.active = False
- self.t.join()
-
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py
index 160abdec..5709b7a5 100755
--- a/scripts/automation/trex_control_plane/client/trex_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_client.py
@@ -88,7 +88,7 @@ class CTRexClient(object):
finally:
self.prompt_verbose_data()
- def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options):
+ def start_trex (self, f, d, block_to_success = True, timeout = 40, user = None, trex_development = False, **trex_cmd_options):
"""
Request to start a TRex run on server.
@@ -104,7 +104,7 @@ class CTRexClient(object):
timeout : int
maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running'
- default value: **30**
+ default value: **40**
user : str
the identity of the the run issuer.
trex_cmd_options : key, val
@@ -125,13 +125,17 @@ class CTRexClient(object):
user = user or self.__default_user
try:
d = int(d)
- if d < 30: # specify a test should take at least 30 seconds long.
+ if d < 30 and not trex_development: # test duration should be at least 30 seconds, unless trex_development flag is specified.
raise ValueError
except ValueError:
raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.')
trex_cmd_options.update( {'f' : f, 'd' : d} )
-
+ if not trex_cmd_options.get('l'):
+ self.result_obj.latency_checked = False
+ if 'k' in trex_cmd_options:
+ timeout += int(trex_cmd_options['k']) # during 'k' seconds TRex stays in 'Starting' state
+
self.result_obj.clear_results()
try:
issue_time = time.time()
@@ -544,7 +548,7 @@ class CTRexClient(object):
Get TRex version details.
:return:
- Trex details (Version, User, Date, Uuid) as ordered dictionary
+ Trex details (Version, User, Date, Uuid, Git SHA) as ordered dictionary
:raises:
+ :exc:`trex_exceptions.TRexRequestDenied`, in case TRex version could not be determined.
@@ -556,9 +560,11 @@ class CTRexClient(object):
version_dict = OrderedDict()
result_lines = binascii.a2b_base64(self.server.get_trex_version()).split('\n')
for line in result_lines:
+ if not line:
+ continue
key, value = line.strip().split(':', 1)
version_dict[key.strip()] = value.strip()
- for key in ('Version', 'User', 'Date', 'Uuid'):
+ for key in ('Version', 'User', 'Date', 'Uuid', 'Git SHA'):
if key not in version_dict:
raise Exception('get_trex_version: got server response without key: {0}'.format(key))
return version_dict
@@ -767,6 +773,7 @@ class CTRexResult(object):
"""
self._history = deque(maxlen = max_history_size)
self.clear_results()
+ self.latency_checked = True
def __repr__(self):
return ("Is valid history? {arg}\n".format( arg = self.is_valid_hist() ) +
@@ -1032,18 +1039,19 @@ class CTRexResult(object):
self._done_warmup = True
# handle latency data
- latency_pre = "trex-latency"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
- # support old typo
- if self._max_latency is None:
- latency_pre = "trex-latecny"
- self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
-
- self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
- self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
-
- avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
- self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
+ if self.latency_checked:
+ latency_pre = "trex-latency"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")#None # TBC
+ # support old typo
+ if self._max_latency is None:
+ latency_pre = "trex-latecny"
+ self._max_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), ".*max-")
+
+ self._avg_latency = self.get_last_value("{latency}.data".format(latency = latency_pre), "avg-")#None # TBC
+ self._avg_latency = CTRexResult.__avg_all_and_rename_keys(self._avg_latency)
+
+ avg_win_latency_list = self.get_value_list("{latency}.data".format(latency = latency_pre), "avg-")
+ self._avg_window_latency = CTRexResult.__calc_latency_win_stats(avg_win_latency_list)
tx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_tx_pkts")
rx_pkts = CTRexResult.__get_value_by_path(latest_dump, "trex-global.data.m_total_rx_pkts")
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
new file mode 100644
index 00000000..54b4945e
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_port.py
@@ -0,0 +1,430 @@
+
+from collections import namedtuple
+from common.trex_types import *
+from common import trex_stats
+
+
+########## utlity ############
+def mult_to_factor (mult, max_bps, max_pps, line_util):
+ if mult['type'] == 'raw':
+ return mult['value']
+
+ if mult['type'] == 'bps':
+ return mult['value'] / max_bps
+
+ if mult['type'] == 'pps':
+ return mult['value'] / max_pps
+
+ if mult['type'] == 'percentage':
+ return mult['value'] / line_util
+
+
+# describes a single port
+class Port(object):
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+ PortState = namedtuple('PortState', ['state_id', 'state_name'])
+ STATES_MAP = {STATE_DOWN: "DOWN",
+ STATE_IDLE: "IDLE",
+ STATE_STREAMS: "IDLE",
+ STATE_TX: "ACTIVE",
+ STATE_PAUSE: "PAUSE"}
+
+
+ def __init__ (self, port_id, speed, driver, user, comm_link, session_id):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.comm_link = comm_link
+ self.transmit = comm_link.transmit
+ self.transmit_batch = comm_link.transmit_batch
+ self.user = user
+ self.driver = driver
+ self.speed = speed
+ self.streams = {}
+ self.profile = None
+ self.session_id = session_id
+
+ self.port_stats = trex_stats.CPortStats(self)
+
+
+ def err(self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ def ok(self, data = "ACK"):
+ return RC_OK(data)
+
+ def get_speed_bps (self):
+ return (self.speed * 1000 * 1000 * 1000)
+
+ # take the port
+ def acquire(self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "session_id": self.session_id,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.good():
+ self.handler = rc.data()
+ return self.ok()
+ else:
+ return self.err(rc.err())
+
+ # release the port
+ def release(self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ self.handler = None
+
+ if rc.good():
+ return self.ok()
+ else:
+ return self.err(rc.err())
+
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_active(self):
+ return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def is_transmitting (self):
+ return (self.state == self.STATE_TX)
+
+ def is_paused (self):
+ return (self.state == self.STATE_PAUSE)
+
+
+ def sync(self):
+ params = {"port_id": self.port_id}
+
+ command = RpcCmdData("get_port_status", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # sync the port
+ port_state = rc.data()['state']
+
+ if port_state == "DOWN":
+ self.state = self.STATE_DOWN
+ elif port_state == "IDLE":
+ self.state = self.STATE_IDLE
+ elif port_state == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif port_state == "TX":
+ self.state = self.STATE_TX
+ elif port_state == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return self.ok()
+
+
+ # return TRUE if write commands
+ def is_port_writable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj}
+
+ rc = self.transmit("add_stream", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # add multiple streams
+ def add_streams (self, streams_list):
+ batch = []
+
+ for stream in streams_list:
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream.stream_id,
+ "stream": stream.stream}
+
+ cmd = RpcCmdData('add_stream', params)
+ batch.append(cmd)
+
+ rc = self.transmit_batch(batch)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # validate that every action succeeded
+
+ # add the stream
+ for stream in streams_list:
+ self.streams[stream.stream_id] = stream.stream
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc = self.transmit("remove_stream", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.streams[stream_id] = None
+
+ self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE
+
+ return self.ok()
+
+ # remove all the streams
+ def remove_all_streams (self):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("remove_all_streams", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.streams = {}
+
+ self.state = self.STATE_IDLE
+
+ return self.ok()
+
+ # get a specific stream
+ def get_stream (self, stream_id):
+ if stream_id in self.streams:
+ return self.streams[stream_id]
+ else:
+ return None
+
+ def get_all_streams (self):
+ return self.streams
+
+ # start traffic
+ def start (self, mul, duration):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul,
+ "duration": duration}
+
+ rc = self.transmit("start_traffic", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
+
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("stop_traffic", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # only valid state after stop
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ def pause (self):
+
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("pause_traffic", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # only valid state after stop
+ self.state = self.STATE_PAUSE
+
+ return self.ok()
+
+
+ def resume (self):
+
+ if (self.state != self.STATE_PAUSE) :
+ return self.err("port is not in pause mode")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("resume_traffic", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ # only valid state after stop
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+
+ def update (self, mul):
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc = self.transmit("update_traffic", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+
+ def validate (self):
+
+ if (self.state == self.STATE_DOWN):
+ return self.err("port is down")
+
+ if (self.state == self.STATE_IDLE):
+ return self.err("no streams attached to port")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc = self.transmit("validate", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.profile = rc.data()
+
+ return self.ok()
+
+ def get_profile (self):
+ return self.profile
+
+
+ def print_profile (self, mult, duration):
+ if not self.get_profile():
+ return
+
+ rate = self.get_profile()['rate']
+ graph = self.get_profile()['graph']
+
+ print format_text("Profile Map Per Port\n", 'underline', 'bold')
+
+ factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util'])
+
+ print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"),
+ format_num(rate['max_bps'] * factor, suffix = "bps"))
+
+ print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
+ format_num(rate['max_pps'] * factor, suffix = "pps"),)
+
+ print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100),
+ format_percentage(rate['max_line_util'] * factor * 100))
+
+
+ # duration
+ exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
+ exp_time_factor_sec = exp_time_base_sec / factor
+
+ # user configured a duration
+ if duration > 0:
+ if exp_time_factor_sec > 0:
+ exp_time_factor_sec = min(exp_time_factor_sec, duration)
+ else:
+ exp_time_factor_sec = duration
+
+
+ print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
+ format_time(exp_time_factor_sec))
+ print "\n"
+
+
+ def get_port_state_name(self):
+ return self.STATES_MAP.get(self.state, "Unknown")
+
+ ################# stats handler ######################
+ def generate_port_stats(self):
+ return self.port_stats.generate_stats()
+ pass
+
+ def generate_port_status(self):
+ return {"port-type": self.driver,
+ "maximum": "{speed} Gb/s".format(speed=self.speed),
+ "port-status": self.get_port_state_name()
+ }
+
+ def clear_stats(self):
+ return self.port_stats.clear_stats()
+
+ def invalidate_stats(self):
+ return self.port_stats.invalidate()
+
+
+ ################# events handler ######################
+ def async_event_port_stopped (self):
+ self.state = self.STATE_STREAMS
+
+
+ def async_event_port_started (self):
+ self.state = self.STATE_TX
+
+
+ def async_event_port_paused (self):
+ self.state = self.STATE_PAUSE
+
+
+ def async_event_port_resumed (self):
+ self.state = self.STATE_TX
+
+ def async_event_forced_acquired (self):
+ self.handler = None
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 748817da..58fa53c9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -14,433 +14,40 @@ import json
from common.trex_streams import *
from collections import namedtuple
from common.text_opts import *
-# import trex_stats
from common import trex_stats
from client_utils import parsing_opts, text_tables
import time
import datetime
import re
+import random
+from trex_port import Port
+from common.trex_types import *
from trex_async_client import CTRexAsyncClient
-RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
-class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
- __slots__ = ()
- def __str__(self):
- return "{id:^3} - {msg} ({stat})".format(id=self.id,
- msg=self.msg,
- stat="success" if self.success else "fail")
-
-# simple class to represent complex return value
-class RC():
-
- def __init__ (self, rc = None, data = None):
- self.rc_list = []
-
- if (rc != None) and (data != None):
- tuple_rc = namedtuple('RC', ['rc', 'data'])
- self.rc_list.append(tuple_rc(rc, data))
-
- def add (self, rc):
- self.rc_list += rc.rc_list
-
- def good (self):
- return all([x.rc for x in self.rc_list])
-
- def bad (self):
- return not self.good()
-
- def data (self):
- return all([x.data if x.rc else "" for x in self.rc_list])
-
- def err (self):
- return all([x.data if not x.rc else "" for x in self.rc_list])
-
- def annotate (self, desc = None):
- if desc:
- print format_text('\n{:<40}'.format(desc), 'bold'),
-
- if self.bad():
- # print all the errors
- print ""
- for x in self.rc_list:
- if not x.rc:
- print format_text("\n{0}".format(x.data), 'bold')
-
- print ""
- print format_text("[FAILED]\n", 'red', 'bold')
-
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
-
-
-def RC_OK():
- return RC(True, "")
-def RC_ERR(err):
- return RC(False, err)
-
-
-LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
-
-# describes a stream DB
-class CStreamsDB(object):
-
- def __init__(self):
- self.stream_packs = {}
-
- def load_yaml_file(self, filename):
-
- stream_pack_name = filename
- if stream_pack_name in self.get_loaded_streams_names():
- self.remove_stream_packs(stream_pack_name)
-
- stream_list = CStreamList()
- loaded_obj = stream_list.load_yaml(filename)
-
- try:
- compiled_streams = stream_list.compile_streams()
- rc = self.load_streams(stream_pack_name,
- LoadedStreamList(loaded_obj,
- [StreamPack(v.stream_id, v.stream.dump())
- for k, v in compiled_streams.items()]))
-
- except Exception as e:
- return None
-
- return self.get_stream_pack(stream_pack_name)
-
- def load_streams(self, name, LoadedStreamList_obj):
- if name in self.stream_packs:
- return False
- else:
- self.stream_packs[name] = LoadedStreamList_obj
- return True
-
- def remove_stream_packs(self, *names):
- removed_streams = []
- for name in names:
- removed = self.stream_packs.pop(name)
- if removed:
- removed_streams.append(name)
- return removed_streams
-
- def clear(self):
- self.stream_packs.clear()
-
- def get_loaded_streams_names(self):
- return self.stream_packs.keys()
-
- def stream_pack_exists (self, name):
- return name in self.get_loaded_streams_names()
-
- def get_stream_pack(self, name):
- if not self.stream_pack_exists(name):
- return None
- else:
- return self.stream_packs.get(name)
-
-
-# describes a single port
-class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
- PortState = namedtuple('PortState', ['state_id', 'state_name'])
- STATES_MAP = {STATE_DOWN: "DOWN",
- STATE_IDLE: "IDLE",
- STATE_STREAMS: "STREAMS",
- STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
-
-
- def __init__ (self, port_id, speed, driver, user, transmit):
- self.port_id = port_id
- self.state = self.STATE_IDLE
- self.handler = None
- self.transmit = transmit
- self.user = user
- self.driver = driver
- self.speed = speed
- self.streams = {}
- self.port_stats = trex_stats.CPortStats(self)
-
- def err(self, msg):
- return RC_ERR("port {0} : {1}".format(self.port_id, msg))
-
- def ok(self):
- return RC_OK()
-
- def get_speed_bps (self):
- return (self.speed * 1000 * 1000 * 1000)
-
- # take the port
- def acquire(self, force = False):
- params = {"port_id": self.port_id,
- "user": self.user,
- "force": force}
-
- command = RpcCmdData("acquire", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- # release the port
- def release(self):
- params = {"port_id": self.port_id,
- "handler": self.handler}
-
- command = RpcCmdData("release", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def sync(self, sync_data):
- self.handler = sync_data['handler']
- port_state = sync_data['state'].upper()
- if port_state == "DOWN":
- self.state = self.STATE_DOWN
- elif port_state == "IDLE":
- self.state = self.STATE_IDLE
- elif port_state == "STREAMS":
- self.state = self.STATE_STREAMS
- elif port_state == "TX":
- self.state = self.STATE_TX
- elif port_state == "PAUSE":
- self.state = self.STATE_PAUSE
- else:
- raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
-
- return self.ok()
-
-
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
- # add stream to the port
- def add_stream (self, stream_id, stream_obj):
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id,
- "stream": stream_obj}
-
- rc, data = self.transmit("add_stream", params)
- if not rc:
- r = self.err(data)
- print r.good()
-
- # add the stream
- self.streams[stream_id] = stream_obj
-
- # the only valid state now
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- # remove stream from port
- def remove_stream (self, stream_id):
-
- if not stream_id in self.streams:
- return self.err("stream {0} does not exists".format(stream_id))
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id}
-
-
- rc, data = self.transmit("remove_stream", params)
- if not rc:
- return self.err(data)
-
- self.streams[stream_id] = None
-
- return self.ok()
-
- # remove all the streams
- def remove_all_streams (self):
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
+class CTRexStatelessClient(object):
+ """docstring for CTRexStatelessClient"""
- rc, data = self.transmit("remove_all_streams", params)
- if not rc:
- return self.err(data)
+ # verbose levels
+ VERBOSE_QUIET = 0
+ VERBOSE_REGULAR = 1
+ VERBOSE_HIGH = 2
+
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False):
+ super(CTRexStatelessClient, self).__init__()
- self.streams = {}
+ self.user = username
- return self.ok()
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func)
- # get a specific stream
- def get_stream (self, stream_id):
- if stream_id in self.streams:
- return self.streams[stream_id]
+ # default verbose level
+ if not quiet:
+ self.verbose = self.VERBOSE_REGULAR
else:
- return None
-
- def get_all_streams (self):
- return self.streams
-
-
- def process_mul (self, mul):
- # if percentage - translate
- if mul['type'] == 'percentage':
- mul['type'] = 'max_bps'
- mul['max'] = self.get_speed_bps() * (mul['max'] / 100)
-
-
- # start traffic
- def start (self, mul, duration):
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
-
- if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
-
- self.process_mul(mul)
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul,
- "duration": duration}
-
- rc, data = self.transmit("start_traffic", params)
- if not rc:
- return self.err(data)
-
- self.state = self.STATE_TX
-
- return self.ok()
-
- # stop traffic
- # with force ignores the cached state and sends the command
- def stop (self, force = False):
-
- if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("stop_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- def pause (self):
-
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
+ self.verbose = self.VERBOSE_QUIET
- rc, data = self.transmit("pause_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_PAUSE
-
- return self.ok()
-
-
- def resume (self):
-
- if (self.state != self.STATE_PAUSE) :
- return self.err("port is not in pause mode")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("resume_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_TX
-
- return self.ok()
-
-
- def update (self, mul):
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- self.process_mul(mul)
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul}
-
- rc, data = self.transmit("update_traffic", params)
- if not rc:
- return self.err(data)
-
- return self.ok()
-
- def get_port_state_name(self):
- return self.STATES_MAP.get(self.state, "Unknown")
-
- ################# stats handler ######################
- def generate_port_stats(self):
- return self.port_stats.generate_stats()
- pass
-
- def generate_port_status(self):
- return {"port-type": self.driver,
- "maximum": "{speed} Gb/s".format(speed=self.speed),
- "port-status": self.get_port_state_name()
- }
-
- def clear_stats(self):
- return self.port_stats.clear_stats()
-
- ################# events handler ######################
- def async_event_port_stopped (self):
- self.state = self.STATE_STREAMS
-
-
-class CTRexStatelessClient(object):
- """docstring for CTRexStatelessClient"""
-
- def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
- super(CTRexStatelessClient, self).__init__()
- self.user = username
- self.system_info = None
- self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
- self.verbose = False
self.ports = {}
- # self._conn_handler = {}
- # self._active_ports = set()
self._connection_info = {"server": server,
"sync_port": sync_port,
"async_port": async_port}
@@ -448,7 +55,7 @@ class CTRexStatelessClient(object):
self.server_version = {}
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func)
self.streams_db = CStreamsDB()
self.global_stats = trex_stats.CGlobalStats(self._connection_info,
@@ -457,12 +64,44 @@ class CTRexStatelessClient(object):
self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats,
self.ports)
+ self.events = []
+
+ self.session_id = random.getrandbits(32)
+ self.read_only = False
self.connected = False
- self.events = []
+
+
+ # returns the port object
+ def get_port (self, port_id):
+ return self.ports.get(port_id, None)
+
+
+ # connection server ip
+ def get_server_ip (self):
+ return self.comm_link.get_server()
+
+ # connection server port
+ def get_server_port (self):
+ return self.comm_link.get_port()
+
################# events handler ######################
-
+ def add_event_log (self, msg, ev_type, show = False):
+
+ if ev_type == "server":
+ prefix = "[server]"
+ elif ev_type == "local":
+ prefix = "[local]"
+
+ ts = time.time()
+ st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
+ self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
+
+ if show:
+ self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))))
+
+
def handle_async_stats_update(self, dump_data):
global_stats = {}
port_stats = {}
@@ -490,59 +129,108 @@ class CTRexStatelessClient(object):
for port_id, data in port_stats.iteritems():
self.ports[port_id].port_stats.update(data)
+
+
def handle_async_event (self, type, data):
# DP stopped
- ev = "[event] - "
-
show_event = False
# port started
if (type == 0):
port_id = int(data['port_id'])
- ev += "Port {0} has started".format(port_id)
+ ev = "Port {0} has started".format(port_id)
+ self.async_event_port_started(port_id)
# port stopped
elif (type == 1):
port_id = int(data['port_id'])
- ev += "Port {0} has stopped".format(port_id)
+ ev = "Port {0} has stopped".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
- # server stopped
+ # port paused
elif (type == 2):
- ev += "Server has stopped"
- self.async_event_server_stopped()
- show_event = True
+ port_id = int(data['port_id'])
+ ev = "Port {0} has paused".format(port_id)
- # port finished traffic
+ # call the handler
+ self.async_event_port_paused(port_id)
+
+ # port resumed
elif (type == 3):
port_id = int(data['port_id'])
- ev += "Port {0} job done".format(port_id)
+ ev = "Port {0} has resumed".format(port_id)
+
+ # call the handler
+ self.async_event_port_resumed(port_id)
+
+ # port finished traffic
+ elif (type == 4):
+ port_id = int(data['port_id'])
+ ev = "Port {0} job done".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
show_event = True
+ # port was stolen...
+ elif (type == 5):
+ session_id = data['session_id']
+
+ # false alarm, its us
+ if session_id == self.session_id:
+ return
+
+ port_id = int(data['port_id'])
+ who = data['who']
+
+ ev = "Port {0} was forcely taken by '{1}'".format(port_id, who)
+
+ # call the handler
+ self.async_event_port_forced_acquired(port_id)
+ show_event = True
+
+ # server stopped
+ elif (type == 100):
+ ev = "Server has stopped"
+ self.async_event_server_stopped()
+ show_event = True
+
+
else:
# unknown event - ignore
return
- if show_event:
- print format_text("\n" + ev, 'bold')
- ts = time.time()
- st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
- self.events.append("{0} - ".format(st) + format_text(ev, 'bold'))
+ self.add_event_log(ev, 'server', show_event)
def async_event_port_stopped (self, port_id):
self.ports[port_id].async_event_port_stopped()
+
+ def async_event_port_started (self, port_id):
+ self.ports[port_id].async_event_port_started()
+
+
+ def async_event_port_paused (self, port_id):
+ self.ports[port_id].async_event_port_paused()
+
+
+ def async_event_port_resumed (self, port_id):
+ self.ports[port_id].async_event_port_resumed()
+
+
+ def async_event_port_forced_acquired (self, port_id):
+ self.ports[port_id].async_event_forced_acquired()
+ self.read_only = True
+
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -552,6 +240,24 @@ class CTRexStatelessClient(object):
############# helper functions section ##############
+ # measure time for functions
+ def timing(f):
+ def wrap(*args):
+ time1 = time.time()
+ ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
+ delta = time.time() - time1
+ print format_time(delta) + "\n"
+
+ return ret
+
+ return wrap
+
+
def validate_port_list(self, port_id_list):
if not isinstance(port_id_list, list):
print type(port_id_list)
@@ -584,66 +290,120 @@ class CTRexStatelessClient(object):
############ boot up section ################
# connection sequence
- def connect(self):
+ # mode can be RW - read / write, RWF - read write with force , RO - read only
+ def connect(self, mode = "RW"):
+
+ if self.is_connected():
+ self.disconnect()
+
+ # clear this flag
self.connected = False
- # connect
- rc, data = self.comm_link.connect()
- if not rc:
- return RC_ERR(data)
+ # connect sync channel
+ rc = self.comm_link.connect()
+ if rc.bad():
+ return rc
+
+ # connect async channel
+ rc = self.async_client.connect()
+ if rc.bad():
+ return rc
# version
- rc, data = self.transmit("get_version")
- if not rc:
- return RC_ERR(data)
+ rc = self.transmit("get_version")
+ if rc.bad():
+ return rc
- self.server_version = data
- self.global_stats.server_version = data
+ self.server_version = rc.data()
+ self.global_stats.server_version = rc.data()
# cache system info
- # self.get_system_info(refresh=True)
- rc, data = self.transmit("get_system_info")
- if not rc:
- return RC_ERR(data)
- self.system_info = data
+ rc = self.transmit("get_system_info")
+ if rc.bad():
+ return rc
+
+ self.system_info = rc.data()
# cache supported commands
- rc, data = self.transmit("get_supported_cmds")
- if not rc:
- return RC_ERR(data)
+ rc = self.transmit("get_supported_cmds")
+ if rc.bad():
+ return rc
- self.supported_cmds = data
+ self.supported_cmds = rc.data()
# create ports
for port_id in xrange(self.get_port_count()):
speed = self.system_info['ports'][port_id]['speed']
driver = self.system_info['ports'][port_id]['driver']
- self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit)
- # acquire all ports
- rc = self.acquire()
- if rc.bad():
- return rc
+ self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id)
+
- rc = self.sync_with_server()
+ # sync the ports
+ rc = self.sync_ports()
if rc.bad():
return rc
- self.connected = True
+ # acquire all ports
+ if mode == "RW":
+ rc = self.acquire(force = False)
+
+ # fallback to read only if failed
+ if rc.bad():
+ rc.annotate(show_status = False)
+ print format_text("Switching to read only mode - only few commands will be available", 'bold')
+
+ self.release(self.get_acquired_ports())
+ self.read_only = True
+ else:
+ self.read_only = False
+
+ elif mode == "RWF":
+ rc = self.acquire(force = True)
+ if rc.bad():
+ return rc
+ self.read_only = False
+ elif mode == "RO":
+ # no acquire on read only
+ rc = RC_OK()
+ self.read_only = True
+
+
+
+ self.connected = True
return RC_OK()
+
+ def is_read_only (self):
+ return self.read_only
+
def is_connected (self):
return self.connected and self.comm_link.is_connected
def disconnect(self):
- self.connected = False
+ # release any previous acquired ports
+ if self.is_connected():
+ self.release(self.get_acquired_ports())
+
self.comm_link.disconnect()
+ self.async_client.disconnect()
+
+ self.connected = False
+
return RC_OK()
+ def on_async_dead (self):
+ if self.connected:
+ msg = 'lost connection to server'
+ self.add_event_log(msg, 'local', True)
+ self.connected = False
+
+ def on_async_alive (self):
+ pass
########### cached queries (no server traffic) ###########
@@ -666,8 +426,8 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def get_stats_async(self):
- return self._async_client.get_stats()
+ def get_stats_async (self):
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
@@ -675,6 +435,9 @@ class CTRexStatelessClient(object):
def get_connection_ip (self):
return self.comm_link.server
+ def get_all_ports (self):
+ return [port_id for port_id, port_obj in self.ports.iteritems()]
+
def get_acquired_ports(self):
return [port_id
for port_id, port_obj in self.ports.iteritems()
@@ -685,36 +448,60 @@ class CTRexStatelessClient(object):
for port_id, port_obj in self.ports.iteritems()
if port_obj.is_active()]
+ def get_paused_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_paused()]
+
+ def get_transmitting_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_transmitting()]
+
def set_verbose(self, mode):
- self.comm_link.set_verbose(mode)
+
+ # on high - enable link verbose
+ if mode == self.VERBOSE_HIGH:
+ self.comm_link.set_verbose(True)
+ else:
+ self.comm_link.set_verbose(False)
+
self.verbose = mode
+
+ def check_verbose (self, level):
+ return (self.verbose >= level)
+
+ def get_verbose (self):
+ return self.verbose
+
+ def prn_func (self, msg, level = VERBOSE_REGULAR):
+ if self.check_verbose(level):
+ print msg
+
############# server actions ################
# ping server
def ping(self):
- rc, info = self.transmit("ping")
- return RC(rc, info)
+ return self.transmit("ping")
- def sync_with_server(self, sync_streams=False):
- rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
- if not rc:
- return RC_ERR(data)
-
- for port_info in data:
- rc = self.ports[port_info['port_id']].sync(port_info)
- if rc.bad():
- return rc
-
- return RC_OK()
def get_global_stats(self):
- rc, info = self.transmit("get_global_stats")
- return RC(rc, info)
+ return self.transmit("get_global_stats")
########## port commands ##############
+ def sync_ports (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].sync())
+
+ return rc
+
# acquire ports, if port_list is none - get all
def acquire (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
@@ -733,7 +520,7 @@ class CTRexStatelessClient(object):
rc = RC()
for port_id in port_id_list:
- rc.add(self.ports[port_id].release(force))
+ rc.add(self.ports[port_id].release())
return rc
@@ -750,15 +537,16 @@ class CTRexStatelessClient(object):
return rc
+
def add_stream_pack(self, stream_pack_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
- for stream_pack in stream_pack_list:
- rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
-
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_streams(stream_pack_list))
+
return rc
@@ -855,6 +643,17 @@ class CTRexStatelessClient(object):
return rc
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
pass
@@ -866,16 +665,19 @@ class CTRexStatelessClient(object):
return self.comm_link.transmit(method_name, params)
+ def transmit_batch(self, batch_list):
+ return self.comm_link.transmit_batch(batch_list)
######################### Console (high level) API #########################
+ @timing
def cmd_ping(self):
rc = self.ping()
rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
return rc
- def cmd_connect(self):
- rc = self.connect()
+ def cmd_connect(self, mode = "RW"):
+ rc = self.connect(mode)
rc.annotate()
return rc
@@ -886,13 +688,7 @@ class CTRexStatelessClient(object):
# reset
def cmd_reset(self):
-
-
- # sync with the server
- rc = self.sync_with_server()
- rc.annotate("Syncing with the server:")
- if rc.bad():
- return rc
+ #self.release(self.get_acquired_ports())
rc = self.acquire(force = True)
rc.annotate("Force acquiring all ports:")
@@ -924,7 +720,7 @@ class CTRexStatelessClient(object):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
if not active_ports:
- msg = "No active traffic on porvided ports"
+ msg = "No active traffic on provided ports"
print format_text(msg, 'bold')
return RC_ERR(msg)
@@ -948,15 +744,26 @@ class CTRexStatelessClient(object):
rc = self.update_traffic(mult, active_ports)
rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
- return RC_OK()
+ return rc
+ # clear stats
def cmd_clear(self, port_id_list):
+
for port_id in port_id_list:
self.ports[port_id].clear_stats()
+
self.global_stats.clear_stats()
+
+ return RC_OK()
+
+
+ def cmd_invalidate (self, port_id_list):
+ for port_id in port_id_list:
+ self.ports[port_id].invalidate_stats()
+
+ self.global_stats.invalidate()
+
return RC_OK()
# pause cmd
@@ -972,10 +779,8 @@ class CTRexStatelessClient(object):
rc = self.pause_traffic(active_ports)
rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
+ return rc
- return RC_OK()
# resume cmd
@@ -991,14 +796,11 @@ class CTRexStatelessClient(object):
rc = self.resume_traffic(active_ports)
rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
-
- return RC_OK()
+ return rc
# start cmd
- def cmd_start (self, port_id_list, stream_list, mult, force, duration):
+ def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
@@ -1020,19 +822,37 @@ class CTRexStatelessClient(object):
rc = self.add_stream_pack(stream_list.compiled, port_id_list)
- rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list))
+ rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
if rc.bad():
return rc
+ # when not on dry - start the traffic , otherwise validate only
+ if not dry:
+ rc = self.start_traffic(mult, duration, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- # finally, start the traffic
- rc = self.start_traffic(mult, duration, port_id_list)
- rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
return rc
+ else:
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list))
- return RC_OK()
+ if rc.bad():
+ return rc
+ # show a profile on one port for illustration
+ self.ports[port_id_list[0]].print_profile(mult, duration)
+
+ return rc
+
+
+ # validate port(s) profile
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+ # stats
def cmd_stats(self, port_id_list, stats_mask=set()):
stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
@@ -1043,6 +863,26 @@ class CTRexStatelessClient(object):
############## High Level API With Parser ################
+
+ def cmd_connect_line (self, line):
+ '''Connects to the TRex server'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "connect",
+ self.cmd_connect_line.__doc__,
+ parsing_opts.FORCE)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ if opts.force:
+ rc = self.cmd_connect(mode = "RWF")
+ else:
+ rc = self.cmd_connect(mode = "RW")
+
+ @timing
def cmd_start_line (self, line):
'''Start selected traffic in specified ports on TRex\n'''
# define a parser
@@ -1054,13 +894,19 @@ class CTRexStatelessClient(object):
parsing_opts.FORCE,
parsing_opts.STREAM_FROM_PATH_OR_FILE,
parsing_opts.DURATION,
- parsing_opts.MULTIPLIER)
+ parsing_opts.MULTIPLIER_STRICT,
+ parsing_opts.DRY_RUN)
opts = parser.parse_args(line.split())
+
if opts is None:
return RC_ERR("bad command line parameters")
+
+ if opts.dry:
+ print format_text("\n*** DRY RUN ***", 'bold')
+
if opts.db:
stream_list = self.streams_db.get_stream_pack(opts.db)
rc = RC(stream_list != None)
@@ -1070,7 +916,15 @@ class CTRexStatelessClient(object):
else:
# load streams from file
- stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ stream_list = None;
+ try:
+ stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ except Exception as e:
+ s = str(e)
+ rc=RC_ERR(s)
+ rc.annotate()
+ return rc
+
rc = RC(stream_list != None)
rc.annotate("Load stream pack (from file):")
if stream_list == None:
@@ -1078,12 +932,13 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
- if opts.total and (mult['type'] != 'percentage'):
+ if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
- return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry)
+ @timing
def cmd_resume_line (self, line):
'''Resume active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1097,6 +952,8 @@ class CTRexStatelessClient(object):
return self.cmd_resume(opts.ports)
+
+ @timing
def cmd_stop_line (self, line):
'''Stop active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1110,6 +967,8 @@ class CTRexStatelessClient(object):
return self.cmd_stop(opts.ports)
+
+ @timing
def cmd_pause_line (self, line):
'''Pause active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1123,6 +982,8 @@ class CTRexStatelessClient(object):
return self.cmd_pause(opts.ports)
+
+ @timing
def cmd_update_line (self, line):
'''Update port(s) speed currently active\n'''
parser = parsing_opts.gen_parser(self,
@@ -1139,14 +1000,15 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
return self.cmd_update(opts.ports, opts.mult)
-
+ @timing
def cmd_reset_line (self, line):
return self.cmd_reset()
+
def cmd_clear_line (self, line):
'''Clear cached local statistics\n'''
# define a parser
@@ -1161,6 +1023,7 @@ class CTRexStatelessClient(object):
return RC_ERR("bad command line parameters")
return self.cmd_clear(opts.ports)
+
def cmd_stats_line (self, line):
'''Fetch statistics from TRex server by port\n'''
# define a parser
@@ -1180,30 +1043,34 @@ class CTRexStatelessClient(object):
if not mask:
# set to show all stats if no filter was given
mask = trex_stats.ALL_STATS_OPTS
- # get stats objects, as dictionary
+
stats = self.cmd_stats(opts.ports, mask)
+
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
- return
-
- # if opts.db:
- # stream_list = self.streams_db.get_stream_pack(opts.db)
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from DB):")
- # if rc.bad():
- # return RC_ERR("Failed to load stream pack")
- #
- # else:
- # # load streams from file
- # stream_list = self.streams_db.load_yaml_file(opts.file[0])
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from file):")
- # if stream_list == None:
- # return RC_ERR("Failed to load stream pack")
- #
- #
- # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+
+
+ return RC_OK()
+
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ rc = self.cmd_validate(opts.ports)
+ return rc
+
def cmd_exit_line (self, line):
print format_text("Exiting\n", 'bold')
@@ -1280,6 +1147,7 @@ class CTRexStatelessClient(object):
return True
+
#################################
# ------ private methods ------ #
@staticmethod
@@ -1294,17 +1162,18 @@ class CTRexStatelessClient(object):
def _filter_namespace_args(namespace, ok_values):
return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
+
#################################
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
- def __init__(self, server="localhost", port=5050, virtual=False):
+ def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
self.verbose = False
- self.rpc_link = JsonRpcClient(self.server, self.port)
+ self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
@property
def is_connected(self):
@@ -1313,6 +1182,12 @@ class CTRexStatelessClient(object):
else:
return True
+ def get_server (self):
+ return self.server
+
+ def get_port (self):
+ return self.port
+
def set_verbose(self, mode):
self.verbose = mode
return self.rpc_link.set_verbose(mode)
@@ -1354,4 +1229,3 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
-