aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2017-04-04 23:08:23 -0700
committerFlorin Coras <fcoras@cisco.com>2017-04-13 18:35:50 -0700
commit6cf30adc2cd3aa818e5d97cf71ea8b2fc2aaefa7 (patch)
tree3c4afef26295500b243f3655d96071565c2d2464
parent0f7d2ff58a63fdc671c1c0954ffe7c6ff0501daa (diff)
Session layer refactoring
Major refactoring of the session layer api - Add attatch api for application binding to the the session layer - Simplify listen/connect calls - Update application CLI - Add transport endpoint to accept callback - Associate segment manager to application and allow for multiple binds/connects per app Additional: - svm fifo cleanup - add fifo free, format fns - add fifo offset enqueue unit test Change-Id: Id93a65047de61afc2bf3d58c9b544339c02065af Signed-off-by: Florin Coras <fcoras@cisco.com> Signed-off-by: Dave Barach <dave@barachs.net>
-rw-r--r--src/scripts/vnet/uri/udp3
-rw-r--r--src/svm/svm_fifo.c66
-rw-r--r--src/svm/svm_fifo.h32
-rw-r--r--src/svm/svm_fifo_segment.h14
-rw-r--r--src/uri/uri_tcp_test.c315
-rw-r--r--src/uri/uri_udp_test.c326
-rw-r--r--src/vnet.am2
-rw-r--r--src/vnet/api_errno.h4
-rw-r--r--src/vnet/session/application.c458
-rw-r--r--src/vnet/session/application.h77
-rw-r--r--src/vnet/session/application_interface.c278
-rw-r--r--src/vnet/session/application_interface.h45
-rw-r--r--src/vnet/session/segment_manager.c342
-rw-r--r--src/vnet/session/segment_manager.h106
-rw-r--r--src/vnet/session/session.api237
-rw-r--r--src/vnet/session/session.c564
-rw-r--r--src/vnet/session/session.h175
-rw-r--r--src/vnet/session/session_api.c678
-rw-r--r--src/vnet/session/transport.h23
-rw-r--r--src/vnet/tcp/builtin_client.c161
-rw-r--r--src/vnet/tcp/builtin_client.h7
-rw-r--r--src/vnet/tcp/builtin_server.c206
-rw-r--r--src/vnet/tcp/tcp.c20
-rw-r--r--src/vnet/tcp/tcp.h9
-rw-r--r--src/vnet/tcp/tcp_input.c7
-rw-r--r--src/vnet/tcp/tcp_test.c127
-rw-r--r--src/vnet/udp/builtin_server.c34
27 files changed, 2601 insertions, 1715 deletions
diff --git a/src/scripts/vnet/uri/udp b/src/scripts/vnet/uri/udp
index ca13b83c..c7628f49 100644
--- a/src/scripts/vnet/uri/udp
+++ b/src/scripts/vnet/uri/udp
@@ -1,5 +1,5 @@
loop create
-set int ip address loop0 10.0.0.1/32
+set int ip address loop0 6.0.0.1/32
set int state loop0 up
packet-generator new {
@@ -17,3 +17,4 @@ packet-generator new {
incrementing 100
}
}
+session enable
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c
index cc84feb9..097bab77 100644
--- a/src/svm/svm_fifo.c
+++ b/src/svm/svm_fifo.c
@@ -20,8 +20,6 @@ svm_fifo_t *
svm_fifo_create (u32 data_size_in_bytes)
{
svm_fifo_t *f;
- pthread_mutexattr_t attr;
- pthread_condattr_t cattr;
f = clib_mem_alloc_aligned_or_null (sizeof (*f) + data_size_in_bytes,
CLIB_CACHE_LINE_BYTES);
@@ -32,29 +30,16 @@ svm_fifo_create (u32 data_size_in_bytes)
f->nitems = data_size_in_bytes;
f->ooos_list_head = OOO_SEGMENT_INVALID_INDEX;
- memset (&attr, 0, sizeof (attr));
- memset (&cattr, 0, sizeof (cattr));
-
- if (pthread_mutexattr_init (&attr))
- clib_unix_warning ("mutexattr_init");
- if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
- clib_unix_warning ("pthread_mutexattr_setpshared");
- if (pthread_mutex_init (&f->mutex, &attr))
- clib_unix_warning ("mutex_init");
- if (pthread_mutexattr_destroy (&attr))
- clib_unix_warning ("mutexattr_destroy");
- if (pthread_condattr_init (&cattr))
- clib_unix_warning ("condattr_init");
- if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
- clib_unix_warning ("condattr_setpshared");
- if (pthread_cond_init (&f->condvar, &cattr))
- clib_unix_warning ("cond_init1");
- if (pthread_condattr_destroy (&cattr))
- clib_unix_warning ("cond_init2");
-
return (f);
}
+void
+svm_fifo_free (svm_fifo_t * f)
+{
+ pool_free (f->ooo_segments);
+ clib_mem_free (f);
+}
+
always_inline ooo_segment_t *
ooo_segment_new (svm_fifo_t * f, u32 start, u32 length)
{
@@ -567,6 +552,43 @@ svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes)
return total_drop_bytes;
}
+u8 *
+format_svm_fifo (u8 * s, va_list * args)
+{
+ svm_fifo_t *f = va_arg (*args, svm_fifo_t *);
+ int verbose = va_arg (*args, int);
+
+ s = format (s, "cursize %u nitems %u has_event %d\n",
+ f->cursize, f->nitems, f->has_event);
+ s = format (s, "head %d tail %d\n", f->head, f->tail);
+
+ if (verbose > 1)
+ s = format
+ (s, "server session %d thread %d client session %d thread %d\n",
+ f->server_session_index, f->server_thread_index,
+ f->client_session_index, f->client_thread_index);
+
+ if (verbose)
+ {
+ ooo_segment_t *seg;
+ u32 seg_index;
+
+ s =
+ format (s, "ooo pool %d active elts\n", pool_elts (f->ooo_segments));
+
+ seg_index = f->ooos_list_head;
+
+ while (seg_index != OOO_SEGMENT_INVALID_INDEX)
+ {
+ seg = pool_elt_at_index (f->ooo_segments, seg_index);
+ s = format (s, " pos %u, len %u next %d\n",
+ seg->fifo_position, seg->length, seg->next);
+ seg_index = seg->next;
+ }
+ }
+ return s;
+}
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h
index 80e5b0f2..9beb63f5 100644
--- a/src/svm/svm_fifo.h
+++ b/src/svm/svm_fifo.h
@@ -48,10 +48,6 @@ typedef struct
u32 nitems;
CLIB_CACHE_LINE_ALIGN_MARK (end_cursize);
- pthread_mutex_t mutex; /* 8 bytes */
- pthread_cond_t condvar; /* 8 bytes */
- svm_lock_tag_t tag;
-
volatile u8 has_event; /**< non-zero if deq event exists */
u32 owner_pid;
@@ -60,6 +56,7 @@ typedef struct
u32 client_session_index;
u8 server_thread_index;
u8 client_thread_index;
+ u32 segment_manager;
CLIB_CACHE_LINE_ALIGN_MARK (end_shared);
u32 head;
CLIB_CACHE_LINE_ALIGN_MARK (end_consumer);
@@ -74,30 +71,6 @@ typedef struct
CLIB_CACHE_LINE_ALIGN_MARK (data);
} svm_fifo_t;
-static inline int
-svm_fifo_lock (svm_fifo_t * f, u32 pid, u32 tag, int nowait)
-{
- if (PREDICT_TRUE (nowait == 0))
- pthread_mutex_lock (&f->mutex);
- else
- {
- if (pthread_mutex_trylock (&f->mutex))
- return -1;
- }
- f->owner_pid = pid;
- f->tag = tag;
- return 0;
-}
-
-static inline void
-svm_fifo_unlock (svm_fifo_t * f)
-{
- f->owner_pid = 0;
- f->tag = 0;
- CLIB_MEMORY_BARRIER ();
- pthread_mutex_unlock (&f->mutex);
-}
-
static inline u32
svm_fifo_max_dequeue (svm_fifo_t * f)
{
@@ -139,6 +112,7 @@ svm_fifo_unset_event (svm_fifo_t * f)
}
svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
+void svm_fifo_free (svm_fifo_t * f);
int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
u8 * copy_from_here);
@@ -154,6 +128,8 @@ int svm_fifo_peek (svm_fifo_t * f, int pid, u32 offset, u32 max_bytes,
u8 * copy_here);
int svm_fifo_dequeue_drop (svm_fifo_t * f, int pid, u32 max_bytes);
+format_function_t format_svm_fifo;
+
always_inline ooo_segment_t *
svm_fifo_newest_ooo_segment (svm_fifo_t * f)
{
diff --git a/src/svm/svm_fifo_segment.h b/src/svm/svm_fifo_segment.h
index ecb5653a..9ab47a4c 100644
--- a/src/svm/svm_fifo_segment.h
+++ b/src/svm/svm_fifo_segment.h
@@ -55,6 +55,18 @@ svm_fifo_get_segment (u32 segment_index)
return vec_elt_at_index (ssm->segments, segment_index);
}
+static inline u8
+svm_fifo_segment_has_fifos (svm_fifo_segment_private_t * fifo_segment)
+{
+ return vec_len ((svm_fifo_t **) fifo_segment->h->fifos) != 0;
+}
+
+static inline svm_fifo_t **
+svm_fifo_segment_get_fifos (svm_fifo_segment_private_t * fifo_segment)
+{
+ return (svm_fifo_t **) fifo_segment->h->fifos;
+}
+
#define foreach_ssvm_fifo_segment_api_error \
_(OUT_OF_SPACE, "Out of space in segment", -200)
@@ -73,9 +85,7 @@ svm_fifo_t *svm_fifo_segment_alloc_fifo (svm_fifo_segment_private_t * s,
u32 data_size_in_bytes);
void svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s,
svm_fifo_t * f);
-
void svm_fifo_segment_init (u64 baseva, u32 timeout_in_seconds);
-
u32 svm_fifo_segment_index (svm_fifo_segment_private_t * s);
#endif /* __included_ssvm_fifo_segment_h__ */
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index e2834817..c057e06e 100644
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -15,8 +15,6 @@
#include <stdio.h>
#include <signal.h>
-#include <vlib/vlib.h>
-#include <vnet/vnet.h>
#include <svm/svm_fifo_segment.h>
#include <vlibmemory/api.h>
#include <vpp/api/vpe_msg_enum.h>
@@ -47,8 +45,7 @@ typedef struct
svm_fifo_t *server_rx_fifo;
svm_fifo_t *server_tx_fifo;
- u32 vpp_session_index;
- u32 vpp_session_thread;
+ u32 vpp_session_handle;
} session_t;
typedef enum
@@ -116,7 +113,7 @@ typedef struct
pthread_t client_rx_thread_handle;
u32 client_bytes_received;
u8 test_return_packets;
- u32 bytes_to_send;
+ u64 bytes_to_send;
/* convenience */
svm_fifo_segment_main_t *segment_main;
@@ -152,6 +149,88 @@ wait_for_state_change (uri_tcp_test_main_t * utm, connection_state_t state)
return -1;
}
+void
+application_attach (uri_tcp_test_main_t * utm)
+{
+ vl_api_application_attach_t *bmp;
+ u32 fifo_size = 3 << 20;
+ bmp = vl_msg_api_alloc (sizeof (*bmp));
+ memset (bmp, 0, sizeof (*bmp));
+
+ bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
+ bmp->client_index = utm->my_client_index;
+ bmp->context = ntohl (0xfeedface);
+ bmp->options[SESSION_OPTIONS_FLAGS] =
+ SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
+ bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
+ bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
+ bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
+ bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20;
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
+}
+
+void
+application_detach (uri_tcp_test_main_t * utm)
+{
+ vl_api_application_detach_t *bmp;
+ bmp = vl_msg_api_alloc (sizeof (*bmp));
+ memset (bmp, 0, sizeof (*bmp));
+
+ bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_DETACH);
+ bmp->client_index = utm->my_client_index;
+ bmp->context = ntohl (0xfeedface);
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
+}
+
+static void
+vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
+ mp)
+{
+ uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+ svm_fifo_segment_create_args_t _a, *a = &_a;
+ int rv;
+
+ if (mp->retval)
+ {
+ uword *errp = hash_get (utm->error_string_by_error_number, mp->retval);
+ clib_warning ("attach failed: %s", *errp);
+ utm->state = STATE_FAILED;
+ return;
+ }
+
+ if (mp->segment_name_length == 0)
+ {
+ clib_warning ("segment_name_length zero");
+ return;
+ }
+
+ a->segment_name = (char *) mp->segment_name;
+ a->segment_size = mp->segment_size;
+
+ ASSERT (mp->app_event_queue_address);
+
+ /* Attach to the segment vpp created */
+ rv = svm_fifo_segment_attach (a);
+ if (rv)
+ {
+ clib_warning ("svm_fifo_segment_attach ('%s') failed",
+ mp->segment_name);
+ return;
+ }
+
+ utm->our_event_queue =
+ (unix_shared_memory_queue_t *) mp->app_event_queue_address;
+
+}
+
+static void
+vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
+ mp)
+{
+ if (mp->retval)
+ clib_warning ("detach returned with err: %d", mp->retval);
+}
+
static void
init_error_string_table (uri_tcp_test_main_t * utm)
{
@@ -239,21 +318,18 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
vl_api_disconnect_session_reply_t *rmp;
uword *p;
int rv = 0;
- u64 key;
-
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
- p = hash_get (utm->session_index_by_vpp_handles, key);
+ p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
if (p)
{
session = pool_elt_at_index (utm->sessions, p[0]);
- hash_unset (utm->session_index_by_vpp_handles, key);
+ hash_unset (utm->session_index_by_vpp_handles, mp->handle);
pool_put (utm->sessions, session);
}
else
{
- clib_warning ("couldn't find session key %llx", key);
+ clib_warning ("couldn't find session key %llx", mp->handle);
rv = -11;
}
@@ -264,8 +340,7 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
rmp->retval = rv;
- rmp->session_index = mp->session_index;
- rmp->session_thread_index = mp->session_thread_index;
+ rmp->handle = mp->handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
@@ -277,22 +352,19 @@ vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
vl_api_reset_session_reply_t *rmp;
uword *p;
int rv = 0;
- u64 key;
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
-
- p = hash_get (utm->session_index_by_vpp_handles, key);
+ p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
if (p)
{
session = pool_elt_at_index (utm->sessions, p[0]);
- hash_unset (utm->session_index_by_vpp_handles, key);
+ hash_unset (utm->session_index_by_vpp_handles, mp->handle);
pool_put (utm->sessions, session);
utm->time_to_stop = 1;
}
else
{
- clib_warning ("couldn't find session key %llx", key);
+ clib_warning ("couldn't find session key %llx", mp->handle);
rv = -11;
}
@@ -300,8 +372,7 @@ vl_api_reset_session_t_handler (vl_api_reset_session_t * mp)
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_RESET_SESSION_REPLY);
rmp->retval = rv;
- rmp->session_index = mp->session_index;
- rmp->session_thread_index = mp->session_thread_index;
+ rmp->handle = mp->handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
@@ -343,7 +414,7 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
{
if (n_read == -2)
{
- clib_warning ("weird!");
+// clib_warning ("weird!");
break;
}
}
@@ -409,52 +480,19 @@ static void
vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
{
uri_tcp_test_main_t *utm = &uri_tcp_test_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
session_t *session;
u32 session_index;
svm_fifo_t *rx_fifo, *tx_fifo;
int rv;
- u64 key;
if (mp->retval)
{
- clib_warning ("connection failed with code: %d", mp->retval);
- utm->state = STATE_FAILED;
- return;
- }
-
- /*
- * Attatch to segment
- */
-
- if (mp->segment_name_length == 0)
- {
- clib_warning ("segment_name_length zero");
+ uword *errp = hash_get (utm->error_string_by_error_number, -mp->retval);
+ clib_warning ("connection failed with code: %s", *errp);
utm->state = STATE_FAILED;
return;
}
- a->segment_name = (char *) mp->segment_name;
- a->segment_size = mp->segment_size;
-
- ASSERT (mp->client_event_queue_address);
-
- /* Attach to the segment vpp created */
- rv = svm_fifo_segment_attach (a);
- if (rv)
- {
- clib_warning ("svm_fifo_segment_attach ('%s') failed",
- mp->segment_name);
- return;
- }
-
- /*
- * Save the queues
- */
-
- utm->our_event_queue = (unix_shared_memory_queue_t *)
- mp->client_event_queue_address;
-
utm->vpp_event_queue = (unix_shared_memory_queue_t *)
mp->vpp_event_queue_address;
@@ -472,16 +510,14 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
session->server_rx_fifo = rx_fifo;
session->server_tx_fifo = tx_fifo;
- session->vpp_session_index = mp->session_index;
- session->vpp_session_thread = mp->session_thread_index;
+ session->vpp_session_handle = mp->handle;
/* Save handle */
utm->connected_session_index = session_index;
utm->state = STATE_READY;
/* Add it to lookup table */
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
- hash_set (utm->session_index_by_vpp_handles, key, session_index);
+ hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
/* Start RX thread */
rv = pthread_create (&utm->client_rx_thread_handle,
@@ -606,8 +642,7 @@ client_disconnect (uri_tcp_test_main_t * utm)
memset (dmp, 0, sizeof (*dmp));
dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
dmp->client_index = utm->my_client_index;
- dmp->session_index = connected_session->vpp_session_index;
- dmp->session_thread_index = connected_session->vpp_session_thread;
+ dmp->handle = connected_session->vpp_session_handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & dmp);
}
@@ -616,6 +651,7 @@ client_test (uri_tcp_test_main_t * utm)
{
int i;
+ application_attach (utm);
client_connect (utm);
if (wait_for_state_change (utm, STATE_READY))
@@ -636,47 +672,26 @@ client_test (uri_tcp_test_main_t * utm)
if (wait_for_state_change (utm, STATE_START))
{
+ clib_warning ("Disconnect failed");
return;
}
+ application_detach (utm);
}
static void
vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
{
uri_tcp_test_main_t *utm = &uri_tcp_test_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
- int rv;
if (mp->retval)
{
- clib_warning ("bind failed: %d", mp->retval);
+ uword *errp = hash_get (utm->error_string_by_error_number,
+ -clib_net_to_host_u32 (mp->retval));
+ clib_warning ("bind failed: %s", (char *) *errp);
utm->state = STATE_FAILED;
return;
}
- if (mp->segment_name_length == 0)
- {
- clib_warning ("segment_name_length zero");
- return;
- }
-
- a->segment_name = (char *) mp->segment_name;
- a->segment_size = mp->segment_size;
-
- ASSERT (mp->server_event_queue_address);
-
- /* Attach to the segment vpp created */
- rv = svm_fifo_segment_attach (a);
- if (rv)
- {
- clib_warning ("svm_fifo_segment_attach ('%s') failed",
- mp->segment_name);
- return;
- }
-
- utm->our_event_queue =
- (unix_shared_memory_queue_t *) mp->server_event_queue_address;
-
utm->state = STATE_READY;
}
@@ -691,6 +706,89 @@ vl_api_unbind_uri_reply_t_handler (vl_api_unbind_uri_reply_t * mp)
utm->state = STATE_START;
}
+u8 *
+format_ip4_address (u8 * s, va_list * args)
+{
+ u8 *a = va_arg (*args, u8 *);
+ return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
+}
+
+u8 *
+format_ip6_address (u8 * s, va_list * args)
+{
+ ip6_address_t *a = va_arg (*args, ip6_address_t *);
+ u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
+
+ i_max_n_zero = ARRAY_LEN (a->as_u16);
+ max_n_zeros = 0;
+ i_first_zero = i_max_n_zero;
+ n_zeros = 0;
+ for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
+ {
+ u32 is_zero = a->as_u16[i] == 0;
+ if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
+ {
+ i_first_zero = i;
+ n_zeros = 0;
+ }
+ n_zeros += is_zero;
+ if ((!is_zero && n_zeros > max_n_zeros)
+ || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
+ {
+ i_max_n_zero = i_first_zero;
+ max_n_zeros = n_zeros;
+ i_first_zero = ARRAY_LEN (a->as_u16);
+ n_zeros = 0;
+ }
+ }
+
+ last_double_colon = 0;
+ for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
+ {
+ if (i == i_max_n_zero && max_n_zeros > 1)
+ {
+ s = format (s, "::");
+ i += max_n_zeros - 1;
+ last_double_colon = 1;
+ }
+ else
+ {
+ s = format (s, "%s%x",
+ (last_double_colon || i == 0) ? "" : ":",
+ clib_net_to_host_u16 (a->as_u16[i]));
+ last_double_colon = 0;
+ }
+ }
+
+ return s;
+}
+
+/* Format an IP46 address. */
+u8 *
+format_ip46_address (u8 * s, va_list * args)
+{
+ ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
+ ip46_type_t type = va_arg (*args, ip46_type_t);
+ int is_ip4 = 1;
+
+ switch (type)
+ {
+ case IP46_TYPE_ANY:
+ is_ip4 = ip46_address_is_ip4 (ip46);
+ break;
+ case IP46_TYPE_IP4:
+ is_ip4 = 1;
+ break;
+ case IP46_TYPE_IP6:
+ is_ip4 = 0;
+ break;
+ }
+
+ return is_ip4 ?
+ format (s, "%U", format_ip4_address, &ip46->ip4) :
+ format (s, "%U", format_ip6_address, &ip46->ip6);
+}
+
static void
vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
{
@@ -699,12 +797,15 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
svm_fifo_t *rx_fifo, *tx_fifo;
session_t *session;
static f64 start_time;
- u64 key;
u32 session_index;
+ u8 *ip_str;
if (start_time == 0.0)
start_time = clib_time_now (&utm->clib_time);
+ ip_str = format (0, "%U", format_ip46_address, &mp->ip, mp->is_ip4);
+ clib_warning ("Accepted session from: %s:%d", ip_str,
+ clib_net_to_host_u16 (mp->port));
utm->vpp_event_queue = (unix_shared_memory_queue_t *)
mp->vpp_event_queue_address;
@@ -721,8 +822,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
session->server_tx_fifo = tx_fifo;
/* Add it to lookup table */
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
- hash_set (utm->session_index_by_vpp_handles, key, session_index);
+ hash_set (utm->session_index_by_vpp_handles, mp->handle, session_index);
utm->state = STATE_READY;
@@ -741,9 +841,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
rmp = vl_msg_api_alloc (sizeof (*rmp));
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
- rmp->session_type = mp->session_type;
- rmp->session_index = mp->session_index;
- rmp->session_thread_index = mp->session_thread_index;
+ rmp->handle = mp->handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
@@ -837,22 +935,15 @@ server_handle_event_queue (uri_tcp_test_main_t * utm)
}
void
-server_bind (uri_tcp_test_main_t * utm)
+server_listen (uri_tcp_test_main_t * utm)
{
vl_api_bind_uri_t *bmp;
- u32 fifo_size = 3 << 20;
bmp = vl_msg_api_alloc (sizeof (*bmp));
memset (bmp, 0, sizeof (*bmp));
bmp->_vl_msg_id = ntohs (VL_API_BIND_URI);
bmp->client_index = utm->my_client_index;
bmp->context = ntohl (0xfeedface);
- bmp->initial_segment_size = 256 << 20; /* size of initial segment */
- bmp->options[SESSION_OPTIONS_FLAGS] =
- SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
- bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
- bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
- bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
memcpy (bmp->uri, utm->uri, vec_len (utm->uri));
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
}
@@ -874,8 +965,10 @@ server_unbind (uri_tcp_test_main_t * utm)
void
server_test (uri_tcp_test_main_t * utm)
{
+ application_attach (utm);
+
/* Bind to uri */
- server_bind (utm);
+ server_listen (utm);
if (wait_for_state_change (utm, STATE_READY))
{
@@ -895,6 +988,8 @@ server_test (uri_tcp_test_main_t * utm)
return;
}
+ application_detach (utm);
+
fformat (stdout, "Test complete...\n");
}
@@ -916,7 +1011,9 @@ _(CONNECT_URI_REPLY, connect_uri_reply) \
_(DISCONNECT_SESSION, disconnect_session) \
_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
_(RESET_SESSION, reset_session) \
-_(MAP_ANOTHER_SEGMENT, map_another_segment)
+_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
+_(APPLICATION_DETACH_REPLY, application_detach_reply) \
+_(MAP_ANOTHER_SEGMENT, map_another_segment) \
void
uri_api_hookup (uri_tcp_test_main_t * utm)
@@ -941,7 +1038,7 @@ main (int argc, char **argv)
u8 *heap, *uri = 0;
u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
- u32 bytes_to_send = 64 << 10, mbytes;
+ u64 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
session_t *session;
@@ -988,10 +1085,14 @@ main (int argc, char **argv)
drop_packets = 1;
else if (unformat (a, "test"))
test_return_packets = 1;
- else if (unformat (a, "mbytes %d", &mbytes))
+ else if (unformat (a, "mbytes %lld", &mbytes))
{
bytes_to_send = mbytes << 20;
}
+ else if (unformat (a, "gbytes %lld", &mbytes))
+ {
+ bytes_to_send = mbytes << 30;
+ }
else
{
fformat (stderr, "%s: usage [master|slave]\n");
diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c
index e6c239c1..598052bc 100644
--- a/src/uri/uri_udp_test.c
+++ b/src/uri/uri_udp_test.c
@@ -55,6 +55,7 @@ typedef enum
{
STATE_START,
STATE_READY,
+ STATE_FAILED,
STATE_DISCONNECTING,
} connection_state_t;
@@ -162,6 +163,86 @@ setup_signal_handlers (void)
return 0;
}
+void
+application_attach (uri_udp_test_main_t * utm)
+{
+ vl_api_application_attach_t *bmp;
+ u32 fifo_size = 3 << 20;
+ bmp = vl_msg_api_alloc (sizeof (*bmp));
+ memset (bmp, 0, sizeof (*bmp));
+
+ bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_ATTACH);
+ bmp->client_index = utm->my_client_index;
+ bmp->context = ntohl (0xfeedface);
+ bmp->options[SESSION_OPTIONS_FLAGS] =
+ SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
+ bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
+ bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
+ bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
+ bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20;
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
+}
+
+void
+application_detach (uri_udp_test_main_t * utm)
+{
+ vl_api_application_detach_t *bmp;
+ bmp = vl_msg_api_alloc (sizeof (*bmp));
+ memset (bmp, 0, sizeof (*bmp));
+
+ bmp->_vl_msg_id = ntohs (VL_API_APPLICATION_DETACH);
+ bmp->client_index = utm->my_client_index;
+ bmp->context = ntohl (0xfeedface);
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
+}
+
+static void
+vl_api_application_attach_reply_t_handler (vl_api_application_attach_reply_t *
+ mp)
+{
+ uri_udp_test_main_t *utm = &uri_udp_test_main;
+ svm_fifo_segment_create_args_t _a, *a = &_a;
+ int rv;
+
+ if (mp->retval)
+ {
+ clib_warning ("attach failed: %d", mp->retval);
+ utm->state = STATE_FAILED;
+ return;
+ }
+
+ if (mp->segment_name_length == 0)
+ {
+ clib_warning ("segment_name_length zero");
+ return;
+ }
+
+ a->segment_name = (char *) mp->segment_name;
+ a->segment_size = mp->segment_size;
+
+ ASSERT (mp->app_event_queue_address);
+
+ /* Attach to the segment vpp created */
+ rv = svm_fifo_segment_attach (a);
+ if (rv)
+ {
+ clib_warning ("svm_fifo_segment_attach ('%s') failed",
+ mp->segment_name);
+ return;
+ }
+
+ utm->our_event_queue =
+ (unix_shared_memory_queue_t *) mp->app_event_queue_address;
+}
+
+static void
+vl_api_application_detach_reply_t_handler (vl_api_application_detach_reply_t *
+ mp)
+{
+ if (mp->retval)
+ clib_warning ("detach returned with err: %d", mp->retval);
+}
+
u8 *
format_api_error (u8 * s, va_list * args)
{
@@ -255,9 +336,22 @@ cut_through_thread_fn (void *arg)
}
static void
-uri_udp_slave_test (uri_udp_test_main_t * utm)
+udp_client_connect (uri_udp_test_main_t * utm)
{
vl_api_connect_uri_t *cmp;
+ cmp = vl_msg_api_alloc (sizeof (*cmp));
+ memset (cmp, 0, sizeof (*cmp));
+
+ cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+ cmp->client_index = utm->my_client_index;
+ cmp->context = ntohl (0xfeedface);
+ memcpy (cmp->uri, utm->connect_uri, vec_len (utm->connect_uri));
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp);
+}
+
+static void
+client_send (uri_udp_test_main_t * utm, session_t * session)
+{
int i;
u8 *test_data = 0;
u64 bytes_received = 0, bytes_sent = 0;
@@ -265,30 +359,16 @@ uri_udp_slave_test (uri_udp_test_main_t * utm)
int rv;
int mypid = getpid ();
f64 before, after, delta, bytes_per_second;
- session_t *session;
svm_fifo_t *rx_fifo, *tx_fifo;
int buffer_offset, bytes_to_send = 0;
+ /*
+ * Prepare test data
+ */
vec_validate (test_data, 64 * 1024 - 1);
for (i = 0; i < vec_len (test_data); i++)
test_data[i] = i & 0xff;
- cmp = vl_msg_api_alloc (sizeof (*cmp));
- memset (cmp, 0, sizeof (*cmp));
-
- cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
- cmp->client_index = utm->my_client_index;
- cmp->context = ntohl (0xfeedface);
- memcpy (cmp->uri, utm->connect_uri, vec_len (utm->connect_uri));
- vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & cmp);
-
- if (wait_for_state_change (utm, STATE_READY))
- {
- clib_warning ("timeout waiting for STATE_READY");
- return;
- }
-
- session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
rx_fifo = session->server_rx_fifo;
tx_fifo = session->server_tx_fifo;
@@ -375,35 +455,38 @@ uri_udp_slave_test (uri_udp_test_main_t * utm)
}
static void
-vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
+uri_udp_client_test (uri_udp_test_main_t * utm)
{
- uri_udp_test_main_t *utm = &uri_udp_test_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
- int rv;
+ session_t *session;
- if (mp->segment_name_length == 0)
+ application_attach (utm);
+ udp_client_connect (utm);
+
+ if (wait_for_state_change (utm, STATE_READY))
{
- clib_warning ("segment_name_length zero");
+ clib_warning ("timeout waiting for STATE_READY");
return;
}
- a->segment_name = (char *) mp->segment_name;
- a->segment_size = mp->segment_size;
+ /* Only works with cut through sessions */
+ session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
- ASSERT (mp->server_event_queue_address);
+ client_send (utm, session);
+ application_detach (utm);
+}
- /* Attach to the segment vpp created */
- rv = svm_fifo_segment_attach (a);
- if (rv)
+static void
+vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
+{
+ uri_udp_test_main_t *utm = &uri_udp_test_main;
+
+ if (mp->retval)
{
- clib_warning ("svm_fifo_segment_attach ('%s') failed",
- mp->segment_name);
+ clib_warning ("bind failed: %d", mp->retval);
+ utm->state = STATE_FAILED;
return;
}
- utm->our_event_queue = (unix_shared_memory_queue_t *)
- mp->server_event_queue_address;
-
utm->state = STATE_READY;
}
@@ -427,6 +510,9 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
mp->segment_size);
}
+/**
+ * Acting as server for redirected connect requests
+ */
static void
vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
{
@@ -456,7 +542,6 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
vec_add2 (utm->seg, seg, 1);
segment_index = vec_len (sm->segments) - 1;
-
memcpy (seg, sm->segments + segment_index, sizeof (utm->seg[0]));
pool_get (utm->sessions, session);
@@ -521,7 +606,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
svm_fifo_t *rx_fifo, *tx_fifo;
session_t *session;
static f64 start_time;
- u64 key;
if (start_time == 0.0)
start_time = clib_time_now (&utm->clib_time);
@@ -539,9 +623,8 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
session->server_rx_fifo = rx_fifo;
session->server_tx_fifo = tx_fifo;
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
-
- hash_set (utm->session_index_by_vpp_handles, key, session - utm->sessions);
+ hash_set (utm->session_index_by_vpp_handles, mp->handle,
+ session - utm->sessions);
utm->state = STATE_READY;
@@ -556,9 +639,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
rmp = vl_msg_api_alloc (sizeof (*rmp));
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
- rmp->session_type = mp->session_type;
- rmp->session_index = mp->session_index;
- rmp->session_thread_index = mp->session_thread_index;
+ rmp->handle = mp->handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
@@ -570,21 +651,18 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
vl_api_disconnect_session_reply_t *rmp;
uword *p;
int rv = 0;
- u64 key;
-
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
- p = hash_get (utm->session_index_by_vpp_handles, key);
+ p = hash_get (utm->session_index_by_vpp_handles, mp->handle);
if (p)
{
session = pool_elt_at_index (utm->sessions, p[0]);
- hash_unset (utm->session_index_by_vpp_handles, key);
+ hash_unset (utm->session_index_by_vpp_handles, mp->handle);
pool_put (utm->sessions, session);
}
else
{
- clib_warning ("couldn't find session key %llx", key);
+ clib_warning ("couldn't find session key %llx", mp->handle);
rv = -11;
}
@@ -592,77 +670,76 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION_REPLY);
rmp->retval = rv;
- rmp->session_index = mp->session_index;
- rmp->session_thread_index = mp->session_thread_index;
+ rmp->handle = mp->handle;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
}
static void
vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
{
- svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
uri_udp_test_main_t *utm = &uri_udp_test_main;
- svm_fifo_segment_create_args_t _a, *a = &_a;
- ssvm_shared_header_t *sh;
- svm_fifo_segment_private_t *seg;
- svm_fifo_segment_header_t *fsh;
- session_t *session;
- u32 segment_index;
- int rv;
ASSERT (utm->i_am_master == 0);
- if (mp->segment_name_length == 0)
+ /* We've been redirected */
+ if (mp->segment_name_length > 0)
{
- clib_warning ("segment_name_length zero");
- return;
- }
-
- memset (a, 0, sizeof (*a));
-
- a->segment_name = (char *) mp->segment_name;
-
- sleep (1);
-
- rv = svm_fifo_segment_attach (a);
- if (rv)
- {
- clib_warning ("sm_fifo_segment_create ('%v') failed", mp->segment_name);
- return;
- }
-
- segment_index = vec_len (sm->segments) - 1;
+ svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
+ svm_fifo_segment_create_args_t _a, *a = &_a;
+ u32 segment_index;
+ session_t *session;
+ ssvm_shared_header_t *sh;
+ svm_fifo_segment_private_t *seg;
+ svm_fifo_segment_header_t *fsh;
+ int rv;
+
+ memset (a, 0, sizeof (*a));
+ a->segment_name = (char *) mp->segment_name;
+
+ sleep (1);
+
+ rv = svm_fifo_segment_attach (a);
+ if (rv)
+ {
+ clib_warning ("sm_fifo_segment_create ('%v') failed",
+ mp->segment_name);
+ return;
+ }
- vec_add2 (utm->seg, seg, 1);
+ segment_index = vec_len (sm->segments) - 1;
+ vec_add2 (utm->seg, seg, 1);
- memcpy (seg, sm->segments + segment_index, sizeof (*seg));
- sh = seg->ssvm.sh;
- fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
+ memcpy (seg, sm->segments + segment_index, sizeof (*seg));
+ sh = seg->ssvm.sh;
+ fsh = (svm_fifo_segment_header_t *) sh->opaque[0];
- while (vec_len (fsh->fifos) < 2)
- sleep (1);
+ while (vec_len (fsh->fifos) < 2)
+ sleep (1);
- pool_get (utm->sessions, session);
- utm->cut_through_session_index = session - utm->sessions;
+ pool_get (utm->sessions, session);
+ utm->cut_through_session_index = session - utm->sessions;
- session->server_rx_fifo = (svm_fifo_t *) fsh->fifos[0];
- ASSERT (session->server_rx_fifo);
- session->server_tx_fifo = (svm_fifo_t *) fsh->fifos[1];
- ASSERT (session->server_tx_fifo);
+ session->server_rx_fifo = (svm_fifo_t *) fsh->fifos[0];
+ ASSERT (session->server_rx_fifo);
+ session->server_tx_fifo = (svm_fifo_t *) fsh->fifos[1];
+ ASSERT (session->server_tx_fifo);
+ }
/* security: could unlink /dev/shm/<mp->segment_name> here, maybe */
utm->state = STATE_READY;
}
-#define foreach_uri_msg \
-_(BIND_URI_REPLY, bind_uri_reply) \
-_(CONNECT_URI, connect_uri) \
-_(CONNECT_URI_REPLY, connect_uri_reply) \
-_(UNBIND_URI_REPLY, unbind_uri_reply) \
-_(ACCEPT_SESSION, accept_session) \
-_(DISCONNECT_SESSION, disconnect_session) \
-_(MAP_ANOTHER_SEGMENT, map_another_segment)
+#define foreach_uri_msg \
+_(BIND_URI_REPLY, bind_uri_reply) \
+_(CONNECT_URI, connect_uri) \
+_(CONNECT_URI_REPLY, connect_uri_reply) \
+_(UNBIND_URI_REPLY, unbind_uri_reply) \
+_(ACCEPT_SESSION, accept_session) \
+_(DISCONNECT_SESSION, disconnect_session) \
+_(MAP_ANOTHER_SEGMENT, map_another_segment) \
+_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
+_(APPLICATION_DETACH_REPLY, application_detach_reply) \
void
uri_api_hookup (uri_udp_test_main_t * utm)
@@ -679,7 +756,6 @@ uri_api_hookup (uri_udp_test_main_t * utm)
}
-
int
connect_to_vpp (char *name)
{
@@ -784,26 +860,43 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
}
}
-void
-uri_udp_test (uri_udp_test_main_t * utm)
+static void
+server_unbind (uri_udp_test_main_t * utm)
{
- vl_api_bind_uri_t *bmp;
vl_api_unbind_uri_t *ump;
+ ump = vl_msg_api_alloc (sizeof (*ump));
+ memset (ump, 0, sizeof (*ump));
+
+ ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI);
+ ump->client_index = utm->my_client_index;
+ memcpy (ump->uri, utm->uri, vec_len (utm->uri));
+ vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & ump);
+}
+
+static void
+server_listen (uri_udp_test_main_t * utm)
+{
+ vl_api_bind_uri_t *bmp;
+
bmp = vl_msg_api_alloc (sizeof (*bmp));
memset (bmp, 0, sizeof (*bmp));
bmp->_vl_msg_id = ntohs (VL_API_BIND_URI);
bmp->client_index = utm->my_client_index;
bmp->context = ntohl (0xfeedface);
- bmp->initial_segment_size = 256 << 20; /* size of initial segment */
- bmp->options[SESSION_OPTIONS_FLAGS] =
- SESSION_OPTIONS_FLAGS_USE_FIFO | SESSION_OPTIONS_FLAGS_ADD_SEGMENT;
- bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 16 << 10;
- bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 16 << 10;
- bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
memcpy (bmp->uri, utm->uri, vec_len (utm->uri));
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
+}
+
+void
+udp_server_test (uri_udp_test_main_t * utm)
+{
+
+ application_attach (utm);
+
+ /* Bind to uri */
+ server_listen (utm);
if (wait_for_state_change (utm, STATE_READY))
{
@@ -813,13 +906,8 @@ uri_udp_test (uri_udp_test_main_t * utm)
server_handle_event_queue (utm);
- ump = vl_msg_api_alloc (sizeof (*ump));
- memset (ump, 0, sizeof (*ump));
-
- ump->_vl_msg_id = ntohs (VL_API_UNBIND_URI);
- ump->client_index = utm->my_client_index;
- memcpy (ump->uri, utm->uri, vec_len (utm->uri));
- vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & ump);
+ /* Cleanup */
+ server_unbind (utm);
if (wait_for_state_change (utm, STATE_START))
{
@@ -827,6 +915,8 @@ uri_udp_test (uri_udp_test_main_t * utm)
return;
}
+ application_detach (utm);
+
fformat (stdout, "Test complete...\n");
}
@@ -892,7 +982,7 @@ main (int argc, char **argv)
utm->i_am_master = i_am_master;
utm->segment_main = &svm_fifo_segment_main;
- utm->connect_uri = format (0, "udp://10.0.0.1/1234%c", 0);
+ utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0);
setup_signal_handlers ();
@@ -907,7 +997,7 @@ main (int argc, char **argv)
if (i_am_master == 0)
{
- uri_udp_slave_test (utm);
+ uri_udp_client_test (utm);
exit (0);
}
@@ -920,7 +1010,7 @@ main (int argc, char **argv)
for (i = 0; i < 200000; i++)
pool_put_index (utm->sessions, i);
- uri_udp_test (utm);
+ udp_server_test (utm);
vl_client_disconnect_from_vlib ();
exit (0);
diff --git a/src/vnet.am b/src/vnet.am
index bed4902b..25b84616 100644
--- a/src/vnet.am
+++ b/src/vnet.am
@@ -827,6 +827,7 @@ libvnet_la_SOURCES += \
vnet/session/session_cli.c \
vnet/session/hashes.c \
vnet/session/application_interface.c \
+ vnet/session/segment_manager.c \
vnet/session/session_api.c
nobase_include_HEADERS += \
@@ -835,6 +836,7 @@ nobase_include_HEADERS += \
vnet/session/transport.h \
vnet/session/application_interface.h \
vnet/session/session_debug.h \
+ vnet/session/segment_manager.h \
vnet/session/session.api.h
API_FILES += vnet/session/session.api
diff --git a/src/vnet/api_errno.h b/src/vnet/api_errno.h
index f3ffd2a6..e939404b 100644
--- a/src/vnet/api_errno.h
+++ b/src/vnet/api_errno.h
@@ -105,7 +105,9 @@ _(INVALID_GPE_MODE, -112, "Invalid GPE mode") \
_(LISP_GPE_ENTRIES_PRESENT, -113, "LISP GPE entries are present") \
_(ADDRESS_FOUND_FOR_INTERFACE, -114, "Address found for interface") \
_(SESSION_CONNECT_FAIL, -115, "Session failed to connect") \
-_(ENTRY_ALREADY_EXISTS, -116, "Entry already exists")
+_(ENTRY_ALREADY_EXISTS, -116, "Entry already exists") \
+_(SVM_SEGMENT_CREATE_FAIL, -117, "svm segment create fail") \
+_(APPLICATION_NOT_ATTACHED, -118, "application not attached")
typedef enum
{
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 513e5fac..5a45537b 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -14,18 +14,24 @@
*/
#include <vnet/session/application.h>
+#include <vnet/session/application_interface.h>
#include <vnet/session/session.h>
-/*
+/**
* Pool from which we allocate all applications
*/
static application_t *app_pool;
-/*
+/**
* Hash table of apps by api client index
*/
static uword *app_by_api_client_index;
+/**
+ * Default application event queue size
+ */
+static u32 default_app_evt_queue_size = 128;
+
int
application_api_queue_is_full (application_t * app)
{
@@ -67,37 +73,71 @@ application_lookup (u32 api_client_index)
return 0;
}
+application_t *
+application_new ()
+{
+ application_t *app;
+ pool_get (app_pool, app);
+ memset (app, 0, sizeof (*app));
+ app->index = application_get_index (app);
+ app->connects_seg_manager = ~0;
+ return app;
+}
+
void
application_del (application_t * app)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
api_main_t *am = &api_main;
void *oldheap;
- session_manager_t *sm;
+ segment_manager_t *sm;
+ u64 handle;
+ u32 index, *handles = 0;
+ int i;
+ vnet_unbind_args_t _a, *a = &_a;
+
+ /*
+ * Cleanup segment managers
+ */
+ if (app->connects_seg_manager != (u32) ~ 0)
+ {
+ sm = segment_manager_get (app->connects_seg_manager);
+ segment_manager_del (sm);
+ }
- if (app->mode == APP_SERVER)
+ /* *INDENT-OFF* */
+ hash_foreach (handle, index, app->listeners_table,
+ ({
+ vec_add1 (handles, handle);
+ }));
+ /* *INDENT-ON* */
+
+ /* Actual listener cleanup */
+ for (i = 0; i < vec_len (handles); i++)
{
- sm = session_manager_get (app->session_manager_index);
- session_manager_del (smm, sm);
+ a->app_index = app->api_client_index;
+ a->handle = handles[i];
+ /* seg manager is removed when unbind completes */
+ vnet_unbind (a);
}
- /* Free the event fifo in the /vpe-api shared-memory segment */
+ /*
+ * Free the event fifo in the /vpe-api shared-memory segment
+ */
oldheap = svm_push_data_heap (am->vlib_rp);
if (app->event_queue)
unix_shared_memory_queue_free (app->event_queue);
svm_pop_heap (oldheap);
application_table_del (app);
-
pool_put (app_pool, app);
}
static void
-application_verify_cb_fns (application_type_t type, session_cb_vft_t * cb_fns)
+application_verify_cb_fns (session_cb_vft_t * cb_fns)
{
- if (type == APP_SERVER && cb_fns->session_accept_callback == 0)
+ if (cb_fns->session_accept_callback == 0)
clib_warning ("No accept callback function provided");
- if (type == APP_CLIENT && cb_fns->session_connected_callback == 0)
+ if (cb_fns->session_connected_callback == 0)
clib_warning ("No session connected callback function provided");
if (cb_fns->session_disconnect_callback == 0)
clib_warning ("No session disconnect callback function provided");
@@ -105,25 +145,26 @@ application_verify_cb_fns (application_type_t type, session_cb_vft_t * cb_fns)
clib_warning ("No session reset callback function provided");
}
-application_t *
-application_new (application_type_t type, session_type_t sst,
- u32 api_client_index, u32 flags, session_cb_vft_t * cb_fns)
+int
+application_init (application_t * app, u32 api_client_index, u64 * options,
+ session_cb_vft_t * cb_fns)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
api_main_t *am = &api_main;
- application_t *app;
+ segment_manager_t *sm;
+ segment_manager_properties_t *props;
void *oldheap;
- session_manager_t *sm;
+ u32 app_evt_queue_size;
+ int rv;
- pool_get (app_pool, app);
- memset (app, 0, sizeof (*app));
+ app_evt_queue_size = options[APP_EVT_QUEUE_SIZE] > 0 ?
+ options[APP_EVT_QUEUE_SIZE] : default_app_evt_queue_size;
/* Allocate event fifo in the /vpe-api shared-memory segment */
oldheap = svm_push_data_heap (am->vlib_rp);
/* Allocate server event queue */
app->event_queue =
- unix_shared_memory_queue_init (128 /* nels $$$$ config */ ,
+ unix_shared_memory_queue_init (app_evt_queue_size,
sizeof (session_fifo_event_t),
0 /* consumer pid */ ,
0
@@ -132,36 +173,31 @@ application_new (application_type_t type, session_type_t sst,
svm_pop_heap (oldheap);
- /* If a server, allocate session manager */
- if (type == APP_SERVER)
- {
- pool_get (smm->session_managers, sm);
- memset (sm, 0, sizeof (*sm));
+ /* Setup segment manager */
+ sm = segment_manager_new ();
+ sm->app_index = app->index;
+ props = &app->sm_properties;
+ props->add_segment_size = options[SESSION_OPTIONS_ADD_SEGMENT_SIZE];
+ props->rx_fifo_size = options[SESSION_OPTIONS_RX_FIFO_SIZE];
+ props->tx_fifo_size = options[SESSION_OPTIONS_TX_FIFO_SIZE];
+ props->add_segment = props->add_segment_size != 0;
- app->session_manager_index = sm - smm->session_managers;
- }
- else if (type == APP_CLIENT)
- {
- /* Allocate connect session manager if needed */
- if (smm->connect_manager_index[sst] == INVALID_INDEX)
- connects_session_manager_init (smm, sst);
- app->session_manager_index = smm->connect_manager_index[sst];
- }
+ if ((rv = segment_manager_init (sm, props,
+ options[SESSION_OPTIONS_SEGMENT_SIZE])))
+ return rv;
- app->mode = type;
- app->index = application_get_index (app);
- app->session_type = sst;
+ app->first_segment_manager = segment_manager_index (sm);
app->api_client_index = api_client_index;
- app->flags = flags;
+ app->flags = options[SESSION_OPTIONS_FLAGS];
app->cb_fns = *cb_fns;
/* Check that the obvious things are properly set up */
- application_verify_cb_fns (type, cb_fns);
+ application_verify_cb_fns (cb_fns);
/* Add app to lookup by api_client_index table */
application_table_add (app);
- return app;
+ return 0;
}
application_t *
@@ -185,108 +221,286 @@ application_get_index (application_t * app)
return app - app_pool;
}
+static segment_manager_t *
+application_alloc_segment_manager (application_t * app)
+{
+ segment_manager_t *sm = 0;
+
+ if (app->first_segment_manager != (u32) ~ 0)
+ {
+ sm = segment_manager_get (app->first_segment_manager);
+ app->first_segment_manager = ~0;
+ return sm;
+ }
+
+ sm = segment_manager_new ();
+ if (segment_manager_init (sm, &app->sm_properties, 0))
+ return 0;
+ return sm;
+}
+
+/**
+ * Start listening local transport endpoint for requested transport.
+ *
+ * Creates a 'dummy' stream session with state LISTENING to be used in session
+ * lookups, prior to establishing connection. Requests transport to build
+ * it's own specific listening connection.
+ */
int
-application_server_init (application_t * server, u32 segment_size,
- u32 add_segment_size, u32 rx_fifo_size,
- u32 tx_fifo_size, u8 ** segment_name)
+application_start_listen (application_t * srv, session_type_t session_type,
+ transport_endpoint_t * tep, u64 * res)
{
- session_manager_main_t *smm = vnet_get_session_manager_main ();
- session_manager_t *sm;
- int rv;
+ segment_manager_t *sm;
+ stream_session_t *s;
+ u64 handle;
+
+ s = listen_session_new (session_type);
+ s->app_index = srv->index;
+
+ if (stream_session_listen (s, tep))
+ goto err;
+
+ /* Allocate segment manager. All sessions derived out of a listen session
+ * have fifos allocated by the same segment manager. */
+ sm = application_alloc_segment_manager (srv);
+ if (sm == 0)
+ goto err;
+
+ /* Add to app's listener table. Useful to find all child listeners
+ * when app goes down, although, just for unbinding this is not needed */
+ handle = listen_session_get_handle (s);
+ hash_set (srv->listeners_table, handle, segment_manager_index (sm));
- sm = session_manager_get (server->session_manager_index);
+ *res = handle;
+ return 0;
+
+err:
+ listen_session_del (s);
+ return -1;
+}
+
+/**
+ * Stop listening on session associated to handle
+ */
+int
+application_stop_listen (application_t * srv, u64 handle)
+{
+ stream_session_t *listener;
+ uword *indexp;
+ segment_manager_t *sm;
- /* Add first segment */
- if ((rv = session_manager_add_first_segment (smm, sm, segment_size,
- segment_name)))
+ if (srv && hash_get (srv->listeners_table, handle) == 0)
{
- return rv;
+ clib_warning ("app doesn't own handle %llu!", handle);
+ return -1;
}
- /* Setup session manager */
- sm->add_segment_size = add_segment_size;
- sm->rx_fifo_size = rx_fifo_size;
- sm->tx_fifo_size = tx_fifo_size;
- sm->add_segment = sm->add_segment_size != 0;
+ listener = listen_session_get_from_handle (handle);
+ stream_session_stop_listen (listener);
+
+ indexp = hash_get (srv->listeners_table, handle);
+ ASSERT (indexp);
+
+ sm = segment_manager_get (*indexp);
+ segment_manager_del (sm);
+ hash_unset (srv->listeners_table, handle);
+ listen_session_del (listener);
+
return 0;
}
+int
+application_open_session (application_t * app, session_type_t sst,
+ transport_endpoint_t * tep, u32 api_context)
+{
+ segment_manager_t *sm;
+ transport_connection_t *tc = 0;
+ int rv;
+
+ /* Make sure we have a segment manager for connects */
+ if (app->connects_seg_manager == (u32) ~ 0)
+ {
+ sm = application_alloc_segment_manager (app);
+ if (sm == 0)
+ return -1;
+ app->connects_seg_manager = segment_manager_index (sm);
+ }
+
+ if ((rv = stream_session_open (app->index, sst, tep, &tc)))
+ return rv;
+
+ /* Store api_context for when the reply comes. Not the nicest thing
+ * but better allocating a separate half-open pool. */
+ tc->s_index = api_context;
+
+ return 0;
+}
+
+segment_manager_t *
+application_get_connect_segment_manager (application_t * app)
+{
+ ASSERT (app->connects_seg_manager != (u32) ~ 0);
+ return segment_manager_get (app->connects_seg_manager);
+}
+
+segment_manager_t *
+application_get_listen_segment_manager (application_t * app,
+ stream_session_t * s)
+{
+ uword *smp;
+ smp = hash_get (app->listeners_table, listen_session_get_handle (s));
+ ASSERT (smp != 0);
+ return segment_manager_get (*smp);
+}
+
+static u8 *
+app_get_name_from_reg_index (application_t * app)
+{
+ u8 *app_name;
+
+ vl_api_registration_t *regp;
+ regp = vl_api_client_index_to_registration (app->api_client_index);
+ if (!regp)
+ app_name = format (0, "builtin-%d%c", app->index, 0);
+ else
+ app_name = format (0, "%s%c", regp->name, 0);
+
+ return app_name;
+}
+
u8 *
-format_application_server (u8 * s, va_list * args)
+format_application_listener (u8 * s, va_list * args)
{
- application_t *srv = va_arg (*args, application_t *);
+ application_t *app = va_arg (*args, application_t *);
+ u64 handle = va_arg (*args, u64);
+ u32 index = va_arg (*args, u32);
int verbose = va_arg (*args, int);
- vl_api_registration_t *regp;
stream_session_t *listener;
- u8 *server_name, *str, *seg_name;
- u32 segment_size;
+ u8 *app_name, *str;
- if (srv == 0)
+ if (app == 0)
{
if (verbose)
- s = format (s, "%-40s%-20s%-15s%-15s%-10s", "Connection", "Server",
- "Segment", "API Client", "Cookie");
+ s = format (s, "%-40s%-20s%-15s%-15s%-10s", "Connection", "App",
+ "API Client", "ListenerID", "SegManager");
else
- s = format (s, "%-40s%-20s", "Connection", "Server");
+ s = format (s, "%-40s%-20s", "Connection", "App");
return s;
}
- regp = vl_api_client_index_to_registration (srv->api_client_index);
- if (!regp)
- server_name = format (0, "builtin-%d%c", srv->index, 0);
- else
- server_name = regp->name;
-
- listener = stream_session_listener_get (srv->session_type,
- srv->session_index);
+ app_name = app_get_name_from_reg_index (app);
+ listener = listen_session_get_from_handle (handle);
str = format (0, "%U", format_stream_session, listener, verbose);
- session_manager_get_segment_info (listener->server_segment_index, &seg_name,
- &segment_size);
if (verbose)
{
- s = format (s, "%-40s%-20s%-20s%-10d%-10d", str, server_name,
- seg_name, srv->api_client_index, srv->accept_cookie);
+ s = format (s, "%-40s%-20s%-15u%-15u%-10u", str, app_name,
+ app->api_client_index, handle, index);
}
else
- s = format (s, "%-40s%-20s", str, server_name);
+ s = format (s, "%-40s%-20s", str, app_name);
+
+ vec_free (app_name);
return s;
}
-u8 *
-format_application_client (u8 * s, va_list * args)
+void
+application_format_connects (application_t * app, int verbose)
{
- application_t *client = va_arg (*args, application_t *);
- int verbose = va_arg (*args, int);
- stream_session_t *session;
- u8 *str, *seg_name;
- u32 segment_size;
+ vlib_main_t *vm = vlib_get_main ();
+ segment_manager_t *sm;
+ u8 *app_name, *s = 0;
+ int i, j;
- if (client == 0)
+ /* Header */
+ if (app == 0)
{
if (verbose)
- s =
- format (s, "%-40s%-20s%-10s", "Connection", "Segment",
- "API Client");
+ vlib_cli_output (vm, "%-40s%-20s%-15s%-10s", "Connection", "App",
+ "API Client", "SegManager");
else
- s = format (s, "%-40s", "Connection");
+ vlib_cli_output (vm, "%-40s%-20s", "Connection", "App");
+ return;
+ }
- return s;
+ /* make sure */
+ if (app->connects_seg_manager == (u32) ~ 0)
+ return;
+
+ app_name = app_get_name_from_reg_index (app);
+
+ /* Across all fifo segments */
+ sm = segment_manager_get (app->connects_seg_manager);
+ for (j = 0; j < vec_len (sm->segment_indices); j++)
+ {
+ svm_fifo_segment_private_t *fifo_segment;
+ svm_fifo_t **fifos;
+ u8 *str;
+
+ fifo_segment = svm_fifo_get_segment (sm->segment_indices[j]);
+ fifos = svm_fifo_segment_get_fifos (fifo_segment);
+ for (i = 0; i < vec_len (fifos); i++)
+ {
+ svm_fifo_t *fifo;
+ u32 session_index, thread_index;
+ stream_session_t *session;
+
+ /* There are 2 fifos/session. Avoid printing twice. */
+ if (i % 2)
+ continue;
+
+ fifo = fifos[i];
+ session_index = fifo->server_session_index;
+ thread_index = fifo->server_thread_index;
+
+ session = stream_session_get (session_index, thread_index);
+ str = format (0, "%U", format_stream_session, session, verbose);
+
+ if (verbose)
+ s = format (s, "%-40s%-20s%-15u%-10u", str, app_name,
+ app->api_client_index, app->connects_seg_manager);
+ else
+ s = format (s, "%-40s%-20s", str, app_name);
+
+ vlib_cli_output (vm, "%v", s);
+
+ vec_reset_length (s);
+ vec_free (str);
+ }
+ vec_free (s);
}
- session = stream_session_get (client->session_index, client->thread_index);
- str = format (0, "%U", format_stream_session, session, verbose);
+ vec_free (app_name);
+}
- session_manager_get_segment_info (session->server_segment_index, &seg_name,
- &segment_size);
- if (verbose)
+u8 *
+format_application (u8 * s, va_list * args)
+{
+ application_t *app = va_arg (*args, application_t *);
+ CLIB_UNUSED (int verbose) = va_arg (*args, int);
+ u8 *app_name;
+
+ if (app == 0)
{
- s = format (s, "%-40s%-20s%-10d%", str, seg_name,
- client->api_client_index);
+ if (verbose)
+ s = format (s, "%-10s%-20s%-15s%-15s%-15s%-15s", "Index", "Name",
+ "API Client", "Add seg size", "Rx fifo size",
+ "Tx fifo size");
+ else
+ s = format (s, "%-10s%-20s%-20s", "Index", "Name", "API Client");
+ return s;
}
+
+ app_name = app_get_name_from_reg_index (app);
+ if (verbose)
+ s = format (s, "%-10d%-20s%-15d%-15d%-15d%-15d", app->index, app_name,
+ app->api_client_index, app->sm_properties.add_segment_size,
+ app->sm_properties.rx_fifo_size,
+ app->sm_properties.tx_fifo_size);
else
- s = format (s, "%-40s", str);
+ s = format (s, "%-10d%-20s%-20d", app->index, app_name,
+ app->api_client_index);
return s;
}
@@ -294,13 +508,12 @@ static clib_error_t *
show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
vlib_cli_command_t * cmd)
{
- session_manager_main_t *smm = &session_manager_main;
application_t *app;
int do_server = 0;
int do_client = 0;
int verbose = 0;
- if (!smm->is_enabled)
+ if (!session_manager_is_enabled ())
{
clib_error_return (0, "session layer is not enabled");
}
@@ -319,17 +532,24 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
if (do_server)
{
+ u64 handle;
+ u32 index;
if (pool_elts (app_pool))
{
- vlib_cli_output (vm, "%U", format_application_server,
- 0 /* header */ ,
+ vlib_cli_output (vm, "%U", format_application_listener,
+ 0 /* header */ , 0, 0,
verbose);
/* *INDENT-OFF* */
pool_foreach (app, app_pool,
({
- if (app->mode == APP_SERVER)
- vlib_cli_output (vm, "%U", format_application_server, app,
- verbose);
+ /* App's listener sessions */
+ if (hash_elts (app->listeners_table) == 0)
+ continue;
+ hash_foreach (handle, index, app->listeners_table,
+ ({
+ vlib_cli_output (vm, "%U", format_application_listener, app,
+ handle, index, verbose);
+ }));
}));
/* *INDENT-ON* */
}
@@ -341,15 +561,14 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
{
if (pool_elts (app_pool))
{
- vlib_cli_output (vm, "%U", format_application_client,
- 0 /* header */ ,
- verbose);
+ application_format_connects (0, verbose);
+
/* *INDENT-OFF* */
pool_foreach (app, app_pool,
({
- if (app->mode == APP_CLIENT)
- vlib_cli_output (vm, "%U", format_application_client, app,
- verbose);
+ if (app->connects_seg_manager == (u32)~0)
+ continue;
+ application_format_connects (app, verbose);
}));
/* *INDENT-ON* */
}
@@ -357,6 +576,19 @@ show_app_command_fn (vlib_main_t * vm, unformat_input_t * input,
vlib_cli_output (vm, "No active client bindings");
}
+ /* Print app related info */
+ if (!do_server && !do_client)
+ {
+ vlib_cli_output (vm, "%U", format_application, 0, verbose);
+ pool_foreach (app, app_pool, (
+ {
+ vlib_cli_output (vm, "%U",
+ format_application, app,
+ verbose);
+ }
+ ));
+ }
+
return 0;
}
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 480828f7..6bcee9d3 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -18,11 +18,13 @@
#include <vnet/vnet.h>
#include <vnet/session/session.h>
+#include <vnet/session/segment_manager.h>
typedef enum
{
APP_SERVER,
- APP_CLIENT
+ APP_CLIENT,
+ APP_N_TYPES
} application_type_t;
typedef struct _stream_session_cb_vft
@@ -35,7 +37,7 @@ typedef struct _stream_session_cb_vft
int (*session_accept_callback) (stream_session_t * new_session);
/* Connection request callback */
- int (*session_connected_callback) (u32 api_client_index,
+ int (*session_connected_callback) (u32 app_index, u32 api_context,
stream_session_t * s, u8 code);
/** Notify app that session is closing */
@@ -59,45 +61,52 @@ typedef struct _application
/** Flags */
u32 flags;
+ /* Stream server mode: accept or connect
+ * TODO REMOVE*/
+ u8 mode;
+
+ /** Index of the listen session or connect session
+ * TODO REMOVE*/
+ u32 session_index;
+
+ /** Session thread index for client connect sessions
+ * TODO REMOVE */
+ u32 thread_index;
+
+ /*
+ * Binary API interface to external app
+ */
+
/** Binary API connection index, ~0 if internal */
u32 api_client_index;
- /* */
- u32 api_context;
-
/** Application listens for events on this svm queue */
unix_shared_memory_queue_t *event_queue;
- /** Stream session type */
- u8 session_type;
-
- /* Stream server mode: accept or connect */
- u8 mode;
+ /*
+ * Callbacks: shoulder-taps for the server/client
+ */
- u32 session_manager_index;
+ session_cb_vft_t cb_fns;
/*
- * Bind/Listen specific
+ * svm segment management
*/
+ u32 connects_seg_manager;
- /** Accept cookie, for multiple session flavors ($$$ maybe) */
- u32 accept_cookie;
+ /* Lookup tables for listeners. Value is segment manager index */
+ uword *listeners_table;
- /** Index of the listen session or connect session */
- u32 session_index;
+ u32 first_segment_manager;
- /** Session thread index for client connect sessions */
- u32 thread_index;
-
- /*
- * Callbacks: shoulder-taps for the server/client
- */
- session_cb_vft_t cb_fns;
+ /** Segment manager properties. Shared by all segment managers */
+ segment_manager_properties_t sm_properties;
} application_t;
-application_t *application_new (application_type_t type, session_type_t sst,
- u32 api_client_index, u32 flags,
- session_cb_vft_t * cb_fns);
+application_t *application_new ();
+int
+application_init (application_t * app, u32 api_client_index, u64 * options,
+ session_cb_vft_t * cb_fns);
void application_del (application_t * app);
application_t *application_get (u32 index);
application_t *application_get_if_valid (u32 index);
@@ -105,11 +114,21 @@ application_t *application_lookup (u32 api_client_index);
u32 application_get_index (application_t * app);
int
-application_server_init (application_t * server, u32 segment_size,
- u32 add_segment_size, u32 rx_fifo_size,
- u32 tx_fifo_size, u8 ** segment_name);
+application_start_listen (application_t * app, session_type_t session_type,
+ transport_endpoint_t * tep, u64 * handle);
+int application_stop_listen (application_t * srv, u64 handle);
+int
+application_open_session (application_t * app, session_type_t sst,
+ transport_endpoint_t * tep, u32 api_context);
int application_api_queue_is_full (application_t * app);
+segment_manager_t *application_get_listen_segment_manager (application_t *
+ app,
+ stream_session_t *
+ s);
+segment_manager_t *application_get_connect_segment_manager (application_t *
+ app);
+
#endif /* SRC_VNET_SESSION_APPLICATION_H_ */
/*
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index 4b30bd87..96d2c621 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -79,81 +79,51 @@ api_parse_session_handle (u64 handle, u32 * session_index, u32 * thread_index)
}
int
-vnet_bind_i (u32 api_client_index, ip46_address_t * ip46, u16 port_host_order,
- session_type_t sst, u64 * options, session_cb_vft_t * cb_fns,
- application_t ** app, u32 * len_seg_name, char *seg_name)
+vnet_bind_i (u32 app_index, session_type_t sst,
+ transport_endpoint_t * tep, u64 * handle)
{
- u8 *segment_name = 0;
- application_t *server = 0;
+ application_t *app;
stream_session_t *listener;
- u8 is_ip4;
-
- listener =
- stream_session_lookup_listener (ip46,
- clib_host_to_net_u16 (port_host_order),
- sst);
-
- if (listener)
- return VNET_API_ERROR_ADDRESS_IN_USE;
- if (application_lookup (api_client_index))
+ app = application_get_if_valid (app_index);
+ if (!app)
{
- clib_warning ("Only one connection supported for now");
- return VNET_API_ERROR_ADDRESS_IN_USE;
+ clib_warning ("app not attached");
+ return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
}
- is_ip4 = SESSION_TYPE_IP4_UDP == sst || SESSION_TYPE_IP4_TCP == sst;
- if (!ip_is_zero (ip46, is_ip4) && !ip_is_local (ip46, is_ip4))
- return VNET_API_ERROR_INVALID_VALUE;
-
- /* Allocate and initialize stream server */
- server = application_new (APP_SERVER, sst, api_client_index,
- options[SESSION_OPTIONS_FLAGS], cb_fns);
+ listener = stream_session_lookup_listener (&tep->ip,
+ clib_host_to_net_u16 (tep->port),
+ sst);
+ if (listener)
+ return VNET_API_ERROR_ADDRESS_IN_USE;
- application_server_init (server, options[SESSION_OPTIONS_SEGMENT_SIZE],
- options[SESSION_OPTIONS_ADD_SEGMENT_SIZE],
- options[SESSION_OPTIONS_RX_FIFO_SIZE],
- options[SESSION_OPTIONS_TX_FIFO_SIZE],
- &segment_name);
+ if (!ip_is_zero (&tep->ip, tep->is_ip4)
+ && !ip_is_local (&tep->ip, tep->is_ip4))
+ return VNET_API_ERROR_INVALID_VALUE_2;
/* Setup listen path down to transport */
- stream_session_start_listen (server->index, ip46, port_host_order);
-
- /*
- * Return values
- */
-
- ASSERT (vec_len (segment_name) <= 128);
- *len_seg_name = vec_len (segment_name);
- memcpy (seg_name, segment_name, *len_seg_name);
- *app = server;
-
- return 0;
+ return application_start_listen (app, sst, tep, handle);
}
int
-vnet_unbind_i (u32 api_client_index)
+vnet_unbind_i (u32 app_index, u64 handle)
{
- application_t *server;
+ application_t *app = application_get_if_valid (app_index);
- /*
- * Find the stream_server_t corresponding to the api client
- */
- server = application_lookup (api_client_index);
- if (!server)
- return VNET_API_ERROR_INVALID_VALUE_2;
+ if (!app)
+ {
+ clib_warning ("app not attached");
+ return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
/* Clear the listener */
- stream_session_stop_listen (server->index);
- application_del (server);
-
- return 0;
+ return application_stop_listen (app, handle);
}
int
-vnet_connect_i (u32 api_client_index, u32 api_context, session_type_t sst,
- ip46_address_t * ip46, u16 port, u64 * options, void *mp,
- session_cb_vft_t * cb_fns)
+vnet_connect_i (u32 app_index, u32 api_context, session_type_t sst,
+ transport_endpoint_t * tep, void *mp)
{
stream_session_t *listener;
application_t *server, *app;
@@ -161,8 +131,8 @@ vnet_connect_i (u32 api_client_index, u32 api_context, session_type_t sst,
/*
* Figure out if connecting to a local server
*/
- listener = stream_session_lookup_listener (ip46,
- clib_host_to_net_u16 (port),
+ listener = stream_session_lookup_listener (&tep->ip,
+ clib_host_to_net_u16 (tep->port),
sst);
if (listener)
{
@@ -177,16 +147,11 @@ vnet_connect_i (u32 api_client_index, u32 api_context, session_type_t sst,
redirect_connect_callback (server->api_client_index, mp);
}
- /* Create client app */
- app = application_new (APP_CLIENT, sst, api_client_index,
- options[SESSION_OPTIONS_FLAGS], cb_fns);
-
- app->api_context = api_context;
-
/*
* Not connecting to a local server. Create regular session
*/
- return stream_session_open (sst, ip46, port, app->index);
+ app = application_get (app_index);
+ return application_open_session (app, sst, tep, api_context);
}
/**
@@ -209,30 +174,31 @@ vnet_connect_i (u32 api_client_index, u32 api_context, session_type_t sst,
uword
unformat_vnet_uri (unformat_input_t * input, va_list * args)
{
- ip46_address_t *address = va_arg (*args, ip46_address_t *);
session_type_t *sst = va_arg (*args, session_type_t *);
- u16 *port = va_arg (*args, u16 *);
+ transport_endpoint_t *tep = va_arg (*args, transport_endpoint_t *);
- if (unformat (input, "tcp://%U/%d", unformat_ip4_address, &address->ip4,
- port))
+ if (unformat (input, "tcp://%U/%d", unformat_ip4_address, &tep->ip.ip4,
+ &tep->port))
{
*sst = SESSION_TYPE_IP4_TCP;
+ tep->is_ip4 = 1;
return 1;
}
- if (unformat (input, "udp://%U/%d", unformat_ip4_address, &address->ip4,
- port))
+ if (unformat (input, "udp://%U/%d", unformat_ip4_address, &tep->ip.ip4,
+ &tep->port))
{
*sst = SESSION_TYPE_IP4_UDP;
+ tep->is_ip4 = 1;
return 1;
}
- if (unformat (input, "udp://%U/%d", unformat_ip6_address, &address->ip6,
- port))
+ if (unformat (input, "udp://%U/%d", unformat_ip6_address, &tep->ip.ip6,
+ &tep->port))
{
*sst = SESSION_TYPE_IP6_UDP;
return 1;
}
- if (unformat (input, "tcp://%U/%d", unformat_ip6_address, &address->ip6,
- port))
+ if (unformat (input, "tcp://%U/%d", unformat_ip6_address, &tep->ip.ip6,
+ &tep->port))
{
*sst = SESSION_TYPE_IP6_TCP;
return 1;
@@ -242,8 +208,7 @@ unformat_vnet_uri (unformat_input_t * input, va_list * args)
}
int
-parse_uri (char *uri, session_type_t * sst, ip46_address_t * addr,
- u16 * port_number_host_byte_order)
+parse_uri (char *uri, session_type_t * sst, transport_endpoint_t * tep)
{
unformat_input_t _input, *input = &_input;
@@ -252,8 +217,7 @@ parse_uri (char *uri, session_type_t * sst, ip46_address_t * addr,
/* Parse uri */
unformat_init_string (input, uri, strlen (uri));
- if (!unformat (input, "%U", unformat_vnet_uri, addr, sst,
- port_number_host_byte_order))
+ if (!unformat (input, "%U", unformat_vnet_uri, sst, tep))
{
unformat_free (input);
return VNET_API_ERROR_INVALID_VALUE;
@@ -263,26 +227,51 @@ parse_uri (char *uri, session_type_t * sst, ip46_address_t * addr,
return 0;
}
+/**
+ * Attaches application.
+ *
+ * Allocates a vpp app, i.e., a structure that keeps back pointers
+ * to external app and a segment manager for shared memory fifo based
+ * communication with the external app.
+ */
int
-vnet_bind_uri (vnet_bind_args_t * a)
+vnet_application_attach (vnet_app_attach_args_t * a)
{
- application_t *server = 0;
- u16 port_host_order;
- session_type_t sst = SESSION_N_TYPES;
- ip46_address_t ip46;
+ application_t *app = 0;
+ segment_manager_t *sm;
+ u8 *seg_name;
int rv;
- memset (&ip46, 0, sizeof (ip46));
- rv = parse_uri (a->uri, &sst, &ip46, &port_host_order);
- if (rv)
+ app = application_new ();
+ if ((rv = application_init (app, a->api_client_index, a->options,
+ a->session_cb_vft)))
return rv;
- if ((rv = vnet_bind_i (a->api_client_index, &ip46, port_host_order, sst,
- a->options, a->session_cb_vft, &server,
- &a->segment_name_length, a->segment_name)))
- return rv;
+ a->app_event_queue_address = (u64) app->event_queue;
+ sm = segment_manager_get (app->first_segment_manager);
+ segment_manager_get_segment_info (sm->segment_indices[0],
+ &seg_name, &a->segment_size);
- a->server_event_queue_address = (u64) server->event_queue;
+ a->segment_name_length = vec_len (seg_name);
+ a->segment_name = seg_name;
+ ASSERT (vec_len (a->segment_name) <= 128);
+ a->app_index = app->index;
+ return 0;
+}
+
+int
+vnet_application_detach (vnet_app_detach_args_t * a)
+{
+ application_t *app;
+ app = application_get_if_valid (a->app_index);
+
+ if (!app)
+ {
+ clib_warning ("app not attached");
+ return VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
+
+ application_del (app);
return 0;
}
@@ -308,125 +297,102 @@ session_type_from_proto_and_ip (session_api_proto_t proto, u8 is_ip4)
}
int
-vnet_unbind_uri (char *uri, u32 api_client_index)
+vnet_bind_uri (vnet_bind_args_t * a)
{
- u16 port_number_host_byte_order;
session_type_t sst = SESSION_N_TYPES;
- ip46_address_t ip46_address;
- stream_session_t *listener;
+ transport_endpoint_t tep;
int rv;
- rv = parse_uri (uri, &sst, &ip46_address, &port_number_host_byte_order);
+ memset (&tep, 0, sizeof (tep));
+ rv = parse_uri (a->uri, &sst, &tep);
if (rv)
return rv;
- listener =
- stream_session_lookup_listener (&ip46_address,
- clib_host_to_net_u16
- (port_number_host_byte_order), sst);
+ if ((rv = vnet_bind_i (a->app_index, sst, &tep, &a->handle)))
+ return rv;
+
+ return 0;
+}
+
+int
+vnet_unbind_uri (vnet_unbind_args_t * a)
+{
+ session_type_t sst = SESSION_N_TYPES;
+ stream_session_t *listener;
+ transport_endpoint_t tep;
+ int rv;
+
+ rv = parse_uri (a->uri, &sst, &tep);
+ if (rv)
+ return rv;
+ listener = stream_session_lookup_listener (&tep.ip,
+ clib_host_to_net_u16 (tep.port),
+ sst);
if (!listener)
return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
- /* External client? */
- if (api_client_index != ~0)
- {
- ASSERT (vl_api_client_index_to_registration (api_client_index));
- }
-
- return vnet_unbind_i (api_client_index);
+ return vnet_unbind_i (a->app_index, listen_session_get_handle (listener));
}
int
vnet_connect_uri (vnet_connect_args_t * a)
{
- ip46_address_t ip46_address;
- u16 port;
+ transport_endpoint_t tep;
session_type_t sst;
- application_t *app;
int rv;
- app = application_lookup (a->api_client_index);
- if (app)
- {
- clib_warning ("Already have a connect from this app");
- return VNET_API_ERROR_INVALID_VALUE_2;
- }
-
/* Parse uri */
- rv = parse_uri (a->uri, &sst, &ip46_address, &port);
+ memset (&tep, 0, sizeof (tep));
+ rv = parse_uri (a->uri, &sst, &tep);
if (rv)
return rv;
- return vnet_connect_i (a->api_client_index, a->api_context, sst,
- &ip46_address, port, a->options, a->mp,
- a->session_cb_vft);
+ return vnet_connect_i (a->app_index, a->api_context, sst, &tep, a->mp);
}
int
-vnet_disconnect_session (u32 session_index, u32 thread_index)
+vnet_disconnect_session (vnet_disconnect_args_t * a)
{
- stream_session_t *session;
+ u32 index, thread_index;
+ stream_session_t *s;
- session = stream_session_get (session_index, thread_index);
- stream_session_disconnect (session);
+ stream_session_parse_handle (a->handle, &index, &thread_index);
+ s = stream_session_get_if_valid (index, thread_index);
+
+ if (!s || s->app_index != a->app_index)
+ return VNET_API_ERROR_INVALID_VALUE;
+ stream_session_disconnect (s);
return 0;
}
-
int
vnet_bind (vnet_bind_args_t * a)
{
- application_t *server = 0;
session_type_t sst = SESSION_N_TYPES;
int rv;
sst = session_type_from_proto_and_ip (a->proto, a->tep.is_ip4);
- if ((rv = vnet_bind_i (a->api_client_index, &a->tep.ip, a->tep.port, sst,
- a->options, a->session_cb_vft, &server,
- &a->segment_name_length, a->segment_name)))
+ if ((rv = vnet_bind_i (a->app_index, sst, &a->tep, &a->handle)))
return rv;
- a->server_event_queue_address = (u64) server->event_queue;
- a->handle = (u64) a->tep.vrf << 32 | (u64) server->session_index;
return 0;
}
int
vnet_unbind (vnet_unbind_args_t * a)
{
- application_t *server;
-
- if (a->api_client_index != ~0)
- {
- ASSERT (vl_api_client_index_to_registration (a->api_client_index));
- }
-
- /* Make sure this is the right one */
- server = application_lookup (a->api_client_index);
- ASSERT (server->session_index == (0xFFFFFFFF & a->handle));
-
- /* TODO use handle to disambiguate namespaces/vrfs */
- return vnet_unbind_i (a->api_client_index);
+ return vnet_unbind_i (a->app_index, a->handle);
}
int
vnet_connect (vnet_connect_args_t * a)
{
session_type_t sst;
- application_t *app;
-
- app = application_lookup (a->api_client_index);
- if (app)
- {
- clib_warning ("Already have a connect from this app");
- return VNET_API_ERROR_INVALID_VALUE_2;
- }
sst = session_type_from_proto_and_ip (a->proto, a->tep.is_ip4);
- return vnet_connect_i (a->api_client_index, a->api_context, sst, &a->tep.ip,
- a->tep.port, a->options, a->mp, a->session_cb_vft);
+ return vnet_connect_i (a->app_index, a->api_context, sst, &a->tep, a->mp);
}
int
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index a5f2b9a6..2c497531 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -28,6 +28,27 @@ typedef enum _session_api_proto
SESSION_PROTO_UDP
} session_api_proto_t;
+typedef struct _vnet_app_attach_args_t
+{
+ u32 api_client_index;
+ u64 *options;
+ session_cb_vft_t *session_cb_vft;
+
+ /*
+ * Results
+ */
+ u8 *segment_name;
+ u32 segment_name_length;
+ u32 segment_size;
+ u64 app_event_queue_address;
+ u32 app_index;
+} vnet_app_attach_args_t;
+
+typedef struct _vnet_app_detach_args_t
+{
+ u32 app_index;
+} vnet_app_detach_args_t;
+
typedef struct _vnet_bind_args_t
{
union
@@ -40,9 +61,7 @@ typedef struct _vnet_bind_args_t
};
};
- u32 api_client_index;
- u64 *options;
- session_cb_vft_t *session_cb_vft;
+ u32 app_index;
/*
* Results
@@ -60,7 +79,7 @@ typedef struct _vnet_unbind_args_t
char *uri;
u64 handle;
};
- u32 api_client_index;
+ u32 app_index;
} vnet_unbind_args_t;
typedef struct _vnet_connect_args
@@ -74,10 +93,8 @@ typedef struct _vnet_connect_args
session_api_proto_t proto;
};
};
- u32 api_client_index;
+ u32 app_index;
u32 api_context;
- u64 *options;
- session_cb_vft_t *session_cb_vft;
/* Used for redirects */
void *mp;
@@ -86,12 +103,13 @@ typedef struct _vnet_connect_args
typedef struct _vnet_disconnect_args_t
{
u64 handle;
- u32 api_client_index;
+ u32 app_index;
} vnet_disconnect_args_t;
-/* Bind / connect options */
+/* Application attach options */
typedef enum
{
+ APP_EVT_QUEUE_SIZE,
SESSION_OPTIONS_FLAGS,
SESSION_OPTIONS_SEGMENT_SIZE,
SESSION_OPTIONS_ADD_SEGMENT_SIZE,
@@ -99,7 +117,7 @@ typedef enum
SESSION_OPTIONS_TX_FIFO_SIZE,
SESSION_OPTIONS_ACCEPT_COOKIE,
SESSION_OPTIONS_N_OPTIONS
-} session_options_index_t;
+} app_attach_options_index_t;
/** Server can handle delegated connect requests from local clients */
#define SESSION_OPTIONS_FLAGS_USE_FIFO (1<<0)
@@ -109,10 +127,13 @@ typedef enum
#define VNET_CONNECT_REDIRECTED 123
+int vnet_application_attach (vnet_app_attach_args_t * a);
+int vnet_application_detach (vnet_app_detach_args_t * a);
+
int vnet_bind_uri (vnet_bind_args_t *);
-int vnet_unbind_uri (char *uri, u32 api_client_index);
+int vnet_unbind_uri (vnet_unbind_args_t * a);
int vnet_connect_uri (vnet_connect_args_t * a);
-int vnet_disconnect_session (u32 session_index, u32 thread_index);
+int vnet_disconnect_session (vnet_disconnect_args_t * a);
int vnet_bind (vnet_bind_args_t * a);
int vnet_connect (vnet_connect_args_t * a);
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
new file mode 100644
index 00000000..16e5bc56
--- /dev/null
+++ b/src/vnet/session/segment_manager.c
@@ -0,0 +1,342 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/session/segment_manager.h>
+#include <vnet/session/session.h>
+#include <vnet/session/application.h>
+
+/**
+ * Counter used to build segment names
+ */
+u32 segment_name_counter = 0;
+
+/**
+ * Pool of segment managers
+ */
+segment_manager_t *segment_managers = 0;
+
+/**
+ * Default fifo and segment size. TODO config.
+ */
+u32 default_fifo_size = 1 << 16;
+u32 default_segment_size = 1 << 20;
+
+void
+segment_manager_get_segment_info (u32 index, u8 ** name, u32 * size)
+{
+ svm_fifo_segment_private_t *s;
+ s = svm_fifo_get_segment (index);
+ *name = s->h->segment_name;
+ *size = s->ssvm.ssvm_size;
+}
+
+always_inline int
+session_manager_add_segment_i (segment_manager_t * sm, u32 segment_size,
+ u8 * segment_name)
+{
+ svm_fifo_segment_create_args_t _ca, *ca = &_ca;
+ int rv;
+
+ memset (ca, 0, sizeof (*ca));
+
+ ca->segment_name = (char *) segment_name;
+ ca->segment_size = segment_size;
+
+ rv = svm_fifo_segment_create (ca);
+ if (rv)
+ {
+ clib_warning ("svm_fifo_segment_create ('%s', %d) failed",
+ ca->segment_name, ca->segment_size);
+ vec_free (segment_name);
+ return VNET_API_ERROR_SVM_SEGMENT_CREATE_FAIL;
+ }
+
+ vec_add1 (sm->segment_indices, ca->new_segment_index);
+
+ return 0;
+}
+
+int
+session_manager_add_segment (segment_manager_t * sm)
+{
+ u8 *segment_name;
+ svm_fifo_segment_create_args_t _ca, *ca = &_ca;
+ u32 add_segment_size;
+ int rv;
+
+ memset (ca, 0, sizeof (*ca));
+ segment_name = format (0, "%d-%d%c", getpid (), segment_name_counter++, 0);
+ add_segment_size = sm->properties->add_segment_size ?
+ sm->properties->add_segment_size : default_segment_size;
+
+ rv = session_manager_add_segment_i (sm, add_segment_size, segment_name);
+ vec_free (segment_name);
+ return rv;
+}
+
+int
+session_manager_add_first_segment (segment_manager_t * sm, u32 segment_size)
+{
+ svm_fifo_segment_create_args_t _ca, *ca = &_ca;
+ u8 *segment_name;
+ int rv;
+
+ memset (ca, 0, sizeof (*ca));
+ segment_name = format (0, "%d-%d%c", getpid (), segment_name_counter++, 0);
+ rv = session_manager_add_segment_i (sm, segment_size, segment_name);
+ vec_free (segment_name);
+ return rv;
+}
+
+/**
+ * Initializes segment manager based on options provided.
+ * Returns error if svm segment allocation fails.
+ */
+int
+segment_manager_init (segment_manager_t * sm,
+ segment_manager_properties_t * properties,
+ u32 first_seg_size)
+{
+ int rv;
+
+ /* app allocates these */
+ sm->properties = properties;
+
+ if (first_seg_size > 0)
+ {
+ rv = session_manager_add_first_segment (sm, first_seg_size);
+ if (rv)
+ {
+ clib_warning ("Failed to allocate segment");
+ return rv;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * Removes segment manager.
+ *
+ * Since the fifos allocated in the segment keep backpointers to the sessions
+ * prior to removing the segment, we call session disconnect. This
+ * subsequently propages into transport.
+ */
+void
+segment_manager_del (segment_manager_t * sm)
+{
+ u32 *deleted_sessions = 0;
+ u32 *deleted_thread_indices = 0;
+ int i, j;
+
+ /* Across all fifo segments used by the server */
+ for (j = 0; j < vec_len (sm->segment_indices); j++)
+ {
+ svm_fifo_segment_private_t *fifo_segment;
+ svm_fifo_t **fifos;
+ /* Vector of fifos allocated in the segment */
+ fifo_segment = svm_fifo_get_segment (sm->segment_indices[j]);
+ fifos = svm_fifo_segment_get_fifos (fifo_segment);
+
+ /*
+ * Remove any residual sessions from the session lookup table
+ * Don't bother deleting the individual fifos, we're going to
+ * throw away the fifo segment in a minute.
+ */
+ for (i = 0; i < vec_len (fifos); i++)
+ {
+ svm_fifo_t *fifo;
+ u32 session_index, thread_index;
+ stream_session_t *session;
+
+ fifo = fifos[i];
+ session_index = fifo->server_session_index;
+ thread_index = fifo->server_thread_index;
+
+ session = stream_session_get (session_index, thread_index);
+
+ /* Add to the deleted_sessions vector (once!) */
+ if (!session->is_deleted)
+ {
+ session->is_deleted = 1;
+ vec_add1 (deleted_sessions, session_index);
+ vec_add1 (deleted_thread_indices, thread_index);
+ }
+ }
+
+ for (i = 0; i < vec_len (deleted_sessions); i++)
+ {
+ stream_session_t *session;
+ session = stream_session_get (deleted_sessions[i],
+ deleted_thread_indices[i]);
+
+ /* Instead of directly removing the session call disconnect */
+ stream_session_disconnect (session);
+
+ /*
+ stream_session_table_del (smm, session);
+ pool_put(smm->sessions[deleted_thread_indices[i]], session);
+ */
+ }
+
+ vec_reset_length (deleted_sessions);
+ vec_reset_length (deleted_thread_indices);
+
+ /* Instead of removing the segment, test when removing the session if
+ * the segment can be removed
+ */
+ /* svm_fifo_segment_delete (fifo_segment); */
+ }
+
+ vec_free (deleted_sessions);
+ vec_free (deleted_thread_indices);
+ pool_put (segment_managers, sm);
+}
+
+static int
+segment_manager_notify_app_seg_add (segment_manager_t * sm,
+ u32 fifo_segment_index)
+{
+ application_t *app = application_get (sm->app_index);
+ u32 seg_size = 0;
+ u8 *seg_name;
+
+ /* Send an API message to the external app, to map new segment */
+ ASSERT (app->cb_fns.add_segment_callback);
+
+ segment_manager_get_segment_info (fifo_segment_index, &seg_name, &seg_size);
+ return app->cb_fns.add_segment_callback (app->api_client_index, seg_name,
+ seg_size);
+}
+
+int
+segment_manager_alloc_session_fifos (segment_manager_t * sm,
+ svm_fifo_t ** server_rx_fifo,
+ svm_fifo_t ** server_tx_fifo,
+ u32 * fifo_segment_index)
+{
+ svm_fifo_segment_private_t *fifo_segment;
+ u32 fifo_size, sm_index;
+ u8 added_a_segment = 0;
+ int i;
+
+ /* Allocate svm fifos */
+ ASSERT (vec_len (sm->segment_indices));
+
+again:
+ for (i = 0; i < vec_len (sm->segment_indices); i++)
+ {
+ *fifo_segment_index = sm->segment_indices[i];
+ fifo_segment = svm_fifo_get_segment (*fifo_segment_index);
+
+ fifo_size = sm->properties->rx_fifo_size;
+ fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
+ *server_rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
+
+ fifo_size = sm->properties->tx_fifo_size;
+ fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
+ *server_tx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
+
+ if (*server_rx_fifo == 0)
+ {
+ /* This would be very odd, but handle it... */
+ if (*server_tx_fifo != 0)
+ {
+ svm_fifo_segment_free_fifo (fifo_segment, *server_tx_fifo);
+ *server_tx_fifo = 0;
+ }
+ continue;
+ }
+ if (*server_tx_fifo == 0)
+ {
+ if (*server_rx_fifo != 0)
+ {
+ svm_fifo_segment_free_fifo (fifo_segment, *server_rx_fifo);
+ *server_rx_fifo = 0;
+ }
+ continue;
+ }
+ break;
+ }
+
+ /* See if we're supposed to create another segment */
+ if (*server_rx_fifo == 0)
+ {
+ if (sm->properties->add_segment)
+ {
+ if (added_a_segment)
+ {
+ clib_warning ("added a segment, still cant allocate a fifo");
+ return SESSION_ERROR_NEW_SEG_NO_SPACE;
+ }
+
+ if (session_manager_add_segment (sm))
+ return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
+
+ added_a_segment = 1;
+ goto again;
+ }
+ else
+ {
+ clib_warning ("No space to allocate fifos!");
+ return SESSION_ERROR_NO_SPACE;
+ }
+ }
+
+ if (added_a_segment)
+ return segment_manager_notify_app_seg_add (sm, *fifo_segment_index);
+
+ /* Backpointers to segment manager */
+ sm_index = segment_manager_index (sm);
+ (*server_tx_fifo)->segment_manager = sm_index;
+ (*server_rx_fifo)->segment_manager = sm_index;
+
+ return 0;
+}
+
+void
+segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
+ svm_fifo_t * tx_fifo)
+{
+ segment_manager_t *sm;
+ svm_fifo_segment_private_t *fifo_segment;
+
+ fifo_segment = svm_fifo_get_segment (svm_segment_index);
+ svm_fifo_segment_free_fifo (fifo_segment, rx_fifo);
+ svm_fifo_segment_free_fifo (fifo_segment, tx_fifo);
+
+ /* If we have segment manager, try doing some cleanup.
+ * It's possible to have no segment manager if the session was removed
+ * as result of a detach */
+ sm = segment_manager_get_if_valid (rx_fifo->segment_manager);
+ if (sm)
+ {
+ /* Remove segment only if it holds no fifos and not the first */
+ if (sm->segment_indices[0] != svm_segment_index
+ && !svm_fifo_segment_has_fifos (fifo_segment))
+ {
+ svm_fifo_segment_delete (fifo_segment);
+ vec_del1 (sm->segment_indices, svm_segment_index);
+ }
+ }
+}
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
new file mode 100644
index 00000000..778d6040
--- /dev/null
+++ b/src/vnet/session/segment_manager.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_VNET_SESSION_SEGMENT_MANAGER_H_
+#define SRC_VNET_SESSION_SEGMENT_MANAGER_H_
+
+#include <vnet/vnet.h>
+#include <svm/svm_fifo_segment.h>
+
+typedef struct _segment_manager_properties
+{
+ /** Session fifo sizes. */
+ u32 rx_fifo_size;
+ u32 tx_fifo_size;
+
+ /** Configured additional segment size */
+ u32 add_segment_size;
+
+ /** Flag that indicates if additional segments should be created */
+ u8 add_segment;
+
+} segment_manager_properties_t;
+
+typedef struct _segment_manager
+{
+ /** segments mapped by this manager */
+ u32 *segment_indices;
+
+ /** Owner app index */
+ u32 app_index;
+
+ /** Pointer to manager properties. Could be shared among all of
+ * an app's segment managers s*/
+ segment_manager_properties_t *properties;
+} segment_manager_t;
+
+/** Pool of segment managers */
+extern segment_manager_t *segment_managers;
+
+always_inline segment_manager_t *
+segment_manager_new ()
+{
+ segment_manager_t *sm;
+ pool_get (segment_managers, sm);
+ memset (sm, 0, sizeof (*sm));
+ return sm;
+}
+
+always_inline segment_manager_t *
+segment_manager_get (u32 index)
+{
+ return pool_elt_at_index (segment_managers, index);
+}
+
+always_inline segment_manager_t *
+segment_manager_get_if_valid (u32 index)
+{
+ if (pool_is_free_index (segment_managers, index))
+ return 0;
+ return pool_elt_at_index (segment_managers, index);
+}
+
+always_inline u32
+segment_manager_index (segment_manager_t * sm)
+{
+ return sm - segment_managers;
+}
+
+int
+segment_manager_init (segment_manager_t * sm,
+ segment_manager_properties_t * properties,
+ u32 seg_size);
+
+void segment_manager_get_segment_info (u32 index, u8 ** name, u32 * size);
+int
+session_manager_add_first_segment (segment_manager_t * sm, u32 segment_size);
+int session_manager_add_segment (segment_manager_t * sm);
+void segment_manager_del (segment_manager_t * sm);
+int
+segment_manager_alloc_session_fifos (segment_manager_t * sm,
+ svm_fifo_t ** server_rx_fifo,
+ svm_fifo_t ** server_tx_fifo,
+ u32 * fifo_segment_index);
+void
+segment_manager_dealloc_fifos (u32 svm_segment_index, svm_fifo_t * rx_fifo,
+ svm_fifo_t * tx_fifo);
+
+#endif /* SRC_VNET_SESSION_SEGMENT_MANAGER_H_ */
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/session/session.api b/src/vnet/session/session.api
index 582765b5..e207e46f 100644
--- a/src/vnet/session/session.api
+++ b/src/vnet/session/session.api
@@ -13,6 +13,68 @@
* limitations under the License.
*/
+/** \brief client->vpp, attach application to session layer
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param initial_segment_size - size of the initial shm segment to be
+ allocated
+ @param options - segment size, fifo sizes, etc.
+*/
+ define application_attach {
+ u32 client_index;
+ u32 context;
+ u32 initial_segment_size;
+ u64 options[16];
+ };
+
+ /** \brief Application attach reply
+ @param context - sender context, to match reply w/ request
+ @param retval - return code for the request
+ @param app_event_queue_address - vpp event queue address or 0 if this
+ connection shouldn't send events
+ @param segment_size - size of first shm segment
+ @param segment_name_length - length of segment name
+ @param segment_name - name of segment client needs to attach to
+*/
+define application_attach_reply {
+ u32 context;
+ i32 retval;
+ u64 app_event_queue_address;
+ u32 segment_size;
+ u8 segment_name_length;
+ u8 segment_name[128];
+};
+
+ /** \brief client->vpp, attach application to session layer
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+*/
+ define application_detach {
+ u32 client_index;
+ u32 context;
+ };
+
+ /** \brief detach reply
+ @param context - sender context, to match reply w/ request
+ @param retval - return code for the request
+*/
+define application_detach_reply {
+ u32 context;
+ i32 retval;
+};
+
+/** \brief vpp->client, please map an additional shared memory segment
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param segment_name -
+*/
+define map_another_segment {
+ u32 client_index;
+ u32 context;
+ u32 segment_size;
+ u8 segment_name[128];
+};
+
/** \brief Bind to a given URI
@param client_index - opaque cookie to identify the sender
@param context - sender context, to match reply w/ request
@@ -25,9 +87,7 @@ define bind_uri {
u32 client_index;
u32 context;
u32 accept_cookie;
- u32 initial_segment_size;
u8 uri[128];
- u64 options[16];
};
/** \brief Unbind a given URI
@@ -49,7 +109,10 @@ define unbind_uri {
@param accept_cookie - sender accept cookie, to identify this bind flavor
@param uri - a URI, e.g. "tcp4://0.0.0.0/0/80"
"tcp6://::/0/80" [ipv6], etc.
- @param options - socket options, fifo sizes, etc.
+ @param options - socket options, fifo sizes, etc. passed by vpp to the
+ server when redirecting connects
+ @param client_queue_address - binary API client queue address. Used by
+ local server when connect was redirected.
*/
define connect_uri {
u32 client_index;
@@ -62,18 +125,10 @@ define connect_uri {
/** \brief Bind reply
@param context - sender context, to match reply w/ request
@param retval - return code for the request
- @param event_queue_address - vpp event queue address or 0 if this
- connection shouldn't send events
- @param segment_name_length - length of segment name
- @param segment_name - name of segment client needs to attach to
*/
define bind_uri_reply {
u32 context;
i32 retval;
- u64 server_event_queue_address;
- u8 segment_name_length;
- u32 segment_size;
- u8 segment_name[128];
};
/** \brief unbind reply
@@ -88,43 +143,28 @@ define unbind_uri_reply {
/** \brief vpp->client, connect reply
@param context - sender context, to match reply w/ request
@param retval - return code for the request
+ @param handle - session handle
@param server_rx_fifo - rx (vpp -> vpp-client) fifo address
@param server_tx_fifo - tx (vpp-client -> vpp) fifo address
- @param session_index - session index;
- @param session_thread_index - session thread index
- @param session_type - session thread type
@param vpp_event_queue_address - vpp's event queue address
- @param client_event_queue_address - client's event queue address
+ @param segment_size - size of segment to be attached. Only for redirects.
@param segment_name_length - non-zero if the client needs to attach to
- the fifo segment
+ the fifo segment. This should only happen
+ if session was redirected.
@param segment_name - set if the client needs to attach to the segment
*/
define connect_uri_reply {
u32 context;
i32 retval;
+ u64 handle;
u64 server_rx_fifo;
u64 server_tx_fifo;
- u32 session_index;
- u32 session_thread_index;
- u8 session_type;
- u64 client_event_queue_address;
u64 vpp_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
};
-/** \brief vpp->client, please map an additional shared memory segment
- @param context - sender context, to match reply w/ request
- @param segment_name -
-*/
-define map_another_segment {
- u32 client_index;
- u32 context;
- u32 segment_size;
- u8 segment_name[128];
-};
-
/** \brief client->vpp
@param context - sender context, to match reply w/ request
@param retval - return code for the request
@@ -136,25 +176,27 @@ define map_another_segment_reply {
/** \brief vpp->client, accept this session
@param context - sender context, to match reply w/ request
- @param accept_cookie - tells client which bind flavor just occurred
+ @param listener_handle - tells client which listener this pertains to
+ @param handle - unique session identifier
+ @param session_thread_index - thread index of new session
@param rx_fifo_address - rx (vpp -> vpp-client) fifo address
@param tx_fifo_address - tx (vpp-client -> vpp) fifo address
- @param session_index - index of new session
- @param session_thread_index - thread index of new session
@param vpp_event_queue_address - vpp's event queue address
- @param session_type - type of session
-
+ @param port - remote port
+ @param is_ip4 - 1 if the ip is ip4
+ @param ip - remote ip
*/
define accept_session {
u32 client_index;
u32 context;
- u32 accept_cookie;
+ u64 listener_handle;
+ u64 handle;
u64 server_rx_fifo;
u64 server_tx_fifo;
- u32 session_index;
- u32 session_thread_index;
u64 vpp_event_queue_address;
- u8 session_type;
+ u16 port;
+ u8 is_ip4;
+ u8 ip[16];
};
/** \brief client->vpp, reply to an accept message
@@ -167,23 +209,19 @@ define accept_session {
define accept_session_reply {
u32 context;
i32 retval;
- u8 session_type;
- u8 session_thread_index;
- u32 session_index;
+ u64 handle;
};
/** \brief bidirectional disconnect API
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
- @param session_index - cookie #1 from accept_session / connect_reply
- @param session_thread_index - cookie #2
+ @param handle - session handle obtained from accept/connect
*/
define disconnect_session {
u32 client_index;
u32 context;
- u32 session_index;
- u32 session_thread_index;
+ u64 handle;
};
/** \brief bidirectional disconnect reply API
@@ -191,31 +229,25 @@ define disconnect_session {
client to vpp direction only
@param context - sender context, to match reply w/ request
@param retval - return code for the request
- @param session_index - session index from accept_session / connect_reply
- @param session_thread_index - thread index from accept_session /
- connect_reply
+ @param handle - session handle
*/
define disconnect_session_reply {
u32 client_index;
u32 context;
i32 retval;
- u32 session_index;
- u32 session_thread_index;
+ u64 handle;
};
/** \brief vpp->client reset session API
@param client_index - opaque cookie to identify the sender
client to vpp direction only
@param context - sender context, to match reply w/ request
- @param session_index - session index from accept_session / connect_reply
- @param session_thread_index - thread index from accept_session /
- connect_reply
+ @param handle - session handle obtained via accept/connects
*/
define reset_session {
u32 client_index;
u32 context;
- u32 session_index;
- u32 session_thread_index;
+ u64 handle;
};
/** \brief client->vpp reset session reply
@@ -223,16 +255,13 @@ define reset_session {
client to vpp direction only
@param context - sender context, to match reply w/ request
@param retval - return code for the request
- @param session_index - session index from accept_session / connect_reply
- @param session_thread_index - thread index from accept_session /
- connect_reply
+ @param handle - session handle obtained via accept/connect
*/
define reset_session_reply {
u32 client_index;
u32 context;
i32 retval;
- u32 session_index;
- u32 session_thread_index;
+ u64 handle;
};
/** \brief Bind to an ip:port pair for a given transport protocol
@@ -277,7 +306,7 @@ define unbind_sock {
@param proto - protocol 0 - TCP 1 - UDP
@param client_queue_address - client's API queue address. Non-zero when
used to perform redirects
- @param options - socket options, fifo sizes, etc.
+ @param options - socket options, fifo sizes, etc. when doing redirects
*/
define connect_sock {
u32 client_index;
@@ -326,7 +355,7 @@ define unbind_sock_reply {
@param server_rx_fifo - rx (vpp -> vpp-client) fifo address
@param server_tx_fifo - tx (vpp-client -> vpp) fifo address
@param vpp_event_queue_address - vpp's event queue address
- @param client_event_queue_address - client's event queue address
+ @param segment_size - size of segment to be attached. Only for redirects.
@param segment_name_length - non-zero if the client needs to attach to
the fifo segment
@param segment_name - set if the client needs to attach to the segment
@@ -337,92 +366,12 @@ define connect_sock_reply {
u64 handle;
u64 server_rx_fifo;
u64 server_tx_fifo;
- u64 client_event_queue_address;
u64 vpp_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
};
-/** \brief bidirectional disconnect API
- @param client_index - opaque cookie to identify the sender
- client to vpp direction only
- @param context - sender context, to match reply w/ request
- @param handle - session handle obtained through accept/connect
-*/
-define disconnect_sock {
- u32 client_index;
- u32 context;
- u64 handle;
-};
-
-/** \brief bidirectional disconnect reply API
- @param client_index - opaque cookie to identify the sender
- client to vpp direction only
- @param client_context - sender context, to match reply w/ request
- @param handle - session handle obtained through accept/connect
-*/
-define disconnect_sock_reply {
- u32 client_index;
- u32 context;
- i32 retval;
- u64 handle;
-};
-
-/** \brief vpp->client, accept this session
- @param context - sender context, to match reply w/ request
- @param accept_cookie - tells client which bind flavor just occurred
- @param handle - session handle obtained through accept/connect
- @param rx_fifo_address - rx (vpp -> vpp-client) fifo address
- @param tx_fifo_address - tx (vpp-client -> vpp) fifo address
- @param vpp_event_queue_address - vpp's event queue address
-*/
-define accept_sock {
- u32 client_index;
- u32 context;
- u32 accept_cookie;
- u64 handle;
- u64 server_rx_fifo;
- u64 server_tx_fifo;
- u64 vpp_event_queue_address;
-};
-
-/** \brief client->vpp, reply to an accept message
- @param context - sender context, to match reply w/ request
- @param retval - return code for the request
- @param handle - session handle obtained through accept/connect
-*/
-define accept_sock_reply {
- u32 context;
- i32 retval;
- u64 handle;
-};
-
-/** \brief vpp->client reset session API
- @param client_index - opaque cookie to identify the sender
- client to vpp direction only
- @param context - sender context, to match reply w/ request
- @param handle - session handle obtained through accept/connect
-*/
-define reset_sock {
- u32 client_index;
- u32 context;
- u64 handle;
-};
-
-/** \brief client->vpp reset session reply
- @param client_index - opaque cookie to identify the sender
- client to vpp direction only
- @param context - sender context, to match reply w/ request
- @param handle - session handle obtained through accept/connect
-*/
-define reset_sock_reply {
- u32 client_index;
- u32 context;
- i32 retval;
- u64 handle;
-};
-
/** \brief enable/disable session layer
@param client_index - opaque cookie to identify the sender
client to vpp direction only
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 8e2b2616..e6cfe7da 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -36,15 +36,14 @@ session_manager_main_t session_manager_main;
* Session lookup key; (src-ip, dst-ip, src-port, dst-port, session-type)
* Value: (owner thread index << 32 | session_index);
*/
-static void
-stream_session_table_add_for_tc (u8 sst, transport_connection_t * tc,
- u64 value)
+void
+stream_session_table_add_for_tc (transport_connection_t * tc, u64 value)
{
session_manager_main_t *smm = &session_manager_main;
session_kv4_t kv4;
session_kv6_t kv6;
- switch (sst)
+ switch (tc->proto)
{
case SESSION_TYPE_IP4_UDP:
case SESSION_TYPE_IP4_TCP:
@@ -72,12 +71,12 @@ stream_session_table_add (session_manager_main_t * smm, stream_session_t * s,
tc = tp_vfts[s->session_type].get_connection (s->connection_index,
s->thread_index);
- stream_session_table_add_for_tc (s->session_type, tc, value);
+ stream_session_table_add_for_tc (tc, value);
}
static void
-stream_session_half_open_table_add (u8 sst, transport_connection_t * tc,
- u64 value)
+stream_session_half_open_table_add (session_type_t sst,
+ transport_connection_t * tc, u64 value)
{
session_manager_main_t *smm = &session_manager_main;
session_kv4_t kv4;
@@ -105,14 +104,13 @@ stream_session_half_open_table_add (u8 sst, transport_connection_t * tc,
}
}
-static int
-stream_session_table_del_for_tc (session_manager_main_t * smm, u8 sst,
- transport_connection_t * tc)
+int
+stream_session_table_del_for_tc (transport_connection_t * tc)
{
+ session_manager_main_t *smm = &session_manager_main;
session_kv4_t kv4;
session_kv6_t kv6;
-
- switch (sst)
+ switch (tc->proto)
{
case SESSION_TYPE_IP4_UDP:
case SESSION_TYPE_IP4_TCP:
@@ -141,7 +139,7 @@ stream_session_table_del (session_manager_main_t * smm, stream_session_t * s)
ts = tp_vfts[s->session_type].get_connection (s->connection_index,
s->thread_index);
- return stream_session_table_del_for_tc (smm, s->session_type, ts);
+ return stream_session_table_del_for_tc (ts);
}
static void
@@ -383,7 +381,7 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
* Allocate vpp event queue (once) per worker thread
*/
void
-vpp_session_event_queue_allocate (session_manager_main_t * smm,
+session_vpp_event_queue_allocate (session_manager_main_t * smm,
u32 thread_index)
{
api_main_t *am = &api_main;
@@ -406,266 +404,24 @@ vpp_session_event_queue_allocate (session_manager_main_t * smm,
}
}
-void
-session_manager_get_segment_info (u32 index, u8 ** name, u32 * size)
-{
- svm_fifo_segment_private_t *s;
- s = svm_fifo_get_segment (index);
- *name = s->h->segment_name;
- *size = s->ssvm.ssvm_size;
-}
-
-always_inline int
-session_manager_add_segment_i (session_manager_main_t * smm,
- session_manager_t * sm,
- u32 segment_size, u8 * segment_name)
-{
- svm_fifo_segment_create_args_t _ca, *ca = &_ca;
- int rv;
-
- memset (ca, 0, sizeof (*ca));
-
- ca->segment_name = (char *) segment_name;
- ca->segment_size = segment_size;
-
- rv = svm_fifo_segment_create (ca);
- if (rv)
- {
- clib_warning ("svm_fifo_segment_create ('%s', %d) failed",
- ca->segment_name, ca->segment_size);
- vec_free (segment_name);
- return -1;
- }
-
- vec_add1 (sm->segment_indices, ca->new_segment_index);
-
- return 0;
-}
-
-static int
-session_manager_add_segment (session_manager_main_t * smm,
- session_manager_t * sm)
-{
- u8 *segment_name;
- svm_fifo_segment_create_args_t _ca, *ca = &_ca;
- u32 add_segment_size;
- u32 default_segment_size = 128 << 10;
-
- memset (ca, 0, sizeof (*ca));
- segment_name = format (0, "%d-%d%c", getpid (),
- smm->unique_segment_name_counter++, 0);
- add_segment_size =
- sm->add_segment_size ? sm->add_segment_size : default_segment_size;
-
- return session_manager_add_segment_i (smm, sm, add_segment_size,
- segment_name);
-}
-
-int
-session_manager_add_first_segment (session_manager_main_t * smm,
- session_manager_t * sm, u32 segment_size,
- u8 ** segment_name)
-{
- svm_fifo_segment_create_args_t _ca, *ca = &_ca;
- memset (ca, 0, sizeof (*ca));
- *segment_name = format (0, "%d-%d%c", getpid (),
- smm->unique_segment_name_counter++, 0);
- return session_manager_add_segment_i (smm, sm, segment_size, *segment_name);
-}
-
-void
-session_manager_del (session_manager_main_t * smm, session_manager_t * sm)
-{
- u32 *deleted_sessions = 0;
- u32 *deleted_thread_indices = 0;
- int i, j;
-
- /* Across all fifo segments used by the server */
- for (j = 0; j < vec_len (sm->segment_indices); j++)
- {
- svm_fifo_segment_private_t *fifo_segment;
- svm_fifo_t **fifos;
- /* Vector of fifos allocated in the segment */
- fifo_segment = svm_fifo_get_segment (sm->segment_indices[j]);
- fifos = (svm_fifo_t **) fifo_segment->h->fifos;
-
- /*
- * Remove any residual sessions from the session lookup table
- * Don't bother deleting the individual fifos, we're going to
- * throw away the fifo segment in a minute.
- */
- for (i = 0; i < vec_len (fifos); i++)
- {
- svm_fifo_t *fifo;
- u32 session_index, thread_index;
- stream_session_t *session;
-
- fifo = fifos[i];
- session_index = fifo->server_session_index;
- thread_index = fifo->server_thread_index;
-
- session = pool_elt_at_index (smm->sessions[thread_index],
- session_index);
-
- /* Add to the deleted_sessions vector (once!) */
- if (!session->is_deleted)
- {
- session->is_deleted = 1;
- vec_add1 (deleted_sessions,
- session - smm->sessions[thread_index]);
- vec_add1 (deleted_thread_indices, thread_index);
- }
- }
-
- for (i = 0; i < vec_len (deleted_sessions); i++)
- {
- stream_session_t *session;
-
- session =
- pool_elt_at_index (smm->sessions[deleted_thread_indices[i]],
- deleted_sessions[i]);
-
- /* Instead of directly removing the session call disconnect */
- stream_session_disconnect (session);
-
- /*
- stream_session_table_del (smm, session);
- pool_put(smm->sessions[deleted_thread_indices[i]], session);
- */
- }
-
- vec_reset_length (deleted_sessions);
- vec_reset_length (deleted_thread_indices);
-
- /* Instead of removing the segment, test when removing the session if
- * the segment can be removed
- */
- /* svm_fifo_segment_delete (fifo_segment); */
- }
-
- vec_free (deleted_sessions);
- vec_free (deleted_thread_indices);
-}
-
-int
-session_manager_allocate_session_fifos (session_manager_main_t * smm,
- session_manager_t * sm,
- svm_fifo_t ** server_rx_fifo,
- svm_fifo_t ** server_tx_fifo,
- u32 * fifo_segment_index,
- u8 * added_a_segment)
-{
- svm_fifo_segment_private_t *fifo_segment;
- u32 fifo_size, default_fifo_size = 1 << 16; /* TODO config */
- int i;
-
- *added_a_segment = 0;
-
- /* Allocate svm fifos */
- ASSERT (vec_len (sm->segment_indices));
-
-again:
- for (i = 0; i < vec_len (sm->segment_indices); i++)
- {
- *fifo_segment_index = sm->segment_indices[i];
- fifo_segment = svm_fifo_get_segment (*fifo_segment_index);
-
- fifo_size = sm->rx_fifo_size;
- fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
- *server_rx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
-
- fifo_size = sm->tx_fifo_size;
- fifo_size = (fifo_size == 0) ? default_fifo_size : fifo_size;
- *server_tx_fifo = svm_fifo_segment_alloc_fifo (fifo_segment, fifo_size);
-
- if (*server_rx_fifo == 0)
- {
- /* This would be very odd, but handle it... */
- if (*server_tx_fifo != 0)
- {
- svm_fifo_segment_free_fifo (fifo_segment, *server_tx_fifo);
- *server_tx_fifo = 0;
- }
- continue;
- }
- if (*server_tx_fifo == 0)
- {
- if (*server_rx_fifo != 0)
- {
- svm_fifo_segment_free_fifo (fifo_segment, *server_rx_fifo);
- *server_rx_fifo = 0;
- }
- continue;
- }
- break;
- }
-
- /* See if we're supposed to create another segment */
- if (*server_rx_fifo == 0)
- {
- if (sm->add_segment)
- {
- if (*added_a_segment)
- {
- clib_warning ("added a segment, still cant allocate a fifo");
- return SESSION_ERROR_NEW_SEG_NO_SPACE;
- }
-
- if (session_manager_add_segment (smm, sm))
- return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
-
- *added_a_segment = 1;
- goto again;
- }
- else
- {
- clib_warning ("No space to allocate fifos!");
- return SESSION_ERROR_NO_SPACE;
- }
- }
- return 0;
-}
-
int
-stream_session_create_i (session_manager_main_t * smm, application_t * app,
- transport_connection_t * tc,
+stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
stream_session_t ** ret_s)
{
- int rv;
+ session_manager_main_t *smm = &session_manager_main;
svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0;
u32 fifo_segment_index;
- u32 pool_index, seg_size;
+ u32 pool_index;
stream_session_t *s;
u64 value;
u32 thread_index = tc->thread_index;
- session_manager_t *sm;
- u8 segment_added;
- u8 *seg_name;
-
- sm = session_manager_get (app->session_manager_index);
-
- /* Check the API queue */
- if (app->mode == APP_SERVER && application_api_queue_is_full (app))
- return SESSION_ERROR_API_QUEUE_FULL;
+ int rv;
- if ((rv = session_manager_allocate_session_fifos (smm, sm, &server_rx_fifo,
- &server_tx_fifo,
- &fifo_segment_index,
- &segment_added)))
+ if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo,
+ &server_tx_fifo,
+ &fifo_segment_index)))
return rv;
- if (segment_added && app->mode == APP_SERVER)
- {
- /* Send an API message to the external server, to map new segment */
- ASSERT (app->cb_fns.add_segment_callback);
-
- session_manager_get_segment_info (fifo_segment_index, &seg_name,
- &seg_size);
- if (app->cb_fns.add_segment_callback (app->api_client_index, seg_name,
- seg_size))
- return VNET_API_ERROR_URI_FIFO_CREATE_FAILED;
- }
-
/* Create the session */
pool_get (smm->sessions[thread_index], s);
memset (s, 0, sizeof (*s));
@@ -682,10 +438,9 @@ stream_session_create_i (session_manager_main_t * smm, application_t * app,
s->server_tx_fifo = server_tx_fifo;
/* Initialize state machine, such as it is... */
- s->session_type = app->session_type;
+ s->session_type = tc->proto;
s->session_state = SESSION_STATE_CONNECTING;
- s->app_index = application_get_index (app);
- s->server_segment_index = fifo_segment_index;
+ s->svm_segment_index = fifo_segment_index;
s->thread_index = thread_index;
s->session_index = pool_index;
@@ -697,7 +452,7 @@ stream_session_create_i (session_manager_main_t * smm, application_t * app,
/* Add to the main lookup table */
value = (((u64) thread_index) << 32) | (u64) s->session_index;
- stream_session_table_add_for_tc (app->session_type, tc, value);
+ stream_session_table_add_for_tc (tc, value);
*ret_s = s;
@@ -881,94 +636,6 @@ session_manager_flush_enqueue_events (u32 thread_index)
return errors;
}
-/*
- * Start listening on server's ip/port pair for requested transport.
- *
- * Creates a 'dummy' stream session with state LISTENING to be used in session
- * lookups, prior to establishing connection. Requests transport to build
- * it's own specific listening connection.
- */
-int
-stream_session_start_listen (u32 server_index, ip46_address_t * ip, u16 port)
-{
- session_manager_main_t *smm = &session_manager_main;
- stream_session_t *s;
- transport_connection_t *tc;
- application_t *srv;
- u32 tci;
-
- srv = application_get (server_index);
-
- pool_get (smm->listen_sessions[srv->session_type], s);
- memset (s, 0, sizeof (*s));
-
- s->session_type = srv->session_type;
- s->session_state = SESSION_STATE_LISTENING;
- s->session_index = s - smm->listen_sessions[srv->session_type];
- s->app_index = srv->index;
-
- /* Transport bind/listen */
- tci = tp_vfts[srv->session_type].bind (s->session_index, ip, port);
-
- /* Attach transport to session */
- s->connection_index = tci;
- tc = tp_vfts[srv->session_type].get_listener (tci);
-
- srv->session_index = s->session_index;
-
- /* Add to the main lookup table */
- stream_session_table_add_for_tc (s->session_type, tc, s->session_index);
-
- return 0;
-}
-
-void
-stream_session_stop_listen (u32 server_index)
-{
- session_manager_main_t *smm = &session_manager_main;
- stream_session_t *listener;
- transport_connection_t *tc;
- application_t *srv;
-
- srv = application_get (server_index);
- listener = pool_elt_at_index (smm->listen_sessions[srv->session_type],
- srv->session_index);
-
- tc = tp_vfts[srv->session_type].get_listener (listener->connection_index);
- stream_session_table_del_for_tc (smm, listener->session_type, tc);
-
- tp_vfts[srv->session_type].unbind (listener->connection_index);
- pool_put (smm->listen_sessions[srv->session_type], listener);
-}
-
-int
-connect_server_add_segment_cb (application_t * ss, char *segment_name,
- u32 segment_size)
-{
- /* Does exactly nothing, but die */
- ASSERT (0);
- return 0;
-}
-
-void
-connects_session_manager_init (session_manager_main_t * smm, u8 session_type)
-{
- session_manager_t *sm;
- u32 connect_fifo_size = 256 << 10; /* Config? */
- u32 default_segment_size = 1 << 20;
-
- pool_get (smm->session_managers, sm);
- memset (sm, 0, sizeof (*sm));
-
- sm->add_segment_size = default_segment_size;
- sm->rx_fifo_size = connect_fifo_size;
- sm->tx_fifo_size = connect_fifo_size;
- sm->add_segment = 1;
-
- session_manager_add_segment (smm, sm);
- smm->connect_manager_index[session_type] = sm - smm->session_managers;
-}
-
void
stream_session_connect_notify (transport_connection_t * tc, u8 sst,
u8 is_fail)
@@ -976,34 +643,36 @@ stream_session_connect_notify (transport_connection_t * tc, u8 sst,
session_manager_main_t *smm = &session_manager_main;
application_t *app;
stream_session_t *new_s = 0;
- u64 value;
+ u64 handle;
+ u32 api_context = 0;
- value = stream_session_half_open_lookup (smm, &tc->lcl_ip, &tc->rmt_ip,
- tc->lcl_port, tc->rmt_port,
- tc->proto);
- if (value == HALF_OPEN_LOOKUP_INVALID_VALUE)
+ handle = stream_session_half_open_lookup (smm, &tc->lcl_ip, &tc->rmt_ip,
+ tc->lcl_port, tc->rmt_port,
+ tc->proto);
+ if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
clib_warning ("This can't be good!");
return;
}
- app = application_get (value >> 32);
+ /* Get the app's index from the handle we stored when opening connection */
+ app = application_get (handle >> 32);
+ api_context = tc->s_index;
if (!is_fail)
{
- /* Create new session (server segments are allocated if needed) */
- if (stream_session_create_i (smm, app, tc, &new_s))
- return;
+ segment_manager_t *sm;
+ sm = application_get_connect_segment_manager (app);
- app->session_index = stream_session_get_index (new_s);
- app->thread_index = new_s->thread_index;
+ /* Create new session (svm segments are allocated if needed) */
+ if (stream_session_create_i (sm, tc, &new_s))
+ return;
- /* Allocate vpp event queue for this thread if needed */
- vpp_session_event_queue_allocate (smm, tc->thread_index);
+ new_s->app_index = app->index;
}
/* Notify client */
- app->cb_fns.session_connected_callback (app->api_client_index, new_s,
+ app->cb_fns.session_connected_callback (app->index, api_context, new_s,
is_fail);
/* Cleanup session lookup */
@@ -1046,48 +715,13 @@ void
stream_session_delete (stream_session_t * s)
{
session_manager_main_t *smm = vnet_get_session_manager_main ();
- svm_fifo_segment_private_t *fifo_segment;
- application_t *app;
/* Delete from the main lookup table. */
stream_session_table_del (smm, s);
/* Cleanup fifo segments */
- fifo_segment = svm_fifo_get_segment (s->server_segment_index);
- svm_fifo_segment_free_fifo (fifo_segment, s->server_rx_fifo);
- svm_fifo_segment_free_fifo (fifo_segment, s->server_tx_fifo);
-
- app = application_get_if_valid (s->app_index);
-
- /* No app. A possibility: after disconnect application called unbind */
- if (!app)
- return;
-
- if (app->mode == APP_CLIENT)
- {
- /* Cleanup app if client */
- application_del (app);
- }
- else if (app->mode == APP_SERVER)
- {
- session_manager_t *sm;
- svm_fifo_segment_private_t *fifo_segment;
- svm_fifo_t **fifos;
- u32 fifo_index;
-
- /* For server, see if any segments can be removed */
- sm = session_manager_get (app->session_manager_index);
-
- /* Delete fifo */
- fifo_segment = svm_fifo_get_segment (s->server_segment_index);
- fifos = (svm_fifo_t **) fifo_segment->h->fifos;
-
- fifo_index = svm_fifo_segment_index (fifo_segment);
-
- /* Remove segment only if it holds no fifos and not the first */
- if (sm->segment_indices[0] != fifo_index && vec_len (fifos) == 0)
- svm_fifo_segment_delete (fifo_segment);
- }
+ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo,
+ s->server_tx_fifo);
pool_put (smm->sessions[s->thread_index], s);
}
@@ -1134,21 +768,22 @@ int
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 sst, u8 notify)
{
- session_manager_main_t *smm = &session_manager_main;
application_t *server;
stream_session_t *s, *listener;
+ segment_manager_t *sm;
int rv;
/* Find the server */
- listener = pool_elt_at_index (smm->listen_sessions[sst], listener_index);
+ listener = listen_session_get (sst, listener_index);
server = application_get (listener->app_index);
- if ((rv = stream_session_create_i (smm, server, tc, &s)))
+ sm = application_get_listen_segment_manager (server, listener);
+ if ((rv = stream_session_create_i (sm, tc, &s)))
return rv;
- /* Allocate vpp event queue for this thread if needed */
- vpp_session_event_queue_allocate (smm, tc->thread_index);
+ s->app_index = server->index;
+ s->listener_index = listener_index;
/* Shoulder-tap the server */
if (notify)
@@ -1159,38 +794,112 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index,
return 0;
}
+/**
+ * Ask transport to open connection to remote transport endpoint.
+ *
+ * Stores handle for matching request with reply since the call can be
+ * asynchronous. For instance, for TCP the 3-way handshake must complete
+ * before reply comes. Session is only created once connection is established.
+ *
+ * @param app_index Index of the application requesting the connect
+ * @param st Session type requested.
+ * @param tep Remote transport endpoint
+ * @param res Resulting transport connection .
+ */
int
-stream_session_open (u8 sst, ip46_address_t * addr, u16 port_host_byte_order,
- u32 app_index)
+stream_session_open (u32 app_index, session_type_t st,
+ transport_endpoint_t * tep,
+ transport_connection_t ** res)
{
transport_connection_t *tc;
- u32 tci;
- u64 value;
int rv;
+ u64 handle;
- /* Ask transport to open connection */
- rv = tp_vfts[sst].open (addr, port_host_byte_order);
+ rv = tp_vfts[st].open (&tep->ip, tep->port);
if (rv < 0)
{
clib_warning ("Transport failed to open connection.");
return VNET_API_ERROR_SESSION_CONNECT_FAIL;
}
- tci = rv;
+ tc = tp_vfts[st].get_half_open ((u32) rv);
- /* Get transport connection */
- tc = tp_vfts[sst].get_half_open (tci);
-
- /* Store api_client_index and transport connection index */
- value = (((u64) app_index) << 32) | (u64) tc->c_index;
+ /* Save app and tc index. The latter is needed to help establish the
+ * connection while the former is needed when the connect notify comes
+ * and we have to notify the external app */
+ handle = (((u64) app_index) << 32) | (u64) tc->c_index;
/* Add to the half-open lookup table */
- stream_session_half_open_table_add (sst, tc, value);
+ stream_session_half_open_table_add (st, tc, handle);
+
+ *res = tc;
+
+ return 0;
+}
+
+/**
+ * Ask transport to listen on local transport endpoint.
+ *
+ * @param s Session for which listen will be called. Note that unlike
+ * established sessions, listen sessions are not associated to a
+ * thread.
+ * @param tep Local endpoint to be listened on.
+ */
+int
+stream_session_listen (stream_session_t * s, transport_endpoint_t * tep)
+{
+ transport_connection_t *tc;
+ u32 tci;
+
+ /* Transport bind/listen */
+ tci = tp_vfts[s->session_type].bind (s->session_index, &tep->ip, tep->port);
+
+ if (tci == (u32) ~ 0)
+ return -1;
+
+ /* Attach transport to session */
+ s->connection_index = tci;
+ tc = tp_vfts[s->session_type].get_listener (tci);
+
+ /* Weird but handle it ... */
+ if (tc == 0)
+ return -1;
+
+ /* Add to the main lookup table */
+ stream_session_table_add_for_tc (tc, s->session_index);
return 0;
}
/**
+ * Ask transport to stop listening on local transport endpoint.
+ *
+ * @param s Session to stop listening on. It must be in state LISTENING.
+ */
+int
+stream_session_stop_listen (stream_session_t * s)
+{
+ transport_connection_t *tc;
+
+ if (s->session_state != SESSION_STATE_LISTENING)
+ {
+ clib_warning ("not a listening session");
+ return -1;
+ }
+
+ tc = tp_vfts[s->session_type].get_listener (s->connection_index);
+ if (!tc)
+ {
+ clib_warning ("no transport");
+ return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
+ }
+
+ stream_session_table_del_for_tc (tc);
+ tp_vfts[s->session_type].unbind (s->connection_index);
+ return 0;
+}
+
+/**
* Disconnect session and propagate to transport. This should eventually
* result in a delete notification that allows us to cleanup session state.
* Called for both active/passive disconnects.
@@ -1297,6 +1006,10 @@ session_manager_main_enable (vlib_main_t * vm)
vec_validate (smm->last_event_poll_by_thread, num_threads - 1);
#endif
+ /* Allocate vpp event queues */
+ for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
+ session_vpp_event_queue_allocate (smm, i);
+
/* $$$$ preallocate hack config parameter */
for (i = 0; i < 200000; i++)
{
@@ -1322,9 +1035,6 @@ session_manager_main_enable (vlib_main_t * vm)
200000 /* $$$$ config parameter nbuckets */ ,
(64 << 20) /*$$$ config parameter table size */ );
- for (i = 0; i < SESSION_N_TYPES; i++)
- smm->connect_manager_index[i] = INVALID_INDEX;
-
smm->is_enabled = 1;
/* Enable TCP transport */
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 6878b4d2..6e4ea96d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -21,6 +21,7 @@
#include <vppinfra/sparse_vec.h>
#include <svm/svm_fifo_segment.h>
#include <vnet/session/session_debug.h>
+#include <vnet/session/segment_manager.h>
#define HALF_OPEN_LOOKUP_INVALID_VALUE ((u64)~0)
#define INVALID_INDEX ((u32)~0)
@@ -107,6 +108,9 @@ typedef struct _stream_session_t
svm_fifo_t *server_rx_fifo;
svm_fifo_t *server_tx_fifo;
+ /** svm segment index where fifos were allocated */
+ u32 svm_segment_index;
+
/** Type */
u8 session_type;
@@ -133,27 +137,10 @@ typedef struct _stream_session_t
/** stream server pool index */
u32 app_index;
- /** svm segment index */
- u32 server_segment_index;
+ /** Parent listener session if the result of an accept */
+ u32 listener_index;
} stream_session_t;
-typedef struct _session_manager
-{
- /** segments mapped by this server */
- u32 *segment_indices;
-
- /** Session fifo sizes. They are provided for binds and take default
- * values for connects */
- u32 rx_fifo_size;
- u32 tx_fifo_size;
-
- /** Configured additional segment size */
- u32 add_segment_size;
-
- /** Flag that indicates if additional segments should be created */
- u8 add_segment;
-} session_manager_t;
-
/* Forward definition */
typedef struct _session_manager_main session_manager_main_t;
@@ -206,11 +193,6 @@ struct _session_manager_main
/** Unique segment name counter */
u32 unique_segment_name_counter;
- /* Connection manager used by incoming connects */
- u32 connect_manager_index[SESSION_N_TYPES];
-
- session_manager_t *session_managers;
-
/** Per transport rx function that can either dequeue or peek */
session_fifo_rx_fn *session_tx_fns[SESSION_N_TYPES];
@@ -242,37 +224,6 @@ vnet_get_session_manager_main ()
return &session_manager_main;
}
-always_inline session_manager_t *
-session_manager_get (u32 index)
-{
- return pool_elt_at_index (session_manager_main.session_managers, index);
-}
-
-always_inline unix_shared_memory_queue_t *
-session_manager_get_vpp_event_queue (u32 thread_index)
-{
- return session_manager_main.vpp_event_queues[thread_index];
-}
-
-always_inline session_manager_t *
-connects_session_manager_get (session_manager_main_t * smm,
- session_type_t session_type)
-{
- return pool_elt_at_index (smm->session_managers,
- smm->connect_manager_index[session_type]);
-}
-
-void session_manager_get_segment_info (u32 index, u8 ** name, u32 * size);
-int session_manager_flush_enqueue_events (u32 thread_index);
-int
-session_manager_add_first_segment (session_manager_main_t * smm,
- session_manager_t * sm, u32 segment_size,
- u8 ** segment_name);
-void
-session_manager_del (session_manager_main_t * smm, session_manager_t * sm);
-void
-connects_session_manager_init (session_manager_main_t * smm, u8 session_type);
-
/*
* Stream session functions
*/
@@ -300,6 +251,8 @@ transport_connection_t
u32 thread_index);
stream_session_t *stream_session_lookup_listener (ip46_address_t * lcl,
u16 lcl_port, u8 proto);
+void stream_session_table_add_for_tc (transport_connection_t * tc, u64 value);
+int stream_session_table_del_for_tc (transport_connection_t * tc);
always_inline stream_session_t *
stream_session_get_tsi (u64 ti_and_si, u32 thread_index)
@@ -310,7 +263,7 @@ stream_session_get_tsi (u64 ti_and_si, u32 thread_index)
}
always_inline stream_session_t *
-stream_session_get (u64 si, u32 thread_index)
+stream_session_get (u32 si, u32 thread_index)
{
return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
}
@@ -327,6 +280,40 @@ stream_session_get_if_valid (u64 si, u32 thread_index)
return pool_elt_at_index (session_manager_main.sessions[thread_index], si);
}
+always_inline u64
+stream_session_handle (stream_session_t * s)
+{
+ return ((u64) s->thread_index << 32) | (u64) s->session_index;
+}
+
+always_inline u32
+stream_session_index_from_handle (u64 handle)
+{
+ return handle & 0xFFFFFFFF;
+}
+
+always_inline u32
+stream_session_thread_from_handle (u64 handle)
+{
+ return handle >> 32;
+}
+
+always_inline void
+stream_session_parse_handle (u64 handle, u32 * index, u32 * thread_index)
+{
+ *index = stream_session_index_from_handle (handle);
+ *thread_index = stream_session_thread_from_handle (handle);
+}
+
+always_inline stream_session_t *
+stream_session_get_from_handle (u64 handle)
+{
+ session_manager_main_t *smm = &session_manager_main;
+ return pool_elt_at_index (smm->sessions[stream_session_thread_from_handle
+ (handle)],
+ stream_session_index_from_handle (handle));
+}
+
always_inline stream_session_t *
stream_session_listener_get (u8 sst, u64 si)
{
@@ -375,13 +362,14 @@ void stream_session_reset_notify (transport_connection_t * tc);
int
stream_session_accept (transport_connection_t * tc, u32 listener_index,
u8 sst, u8 notify);
-int stream_session_open (u8 sst, ip46_address_t * addr,
- u16 port_host_byte_order, u32 api_client_index);
+int
+stream_session_open (u32 app_index, session_type_t st,
+ transport_endpoint_t * tep,
+ transport_connection_t ** tc);
+int stream_session_listen (stream_session_t * s, transport_endpoint_t * tep);
+int stream_session_stop_listen (stream_session_t * s);
void stream_session_disconnect (stream_session_t * s);
void stream_session_cleanup (stream_session_t * s);
-int
-stream_session_start_listen (u32 server_index, ip46_address_t * ip, u16 port);
-void stream_session_stop_listen (u32 server_index);
u8 *format_stream_session (u8 * s, va_list * args);
@@ -390,6 +378,71 @@ transport_proto_vft_t *session_get_transport_vft (u8 type);
clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
+always_inline unix_shared_memory_queue_t *
+session_manager_get_vpp_event_queue (u32 thread_index)
+{
+ return session_manager_main.vpp_event_queues[thread_index];
+}
+
+int session_manager_flush_enqueue_events (u32 thread_index);
+
+always_inline u64
+listen_session_get_handle (stream_session_t * s)
+{
+ ASSERT (s->session_state == SESSION_STATE_LISTENING);
+ return ((u64) s->session_type << 32) | s->session_index;
+}
+
+always_inline stream_session_t *
+listen_session_get_from_handle (u64 handle)
+{
+ session_manager_main_t *smm = &session_manager_main;
+ stream_session_t *s;
+ u32 type, index;
+ type = handle >> 32;
+ index = handle & 0xFFFFFFFF;
+
+ if (pool_is_free_index (smm->listen_sessions[type], index))
+ return 0;
+
+ s = pool_elt_at_index (smm->listen_sessions[type], index);
+ ASSERT (s->session_state == SESSION_STATE_LISTENING);
+ return s;
+}
+
+always_inline stream_session_t *
+listen_session_new (session_type_t type)
+{
+ stream_session_t *s;
+ pool_get (session_manager_main.listen_sessions[type], s);
+ memset (s, 0, sizeof (*s));
+
+ s->session_type = type;
+ s->session_state = SESSION_STATE_LISTENING;
+ s->session_index = s - session_manager_main.listen_sessions[type];
+
+ return s;
+}
+
+always_inline stream_session_t *
+listen_session_get (session_type_t type, u32 index)
+{
+ return pool_elt_at_index (session_manager_main.listen_sessions[type],
+ index);
+}
+
+always_inline void
+listen_session_del (stream_session_t * s)
+{
+ pool_put (session_manager_main.listen_sessions[s->session_type], s);
+}
+
+always_inline u8
+session_manager_is_enabled ()
+{
+ return session_manager_main.is_enabled == 1;
+}
+
#endif /* __included_session_h__ */
/*
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 9c38428a..a82dfe0b 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -38,6 +38,8 @@
#define foreach_session_api_msg \
_(MAP_ANOTHER_SEGMENT_REPLY, map_another_segment_reply) \
+_(APPLICATION_ATTACH, application_attach) \
+_(APPLICATION_DETACH, application_detach) \
_(BIND_URI, bind_uri) \
_(UNBIND_URI, unbind_uri) \
_(CONNECT_URI, connect_uri) \
@@ -48,13 +50,8 @@ _(RESET_SESSION_REPLY, reset_session_reply) \
_(BIND_SOCK, bind_sock) \
_(UNBIND_SOCK, unbind_sock) \
_(CONNECT_SOCK, connect_sock) \
-_(DISCONNECT_SOCK, disconnect_sock) \
-_(DISCONNECT_SOCK_REPLY, disconnect_sock_reply) \
-_(ACCEPT_SOCK_REPLY, accept_sock_reply) \
-_(RESET_SOCK_REPLY, reset_sock_reply) \
_(SESSION_ENABLE_DISABLE, session_enable_disable) \
-
static int
send_add_segment_callback (u32 api_client_index, const u8 * segment_name,
u32 segment_size)
@@ -80,11 +77,14 @@ send_add_segment_callback (u32 api_client_index, const u8 * segment_name,
}
static int
-send_session_accept_uri_callback (stream_session_t * s)
+send_session_accept_callback (stream_session_t * s)
{
vl_api_accept_session_t *mp;
unix_shared_memory_queue_t *q, *vpp_queue;
application_t *server = application_get (s->app_index);
+ transport_connection_t *tc;
+ transport_proto_vft_t *tp_vft;
+ stream_session_t *listener;
q = vl_api_client_index_to_input_queue (server->api_client_index);
vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
@@ -93,24 +93,28 @@ send_session_accept_uri_callback (stream_session_t * s)
return -1;
mp = vl_msg_api_alloc (sizeof (*mp));
- mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
+ memset (mp, 0, sizeof (*mp));
- /* Note: session_type is the first octet in all types of sessions */
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION);
- mp->accept_cookie = server->accept_cookie;
+ listener = listen_session_get (s->session_type, s->listener_index);
+ tp_vft = session_get_transport_vft (s->session_type);
+ tc = tp_vft->get_connection (s->connection_index, s->thread_index);
+ mp->listener_handle = listen_session_get_handle (listener);
+ mp->handle = stream_session_handle (s);
mp->server_rx_fifo = (u64) s->server_rx_fifo;
mp->server_tx_fifo = (u64) s->server_tx_fifo;
- mp->session_thread_index = s->thread_index;
- mp->session_index = s->session_index;
- mp->session_type = s->session_type;
mp->vpp_event_queue_address = (u64) vpp_queue;
+ mp->port = tc->rmt_port;
+ mp->is_ip4 = tc->is_ip4;
+ clib_memcpy (&mp->ip, &tc->rmt_ip, sizeof (tc->rmt_ip));
vl_msg_api_send_shmem (q, (u8 *) & mp);
return 0;
}
static void
-send_session_disconnect_uri_callback (stream_session_t * s)
+send_session_disconnect_callback (stream_session_t * s)
{
vl_api_disconnect_session_t *mp;
unix_shared_memory_queue_t *q;
@@ -124,14 +128,12 @@ send_session_disconnect_uri_callback (stream_session_t * s)
mp = vl_msg_api_alloc (sizeof (*mp));
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION);
-
- mp->session_thread_index = s->thread_index;
- mp->session_index = s->session_index;
+ mp->handle = stream_session_handle (s);
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
static void
-send_session_reset_uri_callback (stream_session_t * s)
+send_session_reset_callback (stream_session_t * s)
{
vl_api_reset_session_t *mp;
unix_shared_memory_queue_t *q;
@@ -145,22 +147,20 @@ send_session_reset_uri_callback (stream_session_t * s)
mp = vl_msg_api_alloc (sizeof (*mp));
memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SESSION);
-
- mp->session_thread_index = s->thread_index;
- mp->session_index = s->session_index;
+ mp->handle = stream_session_handle (s);
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
static int
-send_session_connected_uri_callback (u32 api_client_index,
- stream_session_t * s, u8 is_fail)
+send_session_connected_callback (u32 app_index, u32 api_context,
+ stream_session_t * s, u8 is_fail)
{
vl_api_connect_uri_reply_t *mp;
unix_shared_memory_queue_t *q;
- application_t *app = application_lookup (api_client_index);
- u8 *seg_name;
+ application_t *app;
unix_shared_memory_queue_t *vpp_queue;
+ app = application_get (app_index);
q = vl_api_client_index_to_input_queue (app->api_client_index);
if (!q)
@@ -168,24 +168,15 @@ send_session_connected_uri_callback (u32 api_client_index,
mp = vl_msg_api_alloc (sizeof (*mp));
mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_CONNECT_URI_REPLY);
- mp->context = app->api_context;
+ mp->context = api_context;
if (!is_fail)
{
vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
mp->server_rx_fifo = (u64) s->server_rx_fifo;
mp->server_tx_fifo = (u64) s->server_tx_fifo;
- mp->session_thread_index = s->thread_index;
- mp->session_index = s->session_index;
- mp->session_type = s->session_type;
+ mp->handle = stream_session_handle (s);
mp->vpp_event_queue_address = (u64) vpp_queue;
- mp->client_event_queue_address = (u64) app->event_queue;
mp->retval = 0;
-
- session_manager_get_segment_info (s->server_segment_index, &seg_name,
- &mp->segment_size);
- mp->segment_name_length = vec_len (seg_name);
- if (mp->segment_name_length)
- clib_memcpy (mp->segment_name, seg_name, mp->segment_name_length);
}
else
{
@@ -195,199 +186,14 @@ send_session_connected_uri_callback (u32 api_client_index,
vl_msg_api_send_shmem (q, (u8 *) & mp);
/* Remove client if connect failed */
- if (is_fail)
- {
- application_del (app);
- }
- else
- {
- s->session_state = SESSION_STATE_READY;
- }
-
- return 0;
-}
-
-/**
- * Redirect a connect_uri message to the indicated server.
- * Only sent if the server has bound the related port with
- * URI_OPTIONS_FLAGS_USE_FIFO
- */
-static int
-redirect_connect_uri_callback (u32 server_api_client_index, void *mp_arg)
-{
- vl_api_connect_uri_t *mp = mp_arg;
- unix_shared_memory_queue_t *server_q, *client_q;
- vlib_main_t *vm = vlib_get_main ();
- f64 timeout = vlib_time_now (vm) + 0.5;
- int rv = 0;
-
- server_q = vl_api_client_index_to_input_queue (server_api_client_index);
-
- if (!server_q)
- {
- rv = VNET_API_ERROR_INVALID_VALUE;
- goto out;
- }
-
- client_q = vl_api_client_index_to_input_queue (mp->client_index);
- if (!client_q)
- {
- rv = VNET_API_ERROR_INVALID_VALUE_2;
- goto out;
- }
-
- /* Tell the server the client's API queue address, so it can reply */
- mp->client_queue_address = (u64) client_q;
-
- /*
- * Bounce message handlers MUST NOT block the data-plane.
- * Spin waiting for the queue lock, but
- */
-
- while (vlib_time_now (vm) < timeout)
- {
- rv =
- unix_shared_memory_queue_add (server_q, (u8 *) & mp, 1 /*nowait */ );
- switch (rv)
- {
- /* correctly enqueued */
- case 0:
- return VNET_CONNECT_REDIRECTED;
-
- /* continue spinning, wait for pthread_mutex_trylock to work */
- case -1:
- continue;
-
- /* queue stuffed, drop the msg */
- case -2:
- rv = VNET_API_ERROR_QUEUE_FULL;
- goto out;
- }
- }
-out:
- /* Dispose of the message */
- vl_msg_api_free (mp);
- return rv;
-}
-
-static u64
-make_session_handle (stream_session_t * s)
-{
- return (u64) s->session_index << 32 | (u64) s->thread_index;
-}
-
-static int
-send_session_accept_callback (stream_session_t * s)
-{
- vl_api_accept_sock_t *mp;
- unix_shared_memory_queue_t *q, *vpp_queue;
- application_t *server = application_get (s->app_index);
-
- q = vl_api_client_index_to_input_queue (server->api_client_index);
- vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
-
- if (!q)
- return -1;
-
- mp = vl_msg_api_alloc (sizeof (*mp));
- mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SOCK);
-
- /* Note: session_type is the first octet in all types of sessions */
-
- mp->accept_cookie = server->accept_cookie;
- mp->server_rx_fifo = (u64) s->server_rx_fifo;
- mp->server_tx_fifo = (u64) s->server_tx_fifo;
- mp->handle = make_session_handle (s);
- mp->vpp_event_queue_address = (u64) vpp_queue;
- vl_msg_api_send_shmem (q, (u8 *) & mp);
-
- return 0;
-}
-
-static int
-send_session_connected_callback (u32 api_client_index, stream_session_t * s,
- u8 is_fail)
-{
- vl_api_connect_sock_reply_t *mp;
- unix_shared_memory_queue_t *q;
- application_t *app = application_lookup (api_client_index);
- u8 *seg_name;
- unix_shared_memory_queue_t *vpp_queue;
-
- q = vl_api_client_index_to_input_queue (app->api_client_index);
-
- if (!q)
- return -1;
-
- mp = vl_msg_api_alloc (sizeof (*mp));
- mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_CONNECT_SOCK_REPLY);
- mp->context = app->api_context;
- mp->retval = is_fail;
if (!is_fail)
{
- vpp_queue = session_manager_get_vpp_event_queue (s->thread_index);
- mp->server_rx_fifo = (u64) s->server_rx_fifo;
- mp->server_tx_fifo = (u64) s->server_tx_fifo;
- mp->handle = make_session_handle (s);
- mp->vpp_event_queue_address = (u64) vpp_queue;
- mp->client_event_queue_address = (u64) app->event_queue;
-
- session_manager_get_segment_info (s->server_segment_index, &seg_name,
- &mp->segment_size);
- mp->segment_name_length = vec_len (seg_name);
- if (mp->segment_name_length)
- clib_memcpy (mp->segment_name, seg_name, mp->segment_name_length);
+ s->session_state = SESSION_STATE_READY;
}
- vl_msg_api_send_shmem (q, (u8 *) & mp);
-
- /* Remove client if connect failed */
- if (is_fail)
- application_del (app);
-
return 0;
}
-static void
-send_session_disconnect_callback (stream_session_t * s)
-{
- vl_api_disconnect_sock_t *mp;
- unix_shared_memory_queue_t *q;
- application_t *app = application_get (s->app_index);
-
- q = vl_api_client_index_to_input_queue (app->api_client_index);
-
- if (!q)
- return;
-
- mp = vl_msg_api_alloc (sizeof (*mp));
- memset (mp, 0, sizeof (*mp));
- mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SOCK);
-
- mp->handle = make_session_handle (s);
- vl_msg_api_send_shmem (q, (u8 *) & mp);
-}
-
-static void
-send_session_reset_callback (stream_session_t * s)
-{
- vl_api_reset_sock_t *mp;
- unix_shared_memory_queue_t *q;
- application_t *app = application_get (s->app_index);
-
- q = vl_api_client_index_to_input_queue (app->api_client_index);
-
- if (!q)
- return;
-
- mp = vl_msg_api_alloc (sizeof (*mp));
- memset (mp, 0, sizeof (*mp));
- mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SOCK);
-
- mp->handle = make_session_handle (s);
- vl_msg_api_send_shmem (q, (u8 *) & mp);
-}
-
/**
* Redirect a connect_uri message to the indicated server.
* Only sent if the server has bound the related port with
@@ -396,10 +202,11 @@ send_session_reset_callback (stream_session_t * s)
static int
redirect_connect_callback (u32 server_api_client_index, void *mp_arg)
{
- vl_api_connect_sock_t *mp = mp_arg;
+ vl_api_connect_uri_t *mp = mp_arg;
unix_shared_memory_queue_t *server_q, *client_q;
vlib_main_t *vm = vlib_get_main ();
f64 timeout = vlib_time_now (vm) + 0.5;
+ application_t *app;
int rv = 0;
server_q = vl_api_client_index_to_input_queue (server_api_client_index);
@@ -419,6 +226,9 @@ redirect_connect_callback (u32 server_api_client_index, void *mp_arg)
/* Tell the server the client's API queue address, so it can reply */
mp->client_queue_address = (u64) client_q;
+ app = application_lookup (mp->client_index);
+ mp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = app->sm_properties.rx_fifo_size;
+ mp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = app->sm_properties.tx_fifo_size;
/*
* Bounce message handlers MUST NOT block the data-plane.
@@ -452,15 +262,6 @@ out:
}
static session_cb_vft_t uri_session_cb_vft = {
- .session_accept_callback = send_session_accept_uri_callback,
- .session_disconnect_callback = send_session_disconnect_uri_callback,
- .session_connected_callback = send_session_connected_uri_callback,
- .session_reset_callback = send_session_reset_uri_callback,
- .add_segment_callback = send_add_segment_callback,
- .redirect_connect_callback = redirect_connect_uri_callback
-};
-
-static session_cb_vft_t session_cb_vft = {
.session_accept_callback = send_session_accept_callback,
.session_disconnect_callback = send_session_disconnect_callback,
.session_connected_callback = send_session_connected_callback,
@@ -498,60 +299,134 @@ vl_api_session_enable_disable_t_handler (vl_api_session_enable_disable_t * mp)
}
static void
-vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
+vl_api_application_attach_t_handler (vl_api_application_attach_t * mp)
{
- vl_api_bind_uri_reply_t *rmp;
- vnet_bind_args_t _a, *a = &_a;
- char segment_name[128];
- u32 segment_name_length;
+ vl_api_application_attach_reply_t *rmp;
+ vnet_app_attach_args_t _a, *a = &_a;
int rv;
- _Static_assert (sizeof (u64) * SESSION_OPTIONS_N_OPTIONS <=
- sizeof (mp->options),
- "Out of options, fix api message definition");
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
- segment_name_length = ARRAY_LEN (segment_name);
+ STATIC_ASSERT (sizeof (u64) * SESSION_OPTIONS_N_OPTIONS <=
+ sizeof (mp->options),
+ "Out of options, fix api message definition");
memset (a, 0, sizeof (*a));
- a->uri = (char *) mp->uri;
a->api_client_index = mp->client_index;
a->options = mp->options;
- a->segment_name = segment_name;
- a->segment_name_length = segment_name_length;
a->session_cb_vft = &uri_session_cb_vft;
- a->options[SESSION_OPTIONS_SEGMENT_SIZE] = mp->initial_segment_size;
- a->options[SESSION_OPTIONS_ACCEPT_COOKIE] = mp->accept_cookie;
- rv = vnet_bind_uri (a);
+ rv = vnet_application_attach (a);
+done:
/* *INDENT-OFF* */
- REPLY_MACRO2 (VL_API_BIND_URI_REPLY, ({
+ REPLY_MACRO2 (VL_API_APPLICATION_ATTACH_REPLY, ({
rmp->retval = rv;
if (!rv)
{
rmp->segment_name_length = 0;
/* $$$$ policy? */
- rmp->segment_size = mp->initial_segment_size;
- if (segment_name_length)
+ rmp->segment_size = a->segment_size;
+ if (a->segment_name_length)
{
- memcpy (rmp->segment_name, segment_name, segment_name_length);
- rmp->segment_name_length = segment_name_length;
+ memcpy (rmp->segment_name, a->segment_name,
+ a->segment_name_length);
+ rmp->segment_name_length = a->segment_name_length;
}
- rmp->server_event_queue_address = a->server_event_queue_address;
+ rmp->app_event_queue_address = a->app_event_queue_address;
}
}));
/* *INDENT-ON* */
}
static void
+vl_api_application_detach_t_handler (vl_api_application_detach_t * mp)
+{
+ vl_api_application_detach_reply_t *rmp;
+ int rv = VNET_API_ERROR_INVALID_VALUE_2;
+ vnet_app_detach_args_t _a, *a = &_a;
+ application_t *app;
+
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->app_index = app->index;
+ rv = vnet_application_detach (a);
+ }
+
+done:
+ REPLY_MACRO (VL_API_APPLICATION_DETACH_REPLY);
+}
+
+static void
+vl_api_bind_uri_t_handler (vl_api_bind_uri_t * mp)
+{
+ vl_api_bind_uri_reply_t *rmp;
+ vnet_bind_args_t _a, *a = &_a;
+ application_t *app;
+ int rv;
+
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ memset (a, 0, sizeof (*a));
+ a->uri = (char *) mp->uri;
+ a->app_index = app->index;
+ rv = vnet_bind_uri (a);
+ }
+ else
+ {
+ rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
+
+done:
+ REPLY_MACRO (VL_API_BIND_URI_REPLY);
+}
+
+static void
vl_api_unbind_uri_t_handler (vl_api_unbind_uri_t * mp)
{
vl_api_unbind_uri_reply_t *rmp;
+ application_t *app;
+ vnet_unbind_args_t _a, *a = &_a;
int rv;
- rv = vnet_unbind_uri ((char *) mp->uri, mp->client_index);
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->uri = (char *) mp->uri;
+ a->app_index = app->index;
+ rv = vnet_unbind_uri (a);
+ }
+ else
+ {
+ rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
+done:
REPLY_MACRO (VL_API_UNBIND_URI_REPLY);
}
@@ -560,26 +435,37 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp)
{
vl_api_connect_uri_reply_t *rmp;
vnet_connect_args_t _a, *a = &_a;
+ application_t *app;
int rv;
- a->uri = (char *) mp->uri;
- a->api_client_index = mp->client_index;
- a->api_context = mp->context;
- a->options = mp->options;
- a->session_cb_vft = &uri_session_cb_vft;
- a->mp = mp;
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
- rv = vnet_connect_uri (a);
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->uri = (char *) mp->uri;
+ a->api_context = mp->context;
+ a->app_index = app->index;
+ a->mp = mp;
+ rv = vnet_connect_uri (a);
+ }
+ else
+ {
+ rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
if (rv == 0 || rv == VNET_CONNECT_REDIRECTED)
return;
/* Got some error, relay it */
+done:
/* *INDENT-OFF* */
- REPLY_MACRO2 (VL_API_CONNECT_URI_REPLY, ({
- rmp->retval = rv;
- }));
+ REPLY_MACRO (VL_API_CONNECT_URI_REPLY);
/* *INDENT-ON* */
}
@@ -587,13 +473,29 @@ static void
vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
{
vl_api_disconnect_session_reply_t *rmp;
- int rv;
+ vnet_disconnect_args_t _a, *a = &_a;
+ application_t *app;
+ int rv = 0;
- rv = api_session_not_valid (mp->session_index, mp->session_thread_index);
- if (!rv)
- rv =
- vnet_disconnect_session (mp->session_index, mp->session_thread_index);
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
+
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->handle = mp->handle;
+ a->app_index = app->index;
+ rv = vnet_disconnect_session (a);
+ }
+ else
+ {
+ rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ }
+done:
REPLY_MACRO (VL_API_DISCONNECT_SESSION_REPLY);
}
@@ -601,11 +503,8 @@ static void
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
{
- if (api_session_not_valid (mp->session_index, mp->session_thread_index))
- {
- clib_warning ("Invalid session!");
- return;
- }
+ vnet_disconnect_args_t _a, *a = &_a;
+ application_t *app;
/* Client objected to disconnecting the session, log and continue */
if (mp->retval)
@@ -615,15 +514,29 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
}
/* Disconnect has been confirmed. Confirm close to transport */
- vnet_disconnect_session (mp->session_index, mp->session_thread_index);
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->handle = mp->handle;
+ a->app_index = app->index;
+ vnet_disconnect_session (a);
+ }
}
static void
vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
{
+ application_t *app;
stream_session_t *s;
+ u32 index, thread_index;
+
+ app = application_lookup (mp->client_index);
+ if (!app)
+ return;
- if (api_session_not_valid (mp->session_index, mp->session_thread_index))
+ stream_session_parse_handle (mp->handle, &index, &thread_index);
+ s = stream_session_get_if_valid (index, thread_index);
+ if (s == 0 || app->index != s->app_index)
{
clib_warning ("Invalid session!");
return;
@@ -636,8 +549,6 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp)
return;
}
- s = stream_session_get (mp->session_index, mp->session_thread_index);
-
/* This comes as a response to a reset, transport only waiting for
* confirmation to remove connection state, no need to disconnect */
stream_session_cleanup (s);
@@ -648,11 +559,13 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp)
{
stream_session_t *s;
int rv;
-
- if (api_session_not_valid (mp->session_index, mp->session_thread_index))
+ u32 session_index, thread_index;
+ session_index = stream_session_index_from_handle (mp->handle);
+ thread_index = stream_session_thread_from_handle (mp->handle);
+ if (api_session_not_valid (session_index, thread_index))
return;
- s = stream_session_get (mp->session_index, mp->session_thread_index);
+ s = stream_session_get (session_index, thread_index);
rv = mp->retval;
if (rv)
@@ -677,49 +590,31 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp)
{
vl_api_bind_sock_reply_t *rmp;
vnet_bind_args_t _a, *a = &_a;
- char segment_name[128];
- u32 segment_name_length;
- int rv;
-
- STATIC_ASSERT (sizeof (u64) * SESSION_OPTIONS_N_OPTIONS <=
- sizeof (mp->options),
- "Out of options, fix api message definition");
-
- segment_name_length = ARRAY_LEN (segment_name);
-
- memset (a, 0, sizeof (*a));
-
- clib_memcpy (&a->tep.ip, mp->ip,
- (mp->is_ip4 ? sizeof (ip4_address_t) :
- sizeof (ip6_address_t)));
- a->tep.is_ip4 = mp->is_ip4;
- a->tep.port = mp->port;
- a->tep.vrf = mp->vrf;
-
- a->api_client_index = mp->client_index;
- a->options = mp->options;
- a->segment_name = segment_name;
- a->segment_name_length = segment_name_length;
- a->session_cb_vft = &session_cb_vft;
+ int rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
+ application_t *app;
- rv = vnet_bind_uri (a);
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
- /* *INDENT-OFF* */
- REPLY_MACRO2 (VL_API_BIND_SOCK_REPLY, ({
- rmp->retval = rv;
- if (!rv)
- {
- rmp->segment_name_length = 0;
- rmp->segment_size = mp->options[SESSION_OPTIONS_SEGMENT_SIZE];
- if (segment_name_length)
- {
- memcpy(rmp->segment_name, segment_name, segment_name_length);
- rmp->segment_name_length = segment_name_length;
- }
- rmp->server_event_queue_address = a->server_event_queue_address;
- }
- }));
- /* *INDENT-ON* */
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ memset (a, 0, sizeof (*a));
+ clib_memcpy (&a->tep.ip, mp->ip, (mp->is_ip4 ?
+ sizeof (ip4_address_t) :
+ sizeof (ip6_address_t)));
+ a->tep.is_ip4 = mp->is_ip4;
+ a->tep.port = mp->port;
+ a->tep.vrf = mp->vrf;
+ a->app_index = app->index;
+
+ rv = vnet_bind (a);
+ }
+done:
+ REPLY_MACRO (VL_API_BIND_SOCK_REPLY);
}
static void
@@ -727,13 +622,24 @@ vl_api_unbind_sock_t_handler (vl_api_unbind_sock_t * mp)
{
vl_api_unbind_sock_reply_t *rmp;
vnet_unbind_args_t _a, *a = &_a;
- int rv;
+ application_t *app;
+ int rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
- a->api_client_index = mp->client_index;
- a->handle = mp->handle;
+ if (session_manager_is_enabled () == 0)
+ {
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
+ }
- rv = vnet_unbind (a);
+ app = application_lookup (mp->client_index);
+ if (app)
+ {
+ a->app_index = mp->client_index;
+ a->handle = mp->handle;
+ rv = vnet_unbind (a);
+ }
+done:
REPLY_MACRO (VL_API_UNBIND_SOCK_REPLY);
}
@@ -742,114 +648,55 @@ vl_api_connect_sock_t_handler (vl_api_connect_sock_t * mp)
{
vl_api_connect_sock_reply_t *rmp;
vnet_connect_args_t _a, *a = &_a;
+ application_t *app;
int rv;
- clib_memcpy (&a->tep.ip, mp->ip,
- (mp->is_ip4 ? sizeof (ip4_address_t) :
- sizeof (ip6_address_t)));
- a->tep.is_ip4 = mp->is_ip4;
- a->tep.port = mp->port;
- a->tep.vrf = mp->vrf;
- a->options = mp->options;
- a->session_cb_vft = &session_cb_vft;
- a->api_context = mp->context;
- a->mp = mp;
-
- rv = vnet_connect (a);
-
- if (rv == 0 || rv == VNET_CONNECT_REDIRECTED)
- return;
-
- /* Got some error, relay it */
-
- /* *INDENT-OFF* */
- REPLY_MACRO2 (VL_API_CONNECT_URI_REPLY, ({
- rmp->retval = rv;
- }));
- /* *INDENT-ON* */
-}
-
-static void
-vl_api_disconnect_sock_t_handler (vl_api_disconnect_sock_t * mp)
-{
- vnet_disconnect_args_t _a, *a = &_a;
- vl_api_disconnect_sock_reply_t *rmp;
- int rv;
-
- a->api_client_index = mp->client_index;
- a->handle = mp->handle;
- rv = vnet_disconnect (a);
-
- REPLY_MACRO (VL_API_DISCONNECT_SOCK_REPLY);
-}
-
-static void
-vl_api_disconnect_sock_reply_t_handler (vl_api_disconnect_sock_reply_t * mp)
-{
- vnet_disconnect_args_t _a, *a = &_a;
-
- /* Client objected to disconnecting the session, log and continue */
- if (mp->retval)
+ if (session_manager_is_enabled () == 0)
{
- clib_warning ("client retval %d", mp->retval);
- return;
+ rv = VNET_API_ERROR_FEATURE_DISABLED;
+ goto done;
}
- a->api_client_index = mp->client_index;
- a->handle = mp->handle;
-
- vnet_disconnect (a);
-}
-
-static void
-vl_api_reset_sock_reply_t_handler (vl_api_reset_sock_reply_t * mp)
-{
- stream_session_t *s;
- u32 session_index, thread_index;
-
- /* Client objected to resetting the session, log and continue */
- if (mp->retval)
+ app = application_lookup (mp->client_index);
+ if (app)
{
- clib_warning ("client retval %d", mp->retval);
- return;
+ clib_memcpy (&a->tep.ip, mp->ip,
+ (mp->is_ip4 ? sizeof (ip4_address_t) :
+ sizeof (ip6_address_t)));
+ a->api_context = mp->context;
+ a->app_index = app->index;
+ a->mp = mp;
+ rv = vnet_connect (a);
}
-
- if (api_parse_session_handle (mp->handle, &session_index, &thread_index))
+ else
{
- clib_warning ("Invalid handle");
- return;
+ rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED;
}
- s = stream_session_get (session_index, thread_index);
+ if (rv == 0 || rv == VNET_CONNECT_REDIRECTED)
+ return;
- /* This comes as a response to a reset, transport only waiting for
- * confirmation to remove connection state, no need to disconnect */
- stream_session_cleanup (s);
+ /* Got some error, relay it */
+
+done:
+ REPLY_MACRO (VL_API_CONNECT_URI_REPLY);
}
-static void
-vl_api_accept_sock_reply_t_handler (vl_api_accept_sock_reply_t * mp)
+static clib_error_t *
+application_reaper_cb (u32 client_index)
{
- stream_session_t *s;
- u32 session_index, thread_index;
-
- if (api_parse_session_handle (mp->handle, &session_index, &thread_index))
- {
- clib_warning ("Invalid handle");
- return;
- }
- s = stream_session_get (session_index, thread_index);
-
- if (mp->retval)
+ application_t *app = application_lookup (client_index);
+ vnet_app_detach_args_t _a, *a = &_a;
+ if (app)
{
- /* Server isn't interested, kill the session */
- stream_session_disconnect (s);
- return;
+ a->app_index = app->index;
+ vnet_application_detach (a);
}
-
- s->session_state = SESSION_STATE_READY;
+ return 0;
}
+VL_MSG_API_REAPER_FUNCTION (application_reaper_cb);
+
#define vl_msg_name_crc_list
#include <vnet/vnet_all_api_h.h>
#undef vl_msg_name_crc_list
@@ -903,6 +750,7 @@ session_api_hookup (vlib_main_t * vm)
}
VLIB_API_INIT_FUNCTION (session_api_hookup);
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 2f912cbc..7ea7af15 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -30,7 +30,7 @@ typedef struct _transport_connection
ip46_address_t lcl_ip; /**< Local IP */
u16 lcl_port; /**< Local port */
u16 rmt_port; /**< Remote port */
- u8 proto; /**< Transport protocol id */
+ u8 proto; /**< Transport protocol id (also session type) */
u32 s_index; /**< Parent session index */
u32 c_index; /**< Connection index in transport pool */
@@ -103,7 +103,8 @@ typedef CLIB_PACKED (struct {
{
struct
{
- ip4_address_t src; ip4_address_t dst;
+ ip4_address_t src;
+ ip4_address_t dst;
u16 src_port;
u16 dst_port;
/* align by making this 4 octets even though its a 1-bit field
@@ -122,10 +123,14 @@ typedef CLIB_PACKED (struct {
struct
{
/* 48 octets */
- ip6_address_t src; ip6_address_t dst;
+ ip6_address_t src;
+ ip6_address_t dst;
u16 src_port;
- u16 dst_port; u32 proto; u8 unused_for_now[8];
- }; u64 as_u64[6];
+ u16 dst_port;
+ u32 proto;
+ u8 unused_for_now[8];
+ };
+ u64 as_u64[6];
};
}) v6_connection_key_t;
/* *INDENT-ON* */
@@ -233,10 +238,10 @@ make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t)
typedef struct _transport_endpoint
{
- ip46_address_t ip;
- u16 port;
- u8 is_ip4;
- u32 vrf;
+ ip46_address_t ip; /** ip address */
+ u16 port; /** port in host order */
+ u8 is_ip4; /** 1 if ip4 */
+ u32 vrf; /** fib table the endpoint is associated with */
} transport_endpoint_t;
typedef clib_bihash_24_8_t transport_endpoint_table_t;
diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c
index 9e8e1561..f8fbf28c 100644
--- a/src/vnet/tcp/builtin_client.c
+++ b/src/vnet/tcp/builtin_client.c
@@ -237,8 +237,7 @@ tclient_thread_fn (void *arg)
memset (dmp, 0, sizeof (*dmp));
dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
dmp->client_index = tm->my_client_index;
- dmp->session_index = sp->vpp_session_index;
- dmp->session_thread_index = sp->vpp_session_thread;
+ dmp->handle = sp->vpp_session_handle;
vl_msg_api_send_shmem (tm->vl_input_queue, (u8 *) & dmp);
pool_put (tm->sessions, sp);
}
@@ -253,9 +252,10 @@ tclient_thread_fn (void *arg)
static void
vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp)
{
+ vlib_main_t *vm = vlib_get_main ();
tclient_main_t *tm = &tclient_main;
-
tm->my_client_index = mp->index;
+ vlib_process_signal_event (vm, tm->node_index, 1 /* evt */ , 0 /* data */ );
}
static void
@@ -264,7 +264,6 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
tclient_main_t *tm = &tclient_main;
session_t *session;
u32 session_index;
- u64 key;
i32 retval = /* clib_net_to_host_u32 ( */ mp->retval /*) */ ;
if (retval < 0)
@@ -291,24 +290,24 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
session->server_rx_fifo->client_session_index = session_index;
session->server_tx_fifo = (svm_fifo_t *) mp->server_tx_fifo;
session->server_tx_fifo->client_session_index = session_index;
-
- session->vpp_session_index = mp->session_index;
- session->vpp_session_thread = mp->session_thread_index;
+ session->vpp_session_handle = mp->handle;
/* Add it to the session lookup table */
- key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
- hash_set (tm->session_index_by_vpp_handles, key, session_index);
+ hash_set (tm->session_index_by_vpp_handles, mp->handle, session_index);
tm->ready_connections++;
}
-static void
+static int
create_api_loopback (tclient_main_t * tm)
{
+ vlib_main_t *vm = vlib_get_main ();
vl_api_memclnt_create_t _m, *mp = &_m;
extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *);
api_main_t *am = &api_main;
vl_shmem_hdr_t *shmem_hdr;
+ uword *event_data = 0, event_type;
+ int resolved = 0;
/*
* Create a "loopback" API client connection
@@ -324,6 +323,25 @@ create_api_loopback (tclient_main_t * tm)
strncpy ((char *) mp->name, "tcp_tester", sizeof (mp->name) - 1);
vl_api_memclnt_create_t_handler (mp);
+
+ /* Wait for reply */
+ tm->node_index = vlib_get_current_process (vm)->node_runtime.node_index;
+ vlib_process_wait_for_event_or_clock (vm, 1.0);
+ event_type = vlib_process_get_events (vm, &event_data);
+ switch (event_type)
+ {
+ case 1:
+ resolved = 1;
+ break;
+ case ~0:
+ /* timed out */
+ break;
+ default:
+ clib_warning ("unknown event_type %d", event_type);
+ }
+ if (!resolved)
+ return -1;
+ return 0;
}
#define foreach_tclient_static_api_msg \
@@ -333,17 +351,7 @@ _(CONNECT_URI_REPLY, connect_uri_reply)
static clib_error_t *
tclient_api_hookup (vlib_main_t * vm)
{
- tclient_main_t *tm = &tclient_main;
vl_msg_api_msg_config_t _c, *c = &_c;
- int i;
-
- /* Init test data */
- vec_validate (tm->connect_test_data, 64 * 1024 - 1);
- for (i = 0; i < vec_len (tm->connect_test_data); i++)
- tm->connect_test_data[i] = i & 0xff;
-
- tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
- vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
/* Hook up client-side static APIs to our handlers */
#define _(N,n) do { \
@@ -365,18 +373,105 @@ tclient_api_hookup (vlib_main_t * vm)
return 0;
}
-VLIB_API_INIT_FUNCTION (tclient_api_hookup);
+static int
+tcp_test_clients_init (vlib_main_t * vm)
+{
+ tclient_main_t *tm = &tclient_main;
+ int i;
+
+ tclient_api_hookup (vm);
+ if (create_api_loopback (tm))
+ return -1;
+
+ /* Init test data */
+ vec_validate (tm->connect_test_data, 64 * 1024 - 1);
+ for (i = 0; i < vec_len (tm->connect_test_data); i++)
+ tm->connect_test_data[i] = i & 0xff;
+
+ tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+ vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
+
+ tm->is_init = 1;
+
+ return 0;
+}
+
+static void
+builtin_session_reset_callback (stream_session_t * s)
+{
+ return;
+}
+
+static int
+builtin_session_connected_callback (u32 app_index, u32 api_context,
+ stream_session_t * s, u8 code)
+{
+ return 0;
+}
+
+static int
+builtin_session_create_callback (stream_session_t * s)
+{
+ return 0;
+}
+
+static void
+builtin_session_disconnect_callback (stream_session_t * s)
+{
+ return;
+}
+
+static int
+builtin_server_rx_callback (stream_session_t * s)
+{
+ return 0;
+}
+
+/* *INDENT-OFF* */
+static session_cb_vft_t builtin_clients = {
+ .session_reset_callback = builtin_session_reset_callback,
+ .session_connected_callback = builtin_session_connected_callback,
+ .session_accept_callback = builtin_session_create_callback,
+ .session_disconnect_callback = builtin_session_disconnect_callback,
+ .builtin_server_rx_callback = builtin_server_rx_callback
+};
+/* *INDENT-ON* */
+
+static int
+attach_builtin_test_clients ()
+{
+ vnet_app_attach_args_t _a, *a = &_a;
+ u8 segment_name[128];
+ u32 segment_name_length;
+ u64 options[16];
+
+ segment_name_length = ARRAY_LEN (segment_name);
+
+ memset (a, 0, sizeof (*a));
+ memset (options, 0, sizeof (options));
+
+ a->api_client_index = ~0;
+ a->segment_name = segment_name;
+ a->segment_name_length = segment_name_length;
+ a->session_cb_vft = &builtin_clients;
+
+ options[SESSION_OPTIONS_ACCEPT_COOKIE] = 0x12345678;
+ options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30); /*$$$$ config / arg */
+ a->options = options;
+
+ return vnet_application_attach (a);
+}
static clib_error_t *
test_tcp_clients_command_fn (vlib_main_t * vm,
unformat_input_t * input,
vlib_cli_command_t * cmd)
{
+ tclient_main_t *tm = &tclient_main;
u8 *connect_uri = (u8 *) "tcp://6.0.1.1/1234";
u8 *uri;
- tclient_main_t *tm = &tclient_main;
- int i;
u32 n_clients = 1;
+ int i;
tm->bytes_to_send = 8192;
tm->n_iterations = 1;
@@ -397,14 +492,19 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
format_unformat_error, input);
}
+ if (tm->is_init == 0)
+ {
+ if (tcp_test_clients_init (vm))
+ return clib_error_return (0, "failed init");
+ }
+
tm->ready_connections = 0;
tm->expected_connections = n_clients;
+
uri = connect_uri;
if (tm->connect_uri)
uri = tm->connect_uri;
- create_api_loopback (tm);
-
#if TCP_BUILTIN_CLIENT_PTHREAD
/* Start a transmit thread */
if (tm->client_thread_handle == 0)
@@ -420,6 +520,7 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
}
#endif
vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
+ attach_builtin_test_clients ();
/* Fire off connect requests, in something approaching a normal manner */
for (i = 0; i < n_clients; i++)
@@ -461,6 +562,16 @@ VLIB_CLI_COMMAND (test_clients_command, static) =
};
/* *INDENT-ON* */
+clib_error_t *
+tcp_test_clients_main_init (vlib_main_t * vm)
+{
+ tclient_main_t *tm = &tclient_main;
+ tm->is_init = 0;
+ return 0;
+}
+
+VLIB_INIT_FUNCTION (tcp_test_clients_main_init);
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/tcp/builtin_client.h b/src/vnet/tcp/builtin_client.h
index 64030302..2bd87c07 100644
--- a/src/vnet/tcp/builtin_client.h
+++ b/src/vnet/tcp/builtin_client.h
@@ -39,8 +39,7 @@ typedef struct
svm_fifo_t *server_rx_fifo;
svm_fifo_t *server_tx_fifo;
- u32 vpp_session_index;
- u32 vpp_session_thread;
+ u64 vpp_session_handle;
} session_t;
typedef struct
@@ -110,6 +109,10 @@ typedef struct
u32 client_bytes_received;
u8 test_return_packets;
+ u8 is_init;
+
+ u32 node_index;
+
/* convenience */
vlib_main_t *vlib_main;
vnet_main_t *vnet_main;
diff --git a/src/vnet/tcp/builtin_server.c b/src/vnet/tcp/builtin_server.c
index 917d4bd3..8308e3d9 100644
--- a/src/vnet/tcp/builtin_server.c
+++ b/src/vnet/tcp/builtin_server.c
@@ -18,17 +18,46 @@
#include <vnet/session/application.h>
#include <vnet/session/application_interface.h>
+/* define message IDs */
+#include <vpp/api/vpe_msg_enum.h>
+
+/* define message structures */
+#define vl_typedefs
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_typedefs
+
+/* define generated endian-swappers */
+#define vl_endianfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_endianfun
+
+/* instantiate all the print functions we know about */
+#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
+#define vl_printfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_printfun
+
typedef struct
{
u8 *rx_buf;
unix_shared_memory_queue_t **vpp_queue;
- u32 byte_index;
+ u64 byte_index;
+
+ /* Sever's event queue */
+ unix_shared_memory_queue_t *vl_input_queue;
+
+ /* API client handle */
+ u32 my_client_index;
+
+ u32 app_index;
+
+ /* process node index for evnt scheduling */
+ u32 node_index;
vlib_main_t *vlib_main;
} builtin_server_main_t;
builtin_server_main_t builtin_server_main;
-
int
builtin_session_accept_callback (stream_session_t * s)
{
@@ -45,9 +74,13 @@ builtin_session_accept_callback (stream_session_t * s)
void
builtin_session_disconnect_callback (stream_session_t * s)
{
+ builtin_server_main_t *bsm = &builtin_server_main;
+ vnet_disconnect_args_t _a, *a = &_a;
clib_warning ("called...");
- vnet_disconnect_session (s->session_index, s->thread_index);
+ a->handle = stream_session_handle (s);
+ a->app_index = bsm->app_index;
+ vnet_disconnect_session (a);
}
void
@@ -60,7 +93,7 @@ builtin_session_reset_callback (stream_session_t * s)
int
-builtin_session_connected_callback (u32 client_index,
+builtin_session_connected_callback (u32 app_index, u32 api_context,
stream_session_t * s, u8 is_fail)
{
clib_warning ("called...");
@@ -91,7 +124,7 @@ test_bytes (builtin_server_main_t * bsm, int actual_transfer)
{
if (bsm->rx_buf[i] != ((bsm->byte_index + i) & 0xff))
{
- clib_warning ("at %d expected %d got %d", bsm->byte_index + i,
+ clib_warning ("at %lld expected %d got %d", bsm->byte_index + i,
(bsm->byte_index + i) & 0xff, bsm->rx_buf[i]);
}
}
@@ -190,23 +223,66 @@ static session_cb_vft_t builtin_session_cb_vft = {
.session_reset_callback = builtin_session_reset_callback
};
+/* Abuse VPP's input queue */
static int
-server_create (vlib_main_t * vm)
+create_api_loopback (vlib_main_t * vm)
{
- vnet_bind_args_t _a, *a = &_a;
- u64 options[SESSION_OPTIONS_N_OPTIONS];
- char segment_name[128];
- u32 num_threads;
- vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ builtin_server_main_t *bsm = &builtin_server_main;
+ vl_api_memclnt_create_t _m, *mp = &_m;
+ extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *);
+ api_main_t *am = &api_main;
+ vl_shmem_hdr_t *shmem_hdr;
+ uword *event_data = 0, event_type;
+ int resolved = 0;
- num_threads = 1 /* main thread */ + vtm->n_threads;
- vec_validate (builtin_server_main.vpp_queue, num_threads - 1);
+ /*
+ * Create a "loopback" API client connection
+ * Don't do things like this unless you know what you're doing...
+ */
+
+ shmem_hdr = am->shmem_hdr;
+ bsm->vl_input_queue = shmem_hdr->vl_input_queue;
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = VL_API_MEMCLNT_CREATE;
+ mp->context = 0xFEEDFACE;
+ mp->input_queue = (u64) bsm->vl_input_queue;
+ strncpy ((char *) mp->name, "tcp_test_server", sizeof (mp->name) - 1);
+
+ vl_api_memclnt_create_t_handler (mp);
+
+ /* Wait for reply */
+ bsm->node_index = vlib_get_current_process (vm)->node_runtime.node_index;
+ vlib_process_wait_for_event_or_clock (vm, 1.0);
+ event_type = vlib_process_get_events (vm, &event_data);
+ switch (event_type)
+ {
+ case 1:
+ resolved = 1;
+ break;
+ case ~0:
+ /* timed out */
+ break;
+ default:
+ clib_warning ("unknown event_type %d", event_type);
+ }
+ if (!resolved)
+ return -1;
+
+ return 0;
+}
+
+static int
+server_attach ()
+{
+ builtin_server_main_t *bsm = &builtin_server_main;
+ u8 segment_name[128];
+ u64 options[SESSION_OPTIONS_N_OPTIONS];
+ vnet_app_attach_args_t _a, *a = &_a;
memset (a, 0, sizeof (*a));
memset (options, 0, sizeof (options));
- a->uri = "tcp://0.0.0.0/1234";
- a->api_client_index = ~0;
+ a->api_client_index = bsm->my_client_index;
a->session_cb_vft = &builtin_session_cb_vft;
a->options = options;
a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
@@ -215,9 +291,94 @@ server_create (vlib_main_t * vm)
a->segment_name = segment_name;
a->segment_name_length = ARRAY_LEN (segment_name);
+ if (vnet_application_attach (a))
+ {
+ clib_warning ("failed to attach server");
+ return -1;
+ }
+ bsm->app_index = a->app_index;
+ return 0;
+}
+
+static int
+server_listen ()
+{
+ builtin_server_main_t *bsm = &builtin_server_main;
+ vnet_bind_args_t _a, *a = &_a;
+ memset (a, 0, sizeof (*a));
+ a->app_index = bsm->app_index;
+ a->uri = "tcp://0.0.0.0/1234";
return vnet_bind_uri (a);
}
+static int
+server_create (vlib_main_t * vm)
+{
+ builtin_server_main_t *bsm = &builtin_server_main;
+ u32 num_threads;
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+
+ if (bsm->my_client_index == (u32) ~ 0)
+ {
+ if (create_api_loopback (vm))
+ return -1;
+ }
+
+ num_threads = 1 /* main thread */ + vtm->n_threads;
+ vec_validate (builtin_server_main.vpp_queue, num_threads - 1);
+
+ if (server_attach ())
+ {
+ clib_warning ("failed to attach server");
+ return -1;
+ }
+ if (server_listen ())
+ {
+ clib_warning ("failed to start listening");
+ return -1;
+ }
+ return 0;
+}
+
+/* Get our api client index */
+static void
+vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ builtin_server_main_t *bsm = &builtin_server_main;
+ bsm->my_client_index = mp->index;
+ vlib_process_signal_event (vm, bsm->node_index, 1 /* evt */ ,
+ 0 /* data */ );
+}
+
+#define foreach_tcp_builtin_server_api_msg \
+_(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \
+
+static clib_error_t *
+tcp_builtin_server_api_hookup (vlib_main_t * vm)
+{
+ vl_msg_api_msg_config_t _c, *c = &_c;
+
+ /* Hook up client-side static APIs to our handlers */
+#define _(N,n) do { \
+ c->id = VL_API_##N; \
+ c->name = #n; \
+ c->handler = vl_api_##n##_t_handler; \
+ c->cleanup = vl_noop_handler; \
+ c->endian = vl_api_##n##_t_endian; \
+ c->print = vl_api_##n##_t_print; \
+ c->size = sizeof(vl_api_##n##_t); \
+ c->traced = 1; /* trace, so these msgs print */ \
+ c->replay = 0; /* don't replay client create/delete msgs */ \
+ c->message_bounce = 0; /* don't bounce this message */ \
+ vl_msg_api_config(c);} while (0);
+
+ foreach_tcp_builtin_server_api_msg;
+#undef _
+
+ return 0;
+}
+
static clib_error_t *
server_create_command_fn (vlib_main_t * vm,
unformat_input_t * input, vlib_cli_command_t * cmd)
@@ -234,6 +395,7 @@ server_create_command_fn (vlib_main_t * vm,
}
#endif
+ tcp_builtin_server_api_hookup (vm);
vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
rv = server_create (vm);
switch (rv)
@@ -249,12 +411,22 @@ server_create_command_fn (vlib_main_t * vm,
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (server_create_command, static) =
{
- .path = "test server",
- .short_help = "test server",
+ .path = "test tcp server",
+ .short_help = "test tcp server",
.function = server_create_command_fn,
};
/* *INDENT-ON* */
+clib_error_t *
+builtin_tcp_server_main_init (vlib_main_t * vm)
+{
+ builtin_server_main_t *bsm = &builtin_server_main;
+ bsm->my_client_index = ~0;
+ return 0;
+}
+
+VLIB_INIT_FUNCTION (builtin_tcp_server_main_init);
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index b6c34828..a0c66b9f 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -34,14 +34,19 @@ tcp_connection_bind (u32 session_index, ip46_address_t * ip,
listener->c_lcl_port = clib_host_to_net_u16 (port_host_byte_order);
if (is_ip4)
- listener->c_lcl_ip4.as_u32 = ip->ip4.as_u32;
+ {
+ listener->c_lcl_ip4.as_u32 = ip->ip4.as_u32;
+ listener->c_is_ip4 = 1;
+ listener->c_proto = SESSION_TYPE_IP4_TCP;
+ }
else
- clib_memcpy (&listener->c_lcl_ip6, &ip->ip6, sizeof (ip6_address_t));
+ {
+ clib_memcpy (&listener->c_lcl_ip6, &ip->ip6, sizeof (ip6_address_t));
+ listener->c_proto = SESSION_TYPE_IP6_TCP;
+ }
listener->c_s_index = session_index;
- listener->c_proto = SESSION_TYPE_IP4_TCP;
listener->state = TCP_STATE_LISTEN;
- listener->c_is_ip4 = 1;
tcp_connection_timers_init (listener);
@@ -62,7 +67,6 @@ tcp_session_bind_ip6 (u32 session_index, ip46_address_t * ip,
u16 port_host_byte_order)
{
return tcp_connection_bind (session_index, ip, port_host_byte_order, 0);
-
}
static void
@@ -397,6 +401,7 @@ tcp_connection_open (ip46_address_t * rmt_addr, u16 rmt_port, u8 is_ip4)
tc->c_lcl_port = clib_host_to_net_u16 (lcl_port);
tc->c_c_index = tc - tm->half_open_connections;
tc->c_is_ip4 = is_ip4;
+ tc->c_proto = is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
/* The other connection vars will be initialized after SYN ACK */
tcp_connection_timers_init (tc);
@@ -518,7 +523,10 @@ format_tcp_session (u8 * s, va_list * args)
tcp_connection_t *tc;
tc = tcp_connection_get (tci, thread_index);
- return format (s, "%U", format_tcp_connection, tc);
+ if (tc)
+ return format (s, "%U", format_tcp_connection, tc);
+ else
+ return format (s, "empty");
}
u8 *
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 2f5da108..93f3245d 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -100,8 +100,6 @@ extern timer_expiration_handler tcp_timer_retransmit_syn_handler;
#define TCP_RTO_SYN_RETRIES 3 /* SYN retries without doubling RTO */
#define TCP_RTO_INIT 1 * THZ /* Initial retransmit timer */
-void tcp_update_time (f64 now, u32 thread_index);
-
/** TCP connection flags */
#define foreach_tcp_connection_flag \
_(SNDACK, "Send ACK") \
@@ -481,6 +479,13 @@ tcp_time_now (void)
return clib_cpu_time_now () * tcp_main.tstamp_ticks_per_clock;
}
+always_inline void
+tcp_update_time (f64 now, u32 thread_index)
+{
+ tw_timer_expire_timers_16t_2w_512sl (&tcp_main.timer_wheels[thread_index],
+ now);
+}
+
u32 tcp_push_header (transport_connection_t * tconn, vlib_buffer_t * b);
u32
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 7e9fa47b..ae1f92d5 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -1841,6 +1841,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
case TCP_STATE_ESTABLISHED:
case TCP_STATE_FIN_WAIT_1:
case TCP_STATE_FIN_WAIT_2:
+ vlib_buffer_advance (b0, n_advance_bytes0);
error0 = tcp_segment_rcv (tm, tc0, b0, n_data_bytes0, &next0);
break;
case TCP_STATE_CLOSE_WAIT:
@@ -2410,12 +2411,6 @@ VLIB_REGISTER_NODE (tcp6_input_node) =
/* *INDENT-ON* */
VLIB_NODE_FUNCTION_MULTIARCH (tcp6_input_node, tcp6_input);
-void
-tcp_update_time (f64 now, u32 thread_index)
-{
- tcp_main_t *tm = vnet_get_tcp_main ();
- tw_timer_expire_timers_16t_2w_512sl (&tm->timer_wheels[thread_index], now);
-}
static void
tcp_dispatch_table_init (tcp_main_t * tm)
diff --git a/src/vnet/tcp/tcp_test.c b/src/vnet/tcp/tcp_test.c
index 0725bb04..3dbbdf6f 100644
--- a/src/vnet/tcp/tcp_test.c
+++ b/src/vnet/tcp/tcp_test.c
@@ -12,7 +12,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include <vnet/tcp/tcp.h>
#define TCP_TEST_I(_cond, _comment, _args...) \
@@ -174,6 +173,118 @@ tcp_test_sack ()
return 0;
}
+static int
+tcp_test_fifo (vlib_main_t * vm, unformat_input_t * input)
+{
+ svm_fifo_t *f;
+ u32 fifo_size = 1 << 20;
+ u32 *test_data = 0;
+ u32 offset;
+ int i, rv;
+ u32 data_word, test_data_len;
+
+ /* $$$ parse args */
+ test_data_len = fifo_size / sizeof (u32);
+ vec_validate (test_data, test_data_len - 1);
+
+ for (i = 0; i < vec_len (test_data); i++)
+ test_data[i] = i;
+
+ f = svm_fifo_create (fifo_size);
+
+ /* Paint fifo data vector with -1's */
+ memset (f->data, 0xFF, test_data_len);
+
+ /* Enqueue an initial (un-dequeued) chunk */
+ rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ ,
+ sizeof (u32), (u8 *) test_data);
+
+ if (rv != sizeof (u32))
+ {
+ clib_warning ("enqueue returned %d", rv);
+ goto out;
+ }
+
+ /*
+ * Create 3 chunks in the future. The offsets are relative
+ * to the current fifo tail
+ */
+ for (i = 0; i < 3; i++)
+ {
+ offset = (2 * i + 1) * sizeof (u32);
+ vlib_cli_output (vm, "add offset %d", offset);
+
+ rv = svm_fifo_enqueue_with_offset
+ (f, 0 /* pid */ , offset, sizeof (u32),
+ (u8 *) (test_data + ((offset + sizeof (u32)) / sizeof (u32))));
+
+ if (rv)
+ {
+ clib_warning ("enqueue returned %d", rv);
+ goto out;
+ }
+ }
+
+ /* Paint missing data backwards */
+ for (i = 3; i > 0; i--)
+ {
+ offset = (2 * i + 0) * sizeof (u32);
+
+ vlib_cli_output (vm, "add offset %d", offset);
+
+ rv = svm_fifo_enqueue_with_offset
+ (f, 0 /* pid */ , offset, sizeof (u32),
+ (u8 *) (test_data + ((offset + sizeof (u32)) / sizeof (u32))));
+
+ if (rv)
+ {
+ clib_warning ("enqueue returned %d", rv);
+ goto out;
+ }
+ }
+
+ vlib_cli_output (vm, "fifo before missing link: %U",
+ format_svm_fifo, f, 1 /* verbose */ );
+
+ /* Enqueue the missing u32 */
+ rv = svm_fifo_enqueue_nowait (f, 0 /* pid */ ,
+ sizeof (u32), (u8 *) (test_data + 1));
+ if (rv != 7 * sizeof (u32))
+ {
+ clib_warning ("enqueue returned %d", rv);
+ goto out;
+ }
+
+ vlib_cli_output (vm, "fifo after missing link: %U",
+ format_svm_fifo, f, 1 /* verbose */ );
+
+ /* Collect results */
+ for (i = 0; i < 7; i++)
+ {
+ rv = svm_fifo_dequeue_nowait (f, 0 /* pid */ , sizeof (u32),
+ (u8 *) & data_word);
+ if (rv != sizeof (u32))
+ {
+ clib_warning ("dequeue returned %d", rv);
+ goto out;
+ }
+ if (data_word != test_data[i])
+ {
+ clib_warning ("recovered data %d not %d", data_word, test_data[i]);
+ goto out;
+ }
+ }
+
+ clib_warning ("test complete...");
+
+out:
+ svm_fifo_free (f);
+ vec_free (test_data);
+ return 0;
+}
+
+
+
static clib_error_t *
tcp_test (vlib_main_t * vm,
unformat_input_t * input, vlib_cli_command_t * cmd_arg)
@@ -186,6 +297,10 @@ tcp_test (vlib_main_t * vm,
{
res = tcp_test_sack ();
}
+ else if (unformat (input, "fifo"))
+ {
+ res = tcp_test_fifo (vm, input);
+ }
else
{
return clib_error_return (0, "unknown input `%U'",
@@ -203,10 +318,16 @@ tcp_test (vlib_main_t * vm,
}
}
+/* *INDENT-OFF* */
VLIB_CLI_COMMAND (tcp_test_command, static) =
{
-.path = "test tcp",.short_help = "internal tcp unit tests",.function =
- tcp_test,};
+ .path = "test tcp",
+ .short_help = "internal tcp unit tests",
+ .function = tcp_test,
+};
+/* *INDENT-ON* */
+
+
/*
* fd.io coding-style-patch-verification: ON
*
diff --git a/src/vnet/udp/builtin_server.c b/src/vnet/udp/builtin_server.c
index 57f774c5..8565f04c 100644
--- a/src/vnet/udp/builtin_server.c
+++ b/src/vnet/udp/builtin_server.c
@@ -91,12 +91,11 @@ static session_cb_vft_t builtin_server = {
/* *INDENT-ON* */
static int
-bind_builtin_uri_server (u8 * uri)
+attach_builtin_uri_server ()
{
- vnet_bind_args_t _a, *a = &_a;
- char segment_name[128];
+ vnet_app_attach_args_t _a, *a = &_a;
+ u8 segment_name[128];
u32 segment_name_length;
- int rv;
u64 options[16];
segment_name_length = ARRAY_LEN (segment_name);
@@ -104,8 +103,7 @@ bind_builtin_uri_server (u8 * uri)
memset (a, 0, sizeof (*a));
memset (options, 0, sizeof (options));
- a->uri = (char *) uri;
- a->api_client_index = ~0; /* built-in server */
+ a->api_client_index = ~0;
a->segment_name = segment_name;
a->segment_name_length = segment_name_length;
a->session_cb_vft = &builtin_server;
@@ -114,6 +112,23 @@ bind_builtin_uri_server (u8 * uri)
options[SESSION_OPTIONS_SEGMENT_SIZE] = (2 << 30); /*$$$$ config / arg */
a->options = options;
+ return vnet_application_attach (a);
+}
+
+static int
+bind_builtin_uri_server (u8 * uri)
+{
+ vnet_bind_args_t _a, *a = &_a;
+ int rv;
+
+ rv = attach_builtin_uri_server ();
+ if (rv)
+ return rv;
+
+ memset (a, 0, sizeof (*a));
+ a->uri = (char *) uri;
+ a->app_index = ~0; /* built-in server */
+
rv = vnet_bind_uri (a);
return rv;
@@ -122,11 +137,12 @@ bind_builtin_uri_server (u8 * uri)
static int
unbind_builtin_uri_server (u8 * uri)
{
- int rv;
+ vnet_unbind_args_t _a, *a = &_a;
- rv = vnet_unbind_uri ((char *) uri, ~0 /* client_index */ );
+ a->app_index = ~0;
+ a->uri = (char *) uri;
- return rv;
+ return vnet_unbind_uri (a);
}
static clib_error_t *