diff options
Diffstat (limited to 'src/plugins/linux-cp/lcp_nl.c')
-rw-r--r-- | src/plugins/linux-cp/lcp_nl.c | 490 |
1 files changed, 463 insertions, 27 deletions
diff --git a/src/plugins/linux-cp/lcp_nl.c b/src/plugins/linux-cp/lcp_nl.c index 1c0ca0d5cd5..62868f35dc4 100644 --- a/src/plugins/linux-cp/lcp_nl.c +++ b/src/plugins/linux-cp/lcp_nl.c @@ -31,6 +31,7 @@ #include <vlib/vlib.h> #include <vlib/unix/unix.h> #include <vppinfra/error.h> +#include <vppinfra/linux/netns.h> #include <vnet/fib/fib_table.h> @@ -38,6 +39,29 @@ #include <plugins/linux-cp/lcp_interface.h> /* Mode of the nl_route_process loop: NL_STATUS_NOTIF_PROC drains queued notifications, NL_STATUS_SYNC runs the dump-based resynchronization. */ +typedef enum nl_status_t_ +{ + NL_STATUS_NOTIF_PROC, + NL_STATUS_SYNC, +} nl_status_t; + /* Index into nl_main.sk_route_sync[]: one synchronization socket per dumped object type. */ +typedef enum nl_sock_type_t_ +{ + NL_SOCK_TYPE_LINK, + NL_SOCK_TYPE_ADDR, + NL_SOCK_TYPE_NEIGH, + NL_SOCK_TYPE_ROUTE, +} nl_sock_type_t; + +#define NL_SOCK_TYPES_N (NL_SOCK_TYPE_ROUTE + 1) + +/* Socket type, message type, type name, function subname */ +#define foreach_sock_type \ + _ (NL_SOCK_TYPE_LINK, RTM_GETLINK, "link", link) \ + _ (NL_SOCK_TYPE_ADDR, RTM_GETADDR, "address", link_addr) \ + _ (NL_SOCK_TYPE_NEIGH, RTM_GETNEIGH, "neighbor", neigh) \ + _ (NL_SOCK_TYPE_ROUTE, RTM_GETROUTE, "route", route) + typedef enum nl_event_type_t_ { NL_EVENT_READ, @@ -47,7 +71,10 @@ typedef enum nl_event_type_t_ typedef struct nl_main { + nl_status_t nl_status; + struct nl_sock *sk_route; + struct nl_sock *sk_route_sync[NL_SOCK_TYPES_N]; vlib_log_class_t nl_logger; nl_vft_t *nl_vfts; struct nl_cache *nl_caches[LCP_NL_N_OBJS]; @@ -59,6 +86,10 @@ typedef struct nl_main u32 batch_size; u32 batch_delay_ms; + u32 sync_batch_limit; + u32 sync_batch_delay_ms; + u32 sync_attempt_delay_ms; + } nl_main_t; #define NL_RX_BUF_SIZE_DEF (1 << 27) /* 128 MB */ @@ -66,11 +97,18 @@ typedef struct nl_main #define NL_BATCH_SIZE_DEF (1 << 11) /* 2048 */ #define NL_BATCH_DELAY_MS_DEF 50 /* 50 ms, max 20 batch/s */ +#define NL_SYNC_BATCH_LIMIT_DEF (1 << 10) /* 1024 */ +#define NL_SYNC_BATCH_DELAY_MS_DEF 20 /* 20ms, max 50 batch/s */ +#define NL_SYNC_ATTEMPT_DELAY_MS_DEF 2000 /* 2s */ + static nl_main_t nl_main = { .rx_buf_size = NL_RX_BUF_SIZE_DEF, .tx_buf_size = NL_TX_BUF_SIZE_DEF, .batch_size =
NL_BATCH_SIZE_DEF, .batch_delay_ms = NL_BATCH_DELAY_MS_DEF, + .sync_batch_limit = NL_SYNC_BATCH_LIMIT_DEF, + .sync_batch_delay_ms = NL_SYNC_BATCH_DELAY_MS_DEF, + .sync_attempt_delay_ms = NL_SYNC_ATTEMPT_DELAY_MS_DEF, }; /* #define foreach_nl_nft_proto \ */ @@ -103,6 +141,25 @@ static nl_main_t nl_main = { } \ } /* Call the argument-less callback __func on every registered vft that sets it. The call is wrapped in a worker thread barrier unless the callback is flagged is_mp_safe. */ +#define FOREACH_VFT_NO_ARG(__func) \ + { \ + nl_main_t *nm = &nl_main; \ + nl_vft_t *__nv; \ + vec_foreach (__nv, nm->nl_vfts) \ + { \ + if (!__nv->__func.cb) \ + continue; \ + \ + if (!__nv->__func.is_mp_safe) \ + vlib_worker_thread_barrier_sync (vlib_get_main ()); \ + \ + __nv->__func.cb (); \ + \ + if (!__nv->__func.is_mp_safe) \ + vlib_worker_thread_barrier_release (vlib_get_main ()); \ + } \ + } + #define FOREACH_VFT_CTX(__func, __arg, __ctx) \ { \ nl_main_t *nm = &nl_main; \ @@ -136,6 +193,8 @@ nl_register_vft (const nl_vft_t *nv) static void lcp_nl_open_socket (void); static void lcp_nl_close_socket (void); +static void lcp_nl_open_sync_socket (nl_sock_type_t sock_type); +static void lcp_nl_close_sync_socket (nl_sock_type_t sock_type); static void nl_route_del (struct rtnl_route *rr, void *arg) @@ -150,6 +209,18 @@ nl_route_add (struct rtnl_route *rr, void *arg) } static void +nl_route_sync_begin (void) +{ /* Fan out to the nvl_rt_route_sync_begin callbacks. The other sync_begin and sync_end wrappers in this patch follow the same pattern for their own callback slot; nl_route_process calls begin before and end after reading the dump replies of a type. */ + FOREACH_VFT_NO_ARG (nvl_rt_route_sync_begin); +} + +static void +nl_route_sync_end (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_route_sync_end); +} + +static void nl_neigh_del (struct rtnl_neigh *rn, void *arg) { FOREACH_VFT (nvl_rt_neigh_del, rn); @@ -162,6 +233,18 @@ nl_neigh_add (struct rtnl_neigh *rn, void *arg) } static void +nl_neigh_sync_begin (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_begin); +} + +static void +nl_neigh_sync_end (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_end); +} + +static void nl_link_addr_del (struct rtnl_addr *rla, void *arg) { FOREACH_VFT (nvl_rt_addr_del, rla); @@ -174,6 +257,18 @@ nl_link_addr_add (struct rtnl_addr *rla, void *arg) } static void +nl_link_addr_sync_begin (void) +{ + FOREACH_VFT_NO_ARG
(nvl_rt_addr_sync_begin); +} + +static void +nl_link_addr_sync_end (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_addr_sync_end); +} + +static void nl_link_del (struct rtnl_link *rl, void *arg) { FOREACH_VFT_CTX (nvl_rt_link_del, rl, arg); @@ -186,6 +281,18 @@ nl_link_add (struct rtnl_link *rl, void *arg) } static void +nl_link_sync_begin (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_link_sync_begin); +} + +static void +nl_link_sync_end (void) +{ + FOREACH_VFT_NO_ARG (nvl_rt_link_sync_end); +} + +static void nl_route_dispatch (struct nl_object *obj, void *arg) { /* nothing can be done without interface mappings */ @@ -251,6 +358,170 @@ nl_route_process_msgs (void) return n_msgs; } /* Free every message held in nl_msg_queue and reset the queue length to zero. Returns the number of messages dropped, or 0 without logging when the queue was already empty. NOTE(review): n_msgs is an int but is logged with a percent-u conversion. */ +static int +lcp_nl_route_discard_msgs (void) +{ + nl_main_t *nm = &nl_main; + nl_msg_info_t *msg_info; + int n_msgs; + + n_msgs = vec_len (nm->nl_msg_queue); + if (n_msgs == 0) + return 0; + + vec_foreach (msg_info, nm->nl_msg_queue) + { + nlmsg_free (msg_info->msg); + } + + vec_reset_length (nm->nl_msg_queue); + + NL_INFO ("Discarded %u messages", n_msgs); + + return n_msgs; +} + /* Send a request of type msg_type with NLM_F_DUMP and an AF_UNSPEC rtgenmsg payload on the synchronization socket selected by sock_type. Returns the nl_send_simple result unchanged; a negative value is logged as an error. */ +static int +lcp_nl_route_send_dump_req (nl_sock_type_t sock_type, int msg_type) +{ + nl_main_t *nm = &nl_main; + struct nl_sock *sk_route = nm->sk_route_sync[sock_type]; + int err; + struct rtgenmsg rt_hdr = { + .rtgen_family = AF_UNSPEC, + }; + + err = + nl_send_simple (sk_route, msg_type, NLM_F_DUMP, &rt_hdr, sizeof (rt_hdr)); + + if (err < 0) + { + NL_ERROR ("Unable to send a dump request: %s", nl_geterror (err)); + } + else + NL_INFO ("Dump request sent via socket %d of type %d", + nl_socket_get_fd (sk_route), sock_type); + + return err; +} + /* Parse one dump reply and hand each parsed object to nl_route_dispatch. A parse failure is only logged; NL_OK is returned in every case and arg is unused. */ +static int +lcp_nl_route_dump_cb (struct nl_msg *msg, void *arg) +{ + int err; + + if ((err = nl_msg_parse (msg, nl_route_dispatch, NULL)) < 0) + NL_ERROR ("Unable to parse object: %s", nl_geterror (err)); + + return NL_OK; +} + /* Read dump replies from the synchronization socket selected by sock_type until NLMSG_DONE is seen or at least msg_limit messages have been consumed. Data messages are passed to lcp_nl_route_dump_cb. Returns the number of messages consumed and stores the NLMSG_DONE flag in is_done_rcvd, or returns a negative libnl error code, or returns the nl_recv result when that is zero or negative. */ +static int +lcp_nl_recv_dump_replies (nl_sock_type_t sock_type, int msg_limit, + int *is_done_rcvd) +{ + nl_main_t *nm = &nl_main; + struct nl_sock
*sk_route = nm->sk_route_sync[sock_type]; + struct sockaddr_nl nla; + uint8_t *buf = NULL; + int n_bytes; + struct nlmsghdr *hdr; + struct nl_msg *msg = NULL; + int err = 0; + int done = 0; + int n_msgs = 0; + +continue_reading: + n_bytes = nl_recv (sk_route, &nla, &buf, /* creds */ NULL); + if (n_bytes <= 0) + return n_bytes; + /* NOTE(review): the early return above does not write is_done_rcvd and discards the count accumulated by earlier reads in this call. */ + hdr = (struct nlmsghdr *) buf; + while (nlmsg_ok (hdr, n_bytes)) + { + nlmsg_free (msg); + msg = nlmsg_convert (hdr); + if (!msg) + { + err = -NLE_NOMEM; + goto out; + } + + n_msgs++; + + nlmsg_set_proto (msg, NETLINK_ROUTE); + nlmsg_set_src (msg, &nla); + + /* Message that terminates a multipart message. Finish parsing and signal + * the caller that all dump replies have been received + */ + if (hdr->nlmsg_type == NLMSG_DONE) + { + done = 1; + goto out; + } + /* Message to be ignored. Continue parsing */ + else if (hdr->nlmsg_type == NLMSG_NOOP) + ; + /* Message that indicates data was lost. Finish parsing and return an + * error + */ + else if (hdr->nlmsg_type == NLMSG_OVERRUN) + { + err = -NLE_MSG_OVERFLOW; + goto out; + } + /* Message that indicates an error. Finish parsing, extract the error + * code, and return it */ + else if (hdr->nlmsg_type == NLMSG_ERROR) + { + struct nlmsgerr *e = nlmsg_data (hdr); + + if (hdr->nlmsg_len < nlmsg_size (sizeof (*e))) + { + err = -NLE_MSG_TRUNC; + goto out; + } + else if (e->error) + { + err = -nl_syserr2nlerr (e->error); + goto out; + } + /* Message is an acknowledgement (err_code = 0). Continue parsing */ + else + ; + } + /* Message that contains the requested data.
Pass it for processing and + * continue parsing + */ + else + { + lcp_nl_route_dump_cb (msg, NULL); + } + + hdr = nlmsg_next (hdr, &n_bytes); + } + + nlmsg_free (msg); + free (buf); + msg = NULL; + buf = NULL; + /* msg_limit is only tested here, between reads, so the returned count can exceed it by the remainder of the last buffer. Both pointers were cleared above, which makes the cleanup under the out label harmless on this path. */ + if (!done && n_msgs < msg_limit) + goto continue_reading; + +out: + nlmsg_free (msg); + free (buf); + + if (err) + return err; + + *is_done_rcvd = done; + + return n_msgs; +} + #define DAY_F64 (1.0 * (24 * 60 * 60)) static uword @@ -261,38 +532,150 @@ nl_route_process (vlib_main_t *vm, vlib_node_runtime_t *node, uword event_type; uword *event_data = 0; f64 wait_time = DAY_F64; + int n_msgs; + int is_done; while (1) { - /* If we process a batch of messages and stop because we reached the - * batch size limit, we want to wake up after the batch delay and - * process more. Otherwise we just want to wait for a read event. - */ - vlib_process_wait_for_event_or_clock (vm, wait_time); - event_type = vlib_process_get_events (vm, &event_data); - - switch (event_type) + if (nm->nl_status == NL_STATUS_NOTIF_PROC) + { + /* If we process a batch of messages and stop because we reached the + * batch size limit, we want to wake up after the batch delay and + * process more. Otherwise we just want to wait for a read event. + */ + vlib_process_wait_for_event_or_clock (vm, wait_time); + event_type = vlib_process_get_events (vm, &event_data); + vec_reset_length (event_data); + + switch (event_type) + { + /* Process batch of queued messages on timeout or read event + * signal + */ + case ~0: + case NL_EVENT_READ: + nl_route_process_msgs (); + wait_time = (vec_len (nm->nl_msg_queue) != 0) ?
+ nm->batch_delay_ms * 1e-3 : + DAY_F64; + break; + + /* Initiate synchronization if there was an error polling or + * reading the notification socket + */ + case NL_EVENT_ERR: + nm->nl_status = NL_STATUS_SYNC; + break; + + default: + NL_ERROR ("Unknown event type: %u", (u32) event_type); + } + } + else if (nm->nl_status == NL_STATUS_SYNC) { - /* process batch of queued messages on timeout or read event signal */ - case ~0: - case NL_EVENT_READ: - nl_route_process_msgs (); - wait_time = (vec_len (nm->nl_msg_queue) != 0) ? - nm->batch_delay_ms * 1e-3 : - DAY_F64; - break; - - /* reopen the socket if there was an error polling/reading it */ - case NL_EVENT_ERR: + /* Stop processing notifications - close the notification socket and + * discard all messages that are currently in the queue + */ lcp_nl_close_socket (); + lcp_nl_route_discard_msgs (); + + /* Wait some time before the next synchronization attempt. This + * reduces the number of failed attempts that stall the main thread by + * waiting out the notification storm + */ + NL_INFO ("Wait before next synchronization attempt for %ums", + nm->sync_attempt_delay_ms); + vlib_process_suspend (vm, nm->sync_attempt_delay_ms * 1e-3); + + /* Open netlink synchronization sockets, one for every data type of + * interest: link, address, neighbor, and route. That is needed to + * be able to send dump requests for every data type simultaneously. + * If a dump request is sent while the previous one is in progress, + * the request fails with EBUSY + */ +#define _(stype, mtype, tname, fn) lcp_nl_open_sync_socket (stype); + foreach_sock_type +#undef _ + + /* Start reading notifications and enqueueing them for further + * processing. The notifications will serve as a difference between + * the snapshot made after the dump request and the actual state at + * the moment.
Once all the dump replies are processed, the + * notifications will be processed + */ lcp_nl_open_socket (); - break; - default: - NL_ERROR ("Unknown event type: %u", (u32) event_type); + /* Request the current entry set from the kernel for every data type + * of interest. Thus requesting a snapshot of the current routing + * state that the kernel will make and then reply with + */ +#define _(stype, mtype, tname, fn) lcp_nl_route_send_dump_req (stype, mtype); + foreach_sock_type +#undef _ + /* NOTE(review): the return value of lcp_nl_route_send_dump_req is ignored above. In the loop below a receive error or EOF only breaks out of the do-while, so the matching sync_end callback still runs and nl_status is set back to NL_STATUS_NOTIF_PROC afterwards - confirm that this is intended for a partial dump. */ + /* Process all the dump replies */ +#define _(stype, mtype, tname, fn) \ + nl_##fn##_sync_begin (); \ + is_done = 0; \ + do \ + { \ + n_msgs = \ + lcp_nl_recv_dump_replies (stype, nm->sync_batch_limit, &is_done); \ + if (n_msgs < 0) \ + { \ + NL_ERROR ("Error receiving dump replies of type " tname \ + ": %s (%d)", \ + nl_geterror (n_msgs), n_msgs); \ + break; \ + } \ + else if (n_msgs == 0) \ + { \ + NL_ERROR ("EOF while receiving dump replies of type " tname); \ + break; \ + } \ + else \ + NL_INFO ("Processed %u dump replies of type " tname, n_msgs); \ + \ + /* Suspend the processing loop and wait until event signal is \ + * received or timeout expires. During synchronization, only \ + * error event is expected because read event is suppressed.
\ + * Allows not to stall the main thread and detect errors on the \ + * notification socket that will make synchronization \ + * incomplete \ + */ \ + vlib_process_wait_for_event_or_clock (vm, \ + nm->sync_batch_delay_ms * 1e-3); \ + event_type = vlib_process_get_events (vm, &event_data); \ + vec_reset_length (event_data); \ + \ + /* If error event received, stop synchronization and repeat an \ + * attempt later \ + */ \ + if (event_type == NL_EVENT_ERR) \ + goto sync_later; \ + } \ + while (!is_done); \ + nl_##fn##_sync_end (); + + foreach_sock_type +#undef _ + + /* Start processing notifications */ + nm->nl_status = NL_STATUS_NOTIF_PROC; + + /* Trigger messages processing if there are notifications received + * during synchronization + */ + wait_time = (vec_len (nm->nl_msg_queue) != 0) ? 1e-3 : DAY_F64; + + sync_later: + /* Close netlink synchronization sockets. When this label is reached through the goto, nl_status is still NL_STATUS_SYNC, so the next loop iteration starts another attempt */ +#define _(stype, mtype, tname, fn) lcp_nl_close_sync_socket (stype); + foreach_sock_type +#undef _ } - - vec_reset_length (event_data); + else + NL_ERROR ("Unknown status: %d", nm->nl_status); } return frame->n_vectors; } @@ -318,9 +701,12 @@ nl_route_cb (struct nl_msg *msg, void *arg) msg_info->msg = msg; nlmsg_get (msg); - /* notify process node */ - vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index, - NL_EVENT_READ, 0); + /* notify process node if netlink notification processing is active */ + if (nm->nl_status == NL_STATUS_NOTIF_PROC) + { + vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index, + NL_EVENT_READ, 0); + } return 0; } @@ -543,6 +929,55 @@ lcp_nl_open_socket (void) NL_INFO ("Opened netlink socket %d", nl_socket_get_fd (nm->sk_route)); } /* Allocate a netlink socket, store it in sk_route_sync[sock_type] and connect it to NETLINK_ROUTE. When lcp_get_default_ns_fd returns a non-zero fd, the connect is done after switching to that namespace and the previous namespace is restored afterwards. */ +static void +lcp_nl_open_sync_socket (nl_sock_type_t sock_type) +{ + nl_main_t *nm = &nl_main; + int dest_ns_fd, curr_ns_fd; + struct nl_sock *sk_route; + + /* Allocate a new blocking socket for routes that will be used for dump + * requests.
Buffer sizes are left default because replies to dump requests + * are flow-controlled and the kernel will not overflow the socket by sending + * these + */ + /* NOTE(review): the results of nl_socket_alloc, clib_netns_open, clib_setns and nl_connect are not checked in this function; a failed allocation would leave a NULL socket that is then passed to nl_connect and nl_socket_get_fd. */ + nm->sk_route_sync[sock_type] = sk_route = nl_socket_alloc (); + + dest_ns_fd = lcp_get_default_ns_fd (); + if (dest_ns_fd) + { + curr_ns_fd = clib_netns_open (NULL /* self */); + clib_setns (dest_ns_fd); + } + + nl_connect (sk_route, NETLINK_ROUTE); + + if (dest_ns_fd) + { + clib_setns (curr_ns_fd); + close (curr_ns_fd); + } + + NL_INFO ("Opened netlink synchronization socket %d of type %d", + nl_socket_get_fd (sk_route), sock_type); +} + /* Free the synchronization socket stored for sock_type, if any, and clear its slot; does nothing when the slot is already NULL. */ +static void +lcp_nl_close_sync_socket (nl_sock_type_t sock_type) +{ + nl_main_t *nm = &nl_main; + struct nl_sock *sk_route = nm->sk_route_sync[sock_type]; + + if (sk_route) + { + NL_INFO ("Closing netlink synchronization socket %d of type %d", + nl_socket_get_fd (sk_route), sock_type); + nl_socket_free (sk_route); + nm->sk_route_sync[sock_type] = NULL; + } +} + #include <vnet/plugin/plugin.h> clib_error_t * lcp_nl_init (vlib_main_t *vm) @@ -552,6 +987,7 @@ lcp_nl_init (vlib_main_t *vm) .pair_add_fn = lcp_nl_pair_add_cb, }; + nm->nl_status = NL_STATUS_NOTIF_PROC; nm->clib_file_index = ~0; nm->nl_logger = vlib_log_register_class ("nl", "nl"); |