summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--api/stl/examples/stl_simple_burst.py53
-rw-r--r--api/stl/examples/udp_64B.pcapbin0 -> 104 bytes
-rw-r--r--api/stl/profiles/burst.yaml39
-rw-r--r--api/stl/trex_stl_api.py17
-rwxr-xr-xlinux_dpdk/ws_main.py78
-rwxr-xr-xlinux_dpdk/wscript9
-rwxr-xr-xscripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py2
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py83
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_hltapi.py4
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py41
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py1971
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py30
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/parsing_opts.py41
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_stats.py48
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_streams.py1
-rw-r--r--scripts/automation/trex_control_plane/common/trex_types.py51
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py204
-rw-r--r--scripts/automation/trex_control_plane/console/trex_status.py1032
-rw-r--r--scripts/automation/trex_control_plane/console/trex_tui.py133
-rw-r--r--scripts/automation/trex_control_plane/examples/interactive_stateless.py2
-rw-r--r--src/internal_api/trex_platform_api.h3
-rwxr-xr-xsrc/main_dpdk.cpp113
-rw-r--r--src/mock/trex_platform_api_mock.cpp1
-rw-r--r--src/publisher/trex_publisher.cpp15
-rw-r--r--src/publisher/trex_publisher.h12
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp16
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h1
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp3
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/sim/trex_sim_stateless.cpp4
30 files changed, 2477 insertions, 1531 deletions
diff --git a/api/stl/examples/stl_simple_burst.py b/api/stl/examples/stl_simple_burst.py
new file mode 100644
index 00000000..7efb574a
--- /dev/null
+++ b/api/stl/examples/stl_simple_burst.py
@@ -0,0 +1,53 @@
+import sys
+sys.path.insert(0, "../")
+
+import trex_stl_api
+
+from trex_stl_api import STLClient, STLError
+
+import time
+
+# define a simple burst test
+def simple_burst ():
+
+ passed = True
+
+ try:
+ with STLClient() as c:
+
+ # activate this for some logging information
+ #c.logger.set_verbose(c.logger.VERBOSE_REGULAR)
+
+ # repeat for 5 times
+ for i in xrange(1, 6):
+
+ # read the stats before
+ before_ipackets = c.get_stats()['total']['ipackets']
+
+ # inject burst profile on two ports and block until done
+ c.start(profiles = '../profiles/burst.yaml', ports = [0, 1], mult = "1gbps")
+ c.wait_on_traffic(ports = [0, 1])
+
+ after_ipackets = c.get_stats()['total']['ipackets']
+
+ print "Test iteration {0} - Packets Received: {1} ".format(i, (after_ipackets - before_ipackets))
+
+ # we have 600 packets in the burst and two ports
+ if (after_ipackets - before_ipackets) != (600 * 2):
+ passed = False
+
+ # error handling
+ except STLError as e:
+ passed = False
+ print e
+
+
+
+ if passed:
+ print "\nTest has passed :-)\n"
+ else:
+ print "\nTest has failed :-(\n"
+
+
+simple_burst()
+
diff --git a/api/stl/examples/udp_64B.pcap b/api/stl/examples/udp_64B.pcap
new file mode 100644
index 00000000..699b9c80
--- /dev/null
+++ b/api/stl/examples/udp_64B.pcap
Binary files differ
diff --git a/api/stl/profiles/burst.yaml b/api/stl/profiles/burst.yaml
new file mode 100644
index 00000000..dbd348c7
--- /dev/null
+++ b/api/stl/profiles/burst.yaml
@@ -0,0 +1,39 @@
+### Single stream UDP packet, 64B ###
+#####################################
+- name: stream0
+ stream:
+ self_start: True
+ next_stream_id: stream1
+ packet:
+ binary: udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 100
+ rx_stats: []
+ vm: []
+
+- name: stream1
+ stream:
+ self_start: False
+ next_stream_id: stream2
+ packet:
+ binary: udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 200
+ rx_stats: []
+ vm: []
+
+- name: stream2
+ stream:
+ self_start: False
+ packet:
+ binary: udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 300
+ rx_stats: []
+ vm: []
diff --git a/api/stl/trex_stl_api.py b/api/stl/trex_stl_api.py
new file mode 100644
index 00000000..aad39916
--- /dev/null
+++ b/api/stl/trex_stl_api.py
@@ -0,0 +1,17 @@
+import os
+import sys
+import time
+
+
+# update the import path to include the stateless client
+root_path = os.path.dirname(os.path.abspath(__file__))
+
+sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client/'))
+sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/'))
+sys.path.insert(0, os.path.join(root_path, '../../scripts/automation/trex_control_plane/client_utils/'))
+
+# aliasing
+import trex_stateless_client
+STLClient = trex_stateless_client.STLClient
+STLError = trex_stateless_client.STLError
+
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py
index 1a96e127..f098e193 100755
--- a/linux_dpdk/ws_main.py
+++ b/linux_dpdk/ws_main.py
@@ -86,6 +86,7 @@ def options(opt):
opt.load('compiler_cc')
opt.add_option('--pkg-dir', '--pkg_dir', dest='pkg_dir', default=False, action='store', help="Destination folder for 'pkg' option.")
opt.add_option('--pkg-file', '--pkg_file', dest='pkg_file', default=False, action='store', help="Destination filename for 'pkg' option.")
+ opt.add_option('--publish-commit', '--publish_commit', dest='publish_commit', default=False, action='store', help="Specify commit id for 'publish_both' option (Please make sure it's good!)")
def configure(conf):
conf.load('g++')
@@ -887,11 +888,14 @@ class Env(object):
s= Env().get_env('TREX_EX_WEB_SRV');
return s;
+ @staticmethod
+ def get_trex_regression_workspace():
+ return Env().get_env('TREX_REGRESSION_WORKSPACE')
+
+
def check_release_permission():
if os.getenv('USER') not in USERS_ALLOWED_TO_RELEASE:
- print 'You are not allowed to release TRex. Please contact Hanoch.'
- return False
- return True
+ raise Exception('You are not allowed to release TRex. Please contact Hanoch.')
# build package in parent dir. can provide custom name and folder with --pkg-dir and --pkg-file
def pkg(self):
@@ -920,9 +924,8 @@ def release(bld, custom_dir = None):
""" release to local folder """
if custom_dir:
exec_p = custom_dir
- elif not check_release_permission():
- return
else:
+ check_release_permission()
exec_p = Env().get_release_path()
print "copy images and libs"
os.system(' mkdir -p '+exec_p);
@@ -947,34 +950,77 @@ def release(bld, custom_dir = None):
os.system("mv %s/%s.tar.gz %s" % (os.getcwd(),rel,exec_p));
-def publish(bld):
- if not check_release_permission():
- return
+def publish(bld, custom_source = None):
+ check_release_permission()
exec_p = Env().get_release_path()
rel=get_build_num ()
release_name ='%s.tar.gz' % (rel);
- from_ = exec_p+'/'+release_name;
+ if custom_source:
+ from_ = custom_source
+ else:
+ from_ = exec_p+'/'+release_name;
os.system("rsync -av %s %s:%s/%s " %(from_,Env().get_local_web_server(),Env().get_remote_release_path (), release_name))
os.system("ssh %s 'cd %s;rm be_latest; ln -P %s be_latest' " %(Env().get_local_web_server(),Env().get_remote_release_path (),release_name))
#os.system("ssh %s 'cd %s;rm latest; ln -P %s latest' " %(Env().get_local_web_server(),Env().get_remote_release_path (),release_name))
-def publish_ext(bld):
- if not check_release_permission():
- return
+def publish_ext(bld, custom_source = None):
+ check_release_permission()
exec_p = Env().get_release_path()
rel=get_build_num ()
release_name ='%s.tar.gz' % (rel);
- from_ = exec_p+'/'+release_name;
+ if custom_source:
+ from_ = custom_source
+ else:
+ from_ = exec_p+'/'+release_name;
os.system('rsync -avz -e "ssh -i %s" --rsync-path=/usr/bin/rsync %s %s@%s:%s/release/%s' % (Env().get_trex_ex_web_key(),from_, Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path() ,release_name) )
os.system("ssh -i %s -l %s %s 'cd %s/release/;rm be_latest; ln -P %s be_latest' " %(Env().get_trex_ex_web_key(),Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path(),release_name))
#os.system("ssh -i %s -l %s %s 'cd %s/release/;rm latest; ln -P %s latest' " %(Env().get_trex_ex_web_key(),Env().get_trex_ex_web_user(),Env().get_trex_ex_web_srv(),Env().get_trex_ex_web_path(),release_name))
-#WIP
-def release_successful(self):
- print 'Not implemented'
+# publish latest passed regression package (or custom commit from --publish_commit option) as be_latest to trex-tgn.cisco.com and internal wiki
+def publish_both(self):
+ check_release_permission()
+ packages_dir = Env().get_env('TREX_LOCAL_PUBLISH_PATH') + '/experiment/packages'
+ publish_commit = self.options.publish_commit
+ if publish_commit:
+ package_file = '%s/%s.tar.gz' % (packages_dir, publish_commit)
+ else:
+ last_passed_commit_file = Env().get_trex_regression_workspace() + '/reports/last_passed_commit'
+ with open(last_passed_commit_file) as f:
+ last_passed_commit = f.read().strip()
+ package_file = '%s/%s.tar.gz' % (packages_dir, last_passed_commit)
+ publish(self, custom_source = package_file)
+ publish_ext(self, custom_source = package_file)
+
+# print detailed latest passed regression commit + brief info of 5 commits before it
+def show(bld):
+ last_passed_commit_file = Env().get_trex_regression_workspace() + '/reports/last_passed_commit'
+ with open(last_passed_commit_file) as f:
+ last_passed_commit = f.read().strip()
+
+ # last passed nightly
+ command = 'timeout 10 git show %s --quiet' % last_passed_commit
+ result, output = commands.getstatusoutput(command)
+ if result == 0:
+ print 'Last passed regression commit:\n%s\n' % output
+ else:
+ raise Exception('Error getting commit info with command: %s' % command)
+
+ # brief list of 5 commits before passed
+ result, output = commands.getstatusoutput('git --version')
+ if result != 0 or output.startswith('git version 1'):
+ # old format, no color etc.
+ command = "timeout 10 git log --no-merges -n 5 --pretty=format:'%%h %%an %%ci %%s' %s^@" % last_passed_commit
+ else:
+ # new format, with color, padding, truncating etc.
+ command = "timeout 10 git log --no-merges -n 5 --pretty=format:'%%C(auto)%%h%%Creset %%<(10,trunc)%%an %%ci %%<(100,trunc)%%s' %s^@ " % last_passed_commit
+ result, output = commands.getstatusoutput(command)
+ if result == 0:
+ print output
+ else:
+ raise Exception('Error getting commits info with command: %s' % command)
def test (bld):
r=commands.getstatusoutput("git log --pretty=format:'%H' -n 1")
diff --git a/linux_dpdk/wscript b/linux_dpdk/wscript
index 67434a19..981e4b92 100755
--- a/linux_dpdk/wscript
+++ b/linux_dpdk/wscript
@@ -43,17 +43,16 @@ def publish_ext(bld):
def publish_web(bld):
ws_main.publish_web(bld)
-def release_successful(bld):
- ws_main.release_successful(bld)
-
def sync(bld):
ws_main.sync(bld)
def test(bld):
ws_main.test(bld)
+def show(bld):
+ ws_main.show(bld)
-
-
+def publish_both(bld):
+ ws_main.publish_both(bld)
diff --git a/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py
index 96393d1e..b8831c04 100755
--- a/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py
+++ b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py
@@ -212,7 +212,7 @@ class CTRexPktBuilder_Test(pkt_bld_general_test.CGeneralPktBld_Test):
# finally, set IP header len with relation to payload data
self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip')))
- filepath = "reports/test.pcap"
+ filepath = 'reports/test%s.pcap' % os.getenv('SETUP_DIR', '')
self.pkt_bld.dump_pkt_to_pcap(filepath)
assert os.path.isfile(filepath)
# remove pcap after ensuring it exists
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 66e65a32..ef4c48f9 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -16,6 +16,7 @@ import time
import datetime
import zmq
import re
+import random
from common.trex_stats import *
from common.trex_streams import *
@@ -143,18 +144,22 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client, prn_func = None):
+ def __init__ (self, server, port, stateless_client):
self.port = port
self.server = server
+
self.stateless_client = stateless_client
- self.prn_func = prn_func
+
+ self.event_handler = stateless_client.event_handler
+ self.logger = self.stateless_client.logger
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
+ self.async_barrier = None
self.connected = False
@@ -166,13 +171,6 @@ class CTRexAsyncClient():
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
-
- if self.prn_func:
- self.prn_func(msg)
- else:
- print msg
-
# Socket to talk to server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
@@ -188,17 +186,15 @@ class CTRexAsyncClient():
self.connected = True
-
- # wait for data streaming from the server
- timeout = time.time() + 5
- while not self.is_alive():
- time.sleep(0.01)
- if time.time() > timeout:
- self.disconnect()
- return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+ rc = self.barrier()
+ if not rc:
+ self.disconnect()
+ return rc
return RC_OK()
+
+
# disconnect
def disconnect (self):
@@ -215,14 +211,14 @@ class CTRexAsyncClient():
# done
self.connected = False
+
# thread function
def _run (self):
-
# socket must be created on the same thread
- self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ self.socket.connect(self.tr)
got_data = False
@@ -234,7 +230,7 @@ class CTRexAsyncClient():
# signal once
if not got_data:
- self.stateless_client.on_async_alive()
+ self.event_handler.on_async_alive()
got_data = True
@@ -243,7 +239,7 @@ class CTRexAsyncClient():
# signal once
if got_data:
- self.stateless_client.on_async_dead()
+ self.event_handler.on_async_dead()
got_data = False
continue
@@ -283,11 +279,52 @@ class CTRexAsyncClient():
def __dispatch (self, name, type, data):
# stats
if name == "trex-global":
- self.stateless_client.handle_async_stats_update(data)
+ self.event_handler.handle_async_stats_update(data)
+
# events
elif name == "trex-event":
- self.stateless_client.handle_async_event(type, data)
+ self.event_handler.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
else:
pass
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+ if self.async_barrier['key'] == type:
+ self.async_barrier['ack'] = True
+
+
+ # block on barrier for async channel
+ def barrier(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ self.async_barrier = {'key': key, 'ack': False}
+
+ # expr time
+ expr = time.time() + timeout
+
+ while not self.async_barrier['ack']:
+
+ # inject
+ rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ # fast loop
+ for i in xrange(0, 100):
+ if self.async_barrier['ack']:
+ break
+ time.sleep(0.001)
+
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
+
diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py
index 848d5a9e..c25c73cb 100755
--- a/scripts/automation/trex_control_plane/client/trex_hltapi.py
+++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py
@@ -2,7 +2,7 @@
import trex_root_path
from client_utils.packet_builder import CTRexPktBuilder
-from trex_stateless_client import CTRexStatelessClient
+from trex_stateless_client import STLClient
from common.trex_streams import *
from client_utils.general_utils import id_count_gen
import dpkt
@@ -20,7 +20,7 @@ class CTRexHltApi(object):
# sync = RPC, async = ZMQ
def connect(self, device, port_list, username, sync_port = 4501, async_port = 4500, reset=False, break_locks=False):
ret_dict = {"status": 0}
- self.trex_client = CTRexStatelessClient(username, device, sync_port, async_port)
+ self.trex_client = STLClient(username, device, sync_port, async_port)
rc = self.trex_client.connect()
if rc.bad():
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
index 66d87f9d..94240f2a 100644
--- a/scripts/automation/trex_control_plane/client/trex_port.py
+++ b/scripts/automation/trex_control_plane/client/trex_port.py
@@ -56,7 +56,7 @@ class Port(object):
def err(self, msg):
return RC_ERR("port {0} : {1}".format(self.port_id, msg))
- def ok(self, data = "ACK"):
+ def ok(self, data = ""):
return RC_OK(data)
def get_speed_bps (self):
@@ -198,6 +198,9 @@ class Port(object):
# remove stream from port
def remove_stream (self, stream_id):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if not stream_id in self.streams:
return self.err("stream {0} does not exists".format(stream_id))
@@ -219,6 +222,9 @@ class Port(object):
# remove all the streams
def remove_all_streams (self):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -244,6 +250,10 @@ class Port(object):
# start traffic
def start (self, mul, duration):
+
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if self.state == self.STATE_DOWN:
return self.err("Unable to start traffic - port is down")
@@ -270,8 +280,15 @@ class Port(object):
# with force ignores the cached state and sends the command
def stop (self, force = False):
- if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
- return self.err("port is not transmitting")
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
+ # port is already stopped
+ if not force:
+ if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS):
+ return self.ok()
+
+
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -287,6 +304,9 @@ class Port(object):
def pause (self):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -305,6 +325,9 @@ class Port(object):
def resume (self):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if (self.state != self.STATE_PAUSE) :
return self.err("port is not in pause mode")
@@ -322,6 +345,10 @@ class Port(object):
def update (self, mul):
+
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -338,6 +365,9 @@ class Port(object):
def validate (self):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
if (self.state == self.STATE_DOWN):
return self.err("port is down")
@@ -413,6 +443,11 @@ class Port(object):
def clear_stats(self):
return self.port_stats.clear_stats()
+
+ def get_stats (self):
+ return self.port_stats.get_stats()
+
+
def invalidate_stats(self):
return self.port_stats.invalidate()
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index dc39bee6..c1a4d1d1 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -8,6 +8,7 @@ except ImportError:
import client.outer_packages
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+from client_utils import general_utils
from client_utils.packet_builder import CTRexPktBuilder
import json
@@ -22,86 +23,185 @@ import re
import random
from trex_port import Port
from common.trex_types import *
-
from trex_async_client import CTRexAsyncClient
+# basic error for API
+class STLError(Exception):
+ def __init__ (self, msg):
+ self.msg = str(msg)
+
+ def __str__ (self):
+ exc_type, exc_obj, exc_tb = sys.exc_info()
+ fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
+
+
+ s = "\n******\n"
+ s += "Error at {0}:{1}\n\n".format(format_text(fname, 'bold'), format_text(exc_tb.tb_lineno), 'bold')
+ s += "specific error:\n\n{0}\n".format(format_text(self.msg, 'bold'))
+
+ return s
+
+ def brief (self):
+ return self.msg
+
+
+# raised when the client state is invalid for operation
+class STLStateError(STLError):
+ def __init__ (self, op, state):
+ self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state)
+
+
+# port state error
+class STLPortStateError(STLError):
+ def __init__ (self, port, op, state):
+ self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state)
+
+
+# raised when argument is not valid for operation
+class STLArgumentError(STLError):
+ def __init__ (self, name, got, valid_values = None, extended = None):
+ self.msg = "Argument: '{0}' invalid value: '{1}'".format(name, got)
+ if valid_values:
+ self.msg += " - valid values are '{0}'".format(valid_values)
+
+ if extended:
+ self.msg += "\n{0}".format(extended)
-class CTRexStatelessClient(object):
- """docstring for CTRexStatelessClient"""
+# raised when timeout occurs
+class STLTimeoutError(STLError):
+ def __init__ (self, timeout):
+ self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout)
+
+
+############################ logger #############################
+############################ #############################
+############################ #############################
+
+# logger API for the client
+class LoggerApi(object):
# verbose levels
- VERBOSE_QUIET = 0
+ VERBOSE_QUIET = 0
VERBOSE_REGULAR = 1
VERBOSE_HIGH = 2
-
- def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False):
- super(CTRexStatelessClient, self).__init__()
- self.user = username
+ def __init__(self):
+ self.level = LoggerApi.VERBOSE_REGULAR
+
+ # implemented by specific logger
+ def write(self, msg, newline = True):
+ raise Exception("implement this")
+
+ # implemented by specific logger
+ def flush(self):
+ raise Exception("implement this")
+
+ def set_verbose (self, level):
+ if not level in xrange(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1):
+ raise ValueError("bad value provided for logger")
+
+ self.level = level
+
+ def get_verbose (self):
+ return self.level
+
+
+ def check_verbose (self, level):
+ return (self.level >= level)
+
+
+ # simple log message with verbose
+ def log (self, msg, level = VERBOSE_REGULAR, newline = True):
+ if not self.check_verbose(level):
+ return
+
+ self.write(msg, newline)
+
+ # logging that comes from async event
+ def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
+ self.log(msg, level, newline)
- self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func)
- # default verbose level
- if not quiet:
- self.verbose = self.VERBOSE_REGULAR
+ def pre_cmd (self, desc):
+ self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
+ self.flush()
+
+ def post_cmd (self, rc):
+ if rc:
+ self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
else:
- self.verbose = self.VERBOSE_QUIET
+ self.log(format_text("[FAILED]\n", 'red', 'bold'))
- self.ports = {}
- self._connection_info = {"server": server,
- "sync_port": sync_port,
- "async_port": async_port}
- self.system_info = {}
- self.server_version = {}
- self.__err_log = None
- self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func)
+ def log_cmd (self, desc):
+ self.pre_cmd(desc)
+ self.post_cmd(True)
- self.streams_db = CStreamsDB()
- self.global_stats = trex_stats.CGlobalStats(self._connection_info,
- self.server_version,
- self.ports)
- self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats,
- self.ports)
- self.events = []
+ # supress object getter
+ def supress (self):
+ class Supress(object):
+ def __init__ (self, logger):
+ self.logger = logger
- self.session_id = random.getrandbits(32)
- self.read_only = False
- self.connected = False
- self.prompt_redraw_cb = None
+ def __enter__ (self):
+ self.saved_level = self.logger.get_verbose()
+ self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
+ def __exit__ (self, type, value, traceback):
+ self.logger.set_verbose(self.saved_level)
- # returns the port object
- def get_port (self, port_id):
- return self.ports.get(port_id, None)
+ return Supress(self)
- # connection server ip
- def get_server_ip (self):
- return self.comm_link.get_server()
- # connection server port
- def get_server_port (self):
- return self.comm_link.get_port()
+# default logger - to stdout
+class DefaultLogger(LoggerApi):
+ def write (self, msg, newline = True):
+ if newline:
+ print msg
+ else:
+ print msg,
+ def flush (self):
+ sys.stdout.flush()
- ################# events handler ######################
- def add_event_log (self, msg, ev_type, show = False):
- if ev_type == "server":
- prefix = "[server]"
- elif ev_type == "local":
- prefix = "[local]"
+############################ async event hander #############################
+############################ #############################
+############################ #############################
- ts = time.time()
- st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
- self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
+# handles different async events given to the client
+class AsyncEventHandler(object):
- if show:
- self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))), redraw_console = True)
-
+ def __init__ (self, client):
+ self.client = client
+ self.logger = self.client.logger
+
+ self.events = []
+ # public functions
+
+ def get_events (self):
+ return self.events
+
+
+ def clear_events (self):
+ self.events = []
+
+
+ def on_async_dead (self):
+ if self.client.connected:
+ msg = 'lost connection to server'
+ self.__add_event_log(msg, 'local', True)
+ self.client.connected = False
+
+
+ def on_async_alive (self):
+ pass
+
+
+ # handles an async stats update from the subscriber
def handle_async_stats_update(self, dump_data):
global_stats = {}
port_stats = {}
@@ -113,7 +213,7 @@ class CTRexStatelessClient(object):
if m:
port_id = int(m.group(2))
field_name = m.group(1)
- if self.ports.has_key(port_id):
+ if self.client.ports.has_key(port_id):
if not port_id in port_stats:
port_stats[port_id] = {}
port_stats[port_id][field_name] = value
@@ -124,13 +224,14 @@ class CTRexStatelessClient(object):
global_stats[key] = value
# update the general object with the snapshot
- self.global_stats.update(global_stats)
+ self.client.global_stats.update(global_stats)
+
# update all ports
for port_id, data in port_stats.iteritems():
- self.ports[port_id].port_stats.update(data)
-
+ self.client.ports[port_id].port_stats.update(data)
+ # dispatcher for server async events (port started, port stopped and etc.)
def handle_async_event (self, type, data):
# DP stopped
@@ -140,7 +241,7 @@ class CTRexStatelessClient(object):
if (type == 0):
port_id = int(data['port_id'])
ev = "Port {0} has started".format(port_id)
- self.async_event_port_started(port_id)
+ self.__async_event_port_started(port_id)
# port stopped
elif (type == 1):
@@ -148,8 +249,8 @@ class CTRexStatelessClient(object):
ev = "Port {0} has stopped".format(port_id)
# call the handler
- self.async_event_port_stopped(port_id)
-
+ self.__async_event_port_stopped(port_id)
+
# port paused
elif (type == 2):
@@ -157,7 +258,7 @@ class CTRexStatelessClient(object):
ev = "Port {0} has paused".format(port_id)
# call the handler
- self.async_event_port_paused(port_id)
+ self.__async_event_port_paused(port_id)
# port resumed
elif (type == 3):
@@ -165,7 +266,7 @@ class CTRexStatelessClient(object):
ev = "Port {0} has resumed".format(port_id)
# call the handler
- self.async_event_port_resumed(port_id)
+ self.__async_event_port_resumed(port_id)
# port finished traffic
elif (type == 4):
@@ -173,7 +274,7 @@ class CTRexStatelessClient(object):
ev = "Port {0} job done".format(port_id)
# call the handler
- self.async_event_port_stopped(port_id)
+ self.__async_event_port_stopped(port_id)
show_event = True
# port was stolen...
@@ -181,7 +282,7 @@ class CTRexStatelessClient(object):
session_id = data['session_id']
# false alarm, its us
- if session_id == self.session_id:
+ if session_id == self.client.session_id:
return
port_id = int(data['port_id'])
@@ -190,13 +291,13 @@ class CTRexStatelessClient(object):
ev = "Port {0} was forcely taken by '{1}'".format(port_id, who)
# call the handler
- self.async_event_port_forced_acquired(port_id)
+ self.__async_event_port_forced_acquired(port_id)
show_event = True
# server stopped
elif (type == 100):
ev = "Server has stopped"
- self.async_event_server_stopped()
+ self.__async_event_server_stopped()
show_event = True
@@ -205,338 +306,242 @@ class CTRexStatelessClient(object):
return
- self.add_event_log(ev, 'server', show_event)
-
-
- def async_event_port_stopped (self, port_id):
- self.ports[port_id].async_event_port_stopped()
-
-
- def async_event_port_started (self, port_id):
- self.ports[port_id].async_event_port_started()
-
-
- def async_event_port_paused (self, port_id):
- self.ports[port_id].async_event_port_paused()
-
+ self.__add_event_log(ev, 'server', show_event)
- def async_event_port_resumed (self, port_id):
- self.ports[port_id].async_event_port_resumed()
+ # private functions
- def async_event_port_forced_acquired (self, port_id):
- self.ports[port_id].async_event_forced_acquired()
- self.read_only = True
+ def __async_event_port_stopped (self, port_id):
+ self.client.ports[port_id].async_event_port_stopped()
- def async_event_server_stopped (self):
- self.connected = False
+ def __async_event_port_started (self, port_id):
+ self.client.ports[port_id].async_event_port_started()
- def get_events (self):
- return self.events
- def clear_events (self):
- self.events = []
+ def __async_event_port_paused (self, port_id):
+ self.client.ports[port_id].async_event_port_paused()
- ############# helper functions section ##############
- # measure time for functions
- def timing(f):
- def wrap(*args):
- time1 = time.time()
- ret = f(*args)
+ def __async_event_port_resumed (self, port_id):
+ self.client.ports[port_id].async_event_port_resumed()
- # don't want to print on error
- if ret.bad():
- return ret
- delta = time.time() - time1
- print format_time(delta) + "\n"
+ def __async_event_port_forced_acquired (self, port_id):
+ self.client.ports[port_id].async_event_forced_acquired()
- return ret
- return wrap
+ def __async_event_server_stopped (self):
+ self.client.connected = False
- def validate_port_list(self, port_id_list):
- if not isinstance(port_id_list, list):
- print type(port_id_list)
- return False
-
- # check each item of the sequence
- return all([ (port_id >= 0) and (port_id < self.get_port_count())
- for port_id in port_id_list ])
-
- # some preprocessing for port argument
- def __ports (self, port_id_list):
-
- # none means all
- if port_id_list == None:
- return range(0, self.get_port_count())
-
- # always list
- if isinstance(port_id_list, int):
- port_id_list = [port_id_list]
-
- if not isinstance(port_id_list, list):
- raise ValueError("bad port id list: {0}".format(port_id_list))
-
- for port_id in port_id_list:
- if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
- raise ValueError("bad port id {0}".format(port_id))
-
- return port_id_list
-
- ############ boot up section ################
-
- # connection sequence
-
- # mode can be RW - read / write, RWF - read write with force , RO - read only
- def connect(self, mode = "RW"):
-
- if self.is_connected():
- self.disconnect()
-
- # clear this flag
- self.connected = False
-
- # connect sync channel
- rc = self.comm_link.connect()
- if rc.bad():
- return rc
-
- # connect async channel
- rc = self.async_client.connect()
- if rc.bad():
- return rc
-
- # version
- rc = self.transmit("get_version")
- if rc.bad():
- return rc
-
- self.server_version = rc.data()
- self.global_stats.server_version = rc.data()
-
- # cache system info
- rc = self.transmit("get_system_info")
- if rc.bad():
- return rc
-
- self.system_info = rc.data()
-
- # cache supported commands
- rc = self.transmit("get_supported_cmds")
- if rc.bad():
- return rc
-
- self.supported_cmds = rc.data()
-
- # create ports
- for port_id in xrange(self.get_port_count()):
- speed = self.system_info['ports'][port_id]['speed']
- driver = self.system_info['ports'][port_id]['driver']
-
- self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id)
+ # add event to log
+ def __add_event_log (self, msg, ev_type, show = False):
+ if ev_type == "server":
+ prefix = "[server]"
+ elif ev_type == "local":
+ prefix = "[local]"
- # sync the ports
- rc = self.sync_ports()
- if rc.bad():
- return rc
+ ts = time.time()
+ st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
+ self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
- # acquire all ports
- if mode == "RW":
- rc = self.acquire(force = False)
+ if show:
+ self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))))
- # fallback to read only if failed
- if rc.bad():
- rc.annotate(show_status = False)
- print format_text("Switching to read only mode - only few commands will be available", 'bold')
- self.release(self.get_acquired_ports())
- self.read_only = True
- else:
- self.read_only = False
+
- elif mode == "RWF":
- rc = self.acquire(force = True)
- if rc.bad():
- return rc
- self.read_only = False
- elif mode == "RO":
- # no acquire on read only
- rc = RC_OK()
- self.read_only = True
+############################ RPC layer #############################
+############################ #############################
+############################ #############################
+class CCommLink(object):
+ """describes the connectivity of the stateless client method"""
+ def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
+ self.virtual = virtual
+ self.server = server
+ self.port = port
+ self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
-
- self.connected = True
- return RC_OK()
-
+ @property
+ def is_connected(self):
+ if not self.virtual:
+ return self.rpc_link.connected
+ else:
+ return True
- def is_read_only (self):
- return self.read_only
+ def get_server (self):
+ return self.server
- def is_connected (self):
- return self.connected and self.comm_link.is_connected
+ def get_port (self):
+ return self.port
+ def connect(self):
+ if not self.virtual:
+ return self.rpc_link.connect()
def disconnect(self):
- # release any previous acquired ports
- if self.is_connected():
- self.release(self.get_acquired_ports())
-
- self.comm_link.disconnect()
- self.async_client.disconnect()
-
- self.connected = False
-
- return RC_OK()
-
-
- def on_async_dead (self):
- if self.connected:
- msg = 'lost connection to server'
- self.add_event_log(msg, 'local', True)
- self.connected = False
-
- def on_async_alive (self):
- pass
-
- ########### cached queries (no server traffic) ###########
+ if not self.virtual:
+ return self.rpc_link.disconnect()
- def get_supported_cmds(self):
- return self.supported_cmds
+ def transmit(self, method_name, params={}):
+ if self.virtual:
+ self._prompt_virtual_tx_msg()
+ _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params)
+ print msg
+ return
+ else:
+ return self.rpc_link.invoke_rpc_method(method_name, params)
- def get_version(self):
- return self.server_version
+ def transmit_batch(self, batch_list):
+ if self.virtual:
+ self._prompt_virtual_tx_msg()
+ print [msg
+ for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params)
+ for command in batch_list]]
+ else:
+ batch = self.rpc_link.create_batch()
+ for command in batch_list:
+ batch.add(command.method, command.params)
+ # invoke the batch
+ return batch.invoke()
- def get_system_info(self):
- return self.system_info
+ def _prompt_virtual_tx_msg(self):
+ print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
+ port=self.port)
- def get_port_count(self):
- return self.system_info.get("port_count")
- def get_port_ids(self, as_str=False):
- port_ids = range(self.get_port_count())
- if as_str:
- return " ".join(str(p) for p in port_ids)
- else:
- return port_ids
- def get_stats_async (self):
- return self.async_client.get_stats()
+############################ client #############################
+############################ #############################
+############################ #############################
- def get_connection_port (self):
- return self.comm_link.port
+class STLClient(object):
+ """docstring for STLClient"""
- def get_connection_ip (self):
- return self.comm_link.server
+ def __init__(self,
+ username = general_utils.get_current_user(),
+ server = "localhost",
+ sync_port = 4501,
+ async_port = 4500,
+ verbose_level = LoggerApi.VERBOSE_QUIET,
+ logger = None,
+ virtual = False):
- def get_all_ports (self):
- return [port_id for port_id, port_obj in self.ports.iteritems()]
- def get_acquired_ports(self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_acquired()]
+ self.username = username
+
+ # init objects
+ self.ports = {}
+ self.server_version = {}
+ self.system_info = {}
+ self.session_id = random.getrandbits(32)
+ self.connected = False
- def get_active_ports(self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_active()]
+ # logger
+ self.logger = DefaultLogger() if not logger else logger
- def get_paused_ports (self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_paused()]
-
- def get_transmitting_ports (self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_transmitting()]
+ # initial verbose
+ self.logger.set_verbose(verbose_level)
- def set_verbose(self, mode):
+ # low level RPC layer
+ self.comm_link = CCommLink(server,
+ sync_port,
+ virtual,
+ self.logger)
- # on high - enable link verbose
- if mode == self.VERBOSE_HIGH:
- self.comm_link.set_verbose(True)
- else:
- self.comm_link.set_verbose(False)
+ # async event handler manager
+ self.event_handler = AsyncEventHandler(self)
- self.verbose = mode
+ # async subscriber level
+ self.async_client = CTRexAsyncClient(server,
+ async_port,
+ self)
+
+
- def check_verbose (self, level):
- return (self.verbose >= level)
+ # stats
+ self.connection_info = {"username": username,
+ "server": server,
+ "sync_port": sync_port,
+ "async_port": async_port,
+ "virtual": virtual}
- def get_verbose (self):
- return self.verbose
+
+ self.global_stats = trex_stats.CGlobalStats(self.connection_info,
+ self.server_version,
+ self.ports)
- def prn_func (self, msg, level = VERBOSE_REGULAR, redraw_console = False):
- if not self.check_verbose(level):
- return
+ self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats,
+ self.ports)
- if redraw_console and self.prompt_redraw_cb:
- print "\n" + msg + "\n"
- self.prompt_redraw_cb()
- else:
- print msg
+ # stream DB
+ self.streams_db = CStreamsDB()
- sys.stdout.flush()
+
+
+ ############# private functions - used by the class itself ###########
- def set_prompt_redraw_cb(self, cb):
- self.prompt_redraw_cb = cb
+ # some preprocessing for port argument
+ def __ports (self, port_id_list):
- ############# server actions ################
+ # none means all
+ if port_id_list == None:
+ return range(0, self.get_port_count())
- # ping server
- def ping(self):
- return self.transmit("ping")
+ # always list
+ if isinstance(port_id_list, int):
+ port_id_list = [port_id_list]
+ if not isinstance(port_id_list, list):
+ raise ValueError("bad port id list: {0}".format(port_id_list))
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
- def get_global_stats(self):
- return self.transmit("get_global_stats")
+ return port_id_list
- ########## port commands ##############
- def sync_ports (self, port_id_list = None, force = False):
+ # sync ports
+ def __sync_ports (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].sync())
-
+
return rc
# acquire ports, if port_list is none - get all
- def acquire (self, port_id_list = None, force = False):
+ def __acquire (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].acquire(force))
-
+
return rc
-
+
# release ports
- def release (self, port_id_list = None):
+ def __release (self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].release())
-
+
return rc
-
- def add_stream(self, stream_id, stream_obj, port_id_list = None):
+
+ def __add_stream(self, stream_id, stream_obj, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -544,12 +549,12 @@ class CTRexStatelessClient(object):
for port_id in port_id_list:
rc.add(self.ports[port_id].add_stream(stream_id, stream_obj))
-
+
return rc
-
- def add_stream_pack(self, stream_pack, port_id_list = None):
+
+ def __add_stream_pack(self, stream_pack, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -562,45 +567,45 @@ class CTRexStatelessClient(object):
- def remove_stream(self, stream_id, port_id_list = None):
+ def __remove_stream(self, stream_id, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].remove_stream(stream_id))
-
+
return rc
- def remove_all_streams(self, port_id_list = None):
+ def __remove_all_streams(self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].remove_all_streams())
-
+
return rc
-
- def get_stream(self, stream_id, port_id, get_pkt = False):
+
+ def __get_stream(self, stream_id, port_id, get_pkt = False):
return self.ports[port_id].get_stream(stream_id)
- def get_all_streams(self, port_id, get_pkt = False):
+ def __get_all_streams(self, port_id, get_pkt = False):
return self.ports[port_id].get_all_streams()
- def get_stream_id_list(self, port_id):
+ def __get_stream_id_list(self, port_id):
return self.ports[port_id].get_stream_id_list()
- def start_traffic (self, multiplier, duration, port_id_list = None):
+ def __start_traffic (self, multiplier, duration, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -608,11 +613,11 @@ class CTRexStatelessClient(object):
for port_id in port_id_list:
rc.add(self.ports[port_id].start(multiplier, duration))
-
+
return rc
- def resume_traffic (self, port_id_list = None, force = False):
+ def __resume_traffic (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
@@ -622,7 +627,7 @@ class CTRexStatelessClient(object):
return rc
- def pause_traffic (self, port_id_list = None, force = False):
+ def __pause_traffic (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
@@ -632,280 +637,937 @@ class CTRexStatelessClient(object):
return rc
- def stop_traffic (self, port_id_list = None, force = False):
+
+ def __stop_traffic (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].stop(force))
-
+
return rc
- def update_traffic (self, mult, port_id_list = None, force = False):
+ def __update_traffic (self, mult, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].update(mult))
-
+
return rc
- def validate (self, port_id_list = None):
+ def __validate_traffic (self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
for port_id in port_id_list:
rc.add(self.ports[port_id].validate())
-
+
return rc
- def get_port_stats(self, port_id=None):
- pass
- def get_stream_stats(self, port_id=None):
- pass
+ # connect to server
+ def __connect(self):
- def transmit(self, method_name, params={}):
- return self.comm_link.transmit(method_name, params)
+ # first disconnect if already connected
+ if self.is_connected():
+ self.__disconnect()
+ # clear this flag
+ self.connected = False
- def transmit_batch(self, batch_list):
- return self.comm_link.transmit_batch(batch_list)
+ # connect sync channel
+ self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
+ rc = self.comm_link.connect()
+ self.logger.post_cmd(rc)
- ######################### Console (high level) API #########################
+ if not rc:
+ return rc
- @timing
- def cmd_ping(self):
- rc = self.ping()
- rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
- return rc
+ # connect async channel
+ self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
+ rc = self.async_client.connect()
+ self.logger.post_cmd(rc)
- def cmd_connect(self, mode = "RW"):
- rc = self.connect(mode)
- rc.annotate()
- return rc
+ if not rc:
+ return rc
- def cmd_disconnect(self):
- rc = self.disconnect()
- rc.annotate()
- return rc
+ # version
+ rc = self._transmit("get_version")
+ if not rc:
+ return rc
- # reset
- def cmd_reset(self):
- #self.release(self.get_acquired_ports())
+ self.server_version = rc.data()
+ self.global_stats.server_version = rc.data()
- rc = self.acquire(force = True)
- rc.annotate("Force acquiring all ports:")
- if rc.bad():
+ # cache system info
+ rc = self._transmit("get_system_info")
+ if not rc:
return rc
+ self.system_info = rc.data()
- # force stop all ports
- rc = self.stop_traffic(self.get_port_ids(), True)
- rc.annotate("Stop traffic on all ports:")
- if rc.bad():
+ # cache supported commands
+ rc = self._transmit("get_supported_cmds")
+ if not rc:
return rc
+ self.supported_cmds = rc.data()
- # remove all streams
- rc = self.remove_all_streams(self.get_port_ids())
- rc.annotate("Removing all streams from all ports:")
- if rc.bad():
+ # create ports
+ for port_id in xrange(self.system_info["port_count"]):
+ speed = self.system_info['ports'][port_id]['speed']
+ driver = self.system_info['ports'][port_id]['driver']
+
+ self.ports[port_id] = Port(port_id,
+ speed,
+ driver,
+ self.username,
+ self.comm_link,
+ self.session_id)
+
+
+ # sync the ports
+ rc = self.__sync_ports()
+ if not rc:
return rc
- # TODO: clear stats
- return RC_OK()
+ self.connected = True
+ return RC_OK()
- # stop cmd
- def cmd_stop (self, port_id_list):
- # find the relveant ports
- active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+ # disconenct from server
+ def __disconnect(self):
+ # release any previous acquired ports
+ if self.is_connected():
+ self.__release(self.get_acquired_ports())
- if not active_ports:
- msg = "No active traffic on provided ports"
- print format_text(msg, 'bold')
- return RC_ERR(msg)
+ self.comm_link.disconnect()
+ self.async_client.disconnect()
- rc = self.stop_traffic(active_ports)
- rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
+ self.connected = False
return RC_OK()
- # update cmd
- def cmd_update (self, port_id_list, mult):
- # find the relevant ports
+ # ping server
+ def __ping (self):
+ return self._transmit("ping")
+
+
+ # start command
+ def __start (self, port_id_list, stream_list, mult, force, duration, dry):
+
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
- if not active_ports:
- msg = "No active traffic on provided ports"
- print format_text(msg, 'bold')
- return RC_ERR(msg)
+ if active_ports:
+ if not force:
+ msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports)
+ self.logger.log(format_text(msg, 'bold'))
+ return RC_ERR(msg)
+ else:
+ rc = self.__stop(active_ports)
+ if not rc:
+ return rc
+
- rc = self.update_traffic(mult, active_ports)
- rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list))
+ self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list))
+ rc = self.__remove_all_streams(port_id_list)
+ self.logger.post_cmd(rc)
- return rc
+ if not rc:
+ return rc
+
- # clear stats
- def cmd_clear(self, port_id_list):
+ self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
+ rc = self.__add_stream_pack(stream_list, port_id_list)
+ self.logger.post_cmd(rc)
- for port_id in port_id_list:
- self.ports[port_id].clear_stats()
+ if not rc:
+ return rc
+
+ # when not on dry - start the traffic , otherwise validate only
+ if not dry:
- self.global_stats.clear_stats()
+ self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list))
+ rc = self.__start_traffic(mult, duration, port_id_list)
+ self.logger.post_cmd(rc)
- return RC_OK()
+ return rc
+ else:
+ self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list))
+ rc = self.__validate(port_id_list)
+ self.logger.post_cmd(rc)
+
- def cmd_invalidate (self, port_id_list):
- for port_id in port_id_list:
- self.ports[port_id].invalidate_stats()
+ if rc.bad():
+ return rc
+
+ # show a profile on one port for illustration
+ self.ports[port_id_list[0]].print_profile(mult, duration)
+
+ return rc
- self.global_stats.invalidate()
- return RC_OK()
+ # stop cmd
+ def __stop (self, port_id_list):
- # pause cmd
- def cmd_pause (self, port_id_list):
+ self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list))
+ rc = self.__stop_traffic(port_id_list)
+ self.logger.post_cmd(rc)
- # find the relevant ports
- active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+ if not rc:
+ return rc
+
+ return RC_OK()
- if not active_ports:
- msg = "No active traffic on provided ports"
- print format_text(msg, 'bold')
- return RC_ERR(msg)
+ #update cmd
+ def __update (self, port_id_list, mult):
+
+ self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list))
+ rc = self.__update_traffic(mult, port_id_list)
+ self.logger.post_cmd(rc)
- rc = self.pause_traffic(active_ports)
- rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
return rc
+ # pause cmd
+ def __pause (self, port_id_list):
+
+ self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list))
+ rc = self.__pause_traffic(port_id_list)
+ self.logger.post_cmd(rc)
+
+ return rc
+
# resume cmd
- def cmd_resume (self, port_id_list):
+ def __resume (self, port_id_list):
- # find the relveant ports
- active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+ self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list))
+ rc = self.__resume_traffic(port_id_list)
+ self.logger.post_cmd(rc)
+
+ return rc
- if not active_ports:
- msg = "No active traffic on porvided ports"
- print format_text(msg, 'bold')
- return RC_ERR(msg)
- rc = self.resume_traffic(active_ports)
- rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
+ # validate port(s) profile
+ def __validate (self, port_id_list):
+ self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list))
+ rc = self.__validate_traffic(port_id_list)
+ self.logger.post_cmd(rc)
+
return rc
- # start cmd
- def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry):
+ # clear stats
+ def __clear_stats(self, port_id_list, clear_global):
- active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+ for port_id in port_id_list:
+ self.ports[port_id].clear_stats()
- if active_ports:
- if not force:
- msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports)
- print format_text(msg, 'bold')
- return RC_ERR(msg)
- else:
- rc = self.cmd_stop(active_ports)
- if not rc:
+ if clear_global:
+ self.global_stats.clear_stats()
+
+ self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list))
+ rc = RC_OK()
+ self.logger.post_cmd(rc)
+
+ return RC
+
+
+ # get stats
+ def __get_stats (self, port_id_list):
+ stats = {}
+
+ stats['global'] = self.global_stats.get_stats()
+
+ total = {}
+ for port_id in port_id_list:
+ port_stats = self.ports[port_id].get_stats()
+ stats["port {0}".format(port_id)] = port_stats
+
+ for k, v in port_stats.iteritems():
+ if not k in total:
+ total[k] = v
+ else:
+ total[k] += v
+
+ stats['total'] = total
+
+ return stats
+
+
+ def __process_profiles (self, profiles, out):
+
+ for profile in (profiles if isinstance(profiles, list) else [profiles]):
+ # filename
+ if isinstance(profile, str):
+
+ if not os.path.isfile(profile):
+ return RC_ERR("file '{0}' does not exists".format(profile))
+
+ try:
+ stream_list = self.streams_db.load_yaml_file(profile)
+ except Exception as e:
+ rc = RC_ERR(str(e))
return rc
+ out.append(stream_list)
- rc = self.remove_all_streams(port_id_list)
- rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
+ else:
+ return RC_ERR("unknown profile '{0}'".format(profile))
- rc = self.add_stream_pack(stream_list, port_id_list)
- rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
- if rc.bad():
- return rc
+ return RC_OK()
- # when not on dry - start the traffic , otherwise validate only
- if not dry:
- rc = self.start_traffic(mult, duration, port_id_list)
- rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- return rc
- else:
- rc = self.validate(port_id_list)
- rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list))
+ ############ functions used by other classes but not users ##############
- if rc.bad():
- return rc
+ def _verify_port_id_list (self, port_id_list):
+ # check arguments
+ if not isinstance(port_id_list, list):
+ return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list)))
- # show a profile on one port for illustration
- self.ports[port_id_list[0]].print_profile(mult, duration)
+ # all ports are valid ports
+ if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]):
+ return RC_ERR("")
- return rc
+ return RC_OK()
+
+ def _validate_port_list(self, port_id_list):
+ if not isinstance(port_id_list, list):
+ return False
+ # check each item of the sequence
+ return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]))
- # validate port(s) profile
- def cmd_validate (self, port_id_list):
- rc = self.validate(port_id_list)
- rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
- return rc
+ # transmit request on the RPC link
+ def _transmit(self, method_name, params={}):
+ return self.comm_link.transmit(method_name, params)
+
+ # transmit batch request on the RPC link
+ def _transmit_batch(self, batch_list):
+ return self.comm_link.transmit_batch(batch_list)
+
# stats
- def cmd_stats(self, port_id_list, stats_mask=set()):
+ def _get_formatted_stats(self, port_id_list, stats_mask=set()):
stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
stats_obj = {}
for stats_type in stats_opts:
stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))
+
return stats_obj
- def cmd_streams(self, port_id_list, streams_mask=set()):
+ def _get_streams(self, port_id_list, streams_mask=set()):
streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask)
return streams_obj
- ############## High Level API With Parser ################
+ def _invalidate_stats (self, port_id_list):
+ for port_id in port_id_list:
+ self.ports[port_id].invalidate_stats()
+
+ self.global_stats.invalidate()
+
+ return RC_OK()
+
+
+
+
+
+ #################################
+ # ------ private methods ------ #
+ @staticmethod
+ def __get_mask_keys(ok_values={True}, **kwargs):
+ masked_keys = set()
+ for key, val in kwargs.iteritems():
+ if val in ok_values:
+ masked_keys.add(key)
+ return masked_keys
+
+ @staticmethod
+ def __filter_namespace_args(namespace, ok_values):
+ return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
+
+
+ # API decorator - double wrap because of argument
+ def __api_check(connected = True):
+
+ def wrap (f):
+ def wrap2(*args, **kwargs):
+ client = args[0]
+
+ func_name = f.__name__
+
+ # check connection
+ if connected and not client.is_connected():
+ raise STLStateError(func_name, 'disconnected')
+
+ ret = f(*args, **kwargs)
+ return ret
+ return wrap2
+
+ return wrap
+
+
+
+ ############################ API #############################
+ ############################ #############################
+ ############################ #############################
+ def __enter__ (self):
+ self.connect(mode = "RWF")
+ self.reset()
+ return self
+
+ def __exit__ (self, type, value, traceback):
+ if self.get_active_ports():
+ self.stop(self.get_active_ports())
+ self.disconnect()
+
+ ############################ Getters #############################
+ ############################ #############################
+ ############################ #############################
+
+
+ # return verbose level of the logger
+ def get_verbose (self):
+ return self.logger.get_verbose()
+
+ # is the client on read only mode ?
+ def is_all_ports_acquired (self):
+ return not (self.get_all_ports() == self.get_acquired_ports())
+
+ # is the client connected ?
+ def is_connected (self):
+ return self.connected and self.comm_link.is_connected
+
+
+ # get connection info
+ def get_connection_info (self):
+ return self.connection_info
+
+
+ # get supported commands by the server
+ def get_server_supported_cmds(self):
+ return self.supported_cmds
+
+ # get server version
+ def get_server_version(self):
+ return self.server_version
+
+ # get server system info
+ def get_server_system_info(self):
+ return self.system_info
+
+ # get port count
+ def get_port_count(self):
+ return len(self.ports)
+
+
+ # returns the port object
+ def get_port (self, port_id):
+ port = self.ports.get(port_id, None)
+ if (port != None):
+ return port
+ else:
+ raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports())
+
+
+ # get all ports as IDs
+ def get_all_ports (self):
+ return self.ports.keys()
+
+ # get all acquired ports
+ def get_acquired_ports(self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_acquired()]
+
+ # get all active ports (TX or pause)
+ def get_active_ports(self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_active()]
+
+ # get paused ports
+ def get_paused_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_paused()]
+
+ # get all TX ports
+ def get_transmitting_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_transmitting()]
+
+
+ # get stats
+ def get_stats (self, ports = None, async_barrier = True):
+ # by default use all ports
+ if ports == None:
+ ports = self.get_all_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # check async barrier
+ if not type(async_barrier) is bool:
+ raise STLArgumentError('async_barrier', async_barrier)
+
+
+ # if the user requested a barrier - use it
+ if async_barrier:
+ rc = self.async_client.barrier()
+ if not rc:
+ raise STLError(rc)
+
+ return self.__get_stats(ports)
+
+ # return all async events
+ def get_events (self):
+ return self.event_handler.get_events()
+
+ ############################ Commands #############################
+ ############################ #############################
+ ############################ #############################
+
+
+ # set the log on verbose level
+ def set_verbose (self, level):
+ self.logger.set_verbose(level)
+
+
+ # connects to the server
+ # mode can be:
+ # 'RO' - read only
+ # 'RW' - read/write
+ # 'RWF' - read write forced (take ownership)
+ @__api_check(False)
+ def connect (self, mode = "RW"):
+ modes = ['RO', 'RW', 'RWF']
+ if not mode in modes:
+ raise STLArgumentError('mode', mode, modes)
+
+ rc = self.__connect()
+ if not rc:
+ raise STLError(rc)
+
+ # acquire all ports for 'RW' or 'RWF'
+ if (mode == "RW") or (mode == "RWF"):
+ self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False)
+
+
+
+
+ # acquire ports
+ # this is not needed if connect was called with "RW" or "RWF"
+ # but for "RO" this might be needed
+ @__api_check(True)
+ def acquire (self, ports = None, force = False):
+ # by default use all ports
+ if ports == None:
+ ports = self.get_all_ports()
+
+ # verify ports
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # verify valid port id list
+ if force:
+ self.logger.pre_cmd("Force acquiring ports {0}:".format(ports))
+ else:
+ self.logger.pre_cmd("Acquiring ports {0}:".format(ports))
+
+ rc = self.__acquire(ports, force)
+
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ self.__release(ports)
+ raise STLError(rc)
+
+
+
+ # force connect syntatic sugar
+ @__api_check(False)
+ def fconnect (self):
+ self.connect(mode = "RWF")
+
+
+ # disconnects from the server
+ @__api_check(False)
+ def disconnect (self, log = True):
+ rc = self.__disconnect()
+ if log:
+ self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
+ self.connection_info['sync_port']))
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # teardown - call after test is done
+ # NEVER throws an exception
+ @__api_check(False)
+ def teardown (self, stop_traffic = True):
+
+ # try to stop traffic
+ if stop_traffic and self.get_active_ports():
+ try:
+ self.stop()
+ except STLError:
+ pass
+
+ # disconnect
+ self.__disconnect()
+
+
+
+ # pings the server on the RPC channel
+ @__api_check(True)
+ def ping(self):
+ self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
+ self.connection_info['sync_port']))
+ rc = self.__ping()
+
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # reset the server by performing
+ # force acquire, stop, and remove all streams
+ @__api_check(True)
+ def reset(self):
+
+ self.logger.pre_cmd("Force acquiring all ports:")
+ rc = self.__acquire(force = True)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ # force stop all ports
+ self.logger.pre_cmd("Stop traffic on all ports:")
+ rc = self.__stop_traffic(self.get_all_ports(), True)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ # remove all streams
+ self.logger.pre_cmd("Removing all streams from all ports:")
+ rc = self.__remove_all_streams(self.get_all_ports())
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+ self.clear_stats()
+
+
+ # start cmd
+ @__api_check(True)
+ def start (self,
+ profiles,
+ ports = None,
+ mult = "1",
+ force = False,
+ duration = -1,
+ dry = False,
+ total = False):
+
+
+ # by default use all ports
+ if ports == None:
+ ports = self.get_acquired_ports()
- def cmd_connect_line (self, line):
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # verify multiplier
+ mult_obj = parsing_opts.decode_multiplier(mult,
+ allow_update = False,
+ divide_count = len(ports) if total else 1)
+ if not mult_obj:
+ raise STLArgumentError('mult', mult)
+
+ # some type checkings
+
+ if not type(force) is bool:
+ raise STLArgumentError('force', force)
+
+ if not isinstance(duration, (int, float)):
+ raise STLArgumentError('duration', duration)
+
+ if not type(total) is bool:
+ raise STLArgumentError('total', total)
+
+
+ # process profiles
+ stream_list = []
+ rc = self.__process_profiles(profiles, stream_list)
+ if not rc:
+ raise STLError(rc)
+
+ # dry run
+ if dry:
+ self.logger.log(format_text("\n*** DRY RUN ***", 'bold'))
+
+ # call private method to start
+
+ rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry)
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # stop traffic on ports
+ @__api_check(True)
+ def stop (self, ports = None):
+
+ # by default the user means all the active ports
+ if ports == None:
+ ports = self.get_active_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ rc = self.__stop(ports)
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # update traffic
+ @__api_check(True)
+ def update (self, ports = None, mult = "1", total = False):
+
+ # by default the user means all the active ports
+ if ports == None:
+ ports = self.get_active_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # verify multiplier
+ mult_obj = parsing_opts.decode_multiplier(mult,
+ allow_update = True,
+ divide_count = len(ports) if total else 1)
+ if not mult_obj:
+ raise STLArgumentError('mult', mult)
+
+ # verify total
+ if not type(total) is bool:
+ raise STLArgumentError('total', total)
+
+
+ # call low level functions
+ rc = self.__update(ports, mult_obj)
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # pause traffic on ports
+ @__api_check(True)
+ def pause (self, ports = None):
+
+ # by default the user means all the TX ports
+ if ports == None:
+ ports = self.get_transmitting_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ rc = self.__pause(ports)
+ if not rc:
+ raise STLError(rc)
+
+
+
+ # resume traffic on ports
+ @__api_check(True)
+ def resume (self, ports = None):
+
+ # by default the user means all the paused ports
+ if ports == None:
+ ports = self.get_paused_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ rc = self.__resume(ports)
+ if not rc:
+ raise STLError(rc)
+
+
+ @__api_check(True)
+ def validate (self, ports = None):
+ if ports == None:
+ ports = self.get_acquired_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ rc = self.__validate(ports)
+ if not rc:
+ raise STLError(rc)
+
+
+ # clear stats
+ @__api_check(False)
+ def clear_stats (self, ports = None, clear_global = True):
+
+ # by default use all ports
+ if ports == None:
+ ports = self.get_acquired_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # verify clear global
+ if not type(clear_global) is bool:
+ raise STLArgumentError('clear_global', clear_global)
+
+
+ rc = self.__clear_stats(ports, clear_global)
+ if not rc:
+ raise STLError(rc)
+
+
+
+
+
+ # wait while traffic is on, on timeout throw STLTimeoutError
+ @__api_check(True)
+ def wait_on_traffic (self, ports = None, timeout = 60):
+
+ # by default use all acquired ports
+ if ports == None:
+ ports = self.get_acquired_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ expr = time.time() + timeout
+
+ # wait while any of the required ports are active
+ while set(self.get_active_ports()).intersection(ports):
+ time.sleep(0.01)
+ if time.time() > expr:
+ raise STLTimeoutError(timeout)
+
+
+ # clear all async events
+ def clear_events (self):
+ self.event_handler.clear_events()
+
+ ############################ Line #############################
+ ############################ Commands #############################
+ ############################ #############################
+ # console decorator
+ def __console(f):
+ def wrap(*args):
+ client = args[0]
+
+ time1 = time.time()
+
+ try:
+ rc = f(*args)
+ except STLError as e:
+ client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
+ return
+
+ # if got true - print time
+ if rc:
+ delta = time.time() - time1
+ client.logger.log(format_time(delta) + "\n")
+
+
+ return wrap
+
+
+ @__console
+ def connect_line (self, line):
'''Connects to the TRex server'''
# define a parser
parser = parsing_opts.gen_parser(self,
"connect",
- self.cmd_connect_line.__doc__,
+ self.connect_line.__doc__,
parsing_opts.FORCE)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
- if opts.force:
- rc = self.cmd_connect(mode = "RWF")
- else:
- rc = self.cmd_connect(mode = "RW")
+ # call the API
+ self.connect("RWF" if opts.force else "RW")
- @timing
- def cmd_start_line (self, line):
+ # true means print time
+ return True
+
+ @__console
+ def disconnect_line (self, line):
+ self.disconnect()
+
+
+
+ @__console
+ def reset_line (self, line):
+ self.reset()
+
+ # true means print time
+ return True
+
+
+ @__console
+ def start_line (self, line):
'''Start selected traffic in specified ports on TRex\n'''
# define a parser
parser = parsing_opts.gen_parser(self,
"start",
- self.cmd_start_line.__doc__,
+ self.start_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.TOTAL,
parsing_opts.FORCE,
@@ -918,361 +1580,220 @@ class CTRexStatelessClient(object):
if opts is None:
- return RC_ERR("bad command line parameters")
-
-
- if opts.dry:
- print format_text("\n*** DRY RUN ***", 'bold')
-
- if opts.db:
- stream_list = self.streams_db.get_stream_pack(opts.db)
- rc = RC(stream_list != None)
- rc.annotate("Load stream pack (from DB):")
- if rc.bad():
- return RC_ERR("Failed to load stream pack")
+ return
- else:
- # load streams from file
- stream_list = None
- try:
- stream_list = self.streams_db.load_yaml_file(opts.file[0])
- except Exception as e:
- s = str(e)
- rc=RC_ERR(s)
- rc.annotate()
- return rc
+ # pack the profile
+ profiles = [opts.file[0]]
- rc = RC(stream_list != None)
- rc.annotate("Load stream pack (from file):")
- if stream_list == None:
- return RC_ERR("Failed to load stream pack")
+ self.start(profiles,
+ opts.ports,
+ opts.mult,
+ opts.force,
+ opts.duration,
+ opts.dry,
+ opts.total)
+ # true means print time
+ return True
- # total has no meaning with percentage - its linear
- if opts.total and (opts.mult['type'] != 'percentage'):
- # if total was set - divide it between the ports
- opts.mult['value'] = opts.mult['value'] / len(opts.ports)
- return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry)
- @timing
- def cmd_resume_line (self, line):
- '''Resume active traffic in specified ports on TRex\n'''
+ @__console
+ def stop_line (self, line):
+ '''Stop active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
- "resume",
- self.cmd_stop_line.__doc__,
+ "stop",
+ self.stop_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
+
+ # find the relevant ports
+ ports = list(set(self.get_active_ports()).intersection(opts.ports))
- return self.cmd_resume(opts.ports)
+ if not ports:
+ self.logger.log(format_text("No active traffic on provided ports\n", 'bold'))
+ return
+ self.stop(ports)
- @timing
- def cmd_stop_line (self, line):
- '''Stop active traffic in specified ports on TRex\n'''
+ # true means print time
+ return True
+
+
+ @__console
+ def update_line (self, line):
+ '''Update port(s) speed currently active\n'''
parser = parsing_opts.gen_parser(self,
- "stop",
- self.cmd_stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
+ "update",
+ self.update_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL,
+ parsing_opts.MULTIPLIER,
+ parsing_opts.TOTAL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
+
+ # find the relevant ports
+ ports = list(set(self.get_active_ports()).intersection(opts.ports))
+
+ if not ports:
+ self.logger.log(format_text("No ports in valid state to update\n", 'bold'))
+ return
+
+ self.update(ports, opts.mult, opts.total)
- return self.cmd_stop(opts.ports)
+ # true means print time
+ return True
- @timing
- def cmd_pause_line (self, line):
+ @__console
+ def pause_line (self, line):
'''Pause active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
"pause",
- self.cmd_stop_line.__doc__,
+ self.pause_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
+
+ # find the relevant ports
+ ports = list(set(self.get_transmitting_ports()).intersection(opts.ports))
- return self.cmd_pause(opts.ports)
+ if not ports:
+ self.logger.log(format_text("No ports in valid state to pause\n", 'bold'))
+ return
+ self.pause(ports)
- @timing
- def cmd_update_line (self, line):
- '''Update port(s) speed currently active\n'''
+ # true means print time
+ return True
+
+
+ @__console
+ def resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
- "update",
- self.cmd_update_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.MULTIPLIER,
- parsing_opts.TOTAL)
+ "resume",
+ self.resume_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line paramters")
+ return
- # total has no meaning with percentage - its linear
- if opts.total and (opts.mult['type'] != 'percentage'):
- # if total was set - divide it between the ports
- opts.mult['value'] = opts.mult['value'] / len(opts.ports)
+ # find the relevant ports
+ ports = list(set(self.get_paused_ports()).intersection(opts.ports))
- return self.cmd_update(opts.ports, opts.mult)
+ if not ports:
+ self.logger.log(format_text("No ports in valid state to resume\n", 'bold'))
+ return
- @timing
- def cmd_reset_line (self, line):
- return self.cmd_reset()
+ return self.resume(ports)
+ # true means print time
+ return True
- def cmd_clear_line (self, line):
+
+ @__console
+ def clear_stats_line (self, line):
'''Clear cached local statistics\n'''
# define a parser
parser = parsing_opts.gen_parser(self,
"clear",
- self.cmd_clear_line.__doc__,
+ self.clear_stats_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
+
+ self.clear_stats(opts.ports)
- return self.cmd_clear(opts.ports)
- def cmd_stats_line (self, line):
+
+ @__console
+ def show_stats_line (self, line):
'''Fetch statistics from TRex server by port\n'''
# define a parser
parser = parsing_opts.gen_parser(self,
"stats",
- self.cmd_stats_line.__doc__,
+ self.show_stats_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.STATS_MASK)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
# determine stats mask
- mask = self._get_mask_keys(**self._filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS))
+ mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS))
if not mask:
# set to show all stats if no filter was given
mask = trex_stats.ALL_STATS_OPTS
- stats = self.cmd_stats(opts.ports, mask)
+ stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask)
+
+ stats = self._get_formatted_stats(opts.ports, mask)
+
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
- return RC_OK()
- def cmd_streams_line(self, line):
+ @__console
+ def show_streams_line(self, line):
'''Fetch streams statistics from TRex server by port\n'''
# define a parser
parser = parsing_opts.gen_parser(self,
"streams",
- self.cmd_streams_line.__doc__,
+ self.show_streams_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.STREAMS_MASK)#,
- #parsing_opts.FULL_OUTPUT)
+ parsing_opts.STREAMS_MASK)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return
- streams = self.cmd_streams(opts.ports, set(opts.streams))
+ streams = self._get_streams(opts.ports, set(opts.streams))
if not streams:
- # we got no streams running
+ self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta"))
- print format_text("No streams found with desired filter.\n", "bold", "magenta")
- return RC_ERR("No streams found with desired filter.")
else:
# print stats to screen
for stream_hdr, port_streams_data in streams.iteritems():
text_tables.print_table_with_header(port_streams_data.text_table,
header= stream_hdr.split(":")[0] + ":",
untouched_header= stream_hdr.split(":")[1])
- return RC_OK()
- @timing
- def cmd_validate_line (self, line):
+ @__console
+ def validate_line (self, line):
'''validates port(s) stream configuration\n'''
parser = parsing_opts.gen_parser(self,
"validate",
- self.cmd_validate_line.__doc__,
+ self.validate_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line paramters")
-
- rc = self.cmd_validate(opts.ports)
- return rc
-
-
- def cmd_exit_line (self, line):
- print format_text("Exiting\n", 'bold')
- # a way to exit
- return RC_ERR("exit")
-
-
- def cmd_wait_line (self, line):
- '''wait for a period of time\n'''
-
- parser = parsing_opts.gen_parser(self,
- "wait",
- self.cmd_wait_line.__doc__,
- parsing_opts.DURATION)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return RC_ERR("bad command line parameters")
-
- delay_sec = opts.duration if (opts.duration > 0) else 1
-
- print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')
- time.sleep(delay_sec)
-
- return RC_OK()
-
- # run a script of commands
- def run_script_file (self, filename):
-
- print format_text("\nRunning script file '{0}'...".format(filename), 'bold')
-
- rc = self.cmd_connect()
- if rc.bad():
return
- with open(filename) as f:
- script_lines = f.readlines()
-
- cmd_table = {}
-
- # register all the commands
- cmd_table['start'] = self.cmd_start_line
- cmd_table['stop'] = self.cmd_stop_line
- cmd_table['reset'] = self.cmd_reset_line
- cmd_table['wait'] = self.cmd_wait_line
- cmd_table['exit'] = self.cmd_exit_line
-
- for index, line in enumerate(script_lines, start = 1):
- line = line.strip()
- if line == "":
- continue
- if line.startswith("#"):
- continue
-
- sp = line.split(' ', 1)
- cmd = sp[0]
- if len(sp) == 2:
- args = sp[1]
- else:
- args = ""
-
- print format_text("Executing line {0} : '{1}'\n".format(index, line))
-
- if not cmd in cmd_table:
- print "\n*** Error at line {0} : '{1}'\n".format(index, line)
- print format_text("unknown command '{0}'\n".format(cmd), 'bold')
- return False
-
- rc = cmd_table[cmd](args)
- if rc.bad():
- return False
-
- print format_text("\n[Done]", 'bold')
-
- return True
-
-
- #################################
- # ------ private methods ------ #
- @staticmethod
- def _get_mask_keys(ok_values={True}, **kwargs):
- masked_keys = set()
- for key, val in kwargs.iteritems():
- if val in ok_values:
- masked_keys.add(key)
- return masked_keys
-
- @staticmethod
- def _filter_namespace_args(namespace, ok_values):
- return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
-
-
- #################################
- # ------ private classes ------ #
- class CCommLink(object):
- """describes the connectivity of the stateless client method"""
- def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
- super(CTRexStatelessClient.CCommLink, self).__init__()
- self.virtual = virtual
- self.server = server
- self.port = port
- self.verbose = False
- self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
-
- @property
- def is_connected(self):
- if not self.virtual:
- return self.rpc_link.connected
- else:
- return True
-
- def get_server (self):
- return self.server
-
- def get_port (self):
- return self.port
-
- def set_verbose(self, mode):
- self.verbose = mode
- return self.rpc_link.set_verbose(mode)
-
- def connect(self):
- if not self.virtual:
- return self.rpc_link.connect()
-
- def disconnect(self):
- if not self.virtual:
- return self.rpc_link.disconnect()
-
- def transmit(self, method_name, params={}):
- if self.virtual:
- self._prompt_virtual_tx_msg()
- _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params)
- print msg
- return
- else:
- return self.rpc_link.invoke_rpc_method(method_name, params)
-
- def transmit_batch(self, batch_list):
- if self.virtual:
- self._prompt_virtual_tx_msg()
- print [msg
- for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params)
- for command in batch_list]]
- else:
- batch = self.rpc_link.create_batch()
- for command in batch_list:
- batch.add(command.method, command.params)
- # invoke the batch
- return batch.invoke()
+ self.validate(opts.ports)
- def _prompt_virtual_tx_msg(self):
- print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
- port=self.port)
-if __name__ == "__main__":
- pass
+ \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index bdae7bd9..05a32bc4 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -42,8 +42,8 @@ class BatchMessage(object):
# JSON RPC v2.0 client
class JsonRpcClient(object):
- def __init__ (self, default_server, default_port, prn_func = None):
- self.verbose = False
+ def __init__ (self, default_server, default_port, logger):
+ self.logger = logger
self.connected = False
# default values
@@ -51,7 +51,6 @@ class JsonRpcClient(object):
self.server = default_server
self.id_gen = general_utils.random_id_gen()
- self.prn_func = prn_func
def get_connection_details (self):
rc = {}
@@ -82,10 +81,7 @@ class JsonRpcClient(object):
return pretty_str
def verbose_msg (self, msg):
- if not self.verbose:
- return
-
- print "[verbose] " + msg
+ self.logger.log("[verbose] " + msg, level = self.logger.VERBOSE_HIGH)
# batch messages
@@ -128,7 +124,7 @@ class JsonRpcClient(object):
break
except zmq.Again:
tries += 1
- if tries > 10:
+ if tries > 5:
self.disconnect()
return RC_ERR("*** [RPC] - Failed to send message to server")
@@ -140,9 +136,9 @@ class JsonRpcClient(object):
break
except zmq.Again:
tries += 1
- if tries > 10:
+ if tries > 5:
self.disconnect()
- return RC_ERR("*** [RPC] - Failed to get server response")
+ return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
@@ -177,16 +173,14 @@ class JsonRpcClient(object):
else:
return RC_ERR(response_json["error"]["message"])
+
# if no error there should be a result
if ("result" not in response_json):
return RC_ERR("Malformed Response ({0})".format(str(response_json)))
return RC_OK(response_json["result"])
-
- def set_verbose(self, mode):
- self.verbose = mode
def disconnect (self):
if self.connected:
@@ -198,7 +192,7 @@ class JsonRpcClient(object):
return RC_ERR("Not connected to server")
- def connect(self, server = None, port = None, prn_func = None):
+ def connect(self, server = None, port = None):
if self.connected:
self.disconnect()
@@ -210,12 +204,6 @@ class JsonRpcClient(object):
# Socket to talk to server
self.transport = "tcp://{0}:{1}".format(self.server, self.port)
- msg = "\nConnecting To RPC Server On {0}".format(self.transport)
- if self.prn_func:
- self.prn_func(msg)
- else:
- print msg
-
self.socket = self.context.socket(zmq.REQ)
try:
self.socket.connect(self.transport)
@@ -245,7 +233,7 @@ class JsonRpcClient(object):
return self.connected
def __del__(self):
- print "Shutting down RPC client\n"
+ self.logger.log("Shutting down RPC client\n")
if hasattr(self, "context"):
self.context.destroy(linger=0)
diff --git a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
index 3735a45b..ba60c191 100755
--- a/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/client_utils/parsing_opts.py
@@ -69,10 +69,19 @@ match_multiplier_help = """Multiplier should be passed in the following format:
will provide a percentage of the line rate. examples
: '-m 10', '-m 10kbps', '-m 10mpps', '-m 23%%' """
-def match_multiplier_common(val, strict_abs = True):
- # on strict absolute we do not allow +/-
- if strict_abs:
+# decodes multiplier
+# if allow_update - no +/- is allowed
+# divide states between how many entities the
+# value should be divided
+def decode_multiplier(val, allow_update = False, divide_count = 1):
+
+ # must be string
+ if not isinstance(val, str):
+ return None
+
+ # do we allow updates ? +/-
+ if not allow_update:
match = re.match("^(\d+(\.\d+)?)(bps|kbps|mbps|gbps|pps|kpps|mpps|%?)$", val)
op = None
else:
@@ -136,19 +145,32 @@ def match_multiplier_common(val, strict_abs = True):
else:
result['op'] = "abs"
+ if result['op'] != 'percentage':
+ result['value'] = result['value'] / divide_count
+
return result
else:
- raise argparse.ArgumentTypeError(match_multiplier_help)
+ return None
def match_multiplier(val):
'''match some val against multiplier shortcut inputs '''
- return match_multiplier_common(val, strict_abs = False)
+ result = decode_multiplier(val, allow_update = True)
+ if not result:
+ raise argparse.ArgumentTypeError(match_multiplier_help)
+
+ return val
+
def match_multiplier_strict(val):
'''match some val against multiplier shortcut inputs '''
- return match_multiplier_common(val, strict_abs = True)
+ result = decode_multiplier(val, allow_update = False)
+ if not result:
+ raise argparse.ArgumentTypeError(match_multiplier_help)
+
+ return val
+
def is_valid_file(filename):
if not os.path.isfile(filename):
@@ -230,6 +252,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'default': False,
'help': "Starts TUI in xterm window"}),
+
FULL_OUTPUT: ArgumentPack(['--full'],
{'action': 'store_true',
'help': "Prompt full info in a JSON format"}),
@@ -284,12 +307,12 @@ class CCmdArgParser(argparse.ArgumentParser):
# if all ports are marked or
if (getattr(opts, "all_ports", None) == True) or (getattr(opts, "ports", None) == []):
- opts.ports = self.stateless_client.get_port_ids()
+ opts.ports = self.stateless_client.get_all_ports()
# so maybe we have ports configured
- elif (getattr(opts, "ports", None) == []):
+ elif getattr(opts, "ports", None):
for port in opts.ports:
- if not self.stateless_client.validate_port_list([port]):
+ if not self.stateless_client._validate_port_list([port]):
self.error("port id '{0}' is not a valid port id\n".format(port))
return opts
diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py
index a6add4ac..3f64310f 100755
--- a/scripts/automation/trex_control_plane/common/trex_stats.py
+++ b/scripts/automation/trex_control_plane/common/trex_stats.py
@@ -59,7 +59,7 @@ def calculate_diff_raw (samples):
class CTRexInfoGenerator(object):
"""
This object is responsible of generating stats and information from objects maintained at
- CTRexStatelessClient and the ports.
+ STLClient and the ports.
"""
def __init__(self, global_stats_ref, ports_dict_ref):
@@ -260,7 +260,7 @@ class CTRexStats(object):
def __init__(self):
self.reference_stats = None
- self.latest_stats = {}
+ self.latest_stats = None
self.last_update_ts = time.time()
self.history = deque(maxlen = 10)
@@ -314,9 +314,11 @@ class CTRexStats(object):
self.last_update_ts = time.time()
+
def clear_stats(self):
self.reference_stats = self.latest_stats
+
def invalidate (self):
self.latest_stats = {}
@@ -333,6 +335,10 @@ class CTRexStats(object):
return "N/A"
if not format:
+ if not field in self.reference_stats:
+ print "REF: " + str(self.reference_stats)
+ print "BASE: " + str(self.latest_stats)
+
return (self.latest_stats[field] - self.reference_stats[field])
else:
return format_num(self.latest_stats[field] - self.reference_stats[field], suffix)
@@ -399,6 +405,24 @@ class CGlobalStats(CTRexStats):
self.server_version = server_version
self._ports_dict = ports_dict_ref
+ def get_stats (self):
+ stats = {}
+
+ # absolute
+ stats['cpu_util'] = self.get("m_cpu_util")
+ stats['tx_bps'] = self.get("m_tx_bps")
+ stats['tx_pps'] = self.get("m_tx_pps")
+
+ stats['rx_bps'] = self.get("m_rx_bps")
+ stats['rx_pps'] = self.get("m_rx_pps")
+ stats['rx_drop_bps'] = self.get("m_rx_drop_bps")
+
+ # relatives
+ stats['queue_full'] = self.get_rel("m_total_queue_full")
+
+ return stats
+
+
def generate_stats(self):
return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"),
port=self.connection_info.get("sync_port"))),
@@ -453,6 +477,9 @@ class CPortStats(CTRexStats):
raise TypeError("cannot add non stats object to stats")
# main stats
+ if not self.latest_stats:
+ self.latest_stats = {}
+
self.__merge_dicts(self.latest_stats, x.latest_stats)
# reference stats
@@ -471,6 +498,23 @@ class CPortStats(CTRexStats):
return self
+ # for port we need to do something smarter
+ def get_stats (self):
+ stats = {}
+
+ stats['opackets'] = self.get_rel("opackets")
+ stats['ipackets'] = self.get_rel("ipackets")
+ stats['obytes'] = self.get_rel("obytes")
+ stats['ibytes'] = self.get_rel("ibytes")
+ stats['oerrors'] = self.get_rel("oerrors")
+ stats['ierrors'] = self.get_rel("ierrors")
+ stats['tx_bps'] = self.get("m_total_tx_bps")
+ stats['tx_pps'] = self.get("m_total_tx_pps")
+ stats['rx_bps'] = self.get("m_total_rx_bps")
+ stats['rx_pps'] = self.get("m_total_rx_pps")
+
+ return stats
+
def generate_stats(self):
diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py
index 800b6d49..ea3d71d1 100755
--- a/scripts/automation/trex_control_plane/common/trex_streams.py
+++ b/scripts/automation/trex_control_plane/common/trex_streams.py
@@ -210,7 +210,6 @@ class CStream(object):
setattr(self, k, kwargs[k])
# TODO: load to _pkt_bld_obj also when passed as byte array!
elif isinstance(binary, str) and binary.endswith(".pcap"):
- # self.load_packet_from_pcap(binary, kwargs[k]["meta"])
self._pkt_bld_obj.load_packet_from_pcap(binary)
self._pkt_bld_obj.metadata = kwargs[k]["meta"]
self.packet = self._pkt_bld_obj.dump_pkt()
diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py
index 7c3f04c5..a7ddacea 100644
--- a/scripts/automation/trex_control_plane/common/trex_types.py
+++ b/scripts/automation/trex_control_plane/common/trex_types.py
@@ -14,12 +14,16 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
# simple class to represent complex return value
class RC():
- def __init__ (self, rc = None, data = None):
+ def __init__ (self, rc = None, data = None, is_warn = False):
self.rc_list = []
- if (rc != None) and (data != None):
- tuple_rc = namedtuple('RC', ['rc', 'data'])
- self.rc_list.append(tuple_rc(rc, data))
+ if (rc != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn'])
+ self.rc_list.append(tuple_rc(rc, data, is_warn))
+
+ def __nonzero__ (self):
+ return self.good()
+
def add (self, rc):
self.rc_list += rc.rc_list
@@ -30,39 +34,62 @@ class RC():
def bad (self):
return not self.good()
+ def warn (self):
+ return any([x.is_warn for x in self.rc_list])
+
def data (self):
d = [x.data if x.rc else "" for x in self.rc_list]
- return (d if len(d) > 1 else d[0])
+ return (d if len(d) != 1 else d[0])
def err (self):
e = [x.data if not x.rc else "" for x in self.rc_list]
- return (e if len(e) > 1 else e[0])
+ return (e if len(e) != 1 else e[0])
+
+ def __str__ (self):
+ s = ""
+ for x in self.rc_list:
+ if x.data:
+ s += format_text("\n{0}".format(x.data), 'bold')
+ return s
+
+ def prn_func (self, msg, newline = True):
+ if newline:
+ print msg
+ else:
+ print msg,
+
+ def annotate (self, log_func = None, desc = None, show_status = True):
+
+ if not log_func:
+ log_func = self.prn_func
- def annotate (self, desc = None, show_status = True):
if desc:
- print format_text('\n{:<60}'.format(desc), 'bold'),
+ log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
else:
- print ""
+ log_func("")
if self.bad():
# print all the errors
print ""
for x in self.rc_list:
if not x.rc:
- print format_text("\n{0}".format(x.data), 'bold')
+ log_func(format_text("\n{0}".format(x.data), 'bold'))
print ""
if show_status:
- print format_text("[FAILED]\n", 'red', 'bold')
+ log_func(format_text("[FAILED]\n", 'red', 'bold'))
else:
if show_status:
- print format_text("[SUCCESS]\n", 'green', 'bold')
+ log_func(format_text("[SUCCESS]\n", 'green', 'bold'))
def RC_OK(data = ""):
return RC(True, data)
+
def RC_ERR (err):
return RC(False, err)
+def RC_WARN (warn):
+ return RC(True, warn, is_warn = True)
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 2672665c..88ff45dc 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -29,7 +29,7 @@ import sys
import tty, termios
import trex_root_path
from common.trex_streams import *
-from client.trex_stateless_client import CTRexStatelessClient
+from client.trex_stateless_client import STLClient, LoggerApi, STLError
from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
from client_utils import parsing_opts
@@ -39,6 +39,28 @@ from functools import wraps
__version__ = "1.1"
+# console custom logger
+class ConsoleLogger(LoggerApi):
+ def __init__ (self):
+ self.prompt_redraw = None
+
+ def write (self, msg, newline = True):
+ if newline:
+ print msg
+ else:
+ print msg,
+
+ def flush (self):
+ sys.stdout.flush()
+
+ # override this for the prompt fix
+ def async_log (self, msg, level = LoggerApi.VERBOSE_REGULAR, newline = True):
+ self.log(msg, level, newline)
+ if self.prompt_redraw:
+ self.prompt_redraw()
+ self.flush()
+
+
def set_window_always_on_top (title):
# we need the GDK module, if not available - ignroe this command
try:
@@ -133,9 +155,9 @@ class TRexGeneralCmd(cmd.Cmd):
class TRexConsole(TRexGeneralCmd):
"""Trex Console"""
- def __init__(self, stateless_client, verbose=False):
+ def __init__(self, stateless_client, verbose = False):
+
self.stateless_client = stateless_client
- self.stateless_client.set_prompt_redraw_cb(self.prompt_redraw)
TRexGeneralCmd.__init__(self)
@@ -153,7 +175,10 @@ class TRexConsole(TRexGeneralCmd):
################### internal section ########################
def prompt_redraw (self):
- sys.stdout.write(self.prompt + readline.get_line_buffer())
+ self.postcmd(False, "")
+ sys.stdout.write("\n" + self.prompt + readline.get_line_buffer())
+ sys.stdout.flush()
+
def verify_connected(f):
@wraps(f)
@@ -185,7 +210,7 @@ class TRexConsole(TRexGeneralCmd):
print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold')
return
- if inst.stateless_client.is_read_only():
+ if inst.stateless_client.is_all_ports_acquired():
print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold')
return
@@ -197,7 +222,7 @@ class TRexConsole(TRexGeneralCmd):
def get_console_identifier(self):
return "{context}_{server}".format(context=self.__class__.__name__,
- server=self.stateless_client.get_server_ip())
+ server=self.stateless_client.get_connection_info()['server'])
def register_main_console_methods(self):
main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__)))
@@ -229,7 +254,7 @@ class TRexConsole(TRexGeneralCmd):
self.supported_rpc = None
return stop
- if self.stateless_client.is_read_only():
+ if self.stateless_client.is_all_ports_acquired():
self.prompt = "TRex (read only) > "
return stop
@@ -264,44 +289,12 @@ class TRexConsole(TRexGeneralCmd):
return targets
- # annotation method
- @staticmethod
- def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
- print format_text('\n{:<40}'.format(desc), 'bold'),
- if rc == None:
- print "\n"
- return
-
- if rc == False:
- # do we have a complex log object ?
- if isinstance(err_log, list):
- print ""
- for func in err_log:
- if func:
- print func
- print ""
-
- elif isinstance(err_log, str):
- print "\n" + err_log + "\n"
-
- print format_text("[FAILED]\n", 'red', 'bold')
- if ext_err_msg:
- print format_text(ext_err_msg + "\n", 'blue', 'bold')
-
- return False
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- return True
-
####################### shell commands #######################
@verify_connected
def do_ping (self, line):
'''Ping the server\n'''
- rc = self.stateless_client.cmd_ping()
- if rc.bad():
- return
+ self.stateless_client.ping()
# set verbose on / off
@@ -312,12 +305,12 @@ class TRexConsole(TRexGeneralCmd):
elif line == "on":
self.verbose = True
- self.stateless_client.set_verbose(self.stateless_client.VERBOSE_HIGH)
+ self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_HIGH)
print format_text("\nverbose set to on\n", 'green', 'bold')
elif line == "off":
self.verbose = False
- self.stateless_client.set_verbose(self.stateless_client.VERBOSE_REGULAR)
+ self.stateless_client.set_verbose(self.stateless_client.logger.VERBOSE_REGULAR)
print format_text("\nverbose set to off\n", 'green', 'bold')
else:
@@ -361,13 +354,13 @@ class TRexConsole(TRexGeneralCmd):
def do_connect (self, line):
'''Connects to the server\n'''
- self.stateless_client.cmd_connect_line(line)
+ self.stateless_client.connect_line(line)
def do_disconnect (self, line):
'''Disconnect from the server\n'''
- self.stateless_client.cmd_disconnect()
+ self.stateless_client.disconnect_line(line)
############### start
@@ -388,7 +381,7 @@ class TRexConsole(TRexGeneralCmd):
def do_start(self, line):
'''Start selected traffic in specified port(s) on TRex\n'''
- self.stateless_client.cmd_start_line(line)
+ self.stateless_client.start_line(line)
@@ -401,7 +394,7 @@ class TRexConsole(TRexGeneralCmd):
def do_stop(self, line):
'''stops port(s) transmitting traffic\n'''
- self.stateless_client.cmd_stop_line(line)
+ self.stateless_client.stop_line(line)
def help_stop(self):
self.do_stop("-h")
@@ -411,7 +404,7 @@ class TRexConsole(TRexGeneralCmd):
def do_update(self, line):
'''update speed of port(s)currently transmitting traffic\n'''
- self.stateless_client.cmd_update_line(line)
+ self.stateless_client.update_line(line)
def help_update (self):
self.do_update("-h")
@@ -421,14 +414,14 @@ class TRexConsole(TRexGeneralCmd):
def do_pause(self, line):
'''pause port(s) transmitting traffic\n'''
- self.stateless_client.cmd_pause_line(line)
+ self.stateless_client.pause_line(line)
############# resume
@verify_connected_and_rw
def do_resume(self, line):
'''resume port(s) transmitting traffic\n'''
- self.stateless_client.cmd_resume_line(line)
+ self.stateless_client.resume_line(line)
@@ -436,7 +429,7 @@ class TRexConsole(TRexGeneralCmd):
@verify_connected_and_rw
def do_reset (self, line):
'''force stop all ports\n'''
- self.stateless_client.cmd_reset_line(line)
+ self.stateless_client.reset_line(line)
######### validate
@@ -444,13 +437,13 @@ class TRexConsole(TRexGeneralCmd):
def do_validate (self, line):
'''validates port(s) stream configuration\n'''
- self.stateless_client.cmd_validate_line(line)
+ self.stateless_client.validate_line(line)
@verify_connected
def do_stats(self, line):
'''Fetch statistics from TRex server by port\n'''
- self.stateless_client.cmd_stats_line(line)
+ self.stateless_client.show_stats_line(line)
def help_stats(self):
@@ -459,7 +452,7 @@ class TRexConsole(TRexGeneralCmd):
@verify_connected
def do_streams(self, line):
'''Fetch statistics from TRex server by port\n'''
- self.stateless_client.cmd_streams_line(line)
+ self.stateless_client.show_streams_line(line)
def help_streams(self):
@@ -468,7 +461,7 @@ class TRexConsole(TRexGeneralCmd):
@verify_connected
def do_clear(self, line):
'''Clear cached local statistics\n'''
- self.stateless_client.cmd_clear_line(line)
+ self.stateless_client.clear_stats_line(line)
def help_clear(self):
@@ -520,20 +513,17 @@ class TRexConsole(TRexGeneralCmd):
if opts.xterm:
- exe = './trex-console -t -q -s {0} -p {1}'.format(self.stateless_client.get_server_ip(), self.stateless_client.get_server_port())
+ info = self.stateless_client.get_connection_info()
+
+ exe = './trex-console -t -q -s {0} -p {1} --async_port {2}'.format(info['server'], info['sync_port'], info['async_port'])
cmd = ['xterm', '-geometry', '111x42', '-sl', '0', '-title', 'trex_tui', '-e', exe]
self.terminal = subprocess.Popen(cmd)
return
- set_window_always_on_top('trex_tui')
-
- save_verbose = self.stateless_client.get_verbose()
-
- self.stateless_client.set_verbose(self.stateless_client.VERBOSE_QUIET)
- self.tui.show()
- self.stateless_client.set_verbose(save_verbose)
+ with self.stateless_client.logger.supress():
+ self.tui.show()
def help_tui (self):
@@ -605,6 +595,49 @@ class TRexConsole(TRexGeneralCmd):
do_h = do_history
+# run a script of commands
+def run_script_file (self, filename, stateless_client):
+
+ self.logger.log(format_text("\nRunning script file '{0}'...".format(filename), 'bold'))
+
+ with open(filename) as f:
+ script_lines = f.readlines()
+
+ cmd_table = {}
+
+ # register all the commands
+ cmd_table['start'] = stateless_client.start_line
+ cmd_table['stop'] = stateless_client.stop_line
+ cmd_table['reset'] = stateless_client.reset_line
+
+ for index, line in enumerate(script_lines, start = 1):
+ line = line.strip()
+ if line == "":
+ continue
+ if line.startswith("#"):
+ continue
+
+ sp = line.split(' ', 1)
+ cmd = sp[0]
+ if len(sp) == 2:
+ args = sp[1]
+ else:
+ args = ""
+
+ stateless_client.logger.log(format_text("Executing line {0} : '{1}'\n".format(index, line)))
+
+ if not cmd in cmd_table:
+ print "\n*** Error at line {0} : '{1}'\n".format(index, line)
+ stateless_client.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold'))
+ return False
+
+ cmd_table[cmd](args)
+
+ stateless_client.logger.log(format_text("\n[Done]", 'bold'))
+
+ return True
+
+
#
def is_valid_file(filename):
if not os.path.isfile(filename):
@@ -613,6 +646,7 @@ def is_valid_file(filename):
return filename
+
def setParserOptions():
parser = argparse.ArgumentParser(prog="trex_console.py")
@@ -633,7 +667,7 @@ def setParserOptions():
default = get_current_user(),
type = str)
- parser.add_argument("--verbose", dest="verbose",
+ parser.add_argument("-v", "--verbose", dest="verbose",
action="store_true", help="Switch ON verbose option. Default is: OFF.",
default = False)
@@ -665,34 +699,50 @@ def main():
options = parser.parse_args()
# Stateless client connection
- stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet)
+ if options.quiet:
+ verbose_level = LoggerApi.VERBOSE_QUIET
+ elif options.verbose:
+ verbose_level = LoggerApi.VERBOSE_HIGH
+ else:
+ verbose_level = LoggerApi.VERBOSE_REGULAR
- if not options.quiet:
- print "\nlogged as {0}".format(format_text(options.user, 'bold'))
+ # Stateless client connection
+ logger = ConsoleLogger()
+ stateless_client = STLClient(username = options.user,
+ server = options.server,
+ sync_port = options.port,
+ async_port = options.pub,
+ verbose_level = verbose_level,
+ logger = logger)
# TUI or no acquire will give us READ ONLY mode
- if options.tui or not options.acquire:
- rc = stateless_client.connect("RO")
- else:
- rc = stateless_client.connect("RW")
-
- # unable to connect - bye
- if rc.bad():
- rc.annotate()
+ try:
+ stateless_client.connect("RO")
+ except STLError as e:
+ logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
return
+ if not options.tui and options.acquire:
+ try:
+ stateless_client.acquire()
+ except STLError as e:
+ logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
+ logger.log(format_text("\nSwitching to read only mode - only few commands will be available", 'bold'))
+
# a script mode
if options.batch:
- cont = stateless_client.run_script_file(options.batch[0])
+ cont = run_script_file(options.batch[0], stateless_client)
if not cont:
return
# console
-
try:
console = TRexConsole(stateless_client, options.verbose)
+ logger.prompt_redraw = console.prompt_redraw
+
if options.tui:
+ set_window_always_on_top('trex_tui')
console.do_tui("")
else:
console.start()
@@ -701,7 +751,7 @@ def main():
print "\n\n*** Caught Ctrl + C... Exiting...\n\n"
finally:
- stateless_client.disconnect()
+ stateless_client.teardown(stop_traffic = False)
if __name__ == '__main__':
diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py
index cdf3fb69..45769693 100644
--- a/scripts/automation/trex_control_plane/console/trex_status.py
+++ b/scripts/automation/trex_control_plane/console/trex_status.py
@@ -1,525 +1,525 @@
-from time import sleep
-
-import os
-
-import curses
-from curses import panel
-import random
-import collections
-import operator
-import datetime
-
-g_curses_active = False
-
-################### utils #################
-
-# simple percetange show
-def percentage (a, total):
- x = int ((float(a) / total) * 100)
- return str(x) + "%"
-
-################### panels #################
-
-# panel object
-class TrexStatusPanel(object):
- def __init__ (self, h, l, y, x, headline, status_obj):
-
- self.status_obj = status_obj
-
- self.log = status_obj.log
- self.stateless_client = status_obj.stateless_client
-
- self.stats = status_obj.stats
- self.general_stats = status_obj.general_stats
-
- self.h = h
- self.l = l
- self.y = y
- self.x = x
- self.headline = headline
-
- self.win = curses.newwin(h, l, y, x)
- self.win.erase()
- self.win.box()
-
- self.win.addstr(1, 2, headline, curses.A_UNDERLINE)
- self.win.refresh()
-
- panel.new_panel(self.win)
- self.panel = panel.new_panel(self.win)
- self.panel.top()
-
- def clear (self):
- self.win.erase()
- self.win.box()
- self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE)
-
- def getwin (self):
- return self.win
-
-
-# various kinds of panels
-
-# Server Info Panel
-class ServerInfoPanel(TrexStatusPanel):
- def __init__ (self, h, l, y, x, status_obj):
-
- super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj)
-
- def draw (self):
-
- if not self.status_obj.server_version :
- return
-
- if not self.status_obj.server_sys_info:
- return
-
-
- self.clear()
-
- self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port())))
- self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"]))
- self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:",
- self.status_obj.server_version["build_date"] + " @ " +
- self.status_obj.server_version["build_time"] + " by " +
- self.status_obj.server_version["built_by"]))
-
- self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"]))
- self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) +
- " cores", self.status_obj.server_sys_info["core_type"]))
-
- self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"]))
-
- ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list)
-
- if not ports_owned:
- ports_owned = "None"
-
- self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned))
-
-# general info panel
-class GeneralInfoPanel(TrexStatusPanel):
- def __init__ (self, h, l, y, x, status_obj):
-
- super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj)
-
- def draw (self):
- self.clear()
-
- if not self.general_stats.is_online():
- self.getwin().addstr(3, 2, "No Published Data From TRex Server")
- return
-
- self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util")))
-
- self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:",
- self.general_stats.get("m_tx_bps", format = True, suffix = "bps"),
- self.general_stats.get("m_tx_pps", format = True, suffix = "pps")))
-
-
- self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:",
- self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"),
- self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts")))
-
- self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:",
- self.general_stats.get("m_rx_bps", format = True, suffix = "bps"),
- self.general_stats.get("m_rx_pps", format = True, suffix = "pps")))
-
-
- self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:",
- self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"),
- self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts")))
-
-# all ports stats
-class PortsStatsPanel(TrexStatusPanel):
- def __init__ (self, h, l, y, x, status_obj):
-
- super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj)
-
-
- def draw (self):
-
- self.clear()
-
- owned_ports = self.status_obj.owned_ports_list
- if not owned_ports:
- self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports")
- return
-
- # table header
- self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
-
-
-
- for i, port_index in enumerate(owned_ports):
-
- port_stats = self.status_obj.stats.get_port_stats(port_index)
-
- if port_stats:
- self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
- "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
- port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
-
- "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
- port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
- "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
- port_stats.get_rel("ibytes", format = True, suffix = "B"))))
-
- else:
-
- self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
- "N/A",
- "N/A",
- "N/A",
- "N/A"))
-
-
- # old format
+#from time import sleep
+#
+#import os
+#
+#import curses
+#from curses import panel
+#import random
+#import collections
+#import operator
+#import datetime
+#
+#g_curses_active = False
+#
+#################### utils #################
+#
+## simple percetange show
+#def percentage (a, total):
+# x = int ((float(a) / total) * 100)
+# return str(x) + "%"
+#
+#################### panels #################
+#
+## panel object
+#class TrexStatusPanel(object):
+# def __init__ (self, h, l, y, x, headline, status_obj):
+#
+# self.status_obj = status_obj
+#
+# self.log = status_obj.log
+# self.stateless_client = status_obj.stateless_client
+#
+# self.stats = status_obj.stats
+# self.general_stats = status_obj.general_stats
+#
+# self.h = h
+# self.l = l
+# self.y = y
+# self.x = x
+# self.headline = headline
+#
+# self.win = curses.newwin(h, l, y, x)
+# self.win.erase()
+# self.win.box()
+#
+# self.win.addstr(1, 2, headline, curses.A_UNDERLINE)
+# self.win.refresh()
+#
+# panel.new_panel(self.win)
+# self.panel = panel.new_panel(self.win)
+# self.panel.top()
+#
+# def clear (self):
+# self.win.erase()
+# self.win.box()
+# self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE)
+#
+# def getwin (self):
+# return self.win
+#
+#
+## various kinds of panels
+#
+## Server Info Panel
+#class ServerInfoPanel(TrexStatusPanel):
+# def __init__ (self, h, l, y, x, status_obj):
+#
+# super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj)
+#
+# def draw (self):
+#
+# if not self.status_obj.server_version :
+# return
+#
+# if not self.status_obj.server_sys_info:
+# return
+#
+#
+# self.clear()
+#
+# self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port())))
+# self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"]))
+# self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:",
+# self.status_obj.server_version["build_date"] + " @ " +
+# self.status_obj.server_version["build_time"] + " by " +
+# self.status_obj.server_version["built_by"]))
+#
+# self.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.status_obj.server_sys_info["uptime"]))
+# self.getwin().addstr(7, 2, "{:<30} {:<3} / {:<30}".format("DP Cores:", str(self.status_obj.server_sys_info["dp_core_count"]) +
+# " cores", self.status_obj.server_sys_info["core_type"]))
+#
+# self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"]))
+#
+# ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports_list)
+#
+# if not ports_owned:
+# ports_owned = "None"
+#
+# self.getwin().addstr(10, 2, "{:<30} {:<30}".format("Ports Owned:", ports_owned))
+#
+## general info panel
+#class GeneralInfoPanel(TrexStatusPanel):
+# def __init__ (self, h, l, y, x, status_obj):
+#
+# super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj)
+#
+# def draw (self):
+# self.clear()
+#
+# if not self.general_stats.is_online():
+# self.getwin().addstr(3, 2, "No Published Data From TRex Server")
+# return
+#
+# self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util")))
+#
+# self.getwin().addstr(6, 2, "{:<30} {:} / {:}".format("Total Tx. rate:",
+# self.general_stats.get("m_tx_bps", format = True, suffix = "bps"),
+# self.general_stats.get("m_tx_pps", format = True, suffix = "pps")))
+#
+#
+# self.getwin().addstr(8, 2, "{:<30} {:} / {:}".format("Total Tx:",
+# self.general_stats.get_rel("m_total_tx_bytes", format = True, suffix = "B"),
+# self.general_stats.get_rel("m_total_tx_pkts", format = True, suffix = "pkts")))
+#
+# self.getwin().addstr(11, 2, "{:<30} {:} / {:}".format("Total Rx. rate:",
+# self.general_stats.get("m_rx_bps", format = True, suffix = "bps"),
+# self.general_stats.get("m_rx_pps", format = True, suffix = "pps")))
+#
+#
+# self.getwin().addstr(13, 2, "{:<30} {:} / {:}".format("Total Rx:",
+# self.general_stats.get_rel("m_total_rx_bytes", format = True, suffix = "B"),
+# self.general_stats.get_rel("m_total_rx_pkts", format = True, suffix = "pkts")))
+#
+## all ports stats
+#class PortsStatsPanel(TrexStatusPanel):
+# def __init__ (self, h, l, y, x, status_obj):
+#
+# super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj)
+#
+#
+# def draw (self):
+#
+# self.clear()
+#
+# owned_ports = self.status_obj.owned_ports_list
+# if not owned_ports:
+# self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports")
+# return
+#
+# # table header
+# self.getwin().addstr(3, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
+#
+#
+#
+# for i, port_index in enumerate(owned_ports):
+#
+# port_stats = self.status_obj.stats.get_port_stats(port_index)
+#
# if port_stats:
-# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
-# port_stats.get("m_total_tx_pps", format = True, suffix = "pps"),
-# port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
-# port_stats.get_rel("obytes", format = True, suffix = "B"),
-# port_stats.get("m_total_rx_pps", format = True, suffix = "pps"),
-# port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
-# port_stats.get_rel("ibytes", format = True, suffix = "B")))
+# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
+#
+# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
+# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
+# port_stats.get_rel("ibytes", format = True, suffix = "B"))))
#
# else:
-# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+#
+# self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^30} {:^30} {:^30}".format(
# "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
# "N/A",
# "N/A",
# "N/A",
-# "N/A",
-# "N/A",
# "N/A"))
-
-# control panel
-class ControlPanel(TrexStatusPanel):
- def __init__ (self, h, l, y, x, status_obj):
-
- super(ControlPanel, self).__init__(h, l, y, x, "", status_obj)
-
-
- def draw (self):
- self.clear()
-
- self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit"
- .format(self.status_obj.stateless_client.get_port_count() - 1))
-
- self.log.draw(self.getwin(), 2, 3)
-
-# specific ports panels
-class SinglePortPanel(TrexStatusPanel):
- def __init__ (self, h, l, y, x, status_obj, port_id):
-
- super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj)
-
- self.port_id = port_id
-
- def draw (self):
- y = 3
-
- self.clear()
-
- if not self.port_id in self.status_obj.owned_ports_list:
- self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id))
- return
-
- # streams
- self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE)
- y += 2
-
- # stream table header
- self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
- "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM"))
- y += 2
-
- # streams
-
- if 'streams' in self.status_obj.owned_ports[str(self.port_id)]:
- stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams']
-
- for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)):
- self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
- stream_id,
- ("True" if stream['enabled'] else "False"),
- stream['mode']['type'],
- ("True" if stream['self_start'] else "False"),
- stream['isg'],
- (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"),
- ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None")))
-
- y += 1
-
- # new section - traffic
- y += 2
-
- self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE)
- y += 2
-
-
-
- # table header
- self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
-
-
- y += 2
-
- port_stats = self.status_obj.stats.get_port_stats(self.port_id)
-
- if port_stats:
- self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
- "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
- port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
-
- "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
- port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
- "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
- port_stats.get_rel("ibytes", format = True, suffix = "B"))))
-
- else:
- self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
- "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
- "N/A",
- "N/A",
- "N/A",
- "N/A"))
-
-
-################### main objects #################
-
-# status log
-class TrexStatusLog():
- def __init__ (self):
- self.log = []
-
- def add_event (self, msg):
- self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg))
-
- def draw (self, window, x, y, max_lines = 4):
- index = y
-
- cut = len(self.log) - max_lines
- if cut < 0:
- cut = 0
-
- for msg in self.log[cut:]:
- window.addstr(index, x, msg)
- index += 1
-
-# status commands
-class TrexStatusCommands():
- def __init__ (self, status_object):
-
- self.status_object = status_object
-
- self.stateless_client = status_object.stateless_client
- self.log = self.status_object.log
-
- self.actions = {}
- self.actions[ord('q')] = self._quit
- self.actions[ord('p')] = self._ping
- self.actions[ord('f')] = self._freeze
-
- self.actions[ord('g')] = self._show_ports_stats
-
- # register all the available ports shortcuts
- for port_id in xrange(0, self.stateless_client.get_port_count()):
- self.actions[ord('0') + port_id] = self._show_port_generator(port_id)
-
-
- # handle a key pressed
- def handle (self, ch):
- if ch in self.actions:
- return self.actions[ch]()
- else:
- self.log.add_event("Unknown key pressed, please see legend")
- return True
-
- # show all ports
- def _show_ports_stats (self):
- self.log.add_event("Switching to all ports view")
- self.status_object.stats_panel = self.status_object.ports_stats_panel
-
- return True
-
-
- # function generator for different ports requests
- def _show_port_generator (self, port_id):
- def _show_port():
- self.log.add_event("Switching panel to port {0}".format(port_id))
- self.status_object.stats_panel = self.status_object.ports_panels[port_id]
-
- return True
-
- return _show_port
-
- def _freeze (self):
- self.status_object.update_active = not self.status_object.update_active
- self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped")
-
- return True
-
- def _quit(self):
- return False
-
- def _ping (self):
- self.log.add_event("Pinging RPC server")
-
- rc, msg = self.stateless_client.ping()
- if rc:
- self.log.add_event("Server replied: '{0}'".format(msg))
- else:
- self.log.add_event("Failed to get reply")
-
- return True
-
-# status object
-#
-#
-#
-class CTRexStatus():
- def __init__ (self, stdscr, stateless_client):
- self.stdscr = stdscr
-
- self.stateless_client = stateless_client
-
- self.log = TrexStatusLog()
- self.cmds = TrexStatusCommands(self)
-
- self.stats = stateless_client.get_stats_async()
- self.general_stats = stateless_client.get_stats_async().get_general_stats()
-
- # fetch server info
- self.server_sys_info = self.stateless_client.get_system_info()
-
- self.server_version = self.stateless_client.get_version()
-
- # list of owned ports
- self.owned_ports_list = self.stateless_client.get_acquired_ports()
-
- # data per port
- self.owned_ports = {}
-
- for port_id in self.owned_ports_list:
- self.owned_ports[str(port_id)] = {}
- self.owned_ports[str(port_id)]['streams'] = {}
-
- stream_list = self.stateless_client.get_all_streams(port_id)
-
- self.owned_ports[str(port_id)] = stream_list
-
-
- try:
- curses.curs_set(0)
- except:
- pass
-
- curses.use_default_colors()
- self.stdscr.nodelay(1)
- curses.nonl()
- curses.noecho()
-
- self.generate_layout()
-
-
- def generate_layout (self):
- self.max_y = self.stdscr.getmaxyx()[0]
- self.max_x = self.stdscr.getmaxyx()[1]
-
- self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self)
- self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self)
- self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self)
-
- # those can be switched on the same place
- self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self)
-
- self.ports_panels = {}
- for i in xrange(0, self.stateless_client.get_port_count()):
- self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i)
-
- # at start time we point to the main one
- self.stats_panel = self.ports_stats_panel
- self.stats_panel.panel.top()
-
- panel.update_panels(); self.stdscr.refresh()
- return
-
-
- def wait_for_key_input (self):
- ch = self.stdscr.getch()
-
- # no key , continue
- if ch == curses.ERR:
- return True
-
- return self.cmds.handle(ch)
-
- # main run entry point
- def run (self):
-
- # list of owned ports
- self.owned_ports_list = self.stateless_client.get_acquired_ports()
-
- # data per port
- self.owned_ports = {}
-
- for port_id in self.owned_ports_list:
- self.owned_ports[str(port_id)] = {}
- self.owned_ports[str(port_id)]['streams'] = {}
-
- stream_list = self.stateless_client.get_all_streams(port_id)
-
- self.owned_ports[str(port_id)] = stream_list
-
- self.update_active = True
- while (True):
-
- rc = self.wait_for_key_input()
- if not rc:
- break
-
- self.server_info_panel.draw()
- self.general_info_panel.draw()
- self.control_panel.draw()
-
- # can be different kinds of panels
- self.stats_panel.panel.top()
- self.stats_panel.draw()
-
- panel.update_panels()
- self.stdscr.refresh()
- sleep(0.01)
-
-
-# global container
-trex_status = None
-
-def show_trex_status_internal (stdscr, stateless_client):
- global trex_status
-
- if trex_status == None:
- trex_status = CTRexStatus(stdscr, stateless_client)
-
- trex_status.run()
-
-def show_trex_status (stateless_client):
-
- try:
- curses.wrapper(show_trex_status_internal, stateless_client)
- except KeyboardInterrupt:
- curses.endwin()
-
-def cleanup ():
- try:
- curses.endwin()
- except:
- pass
-
+#
+#
+# # old format
+## if port_stats:
+## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
+## port_stats.get("m_total_tx_pps", format = True, suffix = "pps"),
+## port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+## port_stats.get_rel("obytes", format = True, suffix = "B"),
+## port_stats.get("m_total_rx_pps", format = True, suffix = "pps"),
+## port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+## port_stats.get_rel("ibytes", format = True, suffix = "B")))
+##
+## else:
+## self.getwin().addstr(5 + (i * 4), 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+## "{0} ({1})".format(str(port_index), self.status_obj.server_sys_info["ports"][port_index]["speed"]),
+## "N/A",
+## "N/A",
+## "N/A",
+## "N/A",
+## "N/A",
+## "N/A"))
+#
+## control panel
+#class ControlPanel(TrexStatusPanel):
+# def __init__ (self, h, l, y, x, status_obj):
+#
+# super(ControlPanel, self).__init__(h, l, y, x, "", status_obj)
+#
+#
+# def draw (self):
+# self.clear()
+#
+# self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit"
+# .format(self.status_obj.stateless_client.get_port_count() - 1))
+#
+# self.log.draw(self.getwin(), 2, 3)
+#
+## specific ports panels
+#class SinglePortPanel(TrexStatusPanel):
+# def __init__ (self, h, l, y, x, status_obj, port_id):
+#
+# super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj)
+#
+# self.port_id = port_id
+#
+# def draw (self):
+# y = 3
+#
+# self.clear()
+#
+# if not self.port_id in self.status_obj.owned_ports_list:
+# self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id))
+# return
+#
+# # streams
+# self.getwin().addstr(y, 2, "Streams:", curses.A_UNDERLINE)
+# y += 2
+#
+# # stream table header
+# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+# "Stream ID", "Enabled", "Type", "Self Start", "ISG", "Next Stream", "VM"))
+# y += 2
+#
+# # streams
+#
+# if 'streams' in self.status_obj.owned_ports[str(self.port_id)]:
+# stream_info = self.status_obj.owned_ports[str(self.port_id)]['streams']
+#
+# for stream_id, stream in sorted(stream_info.iteritems(), key=operator.itemgetter(0)):
+# self.getwin().addstr(y, 2, "{:^15} {:^15} {:^15} {:^15} {:^15} {:^15} {:^15}".format(
+# stream_id,
+# ("True" if stream['enabled'] else "False"),
+# stream['mode']['type'],
+# ("True" if stream['self_start'] else "False"),
+# stream['isg'],
+# (stream['next_stream_id'] if stream['next_stream_id'] != -1 else "None"),
+# ("{0} instr.".format(len(stream['vm'])) if stream['vm'] else "None")))
+#
+# y += 1
+#
+# # new section - traffic
+# y += 2
+#
+# self.getwin().addstr(y, 2, "Traffic:", curses.A_UNDERLINE)
+# y += 2
+#
+#
+#
+# # table header
+# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+# "Port ID", "Tx Rate [bps/pps]", "Rx Rate [bps/pps]", "Total Bytes [tx/rx]"))
+#
+#
+# y += 2
+#
+# port_stats = self.status_obj.stats.get_port_stats(self.port_id)
+#
+# if port_stats:
+# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
+# "{0} / {1}".format(port_stats.get("m_total_tx_bps", format = True, suffix = "bps"),
+# port_stats.get("m_total_tx_pps", format = True, suffix = "pps")),
+#
+# "{0} / {1}".format(port_stats.get("m_total_rx_bps", format = True, suffix = "bps"),
+# port_stats.get("m_total_rx_pps", format = True, suffix = "pps")),
+# "{0} / {1}".format(port_stats.get_rel("obytes", format = True, suffix = "B"),
+# port_stats.get_rel("ibytes", format = True, suffix = "B"))))
+#
+# else:
+# self.getwin().addstr(y, 2, "{:^15} {:^30} {:^30} {:^30}".format(
+# "{0} ({1})".format(str(self.port_id), self.status_obj.server_sys_info["ports"][self.port_id]["speed"]),
+# "N/A",
+# "N/A",
+# "N/A",
+# "N/A"))
+#
+#
+#################### main objects #################
+#
+## status log
+#class TrexStatusLog():
+# def __init__ (self):
+# self.log = []
+#
+# def add_event (self, msg):
+# self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg))
+#
+# def draw (self, window, x, y, max_lines = 4):
+# index = y
+#
+# cut = len(self.log) - max_lines
+# if cut < 0:
+# cut = 0
+#
+# for msg in self.log[cut:]:
+# window.addstr(index, x, msg)
+# index += 1
+#
+## status commands
+#class TrexStatusCommands():
+# def __init__ (self, status_object):
+#
+# self.status_object = status_object
+#
+# self.stateless_client = status_object.stateless_client
+# self.log = self.status_object.log
+#
+# self.actions = {}
+# self.actions[ord('q')] = self._quit
+# self.actions[ord('p')] = self._ping
+# self.actions[ord('f')] = self._freeze
+#
+# self.actions[ord('g')] = self._show_ports_stats
+#
+# # register all the available ports shortcuts
+# for port_id in xrange(0, self.stateless_client.get_port_count()):
+# self.actions[ord('0') + port_id] = self._show_port_generator(port_id)
+#
+#
+# # handle a key pressed
+# def handle (self, ch):
+# if ch in self.actions:
+# return self.actions[ch]()
+# else:
+# self.log.add_event("Unknown key pressed, please see legend")
+# return True
+#
+# # show all ports
+# def _show_ports_stats (self):
+# self.log.add_event("Switching to all ports view")
+# self.status_object.stats_panel = self.status_object.ports_stats_panel
+#
+# return True
+#
+#
+# # function generator for different ports requests
+# def _show_port_generator (self, port_id):
+# def _show_port():
+# self.log.add_event("Switching panel to port {0}".format(port_id))
+# self.status_object.stats_panel = self.status_object.ports_panels[port_id]
+#
+# return True
+#
+# return _show_port
+#
+# def _freeze (self):
+# self.status_object.update_active = not self.status_object.update_active
+# self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped")
+#
+# return True
+#
+# def _quit(self):
+# return False
+#
+# def _ping (self):
+# self.log.add_event("Pinging RPC server")
+#
+# rc, msg = self.stateless_client.ping()
+# if rc:
+# self.log.add_event("Server replied: '{0}'".format(msg))
+# else:
+# self.log.add_event("Failed to get reply")
+#
+# return True
+#
+## status object
+##
+##
+##
+#class CTRexStatus():
+# def __init__ (self, stdscr, stateless_client):
+# self.stdscr = stdscr
+#
+# self.stateless_client = stateless_client
+#
+# self.log = TrexStatusLog()
+# self.cmds = TrexStatusCommands(self)
+#
+# self.stats = stateless_client.get_stats_async()
+# self.general_stats = stateless_client.get_stats_async().get_general_stats()
+#
+# # fetch server info
+# self.server_sys_info = self.stateless_client.get_system_info()
+#
+# self.server_version = self.stateless_client.get_server_version()
+#
+# # list of owned ports
+# self.owned_ports_list = self.stateless_client.get_acquired_ports()
+#
+# # data per port
+# self.owned_ports = {}
+#
+# for port_id in self.owned_ports_list:
+# self.owned_ports[str(port_id)] = {}
+# self.owned_ports[str(port_id)]['streams'] = {}
+#
+# stream_list = self.stateless_client.get_all_streams(port_id)
+#
+# self.owned_ports[str(port_id)] = stream_list
+#
+#
+# try:
+# curses.curs_set(0)
+# except:
+# pass
+#
+# curses.use_default_colors()
+# self.stdscr.nodelay(1)
+# curses.nonl()
+# curses.noecho()
+#
+# self.generate_layout()
+#
+#
+# def generate_layout (self):
+# self.max_y = self.stdscr.getmaxyx()[0]
+# self.max_x = self.stdscr.getmaxyx()[1]
+#
+# self.server_info_panel = ServerInfoPanel(int(self.max_y * 0.3), self.max_x / 2, int(self.max_y * 0.5), self.max_x /2, self)
+# self.general_info_panel = GeneralInfoPanel(int(self.max_y * 0.5), self.max_x / 2, 0, self.max_x /2, self)
+# self.control_panel = ControlPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, self)
+#
+# # those can be switched on the same place
+# self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self)
+#
+# self.ports_panels = {}
+# for i in xrange(0, self.stateless_client.get_port_count()):
+# self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i)
+#
+# # at start time we point to the main one
+# self.stats_panel = self.ports_stats_panel
+# self.stats_panel.panel.top()
+#
+# panel.update_panels(); self.stdscr.refresh()
+# return
+#
+#
+# def wait_for_key_input (self):
+# ch = self.stdscr.getch()
+#
+# # no key , continue
+# if ch == curses.ERR:
+# return True
+#
+# return self.cmds.handle(ch)
+#
+# # main run entry point
+# def run (self):
+#
+# # list of owned ports
+# self.owned_ports_list = self.stateless_client.get_acquired_ports()
+#
+# # data per port
+# self.owned_ports = {}
+#
+# for port_id in self.owned_ports_list:
+# self.owned_ports[str(port_id)] = {}
+# self.owned_ports[str(port_id)]['streams'] = {}
+#
+# stream_list = self.stateless_client.get_all_streams(port_id)
+#
+# self.owned_ports[str(port_id)] = stream_list
+#
+# self.update_active = True
+# while (True):
+#
+# rc = self.wait_for_key_input()
+# if not rc:
+# break
+#
+# self.server_info_panel.draw()
+# self.general_info_panel.draw()
+# self.control_panel.draw()
+#
+# # can be different kinds of panels
+# self.stats_panel.panel.top()
+# self.stats_panel.draw()
+#
+# panel.update_panels()
+# self.stdscr.refresh()
+# sleep(0.01)
+#
+#
+## global container
+#trex_status = None
+#
+#def show_trex_status_internal (stdscr, stateless_client):
+# global trex_status
+#
+# if trex_status == None:
+# trex_status = CTRexStatus(stdscr, stateless_client)
+#
+# trex_status.run()
+#
+#def show_trex_status (stateless_client):
+#
+# try:
+# curses.wrapper(show_trex_status_internal, stateless_client)
+# except KeyboardInterrupt:
+# curses.endwin()
+#
+#def cleanup ():
+# try:
+# curses.endwin()
+# except:
+# pass
+#
diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py
index dbbac02b..1e22b005 100644
--- a/scripts/automation/trex_control_plane/console/trex_tui.py
+++ b/scripts/automation/trex_control_plane/console/trex_tui.py
@@ -8,6 +8,7 @@ from client_utils import text_tables
from collections import OrderedDict
import datetime
from cStringIO import StringIO
+from client.trex_stateless_client import STLError
class SimpleBar(object):
def __init__ (self, desc, pattern):
@@ -60,7 +61,7 @@ class TrexTUIDashBoard(TrexTUIPanel):
def show (self):
- stats = self.stateless_client.cmd_stats(self.ports, trex_stats.COMPACT)
+ stats = self.stateless_client._get_formatted_stats(self.ports, trex_stats.COMPACT)
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
@@ -71,8 +72,7 @@ class TrexTUIDashBoard(TrexTUIPanel):
allowed['c'] = self.key_actions['c']
- # thats it for read only
- if self.stateless_client.is_read_only():
+ if self.stateless_client.is_all_ports_acquired():
return allowed
if len(self.stateless_client.get_transmitting_ports()) > 0:
@@ -89,64 +89,44 @@ class TrexTUIDashBoard(TrexTUIPanel):
######### actions
def action_pause (self):
- rc = self.stateless_client.pause_traffic(self.mng.ports)
+ try:
+ rc = self.stateless_client.pause(ports = self.mng.ports)
+ except STLError:
+ pass
- ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
- if rc_single.rc:
- ports_succeeded.append(port_id)
+ return ""
- if len(ports_succeeded) > 0:
- return "paused traffic on port(s): {0}".format(ports_succeeded)
- else:
- return ""
def action_resume (self):
- rc = self.stateless_client.resume_traffic(self.mng.ports)
-
- ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
- if rc_single.rc:
- ports_succeeded.append(port_id)
+ try:
+ self.stateless_client.resume(ports = self.mng.ports)
+ except STLError:
+ pass
- if len(ports_succeeded) > 0:
- return "resumed traffic on port(s): {0}".format(ports_succeeded)
- else:
- return ""
+ return ""
def action_raise (self):
- mul = {'type': 'percentage', 'value': 5, 'op': 'add'}
- rc = self.stateless_client.update_traffic(mul, self.mng.ports)
+ try:
+ self.stateless_client.update(mult = "5%+", ports = self.mng.ports)
+ except STLError:
+ pass
- ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
- if rc_single.rc:
- ports_succeeded.append(port_id)
+ return ""
- if len(ports_succeeded) > 0:
- return "raised B/W by %5 on port(s): {0}".format(ports_succeeded)
- else:
- return ""
def action_lower (self):
- mul = {'type': 'percentage', 'value': 5, 'op': 'sub'}
- rc = self.stateless_client.update_traffic(mul, self.mng.ports)
-
- ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
- if rc_single.rc:
- ports_succeeded.append(port_id)
+ try:
+ self.stateless_client.update(mult = "5%-", ports = self.mng.ports)
+ except STLError:
+ pass
- if len(ports_succeeded) > 0:
- return "lowered B/W by %5 on port(s): {0}".format(ports_succeeded)
- else:
- return ""
+ return ""
def action_clear (self):
- self.stateless_client.cmd_clear(self.mng.ports)
+ self.stateless_client.clear_stats(self.mng.ports)
return "cleared all stats"
@@ -168,7 +148,7 @@ class TrexTUIPort(TrexTUIPanel):
def show (self):
- stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT)
+ stats = self.stateless_client._get_formatted_stats([self.port_id], trex_stats.COMPACT)
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
@@ -179,8 +159,7 @@ class TrexTUIPort(TrexTUIPanel):
allowed['c'] = self.key_actions['c']
- # thats it for read only
- if self.stateless_client.is_read_only():
+ if self.stateless_client.is_all_ports_acquired():
return allowed
if self.port.state == self.port.STATE_TX:
@@ -196,39 +175,44 @@ class TrexTUIPort(TrexTUIPanel):
# actions
def action_pause (self):
- rc = self.stateless_client.pause_traffic([self.port_id])
- if rc.good():
- return "port {0}: paused traffic".format(self.port_id)
- else:
- return ""
+ try:
+ self.stateless_client.pause(ports = [self.port_id])
+ except STLError:
+ pass
+
+ return ""
def action_resume (self):
- rc = self.stateless_client.resume_traffic([self.port_id])
- if rc.good():
- return "port {0}: resumed traffic".format(self.port_id)
- else:
- return ""
+ try:
+ self.stateless_client.resume(ports = [self.port_id])
+ except STLError:
+ pass
+
+ return ""
+
def action_raise (self):
- mul = {'type': 'percentage', 'value': 5, 'op': 'add'}
- rc = self.stateless_client.update_traffic(mul, [self.port_id])
+ mult = {'type': 'percentage', 'value': 5, 'op': 'add'}
- if rc.good():
- return "port {0}: raised B/W by 5%".format(self.port_id)
- else:
- return ""
+ try:
+ self.stateless_client.update(mult = mult, ports = [self.port_id])
+ except STLError:
+ pass
+
+ return ""
def action_lower (self):
- mul = {'type': 'percentage', 'value': 5, 'op': 'sub'}
- rc = self.stateless_client.update_traffic(mul, [self.port_id])
+ mult = {'type': 'percentage', 'value': 5, 'op': 'sub'}
- if rc.good():
- return "port {0}: lowered B/W by 5%".format(self.port_id)
- else:
- return ""
+ try:
+ self.stateless_client.update(mult = mult, ports = [self.port_id])
+ except STLError:
+ pass
+
+ return ""
def action_clear (self):
- self.stateless_client.cmd_clear([self.port_id])
+ self.stateless_client.clear_stats([self.port_id])
return "port {0}: cleared stats".format(self.port_id)
# log
@@ -425,7 +409,7 @@ class TrexTUI():
if self.state == self.STATE_ACTIVE:
# if no connectivity - move to lost connecitivty
if not self.stateless_client.async_client.is_alive():
- self.stateless_client.cmd_invalidate(self.pm.ports)
+ self.stateless_client._invalidate_stats(self.pm.ports)
self.state = self.STATE_LOST_CONT
@@ -440,11 +424,10 @@ class TrexTUI():
# restored connectivity - try to reconnect
elif self.state == self.STATE_RECONNECT:
- rc = self.stateless_client.connect("RO")
- if rc.good():
+ try:
+ self.stateless_client.connect("RO")
self.state = self.STATE_ACTIVE
- else:
- # maybe we lost it again
+ except STLError:
self.state = self.STATE_LOST_CONT
diff --git a/scripts/automation/trex_control_plane/examples/interactive_stateless.py b/scripts/automation/trex_control_plane/examples/interactive_stateless.py
index e64b4755..f6ada17d 100644
--- a/scripts/automation/trex_control_plane/examples/interactive_stateless.py
+++ b/scripts/automation/trex_control_plane/examples/interactive_stateless.py
@@ -25,7 +25,7 @@ class InteractiveStatelessTRex(cmd.Cmd):
self.verbose = verbose
self.virtual = virtual
- self.trex = CTRexStatelessClient(trex_host, trex_port, self.virtual)
+ self.trex = STLClient(trex_host, trex_port, self.virtual)
self.DEFAULT_RUN_PARAMS = dict(m=1.5,
nc=True,
p=True,
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 3ae49da8..f8bc10d5 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -110,6 +110,7 @@ public:
virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0;
virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0;
virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const = 0;
+ virtual void publish_async_data_now(uint32_t key) const = 0;
virtual uint8_t get_dp_core_count() const = 0;
virtual ~TrexPlatformApi() {}
@@ -127,6 +128,7 @@ public:
void get_global_stats(TrexPlatformGlobalStats &stats) const;
void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const;
void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const;
+ void publish_async_data_now(uint32_t key) const;
uint8_t get_dp_core_count() const;
};
@@ -146,6 +148,7 @@ public:
speed = SPEED_INVALID;
}
+ void publish_async_data_now(uint32_t key) const {}
uint8_t get_dp_core_count() const;
};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index fe6c6bdb..d40c4c8b 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2609,8 +2609,12 @@ private:
bool is_all_cores_finished();
public:
+
+ void publish_async_data();
+ void publish_async_barrier(uint32_t key);
+
void dump_stats(FILE *fd,
- std::string & json,CGlobalStats::DumpFormat format);
+ CGlobalStats::DumpFormat format);
void dump_template_info(std::string & json);
bool sanity_check();
void update_stats(void);
@@ -2649,6 +2653,7 @@ private:
CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */
CLatencyPktInfo m_latency_pkt;
TrexPublisher m_zmq_publisher;
+ CGlobalStats m_stats;
public:
TrexStateless *m_trex_stateless;
@@ -3448,11 +3453,11 @@ void CGlobalTRex::dump_template_info(std::string & json){
json+="]}" ;
}
-void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
- CGlobalStats::DumpFormat format){
- CGlobalStats stats;
+void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){
+
update_stats();
- get_stats(stats);
+ get_stats(m_stats);
+
if (format==CGlobalStats::dmpTABLE) {
if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){
switch (m_io_modes.m_pp_mode ){
@@ -3461,11 +3466,11 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
break;
case CTrexGlobalIoMode::ppTABLE:
fprintf(fd,"\n-Per port stats table \n");
- stats.Dump(fd,CGlobalStats::dmpTABLE);
+ m_stats.Dump(fd,CGlobalStats::dmpTABLE);
break;
case CTrexGlobalIoMode::ppSTANDARD:
fprintf(fd,"\n-Per port stats - standard\n");
- stats.Dump(fd,CGlobalStats::dmpSTANDARD);
+ m_stats.Dump(fd,CGlobalStats::dmpSTANDARD);
break;
};
@@ -3475,22 +3480,62 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
break;
case CTrexGlobalIoMode::apENABLE:
fprintf(fd,"\n-Global stats enabled \n");
- stats.DumpAllPorts(fd);
+ m_stats.DumpAllPorts(fd);
break;
};
}
}else{
/* at exit , always need to dump it in standartd mode for scripts*/
- stats.Dump(fd,format);
- stats.DumpAllPorts(fd);
+ m_stats.Dump(fd,format);
+ m_stats.DumpAllPorts(fd);
}
- stats.dump_json(json);
+
+}
+
+
+void
+CGlobalTRex::publish_async_data() {
+ std::string json;
+
+ m_stats.dump_json(json);
+ m_zmq_publisher.publish_json(json);
+
+ /* generator json , all cores are the same just sample the first one */
+ m_fl.m_threads_info[0]->m_node_gen.dump_json(json);
+ m_zmq_publisher.publish_json(json);
+
+
+ if ( !get_is_stateless() ){
+ dump_template_info(json);
+ m_zmq_publisher.publish_json(json);
+ }
+
+ if ( get_is_rx_check_mode() ) {
+ m_mg.rx_check_dump_json(json );
+ m_zmq_publisher.publish_json(json);
+ }
+
+ /* backward compatible */
+ m_mg.dump_json(json );
+ m_zmq_publisher.publish_json(json);
+
+ /* more info */
+ m_mg.dump_json_v2(json );
+ m_zmq_publisher.publish_json(json);
+
+ /* stateless info - nothing for now */
+ //m_trex_stateless->generate_publish_snapshot(json);
+ //m_zmq_publisher.publish_json(json);
}
+void
+CGlobalTRex::publish_async_barrier(uint32_t key) {
+ m_zmq_publisher.publish_barrier(key);
+}
-int CGlobalTRex::run_in_master(){
+int CGlobalTRex::run_in_master() {
- std::string json;
+
bool was_stopped=false;
if ( get_is_stateless() ) {
@@ -3530,7 +3575,7 @@ int CGlobalTRex::run_in_master(){
m_io_modes.DumpHelp(stdout);
}
- dump_stats(stdout,json,CGlobalStats::dmpTABLE);
+ dump_stats(stdout,CGlobalStats::dmpTABLE);
if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
fprintf (stdout," current time : %.1f sec \n",now_sec());
@@ -3542,16 +3587,6 @@ int CGlobalTRex::run_in_master(){
fprintf (stdout," test duration : %.1f sec \n",d);
}
- m_zmq_publisher.publish_json(json);
-
- /* generator json , all cores are the same just sample the first one */
- m_fl.m_threads_info[0]->m_node_gen.dump_json(json);
- m_zmq_publisher.publish_json(json);
-
- if ( !get_is_stateless() ){
- dump_template_info(json);
- m_zmq_publisher.publish_json(json);
- }
if ( !CGlobalInfo::m_options.is_latency_disabled() ){
m_mg.update();
@@ -3591,24 +3626,12 @@ int CGlobalTRex::run_in_master(){
}
- if ( get_is_rx_check_mode() ) {
- m_mg.rx_check_dump_json(json );
- m_zmq_publisher.publish_json(json);
- }
-
- /* backward compatible */
- m_mg.dump_json(json );
- m_zmq_publisher.publish_json(json);
-
- /* more info */
- m_mg.dump_json_v2(json );
- m_zmq_publisher.publish_json(json);
+
}
- /* stateless info */
- m_trex_stateless->generate_publish_snapshot(json);
- m_zmq_publisher.publish_json(json);
+ /* publish data */
+ publish_async_data();
/* check from messages from DP */
check_for_dp_messages();
@@ -3679,11 +3702,10 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){
int CGlobalTRex::stop_master(){
delay(1000);
- std::string json;
fprintf(stdout," ==================\n");
fprintf(stdout," interface sum \n");
fprintf(stdout," ==================\n");
- dump_stats(stdout,json,CGlobalStats::dmpSTANDARD);
+ dump_stats(stdout,CGlobalStats::dmpSTANDARD);
fprintf(stdout," ==================\n");
fprintf(stdout," \n\n");
@@ -3724,7 +3746,7 @@ int CGlobalTRex::stop_master(){
m_mg.DumpRxCheckVerification(stdout,total_tx_rx_check);
}
- dump_stats(stdout,json,CGlobalStats::dmpSTANDARD);
+ dump_stats(stdout,CGlobalStats::dmpSTANDARD);
dump_post_test_stats(stdout);
m_fl.Delete();
@@ -4903,3 +4925,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id,
driver_name = CTRexExtendedDriverDb::Ins()->get_driver_name();
speed = CTRexExtendedDriverDb::Ins()->get_drv()->get_driver_speed();
}
+
+void
+TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
+ g_trex.publish_async_data();
+ g_trex.publish_async_barrier(key);
+}
+
diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp
index 416c4b69..7cacd96c 100644
--- a/src/mock/trex_platform_api_mock.cpp
+++ b/src/mock/trex_platform_api_mock.cpp
@@ -51,3 +51,4 @@ void
TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
cores_id_list.push_back(std::make_pair(0, 0));
}
+
diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp
index 35653069..f56d56df 100644
--- a/src/publisher/trex_publisher.cpp
+++ b/src/publisher/trex_publisher.cpp
@@ -94,6 +94,21 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
publish_json(s);
}
+void
+TrexPublisher::publish_barrier(uint32_t key) {
+ Json::FastWriter writer;
+ Json::Value value;
+ std::string s;
+
+ value["name"] = "trex-barrier";
+ value["type"] = key;
+ value["data"] = Json::objectValue;
+
+ s = writer.write(value);
+ publish_json(s);
+}
+
+
/**
* error handling
*
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index 52978476..f086babb 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -53,8 +53,20 @@ public:
};
+ /**
+ * publishes an async event
+ *
+ */
virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue);
+ /**
+ * publishes a barrier requested by the user
+ *
+ * @author imarom (17-Jan-16)
+ *
+ */
+ virtual void publish_barrier(uint32_t key);
+
private:
void show_zmq_last_error(const std::string &err);
private:
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index a701f6db..66999144 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -317,3 +317,19 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+/**
+ * publish async data now (fast flush)
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
+ TrexStateless *main = get_stateless_obj();
+
+ uint32_t key = parse_uint32(params, "key", result);
+
+ main->get_platform_api()->publish_async_data_now(key);
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index b1750053..081398d1 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -56,6 +56,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false);
* general cmds
*/
TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false);
+TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false);
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index 6e5fbfc6..aee92539 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -51,6 +51,8 @@ TrexRpcServerAsync::_prepare() {
*/
void
TrexRpcServerAsync::_rpc_thread_cb() {
+/* disabled, using the main publisher */
+#if 0
std::stringstream ss;
/* create a socket based on the configuration */
@@ -105,6 +107,7 @@ TrexRpcServerAsync::_rpc_thread_cb() {
/* must be closed from the same thread */
zmq_close(m_socket);
+#endif
}
void
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 82c723b7..5218cd0a 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -34,6 +34,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
/* general */
register_command(new TrexRpcCmdPing());
+ register_command(new TrexRpcPublishNow());
register_command(new TrexRpcCmdGetCmds());
register_command(new TrexRpcCmdGetVersion());
register_command(new TrexRpcCmdGetSysInfo());
diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp
index 215315e0..13f264cf 100644
--- a/src/sim/trex_sim_stateless.cpp
+++ b/src/sim/trex_sim_stateless.cpp
@@ -97,6 +97,10 @@ public:
}
}
+ virtual void publish_async_data_now(uint32_t key) const {
+
+ }
+
private:
int m_dp_core_count;
};