aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/sflow
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/sflow')
-rw-r--r--src/plugins/sflow/CMakeLists.txt61
-rw-r--r--src/plugins/sflow/FEATURE.yaml16
-rw-r--r--src/plugins/sflow/node.c356
-rw-r--r--src/plugins/sflow/sflow.api198
-rw-r--r--src/plugins/sflow/sflow.c1052
-rw-r--r--src/plugins/sflow/sflow.h198
-rw-r--r--src/plugins/sflow/sflow.rst61
-rw-r--r--src/plugins/sflow/sflow_common.h44
-rw-r--r--src/plugins/sflow/sflow_psample.c523
-rw-r--r--src/plugins/sflow/sflow_psample.h111
-rw-r--r--src/plugins/sflow/sflow_psample_fields.h36
-rw-r--r--src/plugins/sflow/sflow_test.c298
-rw-r--r--src/plugins/sflow/sflow_usersock.c222
-rw-r--r--src/plugins/sflow/sflow_usersock.h133
-rw-r--r--src/plugins/sflow/sflow_vapi.c226
-rw-r--r--src/plugins/sflow/sflow_vapi.h55
16 files changed, 3590 insertions, 0 deletions
diff --git a/src/plugins/sflow/CMakeLists.txt b/src/plugins/sflow/CMakeLists.txt
new file mode 100644
index 00000000000..35433bd24df
--- /dev/null
+++ b/src/plugins/sflow/CMakeLists.txt
@@ -0,0 +1,61 @@
+
+# Copyright (c) 2024 InMon Corp.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+vpp_find_path(NETLINK_INCLUDE_DIR NAMES linux/netlink.h)
+if (NOT NETLINK_INCLUDE_DIR)
+ message(WARNING "netlink headers not found - sflow plugin disabled")
+ return()
+endif()
+
+if ("${CMAKE_SYSTEM_NAME}" STREQUAL "FreeBSD")
+ message(WARNING "sflow is not supported on FreeBSD - sflow plugin disabled")
+ return()
+endif()
+
+LIST(FIND excluded_plugins linux-cp exc_index)
+if(${exc_index} EQUAL "-1")
+ message(WARNING "sflow plugin - linux-cp plugin included: compiling VAPI calls")
+ add_compile_definitions(SFLOW_USE_VAPI)
+else()
+ message(WARNING "sflow plugin - linux-cp plugin excluded: not compiling VAPI calls")
+endif()
+
+include_directories(${CMAKE_SOURCE_DIR}/vpp-api ${CMAKE_CURRENT_BINARY_DIR}/../../vpp-api)
+add_vpp_plugin(sflow
+ SOURCES
+ sflow.c
+ node.c
+ sflow_common.h
+ sflow.h
+ sflow_psample.c
+ sflow_psample.h
+ sflow_psample_fields.h
+ sflow_usersock.c
+ sflow_usersock.h
+ sflow_vapi.c
+ sflow_vapi.h
+
+ MULTIARCH_SOURCES
+ node.c
+
+ API_FILES
+ sflow.api
+
+ API_TEST_SOURCES
+ sflow_test.c
+
+ LINK_LIBRARIES
+ vppapiclient
+ vapiclient
+)
diff --git a/src/plugins/sflow/FEATURE.yaml b/src/plugins/sflow/FEATURE.yaml
new file mode 100644
index 00000000000..612db61005c
--- /dev/null
+++ b/src/plugins/sflow/FEATURE.yaml
@@ -0,0 +1,16 @@
+---
+name: sFlow
+maintainer: Neil McKee <neil.mckee@inmon.com>
+
+description: |-
+ This plugin implements the random packet-sampling and interface
+ telemetry streaming required to support standard sFlow export
+ on Linux platforms. The overhead incurred by this monitoring is
+ minimal, so that detailed, real-time traffic analysis can be
+ achieved even under high load conditions, with visibility into
+ any fields that appear in the packet headers. If the linux-cp
+ plugin is running then interfaces will be mapped to their
+ equivalent Linux tap ports.
+
+state: experimental
+properties: [CLI, MULTITHREAD]
diff --git a/src/plugins/sflow/node.c b/src/plugins/sflow/node.c
new file mode 100644
index 00000000000..51826438834
--- /dev/null
+++ b/src/plugins/sflow/node.c
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vlib/vlib.h>
+#include <vlibmemory/api.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+typedef struct
+{
+ u32 next_index;
+ u32 sw_if_index;
+ u8 new_src_mac[6];
+ u8 new_dst_mac[6];
+} sflow_trace_t;
+
+#ifndef CLIB_MARCH_VARIANT
+/* Format a 6-byte MAC address as colon-separated lowercase hex
+   ("aa:bb:cc:dd:ee:ff"). Local helper for the packet-trace formatter. */
+static u8 *
+my_format_mac_address (u8 *s, va_list *args)
+{
+  u8 *a = va_arg (*args, u8 *);
+  return format (s, "%02x:%02x:%02x:%02x:%02x:%02x", a[0], a[1], a[2], a[3],
+		 a[4], a[5]);
+}
+
+/* packet trace format function */
+/* packet trace format function: renders one sflow_trace_t record
+   (sw_if_index, next node index, and the src/dst MACs captured from the
+   ethernet header) for "show trace" output. */
+static u8 *
+format_sflow_trace (u8 *s, va_list *args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  sflow_trace_t *t = va_arg (*args, sflow_trace_t *);
+
+  s = format (s, "SFLOW: sw_if_index %d, next index %d\n", t->sw_if_index,
+	      t->next_index);
+  s = format (s, "  src %U -> dst %U", my_format_mac_address, t->new_src_mac,
+	      my_format_mac_address, t->new_dst_mac);
+  return s;
+}
+
+vlib_node_registration_t sflow_node;
+
+#endif /* CLIB_MARCH_VARIANT */
+
+#ifndef CLIB_MARCH_VARIANT
+static char *sflow_error_strings[] = {
+#define _(sym, string) string,
+ foreach_sflow_error
+#undef _
+};
+#endif /* CLIB_MARCH_VARIANT */
+
+typedef enum
+{
+ SFLOW_NEXT_ETHERNET_INPUT,
+ SFLOW_N_NEXT,
+} sflow_next_t;
+
+/*
+ * sflow worker-node dispatch function.
+ *
+ * Phase 1: advance this thread's 1-in-N skip countdown across the
+ * frame-vector; each time the countdown lands on a packet, copy up to
+ * smp->headerB bytes of its header into a per-thread FIFO as an
+ * sflow_sample_t (incrementing sfwk->drop if the FIFO is full).
+ * Phase 2: pass every packet, unmodified, to the next node on the
+ * feature arc (standard speculative-enqueue boilerplate).
+ */
+VLIB_NODE_FN (sflow_node)
+(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
+{
+  u32 n_left_from, *from, *to_next;
+  sflow_next_t next_index;
+
+  sflow_main_t *smp = &sflow_main;
+  from = vlib_frame_vector_args (frame);
+  n_left_from = frame->n_vectors;
+
+  uword thread_index = os_get_thread_index ();
+  sflow_per_thread_data_t *sfwk =
+    vec_elt_at_index (smp->per_thread_data, thread_index);
+
+  /* note that sfwk->skip==1 means "take the next packet",
+     so we never see sfwk->skip==0. */
+
+  u32 pkts = n_left_from;
+  if (PREDICT_TRUE (sfwk->skip > pkts))
+    {
+      /* skip the whole frame-vector */
+      sfwk->skip -= pkts;
+      sfwk->pool += pkts;
+    }
+  else
+    {
+      /* at least one sample falls within this frame-vector */
+      while (pkts >= sfwk->skip)
+	{
+	  /* reach in to get the one we want. */
+	  vlib_buffer_t *bN = vlib_get_buffer (vm, from[sfwk->skip - 1]);
+
+	  /* Sample this packet header (clamped to the header-bytes limit). */
+	  u32 hdr = bN->current_length;
+	  if (hdr > smp->headerB)
+	    hdr = smp->headerB;
+
+	  ethernet_header_t *en = vlib_buffer_get_current (bN);
+	  u32 if_index = vnet_buffer (bN)->sw_if_index[VLIB_RX];
+	  /* report the hw_if_index rather than the sw_if_index when we can */
+	  vnet_hw_interface_t *hw =
+	    vnet_get_sup_hw_interface (smp->vnet_main, if_index);
+	  if (hw)
+	    if_index = hw->hw_if_index;
+	  else
+	    {
+	      // TODO: can we get interfaces that have no hw interface?
+	      // If so, should we ignore the sample?
+	    }
+
+	  sflow_sample_t sample = {
+	    .samplingN = sfwk->smpN,
+	    .input_if_index = if_index,
+	    .sampled_packet_size =
+	      bN->current_length + bN->total_length_not_including_first_buffer,
+	    .header_bytes = hdr
+	  };
+
+	  // TODO: what bit in the buffer can we set right here to indicate
+	  // that this packet was sampled (and perhaps another bit to say if it
+	  // was dropped or successfully enqueued)? That way we can check it
+	  // below if the packet is traced, and indicate that in the trace
+	  // output.
+
+	  // TODO: we end up copying the header twice here. Consider allowing
+	  // the enqueue to be just a little more complex. Like this:
+	  // if(!sflow_fifo_enqueue(&sfwk->fifo, &sample, en, hdr).
+	  // With headerB==128 that would be memcpy(,,24) plus memcpy(,,128)
+	  // instead of the memcpy(,,128) plus memcpy(,,24+256) that we do
+	  // here. (We also know that it could be done as a multiple of 8
+	  // (aligned) bytes because the sflow_sample_t fields are (6xu32) and
+	  // the headerB setting is quantized to the nearest 32 bytes, so there
+	  // may be ways to make it even easier for the compiler.)
+	  sfwk->smpl++;
+	  memcpy (sample.header, en, hdr);
+	  if (PREDICT_FALSE (!sflow_fifo_enqueue (&sfwk->fifo, &sample)))
+	    sfwk->drop++;
+
+	  /* consume the skip and draw the next random skip */
+	  pkts -= sfwk->skip;
+	  sfwk->pool += sfwk->skip;
+	  sfwk->skip = sflow_next_random_skip (sfwk);
+	}
+      /* We took a sample (or several) from this frame-vector, but now we are
+	 skipping the rest. */
+      sfwk->skip -= pkts;
+      sfwk->pool += pkts;
+    }
+
+  /* the rest of this is boilerplate code just to make sure
+   * that packets are passed on the same way as they would
+   * have been if this node were not enabled.
+   * TODO: If there is ever a way to do this in one step
+   * (i.e. pass on the whole frame-vector unchanged) then it
+   * might help performance.
+   */
+
+  next_index = node->cached_next_index;
+
+  while (n_left_from > 0)
+    {
+      u32 n_left_to_next;
+
+      vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
+
+      /* quad-loop: handle four buffers per iteration while enough remain */
+      while (n_left_from >= 8 && n_left_to_next >= 4)
+	{
+	  u32 next0 = SFLOW_NEXT_ETHERNET_INPUT;
+	  u32 next1 = SFLOW_NEXT_ETHERNET_INPUT;
+	  u32 next2 = SFLOW_NEXT_ETHERNET_INPUT;
+	  u32 next3 = SFLOW_NEXT_ETHERNET_INPUT;
+	  ethernet_header_t *en0, *en1, *en2, *en3;
+	  u32 bi0, bi1, bi2, bi3;
+	  vlib_buffer_t *b0, *b1, *b2, *b3;
+
+	  /* Prefetch next iteration. */
+	  {
+	    vlib_buffer_t *p4, *p5, *p6, *p7;
+
+	    p4 = vlib_get_buffer (vm, from[4]);
+	    p5 = vlib_get_buffer (vm, from[5]);
+	    p6 = vlib_get_buffer (vm, from[6]);
+	    p7 = vlib_get_buffer (vm, from[7]);
+
+	    vlib_prefetch_buffer_header (p4, LOAD);
+	    vlib_prefetch_buffer_header (p5, LOAD);
+	    vlib_prefetch_buffer_header (p6, LOAD);
+	    vlib_prefetch_buffer_header (p7, LOAD);
+
+	    CLIB_PREFETCH (p4->data, CLIB_CACHE_LINE_BYTES, STORE);
+	    CLIB_PREFETCH (p5->data, CLIB_CACHE_LINE_BYTES, STORE);
+	    CLIB_PREFETCH (p6->data, CLIB_CACHE_LINE_BYTES, STORE);
+	    CLIB_PREFETCH (p7->data, CLIB_CACHE_LINE_BYTES, STORE);
+	  }
+
+	  /* speculatively enqueue b0-b3 to the current next frame */
+	  to_next[0] = bi0 = from[0];
+	  to_next[1] = bi1 = from[1];
+	  to_next[2] = bi2 = from[2];
+	  to_next[3] = bi3 = from[3];
+	  from += 4;
+	  to_next += 4;
+	  n_left_from -= 4;
+	  n_left_to_next -= 4;
+
+	  b0 = vlib_get_buffer (vm, bi0);
+	  b1 = vlib_get_buffer (vm, bi1);
+	  b2 = vlib_get_buffer (vm, bi2);
+	  b3 = vlib_get_buffer (vm, bi3);
+
+	  /* do this to always pass on to the next node on feature arc */
+	  vnet_feature_next (&next0, b0);
+	  vnet_feature_next (&next1, b1);
+	  vnet_feature_next (&next2, b2);
+	  vnet_feature_next (&next3, b3);
+
+	  ASSERT (b0->current_data == 0);
+	  ASSERT (b1->current_data == 0);
+	  ASSERT (b2->current_data == 0);
+	  ASSERT (b3->current_data == 0);
+
+	  en0 = vlib_buffer_get_current (b0);
+	  en1 = vlib_buffer_get_current (b1);
+	  en2 = vlib_buffer_get_current (b2);
+	  en3 = vlib_buffer_get_current (b3);
+
+	  if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+	    {
+	      sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+	      t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+	      t->next_index = next0;
+	      clib_memcpy (t->new_src_mac, en0->src_address,
+			   sizeof (t->new_src_mac));
+	      clib_memcpy (t->new_dst_mac, en0->dst_address,
+			   sizeof (t->new_dst_mac));
+	    }
+
+	  if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED))
+	    {
+	      sflow_trace_t *t = vlib_add_trace (vm, node, b1, sizeof (*t));
+	      t->sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX];
+	      t->next_index = next1;
+	      clib_memcpy (t->new_src_mac, en1->src_address,
+			   sizeof (t->new_src_mac));
+	      clib_memcpy (t->new_dst_mac, en1->dst_address,
+			   sizeof (t->new_dst_mac));
+	    }
+
+	  if (PREDICT_FALSE (b2->flags & VLIB_BUFFER_IS_TRACED))
+	    {
+	      sflow_trace_t *t = vlib_add_trace (vm, node, b2, sizeof (*t));
+	      t->sw_if_index = vnet_buffer (b2)->sw_if_index[VLIB_RX];
+	      t->next_index = next2;
+	      clib_memcpy (t->new_src_mac, en2->src_address,
+			   sizeof (t->new_src_mac));
+	      clib_memcpy (t->new_dst_mac, en2->dst_address,
+			   sizeof (t->new_dst_mac));
+	    }
+
+	  if (PREDICT_FALSE (b3->flags & VLIB_BUFFER_IS_TRACED))
+	    {
+	      sflow_trace_t *t = vlib_add_trace (vm, node, b3, sizeof (*t));
+	      t->sw_if_index = vnet_buffer (b3)->sw_if_index[VLIB_RX];
+	      t->next_index = next3;
+	      clib_memcpy (t->new_src_mac, en3->src_address,
+			   sizeof (t->new_src_mac));
+	      clib_memcpy (t->new_dst_mac, en3->dst_address,
+			   sizeof (t->new_dst_mac));
+	    }
+
+	  /* verify speculative enqueues, maybe switch current next frame */
+	  vlib_validate_buffer_enqueue_x4 (vm, node, next_index, to_next,
+					   n_left_to_next, bi0, bi1, bi2, bi3,
+					   next0, next1, next2, next3);
+	}
+
+      /* single-loop: drain the remainder one buffer at a time */
+      while (n_left_from > 0 && n_left_to_next > 0)
+	{
+	  u32 bi0;
+	  vlib_buffer_t *b0;
+	  u32 next0 = SFLOW_NEXT_ETHERNET_INPUT;
+	  ethernet_header_t *en0;
+
+	  /* speculatively enqueue b0 to the current next frame */
+	  bi0 = from[0];
+	  to_next[0] = bi0;
+	  from += 1;
+	  to_next += 1;
+	  n_left_from -= 1;
+	  n_left_to_next -= 1;
+
+	  b0 = vlib_get_buffer (vm, bi0);
+
+	  /* do this to always pass on to the next node on feature arc */
+	  vnet_feature_next (&next0, b0);
+
+	  /*
+	   * Direct from the driver, we should be at offset 0
+	   * aka at &b0->data[0]
+	   */
+	  ASSERT (b0->current_data == 0);
+
+	  en0 = vlib_buffer_get_current (b0);
+
+	  if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+	    {
+	      sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
+	      t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX];
+	      t->next_index = next0;
+	      clib_memcpy (t->new_src_mac, en0->src_address,
+			   sizeof (t->new_src_mac));
+	      clib_memcpy (t->new_dst_mac, en0->dst_address,
+			   sizeof (t->new_dst_mac));
+	    }
+
+	  /* verify speculative enqueue, maybe switch current next frame */
+	  vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
+					   n_left_to_next, bi0, next0);
+	}
+
+      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
+    }
+  return frame->n_vectors;
+}
+
+#ifndef CLIB_MARCH_VARIANT
+/* Node registration: an internal node on the feature arc whose default
+   disposition is "ethernet-input"; error counters are named by
+   sflow_error_strings (generated from foreach_sflow_error). */
+VLIB_REGISTER_NODE (sflow_node) =
+{
+  .name = "sflow",
+  .vector_size = sizeof (u32),
+  .format_trace = format_sflow_trace,
+  .type = VLIB_NODE_TYPE_INTERNAL,
+  .n_errors = ARRAY_LEN(sflow_error_strings),
+  .error_strings = sflow_error_strings,
+  .n_next_nodes = SFLOW_N_NEXT,
+  /* edit / add dispositions here */
+  .next_nodes = {
+	[SFLOW_NEXT_ETHERNET_INPUT] = "ethernet-input",
+  },
+};
+#endif /* CLIB_MARCH_VARIANT */
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.api b/src/plugins/sflow/sflow.api
new file mode 100644
index 00000000000..e5f33001e6e
--- /dev/null
+++ b/src/plugins/sflow/sflow.api
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file sflow.api
+ * @brief VPP control-plane API messages.
+ *
+ * This file defines VPP control-plane binary API messages which are generally
+ * called through a shared memory interface.
+ */
+
+/* Version and type recitations */
+
+option version = "0.1.0";
+import "vnet/interface_types.api";
+
+
+/** @brief API to enable / disable sflow
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param enable_disable - 1 to enable, 0 to disable the feature
+ @param hw_if_index - hardware interface handle
+*/
+
+autoreply define sflow_enable_disable {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+
+ /* Enable / disable the feature */
+ bool enable_disable;
+
+ /* Interface handle */
+ vl_api_interface_index_t hw_if_index;
+};
+
+/** @brief API to get sflow sampling-rate
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+*/
+
+define sflow_sampling_rate_get {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+};
+
+/** \brief API to get the sflow sampling-rate
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param sampling_N - the current 1-in-N sampling rate
+*/
+
+define sflow_sampling_rate_get_reply
+{
+ u32 context;
+ u32 sampling_N;
+ option in_progress;
+};
+
+/** @brief API to set sflow sampling-rate
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param sampling_N - 1-in-N random sampling rate
+*/
+
+autoreply define sflow_sampling_rate_set {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+
+ /* Sampling_N */
+ u32 sampling_N [default=10000];
+};
+
+/** @brief API to set sflow polling-interval
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param polling_S - polling interval in seconds
+*/
+
+autoreply define sflow_polling_interval_set {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+
+ /* Polling_S */
+ u32 polling_S [default=20];
+};
+
+/** @brief API to get sflow polling-interval
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+*/
+
+define sflow_polling_interval_get {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+};
+
+/** \brief API to get the sflow polling-interval
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param polling_S - current polling interval in seconds
+*/
+
+define sflow_polling_interval_get_reply
+{
+ u32 context;
+ u32 polling_S;
+ option in_progress;
+};
+
+/** @brief API to set sflow header-bytes
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param header_B - max header length in bytes
+*/
+
+autoreply define sflow_header_bytes_set {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+
+ /* header_B */
+ u32 header_B [default=128];
+};
+
+/** @brief API to get sflow header-bytes
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+*/
+
+define sflow_header_bytes_get {
+ /* Client identifier, set from api_main.my_client_index */
+ u32 client_index;
+
+ /* Arbitrary context, so client can match reply to request */
+ u32 context;
+};
+
+/** \brief API to get the sflow header-bytes
+ @param client_index - opaque cookie to identify the sender
+ @param context - sender context, to match reply w/ request
+ @param header_B - current maximum header length in bytes
+*/
+
+define sflow_header_bytes_get_reply
+{
+ u32 context;
+ u32 header_B;
+ option in_progress;
+};
+
+/** \brief Dump sflow enabled interface(s)
+ @param client_index - opaque cookie to identify the sender
+ @param hw_if_index - hw_if_index of a specific interface, or -1 (default)
+ to return all sflow enabled interfaces
+*/
+define sflow_interface_dump
+{
+ u32 client_index;
+ u32 context;
+ vl_api_interface_index_t hw_if_index [default=0xffffffff];
+};
+
+/** \brief sflow enabled interface details
+*/
+define sflow_interface_details
+{
+ u32 context;
+ vl_api_interface_index_t hw_if_index;
+};
diff --git a/src/plugins/sflow/sflow.c b/src/plugins/sflow/sflow.c
new file mode 100644
index 00000000000..5aa65062330
--- /dev/null
+++ b/src/plugins/sflow/sflow.c
@@ -0,0 +1,1052 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/vnet.h>
+#include <vnet/plugin/plugin.h>
+#include <sflow/sflow.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vpp/app/version.h>
+#include <stdbool.h>
+
+#include <sflow/sflow.api_enum.h>
+#include <sflow/sflow.api_types.h>
+#include <sflow/sflow_psample.h>
+
+#include <vpp-api/client/stat_client.h>
+#include <vlib/stats/stats.h>
+
+#define REPLY_MSG_ID_BASE smp->msg_id_base
+#include <vlibapi/api_helper_macros.h>
+
+sflow_main_t sflow_main;
+vlib_log_class_t sflow_logger;
+
+/* Bootstrap an in-process stats client: point stat_client_main at the
+   local stats segment (shared header + directory vector) so that
+   stat_segment_ls()/stat_segment_dump() can be called from this plugin
+   without going through the external socket API. */
+static void
+sflow_stat_segment_client_init (void)
+{
+  stat_client_main_t *scm = &stat_client_main;
+  vlib_stats_segment_t *sm = vlib_stats_get_segment ();
+  uword size;
+
+  size = sm->memory_size ? sm->memory_size : STAT_SEGMENT_DEFAULT_SIZE;
+  scm->memory_size = size;
+  scm->shared_header = sm->shared_header;
+  scm->directory_vector =
+    stat_segment_adjust (scm, (void *) scm->shared_header->directory_vector);
+}
+
+/* Accumulate simple (single-u64) interface counters for hw_if_index from
+   one stat-segment entry, summing across all worker threads. Recognized
+   stats: /if/rx-error, /if/tx-error, /if/drops, /if/rx-miss,
+   /if/rx-no-buf. NOTE(review): the inner loop scans every interface
+   index and compares against hw_if_index; a direct bounds-checked index
+   would be O(1) per thread — left as-is here. */
+static void
+update_counter_vector_simple (stat_segment_data_t *res,
+			      sflow_counters_t *ifCtrs, u32 hw_if_index)
+{
+  for (int th = 0; th < vec_len (res->simple_counter_vec); th++)
+    {
+      for (int intf = 0; intf < vec_len (res->simple_counter_vec[th]); intf++)
+	{
+	  if (intf == hw_if_index)
+	    {
+	      u64 count = res->simple_counter_vec[th][intf];
+	      if (count)
+		{
+		  if (strcmp (res->name, "/if/rx-error") == 0)
+		    ifCtrs->rx.errs += count;
+		  else if (strcmp (res->name, "/if/tx-error") == 0)
+		    ifCtrs->tx.errs += count;
+		  else if (strcmp (res->name, "/if/drops") == 0)
+		    ifCtrs->tx.drps += count;
+		  else if (strcmp (res->name, "/if/rx-miss") == 0 ||
+			   strcmp (res->name, "/if/rx-no-buf") == 0)
+		    ifCtrs->rx.drps += count;
+		}
+	    }
+	}
+    }
+}
+
+/* Accumulate combined (packets + bytes) interface counters for
+   hw_if_index from one stat-segment entry, summing across all worker
+   threads. Recognized stats: /if/rx, /if/tx, /if/rx-multicast,
+   /if/tx-multicast, /if/rx-broadcast, /if/tx-broadcast. */
+static void
+update_counter_vector_combined (stat_segment_data_t *res,
+				sflow_counters_t *ifCtrs, u32 hw_if_index)
+{
+  /* Fix: iterate over the combined counter vector. The previous code used
+     vec_len (res->simple_counter_vec) as the per-thread loop bound (a
+     copy-paste from update_counter_vector_simple), which reads out of
+     bounds or skips threads whenever the two vectors differ in length. */
+  for (int th = 0; th < vec_len (res->combined_counter_vec); th++)
+    {
+      for (int intf = 0; intf < vec_len (res->combined_counter_vec[th]);
+	   intf++)
+	{
+	  if (intf == hw_if_index)
+	    {
+	      u64 pkts = res->combined_counter_vec[th][intf].packets;
+	      u64 byts = res->combined_counter_vec[th][intf].bytes;
+	      if (pkts || byts)
+		{
+		  if (strcmp (res->name, "/if/rx") == 0)
+		    {
+		      ifCtrs->rx.pkts += pkts;
+		      ifCtrs->rx.byts += byts;
+		    }
+		  else if (strcmp (res->name, "/if/tx") == 0)
+		    {
+		      ifCtrs->tx.byts += byts;
+		      ifCtrs->tx.pkts += pkts;
+		    }
+		  // TODO: do multicasts include broadcasts, or are they
+		  // counted separately? (test with traffic)
+		  else if (strcmp (res->name, "/if/rx-multicast") == 0)
+		    ifCtrs->rx.m_pkts += pkts;
+		  else if (strcmp (res->name, "/if/tx-multicast") == 0)
+		    ifCtrs->tx.m_pkts += pkts;
+		  else if (strcmp (res->name, "/if/rx-broadcast") == 0)
+		    ifCtrs->rx.b_pkts += pkts;
+		  else if (strcmp (res->name, "/if/tx-broadcast") == 0)
+		    ifCtrs->tx.b_pkts += pkts;
+		}
+	    }
+	}
+    }
+}
+
+/* Return true iff the vppinfra u8-vector 'str' begins with the
+   NUL-terminated C string 'prefix'. NULL on either side yields false. */
+static int
+startsWith (u8 *str, char *prefix)
+{
+  if (!str || !prefix)
+    return false;
+  int plen = strlen (prefix);
+  if (vec_len (str) < plen)
+    return false;
+  return (memcmp (str, prefix, plen) == 0);
+}
+
+/* Poll the stats segment for one sflow-enabled interface, fold the
+   per-thread counters into an sflow_counters_t, and ship the result to
+   hsflowd over the netlink usersock channel as a
+   SFLOW_VPP_MSG_IF_COUNTERS message. Called from the main-thread
+   polling loop (see counter_polling_check). */
+static void
+update_counters (sflow_main_t *smp, sflow_per_interface_data_t *sfif)
+{
+  vnet_sw_interface_t *sw =
+    vnet_get_sw_interface (smp->vnet_main, sfif->sw_if_index);
+  vnet_hw_interface_t *hw =
+    vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index);
+  // This gives us a list of stat integers
+  u32 *stats = stat_segment_ls (NULL);
+  stat_segment_data_t *res = NULL;
+  // read vector of stat_segment_data_t objects
+retry:
+  res = stat_segment_dump (stats);
+  if (res == NULL)
+    {
+      /* Memory layout has changed */
+      if (stats)
+	vec_free (stats);
+      stats = stat_segment_ls (NULL);
+      goto retry;
+    }
+  sflow_counters_t ifCtrs = {};
+  // and accumulate the (per-thread) entries for this interface
+  for (int ii = 0; ii < vec_len (res); ii++)
+    {
+      switch (res[ii].type)
+	{
+	case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE:
+	  update_counter_vector_simple (&res[ii], &ifCtrs, sfif->hw_if_index);
+	  break;
+	case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED:
+	  update_counter_vector_combined (&res[ii], &ifCtrs,
+					  sfif->hw_if_index);
+	  break;
+	case STAT_DIR_TYPE_SCALAR_INDEX:
+	case STAT_DIR_TYPE_NAME_VECTOR:
+	case STAT_DIR_TYPE_EMPTY:
+	default:
+	  break;
+	}
+    }
+  stat_segment_data_free (res);
+  vec_free (stats);
+  // send the structure via netlink
+  SFLOWUSSpec spec = {};
+  SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_IF_COUNTERS);
+  SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_PORTNAME, hw->name,
+		       vec_len (hw->name));
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFINDEX, sfif->hw_if_index);
+  if (sfif->linux_if_index)
+    {
+      // We know the corresponding Linux ifIndex for this interface, so include
+      // that here.
+      SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OSINDEX,
+			      sfif->linux_if_index);
+    }
+
+  // Report consistent with vpp-snmp-agent
+  // NOTE(review): the *1000 assumes hw->link_speed is in kbps, giving
+  // bits/sec here — TODO confirm against vnet interface docs.
+  u64 ifSpeed = (hw->link_speed == ~0) ? 0 : (hw->link_speed * 1000);
+  if (startsWith (hw->name, "loop") || startsWith (hw->name, "tap"))
+    ifSpeed = 1e9;
+
+  u32 ifType = startsWith (hw->name, "loop") ? 24 // softwareLoopback
+	       :
+	       6; // ethernetCsmacd
+
+  u32 ifDirection = (hw->flags & VNET_HW_INTERFACE_FLAG_HALF_DUPLEX) ?
+		      2 // half-duplex
+		      :
+		      1; // full-duplex
+
+  u32 operUp = (hw->flags & VNET_HW_INTERFACE_FLAG_LINK_UP) ? 1 : 0;
+  u32 adminUp = (sw->flags & VNET_SW_INTERFACE_FLAG_ADMIN_UP) ? 1 : 0;
+
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFSPEED, ifSpeed);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFTYPE, ifType);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFDIRECTION, ifDirection);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OPER_UP, operUp);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_ADMIN_UP, adminUp);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_OCTETS, ifCtrs.rx.byts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_OCTETS, ifCtrs.tx.byts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_PKTS, ifCtrs.rx.pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_PKTS, ifCtrs.tx.pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_MCASTS, ifCtrs.rx.m_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_MCASTS, ifCtrs.tx.m_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_BCASTS, ifCtrs.rx.b_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_BCASTS, ifCtrs.tx.b_pkts);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_ERRORS, ifCtrs.rx.errs);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_ERRORS, ifCtrs.tx.errs);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_DISCARDS, ifCtrs.rx.drps);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_DISCARDS, ifCtrs.tx.drps);
+  SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_HW_ADDRESS, hw->hw_address,
+		       vec_len (hw->hw_address));
+  smp->unixsock_seq++;
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq);
+  /* csample_send counts attempts; failures are tallied separately */
+  if (SFLOWUSSpec_send (&smp->sflow_usersock, &spec) < 0)
+    smp->csample_send_drops++;
+  smp->csample_send++;
+}
+
+/* Total pipeline drops: PSAMPLE sendmsg failures on the main thread plus
+   the per-worker FIFO-full drops. */
+static u32
+total_drops (sflow_main_t *smp)
+{
+  u32 drops = smp->psample_send_drops;
+  for (u32 tx = 0; tx < smp->total_threads; tx++)
+    drops += vec_elt_at_index (smp->per_thread_data, tx)->drop;
+  return drops;
+}
+
+/* Send a SFLOW_VPP_MSG_STATUS heartbeat to hsflowd over the netlink
+   usersock channel: monotonic uptime in seconds plus the aggregate
+   pipeline-drop total (see total_drops). */
+static void
+send_sampling_status_info (sflow_main_t *smp)
+{
+  SFLOWUSSpec spec = {};
+  u32 all_pipeline_drops = total_drops (smp);
+  SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_STATUS);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_UPTIME_S, smp->now_mono_S);
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_DROPS, all_pipeline_drops);
+  ++smp->unixsock_seq;
+  SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq);
+  SFLOWUSSpec_send (&smp->sflow_usersock, &spec);
+}
+
+/* Walk the per-interface table and export counters for any interface
+   whose turn has come. Interfaces are staggered across the polling
+   interval by (hw_if_index % pollingS) so each second only a subset is
+   polled; a never-polled interface is sent immediately. Returns the
+   number of interfaces polled this pass. */
+static int
+counter_polling_check (sflow_main_t *smp)
+{
+  // see if we should poll one or more interfaces
+  int polled = 0;
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+	vec_elt_at_index (smp->per_interface_data, ii);
+      if (sfif && sfif->sflow_enabled &&
+	  (sfif->polled == 0 // always send the first time
+	   || (smp->now_mono_S % smp->pollingS) ==
+		(sfif->hw_if_index % smp->pollingS)))
+	{
+	  update_counters (smp, sfif);
+	  sfif->polled++;
+	  polled++;
+	}
+    }
+  return polled;
+}
+
+/* Drain up to SFLOW_READ_BATCH rounds of packet samples from the
+   per-worker FIFOs (round-robin, one sample per worker per round, for
+   fairness) and forward each as a PSAMPLE netlink message. Returns the
+   number of rounds executed; terminates early when a full round finds
+   every FIFO empty. */
+static u32
+read_worker_fifos (sflow_main_t *smp)
+{
+  // Our maximum samples/sec is approximately:
+  // (SFLOW_READ_BATCH * smp->total_threads) / SFLOW_POLL_WAIT_S
+  // but it may also be affected by SFLOW_FIFO_DEPTH
+  // and whether vlib_process_wait_for_event_or_clock() really waits for
+  // SFLOW_POLL_WAIT_S every time.
+  // If there are too many samples then dropping them as early as possible
+  // (and as randomly as possible) is preferred, so SFLOW_FIFO_DEPTH should not
+  // be any bigger than it strictly needs to be. If there is a system
+  // bottleneck it could be in the PSAMPLE netlink channel, the hsflowd
+  // encoder, the UDP stack, the network path, the collector, or a faraway
+  // application. Any kind of "clipping" will result in systematic bias so we
+  // try to make this fair even when it's running hot. For example, we'll
+  // round-robin the thread FIFO dequeues here to make sure we give them equal
+  // access to the PSAMPLE channel. Another factor in sizing SFLOW_FIFO_DEPTH
+  // is to ensure that we can absorb a short-term line-rate burst without
+  // dropping samples. This implies a deeper FIFO. In fact it looks like this
+  // requirement ends up being the dominant one. A value of SFLOW_FIFO_DEPTH
+  // that will absorb an n-second line-rate burst may well result in the max
+  // sustainable samples/sec being higher than we really need. But it's not a
+  // serious problem because the samples are packed into UDP datagrams and the
+  // network or collector can drop those anywhere they need to. The protocol is
+  // designed to be tolerant to random packet-loss in transit. For example, 1%
+  // loss should just make it look like the sampling-rate setting was 1:10100
+  // instead of 1:10000.
+  u32 batch = 0;
+  for (; batch < SFLOW_READ_BATCH; batch++)
+    {
+      u32 psample_send = 0, psample_send_fail = 0;
+      for (u32 thread_index = 0; thread_index < smp->total_threads;
+	   thread_index++)
+	{
+	  sflow_per_thread_data_t *sfwk =
+	    vec_elt_at_index (smp->per_thread_data, thread_index);
+	  sflow_sample_t sample;
+	  if (sflow_fifo_dequeue (&sfwk->fifo, &sample))
+	    {
+	      if (sample.header_bytes > smp->headerB)
+		{
+		  // We get here if header-bytes setting is reduced dynamically
+		  // and a sample that was in the FIFO appears with a larger
+		  // header.
+		  continue;
+		}
+	      SFLOWPSSpec spec = {};
+	      u32 ps_group = SFLOW_VPP_PSAMPLE_GROUP_INGRESS;
+	      u32 seqNo = ++smp->psample_seq_ingress;
+	      // TODO: is it always ethernet? (affects ifType counter as well)
+	      u16 header_protocol = 1; /* ethernet */
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP,
+				      ps_group);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_IIFINDEX,
+				      sample.input_if_index);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_OIFINDEX,
+				      sample.output_if_index);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_ORIGSIZE,
+				      sample.sampled_packet_size);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ,
+				      seqNo);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE,
+				      sample.samplingN);
+	      SFLOWPSSpec_setAttr (&spec, SFLOWPS_PSAMPLE_ATTR_DATA,
+				   sample.header, sample.header_bytes);
+	      SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_PROTO,
+				      header_protocol);
+	      psample_send++;
+	      if (SFLOWPSSpec_send (&smp->sflow_psample, &spec) < 0)
+		psample_send_fail++;
+	    }
+	}
+      if (psample_send == 0)
+	{
+	  // nothing found on FIFOs this time through, so terminate batch early
+	  break;
+	}
+      else
+	{
+	  vlib_node_increment_counter (smp->vlib_main, sflow_node.index,
+				       SFLOW_ERROR_PSAMPLE_SEND, psample_send);
+	  if (psample_send_fail > 0)
+	    {
+	      vlib_node_increment_counter (smp->vlib_main, sflow_node.index,
+					   SFLOW_ERROR_PSAMPLE_SEND_FAIL,
+					   psample_send_fail);
+	      smp->psample_send_drops += psample_send_fail;
+	    }
+	}
+    }
+  return batch;
+}
+
+/* Snapshot the per-worker sample counters into ctrs.
+ * Sums across all threads; every slot of ctrs is overwritten. */
+static void
+read_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *ctrs)
+{
+  u32 processed = 0, sampled = 0, dropped = 0;
+  for (u32 tt = 0; tt < smp->total_threads; tt++)
+    {
+      sflow_per_thread_data_t *ptd =
+        vec_elt_at_index (smp->per_thread_data, tt);
+      processed += ptd->pool;
+      sampled += ptd->smpl;
+      dropped += ptd->drop;
+    }
+  for (u32 ec = 0; ec < SFLOW_N_ERROR; ec++)
+    ctrs->counters[ec] = 0;
+  ctrs->counters[SFLOW_ERROR_PROCESSED] = processed;
+  ctrs->counters[SFLOW_ERROR_SAMPLED] = sampled;
+  ctrs->counters[SFLOW_ERROR_DROPPED] = dropped;
+}
+
+/* Push the increase in one counter since the previous latch into the
+ * vlib error counter of the sflow node. */
+static void
+update_node_cntr (sflow_main_t *smp, sflow_err_ctrs_t *prev,
+                  sflow_err_ctrs_t *latest, sflow_error_t ee)
+{
+  u32 before = prev->counters[ee];
+  u32 after = latest->counters[ee];
+  vlib_node_increment_counter (smp->vlib_main, sflow_node.index, ee,
+                               after - before);
+}
+
+/* Sync the worker-derived counters (processed/sampled/dropped) into the
+ * node counters, then latch the latest snapshot for the next delta. */
+static void
+update_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *prev,
+                      sflow_err_ctrs_t *latest)
+{
+  const sflow_error_t tracked[] = { SFLOW_ERROR_PROCESSED,
+                                    SFLOW_ERROR_SAMPLED,
+                                    SFLOW_ERROR_DROPPED };
+  for (u32 ii = 0; ii < 3; ii++)
+    update_node_cntr (smp, prev, latest, tracked[ii]);
+  *prev = *latest; // latch for next time
+}
+
+/* Main-thread process node. Wakes up on a timer (no event source to
+ * edge-trigger on), and while sampling is running: steps the PSAMPLE
+ * channel to readiness, performs once-per-second work (VAPI ifIndex
+ * lookups, status export, counter polling), drains the worker FIFOs,
+ * and syncs the per-node counters. Never returns. */
+static uword
+sflow_process_samples (vlib_main_t *vm, vlib_node_runtime_t *node,
+                       vlib_frame_t *frame)
+{
+  sflow_main_t *smp = &sflow_main;
+  clib_time_t ctm;
+  clib_time_init (&ctm);
+
+  /* baseline snapshot so the first delta is measured from here */
+  sflow_err_ctrs_t prev = {};
+  read_node_counters (smp, &prev);
+
+  while (1)
+    {
+
+      // We don't have anything for the main loop to edge-trigger on, so
+      // we are just asking to be called back regularly. More regularly
+      // if sFlow is actually enabled...
+      f64 poll_wait_S = smp->running ? SFLOW_POLL_WAIT_S : 1.0;
+      vlib_process_wait_for_event_or_clock (vm, poll_wait_S);
+      if (!smp->running)
+        {
+          // Nothing to do. Just yield again.
+          continue;
+        }
+
+#ifdef SFLOW_USE_VAPI
+#ifdef SFLOW_TEST_HAMMER_VAPI
+      /* stress-test hook: issue VAPI requests on every wakeup */
+      sflow_vapi_check_for_linux_if_index_results (&smp->vac,
+                                                   smp->per_interface_data);
+      sflow_vapi_read_linux_if_index_numbers (&smp->vac,
+                                              smp->per_interface_data);
+#endif
+#endif
+
+      // PSAMPLE channel may need extra step (e.g. to learn family_id)
+      // before it is ready to send
+      EnumSFLOWPSState psState = SFLOWPS_state (&smp->sflow_psample);
+      if (psState != SFLOWPS_STATE_READY)
+        {
+          SFLOWPS_open_step (&smp->sflow_psample);
+        }
+
+      // What we want is a monotonic, per-second clock. This seems to do it
+      // because it is based on the CPU clock.
+      f64 tnow = clib_time_now (&ctm);
+      u32 tnow_S = (u32) tnow;
+      if (tnow_S != smp->now_mono_S)
+        {
+          // second rollover
+          smp->now_mono_S = tnow_S;
+#ifdef SFLOW_USE_VAPI
+          if (!smp->vac.vapi_unavailable)
+            {
+              // look up linux if_index numbers
+              sflow_vapi_check_for_linux_if_index_results (
+                &smp->vac, smp->per_interface_data);
+              /* request immediately on (re)start, then periodically */
+              if (smp->vapi_requests == 0 ||
+                  (tnow_S % SFLOW_VAPI_POLL_INTERVAL) == 0)
+                {
+                  if (sflow_vapi_read_linux_if_index_numbers (
+                        &smp->vac, smp->per_interface_data))
+                    {
+                      smp->vapi_requests++;
+                    }
+                }
+            }
+#endif
+          // send status info
+          send_sampling_status_info (smp);
+          // poll counters for interfaces that are due
+          counter_polling_check (smp);
+        }
+      // process samples from workers
+      read_worker_fifos (smp);
+
+      // and sync the global counters
+      sflow_err_ctrs_t latest = {};
+      read_node_counters (smp, &latest);
+      update_node_counters (smp, &prev, &latest);
+    }
+  /* not reached: process nodes loop forever */
+  return 0;
+}
+
+/* Register the sample-processing loop as a vlib process node: it is
+ * cooperatively scheduled on the main thread. */
+VLIB_REGISTER_NODE (sflow_process_samples_node, static) = {
+  .function = sflow_process_samples,
+  .name = "sflow-process-samples",
+  .type = VLIB_NODE_TYPE_PROCESS,
+  .process_log2_n_stack_bytes = 17, /* 2^17 = 128KB process stack */
+};
+
+/* Initialize (or refresh) the per-thread sampling state. Called when
+ * sampling starts and again on a dynamic sampling-rate change.
+ * Threads whose smpN already matches the global setting are left
+ * untouched so their current skip countdown is not disturbed. */
+static void
+sflow_set_worker_sampling_state (sflow_main_t *smp)
+{
+  /* set up (or reset) sampling context for each thread */
+  vlib_thread_main_t *tm = &vlib_thread_main;
+  /* main thread plus workers */
+  smp->total_threads = 1 + tm->n_threads;
+  /* vec_validate takes the highest *index*, not a length, so pass
+     total_threads - 1 to size the vector to exactly total_threads
+     (passing total_threads allocated one unused extra element) */
+  vec_validate (smp->per_thread_data, smp->total_threads - 1);
+  for (u32 thread_index = 0; thread_index < smp->total_threads; thread_index++)
+    {
+      sflow_per_thread_data_t *sfwk =
+        vec_elt_at_index (smp->per_thread_data, thread_index);
+      if (sfwk->smpN != smp->samplingN)
+        {
+          sfwk->smpN = smp->samplingN;
+          /* per-thread seed keeps the random skip sequences independent */
+          sfwk->seed = thread_index;
+          sfwk->skip = sflow_next_random_skip (sfwk);
+          SFLOW_DBG (
+            "sflow_set_worker_sampling_state: samplingN=%u thread=%u skip=%u",
+            smp->samplingN, thread_index, sfwk->skip);
+        }
+    }
+}
+
+/* Bring sampling up: reset discontinuity-signalling state, open the
+ * PSAMPLE and USERSOCK netlink channels, and (re)set the per-thread
+ * sampling context. Only called from sflow_sampling_start_stop(). */
+static void
+sflow_sampling_start (sflow_main_t *smp)
+{
+  SFLOW_INFO ("sflow_sampling_start");
+
+  smp->running = 1;
+  // Reset this clock so that the per-second netlink status updates
+  // will communicate a restart to hsflowd. This helps to distinguish:
+  // (1) vpp restarted with sFlow off => no status updates (went quiet)
+  // (2) vpp restarted with default sFlow => status updates (starting again
+  // from 0)
+  smp->now_mono_S = 0;
+
+  // reset sequence numbers to indicated discontinuity
+  smp->psample_seq_ingress = 0;
+  smp->psample_seq_egress = 0;
+  smp->psample_send_drops = 0;
+
+#ifdef SFLOW_USE_VAPI
+  // reset vapi request count so that we make a request the first time
+  smp->vapi_requests = 0;
+#endif
+
+  /* open PSAMPLE netlink channel for writing packet samples */
+  SFLOWPS_open (&smp->sflow_psample);
+  /* open USERSOCK netlink channel for writing counters */
+  SFLOWUS_open (&smp->sflow_usersock);
+  smp->sflow_usersock.group_id = SFLOW_NETLINK_USERSOCK_MULTICAST;
+  /* set up (or reset) sampling context for each thread */
+  sflow_set_worker_sampling_state (smp);
+}
+
+/* Tear sampling down: clear the running flag (workers check it) and
+ * close both netlink channels. Counterpart of sflow_sampling_start(). */
+static void
+sflow_sampling_stop (sflow_main_t *smp)
+{
+  SFLOW_INFO ("sflow_sampling_stop");
+  smp->running = 0;
+  SFLOWPS_close (&smp->sflow_psample);
+  SFLOWUS_close (&smp->sflow_usersock);
+}
+
+/* Reconcile the running state with the current configuration:
+ * sampling should run iff a non-zero rate is set AND at least one
+ * interface is enabled. Starts or stops as needed; no-op otherwise. */
+static void
+sflow_sampling_start_stop (sflow_main_t *smp)
+{
+  int should_run = (smp->samplingN != 0) && (smp->interfacesEnabled != 0);
+  if (should_run == smp->running)
+    return; /* already in the requested state */
+  if (should_run)
+    sflow_sampling_start (smp);
+  else
+    sflow_sampling_stop (smp);
+}
+
+/* Set the global 1:N sampling rate. A rate of 0 turns sampling off
+ * (via sflow_sampling_start_stop). Always returns 0. */
+int
+sflow_sampling_rate (sflow_main_t *smp, u32 samplingN)
+{
+  // TODO: this might be the right place to enforce the
+  // "2 significant" figures constraint so that per-interface
+  // sampling-rate settings can use HCF+sub-sampling efficiently.
+
+  if (smp->running && smp->samplingN && samplingN)
+    {
+      // dynamic change of sampling rate
+      smp->samplingN = samplingN;
+      sflow_set_worker_sampling_state (smp);
+    }
+  else
+    {
+      // potential on/off change
+      smp->samplingN = samplingN;
+      sflow_sampling_start_stop (smp);
+    }
+  return 0;
+}
+
+/* Set the interface counter-polling interval in seconds. Takes effect
+ * at the next polling check in the process node. Always returns 0. */
+int
+sflow_polling_interval (sflow_main_t *smp, u32 pollingS)
+{
+  smp->pollingS = pollingS;
+  return 0;
+}
+
+/* Set the max number of packet-header bytes captured per sample.
+ * The request is rounded up to a multiple of SFLOW_HEADER_BYTES_STEP
+ * (which helps to make worker thread memcpy faster) and clamped to
+ * [SFLOW_MIN_HEADER_BYTES, SFLOW_MAX_HEADER_BYTES]. Always returns 0. */
+int
+sflow_header_bytes (sflow_main_t *smp, u32 headerB)
+{
+  u32 steps =
+    (headerB + SFLOW_HEADER_BYTES_STEP - 1) / SFLOW_HEADER_BYTES_STEP;
+  u32 hdrB = steps * SFLOW_HEADER_BYTES_STEP;
+  if (hdrB < SFLOW_MIN_HEADER_BYTES)
+    hdrB = SFLOW_MIN_HEADER_BYTES;
+  else if (hdrB > SFLOW_MAX_HEADER_BYTES)
+    hdrB = SFLOW_MAX_HEADER_BYTES;
+  if (hdrB != headerB)
+    SFLOW_WARN ("header_bytes rounded from %u to %u\n", headerB, hdrB);
+  smp->headerB = hdrB;
+  return 0;
+}
+
+/* Enable or disable sFlow sampling on one interface.
+ * enable_disable: 1 to enable, 0 to disable.
+ * Returns 0 on success, VNET_API_ERROR_INVALID_SW_IF_INDEX if the
+ * index is bad or not a hardware port, VNET_API_ERROR_VALUE_EXIST if
+ * the interface is already in the requested state. May start or stop
+ * the whole sampling machinery as a side effect. */
+int
+sflow_enable_disable (sflow_main_t *smp, u32 sw_if_index, int enable_disable)
+{
+  vnet_sw_interface_t *sw;
+
+  /* Utterly wrong? */
+  if (pool_is_free_index (smp->vnet_main->interface_main.sw_interfaces,
+                          sw_if_index))
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  /* Not a physical port? */
+  sw = vnet_get_sw_interface (smp->vnet_main, sw_if_index);
+  if (sw->type != VNET_SW_INTERFACE_TYPE_HARDWARE)
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  // note: vnet_interface_main_t has "fast lookup table" called
+  // he_if_index_by_sw_if_index.
+  SFLOW_DBG ("sw_if_index=%u, sup_sw_if_index=%u, hw_if_index=%u\n",
+             sw->sw_if_index, sw->sup_sw_if_index, sw->hw_if_index);
+
+  // note: vnet_hw_interface_t has uword *bond_info
+  // (where 0=>none, ~0 => slave, other=>ptr to bitmap of slaves)
+
+  /* per-interface state is indexed by hw_if_index, not sw_if_index */
+  vec_validate (smp->per_interface_data, sw->hw_if_index);
+  sflow_per_interface_data_t *sfif =
+    vec_elt_at_index (smp->per_interface_data, sw->hw_if_index);
+  if (enable_disable == sfif->sflow_enabled)
+    {
+      // redundant enable or disable
+      return VNET_API_ERROR_VALUE_EXIST;
+    }
+  else
+    {
+      // OK, turn it on/off
+      sfif->sw_if_index = sw_if_index;
+      sfif->hw_if_index = sw->hw_if_index;
+      sfif->polled = 0;
+      sfif->sflow_enabled = enable_disable;
+      /* attach/detach the sflow node on this interface's rx path */
+      vnet_feature_enable_disable ("device-input", "sflow", sw_if_index,
+                                   enable_disable, 0, 0);
+      smp->interfacesEnabled += (enable_disable) ? 1 : -1;
+    }
+
+  /* may be the first interface on, or the last one off */
+  sflow_sampling_start_stop (smp);
+  return 0;
+}
+
+/* CLI: "sflow sampling-rate <N>" -- parse one unsigned rate and apply
+ * it via sflow_sampling_rate(). */
+static clib_error_t *
+sflow_sampling_rate_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 sampling_N = ~0;
+
+  int rv;
+
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (input, "%u", &sampling_N))
+        ;
+      else
+        break;
+    }
+
+  if (sampling_N == ~0)
+    return clib_error_return (0, "Please specify a sampling rate...");
+
+  rv = sflow_sampling_rate (smp, sampling_N);
+
+  switch (rv)
+    {
+    case 0:
+      break;
+    default:
+      /* fixed copy-paste: previously misreported the failing callee as
+         "sflow_enable_disable" */
+      return clib_error_return (0, "sflow_sampling_rate returned %d", rv);
+    }
+  return 0;
+}
+
+/* CLI: "sflow polling-interval <S>" -- parse one unsigned interval and
+ * apply it via sflow_polling_interval(). */
+static clib_error_t *
+sflow_polling_interval_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                   vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 polling_S = ~0;
+
+  /* accept a single unsigned argument; stop on anything else */
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT &&
+         unformat (input, "%u", &polling_S))
+    ;
+
+  if (polling_S == ~0)
+    return clib_error_return (0, "Please specify a polling interval...");
+
+  int rv = sflow_polling_interval (smp, polling_S);
+  if (rv != 0)
+    return clib_error_return (0, "sflow_polling_interval returned %d", rv);
+  return 0;
+}
+
+/* CLI: "sflow header-bytes <B>" -- parse one unsigned byte count and
+ * apply it via sflow_header_bytes(). */
+static clib_error_t *
+sflow_header_bytes_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                               vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 header_B = ~0;
+
+  /* accept a single unsigned argument; stop on anything else */
+  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT &&
+         unformat (input, "%u", &header_B))
+    ;
+
+  if (header_B == ~0)
+    return clib_error_return (0, "Please specify a header bytes limit...");
+
+  int rv = sflow_header_bytes (smp, header_B);
+  if (rv != 0)
+    return clib_error_return (0, "sflow_header_bytes returned %d", rv);
+  return 0;
+}
+
+/* CLI: "sflow enable-disable <interface-name> [disable]" -- resolve the
+ * interface name and toggle sampling on it. */
+static clib_error_t *
+sflow_enable_disable_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                                 vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  u32 sw_if_index = ~0;
+  int enable_disable = 1; /* enable unless "disable" keyword is present */
+  int rv;
+
+  for (;;)
+    {
+      if (unformat_check_input (input) == UNFORMAT_END_OF_INPUT)
+        break;
+      if (unformat (input, "disable"))
+        enable_disable = 0;
+      else if (unformat (input, "%U", unformat_vnet_sw_interface,
+                         smp->vnet_main, &sw_if_index))
+        ;
+      else
+        break;
+    }
+
+  if (sw_if_index == ~0)
+    return clib_error_return (0, "Please specify an interface...");
+
+  rv = sflow_enable_disable (smp, sw_if_index, enable_disable);
+
+  switch (rv)
+    {
+    case 0:
+      return 0;
+
+    case VNET_API_ERROR_INVALID_SW_IF_INDEX:
+      return clib_error_return (
+        0, "Invalid interface, only works on physical ports");
+
+    case VNET_API_ERROR_UNIMPLEMENTED:
+      return clib_error_return (0,
+                                "Device driver doesn't support redirection");
+
+    default:
+      return clib_error_return (0, "sflow_enable_disable returned %d", rv);
+    }
+}
+
+/* CLI: "show sflow" -- print the current configuration (in re-playable
+ * command form) followed by status counters. */
+static clib_error_t *
+show_sflow_command_fn (vlib_main_t *vm, unformat_input_t *input,
+                       vlib_cli_command_t *cmd)
+{
+  sflow_main_t *smp = &sflow_main;
+  clib_error_t *error = NULL;
+  vlib_cli_output (vm, "sflow sampling-rate %u\n", smp->samplingN);
+  vlib_cli_output (vm, "sflow sampling-direction ingress\n");
+  vlib_cli_output (vm, "sflow polling-interval %u\n", smp->pollingS);
+  vlib_cli_output (vm, "sflow header-bytes %u\n", smp->headerB);
+  u32 itfs_enabled = 0;
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+        vec_elt_at_index (smp->per_interface_data, ii);
+      if (sfif && sfif->sflow_enabled)
+        {
+          itfs_enabled++;
+          vnet_hw_interface_t *hw =
+            vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index);
+          vlib_cli_output (vm, "sflow enable %s\n", (char *) hw->name);
+        }
+    }
+  vlib_cli_output (vm, "Status\n");
+  vlib_cli_output (vm, " interfaces enabled: %u\n", itfs_enabled);
+  /* sequence numbers double as sent-sample totals since last restart */
+  vlib_cli_output (vm, " packet samples sent: %u\n",
+                   smp->psample_seq_ingress + smp->psample_seq_egress);
+  /* total_drops() is defined elsewhere in this file */
+  vlib_cli_output (vm, " packet samples dropped: %u\n", total_drops (smp));
+  vlib_cli_output (vm, " counter samples sent: %u\n", smp->csample_send);
+  vlib_cli_output (vm, " counter samples dropped: %u\n",
+                   smp->csample_send_drops);
+  return error;
+}
+
+/* CLI command registrations; handlers are defined above. */
+VLIB_CLI_COMMAND (sflow_enable_disable_command, static) = {
+  .path = "sflow enable-disable",
+  .short_help = "sflow enable-disable <interface-name> [disable]",
+  .function = sflow_enable_disable_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_sampling_rate_command, static) = {
+  .path = "sflow sampling-rate",
+  .short_help = "sflow sampling-rate <N>",
+  .function = sflow_sampling_rate_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_polling_interval_command, static) = {
+  .path = "sflow polling-interval",
+  .short_help = "sflow polling-interval <S>",
+  .function = sflow_polling_interval_command_fn,
+};
+
+VLIB_CLI_COMMAND (sflow_header_bytes_command, static) = {
+  .path = "sflow header-bytes",
+  .short_help = "sflow header-bytes <B>",
+  .function = sflow_header_bytes_command_fn,
+};
+
+VLIB_CLI_COMMAND (show_sflow_command, static) = {
+  .path = "show sflow",
+  .short_help = "show sflow",
+  .function = show_sflow_command_fn,
+};
+
+/* API message handler */
+/* Binary API: enable/disable sampling on one interface.
+ * rv is picked up by REPLY_MACRO when building the reply. */
+static void
+vl_api_sflow_enable_disable_t_handler (vl_api_sflow_enable_disable_t *mp)
+{
+  vl_api_sflow_enable_disable_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  /* wire fields arrive in network byte order */
+  rv = sflow_enable_disable (smp, ntohl (mp->hw_if_index),
+                             (int) (mp->enable_disable));
+
+  REPLY_MACRO (VL_API_SFLOW_ENABLE_DISABLE_REPLY);
+}
+
+/* Binary API: set the global sampling rate (1:N). */
+static void
+vl_api_sflow_sampling_rate_set_t_handler (vl_api_sflow_sampling_rate_set_t *mp)
+{
+  vl_api_sflow_sampling_rate_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_sampling_rate (smp, ntohl (mp->sampling_N));
+
+  REPLY_MACRO (VL_API_SFLOW_SAMPLING_RATE_SET_REPLY);
+}
+
+/* Binary API: report the current sampling rate. */
+static void
+vl_api_sflow_sampling_rate_get_t_handler (vl_api_sflow_sampling_rate_get_t *mp)
+{
+  vl_api_sflow_sampling_rate_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  /* NOTE(review): ntohl on an outgoing field is equivalent to htonl for
+     u32, but htonl would be the conventional spelling here */
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_SAMPLING_RATE_GET_REPLY,
+                        ({ rmp->sampling_N = ntohl (smp->samplingN); }));
+}
+
+/* Binary API: set the counter polling interval (seconds). */
+static void
+vl_api_sflow_polling_interval_set_t_handler (
+  vl_api_sflow_polling_interval_set_t *mp)
+{
+  vl_api_sflow_polling_interval_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_polling_interval (smp, ntohl (mp->polling_S));
+
+  REPLY_MACRO (VL_API_SFLOW_POLLING_INTERVAL_SET_REPLY);
+}
+
+/* Binary API: report the current counter polling interval. */
+static void
+vl_api_sflow_polling_interval_get_t_handler (
+  vl_api_sflow_polling_interval_get_t *mp)
+{
+  vl_api_sflow_polling_interval_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  /* NOTE(review): ntohl == htonl for u32; conventional would be htonl */
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_POLLING_INTERVAL_GET_REPLY,
+                        ({ rmp->polling_S = ntohl (smp->pollingS); }));
+}
+
+/* Binary API: set the per-sample header byte limit (value may be
+ * rounded/clamped by sflow_header_bytes). */
+static void
+vl_api_sflow_header_bytes_set_t_handler (vl_api_sflow_header_bytes_set_t *mp)
+{
+  vl_api_sflow_header_bytes_set_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+  int rv;
+
+  rv = sflow_header_bytes (smp, ntohl (mp->header_B));
+
+  REPLY_MACRO (VL_API_SFLOW_HEADER_BYTES_SET_REPLY);
+}
+
+/* Binary API: report the current header byte limit. */
+static void
+vl_api_sflow_header_bytes_get_t_handler (vl_api_sflow_header_bytes_get_t *mp)
+{
+  vl_api_sflow_header_bytes_get_reply_t *rmp;
+  sflow_main_t *smp = &sflow_main;
+
+  /* NOTE(review): ntohl == htonl for u32; conventional would be htonl */
+  REPLY_MACRO_DETAILS2 (VL_API_SFLOW_HEADER_BYTES_GET_REPLY,
+                        ({ rmp->header_B = ntohl (smp->headerB); }));
+}
+
+/* Send one sflow_interface_details message to the client registration
+ * reg, echoing the request context so the client can match it. */
+static void
+send_sflow_interface_details (vpe_api_main_t *am, vl_api_registration_t *reg,
+                              u32 context, const u32 hw_if_index)
+{
+  vl_api_sflow_interface_details_t *mp;
+  sflow_main_t *smp = &sflow_main;
+
+  mp = vl_msg_api_alloc_zero (sizeof (*mp));
+  /* smp is referenced implicitly by REPLY_MSG_ID_BASE -- presumably a
+     macro over smp->msg_id_base (the plugin's message-id offset);
+     confirm against the REPLY_MSG_ID_BASE definition */
+  mp->_vl_msg_id = ntohs (REPLY_MSG_ID_BASE + VL_API_SFLOW_INTERFACE_DETAILS);
+  mp->context = context;
+
+  mp->hw_if_index = htonl (hw_if_index);
+  vl_api_send_msg (reg, (u8 *) mp);
+}
+
+/* Binary API: stream one interface_details message for every enabled
+ * interface, optionally filtered to a single hw_if_index (~0 == all). */
+static void
+vl_api_sflow_interface_dump_t_handler (vl_api_sflow_interface_dump_t *mp)
+{
+  vpe_api_main_t *am = &vpe_api_main;
+  sflow_main_t *smp = &sflow_main;
+  vl_api_registration_t *reg;
+  u32 filter_hw_if_index;
+
+  reg = vl_api_client_index_to_registration (mp->client_index);
+  if (reg == NULL)
+    return;
+  filter_hw_if_index = ntohl (mp->hw_if_index);
+
+  for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++)
+    {
+      sflow_per_interface_data_t *sfif =
+        vec_elt_at_index (smp->per_interface_data, ii);
+      if (!sfif || !sfif->sflow_enabled)
+        continue;
+      if (filter_hw_if_index != ~0 &&
+          filter_hw_if_index != sfif->hw_if_index)
+        continue;
+      send_sflow_interface_details (am, reg, mp->context, sfif->hw_if_index);
+    }
+}
+
+/* API definitions */
+#include <sflow/sflow.api.c>
+
+/* Plugin init, run once at startup (see VLIB_INIT_FUNCTION below):
+ * registers the log class, sets configuration defaults, and installs
+ * the binary API messages. */
+static clib_error_t *
+sflow_init (vlib_main_t *vm)
+{
+  sflow_logger = vlib_log_register_class ("sflow", "all");
+
+  sflow_main_t *smp = &sflow_main;
+  clib_error_t *error = 0;
+
+  smp->vlib_main = vm;
+  smp->vnet_main = vnet_get_main ();
+
+  /* set default sampling-rate and polling-interval so that "enable" is all
+   * that is necessary */
+  smp->samplingN = SFLOW_DEFAULT_SAMPLING_N;
+  smp->pollingS = SFLOW_DEFAULT_POLLING_S;
+  smp->headerB = SFLOW_DEFAULT_HEADER_BYTES;
+
+  /* Add our API messages to the global name_crc hash table */
+  smp->msg_id_base = setup_message_id_table ();
+
+  /* access to counters - TODO: should this only happen on sflow enable? */
+  sflow_stat_segment_client_init ();
+  return error;
+}
+
+VLIB_INIT_FUNCTION (sflow_init);
+
+/* Insert the "sflow" node on the device-input arc ahead of
+ * ethernet-input, so enabled interfaces hand every rx packet to it. */
+VNET_FEATURE_INIT (sflow, static) = {
+  .arc_name = "device-input",
+  .node_name = "sflow",
+  .runs_before = VNET_FEATURES ("ethernet-input"),
+};
+
+VLIB_PLUGIN_REGISTER () = {
+  .version = VPP_BUILD_VER,
+  .description = "sFlow random packet sampling",
+};
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.h b/src/plugins/sflow/sflow.h
new file mode 100644
index 00000000000..609ff723816
--- /dev/null
+++ b/src/plugins/sflow/sflow.h
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_sflow_h__
+#define __included_sflow_h__
+
+#include <vnet/vnet.h>
+#include <vnet/ip/ip.h>
+#include <vnet/ethernet/ethernet.h>
+
+#include <vppinfra/hash.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow_common.h>
+#include <sflow/sflow_vapi.h>
+#include <sflow/sflow_psample.h>
+#include <sflow/sflow_usersock.h>
+
+#define SFLOW_DEFAULT_SAMPLING_N 10000
+#define SFLOW_DEFAULT_POLLING_S 20
+#define SFLOW_DEFAULT_HEADER_BYTES 128
+#define SFLOW_MAX_HEADER_BYTES 256
+#define SFLOW_MIN_HEADER_BYTES 64
+#define SFLOW_HEADER_BYTES_STEP 32
+
+#define SFLOW_FIFO_DEPTH 2048 // must be power of 2
+#define SFLOW_POLL_WAIT_S 0.001
+#define SFLOW_READ_BATCH 100
+
+// use PSAMPLE group number to distinguish VPP samples from others
+// (so that hsflowd will know to remap the ifIndex numbers if necessary)
+#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3
+#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4
+
+/* Node counter list: expands into the sflow_error_t enum below and the
+ * counter-name strings used by vlib. */
+#define foreach_sflow_error                                                   \
+  _ (PROCESSED, "sflow packets processed")                                    \
+  _ (SAMPLED, "sflow packets sampled")                                        \
+  _ (DROPPED, "sflow packets dropped")                                        \
+  _ (PSAMPLE_SEND, "sflow PSAMPLE sent")                                      \
+  _ (PSAMPLE_SEND_FAIL, "sflow PSAMPLE send failed")
+
+typedef enum
+{
+#define _(sym, str) SFLOW_ERROR_##sym,
+  foreach_sflow_error
+#undef _
+    SFLOW_N_ERROR, /* not an error: count of the entries above */
+} sflow_error_t;
+
+/* snapshot of all node counters, used for per-interval deltas */
+typedef struct
+{
+  u32 counters[SFLOW_N_ERROR];
+} sflow_err_ctrs_t;
+
+/* packet sample */
+typedef struct
+{
+  u32 samplingN;	   /* sampling rate in force when sample was taken */
+  u32 input_if_index;	   /* reported as PSAMPLE_ATTR_IIFINDEX */
+  u32 output_if_index;	   /* reported as PSAMPLE_ATTR_OIFINDEX */
+  u32 header_protocol;	   /* presumably 1 == ethernet -- TODO confirm */
+  u32 sampled_packet_size; /* reported as PSAMPLE_ATTR_ORIGSIZE */
+  u32 header_bytes;	   /* number of valid bytes in header[] */
+  u8 header[SFLOW_MAX_HEADER_BYTES];
+} sflow_sample_t;
+
+// Define SPSC FIFO for sending samples worker-to-main.
+// (I did try to use VPP svm FIFO, but couldn't
+// understand why it was sometimes going wrong).
+typedef struct
+{
+  volatile u32 tx; // can change under consumer's feet
+  volatile u32 rx; // can change under producer's feet
+  sflow_sample_t samples[SFLOW_FIFO_DEPTH];
+} sflow_fifo_t;
+
+/* ring arithmetic: SFLOW_FIFO_DEPTH is a power of 2, so mask wraps */
+#define SFLOW_FIFO_NEXT(slot) ((slot + 1) & (SFLOW_FIFO_DEPTH - 1))
+/* Producer side (one worker per FIFO). tx holds the index of the most
+ * recently written slot, so the new sample goes into
+ * SFLOW_FIFO_NEXT(tx). Non-blocking: returns false if the FIFO is
+ * full. */
+static inline int
+sflow_fifo_enqueue (sflow_fifo_t *fifo, sflow_sample_t *sample)
+{
+  u32 curr_rx = clib_atomic_load_acq_n (&fifo->rx);
+  u32 curr_tx = fifo->tx; // clib_atomic_load_acq_n(&fifo->tx);
+  u32 next_tx = SFLOW_FIFO_NEXT (curr_tx);
+  if (next_tx == curr_rx)
+    return false; // full
+  memcpy (&fifo->samples[next_tx], sample, sizeof (*sample));
+  /* release-store publishes the sample bytes before the new tx is seen */
+  clib_atomic_store_rel_n (&fifo->tx, next_tx);
+  return true;
+}
+
+/* Consumer side (main thread only). rx holds the index of the most
+ * recently consumed slot. Returns false if the FIFO is empty.
+ *
+ * Since the producer writes into SFLOW_FIFO_NEXT(tx), the oldest
+ * unread sample lives at SFLOW_FIFO_NEXT(rx) -- NOT at rx itself.
+ * Reading samples[rx] (as before) returned a stale/uninitialized slot
+ * and left every enqueued sample offset by one. */
+static inline int
+sflow_fifo_dequeue (sflow_fifo_t *fifo, sflow_sample_t *sample)
+{
+  u32 curr_rx = fifo->rx; // clib_atomic_load_acq_n(&fifo->rx);
+  u32 curr_tx = clib_atomic_load_acq_n (&fifo->tx);
+  if (curr_rx == curr_tx)
+    return false; // empty
+  u32 next_rx = SFLOW_FIFO_NEXT (curr_rx);
+  memcpy (sample, &fifo->samples[next_rx], sizeof (*sample));
+  /* release-store frees the slot for the producer */
+  clib_atomic_store_rel_n (&fifo->rx, next_rx);
+  return true;
+}
+
+/* private to worker */
+typedef struct
+{
+  u32 smpN; /* 1:N sampling rate currently in force on this thread */
+  u32 skip; /* packets remaining before the next sample is taken */
+  u32 pool; /* packets processed (feeds SFLOW_ERROR_PROCESSED) */
+  u32 seed; /* per-thread random state for skip generation */
+  u32 smpl; /* samples taken (feeds SFLOW_ERROR_SAMPLED) */
+  u32 drop; /* samples dropped (feeds SFLOW_ERROR_DROPPED) */
+  /* keep the FIFO on its own cache line, away from the counters above */
+  CLIB_CACHE_LINE_ALIGN_MARK (_fifo);
+  sflow_fifo_t fifo;
+} sflow_per_thread_data_t;
+
+typedef struct
+{
+  /* API message ID base */
+  u16 msg_id_base;
+
+  /* convenience */
+  vlib_main_t *vlib_main;
+  vnet_main_t *vnet_main;
+  ethernet_main_t *ethernet_main;
+
+  /* sampling state */
+  u32 samplingN; /* global 1:N sampling rate (0 == off) */
+  u32 pollingS;  /* counter polling interval, seconds */
+  u32 headerB;	 /* max header bytes captured per sample */
+  u32 total_threads;
+  sflow_per_interface_data_t *per_interface_data; /* vec, by hw_if_index */
+  sflow_per_thread_data_t *per_thread_data;	  /* vec, by thread index */
+
+  /* psample channel (packet samples) */
+  SFLOWPS sflow_psample;
+  /* usersock channel (periodic counters) */
+  SFLOWUS sflow_usersock;
+#define SFLOW_NETLINK_USERSOCK_MULTICAST 29
+  /* dropmon channel (packet drops) */
+  // SFLOWDM sflow_dropmon;
+
+  /* sample-processing */
+  u32 now_mono_S; /* last seen whole second of the monotonic clock */
+
+  /* running control */
+  int running;
+  u32 interfacesEnabled;
+
+  /* main-thread counters */
+  u32 psample_seq_ingress;
+  u32 psample_seq_egress;
+  u32 psample_send_drops;
+  u32 csample_send;
+  u32 csample_send_drops;
+  u32 unixsock_seq;
+#ifdef SFLOW_USE_VAPI
+  /* vapi query helper thread (transient) */
+  CLIB_CACHE_LINE_ALIGN_MARK (_vapi);
+  sflow_vapi_client_t vac;
+  int vapi_requests;
+#endif
+} sflow_main_t;
+
+extern sflow_main_t sflow_main;
+
+extern vlib_node_registration_t sflow_node;
+
+/* Draw the next random sample-skip count for this thread.
+   skip==1 means "take the next packet", so this fn must never
+   return 0. The result is uniform in [1, 2*smpN - 1], giving a mean
+   skip of smpN, i.e. an average sampling rate of 1:smpN. */
+static inline u32
+sflow_next_random_skip (sflow_per_thread_data_t *sfwk)
+{
+  if (sfwk->smpN <= 1)
+    return 1;
+  u32 span = (sfwk->smpN * 2) - 1;
+  return 1 + (random_u32 (&sfwk->seed) % span);
+}
+
+#endif /* __included_sflow_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow.rst b/src/plugins/sflow/sflow.rst
new file mode 100644
index 00000000000..f9c18488363
--- /dev/null
+++ b/src/plugins/sflow/sflow.rst
@@ -0,0 +1,61 @@
+.. _Sflow_agent:
+
+.. toctree::
+
+SFlow Monitoring Agent
+======================
+
+Overview
+________
+
+This plugin implements the random packet-sampling and interface
+telemetry streaming required to support standard sFlow export
+on Linux platforms. The overhead incurred by this monitoring is
+minimal, so that detailed, real-time traffic analysis can be
+achieved even under high load conditions, with visibility into
+any fields that appear in the packet headers. If the VPP linux-cp
+plugin is running then interfaces will be mapped to their
+equivalent Linux tap ports.
+
+Example Configuration
+_____________________
+
+::
+ sflow sampling-rate 10000
+ sflow polling-interval 20
+ sflow header-bytes 128
+ sflow enable GigabitEthernet0/8/0
+ sflow enable GigabitEthernet0/9/0
+ sflow enable GigabitEthernet0/a/0
+ ...
+ sflow enable GigabitEthernet0/a/0 disable
+
+Detailed notes
+______________
+
+Each VPP worker that is servicing at least one enabled interface will
+create a FIFO and enqueue samples to it from those enabled
+interfaces.
+dequeue the FIFOs periodically. If the FIFO is full, the worker will
+drop samples, which helps ensure that (a) the main thread is not
+overloaded with samples and (b) that individual workers and interfaces,
+even when under high load, can't crowd out other interfaces and workers.
+
+You can change the sampling-rate at runtime, but keep in mind that
+it is a global variable that applies to workers, not interfaces.
+This means that (1) all workers will sample at the same rate, and (2)
+if there are multiple interfaces assigned to a worker, they'll share
+the sampling rate which will undershoot, and similarly (3) if there
+are multiple RX queues assigned to more than one worker, the effective
+sampling rate will overshoot.
+
+External Dependencies
+_____________________
+
+This plugin writes packet samples to the standard Linux netlink PSAMPLE
+channel, so the kernel psample module must be loaded with modprobe or
+insmod. As such, this plugin only works for Linux environments.
+
+It also shares periodic interface counter samples via netlink USERSOCK.
+The host-sflow daemon, hsflowd, at https://sflow.net is one example of
+a tool that will consume this feed and emit standard sFlow v5.
diff --git a/src/plugins/sflow/sflow_common.h b/src/plugins/sflow/sflow_common.h
new file mode 100644
index 00000000000..29784638bb9
--- /dev/null
+++ b/src/plugins/sflow/sflow_common.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_sflow_common_h__
+#define __included_sflow_common_h__
+
+// #define SFLOW_USE_VAPI (set by CMakeLists.txt)
+
+extern vlib_log_class_t sflow_logger;
+/* Severity-tagged logging helpers for the shared "sflow" log class.
+ * No trailing semicolon in the expansion: callers supply their own, so
+ * the macros behave like ordinary statements and remain safe inside
+ * if/else bodies (the previous embedded ';' produced a stray empty
+ * statement and would break "if (x) SFLOW_WARN(...); else ..."). */
+#define SFLOW_DBG(...)	  vlib_log_debug (sflow_logger, __VA_ARGS__)
+#define SFLOW_INFO(...)	  vlib_log_info (sflow_logger, __VA_ARGS__)
+#define SFLOW_NOTICE(...) vlib_log_notice (sflow_logger, __VA_ARGS__)
+#define SFLOW_WARN(...)	  vlib_log_warn (sflow_logger, __VA_ARGS__)
+#define SFLOW_ERR(...)	  vlib_log_err (sflow_logger, __VA_ARGS__)
+
+/* per-interface state, held in a vector indexed by hw_if_index */
+typedef struct
+{
+  u32 sw_if_index;
+  u32 hw_if_index;
+  u32 linux_if_index; /* Linux ifIndex learned via VAPI (linux-cp) */
+  u32 polled;	      /* counter-poll bookkeeping; reset on enable */
+  int sflow_enabled;  /* 1 when sampling is on for this interface */
+} sflow_per_interface_data_t;
+
+#endif /* __included_sflow_common_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_psample.c b/src/plugins/sflow/sflow_psample.c
new file mode 100644
index 00000000000..0e4fcfbe790
--- /dev/null
+++ b/src/plugins/sflow/sflow_psample.c
@@ -0,0 +1,523 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(__cplusplus)
+extern "C"
+{
+#endif
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <fcntl.h>
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/psample.h>
+#include <signal.h>
+#include <ctype.h>
+
+#include <sflow/sflow_psample.h>
+
+ /*_________________---------------------------__________________
+ _________________ fcntl utils __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ setNonBlocking (int fd) // put fd into non-blocking mode; failure is logged, not fatal
+ {
+   // set the socket to non-blocking
+   int fdFlags = fcntl (fd, F_GETFL); // NOTE(review): F_GETFL result unchecked; -1 would be OR'd into flags
+   fdFlags |= O_NONBLOCK;
+   if (fcntl (fd, F_SETFL, fdFlags) < 0)
+     {
+       SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno));
+     }
+ }
+
+ static void
+ setCloseOnExec (int fd) // set FD_CLOEXEC so the fd is not leaked across exec()
+ {
+   // make sure it doesn't get inherited, e.g. when we fork a script
+   int fdFlags = fcntl (fd, F_GETFD); // NOTE(review): F_GETFD result unchecked; -1 would be OR'd into flags
+   fdFlags |= FD_CLOEXEC;
+   if (fcntl (fd, F_SETFD, fdFlags) < 0)
+     {
+       SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno));
+     }
+ }
+
+ static int
+ setSendBuffer (int fd, int requested) // grow SO_SNDBUF to at least 'requested'; returns size actually in effect
+ {
+   int txbuf = 0;
+   socklen_t txbufsiz = sizeof (txbuf);
+   if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0)
+     {
+       SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno));
+     }
+   if (txbuf < requested)
+     {
+       txbuf = requested;
+       if (setsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, sizeof (txbuf)) < 0) // kernel may clamp to net.core.wmem_max
+	 {
+	   SFLOW_WARN ("setsockopt(SO_TXBUF=%d) failed: %s", requested, // NOTE(review): message says SO_TXBUF but option is SO_SNDBUF
+		       strerror (errno));
+	 }
+       // see what we actually got
+       txbufsiz = sizeof (txbuf);
+       if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0)
+	 {
+	   SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno));
+	 }
+     }
+   return txbuf;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ generic_pid __________________
+ -----------------___________________________------------------
+ choose a 32-bit id that is likely to be unique even if more
+ than one module in this process wants to bind a netlink socket
+ */
+
+ static u32
+ generic_pid (u32 mod_id)
+ {
+   return (mod_id << 16) | getpid (); // high 16 bits = module id, low bits = pid: distinct nl_pid per module in this process
+ }
+
+ /*_________________---------------------------__________________
+ _________________ generic_open __________________
+ -----------------___________________________------------------
+ */
+
<imports></imports>
+ static int
+ generic_open (u32 mod_id) // open + bind a NETLINK_GENERIC socket; returns fd, or -1 on socket() failure
+ {
+   int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
+   if (nl_sock < 0)
+     {
+       SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno));
+       return -1;
+     }
+   // bind to a suitable id
+   struct sockaddr_nl sa = { .nl_family = AF_NETLINK,
+			     .nl_pid = generic_pid (mod_id) };
+   if (bind (nl_sock, (struct sockaddr *) &sa, sizeof (sa)) < 0)
+     SFLOW_ERR ("generic_open: bind failed: %s\n", strerror (errno)); // bind failure only logged; caller still gets the fd
+   setNonBlocking (nl_sock);
+   setCloseOnExec (nl_sock);
+   return nl_sock;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ generic_send __________________
+ -----------------___________________________------------------
+ */
+
+ static int
+ generic_send (int sockfd, u32 mod_id, int type, int cmd, int req_type, // build + send one genetlink request: nlmsghdr|genlmsghdr|one attr
+	       void *req, int req_len, int req_footprint, u32 seqNo) // req_footprint = NLMSG_ALIGN(req_len): padded size on the wire
+ {
+   struct nlmsghdr nlh = {};
+   struct genlmsghdr ge = {};
+   struct nlattr attr = {};
+
+   attr.nla_len = sizeof (attr) + req_len; // nla_len carries the unpadded payload length
+   attr.nla_type = req_type;
+
+   ge.cmd = cmd;
+   ge.version = 1;
+
+   nlh.nlmsg_len = NLMSG_LENGTH (req_footprint + sizeof (attr) + sizeof (ge));
+   nlh.nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK; // ask kernel to ACK so readNetlink can confirm delivery
+   nlh.nlmsg_type = type;
+   nlh.nlmsg_seq = seqNo;
+   nlh.nlmsg_pid = generic_pid (mod_id);
+
+   struct iovec iov[4] = { { .iov_base = &nlh, .iov_len = sizeof (nlh) },
+			   { .iov_base = &ge, .iov_len = sizeof (ge) },
+			   { .iov_base = &attr, .iov_len = sizeof (attr) },
+			   { .iov_base = req, .iov_len = req_footprint } };
+
+   struct sockaddr_nl sa = { .nl_family = AF_NETLINK };
+   struct msghdr msg = { .msg_name = &sa,
+			 .msg_namelen = sizeof (sa),
+			 .msg_iov = iov,
+			 .msg_iovlen = 4 };
+   return sendmsg (sockfd, &msg, 0); // bytes sent, or -1 with errno set
+ }
+
+ /*_________________---------------------------__________________
+ _________________ getFamily_PSAMPLE __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ getFamily_PSAMPLE (SFLOWPS *pst) // ask the genetlink controller for the psample family id and mcast groups
+ {
+#define SFLOWPS_FAM_LEN sizeof (PSAMPLE_GENL_NAME)
+#define SFLOWPS_FAM_FOOTPRINT NLMSG_ALIGN (SFLOWPS_FAM_LEN)
+   char fam_name[SFLOWPS_FAM_FOOTPRINT] = {};
+   memcpy (fam_name, PSAMPLE_GENL_NAME, SFLOWPS_FAM_LEN);
+   generic_send (pst->nl_sock, pst->id, GENL_ID_CTRL, CTRL_CMD_GETFAMILY,
+		 CTRL_ATTR_FAMILY_NAME, fam_name, SFLOWPS_FAM_LEN,
+		 SFLOWPS_FAM_FOOTPRINT, ++pst->nl_seq); // send result ignored; WAIT_FAMILY state is re-driven via SFLOWPS_open_step
+   pst->state = SFLOWPS_STATE_WAIT_FAMILY;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ processNetlink_GENERIC __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ processNetlink_GENERIC (SFLOWPS *pst, struct nlmsghdr *nlh) // walk the GETFAMILY reply TLVs: learn family_id + "packets" mcast group id
+ {
+   char *msg = (char *) NLMSG_DATA (nlh);
+   int msglen = nlh->nlmsg_len - NLMSG_HDRLEN;
+   struct genlmsghdr *genl = (struct genlmsghdr *) msg;
+   SFLOW_DBG ("generic netlink CMD = %u\n", genl->cmd);
+
+   for (int offset = GENL_HDRLEN; offset < msglen;) // top-level attribute walk; offset advanced by aligned nla_len below
+     {
+       struct nlattr *attr = (struct nlattr *) (msg + offset);
+       if (attr->nla_len == 0 || (attr->nla_len + offset) > msglen)
+	 {
+	   SFLOW_ERR ("processNetlink_GENERIC attr parse error\n");
+	   break; // attr parse error
+	 }
+       char *attr_datap = (char *) attr + NLA_HDRLEN;
+       switch (attr->nla_type)
+	 {
+	 case CTRL_ATTR_VERSION:
+	   pst->genetlink_version = *(u32 *) attr_datap;
+	   break;
+	 case CTRL_ATTR_FAMILY_ID:
+	   pst->family_id = *(u16 *) attr_datap;
+	   SFLOW_DBG ("generic family id: %u\n", pst->family_id);
+	   break;
+	 case CTRL_ATTR_FAMILY_NAME:
+	   SFLOW_DBG ("generic family name: %s\n", attr_datap);
+	   break;
+	 case CTRL_ATTR_MCAST_GROUPS: // nested: one entry per mcast group, each itself a nest of NAME + ID
+	   for (int grp_offset = NLA_HDRLEN; grp_offset < attr->nla_len;)
+	     {
+	       struct nlattr *grp_attr =
+		 (struct nlattr *) (msg + offset + grp_offset);
+	       if (grp_attr->nla_len == 0 ||
+		   (grp_attr->nla_len + grp_offset) > attr->nla_len)
+		 {
+		   SFLOW_ERR (
+		     "processNetlink_GENERIC grp_attr parse error\n");
+		   break;
+		 }
+	       char *grp_name = NULL;
+	       u32 grp_id = 0;
+	       for (int gf_offset = NLA_HDRLEN;
+		    gf_offset < grp_attr->nla_len;)
+		 {
+		   struct nlattr *gf_attr =
+		     (struct nlattr *) (msg + offset + grp_offset +
+					gf_offset);
+		   if (gf_attr->nla_len == 0 ||
+		       (gf_attr->nla_len + gf_offset) > grp_attr->nla_len)
+		     {
+		       SFLOW_ERR (
+			 "processNetlink_GENERIC gf_attr parse error\n");
+		       break;
+		     }
+		   char *grp_attr_datap = (char *) gf_attr + NLA_HDRLEN;
+		   switch (gf_attr->nla_type)
+		     {
+		     case CTRL_ATTR_MCAST_GRP_NAME:
+		       grp_name = grp_attr_datap; // points into recv buffer; only used within this iteration
+		       SFLOW_DBG ("psample multicast group: %s\n", grp_name);
+		       break;
+		     case CTRL_ATTR_MCAST_GRP_ID:
+		       grp_id = *(u32 *) grp_attr_datap;
+		       SFLOW_DBG ("psample multicast group id: %u\n", grp_id);
+		       break;
+		     }
+		   gf_offset += NLMSG_ALIGN (gf_attr->nla_len);
+		 }
+	       if (pst->group_id == 0 && grp_name && grp_id && // latch only the first match of the sample group
+		   !strcmp (grp_name, PSAMPLE_NL_MCGRP_SAMPLE_NAME))
+		 {
+		   SFLOW_DBG ("psample found group %s=%u\n", grp_name,
+			      grp_id);
+		   pst->group_id = grp_id;
+		   // We don't need to join the group if we are only sending
+		   // to it.
+		 }
+
+	       grp_offset += NLMSG_ALIGN (grp_attr->nla_len);
+	     }
+	   break;
+	 default:
+	   SFLOW_DBG ("psample attr type: %u (nested=%u) len: %u\n",
+		      attr->nla_type, attr->nla_type & NLA_F_NESTED,
+		      attr->nla_len);
+	   break;
+	 }
+       offset += NLMSG_ALIGN (attr->nla_len);
+     }
+   if (pst->family_id && pst->group_id) // both learned => channel usable
+     {
+       SFLOW_DBG ("psample state->READY\n");
+       pst->state = SFLOWPS_STATE_READY;
+     }
+ }
+
+ // TODO: we can take out the fns for reading PSAMPLE here
+
+ /*_________________---------------------------__________________
+ _________________ processNetlink __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ processNetlink (SFLOWPS *pst, struct nlmsghdr *nlh) // dispatch one received netlink message by nlmsg_type
+ {
+   if (nlh->nlmsg_type == NETLINK_GENERIC) // NOTE(review): controller replies carry type GENL_ID_CTRL (16), which equals NETLINK_GENERIC (16) — works, but confirm the constant intended
+     {
+       processNetlink_GENERIC (pst, nlh);
+     }
+   else if (nlh->nlmsg_type == pst->family_id)
+     {
+       // We are write-only, don't need to read these.
+     }
+ }
+
+ /*_________________---------------------------__________________
+ _________________ readNetlink_PSAMPLE __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ readNetlink_PSAMPLE (SFLOWPS *pst, int fd) // drain one datagram from the (non-blocking) netlink socket and process it
+ {
+   uint8_t recv_buf[SFLOWPS_PSAMPLE_READNL_RCV_BUF];
+   int numbytes = recv (fd, recv_buf, sizeof (recv_buf), 0);
+   if (numbytes <= 0)
+     {
+       SFLOW_ERR ("readNetlink_PSAMPLE returned %d : %s\n", numbytes, // includes EAGAIN on an empty non-blocking socket
+		  strerror (errno));
+       return;
+     }
+   struct nlmsghdr *nlh = (struct nlmsghdr *) recv_buf;
+   while (NLMSG_OK (nlh, numbytes)) // a datagram may contain multiple messages
+     {
+       if (nlh->nlmsg_type == NLMSG_DONE)
+	 break;
+       if (nlh->nlmsg_type == NLMSG_ERROR)
+	 {
+	   struct nlmsgerr *err_msg = (struct nlmsgerr *) NLMSG_DATA (nlh);
+	   if (err_msg->error == 0)
+	     {
+	       SFLOW_DBG ("received Netlink ACK\n"); // error==0 is the ACK requested via NLM_F_ACK
+	     }
+	   else
+	     {
+	       SFLOW_ERR ("error in netlink message: %d : %s\n", // kernel reports negative errno
+			  err_msg->error, strerror (-err_msg->error));
+	     }
+	   return;
+	 }
+       processNetlink (pst, nlh);
+       nlh = NLMSG_NEXT (nlh, numbytes);
+     }
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPS_open __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWPS_open (SFLOWPS *pst) // idempotent: open the genetlink socket and kick off family discovery
+ {
+   if (pst->nl_sock == 0) // 0 means "never opened"; assumes zero-initialized SFLOWPS
+     {
+       pst->nl_sock = generic_open (pst->id);
+       if (pst->nl_sock > 0)
+	 {
+	   pst->state = SFLOWPS_STATE_OPEN;
+	   setSendBuffer (pst->nl_sock, SFLOWPS_PSAMPLE_READNL_SND_BUF);
+	   getFamily_PSAMPLE (pst); // moves state to WAIT_FAMILY
+	 }
+     }
+   return (pst->nl_sock > 0);
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPS_close __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWPS_close (SFLOWPS *pst) // close the socket; true on success
+ {
+   if (pst->nl_sock > 0)
+     {
+       int err = close (pst->nl_sock);
+       if (err == 0)
+	 {
+	   pst->nl_sock = 0; // NOTE(review): state/family_id/group_id are not reset here — confirm a reopen is driven back through the state machine
+	   return true;
+	 }
+       else
+	 {
+	   SFLOW_ERR ("SFLOWPS_close: returned %d : %s\n", err,
+		      strerror (errno));
+	 }
+     }
+   return false; // also false when the socket was never open
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPS_state __________________
+ -----------------___________________________------------------
+ */
+
+ EnumSFLOWPSState
+ SFLOWPS_state (SFLOWPS *pst) // accessor: current open/discovery state
+ {
+   return pst->state;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPS_open_step __________________
+ -----------------___________________________------------------
+ */
+
+ EnumSFLOWPSState
+ SFLOWPS_open_step (SFLOWPS *pst) // advance the non-blocking open state machine one step; caller polls until READY
+ {
+   switch (pst->state)
+     {
+     case SFLOWPS_STATE_INIT:
+       SFLOWPS_open (pst);
+       break;
+     case SFLOWPS_STATE_OPEN:
+       getFamily_PSAMPLE (pst); // (re)issue the GETFAMILY request
+       break;
+     case SFLOWPS_STATE_WAIT_FAMILY:
+       readNetlink_PSAMPLE (pst, pst->nl_sock); // may flip state to READY
+       break;
+     case SFLOWPS_STATE_READY:
+       break;
+     }
+   return pst->state;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPSSpec_setAttr __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field, // stage one attribute; false if already set or length mismatch
+		      void *val, int len) // 'val' is NOT copied: it must stay live until SFLOWPSSpec_send
+ {
+   SFLOWPSAttr *psa = &spec->attr[field];
+   if (psa->included)
+     return false; // each attribute may be set only once per spec (spec assumed zeroed before use)
+   psa->included = true;
+   int expected_len = SFLOWPS_Fields[field].len; // 0 in the table means variable-length, skip the check
+   if (expected_len && expected_len != len)
+     {
+       SFLOW_ERR ("SFLOWPSSpec_setAttr(%s) length=%u != expected: %u\n",
+		  SFLOWPS_Fields[field].descr, len, expected_len);
+       return false; // NOTE(review): 'included' stays true here, so a retry with the right length will be refused
+     }
+   psa->attr.nla_type = field;
+   psa->attr.nla_len = sizeof (psa->attr) + len;
+   int len_w_pad = NLMSG_ALIGN (len); // pad iov to 4-byte netlink alignment
+   psa->val.iov_len = len_w_pad;
+   psa->val.iov_base = val;
+   spec->n_attrs++;
+   spec->attrs_len += sizeof (psa->attr);
+   spec->attrs_len += len_w_pad;
+   return true;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWPSSpec_send __________________
+ -----------------___________________________------------------
+ */
+
+ int
+ SFLOWPSSpec_send (SFLOWPS *pst, SFLOWPSSpec *spec) // emit one PSAMPLE_CMD_SAMPLE multicast from the staged attrs; 0 on success, -1 on error
+ {
+   spec->nlh.nlmsg_len = NLMSG_LENGTH (sizeof (spec->ge) + spec->attrs_len);
+   spec->nlh.nlmsg_flags = 0; // no ACK requested for sample traffic
+   spec->nlh.nlmsg_type = pst->family_id;
+   spec->nlh.nlmsg_seq = ++pst->nl_seq;
+   spec->nlh.nlmsg_pid = generic_pid (pst->id);
+
+   spec->ge.cmd = PSAMPLE_CMD_SAMPLE;
+   spec->ge.version = PSAMPLE_GENL_VERSION;
+
+#define MAX_IOV_FRAGMENTS (2 * __SFLOWPS_PSAMPLE_ATTR_MAX) + 2 // NOTE(review): expansion unparenthesized — harmless as an array size, but fragile
+
+   struct iovec iov[MAX_IOV_FRAGMENTS];
+   u32 frag = 0;
+   iov[frag].iov_base = &spec->nlh;
+   iov[frag].iov_len = sizeof (spec->nlh);
+   frag++;
+   iov[frag].iov_base = &spec->ge;
+   iov[frag].iov_len = sizeof (spec->ge);
+   frag++;
+   int nn = 0;
+   for (u32 ii = 0; ii < __SFLOWPS_PSAMPLE_ATTR_MAX; ii++) // two iov frags per staged attr: header, then (padded) value
+     {
+       SFLOWPSAttr *psa = &spec->attr[ii];
+       if (psa->included)
+	 {
+	   nn++;
+	   iov[frag].iov_base = &psa->attr;
+	   iov[frag].iov_len = sizeof (psa->attr);
+	   frag++;
+	   iov[frag] = psa->val; // struct copy
+	   frag++;
+	 }
+     }
+   ASSERT (nn == spec->n_attrs);
+
+   struct sockaddr_nl da = { .nl_family = AF_NETLINK,
+			     .nl_groups = (1 << (pst->group_id - 1)) }; // assumes group_id != 0, i.e. state READY — caller must ensure
+
+   struct msghdr msg = { .msg_name = &da,
+			 .msg_namelen = sizeof (da),
+			 .msg_iov = iov,
+			 .msg_iovlen = frag };
+
+   int status = sendmsg (pst->nl_sock, &msg, 0);
+   if (status <= 0)
+     {
+       SFLOW_ERR ("strerror(errno) = %s; errno = %d\n", strerror (errno),
+		  errno);
+       return -1;
+     }
+   return 0;
+ }
diff --git a/src/plugins/sflow/sflow_psample.h b/src/plugins/sflow/sflow_psample.h
new file mode 100644
index 00000000000..5d4944231fd
--- /dev/null
+++ b/src/plugins/sflow/sflow_psample.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __included_sflow_psample_h__
+#define __included_sflow_psample_h__
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <linux/genetlink.h>
+#include <linux/psample.h>
+#include <signal.h>
+#include <ctype.h>
+
+// #define SFLOWPS_DEBUG
+
+#define SFLOWPS_PSAMPLE_READNL_RCV_BUF 8192
+#define SFLOWPS_PSAMPLE_READNL_SND_BUF 1000000
+
+/* Shadow the attributes in linux/psample.h so
+ * we can easily compile/test fields that are not
+ * defined on the kernel we are compiling on.
+ */
+typedef enum
+{
+#define SFLOWPS_FIELDDATA(field, len, descr) field, // X-macro: enum values mirror PSAMPLE_ATTR_* ordering
+#include "sflow/sflow_psample_fields.h"
+#undef SFLOWPS_FIELDDATA
+  __SFLOWPS_PSAMPLE_ATTR_MAX
+} EnumSFLOWPSAttributes;
+
+typedef struct _SFLOWPS_field_t
+{
+  EnumSFLOWPSAttributes field;
+  int len;      // expected payload length; 0 = variable length (no check in SFLOWPSSpec_setAttr)
+  char *descr;  // human-readable name, used in error logs
+} SFLOWPS_field_t;
+
+static const SFLOWPS_field_t SFLOWPS_Fields[] = { // per-attribute metadata table, same X-macro source as the enum
+#define SFLOWPS_FIELDDATA(field, len, descr) { field, len, descr },
+#include "sflow/sflow_psample_fields.h"
+#undef SFLOWPS_FIELDDATA
+};
+
+typedef enum
+{
+  SFLOWPS_STATE_INIT,        // nothing opened yet
+  SFLOWPS_STATE_OPEN,        // socket open, family query not yet sent
+  SFLOWPS_STATE_WAIT_FAMILY, // GETFAMILY sent, waiting for controller reply
+  SFLOWPS_STATE_READY        // family_id + group_id known; samples can be sent
+} EnumSFLOWPSState;
+
+typedef struct _SFLOWPS
+{
+  EnumSFLOWPSState state;
+  u32 id;                 // module id, mixed into nl_pid (see generic_pid)
+  int nl_sock;            // netlink fd; 0 = never opened
+  u32 nl_seq;             // outgoing sequence number
+  u32 genetlink_version;  // CTRL_ATTR_VERSION from the controller
+  u16 family_id;          // psample genetlink family, learned at runtime
+  u32 group_id;           // "packets" mcast group id, learned at runtime
+} SFLOWPS;
+
+typedef struct _SFLOWPSAttr
+{
+  bool included : 1;   // set once per spec by SFLOWPSSpec_setAttr
+  struct nlattr attr;  // netlink attribute header
+  struct iovec val;    // points at caller's buffer (not copied)
+} SFLOWPSAttr;
+
+typedef struct _SFLOWPSSpec
+{
+  struct nlmsghdr nlh;
+  struct genlmsghdr ge;
+  SFLOWPSAttr attr[__SFLOWPS_PSAMPLE_ATTR_MAX];
+  int n_attrs;    // number of staged attributes
+  int attrs_len;  // total wire length of staged attrs (headers + padded values)
+} SFLOWPSSpec;
+
+bool SFLOWPS_open (SFLOWPS *pst);
+bool SFLOWPS_close (SFLOWPS *pst);
+EnumSFLOWPSState SFLOWPS_state (SFLOWPS *pst);
+EnumSFLOWPSState SFLOWPS_open_step (SFLOWPS *pst);
+
+bool SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field,
+ void *buf, int len);
+#define SFLOWPSSpec_setAttrInt(spec, field, val) \
+ SFLOWPSSpec_setAttr ((spec), (field), &(val), sizeof (val))
+
+int SFLOWPSSpec_send (SFLOWPS *pst, SFLOWPSSpec *spec);
+
+#endif /* __included_sflow_psample_h__ */
diff --git a/src/plugins/sflow/sflow_psample_fields.h b/src/plugins/sflow/sflow_psample_fields.h
new file mode 100644
index 00000000000..72d484c4850
--- /dev/null
+++ b/src/plugins/sflow/sflow_psample_fields.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_IIFINDEX, 4, "input if_index")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OIFINDEX, 4, "output if_index")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_ORIGSIZE, 4, "original packet size")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP, 4, "group number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ, 4, "group sequence number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE, 4, "sampling N")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_DATA, 0, "sampled header")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TUNNEL, 0, "tunnel header")
+
+/* commands attributes */
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_REFCOUNT, 0,
+ "group reference count")
+
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PAD, 0, "pad bytes")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC, 2, "egress queue number")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC_OCC, 8,
+ "egress queue depth in bytes")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_LATENCY, 8,
+ "transit latency in nanoseconds")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TIMESTAMP, 8, "timestamp")
+SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PROTO, 2, "header protocol")
diff --git a/src/plugins/sflow/sflow_test.c b/src/plugins/sflow/sflow_test.c
new file mode 100644
index 00000000000..554806640e3
--- /dev/null
+++ b/src/plugins/sflow/sflow_test.c
@@ -0,0 +1,298 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vat/vat.h>
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vppinfra/error.h>
+#include <stdbool.h>
+
+#define __plugin_msg_base sflow_test_main.msg_id_base
+#include <vlibapi/vat_helper_macros.h>
+
+uword unformat_sw_if_index (unformat_input_t *input, va_list *args);
+
+/* Declare message IDs */
+#include <sflow/sflow.api_enum.h>
+#include <sflow/sflow.api_types.h>
+
+typedef struct
+{
+ /* API message ID base */
+ u16 msg_id_base;
+ vat_main_t *vat_main;
+} sflow_test_main_t;
+
+sflow_test_main_t sflow_test_main;
+
+static int
+api_sflow_enable_disable (vat_main_t *vam) // VAT handler: "sflow_enable_disable <intf|sw_if_index> [disable]"
+{
+  unformat_input_t *i = vam->input;
+  int enable_disable = 1; // default is enable
+  u32 hw_if_index = ~0;
+  vl_api_sflow_enable_disable_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "%U", unformat_sw_if_index, vam, &hw_if_index))
+	;
+      else if (unformat (i, "disable"))
+	enable_disable = 0;
+      else
+	break;
+    }
+
+  if (hw_if_index == ~0)
+    {
+      errmsg ("missing interface name / explicit hw_if_index number \n");
+      return -99; // VAT convention for a usage error
+    }
+
+  /* Construct the API message */
+  M (SFLOW_ENABLE_DISABLE, mp);
+  mp->hw_if_index = ntohl (hw_if_index); // host->net; ntohl/htonl are the same operation
+  mp->enable_disable = enable_disable;
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_sampling_rate_get_reply_t_handler ( // reply handler: print sampling rate, release the VAT wait loop
+  vl_api_sflow_sampling_rate_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow sampling_N: %d", ntohl (mp->sampling_N));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_sampling_rate_get (vat_main_t *vam) // VAT handler: query current sampling_N (no arguments)
+{
+  vl_api_sflow_sampling_rate_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_SAMPLING_RATE_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_sampling_rate_set (vat_main_t *vam) // VAT handler: "sflow_sampling_rate_set sampling_N <n>"
+{
+  unformat_input_t *i = vam->input;
+  u32 sampling_N = ~0;
+  vl_api_sflow_sampling_rate_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "sampling_N %d", &sampling_N))
+	;
+      else
+	break;
+    }
+
+  if (sampling_N == ~0)
+    {
+      errmsg ("missing sampling_N number \n");
+      return -99; // VAT convention for a usage error
+    }
+
+  /* Construct the API message */
+  M (SFLOW_SAMPLING_RATE_SET, mp);
+  mp->sampling_N = ntohl (sampling_N);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_polling_interval_get_reply_t_handler ( // reply handler: print polling interval, release the VAT wait loop
+  vl_api_sflow_polling_interval_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow polling-interval: %d", ntohl (mp->polling_S));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_polling_interval_get (vat_main_t *vam) // VAT handler: query current polling_S (no arguments)
+{
+  vl_api_sflow_polling_interval_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_POLLING_INTERVAL_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_polling_interval_set (vat_main_t *vam) // VAT handler: "sflow_polling_interval_set polling_S <n>"
+{
+  unformat_input_t *i = vam->input;
+  u32 polling_S = ~0;
+  vl_api_sflow_polling_interval_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "polling_S %d", &polling_S))
+	;
+      else
+	break;
+    }
+
+  if (polling_S == ~0)
+    {
+      errmsg ("missing polling_S number \n");
+      return -99; // VAT convention for a usage error
+    }
+
+  /* Construct the API message */
+  M (SFLOW_POLLING_INTERVAL_SET, mp);
+  mp->polling_S = ntohl (polling_S);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_header_bytes_get_reply_t_handler ( // reply handler: print header-bytes limit, release the VAT wait loop
+  vl_api_sflow_header_bytes_get_reply_t *mp)
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow header-bytes: %d", ntohl (mp->header_B));
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_header_bytes_get (vat_main_t *vam) // VAT handler: query current header_B (no arguments)
+{
+  vl_api_sflow_header_bytes_get_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_HEADER_BYTES_GET, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static int
+api_sflow_header_bytes_set (vat_main_t *vam) // VAT handler: "sflow_header_bytes_set header_B <n>"
+{
+  unformat_input_t *i = vam->input;
+  u32 header_B = ~0;
+  vl_api_sflow_header_bytes_set_t *mp;
+  int ret;
+
+  /* Parse args required to build the message */
+  while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT)
+    {
+      if (unformat (i, "header_B %d", &header_B))
+	;
+      else
+	break;
+    }
+
+  if (header_B == ~0)
+    {
+      errmsg ("missing header_B number \n");
+      return -99; // VAT convention for a usage error
+    }
+
+  /* Construct the API message */
+  M (SFLOW_HEADER_BYTES_SET, mp);
+  mp->header_B = ntohl (header_B);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+static void
+vl_api_sflow_interface_details_t_handler (vl_api_sflow_interface_details_t *mp) // per-interface dump record handler
+{
+  vat_main_t *vam = sflow_test_main.vat_main;
+  clib_warning ("sflow enable: %d", ntohl (mp->hw_if_index)); // NOTE(review): label says "enable" but the value printed is hw_if_index
+  vam->result_ready = 1;
+}
+
+static int
+api_sflow_interface_dump (vat_main_t *vam) // VAT handler: dump sflow-enabled interfaces (no arguments)
+{
+  vl_api_sflow_interface_dump_t *mp;
+  int ret;
+
+  /* Construct the API message */
+  M (SFLOW_INTERFACE_DUMP, mp);
+
+  /* send it... */
+  S (mp);
+
+  /* Wait for a reply... */
+  W (ret);
+  return ret;
+}
+
+/*
+ * List of messages that the sflow test plugin sends,
+ * and that the data plane plugin processes
+ */
+#include <sflow/sflow.api_test.c>
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_usersock.c b/src/plugins/sflow/sflow_usersock.c
new file mode 100644
index 00000000000..0ccb947709a
--- /dev/null
+++ b/src/plugins/sflow/sflow_usersock.c
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined(__cplusplus)
+extern "C"
+{
+#endif
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <fcntl.h>
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <signal.h>
+#include <ctype.h>
+
+#include <sflow/sflow_usersock.h>
+
+ /*_________________---------------------------__________________
+ _________________ fcntl utils __________________
+ -----------------___________________________------------------
+ */
+
+ static void
+ setNonBlocking (int fd) // put fd into non-blocking mode; failure is logged, not fatal (duplicate of sflow_psample.c helper)
+ {
+   // set the socket to non-blocking
+   int fdFlags = fcntl (fd, F_GETFL); // NOTE(review): F_GETFL result unchecked; -1 would be OR'd into flags
+   fdFlags |= O_NONBLOCK;
+   if (fcntl (fd, F_SETFL, fdFlags) < 0)
+     {
+       SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno));
+     }
+ }
+
+ static void
+ setCloseOnExec (int fd) // set FD_CLOEXEC so the fd is not leaked across exec() (duplicate of sflow_psample.c helper)
+ {
+   // make sure it doesn't get inherited, e.g. when we fork a script
+   int fdFlags = fcntl (fd, F_GETFD); // NOTE(review): F_GETFD result unchecked; -1 would be OR'd into flags
+   fdFlags |= FD_CLOEXEC;
+   if (fcntl (fd, F_SETFD, fdFlags) < 0)
+     {
+       SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno));
+     }
+ }
+
+ /*_________________---------------------------__________________
+ _________________ usersock_open __________________
+ -----------------___________________________------------------
+ */
+
+ static int
+ usersock_open (void) // open a NETLINK_USERSOCK socket; returns fd or -1
+ {
+   int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK); // no bind: send-only, kernel assigns an address
+   if (nl_sock < 0)
+     {
+       SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno));
+       return -1;
+     }
+   setNonBlocking (nl_sock);
+   setCloseOnExec (nl_sock);
+   return nl_sock;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWUS_open __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWUS_open (SFLOWUS *ust) // idempotent open of the USERSOCK channel
+ {
+   if (ust->nl_sock == 0) // 0 means "never opened"
+     {
+       ust->nl_sock = usersock_open (); // NOTE(review): on failure this stores -1, and...
+     }
+   return true; // NOTE(review): ...true is returned unconditionally — confirm callers don't rely on this as a success flag
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWUS_close __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWUS_close (SFLOWUS *ust) // close the socket; true on success
+ {
+   if (ust->nl_sock != 0) // also matches the -1 stored by a failed open; close(-1) then fails harmlessly
+     {
+       int err = close (ust->nl_sock);
+       if (err == 0)
+	 {
+	   ust->nl_sock = 0;
+	   return true;
+	 }
+       else
+	 {
+	   SFLOW_WARN ("SFLOWUS_close: returned %d : %s\n", err,
+		       strerror (errno));
+	 }
+     }
+   return false;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWUSSpec_setMsgType __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType msgType) // stamp the outgoing nlmsg_type (STATUS or IF_COUNTERS)
+ {
+   spec->nlh.nlmsg_type = msgType;
+   return true; // always succeeds; bool kept for API symmetry with setAttr
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWUSSpec_setAttr __________________
+ -----------------___________________________------------------
+ */
+
+ bool
+ SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field, // stage one attribute; false if already set
+		      void *val, int len) // 'val' is NOT copied: it must stay live until SFLOWUSSpec_send
+ {
+   SFLOWUSAttr *usa = &spec->attr[field];
+   if (usa->included)
+     return false; // each attribute may be set only once per spec (spec assumed zeroed before use)
+   usa->included = true;
+   usa->attr.nla_type = field;
+   usa->attr.nla_len = sizeof (usa->attr) + len; // unpadded payload length
+   int len_w_pad = NLMSG_ALIGN (len); // pad iov to 4-byte netlink alignment
+   usa->val.iov_len = len_w_pad;
+   usa->val.iov_base = val;
+   spec->n_attrs++;
+   spec->attrs_len += sizeof (usa->attr);
+   spec->attrs_len += len_w_pad;
+   return true;
+ }
+
+ /*_________________---------------------------__________________
+ _________________ SFLOWUSSpec_send __________________
+ -----------------___________________________------------------
+ */
+
+ int
+ SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec) // multicast the staged message on the USERSOCK group; 0 on success, -1 on error
+ {
+   spec->nlh.nlmsg_len = NLMSG_LENGTH (spec->attrs_len); // nlmsg_type was stamped earlier via SFLOWUSSpec_setMsgType
+   spec->nlh.nlmsg_flags = 0;
+   spec->nlh.nlmsg_seq = ++ust->nl_seq;
+   spec->nlh.nlmsg_pid = getpid ();
+
+#define MAX_IOV_FRAGMENTS (2 * __SFLOW_VPP_ATTR_MAX) + 2 // NOTE(review): expansion unparenthesized — harmless as an array size, but fragile
+
+   struct iovec iov[MAX_IOV_FRAGMENTS];
+   u32 frag = 0;
+   iov[frag].iov_base = &spec->nlh;
+   iov[frag].iov_len = sizeof (spec->nlh);
+   frag++;
+   int nn = 0;
+   for (u32 ii = 0; ii < __SFLOW_VPP_ATTR_MAX; ii++) // two iov frags per staged attr: header, then (padded) value
+     {
+       SFLOWUSAttr *usa = &spec->attr[ii];
+       if (usa->included)
+	 {
+	   nn++;
+	   iov[frag].iov_base = &usa->attr;
+	   iov[frag].iov_len = sizeof (usa->attr);
+	   frag++;
+	   iov[frag] = usa->val; // struct copy
+	   frag++;
+	 }
+     }
+   ASSERT (nn == spec->n_attrs);
+
+   struct sockaddr_nl da = {
+     .nl_family = AF_NETLINK,
+     .nl_groups = (1 << (ust->group_id - 1)) // for multicast to the group; assumes group_id != 0
+     // .nl_pid = 1e9+6343 // for unicast to receiver bound to netlink socket
+     // with that "pid"
+   };
+
+   struct msghdr msg = { .msg_name = &da,
+			 .msg_namelen = sizeof (da),
+			 .msg_iov = iov,
+			 .msg_iovlen = frag };
+
+   int status = sendmsg (ust->nl_sock, &msg, 0);
+   if (status <= 0)
+     {
+       // Linux replies with ECONNREFUSED when
+       // a multicast is sent via NETLINK_USERSOCK, but
+       // it's not an error so we can just ignore it here.
+       if (errno != ECONNREFUSED)
+	 {
+	   SFLOW_DBG ("USERSOCK strerror(errno) = %s\n", strerror (errno));
+	   return -1;
+	 }
+     }
+   return 0; // note: ECONNREFUSED (no listeners) is treated as success
+ }
diff --git a/src/plugins/sflow/sflow_usersock.h b/src/plugins/sflow/sflow_usersock.h
new file mode 100644
index 00000000000..d66389941a6
--- /dev/null
+++ b/src/plugins/sflow/sflow_usersock.h
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __included_sflow_usersock_h__
+#define __included_sflow_usersock_h__
+
+#include <vlib/vlib.h>
+#include <vnet/vnet.h>
+#include <vnet/pg/pg.h>
+#include <vppinfra/error.h>
+#include <sflow/sflow.h>
+
+#include <asm/types.h>
+#include <sys/socket.h>
+#include <linux/types.h>
+#include <linux/netlink.h>
+#include <signal.h>
+#include <ctype.h>
+
// ==================== shared with hsflowd mod_vpp =========================
// See https://github.com/sflow/host-sflow
// These constants form a wire protocol between this plugin and the
// external hsflowd daemon: values may only ever be appended, never
// renumbered, reordered or removed.

