summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/vcl/vcl_locked.c9
-rw-r--r--src/vnet/session/application.c55
-rw-r--r--src/vnet/session/application.h5
-rw-r--r--src/vnet/session/application_worker.c74
-rw-r--r--src/vnet/session/session.c13
-rw-r--r--src/vnet/session/session.h3
-rw-r--r--src/vnet/session/session_api.c21
-rw-r--r--src/vnet/udp/udp_input.c31
8 files changed, 171 insertions, 40 deletions
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 563d97ef804..412db8def3b 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -847,13 +847,14 @@ vls_share_session (vls_worker_t * vls_wrk, vcl_locked_session_t * vls)
vls_shared_data_pool_runlock ();
- if (s->rx_fifo)
+ if (s->session_state == VCL_STATE_LISTEN)
{
- vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo);
+ s->session_state = VCL_STATE_LISTEN_NO_MQ;
+ s->rx_fifo = s->tx_fifo = 0;
}
- else if (s->session_state == VCL_STATE_LISTEN)
+ else if (s->rx_fifo)
{
- s->session_state = VCL_STATE_LISTEN_NO_MQ;
+ vcl_session_share_fifos (s, s->rx_fifo, s->tx_fifo);
}
}
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index fdd5a0a67cd..5c8efe1c438 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -52,6 +52,7 @@ static void
app_listener_free (application_t * app, app_listener_t * app_listener)
{
clib_bitmap_free (app_listener->workers);
+ vec_free (app_listener->cl_listeners);
if (CLIB_DEBUG)
clib_memset (app_listener, 0xfa, sizeof (*app_listener));
pool_put (app->listeners, app_listener);
@@ -321,6 +322,13 @@ app_listener_get_local_session (app_listener_t * al)
return listen_session_get (al->local_index);
}
+session_t *
+app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_map_index)
+{
+ u32 si = vec_elt (al->cl_listeners, wrk_map_index);
+ return session_get (si, 0 /* listener thread */);
+}
+
static app_worker_map_t *
app_worker_map_alloc (application_t * app)
{
@@ -1017,6 +1025,53 @@ application_listener_select_worker (session_t * ls)
return app_listener_select_worker (app, al);
}
+always_inline u32
+app_listener_cl_flow_hash (session_dgram_hdr_t *hdr)
+{
+ u32 hash = 0;
+
+ if (hdr->is_ip4)
+ {
+ hash = clib_crc32c_u32 (hash, hdr->rmt_ip.ip4.as_u32);
+ hash = clib_crc32c_u32 (hash, hdr->lcl_ip.ip4.as_u32);
+ hash = clib_crc32c_u16 (hash, hdr->rmt_port);
+ hash = clib_crc32c_u16 (hash, hdr->lcl_port);
+ }
+ else
+ {
+ hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[0]);
+ hash = clib_crc32c_u64 (hash, hdr->rmt_ip.ip6.as_u64[1]);
+ hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[0]);
+ hash = clib_crc32c_u64 (hash, hdr->lcl_ip.ip6.as_u64[1]);
+ hash = clib_crc32c_u16 (hash, hdr->rmt_port);
+ hash = clib_crc32c_u16 (hash, hdr->lcl_port);
+ }
+
+ return hash;
+}
+
+session_t *
+app_listener_select_wrk_cl_session (session_t *ls, session_dgram_hdr_t *hdr)
+{
+ u32 wrk_map_index = 0;
+ application_t *app;
+ app_listener_t *al;
+
+ app = application_get (ls->app_index);
+ al = app_listener_get (app, ls->al_index);
+ /* Crude test to check if only worker 0 is set */
+ if (al->workers[0] != 1)
+ {
+ u32 hash = app_listener_cl_flow_hash (hdr);
+ hash %= vec_len (al->workers) * sizeof (uword);
+ wrk_map_index = clib_bitmap_next_set (al->workers, hash);
+ if (wrk_map_index == ~0)
+ wrk_map_index = clib_bitmap_first_set (al->workers);
+ }
+
+ return app_listener_get_wrk_cl_session (al, wrk_map_index);
+}
+
int
application_alloc_worker_and_init (application_t * app, app_worker_t ** wrk)
{
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 5505d91ea09..7c63b90ac5a 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -106,6 +106,8 @@ typedef struct app_listener_
session_handle_t ls_handle; /**< session handle of the local or global
listening session that also identifies
the app listener */
+ u32 *cl_listeners; /**< vector that maps app workers to their
+ cl sessions with fifos */
} app_listener_t;
typedef enum app_rx_mq_flags_
@@ -254,6 +256,8 @@ void app_listener_cleanup (app_listener_t * app_listener);
session_handle_t app_listener_handle (app_listener_t * app_listener);
app_listener_t *app_listener_lookup (application_t * app,
session_endpoint_cfg_t * sep);
+session_t *app_listener_select_wrk_cl_session (session_t *ls,
+ session_dgram_hdr_t *hdr);
/**
* Get app listener handle for listening session
@@ -280,6 +284,7 @@ app_listener_t *app_listener_get_w_handle (session_handle_t handle);
app_listener_t *app_listener_get_w_session (session_t * ls);
session_t *app_listener_get_session (app_listener_t * al);
session_t *app_listener_get_local_session (app_listener_t * al);
+session_t *app_listener_get_wrk_cl_session (app_listener_t *al, u32 wrk_index);
application_t *application_get (u32 index);
application_t *application_get_if_valid (u32 index);
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index 960d916133f..ea5a4db7fd5 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -186,12 +186,67 @@ app_worker_alloc_session_fifos (segment_manager_t * sm, session_t * s)
}
int
+app_worker_alloc_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
+{
+ svm_fifo_t *rx_fifo = 0, *tx_fifo = 0;
+ segment_manager_t *sm;
+ session_handle_t lsh;
+ app_listener_t *al;
+ session_t *s;
+
+ al = app_listener_get_w_session (ls);
+ sm = app_worker_get_listen_segment_manager (app_wrk, ls);
+ lsh = session_handle (ls);
+
+ s = session_alloc (0 /* listener on main worker */);
+ session_set_state (s, SESSION_STATE_LISTENING);
+ s->flags |= SESSION_F_IS_CLESS;
+ s->app_wrk_index = app_wrk->wrk_index;
+ ls = session_get_from_handle (lsh);
+ s->session_type = ls->session_type;
+ s->connection_index = ls->connection_index;
+
+ segment_manager_alloc_session_fifos (sm, s->thread_index, &rx_fifo,
+ &tx_fifo);
+
+ rx_fifo->shr->master_session_index = s->session_index;
+ rx_fifo->master_thread_index = s->thread_index;
+
+ tx_fifo->shr->master_session_index = s->session_index;
+ tx_fifo->master_thread_index = s->thread_index;
+
+ s->rx_fifo = rx_fifo;
+ s->tx_fifo = tx_fifo;
+
+ vec_validate (al->cl_listeners, app_wrk->wrk_map_index);
+ al->cl_listeners[app_wrk->wrk_map_index] = s->session_index;
+
+ return 0;
+}
+
+void
+app_worker_free_wrk_cl_session (app_worker_t *app_wrk, session_t *ls)
+{
+ app_listener_t *al;
+ session_t *s;
+
+ al = app_listener_get_w_session (ls);
+
+ s = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
+ segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
+ session_free (s);
+
+ al->cl_listeners[app_wrk->wrk_map_index] = SESSION_INVALID_INDEX;
+}
+
+int
app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
{
segment_manager_t *sm;
/* Allocate segment manager. All sessions derived out of a listen session
- * have fifos allocated by the same segment manager. */
+ * have fifos allocated by the same segment manager.
+ * TODO(fcoras): limit memory consumption by cless listeners */
if (!(sm = app_worker_alloc_segment_manager (app_wrk)))
return SESSION_E_ALLOC;
@@ -202,12 +257,9 @@ app_worker_init_listener (app_worker_t * app_wrk, session_t * ls)
hash_set (app_wrk->listeners_table, listen_session_get_handle (ls),
segment_manager_index (sm));
- if (transport_connection_is_cless (session_get_transport (ls)))
- {
- if (ls->rx_fifo)
- return SESSION_E_NOSUPPORT;
- return app_worker_alloc_session_fifos (sm, ls);
- }
+ if (ls->flags & SESSION_F_IS_CLESS)
+ return app_worker_alloc_wrk_cl_session (app_wrk, ls);
+
return 0;
}
@@ -276,12 +328,8 @@ app_worker_stop_listen_session (app_worker_t * app_wrk, session_t * ls)
if (PREDICT_FALSE (!sm_indexp))
return;
- /* Dealloc fifos, if any (dgram listeners) */
- if (ls->rx_fifo)
- {
- segment_manager_dealloc_fifos (ls->rx_fifo, ls->tx_fifo);
- ls->tx_fifo = ls->rx_fifo = 0;
- }
+ if (ls->flags & SESSION_F_IS_CLESS)
+ app_worker_free_wrk_cl_session (app_wrk, ls);
/* Try to cleanup segment manager */
sm = segment_manager_get (*sm_indexp);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 2afc37a77f8..0d2e1b1615e 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -867,11 +867,22 @@ session_enqueue_dgram_connection (session_t *s, session_dgram_hdr_t *hdr,
}
int
+session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto, u8 queue_event)
+{
+ return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+ queue_event, 1 /* is_cl */);
+}
+
+int
session_enqueue_dgram_connection_cl (session_t *s, session_dgram_hdr_t *hdr,
vlib_buffer_t *b, u8 proto,
u8 queue_event)
{
- return session_enqueue_dgram_connection_inline (s, hdr, b, proto,
+ session_t *awls;
+
+ awls = app_listener_select_wrk_cl_session (s, hdr);
+ return session_enqueue_dgram_connection_inline (awls, hdr, b, proto,
queue_event, 1 /* is_cl */);
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index ebaad5a93a7..78158d5f3ed 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -495,6 +495,9 @@ int session_enqueue_dgram_connection (session_t * s,
session_dgram_hdr_t * hdr,
vlib_buffer_t * b, u8 proto,
u8 queue_event);
+int session_enqueue_dgram_connection2 (session_t *s, session_dgram_hdr_t *hdr,
+ vlib_buffer_t *b, u8 proto,
+ u8 queue_event);
int session_enqueue_dgram_connection_cl (session_t *s,
session_dgram_hdr_t *hdr,
vlib_buffer_t *b, u8 proto,
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 0574a58c723..84d44e9fa3e 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -276,7 +276,7 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
session_handle_t handle, int rv)
{
session_bound_msg_t m = { 0 };
- transport_endpoint_t tep;
+ transport_connection_t *ltc;
fifo_segment_t *eq_seg;
app_worker_t *app_wrk;
application_t *app;
@@ -298,23 +298,24 @@ mq_send_session_bound_cb (u32 app_wrk_index, u32 api_context,
else
ls = app_listener_get_local_session (al);
- session_get_endpoint (ls, &tep, 1 /* is_lcl */);
- m.lcl_port = tep.port;
- m.lcl_is_ip4 = tep.is_ip4;
- clib_memcpy_fast (m.lcl_ip, &tep.ip, sizeof (tep.ip));
+ ltc = session_get_transport (ls);
+ m.lcl_port = ltc->lcl_port;
+ m.lcl_is_ip4 = ltc->is_ip4;
+ clib_memcpy_fast (m.lcl_ip, &ltc->lcl_ip, sizeof (m.lcl_ip));
app = application_get (app_wrk->app_index);
eq_seg = application_get_rx_mqs_segment (app);
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, ls->thread_index);
m.mq_index = ls->thread_index;
- if (session_transport_service_type (ls) == TRANSPORT_SERVICE_CL &&
- ls->rx_fifo)
+ if (transport_connection_is_cless (ltc))
{
+ session_t *wrk_ls;
m.mq_index = transport_cl_thread ();
m.vpp_evt_q = fifo_segment_msg_q_offset (eq_seg, m.mq_index);
- m.rx_fifo = fifo_segment_fifo_offset (ls->rx_fifo);
- m.tx_fifo = fifo_segment_fifo_offset (ls->tx_fifo);
- m.segment_handle = session_segment_handle (ls);
+ wrk_ls = app_listener_get_wrk_cl_session (al, app_wrk->wrk_map_index);
+ m.rx_fifo = fifo_segment_fifo_offset (wrk_ls->rx_fifo);
+ m.tx_fifo = fifo_segment_fifo_offset (wrk_ls->tx_fifo);
+ m.segment_handle = session_segment_handle (wrk_ls);
}
snd_msg:
diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c
index 66f6229efa6..9cabfdf73cd 100644
--- a/src/vnet/udp/udp_input.c
+++ b/src/vnet/udp/udp_input.c
@@ -136,39 +136,46 @@ udp_connection_enqueue (udp_connection_t * uc0, session_t * s0,
int wrote0;
if (!(uc0->flags & UDP_CONN_F_CONNECTED))
- clib_spinlock_lock (&uc0->rx_lock);
+ {
+ clib_spinlock_lock (&uc0->rx_lock);
+
+ wrote0 = session_enqueue_dgram_connection_cl (
+ s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event);
+
+ clib_spinlock_unlock (&uc0->rx_lock);
+
+ /* Expect cl udp enqueue to fail because fifo enqueue */
+ if (PREDICT_FALSE (wrote0 == 0))
+ *error0 = UDP_ERROR_FIFO_FULL;
+
+ return;
+ }
if (svm_fifo_max_enqueue_prod (s0->rx_fifo)
< hdr0->data_length + sizeof (session_dgram_hdr_t))
{
*error0 = UDP_ERROR_FIFO_FULL;
- goto unlock_rx_lock;
+ return;
}
/* If session is owned by another thread and rx event needed,
* enqueue event now while we still have the peeker lock */
if (s0->thread_index != thread_index)
{
- wrote0 = session_enqueue_dgram_connection_cl (
+ wrote0 = session_enqueue_dgram_connection2 (
s0, hdr0, b, TRANSPORT_PROTO_UDP,
- /* queue event */ queue_event && !svm_fifo_has_event (s0->rx_fifo));
+ queue_event && !svm_fifo_has_event (s0->rx_fifo));
}
else
{
- wrote0 = session_enqueue_dgram_connection (s0, hdr0, b,
- TRANSPORT_PROTO_UDP,
- queue_event);
+ wrote0 = session_enqueue_dgram_connection (
+ s0, hdr0, b, TRANSPORT_PROTO_UDP, queue_event);
}
/* In some rare cases, session_enqueue_dgram_connection can fail because a
* chunk cannot be allocated in the RX FIFO */
if (PREDICT_FALSE (wrote0 == 0))
*error0 = UDP_ERROR_FIFO_NOMEM;
-
-unlock_rx_lock:
-
- if (!(uc0->flags & UDP_CONN_F_CONNECTED))
- clib_spinlock_unlock (&uc0->rx_lock);
}
always_inline session_t *