summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py25
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py222
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py2
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py12
-rwxr-xr-xsrc/bp_sim.cpp15
-rwxr-xr-xsrc/bp_sim.h4
-rwxr-xr-xsrc/common/pcap.cpp3
-rw-r--r--src/main_dpdk.cpp197
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp4
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp21
-rw-r--r--src/stateless/cp/trex_stateless_port.h17
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp62
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h16
-rw-r--r--src/stateless/dp/trex_stream_node.h78
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp9
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h15
16 files changed, 437 insertions, 265 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index c7d59690..0d95131f 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -335,30 +335,37 @@ class EventsHandler(object):
# private functions
+ # on rare cases events may come on a non existent prot
+ # (server was re-run with different config)
def __async_event_port_job_done (self, port_id):
- self.client.ports[port_id].async_event_port_job_done()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_job_done()
def __async_event_port_stopped (self, port_id):
- self.client.ports[port_id].async_event_port_stopped()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_stopped()
def __async_event_port_started (self, port_id):
- self.client.ports[port_id].async_event_port_started()
-
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_started()
def __async_event_port_paused (self, port_id):
- self.client.ports[port_id].async_event_port_paused()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_paused()
def __async_event_port_resumed (self, port_id):
- self.client.ports[port_id].async_event_port_resumed()
-
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_port_resumed()
def __async_event_port_acquired (self, port_id, who):
- self.client.ports[port_id].async_event_acquired(who)
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_acquired(who)
def __async_event_port_released (self, port_id):
- self.client.ports[port_id].async_event_released()
+ if port_id in self.client.ports:
+ self.client.ports[port_id].async_event_released()
def __async_event_server_stopped (self):
self.client.connected = False
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index 986cb3c6..43fc29e6 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -29,17 +29,20 @@ def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
# describes a single port
class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+ STATE_PCAP_TX = 5
+
PortState = namedtuple('PortState', ['state_id', 'state_name'])
STATES_MAP = {STATE_DOWN: "DOWN",
STATE_IDLE: "IDLE",
STATE_STREAMS: "IDLE",
STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
+ STATE_PAUSE: "PAUSE",
+ STATE_PCAP_TX : "ACTIVE"}
def __init__ (self, port_id, user, comm_link, session_id, info):
@@ -67,6 +70,54 @@ class Port(object):
self.owner = ''
+ # decorator to verify port is up
+ def up(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+ # owned
+ def owned(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ if not port.is_acquired():
+ return port.err("{0} - port is not owned".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+
+ # decorator to check server is readable (port not down and etc.)
+ def writeable(func):
+ def func_wrapper(*args):
+ port = args[0]
+
+ if not port.is_up():
+ return port.err("{0} - port is down".format(func.__name__))
+
+ if not port.is_acquired():
+ return port.err("{0} - port is not owned".format(func.__name__))
+
+ if not port.is_writeable():
+ return port.err("{0} - port is not in a writeable state".format(func.__name__))
+
+ return func(*args)
+
+ return func_wrapper
+
+
+
def err(self, msg):
return RC_ERR("port {0} : {1}\n".format(self.port_id, msg))
@@ -79,7 +130,39 @@ class Port(object):
def get_formatted_speed (self):
return "{0} Gbps".format(self.info['speed'])
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_up (self):
+ return (self.state != self.STATE_DOWN)
+
+ def is_active(self):
+ return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX)
+
+ def is_transmitting (self):
+ return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX)
+
+ def is_paused (self):
+ return (self.state == self.STATE_PAUSE)
+
+ def is_writeable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ def get_owner (self):
+ if self.is_acquired():
+ return self.user
+ else:
+ return self.owner
+
+ def __allocate_stream_id (self):
+ id = self.next_available_id
+ self.next_available_id += 1
+ return id
+
+
# take the port
+ @up
def acquire(self, force = False, sync_streams = True):
params = {"port_id": self.port_id,
"user": self.user,
@@ -99,6 +182,7 @@ class Port(object):
# sync all the streams with the server
+ @up
def sync_streams (self):
params = {"port_id": self.port_id}
@@ -114,6 +198,7 @@ class Port(object):
return self.ok()
# release the port
+ @up
def release(self):
params = {"port_id": self.port_id,
"handler": self.handler}
@@ -129,24 +214,9 @@ class Port(object):
else:
return self.err(rc.err())
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def is_transmitting (self):
- return (self.state == self.STATE_TX)
-
- def is_paused (self):
- return (self.state == self.STATE_PAUSE)
-
- def get_owner (self):
- if self.is_acquired():
- return self.user
- else:
- return self.owner
+
+ @up
def sync(self):
params = {"port_id": self.port_id}
@@ -168,6 +238,8 @@ class Port(object):
self.state = self.STATE_TX
elif port_state == "PAUSE":
self.state = self.STATE_PAUSE
+ elif port_state == "PCAP_TX":
+ self.state = self.STATE_PCAP_TX
else:
raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
@@ -182,27 +254,11 @@ class Port(object):
return self.ok()
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
-
- def __allocate_stream_id (self):
- id = self.next_available_id
- self.next_available_id += 1
- return id
-
# add streams
+ @writeable
def add_streams (self, streams_list):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
# listify
streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
@@ -274,14 +330,9 @@ class Port(object):
# remove stream from port
+ @writeable
def remove_streams (self, stream_id_list):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
# single element to list
stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
@@ -315,14 +366,9 @@ class Port(object):
# remove all the streams
+ @writeable
def remove_all_streams (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -349,19 +395,11 @@ class Port(object):
return self.streams
- # start traffic
+ @writeable
def start (self, mul, duration, force):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
+ return self.err("unable to start traffic - no streams attached to port")
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -379,15 +417,12 @@ class Port(object):
# stop traffic
# with force ignores the cached state and sends the command
+ @owned
def stop (self, force = False):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- # port is already stopped
- if not force:
- if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS):
- return self.ok()
+ # if not is not active and not force - go back
+ if not self.is_active() and not force:
+ return self.ok()
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -421,19 +456,10 @@ class Port(object):
return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms)
-
+ @writeable
def remove_rx_filters (self):
assert(self.has_rx_enabled())
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if self.state == self.STATE_DOWN:
- return self.err("Unable to remove RX filters - port is down")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to remove RX filters - port is transmitting")
-
if self.state == self.STATE_IDLE:
return self.ok()
@@ -447,12 +473,9 @@ class Port(object):
return self.ok()
-
+ @owned
def pause (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -467,12 +490,9 @@ class Port(object):
return self.ok()
-
+ @owned
def resume (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
if (self.state != self.STATE_PAUSE) :
return self.err("port is not in pause mode")
@@ -489,12 +509,9 @@ class Port(object):
return self.ok()
-
+ @owned
def update (self, mul, force):
- if not self.is_acquired():
- return self.err("port is not owned")
-
if (self.state != self.STATE_TX) :
return self.err("port is not transmitting")
@@ -509,15 +526,9 @@ class Port(object):
return self.ok()
-
+ @owned
def validate (self):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
-
if (self.state == self.STATE_IDLE):
return self.err("no streams attached to port")
@@ -533,12 +544,8 @@ class Port(object):
return self.ok()
+ @owned
def set_attr (self, attr_dict):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -553,13 +560,8 @@ class Port(object):
return self.ok()
-
+ @writeable
def push_remote (self, pcap_filename, ipg_usec, speedup, count):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
params = {"handler": self.handler,
"port_id": self.port_id,
@@ -572,7 +574,7 @@ class Port(object):
if rc.bad():
return self.err(rc.err())
- self.state = self.STATE_TX
+ self.state = self.STATE_PCAP_TX
return self.ok()
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
index 6ee587c3..66aeaef4 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
@@ -939,7 +939,7 @@ class STLProfile(object):
Name of the pcap file
ipg_usec : float
- Inter packet gap in usec. If IPG=0, IPG is taken from pcap file
+ Inter packet gap in usec. If IPG is None, IPG is taken from pcap file
speedup : float
When reading the pcap file, divide IPG by this "speedup" factor. Resulting IPG is sped up by this factor.
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
index 5c0dfb14..7e0bf9e4 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py
@@ -150,11 +150,15 @@ def format_text(text, *args):
def format_threshold (value, red_zone, green_zone):
- if value >= red_zone[0] and value <= red_zone[1]:
- return format_text("{0}".format(value), 'red')
+ try:
+ if value >= red_zone[0] and value <= red_zone[1]:
+ return format_text("{0}".format(value), 'red')
- if value >= green_zone[0] and value <= green_zone[1]:
- return format_text("{0}".format(value), 'green')
+ if value >= green_zone[0] and value <= green_zone[1]:
+ return format_text("{0}".format(value), 'green')
+ except TypeError:
+ # if value is not comparable or not a number - skip this
+ pass
return "{0}".format(value)
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 2491d122..18b1053e 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3713,17 +3713,14 @@ int CNodeGenerator::flush_file(dsec_t max_time,
m_p_queue.pop();
CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
- node_pcap->handle(thread);
-
- if (node_pcap->has_next()) {
- node_pcap->next();
- node_pcap->m_time += node_pcap->get_ipg();
- m_p_queue.push(node);
- } else {
- thread->free_node(node);
- thread->m_stateless_dp_info.stop_traffic(node_pcap->get_port_id(), false, 0);
+ /* might have been marked for free */
+ if ( unlikely( node_pcap->is_marked_for_free() ) ) {
+ thread->free_node(node);
+ } else {
+ node_pcap->handle(thread);
}
+
} else {
bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 7399137b..132824b3 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -3591,6 +3591,10 @@ public:
bool set_stateless_next_node( CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
+ void stop_stateless_traffic(uint8_t port_id) {
+ m_stateless_dp_info.stop_traffic(port_id, false, 0);
+ }
+
void Dump(FILE *fd);
void DumpCsv(FILE *fd);
void DumpStats(FILE *fd);
diff --git a/src/common/pcap.cpp b/src/common/pcap.cpp
index 9b360a3e..a68fb620 100755
--- a/src/common/pcap.cpp
+++ b/src/common/pcap.cpp
@@ -156,8 +156,7 @@ bool LibPCapReader::ReadPacket(CCapPktRaw *lpPacket)
}
if (pkt_header.len > READER_MAX_PACKET_SIZE) {
/* cannot read this packet */
- printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
- exit(-1);
+ //printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
return false;
}
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 3c345aa5..5a383bc6 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2671,6 +2671,10 @@ public:
}
int run_in_rx_core();
int run_in_master();
+
+ bool handle_fast_path();
+ bool handle_slow_path(bool &was_stopped);
+
int stop_master();
/* return the minimum number of dp cores needed to support the active ports
this is for c==1 or m_cores_mul==1
@@ -3715,125 +3719,150 @@ CGlobalTRex::publish_async_barrier(uint32_t key) {
m_zmq_publisher.publish_barrier(key);
}
-int CGlobalTRex::run_in_master() {
- bool was_stopped=false;
- if ( get_is_stateless() ) {
- m_trex_stateless->launch_control_plane();
- }
+bool
+CGlobalTRex::handle_slow_path(bool &was_stopped) {
+ m_stats_cnt+=1;
- /* exception and scope safe */
- std::unique_lock<std::mutex> cp_lock(m_cp_lock);
- while ( true ) {
- m_stats_cnt+=1;
-
-
- if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){
- if ( m_io_modes.handle_io_modes() ){
- was_stopped=true;
- break;
- }
- }
-
- if ( sanity_check() ){
- printf(" Test was stopped \n");
+ if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ) {
+ if ( m_io_modes.handle_io_modes() ) {
was_stopped=true;
- break;
+ return false;
}
- if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ }
+
+ if ( sanity_check() ) {
+ printf(" Test was stopped \n");
+ was_stopped=true;
+ return false;
+ }
+ if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ fprintf(stdout,"\033[2J");
+ fprintf(stdout,"\033[2H");
+
+ } else {
+ if ( m_io_modes.m_g_disable_first ) {
+ m_io_modes.m_g_disable_first=false;
fprintf(stdout,"\033[2J");
fprintf(stdout,"\033[2H");
-
- }else{
- if ( m_io_modes.m_g_disable_first ){
- m_io_modes.m_g_disable_first=false;
- fprintf(stdout,"\033[2J");
- fprintf(stdout,"\033[2H");
- printf("clean !!!\n");
- fflush(stdout);
- }
+ printf("clean !!!\n");
+ fflush(stdout);
}
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
- m_io_modes.DumpHelp(stdout);
- }
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
+ m_io_modes.DumpHelp(stdout);
+ }
- dump_stats(stdout,CGlobalStats::dmpTABLE);
+ dump_stats(stdout,CGlobalStats::dmpTABLE);
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
- fprintf (stdout," current time : %.1f sec \n",now_sec());
- float d= CGlobalInfo::m_options.m_duration - now_sec();
- if (d<0) {
- d=0;
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ fprintf (stdout," current time : %.1f sec \n",now_sec());
+ float d= CGlobalInfo::m_options.m_duration - now_sec();
+ if (d<0) {
+ d=0;
- }
- fprintf (stdout," test duration : %.1f sec \n",d);
}
+ fprintf (stdout," test duration : %.1f sec \n",d);
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
- if ( m_stats_cnt%4==0){
- fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
- }
+ if ( m_stats_cnt%4==0) {
+ fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
}
+ }
- if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_mg.update();
+ if ( CGlobalInfo::m_options.is_rx_enabled() ) {
+ m_mg.update();
- if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){
- switch (m_io_modes.m_l_mode) {
- case CTrexGlobalIoMode::lDISABLE:
- fprintf(stdout,"\n+Latency stats disabled \n");
+ if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ switch (m_io_modes.m_l_mode) {
+ case CTrexGlobalIoMode::lDISABLE:
+ fprintf(stdout,"\n+Latency stats disabled \n");
+ break;
+ case CTrexGlobalIoMode::lENABLE:
+ fprintf(stdout,"\n-Latency stats enabled \n");
+ m_mg.DumpShort(stdout);
+ break;
+ case CTrexGlobalIoMode::lENABLE_Extended:
+ fprintf(stdout,"\n-Latency stats extended \n");
+ m_mg.Dump(stdout);
+ break;
+ }
+
+ if ( get_is_rx_check_mode() ) {
+
+ switch (m_io_modes.m_rc_mode) {
+ case CTrexGlobalIoMode::rcDISABLE:
+ fprintf(stdout,"\n+Rx Check stats disabled \n");
break;
- case CTrexGlobalIoMode::lENABLE:
- fprintf(stdout,"\n-Latency stats enabled \n");
- m_mg.DumpShort(stdout);
+ case CTrexGlobalIoMode::rcENABLE:
+ fprintf(stdout,"\n-Rx Check stats enabled \n");
+ m_mg.DumpShortRxCheck(stdout);
break;
- case CTrexGlobalIoMode::lENABLE_Extended:
- fprintf(stdout,"\n-Latency stats extended \n");
- m_mg.Dump(stdout);
+ case CTrexGlobalIoMode::rcENABLE_Extended:
+ fprintf(stdout,"\n-Rx Check stats enhanced \n");
+ m_mg.DumpRxCheck(stdout);
break;
}
- if ( get_is_rx_check_mode() ) {
-
- switch (m_io_modes.m_rc_mode) {
- case CTrexGlobalIoMode::rcDISABLE:
- fprintf(stdout,"\n+Rx Check stats disabled \n");
- break;
- case CTrexGlobalIoMode::rcENABLE:
- fprintf(stdout,"\n-Rx Check stats enabled \n");
- m_mg.DumpShortRxCheck(stdout);
- break;
- case CTrexGlobalIoMode::rcENABLE_Extended:
- fprintf(stdout,"\n-Rx Check stats enhanced \n");
- m_mg.DumpRxCheck(stdout);
- break;
- }
-
- }
-
}
+
}
+ }
+ /* publish data */
+ publish_async_data(false);
+ return true;
+}
- /* publish data */
- publish_async_data(false);
- /* check from messages from DP */
- check_for_dp_messages();
+bool
+CGlobalTRex::handle_fast_path() {
+ /* check from messages from DP */
+ check_for_dp_messages();
- cp_lock.unlock();
- delay(500);
- cp_lock.lock();
+ if ( is_all_cores_finished() ) {
+ return false;
+ }
+
+ return true;
+}
- if ( is_all_cores_finished() ) {
+int CGlobalTRex::run_in_master() {
+ bool was_stopped=false;
+
+ if ( get_is_stateless() ) {
+ m_trex_stateless->launch_control_plane();
+ }
+
+ /* exception and scope safe */
+ std::unique_lock<std::mutex> cp_lock(m_cp_lock);
+
+ uint32_t slow_path_ms = 0;
+
+ while ( true ) {
+
+ if (slow_path_ms >= 500) {
+ if (!handle_slow_path(was_stopped)) {
+ break;
+ }
+ slow_path_ms = 0;
+ }
+
+ if (!handle_fast_path()) {
break;
}
+
+ cp_lock.unlock();
+ delay(20);
+ slow_path_ms += 20;
+ cp_lock.lock();
}
/* on exit release the lock */
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index d48c770e..5fe707af 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -462,14 +462,14 @@ TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
std::string pcap_filename = parse_string(params, "pcap_filename", result);
- double ipg = parse_double(params, "ipg", result);
+ double ipg_usec = parse_double(params, "ipg_usec", result);
double speedup = parse_double(params, "speedup", result);
uint32_t count = parse_uint32(params, "count", result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
- port->push_remote(pcap_filename, ipg, speedup, count);
+ port->push_remote(pcap_filename, ipg_usec, speedup, count);
} catch (const TrexException &ex) {
generate_execute_err(result, ex.what());
}
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index b09393f9..aa2d43f3 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -255,6 +255,13 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
+bool TrexStatelessPort::is_active() const {
+ return ( (m_port_state == PORT_STATE_TX)
+ || (m_port_state == PORT_STATE_PAUSE)
+ || (m_port_state == PORT_STATE_PCAP_TX)
+ );
+}
+
/**
* stop traffic on port
*
@@ -264,9 +271,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
*/
void
TrexStatelessPort::stop_traffic(void) {
-
- if (!( (m_port_state == PORT_STATE_TX)
- || (m_port_state == PORT_STATE_PAUSE) )) {
+ if (!is_active()) {
return;
}
@@ -438,10 +443,13 @@ TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec
}
/* send a message to core */
- change_state(PORT_STATE_TX);
+ change_state(PORT_STATE_PCAP_TX);
TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
m_pending_async_stop_event,
- pcap_filename);
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count);
send_message_to_dp(tx_core, push_msg);
/* update subscribers */
@@ -468,6 +476,9 @@ TrexStatelessPort::get_state_as_string() const {
case PORT_STATE_PAUSE:
return "PAUSE";
+
+ case PORT_STATE_PCAP_TX:
+ return "PCAP_TX";
}
return "UNKNOWN";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 502c066d..ccbfad0d 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -129,11 +129,12 @@ public:
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN = 0x1,
- PORT_STATE_IDLE = 0x2,
- PORT_STATE_STREAMS = 0x4,
- PORT_STATE_TX = 0x8,
- PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_PCAP_TX = 0x20,
};
/**
@@ -226,6 +227,12 @@ public:
}
/**
+ * return true if the port is active
+ * (paused is considered active)
+ */
+ bool is_active() const;
+
+ /**
* port state as string
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 31c907fa..6450d0f9 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -262,7 +262,11 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
-bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
/* push pcap can only happen on an idle port from the core prespective */
assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
@@ -277,7 +281,13 @@ bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_
uint8_t mac_addr[12];
m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
- bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr);
+ bool rc = pcap_node->create(port_id,
+ dir,
+ mac_addr,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count);
if (!rc) {
m_core->free_node((CGenNode *)pcap_node);
return (false);
@@ -287,8 +297,11 @@ bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_
pcap_node->m_time = m_core->m_cur_time_sec;
m_core->m_node_gen.add_node((CGenNode *)pcap_node);
- m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ /* hold a pointer to the node */
+ assert(m_active_pcap_node == NULL);
+ m_active_pcap_node = pcap_node;
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
return (true);
}
@@ -324,6 +337,19 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
}
}
+ /* check for active PCAP node */
+ if (m_active_pcap_node) {
+ /* when got async stop from outside or duration */
+ if (m_active_pcap_node->is_active()) {
+ m_active_pcap_node->mark_for_free();
+ } else {
+ /* graceful stop - node was put out by the scheduler */
+ m_core->free_node( (CGenNode *)m_active_pcap_node);
+ }
+
+ m_active_pcap_node = NULL;
+ }
+
/* active stream should be zero */
assert(m_active_streams==0);
m_active_nodes.clear();
@@ -337,6 +363,7 @@ void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
m_active_streams=0;
m_active_nodes.clear();
+ m_active_pcap_node = NULL;
}
@@ -866,14 +893,19 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
void
-TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) {
+TrexStatelessDpCore::push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
lp_port->set_event_id(event_id);
/* delegate the command to the port */
- bool rc = lp_port->push_pcap(port_id, pcap_filename);
+ bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
if (!rc) {
/* report back that we stopped */
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
@@ -913,7 +945,6 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
-
if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
/* nothing to do ! already stopped */
//printf(" skip .. %f\n",m_core->m_cur_time_sec);
@@ -961,13 +992,30 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
/**
* PCAP node
*/
-bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) {
+bool CGenNodePCAP::create(uint8_t port_id,
+ pkt_dir_t dir,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
std::stringstream ss;
m_type = CGenNode::PCAP_PKT;
m_flags = 0;
m_src_port = 0;
m_port_id = port_id;
+ m_count = count;
+
+ if (ipg_usec != -1) {
+ /* fixed IPG */
+ m_ipg_sec = usec_to_sec(ipg_usec / speedup);
+ m_speedup = 0;
+ } else {
+ /* packet IPG */
+ m_ipg_sec = -1;
+ m_speedup = speedup;
+ }
/* copy MAC addr info */
memcpy(m_mac_addr, mac_addr, 12);
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 01033a7c..115f8873 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -33,7 +33,7 @@ class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
-
+class CGenNodePCAP;
class CDpOneStream {
public:
@@ -70,7 +70,11 @@ public:
bool update_traffic(uint8_t port_id, double factor);
- bool push_pcap(uint8_t port_id, const std::string &pcap_filename);
+ bool push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -97,6 +101,7 @@ public:
uint32_t m_active_streams; /* how many active streams on this port */
std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CGenNodePCAP *m_active_pcap_node;
CFlowGenListPerThread * m_core ;
int m_event_id;
};
@@ -165,7 +170,12 @@ public:
* push a PCAP file on port
*
*/
- void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename);
+ void push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
/**
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 8ccb5286..a970c1f7 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -398,7 +398,14 @@ public:
/**
* creates a node from a PCAP file
*/
- bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr);
+ bool create(uint8_t port_id,
+ pkt_dir_t dir,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+
void destroy();
/**
@@ -407,36 +414,44 @@ public:
* @author imarom (03-May-16)
*/
void next() {
- assert(m_state == PCAP_ACTIVE);
+ assert(is_active());
/* save the previous packet time */
m_last_pkt_time = m_raw_packet->get_time();
/* advance */
if ( m_reader->ReadPacket(m_raw_packet) == false ){
- m_state = PCAP_EOF;
- return;
+ m_count--;
+
+ /* if its the end - go home... */
+ if (m_count == 0) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+
+ /* rewind and load the first packet */
+ m_reader->Rewind();
+ if (!m_reader->ReadPacket(m_raw_packet)) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
}
}
/**
- * return true if the PCAP has next packet
- *
- */
- bool has_next() {
- assert(m_state != PCAP_INVALID);
- return (m_state == PCAP_ACTIVE);
- }
-
- /**
* return the time for the next scheduling for a packet
*
*/
inline double get_ipg() {
assert(m_state != PCAP_INVALID);
- return m_raw_packet->get_time() - m_last_pkt_time;
- //return 0.00001;
+
+ /* fixed IPG */
+ if (m_ipg_sec != -1) {
+ return m_ipg_sec;
+ } else {
+ return ((m_raw_packet->get_time() - m_last_pkt_time) / m_speedup);
+ }
}
/**
@@ -465,6 +480,17 @@ public:
inline void handle(CFlowGenListPerThread *thread) {
assert(m_state != PCAP_INVALID);
thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ // read the next packet
+ next();
+
+ if (is_active()) {
+ m_time += get_ipg();
+ thread->m_node_gen.m_p_queue.push((CGenNode *)this);
+
+ } else {
+ thread->stop_stateless_traffic(get_port_id());
+ }
}
void set_mbuf_dir(pkt_dir_t dir) {
@@ -483,12 +509,25 @@ public:
return m_port_id;
}
+ void mark_for_free() {
+ m_state = PCAP_MARKED_FOR_FREE;
+ }
+
+ bool is_active() {
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ bool is_marked_for_free() {
+ return (m_state == PCAP_MARKED_FOR_FREE);
+ }
+
private:
enum {
PCAP_INVALID = 0,
PCAP_ACTIVE,
- PCAP_EOF
+ PCAP_INACTIVE,
+ PCAP_MARKED_FOR_FREE
};
/* cache line 0 */
@@ -496,11 +535,10 @@ private:
uint8_t m_mac_addr[12];
uint8_t m_state;
- //double m_base_time;
- //double m_current_pkt_time;
double m_last_pkt_time;
-
- void * m_cache_mbuf;
+ double m_speedup;
+ double m_ipg_sec;
+ uint32_t m_count;
double m_next_time_offset; /* in sec */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 59e0a0a8..c0151c76 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -187,13 +187,18 @@ TrexStatelessDpUpdate::clone() {
************************/
bool
TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
- dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename);
+ dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename, m_ipg_usec, m_speedup, m_count);
return true;
}
TrexStatelessCpToDpMsgBase *
TrexStatelessDpPushPCAP::clone() {
- TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename);
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count);
return new_msg;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 8fb2a456..c3de82ee 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -253,9 +253,17 @@ private:
class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
public:
- TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) {
+ TrexStatelessDpPushPCAP(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) : m_pcap_filename(pcap_filename) {
m_port_id = port_id;
m_event_id = event_id;
+ m_ipg_usec = ipg_usec;
+ m_speedup = speedup;
+ m_count = count;
}
virtual bool handle(TrexStatelessDpCore *dp_core);
@@ -264,8 +272,11 @@ public:
private:
std::string m_pcap_filename;
- uint8_t m_port_id;
int m_event_id;
+ double m_ipg_usec;
+ double m_speedup;
+ uint32_t m_count;
+ uint8_t m_port_id;
};