aboutsummaryrefslogtreecommitdiffstats
path: root/src/framework/ring
diff options
context:
space:
mode:
Diffstat (limited to 'src/framework/ring')
-rw-r--r--src/framework/ring/dmm_ring.c94
-rw-r--r--src/framework/ring/dmm_ring.h205
-rw-r--r--src/framework/ring/dmm_ring_base.h189
3 files changed, 488 insertions, 0 deletions
diff --git a/src/framework/ring/dmm_ring.c b/src/framework/ring/dmm_ring.c
new file mode 100644
index 0000000..2b1b1ee
--- /dev/null
+++ b/src/framework/ring/dmm_ring.c
@@ -0,0 +1,94 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <stdio.h>
+#include <string.h>
+
+#include "nstack_log.h"
+
+#include "dmm_ring.h"
+
+void
+dmm_ring_dump (struct dmm_ring *ring, int list)
+{
+ if (!ring)
+ return;
+
+ (void) printf ("ring:%p size:%d flag:0x%x prod:%d-%d/%d cons:%d-%d/%d",
+ ring, ring->size, ring->flag,
+ ring->prod_head, ring->prod_tail, ring->is_sp,
+ ring->cons_head, ring->cons_tail, ring->is_sc);
+ if (list && dmm_ring_count (ring))
+ {
+ int count = 0;
+ void **p = (void **) (ring + 1);
+ int i = ring->cons_head;
+ while (i != ring->prod_tail)
+ {
+ if ((count++ & 3) == 0)
+ (void) printf ("\n");
+ (void) printf (" %d:%p", i, p[i]);
+ if (++i >= ring->size)
+ i = 0;
+ }
+ }
+ (void) printf ("\n----------------\n");
+}
+
+int
+dmm_ring_init (struct dmm_ring *ring, int num, int flag)
+{
+ if (num > DMM_RING_MAX_NUM)
+ return -1;
+
+ (void) memset (ring, 0, sizeof (struct dmm_ring));
+
+ ring->size = num + 1;
+
+ ring->prod_head = 0;
+ ring->prod_tail = 0;
+ ring->is_sp = flag & DMM_RING_INIT_SP;
+
+ ring->cons_head = 0;
+ ring->cons_tail = 0;
+ ring->is_sc = flag & DMM_RING_INIT_SC;
+
+ return 0;
+}
+
+int
+dmm_pool_init (struct dmm_ring *pool, size_t elt_size, int num, uint32_t flag)
+{
+ int ret;
+ void *array;
+ const size_t ring_size = dmm_ring_bufsize (num);
+
+ if (0 != dmm_ring_init (pool, num, flag))
+ {
+ NSFW_LOGERR ("init pool's ring failed, num:%d flag:0x%x ring_size:%lu",
+ num, flag, ring_size);
+ return -1;
+ }
+
+ array = (char *) pool + ring_size;
+ ret = dmm_array_enqueue (pool, array, num, elt_size);
+ if (ret != num)
+ {
+ NSFW_LOGERR ("enqueue failed, num:%d elt_size:%lu", num, elt_size);
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/framework/ring/dmm_ring.h b/src/framework/ring/dmm_ring.h
new file mode 100644
index 0000000..914eff2
--- /dev/null
+++ b/src/framework/ring/dmm_ring.h
@@ -0,0 +1,205 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef _DMM_RING_H_
+#define _DMM_RING_H_
+
+#include <stdint.h>
+
+#include "dmm_atomic.h"
+#include "dmm_pause.h"
+#include "dmm_barrier.h"
+#include "dmm_common.h"
+
+#define DMM_RING_MAX_NUM 0x7FFFFFFE /* 2,147,483,646 */
+
+#define DMM_RING_INIT_SP 0x0001
+#define DMM_RING_INIT_SC 0x0002
+#define DMM_RING_INIT_MPMC 0
+#define DMM_RING_INIT_SPSC (DMM_RING_INIT_SP | DMM_RING_INIT_SC)
+
+struct dmm_ring
+{
+ int size;
+ uint32_t flag;
+
+ volatile int prod_head;
+ volatile int prod_tail;
+ int is_sp;
+ int _prod_pad;
+
+ volatile int cons_head;
+ volatile int cons_tail;
+ int is_sc;
+ int _cons_pad;
+} _dmm_cache_aligned;
+
+#include "dmm_ring_base.h"
+
+void dmm_ring_dump (struct dmm_ring *ring, int list);
+int dmm_ring_init (struct dmm_ring *ring, int num, int flag);
+int dmm_pool_init (struct dmm_ring *pool, size_t elt_size,
+ int num, uint32_t flag);
+
+inline static size_t
+dmm_ring_bufsize (int num)
+{
+ size_t size = sizeof (struct dmm_ring);
+
+ size += (sizeof (void *) * (num + 1));
+
+ return dmm_align (size, DMM_CACHE_LINE_SIZE);
+}
+
+inline static size_t
+dmm_pool_arraysize (int num, int elt_size)
+{
+ const size_t size = elt_size * num;
+ return dmm_align (size, DMM_CACHE_LINE_SIZE);
+}
+
+inline static size_t
+dmm_pool_bufsize (int num, int elt_size)
+{
+ return dmm_ring_bufsize (num) + dmm_pool_arraysize (num, elt_size);
+}
+
+inline static int
+dmm_ring_count (struct dmm_ring *ring)
+{
+ const int count = ring->prod_tail - ring->cons_head;
+
+ if (count >= 0)
+ return count;
+ return count + ring->size;
+}
+
+inline static int
+dmm_ring_free_count (struct dmm_ring *ring)
+{
+ const int count = ring->cons_tail - ring->prod_head;
+
+ if (count >= 0)
+ return count;
+ return count + ring->size;
+}
+
+/* enqueue several objects in a ring.
+ ring: point to the ring
+ p: object pointer list array
+ num: number of object
+ fixed: enqueue fixed number
+single_cons: is single producer
+ <return>: number of enqueued object, 0: queue is full
+*/
+inline static int
+dmm_ring_enqueue (struct dmm_ring *ring, void **p, int num,
+ int fixed, int is_sp)
+{
+ int n, pos, new_pos;
+
+ n = dmm_enqueue_prep (ring, num, &pos, fixed, is_sp);
+
+ if (n == 0)
+ return 0;
+
+ new_pos = dmm_enqueue_copy (ring, pos, p, n);
+
+ dmm_enqueue_done (ring, pos, new_pos, is_sp);
+
+ return n;
+}
+
+inline static int
+dmm_fix_enqueue (struct dmm_ring *ring, void **p, int num)
+{
+ return dmm_ring_enqueue (ring, p, num, 1, ring->is_sp);
+}
+
+inline static int
+dmm_var_enqueue (struct dmm_ring *ring, void **p, int num)
+{
+ return dmm_ring_enqueue (ring, p, num, 0, ring->is_sp);
+}
+
+inline static int
+dmm_enqueue (struct dmm_ring *ring, void *p)
+{
+ return dmm_var_enqueue (ring, &p, 1);
+}
+
+inline static int
+dmm_array_enqueue (struct dmm_ring *ring, void *array,
+ int num, size_t elt_size)
+{
+ int n, pos, new_pos;
+
+ n = dmm_enqueue_prep (ring, num, &pos, 0, ring->is_sp);
+
+ if (n == 0)
+ return 0;
+
+ new_pos = dmm_enqueue_copy_array (ring, pos, array, n, elt_size);
+
+ dmm_enqueue_done (ring, pos, new_pos, ring->is_sp);
+
+ return n;
+}
+
+/* dequeue several objects from a ring.
+ ring: point to the ring
+ p: save object array
+ num: number of p
+ fixed: dequeue fixed number
+single_cons: is single consumer
+ <return>: number of dequeued object, 0: queue is empty
+*/
+inline static int
+dmm_ring_dequeue (struct dmm_ring *ring, void **p, int num,
+ int fixed, int single_cons)
+{
+ int n, pos, new_pos;
+
+ n = dmm_dequeue_prep (ring, num, &pos, fixed, single_cons);
+
+ if (n == 0)
+ return 0;
+
+ new_pos = dmm_dequeue_copy (ring, pos, p, n);
+
+ dmm_dequeue_done (ring, pos, new_pos, single_cons);
+
+ return n;
+}
+
+inline static int
+dmm_fix_dequeue (struct dmm_ring *ring, void **p, int num)
+{
+ return dmm_ring_dequeue (ring, p, num, 1, ring->is_sc);
+}
+
+inline static int
+dmm_var_dequeue (struct dmm_ring *ring, void **p, int num)
+{
+ return dmm_ring_dequeue (ring, p, num, 0, ring->is_sc);
+}
+
+inline static int
+dmm_dequeue (struct dmm_ring *ring, void **p)
+{
+ return dmm_var_dequeue (ring, p, 1);
+}
+
+#endif
diff --git a/src/framework/ring/dmm_ring_base.h b/src/framework/ring/dmm_ring_base.h
new file mode 100644
index 0000000..9a48eea
--- /dev/null
+++ b/src/framework/ring/dmm_ring_base.h
@@ -0,0 +1,189 @@
+/*
+*
+* Copyright (c) 2018 Huawei Technologies Co.,Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at:
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef _DMM_RING_BASE_H_
+#define _DMM_RING_BASE_H_
+
+#ifndef _DMM_RING_H_
+#error include dmm_ring.h please
+#endif
+
+inline static int
+__move_head (volatile int *head, int ring_size,
+ int old_head, int real_num, int is_single)
+{
+ int new_head = old_head + real_num;
+
+ if (new_head >= ring_size)
+ {
+ new_head -= ring_size;
+ }
+
+ if (is_single)
+ {
+ *head = new_head;
+ return 1;
+ }
+
+ return dmm_atomic_swap ((dmm_atomic_t *) head, old_head, new_head);
+}
+
+inline static int
+dmm_enqueue_prep (struct dmm_ring *ring, const int num,
+ int *pos, const int fixed, int is_sp)
+{
+ int succ, real, head;
+ const int ring_size = ring->size;
+
+ do
+ {
+ head = ring->prod_head;
+
+ dmm_barrier ();
+
+ if ((real = ring->cons_tail - head - 1) < 0)
+ real += ring_size;
+
+ if (real >= num)
+ real = num;
+ else if (fixed)
+ return 0;
+
+ if (real <= 0)
+ return 0;
+
+ succ = __move_head (&ring->prod_head, ring_size, head, real, is_sp);
+ }
+ while (!succ);
+
+ *pos = head;
+ return real;
+}
+
+inline static int
+dmm_enqueue_copy (struct dmm_ring *ring, int pos, void **from, int num)
+{
+ const int ring_size = ring->size;
+ void **box = (void **) (ring + 1);
+
+ while (num > 0)
+ {
+ box[pos++] = *from++;
+ if (pos >= ring_size)
+ pos = 0;
+ --num;
+ }
+
+ return pos;
+}
+
+inline static void
+dmm_enqueue_done (struct dmm_ring *ring, int pos, int new_pos, int is_sp)
+{
+ dmm_barrier ();
+
+ if (!is_sp)
+ {
+ while (ring->prod_tail != pos)
+ dmm_pause ();
+ }
+
+ ring->prod_tail = new_pos;
+}
+
+inline static int
+dmm_enqueue_copy_array (struct dmm_ring *ring, int pos,
+ void *array, int num, size_t elt_size)
+{
+ const int ring_size = ring->size;
+ void **box = (void **) (ring + 1);
+ char *from = (char *) array;
+
+ while (num > 0)
+ {
+ box[pos++] = from;
+ if (pos >= ring_size)
+ pos = 0;
+ from += elt_size;
+ --num;
+ }
+
+ return pos;
+}
+
+inline static int
+dmm_dequeue_prep (struct dmm_ring *ring, const int num, int *pos,
+ const int fixed, int is_sc)
+{
+ int succ, real, head;
+ const int ring_size = ring->size;
+
+ do
+ {
+ head = ring->cons_head;
+
+ dmm_barrier ();
+
+ if ((real = ring->prod_tail - head) < 0)
+ real += ring_size;
+
+ if (real >= num)
+ real = num;
+ else if (fixed)
+ return 0;
+
+ if (real <= 0)
+ return 0;
+
+ succ = __move_head (&ring->cons_head, ring_size, head, real, is_sc);
+ }
+ while (!succ);
+
+ *pos = head;
+ return real;
+}
+
+inline static int
+dmm_dequeue_copy (struct dmm_ring *ring, int pos, void **to, int num)
+{
+ const int ring_size = ring->size;
+ void **box = (void **) (ring + 1);
+
+ while (num > 0)
+ {
+ *to++ = box[pos++];
+ if (pos >= ring_size)
+ pos = 0;
+ --num;
+ }
+
+ return pos;
+}
+
+inline static void
+dmm_dequeue_done (struct dmm_ring *ring, int pos, int new_pos, int is_sc)
+{
+ dmm_barrier ();
+
+ if (!is_sc)
+ {
+ while (ring->cons_tail != pos)
+ dmm_pause ();
+ }
+
+ ring->cons_tail = new_pos;
+}
+
+#endif