summaryrefslogtreecommitdiffstats
path: root/src/plugins/arping
diff options
context:
space:
mode:
authorSteven Luong <sluong@cisco.com>2021-02-14 11:37:02 -0800
committerBeno�t Ganne <bganne@cisco.com>2021-04-02 08:44:52 +0000
commita77ae4708906b2a7894f7ac694bf55d5f0558d5f (patch)
treec62c174032c9605250ecfe44ce857cfae8943356 /src/plugins/arping
parent19be32876f0587067e72c92b4d1faf5d4c26b3cd (diff)
arping: add arping command
Add linux similar arping command to VPP. syntax: arping [gratuitous] <address> <interface> [repeat <count>] [interval <secs>] Type: feature Signed-off-by: Steven Luong <sluong@cisco.com> Change-Id: I9267c054235207b8fae8e3f159246777eb0340dd
Diffstat (limited to 'src/plugins/arping')
-rw-r--r--src/plugins/arping/CMakeLists.txt24
-rw-r--r--src/plugins/arping/FEATURE.yaml9
-rw-r--r--src/plugins/arping/arping.api61
-rw-r--r--src/plugins/arping/arping.c797
-rw-r--r--src/plugins/arping/arping.h74
-rw-r--r--src/plugins/arping/arping_api.c87
-rw-r--r--src/plugins/arping/arping_test.c167
-rw-r--r--src/plugins/arping/test/test_arping.py251
8 files changed, 1470 insertions, 0 deletions
diff --git a/src/plugins/arping/CMakeLists.txt b/src/plugins/arping/CMakeLists.txt
new file mode 100644
index 00000000000..afec21e7d8e
--- /dev/null
+++ b/src/plugins/arping/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_vpp_plugin(arping
+ SOURCES
+ arping.c
+ arping_api.c
+
+ API_FILES
+ arping.api
+
+ API_TEST_SOURCES
+ arping_test.c
+)
diff --git a/src/plugins/arping/FEATURE.yaml b/src/plugins/arping/FEATURE.yaml
new file mode 100644
index 00000000000..c947b17c6a4
--- /dev/null
+++ b/src/plugins/arping/FEATURE.yaml
@@ -0,0 +1,9 @@
+---
+name: arping command
+maintainer: Steven Luong <sluong@cisco.com>
+features:
+ - arping command to send either gratuitous or ARP request to the remote
+ - support both IPv4 and IPv6
+description: "arping command"
+state: production
+properties: [API, CLI, STATS, MULTITHREAD]
diff --git a/src/plugins/arping/arping.api b/src/plugins/arping/arping.api
new file mode 100644
index 00000000000..f797b8cf3aa
--- /dev/null
+++ b/src/plugins/arping/arping.api
@@ -0,0 +1,61 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+option version = "1.0.0";
+import "vnet/interface_types.api";
+import "vnet/ip/ip_types.api";
+
+/** \brief
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param address - address to send arp request or gratuitous arp.
+ @param sw_if_index - interface to send
+ @param repeat - number of packets to send
+ @param interval - if more than 1 packet is sent, the delay between send
+ @param is_garp - is garp or arp request
+*/
+
+define arping
+{
+ u32 client_index;
+ u32 context;
+ vl_api_address_t address;
+ vl_api_interface_index_t sw_if_index;
+ bool is_garp;
+ u32 repeat [default=1];
+ f64 interval [default=1.0];
+ option vat_help = "<address> <interface> [gratuitouss] [repeat <count>] [interval <sec>]";
+};
+
+/** \brief
+ @param context - sender context, to match reply w/ request
+ @param retval - return value for request
+ @reply_count - return value for reply count
+*/
+
+define arping_reply
+{
+ u32 context;
+ i32 retval;
+ u32 reply_count;
+};
+
+/*
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/arping/arping.c b/src/plugins/arping/arping.c
new file mode 100644
index 00000000000..6c5836b9b9b
--- /dev/null
+++ b/src/plugins/arping/arping.c
@@ -0,0 +1,797 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stddef.h>
+
+#include <vlib/vlib.h>
+#include <vlib/unix/unix.h>
+#include <vnet/plugin/plugin.h>
+#include <vpp/app/version.h>
+#include <vnet/ip-neighbor/ip4_neighbor.h>
+#include <vnet/ip-neighbor/ip6_neighbor.h>
+#include <arping/arping.h>
+
+arping_main_t arping_main;
+
+#define foreach_arping_error _ (NONE, "no error")
+
+typedef enum
+{
+#define _(f, s) ARPING_ERROR_##f,
+ foreach_arping_error
+#undef _
+ ARPING_N_ERROR,
+} arping__error_t;
+
+static char *arping_error_strings[] = {
+#define _(n, s) s,
+ foreach_arping_error
+#undef _
+};
+
+#define foreach_arping \
+ _ (DROP, "error-drop") \
+ _ (IO, "interface-output")
+
+typedef enum
+{
+#define _(sym, str) ARPING_NEXT_##sym,
+ foreach_arping
+#undef _
+ ARPING_N_NEXT,
+} arping_next_t;
+
+typedef struct arping_trace_t_
+{
+ u32 sw_if_index;
+ u16 arp_opcode;
+ ethernet_arp_ip4_over_ethernet_address_t reply;
+} arping_trace_t;
+
+typedef enum
+{
+#define _(sym, str) ARPING6_NEXT_##sym,
+ foreach_arping
+#undef _
+ ARPING6_N_NEXT,
+} arping6_next_t;
+
+typedef CLIB_PACKED (struct {
+ mac_address_t mac;
+ ip6_address_t ip6;
+}) ethernet_arp_ip6_over_ethernet_address_t;
+
+typedef struct arping6_trace_t_
+{
+ u32 sw_if_index;
+ u8 type;
+ ethernet_arp_ip6_over_ethernet_address_t reply;
+} arping6_trace_t;
+
+/* packet trace format function */
+static u8 *
+format_arping_trace (u8 *s, va_list *args)
+{
+ CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+ CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+ arping_trace_t *t = va_arg (*args, arping_trace_t *);
+
+ s = format (s, "sw-if-index: %u, opcode: %U, from %U (%U)", t->sw_if_index,
+ format_ethernet_arp_opcode, t->arp_opcode, format_mac_address,
+ &t->reply.mac, format_ip4_address, &t->reply.ip4);
+
+ return s;
+}
+
+static u8 *
+format_arping6_trace (u8 *s, va_list *args)
+{
+ CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+ CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+ arping6_trace_t *t = va_arg (*args, arping6_trace_t *);
+
+ s = format (s, "sw-if-index: %u, type: %u, from %U (%U)", t->sw_if_index,
+ t->type, format_mac_address, &t->reply.mac, format_ip6_address,
+ &t->reply.ip6);
+
+ return s;
+}
+
+VLIB_NODE_FN (arping_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 n_left_from, *from, *to_next, n_left_to_next;
+ arping_next_t next_index;
+ arping_main_t *am = &arping_main;
+
+ next_index = node->cached_next_index;
+ n_left_from = frame->n_vectors;
+ from = vlib_frame_vector_args (frame);
+
+ while (n_left_from > 0)
+ {
+ vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
+
+ while (n_left_from >= 2 && n_left_to_next >= 2)
+ {
+ u32 next0, next1, bi0, bi1;
+ vlib_buffer_t *b0, *b1;
+ ethernet_arp_header_t *arp0, *arp1;
+ u32 sw_if_index0, sw_if_index1;
+ arping_intf_t *aif0, *aif1;
+
+ bi0 = to_next[0] = from[0];
+ bi1 = to_next[1] = from[1];
+
+ from += 2;
+ n_left_from -= 2;
+ to_next += 2;
+ n_left_to_next -= 2;
+
+ next0 = next1 = ARPING_NEXT_DROP;
+
+ b0 = vlib_get_buffer (vm, bi0);
+ b1 = vlib_get_buffer (vm, bi1);
+
+ arp0 = vlib_buffer_get_current (b0);
+ arp1 = vlib_buffer_get_current (b1);
+
+ vnet_feature_next (&next0, b0);
+ vnet_feature_next (&next1, b1);
+
+ sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ sw_if_index1 = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+
+ if (PREDICT_TRUE (arp0->opcode ==
+ clib_host_to_net_u16 (ETHERNET_ARP_OPCODE_reply)))
+ {
+ aif0 = am->interfaces[sw_if_index0];
+ if (PREDICT_TRUE (aif0->address.ip.ip4.as_u32 ==
+ arp0->ip4_over_ethernet[0].ip4.as_u32))
+ {
+ aif0->recv.from4.ip4.as_u32 =
+ arp0->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&aif0->recv.from4.mac,
+ &arp0->ip4_over_ethernet[0].mac, 6);
+ aif0->reply_count++;
+ }
+ }
+ if (PREDICT_TRUE (arp1->opcode ==
+ clib_host_to_net_u16 (ETHERNET_ARP_OPCODE_reply)))
+ {
+ aif1 = am->interfaces[sw_if_index1];
+ if (PREDICT_TRUE (aif1->address.ip.ip4.as_u32 ==
+ arp1->ip4_over_ethernet[0].ip4.as_u32))
+ {
+ aif1->recv.from4.ip4.as_u32 =
+ arp1->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&aif1->recv.from4.mac,
+ &arp0->ip4_over_ethernet[0].mac, 6);
+ aif1->reply_count++;
+ }
+ }
+
+ if (PREDICT_FALSE ((b0->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ t->arp_opcode = clib_host_to_net_u16 (arp0->opcode);
+ t->reply.ip4.as_u32 = arp0->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&t->reply.mac, &arp0->ip4_over_ethernet[0].mac,
+ 6);
+ }
+ if (PREDICT_FALSE ((b1->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping_trace_t *t = vlib_add_trace (vm, node, b1, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+ t->arp_opcode = clib_host_to_net_u16 (arp1->opcode);
+ t->reply.ip4.as_u32 = arp1->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&t->reply.mac, &arp1->ip4_over_ethernet[0].mac,
+ 6);
+ }
+
+ vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next,
+ n_left_to_next, bi0, bi1, next0,
+ next1);
+ }
+
+ while (n_left_from > 0 && n_left_to_next > 0)
+ {
+ u32 next0, bi0;
+ vlib_buffer_t *b0;
+ ethernet_arp_header_t *arp0;
+ arping_intf_t *aif0;
+ u32 sw_if_index0;
+
+ bi0 = to_next[0] = from[0];
+
+ from += 1;
+ n_left_from -= 1;
+ to_next += 1;
+ n_left_to_next -= 1;
+ next0 = ARPING_NEXT_DROP;
+
+ b0 = vlib_get_buffer (vm, bi0);
+ arp0 = vlib_buffer_get_current (b0);
+
+ vnet_feature_next (&next0, b0);
+
+ sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+
+ if (PREDICT_TRUE (arp0->opcode ==
+ clib_host_to_net_u16 (ETHERNET_ARP_OPCODE_reply)))
+ {
+ aif0 = am->interfaces[sw_if_index0];
+ if (PREDICT_TRUE (aif0->address.ip.ip4.as_u32 ==
+ arp0->ip4_over_ethernet[0].ip4.as_u32))
+ {
+ aif0->recv.from4.ip4.as_u32 =
+ arp0->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&aif0->recv.from4.mac,
+ &arp0->ip4_over_ethernet[0].mac, 6);
+ aif0->reply_count++;
+ }
+ }
+
+ if (PREDICT_FALSE ((b0->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ t->arp_opcode = clib_host_to_net_u16 (arp0->opcode);
+ t->reply.ip4.as_u32 = arp0->ip4_over_ethernet[0].ip4.as_u32;
+ clib_memcpy_fast (&t->reply.mac, &arp0->ip4_over_ethernet[0].mac,
+ 6);
+ }
+
+ vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+ n_left_to_next, bi0, next0);
+ }
+
+ vlib_put_next_frame (vm, node, next_index, n_left_to_next);
+ }
+
+ return frame->n_vectors;
+}
+
+VLIB_REGISTER_NODE (arping_input_node) =
+{
+ .name = "arping-input",.vector_size = sizeof (u32),.format_trace =
+ format_arping_trace,.type = VLIB_NODE_TYPE_INTERNAL,.n_errors =
+ ARPING_N_ERROR,.error_strings = arping_error_strings,.n_next_nodes =
+ ARPING_N_NEXT,.next_nodes =
+ {
+ [ARPING_NEXT_DROP] = "error-drop",[ARPING_NEXT_IO] = "interface-output",}
+,};
+
+VNET_FEATURE_INIT (arping_feat_node, static) = {
+ .arc_name = "arp",
+ .node_name = "arping-input",
+ .runs_before = VNET_FEATURES ("arp-reply"),
+};
+
+VLIB_NODE_FN (arping6_input_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+ u32 n_left_from, *from, *to_next, n_left_to_next;
+ arping_next_t next_index;
+ arping_main_t *am = &arping_main;
+
+ next_index = node->cached_next_index;
+ n_left_from = frame->n_vectors;
+ from = vlib_frame_vector_args (frame);
+
+ while (n_left_from > 0)
+ {
+ vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
+
+ while (n_left_from >= 2 && n_left_to_next >= 2)
+ {
+ u32 next0, next1, bi0, bi1;
+ vlib_buffer_t *b0, *b1;
+ ip6_header_t *ip60, *ip61;
+ u32 sw_if_index0, sw_if_index1;
+ arping_intf_t *aif0, *aif1;
+ icmp6_neighbor_solicitation_or_advertisement_header_t *sol_adv0,
+ *sol_adv1;
+ icmp6_neighbor_discovery_ethernet_link_layer_address_option_t
+ *lladdr0,
+ *lladdr1;
+
+ bi0 = to_next[0] = from[0];
+ bi1 = to_next[1] = from[1];
+
+ from += 2;
+ n_left_from -= 2;
+ to_next += 2;
+ n_left_to_next -= 2;
+
+ next0 = next1 = ARPING6_NEXT_DROP;
+
+ b0 = vlib_get_buffer (vm, bi0);
+ b1 = vlib_get_buffer (vm, bi1);
+
+ ip60 = vlib_buffer_get_current (b0);
+ ip61 = vlib_buffer_get_current (b1);
+
+ vnet_feature_next (&next0, b0);
+ vnet_feature_next (&next1, b1);
+
+ sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ sw_if_index1 = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+
+ sol_adv0 = ip6_next_header (ip60);
+ lladdr0 =
+ (icmp6_neighbor_discovery_ethernet_link_layer_address_option_t
+ *) (sol_adv0 + 1);
+
+ if (PREDICT_TRUE (sol_adv0->icmp.type ==
+ ICMP6_neighbor_advertisement))
+ {
+ aif0 = am->interfaces[sw_if_index0];
+ if (PREDICT_TRUE (clib_memcmp (&aif0->address.ip.ip6,
+ &sol_adv0->target_address,
+ sizeof (aif0->address.ip.ip6)) ==
+ 0))
+ {
+ clib_memcpy_fast (&aif0->recv.from6.ip6,
+ &sol_adv0->target_address,
+ sizeof (aif0->recv.from6.ip6));
+ clib_memcpy_fast (&aif0->recv.from6.mac,
+ lladdr0->ethernet_address, 6);
+ aif0->reply_count++;
+ }
+ }
+
+ sol_adv1 = ip6_next_header (ip61);
+ lladdr1 =
+ (icmp6_neighbor_discovery_ethernet_link_layer_address_option_t
+ *) (sol_adv1 + 1);
+
+ if (PREDICT_TRUE (sol_adv1->icmp.type ==
+ ICMP6_neighbor_advertisement))
+ {
+ aif1 = am->interfaces[sw_if_index1];
+ if (PREDICT_TRUE (clib_memcmp (&aif1->address.ip.ip6,
+ &sol_adv1->target_address,
+ sizeof (aif1->address.ip.ip6)) ==
+ 0))
+ {
+ clib_memcpy_fast (&aif1->recv.from6.ip6,
+ &sol_adv1->target_address,
+ sizeof (aif1->recv.from6.ip6));
+ clib_memcpy_fast (&aif1->recv.from6.mac,
+ lladdr1->ethernet_address, 6);
+ aif1->reply_count++;
+ }
+ }
+
+ if (PREDICT_FALSE ((b0->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping6_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ t->type = sol_adv0->icmp.type;
+ clib_memcpy_fast (&t->reply.ip6, &sol_adv0->target_address,
+ sizeof (t->reply.ip6));
+ clib_memcpy_fast (&t->reply.mac, lladdr0->ethernet_address, 6);
+ }
+ if (PREDICT_FALSE ((b1->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping6_trace_t *t = vlib_add_trace (vm, node, b1, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+ t->type = sol_adv1->icmp.type;
+ clib_memcpy_fast (&t->reply.ip6, &sol_adv1->target_address,
+ sizeof (t->reply.ip6));
+ clib_memcpy_fast (&t->reply.mac, lladdr1->ethernet_address, 6);
+ }
+
+ vlib_validate_buffer_enqueue_x2 (vm, node, next_index, to_next,
+ n_left_to_next, bi0, bi1, next0,
+ next1);
+ }
+
+ while (n_left_from > 0 && n_left_to_next > 0)
+ {
+ u32 next0, bi0;
+ vlib_buffer_t *b0;
+ arping_intf_t *aif0;
+ u32 sw_if_index0;
+ ip6_header_t *ip60;
+ icmp6_neighbor_solicitation_or_advertisement_header_t *sol_adv0;
+ icmp6_neighbor_discovery_ethernet_link_layer_address_option_t
+ *lladdr0;
+
+ bi0 = to_next[0] = from[0];
+
+ from += 1;
+ n_left_from -= 1;
+ to_next += 1;
+ n_left_to_next -= 1;
+ next0 = ARPING_NEXT_DROP;
+
+ b0 = vlib_get_buffer (vm, bi0);
+ ip60 = vlib_buffer_get_current (b0);
+
+ vnet_feature_next (&next0, b0);
+
+ sw_if_index0 = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+
+ sol_adv0 = ip6_next_header (ip60);
+ lladdr0 =
+ (icmp6_neighbor_discovery_ethernet_link_layer_address_option_t
+ *) (sol_adv0 + 1);
+ if (PREDICT_TRUE (sol_adv0->icmp.type ==
+ ICMP6_neighbor_advertisement))
+ {
+ aif0 = am->interfaces[sw_if_index0];
+ if (PREDICT_TRUE (clib_memcmp (&aif0->address.ip.ip6,
+ &sol_adv0->target_address,
+ sizeof (aif0->address.ip.ip6)) ==
+ 0))
+ {
+ clib_memcpy_fast (&aif0->recv.from6.ip6,
+ &sol_adv0->target_address,
+ sizeof (aif0->recv.from6.ip6));
+ clib_memcpy_fast (&aif0->recv.from6.mac,
+ lladdr0->ethernet_address, 6);
+ aif0->reply_count++;
+ }
+ }
+
+ if (PREDICT_FALSE ((b0->flags & VLIB_BUFFER_IS_TRACED)))
+ {
+ arping6_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+ t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+ t->type = sol_adv0->icmp.type;
+ clib_memcpy_fast (&t->reply.ip6, &sol_adv0->target_address,
+ sizeof (t->reply.ip6));
+ clib_memcpy_fast (&t->reply.mac, lladdr0->ethernet_address, 6);
+ }
+
+ vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+ n_left_to_next, bi0, next0);
+ }
+
+ vlib_put_next_frame (vm, node, next_index, n_left_to_next);
+ }
+
+ return frame->n_vectors;
+}
+
+VLIB_REGISTER_NODE (arping6_input_node) =
+{
+ .name = "arping6-input",.vector_size = sizeof (u32),.format_trace =
+ format_arping6_trace,.type = VLIB_NODE_TYPE_INTERNAL,.n_errors =
+ ARPING_N_ERROR,.error_strings = arping_error_strings,.n_next_nodes =
+ ARPING_N_NEXT,.next_nodes =
+ {
+ [ARPING6_NEXT_DROP] = "error-drop",[ARPING6_NEXT_IO] = "interface-output",}
+,};
+
+VNET_FEATURE_INIT (arping6_feat_node, static) = {
+ .arc_name = "ip6-local",
+ .node_name = "arping6-input",
+ .runs_before = VNET_FEATURES ("ip6-local-end-of-arc"),
+};
+
+static clib_error_t *
+arping_neighbor_advertisement (vlib_main_t *vm, arping_args_t *args)
+{
+ vnet_main_t *vnm = vnet_get_main ();
+ u32 send_count = 0;
+
+ while (args->repeat > 0)
+ {
+ send_count++;
+ if (args->address.version == AF_IP4)
+ {
+ if (args->silence == 0)
+ vlib_cli_output (vm, "Sending %u GARP to %U", send_count,
+ format_ip4_address, &args->address.ip.ip4);
+ ip4_neighbor_advertise (vm, vnm, args->sw_if_index,
+ &args->address.ip.ip4);
+ }
+ else
+ {
+ if (args->silence == 0)
+ vlib_cli_output (vm, "Sending %u Neighbor Advertisement to %U",
+ send_count, format_ip6_address,
+ &args->address.ip.ip6);
+ ip6_neighbor_advertise (vm, vnm, args->sw_if_index,
+ &args->address.ip.ip6);
+ }
+ args->repeat--;
+ if ((args->interval > 0.0) && (args->repeat > 0))
+ vlib_process_suspend (vm, args->interval);
+ }
+
+ return 0;
+}
+
+static void
+arping_vnet_feature_enable_disable (vlib_main_t *vm, const char *arc_name,
+ const char *node_name, u32 sw_if_index,
+ int enable_disable, void *feature_config,
+ u32 n_feature_config_bytes)
+{
+ vlib_worker_thread_barrier_sync (vm);
+ vnet_feature_enable_disable (arc_name, node_name, sw_if_index,
+ enable_disable, feature_config,
+ n_feature_config_bytes);
+ vlib_worker_thread_barrier_release (vm);
+}
+
+static void
+arping_vec_validate (vlib_main_t *vm, u32 sw_if_index)
+{
+ arping_main_t *am = &arping_main;
+
+ if (sw_if_index >= vec_len (am->interfaces))
+ {
+ vlib_worker_thread_barrier_sync (vm);
+ vec_validate (am->interfaces, sw_if_index);
+ vlib_worker_thread_barrier_release (vm);
+ }
+}
+
+static clib_error_t *
+arping_neighbor_probe_dst (vlib_main_t *vm, arping_args_t *args)
+{
+ arping_main_t *am = &arping_main;
+ u32 send_count = 0;
+ clib_error_t *error;
+ arping_intf_t aif;
+
+ /* Disallow multiple sends on the same interface for now. Who needs it? */
+ if (am->interfaces && (am->interfaces[args->sw_if_index] != 0))
+ {
+ error = clib_error_return (
+ 0, "arping command is in progress for the same interface. "
+ "Please try again later.");
+ args->rv = VNET_API_ERROR_INVALID_VALUE;
+ return error;
+ }
+
+ arping_vec_validate (vm, args->sw_if_index);
+ clib_memset (&aif, 0, sizeof (aif));
+ aif.interval = args->interval;
+ aif.repeat = args->repeat;
+ aif.reply_count = 0;
+ am->interfaces[args->sw_if_index] = &aif;
+
+ clib_memcpy (&aif.address, &args->address, sizeof (aif.address));
+ if (args->address.version == AF_IP4)
+ arping_vnet_feature_enable_disable (vm, "arp", "arping-input",
+ args->sw_if_index, 1, 0, 0);
+ else
+ arping_vnet_feature_enable_disable (vm, "ip6-local", "arping6-input",
+ args->sw_if_index, 1, 0, 0);
+
+ while (args->repeat > 0)
+ {
+ send_count++;
+ if (args->address.version == AF_IP4)
+ {
+ if (args->silence == 0)
+ vlib_cli_output (vm, "Sending %u ARP Request to %U", send_count,
+ format_ip4_address, &args->address.ip.ip4);
+ ip4_neighbor_probe_dst (args->sw_if_index, &args->address.ip.ip4);
+ }
+ else
+ {
+ if (args->silence == 0)
+ vlib_cli_output (vm, "Sending %u Neighbor Solicitation to %U",
+ send_count, format_ip6_address,
+ &args->address.ip.ip6);
+ ip6_neighbor_probe_dst (args->sw_if_index, &args->address.ip.ip6);
+ }
+ args->repeat--;
+ if ((args->interval > 0.0) && (args->repeat > 0))
+ vlib_process_suspend (vm, args->interval);
+ }
+
+ /* wait for a second on the reply */
+ u32 wait_count = 0;
+ while ((aif.reply_count < send_count) && (wait_count < 10))
+ {
+ vlib_process_suspend (vm, 0.1);
+ wait_count++;
+ }
+
+ if (args->address.version == AF_IP4)
+ {
+ clib_memcpy (&args->recv.from4, &aif.recv.from4,
+ sizeof (args->recv.from4));
+ arping_vnet_feature_enable_disable (vm, "arp", "arping-input",
+ args->sw_if_index, 0, 0, 0);
+ }
+ else
+ {
+ clib_memcpy (&args->recv.from6, &aif.recv.from6,
+ sizeof (args->recv.from6));
+ arping_vnet_feature_enable_disable (vm, "ip6-local", "arping6-input",
+ args->sw_if_index, 0, 0, 0);
+ }
+ args->reply_count = aif.reply_count;
+
+ am->interfaces[args->sw_if_index] = 0;
+
+ return 0;
+}
+
+void
+arping_run_command (vlib_main_t *vm, arping_args_t *args)
+{
+ if (args->is_garp)
+ args->error = arping_neighbor_advertisement (vm, args);
+ else
+ args->error = arping_neighbor_probe_dst (vm, args);
+}
+
+static clib_error_t *
+arping_ip_address (vlib_main_t *vm, unformat_input_t *input,
+ vlib_cli_command_t *cmd)
+{
+ clib_error_t *error = 0;
+ vnet_main_t *vnm = vnet_get_main ();
+ arping_args_t args = { 0 };
+ f64 interval = ARPING_DEFAULT_INTERVAL;
+
+ args.repeat = ARPING_DEFAULT_REPEAT;
+ args.interval = ARPING_DEFAULT_INTERVAL;
+ args.sw_if_index = ~0;
+ args.silence = 0;
+
+ if (unformat (input, "gratuitous"))
+ args.is_garp = 1;
+
+ if (unformat (input, "%U", unformat_ip4_address, &args.address.ip.ip4))
+ args.address.version = AF_IP4;
+ else if (unformat (input, "%U", unformat_ip6_address, &args.address.ip.ip6))
+ args.address.version = AF_IP6;
+ else
+ {
+ error = clib_error_return (
+ 0,
+ "expecting IP4/IP6 address `%U'. Usage: arping [gratuitous] <addr> "
+ "<intf> [repeat <count>] [interval <secs>]",
+ format_unformat_error, input);
+ goto done;
+ }
+
+ if (!unformat_user (input, unformat_vnet_sw_interface, vnm,
+ &args.sw_if_index))
+ {
+ error = clib_error_return (0, "unknown interface `%U'",
+ format_unformat_error, input);
+ goto done;
+ }
+
+ /* parse the rest of the parameters in a cycle */
+ while (!unformat_eof (input, NULL))
+ {
+ if (unformat (input, "interval"))
+ {
+ if (!unformat (input, "%f", &interval))
+ {
+ error = clib_error_return (
+ 0, "expecting interval (floating point number) got `%U'",
+ format_unformat_error, input);
+ goto done;
+ }
+ args.interval = interval;
+ }
+ else if (unformat (input, "repeat"))
+ {
+ if (!unformat (input, "%u", &args.repeat))
+ {
+ error =
+ clib_error_return (0, "expecting repeat count but got `%U'",
+ format_unformat_error, input);
+ goto done;
+ }
+ }
+ else
+ {
+ error = clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, input);
+ goto done;
+ }
+ }
+
+ arping_run_command (vm, &args);
+
+ if (args.reply_count)
+ {
+ if (args.address.version == AF_IP4)
+ vlib_cli_output (vm, "Received %u ARP Replies from %U (%U)",
+ args.reply_count, format_mac_address,
+ &args.recv.from4.mac, format_ip4_address,
+ &args.recv.from4.ip4);
+ else
+ vlib_cli_output (
+ vm, "Received %u ICMP6 neighbor advertisements from %U (%U)",
+ args.reply_count, format_mac_address, &args.recv.from6.mac,
+ format_ip6_address, &args.recv.from6.ip6);
+ }
+ else if (args.is_garp == 0)
+ vlib_cli_output (vm, "Received 0 Reply");
+
+ error = args.error;
+done:
+ return error;
+}
+// clang-format off
+/*?
+ * This command sends an ARP REQUEST or gratuitous ARP to network hosts. The
+ * address can be an IPv4 or IPv6 address.
+ *
+ * @cliexpar
+ * @parblock
+ * Example of how to send an IPv4 ARP REQUEST
+ * @cliexstart{arping 100.1.1.10 VirtualEthernet0/0/0 repeat 3 interval 1}
+ * Sending 1 ARP Request to 100.1.1.10
+ * Sending 2 ARP Request to 100.1.1.10
+ * Sending 3 ARP Request to 100.1.1.10
+ * Received 3 ARP Replies from 52:53:00:00:04:01 (100.1.1.10)
+ * @cliexend
+ *
+ * Example of how to send an IPv6 Neighbor Solicitation
+ * @cliexstart{arping 2001:192::2 VirtualEthernet0/0/0 repeat 3 interval 1}
+ * Sending 1 Neighbor Solicitation to 2001:192::2
+ * Sending 2 Neighbor Solicitation to 2001:192::2
+ * Sending 3 Neighbor Solicitation to 2001:192::2
+ * Received 3 ICMP6 neighbor advertisements from 52:53:00:00:04:01 (2001:192::2)
+ * @cliexend
+ *
+ * Example of how to send an IPv4 gratuitous ARP
+ * @cliexstart{arping gratuitous 100.1.1.100 VirtualEthernet0/0/0 repeat 2}
+ * Sending 1 GARP to 100.1.1.100
+ * Sending 2 GARP to 100.1.1.100
+ * @cliexend
+ * @endparblock
+ *
+?*/
+// clang-format on
+VLIB_CLI_COMMAND (arping_command, static) = {
+ .path = "arping",
+ .function = arping_ip_address,
+ .short_help = "arping [gratuitous] {addr} {interface}"
+ " [interval {sec}] [repeat {cnt}]",
+ .is_mp_safe = 1,
+};
+
+static clib_error_t *
+arping_cli_init (vlib_main_t *vm)
+{
+ /* initialize binary API */
+ arping_plugin_api_hookup (vm);
+
+ return 0;
+}
+
+VLIB_INIT_FUNCTION (arping_cli_init);
+
+VLIB_PLUGIN_REGISTER () = {
+ .version = VPP_BUILD_VER,
+ .description = "Arping (arping)",
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/arping/arping.h b/src/plugins/arping/arping.h
new file mode 100644
index 00000000000..d07e7cfb683
--- /dev/null
+++ b/src/plugins/arping/arping.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef included_arping_arping_h
+#define included_arping_arping_h
+
+#include <vnet/ip/ip_types.h>
+#include <vnet/ethernet/arp_packet.h>
+
+#define ARPING_DEFAULT_INTERVAL 1.0
+#define ARPING_DEFAULT_REPEAT 1
+
+typedef struct arping6_ip6_reply_t
+{
+ mac_address_t mac;
+ ip6_address_t ip6;
+} arping6_ip6_reply_t;
+
+typedef CLIB_PACKED (union arping46_reply_ {
+ ethernet_arp_ip4_over_ethernet_address_t from4;
+ arping6_ip6_reply_t from6;
+}) arping46_reply_t;
+
+typedef struct arping_intf_t
+{
+ CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+ f64 interval;
+ u32 repeat;
+ ip_address_t address;
+
+ arping46_reply_t recv;
+ u32 reply_count;
+} arping_intf_t;
+
+typedef struct arping_main_t
+{
+ arping_intf_t *arping_interfaces;
+ arping_intf_t **interfaces;
+ u16 msg_id_base;
+} arping_main_t;
+
+typedef struct arping_args_t
+{
+ ip_address_t address;
+ u32 sw_if_index;
+ u32 repeat;
+ f64 interval;
+ u8 is_garp;
+ u8 silence;
+
+ /* reply */
+ i32 rv;
+ u32 reply_count;
+ arping46_reply_t recv;
+ clib_error_t *error;
+} arping_args_t;
+
+extern arping_main_t arping_main;
+
+extern clib_error_t *arping_plugin_api_hookup (vlib_main_t *vm);
+extern void arping_run_command (vlib_main_t *vm, arping_args_t *args);
+
+#endif /* included_arping_arping_h */
diff --git a/src/plugins/arping/arping_api.c b/src/plugins/arping/arping_api.c
new file mode 100644
index 00000000000..015c6148f5e
--- /dev/null
+++ b/src/plugins/arping/arping_api.c
@@ -0,0 +1,87 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+#include <vlib/vlib.h>
+#include <vlib/unix/unix.h>
+#include <vlib/pci/pci.h>
+#include <vnet/ethernet/ethernet.h>
+#include <vnet/format_fns.h>
+#include <vnet/ip/ip_types_api.h>
+
+#include <arping/arping.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+
+/* define message IDs */
+#include <arping/arping.api_enum.h>
+#include <arping/arping.api_types.h>
+
+#include <vlibapi/api_helper_macros.h>
+
+static void
+vl_api_arping_t_handler (vl_api_arping_t *mp)
+{
+ vlib_main_t *vm = vlib_get_main ();
+ arping_main_t *am = &arping_main;
+ vl_api_arping_reply_t *rmp;
+ arping_args_t args = { 0 };
+ int rv;
+
+ if (mp->sw_if_index != ~0)
+ VALIDATE_SW_IF_INDEX (mp);
+
+ ip_address_decode2 (&mp->address, &args.address);
+ args.interval = clib_net_to_host_f64 (mp->interval);
+ args.repeat = ntohl (mp->repeat);
+ args.is_garp = mp->is_garp;
+ args.sw_if_index = ntohl (mp->sw_if_index);
+ args.silence = 1;
+
+ arping_run_command (vm, &args);
+ rv = args.rv;
+
+ BAD_SW_IF_INDEX_LABEL;
+
+ REPLY_MACRO2 (VL_API_ARPING_REPLY + am->msg_id_base,
+ ({ rmp->reply_count = ntohl (args.reply_count); }));
+}
+
+/* set tup the API message handling tables */
+#include <arping/arping.api.c>
+clib_error_t *
+arping_plugin_api_hookup (vlib_main_t *vm)
+{
+ arping_main_t *am = &arping_main;
+ api_main_t *vam = vlibapi_get_main ();
+
+ /* ask for a correctly-sized block of API message decode slots */
+ am->msg_id_base = setup_message_id_table ();
+
+ /* Mark API as mp safe */
+ vam->is_mp_safe[am->msg_id_base + VL_API_ARPING] = 1;
+
+ return 0;
+}
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/arping/arping_test.c b/src/plugins/arping/arping_test.c
new file mode 100644
index 00000000000..9001b7098a7
--- /dev/null
+++ b/src/plugins/arping/arping_test.c
@@ -0,0 +1,167 @@
+/*
+ * arping VAT support
+ *
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <inttypes.h>
+
+#include <vat/vat.h>
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+
+#include <vppinfra/error.h>
+#include <arping/arping.h>
+
+#define __plugin_msg_base arping_test_main.msg_id_base
+#include <vlibapi/vat_helper_macros.h>
+
+/* declare message IDs */
+#include <vnet/format_fns.h>
+#include <arping/arping.api_enum.h>
+#include <arping/arping.api_types.h>
+#include <vpp/api/vpe.api_types.h>
+#include <vnet/ip/ip_types_api.h>
+
+typedef struct
+{
+ /* API message ID base */
+ u16 msg_id_base;
+ u32 ping_id;
+ vat_main_t *vat_main;
+} arping_test_main_t;
+
+arping_test_main_t arping_test_main;
+
+/* arping request API */
+static int
+api_arping (vat_main_t *vam)
+{
+ vl_api_arping_t *mp;
+ arping_args_t args = { 0 };
+ int ret;
+ unformat_input_t *input = vam->input;
+ vnet_main_t *vnm = vnet_get_main ();
+ f64 interval = ARPING_DEFAULT_INTERVAL;
+ vl_api_control_ping_t *mp_ping;
+ arping_test_main_t *atm = &arping_test_main;
+
+ args.repeat = ARPING_DEFAULT_REPEAT;
+ args.interval = ARPING_DEFAULT_INTERVAL;
+ args.sw_if_index = ~0;
+
+ if (unformat (input, "gratuitous"))
+ args.is_garp = 1;
+
+ if (unformat (input, "%U", unformat_ip4_address, &args.address.ip.ip4))
+ args.address.version = AF_IP4;
+ else if (unformat (input, "%U", unformat_ip6_address, &args.address.ip.ip6))
+ args.address.version = AF_IP6;
+ else
+ {
+ errmsg ("expecting IP4/IP6 address `%U'. Usage: arping [gratuitous] "
+ "<addr> <intf> [repeat <count>] [interval <secs>]",
+ format_unformat_error, input);
+ return -99;
+ }
+
+ if (!unformat_user (input, unformat_vnet_sw_interface, vnm,
+ &args.sw_if_index))
+ {
+ errmsg ("unknown interface `%U'", format_unformat_error, input);
+ return -99;
+ }
+
+ /* parse the rest of the parameters in a cycle */
+ while (!unformat_eof (input, NULL))
+ {
+ if (unformat (input, "interval"))
+ {
+ if (!unformat (input, "%f", &interval))
+ {
+ errmsg ("expecting interval (floating point number) got `%U'",
+ format_unformat_error, input);
+ return -99;
+ }
+ args.interval = interval;
+ }
+ else if (unformat (input, "repeat"))
+ {
+ if (!unformat (input, "%u", &args.repeat))
+ {
+ errmsg ("expecting repeat count but got `%U'",
+ format_unformat_error, input);
+ return -99;
+ }
+ }
+ else
+ {
+ errmsg ("unknown input `%U'", format_unformat_error, input);
+ return -99;
+ }
+ }
+
+ M (ARPING, mp);
+
+ mp->interval = clib_host_to_net_f64 (args.interval);
+ mp->repeat = clib_host_to_net_u32 (args.repeat);
+ mp->is_garp = args.is_garp;
+ mp->sw_if_index = clib_host_to_net_u32 (args.sw_if_index);
+ ip_address_encode2 (&args.address, &mp->address);
+
+ S (mp);
+
+ /* Use a control ping for synchronization */
+ if (!atm->ping_id)
+ atm->ping_id = vl_msg_api_get_msg_index ((u8 *) (VL_API_CONTROL_PING_CRC));
+ mp_ping = vl_msg_api_alloc_as_if_client (sizeof (*mp_ping));
+ mp_ping->_vl_msg_id = htons (atm->ping_id);
+ mp_ping->client_index = vam->my_client_index;
+
+ fformat (vam->ofp, "Sending ping id=%d\n", atm->ping_id);
+
+ vam->result_ready = 0;
+ S (mp_ping);
+
+ W (ret);
+
+ return ret;
+}
+
+/* arping-create reply handler */
+static void
+vl_api_arping_reply_t_handler (vl_api_arping_reply_t *mp)
+{
+ vat_main_t *vam = arping_test_main.vat_main;
+ i32 retval = ntohl (mp->retval);
+
+ if (retval == 0)
+ {
+ fformat (vam->ofp, "arping request reply count = %d\n",
+ ntohl (mp->reply_count));
+ }
+
+ vam->retval = retval;
+ vam->result_ready = 1;
+}
+
+#include <arping/arping.api_test.c>
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/arping/test/test_arping.py b/src/plugins/arping/test/test_arping.py
new file mode 100644
index 00000000000..bd8b6250a54
--- /dev/null
+++ b/src/plugins/arping/test/test_arping.py
@@ -0,0 +1,251 @@
+from scapy.layers.l2 import ARP
+from scapy.layers.inet6 import ICMPv6ND_NS, ICMPv6ND_NA, IPv6
+
+from framework import VppTestCase
+
+""" TestArping is a subclass of VPPTestCase classes.
+
+Basic test for sanity check of arping.
+
+"""
+
+
+class TestArping(VppTestCase):
+ """ Arping Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestArping, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.disable_ipv6_ra()
+ i.resolve_arp()
+ i.resolve_ndp()
+ except Exception:
+ super(TestArping, cls).tearDownClass()
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestArping, cls).tearDownClass()
+
+ def tearDown(self):
+ super(TestArping, self).tearDown()
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show hardware"))
+
+ def verify_arping_request(self, p, src, dst):
+ arp = p[ARP]
+ self.assertEqual(arp.hwtype, 0x0001)
+ self.assertEqual(arp.ptype, 0x0800)
+ self.assertEqual(arp.hwlen, 6)
+ self.assertEqual(arp.op, 1)
+ self.assertEqual(arp.psrc, src)
+ self.assertEqual(arp.pdst, dst)
+
+ def verify_arping_ip6_ns(self, p, src, dst):
+ icmpv6 = p[ICMPv6ND_NS]
+ self.assertEqual(icmpv6.type, 135)
+ self.assertEqual(icmpv6.tgt, dst)
+ ipv6 = p[IPv6]
+ self.assertEqual(src, ipv6.src)
+
+ def verify_arping_ip6_na(self, p, src, dst):
+ icmpv6 = p[ICMPv6ND_NA]
+ self.assertEqual(icmpv6.type, 136)
+ self.assertEqual(icmpv6.tgt, dst)
+ ipv6 = p[IPv6]
+ self.assertEqual(src, ipv6.src)
+
+ def test_arping_ip4_arp_request_cli(self):
+ """ arping IP4 arp request CLI test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ remote_ip4 = self.pg1.remote_ip4
+
+ ping_cmd = "arping " + remote_ip4 + "pg1 repeat 5 interval 0.1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ ping_cmd = "arping " + remote_ip4 + "pg1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_request(p, self.pg1.local_ip4,
+ self.pg1.remote_ip4)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip4_garp_cli(self):
+ """ arping ip4 gratuitous arp CLI test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ ping_cmd = ("arping gratuitous" + self.pg1.local_ip4 +
+ "pg1 repeat 5 interval 0.1")
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ ping_cmd = "arping gratuitous" + self.pg1.local_ip4 + "pg1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_request(p, self.pg1.local_ip4,
+ self.pg1.local_ip4)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip4_arp_request_api(self):
+ """ arping ip4 arp request API test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ remote_ip4 = self.pg1.remote_ip4
+
+ ret = self.vapi.arping(address=remote_ip4,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=0, repeat=5, interval=0.1)
+ self.logger.info(ret)
+
+ ret = self.vapi.arping(address=remote_ip4,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=0)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_request(p, self.pg1.local_ip4,
+ self.pg1.remote_ip4)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip4_garp_api(self):
+ """ arping ip4 gratuitous arp API test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ ret = self.vapi.arping(address=self.pg1.local_ip4,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=1, repeat=5, interval=0.1)
+ self.logger.info(ret)
+
+ ret = self.vapi.arping(address=self.pg1.local_ip4,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=1)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_request(p, self.pg1.local_ip4,
+ self.pg1.local_ip4)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip6_ns_cli(self):
+ """ arping IP6 neighbor solicitation CLI test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ remote_ip6 = self.pg1.remote_ip6
+
+ ping_cmd = "arping " + remote_ip6 + "pg1 repeat 5 interval 0.1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ ping_cmd = "arping " + remote_ip6 + "pg1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_ip6_ns(p, self.pg1.local_ip6,
+ self.pg1.remote_ip6)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip6_ns_api(self):
+ """ arping ip6 neighbor solicitation API test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ remote_ip6 = self.pg1.remote_ip6
+
+ ret = self.vapi.arping(address=remote_ip6,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=0, repeat=5, interval=0.1)
+ self.logger.info(ret)
+
+ ret = self.vapi.arping(address=remote_ip6,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=0)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_ip6_ns(p, self.pg1.local_ip6,
+ self.pg1.remote_ip6)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip6_na_cli(self):
+ """ arping ip6 neighbor advertisement CLI test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ ping_cmd = ("arping gratuitous" + self.pg1.local_ip6 +
+ "pg1 repeat 5 interval 0.1")
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ ping_cmd = "arping gratuitous" + self.pg1.local_ip6 + "pg1"
+ ret = self.vapi.cli(ping_cmd)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_ip6_na(p, self.pg1.local_ip6,
+ self.pg1.local_ip6)
+ finally:
+ self.vapi.cli("show error")
+
+ def test_arping_ip6_na_api(self):
+ """ arping ip6 neighbor advertisement API test """
+ try:
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ ret = self.vapi.arping(address=self.pg1.local_ip6,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=1, repeat=5, interval=0.1)
+ self.logger.info(ret)
+
+ ret = self.vapi.arping(address=self.pg1.local_ip6,
+ sw_if_index=self.pg1.sw_if_index,
+ is_garp=1)
+ self.logger.info(ret)
+
+ out = self.pg1.get_capture(6)
+ for p in out:
+ self.verify_arping_ip6_na(p, self.pg1.local_ip6,
+ self.pg1.local_ip6)
+ finally:
+ self.vapi.cli("show error")
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)
495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930
/*
 * Copyright (c) 2019 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <sys/socket.h>

#include <vnet/session/application.h>
#include <vnet/session/transport.h>
#include <vnet/session/session.h>
#include <vlib/unix/plugin.h>
#include <vpp/app/version.h>

#include <vppinfra/lock.h>

#include <quic/quic.h>
#include <quic/certs.h>
#include <quic/error.h>

#include <quicly/constants.h>
#include <quicly/defaults.h>
#include <picotls.h>

#include <quic/quic_crypto.h>

extern quicly_crypto_engine_t quic_crypto_engine;

static char *quic_error_strings[] = {
#define quic_error(n,s) s,
#include <quic/quic_error.def>
#undef quic_error
};

#define DEFAULT_MAX_PACKETS_PER_KEY 16777216

quic_main_t quic_main;
static void quic_update_timer (quic_ctx_t * ctx);
static void quic_check_quic_session_connected (quic_ctx_t * ctx);
static int quic_reset_connection (u64 udp_session_handle,
				  quic_rx_packet_ctx_t * pctx);
static void quic_proto_on_close (u32 ctx_index, u32 thread_index);

static quicly_stream_open_t on_stream_open;
static quicly_closed_by_peer_t on_closed_by_peer;
static quicly_now_t quicly_vpp_now_cb;

/* Crypto contexts */

static inline void
quic_crypto_context_make_key_from_ctx (clib_bihash_kv_24_8_t * kv,
				       quic_ctx_t * ctx)
{
  application_t *app = application_get (ctx->parent_app_id);
  kv->key[0] = ((u64) ctx->ckpair_index) << 32 | (u64) ctx->crypto_engine;
  kv->key[1] = app->sm_properties.rx_fifo_size - 1;
  kv->key[2] = app->sm_properties.tx_fifo_size - 1;
}

static inline void
quic_crypto_context_make_key_from_crctx (clib_bihash_kv_24_8_t * kv,
					 crypto_context_t * crctx)
{
  quic_crypto_context_data_t *data =
    (quic_crypto_context_data_t *) crctx->data;
  kv->key[0] = ((u64) crctx->ckpair_index) << 32 | (u64) crctx->crypto_engine;
  kv->key[1] = data->quicly_ctx.transport_params.max_stream_data.bidi_local;
  kv->key[2] = data->quicly_ctx.transport_params.max_stream_data.bidi_remote;
}

static void
quic_crypto_context_free_if_needed (crypto_context_t * crctx, u8 thread_index)
{
  quic_main_t *qm = &quic_main;
  clib_bihash_kv_24_8_t kv;
  if (crctx->n_subscribers)
    return;
  quic_crypto_context_make_key_from_crctx (&kv, crctx);
  clib_bihash_add_del_24_8 (&qm->wrk_ctx[thread_index].crypto_context_hash,
			    &kv, 0 /* is_add */ );
  clib_mem_free (crctx->data);
  pool_put (qm->wrk_ctx[thread_index].crypto_ctx_pool, crctx);
}

static quicly_datagram_t *
quic_alloc_packet (quicly_packet_allocator_t * self, size_t payloadsize)
{
  quicly_datagram_t *packet;
  if ((packet =
       clib_mem_alloc (sizeof (*packet) + payloadsize +
		       sizeof (quic_encrypt_cb_ctx))) == NULL)
    return NULL;
  packet->data.base =
    (uint8_t *) packet + sizeof (*packet) + sizeof (quic_encrypt_cb_ctx);
  quic_encrypt_cb_ctx *encrypt_cb_ctx =
    (quic_encrypt_cb_ctx *) ((uint8_t *) packet + sizeof (*packet));

  clib_memset (encrypt_cb_ctx, 0, sizeof (*encrypt_cb_ctx));
  return packet;
}

static void
quic_free_packet (quicly_packet_allocator_t * self,
		  quicly_datagram_t * packet)
{
  clib_mem_free (packet);
}

quicly_packet_allocator_t quic_packet_allocator =
  { quic_alloc_packet, quic_free_packet };

static int
quic_app_cert_key_pair_delete_callback (app_cert_key_pair_t * ckpair)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  clib_bihash_kv_24_8_t kv;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  int num_threads = 1 /* main thread */  + vtm->n_threads;
  int i;

  for (i = 0; i < num_threads; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (crctx, qm->wrk_ctx[i].crypto_ctx_pool, ({
	if (crctx->ckpair_index == ckpair->cert_key_index)
	  {
	    quic_crypto_context_make_key_from_crctx (&kv, crctx);
	    clib_bihash_add_del_24_8 (&qm->wrk_ctx[i].crypto_context_hash, &kv, 0 /* is_add */ );
	  }
      }));
      /* *INDENT-ON* */
    }
  return 0;
}

static crypto_context_t *
quic_crypto_context_alloc (u8 thread_index)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  u32 idx;

  pool_get (qm->wrk_ctx[thread_index].crypto_ctx_pool, crctx);
  clib_memset (crctx, 0, sizeof (*crctx));
  idx = (crctx - qm->wrk_ctx[thread_index].crypto_ctx_pool);
  crctx->ctx_index = ((u32) thread_index) << 24 | idx;

  return crctx;
}

static crypto_context_t *
quic_crypto_context_get (u32 cr_index, u32 thread_index)
{
  quic_main_t *qm = &quic_main;
  ASSERT (cr_index >> 24 == thread_index);
  return pool_elt_at_index (qm->wrk_ctx[thread_index].crypto_ctx_pool,
			    cr_index & 0x00ffffff);
}

static clib_error_t *
quic_list_crypto_context_command_fn (vlib_main_t * vm,
				     unformat_input_t * input,
				     vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  int i, num_threads = 1 /* main thread */  + vtm->n_threads;
  for (i = 0; i < num_threads; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (crctx, qm->wrk_ctx[i].crypto_ctx_pool, ({
	vlib_cli_output (vm, "[%d][Q]%U", i, format_crypto_context, crctx);
      }));
      /* *INDENT-ON* */
    }
  return 0;
}

static clib_error_t *
quic_set_max_packets_per_key_fn (vlib_main_t * vm,
				 unformat_input_t * input,
				 vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  unformat_input_t _line_input, *line_input = &_line_input;
  u64 tmp;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "%U", unformat_memory_size, &tmp))
	{
	  qm->max_packets_per_key = tmp;
	}
      else
	return clib_error_return (0, "unknown input '%U'",
				  format_unformat_error, line_input);
    }

  return 0;
}

