summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2019-03-09 17:45:25 -0800
committerDave Barach <openvpp@barachs.net>2019-03-12 19:29:36 +0000
commit176bcb28d845f74be5782fc676d0dd1adf0c12bb (patch)
tree034ec50041d5f3f432c6480d9cdabc5832048929
parent0147c0fefa2f5af3f878c4a05eb1f46db7e6ade5 (diff)
svm mq: add unit test
Change-Id: I2f1fa15a99163b9c105707484503dc9502265c52 Signed-off-by: Florin Coras <fcoras@cisco.com>
-rw-r--r--src/plugins/unittest/session_test.c179
-rw-r--r--src/svm/message_queue.h3
2 files changed, 179 insertions, 3 deletions
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index fb1c54fcb1c..5ad6563ed62 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -19,6 +19,7 @@
#include <vnet/session/session.h>
#include <vnet/session/session_rules_table.h>
#include <vnet/tcp/tcp.h>
+#include <sys/epoll.h>
#define SESSION_TEST_I(_cond, _comment, _args...) \
({ \
@@ -40,6 +41,9 @@
} \
}
+#define ST_DBG(_comment, _args...) \
+ fformat(stderr, _comment "\n", ##_args); \
+
void
dummy_session_reset_callback (session_t * s)
{
@@ -1692,6 +1696,177 @@ session_test_proxy (vlib_main_t * vm, unformat_input_t * input)
return 0;
}
+static inline void
+wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
+{
+ if (!use_eventfd)
+ {
+ svm_msg_q_lock (mq);
+ while (svm_msg_q_is_empty (mq))
+ svm_msg_q_wait (mq);
+ }
+ else
+ {
+ int __clib_unused n_read, rv;
+ struct epoll_event ep_evt;
+ u64 buf;
+
+ while (1)
+ {
+ rv = epoll_wait (epfd, &ep_evt, 1, -1);
+ if (rv < 0)
+ {
+ ST_DBG ("epoll error");
+ exit (1);
+ }
+ else if (rv > 0 && (ep_evt.events & EPOLLIN))
+ {
+ n_read = read (fd, &buf, sizeof (buf));
+ }
+ else
+ continue;
+
+ if (!svm_msg_q_is_empty (mq))
+ {
+ svm_msg_q_lock (mq);
+ break;
+ }
+ }
+ }
+}
+
+static int
+session_test_mq (vlib_main_t * vm, unformat_input_t * input)
+{
+ int error, __clib_unused verbose, use_eventfd = 0;
+ u64 i, n_test_msgs = 1 << 10, *counter;
+ u64 options[APP_OPTIONS_N_OPTIONS];
+ int epfd = -1, rv, prod_fd = -1;
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ vl_api_registration_t *reg;
+ struct epoll_event ep_evt;
+ u32 app_index, api_index;
+ u32 fifo_segment_index;
+ app_worker_t *app_wrk;
+ segment_manager_t *sm;
+ svm_msg_q_msg_t msg;
+ application_t *app;
+ svm_msg_q_t *mq;
+ f64 start, diff;
+ svm_queue_t *q;
+ session_t s;
+ pid_t pid;
+
+ while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (input, "verbose"))
+ verbose = 1;
+ else if (unformat (input, "%d", &n_test_msgs))
+ ;
+ else if (unformat (input, "use-eventfd"))
+ use_eventfd = 1;
+ else
+ {
+ vlib_cli_output (vm, "parse error: '%U'", format_unformat_error,
+ input);
+ return -1;
+ }
+ }
+
+ q = clib_mem_alloc (sizeof (*q));
+ api_index = vl_api_memclnt_create_internal ("session_mq_test_api", q);
+
+ clib_memset (options, 0, sizeof (options));
+ options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
+ options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
+ options[APP_OPTIONS_EVT_QUEUE_SIZE] = 2048;
+
+ reg = vl_api_client_index_to_registration (api_index);
+ if (!session_main.evt_qs_use_memfd_seg)
+ reg->clib_file_index = VL_API_INVALID_FI;
+
+ vnet_app_attach_args_t attach_args = {
+ .api_client_index = api_index,
+ .options = options,
+ .namespace_id = 0,
+ .session_cb_vft = &dummy_session_cbs,
+ .name = format (0, "session_mq_test"),
+ };
+ error = vnet_application_attach (&attach_args);
+ SESSION_TEST ((error == 0), "server attachment should work");
+
+ app_index = attach_args.app_index;
+
+ app = application_get (app_index);
+ app_wrk = application_get_worker (app, 0);
+ mq = app_wrk->event_queue;
+ if (use_eventfd)
+ {
+ svm_msg_q_alloc_producer_eventfd (mq);
+ svm_msg_q_alloc_consumer_eventfd (mq);
+ prod_fd = svm_msg_q_get_producer_eventfd (mq);
+ SESSION_TEST (prod_fd != -1, "mq producer eventd valid %u", prod_fd);
+ }
+
+ sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+ segment_manager_alloc_session_fifos (sm, &rx_fifo, &tx_fifo,
+ &fifo_segment_index);
+ s.rx_fifo = rx_fifo;
+ s.tx_fifo = tx_fifo;
+ s.session_state = SESSION_STATE_READY;
+ counter = (u64 *) rx_fifo->data;
+ start = vlib_time_now (vm);
+
+ pid = fork ();
+ if (pid < 0)
+ SESSION_TEST (0, "fork failed");
+
+ if (pid == 0)
+ {
+ if (use_eventfd)
+ {
+ epfd = epoll_create1 (0);
+ SESSION_TEST (epfd != -1, "epfd created");
+ ep_evt.events = EPOLLIN;
+ ep_evt.data.u64 = prod_fd;
+ rv = epoll_ctl (epfd, EPOLL_CTL_ADD, prod_fd, &ep_evt);
+ SESSION_TEST (rv == 0, "epoll returned %d", rv);
+ }
+
+ for (i = 0; i < n_test_msgs; i++)
+ {
+ wait_for_event (mq, prod_fd, epfd, use_eventfd);
+ svm_msg_q_sub_w_lock (mq, &msg);
+ svm_msg_q_free_msg (mq, &msg);
+ svm_msg_q_unlock (mq);
+ *counter = *counter + 1;
+ svm_fifo_unset_event (rx_fifo);
+ }
+ exit (0);
+ }
+ else
+ {
+ ST_DBG ("client pid %u", pid);
+ for (i = 0; i < n_test_msgs; i++)
+ {
+ while (svm_fifo_has_event (rx_fifo))
+ ;
+ app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
+ }
+ }
+
+ diff = vlib_time_now (vm) - start;
+ ST_DBG ("done %u events in %.2f sec: %f evts/s", *counter,
+ diff, *counter / diff);
+
+ vnet_app_detach_args_t detach_args = {
+ .app_index = app_index,
+ .api_client_index = ~0,
+ };
+ vnet_application_detach (&detach_args);
+ return 0;
+}
+
static clib_error_t *
session_test (vlib_main_t * vm,
unformat_input_t * input, vlib_cli_command_t * cmd_arg)
@@ -1714,6 +1889,8 @@ session_test (vlib_main_t * vm,
res = session_test_proxy (vm, input);
else if (unformat (input, "endpt-cfg"))
res = session_test_endpoint_cfg (vm, input);
+ else if (unformat (input, "mq"))
+ res = session_test_mq (vm, input);
else if (unformat (input, "all"))
{
if ((res = session_test_basic (vm, input)))
@@ -1728,6 +1905,8 @@ session_test (vlib_main_t * vm,
goto done;
if ((res = session_test_endpoint_cfg (vm, input)))
goto done;
+ if ((res = session_test_mq (vm, input)))
+ goto done;
}
else
break;
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 28bf14e545e..c57b51acc7a 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -314,9 +314,6 @@ svm_msg_q_lock (svm_msg_q_t * mq)
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
- /* The other side of the connection is not polling */
- if (mq->q->cursize < (mq->q->maxsize / 8))
- (void) pthread_cond_broadcast (&mq->q->condvar);
pthread_mutex_unlock (&mq->q->mutex);
}
acket import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from framework import VppTestCase, VppTestRunner from util import Host, ppp class TestL2xc(VppTestCase): """ L2XC Test Case """ @classmethod def setUpClass(cls): """ Perform standard class setup (defined by class method setUpClass in class VppTestCase) before running the test case, set test case related variables and configure VPP. :var int hosts_nr: Number of hosts to be created. :var int dl_pkts_per_burst: Number of packets in burst for dual-loop test. :var int sl_pkts_per_burst: Number of packets in burst for single-loop test. """ super(TestL2xc, cls).setUpClass() # Test variables cls.hosts_nr = 10 cls.dl_pkts_per_burst = 257 cls.sl_pkts_per_burst = 2 try: # create 4 pg interfaces cls.create_pg_interfaces(range(4)) # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc. cls.flows = dict() cls.flows[cls.pg0] = [cls.pg1] cls.flows[cls.pg1] = [cls.pg0] cls.flows[cls.pg2] = [cls.pg3] cls.flows[cls.pg3] = [cls.pg2] # packet sizes cls.pg_if_packet_sizes = [64, 512, 1518, 9018] cls.interfaces = list(cls.pg_interfaces) # Create bi-directional cross-connects between pg0 and pg1 cls.vapi.sw_interface_set_l2_xconnect( cls.pg0.sw_if_index, cls.pg1.sw_if_index, enable=1) cls.vapi.sw_interface_set_l2_xconnect( cls.pg1.sw_if_index, cls.pg0.sw_if_index, enable=1) # Create bi-directional cross-connects between pg2 and pg3 cls.vapi.sw_interface_set_l2_xconnect( cls.pg2.sw_if_index, cls.pg3.sw_if_index, enable=1) cls.vapi.sw_interface_set_l2_xconnect( cls.pg3.sw_if_index, cls.pg2.sw_if_index, enable=1) # mapping between packet-generator index and lists of test hosts cls.hosts_by_pg_idx = dict() # Create host MAC and IPv4 lists cls.create_host_lists(cls.hosts_nr) # setup all interfaces for i in cls.interfaces: i.admin_up() except Exception: super(TestL2xc, cls).tearDownClass() raise def setUp(self): super(TestL2xc, self).setUp() self.reset_packet_infos() def tearDown(self): """ Show various debug prints after each test. """ super(TestL2xc, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.ppcli("show l2patch")) @classmethod def create_host_lists(cls, count): """ Method to create required number of MAC and IPv4 addresses. Create required number of host MAC addresses and distribute them among interfaces. Create host IPv4 address for every host MAC address too. :param count: Number of hosts to create MAC and IPv4 addresses for. """ for pg_if in cls.pg_interfaces: cls.hosts_by_pg_idx[pg_if.sw_if_index] = [] hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] for j in range(0, count): host = Host( "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), "172.17.1%02x.%u" % (pg_if.sw_if_index, j)) hosts.append(host) def create_stream(self, src_if, packet_sizes, packets_per_burst): """ Create input packet stream for defined interface. :param object src_if: Interface to create packet stream for. :param list packet_sizes: List of required packet sizes. :param int packets_per_burst: Number of packets in burst. :return: Stream of packets. """ pkts = [] for i in range(0, packets_per_burst): dst_if = self.flows[src_if][0] dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index]) src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index]) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) p = (Ether(dst=dst_host.mac, src=src_host.mac) / IP(src=src_host.ip4, dst=dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) return pkts def verify_capture(self, pg_if, capture): """ Verify captured input packet stream for defined interface. :param object pg_if: Interface to verify captured packet stream for. :param list capture: Captured packet stream. """ last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: try: ip = packet[IP] udp = packet[UDP] payload_info = self.payload_to_info(str(packet[Raw])) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug("Got packet on port %s: src=%u (id=%u)" % (pg_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data # Check standard fields self.assertEqual(ip.src, saved_packet[IP].src) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.interfaces: remaining_packet = self.get_next_packet_info_for_interface2( i, dst_sw_if_index, last_info[i.sw_if_index]) self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't" " arrive" % (dst_sw_if_index, i.sw_if_index)) def run_l2xc_test(self, pkts_per_burst): """ L2XC test """ # Create incoming packet streams for packet-generator interfaces for i in self.interfaces: pkts = self.create_stream(i, self.pg_if_packet_sizes, pkts_per_burst) i.add_stream(pkts) # Enable packet capturing and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Verify outgoing packet streams per packet-generator interface for i in self.pg_interfaces: capture = i.get_capture() self.logger.info("Verifying capture on interface %s" % i.name) self.verify_capture(i, capture) def test_l2xc_sl(self): """ L2XC single-loop test Test scenario: 1. config 2 pairs of 2 interfaces, l2xconnected 2. sending l2 eth packets between 4 interfaces 64B, 512B, 1518B, 9018B (ether_size) burst of 2 packets per interface """ self.run_l2xc_test(self.sl_pkts_per_burst) def test_l2xc_dl(self): """ L2XC dual-loop test Test scenario: 1. config 2 pairs of 2 interfaces, l2xconnected 2. sending l2 eth packets between 4 interfaces 64B, 512B, 1518B, 9018B (ether_size) burst of 257 packets per interface """ self.run_l2xc_test(self.dl_pkts_per_burst) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)