aboutsummaryrefslogtreecommitdiffstats
path: root/examples/performance-thread/common/lthread_sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/performance-thread/common/lthread_sched.c')
-rw-r--r--examples/performance-thread/common/lthread_sched.c599
1 files changed, 599 insertions, 0 deletions
diff --git a/examples/performance-thread/common/lthread_sched.c b/examples/performance-thread/common/lthread_sched.c
new file mode 100644
index 00000000..7c40bc05
--- /dev/null
+++ b/examples/performance-thread/common/lthread_sched.c
@@ -0,0 +1,599 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2015 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Some portions of this software is derived from the
+ * https://github.com/halayli/lthread which carrys the following license.
+ *
+ * Copyright (C) 2012, Hasan Alayli <halayli@gmail.com>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+
+#define RTE_MEM 1
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <limits.h>
+#include <inttypes.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <sched.h>
+
+#include <rte_prefetch.h>
+#include <rte_per_lcore.h>
+#include <rte_atomic.h>
+#include <rte_atomic_64.h>
+#include <rte_log.h>
+#include <rte_common.h>
+#include <rte_branch_prediction.h>
+
+#include "lthread_api.h"
+#include "lthread_int.h"
+#include "lthread_sched.h"
+#include "lthread_objcache.h"
+#include "lthread_timer.h"
+#include "lthread_mutex.h"
+#include "lthread_cond.h"
+#include "lthread_tls.h"
+#include "lthread_diag.h"
+
+/*
+ * This file implements the lthread scheduler
+ * The scheduler is the function lthread_run()
+ * This must be run as the main loop of an EAL thread.
+ *
+ * Currently once a scheduler is created it cannot be destroyed
+ * When a scheduler shuts down it is assumed that the application is terminating
+ */
+
+static rte_atomic16_t num_schedulers;
+static rte_atomic16_t active_schedulers;
+
+/* one scheduler per lcore */
+RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;
+
+struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];
+
+diag_callback diag_cb;
+
+uint64_t diag_mask;
+
+
+/* constructor */
+void lthread_sched_ctor(void) __attribute__ ((constructor));
+void lthread_sched_ctor(void)
+{
+ memset(schedcore, 0, sizeof(schedcore));
+ rte_atomic16_init(&num_schedulers);
+ rte_atomic16_set(&num_schedulers, 1);
+ rte_atomic16_init(&active_schedulers);
+ rte_atomic16_set(&active_schedulers, 0);
+ diag_cb = NULL;
+}
+
+
+enum sched_alloc_phase {
+ SCHED_ALLOC_OK,
+ SCHED_ALLOC_QNODE_POOL,
+ SCHED_ALLOC_READY_QUEUE,
+ SCHED_ALLOC_PREADY_QUEUE,
+ SCHED_ALLOC_LTHREAD_CACHE,
+ SCHED_ALLOC_STACK_CACHE,
+ SCHED_ALLOC_PERLT_CACHE,
+ SCHED_ALLOC_TLS_CACHE,
+ SCHED_ALLOC_COND_CACHE,
+ SCHED_ALLOC_MUTEX_CACHE,
+};
+
+static int
+_lthread_sched_alloc_resources(struct lthread_sched *new_sched)
+{
+ int alloc_status;
+
+ do {
+ /* Initialize per scheduler queue node pool */
+ alloc_status = SCHED_ALLOC_QNODE_POOL;
+ new_sched->qnode_pool =
+ _qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
+ if (new_sched->qnode_pool == NULL)
+ break;
+
+ /* Initialize per scheduler local ready queue */
+ alloc_status = SCHED_ALLOC_READY_QUEUE;
+ new_sched->ready = _lthread_queue_create("ready queue");
+ if (new_sched->ready == NULL)
+ break;
+
+ /* Initialize per scheduler local peer ready queue */
+ alloc_status = SCHED_ALLOC_PREADY_QUEUE;
+ new_sched->pready = _lthread_queue_create("pready queue");
+ if (new_sched->pready == NULL)
+ break;
+
+ /* Initialize per scheduler local free lthread cache */
+ alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
+ new_sched->lthread_cache =
+ _lthread_objcache_create("lthread cache",
+ sizeof(struct lthread),
+ LTHREAD_PREALLOC);
+ if (new_sched->lthread_cache == NULL)
+ break;
+
+ /* Initialize per scheduler local free stack cache */
+ alloc_status = SCHED_ALLOC_STACK_CACHE;
+ new_sched->stack_cache =
+ _lthread_objcache_create("stack_cache",
+ sizeof(struct lthread_stack),
+ LTHREAD_PREALLOC);
+ if (new_sched->stack_cache == NULL)
+ break;
+
+ /* Initialize per scheduler local free per lthread data cache */
+ alloc_status = SCHED_ALLOC_PERLT_CACHE;
+ new_sched->per_lthread_cache =
+ _lthread_objcache_create("per_lt cache",
+ RTE_PER_LTHREAD_SECTION_SIZE,
+ LTHREAD_PREALLOC);
+ if (new_sched->per_lthread_cache == NULL)
+ break;
+
+ /* Initialize per scheduler local free tls cache */
+ alloc_status = SCHED_ALLOC_TLS_CACHE;
+ new_sched->tls_cache =
+ _lthread_objcache_create("TLS cache",
+ sizeof(struct lthread_tls),
+ LTHREAD_PREALLOC);
+ if (new_sched->tls_cache == NULL)
+ break;
+
+ /* Initialize per scheduler local free cond var cache */
+ alloc_status = SCHED_ALLOC_COND_CACHE;
+ new_sched->cond_cache =
+ _lthread_objcache_create("cond cache",
+ sizeof(struct lthread_cond),
+ LTHREAD_PREALLOC);
+ if (new_sched->cond_cache == NULL)
+ break;
+
+ /* Initialize per scheduler local free mutex cache */
+ alloc_status = SCHED_ALLOC_MUTEX_CACHE;
+ new_sched->mutex_cache =
+ _lthread_objcache_create("mutex cache",
+ sizeof(struct lthread_mutex),
+ LTHREAD_PREALLOC);
+ if (new_sched->mutex_cache == NULL)
+ break;
+
+ alloc_status = SCHED_ALLOC_OK;
+ } while (0);
+
+ /* roll back on any failure */
+ switch (alloc_status) {
+ case SCHED_ALLOC_MUTEX_CACHE:
+ _lthread_objcache_destroy(new_sched->cond_cache);
+ /* fall through */
+ case SCHED_ALLOC_COND_CACHE:
+ _lthread_objcache_destroy(new_sched->tls_cache);
+ /* fall through */
+ case SCHED_ALLOC_TLS_CACHE:
+ _lthread_objcache_destroy(new_sched->per_lthread_cache);
+ /* fall through */
+ case SCHED_ALLOC_PERLT_CACHE:
+ _lthread_objcache_destroy(new_sched->stack_cache);
+ /* fall through */
+ case SCHED_ALLOC_STACK_CACHE:
+ _lthread_objcache_destroy(new_sched->lthread_cache);
+ /* fall through */
+ case SCHED_ALLOC_LTHREAD_CACHE:
+ _lthread_queue_destroy(new_sched->pready);
+ /* fall through */
+ case SCHED_ALLOC_PREADY_QUEUE:
+ _lthread_queue_destroy(new_sched->ready);
+ /* fall through */
+ case SCHED_ALLOC_READY_QUEUE:
+ _qnode_pool_destroy(new_sched->qnode_pool);
+ /* fall through */
+ case SCHED_ALLOC_QNODE_POOL:
+ /* fall through */
+ case SCHED_ALLOC_OK:
+ break;
+ }
+ return alloc_status;
+}
+
+
+/*
+ * Create a scheduler on the current lcore
+ */
+struct lthread_sched *_lthread_sched_create(size_t stack_size)
+{
+ int status;
+ struct lthread_sched *new_sched;
+ unsigned lcoreid = rte_lcore_id();
+
+ LTHREAD_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);
+
+ if (stack_size == 0)
+ stack_size = LTHREAD_MAX_STACK_SIZE;
+
+ new_sched =
+ rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
+ RTE_CACHE_LINE_SIZE,
+ rte_socket_id());
+ if (new_sched == NULL) {
+ RTE_LOG(CRIT, LTHREAD,
+ "Failed to allocate memory for scheduler\n");
+ return NULL;
+ }
+
+ _lthread_key_pool_init();
+
+ new_sched->stack_size = stack_size;
+ new_sched->birth = rte_rdtsc();
+ THIS_SCHED = new_sched;
+
+ status = _lthread_sched_alloc_resources(new_sched);
+ if (status != SCHED_ALLOC_OK) {
+ RTE_LOG(CRIT, LTHREAD,
+ "Failed to allocate resources for scheduler code = %d\n",
+ status);
+ rte_free(new_sched);
+ return NULL;
+ }
+
+ bzero(&new_sched->ctx, sizeof(struct ctx));
+
+ new_sched->lcore_id = lcoreid;
+
+ schedcore[lcoreid] = new_sched;
+
+ new_sched->run_flag = 1;
+
+ DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);
+
+ rte_wmb();
+ return new_sched;
+}
+
+/*
+ * Set the number of schedulers in the system
+ */
+int lthread_num_schedulers_set(int num)
+{
+ rte_atomic16_set(&num_schedulers, num);
+ return (int)rte_atomic16_read(&num_schedulers);
+}
+
+/*
+ * Return the number of schedulers active
+ */
+int lthread_active_schedulers(void)
+{
+ return (int)rte_atomic16_read(&active_schedulers);
+}
+
+
+/**
+ * shutdown the scheduler running on the specified lcore
+ */
+void lthread_scheduler_shutdown(unsigned lcoreid)
+{
+ uint64_t coreid = (uint64_t) lcoreid;
+
+ if (coreid < LTHREAD_MAX_LCORES) {
+ if (schedcore[coreid] != NULL)
+ schedcore[coreid]->run_flag = 0;
+ }
+}
+
+/**
+ * shutdown all schedulers
+ */
+void lthread_scheduler_shutdown_all(void)
+{
+ uint64_t i;
+
+ /*
+ * give time for all schedulers to have started
+ * Note we use sched_yield() rather than pthread_yield() to allow
+ * for the possibility of a pthread wrapper on lthread_yield(),
+ * something that is not possible unless the scheduler is running.
+ */
+ while (rte_atomic16_read(&active_schedulers) <
+ rte_atomic16_read(&num_schedulers))
+ sched_yield();
+
+ for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
+ if (schedcore[i] != NULL)
+ schedcore[i]->run_flag = 0;
+ }
+}
+
+/*
+ * Resume a suspended lthread
+ */
+static inline void
+_lthread_resume(struct lthread *lt) __attribute__ ((always_inline));
+static inline void _lthread_resume(struct lthread *lt)
+{
+ struct lthread_sched *sched = THIS_SCHED;
+ struct lthread_stack *s;
+ uint64_t state = lt->state;
+#if LTHREAD_DIAG
+ int init = 0;
+#endif
+
+ sched->current_lthread = lt;
+
+ if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
+ /* if detached we can free the thread now */
+ if (state & BIT(ST_LT_DETACH)) {
+ _lthread_free(lt);
+ sched->current_lthread = NULL;
+ return;
+ }
+ }
+
+ if (state & BIT(ST_LT_INIT)) {
+ /* first time this thread has been run */
+ /* assign thread to this scheduler */
+ lt->sched = THIS_SCHED;
+
+ /* allocate stack */
+ s = _stack_alloc();
+
+ lt->stack_container = s;
+ _lthread_set_stack(lt, s->stack, s->stack_size);
+
+ /* allocate memory for TLS used by this thread */
+ _lthread_tls_alloc(lt);
+
+ lt->state = BIT(ST_LT_READY);
+#if LTHREAD_DIAG
+ init = 1;
+#endif
+ }
+
+ DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);
+
+ /* switch to the new thread */
+ ctx_switch(&lt->ctx, &sched->ctx);
+
+ /* If posting to a queue that could be read by another lcore
+ * we defer the queue write till now to ensure the context has been
+ * saved before the other core tries to resume it
+ * This applies to blocking on mutex, cond, and to set_affinity
+ */
+ if (lt->pending_wr_queue != NULL) {
+ struct lthread_queue *dest = lt->pending_wr_queue;
+
+ lt->pending_wr_queue = NULL;
+
+ /* queue the current thread to the specified queue */
+ _lthread_queue_insert_mp(dest, lt);
+ }
+
+ sched->current_lthread = NULL;
+}
+
+/*
+ * Handle sleep timer expiry
+*/
+void
+_sched_timer_cb(struct rte_timer *tim, void *arg)
+{
+ struct lthread *lt = (struct lthread *) arg;
+ uint64_t state = lt->state;
+
+ DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);
+
+ rte_timer_stop(tim);
+
+ if (lt->state & BIT(ST_LT_CANCELLED))
+ (THIS_SCHED)->nb_blocked_threads--;
+
+ lt->state = state | BIT(ST_LT_EXPIRED);
+ _lthread_resume(lt);
+ lt->state = state & CLEARBIT(ST_LT_EXPIRED);
+}
+
+
+
+/*
+ * Returns 0 if there is a pending job in scheduler or 1 if done and can exit.
+ */
+static inline int _lthread_sched_isdone(struct lthread_sched *sched)
+{
+ return (sched->run_flag == 0) &&
+ (_lthread_queue_empty(sched->ready)) &&
+ (_lthread_queue_empty(sched->pready)) &&
+ (sched->nb_blocked_threads == 0);
+}
+
+/*
+ * Wait for all schedulers to start
+ */
+static inline void _lthread_schedulers_sync_start(void)
+{
+ rte_atomic16_inc(&active_schedulers);
+
+ /* wait for lthread schedulers
+ * Note we use sched_yield() rather than pthread_yield() to allow
+ * for the possibility of a pthread wrapper on lthread_yield(),
+ * something that is not possible unless the scheduler is running.
+ */
+ while (rte_atomic16_read(&active_schedulers) <
+ rte_atomic16_read(&num_schedulers))
+ sched_yield();
+
+}
+
+/*
+ * Wait for all schedulers to stop
+ */
+static inline void _lthread_schedulers_sync_stop(void)
+{
+ rte_atomic16_dec(&active_schedulers);
+ rte_atomic16_dec(&num_schedulers);
+
+ /* wait for schedulers
+ * Note we use sched_yield() rather than pthread_yield() to allow
+ * for the possibility of a pthread wrapper on lthread_yield(),
+ * something that is not possible unless the scheduler is running.
+ */
+ while (rte_atomic16_read(&active_schedulers) > 0)
+ sched_yield();
+
+}
+
+
+/*
+ * Run the lthread scheduler
+ * This loop is the heart of the system
+ */
+void lthread_run(void)
+{
+
+ struct lthread_sched *sched = THIS_SCHED;
+ struct lthread *lt = NULL;
+
+ RTE_LOG(INFO, LTHREAD,
+ "starting scheduler %p on lcore %u phys core %u\n",
+ sched, rte_lcore_id(),
+ rte_lcore_index(rte_lcore_id()));
+
+ /* if more than one, wait for all schedulers to start */
+ _lthread_schedulers_sync_start();
+
+
+ /*
+ * This is the main scheduling loop
+ * So long as there are tasks in existence we run this loop.
+ * We check for:-
+ * expired timers,
+ * the local ready queue,
+ * and the peer ready queue,
+ *
+ * and resume lthreads ad infinitum.
+ */
+ while (!_lthread_sched_isdone(sched)) {
+
+ rte_timer_manage();
+
+ lt = _lthread_queue_poll(sched->ready);
+ if (lt != NULL)
+ _lthread_resume(lt);
+ lt = _lthread_queue_poll(sched->pready);
+ if (lt != NULL)
+ _lthread_resume(lt);
+ }
+
+
+ /* if more than one wait for all schedulers to stop */
+ _lthread_schedulers_sync_stop();
+
+ (THIS_SCHED) = NULL;
+
+ RTE_LOG(INFO, LTHREAD,
+ "stopping scheduler %p on lcore %u phys core %u\n",
+ sched, rte_lcore_id(),
+ rte_lcore_index(rte_lcore_id()));
+ fflush(stdout);
+}
+
+/*
+ * Return the scheduler for this lcore
+ *
+ */
+struct lthread_sched *_lthread_sched_get(int lcore_id)
+{
+ if (lcore_id > LTHREAD_MAX_LCORES)
+ return NULL;
+ return schedcore[lcore_id];
+}
+
+/*
+ * migrate the current thread to another scheduler running
+ * on the specified lcore.
+ */
+int lthread_set_affinity(unsigned lcoreid)
+{
+ struct lthread *lt = THIS_LTHREAD;
+ struct lthread_sched *dest_sched;
+
+ if (unlikely(lcoreid > LTHREAD_MAX_LCORES))
+ return POSIX_ERRNO(EINVAL);
+
+
+ DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);
+
+ dest_sched = schedcore[lcoreid];
+
+ if (unlikely(dest_sched == NULL))
+ return POSIX_ERRNO(EINVAL);
+
+ if (likely(dest_sched != THIS_SCHED)) {
+ lt->sched = dest_sched;
+ lt->pending_wr_queue = dest_sched->pready;
+ _affinitize();
+ return 0;
+ }
+ return 0;
+}