diff options
author | imarom <imarom@cisco.com> | 2016-05-03 14:57:34 +0300 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2016-05-09 16:48:14 +0300 |
commit | 8691f4019dc2123c1aa7413cf3666138756c2f66 (patch) | |
tree | 4b09f137d266471b51a4e5270e8d113806c97c93 /src | |
parent | 64847bb6d182c73f7489a821ea5724687dab1bc1 (diff) |
first remote PCAP push - draft
Diffstat (limited to 'src')
-rwxr-xr-x | src/bp_sim.cpp | 142 | ||||
-rwxr-xr-x | src/bp_sim.h | 11 | ||||
-rwxr-xr-x | src/common/captureFile.cpp | 16 | ||||
-rwxr-xr-x | src/common/captureFile.h | 6 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 24 | ||||
-rw-r--r-- | src/publisher/trex_publisher.h | 1 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 28 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmds.h | 2 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmds_table.cpp | 1 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.cpp | 14 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.h | 13 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 55 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 6 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 120 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 12 | ||||
-rw-r--r-- | src/stateless/dp/trex_stream_node.h | 129 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 20 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 38 |
18 files changed, 549 insertions, 89 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 94f8a2ba..2491d122 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3645,78 +3645,90 @@ int CNodeGenerator::flush_file(dsec_t max_time, uint8_t type=node->m_type; if ( type == CGenNode::STATELESS_PKT ) { - m_p_queue.pop(); - CGenNodeStateless *node_sl = (CGenNodeStateless *)node; + m_p_queue.pop(); + CGenNodeStateless *node_sl = (CGenNodeStateless *)node; - /* if the stream has been deactivated - end */ - if ( unlikely( node_sl->is_mask_for_free() ) ) { - thread->free_node(node); - } else { + /* if the stream has been deactivated - end */ + if ( unlikely( node_sl->is_mask_for_free() ) ) { + thread->free_node(node); + } else { - /* count before handle - node might be destroyed */ - #ifdef TREX_SIM - update_stl_stats(node_sl); - #endif + /* count before handle - node might be destroyed */ + #ifdef TREX_SIM + update_stl_stats(node_sl); + #endif - node_sl->handle(thread); + node_sl->handle(thread); - #ifdef TREX_SIM - if (has_limit_reached()) { - thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0); - } - #endif + #ifdef TREX_SIM + if (has_limit_reached()) { + thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0); + } + #endif - } + } - }else{ - if ( likely( type == CGenNode::FLOW_PKT ) ) { - /* PKT */ - if ( !(node->is_repeat_flow()) || (always==false)) { - flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif + } else if ( likely( type == CGenNode::FLOW_PKT ) ) { + /* PKT */ + if ( !(node->is_repeat_flow()) || (always==false)) { + flush_one_node_to_file(node); + #ifdef _DEBUG + update_stats(node); + #endif + } + m_p_queue.pop(); + if ( node->is_last_in_flow() ) { + if ((node->is_repeat_flow()) && (always==false)) { + /* Flow is repeated, reschedule it */ + thread->reschedule_flow( node); + } else { + /* Flow will not be repeated, so free node */ + thread->free_last_flow_node( node); } - m_p_queue.pop(); - if ( node->is_last_in_flow() ) { - if ((node->is_repeat_flow()) && (always==false)) { - /* Flow is repeated, reschedule it */ - thread->reschedule_flow( node); - }else{ - /* Flow will not be repeated, so free node */ - thread->free_last_flow_node( node); - } - }else{ - node->update_next_pkt_in_flow(); + } else { + node->update_next_pkt_in_flow(); + m_p_queue.push(node); + } + } else if ((type == CGenNode::FLOW_FIF)) { + /* callback to our method */ + m_p_queue.pop(); + if ( always == false) { + thread->m_cur_time_sec = node->m_time ; + + if ( thread->generate_flows_roundrobin(&done) <0){ + break; + } + if (!done) { + node->m_time +=d_time; m_p_queue.push(node); + } else { + thread->free_node(node); } - }else{ - if ((type == CGenNode::FLOW_FIF)) { - /* callback to our method */ - m_p_queue.pop(); - if ( always == false) { - thread->m_cur_time_sec = node->m_time ; - - if ( thread->generate_flows_roundrobin(&done) <0){ - break; - } - if (!done) { - node->m_time +=d_time; - m_p_queue.push(node); - }else{ - thread->free_node(node); - } - }else{ - thread->free_node(node); - } + } else { + thread->free_node(node); + } + + } else if (type == CGenNode::PCAP_PKT) { + m_p_queue.pop(); + + CGenNodePCAP *node_pcap = (CGenNodePCAP *)node; + node_pcap->handle(thread); + + if (node_pcap->has_next()) { + node_pcap->next(); + node_pcap->m_time += node_pcap->get_ipg(); + m_p_queue.push(node); + } else { + thread->free_node(node); + thread->m_stateless_dp_info.stop_traffic(node_pcap->get_port_id(), false, 0); - }else{ - bool exit_sccheduler = handle_slow_messages(type,node,thread,always); - if (exit_sccheduler) { - break; - } - } + } + + } else { + bool exit_sccheduler = handle_slow_messages(type,node,thread,always); + if (exit_sccheduler) { + break; } } } @@ -6212,10 +6224,18 @@ void CGenNodeBase::free_base(){ CGenNodeStateless* p=(CGenNodeStateless*)this; p->free_stl_node(); return; + } + + if (m_type == PCAP_PKT) { + CGenNodePCAP *p = (CGenNodePCAP *)this; + p->destroy(); + return; } + if ( m_type == COMMAND ) { CGenNodeCommand* p=(CGenNodeCommand*)this; p->free_command(); } } + diff --git a/src/bp_sim.h b/src/bp_sim.h index 1ec036c0..7399137b 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -61,6 +61,8 @@ limitations under the License. #include <trex_stateless_dp_core.h> +class CGenNodePCAP; + #undef NAT_TRACE_ #define FORCE_NO_INLINE __attribute__ ((noinline)) @@ -1419,7 +1421,9 @@ public: EXIT_SCHED =6, COMMAND =7, - EXIT_PORT_SCHED =8 + EXIT_PORT_SCHED =8, + + PCAP_PKT =9, }; @@ -3558,10 +3562,14 @@ public : inline CGenNode * create_node(void); + inline CGenNodeStateless * create_node_sl(void){ return ((CGenNodeStateless*)create_node() ); } + inline CGenNodePCAP * allocate_pcap_node(void) { + return ((CGenNodePCAP*)create_node()); + } inline void free_node(CGenNode *p); inline void free_last_flow_node(CGenNode *p); @@ -3583,7 +3591,6 @@ public: bool set_stateless_next_node( CGenNodeStateless * cur_node, CGenNodeStateless * next_node); - void Dump(FILE *fd); void DumpCsv(FILE *fd); void DumpStats(FILE *fd); diff --git a/src/common/captureFile.cpp b/src/common/captureFile.cpp index e73c37ad..4c50bcb2 100755 --- a/src/common/captureFile.cpp +++ b/src/common/captureFile.cpp @@ -244,28 +244,23 @@ bool CErfCmp::compare(std::string f1, std::string f2 ){ return (res); } - - /** * try to create type by type * @param name * * @return CCapReaderBase* */ -CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops) +CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops, std::ostream &err) { - if (name == NULL) { - printf("Got null file name\n"); - return NULL; - } + assert(name); /* make sure we have a file */ FILE * f = CAP_FOPEN_64(name, "rb"); if (f == NULL) { if (errno == ENOENT) { - printf("\nERROR: Cap file not found %s\n\n",name); + err << "CAP file '" << name << "' not found"; } else { - printf("\nERROR: Failed to open cap file '%s' with errno %d\n\n", name, errno); + err << "failed to open CAP file '" << name << "' with errno " << errno; } return NULL; } @@ -281,8 +276,7 @@ CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops) delete next; } - printf("\nERROR: file %s format not supported",name); - printf("\nERROR: formats supported are LIBPCAP and ERF. other formats are deprecated\n\n"); + err << "unsupported CAP format (not PCAP or ERF): " << name << "\n"; return NULL; } diff --git a/src/common/captureFile.h b/src/common/captureFile.h index 3be83432..32b98272 100755 --- a/src/common/captureFile.h +++ b/src/common/captureFile.h @@ -24,6 +24,8 @@ limitations under the License. #include <math.h> #include <stdlib.h> #include <string> +#include <iostream> + #ifdef WIN32 #pragma warning(disable:4786) #endif @@ -201,11 +203,13 @@ public: * @param name - cature file name * @param loops - number of loops for the same capture. use 0 * for one time transmition + * @param err - IO stream to print error + * * @return CCapReaderBase* - pointer to new instance (allocated * by the function). the user should release the * instance once it has no use any more. */ - static CCapReaderBase * CreateReader(char * name, int loops = 0); + static CCapReaderBase * CreateReader(char * name, int loops = 0, std::ostream &err = std::cout); private: diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 6dec3dec..3c345aa5 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -1773,6 +1773,8 @@ protected: class CCoreEthIFStateless : public CCoreEthIF { public: virtual int send_node(CGenNode * node); +protected: + int send_pcap_node(CGenNodePCAP *pcap_node); }; bool CCoreEthIF::Create(uint8_t core_id, @@ -1998,7 +2000,13 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){ -int CCoreEthIFStateless::send_node(CGenNode * no){ +int CCoreEthIFStateless::send_node(CGenNode * no) { + + /* slow path - PCAP nodes */ + if (no->m_type == CGenNode::PCAP_PKT) { + return send_pcap_node((CGenNodePCAP *)no); + } + CGenNodeStateless * node_sl=(CGenNodeStateless *) no; /* check that we have mbuf */ rte_mbuf_t * m=node_sl->get_cache_mbuf(); @@ -2027,6 +2035,20 @@ int CCoreEthIFStateless::send_node(CGenNode * no){ return (0); }; +int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) { + rte_mbuf_t *m = pcap_node->get_pkt(); + if (!m) { + return (-1); + } + + pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir(); + CCorePerPort *lp_port=&m_ports[dir]; + CVirtualIFPerSideStats *lp_stats = &m_stats[dir]; + + send_pkt(lp_port, m, lp_stats); + + return (0); +} int CCoreEthIF::send_node(CGenNode * node){ diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index f8843758..1d283478 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -48,6 +48,7 @@ public: EVENT_PORT_FINISHED_TX = 4, EVENT_PORT_ACQUIRED = 5, EVENT_PORT_RELEASED = 6, + EVENT_PORT_ERROR = 7, EVENT_SERVER_STOPPED = 100, diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index f7a23188..d48c770e 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -451,3 +451,31 @@ TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } + + +/** + * push a remote PCAP on a port + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdPushRemote::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_port(params, result); + std::string pcap_filename = parse_string(params, "pcap_filename", result); + double ipg = parse_double(params, "ipg", result); + double speedup = parse_double(params, "speedup", result); + uint32_t count = parse_uint32(params, "count", result); + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->push_remote(pcap_filename, ipg, speedup, count); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); + +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 428bdd7b..99c83545 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -130,5 +130,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 5, true, APIClass::API_CLASS_TYPE_CORE); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 924503f2..7104792e 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -65,6 +65,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdRemoveRXFilters()); register_command(new TrexRpcCmdValidate()); + register_command(new TrexRpcCmdPushRemote()); } diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp index 1321a362..fc96e00a 100644 --- a/src/stateless/cp/trex_dp_port_events.cpp +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -78,6 +78,9 @@ protected: virtual void on_event() { /* do nothing */ } + virtual void on_error(int thread_id) { + /* do nothing */ + } }; void @@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() { * */ void -TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) { +TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) { TrexDpPortEvent *event = lookup(event_id); /* event might have been deleted */ if (!event) { return; } - bool done = event->on_core_reporting_in(thread_id); + bool done = event->on_core_reporting_in(thread_id, status); if (done) { destroy_event(event_id); @@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) { } bool -TrexDpPortEvent::on_core_reporting_in(int thread_id) { +TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) { /* mark sure no double signal */ if (m_signal.at(thread_id)) { std::stringstream err; @@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) { m_signal.at(thread_id) = true; m_pending_cnt--; + /* if any core reported an error - mark as a failure */ + if (!status) { + on_error(thread_id); + } + /* event occured */ if (m_pending_cnt == 0) { on_event(); diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h index 3b8c8633..681e47ab 100644 --- a/src/stateless/cp/trex_dp_port_events.h +++ b/src/stateless/cp/trex_dp_port_events.h @@ -48,13 +48,22 @@ protected: */ virtual void on_event() = 0; + /** + * when a thread ID encounter an error + * + * @author imarom (20-Apr-16) + * + * @param thread_id + */ + virtual void on_error(int thread_id) = 0; + TrexStatelessPort *get_port() { return m_port; } private: void init(TrexStatelessPort *port, int event_id, int timeout_ms); - bool on_core_reporting_in(int thread_id); + bool on_core_reporting_in(int thread_id, bool status = true); std::unordered_map<int, bool> m_signal; int m_pending_cnt; @@ -98,7 +107,7 @@ public: /** * a core has reached the event */ - void on_core_reporting_in(int event_id, int thread_id); + void on_core_reporting_in(int event_id, int thread_id, bool status = true); private: TrexDpPortEvent *lookup(int event_id); diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 90142d9b..b09393f9 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -24,6 +24,7 @@ limitations under the License. #include <trex_stateless_messaging.h> #include <trex_streams_compiler.h> #include <common/basic_utils.h> +#include <common/captureFile.h> #include <string> @@ -70,6 +71,20 @@ protected: assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID); get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; } + + /** + * when a DP core encountered an error + * + * @author imarom (20-Apr-16) + */ + virtual void on_error(int thread_id) { + Json::Value data; + + data["port_id"] = get_port()->get_port_id(); + data["thread_id"] = thread_id; + + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data); + } }; /*************************** @@ -395,6 +410,46 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) { } +void +TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count) { + /* command allowed only on state stream */ + verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS); + + /* check that file exists */ + CCapReaderBase *reader; + std::stringstream ss; + reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss); + if (!reader) { + throw TrexException(ss.str()); + } + delete reader; + + /* only one core gets to play */ + int tx_core = m_cores_id_list[0]; + + /* create async event */ + assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID); + m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent()); + + /* mark all other cores as done */ + for (int index = 1; index < m_cores_id_list.size(); index++) { + /* mimic an end event */ + m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]); + } + + /* send a message to core */ + change_state(PORT_STATE_TX); + TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id, + m_pending_async_stop_event, + pcap_filename); + send_message_to_dp(tx_core, push_msg); + + /* update subscribers */ + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data); +} + std::string TrexStatelessPort::get_state_as_string() const { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 520940d8..502c066d 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -212,6 +212,12 @@ public: void update_traffic(const TrexPortMultiplier &mul, bool force); /** + * push a PCAP file onto the port + * + */ + void push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count); + + /** * get the port state * */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index d3d49a34..31c907fa 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -262,6 +262,36 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ return (true); } +bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){ + + /* push pcap can only happen on an idle port from the core prespective */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE); + + CGenNodePCAP *pcap_node = m_core->allocate_pcap_node(); + if (!pcap_node) { + return (false); + } + + pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id); + + uint8_t mac_addr[12]; + m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr); + + bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr); + if (!rc) { + m_core->free_node((CGenNode *)pcap_node); + return (false); + } + + /* schedule the node for now */ + pcap_node->m_time = m_core->m_cur_time_sec; + m_core->m_node_gen.add_node((CGenNode *)pcap_node); + + m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + + return (true); +} + bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, bool stop_on_id, @@ -305,7 +335,6 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ m_core=core; m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; - m_port_id=0; m_active_streams=0; m_active_nodes.clear(); } @@ -579,6 +608,7 @@ void TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, TrexStream * stream, TrexStreamsCompiledObj *comp) { + CGenNodeStateless *node = m_core->create_node_sl(); /* add periodic */ @@ -834,6 +864,37 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){ lp_port->pause_traffic(port_id); } + +void +TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) { + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->set_event_id(event_id); + + /* delegate the command to the port */ + bool rc = lp_port->push_pcap(port_id, pcap_filename); + if (!rc) { + /* report back that we stopped */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + event_id, + false); + ring->Enqueue((CGenNode *)event_msg); + return; + } + + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + #if 0 + if ( duration > 0.0 ){ + add_port_duration(duration, port_id, event_id); + } + #endif +} + + void TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) { @@ -895,3 +956,60 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) { event_id); ring->Enqueue((CGenNode *)event_msg); } + + +/** + * PCAP node + */ +bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) { + std::stringstream ss; + + m_type = CGenNode::PCAP_PKT; + m_flags = 0; + m_src_port = 0; + m_port_id = port_id; + + /* copy MAC addr info */ + memcpy(m_mac_addr, mac_addr, 12); + + /* set the dir */ + set_mbuf_dir(dir); + + /* create the PCAP reader */ + m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss); + if (!m_reader) { + return false; + } + + m_raw_packet = new CCapPktRaw(); + if ( m_reader->ReadPacket(m_raw_packet) == false ){ + /* handle error */ + delete m_reader; + return (false); + } + + /* this is the reference time */ + //m_base_time = m_raw_packet->get_time(); + m_last_pkt_time = m_raw_packet->get_time(); + + /* ready */ + m_state = PCAP_ACTIVE; + + return true; +} + +void CGenNodePCAP::destroy() { + + if (m_raw_packet) { + delete m_raw_packet; + m_raw_packet = NULL; + } + + if (m_reader) { + delete m_reader; + m_reader = NULL; + } + + m_state = PCAP_INVALID; +} + diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index cb102b8d..01033a7c 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -70,6 +70,8 @@ public: bool update_traffic(uint8_t port_id, double factor); + bool push_pcap(uint8_t port_id, const std::string &pcap_filename); + bool stop_traffic(uint8_t port_id, bool stop_on_id, int event_id); @@ -91,7 +93,6 @@ public: public: state_e m_state; - uint8_t m_port_id; uint32_t m_active_streams; /* how many active streams on this port */ @@ -149,7 +150,7 @@ public: */ void start_traffic(TrexStreamsCompiledObj *obj, double duration, - int m_event_id); + int event_id); /* pause the streams, work only if all are continues */ @@ -161,6 +162,13 @@ public: /** + * push a PCAP file on port + * + */ + void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename); + + + /** * update current traffic rate * * @author imarom (25-Nov-15) diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index c85bf8b5..8ccb5286 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -26,6 +26,8 @@ limitations under the License. #include <stdio.h> class TrexStatelessDpCore; +class TrexStatelessDpPerPort; + #include <trex_stream.h> class TrexStatelessCpToDpMsgBase; @@ -387,6 +389,133 @@ private: static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" ); +/* this is a event for PCAP transmitting */ +struct CGenNodePCAP : public CGenNodeBase { +friend class TrexStatelessDpPerPort; + +public: + + /** + * creates a node from a PCAP file + */ + bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr); + void destroy(); + + /** + * advance - will read the next packet + * + * @author imarom (03-May-16) + */ + void next() { + assert(m_state == PCAP_ACTIVE); + + /* save the previous packet time */ + m_last_pkt_time = m_raw_packet->get_time(); + + /* advance */ + if ( m_reader->ReadPacket(m_raw_packet) == false ){ + m_state = PCAP_EOF; + return; + } + + } + + /** + * return true if the PCAP has next packet + * + */ + bool has_next() { + assert(m_state != PCAP_INVALID); + return (m_state == PCAP_ACTIVE); + } + + /** + * return the time for the next scheduling for a packet + * + */ + inline double get_ipg() { + assert(m_state != PCAP_INVALID); + return m_raw_packet->get_time() - m_last_pkt_time; + //return 0.00001; + } + + /** + * get the current packet as MBUF + * + */ + inline rte_mbuf_t *get_pkt() { + assert(m_state != PCAP_INVALID); + + rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen()); + assert(m); + + char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen()); + assert(p); + /* copy the packet */ + memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen()); + + /* fix the MAC */ + memcpy(p, m_mac_addr, 12); + + return (m); + } + + + inline void handle(CFlowGenListPerThread *thread) { + assert(m_state != PCAP_INVALID); + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + } + + void set_mbuf_dir(pkt_dir_t dir) { + if (dir) { + m_flags |=NODE_FLAGS_DIR; + }else{ + m_flags &=~NODE_FLAGS_DIR; + } + } + + inline pkt_dir_t get_mbuf_dir(){ + return ((pkt_dir_t)( m_flags &1)); + } + + uint8_t get_port_id() { + return m_port_id; + } + +private: + + enum { + PCAP_INVALID = 0, + PCAP_ACTIVE, + PCAP_EOF + }; + + /* cache line 0 */ + /* important stuff here */ + uint8_t m_mac_addr[12]; + uint8_t m_state; + + //double m_base_time; + //double m_current_pkt_time; + double m_last_pkt_time; + + void * m_cache_mbuf; + + double m_next_time_offset; /* in sec */ + + CCapReaderBase *m_reader; + CCapPktRaw *m_raw_packet; + + uint8_t m_port_id; + + /* pad to match the size of CGenNode */ + uint8_t m_pad_end[25]; + +} __rte_cache_aligned; + + +static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" ); #endif /* __TREX_STREAM_NODE_H__ */ + diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 7edf0f13..59e0a0a8 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -181,6 +181,24 @@ TrexStatelessDpUpdate::clone() { return new_msg; } + +/************************* + push PCAP message + ************************/ +bool +TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) { + dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename); + return true; +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpPushPCAP::clone() { + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename); + + return new_msg; +} + + /************************* barrier message ************************/ @@ -203,7 +221,7 @@ TrexStatelessDpBarrier::clone() { bool TrexDpPortEventMsg::handle() { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); - port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id); + port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status()); return (true); } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 0eed01bd..8fb2a456 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -246,6 +246,29 @@ private: double m_factor; }; + +/** + * psuh a PCAP message + */ +class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) { + m_port_id = port_id; + m_event_id = event_id; + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); + +private: + std::string m_pcap_filename; + uint8_t m_port_id; + int m_event_id; +}; + + /** * barrier message for DP core * @@ -267,6 +290,7 @@ private: int m_event_id; }; + /************************* messages from DP to CP **********************/ /** @@ -303,10 +327,11 @@ public: class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { public: - TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) { - m_thread_id = thread_id; - m_port_id = port_id; - m_event_id = event_id; + TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) { + m_thread_id = thread_id; + m_port_id = port_id; + m_event_id = event_id; + m_status = status; } virtual bool handle(); @@ -323,10 +348,15 @@ public: return m_event_id; } + bool get_status() { + return m_status; + } + private: int m_thread_id; uint8_t m_port_id; int m_event_id; + bool m_status; }; |