diff options
Diffstat (limited to 'telemetry')
-rw-r--r-- | telemetry/.clang-format | 3 | ||||
-rw-r--r-- | telemetry/CMakeLists.txt | 69 | ||||
-rw-r--r-- | telemetry/cmake/packaging.cmake (renamed from telemetry/vpp-collectd/cmake/Modules/Packaging.cmake) | 8 | ||||
-rw-r--r-- | telemetry/collectd.conf | 54 | ||||
-rw-r--r-- | telemetry/data_model.h | 143 | ||||
-rw-r--r-- | telemetry/hicn-light-collectd/CMakeLists.txt | 65 | ||||
-rw-r--r-- | telemetry/hicn-light-collectd/hicn_light.c | 200 | ||||
-rw-r--r-- | telemetry/kafka-collectd/CMakeLists.txt | 62 | ||||
-rw-r--r-- | telemetry/kafka-collectd/format_influxdb.c | 190 | ||||
-rw-r--r-- | telemetry/kafka-collectd/format_influxdb.h | 45 | ||||
-rw-r--r-- | telemetry/kafka-collectd/write_kafka_line_protocol.c | 526 | ||||
-rw-r--r-- | telemetry/third-party/CMakeLists.txt | 18 | ||||
-rw-r--r-- | telemetry/vpp-collectd/CMakeLists.txt | 24 | ||||
-rw-r--r-- | telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt | 94 | ||||
-rw-r--r-- | telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c | 196 | ||||
-rw-r--r-- | telemetry/vpp-collectd/vpp/CMakeLists.txt | 73 | ||||
-rw-r--r-- | telemetry/vpp-collectd/vpp/vpp.c | 123 |
17 files changed, 1559 insertions, 334 deletions
diff --git a/telemetry/.clang-format b/telemetry/.clang-format new file mode 100644 index 000000000..0043ea3d0 --- /dev/null +++ b/telemetry/.clang-format @@ -0,0 +1,3 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +BasedOnStyle: Google
\ No newline at end of file diff --git a/telemetry/CMakeLists.txt b/telemetry/CMakeLists.txt index 53fd04f01..181e726f1 100644 --- a/telemetry/CMakeLists.txt +++ b/telemetry/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,11 +11,68 @@ # See the License for the specific language governing permissions and # limitations under the License. +############################################################## +# Project and cmake version +############################################################## +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) +project(telemetry) -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -if ((CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) OR - (BUILD_HICNPLUGIN AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")) - add_subdirectory(vpp-collectd) -endif () +############################################################## +# C Standard +############################################################## +set(CMAKE_C_STANDARD 11) + +############################################################## +# Cmake modules +############################################################## +include("${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake") +set(CMAKE_MODULE_PATH + ${CMAKE_MODULE_PATH} + ${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules +) + + +############################################################## +# Libs and Bins names +############################################################## +set(COLLECTD_PLUGINS hicn-collectd-plugins CACHE INTERNAL "" FORCE) +set(HICN_LIGHT_TELEMETRY hicn_light) +set(KAFKA_TELEMETRY write_kafka_line_protocol) +set(VPP_TELEMETRY vpp) +set(VPP_HICN_TELEMETRY vpp_hicn) + + +############################################################## +# Dependencies and third party libs +############################################################## +find_package(Collectd ${COLLECTD_DEFAULT_VERSION} REQUIRED) +add_subdirectory(third-party) + + +############################################################## +# Check if building as subproject or as root project +############################################################## +if(NOT (CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) AND + NOT (BUILD_HICNPLUGIN AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")) + return() +endif() +include(CommonSetup) + +# Include config.h in all collectd plugins +set(COLLECTD_COMPILER_OPTIONS -include config.h) + +# ############################################################## +# # Packaging and versioning +# ############################################################## +include(${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake) +include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/packaging.cmake) + + +############################################################## +# Subdirectories +############################################################## +add_subdirectory(hicn-light-collectd) +add_subdirectory(kafka-collectd) +add_subdirectory(vpp-collectd)
\ No newline at end of file diff --git a/telemetry/vpp-collectd/cmake/Modules/Packaging.cmake b/telemetry/cmake/packaging.cmake index dc4629a26..d5d2264ce 100644 --- a/telemetry/vpp-collectd/cmake/Modules/Packaging.cmake +++ b/telemetry/cmake/packaging.cmake @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -16,16 +16,16 @@ ###################### set(${COLLECTD_PLUGINS}_DESCRIPTION - "A high-performance Hybrid ICN forwarder as a plugin to VPP." + "VPP and hICN plugins for collectd" CACHE STRING "Description for deb/rpm package." ) set(${COLLECTD_PLUGINS}_DEB_DEPENDENCIES - "collectd, hicn-plugin-dev" + "collectd, hicn-plugin-dev (= stable_version) libhicnctrl (= stable_version) libyajl-dev" CACHE STRING "Dependencies for deb/rpm package." ) set(${COLLECTD_PLUGINS}_RPM_DEPENDENCIES - "collectd, hicn-plugin-dev" + "collectd, hicn-plugin-dev (= stable_version) libhicnctrl (= stable_version) libyajl-dev" CACHE STRING "Dependencies for deb/rpm package." )
\ No newline at end of file diff --git a/telemetry/collectd.conf b/telemetry/collectd.conf new file mode 100644 index 000000000..225c161c6 --- /dev/null +++ b/telemetry/collectd.conf @@ -0,0 +1,54 @@ +############################################################################## +# Global # +############################################################################## +FQDNLookup true +#Interval 10 + +# Limit the size of the write queue. Default is no limit. Setting up a limit +# is recommended for servers handling a high volume of traffic. +#WriteQueueLimitHigh 1000000 +#WriteQueueLimitLow 800000 + +############################################################################## +# Logging # +############################################################################## +LoadPlugin logfile + +<Plugin logfile> + LogLevel "info" + File STDOUT + Timestamp true + PrintSeverity true +</Plugin> + +############################################################################## +# LoadPlugin section # +############################################################################## +LoadPlugin write_log + +<LoadPlugin hicn_light> + Globals true # Required to find libhicnctrl symbols + Interval 5 +</LoadPlugin> + +<LoadPlugin write_kafka_line_protocol> + Interval 10 +</LoadPlugin> + +############################################################################## +# Plugin configuration # +############################################################################## +<Plugin write_kafka_line_protocol> + Property "bootstrap.servers" "localhost:8081" + Property "security.protocol" "sasl_plaintext" + Property "sasl.mechanism" "SCRAM-SHA-256" + Property "sasl.username" "eloparco" + Property "sasl.password" "password" + + <Topic "stream"> + Format InfluxDB + </Topic> + # <Topic "metadata"> + # Format hicnJSON + # </Topic> +</Plugin> diff --git a/telemetry/data_model.h b/telemetry/data_model.h new file mode 100644 index 000000000..b1c4ae7e5 --- /dev/null +++ b/telemetry/data_model.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + */ + +#include "utils/common/common.h" + +#define KAFKA_TOPIC_KEY "_TOPIC" +#define KAFKA_STREAM_TOPIC "stream" +#define KAFKA_METADATA_TOPIC "metadata" + +/************** DATA SOURCES ******************************/ +data_source_t packets_dsrc[1] = { + {"packets", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t interests_dsrc[1] = { + {"interests", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t data_dsrc[1] = { + {"data", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t combined_dsrc[2] = { + {"packets", DS_TYPE_DERIVE, 0, NAN}, + {"bytes", DS_TYPE_DERIVE, 0, NAN}, +}; + +/************** DATA SETS NODE ****************************/ +data_set_t pkts_processed_ds = { + "pkts_processed", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_interest_count_ds = { + "pkts_interest_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_data_count_ds = { + "pkts_data_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_from_cache_count_ds = { + "pkts_from_cache_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_no_pit_count_ds = { + "pkts_no_pit_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pit_expired_count_ds = { + "pit_expired_count", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t cs_expired_count_ds = { + "cs_expired_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t cs_lru_count_ds = { + "cs_lru_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t pkts_drop_no_buf_ds = { + "pkts_drop_no_buf", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t interests_aggregated_ds = { + "interests_aggregated", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t interests_retx_ds = { + "interests_retx", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t interests_hash_collision_ds = { + "interests_hash_collision", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t pit_entries_count_ds = { + "pit_entries_count", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t cs_entries_count_ds = { + "cs_entries_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t cs_entries_ntw_count_ds = { + "cs_entries_ntw_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +/************** DATA SETS FACE ****************************/ +data_set_t irx_ds = { + "irx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t itx_ds = { + "itx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t drx_ds = { + "drx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t dtx_ds = { + "dtx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; diff --git a/telemetry/hicn-light-collectd/CMakeLists.txt b/telemetry/hicn-light-collectd/CMakeLists.txt new file mode 100644 index 000000000..984d7076c --- /dev/null +++ b/telemetry/hicn-light-collectd/CMakeLists.txt @@ -0,0 +1,65 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +############################################################## +# Source files +############################################################## +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_light.c +) + + +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +find_package(Libhicn ${CURRENT_VERSION} REQUIRED NO_MODULE) +find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE) + +if (DISABLE_SHARED_LIBRARIES) + set(LIBTYPE static) +else() + set(LIBTYPE shared) +endif() + +list(APPEND LIBHICN_LIBRARIES hicn::hicn.${LIBTYPE}) +list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE}) + +list (APPEND LIBRARIES + PRIVATE ${LIBHICNCTRL_LIBRARIES} + PRIVATE ${LIBHICN_LIBRARIES} +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${HICN_LIGHT_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + DEPENDS ${DEPENDENCIES} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +)
\ No newline at end of file diff --git a/telemetry/hicn-light-collectd/hicn_light.c b/telemetry/hicn-light-collectd/hicn_light.c new file mode 100644 index 000000000..bb4eb571c --- /dev/null +++ b/telemetry/hicn-light-collectd/hicn_light.c @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + */ + +#define ntohll hicn_ntohll // Rename to avoid collision +#include <hicn/ctrl/api.h> +#include <hicn/ctrl/hicn-light-ng.h> +#include <hicn/util/sstrncpy.h> +#undef ntohll + +#include "../data_model.h" +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" + +#define PLUGIN_NAME "hicn_light" + +static hc_sock_t *s = NULL; + +static void submit(const char *type, value_t *values, size_t values_len, + meta_data_t *meta) { + assert(type != NULL && values != NULL && values_len != 0); + + value_list_t vl = {.values = values, .values_len = values_len}; + if (meta) vl.meta = meta; + + int rc = strcpy_s(vl.plugin, sizeof(vl.plugin), PLUGIN_NAME); + _ASSERT(rc == EOK); + rc = strcpy_s(vl.type, sizeof(vl.type), type); + _ASSERT(rc == EOK); + rc = strcpy_s(vl.host, sizeof(vl.host), hostname_g); + _ASSERT(rc == EOK); + + plugin_dispatch_values(&vl); +} + +static int read_forwarder_global_stats(hc_data_t **pdata, meta_data_t *meta) { + // Retrieve global stats from forwarder + int rc = hc_stats_get(s, pdata); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not read global stats from forwarder"); + return -1; + } + hicn_light_stats_t stats = *((hicn_light_stats_t *)(*pdata)->buffer); + + // Submit values + value_t values[1]; + values[0] = (value_t){.gauge = stats.forwarder.countReceived}; + submit(pkts_processed_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsReceived}; + submit(pkts_interest_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countObjectsReceived}; + submit(pkts_data_count_ds.type, values, 1, meta); + values[0] = + (value_t){.gauge = stats.forwarder.countInterestsSatisfiedFromStore}; + submit(pkts_from_cache_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDroppedNoReversePath}; + submit(pkts_no_pit_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsExpired}; + submit(pit_expired_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDataExpired}; + submit(cs_expired_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_lru_evictions}; + submit(cs_lru_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDropped}; + submit(pkts_drop_no_buf_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsAggregated}; + submit(interests_aggregated_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsRetransmitted}; + submit(interests_retx_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_pit_entries}; + submit(pit_entries_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_cs_entries}; + submit(cs_entries_count_ds.type, values, 1, meta); + + return 0; +} + +static int read_forwarder_per_face_stats(hc_data_t **pdata, meta_data_t *meta) { + // Retrieve per-face stats from forwarder + int rc = hc_stats_list(s, pdata); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not read face stats from forwarder"); + return -1; + } + hc_data_t *data = *pdata; + cmd_stats_list_item_t *conn_stats = (cmd_stats_list_item_t *)data->buffer; + cmd_stats_list_item_t *end = + (cmd_stats_list_item_t *)(data->buffer + + data->size * data->out_element_size); + + // Submit values + while (conn_stats < end) { + rc = meta_data_add_unsigned_int(meta, "face_id", conn_stats->id); + assert(rc == 0); + + value_t values[2]; + values[0] = (value_t){.derive = conn_stats->stats.interests.rx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.interests.rx_bytes}; + submit(irx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.interests.tx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.interests.tx_bytes}; + submit(itx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.data.rx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.data.rx_bytes}; + submit(drx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.data.tx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.data.tx_bytes}; + submit(dtx_ds.type, values, 2, meta); + + conn_stats++; + } + + return 0; +} + +static int read_forwarder_stats() { + // Create metadata + meta_data_t *meta = meta_data_create(); + int rc = meta_data_add_string(meta, KAFKA_TOPIC_KEY, KAFKA_STREAM_TOPIC); + assert(rc == 0); + + hc_data_t *data = NULL; + rc = read_forwarder_global_stats(&data, meta); + if (rc < 0) goto READ_ERROR; + rc = read_forwarder_per_face_stats(&data, meta); + +READ_ERROR: + meta_data_destroy(meta); + hc_data_free(data); + return rc; +} + +static int connect_to_forwarder() { + plugin_log(LOG_INFO, "Connecting to forwarder"); + s = hc_sock_create_forwarder(HICNLIGHT_NG); + if (!s) { + plugin_log(LOG_ERR, "Could not create socket"); + return -1; + } + + int rc = hc_sock_connect(s); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not establish connection to forwarder"); + hc_sock_free(s); + s = NULL; + return -1; + } + + return 0; +} + +static int disconnect_from_forwarder() { + plugin_log(LOG_INFO, "Disconnecting from forwarder"); + + if (s == NULL) { + plugin_log(LOG_ERR, "Forwarder not connected"); + return -1; + } + + hc_command_t command = {0}; + command.object.connection.id = 0; + int rc = strcpy_s(command.object.connection.name, + sizeof(command.object.connection.name), "SELF"); + if (rc != EOK || hc_connection_delete(s, &command.object.connection) < 0) { + rc = -1; + plugin_log(LOG_ERR, "Error removing local connection to forwarder"); + } + + hc_sock_free(s); + return rc; +} + +void module_register() { + // Data sets + plugin_register_data_set(&pkts_processed_ds); + plugin_register_data_set(&pkts_interest_count_ds); + plugin_register_data_set(&pkts_data_count_ds); + plugin_register_data_set(&pkts_from_cache_count_ds); + plugin_register_data_set(&pkts_no_pit_count_ds); + plugin_register_data_set(&pit_expired_count_ds); + plugin_register_data_set(&cs_expired_count_ds); + plugin_register_data_set(&cs_lru_count_ds); + plugin_register_data_set(&pkts_drop_no_buf_ds); + plugin_register_data_set(&interests_aggregated_ds); + plugin_register_data_set(&interests_retx_ds); + plugin_register_data_set(&interests_hash_collision_ds); + plugin_register_data_set(&pit_entries_count_ds); + plugin_register_data_set(&cs_entries_count_ds); + plugin_register_data_set(&cs_entries_ntw_count_ds); + plugin_register_data_set(&irx_ds); + plugin_register_data_set(&itx_ds); + plugin_register_data_set(&drx_ds); + plugin_register_data_set(&dtx_ds); + + // Callbacks + plugin_register_init(PLUGIN_NAME, connect_to_forwarder); + plugin_register_read(PLUGIN_NAME, read_forwarder_stats); + plugin_register_shutdown(PLUGIN_NAME, disconnect_from_forwarder); +}
\ No newline at end of file diff --git a/telemetry/kafka-collectd/CMakeLists.txt b/telemetry/kafka-collectd/CMakeLists.txt new file mode 100644 index 000000000..f1ff81117 --- /dev/null +++ b/telemetry/kafka-collectd/CMakeLists.txt @@ -0,0 +1,62 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +############################################################## +# Source files +############################################################## +file(GLOB_RECURSE COLLECTD_UTILS_SOURCES "${THIRD_PARTY_INCLUDE_DIRS}/utils/cmds/*.c") + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/write_kafka_line_protocol.c + ${THIRD_PARTY_INCLUDE_DIRS}/utils/format_json/format_json.c + ${THIRD_PARTY_INCLUDE_DIRS}/utils/format_graphite/format_graphite.c + ${CMAKE_CURRENT_SOURCE_DIR}/format_influxdb.c + ${COLLECTD_UTILS_SOURCES} +) + + +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + PRIVATE ${COLLECTD_INCLUDE_DIRS} + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +find_package(RdKafka ${RDKAFKA_DEFAULT_VERSION} REQUIRED) +find_library(YAJL_LIB libyajl.so REQUIRED) + +list (APPEND LIBRARIES + ${YAJL_LIB} + ${RdKafka_LIBRARY_PATH} +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${KAFKA_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + DEPENDS ${DEPENDENCIES} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/telemetry/kafka-collectd/format_influxdb.c b/telemetry/kafka-collectd/format_influxdb.c new file mode 100644 index 000000000..f7ff29501 --- /dev/null +++ b/telemetry/kafka-collectd/format_influxdb.c @@ -0,0 +1,190 @@ +/** + * collectd - src/utils_format_influxdb.c + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster <octo at collectd.org> + * Aman Gupta <aman at tmm1.net> + * Carlos Peon Costa <carlospeon at gmail.com> + * multiple Server directives by: + * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com> + **/ + +#include "format_influxdb.h" + +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" +#include "utils/metadata/meta_data.h" +#include "utils_cache.h" + +static int format_influxdb_escape_string(char *buffer, size_t buffer_size, + const char *string) { + if ((buffer == NULL) || (string == NULL)) return -EINVAL; + + if (buffer_size < 3) return -ENOMEM; + + int dst_pos = 0; + +#define BUFFER_ADD(c) \ + do { \ + if (dst_pos >= (buffer_size - 1)) { \ + buffer[buffer_size - 1] = '\0'; \ + return -ENOMEM; \ + } \ + buffer[dst_pos] = (c); \ + dst_pos++; \ + } while (0) + + /* Escape special characters */ + for (int src_pos = 0; string[src_pos] != 0; src_pos++) { + if ((string[src_pos] == '\\') || (string[src_pos] == ' ') || + (string[src_pos] == ',') || (string[src_pos] == '=') || + (string[src_pos] == '"')) { + BUFFER_ADD('\\'); + BUFFER_ADD(string[src_pos]); + } else + BUFFER_ADD(string[src_pos]); + } /* for */ + buffer[dst_pos] = 0; + +#undef BUFFER_ADD + + return dst_pos; +} /* int format_influxdb_escape_string */ + +int format_influxdb_value_list( + char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl, + bool store_rates, format_influxdb_time_precision_t time_precision) { + int status; + int offset = 0; + gauge_t *rates = NULL; + bool have_values = false; + + assert(0 == strcmp(ds->type, vl->type)); + +#define BUFFER_ADD_ESCAPE(...) \ + do { \ + status = format_influxdb_escape_string(buffer + offset, \ + buffer_len - offset, __VA_ARGS__); \ + if (status < 0) return status; \ + offset += status; \ + } while (0) + +#define BUFFER_ADD(...) \ + do { \ + status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ + if ((status < 0) || (status >= (buffer_len - offset))) { \ + sfree(rates); \ + return -ENOMEM; \ + } \ + offset += status; \ + } while (0) + + assert(vl->type); + BUFFER_ADD_ESCAPE(vl->type); + BUFFER_ADD(",host="); + BUFFER_ADD_ESCAPE(vl->host); + if (vl->meta) { + char **toc; + int n = meta_data_toc(vl->meta, &toc); + + for (int i = 0; i < n; i++) { + char *key = toc[i]; + char *value; + + if (meta_data_as_string(vl->meta, key, &value) == 0) { + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(key); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(value); + free(value); + } + free(toc[i]); + } + + if (n != 0) free(toc); + } + + BUFFER_ADD(" "); + for (size_t i = 0; i < ds->ds_num; i++) { + if ((ds->ds[i].type != DS_TYPE_COUNTER) && + (ds->ds[i].type != DS_TYPE_GAUGE) && + (ds->ds[i].type != DS_TYPE_DERIVE) && + (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + sfree(rates); + return -EINVAL; + } + + if (ds->ds[i].type == DS_TYPE_GAUGE) { + if (isnan(vl->values[i].gauge)) continue; + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge); + have_values = true; + } else if (store_rates) { + if (rates == NULL) rates = uc_get_rate(ds, vl); + if (rates == NULL) { + WARNING( + "format_influxdb: " + "uc_get_rate failed."); + return -EINVAL; + } + if (isnan(rates[i])) continue; + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_COUNTER) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, + (uint64_t)vl->values[i].counter); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_DERIVE) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute); + have_values = true; + } + + } /* for ds->ds_num */ + sfree(rates); + + if (!have_values) return 0; + + uint64_t influxdb_time = 0; + switch (time_precision) { + case NS: + influxdb_time = CDTIME_T_TO_NS(vl->time); + break; + case US: + influxdb_time = CDTIME_T_TO_US(vl->time); + break; + case MS: + influxdb_time = CDTIME_T_TO_MS(vl->time); + break; + } + + BUFFER_ADD(" %" PRIu64 "\n", influxdb_time); + +#undef BUFFER_ADD_ESCAPE +#undef BUFFER_ADD + + return offset; +} /* int format_influxdb_value_list */ diff --git a/telemetry/kafka-collectd/format_influxdb.h b/telemetry/kafka-collectd/format_influxdb.h new file mode 100644 index 000000000..fd298f11b --- /dev/null +++ b/telemetry/kafka-collectd/format_influxdb.h @@ -0,0 +1,45 @@ +/** + * collectd - src/utils_format_influxdb.h + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster <octo at collectd.org> + * Aman Gupta <aman at tmm1.net> + * Carlos Peon Costa <carlospeon at gmail.com> + * multiple Server directives by: + * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com> + **/ + +#ifndef UTILS_FORMAT_INFLUXDB_H +#define UTILS_FORMAT_INFLUXDB_H 1 + +#include "collectd.h" +#include "plugin.h" + +typedef enum { + NS, + US, + MS, +} format_influxdb_time_precision_t; + +int format_influxdb_value_list(char *buffer, int buffer_len, + const data_set_t *ds, const value_list_t *vl, + bool store_rates, + format_influxdb_time_precision_t time_precision); + +#endif /* UTILS_FORMAT_INFLUXDB_H */ diff --git a/telemetry/kafka-collectd/write_kafka_line_protocol.c b/telemetry/kafka-collectd/write_kafka_line_protocol.c new file mode 100644 index 000000000..5eb7b520d --- /dev/null +++ b/telemetry/kafka-collectd/write_kafka_line_protocol.c @@ -0,0 +1,526 @@ +/** + * collectd - src/write_kafka.c + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard <pyr at spootnik.org> + */ + +#include <errno.h> +#include <librdkafka/rdkafka.h> +#include <stdint.h> + +#include "../data_model.h" +#include "collectd.h" +#include "format_influxdb.h" +#include "plugin.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" +#include "utils_random.h" + +struct kafka_topic_context { +#define KAFKA_FORMAT_JSON 0 +#define KAFKA_FORMAT_COMMAND 1 +#define KAFKA_FORMAT_GRAPHITE 2 +#define KAFKA_FORMAT_INFLUXDB 3 + uint8_t format; + unsigned int graphite_flags; + bool store_rates; + rd_kafka_topic_conf_t *conf; + rd_kafka_topic_t *topic; + rd_kafka_conf_t *kafka_conf; + rd_kafka_t *kafka; + char *key; + char *prefix; + char *postfix; + char escape_char; + char *topic_name; + pthread_mutex_t lock; +}; + +static int kafka_handle(struct kafka_topic_context *); +static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); +static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, + int32_t, void *, void *); + +/* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of + * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the + * deprecated function. */ +#ifdef HAVE_LIBRDKAFKA_LOG_CB +#undef HAVE_LIBRDKAFKA_LOGGER +#endif + +#if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB) +static void kafka_log(const rd_kafka_t *, int, const char *, const char *); + +static void kafka_log(const rd_kafka_t *rkt, int level, const char *fac, + const char *msg) { + plugin_log(level, "%s", msg); +} +#endif + +static rd_kafka_resp_err_t kafka_error() { +#if RD_KAFKA_VERSION >= 0x000b00ff + return rd_kafka_last_error(); +#else + return rd_kafka_errno2err(errno); +#endif +} + +static uint32_t kafka_hash(const char *keydata, size_t keylen) { + uint32_t hash = 5381; + for (; keylen > 0; keylen--) + hash = ((hash << 5) + hash) + keydata[keylen - 1]; + return hash; +} + +/* 31 bit -> 4 byte -> 8 byte hex string + null byte */ +#define KAFKA_RANDOM_KEY_SIZE 9 +#define KAFKA_RANDOM_KEY_BUFFER \ + (char[KAFKA_RANDOM_KEY_SIZE]) { "" } +static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) { + ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u()); + return buffer; +} + +static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, + size_t keylen, int32_t partition_cnt, void *p, + void *m) { + uint32_t key = kafka_hash(keydata, keylen); + uint32_t target = key % partition_cnt; + int32_t i = partition_cnt; + + while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { + target = (target + 1) % partition_cnt; + } + return target; +} + +static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */ +{ + char errbuf[1024]; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + + if (ctx->kafka != NULL && ctx->topic != NULL) return 0; + + if (ctx->kafka == NULL) { + if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) { + ERROR("write_kafka plugin: cannot duplicate kafka config"); + return 1; + } + + if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, + sizeof(errbuf))) == NULL) { + ERROR("write_kafka plugin: cannot create kafka handle."); + return 1; + } + + rd_kafka_conf_destroy(ctx->kafka_conf); + ctx->kafka_conf = NULL; + + INFO("write_kafka plugin: created KAFKA handle : %s", + rd_kafka_name(ctx->kafka)); + +#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB) + rd_kafka_set_logger(ctx->kafka, kafka_log); +#endif + } + + if (ctx->topic == NULL) { + if ((topic_conf = rd_kafka_topic_conf_dup(ctx->conf)) == NULL) { + ERROR("write_kafka plugin: cannot duplicate kafka topic config"); + return 1; + } + + if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name, + topic_conf)) == NULL) { + ERROR("write_kafka plugin: cannot create topic : %s\n", + rd_kafka_err2str(kafka_error())); + return errno; + } + + rd_kafka_topic_conf_destroy(ctx->conf); + ctx->conf = NULL; + + INFO("write_kafka plugin: handle created for topic : %s", + rd_kafka_topic_name(ctx->topic)); + } + + return 0; + +} /* }}} int kafka_handle */ + +static int kafka_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, user_data_t *ud) { + int status = 0; + void *key; + size_t keylen = 0; + char buffer[8192]; + size_t bfree = sizeof(buffer); + size_t bfill = 0; + size_t blen = 0; + struct kafka_topic_context *ctx = ud->data; + + if ((ds == NULL) || (vl == NULL) || (ctx == NULL)) return EINVAL; + + pthread_mutex_lock(&ctx->lock); + status = kafka_handle(ctx); + pthread_mutex_unlock(&ctx->lock); + if (status != 0) return status; + + bzero(buffer, sizeof(buffer)); + + switch (ctx->format) { + case KAFKA_FORMAT_COMMAND: + status = cmd_create_putval(buffer, sizeof(buffer), ds, vl); + if (status != 0) { + ERROR("write_kafka plugin: cmd_create_putval failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_JSON: + format_json_initialize(buffer, &bfill, &bfree); + format_json_value_list(buffer, &bfill, &bfree, ds, vl, ctx->store_rates); + format_json_finalize(buffer, &bfill, &bfree); + blen = strlen(buffer); + break; + case KAFKA_FORMAT_GRAPHITE: + status = + format_graphite(buffer, sizeof(buffer), ds, vl, ctx->prefix, + ctx->postfix, ctx->escape_char, ctx->graphite_flags); + if (status != 0) { + ERROR("write_kafka plugin: format_graphite failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_INFLUXDB: { + // Decide format depending on the topic + // (to handle multilple topics w/ different formats); + // Comment this part to use the stream topic as default for other + // collectd plugins (e.g. cpu, mem) + char *topic = NULL; + int rc = meta_data_get_string(vl->meta, KAFKA_TOPIC_KEY, &topic); + if (rc != 0 || topic == NULL) return 0; + if (strcasecmp(KAFKA_STREAM_TOPIC, topic) != 0) { + free(topic); + return 0; + } + meta_data_delete(vl->meta, KAFKA_TOPIC_KEY); + + status = + format_influxdb_value_list(buffer, sizeof(buffer), ds, vl, false, NS); + if (status <= 0) { + ERROR("write_kafka plugin: format_influxdb failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + + // Print without newline + buffer[blen - 1] = 0; + INFO("%s", buffer); + buffer[blen - 1] = '\n'; + + break; + } + default: + ERROR("write_kafka plugin: invalid format %i.", ctx->format); + return -1; + } + + key = + (ctx->key != NULL) ? ctx->key : kafka_random_key(KAFKA_RANDOM_KEY_BUFFER); + keylen = strlen(key); + + rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + buffer, blen, key, keylen, NULL); + + return status; +} /* }}} int kafka_write */ + +static void kafka_topic_context_free(void *p) /* {{{ */ +{ + struct kafka_topic_context *ctx = p; + + if (ctx == NULL) return; + + if (ctx->topic_name != NULL) sfree(ctx->topic_name); + if (ctx->topic != NULL) rd_kafka_topic_destroy(ctx->topic); + if (ctx->conf != NULL) rd_kafka_topic_conf_destroy(ctx->conf); + if (ctx->kafka_conf != NULL) rd_kafka_conf_destroy(ctx->kafka_conf); + if (ctx->kafka != NULL) rd_kafka_destroy(ctx->kafka); + + sfree(ctx); +} /* }}} void kafka_topic_context_free */ + +static void kafka_config_topic(rd_kafka_conf_t *conf, + oconfig_item_t *ci) /* {{{ */ +{ + int status; + struct kafka_topic_context *tctx; + char *key = NULL; + char *val; + char callback_name[DATA_MAX_NAME_LEN]; + char errbuf[1024]; + oconfig_item_t *child; + rd_kafka_conf_res_t ret; + + if ((tctx = calloc(1, sizeof(*tctx))) == NULL) { + ERROR("write_kafka plugin: calloc failed."); + return; + } + + tctx->escape_char = '.'; + tctx->store_rates = true; + tctx->format = KAFKA_FORMAT_JSON; + tctx->key = NULL; + + if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) { + sfree(tctx); + ERROR("write_kafka plugin: cannot allocate memory for kafka config"); + return; + } + +#ifdef HAVE_LIBRDKAFKA_LOG_CB + rd_kafka_conf_set_log_cb(tctx->kafka_conf, kafka_log); +#endif + + if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) { + rd_kafka_conf_destroy(tctx->kafka_conf); + sfree(tctx); + ERROR("write_kafka plugin: cannot create topic configuration."); + return; + } + + if (ci->values_num != 1) { + WARNING("kafka topic name needed."); + goto errout; + } + + if (ci->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("kafka topic needs a string argument."); + goto errout; + } + + if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) { + ERROR("write_kafka plugin: cannot copy topic name."); + goto errout; + } + + for (int i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp("Property", child->key) == 0) { + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + key = child->values[0].value.string; + val = child->values[1].value.string; + ret = + rd_kafka_topic_conf_set(tctx->conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka topic property %s to %s: %s.", key, val, + errbuf); + goto errout; + } + + } else if (strcasecmp("Key", child->key) == 0) { + if (cf_util_get_string(child, &tctx->key) != 0) continue; + if (strcasecmp("Random", tctx->key) == 0) { + sfree(tctx->key); + tctx->key = strdup(kafka_random_key(KAFKA_RANDOM_KEY_BUFFER)); + } + } else if (strcasecmp("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) goto errout; + + assert(key != NULL); + + if (strcasecmp(key, "Command") == 0) { + tctx->format = KAFKA_FORMAT_COMMAND; + + } else if (strcasecmp(key, "Graphite") == 0) { + tctx->format = KAFKA_FORMAT_GRAPHITE; + + } else if (strcasecmp(key, "Json") == 0) { + tctx->format = KAFKA_FORMAT_JSON; + + } else if (strcasecmp(key, "InfluxDB") == 0) { + tctx->format = KAFKA_FORMAT_INFLUXDB; + + } else { + WARNING("write_kafka plugin: Invalid format string: %s", key); + } + + sfree(key); + + } else if (strcasecmp("StoreRates", child->key) == 0) { + status = cf_util_get_boolean(child, &tctx->store_rates); + (void)cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_STORE_RATES); + + } else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + + } else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + + } else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_PRESERVE_SEPARATOR); + + } else if (strcasecmp("GraphiteUseTags", child->key) == 0) { + status = + cf_util_get_flag(child, &tctx->graphite_flags, GRAPHITE_USE_TAGS); + + } else if (strcasecmp("GraphitePrefix", child->key) == 0) { + status = cf_util_get_string(child, &tctx->prefix); + } else if (strcasecmp("GraphitePostfix", child->key) == 0) { + status = cf_util_get_string(child, &tctx->postfix); + } else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string(child, &tmp_buff); + if (strlen(tmp_buff) > 1) + WARNING( + "write_kafka plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + tctx->escape_char = tmp_buff[0]; + sfree(tmp_buff); + } else { + WARNING("write_kafka plugin: Invalid directive: %s.", child->key); + } + + if (status != 0) break; + } + + rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition); + rd_kafka_topic_conf_set_opaque(tctx->conf, tctx); + + ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s", + tctx->topic_name); + + status = plugin_register_write(callback_name, kafka_write, + &(user_data_t){ + .data = tctx, + .free_func = kafka_topic_context_free, + }); + if (status != 0) { + WARNING( + "write_kafka plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + callback_name, status); + goto errout; + } + + pthread_mutex_init(&tctx->lock, /* attr = */ NULL); + + return; +errout: + if (tctx->topic_name != NULL) free(tctx->topic_name); + if (tctx->conf != NULL) rd_kafka_topic_conf_destroy(tctx->conf); + if (tctx->kafka_conf != NULL) rd_kafka_conf_destroy(tctx->kafka_conf); + sfree(tctx); +} /* }}} int kafka_config_topic */ + +static int kafka_config(oconfig_item_t *ci) /* {{{ */ +{ + oconfig_item_t *child; + rd_kafka_conf_t *conf; + rd_kafka_conf_res_t ret; + char errbuf[1024]; + + if ((conf = rd_kafka_conf_new()) == NULL) { + WARNING("cannot allocate kafka configuration."); + return -1; + } + for (int i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp("Topic", child->key) == 0) { + kafka_config_topic(conf, child); + } else if (strcasecmp(child->key, "Property") == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + goto errout; + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + sfree(key); + goto errout; + } + ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka property %s to %s: %s", key, val, errbuf); + sfree(key); + sfree(val); + goto errout; + } + sfree(key); + sfree(val); + } else { + WARNING( + "write_kafka plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + if (conf != NULL) rd_kafka_conf_destroy(conf); + return 0; +errout: + if (conf != NULL) rd_kafka_conf_destroy(conf); + return -1; +} /* }}} int kafka_config */ + +void module_register(void) { + plugin_register_complex_config("write_kafka_line_protocol", kafka_config); +} diff --git a/telemetry/third-party/CMakeLists.txt b/telemetry/third-party/CMakeLists.txt new file mode 100644 index 000000000..26d305d62 --- /dev/null +++ b/telemetry/third-party/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +set(THIRD_PARTY_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}) + +include(FetchContent) +set(FETCHCONTENT_QUIET off) +FetchContent_Declare(collectd + URL https://github.com/collectd/collectd/archive/refs/tags/collectd-${COLLECTD_VERSION}.zip + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" +) +FetchContent_Populate(collectd) + +list(APPEND THIRD_PARTY_INCLUDE_DIRS + ${collectd_SOURCE_DIR}/src +) +set(THIRD_PARTY_INCLUDE_DIRS ${THIRD_PARTY_INCLUDE_DIRS} PARENT_SCOPE) diff --git a/telemetry/vpp-collectd/CMakeLists.txt b/telemetry/vpp-collectd/CMakeLists.txt index ef09fb980..4ce357f63 100644 --- a/telemetry/vpp-collectd/CMakeLists.txt +++ b/telemetry/vpp-collectd/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,20 +11,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +############################################################## +# Dependencies +############################################################## +find_package(Vpp ${VPP_DEFAULT_VERSION} REQUIRED) -set (COLLECTD_PLUGINS hicn-collectd-plugins) -project(hicn-collectd-plugins) -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" "${CMAKE_CURRENT_SOURCE_DIR}/../../cmake/Modules/") +############################################################## +# Compiler Options +############################################################## +set(COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} +) -include(BuildMacros) +############################################################## +# Subdirectories +############################################################## add_subdirectory(vpp) add_subdirectory(vpp-hicn) -include(Packaging) if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) - include(Packager) - make_packages() + make_packages() endif() diff --git a/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt b/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt index fc39b9385..85dd51577 100644 --- a/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt +++ b/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,45 +11,63 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - +############################################################## # Dependencies -find_package(Collectd REQUIRED) -find_package(Vpp REQUIRED) - -if(${CMAKE_SOURCE_DIR}/vpp-collectd STREQUAL ${PROJECT_SOURCE_DIR}) - message (STATUS "not compiling in the same folder") - find_package(HicnPlugin REQUIRED) - find_package(VapiSafe REQUIRED) -else() - message (STATUS "compiling in the same folder") - list(APPEND DEPENDENCIES - hicn_plugin - ) -endif() +############################################################## +find_package(HicnPlugin ${CURRENT_VERSION} REQUIRED) +find_package(Libsafevapi ${CURRENT_VERSION} REQUIRED NO_MODULE) + +############################################################## +# Source files +############################################################## list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/vpp_hicn.c + ${CMAKE_CURRENT_SOURCE_DIR}/vpp_hicn.c ) -list (APPEND INCLUDE_DIRS - ${COLLECTD_INCLUDE_DIRS} - ${HICNPLUGIN_INCLUDE_DIRS} - ${SAFE_VAPI_INCLUDE_DIRS} - ${VPP_INCLUDE_DIRS} - ${CMAKE_CURRENT_SOURCE_DIR}) - -list (APPEND LIBRARIES - ${VPP_LIBRARY_VAPICLIENT} - ${SAFE_VAPI_LIBRARIES}) - -build_library(vpp_hicn - SHARED - SOURCES ${SOURCE_FILES} - LINK_LIBRARIES ${LIBRARIES} - INCLUDE_DIRS ${INCLUDE_DIRS} - INSTALL_FULL_PATH_DIR ${CMAKE_INSTALL_PREFIX}/lib/collectd - COMPONENT "${COLLECTD_PLUGINS}" - DEPENDS ${DEPENDENCIES} - EMPTY_PREFIX true - ) + +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} + ${HICNPLUGIN_INCLUDE_DIRS} + ${Libsafe_vapi_INCLUDE_DIRS} + ${VPP_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +list(APPEND LIBRARIES + ${VPP_LIBRARY_VAPICLIENT} + hicn::safevapi.shared +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${VPP_HICN_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + DEPENDS ${DEPENDENCIES} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c index 591b8f584..a20bcbcd0 100644 --- a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c +++ b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,30 +13,14 @@ * limitations under the License. */ -#if !HAVE_CONFIG_H -#include <stdlib.h> -#include <string.h> - -#ifndef __USE_ISOC99 /* required for NAN */ -#define DISABLE_ISOC99 1 -#define __USE_ISOC99 1 -#endif /* !defined(__USE_ISOC99) */ - -#if DISABLE_ISOC99 -#undef DISABLE_ISOC99 -#undef __USE_ISOC99 -#endif /* DISABLE_ISOC99 */ -#endif /* ! HAVE_CONFIG */ - -/* Keep order as it is */ -#include <config.h> -#include <collectd.h> -#include <common.h> -#include <plugin.h> +#include "../../data_model.h" +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" #define counter_t vpp_counter_t +#include <hicn/vapi/vapi_safe.h> #include <vapi/hicn.api.vapi.h> -#include <vapi/vapi_safe.h> #undef counter_t DEFINE_VAPI_MSG_IDS_HICN_API_JSON @@ -51,144 +35,9 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); static bool verbose = false; static char *tag = NULL; -/************** DATA SOURCES ******************************/ -static data_source_t packets_dsrc[1] = { - {"packets", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t interests_dsrc[1] = { - {"interests", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t data_dsrc[1] = { - {"data", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t combined_dsrc[2] = { - {"packets", DS_TYPE_DERIVE, 0, NAN}, - {"bytes", DS_TYPE_DERIVE, 0, NAN}, -}; - -/************** DATA SETS NODE ****************************/ -static data_set_t pkts_processed_ds = { - "pkts_processed", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_interest_count_ds = { - "pkts_interest_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_data_count_ds = { - "pkts_data_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_from_cache_count_ds = { - "pkts_from_cache_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_no_pit_count_ds = { - "pkts_no_pit_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pit_expired_count_ds = { - "pit_expired_count", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t cs_expired_count_ds = { - "cs_expired_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t cs_lru_count_ds = { - "cs_lru_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t pkts_drop_no_buf_ds = { - "pkts_drop_no_buf", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t interests_aggregated_ds = { - "interests_aggregated", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t interests_retx_ds = { - "interests_retx", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t interests_hash_collision_ds = { - "interests_hash_collision", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t pit_entries_count_ds = { - "pit_entries_count", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t cs_entries_count_ds = { - "cs_entries_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t cs_entries_ntw_count_ds = { - "cs_entries_ntw_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -/************** DATA SETS FACE ****************************/ -static data_set_t irx_ds = { - "irx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t itx_ds = { - "itx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t drx_ds = { - "drx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t dtx_ds = { - "dtx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - /**********************************************************/ /********** UTILITY FUNCTIONS *****************************/ /**********************************************************/ - /* * Utility function used by the read callback to populate a * value_list_t and pass it to plugin_dispatch_values. @@ -207,8 +56,7 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (tag != NULL) - sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); + if (tag != NULL) sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -216,7 +64,6 @@ static int submit(const char *plugin_instance, const char *type, /**********************************************************/ /********** CALLBACK FUNCTIONS ****************************/ /**********************************************************/ - /* * This function is called for each configuration item. */ @@ -242,15 +89,12 @@ static int vpp_hicn_config(const char *key, const char *value) { /* * Callback called by the hICN plugin API when node stats are ready. */ -static vapi_error_e -parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, - bool is_last, - vapi_payload_hicn_api_node_stats_get_reply *reply) { - if (reply == NULL || rv != VAPI_OK) - return rv; +static vapi_error_e parse_node_stats( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_node_stats_get_reply *reply) { + if (reply == NULL || rv != VAPI_OK) return rv; - if (reply->retval != VAPI_OK) - return reply->retval; + if (reply->retval != VAPI_OK) return reply->retval; char *node_name = "node"; value_t values[1]; @@ -296,15 +140,12 @@ parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, /* * Callback called by the hICN plugin API when face stats are ready. */ -static vapi_error_e -parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, - bool is_last, - vapi_payload_hicn_api_face_stats_details *reply) { - if (reply == NULL || rv != VAPI_OK) - return rv; +static vapi_error_e parse_face_stats( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_face_stats_details *reply) { + if (reply == NULL || rv != VAPI_OK) return rv; - if (reply->retval != VAPI_OK) - return reply->retval; + if (reply->retval != VAPI_OK) return reply->retval; char face_name[10]; snprintf(face_name, 10, "face%u", reply->faceid); @@ -333,8 +174,7 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, static int vpp_hicn_init(void) { int ret = vapi_connect_safe(&vapi_ctx, 0); - if (ret) - plugin_log(LOG_ERR, "vpp_hicn plugin: vapi_connect_safe failed"); + if (ret) plugin_log(LOG_ERR, "vpp_hicn plugin: vapi_connect_safe failed"); return ret; } diff --git a/telemetry/vpp-collectd/vpp/CMakeLists.txt b/telemetry/vpp-collectd/vpp/CMakeLists.txt index 36248b17a..e1cf55553 100644 --- a/telemetry/vpp-collectd/vpp/CMakeLists.txt +++ b/telemetry/vpp-collectd/vpp/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,32 +11,53 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) +############################################################## +# Sources +############################################################## +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/vpp.c +) -# Dependencies -list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/vpp.c +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} + ${VPP_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +list(APPEND LIBRARIES + ${VPP_LIBRARY_VPPAPICLIENT} + ${VPP_LIBRARY_INFRA} ) -find_package(Vpp REQUIRED) -find_package(Collectd REQUIRED) - -list (APPEND INCLUDE_DIRS - ${COLLECTD_INCLUDE_DIRS} - ${VPP_INCLUDE_DIRS} - ${CMAKE_CURRENT_SOURCE_DIR}) - -list (APPEND LIBRARIES - ${VPP_LIBRARY_VPPAPICLIENT} - ${VPP_LIBRARY_INFRA}) - -build_library(vpp - SHARED - SOURCES ${SOURCE_FILES} - LINK_LIBRARIES ${LIBRARIES} - INCLUDE_DIRS ${INCLUDE_DIRS} - INSTALL_FULL_PATH_DIR ${CMAKE_INSTALL_PREFIX}/lib/collectd - COMPONENT ${COLLECTD_PLUGINS} - EMPTY_PREFIX true - ) + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${VPP_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/telemetry/vpp-collectd/vpp/vpp.c b/telemetry/vpp-collectd/vpp/vpp.c index ba838a050..ff70f3503 100644 --- a/telemetry/vpp-collectd/vpp/vpp.c +++ b/telemetry/vpp-collectd/vpp/vpp.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,26 +13,9 @@ * limitations under the License. */ -#if !HAVE_CONFIG_H -#include <stdlib.h> -#include <string.h> - -#ifndef __USE_ISOC99 /* required for NAN */ -#define DISABLE_ISOC99 1 -#define __USE_ISOC99 1 -#endif /* !defined(__USE_ISOC99) */ - -#if DISABLE_ISOC99 -#undef DISABLE_ISOC99 -#undef __USE_ISOC99 -#endif /* DISABLE_ISOC99 */ -#endif /* ! HAVE_CONFIG */ - -/* Keep order as it is */ -#include <config.h> -#include <collectd.h> -#include <common.h> -#include <plugin.h> +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" #define counter_t vpp_counter_t #include <vpp-api/client/stat_client.h> @@ -164,7 +147,6 @@ static data_set_t if_tx_broadcast_ds = { /**********************************************************/ /********** UTILITY FUNCTIONS *****************************/ /**********************************************************/ - /* * Utility function used by the read callback to populate a * value_list_t and pass it to plugin_dispatch_values. @@ -183,8 +165,7 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (tag != NULL) - sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); + if (tag != NULL) sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -247,7 +228,6 @@ static int get_data_set(const char *stat_name, data_set_t *data_set_ptr) { /**********************************************************/ /********** CALLBACK FUNCTIONS ****************************/ /**********************************************************/ - /* * This function is called for each configuration item. */ @@ -280,8 +260,7 @@ static int vpp_init(void) { u8 *stat_segment_name = (u8 *)STAT_SEGMENT_SOCKET_FILE; int ret = stat_segment_connect((char *)stat_segment_name); - if (ret) - plugin_log(LOG_ERR, "vpp plugin: connecting to segment failed"); + if (ret) plugin_log(LOG_ERR, "vpp plugin: connecting to segment failed"); return ret; } @@ -315,67 +294,65 @@ static int vpp_read(void) { /* Collect results for each interface and submit them */ for (int i = 0; i < vec_len(res); i++) { switch (res[i].type) { - case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: - for (int k = 0; k < vec_len(res[i].simple_counter_vec); k++) { - for (int j = 0; j < vec_len(res[i].simple_counter_vec[k]); j++) { - if (!interfaces[j]) { - continue; - } + case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: + for (int k = 0; k < vec_len(res[i].simple_counter_vec); k++) { + for (int j = 0; j < vec_len(res[i].simple_counter_vec[k]); j++) { + if (!interfaces[j]) { + continue; + } - if (get_data_set(res[i].name, &data_set)) { - continue; - } + if (get_data_set(res[i].name, &data_set)) { + continue; + } - value_t values[1] = { - (value_t){.derive = res[i].simple_counter_vec[k][j]} - }; + value_t values[1] = { + (value_t){.derive = res[i].simple_counter_vec[k][j]}}; - err = submit(interfaces[j], data_set.type, values, 1, ×tamp); + err = submit(interfaces[j], data_set.type, values, 1, ×tamp); - if (err) - goto END; + if (err) goto END; + } } - } - break; + break; - case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED: - for (int k = 0; k < vec_len(res[i].combined_counter_vec); k++) { - for (int j = 0; j < vec_len(res[i].combined_counter_vec[k]); j++) { - if (!interfaces[j]) { - continue; - } + case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED: + for (int k = 0; k < vec_len(res[i].combined_counter_vec); k++) { + for (int j = 0; j < vec_len(res[i].combined_counter_vec[k]); j++) { + if (!interfaces[j]) { + continue; + } - if (get_data_set(res[i].name, &data_set)) { - continue; - } + if (get_data_set(res[i].name, &data_set)) { + continue; + } - value_t values[2] = { - (value_t){.derive = res[i].combined_counter_vec[k][j].packets}, - (value_t){.derive = res[i].combined_counter_vec[k][j].bytes}, - }; + value_t values[2] = { + (value_t){.derive = res[i].combined_counter_vec[k][j].packets}, + (value_t){.derive = res[i].combined_counter_vec[k][j].bytes}, + }; - err = submit(interfaces[j], data_set.type, values, 2, ×tamp); + err = submit(interfaces[j], data_set.type, values, 2, ×tamp); - if (err) - goto END; + if (err) goto END; + } } - } - break; + break; - case STAT_DIR_TYPE_SCALAR_INDEX: - plugin_log(LOG_INFO, "vpp plugin: %.2f %s", res[i].scalar_value, - res[i].name); - break; + case STAT_DIR_TYPE_SCALAR_INDEX: + plugin_log(LOG_INFO, "vpp plugin: %.2f %s", res[i].scalar_value, + res[i].name); + break; - case STAT_DIR_TYPE_NAME_VECTOR: - break; + case STAT_DIR_TYPE_NAME_VECTOR: + break; - case STAT_DIR_TYPE_ERROR_INDEX: - break; + case STAT_DIR_TYPE_ERROR_INDEX: + break; - default: - plugin_log(LOG_WARNING, "vpp plugin: unknown stat type %d", res[i].type); - break; + default: + plugin_log(LOG_WARNING, "vpp plugin: unknown stat type %d", + res[i].type); + break; } } |