diff options
author | Luca Muscariello <lumuscar@cisco.com> | 2022-06-09 21:34:09 +0200 |
---|---|---|
committer | Luca Muscariello <muscariello@ieee.org> | 2022-06-30 10:47:50 +0200 |
commit | 6b94663b2455e212009a544ae23bb6a8c55407f8 (patch) | |
tree | 0af780ce5eeb1009fd24b8af8af08e8368eda3bd /telemetry | |
parent | a1ac96f497719b897793ac14b287cb8d840651c1 (diff) |
refactor(lib, hicn-light, vpp, hiperf): HICN-723
- move infra data structure into the shared lib
- new packet cache using double hashing and lookup on prefix suffix
- testing updates
- authenticated requests using interest manifests
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Enrico Loparco <eloparco@cisco.com>
Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'telemetry')
21 files changed, 1477 insertions, 1240 deletions
diff --git a/telemetry/.clang-format b/telemetry/.clang-format new file mode 100644 index 000000000..0043ea3d0 --- /dev/null +++ b/telemetry/.clang-format @@ -0,0 +1,3 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +BasedOnStyle: Google
\ No newline at end of file diff --git a/telemetry/CMakeLists.txt b/telemetry/CMakeLists.txt index eff855646..181e726f1 100644 --- a/telemetry/CMakeLists.txt +++ b/telemetry/CMakeLists.txt @@ -11,19 +11,68 @@ # See the License for the specific language governing permissions and # limitations under the License. - +############################################################## +# Project and cmake version +############################################################## cmake_minimum_required(VERSION 3.10 FATAL_ERROR) +project(telemetry) + + +############################################################## +# C Standard +############################################################## +set(CMAKE_C_STANDARD 11) + + +############################################################## +# Cmake modules +############################################################## +include("${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake") +set(CMAKE_MODULE_PATH + ${CMAKE_MODULE_PATH} + ${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules +) + + +############################################################## +# Libs and Bins names +############################################################## +set(COLLECTD_PLUGINS hicn-collectd-plugins CACHE INTERNAL "" FORCE) +set(HICN_LIGHT_TELEMETRY hicn_light) +set(KAFKA_TELEMETRY write_kafka_line_protocol) +set(VPP_TELEMETRY vpp) +set(VPP_HICN_TELEMETRY vpp_hicn) + ############################################################## -# Packaging and versioning +# Dependencies and third party libs ############################################################## +find_package(Collectd ${COLLECTD_DEFAULT_VERSION} REQUIRED) +add_subdirectory(third-party) + + +############################################################## +# Check if building as subproject or as root project +############################################################## +if(NOT (CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) AND + NOT (BUILD_HICNPLUGIN AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")) + return() +endif() +include(CommonSetup) + +# Include config.h in all collectd plugins +set(COLLECTD_COMPILER_OPTIONS -include config.h) + +# ############################################################## +# # Packaging and versioning +# ############################################################## include(${CMAKE_CURRENT_SOURCE_DIR}/../versions.cmake) +include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/packaging.cmake) ############################################################## # Subdirectories ############################################################## -if ((CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) OR - (BUILD_HICNPLUGIN AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")) - add_subdirectory(vpp-collectd) -endif () +add_subdirectory(hicn-light-collectd) +add_subdirectory(kafka-collectd) +add_subdirectory(vpp-collectd)
\ No newline at end of file diff --git a/telemetry/vpp-collectd/cmake/packaging.cmake b/telemetry/cmake/packaging.cmake index 49a617342..d5d2264ce 100644 --- a/telemetry/vpp-collectd/cmake/packaging.cmake +++ b/telemetry/cmake/packaging.cmake @@ -21,11 +21,11 @@ set(${COLLECTD_PLUGINS}_DESCRIPTION ) set(${COLLECTD_PLUGINS}_DEB_DEPENDENCIES - "collectd, hicn-plugin-dev (= stable_version)" + "collectd, hicn-plugin-dev (= stable_version) libhicnctrl (= stable_version) libyajl-dev" CACHE STRING "Dependencies for deb/rpm package." ) set(${COLLECTD_PLUGINS}_RPM_DEPENDENCIES - "collectd, hicn-plugin-dev = stable_version" + "collectd, hicn-plugin-dev (= stable_version) libhicnctrl (= stable_version) libyajl-dev" CACHE STRING "Dependencies for deb/rpm package." )
\ No newline at end of file diff --git a/telemetry/collectd.conf b/telemetry/collectd.conf new file mode 100644 index 000000000..225c161c6 --- /dev/null +++ b/telemetry/collectd.conf @@ -0,0 +1,54 @@ +############################################################################## +# Global # +############################################################################## +FQDNLookup true +#Interval 10 + +# Limit the size of the write queue. Default is no limit. Setting up a limit +# is recommended for servers handling a high volume of traffic. +#WriteQueueLimitHigh 1000000 +#WriteQueueLimitLow 800000 + +############################################################################## +# Logging # +############################################################################## +LoadPlugin logfile + +<Plugin logfile> + LogLevel "info" + File STDOUT + Timestamp true + PrintSeverity true +</Plugin> + +############################################################################## +# LoadPlugin section # +############################################################################## +LoadPlugin write_log + +<LoadPlugin hicn_light> + Globals true # Required to find libhicnctrl symbols + Interval 5 +</LoadPlugin> + +<LoadPlugin write_kafka_line_protocol> + Interval 10 +</LoadPlugin> + +############################################################################## +# Plugin configuration # +############################################################################## +<Plugin write_kafka_line_protocol> + Property "bootstrap.servers" "localhost:8081" + Property "security.protocol" "sasl_plaintext" + Property "sasl.mechanism" "SCRAM-SHA-256" + Property "sasl.username" "eloparco" + Property "sasl.password" "password" + + <Topic "stream"> + Format InfluxDB + </Topic> + # <Topic "metadata"> + # Format hicnJSON + # </Topic> +</Plugin> diff --git a/telemetry/data_model.h b/telemetry/data_model.h new file mode 100644 index 000000000..b1c4ae7e5 --- /dev/null +++ b/telemetry/data_model.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + */ + +#include "utils/common/common.h" + +#define KAFKA_TOPIC_KEY "_TOPIC" +#define KAFKA_STREAM_TOPIC "stream" +#define KAFKA_METADATA_TOPIC "metadata" + +/************** DATA SOURCES ******************************/ +data_source_t packets_dsrc[1] = { + {"packets", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t interests_dsrc[1] = { + {"interests", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t data_dsrc[1] = { + {"data", DS_TYPE_GAUGE, 0, NAN}, +}; + +data_source_t combined_dsrc[2] = { + {"packets", DS_TYPE_DERIVE, 0, NAN}, + {"bytes", DS_TYPE_DERIVE, 0, NAN}, +}; + +/************** DATA SETS NODE ****************************/ +data_set_t pkts_processed_ds = { + "pkts_processed", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_interest_count_ds = { + "pkts_interest_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_data_count_ds = { + "pkts_data_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_from_cache_count_ds = { + "pkts_from_cache_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pkts_no_pit_count_ds = { + "pkts_no_pit_count", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t pit_expired_count_ds = { + "pit_expired_count", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t cs_expired_count_ds = { + "cs_expired_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t cs_lru_count_ds = { + "cs_lru_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t pkts_drop_no_buf_ds = { + "pkts_drop_no_buf", + STATIC_ARRAY_SIZE(packets_dsrc), + packets_dsrc, +}; + +data_set_t interests_aggregated_ds = { + "interests_aggregated", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t interests_retx_ds = { + "interests_retx", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t interests_hash_collision_ds = { + "interests_hash_collision", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t pit_entries_count_ds = { + "pit_entries_count", + STATIC_ARRAY_SIZE(interests_dsrc), + interests_dsrc, +}; + +data_set_t cs_entries_count_ds = { + "cs_entries_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +data_set_t cs_entries_ntw_count_ds = { + "cs_entries_ntw_count", + STATIC_ARRAY_SIZE(data_dsrc), + data_dsrc, +}; + +/************** DATA SETS FACE ****************************/ +data_set_t irx_ds = { + "irx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t itx_ds = { + "itx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t drx_ds = { + "drx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; + +data_set_t dtx_ds = { + "dtx", + STATIC_ARRAY_SIZE(combined_dsrc), + combined_dsrc, +}; diff --git a/telemetry/hicn-light-collectd/CMakeLists.txt b/telemetry/hicn-light-collectd/CMakeLists.txt new file mode 100644 index 000000000..984d7076c --- /dev/null +++ b/telemetry/hicn-light-collectd/CMakeLists.txt @@ -0,0 +1,65 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +############################################################## +# Source files +############################################################## +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_light.c +) + + +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +find_package(Libhicn ${CURRENT_VERSION} REQUIRED NO_MODULE) +find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE) + +if (DISABLE_SHARED_LIBRARIES) + set(LIBTYPE static) +else() + set(LIBTYPE shared) +endif() + +list(APPEND LIBHICN_LIBRARIES hicn::hicn.${LIBTYPE}) +list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE}) + +list (APPEND LIBRARIES + PRIVATE ${LIBHICNCTRL_LIBRARIES} + PRIVATE ${LIBHICN_LIBRARIES} +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${HICN_LIGHT_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + DEPENDS ${DEPENDENCIES} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +)
\ No newline at end of file diff --git a/telemetry/hicn-light-collectd/hicn_light.c b/telemetry/hicn-light-collectd/hicn_light.c new file mode 100644 index 000000000..bb4eb571c --- /dev/null +++ b/telemetry/hicn-light-collectd/hicn_light.c @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + */ + +#define ntohll hicn_ntohll // Rename to avoid collision +#include <hicn/ctrl/api.h> +#include <hicn/ctrl/hicn-light-ng.h> +#include <hicn/util/sstrncpy.h> +#undef ntohll + +#include "../data_model.h" +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" + +#define PLUGIN_NAME "hicn_light" + +static hc_sock_t *s = NULL; + +static void submit(const char *type, value_t *values, size_t values_len, + meta_data_t *meta) { + assert(type != NULL && values != NULL && values_len != 0); + + value_list_t vl = {.values = values, .values_len = values_len}; + if (meta) vl.meta = meta; + + int rc = strcpy_s(vl.plugin, sizeof(vl.plugin), PLUGIN_NAME); + _ASSERT(rc == EOK); + rc = strcpy_s(vl.type, sizeof(vl.type), type); + _ASSERT(rc == EOK); + rc = strcpy_s(vl.host, sizeof(vl.host), hostname_g); + _ASSERT(rc == EOK); + + plugin_dispatch_values(&vl); +} + +static int read_forwarder_global_stats(hc_data_t **pdata, meta_data_t *meta) { + // Retrieve global stats from forwarder + int rc = hc_stats_get(s, pdata); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not read global stats from forwarder"); + return -1; + } + hicn_light_stats_t stats = *((hicn_light_stats_t *)(*pdata)->buffer); + + // Submit values + value_t values[1]; + values[0] = (value_t){.gauge = stats.forwarder.countReceived}; + submit(pkts_processed_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsReceived}; + submit(pkts_interest_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countObjectsReceived}; + submit(pkts_data_count_ds.type, values, 1, meta); + values[0] = + (value_t){.gauge = stats.forwarder.countInterestsSatisfiedFromStore}; + submit(pkts_from_cache_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDroppedNoReversePath}; + submit(pkts_no_pit_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsExpired}; + submit(pit_expired_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDataExpired}; + submit(cs_expired_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_lru_evictions}; + submit(cs_lru_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countDropped}; + submit(pkts_drop_no_buf_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsAggregated}; + submit(interests_aggregated_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.forwarder.countInterestsRetransmitted}; + submit(interests_retx_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_pit_entries}; + submit(pit_entries_count_ds.type, values, 1, meta); + values[0] = (value_t){.gauge = stats.pkt_cache.n_cs_entries}; + submit(cs_entries_count_ds.type, values, 1, meta); + + return 0; +} + +static int read_forwarder_per_face_stats(hc_data_t **pdata, meta_data_t *meta) { + // Retrieve per-face stats from forwarder + int rc = hc_stats_list(s, pdata); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not read face stats from forwarder"); + return -1; + } + hc_data_t *data = *pdata; + cmd_stats_list_item_t *conn_stats = (cmd_stats_list_item_t *)data->buffer; + cmd_stats_list_item_t *end = + (cmd_stats_list_item_t *)(data->buffer + + data->size * data->out_element_size); + + // Submit values + while (conn_stats < end) { + rc = meta_data_add_unsigned_int(meta, "face_id", conn_stats->id); + assert(rc == 0); + + value_t values[2]; + values[0] = (value_t){.derive = conn_stats->stats.interests.rx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.interests.rx_bytes}; + submit(irx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.interests.tx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.interests.tx_bytes}; + submit(itx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.data.rx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.data.rx_bytes}; + submit(drx_ds.type, values, 2, meta); + values[0] = (value_t){.derive = conn_stats->stats.data.tx_pkts}; + values[1] = (value_t){.derive = conn_stats->stats.data.tx_bytes}; + submit(dtx_ds.type, values, 2, meta); + + conn_stats++; + } + + return 0; +} + +static int read_forwarder_stats() { + // Create metadata + meta_data_t *meta = meta_data_create(); + int rc = meta_data_add_string(meta, KAFKA_TOPIC_KEY, KAFKA_STREAM_TOPIC); + assert(rc == 0); + + hc_data_t *data = NULL; + rc = read_forwarder_global_stats(&data, meta); + if (rc < 0) goto READ_ERROR; + rc = read_forwarder_per_face_stats(&data, meta); + +READ_ERROR: + meta_data_destroy(meta); + hc_data_free(data); + return rc; +} + +static int connect_to_forwarder() { + plugin_log(LOG_INFO, "Connecting to forwarder"); + s = hc_sock_create_forwarder(HICNLIGHT_NG); + if (!s) { + plugin_log(LOG_ERR, "Could not create socket"); + return -1; + } + + int rc = hc_sock_connect(s); + if (rc < 0) { + plugin_log(LOG_ERR, "Could not establish connection to forwarder"); + hc_sock_free(s); + s = NULL; + return -1; + } + + return 0; +} + +static int disconnect_from_forwarder() { + plugin_log(LOG_INFO, "Disconnecting from forwarder"); + + if (s == NULL) { + plugin_log(LOG_ERR, "Forwarder not connected"); + return -1; + } + + hc_command_t command = {0}; + command.object.connection.id = 0; + int rc = strcpy_s(command.object.connection.name, + sizeof(command.object.connection.name), "SELF"); + if (rc != EOK || hc_connection_delete(s, &command.object.connection) < 0) { + rc = -1; + plugin_log(LOG_ERR, "Error removing local connection to forwarder"); + } + + hc_sock_free(s); + return rc; +} + +void module_register() { + // Data sets + plugin_register_data_set(&pkts_processed_ds); + plugin_register_data_set(&pkts_interest_count_ds); + plugin_register_data_set(&pkts_data_count_ds); + plugin_register_data_set(&pkts_from_cache_count_ds); + plugin_register_data_set(&pkts_no_pit_count_ds); + plugin_register_data_set(&pit_expired_count_ds); + plugin_register_data_set(&cs_expired_count_ds); + plugin_register_data_set(&cs_lru_count_ds); + plugin_register_data_set(&pkts_drop_no_buf_ds); + plugin_register_data_set(&interests_aggregated_ds); + plugin_register_data_set(&interests_retx_ds); + plugin_register_data_set(&interests_hash_collision_ds); + plugin_register_data_set(&pit_entries_count_ds); + plugin_register_data_set(&cs_entries_count_ds); + plugin_register_data_set(&cs_entries_ntw_count_ds); + plugin_register_data_set(&irx_ds); + plugin_register_data_set(&itx_ds); + plugin_register_data_set(&drx_ds); + plugin_register_data_set(&dtx_ds); + + // Callbacks + plugin_register_init(PLUGIN_NAME, connect_to_forwarder); + plugin_register_read(PLUGIN_NAME, read_forwarder_stats); + plugin_register_shutdown(PLUGIN_NAME, disconnect_from_forwarder); +}
\ No newline at end of file diff --git a/telemetry/kafka-collectd/CMakeLists.txt b/telemetry/kafka-collectd/CMakeLists.txt new file mode 100644 index 000000000..f1ff81117 --- /dev/null +++ b/telemetry/kafka-collectd/CMakeLists.txt @@ -0,0 +1,62 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +############################################################## +# Source files +############################################################## +file(GLOB_RECURSE COLLECTD_UTILS_SOURCES "${THIRD_PARTY_INCLUDE_DIRS}/utils/cmds/*.c") + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/write_kafka_line_protocol.c + ${THIRD_PARTY_INCLUDE_DIRS}/utils/format_json/format_json.c + ${THIRD_PARTY_INCLUDE_DIRS}/utils/format_graphite/format_graphite.c + ${CMAKE_CURRENT_SOURCE_DIR}/format_influxdb.c + ${COLLECTD_UTILS_SOURCES} +) + + +############################################################## +# Include dirs +############################################################## +list(APPEND INCLUDE_DIRS + PRIVATE ${COLLECTD_INCLUDE_DIRS} + PRIVATE ${THIRD_PARTY_INCLUDE_DIRS} +) + + +############################################################## +# Libraries +############################################################## +find_package(RdKafka ${RDKAFKA_DEFAULT_VERSION} REQUIRED) +find_library(YAJL_LIB libyajl.so REQUIRED) + +list (APPEND LIBRARIES + ${YAJL_LIB} + ${RdKafka_LIBRARY_PATH} +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${KAFKA_TELEMETRY} + SHARED + EMPTY_PREFIX + SOURCES ${SOURCE_FILES} + LINK_LIBRARIES ${LIBRARIES} + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} + DEPENDS ${DEPENDENCIES} + LINK_FLAGS ${LINK_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/telemetry/kafka-collectd/format_influxdb.c b/telemetry/kafka-collectd/format_influxdb.c new file mode 100644 index 000000000..f7ff29501 --- /dev/null +++ b/telemetry/kafka-collectd/format_influxdb.c @@ -0,0 +1,190 @@ +/** + * collectd - src/utils_format_influxdb.c + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster <octo at collectd.org> + * Aman Gupta <aman at tmm1.net> + * Carlos Peon Costa <carlospeon at gmail.com> + * multiple Server directives by: + * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com> + **/ + +#include "format_influxdb.h" + +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" +#include "utils/metadata/meta_data.h" +#include "utils_cache.h" + +static int format_influxdb_escape_string(char *buffer, size_t buffer_size, + const char *string) { + if ((buffer == NULL) || (string == NULL)) return -EINVAL; + + if (buffer_size < 3) return -ENOMEM; + + int dst_pos = 0; + +#define BUFFER_ADD(c) \ + do { \ + if (dst_pos >= (buffer_size - 1)) { \ + buffer[buffer_size - 1] = '\0'; \ + return -ENOMEM; \ + } \ + buffer[dst_pos] = (c); \ + dst_pos++; \ + } while (0) + + /* Escape special characters */ + for (int src_pos = 0; string[src_pos] != 0; src_pos++) { + if ((string[src_pos] == '\\') || (string[src_pos] == ' ') || + (string[src_pos] == ',') || (string[src_pos] == '=') || + (string[src_pos] == '"')) { + BUFFER_ADD('\\'); + BUFFER_ADD(string[src_pos]); + } else + BUFFER_ADD(string[src_pos]); + } /* for */ + buffer[dst_pos] = 0; + +#undef BUFFER_ADD + + return dst_pos; +} /* int format_influxdb_escape_string */ + +int format_influxdb_value_list( + char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl, + bool store_rates, format_influxdb_time_precision_t time_precision) { + int status; + int offset = 0; + gauge_t *rates = NULL; + bool have_values = false; + + assert(0 == strcmp(ds->type, vl->type)); + +#define BUFFER_ADD_ESCAPE(...) \ + do { \ + status = format_influxdb_escape_string(buffer + offset, \ + buffer_len - offset, __VA_ARGS__); \ + if (status < 0) return status; \ + offset += status; \ + } while (0) + +#define BUFFER_ADD(...) \ + do { \ + status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__); \ + if ((status < 0) || (status >= (buffer_len - offset))) { \ + sfree(rates); \ + return -ENOMEM; \ + } \ + offset += status; \ + } while (0) + + assert(vl->type); + BUFFER_ADD_ESCAPE(vl->type); + BUFFER_ADD(",host="); + BUFFER_ADD_ESCAPE(vl->host); + if (vl->meta) { + char **toc; + int n = meta_data_toc(vl->meta, &toc); + + for (int i = 0; i < n; i++) { + char *key = toc[i]; + char *value; + + if (meta_data_as_string(vl->meta, key, &value) == 0) { + BUFFER_ADD(","); + BUFFER_ADD_ESCAPE(key); + BUFFER_ADD("="); + BUFFER_ADD_ESCAPE(value); + free(value); + } + free(toc[i]); + } + + if (n != 0) free(toc); + } + + BUFFER_ADD(" "); + for (size_t i = 0; i < ds->ds_num; i++) { + if ((ds->ds[i].type != DS_TYPE_COUNTER) && + (ds->ds[i].type != DS_TYPE_GAUGE) && + (ds->ds[i].type != DS_TYPE_DERIVE) && + (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + sfree(rates); + return -EINVAL; + } + + if (ds->ds[i].type == DS_TYPE_GAUGE) { + if (isnan(vl->values[i].gauge)) continue; + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge); + have_values = true; + } else if (store_rates) { + if (rates == NULL) rates = uc_get_rate(ds, vl); + if (rates == NULL) { + WARNING( + "format_influxdb: " + "uc_get_rate failed."); + return -EINVAL; + } + if (isnan(rates[i])) continue; + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_COUNTER) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, + (uint64_t)vl->values[i].counter); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_DERIVE) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive); + have_values = true; + } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) { + if (have_values) BUFFER_ADD(","); + BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute); + have_values = true; + } + + } /* for ds->ds_num */ + sfree(rates); + + if (!have_values) return 0; + + uint64_t influxdb_time = 0; + switch (time_precision) { + case NS: + influxdb_time = CDTIME_T_TO_NS(vl->time); + break; + case US: + influxdb_time = CDTIME_T_TO_US(vl->time); + break; + case MS: + influxdb_time = CDTIME_T_TO_MS(vl->time); + break; + } + + BUFFER_ADD(" %" PRIu64 "\n", influxdb_time); + +#undef BUFFER_ADD_ESCAPE +#undef BUFFER_ADD + + return offset; +} /* int format_influxdb_value_list */ diff --git a/telemetry/kafka-collectd/format_influxdb.h b/telemetry/kafka-collectd/format_influxdb.h new file mode 100644 index 000000000..fd298f11b --- /dev/null +++ b/telemetry/kafka-collectd/format_influxdb.h @@ -0,0 +1,45 @@ +/** + * collectd - src/utils_format_influxdb.h + * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2009 Aman Gupta + * Copyright (C) 2019 Carlos Peon Costa + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster <octo at collectd.org> + * Aman Gupta <aman at tmm1.net> + * Carlos Peon Costa <carlospeon at gmail.com> + * multiple Server directives by: + * Paul (systemcrash) <newtwen thatfunny_at_symbol gmail.com> + **/ + +#ifndef UTILS_FORMAT_INFLUXDB_H +#define UTILS_FORMAT_INFLUXDB_H 1 + +#include "collectd.h" +#include "plugin.h" + +typedef enum { + NS, + US, + MS, +} format_influxdb_time_precision_t; + +int format_influxdb_value_list(char *buffer, int buffer_len, + const data_set_t *ds, const value_list_t *vl, + bool store_rates, + format_influxdb_time_precision_t time_precision); + +#endif /* UTILS_FORMAT_INFLUXDB_H */ diff --git a/telemetry/kafka-collectd/write_kafka_line_protocol.c b/telemetry/kafka-collectd/write_kafka_line_protocol.c new file mode 100644 index 000000000..5eb7b520d --- /dev/null +++ b/telemetry/kafka-collectd/write_kafka_line_protocol.c @@ -0,0 +1,526 @@ +/** + * collectd - src/write_kafka.c + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard <pyr at spootnik.org> + */ + +#include <errno.h> +#include <librdkafka/rdkafka.h> +#include <stdint.h> + +#include "../data_model.h" +#include "collectd.h" +#include "format_influxdb.h" +#include "plugin.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" +#include "utils_random.h" + +struct kafka_topic_context { +#define KAFKA_FORMAT_JSON 0 +#define KAFKA_FORMAT_COMMAND 1 +#define KAFKA_FORMAT_GRAPHITE 2 +#define KAFKA_FORMAT_INFLUXDB 3 + uint8_t format; + unsigned int graphite_flags; + bool store_rates; + rd_kafka_topic_conf_t *conf; + rd_kafka_topic_t *topic; + rd_kafka_conf_t *kafka_conf; + rd_kafka_t *kafka; + char *key; + char *prefix; + char *postfix; + char escape_char; + char *topic_name; + pthread_mutex_t lock; +}; + +static int kafka_handle(struct kafka_topic_context *); +static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); +static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, + int32_t, void *, void *); + +/* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of + * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the + * deprecated function. */ +#ifdef HAVE_LIBRDKAFKA_LOG_CB +#undef HAVE_LIBRDKAFKA_LOGGER +#endif + +#if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB) +static void kafka_log(const rd_kafka_t *, int, const char *, const char *); + +static void kafka_log(const rd_kafka_t *rkt, int level, const char *fac, + const char *msg) { + plugin_log(level, "%s", msg); +} +#endif + +static rd_kafka_resp_err_t kafka_error() { +#if RD_KAFKA_VERSION >= 0x000b00ff + return rd_kafka_last_error(); +#else + return rd_kafka_errno2err(errno); +#endif +} + +static uint32_t kafka_hash(const char *keydata, size_t keylen) { + uint32_t hash = 5381; + for (; keylen > 0; keylen--) + hash = ((hash << 5) + hash) + keydata[keylen - 1]; + return hash; +} + +/* 31 bit -> 4 byte -> 8 byte hex string + null byte */ +#define KAFKA_RANDOM_KEY_SIZE 9 +#define KAFKA_RANDOM_KEY_BUFFER \ + (char[KAFKA_RANDOM_KEY_SIZE]) { "" } +static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) { + ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u()); + return buffer; +} + +static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, + size_t keylen, int32_t partition_cnt, void *p, + void *m) { + uint32_t key = kafka_hash(keydata, keylen); + uint32_t target = key % partition_cnt; + int32_t i = partition_cnt; + + while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { + target = (target + 1) % partition_cnt; + } + return target; +} + +static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */ +{ + char errbuf[1024]; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + + if (ctx->kafka != NULL && ctx->topic != NULL) return 0; + + if (ctx->kafka == NULL) { + if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) { + ERROR("write_kafka plugin: cannot duplicate kafka config"); + return 1; + } + + if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, + sizeof(errbuf))) == NULL) { + ERROR("write_kafka plugin: cannot create kafka handle."); + return 1; + } + + rd_kafka_conf_destroy(ctx->kafka_conf); + ctx->kafka_conf = NULL; + + INFO("write_kafka plugin: created KAFKA handle : %s", + rd_kafka_name(ctx->kafka)); + +#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB) + rd_kafka_set_logger(ctx->kafka, kafka_log); +#endif + } + + if (ctx->topic == NULL) { + if ((topic_conf = rd_kafka_topic_conf_dup(ctx->conf)) == NULL) { + ERROR("write_kafka plugin: cannot duplicate kafka topic config"); + return 1; + } + + if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name, + topic_conf)) == NULL) { + ERROR("write_kafka plugin: cannot create topic : %s\n", + rd_kafka_err2str(kafka_error())); + return errno; + } + + rd_kafka_topic_conf_destroy(ctx->conf); + ctx->conf = NULL; + + INFO("write_kafka plugin: handle created for topic : %s", + rd_kafka_topic_name(ctx->topic)); + } + + return 0; + +} /* }}} int kafka_handle */ + +static int kafka_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, user_data_t *ud) { + int status = 0; + void *key; + size_t keylen = 0; + char buffer[8192]; + size_t bfree = sizeof(buffer); + size_t bfill = 0; + size_t blen = 0; + struct kafka_topic_context *ctx = ud->data; + + if ((ds == NULL) || (vl == NULL) || (ctx == NULL)) return EINVAL; + + pthread_mutex_lock(&ctx->lock); + status = kafka_handle(ctx); + pthread_mutex_unlock(&ctx->lock); + if (status != 0) return status; + + bzero(buffer, sizeof(buffer)); + + switch (ctx->format) { + case KAFKA_FORMAT_COMMAND: + status = cmd_create_putval(buffer, sizeof(buffer), ds, vl); + if (status != 0) { + ERROR("write_kafka plugin: cmd_create_putval failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_JSON: + format_json_initialize(buffer, &bfill, &bfree); + format_json_value_list(buffer, &bfill, &bfree, ds, vl, ctx->store_rates); + format_json_finalize(buffer, &bfill, &bfree); + blen = strlen(buffer); + break; + case KAFKA_FORMAT_GRAPHITE: + status = + format_graphite(buffer, sizeof(buffer), ds, vl, ctx->prefix, + ctx->postfix, ctx->escape_char, ctx->graphite_flags); + if (status != 0) { + ERROR("write_kafka plugin: format_graphite failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_INFLUXDB: { + // Decide format depending on the topic + // (to handle multilple topics w/ different formats); + // Comment this part to use the stream topic as default for other + // collectd plugins (e.g. cpu, mem) + char *topic = NULL; + int rc = meta_data_get_string(vl->meta, KAFKA_TOPIC_KEY, &topic); + if (rc != 0 || topic == NULL) return 0; + if (strcasecmp(KAFKA_STREAM_TOPIC, topic) != 0) { + free(topic); + return 0; + } + meta_data_delete(vl->meta, KAFKA_TOPIC_KEY); + + status = + format_influxdb_value_list(buffer, sizeof(buffer), ds, vl, false, NS); + if (status <= 0) { + ERROR("write_kafka plugin: format_influxdb failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + + // Print without newline + buffer[blen - 1] = 0; + INFO("%s", buffer); + buffer[blen - 1] = '\n'; + + break; + } + default: + ERROR("write_kafka plugin: invalid format %i.", ctx->format); + return -1; + } + + key = + (ctx->key != NULL) ? ctx->key : kafka_random_key(KAFKA_RANDOM_KEY_BUFFER); + keylen = strlen(key); + + rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + buffer, blen, key, keylen, NULL); + + return status; +} /* }}} int kafka_write */ + +static void kafka_topic_context_free(void *p) /* {{{ */ +{ + struct kafka_topic_context *ctx = p; + + if (ctx == NULL) return; + + if (ctx->topic_name != NULL) sfree(ctx->topic_name); + if (ctx->topic != NULL) rd_kafka_topic_destroy(ctx->topic); + if (ctx->conf != NULL) rd_kafka_topic_conf_destroy(ctx->conf); + if (ctx->kafka_conf != NULL) rd_kafka_conf_destroy(ctx->kafka_conf); + if (ctx->kafka != NULL) rd_kafka_destroy(ctx->kafka); + + sfree(ctx); +} /* }}} void kafka_topic_context_free */ + +static void kafka_config_topic(rd_kafka_conf_t *conf, + oconfig_item_t *ci) /* {{{ */ +{ + int status; + struct kafka_topic_context *tctx; + char *key = NULL; + char *val; + char callback_name[DATA_MAX_NAME_LEN]; + char errbuf[1024]; + oconfig_item_t *child; + rd_kafka_conf_res_t ret; + + if ((tctx = calloc(1, sizeof(*tctx))) == NULL) { + ERROR("write_kafka plugin: calloc failed."); + return; + } + + tctx->escape_char = '.'; + tctx->store_rates = true; + tctx->format = KAFKA_FORMAT_JSON; + tctx->key = NULL; + + if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) { + sfree(tctx); + ERROR("write_kafka plugin: cannot allocate memory for kafka config"); + return; + } + +#ifdef HAVE_LIBRDKAFKA_LOG_CB + rd_kafka_conf_set_log_cb(tctx->kafka_conf, kafka_log); +#endif + + if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) { + rd_kafka_conf_destroy(tctx->kafka_conf); + sfree(tctx); + ERROR("write_kafka plugin: cannot create topic configuration."); + return; + } + + if (ci->values_num != 1) { + WARNING("kafka topic name needed."); + goto errout; + } + + if (ci->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("kafka topic needs a string argument."); + goto errout; + } + + if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) { + ERROR("write_kafka plugin: cannot copy topic name."); + goto errout; + } + + for (int i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp("Property", child->key) == 0) { + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + key = child->values[0].value.string; + val = child->values[1].value.string; + ret = + rd_kafka_topic_conf_set(tctx->conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka topic property %s to %s: %s.", key, val, + errbuf); + goto errout; + } + + } else if (strcasecmp("Key", child->key) == 0) { + if (cf_util_get_string(child, &tctx->key) != 0) continue; + if (strcasecmp("Random", tctx->key) == 0) { + sfree(tctx->key); + tctx->key = strdup(kafka_random_key(KAFKA_RANDOM_KEY_BUFFER)); + } + } else if (strcasecmp("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) goto errout; + + assert(key != NULL); + + if (strcasecmp(key, "Command") == 0) { + tctx->format = KAFKA_FORMAT_COMMAND; + + } else if (strcasecmp(key, "Graphite") == 0) { + tctx->format = KAFKA_FORMAT_GRAPHITE; + + } else if (strcasecmp(key, "Json") == 0) { + tctx->format = KAFKA_FORMAT_JSON; + + } else if (strcasecmp(key, "InfluxDB") == 0) { + tctx->format = KAFKA_FORMAT_INFLUXDB; + + } else { + WARNING("write_kafka plugin: Invalid format string: %s", key); + } + + sfree(key); + + } else if (strcasecmp("StoreRates", child->key) == 0) { + status = cf_util_get_boolean(child, &tctx->store_rates); + (void)cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_STORE_RATES); + + } else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + + } else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + + } else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0) { + status = cf_util_get_flag(child, &tctx->graphite_flags, + GRAPHITE_PRESERVE_SEPARATOR); + + } else if (strcasecmp("GraphiteUseTags", child->key) == 0) { + status = + cf_util_get_flag(child, &tctx->graphite_flags, GRAPHITE_USE_TAGS); + + } else if (strcasecmp("GraphitePrefix", child->key) == 0) { + status = cf_util_get_string(child, &tctx->prefix); + } else if (strcasecmp("GraphitePostfix", child->key) == 0) { + status = cf_util_get_string(child, &tctx->postfix); + } else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string(child, &tmp_buff); + if (strlen(tmp_buff) > 1) + WARNING( + "write_kafka plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + tctx->escape_char = tmp_buff[0]; + sfree(tmp_buff); + } else { + WARNING("write_kafka plugin: Invalid directive: %s.", child->key); + } + + if (status != 0) break; + } + + rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition); + rd_kafka_topic_conf_set_opaque(tctx->conf, tctx); + + ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s", + tctx->topic_name); + + status = plugin_register_write(callback_name, kafka_write, + &(user_data_t){ + .data = tctx, + .free_func = kafka_topic_context_free, + }); + if (status != 0) { + WARNING( + "write_kafka plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + callback_name, status); + goto errout; + } + + pthread_mutex_init(&tctx->lock, /* attr = */ NULL); + + return; +errout: + if (tctx->topic_name != NULL) free(tctx->topic_name); + if (tctx->conf != NULL) rd_kafka_topic_conf_destroy(tctx->conf); + if (tctx->kafka_conf != NULL) rd_kafka_conf_destroy(tctx->kafka_conf); + sfree(tctx); +} /* }}} int kafka_config_topic */ + +static int kafka_config(oconfig_item_t *ci) /* {{{ */ +{ + oconfig_item_t *child; + rd_kafka_conf_t *conf; + rd_kafka_conf_res_t ret; + char errbuf[1024]; + + if ((conf = rd_kafka_conf_new()) == NULL) { + WARNING("cannot allocate kafka configuration."); + return -1; + } + for (int i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp("Topic", child->key) == 0) { + kafka_config_topic(conf, child); + } else if (strcasecmp(child->key, "Property") == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + goto errout; + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + sfree(key); + goto errout; + } + ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka property %s to %s: %s", key, val, errbuf); + sfree(key); + sfree(val); + goto errout; + } + sfree(key); + sfree(val); + } else { + WARNING( + "write_kafka plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + if (conf != NULL) rd_kafka_conf_destroy(conf); + return 0; +errout: + if (conf != NULL) rd_kafka_conf_destroy(conf); + return -1; +} /* }}} int kafka_config */ + +void module_register(void) { + plugin_register_complex_config("write_kafka_line_protocol", kafka_config); +} diff --git a/telemetry/third-party/CMakeLists.txt b/telemetry/third-party/CMakeLists.txt new file mode 100644 index 000000000..26d305d62 --- /dev/null +++ b/telemetry/third-party/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright (c) 2022 Cisco and/or its affiliates. + +set(THIRD_PARTY_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}) + +include(FetchContent) +set(FETCHCONTENT_QUIET off) +FetchContent_Declare(collectd + URL https://github.com/collectd/collectd/archive/refs/tags/collectd-${COLLECTD_VERSION}.zip + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" +) +FetchContent_Populate(collectd) + +list(APPEND THIRD_PARTY_INCLUDE_DIRS + ${collectd_SOURCE_DIR}/src +) +set(THIRD_PARTY_INCLUDE_DIRS ${THIRD_PARTY_INCLUDE_DIRS} PARENT_SCOPE) diff --git a/telemetry/vpp-collectd/CMakeLists.txt b/telemetry/vpp-collectd/CMakeLists.txt index 54a1a4b76..4ce357f63 100644 --- a/telemetry/vpp-collectd/CMakeLists.txt +++ b/telemetry/vpp-collectd/CMakeLists.txt @@ -11,34 +11,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -project(hicn-collectd-plugins) - - -############################################################## -# CMake Modules -############################################################## -set(CMAKE_MODULE_PATH - ${CMAKE_MODULE_PATH} - ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake/Modules/) - - ############################################################## # Dependencies ############################################################## find_package(Vpp ${VPP_DEFAULT_VERSION} REQUIRED) -find_package(Collectd ${COLLECTD_DEFAULT_VERSION} REQUIRED) - - -############################################################## -# Libs and bins names -############################################################## -set(COLLECTD_PLUGINS hicn-collectd-plugins) - - -############################################################## -# Packaging -############################################################## -include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/packaging.cmake) ############################################################## diff --git a/telemetry/vpp-collectd/common/README.md b/telemetry/vpp-collectd/common/README.md deleted file mode 100644 index e3b9c74f6..000000000 --- a/telemetry/vpp-collectd/common/README.md +++ /dev/null @@ -1,12 +0,0 @@ -# Headers for collectd plugins - -These headers are required for plugin development but are not shipped with the -`collectd` Ubuntu 20.04 package (as of May 2021): - -* [common.h](https://github.com/collectd/collectd/blob/main/src/utils/common/common.h) -* [plugin.h](https://github.com/collectd/collectd/blob/main/src/daemon/plugin.h) -* [meta_data.h](https://github.com/collectd/collectd/blob/main/src/utils/metadata/meta_data.h) - -Related issues: -* [GitHub](https://github.com/collectd/collectd/issues/3881) -* [Ubuntu](https://bugs.launchpad.net/ubuntu/+source/collectd/+bug/1929079) diff --git a/telemetry/vpp-collectd/common/common.h b/telemetry/vpp-collectd/common/common.h deleted file mode 100644 index fce2d12bb..000000000 --- a/telemetry/vpp-collectd/common/common.h +++ /dev/null @@ -1,405 +0,0 @@ -/** - * collectd - src/common.h - * Copyright (C) 2005-2014 Florian octo Forster - * - * Permission is hereby granted, free of charge, to any person obtaining a - * copy of this software and associated documentation files (the "Software"), - * to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, - * and/or sell copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER - * DEALINGS IN THE SOFTWARE. - * - * Authors: - * Florian octo Forster <octo at collectd.org> - * Niki W. Waibel <niki.waibel@gmx.net> - **/ - -#ifndef COMMON_H -#define COMMON_H - -#include "collectd.h" - -#include "plugin.h" - -#if HAVE_PWD_H -#include <pwd.h> -#endif - -#define sfree(ptr) \ - do { \ - free(ptr); \ - (ptr) = NULL; \ - } while (0) - -#define STATIC_ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a))) - -#define IS_TRUE(s) \ - ((strcasecmp("true", (s)) == 0) || (strcasecmp("yes", (s)) == 0) || \ - (strcasecmp("on", (s)) == 0)) -#define IS_FALSE(s) \ - ((strcasecmp("false", (s)) == 0) || (strcasecmp("no", (s)) == 0) || \ - (strcasecmp("off", (s)) == 0)) - -struct rate_to_value_state_s { - value_t last_value; - cdtime_t last_time; - gauge_t residual; -}; -typedef struct rate_to_value_state_s rate_to_value_state_t; - -struct value_to_rate_state_s { - value_t last_value; - cdtime_t last_time; -}; -typedef struct value_to_rate_state_s value_to_rate_state_t; - -char *sstrncpy(char *dest, const char *src, size_t n); - -__attribute__((format(printf, 3, 4))) int ssnprintf(char *str, size_t size, - char const *format, ...); - -__attribute__((format(printf, 1, 2))) char *ssnprintf_alloc(char const *format, - ...); - -char *sstrdup(const char *s); -size_t sstrnlen(const char *s, size_t n); -char *sstrndup(const char *s, size_t n); -void *smalloc(size_t size); -char *sstrerror(int errnum, char *buf, size_t buflen); - -#ifndef ERRBUF_SIZE -#define ERRBUF_SIZE 256 -#endif - -#define STRERROR(e) sstrerror((e), (char[ERRBUF_SIZE]){0}, ERRBUF_SIZE) -#define STRERRNO STRERROR(errno) - -/* - * NAME - * sread - * - * DESCRIPTION - * Reads exactly `n' bytes or fails. Syntax and other behavior is analogous - * to `read(2)'. - * - * PARAMETERS - * `fd' File descriptor to write to. - * `buf' Buffer that is to be written. - * `count' Number of bytes in the buffer. - * - * RETURN VALUE - * Zero upon success or non-zero if an error occurred. `errno' is set in this - * case. - */ -int sread(int fd, void *buf, size_t count); - -/* - * NAME - * swrite - * - * DESCRIPTION - * Writes exactly `n' bytes or fails. Syntax and other behavior is analogous - * to `write(2)'. - * - * PARAMETERS - * `fd' File descriptor to write to. - * `buf' Buffer that is to be written. - * `count' Number of bytes in the buffer. - * - * RETURN VALUE - * Zero upon success or non-zero if an error occurred. `errno' is set in this - * case. - */ -int swrite(int fd, const void *buf, size_t count); - -/* - * NAME - * strsplit - * - * DESCRIPTION - * Splits a string into parts and stores pointers to the parts in `fields'. - * The characters split at are: " ", "\t", "\r", and "\n". - * - * PARAMETERS - * `string' String to split. This string will be modified. `fields' will - * contain pointers to parts of this string, so free'ing it - * will destroy `fields' as well. - * `fields' Array of strings where pointers to the parts will be stored. - * `size' Number of elements in the array. No more than `size' - * pointers will be stored in `fields'. - * - * RETURN VALUE - * Returns the number of parts stored in `fields'. - */ -int strsplit(char *string, char **fields, size_t size); - -/* - * NAME - * strjoin - * - * DESCRIPTION - * Joins together several parts of a string using `sep' as a separator. This - * is equivalent to the Perl built-in `join'. - * - * PARAMETERS - * `dst' Buffer where the result is stored. Can be NULL if you need to - * determine the required buffer size only. - * `dst_len' Length of the destination buffer. No more than this many - * bytes will be written to the memory pointed to by `dst', - * including the trailing null-byte. Must be zero if dst is - * NULL. - * `fields' Array of strings to be joined. - * `fields_num' Number of elements in the `fields' array. - * `sep' String to be inserted between any two elements of `fields'. - * This string is neither prepended nor appended to the result. - * Instead of passing "" (empty string) one can pass NULL. - * - * RETURN VALUE - * Returns the number of characters in the resulting string, excluding a - * tailing null byte. If this value is greater than or equal to "dst_len", the - * result in "dst" is truncated (but still null terminated). On error a - * negative value is returned. - */ -int strjoin(char *dst, size_t dst_len, char **fields, size_t fields_num, - const char *sep); - -/* - * NAME - * escape_slashes - * - * DESCRIPTION - * Removes slashes ("/") from "buffer". If buffer contains a single slash, - * the result will be "root". Leading slashes are removed. All other slashes - * are replaced with underscores ("_"). - * This function is used by plugin_dispatch_values() to escape all parts of - * the identifier. - * - * PARAMETERS - * `buffer' String to be escaped. - * `buffer_size' Size of the buffer. No more then this many bytes will be - * written to `buffer', including the trailing null-byte. - * - * RETURN VALUE - * Returns zero upon success and a value smaller than zero upon failure. - */ -int escape_slashes(char *buffer, size_t buffer_size); - -/** - * NAME - * escape_string - * - * DESCRIPTION - * escape_string quotes and escapes a string to be usable with collectd's - * plain text protocol. "simple" strings are left as they are, for example if - * buffer is 'simple' before the call, it will remain 'simple'. However, if - * buffer contains 'more "complex"' before the call, the returned buffer will - * contain '"more \"complex\""'. - * - * If the buffer is too small to contain the escaped string, the string will - * be truncated. However, leading and trailing double quotes, as well as an - * ending null byte are guaranteed. - * - * RETURN VALUE - * Returns zero on success, even if the string was truncated. Non-zero on - * failure. - */ -int escape_string(char *buffer, size_t buffer_size); - -/* - * NAME - * replace_special - * - * DESCRIPTION - * Replaces any special characters (anything that's not alpha-numeric or a - * dash) with an underscore. - * - * E.g. "foo$bar&" would become "foo_bar_". - * - * PARAMETERS - * `buffer' String to be handled. - * `buffer_size' Length of the string. The function returns after - * encountering a null-byte or reading this many bytes. - */ -void replace_special(char *buffer, size_t buffer_size); - -/* - * NAME - * strunescape - * - * DESCRIPTION - * Replaces any escaped characters in a string with the appropriate special - * characters. The following escaped characters are recognized: - * - * \t -> <tab> - * \n -> <newline> - * \r -> <carriage return> - * - * For all other escacped characters only the backslash will be removed. - * - * PARAMETERS - * `buf' String to be unescaped. - * `buf_len' Length of the string, including the terminating null-byte. - * - * RETURN VALUE - * Returns zero upon success, a value less than zero else. - */ -int strunescape(char *buf, size_t buf_len); - -/** - * Removed trailing newline characters (CR and LF) from buffer, which must be - * null terminated. Returns the length of the resulting string. - */ -__attribute__((nonnull(1))) size_t strstripnewline(char *buffer); - -/* - * NAME - * timeval_cmp - * - * DESCRIPTION - * Compare the two time values `tv0' and `tv1' and store the absolut value - * of the difference in the time value pointed to by `delta' if it does not - * equal NULL. - * - * RETURN VALUE - * Returns an integer less than, equal to, or greater than zero if `tv0' is - * less than, equal to, or greater than `tv1' respectively. - */ -int timeval_cmp(struct timeval tv0, struct timeval tv1, struct timeval *delta); - -/* make sure tv_usec stores less than a second */ -#define NORMALIZE_TIMEVAL(tv) \ - do { \ - (tv).tv_sec += (tv).tv_usec / 1000000; \ - (tv).tv_usec = (tv).tv_usec % 1000000; \ - } while (0) - -/* make sure tv_sec stores less than a second */ -#define NORMALIZE_TIMESPEC(tv) \ - do { \ - (tv).tv_sec += (tv).tv_nsec / 1000000000; \ - (tv).tv_nsec = (tv).tv_nsec % 1000000000; \ - } while (0) - -int check_create_dir(const char *file_orig); - -#ifdef HAVE_LIBKSTAT -#if HAVE_KSTAT_H -#include <kstat.h> -#endif -int get_kstat(kstat_t **ksp_ptr, char *module, int instance, char *name); -long long get_kstat_value(kstat_t *ksp, char *name); -#endif - -#ifndef HAVE_HTONLL -unsigned long long ntohll(unsigned long long n); -unsigned long long htonll(unsigned long long n); -#endif - -#if FP_LAYOUT_NEED_NOTHING -#define ntohd(d) (d) -#define htond(d) (d) -#elif FP_LAYOUT_NEED_ENDIANFLIP || FP_LAYOUT_NEED_INTSWAP -double ntohd(double d); -double htond(double d); -#else -#error \ - "Don't know how to convert between host and network representation of doubles." -#endif - -int format_name(char *ret, int ret_len, const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance); -#define FORMAT_VL(ret, ret_len, vl) \ - format_name(ret, ret_len, (vl)->host, (vl)->plugin, (vl)->plugin_instance, \ - (vl)->type, (vl)->type_instance) -int format_values(char *ret, size_t ret_len, const data_set_t *ds, - const value_list_t *vl, bool store_rates); - -int parse_identifier(char *str, char **ret_host, char **ret_plugin, - char **ret_plugin_instance, char **ret_type, - char **ret_type_instance, char *default_host); -int parse_identifier_vl(const char *str, value_list_t *vl); -int parse_value(const char *value, value_t *ret_value, int ds_type); -int parse_values(char *buffer, value_list_t *vl, const data_set_t *ds); - -/* parse_value_file reads "path" and parses its content as an integer or - * floating point, depending on "ds_type". On success, the value is stored in - * "ret_value" and zero is returned. On failure, a non-zero value is returned. - */ -int parse_value_file(char const *path, value_t *ret_value, int ds_type); - -#if !HAVE_GETPWNAM_R -struct passwd; -int getpwnam_r(const char *name, struct passwd *pwbuf, char *buf, size_t buflen, - struct passwd **pwbufp); -#endif - -int notification_init(notification_t *n, int severity, const char *message, - const char *host, const char *plugin, - const char *plugin_instance, const char *type, - const char *type_instance); -#define NOTIFICATION_INIT_VL(n, vl) \ - notification_init(n, NOTIF_FAILURE, NULL, (vl)->host, (vl)->plugin, \ - (vl)->plugin_instance, (vl)->type, (vl)->type_instance) - -typedef int (*dirwalk_callback_f)(const char *dirname, const char *filename, - void *user_data); -int walk_directory(const char *dir, dirwalk_callback_f callback, - void *user_data, int hidden); -/* Returns the number of bytes read or negative on error. */ -ssize_t read_file_contents(char const *filename, void *buf, size_t bufsize); -/* Writes the contents of the file into the buffer with a trailing NUL. - * Returns the number of bytes written to the buffer or negative on error. */ -ssize_t read_text_file_contents(char const *filename, char *buf, - size_t bufsize); - -counter_t counter_diff(counter_t old_value, counter_t new_value); - -/* Convert a rate back to a value_t. When converting to a derive_t, counter_t - * or absolute_t, take fractional residuals into account. This is important - * when scaling counters, for example. - * Returns zero on success. Returns EAGAIN when called for the first time; in - * this case the value_t is invalid and the next call should succeed. Other - * return values indicate an error. */ -int rate_to_value(value_t *ret_value, gauge_t rate, - rate_to_value_state_t *state, int ds_type, cdtime_t t); - -int value_to_rate(gauge_t *ret_rate, value_t value, int ds_type, cdtime_t t, - value_to_rate_state_t *state); - -/* Converts a service name (a string) to a port number - * (in the range [1-65535]). Returns less than zero on error. */ -int service_name_to_port_number(const char *service_name); - -/* Sets various, non-default, socket options */ -void set_sock_opts(int sockfd); - -/** Parse a string to a derive_t value. Returns zero on success or non-zero on - * failure. If failure is returned, ret_value is not touched. */ -int strtoderive(const char *string, derive_t *ret_value); - -/** Parse a string to a gauge_t value. Returns zero on success or non-zero on - * failure. If failure is returned, ret_value is not touched. */ -int strtogauge(const char *string, gauge_t *ret_value); - -int strarray_add(char ***ret_array, size_t *ret_array_len, char const *str); -void strarray_free(char **array, size_t array_len); - -/** Check if the current process benefits from the capability passed in - * argument. Returns zero if it does, less than zero if it doesn't or on error. - * See capabilities(7) for the list of possible capabilities. - * */ -int check_capability(int arg); - -#endif /* COMMON_H */ diff --git a/telemetry/vpp-collectd/common/meta_data.h b/telemetry/vpp-collectd/common/meta_data.h deleted file mode 100644 index 203b14607..000000000 --- a/telemetry/vpp-collectd/common/meta_data.h +++ /dev/null @@ -1,71 +0,0 @@ -/** - * collectd - src/meta_data.h - * Copyright (C) 2008-2011 Florian octo Forster - * - * Permission is hereby granted, free of charge, to any person obtaining a - * copy of this software and associated documentation files (the "Software"), - * to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, - * and/or sell copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER - * DEALINGS IN THE SOFTWARE. - * - * Authors: - * Florian octo Forster <octo at collectd.org> - **/ - -#ifndef META_DATA_H -#define META_DATA_H - -#include "collectd.h" - -/* - * Defines - */ -#define MD_TYPE_STRING 1 -#define MD_TYPE_SIGNED_INT 2 -#define MD_TYPE_UNSIGNED_INT 3 -#define MD_TYPE_DOUBLE 4 -#define MD_TYPE_BOOLEAN 5 - -struct meta_data_s; -typedef struct meta_data_s meta_data_t; - -meta_data_t *meta_data_create(void); -meta_data_t *meta_data_clone(meta_data_t *orig); -int meta_data_clone_merge(meta_data_t **dest, meta_data_t *orig); -void meta_data_destroy(meta_data_t *md); - -int meta_data_exists(meta_data_t *md, const char *key); -int meta_data_type(meta_data_t *md, const char *key); -int meta_data_toc(meta_data_t *md, char ***toc); -int meta_data_delete(meta_data_t *md, const char *key); - -int meta_data_add_string(meta_data_t *md, const char *key, const char *value); -int meta_data_add_signed_int(meta_data_t *md, const char *key, int64_t value); -int meta_data_add_unsigned_int(meta_data_t *md, const char *key, - uint64_t value); -int meta_data_add_double(meta_data_t *md, const char *key, double value); -int meta_data_add_boolean(meta_data_t *md, const char *key, bool value); - -int meta_data_get_string(meta_data_t *md, const char *key, char **value); -int meta_data_get_signed_int(meta_data_t *md, const char *key, int64_t *value); -int meta_data_get_unsigned_int(meta_data_t *md, const char *key, - uint64_t *value); -int meta_data_get_double(meta_data_t *md, const char *key, double *value); -int meta_data_get_boolean(meta_data_t *md, const char *key, bool *value); - -/* Returns the value as a string, regardless of the type. */ -int meta_data_as_string(meta_data_t *md, const char *key, char **value); - -#endif /* META_DATA_H */ diff --git a/telemetry/vpp-collectd/common/plugin.h b/telemetry/vpp-collectd/common/plugin.h deleted file mode 100644 index bbd69e003..000000000 --- a/telemetry/vpp-collectd/common/plugin.h +++ /dev/null @@ -1,483 +0,0 @@ -/** - * collectd - src/daemon/plugin.h - * Copyright (C) 2005-2014 Florian octo Forster - * - * Permission is hereby granted, free of charge, to any person obtaining a - * copy of this software and associated documentation files (the "Software"), - * to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, - * and/or sell copies of the Software, and to permit persons to whom the - * Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER - * DEALINGS IN THE SOFTWARE. - * - * Authors: - * Florian octo Forster <octo at collectd.org> - * Sebastian Harl <sh at tokkee.org> - **/ - -#ifndef PLUGIN_H -#define PLUGIN_H - -#include "collectd.h" - -#include "configfile.h" -#include "meta_data.h" -#include "utils_time.h" - -#include <inttypes.h> -#include <pthread.h> - -#define DS_TYPE_COUNTER 0 -#define DS_TYPE_GAUGE 1 -#define DS_TYPE_DERIVE 2 -#define DS_TYPE_ABSOLUTE 3 - -#define DS_TYPE_TO_STRING(t) \ - (t == DS_TYPE_COUNTER) ? "counter" \ - : (t == DS_TYPE_GAUGE) ? "gauge" \ - : (t == DS_TYPE_DERIVE) ? "derive" \ - : (t == DS_TYPE_ABSOLUTE) ? "absolute" \ - : "unknown" - -#ifndef LOG_ERR -#define LOG_ERR 3 -#endif -#ifndef LOG_WARNING -#define LOG_WARNING 4 -#endif -#ifndef LOG_NOTICE -#define LOG_NOTICE 5 -#endif -#ifndef LOG_INFO -#define LOG_INFO 6 -#endif -#ifndef LOG_DEBUG -#define LOG_DEBUG 7 -#endif - -#define NOTIF_MAX_MSG_LEN 256 - -#define NOTIF_FAILURE 1 -#define NOTIF_WARNING 2 -#define NOTIF_OKAY 4 - -#define plugin_interval (plugin_get_ctx().interval) - -/* - * Public data types - */ -struct identifier_s { - char *host; - char *plugin; - char *plugin_instance; - char *type; - char *type_instance; -}; -typedef struct identifier_s identifier_t; - -typedef unsigned long long counter_t; -typedef double gauge_t; -typedef int64_t derive_t; -typedef uint64_t absolute_t; - -union value_u { - counter_t counter; - gauge_t gauge; - derive_t derive; - absolute_t absolute; -}; -typedef union value_u value_t; - -struct value_list_s { - value_t *values; - size_t values_len; - cdtime_t time; - cdtime_t interval; - char host[DATA_MAX_NAME_LEN]; - char plugin[DATA_MAX_NAME_LEN]; - char plugin_instance[DATA_MAX_NAME_LEN]; - char type[DATA_MAX_NAME_LEN]; - char type_instance[DATA_MAX_NAME_LEN]; - meta_data_t *meta; -}; -typedef struct value_list_s value_list_t; - -#define VALUE_LIST_INIT \ - { .values = NULL, .meta = NULL } - -struct data_source_s { - char name[DATA_MAX_NAME_LEN]; - int type; - double min; - double max; -}; -typedef struct data_source_s data_source_t; - -struct data_set_s { - char type[DATA_MAX_NAME_LEN]; - size_t ds_num; - data_source_t *ds; -}; -typedef struct data_set_s data_set_t; - -enum notification_meta_type_e { - NM_TYPE_STRING, - NM_TYPE_SIGNED_INT, - NM_TYPE_UNSIGNED_INT, - NM_TYPE_DOUBLE, - NM_TYPE_BOOLEAN -}; - -typedef struct notification_meta_s { - char name[DATA_MAX_NAME_LEN]; - enum notification_meta_type_e type; - union { - const char *nm_string; - int64_t nm_signed_int; - uint64_t nm_unsigned_int; - double nm_double; - bool nm_boolean; - } nm_value; - struct notification_meta_s *next; -} notification_meta_t; - -typedef struct notification_s { - int severity; - cdtime_t time; - char message[NOTIF_MAX_MSG_LEN]; - char host[DATA_MAX_NAME_LEN]; - char plugin[DATA_MAX_NAME_LEN]; - char plugin_instance[DATA_MAX_NAME_LEN]; - char type[DATA_MAX_NAME_LEN]; - char type_instance[DATA_MAX_NAME_LEN]; - notification_meta_t *meta; -} notification_t; - -struct user_data_s { - void *data; - void (*free_func)(void *); -}; -typedef struct user_data_s user_data_t; - -enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED }; - -typedef struct cache_event_s { - enum cache_event_type_e type; - const value_list_t *value_list; - const char *value_list_name; - int ret; -} cache_event_t; - -struct plugin_ctx_s { - char *name; - cdtime_t interval; - cdtime_t flush_interval; - cdtime_t flush_timeout; -}; -typedef struct plugin_ctx_s plugin_ctx_t; - -/* - * Callback types - */ -typedef int (*plugin_init_cb)(void); -typedef int (*plugin_read_cb)(user_data_t *); -typedef int (*plugin_write_cb)(const data_set_t *, const value_list_t *, - user_data_t *); -typedef int (*plugin_flush_cb)(cdtime_t timeout, const char *identifier, - user_data_t *); -/* "missing" callback. Returns less than zero on failure, zero if other - * callbacks should be called, greater than zero if no more callbacks should be - * called. */ -typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *); -/* "cache event" callback. CE_VALUE_NEW events are sent to all registered - * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE - * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so. - */ -typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *); -typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *); -typedef int (*plugin_shutdown_cb)(void); -typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *); -/* - * NAME - * plugin_set_dir - * - * DESCRIPTION - * Sets the current `plugindir' - * - * ARGUMENTS - * `dir' Path to the plugin directory - * - * NOTES - * If `dir' is NULL the compiled in default `PLUGINDIR' is used. - */ -void plugin_set_dir(const char *dir); - -/* - * NAME - * plugin_load - * - * DESCRIPTION - * Searches the current `plugindir' (see `plugin_set_dir') for the plugin - * named $type and loads it. Afterwards the plugin's `module_register' - * function is called, which then calls `plugin_register' to register callback - * functions. - * - * ARGUMENTS - * `name' Name of the plugin to load. - * `global' Make this plugins symbols available for other shared libraries. - * - * RETURN VALUE - * Returns zero upon success, a value greater than zero if no plugin was found - * and a value below zero if an error occurs. - * - * NOTES - * Re-loading an already loaded module is detected and zero is returned in - * this case. - */ -int plugin_load(const char *name, bool global); -bool plugin_is_loaded(char const *name); - -int plugin_init_all(void); -void plugin_read_all(void); -int plugin_read_all_once(void); -int plugin_shutdown_all(void); - -/* - * NAME - * plugin_write - * - * DESCRIPTION - * Calls the write function of the given plugin with the provided data set and - * value list. It differs from `plugin_dispatch_values' in that it does not - * update the cache, does not do threshold checking, call the chain subsystem - * and so on. It looks up the requested plugin and invokes the function, end - * of story. - * - * ARGUMENTS - * plugin Name of the plugin. If NULL, the value is sent to all registered - * write functions. - * ds Pointer to the data_set_t structure. If NULL, the data set is - * looked up according to the `type' member in the `vl' argument. - * vl The actual value to be processed. Must not be NULL. - * - * RETURN VALUE - * Returns zero upon success or non-zero if an error occurred. If `plugin' is - * NULL and more than one plugin is called, an error is only returned if *all* - * plugins fail. - * - * NOTES - * This is the function used by the `write' built-in target. May be used by - * other target plugins. - */ -int plugin_write(const char *plugin, const data_set_t *ds, - const value_list_t *vl); - -int plugin_flush(const char *plugin, cdtime_t timeout, const char *identifier); - -/* - * The `plugin_register_*' functions are used to make `config', `init', - * `read', `write' and `shutdown' functions known to the plugin - * infrastructure. Also, the data-formats are made public like this. - */ -int plugin_register_config(const char *name, - int (*callback)(const char *key, const char *val), - const char **keys, int keys_num); -int plugin_register_complex_config(const char *type, - int (*callback)(oconfig_item_t *)); -int plugin_register_init(const char *name, plugin_init_cb callback); -int plugin_register_read(const char *name, int (*callback)(void)); -/* "user_data" will be freed automatically, unless - * "plugin_register_complex_read" returns an error (non-zero). */ -int plugin_register_complex_read(const char *group, const char *name, - plugin_read_cb callback, cdtime_t interval, - user_data_t const *user_data); -int plugin_register_write(const char *name, plugin_write_cb callback, - user_data_t const *user_data); -int plugin_register_flush(const char *name, plugin_flush_cb callback, - user_data_t const *user_data); -int plugin_register_missing(const char *name, plugin_missing_cb callback, - user_data_t const *user_data); -int plugin_register_cache_event(const char *name, - plugin_cache_event_cb callback, - user_data_t const *ud); -int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback); -int plugin_register_data_set(const data_set_t *ds); -int plugin_register_log(const char *name, plugin_log_cb callback, - user_data_t const *user_data); -int plugin_register_notification(const char *name, - plugin_notification_cb callback, - user_data_t const *user_data); - -int plugin_unregister_config(const char *name); -int plugin_unregister_complex_config(const char *name); -int plugin_unregister_init(const char *name); -int plugin_unregister_read(const char *name); -int plugin_unregister_read_group(const char *group); -int plugin_unregister_write(const char *name); -int plugin_unregister_flush(const char *name); -int plugin_unregister_missing(const char *name); -int plugin_unregister_cache_event(const char *name); -int plugin_unregister_shutdown(const char *name); -int plugin_unregister_data_set(const char *name); -int plugin_unregister_log(const char *name); -int plugin_unregister_notification(const char *name); - -/* - * NAME - * plugin_log_available_writers - * - * DESCRIPTION - * This function can be called to output a list of _all_ registered - * writers to the logfacility. - * Since some writers dynamically build their name it can be hard for - * the configuring person to know it. This function will fill this gap. - */ -void plugin_log_available_writers(void); - -/* - * NAME - * plugin_dispatch_values - * - * DESCRIPTION - * This function is called by reading processes with the values they've - * aquired. The function fetches the data-set definition (that has been - * registered using `plugin_register_data_set') and calls _all_ registered - * write-functions. - * - * ARGUMENTS - * `vl' Value list of the values that have been read by a `read' - * function. - */ -int plugin_dispatch_values(value_list_t const *vl); - -/* - * NAME - * plugin_dispatch_multivalue - * - * SYNOPSIS - * plugin_dispatch_multivalue (vl, true, DS_TYPE_GAUGE, - * "free", 42.0, - * "used", 58.0, - * NULL); - * - * DESCRIPTION - * Takes a list of type instances and values and dispatches that in a batch, - * making sure that all values have the same time stamp. If "store_percentage" - * is set to true, the "type" is set to "percent" and a percentage is - * calculated and dispatched, rather than the absolute values. Values that are - * NaN are dispatched as NaN and will not influence the total. - * - * The variadic arguments is a list of type_instance / type pairs, that are - * interpreted as type "char const *" and type, encoded by their corresponding - * "store_type": - * - * - "gauge_t" when "DS_TYPE_GAUGE" - * - "absolute_t" when "DS_TYPE_ABSOLUTE" - * - "derive_t" when "DS_TYPE_DERIVE" - * - "counter_t" when "DS_TYPE_COUNTER" - * - * The last argument must be - * a NULL pointer to signal end-of-list. - * - * RETURNS - * The number of values it failed to dispatch (zero on success). - */ -__attribute__((sentinel)) int plugin_dispatch_multivalue(value_list_t const *vl, - bool store_percentage, - int store_type, ...); - -int plugin_dispatch_missing(const value_list_t *vl); -void plugin_dispatch_cache_event(enum cache_event_type_e event_type, - unsigned long callbacks_mask, const char *name, - const value_list_t *vl); - -int plugin_dispatch_notification(const notification_t *notif); - -void plugin_log(int level, const char *format, ...) - __attribute__((format(printf, 2, 3))); - -/* These functions return the parsed severity or less than zero on failure. */ -int parse_log_severity(const char *severity); -int parse_notif_severity(const char *severity); - -#define ERROR(...) plugin_log(LOG_ERR, __VA_ARGS__) -#define WARNING(...) plugin_log(LOG_WARNING, __VA_ARGS__) -#define NOTICE(...) plugin_log(LOG_NOTICE, __VA_ARGS__) -#define INFO(...) plugin_log(LOG_INFO, __VA_ARGS__) -#if COLLECT_DEBUG -#define DEBUG(...) plugin_log(LOG_DEBUG, __VA_ARGS__) -#else /* COLLECT_DEBUG */ -#define DEBUG(...) /* noop */ -#endif /* ! COLLECT_DEBUG */ - -/* This will log messages, prefixed by plugin name */ -void daemon_log(int level, const char *format, ...) - __attribute__((format(printf, 2, 3))); - -#define P_ERROR(...) daemon_log(LOG_ERR, __VA_ARGS__) -#define P_WARNING(...) daemon_log(LOG_WARNING, __VA_ARGS__) -#define P_NOTICE(...) daemon_log(LOG_NOTICE, __VA_ARGS__) -#define P_INFO(...) daemon_log(LOG_INFO, __VA_ARGS__) - -const data_set_t *plugin_get_ds(const char *name); - -int plugin_notification_meta_add_string(notification_t *n, const char *name, - const char *value); -int plugin_notification_meta_add_signed_int(notification_t *n, const char *name, - int64_t value); -int plugin_notification_meta_add_unsigned_int(notification_t *n, - const char *name, uint64_t value); -int plugin_notification_meta_add_double(notification_t *n, const char *name, - double value); -int plugin_notification_meta_add_boolean(notification_t *n, const char *name, - bool value); - -int plugin_notification_meta_copy(notification_t *dst, - const notification_t *src); - -int plugin_notification_meta_free(notification_meta_t *n); - -/* - * Plugin context management. - */ - -void plugin_init_ctx(void); - -plugin_ctx_t plugin_get_ctx(void); -plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx); - -/* - * NAME - * plugin_get_interval - * - * DESCRIPTION - * This function returns the current value of the plugin's interval. The - * return value will be strictly greater than zero in all cases. If - * everything else fails, it will fall back to 10 seconds. - */ -cdtime_t plugin_get_interval(void); - -/* - * Context-aware thread management. - */ - -int plugin_thread_create(pthread_t *thread, void *(*start_routine)(void *), - void *arg, char const *name); - -/* - * Plugins need to implement this - */ - -void module_register(void); - -#endif /* PLUGIN_H */ diff --git a/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt b/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt index d55aede80..85dd51577 100644 --- a/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt +++ b/telemetry/vpp-collectd/vpp-hicn/CMakeLists.txt @@ -12,55 +12,62 @@ # limitations under the License. ############################################################## -# Check if building as subproject or as root project +# Dependencies ############################################################## -if(${CMAKE_SOURCE_DIR}/vpp-collectd STREQUAL ${PROJECT_SOURCE_DIR}) - message (STATUS "not compiling in the same folder") - find_package(HicnPlugin ${CURRENT_VERSION} REQUIRED) - find_package(Vapisafe ${CURRENT_VERSION} REQUIRED) -else() - message (STATUS "compiling in the same folder") - list(APPEND DEPENDENCIES - ${HICNPLUGIN_SHARED} - ) -endif() +find_package(HicnPlugin ${CURRENT_VERSION} REQUIRED) +find_package(Libsafevapi ${CURRENT_VERSION} REQUIRED NO_MODULE) ############################################################## -# Sources +# Source files ############################################################## list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/vpp_hicn.c ) + +############################################################## +# Include dirs +############################################################## list(APPEND INCLUDE_DIRS ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} ${HICNPLUGIN_INCLUDE_DIRS} - ${SAFE_VAPI_INCLUDE_DIRS} + ${Libsafe_vapi_INCLUDE_DIRS} ${VPP_INCLUDE_DIRS} - ${CMAKE_CURRENT_SOURCE_DIR} - "${CMAKE_CURRENT_SOURCE_DIR}/../common" ) ############################################################## -# Libs +# Libraries ############################################################## list(APPEND LIBRARIES ${VPP_LIBRARY_VAPICLIENT} - ${SAFE_VAPI_LIBRARIES} + hicn::safevapi.shared +) + + +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} ) ############################################################## # Build library ############################################################## -build_library(vpp_hicn +build_library(${VPP_HICN_TELEMETRY} + SHARED + EMPTY_PREFIX SOURCES ${SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} - INCLUDE_DIRS ${INCLUDE_DIRS} - INSTALL_FULL_PATH_DIR ${CMAKE_INSTALL_PREFIX}/lib/collectd - COMPONENT "${COLLECTD_PLUGINS}" + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} + COMPONENT ${COLLECTD_PLUGINS} DEPENDS ${DEPENDENCIES} COMPILE_OPTIONS ${COMPILER_OPTIONS} ) diff --git a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c index a724c1124..a20bcbcd0 100644 --- a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c +++ b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c @@ -13,13 +13,14 @@ * limitations under the License. */ -/* Keep order as it is */ -#include "common.h" -#include <config.h> +#include "../../data_model.h" +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" #define counter_t vpp_counter_t +#include <hicn/vapi/vapi_safe.h> #include <vapi/hicn.api.vapi.h> -#include <vapi/vapi_safe.h> #undef counter_t DEFINE_VAPI_MSG_IDS_HICN_API_JSON @@ -34,140 +35,6 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); static bool verbose = false; static char *tag = NULL; -/************** DATA SOURCES ******************************/ -static data_source_t packets_dsrc[1] = { - {"packets", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t interests_dsrc[1] = { - {"interests", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t data_dsrc[1] = { - {"data", DS_TYPE_GAUGE, 0, NAN}, -}; - -static data_source_t combined_dsrc[2] = { - {"packets", DS_TYPE_DERIVE, 0, NAN}, - {"bytes", DS_TYPE_DERIVE, 0, NAN}, -}; - -/************** DATA SETS NODE ****************************/ -static data_set_t pkts_processed_ds = { - "pkts_processed", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_interest_count_ds = { - "pkts_interest_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_data_count_ds = { - "pkts_data_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_from_cache_count_ds = { - "pkts_from_cache_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pkts_no_pit_count_ds = { - "pkts_no_pit_count", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t pit_expired_count_ds = { - "pit_expired_count", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t cs_expired_count_ds = { - "cs_expired_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t cs_lru_count_ds = { - "cs_lru_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t pkts_drop_no_buf_ds = { - "pkts_drop_no_buf", - STATIC_ARRAY_SIZE(packets_dsrc), - packets_dsrc, -}; - -static data_set_t interests_aggregated_ds = { - "interests_aggregated", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t interests_retx_ds = { - "interests_retx", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t interests_hash_collision_ds = { - "interests_hash_collision", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t pit_entries_count_ds = { - "pit_entries_count", - STATIC_ARRAY_SIZE(interests_dsrc), - interests_dsrc, -}; - -static data_set_t cs_entries_count_ds = { - "cs_entries_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -static data_set_t cs_entries_ntw_count_ds = { - "cs_entries_ntw_count", - STATIC_ARRAY_SIZE(data_dsrc), - data_dsrc, -}; - -/************** DATA SETS FACE ****************************/ -static data_set_t irx_ds = { - "irx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t itx_ds = { - "itx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t drx_ds = { - "drx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - -static data_set_t dtx_ds = { - "dtx", - STATIC_ARRAY_SIZE(combined_dsrc), - combined_dsrc, -}; - /**********************************************************/ /********** UTILITY FUNCTIONS *****************************/ /**********************************************************/ @@ -189,8 +56,7 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (tag != NULL) - sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); + if (tag != NULL) sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -223,15 +89,12 @@ static int vpp_hicn_config(const char *key, const char *value) { /* * Callback called by the hICN plugin API when node stats are ready. */ -static vapi_error_e -parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, - bool is_last, - vapi_payload_hicn_api_node_stats_get_reply *reply) { - if (reply == NULL || rv != VAPI_OK) - return rv; +static vapi_error_e parse_node_stats( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_node_stats_get_reply *reply) { + if (reply == NULL || rv != VAPI_OK) return rv; - if (reply->retval != VAPI_OK) - return reply->retval; + if (reply->retval != VAPI_OK) return reply->retval; char *node_name = "node"; value_t values[1]; @@ -277,15 +140,12 @@ parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, /* * Callback called by the hICN plugin API when face stats are ready. */ -static vapi_error_e -parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, - bool is_last, - vapi_payload_hicn_api_face_stats_details *reply) { - if (reply == NULL || rv != VAPI_OK) - return rv; +static vapi_error_e parse_face_stats( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_face_stats_details *reply) { + if (reply == NULL || rv != VAPI_OK) return rv; - if (reply->retval != VAPI_OK) - return reply->retval; + if (reply->retval != VAPI_OK) return reply->retval; char face_name[10]; snprintf(face_name, 10, "face%u", reply->faceid); @@ -314,8 +174,7 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, static int vpp_hicn_init(void) { int ret = vapi_connect_safe(&vapi_ctx, 0); - if (ret) - plugin_log(LOG_ERR, "vpp_hicn plugin: vapi_connect_safe failed"); + if (ret) plugin_log(LOG_ERR, "vpp_hicn plugin: vapi_connect_safe failed"); return ret; } diff --git a/telemetry/vpp-collectd/vpp/CMakeLists.txt b/telemetry/vpp-collectd/vpp/CMakeLists.txt index 41c19208a..e1cf55553 100644 --- a/telemetry/vpp-collectd/vpp/CMakeLists.txt +++ b/telemetry/vpp-collectd/vpp/CMakeLists.txt @@ -20,13 +20,12 @@ list(APPEND SOURCE_FILES ############################################################## -# Include directories +# Include dirs ############################################################## list(APPEND INCLUDE_DIRS ${COLLECTD_INCLUDE_DIRS} + ${THIRD_PARTY_INCLUDE_DIRS} ${VPP_INCLUDE_DIRS} - ${CMAKE_CURRENT_SOURCE_DIR} - "${CMAKE_CURRENT_SOURCE_DIR}/../common" ) @@ -39,11 +38,26 @@ list(APPEND LIBRARIES ) -build_module(vpp +############################################################## +# Compiler options +############################################################## +list(APPEND COMPILER_OPTIONS + ${DEFAULT_COMPILER_OPTIONS} + ${COLLECTD_COMPILER_OPTIONS} +) + + +############################################################## +# Build library +############################################################## +build_library(${VPP_TELEMETRY} + SHARED + EMPTY_PREFIX SOURCES ${SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} - INCLUDE_DIRS ${INCLUDE_DIRS} - INSTALL_FULL_PATH_DIR ${CMAKE_INSTALL_PREFIX}/lib/collectd + INCLUDE_DIRS + PRIVATE ${INCLUDE_DIRS} + INSTALL_FULL_PATH_DIR ${COLLECTD_PLUGIN_DIR} COMPONENT ${COLLECTD_PLUGINS} COMPILE_OPTIONS ${COMPILER_OPTIONS} ) diff --git a/telemetry/vpp-collectd/vpp/vpp.c b/telemetry/vpp-collectd/vpp/vpp.c index 85d0971d0..ff70f3503 100644 --- a/telemetry/vpp-collectd/vpp/vpp.c +++ b/telemetry/vpp-collectd/vpp/vpp.c @@ -13,9 +13,9 @@ * limitations under the License. */ -/* Keep order as it is */ -#include "common.h" -#include <config.h> +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" #define counter_t vpp_counter_t #include <vpp-api/client/stat_client.h> @@ -165,8 +165,7 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (tag != NULL) - sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); + if (tag != NULL) sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -261,8 +260,7 @@ static int vpp_init(void) { u8 *stat_segment_name = (u8 *)STAT_SEGMENT_SOCKET_FILE; int ret = stat_segment_connect((char *)stat_segment_name); - if (ret) - plugin_log(LOG_ERR, "vpp plugin: connecting to segment failed"); + if (ret) plugin_log(LOG_ERR, "vpp plugin: connecting to segment failed"); return ret; } @@ -296,66 +294,65 @@ static int vpp_read(void) { /* Collect results for each interface and submit them */ for (int i = 0; i < vec_len(res); i++) { switch (res[i].type) { - case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: - for (int k = 0; k < vec_len(res[i].simple_counter_vec); k++) { - for (int j = 0; j < vec_len(res[i].simple_counter_vec[k]); j++) { - if (!interfaces[j]) { - continue; - } + case STAT_DIR_TYPE_COUNTER_VECTOR_SIMPLE: + for (int k = 0; k < vec_len(res[i].simple_counter_vec); k++) { + for (int j = 0; j < vec_len(res[i].simple_counter_vec[k]); j++) { + if (!interfaces[j]) { + continue; + } - if (get_data_set(res[i].name, &data_set)) { - continue; - } + if (get_data_set(res[i].name, &data_set)) { + continue; + } - value_t values[1] = { - (value_t){.derive = res[i].simple_counter_vec[k][j]}}; + value_t values[1] = { + (value_t){.derive = res[i].simple_counter_vec[k][j]}}; - err = submit(interfaces[j], data_set.type, values, 1, ×tamp); + err = submit(interfaces[j], data_set.type, values, 1, ×tamp); - if (err) - goto END; + if (err) goto END; + } } - } - break; + break; - case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED: - for (int k = 0; k < vec_len(res[i].combined_counter_vec); k++) { - for (int j = 0; j < vec_len(res[i].combined_counter_vec[k]); j++) { - if (!interfaces[j]) { - continue; - } + case STAT_DIR_TYPE_COUNTER_VECTOR_COMBINED: + for (int k = 0; k < vec_len(res[i].combined_counter_vec); k++) { + for (int j = 0; j < vec_len(res[i].combined_counter_vec[k]); j++) { + if (!interfaces[j]) { + continue; + } - if (get_data_set(res[i].name, &data_set)) { - continue; - } + if (get_data_set(res[i].name, &data_set)) { + continue; + } - value_t values[2] = { - (value_t){.derive = res[i].combined_counter_vec[k][j].packets}, - (value_t){.derive = res[i].combined_counter_vec[k][j].bytes}, - }; + value_t values[2] = { + (value_t){.derive = res[i].combined_counter_vec[k][j].packets}, + (value_t){.derive = res[i].combined_counter_vec[k][j].bytes}, + }; - err = submit(interfaces[j], data_set.type, values, 2, ×tamp); + err = submit(interfaces[j], data_set.type, values, 2, ×tamp); - if (err) - goto END; + if (err) goto END; + } } - } - break; + break; - case STAT_DIR_TYPE_SCALAR_INDEX: - plugin_log(LOG_INFO, "vpp plugin: %.2f %s", res[i].scalar_value, - res[i].name); - break; + case STAT_DIR_TYPE_SCALAR_INDEX: + plugin_log(LOG_INFO, "vpp plugin: %.2f %s", res[i].scalar_value, + res[i].name); + break; - case STAT_DIR_TYPE_NAME_VECTOR: - break; + case STAT_DIR_TYPE_NAME_VECTOR: + break; - case STAT_DIR_TYPE_ERROR_INDEX: - break; + case STAT_DIR_TYPE_ERROR_INDEX: + break; - default: - plugin_log(LOG_WARNING, "vpp plugin: unknown stat type %d", res[i].type); - break; + default: + plugin_log(LOG_WARNING, "vpp plugin: unknown stat type %d", + res[i].type); + break; } } |