static void
quic_release_crypto_context (u32 crypto_context_index, u8 thread_index)
{
  crypto_context_t *crctx;
  crctx = quic_crypto_context_get (crypto_context_index, thread_index);
  crctx->n_subscribers--;
  quic_crypto_context_free_if_needed (crctx, thread_index);
}

static int
quic_init_crypto_context (crypto_context_t * crctx, quic_ctx_t * ctx)
{
  quic_main_t *qm = &quic_main;
  quicly_context_t *quicly_ctx;
  ptls_iovec_t key_vec;
  app_cert_key_pair_t *ckpair;
  application_t *app;
  quic_crypto_context_data_t *data;
  ptls_context_t *ptls_ctx;

  QUIC_DBG (2, "Init quic crctx %d thread %d", crctx->ctx_index,
	    ctx->c_thread_index);

  data = clib_mem_alloc (sizeof (*data));
  /* picotls depends on data being zeroed */
  clib_memset (data, 0, sizeof (*data));
  crctx->data = (void *) data;
  quicly_ctx = &data->quicly_ctx;
  ptls_ctx = &data->ptls_ctx;

  ptls_ctx->random_bytes = ptls_openssl_random_bytes;
  ptls_ctx->get_time = &ptls_get_time;
  ptls_ctx->key_exchanges = ptls_openssl_key_exchanges;
  ptls_ctx->cipher_suites = qm->quic_ciphers[ctx->crypto_engine];
  ptls_ctx->certificates.list = NULL;
  ptls_ctx->certificates.count = 0;
  ptls_ctx->esni = NULL;
  ptls_ctx->on_client_hello = NULL;
  ptls_ctx->emit_certificate = NULL;
  ptls_ctx->sign_certificate = NULL;
  ptls_ctx->verify_certificate = NULL;
  ptls_ctx->ticket_lifetime = 86400;
  ptls_ctx->max_early_data_size = 8192;
  ptls_ctx->hkdf_label_prefix__obsolete = NULL;
  ptls_ctx->require_dhe_on_psk = 1;
  ptls_ctx->encrypt_ticket = &qm->session_cache.super;
  clib_memcpy (quicly_ctx, &quicly_spec_context, sizeof (quicly_context_t));

  quicly_ctx->max_packet_size = QUIC_MAX_PACKET_SIZE;
  quicly_ctx->max_packets_per_key = qm->max_packets_per_key;
  quicly_ctx->tls = ptls_ctx;
  quicly_ctx->stream_open = &on_stream_open;
  quicly_ctx->closed_by_peer = &on_closed_by_peer;
  quicly_ctx->now = &quicly_vpp_now_cb;
  quicly_amend_ptls_context (quicly_ctx->tls);

  quicly_ctx->packet_allocator = &quic_packet_allocator;
  quicly_ctx->crypto_engine = &quic_crypto_engine;
  quicly_ctx->transport_params.max_data = QUIC_INT_MAX;
  quicly_ctx->transport_params.max_streams_uni = (uint64_t) 1 << 60;
  quicly_ctx->transport_params.max_streams_bidi = (uint64_t) 1 << 60;
  quicly_ctx->transport_params.max_idle_timeout = qm->connection_timeout;

  app = application_get (ctx->parent_app_id);
  quicly_ctx->transport_params.max_stream_data.bidi_local =
    app->sm_properties.rx_fifo_size - 1;
  quicly_ctx->transport_params.max_stream_data.bidi_remote =
    app->sm_properties.tx_fifo_size - 1;
  quicly_ctx->transport_params.max_stream_data.uni = QUIC_INT_MAX;

  if (!app->quic_iv_set)
    {
      ptls_openssl_random_bytes (app->quic_iv, QUIC_IV_LEN - 1);
      app->quic_iv[QUIC_IV_LEN - 1] = 0;
      app->quic_iv_set = 1;
    }

  clib_memcpy (data->cid_key, app->quic_iv, QUIC_IV_LEN);
  key_vec = ptls_iovec_init (data->cid_key, QUIC_IV_LEN);
  quicly_ctx->cid_encryptor =
    quicly_new_default_cid_encryptor (&ptls_openssl_bfecb,
				      &ptls_openssl_aes128ecb,
				      &ptls_openssl_sha256, key_vec);

  ckpair = app_cert_key_pair_get_if_valid (crctx->ckpair_index);
  if (!ckpair || !ckpair->key || !ckpair->cert)
    {
      QUIC_DBG (1, "Wrong ckpair id %d\n", crctx->ckpair_index);
      return -1;
    }
  if (load_bio_private_key (quicly_ctx->tls, (char *) ckpair->key))
    {
      QUIC_DBG (1, "failed to read private key from app configuration\n");
      return -1;
    }
  if (load_bio_certificate_chain (quicly_ctx->tls, (char *) ckpair->cert))
    {
      QUIC_DBG (1, "failed to load certificate\n");
      return -1;
    }
  return 0;

}