#define SFLOW_VPP_NETLINK_USERSOCK_MULTICAST 29

// Netlink message types carried in nlmsg_type on the USERSOCK channel.
typedef enum
{
  SFLOW_VPP_MSG_STATUS = 1,
  SFLOW_VPP_MSG_IF_COUNTERS
} EnumSFlowVppMsgType;

// Netlink attribute ids (struct nlattr nla_type values) for the
// messages above.  The trailing comment on each entry gives the
// value's encoding on the wire.
typedef enum
{
  SFLOW_VPP_ATTR_PORTNAME, /* string */
  SFLOW_VPP_ATTR_IFINDEX, /* u32 */
  SFLOW_VPP_ATTR_IFTYPE, /* u32 */
  SFLOW_VPP_ATTR_IFSPEED, /* u64 */
  SFLOW_VPP_ATTR_IFDIRECTION, /* u32 */
  SFLOW_VPP_ATTR_OPER_UP, /* u32 */
  SFLOW_VPP_ATTR_ADMIN_UP, /* u32 */
  SFLOW_VPP_ATTR_RX_OCTETS, /* u64 */
  SFLOW_VPP_ATTR_TX_OCTETS, /* u64 */
  SFLOW_VPP_ATTR_RX_PKTS, /* u64 */
  SFLOW_VPP_ATTR_TX_PKTS, /* u64 */
  SFLOW_VPP_ATTR_RX_BCASTS, /* u64 */
  SFLOW_VPP_ATTR_TX_BCASTS, /* u64 */
  SFLOW_VPP_ATTR_RX_MCASTS, /* u64 */
  SFLOW_VPP_ATTR_TX_MCASTS, /* u64 */
  SFLOW_VPP_ATTR_RX_DISCARDS, /* u64 */
  SFLOW_VPP_ATTR_TX_DISCARDS, /* u64 */
  SFLOW_VPP_ATTR_RX_ERRORS, /* u64 */
  SFLOW_VPP_ATTR_TX_ERRORS, /* u64 */
  SFLOW_VPP_ATTR_HW_ADDRESS, /* binary */
  SFLOW_VPP_ATTR_UPTIME_S, /* u32 */
  SFLOW_VPP_ATTR_OSINDEX, /* u32 Linux ifIndex number, where applicable */
  SFLOW_VPP_ATTR_DROPS, /* u32 all FIFO and netlink sendmsg drops */
  SFLOW_VPP_ATTR_SEQ, /* u32 send seq no */
  /* enum shared with hsflowd, so only add here */
  __SFLOW_VPP_ATTR_MAX
} EnumSFlowVppAttributes;

