/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2012 Hasan Alayli
 */

#define RTE_MEM 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <stdint.h>
#include <stddef.h>
#include <limits.h>
#include <inttypes.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <sched.h>

#include <rte_log.h>
#include <rte_common.h>
#include <rte_branch_prediction.h>
#include <rte_per_lcore.h>
#include <rte_lcore.h>
#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_timer.h>

#include "lthread_api.h"
#include "lthread_int.h"
#include "lthread_sched.h"
#include "lthread_objcache.h"
#include "lthread_timer.h"
#include "lthread_mutex.h"
#include "lthread_cond.h"
#include "lthread_tls.h"
#include "lthread_diag.h"

/*
 * This file implements the lthread scheduler.
 * The scheduler is the function lthread_run().
 * It must be run as the main loop of an EAL thread.
 *
 * Currently, once a scheduler is created it cannot be destroyed.
 * When a scheduler shuts down it is assumed that the application
 * is terminating.
 */

static rte_atomic16_t num_schedulers;
static rte_atomic16_t active_schedulers;

/* one scheduler per lcore */
RTE_DEFINE_PER_LCORE(struct lthread_sched *, this_sched) = NULL;

struct lthread_sched *schedcore[LTHREAD_MAX_LCORES];

diag_callback diag_cb;

uint64_t diag_mask;

/* constructor */
RTE_INIT(lthread_sched_ctor)
{
	memset(schedcore, 0, sizeof(schedcore));
	rte_atomic16_init(&num_schedulers);
	rte_atomic16_set(&num_schedulers, 1);
	rte_atomic16_init(&active_schedulers);
	rte_atomic16_set(&active_schedulers, 0);
	diag_cb = NULL;
}

enum sched_alloc_phase {
	SCHED_ALLOC_OK,
	SCHED_ALLOC_QNODE_POOL,
	SCHED_ALLOC_READY_QUEUE,
	SCHED_ALLOC_PREADY_QUEUE,
	SCHED_ALLOC_LTHREAD_CACHE,
	SCHED_ALLOC_STACK_CACHE,
	SCHED_ALLOC_PERLT_CACHE,
	SCHED_ALLOC_TLS_CACHE,
	SCHED_ALLOC_COND_CACHE,
	SCHED_ALLOC_MUTEX_CACHE,
};
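/*
 * Each scheduler owns a set of per-lcore resources (queues and object
 * caches).  _lthread_sched_alloc_resources() below allocates them one
 * stage at a time, recording the most recently attempted stage in
 * alloc_status; on any failure the switch statement falls through the
 * earlier stages in reverse order, releasing whatever was already
 * allocated before reporting the failing stage to the caller.
 */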
static int _lthread_sched_alloc_resources(struct lthread_sched *new_sched)
{
	int alloc_status;

	do {
		/* Initialize per scheduler queue node pool */
		alloc_status = SCHED_ALLOC_QNODE_POOL;
		new_sched->qnode_pool =
			_qnode_pool_create("qnode pool", LTHREAD_PREALLOC);
		if (new_sched->qnode_pool == NULL)
			break;

		/* Initialize per scheduler local ready queue */
		alloc_status = SCHED_ALLOC_READY_QUEUE;
		new_sched->ready = _lthread_queue_create("ready queue");
		if (new_sched->ready == NULL)
			break;

		/* Initialize per scheduler local peer ready queue */
		alloc_status = SCHED_ALLOC_PREADY_QUEUE;
		new_sched->pready = _lthread_queue_create("pready queue");
		if (new_sched->pready == NULL)
			break;

		/* Initialize per scheduler local free lthread cache */
		alloc_status = SCHED_ALLOC_LTHREAD_CACHE;
		new_sched->lthread_cache =
			_lthread_objcache_create("lthread cache",
						sizeof(struct lthread),
						LTHREAD_PREALLOC);
		if (new_sched->lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free stack cache */
		alloc_status = SCHED_ALLOC_STACK_CACHE;
		new_sched->stack_cache =
			_lthread_objcache_create("stack_cache",
						sizeof(struct lthread_stack),
						LTHREAD_PREALLOC);
		if (new_sched->stack_cache == NULL)
			break;

		/* Initialize per scheduler local free per lthread data cache */
		alloc_status = SCHED_ALLOC_PERLT_CACHE;
		new_sched->per_lthread_cache =
			_lthread_objcache_create("per_lt cache",
						RTE_PER_LTHREAD_SECTION_SIZE,
						LTHREAD_PREALLOC);
		if (new_sched->per_lthread_cache == NULL)
			break;

		/* Initialize per scheduler local free tls cache */
		alloc_status = SCHED_ALLOC_TLS_CACHE;
		new_sched->tls_cache =
			_lthread_objcache_create("TLS cache",
						sizeof(struct lthread_tls),
						LTHREAD_PREALLOC);
		if (new_sched->tls_cache == NULL)
			break;

		/* Initialize per scheduler local free cond var cache */
		alloc_status = SCHED_ALLOC_COND_CACHE;
		new_sched->cond_cache =
			_lthread_objcache_create("cond cache",
						sizeof(struct lthread_cond),
						LTHREAD_PREALLOC);
		if (new_sched->cond_cache == NULL)
			break;

		/* Initialize per scheduler local free mutex cache */
		alloc_status = SCHED_ALLOC_MUTEX_CACHE;
		new_sched->mutex_cache =
			_lthread_objcache_create("mutex cache",
						sizeof(struct lthread_mutex),
						LTHREAD_PREALLOC);
		if (new_sched->mutex_cache == NULL)
			break;

		alloc_status = SCHED_ALLOC_OK;
	} while (0);

	/* roll back on any failure */
	switch (alloc_status) {
	case SCHED_ALLOC_MUTEX_CACHE:
		_lthread_objcache_destroy(new_sched->cond_cache);
		/* fall through */
	case SCHED_ALLOC_COND_CACHE:
		_lthread_objcache_destroy(new_sched->tls_cache);
		/* fall through */
	case SCHED_ALLOC_TLS_CACHE:
		_lthread_objcache_destroy(new_sched->per_lthread_cache);
		/* fall through */
	case SCHED_ALLOC_PERLT_CACHE:
		_lthread_objcache_destroy(new_sched->stack_cache);
		/* fall through */
	case SCHED_ALLOC_STACK_CACHE:
		_lthread_objcache_destroy(new_sched->lthread_cache);
		/* fall through */
	case SCHED_ALLOC_LTHREAD_CACHE:
		_lthread_queue_destroy(new_sched->pready);
		/* fall through */
	case SCHED_ALLOC_PREADY_QUEUE:
		_lthread_queue_destroy(new_sched->ready);
		/* fall through */
	case SCHED_ALLOC_READY_QUEUE:
		_qnode_pool_destroy(new_sched->qnode_pool);
		/* fall through */
	case SCHED_ALLOC_QNODE_POOL:
		/* fall through */
	case SCHED_ALLOC_OK:
		break;
	}
	return alloc_status;
}


/*
 * Create a scheduler on the current lcore
 */
struct lthread_sched *_lthread_sched_create(size_t stack_size)
{
	int status;
	struct lthread_sched *new_sched;
	unsigned lcoreid = rte_lcore_id();

	RTE_ASSERT(stack_size <= LTHREAD_MAX_STACK_SIZE);

	if (stack_size == 0)
		stack_size = LTHREAD_MAX_STACK_SIZE;

	new_sched =
	     rte_calloc_socket(NULL, 1, sizeof(struct lthread_sched),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());
	if (new_sched == NULL) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate memory for scheduler\n");
		return NULL;
	}

	_lthread_key_pool_init();

	new_sched->stack_size = stack_size;
	new_sched->birth = rte_rdtsc();
	THIS_SCHED = new_sched;

	status = _lthread_sched_alloc_resources(new_sched);
	if (status != SCHED_ALLOC_OK) {
		RTE_LOG(CRIT, LTHREAD,
			"Failed to allocate resources for scheduler code = %d\n",
			status);
		rte_free(new_sched);
		return NULL;
	}

	bzero(&new_sched->ctx, sizeof(struct ctx));

	new_sched->lcore_id = lcoreid;

	schedcore[lcoreid] = new_sched;

	new_sched->run_flag = 1;

	DIAG_EVENT(new_sched, LT_DIAG_SCHED_CREATE, rte_lcore_id(), 0);

	rte_wmb();

	return new_sched;
}

/*
 * Set the number of schedulers in the system
 */
int lthread_num_schedulers_set(int num)
{
	rte_atomic16_set(&num_schedulers, num);
	return (int)rte_atomic16_read(&num_schedulers);
}

/*
 * Return the number of schedulers active
 */
int lthread_active_schedulers(void)
{
	return (int)rte_atomic16_read(&active_schedulers);
}
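/*
 * Illustrative sketch, not part of the original sources: a typical start-up
 * sequence configures one scheduler per lcore and enters lthread_run() as
 * the main loop of every EAL thread.  The example_* names are hypothetical;
 * the lthread_create() prototype and the meaning of its lcore argument
 * (-1 taken here to mean "the current lcore") are assumed from
 * lthread_api.h.
 */
#ifdef LTHREAD_SCHED_USAGE_EXAMPLE
#include <rte_launch.h>

/* first lthread spawned on each scheduler */
static void example_initial_lthread(void *arg __rte_unused)
{
	/* application work would be created from here */
	lthread_exit(NULL);
}

/* per-lcore main: create one lthread, then run the scheduler until done */
static int example_lcore_main(void *arg __rte_unused)
{
	struct lthread *lt;

	lthread_create(&lt, -1, example_initial_lthread, NULL);
	lthread_run();
	return 0;
}

/* called from main() after rte_eal_init() */
static void example_launch_schedulers(void)
{
	unsigned int lcore_id;

	/* one scheduler per enabled lcore, including the current one */
	lthread_num_schedulers_set(rte_lcore_count());

	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		if (!rte_lcore_is_enabled(lcore_id) ||
		    lcore_id == rte_lcore_id())
			continue;
		rte_eal_remote_launch(example_lcore_main, NULL, lcore_id);
	}

	example_lcore_main(NULL);
}
#endif /* LTHREAD_SCHED_USAGE_EXAMPLE */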
/**
 * shutdown the scheduler running on the specified lcore
 */
void lthread_scheduler_shutdown(unsigned lcoreid)
{
	uint64_t coreid = (uint64_t) lcoreid;

	if (coreid < LTHREAD_MAX_LCORES) {
		if (schedcore[coreid] != NULL)
			schedcore[coreid]->run_flag = 0;
	}
}

/**
 * shutdown all schedulers
 */
void lthread_scheduler_shutdown_all(void)
{
	uint64_t i;

	/*
	 * give time for all schedulers to have started
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();

	for (i = 0; i < LTHREAD_MAX_LCORES; i++) {
		if (schedcore[i] != NULL)
			schedcore[i]->run_flag = 0;
	}
}

/*
 * Resume a suspended lthread
 */
static __rte_always_inline void
_lthread_resume(struct lthread *lt);
static inline void _lthread_resume(struct lthread *lt)
{
	struct lthread_sched *sched = THIS_SCHED;
	struct lthread_stack *s;
	uint64_t state = lt->state;
#if LTHREAD_DIAG
	int init = 0;
#endif

	sched->current_lthread = lt;

	if (state & (BIT(ST_LT_CANCELLED) | BIT(ST_LT_EXITED))) {
		/* if detached we can free the thread now */
		if (state & BIT(ST_LT_DETACH)) {
			_lthread_free(lt);
			sched->current_lthread = NULL;
			return;
		}
	}

	if (state & BIT(ST_LT_INIT)) {
		/* first time this thread has been run */
		/* assign thread to this scheduler */
		lt->sched = THIS_SCHED;

		/* allocate stack */
		s = _stack_alloc();

		lt->stack_container = s;
		_lthread_set_stack(lt, s->stack, s->stack_size);

		/* allocate memory for TLS used by this thread */
		_lthread_tls_alloc(lt);

		lt->state = BIT(ST_LT_READY);
#if LTHREAD_DIAG
		init = 1;
#endif
	}

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_RESUMED, init, lt);

	/* switch to the new thread */
	ctx_switch(&lt->ctx, &sched->ctx);

	/* If posting to a queue that could be read by another lcore
	 * we defer the queue write till now to ensure the context has been
	 * saved before the other core tries to resume it
	 * This applies to blocking on mutex, cond, and to set_affinity
	 */
	if (lt->pending_wr_queue != NULL) {
		struct lthread_queue *dest = lt->pending_wr_queue;

		lt->pending_wr_queue = NULL;

		/* queue the current thread to the specified queue */
		_lthread_queue_insert_mp(dest, lt);
	}

	sched->current_lthread = NULL;
}

/*
 * Handle sleep timer expiry
 */
void
_sched_timer_cb(struct rte_timer *tim, void *arg)
{
	struct lthread *lt = (struct lthread *) arg;
	uint64_t state = lt->state;

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_TMR_EXPIRED, &lt->tim, 0);

	rte_timer_stop(tim);

	if (lt->state & BIT(ST_LT_CANCELLED))
		(THIS_SCHED)->nb_blocked_threads--;

	lt->state = state | BIT(ST_LT_EXPIRED);
	_lthread_resume(lt);
	lt->state = state & CLEARBIT(ST_LT_EXPIRED);
}

/*
 * Returns 0 if there is a pending job in scheduler or 1 if done and can exit.
 */
static inline int _lthread_sched_isdone(struct lthread_sched *sched)
{
	return (sched->run_flag == 0) &&
			(_lthread_queue_empty(sched->ready)) &&
			(_lthread_queue_empty(sched->pready)) &&
			(sched->nb_blocked_threads == 0);
}
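/*
 * Illustrative sketch, not part of the original sources: shutdown is
 * cooperative.  lthread_scheduler_shutdown_all() only clears run_flag on
 * each scheduler; lthread_run() returns once _lthread_sched_isdone() is
 * satisfied, i.e. both ready queues are drained and no lthreads remain
 * blocked.  A control lthread might request shutdown as below (the
 * example_* name is hypothetical; lthread_exit() is assumed from
 * lthread_api.h).
 */
#ifdef LTHREAD_SCHED_SHUTDOWN_EXAMPLE
static void example_shutdown_lthread(void *arg __rte_unused)
{
	/* ... wait for the application's work to complete ... */

	/* ask every scheduler to exit once it has no more work to do */
	lthread_scheduler_shutdown_all();
	lthread_exit(NULL);
}
#endif /* LTHREAD_SCHED_SHUTDOWN_EXAMPLE */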
/*
 * Wait for all schedulers to start
 */
static inline void _lthread_schedulers_sync_start(void)
{
	rte_atomic16_inc(&active_schedulers);

	/* wait for lthread schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) <
	       rte_atomic16_read(&num_schedulers))
		sched_yield();
}

/*
 * Wait for all schedulers to stop
 */
static inline void _lthread_schedulers_sync_stop(void)
{
	rte_atomic16_dec(&active_schedulers);
	rte_atomic16_dec(&num_schedulers);

	/* wait for schedulers
	 * Note we use sched_yield() rather than pthread_yield() to allow
	 * for the possibility of a pthread wrapper on lthread_yield(),
	 * something that is not possible unless the scheduler is running.
	 */
	while (rte_atomic16_read(&active_schedulers) > 0)
		sched_yield();
}

/*
 * Run the lthread scheduler
 * This loop is the heart of the system
 */
void lthread_run(void)
{
	struct lthread_sched *sched = THIS_SCHED;
	struct lthread *lt = NULL;

	RTE_LOG(INFO, LTHREAD,
		"starting scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));

	/* if more than one, wait for all schedulers to start */
	_lthread_schedulers_sync_start();

	/*
	 * This is the main scheduling loop
	 * So long as there are tasks in existence we run this loop.
	 * We check for:-
	 * expired timers,
	 * the local ready queue,
	 * and the peer ready queue,
	 *
	 * and resume lthreads ad infinitum.
	 */
	while (!_lthread_sched_isdone(sched)) {

		rte_timer_manage();

		lt = _lthread_queue_poll(sched->ready);
		if (lt != NULL)
			_lthread_resume(lt);

		lt = _lthread_queue_poll(sched->pready);
		if (lt != NULL)
			_lthread_resume(lt);
	}

	/* if more than one wait for all schedulers to stop */
	_lthread_schedulers_sync_stop();

	(THIS_SCHED) = NULL;

	RTE_LOG(INFO, LTHREAD,
		"stopping scheduler %p on lcore %u phys core %u\n",
		sched, rte_lcore_id(),
		rte_lcore_index(rte_lcore_id()));
	fflush(stdout);
}

/*
 * Return the scheduler for this lcore
 */
struct lthread_sched *_lthread_sched_get(unsigned int lcore_id)
{
	struct lthread_sched *res = NULL;

	if (lcore_id < LTHREAD_MAX_LCORES)
		res = schedcore[lcore_id];

	return res;
}

/*
 * migrate the current thread to another scheduler running
 * on the specified lcore.
 */
int lthread_set_affinity(unsigned lcoreid)
{
	struct lthread *lt = THIS_LTHREAD;
	struct lthread_sched *dest_sched;

	if (unlikely(lcoreid >= LTHREAD_MAX_LCORES))
		return POSIX_ERRNO(EINVAL);

	DIAG_EVENT(lt, LT_DIAG_LTHREAD_AFFINITY, lcoreid, 0);

	dest_sched = schedcore[lcoreid];

	if (unlikely(dest_sched == NULL))
		return POSIX_ERRNO(EINVAL);

	if (likely(dest_sched != THIS_SCHED)) {
		lt->sched = dest_sched;
		lt->pending_wr_queue = dest_sched->pready;
		_affinitize();
		return 0;
	}
	return 0;
}
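/*
 * Illustrative sketch, not part of the original sources: an lthread
 * migrating itself to another lcore with lthread_set_affinity().  The
 * destination lcore must already be running a scheduler, otherwise EINVAL
 * is returned.  The example_* name is hypothetical; lthread_exit() is
 * assumed from lthread_api.h.
 */
#ifdef LTHREAD_SCHED_AFFINITY_EXAMPLE
static void example_migrating_lthread(void *arg)
{
	unsigned dest_lcore = *(unsigned *)arg;

	/* on success this lthread is requeued on dest_lcore's scheduler */
	if (lthread_set_affinity(dest_lcore) != 0) {
		/* lcore id out of range, or no scheduler running there */
		lthread_exit(NULL);
	}

	/* from this point the lthread is executing on dest_lcore */
	lthread_exit(NULL);
}
#endif /* LTHREAD_SCHED_AFFINITY_EXAMPLE */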