aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2017-12-19 04:50:01 -0800
committerDave Barach <openvpp@barachs.net>2018-01-05 19:00:45 +0000
commit90a63988fa01685626b6d6a01b79ea5370f7fbac (patch)
tree69951111b8f8c43c5dbfc61cc5b133f74a58ddda /src
parente6bfeab1c352ae73a19361c038e2a06a58c035db (diff)
sock api: add infra for bootstrapping shm clients
- add function to sock client that bootstraps shm api - allow sock clients to request custom shm ring configs Change-Id: Iabc1dd4f0dc8bbf8ba24de37f4966339fcf86107 Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src')
-rw-r--r--src/tests/vnet/session/tcp_echo.c112
-rw-r--r--src/vat/api_format.c72
-rw-r--r--src/vat/main.c2
-rw-r--r--src/vat/vat.h2
-rw-r--r--src/vlibapi/vat_helper_macros.h64
-rw-r--r--src/vlibmemory/api_common.h38
-rw-r--r--src/vlibmemory/memclnt.api17
-rw-r--r--src/vlibmemory/memory_shared.c82
-rw-r--r--src/vlibmemory/memory_vlib.c4
-rw-r--r--src/vlibmemory/socket_client.c324
-rw-r--r--src/vlibmemory/socksvr_vlib.c200
-rw-r--r--src/vnet/lisp-cp/one_api.c14
-rw-r--r--src/vppinfra/file.h7
13 files changed, 679 insertions, 259 deletions
diff --git a/src/tests/vnet/session/tcp_echo.c b/src/tests/vnet/session/tcp_echo.c
index 8bdcac3a88c..30eb54dcdda 100644
--- a/src/tests/vnet/session/tcp_echo.c
+++ b/src/tests/vnet/session/tcp_echo.c
@@ -15,10 +15,12 @@
#include <stdio.h>
#include <signal.h>
+
+#include <vnet/session/application_interface.h>
#include <svm/svm_fifo_segment.h>
#include <vlibmemory/api.h>
+
#include <vpp/api/vpe_msg_enum.h>
-#include <vnet/session/application_interface.h>
#define vl_typedefs /* define message structures */
#include <vpp/api/vpe_all_api_h.h>
@@ -91,6 +93,8 @@ typedef struct
/* $$$ single thread only for the moment */
unix_shared_memory_queue_t *vpp_event_queue;
+ u8 *socket_name;
+
pid_t my_pid;
/* For deadman timers */
@@ -114,6 +118,11 @@ typedef struct
u8 test_return_packets;
u64 bytes_to_send;
+ /** Flag that decides if socket, instead of svm, api is used to connect to
+ * vpp. If sock api is used, shm binary api is subsequently bootstrapped
+ * and all other messages are exchanged using shm IPC. */
+ u8 use_sock_api;
+
/* convenience */
svm_fifo_segment_main_t *segment_main;
} uri_tcp_test_main_t;
@@ -313,15 +322,33 @@ connect_to_vpp (char *name)
uri_tcp_test_main_t *utm = &uri_tcp_test_main;
api_main_t *am = &api_main;
- if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
- return -1;
-
- utm->vl_input_queue = am->shmem_hdr->vl_input_queue;
- utm->my_client_index = am->my_client_index;
+ if (utm->use_sock_api)
+ {
+ if (vl_socket_client_connect ((char *) utm->socket_name, name,
+ 0 /* default rx, tx buffer */ ))
+ return -1;
+ return vl_socket_client_init_shm (0);
+ }
+ else
+ {
+ if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
+ return -1;
+ utm->vl_input_queue = am->shmem_hdr->vl_input_queue;
+ utm->my_client_index = am->my_client_index;
+ }
return 0;
}
+void
+disconnect_from_vpp (uri_tcp_test_main_t * utm)
+{
+ if (utm->use_sock_api)
+ vl_socket_client_disconnect ();
+ else
+ vl_client_disconnect_from_vlib ();
+}
+
static void
vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
{
@@ -733,7 +760,7 @@ client_disconnect (uri_tcp_test_main_t * utm)
}
static void
-client_test (uri_tcp_test_main_t * utm)
+client_run (uri_tcp_test_main_t * utm)
{
int i;
@@ -1085,8 +1112,20 @@ server_unbind (uri_tcp_test_main_t * utm)
}
void
-server_test (uri_tcp_test_main_t * utm)
+server_run (uri_tcp_test_main_t * utm)
{
+ session_t *session;
+ int i;
+
+ /* $$$$ hack preallocation */
+ for (i = 0; i < 200000; i++)
+ {
+ pool_get (utm->sessions, session);
+ memset (session, 0, sizeof (*session));
+ }
+ for (i = 0; i < 200000; i++)
+ pool_put_index (utm->sessions, i);
+
if (application_attach (utm))
return;
@@ -1124,17 +1163,17 @@ vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
session_print_stats (utm, session);
}
-#define foreach_uri_msg \
-_(BIND_URI_REPLY, bind_uri_reply) \
-_(UNBIND_URI_REPLY, unbind_uri_reply) \
-_(ACCEPT_SESSION, accept_session) \
-_(CONNECT_SESSION_REPLY, connect_session_reply) \
-_(DISCONNECT_SESSION, disconnect_session) \
-_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
-_(RESET_SESSION, reset_session) \
-_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
-_(APPLICATION_DETACH_REPLY, application_detach_reply) \
-_(MAP_ANOTHER_SEGMENT, map_another_segment) \
+#define foreach_uri_msg \
+_(BIND_URI_REPLY, bind_uri_reply) \
+_(UNBIND_URI_REPLY, unbind_uri_reply) \
+_(ACCEPT_SESSION, accept_session) \
+_(CONNECT_SESSION_REPLY, connect_session_reply) \
+_(DISCONNECT_SESSION, disconnect_session) \
+_(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
+_(RESET_SESSION, reset_session) \
+_(APPLICATION_ATTACH_REPLY, application_attach_reply) \
+_(APPLICATION_DETACH_REPLY, application_detach_reply) \
+_(MAP_ANOTHER_SEGMENT, map_another_segment) \
void
uri_api_hookup (uri_tcp_test_main_t * utm)
@@ -1162,8 +1201,6 @@ main (int argc, char **argv)
u64 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
- session_t *session;
- int i;
int i_am_master = 1, drop_packets = 0, test_return_packets = 0;
clib_mem_init (0, 256 << 20);
@@ -1180,6 +1217,8 @@ main (int argc, char **argv)
utm->my_pid = getpid ();
utm->configured_segment_size = 1 << 20;
+ utm->socket_name = 0;
+ utm->use_sock_api = 1;
clib_time_init (&utm->clib_time);
init_error_string_table (utm);
@@ -1214,6 +1253,10 @@ main (int argc, char **argv)
{
bytes_to_send = mbytes << 30;
}
+ else if (unformat (a, "socket-name %s", &utm->socket_name))
+ ;
+ else if (unformat (a, "use-svm-api"))
+ utm->use_sock_api = 0;
else
{
fformat (stderr, "%s: usage [master|slave]\n");
@@ -1221,6 +1264,9 @@ main (int argc, char **argv)
}
}
+ if (!utm->socket_name)
+ utm->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0);
+
if (uri)
{
utm->uri = format (0, "%s%c", uri, 0);
@@ -1242,7 +1288,8 @@ main (int argc, char **argv)
setup_signal_handlers ();
uri_api_hookup (utm);
- if (connect_to_vpp (i_am_master ? "uri_tcp_server" : "uri_tcp_client") < 0)
+ if (connect_to_vpp (i_am_master ? "tcp_echo_server" : "tcp_echo_client") <
+ 0)
{
svm_region_exit ();
fformat (stderr, "Couldn't connect to vpe, exiting...\n");
@@ -1250,24 +1297,11 @@ main (int argc, char **argv)
}
if (i_am_master == 0)
- {
- client_test (utm);
- vl_client_disconnect_from_vlib ();
- exit (0);
- }
-
- /* $$$$ hack preallocation */
- for (i = 0; i < 200000; i++)
- {
- pool_get (utm->sessions, session);
- memset (session, 0, sizeof (*session));
- }
- for (i = 0; i < 200000; i++)
- pool_put_index (utm->sessions, i);
-
- server_test (utm);
+ client_run (utm);
+ else
+ server_run (utm);
- vl_client_disconnect_from_vlib ();
+ disconnect_from_vpp (utm);
exit (0);
}
diff --git a/src/vat/api_format.c b/src/vat/api_format.c
index 445b4962b36..88c64473518 100644
--- a/src/vat/api_format.c
+++ b/src/vat/api_format.c
@@ -88,9 +88,9 @@ vl (void *p)
int
vat_socket_connect (vat_main_t * vam)
{
- return vl_socket_client_connect
- (&vam->socket_client_main, (char *) vam->socket_name,
- "vpp_api_test(s)", 0 /* default socket rx, tx buffer */ );
+ vam->socket_client_main = &socket_client_main;
+ return vl_socket_client_connect ((char *) vam->socket_name, "vpp_api_test",
+ 0 /* default socket rx, tx buffer */ );
}
#else /* vpp built-in case, we don't do sockets... */
int
@@ -99,10 +99,23 @@ vat_socket_connect (vat_main_t * vam)
return 0;
}
-void
-vl_socket_client_read_reply (socket_client_main_t * scm)
+int
+vl_socket_client_read (int wait)
+{
+ return -1;
+};
+
+int
+vl_socket_client_write ()
{
+ return -1;
};
+
+void *
+vl_socket_client_msg_alloc (int nbytes)
+{
+ return 0;
+}
#endif
@@ -1464,7 +1477,8 @@ static void vl_api_control_ping_reply_t_handler
vam->retval = retval;
vam->result_ready = 1;
}
- vam->socket_client_main.control_pings_outstanding--;
+ if (vam->socket_client_main)
+ vam->socket_client_main->control_pings_outstanding--;
}
static void vl_api_control_ping_reply_t_handler_json
@@ -2186,7 +2200,7 @@ static void vl_api_memfd_segment_create_reply_t_handler
#if VPP_API_TEST_BUILTIN == 0
vat_main_t *vam = &vat_main;
api_main_t *am = &api_main;
- socket_client_main_t *scm = &vam->socket_client_main;
+ socket_client_main_t *scm = vam->socket_client_main;
int my_fd = -1;
clib_error_t *error;
memfd_private_t memfd;
@@ -2224,8 +2238,7 @@ static void vl_api_memfd_segment_create_reply_t_handler
32 /* input_queue_length */ );
vam->vl_input_queue = am->shmem_hdr->vl_input_queue;
- vl_socket_client_enable_disable (&vam->socket_client_main,
- 0 /* disable socket */ );
+ vl_socket_client_enable_disable (0 /* disable socket */ );
}
out:
@@ -21866,6 +21879,46 @@ api_memfd_segment_create (vat_main_t * vam)
}
static int
+api_sock_init_shm (vat_main_t * vam)
+{
+#if VPP_API_TEST_BUILTIN == 0
+ unformat_input_t *i = vam->input;
+ vl_api_shm_elem_config_t *config = 0;
+ u64 size = 64 << 20;
+ int rv;
+
+ while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (i, "size %U", unformat_memory_size, &size))
+ ;
+ else
+ break;
+ }
+
+ /* Try customized config to see if it works */
+ vec_validate (config, 3);
+ config[0].type = VL_API_VLIB_RING;
+ config[0].count = 256;
+ config[0].size = 256;
+ config[1].type = VL_API_CLIENT_RING;
+ config[1].count = 256;
+ config[1].size = 1024;
+ config[2].type = VL_API_CLIENT_RING;
+ config[2].count = 8;
+ config[2].size = 4096;
+ config[3].type = VL_API_QUEUE;
+ config[3].count = 256;
+ config[3].size = sizeof (uword);
+ rv = vl_socket_client_init_shm (config);
+ if (!rv)
+ vam->client_index_invalid = 1;
+ return rv;
+#else
+ return -99;
+#endif
+}
+
+static int
api_dns_enable_disable (vat_main_t * vam)
{
unformat_input_t *line_input = vam->input;
@@ -23093,6 +23146,7 @@ _(sw_interface_set_lldp, "<intfc> | sw_if_index <nn> [port-desc <description>]\n
" [mgmt-ip4 <ip4>] [mgmt-ip6 <ip6>] [mgmt-oid <object id>] [disable]") \
_(tcp_configure_src_addresses, "<ip4|6>first-<ip4|6>last [vrf <id>]") \
_(memfd_segment_create,"size <nnn>") \
+_(sock_init_shm, "size <nnn>") \
_(app_namespace_add_del, "[add] id <ns-id> secret <nn> sw_if_index <nn>")\
_(dns_enable_disable, "[enable][disable]") \
_(dns_name_server_add_del, "<ip-address> [del]") \
diff --git a/src/vat/main.c b/src/vat/main.c
index aa990a31d93..723c5e8cc3a 100644
--- a/src/vat/main.c
+++ b/src/vat/main.c
@@ -389,7 +389,7 @@ main (int argc, char **argv)
if (vam->socket_name && vat_socket_connect (vam))
fformat (stderr, "WARNING: socket connection failed");
- if (vam->socket_client_main.socket_fd == 0
+ if ((!vam->socket_client_main || vam->socket_client_main->socket_fd == 0)
&& connect_to_vpe ("vpp_api_test") < 0)
{
svm_region_exit ();
diff --git a/src/vat/vat.h b/src/vat/vat.h
index 1ae46f30a7e..ddb4644848a 100644
--- a/src/vat/vat.h
+++ b/src/vat/vat.h
@@ -209,7 +209,7 @@ typedef struct
ip4_nbr_counter_t **ip4_nbr_counters;
ip6_nbr_counter_t **ip6_nbr_counters;
- socket_client_main_t socket_client_main;
+ socket_client_main_t *socket_client_main;
u8 *socket_name;
/* Convenience */
diff --git a/src/vlibapi/vat_helper_macros.h b/src/vlibapi/vat_helper_macros.h
index fd2e563512f..52fdcb1cb5a 100644
--- a/src/vlibapi/vat_helper_macros.h
+++ b/src/vlibapi/vat_helper_macros.h
@@ -22,13 +22,10 @@
/* M: construct, but don't yet send a message */
#define M(T, mp) \
do { \
- socket_client_main_t *scm = &vam->socket_client_main; \
+ socket_client_main_t *scm = vam->socket_client_main; \
vam->result_ready = 0; \
- if (scm->socket_enable) \
- { \
- mp = (void *)scm->socket_tx_buffer; \
- scm->socket_tx_nbytes = sizeof (*mp); \
- } \
+ if (scm && scm->socket_enable) \
+ mp = vl_socket_client_msg_alloc (sizeof(*mp)); \
else \
mp = vl_msg_api_alloc_as_if_client(sizeof(*mp)); \
memset (mp, 0, sizeof (*mp)); \
@@ -39,30 +36,25 @@ do { \
/* MPING: construct a control-ping message, don't send it yet */
#define MPING(T, mp) \
do { \
- socket_client_main_t *scm = &vam->socket_client_main; \
+ socket_client_main_t *scm = vam->socket_client_main; \
vam->result_ready = 0; \
- if (scm->socket_enable) \
- { \
- mp = (void *)scm->socket_tx_buffer; \
- scm->socket_tx_nbytes = sizeof (*mp); \
- } \
+ if (scm && scm->socket_enable) \
+ mp = vl_socket_client_msg_alloc (sizeof(*mp)); \
else \
mp = vl_msg_api_alloc_as_if_client(sizeof(*mp)); \
memset (mp, 0, sizeof (*mp)); \
mp->_vl_msg_id = ntohs (VL_API_##T+__plugin_msg_base); \
mp->client_index = vam->my_client_index; \
- scm->control_pings_outstanding++; \
+ if (scm) \
+ scm->control_pings_outstanding++; \
} while(0);
#define M2(T, mp, n) \
do { \
- socket_client_main_t *scm = &vam->socket_client_main; \
+ socket_client_main_t *scm = vam->socket_client_main; \
vam->result_ready = 0; \
- if (scm->socket_enable) \
- { \
- mp = (void *)scm->socket_tx_buffer; \
- scm->socket_tx_nbytes = sizeof (*mp) + n; \
- } \
+ if (scm && scm->socket_enable) \
+ mp = vl_socket_client_msg_alloc (sizeof(*mp)); \
else \
mp = vl_msg_api_alloc_as_if_client(sizeof(*mp) + n); \
memset (mp, 0, sizeof (*mp)); \
@@ -73,27 +65,9 @@ do { \
/* S: send a message */
#define S(mp) \
do { \
- int n; \
- socket_client_main_t *scm = &vam->socket_client_main; \
- if (scm->socket_enable) \
- { \
- msgbuf_t msgbuf = \
- { \
- .q = 0, \
- .gc_mark_timestamp = 0, \
- .data_len = htonl(scm->socket_tx_nbytes), \
- }; \
- \
- /* coverity[UNINIT] */ \
- n = write (scm->socket_fd, &msgbuf, sizeof (msgbuf)); \
- if (n < sizeof (msgbuf)) \
- clib_unix_warning ("socket write (msgbuf)"); \
- \
- n = write (scm->socket_fd, scm->socket_tx_buffer, \
- scm->socket_tx_nbytes); \
- if (n < scm->socket_tx_nbytes) \
- clib_unix_warning ("socket write (msg)"); \
- } \
+ socket_client_main_t *scm = vam->socket_client_main; \
+ if (scm && scm->socket_enable) \
+ vl_socket_client_write (); \
else \
vl_msg_api_send_shmem (vam->vl_input_queue, (u8 *)&mp); \
} while (0);
@@ -102,10 +76,11 @@ do { \
#define W(ret) \
do { \
f64 timeout = vat_time_now (vam) + 1.0; \
- socket_client_main_t *scm = &vam->socket_client_main; \
+ socket_client_main_t *scm = vam->socket_client_main; \
ret = -99; \
\
- vl_socket_client_read_reply (scm); \
+ if (scm && scm->socket_enable) \
+ vl_socket_client_read (5); \
while (vat_time_now (vam) < timeout) { \
if (vam->result_ready == 1) { \
ret = vam->retval; \
@@ -119,10 +94,11 @@ do { \
#define W2(ret, body) \
do { \
f64 timeout = vat_time_now (vam) + 1.0; \
- socket_client_main_t *scm = &vam->socket_client_main; \
+ socket_client_main_t *scm = vam->socket_client_main; \
ret = -99; \
\
- vl_socket_client_read_reply (scm); \
+ if (scm && scm->socket_enable) \
+ vl_socket_client_read (5); \
while (vat_time_now (vam) < timeout) { \
if (vam->result_ready == 1) { \
(body); \
diff --git a/src/vlibmemory/api_common.h b/src/vlibmemory/api_common.h
index 2080a4bb18e..bd0da10d569 100644
--- a/src/vlibmemory/api_common.h
+++ b/src/vlibmemory/api_common.h
@@ -43,6 +43,24 @@ typedef struct ring_alloc_
u32 misses;
} ring_alloc_t;
+typedef enum
+{
+ VL_API_VLIB_RING,
+ VL_API_CLIENT_RING,
+ VL_API_QUEUE
+} vl_api_shm_config_type_t;
+
+typedef struct vl_api_ring_config_
+{
+ u8 type;
+ u8 _pad;
+ u16 count;
+ u32 size;
+} vl_api_shm_elem_config_t;
+
+STATIC_ASSERT (sizeof (vl_api_shm_elem_config_t) == 8,
+ "Size must be exactly 8 bytes");
+
/*
* Initializers for the (shared-memory) rings
* _(size, n). Note: each msg has space for a header.
@@ -129,8 +147,8 @@ u16 vl_client_get_first_plugin_msg_id (const char *plugin_name);
void vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length);
u32 vl_api_memclnt_create_internal (char *, unix_shared_memory_queue_t *);
-void vl_init_shmem (svm_region_t * vlib_rp, int is_vlib,
- int is_private_region);
+void vl_init_shmem (svm_region_t * vlib_rp, vl_api_shm_elem_config_t * config,
+ int is_vlib, int is_private_region);
void vl_client_install_client_message_handlers (void);
void vl_api_send_pending_rpc_requests (vlib_main_t * vm);
@@ -197,6 +215,9 @@ typedef struct
u8 *socket_rx_buffer;
u32 socket_tx_nbytes;
int control_pings_outstanding;
+
+ u8 *name;
+ clib_time_t clib_time;
} socket_client_main_t;
extern socket_client_main_t socket_client_main;
@@ -226,11 +247,14 @@ vl_api_registration_t *sockclnt_get_registration (u32 index);
void vl_api_socket_process_msg (clib_file_t * uf, vl_api_registration_t * rp,
i8 * input_v);
-int
-vl_socket_client_connect (socket_client_main_t * scm, char *socket_path,
- char *client_name, u32 socket_buffer_size);
-void vl_socket_client_read_reply (socket_client_main_t * scm);
-void vl_socket_client_enable_disable (socket_client_main_t * scm, int enable);
+int vl_socket_client_connect (char *socket_path, char *client_name,
+ u32 socket_buffer_size);
+int vl_socket_client_init_shm (vl_api_shm_elem_config_t * config);
+void vl_socket_client_disconnect (void);
+int vl_socket_client_read (int wait);
+int vl_socket_client_write (void);
+void vl_socket_client_enable_disable (int enable);
+void *vl_socket_client_msg_alloc (int nbytes);
#endif /* included_vlibmemory_api_common_h */
diff --git a/src/vlibmemory/memclnt.api b/src/vlibmemory/memclnt.api
index 6d6a1fe06dc..20a73f2b3c2 100644
--- a/src/vlibmemory/memclnt.api
+++ b/src/vlibmemory/memclnt.api
@@ -184,6 +184,23 @@ define memfd_segment_create_reply
};
/*
+ * Initialize shm api over socket api
+ */
+define sock_init_shm {
+ u32 client_index;
+ u32 context;
+ u32 requested_size;
+ u8 nitems;
+ u64 configs[nitems];
+};
+
+define sock_init_shm_reply {
+ u32 client_index;
+ u32 context;
+ i32 retval;
+};
+
+/*
* Memory client ping / response
* Only sent on inactive connections
*/
diff --git a/src/vlibmemory/memory_shared.c b/src/vlibmemory/memory_shared.c
index 7af2433b44e..0a3c23cbb3e 100644
--- a/src/vlibmemory/memory_shared.c
+++ b/src/vlibmemory/memory_shared.c
@@ -364,21 +364,11 @@ vl_set_api_pvt_heap_size (u64 size)
am->api_pvt_heap_size = size;
}
-void
-vl_init_shmem (svm_region_t * vlib_rp, int is_vlib, int is_private_region)
+static void
+vl_api_default_mem_config (vl_shmem_hdr_t * shmem_hdr)
{
api_main_t *am = &api_main;
- vl_shmem_hdr_t *shmem_hdr = 0;
u32 vlib_input_queue_length;
- void *oldheap;
- ASSERT (vlib_rp);
-
- /* $$$$ need private region config parameters */
-
- oldheap = svm_push_data_heap (vlib_rp);
-
- vec_validate (shmem_hdr, 0);
- shmem_hdr->version = VL_SHM_VERSION;
/* vlib main input queue */
vlib_input_queue_length = 1024;
@@ -389,7 +379,6 @@ vl_init_shmem (svm_region_t * vlib_rp, int is_vlib, int is_private_region)
unix_shared_memory_queue_init (vlib_input_queue_length, sizeof (uword),
getpid (), am->vlib_signal);
- /* Set up the msg ring allocator */
#define _(sz,n) \
do { \
ring_alloc_t _rp; \
@@ -417,6 +406,70 @@ vl_init_shmem (svm_region_t * vlib_rp, int is_vlib, int is_private_region)
foreach_clnt_aring_size;
#undef _
+}
+
+void
+vl_api_mem_config (vl_shmem_hdr_t * hdr, vl_api_shm_elem_config_t * config)
+{
+ api_main_t *am = &api_main;
+ vl_api_shm_elem_config_t *c;
+ ring_alloc_t *rp;
+ u32 size;
+
+ if (!config)
+ {
+ vl_api_default_mem_config (hdr);
+ return;
+ }
+
+ vec_foreach (c, config)
+ {
+ switch (c->type)
+ {
+ case VL_API_QUEUE:
+ hdr->vl_input_queue = unix_shared_memory_queue_init (c->count,
+ c->size,
+ getpid (),
+ am->vlib_signal);
+ continue;
+ case VL_API_VLIB_RING:
+ vec_add2 (hdr->vl_rings, rp, 1);
+ break;
+ case VL_API_CLIENT_RING:
+ vec_add2 (hdr->client_rings, rp, 1);
+ break;
+ default:
+ clib_warning ("unknown config type: %d", c->type);
+ continue;
+ }
+
+ size = sizeof (ring_alloc_t) + c->size;
+ rp->rp = unix_shared_memory_queue_init (c->count, size, 0, 0);
+ rp->size = size;
+ rp->nitems = c->count;
+ rp->hits = 0;
+ rp->misses = 0;
+ }
+}
+
+void
+vl_init_shmem (svm_region_t * vlib_rp, vl_api_shm_elem_config_t * config,
+ int is_vlib, int is_private_region)
+{
+ api_main_t *am = &api_main;
+ vl_shmem_hdr_t *shmem_hdr = 0;
+ void *oldheap;
+ ASSERT (vlib_rp);
+
+ /* $$$$ need private region config parameters */
+
+ oldheap = svm_push_data_heap (vlib_rp);
+
+ vec_validate (shmem_hdr, 0);
+ shmem_hdr->version = VL_SHM_VERSION;
+
+ /* Set up the queue and msg ring allocator */
+ vl_api_mem_config (shmem_hdr, config);
if (is_private_region == 0)
{
@@ -581,7 +634,8 @@ vl_map_shmem (const char *region_name, int is_vlib)
}
/* Nope, it's our problem... */
- vl_init_shmem (vlib_rp, 1 /* is vlib */ , 0 /* is_private_region */ );
+ vl_init_shmem (vlib_rp, 0 /* default config */ , 1 /* is vlib */ ,
+ 0 /* is_private_region */ );
vec_add1 (am->mapped_shmem_regions, vlib_rp);
return 0;
diff --git a/src/vlibmemory/memory_vlib.c b/src/vlibmemory/memory_vlib.c
index 1c099ed0a3e..805438152ce 100644
--- a/src/vlibmemory/memory_vlib.c
+++ b/src/vlibmemory/memory_vlib.c
@@ -1426,7 +1426,7 @@ vl_api_client_command (vlib_main_t * vm,
if (!pool_elts (am->vl_clients))
goto socket_clients;
vlib_cli_output (vm, "Shared memory clients");
- vlib_cli_output (vm, "%16s %8s %14s %18s %s",
+ vlib_cli_output (vm, "%20s %8s %14s %18s %s",
"Name", "PID", "Queue Length", "Queue VA", "Health");
/* *INDENT-OFF* */
@@ -1443,7 +1443,7 @@ vl_api_client_command (vlib_main_t * vm,
q = regp->vl_input_queue;
- vlib_cli_output (vm, "%16s %8d %14d 0x%016llx %s\n",
+ vlib_cli_output (vm, "%20s %8d %14d 0x%016llx %s\n",
regp->name, q->consumer_pid, q->cursize,
q, health);
}
diff --git a/src/vlibmemory/socket_client.c b/src/vlibmemory/socket_client.c
index b60fd4f6229..d7a9ad5fe86 100644
--- a/src/vlibmemory/socket_client.c
+++ b/src/vlibmemory/socket_client.c
@@ -21,6 +21,8 @@
#include <stdlib.h>
#include <setjmp.h>
#include <sys/types.h>
+#define __USE_GNU
+#include <sys/socket.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <netinet/in.h>
@@ -43,6 +45,7 @@
#include <vlib/vlib.h>
#include <vlib/unix/unix.h>
+#include <svm/memfd.h>
#include <vlibmemory/api.h>
#include <vlibmemory/vl_memory_msg_enum.h>
@@ -65,22 +68,26 @@ socket_client_main_t socket_client_main;
/* Debug aid */
u32 vl (void *p) __attribute__ ((weak));
+
u32
vl (void *p)
{
return vec_len (p);
}
-void
-vl_socket_client_read_reply (socket_client_main_t * scm)
+int
+vl_socket_client_read (int wait)
{
+ socket_client_main_t *scm = &socket_client_main;
int n, current_rx_index;
- msgbuf_t *mbp;
+ msgbuf_t *mbp = 0;
+ f64 timeout;
- if (scm->socket_fd == 0 || scm->socket_enable == 0)
- return;
+ if (scm->socket_fd == 0)
+ return -1;
- mbp = 0;
+ if (wait)
+ timeout = clib_time_now (&scm->clib_time) + wait;
while (1)
{
@@ -96,7 +103,7 @@ vl_socket_client_read_reply (socket_client_main_t * scm)
if (n < 0)
{
clib_unix_warning ("socket_read");
- return;
+ return -1;
}
_vec_len (scm->socket_rx_buffer) += n;
}
@@ -127,20 +134,207 @@ vl_socket_client_read_reply (socket_client_main_t * scm)
&& scm->control_pings_outstanding == 0)
break;
}
+
+ if (wait && clib_time_now (&scm->clib_time) >= timeout)
+ return -1;
+ }
+ return 0;
+}
+
+int
+vl_socket_client_write (void)
+{
+ socket_client_main_t *scm = &socket_client_main;
+ int n;
+
+ msgbuf_t msgbuf = {
+ .q = 0,
+ .gc_mark_timestamp = 0,
+ .data_len = htonl (scm->socket_tx_nbytes),
+ };
+
+ n = write (scm->socket_fd, &msgbuf, sizeof (msgbuf));
+ if (n < sizeof (msgbuf))
+ {
+ clib_unix_warning ("socket write (msgbuf)");
+ return -1;
+ }
+
+ n = write (scm->socket_fd, scm->socket_tx_buffer, scm->socket_tx_nbytes);
+ if (n < scm->socket_tx_nbytes)
+ {
+ clib_unix_warning ("socket write (msg)");
+ return -1;
+ }
+
+ return n;
+}
+
+void *
+vl_socket_client_msg_alloc (int nbytes)
+{
+ socket_client_main.socket_tx_nbytes = nbytes;
+ return ((void *) socket_client_main.socket_tx_buffer);
+}
+
+void
+vl_socket_client_disconnect (void)
+{
+ socket_client_main_t *scm = &socket_client_main;
+ if (scm->socket_fd && (close (scm->socket_fd) < 0))
+ clib_unix_warning ("close");
+ scm->socket_fd = 0;
+}
+
+void
+vl_socket_client_enable_disable (int enable)
+{
+ socket_client_main_t *scm = &socket_client_main;
+ scm->socket_enable = enable;
+}
+
+static clib_error_t *
+receive_fd_msg (int socket_fd, int *my_fd)
+{
+ char msgbuf[16];
+ char ctl[CMSG_SPACE (sizeof (int)) + CMSG_SPACE (sizeof (struct ucred))];
+ struct msghdr mh = { 0 };
+ struct iovec iov[1];
+ ssize_t size;
+ struct ucred *cr = 0;
+ struct cmsghdr *cmsg;
+ pid_t pid __attribute__ ((unused));
+ uid_t uid __attribute__ ((unused));
+ gid_t gid __attribute__ ((unused));
+
+ iov[0].iov_base = msgbuf;
+ iov[0].iov_len = 5;
+ mh.msg_iov = iov;
+ mh.msg_iovlen = 1;
+ mh.msg_control = ctl;
+ mh.msg_controllen = sizeof (ctl);
+
+ memset (ctl, 0, sizeof (ctl));
+
+ /* receive the incoming message */
+ size = recvmsg (socket_fd, &mh, 0);
+ if (size != 5)
+ {
+ return (size == 0) ? clib_error_return (0, "disconnected") :
+ clib_error_return_unix (0, "recvmsg: malformed message (fd %d)",
+ socket_fd);
+ }
+
+ cmsg = CMSG_FIRSTHDR (&mh);
+ while (cmsg)
+ {
+ if (cmsg->cmsg_level == SOL_SOCKET)
+ {
+ if (cmsg->cmsg_type == SCM_CREDENTIALS)
+ {
+ cr = (struct ucred *) CMSG_DATA (cmsg);
+ uid = cr->uid;
+ gid = cr->gid;
+ pid = cr->pid;
+ }
+ else if (cmsg->cmsg_type == SCM_RIGHTS)
+ {
+ clib_memcpy (my_fd, CMSG_DATA (cmsg), sizeof (int));
+ }
+ }
+ cmsg = CMSG_NXTHDR (&mh, cmsg);
+ }
+ return 0;
+}
+
+static void vl_api_sock_init_shm_reply_t_handler
+ (vl_api_sock_init_shm_reply_t * mp)
+{
+ socket_client_main_t *scm = &socket_client_main;
+ int my_fd = -1;
+ clib_error_t *error;
+ i32 retval = ntohl (mp->retval);
+ memfd_private_t memfd;
+ api_main_t *am = &api_main;
+ u8 *new_name;
+
+ if (retval)
+ {
+ clib_warning ("failed to init shmem");
+ return;
+ }
+
+ /*
+ * Check the socket for the magic fd
+ */
+ error = receive_fd_msg (scm->socket_fd, &my_fd);
+ if (error)
+ {
+ retval = -99;
+ return;
}
+
+ memset (&memfd, 0, sizeof (memfd));
+ memfd.fd = my_fd;
+
+ /* Note: this closes memfd.fd */
+ retval = memfd_slave_init (&memfd);
+ if (retval)
+ clib_warning ("WARNING: segment map returned %d", retval);
+
+ /*
+ * Pivot to the memory client segment that vpp just created
+ */
+ am->vlib_rp = (void *) (memfd.requested_va + MMAP_PAGESIZE);
+ am->shmem_hdr = (void *) am->vlib_rp->user_ctx;
+
+ new_name = format (0, "%v[shm]%c", scm->name, 0);
+ vl_client_install_client_message_handlers ();
+ vl_client_connect_to_vlib_no_map ("pvt", (char *) new_name,
+ 32 /* input_queue_length */ );
+ vl_socket_client_enable_disable (0);
+ vec_free (new_name);
+}
+
+static void
+vl_api_sockclnt_create_reply_t_handler (vl_api_sockclnt_create_reply_t * mp)
+{
+ socket_client_main_t *scm = &socket_client_main;
+ if (!mp->response)
+ scm->socket_enable = 1;
+}
+
+#define foreach_sock_client_api_msg \
+_(SOCKCLNT_CREATE_REPLY, sockclnt_create_reply) \
+_(SOCK_INIT_SHM_REPLY, sock_init_shm_reply) \
+
+static void
+noop_handler (void *notused)
+{
+}
+
+void
+vl_sock_client_install_message_handlers (void)
+{
+
+#define _(N,n) \
+ vl_msg_api_set_handlers(VL_API_##N, #n, \
+ vl_api_##n##_t_handler, \
+ noop_handler, \
+ vl_api_##n##_t_endian, \
+ vl_api_##n##_t_print, \
+ sizeof(vl_api_##n##_t), 1);
+ foreach_sock_client_api_msg;
+#undef _
}
int
-vl_socket_client_connect (socket_client_main_t * scm, char *socket_path,
- char *client_name, u32 socket_buffer_size)
+vl_socket_client_connect (char *socket_path, char *client_name,
+ u32 socket_buffer_size)
{
- char buffer[256];
- char *rdptr;
- int n, total_bytes;
- vl_api_sockclnt_create_reply_t *rp;
+ socket_client_main_t *scm = &socket_client_main;
vl_api_sockclnt_create_t *mp;
- clib_socket_t *sock = &scm->client_socket;
- msgbuf_t *mbp;
+ clib_socket_t *sock;
clib_error_t *error;
/* Already connected? */
@@ -151,84 +345,74 @@ vl_socket_client_connect (socket_client_main_t * scm, char *socket_path,
if (socket_path == 0 || client_name == 0)
return (-3);
+ sock = &scm->client_socket;
sock->config = socket_path;
sock->flags = CLIB_SOCKET_F_IS_CLIENT | CLIB_SOCKET_F_SEQPACKET;
- error = clib_socket_init (sock);
-
- if (error)
+ if ((error = clib_socket_init (sock)))
{
clib_error_report (error);
return (-1);
}
- scm->socket_fd = sock->fd;
+ vl_sock_client_install_message_handlers ();
- mbp = (msgbuf_t *) buffer;
- mbp->q = 0;
- mbp->data_len = htonl (sizeof (*mp));
- mbp->gc_mark_timestamp = 0;
+ scm->socket_fd = sock->fd;
+ scm->socket_buffer_size = socket_buffer_size ? socket_buffer_size :
+ SOCKET_CLIENT_DEFAULT_BUFFER_SIZE;
+ vec_validate (scm->socket_tx_buffer, scm->socket_buffer_size - 1);
+ vec_validate (scm->socket_rx_buffer, scm->socket_buffer_size - 1);
+ _vec_len (scm->socket_rx_buffer) = 0;
+ _vec_len (scm->socket_tx_buffer) = 0;
+ scm->name = format (0, "%s", client_name);
- mp = (vl_api_sockclnt_create_t *) mbp->data;
+ mp = vl_socket_client_msg_alloc (sizeof (*mp));
mp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE);
strncpy ((char *) mp->name, client_name, sizeof (mp->name) - 1);
mp->name[sizeof (mp->name) - 1] = 0;
mp->context = 0xfeedface;
- n = write (scm->socket_fd, mbp, sizeof (*mbp) + sizeof (*mp));
- if (n < 0)
- {
- clib_unix_warning ("socket write (msg)");
- return (-1);
- }
+ if (vl_socket_client_write () <= 0)
+ return (-1);
- memset (buffer, 0, sizeof (buffer));
+ if (vl_socket_client_read (1))
+ return (-1);
- total_bytes = 0;
- rdptr = buffer;
- do
+ clib_time_init (&scm->clib_time);
+ return (0);
+}
+
+int
+vl_socket_client_init_shm (vl_api_shm_elem_config_t * config)
+{
+ vl_api_sock_init_shm_t *mp;
+ int rv, i;
+ u64 *cfg;
+
+ mp = vl_socket_client_msg_alloc (sizeof (*mp) +
+ vec_len (config) * sizeof (u64));
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_SOCK_INIT_SHM);
+ mp->client_index = ~0;
+ mp->requested_size = 64 << 20;
+
+ if (config)
{
- n = read (scm->socket_fd, rdptr, sizeof (buffer) - (rdptr - buffer));
- if (n < 0)
+ for (i = 0; i < vec_len (config); i++)
{
- clib_unix_warning ("socket read");
+ cfg = (u64 *) & config[i];
+ mp->configs[i] = *cfg;
}
- total_bytes += n;
- rdptr += n;
- }
- while (total_bytes < sizeof (vl_api_sockclnt_create_reply_t)
- + sizeof (msgbuf_t));
-
- rp = (vl_api_sockclnt_create_reply_t *) (buffer + sizeof (msgbuf_t));
- if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_CREATE_REPLY)
- {
- clib_warning ("connect reply got msg id %d\n", ntohs (rp->_vl_msg_id));
- return (-1);
+ mp->nitems = vec_len (config);
}
+ rv = vl_socket_client_write ();
+ if (rv <= 0)
+ return rv;
- /* allocate tx, rx buffers */
- scm->socket_buffer_size = socket_buffer_size ? socket_buffer_size :
- SOCKET_CLIENT_DEFAULT_BUFFER_SIZE;
- vec_validate (scm->socket_tx_buffer, scm->socket_buffer_size - 1);
- vec_validate (scm->socket_rx_buffer, scm->socket_buffer_size - 1);
- _vec_len (scm->socket_rx_buffer) = 0;
- scm->socket_enable = 1;
+ if (vl_socket_client_read (1))
+ return -1;
- return (0);
-}
-
-void
-vl_socket_client_disconnect (socket_client_main_t * scm)
-{
- if (scm->socket_fd && (close (scm->socket_fd) < 0))
- clib_unix_warning ("close");
- scm->socket_fd = 0;
-}
-
-void
-vl_socket_client_enable_disable (socket_client_main_t * scm, int enable)
-{
- scm->socket_enable = enable;
+ return 0;
}
/*
diff --git a/src/vlibmemory/socksvr_vlib.c b/src/vlibmemory/socksvr_vlib.c
index 1a263e7bf37..314e2eb60d3 100644
--- a/src/vlibmemory/socksvr_vlib.c
+++ b/src/vlibmemory/socksvr_vlib.c
@@ -62,13 +62,13 @@ dump_socket_clients (vlib_main_t * vm, api_main_t * am)
return;
vlib_cli_output (vm, "Socket clients");
- vlib_cli_output (vm, "%16s %8s", "Name", "Fildesc");
+ vlib_cli_output (vm, "%20s %8s", "Name", "Fildesc");
/* *INDENT-OFF* */
pool_foreach (reg, sm->registration_pool,
({
if (reg->registration_type == REGISTRATION_TYPE_SOCKET_SERVER) {
f = pool_elt_at_index (fm->file_pool, reg->clib_file_index);
- vlib_cli_output (vm, "%16s %8d",
+ vlib_cli_output (vm, "%20s %8d",
reg->name, f->file_descriptor);
}
}));
@@ -78,13 +78,15 @@ dump_socket_clients (vlib_main_t * vm, api_main_t * am)
void
vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
{
- u16 msg_id = ntohs (*(u16 *) elem);
- api_main_t *am = &api_main;
- msgbuf_t *mb = (msgbuf_t *) (elem - offsetof (msgbuf_t, data));
#if CLIB_DEBUG > 1
u32 output_length;
#endif
- clib_file_t *cf = rp->clib_file_index + file_main.file_pool;
+ socket_main_t *sm = &socket_main;
+ u16 msg_id = ntohs (*(u16 *) elem);
+ api_main_t *am = &api_main;
+ msgbuf_t *mb = (msgbuf_t *) (elem - offsetof (msgbuf_t, data));
+ clib_file_t *cf = clib_file_get (&file_main, rp->clib_file_index);
+ vl_api_registration_t *sock_rp;
ASSERT (rp->registration_type > REGISTRATION_TYPE_SHMEM);
@@ -95,16 +97,15 @@ vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
return;
}
+ sock_rp = pool_elt_at_index (sm->registration_pool,
+ rp->vl_api_registration_pool_index);
+ ASSERT (sock_rp);
+
/* Add the msgbuf_t to the output vector */
- vl_socket_add_pending_output_no_flush (cf,
- rp->vl_api_registration_pool_index +
- socket_main.registration_pool,
- (u8 *) mb, sizeof (*mb));
+ vl_socket_add_pending_output_no_flush (cf, sock_rp, (u8 *) mb,
+ sizeof (*mb));
/* Send the message */
- vl_socket_add_pending_output (cf,
- rp->vl_api_registration_pool_index
- + socket_main.registration_pool,
- elem, ntohl (mb->data_len));
+ vl_socket_add_pending_output (cf, sock_rp, elem, ntohl (mb->data_len));
#if CLIB_DEBUG > 1
output_length = sizeof (*mb) + ntohl (mb->data_len);
@@ -318,7 +319,6 @@ vl_socket_write_ready (clib_file_t * uf)
/* Flush output vector. */
n = write (uf->file_descriptor,
rp->output_vector, vec_len (rp->output_vector));
-
if (n < 0)
{
#if DEBUG > 2
@@ -402,7 +402,7 @@ vl_api_sockclnt_create_t_handler (vl_api_sockclnt_create_t * mp)
{
vl_api_registration_t *regp;
vl_api_sockclnt_create_reply_t *rp;
- int rv = 1;
+ int rv = 0;
regp = socket_main.current_rp;
@@ -480,54 +480,93 @@ send_fd_msg (int socket_fd, int fd_to_share)
return 0;
}
+vl_api_shm_elem_config_t *
+vl_api_make_shm_config (vl_api_sock_init_shm_t * mp)
+{
+ vl_api_shm_elem_config_t *config = 0, *c;
+ u64 cfg;
+ int i;
+
+ if (!mp->nitems)
+ {
+ vec_validate (config, 3);
+ config[0].type = VL_API_VLIB_RING;
+ config[0].count = 128;
+ config[0].size = 256;
+ config[1].type = VL_API_CLIENT_RING;
+ config[1].count = 128;
+ config[1].size = 1024;
+ config[2].type = VL_API_CLIENT_RING;
+ config[2].count = 8;
+ config[2].size = 4096;
+ config[3].type = VL_API_QUEUE;
+ config[3].count = 128;
+ config[3].size = sizeof (uword);
+ }
+ else
+ {
+ vec_validate (config, mp->nitems - 1);
+ for (i = 0; i < mp->nitems; i++)
+ {
+ cfg = mp->configs[i];
+ /* Pretty much a hack but it avoids defining our own api type
+ * in memclnt.api */
+ c = (vl_api_shm_elem_config_t *) & cfg;
+ config[i].type = c->type;
+ config[i].count = c->count;
+ config[i].size = c->size;
+ }
+ }
+ return config;
+}
+
/*
- * Create a memory-fd segment.
+ * Bootstrap shm api using the socket api
*/
void
-vl_api_memfd_segment_create_t_handler (vl_api_memfd_segment_create_t * mp)
+vl_api_sock_init_shm_t_handler (vl_api_sock_init_shm_t * mp)
{
- vl_api_memfd_segment_create_reply_t *rmp;
- api_main_t *am = &api_main;
- clib_file_t *cf;
+ vl_api_sock_init_shm_reply_t *rmp;
memfd_private_t _memfd_private, *memfd = &_memfd_private;
- vl_api_registration_t *regp;
- vlib_main_t *vm = vlib_get_main ();
svm_map_region_args_t _args, *a = &_args;
+ vl_api_registration_t *regp;
+ api_main_t *am = &api_main;
svm_region_t *vlib_rp;
+ clib_file_t *cf;
+ vl_api_shm_elem_config_t *config = 0;
int rv;
regp = vl_api_client_index_to_registration (mp->client_index);
-
if (regp == 0)
{
clib_warning ("API client disconnected");
return;
}
-
if (regp->registration_type != REGISTRATION_TYPE_SOCKET_SERVER)
{
rv = -31; /* VNET_API_ERROR_INVALID_REGISTRATION */
goto reply;
}
+ /*
+ * Set up a memfd segment of the requested size wherein the
+ * shmem data structures will be initialized
+ */
memset (memfd, 0, sizeof (*memfd));
-
- /* Embed in api_main_t */
memfd->memfd_size = mp->requested_size;
memfd->requested_va = 0ULL;
memfd->i_am_master = 1;
memfd->name = format (0, "%s%c", regp->name, 0);
- /* Set up a memfd segment of the requested size */
- rv = memfd_master_init (memfd, mp->client_index);
-
- if (rv)
+ if ((rv = memfd_master_init (memfd, mp->client_index)))
goto reply;
/* Remember to close this fd when the socket connection goes away */
vec_add1 (regp->additional_fds_to_close, memfd->fd);
- /* And create a plausible svm_region in it */
+ /*
+ * Create a plausible svm_region in the memfd backed segment
+ */
memset (a, 0, sizeof (*a));
a->baseva = memfd->sh->memfd_va + MMAP_PAGESIZE;
a->size = memfd->memfd_size - MMAP_PAGESIZE;
@@ -536,24 +575,77 @@ vl_api_memfd_segment_create_t_handler (vl_api_memfd_segment_create_t * mp)
a->flags = SVM_FLAGS_MHEAP;
svm_region_init_mapped_region (a, (svm_region_t *) a->baseva);
- vlib_rp = (svm_region_t *) a->baseva;
-
/*
* Part deux, initialize the svm_region_t shared-memory header
* api allocation rings, and so on.
*/
- vl_init_shmem (vlib_rp, 1 /* is_vlib (dont-care) */ , 1 /* is_private */ );
-
+ config = vl_api_make_shm_config (mp);
+ vlib_rp = (svm_region_t *) a->baseva;
+ vl_init_shmem (vlib_rp, config, 1 /* is_vlib (dont-care) */ ,
+ 1 /* is_private */ );
vec_add1 (am->vlib_private_rps, vlib_rp);
-
memfd->sh->ready = 1;
+ vec_free (config);
/* Recompute the set of input queues to poll in memclnt_process */
vec_reset_length (vl_api_queue_cursizes);
reply:
- /* send the reply message */
+ rmp = vl_msg_api_alloc (sizeof (*rmp));
+ rmp->_vl_msg_id = htons (VL_API_SOCK_INIT_SHM_REPLY);
+ rmp->context = mp->context;
+ rmp->retval = htonl (rv);
+
+ vl_msg_api_send (regp, (u8 *) rmp);
+
+ if (rv != 0)
+ return;
+
+ /*
+ * We need the reply message to make it out the back door
+ * before we send the magic fd message so force a flush
+ */
+ cf = clib_file_get (&file_main, regp->clib_file_index);
+ cf->write_function (cf);
+
+ /* Send the magic "here's your sign (aka fd)" socket message */
+ send_fd_msg (cf->file_descriptor, memfd->fd);
+}
+
+/*
+ * Create a memory-fd segment.
+ */
+void
+vl_api_memfd_segment_create_t_handler (vl_api_memfd_segment_create_t * mp)
+{
+ vl_api_memfd_segment_create_reply_t *rmp;
+ clib_file_t *cf;
+ memfd_private_t _memfd_private, *memfd = &_memfd_private;
+ vl_api_registration_t *regp;
+ int rv;
+
+ regp = vl_api_client_index_to_registration (mp->client_index);
+ if (regp == 0)
+ {
+ clib_warning ("API client disconnected");
+ return;
+ }
+
+ memset (memfd, 0, sizeof (*memfd));
+ memfd->memfd_size = mp->requested_size;
+ memfd->requested_va = 0ULL;
+ memfd->i_am_master = 1;
+ memfd->name = format (0, "%s%c", regp->name, 0);
+
+ /* Set up a memfd segment of the requested size */
+ if ((rv = memfd_master_init (memfd, mp->client_index)))
+ goto reply;
+
+ /* Remember to close this fd when the socket connection goes away */
+ vec_add1 (regp->additional_fds_to_close, memfd->fd);
+
+reply:
rmp = vl_msg_api_alloc (sizeof (*rmp));
rmp->_vl_msg_id = htons (VL_API_MEMFD_SEGMENT_CREATE_REPLY);
@@ -569,17 +661,17 @@ reply:
* We need the reply message to make it out the back door
* before we send the magic fd message.
*/
- vlib_process_suspend (vm, 11e-6);
-
cf = file_main.file_pool + regp->clib_file_index;
+ cf->write_function (cf);
/* send the magic "here's your sign (aka fd)" socket message */
send_fd_msg (cf->file_descriptor, memfd->fd);
}
-#define foreach_vlib_api_msg \
-_(SOCKCLNT_CREATE, sockclnt_create) \
-_(SOCKCLNT_DELETE, sockclnt_delete) \
+#define foreach_vlib_api_msg \
+_(SOCKCLNT_CREATE, sockclnt_create) \
+_(SOCKCLNT_DELETE, sockclnt_delete) \
+_(SOCK_INIT_SHM, sock_init_shm) \
_(MEMFD_SEGMENT_CREATE, memfd_segment_create)
clib_error_t *
@@ -588,8 +680,6 @@ socksvr_api_init (vlib_main_t * vm)
clib_file_main_t *fm = &file_main;
clib_file_t template = { 0 };
vl_api_registration_t *rp;
- vl_msg_api_msg_config_t cfg;
- vl_msg_api_msg_config_t *c = &cfg;
socket_main_t *sm = &socket_main;
clib_socket_t *sock = &sm->socksvr_listen_socket;
clib_error_t *error;
@@ -598,19 +688,13 @@ socksvr_api_init (vlib_main_t * vm)
if (sm->socket_name == 0)
return 0;
-#define _(N,n) do { \
- c->id = VL_API_##N; \
- c->name = #n; \
- c->handler = vl_api_##n##_t_handler; \
- c->cleanup = vl_noop_handler; \
- c->endian = vl_api_##n##_t_endian; \
- c->print = vl_api_##n##_t_print; \
- c->size = sizeof(vl_api_##n##_t); \
- c->traced = 1; /* trace, so these msgs print */ \
- c->replay = 0; /* don't replay client create/delete msgs */ \
- c->message_bounce = 0; /* don't bounce this message */ \
- vl_msg_api_config(c);} while (0);
-
+#define _(N,n) \
+ vl_msg_api_set_handlers(VL_API_##N, #n, \
+ vl_api_##n##_t_handler, \
+ vl_noop_handler, \
+ vl_api_##n##_t_endian, \
+ vl_api_##n##_t_print, \
+ sizeof(vl_api_##n##_t), 1);
foreach_vlib_api_msg;
#undef _
diff --git a/src/vnet/lisp-cp/one_api.c b/src/vnet/lisp-cp/one_api.c
index 0def13c6db9..0708bf033f9 100644
--- a/src/vnet/lisp-cp/one_api.c
+++ b/src/vnet/lisp-cp/one_api.c
@@ -1328,16 +1328,9 @@ vl_api_one_eid_table_vni_dump_t_handler (vl_api_one_eid_table_vni_dump_t * mp)
static void
vl_api_show_one_status_t_handler (vl_api_show_one_status_t * mp)
{
- unix_shared_memory_queue_t *q = NULL;
vl_api_show_one_status_reply_t *rmp = NULL;
int rv = 0;
- q = vl_api_client_index_to_input_queue (mp->client_index);
- if (q == 0)
- {
- return;
- }
-
/* *INDENT-OFF* */
REPLY_MACRO2(VL_API_SHOW_ONE_STATUS_REPLY,
({
@@ -1351,19 +1344,12 @@ static void
vl_api_one_get_map_request_itr_rlocs_t_handler
(vl_api_one_get_map_request_itr_rlocs_t * mp)
{
- unix_shared_memory_queue_t *q = NULL;
vl_api_one_get_map_request_itr_rlocs_reply_t *rmp = NULL;
lisp_cp_main_t *lcm = vnet_lisp_cp_get_main ();
locator_set_t *loc_set = 0;
u8 *tmp_str = 0;
int rv = 0;
- q = vl_api_client_index_to_input_queue (mp->client_index);
- if (q == 0)
- {
- return;
- }
-
if (~0 == lcm->mreq_itr_rlocs)
{
tmp_str = format (0, " ");
diff --git a/src/vppinfra/file.h b/src/vppinfra/file.h
index 6ebf5122567..4231a39b723 100644
--- a/src/vppinfra/file.h
+++ b/src/vppinfra/file.h
@@ -122,6 +122,13 @@ clib_file_set_data_available_to_write (clib_file_main_t * um,
return was_available != 0;
}
+always_inline clib_file_t *
+clib_file_get (clib_file_main_t * fm, u32 file_index)
+{
+ if (pool_is_free_index (fm->file_pool, file_index))
+ return 0;
+ return pool_elt_at_index (fm->file_pool, file_index);
+}
#endif /* included_clib_file_h */
id='n2676' href='#n2676'>2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005
/*
 * Copyright (c) 2021 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <sys/socket.h>

#include <vnet/session/application.h>
#include <vnet/session/transport.h>
#include <vnet/session/session.h>
#include <vlib/unix/plugin.h>
#include <vpp/app/version.h>

#include <vppinfra/lock.h>

#include <quic/quic.h>
#include <quic/certs.h>
#include <quic/error.h>

#include <quicly/constants.h>
#include <quicly/defaults.h>
#include <picotls.h>

#include <quic/quic_crypto.h>

extern quicly_crypto_engine_t quic_crypto_engine;

static char *quic_error_strings[] = {
#define quic_error(n,s) s,
#include <quic/quic_error.def>
#undef quic_error
};

#define DEFAULT_MAX_PACKETS_PER_KEY 16777216

quic_main_t quic_main;
static void quic_update_timer (quic_ctx_t * ctx);
static void quic_check_quic_session_connected (quic_ctx_t * ctx);
static int quic_reset_connection (u64 udp_session_handle,
				  quic_rx_packet_ctx_t * pctx);
static void quic_proto_on_close (u32 ctx_index, u32 thread_index);

static quicly_stream_open_t on_stream_open;
static quicly_closed_by_remote_t on_closed_by_remote;
static quicly_now_t quicly_vpp_now_cb;

/* Crypto contexts */

static inline void
quic_crypto_context_make_key_from_ctx (clib_bihash_kv_24_8_t * kv,
				       quic_ctx_t * ctx)
{
  application_t *app = application_get (ctx->parent_app_id);
  kv->key[0] = ((u64) ctx->ckpair_index) << 32 | (u64) ctx->crypto_engine;
  kv->key[1] = app->sm_properties.rx_fifo_size - 1;
  kv->key[2] = app->sm_properties.tx_fifo_size - 1;
}

static inline void
quic_crypto_context_make_key_from_crctx (clib_bihash_kv_24_8_t * kv,
					 crypto_context_t * crctx)
{
  quic_crypto_context_data_t *data =
    (quic_crypto_context_data_t *) crctx->data;
  kv->key[0] = ((u64) crctx->ckpair_index) << 32 | (u64) crctx->crypto_engine;
  kv->key[1] = data->quicly_ctx.transport_params.max_stream_data.bidi_local;
  kv->key[2] = data->quicly_ctx.transport_params.max_stream_data.bidi_remote;
}

static void
quic_crypto_context_free_if_needed (crypto_context_t * crctx, u8 thread_index)
{
  quic_main_t *qm = &quic_main;
  clib_bihash_kv_24_8_t kv;
  if (crctx->n_subscribers)
    return;
  quic_crypto_context_make_key_from_crctx (&kv, crctx);
  clib_bihash_add_del_24_8 (&qm->wrk_ctx[thread_index].crypto_context_hash,
			    &kv, 0 /* is_add */ );
  clib_mem_free (crctx->data);
  pool_put (qm->wrk_ctx[thread_index].crypto_ctx_pool, crctx);
}

static int
quic_app_cert_key_pair_delete_callback (app_cert_key_pair_t * ckpair)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  clib_bihash_kv_24_8_t kv;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  int num_threads = 1 /* main thread */  + vtm->n_threads;
  int i;

  for (i = 0; i < num_threads; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (crctx, qm->wrk_ctx[i].crypto_ctx_pool)  {
	if (crctx->ckpair_index == ckpair->cert_key_index)
	  {
	    quic_crypto_context_make_key_from_crctx (&kv, crctx);
	    clib_bihash_add_del_24_8 (&qm->wrk_ctx[i].crypto_context_hash, &kv, 0 /* is_add */ );
	  }
      }
      /* *INDENT-ON* */
    }
  return 0;
}

static crypto_context_t *
quic_crypto_context_alloc (u8 thread_index)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  u32 idx;

  pool_get (qm->wrk_ctx[thread_index].crypto_ctx_pool, crctx);
  clib_memset (crctx, 0, sizeof (*crctx));
  idx = (crctx - qm->wrk_ctx[thread_index].crypto_ctx_pool);
  crctx->ctx_index = ((u32) thread_index) << 24 | idx;

  return crctx;
}

static crypto_context_t *
quic_crypto_context_get (u32 cr_index, u32 thread_index)
{
  quic_main_t *qm = &quic_main;
  ASSERT (cr_index >> 24 == thread_index);
  return pool_elt_at_index (qm->wrk_ctx[thread_index].crypto_ctx_pool,
			    cr_index & 0x00ffffff);
}

static clib_error_t *
quic_list_crypto_context_command_fn (vlib_main_t * vm,
				     unformat_input_t * input,
				     vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  int i, num_threads = 1 /* main thread */  + vtm->n_threads;
  for (i = 0; i < num_threads; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (crctx, qm->wrk_ctx[i].crypto_ctx_pool)  {
	vlib_cli_output (vm, "[%d][Q]%U", i, format_crypto_context, crctx);
      }
      /* *INDENT-ON* */
    }
  return 0;
}

static clib_error_t *
quic_set_max_packets_per_key_fn (vlib_main_t * vm,
				 unformat_input_t * input,
				 vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  unformat_input_t _line_input, *line_input = &_line_input;
  u64 tmp;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "%U", unformat_memory_size, &tmp))
	{
	  qm->max_packets_per_key = tmp;
	}
      else
	return clib_error_return (0, "unknown input '%U'",
				  format_unformat_error, line_input);
    }

  return 0;
}

static clib_error_t *
quic_set_cc_fn (vlib_main_t *vm, unformat_input_t *input,
		vlib_cli_command_t *cmd)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  quic_main_t *qm = &quic_main;
  clib_error_t *e = 0;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "reno"))
	qm->default_quic_cc = QUIC_CC_RENO;
      else if (unformat (line_input, "cubic"))
	qm->default_quic_cc = QUIC_CC_CUBIC;
      else
	{
	  e = clib_error_return (0, "unknown input '%U'",
				 format_unformat_error, line_input);
	  goto done;
	}
    }
done:
  unformat_free (line_input);
  return e;
}

static void
quic_release_crypto_context (u32 crypto_context_index, u8 thread_index)
{
  crypto_context_t *crctx;
  crctx = quic_crypto_context_get (crypto_context_index, thread_index);
  crctx->n_subscribers--;
  quic_crypto_context_free_if_needed (crctx, thread_index);
}

static int
quic_init_crypto_context (crypto_context_t * crctx, quic_ctx_t * ctx)
{
  quic_main_t *qm = &quic_main;
  quicly_context_t *quicly_ctx;
  ptls_iovec_t key_vec;
  app_cert_key_pair_t *ckpair;
  application_t *app;
  quic_crypto_context_data_t *data;
  ptls_context_t *ptls_ctx;

  QUIC_DBG (2, "Init quic crctx %d thread %d", crctx->ctx_index,
	    ctx->c_thread_index);

  data = clib_mem_alloc (sizeof (*data));
  /* picotls depends on data being zeroed */
  clib_memset (data, 0, sizeof (*data));
  crctx->data = (void *) data;
  quicly_ctx = &data->quicly_ctx;
  ptls_ctx = &data->ptls_ctx;

  ptls_ctx->random_bytes = ptls_openssl_random_bytes;
  ptls_ctx->get_time = &ptls_get_time;
  ptls_ctx->key_exchanges = ptls_openssl_key_exchanges;
  ptls_ctx->cipher_suites = qm->quic_ciphers[ctx->crypto_engine];
  ptls_ctx->certificates.list = NULL;
  ptls_ctx->certificates.count = 0;
  ptls_ctx->esni = NULL;
  ptls_ctx->on_client_hello = NULL;
  ptls_ctx->emit_certificate = NULL;
  ptls_ctx->sign_certificate = NULL;
  ptls_ctx->verify_certificate = NULL;
  ptls_ctx->ticket_lifetime = 86400;
  ptls_ctx->max_early_data_size = 8192;
  ptls_ctx->hkdf_label_prefix__obsolete = NULL;
  ptls_ctx->require_dhe_on_psk = 1;
  ptls_ctx->encrypt_ticket = &qm->session_cache.super;
  clib_memcpy (quicly_ctx, &quicly_spec_context, sizeof (quicly_context_t));

  quicly_ctx->max_packets_per_key = qm->max_packets_per_key;
  quicly_ctx->tls = ptls_ctx;
  quicly_ctx->stream_open = &on_stream_open;
  quicly_ctx->closed_by_remote = &on_closed_by_remote;
  quicly_ctx->now = &quicly_vpp_now_cb;
  quicly_amend_ptls_context (quicly_ctx->tls);

  if (qm->vnet_crypto_enabled &&
      qm->default_crypto_engine == CRYPTO_ENGINE_VPP)
    quicly_ctx->crypto_engine = &quic_crypto_engine;
  else
    quicly_ctx->crypto_engine = &quicly_default_crypto_engine;

  quicly_ctx->transport_params.max_data = QUIC_INT_MAX;
  quicly_ctx->transport_params.max_streams_uni = (uint64_t) 1 << 60;
  quicly_ctx->transport_params.max_streams_bidi = (uint64_t) 1 << 60;
  quicly_ctx->transport_params.max_idle_timeout = qm->connection_timeout;

  if (qm->default_quic_cc == QUIC_CC_CUBIC)
    quicly_ctx->init_cc = &quicly_cc_cubic_init;
  else if (qm->default_quic_cc == QUIC_CC_RENO)
    quicly_ctx->init_cc = &quicly_cc_reno_init;

  app = application_get (ctx->parent_app_id);
  quicly_ctx->transport_params.max_stream_data.bidi_local =
    app->sm_properties.rx_fifo_size - 1;
  quicly_ctx->transport_params.max_stream_data.bidi_remote =
    app->sm_properties.tx_fifo_size - 1;
  quicly_ctx->transport_params.max_stream_data.uni = QUIC_INT_MAX;

  quicly_ctx->transport_params.max_udp_payload_size = QUIC_MAX_PACKET_SIZE;
  if (!app->quic_iv_set)
    {
      ptls_openssl_random_bytes (app->quic_iv, QUIC_IV_LEN - 1);
      app->quic_iv[QUIC_IV_LEN - 1] = 0;
      app->quic_iv_set = 1;
    }

  clib_memcpy (data->cid_key, app->quic_iv, QUIC_IV_LEN);
  key_vec = ptls_iovec_init (data->cid_key, QUIC_IV_LEN);
  quicly_ctx->cid_encryptor =
    quicly_new_default_cid_encryptor (&ptls_openssl_bfecb,
				      &ptls_openssl_aes128ecb,
				      &ptls_openssl_sha256, key_vec);

  ckpair = app_cert_key_pair_get_if_valid (crctx->ckpair_index);
  if (!ckpair || !ckpair->key || !ckpair->cert)
    {
      QUIC_DBG (1, "Wrong ckpair id %d\n", crctx->ckpair_index);
      return -1;
    }
  if (load_bio_private_key (quicly_ctx->tls, (char *) ckpair->key))
    {
      QUIC_DBG (1, "failed to read private key from app configuration\n");
      return -1;
    }
  if (load_bio_certificate_chain (quicly_ctx->tls, (char *) ckpair->cert))
    {
      QUIC_DBG (1, "failed to load certificate\n");
      return -1;
    }
  return 0;

}

static int
quic_acquire_crypto_context (quic_ctx_t * ctx)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  clib_bihash_kv_24_8_t kv;

  if (ctx->crypto_engine == CRYPTO_ENGINE_NONE)
    {
      QUIC_DBG (2, "No crypto engine specified, using %d",
		qm->default_crypto_engine);
      ctx->crypto_engine = qm->default_crypto_engine;
    }
  if (!clib_bitmap_get (qm->available_crypto_engines, ctx->crypto_engine))
    {
      QUIC_DBG (1, "Quic does not support crypto engine %d",
		ctx->crypto_engine);
      return SESSION_E_NOCRYPTOENG;
    }

  /* Check for exisiting crypto ctx */
  quic_crypto_context_make_key_from_ctx (&kv, ctx);
  if (clib_bihash_search_24_8
      (&qm->wrk_ctx[ctx->c_thread_index].crypto_context_hash, &kv, &kv) == 0)
    {
      crctx = quic_crypto_context_get (kv.value, ctx->c_thread_index);
      QUIC_DBG (2, "Found exisiting crypto context %d", kv.value);
      ctx->crypto_context_index = kv.value;
      crctx->n_subscribers++;
      return 0;
    }

  crctx = quic_crypto_context_alloc (ctx->c_thread_index);
  ctx->crypto_context_index = crctx->ctx_index;
  kv.value = crctx->ctx_index;
  crctx->crypto_engine = ctx->crypto_engine;
  crctx->ckpair_index = ctx->ckpair_index;
  if (quic_init_crypto_context (crctx, ctx))
    goto error;
  if (vnet_app_add_cert_key_interest (ctx->ckpair_index, qm->app_index))
    goto error;
  crctx->n_subscribers++;
  clib_bihash_add_del_24_8 (&qm->
			    wrk_ctx[ctx->c_thread_index].crypto_context_hash,
			    &kv, 1 /* is_add */ );
  return 0;

error:
  quic_crypto_context_free_if_needed (crctx, ctx->c_thread_index);
  return SESSION_E_NOCRYPTOCKP;
}

/*  Helper functions */

static u32
quic_ctx_alloc (u32 thread_index)
{
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx;

  pool_get (qm->ctx_pool[thread_index], ctx);

  clib_memset (ctx, 0, sizeof (quic_ctx_t));
  ctx->c_thread_index = thread_index;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  QUIC_DBG (3, "Allocated quic_ctx %u on thread %u",
	    ctx - qm->ctx_pool[thread_index], thread_index);
  return ctx - qm->ctx_pool[thread_index];
}

static void
quic_ctx_free (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "Free ctx %u %x", ctx->c_thread_index, ctx->c_c_index);
  u32 thread_index = ctx->c_thread_index;
  QUIC_ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
  if (CLIB_DEBUG)
    clib_memset (ctx, 0xfb, sizeof (*ctx));
  pool_put (quic_main.ctx_pool[thread_index], ctx);
}

static quic_ctx_t *
quic_ctx_get (u32 ctx_index, u32 thread_index)
{
  return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
}

static quic_ctx_t *
quic_ctx_get_if_valid (u32 ctx_index, u32 thread_index)
{
  if (pool_is_free_index (quic_main.ctx_pool[thread_index], ctx_index))
    return 0;
  return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
}

quic_ctx_t *
quic_get_conn_ctx (quicly_conn_t * conn)
{
  u64 conn_data;
  conn_data = (u64) * quicly_get_data (conn);
  return quic_ctx_get (conn_data & UINT32_MAX, conn_data >> 32);
}

static void
quic_store_conn_ctx (quicly_conn_t * conn, quic_ctx_t * ctx)
{
  *quicly_get_data (conn) =
    (void *) (((u64) ctx->c_thread_index) << 32 | (u64) ctx->c_c_index);
}

static inline int
quic_ctx_is_stream (quic_ctx_t * ctx)
{
  return (ctx->flags & QUIC_F_IS_STREAM);
}

static inline int
quic_ctx_is_listener (quic_ctx_t * ctx)
{
  return (ctx->flags & QUIC_F_IS_LISTENER);
}

static inline int
quic_ctx_is_conn (quic_ctx_t * ctx)
{
  return !(quic_ctx_is_listener (ctx) || quic_ctx_is_stream (ctx));
}

static inline session_t *
get_stream_session_and_ctx_from_stream (quicly_stream_t * stream,
					quic_ctx_t ** ctx)
{
  quic_stream_data_t *stream_data;

  stream_data = (quic_stream_data_t *) stream->data;
  *ctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
  return session_get ((*ctx)->c_s_index, stream_data->thread_index);
}

static inline void
quic_make_connection_key (clib_bihash_kv_16_8_t * kv,
			  const quicly_cid_plaintext_t * id)
{
  kv->key[0] = ((u64) id->master_id) << 32 | (u64) id->thread_id;
  kv->key[1] = id->node_id;
}

static int
quic_sendable_packet_count (session_t * udp_session)
{
  u32 max_enqueue;
  u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN;
  max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo);
  return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE);
}

static quicly_context_t *
quic_get_quicly_ctx_from_ctx (quic_ctx_t * ctx)
{
  crypto_context_t *crctx =
    quic_crypto_context_get (ctx->crypto_context_index, ctx->c_thread_index);
  quic_crypto_context_data_t *data =
    (quic_crypto_context_data_t *) crctx->data;
  return &data->quicly_ctx;
}

static quicly_context_t *
quic_get_quicly_ctx_from_udp (u64 udp_session_handle)
{
  session_t *udp_session = session_get_from_handle (udp_session_handle);
  quic_ctx_t *ctx =
    quic_ctx_get (udp_session->opaque, udp_session->thread_index);
  return quic_get_quicly_ctx_from_ctx (ctx);
}

static inline void
quic_set_udp_tx_evt (session_t * udp_session)
{
  int rv = 0;
  if (svm_fifo_set_event (udp_session->tx_fifo))
    rv = session_send_io_evt_to_thread (udp_session->tx_fifo,
					SESSION_IO_EVT_TX);
  if (PREDICT_FALSE (rv))
    clib_warning ("Event enqueue errored %d", rv);
}

static inline void
quic_stop_ctx_timer (quic_ctx_t * ctx)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID)
    return;
  tw = &quic_main.wrk_ctx[ctx->c_thread_index].timer_wheel;
  tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle);
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  QUIC_DBG (4, "Stopping timer for ctx %u", ctx->c_c_index);
}

/* QUIC protocol actions */

static void
quic_ack_rx_data (session_t * stream_session)
{
  u32 max_deq;
  quic_ctx_t *sctx;
  svm_fifo_t *f;
  quicly_stream_t *stream;
  quic_stream_data_t *stream_data;

  sctx = quic_ctx_get (stream_session->connection_index,
		       stream_session->thread_index);
  QUIC_ASSERT (quic_ctx_is_stream (sctx));
  stream = sctx->stream;
  stream_data = (quic_stream_data_t *) stream->data;

  f = stream_session->rx_fifo;
  max_deq = svm_fifo_max_dequeue (f);

  QUIC_ASSERT (stream_data->app_rx_data_len >= max_deq);
  quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
  QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
  stream_data->app_rx_data_len = max_deq;
}

static void
quic_disconnect_transport (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "Disconnecting transport 0x%lx", ctx->udp_session_handle);
  vnet_disconnect_args_t a = {
    .handle = ctx->udp_session_handle,
    .app_index = quic_main.app_index,
  };

  if (vnet_disconnect_session (&a))
    clib_warning ("UDP session 0x%lx disconnect errored",
		  ctx->udp_session_handle);
}

static void
quic_connection_delete (quic_ctx_t * ctx)
{
  clib_bihash_kv_16_8_t kv;
  quicly_conn_t *conn;

  QUIC_DBG (2, "Deleting connection %u", ctx->c_c_index);

  QUIC_ASSERT (!quic_ctx_is_stream (ctx));
  quic_stop_ctx_timer (ctx);

  /*  Delete the connection from the connection map */
  conn = ctx->conn;
  ctx->conn = NULL;
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  QUIC_DBG (2, "Deleting conn with id %lu %lu from map", kv.key[0],
	    kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 0 /* is_add */ );

  quic_disconnect_transport (ctx);

  if (ctx->conn)
    quicly_free (ctx->conn);
  session_transport_delete_notify (&ctx->connection);
}

void
quic_increment_counter (u8 evt, u8 val)
{
  vlib_main_t *vm = vlib_get_main ();
  vlib_node_increment_counter (vm, quic_input_node.index, evt, val);
}

/**
 * Called when quicly return an error
 * This function interacts tightly with quic_proto_on_close
 */
static void
quic_connection_closed (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "QUIC connection %u/%u closed", ctx->c_thread_index,
	    ctx->c_c_index);

  /* TODO if connection is not established, just delete the session? */
  /* Actually should send connect or accept error */

  switch (ctx->conn_state)
    {
    case QUIC_CONN_STATE_READY:
      /* Error on an opened connection (timeout...)
         This puts the session in closing state, we should receive a notification
         when the app has closed its session */
      session_transport_reset_notify (&ctx->connection);
      /* This ensures we delete the connection when the app confirms the close */
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED;
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING:
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED;
      /* quic_proto_on_close will eventually be called when the app confirms the close
         , we delete the connection at that point */
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING_APP_CLOSED:
      /* App already confirmed close, we can delete the connection */
      quic_connection_delete (ctx);
      break;
    case QUIC_CONN_STATE_OPENED:
    case QUIC_CONN_STATE_HANDSHAKE:
    case QUIC_CONN_STATE_ACTIVE_CLOSING:
      quic_connection_delete (ctx);
      break;
    default:
      QUIC_DBG (0, "BUG %d", ctx->conn_state);
      break;
    }
}

static int
quic_send_datagram (session_t *udp_session, struct iovec *packet,
		    quicly_address_t *dest, quicly_address_t *src)
{
  u32 max_enqueue, len;
  session_dgram_hdr_t hdr;
  svm_fifo_t *f;
  transport_connection_t *tc;
  int ret;

  len = packet->iov_len;
  f = udp_session->tx_fifo;
  tc = session_get_transport (udp_session);
  max_enqueue = svm_fifo_max_enqueue (f);
  if (max_enqueue < SESSION_CONN_HDR_LEN + len)
    {
      QUIC_ERR ("Too much data to send, max_enqueue %u, len %u",
		max_enqueue, len + SESSION_CONN_HDR_LEN);
      return QUIC_ERROR_FULL_FIFO;
    }

  /*  Build packet header for fifo */
  hdr.data_length = len;
  hdr.data_offset = 0;
  hdr.is_ip4 = tc->is_ip4;
  clib_memcpy (&hdr.lcl_ip, &tc->lcl_ip, sizeof (ip46_address_t));
  hdr.lcl_port = tc->lcl_port;

  /*  Read dest address from quicly-provided sockaddr */
  if (hdr.is_ip4)
    {
      QUIC_ASSERT (dest->sa.sa_family == AF_INET);
      struct sockaddr_in *sa4 = (struct sockaddr_in *) &dest->sa;
      hdr.rmt_port = sa4->sin_port;
      hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr;
    }
  else
    {
      QUIC_ASSERT (dest->sa.sa_family == AF_INET6);
      struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &dest->sa;
      hdr.rmt_port = sa6->sin6_port;
      clib_memcpy_fast (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16);
    }

  svm_fifo_seg_t segs[2] = { { (u8 *) &hdr, sizeof (hdr) },
			     { packet->iov_base, len } };

  ret = svm_fifo_enqueue_segments (f, segs, 2, 0 /* allow partial */);
  if (PREDICT_FALSE (ret < 0))
    {
      QUIC_ERR ("Not enough space to enqueue dgram");
      return QUIC_ERROR_FULL_FIFO;
    }

  quic_increment_counter (QUIC_ERROR_TX_PACKETS, 1);

  return 0;
}

static int
quic_send_packets (quic_ctx_t * ctx)
{
  struct iovec packets[QUIC_SEND_PACKET_VEC_SIZE];
  uint8_t
    buf[QUIC_SEND_PACKET_VEC_SIZE * quic_get_quicly_ctx_from_ctx (ctx)
				      ->transport_params.max_udp_payload_size];
  session_t *udp_session;
  quicly_conn_t *conn;
  size_t num_packets, i, max_packets;
  quicly_address_t dest, src;
  u32 n_sent = 0;
  int err = 0;

  /* We have sctx, get qctx */
  if (quic_ctx_is_stream (ctx))
    ctx = quic_ctx_get (ctx->quic_connection_ctx_id, ctx->c_thread_index);

  QUIC_ASSERT (!quic_ctx_is_stream (ctx));

  udp_session = session_get_from_handle_if_valid (ctx->udp_session_handle);
  if (!udp_session)
    goto quicly_error;

  conn = ctx->conn;
  if (!conn)
    return 0;

  do
    {
      /* TODO : quicly can assert it can send min_packets up to 2 */
      max_packets = quic_sendable_packet_count (udp_session);
      if (max_packets < 2)
	break;

      num_packets = max_packets;
      if ((err = quicly_send (conn, &dest, &src, packets, &num_packets, buf,
			      sizeof (buf))))
	goto quicly_error;

      for (i = 0; i != num_packets; ++i)
	{

	  if ((err =
		 quic_send_datagram (udp_session, &packets[i], &dest, &src)))
	    goto quicly_error;

	}
      n_sent += num_packets;
    }
  while (num_packets > 0 && num_packets == max_packets);

  quic_set_udp_tx_evt (udp_session);

  QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo),
	    svm_fifo_max_dequeue (udp_session->rx_fifo));
  quic_update_timer (ctx);
  return n_sent;

quicly_error:
  if (err && err != QUICLY_ERROR_PACKET_IGNORED
      && err != QUICLY_ERROR_FREE_CONNECTION)
    clib_warning ("Quic error '%U'.", quic_format_err, err);
  quic_connection_closed (ctx);
  return 0;
}

/* Quicly callbacks */

static void
quic_on_stream_destroy (quicly_stream_t * stream, int err)
{
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  QUIC_DBG (2, "DESTROYED_STREAM: session 0x%lx (%U)",
	    session_handle (stream_session), quic_format_err, err);

  stream_session->session_state = SESSION_STATE_CLOSED;
  session_transport_delete_notify (&sctx->connection);

  quic_increment_counter (QUIC_ERROR_CLOSED_STREAM, 1);
  quic_ctx_free (sctx);
  clib_mem_free (stream->data);
}

static void
quic_on_stop_sending (quicly_stream_t * stream, int err)
{
#if QUIC_DEBUG >= 2
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  clib_warning ("(NOT IMPLEMENTD) STOP_SENDING: session 0x%lx (%U)",
		session_handle (stream_session), quic_format_err, err);
#endif
  /* TODO : handle STOP_SENDING */
}

static void
quic_on_receive_reset (quicly_stream_t * stream, int err)
{
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
#if QUIC_DEBUG >= 2
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  clib_warning ("RESET_STREAM: session 0x%lx (%U)",
		session_handle (stream_session), quic_format_err, err);
#endif
  session_transport_closing_notify (&sctx->connection);
}

static void
quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
		 size_t len)
{
  QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
  u32 max_enq, rlen, rv;
  quic_ctx_t *sctx;
  session_t *stream_session;
  app_worker_t *app_wrk;
  svm_fifo_t *f;
  quic_stream_data_t *stream_data;

  if (!len)
    return;

  stream_data = (quic_stream_data_t *) stream->data;
  sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
  stream_session = session_get (sctx->c_s_index, stream_data->thread_index);
  f = stream_session->rx_fifo;

  max_enq = svm_fifo_max_enqueue_prod (f);
  QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
  /* Handle duplicate packet/chunk from quicly */
  if (off < stream_data->app_rx_data_len)
    {
      QUIC_DBG (3, "Session [idx %u, app_wrk %u, thread %u, rx-fifo 0x%llx]: "
		"DUPLICATE PACKET (max_enq %u, len %u, "
		"app_rx_data_len %u, off %u, ToBeNQ %u)",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f,
		max_enq, len, stream_data->app_rx_data_len, off,
		off - stream_data->app_rx_data_len + len);
      return;
    }
  if (PREDICT_FALSE ((off - stream_data->app_rx_data_len + len) > max_enq))
    {
      QUIC_ERR ("Session [idx %u, app_wrk %u, thread %u, rx-fifo 0x%llx]: "
		"RX FIFO IS FULL (max_enq %u, len %u, "
		"app_rx_data_len %u, off %u, ToBeNQ %u)",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f,
		max_enq, len, stream_data->app_rx_data_len, off,
		off - stream_data->app_rx_data_len + len);
      return;			/* This shouldn't happen */
    }
  if (off == stream_data->app_rx_data_len)
    {
      /* Streams live on the same thread so (f, stream_data) should stay consistent */
      rlen = svm_fifo_enqueue (f, len, (u8 *) src);
      QUIC_DBG (3, "Session [idx %u, app_wrk %u, ti %u, rx-fifo 0x%llx]: "
		"Enqueuing %u (rlen %u) at off %u in %u space, ",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f, len, rlen, off, max_enq);
      stream_data->app_rx_data_len += rlen;
      QUIC_ASSERT (rlen >= len);
      app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
      if (PREDICT_TRUE (app_wrk != 0))
	{
	  rv = app_worker_lock_and_send_event (app_wrk, stream_session,
					       SESSION_IO_EVT_RX);
	  if (rv)
	    QUIC_ERR ("Failed to ping app for RX");
	}
      quic_ack_rx_data (stream_session);
    }
  else
    {
      rlen = svm_fifo_enqueue_with_offset (f,
					   off - stream_data->app_rx_data_len,
					   len, (u8 *) src);
      QUIC_ASSERT (rlen == 0);
    }
  return;
}

void
quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
{
  quic_stream_data_t *stream_data;
  session_t *stream_session;
  quic_ctx_t *ctx;
  svm_fifo_t *f;
  u32 rv;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_session = get_stream_session_and_ctx_from_stream (stream, &ctx);
  f = stream_session->tx_fifo;

  QUIC_ASSERT (stream_data->app_tx_data_len >= delta);
  stream_data->app_tx_data_len -= delta;
  ctx->bytes_written += delta;
  rv = svm_fifo_dequeue_drop (f, delta);
  QUIC_ASSERT (rv == delta);

  rv = quicly_stream_sync_sendbuf (stream, 0);
  QUIC_ASSERT (!rv);
}

void
quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
		       size_t * len, int *wrote_all)
{
  quic_stream_data_t *stream_data;
  quic_ctx_t *ctx;
  session_t *stream_session;
  svm_fifo_t *f;
  u32 deq_max;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_session = get_stream_session_and_ctx_from_stream (stream, &ctx);
  f = stream_session->tx_fifo;

  QUIC_DBG (3, "Emitting %u, offset %u", *len, off);

  deq_max = svm_fifo_max_dequeue_cons (f);
  QUIC_ASSERT (off <= deq_max);
  if (off + *len < deq_max)
    {
      *wrote_all = 0;
    }
  else
    {
      *wrote_all = 1;
      *len = deq_max - off;
    }
  QUIC_ASSERT (*len > 0);

  if (off + *len > stream_data->app_tx_data_len)
    stream_data->app_tx_data_len = off + *len;

  svm_fifo_peek (f, off, *len, dst);
}

static const quicly_stream_callbacks_t quic_stream_callbacks = {
  .on_destroy = quic_on_stream_destroy,
  .on_send_shift = quic_fifo_egress_shift,
  .on_send_emit = quic_fifo_egress_emit,
  .on_send_stop = quic_on_stop_sending,
  .on_receive = quic_on_receive,
  .on_receive_reset = quic_on_receive_reset
};

static int
quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
{
  /* Return code for this function ends either
   * - in quicly_receive : if not QUICLY_ERROR_PACKET_IGNORED, will close connection
   * - in quicly_open_stream, returned directly
   */

  session_t *stream_session, *quic_session;
  quic_stream_data_t *stream_data;
  app_worker_t *app_wrk;
  quic_ctx_t *qctx, *sctx;
  u32 sctx_id;
  int rv;

  QUIC_DBG (2, "on_stream_open called");
  stream->data = clib_mem_alloc (sizeof (quic_stream_data_t));
  stream->callbacks = &quic_stream_callbacks;
  /* Notify accept on parent qsession, but only if this is not a locally
   * initiated stream */
  if (quicly_stream_is_self_initiated (stream))
    return 0;

  sctx_id = quic_ctx_alloc (vlib_get_thread_index ());
  qctx = quic_get_conn_ctx (stream->conn);

  /* Might need to signal that the connection is ready if the first thing the
   * server does is open a stream */
  quic_check_quic_session_connected (qctx);
  /* ctx might be invalidated */
  qctx = quic_get_conn_ctx (stream->conn);

  stream_session = session_alloc (qctx->c_thread_index);
  QUIC_DBG (2, "ACCEPTED stream_session 0x%lx ctx %u",
	    session_handle (stream_session), sctx_id);
  sctx = quic_ctx_get (sctx_id, qctx->c_thread_index);
  sctx->parent_app_wrk_id = qctx->parent_app_wrk_id;
  sctx->parent_app_id = qctx->parent_app_id;
  sctx->quic_connection_ctx_id = qctx->c_c_index;
  sctx->c_c_index = sctx_id;
  sctx->c_s_index = stream_session->session_index;
  sctx->stream = stream;
  sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  sctx->flags |= QUIC_F_IS_STREAM;
  if (quicly_stream_is_unidirectional (stream->stream_id))
    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_data->ctx_id = sctx_id;
  stream_data->thread_index = sctx->c_thread_index;
  stream_data->app_rx_data_len = 0;
  stream_data->app_tx_data_len = 0;

  sctx->c_s_index = stream_session->session_index;
  stream_session->session_state = SESSION_STATE_CREATED;
  stream_session->app_wrk_index = sctx->parent_app_wrk_id;
  stream_session->connection_index = sctx->c_c_index;
  stream_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, qctx->udp_is_ip4);
  quic_session = session_get (qctx->c_s_index, qctx->c_thread_index);
  stream_session->listener_handle = listen_session_get_handle (quic_session);

  app_wrk = app_worker_get (stream_session->app_wrk_index);
  if ((rv = app_worker_init_connected (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to allocate fifos");
      quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR);
      return 0;			/* Frame is still valid */
    }
  svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);

  if ((rv = app_worker_accept_notify (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to notify accept worker app");
      quicly_reset_stream (stream, QUIC_APP_ACCEPT_NOTIFY_ERROR);
      return 0;			/* Frame is still valid */
    }

  return 0;
}

static void
quic_on_closed_by_remote (quicly_closed_by_remote_t *self, quicly_conn_t *conn,
			  int code, uint64_t frame_type, const char *reason,
			  size_t reason_len)
{
  quic_ctx_t *ctx = quic_get_conn_ctx (conn);
#if QUIC_DEBUG >= 2
  session_t *quic_session = session_get (ctx->c_s_index, ctx->c_thread_index);
  clib_warning ("Session 0x%lx closed by peer (%U) %.*s ",
		session_handle (quic_session), quic_format_err, code,
		reason_len, reason);
#endif
  ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING;
  session_transport_closing_notify (&ctx->connection);
}

/* Timer handling */

static int64_t
quic_get_thread_time (u8 thread_index)
{
  return quic_main.wrk_ctx[thread_index].time_now;
}

static int64_t
quic_get_time (quicly_now_t * self)
{
  u8 thread_index = vlib_get_thread_index ();
  return quic_get_thread_time (thread_index);
}

static u32
quic_set_time_now (u32 thread_index)
{
  vlib_main_t *vlib_main = vlib_get_main ();
  f64 time = vlib_time_now (vlib_main);
  quic_main.wrk_ctx[thread_index].time_now = (int64_t) (time * 1000.f);
  return quic_main.wrk_ctx[thread_index].time_now;
}

/* Transport proto callback */
static void
quic_update_time (f64 now, u8 thread_index)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;

  tw = &quic_main.wrk_ctx[thread_index].timer_wheel;
  quic_set_time_now (thread_index);
  tw_timer_expire_timers_1t_3w_1024sl_ov (tw, now);
}

static void
quic_timer_expired (u32 conn_index)
{
  quic_ctx_t *ctx;
  QUIC_DBG (4, "Timer expired for conn %u at %ld", conn_index,
	    quic_get_time (NULL));
  ctx = quic_ctx_get (conn_index, vlib_get_thread_index ());
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  quic_send_packets (ctx);
}

static void
quic_update_timer (quic_ctx_t * ctx)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  int64_t next_timeout, next_interval;
  session_t *quic_session;
  int rv;

  /*  This timeout is in ms which is the unit of our timer */
  next_timeout = quicly_get_first_timeout (ctx->conn);
  next_interval = next_timeout - quic_get_time (NULL);

  if (next_timeout == 0 || next_interval <= 0)
    {
      if (ctx->c_s_index == QUIC_SESSION_INVALID)
	{
	  next_interval = 1;
	}
      else
	{
	  quic_session = session_get (ctx->c_s_index, ctx->c_thread_index);
	  if (svm_fifo_set_event (quic_session->tx_fifo))
	    {
	      rv = session_send_io_evt_to_thread_custom (quic_session,
							 quic_session->thread_index,
							 SESSION_IO_EVT_BUILTIN_TX);
	      if (PREDICT_FALSE (rv))
		QUIC_ERR ("Failed to enqueue builtin_tx %d", rv);
	    }
	  return;
	}
    }

  tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel;

  QUIC_DBG (4, "Timer set to %ld (int %ld) for ctx %u", next_timeout,
	    next_interval, ctx->c_c_index);

  if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID)
    {
      if (next_timeout == INT64_MAX)
	{
	  QUIC_DBG (4, "timer for ctx %u already stopped", ctx->c_c_index);
	  return;
	}
      ctx->timer_handle = tw_timer_start_1t_3w_1024sl_ov (tw, ctx->c_c_index,
							  0, next_interval);
    }
  else
    {
      if (next_timeout == INT64_MAX)
	{
	  quic_stop_ctx_timer (ctx);
	}
      else
	tw_timer_update_1t_3w_1024sl_ov (tw, ctx->timer_handle,
					 next_interval);
    }
  return;
}

static void
quic_expired_timers_dispatch (u32 * expired_timers)
{
  int i;

  for (i = 0; i < vec_len (expired_timers); i++)
    {
      quic_timer_expired (expired_timers[i]);
    }
}

/* Transport proto functions */
static int
quic_connect_stream (session_t * quic_session, session_endpoint_cfg_t * sep)
{
  uint64_t quic_session_handle;
  session_t *stream_session;
  quic_stream_data_t *stream_data;
  quicly_stream_t *stream;
  quicly_conn_t *conn;
  app_worker_t *app_wrk;
  quic_ctx_t *qctx, *sctx;
  u32 sctx_index;
  u8 is_unidir;
  int rv;

  /*  Find base session to which the user want to attach a stream */
  quic_session_handle = session_handle (quic_session);
  QUIC_DBG (2, "Opening new stream (qsession %u)", quic_session_handle);

  if (session_type_transport_proto (quic_session->session_type) !=
      TRANSPORT_PROTO_QUIC)
    {
      QUIC_ERR ("received incompatible session");
      return -1;
    }

  app_wrk = app_worker_get_if_valid (quic_session->app_wrk_index);
  if (!app_wrk)
    {
      QUIC_ERR ("Invalid app worker :(");
      return -1;
    }

  sctx_index = quic_ctx_alloc (quic_session->thread_index);	/*  Allocate before we get pointers */
  sctx = quic_ctx_get (sctx_index, quic_session->thread_index);
  qctx = quic_ctx_get (quic_session->connection_index,
		       quic_session->thread_index);
  if (quic_ctx_is_stream (qctx))
    {
      QUIC_ERR ("session is a stream");
      quic_ctx_free (sctx);
      return -1;
    }

  sctx->parent_app_wrk_id = qctx->parent_app_wrk_id;
  sctx->parent_app_id = qctx->parent_app_id;
  sctx->quic_connection_ctx_id = qctx->c_c_index;
  sctx->c_c_index = sctx_index;
  sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  sctx->flags |= QUIC_F_IS_STREAM;

  conn = qctx->conn;

  if (!conn || !quicly_connection_is_ready (conn))
    return -1;

  is_unidir = sep->transport_flags & TRANSPORT_CFG_F_UNIDIRECTIONAL;
  if ((rv = quicly_open_stream (conn, &stream, is_unidir)))
    {
      QUIC_DBG (2, "Stream open failed with %d", rv);
      return -1;
    }
  quic_increment_counter (QUIC_ERROR_OPENED_STREAM, 1);

  sctx->stream = stream;

  QUIC_DBG (2, "Opened stream %d, creating session", stream->stream_id);

  stream_session = session_alloc (qctx->c_thread_index);
  QUIC_DBG (2, "Allocated stream_session 0x%lx ctx %u",
	    session_handle (stream_session), sctx_index);
  stream_session->app_wrk_index = app_wrk->wrk_index;
  stream_session->connection_index = sctx_index;
  stream_session->listener_handle = quic_session_handle;
  stream_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, qctx->udp_is_ip4);
  if (is_unidir)
    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;

  sctx->c_s_index = stream_session->session_index;
  stream_data = (quic_stream_data_t *) stream->data;
  stream_data->ctx_id = sctx->c_c_index;
  stream_data->thread_index = sctx->c_thread_index;
  stream_data->app_rx_data_len = 0;
  stream_data->app_tx_data_len = 0;
  stream_session->session_state = SESSION_STATE_READY;

  /* For now we only reset streams. Cleanup will be triggered by timers */
  if ((rv = app_worker_init_connected (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to app_worker_init_connected");
      quicly_reset_stream (stream, QUIC_APP_CONNECT_NOTIFY_ERROR);
      return app_worker_connect_notify (app_wrk, NULL, rv, sep->opaque);
    }

  svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);

  if (app_worker_connect_notify (app_wrk, stream_session, SESSION_E_NONE,
				 sep->opaque))
    {
      QUIC_ERR ("failed to notify app");
      quic_increment_counter (QUIC_ERROR_CLOSED_STREAM, 1);
      quicly_reset_stream (stream, QUIC_APP_CONNECT_NOTIFY_ERROR);
      return -1;
    }

  return 0;
}

static int
quic_connect_connection (session_endpoint_cfg_t * sep)
{
  vnet_connect_args_t _cargs, *cargs = &_cargs;
  transport_endpt_crypto_cfg_t *ccfg;
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx;
  app_worker_t *app_wrk;
  application_t *app;
  u32 ctx_index;
  u32 thread_index = vlib_get_thread_index ();
  int error;

  if (!sep->ext_cfg)
    return SESSION_E_NOEXTCFG;

  ccfg = &sep->ext_cfg->crypto;

  clib_memset (cargs, 0, sizeof (*cargs));
  ctx_index = quic_ctx_alloc (thread_index);
  ctx = quic_ctx_get (ctx_index, thread_index);
  ctx->parent_app_wrk_id = sep->app_wrk_index;
  ctx->c_s_index = QUIC_SESSION_INVALID;
  ctx->c_c_index = ctx_index;
  ctx->udp_is_ip4 = sep->is_ip4;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  ctx->conn_state = QUIC_CONN_STATE_HANDSHAKE;
  ctx->client_opaque = sep->opaque;
  ctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  if (ccfg->hostname[0])
    ctx->srv_hostname = format (0, "%s", ccfg->hostname);
  else
    /*  needed by quic for crypto + determining client / server */
    ctx->srv_hostname = format (0, "%U", format_ip46_address,
				&sep->ip, sep->is_ip4);
  vec_terminate_c_string (ctx->srv_hostname);

  clib_memcpy (&cargs->sep_ext, sep, sizeof (session_endpoint_cfg_t));
  cargs->sep.transport_proto = TRANSPORT_PROTO_UDP;
  cargs->app_index = qm->app_index;
  cargs->api_context = ctx_index;

  app_wrk = app_worker_get (sep->app_wrk_index);
  app = application_get (app_wrk->app_index);
  ctx->parent_app_id = app_wrk->app_index;
  cargs->sep_ext.ns_index = app->ns_index;
  cargs->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED;

  ctx->crypto_engine = ccfg->crypto_engine;
  ctx->ckpair_index = ccfg->ckpair_index;
  if ((error = quic_acquire_crypto_context (ctx)))
    return error;

  if ((error = vnet_connect (cargs)))
    return error;

  return 0;
}

static int
quic_connect (transport_endpoint_cfg_t * tep)
{
  QUIC_DBG (2, "Called quic_connect");
  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
  session_t *quic_session;
  sep = (session_endpoint_cfg_t *) tep;

  quic_session = session_get_from_handle_if_valid (sep->parent_handle);
  if (quic_session)
    return quic_connect_stream (quic_session, sep);
  else
    return quic_connect_connection (sep);
}

static void
quic_proto_on_close (u32 ctx_index, u32 thread_index)
{
  int err;
  quic_ctx_t *ctx = quic_ctx_get_if_valid (ctx_index, thread_index);
  if (!ctx)
    return;
  session_t *stream_session = session_get (ctx->c_s_index,
					   ctx->c_thread_index);
#if QUIC_DEBUG >= 2
  clib_warning ("Closing session 0x%lx", session_handle (stream_session));
#endif
  if (quic_ctx_is_stream (ctx))
    {
      quicly_stream_t *stream = ctx->stream;
      if (!quicly_stream_has_send_side (quicly_is_client (stream->conn),
					stream->stream_id))
	return;
      quicly_sendstate_shutdown (&stream->sendstate, ctx->bytes_written +
				 svm_fifo_max_dequeue
				 (stream_session->tx_fifo));
      err = quicly_stream_sync_sendbuf (stream, 1);
      if (err)
	{
	  QUIC_DBG (1, "sendstate_shutdown failed for stream session %lu",
		    session_handle (stream_session));
	  quicly_reset_stream (stream, QUIC_APP_ERROR_CLOSE_NOTIFY);
	}
      quic_send_packets (ctx);
      return;
    }

  switch (ctx->conn_state)
    {
    case QUIC_CONN_STATE_OPENED:
    case QUIC_CONN_STATE_HANDSHAKE:
    case QUIC_CONN_STATE_READY:
      ctx->conn_state = QUIC_CONN_STATE_ACTIVE_CLOSING;
      quicly_conn_t *conn = ctx->conn;
      /* Start connection closing. Keep sending packets until quicly_send
         returns QUICLY_ERROR_FREE_CONNECTION */

      quic_increment_counter (QUIC_ERROR_CLOSED_CONNECTION, 1);
      quicly_close (conn, QUIC_APP_ERROR_CLOSE_NOTIFY, "Closed by peer");
      /* This also causes all streams to be closed (and the cb called) */
      quic_send_packets (ctx);
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING:
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_APP_CLOSED;
      /* send_packets will eventually return an error, we delete the conn at
         that point */
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED:
      quic_connection_delete (ctx);
      break;
    case QUIC_CONN_STATE_ACTIVE_CLOSING:
      break;
    default:
      QUIC_ERR ("Trying to close conn in state %d", ctx->conn_state);
      break;
    }
}

static u32
quic_start_listen (u32 quic_listen_session_index, transport_endpoint_t * tep)
{
  vnet_listen_args_t _bargs, *args = &_bargs;
  transport_endpt_crypto_cfg_t *ccfg;
  quic_main_t *qm = &quic_main;
  session_handle_t udp_handle;
  session_endpoint_cfg_t *sep;
  session_t *udp_listen_session;
  app_worker_t *app_wrk;
  application_t *app;
  quic_ctx_t *lctx;
  u32 lctx_index;
  app_listener_t *app_listener;
  int rv;

  sep = (session_endpoint_cfg_t *) tep;
  if (!sep->ext_cfg)
    return SESSION_E_NOEXTCFG;

  ccfg = &sep->ext_cfg->crypto;
  app_wrk = app_worker_get (sep->app_wrk_index);
  app = application_get (app_wrk->app_index);
  QUIC_DBG (2, "Called quic_start_listen for app %d", app_wrk->app_index);

  clib_memset (args, 0, sizeof (*args));
  args->app_index = qm->app_index;
  args->sep_ext = *sep;
  args->sep_ext.ns_index = app->ns_index;
  args->sep_ext.transport_proto = TRANSPORT_PROTO_UDP;
  args->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED;
  if ((rv = vnet_listen (args)))
    return rv;

  lctx_index = quic_ctx_alloc (0);
  udp_handle = args->handle;
  app_listener = app_listener_get_w_handle (udp_handle);
  udp_listen_session = app_listener_get_session (app_listener);
  udp_listen_session->opaque = lctx_index;

  lctx = quic_ctx_get (lctx_index, 0);
  lctx->flags |= QUIC_F_IS_LISTENER;

  clib_memcpy (&lctx->c_rmt_ip, &args->sep.peer.ip, sizeof (ip46_address_t));
  clib_memcpy (&lctx->c_lcl_ip, &args->sep.ip, sizeof (ip46_address_t));
  lctx->c_rmt_port = args->sep.peer.port;
  lctx->c_lcl_port = args->sep.port;
  lctx->c_is_ip4 = args->sep.is_ip4;
  lctx->c_fib_index = args->sep.fib_index;
  lctx->c_proto = TRANSPORT_PROTO_QUIC;
  lctx->parent_app_wrk_id = sep->app_wrk_index;
  lctx->parent_app_id = app_wrk->app_index;
  lctx->udp_session_handle = udp_handle;
  lctx->c_s_index = quic_listen_session_index;
  lctx->crypto_engine = ccfg->crypto_engine;
  lctx->ckpair_index = ccfg->ckpair_index;
  if ((rv = quic_acquire_crypto_context (lctx)))
    return rv;

  QUIC_DBG (2, "Listening UDP session 0x%lx",
	    session_handle (udp_listen_session));
  QUIC_DBG (2, "Listening QUIC session 0x%lx", quic_listen_session_index);
  return lctx_index;
}

static u32
quic_stop_listen (u32 lctx_index)
{
  QUIC_DBG (2, "Called quic_stop_listen");
  quic_ctx_t *lctx;
  lctx = quic_ctx_get (lctx_index, 0);
  QUIC_ASSERT (quic_ctx_is_listener (lctx));
  vnet_unlisten_args_t a = {
    .handle = lctx->udp_session_handle,
    .app_index = quic_main.app_index,
    .wrk_map_index = 0		/* default wrk */
  };
  if (vnet_unlisten (&a))
    clib_warning ("unlisten errored");

  quic_release_crypto_context (lctx->crypto_context_index,
			       0 /* thread_index */ );
  quic_ctx_free (lctx);
  return 0;
}

static transport_connection_t *
quic_connection_get (u32 ctx_index, u32 thread_index)
{
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (ctx_index, thread_index);
  return &ctx->connection;
}

static transport_connection_t *
quic_listener_get (u32 listener_index)
{
  QUIC_DBG (2, "Called quic_listener_get");
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (listener_index, 0);
  return &ctx->connection;
}

static u8 *
format_quic_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  u32 verbose = va_arg (*args, u32);
  u8 *str = 0;

  if (!ctx)
    return s;
  str = format (str, "[#%d][Q] ", ctx->c_thread_index);

  if (quic_ctx_is_listener (ctx))
    str = format (str, "Listener, UDP %ld", ctx->udp_session_handle);
  else if (quic_ctx_is_stream (ctx))
    str = format (str, "Stream %ld conn %d",
		  ctx->stream->stream_id, ctx->quic_connection_ctx_id);
  else				/* connection */
    str = format (str, "Conn %d UDP %d", ctx->c_c_index,
		  ctx->udp_session_handle);

  str = format (str, " app %d wrk %d", ctx->parent_app_id,
		ctx->parent_app_wrk_id);

  if (verbose == 1)
    s = format (s, "%-" SESSION_CLI_ID_LEN "s%-" SESSION_CLI_STATE_LEN "d",
		str, ctx->conn_state);
  else
    s = format (s, "%s\n", str);
  vec_free (str);
  return s;
}

