summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2015-12-06 17:13:30 +0200
committerHanoh Haim <hhaim@cisco.com>2015-12-06 17:13:30 +0200
commit5cce0ba15e3be9246b2186b701a60c35df59d2d1 (patch)
treeebeaaacad5153c9b1f312498d8cba9a20aaad297
parent97cd0c4748938c783fc07ee2eb3450a851d3d93e (diff)
parent026f949fbafbb00fd7a21f3d84a632f5745003ea (diff)
Merge from master
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py82
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py143
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py4
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_streams.py3
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py66
-rw-r--r--src/gtest/trex_stateless_gtest.cpp204
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp183
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h1
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp3
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp132
-rw-r--r--src/stateless/cp/trex_stateless_port.h51
-rw-r--r--src/stateless/cp/trex_stream.h9
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp154
-rw-r--r--src/stateless/cp/trex_streams_compiler.h37
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp2
15 files changed, 639 insertions, 435 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 6793a4ca..e38c6ca7 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -159,39 +159,94 @@ class CTRexAsyncClient():
self.stats = TrexAsyncStatsManager()
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target = self.run)
+ self.alive = False
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
+ self.connected = True
- def run (self):
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.alive:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return False, "*** [subscriber] - no data flow from server at : " + self.tr
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ return True, ""
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+ # no data yet...
+ self.alive = False
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
- self.socket.setsockopt(zmq.RCVTIMEO, 3000)
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
while self.active:
try:
line = self.socket.recv_string();
- self.stateless_client.on_async_alive()
+ if not self.alive:
+ self.stateless_client.on_async_alive()
+ self.alive = True
+
+ # got a timeout - mark as not alive and retry
except zmq.Again:
- self.stateless_client.on_async_dead()
+
+ if self.alive:
+ self.stateless_client.on_async_dead()
+ self.alive = False
+
continue
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ self.alive = False
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -201,6 +256,10 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
def get_stats (self):
return self.stats
@@ -220,8 +279,3 @@ class CTRexAsyncClient():
else:
pass
-
- def stop (self):
- self.active = False
- self.t.join()
-
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 4436be75..3a9d74a9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -414,6 +414,17 @@ class Port(object):
return self.ok()
+
+ def validate (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("validate", params)
+ if not rc:
+ return self.err(data)
+
+ return self.ok()
+
################# events handler ######################
def async_event_port_stopped (self):
self.state = self.STATE_STREAMS
@@ -434,14 +445,13 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self)
self.streams_db = CStreamsDB()
- self.connected = False
-
self.events = []
+ self.connected = False
################# events handler ######################
def add_event_log (self, msg, ev_type, show = False):
@@ -504,7 +514,8 @@ class CTRexStatelessClient(object):
self.ports[port_id].async_event_port_stopped()
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -519,6 +530,11 @@ class CTRexStatelessClient(object):
def wrap(*args):
time1 = time.time()
ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
delta = time.time() - time1
for unit in ['sec','ms','usec']:
@@ -566,13 +582,19 @@ class CTRexStatelessClient(object):
# connection sequence
def connect(self):
+ # clear this flag
self.connected = False
- # connect
+ # connect sync channel
rc, data = self.comm_link.connect()
if not rc:
return RC_ERR(data)
+ # connect async channel
+ rc, data = self.async_client.connect()
+ if not rc:
+ return RC_ERR(data)
+
# version
rc, data = self.transmit("get_version")
if not rc:
@@ -618,24 +640,19 @@ class CTRexStatelessClient(object):
def disconnect(self):
- self.connected = False
self.comm_link.disconnect()
+ self.async_client.disconnect()
return RC_OK()
def on_async_dead (self):
- if self.is_connected():
+ if self.connected:
msg = 'lost connection to server'
self.add_event_log(msg, 'local', True)
-
- self.disconnect()
+ self.connected = False
def on_async_alive (self):
- if not self.is_connected():
- msg = 'server connection restored'
- self.add_event_log(msg, 'local', True)
-
- self.cmd_connect()
+ pass
########### cached queries (no server traffic) ###########
@@ -659,7 +676,7 @@ class CTRexStatelessClient(object):
return port_ids
def get_stats_async (self):
- return self._async_client.get_stats()
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
@@ -842,6 +859,17 @@ class CTRexStatelessClient(object):
return rc
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
pass
@@ -962,19 +990,6 @@ class CTRexStatelessClient(object):
return RC_OK()
- def cmd_pause_line (self, line):
- '''Pause active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "pause",
- self.cmd_stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return RC_ERR("bad command line paramters")
-
- return self.cmd_pause(opts.ports)
-
# resume cmd
def cmd_resume (self, port_id_list):
@@ -995,19 +1010,6 @@ class CTRexStatelessClient(object):
return RC_OK()
- def cmd_resume_line (self, line):
- '''Resume active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "resume",
- self.cmd_stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return RC_ERR("bad command line paramters")
-
- return self.cmd_resume(opts.ports)
-
# start cmd
def cmd_start (self, port_id_list, stream_list, mult, force, duration):
@@ -1043,6 +1045,15 @@ class CTRexStatelessClient(object):
return RC_OK()
+
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
############## High Level API With Parser ################
@timing
def cmd_start_line (self, line):
@@ -1126,7 +1137,53 @@ class CTRexStatelessClient(object):
def cmd_reset_line (self, line):
return self.cmd_reset()
-
+
+ @timing
+ def cmd_pause_line (self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "pause",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_pause(opts.ports)
+
+
+ @timing
+ def cmd_resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "resume",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_resume(opts.ports)
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_validate(opts.ports)
+
+
def cmd_exit_line (self, line):
print format_text("Exiting\n", 'bold')
# a way to exit
@@ -1262,5 +1319,3 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
-
-
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index a5789c46..90d7f8e8 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -133,7 +133,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to send message to server")
+ return CmdResponse(False, "*** [RPC] - Failed to send message to server")
tries = 0
@@ -145,7 +145,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to get server response")
+ return CmdResponse(False, "*** [RPC] - Failed to get server response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py
index c2823445..89de7286 100755
--- a/scripts/automation/trex_control_plane/common/trex_streams.py
+++ b/scripts/automation/trex_control_plane/common/trex_streams.py
@@ -14,7 +14,7 @@ StreamPack = namedtuple('StreamPack', ['stream_id', 'stream'])
class CStreamList(object):
def __init__(self):
- self.streams_list = {}
+ self.streams_list = OrderedDict()
self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)),
"rpc_defaults.yaml"))
@@ -82,6 +82,7 @@ class CStreamList(object):
stream_ids = {}
for idx, stream_name in enumerate(self.streams_list):
stream_ids[stream_name] = idx
+
# next, iterate over the streams and transform them from working with names to ids.
# with that build a new dict with old stream_name as the key, and StreamPack as the stored value
compiled_streams = {}
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index be8fb70e..9d855f98 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -35,7 +35,7 @@ from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
import trex_status
import parsing_opts
-
+from functools import wraps
__version__ = "1.1"
@@ -128,6 +128,18 @@ class TRexConsole(TRexGeneralCmd):
################### internal section ########################
+ def verify_connected(f):
+ @wraps(f)
+ def wrap(*args):
+ inst = args[0]
+ if not inst.stateless_client.is_connected():
+ print format_text("\nNot connected to server\n", 'bold')
+ return
+
+ ret = f(*args)
+ return ret
+
+ return wrap
def get_console_identifier(self):
return "{context}_{server}".format(context=self.__class__.__name__,
@@ -141,6 +153,7 @@ class TRexConsole(TRexGeneralCmd):
self.__dict__[name] = getattr(self.trex_console, name)
def postcmd(self, stop, line):
+
if self.stateless_client.is_connected():
self.prompt = "TRex > "
else:
@@ -207,13 +220,9 @@ class TRexConsole(TRexGeneralCmd):
####################### shell commands #######################
+ @verify_connected
def do_ping (self, line):
'''Ping the server\n'''
-
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
rc = self.stateless_client.cmd_ping()
if rc.bad():
return
@@ -303,13 +312,10 @@ class TRexConsole(TRexGeneralCmd):
if (l > 2) and (s[l - 2] in file_flags):
return TRexConsole.tree_autocomplete(s[l - 1])
+ @verify_connected
def do_start(self, line):
'''Start selected traffic in specified port(s) on TRex\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_start_line(line)
@@ -317,64 +323,57 @@ class TRexConsole(TRexGeneralCmd):
self.do_start("-h")
############# stop
+ @verify_connected
def do_stop(self, line):
'''stops port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_stop_line(line)
def help_stop(self):
self.do_stop("-h")
############# update
+ @verify_connected
def do_update(self, line):
'''update speed of port(s)currently transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_update_line(line)
def help_update (self):
self.do_update("-h")
############# pause
+ @verify_connected
def do_pause(self, line):
'''pause port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_pause_line(line)
############# resume
+ @verify_connected
def do_resume(self, line):
'''resume port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_resume_line(line)
########## reset
+ @verify_connected
def do_reset (self, line):
'''force stop all ports\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_reset_line(line)
-
+
+ ######### validate
+ @verify_connected
+ def do_validate (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ self.stateless_client.cmd_validate_line(line)
+
+
def help_events (self):
self.do_events("-h")
@@ -404,13 +403,10 @@ class TRexConsole(TRexGeneralCmd):
print format_text("\n\nEvent log was cleared\n\n")
# tui
+ @verify_connected
def do_tui (self, line):
'''Shows a graphical console\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.do_verbose('off')
trex_status.show_trex_status(self.stateless_client)
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index ea54a935..566e7ed4 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -30,6 +30,7 @@ limitations under the License.
#include <trex_stateless_port.h>
#include <trex_rpc_server_api.h>
#include <iostream>
+#include <vector>
#define EXPECT_EQ_UINT32(a,b) EXPECT_EQ((uint32_t)(a),(uint32_t)(b))
@@ -435,11 +436,9 @@ TEST_F(basic_stl, basic_pause_resume0) {
// stream - clean
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ std::vector<TrexStreamsCompiledObj *> objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg_queue.add_msg(lpStartCmd);
@@ -499,14 +498,9 @@ void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){
streams.push_back(stream1);
// stream - clean
-
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
-
- /* start with different event id */
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ );
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 1, objs[0], 10.0 /*sec */ );
m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
@@ -552,12 +546,9 @@ TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) {
streams.push_back(stream1);
// stream - clean
-
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg_queue.add_msg(lpStartCmd);
@@ -633,12 +624,9 @@ TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) {
streams.push_back(stream1);
// stream - clean
-
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg_queue.add_msg(lpStartCmd);
@@ -687,12 +675,10 @@ TEST_F(basic_stl, single_pkt_bb_start_stop3) {
streams.push_back(stream1);
// stream - clean
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(port_id);
@@ -740,14 +726,12 @@ TEST_F(basic_stl, single_pkt_bb_start_stop2) {
streams.push_back(stream1);
// stream - clean
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
- TrexStatelessDpStart * lpStartCmd1 = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ TrexStatelessDpStart * lpStartCmd1 = new TrexStatelessDpStart(port_id, 0, objs[0]->clone(), 10.0 /*sec */ );
t1.m_msg_queue.add_msg(lpStartCmd);
@@ -795,12 +779,10 @@ TEST_F(basic_stl, single_pkt_bb_start_stop) {
streams.push_back(stream1);
// stream - clean
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
@@ -880,14 +862,13 @@ TEST_F(basic_stl, simple_prog4) {
streams.push_back(stream2);
- TrexStreamsCompiledObj comp_obj(0,1.0);
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 0, objs[0], 20.0 /*sec */ );
- EXPECT_TRUE(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 20.0 );
-
-
- t1.m_msg = lpstart;
+ t1.m_msg = lpStartCmd;
bool res=t1.init();
@@ -950,11 +931,10 @@ TEST_F(basic_stl, simple_prog3) {
streams.push_back(stream2);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- EXPECT_TRUE(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 50.0 );
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 50.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1011,13 +991,10 @@ TEST_F(basic_stl, simple_prog2) {
pcap.clone_packet_into_stream(stream2);
streams.push_back(stream2);
-
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- EXPECT_TRUE(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
-
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1074,11 +1051,10 @@ TEST_F(basic_stl, simple_prog1) {
streams.push_back(stream2);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- EXPECT_TRUE(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1119,12 +1095,10 @@ TEST_F(basic_stl, single_pkt_burst1) {
streams.push_back(stream1);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
-
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1170,11 +1144,9 @@ TEST_F(basic_stl, single_pkt) {
// stream - clean
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1226,12 +1198,11 @@ TEST_F(basic_stl, multi_pkt1) {
streams.push_back(stream2);
- // stream - clean
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 );
+ // stream - clean
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1290,11 +1261,10 @@ TEST_F(basic_stl, multi_pkt2) {
// stream - clean
- TrexStreamsCompiledObj comp_obj(0,5.0);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 );
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs, 1, 5.0));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1336,11 +1306,10 @@ TEST_F(basic_stl, multi_burst1) {
streams.push_back(stream1);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
- assert(compile.compile(streams, comp_obj) );
-
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 40 );
+ uint8_t port_id = 0;
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpstart = new TrexStatelessDpStart(port_id, 0, objs[0], 40.0 /*sec */ );
t1.m_msg = lpstart;
@@ -1370,10 +1339,9 @@ TEST_F(basic_stl, compile_bad_1) {
streams.push_back(stream1);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
std::string err_msg;
- EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+ std::vector<TrexStreamsCompiledObj *>objs;
+ EXPECT_FALSE(compile.compile(0, streams, objs, 1, 1, &err_msg));
delete stream1;
@@ -1403,10 +1371,12 @@ TEST_F(basic_stl, compile_bad_2) {
streams.push_back(stream1);
streams.push_back(stream2);
- TrexStreamsCompiledObj comp_obj(0,1.0);
+ uint8_t port_id = 0;
std::string err_msg;
- EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+ std::vector<TrexStreamsCompiledObj *>objs;
+ EXPECT_FALSE(compile.compile(port_id, streams, objs, 1, 1, &err_msg));
+
delete stream1;
delete stream2;
@@ -1482,10 +1452,10 @@ TEST_F(basic_stl, compile_bad_3) {
streams.push_back(stream);
/* compile */
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
std::string err_msg;
- EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+ std::vector<TrexStreamsCompiledObj *>objs;
+ EXPECT_FALSE(compile.compile(0, streams, objs, 1, 1, &err_msg));
+
for (auto stream : streams) {
delete stream;
@@ -1534,11 +1504,11 @@ TEST_F(basic_stl, compile_with_warnings) {
/* compile */
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
std::string err_msg;
- EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
-
+ std::vector<TrexStreamsCompiledObj *>objs;
+ EXPECT_TRUE(compile.compile(0, streams, objs, 1, 1, &err_msg));
+ delete objs[0];
+
EXPECT_TRUE(compile.get_last_compile_warnings().size() == 1);
for (auto stream : streams) {
@@ -1573,20 +1543,22 @@ TEST_F(basic_stl, compile_good_stream_id_compres) {
streams.push_back(stream1);
streams.push_back(stream2);
- TrexStreamsCompiledObj comp_obj(0,1.0);
-
+ uint8_t port_id = 0;
std::string err_msg;
- EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
+ std::vector<TrexStreamsCompiledObj *>objs;
+ EXPECT_TRUE(compile.compile(port_id, streams, objs, 1, 1, &err_msg));
printf(" %s \n",err_msg.c_str());
- comp_obj.Dump(stdout);
+ objs[0]->Dump(stdout);
+
+ EXPECT_EQ_UINT32(objs[0]->get_objects()[0].m_stream->m_stream_id,0);
+ EXPECT_EQ_UINT32(objs[0]->get_objects()[0].m_stream->m_next_stream_id,1);
- EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_stream_id,0);
- EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_next_stream_id,1);
+ EXPECT_EQ_UINT32(objs[0]->get_objects()[1].m_stream->m_stream_id,1);
+ EXPECT_EQ_UINT32(objs[0]->get_objects()[1].m_stream->m_next_stream_id,0);
- EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_stream_id,1);
- EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_next_stream_id,0);
+ delete objs[0];
delete stream1;
delete stream2;
@@ -1648,14 +1620,12 @@ TEST_F(basic_stl, dp_stop_event) {
// stream - clean
- TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+ std::vector<TrexStreamsCompiledObj *>objs;
+ assert(compile.compile(port_id, streams, objs));
+ TrexStatelessDpStart *lpStartCmd = new TrexStatelessDpStart(port_id, 17, objs[0], 10.0 /*sec */ );
- assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 17, comp_obj.clone(), 10.0 /*sec */ );
-
-
- t1.m_msg = lpstart;
+ t1.m_msg = lpStartCmd;
/* let me handle these */
DpToCpHandlerStopEvent handler(17);
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index 51df3159..1e8328dc 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -23,6 +23,7 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless.h>
#include <trex_stateless_port.h>
+#include <trex_streams_compiler.h>
#include <iostream>
@@ -52,7 +53,8 @@ static uint64_t str2num(const string &str) {
trex_rpc_cmd_rc_e
TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_int(params, "port_id", result);
+ uint8_t port_id = parse_port(params, result);
+
uint32_t stream_id = parse_int(params, "stream_id", result);
const Json::Value &section = parse_object(params, "stream", result);
@@ -296,15 +298,7 @@ TrexRpcCmdAddStream::validate_stream(const TrexStream *stream, Json::Value &resu
generate_execute_err(result, ss.str());
}
- /* port id should be between 0 and count - 1 */
- if (stream->m_port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- delete stream;
- generate_execute_err(result, ss.str());
- }
-
- /* add the stream to the port's stream table */
+ /* add the stream to the port's stream table */
TrexStatelessPort * port = get_stateless_obj()->get_port_by_id(stream->m_port_id);
/* does such a stream exists ? */
@@ -323,17 +317,11 @@ TrexRpcCmdAddStream::validate_stream(const TrexStream *stream, Json::Value &resu
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdRemoveStream::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
- uint32_t stream_id = parse_int(params, "stream_id", result);
-
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ uint32_t stream_id = parse_int(params, "stream_id", result);
TrexStream *stream = port->get_stream_by_id(stream_id);
if (!stream) {
@@ -362,14 +350,8 @@ TrexRpcCmdRemoveStream::_run(const Json::Value &params, Json::Value &result) {
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdRemoveAllStreams::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
@@ -393,27 +375,20 @@ trex_rpc_cmd_rc_e
TrexRpcCmdGetStreamList::_run(const Json::Value &params, Json::Value &result) {
std::vector<uint32_t> stream_list;
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
- TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- port->get_id_list(stream_list);
+ port->get_id_list(stream_list);
- Json::Value json_list = Json::arrayValue;
+ Json::Value json_list = Json::arrayValue;
- for (auto stream_id : stream_list) {
- json_list.append(stream_id);
- }
+ for (auto stream_id : stream_list) {
+ json_list.append(stream_id);
+ }
- result["result"] = json_list;
+ result["result"] = json_list;
- return (TREX_RPC_CMD_OK);
+ return (TREX_RPC_CMD_OK);
}
/***************************
@@ -423,18 +398,13 @@ TrexRpcCmdGetStreamList::_run(const Json::Value &params, Json::Value &result) {
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
- bool get_pkt = parse_bool(params, "get_pkt", result);
- uint32_t stream_id = parse_int(params, "stream_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ bool get_pkt = parse_bool(params, "get_pkt", result);
+ uint32_t stream_id = parse_int(params, "stream_id", result);
+
TrexStream *stream = port->get_stream_by_id(stream_id);
if (!stream) {
@@ -462,17 +432,11 @@ TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e
TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
- double duration = parse_double(params, "duration", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ double duration = parse_double(params, "duration", result);
+
/* multiplier */
const Json::Value &mul_obj = parse_object(params, "mul", result);
@@ -504,14 +468,8 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
@@ -531,17 +489,12 @@ TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdGetAllStreams::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
- bool get_pkt = parse_bool(params, "get_pkt", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
+
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+ bool get_pkt = parse_bool(params, "get_pkt", result);
+
std::vector <TrexStream *> streams;
port->get_object_list(streams);
@@ -573,14 +526,7 @@ TrexRpcCmdGetAllStreams::_run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e
TrexRpcCmdPauseTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
@@ -601,14 +547,7 @@ TrexRpcCmdPauseTraffic::_run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e
TrexRpcCmdResumeTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
@@ -629,14 +568,7 @@ TrexRpcCmdResumeTraffic::_run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e
TrexRpcCmdUpdateTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
- if (port_id >= get_stateless_obj()->get_port_count()) {
- std::stringstream ss;
- ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
- generate_execute_err(result, ss.str());
- }
-
+ uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
/* multiplier */
@@ -661,3 +593,54 @@ TrexRpcCmdUpdateTraffic::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+/***************************
+ * validate
+ *
+ * checks that the port
+ * attached streams are
+ * valid as a program
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdValidate::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ const TrexStreamsGraphObj *graph = NULL;
+
+ try {
+ graph = port->validate();
+ }
+ catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+
+ result["result"]["rate"]["max_bps"] = graph->get_max_bps();
+ result["result"]["rate"]["max_pps"] = graph->get_max_pps();
+ result["result"]["rate"]["max_line_util"] = graph->get_max_bps() / port->get_port_speed_bps();
+
+ result["result"]["graph"]["events_count"] = (int)graph->get_events().size();
+
+ result["result"]["graph"]["events"] = Json::arrayValue;
+ Json::Value &events_json = result["result"]["graph"]["events"];
+
+ int index = 0;
+ for (const auto &ev : graph->get_events()) {
+ Json::Value ev_json;
+
+ ev_json["time_usec"] = ev.time;
+ ev_json["diff_bps"] = ev.diff_bps;
+ ev_json["diff_pps"] = ev.diff_pps;
+ ev_json["stream_id"] = ev.stream_id;
+
+ events_json.append(ev_json);
+
+ index++;
+ if (index >= 100) {
+ break;
+ }
+ }
+
+
+ return (TREX_RPC_CMD_OK);
+}
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index b4f37e3b..80bef3b0 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -114,5 +114,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 2, true);
TREX_RPC_CMD_DEFINE(TrexRpcCmdSyncUser, "sync_user", 2, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false);
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index a65bbccf..52258b88 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -57,8 +57,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdPauseTraffic());
register_command(new TrexRpcCmdResumeTraffic());
register_command(new TrexRpcCmdUpdateTraffic());
+
+ register_command(new TrexRpcCmdValidate());
}
+
TrexRpcCommandsTable::~TrexRpcCommandsTable() {
for (auto cmd : m_rpc_cmd_table) {
delete cmd.second;
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 95bdca0b..0e45bf0b 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -115,7 +115,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* on start - we can only provide absolute values */
assert(mul.m_op == TrexPortMultiplier::OP_ABS);
- double per_core_mul = calculate_effective_mul(mul);
+ double factor = calculate_effective_factor(mul);
/* fetch all the streams from the table */
vector<TrexStream *> streams;
@@ -123,12 +123,18 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* compiler it */
- TrexStreamsCompiler compiler;
- TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(m_port_id, per_core_mul);
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+ std::string fail_msg;
- bool rc = compiler.compile(streams, *compiled_obj);
+ TrexStreamsCompiler compiler;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ factor,
+ &fail_msg);
if (!rc) {
- throw TrexRpcException("Failed to compile streams");
+ throw TrexRpcException(fail_msg);
}
/* generate a message to all the relevant DP cores to start transmitting */
@@ -137,21 +143,29 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* mark that DP event of stoppped is possible */
m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
-
- m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
- m_last_duration =duration;
+ /* update object status */
+ m_factor = factor;
+ m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
+ m_last_duration = duration;
change_state(PORT_STATE_TX);
- send_message_to_dp(start_msg);
+
+ /* update the DP - messages will be freed by the DP */
+ int index = 0;
+ for (auto core_id : m_cores_id_list) {
+
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ send_message_to_dp(core_id, start_msg);
+
+ index++;
+ }
+ /* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
-
- /* save the per core multiplier for update messages */
- m_current_per_core_m = per_core_mul;
+
}
@@ -179,7 +193,7 @@ TrexStatelessPort::stop_traffic(void) {
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(stop_msg);
change_state(PORT_STATE_STREAMS);
@@ -202,9 +216,9 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexRpcException(" pause is supported when duration is not enable is start command ");
}
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
+ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(pause_msg);
change_state(PORT_STATE_PAUSE);
}
@@ -215,9 +229,9 @@ TrexStatelessPort::resume_traffic(void) {
verify_state(PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
+ TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(resume_msg);
change_state(PORT_STATE_TX);
}
@@ -230,19 +244,19 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
verify_state(PORT_STATE_TX | PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- double new_per_core_m = calculate_effective_mul(mul);
+ double new_factor = calculate_effective_factor(mul);
switch (mul.m_op) {
case TrexPortMultiplier::OP_ABS:
- factor = new_per_core_m / m_current_per_core_m;
+ factor = new_factor / m_factor;
break;
case TrexPortMultiplier::OP_ADD:
- factor = (m_current_per_core_m + new_per_core_m) / m_current_per_core_m;
+ factor = (m_factor + new_factor) / m_factor;
break;
case TrexPortMultiplier::OP_SUB:
- factor = (m_current_per_core_m - new_per_core_m) / m_current_per_core_m;
+ factor = (m_factor - new_factor) / m_factor;
if (factor <= 0) {
throw TrexRpcException("Update request will lower traffic to less than zero");
}
@@ -255,9 +269,9 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
- send_message_to_dp(update_msg);
+ send_message_to_all_dp(update_msg);
- m_current_per_core_m *= factor;
+ m_factor *= factor;
}
@@ -356,15 +370,22 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
}
void
-TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
+TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) {
for (auto core_id : m_cores_id_list) {
-
- /* send the message to the core */
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
- ring->Enqueue((CGenNode *)msg->clone());
+ send_message_to_dp(core_id, msg->clone());
}
+ /* original was not sent - delete it */
+ delete msg;
+}
+
+void
+TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg);
}
/**
@@ -393,7 +414,7 @@ TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
}
uint64_t
-TrexStatelessPort::get_port_speed_bps() {
+TrexStatelessPort::get_port_speed_bps() const {
switch (m_speed) {
case TrexPlatformApi::SPEED_1G:
return (1LLU * 1000 * 1000 * 1000);
@@ -410,11 +431,11 @@ TrexStatelessPort::get_port_speed_bps() {
}
double
-TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) {
+TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) {
- /* for a simple factor request - calculate the multiplier per core */
+ /* for a simple factor request */
if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) {
- return (mul.m_value / m_cores_id_list.size());
+ return (mul.m_value);
}
/* we now need the graph - generate it if we don't have it (happens once) */
@@ -424,19 +445,19 @@ TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) {
switch (mul.m_type) {
case TrexPortMultiplier::MUL_BPS:
- return ( (mul.m_value / m_graph_obj->get_max_bps()) / m_cores_id_list.size());
+ return (mul.m_value / m_graph_obj->get_max_bps());
case TrexPortMultiplier::MUL_PPS:
- return ( (mul.m_value / m_graph_obj->get_max_pps()) / m_cores_id_list.size());
+ return (mul.m_value / m_graph_obj->get_max_pps());
case TrexPortMultiplier::MUL_PERCENTAGE:
/* if abs percentage is from the line speed - otherwise its from the current speed */
if (mul.m_op == TrexPortMultiplier::OP_ABS) {
double required = (mul.m_value / 100.0) * get_port_speed_bps();
- return ( (required / m_graph_obj->get_max_bps()) / m_cores_id_list.size());
+ return (required / m_graph_obj->get_max_bps());
} else {
- return (m_current_per_core_m * (mul.m_value / 100.0));
+ return (m_factor * (mul.m_value / 100.0));
}
default:
@@ -518,3 +539,40 @@ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, doubl
}
+const TrexStreamsGraphObj *
+TrexStatelessPort::validate(void) {
+
+ /* first compile the graph */
+
+ vector<TrexStream *> streams;
+ get_object_list(streams);
+
+ if (streams.size() == 0) {
+ throw TrexException("no streams attached to port");
+ }
+
+ TrexStreamsCompiler compiler;
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+
+ std::string fail_msg;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ 1.0,
+ &fail_msg);
+ if (!rc) {
+ throw TrexException(fail_msg);
+ }
+
+ for (auto obj : compiled_objs) {
+ delete obj;
+ }
+
+ /* now create a stream graph */
+ if (!m_graph_obj) {
+ generate_streams_graph();
+ }
+
+ return m_graph_obj;
+}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 2d15a1cc..28e42a17 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -76,6 +76,16 @@ public:
void release(void);
/**
+ * validate the state of the port before start
+ * it will return a stream graph
+ * containing information about the streams
+ * configured on this port
+ *
+ * on error it throws TrexException
+ */
+ const TrexStreamsGraphObj *validate(void);
+
+ /**
* start traffic
* throws TrexException in case of an error
*/
@@ -172,6 +182,7 @@ public:
verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
m_stream_table.add_stream(stream);
+ delete_streams_graph();
change_state(PORT_STATE_STREAMS);
}
@@ -180,6 +191,7 @@ public:
verify_state(PORT_STATE_STREAMS);
m_stream_table.remove_stream(stream);
+ delete_streams_graph();
if (m_stream_table.size() == 0) {
change_state(PORT_STATE_IDLE);
@@ -190,6 +202,7 @@ public:
verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
m_stream_table.remove_and_delete_all_streams();
+ delete_streams_graph();
change_state(PORT_STATE_IDLE);
}
@@ -212,13 +225,27 @@ public:
/**
+ * returns the number of DP cores linked to this port
+ *
+ */
+ uint8_t get_dp_core_count() {
+ return m_cores_id_list.size();
+ }
+
+ /**
* returns the traffic multiplier currently being used by the DP
*
*/
double get_multiplier() {
- return (m_current_per_core_m * m_cores_id_list.size());
+ return (m_factor);
}
+ /**
+ * get port speed in bits per second
+ *
+ */
+ uint64_t get_port_speed_bps() const;
+
private:
@@ -254,7 +281,17 @@ private:
std::string generate_handler();
- void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg);
+ /**
+ * send message to all cores using duplicate
+ *
+ */
+ void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ /**
+ * send message to specific DP core
+ *
+ */
+ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
/**
* triggered when event occurs
@@ -267,13 +304,9 @@ private:
* calculate effective M per core
*
*/
- double calculate_effective_mul(const TrexPortMultiplier &mul);
+ double calculate_effective_factor(const TrexPortMultiplier &mul);
- /**
- * get port speed in bits per second
- *
- */
- uint64_t get_port_speed_bps();
+
/**
* generates a graph of streams graph
@@ -303,7 +336,7 @@ private:
bool m_last_all_streams_continues;
double m_last_duration;
- double m_current_per_core_m;
+ double m_factor;
TrexDpPortEvents m_dp_events;
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index 3e48d7e4..b991b05f 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -129,12 +129,13 @@ public:
}
/* create new stream */
- TrexStream * clone_as_dp(){
- TrexStream * dp=new TrexStream(m_type,m_port_id,m_stream_id);
+ TrexStream * clone_as_dp() const {
+
+ TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id);
- dp->m_isg_usec = m_isg_usec;
- dp->m_next_stream_id = m_next_stream_id;
+ dp->m_isg_usec = m_isg_usec;
+ dp->m_next_stream_id = m_next_stream_id;
dp->m_enabled = m_enabled;
dp->m_self_start = m_self_start;
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index c8aa1e40..d83e4ab6 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -142,8 +142,9 @@ private:
/**************************************
* stream compiled object
*************************************/
-TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) {
- m_all_continues=false;
+TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id) {
+ m_port_id = port_id;
+ m_all_continues = false;
}
TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
@@ -155,53 +156,40 @@ TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
void
-TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream *stream){
obj_st obj;
- obj.m_stream = stream->clone_as_dp();
+ obj.m_stream = stream;
m_objs.push_back(obj);
}
-void
-TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream,
- uint32_t my_dp_id, int next_dp_id) {
- obj_st obj;
-
- obj.m_stream = stream->clone_as_dp();
- /* compress the id's*/
- obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id);
-
- m_objs.push_back(obj);
-}
-
-void TrexStreamsCompiledObj::Dump(FILE *fd){
- for (auto obj : m_objs) {
- obj.m_stream->Dump(fd);
- }
-}
-
-
TrexStreamsCompiledObj *
TrexStreamsCompiledObj::clone() {
- /* use multiplier of 1 to avoid double mult */
- TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id, 1);
+ TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id);
/**
* clone each element
*/
for (auto obj : m_objs) {
- new_compiled_obj->add_compiled_stream(obj.m_stream);
+ TrexStream *new_stream = obj.m_stream->clone_as_dp();
+ new_compiled_obj->add_compiled_stream(new_stream);
}
- new_compiled_obj->m_mul = m_mul;
-
return new_compiled_obj;
+
+}
+
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
}
+
void
TrexStreamsCompiler::add_warning(const std::string &warning) {
m_warnings.push_back("*** warning: " + warning);
@@ -219,7 +207,7 @@ TrexStreamsCompiler::check_stream(const TrexStream *stream) {
/* cont. stream can point only on itself */
if (stream->get_type() == TrexStream::stCONTINUOUS) {
if (stream->m_next_stream_id != -1) {
- ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream";
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point to another stream";
err(ss.str());
}
}
@@ -381,12 +369,14 @@ TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
* stream compiler
*************************************/
bool
-TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
- TrexStreamsCompiledObj &obj,
- std::string *fail_msg) {
+TrexStreamsCompiler::compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ std::string *fail_msg) {
#if 0
- fprintf(stdout,"------------pre compile \n");
for (auto stream : streams) {
stream->Dump(stdout);
}
@@ -398,7 +388,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
/* compile checks */
try {
- pre_compile_check(streams,nodes);
+ pre_compile_check(streams, nodes);
} catch (const TrexException &ex) {
if (fail_msg) {
*fail_msg = ex.what();
@@ -408,38 +398,94 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
return false;
}
+ /* check if all are cont. streams */
+ bool all_continues = true;
+ for (const auto stream : streams) {
+ if (stream->get_type() != TrexStream::stCONTINUOUS) {
+ all_continues = false;
+ break;
+ }
+ }
+
+ /* allocate objects for all DP cores */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
+ obj->m_all_continues = all_continues;
+ objs.push_back(obj);
+ }
- bool all_continues=true;
- /* for now we do something trivial, */
+ /* compile all the streams */
for (auto stream : streams) {
/* skip non-enabled streams */
if (!stream->m_enabled) {
continue;
}
- if (stream->get_type() != TrexStream::stCONTINUOUS ) {
- all_continues=false;
- }
-
- int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
- assert(new_id>=0);
- uint32_t my_stream_id = (uint32_t)new_id;
- int my_next_stream_id=-1;
- if (stream->m_next_stream_id>=0) {
- my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
- }
-
- /* add it */
- obj.add_compiled_stream(stream,
- my_stream_id,
- my_next_stream_id
- );
+
+ /* compile a single stream to all cores */
+ compile_stream(stream, factor, dp_core_count, objs, nodes);
}
- obj.m_all_continues =all_continues;
+
return true;
}
+/**
+ * compiles a single stream to DP objects
+ *
+ * @author imarom (03-Dec-15)
+ *
+ */
+void
+TrexStreamsCompiler::compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes) {
+
+
+ /* fix the stream ids */
+ int new_id = nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id >= 0);
+
+ int new_next_id = -1;
+ if (stream->m_next_stream_id >= 0) {
+ new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
+ }
+
+ /* calculate rate */
+ double per_core_rate = (stream->m_pps * (factor / dp_core_count));
+ int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count);
+
+ std::vector<TrexStream *> per_core_streams(dp_core_count);
+
+ /* for each core - creates its own version of the stream */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStream *dp_stream = stream->clone_as_dp();
+
+ /* fix stream ID */
+ dp_stream->fix_dp_stream_id(new_id, new_next_id);
+
+
+ /* adjust rate and packets count */
+ dp_stream->m_pps = per_core_rate;
+ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts;
+
+ per_core_streams[i] = dp_stream;
+ }
+
+ /* take care of remainder from a burst */
+ int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count);
+ per_core_streams[0]->m_burst_total_pkts += burst_remainder;
+
+ /* attach the compiled stream of every core to its object */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ objs[i]->add_compiled_stream(per_core_streams[i]);
+ }
+
+
+}
+
/**************************************
* streams graph
*************************************/
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index a4c12f8d..e193a749 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -38,9 +38,10 @@ class GraphNodeMap;
*/
class TrexStreamsCompiledObj {
friend class TrexStreamsCompiler;
+
public:
- TrexStreamsCompiledObj(uint8_t port_id, double m_mul);
+ TrexStreamsCompiledObj(uint8_t port_id);
~TrexStreamsCompiledObj();
struct obj_st {
@@ -56,32 +57,22 @@ public:
return (m_port_id);
}
- /**
- * clone the compiled object
- *
- */
- TrexStreamsCompiledObj * clone();
-
- double get_multiplier(){
- return (m_mul);
- }
-
bool get_all_streams_continues(){
return (m_all_continues);
}
void Dump(FILE *fd);
+ TrexStreamsCompiledObj* clone();
+
private:
- void add_compiled_stream(TrexStream * stream,
- uint32_t my_dp_id, int next_dp_id);
- void add_compiled_stream(TrexStream * stream);
+ void add_compiled_stream(TrexStream *stream);
+
std::vector<obj_st> m_objs;
bool m_all_continues;
uint8_t m_port_id;
- double m_mul;
};
class TrexStreamsCompiler {
@@ -93,7 +84,13 @@ public:
* @author imarom (28-Oct-15)
*
*/
- bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL);
+ bool compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count = 1,
+ double factor = 1.0,
+ std::string *fail_msg = NULL);
+
/**
*
@@ -115,8 +112,13 @@ private:
void add_warning(const std::string &warning);
void err(const std::string &err);
+ void compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes);
+
std::vector<std::string> m_warnings;
-
};
class TrexStreamsGraph;
@@ -157,6 +159,7 @@ public:
return m_rate_events;
}
+
private:
void add_rate_event(const rate_event_st &ev) {
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 9b62fabd..22ca922d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -453,7 +453,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
node->m_pause =0;
node->m_stream_type = stream->m_type;
- node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier());
+ node->m_next_time_offset = 1.0 / stream->get_pps();
/* stateless specific fields */
switch ( stream->m_type ) {