From cf751ec70df21affb19c77b2c51e3c231b8202ad Mon Sep 17 00:00:00 2001 From: Mohsin KAZMI Date: Wed, 18 Jan 2017 11:59:45 +0100 Subject: af_packet: multithreading support This patch adds multithreading support for af_packet interfaces. Change-Id: Ief5d1117e7ffeaa59dbc2831e583d5d8e8d4fa7a Signed-off-by: Mohsin KAZMI --- src/vnet/devices/af_packet/af_packet.c | 55 ++++++++++++++++++++++++++++++++++ src/vnet/devices/af_packet/af_packet.h | 7 +++++ src/vnet/devices/af_packet/device.c | 9 ++++++ src/vnet/devices/af_packet/node.c | 26 +++++++++------- 4 files changed, 86 insertions(+), 11 deletions(-) diff --git a/src/vnet/devices/af_packet/af_packet.c b/src/vnet/devices/af_packet/af_packet.c index 91c3988b..e491ba47 100644 --- a/src/vnet/devices/af_packet/af_packet.c +++ b/src/vnet/devices/af_packet/af_packet.c @@ -171,6 +171,31 @@ error: return ret; } +static void +af_packet_worker_thread_enable () +{ + /* If worker threads are enabled, switch to polling mode */ + foreach_vlib_main (( + { + vlib_node_set_state (this_vlib_main, + af_packet_input_node.index, + VLIB_NODE_STATE_POLLING); + })); + +} + +static void +af_packet_worker_thread_disable () +{ + foreach_vlib_main (( + { + vlib_node_set_state (this_vlib_main, + af_packet_input_node.index, + VLIB_NODE_STATE_INTERRUPT); + })); + +} + int af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set, u32 * sw_if_index) @@ -184,6 +209,7 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set, u8 hw_addr[6]; clib_error_t *error; vnet_sw_interface_t *sw; + vlib_thread_main_t *tm = vlib_get_thread_main (); vnet_main_t *vnm = vnet_get_main (); uword *p; uword if_index; @@ -226,6 +252,13 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set, apif->next_tx_frame = 0; apif->next_rx_frame = 0; + if (tm->n_vlib_mains > 1) + { + apif->lockp = clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, + CLIB_CACHE_LINE_BYTES); + memset ((void *) apif->lockp, 0, CLIB_CACHE_LINE_BYTES); + } + { unix_file_t template = { 0 }; template.read_function = af_packet_fd_read_ready; @@ -273,6 +306,10 @@ af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set, 0); if (sw_if_index) *sw_if_index = apif->sw_if_index; + + if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 1) + af_packet_worker_thread_enable (); + return 0; error: @@ -286,6 +323,7 @@ int af_packet_delete_if (vlib_main_t * vm, u8 * host_if_name) { vnet_main_t *vnm = vnet_get_main (); + vlib_thread_main_t *tm = vlib_get_thread_main (); af_packet_main_t *apm = &af_packet_main; af_packet_if_t *apif; uword *p; @@ -335,6 +373,8 @@ af_packet_delete_if (vlib_main_t * vm, u8 * host_if_name) ethernet_delete_interface (vnm, apif->hw_if_index); pool_put (apm->interfaces, apif); + if (tm->n_vlib_mains > 1 && pool_elts (apm->interfaces) == 0) + af_packet_worker_thread_disable (); return 0; } @@ -344,9 +384,24 @@ af_packet_init (vlib_main_t * vm) { af_packet_main_t *apm = &af_packet_main; vlib_thread_main_t *tm = vlib_get_thread_main (); + vlib_thread_registration_t *tr; + uword *p; memset (apm, 0, sizeof (af_packet_main_t)); + apm->input_cpu_first_index = 0; + apm->input_cpu_count = 1; + + /* find out which cpus will be used for input */ + p = hash_get_mem (tm->thread_registrations_by_name, "workers"); + tr = p ? (vlib_thread_registration_t *) p[0] : 0; + + if (tr && tr->count > 0) + { + apm->input_cpu_first_index = tr->first_index; + apm->input_cpu_count = tr->count; + } + mhash_init_vec_string (&apm->if_index_by_host_if_name, sizeof (uword)); vec_validate_aligned (apm->rx_buffers, tm->n_vlib_mains - 1, diff --git a/src/vnet/devices/af_packet/af_packet.h b/src/vnet/devices/af_packet/af_packet.h index 19e2523d..e00e5cb4 100644 --- a/src/vnet/devices/af_packet/af_packet.h +++ b/src/vnet/devices/af_packet/af_packet.h @@ -20,6 +20,7 @@ typedef struct { CLIB_CACHE_LINE_ALIGN_MARK (cacheline0); + volatile u32 *lockp; u8 *host_if_name; int fd; struct tpacket_req *rx_req; @@ -50,6 +51,12 @@ typedef struct /* hash of host interface names */ mhash_t if_index_by_host_if_name; + + /* first cpu index */ + u32 input_cpu_first_index; + + /* total cpu count */ + u32 input_cpu_count; } af_packet_main_t; af_packet_main_t af_packet_main; diff --git a/src/vnet/devices/af_packet/device.c b/src/vnet/devices/af_packet/device.c index 1fb4000f..e3bf9bbc 100644 --- a/src/vnet/devices/af_packet/device.c +++ b/src/vnet/devices/af_packet/device.c @@ -92,6 +92,12 @@ af_packet_interface_tx (vlib_main_t * vm, struct tpacket2_hdr *tph; u32 frame_not_ready = 0; + if (PREDICT_FALSE (apif->lockp != 0)) + { + while (__sync_lock_test_and_set (apif->lockp, 1)) + ; + } + while (n_left > 0) { u32 len; @@ -152,6 +158,9 @@ af_packet_interface_tx (vlib_main_t * vm, } } + if (PREDICT_FALSE (apif->lockp != 0)) + *apif->lockp = 0; + if (PREDICT_FALSE (frame_not_ready)) vlib_error_count (vm, node->node_index, AF_PACKET_TX_ERROR_FRAME_NOT_READY, frame_not_ready); diff --git a/src/vnet/devices/af_packet/node.c b/src/vnet/devices/af_packet/node.c index 72004320..476ccca9 100644 --- a/src/vnet/devices/af_packet/node.c +++ b/src/vnet/devices/af_packet/node.c @@ -108,10 +108,9 @@ buffer_add_to_chain (vlib_main_t * vm, u32 bi, u32 first_bi, u32 prev_bi) always_inline uword af_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node, - vlib_frame_t * frame, u32 device_idx) + vlib_frame_t * frame, af_packet_if_t * apif) { af_packet_main_t *apm = &af_packet_main; - af_packet_if_t *apif = pool_elt_at_index (apm->interfaces, device_idx); struct tpacket2_hdr *tph; u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT; u32 block = 0; @@ -125,10 +124,10 @@ af_packet_device_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node, u32 frame_num = apif->rx_req->tp_frame_nr; u8 *block_start = apif->rx_ring + block * block_size; uword n_trace = vlib_get_trace_count (vm, node); + u32 cpu_index = os_get_cpu_number (); u32 n_buffer_bytes = vlib_buffer_free_list_buffer_size (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX); u32 min_bufs = apif->rx_req->tp_frame_size / n_buffer_bytes; - int cpu_index = node->cpu_index; if (apif->per_interface_next_index != ~0) next_index = apif->per_interface_next_index; @@ -249,16 +248,18 @@ af_packet_input_fn (vlib_main_t * vm, vlib_node_runtime_t * node, { int i; u32 n_rx_packets = 0; - + u32 cpu_index = os_get_cpu_number (); af_packet_main_t *apm = &af_packet_main; + af_packet_if_t *apif; - /* *INDENT-OFF* */ - clib_bitmap_foreach (i, apm->pending_input_bitmap, - ({ - clib_bitmap_set (apm->pending_input_bitmap, i, 0); - n_rx_packets += af_packet_device_input_fn(vm, node, frame, i); - })); - /* *INDENT-ON* */ + for (i = 0; i < vec_len (apm->interfaces); i++) + { + apif = vec_elt_at_index (apm->interfaces, i); + if (apif->is_admin_up && + (i % apm->input_cpu_count) == + (cpu_index - apm->input_cpu_first_index)) + n_rx_packets += af_packet_device_input_fn (vm, node, frame, apif); + } return n_rx_packets; } @@ -270,6 +271,9 @@ VLIB_REGISTER_NODE (af_packet_input_node) = { .sibling_of = "device-input", .format_trace = format_af_packet_input_trace, .type = VLIB_NODE_TYPE_INPUT, + /** + * default state is INTERRUPT mode, switch to POLLING if worker threads are enabled + */ .state = VLIB_NODE_STATE_INTERRUPT, .n_errors = AF_PACKET_INPUT_N_ERROR, .error_strings = af_packet_input_error_strings, -- cgit 1.2.3-korg