static u8 *
format_quic_connection (u8 * s, va_list * args)
{
  u32 qc_index = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  u32 verbose = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (qc_index, thread_index);
  s = format (s, "%U", format_quic_ctx, ctx, verbose);
  return s;
}

static u8 *
format_quic_half_open (u8 * s, va_list * args)
{
  u32 qc_index = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (qc_index, thread_index);
  s = format (s, "[#%d][Q] half-open app %u", thread_index,
	      ctx->parent_app_id);
  return s;
}

/*  TODO improve */
static u8 *
format_quic_listener (u8 * s, va_list * args)
{
  u32 tci = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  u32 verbose = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (tci, thread_index);
  s = format (s, "%U", format_quic_ctx, ctx, verbose);
  return s;
}

/* Session layer callbacks */

static inline void
quic_build_sockaddr (struct sockaddr *sa, socklen_t * salen,
		     ip46_address_t * addr, u16 port, u8 is_ip4)
{
  if (is_ip4)
    {
      struct sockaddr_in *sa4 = (struct sockaddr_in *) sa;
      sa4->sin_family = AF_INET;
      sa4->sin_port = port;
      sa4->sin_addr.s_addr = addr->ip4.as_u32;
      *salen = sizeof (struct sockaddr_in);
    }
  else
    {
      struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) sa;
      sa6->sin6_family = AF_INET6;
      sa6->sin6_port = port;
      clib_memcpy (&sa6->sin6_addr, &addr->ip6, 16);
      *salen = sizeof (struct sockaddr_in6);
    }
}

