diff options
author | 2016-05-08 10:57:57 +0300 | |
---|---|---|
committer | 2016-05-09 16:48:15 +0300 | |
commit | 75ce59e5652f9094beab854d263a850cfc81a3de (patch) | |
tree | 10e7216b433aaae1888850a59f48e9c1df84af65 | |
parent | 8691f4019dc2123c1aa7413cf3666138756c2f66 (diff) |
PCAP refinement
-rwxr-xr-x | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py | 25 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py | 222 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py | 2 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py | 12 | ||||
-rwxr-xr-x | src/bp_sim.cpp | 15 | ||||
-rwxr-xr-x | src/bp_sim.h | 4 | ||||
-rwxr-xr-x | src/common/pcap.cpp | 3 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 197 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 4 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 21 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 17 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 62 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 16 | ||||
-rw-r--r-- | src/stateless/dp/trex_stream_node.h | 78 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 9 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 15 |
16 files changed, 437 insertions, 265 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index c7d59690..0d95131f 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -335,30 +335,37 @@ class EventsHandler(object): # private functions + # on rare cases events may come on a non existent prot + # (server was re-run with different config) def __async_event_port_job_done (self, port_id): - self.client.ports[port_id].async_event_port_job_done() + if port_id in self.client.ports: + self.client.ports[port_id].async_event_port_job_done() def __async_event_port_stopped (self, port_id): - self.client.ports[port_id].async_event_port_stopped() + if port_id in self.client.ports: + self.client.ports[port_id].async_event_port_stopped() def __async_event_port_started (self, port_id): - self.client.ports[port_id].async_event_port_started() - + if port_id in self.client.ports: + self.client.ports[port_id].async_event_port_started() def __async_event_port_paused (self, port_id): - self.client.ports[port_id].async_event_port_paused() + if port_id in self.client.ports: + self.client.ports[port_id].async_event_port_paused() def __async_event_port_resumed (self, port_id): - self.client.ports[port_id].async_event_port_resumed() - + if port_id in self.client.ports: + self.client.ports[port_id].async_event_port_resumed() def __async_event_port_acquired (self, port_id, who): - self.client.ports[port_id].async_event_acquired(who) + if port_id in self.client.ports: + self.client.ports[port_id].async_event_acquired(who) def __async_event_port_released (self, port_id): - self.client.ports[port_id].async_event_released() + if port_id in self.client.ports: + self.client.ports[port_id].async_event_released() def __async_event_server_stopped (self): self.client.connected = False diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 986cb3c6..43fc29e6 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -29,17 +29,20 @@ def mult_to_factor (mult, max_bps_l2, max_pps, line_util): # describes a single port class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + STATE_PCAP_TX = 5 + PortState = namedtuple('PortState', ['state_id', 'state_name']) STATES_MAP = {STATE_DOWN: "DOWN", STATE_IDLE: "IDLE", STATE_STREAMS: "IDLE", STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} + STATE_PAUSE: "PAUSE", + STATE_PCAP_TX : "ACTIVE"} def __init__ (self, port_id, user, comm_link, session_id, info): @@ -67,6 +70,54 @@ class Port(object): self.owner = '' + # decorator to verify port is up + def up(func): + def func_wrapper(*args): + port = args[0] + + if not port.is_up(): + return port.err("{0} - port is down".format(func.__name__)) + + return func(*args) + + return func_wrapper + + # owned + def owned(func): + def func_wrapper(*args): + port = args[0] + + if not port.is_up(): + return port.err("{0} - port is down".format(func.__name__)) + + if not port.is_acquired(): + return port.err("{0} - port is not owned".format(func.__name__)) + + return func(*args) + + return func_wrapper + + + # decorator to check server is readable (port not down and etc.) + def writeable(func): + def func_wrapper(*args): + port = args[0] + + if not port.is_up(): + return port.err("{0} - port is down".format(func.__name__)) + + if not port.is_acquired(): + return port.err("{0} - port is not owned".format(func.__name__)) + + if not port.is_writeable(): + return port.err("{0} - port is not in a writeable state".format(func.__name__)) + + return func(*args) + + return func_wrapper + + + def err(self, msg): return RC_ERR("port {0} : {1}\n".format(self.port_id, msg)) @@ -79,7 +130,39 @@ class Port(object): def get_formatted_speed (self): return "{0} Gbps".format(self.info['speed']) + def is_acquired(self): + return (self.handler != None) + + def is_up (self): + return (self.state != self.STATE_DOWN) + + def is_active(self): + return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) or (self.state == self.STATE_PCAP_TX) + + def is_transmitting (self): + return (self.state == self.STATE_TX) or (self.state == self.STATE_PCAP_TX) + + def is_paused (self): + return (self.state == self.STATE_PAUSE) + + def is_writeable (self): + # operations on port can be done on state idle or state streams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + def get_owner (self): + if self.is_acquired(): + return self.user + else: + return self.owner + + def __allocate_stream_id (self): + id = self.next_available_id + self.next_available_id += 1 + return id + + # take the port + @up def acquire(self, force = False, sync_streams = True): params = {"port_id": self.port_id, "user": self.user, @@ -99,6 +182,7 @@ class Port(object): # sync all the streams with the server + @up def sync_streams (self): params = {"port_id": self.port_id} @@ -114,6 +198,7 @@ class Port(object): return self.ok() # release the port + @up def release(self): params = {"port_id": self.port_id, "handler": self.handler} @@ -129,24 +214,9 @@ class Port(object): else: return self.err(rc.err()) - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def is_transmitting (self): - return (self.state == self.STATE_TX) - - def is_paused (self): - return (self.state == self.STATE_PAUSE) - - def get_owner (self): - if self.is_acquired(): - return self.user - else: - return self.owner + + @up def sync(self): params = {"port_id": self.port_id} @@ -168,6 +238,8 @@ class Port(object): self.state = self.STATE_TX elif port_state == "PAUSE": self.state = self.STATE_PAUSE + elif port_state == "PCAP_TX": + self.state = self.STATE_PCAP_TX else: raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) @@ -182,27 +254,11 @@ class Port(object): return self.ok() - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - - def __allocate_stream_id (self): - id = self.next_available_id - self.next_available_id += 1 - return id - # add streams + @writeable def add_streams (self, streams_list): - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - # listify streams_list = streams_list if isinstance(streams_list, list) else [streams_list] @@ -274,14 +330,9 @@ class Port(object): # remove stream from port + @writeable def remove_streams (self, stream_id_list): - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to remove streams") - # single element to list stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list] @@ -315,14 +366,9 @@ class Port(object): # remove all the streams + @writeable def remove_all_streams (self): - if not self.is_acquired(): - return self.err("port is not owned") - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to remove streams") - params = {"handler": self.handler, "port_id": self.port_id} @@ -349,19 +395,11 @@ class Port(object): return self.streams - # start traffic + @writeable def start (self, mul, duration, force): - if not self.is_acquired(): - return self.err("port is not owned") - - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") + return self.err("unable to start traffic - no streams attached to port") params = {"handler": self.handler, "port_id": self.port_id, @@ -379,15 +417,12 @@ class Port(object): # stop traffic # with force ignores the cached state and sends the command + @owned def stop (self, force = False): - if not self.is_acquired(): - return self.err("port is not owned") - - # port is already stopped - if not force: - if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS): - return self.ok() + # if not is not active and not force - go back + if not self.is_active() and not force: + return self.ok() params = {"handler": self.handler, "port_id": self.port_id} @@ -421,19 +456,10 @@ class Port(object): return not self.tx_stopped_ts or (datetime.now() - self.tx_stopped_ts) > timedelta(milliseconds = rx_delay_ms) - + @writeable def remove_rx_filters (self): assert(self.has_rx_enabled()) - if not self.is_acquired(): - return self.err("port is not owned") - - if self.state == self.STATE_DOWN: - return self.err("Unable to remove RX filters - port is down") - - if self.state == self.STATE_TX: - return self.err("Unable to remove RX filters - port is transmitting") - if self.state == self.STATE_IDLE: return self.ok() @@ -447,12 +473,9 @@ class Port(object): return self.ok() - + @owned def pause (self): - if not self.is_acquired(): - return self.err("port is not owned") - if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -467,12 +490,9 @@ class Port(object): return self.ok() - + @owned def resume (self): - if not self.is_acquired(): - return self.err("port is not owned") - if (self.state != self.STATE_PAUSE) : return self.err("port is not in pause mode") @@ -489,12 +509,9 @@ class Port(object): return self.ok() - + @owned def update (self, mul, force): - if not self.is_acquired(): - return self.err("port is not owned") - if (self.state != self.STATE_TX) : return self.err("port is not transmitting") @@ -509,15 +526,9 @@ class Port(object): return self.ok() - + @owned def validate (self): - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state == self.STATE_DOWN): - return self.err("port is down") - if (self.state == self.STATE_IDLE): return self.err("no streams attached to port") @@ -533,12 +544,8 @@ class Port(object): return self.ok() + @owned def set_attr (self, attr_dict): - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state == self.STATE_DOWN): - return self.err("port is down") params = {"handler": self.handler, "port_id": self.port_id, @@ -553,13 +560,8 @@ class Port(object): return self.ok() - + @writeable def push_remote (self, pcap_filename, ipg_usec, speedup, count): - if not self.is_acquired(): - return self.err("port is not owned") - - if (self.state == self.STATE_DOWN): - return self.err("port is down") params = {"handler": self.handler, "port_id": self.port_id, @@ -572,7 +574,7 @@ class Port(object): if rc.bad(): return self.err(rc.err()) - self.state = self.STATE_TX + self.state = self.STATE_PCAP_TX return self.ok() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py index 6ee587c3..66aeaef4 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py @@ -939,7 +939,7 @@ class STLProfile(object): Name of the pcap file ipg_usec : float - Inter packet gap in usec. If IPG=0, IPG is taken from pcap file + Inter packet gap in usec. If IPG is None, IPG is taken from pcap file speedup : float When reading the pcap file, divide IPG by this "speedup" factor. Resulting IPG is sped up by this factor. diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py index 5c0dfb14..7e0bf9e4 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py @@ -150,11 +150,15 @@ def format_text(text, *args): def format_threshold (value, red_zone, green_zone): - if value >= red_zone[0] and value <= red_zone[1]: - return format_text("{0}".format(value), 'red') + try: + if value >= red_zone[0] and value <= red_zone[1]: + return format_text("{0}".format(value), 'red') - if value >= green_zone[0] and value <= green_zone[1]: - return format_text("{0}".format(value), 'green') + if value >= green_zone[0] and value <= green_zone[1]: + return format_text("{0}".format(value), 'green') + except TypeError: + # if value is not comparable or not a number - skip this + pass return "{0}".format(value) diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 2491d122..18b1053e 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3713,17 +3713,14 @@ int CNodeGenerator::flush_file(dsec_t max_time, m_p_queue.pop(); CGenNodePCAP *node_pcap = (CGenNodePCAP *)node; - node_pcap->handle(thread); - - if (node_pcap->has_next()) { - node_pcap->next(); - node_pcap->m_time += node_pcap->get_ipg(); - m_p_queue.push(node); - } else { - thread->free_node(node); - thread->m_stateless_dp_info.stop_traffic(node_pcap->get_port_id(), false, 0); + /* might have been marked for free */ + if ( unlikely( node_pcap->is_marked_for_free() ) ) { + thread->free_node(node); + } else { + node_pcap->handle(thread); } + } else { bool exit_sccheduler = handle_slow_messages(type,node,thread,always); diff --git a/src/bp_sim.h b/src/bp_sim.h index 7399137b..132824b3 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -3591,6 +3591,10 @@ public: bool set_stateless_next_node( CGenNodeStateless * cur_node, CGenNodeStateless * next_node); + void stop_stateless_traffic(uint8_t port_id) { + m_stateless_dp_info.stop_traffic(port_id, false, 0); + } + void Dump(FILE *fd); void DumpCsv(FILE *fd); void DumpStats(FILE *fd); diff --git a/src/common/pcap.cpp b/src/common/pcap.cpp index 9b360a3e..a68fb620 100755 --- a/src/common/pcap.cpp +++ b/src/common/pcap.cpp @@ -156,8 +156,7 @@ bool LibPCapReader::ReadPacket(CCapPktRaw *lpPacket) } if (pkt_header.len > READER_MAX_PACKET_SIZE) { /* cannot read this packet */ - printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE); - exit(-1); + //printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE); return false; } diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 3c345aa5..5a383bc6 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2671,6 +2671,10 @@ public: } int run_in_rx_core(); int run_in_master(); + + bool handle_fast_path(); + bool handle_slow_path(bool &was_stopped); + int stop_master(); /* return the minimum number of dp cores needed to support the active ports this is for c==1 or m_cores_mul==1 @@ -3715,125 +3719,150 @@ CGlobalTRex::publish_async_barrier(uint32_t key) { m_zmq_publisher.publish_barrier(key); } -int CGlobalTRex::run_in_master() { - bool was_stopped=false; - if ( get_is_stateless() ) { - m_trex_stateless->launch_control_plane(); - } +bool +CGlobalTRex::handle_slow_path(bool &was_stopped) { + m_stats_cnt+=1; - /* exception and scope safe */ - std::unique_lock<std::mutex> cp_lock(m_cp_lock); - while ( true ) { - m_stats_cnt+=1; - - - if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){ - if ( m_io_modes.handle_io_modes() ){ - was_stopped=true; - break; - } - } - - if ( sanity_check() ){ - printf(" Test was stopped \n"); + if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ) { + if ( m_io_modes.handle_io_modes() ) { was_stopped=true; - break; + return false; } - if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) { + } + + if ( sanity_check() ) { + printf(" Test was stopped \n"); + was_stopped=true; + return false; + } + if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) { + fprintf(stdout,"\033[2J"); + fprintf(stdout,"\033[2H"); + + } else { + if ( m_io_modes.m_g_disable_first ) { + m_io_modes.m_g_disable_first=false; fprintf(stdout,"\033[2J"); fprintf(stdout,"\033[2H"); - - }else{ - if ( m_io_modes.m_g_disable_first ){ - m_io_modes.m_g_disable_first=false; - fprintf(stdout,"\033[2J"); - fprintf(stdout,"\033[2H"); - printf("clean !!!\n"); - fflush(stdout); - } + printf("clean !!!\n"); + fflush(stdout); } + } - if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) { - m_io_modes.DumpHelp(stdout); - } + if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) { + m_io_modes.DumpHelp(stdout); + } - dump_stats(stdout,CGlobalStats::dmpTABLE); + dump_stats(stdout,CGlobalStats::dmpTABLE); - if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { - fprintf (stdout," current time : %.1f sec \n",now_sec()); - float d= CGlobalInfo::m_options.m_duration - now_sec(); - if (d<0) { - d=0; + if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { + fprintf (stdout," current time : %.1f sec \n",now_sec()); + float d= CGlobalInfo::m_options.m_duration - now_sec(); + if (d<0) { + d=0; - } - fprintf (stdout," test duration : %.1f sec \n",d); } + fprintf (stdout," test duration : %.1f sec \n",d); + } - if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) { + if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) { - if ( m_stats_cnt%4==0){ - fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str()); - } + if ( m_stats_cnt%4==0) { + fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str()); } + } - if ( CGlobalInfo::m_options.is_rx_enabled() ){ - m_mg.update(); + if ( CGlobalInfo::m_options.is_rx_enabled() ) { + m_mg.update(); - if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){ - switch (m_io_modes.m_l_mode) { - case CTrexGlobalIoMode::lDISABLE: - fprintf(stdout,"\n+Latency stats disabled \n"); + if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { + switch (m_io_modes.m_l_mode) { + case CTrexGlobalIoMode::lDISABLE: + fprintf(stdout,"\n+Latency stats disabled \n"); + break; + case CTrexGlobalIoMode::lENABLE: + fprintf(stdout,"\n-Latency stats enabled \n"); + m_mg.DumpShort(stdout); + break; + case CTrexGlobalIoMode::lENABLE_Extended: + fprintf(stdout,"\n-Latency stats extended \n"); + m_mg.Dump(stdout); + break; + } + + if ( get_is_rx_check_mode() ) { + + switch (m_io_modes.m_rc_mode) { + case CTrexGlobalIoMode::rcDISABLE: + fprintf(stdout,"\n+Rx Check stats disabled \n"); break; - case CTrexGlobalIoMode::lENABLE: - fprintf(stdout,"\n-Latency stats enabled \n"); - m_mg.DumpShort(stdout); + case CTrexGlobalIoMode::rcENABLE: + fprintf(stdout,"\n-Rx Check stats enabled \n"); + m_mg.DumpShortRxCheck(stdout); break; - case CTrexGlobalIoMode::lENABLE_Extended: - fprintf(stdout,"\n-Latency stats extended \n"); - m_mg.Dump(stdout); + case CTrexGlobalIoMode::rcENABLE_Extended: + fprintf(stdout,"\n-Rx Check stats enhanced \n"); + m_mg.DumpRxCheck(stdout); break; } - if ( get_is_rx_check_mode() ) { - - switch (m_io_modes.m_rc_mode) { - case CTrexGlobalIoMode::rcDISABLE: - fprintf(stdout,"\n+Rx Check stats disabled \n"); - break; - case CTrexGlobalIoMode::rcENABLE: - fprintf(stdout,"\n-Rx Check stats enabled \n"); - m_mg.DumpShortRxCheck(stdout); - break; - case CTrexGlobalIoMode::rcENABLE_Extended: - fprintf(stdout,"\n-Rx Check stats enhanced \n"); - m_mg.DumpRxCheck(stdout); - break; - } - - } - } + } + } + /* publish data */ + publish_async_data(false); + return true; +} - /* publish data */ - publish_async_data(false); - /* check from messages from DP */ - check_for_dp_messages(); +bool +CGlobalTRex::handle_fast_path() { + /* check from messages from DP */ + check_for_dp_messages(); - cp_lock.unlock(); - delay(500); - cp_lock.lock(); + if ( is_all_cores_finished() ) { + return false; + } + + return true; +} - if ( is_all_cores_finished() ) { +int CGlobalTRex::run_in_master() { + bool was_stopped=false; + + if ( get_is_stateless() ) { + m_trex_stateless->launch_control_plane(); + } + + /* exception and scope safe */ + std::unique_lock<std::mutex> cp_lock(m_cp_lock); + + uint32_t slow_path_ms = 0; + + while ( true ) { + + if (slow_path_ms >= 500) { + if (!handle_slow_path(was_stopped)) { + break; + } + slow_path_ms = 0; + } + + if (!handle_fast_path()) { break; } + + cp_lock.unlock(); + delay(20); + slow_path_ms += 20; + cp_lock.lock(); } /* on exit release the lock */ diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index d48c770e..5fe707af 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -462,14 +462,14 @@ TrexRpcCmdPushRemote::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); std::string pcap_filename = parse_string(params, "pcap_filename", result); - double ipg = parse_double(params, "ipg", result); + double ipg_usec = parse_double(params, "ipg_usec", result); double speedup = parse_double(params, "speedup", result); uint32_t count = parse_uint32(params, "count", result); TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); try { - port->push_remote(pcap_filename, ipg, speedup, count); + port->push_remote(pcap_filename, ipg_usec, speedup, count); } catch (const TrexException &ex) { generate_execute_err(result, ex.what()); } diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index b09393f9..aa2d43f3 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -255,6 +255,13 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, } +bool TrexStatelessPort::is_active() const { + return ( (m_port_state == PORT_STATE_TX) + || (m_port_state == PORT_STATE_PAUSE) + || (m_port_state == PORT_STATE_PCAP_TX) + ); +} + /** * stop traffic on port * @@ -264,9 +271,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, */ void TrexStatelessPort::stop_traffic(void) { - - if (!( (m_port_state == PORT_STATE_TX) - || (m_port_state == PORT_STATE_PAUSE) )) { + if (!is_active()) { return; } @@ -438,10 +443,13 @@ TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec } /* send a message to core */ - change_state(PORT_STATE_TX); + change_state(PORT_STATE_PCAP_TX); TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id, m_pending_async_stop_event, - pcap_filename); + pcap_filename, + ipg_usec, + speedup, + count); send_message_to_dp(tx_core, push_msg); /* update subscribers */ @@ -468,6 +476,9 @@ TrexStatelessPort::get_state_as_string() const { case PORT_STATE_PAUSE: return "PAUSE"; + + case PORT_STATE_PCAP_TX: + return "PCAP_TX"; } return "UNKNOWN"; diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 502c066d..ccbfad0d 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -129,11 +129,12 @@ public: * port state */ enum port_state_e { - PORT_STATE_DOWN = 0x1, - PORT_STATE_IDLE = 0x2, - PORT_STATE_STREAMS = 0x4, - PORT_STATE_TX = 0x8, - PORT_STATE_PAUSE = 0x10, + PORT_STATE_DOWN = 0x1, + PORT_STATE_IDLE = 0x2, + PORT_STATE_STREAMS = 0x4, + PORT_STATE_TX = 0x8, + PORT_STATE_PAUSE = 0x10, + PORT_STATE_PCAP_TX = 0x20, }; /** @@ -226,6 +227,12 @@ public: } /** + * return true if the port is active + * (paused is considered active) + */ + bool is_active() const; + + /** * port state as string * */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 31c907fa..6450d0f9 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -262,7 +262,11 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ return (true); } -bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){ +bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count) { /* push pcap can only happen on an idle port from the core prespective */ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE); @@ -277,7 +281,13 @@ bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_ uint8_t mac_addr[12]; m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr); - bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr); + bool rc = pcap_node->create(port_id, + dir, + mac_addr, + pcap_filename, + ipg_usec, + speedup, + count); if (!rc) { m_core->free_node((CGenNode *)pcap_node); return (false); @@ -287,8 +297,11 @@ bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_ pcap_node->m_time = m_core->m_cur_time_sec; m_core->m_node_gen.add_node((CGenNode *)pcap_node); - m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + /* hold a pointer to the node */ + assert(m_active_pcap_node == NULL); + m_active_pcap_node = pcap_node; + m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; return (true); } @@ -324,6 +337,19 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, } } + /* check for active PCAP node */ + if (m_active_pcap_node) { + /* when got async stop from outside or duration */ + if (m_active_pcap_node->is_active()) { + m_active_pcap_node->mark_for_free(); + } else { + /* graceful stop - node was put out by the scheduler */ + m_core->free_node( (CGenNode *)m_active_pcap_node); + } + + m_active_pcap_node = NULL; + } + /* active stream should be zero */ assert(m_active_streams==0); m_active_nodes.clear(); @@ -337,6 +363,7 @@ void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; m_active_streams=0; m_active_nodes.clear(); + m_active_pcap_node = NULL; } @@ -866,14 +893,19 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){ void -TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) { +TrexStatelessDpCore::push_pcap(uint8_t port_id, + int event_id, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count) { TrexStatelessDpPerPort * lp_port = get_port_db(port_id); lp_port->set_event_id(event_id); /* delegate the command to the port */ - bool rc = lp_port->push_pcap(port_id, pcap_filename); + bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count); if (!rc) { /* report back that we stopped */ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); @@ -913,7 +945,6 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id, the scheduler invokes it, it will be free */ TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){ /* nothing to do ! already stopped */ //printf(" skip .. %f\n",m_core->m_cur_time_sec); @@ -961,13 +992,30 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) { /** * PCAP node */ -bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) { +bool CGenNodePCAP::create(uint8_t port_id, + pkt_dir_t dir, + const uint8_t *mac_addr, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count) { std::stringstream ss; m_type = CGenNode::PCAP_PKT; m_flags = 0; m_src_port = 0; m_port_id = port_id; + m_count = count; + + if (ipg_usec != -1) { + /* fixed IPG */ + m_ipg_sec = usec_to_sec(ipg_usec / speedup); + m_speedup = 0; + } else { + /* packet IPG */ + m_ipg_sec = -1; + m_speedup = speedup; + } /* copy MAC addr info */ memcpy(m_mac_addr, mac_addr, 12); diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 01033a7c..115f8873 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -33,7 +33,7 @@ class CFlowGenListPerThread; class CGenNodeStateless; class TrexStreamsCompiledObj; class TrexStream; - +class CGenNodePCAP; class CDpOneStream { public: @@ -70,7 +70,11 @@ public: bool update_traffic(uint8_t port_id, double factor); - bool push_pcap(uint8_t port_id, const std::string &pcap_filename); + bool push_pcap(uint8_t port_id, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count); bool stop_traffic(uint8_t port_id, bool stop_on_id, @@ -97,6 +101,7 @@ public: uint32_t m_active_streams; /* how many active streams on this port */ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */ + CGenNodePCAP *m_active_pcap_node; CFlowGenListPerThread * m_core ; int m_event_id; }; @@ -165,7 +170,12 @@ public: * push a PCAP file on port * */ - void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename); + void push_pcap(uint8_t port_id, + int event_id, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count); /** diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index 8ccb5286..a970c1f7 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -398,7 +398,14 @@ public: /** * creates a node from a PCAP file */ - bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr); + bool create(uint8_t port_id, + pkt_dir_t dir, + const uint8_t *mac_addr, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count); + void destroy(); /** @@ -407,36 +414,44 @@ public: * @author imarom (03-May-16) */ void next() { - assert(m_state == PCAP_ACTIVE); + assert(is_active()); /* save the previous packet time */ m_last_pkt_time = m_raw_packet->get_time(); /* advance */ if ( m_reader->ReadPacket(m_raw_packet) == false ){ - m_state = PCAP_EOF; - return; + m_count--; + + /* if its the end - go home... */ + if (m_count == 0) { + m_state = PCAP_INACTIVE; + return; + } + + /* rewind and load the first packet */ + m_reader->Rewind(); + if (!m_reader->ReadPacket(m_raw_packet)) { + m_state = PCAP_INACTIVE; + return; + } } } /** - * return true if the PCAP has next packet - * - */ - bool has_next() { - assert(m_state != PCAP_INVALID); - return (m_state == PCAP_ACTIVE); - } - - /** * return the time for the next scheduling for a packet * */ inline double get_ipg() { assert(m_state != PCAP_INVALID); - return m_raw_packet->get_time() - m_last_pkt_time; - //return 0.00001; + + /* fixed IPG */ + if (m_ipg_sec != -1) { + return m_ipg_sec; + } else { + return ((m_raw_packet->get_time() - m_last_pkt_time) / m_speedup); + } } /** @@ -465,6 +480,17 @@ public: inline void handle(CFlowGenListPerThread *thread) { assert(m_state != PCAP_INVALID); thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + + // read the next packet + next(); + + if (is_active()) { + m_time += get_ipg(); + thread->m_node_gen.m_p_queue.push((CGenNode *)this); + + } else { + thread->stop_stateless_traffic(get_port_id()); + } } void set_mbuf_dir(pkt_dir_t dir) { @@ -483,12 +509,25 @@ public: return m_port_id; } + void mark_for_free() { + m_state = PCAP_MARKED_FOR_FREE; + } + + bool is_active() { + return (m_state == PCAP_ACTIVE); + } + + bool is_marked_for_free() { + return (m_state == PCAP_MARKED_FOR_FREE); + } + private: enum { PCAP_INVALID = 0, PCAP_ACTIVE, - PCAP_EOF + PCAP_INACTIVE, + PCAP_MARKED_FOR_FREE }; /* cache line 0 */ @@ -496,11 +535,10 @@ private: uint8_t m_mac_addr[12]; uint8_t m_state; - //double m_base_time; - //double m_current_pkt_time; double m_last_pkt_time; - - void * m_cache_mbuf; + double m_speedup; + double m_ipg_sec; + uint32_t m_count; double m_next_time_offset; /* in sec */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 59e0a0a8..c0151c76 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -187,13 +187,18 @@ TrexStatelessDpUpdate::clone() { ************************/ bool TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) { - dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename); + dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename, m_ipg_usec, m_speedup, m_count); return true; } TrexStatelessCpToDpMsgBase * TrexStatelessDpPushPCAP::clone() { - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename); + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, + m_event_id, + m_pcap_filename, + m_ipg_usec, + m_speedup, + m_count); return new_msg; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 8fb2a456..c3de82ee 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -253,9 +253,17 @@ private: class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) { + TrexStatelessDpPushPCAP(uint8_t port_id, + int event_id, + const std::string &pcap_filename, + double ipg_usec, + double speedup, + uint32_t count) : m_pcap_filename(pcap_filename) { m_port_id = port_id; m_event_id = event_id; + m_ipg_usec = ipg_usec; + m_speedup = speedup; + m_count = count; } virtual bool handle(TrexStatelessDpCore *dp_core); @@ -264,8 +272,11 @@ public: private: std::string m_pcap_filename; - uint8_t m_port_id; int m_event_id; + double m_ipg_usec; + double m_speedup; + uint32_t m_count; + uint8_t m_port_id; }; |