summaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2022-12-22 15:03:44 -0800
committerDave Barach <vpp@barachs.net>2023-08-09 18:45:26 +0000
commit0242d30fc717aeacb758281dad8e5b2e56bf6709 (patch)
treeeb7addb00bbe78061fa58442a6e9bdbd7f3e181c /src/plugins
parent6d733a93b2eb9c16196ee17d5cdc77db21589571 (diff)
session: async rx event notifications
Move from synchronous flushing of io and ctrl events from transports to applications to an async model via a new session_input input node that runs in interrupt mode. Events are coalesced per application worker. On the one hand, this helps by minimizing message queue locking churn. And on the other, it opens the possibility for further optimizations of event message generation, obviates need for rx rescheduling rpcs and is a first step towards a fully async data/io rx path. Type: improvement Signed-off-by: Florin Coras <fcoras@cisco.com> Change-Id: Id6bebcb65fc9feef8aa02ddf1af6d9ba6f6745ce
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/hs_apps/echo_client.c6
-rw-r--r--src/plugins/hs_apps/echo_server.c2
-rw-r--r--src/plugins/http/http.c9
-rw-r--r--src/plugins/quic/quic.c7
-rw-r--r--src/plugins/srtp/srtp.c5
-rw-r--r--src/plugins/unittest/session_test.c70
6 files changed, 82 insertions, 17 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index a7358a61952..d449045cf6a 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s)
receive_data_chunk (wrk, es);
if (svm_fifo_max_dequeue_cons (s->rx_fifo))
- {
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
- }
+ session_enqueue_notify (s);
+
return 0;
}
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c
index 6a291163bd1..178e9eef4fb 100644
--- a/src/plugins/hs_apps/echo_server.c
+++ b/src/plugins/hs_apps/echo_server.c
@@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s)
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
+ /* TODO should be session_enqueue_notify(s) but quic tests seem
+ * to fail if that's the case */
if (session_send_io_evt_to_thread (rx_fifo,
SESSION_IO_EVT_BUILTIN_RX))
clib_warning ("failed to enqueue self-tap");
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index d25123a757d..503752c377e 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
hc->http_state = HTTP_STATE_WAIT_APP;
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
@@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
}
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
}
@@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
ASSERT (app_wrk);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return 1;
}
@@ -864,8 +864,9 @@ maybe_reschedule:
if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
svm_fifo_max_dequeue_cons (ts->rx_fifo))
{
+ /* TODO is the flag really needed? */
if (svm_fifo_set_event (ts->rx_fifo))
- session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (ts);
}
return HTTP_SM_CONTINUE;
}
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index c3c4540353b..61380b8048c 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
- u32 max_enq, rv;
+ u32 max_enq;
quic_ctx_t *sctx;
session_t *stream_session;
app_worker_t *app_wrk;
@@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
{
- rv = app_worker_lock_and_send_event (app_wrk, stream_session,
- SESSION_IO_EVT_RX);
- if (rv)
- QUIC_ERR ("Failed to ping app for RX");
+ app_worker_rx_notify (app_wrk, stream_session);
}
quic_ack_rx_data (stream_session);
}
diff --git a/src/plugins/srtp/srtp.c b/src/plugins/srtp/srtp.c
index aacadcee265..8b7c5b6374c 100644
--- a/src/plugins/srtp/srtp.c
+++ b/src/plugins/srtp/srtp.c
@@ -309,8 +309,7 @@ done:
int
srtp_add_vpp_q_builtin_rx_evt (session_t *s)
{
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (s);
return 0;
}
@@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session)
app_worker_t *app_wrk;
app_wrk = app_worker_get_if_valid (app_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
- app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, app_session);
}
static inline int
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index c4e41c34dd0..70b3b32a2e4 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
}
}
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx
+ */
+static int
+test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
+ svm_msg_q_msg_t *msg)
+{
+ int rv, n_try = 0;
+
+ while (n_try < 75)
+ {
+ rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
+ if (!rv)
+ return 0;
+ /*
+ * Break the loop if mq is full, usually this is because the
+ * app has crashed or is hanging on somewhere.
+ */
+ if (rv != -1)
+ break;
+ n_try += 1;
+ usleep (1);
+ }
+
+ return -1;
+}
+
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx and was used for delivering io events over mq
+ * NB: removed handling of mq congestion
+ */
+static inline int
+test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s)
+{
+ svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
+ session_event_t *evt;
+ svm_msg_q_t *mq;
+ u32 app_session;
+ int rv;
+
+ if (app_worker_application_is_builtin (app_wrk))
+ return app_worker_rx_notify (app_wrk, s);
+
+ if (svm_fifo_has_event (s->rx_fifo))
+ return 0;
+
+ app_session = s->rx_fifo->shr->client_session_index;
+ mq = app_wrk->event_queue;
+
+ rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
+
+ if (PREDICT_FALSE (rv))
+ {
+ clib_warning ("failed to alloc mq message");
+ return -1;
+ }
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
+ evt->event_type = SESSION_IO_EVT_RX;
+ evt->session_index = app_session;
+
+ (void) svm_fifo_set_event (s->rx_fifo);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
+
+ return 0;
+}
+
static int
session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
@@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
while (svm_fifo_has_event (rx_fifo))
;
- app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
+ test_app_send_io_evt_rx (app_wrk, &s);
}
}