static void
quic_on_quic_session_connected (quic_ctx_t * ctx)
{
  session_t *quic_session;
  app_worker_t *app_wrk;
  u32 ctx_id = ctx->c_c_index;
  u32 thread_index = ctx->c_thread_index;
  int rv;

  quic_session = session_alloc (thread_index);

  QUIC_DBG (2, "Allocated quic session 0x%lx", session_handle (quic_session));
  ctx->c_s_index = quic_session->session_index;
  quic_session->app_wrk_index = ctx->parent_app_wrk_id;
  quic_session->connection_index = ctx->c_c_index;
  quic_session->listener_handle = SESSION_INVALID_HANDLE;
  quic_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->udp_is_ip4);

  /* If quic session connected fails, immediatly close connection */
  app_wrk = app_worker_get (ctx->parent_app_wrk_id);
  if ((rv = app_worker_init_connected (app_wrk, quic_session)))
    {
      QUIC_ERR ("failed to app_worker_init_connected");
      quic_proto_on_close (ctx_id, thread_index);
      app_worker_connect_notify (app_wrk, NULL, rv, ctx->client_opaque);
      return;
    }

  quic_session->session_state = SESSION_STATE_CONNECTING;
  if ((rv = app_worker_connect_notify (app_wrk, quic_session,
				       SESSION_E_NONE, ctx->client_opaque)))
    {
      QUIC_ERR ("failed to notify app %d", rv);
      quic_proto_on_close (ctx_id, thread_index);
      return;
    }

  /*  If the app opens a stream in its callback it may invalidate ctx */
  ctx = quic_ctx_get (ctx_id, thread_index);
  /*
   * app_worker_connect_notify() might have reallocated pool, reload
   * quic_session pointer
   */
  quic_session = session_get (ctx->c_s_index, thread_index);
  quic_session->session_state = SESSION_STATE_LISTENING;
}