static int
quic_acquire_crypto_context (quic_ctx_t * ctx)
{
  quic_main_t *qm = &quic_main;
  crypto_context_t *crctx;
  clib_bihash_kv_24_8_t kv;

  if (ctx->crypto_engine == CRYPTO_ENGINE_NONE)
    {
      QUIC_DBG (2, "No crypto engine specified, using %d",
		qm->default_crypto_engine);
      ctx->crypto_engine = qm->default_crypto_engine;
    }
  if (!clib_bitmap_get (qm->available_crypto_engines, ctx->crypto_engine))
    {
      QUIC_DBG (1, "Quic does not support crypto engine %d",
		ctx->crypto_engine);
      return VNET_API_ERROR_MISSING_CERT_KEY;
    }

  /* Check for exisiting crypto ctx */
  quic_crypto_context_make_key_from_ctx (&kv, ctx);
  if (clib_bihash_search_24_8
      (&qm->wrk_ctx[ctx->c_thread_index].crypto_context_hash, &kv, &kv) == 0)
    {
      crctx = quic_crypto_context_get (kv.value, ctx->c_thread_index);
      QUIC_DBG (2, "Found exisiting crypto context %d", kv.value);
      ctx->crypto_context_index = kv.value;
      crctx->n_subscribers++;
      return 0;
    }

  crctx = quic_crypto_context_alloc (ctx->c_thread_index);
  ctx->crypto_context_index = crctx->ctx_index;
  kv.value = crctx->ctx_index;
  crctx->crypto_engine = ctx->crypto_engine;
  crctx->ckpair_index = ctx->ckpair_index;
  if (quic_init_crypto_context (crctx, ctx))
    goto error;
  if (vnet_app_add_cert_key_interest (ctx->ckpair_index, qm->app_index))
    goto error;
  crctx->n_subscribers++;
  clib_bihash_add_del_24_8 (&qm->
			    wrk_ctx[ctx->c_thread_index].crypto_context_hash,
			    &kv, 1 /* is_add */ );
  return 0;

error:
  quic_crypto_context_free_if_needed (crctx, ctx->c_thread_index);
  return VNET_API_ERROR_MISSING_CERT_KEY;
}

