diff options
Diffstat (limited to 'vnet/vnet/devices/dpdk/threads.c')
-rw-r--r-- | vnet/vnet/devices/dpdk/threads.c | 378 |
1 files changed, 378 insertions, 0 deletions
diff --git a/vnet/vnet/devices/dpdk/threads.c b/vnet/vnet/devices/dpdk/threads.c new file mode 100644 index 00000000000..aa32f1007c3 --- /dev/null +++ b/vnet/vnet/devices/dpdk/threads.c @@ -0,0 +1,378 @@ +/* + * Copyright (c) 2015 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vnet/vnet.h> +#include <vppinfra/vec.h> +#include <vppinfra/error.h> +#include <vppinfra/format.h> +#include <signal.h> + +#include <vnet/ethernet/ethernet.h> +#include <vnet/devices/dpdk/dpdk.h> + +#include <vlibmemory/api.h> +#include <vlibmemory/vl_memory_msg_enum.h> /* enumerate all vlib messages */ + +#define vl_typedefs /* define message structures */ +#include <vlibmemory/vl_memory_api_h.h> +#undef vl_typedefs + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) +#define vl_printfun +#include <vlibmemory/vl_memory_api_h.h> +#undef vl_printfun + +vlib_thread_main_t vlib_thread_main; + +frame_queue_trace_t *frame_queue_traces; + +/* + * Check the frame queue to see if any frames are available. + * If so, pull the packets off the frames and put them to + * the handoff node. + */ +static inline int vlib_frame_queue_dequeue_internal (vlib_main_t *vm) +{ + u32 thread_id = vm->cpu_index; + vlib_frame_queue_t *fq = vlib_frame_queues[thread_id]; + vlib_frame_queue_elt_t *elt; + u32 * from, * to; + vlib_frame_t * f; + int msg_type; + int processed = 0; + u32 n_left_to_node; + u32 vectors = 0; + + ASSERT (fq); + ASSERT(vm == vlib_mains[thread_id]); + + /* + * Gather trace data for frame queues + */ + if (PREDICT_FALSE(fq->trace)) + { + frame_queue_trace_t *fqt; + u32 elix; + + fqt = &frame_queue_traces[thread_id]; + fqt->nelts = fq->nelts; + fqt->head = fq->head; + fqt->head_hint = fq->head_hint; + fqt->tail = fq->tail; + fqt->threshold = fq->vector_threshold; + fqt->n_in_use = fqt->tail - fqt->head; + if (fqt->n_in_use > fqt->nelts){ + fqt->n_in_use = 0; + } + + for (elix=0; elix<fqt->nelts; elix++) { + elt = fq->elts + ((fq->head+1 + elix) & (fq->nelts-1)); + if (1 || elt->valid) + { + fqt->n_vectors[elix] = elt->n_vectors; + } + } + fqt->written = 1; + } + + while (1) + { + if (fq->head == fq->tail) + { + fq->head_hint = fq->head; + return processed; + } + + elt = fq->elts + ((fq->head+1) & (fq->nelts-1)); + + if (!elt->valid) + { + fq->head_hint = fq->head; + return processed; + } + + from = elt->buffer_index; + msg_type = elt->msg_type; + + ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME); + ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE); + + f = vlib_get_frame_to_node + (vm, 1 ? handoff_dispatch_node.index : ethernet_input_node.index); + + to = vlib_frame_vector_args (f); + + n_left_to_node = elt->n_vectors; + + while (n_left_to_node >= 4) + { + to[0] = from[0]; + to[1] = from[1]; + to[2] = from[2]; + to[3] = from[3]; + to += 4; + from += 4; + n_left_to_node -= 4; + } + + while (n_left_to_node > 0) + { + to[0] = from[0]; + to++; + from++; + n_left_to_node--; + } + + vectors += elt->n_vectors; + f->n_vectors = elt->n_vectors; + vlib_put_frame_to_node + (vm, 1 ? handoff_dispatch_node.index : ethernet_input_node.index, f); + + elt->valid = 0; + elt->n_vectors = 0; + elt->msg_type = 0xfefefefe; + CLIB_MEMORY_BARRIER(); + fq->head++; + processed++; + + /* + * Limit the number of packets pushed into the graph + */ + if (vectors >= fq->vector_threshold) + { + fq->head_hint = fq->head; + return processed; + } + } + ASSERT(0); + return processed; +} + +int dpdk_frame_queue_dequeue (vlib_main_t *vm) +{ + return vlib_frame_queue_dequeue_internal (vm); +} + +/* + * dpdk_worker_thread - Contains the main loop of a worker thread. + * + * w + * Information for the current thread + * io_name + * The name of thread performing dpdk device IO (if any). If there are no + * instances of that thread, then the current thread will do dpdk device + * polling. Ports will be divided among instances of the current thread. + * callback + * If not null, this function will be called once during each main loop. + */ +static_always_inline void +dpdk_worker_thread_internal (vlib_main_t *vm, + dpdk_worker_thread_callback_t callback, + int have_io_threads) +{ + vlib_node_main_t * nm = &vm->node_main; + u64 cpu_time_now = clib_cpu_time_now (); + + while (1) + { + vlib_worker_thread_barrier_check (); + + vlib_frame_queue_dequeue_internal (vm); + + /* Invoke callback if supplied */ + if (PREDICT_FALSE(callback != NULL)) + callback(vm); + + if (!have_io_threads) + { + vlib_node_runtime_t * n; + vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) + { + cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, + VLIB_NODE_STATE_POLLING, /* frame */ 0, + cpu_time_now); + } + + } + + if (_vec_len (nm->pending_frames)) + { + int i; + cpu_time_now = clib_cpu_time_now (); + for (i = 0; i < _vec_len (nm->pending_frames); i++) { + vlib_pending_frame_t *p; + + p = nm->pending_frames + i; + + cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now); + } + _vec_len (nm->pending_frames) = 0; + } + vlib_increment_main_loop_counter (vm); + + /* Record time stamp in case there are no enabled nodes and above + calls do not update time stamp. */ + cpu_time_now = clib_cpu_time_now (); + } +} + +void dpdk_worker_thread (vlib_worker_thread_t * w, + char *io_name, + dpdk_worker_thread_callback_t callback) +{ + vlib_main_t *vm; + uword * p; + vlib_thread_main_t * tm = vlib_get_thread_main(); + vlib_thread_registration_t * tr; + dpdk_main_t * dm = &dpdk_main; + + vm = vlib_get_main(); + + ASSERT(vm->cpu_index == os_get_cpu_number()); + + clib_time_init (&vm->clib_time); + clib_mem_set_heap (w->thread_mheap); + + /* Wait until the dpdk init sequence is complete */ + while (dm->io_thread_release == 0) + vlib_worker_thread_barrier_check (); + + /* any I/O threads? */ + p = hash_get_mem (tm->thread_registrations_by_name, io_name); + tr = (vlib_thread_registration_t *)p[0]; + + if (tr && tr->count > 0) + dpdk_worker_thread_internal(vm, callback, /* have_io_threads */ 1); + else + dpdk_worker_thread_internal(vm, callback, /* have_io_threads */ 0); +} + +void dpdk_worker_thread_fn (void * arg) +{ + vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg; + vlib_worker_thread_init (w); + dpdk_worker_thread (w, "io", 0); +} + +#if VIRL == 0 +VLIB_REGISTER_THREAD (worker_thread_reg, static) = { + .name = "workers", + .short_name = "wk", + .function = dpdk_worker_thread_fn, + .mheap_size = 256<<20, +}; +#endif + +void dpdk_io_thread_fn (void * arg) +{ + vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg; + vlib_worker_thread_init (w); + dpdk_io_thread (w, 0, 0, "workers", 0); +} + +#if VIRL == 0 +VLIB_REGISTER_THREAD (io_thread_reg, static) = { + .name = "io", + .short_name = "io", + .function = dpdk_io_thread_fn, + .mheap_size = 256<<20, +}; +#endif + +static void vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp) +{ + vl_api_rpc_reply_t * rmp; + int (*fp)(void *); + i32 rv = 0; + vlib_main_t * vm = vlib_get_main(); + + if (mp->function == 0) + { + rv = -1; + clib_warning ("rpc NULL function pointer"); + } + + else + { + if (mp->need_barrier_sync) + vlib_worker_thread_barrier_sync (vm); + + fp = (void *)(mp->function); + rv = (*fp)(mp->data); + + if (mp->need_barrier_sync) + vlib_worker_thread_barrier_release (vm); + } + + if (mp->send_reply) + { + unix_shared_memory_queue_t * q = + vl_api_client_index_to_input_queue (mp->client_index); + if (q) + { + rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp)); + rmp->_vl_msg_id = ntohs (VL_API_RPC_REPLY); + rmp->context = mp->context; + rmp->retval = rv; + vl_msg_api_send_shmem (q, (u8 *)&rmp); + } + } + if (mp->multicast) + { + clib_warning ("multicast not yet implemented..."); + } +} + +static void vl_api_rpc_reply_t_handler (vl_api_rpc_reply_t * mp) +{ clib_warning ("unimplemented"); } + +void vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length) +{ + vl_api_rpc_call_t * mp; + api_main_t *am = &api_main; + vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr; + + mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length); + memset (mp, 0, sizeof (*mp)); + memcpy (mp->data, data, data_length); + mp->_vl_msg_id = ntohs (VL_API_RPC_CALL); + mp->function = (u64)fp; + mp->need_barrier_sync = 1; + + /* Use the "normal" control-plane mechanism for the main thread */ + vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *)&mp); +} + + +#define foreach_rpc_api_msg \ +_(RPC_CALL,rpc_call) \ +_(RPC_REPLY,rpc_reply) + +static clib_error_t * +rpc_api_hookup (vlib_main_t *vm) +{ +#define _(N,n) \ + vl_msg_api_set_handlers(VL_API_##N, #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 0 /* do not trace */); + foreach_rpc_api_msg; +#undef _ + return 0; +} + +VLIB_API_INIT_FUNCTION(rpc_api_hookup); |