summaryrefslogtreecommitdiffstats
path: root/src/vnet/ipfix-export/flow_report.c
diff options
context:
space:
mode:
authorPaul Atkins <patkins@graphiant.com>2021-09-27 21:30:13 +0100
committerNeale Ranns <neale@graphiant.com>2021-11-22 09:30:09 +0000
commit19a5f23b23e4eec54d99f105725ddc4df7421a5c (patch)
tree57c37b8de8d7391d4f47efc857f875bbeb68b709 /src/vnet/ipfix-export/flow_report.c
parent0352ea0c779ddcb636325406695ffc659a6e21db (diff)
ipfix-export: Add APIs to get/send buffers
The ipfix exporter should be doing most of the work of building packets and sending them rather than leaving every client of the exporter to do all the work themselves. Start to move towards that by adding APIs to get and send buffers. Store the state of this in new per thread data on the report so that we can send with minimal use of atomics. We do need an atomic for the sequence number in the packet though as that contains the number of data_records sent for the 'stream', not just for a single core. As the state is stored on the flow_report_t the caller needs to know which report they are using, so add a field to the args struct used to create the report that is used to pass back the report index on success. Type: improvement Signed-off-by: Paul Atkins <patkins@graphiant.com> Change-Id: I222b98a3f0326b3b71b11e0866a8c9736bed6dc1
Diffstat (limited to 'src/vnet/ipfix-export/flow_report.c')
-rw-r--r--src/vnet/ipfix-export/flow_report.c160
1 files changed, 160 insertions, 0 deletions
diff --git a/src/vnet/ipfix-export/flow_report.c b/src/vnet/ipfix-export/flow_report.c
index 38c2454faef..55c3b4d789c 100644
--- a/src/vnet/ipfix-export/flow_report.c
+++ b/src/vnet/ipfix-export/flow_report.c
@@ -15,6 +15,7 @@
/*
* flow_report.c
*/
+#include <vppinfra/atomics.h>
#include <vnet/ipfix-export/flow_report.h>
#include <vnet/api_errno.h>
#include <vnet/udp/udp.h>
@@ -238,6 +239,135 @@ vnet_flow_rewrite_generic_callback (ipfix_exporter_t *exp, flow_report_t *fr,
return rewrite;
}
+vlib_buffer_t *
+vnet_ipfix_exp_get_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+ flow_report_t *fr, u32 thread_index)
+{
+ u32 bi0;
+ vlib_buffer_t *b0;
+
+ if (fr->per_thread_data[thread_index].buffer)
+ return fr->per_thread_data[thread_index].buffer;
+
+ if (vlib_buffer_alloc (vm, &bi0, 1) != 1)
+ return NULL;
+
+ /* Initialize the buffer */
+ b0 = fr->per_thread_data[thread_index].buffer = vlib_get_buffer (vm, bi0);
+
+ b0->current_data = 0;
+ b0->current_length = exp->all_headers_size;
+ b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_F_FLOW_REPORT);
+ vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
+ vnet_buffer (b0)->sw_if_index[VLIB_TX] = exp->fib_index;
+ fr->per_thread_data[thread_index].next_data_offset = b0->current_length;
+
+ return b0;
+}
+
+/*
+ * Send a buffer that is mostly populated. Has flow records but needs some
+ * header fields updated.
+ */
+void
+vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+ flow_report_t *fr, flow_report_stream_t *stream,
+ u32 thread_index, vlib_buffer_t *b0)
+{
+ flow_report_main_t *frm = &flow_report_main;
+ vlib_frame_t *f;
+ ip4_ipfix_template_packet_t *tp;
+ ipfix_set_header_t *s;
+ ipfix_message_header_t *h;
+ ip4_header_t *ip;
+ udp_header_t *udp;
+
+ /* nothing to send */
+ if (fr->per_thread_data[thread_index].next_data_offset <=
+ exp->all_headers_size)
+ return;
+
+ tp = vlib_buffer_get_current (b0);
+ ip = (ip4_header_t *) &tp->ip4;
+ udp = (udp_header_t *) (ip + 1);
+ h = (ipfix_message_header_t *) (udp + 1);
+ s = (ipfix_set_header_t *) (h + 1);
+
+ ip->ip_version_and_header_length = 0x45;
+ ip->ttl = 254;
+ ip->protocol = IP_PROTOCOL_UDP;
+ ip->flags_and_fragment_offset = 0;
+ ip->src_address.as_u32 = exp->src_address.as_u32;
+ ip->dst_address.as_u32 = exp->ipfix_collector.as_u32;
+ udp->src_port = clib_host_to_net_u16 (stream->src_port);
+ udp->dst_port = clib_host_to_net_u16 (exp->collector_port);
+ udp->checksum = 0;
+
+ /* FIXUP: message header export_time */
+ h->export_time =
+ (u32) (((f64) frm->unix_time_0) + (vlib_time_now (vm) - frm->vlib_time_0));
+ h->export_time = clib_host_to_net_u32 (h->export_time);
+ h->domain_id = clib_host_to_net_u32 (stream->domain_id);
+
+ /*
+ * RFC 7011: Section 3.2
+ *
+ * Incremental sequence counter modulo 2^32 of all IPFIX Data Records
+ * sent in the current stream from the current Observation Domain by
+ * the Exporting Process
+ */
+ h->sequence_number =
+ clib_atomic_fetch_add (&stream->sequence_number,
+ fr->per_thread_data[thread_index].n_data_records);
+ h->sequence_number = clib_host_to_net_u32 (h->sequence_number);
+
+ /*
+ * For data records we use the template ID as the set ID.
+ * RFC 7011: 3.4.3
+ */
+ s->set_id_length = ipfix_set_id_length (
+ fr->template_id,
+ b0->current_length - (sizeof (*ip) + sizeof (*udp) + sizeof (*h)));
+ h->version_length =
+ version_length (b0->current_length - (sizeof (*ip) + sizeof (*udp)));
+
+ ip->length = clib_host_to_net_u16 (b0->current_length);
+
+ ip->checksum = ip4_header_checksum (ip);
+ udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip));
+
+ if (exp->udp_checksum)
+ {
+ /* RFC 7011 section 10.3.2. */
+ udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip);
+ if (udp->checksum == 0)
+ udp->checksum = 0xffff;
+ }
+
+ ASSERT (ip4_header_checksum_is_valid (ip));
+
+ /* Find or allocate a frame */
+ f = fr->per_thread_data[thread_index].frame;
+ if (PREDICT_FALSE (f == 0))
+ {
+ u32 *to_next;
+ f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
+ fr->per_thread_data[thread_index].frame = f;
+ u32 bi0 = vlib_get_buffer_index (vm, b0);
+
+ /* Enqueue the buffer */
+ to_next = vlib_frame_vector_args (f);
+ to_next[0] = bi0;
+ f->n_vectors = 1;
+ }
+
+ vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
+
+ fr->per_thread_data[thread_index].frame = NULL;
+ fr->per_thread_data[thread_index].buffer = NULL;
+ fr->per_thread_data[thread_index].next_data_offset = 0;
+}
+
static uword
flow_report_process (vlib_main_t * vm,
vlib_node_runtime_t * rt, vlib_frame_t * f)
@@ -346,6 +476,10 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
flow_report_t *fr;
flow_report_stream_t *stream;
u32 si;
+ vlib_thread_main_t *tm = &vlib_thread_main;
+ flow_report_main_t *frm = &flow_report_main;
+ vlib_main_t *vm = frm->vlib_main;
+ int size;
si = find_stream (exp, a->domain_id, a->src_port);
if (si == -2)
@@ -371,6 +505,19 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
{
if (found_index != ~0)
{
+ for (int i = 0;
+ i < vec_len (exp->reports[found_index].per_thread_data); i++)
+ {
+ u32 bi;
+ if (exp->reports[found_index].per_thread_data[i].buffer)
+ {
+ bi = vlib_get_buffer_index (
+ vm, exp->reports[found_index].per_thread_data[i].buffer);
+ vlib_buffer_free (vm, &bi, 1);
+ }
+ }
+ vec_free (exp->reports[found_index].per_thread_data);
+
vec_delete (exp->reports, 1, found_index);
stream = &exp->streams[si];
stream->n_reports--;
@@ -410,6 +557,14 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
fr->report_elements = a->report_elements;
fr->n_report_elements = a->n_report_elements;
fr->stream_indexp = a->stream_indexp;
+ vec_validate (fr->per_thread_data, tm->n_threads);
+ /* Store the flow_report index back in the args struct */
+ a->flow_report_index = fr - exp->reports;
+
+ size = 0;
+ for (int i = 0; i < fr->n_report_elements; i++)
+ size += fr->report_elements[i].size;
+ fr->data_record_size = size;
if (template_id)
*template_id = fr->template_id;
@@ -539,6 +694,11 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm,
if (path_mtu < 68)
return clib_error_return (0, "too small path-mtu value, minimum is 68");
+ /* Calculate how much header data we need. */
+ exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) +
+ sizeof (ipfix_message_header_t) +
+ sizeof (ipfix_set_header_t);
+
/* Reset report streams if we are reconfiguring IP addresses */
if (exp->ipfix_collector.as_u32 != collector.as_u32 ||
exp->src_address.as_u32 != src.as_u32 ||