diff options
29 files changed, 3417 insertions, 2106 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index 2f198319a3c..0f5dfc04d84 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -112,10 +112,10 @@ VNET VXLAN M: John Lo <loj@cisco.com> F: src/vnet/vxlan/ -Plugin - flowperpkt -M: Dave Barach <dave@barachs.net> -F: src/plugins/flowperpkt/ -F: src/plugins/flowperpkt.am +Plugin - flowprobe +M: Ole Troan <otroan@employees.org> +F: src/plugins/flowprobe/ +F: src/plugins/flowprobe.am Plugin - SIXRD M: Ole Troan <ot@cisco.com> diff --git a/doxygen/user_doc.md b/doxygen/user_doc.md index d052c53b6b1..becc2e0a353 100644 --- a/doxygen/user_doc.md +++ b/doxygen/user_doc.md @@ -11,7 +11,7 @@ Several modules provide operational, dataplane-user focused documentation. - @subpage lldp_doc - @subpage map_doc - @subpage dpdk_crypto_ipsec_doc -- @subpage flowperpkt_plugin_doc +- @subpage flowprobe_plugin_doc - @subpage qos_doc - @subpage span_doc - @subpage srv6_doc diff --git a/src/configure.ac b/src/configure.ac index 2a907b2f93d..173b3153807 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -146,7 +146,7 @@ AC_SUBST(AR_FLAGS) # Please keep alphabetical order PLUGIN_ENABLED(acl) PLUGIN_ENABLED(dpdk) -PLUGIN_ENABLED(flowperpkt) +PLUGIN_ENABLED(flowprobe) PLUGIN_ENABLED(gtpu) PLUGIN_ENABLED(ila) PLUGIN_ENABLED(ioam) diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 388c9ad2196..f26d0fd27c5 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -37,8 +37,8 @@ if ENABLE_DPDK_PLUGIN include dpdk.am endif -if ENABLE_FLOWPERPKT_PLUGIN -include flowperpkt.am +if ENABLE_FLOWPROBE_PLUGIN +include flowprobe.am endif diff --git a/src/plugins/flowperpkt/flowperpkt.c b/src/plugins/flowperpkt/flowperpkt.c deleted file mode 100644 index 3e5fc8b0c4d..00000000000 --- a/src/plugins/flowperpkt/flowperpkt.c +++ /dev/null @@ -1,621 +0,0 @@ -/* - * flowperpkt.c - per-packet data capture flow report plugin - * - * Copyright (c) <current-year> <your-organization> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @file - * @brief Per-packet IPFIX flow record generator plugin - * - * This file implements vpp plugin registration mechanics, - * debug CLI, and binary API handling. - */ - -#include <vnet/vnet.h> -#include <vpp/app/version.h> -#include <vnet/plugin/plugin.h> -#include <flowperpkt/flowperpkt.h> - -#include <vlibapi/api.h> -#include <vlibmemory/api.h> -#include <vlibsocket/api.h> - -/* define message IDs */ -#include <flowperpkt/flowperpkt_msg_enum.h> - -/* define message structures */ -#define vl_typedefs -#include <flowperpkt/flowperpkt_all_api_h.h> -#undef vl_typedefs - -/* define generated endian-swappers */ -#define vl_endianfun -#include <flowperpkt/flowperpkt_all_api_h.h> -#undef vl_endianfun - -/* instantiate all the print functions we know about */ -#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) -#define vl_printfun -#include <flowperpkt/flowperpkt_all_api_h.h> -#undef vl_printfun - -flowperpkt_main_t flowperpkt_main; - -/* Get the API version number */ -#define vl_api_version(n,v) static u32 api_version=(v); -#include <flowperpkt/flowperpkt_all_api_h.h> -#undef vl_api_version - -#define REPLY_MSG_ID_BASE fm->msg_id_base -#include <vlibapi/api_helper_macros.h> - -/* Define the per-interface configurable features */ -/* *INDENT-OFF* */ -VNET_FEATURE_INIT (flow_perpacket_ipv4, static) = -{ - .arc_name = "ip4-output", - .node_name = "flowperpkt-ipv4", - .runs_before = VNET_FEATURES ("interface-output"), -}; - -VNET_FEATURE_INIT (flow_perpacket_l2, static) = -{ - .arc_name = "interface-output", - .node_name = "flowperpkt-l2", - .runs_before = VNET_FEATURES ("interface-tx"), -}; -/* *INDENT-ON* */ - -/* Macro to finish up custom dump fns */ -#define FINISH \ - vec_add1 (s, 0); \ - vl_print (handle, (char *)s); \ - vec_free (s); \ - return handle; - -/** - * @brief Create an IPFIX template packet rewrite string - * @param frm flow_report_main_t * - * @param fr flow_report_t * - * @param collector_address ip4_address_t * the IPFIX collector address - * @param src_address ip4_address_t * the source address we should use - * @param collector_port u16 the collector port we should use, host byte order - * @returns u8 * vector containing the indicated IPFIX template packet - */ -static inline u8 * -flowperpkt_template_rewrite_inline (flow_report_main_t * frm, - flow_report_t * fr, - ip4_address_t * collector_address, - ip4_address_t * src_address, - u16 collector_port, int variant) -{ - ip4_header_t *ip; - udp_header_t *udp; - ipfix_message_header_t *h; - ipfix_set_header_t *s; - ipfix_template_header_t *t; - ipfix_field_specifier_t *f; - ipfix_field_specifier_t *first_field; - u8 *rewrite = 0; - ip4_ipfix_template_packet_t *tp; - u32 field_count = 0; - flow_report_stream_t *stream; - flowperpkt_main_t *fm = &flowperpkt_main; - - stream = &frm->streams[fr->stream_index]; - - if (variant == FLOW_VARIANT_IPV4) - { - /* - * ip4 Supported Fields: - * - * ingressInterface, TLV type 10, u32 - * egressInterface, TLV type 14, u32 - * sourceIpv4Address, TLV type 8, u32 - * destinationIPv4Address, TLV type 12, u32 - * ipClassOfService, TLV type 5, u8 - * flowStartNanoseconds, TLV type 156, dateTimeNanoseconds (f64) - * Implementation: f64 nanoseconds since VPP started - * warning: wireshark doesn't really understand this TLV - * dataLinkFrameSize, TLV type 312, u16 - * warning: wireshark doesn't understand this TLV at all - */ - - /* Currently 7 fields */ - field_count += 7; - - /* allocate rewrite space */ - vec_validate_aligned - (rewrite, - sizeof (ip4_ipfix_template_packet_t) - + field_count * sizeof (ipfix_field_specifier_t) - 1, - CLIB_CACHE_LINE_BYTES); - } - else if (variant == FLOW_VARIANT_L2) - { - /* - * L2 Supported Fields: - * - * ingressInterface, TLV type 10, u32 - * egressInterface, TLV type 14, u32 - * sourceMacAddress, TLV type 56, u8[6] we hope - * destinationMacAddress, TLV type 57, u8[6] we hope - * ethernetType, TLV type 256, u16 - * flowStartNanoseconds, TLV type 156, dateTimeNanoseconds (f64) - * Implementation: f64 nanoseconds since VPP started - * warning: wireshark doesn't really understand this TLV - * dataLinkFrameSize, TLV type 312, u16 - * warning: wireshark doesn't understand this TLV at all - */ - - /* Currently 7 fields */ - field_count += 7; - - /* allocate rewrite space */ - vec_validate_aligned - (rewrite, - sizeof (ip4_ipfix_template_packet_t) - + field_count * sizeof (ipfix_field_specifier_t) - 1, - CLIB_CACHE_LINE_BYTES); - } - - tp = (ip4_ipfix_template_packet_t *) rewrite; - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - h = (ipfix_message_header_t *) (udp + 1); - s = (ipfix_set_header_t *) (h + 1); - t = (ipfix_template_header_t *) (s + 1); - first_field = f = (ipfix_field_specifier_t *) (t + 1); - - ip->ip_version_and_header_length = 0x45; - ip->ttl = 254; - ip->protocol = IP_PROTOCOL_UDP; - ip->src_address.as_u32 = src_address->as_u32; - ip->dst_address.as_u32 = collector_address->as_u32; - udp->src_port = clib_host_to_net_u16 (stream->src_port); - udp->dst_port = clib_host_to_net_u16 (collector_port); - udp->length = clib_host_to_net_u16 (vec_len (rewrite) - sizeof (*ip)); - - /* FIXUP: message header export_time */ - /* FIXUP: message header sequence_number */ - h->domain_id = clib_host_to_net_u32 (stream->domain_id); - - /* Add TLVs to the template */ - if (variant == FLOW_VARIANT_IPV4) - { - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , ingressInterface, - 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , egressInterface, - 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , sourceIPv4Address, - 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , destinationIPv4Address, 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , ipClassOfService, - 1); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , flowStartNanoseconds, - 8); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , dataLinkFrameSize, - 2); - f++; - } - else if (variant == FLOW_VARIANT_L2) - { - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , ingressInterface, - 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , egressInterface, - 4); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , sourceMacAddress, - 6); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , destinationMacAddress, 6); - f++; - f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , ethernetType, - 2); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , flowStartNanoseconds, - 8); - f++; - f->e_id_length = - ipfix_e_id_length (0 /* enterprise */ , dataLinkFrameSize, - 2); - f++; - } - - /* Extend in the obvious way, right here... */ - - /* Back to the template packet... */ - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - - ASSERT (f - first_field); - /* Field count in this template */ - t->id_count = ipfix_id_count (fr->template_id, f - first_field); - - if (variant == FLOW_VARIANT_IPV4) - fm->ipv4_report_id = fr->template_id; - else if (variant == FLOW_VARIANT_L2) - fm->l2_report_id = fr->template_id; - - /* set length in octets */ - s->set_id_length = - ipfix_set_id_length (2 /* set_id */ , (u8 *) f - (u8 *) s); - - /* message length in octets */ - h->version_length = version_length ((u8 *) f - (u8 *) h); - - ip->length = clib_host_to_net_u16 ((u8 *) f - (u8 *) ip); - ip->checksum = ip4_header_checksum (ip); - - return rewrite; -} - -u8 * -flowperpkt_template_rewrite_ipv4 (flow_report_main_t * frm, - flow_report_t * fr, - ip4_address_t * collector_address, - ip4_address_t * src_address, - u16 collector_port) -{ - return flowperpkt_template_rewrite_inline - (frm, fr, collector_address, src_address, collector_port, - FLOW_VARIANT_IPV4); -} - -u8 * -flowperpkt_template_rewrite_l2 (flow_report_main_t * frm, - flow_report_t * fr, - ip4_address_t * collector_address, - ip4_address_t * src_address, - u16 collector_port) -{ - return flowperpkt_template_rewrite_inline - (frm, fr, collector_address, src_address, collector_port, - FLOW_VARIANT_L2); -} - - -/** - * @brief Flush accumulated data - * @param frm flow_report_main_t * - * @param fr flow_report_t * - * @param f vlib_frame_t * - * - * <em>Notes:</em> - * This function must simply return the incoming frame, or no template packets - * will be sent. - */ -vlib_frame_t * -flowperpkt_data_callback_ipv4 (flow_report_main_t * frm, - flow_report_t * fr, - vlib_frame_t * f, u32 * to_next, - u32 node_index) -{ - flowperpkt_flush_callback_ipv4 (); - return f; -} - -vlib_frame_t * -flowperpkt_data_callback_l2 (flow_report_main_t * frm, - flow_report_t * fr, - vlib_frame_t * f, u32 * to_next, u32 node_index) -{ - flowperpkt_flush_callback_l2 (); - return f; -} - -/** - * @brief configure / deconfigure the IPFIX flow-per-packet - * @param fm flowperpkt_main_t * fm - * @param sw_if_index u32 the desired interface - * @param is_add int 1 to enable the feature, 0 to disable it - * @returns 0 if successful, non-zero otherwise - */ - -static int flowperpkt_tx_interface_add_del_feature - (flowperpkt_main_t * fm, u32 sw_if_index, int which, int is_add) -{ - flow_report_main_t *frm = &flow_report_main; - vnet_flow_report_add_del_args_t _a, *a = &_a; - int rv; - - if (which == FLOW_VARIANT_IPV4 && !fm->ipv4_report_created) - { - memset (a, 0, sizeof (*a)); - a->rewrite_callback = flowperpkt_template_rewrite_ipv4; - a->flow_data_callback = flowperpkt_data_callback_ipv4; - a->is_add = 1; - a->domain_id = 1; /*$$$$ config parameter */ - a->src_port = 4739; /*$$$$ config parameter */ - fm->ipv4_report_created = 1; - - rv = vnet_flow_report_add_del (frm, a); - if (rv) - { - clib_warning ("vnet_flow_report_add_del returned %d", rv); - return -1; - } - } - else if (which == FLOW_VARIANT_L2 && !fm->l2_report_created) - { - memset (a, 0, sizeof (*a)); - a->rewrite_callback = flowperpkt_template_rewrite_l2; - a->flow_data_callback = flowperpkt_data_callback_l2; - a->is_add = 1; - a->domain_id = 1; /*$$$$ config parameter */ - a->src_port = 4739; /*$$$$ config parameter */ - fm->l2_report_created = 1; - - rv = vnet_flow_report_add_del (frm, a); - if (rv) - { - clib_warning ("vnet_flow_report_add_del returned %d", rv); - return -1; - } - } - - if (which == FLOW_VARIANT_IPV4) - vnet_feature_enable_disable ("ip4-output", "flowperpkt-ipv4", - sw_if_index, is_add, 0, 0); - else if (which == FLOW_VARIANT_L2) - vnet_feature_enable_disable ("interface-output", "flowperpkt-l2", - sw_if_index, is_add, 0, 0); - - return 0; -} - -/** - * @brief API message handler - * @param mp vl_api_flowperpkt_tx_interface_add_del_t * mp the api message - */ -void vl_api_flowperpkt_tx_interface_add_del_t_handler - (vl_api_flowperpkt_tx_interface_add_del_t * mp) -{ - flowperpkt_main_t *fm = &flowperpkt_main; - vl_api_flowperpkt_tx_interface_add_del_reply_t *rmp; - u32 sw_if_index = ntohl (mp->sw_if_index); - int rv = 0; - - VALIDATE_SW_IF_INDEX (mp); - - if (mp->which != FLOW_VARIANT_IPV4 && mp->which != FLOW_VARIANT_L2) - { - rv = VNET_API_ERROR_UNIMPLEMENTED; - goto out; - } - - rv = flowperpkt_tx_interface_add_del_feature (fm, sw_if_index, mp->which, - mp->is_add); -out: - BAD_SW_IF_INDEX_LABEL; - - REPLY_MACRO (VL_API_FLOWPERPKT_TX_INTERFACE_ADD_DEL_REPLY); -} - -/** - * @brief API message custom-dump function - * @param mp vl_api_flowperpkt_tx_interface_add_del_t * mp the api message - * @param handle void * print function handle - * @returns u8 * output string - */ -static void *vl_api_flowperpkt_tx_interface_add_del_t_print - (vl_api_flowperpkt_tx_interface_add_del_t * mp, void *handle) -{ - u8 *s; - - s = format (0, "SCRIPT: flowperpkt_tx_interface_add_del "); - s = format (s, "sw_if_index %d is_add %d which %d ", - clib_host_to_net_u32 (mp->sw_if_index), - (int) mp->is_add, (int) mp->which); - FINISH; -} - -/* List of message types that this plugin understands */ -#define foreach_flowperpkt_plugin_api_msg \ -_(FLOWPERPKT_TX_INTERFACE_ADD_DEL, flowperpkt_tx_interface_add_del) - -/* *INDENT-OFF* */ -VLIB_PLUGIN_REGISTER () = { - .version = VPP_BUILD_VER, - .description = "Flow per Packet", -}; -/* *INDENT-ON* */ - -static clib_error_t * -flowperpkt_tx_interface_add_del_feature_command_fn (vlib_main_t * vm, - unformat_input_t * input, - vlib_cli_command_t * cmd) -{ - flowperpkt_main_t *fm = &flowperpkt_main; - u32 sw_if_index = ~0; - int is_add = 1; - u8 which = FLOW_VARIANT_IPV4; - - int rv; - - while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) - { - if (unformat (input, "disable")) - is_add = 0; - else if (unformat (input, "%U", unformat_vnet_sw_interface, - fm->vnet_main, &sw_if_index)); - else if (unformat (input, "l2")) - which = FLOW_VARIANT_L2; - else - break; - } - - if (sw_if_index == ~0) - return clib_error_return (0, "Please specify an interface..."); - - rv = - flowperpkt_tx_interface_add_del_feature (fm, sw_if_index, which, is_add); - switch (rv) - { - case 0: - break; - - case VNET_API_ERROR_INVALID_SW_IF_INDEX: - return clib_error_return - (0, "Invalid interface, only works on physical ports"); - break; - - case VNET_API_ERROR_UNIMPLEMENTED: - return clib_error_return (0, "ip6 not supported"); - break; - - default: - return clib_error_return (0, "flowperpkt_enable_disable returned %d", - rv); - } - return 0; -} - -/*? - * '<em>flowperpkt feature add-del</em>' commands to enable/disable - * per-packet IPFIX flow record generation on an interface - * - * @cliexpar - * @parblock - * To enable per-packet IPFIX flow-record generation on an interface: - * @cliexcmd{flowperpkt feature add-del GigabitEthernet2/0/0} - * - * To disable per-packet IPFIX flow-record generation on an interface: - * @cliexcmd{flowperpkt feature add-del GigabitEthernet2/0/0 disable} - * @cliexend - * @endparblock -?*/ -/* *INDENT-OFF* */ -VLIB_CLI_COMMAND (flowperpkt_enable_disable_command, static) = { - .path = "flowperpkt feature add-del", - .short_help = - "flowperpkt feature add-del <interface-name> [disable]", - .function = flowperpkt_tx_interface_add_del_feature_command_fn, -}; -/* *INDENT-ON* */ - -/** - * @brief Set up the API message handling tables - * @param vm vlib_main_t * vlib main data structure pointer - * @returns 0 to indicate all is well - */ -static clib_error_t * -flowperpkt_plugin_api_hookup (vlib_main_t * vm) -{ - flowperpkt_main_t *fm = &flowperpkt_main; -#define _(N,n) \ - vl_msg_api_set_handlers((VL_API_##N + fm->msg_id_base), \ - #n, \ - vl_api_##n##_t_handler, \ - vl_noop_handler, \ - vl_api_##n##_t_endian, \ - vl_api_##n##_t_print, \ - sizeof(vl_api_##n##_t), 1); - foreach_flowperpkt_plugin_api_msg; -#undef _ - - return 0; -} - -#define vl_msg_name_crc_list -#include <flowperpkt/flowperpkt_all_api_h.h> -#undef vl_msg_name_crc_list - -static void -setup_message_id_table (flowperpkt_main_t * fm, api_main_t * am) -{ -#define _(id,n,crc) \ - vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + fm->msg_id_base); - foreach_vl_msg_name_crc_flowperpkt; -#undef _ -} - -/** - * @brief Set up the API message handling tables - * @param vm vlib_main_t * vlib main data structure pointer - * @returns 0 to indicate all is well, or a clib_error_t - */ -static clib_error_t * -flowperpkt_init (vlib_main_t * vm) -{ - flowperpkt_main_t *fm = &flowperpkt_main; - vlib_thread_main_t *tm = &vlib_thread_main; - clib_error_t *error = 0; - u32 num_threads; - u8 *name; - - fm->vnet_main = vnet_get_main (); - - /* Construct the API name */ - name = format (0, "flowperpkt_%08x%c", api_version, 0); - - /* Ask for a correctly-sized block of API message decode slots */ - fm->msg_id_base = vl_msg_api_get_msg_ids - ((char *) name, VL_MSG_FIRST_AVAILABLE); - - /* Hook up message handlers */ - error = flowperpkt_plugin_api_hookup (vm); - - /* Add our API messages to the global name_crc hash table */ - setup_message_id_table (fm, &api_main); - - vec_free (name); - - /* Decide how many worker threads we have */ - num_threads = 1 /* main thread */ + tm->n_threads; - - /* Allocate per worker thread vectors */ - vec_validate (fm->ipv4_buffers_per_worker, num_threads - 1); - vec_validate (fm->l2_buffers_per_worker, num_threads - 1); - vec_validate (fm->ipv4_frames_per_worker, num_threads - 1); - vec_validate (fm->l2_frames_per_worker, num_threads - 1); - vec_validate (fm->ipv4_next_record_offset_per_worker, num_threads - 1); - vec_validate (fm->l2_next_record_offset_per_worker, num_threads - 1); - - /* Set up time reference pair */ - fm->vlib_time_0 = vlib_time_now (vm); - fm->nanosecond_time_0 = unix_time_now_nsec (); - - return error; -} - -VLIB_INIT_FUNCTION (flowperpkt_init); - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/plugins/flowperpkt/flowperpkt.h b/src/plugins/flowperpkt/flowperpkt.h deleted file mode 100644 index 20f6939dda5..00000000000 --- a/src/plugins/flowperpkt/flowperpkt.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * flowperpkt.h - skeleton vpp engine plug-in header file - * - * Copyright (c) <current-year> <your-organization> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __included_flowperpkt_h__ -#define __included_flowperpkt_h__ - -#include <vnet/vnet.h> -#include <vnet/ip/ip.h> -#include <vnet/ethernet/ethernet.h> - -#include <vppinfra/hash.h> -#include <vppinfra/error.h> -#include <vnet/flow/flow_report.h> -#include <vnet/flow/flow_report_classify.h> - -/** - * @file - * @brief flow-per-packet plugin header file - */ -typedef struct -{ - /** API message ID base */ - u16 msg_id_base; - - /** Have the reports [templates] been created? */ - int ipv4_report_created; - int l2_report_created; - - /** stream/template IDs */ - u16 ipv4_report_id; - u16 l2_report_id; - - /** ipfix buffers under construction, per-worker thread */ - vlib_buffer_t **ipv4_buffers_per_worker; - vlib_buffer_t **l2_buffers_per_worker; - - /** frames containing ipfix buffers, per-worker thread */ - vlib_frame_t **ipv4_frames_per_worker; - vlib_frame_t **l2_frames_per_worker; - - /** next record offset, per worker thread */ - u16 *ipv4_next_record_offset_per_worker; - u16 *l2_next_record_offset_per_worker; - - /** Time reference pair */ - u64 nanosecond_time_0; - f64 vlib_time_0; - - /** convenience vlib_main_t pointer */ - vlib_main_t *vlib_main; - /** convenience vnet_main_t pointer */ - vnet_main_t *vnet_main; -} flowperpkt_main_t; - -typedef enum -{ - FLOW_VARIANT_IPV4, - FLOW_VARIANT_L2, - FLOW_N_VARIANTS, -} flowperpkt_variant_t; - -extern flowperpkt_main_t flowperpkt_main; - -extern vlib_node_registration_t flowperpkt_ipv4_node; - -void flowperpkt_flush_callback_ipv4 (void); -void flowperpkt_flush_callback_l2 (void); - -#endif /* __included_flowperpkt_h__ */ - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/plugins/flowperpkt/flowperpkt_plugin_doc.md b/src/plugins/flowperpkt/flowperpkt_plugin_doc.md deleted file mode 100644 index ed76c45c2dc..00000000000 --- a/src/plugins/flowperpkt/flowperpkt_plugin_doc.md +++ /dev/null @@ -1,13 +0,0 @@ -Per-packet IPFIX flow record plugin {#flowperpkt_plugin_doc} -=================================== - -## Introduction - -This plugin generates one ipfix record entry per packet transmitted -on interfaces which have the feature enabled - -## Sample configuration - -set ipfix exporter collector 192.168.6.2 src 192.168.6.1 template-interval 20 port 4739 path-mtu 1500 - -flowperpkt feature add-del GigabitEthernet2/3/0 diff --git a/src/plugins/flowperpkt/l2_node.c b/src/plugins/flowperpkt/l2_node.c deleted file mode 100644 index db80e990352..00000000000 --- a/src/plugins/flowperpkt/l2_node.c +++ /dev/null @@ -1,561 +0,0 @@ -/* - * l2_node.c - l2 ipfix-per-packet graph node - * - * Copyright (c) <current-year> <your-organization> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vlib/vlib.h> -#include <vnet/vnet.h> -#include <vnet/pg/pg.h> -#include <vppinfra/error.h> -#include <flowperpkt/flowperpkt.h> - -/** - * @file l2 flow record generator graph node - */ - -typedef struct -{ - /** interface handle */ - u32 rx_sw_if_index; - u32 tx_sw_if_index; - /** src and dst L2 addresses */ - u8 src_mac[6]; - u8 dst_mac[6]; - /** Ethertype */ - u16 ethertype; - /** packet timestamp */ - u64 timestamp; - /** size of the buffer */ - u16 buffer_size; -} flowperpkt_l2_trace_t; - -/* packet trace format function */ -static u8 * -format_flowperpkt_l2_trace (u8 * s, va_list * args) -{ - CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); - CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - flowperpkt_l2_trace_t *t = va_arg (*args, flowperpkt_l2_trace_t *); - - s = format (s, - "FLOWPERPKT-L2: rx_sw_if_index %d, tx_sw_if_index %d, src %U dst %U ethertype %0x2, timestamp %lld, size %d", - t->rx_sw_if_index, t->tx_sw_if_index, - format_ethernet_address, &t->src_mac, - format_ethernet_address, &t->dst_mac, - t->ethertype, t->timestamp, t->buffer_size); - return s; -} - -vlib_node_registration_t flowperpkt_l2_node; - -/* No counters at the moment */ -#define foreach_flowperpkt_l2_error - -typedef enum -{ -#define _(sym,str) FLOWPERPKT_ERROR_##sym, - foreach_flowperpkt_l2_error -#undef _ - FLOWPERPKT_N_ERROR, -} flowperpkt_l2_error_t; - -static char *flowperpkt_l2_error_strings[] = { -#define _(sym,string) string, - foreach_flowperpkt_l2_error -#undef _ -}; - -typedef enum -{ - FLOWPERPKT_L2_NEXT_DROP, - FLOWPERPKT_L2_NEXT_IP4_LOOKUP, - FLOWPERPKT_L2_N_NEXT, -} flowperpkt_l2_next_t; - -/** - * @brief add an entry to the flow record under construction - * @param vm vlib_main_t * current worker thread main structure pointer - * @param fm flowperpkt_main_t * flow-per-packet main structure pointer - * @param sw_if_index u32 interface handle - * @param tos u8 ToS bits from the packet - * @param timestamp u64 timestamp, nanoseconds since 1/1/70 - * @param length u16 ip length of the packet - * @param do_flush int 1 = flush all cached records, 0 = construct a record - */ - -static inline void -add_to_flow_record_l2 (vlib_main_t * vm, - vlib_node_runtime_t * node, - flowperpkt_main_t * fm, - u32 rx_sw_if_index, u32 tx_sw_if_index, - u8 * src_mac, u8 * dst_mac, - u16 ethertype, u64 timestamp, u16 length, int do_flush) -{ - u32 my_thread_index = vm->thread_index; - flow_report_main_t *frm = &flow_report_main; - ip4_header_t *ip; - udp_header_t *udp; - ip4_ipfix_template_packet_t *tp; - ipfix_message_header_t *h; - ipfix_set_header_t *s; - vlib_frame_t *f; - vlib_buffer_t *b0; - u16 offset; - u32 bi0; - vlib_buffer_free_list_t *fl; - - /* Find or allocate a buffer */ - b0 = fm->l2_buffers_per_worker[my_thread_index]; - - /* Need to allocate a buffer? */ - if (PREDICT_FALSE (b0 == 0)) - { - /* Nothing to flush */ - if (do_flush) - return; - - /* $$$$ drop counter? */ - if (vlib_buffer_alloc (vm, &bi0, 1) != 1) - return; - - /* Initialize the buffer */ - b0 = fm->l2_buffers_per_worker[my_thread_index] = - vlib_get_buffer (vm, bi0); - fl = - vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); - vlib_buffer_init_for_free_list (b0, fl); - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - offset = 0; - } - else - { - /* use the current buffer */ - bi0 = vlib_get_buffer_index (vm, b0); - offset = fm->l2_next_record_offset_per_worker[my_thread_index]; - } - - /* Find or allocate a frame */ - f = fm->l2_frames_per_worker[my_thread_index]; - if (PREDICT_FALSE (f == 0)) - { - u32 *to_next; - f = vlib_get_frame_to_node (vm, ip4_lookup_node.index); - fm->l2_frames_per_worker[my_thread_index] = f; - - /* Enqueue the buffer */ - to_next = vlib_frame_vector_args (f); - to_next[0] = bi0; - f->n_vectors = 1; - } - - /* Fresh packet, construct header */ - if (PREDICT_FALSE (offset == 0)) - { - flow_report_stream_t *stream; - - stream = &frm->streams[0]; - - b0->current_data = 0; - b0->current_length = sizeof (*ip) + sizeof (*udp) + sizeof (*h) + - sizeof (*s); - b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VLIB_BUFFER_FLOW_REPORT); - vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; - vnet_buffer (b0)->sw_if_index[VLIB_TX] = frm->fib_index; - - tp = vlib_buffer_get_current (b0); - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - h = (ipfix_message_header_t *) (udp + 1); - s = (ipfix_set_header_t *) (h + 1); - - ip->ip_version_and_header_length = 0x45; - ip->ttl = 254; - ip->protocol = IP_PROTOCOL_UDP; - ip->flags_and_fragment_offset = 0; - ip->src_address.as_u32 = frm->src_address.as_u32; - ip->dst_address.as_u32 = frm->ipfix_collector.as_u32; - udp->src_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); - udp->dst_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); - udp->checksum = 0; - - /* FIXUP: message header export_time */ - h->export_time = (u32) - (((f64) frm->unix_time_0) + - (vlib_time_now (frm->vlib_main) - frm->vlib_time_0)); - h->export_time = clib_host_to_net_u32 (h->export_time); - h->domain_id = clib_host_to_net_u32 (stream->domain_id); - - /* FIXUP: message header sequence_number */ - h->sequence_number = stream->sequence_number++; - h->sequence_number = clib_host_to_net_u32 (h->sequence_number); - - offset = (u32) (((u8 *) (s + 1)) - (u8 *) tp); - } - - /* Add data, unless we're flushing stale data */ - if (PREDICT_TRUE (do_flush == 0)) - { - - /* Add data */ - /* Ingress interface */ - { - u32 ingress_interface = clib_host_to_net_u32 (rx_sw_if_index); - clib_memcpy (b0->data + offset, &ingress_interface, - sizeof (ingress_interface)); - offset += sizeof (ingress_interface); - } - /* Egress interface */ - { - u32 egress_interface = clib_host_to_net_u32 (tx_sw_if_index); - clib_memcpy (b0->data + offset, &egress_interface, - sizeof (egress_interface)); - offset += sizeof (egress_interface); - } - /* src mac address */ - { - clib_memcpy (b0->data + offset, src_mac, 6); - offset += 6; - } - /* dst mac address */ - { - clib_memcpy (b0->data + offset, dst_mac, 6); - offset += 6; - } - - /* ethertype */ - b0->data[offset++] = ethertype >> 8; - b0->data[offset++] = ethertype & 0xFF; - - /* Timestamp */ - clib_memcpy (b0->data + offset, ×tamp, sizeof (f64)); - offset += sizeof (f64); - - /* pkt size */ - { - u16 pkt_size = clib_host_to_net_u16 (length); - clib_memcpy (b0->data + offset, &pkt_size, sizeof (pkt_size)); - offset += sizeof (pkt_size); - } - - b0->current_length += - /* 2*sw_if_index + 2*mac + ethertype + timestamp + length = 32 */ - 2 * sizeof (u32) + 12 + sizeof (u16) + sizeof (f64) + sizeof (u16); - - } - /* Time to flush the buffer? */ - if (PREDICT_FALSE - (do_flush || (offset + 2 * sizeof (u32) + 12 + sizeof (u16) + - +sizeof (f64) + sizeof (u16)) > frm->path_mtu)) - { - tp = vlib_buffer_get_current (b0); - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - h = (ipfix_message_header_t *) (udp + 1); - s = (ipfix_set_header_t *) (h + 1); - - s->set_id_length = ipfix_set_id_length (fm->l2_report_id, - b0->current_length - - (sizeof (*ip) + sizeof (*udp) + - sizeof (*h))); - h->version_length = version_length (b0->current_length - - (sizeof (*ip) + sizeof (*udp))); - - ip->length = clib_host_to_net_u16 (b0->current_length); - - ip->checksum = ip4_header_checksum (ip); - udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip)); - - if (frm->udp_checksum) - { - /* RFC 7011 section 10.3.2. */ - udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip); - if (udp->checksum == 0) - udp->checksum = 0xffff; - } - - ASSERT (ip->checksum == ip4_header_checksum (ip)); - - if (PREDICT_FALSE (vlib_get_trace_count (vm, node) > 0)) - { - vlib_trace_buffer (vm, node, FLOWPERPKT_L2_NEXT_IP4_LOOKUP, b0, - 0 /* follow chain */ ); - flowperpkt_l2_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - memset (t, 0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - t->buffer_size = b0->current_length; - } - - vlib_put_frame_to_node (vm, ip4_lookup_node.index, - fm->l2_frames_per_worker[my_thread_index]); - fm->l2_frames_per_worker[my_thread_index] = 0; - fm->l2_buffers_per_worker[my_thread_index] = 0; - offset = 0; - } - - fm->l2_next_record_offset_per_worker[my_thread_index] = offset; -} - -void -flowperpkt_flush_callback_l2 (void) -{ - vlib_main_t *vm = vlib_get_main (); - flowperpkt_main_t *fm = &flowperpkt_main; - vlib_node_runtime_t *node; - node = vlib_node_get_runtime (vm, flowperpkt_l2_node.index); - - add_to_flow_record_l2 (vm, node, fm, 0 /* rx_sw_if_index */ , - 0 /* tx_sw_if_index */ , - 0 /* src mac */ , - 0 /* dst mac */ , - 0 /* ethertype */ , - 0ULL /* timestamp */ , - 0 /* length */ , - 1 /* do_flush */ ); -} - - -static uword -flowperpkt_l2_node_fn (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * frame) -{ - u32 n_left_from, *from, *to_next; - flowperpkt_l2_next_t next_index; - flowperpkt_main_t *fm = &flowperpkt_main; - u64 now; - - now = (u64) ((vlib_time_now (vm) - fm->vlib_time_0) * 1e9); - now += fm->nanosecond_time_0; - - from = vlib_frame_vector_args (frame); - n_left_from = frame->n_vectors; - next_index = node->cached_next_index; - - while (n_left_from > 0) - { - u32 n_left_to_next; - - vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - - while (n_left_from >= 4 && n_left_to_next >= 2) - { - u32 next0 = FLOWPERPKT_L2_NEXT_DROP; - u32 next1 = FLOWPERPKT_L2_NEXT_DROP; - ethernet_header_t *eh0, *eh1; - u16 len0, len1; - u32 bi0, bi1; - vlib_buffer_t *b0, *b1; - - /* Prefetch next iteration. */ - { - vlib_buffer_t *p2, *p3; - - p2 = vlib_get_buffer (vm, from[2]); - p3 = vlib_get_buffer (vm, from[3]); - - vlib_prefetch_buffer_header (p2, LOAD); - vlib_prefetch_buffer_header (p3, LOAD); - - CLIB_PREFETCH (p2->data, CLIB_CACHE_LINE_BYTES, STORE); - CLIB_PREFETCH (p3->data, CLIB_CACHE_LINE_BYTES, STORE); - } - - /* speculatively enqueue b0 and b1 to the current next frame */ - to_next[0] = bi0 = from[0]; - to_next[1] = bi1 = from[1]; - from += 2; - to_next += 2; - n_left_from -= 2; - n_left_to_next -= 2; - - b0 = vlib_get_buffer (vm, bi0); - b1 = vlib_get_buffer (vm, bi1); - - vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], - &next0, b0); - vnet_feature_next (vnet_buffer (b1)->sw_if_index[VLIB_TX], - &next1, b1); - - eh0 = vlib_buffer_get_current (b0); - len0 = vlib_buffer_length_in_chain (vm, b0); - - if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_l2 (vm, node, fm, - vnet_buffer (b0)->sw_if_index[VLIB_RX], - vnet_buffer (b0)->sw_if_index[VLIB_TX], - eh0->src_address, - eh0->dst_address, - eh0->type, now, len0, 0 /* flush */ ); - - eh1 = vlib_buffer_get_current (b0); - len1 = vlib_buffer_length_in_chain (vm, b0); - - if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_l2 (vm, node, fm, - vnet_buffer (b1)->sw_if_index[VLIB_RX], - vnet_buffer (b1)->sw_if_index[VLIB_TX], - eh1->src_address, - eh1->dst_address, - eh1->type, now, len1, 0 /* flush */ ); - - if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE))) - { - if (b0->flags & VLIB_BUFFER_IS_TRACED) - { - flowperpkt_l2_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - clib_memcpy (t->src_mac, eh0->src_address, 6); - clib_memcpy (t->dst_mac, eh0->dst_address, 6); - t->ethertype = clib_net_to_host_u16 (eh0->type); - t->timestamp = now; - t->buffer_size = len0; - } - if (b1->flags & VLIB_BUFFER_IS_TRACED) - { - flowperpkt_l2_trace_t *t = - vlib_add_trace (vm, node, b1, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_TX]; - clib_memcpy (t->src_mac, eh1->src_address, 6); - clib_memcpy (t->dst_mac, eh1->dst_address, 6); - t->ethertype = clib_net_to_host_u16 (eh1->type); - t->timestamp = now; - t->buffer_size = len1; - } - } - - /* verify speculative enqueues, maybe switch current next frame */ - vlib_validate_buffer_enqueue_x2 (vm, node, next_index, - to_next, n_left_to_next, - bi0, bi1, next0, next1); - } - - while (n_left_from > 0 && n_left_to_next > 0) - { - u32 bi0; - vlib_buffer_t *b0; - u32 next0 = FLOWPERPKT_L2_NEXT_DROP; - ethernet_header_t *eh0; - u16 len0; - - /* speculatively enqueue b0 to the current next frame */ - bi0 = from[0]; - to_next[0] = bi0; - from += 1; - to_next += 1; - n_left_from -= 1; - n_left_to_next -= 1; - - b0 = vlib_get_buffer (vm, bi0); - - vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], - &next0, b0); - - eh0 = vlib_buffer_get_current (b0); - len0 = vlib_buffer_length_in_chain (vm, b0); - - if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_l2 (vm, node, fm, - vnet_buffer (b0)->sw_if_index[VLIB_RX], - vnet_buffer (b0)->sw_if_index[VLIB_TX], - eh0->src_address, - eh0->dst_address, - eh0->type, now, len0, 0 /* flush */ ); - - if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) - && (b0->flags & VLIB_BUFFER_IS_TRACED))) - { - flowperpkt_l2_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - clib_memcpy (t->src_mac, eh0->src_address, 6); - clib_memcpy (t->dst_mac, eh0->dst_address, 6); - t->ethertype = clib_net_to_host_u16 (eh0->type); - t->timestamp = now; - t->buffer_size = len0; - } - - /* verify speculative enqueue, maybe switch current next frame */ - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, - to_next, n_left_to_next, - bi0, next0); - } - - vlib_put_next_frame (vm, node, next_index, n_left_to_next); - } - return frame->n_vectors; -} - -/** - * @brief IPFIX l2 flow-per-packet graph node - * @node flowperpkt-l2 - * - * This is the IPFIX flow-record-per-packet node. - * - * @param vm vlib_main_t corresponding to the current thread. - * @param node vlib_node_runtime_t data for this node. - * @param frame vlib_frame_t whose contents should be dispatched. - * - * @par Graph mechanics: buffer metadata, next index usage - * - * <em>Uses:</em> - * - <code>vnet_buffer(b)->ip.save_rewrite_length</code> - * - tells the node the length of the rewrite which was applied in - * ip4/6_rewrite_inline, allows the code to find the IP header without - * having to parse L2 headers, or make stupid assumptions about their - * length. - * - <code>vnet_buffer(b)->flags & VLIB_BUFFER_FLOW_REPORT</code> - * - Used to suppress flow record generation for flow record packets. - * - * <em>Sets:</em> - * - <code>vnet_buffer(b)->flags & VLIB_BUFFER_FLOW_REPORT</code> - * - To suppress flow record generation for flow record packets - * - * <em>Next Index:</em> - * - Next configured output feature on the interface, usually - * "interface-output." Generated flow records head for ip4-lookup - */ - -/* *INDENT-OFF* */ -VLIB_REGISTER_NODE (flowperpkt_l2_node) = { - .function = flowperpkt_l2_node_fn, - .name = "flowperpkt-l2", - .vector_size = sizeof (u32), - .format_trace = format_flowperpkt_l2_trace, - .type = VLIB_NODE_TYPE_INTERNAL, - - .n_errors = ARRAY_LEN(flowperpkt_l2_error_strings), - .error_strings = flowperpkt_l2_error_strings, - - .n_next_nodes = FLOWPERPKT_L2_N_NEXT, - - /* edit / add dispositions here */ - .next_nodes = { - [FLOWPERPKT_L2_NEXT_DROP] = "error-drop", - [FLOWPERPKT_L2_NEXT_IP4_LOOKUP] = "ip4-lookup", - }, -}; -/* *INDENT-ON* */ - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/plugins/flowperpkt/node.c b/src/plugins/flowperpkt/node.c deleted file mode 100644 index 9bac4166a97..00000000000 --- a/src/plugins/flowperpkt/node.c +++ /dev/null @@ -1,574 +0,0 @@ -/* - * node.c - ipv4 ipfix-per-packet graph node - * - * Copyright (c) <current-year> <your-organization> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vlib/vlib.h> -#include <vnet/vnet.h> -#include <vnet/pg/pg.h> -#include <vppinfra/error.h> -#include <flowperpkt/flowperpkt.h> - -/** - * @file ipv4 flow record generator graph node - */ - -typedef struct -{ - /** interface handle */ - u32 rx_sw_if_index; - u32 tx_sw_if_index; - u32 src_address; - u32 dst_address; - /** ToS bits */ - u8 tos; - /** packet timestamp */ - u64 timestamp; - /** size of the buffer */ - u16 buffer_size; -} flowperpkt_ipv4_trace_t; - -/* packet trace format function */ -static u8 * -format_flowperpkt_ipv4_trace (u8 * s, va_list * args) -{ - CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); - CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - flowperpkt_ipv4_trace_t *t = va_arg (*args, flowperpkt_ipv4_trace_t *); - - s = format (s, - "FLOWPERPKT-V4: rx_sw_if_index %d, tx_sw_if_index %d, src %U dst %U tos %0x2, timestamp %lld, size %d", - t->rx_sw_if_index, t->tx_sw_if_index, - format_ip4_address, &t->src_address, - format_ip4_address, &t->dst_address, - t->tos, t->timestamp, t->buffer_size); - return s; -} - -vlib_node_registration_t flowperpkt_ipv4_node; - -/* No counters at the moment */ -#define foreach_flowperpkt_ipv4_error - -typedef enum -{ -#define _(sym,str) FLOWPERPKT_ERROR_##sym, - foreach_flowperpkt_ipv4_error -#undef _ - FLOWPERPKT_N_ERROR, -} flowperpkt_ipv4_error_t; - -static char *flowperpkt_ipv4_error_strings[] = { -#define _(sym,string) string, - foreach_flowperpkt_ipv4_error -#undef _ -}; - -typedef enum -{ - FLOWPERPKT_IPV4_NEXT_DROP, - FLOWPERPKT_IPV4_NEXT_LOOKUP, - FLOWPERPKT_IPV4_N_NEXT, -} flowperpkt_ipv4_next_t; - -/** - * @brief add an entry to the flow record under construction - * @param vm vlib_main_t * current worker thread main structure pointer - * @param fm flowperpkt_main_t * flow-per-packet main structure pointer - * @param sw_if_index u32 interface handle - * @param tos u8 ToS bits from the packet - * @param timestamp u64 timestamp, nanoseconds since 1/1/70 - * @param length u16 ip length of the packet - * @param do_flush int 1 = flush all cached records, 0 = construct a record - */ - -static inline void -add_to_flow_record_ipv4 (vlib_main_t * vm, - vlib_node_runtime_t * node, - flowperpkt_main_t * fm, - u32 rx_sw_if_index, u32 tx_sw_if_index, - u32 src_address, u32 dst_address, - u8 tos, u64 timestamp, u16 length, int do_flush) -{ - u32 my_thread_index = vm->thread_index; - flow_report_main_t *frm = &flow_report_main; - ip4_header_t *ip; - udp_header_t *udp; - ip4_ipfix_template_packet_t *tp; - ipfix_message_header_t *h; - ipfix_set_header_t *s; - vlib_frame_t *f; - vlib_buffer_t *b0; - u16 offset; - u32 bi0; - vlib_buffer_free_list_t *fl; - - /* Find or allocate a buffer */ - b0 = fm->ipv4_buffers_per_worker[my_thread_index]; - - /* Need to allocate a buffer? */ - if (PREDICT_FALSE (b0 == 0)) - { - /* Nothing to flush */ - if (do_flush) - return; - - /* $$$$ drop counter? */ - if (vlib_buffer_alloc (vm, &bi0, 1) != 1) - return; - - /* Initialize the buffer */ - b0 = fm->ipv4_buffers_per_worker[my_thread_index] = - vlib_get_buffer (vm, bi0); - fl = - vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); - vlib_buffer_init_for_free_list (b0, fl); - VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); - offset = 0; - } - else - { - /* use the current buffer */ - bi0 = vlib_get_buffer_index (vm, b0); - offset = fm->ipv4_next_record_offset_per_worker[my_thread_index]; - } - - /* Find or allocate a frame */ - f = fm->ipv4_frames_per_worker[my_thread_index]; - if (PREDICT_FALSE (f == 0)) - { - u32 *to_next; - f = vlib_get_frame_to_node (vm, ip4_lookup_node.index); - fm->ipv4_frames_per_worker[my_thread_index] = f; - - /* Enqueue the buffer */ - to_next = vlib_frame_vector_args (f); - to_next[0] = bi0; - f->n_vectors = 1; - } - - /* Fresh packet, construct header */ - if (PREDICT_FALSE (offset == 0)) - { - flow_report_stream_t *stream; - - stream = &frm->streams[0]; - - b0->current_data = 0; - b0->current_length = sizeof (*ip) + sizeof (*udp) + sizeof (*h) + - sizeof (*s); - b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VLIB_BUFFER_FLOW_REPORT); - vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; - vnet_buffer (b0)->sw_if_index[VLIB_TX] = frm->fib_index; - - tp = vlib_buffer_get_current (b0); - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - h = (ipfix_message_header_t *) (udp + 1); - s = (ipfix_set_header_t *) (h + 1); - - ip->ip_version_and_header_length = 0x45; - ip->ttl = 254; - ip->protocol = IP_PROTOCOL_UDP; - ip->flags_and_fragment_offset = 0; - ip->src_address.as_u32 = frm->src_address.as_u32; - ip->dst_address.as_u32 = frm->ipfix_collector.as_u32; - udp->src_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); - udp->dst_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); - udp->checksum = 0; - - /* FIXUP: message header export_time */ - h->export_time = (u32) - (((f64) frm->unix_time_0) + - (vlib_time_now (frm->vlib_main) - frm->vlib_time_0)); - h->export_time = clib_host_to_net_u32 (h->export_time); - h->domain_id = clib_host_to_net_u32 (stream->domain_id); - - /* FIXUP: message header sequence_number */ - h->sequence_number = stream->sequence_number++; - h->sequence_number = clib_host_to_net_u32 (h->sequence_number); - - offset = (u32) (((u8 *) (s + 1)) - (u8 *) tp); - } - - /* Add data, unless we're flushing stale data */ - if (PREDICT_TRUE (do_flush == 0)) - { - - /* Add data */ - /* Ingress interface */ - { - u32 ingress_interface = clib_host_to_net_u32 (rx_sw_if_index); - clib_memcpy (b0->data + offset, &ingress_interface, - sizeof (ingress_interface)); - offset += sizeof (ingress_interface); - } - /* Egress interface */ - { - u32 egress_interface = clib_host_to_net_u32 (tx_sw_if_index); - clib_memcpy (b0->data + offset, &egress_interface, - sizeof (egress_interface)); - offset += sizeof (egress_interface); - } - /* ip4 src address */ - { - clib_memcpy (b0->data + offset, &src_address, sizeof (src_address)); - offset += sizeof (src_address); - } - /* ip4 dst address */ - { - clib_memcpy (b0->data + offset, &dst_address, sizeof (dst_address)); - offset += sizeof (dst_address); - } - - /* ToS */ - b0->data[offset++] = tos; - - /* Timestamp */ - clib_memcpy (b0->data + offset, ×tamp, sizeof (f64)); - offset += sizeof (f64); - - /* pkt size */ - { - u16 pkt_size = clib_host_to_net_u16 (length); - clib_memcpy (b0->data + offset, &pkt_size, sizeof (pkt_size)); - offset += sizeof (pkt_size); - } - - b0->current_length += - /* sw_if_index + tos + timestamp + length = 15 */ - 4 * sizeof (u32) + sizeof (u8) + sizeof (f64) + sizeof (u16); - - } - /* Time to flush the buffer? */ - if (PREDICT_FALSE - (do_flush || (offset + 4 * sizeof (u32) + sizeof (u8) - + sizeof (f64) + sizeof (u16)) > frm->path_mtu)) - { - tp = vlib_buffer_get_current (b0); - ip = (ip4_header_t *) & tp->ip4; - udp = (udp_header_t *) (ip + 1); - h = (ipfix_message_header_t *) (udp + 1); - s = (ipfix_set_header_t *) (h + 1); - - s->set_id_length = ipfix_set_id_length (fm->ipv4_report_id, - b0->current_length - - (sizeof (*ip) + sizeof (*udp) + - sizeof (*h))); - h->version_length = version_length (b0->current_length - - (sizeof (*ip) + sizeof (*udp))); - - ip->length = clib_host_to_net_u16 (b0->current_length); - - ip->checksum = ip4_header_checksum (ip); - udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip)); - - if (frm->udp_checksum) - { - /* RFC 7011 section 10.3.2. */ - udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip); - if (udp->checksum == 0) - udp->checksum = 0xffff; - } - - ASSERT (ip->checksum == ip4_header_checksum (ip)); - - if (PREDICT_FALSE (vlib_get_trace_count (vm, node) > 0)) - { - vlib_trace_buffer (vm, node, FLOWPERPKT_IPV4_NEXT_LOOKUP, b0, - 0 /* follow chain */ ); - flowperpkt_ipv4_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - t->src_address = 0; - t->dst_address = 0; - t->tos = 0; - t->timestamp = 0; - t->buffer_size = b0->current_length; - } - - vlib_put_frame_to_node (vm, ip4_lookup_node.index, - fm->ipv4_frames_per_worker[my_thread_index]); - fm->ipv4_frames_per_worker[my_thread_index] = 0; - fm->ipv4_buffers_per_worker[my_thread_index] = 0; - offset = 0; - } - - fm->ipv4_next_record_offset_per_worker[my_thread_index] = offset; -} - -void -flowperpkt_flush_callback_ipv4 (void) -{ - vlib_main_t *vm = vlib_get_main (); - flowperpkt_main_t *fm = &flowperpkt_main; - vlib_node_runtime_t *node; - node = vlib_node_get_runtime (vm, flowperpkt_ipv4_node.index); - - add_to_flow_record_ipv4 (vm, node, fm, 0 /* rx_sw_if_index */ , - 0 /* tx_sw_if_index */ , - 0 /* src_address */ , - 0 /* dst_address */ , - 0 /* ToS */ , - 0ULL /* timestamp */ , - 0 /* length */ , - 1 /* do_flush */ ); -} - - -static uword -flowperpkt_ipv4_node_fn (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * frame) -{ - u32 n_left_from, *from, *to_next; - flowperpkt_ipv4_next_t next_index; - flowperpkt_main_t *fm = &flowperpkt_main; - u64 now; - - now = (u64) ((vlib_time_now (vm) - fm->vlib_time_0) * 1e9); - now += fm->nanosecond_time_0; - - from = vlib_frame_vector_args (frame); - n_left_from = frame->n_vectors; - next_index = node->cached_next_index; - - while (n_left_from > 0) - { - u32 n_left_to_next; - - vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); - - while (n_left_from >= 4 && n_left_to_next >= 2) - { - u32 next0 = FLOWPERPKT_IPV4_NEXT_DROP; - u32 next1 = FLOWPERPKT_IPV4_NEXT_DROP; - ip4_header_t *ip0, *ip1; - u16 len0, len1; - u32 bi0, bi1; - vlib_buffer_t *b0, *b1; - - /* Prefetch next iteration. */ - { - vlib_buffer_t *p2, *p3; - - p2 = vlib_get_buffer (vm, from[2]); - p3 = vlib_get_buffer (vm, from[3]); - - vlib_prefetch_buffer_header (p2, LOAD); - vlib_prefetch_buffer_header (p3, LOAD); - - CLIB_PREFETCH (p2->data, CLIB_CACHE_LINE_BYTES, STORE); - CLIB_PREFETCH (p3->data, CLIB_CACHE_LINE_BYTES, STORE); - } - - /* speculatively enqueue b0 and b1 to the current next frame */ - to_next[0] = bi0 = from[0]; - to_next[1] = bi1 = from[1]; - from += 2; - to_next += 2; - n_left_from -= 2; - n_left_to_next -= 2; - - b0 = vlib_get_buffer (vm, bi0); - b1 = vlib_get_buffer (vm, bi1); - - vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], - &next0, b0); - vnet_feature_next (vnet_buffer (b1)->sw_if_index[VLIB_TX], - &next1, b1); - - ip0 = (ip4_header_t *) ((u8 *) vlib_buffer_get_current (b0) + - vnet_buffer (b0)->ip.save_rewrite_length); - - len0 = vlib_buffer_length_in_chain (vm, b0); - - if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_ipv4 (vm, node, fm, - vnet_buffer (b0)->sw_if_index[VLIB_RX], - vnet_buffer (b0)->sw_if_index[VLIB_TX], - ip0->src_address.as_u32, - ip0->dst_address.as_u32, - ip0->tos, now, len0, 0 /* flush */ ); - - ip1 = (ip4_header_t *) ((u8 *) vlib_buffer_get_current (b1) + - vnet_buffer (b1)->ip.save_rewrite_length); - len1 = vlib_buffer_length_in_chain (vm, b1); - - if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_ipv4 (vm, node, fm, - vnet_buffer (b1)->sw_if_index[VLIB_RX], - vnet_buffer (b1)->sw_if_index[VLIB_TX], - ip1->src_address.as_u32, - ip1->dst_address.as_u32, - ip1->tos, now, len1, 0 /* flush */ ); - - if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE))) - { - if (b0->flags & VLIB_BUFFER_IS_TRACED) - { - flowperpkt_ipv4_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - t->src_address = ip0->src_address.as_u32; - t->dst_address = ip0->dst_address.as_u32; - t->tos = ip0->tos; - t->timestamp = now; - t->buffer_size = len0; - } - if (b1->flags & VLIB_BUFFER_IS_TRACED) - { - flowperpkt_ipv4_trace_t *t = - vlib_add_trace (vm, node, b1, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b1)->sw_if_index[VLIB_TX]; - t->src_address = ip1->src_address.as_u32; - t->dst_address = ip1->dst_address.as_u32; - t->tos = ip1->tos; - t->timestamp = now; - t->buffer_size = len1; - } - } - - /* verify speculative enqueues, maybe switch current next frame */ - vlib_validate_buffer_enqueue_x2 (vm, node, next_index, - to_next, n_left_to_next, - bi0, bi1, next0, next1); - } - - while (n_left_from > 0 && n_left_to_next > 0) - { - u32 bi0; - vlib_buffer_t *b0; - u32 next0 = FLOWPERPKT_IPV4_NEXT_DROP; - ip4_header_t *ip0; - u16 len0; - - /* speculatively enqueue b0 to the current next frame */ - bi0 = from[0]; - to_next[0] = bi0; - from += 1; - to_next += 1; - n_left_from -= 1; - n_left_to_next -= 1; - - b0 = vlib_get_buffer (vm, bi0); - - vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], - &next0, b0); - - ip0 = (ip4_header_t *) ((u8 *) vlib_buffer_get_current (b0) + - vnet_buffer (b0)->ip.save_rewrite_length); - /* - * egressInterface, TLV type 14, u32 - * ipClassOfService, TLV type 5, u8 - * flowStartNanoseconds, TLV type 156, dateTimeNanoseconds (f64) - * Implementation: f64 nanoseconds since VPP started - * dataLinkFrameSize, TLV type 312, u16 - */ - len0 = vlib_buffer_length_in_chain (vm, b0); - - if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) - add_to_flow_record_ipv4 (vm, node, fm, - vnet_buffer (b0)->sw_if_index[VLIB_RX], - vnet_buffer (b0)->sw_if_index[VLIB_TX], - ip0->src_address.as_u32, - ip0->dst_address.as_u32, - ip0->tos, now, len0, 0 /* flush */ ); - - if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) - && (b0->flags & VLIB_BUFFER_IS_TRACED))) - { - flowperpkt_ipv4_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); - t->rx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_RX]; - t->tx_sw_if_index = vnet_buffer (b0)->sw_if_index[VLIB_TX]; - t->src_address = ip0->src_address.as_u32; - t->dst_address = ip0->dst_address.as_u32; - t->tos = ip0->tos; - t->timestamp = now; - t->buffer_size = len0; - } - - /* verify speculative enqueue, maybe switch current next frame */ - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, - to_next, n_left_to_next, - bi0, next0); - } - - vlib_put_next_frame (vm, node, next_index, n_left_to_next); - } - return frame->n_vectors; -} - -/** - * @brief IPFIX ipv4 flow-per-packet graph node - * @node flowperpkt-ipv4 - * - * This is the IPFIX flow-record-per-packet node. - * - * @param vm vlib_main_t corresponding to the current thread. - * @param node vlib_node_runtime_t data for this node. - * @param frame vlib_frame_t whose contents should be dispatched. - * - * @par Graph mechanics: buffer metadata, next index usage - * - * <em>Uses:</em> - * - <code>vnet_buffer(b)->ip.save_rewrite_length</code> - * - tells the node the length of the rewrite which was applied in - * ip4/6_rewrite_inline, allows the code to find the IP header without - * having to parse L2 headers, or make stupid assumptions about their - * length. - * - <code>vnet_buffer(b)->flags & VLIB_BUFFER_FLOW_REPORT</code> - * - Used to suppress flow record generation for flow record packets. - * - * <em>Sets:</em> - * - <code>vnet_buffer(b)->flags & VLIB_BUFFER_FLOW_REPORT</code> - * - To suppress flow record generation for flow record packets - * - * <em>Next Index:</em> - * - Next configured output feature on the interface, usually - * "interface-output." Generated flow records head for ip4-lookup - */ - -/* *INDENT-OFF* */ -VLIB_REGISTER_NODE (flowperpkt_ipv4_node) = { - .function = flowperpkt_ipv4_node_fn, - .name = "flowperpkt-ipv4", - .vector_size = sizeof (u32), - .format_trace = format_flowperpkt_ipv4_trace, - .type = VLIB_NODE_TYPE_INTERNAL, - - .n_errors = ARRAY_LEN(flowperpkt_ipv4_error_strings), - .error_strings = flowperpkt_ipv4_error_strings, - - .n_next_nodes = FLOWPERPKT_IPV4_N_NEXT, - - /* edit / add dispositions here */ - .next_nodes = { - [FLOWPERPKT_IPV4_NEXT_DROP] = "error-drop", - /* Used only to trace ipfix data packets */ - [FLOWPERPKT_IPV4_NEXT_LOOKUP] = "ip4-lookup", - }, -}; -/* *INDENT-ON* */ - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/plugins/flowperpkt.am b/src/plugins/flowprobe.am index a400603a71f..c56e246d219 100644 --- a/src/plugins/flowperpkt.am +++ b/src/plugins/flowprobe.am @@ -12,27 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -vppplugins_LTLIBRARIES += flowperpkt_plugin.la -vppapitestplugins_LTLIBRARIES += flowperpkt_test_plugin.la +vppplugins_LTLIBRARIES += flowprobe_plugin.la +vppapitestplugins_LTLIBRARIES += flowprobe_test_plugin.la -flowperpkt_plugin_la_SOURCES = flowperpkt/flowperpkt.c \ - flowperpkt/l2_node.c \ - flowperpkt/node.c \ - flowperpkt/flowperpkt_plugin.api.h +flowprobe_plugin_la_SOURCES = flowprobe/flowprobe.c \ + flowprobe/node.c \ + flowprobe/flowprobe_plugin.api.h BUILT_SOURCES += \ - flowperpkt/flowperpkt.api.h \ - flowperpkt/flowperpkt.api.json + flowprobe/flowprobe.api.h \ + flowprobe/flowprobe.api.json noinst_HEADERS += \ - flowperpkt/flowperpkt_all_api_h.h \ - flowperpkt/flowperpkt_msg_enum.h \ - flowperpkt/flowperpkt.api.h + flowprobe/flowprobe_all_api_h.h \ + flowprobe/flowprobe_msg_enum.h \ + flowprobe/flowprobe.api.h -flowperpkt_test_plugin_la_SOURCES = \ - flowperpkt/flowperpkt_test.c \ - flowperpkt/flowperpkt_plugin.api.h +flowprobe_test_plugin_la_SOURCES = \ + flowprobe/flowprobe_test.c \ + flowprobe/flowprobe_plugin.api.h -API_FILES += flowperpkt/flowperpkt.api +API_FILES += flowprobe/flowprobe.api # vi:syntax=automake diff --git a/src/plugins/flowperpkt/flowperpkt.api b/src/plugins/flowprobe/flowprobe.api index 3ff92dca05f..3f8c583bcf2 100644 --- a/src/plugins/flowperpkt/flowperpkt.api +++ b/src/plugins/flowprobe/flowprobe.api @@ -2,7 +2,7 @@ /** \file This file defines the vpp control-plane API messages - used to control the flowperpkt plugin + used to control the flowprobe plugin */ /** \brief Enable / disable per-packet IPFIX recording on an interface @@ -12,7 +12,7 @@ @param is_ipv6 - if non-zero the address is ipv6, else ipv4 @param sw_if_index - index of the interface */ -autoreply manual_print define flowperpkt_tx_interface_add_del +autoreply manual_print define flowprobe_tx_interface_add_del { /* Client identifier, set from api_main.my_client_index */ u32 client_index; @@ -28,7 +28,7 @@ autoreply manual_print define flowperpkt_tx_interface_add_del u32 sw_if_index; }; -autoreply define flowperpkt_params +autoreply define flowprobe_params { u32 client_index; u32 context; diff --git a/src/plugins/flowprobe/flowprobe.c b/src/plugins/flowprobe/flowprobe.c new file mode 100644 index 00000000000..8975f89cddf --- /dev/null +++ b/src/plugins/flowprobe/flowprobe.c @@ -0,0 +1,1122 @@ +/* + * flowprobe.c - ipfix probe plugin + * + * Copyright (c) 2016 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * @brief Per-packet IPFIX flow record generator plugin + * + * This file implements vpp plugin registration mechanics, + * debug CLI, and binary API handling. + */ + +#include <vnet/vnet.h> +#include <vpp/app/version.h> +#include <vnet/plugin/plugin.h> +#include <flowprobe/flowprobe.h> + +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vlibsocket/api.h> + +/* define message IDs */ +#include <flowprobe/flowprobe_msg_enum.h> + +/* define message structures */ +#define vl_typedefs +#include <flowprobe/flowprobe_all_api_h.h> +#undef vl_typedefs + +/* define generated endian-swappers */ +#define vl_endianfun +#include <flowprobe/flowprobe_all_api_h.h> +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) +#define vl_printfun +#include <flowprobe/flowprobe_all_api_h.h> +#undef vl_printfun + +flowprobe_main_t flowprobe_main; +vlib_node_registration_t flowprobe_walker_node; +static vlib_node_registration_t flowprobe_timer_node; +uword flowprobe_walker_process (vlib_main_t * vm, vlib_node_runtime_t * rt, + vlib_frame_t * f); + +/* Get the API version number */ +#define vl_api_version(n,v) static u32 api_version=(v); +#include <flowprobe/flowprobe_all_api_h.h> +#undef vl_api_version + +#define REPLY_MSG_ID_BASE fm->msg_id_base +#include <vlibapi/api_helper_macros.h> + +/* Define the per-interface configurable features */ +/* *INDENT-OFF* */ +VNET_FEATURE_INIT (flow_perpacket_ip4, static) = +{ + .arc_name = "ip4-output", + .node_name = "flowprobe-ip4", + .runs_before = VNET_FEATURES ("interface-output"), +}; + +VNET_FEATURE_INIT (flow_perpacket_ip6, static) = +{ + .arc_name = "ip6-output", + .node_name = "flowprobe-ip6", + .runs_before = VNET_FEATURES ("interface-output"), +}; + +VNET_FEATURE_INIT (flow_perpacket_l2, static) = +{ + .arc_name = "interface-output", + .node_name = "flowprobe-l2", + .runs_before = VNET_FEATURES ("interface-tx"), +}; +/* *INDENT-ON* */ + +/* Macro to finish up custom dump fns */ +#define FINISH \ + vec_add1 (s, 0); \ + vl_print (handle, (char *)s); \ + vec_free (s); \ + return handle; + +static inline ipfix_field_specifier_t * +flowprobe_template_ip4_fields (ipfix_field_specifier_t * f) +{ +#define flowprobe_template_ip4_field_count() 4 + /* sourceIpv4Address, TLV type 8, u32 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + sourceIPv4Address, 4); + f++; + /* destinationIPv4Address, TLV type 12, u32 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + destinationIPv4Address, 4); + f++; + /* protocolIdentifier, TLV type 4, u8 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + protocolIdentifier, 1); + f++; + /* octetDeltaCount, TLV type 1, u64 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + octetDeltaCount, 8); + f++; + return f; +} + +static inline ipfix_field_specifier_t * +flowprobe_template_ip6_fields (ipfix_field_specifier_t * f) +{ +#define flowprobe_template_ip6_field_count() 4 + /* sourceIpv6Address, TLV type 27, 16 octets */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + sourceIPv6Address, 16); + f++; + /* destinationIPv6Address, TLV type 28, 16 octets */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + destinationIPv6Address, 16); + f++; + /* protocolIdentifier, TLV type 4, u8 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + protocolIdentifier, 1); + f++; + /* octetDeltaCount, TLV type 1, u64 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + octetDeltaCount, 8); + f++; + return f; +} + +static inline ipfix_field_specifier_t * +flowprobe_template_l2_fields (ipfix_field_specifier_t * f) +{ +#define flowprobe_template_l2_field_count() 3 + /* sourceMacAddress, TLV type 56, u8[6] we hope */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + sourceMacAddress, 6); + f++; + /* destinationMacAddress, TLV type 80, u8[6] we hope */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + destinationMacAddress, 6); + f++; + /* ethernetType, TLV type 256, u16 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + ethernetType, 2); + f++; + return f; +} + +static inline ipfix_field_specifier_t * +flowprobe_template_common_fields (ipfix_field_specifier_t * f) +{ +#define flowprobe_template_common_field_count() 3 + /* ingressInterface, TLV type 10, u32 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + ingressInterface, 4); + f++; + + /* egressInterface, TLV type 14, u32 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + egressInterface, 4); + f++; + + /* packetDeltaCount, TLV type 2, u64 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + packetDeltaCount, 8); + f++; + + return f; +} + +static inline ipfix_field_specifier_t * +flowprobe_template_l4_fields (ipfix_field_specifier_t * f) +{ +#define flowprobe_template_l4_field_count() 2 + /* sourceTransportPort, TLV type 7, u16 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + sourceTransportPort, 2); + f++; + /* destinationTransportPort, TLV type 11, u16 */ + f->e_id_length = ipfix_e_id_length (0 /* enterprise */ , + destinationTransportPort, 2); + f++; + return f; +} + +/** + * @brief Create an IPFIX template packet rewrite string + * @param frm flow_report_main_t * + * @param fr flow_report_t * + * @param collector_address ip4_address_t * the IPFIX collector address + * @param src_address ip4_address_t * the source address we should use + * @param collector_port u16 the collector port we should use, host byte order + * @returns u8 * vector containing the indicated IPFIX template packet + */ +static inline u8 * +flowprobe_template_rewrite_inline (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port, + flowprobe_variant_t which) +{ + ip4_header_t *ip; + udp_header_t *udp; + ipfix_message_header_t *h; + ipfix_set_header_t *s; + ipfix_template_header_t *t; + ipfix_field_specifier_t *f; + ipfix_field_specifier_t *first_field; + u8 *rewrite = 0; + ip4_ipfix_template_packet_t *tp; + u32 field_count = 0; + flow_report_stream_t *stream; + flowprobe_main_t *fm = &flowprobe_main; + flowprobe_record_t flags = fr->opaque.as_uword; + bool collect_ip4 = false, collect_ip6 = false; + + stream = &frm->streams[fr->stream_index]; + + if (flags & FLOW_RECORD_L3) + { + collect_ip4 = which == FLOW_VARIANT_L2_IP4 || which == FLOW_VARIANT_IP4; + collect_ip6 = which == FLOW_VARIANT_L2_IP6 || which == FLOW_VARIANT_IP6; + if (which == FLOW_VARIANT_L2_IP4) + flags |= FLOW_RECORD_L2_IP4; + if (which == FLOW_VARIANT_L2_IP6) + flags |= FLOW_RECORD_L2_IP6; + } + + field_count += flowprobe_template_common_field_count (); + if (flags & FLOW_RECORD_L2) + field_count += flowprobe_template_l2_field_count (); + if (collect_ip4) + field_count += flowprobe_template_ip4_field_count (); + if (collect_ip6) + field_count += flowprobe_template_ip6_field_count (); + if (flags & FLOW_RECORD_L4) + field_count += flowprobe_template_l4_field_count (); + + /* allocate rewrite space */ + vec_validate_aligned + (rewrite, sizeof (ip4_ipfix_template_packet_t) + + field_count * sizeof (ipfix_field_specifier_t) - 1, + CLIB_CACHE_LINE_BYTES); + + tp = (ip4_ipfix_template_packet_t *) rewrite; + ip = (ip4_header_t *) & tp->ip4; + udp = (udp_header_t *) (ip + 1); + h = (ipfix_message_header_t *) (udp + 1); + s = (ipfix_set_header_t *) (h + 1); + t = (ipfix_template_header_t *) (s + 1); + first_field = f = (ipfix_field_specifier_t *) (t + 1); + + ip->ip_version_and_header_length = 0x45; + ip->ttl = 254; + ip->protocol = IP_PROTOCOL_UDP; + ip->src_address.as_u32 = src_address->as_u32; + ip->dst_address.as_u32 = collector_address->as_u32; + udp->src_port = clib_host_to_net_u16 (stream->src_port); + udp->dst_port = clib_host_to_net_u16 (collector_port); + udp->length = clib_host_to_net_u16 (vec_len (rewrite) - sizeof (*ip)); + + /* FIXUP: message header export_time */ + /* FIXUP: message header sequence_number */ + h->domain_id = clib_host_to_net_u32 (stream->domain_id); + + /* Add TLVs to the template */ + f = flowprobe_template_common_fields (f); + + if (flags & FLOW_RECORD_L2) + f = flowprobe_template_l2_fields (f); + if (collect_ip4) + f = flowprobe_template_ip4_fields (f); + if (collect_ip6) + f = flowprobe_template_ip6_fields (f); + if (flags & FLOW_RECORD_L4) + f = flowprobe_template_l4_fields (f); + + /* Back to the template packet... */ + ip = (ip4_header_t *) & tp->ip4; + udp = (udp_header_t *) (ip + 1); + + ASSERT (f - first_field); + /* Field count in this template */ + t->id_count = ipfix_id_count (fr->template_id, f - first_field); + + fm->template_size[flags] = (u8 *) f - (u8 *) s; + + /* set length in octets */ + s->set_id_length = + ipfix_set_id_length (2 /* set_id */ , (u8 *) f - (u8 *) s); + + /* message length in octets */ + h->version_length = version_length ((u8 *) f - (u8 *) h); + + ip->length = clib_host_to_net_u16 ((u8 *) f - (u8 *) ip); + ip->checksum = ip4_header_checksum (ip); + + return rewrite; +} + +static u8 * +flowprobe_template_rewrite_ip6 (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port) +{ + return flowprobe_template_rewrite_inline + (frm, fr, collector_address, src_address, collector_port, + FLOW_VARIANT_IP6); +} + +static u8 * +flowprobe_template_rewrite_ip4 (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port) +{ + return flowprobe_template_rewrite_inline + (frm, fr, collector_address, src_address, collector_port, + FLOW_VARIANT_IP4); +} + +static u8 * +flowprobe_template_rewrite_l2 (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port) +{ + return flowprobe_template_rewrite_inline + (frm, fr, collector_address, src_address, collector_port, + FLOW_VARIANT_L2); +} + +static u8 * +flowprobe_template_rewrite_l2_ip4 (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port) +{ + return flowprobe_template_rewrite_inline + (frm, fr, collector_address, src_address, collector_port, + FLOW_VARIANT_L2_IP4); +} + +static u8 * +flowprobe_template_rewrite_l2_ip6 (flow_report_main_t * frm, + flow_report_t * fr, + ip4_address_t * collector_address, + ip4_address_t * src_address, + u16 collector_port) +{ + return flowprobe_template_rewrite_inline + (frm, fr, collector_address, src_address, collector_port, + FLOW_VARIANT_L2_IP6); +} + +/** + * @brief Flush accumulated data + * @param frm flow_report_main_t * + * @param fr flow_report_t * + * @param f vlib_frame_t * + * + * <em>Notes:</em> + * This function must simply return the incoming frame, or no template packets + * will be sent. + */ +vlib_frame_t * +flowprobe_data_callback_ip4 (flow_report_main_t * frm, + flow_report_t * fr, + vlib_frame_t * f, u32 * to_next, u32 node_index) +{ + flowprobe_flush_callback_ip4 (); + return f; +} + +vlib_frame_t * +flowprobe_data_callback_ip6 (flow_report_main_t * frm, + flow_report_t * fr, + vlib_frame_t * f, u32 * to_next, u32 node_index) +{ + flowprobe_flush_callback_ip6 (); + return f; +} + +vlib_frame_t * +flowprobe_data_callback_l2 (flow_report_main_t * frm, + flow_report_t * fr, + vlib_frame_t * f, u32 * to_next, u32 node_index) +{ + flowprobe_flush_callback_l2 (); + return f; +} + +static int +flowprobe_template_add_del (u32 domain_id, u16 src_port, + flowprobe_record_t flags, + vnet_flow_data_callback_t * flow_data_callback, + vnet_flow_rewrite_callback_t * rewrite_callback, + bool is_add, u16 * template_id) +{ + flow_report_main_t *frm = &flow_report_main; + vnet_flow_report_add_del_args_t a = { + .rewrite_callback = rewrite_callback, + .flow_data_callback = flow_data_callback, + .is_add = is_add, + .domain_id = domain_id, + .src_port = src_port, + .opaque.as_uword = flags, + }; + return vnet_flow_report_add_del (frm, &a, template_id); +} + +static void +flowprobe_expired_timer_callback (u32 * expired_timers) +{ + vlib_main_t *vm = vlib_get_main (); + flowprobe_main_t *fm = &flowprobe_main; + u32 my_cpu_number = vm->thread_index; + int i; + u32 poolindex; + + for (i = 0; i < vec_len (expired_timers); i++) + { + poolindex = expired_timers[i] & 0x7FFFFFFF; + vec_add1 (fm->expired_passive_per_worker[my_cpu_number], poolindex); + } +} + +static clib_error_t * +flowprobe_create_state_tables (u32 active_timer) +{ + flowprobe_main_t *fm = &flowprobe_main; + vlib_thread_main_t *tm = &vlib_thread_main; + vlib_main_t *vm = vlib_get_main (); + clib_error_t *error = 0; + u32 num_threads; + int i; + + /* Decide how many worker threads we have */ + num_threads = 1 /* main thread */ + tm->n_threads; + + /* Hash table per worker */ + fm->ht_log2len = FLOWPROBE_LOG2_HASHSIZE; + + /* Init per worker flow state and timer wheels */ + if (active_timer) + { + vec_validate (fm->timers_per_worker, num_threads - 1); + vec_validate (fm->expired_passive_per_worker, num_threads - 1); + vec_validate (fm->hash_per_worker, num_threads - 1); + vec_validate (fm->pool_per_worker, num_threads - 1); + + for (i = 0; i < num_threads; i++) + { + int j; + pool_alloc (fm->pool_per_worker[i], 1 << fm->ht_log2len); + vec_resize (fm->hash_per_worker[i], 1 << fm->ht_log2len); + for (j = 0; j < (1 << fm->ht_log2len); j++) + fm->hash_per_worker[i][j] = ~0; + fm->timers_per_worker[i] = + clib_mem_alloc (sizeof (TWT (tw_timer_wheel))); + tw_timer_wheel_init_2t_1w_2048sl (fm->timers_per_worker[i], + flowprobe_expired_timer_callback, + 1.0, 1024); + } + fm->disabled = true; + } + else + { + f64 now = vlib_time_now (vm); + vec_validate (fm->stateless_entry, num_threads - 1); + for (i = 0; i < num_threads; i++) + fm->stateless_entry[i].last_exported = now; + fm->disabled = false; + } + fm->initialized = true; + return error; +} + +static int +validate_feature_on_interface (flowprobe_main_t * fm, u32 sw_if_index, + u8 which) +{ + vec_validate_init_empty (fm->flow_per_interface, sw_if_index, ~0); + + if (fm->flow_per_interface[sw_if_index] == (u8) ~ 0) + return -1; + else if (fm->flow_per_interface[sw_if_index] != which) + return 0; + else + return 1; +} + +/** + * @brief configure / deconfigure the IPFIX flow-per-packet + * @param fm flowprobe_main_t * fm + * @param sw_if_index u32 the desired interface + * @param is_add int 1 to enable the feature, 0 to disable it + * @returns 0 if successful, non-zero otherwise + */ + +static int +flowprobe_tx_interface_add_del_feature (flowprobe_main_t * fm, + u32 sw_if_index, u8 which, int is_add) +{ + vlib_main_t *vm = vlib_get_main (); + int rv = 0; + u16 template_id = 0; + flowprobe_record_t flags = fm->record; + + fm->flow_per_interface[sw_if_index] = (is_add) ? which : (u8) ~ 0; + fm->template_per_flow[which] += (is_add) ? 1 : -1; + if (is_add && fm->template_per_flow[which] > 1) + template_id = fm->template_reports[flags]; + + if ((is_add && fm->template_per_flow[which] == 1) || + (!is_add && fm->template_per_flow[which] == 0)) + { + if (which == FLOW_VARIANT_L2) + { + if (fm->record & FLOW_RECORD_L2) + { + rv = flowprobe_template_add_del (1, UDP_DST_PORT_ipfix, flags, + flowprobe_data_callback_l2, + flowprobe_template_rewrite_l2, + is_add, &template_id); + } + if (fm->record & FLOW_RECORD_L3 || fm->record & FLOW_RECORD_L4) + { + rv = flowprobe_template_add_del (1, UDP_DST_PORT_ipfix, flags, + flowprobe_data_callback_l2, + flowprobe_template_rewrite_l2_ip4, + is_add, &template_id); + fm->template_reports[flags | FLOW_RECORD_L2_IP4] = + (is_add) ? template_id : 0; + rv = + flowprobe_template_add_del (1, UDP_DST_PORT_ipfix, flags, + flowprobe_data_callback_l2, + flowprobe_template_rewrite_l2_ip6, + is_add, &template_id); + fm->template_reports[flags | FLOW_RECORD_L2_IP6] = + (is_add) ? template_id : 0; + + /* Special case L2 */ + fm->context[FLOW_VARIANT_L2_IP4].flags = + flags | FLOW_RECORD_L2_IP4; + fm->context[FLOW_VARIANT_L2_IP6].flags = + flags | FLOW_RECORD_L2_IP6; + + fm->template_reports[flags] = template_id; + } + } + else if (which == FLOW_VARIANT_IP4) + rv = flowprobe_template_add_del (1, UDP_DST_PORT_ipfix, flags, + flowprobe_data_callback_ip4, + flowprobe_template_rewrite_ip4, + is_add, &template_id); + else if (which == FLOW_VARIANT_IP6) + rv = flowprobe_template_add_del (1, UDP_DST_PORT_ipfix, flags, + flowprobe_data_callback_ip6, + flowprobe_template_rewrite_ip6, + is_add, &template_id); + } + if (rv && rv != VNET_API_ERROR_VALUE_EXIST) + { + clib_warning ("vnet_flow_report_add_del returned %d", rv); + return -1; + } + + if (which != (u8) ~ 0) + { + fm->context[which].flags = fm->record; + fm->template_reports[flags] = (is_add) ? template_id : 0; + } + + if (which == FLOW_VARIANT_IP4) + vnet_feature_enable_disable ("ip4-output", "flowprobe-ip4", + sw_if_index, is_add, 0, 0); + else if (which == FLOW_VARIANT_IP6) + vnet_feature_enable_disable ("ip6-output", "flowprobe-ip6", + sw_if_index, is_add, 0, 0); + else if (which == FLOW_VARIANT_L2) + vnet_feature_enable_disable ("interface-output", "flowprobe-l2", + sw_if_index, is_add, 0, 0); + + /* Stateful flow collection */ + if (is_add && !fm->initialized) + { + flowprobe_create_state_tables (fm->active_timer); + if (fm->active_timer) + vlib_process_signal_event (vm, flowprobe_timer_node.index, 1, 0); + } + + return 0; +} + +/** + * @brief API message handler + * @param mp vl_api_flowprobe_tx_interface_add_del_t * mp the api message + */ +void vl_api_flowprobe_tx_interface_add_del_t_handler + (vl_api_flowprobe_tx_interface_add_del_t * mp) +{ + flowprobe_main_t *fm = &flowprobe_main; + vl_api_flowprobe_tx_interface_add_del_reply_t *rmp; + u32 sw_if_index = ntohl (mp->sw_if_index); + int rv = 0; + + VALIDATE_SW_IF_INDEX (mp); + + if (mp->which != FLOW_VARIANT_IP4 && mp->which != FLOW_VARIANT_L2 + && mp->which != FLOW_VARIANT_IP6) + { + rv = VNET_API_ERROR_UNIMPLEMENTED; + goto out; + } + + if (fm->record == 0) + { + clib_warning ("Please specify flowprobe params record first..."); + rv = VNET_API_ERROR_CANNOT_ENABLE_DISABLE_FEATURE; + goto out; + } + + rv = validate_feature_on_interface (fm, sw_if_index, mp->which); + if ((rv == 1 && mp->is_add == 1) || rv == 0) + { + rv = VNET_API_ERROR_CANNOT_ENABLE_DISABLE_FEATURE; + goto out; + } + + rv = flowprobe_tx_interface_add_del_feature + (fm, sw_if_index, mp->which, mp->is_add); + +out: + BAD_SW_IF_INDEX_LABEL; + + REPLY_MACRO (VL_API_FLOWPROBE_TX_INTERFACE_ADD_DEL_REPLY); +} + +/** + * @brief API message custom-dump function + * @param mp vl_api_flowprobe_tx_interface_add_del_t * mp the api message + * @param handle void * print function handle + * @returns u8 * output string + */ +static void *vl_api_flowprobe_tx_interface_add_del_t_print + (vl_api_flowprobe_tx_interface_add_del_t * mp, void *handle) +{ + u8 *s; + + s = format (0, "SCRIPT: flowprobe_tx_interface_add_del "); + s = format (s, "sw_if_index %d is_add %d which %d ", + clib_host_to_net_u32 (mp->sw_if_index), + (int) mp->is_add, (int) mp->which); + FINISH; +} + +#define vec_neg_search(v,E) \ +({ \ + word _v(i) = 0; \ + while (_v(i) < vec_len(v) && v[_v(i)] == E) \ + { \ + _v(i)++; \ + } \ + if (_v(i) == vec_len(v)) \ + _v(i) = ~0; \ + _v(i); \ +}) + +static int +flowprobe_params (flowprobe_main_t * fm, u8 record_l2, + u8 record_l3, u8 record_l4, + u32 active_timer, u32 passive_timer) +{ + flowprobe_record_t flags = 0; + + if (vec_neg_search (fm->flow_per_interface, (u8) ~ 0) != ~0) + return ~0; + + if (record_l2) + flags |= FLOW_RECORD_L2; + if (record_l3) + flags |= FLOW_RECORD_L3; + if (record_l4) + flags |= FLOW_RECORD_L4; + + fm->record = flags; + + /* + * Timers: ~0 is default, 0 is off + */ + fm->active_timer = + (active_timer == (u32) ~ 0 ? FLOWPROBE_TIMER_ACTIVE : active_timer); + fm->passive_timer = + (passive_timer == (u32) ~ 0 ? FLOWPROBE_TIMER_PASSIVE : passive_timer); + + return 0; +} + +void +vl_api_flowprobe_params_t_handler (vl_api_flowprobe_params_t * mp) +{ + flowprobe_main_t *fm = &flowprobe_main; + vl_api_flowprobe_params_reply_t *rmp; + int rv = 0; + + rv = flowprobe_params + (fm, mp->record_l2, mp->record_l3, mp->record_l4, + clib_net_to_host_u32 (mp->active_timer), + clib_net_to_host_u32 (mp->passive_timer)); + + REPLY_MACRO (VL_API_FLOWPROBE_PARAMS_REPLY); +} + +/* List of message types that this plugin understands */ +#define foreach_flowprobe_plugin_api_msg \ +_(FLOWPROBE_TX_INTERFACE_ADD_DEL, flowprobe_tx_interface_add_del) \ +_(FLOWPROBE_PARAMS, flowprobe_params) + +/* *INDENT-OFF* */ +VLIB_PLUGIN_REGISTER () = { + .version = VPP_BUILD_VER, + .description = "Flow per Packet", +}; +/* *INDENT-ON* */ + +u8 * +format_flowprobe_entry (u8 * s, va_list * args) +{ + flowprobe_entry_t *e = va_arg (*args, flowprobe_entry_t *); + s = format (s, " %d/%d", e->key.rx_sw_if_index, e->key.tx_sw_if_index); + + s = format (s, " %U %U", format_ethernet_address, &e->key.src_mac, + format_ethernet_address, &e->key.dst_mac); + s = format (s, " %U -> %U", + format_ip46_address, &e->key.src_address, IP46_TYPE_ANY, + format_ip46_address, &e->key.dst_address, IP46_TYPE_ANY); + s = format (s, " %d", e->key.protocol); + s = format (s, " %d %d\n", clib_net_to_host_u16 (e->key.src_port), + clib_net_to_host_u16 (e->key.dst_port)); + + return s; +} + +static clib_error_t * +flowprobe_show_table_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cm) +{ + flowprobe_main_t *fm = &flowprobe_main; + int i; + flowprobe_entry_t *e; + + vlib_cli_output (vm, "Dumping IPFIX table"); + + for (i = 0; i < vec_len (fm->pool_per_worker); i++) + { + /* *INDENT-OFF* */ + pool_foreach (e, fm->pool_per_worker[i], ( + { + vlib_cli_output (vm, "%U", + format_flowprobe_entry, + e); + })); + /* *INDENT-ON* */ + + } + return 0; +} + +static clib_error_t * +flowprobe_show_stats_fn (vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cm) +{ + flowprobe_main_t *fm = &flowprobe_main; + int i; + + vlib_cli_output (vm, "IPFIX table statistics"); + vlib_cli_output (vm, "Flow entry size: %d\n", sizeof (flowprobe_entry_t)); + vlib_cli_output (vm, "Flow pool size per thread: %d\n", + 0x1 << FLOWPROBE_LOG2_HASHSIZE); + + for (i = 0; i < vec_len (fm->pool_per_worker); i++) + vlib_cli_output (vm, "Pool utilisation thread %d is %d%%\n", i, + (100 * pool_elts (fm->pool_per_worker[i])) / + (0x1 << FLOWPROBE_LOG2_HASHSIZE)); + return 0; +} + +static clib_error_t * +flowprobe_tx_interface_add_del_feature_command_fn (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + flowprobe_main_t *fm = &flowprobe_main; + u32 sw_if_index = ~0; + int is_add = 1; + u8 which = FLOW_VARIANT_IP4; + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "disable")) + is_add = 0; + else if (unformat (input, "%U", unformat_vnet_sw_interface, + fm->vnet_main, &sw_if_index)); + else if (unformat (input, "ip4")) + which = FLOW_VARIANT_IP4; + else if (unformat (input, "ip6")) + which = FLOW_VARIANT_IP6; + else if (unformat (input, "l2")) + which = FLOW_VARIANT_L2; + else + break; + } + + if (fm->record == 0) + return clib_error_return (0, + "Please specify flowprobe params record first..."); + + if (sw_if_index == ~0) + return clib_error_return (0, "Please specify an interface..."); + + rv = validate_feature_on_interface (fm, sw_if_index, which); + if (rv == 1) + { + if (is_add) + return clib_error_return (0, + "Datapath is already enabled for given interface..."); + } + else if (rv == 0) + return clib_error_return (0, + "Interface has enable different datapath ..."); + + rv = + flowprobe_tx_interface_add_del_feature (fm, sw_if_index, which, is_add); + switch (rv) + { + case 0: + break; + + case VNET_API_ERROR_INVALID_SW_IF_INDEX: + return clib_error_return + (0, "Invalid interface, only works on physical ports"); + break; + + case VNET_API_ERROR_UNIMPLEMENTED: + return clib_error_return (0, "ip6 not supported"); + break; + + default: + return clib_error_return (0, "flowprobe_enable_disable returned %d", + rv); + } + return 0; +} + +static clib_error_t * +flowprobe_params_command_fn (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + flowprobe_main_t *fm = &flowprobe_main; + bool record_l2 = false, record_l3 = false, record_l4 = false; + u32 active_timer = ~0; + u32 passive_timer = ~0; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "active %d", &active_timer)) + ; + else if (unformat (input, "passive %d", &passive_timer)) + ; + else if (unformat (input, "record")) + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) + { + if (unformat (input, "l2")) + record_l2 = true; + else if (unformat (input, "l3")) + record_l3 = true; + else if (unformat (input, "l4")) + record_l4 = true; + else + break; + } + else + break; + } + + if (passive_timer > 0 && active_timer > passive_timer) + return clib_error_return (0, + "Passive timer has to be greater than active one..."); + + if (flowprobe_params (fm, record_l2, record_l3, record_l4, + active_timer, passive_timer)) + return clib_error_return (0, + "Couldn't change flowperpacket params when feature is enabled on some interface ..."); + return 0; +} + +/*? + * '<em>flowprobe feature add-del</em>' commands to enable/disable + * per-packet IPFIX flow record generation on an interface + * + * @cliexpar + * @parblock + * To enable per-packet IPFIX flow-record generation on an interface: + * @cliexcmd{flowprobe feature add-del GigabitEthernet2/0/0} + * + * To disable per-packet IPFIX flow-record generation on an interface: + * @cliexcmd{flowprobe feature add-del GigabitEthernet2/0/0 disable} + * @cliexend + * @endparblock +?*/ +/* *INDENT-OFF* */ +VLIB_CLI_COMMAND (flowprobe_enable_disable_command, static) = { + .path = "flowprobe feature add-del", + .short_help = + "flowprobe feature add-del <interface-name> <l2|ip4|ip6> disable", + .function = flowprobe_tx_interface_add_del_feature_command_fn, +}; +VLIB_CLI_COMMAND (flowprobe_params_command, static) = { + .path = "flowprobe params", + .short_help = + "flowprobe params record <[l2] [l3] [l4]> [active <timer> passive <timer>]", + .function = flowprobe_params_command_fn, +}; +VLIB_CLI_COMMAND (flowprobe_show_table_command, static) = { + .path = "show flowprobe table", + .short_help = "show flowprobe table", + .function = flowprobe_show_table_fn, +}; +VLIB_CLI_COMMAND (flowprobe_show_stats_command, static) = { + .path = "show flowprobe statistics", + .short_help = "show flowprobe statistics", + .function = flowprobe_show_stats_fn, +}; +/* *INDENT-ON* */ + +/** + * @brief Set up the API message handling tables + * @param vm vlib_main_t * vlib main data structure pointer + * @returns 0 to indicate all is well + */ +static clib_error_t * +flowprobe_plugin_api_hookup (vlib_main_t * vm) +{ + flowprobe_main_t *fm = &flowprobe_main; +#define _(N,n) \ + vl_msg_api_set_handlers((VL_API_##N + fm->msg_id_base), \ + #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + foreach_flowprobe_plugin_api_msg; +#undef _ + + return 0; +} + +#define vl_msg_name_crc_list +#include <flowprobe/flowprobe_all_api_h.h> +#undef vl_msg_name_crc_list + +static void +setup_message_id_table (flowprobe_main_t * fm, api_main_t * am) +{ +#define _(id,n,crc) \ + vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + fm->msg_id_base); + foreach_vl_msg_name_crc_flowprobe; +#undef _ +} + +/* + * Main-core process, sending an interrupt to the per worker input + * process that spins the per worker timer wheel. + */ +static uword +timer_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f) +{ + uword *event_data = 0; + vlib_main_t **worker_vms = 0, *worker_vm; + flowprobe_main_t *fm = &flowprobe_main; + + /* Wait for Godot... */ + vlib_process_wait_for_event_or_clock (vm, 1e9); + uword event_type = vlib_process_get_events (vm, &event_data); + if (event_type != 1) + clib_warning ("bogus kickoff event received, %d", event_type); + vec_reset_length (event_data); + + int i; + if (vec_len (vlib_mains) == 0) + vec_add1 (worker_vms, vm); + else + { + for (i = 0; i < vec_len (vlib_mains); i++) + { + worker_vm = vlib_mains[i]; + if (worker_vm) + vec_add1 (worker_vms, worker_vm); + } + } + f64 sleep_duration = 0.1; + + while (1) + { + /* Send an interrupt to each timer input node */ + sleep_duration = 0.1; + for (i = 0; i < vec_len (worker_vms); i++) + { + worker_vm = worker_vms[i]; + if (worker_vm) + { + vlib_node_set_interrupt_pending (worker_vm, + flowprobe_walker_node.index); + sleep_duration = + (fm->expired_passive_per_worker[i] > 0) ? 1e-4 : 0.1; + } + } + vlib_process_suspend (vm, sleep_duration); + } + return 0; /* or not */ +} + +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (flowprobe_timer_node,static) = { + .function = timer_process, + .name = "flowprobe-timer-process", + .type = VLIB_NODE_TYPE_PROCESS, +}; +/* *INDENT-ON* */ + +/** + * @brief Set up the API message handling tables + * @param vm vlib_main_t * vlib main data structure pointer + * @returns 0 to indicate all is well, or a clib_error_t + */ +static clib_error_t * +flowprobe_init (vlib_main_t * vm) +{ + flowprobe_main_t *fm = &flowprobe_main; + vlib_thread_main_t *tm = &vlib_thread_main; + clib_error_t *error = 0; + u8 *name; + u32 num_threads; + int i; + + fm->vnet_main = vnet_get_main (); + + /* Construct the API name */ + name = format (0, "flowprobe_%08x%c", api_version, 0); + + /* Ask for a correctly-sized block of API message decode slots */ + fm->msg_id_base = vl_msg_api_get_msg_ids + ((char *) name, VL_MSG_FIRST_AVAILABLE); + + /* Hook up message handlers */ + error = flowprobe_plugin_api_hookup (vm); + + /* Add our API messages to the global name_crc hash table */ + setup_message_id_table (fm, &api_main); + + vec_free (name); + + /* Set up time reference pair */ + fm->vlib_time_0 = vlib_time_now (vm); + fm->nanosecond_time_0 = unix_time_now_nsec (); + + memset (fm->template_reports, 0, sizeof (fm->template_reports)); + memset (fm->template_size, 0, sizeof (fm->template_size)); + memset (fm->template_per_flow, 0, sizeof (fm->template_per_flow)); + + /* Decide how many worker threads we have */ + num_threads = 1 /* main thread */ + tm->n_threads; + + /* Allocate per worker thread vectors per flavour */ + for (i = 0; i < FLOW_N_VARIANTS; i++) + { + vec_validate (fm->context[i].buffers_per_worker, num_threads - 1); + vec_validate (fm->context[i].frames_per_worker, num_threads - 1); + vec_validate (fm->context[i].next_record_offset_per_worker, + num_threads - 1); + } + + fm->active_timer = FLOWPROBE_TIMER_ACTIVE; + fm->passive_timer = FLOWPROBE_TIMER_PASSIVE; + + return error; +} + +VLIB_INIT_FUNCTION (flowprobe_init); + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/flowprobe/flowprobe.h b/src/plugins/flowprobe/flowprobe.h new file mode 100644 index 00000000000..196c92a7f35 --- /dev/null +++ b/src/plugins/flowprobe/flowprobe.h @@ -0,0 +1,167 @@ +/* + * flowprobe.h - ipfix probe plug-in header file + * + * Copyright (c) 2016 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __included_flowprobe_h__ +#define __included_flowprobe_h__ + +#include <vnet/vnet.h> +#include <vnet/ip/ip.h> +#include <vnet/ethernet/ethernet.h> + +#include <vppinfra/hash.h> +#include <vppinfra/error.h> +#include <vnet/flow/flow_report.h> +#include <vnet/flow/flow_report_classify.h> +#include <vppinfra/tw_timer_2t_1w_2048sl.h> + +/* Default timers in seconds */ +#define FLOWPROBE_TIMER_ACTIVE (15) +#define FLOWPROBE_TIMER_PASSIVE 120 // XXXX: FOR TESTING (30*60) +#define FLOWPROBE_LOG2_HASHSIZE (18) + +typedef enum +{ + FLOW_RECORD_L2 = 1 << 0, + FLOW_RECORD_L3 = 1 << 1, + FLOW_RECORD_L4 = 1 << 2, + FLOW_RECORD_L2_IP4 = 1 << 3, + FLOW_RECORD_L2_IP6 = 1 << 4, + FLOW_N_RECORDS = 1 << 5, +} flowprobe_record_t; + +/* *INDENT-OFF* */ +typedef enum __attribute__ ((__packed__)) +{ + FLOW_VARIANT_IP4, + FLOW_VARIANT_IP6, + FLOW_VARIANT_L2, + FLOW_VARIANT_L2_IP4, + FLOW_VARIANT_L2_IP6, + FLOW_N_VARIANTS, +} flowprobe_variant_t; +/* *INDENT-ON* */ + +STATIC_ASSERT (sizeof (flowprobe_variant_t) == 1, + "flowprobe_variant_t is expected to be 1 byte, " + "revisit padding in flowprobe_key_t"); + +#define FLOW_MAXIMUM_EXPORT_ENTRIES (1024) + +typedef struct +{ + /* what to collect per variant */ + flowprobe_record_t flags; + /** ipfix buffers under construction, per-worker thread */ + vlib_buffer_t **buffers_per_worker; + /** frames containing ipfix buffers, per-worker thread */ + vlib_frame_t **frames_per_worker; + /** next record offset, per worker thread */ + u16 *next_record_offset_per_worker; +} flowprobe_protocol_context_t; + +#define FLOWPROBE_KEY_IN_U32 22 +/* *INDENT-OFF* */ +typedef CLIB_PACKED (union +{ + struct { + u32 rx_sw_if_index; + u32 tx_sw_if_index; + u8 src_mac[6]; + u8 dst_mac[6]; + u16 ethertype; + ip46_address_t src_address; + ip46_address_t dst_address; + u8 protocol; + u16 src_port; + u16 dst_port; + flowprobe_variant_t which; + }; + u32 as_u32[FLOWPROBE_KEY_IN_U32]; +}) flowprobe_key_t; +/* *INDENT-ON* */ + +STATIC_ASSERT (sizeof (flowprobe_key_t) == FLOWPROBE_KEY_IN_U32 * + sizeof (u32), "flowprobe_key_t padding is wrong"); + +typedef struct +{ + flowprobe_key_t key; + u64 packetcount; + u64 octetcount; + f64 last_updated; + f64 last_exported; + u32 passive_timer_handle; +} flowprobe_entry_t; + +/** + * @file + * @brief flow-per-packet plugin header file + */ +typedef struct +{ + /** API message ID base */ + u16 msg_id_base; + + flowprobe_protocol_context_t context[FLOW_N_VARIANTS]; + u16 template_reports[FLOW_N_RECORDS]; + u16 template_size[FLOW_N_RECORDS]; + + /** Time reference pair */ + u64 nanosecond_time_0; + f64 vlib_time_0; + + /** Per CPU flow-state */ + u8 ht_log2len; /* Hash table size is 2^log2len */ + u32 **hash_per_worker; + flowprobe_entry_t **pool_per_worker; + /* *INDENT-OFF* */ + TWT (tw_timer_wheel) ** timers_per_worker; + /* *INDENT-ON* */ + u32 **expired_passive_per_worker; + + flowprobe_record_t record; + u32 active_timer; + u32 passive_timer; + flowprobe_entry_t *stateless_entry; + + bool initialized; + bool disabled; + + u16 template_per_flow[FLOW_N_VARIANTS]; + u8 *flow_per_interface; + + /** convenience vlib_main_t pointer */ + vlib_main_t *vlib_main; + /** convenience vnet_main_t pointer */ + vnet_main_t *vnet_main; +} flowprobe_main_t; + +extern flowprobe_main_t flowprobe_main; + +void flowprobe_flush_callback_ip4 (void); +void flowprobe_flush_callback_ip6 (void); +void flowprobe_flush_callback_l2 (void); +u8 *format_flowprobe_entry (u8 * s, va_list * args); + +#endif + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/flowperpkt/flowperpkt_all_api_h.h b/src/plugins/flowprobe/flowprobe_all_api_h.h index 329c375abca..1f30ecccf98 100644 --- a/src/plugins/flowperpkt/flowperpkt_all_api_h.h +++ b/src/plugins/flowprobe/flowprobe_all_api_h.h @@ -1,5 +1,5 @@ /* - * flowperpkt_all_api_h.h - plug-in api #include file + * flowprobe_all_api_h.h - plug-in api #include file * * Copyright (c) <current-year> <your-organization> * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,4 +15,4 @@ * limitations under the License. */ /* Include the generated file, see BUILT_SOURCES in Makefile.am */ -#include <flowperpkt/flowperpkt.api.h> +#include <flowprobe/flowprobe.api.h> diff --git a/src/plugins/flowperpkt/flowperpkt_msg_enum.h b/src/plugins/flowprobe/flowprobe_msg_enum.h index 3177e77a63b..bc0b21c96c1 100644 --- a/src/plugins/flowperpkt/flowperpkt_msg_enum.h +++ b/src/plugins/flowprobe/flowprobe_msg_enum.h @@ -1,5 +1,5 @@ /* - * flowperpkt_msg_enum.h - vpp engine plug-in message enumeration + * flowprobe_msg_enum.h - vpp engine plug-in message enumeration * * Copyright (c) <current-year> <your-organization> * Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,18 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef included_flowperpkt_msg_enum_h -#define included_flowperpkt_msg_enum_h +#ifndef included_flowprobe_msg_enum_h +#define included_flowprobe_msg_enum_h #include <vppinfra/byte_order.h> #define vl_msg_id(n,h) n, typedef enum { -#include <flowperpkt/flowperpkt_all_api_h.h> +#include <flowprobe/flowprobe_all_api_h.h> /* We'll want to know how many messages IDs we need... */ VL_MSG_FIRST_AVAILABLE, } vl_msg_id_t; #undef vl_msg_id -#endif /* included_flowperpkt_msg_enum_h */ +#endif /* included_flowprobe_msg_enum_h */ diff --git a/src/plugins/flowprobe/flowprobe_plugin_doc.md b/src/plugins/flowprobe/flowprobe_plugin_doc.md new file mode 100644 index 00000000000..4c9b2342a83 --- /dev/null +++ b/src/plugins/flowprobe/flowprobe_plugin_doc.md @@ -0,0 +1,13 @@ +IPFIX flow record plugin {#flowprobe_plugin_doc} +======================== + +## Introduction + +This plugin generates ipfix flow records on interfaces which have the feature enabled + +## Sample configuration + +set ipfix exporter collector 192.168.6.2 src 192.168.6.1 template-interval 20 port 4739 path-mtu 1500 + +flowprobe params record l3 active 20 passive 120 +flowprobe feature add-del GigabitEthernet2/3/0 l2
\ No newline at end of file diff --git a/src/plugins/flowperpkt/flowperpkt_test.c b/src/plugins/flowprobe/flowprobe_test.c index 972a3b073a8..91793f552c9 100644 --- a/src/plugins/flowperpkt/flowperpkt_test.c +++ b/src/plugins/flowprobe/flowprobe_test.c @@ -1,5 +1,5 @@ /* - * flowperpkt.c - skeleton vpp-api-test plug-in + * flowprobe.c - skeleton vpp-api-test plug-in * * Copyright (c) <current-year> <your-organization> * Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,8 +19,9 @@ #include <vlibmemory/api.h> #include <vlibsocket/api.h> #include <vppinfra/error.h> +#include <flowprobe/flowprobe.h> -#define __plugin_msg_base flowperpkt_test_main.msg_id_base +#define __plugin_msg_base flowprobe_test_main.msg_id_base #include <vlibapi/vat_helper_macros.h> /** @@ -30,28 +31,28 @@ uword unformat_sw_if_index (unformat_input_t * input, va_list * args); /* Declare message IDs */ -#include <flowperpkt/flowperpkt_msg_enum.h> +#include <flowprobe/flowprobe_msg_enum.h> /* define message structures */ #define vl_typedefs -#include <flowperpkt/flowperpkt_all_api_h.h> +#include <flowprobe/flowprobe_all_api_h.h> #undef vl_typedefs /* declare message handlers for each api */ #define vl_endianfun /* define message structures */ -#include <flowperpkt/flowperpkt_all_api_h.h> +#include <flowprobe/flowprobe_all_api_h.h> #undef vl_endianfun /* instantiate all the print functions we know about */ #define vl_print(handle, ...) #define vl_printfun -#include <flowperpkt/flowperpkt_all_api_h.h> +#include <flowprobe/flowprobe_all_api_h.h> #undef vl_printfun /* Get the API version number. */ #define vl_api_version(n,v) static u32 api_version=(v); -#include <flowperpkt/flowperpkt_all_api_h.h> +#include <flowprobe/flowprobe_all_api_h.h> #undef vl_api_version typedef struct @@ -60,18 +61,19 @@ typedef struct u16 msg_id_base; /** vat_main_t pointer */ vat_main_t *vat_main; -} flowperpkt_test_main_t; +} flowprobe_test_main_t; -flowperpkt_test_main_t flowperpkt_test_main; +flowprobe_test_main_t flowprobe_test_main; #define foreach_standard_reply_retval_handler \ -_(flowperpkt_tx_interface_add_del_reply) +_(flowprobe_tx_interface_add_del_reply) \ +_(flowprobe_params_reply) #define _(n) \ static void vl_api_##n##_t_handler \ (vl_api_##n##_t * mp) \ { \ - vat_main_t * vam = flowperpkt_test_main.vat_main; \ + vat_main_t * vam = flowprobe_test_main.vat_main; \ i32 retval = ntohl(mp->retval); \ if (vam->async_mode) { \ vam->async_errors += (retval < 0); \ @@ -88,17 +90,18 @@ foreach_standard_reply_retval_handler; * we just generated */ #define foreach_vpe_api_reply_msg \ -_(FLOWPERPKT_TX_INTERFACE_ADD_DEL_REPLY, \ - flowperpkt_tx_interface_add_del_reply) +_(FLOWPROBE_TX_INTERFACE_ADD_DEL_REPLY, \ + flowprobe_tx_interface_add_del_reply) \ +_(FLOWPROBE_PARAMS_REPLY, flowprobe_params_reply) static int -api_flowperpkt_tx_interface_add_del (vat_main_t * vam) +api_flowprobe_tx_interface_add_del (vat_main_t * vam) { unformat_input_t *i = vam->input; int enable_disable = 1; - u8 which = 0; /* ipv4 by default */ + u8 which = FLOW_VARIANT_IP4; u32 sw_if_index = ~0; - vl_api_flowperpkt_tx_interface_add_del_t *mp; + vl_api_flowprobe_tx_interface_add_del_t *mp; int ret; /* Parse args required to build the message */ @@ -110,8 +113,12 @@ api_flowperpkt_tx_interface_add_del (vat_main_t * vam) ; else if (unformat (i, "disable")) enable_disable = 0; + else if (unformat (i, "ip4")) + which = FLOW_VARIANT_IP4; + else if (unformat (i, "ip6")) + which = FLOW_VARIANT_IP6; else if (unformat (i, "l2")) - which = 1; + which = FLOW_VARIANT_L2; else break; } @@ -123,7 +130,7 @@ api_flowperpkt_tx_interface_add_del (vat_main_t * vam) } /* Construct the API message */ - M (FLOWPERPKT_TX_INTERFACE_ADD_DEL, mp); + M (FLOWPROBE_TX_INTERFACE_ADD_DEL, mp); mp->sw_if_index = ntohl (sw_if_index); mp->is_add = enable_disable; mp->which = which; @@ -136,17 +143,73 @@ api_flowperpkt_tx_interface_add_del (vat_main_t * vam) return ret; } +static int +api_flowprobe_params (vat_main_t * vam) +{ + unformat_input_t *i = vam->input; + u8 record_l2 = 0, record_l3 = 0, record_l4 = 0; + u32 active_timer = ~0; + u32 passive_timer = ~0; + vl_api_flowprobe_params_t *mp; + int ret; + + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "active %d", &active_timer)) + ; + else if (unformat (i, "passive %d", &passive_timer)) + ; + else if (unformat (i, "record")) + while (unformat_check_input (i) != UNFORMAT_END_OF_INPUT) + { + if (unformat (i, "l2")) + record_l2 = 1; + else if (unformat (i, "l3")) + record_l3 = 1; + else if (unformat (i, "l4")) + record_l4 = 1; + else + break; + } + else + break; + } + + if (passive_timer > 0 && active_timer > passive_timer) + { + errmsg ("Passive timer has to be greater than active one...\n"); + return -99; + } + + /* Construct the API message */ + M (FLOWPROBE_PARAMS, mp); + mp->record_l2 = record_l2; + mp->record_l3 = record_l3; + mp->record_l4 = record_l4; + mp->active_timer = ntohl (active_timer); + mp->passive_timer = ntohl (passive_timer); + + /* send it... */ + S (mp); + + /* Wait for a reply... */ + W (ret); + + return ret; +} + /* * List of messages that the api test plugin sends, * and that the data plane plugin processes */ #define foreach_vpe_api_msg \ -_(flowperpkt_tx_interface_add_del, "<intfc> [disable]") +_(flowprobe_tx_interface_add_del, "<intfc> [disable]") \ +_(flowprobe_params, "record <[l2] [l3] [l4]> [active <timer> passive <timer>]") static void -flowperpkt_vat_api_hookup (vat_main_t * vam) +flowprobe_vat_api_hookup (vat_main_t * vam) { - flowperpkt_test_main_t *sm = &flowperpkt_test_main; + flowprobe_test_main_t *sm = &flowprobe_test_main; /* Hook up handlers for replies from the data plane plug-in */ #define _(N,n) \ vl_msg_api_set_handlers((VL_API_##N + sm->msg_id_base), \ @@ -173,18 +236,18 @@ flowperpkt_vat_api_hookup (vat_main_t * vam) clib_error_t * vat_plugin_register (vat_main_t * vam) { - flowperpkt_test_main_t *sm = &flowperpkt_test_main; + flowprobe_test_main_t *sm = &flowprobe_test_main; u8 *name; sm->vat_main = vam; /* Ask the vpp engine for the first assigned message-id */ - name = format (0, "flowperpkt_%08x%c", api_version, 0); + name = format (0, "flowprobe_%08x%c", api_version, 0); sm->msg_id_base = vl_client_get_first_plugin_msg_id ((char *) name); /* Don't attempt to hook up API messages if the data plane plugin is AWOL */ if (sm->msg_id_base != (u16) ~ 0) - flowperpkt_vat_api_hookup (vam); + flowprobe_vat_api_hookup (vam); vec_free (name); diff --git a/src/plugins/flowprobe/node.c b/src/plugins/flowprobe/node.c new file mode 100644 index 00000000000..6a539db9bbf --- /dev/null +++ b/src/plugins/flowprobe/node.c @@ -0,0 +1,998 @@ +/* + * node.c - ipfix probe graph node + * + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vlib/vlib.h> +#include <vnet/vnet.h> +#include <vnet/pg/pg.h> +#include <vppinfra/error.h> +#include <flowprobe/flowprobe.h> +#include <vnet/ip/ip6_packet.h> + +static void flowprobe_export_entry (vlib_main_t * vm, flowprobe_entry_t * e); + +/** + * @file flow record generator graph node + */ + +typedef struct +{ + /** interface handle */ + u32 rx_sw_if_index; + u32 tx_sw_if_index; + /** packet timestamp */ + u64 timestamp; + /** size of the buffer */ + u16 buffer_size; + + /** L2 information */ + u8 src_mac[6]; + u8 dst_mac[6]; + /** Ethertype */ + u16 ethertype; + + /** L3 information */ + ip46_address_t src_address; + ip46_address_t dst_address; + u8 protocol; + u8 tos; + + /** L4 information */ + u16 src_port; + u16 dst_port; + + flowprobe_variant_t which; +} flowprobe_trace_t; + +static char *flowprobe_variant_strings[] = { + [FLOW_VARIANT_IP4] = "IP4", + [FLOW_VARIANT_IP6] = "IP6", + [FLOW_VARIANT_L2] = "L2", + [FLOW_VARIANT_L2_IP4] = "L2-IP4", + [FLOW_VARIANT_L2_IP6] = "L2-IP6", +}; + +/* packet trace format function */ +static u8 * +format_flowprobe_trace (u8 * s, va_list * args) +{ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + flowprobe_trace_t *t = va_arg (*args, flowprobe_trace_t *); + uword indent = format_get_indent (s); + + s = format (s, + "FLOWPROBE[%s]: rx_sw_if_index %d, tx_sw_if_index %d, " + "timestamp %lld, size %d", flowprobe_variant_strings[t->which], + t->rx_sw_if_index, t->tx_sw_if_index, + t->timestamp, t->buffer_size); + + if (t->which == FLOW_VARIANT_L2) + s = format (s, "\n%U -> %U", format_white_space, indent, + format_ethernet_address, &t->src_mac, + format_ethernet_address, &t->dst_mac); + + if (t->protocol > 0 + && (t->which == FLOW_VARIANT_L2_IP4 || t->which == FLOW_VARIANT_IP4 + || t->which == FLOW_VARIANT_L2_IP6 || t->which == FLOW_VARIANT_IP6)) + s = + format (s, "\n%U%U: %U -> %U", format_white_space, indent, + format_ip_protocol, t->protocol, format_ip46_address, + &t->src_address, IP46_TYPE_ANY, format_ip46_address, + &t->dst_address, IP46_TYPE_ANY); + return s; +} + +vlib_node_registration_t flowprobe_ip4_node; +vlib_node_registration_t flowprobe_ip6_node; +vlib_node_registration_t flowprobe_l2_node; + +/* No counters at the moment */ +#define foreach_flowprobe_error \ +_(COLLISION, "Hash table collisions") \ +_(BUFFER, "Buffer allocation error") \ +_(EXPORTED_PACKETS, "Exported packets") \ +_(INPATH, "Exported packets in path") + +typedef enum +{ +#define _(sym,str) FLOWPROBE_ERROR_##sym, + foreach_flowprobe_error +#undef _ + FLOWPROBE_N_ERROR, +} flowprobe_error_t; + +static char *flowprobe_error_strings[] = { +#define _(sym,string) string, + foreach_flowprobe_error +#undef _ +}; + +typedef enum +{ + FLOWPROBE_NEXT_DROP, + FLOWPROBE_NEXT_IP4_LOOKUP, + FLOWPROBE_N_NEXT, +} flowprobe_next_t; + +#define FLOWPROBE_NEXT_NODES { \ + [FLOWPROBE_NEXT_DROP] = "error-drop", \ + [FLOWPROBE_NEXT_IP4_LOOKUP] = "ip4-lookup", \ +} + +static inline flowprobe_variant_t +flowprobe_get_variant (flowprobe_variant_t which, + flowprobe_record_t flags, u16 ethertype) +{ + if (which == FLOW_VARIANT_L2 + && (flags & FLOW_RECORD_L3 || flags & FLOW_RECORD_L4)) + return ethertype == ETHERNET_TYPE_IP6 ? FLOW_VARIANT_L2_IP6 : ethertype == + ETHERNET_TYPE_IP4 ? FLOW_VARIANT_L2_IP4 : FLOW_VARIANT_L2; + return which; +} + +static inline u32 +flowprobe_common_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset) +{ + u16 start = offset; + + /* Ingress interface */ + u32 rx_if = clib_host_to_net_u32 (e->key.rx_sw_if_index); + clib_memcpy (to_b->data + offset, &rx_if, sizeof (rx_if)); + offset += sizeof (rx_if); + + /* Egress interface */ + u32 tx_if = clib_host_to_net_u32 (e->key.tx_sw_if_index); + clib_memcpy (to_b->data + offset, &tx_if, sizeof (tx_if)); + offset += sizeof (tx_if); + + /* packet delta count */ + u64 packetdelta = clib_host_to_net_u64 (e->packetcount); + clib_memcpy (to_b->data + offset, &packetdelta, sizeof (u64)); + offset += sizeof (u64); + + return offset - start; +} + +static inline u32 +flowprobe_l2_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset) +{ + u16 start = offset; + + /* src mac address */ + clib_memcpy (to_b->data + offset, &e->key.src_mac, 6); + offset += 6; + + /* dst mac address */ + clib_memcpy (to_b->data + offset, &e->key.dst_mac, 6); + offset += 6; + + /* ethertype */ + clib_memcpy (to_b->data + offset, &e->key.ethertype, 2); + offset += 2; + + return offset - start; +} + +static inline u32 +flowprobe_l3_ip6_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset) +{ + u16 start = offset; + + /* ip6 src address */ + clib_memcpy (to_b->data + offset, &e->key.src_address, + sizeof (ip6_address_t)); + offset += sizeof (ip6_address_t); + + /* ip6 dst address */ + clib_memcpy (to_b->data + offset, &e->key.dst_address, + sizeof (ip6_address_t)); + offset += sizeof (ip6_address_t); + + /* Protocol */ + to_b->data[offset++] = e->key.protocol; + + /* octetDeltaCount */ + u64 octetdelta = clib_host_to_net_u64 (e->octetcount); + clib_memcpy (to_b->data + offset, &octetdelta, sizeof (u64)); + offset += sizeof (u64); + + return offset - start; +} + +static inline u32 +flowprobe_l3_ip4_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset) +{ + u16 start = offset; + + /* ip4 src address */ + clib_memcpy (to_b->data + offset, &e->key.src_address.ip4, + sizeof (ip4_address_t)); + offset += sizeof (ip4_address_t); + + /* ip4 dst address */ + clib_memcpy (to_b->data + offset, &e->key.dst_address.ip4, + sizeof (ip4_address_t)); + offset += sizeof (ip4_address_t); + + /* Protocol */ + to_b->data[offset++] = e->key.protocol; + + /* octetDeltaCount */ + u64 octetdelta = clib_host_to_net_u64 (e->octetcount); + clib_memcpy (to_b->data + offset, &octetdelta, sizeof (u64)); + offset += sizeof (u64); + + return offset - start; +} + +static inline u32 +flowprobe_l4_add (vlib_buffer_t * to_b, flowprobe_entry_t * e, u16 offset) +{ + u16 start = offset; + + /* src port */ + clib_memcpy (to_b->data + offset, &e->key.src_port, 2); + offset += 2; + + /* dst port */ + clib_memcpy (to_b->data + offset, &e->key.dst_port, 2); + offset += 2; + + return offset - start; +} + +static inline u32 +flowprobe_hash (flowprobe_key_t * k) +{ + flowprobe_main_t *fm = &flowprobe_main; + int i; + u32 h = 0; + for (i = 0; i < sizeof (k->as_u32) / sizeof (u32); i++) + h = crc_u32 (k->as_u32[i], h); + return h >> (32 - fm->ht_log2len); +} + +flowprobe_entry_t * +flowprobe_lookup (u32 my_cpu_number, flowprobe_key_t * k, u32 * poolindex, + bool * collision) +{ + flowprobe_main_t *fm = &flowprobe_main; + flowprobe_entry_t *e; + u32 h; + + h = (fm->active_timer) ? flowprobe_hash (k) : 0; + + /* Lookup in the flow state pool */ + *poolindex = fm->hash_per_worker[my_cpu_number][h]; + if (*poolindex != ~0) + { + e = pool_elt_at_index (fm->pool_per_worker[my_cpu_number], *poolindex); + if (e) + { + /* Verify key or report collision */ + if (memcmp (k, &e->key, sizeof (flowprobe_key_t))) + *collision = true; + return e; + } + } + + return 0; +} + +flowprobe_entry_t * +flowprobe_create (u32 my_cpu_number, flowprobe_key_t * k, u32 * poolindex) +{ + flowprobe_main_t *fm = &flowprobe_main; + u32 h; + + flowprobe_entry_t *e; + + /* Get my index */ + h = (fm->active_timer) ? flowprobe_hash (k) : 0; + + pool_get (fm->pool_per_worker[my_cpu_number], e); + *poolindex = e - fm->pool_per_worker[my_cpu_number]; + fm->hash_per_worker[my_cpu_number][h] = *poolindex; + + e->key = *k; + + if (fm->passive_timer > 0) + { + e->passive_timer_handle = tw_timer_start_2t_1w_2048sl + (fm->timers_per_worker[my_cpu_number], *poolindex, 0, + fm->passive_timer); + } + return e; +} + +static inline void +add_to_flow_record_state (vlib_main_t * vm, vlib_node_runtime_t * node, + flowprobe_main_t * fm, vlib_buffer_t * b, + u64 timestamp, u16 length, + flowprobe_variant_t which, flowprobe_trace_t * t) +{ + if (fm->disabled) + return; + + u32 my_cpu_number = vm->thread_index; + u16 octets = 0; + + flowprobe_record_t flags = fm->context[which].flags; + bool collect_ip4 = false, collect_ip6 = false; + ASSERT (b); + ethernet_header_t *eth = vlib_buffer_get_current (b); + u16 ethertype = clib_net_to_host_u16 (eth->type); + /* *INDENT-OFF* */ + flowprobe_key_t k = { {0} }; + /* *INDENT-ON* */ + ip4_header_t *ip4 = 0; + ip6_header_t *ip6 = 0; + udp_header_t *udp = 0; + + if (flags & FLOW_RECORD_L3 || flags & FLOW_RECORD_L4) + { + collect_ip4 = which == FLOW_VARIANT_L2_IP4 || which == FLOW_VARIANT_IP4; + collect_ip6 = which == FLOW_VARIANT_L2_IP6 || which == FLOW_VARIANT_IP6; + } + + k.rx_sw_if_index = vnet_buffer (b)->sw_if_index[VLIB_RX]; + k.tx_sw_if_index = vnet_buffer (b)->sw_if_index[VLIB_TX]; + + k.which = which; + + if (flags & FLOW_RECORD_L2) + { + clib_memcpy (k.src_mac, eth->src_address, 6); + clib_memcpy (k.dst_mac, eth->dst_address, 6); + k.ethertype = ethertype; + } + if (collect_ip6 && ethertype == ETHERNET_TYPE_IP6) + { + ip6 = (ip6_header_t *) (eth + 1); + udp = (udp_header_t *) (ip6 + 1); + if (flags & FLOW_RECORD_L3) + { + k.src_address.as_u64[0] = ip6->src_address.as_u64[0]; + k.src_address.as_u64[1] = ip6->src_address.as_u64[1]; + k.dst_address.as_u64[0] = ip6->dst_address.as_u64[0]; + k.dst_address.as_u64[1] = ip6->dst_address.as_u64[1]; + } + k.protocol = ip6->protocol; + octets = clib_net_to_host_u16 (ip6->payload_length) + + sizeof (ip6_header_t); + } + if (collect_ip4 && ethertype == ETHERNET_TYPE_IP4) + { + ip4 = (ip4_header_t *) (eth + 1); + udp = (udp_header_t *) (ip4 + 1); + if (flags & FLOW_RECORD_L3) + { + k.src_address.ip4.as_u32 = ip4->src_address.as_u32; + k.dst_address.ip4.as_u32 = ip4->dst_address.as_u32; + } + k.protocol = ip4->protocol; + octets = clib_net_to_host_u16 (ip4->length); + } + if ((flags & FLOW_RECORD_L4) && udp && + (k.protocol == IP_PROTOCOL_TCP || k.protocol == IP_PROTOCOL_UDP)) + { + k.src_port = udp->src_port; + k.dst_port = udp->dst_port; + } + + if (t) + { + t->rx_sw_if_index = k.rx_sw_if_index; + t->tx_sw_if_index = k.tx_sw_if_index; + clib_memcpy (t->src_mac, k.src_mac, 6); + clib_memcpy (t->dst_mac, k.dst_mac, 6); + t->ethertype = k.ethertype; + t->src_address.ip4.as_u32 = k.src_address.ip4.as_u32; + t->dst_address.ip4.as_u32 = k.dst_address.ip4.as_u32; + t->protocol = k.protocol; + t->src_port = k.src_port; + t->dst_port = k.dst_port; + t->which = k.which; + } + + flowprobe_entry_t *e = 0; + f64 now = vlib_time_now (vm); + if (fm->active_timer > 0) + { + u32 poolindex = ~0; + bool collision = false; + + e = flowprobe_lookup (my_cpu_number, &k, &poolindex, &collision); + if (collision) + { + /* Flush data and clean up entry for reuse. */ + if (e->packetcount) + flowprobe_export_entry (vm, e); + e->key = k; + vlib_node_increment_counter (vm, node->node_index, + FLOWPROBE_ERROR_COLLISION, 1); + } + if (!e) /* Create new entry */ + { + e = flowprobe_create (my_cpu_number, &k, &poolindex); + e->last_exported = now; + } + } + else + { + e = &fm->stateless_entry[my_cpu_number]; + e->key = k; + } + + if (e) + { + /* Updating entry */ + e->packetcount++; + e->octetcount += octets; + e->last_updated = now; + + if (fm->active_timer == 0 + || (now > e->last_exported + fm->active_timer)) + flowprobe_export_entry (vm, e); + } +} + +static u16 +flowprobe_get_headersize (void) +{ + return sizeof (ip4_header_t) + sizeof (udp_header_t) + + sizeof (ipfix_message_header_t) + sizeof (ipfix_set_header_t); +} + +static void +flowprobe_export_send (vlib_main_t * vm, vlib_buffer_t * b0, + flowprobe_variant_t which) +{ + flowprobe_main_t *fm = &flowprobe_main; + flow_report_main_t *frm = &flow_report_main; + vlib_frame_t *f; + ip4_ipfix_template_packet_t *tp; + ipfix_set_header_t *s; + ipfix_message_header_t *h; + ip4_header_t *ip; + udp_header_t *udp; + flowprobe_record_t flags = fm->context[which].flags; + u32 my_cpu_number = vm->thread_index; + + /* Fill in header */ + flow_report_stream_t *stream; + + /* Nothing to send */ + if (fm->context[which].next_record_offset_per_worker[my_cpu_number] <= + flowprobe_get_headersize ()) + return; + + u32 i, index = vec_len (frm->streams); + for (i = 0; i < index; i++) + if (frm->streams[i].domain_id == 1) + { + index = i; + break; + } + if (i == vec_len (frm->streams)) + { + vec_validate (frm->streams, index); + frm->streams[index].domain_id = 1; + } + stream = &frm->streams[index]; + + tp = vlib_buffer_get_current (b0); + ip = (ip4_header_t *) & tp->ip4; + udp = (udp_header_t *) (ip + 1); + h = (ipfix_message_header_t *) (udp + 1); + s = (ipfix_set_header_t *) (h + 1); + + ip->ip_version_and_header_length = 0x45; + ip->ttl = 254; + ip->protocol = IP_PROTOCOL_UDP; + ip->flags_and_fragment_offset = 0; + ip->src_address.as_u32 = frm->src_address.as_u32; + ip->dst_address.as_u32 = frm->ipfix_collector.as_u32; + udp->src_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); + udp->dst_port = clib_host_to_net_u16 (UDP_DST_PORT_ipfix); + udp->checksum = 0; + + /* FIXUP: message header export_time */ + h->export_time = (u32) + (((f64) frm->unix_time_0) + + (vlib_time_now (frm->vlib_main) - frm->vlib_time_0)); + h->export_time = clib_host_to_net_u32 (h->export_time); + h->domain_id = clib_host_to_net_u32 (stream->domain_id); + + /* FIXUP: message header sequence_number */ + h->sequence_number = stream->sequence_number++; + h->sequence_number = clib_host_to_net_u32 (h->sequence_number); + + s->set_id_length = ipfix_set_id_length (fm->template_reports[flags], + b0->current_length - + (sizeof (*ip) + sizeof (*udp) + + sizeof (*h))); + h->version_length = version_length (b0->current_length - + (sizeof (*ip) + sizeof (*udp))); + + ip->length = clib_host_to_net_u16 (b0->current_length); + + ip->checksum = ip4_header_checksum (ip); + udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip)); + + if (frm->udp_checksum) + { + /* RFC 7011 section 10.3.2. */ + udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip); + if (udp->checksum == 0) + udp->checksum = 0xffff; + } + + ASSERT (ip->checksum == ip4_header_checksum (ip)); + + /* Find or allocate a frame */ + f = fm->context[which].frames_per_worker[my_cpu_number]; + if (PREDICT_FALSE (f == 0)) + { + u32 *to_next; + f = vlib_get_frame_to_node (vm, ip4_lookup_node.index); + fm->context[which].frames_per_worker[my_cpu_number] = f; + u32 bi0 = vlib_get_buffer_index (vm, b0); + + /* Enqueue the buffer */ + to_next = vlib_frame_vector_args (f); + to_next[0] = bi0; + f->n_vectors = 1; + } + + vlib_put_frame_to_node (vm, ip4_lookup_node.index, f); + vlib_node_increment_counter (vm, flowprobe_l2_node.index, + FLOWPROBE_ERROR_EXPORTED_PACKETS, 1); + + fm->context[which].frames_per_worker[my_cpu_number] = 0; + fm->context[which].buffers_per_worker[my_cpu_number] = 0; + fm->context[which].next_record_offset_per_worker[my_cpu_number] = + flowprobe_get_headersize (); +} + +static vlib_buffer_t * +flowprobe_get_buffer (vlib_main_t * vm, flowprobe_variant_t which) +{ + flowprobe_main_t *fm = &flowprobe_main; + flow_report_main_t *frm = &flow_report_main; + vlib_buffer_t *b0; + u32 bi0; + vlib_buffer_free_list_t *fl; + u32 my_cpu_number = vm->thread_index; + + /* Find or allocate a buffer */ + b0 = fm->context[which].buffers_per_worker[my_cpu_number]; + + /* Need to allocate a buffer? */ + if (PREDICT_FALSE (b0 == 0)) + { + if (vlib_buffer_alloc (vm, &bi0, 1) != 1) + { + vlib_node_increment_counter (vm, flowprobe_l2_node.index, + FLOWPROBE_ERROR_BUFFER, 1); + return 0; + } + + /* Initialize the buffer */ + b0 = fm->context[which].buffers_per_worker[my_cpu_number] = + vlib_get_buffer (vm, bi0); + fl = + vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); + vlib_buffer_init_for_free_list (b0, fl); + VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0); + + b0->current_data = 0; + b0->current_length = flowprobe_get_headersize (); + b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VLIB_BUFFER_FLOW_REPORT); + vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; + vnet_buffer (b0)->sw_if_index[VLIB_TX] = frm->fib_index; + fm->context[which].next_record_offset_per_worker[my_cpu_number] = + b0->current_length; + } + + return b0; +} + +static void +flowprobe_export_entry (vlib_main_t * vm, flowprobe_entry_t * e) +{ + u32 my_cpu_number = vm->thread_index; + flowprobe_main_t *fm = &flowprobe_main; + flow_report_main_t *frm = &flow_report_main; + vlib_buffer_t *b0; + bool collect_ip4 = false, collect_ip6 = false; + flowprobe_variant_t which = e->key.which; + flowprobe_record_t flags = fm->context[which].flags; + u16 offset = + fm->context[which].next_record_offset_per_worker[my_cpu_number]; + + if (offset < flowprobe_get_headersize ()) + offset = flowprobe_get_headersize (); + + b0 = flowprobe_get_buffer (vm, which); + /* No available buffer, what to do... */ + if (b0 == 0) + return; + + if (flags & FLOW_RECORD_L3) + { + collect_ip4 = which == FLOW_VARIANT_L2_IP4 || which == FLOW_VARIANT_IP4; + collect_ip6 = which == FLOW_VARIANT_L2_IP6 || which == FLOW_VARIANT_IP6; + } + + offset += flowprobe_common_add (b0, e, offset); + + if (flags & FLOW_RECORD_L2) + offset += flowprobe_l2_add (b0, e, offset); + if (collect_ip6) + offset += flowprobe_l3_ip6_add (b0, e, offset); + if (collect_ip4) + offset += flowprobe_l3_ip4_add (b0, e, offset); + if (flags & FLOW_RECORD_L4) + offset += flowprobe_l4_add (b0, e, offset); + + /* Reset per flow-export counters */ + e->packetcount = 0; + e->octetcount = 0; + e->last_exported = vlib_time_now (vm); + + b0->current_length = offset; + + fm->context[which].next_record_offset_per_worker[my_cpu_number] = offset; + /* Time to flush the buffer? */ + if (offset + fm->template_size[flags] > frm->path_mtu) + flowprobe_export_send (vm, b0, which); +} + +uword +flowprobe_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame, + flowprobe_variant_t which) +{ + u32 n_left_from, *from, *to_next; + flowprobe_next_t next_index; + flowprobe_main_t *fm = &flowprobe_main; + u64 now; + + now = (u64) ((vlib_time_now (vm) - fm->vlib_time_0) * 1e9); + now += fm->nanosecond_time_0; + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + while (n_left_from > 0) + { + u32 n_left_to_next; + + vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); + + while (n_left_from >= 4 && n_left_to_next >= 2) + { + u32 next0 = FLOWPROBE_NEXT_DROP; + u32 next1 = FLOWPROBE_NEXT_DROP; + u16 len0, len1; + u32 bi0, bi1; + vlib_buffer_t *b0, *b1; + + /* Prefetch next iteration. */ + { + vlib_buffer_t *p2, *p3; + + p2 = vlib_get_buffer (vm, from[2]); + p3 = vlib_get_buffer (vm, from[3]); + + vlib_prefetch_buffer_header (p2, LOAD); + vlib_prefetch_buffer_header (p3, LOAD); + + CLIB_PREFETCH (p2->data, CLIB_CACHE_LINE_BYTES, STORE); + CLIB_PREFETCH (p3->data, CLIB_CACHE_LINE_BYTES, STORE); + } + + /* speculatively enqueue b0 and b1 to the current next frame */ + to_next[0] = bi0 = from[0]; + to_next[1] = bi1 = from[1]; + from += 2; + to_next += 2; + n_left_from -= 2; + n_left_to_next -= 2; + + b0 = vlib_get_buffer (vm, bi0); + b1 = vlib_get_buffer (vm, bi1); + + vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], + &next0, b0); + vnet_feature_next (vnet_buffer (b1)->sw_if_index[VLIB_TX], + &next1, b1); + + len0 = vlib_buffer_length_in_chain (vm, b0); + ethernet_header_t *eh0 = vlib_buffer_get_current (b0); + u16 ethertype0 = clib_net_to_host_u16 (eh0->type); + + if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) + add_to_flow_record_state (vm, node, fm, b0, now, len0, + flowprobe_get_variant + (which, fm->context[which].flags, + ethertype0), 0); + + len1 = vlib_buffer_length_in_chain (vm, b1); + ethernet_header_t *eh1 = vlib_buffer_get_current (b1); + u16 ethertype1 = clib_net_to_host_u16 (eh1->type); + + if (PREDICT_TRUE ((b1->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) + add_to_flow_record_state (vm, node, fm, b1, now, len1, + flowprobe_get_variant + (which, fm->context[which].flags, + ethertype1), 0); + + /* verify speculative enqueues, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x2 (vm, node, next_index, + to_next, n_left_to_next, + bi0, bi1, next0, next1); + } + + while (n_left_from > 0 && n_left_to_next > 0) + { + u32 bi0; + vlib_buffer_t *b0; + u32 next0 = FLOWPROBE_NEXT_DROP; + u16 len0; + + /* speculatively enqueue b0 to the current next frame */ + bi0 = from[0]; + to_next[0] = bi0; + from += 1; + to_next += 1; + n_left_from -= 1; + n_left_to_next -= 1; + + b0 = vlib_get_buffer (vm, bi0); + + vnet_feature_next (vnet_buffer (b0)->sw_if_index[VLIB_TX], + &next0, b0); + + len0 = vlib_buffer_length_in_chain (vm, b0); + ethernet_header_t *eh0 = vlib_buffer_get_current (b0); + u16 ethertype0 = clib_net_to_host_u16 (eh0->type); + + if (PREDICT_TRUE ((b0->flags & VLIB_BUFFER_FLOW_REPORT) == 0)) + { + flowprobe_trace_t *t = 0; + if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) + && (b0->flags & VLIB_BUFFER_IS_TRACED))) + t = vlib_add_trace (vm, node, b0, sizeof (*t)); + + add_to_flow_record_state (vm, node, fm, b0, now, len0, + flowprobe_get_variant + (which, fm->context[which].flags, + ethertype0), t); + } + + /* verify speculative enqueue, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, + to_next, n_left_to_next, + bi0, next0); + } + + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + return frame->n_vectors; +} + +static uword +flowprobe_ip4_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP4); +} + +static uword +flowprobe_ip6_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_IP6); +} + +static uword +flowprobe_l2_node_fn (vlib_main_t * vm, + vlib_node_runtime_t * node, vlib_frame_t * frame) +{ + return flowprobe_node_fn (vm, node, frame, FLOW_VARIANT_L2); +} + +static inline void +flush_record (flowprobe_variant_t which) +{ + vlib_main_t *vm = vlib_get_main (); + vlib_buffer_t *b = flowprobe_get_buffer (vm, which); + if (b) + flowprobe_export_send (vm, b, which); +} + +void +flowprobe_flush_callback_ip4 (void) +{ + flush_record (FLOW_VARIANT_IP4); +} + +void +flowprobe_flush_callback_ip6 (void) +{ + flush_record (FLOW_VARIANT_IP6); +} + +void +flowprobe_flush_callback_l2 (void) +{ + flush_record (FLOW_VARIANT_L2); + flush_record (FLOW_VARIANT_L2_IP4); + flush_record (FLOW_VARIANT_L2_IP6); +} + + +static void +flowprobe_delete_by_index (u32 my_cpu_number, u32 poolindex) +{ + flowprobe_main_t *fm = &flowprobe_main; + flowprobe_entry_t *e; + u32 h; + + e = pool_elt_at_index (fm->pool_per_worker[my_cpu_number], poolindex); + + /* Get my index */ + h = flowprobe_hash (&e->key); + + /* Reset hash */ + fm->hash_per_worker[my_cpu_number][h] = ~0; + + pool_put_index (fm->pool_per_worker[my_cpu_number], poolindex); +} + + +/* Per worker process processing the active/passive expired entries */ +static uword +flowprobe_walker_process (vlib_main_t * vm, + vlib_node_runtime_t * rt, vlib_frame_t * f) +{ + flowprobe_main_t *fm = &flowprobe_main; + flow_report_main_t *frm = &flow_report_main; + flowprobe_entry_t *e; + + /* + * $$$$ Remove this check from here and track FRM status and disable + * this process if required. + */ + if (frm->ipfix_collector.as_u32 == 0 || frm->src_address.as_u32 == 0) + { + fm->disabled = true; + return 0; + } + fm->disabled = false; + + u32 cpu_index = os_get_thread_index (); + u32 *to_be_removed = 0, *i; + + /* + * Tick the timer when required and process the vector of expired + * timers + */ + f64 start_time = vlib_time_now (vm); + u32 count = 0; + + tw_timer_expire_timers_2t_1w_2048sl (fm->timers_per_worker[cpu_index], + start_time); + + vec_foreach (i, fm->expired_passive_per_worker[cpu_index]) + { + u32 exported = 0; + f64 now = vlib_time_now (vm); + if (now > start_time + 100e-6 + || exported > FLOW_MAXIMUM_EXPORT_ENTRIES - 1) + break; + + if (pool_is_free_index (fm->pool_per_worker[cpu_index], *i)) + { + clib_warning ("Element is %d is freed already\n", *i); + continue; + } + else + e = pool_elt_at_index (fm->pool_per_worker[cpu_index], *i); + + /* Check last update timestamp. If it is longer than passive time nuke + * entry. Otherwise restart timer with what's left + * Premature passive timer by more than 10% + */ + if ((now - e->last_updated) < (fm->passive_timer * 0.9)) + { + f64 delta = fm->passive_timer - (now - e->last_updated); + e->passive_timer_handle = tw_timer_start_2t_1w_2048sl + (fm->timers_per_worker[cpu_index], *i, 0, delta); + } + else /* Nuke entry */ + { + vec_add1 (to_be_removed, *i); + } + /* If anything to report send it to the exporter */ + if (e->packetcount && now > e->last_exported + fm->active_timer) + { + exported++; + flowprobe_export_entry (vm, e); + } + count++; + } + if (count) + vec_delete (fm->expired_passive_per_worker[cpu_index], count, 0); + + vec_foreach (i, to_be_removed) flowprobe_delete_by_index (cpu_index, *i); + vec_free (to_be_removed); + + return 0; +} + +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (flowprobe_ip4_node) = { + .function = flowprobe_ip4_node_fn, + .name = "flowprobe-ip4", + .vector_size = sizeof (u32), + .format_trace = format_flowprobe_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN(flowprobe_error_strings), + .error_strings = flowprobe_error_strings, + .n_next_nodes = FLOWPROBE_N_NEXT, + .next_nodes = FLOWPROBE_NEXT_NODES, +}; +VLIB_REGISTER_NODE (flowprobe_ip6_node) = { + .function = flowprobe_ip6_node_fn, + .name = "flowprobe-ip6", + .vector_size = sizeof (u32), + .format_trace = format_flowprobe_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN(flowprobe_error_strings), + .error_strings = flowprobe_error_strings, + .n_next_nodes = FLOWPROBE_N_NEXT, + .next_nodes = FLOWPROBE_NEXT_NODES, +}; +VLIB_REGISTER_NODE (flowprobe_l2_node) = { + .function = flowprobe_l2_node_fn, + .name = "flowprobe-l2", + .vector_size = sizeof (u32), + .format_trace = format_flowprobe_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN(flowprobe_error_strings), + .error_strings = flowprobe_error_strings, + .n_next_nodes = FLOWPROBE_N_NEXT, + .next_nodes = FLOWPROBE_NEXT_NODES, +}; +VLIB_REGISTER_NODE (flowprobe_walker_node) = { + .function = flowprobe_walker_process, + .name = "flowprobe-walker", + .type = VLIB_NODE_TYPE_INPUT, + .state = VLIB_NODE_STATE_INTERRUPT, +}; +/* *INDENT-ON* */ + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/plugins/ioam/analyse/ioam_summary_export.c b/src/plugins/ioam/analyse/ioam_summary_export.c index 9a2667f67f8..af2d39ab582 100644 --- a/src/plugins/ioam/analyse/ioam_summary_export.c +++ b/src/plugins/ioam/analyse/ioam_summary_export.c @@ -398,6 +398,7 @@ ioam_flow_create (u8 del) int rv; u32 domain_id = 0; flow_report_main_t *frm = &flow_report_main; + u16 template_id; memset (&args, 0, sizeof (args)); args.rewrite_callback = ioam_template_rewrite; @@ -405,7 +406,7 @@ ioam_flow_create (u8 del) del ? (args.is_add = 0) : (args.is_add = 1); args.domain_id = domain_id; - rv = vnet_flow_report_add_del (frm, &args); + rv = vnet_flow_report_add_del (frm, &args, &template_id); switch (rv) { diff --git a/src/plugins/ioam/udp-ping/udp_ping_export.c b/src/plugins/ioam/udp-ping/udp_ping_export.c index ce64b606f1f..91cbb4bd157 100644 --- a/src/plugins/ioam/udp-ping/udp_ping_export.c +++ b/src/plugins/ioam/udp-ping/udp_ping_export.c @@ -226,6 +226,7 @@ udp_ping_flow_create (u8 del) int rv; u32 domain_id = 0; flow_report_main_t *frm = &flow_report_main; + u16 template_id; memset (&args, 0, sizeof (args)); args.rewrite_callback = udp_ping_template_rewrite; @@ -234,7 +235,7 @@ udp_ping_flow_create (u8 del) args.domain_id = domain_id; args.src_port = UDP_DST_PORT_ipfix; - rv = vnet_flow_report_add_del (frm, &args); + rv = vnet_flow_report_add_del (frm, &args, &template_id); switch (rv) { diff --git a/src/plugins/snat/snat_ipfix_logging.c b/src/plugins/snat/snat_ipfix_logging.c index 1a11186335c..c68dc540d15 100644 --- a/src/plugins/snat/snat_ipfix_logging.c +++ b/src/plugins/snat/snat_ipfix_logging.c @@ -784,7 +784,7 @@ snat_ipfix_logging_enable_disable (int enable, u32 domain_id, u16 src_port) a.rewrite_callback = snat_template_rewrite_max_entries_per_usr; a.flow_data_callback = snat_data_callback_max_entries_per_usr; - rv = vnet_flow_report_add_del (frm, &a); + rv = vnet_flow_report_add_del (frm, &a, NULL); if (rv) { clib_warning ("vnet_flow_report_add_del returned %d", rv); @@ -796,7 +796,7 @@ snat_ipfix_logging_enable_disable (int enable, u32 domain_id, u16 src_port) a.rewrite_callback = snat_template_rewrite_nat44_session; a.flow_data_callback = snat_data_callback_nat44_session; - rv = vnet_flow_report_add_del (frm, &a); + rv = vnet_flow_report_add_del (frm, &a, NULL); if (rv) { clib_warning ("vnet_flow_report_add_del returned %d", rv); @@ -806,7 +806,7 @@ snat_ipfix_logging_enable_disable (int enable, u32 domain_id, u16 src_port) a.rewrite_callback = snat_template_rewrite_addr_exhausted; a.flow_data_callback = snat_data_callback_addr_exhausted; - rv = vnet_flow_report_add_del (frm, &a); + rv = vnet_flow_report_add_del (frm, &a, NULL); if (rv) { clib_warning ("vnet_flow_report_add_del returned %d", rv); diff --git a/src/vnet/flow/flow_api.c b/src/vnet/flow/flow_api.c index b975dda199f..52a608ca6e7 100644 --- a/src/vnet/flow/flow_api.c +++ b/src/vnet/flow/flow_api.c @@ -296,7 +296,7 @@ static void args.domain_id = fcm->domain_id; args.src_port = fcm->src_port; - rv = vnet_flow_report_add_del (frm, &args); + rv = vnet_flow_report_add_del (frm, &args, NULL); /* If deleting, or add failed */ if (is_add == 0 || (rv && is_add)) diff --git a/src/vnet/flow/flow_report.c b/src/vnet/flow/flow_report.c index c78a78a9680..ccc84235695 100644 --- a/src/vnet/flow/flow_report.c +++ b/src/vnet/flow/flow_report.c @@ -87,7 +87,6 @@ int send_template_packet (flow_report_main_t *frm, if (frm->ipfix_collector.as_u32 == 0 || frm->src_address.as_u32 == 0) { - clib_warning ("no collector: disabling flow collector process"); vlib_node_set_state (frm->vlib_main, flow_report_process_node.index, VLIB_NODE_STATE_DISABLED); return -1; @@ -238,7 +237,8 @@ VLIB_REGISTER_NODE (flow_report_process_node) = { }; int vnet_flow_report_add_del (flow_report_main_t *frm, - vnet_flow_report_add_del_args_t *a) + vnet_flow_report_add_del_args_t *a, + u16 *template_id) { int i; int found_index = ~0; @@ -260,6 +260,8 @@ int vnet_flow_report_add_del (flow_report_main_t *frm, && fr->flow_data_callback == a->flow_data_callback) { found_index = i; + if (template_id) + *template_id = fr->template_id; break; } } @@ -304,7 +306,10 @@ int vnet_flow_report_add_del (flow_report_main_t *frm, fr->opaque = a->opaque; fr->rewrite_callback = a->rewrite_callback; fr->flow_data_callback = a->flow_data_callback; - + + if (template_id) + *template_id = fr->template_id; + return 0; } @@ -415,10 +420,7 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm, break; } - if (collector.as_u32 == 0) - return clib_error_return (0, "collector address required"); - - if (src.as_u32 == 0) + if (collector.as_u32 != 0 && src.as_u32 == 0) return clib_error_return (0, "src address required"); if (path_mtu > 1450 /* vpp does not support fragmentation */) @@ -441,7 +443,8 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm, frm->template_interval = template_interval; frm->udp_checksum = udp_checksum; - vlib_cli_output (vm, "Collector %U, src address %U, " + if (collector.as_u32) + vlib_cli_output (vm, "Collector %U, src address %U, " "fib index %d, path MTU %u, " "template resend interval %us, " "udp checksum %s", @@ -449,6 +452,8 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm, format_ip4_address, &frm->src_address, fib_index, path_mtu, template_interval, udp_checksum ? "enabled" : "disabled"); + else + vlib_cli_output (vm, "IPFIX Collector is disabled"); /* Turn on the flow reporting process */ vlib_process_signal_event (vm, flow_report_process_node.index, diff --git a/src/vnet/flow/flow_report.h b/src/vnet/flow/flow_report.h index e8ed3818534..01859ce5888 100644 --- a/src/vnet/flow/flow_report.h +++ b/src/vnet/flow/flow_report.h @@ -130,7 +130,8 @@ typedef struct { } vnet_flow_report_add_del_args_t; int vnet_flow_report_add_del (flow_report_main_t *frm, - vnet_flow_report_add_del_args_t *a); + vnet_flow_report_add_del_args_t *a, + u16 *template_id); clib_error_t * flow_report_add_del_error_to_clib_error (int error); diff --git a/src/vnet/flow/flow_report_classify.c b/src/vnet/flow/flow_report_classify.c index 27f03ddcd68..d4c30492e2d 100644 --- a/src/vnet/flow/flow_report_classify.c +++ b/src/vnet/flow/flow_report_classify.c @@ -458,7 +458,7 @@ ipfix_classify_table_add_del_command_fn (vlib_main_t * vm, args.domain_id = fcm->domain_id; args.src_port = fcm->src_port; - rv = vnet_flow_report_add_del (frm, &args); + rv = vnet_flow_report_add_del (frm, &args, NULL); error = flow_report_add_del_error_to_clib_error(rv); diff --git a/src/vnet/ip/ip6_packet.h b/src/vnet/ip/ip6_packet.h index cdd7eed5a82..c0c745e27fe 100644 --- a/src/vnet/ip/ip6_packet.h +++ b/src/vnet/ip/ip6_packet.h @@ -341,6 +341,12 @@ typedef struct ip6_address_t src_address, dst_address; } ip6_header_t; +always_inline u8 +ip6_traffic_class (ip6_header_t * i) +{ + return (i->ip_version_traffic_class_and_flow_label & 0x0FF00000) >> 20; +} + always_inline void * ip6_next_header (ip6_header_t * i) { diff --git a/src/vpp-api/java/jvpp/gen/jvpp_gen.py b/src/vpp-api/java/jvpp/gen/jvpp_gen.py index 7932741d94a..6648a4f7156 100755 --- a/src/vpp-api/java/jvpp/gen/jvpp_gen.py +++ b/src/vpp-api/java/jvpp/gen/jvpp_gen.py @@ -30,7 +30,7 @@ from jvppgen import jvpp_impl_gen from jvppgen import jvpp_c_gen from jvppgen import util -blacklist = [ "memclnt.api", "flowperpkt.api" ] +blacklist = [ "memclnt.api", "flowprobe.api" ] # Invocation: # ~/Projects/vpp/vpp-api/jvpp/gen$ mkdir -p java/io/fd/vpp/jvpp && cd java/io/fd/vpp/jvpp diff --git a/test/test_flowperpkt.py b/test/test_flowperpkt.py deleted file mode 100644 index b13d0c63a1b..00000000000 --- a/test/test_flowperpkt.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/env python - -import unittest - -from framework import VppTestCase, VppTestRunner - -from scapy.packet import Raw -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP - - -class TestFlowperpkt(VppTestCase): - """ Flow-per-packet plugin: test both L2 and IP4 reporting """ - - def setUp(self): - """ - Set up - - **Config:** - - create three PG interfaces - """ - super(TestFlowperpkt, self).setUp() - - self.create_pg_interfaces(range(3)) - - self.pg_if_packet_sizes = [150] - - self.interfaces = list(self.pg_interfaces) - - for intf in self.interfaces: - intf.admin_up() - intf.config_ip4() - intf.resolve_arp() - - def create_stream(self, src_if, dst_if, packet_sizes): - """Create a packet stream to tickle the plugin - - :param VppInterface src_if: Source interface for packet stream - :param VppInterface src_if: Dst interface for packet stream - :param list packet_sizes: Sizes to test - """ - pkts = [] - for size in packet_sizes: - info = self.create_packet_info(src_if, dst_if) - payload = self.info_to_payload(info) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / - UDP(sport=1234, dport=4321) / - Raw(payload)) - info.data = p.copy() - self.extend_packet(p, size) - pkts.append(p) - return pkts - - @staticmethod - def compare_with_mask(payload, masked_expected_data): - if len(payload) * 2 != len(masked_expected_data): - return False - - # iterate over pairs: raw byte from payload and ASCII code for that - # byte from masked payload (or XX if masked) - for i in range(len(payload)): - p = payload[i] - m = masked_expected_data[2 * i:2 * i + 2] - if m != "XX": - if "%02x" % ord(p) != m: - return False - return True - - def verify_ipfix(self, collector_if): - """Check the ipfix capture""" - found_data_packet = False - found_template_packet = False - found_l2_data_packet = False - found_l2_template_packet = False - - # Scapy, of course, understands ipfix not at all... - # These data vetted by manual inspection in wireshark - # X'ed out fields are timestamps, which will absolutely - # fail to compare. - - data_udp_string = "1283128300370000000a002fXXXXXXXX000000000000000101"\ - "00001f0000000100000002ac100102ac10020200XXXXXXXXXXXXXXXX0092" - - template_udp_string = "12831283003c0000000a0034XXXXXXXX00000002000000"\ - "010002002401000007000a0004000e000400080004000c000400050001009c00"\ - "0801380002" - - l2_data_udp_string = "12831283003c0000000a0034XXXXXXXX000000010000000"\ - "1010100240000000100000002%s02020000ff020008XXXXXXXXXXX"\ - "XXXXX0092" % self.pg1.local_mac.translate(None, ":") - - l2_template_udp_string = "12831283003c0000000a0034XXXXXXXX00000002000"\ - "000010002002401010007000a0004000e0004003800060050000601000002009"\ - "c000801380002" - - self.logger.info("Look for ipfix packets on %s sw_if_index %d " - % (collector_if.name, collector_if.sw_if_index)) - # expecting 4 packets on collector interface based on traffic on other - # interfaces - capture = collector_if.get_capture(4) - - for p in capture: - ip = p[IP] - udp = p[UDP] - self.logger.info("src %s dst %s" % (ip.src, ip.dst)) - self.logger.info(" udp src_port %s dst_port %s" - % (udp.sport, udp.dport)) - - payload = str(udp) - - if self.compare_with_mask(payload, data_udp_string): - self.logger.info("found ip4 data packet") - found_data_packet = True - elif self.compare_with_mask(payload, template_udp_string): - self.logger.info("found ip4 template packet") - found_template_packet = True - elif self.compare_with_mask(payload, l2_data_udp_string): - self.logger.info("found l2 data packet") - found_l2_data_packet = True - elif self.compare_with_mask(payload, l2_template_udp_string): - self.logger.info("found l2 template packet") - found_l2_template_packet = True - else: - unmasked_payload = "".join(["%02x" % ord(c) for c in payload]) - self.logger.error("unknown pkt '%s'" % unmasked_payload) - - self.assertTrue(found_data_packet, "Data packet not found") - self.assertTrue(found_template_packet, "Template packet not found") - self.assertTrue(found_l2_data_packet, "L2 data packet not found") - self.assertTrue(found_l2_template_packet, - "L2 template packet not found") - - def test_L3_fpp(self): - """ Flow per packet L3 test """ - - # Configure an ipfix report on the [nonexistent] collector - # 172.16.3.2, as if it was connected to the pg2 interface - # Install a FIB entry, so the exporter's work won't turn into - # an ARP request - - self.pg_enable_capture(self.pg_interfaces) - self.pg2.configure_ipv4_neighbors() - self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n, - src_address=self.pg2.local_ip4n, - path_mtu=1450, - template_interval=1) - - # Export flow records for all pkts transmitted on pg1 - self.vapi.cli("flowperpkt feature add-del pg1") - self.vapi.cli("flowperpkt feature add-del pg1 l2") - - # Arrange to minimally trace generated ipfix packets - self.vapi.cli("trace add flowperpkt-ipv4 10") - self.vapi.cli("trace add flowperpkt-l2 10") - - # Create a stream from pg0 -> pg1, which causes - # an ipfix packet to be transmitted on pg2 - - pkts = self.create_stream(self.pg0, self.pg1, - self.pg_if_packet_sizes) - self.pg0.add_stream(pkts) - self.pg_start() - - # Flush the ipfix collector, so we don't need any - # asinine time.sleep(5) action - self.vapi.cli("ipfix flush") - - # Make sure the 4 pkts we expect actually showed up - self.verify_ipfix(self.pg2) - -if __name__ == '__main__': - unittest.main(testRunner=VppTestRunner) diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py new file mode 100644 index 00000000000..560b44cc991 --- /dev/null +++ b/test/test_flowprobe.py @@ -0,0 +1,967 @@ +#!/usr/bin/env python +import random +import socket +import unittest +import time + +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP +from scapy.layers.inet6 import IPv6 + +from framework import VppTestCase, VppTestRunner +from vpp_object import VppObject +from vpp_pg_interface import CaptureTimeoutError +from util import ppp +from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder + + +class VppCFLOW(VppObject): + """CFLOW object for IPFIX exporter and Flowprobe feature""" + + def __init__(self, test, intf='pg2', active=0, passive=0, timeout=300, + mtu=512, datapath='l2', layer='l2 l3 l4'): + self._test = test + self._intf = intf + self._active = active + if passive == 0 or passive < active: + self._passive = active+5 + else: + self._passive = passive + self._datapath = datapath # l2 ip4 ip6 + self._collect = layer # l2 l3 l4 + self._timeout = timeout + self._mtu = mtu + self._configured = False + + def add_vpp_config(self): + self.enable_exporter() + self._test.vapi.ppcli("flowprobe params record %s active %s " + "passive %s" % (self._collect, self._active, + self._passive)) + self.enable_flowprobe_feature() + self._test.vapi.cli("ipfix flush") + self._configured = True + + def remove_vpp_config(self): + self.disable_exporter() + self.disable_flowprobe_feature() + self._test.vapi.cli("ipfix flush") + self._configured = False + + def enable_exporter(self): + self._test.vapi.set_ipfix_exporter( + collector_address=self._test.pg0.remote_ip4n, + src_address=self._test.pg0.local_ip4n, + path_mtu=self._mtu, + template_interval=self._timeout) + + def enable_flowprobe_feature(self): + self._test.vapi.ppcli("flowprobe feature add-del %s %s" % + (self._intf, self._datapath)) + + def disable_exporter(self): + self._test.vapi.cli("set ipfix exporter collector 0.0.0.0") + + def disable_flowprobe_feature(self): + self._test.vapi.cli("flowprobe feature add-del %s %s disable" % + (self._intf, self._datapath)) + + def object_id(self): + return "ipfix-collector-%s" % (self._src, self.dst) + + def query_vpp_config(self): + return self._configured + + def verify_templates(self, decoder=None, timeout=1, count=3): + templates = [] + p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) + self._test.assertTrue(p.haslayer(IPFIX)) + if decoder is not None and p.haslayer(Template): + templates.append(p[Template].templateID) + decoder.add_template(p.getlayer(Template)) + if count > 1: + p = self._test.wait_for_cflow_packet(self._test.collector, 2) + self._test.assertTrue(p.haslayer(IPFIX)) + if decoder is not None and p.haslayer(Template): + templates.append(p[Template].templateID) + decoder.add_template(p.getlayer(Template)) + if count > 2: + p = self._test.wait_for_cflow_packet(self._test.collector, 2) + self._test.assertTrue(p.haslayer(IPFIX)) + if decoder is not None and p.haslayer(Template): + templates.append(p[Template].templateID) + decoder.add_template(p.getlayer(Template)) + return templates + + +class MethodHolder(VppTestCase): + """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """ + + # Test variables + debug_print = False + max_number_of_packets = 16 + pkts = [] + + @classmethod + def setUpClass(cls): + """ + Perform standard class setup (defined by class method setUpClass in + class VppTestCase) before running the test case, set test case related + variables and configure VPP. + """ + super(MethodHolder, cls).setUpClass() + try: + # Create pg interfaces + cls.create_pg_interfaces(range(7)) + + # Packet sizes + cls.pg_if_packet_sizes = [64, 512, 1518, 9018] + + # Create BD with MAC learning and unknown unicast flooding disabled + # and put interfaces to this BD + cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) + cls.vapi.sw_interface_set_l2_bridge(cls.pg1._sw_if_index, bd_id=1) + cls.vapi.sw_interface_set_l2_bridge(cls.pg2._sw_if_index, bd_id=1) + + # Set up all interfaces + for i in cls.pg_interfaces: + i.admin_up() + + cls.pg0.config_ip4() + cls.pg0.configure_ipv4_neighbors() + cls.collector = cls.pg0 + + cls.pg1.config_ip4() + cls.pg1.resolve_arp() + cls.pg2.config_ip4() + cls.pg2.resolve_arp() + cls.pg3.config_ip4() + cls.pg3.resolve_arp() + cls.pg4.config_ip4() + cls.pg4.resolve_arp() + + cls.pg5.config_ip6() + cls.pg5.resolve_ndp() + cls.pg5.disable_ipv6_ra() + cls.pg6.config_ip6() + cls.pg6.resolve_ndp() + cls.pg6.disable_ipv6_ra() + except Exception: + super(MethodHolder, cls).tearDownClass() + raise + + def create_stream(self, src_if=None, dst_if=None, packets=None, + size=None, ip_ver='v4'): + """Create a packet stream to tickle the plugin + + :param VppInterface src_if: Source interface for packet stream + :param VppInterface src_if: Dst interface for packet stream + """ + if src_if is None: + src_if = self.pg1 + if dst_if is None: + dst_if = self.pg2 + self.pkts = [] + if packets is None: + packets = random.randint(1, self.max_number_of_packets) + pkt_size = size + for p in range(0, packets): + if size is None: + pkt_size = random.choice(self.pg_if_packet_sizes) + info = self.create_packet_info(src_if, dst_if) + payload = self.info_to_payload(info) + p = Ether(src=src_if.remote_mac, dst=src_if.local_mac) + if ip_ver == 'v4': + p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + else: + p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + p /= (UDP(sport=1234, dport=4321) / Raw(payload)) + info.data = p.copy() + self.extend_packet(p, pkt_size) + self.pkts.append(p) + + def verify_cflow_data(self, decoder, capture, cflow): + octets = 0 + packets = 0 + for p in capture: + octets += p[IP].len + packets += 1 + if cflow.haslayer(Data): + data = decoder.decode_data_set(cflow.getlayer(Set)) + for record in data: + self.assertEqual(int(record[1].encode('hex'), 16), octets) + self.assertEqual(int(record[2].encode('hex'), 16), packets) + + def verify_cflow_data_detail(self, decoder, capture, cflow, + data_set={1: 'octets', 2: 'packets'}, + ip_ver='v4'): + if self.debug_print: + print capture[0].show() + if cflow.haslayer(Data): + data = decoder.decode_data_set(cflow.getlayer(Set)) + if self.debug_print: + print data + if ip_ver == 'v4': + ip_layer = capture[0][IP] + else: + ip_layer = capture[0][IPv6] + if data_set is not None: + for record in data: + # skip flow if in/out gress interface is 0 + if int(record[10].encode('hex'), 16) == 0: + continue + if int(record[14].encode('hex'), 16) == 0: + continue + + for field in data_set: + if field not in record.keys(): + continue + value = data_set[field] + if value == 'octets': + value = ip_layer.len + if ip_ver == 'v6': + value += 40 # ??? is this correct + elif value == 'packets': + value = 1 + elif value == 'src_ip': + if ip_ver == 'v4': + ip = socket.inet_pton(socket.AF_INET, + ip_layer.src) + else: + ip = socket.inet_pton(socket.AF_INET6, + ip_layer.src) + value = int(ip.encode('hex'), 16) + elif value == 'dst_ip': + if ip_ver == 'v4': + ip = socket.inet_pton(socket.AF_INET, + ip_layer.dst) + else: + ip = socket.inet_pton(socket.AF_INET6, + ip_layer.dst) + value = int(ip.encode('hex'), 16) + elif value == 'sport': + value = int(capture[0][UDP].sport) + elif value == 'dport': + value = int(capture[0][UDP].dport) + self.assertEqual(int(record[field].encode('hex'), 16), + value) + + def verify_cflow_data_notimer(self, decoder, capture, cflows): + idx = 0 + for cflow in cflows: + if cflow.haslayer(Data): + data = decoder.decode_data_set(cflow.getlayer(Set)) + else: + raise Exception("No CFLOW data") + + for rec in data: + p = capture[idx] + idx += 1 + self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16)) + self.assertEqual(1, int(rec[2].encode('hex'), 16)) + self.assertEqual(len(capture), idx) + + def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, + expected=True): + """ wait for CFLOW packet and verify its correctness + + :param timeout: how long to wait + + :returns: tuple (packet, time spent waiting for packet) + """ + self.logger.info("IPFIX: Waiting for CFLOW packet") + deadline = time.time() + timeout + counter = 0 + # self.logger.debug(self.vapi.ppcli("show flow table")) + while True: + counter += 1 + # sanity check + self.assert_in_range(counter, 0, 100, "number of packets ignored") + time_left = deadline - time.time() + try: + if time_left < 0 and expected: + # self.logger.debug(self.vapi.ppcli("show flow table")) + raise CaptureTimeoutError( + "Packet did not arrive within timeout") + p = collector_intf.wait_for_packet(timeout=time_left) + except CaptureTimeoutError: + if expected: + # self.logger.debug(self.vapi.ppcli("show flow table")) + raise CaptureTimeoutError( + "Packet did not arrive within timeout") + else: + return + if not expected: + raise CaptureTimeoutError("Packet arrived even not expected") + self.assertEqual(p[Set].setID, set_id) + # self.logger.debug(self.vapi.ppcli("show flow table")) + self.logger.debug(ppp("IPFIX: Got packet:", p)) + break + return p + + def send_packets(self, src_if=None, dst_if=None): + self.sleep(3) + if src_if is None: + src_if = self.pg1 + if dst_if is None: + dst_if = self.pg2 + self.pg_enable_capture([dst_if]) + src_if.add_stream(self.pkts) + self.pg_start() + return dst_if.get_capture(len(self.pkts)) + + +class TestFFP_Timers(MethodHolder): + """Template verification, timer tests""" + + def test_0001(self): + """ receive template data packets""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(timeout=3) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + def test_0002(self): + """ timer=10s, less than template timeout""" + self.logger.info("FFP_TEST_START_0002") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=20) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream() + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 39) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0002") + + def test_0003(self): + """ timer=30s, greater than template timeout""" + self.logger.info("FFP_TEST_START_0003") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, timeout=20, active=25) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + ipfix.verify_templates(timeout=3) + + self.create_stream() + capture = self.send_packets() + + # next set of template packet should arrive after 20 seconds + # template packet should arrive within 20 s + templates = ipfix.verify_templates(ipfix_decoder, timeout=25) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 55) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + def test_0004(self): + """ sent packet after first cflow packet arrived""" + self.logger.info("FFP_TEST_START_0004") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=20) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream() + self.send_packets() + + # make sure the one packet we expect actually showed up + self.wait_for_cflow_packet(self.collector, templates[1], 39) + + self.pg_enable_capture([self.pg2]) + + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.verify_cflow_data(ipfix_decoder, capture, cflow) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0004") + + +class TestFFP_DatapathL2(MethodHolder): + """collect information on Ethernet datapath""" + + def test_0000(self): + """ verify template on L2 datapath""" + self.logger.info("FFP_TEST_START_0000") + self.pg_enable_capture(self.pg_interfaces) + + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(timeout=3, count=3) + self.collector.get_capture(3) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0000") + + def test_0001(self): + """ L2 data on L2 datapath""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10, layer='l2') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(packets=1) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 256: 8}) + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + def test_0002(self): + """ L3 data on L2 datapath""" + self.logger.info("FFP_TEST_START_0002") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10, layer='l3') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2) + + self.create_stream(packets=1) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 4: 17, + 8: 'src_ip', 12: 'dst_ip'}) + + # expected one template and one cflow packet + self.collector.get_capture(3) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0002") + + def test_0003(self): + """ L4 data on L2 datapath""" + self.logger.info("FFP_TEST_START_0003") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10, layer='l4') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=2) + + self.create_stream(packets=1) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 7: 'sport', 11: 'dport'}) + + # expected one template and one cflow packet + self.collector.get_capture(3) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + +class TestFFP_DatapathIP4(MethodHolder): + """collect information on IP4 datapath""" + + def test_0000(self): + """ verify templates on IP4 datapath""" + self.logger.info("FFP_TEST_START_0000") + + self.pg_enable_capture(self.pg_interfaces) + + ipfix = VppCFLOW(test=self, active=10, datapath='ip4') + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(timeout=3, count=1) + self.collector.get_capture(1) + + ipfix.remove_vpp_config() + + self.logger.info("FFP_TEST_FINISH_0000") + + def test_0001(self): + """ L2 data on IP4 datapath""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg4', active=10, + layer='l2', datapath='ip4') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 256: 8}) + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + def test_0002(self): + """ L3 data on IP4 datapath""" + self.logger.info("FFP_TEST_START_0002") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg4', active=10, + layer='l3', datapath='ip4') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {1: 'octets', 2: 'packets', + 8: 'src_ip', 12: 'dst_ip'}) + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0002") + + def test_0003(self): + """ L4 data on IP4 datapath""" + self.logger.info("FFP_TEST_START_0003") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg4', active=10, + layer='l4', datapath='ip4') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg3, dst_if=self.pg4, packets=1) + capture = self.send_packets(src_if=self.pg3, dst_if=self.pg4) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 7: 'sport', 11: 'dport'}) + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + +class TestFFP_DatapathIP6(MethodHolder): + """collect information on IP6 datapath""" + + def test_0000(self): + """ verify templates on IP6 datapath""" + self.logger.info("FFP_TEST_START_0000") + self.pg_enable_capture(self.pg_interfaces) + + ipfix = VppCFLOW(test=self, active=10, datapath='ip6') + ipfix.add_vpp_config() + + # template packet should arrive immediately + ipfix.verify_templates(timeout=3, count=1) + self.collector.get_capture(1) + + ipfix.remove_vpp_config() + + self.logger.info("FFP_TEST_FINISH_0000") + + def test_0001(self): + """ L2 data on IP6 datapath""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg6', active=20, + layer='l2', datapath='ip6') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, + ip_ver='IPv6') + capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 256: 56710}, + ip_ver='v6') + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + def test_0002(self): + """ L3 data on IP6 datapath""" + self.logger.info("FFP_TEST_START_0002") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg6', active=10, + layer='l3', datapath='ip6') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, + ip_ver='IPv6') + capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', + 27: 'src_ip', 28: 'dst_ip'}, + ip_ver='v6') + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0002") + + def test_0003(self): + """ L4 data on IP6 datapath""" + self.logger.info("FFP_TEST_START_0003") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, intf='pg6', active=10, + layer='l4', datapath='ip6') + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3, count=1) + + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, + ip_ver='IPv6') + capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[0], 39) + self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, + {2: 'packets', 7: 'sport', 11: 'dport'}, + ip_ver='v6') + + # expected two templates and one cflow packet + self.collector.get_capture(2) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0003") + + +class TestFFP_NoTimers(MethodHolder): + """No timers""" + + def test_0001(self): + """ no timers, one CFLOW packet, 9 Flows inside""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=0) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream(packets=9) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflow = self.wait_for_cflow_packet(self.collector, templates[1], 10) + self.verify_cflow_data_notimer(ipfix_decoder, capture, [cflow]) + self.wait_for_cflow_packet(self.collector, templates[1], 10, + expected=False) + self.collector.get_capture(4) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + def test_0002(self): + """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" + self.logger.info("FFP_TEST_START_0002") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=0, mtu=256) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream(packets=6) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + cflows = [] + cflows.append(self.wait_for_cflow_packet(self.collector, + templates[1], 10)) + cflows.append(self.wait_for_cflow_packet(self.collector, + templates[1], 10)) + self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) + self.collector.get_capture(5) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0002") + + +class TestFFP_DisableIPFIX(MethodHolder): + """Disable IPFIX""" + + def test_0001(self): + """ disable IPFIX after first packets""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=30) + + self.create_stream() + self.send_packets() + + # make sure the one packet we expect actually showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + # disble IPFIX + ipfix.disable_exporter() + self.pg_enable_capture([self.collector]) + + self.send_packets() + + # make sure no one packet arrived in 1 minute + self.wait_for_cflow_packet(self.collector, templates[1], 30, + expected=False) + self.collector.get_capture(0) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + +class TestFFP_ReenableIPFIX(MethodHolder): + """Re-enable IPFIX""" + + def test_0001(self): + """ disable IPFIX after first packets and re-enable after few packets + """ + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream() + self.send_packets() + + # make sure the one packet we expect actually showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + # disble IPFIX + ipfix.disable_exporter() + self.pg_enable_capture([self.collector]) + + self.send_packets() + + # make sure no one packet arrived in active timer span + self.wait_for_cflow_packet(self.collector, templates[1], 30, + expected=False) + self.collector.get_capture(0) + + # enable IPFIX + ipfix.enable_exporter() + self.vapi.cli("ipfix flush") + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.send_packets() + + # make sure the next packets (templates and data) we expect actually + # showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + +class TestFFP_DisableFFP(MethodHolder): + """Disable Flowprobe feature""" + + def test_0001(self): + """ disable flowprobe feature after first packets""" + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream() + self.send_packets() + + # make sure the one packet we expect actually showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + # disble IPFIX + ipfix.disable_flowprobe_feature() + self.pg_enable_capture([self.collector]) + + self.send_packets() + + # make sure no one packet arrived in active timer span + self.wait_for_cflow_packet(self.collector, templates[1], 30, + expected=False) + self.collector.get_capture(0) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + +class TestFFP_ReenableFFP(MethodHolder): + """Re-enable Flowprobe feature""" + + def test_0001(self): + """ disable flowprobe feature after first packets and re-enable + after few packets """ + self.logger.info("FFP_TEST_START_0001") + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW(test=self, active=10) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.create_stream() + self.send_packets() + + # make sure the one packet we expect actually showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + # disble FPP feature + ipfix.disable_flowprobe_feature() + self.pg_enable_capture([self.collector]) + + self.send_packets() + + # make sure no one packet arrived in active timer span + self.wait_for_cflow_packet(self.collector, templates[1], 30, + expected=False) + self.collector.get_capture(0) + + # enable FPP feature + ipfix.enable_flowprobe_feature() + self.vapi.cli("ipfix flush") + templates = ipfix.verify_templates(ipfix_decoder, timeout=3) + + self.send_packets() + + # make sure the next packets (templates and data) we expect actually + # showed up + self.wait_for_cflow_packet(self.collector, templates[1], 30) + self.collector.get_capture(4) + + ipfix.remove_vpp_config() + self.logger.info("FFP_TEST_FINISH_0001") + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |