diff options
Diffstat (limited to 'drivers/mempool/bucket/rte_mempool_bucket.c')
-rw-r--r-- | drivers/mempool/bucket/rte_mempool_bucket.c | 628 |
1 files changed, 628 insertions, 0 deletions
diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c new file mode 100644 index 00000000..78d2b9d0 --- /dev/null +++ b/drivers/mempool/bucket/rte_mempool_bucket.c @@ -0,0 +1,628 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2017-2018 Solarflare Communications Inc. + * All rights reserved. + * + * This software was jointly developed between OKTET Labs (under contract + * for Solarflare) and Solarflare Communications, Inc. + */ + +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include <rte_errno.h> +#include <rte_ring.h> +#include <rte_mempool.h> +#include <rte_malloc.h> + +/* + * The general idea of the bucket mempool driver is as follows. + * We keep track of physically contiguous groups (buckets) of objects + * of a certain size. Every such a group has a counter that is + * incremented every time an object from that group is enqueued. + * Until the bucket is full, no objects from it are eligible for allocation. + * If a request is made to dequeue a multiply of bucket size, it is + * satisfied by returning the whole buckets, instead of separate objects. + */ + + +struct bucket_header { + unsigned int lcore_id; + uint8_t fill_cnt; +}; + +struct bucket_stack { + unsigned int top; + unsigned int limit; + void *objects[]; +}; + +struct bucket_data { + unsigned int header_size; + unsigned int total_elt_size; + unsigned int obj_per_bucket; + unsigned int bucket_stack_thresh; + uintptr_t bucket_page_mask; + struct rte_ring *shared_bucket_ring; + struct bucket_stack *buckets[RTE_MAX_LCORE]; + /* + * Multi-producer single-consumer ring to hold objects that are + * returned to the mempool at a different lcore than initially + * dequeued + */ + struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE]; + struct rte_ring *shared_orphan_ring; + struct rte_mempool *pool; + unsigned int bucket_mem_size; +}; + +static struct bucket_stack * +bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts) +{ + struct bucket_stack *stack; + + stack = rte_zmalloc_socket("bucket_stack", + sizeof(struct bucket_stack) + + n_elts * sizeof(void *), + RTE_CACHE_LINE_SIZE, + mp->socket_id); + if (stack == NULL) + return NULL; + stack->limit = n_elts; + stack->top = 0; + + return stack; +} + +static void +bucket_stack_push(struct bucket_stack *stack, void *obj) +{ + RTE_ASSERT(stack->top < stack->limit); + stack->objects[stack->top++] = obj; +} + +static void * +bucket_stack_pop_unsafe(struct bucket_stack *stack) +{ + RTE_ASSERT(stack->top > 0); + return stack->objects[--stack->top]; +} + +static void * +bucket_stack_pop(struct bucket_stack *stack) +{ + if (stack->top == 0) + return NULL; + return bucket_stack_pop_unsafe(stack); +} + +static int +bucket_enqueue_single(struct bucket_data *bd, void *obj) +{ + int rc = 0; + uintptr_t addr = (uintptr_t)obj; + struct bucket_header *hdr; + unsigned int lcore_id = rte_lcore_id(); + + addr &= bd->bucket_page_mask; + hdr = (struct bucket_header *)addr; + + if (likely(hdr->lcore_id == lcore_id)) { + if (hdr->fill_cnt < bd->obj_per_bucket - 1) { + hdr->fill_cnt++; + } else { + hdr->fill_cnt = 0; + /* Stack is big enough to put all buckets */ + bucket_stack_push(bd->buckets[lcore_id], hdr); + } + } else if (hdr->lcore_id != LCORE_ID_ANY) { + struct rte_ring *adopt_ring = + bd->adoption_buffer_rings[hdr->lcore_id]; + + rc = rte_ring_enqueue(adopt_ring, obj); + /* Ring is big enough to put all objects */ + RTE_ASSERT(rc == 0); + } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) { + hdr->fill_cnt++; + } else { + hdr->fill_cnt = 0; + rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr); + /* Ring is big enough to put all buckets */ + RTE_ASSERT(rc == 0); + } + + return rc; +} + +static int +bucket_enqueue(struct rte_mempool *mp, void * const *obj_table, + unsigned int n) +{ + struct bucket_data *bd = mp->pool_data; + struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()]; + unsigned int i; + int rc = 0; + + for (i = 0; i < n; i++) { + rc = bucket_enqueue_single(bd, obj_table[i]); + RTE_ASSERT(rc == 0); + } + if (local_stack->top > bd->bucket_stack_thresh) { + rte_ring_enqueue_bulk(bd->shared_bucket_ring, + &local_stack->objects + [bd->bucket_stack_thresh], + local_stack->top - + bd->bucket_stack_thresh, + NULL); + local_stack->top = bd->bucket_stack_thresh; + } + return rc; +} + +static void ** +bucket_fill_obj_table(const struct bucket_data *bd, void **pstart, + void **obj_table, unsigned int n) +{ + unsigned int i; + uint8_t *objptr = *pstart; + + for (objptr += bd->header_size, i = 0; i < n; + i++, objptr += bd->total_elt_size) + *obj_table++ = objptr; + *pstart = objptr; + return obj_table; +} + +static int +bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table, + unsigned int n_orphans) +{ + unsigned int i; + int rc; + uint8_t *objptr; + + rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table, + n_orphans, NULL); + if (unlikely(rc != (int)n_orphans)) { + struct bucket_header *hdr; + + objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]); + hdr = (struct bucket_header *)objptr; + + if (objptr == NULL) { + rc = rte_ring_dequeue(bd->shared_bucket_ring, + (void **)&objptr); + if (rc != 0) { + rte_errno = ENOBUFS; + return -rte_errno; + } + hdr = (struct bucket_header *)objptr; + hdr->lcore_id = rte_lcore_id(); + } + hdr->fill_cnt = 0; + bucket_fill_obj_table(bd, (void **)&objptr, obj_table, + n_orphans); + for (i = n_orphans; i < bd->obj_per_bucket; i++, + objptr += bd->total_elt_size) { + rc = rte_ring_enqueue(bd->shared_orphan_ring, + objptr); + if (rc != 0) { + RTE_ASSERT(0); + rte_errno = -rc; + return rc; + } + } + } + + return 0; +} + +static int +bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table, + unsigned int n_buckets) +{ + struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()]; + unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top); + void **obj_table_base = obj_table; + + n_buckets -= n_buckets_from_stack; + while (n_buckets_from_stack-- > 0) { + void *obj = bucket_stack_pop_unsafe(cur_stack); + + obj_table = bucket_fill_obj_table(bd, &obj, obj_table, + bd->obj_per_bucket); + } + while (n_buckets-- > 0) { + struct bucket_header *hdr; + + if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring, + (void **)&hdr) != 0)) { + /* + * Return the already-dequeued buffers + * back to the mempool + */ + bucket_enqueue(bd->pool, obj_table_base, + obj_table - obj_table_base); + rte_errno = ENOBUFS; + return -rte_errno; + } + hdr->lcore_id = rte_lcore_id(); + obj_table = bucket_fill_obj_table(bd, (void **)&hdr, + obj_table, + bd->obj_per_bucket); + } + + return 0; +} + +static int +bucket_adopt_orphans(struct bucket_data *bd) +{ + int rc = 0; + struct rte_ring *adopt_ring = + bd->adoption_buffer_rings[rte_lcore_id()]; + + if (unlikely(!rte_ring_empty(adopt_ring))) { + void *orphan; + + while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) { + rc = bucket_enqueue_single(bd, orphan); + RTE_ASSERT(rc == 0); + } + } + return rc; +} + +static int +bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n) +{ + struct bucket_data *bd = mp->pool_data; + unsigned int n_buckets = n / bd->obj_per_bucket; + unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket; + int rc = 0; + + bucket_adopt_orphans(bd); + + if (unlikely(n_orphans > 0)) { + rc = bucket_dequeue_orphans(bd, obj_table + + (n_buckets * bd->obj_per_bucket), + n_orphans); + if (rc != 0) + return rc; + } + + if (likely(n_buckets > 0)) { + rc = bucket_dequeue_buckets(bd, obj_table, n_buckets); + if (unlikely(rc != 0) && n_orphans > 0) { + rte_ring_enqueue_bulk(bd->shared_orphan_ring, + obj_table + (n_buckets * + bd->obj_per_bucket), + n_orphans, NULL); + } + } + + return rc; +} + +static int +bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table, + unsigned int n) +{ + struct bucket_data *bd = mp->pool_data; + const uint32_t header_size = bd->header_size; + struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()]; + unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top); + struct bucket_header *hdr; + void **first_objp = first_obj_table; + + bucket_adopt_orphans(bd); + + n -= n_buckets_from_stack; + while (n_buckets_from_stack-- > 0) { + hdr = bucket_stack_pop_unsafe(cur_stack); + *first_objp++ = (uint8_t *)hdr + header_size; + } + if (n > 0) { + if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring, + first_objp, n, NULL) != n)) { + /* Return the already dequeued buckets */ + while (first_objp-- != first_obj_table) { + bucket_stack_push(cur_stack, + (uint8_t *)*first_objp - + header_size); + } + rte_errno = ENOBUFS; + return -rte_errno; + } + while (n-- > 0) { + hdr = (struct bucket_header *)*first_objp; + hdr->lcore_id = rte_lcore_id(); + *first_objp++ = (uint8_t *)hdr + header_size; + } + } + + return 0; +} + +static void +count_underfilled_buckets(struct rte_mempool *mp, + void *opaque, + struct rte_mempool_memhdr *memhdr, + __rte_unused unsigned int mem_idx) +{ + unsigned int *pcount = opaque; + const struct bucket_data *bd = mp->pool_data; + unsigned int bucket_page_sz = + (unsigned int)(~bd->bucket_page_mask + 1); + uintptr_t align; + uint8_t *iter; + + align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) - + (uintptr_t)memhdr->addr; + + for (iter = (uint8_t *)memhdr->addr + align; + iter < (uint8_t *)memhdr->addr + memhdr->len; + iter += bucket_page_sz) { + struct bucket_header *hdr = (struct bucket_header *)iter; + + *pcount += hdr->fill_cnt; + } +} + +static unsigned int +bucket_get_count(const struct rte_mempool *mp) +{ + const struct bucket_data *bd = mp->pool_data; + unsigned int count = + bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) + + rte_ring_count(bd->shared_orphan_ring); + unsigned int i; + + for (i = 0; i < RTE_MAX_LCORE; i++) { + if (!rte_lcore_is_enabled(i)) + continue; + count += bd->obj_per_bucket * bd->buckets[i]->top + + rte_ring_count(bd->adoption_buffer_rings[i]); + } + + rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp, + count_underfilled_buckets, &count); + + return count; +} + +static int +bucket_alloc(struct rte_mempool *mp) +{ + int rg_flags = 0; + int rc = 0; + char rg_name[RTE_RING_NAMESIZE]; + struct bucket_data *bd; + unsigned int i; + unsigned int bucket_header_size; + + bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd), + RTE_CACHE_LINE_SIZE, mp->socket_id); + if (bd == NULL) { + rc = -ENOMEM; + goto no_mem_for_data; + } + bd->pool = mp; + if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN) + bucket_header_size = sizeof(struct bucket_header); + else + bucket_header_size = RTE_CACHE_LINE_SIZE; + RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE); + bd->header_size = mp->header_size + bucket_header_size; + bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size; + bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024; + bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) / + bd->total_elt_size; + bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1); + /* eventually this should be a tunable parameter */ + bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3; + + if (mp->flags & MEMPOOL_F_SP_PUT) + rg_flags |= RING_F_SP_ENQ; + if (mp->flags & MEMPOOL_F_SC_GET) + rg_flags |= RING_F_SC_DEQ; + + for (i = 0; i < RTE_MAX_LCORE; i++) { + if (!rte_lcore_is_enabled(i)) + continue; + bd->buckets[i] = + bucket_stack_create(mp, mp->size / bd->obj_per_bucket); + if (bd->buckets[i] == NULL) { + rc = -ENOMEM; + goto no_mem_for_stacks; + } + rc = snprintf(rg_name, sizeof(rg_name), + RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i); + if (rc < 0 || rc >= (int)sizeof(rg_name)) { + rc = -ENAMETOOLONG; + goto no_mem_for_stacks; + } + bd->adoption_buffer_rings[i] = + rte_ring_create(rg_name, rte_align32pow2(mp->size + 1), + mp->socket_id, + rg_flags | RING_F_SC_DEQ); + if (bd->adoption_buffer_rings[i] == NULL) { + rc = -rte_errno; + goto no_mem_for_stacks; + } + } + + rc = snprintf(rg_name, sizeof(rg_name), + RTE_MEMPOOL_MZ_FORMAT ".0", mp->name); + if (rc < 0 || rc >= (int)sizeof(rg_name)) { + rc = -ENAMETOOLONG; + goto invalid_shared_orphan_ring; + } + bd->shared_orphan_ring = + rte_ring_create(rg_name, rte_align32pow2(mp->size + 1), + mp->socket_id, rg_flags); + if (bd->shared_orphan_ring == NULL) { + rc = -rte_errno; + goto cannot_create_shared_orphan_ring; + } + + rc = snprintf(rg_name, sizeof(rg_name), + RTE_MEMPOOL_MZ_FORMAT ".1", mp->name); + if (rc < 0 || rc >= (int)sizeof(rg_name)) { + rc = -ENAMETOOLONG; + goto invalid_shared_bucket_ring; + } + bd->shared_bucket_ring = + rte_ring_create(rg_name, + rte_align32pow2((mp->size + 1) / + bd->obj_per_bucket), + mp->socket_id, rg_flags); + if (bd->shared_bucket_ring == NULL) { + rc = -rte_errno; + goto cannot_create_shared_bucket_ring; + } + + mp->pool_data = bd; + + return 0; + +cannot_create_shared_bucket_ring: +invalid_shared_bucket_ring: + rte_ring_free(bd->shared_orphan_ring); +cannot_create_shared_orphan_ring: +invalid_shared_orphan_ring: +no_mem_for_stacks: + for (i = 0; i < RTE_MAX_LCORE; i++) { + rte_free(bd->buckets[i]); + rte_ring_free(bd->adoption_buffer_rings[i]); + } + rte_free(bd); +no_mem_for_data: + rte_errno = -rc; + return rc; +} + +static void +bucket_free(struct rte_mempool *mp) +{ + unsigned int i; + struct bucket_data *bd = mp->pool_data; + + if (bd == NULL) + return; + + for (i = 0; i < RTE_MAX_LCORE; i++) { + rte_free(bd->buckets[i]); + rte_ring_free(bd->adoption_buffer_rings[i]); + } + + rte_ring_free(bd->shared_orphan_ring); + rte_ring_free(bd->shared_bucket_ring); + + rte_free(bd); +} + +static ssize_t +bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num, + __rte_unused uint32_t pg_shift, size_t *min_total_elt_size, + size_t *align) +{ + struct bucket_data *bd = mp->pool_data; + unsigned int bucket_page_sz; + + if (bd == NULL) + return -EINVAL; + + bucket_page_sz = rte_align32pow2(bd->bucket_mem_size); + *align = bucket_page_sz; + *min_total_elt_size = bucket_page_sz; + /* + * Each bucket occupies its own block aligned to + * bucket_page_sz, so the required amount of memory is + * a multiple of bucket_page_sz. + * We also need extra space for a bucket header + */ + return ((obj_num + bd->obj_per_bucket - 1) / + bd->obj_per_bucket) * bucket_page_sz; +} + +static int +bucket_populate(struct rte_mempool *mp, unsigned int max_objs, + void *vaddr, rte_iova_t iova, size_t len, + rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg) +{ + struct bucket_data *bd = mp->pool_data; + unsigned int bucket_page_sz; + unsigned int bucket_header_sz; + unsigned int n_objs; + uintptr_t align; + uint8_t *iter; + int rc; + + if (bd == NULL) + return -EINVAL; + + bucket_page_sz = rte_align32pow2(bd->bucket_mem_size); + align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) - + (uintptr_t)vaddr; + + bucket_header_sz = bd->header_size - mp->header_size; + if (iova != RTE_BAD_IOVA) + iova += align + bucket_header_sz; + + for (iter = (uint8_t *)vaddr + align, n_objs = 0; + iter < (uint8_t *)vaddr + len && n_objs < max_objs; + iter += bucket_page_sz) { + struct bucket_header *hdr = (struct bucket_header *)iter; + unsigned int chunk_len = bd->bucket_mem_size; + + if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len) + chunk_len = len - (iter - (uint8_t *)vaddr); + if (chunk_len <= bucket_header_sz) + break; + chunk_len -= bucket_header_sz; + + hdr->fill_cnt = 0; + hdr->lcore_id = LCORE_ID_ANY; + rc = rte_mempool_op_populate_default(mp, + RTE_MIN(bd->obj_per_bucket, + max_objs - n_objs), + iter + bucket_header_sz, + iova, chunk_len, + obj_cb, obj_cb_arg); + if (rc < 0) + return rc; + n_objs += rc; + if (iova != RTE_BAD_IOVA) + iova += bucket_page_sz; + } + + return n_objs; +} + +static int +bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info) +{ + struct bucket_data *bd = mp->pool_data; + + info->contig_block_size = bd->obj_per_bucket; + return 0; +} + + +static const struct rte_mempool_ops ops_bucket = { + .name = "bucket", + .alloc = bucket_alloc, + .free = bucket_free, + .enqueue = bucket_enqueue, + .dequeue = bucket_dequeue, + .get_count = bucket_get_count, + .calc_mem_size = bucket_calc_mem_size, + .populate = bucket_populate, + .get_info = bucket_get_info, + .dequeue_contig_blocks = bucket_dequeue_contig_blocks, +}; + + +MEMPOOL_REGISTER_OPS(ops_bucket); |