From ac2e93d4247b2db94cd07301b274336bb08dec46 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 11 Jan 2017 18:19:47 +0200 Subject: capture - draft commit Signed-off-by: imarom --- src/stateless/cp/trex_stateless.cpp | 31 +--- src/stateless/cp/trex_stateless.h | 12 +- src/stateless/cp/trex_stateless_port.h | 15 +- .../messaging/trex_stateless_messaging.cpp | 46 ++++- src/stateless/messaging/trex_stateless_messaging.h | 54 ++++-- src/stateless/rx/trex_stateless_capture.cpp | 142 ++++++++++++-- src/stateless/rx/trex_stateless_capture.h | 205 +++++++++++++++++++-- src/stateless/rx/trex_stateless_rx_core.cpp | 14 -- src/stateless/rx/trex_stateless_rx_core.h | 10 +- 9 files changed, 413 insertions(+), 116 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 32babbf7..6ab9b417 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -19,7 +19,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//#include #include #include @@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -capture_id_t -TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { - static MsgReply reply; - - reply.reset(); - - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); - - ring->Enqueue((CGenNode *)msg); - - capture_id_t new_id = reply.wait_for_reply(); - - return (new_id); -} +void +TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const { -capture_id_t -TrexStateless::stop_capture(capture_id_t capture_id) { - static MsgReply reply; - - reply.reset(); - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); - ring->Enqueue((CGenNode *)msg); - - capture_id_t rc = reply.wait_for_reply(); - - return (rc); } + diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 33f16ce9..87d227f6 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,7 +102,6 @@ public: * defines the TRex stateless operation mode * */ -class CaptureFilter; class TrexStateless { public: @@ -133,16 +132,9 @@ public: /** - * starts a capture on a 'filter' of ports - * with a limit of packets + * send a message to the RX core */ - capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); - - /** - * stops an active capture - * - */ - capture_id_t stop_capture(capture_id_t capture_id); + void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const; /** * shutdown the server diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 2cc1b9ca..0ef8ae60 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -140,7 +140,7 @@ public: } if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { - throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); } m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); @@ -439,19 +439,6 @@ public: void get_pci_info(std::string &pci_addr, int &numa_node); - - /** - * starts capturing packets - * - */ - void start_capture(capture_mode_e mode, uint64_t limit); - - /** - * stops capturing packets - * - */ - void stop_capture(); - /** * start RX queueing of packets * diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index f441c692..b9bb1d1c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool -TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); +TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStart start_rc; + + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + + /* mark as done */ + m_reply.set_reply(start_rc); + + return true; +} + +bool +TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStop stop_rc; + + TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc); + + /* mark as done */ + m_reply.set_reply(stop_rc); + + return true; +} +bool +TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCFetch fetch_rc; + + TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc); + /* mark as done */ - m_reply.set_reply(capture_id); + m_reply.set_reply(fetch_rc); return true; } bool -TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t rc = rx_core->stop_capture(m_capture_id); +TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStatus status_rc; - m_reply.set_reply(rc); + status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + + /* mark as done */ + m_reply.set_reply(status_rc); return true; } + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 5f4978f5..4027d075 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; +class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase { +public: + virtual bool handle (CRxCoreStateless *rx_core) = 0; +}; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: - TrexStatelessRxStartCapture(const CaptureFilter& filter, + TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, - MsgReply &reply) : m_reply(reply) { + MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; @@ -499,24 +503,52 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - uint64_t m_limit; - CaptureFilter m_filter; - MsgReply &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; }; -class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture { public: - TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply &reply) : m_reply(reply) { m_capture_id = capture_id; + m_pkt_limit = pkt_limit; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + uint32_t m_pkt_limit; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStatus(MsgReply &reply) : m_reply(reply) { } virtual bool handle(CRxCoreStateless *rx_core); private: - capture_id_t m_capture_id; - MsgReply &m_reply; + MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 4ed126cc..85be7aef 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_id = id; m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; + m_state = STATE_ACTIVE; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() { void TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + delete pkt; + return; + } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { + delete pkt; return; } @@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { void TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + if (!m_filter.in_rx(port)) { return; } @@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + + } + + return output; +} + +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* harder part - partial fetch */ + TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); + for (int i = 0; i < pkt_limit; i++) { + const TrexPkt *pkt = m_pkt_buffer->pop(); + partial->push(pkt); + } + + pending = m_pkt_buffer->get_element_count(); + return partial; +} + void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; @@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } -capture_id_t -TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { + if (m_captures.size() > MAX_CAPTURE_SIZE) { - return CAPTURE_TOO_MANY_CAPTURES; + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; } @@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { /* update global filter */ update_global_filter(); - return new_id; + /* result */ + rc.set_new_id(new_id); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_count(capture->get_pkt_count()); } -capture_id_t -TrexStatelessCaptureMngr::remove(capture_id_t id) { +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + if (capture->is_active()) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); + return; + } + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_pkt_buffer(pkt_buffer, pending); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + int index = -1; for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == id) { + if (m_captures[i]->get_id() == capture_id) { index = i; break; } @@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - return CAPTURE_ID_NOT_FOUND; + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); + /* free memory */ delete capture; /* update global filter */ update_global_filter(); - - return id; } void TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + while (m_captures.size() > 0) { - remove(m_captures[0]->get_id()); + remove(m_captures[0]->get_id(), dummy); } } @@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) } } +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4d0b6a78..6cd25a94 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -22,7 +22,146 @@ limitations under the License. #define __TREX_STATELESS_CAPTURE_H__ #include +#include + #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_msg.h" + +typedef int64_t capture_id_t; + +class TrexCaptureRC { +public: + + TrexCaptureRC() { + m_rc = RC_INVALID; + m_pkt_buffer = NULL; + } + + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + Json::Value get_json() const { + return m_json_rc; + } + +public: + rc_e m_rc; + capture_id_t m_capture_id; + TrexPktBuffer *m_pkt_buffer; + Json::Value m_json_rc; +}; + +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_new_id(capture_id_t new_id) { + m_capture_id = new_id; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + return m_capture_id; + } + +private: + capture_id_t m_capture_id; +}; + + +class TrexCaptureRCStop : public TrexCaptureRC { +public: + void set_count(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + TrexCaptureRCFetch() { + m_pkt_buffer = nullptr; + m_pending = 0; + } + + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + return m_pkt_buffer; + } + + uint32_t get_pending() const { + return m_pending; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; +}; + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_ok() { + m_rc = RC_OK; + } +}; + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_status(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + return m_json; + } + +private: + Json::Value m_json; +}; /** * capture filter @@ -82,20 +221,27 @@ public: return *this; } + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + private: uint64_t m_tx_active; uint64_t m_rx_active; }; -typedef int64_t capture_id_t; -enum { - CAPTURE_ID_NOT_FOUND = -1, - CAPTURE_TOO_MANY_CAPTURES = -2, -}; class TrexStatelessCapture { public: + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); @@ -112,7 +258,24 @@ public: return m_filter; } + Json::Value to_json() const; + + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + private: + state_e m_state; TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; uint64_t m_id; @@ -134,18 +297,28 @@ public: } /** - * adds a capture buffer - * returns ID + * starts a new capture */ - capture_id_t add(uint64_t limit, const CaptureFilter &filter); + void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); - /** - * stops capture mode - * on success, will return the ID of the removed one - * o.w it will be an error + * stops an existing capture + * */ - capture_id_t remove(capture_id_t id); + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + /** * removes all captures @@ -153,6 +326,7 @@ public: */ void reset(); + /** * return true if any filter is active * @@ -182,6 +356,8 @@ public: handle_pkt_rx_slow_path(m, port); } + Json::Value to_json() const; + private: TrexStatelessCaptureMngr() { @@ -189,6 +365,9 @@ private: m_id_counter = 1; } + + TrexStatelessCapture * lookup(capture_id_t capture_id); + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); void update_global_filter(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f1ba303a..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -270,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() { } -capture_id_t -CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { - return TrexStatelessCaptureMngr::getInstance().add(limit, filter); -} - -capture_id_t -CRxCoreStateless::stop_capture(capture_id_t capture_id) { - return TrexStatelessCaptureMngr::getInstance().remove(capture_id); -} - void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 21ed51ba..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -131,14 +131,7 @@ class CRxCoreStateless { const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing packets - * - */ - capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); - capture_id_t stop_capture(capture_id_t capture_id); - + /** * start RX queueing of packets * @@ -175,7 +168,6 @@ class CRxCoreStateless { void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); void handle_grat_arp(); -- cgit 1.2.3-korg