diff options
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/Makefile.am | 4 | ||||
-rw-r--r-- | src/plugins/kubeproxy.am | 38 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp.api | 81 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp.c | 974 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp.h | 473 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp_api.c | 249 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp_cli.c | 347 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp_node.c | 839 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp_plugin_doc.md | 105 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kp_test.c | 268 | ||||
-rw-r--r-- | src/plugins/kubeproxy/kphash.h | 216 |
11 files changed, 3594 insertions, 0 deletions
diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 286fd1f9885..746b4e0662c 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -59,6 +59,10 @@ if ENABLE_IXGE_PLUGIN include ixge.am endif +if ENABLE_KUBEPROXY_PLUGIN +include kubeproxy.am +endif + if ENABLE_LB_PLUGIN include lb.am endif diff --git a/src/plugins/kubeproxy.am b/src/plugins/kubeproxy.am new file mode 100644 index 00000000000..50e7e2fc1a4 --- /dev/null +++ b/src/plugins/kubeproxy.am @@ -0,0 +1,38 @@ +# Copyright (c) 2017 Intel Corporation, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +vppapitestplugins_LTLIBRARIES += kubeproxy_test_plugin.la +vppplugins_LTLIBRARIES += kubeproxy_plugin.la + +kubeproxy_plugin_la_SOURCES = \ + kubeproxy/kp.c \ + kubeproxy/kp_node.c \ + kubeproxy/kp_cli.c \ + kubeproxy/kp_api.c + +BUILT_SOURCES += \ + kubeproxy/kp.api.h \ + kubeproxy/kp.api.json + +API_FILES += kubeproxy/kp.api + +noinst_HEADERS += \ + kubeproxy/kp.h \ + kubeproxy/kphash.h \ + kubeproxy/kp.api.h + +kubeproxy_test_plugin_la_SOURCES = \ + kubeproxy/kp_test.c \ + kubeproxy/kp_plugin.api.h + +# vi:syntax=automake diff --git a/src/plugins/kubeproxy/kp.api b/src/plugins/kubeproxy/kp.api new file mode 100644 index 00000000000..e8063c194b3 --- /dev/null +++ b/src/plugins/kubeproxy/kp.api @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2017 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +vl_api_version 1.0.0 + +/** \brief Configure Kube-proxy global parameters + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param sticky_buckets_per_core - Number of buckets *per worker thread* in the + established flow table (must be power of 2). + @param flow_timeout - Time in seconds after which, if no packet is received + for a given flow, the flow is removed from the established flow table. +*/ +autoreply define kp_conf +{ + u32 client_index; + u32 context; + u32 sticky_buckets_per_core; + u32 flow_timeout; +}; + +/** \brief Add a virtual address (or prefix) + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param ip_prefix - IP address (IPv4 in lower order 32 bits). + @param prefix_length - IP prefix length (96 + 'IPv4 prefix length' for IPv4). + @param is_ipv6 - Is IPv6 addresss. + @param port - service port; + @param target_port - Pod's port corresponding to specific service. + @param node_port - Node's port. + @param is_nat4 - DNAT is NAT44 (NAT64 otherwise). + @param new_flows_table_length - Size of the new connections flow table used + for this VIP (must be power of 2). + @param is_del - The VIP should be removed. +*/ +autoreply define kp_add_del_vip { + u32 client_index; + u32 context; + u8 ip_prefix[16]; + u8 prefix_length; + u8 is_ipv6; + u16 port; + u16 target_port; + u16 node_port; + u8 is_nat4; + u32 new_flows_table_length; + u8 is_del; +}; + +/** \brief Add a pod for a given VIP + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param vip_ip_prefix - VIP IP address (IPv4 in lower order 32 bits). + @param vip_ip_prefix - VIP IP prefix length (96 + 'IPv4 prefix length' for IPv4). + @param vip_is_ipv6 - VIP is IPv6 addresss. + @param pod_address - The pod's IP address (IPv4 in lower order 32 bits). + @param pod_is_ipv6 - Pod is IPv6 addresss. + @param is_del - The Pod should be removed. +*/ +autoreply define kp_add_del_pod { + u32 client_index; + u32 context; + u8 vip_ip_prefix[16]; + u8 vip_prefix_length; + u8 vip_is_ipv6; + u8 pod_address[16]; + u8 pod_is_ipv6; + u8 is_del; +}; diff --git a/src/plugins/kubeproxy/kp.c b/src/plugins/kubeproxy/kp.c new file mode 100644 index 00000000000..1a087e82073 --- /dev/null +++ b/src/plugins/kubeproxy/kp.c @@ -0,0 +1,974 @@ +/* + * Copyright (c) 2017 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or anated to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <kubeproxy/kp.h> +#include <vnet/plugin/plugin.h> +#include <vpp/app/version.h> +#include <vnet/api_errno.h> +#include <vnet/udp/udp.h> + +//GC runs at most once every so many seconds +#define KP_GARBAGE_RUN 60 + +//After so many seconds. It is assumed that inter-core race condition will not occur. +#define KP_CONCURRENCY_TIMEOUT 10 + +kp_main_t kp_main; + +#define kp_get_writer_lock() do {} while(__sync_lock_test_and_set (kp_main.writer_lock, 1)) +#define kp_put_writer_lock() kp_main.writer_lock[0] = 0 + +static void kp_pod_stack (kp_pod_t *pod); + +void ip46_prefix_normalize(ip46_address_t *prefix, u8 plen) +{ + if (plen == 0) { + prefix->as_u64[0] = 0; + prefix->as_u64[1] = 0; + } else if (plen <= 64) { + prefix->as_u64[0] &= clib_host_to_net_u64(0xffffffffffffffffL << (64 - plen)); + prefix->as_u64[1] = 0; + } else { + prefix->as_u64[1] &= clib_host_to_net_u64(0xffffffffffffffffL << (128 - plen)); + } + +} + +uword unformat_ip46_prefix (unformat_input_t * input, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + u8 *len = va_arg (*args, u8 *); + ip46_type_t type = va_arg (*args, ip46_type_t); + + u32 l; + if ((type != IP46_TYPE_IP6) && unformat(input, "%U/%u", unformat_ip4_address, &ip46->ip4, &l)) { + if (l > 32) + return 0; + *len = l + 96; + ip46->pad[0] = ip46->pad[1] = ip46->pad[2] = 0; + } else if ((type != IP46_TYPE_IP4) && unformat(input, "%U/%u", unformat_ip6_address, &ip46->ip6, &l)) { + if (l > 128) + return 0; + *len = l; + } else { + return 0; + } + return 1; +} + +u8 *format_ip46_prefix (u8 * s, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + u32 len = va_arg (*args, u32); //va_arg cannot use u8 or u16 + ip46_type_t type = va_arg (*args, ip46_type_t); + + int is_ip4 = 0; + if (type == IP46_TYPE_IP4) + is_ip4 = 1; + else if (type == IP46_TYPE_IP6) + is_ip4 = 0; + else + is_ip4 = (len >= 96) && ip46_address_is_ip4(ip46); + + return is_ip4 ? + format(s, "%U/%d", format_ip4_address, &ip46->ip4, len - 96): + format(s, "%U/%d", format_ip6_address, &ip46->ip6, len); +} + +const static char * const kp_dpo_nat4_ip4[] = { "kp4-nat4" , NULL }; +const static char * const kp_dpo_nat4_ip6[] = { "kp6-nat4" , NULL }; +const static char* const * const kp_dpo_nat4_nodes[DPO_PROTO_NUM] = + { + [DPO_PROTO_IP4] = kp_dpo_nat4_ip4, + [DPO_PROTO_IP6] = kp_dpo_nat4_ip6, + }; + +const static char * const kp_dpo_nat6_ip4[] = { "kp4-nat6" , NULL }; +const static char * const kp_dpo_nat6_ip6[] = { "kp6-nat6" , NULL }; +const static char* const * const kp_dpo_nat6_nodes[DPO_PROTO_NUM] = + { + [DPO_PROTO_IP4] = kp_dpo_nat6_ip4, + [DPO_PROTO_IP6] = kp_dpo_nat6_ip6, + }; + +u32 kp_hash_time_now(vlib_main_t * vm) +{ + return (u32) (vlib_time_now(vm) + 10000); +} + +u8 *format_kp_main (u8 * s, va_list * args) +{ + vlib_thread_main_t *tm = vlib_get_thread_main(); + kp_main_t *kpm = &kp_main; + s = format(s, "kp_main"); + s = format(s, " #vips: %u\n", pool_elts(kpm->vips)); + s = format(s, " #pods: %u\n", pool_elts(kpm->pods) - 1); + + u32 thread_index; + for(thread_index = 0; thread_index < tm->n_vlib_mains; thread_index++ ) { + kp_hash_t *h = kpm->per_cpu[thread_index].sticky_ht; + if (h) { + s = format(s, "core %d\n", thread_index); + s = format(s, " timeout: %ds\n", h->timeout); + s = format(s, " usage: %d / %d\n", kp_hash_elts(h, kp_hash_time_now(vlib_get_main())), kp_hash_size(h)); + } + } + + return s; +} + +static char *kp_vip_type_strings[] = { + [KP_VIP_TYPE_IP4_NAT44] = "ip4-nat44", + [KP_VIP_TYPE_IP4_NAT46] = "ip4-nat46", + [KP_VIP_TYPE_IP6_NAT64] = "ip6-nat64", + [KP_VIP_TYPE_IP6_NAT66] = "ip6-nat66", +}; + +u8 *format_kp_vip_type (u8 * s, va_list * args) +{ + kp_vip_type_t vipt = va_arg (*args, kp_vip_type_t); + u32 i; + for (i=0; i<KP_VIP_N_TYPES; i++) + if (vipt == i) + return format(s, kp_vip_type_strings[i]); + return format(s, "_WRONG_TYPE_"); +} + +uword unformat_kp_vip_type (unformat_input_t * input, va_list * args) +{ + kp_vip_type_t *vipt = va_arg (*args, kp_vip_type_t *); + u32 i; + for (i=0; i<KP_VIP_N_TYPES; i++) + if (unformat(input, kp_vip_type_strings[i])) { + *vipt = i; + return 1; + } + return 0; +} + +u8 *format_kp_vip (u8 * s, va_list * args) +{ + kp_vip_t *vip = va_arg (*args, kp_vip_t *); + return format(s, "%U %U port:%u target_port:%u node_port:%u " + "new_size:%u #pod:%u%s", + format_kp_vip_type, vip->type, + format_ip46_prefix, &vip->prefix, vip->plen, IP46_TYPE_ANY, + ntohs(vip->port), ntohs(vip->target_port), + ntohs(vip->node_port), + vip->new_flow_table_mask + 1, + pool_elts(vip->pod_indexes), + (vip->flags & KP_VIP_FLAGS_USED)?"":" removed"); +} + +u8 *format_kp_pod (u8 * s, va_list * args) +{ + kp_pod_t *pod = va_arg (*args, kp_pod_t *); + return format(s, "%U %s", format_ip46_address, + &pod->address, IP46_TYPE_ANY, + (pod->flags & KP_POD_FLAGS_USED)?"used":"removed"); +} + +u8 *format_kp_vip_detailed (u8 * s, va_list * args) +{ + kp_main_t *kpm = &kp_main; + kp_vip_t *vip = va_arg (*args, kp_vip_t *); + uword indent = format_get_indent (s); + + s = format(s, "%U %U [%u] %U port:%u target_port:%u node_port:%u%s\n" + "%U new_size:%u\n", + format_white_space, indent, + format_kp_vip_type, vip->type, + vip - kpm->vips, format_ip46_prefix, &vip->prefix, vip->plen, IP46_TYPE_ANY, + ntohs(vip->port), ntohs(vip->target_port), + ntohs(vip->node_port), + (vip->flags & KP_VIP_FLAGS_USED)?"":" removed", + format_white_space, indent, + vip->new_flow_table_mask + 1); + + //Print counters + s = format(s, "%U counters:\n", + format_white_space, indent); + u32 i; + for (i=0; i<KP_N_VIP_COUNTERS; i++) + s = format(s, "%U %s: %d\n", + format_white_space, indent, + kpm->vip_counters[i].name, + vlib_get_simple_counter(&kpm->vip_counters[i], vip - kpm->vips)); + + + s = format(s, "%U #pod:%u\n", + format_white_space, indent, + pool_elts(vip->pod_indexes)); + + //Let's count the buckets for each POD + u32 *count = 0; + vec_validate(count, pool_len(kpm->pods)); //Possibly big alloc for not much... + kp_new_flow_entry_t *nfe; + vec_foreach(nfe, vip->new_flow_table) + count[nfe->pod_index]++; + + kp_pod_t *pod; + u32 *pod_index; + pool_foreach(pod_index, vip->pod_indexes, { + pod = &kpm->pods[*pod_index]; + s = format(s, "%U %U %d buckets %d flows dpo:%u %s\n", + format_white_space, indent, + format_ip46_address, &pod->address, IP46_TYPE_ANY, + count[pod - kpm->pods], + vlib_refcount_get(&kpm->pod_refcount, pod - kpm->pods), + pod->dpo.dpoi_index, + (pod->flags & KP_POD_FLAGS_USED)?"used":" removed"); + }); + + vec_free(count); + + /* + s = format(s, "%U new flows table:\n", format_white_space, indent); + kp_new_flow_entry_t *nfe; + vec_foreach(nfe, vip->new_flow_table) { + s = format(s, "%U %d: %d\n", format_white_space, indent, nfe - vip->new_flow_table, nfe->pod_index); + } + */ + return s; +} + +typedef struct { + u32 pod_index; + u32 last; + u32 skip; +} kp_pseudorand_t; + +static int kp_pseudorand_compare(void *a, void *b) +{ + kp_pod_t *poda, *podb; + kp_main_t *kpm = &kp_main; + poda = &kpm->pods[((kp_pseudorand_t *)a)->pod_index]; + podb = &kpm->pods[((kp_pseudorand_t *)b)->pod_index]; + return memcmp(&poda->address, &podb->address, sizeof(podb->address)); +} + +static void kp_vip_garbage_collection(kp_vip_t *vip) +{ + kp_main_t *kpm = &kp_main; + ASSERT (kpm->writer_lock[0]); + + u32 now = (u32) vlib_time_now(vlib_get_main()); + if (!clib_u32_loop_gt(now, vip->last_garbage_collection + KP_GARBAGE_RUN)) + return; + + vip->last_garbage_collection = now; + kp_pod_t *pod; + u32 *pod_index; + pool_foreach(pod_index, vip->pod_indexes, { + pod = &kpm->pods[*pod_index]; + if (!(pod->flags & KP_POD_FLAGS_USED) && //Not used + clib_u32_loop_gt(now, pod->last_used + KP_CONCURRENCY_TIMEOUT) && //Not recently used + (vlib_refcount_get(&kpm->pod_refcount, pod - kpm->pods) == 0)) + { //Not referenced + fib_entry_child_remove(pod->next_hop_fib_entry_index, + pod->next_hop_child_index); + fib_table_entry_delete_index(pod->next_hop_fib_entry_index, + FIB_SOURCE_RR); + pod->next_hop_fib_entry_index = FIB_NODE_INDEX_INVALID; + + pool_put(vip->pod_indexes, pod_index); + pool_put(kpm->pods, pod); + } + }); +} + +void kp_garbage_collection() +{ + kp_main_t *kpm = &kp_main; + kp_get_writer_lock(); + kp_vip_t *vip; + u32 *to_be_removed_vips = 0, *i; + pool_foreach(vip, kpm->vips, { + kp_vip_garbage_collection(vip); + + if (!(vip->flags & KP_VIP_FLAGS_USED) && + (pool_elts(vip->pod_indexes) == 0)) { + vec_add1(to_be_removed_vips, vip - kpm->vips); + } + }); + + vec_foreach(i, to_be_removed_vips) { + vip = &kpm->vips[*i]; + pool_put(kpm->vips, vip); + pool_free(vip->pod_indexes); + } + + vec_free(to_be_removed_vips); + kp_put_writer_lock(); +} + +static void kp_vip_update_new_flow_table(kp_vip_t *vip) +{ + kp_main_t *kpm = &kp_main; + kp_new_flow_entry_t *old_table; + u32 i, *pod_index; + kp_new_flow_entry_t *new_flow_table = 0; + kp_pod_t *pod; + kp_pseudorand_t *pr, *sort_arr = 0; + u32 count; + + ASSERT (kpm->writer_lock[0]); //We must have the lock + + //Check if some POD is configured or not + i = 0; + pool_foreach(pod_index, vip->pod_indexes, { + pod = &kpm->pods[*pod_index]; + if (pod->flags & KP_POD_FLAGS_USED) { //Not used anymore + i = 1; + goto out; //Not sure 'break' works in this macro-loop + } + }); + +out: + if (i == 0) { + //Only the default. i.e. no POD + vec_validate(new_flow_table, vip->new_flow_table_mask); + for (i=0; i<vec_len(new_flow_table); i++) + new_flow_table[i].pod_index = 0; + + goto finished; + } + + //First, let's sort the PODs + sort_arr = 0; + vec_alloc(sort_arr, pool_elts(vip->pod_indexes)); + + i = 0; + pool_foreach(pod_index, vip->pod_indexes, { + pod = &kpm->pods[*pod_index]; + if (!(pod->flags & KP_POD_FLAGS_USED)) //Not used anymore + continue; + + sort_arr[i].pod_index = pod - kpm->pods; + i++; + }); + _vec_len(sort_arr) = i; + + vec_sort_with_function(sort_arr, kp_pseudorand_compare); + + //Now let's pseudo-randomly generate permutations + vec_foreach(pr, sort_arr) { + kp_pod_t *pod = &kpm->pods[pr->pod_index]; + + u64 seed = clib_xxhash(pod->address.as_u64[0] ^ + pod->address.as_u64[1]); + /* We have 2^n buckets. + * skip must be prime with 2^n. + * So skip must be odd. + * MagLev actually state that M should be prime, + * but this has a big computation cost (% operation). + * Using 2^n is more better (& operation). + */ + pr->skip = ((seed & 0xffffffff) | 1) & vip->new_flow_table_mask; + pr->last = (seed >> 32) & vip->new_flow_table_mask; + } + + //Let's create a new flow table + vec_validate(new_flow_table, vip->new_flow_table_mask); + for (i=0; i<vec_len(new_flow_table); i++) + new_flow_table[i].pod_index = ~0; + + u32 done = 0; + while (1) { + vec_foreach(pr, sort_arr) { + while (1) { + u32 last = pr->last; + pr->last = (pr->last + pr->skip) & vip->new_flow_table_mask; + if (new_flow_table[last].pod_index == ~0) { + new_flow_table[last].pod_index = pr->pod_index; + break; + } + } + done++; + if (done == vec_len(new_flow_table)) + goto finished; + } + } + + vec_free(sort_arr); + +finished: + +//Count number of changed entries + count = 0; + for (i=0; i<vec_len(new_flow_table); i++) + if (vip->new_flow_table == 0 || + new_flow_table[i].pod_index != vip->new_flow_table[i].pod_index) + count++; + + old_table = vip->new_flow_table; + vip->new_flow_table = new_flow_table; + vec_free(old_table); +} + +int kp_conf(u32 per_cpu_sticky_buckets, u32 flow_timeout) +{ + kp_main_t *kpm = &kp_main; + + if (!is_pow2(per_cpu_sticky_buckets)) + return VNET_API_ERROR_INVALID_MEMORY_SIZE; + + kp_get_writer_lock(); //Not exactly necessary but just a reminder that it exists for my future self + kpm->per_cpu_sticky_buckets = per_cpu_sticky_buckets; + kpm->flow_timeout = flow_timeout; + kp_put_writer_lock(); + return 0; +} + +static +int kp_vip_find_index_with_lock(ip46_address_t *prefix, u8 plen, u32 *vip_index) +{ + kp_main_t *kpm = &kp_main; + kp_vip_t *vip; + ASSERT (kpm->writer_lock[0]); //This must be called with the lock owned + ip46_prefix_normalize(prefix, plen); + pool_foreach(vip, kpm->vips, { + if ((vip->flags & KP_POD_FLAGS_USED) && + vip->plen == plen && + vip->prefix.as_u64[0] == prefix->as_u64[0] && + vip->prefix.as_u64[1] == prefix->as_u64[1]) { + *vip_index = vip - kpm->vips; + return 0; + } + }); + return VNET_API_ERROR_NO_SUCH_ENTRY; +} + +int kp_vip_find_index(ip46_address_t *prefix, u8 plen, u32 *vip_index) +{ + int ret; + kp_get_writer_lock(); + ret = kp_vip_find_index_with_lock(prefix, plen, vip_index); + kp_put_writer_lock(); + return ret; +} + +static int kp_pod_find_index_vip(kp_vip_t *vip, ip46_address_t *address, u32 *pod_index) +{ + kp_main_t *kpm = &kp_main; + ASSERT (kpm->writer_lock[0]); //This must be called with the lock owned + kp_pod_t *pod; + u32 *podi; + pool_foreach(podi, vip->pod_indexes, { + pod = &kpm->pods[*podi]; + if (pod->vip_index == (vip - kpm->vips) && + pod->address.as_u64[0] == address->as_u64[0] && + pod->address.as_u64[1] == address->as_u64[1]) { + *pod_index = pod - kpm->pods; + return 0; + } + }); + return -1; +} + +int kp_vip_add_pods(u32 vip_index, ip46_address_t *addresses, u32 n) +{ + kp_main_t *kpm = &kp_main; + kp_get_writer_lock(); + kp_vip_t *vip; + if (!(vip = kp_vip_get_by_index(vip_index))) { + kp_put_writer_lock(); + return VNET_API_ERROR_NO_SUCH_ENTRY; + } + + ip46_type_t type = kp_vip_is_nat4(vip)?IP46_TYPE_IP4:IP46_TYPE_IP6; + u32 *to_be_added = 0; + u32 *to_be_updated = 0; + u32 i; + u32 *ip; + kp_snat_mapping_t *m; + kp_snat4_key_t m_key4; + clib_bihash_kv_8_8_t kv; + + //Sanity check + while (n--) { + + if (!kp_pod_find_index_vip(vip, &addresses[n], &i)) { + if (kpm->pods[i].flags & KP_POD_FLAGS_USED) { + vec_free(to_be_added); + vec_free(to_be_updated); + kp_put_writer_lock(); + return VNET_API_ERROR_VALUE_EXIST; + } + vec_add1(to_be_updated, i); + goto next; + } + + if (ip46_address_type(&addresses[n]) != type) { + vec_free(to_be_added); + vec_free(to_be_updated); + kp_put_writer_lock(); + return VNET_API_ERROR_INVALID_ADDRESS_FAMILY; + } + + if (n) { + u32 n2 = n; + while(n2--) //Check for duplicates + if (addresses[n2].as_u64[0] == addresses[n].as_u64[0] && + addresses[n2].as_u64[1] == addresses[n].as_u64[1]) + goto next; + } + + vec_add1(to_be_added, n); + +next: + continue; + } + + //Update reused PODs + vec_foreach(ip, to_be_updated) { + kpm->pods[*ip].flags = KP_POD_FLAGS_USED; + } + vec_free(to_be_updated); + + //Create those who have to be created + vec_foreach(ip, to_be_added) { + kp_pod_t *pod; + u32 *pod_index; + pool_get(kpm->pods, pod); + pod->address = addresses[*ip]; + pod->flags = KP_POD_FLAGS_USED; + pod->vip_index = vip_index; + pool_get(vip->pod_indexes, pod_index); + *pod_index = pod - kpm->pods; + + /* + * become a child of the FIB entry + * so we are informed when its forwarding changes + */ + fib_prefix_t nh = {}; + if (kp_vip_is_nat4(vip)) { + nh.fp_addr.ip4 = pod->address.ip4; + nh.fp_len = 32; + nh.fp_proto = FIB_PROTOCOL_IP4; + } else { + nh.fp_addr.ip6 = pod->address.ip6; + nh.fp_len = 128; + nh.fp_proto = FIB_PROTOCOL_IP6; + } + + pod->next_hop_fib_entry_index = + fib_table_entry_special_add(0, + &nh, + FIB_SOURCE_RR, + FIB_ENTRY_FLAG_NONE); + pod->next_hop_child_index = + fib_entry_child_add(pod->next_hop_fib_entry_index, + kpm->fib_node_type, + pod - kpm->pods); + + kp_pod_stack(pod); + + /* Add SNAT static mapping */ + pool_get (kpm->snat_mappings, m); + memset (m, 0, sizeof (*m)); + if (kp_vip_is_nat4(vip)) { + m_key4.addr = pod->address.ip4; + m_key4.port = vip->target_port; + m_key4.protocol = 0; + m_key4.fib_index = 0; + + m->vip.ip4 = vip->prefix.ip4;; + m->node_ip.ip4.as_u32 = 0; + m->pod_ip.ip4 = pod->address.ip4; + m->vip_is_ipv6 = 0; + m->node_ip_is_ipv6 = 0; + m->pod_ip_is_ipv6 = 0; + m->port = vip->port; + m->node_port = vip->node_port; + m->target_port = vip->target_port; + m->vrf_id = 0; + m->fib_index = 0; + + kv.key = m_key4.as_u64; + kv.value = m - kpm->snat_mappings; + clib_bihash_add_del_8_8(&kpm->mapping_by_pod, &kv, 1); + } else { + /* TBD */ + } + + } + vec_free(to_be_added); + + //Recompute flows + kp_vip_update_new_flow_table(vip); + + //Garbage collection maybe + kp_vip_garbage_collection(vip); + + kp_put_writer_lock(); + return 0; +} + +int kp_vip_del_pods_withlock(u32 vip_index, ip46_address_t *addresses, u32 n) +{ + kp_main_t *kpm = &kp_main; + u32 now = (u32) vlib_time_now(vlib_get_main()); + u32 *ip = 0; + + kp_vip_t *vip; + if (!(vip = kp_vip_get_by_index(vip_index))) { + return VNET_API_ERROR_NO_SUCH_ENTRY; + } + + u32 *indexes = NULL; + while (n--) { + u32 i; + if (kp_pod_find_index_vip(vip, &addresses[n], &i)) { + vec_free(indexes); + return VNET_API_ERROR_NO_SUCH_ENTRY; + } + + if (n) { //Check for duplicates + u32 n2 = n - 1; + while(n2--) { + if (addresses[n2].as_u64[0] == addresses[n].as_u64[0] && + addresses[n2].as_u64[1] == addresses[n].as_u64[1]) + goto next; + } + } + + vec_add1(indexes, i); +next: + continue; + } + + //Garbage collection maybe + kp_vip_garbage_collection(vip); + + if (indexes != NULL) { + vec_foreach(ip, indexes) { + kpm->pods[*ip].flags &= ~KP_POD_FLAGS_USED; + kpm->pods[*ip].last_used = now; + } + + //Recompute flows + kp_vip_update_new_flow_table(vip); + } + + vec_free(indexes); + return 0; +} + +int kp_vip_del_pods(u32 vip_index, ip46_address_t *addresses, u32 n) +{ + kp_get_writer_lock(); + int ret = kp_vip_del_pods_withlock(vip_index, addresses, n); + kp_put_writer_lock(); + return ret; +} + +/** + * Add the VIP adjacency to the ip4 or ip6 fib + */ +static void kp_vip_add_adjacency(kp_main_t *kpm, kp_vip_t *vip) +{ + dpo_proto_t proto = 0; + dpo_id_t dpo = DPO_INVALID; + fib_prefix_t pfx = {}; + if (kp_vip_is_ip4(vip)) { + pfx.fp_addr.ip4 = vip->prefix.ip4; + pfx.fp_len = vip->plen - 96; + pfx.fp_proto = FIB_PROTOCOL_IP4; + proto = DPO_PROTO_IP4; + } else { + pfx.fp_addr.ip6 = vip->prefix.ip6; + pfx.fp_len = vip->plen; + pfx.fp_proto = FIB_PROTOCOL_IP6; + proto = DPO_PROTO_IP6; + } + dpo_set(&dpo, kp_vip_is_nat4(vip)?kpm->dpo_nat4_type:kpm->dpo_nat6_type, + proto, vip - kpm->vips); + fib_table_entry_special_dpo_add(0, + &pfx, + FIB_SOURCE_PLUGIN_HI, + FIB_ENTRY_FLAG_EXCLUSIVE, + &dpo); + dpo_reset(&dpo); +} + +/** + * Deletes the adjacency podsociated with the VIP + */ +static void kp_vip_del_adjacency(kp_main_t *kpm, kp_vip_t *vip) +{ + fib_prefix_t pfx = {}; + if (kp_vip_is_ip4(vip)) { + pfx.fp_addr.ip4 = vip->prefix.ip4; + pfx.fp_len = vip->plen - 96; + pfx.fp_proto = FIB_PROTOCOL_IP4; + } else { + pfx.fp_addr.ip6 = vip->prefix.ip6; + pfx.fp_len = vip->plen; + pfx.fp_proto = FIB_PROTOCOL_IP6; + } + fib_table_entry_special_remove(0, &pfx, FIB_SOURCE_PLUGIN_HI); +} + +int kp_vip_add(ip46_address_t *prefix, u8 plen, kp_vip_type_t type, + u32 new_length, u32 *vip_index, + u16 port, u16 target_port, u16 node_port) +{ + kp_main_t *kpm = &kp_main; + vlib_main_t *vm = kpm->vlib_main; + kp_vip_t *vip; + u32 key, *key_copy; + uword * entry; + + kp_get_writer_lock(); + ip46_prefix_normalize(prefix, plen); + + if (!kp_vip_find_index_with_lock(prefix, plen, vip_index)) { + kp_put_writer_lock(); + return VNET_API_ERROR_VALUE_EXIST; + } + + if (!is_pow2(new_length)) { + kp_put_writer_lock(); + return VNET_API_ERROR_INVALID_MEMORY_SIZE; + } + + if (ip46_prefix_is_ip4(prefix, plen) && + (type != KP_VIP_TYPE_IP4_NAT44) && + (type != KP_VIP_TYPE_IP4_NAT46)) + return VNET_API_ERROR_INVALID_ADDRESS_FAMILY; + + + //Allocate + pool_get(kpm->vips, vip); + + //Init + vip->prefix = *prefix; + vip->plen = plen; + vip->port = clib_host_to_net_u16(port); + vip->target_port = clib_host_to_net_u16(target_port); + vip->node_port = clib_host_to_net_u16(node_port); + vip->last_garbage_collection = (u32) vlib_time_now(vlib_get_main()); + vip->type = type; + vip->flags = KP_VIP_FLAGS_USED; + vip->pod_indexes = 0; + + //Validate counters + u32 i; + for (i = 0; i < KP_N_VIP_COUNTERS; i++) { + vlib_validate_simple_counter(&kpm->vip_counters[i], vip - kpm->vips); + vlib_zero_simple_counter(&kpm->vip_counters[i], vip - kpm->vips); + } + + //Configure new flow table + vip->new_flow_table_mask = new_length - 1; + vip->new_flow_table = 0; + + //Create a new flow hash table full of the default entry + kp_vip_update_new_flow_table(vip); + + //Create adjacency to direct traffic + kp_vip_add_adjacency(kpm, vip); + + //Create maping from nodeport to vip_index + key = clib_host_to_net_u16(node_port); + entry = hash_get_mem (kpm->nodeport_by_key, &key); + if (entry) + return VNET_API_ERROR_VALUE_EXIST; + + key_copy = clib_mem_alloc (sizeof (*key_copy)); + clib_memcpy (key_copy, &key, sizeof (*key_copy)); + hash_set_mem (kpm->nodeport_by_key, key_copy, vip - kpm->vips); + + /* receive packets destined to NodeIP:NodePort */ + udp_register_dst_port (vm, node_port, kp4_nodeport_node.index, 1); + udp_register_dst_port (vm, node_port, kp6_nodeport_node.index, 0); + + //Return result + *vip_index = vip - kpm->vips; + + kp_put_writer_lock(); + return 0; +} + +int kp_vip_del(u32 vip_index) +{ + kp_main_t *kpm = &kp_main; + kp_vip_t *vip; + kp_get_writer_lock(); + if (!(vip = kp_vip_get_by_index(vip_index))) { + kp_put_writer_lock(); + return VNET_API_ERROR_NO_SUCH_ENTRY; + } + + //FIXME: This operation is actually not working + //We will need to remove state before performing this. + + { + //Remove all PODs + ip46_address_t *pods = 0; + kp_pod_t *pod; + u32 *pod_index; + pool_foreach(pod_index, vip->pod_indexes, { + pod = &kpm->pods[*pod_index]; + vec_add1(pods, pod->address); + }); + if (vec_len(pods)) + kp_vip_del_pods_withlock(vip_index, pods, vec_len(pods)); + vec_free(pods); + } + + //Delete adjacency + kp_vip_del_adjacency(kpm, vip); + + //Set the VIP pod unused + vip->flags &= ~KP_VIP_FLAGS_USED; + + kp_put_writer_lock(); + return 0; +} + +/* *INDENT-OFF* */ +VLIB_PLUGIN_REGISTER () = { + .version = VPP_BUILD_VER, + .description = "kube-proxy data plane", +}; +/* *INDENT-ON* */ + +u8 *format_kp_dpo (u8 * s, va_list * va) +{ + index_t index = va_arg (*va, index_t); + CLIB_UNUSED(u32 indent) = va_arg (*va, u32); + kp_main_t *kpm = &kp_main; + kp_vip_t *vip = pool_elt_at_index (kpm->vips, index); + return format (s, "%U", format_kp_vip, vip); +} + +static void kp_dpo_lock (dpo_id_t *dpo) {} +static void kp_dpo_unlock (dpo_id_t *dpo) {} + +static fib_node_t * +kp_fib_node_get_node (fib_node_index_t index) +{ + kp_main_t *kpm = &kp_main; + kp_pod_t *pod = pool_elt_at_index (kpm->pods, index); + return (&pod->fib_node); +} + +static void +kp_fib_node_last_lock_gone (fib_node_t *node) +{ +} + +static kp_pod_t * +kp_pod_from_fib_node (fib_node_t *node) +{ + return ((kp_pod_t*)(((char*)node) - + STRUCT_OFFSET_OF(kp_pod_t, fib_node))); +} + +static void +kp_pod_stack (kp_pod_t *pod) +{ + kp_main_t *kpm = &kp_main; + kp_vip_t *vip = &kpm->vips[pod->vip_index]; + dpo_stack(kp_vip_is_nat4(vip)?kpm->dpo_nat4_type:kpm->dpo_nat6_type, + kp_vip_is_ip4(vip)?DPO_PROTO_IP4:DPO_PROTO_IP6, + &pod->dpo, + fib_entry_contribute_ip_forwarding( + pod->next_hop_fib_entry_index)); +} + +static fib_node_back_walk_rc_t +kp_fib_node_back_walk_notify (fib_node_t *node, + fib_node_back_walk_ctx_t *ctx) +{ + kp_pod_stack(kp_pod_from_fib_node(node)); + return (FIB_NODE_BACK_WALK_CONTINUE); +} + +int kp_nat4_interface_add_del (u32 sw_if_index, int is_del) +{ + if (is_del) + { + vnet_feature_enable_disable ("ip4-unicast", "kp-nat4-in2out", + sw_if_index, 0, 0, 0); + } + else + { + vnet_feature_enable_disable ("ip4-unicast", "kp-nat4-in2out", + sw_if_index, 1, 0, 0); + } + + return 0; +} + +clib_error_t * +kp_init (vlib_main_t * vm) +{ + vlib_thread_main_t *tm = vlib_get_thread_main (); + kp_main_t *kpm = &kp_main; + kpm->vnet_main = vnet_get_main (); + kpm->vlib_main = vm; + + kp_pod_t *default_pod; + fib_node_vft_t kp_fib_node_vft = { + .fnv_get = kp_fib_node_get_node, + .fnv_last_lock = kp_fib_node_last_lock_gone, + .fnv_back_walk = kp_fib_node_back_walk_notify, + }; + dpo_vft_t kp_vft = { + .dv_lock = kp_dpo_lock, + .dv_unlock = kp_dpo_unlock, + .dv_format = format_kp_dpo, + }; + + kpm->vips = 0; + kpm->per_cpu = 0; + vec_validate(kpm->per_cpu, tm->n_vlib_mains - 1); + kpm->writer_lock = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES); + kpm->writer_lock[0] = 0; + kpm->per_cpu_sticky_buckets = KP_DEFAULT_PER_CPU_STICKY_BUCKETS; + kpm->flow_timeout = KP_DEFAULT_FLOW_TIMEOUT; + kpm->dpo_nat4_type = dpo_register_new_type(&kp_vft, kp_dpo_nat4_nodes); + kpm->dpo_nat6_type = dpo_register_new_type(&kp_vft, kp_dpo_nat6_nodes); + kpm->fib_node_type = fib_node_register_new_type(&kp_fib_node_vft); + + //Init POD reference counters + vlib_refcount_init(&kpm->pod_refcount); + + //Allocate and init default POD. + kpm->pods = 0; + pool_get(kpm->pods, default_pod); + default_pod->flags = 0; + default_pod->dpo.dpoi_next_node = KP_NEXT_DROP; + default_pod->vip_index = ~0; + default_pod->address.ip6.as_u64[0] = 0xffffffffffffffffL; + default_pod->address.ip6.as_u64[1] = 0xffffffffffffffffL; + + kpm->nodeport_by_key + = hash_create_mem (0, sizeof(u16), sizeof (uword)); + + clib_bihash_init_8_8 (&kpm->mapping_by_pod, + "mapping_by_pod", KP_MAPPING_BUCKETS, + KP_MAPPING_MEMORY_SIZE); + +#define _(a,b,c) kpm->vip_counters[c].name = b; + kp_foreach_vip_counter +#undef _ + return NULL; +} + +VLIB_INIT_FUNCTION (kp_init); diff --git a/src/plugins/kubeproxy/kp.h b/src/plugins/kubeproxy/kp.h new file mode 100644 index 00000000000..243c002833f --- /dev/null +++ b/src/plugins/kubeproxy/kp.h @@ -0,0 +1,473 @@ +/* + * Copyright (c) 2017 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * kp-plugin implements a MagLev-like load balancer. + * http://research.google.com/pubs/pub44824.html + * + * It hasn't been tested for interoperability with the original MagLev + * but intends to provide similar functionality. + * The kube-proxy receives traffic destined to VIP (Virtual IP) + * addresses from one or multiple(ECMP) routers. + * The kube-proxy tunnels the traffic toward many application servers + * ensuring session stickyness (i.e. that a single sessions is tunneled + * towards a single application server). + * + */ + +#ifndef KP_PLUGIN_KP_KP_H_ +#define KP_PLUGIN_KP_KP_H_ + +#include <vnet/util/refcount.h> +#include <vnet/vnet.h> +#include <vnet/ip/ip.h> +#include <vnet/dpo/dpo.h> +#include <vnet/fib/fib_table.h> +#include <vppinfra/bihash_8_8.h> + +#include <kubeproxy/kphash.h> + +#define KP_DEFAULT_PER_CPU_STICKY_BUCKETS 1 << 10 +#define KP_DEFAULT_FLOW_TIMEOUT 40 +#define KP_MAPPING_BUCKETS 1024 +#define KP_MAPPING_MEMORY_SIZE 64<<20 + +typedef enum { + KP_NEXT_DROP, + KP_N_NEXT, +} kp_next_t; + +typedef enum { + KP_NAT4_IN2OUT_NEXT_DROP, + KP_NAT4_IN2OUT_NEXT_LOOKUP, + KP_NAT4_IN2OUT_N_NEXT, +} kp_nat4_in2out_next_t; + +#define foreach_kp_nat_in2out_error \ +_(UNSUPPORTED_PROTOCOL, "Unsupported protocol") \ +_(IN2OUT_PACKETS, "Good in2out packets processed") \ +_(NO_TRANSLATION, "No translation") + +typedef enum { +#define _(sym,str) KP_NAT_IN2OUT_ERROR_##sym, + foreach_kp_nat_in2out_error +#undef _ + KP_NAT_IN2OUT_N_ERROR, +} kp_nat_in2out_error_t; + +/** + * kube-proxy supports three types of service + */ +typedef enum { + KP_SVR_TYPE_VIP_PORT, + KP_SVR_TYPE_NODEIP_PORT, + KP_SVR_TYPE_EXT_LB, + KP_SVR_N_TYPES, +} kp_svr_type_t; + +typedef enum { + KP_NODEPORT_NEXT_IP4_NAT4, + KP_NODEPORT_NEXT_IP4_NAT6, + KP_NODEPORT_NEXT_IP6_NAT4, + KP_NODEPORT_NEXT_IP6_NAT6, + KP_NODEPORT_NEXT_DROP, + KP_NODEPORT_N_NEXT, +} kp_nodeport_next_t; + +/** + * Each VIP is configured with a set of PODs + */ +typedef struct { + /** + * Registration to FIB event. + */ + fib_node_t fib_node; + + /** + * Destination address used to transfer traffic towards to that POD. + * The address is also used pod ID and pseudo-random + * seed for the load-balancing process. + */ + ip46_address_t address; + + /** + * PODs are indexed by address and VIP Index. + * Which means there will be duplicated if the same server + * address is used for multiple VIPs. + */ + u32 vip_index; + + /** + * Some per-POD flags. + * For now only KP_POD_FLAGS_USED is defined. + */ + u8 flags; + +#define KP_POD_FLAGS_USED 0x1 + + /** + * Rotating timestamp of when KP_POD_FLAGS_USED flag was last set. + * + * POD removal is based on garbage collection and reference counting. + * When an POD is removed, there is a race between configuration core + * and worker cores which may still add a reference while it should not + * be used. This timestamp is used to not remove the POD while a race condition + * may happen. + */ + u32 last_used; + + /** + * The FIB entry index for the next-hop + */ + fib_node_index_t next_hop_fib_entry_index; + + /** + * The child index on the FIB entry + */ + u32 next_hop_child_index; + + /** + * The next DPO in the graph to follow. + */ + dpo_id_t dpo; + +} kp_pod_t; + +format_function_t format_kp_pod; + +typedef struct { + u32 pod_index; +} kp_new_flow_entry_t; + +#define kp_foreach_vip_counter \ + _(NEXT_PACKET, "packet from existing sessions", 0) \ + _(FIRST_PACKET, "first session packet", 1) \ + _(UNTRACKED_PACKET, "untracked packet", 2) \ + _(NO_SERVER, "no server configured", 3) + +typedef enum { +#define _(a,b,c) KP_VIP_COUNTER_##a = c, + kp_foreach_vip_counter +#undef _ + KP_N_VIP_COUNTERS +} kp_vip_counter_t; + +/** + * kube-proxy supports IPv4 and IPv6 traffic + * and NAT4 and NAT6. + */ +typedef enum { + KP_VIP_TYPE_IP4_NAT44, + KP_VIP_TYPE_IP4_NAT46, + KP_VIP_TYPE_IP6_NAT64, + KP_VIP_TYPE_IP6_NAT66, + KP_VIP_N_TYPES, +} kp_vip_type_t; + +format_function_t format_kp_vip_type; +unformat_function_t unformat_kp_vip_type; + +/** + * Load balancing service is provided per VIP. + * In this data model, a VIP can be a whole prefix. + * But load balancing only + * occurs on a per-source-address/port basis. Meaning that if a given source + * reuses the same port for multiple destinations within the same VIP, + * they will be considered as a single flow. + */ +typedef struct { + + //Runtime + + /** + * Vector mapping (flow-hash & new_connect_table_mask) to POD index. + * This is used for new flows. + */ + kp_new_flow_entry_t *new_flow_table; + + /** + * New flows table length - 1 + * (length MUST be a power of 2) + */ + u32 new_flow_table_mask; + + /** + * last time garbage collection was run to free the PODs. + */ + u32 last_garbage_collection; + + //Not runtime + + /** + * A Virtual IP represents a given service delivered + * by a set of PODs. It can be a single + * address or a prefix. + * IPv4 prefixes are encoded using IPv4-in-IPv6 embedded address + * (i.e. ::/96 prefix). + */ + ip46_address_t prefix; + + /** + * The VIP prefix length. + * In case of IPv4, plen = 96 + ip4_plen. + */ + u8 plen; + + /** + * Service port. network byte order + */ + u16 port; + + /** + * Pod's port corresponding to specific service. network byte order + */ + u16 target_port; + + /** + * Node's port, can access service via NodeIP:node_port. network byte order + */ + u16 node_port; + + + /** + * The type of traffic for this. + * KP_TYPE_UNDEFINED if unknown. + */ + kp_vip_type_t type; + + /** + * Flags related to this VIP. + * KP_VIP_FLAGS_USED means the VIP is active. + * When it is not set, the VIP in the process of being removed. + * We cannot immediately remove a VIP because the VIP index still may be stored + * in the adjacency index. + */ + u8 flags; +#define KP_VIP_FLAGS_USED 0x1 + + /** + * Pool of POD indexes used for this VIP. + * This also includes PODs that have been removed (but are still referenced). + */ + u32 *pod_indexes; + +} kp_vip_t; + +/* + * mapping from nodeport to vip_index + */ +typedef struct { + + u32 vip_index; + +} kp_nodeport_t; + +#define kp_vip_is_ip4(vip) ((vip)->type == KP_VIP_TYPE_IP4_NAT44 \ + || (vip)->type == KP_VIP_TYPE_IP4_NAT46) +#define kp_vip_is_nat4(vip) ((vip)->type == KP_VIP_TYPE_IP6_NAT64 \ + || (vip)->type == KP_VIP_TYPE_IP4_NAT44) +format_function_t format_kp_vip; +format_function_t format_kp_vip_detailed; + +#define foreach_kp_nat_protocol \ + _(UDP, 0, udp, "udp") \ + _(TCP, 1, tcp, "tcp") + +typedef enum { +#define _(N, i, n, s) KP_NAT_PROTOCOL_##N = i, + foreach_kp_nat_protocol +#undef _ +} kp_nat_protocol_t; + +always_inline u32 +kp_ip_proto_to_nat_proto (u8 ip_proto) +{ + u32 nat_proto = ~0; + + nat_proto = (ip_proto == IP_PROTOCOL_UDP) ? KP_NAT_PROTOCOL_UDP : nat_proto; + nat_proto = (ip_proto == IP_PROTOCOL_TCP) ? KP_NAT_PROTOCOL_TCP : nat_proto; + + return nat_proto; +} + +/* Key for Pod's egress SNAT */ +typedef struct { + union + { + struct + { + ip4_address_t addr; + u16 port; + u16 protocol:3, + fib_index:13; + }; + u64 as_u64; + }; +} kp_snat4_key_t; + +typedef struct +{ + ip6_address_t prefix; + u8 plen; + u32 vrf_id; + u32 fib_index; +} kp_snat6_key_t; + +typedef struct { + kp_svr_type_t svr_type; + ip46_address_t vip; + ip46_address_t node_ip; + ip46_address_t pod_ip; + u8 vip_is_ipv6; + u8 node_ip_is_ipv6; + u8 pod_ip_is_ipv6; + u16 port; /* Network byte order */ + u16 node_port; /* Network byte order */ + u16 target_port; /* Network byte order */ + u32 vrf_id; + u32 fib_index; +} kp_snat_mapping_t; + +typedef struct { + /** + * Each CPU has its own sticky flow hash table. + * One single table is used for all VIPs. + */ + kp_hash_t *sticky_ht; + +} kp_per_cpu_t; + +typedef struct { + /** + * Pool of all Virtual IPs + */ + kp_vip_t *vips; + + /** + * Pool of PODs. + * PODs are referenced by address and vip index. + * The first element (index 0) is special and used only to fill + * new_flow_tables when no POD has been configured. + */ + kp_pod_t *pods; + + /** + * Each POD has an associated reference counter. + * As pods[0] has a special meaning, its associated counter + * starts at 0 and is decremented instead. i.e. do not use it. + */ + vlib_refcount_t pod_refcount; + + /* hash lookup vip_index by key: {u16: nodeport} */ + uword * nodeport_by_key; + + + /** + * Some global data is per-cpu + */ + kp_per_cpu_t *per_cpu; + + /** + * Node next index for IP adjacencies, for each of the traffic types. + */ + u32 ip_lookup_next_index[KP_VIP_N_TYPES]; + + /** + * Number of buckets in the per-cpu sticky hash table. + */ + u32 per_cpu_sticky_buckets; + + /** + * Flow timeout in seconds. + */ + u32 flow_timeout; + + /** + * Per VIP counter + */ + vlib_simple_counter_main_t vip_counters[KP_N_VIP_COUNTERS]; + + /** + * DPO used to send packet from IP4/6 lookup to KP node. + */ + dpo_type_t dpo_nat4_type; + dpo_type_t dpo_nat6_type; + + /** + * Node type for registering to fib changes. + */ + fib_node_type_t fib_node_type; + + /* Find a static mapping by pod IP : target_port */ + clib_bihash_8_8_t mapping_by_pod; + + /* Static mapping pool */ + kp_snat_mapping_t * snat_mappings; + + /** + * API dynamically registered base ID. + */ + u16 msg_id_base; + + volatile u32 *writer_lock; + + /* convenience */ + vlib_main_t *vlib_main; + vnet_main_t *vnet_main; +} kp_main_t; + +#define ip46_address_type(ip46) (ip46_address_is_ip4(ip46)?IP46_TYPE_IP4:IP46_TYPE_IP6) +#define ip46_prefix_is_ip4(ip46, len) ((len) >= 96 && ip46_address_is_ip4(ip46)) +#define ip46_prefix_type(ip46, len) (ip46_prefix_is_ip4(ip46, len)?IP46_TYPE_IP4:IP46_TYPE_IP6) + +void ip46_prefix_normalize(ip46_address_t *prefix, u8 plen); +uword unformat_ip46_prefix (unformat_input_t * input, va_list * args); +u8 *format_ip46_prefix (u8 * s, va_list * args); + + +extern kp_main_t kp_main; +extern vlib_node_registration_t kp4_node; +extern vlib_node_registration_t kp6_node; +extern vlib_node_registration_t kp4_nodeport_node; +extern vlib_node_registration_t kp6_nodeport_node; +extern vlib_node_registration_t kp_nat4_in2out_node; + +/** + * Fix global kube-proxy parameters. + * @return 0 on success. VNET_KP_ERR_XXX on error + */ +int kp_conf(u32 sticky_buckets, u32 flow_timeout); + +int kp_vip_add(ip46_address_t *prefix, u8 plen, kp_vip_type_t type, + u32 new_length, u32 *vip_index, + u16 port, u16 target_port, u16 node_port); +int kp_vip_del(u32 vip_index); + +int kp_vip_find_index(ip46_address_t *prefix, u8 plen, u32 *vip_index); + +#define kp_vip_get_by_index(index) (pool_is_free_index(kp_main.vips, index)?NULL:pool_elt_at_index(kp_main.vips, index)) + +int kp_vip_add_pods(u32 vip_index, ip46_address_t *addresses, u32 n); +int kp_vip_del_pods(u32 vip_index, ip46_address_t *addresses, u32 n); + +u32 kp_hash_time_now(vlib_main_t * vm); + +void kp_garbage_collection(); + +int kp_nat4_interface_add_del (u32 sw_if_index, int is_del); + +format_function_t format_kp_main; + +#endif /* KP_PLUGIN_KP_KP_H_ */ diff --git a/src/plugins/kubeproxy/kp_api.c b/src/plugins/kubeproxy/kp_api.c new file mode 100644 index 00000000000..56b247a395e --- /dev/null +++ b/src/plugins/kubeproxy/kp_api.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2016 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <kubeproxy/kp.h> + +#include <vppinfra/byte_order.h> +#include <vlibapi/api.h> +#include <vlibmemory/api.h> + +#define vl_msg_id(n,h) n, +typedef enum { +#include <kubeproxy/kp.api.h> + /* We'll want to know how many messages IDs we need... */ + VL_MSG_FIRST_AVAILABLE, +} vl_msg_id_t; +#undef vl_msg_id + + +/* define message structures */ +#define vl_typedefs +#include <kubeproxy/kp.api.h> +#undef vl_typedefs + +/* define generated endian-swappers */ +#define vl_endianfun +#include <kubeproxy/kp.api.h> +#undef vl_endianfun + +#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) + +/* Get the API version number */ +#define vl_api_version(n,v) static u32 api_version=(v); +#include <kubeproxy/kp.api.h> +#undef vl_api_version + +#define vl_msg_name_crc_list +#include <kubeproxy/kp.api.h> +#undef vl_msg_name_crc_list + + +#define REPLY_MSG_ID_BASE kpm->msg_id_base +#include <vlibapi/api_helper_macros.h> + +static void +setup_message_id_table (kp_main_t * kpm, api_main_t * am) +{ +#define _(id,n,crc) \ + vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + kpm->msg_id_base); + foreach_vl_msg_name_crc_kp; +#undef _ +} + +/* Macro to finish up custom dump fns */ +#define FINISH \ + vec_add1 (s, 0); \ + vl_print (handle, (char *)s); \ + vec_free (s); \ + return handle; + +static void +vl_api_kp_conf_t_handler +(vl_api_kp_conf_t * mp) +{ + kp_main_t *kpm = &kp_main; + vl_api_kp_conf_reply_t * rmp; + int rv = 0; + + rv = kp_conf(mp->sticky_buckets_per_core, + mp->flow_timeout); + + REPLY_MACRO (VL_API_KP_CONF_REPLY); +} + +static void *vl_api_kp_conf_t_print +(vl_api_kp_conf_t *mp, void * handle) +{ + u8 * s; + s = format (0, "SCRIPT: kp_conf "); + s = format (s, "%u ", mp->sticky_buckets_per_core); + s = format (s, "%u ", mp->flow_timeout); + FINISH; +} + + +static void +vl_api_kp_add_del_vip_t_handler +(vl_api_kp_add_del_vip_t * mp) +{ + kp_main_t *kpm = &kp_main; + vl_api_kp_conf_reply_t * rmp; + int rv = 0; + ip46_address_t prefix; + u8 prefix_length = mp->prefix_length; + + if (mp->is_ipv6 == 0) + { + prefix_length += 96; + memcpy(&prefix.ip4, mp->ip_prefix, sizeof(prefix.ip4)); + prefix.pad[0] = prefix.pad[1] = prefix.pad[2] = 0; + } + else + { + memcpy(&prefix.ip6, mp->ip_prefix, sizeof(prefix.ip6)); + } + + if (mp->is_del) { + u32 vip_index; + if (!(rv = kp_vip_find_index(&prefix, prefix_length, &vip_index))) + rv = kp_vip_del(vip_index); + } else { + u32 vip_index; + kp_vip_type_t type; + if (mp->is_ipv6 == 0) { + type = mp->is_nat4?KP_VIP_TYPE_IP4_NAT44:KP_VIP_TYPE_IP4_NAT46; + } else { + type = mp->is_nat4?KP_VIP_TYPE_IP6_NAT64:KP_VIP_TYPE_IP6_NAT66; + } + + rv = kp_vip_add(&prefix, prefix_length, type, + ntohl(mp->new_flows_table_length), &vip_index, + ntohs(mp->port), ntohs(mp->target_port), + ntohs(mp->node_port)); + } + REPLY_MACRO (VL_API_KP_CONF_REPLY); +} + +static void *vl_api_kp_add_del_vip_t_print +(vl_api_kp_add_del_vip_t *mp, void * handle) +{ + u8 * s; + s = format (0, "SCRIPT: kp_add_del_vip "); + s = format (s, "%U ", format_ip46_prefix, + (ip46_address_t *)mp->ip_prefix, mp->prefix_length, IP46_TYPE_ANY); + s = format (s, "port %u ", mp->port); + s = format (s, "target_port %u ", mp->target_port); + s = format (s, "node_port %u ", mp->node_port); + s = format (s, "%s ", mp->is_nat4?"nat4":"nat6"); + s = format (s, "%u ", mp->new_flows_table_length); + s = format (s, "%s ", mp->is_del?"del":"add"); + FINISH; +} + +static void +vl_api_kp_add_del_pod_t_handler +(vl_api_kp_add_del_pod_t * mp) +{ + kp_main_t *kpm = &kp_main; + vl_api_kp_conf_reply_t * rmp; + int rv = 0; + u32 vip_index; + + ip46_address_t vip_ip_prefix; + u8 vip_prefix_length = mp->vip_prefix_length; + + if (mp->vip_is_ipv6 == 0) + { + vip_prefix_length += 96; + memcpy(&vip_ip_prefix.ip4, mp->vip_ip_prefix, + sizeof(vip_ip_prefix.ip4)); + vip_ip_prefix.pad[0] = vip_ip_prefix.pad[1] = vip_ip_prefix.pad[2] = 0; + } + else + { + memcpy(&vip_ip_prefix.ip6, mp->vip_ip_prefix, + sizeof(vip_ip_prefix.ip6)); + } + + ip46_address_t pod_address; + + if (mp->pod_is_ipv6 == 0) + { + memcpy(&pod_address.ip4, mp->pod_address, + sizeof(pod_address.ip4)); + pod_address.pad[0] = pod_address.pad[1] = pod_address.pad[2] = 0; + } + else + { + memcpy(&pod_address.ip6, mp->pod_address, + sizeof(pod_address.ip6)); + } + + if ((rv = kp_vip_find_index(&vip_ip_prefix, vip_prefix_length, &vip_index))) + goto done; + + if (mp->is_del) + rv = kp_vip_del_pods(vip_index, &pod_address, 1); + else + rv = kp_vip_add_pods(vip_index, &pod_address, 1); + +done: + REPLY_MACRO (VL_API_KP_CONF_REPLY); +} + +static void *vl_api_kp_add_del_pod_t_print +(vl_api_kp_add_del_pod_t *mp, void * handle) +{ + u8 * s; + s = format (0, "SCRIPT: kp_add_del_pod "); + s = format (s, "%U ", format_ip46_prefix, + (ip46_address_t *)mp->vip_ip_prefix, mp->vip_prefix_length, IP46_TYPE_ANY); + s = format (s, "%U ", format_ip46_address, + (ip46_address_t *)mp->pod_address, IP46_TYPE_ANY); + s = format (s, "%s ", mp->is_del?"del":"add"); + FINISH; +} + +/* List of message types that this plugin understands */ +#define foreach_kp_plugin_api_msg \ +_(KP_CONF, kp_conf) \ +_(KP_ADD_DEL_VIP, kp_add_del_vip) \ +_(KP_ADD_DEL_POD, kp_add_del_pod) + +static clib_error_t * kp_api_init (vlib_main_t * vm) +{ + kp_main_t *kpm = &kp_main; + u8 *name = format (0, "kp_%08x%c", api_version, 0); + kpm->msg_id_base = vl_msg_api_get_msg_ids + ((char *) name, VL_MSG_FIRST_AVAILABLE); + +#define _(N,n) \ + vl_msg_api_set_handlers((VL_API_##N + kpm->msg_id_base), \ + #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + foreach_kp_plugin_api_msg; +#undef _ + + /* Add our API messages to the global name_crc hash table */ + setup_message_id_table (kpm, &api_main); + + return 0; +} + +VLIB_INIT_FUNCTION (kp_api_init); diff --git a/src/plugins/kubeproxy/kp_cli.c b/src/plugins/kubeproxy/kp_cli.c new file mode 100644 index 00000000000..6a18834274e --- /dev/null +++ b/src/plugins/kubeproxy/kp_cli.c @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2016 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <kubeproxy/kp.h> + + +static clib_error_t * +kp_vip_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + ip46_address_t prefix; + u8 plen; + u32 new_len = 1024; + u32 port = 0; + u32 target_port = 0; + u32 node_port = 0; + u32 del = 0; + int ret; + u32 nat4 = 0; + kp_vip_type_t type; + clib_error_t *error = 0; + + if (!unformat_user (input, unformat_line_input, line_input)) + return 0; + + if (!unformat(line_input, "%U", unformat_ip46_prefix, &prefix, &plen, IP46_TYPE_ANY, &plen)) { + error = clib_error_return (0, "invalid vip prefix: '%U'", + format_unformat_error, line_input); + goto done; + } + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat(line_input, "new_len %d", &new_len)) + ; + else if (unformat(line_input, "port %d", &port)) + ; + else if (unformat(line_input, "target_port %d", &target_port)) + ; + else if (unformat(line_input, "node_port %d", &node_port)) + ; + else if (unformat(line_input, "del")) + del = 1; + else if (unformat(line_input, "nat4")) + nat4 = 1; + else if (unformat(line_input, "nat6")) + nat4 = 0; + else { + error = clib_error_return (0, "parse error: '%U'", + format_unformat_error, line_input); + goto done; + } + } + + + if (ip46_prefix_is_ip4(&prefix, plen)) { + type = (nat4)?KP_VIP_TYPE_IP4_NAT44:KP_VIP_TYPE_IP4_NAT46; + } else { + type = (nat4)?KP_VIP_TYPE_IP6_NAT64:KP_VIP_TYPE_IP6_NAT66; + } + + kp_garbage_collection(); + + u32 index; + if (!del) { + if ((ret = kp_vip_add(&prefix, plen, type, new_len, &index, + (u16)port, (u16)target_port, (u16)node_port))) { + error = clib_error_return (0, "kp_vip_add error %d", ret); + goto done; + } else { + vlib_cli_output(vm, "kp_vip_add ok %d", index); + } + } else { + if ((ret = kp_vip_find_index(&prefix, plen, &index))) { + error = clib_error_return (0, "kp_vip_find_index error %d", ret); + goto done; + } else if ((ret = kp_vip_del(index))) { + error = clib_error_return (0, "kp_vip_del error %d", ret); + goto done; + } + } + +done: + unformat_free (line_input); + + return error; +} + +VLIB_CLI_COMMAND (kp_vip_command, static) = +{ + .path = "kube-proxy vip", + .short_help = "kube-proxy vip <prefix> port <n> target_port <n>" + " node_port <n> [nat4|nat6)] [new_len <n>] [del]", + .function = kp_vip_command_fn, +}; + +static clib_error_t * +kp_pod_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + ip46_address_t vip_prefix, pod_addr; + u8 vip_plen; + ip46_address_t *pod_array = 0; + u32 vip_index; + u8 del = 0; + int ret; + clib_error_t *error = 0; + + if (!unformat_user (input, unformat_line_input, line_input)) + return 0; + + if (!unformat(line_input, "%U", unformat_ip46_prefix, &vip_prefix, &vip_plen, IP46_TYPE_ANY)) { + error = clib_error_return (0, "invalid pod address: '%U'", + format_unformat_error, line_input); + goto done; + } + + if ((ret = kp_vip_find_index(&vip_prefix, vip_plen, &vip_index))) { + error = clib_error_return (0, "kp_vip_find_index error %d", ret); + goto done; + } + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat(line_input, "%U", unformat_ip46_address, &pod_addr, IP46_TYPE_ANY)) { + vec_add1(pod_array, pod_addr); + } else if (unformat(line_input, "del")) { + del = 1; + } else { + error = clib_error_return (0, "parse error: '%U'", + format_unformat_error, line_input); + goto done; + } + } + + if (!vec_len(pod_array)) { + error = clib_error_return (0, "No POD address provided"); + goto done; + } + + kp_garbage_collection(); + clib_warning("vip index is %d", vip_index); + + if (del) { + if ((ret = kp_vip_del_pods(vip_index, pod_array, vec_len(pod_array)))) { + error = clib_error_return (0, "kp_vip_del_pods error %d", ret); + goto done; + } + } else { + if ((ret = kp_vip_add_pods(vip_index, pod_array, vec_len(pod_array)))) { + error = clib_error_return (0, "kp_vip_add_pods error %d", ret); + goto done; + } + } + +done: + unformat_free (line_input); + vec_free(pod_array); + + return error; +} + +VLIB_CLI_COMMAND (kp_pod_command, static) = +{ + .path = "kube-proxy pod", + .short_help = + "kube-proxy pod <vip-prefix> [<address> [<address> [...]]] [del]", + .function = kp_pod_command_fn, +}; + +static clib_error_t * +kp_conf_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + kp_main_t *kpm = &kp_main; + unformat_input_t _line_input, *line_input = &_line_input; + u32 per_cpu_sticky_buckets = kpm->per_cpu_sticky_buckets; + u32 per_cpu_sticky_buckets_log2 = 0; + u32 flow_timeout = kpm->flow_timeout; + int ret; + clib_error_t *error = 0; + + if (!unformat_user (input, unformat_line_input, line_input)) + return 0; + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat(line_input, "buckets %d", &per_cpu_sticky_buckets)) + ; + else if (unformat(line_input, "buckets-log2 %d", &per_cpu_sticky_buckets_log2)) { + if (per_cpu_sticky_buckets_log2 >= 32) + return clib_error_return (0, "buckets-log2 value is too high"); + per_cpu_sticky_buckets = 1 << per_cpu_sticky_buckets_log2; + } else if (unformat(line_input, "timeout %d", &flow_timeout)) + ; + else { + error = clib_error_return (0, "parse error: '%U'", + format_unformat_error, line_input); + goto done; + } + } + + kp_garbage_collection(); + + if ((ret = kp_conf(per_cpu_sticky_buckets, flow_timeout))) { + error = clib_error_return (0, "kp_conf error %d", ret); + goto done; + } + +done: + unformat_free (line_input); + + return error; +} + +VLIB_CLI_COMMAND (kp_conf_command, static) = +{ + .path = "kube-proxy conf", + .short_help = "kube-proxy conf [buckets <n>] [timeout <s>]", + .function = kp_conf_command_fn, +}; + +static clib_error_t * +kp_show_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + vlib_cli_output(vm, "%U", format_kp_main); + return NULL; +} + + +VLIB_CLI_COMMAND (kp_show_command, static) = +{ + .path = "show kube-proxy", + .short_help = "show kube-proxy", + .function = kp_show_command_fn, +}; + +static clib_error_t * +kp_show_vips_command_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) +{ + unformat_input_t line_input; + kp_main_t *kpm = &kp_main; + kp_vip_t *vip; + u8 verbose = 0; + + if (!unformat_user (input, unformat_line_input, &line_input)) + return 0; + + if (unformat(&line_input, "verbose")) + verbose = 1; + + pool_foreach(vip, kpm->vips, { + vlib_cli_output(vm, "%U\n", verbose?format_kp_vip_detailed:format_kp_vip, vip); + }); + + unformat_free (&line_input); + return NULL; +} + +VLIB_CLI_COMMAND (kp_show_vips_command, static) = +{ + .path = "show kube-proxy vips", + .short_help = "show kube-proxy vips [verbose]", + .function = kp_show_vips_command_fn, +}; + +static clib_error_t * +kp_set_interface_nat4_command_fn (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + unformat_input_t _line_input, *line_input = &_line_input; + vnet_main_t * vnm = vnet_get_main(); + clib_error_t * error = 0; + u32 sw_if_index; + u32 * inside_sw_if_indices = 0; + int is_del = 0; + int i; + + sw_if_index = ~0; + + /* Get a line of input. */ + if (!unformat_user (input, unformat_line_input, line_input)) + return 0; + + while (unformat_check_input (line_input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (line_input, "in %U", unformat_vnet_sw_interface, + vnm, &sw_if_index)) + vec_add1 (inside_sw_if_indices, sw_if_index); + else if (unformat (line_input, "del")) + is_del = 1; + else + { + error = clib_error_return (0, "unknown input '%U'", + format_unformat_error, line_input); + goto done; + } + } + + if (vec_len (inside_sw_if_indices)) + { + for (i = 0; i < vec_len(inside_sw_if_indices); i++) + { + sw_if_index = inside_sw_if_indices[i]; + + if (kp_nat4_interface_add_del (sw_if_index, is_del)) + { + error = clib_error_return (0, "%s %U failed", + is_del ? "del" : "add", + format_vnet_sw_interface_name, vnm, + vnet_get_sw_interface (vnm, + sw_if_index)); + goto done; + } + } + } + +done: + unformat_free (line_input); + vec_free (inside_sw_if_indices); + + return error; +} + +VLIB_CLI_COMMAND (kp_set_interface_nat4_command, static) = { + .path = "kube-proxy set interface nat4", + .function = kp_set_interface_nat4_command_fn, + .short_help = "kube-proxy set interface nat4 in <intfc> [del]", +}; + diff --git a/src/plugins/kubeproxy/kp_node.c b/src/plugins/kubeproxy/kp_node.c new file mode 100644 index 00000000000..5cee6971e35 --- /dev/null +++ b/src/plugins/kubeproxy/kp_node.c @@ -0,0 +1,839 @@ +/* + * Copyright (c) 2016 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or anated to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <vnet/fib/ip4_fib.h> + +#include <kubeproxy/kp.h> +#include <kubeproxy/kphash.h> + +#define foreach_kp_error \ + _(NONE, "no error") \ + _(PROTO_NOT_SUPPORTED, "protocol not supported") + +typedef enum { +#define _(sym,str) KP_ERROR_##sym, + foreach_kp_error +#undef _ + KP_N_ERROR, +} kp_error_t; + +static char *kp_error_strings[] = { +#define _(sym,string) string, + foreach_kp_error +#undef _ +}; + +typedef struct { + u32 vip_index; + u32 pod_index; +} kp_trace_t; + +typedef struct { + u32 vip_index; + u32 node_port; +} kp_nodeport_trace_t; + +typedef struct { + u32 rx_sw_if_index; + u32 next_index; +} kp_nat_trace_t; + +u8 * +format_kp_trace (u8 * s, va_list * args) +{ + kp_main_t *kpm = &kp_main; + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + kp_trace_t *t = va_arg (*args, kp_trace_t *); + if (pool_is_free_index(kpm->vips, t->vip_index)) { + s = format(s, "kp vip[%d]: This VIP was freed since capture\n"); + } else { + s = format(s, "kp vip[%d]: %U\n", t->vip_index, format_kp_vip, &kpm->vips[t->vip_index]); + } + if (pool_is_free_index(kpm->pods, t->pod_index)) { + s = format(s, " kp pod[%d]: This POD was freed since capture"); + } else { + s = format(s, " kp pod[%d]: %U", t->pod_index, format_kp_pod, &kpm->pods[t->pod_index]); + } + return s; +} + +u8 * +format_kp_nat_trace (u8 * s, va_list * args) +{ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + kp_nat_trace_t *t = va_arg (*args, kp_nat_trace_t *); + + s = format(s, "kp nat: rx_sw_if_index = %d, next_index = %d", + t->rx_sw_if_index, t->next_index); + + return s; +} + +kp_hash_t *kp_get_sticky_table(u32 thread_index) +{ + kp_main_t *kpm = &kp_main; + kp_hash_t *sticky_ht = kpm->per_cpu[thread_index].sticky_ht; + //Check if size changed + if (PREDICT_FALSE(sticky_ht && (kpm->per_cpu_sticky_buckets != kp_hash_nbuckets(sticky_ht)))) + { + //Dereference everything in there + kp_hash_bucket_t *b; + u32 i; + kp_hash_foreach_entry(sticky_ht, b, i) { + vlib_refcount_add(&kpm->pod_refcount, thread_index, b->value[i], -1); + vlib_refcount_add(&kpm->pod_refcount, thread_index, 0, 1); + } + + kp_hash_free(sticky_ht); + sticky_ht = NULL; + } + + //Create if necessary + if (PREDICT_FALSE(sticky_ht == NULL)) { + kpm->per_cpu[thread_index].sticky_ht = kp_hash_alloc(kpm->per_cpu_sticky_buckets, kpm->flow_timeout); + sticky_ht = kpm->per_cpu[thread_index].sticky_ht; + clib_warning("Regenerated sticky table %p", sticky_ht); + } + + ASSERT(sticky_ht); + + //Update timeout + sticky_ht->timeout = kpm->flow_timeout; + return sticky_ht; +} + +u64 +kp_node_get_other_ports4(ip4_header_t *ip40) +{ + return 0; +} + +u64 +kp_node_get_other_ports6(ip6_header_t *ip60) +{ + return 0; +} + +static_always_inline u32 +kp_node_get_hash(vlib_buffer_t *p, u8 is_input_v4) +{ + u32 hash; + if (is_input_v4) + { + ip4_header_t *ip40; + u64 ports; + ip40 = vlib_buffer_get_current (p); + if (PREDICT_TRUE (ip40->protocol == IP_PROTOCOL_TCP || + ip40->protocol == IP_PROTOCOL_UDP)) + ports = ((u64)((udp_header_t *)(ip40 + 1))->src_port << 16) | + ((u64)((udp_header_t *)(ip40 + 1))->dst_port); + else + ports = kp_node_get_other_ports4(ip40); + + hash = kp_hash_hash(*((u64 *)&ip40->address_pair), ports, + 0, 0, 0); + } + else + { + ip6_header_t *ip60; + ip60 = vlib_buffer_get_current (p); + u64 ports; + if (PREDICT_TRUE (ip60->protocol == IP_PROTOCOL_TCP || + ip60->protocol == IP_PROTOCOL_UDP)) + ports = ((u64)((udp_header_t *)(ip60 + 1))->src_port << 16) | + ((u64)((udp_header_t *)(ip60 + 1))->dst_port); + else + ports = kp_node_get_other_ports6(ip60); + + hash = kp_hash_hash(ip60->src_address.as_u64[0], + ip60->src_address.as_u64[1], + ip60->dst_address.as_u64[0], + ip60->dst_address.as_u64[1], + ports); + } + return hash; +} + +static_always_inline uword +kp_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame, + u8 is_input_v4, //Compile-time parameter stating that is input is v4 (or v6) + u8 is_nat_v4) //Compile-time parameter stating that is NAT is v4 (or v6) +{ + kp_main_t *kpm = &kp_main; + u32 n_left_from, *from, next_index, *to_next, n_left_to_next; + u32 thread_index = vlib_get_thread_index(); + u32 kp_time = kp_hash_time_now(vm); + + kp_hash_t *sticky_ht = kp_get_sticky_table(thread_index); + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + u32 nexthash0 = 0; + if (PREDICT_TRUE(n_left_from > 0)) + nexthash0 = kp_node_get_hash(vlib_get_buffer (vm, from[0]), is_input_v4); + + while (n_left_from > 0) + { + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 pi0; + vlib_buffer_t *p0; + kp_vip_t *vip0; + u32 podindex0; + u32 available_index0; + u8 counter = 0; + u32 hash0 = nexthash0; + + if (PREDICT_TRUE(n_left_from > 1)) + { + vlib_buffer_t *p1 = vlib_get_buffer (vm, from[1]); + //Compute next hash and prefetch bucket + nexthash0 = kp_node_get_hash(p1, is_input_v4); + kp_hash_prefetch_bucket(sticky_ht, nexthash0); + //Prefetch for encap, next + CLIB_PREFETCH (vlib_buffer_get_current(p1) - 64, 64, STORE); + } + + if (PREDICT_TRUE(n_left_from > 2)) + { + vlib_buffer_t *p2; + p2 = vlib_get_buffer(vm, from[2]); + /* prefetch packet header and data */ + vlib_prefetch_buffer_header(p2, STORE); + CLIB_PREFETCH (vlib_buffer_get_current(p2), 64, STORE); + } + + pi0 = to_next[0] = from[0]; + from += 1; + n_left_from -= 1; + to_next += 1; + n_left_to_next -= 1; + + p0 = vlib_get_buffer (vm, pi0); + vip0 = pool_elt_at_index (kpm->vips, + vnet_buffer (p0)->ip.adj_index[VLIB_TX]); + + kp_hash_get(sticky_ht, hash0, vnet_buffer (p0)->ip.adj_index[VLIB_TX], + kp_time, &available_index0, &podindex0); + + if (PREDICT_TRUE(podindex0 != ~0)) + { + //Found an existing entry + counter = KP_VIP_COUNTER_NEXT_PACKET; + } + else if (PREDICT_TRUE(available_index0 != ~0)) + { + //There is an available slot for a new flow + podindex0 = vip0->new_flow_table[hash0 & vip0->new_flow_table_mask].pod_index; + counter = KP_VIP_COUNTER_FIRST_PACKET; + counter = (podindex0 == 0)?KP_VIP_COUNTER_NO_SERVER:counter; + + //Dereference previously used + vlib_refcount_add(&kpm->pod_refcount, thread_index, + kp_hash_available_value(sticky_ht, hash0, available_index0), -1); + vlib_refcount_add(&kpm->pod_refcount, thread_index, + podindex0, 1); + + //Add sticky entry + //Note that when there is no POD configured, an entry is configured anyway. + //But no configured POD is not something that should happen + kp_hash_put(sticky_ht, hash0, podindex0, + vnet_buffer (p0)->ip.adj_index[VLIB_TX], + available_index0, kp_time); + } + else + { + //Could not store new entry in the table + podindex0 = vip0->new_flow_table[hash0 & vip0->new_flow_table_mask].pod_index; + counter = KP_VIP_COUNTER_UNTRACKED_PACKET; + } + + vlib_increment_simple_counter(&kpm->vip_counters[counter], + thread_index, + vnet_buffer (p0)->ip.adj_index[VLIB_TX], + 1); + //Now let's do NAT + { + udp_header_t *port0; + + if ( (is_input_v4==1) && (is_nat_v4==1) ) /* NAT44 */ + { + ip4_header_t *ip40; + ip40 = vlib_buffer_get_current(p0); + port0 = (udp_header_t *)(ip40 + 1); + ip40->dst_address = kpm->pods[podindex0].address.ip4; + ip40->checksum = ip4_header_checksum (ip40); + } + else if ( (is_input_v4==1) && (is_nat_v4==0) ) /* NAT46 */ + { + /* TBD */ + u16 len0 = 0; + ip4_header_t *ip40; + ip40 = vlib_buffer_get_current(p0); + len0 = clib_net_to_host_u16(ip40->length); + + vlib_buffer_advance(p0, (-sizeof(ip6_header_t)+sizeof(ip4_header_t)) ); + ip6_header_t *ip60; + ip60 = vlib_buffer_get_current(p0); + port0 = (udp_header_t *)(ip60 + 1); + ip60->payload_length = len0 - sizeof(ip4_header_t); + ip60->dst_address = kpm->pods[podindex0].address.ip6; + } + else if ( (is_input_v4==0) && (is_nat_v4==0) ) /* NAT66 */ + { + ip6_header_t *ip60; + ip60 = vlib_buffer_get_current(p0); + port0 = (udp_header_t *)(ip60 + 1); + ip60->dst_address = kpm->pods[podindex0].address.ip6; + } + else /* NAT64 */ + { + /* TBD */ + u16 len0 = 0; + ip6_header_t *ip60; + ip60 = vlib_buffer_get_current(p0); + len0 = clib_net_to_host_u16(ip60->payload_length); + + vlib_buffer_advance(p0, (sizeof(ip6_header_t)-sizeof(ip4_header_t)) ); + ip4_header_t *ip40; + ip40 = vlib_buffer_get_current(p0); + port0 = (udp_header_t *)(ip40 + 1); + ip40->length = len0 + sizeof(ip4_header_t); + ip40->dst_address = kpm->pods[podindex0].address.ip4; + ip40->checksum = ip4_header_checksum (ip40); + } + + port0->dst_port = vip0->target_port; + } + + if (PREDICT_FALSE (p0->flags & VLIB_BUFFER_IS_TRACED)) + { + kp_trace_t *tr = vlib_add_trace (vm, node, p0, sizeof (*tr)); + tr->pod_index = podindex0; + tr->vip_index = vnet_buffer (p0)->ip.adj_index[VLIB_TX]; + } + + //Enqueue to next + //Note that this is going to error if podindex0 == 0 + vnet_buffer (p0)->ip.adj_index[VLIB_TX] = kpm->pods[podindex0].dpo.dpoi_index; + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, pi0, + kpm->pods[podindex0].dpo.dpoi_next_node); + } + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + + return frame->n_vectors; +} + +u8 * +format_nodeport_kp_trace (u8 * s, va_list * args) +{ + kp_main_t *kpm = &kp_main; + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + kp_nodeport_trace_t *t = va_arg (*args, kp_nodeport_trace_t *); + if (pool_is_free_index(kpm->vips, t->vip_index)) { + s = format(s, "kp vip[%d]: This VIP was freed since capture\n"); + } else { + s = format(s, "kp vip[%d]: %U\n", t->vip_index, format_kp_vip, &kpm->vips[t->vip_index]); + } + + s = format(s, " kp node_port: %d", t->node_port); + + return s; +} +static uword +kp_nodeport_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, + vlib_frame_t * frame, + u8 is_input_v4) +{ + kp_main_t *kpm = &kp_main; + u32 n_left_from, *from, next_index, *to_next, n_left_to_next; + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + + while (n_left_from > 0) + { + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 pi0; + vlib_buffer_t *p0; + udp_header_t * udp_0; + uword * entry0; + u32 next0 = KP_NODEPORT_NEXT_DROP; + + + if (PREDICT_TRUE(n_left_from > 1)) + { + vlib_buffer_t *p1 = vlib_get_buffer (vm, from[1]); + //Prefetch for encap, next + CLIB_PREFETCH (vlib_buffer_get_current(p1) - 64, 64, STORE); + } + + if (PREDICT_TRUE(n_left_from > 2)) + { + vlib_buffer_t *p2; + p2 = vlib_get_buffer(vm, from[2]); + /* prefetch packet header and data */ + vlib_prefetch_buffer_header(p2, STORE); + CLIB_PREFETCH (vlib_buffer_get_current(p2), 64, STORE); + } + + pi0 = to_next[0] = from[0]; + from += 1; + n_left_from -= 1; + to_next += 1; + n_left_to_next -= 1; + + p0 = vlib_get_buffer (vm, pi0); + + if (is_input_v4==1) + { + ip4_header_t *ip40; + vlib_buffer_advance + (p0, -(word)(sizeof(udp_header_t)+sizeof(ip4_header_t))); + ip40 = vlib_buffer_get_current(p0); + udp_0 = (udp_header_t *)(ip40 + 1); + } + else + { + ip6_header_t *ip60; + vlib_buffer_advance + (p0, -(word)(sizeof(udp_header_t)+sizeof(ip6_header_t))); + ip60 = vlib_buffer_get_current(p0); + udp_0 = (udp_header_t *)(ip60 + 1); + } + + entry0 = hash_get_mem(kpm->nodeport_by_key, &(udp_0->dst_port)); + + + if (is_input_v4==1) + { + next0 = KP_NODEPORT_NEXT_IP4_NAT4; + } + else + { + next0 = KP_NODEPORT_NEXT_IP6_NAT6; + } + + if (PREDICT_FALSE (p0->flags & VLIB_BUFFER_IS_TRACED)) + { + kp_nodeport_trace_t *tr = vlib_add_trace (vm, node, + p0, sizeof (*tr)); + tr->vip_index = vnet_buffer (p0)->ip.adj_index[VLIB_TX]; + tr->node_port = (u32)clib_net_to_host_u16(udp_0->dst_port); + } + + //Enqueue to next + vnet_buffer(p0)->ip.adj_index[VLIB_TX] = entry0[0]; + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, pi0, next0); + } + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + + return frame->n_vectors; + +} + +/** + * @brief Match NAT4 static mapping. + * + * @param sm NAT main. + * @param match Address and port to match. + * @param mapping External or local address and port of the matched mapping. + * + * @returns 0 if match found otherwise 1. + */ +int kp_nat4_mapping_match (kp_main_t *kpm, + kp_snat4_key_t match, + kp_snat4_key_t * mapping) +{ + clib_bihash_kv_8_8_t kv, value; + kp_snat_mapping_t *m; + kp_snat4_key_t m_key; + clib_bihash_8_8_t *mapping_hash = &kpm->mapping_by_pod; + + m_key.addr = match.addr; + m_key.port = match.port; + m_key.protocol = match.protocol; + m_key.fib_index = match.fib_index; + + kv.key = m_key.as_u64; + + if (clib_bihash_search_8_8 (mapping_hash, &kv, &value)) + { + return 1; + } + + m = pool_elt_at_index (kpm->snat_mappings, value.value); + + if (m->svr_type == KP_SVR_TYPE_VIP_PORT) + { + mapping->addr = m->vip.ip4; + mapping->port = clib_host_to_net_u16 (m->port); + mapping->fib_index = m->fib_index; + mapping->protocol = match.protocol; + } + else if (m->svr_type == KP_SVR_TYPE_NODEIP_PORT) + { + mapping->addr = m->node_ip.ip4; + mapping->port = clib_host_to_net_u16 (m->node_port); + mapping->fib_index = m->fib_index; + mapping->protocol = match.protocol; + } + + return 0; +} + +static uword +kp_nat4_in2out_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + u32 n_left_from, * from, * to_next; + kp_nat4_in2out_next_t next_index; + u32 pkts_processed = 0; + kp_main_t *kpm = &kp_main; + u32 stats_node_index; + + stats_node_index = kp_nat4_in2out_node.index; + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + while (n_left_from > 0) + { + u32 n_left_to_next; + + vlib_get_next_frame (vm, node, next_index, + to_next, n_left_to_next); + + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 bi0; + vlib_buffer_t * b0; + u32 next0; + u32 sw_if_index0; + ip4_header_t * ip0; + ip_csum_t sum0; + u32 new_addr0, old_addr0; + u16 old_port0, new_port0; + udp_header_t * udp0; + tcp_header_t * tcp0; + kp_snat4_key_t key0, sm0; + u32 proto0; + u32 rx_fib_index0; + + /* speculatively enqueue b0 to the current next frame */ + bi0 = from[0]; + to_next[0] = bi0; + from += 1; + to_next += 1; + n_left_from -= 1; + n_left_to_next -= 1; + + b0 = vlib_get_buffer (vm, bi0); + next0 = KP_NAT4_IN2OUT_NEXT_LOOKUP; + + ip0 = vlib_buffer_get_current (b0); + udp0 = ip4_next_header (ip0); + tcp0 = (tcp_header_t *) udp0; + + sw_if_index0 = vnet_buffer(b0)->sw_if_index[VLIB_RX]; + rx_fib_index0 = ip4_fib_table_get_index_for_sw_if_index(sw_if_index0); + + proto0 = kp_ip_proto_to_nat_proto (ip0->protocol); + + if (PREDICT_FALSE (proto0 == ~0)) + goto trace0; + + key0.addr = ip0->src_address; + key0.protocol = proto0; + key0.port = udp0->src_port; + key0.fib_index = rx_fib_index0; + + if (kp_nat4_mapping_match (kpm, key0, &sm0)) + { + next0= KP_NAT4_IN2OUT_NEXT_DROP; + goto trace0; + } + + new_addr0 = sm0.addr.as_u32; + new_port0 = sm0.port; + vnet_buffer(b0)->sw_if_index[VLIB_TX] = sm0.fib_index; + old_addr0 = ip0->src_address.as_u32; + ip0->src_address.as_u32 = new_addr0; + + sum0 = ip0->checksum; + sum0 = ip_csum_update (sum0, old_addr0, new_addr0, + ip4_header_t, + src_address /* changed member */); + ip0->checksum = ip_csum_fold (sum0); + + if (PREDICT_FALSE(new_port0 != udp0->dst_port)) + { + if (PREDICT_TRUE(proto0 == KP_NAT_PROTOCOL_TCP)) + { + old_port0 = tcp0->src_port; + tcp0->src_port = new_port0; + + sum0 = tcp0->checksum; + sum0 = ip_csum_update (sum0, old_addr0, new_addr0, + ip4_header_t, + dst_address /* changed member */); + sum0 = ip_csum_update (sum0, old_port0, new_port0, + ip4_header_t /* cheat */, + length /* changed member */); + tcp0->checksum = ip_csum_fold(sum0); + } + else + { + old_port0 = udp0->src_port; + udp0->src_port = new_port0; + udp0->checksum = 0; + } + } + else + { + if (PREDICT_TRUE(proto0 == KP_NAT_PROTOCOL_TCP)) + { + sum0 = tcp0->checksum; + sum0 = ip_csum_update (sum0, old_addr0, new_addr0, + ip4_header_t, + dst_address /* changed member */); + tcp0->checksum = ip_csum_fold(sum0); + } + } + + trace0: + if (PREDICT_FALSE((node->flags & VLIB_NODE_FLAG_TRACE) + && (b0->flags & VLIB_BUFFER_IS_TRACED))) + { + kp_nat_trace_t *t = + vlib_add_trace (vm, node, b0, sizeof (*t)); + t->rx_sw_if_index = sw_if_index0; + t->next_index = next0; + } + + pkts_processed += next0 != KP_NAT4_IN2OUT_NEXT_DROP; + + /* verify speculative enqueue, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, + to_next, n_left_to_next, + bi0, next0); + } + + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + + vlib_node_increment_counter (vm, stats_node_index, + KP_NAT_IN2OUT_ERROR_IN2OUT_PACKETS, + pkts_processed); + return frame->n_vectors; +} + +static uword +kp6_nat6_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return kp_node_fn(vm, node, frame, 0, 0); +} + +static uword +kp6_nat4_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return kp_node_fn(vm, node, frame, 0, 1); +} + +static uword +kp4_nat6_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return kp_node_fn(vm, node, frame, 1, 0); +} + +static uword +kp4_nat4_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return kp_node_fn(vm, node, frame, 1, 1); +} + +VLIB_REGISTER_NODE (kp6_nat6_node) = +{ + .function = kp6_nat6_node_fn, + .name = "kp6-nat6", + .vector_size = sizeof (u32), + .format_trace = format_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_N_NEXT, + .next_nodes = + { + [KP_NEXT_DROP] = "error-drop" + }, +}; + +VLIB_REGISTER_NODE (kp6_nat4_node) = +{ + .function = kp6_nat4_node_fn, + .name = "kp6-nat4", + .vector_size = sizeof (u32), + .format_trace = format_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_N_NEXT, + .next_nodes = + { + [KP_NEXT_DROP] = "error-drop" + }, +}; + +VLIB_REGISTER_NODE (kp4_nat6_node) = +{ + .function = kp4_nat6_node_fn, + .name = "kp4-nat6", + .vector_size = sizeof (u32), + .format_trace = format_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_N_NEXT, + .next_nodes = + { + [KP_NEXT_DROP] = "error-drop" + }, +}; + +VLIB_REGISTER_NODE (kp4_nat4_node) = +{ + .function = kp4_nat4_node_fn, + .name = "kp4-nat4", + .vector_size = sizeof (u32), + .format_trace = format_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_N_NEXT, + .next_nodes = + { + [KP_NEXT_DROP] = "error-drop" + }, +}; + +static uword +kp4_nodeport_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + return kp_nodeport_node_fn(vm, node, frame, 1); +} + +static uword +kp6_nodeport_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + return kp_nodeport_node_fn(vm, node, frame, 0); +} + +VLIB_REGISTER_NODE (kp4_nodeport_node) = +{ + .function = kp4_nodeport_node_fn, + .name = "kp4-nodeport", + .vector_size = sizeof (u32), + .format_trace = format_nodeport_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_NODEPORT_N_NEXT, + .next_nodes = + { + [KP_NODEPORT_NEXT_IP4_NAT4] = "kp4-nat4", + [KP_NODEPORT_NEXT_IP4_NAT6] = "kp4-nat6", + [KP_NODEPORT_NEXT_IP6_NAT4] = "kp6-nat4", + [KP_NODEPORT_NEXT_IP6_NAT6] = "kp6-nat6", + [KP_NODEPORT_NEXT_DROP] = "error-drop", + }, +}; + +VLIB_REGISTER_NODE (kp6_nodeport_node) = +{ + .function = kp6_nodeport_node_fn, + .name = "kp6-nodeport", + .vector_size = sizeof (u32), + .format_trace = format_nodeport_kp_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_NODEPORT_N_NEXT, + .next_nodes = + { + [KP_NODEPORT_NEXT_IP4_NAT4] = "kp4-nat4", + [KP_NODEPORT_NEXT_IP4_NAT6] = "kp4-nat6", + [KP_NODEPORT_NEXT_IP6_NAT4] = "kp6-nat4", + [KP_NODEPORT_NEXT_IP6_NAT6] = "kp6-nat6", + [KP_NODEPORT_NEXT_DROP] = "error-drop", + }, +}; + +VNET_FEATURE_INIT (kp_nat4_in2out_node_fn, static) = +{ + .arc_name = "ip4-unicast", + .node_name = "kp-nat4-in2out", + .runs_before = VNET_FEATURES ("ip4-lookup"), +}; + +VLIB_REGISTER_NODE (kp_nat4_in2out_node) = +{ + .function = kp_nat4_in2out_node_fn, + .name = "kp-nat4-in2out", + .vector_size = sizeof (u32), + .format_trace = format_kp_nat_trace, + + .n_errors = KP_N_ERROR, + .error_strings = kp_error_strings, + + .n_next_nodes = KP_NAT4_IN2OUT_N_NEXT, + .next_nodes = + { + [KP_NAT4_IN2OUT_NEXT_DROP] = "error-drop", + [KP_NAT4_IN2OUT_NEXT_LOOKUP] = "ip4-lookup", + }, +}; diff --git a/src/plugins/kubeproxy/kp_plugin_doc.md b/src/plugins/kubeproxy/kp_plugin_doc.md new file mode 100644 index 00000000000..e1247916bd5 --- /dev/null +++ b/src/plugins/kubeproxy/kp_plugin_doc.md @@ -0,0 +1,105 @@ +# Kube-proxy plugin for VPP + +## Overview + +This plugin provides kube-proxy data plane on user space, +which is used to replace linux kernal's kube-proxy based on iptables. +The idea is largely inspired from VPP LB plugin. + +Currently, kube-proxy plugin supports three service types: +1) Cluster IP plus Port: support any protocols, including TCP, UDP. +2) Node IP plus Node Port: currently only support UDP. +3) External Load Balancer. + +For Cluster IP plus Port case: +kube-proxy is configured with a set of Virtual IPs (VIP, which can be +prefixes), and for each VIP, with a set of POD addresses (PODs). + +For a specific session received for a given VIP (or VIP prefix), +first packet selects a Pod according to internal load balancing algorithm, +then does DNAT operation and sent to chosen Pod. +At the same time, will create a session entry to store Pod chosen result. +Following packets for that session will look up session table first, +which ensures that a given session will always be routed to the same Pod. + +For returned packet from Pod, it will do SNAT operation and sent out. + +Please refer to below for details: +https://schd.ws/hosted_files/ossna2017/1e/VPP_K8S_GTPU_OSSNA.pdf + + +## Configuration + +### Global KP parameters + +The kube-proxy needs to be configured with some parameters: + + ku conf [buckets <n>] [timeout <s>] + +buckets: the *per-thread* established-connections-table number of buckets. + +timeout: the number of seconds a connection will remain in the + established-connections-table while no packet for this flow + is received. + +### Configure VIPs and Ports + + ku vip <prefix> port <n> target_port <n> node_port <n> \ + [nat4|nat6)] [new_len <n>] [del] + +new_len is the size of the new-connection-table. It should be 1 or 2 orders of +magnitude bigger than the number of PODs for the VIP in order to ensure a good +load balancing. + +Examples: + + ku vip 90.0.0.0/8 nat44 new_len 2048 + ku vip 2003::/16 nat66 new_len 2048 + +### Configure PODs (for each VIP) + + ku pod <vip-prefix> [<address> [<address> [...]]] [del] + +You can add (or delete) as many PODs at a time (for a single VIP). + +Examples: + + ku pod 90.0.0.0/8 10.0.0.1 + ku pod 2002::/16 2001::2 2001::3 2001::4 + +### Configure SNAT + + ku set interface nat4 in <intfc> [del] + +Set SNAT feature in a specific interface. + + +## Monitoring + +The plugin provides quite a bunch of counters and information. + + show ku + show ku vip verbose + show node counters + + +## Design notes + +### Multi-Threading + +This implementation implement parallelism by using +one established-connections table per thread. This is equivalent to assuming +that RSS will make a job similar to ECMP, and is pretty useful as threads don't +need to get a lock in order to write in the table. + +### Hash Table + +A kube-proxy requires an efficient read and write Hash table. The Hash table +used by ip6-forward is very read-efficient, but not so much for writing. In +addition, it is not a big deal if writing into the Hash table fails. + +The plugin therefore uses a very specific Hash table. + - Fixed (and power of 2) number of buckets (configured at runtime) + - Fixed (and power of 2) elements per buckets (configured at compilation time) + + diff --git a/src/plugins/kubeproxy/kp_test.c b/src/plugins/kubeproxy/kp_test.c new file mode 100644 index 00000000000..895a6adedaa --- /dev/null +++ b/src/plugins/kubeproxy/kp_test.c @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2016 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "POD IS" BPODIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <vat/vat.h> +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vppinfra/error.h> +#include <kubeproxy/kp.h> + +#define __plugin_msg_base kp_test_main.msg_id_base +#include <vlibapi/vat_helper_macros.h> + +//TODO: Move that to vat/plugin_api.c +////////////////////////// +uword unformat_ip46_address (unformat_input_t * input, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + ip46_type_t type = va_arg (*args, ip46_type_t); + if ((type != IP46_TYPE_IP6) && + unformat(input, "%U", unformat_ip4_address, &ip46->ip4)) { + ip46_address_mask_ip4(ip46); + return 1; + } else if ((type != IP46_TYPE_IP4) && + unformat(input, "%U", unformat_ip6_address, &ip46->ip6)) { + return 1; + } + return 0; +} +uword unformat_ip46_prefix (unformat_input_t * input, va_list * args) +{ + ip46_address_t *ip46 = va_arg (*args, ip46_address_t *); + u8 *len = va_arg (*args, u8 *); + ip46_type_t type = va_arg (*args, ip46_type_t); + + u32 l; + if ((type != IP46_TYPE_IP6) && unformat(input, "%U/%u", unformat_ip4_address, &ip46->ip4, &l)) { + if (l > 32) + return 0; + *len = l + 96; + ip46->pad[0] = ip46->pad[1] = ip46->pad[2] = 0; + } else if ((type != IP46_TYPE_IP4) && unformat(input, "%U/%u", unformat_ip6_address, &ip46->ip6, &l)) { + if (l > 128) + return 0; + *len = l; + } else { + return 0; + } + return 1; +} +///////////////////////// + +#define vl_msg_id(n,h) n, +typedef enum { +#include <kubeproxy/kp.api.h> + /* We'll want to know how many messages IDs we need... */ + VL_MSG_FIRST_AVAILABLE, +} vl_msg_id_t; +#undef vl_msg_id + +/* define message structures */ +#define vl_typedefs +#include <kubeproxy/kp.api.h> +#undef vl_typedefs + +/* declare message handlers for each api */ + +#define vl_endianfun /* define message structures */ +#include <kubeproxy/kp.api.h> +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) +#define vl_printfun +#include <kubeproxy/kp.api.h> +#undef vl_printfun + +/* Get the API version number. */ +#define vl_api_version(n,v) static u32 api_version=(v); +#include <kubeproxy/kp.api.h> +#undef vl_api_version + +typedef struct { + /* API message ID base */ + u16 msg_id_base; + vat_main_t *vat_main; +} kp_test_main_t; + +kp_test_main_t kp_test_main; + +#define foreach_standard_reply_retval_handler \ +_(kp_conf_reply) \ +_(kp_add_del_vip_reply) \ +_(kp_add_del_pod_reply) + +#define _(n) \ + static void vl_api_##n##_t_handler \ + (vl_api_##n##_t * mp) \ + { \ + vat_main_t * vam = kp_test_main.vat_main; \ + i32 retval = ntohl(mp->retval); \ + if (vam->async_mode) { \ + vam->async_errors += (retval < 0); \ + } else { \ + vam->retval = retval; \ + vam->result_ready = 1; \ + } \ + } +foreach_standard_reply_retval_handler; +#undef _ + +/* + * Table of message reply handlers, must include boilerplate handlers + * we just generated + */ +#define foreach_vpe_api_reply_msg \ + _(KP_CONF_REPLY, kp_conf_reply) \ + _(KP_ADD_DEL_VIP_REPLY, kp_add_del_vip_reply) \ + _(KP_ADD_DEL_POD_REPLY, kp_add_del_pod_reply) + +static int api_kp_conf (vat_main_t * vam) +{ + unformat_input_t *i = vam->input; + vl_api_kp_conf_t mps, *mp; + int ret; + + if (!unformat(i, "%u %u", + &mps.sticky_buckets_per_core, + &mps.flow_timeout)) { + errmsg ("invalid arguments\n"); + return -99; + } + + M(KP_CONF, mp); + S(mp); + W (ret); + return ret; +} + +static int api_kp_add_del_vip (vat_main_t * vam) +{ + unformat_input_t * i = vam->input; + vl_api_kp_add_del_vip_t mps, *mp; + int ret; + mps.is_del = 0; + mps.is_nat4 = 0; + + if (!unformat(i, "%U", + unformat_ip46_prefix, mps.ip_prefix, &mps.prefix_length, IP46_TYPE_ANY)) { + errmsg ("invalid prefix\n"); + return -99; + } + + if (unformat(i, "nat4")) { + mps.is_nat4 = 1; + } else if (unformat(i, "nat6")) { + mps.is_nat4 = 0; + } else { + errmsg ("no nat\n"); + return -99; + } + + if (!unformat(i, "%d", &mps.new_flows_table_length)) { + errmsg ("no table lentgh\n"); + return -99; + } + + if (unformat(i, "del")) { + mps.is_del = 1; + } + + M(KP_ADD_DEL_VIP, mp); + S(mp); + W (ret); + return ret; +} + +static int api_kp_add_del_pod (vat_main_t * vam) +{ + unformat_input_t * i = vam->input; + vl_api_kp_add_del_pod_t mps, *mp; + int ret; + mps.is_del = 0; + + if (!unformat(i, "%U %U", + unformat_ip46_prefix, mps.vip_ip_prefix, &mps.vip_prefix_length, IP46_TYPE_ANY, + unformat_ip46_address, mps.pod_address)) { + errmsg ("invalid prefix or address\n"); + return -99; + } + + if (unformat(i, "del")) { + mps.is_del = 1; + } + + M(KP_ADD_DEL_POD, mp); + S(mp); + W (ret); + return ret; +} + +/* + * List of messages that the api test plugin sends, + * and that the data plane plugin processes + */ +#define foreach_vpe_api_msg \ +_(kp_conf, "<sticky_buckets_per_core> <flow_timeout>") \ +_(kp_add_del_vip, "<ip-prefix> <port> <target_port> <node_port> " \ + "[nat4|nat6] <new_table_len> [del]") \ +_(kp_add_del_pod, "<vip-ip-prefix> <address> [del]") + +static void +kp_vat_api_hookup (vat_main_t *vam) +{ + kp_test_main_t * kptm = &kp_test_main; + /* Hook up handlers for replies from the data plane plug-in */ +#define _(N,n) \ + vl_msg_api_set_handlers((VL_API_##N + kptm->msg_id_base), \ + #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + foreach_vpe_api_reply_msg; +#undef _ + + /* API messages we can send */ +#define _(n,h) hash_set_mem (vam->function_by_name, #n, api_##n); + foreach_vpe_api_msg; +#undef _ + + /* Help strings */ +#define _(n,h) hash_set_mem (vam->help_by_name, #n, h); + foreach_vpe_api_msg; +#undef _ +} + +clib_error_t * vat_plugin_register (vat_main_t *vam) +{ + kp_test_main_t * kptm = &kp_test_main; + + u8 * name; + + kptm->vat_main = vam; + + /* Ask the vpp engine for the first assigned message-id */ + name = format (0, "kp_%08x%c", api_version, 0); + kptm->msg_id_base = vl_client_get_first_plugin_msg_id ((char *) name); + + if (kptm->msg_id_base != (u16) ~0) + kp_vat_api_hookup (vam); + + vec_free(name); + + return 0; +} diff --git a/src/plugins/kubeproxy/kphash.h b/src/plugins/kubeproxy/kphash.h new file mode 100644 index 00000000000..2957aeb2a53 --- /dev/null +++ b/src/plugins/kubeproxy/kphash.h @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2017 Intel and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * vppinfra already includes tons of different hash tables. + * MagLev flow table is a bit different. It has to be very efficient + * for both writing and reading operations. But it does not need to + * be 100% reliable (write can fail). It also needs to recycle + * old entries in a lazy way. + * + * This hash table is the most dummy hash table you can do. + * Fixed total size, fixed bucket size. + * Advantage is that it could be very efficient (maybe). + * + */ + +#ifndef KP_PLUGIN_KP_KPHASH_H_ +#define KP_PLUGIN_KP_KPHASH_H_ + +#include <vnet/vnet.h> +#include <vppinfra/xxhash.h> +#include <vppinfra/crc32.h> + +/* + * @brief Number of entries per bucket. + */ +#define KPHASH_ENTRY_PER_BUCKET 4 + +#define KP_HASH_DO_NOT_USE_SSE_BUCKETS 0 + +/** + * 32 bits integer comparison for running values. + * 1 > 0 is true. But 1 > 0xffffffff also is. + */ +#define clib_u32_loop_gt(a, b) (((u32)(a)) - ((u32)(b)) < 0x7fffffff) + +/* + * @brief One bucket contains 4 entries. + * Each bucket takes one 64B cache line in memory. + */ +typedef struct { + CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + u32 hash[KPHASH_ENTRY_PER_BUCKET]; + u32 timeout[KPHASH_ENTRY_PER_BUCKET]; + u32 vip[KPHASH_ENTRY_PER_BUCKET]; + u32 value[KPHASH_ENTRY_PER_BUCKET]; +} kp_hash_bucket_t; + +typedef struct { + u32 buckets_mask; + u32 timeout; + kp_hash_bucket_t buckets[]; +} kp_hash_t; + +#define kp_hash_nbuckets(h) (((h)->buckets_mask) + 1) +#define kp_hash_size(h) ((h)->buckets_mask + KPHASH_ENTRY_PER_BUCKET) + +#define kp_hash_foreach_bucket(h, bucket) \ + for (bucket = (h)->buckets; \ + bucket < (h)->buckets + kp_hash_nbuckets(h); \ + bucket++) + +#define kp_hash_foreach_entry(h, bucket, i) \ + kp_hash_foreach_bucket(h, bucket) \ + for (i = 0; i < KPHASH_ENTRY_PER_BUCKET; i++) + +#define kp_hash_foreach_valid_entry(h, bucket, i, now) \ + kp_hash_foreach_entry(h, bucket, i) \ + if (!clib_u32_loop_gt((now), bucket->timeout[i])) + +static_always_inline +kp_hash_t *kp_hash_alloc(u32 buckets, u32 timeout) +{ + if (!is_pow2(buckets)) + return NULL; + + // Allocate 1 more bucket for prefetch + u32 size = ((u64)&((kp_hash_t *)(0))->buckets[0]) + + sizeof(kp_hash_bucket_t) * (buckets + 1); + u8 *mem = 0; + kp_hash_t *h; + vec_alloc_aligned(mem, size, CLIB_CACHE_LINE_BYTES); + h = (kp_hash_t *)mem; + h->buckets_mask = (buckets - 1); + h->timeout = timeout; + return h; +} + +static_always_inline +void kp_hash_free(kp_hash_t *h) +{ + u8 *mem = (u8 *)h; + vec_free(mem); +} + +static_always_inline +u32 kp_hash_hash(u64 k0, u64 k1, u64 k2, u64 k3, u64 k4) +{ +#ifdef clib_crc32c_uses_intrinsics + u64 key[5]; + key[0] = k0; + key[1] = k1; + key[2] = k2; + key[3] = k3; + key[4] = k4; + return clib_crc32c ((u8 *) key, 40); +#else + u64 tmp = k0 ^ k1 ^ k2 ^ k3 ^ k4; + return (u32)clib_xxhash (tmp); +#endif +} + +static_always_inline +void kp_hash_prefetch_bucket(kp_hash_t *ht, u32 hash) +{ + kp_hash_bucket_t *bucket = &ht->buckets[hash & ht->buckets_mask]; + CLIB_PREFETCH(bucket, sizeof(*bucket), READ); +} + +static_always_inline +void kp_hash_get(kp_hash_t *ht, u32 hash, u32 vip, u32 time_now, + u32 *available_index, u32 *found_value) +{ + kp_hash_bucket_t *bucket = &ht->buckets[hash & ht->buckets_mask]; + *found_value = ~0; + *available_index = ~0; +#if __SSE4_2__ && KP_HASH_DO_NOT_USE_SSE_BUCKETS == 0 + u32 bitmask, found_index; + __m128i mask; + + // mask[*] = timeout[*] > now + mask = _mm_cmpgt_epi32(_mm_loadu_si128 ((__m128i *) bucket->timeout), + _mm_set1_epi32 (time_now)); + // bitmask[*] = now <= timeout[*/4] + bitmask = (~_mm_movemask_epi8(mask)) & 0xffff; + // Get first index with now <= timeout[*], if any. + *available_index = (bitmask)?__builtin_ctz(bitmask)/4:*available_index; + + // mask[*] = (timeout[*] > now) && (hash[*] == hash) + mask = _mm_and_si128(mask, + _mm_cmpeq_epi32( + _mm_loadu_si128 ((__m128i *) bucket->hash), + _mm_set1_epi32 (hash))); + + // Load the array of vip values + // mask[*] = (timeout[*] > now) && (hash[*] == hash) && (vip[*] == vip) + mask = _mm_and_si128(mask, + _mm_cmpeq_epi32( + _mm_loadu_si128 ((__m128i *) bucket->vip), + _mm_set1_epi32 (vip))); + + // mask[*] = (timeout[*x4] > now) && (hash[*x4] == hash) && (vip[*x4] == vip) + bitmask = _mm_movemask_epi8(mask); + // Get first index, if any + found_index = (bitmask)?__builtin_ctzll(bitmask)/4:0; + ASSERT(found_index < 4); + *found_value = (bitmask)?bucket->value[found_index]:*found_value; + bucket->timeout[found_index] = + (bitmask)?time_now + ht->timeout:bucket->timeout[found_index]; +#else + u32 i; + for (i = 0; i < KPHASH_ENTRY_PER_BUCKET; i++) { + u8 cmp = (bucket->hash[i] == hash && bucket->vip[i] == vip); + u8 timeouted = clib_u32_loop_gt(time_now, bucket->timeout[i]); + *found_value = (cmp || timeouted)?*found_value:bucket->value[i]; + bucket->timeout[i] = (cmp || timeouted)?time_now + ht->timeout:bucket->timeout[i]; + *available_index = (timeouted && (*available_index == ~0))?i:*available_index; + + if (!cmp) + return; + } +#endif +} + +static_always_inline +u32 kp_hash_available_value(kp_hash_t *h, u32 hash, u32 available_index) +{ + return h->buckets[hash & h->buckets_mask].value[available_index]; +} + +static_always_inline +void kp_hash_put(kp_hash_t *h, u32 hash, u32 value, u32 vip, + u32 available_index, u32 time_now) +{ + kp_hash_bucket_t *bucket = &h->buckets[hash & h->buckets_mask]; + bucket->hash[available_index] = hash; + bucket->value[available_index] = value; + bucket->timeout[available_index] = time_now + h->timeout; + bucket->vip[available_index] = vip; +} + +static_always_inline +u32 kp_hash_elts(kp_hash_t *h, u32 time_now) +{ + u32 tot = 0; + kp_hash_bucket_t *bucket; + u32 i; + kp_hash_foreach_valid_entry(h, bucket, i, time_now) { + tot++; + } + return tot; +} + +#endif /* KP_PLUGIN_KP_KPHASH_H_ */ |