/*  Helper functions */

static u32
quic_ctx_alloc (u32 thread_index)
{
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx;

  pool_get (qm->ctx_pool[thread_index], ctx);

  clib_memset (ctx, 0, sizeof (quic_ctx_t));
  ctx->c_thread_index = thread_index;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  QUIC_DBG (3, "Allocated quic_ctx %u on thread %u",
	    ctx - qm->ctx_pool[thread_index], thread_index);
  return ctx - qm->ctx_pool[thread_index];
}

static void
quic_ctx_free (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "Free ctx %u %x", ctx->c_thread_index, ctx->c_c_index);
  u32 thread_index = ctx->c_thread_index;
  QUIC_ASSERT (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID);
  if (CLIB_DEBUG)
    clib_memset (ctx, 0xfb, sizeof (*ctx));
  pool_put (quic_main.ctx_pool[thread_index], ctx);
}

static quic_ctx_t *
quic_ctx_get (u32 ctx_index, u32 thread_index)
{
  return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
}

static quic_ctx_t *
quic_ctx_get_if_valid (u32 ctx_index, u32 thread_index)
{
  if (pool_is_free_index (quic_main.ctx_pool[thread_index], ctx_index))
    return 0;
  return pool_elt_at_index (quic_main.ctx_pool[thread_index], ctx_index);
}

quic_ctx_t *
quic_get_conn_ctx (quicly_conn_t * conn)
{
  u64 conn_data;
  conn_data = (u64) * quicly_get_data (conn);
  return quic_ctx_get (conn_data & UINT32_MAX, conn_data >> 32);
}

static void
quic_store_conn_ctx (quicly_conn_t * conn, quic_ctx_t * ctx)
{
  *quicly_get_data (conn) =
    (void *) (((u64) ctx->c_thread_index) << 32 | (u64) ctx->c_c_index);
}

static inline int
quic_ctx_is_stream (quic_ctx_t * ctx)
{
  return (ctx->flags & QUIC_F_IS_STREAM);
}

static inline int
quic_ctx_is_listener (quic_ctx_t * ctx)
{
  return (ctx->flags & QUIC_F_IS_LISTENER);
}

static inline int
quic_ctx_is_conn (quic_ctx_t * ctx)
{
  return !(quic_ctx_is_listener (ctx) || quic_ctx_is_stream (ctx));
}

static inline session_t *
get_stream_session_and_ctx_from_stream (quicly_stream_t * stream,
					quic_ctx_t ** ctx)
{
  quic_stream_data_t *stream_data;

  stream_data = (quic_stream_data_t *) stream->data;
  *ctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
  return session_get ((*ctx)->c_s_index, stream_data->thread_index);
}

static inline void
quic_make_connection_key (clib_bihash_kv_16_8_t * kv,
			  const quicly_cid_plaintext_t * id)
{
  kv->key[0] = ((u64) id->master_id) << 32 | (u64) id->thread_id;
  kv->key[1] = id->node_id;
}

static int
quic_sendable_packet_count (session_t * udp_session)
{
  u32 max_enqueue;
  u32 packet_size = QUIC_MAX_PACKET_SIZE + SESSION_CONN_HDR_LEN;
  max_enqueue = svm_fifo_max_enqueue (udp_session->tx_fifo);
  return clib_min (max_enqueue / packet_size, QUIC_SEND_PACKET_VEC_SIZE);
}

static quicly_context_t *
quic_get_quicly_ctx_from_ctx (quic_ctx_t * ctx)
{
  crypto_context_t *crctx =
    quic_crypto_context_get (ctx->crypto_context_index, ctx->c_thread_index);
  quic_crypto_context_data_t *data =
    (quic_crypto_context_data_t *) crctx->data;
  return &data->quicly_ctx;
}

static quicly_context_t *
quic_get_quicly_ctx_from_udp (u64 udp_session_handle)
{
  session_t *udp_session = session_get_from_handle (udp_session_handle);
  quic_ctx_t *ctx =
    quic_ctx_get (udp_session->opaque, udp_session->thread_index);
  return quic_get_quicly_ctx_from_ctx (ctx);
}

static inline void
quic_set_udp_tx_evt (session_t * udp_session)
{
  int rv = 0;
  if (svm_fifo_set_event (udp_session->tx_fifo))
    rv = session_send_io_evt_to_thread (udp_session->tx_fifo,
					SESSION_IO_EVT_TX);
  if (PREDICT_FALSE (rv))
    clib_warning ("Event enqueue errored %d", rv);
}

static inline void
quic_stop_ctx_timer (quic_ctx_t * ctx)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID)
    return;
  tw = &quic_main.wrk_ctx[ctx->c_thread_index].timer_wheel;
  tw_timer_stop_1t_3w_1024sl_ov (tw, ctx->timer_handle);
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  QUIC_DBG (4, "Stopping timer for ctx %u", ctx->c_c_index);
}

/* QUIC protocol actions */

static void
quic_ack_rx_data (session_t * stream_session)
{
  u32 max_deq;
  quic_ctx_t *sctx;
  svm_fifo_t *f;
  quicly_stream_t *stream;
  quic_stream_data_t *stream_data;

  sctx = quic_ctx_get (stream_session->connection_index,
		       stream_session->thread_index);
  QUIC_ASSERT (quic_ctx_is_stream (sctx));
  stream = sctx->stream;
  stream_data = (quic_stream_data_t *) stream->data;

  f = stream_session->rx_fifo;
  max_deq = svm_fifo_max_dequeue (f);

  QUIC_ASSERT (stream_data->app_rx_data_len >= max_deq);
  quicly_stream_sync_recvbuf (stream, stream_data->app_rx_data_len - max_deq);
  QUIC_DBG (3, "Acking %u bytes", stream_data->app_rx_data_len - max_deq);
  stream_data->app_rx_data_len = max_deq;
}

static void
quic_disconnect_transport (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "Disconnecting transport 0x%lx", ctx->udp_session_handle);
  vnet_disconnect_args_t a = {
    .handle = ctx->udp_session_handle,
    .app_index = quic_main.app_index,
  };

  if (vnet_disconnect_session (&a))
    clib_warning ("UDP session 0x%lx disconnect errored",
		  ctx->udp_session_handle);
}

static void
quic_connection_delete (quic_ctx_t * ctx)
{
  clib_bihash_kv_16_8_t kv;
  quicly_conn_t *conn;

  QUIC_DBG (2, "Deleting connection %u", ctx->c_c_index);

  QUIC_ASSERT (!quic_ctx_is_stream (ctx));
  quic_stop_ctx_timer (ctx);

  /*  Delete the connection from the connection map */
  conn = ctx->conn;
  ctx->conn = NULL;
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  QUIC_DBG (2, "Deleting conn with id %lu %lu from map", kv.key[0],
	    kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 0 /* is_add */ );

  quic_disconnect_transport (ctx);

  if (ctx->conn)
    quicly_free (ctx->conn);
  session_transport_delete_notify (&ctx->connection);
}

void
quic_increment_counter (u8 evt, u8 val)
{
  vlib_main_t *vm = vlib_get_main ();
  vlib_node_increment_counter (vm, quic_input_node.index, evt, val);
}

/**
 * Called when quicly return an error
 * This function interacts tightly with quic_proto_on_close
 */
static void
quic_connection_closed (quic_ctx_t * ctx)
{
  QUIC_DBG (2, "QUIC connection %u/%u closed", ctx->c_thread_index,
	    ctx->c_c_index);

  /* TODO if connection is not established, just delete the session? */
  /* Actually should send connect or accept error */

  switch (ctx->conn_state)
    {
    case QUIC_CONN_STATE_READY:
      /* Error on an opened connection (timeout...)
         This puts the session in closing state, we should receive a notification
         when the app has closed its session */
      session_transport_reset_notify (&ctx->connection);
      /* This ensures we delete the connection when the app confirms the close */
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED;
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING:
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED;
      /* quic_proto_on_close will eventually be called when the app confirms the close
         , we delete the connection at that point */
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING_APP_CLOSED:
      /* App already confirmed close, we can delete the connection */
      quic_connection_delete (ctx);
      break;
    case QUIC_CONN_STATE_OPENED:
    case QUIC_CONN_STATE_HANDSHAKE:
    case QUIC_CONN_STATE_ACTIVE_CLOSING:
      quic_connection_delete (ctx);
      break;
    default:
      QUIC_DBG (0, "BUG %d", ctx->conn_state);
      break;
    }
}

static int
quic_send_datagram (session_t * udp_session, quicly_datagram_t * packet)
{
  u32 max_enqueue;
  session_dgram_hdr_t hdr;
  u32 len, ret;
  svm_fifo_t *f;
  transport_connection_t *tc;

  len = packet->data.len;
  f = udp_session->tx_fifo;
  tc = session_get_transport (udp_session);
  max_enqueue = svm_fifo_max_enqueue (f);
  if (max_enqueue < SESSION_CONN_HDR_LEN + len)
    {
      QUIC_ERR ("Too much data to send, max_enqueue %u, len %u",
		max_enqueue, len + SESSION_CONN_HDR_LEN);
      return QUIC_ERROR_FULL_FIFO;
    }

  /*  Build packet header for fifo */
  hdr.data_length = len;
  hdr.data_offset = 0;
  hdr.is_ip4 = tc->is_ip4;
  clib_memcpy (&hdr.lcl_ip, &tc->lcl_ip, sizeof (ip46_address_t));
  hdr.lcl_port = tc->lcl_port;

  /*  Read dest address from quicly-provided sockaddr */
  if (hdr.is_ip4)
    {
      QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET);
      struct sockaddr_in *sa4 = (struct sockaddr_in *) &packet->dest.sa;
      hdr.rmt_port = sa4->sin_port;
      hdr.rmt_ip.ip4.as_u32 = sa4->sin_addr.s_addr;
    }
  else
    {
      QUIC_ASSERT (packet->dest.sa.sa_family == AF_INET6);
      struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) &packet->dest.sa;
      hdr.rmt_port = sa6->sin6_port;
      clib_memcpy (&hdr.rmt_ip.ip6, &sa6->sin6_addr, 16);
    }

  ret = svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
  if (ret != sizeof (hdr))
    {
      QUIC_ERR ("Not enough space to enqueue header");
      return QUIC_ERROR_FULL_FIFO;
    }
  ret = svm_fifo_enqueue (f, len, packet->data.base);
  if (ret != len)
    {
      QUIC_ERR ("Not enough space to enqueue payload");
      return QUIC_ERROR_FULL_FIFO;
    }

  quic_increment_counter (QUIC_ERROR_TX_PACKETS, 1);

  return 0;
}

static int
quic_send_packets (quic_ctx_t * ctx)
{
  quic_main_t *qm = &quic_main;
  quicly_datagram_t *packets[QUIC_SEND_PACKET_VEC_SIZE];
  session_t *udp_session;
  quicly_conn_t *conn;
  size_t num_packets, i, max_packets;
  quicly_packet_allocator_t *pa;
  int err = 0;
  u32 thread_index = vlib_get_thread_index ();

  /* We have sctx, get qctx */
  if (quic_ctx_is_stream (ctx))
    ctx = quic_ctx_get (ctx->quic_connection_ctx_id, ctx->c_thread_index);

  QUIC_ASSERT (!quic_ctx_is_stream (ctx));

  udp_session = session_get_from_handle_if_valid (ctx->udp_session_handle);
  if (!udp_session)
    goto quicly_error;

  conn = ctx->conn;

  if (!conn)
    return 0;

  /* TODO : quicly can assert it can send min_packets up to 2 */
  if (quic_sendable_packet_count (udp_session) < 2)
    goto stop_sending;

  pa = quic_get_quicly_ctx_from_ctx (ctx)->packet_allocator;
  do
    {
      max_packets = quic_sendable_packet_count (udp_session);
      if (max_packets < 2)
	break;
      num_packets = max_packets;
      if ((err = quicly_send (conn, packets, &num_packets)))
	goto quicly_error;

      quic_crypto_batch_tx_packets (&qm->wrk_ctx
				    [thread_index].crypto_context_batch);

      for (i = 0; i != num_packets; ++i)
	{
	  quic_crypto_finalize_send_packet (packets[i]);
	  if ((err = quic_send_datagram (udp_session, packets[i])))
	    goto quicly_error;

	  pa->free_packet (pa, packets[i]);
	}
    }
  while (num_packets > 0 && num_packets == max_packets);

stop_sending:
  quic_set_udp_tx_evt (udp_session);

  QUIC_DBG (3, "%u[TX] %u[RX]", svm_fifo_max_dequeue (udp_session->tx_fifo),
	    svm_fifo_max_dequeue (udp_session->rx_fifo));
  quic_update_timer (ctx);
  return 0;

quicly_error:
  if (err && err != QUICLY_ERROR_PACKET_IGNORED
      && err != QUICLY_ERROR_FREE_CONNECTION)
    clib_warning ("Quic error '%U'.", quic_format_err, err);
  quic_connection_closed (ctx);
  return 1;
}

/* Quicly callbacks */

static void
quic_on_stream_destroy (quicly_stream_t * stream, int err)
{
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  QUIC_DBG (2, "DESTROYED_STREAM: session 0x%lx (%U)",
	    session_handle (stream_session), quic_format_err, err);

  stream_session->session_state = SESSION_STATE_CLOSED;
  session_transport_delete_notify (&sctx->connection);

  quic_increment_counter (QUIC_ERROR_CLOSED_STREAM, 1);
  quic_ctx_free (sctx);
  clib_mem_free (stream->data);
}

static void
quic_on_stop_sending (quicly_stream_t * stream, int err)
{
#if QUIC_DEBUG >= 2
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  clib_warning ("(NOT IMPLEMENTD) STOP_SENDING: session 0x%lx (%U)",
		session_handle (stream_session), quic_format_err, err);
#endif
  /* TODO : handle STOP_SENDING */
}

static void
quic_on_receive_reset (quicly_stream_t * stream, int err)
{
  quic_stream_data_t *stream_data = (quic_stream_data_t *) stream->data;
  quic_ctx_t *sctx = quic_ctx_get (stream_data->ctx_id,
				   stream_data->thread_index);
#if QUIC_DEBUG >= 2
  session_t *stream_session = session_get (sctx->c_s_index,
					   sctx->c_thread_index);
  clib_warning ("RESET_STREAM: session 0x%lx (%U)",
		session_handle (stream_session), quic_format_err, err);
#endif
  session_transport_closing_notify (&sctx->connection);
}

