summaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/hs_apps/echo_client.c6
-rw-r--r--src/plugins/hs_apps/echo_server.c2
-rw-r--r--src/plugins/http/http.c9
-rw-r--r--src/plugins/quic/quic.c7
-rw-r--r--src/plugins/srtp/srtp.c5
-rw-r--r--src/plugins/unittest/session_test.c70
6 files changed, 82 insertions, 17 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index a7358a61952..d449045cf6a 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s)
receive_data_chunk (wrk, es);
if (svm_fifo_max_dequeue_cons (s->rx_fifo))
- {
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
- }
+ session_enqueue_notify (s);
+
return 0;
}
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c
index 6a291163bd1..178e9eef4fb 100644
--- a/src/plugins/hs_apps/echo_server.c
+++ b/src/plugins/hs_apps/echo_server.c
@@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s)
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
+ /* TODO should be session_enqueue_notify(s) but quic tests seem
+ * to fail if that's the case */
if (session_send_io_evt_to_thread (rx_fifo,
SESSION_IO_EVT_BUILTIN_RX))
clib_warning ("failed to enqueue self-tap");
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index d25123a757d..503752c377e 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
hc->http_state = HTTP_STATE_WAIT_APP;
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
@@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
}
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
}
@@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
ASSERT (app_wrk);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return 1;
}
@@ -864,8 +864,9 @@ maybe_reschedule:
if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
svm_fifo_max_dequeue_cons (ts->rx_fifo))
{
+ /* TODO is the flag really needed? */
if (svm_fifo_set_event (ts->rx_fifo))
- session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (ts);
}
return HTTP_SM_CONTINUE;
}
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index c3c4540353b..61380b8048c 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
- u32 max_enq, rv;
+ u32 max_enq;
quic_ctx_t *sctx;
session_t *stream_session;
app_worker_t *app_wrk;
@@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
{
- rv = app_worker_lock_and_send_event (app_wrk, stream_session,
- SESSION_IO_EVT_RX);
- if (rv)
- QUIC_ERR ("Failed to ping app for RX");
+ app_worker_rx_notify (app_wrk, stream_session);
}
quic_ack_rx_data (stream_session);
}
diff --git a/src/plugins/srtp/srtp.c b/src/plugins/srtp/srtp.c
index aacadcee265..8b7c5b6374c 100644
--- a/src/plugins/srtp/srtp.c
+++ b/src/plugins/srtp/srtp.c
@@ -309,8 +309,7 @@ done:
int
srtp_add_vpp_q_builtin_rx_evt (session_t *s)
{
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (s);
return 0;
}
@@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session)
app_worker_t *app_wrk;
app_wrk = app_worker_get_if_valid (app_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
- app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, app_session);
}
static inline int
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index c4e41c34dd0..70b3b32a2e4 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
}
}
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx
+ */
+static int
+test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
+ svm_msg_q_msg_t *msg)
+{
+ int rv, n_try = 0;
+
+ while (n_try < 75)
+ {
+ rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
+ if (!rv)
+ return 0;
+ /*
+ * Break the loop if mq is full, usually this is because the
+ * app has crashed or is hanging on somewhere.
+ */
+ if (rv != -1)
+ break;
+ n_try += 1;
+ usleep (1);
+ }
+
+ return -1;
+}
+
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx and was used for delivering io events over mq
+ * NB: removed handling of mq congestion
+ */
+static inline int
+test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s)
+{
+ svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
+ session_event_t *evt;
+ svm_msg_q_t *mq;
+ u32 app_session;
+ int rv;
+
+ if (app_worker_application_is_builtin (app_wrk))
+ return app_worker_rx_notify (app_wrk, s);
+
+ if (svm_fifo_has_event (s->rx_fifo))
+ return 0;
+
+ app_session = s->rx_fifo->shr->client_session_index;
+ mq = app_wrk->event_queue;
+
+ rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
+
+ if (PREDICT_FALSE (rv))
+ {
+ clib_warning ("failed to alloc mq message");
+ return -1;
+ }
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
+ evt->event_type = SESSION_IO_EVT_RX;
+ evt->session_index = app_session;
+
+ (void) svm_fifo_set_event (s->rx_fifo);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
+
+ return 0;
+}
+
static int
session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
@@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
while (svm_fifo_has_event (rx_fifo))
;
- app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
+ test_app_send_io_evt_rx (app_wrk, &s);
}
}