aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/linux-cp/lcp_nl.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/linux-cp/lcp_nl.c')
-rw-r--r--src/plugins/linux-cp/lcp_nl.c490
1 files changed, 463 insertions, 27 deletions
diff --git a/src/plugins/linux-cp/lcp_nl.c b/src/plugins/linux-cp/lcp_nl.c
index 1c0ca0d5cd5..62868f35dc4 100644
--- a/src/plugins/linux-cp/lcp_nl.c
+++ b/src/plugins/linux-cp/lcp_nl.c
@@ -31,6 +31,7 @@
#include <vlib/vlib.h>
#include <vlib/unix/unix.h>
#include <vppinfra/error.h>
+#include <vppinfra/linux/netns.h>
#include <vnet/fib/fib_table.h>
@@ -38,6 +39,29 @@
#include <plugins/linux-cp/lcp_interface.h>
+typedef enum nl_status_t_
+{
+ NL_STATUS_NOTIF_PROC,
+ NL_STATUS_SYNC,
+} nl_status_t;
+
+typedef enum nl_sock_type_t_
+{
+ NL_SOCK_TYPE_LINK,
+ NL_SOCK_TYPE_ADDR,
+ NL_SOCK_TYPE_NEIGH,
+ NL_SOCK_TYPE_ROUTE,
+} nl_sock_type_t;
+
+#define NL_SOCK_TYPES_N (NL_SOCK_TYPE_ROUTE + 1)
+
+/* Socket type, message type, type name, function subname */
+#define foreach_sock_type \
+ _ (NL_SOCK_TYPE_LINK, RTM_GETLINK, "link", link) \
+ _ (NL_SOCK_TYPE_ADDR, RTM_GETADDR, "address", link_addr) \
+ _ (NL_SOCK_TYPE_NEIGH, RTM_GETNEIGH, "neighbor", neigh) \
+ _ (NL_SOCK_TYPE_ROUTE, RTM_GETROUTE, "route", route)
+
typedef enum nl_event_type_t_
{
NL_EVENT_READ,
@@ -47,7 +71,10 @@ typedef enum nl_event_type_t_
typedef struct nl_main
{
+ nl_status_t nl_status;
+
struct nl_sock *sk_route;
+ struct nl_sock *sk_route_sync[NL_SOCK_TYPES_N];
vlib_log_class_t nl_logger;
nl_vft_t *nl_vfts;
struct nl_cache *nl_caches[LCP_NL_N_OBJS];
@@ -59,6 +86,10 @@ typedef struct nl_main
u32 batch_size;
u32 batch_delay_ms;
+ u32 sync_batch_limit;
+ u32 sync_batch_delay_ms;
+ u32 sync_attempt_delay_ms;
+
} nl_main_t;
#define NL_RX_BUF_SIZE_DEF (1 << 27) /* 128 MB */
@@ -66,11 +97,18 @@ typedef struct nl_main
#define NL_BATCH_SIZE_DEF (1 << 11) /* 2048 */
#define NL_BATCH_DELAY_MS_DEF 50 /* 50 ms, max 20 batch/s */
+#define NL_SYNC_BATCH_LIMIT_DEF (1 << 10) /* 1024 */
+#define NL_SYNC_BATCH_DELAY_MS_DEF 20 /* 20ms, max 50 batch/s */
+#define NL_SYNC_ATTEMPT_DELAY_MS_DEF 2000 /* 2s */
+
static nl_main_t nl_main = {
.rx_buf_size = NL_RX_BUF_SIZE_DEF,
.tx_buf_size = NL_TX_BUF_SIZE_DEF,
.batch_size = NL_BATCH_SIZE_DEF,
.batch_delay_ms = NL_BATCH_DELAY_MS_DEF,
+ .sync_batch_limit = NL_SYNC_BATCH_LIMIT_DEF,
+ .sync_batch_delay_ms = NL_SYNC_BATCH_DELAY_MS_DEF,
+ .sync_attempt_delay_ms = NL_SYNC_ATTEMPT_DELAY_MS_DEF,
};
/* #define foreach_nl_nft_proto \ */
@@ -103,6 +141,25 @@ static nl_main_t nl_main = {
} \
}
+#define FOREACH_VFT_NO_ARG(__func) \
+ { \
+ nl_main_t *nm = &nl_main; \
+ nl_vft_t *__nv; \
+ vec_foreach (__nv, nm->nl_vfts) \
+ { \
+ if (!__nv->__func.cb) \
+ continue; \
+ \
+ if (!__nv->__func.is_mp_safe) \
+ vlib_worker_thread_barrier_sync (vlib_get_main ()); \
+ \
+ __nv->__func.cb (); \
+ \
+ if (!__nv->__func.is_mp_safe) \
+ vlib_worker_thread_barrier_release (vlib_get_main ()); \
+ } \
+ }
+
#define FOREACH_VFT_CTX(__func, __arg, __ctx) \
{ \
nl_main_t *nm = &nl_main; \
@@ -136,6 +193,8 @@ nl_register_vft (const nl_vft_t *nv)
static void lcp_nl_open_socket (void);
static void lcp_nl_close_socket (void);
+static void lcp_nl_open_sync_socket (nl_sock_type_t sock_type);
+static void lcp_nl_close_sync_socket (nl_sock_type_t sock_type);
static void
nl_route_del (struct rtnl_route *rr, void *arg)
@@ -150,6 +209,18 @@ nl_route_add (struct rtnl_route *rr, void *arg)
}
static void
+nl_route_sync_begin (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_route_sync_begin);
+}
+
+static void
+nl_route_sync_end (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_route_sync_end);
+}
+
+static void
nl_neigh_del (struct rtnl_neigh *rn, void *arg)
{
FOREACH_VFT (nvl_rt_neigh_del, rn);
@@ -162,6 +233,18 @@ nl_neigh_add (struct rtnl_neigh *rn, void *arg)
}
static void
+nl_neigh_sync_begin (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_begin);
+}
+
+static void
+nl_neigh_sync_end (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_neigh_sync_end);
+}
+
+static void
nl_link_addr_del (struct rtnl_addr *rla, void *arg)
{
FOREACH_VFT (nvl_rt_addr_del, rla);
@@ -174,6 +257,18 @@ nl_link_addr_add (struct rtnl_addr *rla, void *arg)
}
static void
+nl_link_addr_sync_begin (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_addr_sync_begin);
+}
+
+static void
+nl_link_addr_sync_end (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_addr_sync_end);
+}
+
+static void
nl_link_del (struct rtnl_link *rl, void *arg)
{
FOREACH_VFT_CTX (nvl_rt_link_del, rl, arg);
@@ -186,6 +281,18 @@ nl_link_add (struct rtnl_link *rl, void *arg)
}
static void
+nl_link_sync_begin (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_link_sync_begin);
+}
+
+static void
+nl_link_sync_end (void)
+{
+ FOREACH_VFT_NO_ARG (nvl_rt_link_sync_end);
+}
+
+static void
nl_route_dispatch (struct nl_object *obj, void *arg)
{
/* nothing can be done without interface mappings */
@@ -251,6 +358,170 @@ nl_route_process_msgs (void)
return n_msgs;
}
+static int
+lcp_nl_route_discard_msgs (void)
+{
+ nl_main_t *nm = &nl_main;
+ nl_msg_info_t *msg_info;
+ int n_msgs;
+
+ n_msgs = vec_len (nm->nl_msg_queue);
+ if (n_msgs == 0)
+ return 0;
+
+ vec_foreach (msg_info, nm->nl_msg_queue)
+ {
+ nlmsg_free (msg_info->msg);
+ }
+
+ vec_reset_length (nm->nl_msg_queue);
+
+ NL_INFO ("Discarded %u messages", n_msgs);
+
+ return n_msgs;
+}
+
+static int
+lcp_nl_route_send_dump_req (nl_sock_type_t sock_type, int msg_type)
+{
+ nl_main_t *nm = &nl_main;
+ struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+ int err;
+ struct rtgenmsg rt_hdr = {
+ .rtgen_family = AF_UNSPEC,
+ };
+
+ err =
+ nl_send_simple (sk_route, msg_type, NLM_F_DUMP, &rt_hdr, sizeof (rt_hdr));
+
+ if (err < 0)
+ {
+ NL_ERROR ("Unable to send a dump request: %s", nl_geterror (err));
+ }
+ else
+ NL_INFO ("Dump request sent via socket %d of type %d",
+ nl_socket_get_fd (sk_route), sock_type);
+
+ return err;
+}
+
+static int
+lcp_nl_route_dump_cb (struct nl_msg *msg, void *arg)
+{
+ int err;
+
+ if ((err = nl_msg_parse (msg, nl_route_dispatch, NULL)) < 0)
+ NL_ERROR ("Unable to parse object: %s", nl_geterror (err));
+
+ return NL_OK;
+}
+
+static int
+lcp_nl_recv_dump_replies (nl_sock_type_t sock_type, int msg_limit,
+ int *is_done_rcvd)
+{
+ nl_main_t *nm = &nl_main;
+ struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+ struct sockaddr_nl nla;
+ uint8_t *buf = NULL;
+ int n_bytes;
+ struct nlmsghdr *hdr;
+ struct nl_msg *msg = NULL;
+ int err = 0;
+ int done = 0;
+ int n_msgs = 0;
+
+continue_reading:
+ n_bytes = nl_recv (sk_route, &nla, &buf, /* creds */ NULL);
+ if (n_bytes <= 0)
+ return n_bytes;
+
+ hdr = (struct nlmsghdr *) buf;
+ while (nlmsg_ok (hdr, n_bytes))
+ {
+ nlmsg_free (msg);
+ msg = nlmsg_convert (hdr);
+ if (!msg)
+ {
+ err = -NLE_NOMEM;
+ goto out;
+ }
+
+ n_msgs++;
+
+ nlmsg_set_proto (msg, NETLINK_ROUTE);
+ nlmsg_set_src (msg, &nla);
+
+ /* Message that terminates a multipart message. Finish parsing and signal
+ * the caller that all dump replies have been received
+ */
+ if (hdr->nlmsg_type == NLMSG_DONE)
+ {
+ done = 1;
+ goto out;
+ }
+ /* Message to be ignored. Continue parsing */
+ else if (hdr->nlmsg_type == NLMSG_NOOP)
+ ;
+ /* Message that indicates data was lost. Finish parsing and return an
+ * error
+ */
+ else if (hdr->nlmsg_type == NLMSG_OVERRUN)
+ {
+ err = -NLE_MSG_OVERFLOW;
+ goto out;
+ }
+ /* Message that indicates an error. Finish parsing, extract the error
+ * code, and return it */
+ else if (hdr->nlmsg_type == NLMSG_ERROR)
+ {
+ struct nlmsgerr *e = nlmsg_data (hdr);
+
+ if (hdr->nlmsg_len < nlmsg_size (sizeof (*e)))
+ {
+ err = -NLE_MSG_TRUNC;
+ goto out;
+ }
+ else if (e->error)
+ {
+ err = -nl_syserr2nlerr (e->error);
+ goto out;
+ }
+ /* Message is an acknowledgement (err_code = 0). Continue parsing */
+ else
+ ;
+ }
+ /* Message that contains the requested data. Pass it for processing and
+ * continue parsing
+ */
+ else
+ {
+ lcp_nl_route_dump_cb (msg, NULL);
+ }
+
+ hdr = nlmsg_next (hdr, &n_bytes);
+ }
+
+ nlmsg_free (msg);
+ free (buf);
+ msg = NULL;
+ buf = NULL;
+
+ if (!done && n_msgs < msg_limit)
+ goto continue_reading;
+
+out:
+ nlmsg_free (msg);
+ free (buf);
+
+ if (err)
+ return err;
+
+ *is_done_rcvd = done;
+
+ return n_msgs;
+}
+
#define DAY_F64 (1.0 * (24 * 60 * 60))
static uword
@@ -261,38 +532,150 @@ nl_route_process (vlib_main_t *vm, vlib_node_runtime_t *node,
uword event_type;
uword *event_data = 0;
f64 wait_time = DAY_F64;
+ int n_msgs;
+ int is_done;
while (1)
{
- /* If we process a batch of messages and stop because we reached the
- * batch size limit, we want to wake up after the batch delay and
- * process more. Otherwise we just want to wait for a read event.
- */
- vlib_process_wait_for_event_or_clock (vm, wait_time);
- event_type = vlib_process_get_events (vm, &event_data);
-
- switch (event_type)
+ if (nm->nl_status == NL_STATUS_NOTIF_PROC)
+ {
+ /* If we process a batch of messages and stop because we reached the
+ * batch size limit, we want to wake up after the batch delay and
+ * process more. Otherwise we just want to wait for a read event.
+ */
+ vlib_process_wait_for_event_or_clock (vm, wait_time);
+ event_type = vlib_process_get_events (vm, &event_data);
+ vec_reset_length (event_data);
+
+ switch (event_type)
+ {
+ /* Process batch of queued messages on timeout or read event
+ * signal
+ */
+ case ~0:
+ case NL_EVENT_READ:
+ nl_route_process_msgs ();
+ wait_time = (vec_len (nm->nl_msg_queue) != 0) ?
+ nm->batch_delay_ms * 1e-3 :
+ DAY_F64;
+ break;
+
+ /* Initiate synchronization if there was an error polling or
+ * reading the notification socket
+ */
+ case NL_EVENT_ERR:
+ nm->nl_status = NL_STATUS_SYNC;
+ break;
+
+ default:
+ NL_ERROR ("Unknown event type: %u", (u32) event_type);
+ }
+ }
+ else if (nm->nl_status == NL_STATUS_SYNC)
{
- /* process batch of queued messages on timeout or read event signal */
- case ~0:
- case NL_EVENT_READ:
- nl_route_process_msgs ();
- wait_time = (vec_len (nm->nl_msg_queue) != 0) ?
- nm->batch_delay_ms * 1e-3 :
- DAY_F64;
- break;
-
- /* reopen the socket if there was an error polling/reading it */
- case NL_EVENT_ERR:
+ /* Stop processing notifications - close the notification socket and
+ * discard all messages that are currently in the queue
+ */
lcp_nl_close_socket ();
+ lcp_nl_route_discard_msgs ();
+
+      /* Wait some time before the next synchronization attempt. This helps
+       * reduce the number of failed attempts that stall the main thread, by
+       * waiting out the notification storm
+ */
+ NL_INFO ("Wait before next synchronization attempt for %ums",
+ nm->sync_attempt_delay_ms);
+ vlib_process_suspend (vm, nm->sync_attempt_delay_ms * 1e-3);
+
+ /* Open netlink synchronization socket, one for every data type of
+ * interest: link, address, neighbor, and route. That is needed to
+ * be able to send dump requests for every data type simultaneously.
+       * If a dump request is sent while the previous one is still in
+       * progress, the request will fail and EBUSY will be returned
+ */
+#define _(stype, mtype, tname, fn) lcp_nl_open_sync_socket (stype);
+ foreach_sock_type
+#undef _
+
+ /* Start reading notifications and enqueueing them for further
+ * processing. The notifications will serve as a difference between
+ * the snapshot made after the dump request and the actual state at
+ * the moment. Once all the dump replies are processed, the
+ * notifications will be processed
+ */
lcp_nl_open_socket ();
- break;
- default:
- NL_ERROR ("Unknown event type: %u", (u32) event_type);
+ /* Request the current entry set from the kernel for every data type
+       * of interest, i.e. request a snapshot of the current routing
+       * state that the kernel will capture and then reply with
+ */
+#define _(stype, mtype, tname, fn) lcp_nl_route_send_dump_req (stype, mtype);
+ foreach_sock_type
+#undef _
+
+ /* Process all the dump replies */
+#define _(stype, mtype, tname, fn) \
+ nl_##fn##_sync_begin (); \
+ is_done = 0; \
+ do \
+ { \
+ n_msgs = \
+ lcp_nl_recv_dump_replies (stype, nm->sync_batch_limit, &is_done); \
+ if (n_msgs < 0) \
+ { \
+ NL_ERROR ("Error receiving dump replies of type " tname \
+ ": %s (%d)", \
+ nl_geterror (n_msgs), n_msgs); \
+ break; \
+ } \
+ else if (n_msgs == 0) \
+ { \
+ NL_ERROR ("EOF while receiving dump replies of type " tname); \
+ break; \
+ } \
+ else \
+ NL_INFO ("Processed %u dump replies of type " tname, n_msgs); \
+ \
+ /* Suspend the processing loop and wait until event signal is \
+ * received or timeout expires. During synchronization, only \
+       * an error event is expected because the read event is suppressed. \
+       * This avoids stalling the main thread while still detecting \
+       * errors on the notification socket that would make the \
+       * synchronization incomplete \
+ */ \
+ vlib_process_wait_for_event_or_clock (vm, \
+ nm->sync_batch_delay_ms * 1e-3); \
+ event_type = vlib_process_get_events (vm, &event_data); \
+ vec_reset_length (event_data); \
+ \
+ /* If error event received, stop synchronization and repeat an \
+ * attempt later \
+ */ \
+ if (event_type == NL_EVENT_ERR) \
+ goto sync_later; \
+ } \
+ while (!is_done); \
+ nl_##fn##_sync_end ();
+
+ foreach_sock_type
+#undef _
+
+ /* Start processing notifications */
+ nm->nl_status = NL_STATUS_NOTIF_PROC;
+
+ /* Trigger messages processing if there are notifications received
+ * during synchronization
+ */
+ wait_time = (vec_len (nm->nl_msg_queue) != 0) ? 1e-3 : DAY_F64;
+
+ sync_later:
+ /* Close netlink synchronization sockets */
+#define _(stype, mtype, tname, fn) lcp_nl_close_sync_socket (stype);
+ foreach_sock_type
+#undef _
}
-
- vec_reset_length (event_data);
+ else
+ NL_ERROR ("Unknown status: %d", nm->nl_status);
}
return frame->n_vectors;
}
@@ -318,9 +701,12 @@ nl_route_cb (struct nl_msg *msg, void *arg)
msg_info->msg = msg;
nlmsg_get (msg);
- /* notify process node */
- vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index,
- NL_EVENT_READ, 0);
+ /* notify process node if netlink notification processing is active */
+ if (nm->nl_status == NL_STATUS_NOTIF_PROC)
+ {
+ vlib_process_signal_event (vlib_get_main (), nl_route_process_node.index,
+ NL_EVENT_READ, 0);
+ }
return 0;
}
@@ -543,6 +929,55 @@ lcp_nl_open_socket (void)
NL_INFO ("Opened netlink socket %d", nl_socket_get_fd (nm->sk_route));
}
+static void
+lcp_nl_open_sync_socket (nl_sock_type_t sock_type)
+{
+ nl_main_t *nm = &nl_main;
+ int dest_ns_fd, curr_ns_fd;
+ struct nl_sock *sk_route;
+
+ /* Allocate a new blocking socket for routes that will be used for dump
+ * requests. Buffer sizes are left default because replies to dump requests
+ * are flow-controlled and the kernel will not overflow the socket by sending
+ * these
+ */
+
+ nm->sk_route_sync[sock_type] = sk_route = nl_socket_alloc ();
+
+ dest_ns_fd = lcp_get_default_ns_fd ();
+ if (dest_ns_fd)
+ {
+ curr_ns_fd = clib_netns_open (NULL /* self */);
+ clib_setns (dest_ns_fd);
+ }
+
+ nl_connect (sk_route, NETLINK_ROUTE);
+
+ if (dest_ns_fd)
+ {
+ clib_setns (curr_ns_fd);
+ close (curr_ns_fd);
+ }
+
+ NL_INFO ("Opened netlink synchronization socket %d of type %d",
+ nl_socket_get_fd (sk_route), sock_type);
+}
+
+static void
+lcp_nl_close_sync_socket (nl_sock_type_t sock_type)
+{
+ nl_main_t *nm = &nl_main;
+ struct nl_sock *sk_route = nm->sk_route_sync[sock_type];
+
+ if (sk_route)
+ {
+ NL_INFO ("Closing netlink synchronization socket %d of type %d",
+ nl_socket_get_fd (sk_route), sock_type);
+ nl_socket_free (sk_route);
+ nm->sk_route_sync[sock_type] = NULL;
+ }
+}
+
#include <vnet/plugin/plugin.h>
clib_error_t *
lcp_nl_init (vlib_main_t *vm)
@@ -552,6 +987,7 @@ lcp_nl_init (vlib_main_t *vm)
.pair_add_fn = lcp_nl_pair_add_cb,
};
+ nm->nl_status = NL_STATUS_NOTIF_PROC;
nm->clib_file_index = ~0;
nm->nl_logger = vlib_log_register_class ("nl", "nl");