// PSAMPLE group numbers used to keep sampled ingress and egress
// packets on separate multicast groups.
#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3
#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4
+
// =========================================================================

// One direction's worth of interface counters.
typedef struct
{
  u64 byts;   // octets
  u64 pkts;   // total packets
  u64 m_pkts; // multicast packets
  u64 b_pkts; // broadcast packets
  u64 errs;   // errors
  u64 drps;   // discards/drops
} sflow_ctrs_t;

// Interface counters for both directions.
typedef struct
{
  sflow_ctrs_t tx;
  sflow_ctrs_t rx;
} sflow_counters_t;

// Pairing of an attribute id with its expected encoded length.
typedef struct _SFLOWUS_field_t
{
  EnumSFlowVppAttributes field;
  int len;
} SFLOWUS_field_t;

// State for one NETLINK_USERSOCK channel to hsflowd.
typedef struct _SFLOWUS
{
  u32 id;
  int nl_sock;  // netlink socket fd
  u32 nl_seq;   // last-used nlmsg_seq
  u32 group_id; // multicast group to send to
} SFLOWUS;

// One netlink attribute staged for sending: the nlattr header plus an
// iovec pointing at the (padded) value bytes.
typedef struct _SFLOWUSAttr
{
  bool included : 1;
  struct nlattr attr;
  struct iovec val;
} SFLOWUSAttr;

