summaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/application.c16
-rw-r--r--src/vnet/session/application.h3
-rw-r--r--src/vnet/session/application_interface.c5
-rw-r--r--src/vnet/session/application_interface.h4
-rw-r--r--src/vnet/session/node.c56
-rw-r--r--src/vnet/session/session.c38
-rw-r--r--src/vnet/session/session.h28
-rw-r--r--src/vnet/session/session_api.c50
-rw-r--r--src/vnet/session/transport.h66
9 files changed, 174 insertions, 92 deletions
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index a542eebe43f..513e5faca9c 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -92,6 +92,19 @@ application_del (application_t * app)
pool_put (app_pool, app);
}
+static void
+application_verify_cb_fns (application_type_t type, session_cb_vft_t * cb_fns)
+{
+ if (type == APP_SERVER && cb_fns->session_accept_callback == 0)
+ clib_warning ("No accept callback function provided");
+ if (type == APP_CLIENT && cb_fns->session_connected_callback == 0)
+ clib_warning ("No session connected callback function provided");
+ if (cb_fns->session_disconnect_callback == 0)
+ clib_warning ("No session disconnect callback function provided");
+ if (cb_fns->session_reset_callback == 0)
+ clib_warning ("No session reset callback function provided");
+}
+
application_t *
application_new (application_type_t type, session_type_t sst,
u32 api_client_index, u32 flags, session_cb_vft_t * cb_fns)
@@ -142,6 +155,9 @@ application_new (application_type_t type, session_type_t sst,
app->flags = flags;
app->cb_fns = *cb_fns;
+ /* Check that the obvious things are properly set up */
+ application_verify_cb_fns (type, cb_fns);
+
/* Add app to lookup by api_client_index table */
application_table_add (app);
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 480828f7b34..a60a8b8b5f9 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -45,7 +45,8 @@ typedef struct _stream_session_cb_vft
void (*session_reset_callback) (stream_session_t * s);
/* Direct RX callback, for built-in servers */
- int (*builtin_server_rx_callback) (stream_session_t * session);
+ int (*builtin_server_rx_callback) (stream_session_t * session,
+ session_fifo_event_t * ep);
/* Redirect connection to local server */
int (*redirect_connect_callback) (u32 api_client_index, void *mp);
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index 6ddfb70f609..4b30bd876a7 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -98,7 +98,7 @@ vnet_bind_i (u32 api_client_index, ip46_address_t * ip46, u16 port_host_order,
if (application_lookup (api_client_index))
{
- clib_warning ("Only one bind supported for now");
+ clib_warning ("Only one connection supported for now");
return VNET_API_ERROR_ADDRESS_IN_USE;
}
@@ -364,8 +364,7 @@ vnet_connect_uri (vnet_connect_args_t * a)
}
int
-vnet_disconnect_session (u32 client_index, u32 session_index,
- u32 thread_index)
+vnet_disconnect_session (u32 session_index, u32 thread_index)
{
stream_session_t *session;
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 8d87c067841..a5f2b9a6a1a 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -112,9 +112,7 @@ typedef enum
int vnet_bind_uri (vnet_bind_args_t *);
int vnet_unbind_uri (char *uri, u32 api_client_index);
int vnet_connect_uri (vnet_connect_args_t * a);
-int
-vnet_disconnect_session (u32 client_index, u32 session_index,
- u32 thread_index);
+int vnet_disconnect_session (u32 session_index, u32 thread_index);
int vnet_bind (vnet_bind_args_t * a);
int vnet_connect (vnet_connect_args_t * a);
diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c
index 399077decb0..7fd7e0b7499 100644
--- a/src/vnet/session/node.c
+++ b/src/vnet/session/node.c
@@ -78,10 +78,11 @@ static u32 session_type_to_next[] = {
};
always_inline int
-session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm, session_fifo_event_t * e0,
- stream_session_t * s0, u32 thread_index, int *n_tx_packets,
- u8 peek_data)
+session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_packets, u8 peek_data)
{
u32 n_trace = vlib_get_trace_count (vm, node);
u32 left_to_snd0, max_len_to_snd0, len_to_deq0, n_bufs, snd_space0;
@@ -120,7 +121,7 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
- rx_offset = transport_vft->rx_fifo_offset (tc0);
+ rx_offset = transport_vft->tx_fifo_offset (tc0);
}
/* TODO check if transport is willing to send len_to_snd0
@@ -194,25 +195,27 @@ session_fifo_rx_i (vlib_main_t * vm, vlib_node_runtime_t * node,
t0->server_thread_index = s0->thread_index;
}
+ len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
+
/* *INDENT-OFF* */
if (1)
{
ELOG_TYPE_DECLARE (e) = {
- .format = "evt-dequeue: id %d length %d",
- .format_args = "i4i4",
+ .format = "evt-deq: id %d len %d rd %d wnd %d",
+ .format_args = "i4i4i4i4",
};
struct
{
- u32 data[2];
+ u32 data[4];
} *ed;
ed = ELOG_DATA (&vm->elog_main, e);
ed->data[0] = e0->event_id;
ed->data[1] = e0->enqueue_length;
+ ed->data[2] = len_to_deq0;
+ ed->data[3] = left_to_snd0;
}
/* *INDENT-ON* */
- len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
-
/* Make room for headers */
data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
@@ -276,22 +279,25 @@ dequeue_fail:
}
int
-session_fifo_rx_peek (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm, session_fifo_event_t * e0,
- stream_session_t * s0, u32 thread_index, int *n_tx_pkts)
+session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_pkts)
{
- return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
- 1);
+ return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
+ n_tx_pkts, 1);
}
int
-session_fifo_rx_dequeue (vlib_main_t * vm, vlib_node_runtime_t * node,
- session_manager_main_t * smm,
- session_fifo_event_t * e0, stream_session_t * s0,
- u32 thread_index, int *n_tx_pkts)
+session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
+ session_manager_main_t * smm,
+ session_fifo_event_t * e0,
+ stream_session_t * s0, u32 thread_index,
+ int *n_tx_pkts)
{
- return session_fifo_rx_i (vm, node, smm, e0, s0, thread_index, n_tx_pkts,
- 0);
+ return session_tx_fifo_read_and_snd_i (vm, node, smm, e0, s0, thread_index,
+ n_tx_pkts, 0);
}
static uword
@@ -369,12 +375,16 @@ skip_dequeue:
s0 = stream_session_get_if_valid (server_session_index0,
my_thread_index);
- if (!s0)
+
+ if (CLIB_DEBUG && !s0)
{
- clib_warning ("It's dead Jim!");
+ clib_warning ("It's dead, Jim!");
continue;
}
+ if (PREDICT_FALSE (s0->session_state == SESSION_STATE_CLOSED))
+ continue;
+
ASSERT (s0->thread_index == my_thread_index);
switch (e0->event_type)
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index b5a168ca26c..8867e794eeb 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -373,7 +373,7 @@ stream_session_lookup_transport6 (ip6_address_t * lcl, ip6_address_t * rmt,
/* Finally, try half-open connections */
rv = clib_bihash_search_inline_48_8 (&smm->v6_half_open_hash, &kv6);
if (rv == 0)
- return tp_vfts[s->session_type].get_half_open (kv6.value & 0xFFFFFFFF);
+ return tp_vfts[proto].get_half_open (kv6.value & 0xFFFFFFFF);
return 0;
}
@@ -617,7 +617,10 @@ again:
goto again;
}
else
- return SESSION_ERROR_NO_SPACE;
+ {
+ clib_warning ("No space to allocate fifos!");
+ return SESSION_ERROR_NO_SPACE;
+ }
}
return 0;
}
@@ -806,6 +809,10 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block)
evt.event_id = serial_number++;
evt.enqueue_length = svm_fifo_max_dequeue (s->server_rx_fifo);
+ /* Built-in server? Hand event to the callback... */
+ if (app->cb_fns.builtin_server_rx_callback)
+ return app->cb_fns.builtin_server_rx_callback (s, &evt);
+
/* Add event to server's event queue */
q = app->event_queue;
@@ -1043,13 +1050,9 @@ stream_session_delete (stream_session_t * s)
session_manager_main_t *smm = vnet_get_session_manager_main ();
svm_fifo_segment_private_t *fifo_segment;
application_t *app;
- int rv;
- /* delete from the main lookup table */
- rv = stream_session_table_del (smm, s);
-
- if (rv)
- clib_warning ("hash delete error, rv %d", rv);
+ /* Delete from the main lookup table. */
+ stream_session_table_del (smm, s);
/* Cleanup fifo segments */
fifo_segment = svm_fifo_get_segment (s->server_segment_index);
@@ -1197,18 +1200,30 @@ stream_session_open (u8 sst, ip46_address_t * addr, u16 port_host_byte_order,
void
stream_session_disconnect (stream_session_t * s)
{
- tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
s->session_state = SESSION_STATE_CLOSED;
+ tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
}
/**
* Cleanup transport and session state.
+ *
+ * Notify transport of the cleanup, wait for a delete notify to actually
+ * remove the session state.
*/
void
stream_session_cleanup (stream_session_t * s)
{
+ session_manager_main_t *smm = &session_manager_main;
+ int rv;
+
+ s->session_state = SESSION_STATE_CLOSED;
+
+ /* Delete from the main lookup table to avoid more enqueues */
+ rv = stream_session_table_del (smm, s);
+ if (rv)
+ clib_warning ("hash delete error, rv %d", rv);
+
tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index);
- stream_session_delete (s);
}
void
@@ -1221,7 +1236,8 @@ session_register_transport (u8 type, const transport_proto_vft_t * vft)
/* If an offset function is provided, then peek instead of dequeue */
smm->session_rx_fns[type] =
- (vft->rx_fifo_offset) ? session_fifo_rx_peek : session_fifo_rx_dequeue;
+ (vft->tx_fifo_offset) ? session_tx_fifo_peek_and_snd :
+ session_tx_fifo_dequeue_and_snd;
}
transport_proto_vft_t *
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 46e5ce2cdd0..1b712e2e59e 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -102,33 +102,33 @@ typedef CLIB_PACKED (struct
typedef struct _stream_session_t
{
+ /** fifo pointers. Once allocated, these do not move */
+ svm_fifo_t *server_rx_fifo;
+ svm_fifo_t *server_tx_fifo;
+
/** Type */
u8 session_type;
/** State */
u8 session_state;
+ u8 thread_index;
+
+ /** used during unbind processing */
+ u8 is_deleted;
+
+ /** To avoid n**2 "one event per frame" check */
+ u8 enqueue_epoch;
+
/** Session index in per_thread pool */
u32 session_index;
/** Transport specific */
u32 connection_index;
- u8 thread_index;
-
/** Application specific */
u32 pid;
- /** fifo pointers. Once allocated, these do not move */
- svm_fifo_t *server_rx_fifo;
- svm_fifo_t *server_tx_fifo;
-
- /** To avoid n**2 "one event per frame" check */
- u8 enqueue_epoch;
-
- /** used during unbind processing */
- u8 is_deleted;
-
/** stream server pool index */
u32 app_index;
@@ -162,8 +162,8 @@ typedef int
session_fifo_event_t * e0, stream_session_t * s0,
u32 thread_index, int *n_tx_pkts);
-extern session_fifo_rx_fn session_fifo_rx_peek;
-extern session_fifo_rx_fn session_fifo_rx_dequeue;
+extern session_fifo_rx_fn session_tx_fifo_peek_and_snd;
+extern session_fifo_rx_fn session_tx_fifo_dequeue_and_snd;
struct _session_manager_main
{
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 8852fc6ee08..9c38428aa36 100644
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -130,6 +130,27 @@ send_session_disconnect_uri_callback (stream_session_t * s)
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
+static void
+send_session_reset_uri_callback (stream_session_t * s)
+{
+ vl_api_reset_session_t *mp;
+ unix_shared_memory_queue_t *q;
+ application_t *app = application_get (s->app_index);
+
+ q = vl_api_client_index_to_input_queue (app->api_client_index);
+
+ if (!q)
+ return;
+
+ mp = vl_msg_api_alloc (sizeof (*mp));
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SESSION);
+
+ mp->session_thread_index = s->thread_index;
+ mp->session_index = s->session_index;
+ vl_msg_api_send_shmem (q, (u8 *) & mp);
+}
+
static int
send_session_connected_uri_callback (u32 api_client_index,
stream_session_t * s, u8 is_fail)
@@ -347,6 +368,26 @@ send_session_disconnect_callback (stream_session_t * s)
vl_msg_api_send_shmem (q, (u8 *) & mp);
}
+static void
+send_session_reset_callback (stream_session_t * s)
+{
+ vl_api_reset_sock_t *mp;
+ unix_shared_memory_queue_t *q;
+ application_t *app = application_get (s->app_index);
+
+ q = vl_api_client_index_to_input_queue (app->api_client_index);
+
+ if (!q)
+ return;
+
+ mp = vl_msg_api_alloc (sizeof (*mp));
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SOCK);
+
+ mp->handle = make_session_handle (s);
+ vl_msg_api_send_shmem (q, (u8 *) & mp);
+}
+
/**
* Redirect a connect_uri message to the indicated server.
* Only sent if the server has bound the related port with
@@ -414,6 +455,7 @@ static session_cb_vft_t uri_session_cb_vft = {
.session_accept_callback = send_session_accept_uri_callback,
.session_disconnect_callback = send_session_disconnect_uri_callback,
.session_connected_callback = send_session_connected_uri_callback,
+ .session_reset_callback = send_session_reset_uri_callback,
.add_segment_callback = send_add_segment_callback,
.redirect_connect_callback = redirect_connect_uri_callback
};
@@ -422,6 +464,7 @@ static session_cb_vft_t session_cb_vft = {
.session_accept_callback = send_session_accept_callback,
.session_disconnect_callback = send_session_disconnect_callback,
.session_connected_callback = send_session_connected_callback,
+ .session_reset_callback = send_session_reset_callback,
.add_segment_callback = send_add_segment_callback,
.redirect_connect_callback = redirect_connect_callback
};
@@ -548,8 +591,8 @@ vl_api_disconnect_session_t_handler (vl_api_disconnect_session_t * mp)
rv = api_session_not_valid (mp->session_index, mp->session_thread_index);
if (!rv)
- rv = vnet_disconnect_session (mp->client_index, mp->session_index,
- mp->session_thread_index);
+ rv =
+ vnet_disconnect_session (mp->session_index, mp->session_thread_index);
REPLY_MACRO (VL_API_DISCONNECT_SESSION_REPLY);
}
@@ -572,8 +615,7 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
}
/* Disconnect has been confirmed. Confirm close to transport */
- vnet_disconnect_session (mp->client_index, mp->session_index,
- mp->session_thread_index);
+ vnet_disconnect_session (mp->session_index, mp->session_thread_index);
}
static void
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index f486dbb289f..0da30261bef 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -74,7 +74,7 @@ typedef struct _transport_proto_vft
u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b);
u16 (*send_mss) (transport_connection_t * tc);
u32 (*send_space) (transport_connection_t * tc);
- u32 (*rx_fifo_offset) (transport_connection_t * tc);
+ u32 (*tx_fifo_offset) (transport_connection_t * tc);
/*
* Connection retrieval
@@ -92,39 +92,39 @@ typedef struct _transport_proto_vft
} transport_proto_vft_t;
+/* *INDENT-OFF* */
/* 16 octets */
-typedef CLIB_PACKED (struct
- {
- union
- {
- struct
- {
- ip4_address_t src; ip4_address_t dst;
- u16 src_port;
- u16 dst_port;
- /* align by making this 4 octets even though its a 1-bit field
- * NOTE: avoid key overlap with other transports that use 5 tuples for
- * session identification.
- */
- u32 proto;
- };
- u64 as_u64[2];
- };
- }) v4_connection_key_t;
-
-typedef CLIB_PACKED (struct
- {
- union
- {
- struct
- {
- /* 48 octets */
- ip6_address_t src; ip6_address_t dst;
- u16 src_port;
- u16 dst_port; u32 proto; u8 unused_for_now[8];
- }; u64 as_u64[6];
- };
- }) v6_connection_key_t;
+typedef CLIB_PACKED (struct {
+ union
+ {
+ struct
+ {
+ ip4_address_t src; ip4_address_t dst;
+ u16 src_port;
+ u16 dst_port;
+ /* align by making this 4 octets even though its a 1-bit field
+ * NOTE: avoid key overlap with other transports that use 5 tuples for
+ * session identification.
+ */
+ u32 proto;
+ };
+ u64 as_u64[2];
+ };
+}) v4_connection_key_t;
+
+typedef CLIB_PACKED (struct {
+ union
+ {
+ struct
+ {
+ /* 48 octets */
+ ip6_address_t src; ip6_address_t dst;
+ u16 src_port;
+ u16 dst_port; u32 proto; u8 unused_for_now[8];
+ }; u64 as_u64[6];
+ };
+}) v6_connection_key_t;
+/* *INDENT-ON* */
typedef clib_bihash_kv_16_8_t session_kv4_t;
typedef clib_bihash_kv_48_8_t session_kv6_t;