static void
quic_check_quic_session_connected (quic_ctx_t * ctx)
{
  /* Called when we need to trigger quic session connected
   * we may call this function on the server side / at
   * stream opening */

  /* Conn may be set to null if the connection is terminated */
  if (!ctx->conn || ctx->conn_state != QUIC_CONN_STATE_HANDSHAKE)
    return;
  if (!quicly_connection_is_ready (ctx->conn))
    return;
  ctx->conn_state = QUIC_CONN_STATE_READY;
  if (!quicly_is_client (ctx->conn))
    return;
  quic_on_quic_session_connected (ctx);
}

static inline void
quic_update_conn_ctx (quicly_conn_t * conn, quicly_context_t * quicly_context)
{
  /* we need to update the quicly_conn on migrate
   * as it contains a pointer to the crypto context */
  ptls_context_t **tls;
  quicly_context_t **_quicly_context;
  _quicly_context = (quicly_context_t **) conn;
  *_quicly_context = quicly_context;
  tls = (ptls_context_t **) quicly_get_tls (conn);
  *tls = quicly_context->tls;
}

static void
quic_receive_connection (void *arg)
{
  u32 new_ctx_id, thread_index = vlib_get_thread_index ();
  quic_ctx_t *temp_ctx, *new_ctx;
  clib_bihash_kv_16_8_t kv;
  quicly_conn_t *conn;
  quicly_context_t *quicly_context;
  session_t *udp_session;

  temp_ctx = arg;
  new_ctx_id = quic_ctx_alloc (thread_index);
  new_ctx = quic_ctx_get (new_ctx_id, thread_index);

  QUIC_DBG (2, "Received conn %u (now %u)", temp_ctx->c_thread_index,
	    new_ctx_id);

  clib_memcpy (new_ctx, temp_ctx, sizeof (quic_ctx_t));
  clib_mem_free (temp_ctx);

  new_ctx->c_thread_index = thread_index;
  new_ctx->c_c_index = new_ctx_id;
  quic_acquire_crypto_context (new_ctx);

  conn = new_ctx->conn;
  quicly_context = quic_get_quicly_ctx_from_ctx (new_ctx);
  quic_update_conn_ctx (conn, quicly_context);

  quic_store_conn_ctx (conn, new_ctx);
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) thread_index) << 32 | (u64) new_ctx_id;
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
  new_ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  quic_update_timer (new_ctx);

  /*  Trigger write on this connection if necessary */
  udp_session = session_get_from_handle (new_ctx->udp_session_handle);
  udp_session->opaque = new_ctx_id;
  udp_session->flags &= ~SESSION_F_IS_MIGRATING;
  if (svm_fifo_max_dequeue (udp_session->tx_fifo))
    quic_set_udp_tx_evt (udp_session);
}

