summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-07 18:47:23 +0200
committerimarom <imarom@cisco.com>2016-11-07 18:47:23 +0200
commitf9a0c5e2e1e1135cb0c0e6e192565e5b100c5a41 (patch)
treef09e19975f324d9c0eb717608473dcdbd334a608
parente85ea75669ea39e4f99519138a3a84e4df6eed2d (diff)
RX features - queueing
Signed-off-by: imarom <imarom@cisco.com>
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py121
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py91
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp51
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp26
-rw-r--r--src/stateless/cp/trex_stateless_port.h26
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp14
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h40
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp10
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h7
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h56
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp2
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h69
12 files changed, 403 insertions, 110 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 2d5a6379..583917ea 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -324,26 +324,25 @@ class EventsHandler(object):
# port attr changed
elif (event_type == 8):
- return
- # port_id = int(data['port_id'])
- #
- # if data['attr'] == self.client.ports[port_id].attr:
- # return # false alarm
- #
- # old_info = self.client.ports[port_id].get_formatted_info()
- # self.__async_event_port_attr_changed(port_id, data['attr'])
- #
- # new_info = self.client.ports[port_id].get_formatted_info()
- # ev = "port {0} attributes changed".format(port_id)
- # for key, old_val in old_info.items():
- # new_val = new_info[key]
- # if old_val != new_val:
- # ev += '\n {key}: {old} -> {new}'.format(
- # key = key,
- # old = old_val.lower() if type(old_val) is str else old_val,
- # new = new_val.lower() if type(new_val) is str else new_val)
- # show_event = True
+ port_id = int(data['port_id'])
+
+ if data['attr'] == self.client.ports[port_id].attr:
+ return # false alarm
+
+ old_info = self.client.ports[port_id].get_formatted_info(sync = False)
+ self.__async_event_port_attr_changed(port_id, data['attr'])
+
+ new_info = self.client.ports[port_id].get_formatted_info(sync = False)
+ ev = "port {0} attributes changed".format(port_id)
+ for key, old_val in old_info.items():
+ new_val = new_info[key]
+ if old_val != new_val:
+ ev += '\n {key}: {old} -> {new}'.format(
+ key = key,
+ old = old_val.lower() if type(old_val) is str else old_val,
+ new = new_val.lower() if type(new_val) is str else new_val)
+ show_event = True
# server stopped
@@ -839,6 +838,24 @@ class STLClient(object):
return rc
+ def __set_rx_queue (self, port_id_list, size):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].set_rx_queue(size))
+
+ return rc
+
+ def __remove_rx_queue (self, port_id_list):
+ port_id_list = self.__ports(port_id_list)
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].remove_rx_queue())
+
+ return rc
+
# connect to server
def __connect(self):
@@ -1754,6 +1771,7 @@ class STLClient(object):
if not rc:
raise STLError(rc)
+
@__api_check(True)
def ping(self):
"""
@@ -1767,6 +1785,11 @@ class STLClient(object):
+ :exc:`STLError`
"""
+ rc = self.set_port_attr(ports = [0, 1], rx_filter_mode = 'all')
+ rc = self.set_rx_queue(ports = [0, 1], size = 1000)
+ if not rc:
+ raise STLError(rc)
+
self.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
self.connection_info['sync_port']))
@@ -1777,6 +1800,7 @@ class STLClient(object):
if not rc:
raise STLError(rc)
+
@__api_check(True)
def server_shutdown (self, force = False):
"""
@@ -1890,6 +1914,7 @@ class STLClient(object):
link_up = True,
rx_filter_mode = 'hw')
self.remove_rx_sniffer(ports)
+ self.remove_rx_queue(ports)
@@ -2737,7 +2762,8 @@ class STLClient(object):
# check arguments
validate_type('base_filename', base_filename, basestring)
validate_type('limit', limit, (int))
-
+ if limit <= 0:
+ raise STLError("'limit' must be a positive value")
self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports))
rc = self.__set_rx_sniffer(ports, base_filename, limit)
@@ -2750,7 +2776,7 @@ class STLClient(object):
@__api_check(True)
- def remove_rx_sniffer (self, ports = None, base_filename = 'rx_capture', limit = 1000):
+ def remove_rx_sniffer (self, ports = None):
"""
Removes RX sniffer from port(s)
@@ -2768,6 +2794,59 @@ class STLClient(object):
if not rc:
raise STLError(rc)
+
+ @__api_check(True)
+ def set_rx_queue (self, ports = None, size = 1000):
+ """
+ Sets RX queue for port(s)
+ The queue is cyclic and will hold last 'size' packets
+
+ :parameters:
+ ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ size - size of the queue
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ # check arguments
+ validate_type('size', size, (int))
+ if size <= 0:
+ raise STLError("'size' must be a positive value")
+
+ self.logger.pre_cmd("Setting RX queue on port(s) {0}:".format(ports))
+ rc = self.__set_rx_queue(ports, size)
+ self.logger.post_cmd(rc)
+
+
+ if not rc:
+ raise STLError(rc)
+
+
+
+ @__api_check(True)
+ def remove_rx_queue (self, ports = None):
+ """
+ Removes RX queue from port(s)
+
+ :raises:
+ + :exe:'STLError'
+
+ """
+ ports = ports if ports is not None else self.get_acquired_ports()
+ ports = self._validate_port_list(ports)
+
+ self.logger.pre_cmd("Removing RX queue on port(s) {0}:".format(ports))
+ rc = self.__remove_rx_queue(ports)
+ self.logger.post_cmd(rc)
+
+ if not rc:
+ raise STLError(rc)
+
+
+
def clear_events (self):
"""
Clear all events
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index 4e5778a6..94745d15 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -62,6 +62,7 @@ class Port(object):
self.streams = {}
self.profile = None
self.session_id = session_id
+ self.status = {}
self.attr = {}
self.port_stats = trex_stl_stats.CPortStats(self)
@@ -250,11 +251,9 @@ class Port(object):
self.next_available_id = int(rc.data()['max_stream_id']) + 1
- # attributes
- self.attr = rc.data()['attr']
+ self.status = rc.data()
- # rx info
- self.rx_info = rc.data()['rx_info']
+ self.attr = rc.data()['attr']
return self.ok()
@@ -520,6 +519,35 @@ class Port(object):
@owned
+ def set_rx_queue (self, size):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": True,
+ "size": size}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+ @owned
+ def remove_rx_queue (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "type": "queue",
+ "enabled": False}
+
+ rc = self.transmit("set_rx_feature", params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ return self.ok()
+
+
+ @owned
def pause (self):
if (self.state == self.STATE_PCAP_TX) :
@@ -610,9 +638,6 @@ class Port(object):
if rc.bad():
return self.err(rc.err())
-
- #self.attr.update(attr_dict)
-
return self.ok()
@owned
@@ -693,27 +718,30 @@ class Port(object):
print("\n")
# generate formatted (console friendly) port info
- def get_formatted_info (self):
+ def get_formatted_info (self, sync = True):
- # sync the attributes
- self.sync()
+ # sync the status
+ if sync:
+ self.sync()
+
+ attr = self.attr
info = dict(self.info)
info['status'] = self.get_port_state_name()
- if 'link' in self.attr:
- info['link'] = 'UP' if self.attr['link']['up'] else 'DOWN'
+ if 'link' in attr:
+ info['link'] = 'UP' if attr['link']['up'] else 'DOWN'
else:
info['link'] = 'N/A'
- if 'fc' in self.attr:
- info['fc'] = FLOW_CTRL_DICT_REVERSED.get(self.attr['fc']['mode'], 'N/A')
+ if 'fc' in attr:
+ info['fc'] = FLOW_CTRL_DICT_REVERSED.get(attr['fc']['mode'], 'N/A')
else:
info['fc'] = 'N/A'
- if 'promiscuous' in self.attr:
- info['prom'] = "on" if self.attr['promiscuous']['enabled'] else "off"
+ if 'promiscuous' in attr:
+ info['prom'] = "on" if attr['promiscuous']['enabled'] else "off"
else:
info['prom'] = "N/A"
@@ -740,28 +768,37 @@ class Port(object):
else:
info['is_virtual'] = 'N/A'
- if 'speed' in self.attr:
+ if 'speed' in attr:
info['speed'] = self.get_formatted_speed()
else:
info['speed'] = 'N/A'
- # RX info
- if 'rx_filter_mode' in self.attr:
- info['rx_filter_mode'] = 'Hardware Match' if self.attr['rx_filter_mode'] == 'hw' else 'Fetch All'
+ if 'rx_filter_mode' in attr:
+ info['rx_filter_mode'] = 'Hardware Match' if attr['rx_filter_mode'] == 'hw' else 'Fetch All'
else:
info['rx_filter_mode'] = 'N/A'
- if 'sniffer' in self.rx_info:
- sniffer = self.rx_info['sniffer']
- if sniffer['is_active']:
- info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit'])
- else:
- info['rx_sniffer'] = 'off'
+ # RX info
+ rx_info = self.status['rx_info']
+
+
+ # RX sniffer
+ if 'sniffer' in rx_info:
+ sniffer = rx_info['sniffer']
+ info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off'
else:
info['rx_sniffer'] = 'N/A'
+ # RX queue
+ if 'queue' in rx_info:
+ queue = rx_info['queue']
+ info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off'
+ else:
+ info['rx_queue'] = 'off'
+
+
return info
@@ -793,7 +830,7 @@ class Port(object):
"flow ctrl" : info['fc'],
"RX Filter Mode": info['rx_filter_mode'],
- "RX Queueing": 'off',
+ "RX Queueing": info['rx_queue'],
"RX sniffer": info['rx_sniffer'],
}
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index b5747d21..3653de1a 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -380,7 +380,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
const Json::Value &attr = parse_object(params, "attr", result);
int ret = 0;
- bool changed = false;
/* iterate over all attributes in the dict */
for (const std::string &name : attr.getMemberNames()) {
@@ -414,20 +413,12 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value &params, Json::Value &result) {
break;
}
- if (ret != 0){
- if ( ret == -ENOTSUP ) {
- generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
- }
- else if (ret) {
- generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
- }
- break;
- } else {
- changed = true;
+ if ( ret == -ENOTSUP ) {
+ generate_execute_err(result, "Error applying " + name + ": operation is not supported for this NIC.");
+ }
+ else if (ret) {
+ generate_execute_err(result, "Error applying " + name + " attribute, return value: " + to_string(ret));
}
- }
- if (changed) {
- get_stateless_obj()->get_platform_api()->publish_async_port_attr_changed(port_id);
}
result["result"] = Json::objectValue;
@@ -612,11 +603,11 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
}
result["result"]["attr"]["fc"]["mode"] = mode;
- /* RX data */
+ /* RX filter */
result["result"]["attr"]["rx_filter_mode"] = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->get_rx_filter_mode();
- /* RX sniffer */
- port->get_rx_capture_info().to_json(result["result"]["rx_info"]["sniffer"]);
+ /* RX info */
+ port->get_rx_features().to_json(result["result"]["rx_info"]);
return (TREX_RPC_CMD_OK);
}
@@ -740,6 +731,32 @@ TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessP
void
TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
+ bool enabled = parse_bool(msg, "enabled", result);
+
+ if (enabled) {
+
+ uint64_t size = parse_uint32(msg, "size", result);
+
+ if (size == 0) {
+ generate_parse_err(result, "queue size cannot be zero");
+ }
+
+ try {
+ port->start_rx_queue(size);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ } else {
+
+ try {
+ port->stop_rx_queue();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ }
+
}
void
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 2318061d..691185cd 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -947,12 +947,9 @@ TrexStatelessPort::remove_and_delete_all_streams() {
void
TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
- m_rx_capture_info.enable(pcap_filename, limit);
+ m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id,
- pcap_filename,
- limit,
- &m_rx_capture_info.m_shared_counter);
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
send_message_to_rx(msg);
}
@@ -960,12 +957,23 @@ void
TrexStatelessPort::stop_rx_capture() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
send_message_to_rx(msg);
- m_rx_capture_info.disable();
+ m_rx_features_info.m_rx_capture_info.disable();
}
-const RXCaptureInfo &
-TrexStatelessPort::get_rx_capture_info() {
- return m_rx_capture_info;
+void
+TrexStatelessPort::start_rx_queue(uint64_t size) {
+
+ m_rx_features_info.m_rx_queue_info.enable(size);
+
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+ send_message_to_rx(msg);
+}
+
+void
+TrexStatelessPort::stop_rx_queue() {
+ TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
+ send_message_to_rx(msg);
+ m_rx_features_info.m_rx_queue_info.disable();
}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 973a95c6..36f17659 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -382,11 +382,30 @@ public:
*/
void stop_rx_capture();
+ /**
+ * start RX queueing of packets
+ *
+ * @author imarom (11/7/2016)
+ *
+ * @param limit
+ */
+ void start_rx_queue(uint64_t limit);
+
+ /**
+ * stop RX queueing
+ *
+ * @author imarom (11/7/2016)
+ */
+ void stop_rx_queue();
+
+
/**
- * status of the RX capture
+ * get the RX features info object
*
*/
- const RXCaptureInfo &get_rx_capture_info();
+ const RXFeaturesInfo &get_rx_features() {
+ return m_rx_features_info;
+ }
/**
* fetch the RX software packets from the queue
@@ -481,7 +500,8 @@ private:
int m_pending_async_stop_event;
- RXCaptureInfo m_rx_capture_info;
+ RXFeaturesInfo m_rx_features_info;
+
};
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index bd444dff..95168c4d 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -276,6 +276,20 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
return true;
}
+bool
+TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->start_queue(m_port_id, m_size, m_shared_counter);
+
+ return true;
+}
+
+bool
+TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
+ rx_core->stop_queue(m_port_id);
+
+ return true;
+}
+
bool TrexStatelessRxSwGetPkts::handle(CRxCoreStateless *rx_core) {
RxPacketBuffer *pkt_buffer = rx_core->get_rx_sw_pkt_buffer(m_port_id);
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index f35d9da6..b598a6d6 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -423,13 +423,11 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartCapture(uint8_t port_id,
- const std::string &pcap_filename,
- uint64_t limit,
- uint64_t *shared_counter) : m_pcap_filename(pcap_filename) {
- m_port_id = port_id;
- m_limit = limit;
- m_shared_counter = shared_counter;
+ TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
+ m_port_id = port_id;
+ m_limit = rx_capture_info.m_limit;
+ m_pcap_filename = rx_capture_info.m_pcap_filename;
+ m_shared_counter = &rx_capture_info.m_shared_counter;
}
virtual bool handle(CRxCoreStateless *rx_core);
@@ -454,6 +452,34 @@ private:
uint8_t m_port_id;
};
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
+ m_port_id = port_id;
+ m_size = rx_queue_info.m_size;
+ m_shared_counter = &rx_queue_info.m_shared_counter;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ uint64_t m_size;
+ uint64_t *m_shared_counter;
+};
+
+class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxStopQueue(uint8_t port_id) {
+ m_port_id = port_id;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+};
+
template<typename T> class TrexStatelessMsgReply {
public:
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index 3fe72f54..2a678365 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -366,6 +366,16 @@ CRxCoreStateless::stop_capture(uint8_t port_id) {
}
void
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
+ m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+}
+
+void
+CRxCoreStateless::stop_queue(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_queue();
+}
+
+void
CRxCoreStateless::enable_latency() {
for (int i = 0; i < m_max_ports; i++) {
m_rx_port_mngr[i].enable_latency();
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 689b28ec..b5844583 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -128,6 +128,13 @@ class CRxCoreStateless {
void stop_capture(uint8_t port_id);
/**
+ * start RX queueing of packets
+ *
+ */
+ void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+ void stop_queue(uint8_t port_id);
+
+ /**
* enable latency feature for RX packets
* will be apply to all ports
*/
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
index ee124270..bdd86a72 100644
--- a/src/stateless/rx/trex_stateless_rx_defs.h
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -51,8 +51,8 @@ class CRxSlCfg {
*
*/
typedef enum rx_filter_mode_ {
- RX_FILTER_MODE_HW,
- RX_FILTER_MODE_ALL
+ RX_FILTER_MODE_HW,
+ RX_FILTER_MODE_ALL
} rx_filter_mode_e;
/**
@@ -95,4 +95,56 @@ public:
uint64_t m_shared_counter;
};
+
+class RXQueueInfo {
+public:
+
+ RXQueueInfo() {
+ m_is_active = false;
+ m_shared_counter = 0;
+ }
+
+ void enable(uint64_t size) {
+ m_size = size;
+ m_is_active = true;
+ }
+
+ void disable() {
+ m_is_active = false;
+ m_size = 0;
+ }
+
+
+ void to_json(Json::Value &output) const {
+ output["is_active"] = m_is_active;
+ if (m_is_active) {
+ output["size"] = Json::UInt64(m_size);
+ output["count"] = Json::UInt64(m_shared_counter);
+ }
+ }
+
+public:
+ bool m_is_active;
+ uint64_t m_size;
+ uint64_t m_shared_counter;
+};
+
+
+/**
+ * holds all the RX features info
+ *
+ * @author imarom (11/7/2016)
+ */
+class RXFeaturesInfo {
+public:
+ RXCaptureInfo m_rx_capture_info;
+ RXQueueInfo m_rx_queue_info;
+
+ void to_json(Json::Value &msg) const {
+ m_rx_capture_info.to_json(msg["sniffer"]);
+ m_rx_queue_info.to_json(msg["queue"]);
+ }
+};
+
#endif /* __TREX_STATELESS_RX_DEFS_H__ */
+
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 7283f703..2683dbe1 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -216,7 +216,7 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
}
if (is_feature_set(QUEUE)) {
- m_pkt_buffer->push(new RxPacket(m));
+ m_pkt_buffer->handle_pkt(m);
}
}
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 90527f0c..aa8ba8e9 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -33,7 +33,6 @@ class CPortLatencyHWBase;
class CRFC2544Info;
class CRxCoreErrCntrs;
-
class RXLatency {
public:
@@ -126,13 +125,18 @@ private:
class RxPacketBuffer {
public:
- RxPacketBuffer(int limit) {
- m_buffer = nullptr;
- m_head = 0;
- m_tail = 0;
- m_limit = limit;
+ RxPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ m_shared_counter = shared_counter;
+
+ *m_shared_counter = 0;
+
+ m_buffer = new RxPacket*[m_size](); // zeroed
- m_buffer = new RxPacket*[limit](); // zeroed
+ m_is_enabled = true;
}
~RxPacketBuffer() {
@@ -145,6 +149,18 @@ public:
delete [] m_buffer;
}
+ /* freeze the data structure - no more packets can be pushed / poped */
+ RxPacketBuffer * freeze_and_clone() {
+ /* create a new one */
+ RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter);
+
+ /* freeze the current */
+ m_shared_counter = NULL;
+ m_is_enabled = false;
+
+ return new_buffer;
+ }
+
bool is_empty() const {
return (m_head == m_tail);
}
@@ -153,15 +169,17 @@ public:
return ( next(m_head) == m_tail);
}
- int get_limit() const {
- return m_limit;
- }
+ void handle_pkt(const rte_mbuf_t *m) {
+ assert(m_is_enabled);
- void push(RxPacket *pkt) {
+ /* if full - pop the oldest */
if (is_full()) {
delete pop();
}
- m_buffer[m_head] = pkt;
+
+ (*m_shared_counter)++;
+
+ m_buffer[m_head] = new RxPacket(m);
m_head = next(m_head);
}
@@ -169,7 +187,7 @@ public:
* generate a JSON output of the queue
*
*/
- Json::Value to_json() {
+ Json::Value to_json() const {
Json::Value output = Json::arrayValue;
@@ -185,21 +203,26 @@ public:
private:
int next(int v) const {
- return ( (v + 1) % m_limit );
+ return ( (v + 1) % m_size );
}
/* pop in case of full queue - internal usage */
RxPacket * pop() {
+ assert(m_is_enabled);
assert(!is_empty());
+
RxPacket *pkt = m_buffer[m_tail];
m_tail = next(m_tail);
+ (*m_shared_counter)--;
return pkt;
}
- int m_head;
- int m_tail;
- int m_limit;
- RxPacket **m_buffer;
+ int m_head;
+ int m_tail;
+ int m_size;
+ RxPacket **m_buffer;
+ uint64_t *m_shared_counter;
+ bool m_is_enabled;
};
/************************ recoder ***************************/
@@ -294,11 +317,11 @@ public:
* queueing packets
*
*/
- void start_queue(uint32_t limit) {
+ void start_queue(uint32_t size, uint64_t *shared_counter) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RxPacketBuffer(limit);
+ m_pkt_buffer = new RxPacketBuffer(size, shared_counter);
set_feature(QUEUE);
}
@@ -317,11 +340,11 @@ public:
assert(m_pkt_buffer);
- /* take the current */
+ /* hold a pointer to the old one */
RxPacketBuffer *old_buffer = m_pkt_buffer;
- /* allocate a new buffer */
- m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit());
+ /* replace the old one with a new one and freeze the old */
+ m_pkt_buffer = old_buffer->freeze_and_clone();
return old_buffer;
}