static void
quic_on_receive (quicly_stream_t * stream, size_t off, const void *src,
		 size_t len)
{
  QUIC_DBG (3, "received data: %lu bytes, offset %lu", len, off);
  u32 max_enq, rlen, rv;
  quic_ctx_t *sctx;
  session_t *stream_session;
  app_worker_t *app_wrk;
  svm_fifo_t *f;
  quic_stream_data_t *stream_data;

  if (!len)
    return;

  stream_data = (quic_stream_data_t *) stream->data;
  sctx = quic_ctx_get (stream_data->ctx_id, stream_data->thread_index);
  stream_session = session_get (sctx->c_s_index, stream_data->thread_index);
  f = stream_session->rx_fifo;

  max_enq = svm_fifo_max_enqueue_prod (f);
  QUIC_DBG (3, "Enqueuing %u at off %u in %u space", len, off, max_enq);
  /* Handle duplicate packet/chunk from quicly */
  if (off < stream_data->app_rx_data_len)
    {
      QUIC_DBG (3, "Session [idx %u, app_wrk %u, thread %u, rx-fifo 0x%llx]: "
		"DUPLICATE PACKET (max_enq %u, len %u, "
		"app_rx_data_len %u, off %u, ToBeNQ %u)",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f,
		max_enq, len, stream_data->app_rx_data_len, off,
		off - stream_data->app_rx_data_len + len);
      return;
    }
  if (PREDICT_FALSE ((off - stream_data->app_rx_data_len + len) > max_enq))
    {
      QUIC_ERR ("Session [idx %u, app_wrk %u, thread %u, rx-fifo 0x%llx]: "
		"RX FIFO IS FULL (max_enq %u, len %u, "
		"app_rx_data_len %u, off %u, ToBeNQ %u)",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f,
		max_enq, len, stream_data->app_rx_data_len, off,
		off - stream_data->app_rx_data_len + len);
      return;			/* This shouldn't happen */
    }
  if (off == stream_data->app_rx_data_len)
    {
      /* Streams live on the same thread so (f, stream_data) should stay consistent */
      rlen = svm_fifo_enqueue (f, len, (u8 *) src);
      QUIC_DBG (3, "Session [idx %u, app_wrk %u, ti %u, rx-fifo 0x%llx]: "
		"Enqueuing %u (rlen %u) at off %u in %u space, ",
		stream_session->session_index,
		stream_session->app_wrk_index,
		stream_session->thread_index, f, len, rlen, off, max_enq);
      stream_data->app_rx_data_len += rlen;
      QUIC_ASSERT (rlen >= len);
      app_wrk = app_worker_get_if_valid (stream_session->app_wrk_index);
      if (PREDICT_TRUE (app_wrk != 0))
	{
	  rv = app_worker_lock_and_send_event (app_wrk, stream_session,
					       SESSION_IO_EVT_RX);
	  if (rv)
	    QUIC_ERR ("Failed to ping app for RX");
	}
      quic_ack_rx_data (stream_session);
    }
  else
    {
      rlen = svm_fifo_enqueue_with_offset (f,
					   off - stream_data->app_rx_data_len,
					   len, (u8 *) src);
      QUIC_ASSERT (rlen == 0);
    }
  return;
}

void
quic_fifo_egress_shift (quicly_stream_t * stream, size_t delta)
{
  quic_stream_data_t *stream_data;
  session_t *stream_session;
  quic_ctx_t *ctx;
  svm_fifo_t *f;
  u32 rv;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_session = get_stream_session_and_ctx_from_stream (stream, &ctx);
  f = stream_session->tx_fifo;

  QUIC_ASSERT (stream_data->app_tx_data_len >= delta);
  stream_data->app_tx_data_len -= delta;
  ctx->bytes_written += delta;
  rv = svm_fifo_dequeue_drop (f, delta);
  QUIC_ASSERT (rv == delta);

  rv = quicly_stream_sync_sendbuf (stream, 0);
  QUIC_ASSERT (!rv);
}

void
quic_fifo_egress_emit (quicly_stream_t * stream, size_t off, void *dst,
		       size_t * len, int *wrote_all)
{
  quic_stream_data_t *stream_data;
  quic_ctx_t *ctx;
  session_t *stream_session;
  svm_fifo_t *f;
  u32 deq_max;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_session = get_stream_session_and_ctx_from_stream (stream, &ctx);
  f = stream_session->tx_fifo;

  QUIC_DBG (3, "Emitting %u, offset %u", *len, off);

  deq_max = svm_fifo_max_dequeue_cons (f);
  QUIC_ASSERT (off <= deq_max);
  if (off + *len < deq_max)
    {
      *wrote_all = 0;
    }
  else
    {
      *wrote_all = 1;
      *len = deq_max - off;
    }
  QUIC_ASSERT (*len > 0);

  if (off + *len > stream_data->app_tx_data_len)
    stream_data->app_tx_data_len = off + *len;

  svm_fifo_peek (f, off, *len, dst);
}

static const quicly_stream_callbacks_t quic_stream_callbacks = {
  .on_destroy = quic_on_stream_destroy,
  .on_send_shift = quic_fifo_egress_shift,
  .on_send_emit = quic_fifo_egress_emit,
  .on_send_stop = quic_on_stop_sending,
  .on_receive = quic_on_receive,
  .on_receive_reset = quic_on_receive_reset
};

static int
quic_on_stream_open (quicly_stream_open_t * self, quicly_stream_t * stream)
{
  /* Return code for this function ends either
   * - in quicly_receive : if not QUICLY_ERROR_PACKET_IGNORED, will close connection
   * - in quicly_open_stream, returned directly
   */

  session_t *stream_session, *quic_session;
  quic_stream_data_t *stream_data;
  app_worker_t *app_wrk;
  quic_ctx_t *qctx, *sctx;
  u32 sctx_id;
  int rv;

  QUIC_DBG (2, "on_stream_open called");
  stream->data = clib_mem_alloc (sizeof (quic_stream_data_t));
  stream->callbacks = &quic_stream_callbacks;
  /* Notify accept on parent qsession, but only if this is not a locally
   * initiated stream */
  if (quicly_stream_is_self_initiated (stream))
    return 0;

  sctx_id = quic_ctx_alloc (vlib_get_thread_index ());
  qctx = quic_get_conn_ctx (stream->conn);

  /* Might need to signal that the connection is ready if the first thing the
   * server does is open a stream */
  quic_check_quic_session_connected (qctx);
  /* ctx might be invalidated */
  qctx = quic_get_conn_ctx (stream->conn);

  stream_session = session_alloc (qctx->c_thread_index);
  QUIC_DBG (2, "ACCEPTED stream_session 0x%lx ctx %u",
	    session_handle (stream_session), sctx_id);
  sctx = quic_ctx_get (sctx_id, qctx->c_thread_index);
  sctx->parent_app_wrk_id = qctx->parent_app_wrk_id;
  sctx->parent_app_id = qctx->parent_app_id;
  sctx->quic_connection_ctx_id = qctx->c_c_index;
  sctx->c_c_index = sctx_id;
  sctx->c_s_index = stream_session->session_index;
  sctx->stream = stream;
  sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  sctx->flags |= QUIC_F_IS_STREAM;
  if (quicly_stream_is_unidirectional (stream->stream_id))
    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;

  stream_data = (quic_stream_data_t *) stream->data;
  stream_data->ctx_id = sctx_id;
  stream_data->thread_index = sctx->c_thread_index;
  stream_data->app_rx_data_len = 0;
  stream_data->app_tx_data_len = 0;

  sctx->c_s_index = stream_session->session_index;
  stream_session->session_state = SESSION_STATE_CREATED;
  stream_session->app_wrk_index = sctx->parent_app_wrk_id;
  stream_session->connection_index = sctx->c_c_index;
  stream_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, qctx->udp_is_ip4);
  quic_session = session_get (qctx->c_s_index, qctx->c_thread_index);
  stream_session->listener_handle = listen_session_get_handle (quic_session);

  app_wrk = app_worker_get (stream_session->app_wrk_index);
  if ((rv = app_worker_init_connected (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to allocate fifos");
      quicly_reset_stream (stream, QUIC_APP_ALLOCATION_ERROR);
      return 0;			/* Frame is still valid */
    }
  svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);

  if ((rv = app_worker_accept_notify (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to notify accept worker app");
      quicly_reset_stream (stream, QUIC_APP_ACCEPT_NOTIFY_ERROR);
      return 0;			/* Frame is still valid */
    }

  return 0;
}

static void
quic_on_closed_by_peer (quicly_closed_by_peer_t * self, quicly_conn_t * conn,
			int code, uint64_t frame_type,
			const char *reason, size_t reason_len)
{
  quic_ctx_t *ctx = quic_get_conn_ctx (conn);
#if QUIC_DEBUG >= 2
  session_t *quic_session = session_get (ctx->c_s_index, ctx->c_thread_index);
  clib_warning ("Session 0x%lx closed by peer (%U) %.*s ",
		session_handle (quic_session), quic_format_err, code,
		reason_len, reason);
#endif
  ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING;
  session_transport_closing_notify (&ctx->connection);
}

/* Timer handling */

static int64_t
quic_get_thread_time (u8 thread_index)
{
  return quic_main.wrk_ctx[thread_index].time_now;
}

static int64_t
quic_get_time (quicly_now_t * self)
{
  u8 thread_index = vlib_get_thread_index ();
  return quic_get_thread_time (thread_index);
}

static u32
quic_set_time_now (u32 thread_index)
{
  vlib_main_t *vlib_main = vlib_get_main ();
  f64 time = vlib_time_now (vlib_main);
  quic_main.wrk_ctx[thread_index].time_now = (int64_t) (time * 1000.f);
  return quic_main.wrk_ctx[thread_index].time_now;
}

/* Transport proto callback */
static void
quic_update_time (f64 now, u8 thread_index)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;

  tw = &quic_main.wrk_ctx[thread_index].timer_wheel;
  quic_set_time_now (thread_index);
  tw_timer_expire_timers_1t_3w_1024sl_ov (tw, now);
}

static void
quic_timer_expired (u32 conn_index)
{
  quic_ctx_t *ctx;
  QUIC_DBG (4, "Timer expired for conn %u at %ld", conn_index,
	    quic_get_time (NULL));
  ctx = quic_ctx_get (conn_index, vlib_get_thread_index ());
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  quic_send_packets (ctx);
}

static void
quic_update_timer (quic_ctx_t * ctx)
{
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  int64_t next_timeout, next_interval;
  session_t *quic_session;
  int rv;

  /*  This timeout is in ms which is the unit of our timer */
  next_timeout = quicly_get_first_timeout (ctx->conn);
  next_interval = next_timeout - quic_get_time (NULL);

  if (next_timeout == 0 || next_interval <= 0)
    {
      if (ctx->c_s_index == QUIC_SESSION_INVALID)
	{
	  next_interval = 1;
	}
      else
	{
	  quic_session = session_get (ctx->c_s_index, ctx->c_thread_index);
	  if (svm_fifo_set_event (quic_session->tx_fifo))
	    {
	      rv = session_send_io_evt_to_thread_custom (quic_session,
							 quic_session->thread_index,
							 SESSION_IO_EVT_BUILTIN_TX);
	      if (PREDICT_FALSE (rv))
		QUIC_ERR ("Failed to enqueue builtin_tx %d", rv);
	    }
	  return;
	}
    }

  tw = &quic_main.wrk_ctx[vlib_get_thread_index ()].timer_wheel;

  QUIC_DBG (4, "Timer set to %ld (int %ld) for ctx %u", next_timeout,
	    next_interval, ctx->c_c_index);

  if (ctx->timer_handle == QUIC_TIMER_HANDLE_INVALID)
    {
      if (next_timeout == INT64_MAX)
	{
	  QUIC_DBG (4, "timer for ctx %u already stopped", ctx->c_c_index);
	  return;
	}
      ctx->timer_handle = tw_timer_start_1t_3w_1024sl_ov (tw, ctx->c_c_index,
							  0, next_interval);
    }
  else
    {
      if (next_timeout == INT64_MAX)
	{
	  quic_stop_ctx_timer (ctx);
	}
      else
	tw_timer_update_1t_3w_1024sl_ov (tw, ctx->timer_handle,
					 next_interval);
    }
  return;
}

static void
quic_expired_timers_dispatch (u32 * expired_timers)
{
  int i;

  for (i = 0; i < vec_len (expired_timers); i++)
    {
      quic_timer_expired (expired_timers[i]);
    }
}

/* Transport proto functions */
static int
quic_connect_stream (session_t * quic_session, session_endpoint_cfg_t * sep)
{
  uint64_t quic_session_handle;
  session_t *stream_session;
  quic_stream_data_t *stream_data;
  quicly_stream_t *stream;
  quicly_conn_t *conn;
  app_worker_t *app_wrk;
  quic_ctx_t *qctx, *sctx;
  u32 sctx_index;
  u8 is_unidir;
  int rv;

  /*  Find base session to which the user want to attach a stream */
  quic_session_handle = session_handle (quic_session);
  QUIC_DBG (2, "Opening new stream (qsession %u)", quic_session_handle);

  if (session_type_transport_proto (quic_session->session_type) !=
      TRANSPORT_PROTO_QUIC)
    {
      QUIC_ERR ("received incompatible session");
      return -1;
    }

  app_wrk = app_worker_get_if_valid (quic_session->app_wrk_index);
  if (!app_wrk)
    {
      QUIC_ERR ("Invalid app worker :(");
      return -1;
    }

  sctx_index = quic_ctx_alloc (quic_session->thread_index);	/*  Allocate before we get pointers */
  sctx = quic_ctx_get (sctx_index, quic_session->thread_index);
  qctx = quic_ctx_get (quic_session->connection_index,
		       quic_session->thread_index);
  if (quic_ctx_is_stream (qctx))
    {
      QUIC_ERR ("session is a stream");
      quic_ctx_free (sctx);
      return -1;
    }

  sctx->parent_app_wrk_id = qctx->parent_app_wrk_id;
  sctx->parent_app_id = qctx->parent_app_id;
  sctx->quic_connection_ctx_id = qctx->c_c_index;
  sctx->c_c_index = sctx_index;
  sctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  sctx->flags |= QUIC_F_IS_STREAM;

  conn = qctx->conn;

  if (!conn || !quicly_connection_is_ready (conn))
    return -1;

  is_unidir = sep->transport_flags & TRANSPORT_CFG_F_UNIDIRECTIONAL;
  if ((rv = quicly_open_stream (conn, &stream, is_unidir)))
    {
      QUIC_DBG (2, "Stream open failed with %d", rv);
      return -1;
    }
  quic_increment_counter (QUIC_ERROR_OPENED_STREAM, 1);

  sctx->stream = stream;

  QUIC_DBG (2, "Opened stream %d, creating session", stream->stream_id);

  stream_session = session_alloc (qctx->c_thread_index);
  QUIC_DBG (2, "Allocated stream_session 0x%lx ctx %u",
	    session_handle (stream_session), sctx_index);
  stream_session->app_wrk_index = app_wrk->wrk_index;
  stream_session->connection_index = sctx_index;
  stream_session->listener_handle = quic_session_handle;
  stream_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, qctx->udp_is_ip4);
  if (is_unidir)
    stream_session->flags |= SESSION_F_UNIDIRECTIONAL;

  sctx->c_s_index = stream_session->session_index;
  stream_data = (quic_stream_data_t *) stream->data;
  stream_data->ctx_id = sctx->c_c_index;
  stream_data->thread_index = sctx->c_thread_index;
  stream_data->app_rx_data_len = 0;
  stream_data->app_tx_data_len = 0;
  stream_session->session_state = SESSION_STATE_READY;

  /* For now we only reset streams. Cleanup will be triggered by timers */
  if ((rv = app_worker_init_connected (app_wrk, stream_session)))
    {
      QUIC_ERR ("failed to app_worker_init_connected");
      quicly_reset_stream (stream, QUIC_APP_CONNECT_NOTIFY_ERROR);
      return app_worker_connect_notify (app_wrk, NULL, rv, sep->opaque);
    }

  svm_fifo_add_want_deq_ntf (stream_session->rx_fifo,
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_FULL |
			     SVM_FIFO_WANT_DEQ_NOTIF_IF_EMPTY);

  if (app_worker_connect_notify (app_wrk, stream_session, SESSION_E_NONE,
				 sep->opaque))
    {
      QUIC_ERR ("failed to notify app");
      quic_increment_counter (QUIC_ERROR_CLOSED_STREAM, 1);
      quicly_reset_stream (stream, QUIC_APP_CONNECT_NOTIFY_ERROR);
      return -1;
    }

  return 0;
}

static int
quic_connect_connection (session_endpoint_cfg_t * sep)
{
  vnet_connect_args_t _cargs, *cargs = &_cargs;
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx;
  app_worker_t *app_wrk;
  application_t *app;
  u32 ctx_index;
  u32 thread_index = vlib_get_thread_index ();
  int error;

  clib_memset (cargs, 0, sizeof (*cargs));
  ctx_index = quic_ctx_alloc (thread_index);
  ctx = quic_ctx_get (ctx_index, thread_index);
  ctx->parent_app_wrk_id = sep->app_wrk_index;
  ctx->c_s_index = QUIC_SESSION_INVALID;
  ctx->c_c_index = ctx_index;
  ctx->udp_is_ip4 = sep->is_ip4;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  ctx->conn_state = QUIC_CONN_STATE_HANDSHAKE;
  ctx->client_opaque = sep->opaque;
  ctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
  if (sep->hostname)
    ctx->srv_hostname = format (0, "%v", sep->hostname);
  else
    /*  needed by quic for crypto + determining client / server */
    ctx->srv_hostname = format (0, "%U", format_ip46_address,
				&sep->ip, sep->is_ip4);
  vec_terminate_c_string (ctx->srv_hostname);

  clib_memcpy (&cargs->sep, sep, sizeof (session_endpoint_cfg_t));
  cargs->sep.transport_proto = TRANSPORT_PROTO_UDP;
  cargs->app_index = qm->app_index;
  cargs->api_context = ctx_index;

  app_wrk = app_worker_get (sep->app_wrk_index);
  app = application_get (app_wrk->app_index);
  ctx->parent_app_id = app_wrk->app_index;
  cargs->sep_ext.ns_index = app->ns_index;
  cargs->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED;

  ctx->crypto_engine = sep->crypto_engine;
  ctx->ckpair_index = sep->ckpair_index;
  if ((error = quic_acquire_crypto_context (ctx)))
    return error;

  if ((error = vnet_connect (cargs)))
    return error;

  return 0;
}

static int
quic_connect (transport_endpoint_cfg_t * tep)
{
  QUIC_DBG (2, "Called quic_connect");
  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) tep;
  session_t *quic_session;
  sep = (session_endpoint_cfg_t *) tep;

  quic_session = session_get_from_handle_if_valid (sep->parent_handle);
  if (quic_session)
    return quic_connect_stream (quic_session, sep);
  else
    return quic_connect_connection (sep);
}

