diff options
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r-- | src/vnet/session/session.c | 95 |
1 files changed, 71 insertions, 24 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index d258b82c983..dfc967b12dc 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -355,14 +355,19 @@ session_enqueue_stream_connection (transport_connection_t * tc, return enqueued; } + int -session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, - u8 proto, u8 queue_event) +session_enqueue_dgram_connection (stream_session_t * s, + session_dgram_hdr_t * hdr, + vlib_buffer_t * b, u8 proto, u8 queue_event) { int enqueued = 0, rv, in_order_off; - if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length) - return -1; + ASSERT (svm_fifo_max_enqueue (s->server_rx_fifo) + >= b->current_length + sizeof (*hdr)); + + svm_fifo_enqueue_nowait (s->server_rx_fifo, sizeof (session_dgram_hdr_t), + (u8 *) hdr); enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, vlib_buffer_get_current (b)); if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) @@ -530,6 +535,16 @@ session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) return errors; } +int +session_manager_flush_all_enqueue_events (u8 transport_proto) +{ + vlib_thread_main_t *vtm = vlib_get_thread_main (); + int i, errors = 0; + for (i = 0; i < 1 + vtm->n_threads; i++) + errors += session_manager_flush_enqueue_events (transport_proto, i); + return errors; +} + /** * Init fifo tail and head pointers * @@ -825,7 +840,7 @@ session_open_cl (u32 app_index, session_endpoint_t * rmt, u32 opaque) if (session_alloc_and_init (sm, tc, 1, &s)) return -1; s->app_index = app->index; - s->session_state = SESSION_STATE_CONNECTING_READY; + s->session_state = SESSION_STATE_OPENED; /* Tell the app about the new event fifo for this session */ app->cb_fns.session_connected_callback (app->index, opaque, s, 0); @@ -841,10 +856,6 @@ session_open_vc (u32 app_index, session_endpoint_t * rmt, u32 opaque) u64 handle; int rv; - /* TODO until udp is fixed */ - if (rmt->transport_proto == TRANSPORT_PROTO_UDP) - return session_open_cl (app_index, rmt, opaque); - tep = session_endpoint_to_transport (rmt); rv = tp_vfts[rmt->transport_proto].open (tep); if (rv < 0) @@ -912,14 +923,6 @@ session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) return session_open_srv_fns[tst] (app_index, rmt, opaque); } -/** - * Ask transport to listen on local transport endpoint. - * - * @param s Session for which listen will be called. Note that unlike - * established sessions, listen sessions are not associated to a - * thread. - * @param tep Local endpoint to be listened on. - */ int session_listen_vc (stream_session_t * s, session_endpoint_t * sep) { @@ -948,6 +951,40 @@ session_listen_vc (stream_session_t * s, session_endpoint_t * sep) } int +session_listen_cl (stream_session_t * s, session_endpoint_t * sep) +{ + transport_connection_t *tc; + application_t *server; + segment_manager_t *sm; + u32 tci; + + /* Transport bind/listen */ + tci = tp_vfts[sep->transport_proto].bind (s->session_index, + session_endpoint_to_transport + (sep)); + + if (tci == (u32) ~ 0) + return -1; + + /* Attach transport to session */ + s->connection_index = tci; + tc = tp_vfts[sep->transport_proto].get_listener (tci); + + /* Weird but handle it ... */ + if (tc == 0) + return -1; + + server = application_get (s->app_index); + sm = application_get_listen_segment_manager (server, s); + if (session_alloc_fifos (sm, s)) + return -1; + + /* Add to the main lookup table */ + session_lookup_add_connection (tc, s->session_index); + return 0; +} + +int session_listen_app (stream_session_t * s, session_endpoint_t * sep) { session_endpoint_extended_t esep; @@ -965,11 +1002,19 @@ typedef int (*session_listen_service_fn) (stream_session_t *, static session_listen_service_fn session_listen_srv_fns[TRANSPORT_N_SERVICES] = { session_listen_vc, - session_listen_vc, + session_listen_cl, session_listen_app, }; /* *INDENT-ON* */ +/** + * Ask transport to listen on local transport endpoint. + * + * @param s Session for which listen will be called. Note that unlike + * established sessions, listen sessions are not associated to a + * thread. + * @param tep Local endpoint to be listened on. + */ int stream_session_listen (stream_session_t * s, session_endpoint_t * sep) { @@ -1125,7 +1170,8 @@ session_manager_get_evt_q_segment (void) static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = { session_tx_fifo_peek_and_snd, session_tx_fifo_dequeue_and_snd, - session_tx_fifo_dequeue_internal + session_tx_fifo_dequeue_internal, + session_tx_fifo_dequeue_and_snd }; /* *INDENT-ON* */ @@ -1228,11 +1274,12 @@ session_manager_main_enable (vlib_main_t * vm) vec_validate (smm->peekers_rw_locks, num_threads - 1); for (i = 0; i < TRANSPORT_N_PROTO; i++) - for (j = 0; j < num_threads; j++) - { - vec_validate (smm->session_to_enqueue[i], num_threads - 1); - vec_validate (smm->current_enqueue_epoch[i], num_threads - 1); - } + { + vec_validate (smm->current_enqueue_epoch[i], num_threads - 1); + vec_validate (smm->session_to_enqueue[i], num_threads - 1); + for (j = 0; j < num_threads; j++) + smm->current_enqueue_epoch[i][j] = 1; + } for (i = 0; i < num_threads; i++) { |