static void
quic_transfer_connection (u32 ctx_index, u32 dest_thread)
{
  quic_ctx_t *ctx, *temp_ctx;
  u32 thread_index = vlib_get_thread_index ();

  QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread);

  temp_ctx = clib_mem_alloc (sizeof (quic_ctx_t));
  QUIC_ASSERT (temp_ctx != NULL);
  ctx = quic_ctx_get (ctx_index, thread_index);

  clib_memcpy (temp_ctx, ctx, sizeof (quic_ctx_t));

  quic_stop_ctx_timer (ctx);
  quic_release_crypto_context (ctx->crypto_context_index, thread_index);
  quic_ctx_free (ctx);

  /*  Send connection to destination thread */
  session_send_rpc_evt_to_thread (dest_thread, quic_receive_connection,
				  (void *) temp_ctx);
}

static int
quic_udp_session_connected_callback (u32 quic_app_index, u32 ctx_index,
				     session_t * udp_session,
				     session_error_t err)
{
  QUIC_DBG (2, "UDP Session is now connected (id %u)",
	    udp_session->session_index);
  /* This should always be called before quic_connect returns since UDP always
   * connects instantly. */
  clib_bihash_kv_16_8_t kv;
  struct sockaddr_in6 sa6;
  struct sockaddr *sa = (struct sockaddr *) &sa6;
  socklen_t salen;
  transport_connection_t *tc;
  app_worker_t *app_wrk;
  quicly_conn_t *conn;
  quic_ctx_t *ctx;
  u32 thread_index = vlib_get_thread_index ();
  int ret;
  quicly_context_t *quicly_ctx;


  ctx = quic_ctx_get (ctx_index, thread_index);
  if (err)
    {
      u32 api_context;
      app_wrk = app_worker_get_if_valid (ctx->parent_app_wrk_id);
      if (app_wrk)
	{
	  api_context = ctx->c_s_index;
	  app_worker_connect_notify (app_wrk, 0, SESSION_E_NONE, api_context);
	}
      return 0;
    }

  ctx->c_thread_index = thread_index;
  ctx->c_c_index = ctx_index;

  QUIC_DBG (2, "New ctx [%u]%x", thread_index, (ctx) ? ctx_index : ~0);

  ctx->udp_session_handle = session_handle (udp_session);
  udp_session->opaque = ctx_index;

  /* Init QUIC lib connection
   * Generate required sockaddr & salen */
  tc = session_get_transport (udp_session);
  quic_build_sockaddr (sa, &salen, &tc->rmt_ip, tc->rmt_port, tc->is_ip4);

  quicly_ctx = quic_get_quicly_ctx_from_ctx (ctx);
  ret = quicly_connect (&ctx->conn, quicly_ctx, (char *) ctx->srv_hostname,
			sa, NULL, &quic_main.wrk_ctx[thread_index].next_cid,
			ptls_iovec_init (NULL, 0), &quic_main.hs_properties,
			NULL);
  ++quic_main.wrk_ctx[thread_index].next_cid.master_id;
  /*  Save context handle in quicly connection */
  quic_store_conn_ctx (ctx->conn, ctx);
  assert (ret == 0);

  /*  Register connection in connections map */
  conn = ctx->conn;
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) thread_index) << 32 | (u64) ctx_index;
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );

  /*  UDP stack quirk? preemptively transfer connection if that happens */
  if (udp_session->thread_index != thread_index)
    quic_transfer_connection (ctx_index, udp_session->thread_index);
  else
    quic_send_packets (ctx);

  return ret;
}