static void
quic_proto_on_close (u32 ctx_index, u32 thread_index)
{
  int err;
  quic_ctx_t *ctx = quic_ctx_get_if_valid (ctx_index, thread_index);
  if (!ctx)
    return;
  session_t *stream_session = session_get (ctx->c_s_index,
					   ctx->c_thread_index);
#if QUIC_DEBUG >= 2
  clib_warning ("Closing session 0x%lx", session_handle (stream_session));
#endif
  if (quic_ctx_is_stream (ctx))
    {
      quicly_stream_t *stream = ctx->stream;
      if (!quicly_stream_has_send_side (quicly_is_client (stream->conn),
					stream->stream_id))
	return;
      quicly_sendstate_shutdown (&stream->sendstate, ctx->bytes_written +
				 svm_fifo_max_dequeue
				 (stream_session->tx_fifo));
      err = quicly_stream_sync_sendbuf (stream, 1);
      if (err)
	{
	  QUIC_DBG (1, "sendstate_shutdown failed for stream session %lu",
		    session_handle (stream_session));
	  quicly_reset_stream (stream, QUIC_APP_ERROR_CLOSE_NOTIFY);
	}
      quic_send_packets (ctx);
      return;
    }

  switch (ctx->conn_state)
    {
    case QUIC_CONN_STATE_OPENED:
    case QUIC_CONN_STATE_HANDSHAKE:
    case QUIC_CONN_STATE_READY:
      ctx->conn_state = QUIC_CONN_STATE_ACTIVE_CLOSING;
      quicly_conn_t *conn = ctx->conn;
      /* Start connection closing. Keep sending packets until quicly_send
         returns QUICLY_ERROR_FREE_CONNECTION */

      quic_increment_counter (QUIC_ERROR_CLOSED_CONNECTION, 1);
      quicly_close (conn, QUIC_APP_ERROR_CLOSE_NOTIFY, "Closed by peer");
      /* This also causes all streams to be closed (and the cb called) */
      quic_send_packets (ctx);
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING:
      ctx->conn_state = QUIC_CONN_STATE_PASSIVE_CLOSING_APP_CLOSED;
      /* send_packets will eventually return an error, we delete the conn at
         that point */
      break;
    case QUIC_CONN_STATE_PASSIVE_CLOSING_QUIC_CLOSED:
      quic_connection_delete (ctx);
      break;
    case QUIC_CONN_STATE_ACTIVE_CLOSING:
      break;
    default:
      QUIC_ERR ("Trying to close conn in state %d", ctx->conn_state);
      break;
    }
}

static u32
quic_start_listen (u32 quic_listen_session_index, transport_endpoint_t * tep)
{
  vnet_listen_args_t _bargs, *args = &_bargs;
  quic_main_t *qm = &quic_main;
  session_handle_t udp_handle;
  session_endpoint_cfg_t *sep;
  session_t *udp_listen_session;
  app_worker_t *app_wrk;
  application_t *app;
  quic_ctx_t *lctx;
  u32 lctx_index;
  app_listener_t *app_listener;
  int rv;

  sep = (session_endpoint_cfg_t *) tep;
  app_wrk = app_worker_get (sep->app_wrk_index);
  /* We need to call this because we call app_worker_init_connected in
   * quic_accept_stream, which assumes the connect segment manager exists */
  app_worker_alloc_connects_segment_manager (app_wrk);
  app = application_get (app_wrk->app_index);
  QUIC_DBG (2, "Called quic_start_listen for app %d", app_wrk->app_index);

  clib_memset (args, 0, sizeof (*args));
  args->app_index = qm->app_index;
  args->sep_ext = *sep;
  args->sep_ext.ns_index = app->ns_index;
  args->sep_ext.transport_proto = TRANSPORT_PROTO_UDP;
  args->sep_ext.transport_flags = TRANSPORT_CFG_F_CONNECTED;
  if ((rv = vnet_listen (args)))
    return rv;

  lctx_index = quic_ctx_alloc (0);
  udp_handle = args->handle;
  app_listener = app_listener_get_w_handle (udp_handle);
  udp_listen_session = app_listener_get_session (app_listener);
  udp_listen_session->opaque = lctx_index;

  lctx = quic_ctx_get (lctx_index, 0);
  lctx->flags |= QUIC_F_IS_LISTENER;

  clib_memcpy (&lctx->c_rmt_ip, &args->sep.peer.ip, sizeof (ip46_address_t));
  clib_memcpy (&lctx->c_lcl_ip, &args->sep.ip, sizeof (ip46_address_t));
  lctx->c_rmt_port = args->sep.peer.port;
  lctx->c_lcl_port = args->sep.port;
  lctx->c_is_ip4 = args->sep.is_ip4;
  lctx->c_fib_index = args->sep.fib_index;
  lctx->c_proto = TRANSPORT_PROTO_QUIC;
  lctx->parent_app_wrk_id = sep->app_wrk_index;
  lctx->parent_app_id = app_wrk->app_index;
  lctx->udp_session_handle = udp_handle;
  lctx->c_s_index = quic_listen_session_index;
  lctx->crypto_engine = sep->crypto_engine;
  lctx->ckpair_index = sep->ckpair_index;
  if (quic_acquire_crypto_context (lctx))
    return -1;

  QUIC_DBG (2, "Listening UDP session 0x%lx",
	    session_handle (udp_listen_session));
  QUIC_DBG (2, "Listening QUIC session 0x%lx", quic_listen_session_index);
  return lctx_index;
}

static u32
quic_stop_listen (u32 lctx_index)
{
  QUIC_DBG (2, "Called quic_stop_listen");
  quic_ctx_t *lctx;
  lctx = quic_ctx_get (lctx_index, 0);
  QUIC_ASSERT (quic_ctx_is_listener (lctx));
  vnet_unlisten_args_t a = {
    .handle = lctx->udp_session_handle,
    .app_index = quic_main.app_index,
    .wrk_map_index = 0		/* default wrk */
  };
  if (vnet_unlisten (&a))
    clib_warning ("unlisten errored");

  quic_release_crypto_context (lctx->crypto_context_index,
			       0 /* thread_index */ );
  quic_ctx_free (lctx);
  return 0;
}

static transport_connection_t *
quic_connection_get (u32 ctx_index, u32 thread_index)
{
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (ctx_index, thread_index);
  return &ctx->connection;
}

static transport_connection_t *
quic_listener_get (u32 listener_index)
{
  QUIC_DBG (2, "Called quic_listener_get");
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (listener_index, 0);
  return &ctx->connection;
}

static u8 *
format_quic_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  u32 verbose = va_arg (*args, u32);
  u8 *str = 0;

  if (!ctx)
    return s;
  str = format (str, "[#%d][Q] ", ctx->c_thread_index);

  if (quic_ctx_is_listener (ctx))
    str = format (str, "Listener, UDP %ld", ctx->udp_session_handle);
  else if (quic_ctx_is_stream (ctx))
    str = format (str, "Stream %ld conn %d",
		  ctx->stream->stream_id, ctx->quic_connection_ctx_id);
  else				/* connection */
    str = format (str, "Conn %d UDP %d", ctx->c_c_index,
		  ctx->udp_session_handle);

  str = format (str, " app %d wrk %d", ctx->parent_app_id,
		ctx->parent_app_wrk_id);

  if (verbose == 1)
    s = format (s, "%-50s%-15d", str, ctx->conn_state);
  else
    s = format (s, "%s\n", str);
  vec_free (str);
  return s;
}

static u8 *
format_quic_connection (u8 * s, va_list * args)
{
  u32 qc_index = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  u32 verbose = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (qc_index, thread_index);
  s = format (s, "%U", format_quic_ctx, ctx, verbose);
  return s;
}

static u8 *
format_quic_half_open (u8 * s, va_list * args)
{
  u32 qc_index = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (qc_index, thread_index);
  s = format (s, "[#%d][Q] half-open app %u", thread_index,
	      ctx->parent_app_id);
  return s;
}

/*  TODO improve */
static u8 *
format_quic_listener (u8 * s, va_list * args)
{
  u32 tci = va_arg (*args, u32);
  u32 thread_index = va_arg (*args, u32);
  u32 verbose = va_arg (*args, u32);
  quic_ctx_t *ctx = quic_ctx_get (tci, thread_index);
  s = format (s, "%U", format_quic_ctx, ctx, verbose);
  return s;
}

/* Session layer callbacks */

static inline void
quic_build_sockaddr (struct sockaddr *sa, socklen_t * salen,
		     ip46_address_t * addr, u16 port, u8 is_ip4)
{
  if (is_ip4)
    {
      struct sockaddr_in *sa4 = (struct sockaddr_in *) sa;
      sa4->sin_family = AF_INET;
      sa4->sin_port = port;
      sa4->sin_addr.s_addr = addr->ip4.as_u32;
      *salen = sizeof (struct sockaddr_in);
    }
  else
    {
      struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *) sa;
      sa6->sin6_family = AF_INET6;
      sa6->sin6_port = port;
      clib_memcpy (&sa6->sin6_addr, &addr->ip6, 16);
      *salen = sizeof (struct sockaddr_in6);
    }
}

static void
quic_on_quic_session_connected (quic_ctx_t * ctx)
{
  session_t *quic_session;
  app_worker_t *app_wrk;
  u32 ctx_id = ctx->c_c_index;
  u32 thread_index = ctx->c_thread_index;
  int rv;

  quic_session = session_alloc (thread_index);

  QUIC_DBG (2, "Allocated quic session 0x%lx", session_handle (quic_session));
  ctx->c_s_index = quic_session->session_index;
  quic_session->app_wrk_index = ctx->parent_app_wrk_id;
  quic_session->connection_index = ctx->c_c_index;
  quic_session->listener_handle = SESSION_INVALID_HANDLE;
  quic_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->udp_is_ip4);

  /* If quic session connected fails, immediatly close connection */
  app_wrk = app_worker_get (ctx->parent_app_wrk_id);
  if ((rv = app_worker_init_connected (app_wrk, quic_session)))
    {
      QUIC_ERR ("failed to app_worker_init_connected");
      quic_proto_on_close (ctx_id, thread_index);
      app_worker_connect_notify (app_wrk, NULL, rv, ctx->client_opaque);
      return;
    }

  quic_session->session_state = SESSION_STATE_CONNECTING;
  if ((rv = app_worker_connect_notify (app_wrk, quic_session,
				       SESSION_E_NONE, ctx->client_opaque)))
    {
      QUIC_ERR ("failed to notify app %d", rv);
      quic_proto_on_close (ctx_id, thread_index);
      return;
    }

  /*  If the app opens a stream in its callback it may invalidate ctx */
  ctx = quic_ctx_get (ctx_id, thread_index);
  /*
   * app_worker_connect_notify() might have reallocated pool, reload
   * quic_session pointer
   */
  quic_session = session_get (ctx->c_s_index, thread_index);
  quic_session->session_state = SESSION_STATE_LISTENING;
}

static void
quic_check_quic_session_connected (quic_ctx_t * ctx)
{
  /* Called when we need to trigger quic session connected
   * we may call this function on the server side / at
   * stream opening */

  /* Conn may be set to null if the connection is terminated */
  if (!ctx->conn || ctx->conn_state != QUIC_CONN_STATE_HANDSHAKE)
    return;
  if (!quicly_connection_is_ready (ctx->conn))
    return;
  ctx->conn_state = QUIC_CONN_STATE_READY;
  if (!quicly_is_client (ctx->conn))
    return;
  quic_on_quic_session_connected (ctx);
}

static inline void
quic_update_conn_ctx (quicly_conn_t * conn, quicly_context_t * quicly_context)
{
  /* we need to update the quicly_conn on migrate
   * as it contains a pointer to the crypto context */
  ptls_context_t **tls;
  quicly_context_t **_quicly_context;
  _quicly_context = (quicly_context_t **) conn;
  *_quicly_context = quicly_context;
  tls = (ptls_context_t **) quicly_get_tls (conn);
  *tls = quicly_context->tls;
}

static void
quic_receive_connection (void *arg)
{
  u32 new_ctx_id, thread_index = vlib_get_thread_index ();
  quic_ctx_t *temp_ctx, *new_ctx;
  clib_bihash_kv_16_8_t kv;
  quicly_conn_t *conn;
  quicly_context_t *quicly_context;
  session_t *udp_session;

  temp_ctx = arg;
  new_ctx_id = quic_ctx_alloc (thread_index);
  new_ctx = quic_ctx_get (new_ctx_id, thread_index);

  QUIC_DBG (2, "Received conn %u (now %u)", temp_ctx->c_thread_index,
	    new_ctx_id);

  clib_memcpy (new_ctx, temp_ctx, sizeof (quic_ctx_t));
  clib_mem_free (temp_ctx);

  new_ctx->c_thread_index = thread_index;
  new_ctx->c_c_index = new_ctx_id;
  quic_acquire_crypto_context (new_ctx);

  conn = new_ctx->conn;
  quicly_context = quic_get_quicly_ctx_from_ctx (new_ctx);
  quic_update_conn_ctx (conn, quicly_context);

  quic_store_conn_ctx (conn, new_ctx);
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) thread_index) << 32 | (u64) new_ctx_id;
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
  new_ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  quic_update_timer (new_ctx);

  /*  Trigger write on this connection if necessary */
  udp_session = session_get_from_handle (new_ctx->udp_session_handle);
  udp_session->opaque = new_ctx_id;
  udp_session->flags &= ~SESSION_F_IS_MIGRATING;
  if (svm_fifo_max_dequeue (udp_session->tx_fifo))
    quic_set_udp_tx_evt (udp_session);
}

static void
quic_transfer_connection (u32 ctx_index, u32 dest_thread)
{
  quic_ctx_t *ctx, *temp_ctx;
  u32 thread_index = vlib_get_thread_index ();

  QUIC_DBG (2, "Transferring conn %u to thread %u", ctx_index, dest_thread);

  temp_ctx = clib_mem_alloc (sizeof (quic_ctx_t));
  QUIC_ASSERT (temp_ctx != NULL);
  ctx = quic_ctx_get (ctx_index, thread_index);

  clib_memcpy (temp_ctx, ctx, sizeof (quic_ctx_t));

  quic_stop_ctx_timer (ctx);
  quic_release_crypto_context (ctx->crypto_context_index, thread_index);
  quic_ctx_free (ctx);

  /*  Send connection to destination thread */
  session_send_rpc_evt_to_thread (dest_thread, quic_receive_connection,
				  (void *) temp_ctx);
}

static int
quic_udp_session_connected_callback (u32 quic_app_index, u32 ctx_index,
				     session_t * udp_session,
				     session_error_t err)
{
  QUIC_DBG (2, "QSession is now connected (id %u)",
	    udp_session->session_index);
  /* This should always be called before quic_connect returns since UDP always
   * connects instantly. */
  clib_bihash_kv_16_8_t kv;
  struct sockaddr_in6 sa6;
  struct sockaddr *sa = (struct sockaddr *) &sa6;
  socklen_t salen;
  transport_connection_t *tc;
  app_worker_t *app_wrk;
  quicly_conn_t *conn;
  quic_ctx_t *ctx;
  u32 thread_index = vlib_get_thread_index ();
  int ret;
  quicly_context_t *quicly_ctx;


  ctx = quic_ctx_get (ctx_index, thread_index);
  if (err)
    {
      u32 api_context;
      app_wrk = app_worker_get_if_valid (ctx->parent_app_wrk_id);
      if (app_wrk)
	{
	  api_context = ctx->c_s_index;
	  app_worker_connect_notify (app_wrk, 0, SESSION_E_NONE, api_context);
	}
      return 0;
    }

  ctx->c_thread_index = thread_index;
  ctx->c_c_index = ctx_index;

  QUIC_DBG (2, "Quic connect returned %u. New ctx [%u]%x",
	    is_fail, thread_index, (ctx) ? ctx_index : ~0);

  ctx->udp_session_handle = session_handle (udp_session);
  udp_session->opaque = ctx_index;

  /* Init QUIC lib connection
   * Generate required sockaddr & salen */
  tc = session_get_transport (udp_session);
  quic_build_sockaddr (sa, &salen, &tc->rmt_ip, tc->rmt_port, tc->is_ip4);

  quicly_ctx = quic_get_quicly_ctx_from_ctx (ctx);
  ret = quicly_connect (&ctx->conn, quicly_ctx, (char *) ctx->srv_hostname,
			sa, NULL, &quic_main.wrk_ctx[thread_index].next_cid,
			ptls_iovec_init (NULL, 0), &quic_main.hs_properties,
			NULL);
  ++quic_main.wrk_ctx[thread_index].next_cid.master_id;
  /*  Save context handle in quicly connection */
  quic_store_conn_ctx (ctx->conn, ctx);
  assert (ret == 0);

  /*  Register connection in connections map */
  conn = ctx->conn;
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) thread_index) << 32 | (u64) ctx_index;
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );

  /*  UDP stack quirk? preemptively transfer connection if that happens */
  if (udp_session->thread_index != thread_index)
    quic_transfer_connection (ctx_index, udp_session->thread_index);
  else
    quic_send_packets (ctx);

  return ret;
}

static void
quic_udp_session_disconnect_callback (session_t * s)
{
  clib_warning ("UDP session disconnected???");
}

static void
quic_udp_session_cleanup_callback (session_t * udp_session,
				   session_cleanup_ntf_t ntf)
{
  quic_ctx_t *ctx;

  if (ntf != SESSION_CLEANUP_SESSION)
    return;

  ctx = quic_ctx_get (udp_session->opaque, udp_session->thread_index);
  quic_stop_ctx_timer (ctx);
  quic_release_crypto_context (ctx->crypto_context_index,
			       ctx->c_thread_index);
  quic_ctx_free (ctx);
}

static void
quic_udp_session_reset_callback (session_t * s)
{
  clib_warning ("UDP session reset???");
}

static void
quic_udp_session_migrate_callback (session_t * s, session_handle_t new_sh)
{
  u32 new_thread = session_thread_from_handle (new_sh);
  quic_ctx_t *ctx;

  QUIC_DBG (2, "Session %x migrated to %lx", s->session_index, new_sh);
  QUIC_ASSERT (vlib_get_thread_index () == s->thread_index);
  ctx = quic_ctx_get (s->opaque, s->thread_index);
  QUIC_ASSERT (ctx->udp_session_handle == session_handle (s));

  ctx->udp_session_handle = new_sh;
#if QUIC_DEBUG >= 1
  s->opaque = 0xfeedface;
#endif
  quic_transfer_connection (ctx->c_c_index, new_thread);
}

int
quic_udp_session_accepted_callback (session_t * udp_session)
{
  /* New UDP connection, try to accept it */
  u32 ctx_index;
  quic_ctx_t *ctx, *lctx;
  session_t *udp_listen_session;
  u32 thread_index = vlib_get_thread_index ();

  udp_listen_session =
    listen_session_get_from_handle (udp_session->listener_handle);

  ctx_index = quic_ctx_alloc (thread_index);
  ctx = quic_ctx_get (ctx_index, thread_index);
  ctx->c_thread_index = udp_session->thread_index;
  ctx->c_c_index = ctx_index;
  ctx->c_s_index = QUIC_SESSION_INVALID;
  ctx->udp_session_handle = session_handle (udp_session);
  QUIC_DBG (2, "ACCEPTED UDP 0x%lx", ctx->udp_session_handle);
  ctx->listener_ctx_id = udp_listen_session->opaque;
  lctx = quic_ctx_get (udp_listen_session->opaque,
		       udp_listen_session->thread_index);
  ctx->udp_is_ip4 = lctx->c_is_ip4;
  ctx->parent_app_id = lctx->parent_app_id;
  ctx->parent_app_wrk_id = lctx->parent_app_wrk_id;
  ctx->timer_handle = QUIC_TIMER_HANDLE_INVALID;
  ctx->conn_state = QUIC_CONN_STATE_OPENED;
  ctx->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;

  ctx->crypto_engine = lctx->crypto_engine;
  ctx->ckpair_index = lctx->ckpair_index;
  quic_acquire_crypto_context (ctx);
  udp_session->opaque = ctx_index;

  /* TODO timeout to delete these if they never connect */
  return 0;
}

