summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-05-03 14:57:34 +0300
committerimarom <imarom@cisco.com>2016-05-09 16:48:14 +0300
commit8691f4019dc2123c1aa7413cf3666138756c2f66 (patch)
tree4b09f137d266471b51a4e5270e8d113806c97c93
parent64847bb6d182c73f7489a821ea5724687dab1bc1 (diff)
first remote PCAP push - draft
-rwxr-xr-xscripts/automation/trex_control_plane/stl/console/trex_console.py6
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py81
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py23
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py7
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py6
-rwxr-xr-xsrc/bp_sim.cpp142
-rwxr-xr-xsrc/bp_sim.h11
-rwxr-xr-xsrc/common/captureFile.cpp16
-rwxr-xr-xsrc/common/captureFile.h6
-rw-r--r--src/main_dpdk.cpp24
-rw-r--r--src/publisher/trex_publisher.h1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp28
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h2
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp55
-rw-r--r--src/stateless/cp/trex_stateless_port.h6
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp120
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h12
-rw-r--r--src/stateless/dp/trex_stream_node.h129
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h38
23 files changed, 663 insertions, 98 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py
index f8161dcb..ab70d357 100755
--- a/scripts/automation/trex_control_plane/stl/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py
@@ -319,9 +319,13 @@ class TRexConsole(TRexGeneralCmd):
return self.do_history(line)
def do_push (self, line):
- '''Push a PCAP file\n'''
+ '''Push a local PCAP file\n'''
return self.stateless_client.push_line(line)
+ #def do_push_remote (self, line):
+ # '''Push a remote accessible PCAP file\n'''
+ # return self.stateless_client.push_remote_line(line)
+
def help_push (self):
return self.do_push("-h")
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 862a9979..c7d59690 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -313,6 +313,10 @@ class EventsHandler(object):
if session_id != self.client.session_id:
self.__async_event_port_released(port_id)
+ elif (type == 7):
+ port_id = int(data['port_id'])
+ ev = "port {0} job failed".format(port_id)
+ show_event = True
# server stopped
elif (type == 100):
@@ -711,6 +715,17 @@ class STLClient(object):
return rc
+ def __push_remote (self, pcap_filename, port_id_list, ipg_usec, speedup, count):
+
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].push_remote(pcap_filename, ipg_usec, speedup, count))
+
+ return rc
+
+
def __validate (self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
@@ -1852,6 +1867,49 @@ class STLClient(object):
@__api_check(True)
+ def push_remote (self, pcap_filename, ports = None, ipg_usec = None, speedup = 1.0, count = 1):
+ """
+ Push a remote reachable PCAP file
+ the path must be fullpath accessible to the server
+
+ :parameters:
+ pcap_filename : str
+ PCAP file name in full path and accessible to the server
+
+ ports : list
+ Ports on which to execute the command
+
+ ipg_usec : float
+ Inter-packet gap in microseconds
+
+ speedup : float
+ A factor to adjust IPG. effectively IPG = IPG / speedup
+
+ count: int
+ How many times to transmit the cap
+
+
+ :raises:
+ + :exc:`STLError`
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ validate_type('pcap_filename', pcap_filename, str)
+ validate_type('ipg_usec', ipg_usec, (float, int, type(None)))
+ validate_type('speedup', speedup, float)
+ validate_type('count', count, int)
+
+ self.logger.pre_cmd("Pushing remote pcap on port(s) {0}:".format(ports))
+ rc = self.__push_remote(pcap_filename, ports, ipg_usec, speedup, count)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+ @__api_check(True)
def validate (self, ports = None, mult = "1", duration = "-1", total = False):
"""
Validate port(s) configuration
@@ -2519,6 +2577,7 @@ class STLClient(object):
"push",
self.push_line.__doc__,
parsing_opts.FILE_PATH,
+ parsing_opts.REMOTE_FILE,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.COUNT,
parsing_opts.DURATION,
@@ -2541,15 +2600,23 @@ class STLClient(object):
self.stop(active_ports)
# pcap injection removes all previous streams from the ports
- self.remove_all_streams(ports = opts.ports)
+ if opts.remote:
+ self.push_remote(opts.file[0],
+ ports = opts.ports,
+ ipg_usec = opts.ipg_usec,
+ speedup = opts.speedup,
+ count = opts.count)
+
+ else:
+ self.remove_all_streams(ports = opts.ports)
- profile = STLProfile.load_pcap(opts.file[0],
- opts.ipg_usec,
- opts.speedup,
- opts.count)
+ profile = STLProfile.load_pcap(opts.file[0],
+ opts.ipg_usec,
+ opts.speedup,
+ opts.count)
- id_list = self.add_streams(profile.get_streams(), opts.ports)
- self.start(ports = opts.ports, duration = opts.duration, force = opts.force)
+ id_list = self.add_streams(profile.get_streams(), opts.ports)
+ self.start(ports = opts.ports, duration = opts.duration, force = opts.force)
return True
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index e8f89b27..986cb3c6 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -148,6 +148,7 @@ class Port(object):
return self.owner
def sync(self):
+
params = {"port_id": self.port_id}
rc = self.transmit("get_port_status", params)
@@ -553,6 +554,28 @@ class Port(object):
return self.ok()
+ def push_remote (self, pcap_filename, ipg_usec, speedup, count):
+ if not self.is_acquired():
+ return self.err("port is not owned")
+
+ if (self.state == self.STATE_DOWN):
+ return self.err("port is down")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "pcap_filename": pcap_filename,
+ "ipg_usec": ipg_usec if ipg_usec is not None else -1,
+ "speedup": speedup,
+ "count": count}
+
+ rc = self.transmit("push_remote", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ self.state = self.STATE_TX
+ return self.ok()
+
+
def get_profile (self):
return self.profile
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
index 165942d8..6ee587c3 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
@@ -7,6 +7,7 @@ from .trex_stl_packet_builder_scapy import STLPktBuilder, Ether, IP, UDP, TCP, R
from collections import OrderedDict, namedtuple
from scapy.utils import ltoa
+from scapy.error import Scapy_Exception
import random
import yaml
import base64
@@ -967,7 +968,11 @@ class STLProfile(object):
streams = []
last_ts_usec = 0
- pkts = RawPcapReader(pcap_file).read_all()
+ try:
+ pkts = RawPcapReader(pcap_file).read_all()
+ except Scapy_Exception as e:
+ raise STLError("failed to open PCAP file '{0}'".format(pcap_file))
+
for i, (cap, meta) in enumerate(pkts, start = 1):
# IPG - if not provided, take from cap
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
index ad46625b..98e3ca6a 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
@@ -32,6 +32,7 @@ PROMISCUOUS = 19
NO_PROMISCUOUS = 20
PROMISCUOUS_SWITCH = 21
TUNABLES = 22
+REMOTE_FILE = 23
GLOBAL_STATS = 50
PORT_STATS = 51
@@ -290,6 +291,11 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'default': False,
'help': "Set if you want to stop active ports before appyling command."}),
+ REMOTE_FILE: ArgumentPack(['-r', '--remote'],
+ {"action": "store_true",
+ 'default': False,
+ 'help': "file path should be interpeted by the server (remote file)"}),
+
FILE_PATH: ArgumentPack(['-f'],
{'metavar': 'FILE',
'dest': 'file',
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 94f8a2ba..2491d122 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3645,78 +3645,90 @@ int CNodeGenerator::flush_file(dsec_t max_time,
uint8_t type=node->m_type;
if ( type == CGenNode::STATELESS_PKT ) {
- m_p_queue.pop();
- CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ m_p_queue.pop();
+ CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
- /* if the stream has been deactivated - end */
- if ( unlikely( node_sl->is_mask_for_free() ) ) {
- thread->free_node(node);
- } else {
+ /* if the stream has been deactivated - end */
+ if ( unlikely( node_sl->is_mask_for_free() ) ) {
+ thread->free_node(node);
+ } else {
- /* count before handle - node might be destroyed */
- #ifdef TREX_SIM
- update_stl_stats(node_sl);
- #endif
+ /* count before handle - node might be destroyed */
+ #ifdef TREX_SIM
+ update_stl_stats(node_sl);
+ #endif
- node_sl->handle(thread);
+ node_sl->handle(thread);
- #ifdef TREX_SIM
- if (has_limit_reached()) {
- thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
- }
- #endif
+ #ifdef TREX_SIM
+ if (has_limit_reached()) {
+ thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
+ }
+ #endif
- }
+ }
- }else{
- if ( likely( type == CGenNode::FLOW_PKT ) ) {
- /* PKT */
- if ( !(node->is_repeat_flow()) || (always==false)) {
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
+ } else if ( likely( type == CGenNode::FLOW_PKT ) ) {
+ /* PKT */
+ if ( !(node->is_repeat_flow()) || (always==false)) {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ if ((node->is_repeat_flow()) && (always==false)) {
+ /* Flow is repeated, reschedule it */
+ thread->reschedule_flow( node);
+ } else {
+ /* Flow will not be repeated, so free node */
+ thread->free_last_flow_node( node);
}
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- if ((node->is_repeat_flow()) && (always==false)) {
- /* Flow is repeated, reschedule it */
- thread->reschedule_flow( node);
- }else{
- /* Flow will not be repeated, so free node */
- thread->free_last_flow_node( node);
- }
- }else{
- node->update_next_pkt_in_flow();
+ } else {
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+ } else if ((type == CGenNode::FLOW_FIF)) {
+ /* callback to our method */
+ m_p_queue.pop();
+ if ( always == false) {
+ thread->m_cur_time_sec = node->m_time ;
+
+ if ( thread->generate_flows_roundrobin(&done) <0){
+ break;
+ }
+ if (!done) {
+ node->m_time +=d_time;
m_p_queue.push(node);
+ } else {
+ thread->free_node(node);
}
- }else{
- if ((type == CGenNode::FLOW_FIF)) {
- /* callback to our method */
- m_p_queue.pop();
- if ( always == false) {
- thread->m_cur_time_sec = node->m_time ;
-
- if ( thread->generate_flows_roundrobin(&done) <0){
- break;
- }
- if (!done) {
- node->m_time +=d_time;
- m_p_queue.push(node);
- }else{
- thread->free_node(node);
- }
- }else{
- thread->free_node(node);
- }
+ } else {
+ thread->free_node(node);
+ }
+
+ } else if (type == CGenNode::PCAP_PKT) {
+ m_p_queue.pop();
+
+ CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
+ node_pcap->handle(thread);
+
+ if (node_pcap->has_next()) {
+ node_pcap->next();
+ node_pcap->m_time += node_pcap->get_ipg();
+ m_p_queue.push(node);
+ } else {
+ thread->free_node(node);
+ thread->m_stateless_dp_info.stop_traffic(node_pcap->get_port_id(), false, 0);
- }else{
- bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
- if (exit_sccheduler) {
- break;
- }
- }
+ }
+
+ } else {
+ bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
+ if (exit_sccheduler) {
+ break;
}
}
}
@@ -6212,10 +6224,18 @@ void CGenNodeBase::free_base(){
CGenNodeStateless* p=(CGenNodeStateless*)this;
p->free_stl_node();
return;
+ }
+
+ if (m_type == PCAP_PKT) {
+ CGenNodePCAP *p = (CGenNodePCAP *)this;
+ p->destroy();
+ return;
}
+
if ( m_type == COMMAND ) {
CGenNodeCommand* p=(CGenNodeCommand*)this;
p->free_command();
}
}
+
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 1ec036c0..7399137b 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -61,6 +61,8 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
+class CGenNodePCAP;
+
#undef NAT_TRACE_
#define FORCE_NO_INLINE __attribute__ ((noinline))
@@ -1419,7 +1421,9 @@ public:
EXIT_SCHED =6,
COMMAND =7,
- EXIT_PORT_SCHED =8
+ EXIT_PORT_SCHED =8,
+
+ PCAP_PKT =9,
};
@@ -3558,10 +3562,14 @@ public :
inline CGenNode * create_node(void);
+
inline CGenNodeStateless * create_node_sl(void){
return ((CGenNodeStateless*)create_node() );
}
+ inline CGenNodePCAP * allocate_pcap_node(void) {
+ return ((CGenNodePCAP*)create_node());
+ }
inline void free_node(CGenNode *p);
inline void free_last_flow_node(CGenNode *p);
@@ -3583,7 +3591,6 @@ public:
bool set_stateless_next_node( CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
-
void Dump(FILE *fd);
void DumpCsv(FILE *fd);
void DumpStats(FILE *fd);
diff --git a/src/common/captureFile.cpp b/src/common/captureFile.cpp
index e73c37ad..4c50bcb2 100755
--- a/src/common/captureFile.cpp
+++ b/src/common/captureFile.cpp
@@ -244,28 +244,23 @@ bool CErfCmp::compare(std::string f1, std::string f2 ){
return (res);
}
-
-
/**
* try to create type by type
* @param name
*
* @return CCapReaderBase*
*/
-CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
+CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops, std::ostream &err)
{
- if (name == NULL) {
- printf("Got null file name\n");
- return NULL;
- }
+ assert(name);
/* make sure we have a file */
FILE * f = CAP_FOPEN_64(name, "rb");
if (f == NULL) {
if (errno == ENOENT) {
- printf("\nERROR: Cap file not found %s\n\n",name);
+ err << "CAP file '" << name << "' not found";
} else {
- printf("\nERROR: Failed to open cap file '%s' with errno %d\n\n", name, errno);
+ err << "failed to open CAP file '" << name << "' with errno " << errno;
}
return NULL;
}
@@ -281,8 +276,7 @@ CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
delete next;
}
- printf("\nERROR: file %s format not supported",name);
- printf("\nERROR: formats supported are LIBPCAP and ERF. other formats are deprecated\n\n");
+ err << "unsupported CAP format (not PCAP or ERF): " << name << "\n";
return NULL;
}
diff --git a/src/common/captureFile.h b/src/common/captureFile.h
index 3be83432..32b98272 100755
--- a/src/common/captureFile.h
+++ b/src/common/captureFile.h
@@ -24,6 +24,8 @@ limitations under the License.
#include <math.h>
#include <stdlib.h>
#include <string>
+#include <iostream>
+
#ifdef WIN32
#pragma warning(disable:4786)
#endif
@@ -201,11 +203,13 @@ public:
* @param name - cature file name
* @param loops - number of loops for the same capture. use 0
* for one time transmition
+ * @param err - IO stream to print error
+ *
* @return CCapReaderBase* - pointer to new instance (allocated
* by the function). the user should release the
* instance once it has no use any more.
*/
- static CCapReaderBase * CreateReader(char * name, int loops = 0);
+ static CCapReaderBase * CreateReader(char * name, int loops = 0, std::ostream &err = std::cout);
private:
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 6dec3dec..3c345aa5 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -1773,6 +1773,8 @@ protected:
class CCoreEthIFStateless : public CCoreEthIF {
public:
virtual int send_node(CGenNode * node);
+protected:
+ int send_pcap_node(CGenNodePCAP *pcap_node);
};
bool CCoreEthIF::Create(uint8_t core_id,
@@ -1998,7 +2000,13 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){
-int CCoreEthIFStateless::send_node(CGenNode * no){
+int CCoreEthIFStateless::send_node(CGenNode * no) {
+
+ /* slow path - PCAP nodes */
+ if (no->m_type == CGenNode::PCAP_PKT) {
+ return send_pcap_node((CGenNodePCAP *)no);
+ }
+
CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
/* check that we have mbuf */
rte_mbuf_t * m=node_sl->get_cache_mbuf();
@@ -2027,6 +2035,20 @@ int CCoreEthIFStateless::send_node(CGenNode * no){
return (0);
};
+int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
+ }
+
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ CCorePerPort *lp_port=&m_ports[dir];
+ CVirtualIFPerSideStats *lp_stats = &m_stats[dir];
+
+ send_pkt(lp_port, m, lp_stats);
+
+ return (0);
+}
int CCoreEthIF::send_node(CGenNode * node){
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index f8843758..1d283478 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -48,6 +48,7 @@ public:
EVENT_PORT_FINISHED_TX = 4,
EVENT_PORT_ACQUIRED = 5,
EVENT_PORT_RELEASED = 6,
+ EVENT_PORT_ERROR = 7,
EVENT_SERVER_STOPPED = 100,
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index f7a23188..d48c770e 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -451,3 +451,31 @@ TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+
+
+/**
+ * push a remote PCAP on a port
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ std::string pcap_filename = parse_string(params, "pcap_filename", result);
+ double ipg = parse_double(params, "ipg", result);
+ double speedup = parse_double(params, "speedup", result);
+ uint32_t count = parse_uint32(params, "count", result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->push_remote(pcap_filename, ipg, speedup, count);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 428bdd7b..99c83545 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -130,5 +130,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass
TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 5, true, APIClass::API_CLASS_TYPE_CORE);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 924503f2..7104792e 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -65,6 +65,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdRemoveRXFilters());
register_command(new TrexRpcCmdValidate());
+ register_command(new TrexRpcCmdPushRemote());
}
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..b09393f9 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -395,6 +410,46 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count) {
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..502c066d 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -212,6 +212,12 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count);
+
+ /**
* get the port state
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index d3d49a34..31c907fa 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -262,6 +262,36 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -305,7 +335,6 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
}
@@ -579,6 +608,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
@@ -834,6 +864,37 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+ #if 0
+ if ( duration > 0.0 ){
+ add_port_duration(duration, port_id, event_id);
+ }
+ #endif
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -895,3 +956,60 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index cb102b8d..01033a7c 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -70,6 +70,8 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id, const std::string &pcap_filename);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,7 +93,6 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
@@ -149,7 +150,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -161,6 +162,13 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index c85bf8b5..8ccb5286 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -387,6 +389,133 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr);
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(m_state == PCAP_ACTIVE);
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_state = PCAP_EOF;
+ return;
+ }
+
+ }
+
+ /**
+ * return true if the PCAP has next packet
+ *
+ */
+ bool has_next() {
+ assert(m_state != PCAP_INVALID);
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+ return m_raw_packet->get_time() - m_last_pkt_time;
+ //return 0.00001;
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_EOF
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ //double m_base_time;
+ //double m_current_pkt_time;
+ double m_last_pkt_time;
+
+ void * m_cache_mbuf;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[25];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..59e0a0a8 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,24 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +221,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..8fb2a456 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,29 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ uint8_t m_port_id;
+ int m_event_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +290,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +327,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +348,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};