diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/vnet/ipfix-export/flow_api.c | 5 | ||||
-rw-r--r-- | src/vnet/ipfix-export/flow_report.c | 160 | ||||
-rw-r--r-- | src/vnet/ipfix-export/flow_report.h | 69 |
3 files changed, 234 insertions, 0 deletions
diff --git a/src/vnet/ipfix-export/flow_api.c b/src/vnet/ipfix-export/flow_api.c index c64f5508487..62dc703d9a0 100644 --- a/src/vnet/ipfix-export/flow_api.c +++ b/src/vnet/ipfix-export/flow_api.c @@ -160,6 +160,11 @@ vl_api_set_ipfix_exporter_t_internal ( if (path_mtu < 68) return VNET_API_ERROR_INVALID_VALUE; + /* Calculate how much header data we need. */ + exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) + + sizeof (ipfix_message_header_t) + + sizeof (ipfix_set_header_t); + /* Reset report streams if we are reconfiguring IP addresses */ if (exp->ipfix_collector.as_u32 != collector.as_u32 || exp->src_address.as_u32 != src.as_u32 || diff --git a/src/vnet/ipfix-export/flow_report.c b/src/vnet/ipfix-export/flow_report.c index 38c2454faef..55c3b4d789c 100644 --- a/src/vnet/ipfix-export/flow_report.c +++ b/src/vnet/ipfix-export/flow_report.c @@ -15,6 +15,7 @@ /* * flow_report.c */ +#include <vppinfra/atomics.h> #include <vnet/ipfix-export/flow_report.h> #include <vnet/api_errno.h> #include <vnet/udp/udp.h> @@ -238,6 +239,135 @@ vnet_flow_rewrite_generic_callback (ipfix_exporter_t *exp, flow_report_t *fr, return rewrite; } +vlib_buffer_t * +vnet_ipfix_exp_get_buffer (vlib_main_t *vm, ipfix_exporter_t *exp, + flow_report_t *fr, u32 thread_index) +{ + u32 bi0; + vlib_buffer_t *b0; + + if (fr->per_thread_data[thread_index].buffer) + return fr->per_thread_data[thread_index].buffer; + + if (vlib_buffer_alloc (vm, &bi0, 1) != 1) + return NULL; + + /* Initialize the buffer */ + b0 = fr->per_thread_data[thread_index].buffer = vlib_get_buffer (vm, bi0); + + b0->current_data = 0; + b0->current_length = exp->all_headers_size; + b0->flags |= (VLIB_BUFFER_TOTAL_LENGTH_VALID | VNET_BUFFER_F_FLOW_REPORT); + vnet_buffer (b0)->sw_if_index[VLIB_RX] = 0; + vnet_buffer (b0)->sw_if_index[VLIB_TX] = exp->fib_index; + fr->per_thread_data[thread_index].next_data_offset = b0->current_length; + + return b0; +} + +/* + * Send a buffer that is mostly populated. Has flow records but needs some + * header fields updated. + */ +void +vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp, + flow_report_t *fr, flow_report_stream_t *stream, + u32 thread_index, vlib_buffer_t *b0) +{ + flow_report_main_t *frm = &flow_report_main; + vlib_frame_t *f; + ip4_ipfix_template_packet_t *tp; + ipfix_set_header_t *s; + ipfix_message_header_t *h; + ip4_header_t *ip; + udp_header_t *udp; + + /* nothing to send */ + if (fr->per_thread_data[thread_index].next_data_offset <= + exp->all_headers_size) + return; + + tp = vlib_buffer_get_current (b0); + ip = (ip4_header_t *) &tp->ip4; + udp = (udp_header_t *) (ip + 1); + h = (ipfix_message_header_t *) (udp + 1); + s = (ipfix_set_header_t *) (h + 1); + + ip->ip_version_and_header_length = 0x45; + ip->ttl = 254; + ip->protocol = IP_PROTOCOL_UDP; + ip->flags_and_fragment_offset = 0; + ip->src_address.as_u32 = exp->src_address.as_u32; + ip->dst_address.as_u32 = exp->ipfix_collector.as_u32; + udp->src_port = clib_host_to_net_u16 (stream->src_port); + udp->dst_port = clib_host_to_net_u16 (exp->collector_port); + udp->checksum = 0; + + /* FIXUP: message header export_time */ + h->export_time = + (u32) (((f64) frm->unix_time_0) + (vlib_time_now (vm) - frm->vlib_time_0)); + h->export_time = clib_host_to_net_u32 (h->export_time); + h->domain_id = clib_host_to_net_u32 (stream->domain_id); + + /* + * RFC 7011: Section 3.2 + * + * Incremental sequence counter modulo 2^32 of all IPFIX Data Records + * sent in the current stream from the current Observation Domain by + * the Exporting Process + */ + h->sequence_number = + clib_atomic_fetch_add (&stream->sequence_number, + fr->per_thread_data[thread_index].n_data_records); + h->sequence_number = clib_host_to_net_u32 (h->sequence_number); + + /* + * For data records we use the template ID as the set ID. + * RFC 7011: 3.4.3 + */ + s->set_id_length = ipfix_set_id_length ( + fr->template_id, + b0->current_length - (sizeof (*ip) + sizeof (*udp) + sizeof (*h))); + h->version_length = + version_length (b0->current_length - (sizeof (*ip) + sizeof (*udp))); + + ip->length = clib_host_to_net_u16 (b0->current_length); + + ip->checksum = ip4_header_checksum (ip); + udp->length = clib_host_to_net_u16 (b0->current_length - sizeof (*ip)); + + if (exp->udp_checksum) + { + /* RFC 7011 section 10.3.2. */ + udp->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip); + if (udp->checksum == 0) + udp->checksum = 0xffff; + } + + ASSERT (ip4_header_checksum_is_valid (ip)); + + /* Find or allocate a frame */ + f = fr->per_thread_data[thread_index].frame; + if (PREDICT_FALSE (f == 0)) + { + u32 *to_next; + f = vlib_get_frame_to_node (vm, ip4_lookup_node.index); + fr->per_thread_data[thread_index].frame = f; + u32 bi0 = vlib_get_buffer_index (vm, b0); + + /* Enqueue the buffer */ + to_next = vlib_frame_vector_args (f); + to_next[0] = bi0; + f->n_vectors = 1; + } + + vlib_put_frame_to_node (vm, ip4_lookup_node.index, f); + + fr->per_thread_data[thread_index].frame = NULL; + fr->per_thread_data[thread_index].buffer = NULL; + fr->per_thread_data[thread_index].next_data_offset = 0; +} + static uword flow_report_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f) @@ -346,6 +476,10 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, flow_report_t *fr; flow_report_stream_t *stream; u32 si; + vlib_thread_main_t *tm = &vlib_thread_main; + flow_report_main_t *frm = &flow_report_main; + vlib_main_t *vm = frm->vlib_main; + int size; si = find_stream (exp, a->domain_id, a->src_port); if (si == -2) @@ -371,6 +505,19 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, { if (found_index != ~0) { + for (int i = 0; + i < vec_len (exp->reports[found_index].per_thread_data); i++) + { + u32 bi; + if (exp->reports[found_index].per_thread_data[i].buffer) + { + bi = vlib_get_buffer_index ( + vm, exp->reports[found_index].per_thread_data[i].buffer); + vlib_buffer_free (vm, &bi, 1); + } + } + vec_free (exp->reports[found_index].per_thread_data); + vec_delete (exp->reports, 1, found_index); stream = &exp->streams[si]; stream->n_reports--; @@ -410,6 +557,14 @@ vnet_flow_report_add_del (ipfix_exporter_t *exp, fr->report_elements = a->report_elements; fr->n_report_elements = a->n_report_elements; fr->stream_indexp = a->stream_indexp; + vec_validate (fr->per_thread_data, tm->n_threads); + /* Store the flow_report index back in the args struct */ + a->flow_report_index = fr - exp->reports; + + size = 0; + for (int i = 0; i < fr->n_report_elements; i++) + size += fr->report_elements[i].size; + fr->data_record_size = size; if (template_id) *template_id = fr->template_id; @@ -539,6 +694,11 @@ set_ipfix_exporter_command_fn (vlib_main_t * vm, if (path_mtu < 68) return clib_error_return (0, "too small path-mtu value, minimum is 68"); + /* Calculate how much header data we need. */ + exp->all_headers_size = sizeof (ip4_header_t) + sizeof (udp_header_t) + + sizeof (ipfix_message_header_t) + + sizeof (ipfix_set_header_t); + /* Reset report streams if we are reconfiguring IP addresses */ if (exp->ipfix_collector.as_u32 != collector.as_u32 || exp->src_address.as_u32 != src.as_u32 || diff --git a/src/vnet/ipfix-export/flow_report.h b/src/vnet/ipfix-export/flow_report.h index 65ddebcac3e..6b884a2e28e 100644 --- a/src/vnet/ipfix-export/flow_report.h +++ b/src/vnet/ipfix-export/flow_report.h @@ -72,6 +72,16 @@ typedef union uword as_uword; } opaque_t; +/* + * A stream represents an IPFIX session to a destination. We can have + * multiple streams to the same destination, but each one has its own + * domain and source port. A stream has a sequence number for that + * session. A stream may contain multiple templates (i.e multiple for + * reports) and each stream also has its own template space. + * + * A stream has per thread state so that data packets can be built + * and send on multiple threads at the same time. + */ typedef struct { u32 domain_id; @@ -81,11 +91,37 @@ typedef struct u16 next_template_no; } flow_report_stream_t; +/* + * For each flow_report we want to be able to build buffers/frames per thread. + */ +typedef struct +{ + vlib_buffer_t *buffer; + vlib_frame_t *frame; + u16 next_data_offset; + /* + * We need this per stream as the IPFIX sequence number is the count of + * data record sent, not the count of packets with data records sent. + * See RFC 7011, Sec 3.1 + */ + u8 n_data_records; +} flow_report_per_thread_t; + +/* + * A flow report represents a group of fields that are to be exported. + * Each flow_report has an associated template that is generated when + * the flow_report is added. Each flow_report is associated with a + * stream, and multiple flow_reports can use the same stream. When + * adding a flow_report the keys for the stream are the domain_id + * and the source_port. + */ typedef struct flow_report { /* ipfix rewrite, set by callback */ u8 *rewrite; u16 template_id; + int data_record_size; + flow_report_per_thread_t *per_thread_data; u32 stream_index; f64 last_template_sent; int update_rewrite; @@ -134,6 +170,13 @@ typedef struct ipfix_exporter /* UDP checksum calculation enable flag */ u8 udp_checksum; + + /* + * The amount of data needed for all the headers, prior to the first + * flowset (template or data or ...) This is mostly dependent on the + * L3 and L4 protocols in use. + */ + u32 all_headers_size; } ipfix_exporter_t; typedef struct flow_report_main @@ -171,6 +214,11 @@ typedef struct u32 domain_id; u16 src_port; u32 *stream_indexp; + /* + * When adding a flow report, the index of the flow report is stored + * here on success. + */ + u32 flow_report_index; } vnet_flow_report_add_del_args_t; int vnet_flow_report_add_del (ipfix_exporter_t *exp, @@ -191,6 +239,27 @@ int vnet_stream_change (ipfix_exporter_t *exp, u32 old_domain_id, */ ipfix_exporter_t *vnet_ipfix_exporter_lookup (ip4_address_t *ipfix_collector); +/* + * Get the currently in use buffer for the given stream on the given core. + * If there is no current buffer then allocate a new one and return that. + * This is the buffer that data records should be written into. The offset + * currently in use is stored in the per-thread data for the stream and + * should be updated as new records are written in. + */ +vlib_buffer_t *vnet_ipfix_exp_get_buffer (vlib_main_t *vm, + ipfix_exporter_t *exp, + flow_report_t *fr, u32 thread_index); + +/* + * Send the provided buffer. At this stage the buffer should be populated + * with data records, with the offset in use stored in the stream per thread + * data. This func will fix up all the headers and then send the buffer. + */ +void vnet_ipfix_exp_send_buffer (vlib_main_t *vm, ipfix_exporter_t *exp, + flow_report_t *fr, + flow_report_stream_t *stream, + u32 thread_index, vlib_buffer_t *b0); + #endif /* __included_vnet_flow_report_h__ */ /* |