// A whole message under construction: netlink header plus one slot per
// possible attribute, with running totals used to fill in nlmsg_len.
typedef struct _SFLOWUSSpec
{
  struct nlmsghdr nlh;
  SFLOWUSAttr attr[__SFLOW_VPP_ATTR_MAX];
  int n_attrs;
  int attrs_len;
} SFLOWUSSpec;
+
// Open/close the netlink USERSOCK channel.
bool SFLOWUS_open (SFLOWUS *ust);
bool SFLOWUS_close (SFLOWUS *ust);

// Stage the message type, then attributes, then send in one sendmsg().
bool SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType type);
bool SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field,
			  void *buf, int len);
// Convenience wrapper for integer-typed attributes; val must be an
// lvalue because its address and sizeof are taken.
#define SFLOWUSSpec_setAttrInt(spec, field, val)                              \
  SFLOWUSSpec_setAttr ((spec), (field), &(val), sizeof (val))

int SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec);
+
+#endif /* __included_sflow_usersock_h__ */
diff --git a/src/plugins/sflow/sflow_vapi.c b/src/plugins/sflow/sflow_vapi.c
new file mode 100644
index 00000000000..cdc89a54c80
--- /dev/null
+++ b/src/plugins/sflow/sflow_vapi.c
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sflow/sflow_vapi.h>
+
+#ifdef SFLOW_USE_VAPI
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vpp/app/version.h>
+#include <stdbool.h>
+
+#include <vapi/vapi.h>
+#include <vapi/memclnt.api.vapi.h>
+#include <vapi/vlib.api.vapi.h>
+
+#ifdef included_interface_types_api_types_h
+#define defined_vapi_enum_if_status_flags
+#define defined_vapi_enum_mtu_proto
+#define defined_vapi_enum_link_duplex
+#define defined_vapi_enum_sub_if_flags
+#define defined_vapi_enum_rx_mode
+#define defined_vapi_enum_if_type
+#define defined_vapi_enum_direction
+#endif
+#include <vapi/lcp.api.vapi.h>
+
+DEFINE_VAPI_MSG_IDS_LCP_API_JSON;
+
/* Reply callback for vapi_lcp_itf_pair_get_v2.
 * This is a no-op, but its presence still seems to be required; for
 * example, it is called if the pair lookup does not find anything. */
