summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/hs_apps/echo_client.c6
-rw-r--r--src/plugins/hs_apps/echo_server.c2
-rw-r--r--src/plugins/http/http.c9
-rw-r--r--src/plugins/quic/quic.c7
-rw-r--r--src/plugins/srtp/srtp.c5
-rw-r--r--src/plugins/unittest/session_test.c70
-rw-r--r--src/svm/message_queue.c10
-rw-r--r--src/svm/message_queue.h11
-rw-r--r--src/vnet/CMakeLists.txt1
-rw-r--r--src/vnet/session/application.c8
-rw-r--r--src/vnet/session/application.h47
-rw-r--r--src/vnet/session/application_local.c40
-rw-r--r--src/vnet/session/application_worker.c406
-rw-r--r--src/vnet/session/segment_manager.c2
-rw-r--r--src/vnet/session/session.c517
-rw-r--r--src/vnet/session/session.h25
-rw-r--r--src/vnet/session/session_api.c50
-rw-r--r--src/vnet/session/session_input.c296
-rw-r--r--src/vnet/session/session_node.c21
-rw-r--r--src/vnet/session/session_types.h3
-rw-r--r--src/vnet/tcp/tcp_input.c19
-rw-r--r--src/vnet/tls/tls.c8
-rw-r--r--src/vnet/udp/udp_input.c15
23 files changed, 949 insertions, 629 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index a7358a61952..d449045cf6a 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -706,10 +706,8 @@ ec_session_rx_callback (session_t *s)
receive_data_chunk (wrk, es);
if (svm_fifo_max_dequeue_cons (s->rx_fifo))
- {
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
- }
+ session_enqueue_notify (s);
+
return 0;
}
diff --git a/src/plugins/hs_apps/echo_server.c b/src/plugins/hs_apps/echo_server.c
index 6a291163bd1..178e9eef4fb 100644
--- a/src/plugins/hs_apps/echo_server.c
+++ b/src/plugins/hs_apps/echo_server.c
@@ -228,6 +228,8 @@ echo_server_rx_callback (session_t * s)
/* Program self-tap to retry */
if (svm_fifo_set_event (rx_fifo))
{
+ /* TODO should be session_enqueue_notify(s) but quic tests seem
+ * to fail if that's the case */
if (session_send_io_evt_to_thread (rx_fifo,
SESSION_IO_EVT_BUILTIN_RX))
clib_warning ("failed to enqueue self-tap");
diff --git a/src/plugins/http/http.c b/src/plugins/http/http.c
index d25123a757d..503752c377e 100644
--- a/src/plugins/http/http.c
+++ b/src/plugins/http/http.c
@@ -503,7 +503,7 @@ state_srv_wait_method (http_conn_t *hc, transport_send_params_t *sp)
hc->http_state = HTTP_STATE_WAIT_APP;
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
@@ -777,7 +777,7 @@ state_cln_wait_method (http_conn_t *hc, transport_send_params_t *sp)
}
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return HTTP_SM_STOP;
}
@@ -808,7 +808,7 @@ cln_drain_rx_buf (http_conn_t *hc, session_t *ts, session_t *as)
app_wrk = app_worker_get_if_valid (as->app_wrk_index);
ASSERT (app_wrk);
- app_worker_lock_and_send_event (app_wrk, as, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, as);
return 1;
}
@@ -864,8 +864,9 @@ maybe_reschedule:
if (hc->rx_buf_offset < vec_len (hc->rx_buf) ||
svm_fifo_max_dequeue_cons (ts->rx_fifo))
{
+ /* TODO is the flag really needed? */
if (svm_fifo_set_event (ts->rx_fifo))
- session_send_io_evt_to_thread (ts->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (ts);
}
return HTTP_SM_CONTINUE;
}
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index c3c4540353b..61380b8048c 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -830,7 +830,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
size_t len)
{
QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
- u32 max_enq, rv;
+ u32 max_enq;
quic_ctx_t *sctx;
session_t *stream_session;
app_worker_t *app_wrk;
@@ -895,10 +895,7 @@ quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
{
- rv = app_worker_lock_and_send_event (app_wrk, stream_session,
- SESSION_IO_EVT_RX);
- if (rv)
- QUIC_ERR ("Failed to ping app for RX");
+ app_worker_rx_notify (app_wrk, stream_session);
}
quic_ack_rx_data (stream_session);
}
diff --git a/src/plugins/srtp/srtp.c b/src/plugins/srtp/srtp.c
index aacadcee265..8b7c5b6374c 100644
--- a/src/plugins/srtp/srtp.c
+++ b/src/plugins/srtp/srtp.c
@@ -309,8 +309,7 @@ done:
int
srtp_add_vpp_q_builtin_rx_evt (session_t *s)
{
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (s);
return 0;
}
@@ -320,7 +319,7 @@ srtp_notify_app_enqueue (srtp_tc_t *ctx, session_t *app_session)
app_worker_t *app_wrk;
app_wrk = app_worker_get_if_valid (app_session->app_wrk_index);
if (PREDICT_TRUE (app_wrk != 0))
- app_worker_lock_and_send_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, app_session);
}
static inline int
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index c4e41c34dd0..70b3b32a2e4 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -1771,6 +1771,74 @@ wait_for_event (svm_msg_q_t * mq, int fd, int epfd, u8 use_eventfd)
}
}
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx
+ */
+static int
+test_mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
+ svm_msg_q_msg_t *msg)
+{
+ int rv, n_try = 0;
+
+ while (n_try < 75)
+ {
+ rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
+ if (!rv)
+ return 0;
+ /*
+ * Break the loop if mq is full, usually this is because the
+ * app has crashed or is hanging on somewhere.
+ */
+ if (rv != -1)
+ break;
+ n_try += 1;
+ usleep (1);
+ }
+
+ return -1;
+}
+
+/* Used to be part of application_worker.c prior to adding support for
+ * async rx and was used for delivering io events over mq
+ * NB: removed handling of mq congestion
+ */
+static inline int
+test_app_send_io_evt_rx (app_worker_t *app_wrk, session_t *s)
+{
+ svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
+ session_event_t *evt;
+ svm_msg_q_t *mq;
+ u32 app_session;
+ int rv;
+
+ if (app_worker_application_is_builtin (app_wrk))
+ return app_worker_rx_notify (app_wrk, s);
+
+ if (svm_fifo_has_event (s->rx_fifo))
+ return 0;
+
+ app_session = s->rx_fifo->shr->client_session_index;
+ mq = app_wrk->event_queue;
+
+ rv = test_mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
+
+ if (PREDICT_FALSE (rv))
+ {
+ clib_warning ("failed to alloc mq message");
+ return -1;
+ }
+
+ evt = svm_msg_q_msg_data (mq, mq_msg);
+ evt->event_type = SESSION_IO_EVT_RX;
+ evt->session_index = app_session;
+
+ (void) svm_fifo_set_event (s->rx_fifo);
+
+ svm_msg_q_add_and_unlock (mq, mq_msg);
+
+ return 0;
+}
+
static int
session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
@@ -1885,7 +1953,7 @@ session_test_mq_speed (vlib_main_t * vm, unformat_input_t * input)
{
while (svm_fifo_has_event (rx_fifo))
;
- app_worker_lock_and_send_event (app_wrk, &s, SESSION_IO_EVT_RX);
+ test_app_send_io_evt_rx (app_wrk, &s);
}
}
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 2880645b427..ab0d230b1f0 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -340,15 +340,15 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return (dist1 < dist2);
}
-static void
-svm_msg_q_add_raw (svm_msg_q_t *mq, u8 *elem)
+void
+svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
{
svm_msg_q_shared_queue_t *sq = mq->q.shr;
i8 *tailp;
u32 sz;
tailp = (i8 *) (&sq->data[0] + sq->elsize * sq->tail);
- clib_memcpy_fast (tailp, elem, sq->elsize);
+ clib_memcpy_fast (tailp, msg, sq->elsize);
sq->tail = (sq->tail + 1) % sq->maxsize;
@@ -381,7 +381,7 @@ svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
svm_msg_q_wait_prod (mq);
}
- svm_msg_q_add_raw (mq, (u8 *) msg);
+ svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
@@ -392,7 +392,7 @@ void
svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
- svm_msg_q_add_raw (mq, (u8 *) msg);
+ svm_msg_q_add_raw (mq, msg);
svm_msg_q_unlock (mq);
}
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index 0780cca1c32..4473c44f4e3 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -193,6 +193,17 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
/**
* Producer enqueue one message to queue
*
+ * Must be called with mq locked. Prior to calling this, the producer should've
+ * obtained a message buffer from one of the rings.
+ *
+ * @param mq message queue
+ * @param msg message to be enqueued
+ */
+void svm_msg_q_add_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *msg);
+
+/**
+ * Producer enqueue one message to queue
+ *
* Prior to calling this, the producer should've obtained a message buffer
* from one of the rings by calling @ref svm_msg_q_alloc_msg.
*
diff --git a/src/vnet/CMakeLists.txt b/src/vnet/CMakeLists.txt
index 5aada92e8d9..5e913dffdea 100644
--- a/src/vnet/CMakeLists.txt
+++ b/src/vnet/CMakeLists.txt
@@ -1018,6 +1018,7 @@ list(APPEND VNET_SOURCES
session/session_rules_table.c
session/session_lookup.c
session/session_node.c
+ session/session_input.c
session/transport.c
session/application.c
session/application_worker.c
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index cf867401ea2..fdd5a0a67cd 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -725,6 +725,12 @@ application_get_if_valid (u32 app_index)
return pool_elt_at_index (app_main.app_pool, app_index);
}
+static int
+_null_app_tx_callback (session_t *s)
+{
+ return 0;
+}
+
static void
application_verify_cb_fns (session_cb_vft_t * cb_fns)
{
@@ -736,6 +742,8 @@ application_verify_cb_fns (session_cb_vft_t * cb_fns)
clib_warning ("No session disconnect callback function provided");
if (cb_fns->session_reset_callback == 0)
clib_warning ("No session reset callback function provided");
+ if (!cb_fns->builtin_app_tx_callback)
+ cb_fns->builtin_app_tx_callback = _null_app_tx_callback;
}
/**
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index c3d6180022f..5505d91ea09 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -77,17 +77,17 @@ typedef struct app_worker_
/** Pool of half-open session handles. Tracked in case worker detaches */
session_handle_t *half_open_table;
+ /* Per vpp worker fifos of events for app worker */
+ session_event_t **wrk_evts;
+
+ /* Vector of vpp workers mq congestion flags */
+ u8 *wrk_mq_congested;
+
/** Protects detached seg managers */
clib_spinlock_t detached_seg_managers_lock;
/** Vector of detached listener segment managers */
u32 *detached_seg_managers;
-
- /** Fifo of messages postponed because of mq congestion */
- app_wrk_postponed_msg_t *postponed_mq_msgs;
-
- /** Lock to add/sub message from ref @postponed_mq_msgs */
- clib_spinlock_t postponed_mq_msgs_lock;
} app_worker_t;
typedef struct app_worker_map_
@@ -317,6 +317,12 @@ void application_enable_rx_mqs_nodes (u8 is_en);
* App worker
*/
+always_inline u8
+app_worker_mq_is_congested (app_worker_t *app_wrk)
+{
+ return app_wrk->mq_congested > 0;
+}
+
app_worker_t *app_worker_alloc (application_t * app);
int application_alloc_worker_and_init (application_t * app,
app_worker_t ** wrk);
@@ -331,6 +337,10 @@ session_error_t app_worker_start_listen (app_worker_t *app_wrk,
app_listener_t *lstnr);
int app_worker_stop_listen (app_worker_t * app_wrk, app_listener_t * al);
int app_worker_init_accepted (session_t * s);
+int app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
+ u32 opaque, int err);
+int app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
+ u32 opaque, session_error_t err);
int app_worker_accept_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
@@ -343,13 +353,21 @@ int app_worker_transport_closed_notify (app_worker_t * app_wrk,
int app_worker_reset_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
session_cleanup_ntf_t ntf);
+int app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
+ session_cleanup_ntf_t ntf,
+ void (*cleanup_cb) (session_t *s));
int app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
session_handle_t new_sh);
-int app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s);
-int app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s);
+int app_worker_rx_notify (app_worker_t *app_wrk, session_t *s);
int app_worker_session_fifo_tuning (app_worker_t * app_wrk, session_t * s,
svm_fifo_t * f,
session_ft_action_t act, u32 len);
+void app_worker_add_event (app_worker_t *app_wrk, session_t *s,
+ session_evt_type_t evt_type);
+void app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
+ session_event_t *evt);
+int app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_del_all_events (app_worker_t *app_wrk);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
session_t *);
segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
@@ -364,9 +382,10 @@ void app_wrk_send_ctrl_evt_fd (app_worker_t *app_wrk, u8 evt_type, void *msg,
u32 msg_len, int fd);
void app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
u32 msg_len);
-int app_worker_send_event (app_worker_t * app, session_t * s, u8 evt);
-int app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
- u8 evt_type);
+u8 app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index);
+void app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk,
+ u32 thread_index);
session_t *app_worker_proxy_listener (app_worker_t * app, u8 fib_proto,
u8 transport_proto);
void app_worker_del_detached_sm (app_worker_t * app_wrk, u32 sm_index);
@@ -395,6 +414,12 @@ void sapi_socket_close_w_handle (u32 api_handle);
crypto_engine_type_t app_crypto_engine_type_add (void);
u8 app_crypto_engine_n_types (void);
+static inline u8
+app_worker_application_is_builtin (app_worker_t *app_wrk)
+{
+ return app_wrk->app_is_builtin;
+}
+
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 6ac4da2c655..192c22b659a 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -1028,6 +1028,17 @@ ct_close_is_reset (ct_connection_t *ct, session_t *s)
}
static void
+ct_session_cleanup_server_session (session_t *s)
+{
+ ct_connection_t *ct;
+
+ ct = (ct_connection_t *) session_get_transport (s);
+ ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
+ session_free (s);
+ ct_connection_free (ct);
+}
+
+static void
ct_session_postponed_cleanup (ct_connection_t *ct)
{
ct_connection_t *peer_ct;
@@ -1047,33 +1058,38 @@ ct_session_postponed_cleanup (ct_connection_t *ct)
}
session_transport_closed_notify (&ct->connection);
+ /* It would be cleaner to call session_transport_delete_notify
+ * but then we can't control session cleanup lower */
+ session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
+ if (app_wrk)
+ app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+
if (ct->flags & CT_CONN_F_CLIENT)
{
- if (app_wrk)
- app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
-
/* Normal free for client session as the fifos are allocated through
* the connects segment manager in a segment that's not shared with
* the server */
ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
- session_free_w_fifos (s);
+ session_program_cleanup (s);
+ ct_connection_free (ct);
}
else
{
/* Manual session and fifo segment cleanup to avoid implicit
* segment manager cleanups and notifications */
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
if (app_wrk)
{
- app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
- app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+ /* Remove custom cleanup notify infra when/if switching to normal
+ * session cleanup. Note that ct is freed in the cb function */
+ app_worker_cleanup_notify_custom (app_wrk, s,
+ SESSION_CLEANUP_SESSION,
+ ct_session_cleanup_server_session);
+ }
+ else
+ {
+ ct_connection_free (ct);
}
-
- ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
- session_free (s);
}
-
- ct_connection_free (ct);
}
static void
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index c3941d1fd7b..127963a1eda 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -26,6 +26,7 @@ app_worker_t *
app_worker_alloc (application_t * app)
{
app_worker_t *app_wrk;
+
pool_get (app_workers, app_wrk);
clib_memset (app_wrk, 0, sizeof (*app_wrk));
app_wrk->wrk_index = app_wrk - app_workers;
@@ -33,7 +34,8 @@ app_worker_alloc (application_t * app)
app_wrk->wrk_map_index = ~0;
app_wrk->connects_seg_manager = APP_INVALID_SEGMENT_MANAGER_INDEX;
clib_spinlock_init (&app_wrk->detached_seg_managers_lock);
- clib_spinlock_init (&app_wrk->postponed_mq_msgs_lock);
+ vec_validate (app_wrk->wrk_evts, vlib_num_workers ());
+ vec_validate (app_wrk->wrk_mq_congested, vlib_num_workers ());
APP_DBG ("New app %v worker %u", app->name, app_wrk->wrk_index);
return app_wrk;
}
@@ -65,17 +67,25 @@ app_worker_free (app_worker_t * app_wrk)
int i;
/*
+ * Cleanup vpp wrk events
+ */
+ app_worker_del_all_events (app_wrk);
+ for (i = 0; i < vec_len (app_wrk->wrk_evts); i++)
+ clib_fifo_free (app_wrk->wrk_evts[i]);
+
+ vec_free (app_wrk->wrk_evts);
+ vec_free (app_wrk->wrk_mq_congested);
+
+ /*
* Listener cleanup
*/
- /* *INDENT-OFF* */
hash_foreach (handle, sm_index, app_wrk->listeners_table, ({
ls = listen_session_get_from_handle (handle);
vec_add1 (handles, app_listen_session_handle (ls));
vec_add1 (sm_indices, sm_index);
sm = segment_manager_get (sm_index);
}));
- /* *INDENT-ON* */
for (i = 0; i < vec_len (handles); i++)
{
@@ -127,7 +137,6 @@ app_worker_free (app_worker_t * app_wrk)
}
vec_free (app_wrk->detached_seg_managers);
clib_spinlock_free (&app_wrk->detached_seg_managers_lock);
- clib_spinlock_free (&app_wrk->postponed_mq_msgs_lock);
if (CLIB_DEBUG)
clib_memset (app_wrk, 0xfe, sizeof (*app_wrk));
@@ -339,7 +348,7 @@ app_worker_init_accepted (session_t * s)
listener = listen_session_get_from_handle (s->listener_handle);
app_wrk = application_listener_select_worker (listener);
- if (PREDICT_FALSE (app_wrk->mq_congested))
+ if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
return -1;
s->app_wrk_index = app_wrk->wrk_index;
@@ -355,10 +364,39 @@ app_worker_init_accepted (session_t * s)
}
int
+app_worker_listened_notify (app_worker_t *app_wrk, session_handle_t alsh,
+ u32 opaque, session_error_t err)
+{
+ session_event_t evt;
+
+ evt.event_type = SESSION_CTRL_EVT_BOUND;
+ evt.session_handle = alsh;
+ evt.as_u64[1] = (u64) opaque << 32 | err;
+
+ app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
+
+ return 0;
+}
+
+int
+app_worker_unlisten_reply (app_worker_t *app_wrk, session_handle_t sh,
+ u32 opaque, session_error_t err)
+{
+ session_event_t evt = {};
+
+ evt.event_type = SESSION_CTRL_EVT_UNLISTEN_REPLY;
+ evt.session_handle = sh;
+ evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
+
+ app_worker_add_event_custom (app_wrk, 0 /* thread index */, &evt);
+ return 0;
+}
+
+int
app_worker_accept_notify (app_worker_t * app_wrk, session_t * s)
{
- application_t *app = application_get (app_wrk->app_index);
- return app->cb_fns.session_accept_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_ACCEPTED);
+ return 0;
}
int
@@ -382,9 +420,16 @@ int
app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
session_error_t err, u32 opaque)
{
- application_t *app = application_get (app_wrk->app_index);
- return app->cb_fns.session_connected_callback (app_wrk->wrk_index, opaque,
- s, err);
+ session_event_t evt = {};
+ u32 thread_index;
+
+ evt.event_type = SESSION_CTRL_EVT_CONNECTED;
+ evt.session_index = s ? s->session_index : ~0;
+ evt.as_u64[1] = (u64) opaque << 32 | (u32) err;
+ thread_index = s ? s->thread_index : vlib_get_thread_index ();
+
+ app_worker_add_event_custom (app_wrk, thread_index, &evt);
+ return 0;
}
int
@@ -402,36 +447,28 @@ app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
int
app_worker_del_half_open (app_worker_t *app_wrk, session_t *s)
{
- application_t *app = application_get (app_wrk->app_index);
- ASSERT (session_vlib_thread_is_cl_thread ());
- pool_put_index (app_wrk->half_open_table, s->ho_index);
- if (app->cb_fns.half_open_cleanup_callback)
- app->cb_fns.half_open_cleanup_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_HALF_CLEANUP);
return 0;
}
int
app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.session_disconnect_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_DISCONNECTED);
return 0;
}
int
app_worker_transport_closed_notify (app_worker_t * app_wrk, session_t * s)
{
- application_t *app = application_get (app_wrk->app_index);
- if (app->cb_fns.session_transport_closed_callback)
- app->cb_fns.session_transport_closed_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_TRANSPORT_CLOSED);
return 0;
}
int
app_worker_reset_notify (app_worker_t * app_wrk, session_t * s)
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.session_reset_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_CTRL_EVT_RESET);
return 0;
}
@@ -439,29 +476,37 @@ int
app_worker_cleanup_notify (app_worker_t * app_wrk, session_t * s,
session_cleanup_ntf_t ntf)
{
- application_t *app = application_get (app_wrk->app_index);
- if (app->cb_fns.session_cleanup_callback)
- app->cb_fns.session_cleanup_callback (s, ntf);
+ session_event_t evt;
+
+ evt.event_type = SESSION_CTRL_EVT_CLEANUP;
+ evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
+ evt.as_u64[1] = pointer_to_uword (session_cleanup);
+
+ app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
+
return 0;
}
int
-app_worker_builtin_rx (app_worker_t * app_wrk, session_t * s)
+app_worker_cleanup_notify_custom (app_worker_t *app_wrk, session_t *s,
+ session_cleanup_ntf_t ntf,
+ void (*cleanup_cb) (session_t *s))
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.builtin_app_rx_callback (s);
+ session_event_t evt;
+
+ evt.event_type = SESSION_CTRL_EVT_CLEANUP;
+ evt.as_u64[0] = (u64) ntf << 32 | s->session_index;
+ evt.as_u64[1] = pointer_to_uword (cleanup_cb);
+
+ app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
+
return 0;
}
int
-app_worker_builtin_tx (app_worker_t * app_wrk, session_t * s)
+app_worker_rx_notify (app_worker_t *app_wrk, session_t *s)
{
- application_t *app = application_get (app_wrk->app_index);
-
- if (!app->cb_fns.builtin_app_tx_callback)
- return 0;
-
- app->cb_fns.builtin_app_tx_callback (s);
+ app_worker_add_event (app_wrk, s, SESSION_IO_EVT_RX);
return 0;
}
@@ -469,8 +514,13 @@ int
app_worker_migrate_notify (app_worker_t * app_wrk, session_t * s,
session_handle_t new_sh)
{
- application_t *app = application_get (app_wrk->app_index);
- app->cb_fns.session_migrate_callback (s, new_sh);
+ session_event_t evt;
+
+ evt.event_type = SESSION_CTRL_EVT_MIGRATED;
+ evt.session_index = s->session_index;
+ evt.as_u64[1] = new_sh;
+
+ app_worker_add_event_custom (app_wrk, s->thread_index, &evt);
return 0;
}
@@ -514,7 +564,7 @@ int
app_worker_connect_session (app_worker_t *app_wrk, session_endpoint_cfg_t *sep,
session_handle_t *rsh)
{
- if (PREDICT_FALSE (app_wrk->mq_congested))
+ if (PREDICT_FALSE (app_worker_mq_is_congested (app_wrk)))
return SESSION_E_REFUSED;
sep->app_wrk_index = app_wrk->wrk_index;
@@ -601,24 +651,27 @@ app_worker_proxy_listener (app_worker_t * app_wrk, u8 fib_proto,
int
app_worker_add_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
{
- application_t *app = application_get (app_wrk->app_index);
+ session_event_t evt;
+
+ evt.event_type = SESSION_CTRL_EVT_APP_ADD_SEGMENT;
+ evt.as_u64[1] = segment_handle;
- return app->cb_fns.add_segment_callback (app_wrk->wrk_index,
- segment_handle);
+ app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+
+ return 0;
}
int
app_worker_del_segment_notify (app_worker_t * app_wrk, u64 segment_handle)
{
- application_t *app = application_get (app_wrk->app_index);
- return app->cb_fns.del_segment_callback (app_wrk->wrk_index,
- segment_handle);
-}
+ session_event_t evt;
-static inline u8
-app_worker_application_is_builtin (app_worker_t * app_wrk)
-{
- return app_wrk->app_is_builtin;
+ evt.event_type = SESSION_CTRL_EVT_APP_DEL_SEGMENT;
+ evt.as_u64[1] = segment_handle;
+
+ app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+
+ return 0;
}
static int
@@ -677,126 +730,38 @@ app_wrk_send_fd (app_worker_t *app_wrk, int fd)
return 0;
}
-static int
-mq_try_lock_and_alloc_msg (svm_msg_q_t *mq, session_mq_rings_e ring,
- svm_msg_q_msg_t *msg)
-{
- int rv, n_try = 0;
-
- while (n_try < 75)
- {
- rv = svm_msg_q_lock_and_alloc_msg_w_ring (mq, ring, SVM_Q_NOWAIT, msg);
- if (!rv)
- return 0;
- /*
- * Break the loop if mq is full, usually this is because the
- * app has crashed or is hanging on somewhere.
- */
- if (rv != -1)
- break;
- n_try += 1;
- usleep (1);
- }
-
- return -1;
-}
-
-typedef union app_wrk_mq_rpc_args_
-{
- struct
- {
- u32 thread_index;
- u32 app_wrk_index;
- };
- uword as_uword;
-} app_wrk_mq_rpc_ags_t;
-
-static int
-app_wrk_handle_mq_postponed_msgs (void *arg)
+void
+app_worker_add_event (app_worker_t *app_wrk, session_t *s,
+ session_evt_type_t evt_type)
{
- svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
- app_wrk_postponed_msg_t *pm;
- app_wrk_mq_rpc_ags_t args;
- u32 max_msg, n_msg = 0;
- app_worker_t *app_wrk;
session_event_t *evt;
- svm_msg_q_t *mq;
-
- args.as_uword = pointer_to_uword (arg);
- app_wrk = app_worker_get_if_valid (args.app_wrk_index);
- if (!app_wrk)
- return 0;
-
- mq = app_wrk->event_queue;
-
- clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
-
- max_msg = clib_min (32, clib_fifo_elts (app_wrk->postponed_mq_msgs));
-
- while (n_msg < max_msg)
- {
- pm = clib_fifo_head (app_wrk->postponed_mq_msgs);
- if (mq_try_lock_and_alloc_msg (mq, pm->ring, mq_msg))
- break;
- evt = svm_msg_q_msg_data (mq, mq_msg);
- clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = pm->event_type;
- clib_memcpy_fast (evt->data, pm->data, pm->len);
-
- if (pm->fd != -1)
- app_wrk_send_fd (app_wrk, pm->fd);
-
- svm_msg_q_add_and_unlock (mq, mq_msg);
-
- clib_fifo_advance_head (app_wrk->postponed_mq_msgs, 1);
- n_msg += 1;
- }
+ ASSERT (s->thread_index == vlib_get_thread_index ());
+ clib_fifo_add2 (app_wrk->wrk_evts[s->thread_index], evt);
+ evt->session_index = s->session_index;
+ evt->event_type = evt_type;
+ evt->postponed = 0;
- if (!clib_fifo_elts (app_wrk->postponed_mq_msgs))
+ /* First event for this app_wrk. Schedule it for handling in session input */
+ if (clib_fifo_elts (app_wrk->wrk_evts[s->thread_index]) == 1)
{
- app_wrk->mq_congested = 0;
+ session_worker_t *wrk = session_main_get_worker (s->thread_index);
+ session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
}
- else
- {
- session_send_rpc_evt_to_thread_force (
- args.thread_index, app_wrk_handle_mq_postponed_msgs,
- uword_to_pointer (args.as_uword, void *));
- }
-
- clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
-
- return 0;
}
-static void
-app_wrk_add_mq_postponed_msg (app_worker_t *app_wrk, session_mq_rings_e ring,
- u8 evt_type, void *msg, u32 msg_len, int fd)
+void
+app_worker_add_event_custom (app_worker_t *app_wrk, u32 thread_index,
+ session_event_t *evt)
{
- app_wrk_postponed_msg_t *pm;
-
- clib_spinlock_lock (&app_wrk->postponed_mq_msgs_lock);
+ clib_fifo_add1 (app_wrk->wrk_evts[thread_index], *evt);
- app_wrk->mq_congested = 1;
-
- clib_fifo_add2 (app_wrk->postponed_mq_msgs, pm);
- clib_memcpy_fast (pm->data, msg, msg_len);
- pm->event_type = evt_type;
- pm->ring = ring;
- pm->len = msg_len;
- pm->fd = fd;
-
- if (clib_fifo_elts (app_wrk->postponed_mq_msgs) == 1)
+ /* First event for this app_wrk. Schedule it for handling in session input */
+ if (clib_fifo_elts (app_wrk->wrk_evts[thread_index]) == 1)
{
- app_wrk_mq_rpc_ags_t args = { .thread_index = vlib_get_thread_index (),
- .app_wrk_index = app_wrk->wrk_index };
-
- session_send_rpc_evt_to_thread_force (
- args.thread_index, app_wrk_handle_mq_postponed_msgs,
- uword_to_pointer (args.as_uword, void *));
+ session_worker_t *wrk = session_main_get_worker (thread_index);
+ session_wrk_program_app_wrk_evts (wrk, app_wrk->wrk_index);
}
-
- clib_spinlock_unlock (&app_wrk->postponed_mq_msgs_lock);
}
always_inline void
@@ -806,14 +771,9 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
svm_msg_q_msg_t _mq_msg, *mq_msg = &_mq_msg;
svm_msg_q_t *mq = app_wrk->event_queue;
session_event_t *evt;
- int rv;
-
- if (PREDICT_FALSE (app_wrk->mq_congested))
- goto handle_congestion;
- rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_CTRL_EVT_RING, mq_msg);
- if (PREDICT_FALSE (rv))
- goto handle_congestion;
+ ASSERT (!svm_msg_q_or_ring_is_full (mq, SESSION_MQ_CTRL_EVT_RING));
+ *mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_CTRL_EVT_RING);
evt = svm_msg_q_msg_data (mq, mq_msg);
clib_memset (evt, 0, sizeof (*evt));
@@ -823,14 +783,7 @@ app_wrk_send_ctrl_evt_inline (app_worker_t *app_wrk, u8 evt_type, void *msg,
if (fd != -1)
app_wrk_send_fd (app_wrk, fd);
- svm_msg_q_add_and_unlock (mq, mq_msg);
-
- return;
-
-handle_congestion:
-
- app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_CTRL_EVT_RING, evt_type,
- msg, msg_len, fd);
+ svm_msg_q_add_raw (mq, mq_msg);
}
void
@@ -847,109 +800,26 @@ app_wrk_send_ctrl_evt (app_worker_t *app_wrk, u8 evt_type, void *msg,
app_wrk_send_ctrl_evt_inline (app_wrk, evt_type, msg, msg_len, -1);
}
-static inline int
-app_send_io_evt_rx (app_worker_t * app_wrk, session_t * s)
+u8
+app_worker_mq_wrk_is_congested (app_worker_t *app_wrk, u32 thread_index)
{
- svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
- session_event_t *evt;
- svm_msg_q_t *mq;
- u32 app_session;
- int rv;
-
- if (app_worker_application_is_builtin (app_wrk))
- return app_worker_builtin_rx (app_wrk, s);
-
- if (svm_fifo_has_event (s->rx_fifo))
- return 0;
-
- app_session = s->rx_fifo->shr->client_session_index;
- mq = app_wrk->event_queue;
-
- if (PREDICT_FALSE (app_wrk->mq_congested))
- goto handle_congestion;
-
- rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
-
- if (PREDICT_FALSE (rv))
- goto handle_congestion;
-
- evt = svm_msg_q_msg_data (mq, mq_msg);
- evt->event_type = SESSION_IO_EVT_RX;
- evt->session_index = app_session;
-
- (void) svm_fifo_set_event (s->rx_fifo);
-
- svm_msg_q_add_and_unlock (mq, mq_msg);
-
- return 0;
-
-handle_congestion:
-
- app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
- SESSION_IO_EVT_RX, &app_session,
- sizeof (app_session), -1);
- return -1;
+ return app_wrk->wrk_mq_congested[thread_index] > 0;
}
-static inline int
-app_send_io_evt_tx (app_worker_t * app_wrk, session_t * s)
+void
+app_worker_set_mq_wrk_congested (app_worker_t *app_wrk, u32 thread_index)
{
- svm_msg_q_msg_t _mq_msg = { 0 }, *mq_msg = &_mq_msg;
- session_event_t *evt;
- svm_msg_q_t *mq;
- u32 app_session;
- int rv;
-
- if (app_worker_application_is_builtin (app_wrk))
- return app_worker_builtin_tx (app_wrk, s);
-
- app_session = s->tx_fifo->shr->client_session_index;
- mq = app_wrk->event_queue;
-
- if (PREDICT_FALSE (app_wrk->mq_congested))
- goto handle_congestion;
-
- rv = mq_try_lock_and_alloc_msg (mq, SESSION_MQ_IO_EVT_RING, mq_msg);
-
- if (PREDICT_FALSE (rv))
- goto handle_congestion;
-
- evt = svm_msg_q_msg_data (mq, mq_msg);
- evt->event_type = SESSION_IO_EVT_TX;
- evt->session_index = app_session;
-
- svm_msg_q_add_and_unlock (mq, mq_msg);
-
- return 0;
-
-handle_congestion:
-
- app_wrk_add_mq_postponed_msg (app_wrk, SESSION_MQ_IO_EVT_RING,
- SESSION_IO_EVT_TX, &app_session,
- sizeof (app_session), -1);
- return -1;
+ clib_atomic_fetch_add_relax (&app_wrk->mq_congested, 1);
+ ASSERT (thread_index == vlib_get_thread_index ());
+ app_wrk->wrk_mq_congested[thread_index] = 1;
}
-/* *INDENT-OFF* */
-typedef int (app_send_evt_handler_fn) (app_worker_t *app,
- session_t *s);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
- app_send_io_evt_rx,
- app_send_io_evt_tx,
-};
-/* *INDENT-ON* */
-
-/**
- * Send event to application
- *
- * Logic from queue perspective is blocking. However, if queue is full,
- * we return.
- */
-int
-app_worker_lock_and_send_event (app_worker_t * app, session_t * s,
- u8 evt_type)
+void
+app_worker_unset_wrk_mq_congested (app_worker_t *app_wrk, u32 thread_index)
{
- return app_send_evt_handler_fns[evt_type] (app, s);
+ clib_atomic_fetch_sub_relax (&app_wrk->mq_congested, 1);
+ ASSERT (thread_index == vlib_get_thread_index ());
+ app_wrk->wrk_mq_congested[thread_index] = 0;
}
u8 *
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index ad0ba89a1dd..d459b73650a 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -866,7 +866,7 @@ segment_manager_dealloc_fifos (svm_fifo_t * rx_fifo, svm_fifo_t * tx_fifo)
/* Thread that allocated the fifos must be the one to clean them up */
ASSERT (rx_fifo->master_thread_index == vlib_get_thread_index () ||
- rx_fifo->refcnt > 1);
+ rx_fifo->refcnt > 1 || vlib_thread_is_main_w_barrier ());
/* It's possible to have no segment manager if the session was removed
* as result of a detach. */
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 228234ceefc..b494041f9eb 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -240,18 +240,26 @@ session_is_valid (u32 si, u8 thread_index)
|| s->session_state <= SESSION_STATE_LISTENING)
return 1;
- if (s->session_state == SESSION_STATE_CONNECTING &&
+ if ((s->session_state == SESSION_STATE_CONNECTING ||
+ s->session_state == SESSION_STATE_TRANSPORT_CLOSED) &&
(s->flags & SESSION_F_HALF_OPEN))
return 1;
tc = session_get_transport (s);
- if (s->connection_index != tc->c_index
- || s->thread_index != tc->thread_index || tc->s_index != si)
+ if (s->connection_index != tc->c_index ||
+ s->thread_index != tc->thread_index || tc->s_index != si)
return 0;
return 1;
}
+void
+session_cleanup (session_t *s)
+{
+ segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+ session_free (s);
+}
+
static void
session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
{
@@ -259,16 +267,21 @@ session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
app_wrk = app_worker_get_if_valid (s->app_wrk_index);
if (!app_wrk)
- return;
+ {
+ if (ntf == SESSION_CLEANUP_TRANSPORT)
+ return;
+
+ session_cleanup (s);
+ return;
+ }
app_worker_cleanup_notify (app_wrk, s, ntf);
}
void
-session_free_w_fifos (session_t * s)
+session_program_cleanup (session_t *s)
{
+ ASSERT (s->session_state == SESSION_STATE_TRANSPORT_DELETED);
session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
- segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
- session_free (s);
}
/**
@@ -285,7 +298,7 @@ session_delete (session_t * s)
if ((rv = session_lookup_del_session (s)))
clib_warning ("session %u hash delete rv %d", s->session_index, rv);
- session_free_w_fifos (s);
+ session_program_cleanup (s);
}
void
@@ -307,7 +320,7 @@ session_cleanup_half_open (session_handle_t ho_handle)
transport_cleanup (session_get_transport_proto (ho),
ho->connection_index, ho->app_index /* overloaded */);
}
- else
+ else if (ho->session_state != SESSION_STATE_TRANSPORT_DELETED)
{
/* Cleanup half-open session lookup table if need be */
if (ho->session_state != SESSION_STATE_TRANSPORT_CLOSED)
@@ -333,7 +346,8 @@ session_half_open_free (session_t *ho)
app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
if (app_wrk)
app_worker_del_half_open (app_wrk, ho);
- session_free (ho);
+ else
+ session_free (ho);
}
static void
@@ -354,6 +368,7 @@ session_half_open_delete_notify (transport_connection_t *tc)
if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
session_lookup_del_half_open (tc);
}
+ session_set_state (ho, SESSION_STATE_TRANSPORT_DELETED);
/* Notification from ctrl thread accepted without rpc */
if (tc->thread_index == transport_cl_thread ())
@@ -558,10 +573,158 @@ session_fifo_tuning (session_t * s, svm_fifo_t * f,
}
}
+void
+session_wrk_program_app_wrk_evts (session_worker_t *wrk, u32 app_wrk_index)
+{
+ u8 need_interrupt;
+
+ ASSERT ((wrk - session_main.wrk) == vlib_get_thread_index ());
+ need_interrupt = clib_bitmap_is_zero (wrk->app_wrks_pending_ntf);
+ wrk->app_wrks_pending_ntf =
+ clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk_index, 1);
+
+ if (need_interrupt)
+ vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+}
+
+always_inline void
+session_program_io_event (app_worker_t *app_wrk, session_t *s,
+ session_evt_type_t et, u8 is_cl)
+{
+ if (is_cl)
+ {
+ /* Special events for connectionless sessions */
+ et += SESSION_IO_EVT_BUILTIN_RX - SESSION_IO_EVT_RX;
+
+ ASSERT (s->thread_index == 0);
+ session_event_t evt = {
+ .event_type = et,
+ .session_handle = session_handle (s),
+ };
+
+ app_worker_add_event_custom (app_wrk, vlib_get_thread_index (), &evt);
+ }
+ else
+ {
+ app_worker_add_event (app_wrk, s, et);
+ }
+}
+
+static inline int
+session_notify_subscribers (u32 app_index, session_t *s, svm_fifo_t *f,
+ session_evt_type_t evt_type)
+{
+ app_worker_t *app_wrk;
+ application_t *app;
+ u8 is_cl;
+ int i;
+
+ app = application_get (app_index);
+ if (!app)
+ return -1;
+
+ is_cl = s->thread_index != vlib_get_thread_index ();
+ for (i = 0; i < f->shr->n_subscribers; i++)
+ {
+ app_wrk = application_get_worker (app, f->shr->subscribers[i]);
+ if (!app_wrk)
+ continue;
+ session_program_io_event (app_wrk, s, evt_type, is_cl ? 1 : 0);
+ }
+
+ return 0;
+}
+
+always_inline int
+session_enqueue_notify_inline (session_t *s, u8 is_cl)
+{
+ app_worker_t *app_wrk;
+
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app_wrk))
+ return -1;
+
+ session_program_io_event (app_wrk, s, SESSION_IO_EVT_RX, is_cl);
+
+ if (PREDICT_FALSE (svm_fifo_n_subscribers (s->rx_fifo)))
+ return session_notify_subscribers (app_wrk->app_index, s, s->rx_fifo,
+ SESSION_IO_EVT_RX);
+
+ return 0;
+}
+
+int
+session_enqueue_notify (session_t *s)
+{
+ return session_enqueue_notify_inline (s, 0 /* is_cl */);
+}
+
+int
+session_enqueue_notify_cl (session_t *s)
+{
+ return session_enqueue_notify_inline (s, 1 /* is_cl */);
+}
+
+int
+session_dequeue_notify (session_t *s)
+{
+ app_worker_t *app_wrk;
+ u8 is_cl;
+
+ /* Unset as soon as event is requested */
+ svm_fifo_clear_deq_ntf (s->tx_fifo);
+
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app_wrk))
+ return -1;
+
+ is_cl = s->session_state == SESSION_STATE_LISTENING ||
+ s->session_state == SESSION_STATE_OPENED;
+ session_program_io_event (app_wrk, s, SESSION_IO_EVT_TX, is_cl ? 1 : 0);
+
+ if (PREDICT_FALSE (svm_fifo_n_subscribers (s->tx_fifo)))
+ return session_notify_subscribers (app_wrk->app_index, s, s->tx_fifo,
+ SESSION_IO_EVT_TX);
+
+ return 0;
+}
+
+/**
+ * Flushes queue of sessions that are to be notified of new data
+ * enqueued events.
+ *
+ * @param transport_proto transport protocol for which queue to be flushed
+ * @param thread_index Thread index for which the flush is to be performed.
+ * @return 0 on success or a positive number indicating the number of
+ * failures due to API queue being full.
+ */
+void
+session_main_flush_enqueue_events (transport_proto_t transport_proto,
+ u32 thread_index)
+{
+ session_worker_t *wrk = session_main_get_worker (thread_index);
+ session_handle_t *handles;
+ session_t *s;
+ u32 i;
+
+ handles = wrk->session_to_enqueue[transport_proto];
+
+ for (i = 0; i < vec_len (handles); i++)
+ {
+ s = session_get_from_handle (handles[i]);
+ session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
+ 0 /* TODO/not needed */);
+ session_enqueue_notify_inline (s,
+ s->thread_index != thread_index ? 1 : 0);
+ }
+
+ vec_reset_length (handles);
+ wrk->session_to_enqueue[transport_proto] = handles;
+}
+
/*
- * Enqueue data for delivery to session peer. Does not notify peer of enqueue
- * event but on request can queue notification events for later delivery by
- * calling stream_server_flush_enqueue_events().
+ * Enqueue data for delivery to app. If requested, it queues app notification
+ * event for later delivery.
*
* @param tc Transport connection which is to be enqueued data
* @param b Buffer to be enqueued
@@ -610,15 +773,14 @@ session_enqueue_stream_connection (transport_connection_t * tc,
if (queue_event)
{
- /* Queue RX event on this fifo. Eventually these will need to be flushed
- * by calling stream_server_flush_enqueue_events () */
- session_worker_t *wrk;
-
- wrk = session_main_get_worker (s->thread_index);
+ /* Queue RX event on this fifo. Eventually these will need to be
+ * flushed by calling @ref session_main_flush_enqueue_events () */
if (!(s->flags & SESSION_F_RX_EVT))
{
+ session_worker_t *wrk = session_main_get_worker (s->thread_index);
+ ASSERT (s->thread_index == vlib_get_thread_index ());
s->flags |= SESSION_F_RX_EVT;
- vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
+ vec_add1 (wrk->session_to_enqueue[tc->proto], session_handle (s));
}
session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -627,10 +789,11 @@ session_enqueue_stream_connection (transport_connection_t * tc,
return enqueued;
}
-int
-session_enqueue_dgram_connection (session_t * s,
- session_dgram_hdr_t * hdr,
- vlib_buffer_t * b, u8 proto, u8 queue_event)
+always_inline int
+session_enqueue_dgram_connection_inline (session_t *s,
+ session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto,
+ u8 queue_event, u32 is_cl)
{
int rv;
@@ -639,12 +802,10 @@ session_enqueue_dgram_connection (session_t * s,
if (PREDICT_TRUE (!(b->flags & VLIB_BUFFER_NEXT_PRESENT)))
{
- /* *INDENT-OFF* */
svm_fifo_seg_t segs[2] = {
{ (u8 *) hdr, sizeof (*hdr) },
{ vlib_buffer_get_current (b), b->current_length }
};
- /* *INDENT-ON* */
rv = svm_fifo_enqueue_segments (s->rx_fifo, segs, 2,
0 /* allow_partial */ );
@@ -676,15 +837,16 @@ session_enqueue_dgram_connection (session_t * s,
if (queue_event && rv > 0)
{
- /* Queue RX event on this fifo. Eventually these will need to be flushed
- * by calling stream_server_flush_enqueue_events () */
- session_worker_t *wrk;
-
- wrk = session_main_get_worker (s->thread_index);
+ /* Queue RX event on this fifo. Eventually these will need to be
+ * flushed by calling @ref session_main_flush_enqueue_events () */
if (!(s->flags & SESSION_F_RX_EVT))
{
+ u32 thread_index =
+ is_cl ? vlib_get_thread_index () : s->thread_index;
+ session_worker_t *wrk = session_main_get_worker (thread_index);
+ ASSERT (s->thread_index == vlib_get_thread_index () || is_cl);
s->flags |= SESSION_F_RX_EVT;
- vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
+ vec_add1 (wrk->session_to_enqueue[proto], session_handle (s));
}
session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED, 0);
@@ -693,6 +855,23 @@ session_enqueue_dgram_connection (session_t * s,
}
int
+session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto, u8 queue_event)
+{
+ return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+ queue_event, 0 /* is_cl */);
+}
+
+int
+session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto,
+ u8 queue_event)
+{
+ return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+ queue_event, 1 /* is_cl */);
+}
+
+int
session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes)
{
@@ -715,189 +894,6 @@ session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
return rv;
}
-static inline int
-session_notify_subscribers (u32 app_index, session_t * s,
- svm_fifo_t * f, session_evt_type_t evt_type)
-{
- app_worker_t *app_wrk;
- application_t *app;
- int i;
-
- app = application_get (app_index);
- if (!app)
- return -1;
-
- for (i = 0; i < f->shr->n_subscribers; i++)
- {
- app_wrk = application_get_worker (app, f->shr->subscribers[i]);
- if (!app_wrk)
- continue;
- if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
- return -1;
- }
-
- return 0;
-}
-
-/**
- * Notify session peer that new data has been enqueued.
- *
- * @param s Stream session for which the event is to be generated.
- * @param lock Flag to indicate if call should lock message queue.
- *
- * @return 0 on success or negative number if failed to send notification.
- */
-static inline int
-session_enqueue_notify_inline (session_t * s)
-{
- app_worker_t *app_wrk;
- u32 session_index;
- u8 n_subscribers;
- u32 thread_index;
-
- session_index = s->session_index;
- thread_index = s->thread_index;
- n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);
-
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
- if (PREDICT_FALSE (!app_wrk))
- {
- SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
- return 0;
- }
-
- SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));
-
- s->flags &= ~SESSION_F_RX_EVT;
-
- /* Application didn't confirm accept yet */
- if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
- return 0;
-
- if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
- SESSION_IO_EVT_RX)))
- return -1;
-
- if (PREDICT_FALSE (n_subscribers))
- {
- s = session_get (session_index, thread_index);
- return session_notify_subscribers (app_wrk->app_index, s,
- s->rx_fifo, SESSION_IO_EVT_RX);
- }
-
- return 0;
-}
-
-int
-session_enqueue_notify (session_t * s)
-{
- return session_enqueue_notify_inline (s);
-}
-
-static void
-session_enqueue_notify_rpc (void *arg)
-{
- u32 session_index = pointer_to_uword (arg);
- session_t *s;
-
- s = session_get_if_valid (session_index, vlib_get_thread_index ());
- if (!s)
- return;
-
- session_enqueue_notify (s);
-}
-
-/**
- * Like session_enqueue_notify, but can be called from a thread that does not
- * own the session.
- */
-void
-session_enqueue_notify_thread (session_handle_t sh)
-{
- u32 thread_index = session_thread_from_handle (sh);
- u32 session_index = session_index_from_handle (sh);
-
- /*
- * Pass session index (u32) as opposed to handle (u64) in case pointers
- * are not 64-bit.
- */
- session_send_rpc_evt_to_thread (thread_index,
- session_enqueue_notify_rpc,
- uword_to_pointer (session_index, void *));
-}
-
-int
-session_dequeue_notify (session_t * s)
-{
- app_worker_t *app_wrk;
-
- svm_fifo_clear_deq_ntf (s->tx_fifo);
-
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
- if (PREDICT_FALSE (!app_wrk))
- return -1;
-
- if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
- SESSION_IO_EVT_TX)))
- return -1;
-
- if (PREDICT_FALSE (s->tx_fifo->shr->n_subscribers))
- return session_notify_subscribers (app_wrk->app_index, s,
- s->tx_fifo, SESSION_IO_EVT_TX);
-
- return 0;
-}
-
-/**
- * Flushes queue of sessions that are to be notified of new data
- * enqueued events.
- *
- * @param thread_index Thread index for which the flush is to be performed.
- * @return 0 on success or a positive number indicating the number of
- * failures due to API queue being full.
- */
-int
-session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
-{
- session_worker_t *wrk = session_main_get_worker (thread_index);
- session_t *s;
- int i, errors = 0;
- u32 *indices;
-
- indices = wrk->session_to_enqueue[transport_proto];
-
- for (i = 0; i < vec_len (indices); i++)
- {
- s = session_get_if_valid (indices[i], thread_index);
- if (PREDICT_FALSE (!s))
- {
- errors++;
- continue;
- }
-
- session_fifo_tuning (s, s->rx_fifo, SESSION_FT_ACTION_ENQUEUED,
- 0 /* TODO/not needed */ );
-
- if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
- errors++;
- }
-
- vec_reset_length (indices);
- wrk->session_to_enqueue[transport_proto] = indices;
-
- return errors;
-}
-
-int
-session_main_flush_all_enqueue_events (u8 transport_proto)
-{
- vlib_thread_main_t *vtm = vlib_get_thread_main ();
- int i, errors = 0;
- for (i = 0; i < 1 + vtm->n_threads; i++)
- errors += session_main_flush_enqueue_events (transport_proto, i);
- return errors;
-}
-
int
session_stream_connect_notify (transport_connection_t * tc,
session_error_t err)
@@ -951,43 +947,20 @@ session_stream_connect_notify (transport_connection_t * tc,
return 0;
}
-typedef union session_switch_pool_reply_args_
-{
- struct
- {
- u32 session_index;
- u16 thread_index;
- u8 is_closed;
- };
- u64 as_u64;
-} session_switch_pool_reply_args_t;
-
-STATIC_ASSERT (sizeof (session_switch_pool_reply_args_t) <= sizeof (uword),
- "switch pool reply args size");
-
static void
-session_switch_pool_reply (void *arg)
+session_switch_pool_closed_rpc (void *arg)
{
- session_switch_pool_reply_args_t rargs;
+ session_handle_t sh;
session_t *s;
- rargs.as_u64 = pointer_to_uword (arg);
- s = session_get_if_valid (rargs.session_index, rargs.thread_index);
+ sh = pointer_to_uword (arg);
+ s = session_get_from_handle_if_valid (sh);
if (!s)
return;
- /* Session closed during migration. Clean everything up */
- if (rargs.is_closed)
- {
- transport_cleanup (session_get_transport_proto (s), s->connection_index,
- s->thread_index);
- segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
- session_free (s);
- return;
- }
-
- /* Notify app that it has data on the new session */
- session_enqueue_notify (s);
+ transport_cleanup (session_get_transport_proto (s), s->connection_index,
+ s->thread_index);
+ session_cleanup (s);
}
typedef struct _session_switch_pool_args
@@ -1005,8 +978,7 @@ static void
session_switch_pool (void *cb_args)
{
session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
- session_switch_pool_reply_args_t rargs;
- session_handle_t new_sh;
+ session_handle_t sh, new_sh;
segment_manager_t *sm;
app_worker_t *app_wrk;
session_t *s;
@@ -1014,37 +986,32 @@ session_switch_pool (void *cb_args)
ASSERT (args->thread_index == vlib_get_thread_index ());
s = session_get (args->session_index, args->thread_index);
- /* Check if session closed during migration */
- rargs.is_closed = s->session_state >= SESSION_STATE_TRANSPORT_CLOSING;
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (!app_wrk)
+ goto app_closed;
- transport_cleanup (session_get_transport_proto (s), s->connection_index,
- s->thread_index);
+ /* Cleanup fifo segment slice state for fifos */
+ sm = app_worker_get_connect_segment_manager (app_wrk);
+ segment_manager_detach_fifo (sm, &s->rx_fifo);
+ segment_manager_detach_fifo (sm, &s->tx_fifo);
- app_wrk = app_worker_get_if_valid (s->app_wrk_index);
- if (app_wrk)
- {
- /* Cleanup fifo segment slice state for fifos */
- sm = app_worker_get_connect_segment_manager (app_wrk);
- segment_manager_detach_fifo (sm, &s->rx_fifo);
- segment_manager_detach_fifo (sm, &s->tx_fifo);
+ /* Check if session closed during migration */
+ if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+ goto app_closed;
- /* Notify app, using old session, about the migration event */
- if (!rargs.is_closed)
- {
- new_sh = session_make_handle (args->new_session_index,
- args->new_thread_index);
- app_worker_migrate_notify (app_wrk, s, new_sh);
- }
- }
+ new_sh =
+ session_make_handle (args->new_session_index, args->new_thread_index);
+ app_worker_migrate_notify (app_wrk, s, new_sh);
- /* Trigger app read and fifo updates on the new thread */
- rargs.session_index = args->new_session_index;
- rargs.thread_index = args->new_thread_index;
- session_send_rpc_evt_to_thread (args->new_thread_index,
- session_switch_pool_reply,
- uword_to_pointer (rargs.as_u64, void *));
+ clib_mem_free (cb_args);
+ return;
- session_free (s);
+app_closed:
+ /* Session closed during migration. Clean everything up */
+ sh = session_handle (s);
+ session_send_rpc_evt_to_thread (args->new_thread_index,
+ session_switch_pool_closed_rpc,
+ uword_to_pointer (sh, void *));
clib_mem_free (cb_args);
}
@@ -1184,6 +1151,7 @@ session_transport_delete_notify (transport_connection_t * tc)
break;
case SESSION_STATE_CLOSED:
session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
+ session_set_state (s, SESSION_STATE_TRANSPORT_DELETED);
session_delete (s);
break;
default:
@@ -1633,7 +1601,7 @@ session_transport_close (session_t * s)
session_set_state (s, SESSION_STATE_CLOSED);
/* If transport is already deleted, just free the session */
else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
- session_free_w_fifos (s);
+ session_program_cleanup (s);
return;
}
@@ -1660,7 +1628,7 @@ session_transport_reset (session_t * s)
if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
session_set_state (s, SESSION_STATE_CLOSED);
else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
- session_free_w_fifos (s);
+ session_program_cleanup (s);
return;
}
@@ -2157,6 +2125,7 @@ session_node_enable_disable (u8 is_en)
if (!sm->poll_main)
continue;
}
+ vlib_node_set_state (vm, session_input_node.index, mstate);
vlib_node_set_state (vm, session_queue_node.index, state);
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 6b6d1f64499..4de7bb252f0 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -100,8 +100,8 @@ typedef struct session_worker_
/** Convenience pointer to this worker's vlib_main */
vlib_main_t *vm;
- /** Per-proto vector of sessions to enqueue */
- u32 **session_to_enqueue;
+ /** Per-proto vector of session handles to enqueue */
+ session_handle_t **session_to_enqueue;
/** Timerfd used to periodically signal wrk session queue node */
int timerfd;
@@ -148,6 +148,9 @@ typedef struct session_worker_
/** List head for first worker evts pending handling on main */
clib_llist_index_t evts_pending_main;
+ /** Per-app-worker bitmap of pending notifications */
+ uword *app_wrks_pending_ntf;
+
int config_index;
u8 dma_enabled;
session_dma_transfer *dma_trans;
@@ -275,6 +278,7 @@ typedef struct session_main_
extern session_main_t session_main;
extern vlib_node_registration_t session_queue_node;
+extern vlib_node_registration_t session_input_node;
extern vlib_node_registration_t session_queue_process_node;
extern vlib_node_registration_t session_queue_pre_input_node;
@@ -358,7 +362,8 @@ int session_wrk_handle_mq (session_worker_t *wrk, svm_msg_q_t *mq);
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
-void session_free_w_fifos (session_t * s);
+void session_cleanup (session_t *s);
+void session_program_cleanup (session_t *s);
void session_cleanup_half_open (session_handle_t ho_handle);
u8 session_is_valid (u32 si, u8 thread_index);
@@ -452,8 +457,9 @@ void session_transport_reset (session_t * s);
void session_transport_cleanup (session_t * s);
int session_send_io_evt_to_thread (svm_fifo_t * f,
session_evt_type_t evt_type);
-int session_enqueue_notify (session_t * s);
+int session_enqueue_notify (session_t *s);
int session_dequeue_notify (session_t * s);
+int session_enqueue_notify_cl (session_t *s);
int session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
session_evt_type_t evt_type);
void session_send_rpc_evt_to_thread (u32 thread_index, void *fp,
@@ -485,6 +491,10 @@ int session_enqueue_dgram_connection (session_t * s,
session_dgram_hdr_t * hdr,
vlib_buffer_t * b, u8 proto,
u8 queue_event);
+int session_enqueue_dgram_connection_cl (session_t *s,
+ session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto,
+ u8 queue_event);
int session_stream_connect_notify (transport_connection_t * tc,
session_error_t err);
int session_dgram_connect_notify (transport_connection_t * tc,
@@ -502,6 +512,7 @@ int session_stream_accept (transport_connection_t * tc, u32 listener_index,
u32 thread_index, u8 notify);
int session_dgram_accept (transport_connection_t * tc, u32 listener_index,
u32 thread_index);
+
/**
* Initialize session layer for given transport proto and ip version
*
@@ -765,8 +776,8 @@ do { \
return clib_error_return (0, "session layer is not enabled"); \
} while (0)
-int session_main_flush_enqueue_events (u8 proto, u32 thread_index);
-int session_main_flush_all_enqueue_events (u8 transport_proto);
+void session_main_flush_enqueue_events (transport_proto_t transport_proto,
+ u32 thread_index);
void session_queue_run_on_main_thread (vlib_main_t * vm);
/**
@@ -799,6 +810,8 @@ fifo_segment_t *session_main_get_wrk_mqs_segment (void);
void session_node_enable_disable (u8 is_en);
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
void session_wrk_handle_evts_main_rpc (void *);
+void session_wrk_program_app_wrk_evts (session_worker_t *wrk,
+ u32 app_wrk_index);
session_t *session_alloc_for_connection (transport_connection_t * tc);
session_t *session_alloc_for_half_open (transport_connection_t *tc);
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index ff11bcb690a..55fc72ee4c2 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -460,6 +460,52 @@ mq_send_session_cleanup_cb (session_t * s, session_cleanup_ntf_t ntf)
app_wrk_send_ctrl_evt (app_wrk, SESSION_CTRL_EVT_CLEANUP, &m, sizeof (m));
}
+static int
+mq_send_io_rx_event (session_t *s)
+{
+ session_event_t *mq_evt;
+ svm_msg_q_msg_t mq_msg;
+ app_worker_t *app_wrk;
+ svm_msg_q_t *mq;
+
+ if (svm_fifo_has_event (s->rx_fifo))
+ return 0;
+
+ app_wrk = app_worker_get (s->app_wrk_index);
+ mq = app_wrk->event_queue;
+
+ mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
+
+ mq_evt->event_type = SESSION_IO_EVT_RX;
+ mq_evt->session_index = s->rx_fifo->shr->client_session_index;
+
+ (void) svm_fifo_set_event (s->rx_fifo);
+
+ svm_msg_q_add_raw (mq, &mq_msg);
+
+ return 0;
+}
+
+static int
+mq_send_io_tx_event (session_t *s)
+{
+ app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
+ svm_msg_q_t *mq = app_wrk->event_queue;
+ session_event_t *mq_evt;
+ svm_msg_q_msg_t mq_msg;
+
+ mq_msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+ mq_evt = svm_msg_q_msg_data (mq, &mq_msg);
+
+ mq_evt->event_type = SESSION_IO_EVT_TX;
+ mq_evt->session_index = s->tx_fifo->shr->client_session_index;
+
+ svm_msg_q_add_raw (mq, &mq_msg);
+
+ return 0;
+}
+
static session_cb_vft_t session_mq_cb_vft = {
.session_accept_callback = mq_send_session_accepted_cb,
.session_disconnect_callback = mq_send_session_disconnected_cb,
@@ -469,6 +515,8 @@ static session_cb_vft_t session_mq_cb_vft = {
.session_cleanup_callback = mq_send_session_cleanup_cb,
.add_segment_callback = mq_send_add_segment_cb,
.del_segment_callback = mq_send_del_segment_cb,
+ .builtin_app_rx_callback = mq_send_io_rx_event,
+ .builtin_app_tx_callback = mq_send_io_tx_event,
};
static void
@@ -1246,6 +1294,8 @@ static session_cb_vft_t session_mq_sapi_cb_vft = {
.session_cleanup_callback = mq_send_session_cleanup_cb,
.add_segment_callback = mq_send_add_segment_sapi_cb,
.del_segment_callback = mq_send_del_segment_sapi_cb,
+ .builtin_app_rx_callback = mq_send_io_rx_event,
+ .builtin_app_tx_callback = mq_send_io_tx_event,
};
static void
diff --git a/src/vnet/session/session_input.c b/src/vnet/session/session_input.c
new file mode 100644
index 00000000000..8c1f11ca96e
--- /dev/null
+++ b/src/vnet/session/session_input.c
@@ -0,0 +1,296 @@
+/* SPDX-License-Identifier: Apache-2.0
+ * Copyright(c) 2023 Cisco Systems, Inc.
+ */
+
+#include <vnet/session/session.h>
+#include <vnet/session/application.h>
+
+static inline int
+mq_try_lock (svm_msg_q_t *mq)
+{
+ int rv, n_try = 0;
+
+ while (n_try < 100)
+ {
+ rv = svm_msg_q_try_lock (mq);
+ if (!rv)
+ return 0;
+ n_try += 1;
+ usleep (1);
+ }
+
+ return -1;
+}
+
+always_inline u8
+mq_event_ring_index (session_evt_type_t et)
+{
+ return (et >= SESSION_CTRL_EVT_RPC ? SESSION_MQ_CTRL_EVT_RING :
+ SESSION_MQ_IO_EVT_RING);
+}
+
+void
+app_worker_del_all_events (app_worker_t *app_wrk)
+{
+ session_worker_t *wrk;
+ session_event_t *evt;
+ u32 thread_index;
+ session_t *s;
+
+ for (thread_index = 0; thread_index < vec_len (app_wrk->wrk_evts);
+ thread_index++)
+ {
+ while (clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+ {
+ clib_fifo_sub2 (app_wrk->wrk_evts[thread_index], evt);
+ switch (evt->event_type)
+ {
+ case SESSION_CTRL_EVT_MIGRATED:
+ s = session_get (evt->session_index, thread_index);
+ transport_cleanup (session_get_transport_proto (s),
+ s->connection_index, s->thread_index);
+ session_free (s);
+ break;
+ case SESSION_CTRL_EVT_CLEANUP:
+ s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+ if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+ break;
+ uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+ break;
+ case SESSION_CTRL_EVT_HALF_CLEANUP:
+ s = ho_session_get (evt->session_index);
+ pool_put_index (app_wrk->half_open_table, s->ho_index);
+ session_free (s);
+ break;
+ default:
+ break;
+ }
+ }
+ wrk = session_main_get_worker (thread_index);
+ clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+ }
+}
+
+always_inline int
+app_worker_flush_events_inline (app_worker_t *app_wrk, u32 thread_index,
+ u8 is_builtin)
+{
+ application_t *app = application_get (app_wrk->app_index);
+ svm_msg_q_t *mq = app_wrk->event_queue;
+ session_event_t *evt;
+ u32 n_evts = 128, i;
+ u8 ring_index, mq_is_cong;
+ session_t *s;
+
+ n_evts = clib_min (n_evts, clib_fifo_elts (app_wrk->wrk_evts[thread_index]));
+
+ if (!is_builtin)
+ {
+ mq_is_cong = app_worker_mq_is_congested (app_wrk);
+ if (mq_try_lock (mq))
+ {
+ app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+ return 0;
+ }
+ }
+
+ for (i = 0; i < n_evts; i++)
+ {
+ evt = clib_fifo_head (app_wrk->wrk_evts[thread_index]);
+ if (!is_builtin)
+ {
+ ring_index = mq_event_ring_index (evt->event_type);
+ if (svm_msg_q_or_ring_is_full (mq, ring_index))
+ {
+ app_worker_set_mq_wrk_congested (app_wrk, thread_index);
+ break;
+ }
+ }
+
+ switch (evt->event_type)
+ {
+ case SESSION_IO_EVT_RX:
+ s = session_get (evt->session_index, thread_index);
+ s->flags &= ~SESSION_F_RX_EVT;
+ /* Application didn't confirm accept yet */
+ if (PREDICT_FALSE (s->session_state == SESSION_STATE_ACCEPTING))
+ break;
+ app->cb_fns.builtin_app_rx_callback (s);
+ break;
+ /* Handle sessions that might not be on current thread */
+ case SESSION_IO_EVT_BUILTIN_RX:
+ s = session_get_from_handle_if_valid (evt->session_handle);
+ if (!s || s->session_state == SESSION_STATE_ACCEPTING)
+ break;
+ app->cb_fns.builtin_app_rx_callback (s);
+ break;
+ case SESSION_IO_EVT_TX:
+ s = session_get (evt->session_index, thread_index);
+ app->cb_fns.builtin_app_tx_callback (s);
+ break;
+ case SESSION_IO_EVT_TX_MAIN:
+ s = session_get_from_handle_if_valid (evt->session_handle);
+ if (!s)
+ break;
+ app->cb_fns.builtin_app_tx_callback (s);
+ break;
+ case SESSION_CTRL_EVT_BOUND:
+ /* No app cb function currently */
+ if (is_builtin)
+ break;
+ mq_send_session_bound_cb (app_wrk->wrk_index, evt->as_u64[1] >> 32,
+ evt->session_handle,
+ evt->as_u64[1] & 0xffffffff);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ s = session_get (evt->session_index, thread_index);
+ app->cb_fns.session_accept_callback (s);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ if (!(evt->as_u64[1] & 0xffffffff))
+ s = session_get (evt->session_index, thread_index);
+ else
+ s = 0;
+ app->cb_fns.session_connected_callback (app_wrk->wrk_index,
+ evt->as_u64[1] >> 32, s,
+ evt->as_u64[1] & 0xffffffff);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ s = session_get (evt->session_index, thread_index);
+ app->cb_fns.session_disconnect_callback (s);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ s = session_get (evt->session_index, thread_index);
+ app->cb_fns.session_reset_callback (s);
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ if (is_builtin)
+ break;
+ mq_send_unlisten_reply (app_wrk, evt->session_handle,
+ evt->as_u64[1] >> 32,
+ evt->as_u64[1] & 0xffffffff);
+ break;
+ case SESSION_CTRL_EVT_MIGRATED:
+ s = session_get (evt->session_index, thread_index);
+ app->cb_fns.session_migrate_callback (s, evt->as_u64[1]);
+ transport_cleanup (session_get_transport_proto (s),
+ s->connection_index, s->thread_index);
+ session_free (s);
+ /* Notify app that it has data on the new session */
+ s = session_get_from_handle (evt->as_u64[1]);
+ session_send_io_evt_to_thread (s->rx_fifo,
+ SESSION_IO_EVT_BUILTIN_RX);
+ break;
+ case SESSION_CTRL_EVT_TRANSPORT_CLOSED:
+ s = session_get (evt->session_index, thread_index);
+ if (app->cb_fns.session_transport_closed_callback)
+ app->cb_fns.session_transport_closed_callback (s);
+ break;
+ case SESSION_CTRL_EVT_CLEANUP:
+ s = session_get (evt->as_u64[0] & 0xffffffff, thread_index);
+ if (app->cb_fns.session_cleanup_callback)
+ app->cb_fns.session_cleanup_callback (s, evt->as_u64[0] >> 32);
+ if (evt->as_u64[0] >> 32 != SESSION_CLEANUP_SESSION)
+ break;
+ uword_to_pointer (evt->as_u64[1], void (*) (session_t * s)) (s);
+ break;
+ case SESSION_CTRL_EVT_HALF_CLEANUP:
+ s = ho_session_get (evt->session_index);
+ ASSERT (session_vlib_thread_is_cl_thread ());
+ if (app->cb_fns.half_open_cleanup_callback)
+ app->cb_fns.half_open_cleanup_callback (s);
+ pool_put_index (app_wrk->half_open_table, s->ho_index);
+ session_free (s);
+ break;
+ case SESSION_CTRL_EVT_APP_ADD_SEGMENT:
+ app->cb_fns.add_segment_callback (app_wrk->wrk_index,
+ evt->as_u64[1]);
+ break;
+ case SESSION_CTRL_EVT_APP_DEL_SEGMENT:
+ app->cb_fns.del_segment_callback (app_wrk->wrk_index,
+ evt->as_u64[1]);
+ break;
+ default:
+ clib_warning ("unexpected event: %u", evt->event_type);
+ ASSERT (0);
+ break;
+ }
+ clib_fifo_advance_head (app_wrk->wrk_evts[thread_index], 1);
+ }
+
+ if (!is_builtin)
+ {
+ svm_msg_q_unlock (mq);
+ if (mq_is_cong && i == n_evts)
+ app_worker_unset_wrk_mq_congested (app_wrk, thread_index);
+ }
+
+ return 0;
+}
+
+int
+app_wrk_flush_wrk_events (app_worker_t *app_wrk, u32 thread_index)
+{
+ if (app_worker_application_is_builtin (app_wrk))
+ return app_worker_flush_events_inline (app_wrk, thread_index,
+ 1 /* is_builtin */);
+ else
+ return app_worker_flush_events_inline (app_wrk, thread_index,
+ 0 /* is_builtin */);
+}
+
+static inline int
+session_wrk_flush_events (session_worker_t *wrk)
+{
+ app_worker_t *app_wrk;
+ uword app_wrk_index;
+ u32 thread_index;
+
+ thread_index = wrk->vm->thread_index;
+ app_wrk_index = clib_bitmap_first_set (wrk->app_wrks_pending_ntf);
+
+ while (app_wrk_index != ~0)
+ {
+ app_wrk = app_worker_get_if_valid (app_wrk_index);
+ /* app_wrk events are flushed on free, so should be valid here */
+ ASSERT (app_wrk != 0);
+ app_wrk_flush_wrk_events (app_wrk, thread_index);
+
+ if (!clib_fifo_elts (app_wrk->wrk_evts[thread_index]))
+ clib_bitmap_set (wrk->app_wrks_pending_ntf, app_wrk->wrk_index, 0);
+
+ app_wrk_index =
+ clib_bitmap_next_set (wrk->app_wrks_pending_ntf, app_wrk_index + 1);
+ }
+
+ if (!clib_bitmap_is_zero (wrk->app_wrks_pending_ntf))
+ vlib_node_set_interrupt_pending (wrk->vm, session_input_node.index);
+
+ return 0;
+}
+
+VLIB_NODE_FN (session_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 thread_index = vm->thread_index;
+ session_worker_t *wrk;
+
+ wrk = session_main_get_worker (thread_index);
+ session_wrk_flush_events (wrk);
+
+ return 0;
+}
+
+VLIB_REGISTER_NODE (session_input_node) = {
+ .name = "session-input",
+ .type = VLIB_NODE_TYPE_INPUT,
+ .state = VLIB_NODE_STATE_DISABLED,
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */ \ No newline at end of file
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index e15625e37ca..4f2cae4d196 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -142,10 +142,14 @@ session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, mp->wrk_index);
- mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+ app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
if (mp->ext_config)
session_mq_free_ext_config (app, mp->ext_config);
+
+ /* Make sure events are flushed before releasing barrier, to avoid
+ * potential race with accept. */
+ app_wrk_flush_wrk_events (app_wrk, 0);
}
static void
@@ -170,7 +174,8 @@ session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
rv = vnet_bind_uri (a);
app_wrk = application_get_worker (app, 0);
- mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+ app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
+ app_wrk_flush_wrk_events (app_wrk, 0);
}
static void
@@ -215,7 +220,7 @@ session_mq_connect_one (session_connect_msg_t *mp)
wrk = session_main_get_worker (vlib_get_thread_index ());
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, mp->wrk_index);
- mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+ app_worker_connect_notify (app_wrk, 0, rv, mp->context);
}
if (mp->ext_config)
@@ -324,7 +329,7 @@ session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
{
session_worker_stat_error_inc (wrk, rv, 1);
app_wrk = application_get_worker (app, 0 /* default wrk only */ );
- mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+ app_worker_connect_notify (app_wrk, 0, rv, mp->context);
}
}
@@ -410,7 +415,7 @@ session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
if (!app_wrk)
return;
- mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
+ app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
}
static void
@@ -466,7 +471,7 @@ session_mq_accepted_reply_handler (session_worker_t *wrk,
session_set_state (s, SESSION_STATE_READY);
if (!svm_fifo_is_empty_prod (s->rx_fifo))
- app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, s);
/* Closed while waiting for app to reply. Resend disconnect */
if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -669,7 +674,7 @@ session_mq_worker_update_handler (void *data)
session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
- app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+ app_worker_rx_notify (app_wrk, s);
if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
app_worker_close_notify (app_wrk, s);
@@ -1790,7 +1795,7 @@ session_event_dispatch_io (session_worker_t * wrk, vlib_node_runtime_t * node,
break;
svm_fifo_unset_event (s->rx_fifo);
app_wrk = app_worker_get (s->app_wrk_index);
- app_worker_builtin_rx (app_wrk, s);
+ app_worker_rx_notify (app_wrk, s);
break;
case SESSION_IO_EVT_TX_MAIN:
s = session_get_if_valid (e->session_index, 0 /* main thread */);
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 8755a146bce..4fe0c7cf09a 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -379,6 +379,8 @@ typedef enum
SESSION_CTRL_EVT_APP_WRK_RPC,
SESSION_CTRL_EVT_TRANSPORT_ATTR,
SESSION_CTRL_EVT_TRANSPORT_ATTR_REPLY,
+ SESSION_CTRL_EVT_TRANSPORT_CLOSED,
+ SESSION_CTRL_EVT_HALF_CLEANUP,
} session_evt_type_t;
#define foreach_session_ctrl_evt \
@@ -437,6 +439,7 @@ typedef struct
session_handle_t session_handle;
session_rpc_args_t rpc_args;
u32 ctrl_data_index;
+ u64 as_u64[2];
struct
{
u8 data[0];
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index f0d039fbbb8..3d8afaaba3d 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -1394,11 +1394,10 @@ always_inline uword
tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame, int is_ip4)
{
- u32 thread_index = vm->thread_index, errors = 0;
+ u32 thread_index = vm->thread_index, n_left_from, *from;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
u16 err_counters[TCP_N_ERROR] = { 0 };
- u32 n_left_from, *from;
if (node->flags & VLIB_NODE_FLAG_TRACE)
tcp_established_trace_frame (vm, node, frame, is_ip4);
@@ -1462,9 +1461,7 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
b += 1;
}
- errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
- thread_index);
- err_counters[TCP_ERROR_MSG_QUEUE_FULL] = errors;
+ session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
tcp_store_err_counters (established, err_counters);
tcp_handle_postponed_dequeues (wrk);
tcp_handle_disconnects (wrk);
@@ -1746,7 +1743,7 @@ always_inline uword
tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
vlib_frame_t *frame, int is_ip4)
{
- u32 n_left_from, *from, thread_index = vm->thread_index, errors = 0;
+ u32 n_left_from, *from, thread_index = vm->thread_index;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
@@ -1981,9 +1978,7 @@ tcp46_syn_sent_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
tcp_inc_counter (syn_sent, error, 1);
}
- errors =
- session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
- tcp_inc_counter (syn_sent, TCP_ERROR_MSG_QUEUE_FULL, errors);
+ session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
vlib_buffer_free (vm, from, frame->n_vectors);
tcp_handle_disconnects (wrk);
@@ -2058,7 +2053,7 @@ always_inline uword
tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
vlib_frame_t *frame, int is_ip4)
{
- u32 thread_index = vm->thread_index, errors, n_left_from, *from, max_deq;
+ u32 thread_index = vm->thread_index, n_left_from, *from, max_deq;
tcp_worker_ctx_t *wrk = tcp_get_worker (thread_index);
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
@@ -2431,9 +2426,7 @@ tcp46_rcv_process_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
tcp_inc_counter (rcv_process, error, 1);
}
- errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP,
- thread_index);
- tcp_inc_counter (rcv_process, TCP_ERROR_MSG_QUEUE_FULL, errors);
+ session_main_flush_enqueue_events (TRANSPORT_PROTO_TCP, thread_index);
tcp_handle_postponed_dequeues (wrk);
tcp_handle_disconnects (wrk);
vlib_buffer_free (vm, from, frame->n_vectors);
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index fb625c841c6..8175e22097a 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -61,8 +61,7 @@ tls_add_vpp_q_rx_evt (session_t * s)
int
tls_add_vpp_q_builtin_rx_evt (session_t * s)
{
- if (svm_fifo_set_event (s->rx_fifo))
- session_send_io_evt_to_thread (s->rx_fifo, SESSION_IO_EVT_BUILTIN_RX);
+ session_enqueue_notify (s);
return 0;
}
@@ -75,9 +74,10 @@ tls_add_vpp_q_tx_evt (session_t * s)
}
static inline int
-tls_add_app_q_evt (app_worker_t * app, session_t * app_session)
+tls_add_app_q_evt (app_worker_t *app_wrk, session_t *app_session)
{
- return app_worker_lock_and_send_event (app, app_session, SESSION_IO_EVT_RX);
+ app_worker_add_event (app_wrk, app_session, SESSION_IO_EVT_RX);
+ return 0;
}
u32
diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c
index 6e5ed158d56..33ee2cddefd 100644
--- a/src/vnet/udp/udp_input.c
+++ b/src/vnet/udp/udp_input.c
@@ -149,11 +149,9 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
* enqueue event now while we still have the peeker lock */
if (s0->thread_index != thread_index)
{
- wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
- TRANSPORT_PROTO_UDP,
- /* queue event */ 0);
- if (queue_event && !svm_fifo_has_event (s0->rx_fifo))
- session_enqueue_notify (s0);
+ wrote0 = session_enqueue_dgram_connection_cl (
+ s0, hdr0, b, TRANSPORT_PROTO_UDP,
+ /* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
}
else
{
@@ -232,10 +230,9 @@ always_inline uword
udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame, u8 is_ip4)
{
- u32 n_left_from, *from, errors, *first_buffer;
+ u32 thread_index = vm->thread_index, n_left_from, *from, *first_buffer;
vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
u16 err_counters[UDP_N_ERROR] = { 0 };
- u32 thread_index = vm->thread_index;
from = first_buffer = vlib_frame_vector_args (frame);
n_left_from = frame->n_vectors;
@@ -327,9 +324,7 @@ udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
}
vlib_buffer_free (vm, first_buffer, frame->n_vectors);
- errors = session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP,
- thread_index);
- err_counters[UDP_ERROR_MQ_FULL] = errors;
+ session_main_flush_enqueue_events (TRANSPORT_PROTO_UDP, thread_index);
udp_store_err_counters (vm, is_ip4, err_counters);
return frame->n_vectors;
}