diff options
Diffstat (limited to 'src/framework/ring')
-rw-r--r-- | src/framework/ring/dmm_ring.c | 94 | ||||
-rw-r--r-- | src/framework/ring/dmm_ring.h | 205 | ||||
-rw-r--r-- | src/framework/ring/dmm_ring_base.h | 189 |
3 files changed, 488 insertions, 0 deletions
diff --git a/src/framework/ring/dmm_ring.c b/src/framework/ring/dmm_ring.c new file mode 100644 index 0000000..2b1b1ee --- /dev/null +++ b/src/framework/ring/dmm_ring.c @@ -0,0 +1,94 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include <stdio.h> +#include <string.h> + +#include "nstack_log.h" + +#include "dmm_ring.h" + +void +dmm_ring_dump (struct dmm_ring *ring, int list) +{ + if (!ring) + return; + + (void) printf ("ring:%p size:%d flag:0x%x prod:%d-%d/%d cons:%d-%d/%d", + ring, ring->size, ring->flag, + ring->prod_head, ring->prod_tail, ring->is_sp, + ring->cons_head, ring->cons_tail, ring->is_sc); + if (list && dmm_ring_count (ring)) + { + int count = 0; + void **p = (void **) (ring + 1); + int i = ring->cons_head; + while (i != ring->prod_tail) + { + if ((count++ & 3) == 0) + (void) printf ("\n"); + (void) printf (" %d:%p", i, p[i]); + if (++i >= ring->size) + i = 0; + } + } + (void) printf ("\n----------------\n"); +} + +int +dmm_ring_init (struct dmm_ring *ring, int num, int flag) +{ + if (num > DMM_RING_MAX_NUM) + return -1; + + (void) memset (ring, 0, sizeof (struct dmm_ring)); + + ring->size = num + 1; + + ring->prod_head = 0; + ring->prod_tail = 0; + ring->is_sp = flag & DMM_RING_INIT_SP; + + ring->cons_head = 0; + ring->cons_tail = 0; + ring->is_sc = flag & DMM_RING_INIT_SC; + + return 0; +} + +int +dmm_pool_init (struct dmm_ring *pool, size_t elt_size, int num, uint32_t flag) +{ + int ret; + void *array; + const size_t ring_size = dmm_ring_bufsize (num); + + if (0 != dmm_ring_init (pool, num, flag)) + { + NSFW_LOGERR ("init pool's ring failed, num:%d flag:0x%x ring_size:%lu", + num, flag, ring_size); + return -1; + } + + array = (char *) pool + ring_size; + ret = dmm_array_enqueue (pool, array, num, elt_size); + if (ret != num) + { + NSFW_LOGERR ("enqueue failed, num:%d elt_size:%lu", num, elt_size); + return -1; + } + + return 0; +} diff --git a/src/framework/ring/dmm_ring.h b/src/framework/ring/dmm_ring.h new file mode 100644 index 0000000..914eff2 --- /dev/null +++ b/src/framework/ring/dmm_ring.h @@ -0,0 +1,205 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef _DMM_RING_H_ +#define _DMM_RING_H_ + +#include <stdint.h> + +#include "dmm_atomic.h" +#include "dmm_pause.h" +#include "dmm_barrier.h" +#include "dmm_common.h" + +#define DMM_RING_MAX_NUM 0x7FFFFFFE /* 2,147,483,646 */ + +#define DMM_RING_INIT_SP 0x0001 +#define DMM_RING_INIT_SC 0x0002 +#define DMM_RING_INIT_MPMC 0 +#define DMM_RING_INIT_SPSC (DMM_RING_INIT_SP | DMM_RING_INIT_SC) + +struct dmm_ring +{ + int size; + uint32_t flag; + + volatile int prod_head; + volatile int prod_tail; + int is_sp; + int _prod_pad; + + volatile int cons_head; + volatile int cons_tail; + int is_sc; + int _cons_pad; +} _dmm_cache_aligned; + +#include "dmm_ring_base.h" + +void dmm_ring_dump (struct dmm_ring *ring, int list); +int dmm_ring_init (struct dmm_ring *ring, int num, int flag); +int dmm_pool_init (struct dmm_ring *pool, size_t elt_size, + int num, uint32_t flag); + +inline static size_t +dmm_ring_bufsize (int num) +{ + size_t size = sizeof (struct dmm_ring); + + size += (sizeof (void *) * (num + 1)); + + return dmm_align (size, DMM_CACHE_LINE_SIZE); +} + +inline static size_t +dmm_pool_arraysize (int num, int elt_size) +{ + const size_t size = elt_size * num; + return dmm_align (size, DMM_CACHE_LINE_SIZE); +} + +inline static size_t +dmm_pool_bufsize (int num, int elt_size) +{ + return dmm_ring_bufsize (num) + dmm_pool_arraysize (num, elt_size); +} + +inline static int +dmm_ring_count (struct dmm_ring *ring) +{ + const int count = ring->prod_tail - ring->cons_head; + + if (count >= 0) + return count; + return count + ring->size; +} + +inline static int +dmm_ring_free_count (struct dmm_ring *ring) +{ + const int count = ring->cons_tail - ring->prod_head; + + if (count >= 0) + return count; + return count + ring->size; +} + +/* enqueue several objects in a ring. + ring: point to the ring + p: object pointer list array + num: number of object + fixed: enqueue fixed number +single_cons: is single producer + <return>: number of enqueued object, 0: queue is full +*/ +inline static int +dmm_ring_enqueue (struct dmm_ring *ring, void **p, int num, + int fixed, int is_sp) +{ + int n, pos, new_pos; + + n = dmm_enqueue_prep (ring, num, &pos, fixed, is_sp); + + if (n == 0) + return 0; + + new_pos = dmm_enqueue_copy (ring, pos, p, n); + + dmm_enqueue_done (ring, pos, new_pos, is_sp); + + return n; +} + +inline static int +dmm_fix_enqueue (struct dmm_ring *ring, void **p, int num) +{ + return dmm_ring_enqueue (ring, p, num, 1, ring->is_sp); +} + +inline static int +dmm_var_enqueue (struct dmm_ring *ring, void **p, int num) +{ + return dmm_ring_enqueue (ring, p, num, 0, ring->is_sp); +} + +inline static int +dmm_enqueue (struct dmm_ring *ring, void *p) +{ + return dmm_var_enqueue (ring, &p, 1); +} + +inline static int +dmm_array_enqueue (struct dmm_ring *ring, void *array, + int num, size_t elt_size) +{ + int n, pos, new_pos; + + n = dmm_enqueue_prep (ring, num, &pos, 0, ring->is_sp); + + if (n == 0) + return 0; + + new_pos = dmm_enqueue_copy_array (ring, pos, array, n, elt_size); + + dmm_enqueue_done (ring, pos, new_pos, ring->is_sp); + + return n; +} + +/* dequeue several objects from a ring. + ring: point to the ring + p: save object array + num: number of p + fixed: dequeue fixed number +single_cons: is single consumer + <return>: number of dequeued object, 0: queue is empty +*/ +inline static int +dmm_ring_dequeue (struct dmm_ring *ring, void **p, int num, + int fixed, int single_cons) +{ + int n, pos, new_pos; + + n = dmm_dequeue_prep (ring, num, &pos, fixed, single_cons); + + if (n == 0) + return 0; + + new_pos = dmm_dequeue_copy (ring, pos, p, n); + + dmm_dequeue_done (ring, pos, new_pos, single_cons); + + return n; +} + +inline static int +dmm_fix_dequeue (struct dmm_ring *ring, void **p, int num) +{ + return dmm_ring_dequeue (ring, p, num, 1, ring->is_sc); +} + +inline static int +dmm_var_dequeue (struct dmm_ring *ring, void **p, int num) +{ + return dmm_ring_dequeue (ring, p, num, 0, ring->is_sc); +} + +inline static int +dmm_dequeue (struct dmm_ring *ring, void **p) +{ + return dmm_var_dequeue (ring, p, 1); +} + +#endif diff --git a/src/framework/ring/dmm_ring_base.h b/src/framework/ring/dmm_ring_base.h new file mode 100644 index 0000000..9a48eea --- /dev/null +++ b/src/framework/ring/dmm_ring_base.h @@ -0,0 +1,189 @@ +/* +* +* Copyright (c) 2018 Huawei Technologies Co.,Ltd. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef _DMM_RING_BASE_H_ +#define _DMM_RING_BASE_H_ + +#ifndef _DMM_RING_H_ +#error include dmm_ring.h please +#endif + +inline static int +__move_head (volatile int *head, int ring_size, + int old_head, int real_num, int is_single) +{ + int new_head = old_head + real_num; + + if (new_head >= ring_size) + { + new_head -= ring_size; + } + + if (is_single) + { + *head = new_head; + return 1; + } + + return dmm_atomic_swap ((dmm_atomic_t *) head, old_head, new_head); +} + +inline static int +dmm_enqueue_prep (struct dmm_ring *ring, const int num, + int *pos, const int fixed, int is_sp) +{ + int succ, real, head; + const int ring_size = ring->size; + + do + { + head = ring->prod_head; + + dmm_barrier (); + + if ((real = ring->cons_tail - head - 1) < 0) + real += ring_size; + + if (real >= num) + real = num; + else if (fixed) + return 0; + + if (real <= 0) + return 0; + + succ = __move_head (&ring->prod_head, ring_size, head, real, is_sp); + } + while (!succ); + + *pos = head; + return real; +} + +inline static int +dmm_enqueue_copy (struct dmm_ring *ring, int pos, void **from, int num) +{ + const int ring_size = ring->size; + void **box = (void **) (ring + 1); + + while (num > 0) + { + box[pos++] = *from++; + if (pos >= ring_size) + pos = 0; + --num; + } + + return pos; +} + +inline static void +dmm_enqueue_done (struct dmm_ring *ring, int pos, int new_pos, int is_sp) +{ + dmm_barrier (); + + if (!is_sp) + { + while (ring->prod_tail != pos) + dmm_pause (); + } + + ring->prod_tail = new_pos; +} + +inline static int +dmm_enqueue_copy_array (struct dmm_ring *ring, int pos, + void *array, int num, size_t elt_size) +{ + const int ring_size = ring->size; + void **box = (void **) (ring + 1); + char *from = (char *) array; + + while (num > 0) + { + box[pos++] = from; + if (pos >= ring_size) + pos = 0; + from += elt_size; + --num; + } + + return pos; +} + +inline static int +dmm_dequeue_prep (struct dmm_ring *ring, const int num, int *pos, + const int fixed, int is_sc) +{ + int succ, real, head; + const int ring_size = ring->size; + + do + { + head = ring->cons_head; + + dmm_barrier (); + + if ((real = ring->prod_tail - head) < 0) + real += ring_size; + + if (real >= num) + real = num; + else if (fixed) + return 0; + + if (real <= 0) + return 0; + + succ = __move_head (&ring->cons_head, ring_size, head, real, is_sc); + } + while (!succ); + + *pos = head; + return real; +} + +inline static int +dmm_dequeue_copy (struct dmm_ring *ring, int pos, void **to, int num) +{ + const int ring_size = ring->size; + void **box = (void **) (ring + 1); + + while (num > 0) + { + *to++ = box[pos++]; + if (pos >= ring_size) + pos = 0; + --num; + } + + return pos; +} + +inline static void +dmm_dequeue_done (struct dmm_ring *ring, int pos, int new_pos, int is_sc) +{ + dmm_barrier (); + + if (!is_sc) + { + while (ring->cons_tail != pos) + dmm_pause (); + } + + ring->cons_tail = new_pos; +} + +#endif |