static vapi_error_e
my_pair_get_cb (struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv,
		bool is_last, vapi_payload_lcp_itf_pair_get_v2_reply *reply)
{
  return VAPI_OK;
}
+
/* Details callback for vapi_lcp_itf_pair_get_v2: records the Linux
 * (host) ifIndex of the linux-cp pair on the per-interface entry passed
 * as callback_ctx.  Setting it here means it is later sent to hsflowd
 * with the interface counters (SFLOW_VPP_ATTR_OSINDEX).
 * NOTE(review): callback_ctx points into the worker thread's private
 * vec_dup copy of the interface vector — presumably safe from races
 * with the main thread; confirm against sflow_vapi_read/check. */
static vapi_error_e
my_pair_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
		    vapi_error_e rv, bool is_last,
		    vapi_payload_lcp_itf_pair_details *details)
{
  sflow_per_interface_data_t *sfif =
    (sflow_per_interface_data_t *) callback_ctx;
  sfif->linux_if_index = details->vif_index;
  return VAPI_OK;
}
+
+static vapi_error_e
+sflow_vapi_connect (sflow_vapi_client_t *vac)
+{
+ vapi_error_e rv = VAPI_OK;
+ vapi_ctx_t ctx = vac->vapi_ctx;
+ if (ctx == NULL)
+ {
+ // first time - open and connect.
+ if ((rv = vapi_ctx_alloc (&ctx)) != VAPI_OK)
+ {
+ SFLOW_ERR ("vap_ctx_alloc() returned %d", rv);
+ }
+ else
+ {
+ vac->vapi_ctx = ctx;
+ if ((rv = vapi_connect_from_vpp (
+ ctx, "api_from_sflow_plugin", SFLOW_VAPI_MAX_REQUEST_Q,
+ SFLOW_VAPI_MAX_RESPONSE_Q, VAPI_MODE_BLOCKING, true)) !=
+ VAPI_OK)
+ {
+ SFLOW_ERR ("vapi_connect_from_vpp() returned %d", rv);
+ }
+ else
+ {
+ // Connected - but is there a handler for the request we want to
+ // send?
+ if (!vapi_is_msg_available (ctx,
+ vapi_msg_id_lcp_itf_pair_add_del_v2))
+ {
+ SFLOW_WARN ("vapi_is_msg_available() returned false => "
+ "linux-cp plugin not loaded");
+ rv = VAPI_EUSER;
+ }
+ }
+ }
+ }
+ return rv;
+}
+
/* Worker body, run in a detached pthread forked by
 * sflow_vapi_read_linux_if_index_numbers().  Connects to VAPI (once),
 * then issues one lcp_itf_pair_get_v2 request per sFlow-enabled
 * interface in the thread-private vector vac->vapi_itfs; the details
 * callback fills in each entry's linux_if_index.  On exit it stores the
 * overall status and clears vapi_request_active (release ordering) so
 * the main thread can harvest the results. */
