aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorKonstantin Ananyev <konstantin.ananyev@intel.com>2016-07-07 19:22:38 +0100
committerKonstantin Ananyev <konstantin.ananyev@intel.com>2016-07-07 23:34:40 +0100
commita633eec74619a96925285ac4dcf0154fbfafb855 (patch)
treea3c178491a4e5508f4350feb913c1a6e6fb058d4 /lib
parentc034e6912a0bcaaaf90b64f2c09d6d1cdda3aecb (diff)
initial tle_dring implementation
The Dynamic Ring (dring) is a implementation of unbounded FIFO queue, that supports lockless bulk enqueue/dequeue for multiple producers/consumers. Internally it contains producer/consumer head/tail indexes (same as DPDK rte_ring), plus linked list of Dynamic Ring Blocks (drb)s. Each drb contains some metadata plus array of pointers to queued objects. It is a caller responsibility to provide sufficient number of drbs for enqueue operation, and manage unused drbs returned by dequeue operation. dring features: - FIFO (First In First Out) - Lockless implementation. - Multi- or single-consumer dequeue. - Multi- or single-producer enqueue. - Bulk dequeue. - Bulk enqueue. Change-Id: I3621c99c6b114a387036a397e79baa8d1588bdb5 Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/Makefile1
-rw-r--r--lib/libtle_dring/Makefile38
-rw-r--r--lib/libtle_dring/dring.c101
-rw-r--r--lib/libtle_dring/tle_dring.h473
4 files changed, 613 insertions, 0 deletions
diff --git a/lib/Makefile b/lib/Makefile
index 56e8efb..8ce9bac 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -21,6 +21,7 @@ endif
include $(RTE_SDK)/mk/rte.vars.mk
+DIRS-y += libtle_dring
DIRS-y += libtle_udp
include $(TLDK_ROOT)/mk/tle.subdir.mk
diff --git a/lib/libtle_dring/Makefile b/lib/libtle_dring/Makefile
new file mode 100644
index 0000000..1f2c940
--- /dev/null
+++ b/lib/libtle_dring/Makefile
@@ -0,0 +1,38 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overwritten by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = libtle_dring.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+EXPORT_MAP := tle_dring_version.map
+
+LIBABIVER := 1
+
+#source files
+SRCS-y += dring.c
+
+SYMLINK-y-include += tle_dring.h
+
+include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_dring/dring.c b/lib/libtle_dring/dring.c
new file mode 100644
index 0000000..e0fae19
--- /dev/null
+++ b/lib/libtle_dring/dring.c
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <tle_dring.h>
+
+static const char *
+str_drb_dummy(const struct tle_dring *dr, const struct tle_drb *db)
+{
+ return (db == &dr->dummy) ? "<dummy>" : "";
+}
+
+static const char *
+str_obj_state(const struct tle_dring *dr, const struct tle_drb *db,
+ uint32_t idx)
+{
+ if (db->start + idx < dr->cons.tail)
+ return "<stale>";
+ else if (db->start + idx >= dr->prod.tail)
+ return "<free>";
+ else
+ return NULL;
+}
+
+static void
+drb_obj_dump(FILE *f, int32_t verb, const struct tle_dring *dr,
+ const struct tle_drb *db, uint32_t idx)
+{
+ const char *st;
+
+ st = str_obj_state(dr, db, idx);
+
+ /* pointer to object is valid, dump it. */
+ if (st == NULL)
+ fprintf(f, "\t\t\t\t%u:%p\n", db->start + idx, db->objs[idx]);
+
+ /* dump in verbose mode only. */
+ else if (verb > 0)
+ fprintf(f, "\t\t\t\t%u:%p%s\n",
+ db->start + idx, db->objs[idx], st);
+}
+
+static void
+drb_dump(FILE *f, int32_t verb, const struct tle_dring *dr,
+ const struct tle_drb *db)
+{
+ uint32_t i;
+
+ fprintf(f, "\t\t@%p%s={\n", db, str_drb_dummy(dr, db));
+ fprintf(f, "\t\t\tnext=%p,\n", db->next);
+ fprintf(f, "\t\t\tsize=%u,\n", db->size);
+ fprintf(f, "\t\t\tstart=%u,\n", db->start);
+
+ fprintf(f, "\t\t\tobjs[]={\n");
+ for (i = 0; i != db->size; i++)
+ drb_obj_dump(f, verb, dr, db, i);
+ fprintf(f, "\t\t\t},\n");
+
+ fprintf(f, "\t\t},\n");
+}
+
+void
+tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr)
+{
+ struct tle_drb *db;
+
+ fprintf(f, "tle_dring@%p={\n", dr);
+ fprintf(f, "\tflags=%#x,\n", dr->flags);
+
+ fprintf(f, "\tprod={,\n");
+ fprintf(f, "\t\thead=%u,\n", dr->prod.head);
+ fprintf(f, "\t\ttail=%u,\n", dr->prod.tail);
+ fprintf(f, "\t\tcrb=%p%s,\n", dr->prod.crb,
+ str_drb_dummy(dr, dr->prod.crb));
+ fprintf(f, "\t},\n");
+
+ fprintf(f, "\tcons={,\n");
+ fprintf(f, "\t\thead=%u,\n", dr->cons.head);
+ fprintf(f, "\t\ttail=%u,\n", dr->cons.tail);
+ fprintf(f, "\t\tcrb=%p%s,\n", dr->cons.crb,
+ str_drb_dummy(dr, dr->cons.crb));
+ fprintf(f, "\t},\n");
+
+ fprintf(f, "\tdrbs[] = {\n");
+ for (db = dr->prod.crb; db != NULL; db = db->next)
+ drb_dump(f, verb, dr, db);
+ fprintf(f, "\t},\n");
+
+ fprintf(f, "};\n");
+}
diff --git a/lib/libtle_dring/tle_dring.h b/lib/libtle_dring/tle_dring.h
new file mode 100644
index 0000000..e89679d
--- /dev/null
+++ b/lib/libtle_dring/tle_dring.h
@@ -0,0 +1,473 @@
+/*
+ * Copyright (c) 2016 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _TLE_DRING_H_
+#define _TLE_DRING_H_
+
+#include <string.h>
+
+#include <rte_common.h>
+#include <rte_atomic.h>
+#include <rte_memory.h>
+#include <rte_debug.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ * TLE dring
+ *
+ * The Dynamic Ring (dring) is a implementation of unbounded FIFO queue,
+ * that supports lockless bulk enqueue/dequeue for multiple producers/consumers.
+ * Internally it is represented by linked list of Dynamic Ring Blocks (drb).
+ * Each drb contains some metadata plus array of pointers to queued objects.
+ * It is a caller responsibility to provide sufficient number of drbs for
+ * enqueue operation, and manage unused drbs returned by dequeue operation.
+ * dring features:
+ *
+ * - FIFO (First In First Out)
+ * - Lockless implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ * - Bulk dequeue.
+ * - Bulk enqueue.
+ */
+
+/*
+ * RTE_ASSERT was introduced in DPDK 16.07.
+ * For older versions, use RTE_VERIFY.
+ */
+#ifdef RTE_ASSERT
+#define TLE_DRING_ASSERT(exp) RTE_ASSERT(exp)
+#else
+#define TLE_DRING_ASSERT(exp) RTE_VERIFY(exp)
+#endif
+
+struct tle_drb {
+ struct tle_drb *next;
+ void *udata; /**< user data. */
+ uint32_t size; /**< number of objects in that buffer. */
+ uint32_t start; /**< start index for that block. */
+ const void *objs[0];
+} __rte_cache_aligned;
+
+struct tle_dring {
+ uint32_t flags;
+ struct {
+ volatile uint32_t head; /**< producer head */
+ volatile uint32_t tail; /**< producer tail */
+ struct tle_drb * volatile crb; /**< block to enqueue to */
+ } prod __rte_cache_aligned;
+ struct {
+ volatile uint32_t head; /**< consumer head */
+ volatile uint32_t tail; /**< consumer tail */
+ struct tle_drb * volatile crb; /**< block to dequeue from */
+ } cons __rte_cache_aligned;
+
+ struct tle_drb dummy; /**< dummy block */
+};
+
+/*
+ * helper routine, to copy objects to/from the ring.
+ */
+static inline void __attribute__((always_inline))
+__tle_dring_copy_objs(const void *dst[], const void * const src[], uint32_t num)
+{
+ uint32_t i;
+
+ for (i = 0; i != RTE_ALIGN_FLOOR(num, 4); i += 4) {
+ dst[i] = src[i];
+ dst[i + 1] = src[i + 1];
+ dst[i + 2] = src[i + 2];
+ dst[i + 3] = src[i + 3];
+ }
+ switch (num % 4) {
+ case 3:
+ dst[i + 2] = src[i + 2];
+ case 2:
+ dst[i + 1] = src[i + 1];
+ case 1:
+ dst[i] = src[i];
+ }
+}
+
+/*
+ * helper routine, to enqueue objects into the ring.
+ */
+static inline uint32_t __attribute__((always_inline))
+__tle_dring_enqueue(struct tle_dring *dr, uint32_t head,
+ const void * const objs[], uint32_t nb_obj,
+ struct tle_drb *drbs[], uint32_t nb_drb)
+{
+ uint32_t i, j, k, n;
+ struct tle_drb *pb;
+
+ pb = dr->prod.crb;
+ i = 0;
+
+ /* fill the current producer block */
+ if (pb->size != 0) {
+ n = head - pb->start;
+ k = RTE_MIN(pb->size - n, nb_obj);
+ __tle_dring_copy_objs(pb->objs + n, objs, k);
+ i += k;
+ }
+
+ /* fill new blocks, if any */
+ j = 0;
+ if (i != nb_obj && nb_drb != 0) {
+
+ do {
+ pb->next = drbs[j];
+ pb = drbs[j];
+ pb->start = head + i;
+ k = RTE_MIN(pb->size, nb_obj - i);
+ __tle_dring_copy_objs(pb->objs, objs + i, k);
+ i += k;
+ } while (++j != nb_drb && i != nb_obj);
+
+ pb->next = NULL;
+
+ /* new procucer current block. */
+ dr->prod.crb = pb;
+ }
+
+ /* we have to enqueue all requested objects. */
+ TLE_DRING_ASSERT(nb_obj == i);
+
+ /* return number of unused blocks. */
+ return nb_drb - j;
+}
+
+/**
+ * Enqueue several objects on the dring (multi-producers safe).
+ * Note that it is a caller responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects to enqueue.
+ * @param nb_obj
+ * The number of objects to add to the dring from the objs[].
+ * @param drbs
+ * An array of pointers to the drbs that can be used by the dring
+ * to perform enqueue operation.
+ * @param nb_drb
+ * at input: number of elements in the drbs[] array.
+ * at output: number of unused by the dring elements in the drbs[] array.
+ * @return
+ * - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_mp_enqueue(struct tle_dring *dr, const void * const objs[],
+ uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ uint32_t head, next;
+
+ if (nb_obj == 0)
+ return 0;
+
+ /* reserve position inside the ring. */
+ do {
+ head = dr->prod.head;
+ next = head + nb_obj;
+ } while (rte_atomic32_cmpset(&dr->prod.head, head, next) == 0);
+
+ /*
+ * If there are other enqueues in progress that preceded that one,
+ * then wait for them to complete
+ */
+ while (dr->prod.tail != head)
+ rte_pause();
+
+ /* make sure that changes from previous updates are visible. */
+ rte_smp_rmb();
+
+ /* now it is safe to enqueue into the ring. */
+ *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);
+
+ /* make new objects visible to the consumer. */
+ rte_smp_wmb();
+ dr->prod.tail = next;
+
+ return nb_obj;
+}
+
+/**
+ * Enqueue several objects on the dring (NOT multi-producers safe).
+ * Note that it is a caller responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects to enqueue.
+ * @param nb_obj
+ * The number of objects to add to the dring from the objs[].
+ * @param drbs
+ * An array of pointers to the drbs that can be used by the dring
+ * to perform enqueue operation.
+ * @param nb_drb
+ * at input: number of elements in the drbs[] array.
+ * at output: number of unused by the dring elements in the drbs[] array.
+ * @return
+ * - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
+ uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ uint32_t head, next;
+
+ if (nb_obj == 0)
+ return 0;
+
+ head = dr->prod.head;
+ next = head + nb_obj;
+
+ /* update producer head value. */
+ dr->prod.head = next;
+
+ /* enqueue into the ring. */
+ *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);
+
+ /* make new objects visible to the consumer. */
+ rte_smp_wmb();
+ dr->prod.tail = next;
+
+ return nb_obj;
+}
+
+/*
+ * helper routine, to dequeue objects from the ring.
+ */
+static inline uint32_t __attribute__((always_inline))
+__tle_dring_dequeue(struct tle_dring *dr, uint32_t head,
+ const void *objs[], uint32_t nb_obj,
+ struct tle_drb *drbs[], uint32_t nb_drb)
+{
+ uint32_t i, j, k, n;
+ struct tle_drb *pb;
+
+ pb = dr->cons.crb;
+ i = 0;
+
+ /* copy from the current consumer block */
+ if (pb->size != 0) {
+ n = head - pb->start;
+ k = RTE_MIN(pb->size - n, nb_obj);
+ __tle_dring_copy_objs(objs, pb->objs + n, k);
+ i += k;
+ }
+
+ /* copy from other blocks */
+ j = 0;
+ if (i != nb_obj && nb_drb != 0) {
+
+ do {
+ /* current block is empty, put it into the free list. */
+ if (pb != &dr->dummy)
+ drbs[j++] = pb;
+
+ /* proceed to the next block. */
+ pb = pb->next;
+ k = RTE_MIN(pb->size, nb_obj - i);
+ __tle_dring_copy_objs(objs + i, pb->objs, k);
+ i += k;
+ } while (j != nb_drb && i != nb_obj);
+
+ /* new consumer currect block. */
+ dr->cons.crb = pb;
+ }
+
+ /* we have to dequeue all requested objects. */
+ TLE_DRING_ASSERT(nb_obj == i);
+
+ /* return number of blocks to free. */
+ return j;
+}
+
+/**
+ * Dequeue several objects from the dring (multi-consumers safe).
+ * Note, that it is a caller responsibility to provide drbs[] large
+ * enough to store pointers to all drbs that might become unused
+ * after that dequeue operation. It is a caller responsibility to manage
+ * unused drbs after the dequeue operation is completed
+ * (i.e mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ * The number of objects to dequeue from the dring.
+ * @param drbs
+ * An array of pointers to the drbs that will become unused after that
+ * dequeue operation.
+ * @param nb_drb
+ * at input: number of elements in the drbs[] array.
+ * at output: number of filled entries in the drbs[] array.
+ * @return
+ * - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_mc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+ struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ uint32_t head, next, num, tail;
+
+ /* move cons.head atomically */
+ do {
+ head = dr->cons.head;
+ tail = dr->prod.tail;
+
+ num = RTE_MIN(tail - head, nb_obj);
+
+ /* no objects to dequeue */
+ if (num == 0) {
+ *nb_drb = 0;
+ return 0;
+ }
+
+ next = head + num;
+ } while (rte_atomic32_cmpset(&dr->cons.head, head, next) == 0);
+
+ /*
+ * If there are other dequeues in progress that preceded that one,
+ * then wait for them to complete
+ */
+ while (dr->cons.tail != head)
+ rte_pause();
+
+ /* make sure that changes from previous updates are visible. */
+ rte_smp_rmb();
+
+ /* now it is safe to dequeue from the ring. */
+ *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);
+
+ /* update consumer tail value. */
+ rte_smp_wmb();
+ dr->cons.tail = next;
+
+ return num;
+}
+
+/**
+ * Dequeue several objects from the dring (NOT multi-consumers safe).
+ * Note, that it is a caller responsibility to provide drbs[] large
+ * enough to store pointers to all drbs that might become unused
+ * after that dequeue operation. It is a caller responsibility to manage
+ * unused drbs after the dequeue operation is completed
+ * (i.e mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ * A pointer to the ring structure.
+ * @param objs
+ * An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ * The number of objects to dequeue from the dring.
+ * @param drbs
+ * An array of pointers to the drbs that will become unused after that
+ * dequeue operation.
+ * @param nb_drb
+ * at input: number of elements in the drbs[] array.
+ * at output: number of filled entries in the drbs[] array.
+ * @return
+ * - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+ struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+ uint32_t head, next, num, tail;
+
+ head = dr->cons.head;
+ tail = dr->prod.tail;
+
+ num = RTE_MIN(tail - head, nb_obj);
+
+ /* no objects to dequeue */
+ if (num == 0) {
+ *nb_drb = 0;
+ return 0;
+ }
+
+ next = head + num;
+
+ /* update consumer head value. */
+ dr->cons.head = next;
+
+ /* dequeue from the ring. */
+ *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);
+
+ /* update consumer tail value. */
+ rte_smp_wmb();
+ dr->cons.tail = next;
+
+ return num;
+}
+
+/**
+ * Reset given dring to the initial state.
+ * Note, that information about all queued objects will be lost.
+ *
+ * @param dr
+ * A pointer to the dring structure.
+ */
+static inline void
+tle_dring_reset(struct tle_dring *dr)
+{
+ memset(dr, 0, sizeof(*dr));
+ dr->prod.crb = &dr->dummy;
+ dr->cons.crb = &dr->dummy;
+}
+
+/**
+ * Calculate required size for drb to store up to *num* objects.
+ *
+ * @param num
+ * Number of objects drb should be able to store.
+ * @return
+ * - required size of the drb.
+ */
+static inline size_t
+tle_drb_calc_size(uint32_t num)
+{
+ size_t sz;
+
+ sz = offsetof(struct tle_drb, objs[num]);
+ return RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
+}
+
+/**
+ * Dump information about the dring to the file.
+ *
+ * @param f
+ * A pointer to the file.
+ * @param verb
+ * Verbosity level (currently only 0 or 1).
+ * @param dr
+ * A pointer to the dring structure.
+ */
+extern void tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _TLE_DRING_H_ */