static int
quic_add_segment_callback (u32 client_index, u64 seg_handle)
{
  /* No-op for builtin */
  return 0;
}

static int
quic_del_segment_callback (u32 client_index, u64 seg_handle)
{
  /* No-op for builtin */
  return 0;
}

static int
quic_custom_app_rx_callback (transport_connection_t * tc)
{
  quic_ctx_t *ctx;
  session_t *stream_session = session_get (tc->s_index, tc->thread_index);
  QUIC_DBG (3, "Received app READ notification");
  quic_ack_rx_data (stream_session);
  svm_fifo_reset_has_deq_ntf (stream_session->rx_fifo);

  /* Need to send packets (acks may never be sent otherwise) */
  ctx = quic_ctx_get (stream_session->connection_index,
		      stream_session->thread_index);
  quic_send_packets (ctx);
  return 0;
}

static int
quic_custom_tx_callback (void *s, transport_send_params_t * sp)
{
  session_t *stream_session = (session_t *) s;
  quic_stream_data_t *stream_data;
  quicly_stream_t *stream;
  quic_ctx_t *ctx;
  u32 max_deq;
  int rv;

  if (PREDICT_FALSE
      (stream_session->session_state >= SESSION_STATE_TRANSPORT_CLOSING))
    return 0;
  ctx = quic_ctx_get (stream_session->connection_index,
		      stream_session->thread_index);
  if (PREDICT_FALSE (!quic_ctx_is_stream (ctx)))
    {
      goto tx_end;		/* Most probably a reschedule */
    }

  QUIC_DBG (3, "Stream TX event");
  quic_ack_rx_data (stream_session);
  stream = ctx->stream;
  if (!quicly_sendstate_is_open (&stream->sendstate))
    {
      QUIC_ERR ("Warning: tried to send on closed stream");
      return -1;
    }

  stream_data = (quic_stream_data_t *) stream->data;
  max_deq = svm_fifo_max_dequeue (stream_session->tx_fifo);
  QUIC_ASSERT (max_deq >= stream_data->app_tx_data_len);
  if (max_deq == stream_data->app_tx_data_len)
    {
      QUIC_DBG (3, "TX but no data %d / %d", max_deq,
		stream_data->app_tx_data_len);
      return 0;
    }
  stream_data->app_tx_data_len = max_deq;
  rv = quicly_stream_sync_sendbuf (stream, 1);
  QUIC_ASSERT (!rv);

tx_end:
  quic_send_packets (ctx);
  return 0;
}

/*
 * Returns 0 if a matching connection is found and is on the right thread.
 * Otherwise returns -1.
 * If a connection is found, even on the wrong thread, ctx_thread and ctx_index
 * will be set.
 */
static inline int
quic_find_packet_ctx (quic_rx_packet_ctx_t * pctx, u32 caller_thread_index)
{
  clib_bihash_kv_16_8_t kv;
  clib_bihash_16_8_t *h;
  quic_ctx_t *ctx;
  u32 index, thread_id;

  h = &quic_main.connection_hash;
  quic_make_connection_key (&kv, &pctx->packet.cid.dest.plaintext);
  QUIC_DBG (3, "Searching conn with id %lu %lu", kv.key[0], kv.key[1]);

  if (clib_bihash_search_16_8 (h, &kv, &kv))
    {
      QUIC_DBG (3, "connection not found");
      return QUIC_PACKET_TYPE_NONE;
    }

  index = kv.value & UINT32_MAX;
  thread_id = kv.value >> 32;
  /* Check if this connection belongs to this thread, otherwise
   * ask for it to be moved */
  if (thread_id != caller_thread_index)
    {
      QUIC_DBG (2, "Connection is on wrong thread");
      /* Cannot make full check with quicly_is_destination... */
      pctx->ctx_index = index;
      pctx->thread_index = thread_id;
      return QUIC_PACKET_TYPE_MIGRATE;
    }
  ctx = quic_ctx_get (index, vlib_get_thread_index ());
  if (!ctx->conn)
    {
      QUIC_ERR ("ctx has no conn");
      return QUIC_PACKET_TYPE_NONE;
    }
  if (!quicly_is_destination (ctx->conn, NULL, &pctx->sa, &pctx->packet))
    return QUIC_PACKET_TYPE_NONE;

  QUIC_DBG (3, "Connection found");
  pctx->ctx_index = index;
  pctx->thread_index = thread_id;
  return QUIC_PACKET_TYPE_RECEIVE;
}

static void
quic_accept_connection (quic_rx_packet_ctx_t * pctx)
{
  quicly_context_t *quicly_ctx;
  session_t *quic_session;
  clib_bihash_kv_16_8_t kv;
  app_worker_t *app_wrk;
  quicly_conn_t *conn;
  quic_ctx_t *ctx;
  quic_ctx_t *lctx;
  int rv;

  /* new connection, accept and create context if packet is valid
   * TODO: check if socket is actually listening? */
  ctx = quic_ctx_get (pctx->ctx_index, pctx->thread_index);
  if (ctx->c_s_index != QUIC_SESSION_INVALID)
    {
      QUIC_DBG (2, "already accepted ctx 0x%x", ctx->c_s_index);
      return;
    }

  quicly_ctx = quic_get_quicly_ctx_from_ctx (ctx);
  if ((rv = quicly_accept (&conn, quicly_ctx, NULL, &pctx->sa,
			   &pctx->packet, NULL,
			   &quic_main.wrk_ctx[pctx->thread_index].next_cid,
			   NULL)))
    {
      /* Invalid packet, pass */
      assert (conn == NULL);
      QUIC_ERR ("Accept failed with %U", quic_format_err, rv);
      /* TODO: cleanup created quic ctx and UDP session */
      return;
    }
  assert (conn != NULL);

  ++quic_main.wrk_ctx[pctx->thread_index].next_cid.master_id;
  /* Save ctx handle in quicly connection */
  quic_store_conn_ctx (conn, ctx);
  ctx->conn = conn;

  quic_session = session_alloc (ctx->c_thread_index);
  QUIC_DBG (2, "Allocated quic_session, 0x%lx ctx %u",
	    session_handle (quic_session), ctx->c_c_index);
  quic_session->session_state = SESSION_STATE_LISTENING;
  ctx->c_s_index = quic_session->session_index;

  lctx = quic_ctx_get (ctx->listener_ctx_id, 0);

  quic_session->app_wrk_index = lctx->parent_app_wrk_id;
  quic_session->connection_index = ctx->c_c_index;
  quic_session->session_type =
    session_type_from_proto_and_ip (TRANSPORT_PROTO_QUIC, ctx->udp_is_ip4);
  quic_session->listener_handle = lctx->c_s_index;

  /* Register connection in connections map */
  quic_make_connection_key (&kv, quicly_get_master_id (conn));
  kv.value = ((u64) pctx->thread_index) << 32 | (u64) pctx->ctx_index;
  clib_bihash_add_del_16_8 (&quic_main.connection_hash, &kv, 1 /* is_add */ );
  QUIC_DBG (2, "Registering conn with id %lu %lu", kv.key[0], kv.key[1]);

  /* If notify fails, reset connection immediatly */
  if ((rv = app_worker_init_accepted (quic_session)))
    {
      QUIC_ERR ("failed to allocate fifos");
      quic_proto_on_close (pctx->ctx_index, pctx->thread_index);
      return;
    }

  app_wrk = app_worker_get (quic_session->app_wrk_index);
  if ((rv = app_worker_accept_notify (app_wrk, quic_session)))
    {
      QUIC_ERR ("failed to notify accept worker app");
      quic_proto_on_close (pctx->ctx_index, pctx->thread_index);
      return;
    }

  ctx->conn_state = QUIC_CONN_STATE_READY;
}

static int
quic_reset_connection (u64 udp_session_handle, quic_rx_packet_ctx_t * pctx)
{
  /* short header packet; potentially a dead connection. No need to check the
   * length of the incoming packet, because loop is prevented by authenticating
   * the CID (by checking node_id and thread_id). If the peer is also sending a
   * reset, then the next CID is highly likely to contain a non-authenticating
   * CID, ... */
  QUIC_DBG (2, "Sending stateless reset");
  int rv;
  quicly_datagram_t *dgram;
  session_t *udp_session;
  quicly_context_t *quicly_ctx;
  if (pctx->packet.cid.dest.plaintext.node_id != 0
      || pctx->packet.cid.dest.plaintext.thread_id != 0)
    return 0;
  quicly_ctx = quic_get_quicly_ctx_from_udp (udp_session_handle);
  dgram = quicly_send_stateless_reset (quicly_ctx, &pctx->sa, NULL,
				       &pctx->packet.cid.dest.plaintext);
  if (dgram == NULL)
    return 1;
  udp_session = session_get_from_handle (udp_session_handle);
  rv = quic_send_datagram (udp_session, dgram);
  quic_set_udp_tx_evt (udp_session);
  return rv;
}

static int
quic_process_one_rx_packet (u64 udp_session_handle, svm_fifo_t * f,
			    u32 fifo_offset, quic_rx_packet_ctx_t * pctx)
{
  size_t plen;
  u32 full_len, ret;
  u32 thread_index = vlib_get_thread_index ();
  u32 cur_deq = svm_fifo_max_dequeue (f) - fifo_offset;
  quicly_context_t *quicly_ctx;
  session_t *udp_session;
  int rv;

  ret = svm_fifo_peek (f, fifo_offset,
		       SESSION_CONN_HDR_LEN, (u8 *) & pctx->ph);
  QUIC_ASSERT (ret == SESSION_CONN_HDR_LEN);
  QUIC_ASSERT (pctx->ph.data_offset == 0);
  full_len = pctx->ph.data_length + SESSION_CONN_HDR_LEN;
  if (full_len > cur_deq)
    {
      QUIC_ERR ("Not enough data in fifo RX");
      return 1;
    }

  /* Quicly can read len bytes from the fifo at offset:
   * ph.data_offset + SESSION_CONN_HDR_LEN */
  ret = svm_fifo_peek (f, SESSION_CONN_HDR_LEN + fifo_offset,
		       pctx->ph.data_length, pctx->data);
  if (ret != pctx->ph.data_length)
    {
      QUIC_ERR ("Not enough data peeked in RX");
      return 1;
    }

  quic_increment_counter (QUIC_ERROR_RX_PACKETS, 1);
  quic_build_sockaddr (&pctx->sa, &pctx->salen, &pctx->ph.rmt_ip,
		       pctx->ph.rmt_port, pctx->ph.is_ip4);
  quicly_ctx = quic_get_quicly_ctx_from_udp (udp_session_handle);
  plen = quicly_decode_packet (quicly_ctx, &pctx->packet,
			       pctx->data, pctx->ph.data_length);

  if (plen == SIZE_MAX)
    {
      return 1;
    }

  rv = quic_find_packet_ctx (pctx, thread_index);
  if (rv == QUIC_PACKET_TYPE_RECEIVE)
    {
      pctx->ptype = QUIC_PACKET_TYPE_RECEIVE;
      if (quic_main.vnet_crypto_enabled)
	{
	  quic_ctx_t *qctx = quic_ctx_get (pctx->ctx_index, thread_index);
	  quic_crypto_decrypt_packet (qctx, pctx);
	}
      return 0;
    }
  else if (rv == QUIC_PACKET_TYPE_MIGRATE)
    {
      pctx->ptype = QUIC_PACKET_TYPE_MIGRATE;
      /*  Connection found but on wrong thread, ask move */
    }
  else if (QUICLY_PACKET_IS_LONG_HEADER (pctx->packet.octets.base[0]))
    {
      pctx->ptype = QUIC_PACKET_TYPE_ACCEPT;
      udp_session = session_get_from_handle (udp_session_handle);
      pctx->ctx_index = udp_session->opaque;
      pctx->thread_index = thread_index;
    }
  else
    {
      pctx->ptype = QUIC_PACKET_TYPE_RESET;
    }
  return 1;
}

static int
quic_udp_session_rx_callback (session_t * udp_session)
{
  /*  Read data from UDP rx_fifo and pass it to the quicly conn. */
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx = NULL, *prev_ctx = NULL;
  svm_fifo_t *f = udp_session->rx_fifo;
  u32 max_deq;
  u64 udp_session_handle = session_handle (udp_session);
  int rv = 0;
  u32 thread_index = vlib_get_thread_index ();
  u32 cur_deq, fifo_offset, max_packets, i;

  quic_rx_packet_ctx_t packets_ctx[QUIC_RCV_MAX_BATCH_PACKETS];

  if (udp_session->flags & SESSION_F_IS_MIGRATING)
    {
      QUIC_DBG (3, "RX on migrating udp session");
      return 0;
    }

rx_start:
  max_deq = svm_fifo_max_dequeue (f);
  if (max_deq == 0)
    return 0;

  fifo_offset = 0;
  max_packets = QUIC_RCV_MAX_BATCH_PACKETS;

#if CLIB_DEBUG > 0
  clib_memset (packets_ctx, 0xfa,
	       QUIC_RCV_MAX_BATCH_PACKETS * sizeof (quic_rx_packet_ctx_t));
#endif

  for (i = 0; i < max_packets; i++)
    {
      packets_ctx[i].thread_index = UINT32_MAX;
      packets_ctx[i].ctx_index = UINT32_MAX;
      packets_ctx[i].ptype = QUIC_PACKET_TYPE_DROP;

      cur_deq = max_deq - fifo_offset;
      if (cur_deq == 0)
	{
	  max_packets = i + 1;
	  break;
	}
      if (cur_deq < SESSION_CONN_HDR_LEN)
	{
	  fifo_offset = max_deq;
	  max_packets = i + 1;
	  QUIC_ERR ("Fifo %d < header size in RX", cur_deq);
	  break;
	}
      rv = quic_process_one_rx_packet (udp_session_handle, f,
				       fifo_offset, &packets_ctx[i]);
      if (packets_ctx[i].ptype != QUIC_PACKET_TYPE_MIGRATE)
	fifo_offset += SESSION_CONN_HDR_LEN + packets_ctx[i].ph.data_length;
      if (rv)
	{
	  max_packets = i + 1;
	  break;
	}
    }

  quic_crypto_batch_rx_packets (&qm->
				wrk_ctx[thread_index].crypto_context_batch);

  for (i = 0; i < max_packets; i++)
    {
      switch (packets_ctx[i].ptype)
	{
	case QUIC_PACKET_TYPE_RECEIVE:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index, thread_index);
	  rv = quicly_receive (ctx->conn, NULL, &packets_ctx[i].sa,
			       &packets_ctx[i].packet);
	  if (rv && rv != QUICLY_ERROR_PACKET_IGNORED)
	    {
	      QUIC_ERR ("quicly_receive return error %U",
			quic_format_err, rv);
	    }
	  break;
	case QUIC_PACKET_TYPE_ACCEPT:
	  quic_accept_connection (&packets_ctx[i]);
	  break;
	case QUIC_PACKET_TYPE_RESET:
	  quic_reset_connection (udp_session_handle, &packets_ctx[i]);
	  break;
	}
    }
  ctx = prev_ctx = NULL;
  for (i = 0; i < max_packets; i++)
    {
      prev_ctx = ctx;
      switch (packets_ctx[i].ptype)
	{
	case QUIC_PACKET_TYPE_RECEIVE:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  quic_check_quic_session_connected (ctx);
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  break;
	case QUIC_PACKET_TYPE_ACCEPT:
	  ctx = quic_ctx_get (packets_ctx[i].ctx_index,
			      packets_ctx[i].thread_index);
	  break;
	default:
	  continue;		/* this exits the for loop since other packet types are
				   necessarily the last in the batch */
	}
      if (ctx != prev_ctx)
	quic_send_packets (ctx);
    }

  udp_session = session_get_from_handle (udp_session_handle);	/*  session alloc might have happened */
  f = udp_session->rx_fifo;
  svm_fifo_dequeue_drop (f, fifo_offset);

  if (svm_fifo_max_dequeue (f))
    goto rx_start;

  return 0;
}

always_inline void
quic_common_get_transport_endpoint (quic_ctx_t * ctx,
				    transport_endpoint_t * tep, u8 is_lcl)
{
  session_t *udp_session;
  if (!quic_ctx_is_stream (ctx))
    {
      udp_session = session_get_from_handle (ctx->udp_session_handle);
      session_get_endpoint (udp_session, tep, is_lcl);
    }
}

static void
quic_get_transport_listener_endpoint (u32 listener_index,
				      transport_endpoint_t * tep, u8 is_lcl)
{
  quic_ctx_t *ctx;
  app_listener_t *app_listener;
  session_t *udp_listen_session;
  ctx = quic_ctx_get (listener_index, vlib_get_thread_index ());
  if (quic_ctx_is_listener (ctx))
    {
      app_listener = app_listener_get_w_handle (ctx->udp_session_handle);
      udp_listen_session = app_listener_get_session (app_listener);
      return session_get_endpoint (udp_listen_session, tep, is_lcl);
    }
  quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}

static void
quic_get_transport_endpoint (u32 ctx_index, u32 thread_index,
			     transport_endpoint_t * tep, u8 is_lcl)
{
  quic_ctx_t *ctx;
  ctx = quic_ctx_get (ctx_index, thread_index);
  quic_common_get_transport_endpoint (ctx, tep, is_lcl);
}

/* *INDENT-OFF* */
static session_cb_vft_t quic_app_cb_vft = {
  .session_accept_callback = quic_udp_session_accepted_callback,
  .session_disconnect_callback = quic_udp_session_disconnect_callback,
  .session_connected_callback = quic_udp_session_connected_callback,
  .session_reset_callback = quic_udp_session_reset_callback,
  .session_migrate_callback = quic_udp_session_migrate_callback,
  .add_segment_callback = quic_add_segment_callback,
  .del_segment_callback = quic_del_segment_callback,
  .builtin_app_rx_callback = quic_udp_session_rx_callback,
  .session_cleanup_callback = quic_udp_session_cleanup_callback,
  .app_cert_key_pair_delete_callback = quic_app_cert_key_pair_delete_callback,
};

