diff options
Diffstat (limited to 'examples/performance-thread/common/lthread_sched.c')
-rw-r--r-- | examples/performance-thread/common/lthread_sched.c | 599 |
1 files changed, 599 insertions, 0 deletions
diff --git a/examples/performance-thread/common/lthread_sched.c b/examples/performance-thread/common/lthread_sched.c new file mode 100644 index 00000000..7c40bc05 --- /dev/null +++ b/examples/performance-thread/common/lthread_sched.c @@ -0,0 +1,599 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2015 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Some portions of this software is derived from the + * https://github.com/halayli/lthread which carrys the following license. + * + * Copyright (C) 2012, Hasan Alayli <halayli@gmail.com> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + + +#define RTE_MEM 1 + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <stddef.h> +#include <limits.h> +#include <inttypes.h> +#include <unistd.h> +#include <pthread.h> +#include <fcntl.h> +#include <sys/time.h> +#include <sys/mman.h> +#include <sched.h> + +#include <rte_prefetch.h> +#include <rte_per_lcore.h> +#include <rte_atomic.h> +#include <rte_atomic_64.h> +#include <rte_log.h> +#include <rte_common.h> +#include <rte_branch_prediction.h> + +#include "lthread_api.h" +#include "lthread_int.h" +#include "lthread_sched.h" +#include "lthread_objcache.h" +#include "lthread_timer.h" +#include "lthread_mutex.h" +#include "lthread_cond.h" +#include "lthread_tls.h" +#include "lthread_diag.h" + +/* + * This file implements the lthread scheduler + * The scheduler is the function lthread_run() + * This must be run as the main loop of an EAL thread. + * + * Currently once a scheduler is created it cannot be destroyed + * When a scheduler shuts down it is assumed that the application is terminating + */ + +static rte_atomic16_t num_schedulers; +static rte_atomic16_t active_schedulers; + +/* one scheduler per lcore */ +RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL; + +struct lthread_sched *schedcore[LTHREAD_MAX_LCORES]; + +diag_callback diag_cb; + +uint64_t diag_mask; + + +/* constructor */ +void lthread_sched_ctor(void) __attribute__ ((constructor)); +void lthread_sched_ctor(void) +{ + memset(schedcore, 0, sizeof(schedcore)); + rte_atomic16_init(&num_schedulers); + rte_atomic16_set(&num_schedulers, 1); + rte_atomic16_init(&active_schedulers); + rte_atomic16_set(&active_schedulers, 0); + diag_cb = NULL; +} + + +enum sched_alloc_phase { + SCHED_ALLOC_OK, + SCHED_ALLOC_QNODE_POOL, + SCHED_ALLOC_READY_QUEUE, + SCHED_ALLOC_PREADY_QUEUE, + SCHED_ALLOC_LTHREAD_CACHE, + SCHED_ALLOC_STACK_CACHE, + SCHED_ALLOC_PERLT_CACHE, + SCHED_ALLOC_TLS_CACHE, + SCHED_ALLOC_COND_CACHE, + SCHED_ALLOC_MUTEX_CACHE, +}; + +static int +_lthread_sched_alloc_resources(struct lthread_sched *new_sched) +{ + int alloc_status; + + do { + /* Initialize per scheduler queue node pool */ + alloc_status = SCHED_ALLOC_QNODE_POOL; + new_sched->qnode_pool = + _qnode_pool_create("qnode pool", LTHREAD_PREALLOC); + if (new_sched->qnode_pool == NULL) + break; + + /* Initialize per scheduler local ready queue */ + alloc_status = SCHED_ALLOC_READY_QUEUE; + new_sched->ready = _lthread_queue_create("ready queue"); + if (new_sched->ready == NULL) + break; + + /* Initialize per scheduler local peer ready queue */ + alloc_status = SCHED_ALLOC_PREADY_QUEUE; + new_sched->pready = _lthread_queue_create("pready queue"); + if (new_sched->pready == NULL) + break; + + /* Initialize per scheduler local free lthread cache */ + alloc_status = SCHED_ALLOC_LTHREAD_CACHE; + new_sched->lthread_cache = + _lthread_objcache_create("lthread cache", + sizeof(struct lthread), + LTHREAD_PREALLOC); + if (new_sched->lthread_cache == NULL) + break; + + /* Initialize per scheduler local free stack cache */ + alloc_status = SCHED_ALLOC_STACK_CACHE; + new_sched->stack_cache = + _lthread_objcache_create("stack_cache", + sizeof(struct lthread_stack), + LTHREAD_PREALLOC); + if (new_sched->stack_cache == NULL) + break; + + /* Initialize per scheduler local free per lthread data cache */ + alloc_status = SCHED_ALLOC_PERLT_CACHE; + new_sched->per_lthread_cache = + _lthread_objcache_create("per_lt cache", + RTE_PER_LTHREAD_SECTION_SIZE, + LTHREAD_PREALLOC); + if (new_sched->per_lthread_cache == NULL) + break; + + /* Initialize per scheduler local free tls cache */ + alloc_status = SCHED_ALLOC_TLS_CACHE; + new_sched->tls_cache = + _lthread_objcache_create("TLS cache", + sizeof(struct lthread_tls), + LTHREAD_PREALLOC); + if (new_sched->tls_cache == NULL) + break; + + /* Initialize per scheduler local free cond var cache */ + alloc_status = SCHED_ALLOC_COND_CACHE; + new_sched->cond_cache = + _lthread_objcache_create("cond cache", + sizeof(struct lthread_cond), + LTHREAD_PREALLOC); + if (new_sched->cond_cache == NULL) + break; + + /* Initialize per scheduler local free mutex cache */ + alloc_status = SCHED_ALLOC_MUTEX_CACHE; + new_sched->mutex_cache = + _lthread_objcache_create("mutex cache", + sizeof(struct lthread_mutex), + LTHREAD_PREALLOC); + if (new_sched->mutex_cache == NULL) + break; + + alloc_status = SCHED_ALLOC_OK; + } while (0); + + /* roll back on any failure */ + switch (alloc_status) { + case SCHED_ALLOC_MUTEX_CACHE: + _lthread_objcache_destroy(new_sched->cond_cache); + /* fall through */ + case SCHED_ALLOC_COND_CACHE: + _lthread_objcache_destroy(new_sched->tls_cache); + /* fall through */ + case SCHED_ALLOC_TLS_CACHE: + _lthread_objcache_destroy(new_sched->per_lthread_cache); + /* fall through */ + case SCHED_ALLOC_PERLT_CACHE: + _lthread_objcache_destroy(new_sched->stack_cache); + /* fall through */ + case SCHED_ALLOC_STACK_CACHE: + _lthread_objcache_destroy(new_sched->lthread_cache); + /* fall through */ + case SCHED_ALLOC_LTHREAD_CACHE: + _lthread_queue_destroy(new_sched->pready); + /* fall through */ + case SCHED_ALLOC_PREADY_QUEUE: + _lthread_queue_destroy(new_sched->ready); + /* fall through */ + case SCHED_ALLOC_READY_QUEUE: + _qnode_pool_destroy(new_sched->qnode_pool); + /* fall through */ + case SCHED_ALLOC_QNODE_POOL: + /* fall through */ + case SCHED_ALLOC_OK: + break; + } + return alloc_status; +} + + +/* + * Create a scheduler on the current lcore + */ +struct lthread_sched *_lthread_sched_create(size_t stack_size) +{ + int status; + struct lthread_sched *new_sched; + unsigned lcoreid = rte_lcore_id(); + + LTHREAD_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE); + + if (stack_size == 0) + stack_size = LTHREAD_MAX_STACK_SIZE; + + new_sched = + rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched), + RTE_CACHE_LINE_SIZE, + rte_socket_id()); + if (new_sched == NULL) { + RTE_LOG(CRIT, LTHREAD, + "Failed to allocate memory for scheduler\n"); + return NULL; + } + + _lthread_key_pool_init(); + + new_sched->stack_size = stack_size; + new_sched->birth = rte_rdtsc(); + THIS_SCHED = new_sched; + + status = _lthread_sched_alloc_resources(new_sched); + if (status != SCHED_ALLOC_OK) { + RTE_LOG(CRIT, LTHREAD, + "Failed to allocate resources for scheduler code = %d\n", + status); + rte_free(new_sched); + return NULL; + } + + bzero(&new_sched->ctx, sizeof(struct ctx)); + + new_sched->lcore_id = lcoreid; + + schedcore[lcoreid] = new_sched; + + new_sched->run_flag = 1; + + DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0); + + rte_wmb(); + return new_sched; +} + +/* + * Set the number of schedulers in the system + */ +int lthread_num_schedulers_set(int num) +{ + rte_atomic16_set(&num_schedulers, num); + return (int)rte_atomic16_read(&num_schedulers); +} + +/* + * Return the number of schedulers active + */ +int lthread_active_schedulers(void) +{ + return (int)rte_atomic16_read(&active_schedulers); +} + + +/** + * shutdown the scheduler running on the specified lcore + */ +void lthread_scheduler_shutdown(unsigned lcoreid) +{ + uint64_t coreid = (uint64_t) lcoreid; + + if (coreid < LTHREAD_MAX_LCORES) { + if (schedcore[coreid] != NULL) + schedcore[coreid]->run_flag = 0; + } +} + +/** + * shutdown all schedulers + */ +void lthread_scheduler_shutdown_all(void) +{ + uint64_t i; + + /* + * give time for all schedulers to have started + * Note we use sched_yield() rather than pthread_yield() to allow + * for the possibility of a pthread wrapper on lthread_yield(), + * something that is not possible unless the scheduler is running. + */ + while (rte_atomic16_read(&active_schedulers) < + rte_atomic16_read(&num_schedulers)) + sched_yield(); + + for (i = 0; i < LTHREAD_MAX_LCORES; i++) { + if (schedcore[i] != NULL) + schedcore[i]->run_flag = 0; + } +} + +/* + * Resume a suspended lthread + */ +static inline void +_lthread_resume(struct lthread *lt) __attribute__ ((always_inline)); +static inline void _lthread_resume(struct lthread *lt) +{ + struct lthread_sched *sched = THIS_SCHED; + struct lthread_stack *s; + uint64_t state = lt->state; +#if LTHREAD_DIAG + int init = 0; +#endif + + sched->current_lthread = lt; + + if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) { + /* if detached we can free the thread now */ + if (state & BIT(ST_LT_DETACH)) { + _lthread_free(lt); + sched->current_lthread = NULL; + return; + } + } + + if (state & BIT(ST_LT_INIT)) { + /* first time this thread has been run */ + /* assign thread to this scheduler */ + lt->sched = THIS_SCHED; + + /* allocate stack */ + s = _stack_alloc(); + + lt->stack_container = s; + _lthread_set_stack(lt, s->stack, s->stack_size); + + /* allocate memory for TLS used by this thread */ + _lthread_tls_alloc(lt); + + lt->state = BIT(ST_LT_READY); +#if LTHREAD_DIAG + init = 1; +#endif + } + + DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt); + + /* switch to the new thread */ + ctx_switch(<->ctx, &sched->ctx); + + /* If posting to a queue that could be read by another lcore + * we defer the queue write till now to ensure the context has been + * saved before the other core tries to resume it + * This applies to blocking on mutex, cond, and to set_affinity + */ + if (lt->pending_wr_queue != NULL) { + struct lthread_queue *dest = lt->pending_wr_queue; + + lt->pending_wr_queue = NULL; + + /* queue the current thread to the specified queue */ + _lthread_queue_insert_mp(dest, lt); + } + + sched->current_lthread = NULL; +} + +/* + * Handle sleep timer expiry +*/ +void +_sched_timer_cb(struct rte_timer *tim, void *arg) +{ + struct lthread *lt = (struct lthread *) arg; + uint64_t state = lt->state; + + DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, <->tim, 0); + + rte_timer_stop(tim); + + if (lt->state & BIT(ST_LT_CANCELLED)) + (THIS_SCHED)->nb_blocked_threads--; + + lt->state = state | BIT(ST_LT_EXPIRED); + _lthread_resume(lt); + lt->state = state & CLEARBIT(ST_LT_EXPIRED); +} + + + +/* + * Returns 0 if there is a pending job in scheduler or 1 if done and can exit. + */ +static inline int _lthread_sched_isdone(struct lthread_sched *sched) +{ + return (sched->run_flag == 0) && + (_lthread_queue_empty(sched->ready)) && + (_lthread_queue_empty(sched->pready)) && + (sched->nb_blocked_threads == 0); +} + +/* + * Wait for all schedulers to start + */ +static inline void _lthread_schedulers_sync_start(void) +{ + rte_atomic16_inc(&active_schedulers); + + /* wait for lthread schedulers + * Note we use sched_yield() rather than pthread_yield() to allow + * for the possibility of a pthread wrapper on lthread_yield(), + * something that is not possible unless the scheduler is running. + */ + while (rte_atomic16_read(&active_schedulers) < + rte_atomic16_read(&num_schedulers)) + sched_yield(); + +} + +/* + * Wait for all schedulers to stop + */ +static inline void _lthread_schedulers_sync_stop(void) +{ + rte_atomic16_dec(&active_schedulers); + rte_atomic16_dec(&num_schedulers); + + /* wait for schedulers + * Note we use sched_yield() rather than pthread_yield() to allow + * for the possibility of a pthread wrapper on lthread_yield(), + * something that is not possible unless the scheduler is running. + */ + while (rte_atomic16_read(&active_schedulers) > 0) + sched_yield(); + +} + + +/* + * Run the lthread scheduler + * This loop is the heart of the system + */ +void lthread_run(void) +{ + + struct lthread_sched *sched = THIS_SCHED; + struct lthread *lt = NULL; + + RTE_LOG(INFO, LTHREAD, + "starting scheduler %p on lcore %u phys core %u\n", + sched, rte_lcore_id(), + rte_lcore_index(rte_lcore_id())); + + /* if more than one, wait for all schedulers to start */ + _lthread_schedulers_sync_start(); + + + /* + * This is the main scheduling loop + * So long as there are tasks in existence we run this loop. + * We check for:- + * expired timers, + * the local ready queue, + * and the peer ready queue, + * + * and resume lthreads ad infinitum. + */ + while (!_lthread_sched_isdone(sched)) { + + rte_timer_manage(); + + lt = _lthread_queue_poll(sched->ready); + if (lt != NULL) + _lthread_resume(lt); + lt = _lthread_queue_poll(sched->pready); + if (lt != NULL) + _lthread_resume(lt); + } + + + /* if more than one wait for all schedulers to stop */ + _lthread_schedulers_sync_stop(); + + (THIS_SCHED) = NULL; + + RTE_LOG(INFO, LTHREAD, + "stopping scheduler %p on lcore %u phys core %u\n", + sched, rte_lcore_id(), + rte_lcore_index(rte_lcore_id())); + fflush(stdout); +} + +/* + * Return the scheduler for this lcore + * + */ +struct lthread_sched *_lthread_sched_get(int lcore_id) +{ + if (lcore_id > LTHREAD_MAX_LCORES) + return NULL; + return schedcore[lcore_id]; +} + +/* + * migrate the current thread to another scheduler running + * on the specified lcore. + */ +int lthread_set_affinity(unsigned lcoreid) +{ + struct lthread *lt = THIS_LTHREAD; + struct lthread_sched *dest_sched; + + if (unlikely(lcoreid > LTHREAD_MAX_LCORES)) + return POSIX_ERRNO(EINVAL); + + + DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0); + + dest_sched = schedcore[lcoreid]; + + if (unlikely(dest_sched == NULL)) + return POSIX_ERRNO(EINVAL); + + if (likely(dest_sched != THIS_SCHED)) { + lt->sched = dest_sched; + lt->pending_wr_queue = dest_sched->pready; + _affinitize(); + return 0; + } + return 0; +} |