diff options
author | Paul Atkins <patkins@graphiant.com> | 2021-09-27 21:30:13 +0100 |
---|---|---|
committer | Neale Ranns <neale@graphiant.com> | 2021-11-22 09:30:09 +0000 |
commit | 19a5f23b23e4eec54d99f105725ddc4df7421a5c (patch) | |
tree | 57c37b8de8d7391d4f47efc857f875bbeb68b709 /src/vnet/ipfix-export/flow_report.c | |
parent | 0352ea0c779ddcb636325406695ffc659a6e21db (diff) |
ipfix-export: Add APIs to get/send buffers
The ipfix exporter should be doing most of the work of building packets
and sending them rather than leaving every client of the exporter to do
all the work themselves. Start to move towards that by adding APIs to
get and send buffers. Store the state of this in new per thread data on
the report so that we can send with minimal use of atomics. We do need
an atomic for the sequence number in the packet though as that contains
the number of data_records sent for the 'stream', not just for a single
core. As the state is stored on the flow_report_t the caller needs to
know which report they are using, so add a field to the args struct used
to create the report that is used to pass back the report index on success.
Type: improvement
Signed-off-by: Paul Atkins <patkins@graphiant.com>
Change-Id: I222b98a3f0326b3b71b11e0866a8c9736bed6dc1
Diffstat (limited to 'src/vnet/ipfix-export/flow_report.c')
-rw-r--r-- | src/vnet/ipfix-export/flow_report.c | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/src/vnet/ipfix-export/flow_report.c b/src/vnet/ipfix-export/flow_report.c index 38c2454faef..55c3b4d789c 100644 --- a/src/vnet/ipfix-export/flow_report.c +++ b/src/vnet/ipfix-export/flow_report.c @@ -15,6 +15,7 @@ /* * flow_report.c */ +#include <vppinfra/atomics.h> #include <vnet/ipfix-export/flow_report.h> #include <vnet/api_errno.h> #include <vnet/udp/udp.h> @@ -238,6 +239,135 @@ vnet_flow_rewrite_generic_callback (ipfix_exporter_t *exp, flow_report_t *fr, return rewrite; } +vlib_buffer_t * +vnet_ipfix_exp_get_buffer (vlib_main_t *vm, ipfix_exporter_t *exp, + flow_report_t *fr, u32 thread_index) +{ + u32 bi0; + vlib_buffer_t *b0; + + if (fr->per_thread_data[thread_index].buffer) + return fr->per_thread_data[thread_index].buffer; + + if (vlib_buffer_alloc (vm, &bi0, 1) != 1) + return NULL; + + /* Initialize the buffer */ + b0 = fr->per_thread_data[thread_index].buffer = vlib_get_buffer (vm, bi0); + + b0->current_data = 0; + b0->current_length = exp->all_headers_size; + b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_F_FLOW_REPORT); + vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; + vnet_buffer (b0)->sw_if_index[VLIB_TX] = exp->fib_index; + fr->per_thread_data[thread_index].next_data_offset = b0->current_length; + + return b0; +} + +/* + * Send a buffer that is mostly populated. Has flow records but needs some + * header fields updated. + */ +void +vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp, + flow_report_t *fr, flow_report_stream_t *stream, + u32 thread_index, vlib_buffer_t *b0) +{ + flow_report_main_t *frm = &flow_report_main; + vlib_frame_t *f; + ip4_ipfix_template_packet_t *tp; + ipfix_set_header_t *s; + ipfix_message_header_t *h; + ip4_header_t *ip; + udp_header_t *udp; + + /* nothing to send */ + if (fr->per_thread_data[thread_index].next_data_offset <= + exp->all_headers_size) + return; + + tp = vlib_buffer_get_current (b0); + ip = (ip4_header_t *) &tp->ip4; + udp = (udp_header_t *) (ip + 1); + h = (ipfix_message_header_t *) (udp + 1); + s = (ipfix_set_header_t *) (h + 1); + + ip->ip_version_and_header_length = 0x45; + ip->ttl = 254; + ip->protocol = IP_PROTOCOL_UDP; + ip->flags_and_fragment_offset = 0; + ip->src_address.as_u32 = exp->src_address.as_u32; + ip->dst_address.as_u32 = exp->ipfix_collector.as_u32; + udp->src_port = clib_host_to_net_u16 (stream->src_port); + udp->dst_port = clib_host_to_net_u16 (exp->collector_port); + udp->checksum = 0; + + /* FIXUP: message header export_time */ + h->export_time = + (u32) (((f64) frm->unix_time_0) + (vlib_time_now (vm) - frm->vlib_time_0)); + h->export_time = clib_host_to_net_u32 (h->export_time); + h->domain_id = clib_host_to_net_u32 (stream->domain_id); + + /* + * RFC 7011: Section 3.2 + * + * Incremental sequence counter modulo 2^32 of all IPFIX Data Records + * sent in the current stream from the current Observation Domain by + * the Exporting Process + */ + h->sequence_number = + clib_atomic_fetch_add (&stream->sequence_number, + fr->per_thread_data[thread_index].n_data_records); + h->sequence_number = clib_host_to_net_u32 (h->sequence_number); + + /* + * For data records we use the template ID as the set ID. + * RFC 7011: 3.4.3 + */ + s->set_id_length = ipfix_set_id_length ( + fr->template_id, + b0->current_length - (sizeof (*ip) + sizeof (*udp) + sizeof (*h))); + h->version_length = + version_length (b0->current_length - (sizeof (*ip) + sizeof (*udp))); + + ip->length = clib_host_to_net_u16 (b0->current_length); + + ip->checksum = ip4_header_checksum (ip); + udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip)); + + if (exp->udp_checksum) + { + /* RFC 7011 section 10.3.2. */ + udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip); + if (udp->checksum == 0) + udp->checksum = 0xffff; + } + + ASSERT (ip4_header_checksum_is_valid (ip)); + + /* Find or allocate a frame */ + f = fr->per_thread_data[thread_index].frame; + if (PREDICT_FALSE (f == 0)) + { + u32 *to_next; + f = vlib_get_frame_to_node (vm, ip4_lookup_node.index); + fr->per_thread_data[thread_index].frame = f; + u32 bi0 = vlib_get_buffer_index (vm, b0); + + /* Enqueue the buffer */ + to_next = vlib_frame_vector_args (f); + to_next[0] = bi0; + f->n_vectors = 1; + } + + vlib_put_frame_to_node (vm, ip4_lookup_node.index, f); + + fr->per_thread_data[thread_index].frame = NULL; + fr->per_thread_data[thread_index].buffer = NULL; + fr->per_thread_data[thread_index].next_data_offset = 0; +} + static uword flow_report_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f) @@ -346,6 +476,10 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, flow_report_t *fr; flow_report_stream_t *stream; u32 si; + vlib_thread_main_t *tm = &vlib_thread_main; + flow_report_main_t *frm = &flow_report_main; + vlib_main_t *vm = frm->vlib_main; + int size; si = find_stream (exp, a->domain_id, a->src_port); if (si == -2) @@ -371,6 +505,19 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, { if (found_index != ~0) { + for (int i = 0; + i < vec_len (exp->reports[found_index].per_thread_data); i++) + { + u32 bi; + if (exp->reports[found_index].per_thread_data[i].buffer) + { + bi = vlib_get_buffer_index ( + vm, exp->reports[found_index].per_thread_data[i].buffer); + vlib_buffer_free (vm, &bi, 1); + } + } + vec_free (exp->reports[found_index].per_thread_data); + vec_delete (exp->reports, 1, found_index); stream = &exp->streams[si]; stream->n_reports--; @@ -410,6 +557,14 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, fr->report_elements = a->report_elements; fr->n_report_elements = a->n_report_elements; fr->stream_indexp = a->stream_indexp; + vec_validate (fr->per_thread_data, tm->n_threads); + /* Store the flow_report index back in the args struct */ + a->flow_report_index = fr - exp->reports; + + size = 0; + for (int i = 0; i < fr->n_report_elements; i++) + size += fr->report_elements[i].size; + fr->data_record_size = size; if (template_id) *template_id = fr->template_id; @@ -539,6 +694,11 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm, if (path_mtu < 68) return clib_error_return (0, "too small path-mtu value, minimum is 68"); + /* Calculate how much header data we need. */ + exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) + + sizeof (ipfix_message_header_t) + + sizeof (ipfix_set_header_t); + /* Reset report streams if we are reconfiguring IP addresses */ if (exp->ipfix_collector.as_u32 != collector.as_u32 || exp->src_address.as_u32 != src.as_u32 || |