summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2017-04-19 13:00:05 -0700
committerDave Barach <openvpp@barachs.net>2017-04-24 12:02:14 +0000
commita5464817522c7a7dc760af4612f1d6a68ed0afc8 (patch)
treec173b6d4e0fac69394d3c1b61a842dc582b2a218 /src/vnet/session
parentbc66a9122f73b97ca1ae60f1df47b39c141be3ae (diff)
Session layer improvements
Among others: - Moved app event queue to shared memory segment - Use private memory segment for builtin apps - Remove pid from svm fifo - Protect session fifo (de)allocation - Use fifo event for session disconnects - Have session queue node poll in all wk threads Change-Id: I89dbf7fdfebef12f5ef2b34ba3ef3c2c07f49ff2 Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/application.c48
-rw-r--r--src/vnet/session/application.h12
-rw-r--r--src/vnet/session/application_interface.c26
-rw-r--r--src/vnet/session/application_interface.h38
-rw-r--r--src/vnet/session/node.c63
-rw-r--r--src/vnet/session/segment_manager.c134
-rw-r--r--src/vnet/session/segment_manager.h12
-rw-r--r--src/vnet/session/session.c138
-rw-r--r--src/vnet/session/session.h19
-rwxr-xr-xsrc/vnet/session/session_api.c58
10 files changed, 330 insertions, 218 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 5a45537b798..ccf9837f4cc 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -87,8 +87,6 @@ application_new ()
void
application_del (application_t * app)
{
- api_main_t *am = &api_main;
- void *oldheap;
segment_manager_t *sm;
u64 handle;
u32 index, *handles = 0;
@@ -96,6 +94,11 @@ application_del (application_t * app)
vnet_unbind_args_t _a, *a = &_a;
/*
+ * The app event queue allocated in first segment is cleared with
+ * the segment manager. No need to explicitly free it.
+ */
+
+ /*
* Cleanup segment managers
*/
if (app->connects_seg_manager != (u32) ~ 0)
@@ -120,14 +123,6 @@ application_del (application_t * app)
vnet_unbind (a);
}
- /*
- * Free the event fifo in the /vpe-api shared-memory segment
- */
- oldheap = svm_push_data_heap (am->vlib_rp);
- if (app->event_queue)
- unix_shared_memory_queue_free (app->event_queue);
- svm_pop_heap (oldheap);
-
application_table_del (app);
pool_put (app_pool, app);
}
@@ -149,30 +144,14 @@ int
application_init (application_t * app, u32 api_client_index, u64 * options,
session_cb_vft_t * cb_fns)
{
- api_main_t *am = &api_main;
segment_manager_t *sm;
segment_manager_properties_t *props;
- void *oldheap;
- u32 app_evt_queue_size;
+ u32 app_evt_queue_size, first_seg_size;
int rv;
app_evt_queue_size = options[APP_EVT_QUEUE_SIZE] > 0 ?
options[APP_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
- /* Allocate event fifo in the /vpe-api shared-memory segment */
- oldheap = svm_push_data_heap (am->vlib_rp);
-
- /* Allocate server event queue */
- app->event_queue =
- unix_shared_memory_queue_init (app_evt_queue_size,
- sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0
- /* (do not) signal when queue non-empty */
- );
-
- svm_pop_heap (oldheap);
-
/* Setup segment manager */
sm = segment_manager_new ();
sm->app_index = app->index;
@@ -181,16 +160,21 @@ application_init (application_t * app, u32 api_client_index, u64 * options,
props->rx_fifo_size = options[SESSION_OPTIONS_RX_FIFO_SIZE];
props->tx_fifo_size = options[SESSION_OPTIONS_TX_FIFO_SIZE];
props->add_segment = props->add_segment_size != 0;
+ props->use_private_segment = options[APP_OPTIONS_FLAGS]
+ & APP_OPTIONS_FLAGS_BUILTIN_APP;
- if ((rv = segment_manager_init (sm, props,
- options[SESSION_OPTIONS_SEGMENT_SIZE])))
+ first_seg_size = options[SESSION_OPTIONS_SEGMENT_SIZE];
+ if ((rv = segment_manager_init (sm, props, first_seg_size)))
return rv;
app->first_segment_manager = segment_manager_index (sm);
app->api_client_index = api_client_index;
- app->flags = options[SESSION_OPTIONS_FLAGS];
+ app->flags = options[APP_OPTIONS_FLAGS];
app->cb_fns = *cb_fns;
+ /* Allocate app event queue in the first shared-memory segment */
+ app->event_queue = segment_manager_alloc_queue (sm, app_evt_queue_size);
+
/* Check that the obvious things are properly set up */
application_verify_cb_fns (cb_fns);
@@ -451,8 +435,8 @@ application_format_connects (application_t * app, int verbose)
continue;
fifo = fifos[i];
- session_index = fifo->server_session_index;
- thread_index = fifo->server_thread_index;
+ session_index = fifo->master_session_index;
+ thread_index = fifo->master_thread_index;
session = stream_session_get (session_index, thread_index);
str = format (0, "%U", format_stream_session, session, verbose);
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 6bcee9d312e..35caae85dbb 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -61,18 +61,6 @@ typedef struct _application
/** Flags */
u32 flags;
- /* Stream server mode: accept or connect
- * TODO REMOVE*/
- u8 mode;
-
- /** Index of the listen session or connect session
- * TODO REMOVE*/
- u32 session_index;
-
- /** Session thread index for client connect sessions
- * TODO REMOVE */
- u32 thread_index;
-
/*
* Binary API interface to external app
*/
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index 96d2c62132a..ad44baa15c3 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -142,7 +142,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_type_t sst,
* Server is willing to have a direct fifo connection created
* instead of going through the state machine, etc.
*/
- if (server->flags & SESSION_OPTIONS_FLAGS_USE_FIFO)
+ if (server->flags & APP_OPTIONS_FLAGS_USE_FIFO)
return server->cb_fns.
redirect_connect_callback (server->api_client_index, mp);
}
@@ -363,7 +363,11 @@ vnet_disconnect_session (vnet_disconnect_args_t * a)
if (!s || s->app_index != a->app_index)
return VNET_API_ERROR_INVALID_VALUE;
- stream_session_disconnect (s);
+ /* We're peeking into another's thread pool. Make sure */
+ ASSERT (s->session_index == index);
+
+ session_send_session_evt_to_thread (a->handle, FIFO_EVENT_DISCONNECT,
+ thread_index);
return 0;
}
@@ -395,24 +399,6 @@ vnet_connect (vnet_connect_args_t * a)
return vnet_connect_i (a->app_index, a->api_context, sst, &a->tep, a->mp);
}
-int
-vnet_disconnect (vnet_disconnect_args_t * a)
-{
- stream_session_t *session;
- u32 session_index, thread_index;
-
- if (api_parse_session_handle (a->handle, &session_index, &thread_index))
- {
- clib_warning ("Invalid handle");
- return -1;
- }
-
- session = stream_session_get (session_index, thread_index);
- stream_session_disconnect (session);
-
- return 0;
-}
-
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 2c497531533..7d924c146a4 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -30,10 +30,18 @@ typedef enum _session_api_proto
typedef struct _vnet_app_attach_args_t
{
+ /** Binary API client index */
u32 api_client_index;
+
+ /** Application and segment manager options */
u64 *options;
+
+ /** Session to application callback functions */
session_cb_vft_t *session_cb_vft;
+ /** Flag that indicates if app is builtin */
+ u8 builtin;
+
/*
* Results
*/
@@ -110,7 +118,7 @@ typedef struct _vnet_disconnect_args_t
typedef enum
{
APP_EVT_QUEUE_SIZE,
- SESSION_OPTIONS_FLAGS,
+ APP_OPTIONS_FLAGS,
SESSION_OPTIONS_SEGMENT_SIZE,
SESSION_OPTIONS_ADD_SEGMENT_SIZE,
SESSION_OPTIONS_RX_FIFO_SIZE,
@@ -119,11 +127,30 @@ typedef enum
SESSION_OPTIONS_N_OPTIONS
} app_attach_options_index_t;
-/** Server can handle delegated connect requests from local clients */
-#define SESSION_OPTIONS_FLAGS_USE_FIFO (1<<0)
+#define foreach_app_options_flags \
+ _(USE_FIFO, "Use FIFO with redirects") \
+ _(ADD_SEGMENT, "Add segment and signal app if needed") \
+ _(BUILTIN_APP, "Application is builtin") \
+
+typedef enum _app_options
+{
+#define _(sym, str) APP_OPTIONS_##sym,
+ foreach_app_options_flags
+#undef _
+} app_options_t;
+
+typedef enum _app_options_flags
+{
+#define _(sym, str) APP_OPTIONS_FLAGS_##sym = 1 << APP_OPTIONS_##sym,
+ foreach_app_options_flags
+#undef _
+} app_options_flags_t;
-/** Server wants vpp to add segments when out of memory for fifos */
-#define SESSION_OPTIONS_FLAGS_ADD_SEGMENT (1<<1)
+///** Server can handle delegated connect requests from local clients */
+//#define APP_OPTIONS_FLAGS_USE_FIFO (1<<0)
+//
+///** Server wants vpp to add segments when out of memory for fifos */
+//#define APP_OPTIONS_FLAGS_ADD_SEGMENT (1<<1)
#define VNET_CONNECT_REDIRECTED 123
@@ -138,7 +165,6 @@ int vnet_disconnect_session (vnet_disconnect_args_t * a);
int vnet_bind (vnet_bind_args_t * a);
int vnet_connect (vnet_connect_args_t * a);
int vnet_unbind (vnet_unbind_args_t * a);
-int vnet_disconnect (vnet_disconnect_args_t * a);
int
api_parse_session_handle (u64 handle, u32 * session_index,
diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c
index dd211c51a59..210754fa723 100644
--- a/src/vnet/session/node.c
+++ b/src/vnet/session/node.c
@@ -218,8 +218,8 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
* 2) buffer chains */
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
- rx_offset, len_to_deq0, data0);
+ n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
+ len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
@@ -230,8 +230,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
else
{
n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
- s0->pid, len_to_deq0,
- data0);
+ len_to_deq0, data0);
if (n_bytes_read <= 0)
goto dequeue_fail;
}
@@ -301,6 +300,26 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
n_tx_pkts, 0);
}
+stream_session_t *
+session_event_get_session (session_fifo_event_t * e0, u8 thread_index)
+{
+ svm_fifo_t *f0;
+ stream_session_t *s0;
+ u32 session_index0;
+
+ f0 = e0->fifo;
+ session_index0 = f0->master_session_index;
+
+ /* $$$ add multiple event queues, per vpp worker thread */
+ ASSERT (f0->master_thread_index == thread_index);
+
+ s0 = stream_session_get_if_valid (session_index0, thread_index);
+
+ ASSERT (s0->thread_index == thread_index);
+
+ return s0;
+}
+
static uword
session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * frame)
@@ -370,34 +389,24 @@ skip_dequeue:
n_events = vec_len (my_fifo_events);
for (i = 0; i < n_events; i++)
{
- svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
- stream_session_t *s0;
- u32 session_index0;
+ stream_session_t *s0; /* $$$ prefetch 1 ahead maybe */
session_fifo_event_t *e0;
e0 = &my_fifo_events[i];
- f0 = e0->fifo;
- session_index0 = f0->server_session_index;
-
- /* $$$ add multiple event queues, per vpp worker thread */
- ASSERT (f0->server_thread_index == my_thread_index);
- s0 = stream_session_get_if_valid (session_index0, my_thread_index);
-
- if (CLIB_DEBUG && !s0)
+ switch (e0->event_type)
{
- clib_warning ("It's dead, Jim!");
- continue;
- }
-
- if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
- continue;
+ case FIFO_EVENT_APP_TX:
+ s0 = session_event_get_session (e0, my_thread_index);
- ASSERT (s0->thread_index == my_thread_index);
+ if (CLIB_DEBUG && !s0)
+ {
+ clib_warning ("It's dead, Jim!");
+ continue;
+ }
- switch (e0->event_type)
- {
- case FIFO_EVENT_SERVER_TX:
+ if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+ continue;
/* Spray packets in per session type frames, since they go to
* different nodes */
rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
@@ -408,10 +417,12 @@ skip_dequeue:
goto done;
break;
- case FIFO_EVENT_SERVER_EXIT:
+ case FIFO_EVENT_DISCONNECT:
+ s0 = stream_session_get_from_handle (e0->session_handle);
stream_session_disconnect (s0);
break;
case FIFO_EVENT_BUILTIN_RX:
+ s0 = session_event_get_session (e0, my_thread_index);
svm_fifo_unset_event (s0->server_rx_fifo);
/* Get session's server */
app = application_get (s0->app_index);
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 16e5bc56736..e053232001d 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -28,6 +28,11 @@ u32 segment_name_counter = 0;
segment_manager_t *segment_managers = 0;
/**
+ * Process private segment index
+ */
+u32 private_segment_index = ~0;
+
+/**
* Default fifo and segment size. TODO config.
*/
u32 default_fifo_size = 1 << 16;
@@ -100,6 +105,26 @@ session_manager_add_first_segment (segment_manager_t * sm, u32 segment_size)
return rv;
}
+static void
+segment_manager_alloc_process_private_segment ()
+{
+ svm_fifo_segment_create_args_t _a, *a = &_a;
+
+ if (private_segment_index != ~0)
+ return;
+
+ memset (a, 0, sizeof (*a));
+ a->segment_name = "process-private-segment";
+ a->segment_size = ~0;
+ a->new_segment_index = ~0;
+
+ if (svm_fifo_segment_create_process_private (a))
+ clib_warning ("Failed to create process private segment");
+
+ private_segment_index = a->new_segment_index;
+ ASSERT (private_segment_index != ~0);
+}
+
/**
* Initializes segment manager based on options provided.
* Returns error if svm segment allocation fails.
@@ -114,7 +139,9 @@ segment_manager_init (segment_manager_t * sm,
/* app allocates these */
sm->properties = properties;
- if (first_seg_size > 0)
+ first_seg_size = first_seg_size > 0 ? first_seg_size : default_segment_size;
+
+ if (sm->properties->use_private_segment == 0)
{
rv = session_manager_add_first_segment (sm, first_seg_size);
if (rv)
@@ -123,7 +150,15 @@ segment_manager_init (segment_manager_t * sm,
return rv;
}
}
+ else
+ {
+ if (private_segment_index == ~0)
+ segment_manager_alloc_process_private_segment ();
+ ASSERT (private_segment_index != ~0);
+ vec_add1 (sm->segment_indices, private_segment_index);
+ }
+ clib_spinlock_init (&sm->lockp);
return 0;
}
@@ -162,8 +197,8 @@ segment_manager_del (segment_manager_t * sm)
stream_session_t *session;
fifo = fifos[i];
- session_index = fifo->server_session_index;
- thread_index = fifo->server_thread_index;
+ session_index = fifo->master_session_index;
+ thread_index = fifo->master_thread_index;
session = stream_session_get (session_index, thread_index);
@@ -183,7 +218,9 @@ segment_manager_del (segment_manager_t * sm)
deleted_thread_indices[i]);
/* Instead of directly removing the session call disconnect */
- stream_session_disconnect (session);
+ session_send_session_evt_to_thread (stream_session_handle (session),
+ FIFO_EVENT_DISCONNECT,
+ deleted_thread_indices[i]);
/*
stream_session_table_del (smm, session);
@@ -200,6 +237,7 @@ segment_manager_del (segment_manager_t * sm)
/* svm_fifo_segment_delete (fifo_segment); */
}
+ clib_spinlock_free (&sm->lockp);
vec_free (deleted_sessions);
vec_free (deleted_thread_indices);
pool_put (segment_managers, sm);
@@ -232,9 +270,13 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
u8 added_a_segment = 0;
int i;
- /* Allocate svm fifos */
ASSERT (vec_len (sm->segment_indices));
+ /* Make sure we don't have multiple threads trying to allocate segments
+ * at the same time. */
+ clib_spinlock_lock (&sm->lockp);
+
+ /* Allocate svm fifos */
again:
for (i = 0; i < vec_len (sm->segment_indices); i++)
{
@@ -283,7 +325,9 @@ again:
}
if (session_manager_add_segment (sm))
- return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
+ {
+ return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
+ }
added_a_segment = 1;
goto again;
@@ -295,14 +339,16 @@ again:
}
}
- if (added_a_segment)
- return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
-
/* Backpointers to segment manager */
sm_index = segment_manager_index (sm);
(*server_tx_fifo)->segment_manager = sm_index;
(*server_rx_fifo)->segment_manager = sm_index;
+ clib_spinlock_unlock (&sm->lockp);
+
+ if (added_a_segment)
+ return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
+
return 0;
}
@@ -313,26 +359,72 @@ segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
segment_manager_t *sm;
svm_fifo_segment_private_t *fifo_segment;
+ sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
+
+ /* It's possible to have no segment manager if the session was removed
+ * as result of a detach */
+ if (!sm)
+ return;
+
fifo_segment = svm_fifo_get_segment (svm_segment_index);
svm_fifo_segment_free_fifo (fifo_segment, rx_fifo);
svm_fifo_segment_free_fifo (fifo_segment, tx_fifo);
- /* If we have segment manager, try doing some cleanup.
- * It's possible to have no segment manager if the session was removed
- * as result of a detach */
- sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
- if (sm)
+ /* Remove segment only if it holds no fifos and not the first */
+ if (sm->segment_indices[0] != svm_segment_index
+ && !svm_fifo_segment_has_fifos (fifo_segment))
{
- /* Remove segment only if it holds no fifos and not the first */
- if (sm->segment_indices[0] != svm_segment_index
- && !svm_fifo_segment_has_fifos (fifo_segment))
- {
- svm_fifo_segment_delete (fifo_segment);
- vec_del1 (sm->segment_indices, svm_segment_index);
- }
+ svm_fifo_segment_delete (fifo_segment);
+ vec_del1 (sm->segment_indices, svm_segment_index);
}
}
+/**
+ * Allocates shm queue in the first segment
+ */
+unix_shared_memory_queue_t *
+segment_manager_alloc_queue (segment_manager_t * sm, u32 queue_size)
+{
+ ssvm_shared_header_t *sh;
+ svm_fifo_segment_private_t *segment;
+ unix_shared_memory_queue_t *q;
+ void *oldheap;
+
+ ASSERT (sm->segment_indices != 0);
+
+ segment = svm_fifo_get_segment (sm->segment_indices[0]);
+ sh = segment->ssvm.sh;
+
+ oldheap = ssvm_push_heap (sh);
+ q =
+ unix_shared_memory_queue_init (queue_size, sizeof (session_fifo_event_t),
+ 0 /* consumer pid */ , 0
+ /* signal when queue non-empty */ );
+ ssvm_pop_heap (oldheap);
+ return q;
+}
+
+/**
+ * Frees shm queue allocated in the first segment
+ */
+void
+segment_manager_dealloc_queue (segment_manager_t * sm,
+ unix_shared_memory_queue_t * q)
+{
+ ssvm_shared_header_t *sh;
+ svm_fifo_segment_private_t *segment;
+ void *oldheap;
+
+ ASSERT (sm->segment_indices != 0);
+
+ segment = svm_fifo_get_segment (sm->segment_indices[0]);
+ sh = segment->ssvm.sh;
+
+ oldheap = ssvm_push_heap (sh);
+ unix_shared_memory_queue_free (q);
+ ssvm_pop_heap (oldheap);
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
index 778d6040e94..2710bb544d7 100644
--- a/src/vnet/session/segment_manager.h
+++ b/src/vnet/session/segment_manager.h
@@ -18,6 +18,10 @@
#include <vnet/vnet.h>
#include <svm/svm_fifo_segment.h>
+#include <vlibmemory/unix_shared_memory_queue.h>
+#include <vlibmemory/api.h>
+#include <vppinfra/lock.h>
+
typedef struct _segment_manager_properties
{
/** Session fifo sizes. */
@@ -30,10 +34,14 @@ typedef struct _segment_manager_properties
/** Flag that indicates if additional segments should be created */
u8 add_segment;
+ /** Use private memory segment instead of shared memory */
+ u8 use_private_segment;
} segment_manager_properties_t;
typedef struct _segment_manager
{
+ clib_spinlock_t lockp;
+
/** segments mapped by this manager */
u32 *segment_indices;
@@ -95,6 +103,10 @@ segment_manager_alloc_session_fifos (segment_manager_t * sm,
void
segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
svm_fifo_t * tx_fifo);
+unix_shared_memory_queue_t *segment_manager_alloc_queue (segment_manager_t *
+ sm, u32 queue_size);
+void segment_manager_dealloc_queue (segment_manager_t * sm,
+ unix_shared_memory_queue_t * q);
#endif /* SRC_VNET_SESSION_SEGMENT_MANAGER_H_ */
/*
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index e6cfe7dab3d..d17c93f8f69 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -377,33 +377,6 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
return 0;
}
-/**
- * Allocate vpp event queue (once) per worker thread
- */
-void
-session_vpp_event_queue_allocate (session_manager_main_t * smm,
- u32 thread_index)
-{
- api_main_t *am = &api_main;
- void *oldheap;
-
- if (smm->vpp_event_queues[thread_index] == 0)
- {
- /* Allocate event fifo in the /vpe-api shared-memory segment */
- oldheap = svm_push_data_heap (am->vlib_rp);
-
- smm->vpp_event_queues[thread_index] =
- unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
- sizeof (session_fifo_event_t),
- 0 /* consumer pid */ ,
- 0
- /* (do not) send signal when queue non-empty */
- );
-
- svm_pop_heap (oldheap);
- }
-}
-
int
stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
stream_session_t ** ret_s)
@@ -428,11 +401,11 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
/* Initialize backpointers */
pool_index = s - smm->sessions[thread_index];
- server_rx_fifo->server_session_index = pool_index;
- server_rx_fifo->server_thread_index = thread_index;
+ server_rx_fifo->master_session_index = pool_index;
+ server_rx_fifo->master_thread_index = thread_index;
- server_tx_fifo->server_session_index = pool_index;
- server_tx_fifo->server_thread_index = thread_index;
+ server_tx_fifo->master_session_index = pool_index;
+ server_tx_fifo->master_thread_index = thread_index;
s->server_rx_fifo = server_rx_fifo;
s->server_tx_fifo = server_tx_fifo;
@@ -485,7 +458,7 @@ stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
if (PREDICT_FALSE (len > svm_fifo_max_enqueue (s->server_rx_fifo)))
return -1;
- enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, s->pid, len, data);
+ enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
if (queue_event)
{
@@ -527,14 +500,14 @@ stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- return svm_fifo_peek (s->server_tx_fifo, s->pid, offset, max_bytes, buffer);
+ return svm_fifo_peek (s->server_tx_fifo, offset, max_bytes, buffer);
}
u32
stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
- return svm_fifo_dequeue_drop (s->server_tx_fifo, s->pid, max_bytes);
+ return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
}
/**
@@ -568,7 +541,7 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
{
/* Fabricate event */
evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_RX;
+ evt.event_type = FIFO_EVENT_APP_RX;
evt.event_id = serial_number++;
/* Add event to server's event queue */
@@ -899,37 +872,45 @@ stream_session_stop_listen (stream_session_t * s)
return 0;
}
+void
+session_send_session_evt_to_thread (u64 session_handle,
+ fifo_event_type_t evt_type,
+ u32 thread_index)
+{
+ static u16 serial_number = 0;
+ session_fifo_event_t evt;
+ unix_shared_memory_queue_t *q;
+
+ /* Fabricate event */
+ evt.session_handle = session_handle;
+ evt.event_type = evt_type;
+ evt.event_id = serial_number++;
+
+ q = session_manager_get_vpp_event_queue (thread_index);
+
+ /* Based on request block (or not) for lack of space */
+ if (PREDICT_TRUE (q->cursize < q->maxsize))
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ else
+ {
+ clib_warning ("queue full");
+ return;
+ }
+}
+
/**
* Disconnect session and propagate to transport. This should eventually
* result in a delete notification that allows us to cleanup session state.
* Called for both active/passive disconnects.
+ *
+ * Should be called from the session's thread.
*/
void
stream_session_disconnect (stream_session_t * s)
{
-// session_fifo_event_t evt;
-
s->session_state = SESSION_STATE_CLOSED;
- /* RPC to vpp evt queue in the right thread */
-
tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
-
-// {
-// /* Fabricate event */
-// evt.fifo = s->server_rx_fifo;
-// evt.event_type = FIFO_EVENT_SERVER_RX;
-// evt.event_id = serial_number++;
-//
-// /* Based on request block (or not) for lack of space */
-// if (PREDICT_TRUE(q->cursize < q->maxsize))
-// unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
-// 0 /* do wait for mutex */);
-// else
-// {
-// clib_warning("fifo full");
-// return -1;
-// }
-// }
}
/**
@@ -976,6 +957,33 @@ session_get_transport_vft (u8 type)
return &tp_vfts[type];
}
+/**
+ * Allocate vpp event queue (once) per worker thread
+ */
+void
+session_vpp_event_queue_allocate (session_manager_main_t * smm,
+ u32 thread_index)
+{
+ api_main_t *am = &api_main;
+ void *oldheap;
+
+ if (smm->vpp_event_queues[thread_index] == 0)
+ {
+ /* Allocate event fifo in the /vpe-api shared-memory segment */
+ oldheap = svm_push_data_heap (am->vlib_rp);
+
+ smm->vpp_event_queues[thread_index] =
+ unix_shared_memory_queue_init (2048 /* nels $$$$ config */ ,
+ sizeof (session_fifo_event_t),
+ 0 /* consumer pid */ ,
+ 0
+ /* (do not) send signal when queue non-empty */
+ );
+
+ svm_pop_heap (oldheap);
+ }
+}
+
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
@@ -1043,6 +1051,18 @@ session_manager_main_enable (vlib_main_t * vm)
return 0;
}
+void
+session_node_enable_disable (u8 is_en)
+{
+ u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
+ /* *INDENT-OFF* */
+ foreach_vlib_main (({
+ vlib_node_set_state (this_vlib_main, session_queue_node.index,
+ state);
+ }));
+ /* *INDENT-ON* */
+}
+
clib_error_t *
vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
{
@@ -1051,16 +1071,14 @@ vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
if (session_manager_main.is_enabled)
return 0;
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_POLLING);
+ session_node_enable_disable (is_en);
return session_manager_main_enable (vm);
}
else
{
session_manager_main.is_enabled = 0;
- vlib_node_set_state (vm, session_queue_node.index,
- VLIB_NODE_STATE_DISABLED);
+ session_node_enable_disable (is_en);
}
return 0;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 6e4ea96dd5c..8cd72f3577a 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -17,9 +17,6 @@
#include <vnet/session/transport.h>
#include <vlibmemory/unix_shared_memory_queue.h>
-#include <vlibmemory/api.h>
-#include <vppinfra/sparse_vec.h>
-#include <svm/svm_fifo_segment.h>
#include <vnet/session/session_debug.h>
#include <vnet/session/segment_manager.h>
@@ -31,10 +28,10 @@
typedef enum
{
- FIFO_EVENT_SERVER_RX,
- FIFO_EVENT_SERVER_TX,
+ FIFO_EVENT_APP_RX,
+ FIFO_EVENT_APP_TX,
FIFO_EVENT_TIMEOUT,
- FIFO_EVENT_SERVER_EXIT,
+ FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX
} fifo_event_type_t;
@@ -96,7 +93,11 @@ typedef enum
/* *INDENT-OFF* */
typedef CLIB_PACKED (struct {
- svm_fifo_t * fifo;
+ union
+ {
+ svm_fifo_t * fifo;
+ u64 session_handle;
+ };
u8 event_type;
u16 event_id;
}) session_fifo_event_t;
@@ -370,7 +371,9 @@ int stream_session_listen (stream_session_t * s, transport_endpoint_t * tep);
int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
void stream_session_cleanup (stream_session_t * s);
-
+void session_send_session_evt_to_thread (u64 session_handle,
+ fifo_event_type_t evt_type,
+ u32 thread_index);
u8 *format_stream_session (u8 * s, va_list * args);
void session_register_transport (u8 type, const transport_proto_vft_t * vft);
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 8116b673a32..79d67a2fece 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -96,7 +96,7 @@ send_session_accept_callback (stream_session_t * s)
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
-
+ mp->context = server->index;
listener = listen_session_get (s->session_type, s->listener_index);
tp_vft = session_get_transport_vft (s->session_type);
tc = tp_vft->get_connection (s->connection_index, s->thread_index);
@@ -270,23 +270,6 @@ static session_cb_vft_t uri_session_cb_vft = {
.redirect_connect_callback = redirect_connect_callback
};
-static int
-api_session_not_valid (u32 session_index, u32 thread_index)
-{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
- stream_session_t *pool;
-
- if (thread_index >= vec_len (smm->sessions))
- return VNET_API_ERROR_INVALID_VALUE;
-
- pool = smm->sessions[thread_index];
-
- if (pool_is_free_index (pool, session_index))
- return VNET_API_ERROR_INVALID_VALUE_2;
-
- return 0;
-}
-
static void
vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
{
@@ -324,9 +307,9 @@ vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
rv = vnet_application_attach (a);
done:
+
/* *INDENT-OFF* */
REPLY_MACRO2 (VL_API_APPLICATION_ATTACH_REPLY, ({
- rmp->retval = rv;
if (!rv)
{
rmp->segment_name_length = 0;
@@ -558,24 +541,33 @@ static void
vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
{
stream_session_t *s;
- int rv;
u32 session_index, thread_index;
- session_index = stream_session_index_from_handle (mp->handle);
- thread_index = stream_session_thread_from_handle (mp->handle);
- if (api_session_not_valid (session_index, thread_index))
- return;
-
- s = stream_session_get (session_index, thread_index);
- rv = mp->retval;
+ vnet_disconnect_args_t _a, *a = &_a;
- if (rv)
+ /* Server isn't interested, kill the session */
+ if (mp->retval)
{
- /* Server isn't interested, kill the session */
- stream_session_disconnect (s);
- return;
+ a->app_index = mp->context;
+ a->handle = mp->handle;
+ vnet_disconnect_session (a);
+ }
+ else
+ {
+ stream_session_parse_handle (mp->handle, &session_index, &thread_index);
+ s = stream_session_get_if_valid (session_index, thread_index);
+ if (!s)
+ {
+ clib_warning ("session doesn't exist");
+ return;
+ }
+ if (s->app_index != mp->context)
+ {
+ clib_warning ("app doesn't own session");
+ return;
+ }
+ /* XXX volatile? */
+ s->session_state = SESSION_STATE_READY;
}
-
- s->session_state = SESSION_STATE_READY;
}
static void