static void
quic_udp_session_disconnect_callback (session_t * s)
{
  clib_warning ("UDP session disconnected???");
}

static void
quic_udp_session_cleanup_callback (session_t * udp_session,
				   session_cleanup_ntf_t ntf)
{
  quic_ctx_t *ctx;

  if (ntf != SESSION_CLEANUP_SESSION)
    return;

  ctx = quic_ctx_get (udp_session->opaque, udp_session->thread_index);
  quic_stop_ctx_timer (ctx);
  quic_release_crypto_context (ctx->crypto_context_index,
			       ctx->c_thread_index);
  quic_ctx_free (ctx);
}

static void
quic_udp_session_reset_callback (session_t * s)
{
  clib_warning ("UDP session reset???");
}

static void
quic_udp_session_migrate_callback (session_t * s, session_handle_t new_sh)
{
  u32 new_thread = session_thread_from_handle (new_sh);
  quic_ctx_t *ctx;

  QUIC_DBG (2, "Session %x migrated to %lx", s->session_index, new_sh);
  QUIC_ASSERT (vlib_get_thread_index () == s->thread_index);
  ctx = quic_ctx_get (s->opaque, s->thread_index);
  QUIC_ASSERT (ctx->udp_session_handle == session_handle (s));

  ctx->udp_session_handle = new_sh;
#if QUIC_DEBUG >= 1
  s->opaque = 0xfeedface;
#endif
  quic_transfer_connection (ctx->c_c_index, new_thread);
}