static void *
get_lcp_itf_pairs (void *magic)
{
  sflow_vapi_client_t *vac = magic;
  vapi_error_e rv = VAPI_OK;

  sflow_per_interface_data_t *intfs = vac->vapi_itfs;
  vlib_set_thread_name (SFLOW_VAPI_THREAD_NAME);
  if ((rv = sflow_vapi_connect (vac)) != VAPI_OK)
    {
      vac->vapi_unavailable = true;
    }
  else
    {
      vapi_ctx_t ctx = vac->vapi_ctx;

      /* NOTE(review): loop starts at 1, skipping vector index 0 —
       * presumably index 0 is unused/reserved by the caller; confirm
       * against how vapi_itfs is populated. */
      for (int ii = 1; ii < vec_len (intfs); ii++)
	{
	  sflow_per_interface_data_t *sfif = vec_elt_at_index (intfs, ii);
	  if (sfif && sfif->sflow_enabled)
	    {
	      // TODO: if we try non-blocking we might not be able to just pour
	      // all the requests in here. Might be better to do them one at a
	      // time - e.g. when we poll for counters.
	      vapi_msg_lcp_itf_pair_get_v2 *msg =
		vapi_alloc_lcp_itf_pair_get_v2 (ctx);
	      if (msg)
		{
		  msg->payload.sw_if_index = sfif->sw_if_index;
		  if ((rv = vapi_lcp_itf_pair_get_v2 (ctx, msg, my_pair_get_cb,
						      sfif, my_pair_details_cb,
						      sfif)) != VAPI_OK)
		    {
		      SFLOW_ERR ("vapi_lcp_itf_pair_get_v2 returned %d", rv);
		      // vapi.h: "message must be freed by vapi_msg_free if not
		      // consumed by vapi_send"
		      vapi_msg_free (ctx, msg);
		    }
		}
	    }
	}
      // We no longer disconnect or free the client structures
      // vapi_disconnect_from_vpp (ctx);
      // vapi_ctx_free (ctx);
    }
  // indicate that we are done - more portable than using
  // pthread_tryjoin_np()
  vac->vapi_request_status = (int) rv;
  clib_atomic_store_rel_n (&vac->vapi_request_active, false);
  // TODO: how to tell if heap-allocated data is stored separately per thread?
  // And if so, how to tell the allocator to GC all data for the thread when it
  // exits?
  /* NOTE(review): returning an enum smuggled through void* — the value
   * is also recorded in vapi_request_status above, which is what the
   * main thread reads; the thread is detached so this return value is
   * never joined. */
  return (void *) rv;
}
+
+int
+sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac,
+ sflow_per_interface_data_t *itfs)
+{
+
+#ifdef SFLOW_VAPI_TEST_PLUGIN_SYMBOL
+ // don't even fork the query thread if the symbol is not there
+ if (!vlib_get_plugin_symbol ("linux_cp_plugin.so", "lcp_itf_pair_get"))
+ {
+ return false;
+ }
+#endif
+ // previous query is done and results extracted?
+ int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active);
+ if (req_active == false && vac->vapi_itfs == NULL)
+ {
+ // make a copy of the current interfaces vector for the lookup thread to
+ // write into
+ vac->vapi_itfs = vec_dup (itfs);
+ pthread_attr_t attr;
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+ pthread_attr_setstacksize (&attr, VLIB_THREAD_STACK_SIZE);
+ vac->vapi_request_active = true;
+ pthread_create (&vac->vapi_thread, &attr, get_lcp_itf_pairs, vac);
+ pthread_attr_destroy (&attr);
+ return true;
+ }
+ return false;
+}
+
+int
+sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac,
+ sflow_per_interface_data_t *itfs)
+{
+ // request completed?
+ // TODO: if we use non-blocking mode do we have to call something here to
+ // receive results?
+ int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active);
+ if (req_active == false && vac->vapi_itfs != NULL)
+ {
+ // yes, extract what we learned
+ // TODO: would not have to do this if vector were array of pointers
+ // to sflow_per_interface_data_t rather than an actual array, but
+ // it does mean we have very clear separation between the threads.
+ for (int ii = 1; ii < vec_len (vac->vapi_itfs); ii++)
+ {
+ sflow_per_interface_data_t *sfif1 =
+ vec_elt_at_index (vac->vapi_itfs, ii);
+ sflow_per_interface_data_t *sfif2 = vec_elt_at_index (itfs, ii);
+ if (sfif1 && sfif2 && sfif1->sflow_enabled && sfif2->sflow_enabled)
+ sfif2->linux_if_index = sfif1->linux_if_index;
+ }
+ vec_free (vac->vapi_itfs);
+ vac->vapi_itfs = NULL;
+ return true;
+ }
+ return false;
+}
+
+#endif /* SFLOW_USE_VAPI */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/plugins/sflow/sflow_vapi.h b/src/plugins/sflow/sflow_vapi.h
new file mode 100644
index 00000000000..640fe997684
--- /dev/null
+++ b/src/plugins/sflow/sflow_vapi.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#ifndef __included_sflow_vapi_h__
#define __included_sflow_vapi_h__

