From d0c838e0abd0b39df54547623b2fa87fcc8aa807 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 24 Nov 2016 16:58:34 +0200 Subject: self code review Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 2 +- src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 3 +- src/stateless/cp/trex_stateless_port.cpp | 6 +- src/stateless/cp/trex_stateless_port.h | 4 +- .../messaging/trex_stateless_messaging.cpp | 2 +- src/stateless/messaging/trex_stateless_messaging.h | 6 +- src/stateless/rx/trex_stateless_rx_core.h | 2 +- src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 111 +++++++++++++- src/stateless/rx/trex_stateless_rx_port_mngr.h | 162 +++++++-------------- 9 files changed, 172 insertions(+), 126 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 14b38165..11dd99bd 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -812,7 +812,7 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); try { - const RxPacketBuffer *pkt_buffer = port->get_rx_queue_pkts(); + const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts(); if (pkt_buffer) { result["result"]["pkts"] = pkt_buffer->to_json(); } else { diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index d4e900ac..9a57c5f9 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -670,6 +670,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { generate_parse_err(result, "start message can only specify absolute speed rate"); } + dsec_t ts = now_sec(); TrexPortMultiplier mul(type, op, value); try { @@ -680,7 +681,7 @@ TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { } result["result"]["multiplier"] = port->get_multiplier(); - result["result"]["ts"] = now_sec(); + result["result"]["ts"] = ts; return (TREX_RPC_CMD_OK); } diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index d4bc5c36..75530ea3 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -967,7 +967,7 @@ TrexStatelessPort::stop_rx_queue() { } -RxPacketBuffer * +RXPacketBuffer * TrexStatelessPort::get_rx_queue_pkts() { if (m_rx_features_info.m_rx_queue_info.is_empty()) { @@ -975,12 +975,12 @@ TrexStatelessPort::get_rx_queue_pkts() { } /* ask RX core for the pkt queue */ - TrexStatelessMsgReply msg_reply; + TrexStatelessMsgReply msg_reply; TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply); send_message_to_rx(msg); - RxPacketBuffer *pkt_buffer = msg_reply.wait_for_reply(); + RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply(); return pkt_buffer; } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index cf6b2716..4aa7ff36 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -31,7 +31,7 @@ class TrexStatelessCpToDpMsgBase; class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -class RxPacketBuffer; +class RXPacketBuffer; /** @@ -406,7 +406,7 @@ public: * fetch the RX queue packets from the queue * */ - RxPacketBuffer *get_rx_queue_pkts(); + RXPacketBuffer *get_rx_queue_pkts(); /** * return the port attribute object diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index c2182f3c..a8fb7ba9 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -288,7 +288,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { - RxPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); + RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); assert(pkt_buffer); m_reply.set(pkt_buffer); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 52b1662e..ed2ec90e 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -32,7 +32,7 @@ class TrexStatelessDpCore; class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; -class RxPacketBuffer; +class RXPacketBuffer; /** * defines the base class for CP to DP messages @@ -524,7 +524,7 @@ private: class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply &reply) : m_reply(reply) { + TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply &reply) : m_reply(reply) { m_port_id = port_id; } @@ -536,7 +536,7 @@ public: private: uint8_t m_port_id; - TrexStatelessMsgReply &m_reply; + TrexStatelessMsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 519724d8..9df36310 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -111,7 +111,7 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - RxPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 2683dbe1..46fec432 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -23,8 +23,29 @@ #include "common/captureFile.h" #include "trex_stateless_rx_core.h" -/************************** latency feature ************/ -void RXLatency::handle_pkt(const rte_mbuf_t *m) { +/************************************** + * latency RX feature + * + *************************************/ +RXLatency::RXLatency() { + m_rcv_all = false; + m_rfc2544 = NULL; + m_err_cntrs = NULL; + + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + m_rx_pg_stat_payload[i].clear(); + } +} + +void +RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) { + m_rfc2544 = rfc2544; + m_err_cntrs = err_cntrs; +} + +void +RXLatency::handle_pkt(const rte_mbuf_t *m) { CFlowStatParser parser; if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { @@ -135,6 +156,92 @@ RXLatency::reset_stats() { } } +/************************************** + * RX feature queue + * + *************************************/ + +RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) { + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + m_shared_counter = shared_counter; + + /* reset the counter */ + *m_shared_counter = 0; + + /* generate queue */ + m_buffer = new RXPacket*[m_size](); // zeroed + + m_is_enabled = true; +} + +RXPacketBuffer::~RXPacketBuffer() { + assert(m_buffer); + + while (!is_empty()) { + RXPacket *pkt = pop(); + delete pkt; + } + delete [] m_buffer; +} + +RXPacketBuffer * +RXPacketBuffer::freeze_and_clone() { + /* create a new one - same size and shared counter 0 */ + RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter); + + /* freeze the current */ + m_shared_counter = NULL; + m_is_enabled = false; + + return new_buffer; +} + +void +RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) { + assert(m_is_enabled); + + /* if full - pop the oldest */ + if (is_full()) { + delete pop(); + } + + /* push packet */ + m_buffer[m_head] = new RXPacket(m); + m_head = next(m_head); + + /* update the shared counter - control plane memory */ + (*m_shared_counter)++; +} + +RXPacket * +RXPacketBuffer::pop() { + assert(m_is_enabled); + assert(!is_empty()); + + RXPacket *pkt = m_buffer[m_tail]; + m_tail = next(m_tail); + (*m_shared_counter)--; + return pkt; +} + +Json::Value +RXPacketBuffer::to_json() const { + + Json::Value output = Json::arrayValue; + + int tmp = m_tail; + while (tmp != m_head) { + RXPacket *pkt = m_buffer[tmp]; + output.append(pkt->to_json()); + tmp = next(tmp); + } + + return output; +} + /****************************** packet recorder ****************************/ RXPacketRecorder::RXPacketRecorder() { diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index fd023ea9..9df42039 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -27,30 +27,21 @@ #include "common/captureFile.h" -/************************* latency ***********************/ class CPortLatencyHWBase; class CRFC2544Info; class CRxCoreErrCntrs; +/************************************** + * RX feature latency + * + *************************************/ class RXLatency { public: - RXLatency() { - m_rcv_all = false; - m_rfc2544 = NULL; - m_err_cntrs = NULL; + RXLatency(); - for (int i = 0; i < MAX_FLOW_STATS; i++) { - m_rx_pg_stat[i].clear(); - m_rx_pg_stat_payload[i].clear(); - } - } - - void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) { - m_rfc2544 = rfc2544; - m_err_cntrs = err_cntrs; - } + void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs); void reset_stats(); @@ -81,26 +72,24 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/************************ queue ***************************/ - /** * describes a single saved RX packet * */ -class RxPacket { +class RXPacket { public: - RxPacket(const rte_mbuf_t *m) { + RXPacket(const rte_mbuf_t *m) { /* assume single part packet */ assert(m->nb_segs == 1); m_size = m->pkt_len; const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *); - m_raw = (uint8_t *)malloc(m_size); + m_raw = new uint8_t[m_size]; memcpy(m_raw, p, m_size); - /* save the packet timestamp */ + /* generate a packet timestamp */ m_timestamp = now_sec(); } @@ -112,9 +101,9 @@ public: return output; } - ~RxPacket() { + ~RXPacket() { if (m_raw) { - delete m_raw; + delete [] m_raw; } } @@ -125,48 +114,36 @@ private: dsec_t m_timestamp; }; -/** - * a simple cyclic buffer to hold RX packets - * - */ -class RxPacketBuffer { -public: - - RxPacketBuffer(uint64_t size, uint64_t *shared_counter) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - m_shared_counter = shared_counter; - *m_shared_counter = 0; - - m_buffer = new RxPacket*[m_size](); // zeroed - - m_is_enabled = true; - } +/************************************** + * RX feature queue + * + *************************************/ - ~RxPacketBuffer() { - assert(m_buffer); +class RXPacketBuffer { +public: - while (!is_empty()) { - RxPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; - } + RXPacketBuffer(uint64_t size, uint64_t *shared_counter); + ~RXPacketBuffer(); - /* freeze the data structure - no more packets can be pushed / poped */ - RxPacketBuffer * freeze_and_clone() { - /* create a new one */ - RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter); + /** + * handle a new packet + * + */ + void handle_pkt(const rte_mbuf_t *m); + + /** + * freezes the queue and clones a new one + * + */ + RXPacketBuffer * freeze_and_clone(); - /* freeze the current */ - m_shared_counter = NULL; - m_is_enabled = false; + /** + * generate a JSON output of the queue + * + */ + Json::Value to_json() const; - return new_buffer; - } bool is_empty() const { return (m_head == m_tail); @@ -176,72 +153,33 @@ public: return ( next(m_head) == m_tail); } - void handle_pkt(const rte_mbuf_t *m) { - assert(m_is_enabled); - - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - (*m_shared_counter)++; - - m_buffer[m_head] = new RxPacket(m); - m_head = next(m_head); - } - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RxPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; - } - private: int next(int v) const { return ( (v + 1) % m_size ); } /* pop in case of full queue - internal usage */ - RxPacket * pop() { - assert(m_is_enabled); - assert(!is_empty()); - - RxPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - (*m_shared_counter)--; - return pkt; - } + RXPacket * pop(); int m_head; int m_tail; int m_size; - RxPacket **m_buffer; + RXPacket **m_buffer; uint64_t *m_shared_counter; bool m_is_enabled; }; -/************************ recoder ***************************/ -/** - * RX packet recorder to PCAP file +/************************************** + * RX feature PCAP recorder * - */ + *************************************/ + class RXPacketRecorder { public: RXPacketRecorder(); ~RXPacketRecorder(); + void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter); void stop(); void handle_pkt(const rte_mbuf_t *m); @@ -251,7 +189,7 @@ private: CCapPktRaw m_pkt; dsec_t m_epoch; uint64_t m_limit; - uint64_t *m_shared_counter; + uint64_t *m_shared_counter; }; @@ -265,9 +203,9 @@ private: class RXPortManager { public: enum features_t { - LATENCY = 0x1, + LATENCY = 0x1, CAPTURE = 0x2, - QUEUE = 0x4 + QUEUE = 0x4 }; RXPortManager() { @@ -328,7 +266,7 @@ public: if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RxPacketBuffer(size, shared_counter); + m_pkt_buffer = new RXPacketBuffer(size, shared_counter); set_feature(QUEUE); } @@ -340,7 +278,7 @@ public: unset_feature(QUEUE); } - RxPacketBuffer *get_pkt_buffer() { + RXPacketBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return NULL; } @@ -348,7 +286,7 @@ public: assert(m_pkt_buffer); /* hold a pointer to the old one */ - RxPacketBuffer *old_buffer = m_pkt_buffer; + RXPacketBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ m_pkt_buffer = old_buffer->freeze_and_clone(); @@ -407,7 +345,7 @@ private: uint32_t m_features; RXPacketRecorder m_recorder; RXLatency m_latency; - RxPacketBuffer *m_pkt_buffer; + RXPacketBuffer *m_pkt_buffer; CCpuUtlDp *m_cpu_dp_u; CPortLatencyHWBase *m_io; }; -- cgit 1.2.3-korg