int
quic_udp_session_accepted_callback (session_t * udp_session)
{
  /* New UDP connection, try to accept it */
  u32 ctx_index;
  quic_ctx_t *ctx, *lctx;
  session_t *udp_listen_session;
  u32 thread_index = vlib_get_thread_index ();

  udp_listen_session =
    listen_session_get_from_handle (udp_session->listener_handle);

  ctx_index = quic_ctx_alloc (thread_index);
  ctx = quic_ctx_get (ctx_index, thread_index);
  ctx->c_thread_index = udp_session->thread_index;
  ctx->c_c_index = ctx_index;
  ctx->c_s_index = QUIC_SESSION_INVALID;
  ctx->udp_session_handle = session_handle (udp_session);
  QUIC_DBG (2, "ACCEPTED UDP 0x%lx", ctx->udp_session_handle);
  ctx->listener_ctx_id = udp_listen_session->opaque;
  lctx = quic_ctx_get (udp_listen_session->opaque,
		       udp_listen_session->thread_index);
  ctx->udp_is_ip4 = lctx->c_is_ip4;
  ctx->parent_app_id = lctx->parent_app_id;
  ctx->parent_app_wrk_id = lctx->parent_app_wrk_id;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  ctx->conn_state = QUIC_CONN_STATE_OPENED;
  ctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;

  ctx->crypto_engine = lctx->crypto_engine;
  ctx->ckpair_index = lctx->ckpair_index;
  quic_acquire_crypto_context (ctx);
  udp_session->opaque = ctx_index;

  /* TODO timeout to delete these if they never connect */
  return 0;
}

static int
quic_add_segment_callback (u32 client_index, u64 seg_handle)
{
  /* No-op for builtin */
  return 0;
}

static int
quic_del_segment_callback (u32 client_index, u64 seg_handle)
{
  /* No-op for builtin */
  return 0;
}

static int
quic_custom_app_rx_callback (transport_connection_t * tc)
{
  quic_ctx_t *ctx;
  session_t *stream_session = session_get (tc->s_index, tc->thread_index);
  QUIC_DBG (3, "Received app READ notification");
  quic_ack_rx_data (stream_session);
  svm_fifo_reset_has_deq_ntf (stream_session->rx_fifo);

  /* Need to send packets (acks may never be sent otherwise) */
  ctx = quic_ctx_get (stream_session->connection_index,
		      stream_session->thread_index);
  quic_send_packets (ctx);
  return 0;
}

static int
quic_custom_tx_callback (void *s, transport_send_params_t * sp)
{
  session_t *stream_session = (session_t *) s;
  quic_stream_data_t *stream_data;
  quicly_stream_t *stream;
  quic_ctx_t *ctx;
  u32 max_deq;
  int rv;

  if (PREDICT_FALSE
      (stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING))
    return 0;
  ctx = quic_ctx_get (stream_session->connection_index,
		      stream_session->thread_index);
  if (PREDICT_FALSE (!quic_ctx_is_stream (ctx)))
    {
      goto tx_end;		/* Most probably a reschedule */
    }

  QUIC_DBG (3, "Stream TX event");
  quic_ack_rx_data (stream_session);
  stream = ctx->stream;
  if (!quicly_sendstate_is_open (&stream->sendstate))
    {
      QUIC_ERR ("Warning: tried to send on closed stream");
      return 0;
    }

  stream_data = (quic_stream_data_t *) stream->data;
  max_deq = svm_fifo_max_dequeue (stream_session->tx_fifo);
  QUIC_ASSERT (max_deq >= stream_data->app_tx_data_len);
  if (max_deq == stream_data->app_tx_data_len)
    {
      QUIC_DBG (3, "TX but no data %d / %d", max_deq,
		stream_data->app_tx_data_len);
      return 0;
    }
  stream_data->app_tx_data_len = max_deq;
  rv = quicly_stream_sync_sendbuf (stream, 1);
  QUIC_ASSERT (!rv);

tx_end:
  return quic_send_packets (ctx);
}

/*
 * Returns 0 if a matching connection is found and is on the right thread.
 * Otherwise returns -1.
 * If a connection is found, even on the wrong thread, ctx_thread and ctx_index
 * will be set.
 */
static inline int
quic_find_packet_ctx (quic_rx_packet_ctx_t * pctx, u32 caller_thread_index)
{
  clib_bihash_kv_16_8_t kv;
  clib_bihash_16_8_t *h;
  quic_ctx_t *ctx;
  u32 index, thread_id;

  h = &quic_main.connection_hash;
  quic_make_connection_key (&kv, &pctx->packet.cid.dest.plaintext);
  QUIC_DBG (3, "Searching conn with id %lu %lu", kv.key[0], kv.key[1]);

  if (clib_bihash_search_16_8 (h, &kv, &kv))
    {
      QUIC_DBG (3, "connection not found");
      return QUIC_PACKET_TYPE_NONE;
    }

  index = kv.value & UINT32_MAX;
  thread_id = kv.value >> 32;
  /* Check if this connection belongs to this thread, otherwise
   * ask for it to be moved */
  if (thread_id != caller_thread_index)
    {
      QUIC_DBG (2, "Connection is on wrong thread");
      /* Cannot make full check with quicly_is_destination... */
      pctx->ctx_index = index;
      pctx->thread_index = thread_id;
      return QUIC_PACKET_TYPE_MIGRATE;
    }
  ctx = quic_ctx_get (index, vlib_get_thread_index ());
  if (!ctx->conn)
    {
      QUIC_ERR ("ctx has no conn");
      return QUIC_PACKET_TYPE_NONE;
    }
  if (!quicly_is_destination (ctx->conn, NULL, &pctx->sa, &pctx->packet))
    return QUIC_PACKET_TYPE_NONE;

  QUIC_DBG (3, "Connection found");
  pctx->ctx_index = index;
  pctx->thread_index = thread_id;
  return QUIC_PACKET_TYPE_RECEIVE;
}

static void
quic_accept_connection (quic_rx_packet_ctx_t * pctx)
{
  quicly_context_t *quicly_ctx;
  session_t *quic_session;
  clib_bihash_kv_16_8_t kv;
  app_worker_t *app_wrk;
  quicly_conn_t *conn;
  quic_ctx_t *ctx;
  quic_ctx_t *lctx;
  int rv;

  /* new connection, accept and create context if packet is valid
   * TODO: check if socket is actually listening? */
  ctx = quic_ctx_get (pctx->ctx_index, pctx->thread_index);
  if (ctx->c_s_index != QUIC_SESSION_INVALID)
    {
      QUIC_DBG (2, "already accepted ctx 0x%x", ctx->c_s_index);
      return;
    }

  quicly_ctx = quic_get_quicly_ctx_from_ctx (ctx);
  if ((rv = quicly_accept (&conn, quicly_ctx, NULL, &pctx->sa,
			   &pctx->packet, NULL,
			   &quic_main.wrk_ctx[pctx->thread_index].next_cid,
			   NULL)))
    {
      /* Invalid packet, pass */
      assert (conn == NULL);
      QUIC_ERR ("Accept failed with %U", quic_format_err, rv);
      /* TODO: cleanup created quic ctx and UDP session */
      return;
    }
  assert (conn != NULL);

  ++quic_main.wrk_ctx[pctx->thread_index].next_cid.master_id;
  /* Save ctx handle in quicly connection */
  quic_store_conn_ctx (conn, ctx);
  ctx->conn = conn;

  quic_session = session_alloc (ctx->c_thread_index);
  QUIC_DBG (2, "Allocated quic_session, 0x%lx ctx %u",
	    session_handle (quic_session), ctx->c_c_index);
  quic_session->session_state = SESSION_STATE_LISTENING;
  ctx->c_s_index = quic_session->session_index;

  lctx = quic_ctx_get (ctx->listener_ctx_id, 0);

  quic_session->app_wrk_index = lctx->parent_app_wrk_id;
  quic_session->connection_index = ctx->c_c_index;
  quic_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->udp_is_ip4);
  quic_session->listener_handle = lctx->c_s_index;

  /* Register connection in connections map */
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) pctx->thread_index) << 32 | (u64) pctx->ctx_index;
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);

  /* If notify fails, reset connection immediatly */
  if ((rv = app_worker_init_accepted (quic_session)))
    {
      QUIC_ERR ("failed to allocate fifos");
      quic_proto_on_close (pctx->ctx_index, pctx->thread_index);
      return;
    }

  app_wrk = app_worker_get (quic_session->app_wrk_index);
  if ((rv = app_worker_accept_notify (app_wrk, quic_session)))
    {
      QUIC_ERR ("failed to notify accept worker app");
      quic_proto_on_close (pctx->ctx_index, pctx->thread_index);
      return;
    }

  ctx->conn_state = QUIC_CONN_STATE_READY;
}

static int
quic_reset_connection (u64 udp_session_handle, quic_rx_packet_ctx_t * pctx)
{
  /* short header packet; potentially a dead connection. No need to check the
   * length of the incoming packet, because loop is prevented by authenticating
   * the CID (by checking node_id and thread_id). If the peer is also sending a
   * reset, then the next CID is highly likely to contain a non-authenticating
   * CID, ... */
  QUIC_DBG (2, "Sending stateless reset");
  int rv;
  session_t *udp_session;
  quicly_context_t *quicly_ctx;
  if (pctx->packet.cid.dest.plaintext.node_id != 0
      || pctx->packet.cid.dest.plaintext.thread_id != 0)
    return 0;
  quicly_ctx = quic_get_quicly_ctx_from_udp (udp_session_handle);
  quic_ctx_t *qctx = quic_ctx_get (pctx->ctx_index, pctx->thread_index);

  quicly_address_t src;
  uint8_t payload[quicly_ctx->transport_params.max_udp_payload_size];
  size_t payload_len =
    quicly_send_stateless_reset (quicly_ctx, &src.sa, payload);
  if (payload_len == 0)
    return 1;

  struct iovec packet;
  packet.iov_len = payload_len;
  packet.iov_base = payload;

  struct _st_quicly_conn_public_t *conn =
    (struct _st_quicly_conn_public_t *) qctx->conn;

  udp_session = session_get_from_handle (udp_session_handle);
  rv = quic_send_datagram (udp_session, &packet, &conn->remote.address,
			   &conn->local.address);
  quic_set_udp_tx_evt (udp_session);
  return rv;
}

static int
quic_process_one_rx_packet (u64 udp_session_handle, svm_fifo_t * f,
			    u32 fifo_offset, quic_rx_packet_ctx_t * pctx)
{
  size_t plen;
  u32 full_len, ret;
  u32 thread_index = vlib_get_thread_index ();
  u32 cur_deq = svm_fifo_max_dequeue (f) - fifo_offset;
  quicly_context_t *quicly_ctx;
  session_t *udp_session;
  int rv;

  ret = svm_fifo_peek (f, fifo_offset,
		       SESSION_CONN_HDR_LEN, (u8 *) & pctx->ph);
  QUIC_ASSERT (ret == SESSION_CONN_HDR_LEN);
  QUIC_ASSERT (pctx->ph.data_offset == 0);
  full_len = pctx->ph.data_length + SESSION_CONN_HDR_LEN;
  if (full_len > cur_deq)
    {
      QUIC_ERR ("Not enough data in fifo RX");
      return 1;
    }

  /* Quicly can read len bytes from the fifo at offset:
   * ph.data_offset + SESSION_CONN_HDR_LEN */
  ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN + fifo_offset,
		       pctx->ph.data_length, pctx->data);
  if (ret != pctx->ph.data_length)
    {
      QUIC_ERR ("Not enough data peeked in RX");
      return 1;
    }

  quic_increment_counter (QUIC_ERROR_RX_PACKETS, 1);
  quic_build_sockaddr (&pctx->sa, &pctx->salen, &pctx->ph.rmt_ip,
		       pctx->ph.rmt_port, pctx->ph.is_ip4);
  quicly_ctx = quic_get_quicly_ctx_from_udp (udp_session_handle);

  size_t off = 0;
  plen = quicly_decode_packet (quicly_ctx, &pctx->packet, pctx->data,
			       pctx->ph.data_length, &off);

  if (plen == SIZE_MAX)
    {
      return 1;
    }

  rv = quic_find_packet_ctx (pctx, thread_index);
  if (rv == QUIC_PACKET_TYPE_RECEIVE)
    {
      pctx->ptype = QUIC_PACKET_TYPE_RECEIVE;

      if (quic_main.vnet_crypto_enabled &&
	  quic_main.default_crypto_engine == CRYPTO_ENGINE_VPP)
	{
	  quic_ctx_t *qctx = quic_ctx_get (pctx->ctx_index, thread_index);
	  quic_crypto_decrypt_packet (qctx, pctx);
	}
      return 0;
    }
  else if (rv == QUIC_PACKET_TYPE_MIGRATE)
    {
      pctx->ptype = QUIC_PACKET_TYPE_MIGRATE;
      /*  Connection found but on wrong thread, ask move */
    }
  else if (QUICLY_PACKET_IS_LONG_HEADER (pctx->packet.octets.base[0]))
    {
      pctx->ptype = QUIC_PACKET_TYPE_ACCEPT;
      udp_session = session_get_from_handle (udp_session_handle);
      pctx->ctx_index = udp_session->opaque;
      pctx->thread_index = thread_index;
    }
  else
    {
      pctx->ptype = QUIC_PACKET_TYPE_RESET;
    }
  return 1;
}

static int
quic_udp_session_rx_callback (session_t * udp_session)
{
  /*  Read data from UDP rx_fifo and pass it to the quicly conn. */
  quic_ctx_t *ctx = NULL, *prev_ctx = NULL;
  svm_fifo_t *f = udp_session->rx_fifo;
  u32 max_deq;
  u64 udp_session_handle = session_handle (udp_session);
  int rv = 0;
  u32 thread_index = vlib_get_thread_index ();
  u32 cur_deq, fifo_offset, max_packets, i;

  quic_rx_packet_ctx_t packets_ctx[QUIC_RCV_MAX_PACKETS];

  if (udp_session->flags & SESSION_F_IS_MIGRATING)
    {
      QUIC_DBG (3, "RX on migrating udp session");
      return 0;
    }

rx_start:
  max_deq = svm_fifo_max_dequeue (f);
  if (max_deq == 0)
    return 0;

  fifo_offset = 0;
  max_packets = QUIC_RCV_MAX_PACKETS;

#if CLIB_DEBUG > 0
  clib_memset (packets_ctx, 0xfa,
	       QUIC_RCV_MAX_PACKETS * sizeof (quic_rx_packet_ctx_t));
#endif
  for (i = 0; i < max_packets; i++)
    {
      packets_ctx[i].thread_index = UINT32_MAX;
      packets_ctx[i].ctx_index = UINT32_MAX;
      packets_ctx[i].ptype = QUIC_PACKET_TYPE_DROP;

      cur_deq = max_deq - fifo_offset;
      if (cur_deq == 0)
	{
	  max_packets = i + 1;
	  break;
	}
      if (cur_deq < SESSION_CONN_HDR_LEN)
	{
	  fifo_offset = max_deq;
	  max_packets = i + 1;
	  QUIC_ERR ("Fifo %d < header size in RX", cur_deq);
	  break;
	}
      rv = quic_process_one_rx_packet (udp_session_handle, f,
				       fifo_offset, &packets_ctx[i]);
      if (packets_ctx[i].ptype != QUIC_PACKET_TYPE_MIGRATE)
	fifo_offset += SESSION_CONN_HDR_LEN + packets_ctx[i].ph.data_length;
      if (rv)
	{
	  max_packets = i + 1;
	  break;
	}
    }

  for (i = 0; i < max_packets; i++)
    {
      switch (packets_ctx[i].ptype)
	{
	case QUIC_PACKET_TYPE_RECEIVE:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index, thread_index);
	  rv = quicly_receive (ctx->conn, NULL, &packets_ctx[i].sa,
			       &packets_ctx[i].packet);
	  if (rv && rv != QUICLY_ERROR_PACKET_IGNORED)
	    {
	      QUIC_ERR ("quicly_receive return error %U",
			quic_format_err, rv);
	    }
	  break;
	case QUIC_PACKET_TYPE_ACCEPT:
	  quic_accept_connection (&packets_ctx[i]);
	  break;
	case QUIC_PACKET_TYPE_RESET:
	  quic_reset_connection (udp_session_handle, &packets_ctx[i]);
	  break;
	}
    }
  ctx = prev_ctx = NULL;
  for (i = 0; i < max_packets; i++)
    {
      prev_ctx = ctx;
      switch (packets_ctx[i].ptype)
	{
	case QUIC_PACKET_TYPE_RECEIVE:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  quic_check_quic_session_connected (ctx);
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  break;
	case QUIC_PACKET_TYPE_ACCEPT:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  break;
	default:
	  continue;		/* this exits the for loop since other packet types are
				   necessarily the last in the batch */
	}
      if (ctx != prev_ctx)
	quic_send_packets (ctx);
    }

  udp_session = session_get_from_handle (udp_session_handle);	/*  session alloc might have happened */
  f = udp_session->rx_fifo;
  svm_fifo_dequeue_drop (f, fifo_offset);

  if (svm_fifo_max_dequeue (f))
    goto rx_start;

  return 0;
}

always_inline void
quic_common_get_transport_endpoint (quic_ctx_t * ctx,
				    transport_endpoint_t * tep, u8 is_lcl)
{
  session_t *udp_session;
  if (!quic_ctx_is_stream (ctx))
    {
      udp_session = session_get_from_handle (ctx->udp_session_handle);
      session_get_endpoint (udp_session, tep, is_lcl);
    }
}

static void
quic_get_transport_listener_endpoint (u32 listener_index,
				      transport_endpoint_t * tep, u8 is_lcl)
{
  quic_ctx_t *ctx;
  app_listener_t *app_listener;
  session_t *udp_listen_session;
  ctx = quic_ctx_get (listener_index, vlib_get_thread_index ());
  if (quic_ctx_is_listener (ctx))
    {
      app_listener = app_listener_get_w_handle (ctx->udp_session_handle);
      udp_listen_session = app_listener_get_session (app_listener);
      return session_get_endpoint (udp_listen_session, tep, is_lcl);
    }
  quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}

static void
quic_get_transport_endpoint (u32 ctx_index, u32 thread_index,
			     transport_endpoint_t * tep, u8 is_lcl)
{
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (ctx_index, thread_index);
  quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}

/* *INDENT-OFF* */
static session_cb_vft_t quic_app_cb_vft = {
  .session_accept_callback = quic_udp_session_accepted_callback,
  .session_disconnect_callback = quic_udp_session_disconnect_callback,
  .session_connected_callback = quic_udp_session_connected_callback,
  .session_reset_callback = quic_udp_session_reset_callback,
  .session_migrate_callback = quic_udp_session_migrate_callback,
  .add_segment_callback = quic_add_segment_callback,
  .del_segment_callback = quic_del_segment_callback,
  .builtin_app_rx_callback = quic_udp_session_rx_callback,
  .session_cleanup_callback = quic_udp_session_cleanup_callback,
  .app_cert_key_pair_delete_callback = quic_app_cert_key_pair_delete_callback,
};

