diff options
author | imarom <imarom@cisco.com> | 2016-11-07 18:47:23 +0200 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2016-11-07 18:47:23 +0200 |
commit | f9a0c5e2e1e1135cb0c0e6e192565e5b100c5a41 (patch) | |
tree | f09e19975f324d9c0eb717608473dcdbd334a608 | |
parent | e85ea75669ea39e4f99519138a3a84e4df6eed2d (diff) |
RX features - queueing
Signed-off-by: imarom <imarom@cisco.com>
-rwxr-xr-x | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py | 121 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py | 91 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 51 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 26 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 26 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 14 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 40 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 10 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 7 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 56 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 2 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 69 |
12 files changed, 403 insertions, 110 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 2d5a6379..583917ea 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -324,26 +324,25 @@ class EventsHandler(object): # port attr changed elif (event_type == 8): - return - # port_id = int(data['port_id']) - # - # if data['attr'] == self.client.ports[port_id].attr: - # return # false alarm - # - # old_info = self.client.ports[port_id].get_formatted_info() - # self.__async_event_port_attr_changed(port_id, data['attr']) - # - # new_info = self.client.ports[port_id].get_formatted_info() - # ev = "port {0} attributes changed".format(port_id) - # for key, old_val in old_info.items(): - # new_val = new_info[key] - # if old_val != new_val: - # ev += '\n {key}: {old} -> {new}'.format( - # key = key, - # old = old_val.lower() if type(old_val) is str else old_val, - # new = new_val.lower() if type(new_val) is str else new_val) - # show_event = True + port_id = int(data['port_id']) + + if data['attr'] == self.client.ports[port_id].attr: + return # false alarm + + old_info = self.client.ports[port_id].get_formatted_info(sync = False) + self.__async_event_port_attr_changed(port_id, data['attr']) + + new_info = self.client.ports[port_id].get_formatted_info(sync = False) + ev = "port {0} attributes changed".format(port_id) + for key, old_val in old_info.items(): + new_val = new_info[key] + if old_val != new_val: + ev += '\n {key}: {old} -> {new}'.format( + key = key, + old = old_val.lower() if type(old_val) is str else old_val, + new = new_val.lower() if type(new_val) is str else new_val) + show_event = True # server stopped @@ -839,6 +838,24 @@ class STLClient(object): return rc + def __set_rx_queue (self, port_id_list, size): + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].set_rx_queue(size)) + + return rc + + def __remove_rx_queue (self, port_id_list): + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_rx_queue()) + + return rc + # connect to server def __connect(self): @@ -1754,6 +1771,7 @@ class STLClient(object): if not rc: raise STLError(rc) + @__api_check(True) def ping(self): """ @@ -1767,6 +1785,11 @@ class STLClient(object): + :exc:`STLError` """ + rc = self.set_port_attr(ports = [0, 1], rx_filter_mode = 'all') + rc = self.set_rx_queue(ports = [0, 1], size = 1000) + if not rc: + raise STLError(rc) + self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'], self.connection_info['sync_port'])) @@ -1777,6 +1800,7 @@ class STLClient(object): if not rc: raise STLError(rc) + @__api_check(True) def server_shutdown (self, force = False): """ @@ -1890,6 +1914,7 @@ class STLClient(object): link_up = True, rx_filter_mode = 'hw') self.remove_rx_sniffer(ports) + self.remove_rx_queue(ports) @@ -2737,7 +2762,8 @@ class STLClient(object): # check arguments validate_type('base_filename', base_filename, basestring) validate_type('limit', limit, (int)) - + if limit <= 0: + raise STLError("'limit' must be a positive value") self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports)) rc = self.__set_rx_sniffer(ports, base_filename, limit) @@ -2750,7 +2776,7 @@ class STLClient(object): @__api_check(True) - def remove_rx_sniffer (self, ports = None, base_filename = 'rx_capture', limit = 1000): + def remove_rx_sniffer (self, ports = None): """ Removes RX sniffer from port(s) @@ -2768,6 +2794,59 @@ class STLClient(object): if not rc: raise STLError(rc) + + @__api_check(True) + def set_rx_queue (self, ports = None, size = 1000): + """ + Sets RX queue for port(s) + The queue is cyclic and will hold last 'size' packets + + :parameters: + ports - for which ports to apply a unique sniffer (each port gets a unique file) + size - size of the queue + :raises: + + :exe:'STLError' + + """ + ports = ports if ports is not None else self.get_acquired_ports() + ports = self._validate_port_list(ports) + + # check arguments + validate_type('size', size, (int)) + if size <= 0: + raise STLError("'size' must be a positive value") + + self.logger.pre_cmd("Setting RX queue on port(s) {0}:".format(ports)) + rc = self.__set_rx_queue(ports, size) + self.logger.post_cmd(rc) + + + if not rc: + raise STLError(rc) + + + + @__api_check(True) + def remove_rx_queue (self, ports = None): + """ + Removes RX queue from port(s) + + :raises: + + :exe:'STLError' + + """ + ports = ports if ports is not None else self.get_acquired_ports() + ports = self._validate_port_list(ports) + + self.logger.pre_cmd("Removing RX queue on port(s) {0}:".format(ports)) + rc = self.__remove_rx_queue(ports) + self.logger.post_cmd(rc) + + if not rc: + raise STLError(rc) + + + def clear_events (self): """ Clear all events diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 4e5778a6..94745d15 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -62,6 +62,7 @@ class Port(object): self.streams = {} self.profile = None self.session_id = session_id + self.status = {} self.attr = {} self.port_stats = trex_stl_stats.CPortStats(self) @@ -250,11 +251,9 @@ class Port(object): self.next_available_id = int(rc.data()['max_stream_id']) + 1 - # attributes - self.attr = rc.data()['attr'] + self.status = rc.data() - # rx info - self.rx_info = rc.data()['rx_info'] + self.attr = rc.data()['attr'] return self.ok() @@ -520,6 +519,35 @@ class Port(object): @owned + def set_rx_queue (self, size): + + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "queue", + "enabled": True, + "size": size} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + @owned + def remove_rx_queue (self): + params = {"handler": self.handler, + "port_id": self.port_id, + "type": "queue", + "enabled": False} + + rc = self.transmit("set_rx_feature", params) + if rc.bad(): + return self.err(rc.err()) + + return self.ok() + + + @owned def pause (self): if (self.state == self.STATE_PCAP_TX) : @@ -610,9 +638,6 @@ class Port(object): if rc.bad(): return self.err(rc.err()) - - #self.attr.update(attr_dict) - return self.ok() @owned @@ -693,27 +718,30 @@ class Port(object): print("\n") # generate formatted (console friendly) port info - def get_formatted_info (self): + def get_formatted_info (self, sync = True): - # sync the attributes - self.sync() + # sync the status + if sync: + self.sync() + + attr = self.attr info = dict(self.info) info['status'] = self.get_port_state_name() - if 'link' in self.attr: - info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN' + if 'link' in attr: + info['link'] = 'UP' if attr['link']['up'] else 'DOWN' else: info['link'] = 'N/A' - if 'fc' in self.attr: - info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A') + if 'fc' in attr: + info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A') else: info['fc'] = 'N/A' - if 'promiscuous' in self.attr: - info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off" + if 'promiscuous' in attr: + info['prom'] = "on" if attr['promiscuous']['enabled'] else "off" else: info['prom'] = "N/A" @@ -740,28 +768,37 @@ class Port(object): else: info['is_virtual'] = 'N/A' - if 'speed' in self.attr: + if 'speed' in attr: info['speed'] = self.get_formatted_speed() else: info['speed'] = 'N/A' - # RX info - if 'rx_filter_mode' in self.attr: - info['rx_filter_mode'] = 'Hardware Match' if self.attr['rx_filter_mode'] == 'hw' else 'Fetch All' + if 'rx_filter_mode' in attr: + info['rx_filter_mode'] = 'Hardware Match' if attr['rx_filter_mode'] == 'hw' else 'Fetch All' else: info['rx_filter_mode'] = 'N/A' - if 'sniffer' in self.rx_info: - sniffer = self.rx_info['sniffer'] - if sniffer['is_active']: - info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) - else: - info['rx_sniffer'] = 'off' + # RX info + rx_info = self.status['rx_info'] + + + # RX sniffer + if 'sniffer' in rx_info: + sniffer = rx_info['sniffer'] + info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' else: info['rx_sniffer'] = 'N/A' + # RX queue + if 'queue' in rx_info: + queue = rx_info['queue'] + info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' + else: + info['rx_queue'] = 'off' + + return info @@ -793,7 +830,7 @@ class Port(object): "flow ctrl" : info['fc'], "RX Filter Mode": info['rx_filter_mode'], - "RX Queueing": 'off', + "RX Queueing": info['rx_queue'], "RX sniffer": info['rx_sniffer'], } diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index b5747d21..3653de1a 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -380,7 +380,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value ¶ms, Json::Value &result) { const Json::Value &attr = parse_object(params, "attr", result); int ret = 0; - bool changed = false; /* iterate over all attributes in the dict */ for (const std::string &name : attr.getMemberNames()) { @@ -414,20 +413,12 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value ¶ms, Json::Value &result) { break; } - if (ret != 0){ - if ( ret == -ENOTSUP ) { - generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC."); - } - else if (ret) { - generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret)); - } - break; - } else { - changed = true; + if ( ret == -ENOTSUP ) { + generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC."); + } + else if (ret) { + generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret)); } - } - if (changed) { - get_stateless_obj()->get_platform_api()->publish_async_port_attr_changed(port_id); } result["result"] = Json::objectValue; @@ -612,11 +603,11 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { } result["result"]["attr"]["fc"]["mode"] = mode; - /* RX data */ + /* RX filter */ result["result"]["attr"]["rx_filter_mode"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_rx_filter_mode(); - /* RX sniffer */ - port->get_rx_capture_info().to_json(result["result"]["rx_info"]["sniffer"]); + /* RX info */ + port->get_rx_features().to_json(result["result"]["rx_info"]); return (TREX_RPC_CMD_OK); } @@ -740,6 +731,32 @@ TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessP void TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { + bool enabled = parse_bool(msg, "enabled", result); + + if (enabled) { + + uint64_t size = parse_uint32(msg, "size", result); + + if (size == 0) { + generate_parse_err(result, "queue size cannot be zero"); + } + + try { + port->start_rx_queue(size); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } else { + + try { + port->stop_rx_queue(); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + } + } void diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 2318061d..691185cd 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -947,12 +947,9 @@ TrexStatelessPort::remove_and_delete_all_streams() { void TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - m_rx_capture_info.enable(pcap_filename, limit); + m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit); - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, - pcap_filename, - limit, - &m_rx_capture_info.m_shared_counter); + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); send_message_to_rx(msg); } @@ -960,12 +957,23 @@ void TrexStatelessPort::stop_rx_capture() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); send_message_to_rx(msg); - m_rx_capture_info.disable(); + m_rx_features_info.m_rx_capture_info.disable(); } -const RXCaptureInfo & -TrexStatelessPort::get_rx_capture_info() { - return m_rx_capture_info; +void +TrexStatelessPort::start_rx_queue(uint64_t size) { + + m_rx_features_info.m_rx_queue_info.enable(size); + + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); + send_message_to_rx(msg); +} + +void +TrexStatelessPort::stop_rx_queue() { + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); + send_message_to_rx(msg); + m_rx_features_info.m_rx_queue_info.disable(); } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 973a95c6..36f17659 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -382,11 +382,30 @@ public: */ void stop_rx_capture(); + /** + * start RX queueing of packets + * + * @author imarom (11/7/2016) + * + * @param limit + */ + void start_rx_queue(uint64_t limit); + + /** + * stop RX queueing + * + * @author imarom (11/7/2016) + */ + void stop_rx_queue(); + + /** - * status of the RX capture + * get the RX features info object * */ - const RXCaptureInfo &get_rx_capture_info(); + const RXFeaturesInfo &get_rx_features() { + return m_rx_features_info; + } /** * fetch the RX software packets from the queue @@ -481,7 +500,8 @@ private: int m_pending_async_stop_event; - RXCaptureInfo m_rx_capture_info; + RXFeaturesInfo m_rx_features_info; + }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index bd444dff..95168c4d 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -276,6 +276,20 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { return true; } +bool +TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { + rx_core->start_queue(m_port_id, m_size, m_shared_counter); + + return true; +} + +bool +TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { + rx_core->stop_queue(m_port_id); + + return true; +} + bool TrexStatelessRxSwGetPkts::handle(CRxCoreStateless *rx_core) { RxPacketBuffer *pkt_buffer = rx_core->get_rx_sw_pkt_buffer(m_port_id); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index f35d9da6..b598a6d6 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -423,13 +423,11 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartCapture(uint8_t port_id, - const std::string &pcap_filename, - uint64_t limit, - uint64_t *shared_counter) : m_pcap_filename(pcap_filename) { - m_port_id = port_id; - m_limit = limit; - m_shared_counter = shared_counter; + TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) { + m_port_id = port_id; + m_limit = rx_capture_info.m_limit; + m_pcap_filename = rx_capture_info.m_pcap_filename; + m_shared_counter = &rx_capture_info.m_shared_counter; } virtual bool handle(CRxCoreStateless *rx_core); @@ -454,6 +452,34 @@ private: uint8_t m_port_id; }; +class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) { + m_port_id = port_id; + m_size = rx_queue_info.m_size; + m_shared_counter = &rx_queue_info.m_shared_counter; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + uint64_t m_size; + uint64_t *m_shared_counter; +}; + +class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStopQueue(uint8_t port_id) { + m_port_id = port_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; +}; + template<typename T> class TrexStatelessMsgReply { public: diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 3fe72f54..2a678365 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -366,6 +366,16 @@ CRxCoreStateless::stop_capture(uint8_t port_id) { } void +CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_queue(size, shared_counter); +} + +void +CRxCoreStateless::stop_queue(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_queue(); +} + +void CRxCoreStateless::enable_latency() { for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].enable_latency(); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 689b28ec..b5844583 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -128,6 +128,13 @@ class CRxCoreStateless { void stop_capture(uint8_t port_id); /** + * start RX queueing of packets + * + */ + void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter); + void stop_queue(uint8_t port_id); + + /** * enable latency feature for RX packets * will be apply to all ports */ diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index ee124270..bdd86a72 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -51,8 +51,8 @@ class CRxSlCfg { * */ typedef enum rx_filter_mode_ { - RX_FILTER_MODE_HW, - RX_FILTER_MODE_ALL + RX_FILTER_MODE_HW, + RX_FILTER_MODE_ALL } rx_filter_mode_e; /** @@ -95,4 +95,56 @@ public: uint64_t m_shared_counter; }; + +class RXQueueInfo { +public: + + RXQueueInfo() { + m_is_active = false; + m_shared_counter = 0; + } + + void enable(uint64_t size) { + m_size = size; + m_is_active = true; + } + + void disable() { + m_is_active = false; + m_size = 0; + } + + + void to_json(Json::Value &output) const { + output["is_active"] = m_is_active; + if (m_is_active) { + output["size"] = Json::UInt64(m_size); + output["count"] = Json::UInt64(m_shared_counter); + } + } + +public: + bool m_is_active; + uint64_t m_size; + uint64_t m_shared_counter; +}; + + +/** + * holds all the RX features info + * + * @author imarom (11/7/2016) + */ +class RXFeaturesInfo { +public: + RXCaptureInfo m_rx_capture_info; + RXQueueInfo m_rx_queue_info; + + void to_json(Json::Value &msg) const { + m_rx_capture_info.to_json(msg["sniffer"]); + m_rx_queue_info.to_json(msg["queue"]); + } +}; + #endif /* __TREX_STATELESS_RX_DEFS_H__ */ + diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 7283f703..2683dbe1 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -216,7 +216,7 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { } if (is_feature_set(QUEUE)) { - m_pkt_buffer->push(new RxPacket(m)); + m_pkt_buffer->handle_pkt(m); } } diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 90527f0c..aa8ba8e9 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -33,7 +33,6 @@ class CPortLatencyHWBase; class CRFC2544Info; class CRxCoreErrCntrs; - class RXLatency { public: @@ -126,13 +125,18 @@ private: class RxPacketBuffer { public: - RxPacketBuffer(int limit) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_limit = limit; + RxPacketBuffer(uint64_t size, uint64_t *shared_counter) { + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + m_shared_counter = shared_counter; + + *m_shared_counter = 0; + + m_buffer = new RxPacket*[m_size](); // zeroed - m_buffer = new RxPacket*[limit](); // zeroed + m_is_enabled = true; } ~RxPacketBuffer() { @@ -145,6 +149,18 @@ public: delete [] m_buffer; } + /* freeze the data structure - no more packets can be pushed / poped */ + RxPacketBuffer * freeze_and_clone() { + /* create a new one */ + RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter); + + /* freeze the current */ + m_shared_counter = NULL; + m_is_enabled = false; + + return new_buffer; + } + bool is_empty() const { return (m_head == m_tail); } @@ -153,15 +169,17 @@ public: return ( next(m_head) == m_tail); } - int get_limit() const { - return m_limit; - } + void handle_pkt(const rte_mbuf_t *m) { + assert(m_is_enabled); - void push(RxPacket *pkt) { + /* if full - pop the oldest */ if (is_full()) { delete pop(); } - m_buffer[m_head] = pkt; + + (*m_shared_counter)++; + + m_buffer[m_head] = new RxPacket(m); m_head = next(m_head); } @@ -169,7 +187,7 @@ public: * generate a JSON output of the queue * */ - Json::Value to_json() { + Json::Value to_json() const { Json::Value output = Json::arrayValue; @@ -185,21 +203,26 @@ public: private: int next(int v) const { - return ( (v + 1) % m_limit ); + return ( (v + 1) % m_size ); } /* pop in case of full queue - internal usage */ RxPacket * pop() { + assert(m_is_enabled); assert(!is_empty()); + RxPacket *pkt = m_buffer[m_tail]; m_tail = next(m_tail); + (*m_shared_counter)--; return pkt; } - int m_head; - int m_tail; - int m_limit; - RxPacket **m_buffer; + int m_head; + int m_tail; + int m_size; + RxPacket **m_buffer; + uint64_t *m_shared_counter; + bool m_is_enabled; }; /************************ recoder ***************************/ @@ -294,11 +317,11 @@ public: * queueing packets * */ - void start_queue(uint32_t limit) { + void start_queue(uint32_t size, uint64_t *shared_counter) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RxPacketBuffer(limit); + m_pkt_buffer = new RxPacketBuffer(size, shared_counter); set_feature(QUEUE); } @@ -317,11 +340,11 @@ public: assert(m_pkt_buffer); - /* take the current */ + /* hold a pointer to the old one */ RxPacketBuffer *old_buffer = m_pkt_buffer; - /* allocate a new buffer */ - m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit()); + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); return old_buffer; } |