aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r--src/vnet/session/session.c95
1 files changed, 71 insertions, 24 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index d258b82c983..dfc967b12dc 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -355,14 +355,19 @@ session_enqueue_stream_connection (transport_connection_t * tc,
return enqueued;
}
+
int
-session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b,
- u8 proto, u8 queue_event)
+session_enqueue_dgram_connection (stream_session_t * s,
+ session_dgram_hdr_t * hdr,
+ vlib_buffer_t * b, u8 proto, u8 queue_event)
{
int enqueued = 0, rv, in_order_off;
- if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length)
- return -1;
+ ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo)
+ >= b->current_length + sizeof (*hdr));
+
+ svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t),
+ (u8 *) hdr);
enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
@@ -530,6 +535,16 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
return errors;
}
+int
+session_manager_flush_all_enqueue_events (u8 transport_proto)
+{
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ int i, errors = 0;
+ for (i = 0; i < 1 + vtm->n_threads; i++)
+ errors += session_manager_flush_enqueue_events (transport_proto, i);
+ return errors;
+}
+
/**
* Init fifo tail and head pointers
*
@@ -825,7 +840,7 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque)
if (session_alloc_and_init (sm, tc, 1, &s))
return -1;
s->app_index = app->index;
- s->session_state = SESSION_STATE_CONNECTING_READY;
+ s->session_state = SESSION_STATE_OPENED;
/* Tell the app about the new event fifo for this session */
app->cb_fns.session_connected_callback (app->index, opaque, s, 0);
@@ -841,10 +856,6 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque)
u64 handle;
int rv;
- /* TODO until udp is fixed */
- if (rmt->transport_proto == TRANSPORT_PROTO_UDP)
- return session_open_cl (app_index, rmt, opaque);
-
tep = session_endpoint_to_transport (rmt);
rv = tp_vfts[rmt->transport_proto].open (tep);
if (rv < 0)
@@ -912,14 +923,6 @@ session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque)
return session_open_srv_fns[tst] (app_index, rmt, opaque);
}
-/**
- * Ask transport to listen on local transport endpoint.
- *
- * @param s Session for which listen will be called. Note that unlike
- * established sessions, listen sessions are not associated to a
- * thread.
- * @param tep Local endpoint to be listened on.
- */
int
session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
{
@@ -948,6 +951,40 @@ session_listen_vc (stream_session_t * s, session_endpoint_t * sep)
}
int
+session_listen_cl (stream_session_t * s, session_endpoint_t * sep)
+{
+ transport_connection_t *tc;
+ application_t *server;
+ segment_manager_t *sm;
+ u32 tci;
+
+ /* Transport bind/listen */
+ tci = tp_vfts[sep->transport_proto].bind (s->session_index,
+ session_endpoint_to_transport
+ (sep));
+
+ if (tci == (u32) ~ 0)
+ return -1;
+
+ /* Attach transport to session */
+ s->connection_index = tci;
+ tc = tp_vfts[sep->transport_proto].get_listener (tci);
+
+ /* Weird but handle it ... */
+ if (tc == 0)
+ return -1;
+
+ server = application_get (s->app_index);
+ sm = application_get_listen_segment_manager (server, s);
+ if (session_alloc_fifos (sm, s))
+ return -1;
+
+ /* Add to the main lookup table */
+ session_lookup_add_connection (tc, s->session_index);
+ return 0;
+}
+
+int
session_listen_app (stream_session_t * s, session_endpoint_t * sep)
{
session_endpoint_extended_t esep;
@@ -965,11 +1002,19 @@ typedef int (*session_listen_service_fn) (stream_session_t *,
static session_listen_service_fn
session_listen_srv_fns[TRANSPORT_N_SERVICES] = {
session_listen_vc,
- session_listen_vc,
+ session_listen_cl,
session_listen_app,
};
/* *INDENT-ON* */
+/**
+ * Ask transport to listen on local transport endpoint.
+ *
+ * @param s Session for which listen will be called. Note that unlike
+ * established sessions, listen sessions are not associated to a
+ * thread.
+ * @param tep Local endpoint to be listened on.
+ */
int
stream_session_listen (stream_session_t * s, session_endpoint_t * sep)
{
@@ -1125,7 +1170,8 @@ session_manager_get_evt_q_segment (void)
static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
session_tx_fifo_peek_and_snd,
session_tx_fifo_dequeue_and_snd,
- session_tx_fifo_dequeue_internal
+ session_tx_fifo_dequeue_internal,
+ session_tx_fifo_dequeue_and_snd
};
/* *INDENT-ON* */
@@ -1228,11 +1274,12 @@ session_manager_main_enable (vlib_main_t * vm)
vec_validate (smm->peekers_rw_locks, num_threads - 1);
for (i = 0; i < TRANSPORT_N_PROTO; i++)
- for (j = 0; j < num_threads; j++)
- {
- vec_validate (smm->session_to_enqueue[i], num_threads - 1);
- vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
- }
+ {
+ vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
+ vec_validate (smm->session_to_enqueue[i], num_threads - 1);
+ for (j = 0; j < num_threads; j++)
+ smm->current_enqueue_epoch[i][j] = 1;
+ }
for (i = 0; i < num_threads; i++)
{