static const transport_proto_vft_t quic_proto = {
  .connect = quic_connect,
  .close = quic_proto_on_close,
  .start_listen = quic_start_listen,
  .stop_listen = quic_stop_listen,
  .get_connection = quic_connection_get,
  .get_listener = quic_listener_get,
  .update_time = quic_update_time,
  .app_rx_evt = quic_custom_app_rx_callback,
  .custom_tx = quic_custom_tx_callback,
  .format_connection = format_quic_connection,
  .format_half_open = format_quic_half_open,
  .format_listener = format_quic_listener,
  .get_transport_endpoint = quic_get_transport_endpoint,
  .get_transport_listener_endpoint = quic_get_transport_listener_endpoint,
  .transport_options = {
    .name = "quic",
    .short_name = "Q",
    .tx_type = TRANSPORT_TX_INTERNAL,
    .service_type = TRANSPORT_SERVICE_APP,
  },
};
/* *INDENT-ON* */

static quicly_stream_open_t on_stream_open = { quic_on_stream_open };
static quicly_closed_by_remote_t on_closed_by_remote = {
  quic_on_closed_by_remote
};
static quicly_now_t quicly_vpp_now_cb = { quic_get_time };

static void
quic_register_cipher_suite (crypto_engine_type_t type,
			    ptls_cipher_suite_t ** ciphers)
{
  quic_main_t *qm = &quic_main;
  vec_validate (qm->quic_ciphers, type);
  clib_bitmap_set (qm->available_crypto_engines, type, 1);
  qm->quic_ciphers[type] = ciphers;
}

static void
quic_update_fifo_size ()
{
  quic_main_t *qm = &quic_main;
  segment_manager_props_t *seg_mgr_props =
    application_get_segment_manager_properties (qm->app_index);

  if (!seg_mgr_props)
    {
      clib_warning
	("error while getting segment_manager_props_t, can't update fifo-size");
      return;
    }

  seg_mgr_props->tx_fifo_size = qm->udp_fifo_size;
  seg_mgr_props->rx_fifo_size = qm->udp_fifo_size;
}

static clib_error_t *
quic_init (vlib_main_t * vm)
{
  u32 segment_size = 256 << 20;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  vnet_app_attach_args_t _a, *a = &_a;
  u64 options[APP_OPTIONS_N_OPTIONS];
  quic_main_t *qm = &quic_main;
  u32 num_threads, i;

  num_threads = 1 /* main thread */  + vtm->n_threads;

  clib_memset (a, 0, sizeof (*a));
  clib_memset (options, 0, sizeof (options));

  a->session_cb_vft = &quic_app_cb_vft;
  a->api_client_index = APP_INVALID_INDEX;
  a->options = options;
  a->name = format (0, "quic");
  a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
  a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
  a->options[APP_OPTIONS_RX_FIFO_SIZE] = qm->udp_fifo_size;
  a->options[APP_OPTIONS_TX_FIFO_SIZE] = qm->udp_fifo_size;
  a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = qm->udp_fifo_prealloc;
  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
  a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
  a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_IS_TRANSPORT_APP;

  if (vnet_application_attach (a))
    {
      clib_warning ("failed to attach quic app");
      return clib_error_return (0, "failed to attach quic app");
    }

  vec_validate (qm->ctx_pool, num_threads - 1);
  vec_validate (qm->wrk_ctx, num_threads - 1);

  for (i = 0; i < num_threads; i++)
    {
      qm->wrk_ctx[i].next_cid.thread_id = i;
      tw = &qm->wrk_ctx[i].timer_wheel;
      tw_timer_wheel_init_1t_3w_1024sl_ov (tw, quic_expired_timers_dispatch,
					   1e-3 /* timer period 1ms */ , ~0);
      tw->last_run_time = vlib_time_now (vlib_get_main ());
      clib_bihash_init_24_8 (&qm->wrk_ctx[i].crypto_context_hash,
			     "quic crypto contexts", 64, 128 << 10);
    }

  clib_bihash_init_16_8 (&qm->connection_hash, "quic connections", 1024,
			 4 << 20);

  qm->app_index = a->app_index;
  qm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock
    / QUIC_TSTAMP_RESOLUTION;
  qm->session_cache.super.cb = quic_encrypt_ticket_cb;

  transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto,
			       FIB_PROTOCOL_IP4, ~0);
  transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto,
			       FIB_PROTOCOL_IP6, ~0);

  clib_bitmap_alloc (qm->available_crypto_engines,
		     app_crypto_engine_n_types ());
  quic_register_cipher_suite (CRYPTO_ENGINE_PICOTLS,
			      ptls_openssl_cipher_suites);
  qm->default_crypto_engine = CRYPTO_ENGINE_PICOTLS;

  vnet_crypto_main_t *cm = &crypto_main;
  if (vec_len (cm->engines) == 0)
    qm->vnet_crypto_enabled = 0;
  else
    qm->vnet_crypto_enabled = 1;
  if (qm->vnet_crypto_enabled == 1)
    {
      quic_register_cipher_suite (CRYPTO_ENGINE_VPP,
				  quic_crypto_cipher_suites);
      qm->default_crypto_engine = CRYPTO_ENGINE_VPP;
    }

  qm->max_packets_per_key = DEFAULT_MAX_PACKETS_PER_KEY;
  clib_rwlock_init (&qm->crypto_keys_quic_rw_lock);

  qm->default_quic_cc = QUIC_CC_RENO;

  vec_free (a->name);
  return 0;
}

VLIB_INIT_FUNCTION (quic_init);

static clib_error_t *
quic_plugin_crypto_command_fn (vlib_main_t * vm,
			       unformat_input_t * input,
			       vlib_cli_command_t * cmd)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  quic_main_t *qm = &quic_main;
  clib_error_t *e = 0;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "vpp"))
	qm->default_crypto_engine = CRYPTO_ENGINE_VPP;
      else if (unformat (line_input, "picotls"))
	qm->default_crypto_engine = CRYPTO_ENGINE_PICOTLS;
      else
	{
	  e = clib_error_return (0, "unknown input '%U'",
				 format_unformat_error, line_input);
	  goto done;
	}
    }
done:
  unformat_free (line_input);
  return e;
}

u64 quic_fifosize = 0;
static clib_error_t *
quic_plugin_set_fifo_size_command_fn (vlib_main_t * vm,
				      unformat_input_t * input,
				      vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  unformat_input_t _line_input, *line_input = &_line_input;
  uword tmp;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "%U", unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000ULL)
	    {
	      return clib_error_return
		(0, "fifo-size %llu (0x%llx) too large", tmp, tmp);
	    }
	  qm->udp_fifo_size = tmp;
	  quic_update_fifo_size ();
	}
      else
	return clib_error_return (0, "unknown input '%U'",
				  format_unformat_error, line_input);
    }

  return 0;
}

static inline u64
quic_get_counter_value (u32 event_code)
{
  vlib_node_t *n;
  vlib_main_t *vm;
  vlib_error_main_t *em;

  u32 code, i;
  u64 c, sum = 0;
  int index = 0;

  vm = vlib_get_main ();
  em = &vm->error_main;
  n = vlib_get_node (vm, quic_input_node.index);
  code = event_code;
  foreach_vlib_main ()
    {
      em = &this_vlib_main->error_main;
      i = n->error_heap_index + code;
      c = em->counters[i];

      if (i < vec_len (em->counters_last_clear))
	c -= em->counters_last_clear[i];
      sum += c;
      index++;
    }
  return sum;
}

static void
quic_show_aggregated_stats (vlib_main_t * vm)
{
  u32 num_workers = vlib_num_workers ();
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx = NULL;
  quicly_stats_t st, agg_stats;
  u32 i, nconn = 0, nstream = 0;

  clib_memset (&agg_stats, 0, sizeof (agg_stats));
  for (i = 0; i < num_workers + 1; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (ctx, qm->ctx_pool[i])
       {
	if (quic_ctx_is_conn (ctx) && ctx->conn)
	  {
	    quicly_get_stats (ctx->conn, &st);
	    agg_stats.rtt.smoothed += st.rtt.smoothed;
	    agg_stats.rtt.minimum += st.rtt.minimum;
	    agg_stats.rtt.variance += st.rtt.variance;
	    agg_stats.num_packets.received += st.num_packets.received;
	    agg_stats.num_packets.sent += st.num_packets.sent;
	    agg_stats.num_packets.lost += st.num_packets.lost;
	    agg_stats.num_packets.ack_received += st.num_packets.ack_received;
	    agg_stats.num_bytes.received += st.num_bytes.received;
	    agg_stats.num_bytes.sent += st.num_bytes.sent;
	    nconn++;
	  }
	else if (quic_ctx_is_stream (ctx))
	  nstream++;
      }
      /* *INDENT-ON* */
    }
  vlib_cli_output (vm, "-------- Connections --------");
  vlib_cli_output (vm, "Current:         %u", nconn);
  vlib_cli_output (vm, "Opened:          %d",
		   quic_get_counter_value (QUIC_ERROR_OPENED_CONNECTION));
  vlib_cli_output (vm, "Closed:          %d",
		   quic_get_counter_value (QUIC_ERROR_CLOSED_CONNECTION));
  vlib_cli_output (vm, "---------- Streams ----------");
  vlib_cli_output (vm, "Current:         %u", nstream);
  vlib_cli_output (vm, "Opened:          %d",
		   quic_get_counter_value (QUIC_ERROR_OPENED_STREAM));
  vlib_cli_output (vm, "Closed:          %d",
		   quic_get_counter_value (QUIC_ERROR_CLOSED_STREAM));
  vlib_cli_output (vm, "---------- Packets ----------");
  vlib_cli_output (vm, "RX Total:        %d",
		   quic_get_counter_value (QUIC_ERROR_RX_PACKETS));
  vlib_cli_output (vm, "RX 0RTT:         %d",
		   quic_get_counter_value (QUIC_ERROR_ZERO_RTT_RX_PACKETS));
  vlib_cli_output (vm, "RX 1RTT:         %d",
		   quic_get_counter_value (QUIC_ERROR_ONE_RTT_RX_PACKETS));
  vlib_cli_output (vm, "TX Total:        %d",
		   quic_get_counter_value (QUIC_ERROR_TX_PACKETS));
  vlib_cli_output (vm, "----------- Stats -----------");
  vlib_cli_output (vm, "Min      RTT     %f",
		   nconn > 0 ? agg_stats.rtt.minimum / nconn : 0);
  vlib_cli_output (vm, "Smoothed RTT     %f",
		   nconn > 0 ? agg_stats.rtt.smoothed / nconn : 0);
  vlib_cli_output (vm, "Variance on RTT  %f",
		   nconn > 0 ? agg_stats.rtt.variance / nconn : 0);
  vlib_cli_output (vm, "Packets Received %lu",
		   agg_stats.num_packets.received);
  vlib_cli_output (vm, "Packets Sent     %lu", agg_stats.num_packets.sent);
  vlib_cli_output (vm, "Packets Lost     %lu", agg_stats.num_packets.lost);
  vlib_cli_output (vm, "Packets Acks     %lu",
		   agg_stats.num_packets.ack_received);
  vlib_cli_output (vm, "RX bytes         %lu", agg_stats.num_bytes.received);
  vlib_cli_output (vm, "TX bytes         %lu", agg_stats.num_bytes.sent);
}

static u8 *
quic_format_quicly_conn_id (u8 * s, va_list * args)
{
  quicly_cid_plaintext_t *mid = va_arg (*args, quicly_cid_plaintext_t *);
  s = format (s, "C%x_%x", mid->master_id, mid->thread_id);
  return s;
}

static u8 *
quic_format_quicly_stream_id (u8 * s, va_list * args)
{
  quicly_stream_t *stream = va_arg (*args, quicly_stream_t *);
  s =
    format (s, "%U S%lx", quic_format_quicly_conn_id,
	    quicly_get_master_id (stream->conn), stream->stream_id);
  return s;
}

static u8 *
quic_format_listener_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  s = format (s, "[#%d][%x][Listener]", ctx->c_thread_index, ctx->c_c_index);
  return s;
}

static u8 *
quic_format_connection_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  quicly_stats_t quicly_stats;

  s = format (s, "[#%d][%x]", ctx->c_thread_index, ctx->c_c_index);

  if (!ctx->conn)
    {
      s = format (s, "- no conn -\n");
      return s;
    }
  s = format (s, "[%U]",
	      quic_format_quicly_conn_id, quicly_get_master_id (ctx->conn));
  quicly_get_stats (ctx->conn, &quicly_stats);

  s = format (s, "[RTT >%3d, ~%3d, V%3d, last %3d]",
	      quicly_stats.rtt.minimum, quicly_stats.rtt.smoothed,
	      quicly_stats.rtt.variance, quicly_stats.rtt.latest);
  s = format (s, " TX:%d RX:%d loss:%d ack:%d",
	      quicly_stats.num_packets.sent,
	      quicly_stats.num_packets.received,
	      quicly_stats.num_packets.lost,
	      quicly_stats.num_packets.ack_received);
  s =
    format (s, "\ncwnd:%u ssthresh:%u recovery_end:%lu", quicly_stats.cc.cwnd,
	    quicly_stats.cc.ssthresh, quicly_stats.cc.recovery_end);

  quicly_context_t *quicly_ctx = quic_get_quicly_ctx_from_ctx (ctx);
  if (quicly_ctx->init_cc == &quicly_cc_cubic_init)
    {
      s = format (
	s,
	"\nk:%d w_max:%u w_last_max:%u avoidance_start:%ld last_sent_time:%ld",
	quicly_stats.cc.state.cubic.k, quicly_stats.cc.state.cubic.w_max,
	quicly_stats.cc.state.cubic.w_last_max,
	quicly_stats.cc.state.cubic.avoidance_start,
	quicly_stats.cc.state.cubic.last_sent_time);
    }
  else if (quicly_ctx->init_cc == &quicly_cc_reno_init)
    {
      s = format (s, " stash:%u", quicly_stats.cc.state.reno.stash);
    }

  return s;
}

static u8 *
quic_format_stream_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  session_t *stream_session;
  quicly_stream_t *stream = ctx->stream;
  u32 txs, rxs;

  s = format (s, "[#%d][%x]", ctx->c_thread_index, ctx->c_c_index);
  s = format (s, "[%U]", quic_format_quicly_stream_id, stream);

  stream_session = session_get_if_valid (ctx->c_s_index, ctx->c_thread_index);
  if (!stream_session)
    {
      s = format (s, "- no session -\n");
      return s;
    }
  txs = svm_fifo_max_dequeue (stream_session->tx_fifo);
  rxs = svm_fifo_max_dequeue (stream_session->rx_fifo);
  s = format (s, "[rx %d tx %d]\n", rxs, txs);
  return s;
}

static clib_error_t *
quic_show_connections_command_fn (vlib_main_t * vm,
				  unformat_input_t * input,
				  vlib_cli_command_t * cmd)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  u8 show_listeners = 0, show_conn = 0, show_stream = 0;
  u32 num_workers = vlib_num_workers ();
  quic_main_t *qm = &quic_main;
  clib_error_t *error = 0;
  quic_ctx_t *ctx = NULL;

  session_cli_return_if_not_enabled ();

  if (!unformat_user (input, unformat_line_input, line_input))
    {
      quic_show_aggregated_stats (vm);
      return 0;
    }

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "listener"))
	show_listeners = 1;
      else if (unformat (line_input, "conn"))
	show_conn = 1;
      else if (unformat (line_input, "stream"))
	show_stream = 1;
      else
	{
	  error = clib_error_return (0, "unknown input `%U'",
				     format_unformat_error, line_input);
	  goto done;
	}
    }

  for (int i = 0; i < num_workers + 1; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (ctx, qm->ctx_pool[i])
       {
        if (quic_ctx_is_stream (ctx) && show_stream)
          vlib_cli_output (vm, "%U", quic_format_stream_ctx, ctx);
        else if (quic_ctx_is_listener (ctx) && show_listeners)
          vlib_cli_output (vm, "%U", quic_format_listener_ctx, ctx);
	else if (quic_ctx_is_conn (ctx) && show_conn)
          vlib_cli_output (vm, "%U", quic_format_connection_ctx, ctx);
      }
      /* *INDENT-ON* */
    }

done:
  unformat_free (line_input);
  return error;
}

/* *INDENT-OFF* */
VLIB_CLI_COMMAND (quic_plugin_crypto_command, static) = {
  .path = "quic set crypto api",
  .short_help = "quic set crypto api [picotls|vpp]",
  .function = quic_plugin_crypto_command_fn,
};
VLIB_CLI_COMMAND(quic_plugin_set_fifo_size_command, static)=
{
  .path = "quic set fifo-size",
  .short_help = "quic set fifo-size N[K|M|G] (default 64K)",
  .function = quic_plugin_set_fifo_size_command_fn,
};
VLIB_CLI_COMMAND(quic_show_ctx_command, static)=
{
  .path = "show quic",
  .short_help = "show quic",
  .function = quic_show_connections_command_fn,
};
VLIB_CLI_COMMAND (quic_list_crypto_context_command, static) =
{
  .path = "show quic crypto context",
  .short_help = "list quic crypto contextes",
  .function = quic_list_crypto_context_command_fn,
};
VLIB_CLI_COMMAND (quic_set_max_packets_per_key, static) =
{
  .path = "set quic max_packets_per_key",
  .short_help = "set quic max_packets_per_key 16777216",
  .function = quic_set_max_packets_per_key_fn,
};
VLIB_CLI_COMMAND (quic_set_cc, static) = {
  .path = "set quic cc",
  .short_help = "set quic cc [reno|cubic]",
  .function = quic_set_cc_fn,
};
VLIB_PLUGIN_REGISTER () =
{
  .version = VPP_BUILD_VER,
  .description = "Quic transport protocol",
  .default_disabled = 1,
};
/* *INDENT-ON* */

static clib_error_t *
quic_config_fn (vlib_main_t * vm, unformat_input_t * input)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  quic_main_t *qm = &quic_main;
  clib_error_t *error = 0;
  uword tmp;
  u32 i;

  qm->udp_fifo_size = QUIC_DEFAULT_FIFO_SIZE;
  qm->udp_fifo_prealloc = 0;
  qm->connection_timeout = QUIC_DEFAULT_CONN_TIMEOUT;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "fifo-size %U", unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000ULL)
	    {
	      error = clib_error_return (0,
					 "fifo-size %llu (0x%llx) too large",
					 tmp, tmp);
	      goto done;
	    }
	  qm->udp_fifo_size = tmp;
	}
      else if (unformat (input, "conn-timeout %u", &i))
	qm->connection_timeout = i;
      else if (unformat (input, "fifo-prealloc %u", &i))
	qm->udp_fifo_prealloc = i;
      else
	{
	  error = clib_error_return (0, "unknown input '%U'",
				     format_unformat_error, line_input);
	  goto done;
	}
    }
done:
  unformat_free (line_input);
  return error;
}

VLIB_EARLY_CONFIG_FUNCTION (quic_config_fn, "quic");

static uword
quic_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
	      vlib_frame_t * frame)
{
  return 0;
}

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (quic_input_node) =
{
  .function = quic_node_fn,
  .name = "quic-input",
  .vector_size = sizeof (u32),
  .type = VLIB_NODE_TYPE_INTERNAL,
  .n_errors = ARRAY_LEN (quic_error_strings),
  .error_strings = quic_error_strings,
};
/* *INDENT-ON* */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */