diff options
Diffstat (limited to 'src/plugins/sflow')
-rw-r--r-- | src/plugins/sflow/CMakeLists.txt | 61 | ||||
-rw-r--r-- | src/plugins/sflow/FEATURE.yaml | 16 | ||||
-rw-r--r-- | src/plugins/sflow/node.c | 356 | ||||
-rw-r--r-- | src/plugins/sflow/sflow.api | 198 | ||||
-rw-r--r-- | src/plugins/sflow/sflow.c | 1052 | ||||
-rw-r--r-- | src/plugins/sflow/sflow.h | 198 | ||||
-rw-r--r-- | src/plugins/sflow/sflow.rst | 61 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_common.h | 44 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_psample.c | 523 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_psample.h | 111 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_psample_fields.h | 36 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_test.c | 298 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_usersock.c | 222 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_usersock.h | 133 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_vapi.c | 226 | ||||
-rw-r--r-- | src/plugins/sflow/sflow_vapi.h | 55 |
16 files changed, 3590 insertions, 0 deletions
diff --git a/src/plugins/sflow/CMakeLists.txt b/src/plugins/sflow/CMakeLists.txt new file mode 100644 index 00000000000..35433bd24df --- /dev/null +++ b/src/plugins/sflow/CMakeLists.txt @@ -0,0 +1,61 @@ + +# Copyright (c) 2024 InMon Corp. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +vpp_find_path(NETLINK_INCLUDE_DIR NAMES linux/netlink.h) +if (NOT NETLINK_INCLUDE_DIR) + message(WARNING "netlink headers not found - sflow plugin disabled") + return() +endif() + +if ("${CMAKE_SYSTEM_NAME}" STREQUAL "FreeBSD") + message(WARNING "sflow is not supported on FreeBSD - sflow plugin disabled") + return() +endif() + +LIST(FIND excluded_plugins linux-cp exc_index) +if(${exc_index} EQUAL "-1") + message(WARNING "sflow plugin - linux-cp plugin included: compiling VAPI calls") + add_compile_definitions(SFLOW_USE_VAPI) +else() + message(WARNING "sflow plugin - linux-cp plugin excluded: not compiling VAPI calls") +endif() + +include_directories(${CMAKE_SOURCE_DIR}/vpp-api ${CMAKE_CURRENT_BINARY_DIR}/../../vpp-api) +add_vpp_plugin(sflow + SOURCES + sflow.c + node.c + sflow_common.h + sflow.h + sflow_psample.c + sflow_psample.h + sflow_psample_fields.h + sflow_usersock.c + sflow_usersock.h + sflow_vapi.c + sflow_vapi.h + + MULTIARCH_SOURCES + node.c + + API_FILES + sflow.api + + API_TEST_SOURCES + sflow_test.c + + LINK_LIBRARIES + vppapiclient + vapiclient +) diff --git a/src/plugins/sflow/FEATURE.yaml b/src/plugins/sflow/FEATURE.yaml new file mode 100644 index 
00000000000..612db61005c --- /dev/null +++ b/src/plugins/sflow/FEATURE.yaml @@ -0,0 +1,16 @@ +--- +name: sFlow +maintainer: Neil McKee <neil.mckee@inmon.com> + +description: |- + This plugin implements the random packet-sampling and interface + telemetry streaming required to support standard sFlow export + on Linux platforms. The overhead incurred by this monitoring is + minimal, so that detailed, real-time traffic analysis can be + achieved even under high load conditions, with visibility into + any fields that appear in the packet headers. If the linux-cp + plugin is running then interfaces will be mapped to their + equivalent Linux tap ports. + +state: experimental +properties: [CLI, MULTITHREAD] diff --git a/src/plugins/sflow/node.c b/src/plugins/sflow/node.c new file mode 100644 index 00000000000..51826438834 --- /dev/null +++ b/src/plugins/sflow/node.c @@ -0,0 +1,356 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <vlib/vlib.h> +#include <vlibmemory/api.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <sflow/sflow.h> + +typedef struct +{ + u32 next_index; + u32 sw_if_index; + u8 new_src_mac[6]; + u8 new_dst_mac[6]; +} sflow_trace_t; + +#ifndef CLIB_MARCH_VARIANT +static u8 * +my_format_mac_address (u8 *s, va_list *args) +{ + u8 *a = va_arg (*args, u8 *); + return format (s, "%02x:%02x:%02x:%02x:%02x:%02x", a[0], a[1], a[2], a[3], + a[4], a[5]); +} + +/* packet trace format function */ +static u8 * +format_sflow_trace (u8 *s, va_list *args) +{ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + sflow_trace_t *t = va_arg (*args, sflow_trace_t *); + + s = format (s, "SFLOW: sw_if_index %d, next index %d\n", t->sw_if_index, + t->next_index); + s = format (s, " src %U -> dst %U", my_format_mac_address, t->new_src_mac, + my_format_mac_address, t->new_dst_mac); + return s; +} + +vlib_node_registration_t sflow_node; + +#endif /* CLIB_MARCH_VARIANT */ + +#ifndef CLIB_MARCH_VARIANT +static char *sflow_error_strings[] = { +#define _(sym, string) string, + foreach_sflow_error +#undef _ +}; +#endif /* CLIB_MARCH_VARIANT */ + +typedef enum +{ + SFLOW_NEXT_ETHERNET_INPUT, + SFLOW_N_NEXT, +} sflow_next_t; + +VLIB_NODE_FN (sflow_node) +(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame) +{ + u32 n_left_from, *from, *to_next; + sflow_next_t next_index; + + sflow_main_t *smp = &sflow_main; + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + + uword thread_index = os_get_thread_index (); + sflow_per_thread_data_t *sfwk = + vec_elt_at_index (smp->per_thread_data, thread_index); + + /* note that sfwk->skip==1 means "take the next packet", + so we never see sfwk->skip==0. 
*/ + u32 pkts = n_left_from; + if (PREDICT_TRUE (sfwk->skip > pkts)) + { + /* skip the whole frame-vector */ + sfwk->skip -= pkts; + sfwk->pool += pkts; + } + else + { + while (pkts >= sfwk->skip) + { + /* reach in to get the one we want. */ + vlib_buffer_t *bN = vlib_get_buffer (vm, from[sfwk->skip - 1]); + + /* Sample this packet header. */ + u32 hdr = bN->current_length; + if (hdr > smp->headerB) + hdr = smp->headerB; + + ethernet_header_t *en = vlib_buffer_get_current (bN); + u32 if_index = vnet_buffer (bN)->sw_if_index[VLIB_RX]; + vnet_hw_interface_t *hw = + vnet_get_sup_hw_interface (smp->vnet_main, if_index); + if (hw) + if_index = hw->hw_if_index; + else + { + // TODO: can we get interfaces that have no hw interface? + // If so, should we ignore the sample? + } + + sflow_sample_t sample = { + .samplingN = sfwk->smpN, + .input_if_index = if_index, + .sampled_packet_size = + bN->current_length + bN->total_length_not_including_first_buffer, + .header_bytes = hdr + }; + + // TODO: what bit in the buffer can we set right here to indicate + // that this packet was sampled (and perhaps another bit to say if it + // was dropped or successfully enqueued)? That way we can check it + // below if the packet is traced, and indicate that in the trace + // output. + + // TODO: we end up copying the header twice here. Consider allowing + // the enqueue to be just a little more complex. Like this: + // if(!sflow_fifo_enqueue(&sfwk->fifo, &sample, en, hdr). + // With headerB==128 that would be memcpy(,,24) plus memcpy(,,128) + // instead of the memcpy(,,128) plus memcpy(,,24+256) that we do + // here. (We also know that it could be done as a multiple of 8 + // (aligned) bytes because the sflow_sample_t fields are (6xu32) and + // the headerB setting is quantized to the nearest 32 bytes, so there + // may be ways to make it even easier for the compiler.) 
+ sfwk->smpl++; + memcpy (sample.header, en, hdr); + if (PREDICT_FALSE (!sflow_fifo_enqueue (&sfwk->fifo, &sample))) + sfwk->drop++; + + pkts -= sfwk->skip; + sfwk->pool += sfwk->skip; + sfwk->skip = sflow_next_random_skip (sfwk); + } + /* We took a sample (or several) from this frame-vector, but now we are + skipping the rest. */ + sfwk->skip -= pkts; + sfwk->pool += pkts; + } + + /* the rest of this is boilerplate code just to make sure + * that packets are passed on the same way as they would + * have been if this node were not enabled. + * TODO: If there is ever a way to do this in one step + * (i.e. pass on the whole frame-vector unchanged) then it + * might help performance. + */ + + next_index = node->cached_next_index; + + while (n_left_from > 0) + { + u32 n_left_to_next; + + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + + while (n_left_from >= 8 && n_left_to_next >= 4) + { + u32 next0 = SFLOW_NEXT_ETHERNET_INPUT; + u32 next1 = SFLOW_NEXT_ETHERNET_INPUT; + u32 next2 = SFLOW_NEXT_ETHERNET_INPUT; + u32 next3 = SFLOW_NEXT_ETHERNET_INPUT; + ethernet_header_t *en0, *en1, *en2, *en3; + u32 bi0, bi1, bi2, bi3; + vlib_buffer_t *b0, *b1, *b2, *b3; + + /* Prefetch next iteration. 
*/ + { + vlib_buffer_t *p4, *p5, *p6, *p7; + + p4 = vlib_get_buffer (vm, from[4]); + p5 = vlib_get_buffer (vm, from[5]); + p6 = vlib_get_buffer (vm, from[6]); + p7 = vlib_get_buffer (vm, from[7]); + + vlib_prefetch_buffer_header (p4, LOAD); + vlib_prefetch_buffer_header (p5, LOAD); + vlib_prefetch_buffer_header (p6, LOAD); + vlib_prefetch_buffer_header (p7, LOAD); + + CLIB_PREFETCH (p4->data, CLIB_CACHE_LINE_BYTES, STORE); + CLIB_PREFETCH (p5->data, CLIB_CACHE_LINE_BYTES, STORE); + CLIB_PREFETCH (p6->data, CLIB_CACHE_LINE_BYTES, STORE); + CLIB_PREFETCH (p7->data, CLIB_CACHE_LINE_BYTES, STORE); + } + + /* speculatively enqueue b0-b3 to the current next frame */ + to_next[0] = bi0 = from[0]; + to_next[1] = bi1 = from[1]; + to_next[2] = bi2 = from[2]; + to_next[3] = bi3 = from[3]; + from += 4; + to_next += 4; + n_left_from -= 4; + n_left_to_next -= 4; + + b0 = vlib_get_buffer (vm, bi0); + b1 = vlib_get_buffer (vm, bi1); + b2 = vlib_get_buffer (vm, bi2); + b3 = vlib_get_buffer (vm, bi3); + + /* do this to always pass on to the next node on feature arc */ + vnet_feature_next (&next0, b0); + vnet_feature_next (&next1, b1); + vnet_feature_next (&next2, b2); + vnet_feature_next (&next3, b3); + + ASSERT (b0->current_data == 0); + ASSERT (b1->current_data == 0); + ASSERT (b2->current_data == 0); + ASSERT (b3->current_data == 0); + + en0 = vlib_buffer_get_current (b0); + en1 = vlib_buffer_get_current (b1); + en2 = vlib_buffer_get_current (b2); + en3 = vlib_buffer_get_current (b3); + + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); + t->sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; + t->next_index = next0; + clib_memcpy (t->new_src_mac, en0->src_address, + sizeof (t->new_src_mac)); + clib_memcpy (t->new_dst_mac, en0->dst_address, + sizeof (t->new_dst_mac)); + } + + if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED)) + { + sflow_trace_t *t = vlib_add_trace (vm, node, b1, sizeof (*t)); + 
t->sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX]; + t->next_index = next1; + clib_memcpy (t->new_src_mac, en1->src_address, + sizeof (t->new_src_mac)); + clib_memcpy (t->new_dst_mac, en1->dst_address, + sizeof (t->new_dst_mac)); + } + + if (PREDICT_FALSE (b2->flags & VLIB_BUFFER_IS_TRACED)) + { + sflow_trace_t *t = vlib_add_trace (vm, node, b2, sizeof (*t)); + t->sw_if_index = vnet_buffer (b2)->sw_if_index[VLIB_RX]; + t->next_index = next2; + clib_memcpy (t->new_src_mac, en2->src_address, + sizeof (t->new_src_mac)); + clib_memcpy (t->new_dst_mac, en2->dst_address, + sizeof (t->new_dst_mac)); + } + + if (PREDICT_FALSE (b3->flags & VLIB_BUFFER_IS_TRACED)) + { + sflow_trace_t *t = vlib_add_trace (vm, node, b3, sizeof (*t)); + t->sw_if_index = vnet_buffer (b3)->sw_if_index[VLIB_RX]; + t->next_index = next3; + clib_memcpy (t->new_src_mac, en3->src_address, + sizeof (t->new_src_mac)); + clib_memcpy (t->new_dst_mac, en3->dst_address, + sizeof (t->new_dst_mac)); + } + + /* verify speculative enqueues, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x4 (vm, node, next_index, to_next, + n_left_to_next, bi0, bi1, bi2, bi3, + next0, next1, next2, next3); + } + + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 bi0; + vlib_buffer_t *b0; + u32 next0 = SFLOW_NEXT_ETHERNET_INPUT; + ethernet_header_t *en0; + + /* speculatively enqueue b0 to the current next frame */ + bi0 = from[0]; + to_next[0] = bi0; + from += 1; + to_next += 1; + n_left_from -= 1; + n_left_to_next -= 1; + + b0 = vlib_get_buffer (vm, bi0); + + /* do this to always pass on to the next node on feature arc */ + vnet_feature_next (&next0, b0); + + /* + * Direct from the driver, we should be at offset 0 + * aka at &b0->data[0] + */ + ASSERT (b0->current_data == 0); + + en0 = vlib_buffer_get_current (b0); + + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + sflow_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); + t->sw_if_index = vnet_buffer 
(b0)->sw_if_index[VLIB_RX]; + t->next_index = next0; + clib_memcpy (t->new_src_mac, en0->src_address, + sizeof (t->new_src_mac)); + clib_memcpy (t->new_dst_mac, en0->dst_address, + sizeof (t->new_dst_mac)); + } + + /* verify speculative enqueue, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, bi0, next0); + } + + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + return frame->n_vectors; +} + +#ifndef CLIB_MARCH_VARIANT +VLIB_REGISTER_NODE (sflow_node) = +{ + .name = "sflow", + .vector_size = sizeof (u32), + .format_trace = format_sflow_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN(sflow_error_strings), + .error_strings = sflow_error_strings, + .n_next_nodes = SFLOW_N_NEXT, + /* edit / add dispositions here */ + .next_nodes = { + [SFLOW_NEXT_ETHERNET_INPUT] = "ethernet-input", + }, +}; +#endif /* CLIB_MARCH_VARIANT */ +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow.api b/src/plugins/sflow/sflow.api new file mode 100644 index 00000000000..e5f33001e6e --- /dev/null +++ b/src/plugins/sflow/sflow.api @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file sflow.api + * @brief VPP control-plane API messages. 
+ * + * This file defines VPP control-plane binary API messages which are generally + * called through a shared memory interface. + */ + +/* Version and type recitations */ + +option version = "0.1.0"; +import "vnet/interface_types.api"; + + +/** @brief API to enable / disable sflow + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param enable_disable - 1 to enable, 0 to disable the feature + @param hw_if_index - hardware interface handle +*/ + +autoreply define sflow_enable_disable { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; + + /* Enable / disable the feature */ + bool enable_disable; + + /* Interface handle */ + vl_api_interface_index_t hw_if_index; +}; + +/** @brief API to get sflow sampling-rate + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request +*/ + +define sflow_sampling_rate_get { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; +}; + +/** \brief API to get the sflow sampling-rate + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param sampling_N - the current 1-in-N sampling rate +*/ + +define sflow_sampling_rate_get_reply +{ + u32 context; + u32 sampling_N; + option in_progress; +}; + +/** @brief API to set sflow sampling-rate + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param sampling_N - 1-in-N random sampling rate +*/ + +autoreply define sflow_sampling_rate_set { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; + + /* 
Sampling_N */ + u32 sampling_N [default=10000]; +}; + +/** @brief API to set sflow polling-interval + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param polling_S - polling interval in seconds +*/ + +autoreply define sflow_polling_interval_set { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; + + /* Polling_S */ + u32 polling_S [default=20]; +}; + +/** @brief API to get sflow polling-interval + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request +*/ + +define sflow_polling_interval_get { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; +}; + +/** \brief API to get the sflow polling-interval + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param polling_S - current polling interval in seconds +*/ + +define sflow_polling_interval_get_reply +{ + u32 context; + u32 polling_S; + option in_progress; +}; + +/** @brief API to set sflow header-bytes + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param header_B - max header length in bytes +*/ + +autoreply define sflow_header_bytes_set { + /* Client identifier, set from api_main.my_client_index */ + u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; + + /* header_B */ + u32 header_B [default=128]; +}; + +/** @brief API to get sflow header-bytes + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request +*/ + +define sflow_header_bytes_get { + /* Client identifier, set from api_main.my_client_index */ + 
u32 client_index; + + /* Arbitrary context, so client can match reply to request */ + u32 context; +}; + +/** \brief API to get the sflow header-bytes + @param client_index - opaque cookie to identify the sender + @param context - sender context, to match reply w/ request + @param header_B - current maximum header length in bytes +*/ + +define sflow_header_bytes_get_reply +{ + u32 context; + u32 header_B; + option in_progress; +}; + +/** \brief Dump sflow enabled interface(s) + @param client_index - opaque cookie to identify the sender + @param hw_if_index - hw_if_index of a specific interface, or -1 (default) + to return all sflow enabled interfaces +*/ +define sflow_interface_dump +{ + u32 client_index; + u32 context; + vl_api_interface_index_t hw_if_index [default=0xffffffff]; +}; + +/** \brief sflow enabled interface details +*/ +define sflow_interface_details +{ + u32 context; + vl_api_interface_index_t hw_if_index; +}; diff --git a/src/plugins/sflow/sflow.c b/src/plugins/sflow/sflow.c new file mode 100644 index 00000000000..5aa65062330 --- /dev/null +++ b/src/plugins/sflow/sflow.c @@ -0,0 +1,1052 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <vnet/vnet.h> +#include <vnet/plugin/plugin.h> +#include <sflow/sflow.h> + +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vpp/app/version.h> +#include <stdbool.h> + +#include <sflow/sflow.api_enum.h> +#include <sflow/sflow.api_types.h> +#include <sflow/sflow_psample.h> + +#include <vpp-api/client/stat_client.h> +#include <vlib/stats/stats.h> + +#define REPLY_MSG_ID_BASE smp->msg_id_base +#include <vlibapi/api_helper_macros.h> + +sflow_main_t sflow_main; +vlib_log_class_t sflow_logger; + +static void +sflow_stat_segment_client_init (void) +{ + stat_client_main_t *scm = &stat_client_main; + vlib_stats_segment_t *sm = vlib_stats_get_segment (); + uword size; + + size = sm->memory_size ? sm->memory_size : STAT_SEGMENT_DEFAULT_SIZE; + scm->memory_size = size; + scm->shared_header = sm->shared_header; + scm->directory_vector = + stat_segment_adjust (scm, (void *) scm->shared_header->directory_vector); +} + +static void +update_counter_vector_simple (stat_segment_data_t *res, + sflow_counters_t *ifCtrs, u32 hw_if_index) +{ + for (int th = 0; th < vec_len (res->simple_counter_vec); th++) + { + for (int intf = 0; intf < vec_len (res->simple_counter_vec[th]); intf++) + { + if (intf == hw_if_index) + { + u64 count = res->simple_counter_vec[th][intf]; + if (count) + { + if (strcmp (res->name, "/if/rx-error") == 0) + ifCtrs->rx.errs += count; + else if (strcmp (res->name, "/if/tx-error") == 0) + ifCtrs->tx.errs += count; + else if (strcmp (res->name, "/if/drops") == 0) + ifCtrs->tx.drps += count; + else if (strcmp (res->name, "/if/rx-miss") == 0 || + strcmp (res->name, "/if/rx-no-buf") == 0) + ifCtrs->rx.drps += count; + } + } + } + } +} + +static void +update_counter_vector_combined (stat_segment_data_t *res, + sflow_counters_t *ifCtrs, u32 hw_if_index) +{ + for (int th = 0; th < vec_len (res->simple_counter_vec); th++) + { + for (int intf = 0; intf < vec_len (res->combined_counter_vec[th]); + intf++) + { + if (intf == hw_if_index) + { + u64 
pkts = res->combined_counter_vec[th][intf].packets; + u64 byts = res->combined_counter_vec[th][intf].bytes; + if (pkts || byts) + { + if (strcmp (res->name, "/if/rx") == 0) + { + ifCtrs->rx.pkts += pkts; + ifCtrs->rx.byts += byts; + } + else if (strcmp (res->name, "/if/tx") == 0) + { + ifCtrs->tx.byts += byts; + ifCtrs->tx.pkts += pkts; + } + // TODO: do multicasts include broadcasts, or are they + // counted separately? (test with traffic) + else if (strcmp (res->name, "/if/rx-multicast") == 0) + ifCtrs->rx.m_pkts += pkts; + else if (strcmp (res->name, "/if/tx-multicast") == 0) + ifCtrs->tx.m_pkts += pkts; + else if (strcmp (res->name, "/if/rx-broadcast") == 0) + ifCtrs->rx.b_pkts += pkts; + else if (strcmp (res->name, "/if/tx-broadcast") == 0) + ifCtrs->tx.b_pkts += pkts; + } + } + } + } +} + +static int +startsWith (u8 *str, char *prefix) +{ + if (str && prefix) + { + int len1 = vec_len (str); + int len2 = strlen (prefix); + if (len1 >= len2) + { + return (memcmp (str, prefix, len2) == 0); + } + } + return false; +} + +static void +update_counters (sflow_main_t *smp, sflow_per_interface_data_t *sfif) +{ + vnet_sw_interface_t *sw = + vnet_get_sw_interface (smp->vnet_main, sfif->sw_if_index); + vnet_hw_interface_t *hw = + vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index); + // This gives us a list of stat integers + u32 *stats = stat_segment_ls (NULL); + stat_segment_data_t *res = NULL; + // read vector of stat_segment_data_t objects +retry: + res = stat_segment_dump (stats); + if (res == NULL) + { + /* Memory layout has changed */ + if (stats) + vec_free (stats); + stats = stat_segment_ls (NULL); + goto retry; + } + sflow_counters_t ifCtrs = {}; + // and accumulate the (per-thread) entries for this interface + for (int ii = 0; ii < vec_len (res); ii++) + { + switch (res[ii].type) + { + case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: + update_counter_vector_simple (&res[ii], &ifCtrs, sfif->hw_if_index); + break; + case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED: + 
update_counter_vector_combined (&res[ii], &ifCtrs, + sfif->hw_if_index); + break; + case STAT_DIR_TYPE_SCALAR_INDEX: + case STAT_DIR_TYPE_NAME_VECTOR: + case STAT_DIR_TYPE_EMPTY: + default: + break; + } + } + stat_segment_data_free (res); + vec_free (stats); + // send the structure via netlink + SFLOWUSSpec spec = {}; + SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_IF_COUNTERS); + SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_PORTNAME, hw->name, + vec_len (hw->name)); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFINDEX, sfif->hw_if_index); + if (sfif->linux_if_index) + { + // We know the corresponding Linux ifIndex for this interface, so include + // that here. + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OSINDEX, + sfif->linux_if_index); + } + + // Report consistent with vpp-snmp-agent + u64 ifSpeed = (hw->link_speed == ~0) ? 0 : (hw->link_speed * 1000); + if (startsWith (hw->name, "loop") || startsWith (hw->name, "tap")) + ifSpeed = 1e9; + + u32 ifType = startsWith (hw->name, "loop") ? 24 // softwareLoopback + : + 6; // ethernetCsmacd + + u32 ifDirection = (hw->flags & VNET_HW_INTERFACE_FLAG_HALF_DUPLEX) ? + 2 // half-duplex + : + 1; // full-duplex + + u32 operUp = (hw->flags & VNET_HW_INTERFACE_FLAG_LINK_UP) ? 1 : 0; + u32 adminUp = (sw->flags & VNET_SW_INTERFACE_FLAG_ADMIN_UP) ? 
1 : 0; + + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFSPEED, ifSpeed); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFTYPE, ifType); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_IFDIRECTION, ifDirection); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_OPER_UP, operUp); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_ADMIN_UP, adminUp); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_OCTETS, ifCtrs.rx.byts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_OCTETS, ifCtrs.tx.byts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_PKTS, ifCtrs.rx.pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_PKTS, ifCtrs.tx.pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_MCASTS, ifCtrs.rx.m_pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_MCASTS, ifCtrs.tx.m_pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_BCASTS, ifCtrs.rx.b_pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_BCASTS, ifCtrs.tx.b_pkts); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_ERRORS, ifCtrs.rx.errs); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_ERRORS, ifCtrs.tx.errs); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_RX_DISCARDS, ifCtrs.rx.drps); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_TX_DISCARDS, ifCtrs.tx.drps); + SFLOWUSSpec_setAttr (&spec, SFLOW_VPP_ATTR_HW_ADDRESS, hw->hw_address, + vec_len (hw->hw_address)); + smp->unixsock_seq++; + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq); + if (SFLOWUSSpec_send (&smp->sflow_usersock, &spec) < 0) + smp->csample_send_drops++; + smp->csample_send++; +} + +static u32 +total_drops (sflow_main_t *smp) +{ + // sum sendmsg and worker-fifo drops + u32 all_drops = smp->psample_send_drops; + for (u32 thread_index = 0; thread_index < smp->total_threads; thread_index++) + { + sflow_per_thread_data_t *sfwk = + vec_elt_at_index (smp->per_thread_data, thread_index); + all_drops += sfwk->drop; + } + return all_drops; +} + +static void +send_sampling_status_info (sflow_main_t 
*smp) +{ + SFLOWUSSpec spec = {}; + u32 all_pipeline_drops = total_drops (smp); + SFLOWUSSpec_setMsgType (&spec, SFLOW_VPP_MSG_STATUS); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_UPTIME_S, smp->now_mono_S); + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_DROPS, all_pipeline_drops); + ++smp->unixsock_seq; + SFLOWUSSpec_setAttrInt (&spec, SFLOW_VPP_ATTR_SEQ, smp->unixsock_seq); + SFLOWUSSpec_send (&smp->sflow_usersock, &spec); +} + +static int +counter_polling_check (sflow_main_t *smp) +{ + // see if we should poll one or more interfaces + int polled = 0; + for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++) + { + sflow_per_interface_data_t *sfif = + vec_elt_at_index (smp->per_interface_data, ii); + if (sfif && sfif->sflow_enabled && + (sfif->polled == 0 // always send the first time + || (smp->now_mono_S % smp->pollingS) == + (sfif->hw_if_index % smp->pollingS))) + { + update_counters (smp, sfif); + sfif->polled++; + polled++; + } + } + return polled; +} + +static u32 +read_worker_fifos (sflow_main_t *smp) +{ + // Our maximum samples/sec is approximately: + // (SFLOW_READ_BATCH * smp->total_threads) / SFLOW_POLL_WAIT_S + // but it may also be affected by SFLOW_FIFO_DEPTH + // and whether vlib_process_wait_for_event_or_clock() really waits for + // SFLOW_POLL_WAIT_S every time. + // If there are too many samples then dropping them as early as possible + // (and as randomly as possible) is preferred, so SFLOW_FIFO_DEPTH should not + // be any bigger than it strictly needs to be. If there is a system + // bottleneck it could be in the PSAMPLE netlink channel, the hsflowd + // encoder, the UDP stack, the network path, the collector, or a faraway + // application. Any kind of "clipping" will result in systematic bias so we + // try to make this fair even when it's running hot. For example, we'll + // round-robin the thread FIFO dequeues here to make sure we give them equal + // access to the PSAMPLE channel. 
Another factor in sizing SFLOW_FIFO_DEPTH + // is to ensure that we can absorb a short-term line-rate burst without + // dropping samples. This implies a deeper FIFO. In fact it looks like this + // requirement ends up being the dominant one. A value of SFLOW_FIFO_DEPTH + // that will absorb an n-second line-rate burst may well result in the max + // sustainable samples/sec being higher than we really need. But it's not a + // serious problem because the samples are packed into UDP datagrams and the + // network or collector can drop those anywhere they need to. The protocol is + // designed to be tolerant to random packet-loss in transit. For example, 1% + // loss should just make it look like the sampling-rate setting was 1:10100 + // instead of 1:10000. + u32 batch = 0; + for (; batch < SFLOW_READ_BATCH; batch++) + { + u32 psample_send = 0, psample_send_fail = 0; + for (u32 thread_index = 0; thread_index < smp->total_threads; + thread_index++) + { + sflow_per_thread_data_t *sfwk = + vec_elt_at_index (smp->per_thread_data, thread_index); + sflow_sample_t sample; + if (sflow_fifo_dequeue (&sfwk->fifo, &sample)) + { + if (sample.header_bytes > smp->headerB) + { + // We get here if header-bytes setting is reduced dynamically + // and a sample that was in the FIFO appears with a larger + // header. + continue; + } + SFLOWPSSpec spec = {}; + u32 ps_group = SFLOW_VPP_PSAMPLE_GROUP_INGRESS; + u32 seqNo = ++smp->psample_seq_ingress; + // TODO: is it always ethernet? 
(affects ifType counter as well) + u16 header_protocol = 1; /* ethernet */ + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP, + ps_group); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_IIFINDEX, + sample.input_if_index); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_OIFINDEX, + sample.output_if_index); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_ORIGSIZE, + sample.sampled_packet_size); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ, + seqNo); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE, + sample.samplingN); + SFLOWPSSpec_setAttr (&spec, SFLOWPS_PSAMPLE_ATTR_DATA, + sample.header, sample.header_bytes); + SFLOWPSSpec_setAttrInt (&spec, SFLOWPS_PSAMPLE_ATTR_PROTO, + header_protocol); + psample_send++; + if (SFLOWPSSpec_send (&smp->sflow_psample, &spec) < 0) + psample_send_fail++; + } + } + if (psample_send == 0) + { + // nothing found on FIFOs this time through, so terminate batch early + break; + } + else + { + vlib_node_increment_counter (smp->vlib_main, sflow_node.index, + SFLOW_ERROR_PSAMPLE_SEND, psample_send); + if (psample_send_fail > 0) + { + vlib_node_increment_counter (smp->vlib_main, sflow_node.index, + SFLOW_ERROR_PSAMPLE_SEND_FAIL, + psample_send_fail); + smp->psample_send_drops += psample_send_fail; + } + } + } + return batch; +} + +static void +read_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *ctrs) +{ + for (u32 ec = 0; ec < SFLOW_N_ERROR; ec++) + ctrs->counters[ec] = 0; + for (u32 thread_index = 0; thread_index < smp->total_threads; thread_index++) + { + sflow_per_thread_data_t *sfwk = + vec_elt_at_index (smp->per_thread_data, thread_index); + ctrs->counters[SFLOW_ERROR_PROCESSED] += sfwk->pool; + ctrs->counters[SFLOW_ERROR_SAMPLED] += sfwk->smpl; + ctrs->counters[SFLOW_ERROR_DROPPED] += sfwk->drop; + } +} + +static void +update_node_cntr (sflow_main_t *smp, sflow_err_ctrs_t *prev, + sflow_err_ctrs_t *latest, sflow_error_t ee) +{ + u32 delta = latest->counters[ee] 
- prev->counters[ee]; + vlib_node_increment_counter (smp->vlib_main, sflow_node.index, ee, delta); +} + +static void +update_node_counters (sflow_main_t *smp, sflow_err_ctrs_t *prev, + sflow_err_ctrs_t *latest) +{ + update_node_cntr (smp, prev, latest, SFLOW_ERROR_PROCESSED); + update_node_cntr (smp, prev, latest, SFLOW_ERROR_SAMPLED); + update_node_cntr (smp, prev, latest, SFLOW_ERROR_DROPPED); + *prev = *latest; // latch for next time +} + +static uword +sflow_process_samples (vlib_main_t *vm, vlib_node_runtime_t *node, + vlib_frame_t *frame) +{ + sflow_main_t *smp = &sflow_main; + clib_time_t ctm; + clib_time_init (&ctm); + + sflow_err_ctrs_t prev = {}; + read_node_counters (smp, &prev); + + while (1) + { + + // We don't have anything for the main loop to edge-trigger on, so + // we are just asking to be called back regularly. More regularly + // if sFlow is actually enabled... + f64 poll_wait_S = smp->running ? SFLOW_POLL_WAIT_S : 1.0; + vlib_process_wait_for_event_or_clock (vm, poll_wait_S); + if (!smp->running) + { + // Nothing to do. Just yield again. + continue; + } + +#ifdef SFLOW_USE_VAPI +#ifdef SFLOW_TEST_HAMMER_VAPI + sflow_vapi_check_for_linux_if_index_results (&smp->vac, + smp->per_interface_data); + sflow_vapi_read_linux_if_index_numbers (&smp->vac, + smp->per_interface_data); +#endif +#endif + + // PSAMPLE channel may need extra step (e.g. to learn family_id) + // before it is ready to send + EnumSFLOWPSState psState = SFLOWPS_state (&smp->sflow_psample); + if (psState != SFLOWPS_STATE_READY) + { + SFLOWPS_open_step (&smp->sflow_psample); + } + + // What we want is a monotonic, per-second clock. This seems to do it + // because it is based on the CPU clock. 
+ f64 tnow = clib_time_now (&ctm); + u32 tnow_S = (u32) tnow; + if (tnow_S != smp->now_mono_S) + { + // second rollover + smp->now_mono_S = tnow_S; +#ifdef SFLOW_USE_VAPI + if (!smp->vac.vapi_unavailable) + { + // look up linux if_index numbers + sflow_vapi_check_for_linux_if_index_results ( + &smp->vac, smp->per_interface_data); + if (smp->vapi_requests == 0 || + (tnow_S % SFLOW_VAPI_POLL_INTERVAL) == 0) + { + if (sflow_vapi_read_linux_if_index_numbers ( + &smp->vac, smp->per_interface_data)) + { + smp->vapi_requests++; + } + } + } +#endif + // send status info + send_sampling_status_info (smp); + // poll counters for interfaces that are due + counter_polling_check (smp); + } + // process samples from workers + read_worker_fifos (smp); + + // and sync the global counters + sflow_err_ctrs_t latest = {}; + read_node_counters (smp, &latest); + update_node_counters (smp, &prev, &latest); + } + return 0; +} + +VLIB_REGISTER_NODE (sflow_process_samples_node, static) = { + .function = sflow_process_samples, + .name = "sflow-process-samples", + .type = VLIB_NODE_TYPE_PROCESS, + .process_log2_n_stack_bytes = 17, +}; + +static void +sflow_set_worker_sampling_state (sflow_main_t *smp) +{ + /* set up (or reset) sampling context for each thread */ + vlib_thread_main_t *tm = &vlib_thread_main; + smp->total_threads = 1 + tm->n_threads; + vec_validate (smp->per_thread_data, smp->total_threads); + for (u32 thread_index = 0; thread_index < smp->total_threads; thread_index++) + { + sflow_per_thread_data_t *sfwk = + vec_elt_at_index (smp->per_thread_data, thread_index); + if (sfwk->smpN != smp->samplingN) + { + sfwk->smpN = smp->samplingN; + sfwk->seed = thread_index; + sfwk->skip = sflow_next_random_skip (sfwk); + SFLOW_DBG ( + "sflowset_worker_sampling_state: samplingN=%u thread=%u skip=%u", + smp->samplingN, thread_index, sfwk->skip); + } + } +} + +static void +sflow_sampling_start (sflow_main_t *smp) +{ + SFLOW_INFO ("sflow_sampling_start"); + + smp->running = 1; + // Reset this 
clock so that the per-second netlink status updates
+  // will communicate a restart to hsflowd. This helps to distinguish:
+  // (1) vpp restarted with sFlow off => no status updates (went quiet)
+  // (2) vpp restarted with default sFlow => status updates (starting again
+  // from 0)
+  smp->now_mono_S = 0;
+
+  // reset sequence numbers to indicate discontinuity
+  smp->psample_seq_ingress = 0;
+  smp->psample_seq_egress = 0;
+  smp->psample_send_drops = 0;
+
+#ifdef SFLOW_USE_VAPI
+  // reset vapi request count so that we make a request the first time
+  smp->vapi_requests = 0;
+#endif
+
+  /* open PSAMPLE netlink channel for writing packet samples */
+  SFLOWPS_open (&smp->sflow_psample);
+  /* open USERSOCK netlink channel for writing counters */
+  SFLOWUS_open (&smp->sflow_usersock);
+  smp->sflow_usersock.group_id = SFLOW_NETLINK_USERSOCK_MULTICAST;
+  /* set up (or reset) sampling context for each thread */
+  sflow_set_worker_sampling_state (smp);
+}
+
+static void
+sflow_sampling_stop (sflow_main_t *smp)
+{
+  SFLOW_INFO ("sflow_sampling_stop");
+  smp->running = 0;
+  SFLOWPS_close (&smp->sflow_psample);
+  SFLOWUS_close (&smp->sflow_usersock);
+}
+
+static void
+sflow_sampling_start_stop (sflow_main_t *smp)
+{
+  int run = (smp->samplingN != 0 && smp->interfacesEnabled != 0);
+  if (run != smp->running)
+    {
+      if (run)
+	sflow_sampling_start (smp);
+      else
+	sflow_sampling_stop (smp);
+    }
+}
+
+int
+sflow_sampling_rate (sflow_main_t *smp, u32 samplingN)
+{
+  // TODO: this might be the right place to enforce the
+  // "2 significant" figures constraint so that per-interface
+  // sampling-rate settings can use HCF+sub-sampling efficiently.
+
+  if (smp->running && smp->samplingN && samplingN)
+    {
+      // dynamic change of sampling rate
+      smp->samplingN = samplingN;
+      sflow_set_worker_sampling_state (smp);
+    }
+  else
+    {
+      // potential on/off change
+      smp->samplingN = samplingN;
+      sflow_sampling_start_stop (smp);
+    }
+  return 0;
+}
+
+int
+sflow_polling_interval (sflow_main_t *smp, u32 pollingS)
+{
+  smp->pollingS = pollingS;
+  return 0;
+}
+
+int
+sflow_header_bytes (sflow_main_t *smp, u32 headerB)
+{
+  u32 hdrB = headerB;
+  // first round up to nearest multiple of SFLOW_HEADER_BYTES_STEP
+  // (which helps to make worker thread memcpy faster)
+  hdrB = ((hdrB + SFLOW_HEADER_BYTES_STEP - 1) / SFLOW_HEADER_BYTES_STEP) *
+	 SFLOW_HEADER_BYTES_STEP;
+  // then check max/min
+  if (hdrB < SFLOW_MIN_HEADER_BYTES)
+    hdrB = SFLOW_MIN_HEADER_BYTES;
+  if (hdrB > SFLOW_MAX_HEADER_BYTES)
+    hdrB = SFLOW_MAX_HEADER_BYTES;
+  if (hdrB != headerB)
+    SFLOW_WARN ("header_bytes rounded from %u to %u\n", headerB, hdrB);
+  smp->headerB = hdrB;
+  return 0;
+}
+
+int
+sflow_enable_disable (sflow_main_t *smp, u32 sw_if_index, int enable_disable)
+{
+  vnet_sw_interface_t *sw;
+
+  /* Utterly wrong? */
+  if (pool_is_free_index (smp->vnet_main->interface_main.sw_interfaces,
+			  sw_if_index))
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  /* Not a physical port? */
+  sw = vnet_get_sw_interface (smp->vnet_main, sw_if_index);
+  if (sw->type != VNET_SW_INTERFACE_TYPE_HARDWARE)
+    return VNET_API_ERROR_INVALID_SW_IF_INDEX;
+
+  // note: vnet_interface_main_t has "fast lookup table" called
+  // hw_if_index_by_sw_if_index.
+ SFLOW_DBG ("sw_if_index=%u, sup_sw_if_index=%u, hw_if_index=%u\n", + sw->sw_if_index, sw->sup_sw_if_index, sw->hw_if_index); + + // note: vnet_hw_interface_t has uword *bond_info + // (where 0=>none, ~0 => slave, other=>ptr to bitmap of slaves) + + vec_validate (smp->per_interface_data, sw->hw_if_index); + sflow_per_interface_data_t *sfif = + vec_elt_at_index (smp->per_interface_data, sw->hw_if_index); + if (enable_disable == sfif->sflow_enabled) + { + // redundant enable or disable + return VNET_API_ERROR_VALUE_EXIST; + } + else + { + // OK, turn it on/off + sfif->sw_if_index = sw_if_index; + sfif->hw_if_index = sw->hw_if_index; + sfif->polled = 0; + sfif->sflow_enabled = enable_disable; + vnet_feature_enable_disable ("device-input", "sflow", sw_if_index, + enable_disable, 0, 0); + smp->interfacesEnabled += (enable_disable) ? 1 : -1; + } + + sflow_sampling_start_stop (smp); + return 0; +} + +static clib_error_t * +sflow_sampling_rate_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + sflow_main_t *smp = &sflow_main; + u32 sampling_N = ~0; + + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "%u", &sampling_N)) + ; + else + break; + } + + if (sampling_N == ~0) + return clib_error_return (0, "Please specify a sampling rate..."); + + rv = sflow_sampling_rate (smp, sampling_N); + + switch (rv) + { + case 0: + break; + default: + return clib_error_return (0, "sflow_enable_disable returned %d", rv); + } + return 0; +} + +static clib_error_t * +sflow_polling_interval_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + sflow_main_t *smp = &sflow_main; + u32 polling_S = ~0; + + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "%u", &polling_S)) + ; + else + break; + } + + if (polling_S == ~0) + return clib_error_return (0, "Please specify a polling interval..."); + + rv = sflow_polling_interval (smp, 
polling_S); + + switch (rv) + { + case 0: + break; + default: + return clib_error_return (0, "sflow_polling_interval returned %d", rv); + } + return 0; +} + +static clib_error_t * +sflow_header_bytes_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + sflow_main_t *smp = &sflow_main; + u32 header_B = ~0; + + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "%u", &header_B)) + ; + else + break; + } + + if (header_B == ~0) + return clib_error_return (0, "Please specify a header bytes limit..."); + + rv = sflow_header_bytes (smp, header_B); + + switch (rv) + { + case 0: + break; + default: + return clib_error_return (0, "sflow_header_bytes returned %d", rv); + } + return 0; +} + +static clib_error_t * +sflow_enable_disable_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + sflow_main_t *smp = &sflow_main; + u32 sw_if_index = ~0; + int enable_disable = 1; + + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "disable")) + enable_disable = 0; + else if (unformat (input, "%U", unformat_vnet_sw_interface, + smp->vnet_main, &sw_if_index)) + ; + else + break; + } + + if (sw_if_index == ~0) + return clib_error_return (0, "Please specify an interface..."); + + rv = sflow_enable_disable (smp, sw_if_index, enable_disable); + + switch (rv) + { + case 0: + break; + + case VNET_API_ERROR_INVALID_SW_IF_INDEX: + return clib_error_return ( + 0, "Invalid interface, only works on physical ports"); + break; + + case VNET_API_ERROR_UNIMPLEMENTED: + return clib_error_return (0, + "Device driver doesn't support redirection"); + break; + + default: + return clib_error_return (0, "sflow_enable_disable returned %d", rv); + } + return 0; +} + +static clib_error_t * +show_sflow_command_fn (vlib_main_t *vm, unformat_input_t *input, + vlib_cli_command_t *cmd) +{ + sflow_main_t *smp = &sflow_main; + clib_error_t *error = NULL; + 
vlib_cli_output (vm, "sflow sampling-rate %u\n", smp->samplingN); + vlib_cli_output (vm, "sflow sampling-direction ingress\n"); + vlib_cli_output (vm, "sflow polling-interval %u\n", smp->pollingS); + vlib_cli_output (vm, "sflow header-bytes %u\n", smp->headerB); + u32 itfs_enabled = 0; + for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++) + { + sflow_per_interface_data_t *sfif = + vec_elt_at_index (smp->per_interface_data, ii); + if (sfif && sfif->sflow_enabled) + { + itfs_enabled++; + vnet_hw_interface_t *hw = + vnet_get_hw_interface (smp->vnet_main, sfif->hw_if_index); + vlib_cli_output (vm, "sflow enable %s\n", (char *) hw->name); + } + } + vlib_cli_output (vm, "Status\n"); + vlib_cli_output (vm, " interfaces enabled: %u\n", itfs_enabled); + vlib_cli_output (vm, " packet samples sent: %u\n", + smp->psample_seq_ingress + smp->psample_seq_egress); + vlib_cli_output (vm, " packet samples dropped: %u\n", total_drops (smp)); + vlib_cli_output (vm, " counter samples sent: %u\n", smp->csample_send); + vlib_cli_output (vm, " counter samples dropped: %u\n", + smp->csample_send_drops); + return error; +} + +VLIB_CLI_COMMAND (sflow_enable_disable_command, static) = { + .path = "sflow enable-disable", + .short_help = "sflow enable-disable <interface-name> [disable]", + .function = sflow_enable_disable_command_fn, +}; + +VLIB_CLI_COMMAND (sflow_sampling_rate_command, static) = { + .path = "sflow sampling-rate", + .short_help = "sflow sampling-rate <N>", + .function = sflow_sampling_rate_command_fn, +}; + +VLIB_CLI_COMMAND (sflow_polling_interval_command, static) = { + .path = "sflow polling-interval", + .short_help = "sflow polling-interval <S>", + .function = sflow_polling_interval_command_fn, +}; + +VLIB_CLI_COMMAND (sflow_header_bytes_command, static) = { + .path = "sflow header-bytes", + .short_help = "sflow header-bytes <B>", + .function = sflow_header_bytes_command_fn, +}; + +VLIB_CLI_COMMAND (show_sflow_command, static) = { + .path = "show sflow", + 
.short_help = "show sflow", + .function = show_sflow_command_fn, +}; + +/* API message handler */ +static void +vl_api_sflow_enable_disable_t_handler (vl_api_sflow_enable_disable_t *mp) +{ + vl_api_sflow_enable_disable_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + int rv; + + rv = sflow_enable_disable (smp, ntohl (mp->hw_if_index), + (int) (mp->enable_disable)); + + REPLY_MACRO (VL_API_SFLOW_ENABLE_DISABLE_REPLY); +} + +static void +vl_api_sflow_sampling_rate_set_t_handler (vl_api_sflow_sampling_rate_set_t *mp) +{ + vl_api_sflow_sampling_rate_set_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + int rv; + + rv = sflow_sampling_rate (smp, ntohl (mp->sampling_N)); + + REPLY_MACRO (VL_API_SFLOW_SAMPLING_RATE_SET_REPLY); +} + +static void +vl_api_sflow_sampling_rate_get_t_handler (vl_api_sflow_sampling_rate_get_t *mp) +{ + vl_api_sflow_sampling_rate_get_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + + REPLY_MACRO_DETAILS2 (VL_API_SFLOW_SAMPLING_RATE_GET_REPLY, + ({ rmp->sampling_N = ntohl (smp->samplingN); })); +} + +static void +vl_api_sflow_polling_interval_set_t_handler ( + vl_api_sflow_polling_interval_set_t *mp) +{ + vl_api_sflow_polling_interval_set_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + int rv; + + rv = sflow_polling_interval (smp, ntohl (mp->polling_S)); + + REPLY_MACRO (VL_API_SFLOW_POLLING_INTERVAL_SET_REPLY); +} + +static void +vl_api_sflow_polling_interval_get_t_handler ( + vl_api_sflow_polling_interval_get_t *mp) +{ + vl_api_sflow_polling_interval_get_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + + REPLY_MACRO_DETAILS2 (VL_API_SFLOW_POLLING_INTERVAL_GET_REPLY, + ({ rmp->polling_S = ntohl (smp->pollingS); })); +} + +static void +vl_api_sflow_header_bytes_set_t_handler (vl_api_sflow_header_bytes_set_t *mp) +{ + vl_api_sflow_header_bytes_set_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + int rv; + + rv = sflow_header_bytes (smp, ntohl (mp->header_B)); + + REPLY_MACRO (VL_API_SFLOW_HEADER_BYTES_SET_REPLY); +} + +static void 
+vl_api_sflow_header_bytes_get_t_handler (vl_api_sflow_header_bytes_get_t *mp) +{ + vl_api_sflow_header_bytes_get_reply_t *rmp; + sflow_main_t *smp = &sflow_main; + + REPLY_MACRO_DETAILS2 (VL_API_SFLOW_HEADER_BYTES_GET_REPLY, + ({ rmp->header_B = ntohl (smp->headerB); })); +} + +static void +send_sflow_interface_details (vpe_api_main_t *am, vl_api_registration_t *reg, + u32 context, const u32 hw_if_index) +{ + vl_api_sflow_interface_details_t *mp; + sflow_main_t *smp = &sflow_main; + + mp = vl_msg_api_alloc_zero (sizeof (*mp)); + mp->_vl_msg_id = ntohs (REPLY_MSG_ID_BASE + VL_API_SFLOW_INTERFACE_DETAILS); + mp->context = context; + + mp->hw_if_index = htonl (hw_if_index); + vl_api_send_msg (reg, (u8 *) mp); +} + +static void +vl_api_sflow_interface_dump_t_handler (vl_api_sflow_interface_dump_t *mp) +{ + vpe_api_main_t *am = &vpe_api_main; + sflow_main_t *smp = &sflow_main; + vl_api_registration_t *reg; + u32 hw_if_index = ~0; + + reg = vl_api_client_index_to_registration (mp->client_index); + if (!reg) + return; + hw_if_index = ntohl (mp->hw_if_index); + + for (int ii = 0; ii < vec_len (smp->per_interface_data); ii++) + { + sflow_per_interface_data_t *sfif = + vec_elt_at_index (smp->per_interface_data, ii); + if (sfif && sfif->sflow_enabled) + { + if (hw_if_index == ~0 || hw_if_index == sfif->hw_if_index) + { + send_sflow_interface_details (am, reg, mp->context, + sfif->hw_if_index); + } + } + } +} + +/* API definitions */ +#include <sflow/sflow.api.c> + +static clib_error_t * +sflow_init (vlib_main_t *vm) +{ + sflow_logger = vlib_log_register_class ("sflow", "all"); + + sflow_main_t *smp = &sflow_main; + clib_error_t *error = 0; + + smp->vlib_main = vm; + smp->vnet_main = vnet_get_main (); + + /* set default sampling-rate and polling-interval so that "enable" is all + * that is necessary */ + smp->samplingN = SFLOW_DEFAULT_SAMPLING_N; + smp->pollingS = SFLOW_DEFAULT_POLLING_S; + smp->headerB = SFLOW_DEFAULT_HEADER_BYTES; + + /* Add our API messages to the global 
name_crc hash table */ + smp->msg_id_base = setup_message_id_table (); + + /* access to counters - TODO: should this only happen on sflow enable? */ + sflow_stat_segment_client_init (); + return error; +} + +VLIB_INIT_FUNCTION (sflow_init); + +VNET_FEATURE_INIT (sflow, static) = { + .arc_name = "device-input", + .node_name = "sflow", + .runs_before = VNET_FEATURES ("ethernet-input"), +}; + +VLIB_PLUGIN_REGISTER () = { + .version = VPP_BUILD_VER, + .description = "sFlow random packet sampling", +}; + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow.h b/src/plugins/sflow/sflow.h new file mode 100644 index 00000000000..609ff723816 --- /dev/null +++ b/src/plugins/sflow/sflow.h @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __included_sflow_h__ +#define __included_sflow_h__ + +#include <vnet/vnet.h> +#include <vnet/ip/ip.h> +#include <vnet/ethernet/ethernet.h> + +#include <vppinfra/hash.h> +#include <vppinfra/error.h> +#include <sflow/sflow_common.h> +#include <sflow/sflow_vapi.h> +#include <sflow/sflow_psample.h> +#include <sflow/sflow_usersock.h> + +#define SFLOW_DEFAULT_SAMPLING_N 10000 +#define SFLOW_DEFAULT_POLLING_S 20 +#define SFLOW_DEFAULT_HEADER_BYTES 128 +#define SFLOW_MAX_HEADER_BYTES 256 +#define SFLOW_MIN_HEADER_BYTES 64 +#define SFLOW_HEADER_BYTES_STEP 32 + +#define SFLOW_FIFO_DEPTH 2048 // must be power of 2 +#define SFLOW_POLL_WAIT_S 0.001 +#define SFLOW_READ_BATCH 100 + +// use PSAMPLE group number to distinguish VPP samples from others +// (so that hsflowd will know to remap the ifIndex numbers if necessary) +#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3 +#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4 + +#define foreach_sflow_error \ + _ (PROCESSED, "sflow packets processed") \ + _ (SAMPLED, "sflow packets sampled") \ + _ (DROPPED, "sflow packets dropped") \ + _ (PSAMPLE_SEND, "sflow PSAMPLE sent") \ + _ (PSAMPLE_SEND_FAIL, "sflow PSAMPLE send failed") + +typedef enum +{ +#define _(sym, str) SFLOW_ERROR_##sym, + foreach_sflow_error +#undef _ + SFLOW_N_ERROR, +} sflow_error_t; + +typedef struct +{ + u32 counters[SFLOW_N_ERROR]; +} sflow_err_ctrs_t; + +/* packet sample */ +typedef struct +{ + u32 samplingN; + u32 input_if_index; + u32 output_if_index; + u32 header_protocol; + u32 sampled_packet_size; + u32 header_bytes; + u8 header[SFLOW_MAX_HEADER_BYTES]; +} sflow_sample_t; + +// Define SPSC FIFO for sending samples worker-to-main. +// (I did try to use VPP svm FIFO, but couldn't +// understand why it was sometimes going wrong). 
+typedef struct +{ + volatile u32 tx; // can change under consumer's feet + volatile u32 rx; // can change under producer's feet + sflow_sample_t samples[SFLOW_FIFO_DEPTH]; +} sflow_fifo_t; + +#define SFLOW_FIFO_NEXT(slot) ((slot + 1) & (SFLOW_FIFO_DEPTH - 1)) +static inline int +sflow_fifo_enqueue (sflow_fifo_t *fifo, sflow_sample_t *sample) +{ + u32 curr_rx = clib_atomic_load_acq_n (&fifo->rx); + u32 curr_tx = fifo->tx; // clib_atomic_load_acq_n(&fifo->tx); + u32 next_tx = SFLOW_FIFO_NEXT (curr_tx); + if (next_tx == curr_rx) + return false; // full + memcpy (&fifo->samples[next_tx], sample, sizeof (*sample)); + clib_atomic_store_rel_n (&fifo->tx, next_tx); + return true; +} + +static inline int +sflow_fifo_dequeue (sflow_fifo_t *fifo, sflow_sample_t *sample) +{ + u32 curr_rx = fifo->rx; // clib_atomic_load_acq_n(&fifo->rx); + u32 curr_tx = clib_atomic_load_acq_n (&fifo->tx); + if (curr_rx == curr_tx) + return false; // empty + memcpy (sample, &fifo->samples[curr_rx], sizeof (*sample)); + u32 next_rx = SFLOW_FIFO_NEXT (curr_rx); + clib_atomic_store_rel_n (&fifo->rx, next_rx); + return true; +} + +/* private to worker */ +typedef struct +{ + u32 smpN; + u32 skip; + u32 pool; + u32 seed; + u32 smpl; + u32 drop; + CLIB_CACHE_LINE_ALIGN_MARK (_fifo); + sflow_fifo_t fifo; +} sflow_per_thread_data_t; + +typedef struct +{ + /* API message ID base */ + u16 msg_id_base; + + /* convenience */ + vlib_main_t *vlib_main; + vnet_main_t *vnet_main; + ethernet_main_t *ethernet_main; + + /* sampling state */ + u32 samplingN; + u32 pollingS; + u32 headerB; + u32 total_threads; + sflow_per_interface_data_t *per_interface_data; + sflow_per_thread_data_t *per_thread_data; + + /* psample channel (packet samples) */ + SFLOWPS sflow_psample; + /* usersock channel (periodic counters) */ + SFLOWUS sflow_usersock; +#define SFLOW_NETLINK_USERSOCK_MULTICAST 29 + /* dropmon channel (packet drops) */ + // SFLOWDM sflow_dropmon; + + /* sample-processing */ + u32 now_mono_S; + + /* running 
control */ + int running; + u32 interfacesEnabled; + + /* main-thread counters */ + u32 psample_seq_ingress; + u32 psample_seq_egress; + u32 psample_send_drops; + u32 csample_send; + u32 csample_send_drops; + u32 unixsock_seq; +#ifdef SFLOW_USE_VAPI + /* vapi query helper thread (transient) */ + CLIB_CACHE_LINE_ALIGN_MARK (_vapi); + sflow_vapi_client_t vac; + int vapi_requests; +#endif +} sflow_main_t; + +extern sflow_main_t sflow_main; + +extern vlib_node_registration_t sflow_node; + +static inline u32 +sflow_next_random_skip (sflow_per_thread_data_t *sfwk) +{ + /* skip==1 means "take the next packet" so this + fn must never return 0 */ + if (sfwk->smpN <= 1) + return 1; + u32 lim = (2 * sfwk->smpN) - 1; + return (random_u32 (&sfwk->seed) % lim) + 1; +} + +#endif /* __included_sflow_h__ */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow.rst b/src/plugins/sflow/sflow.rst new file mode 100644 index 00000000000..f9c18488363 --- /dev/null +++ b/src/plugins/sflow/sflow.rst @@ -0,0 +1,61 @@ +.. _Sflow_agent: + +.. toctree:: + +SFlow Monitoring Agent +====================== + +Overview +________ + +This plugin implements the random packet-sampling and interface +telemetry streaming required to support standard sFlow export +on Linux platforms. The overhead incurred by this monitoring is +minimal, so that detailed, real-time traffic analysis can be +achieved even under high load conditions, with visibility into +any fields that appear in the packet headers. If the VPP linux-cp +plugin is running then interfaces will be mapped to their +equivalent Linux tap ports. + +Example Configuration +_____________________ + +:: + sflow sampling-rate 10000 + sflow polling-interval 20 + sflow header-bytes 128 + sflow enable GigabitEthernet0/8/0 + sflow enable GigabitEthernet0/9/0 + sflow enable GigabitEthernet0/a/0 + ... 
+  sflow enable GigabitEthernet0/a/0 disable
+
+Detailed notes
+______________
+
+Each VPP worker that has at least one interface will create a FIFO
+and enqueue samples to it from the interfaces it is servicing that
+are enabled. There is a process running in the main thread that will
+dequeue the FIFOs periodically. If the FIFO is full, the worker will
+drop samples, which helps ensure that (a) the main thread is not
+overloaded with samples and (b) that individual workers and interfaces,
+even when under high load, can't crowd out other interfaces and workers.
+
+You can change the sampling-rate at runtime, but keep in mind that
+it is a global variable that applies to workers, not interfaces.
+This means that (1) all workers will sample at the same rate, and (2)
+if there are multiple interfaces assigned to a worker, they'll share
+the sampling rate which will undershoot, and similarly (3) if there
+are multiple RX queues assigned to more than one worker, the effective
+sampling rate will overshoot.
+
+External Dependencies
+_____________________
+
+This plugin writes packet samples to the standard Linux netlink PSAMPLE
+channel, so the kernel psample module must be loaded with modprobe or
+insmod. As such, this plugin only works for Linux environments.
+
+It also shares periodic interface counter samples via netlink USERSOCK.
+The host-sflow daemon, hsflowd, at https://sflow.net is one example of
+a tool that will consume this feed and emit standard sFlow v5.
diff --git a/src/plugins/sflow/sflow_common.h b/src/plugins/sflow/sflow_common.h
new file mode 100644
index 00000000000..29784638bb9
--- /dev/null
+++ b/src/plugins/sflow/sflow_common.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2024 InMon Corp.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __included_sflow_common_h__ +#define __included_sflow_common_h__ + +// #define SFLOW_USE_VAPI (set by CMakeLists.txt) + +extern vlib_log_class_t sflow_logger; +#define SFLOW_DBG(...) vlib_log_debug (sflow_logger, __VA_ARGS__); +#define SFLOW_INFO(...) vlib_log_info (sflow_logger, __VA_ARGS__); +#define SFLOW_NOTICE(...) vlib_log_notice (sflow_logger, __VA_ARGS__); +#define SFLOW_WARN(...) vlib_log_warn (sflow_logger, __VA_ARGS__); +#define SFLOW_ERR(...) vlib_log_err (sflow_logger, __VA_ARGS__); + +typedef struct +{ + u32 sw_if_index; + u32 hw_if_index; + u32 linux_if_index; + u32 polled; + int sflow_enabled; +} sflow_per_interface_data_t; + +#endif /* __included_sflow_common_h__ */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow_psample.c b/src/plugins/sflow/sflow_psample.c new file mode 100644 index 00000000000..0e4fcfbe790 --- /dev/null +++ b/src/plugins/sflow/sflow_psample.c @@ -0,0 +1,523 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if defined(__cplusplus) +extern "C" +{ +#endif + +#include <vlib/vlib.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <sflow/sflow.h> + +#include <fcntl.h> +#include <asm/types.h> +#include <sys/socket.h> +#include <linux/types.h> +#include <linux/netlink.h> +#include <linux/genetlink.h> +#include <linux/psample.h> +#include <signal.h> +#include <ctype.h> + +#include <sflow/sflow_psample.h> + + /*_________________---------------------------__________________ + _________________ fcntl utils __________________ + -----------------___________________________------------------ + */ + + static void + setNonBlocking (int fd) + { + // set the socket to non-blocking + int fdFlags = fcntl (fd, F_GETFL); + fdFlags |= O_NONBLOCK; + if (fcntl (fd, F_SETFL, fdFlags) < 0) + { + SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno)); + } + } + + static void + setCloseOnExec (int fd) + { + // make sure it doesn't get inherited, e.g. 
when we fork a script + int fdFlags = fcntl (fd, F_GETFD); + fdFlags |= FD_CLOEXEC; + if (fcntl (fd, F_SETFD, fdFlags) < 0) + { + SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno)); + } + } + + static int + setSendBuffer (int fd, int requested) + { + int txbuf = 0; + socklen_t txbufsiz = sizeof (txbuf); + if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0) + { + SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno)); + } + if (txbuf < requested) + { + txbuf = requested; + if (setsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, sizeof (txbuf)) < 0) + { + SFLOW_WARN ("setsockopt(SO_TXBUF=%d) failed: %s", requested, + strerror (errno)); + } + // see what we actually got + txbufsiz = sizeof (txbuf); + if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txbuf, &txbufsiz) < 0) + { + SFLOW_ERR ("getsockopt(SO_SNDBUF) failed: %s", strerror (errno)); + } + } + return txbuf; + } + + /*_________________---------------------------__________________ + _________________ generic_pid __________________ + -----------------___________________________------------------ + choose a 32-bit id that is likely to be unique even if more + than one module in this process wants to bind a netlink socket + */ + + static u32 + generic_pid (u32 mod_id) + { + return (mod_id << 16) | getpid (); + } + + /*_________________---------------------------__________________ + _________________ generic_open __________________ + -----------------___________________________------------------ + */ + + static int + generic_open (u32 mod_id) + { + int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_GENERIC); + if (nl_sock < 0) + { + SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno)); + return -1; + } + // bind to a suitable id + struct sockaddr_nl sa = { .nl_family = AF_NETLINK, + .nl_pid = generic_pid (mod_id) }; + if (bind (nl_sock, (struct sockaddr *) &sa, sizeof (sa)) < 0) + SFLOW_ERR ("generic_open: bind failed: %s\n", strerror (errno)); + setNonBlocking (nl_sock); + 
setCloseOnExec (nl_sock); + return nl_sock; + } + + /*_________________---------------------------__________________ + _________________ generic_send __________________ + -----------------___________________________------------------ + */ + + static int + generic_send (int sockfd, u32 mod_id, int type, int cmd, int req_type, + void *req, int req_len, int req_footprint, u32 seqNo) + { + struct nlmsghdr nlh = {}; + struct genlmsghdr ge = {}; + struct nlattr attr = {}; + + attr.nla_len = sizeof (attr) + req_len; + attr.nla_type = req_type; + + ge.cmd = cmd; + ge.version = 1; + + nlh.nlmsg_len = NLMSG_LENGTH (req_footprint + sizeof (attr) + sizeof (ge)); + nlh.nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK; + nlh.nlmsg_type = type; + nlh.nlmsg_seq = seqNo; + nlh.nlmsg_pid = generic_pid (mod_id); + + struct iovec iov[4] = { { .iov_base = &nlh, .iov_len = sizeof (nlh) }, + { .iov_base = &ge, .iov_len = sizeof (ge) }, + { .iov_base = &attr, .iov_len = sizeof (attr) }, + { .iov_base = req, .iov_len = req_footprint } }; + + struct sockaddr_nl sa = { .nl_family = AF_NETLINK }; + struct msghdr msg = { .msg_name = &sa, + .msg_namelen = sizeof (sa), + .msg_iov = iov, + .msg_iovlen = 4 }; + return sendmsg (sockfd, &msg, 0); + } + + /*_________________---------------------------__________________ + _________________ getFamily_PSAMPLE __________________ + -----------------___________________________------------------ + */ + + static void + getFamily_PSAMPLE (SFLOWPS *pst) + { +#define SFLOWPS_FAM_LEN sizeof (PSAMPLE_GENL_NAME) +#define SFLOWPS_FAM_FOOTPRINT NLMSG_ALIGN (SFLOWPS_FAM_LEN) + char fam_name[SFLOWPS_FAM_FOOTPRINT] = {}; + memcpy (fam_name, PSAMPLE_GENL_NAME, SFLOWPS_FAM_LEN); + generic_send (pst->nl_sock, pst->id, GENL_ID_CTRL, CTRL_CMD_GETFAMILY, + CTRL_ATTR_FAMILY_NAME, fam_name, SFLOWPS_FAM_LEN, + SFLOWPS_FAM_FOOTPRINT, ++pst->nl_seq); + pst->state = SFLOWPS_STATE_WAIT_FAMILY; + } + + /*_________________---------------------------__________________ + _________________ 
processNetlink_GENERIC __________________ + -----------------___________________________------------------ + */ + + static void + processNetlink_GENERIC (SFLOWPS *pst, struct nlmsghdr *nlh) + { + char *msg = (char *) NLMSG_DATA (nlh); + int msglen = nlh->nlmsg_len - NLMSG_HDRLEN; + struct genlmsghdr *genl = (struct genlmsghdr *) msg; + SFLOW_DBG ("generic netlink CMD = %u\n", genl->cmd); + + for (int offset = GENL_HDRLEN; offset < msglen;) + { + struct nlattr *attr = (struct nlattr *) (msg + offset); + if (attr->nla_len == 0 || (attr->nla_len + offset) > msglen) + { + SFLOW_ERR ("processNetlink_GENERIC attr parse error\n"); + break; // attr parse error + } + char *attr_datap = (char *) attr + NLA_HDRLEN; + switch (attr->nla_type) + { + case CTRL_ATTR_VERSION: + pst->genetlink_version = *(u32 *) attr_datap; + break; + case CTRL_ATTR_FAMILY_ID: + pst->family_id = *(u16 *) attr_datap; + SFLOW_DBG ("generic family id: %u\n", pst->family_id); + break; + case CTRL_ATTR_FAMILY_NAME: + SFLOW_DBG ("generic family name: %s\n", attr_datap); + break; + case CTRL_ATTR_MCAST_GROUPS: + for (int grp_offset = NLA_HDRLEN; grp_offset < attr->nla_len;) + { + struct nlattr *grp_attr = + (struct nlattr *) (msg + offset + grp_offset); + if (grp_attr->nla_len == 0 || + (grp_attr->nla_len + grp_offset) > attr->nla_len) + { + SFLOW_ERR ( + "processNetlink_GENERIC grp_attr parse error\n"); + break; + } + char *grp_name = NULL; + u32 grp_id = 0; + for (int gf_offset = NLA_HDRLEN; + gf_offset < grp_attr->nla_len;) + { + struct nlattr *gf_attr = + (struct nlattr *) (msg + offset + grp_offset + + gf_offset); + if (gf_attr->nla_len == 0 || + (gf_attr->nla_len + gf_offset) > grp_attr->nla_len) + { + SFLOW_ERR ( + "processNetlink_GENERIC gf_attr parse error\n"); + break; + } + char *grp_attr_datap = (char *) gf_attr + NLA_HDRLEN; + switch (gf_attr->nla_type) + { + case CTRL_ATTR_MCAST_GRP_NAME: + grp_name = grp_attr_datap; + SFLOW_DBG ("psample multicast group: %s\n", grp_name); + break; + case 
CTRL_ATTR_MCAST_GRP_ID: + grp_id = *(u32 *) grp_attr_datap; + SFLOW_DBG ("psample multicast group id: %u\n", grp_id); + break; + } + gf_offset += NLMSG_ALIGN (gf_attr->nla_len); + } + if (pst->group_id == 0 && grp_name && grp_id && + !strcmp (grp_name, PSAMPLE_NL_MCGRP_SAMPLE_NAME)) + { + SFLOW_DBG ("psample found group %s=%u\n", grp_name, + grp_id); + pst->group_id = grp_id; + // We don't need to join the group if we are only sending + // to it. + } + + grp_offset += NLMSG_ALIGN (grp_attr->nla_len); + } + break; + default: + SFLOW_DBG ("psample attr type: %u (nested=%u) len: %u\n", + attr->nla_type, attr->nla_type & NLA_F_NESTED, + attr->nla_len); + break; + } + offset += NLMSG_ALIGN (attr->nla_len); + } + if (pst->family_id && pst->group_id) + { + SFLOW_DBG ("psample state->READY\n"); + pst->state = SFLOWPS_STATE_READY; + } + } + + // TODO: we can take out the fns for reading PSAMPLE here + + /*_________________---------------------------__________________ + _________________ processNetlink __________________ + -----------------___________________________------------------ + */ + + static void + processNetlink (SFLOWPS *pst, struct nlmsghdr *nlh) + { + if (nlh->nlmsg_type == NETLINK_GENERIC) + { + processNetlink_GENERIC (pst, nlh); + } + else if (nlh->nlmsg_type == pst->family_id) + { + // We are write-only, don't need to read these. 
+ } + } + + /*_________________---------------------------__________________ + _________________ readNetlink_PSAMPLE __________________ + -----------------___________________________------------------ + */ + + static void + readNetlink_PSAMPLE (SFLOWPS *pst, int fd) + { + uint8_t recv_buf[SFLOWPS_PSAMPLE_READNL_RCV_BUF]; + int numbytes = recv (fd, recv_buf, sizeof (recv_buf), 0); + if (numbytes <= 0) + { + SFLOW_ERR ("readNetlink_PSAMPLE returned %d : %s\n", numbytes, + strerror (errno)); + return; + } + struct nlmsghdr *nlh = (struct nlmsghdr *) recv_buf; + while (NLMSG_OK (nlh, numbytes)) + { + if (nlh->nlmsg_type == NLMSG_DONE) + break; + if (nlh->nlmsg_type == NLMSG_ERROR) + { + struct nlmsgerr *err_msg = (struct nlmsgerr *) NLMSG_DATA (nlh); + if (err_msg->error == 0) + { + SFLOW_DBG ("received Netlink ACK\n"); + } + else + { + SFLOW_ERR ("error in netlink message: %d : %s\n", + err_msg->error, strerror (-err_msg->error)); + } + return; + } + processNetlink (pst, nlh); + nlh = NLMSG_NEXT (nlh, numbytes); + } + } + + /*_________________---------------------------__________________ + _________________ SFLOWPS_open __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWPS_open (SFLOWPS *pst) + { + if (pst->nl_sock == 0) + { + pst->nl_sock = generic_open (pst->id); + if (pst->nl_sock > 0) + { + pst->state = SFLOWPS_STATE_OPEN; + setSendBuffer (pst->nl_sock, SFLOWPS_PSAMPLE_READNL_SND_BUF); + getFamily_PSAMPLE (pst); + } + } + return (pst->nl_sock > 0); + } + + /*_________________---------------------------__________________ + _________________ SFLOWPS_close __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWPS_close (SFLOWPS *pst) + { + if (pst->nl_sock > 0) + { + int err = close (pst->nl_sock); + if (err == 0) + { + pst->nl_sock = 0; + return true; + } + else + { + SFLOW_ERR ("SFLOWPS_close: returned %d : %s\n", err, + strerror (errno)); + } + } + return false; 
+ } + + /*_________________---------------------------__________________ + _________________ SFLOWPS_state __________________ + -----------------___________________________------------------ + */ + + EnumSFLOWPSState + SFLOWPS_state (SFLOWPS *pst) + { + return pst->state; + } + + /*_________________---------------------------__________________ + _________________ SFLOWPS_open_step __________________ + -----------------___________________________------------------ + */ + + EnumSFLOWPSState + SFLOWPS_open_step (SFLOWPS *pst) + { + switch (pst->state) + { + case SFLOWPS_STATE_INIT: + SFLOWPS_open (pst); + break; + case SFLOWPS_STATE_OPEN: + getFamily_PSAMPLE (pst); + break; + case SFLOWPS_STATE_WAIT_FAMILY: + readNetlink_PSAMPLE (pst, pst->nl_sock); + break; + case SFLOWPS_STATE_READY: + break; + } + return pst->state; + } + + /*_________________---------------------------__________________ + _________________ SFLOWPSSpec_setAttr __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field, + void *val, int len) + { + SFLOWPSAttr *psa = &spec->attr[field]; + if (psa->included) + return false; + psa->included = true; + int expected_len = SFLOWPS_Fields[field].len; + if (expected_len && expected_len != len) + { + SFLOW_ERR ("SFLOWPSSpec_setAttr(%s) length=%u != expected: %u\n", + SFLOWPS_Fields[field].descr, len, expected_len); + return false; + } + psa->attr.nla_type = field; + psa->attr.nla_len = sizeof (psa->attr) + len; + int len_w_pad = NLMSG_ALIGN (len); + psa->val.iov_len = len_w_pad; + psa->val.iov_base = val; + spec->n_attrs++; + spec->attrs_len += sizeof (psa->attr); + spec->attrs_len += len_w_pad; + return true; + } + + /*_________________---------------------------__________________ + _________________ SFLOWPSSpec_send __________________ + -----------------___________________________------------------ + */ + + int + SFLOWPSSpec_send (SFLOWPS *pst, 
SFLOWPSSpec *spec) + { + spec->nlh.nlmsg_len = NLMSG_LENGTH (sizeof (spec->ge) + spec->attrs_len); + spec->nlh.nlmsg_flags = 0; + spec->nlh.nlmsg_type = pst->family_id; + spec->nlh.nlmsg_seq = ++pst->nl_seq; + spec->nlh.nlmsg_pid = generic_pid (pst->id); + + spec->ge.cmd = PSAMPLE_CMD_SAMPLE; + spec->ge.version = PSAMPLE_GENL_VERSION; + +#define MAX_IOV_FRAGMENTS (2 * __SFLOWPS_PSAMPLE_ATTR_MAX) + 2 + + struct iovec iov[MAX_IOV_FRAGMENTS]; + u32 frag = 0; + iov[frag].iov_base = &spec->nlh; + iov[frag].iov_len = sizeof (spec->nlh); + frag++; + iov[frag].iov_base = &spec->ge; + iov[frag].iov_len = sizeof (spec->ge); + frag++; + int nn = 0; + for (u32 ii = 0; ii < __SFLOWPS_PSAMPLE_ATTR_MAX; ii++) + { + SFLOWPSAttr *psa = &spec->attr[ii]; + if (psa->included) + { + nn++; + iov[frag].iov_base = &psa->attr; + iov[frag].iov_len = sizeof (psa->attr); + frag++; + iov[frag] = psa->val; // struct copy + frag++; + } + } + ASSERT (nn == spec->n_attrs); + + struct sockaddr_nl da = { .nl_family = AF_NETLINK, + .nl_groups = (1 << (pst->group_id - 1)) }; + + struct msghdr msg = { .msg_name = &da, + .msg_namelen = sizeof (da), + .msg_iov = iov, + .msg_iovlen = frag }; + + int status = sendmsg (pst->nl_sock, &msg, 0); + if (status <= 0) + { + SFLOW_ERR ("strerror(errno) = %s; errno = %d\n", strerror (errno), + errno); + return -1; + } + return 0; + } diff --git a/src/plugins/sflow/sflow_psample.h b/src/plugins/sflow/sflow_psample.h new file mode 100644 index 00000000000..5d4944231fd --- /dev/null +++ b/src/plugins/sflow/sflow_psample.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __included_sflow_psample_h__ +#define __included_sflow_psample_h__ + +#include <vlib/vlib.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <sflow/sflow.h> + +#include <asm/types.h> +#include <sys/socket.h> +#include <linux/types.h> +#include <linux/netlink.h> +#include <linux/genetlink.h> +#include <linux/psample.h> +#include <signal.h> +#include <ctype.h> + +// #define SFLOWPS_DEBUG + +#define SFLOWPS_PSAMPLE_READNL_RCV_BUF 8192 +#define SFLOWPS_PSAMPLE_READNL_SND_BUF 1000000 + +/* Shadow the attributes in linux/psample.h so + * we can easily compile/test fields that are not + * defined on the kernel we are compiling on. 
+ */ +typedef enum +{ +#define SFLOWPS_FIELDDATA(field, len, descr) field, +#include "sflow/sflow_psample_fields.h" +#undef SFLOWPS_FIELDDATA + __SFLOWPS_PSAMPLE_ATTR_MAX +} EnumSFLOWPSAttributes; + +typedef struct _SFLOWPS_field_t +{ + EnumSFLOWPSAttributes field; + int len; + char *descr; +} SFLOWPS_field_t; + +static const SFLOWPS_field_t SFLOWPS_Fields[] = { +#define SFLOWPS_FIELDDATA(field, len, descr) { field, len, descr }, +#include "sflow/sflow_psample_fields.h" +#undef SFLOWPS_FIELDDATA +}; + +typedef enum +{ + SFLOWPS_STATE_INIT, + SFLOWPS_STATE_OPEN, + SFLOWPS_STATE_WAIT_FAMILY, + SFLOWPS_STATE_READY +} EnumSFLOWPSState; + +typedef struct _SFLOWPS +{ + EnumSFLOWPSState state; + u32 id; + int nl_sock; + u32 nl_seq; + u32 genetlink_version; + u16 family_id; + u32 group_id; +} SFLOWPS; + +typedef struct _SFLOWPSAttr +{ + bool included : 1; + struct nlattr attr; + struct iovec val; +} SFLOWPSAttr; + +typedef struct _SFLOWPSSpec +{ + struct nlmsghdr nlh; + struct genlmsghdr ge; + SFLOWPSAttr attr[__SFLOWPS_PSAMPLE_ATTR_MAX]; + int n_attrs; + int attrs_len; +} SFLOWPSSpec; + +bool SFLOWPS_open (SFLOWPS *pst); +bool SFLOWPS_close (SFLOWPS *pst); +EnumSFLOWPSState SFLOWPS_state (SFLOWPS *pst); +EnumSFLOWPSState SFLOWPS_open_step (SFLOWPS *pst); + +bool SFLOWPSSpec_setAttr (SFLOWPSSpec *spec, EnumSFLOWPSAttributes field, + void *buf, int len); +#define SFLOWPSSpec_setAttrInt(spec, field, val) \ + SFLOWPSSpec_setAttr ((spec), (field), &(val), sizeof (val)) + +int SFLOWPSSpec_send (SFLOWPS *pst, SFLOWPSSpec *spec); + +#endif /* __included_sflow_psample_h__ */ diff --git a/src/plugins/sflow/sflow_psample_fields.h b/src/plugins/sflow/sflow_psample_fields.h new file mode 100644 index 00000000000..72d484c4850 --- /dev/null +++ b/src/plugins/sflow/sflow_psample_fields.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_IIFINDEX, 4, "input if_index") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OIFINDEX, 4, "output if_index") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_ORIGSIZE, 4, "original packet size") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_GROUP, 4, "group number") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_SEQ, 4, "group sequence number") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_SAMPLE_RATE, 4, "sampling N") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_DATA, 0, "sampled header") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TUNNEL, 0, "tunnel header") + +/* commands attributes */ +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_GROUP_REFCOUNT, 0, + "group reference count") + +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PAD, 0, "pad bytes") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC, 2, "egress queue number") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_OUT_TC_OCC, 8, + "egress queue depth in bytes") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_LATENCY, 8, + "transit latency in nanoseconds") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_TIMESTAMP, 8, "timestamp") +SFLOWPS_FIELDDATA (SFLOWPS_PSAMPLE_ATTR_PROTO, 2, "header protocol") diff --git a/src/plugins/sflow/sflow_test.c b/src/plugins/sflow/sflow_test.c new file mode 100644 index 00000000000..554806640e3 --- /dev/null +++ b/src/plugins/sflow/sflow_test.c @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vat/vat.h> +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vppinfra/error.h> +#include <stdbool.h> + +#define __plugin_msg_base sflow_test_main.msg_id_base +#include <vlibapi/vat_helper_macros.h> + +uword unformat_sw_if_index (unformat_input_t *input, va_list *args); + +/* Declare message IDs */ +#include <sflow/sflow.api_enum.h> +#include <sflow/sflow.api_types.h> + +typedef struct +{ + /* API message ID base */ + u16 msg_id_base; + vat_main_t *vat_main; +} sflow_test_main_t; + +sflow_test_main_t sflow_test_main; + +static int +api_sflow_enable_disable (vat_main_t *vam) +{ + unformat_input_t *i = vam->input; + int enable_disable = 1; + u32 hw_if_index = ~0; + vl_api_sflow_enable_disable_t *mp; + int ret; + + /* Parse args required to build the message */ + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "%U", unformat_sw_if_index, vam, &hw_if_index)) + ; + else if (unformat (i, "disable")) + enable_disable = 0; + else + break; + } + + if (hw_if_index == ~0) + { + errmsg ("missing interface name / explicit hw_if_index number \n"); + return -99; + } + + /* Construct the API message */ + M (SFLOW_ENABLE_DISABLE, mp); + mp->hw_if_index = ntohl (hw_if_index); + mp->enable_disable = enable_disable; + + /* send it... */ + S (mp); + + /* Wait for a reply... 
*/ + W (ret); + return ret; +} + +static void +vl_api_sflow_sampling_rate_get_reply_t_handler ( + vl_api_sflow_sampling_rate_get_reply_t *mp) +{ + vat_main_t *vam = sflow_test_main.vat_main; + clib_warning ("sflow sampling_N: %d", ntohl (mp->sampling_N)); + vam->result_ready = 1; +} + +static int +api_sflow_sampling_rate_get (vat_main_t *vam) +{ + vl_api_sflow_sampling_rate_get_t *mp; + int ret; + + /* Construct the API message */ + M (SFLOW_SAMPLING_RATE_GET, mp); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + return ret; +} + +static int +api_sflow_sampling_rate_set (vat_main_t *vam) +{ + unformat_input_t *i = vam->input; + u32 sampling_N = ~0; + vl_api_sflow_sampling_rate_set_t *mp; + int ret; + + /* Parse args required to build the message */ + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "sampling_N %d", &sampling_N)) + ; + else + break; + } + + if (sampling_N == ~0) + { + errmsg ("missing sampling_N number \n"); + return -99; + } + + /* Construct the API message */ + M (SFLOW_SAMPLING_RATE_SET, mp); + mp->sampling_N = ntohl (sampling_N); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + return ret; +} + +static void +vl_api_sflow_polling_interval_get_reply_t_handler ( + vl_api_sflow_polling_interval_get_reply_t *mp) +{ + vat_main_t *vam = sflow_test_main.vat_main; + clib_warning ("sflow polling-interval: %d", ntohl (mp->polling_S)); + vam->result_ready = 1; +} + +static int +api_sflow_polling_interval_get (vat_main_t *vam) +{ + vl_api_sflow_polling_interval_get_t *mp; + int ret; + + /* Construct the API message */ + M (SFLOW_POLLING_INTERVAL_GET, mp); + + /* send it... */ + S (mp); + + /* Wait for a reply... 
*/ + W (ret); + return ret; +} + +static int +api_sflow_polling_interval_set (vat_main_t *vam) +{ + unformat_input_t *i = vam->input; + u32 polling_S = ~0; + vl_api_sflow_polling_interval_set_t *mp; + int ret; + + /* Parse args required to build the message */ + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "polling_S %d", &polling_S)) + ; + else + break; + } + + if (polling_S == ~0) + { + errmsg ("missing polling_S number \n"); + return -99; + } + + /* Construct the API message */ + M (SFLOW_POLLING_INTERVAL_SET, mp); + mp->polling_S = ntohl (polling_S); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + return ret; +} + +static void +vl_api_sflow_header_bytes_get_reply_t_handler ( + vl_api_sflow_header_bytes_get_reply_t *mp) +{ + vat_main_t *vam = sflow_test_main.vat_main; + clib_warning ("sflow header-bytes: %d", ntohl (mp->header_B)); + vam->result_ready = 1; +} + +static int +api_sflow_header_bytes_get (vat_main_t *vam) +{ + vl_api_sflow_header_bytes_get_t *mp; + int ret; + + /* Construct the API message */ + M (SFLOW_HEADER_BYTES_GET, mp); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + return ret; +} + +static int +api_sflow_header_bytes_set (vat_main_t *vam) +{ + unformat_input_t *i = vam->input; + u32 header_B = ~0; + vl_api_sflow_header_bytes_set_t *mp; + int ret; + + /* Parse args required to build the message */ + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "header_B %d", &header_B)) + ; + else + break; + } + + if (header_B == ~0) + { + errmsg ("missing header_B number \n"); + return -99; + } + + /* Construct the API message */ + M (SFLOW_HEADER_BYTES_SET, mp); + mp->header_B = ntohl (header_B); + + /* send it... */ + S (mp); + + /* Wait for a reply... 
*/ + W (ret); + return ret; +} + +static void +vl_api_sflow_interface_details_t_handler (vl_api_sflow_interface_details_t *mp) +{ + vat_main_t *vam = sflow_test_main.vat_main; + clib_warning ("sflow enable: %d", ntohl (mp->hw_if_index)); + vam->result_ready = 1; +} + +static int +api_sflow_interface_dump (vat_main_t *vam) +{ + vl_api_sflow_interface_dump_t *mp; + int ret; + + /* Construct the API message */ + M (SFLOW_INTERFACE_DUMP, mp); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + return ret; +} + +/* + * List of messages that the sflow test plugin sends, + * and that the data plane plugin processes + */ +#include <sflow/sflow.api_test.c> + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow_usersock.c b/src/plugins/sflow/sflow_usersock.c new file mode 100644 index 00000000000..0ccb947709a --- /dev/null +++ b/src/plugins/sflow/sflow_usersock.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#if defined(__cplusplus) +extern "C" +{ +#endif + +#include <vlib/vlib.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <sflow/sflow.h> + +#include <fcntl.h> +#include <asm/types.h> +#include <sys/socket.h> +#include <linux/types.h> +#include <linux/netlink.h> +#include <signal.h> +#include <ctype.h> + +#include <sflow/sflow_usersock.h> + + /*_________________---------------------------__________________ + _________________ fcntl utils __________________ + -----------------___________________________------------------ + */ + + static void + setNonBlocking (int fd) + { + // set the socket to non-blocking + int fdFlags = fcntl (fd, F_GETFL); + fdFlags |= O_NONBLOCK; + if (fcntl (fd, F_SETFL, fdFlags) < 0) + { + SFLOW_ERR ("fcntl(O_NONBLOCK) failed: %s\n", strerror (errno)); + } + } + + static void + setCloseOnExec (int fd) + { + // make sure it doesn't get inherited, e.g. when we fork a script + int fdFlags = fcntl (fd, F_GETFD); + fdFlags |= FD_CLOEXEC; + if (fcntl (fd, F_SETFD, fdFlags) < 0) + { + SFLOW_ERR ("fcntl(F_SETFD=FD_CLOEXEC) failed: %s\n", strerror (errno)); + } + } + + /*_________________---------------------------__________________ + _________________ usersock_open __________________ + -----------------___________________________------------------ + */ + + static int + usersock_open (void) + { + int nl_sock = socket (AF_NETLINK, SOCK_RAW, NETLINK_USERSOCK); + if (nl_sock < 0) + { + SFLOW_ERR ("nl_sock open failed: %s\n", strerror (errno)); + return -1; + } + setNonBlocking (nl_sock); + setCloseOnExec (nl_sock); + return nl_sock; + } + + /*_________________---------------------------__________________ + _________________ SFLOWUS_open __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWUS_open (SFLOWUS *ust) + { + if (ust->nl_sock == 0) + { + ust->nl_sock = usersock_open (); + } + return true; + } + + 
/*_________________---------------------------__________________ + _________________ SFLOWUS_close __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWUS_close (SFLOWUS *ust) + { + if (ust->nl_sock != 0) + { + int err = close (ust->nl_sock); + if (err == 0) + { + ust->nl_sock = 0; + return true; + } + else + { + SFLOW_WARN ("SFLOWUS_close: returned %d : %s\n", err, + strerror (errno)); + } + } + return false; + } + + /*_________________---------------------------__________________ + _________________ SFLOWUSSpec_setMsgType __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType msgType) + { + spec->nlh.nlmsg_type = msgType; + return true; + } + + /*_________________---------------------------__________________ + _________________ SFLOWUSSpec_setAttr __________________ + -----------------___________________________------------------ + */ + + bool + SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field, + void *val, int len) + { + SFLOWUSAttr *usa = &spec->attr[field]; + if (usa->included) + return false; + usa->included = true; + usa->attr.nla_type = field; + usa->attr.nla_len = sizeof (usa->attr) + len; + int len_w_pad = NLMSG_ALIGN (len); + usa->val.iov_len = len_w_pad; + usa->val.iov_base = val; + spec->n_attrs++; + spec->attrs_len += sizeof (usa->attr); + spec->attrs_len += len_w_pad; + return true; + } + + /*_________________---------------------------__________________ + _________________ SFLOWUSSpec_send __________________ + -----------------___________________________------------------ + */ + + int + SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec) + { + spec->nlh.nlmsg_len = NLMSG_LENGTH (spec->attrs_len); + spec->nlh.nlmsg_flags = 0; + spec->nlh.nlmsg_seq = ++ust->nl_seq; + spec->nlh.nlmsg_pid = getpid (); + +#define MAX_IOV_FRAGMENTS (2 * __SFLOW_VPP_ATTR_MAX) + 2 + + struct iovec 
iov[MAX_IOV_FRAGMENTS]; + u32 frag = 0; + iov[frag].iov_base = &spec->nlh; + iov[frag].iov_len = sizeof (spec->nlh); + frag++; + int nn = 0; + for (u32 ii = 0; ii < __SFLOW_VPP_ATTR_MAX; ii++) + { + SFLOWUSAttr *usa = &spec->attr[ii]; + if (usa->included) + { + nn++; + iov[frag].iov_base = &usa->attr; + iov[frag].iov_len = sizeof (usa->attr); + frag++; + iov[frag] = usa->val; // struct copy + frag++; + } + } + ASSERT (nn == spec->n_attrs); + + struct sockaddr_nl da = { + .nl_family = AF_NETLINK, + .nl_groups = (1 << (ust->group_id - 1)) // for multicast to the group + // .nl_pid = 1e9+6343 // for unicast to receiver bound to netlink socket + // with that "pid" + }; + + struct msghdr msg = { .msg_name = &da, + .msg_namelen = sizeof (da), + .msg_iov = iov, + .msg_iovlen = frag }; + + int status = sendmsg (ust->nl_sock, &msg, 0); + if (status <= 0) + { + // Linux replies with ECONNREFUSED when + // a multicast is sent via NETLINK_USERSOCK, but + // it's not an error so we can just ignore it here. + if (errno != ECONNREFUSED) + { + SFLOW_DBG ("USERSOCK strerror(errno) = %s\n", strerror (errno)); + return -1; + } + } + return 0; + } diff --git a/src/plugins/sflow/sflow_usersock.h b/src/plugins/sflow/sflow_usersock.h new file mode 100644 index 00000000000..d66389941a6 --- /dev/null +++ b/src/plugins/sflow/sflow_usersock.h @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __included_sflow_usersock_h__ +#define __included_sflow_usersock_h__ + +#include <vlib/vlib.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <sflow/sflow.h> + +#include <asm/types.h> +#include <sys/socket.h> +#include <linux/types.h> +#include <linux/netlink.h> +#include <signal.h> +#include <ctype.h> + +// ==================== shared with hsflowd mod_vpp ========================= +// See https://github.com/sflow/host-sflow + +#define SFLOW_VPP_NETLINK_USERSOCK_MULTICAST 29 + +typedef enum +{ + SFLOW_VPP_MSG_STATUS = 1, + SFLOW_VPP_MSG_IF_COUNTERS +} EnumSFlowVppMsgType; + +typedef enum +{ + SFLOW_VPP_ATTR_PORTNAME, /* string */ + SFLOW_VPP_ATTR_IFINDEX, /* u32 */ + SFLOW_VPP_ATTR_IFTYPE, /* u32 */ + SFLOW_VPP_ATTR_IFSPEED, /* u64 */ + SFLOW_VPP_ATTR_IFDIRECTION, /* u32 */ + SFLOW_VPP_ATTR_OPER_UP, /* u32 */ + SFLOW_VPP_ATTR_ADMIN_UP, /* u32 */ + SFLOW_VPP_ATTR_RX_OCTETS, /* u64 */ + SFLOW_VPP_ATTR_TX_OCTETS, /* u64 */ + SFLOW_VPP_ATTR_RX_PKTS, /* u64 */ + SFLOW_VPP_ATTR_TX_PKTS, /* u64 */ + SFLOW_VPP_ATTR_RX_BCASTS, /* u64 */ + SFLOW_VPP_ATTR_TX_BCASTS, /* u64 */ + SFLOW_VPP_ATTR_RX_MCASTS, /* u64 */ + SFLOW_VPP_ATTR_TX_MCASTS, /* u64 */ + SFLOW_VPP_ATTR_RX_DISCARDS, /* u64 */ + SFLOW_VPP_ATTR_TX_DISCARDS, /* u64 */ + SFLOW_VPP_ATTR_RX_ERRORS, /* u64 */ + SFLOW_VPP_ATTR_TX_ERRORS, /* u64 */ + SFLOW_VPP_ATTR_HW_ADDRESS, /* binary */ + SFLOW_VPP_ATTR_UPTIME_S, /* u32 */ + SFLOW_VPP_ATTR_OSINDEX, /* u32 Linux ifIndex number, where applicable */ + SFLOW_VPP_ATTR_DROPS, /* u32 all FIFO and netlink sendmsg drops */ + SFLOW_VPP_ATTR_SEQ, /* u32 send seq no */ + /* enum shared with hsflowd, so only add here */ + __SFLOW_VPP_ATTR_MAX +} EnumSFlowVppAttributes; + +#define SFLOW_VPP_PSAMPLE_GROUP_INGRESS 3 +#define SFLOW_VPP_PSAMPLE_GROUP_EGRESS 4 + +// ========================================================================= +typedef struct +{ + u64 byts; + u64 pkts; + u64 m_pkts; + u64 b_pkts; + u64 errs; + u64 
drps; +} sflow_ctrs_t; + +typedef struct +{ + sflow_ctrs_t tx; + sflow_ctrs_t rx; +} sflow_counters_t; + +typedef struct _SFLOWUS_field_t +{ + EnumSFlowVppAttributes field; + int len; +} SFLOWUS_field_t; + +typedef struct _SFLOWUS +{ + u32 id; + int nl_sock; + u32 nl_seq; + u32 group_id; +} SFLOWUS; + +typedef struct _SFLOWUSAttr +{ + bool included : 1; + struct nlattr attr; + struct iovec val; +} SFLOWUSAttr; + +typedef struct _SFLOWUSSpec +{ + struct nlmsghdr nlh; + SFLOWUSAttr attr[__SFLOW_VPP_ATTR_MAX]; + int n_attrs; + int attrs_len; +} SFLOWUSSpec; + +bool SFLOWUS_open (SFLOWUS *ust); +bool SFLOWUS_close (SFLOWUS *ust); + +bool SFLOWUSSpec_setMsgType (SFLOWUSSpec *spec, EnumSFlowVppMsgType type); +bool SFLOWUSSpec_setAttr (SFLOWUSSpec *spec, EnumSFlowVppAttributes field, + void *buf, int len); +#define SFLOWUSSpec_setAttrInt(spec, field, val) \ + SFLOWUSSpec_setAttr ((spec), (field), &(val), sizeof (val)) + +int SFLOWUSSpec_send (SFLOWUS *ust, SFLOWUSSpec *spec); + +#endif /* __included_sflow_usersock_h__ */ diff --git a/src/plugins/sflow/sflow_vapi.c b/src/plugins/sflow/sflow_vapi.c new file mode 100644 index 00000000000..cdc89a54c80 --- /dev/null +++ b/src/plugins/sflow/sflow_vapi.c @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sflow/sflow_vapi.h> + +#ifdef SFLOW_USE_VAPI + +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vpp/app/version.h> +#include <stdbool.h> + +#include <vapi/vapi.h> +#include <vapi/memclnt.api.vapi.h> +#include <vapi/vlib.api.vapi.h> + +#ifdef included_interface_types_api_types_h +#define defined_vapi_enum_if_status_flags +#define defined_vapi_enum_mtu_proto +#define defined_vapi_enum_link_duplex +#define defined_vapi_enum_sub_if_flags +#define defined_vapi_enum_rx_mode +#define defined_vapi_enum_if_type +#define defined_vapi_enum_direction +#endif +#include <vapi/lcp.api.vapi.h> + +DEFINE_VAPI_MSG_IDS_LCP_API_JSON; + +static vapi_error_e +my_pair_get_cb (struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv, + bool is_last, vapi_payload_lcp_itf_pair_get_v2_reply *reply) +{ + // this is a no-op, but it seems like it's presence is still required. For + // example, it is called if the pair lookup does not find anything. + return VAPI_OK; +} + +static vapi_error_e +my_pair_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_lcp_itf_pair_details *details) +{ + sflow_per_interface_data_t *sfif = + (sflow_per_interface_data_t *) callback_ctx; + // Setting this here will mean it is sent to hsflowd with the interface + // counters. + sfif->linux_if_index = details->vif_index; + return VAPI_OK; +} + +static vapi_error_e +sflow_vapi_connect (sflow_vapi_client_t *vac) +{ + vapi_error_e rv = VAPI_OK; + vapi_ctx_t ctx = vac->vapi_ctx; + if (ctx == NULL) + { + // first time - open and connect. 
+ if ((rv = vapi_ctx_alloc (&ctx)) != VAPI_OK) + { + SFLOW_ERR ("vap_ctx_alloc() returned %d", rv); + } + else + { + vac->vapi_ctx = ctx; + if ((rv = vapi_connect_from_vpp ( + ctx, "api_from_sflow_plugin", SFLOW_VAPI_MAX_REQUEST_Q, + SFLOW_VAPI_MAX_RESPONSE_Q, VAPI_MODE_BLOCKING, true)) != + VAPI_OK) + { + SFLOW_ERR ("vapi_connect_from_vpp() returned %d", rv); + } + else + { + // Connected - but is there a handler for the request we want to + // send? + if (!vapi_is_msg_available (ctx, + vapi_msg_id_lcp_itf_pair_add_del_v2)) + { + SFLOW_WARN ("vapi_is_msg_available() returned false => " + "linux-cp plugin not loaded"); + rv = VAPI_EUSER; + } + } + } + } + return rv; +} + +// in forked thread +static void * +get_lcp_itf_pairs (void *magic) +{ + sflow_vapi_client_t *vac = magic; + vapi_error_e rv = VAPI_OK; + + sflow_per_interface_data_t *intfs = vac->vapi_itfs; + vlib_set_thread_name (SFLOW_VAPI_THREAD_NAME); + if ((rv = sflow_vapi_connect (vac)) != VAPI_OK) + { + vac->vapi_unavailable = true; + } + else + { + vapi_ctx_t ctx = vac->vapi_ctx; + + for (int ii = 1; ii < vec_len (intfs); ii++) + { + sflow_per_interface_data_t *sfif = vec_elt_at_index (intfs, ii); + if (sfif && sfif->sflow_enabled) + { + // TODO: if we try non-blocking we might not be able to just pour + // all the requests in here. Might be better to do them one at a + // time - e.g. when we poll for counters. 
+ vapi_msg_lcp_itf_pair_get_v2 *msg = + vapi_alloc_lcp_itf_pair_get_v2 (ctx); + if (msg) + { + msg->payload.sw_if_index = sfif->sw_if_index; + if ((rv = vapi_lcp_itf_pair_get_v2 (ctx, msg, my_pair_get_cb, + sfif, my_pair_details_cb, + sfif)) != VAPI_OK) + { + SFLOW_ERR ("vapi_lcp_itf_pair_get_v2 returned %d", rv); + // vapi.h: "message must be freed by vapi_msg_free if not + // consumed by vapi_send" + vapi_msg_free (ctx, msg); + } + } + } + } + // We no longer disconnect or free the client structures + // vapi_disconnect_from_vpp (ctx); + // vapi_ctx_free (ctx); + } + // indicate that we are done - more portable that using pthread_tryjoin_np() + vac->vapi_request_status = (int) rv; + clib_atomic_store_rel_n (&vac->vapi_request_active, false); + // TODO: how to tell if heap-allocated data is stored separately per thread? + // And if so, how to tell the allocator to GC all data for the thread when it + // exits? + return (void *) rv; +} + +int +sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac, + sflow_per_interface_data_t *itfs) +{ + +#ifdef SFLOW_VAPI_TEST_PLUGIN_SYMBOL + // don't even fork the query thread if the symbol is not there + if (!vlib_get_plugin_symbol ("linux_cp_plugin.so", "lcp_itf_pair_get")) + { + return false; + } +#endif + // previous query is done and results extracted? 
+ int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active); + if (req_active == false && vac->vapi_itfs == NULL) + { + // make a copy of the current interfaces vector for the lookup thread to + // write into + vac->vapi_itfs = vec_dup (itfs); + pthread_attr_t attr; + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + pthread_attr_setstacksize (&attr, VLIB_THREAD_STACK_SIZE); + vac->vapi_request_active = true; + pthread_create (&vac->vapi_thread, &attr, get_lcp_itf_pairs, vac); + pthread_attr_destroy (&attr); + return true; + } + return false; +} + +int +sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac, + sflow_per_interface_data_t *itfs) +{ + // request completed? + // TODO: if we use non-blocking mode do we have to call something here to + // receive results? + int req_active = clib_atomic_load_acq_n (&vac->vapi_request_active); + if (req_active == false && vac->vapi_itfs != NULL) + { + // yes, extract what we learned + // TODO: would not have to do this if vector were array of pointers + // to sflow_per_interface_data_t rather than an actual array, but + // it does mean we have very clear separation between the threads. 
+ for (int ii = 1; ii < vec_len (vac->vapi_itfs); ii++) + { + sflow_per_interface_data_t *sfif1 = + vec_elt_at_index (vac->vapi_itfs, ii); + sflow_per_interface_data_t *sfif2 = vec_elt_at_index (itfs, ii); + if (sfif1 && sfif2 && sfif1->sflow_enabled && sfif2->sflow_enabled) + sfif2->linux_if_index = sfif1->linux_if_index; + } + vec_free (vac->vapi_itfs); + vac->vapi_itfs = NULL; + return true; + } + return false; +} + +#endif /* SFLOW_USE_VAPI */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/sflow/sflow_vapi.h b/src/plugins/sflow/sflow_vapi.h new file mode 100644 index 00000000000..640fe997684 --- /dev/null +++ b/src/plugins/sflow/sflow_vapi.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 InMon Corp. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
#ifndef __included_sflow_vapi_h__
#define __included_sflow_vapi_h__

#include <vnet/vnet.h>
#include <sflow/sflow_common.h>

#ifdef SFLOW_USE_VAPI

#define SFLOW_VAPI_POLL_INTERVAL 5
#define SFLOW_VAPI_MAX_REQUEST_Q 8
#define SFLOW_VAPI_MAX_RESPONSE_Q 16
#define SFLOW_VAPI_THREAD_NAME "sflow_vapi" // must be <= 15 characters

// #define SFLOW_VAPI_TEST_PLUGIN_SYMBOL

/* Per-client state for the asynchronous linux-cp VAPI queries. */
typedef struct
{
  volatile int vapi_request_active; // to sync main <-> vapi_thread
  pthread_t vapi_thread;
  // private copy of the interface vector the worker thread writes into
  sflow_per_interface_data_t *vapi_itfs;
  int vapi_unavailable;	  // set when connect fails / linux-cp not loaded
  int vapi_request_status; // written by vapi_thread
  void *vapi_ctx;	   // lazily-allocated vapi_ctx_t, kept for reuse
} sflow_vapi_client_t;

/* Start an asynchronous Linux ifIndex lookup for 'itfs'.
   Returns true if a new query thread was started. */
int sflow_vapi_read_linux_if_index_numbers (sflow_vapi_client_t *vac,
					    sflow_per_interface_data_t *itfs);
/* Harvest results of a completed lookup into 'itfs'.
   Returns true if results were extracted. */
int
sflow_vapi_check_for_linux_if_index_results (sflow_vapi_client_t *vac,
					     sflow_per_interface_data_t *itfs);

#endif /* SFLOW_USE_VAPI */
#endif /* __included_sflow_vapi_h__ */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */