summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/vnet/ipfix-export/flow_api.c5
-rw-r--r--src/vnet/ipfix-export/flow_report.c160
-rw-r--r--src/vnet/ipfix-export/flow_report.h69
3 files changed, 234 insertions, 0 deletions
diff --git a/src/vnet/ipfix-export/flow_api.c b/src/vnet/ipfix-export/flow_api.c
index c64f5508487..62dc703d9a0 100644
--- a/src/vnet/ipfix-export/flow_api.c
+++ b/src/vnet/ipfix-export/flow_api.c
@@ -160,6 +160,11 @@ vl_api_set_ipfix_exporter_t_internal (
if (path_mtu < 68)
return VNET_API_ERROR_INVALID_VALUE;
+ /* Calculate how much header data we need. */
+ exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) +
+ sizeof (ipfix_message_header_t) +
+ sizeof (ipfix_set_header_t);
+
/* Reset report streams if we are reconfiguring IP addresses */
if (exp->ipfix_collector.as_u32 != collector.as_u32 ||
exp->src_address.as_u32 != src.as_u32 ||
diff --git a/src/vnet/ipfix-export/flow_report.c b/src/vnet/ipfix-export/flow_report.c
index 38c2454faef..55c3b4d789c 100644
--- a/src/vnet/ipfix-export/flow_report.c
+++ b/src/vnet/ipfix-export/flow_report.c
@@ -15,6 +15,7 @@
/*
* flow_report.c
*/
+#include <vppinfra/atomics.h>
#include <vnet/ipfix-export/flow_report.h>
#include <vnet/api_errno.h>
#include <vnet/udp/udp.h>
@@ -238,6 +239,135 @@ vnet_flow_rewrite_generic_callback (ipfix_exporter_t *exp, flow_report_t *fr,
return rewrite;
}
+vlib_buffer_t *
+vnet_ipfix_exp_get_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+ flow_report_t *fr, u32 thread_index)
+{
+ u32 bi0;
+ vlib_buffer_t *b0;
+
+ if (fr->per_thread_data[thread_index].buffer)
+ return fr->per_thread_data[thread_index].buffer;
+
+ if (vlib_buffer_alloc (vm, &bi0, 1) != 1)
+ return NULL;
+
+ /* Initialize the buffer */
+ b0 = fr->per_thread_data[thread_index].buffer = vlib_get_buffer (vm, bi0);
+
+ b0->current_data = 0;
+ b0->current_length = exp->all_headers_size;
+ b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_F_FLOW_REPORT);
+ vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0;
+ vnet_buffer (b0)->sw_if_index[VLIB_TX] = exp->fib_index;
+ fr->per_thread_data[thread_index].next_data_offset = b0->current_length;
+
+ return b0;
+}
+
+/*
+ * Send a buffer that is mostly populated. Has flow records but needs some
+ * header fields updated.
+ */
+void
+vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+ flow_report_t *fr, flow_report_stream_t *stream,
+ u32 thread_index, vlib_buffer_t *b0)
+{
+ flow_report_main_t *frm = &flow_report_main;
+ vlib_frame_t *f;
+ ip4_ipfix_template_packet_t *tp;
+ ipfix_set_header_t *s;
+ ipfix_message_header_t *h;
+ ip4_header_t *ip;
+ udp_header_t *udp;
+
+ /* nothing to send */
+ if (fr->per_thread_data[thread_index].next_data_offset <=
+ exp->all_headers_size)
+ return;
+
+ tp = vlib_buffer_get_current (b0);
+ ip = (ip4_header_t *) &tp->ip4;
+ udp = (udp_header_t *) (ip + 1);
+ h = (ipfix_message_header_t *) (udp + 1);
+ s = (ipfix_set_header_t *) (h + 1);
+
+ ip->ip_version_and_header_length = 0x45;
+ ip->ttl = 254;
+ ip->protocol = IP_PROTOCOL_UDP;
+ ip->flags_and_fragment_offset = 0;
+ ip->src_address.as_u32 = exp->src_address.as_u32;
+ ip->dst_address.as_u32 = exp->ipfix_collector.as_u32;
+ udp->src_port = clib_host_to_net_u16 (stream->src_port);
+ udp->dst_port = clib_host_to_net_u16 (exp->collector_port);
+ udp->checksum = 0;
+
+ /* FIXUP: message header export_time */
+ h->export_time =
+ (u32) (((f64) frm->unix_time_0) + (vlib_time_now (vm) - frm->vlib_time_0));
+ h->export_time = clib_host_to_net_u32 (h->export_time);
+ h->domain_id = clib_host_to_net_u32 (stream->domain_id);
+
+ /*
+ * RFC 7011: Section 3.2
+ *
+ * Incremental sequence counter modulo 2^32 of all IPFIX Data Records
+ * sent in the current stream from the current Observation Domain by
+ * the Exporting Process
+ */
+ h->sequence_number =
+ clib_atomic_fetch_add (&stream->sequence_number,
+ fr->per_thread_data[thread_index].n_data_records);
+ h->sequence_number = clib_host_to_net_u32 (h->sequence_number);
+
+ /*
+ * For data records we use the template ID as the set ID.
+ * RFC 7011: 3.4.3
+ */
+ s->set_id_length = ipfix_set_id_length (
+ fr->template_id,
+ b0->current_length - (sizeof (*ip) + sizeof (*udp) + sizeof (*h)));
+ h->version_length =
+ version_length (b0->current_length - (sizeof (*ip) + sizeof (*udp)));
+
+ ip->length = clib_host_to_net_u16 (b0->current_length);
+
+ ip->checksum = ip4_header_checksum (ip);
+ udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip));
+
+ if (exp->udp_checksum)
+ {
+ /* RFC 7011 section 10.3.2. */
+ udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip);
+ if (udp->checksum == 0)
+ udp->checksum = 0xffff;
+ }
+
+ ASSERT (ip4_header_checksum_is_valid (ip));
+
+ /* Find or allocate a frame */
+ f = fr->per_thread_data[thread_index].frame;
+ if (PREDICT_FALSE (f == 0))
+ {
+ u32 *to_next;
+ f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
+ fr->per_thread_data[thread_index].frame = f;
+ u32 bi0 = vlib_get_buffer_index (vm, b0);
+
+ /* Enqueue the buffer */
+ to_next = vlib_frame_vector_args (f);
+ to_next[0] = bi0;
+ f->n_vectors = 1;
+ }
+
+ vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
+
+ fr->per_thread_data[thread_index].frame = NULL;
+ fr->per_thread_data[thread_index].buffer = NULL;
+ fr->per_thread_data[thread_index].next_data_offset = 0;
+}
+
static uword
flow_report_process (vlib_main_t * vm,
vlib_node_runtime_t * rt, vlib_frame_t * f)
@@ -346,6 +476,10 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
flow_report_t *fr;
flow_report_stream_t *stream;
u32 si;
+ vlib_thread_main_t *tm = &vlib_thread_main;
+ flow_report_main_t *frm = &flow_report_main;
+ vlib_main_t *vm = frm->vlib_main;
+ int size;
si = find_stream (exp, a->domain_id, a->src_port);
if (si == -2)
@@ -371,6 +505,19 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
{
if (found_index != ~0)
{
+ for (int i = 0;
+ i < vec_len (exp->reports[found_index].per_thread_data); i++)
+ {
+ u32 bi;
+ if (exp->reports[found_index].per_thread_data[i].buffer)
+ {
+ bi = vlib_get_buffer_index (
+ vm, exp->reports[found_index].per_thread_data[i].buffer);
+ vlib_buffer_free (vm, &bi, 1);
+ }
+ }
+ vec_free (exp->reports[found_index].per_thread_data);
+
vec_delete (exp->reports, 1, found_index);
stream = &exp->streams[si];
stream->n_reports--;
@@ -410,6 +557,14 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp,
fr->report_elements = a->report_elements;
fr->n_report_elements = a->n_report_elements;
fr->stream_indexp = a->stream_indexp;
+ vec_validate (fr->per_thread_data, tm->n_threads);
+ /* Store the flow_report index back in the args struct */
+ a->flow_report_index = fr - exp->reports;
+
+ size = 0;
+ for (int i = 0; i < fr->n_report_elements; i++)
+ size += fr->report_elements[i].size;
+ fr->data_record_size = size;
if (template_id)
*template_id = fr->template_id;
@@ -539,6 +694,11 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm,
if (path_mtu < 68)
return clib_error_return (0, "too small path-mtu value, minimum is 68");
+ /* Calculate how much header data we need. */
+ exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) +
+ sizeof (ipfix_message_header_t) +
+ sizeof (ipfix_set_header_t);
+
/* Reset report streams if we are reconfiguring IP addresses */
if (exp->ipfix_collector.as_u32 != collector.as_u32 ||
exp->src_address.as_u32 != src.as_u32 ||
diff --git a/src/vnet/ipfix-export/flow_report.h b/src/vnet/ipfix-export/flow_report.h
index 65ddebcac3e..6b884a2e28e 100644
--- a/src/vnet/ipfix-export/flow_report.h
+++ b/src/vnet/ipfix-export/flow_report.h
@@ -72,6 +72,16 @@ typedef union
uword as_uword;
} opaque_t;
+/*
+ * A stream represents an IPFIX session to a destination. We can have
+ * multiple streams to the same destination, but each one has its own
+ * domain and source port. A stream has a sequence number for that
+ * session. A stream may contain multiple templates (i.e multiple for
+ * reports) and each stream also has its own template space.
+ *
+ * A stream has per thread state so that data packets can be built
+ * and send on multiple threads at the same time.
+ */
typedef struct
{
u32 domain_id;
@@ -81,11 +91,37 @@ typedef struct
u16 next_template_no;
} flow_report_stream_t;
+/*
+ * For each flow_report we want to be able to build buffers/frames per thread.
+ */
+typedef struct
+{
+ vlib_buffer_t *buffer;
+ vlib_frame_t *frame;
+ u16 next_data_offset;
+ /*
+ * We need this per stream as the IPFIX sequence number is the count of
+ * data record sent, not the count of packets with data records sent.
+ * See RFC 7011, Sec 3.1
+ */
+ u8 n_data_records;
+} flow_report_per_thread_t;
+
+/*
+ * A flow report represents a group of fields that are to be exported.
+ * Each flow_report has an associated template that is generated when
+ * the flow_report is added. Each flow_report is associated with a
+ * stream, and multiple flow_reports can use the same stream. When
+ * adding a flow_report the keys for the stream are the domain_id
+ * and the source_port.
+ */
typedef struct flow_report
{
/* ipfix rewrite, set by callback */
u8 *rewrite;
u16 template_id;
+ int data_record_size;
+ flow_report_per_thread_t *per_thread_data;
u32 stream_index;
f64 last_template_sent;
int update_rewrite;
@@ -134,6 +170,13 @@ typedef struct ipfix_exporter
/* UDP checksum calculation enable flag */
u8 udp_checksum;
+
+ /*
+ * The amount of data needed for all the headers, prior to the first
+ * flowset (template or data or ...) This is mostly dependent on the
+ * L3 and L4 protocols in use.
+ */
+ u32 all_headers_size;
} ipfix_exporter_t;
typedef struct flow_report_main
@@ -171,6 +214,11 @@ typedef struct
u32 domain_id;
u16 src_port;
u32 *stream_indexp;
+ /*
+ * When adding a flow report, the index of the flow report is stored
+ * here on success.
+ */
+ u32 flow_report_index;
} vnet_flow_report_add_del_args_t;
int vnet_flow_report_add_del (ipfix_exporter_t *exp,
@@ -191,6 +239,27 @@ int vnet_stream_change (ipfix_exporter_t *exp, u32 old_domain_id,
*/
ipfix_exporter_t *vnet_ipfix_exporter_lookup (ip4_address_t *ipfix_collector);
+/*
+ * Get the currently in use buffer for the given stream on the given core.
+ * If there is no current buffer then allocate a new one and return that.
+ * This is the buffer that data records should be written into. The offset
+ * currently in use is stored in the per-thread data for the stream and
+ * should be updated as new records are written in.
+ */
+vlib_buffer_t *vnet_ipfix_exp_get_buffer (vlib_main_t *vm,
+ ipfix_exporter_t *exp,
+ flow_report_t *fr, u32 thread_index);
+
+/*
+ * Send the provided buffer. At this stage the buffer should be populated
+ * with data records, with the offset in use stored in the stream per thread
+ * data. This func will fix up all the headers and then send the buffer.
+ */
+void vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp,
+ flow_report_t *fr,
+ flow_report_stream_t *stream,
+ u32 thread_index, vlib_buffer_t *b0);
+
#endif /* __included_vnet_flow_report_h__ */
/*