static const transport_proto_vft_t quic_proto = {
  .connect = quic_connect,
  .close = quic_proto_on_close,
  .start_listen = quic_start_listen,
  .stop_listen = quic_stop_listen,
  .get_connection = quic_connection_get,
  .get_listener = quic_listener_get,
  .update_time = quic_update_time,
  .app_rx_evt = quic_custom_app_rx_callback,
  .custom_tx = quic_custom_tx_callback,
  .format_connection = format_quic_connection,
  .format_half_open = format_quic_half_open,
  .format_listener = format_quic_listener,
  .get_transport_endpoint = quic_get_transport_endpoint,
  .get_transport_listener_endpoint = quic_get_transport_listener_endpoint,
  .transport_options = {
    .name = "quic",
    .short_name = "Q",
    .tx_type = TRANSPORT_TX_INTERNAL,
    .service_type = TRANSPORT_SERVICE_APP,
  },
};
/* *INDENT-ON* */

static quicly_stream_open_t on_stream_open = { quic_on_stream_open };
static quicly_closed_by_peer_t on_closed_by_peer = { quic_on_closed_by_peer };
static quicly_now_t quicly_vpp_now_cb = { quic_get_time };

static void
quic_register_cipher_suite (crypto_engine_type_t type,
			    ptls_cipher_suite_t ** ciphers)
{
  quic_main_t *qm = &quic_main;
  vec_validate (qm->quic_ciphers, type);
  clib_bitmap_set (qm->available_crypto_engines, type, 1);
  qm->quic_ciphers[type] = ciphers;
}

static void
quic_update_fifo_size ()
{
  quic_main_t *qm = &quic_main;
  segment_manager_props_t *seg_mgr_props =
    application_get_segment_manager_properties (qm->app_index);

  if (!seg_mgr_props)
    {
      clib_warning
	("error while getting segment_manager_props_t, can't update fifo-size");
      return;
    }

  seg_mgr_props->tx_fifo_size = qm->udp_fifo_size;
  seg_mgr_props->rx_fifo_size = qm->udp_fifo_size;
}

static clib_error_t *
quic_init (vlib_main_t * vm)
{
  u32 segment_size = 256 << 20;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  tw_timer_wheel_1t_3w_1024sl_ov_t *tw;
  vnet_app_attach_args_t _a, *a = &_a;
  u64 options[APP_OPTIONS_N_OPTIONS];
  quic_main_t *qm = &quic_main;
  u32 num_threads, i;

  num_threads = 1 /* main thread */  + vtm->n_threads;

  clib_memset (a, 0, sizeof (*a));
  clib_memset (options, 0, sizeof (options));

  a->session_cb_vft = &quic_app_cb_vft;
  a->api_client_index = APP_INVALID_INDEX;
  a->options = options;
  a->name = format (0, "quic");
  a->options[APP_OPTIONS_SEGMENT_SIZE] = segment_size;
  a->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = segment_size;
  a->options[APP_OPTIONS_RX_FIFO_SIZE] = qm->udp_fifo_size;
  a->options[APP_OPTIONS_TX_FIFO_SIZE] = qm->udp_fifo_size;
  a->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = qm->udp_fifo_prealloc;
  a->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_IS_BUILTIN;
  a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
  a->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_IS_TRANSPORT_APP;

  if (vnet_application_attach (a))
    {
      clib_warning ("failed to attach quic app");
      return clib_error_return (0, "failed to attach quic app");
    }

  vec_validate (qm->ctx_pool, num_threads - 1);
  vec_validate (qm->wrk_ctx, num_threads - 1);

  for (i = 0; i < num_threads; i++)
    {
      qm->wrk_ctx[i].next_cid.thread_id = i;
      tw = &qm->wrk_ctx[i].timer_wheel;
      tw_timer_wheel_init_1t_3w_1024sl_ov (tw, quic_expired_timers_dispatch,
					   1e-3 /* timer period 1ms */ , ~0);
      tw->last_run_time = vlib_time_now (vlib_get_main ());
      clib_bihash_init_24_8 (&qm->wrk_ctx[i].crypto_context_hash,
			     "quic crypto contexts", 64, 128 << 10);

      qm->wrk_ctx[i].crypto_context_batch.nb_rx_packets = 0;
      qm->wrk_ctx[i].crypto_context_batch.nb_tx_packets = 0;
    }

  clib_bihash_init_16_8 (&qm->connection_hash, "quic connections", 1024,
			 4 << 20);

  qm->app_index = a->app_index;
  qm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock
    / QUIC_TSTAMP_RESOLUTION;
  qm->session_cache.super.cb = quic_encrypt_ticket_cb;

  transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto,
			       FIB_PROTOCOL_IP4, ~0);
  transport_register_protocol (TRANSPORT_PROTO_QUIC, &quic_proto,
			       FIB_PROTOCOL_IP6, ~0);

  clib_bitmap_alloc (qm->available_crypto_engines,
		     app_crypto_engine_n_types ());
  quic_register_cipher_suite (CRYPTO_ENGINE_VPP, quic_crypto_cipher_suites);
  quic_register_cipher_suite (CRYPTO_ENGINE_PICOTLS,
			      ptls_openssl_cipher_suites);
  qm->default_crypto_engine = CRYPTO_ENGINE_VPP;
  qm->max_packets_per_key = DEFAULT_MAX_PACKETS_PER_KEY;
  clib_rwlock_init (&qm->crypto_keys_quic_rw_lock);

  vnet_crypto_main_t *cm = &crypto_main;
  if (vec_len (cm->engines) == 0)
    qm->vnet_crypto_enabled = 0;
  else
    qm->vnet_crypto_enabled = 1;

  vec_free (a->name);
  return 0;
}

VLIB_INIT_FUNCTION (quic_init);

static clib_error_t *
quic_plugin_crypto_command_fn (vlib_main_t * vm,
			       unformat_input_t * input,
			       vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  if (unformat_check_input (input) == UNFORMAT_END_OF_INPUT)
    return clib_error_return (0, "unknown input '%U'",
			      format_unformat_error, input);
  if (unformat (input, "vpp"))
    qm->default_crypto_engine = CRYPTO_ENGINE_VPP;
  else if (unformat (input, "picotls"))
    qm->default_crypto_engine = CRYPTO_ENGINE_PICOTLS;
  else
    return clib_error_return (0, "unknown input '%U'",
			      format_unformat_error, input);
  return 0;
}

u64 quic_fifosize = 0;
static clib_error_t *
quic_plugin_set_fifo_size_command_fn (vlib_main_t * vm,
				      unformat_input_t * input,
				      vlib_cli_command_t * cmd)
{
  quic_main_t *qm = &quic_main;
  unformat_input_t _line_input, *line_input = &_line_input;
  uword tmp;

  if (!unformat_user (input, unformat_line_input, line_input))
    return 0;

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "%U", unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000ULL)
	    {
	      return clib_error_return
		(0, "fifo-size %llu (0x%llx) too large", tmp, tmp);
	    }
	  qm->udp_fifo_size = tmp;
	  quic_update_fifo_size ();
	}
      else
	return clib_error_return (0, "unknown input '%U'",
				  format_unformat_error, line_input);
    }

  return 0;
}

static inline u64
quic_get_counter_value (u32 event_code)
{
  vlib_node_t *n;
  vlib_main_t *vm;
  vlib_error_main_t *em;

  u32 code, i;
  u64 c, sum = 0;
  int index = 0;

  vm = vlib_get_main ();
  em = &vm->error_main;
  n = vlib_get_node (vm, quic_input_node.index);
  code = event_code;
  /* *INDENT-OFF* */
  foreach_vlib_main(({
    em = &this_vlib_main->error_main;
    i = n->error_heap_index + code;
    c = em->counters[i];

    if (i < vec_len (em->counters_last_clear))
       c -= em->counters_last_clear[i];
    sum += c;
    index++;
  }));
  /* *INDENT-ON* */
  return sum;
}

static void
quic_show_aggregated_stats (vlib_main_t * vm)
{
  u32 num_workers = vlib_num_workers ();
  quic_main_t *qm = &quic_main;
  quic_ctx_t *ctx = NULL;
  quicly_stats_t st, agg_stats;
  u32 i, nconn = 0, nstream = 0;

  clib_memset (&agg_stats, 0, sizeof (agg_stats));
  for (i = 0; i < num_workers + 1; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (ctx, qm->ctx_pool[i],
      ({
	if (quic_ctx_is_conn (ctx) && ctx->conn)
	  {
	    quicly_get_stats (ctx->conn, &st);
	    agg_stats.rtt.smoothed += st.rtt.smoothed;
	    agg_stats.rtt.minimum += st.rtt.minimum;
	    agg_stats.rtt.variance += st.rtt.variance;
	    agg_stats.num_packets.received += st.num_packets.received;
	    agg_stats.num_packets.sent += st.num_packets.sent;
	    agg_stats.num_packets.lost += st.num_packets.lost;
	    agg_stats.num_packets.ack_received += st.num_packets.ack_received;
	    agg_stats.num_bytes.received += st.num_bytes.received;
	    agg_stats.num_bytes.sent += st.num_bytes.sent;
	    nconn++;
	  }
	else if (quic_ctx_is_stream (ctx))
	  nstream++;
      }));
      /* *INDENT-ON* */
    }
  vlib_cli_output (vm, "-------- Connections --------");
  vlib_cli_output (vm, "Current:         %u", nconn);
  vlib_cli_output (vm, "Opened:          %d",
		   quic_get_counter_value (QUIC_ERROR_OPENED_CONNECTION));
  vlib_cli_output (vm, "Closed:          %d",
		   quic_get_counter_value (QUIC_ERROR_CLOSED_CONNECTION));
  vlib_cli_output (vm, "---------- Streams ----------");
  vlib_cli_output (vm, "Current:         %u", nstream);
  vlib_cli_output (vm, "Opened:          %d",
		   quic_get_counter_value (QUIC_ERROR_OPENED_STREAM));
  vlib_cli_output (vm, "Closed:          %d",
		   quic_get_counter_value (QUIC_ERROR_CLOSED_STREAM));
  vlib_cli_output (vm, "---------- Packets ----------");
  vlib_cli_output (vm, "RX Total:        %d",
		   quic_get_counter_value (QUIC_ERROR_RX_PACKETS));
  vlib_cli_output (vm, "RX 0RTT:         %d",
		   quic_get_counter_value (QUIC_ERROR_ZERO_RTT_RX_PACKETS));
  vlib_cli_output (vm, "RX 1RTT:         %d",
		   quic_get_counter_value (QUIC_ERROR_ONE_RTT_RX_PACKETS));
  vlib_cli_output (vm, "TX Total:        %d",
		   quic_get_counter_value (QUIC_ERROR_TX_PACKETS));
  vlib_cli_output (vm, "----------- Stats -----------");
  vlib_cli_output (vm, "Min      RTT     %f",
		   nconn > 0 ? agg_stats.rtt.minimum / nconn : 0);
  vlib_cli_output (vm, "Smoothed RTT     %f",
		   nconn > 0 ? agg_stats.rtt.smoothed / nconn : 0);
  vlib_cli_output (vm, "Variance on RTT  %f",
		   nconn > 0 ? agg_stats.rtt.variance / nconn : 0);
  vlib_cli_output (vm, "Packets Received %lu",
		   agg_stats.num_packets.received);
  vlib_cli_output (vm, "Packets Sent     %lu", agg_stats.num_packets.sent);
  vlib_cli_output (vm, "Packets Lost     %lu", agg_stats.num_packets.lost);
  vlib_cli_output (vm, "Packets Acks     %lu",
		   agg_stats.num_packets.ack_received);
  vlib_cli_output (vm, "RX bytes         %lu", agg_stats.num_bytes.received);
  vlib_cli_output (vm, "TX bytes         %lu", agg_stats.num_bytes.sent);
}

static u8 *
quic_format_quicly_conn_id (u8 * s, va_list * args)
{
  quicly_cid_plaintext_t *mid = va_arg (*args, quicly_cid_plaintext_t *);
  s = format (s, "C%x_%x", mid->master_id, mid->thread_id);
  return s;
}

static u8 *
quic_format_quicly_stream_id (u8 * s, va_list * args)
{
  quicly_stream_t *stream = va_arg (*args, quicly_stream_t *);
  s =
    format (s, "%U S%lx", quic_format_quicly_conn_id,
	    quicly_get_master_id (stream->conn), stream->stream_id);
  return s;
}

static u8 *
quic_format_listener_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  s = format (s, "[#%d][%x][Listener]", ctx->c_thread_index, ctx->c_c_index);
  return s;
}

static u8 *
quic_format_connection_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  quicly_stats_t quicly_stats;

  s = format (s, "[#%d][%x]", ctx->c_thread_index, ctx->c_c_index);

  if (!ctx->conn)
    {
      s = format (s, "- no conn -\n");
      return s;
    }
  s = format (s, "[%U]",
	      quic_format_quicly_conn_id, quicly_get_master_id (ctx->conn));
  quicly_get_stats (ctx->conn, &quicly_stats);

  s = format (s, "[RTT >%3d, ~%3d, V%3d, last %3d]",
	      quicly_stats.rtt.minimum, quicly_stats.rtt.smoothed,
	      quicly_stats.rtt.variance, quicly_stats.rtt.latest);
  s = format (s, " TX:%d RX:%d loss:%d ack:%d",
	      quicly_stats.num_packets.sent,
	      quicly_stats.num_packets.received,
	      quicly_stats.num_packets.lost,
	      quicly_stats.num_packets.ack_received);
  return s;
}

static u8 *
quic_format_stream_ctx (u8 * s, va_list * args)
{
  quic_ctx_t *ctx = va_arg (*args, quic_ctx_t *);
  session_t *stream_session;
  quicly_stream_t *stream = ctx->stream;
  u32 txs, rxs;

  s = format (s, "[#%d][%x]", ctx->c_thread_index, ctx->c_c_index);
  s = format (s, "[%U]", quic_format_quicly_stream_id, stream);

  stream_session = session_get_if_valid (ctx->c_s_index, ctx->c_thread_index);
  if (!stream_session)
    {
      s = format (s, "- no session -\n");
      return s;
    }
  txs = svm_fifo_max_dequeue (stream_session->tx_fifo);
  rxs = svm_fifo_max_dequeue (stream_session->rx_fifo);
  s = format (s, "[rx %d tx %d]\n", rxs, txs);
  return s;
}

static clib_error_t *
quic_show_connections_command_fn (vlib_main_t * vm,
				  unformat_input_t * input,
				  vlib_cli_command_t * cmd)
{
  unformat_input_t _line_input, *line_input = &_line_input;
  u8 show_listeners = 0, show_conn = 0, show_stream = 0;
  u32 num_workers = vlib_num_workers ();
  quic_main_t *qm = &quic_main;
  clib_error_t *error = 0;
  quic_ctx_t *ctx = NULL;

  session_cli_return_if_not_enabled ();

  if (!unformat_user (input, unformat_line_input, line_input))
    {
      quic_show_aggregated_stats (vm);
      return 0;
    }

  while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (line_input, "listener"))
	show_listeners = 1;
      else if (unformat (line_input, "conn"))
	show_conn = 1;
      else if (unformat (line_input, "stream"))
	show_stream = 1;
      else
	{
	  error = clib_error_return (0, "unknown input `%U'",
				     format_unformat_error, line_input);
	  goto done;
	}
    }

  for (int i = 0; i < num_workers + 1; i++)
    {
      /* *INDENT-OFF* */
      pool_foreach (ctx, qm->ctx_pool[i],
      ({
        if (quic_ctx_is_stream (ctx) && show_stream)
          vlib_cli_output (vm, "%U", quic_format_stream_ctx, ctx);
        else if (quic_ctx_is_listener (ctx) && show_listeners)
          vlib_cli_output (vm, "%U", quic_format_listener_ctx, ctx);
	else if (quic_ctx_is_conn (ctx) && show_conn)
          vlib_cli_output (vm, "%U", quic_format_connection_ctx, ctx);
      }));
      /* *INDENT-ON* */
    }

done:
  unformat_free (line_input);
  return error;
}

/* *INDENT-OFF* */
VLIB_CLI_COMMAND (quic_plugin_crypto_command, static) =
{
  .path = "quic set crypto api",
  .short_help = "quic set crypto api [picotls, vpp]",
  .function = quic_plugin_crypto_command_fn,
};
VLIB_CLI_COMMAND(quic_plugin_set_fifo_size_command, static)=
{
  .path = "quic set fifo-size",
  .short_help = "quic set fifo-size N[K|M|G] (default 64K)",
  .function = quic_plugin_set_fifo_size_command_fn,
};
VLIB_CLI_COMMAND(quic_show_ctx_command, static)=
{
  .path = "show quic",
  .short_help = "show quic",
  .function = quic_show_connections_command_fn,
};
VLIB_CLI_COMMAND (quic_list_crypto_context_command, static) =
{
  .path = "show quic crypto context",
  .short_help = "list quic crypto contextes",
  .function = quic_list_crypto_context_command_fn,
};
VLIB_CLI_COMMAND (quic_set_max_packets_per_key, static) =
{
  .path = "set quic max_packets_per_key",
  .short_help = "set quic max_packets_per_key 16777216",
  .function = quic_set_max_packets_per_key_fn,
};
VLIB_PLUGIN_REGISTER () =
{
  .version = VPP_BUILD_VER,
  .description = "Quic transport protocol",
  .default_disabled = 1,
};
/* *INDENT-ON* */

static clib_error_t *
quic_config_fn (vlib_main_t * vm, unformat_input_t * input)
{
  quic_main_t *qm = &quic_main;
  uword tmp;
  u32 i;

  qm->udp_fifo_size = QUIC_DEFAULT_FIFO_SIZE;
  qm->udp_fifo_prealloc = 0;
  qm->connection_timeout = QUIC_DEFAULT_CONN_TIMEOUT;
  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "fifo-size %U", unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000ULL)
	    {
	      return clib_error_return (0,
					"fifo-size %llu (0x%llx) too large",
					tmp, tmp);
	    }
	  qm->udp_fifo_size = tmp;
	}
      else if (unformat (input, "conn-timeout %u", &i))
	qm->connection_timeout = i;
      else if (unformat (input, "fifo-prealloc %u", &i))
	qm->udp_fifo_prealloc = i;
      else
	return clib_error_return (0, "unknown input '%U'",
				  format_unformat_error, input);
    }

  return 0;
}

VLIB_EARLY_CONFIG_FUNCTION (quic_config_fn, "quic");

static uword
quic_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
	      vlib_frame_t * frame)
{
  return 0;
}

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (quic_input_node) =
{
  .function = quic_node_fn,
  .name = "quic-input",
  .vector_size = sizeof (u32),
  .type = VLIB_NODE_TYPE_INTERNAL,
  .n_errors = ARRAY_LEN (quic_error_strings),
  .error_strings = quic_error_strings,
};
/* *INDENT-ON* */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */