path: root/src/stateless/dp
diff options
Diffstat (limited to 'src/stateless/dp')
2 files changed, 215 insertions, 100 deletions
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 3755b82c..73387f0e 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -18,118 +18,148 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
#include <trex_stateless_dp_core.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <trex_stateless.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
#include <bp_sim.h>
+ * extended info for the stateless node
+ * TODO:
+ * static_assert(sizeof(dp_node_extended_info_st) <= sizeof(CGenNodeStateless::m_pad_end), "hello");
+ */
+typedef struct dp_node_extended_info_ {
+ double next_time_offset;
+ uint8_t is_stream_active;
-// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
-// DPDK c++ issue
+} dp_node_extended_info_st;
-#include <rte_ethdev.h>
-#include "mbuf.h"
+TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) {
+ m_thread_id = thread_id;
+ m_core = core;
- * TEST
- *
- */
-static const uint8_t udp_pkt[]={
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x08,0x00,
- 0x45,0x00,0x00,0x81,
- 0xaf,0x7e,0x00,0x00,
- 0x12,0x11,0xd9,0x23,
- 0x01,0x01,0x01,0x01,
- 0x3d,0xad,0x72,0x1b,
- 0x11,0x11,
- 0x11,0x11,
- 0x00,0x6d,
- 0x00,0x00,
- 0x64,0x31,0x3a,0x61,
- 0x64,0x32,0x3a,0x69,0x64,
- 0x32,0x30,0x3a,0xd0,0x0e,
- 0xa1,0x4b,0x7b,0xbd,0xbd,
- 0x16,0xc6,0xdb,0xc4,0xbb,0x43,
- 0xf9,0x4b,0x51,0x68,0x33,0x72,
- 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f,
- 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3,
- 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f,
- 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39,
- 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31,
- 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d,
- 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d,
- 0xe7
-static int
-test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) {
- rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ;
- #else
- rte_mempool_t * mp = NULL;
- #endif
- rte_mbuf_t *m = rte_pktmbuf_alloc(mp);
- if ( unlikely(m==0) ) {
- printf("ERROR no packets \n");
- return (-1);
+ CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
+ m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
+ m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
+ m_state = STATE_IDLE;
+TrexStatelessDpCore::start() {
+ /* creates a maintenace job using the scheduler */
+ CGenNode * node_sync = m_core->create_node() ;
+ node_sync->m_type = CGenNode::FLOW_SYNC;
+ node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
+ m_core->m_node_gen.add_node(node_sync);
+ double old_offset = 0.0;
+ m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset);
+TrexStatelessDpCore::handle_pkt_event(CGenNode *node) {
+ //TODO: optimize the fast path here...
+ CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node_sl->get_opaque_storage();
+ /* is this stream active ? */
+ if (!opaque->is_stream_active) {
+ m_core->free_node(node);
+ return;
+ m_core->m_node_gen.m_v_if->send_node(node);
+ /* in case of continues */
+ node->m_time += opaque->next_time_offset;
+ /* insert a new event */
+ m_core->m_node_gen.m_p_queue.push(node);
+TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) {
+ CGenNodeStateless *node = m_core->create_node_sl();
+ /* add periodic */
+ node->m_type = CGenNode::STATELESS_PKT;
+ node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */;
+ node->m_flags = 0;
+ /* set socket id */
+ node->set_socket_id(m_core->m_node_gen.m_socket_id);
+ /* build a mbuf from a packet */
+ uint16_t pkt_size = pkt_len;
+ const uint8_t *stream_pkt = pkt;
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
+ opaque->next_time_offset = 1.0 / pps;
+ opaque->is_stream_active = 1;
+ /* allocate const mbuf */
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
+ assert(m);
char *p = rte_pktmbuf_append(m, pkt_size);
- /* set pkt data */
- memcpy(p,pkt,pkt_size);
+ /* copy the packet */
+ memcpy(p,stream_pkt,pkt_size);
- rte_mbuf_t *tx_pkts[32];
- tx_pkts[0] = m;
- uint8_t nb_pkts = 1;
- uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts);
- (void)ret;
- rte_pktmbuf_free(m);
+ /* set dir 0 or 1 client or server */
+ pkt_dir_t dir = 0;
+ node->set_mbuf_cache_dir(dir);
- return (0);
+ /* TBD repace the mac if req we should add flag */
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
-static int
- return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt)));
+ /* set the packet as a readonly */
+ node->set_cache_mbuf(m);
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+ /* keep track */
+ m_active_nodes.push_back(node);
+ /* schedule */
+ m_core->m_node_gen.add_node((CGenNode *)node);
-TrexStatelessDpCore::test_inject_dummy_pkt() {
- test_inject_udp_pkt();
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
+ for (auto single_stream : obj->get_objects()) {
+ add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len);
+ }
- * DP core
- *
- **************************/
-TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) {
+TrexStatelessDpCore::stop_traffic() {
+ /* we cannot remove nodes not from the top of the queue so
+ for every active node - make sure next time
+ the scheduler invokes it, it will be free */
+ for (auto node : m_active_nodes) {
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
+ opaque->is_stream_active = 0;
+ }
+ m_active_nodes.clear();
+ m_state = STATE_IDLE;
- * main function for DP core
+ * handle a message from CP to DP
-TrexStatelessDpCore::run() {
- printf("\nOn DP core %d\n", m_core_id);
- while (true) {
- test_inject_dummy_pkt();
- rte_pause();
- }
+TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
+ msg->handle(this);
+ delete msg;
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 4b09b752..d95f7eeb 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -21,23 +21,108 @@ limitations under the License.
-#include <stdint.h>
+#include <vector>
+#include <msg_manager.h>
+#include <pal_utl.h>
+class TrexStatelessCpToDpMsgBase;
+class TrexStatelessDpStart;
+class CFlowGenListPerThread;
+class CGenNode;
+class TrexStreamsCompiledObj;
+class CGenNodeStateless;
- * stateless DP core object
- *
- */
class TrexStatelessDpCore {
- TrexStatelessDpCore(uint8_t core_id);
+ /* states */
+ enum state_e {
+ };
+ TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core);
+ /**
+ * launch the stateless DP core code
+ *
+ */
+ void start();
+ /**
+ * handle pkt event
+ *
+ * @author imarom (27-Oct-15)
+ */
+ void handle_pkt_event(CGenNode *node);
+ /**
+ * dummy traffic creator
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param pkt
+ * @param pkt_len
+ */
+ void start_traffic(TrexStreamsCompiledObj *obj);
+ /**
+ * stop all traffic for this core
+ *
+ */
+ void stop_traffic();
+ /**
+ * check for and handle messages from CP
+ *
+ * @author imarom (27-Oct-15)
+ */
+ void periodic_check_for_cp_messages() {
+ // doing this inline for performance reasons
- /* starts the DP core run */
- void run();
+ /* fast path */
+ if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+ return;
+ }
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (m_ring_from_cp->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+ TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node;
+ handle_cp_msg(msg);
+ }
+ }
- void test_inject_dummy_pkt();
- uint8_t m_core_id;
+ /**
+ * handles a CP to DP message
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param msg
+ */
+ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
+ void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len);
+ uint8_t m_thread_id;
+ state_e m_state;
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+ /* holds the current active nodes */
+ std::vector<CGenNodeStateless *> m_active_nodes;
+ /* pointer to the main object */
+ CFlowGenListPerThread *m_core;
#endif /* __TREX_STATELESS_DP_CORE_H__ */