#include <vnet/vnet.h>
#include <sflow/sflow_common.h>

#ifdef SFLOW_USE_VAPI

#define SFLOW_VAPI_POLL_INTERVAL 5
#define SFLOW_VAPI_MAX_REQUEST_Q 8
#define SFLOW_VAPI_MAX_RESPONSE_Q 16
#define SFLOW_VAPI_THREAD_NAME "sflow_vapi" // must be <= 15 characters

// #define SFLOW_VAPI_TEST_PLUGIN_SYMBOL

// Client state for the asynchronous linux-cp ifIndex lookup.  One
// request may be in flight at a time; the worker thread and the main
// thread hand results back and forth through this struct.
typedef struct
{
  volatile int vapi_request_active; // to sync main <-> vapi_thread
  pthread_t vapi_thread;	    // detached worker thread
  sflow_per_interface_data_t *vapi_itfs; // worker's private vec_dup copy
  int vapi_unavailable;		    // set if connect/handler check failed
  int vapi_request_status;	    // written by vapi_thread
  void *vapi_ctx;		    // lazily-allocated vapi_ctx_t
} sflow_vapi_client_t;

// Start an async lookup (returns true if a new request was forked).
int sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac,
					    sflow_per_interface_data_t *itfs);
// Harvest results of a completed lookup (returns true if extracted).
int
sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac,
					     sflow_per_interface_data_t *itfs);

#endif /* SFLOW_USE_VAPI */
#endif /* __included_sflow_vapi_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */