From a633eec74619a96925285ac4dcf0154fbfafb855 Mon Sep 17 00:00:00 2001 From: Konstantin Ananyev Date: Thu, 7 Jul 2016 19:22:38 +0100 Subject: initial tle_dring implementation The Dynamic Ring (dring) is a implementation of unbounded FIFO queue, that supports lockless bulk enqueue/dequeue for multiple producers/consumers. Internally it contains producer/consumer head/tail indexes (same as DPDK rte_ring), plus linked list of Dynamic Ring Blocks (drb)s. Each drb contains some metadata plus array of pointers to queued objects. It is a caller responsibility to provide sufficient number of drbs for enqueue operation, and manage unused drbs returned by dequeue operation. dring features: - FIFO (First In First Out) - Lockless implementation. - Multi- or single-consumer dequeue. - Multi- or single-producer enqueue. - Bulk dequeue. - Bulk enqueue. Change-Id: I3621c99c6b114a387036a397e79baa8d1588bdb5 Signed-off-by: Konstantin Ananyev --- Makefile | 1 + lib/Makefile | 1 + lib/libtle_dring/Makefile | 38 ++++ lib/libtle_dring/dring.c | 101 +++++++++ lib/libtle_dring/tle_dring.h | 473 ++++++++++++++++++++++++++++++++++++++ test/Makefile | 26 +++ test/dring/Makefile | 42 ++++ test/dring/test_dring.c | 525 +++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 1207 insertions(+) create mode 100644 lib/libtle_dring/Makefile create mode 100644 lib/libtle_dring/dring.c create mode 100644 lib/libtle_dring/tle_dring.h create mode 100644 test/Makefile create mode 100644 test/dring/Makefile create mode 100644 test/dring/test_dring.c diff --git a/Makefile b/Makefile index c22527c..072b466 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ RTE_TARGET ?= x86_64-native-linuxapp-gcc DIRS-y += lib DIRS-y += examples +DIRS-y += test MAKEFLAGS += --no-print-directory diff --git a/lib/Makefile b/lib/Makefile index 56e8efb..8ce9bac 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -21,6 +21,7 @@ endif include $(RTE_SDK)/mk/rte.vars.mk +DIRS-y += libtle_dring DIRS-y += libtle_udp include $(TLDK_ROOT)/mk/tle.subdir.mk diff --git a/lib/libtle_dring/Makefile b/lib/libtle_dring/Makefile new file mode 100644 index 0000000..1f2c940 --- /dev/null +++ b/lib/libtle_dring/Makefile @@ -0,0 +1,38 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +# Default target, can be overwritten by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = libtle_dring.a + +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) + +EXPORT_MAP := tle_dring_version.map + +LIBABIVER := 1 + +#source files +SRCS-y += dring.c + +SYMLINK-y-include += tle_dring.h + +include $(RTE_SDK)/mk/rte.extlib.mk diff --git a/lib/libtle_dring/dring.c b/lib/libtle_dring/dring.c new file mode 100644 index 0000000..e0fae19 --- /dev/null +++ b/lib/libtle_dring/dring.c @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +static const char * +str_drb_dummy(const struct tle_dring *dr, const struct tle_drb *db) +{ + return (db == &dr->dummy) ? "" : ""; +} + +static const char * +str_obj_state(const struct tle_dring *dr, const struct tle_drb *db, + uint32_t idx) +{ + if (db->start + idx < dr->cons.tail) + return ""; + else if (db->start + idx >= dr->prod.tail) + return ""; + else + return NULL; +} + +static void +drb_obj_dump(FILE *f, int32_t verb, const struct tle_dring *dr, + const struct tle_drb *db, uint32_t idx) +{ + const char *st; + + st = str_obj_state(dr, db, idx); + + /* pointer to object is valid, dump it. */ + if (st == NULL) + fprintf(f, "\t\t\t\t%u:%p\n", db->start + idx, db->objs[idx]); + + /* dump in verbose mode only. */ + else if (verb > 0) + fprintf(f, "\t\t\t\t%u:%p%s\n", + db->start + idx, db->objs[idx], st); +} + +static void +drb_dump(FILE *f, int32_t verb, const struct tle_dring *dr, + const struct tle_drb *db) +{ + uint32_t i; + + fprintf(f, "\t\t@%p%s={\n", db, str_drb_dummy(dr, db)); + fprintf(f, "\t\t\tnext=%p,\n", db->next); + fprintf(f, "\t\t\tsize=%u,\n", db->size); + fprintf(f, "\t\t\tstart=%u,\n", db->start); + + fprintf(f, "\t\t\tobjs[]={\n"); + for (i = 0; i != db->size; i++) + drb_obj_dump(f, verb, dr, db, i); + fprintf(f, "\t\t\t},\n"); + + fprintf(f, "\t\t},\n"); +} + +void +tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr) +{ + struct tle_drb *db; + + fprintf(f, "tle_dring@%p={\n", dr); + fprintf(f, "\tflags=%#x,\n", dr->flags); + + fprintf(f, "\tprod={,\n"); + fprintf(f, "\t\thead=%u,\n", dr->prod.head); + fprintf(f, "\t\ttail=%u,\n", dr->prod.tail); + fprintf(f, "\t\tcrb=%p%s,\n", dr->prod.crb, + str_drb_dummy(dr, dr->prod.crb)); + fprintf(f, "\t},\n"); + + fprintf(f, "\tcons={,\n"); + fprintf(f, "\t\thead=%u,\n", dr->cons.head); + fprintf(f, "\t\ttail=%u,\n", dr->cons.tail); + fprintf(f, "\t\tcrb=%p%s,\n", dr->cons.crb, + str_drb_dummy(dr, dr->cons.crb)); + fprintf(f, "\t},\n"); + + fprintf(f, "\tdrbs[] = {\n"); + for (db = dr->prod.crb; db != NULL; db = db->next) + drb_dump(f, verb, dr, db); + fprintf(f, "\t},\n"); + + fprintf(f, "};\n"); +} diff --git a/lib/libtle_dring/tle_dring.h b/lib/libtle_dring/tle_dring.h new file mode 100644 index 0000000..e89679d --- /dev/null +++ b/lib/libtle_dring/tle_dring.h @@ -0,0 +1,473 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TLE_DRING_H_ +#define _TLE_DRING_H_ + +#include + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file + * TLE dring + * + * The Dynamic Ring (dring) is a implementation of unbounded FIFO queue, + * that supports lockless bulk enqueue/dequeue for multiple producers/consumers. + * Internally it is represented by linked list of Dynamic Ring Blocks (drb). + * Each drb contains some metadata plus array of pointers to queued objects. + * It is a caller responsibility to provide sufficient number of drbs for + * enqueue operation, and manage unused drbs returned by dequeue operation. + * dring features: + * + * - FIFO (First In First Out) + * - Lockless implementation. + * - Multi- or single-consumer dequeue. + * - Multi- or single-producer enqueue. + * - Bulk dequeue. + * - Bulk enqueue. + */ + +/* + * RTE_ASSERT was introduced in DPDK 16.07. + * For older versions, use RTE_VERIFY. + */ +#ifdef RTE_ASSERT +#define TLE_DRING_ASSERT(exp) RTE_ASSERT(exp) +#else +#define TLE_DRING_ASSERT(exp) RTE_VERIFY(exp) +#endif + +struct tle_drb { + struct tle_drb *next; + void *udata; /**< user data. */ + uint32_t size; /**< number of objects in that buffer. */ + uint32_t start; /**< start index for that block. */ + const void *objs[0]; +} __rte_cache_aligned; + +struct tle_dring { + uint32_t flags; + struct { + volatile uint32_t head; /**< producer head */ + volatile uint32_t tail; /**< producer tail */ + struct tle_drb * volatile crb; /**< block to enqueue to */ + } prod __rte_cache_aligned; + struct { + volatile uint32_t head; /**< consumer head */ + volatile uint32_t tail; /**< consumer tail */ + struct tle_drb * volatile crb; /**< block to dequeue from */ + } cons __rte_cache_aligned; + + struct tle_drb dummy; /**< dummy block */ +}; + +/* + * helper routine, to copy objects to/from the ring. + */ +static inline void __attribute__((always_inline)) +__tle_dring_copy_objs(const void *dst[], const void * const src[], uint32_t num) +{ + uint32_t i; + + for (i = 0; i != RTE_ALIGN_FLOOR(num, 4); i += 4) { + dst[i] = src[i]; + dst[i + 1] = src[i + 1]; + dst[i + 2] = src[i + 2]; + dst[i + 3] = src[i + 3]; + } + switch (num % 4) { + case 3: + dst[i + 2] = src[i + 2]; + case 2: + dst[i + 1] = src[i + 1]; + case 1: + dst[i] = src[i]; + } +} + +/* + * helper routine, to enqueue objects into the ring. + */ +static inline uint32_t __attribute__((always_inline)) +__tle_dring_enqueue(struct tle_dring *dr, uint32_t head, + const void * const objs[], uint32_t nb_obj, + struct tle_drb *drbs[], uint32_t nb_drb) +{ + uint32_t i, j, k, n; + struct tle_drb *pb; + + pb = dr->prod.crb; + i = 0; + + /* fill the current producer block */ + if (pb->size != 0) { + n = head - pb->start; + k = RTE_MIN(pb->size - n, nb_obj); + __tle_dring_copy_objs(pb->objs + n, objs, k); + i += k; + } + + /* fill new blocks, if any */ + j = 0; + if (i != nb_obj && nb_drb != 0) { + + do { + pb->next = drbs[j]; + pb = drbs[j]; + pb->start = head + i; + k = RTE_MIN(pb->size, nb_obj - i); + __tle_dring_copy_objs(pb->objs, objs + i, k); + i += k; + } while (++j != nb_drb && i != nb_obj); + + pb->next = NULL; + + /* new procucer current block. */ + dr->prod.crb = pb; + } + + /* we have to enqueue all requested objects. */ + TLE_DRING_ASSERT(nb_obj == i); + + /* return number of unused blocks. */ + return nb_drb - j; +} + +/** + * Enqueue several objects on the dring (multi-producers safe). + * Note that it is a caller responsibility to provide enough drbs + * to enqueue all requested objects. + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects to enqueue. + * @param nb_obj + * The number of objects to add to the dring from the objs[]. + * @param drbs + * An array of pointers to the drbs that can be used by the dring + * to perform enqueue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of unused by the dring elements in the drbs[] array. + * @return + * - number of enqueued objects. + */ +static inline uint32_t +tle_dring_mp_enqueue(struct tle_dring *dr, const void * const objs[], + uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb) +{ + uint32_t head, next; + + if (nb_obj == 0) + return 0; + + /* reserve position inside the ring. */ + do { + head = dr->prod.head; + next = head + nb_obj; + } while (rte_atomic32_cmpset(&dr->prod.head, head, next) == 0); + + /* + * If there are other enqueues in progress that preceded that one, + * then wait for them to complete + */ + while (dr->prod.tail != head) + rte_pause(); + + /* make sure that changes from previous updates are visible. */ + rte_smp_rmb(); + + /* now it is safe to enqueue into the ring. */ + *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb); + + /* make new objects visible to the consumer. */ + rte_smp_wmb(); + dr->prod.tail = next; + + return nb_obj; +} + +/** + * Enqueue several objects on the dring (NOT multi-producers safe). + * Note that it is a caller responsibility to provide enough drbs + * to enqueue all requested objects. + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects to enqueue. + * @param nb_obj + * The number of objects to add to the dring from the objs[]. + * @param drbs + * An array of pointers to the drbs that can be used by the dring + * to perform enqueue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of unused by the dring elements in the drbs[] array. + * @return + * - number of enqueued objects. + */ +static inline uint32_t +tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[], + uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb) +{ + uint32_t head, next; + + if (nb_obj == 0) + return 0; + + head = dr->prod.head; + next = head + nb_obj; + + /* update producer head value. */ + dr->prod.head = next; + + /* enqueue into the ring. */ + *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb); + + /* make new objects visible to the consumer. */ + rte_smp_wmb(); + dr->prod.tail = next; + + return nb_obj; +} + +/* + * helper routine, to dequeue objects from the ring. + */ +static inline uint32_t __attribute__((always_inline)) +__tle_dring_dequeue(struct tle_dring *dr, uint32_t head, + const void *objs[], uint32_t nb_obj, + struct tle_drb *drbs[], uint32_t nb_drb) +{ + uint32_t i, j, k, n; + struct tle_drb *pb; + + pb = dr->cons.crb; + i = 0; + + /* copy from the current consumer block */ + if (pb->size != 0) { + n = head - pb->start; + k = RTE_MIN(pb->size - n, nb_obj); + __tle_dring_copy_objs(objs, pb->objs + n, k); + i += k; + } + + /* copy from other blocks */ + j = 0; + if (i != nb_obj && nb_drb != 0) { + + do { + /* current block is empty, put it into the free list. */ + if (pb != &dr->dummy) + drbs[j++] = pb; + + /* proceed to the next block. */ + pb = pb->next; + k = RTE_MIN(pb->size, nb_obj - i); + __tle_dring_copy_objs(objs + i, pb->objs, k); + i += k; + } while (j != nb_drb && i != nb_obj); + + /* new consumer currect block. */ + dr->cons.crb = pb; + } + + /* we have to dequeue all requested objects. */ + TLE_DRING_ASSERT(nb_obj == i); + + /* return number of blocks to free. */ + return j; +} + +/** + * Dequeue several objects from the dring (multi-consumers safe). + * Note, that it is a caller responsibility to provide drbs[] large + * enough to store pointers to all drbs that might become unused + * after that dequeue operation. It is a caller responsibility to manage + * unused drbs after the dequeue operation is completed + * (i.e mark them as free/reusable again, etc.). + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects that will be dequeued. + * @param nb_obj + * The number of objects to dequeue from the dring. + * @param drbs + * An array of pointers to the drbs that will become unused after that + * dequeue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of filled entries in the drbs[] array. + * @return + * - number of dequeued objects. + */ +static inline uint32_t +tle_dring_mc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj, + struct tle_drb *drbs[], uint32_t *nb_drb) +{ + uint32_t head, next, num, tail; + + /* move cons.head atomically */ + do { + head = dr->cons.head; + tail = dr->prod.tail; + + num = RTE_MIN(tail - head, nb_obj); + + /* no objects to dequeue */ + if (num == 0) { + *nb_drb = 0; + return 0; + } + + next = head + num; + } while (rte_atomic32_cmpset(&dr->cons.head, head, next) == 0); + + /* + * If there are other dequeues in progress that preceded that one, + * then wait for them to complete + */ + while (dr->cons.tail != head) + rte_pause(); + + /* make sure that changes from previous updates are visible. */ + rte_smp_rmb(); + + /* now it is safe to dequeue from the ring. */ + *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb); + + /* update consumer tail value. */ + rte_smp_wmb(); + dr->cons.tail = next; + + return num; +} + +/** + * Dequeue several objects from the dring (NOT multi-consumers safe). + * Note, that it is a caller responsibility to provide drbs[] large + * enough to store pointers to all drbs that might become unused + * after that dequeue operation. It is a caller responsibility to manage + * unused drbs after the dequeue operation is completed + * (i.e mark them as free/reusable again, etc.). + * + * @param dr + * A pointer to the ring structure. + * @param objs + * An array of pointers to objects that will be dequeued. + * @param nb_obj + * The number of objects to dequeue from the dring. + * @param drbs + * An array of pointers to the drbs that will become unused after that + * dequeue operation. + * @param nb_drb + * at input: number of elements in the drbs[] array. + * at output: number of filled entries in the drbs[] array. + * @return + * - number of dequeued objects. + */ +static inline uint32_t +tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj, + struct tle_drb *drbs[], uint32_t *nb_drb) +{ + uint32_t head, next, num, tail; + + head = dr->cons.head; + tail = dr->prod.tail; + + num = RTE_MIN(tail - head, nb_obj); + + /* no objects to dequeue */ + if (num == 0) { + *nb_drb = 0; + return 0; + } + + next = head + num; + + /* update consumer head value. */ + dr->cons.head = next; + + /* dequeue from the ring. */ + *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb); + + /* update consumer tail value. */ + rte_smp_wmb(); + dr->cons.tail = next; + + return num; +} + +/** + * Reset given dring to the initial state. + * Note, that information about all queued objects will be lost. + * + * @param dr + * A pointer to the dring structure. + */ +static inline void +tle_dring_reset(struct tle_dring *dr) +{ + memset(dr, 0, sizeof(*dr)); + dr->prod.crb = &dr->dummy; + dr->cons.crb = &dr->dummy; +} + +/** + * Calculate required size for drb to store up to *num* objects. + * + * @param num + * Number of objects drb should be able to store. + * @return + * - required size of the drb. + */ +static inline size_t +tle_drb_calc_size(uint32_t num) +{ + size_t sz; + + sz = offsetof(struct tle_drb, objs[num]); + return RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE); +} + +/** + * Dump information about the dring to the file. + * + * @param f + * A pointer to the file. + * @param verb + * Verbosity level (currently only 0 or 1). + * @param dr + * A pointer to the dring structure. + */ +extern void tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr); + +#ifdef __cplusplus +} +#endif + +#endif /* _TLE_DRING_H_ */ diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 0000000..fb446b4 --- /dev/null +++ b/test/Makefile @@ -0,0 +1,26 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +ifeq ($(TLDK_ROOT),) +$(error "Please define TLDK_SDK environment variable") +endif + +include $(RTE_SDK)/mk/rte.vars.mk + +DIRS-y += dring + +include $(TLDK_ROOT)/mk/tle.subdir.mk diff --git a/test/dring/Makefile b/test/dring/Makefile new file mode 100644 index 0000000..6822a1f --- /dev/null +++ b/test/dring/Makefile @@ -0,0 +1,42 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ifeq ($(RTE_SDK),) +$(error "Please define RTE_SDK environment variable") +endif + +ifeq ($(RTE_TARGET),) +$(error "Please define RTE_TARGET environment variable") +endif + +ifeq ($(TLDK_ROOT),) +$(error "Please define TLDK_ROOT environment variable") +endif + +include $(RTE_SDK)/mk/rte.vars.mk + +# binary name +APP = test_dring + +# all source are stored in SRCS-y +SRCS-y += test_dring.c + +CFLAGS += $(WERROR_FLAGS) +CFLAGS += -I$(TLDK_ROOT)/$(RTE_TARGET)/include + +LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib +LDLIBS += -ltle_dring + +EXTRA_CFLAGS += -O3 + +include $(RTE_SDK)/mk/rte.extapp.mk diff --git a/test/dring/test_dring.c b/test/dring/test_dring.c new file mode 100644 index 0000000..22d0db4 --- /dev/null +++ b/test/dring/test_dring.c @@ -0,0 +1,525 @@ +/* + * Copyright (c) 2016 Intel Corporation. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define OBJ_NUM UINT16_MAX +#define ITER_NUM (4 * OBJ_NUM) + +enum { + NONE, + SINGLE, + MULTI, +}; + +struct dring_arg { + struct tle_dring *dr; + struct rte_ring *r; + uint32_t iter; + int32_t enq_type; + int32_t deq_type; + uint32_t enq; + uint32_t deq; +}; + +/* + * free memory allocated for drbs and for the ring itself. + */ +static void +fini_drb_ring(struct rte_ring *r) +{ + struct tle_drb *drb; + + /* free drbs. */ + while (rte_ring_dequeue(r, (void **)&drb) == 0) + free(drb); + + /* free ring. */ + free(r); +} + +/* + * allocate drbs for specified number of objects, put them into the ring. + */ +static struct rte_ring * +init_drb_ring(uint32_t num) +{ + uint32_t i, k, n; + size_t sz, tsz; + struct rte_ring *r; + struct tle_drb *drb; + + /* allocate and initialise rte_ring. */ + + n = rte_align32pow2(num); + sz = sizeof(*r) + n * sizeof(r->ring[0]); + + r = calloc(1, sz); + if (r == NULL) { + printf("%s:%d(%u) failed to allocate %zu bytes;\n", + __func__, __LINE__, num, sz); + return NULL; + } + + rte_ring_init(r, __func__, n, 0); + + /* allocate drbs and put them into the ring. */ + + tsz = sz; + for (i = 0; i != num; i += k) { + k = rte_rand() % (UINT8_MAX + 1) + 1; + k = RTE_MIN(k, num - i); + sz = tle_drb_calc_size(k); + drb = calloc(1, sz); + if (drb == NULL) { + printf("%s:%d(%u) %u-th iteration: " + "failed to allocate %zu bytes;\n", + __func__, __LINE__, num, i, sz); + fini_drb_ring(r); + return NULL; + } + drb->size = k; + rte_ring_enqueue(r, drb); + tsz += sz; + } + + printf("%s(%u) total %zu bytes allocated, number of drbs: %u;\n", + __func__, num, tsz, rte_ring_count(r)); + return r; +} + +/* + * Each enqueued object will contain: + * [2-3]B: it's own sequence number. + * [0-1]B: next object sequence number, or UINT16_MAX. + */ +static void +test_fill_obj(uintptr_t obj[], uint32_t num) +{ + uint32_t i; + + for (i = 0; i != num - 1; i++) + obj[i] = i << 16 | (i + 1); + + obj[i] = i << 16 | UINT16_MAX; +} + +static uint32_t +test_check_obj(uintptr_t obj[], uint32_t num) +{ + uint32_t i, h, l, oh, ol; + + h = obj[0] >> 16; + l = obj[0] & UINT16_MAX; + + if (h + 1 != l && l != UINT16_MAX) + return 0; + + if (l == UINT16_MAX) + l = 0; + + for (i = 1; i != num; i++) { + + oh = obj[i] >> 16; + ol = obj[i] & UINT16_MAX; + + if (l != oh || (oh + 1 != ol && ol != UINT16_MAX)) + return i; + + l = ol; + if (l == UINT16_MAX) + l = 0; + } + + return num; +} + +static int +test_dring_dequeue(struct tle_dring *dr, struct rte_ring *r, uint32_t num, + int32_t type) +{ + uint32_t i, k, lc, n, t; + struct tle_drb *drb[num]; + uintptr_t obj[num]; + + lc = rte_lcore_id(); + k = num; + + /* dequeue objects. */ + if (type == SINGLE) + n = tle_dring_sc_dequeue(dr, (const void **)obj, num, drb, &k); + else if (type == MULTI) + n = tle_dring_mc_dequeue(dr, (const void **)obj, num, drb, &k); + else + return -EINVAL; + + if (n == 0) + return 0; + + /* check the data returned. */ + t = test_check_obj(obj, n); + if (t != n) { + printf("%s:%d(%p, %u) at lcore %u: invalid dequeued object, " + "n=%u, idx=%u, obj=%#x, prev obj=%#x;\n", + __func__, __LINE__, dr, num, lc, n, t, + (uint32_t)obj[t], (t == 0) ? 0 : (uint32_t)obj[t - 1]); + return -EFAULT; + } + + /* check and free drbs. */ + for (i = 0; i != k; i++) { + /* udata value for drb in use shouldn't be zero. */ + if (drb[i]->udata == NULL) { + printf("error @ %s:%d(%p, %u) at lcore %u: " + "erroneous drb@%p={udata=%p, size=%u,};\n", + __func__, __LINE__, dr, num, lc, drb[i], + drb[i]->udata, drb[i]->size); + return -EFAULT; + } + drb[i]->udata = NULL; + rte_ring_enqueue(r, drb[i]); + } + + return n; +} + +static int +test_dring_enqueue(struct tle_dring *dr, struct rte_ring *r, uint32_t num, + int32_t type) +{ + uint32_t i, j, k, lc, nb; + struct tle_drb *drb[num]; + uintptr_t obj[num]; + + lc = rte_lcore_id(); + + /* prepare drbs to enqueue up to *num* objects. */ + for (i = 0, j = 0; i != num; i += k, j++) { + + if (rte_ring_dequeue(r, (void **)&drb[j]) != 0) + break; + + /* udata value for unused drb should be zero. */ + if (drb[j]->udata != NULL) { + printf("error @ %s:%d(%p, %u) at lcore %u: " + "erroneous drb@%p={udata=%p, size=%u,};\n", + __func__, __LINE__, dr, num, lc, drb[j], + drb[j]->udata, drb[j]->size); + return -EFAULT; + } + + /* update udata value with current lcore id. */ + drb[j]->udata = (void *)(uintptr_t)(lc + 1); + k = drb[j]->size; + k = RTE_MIN(k, num - i); + } + + /* no free drbs left. */ + if (i == 0) + return 0; + + /* fill objects to enqueue. */ + test_fill_obj(obj, i); + + /* enqueue into the dring. */ + nb = j; + if (type == SINGLE) + k = tle_dring_sp_enqueue(dr, (const void **)obj, i, drb, &nb); + else if (type == MULTI) + k = tle_dring_mp_enqueue(dr, (const void **)obj, i, drb, &nb); + else + return -EINVAL; + + if (k != i) { + printf("%s:%d(%p, %p, %u): failed to enqueue %u objects;\n", + __func__, __LINE__, dr, r, num, i); + } + + /* free unused drbs */ + for (i = j - nb; i != j; i++) { + if ((uintptr_t)drb[i]->udata != lc + 1) { + printf("error @ %s:%d(%p, %u) at lcore %u: " + "erroneous drb@%p={udata=%p, size=%u,};\n", + __func__, __LINE__, dr, num, lc, drb[i], + drb[i]->udata, drb[i]->size); + return -EFAULT; + } + drb[i]->udata = NULL; + rte_ring_enqueue(r, drb[i]); + } + + return k; +} + +static int +test_dring_enq_deq(struct dring_arg *arg) +{ + int32_t rc; + uint32_t i, lc, n; + + rc = 0; + arg->enq = 0; + arg->deq = 0; + lc = rte_lcore_id(); + + for (i = 0; i != arg->iter; i++) { + + /* try to enqueue random number of objects. */ + if (arg->enq_type != NONE) { + n = rte_rand() % (UINT8_MAX + 1); + rc = test_dring_enqueue(arg->dr, arg->r, n, + arg->enq_type); + if (rc < 0) + break; + arg->enq += rc; + } + + /* try to dequeue random number of objects. */ + if (arg->deq_type != NONE) { + n = rte_rand() % (UINT8_MAX + 1); + rc = test_dring_dequeue(arg->dr, arg->r, n, + arg->deq_type); + if (rc < 0) + break; + arg->deq += rc; + } + } + + if (rc < 0) + return rc; + + /* dequeue remaining objects. */ + while (arg->deq_type != NONE && arg->enq != arg->deq) { + + /* try to dequeue random number of objects. */ + n = rte_rand() % (UINT8_MAX + 1) + 1; + rc = test_dring_dequeue(arg->dr, arg->r, n, arg->deq_type); + if (rc <= 0) + break; + arg->deq += rc; + } + + printf("%s:%d(lcore=%u, enq_type=%d, deq_type=%d): " + "%u objects enqueued, %u objects dequeued\n", + __func__, __LINE__, lc, arg->enq_type, arg->deq_type, + arg->enq, arg->deq); + return 0; +} + +/* + * enqueue/dequeue by single thread. + */ +static int +test_dring_st(void) +{ + int32_t rc; + struct rte_ring *r; + struct tle_dring dr; + struct dring_arg arg; + + printf("%s started;\n", __func__); + + tle_dring_reset(&dr); + r = init_drb_ring(OBJ_NUM); + if (r == NULL) + return -ENOMEM; + + tle_dring_dump(stdout, 1, &dr); + + memset(&arg, 0, sizeof(arg)); + arg.dr = &dr; + arg.r = r; + arg.iter = ITER_NUM; + arg.enq_type = SINGLE; + arg.deq_type = SINGLE; + rc = test_dring_enq_deq(&arg); + + rc = (rc != 0) ? rc : (arg.enq != arg.deq); + printf("%s finished with status: %s(%d);\n", + __func__, strerror(-rc), rc); + + tle_dring_dump(stdout, rc != 0, &dr); + fini_drb_ring(r); + + return rc; +} + +static int +test_dring_worker(void *arg) +{ + struct dring_arg *p; + + p = (struct dring_arg *)arg; + return test_dring_enq_deq(p); +} + +/* + * enqueue/dequeue by multiple threads. + */ +static int +test_dring_mt(int32_t master_enq_type, int32_t master_deq_type, + int32_t slave_enq_type, int32_t slave_deq_type) +{ + int32_t rc; + uint32_t lc; + uint64_t deq, enq; + struct rte_ring *r; + struct tle_dring dr; + struct dring_arg arg[RTE_MAX_LCORE]; + + tle_dring_reset(&dr); + r = init_drb_ring(OBJ_NUM); + if (r == NULL) + return -ENOMEM; + + memset(arg, 0, sizeof(arg)); + + /* launch on all slaves */ + RTE_LCORE_FOREACH_SLAVE(lc) { + arg[lc].dr = &dr; + arg[lc].r = r; + arg[lc].iter = ITER_NUM; + arg[lc].enq_type = slave_enq_type; + arg[lc].deq_type = slave_deq_type; + rte_eal_remote_launch(test_dring_worker, &arg[lc], lc); + } + + /* launch on master */ + lc = rte_lcore_id(); + arg[lc].dr = &dr; + arg[lc].r = r; + arg[lc].iter = ITER_NUM; + arg[lc].enq_type = master_enq_type; + arg[lc].deq_type = master_deq_type; + rc = test_dring_worker(&arg[lc]); + enq = arg[lc].enq; + deq = arg[lc].deq; + + /* wait for slaves. */ + RTE_LCORE_FOREACH_SLAVE(lc) { + rc |= rte_eal_wait_lcore(lc); + enq += arg[lc].enq; + deq += arg[lc].deq; + } + + printf("%s:%d: total %" PRIu64 " objects enqueued, %" + PRIu64 " objects dequeued\n", + __func__, __LINE__, enq, deq); + + rc = (rc != 0) ? rc : (enq != deq); + if (rc != 0) + tle_dring_dump(stdout, 1, &dr); + + fini_drb_ring(r); + return rc; +} + +static int +test_dring_mp_mc(void) +{ + int32_t rc; + + printf("%s started;\n", __func__); + rc = test_dring_mt(MULTI, MULTI, MULTI, MULTI); + printf("%s finished with status: %s(%d);\n", + __func__, strerror(-rc), rc); + return rc; +} + +static int +test_dring_mp_sc(void) +{ + int32_t rc; + + printf("%s started;\n", __func__); + rc = test_dring_mt(MULTI, SINGLE, MULTI, NONE); + printf("%s finished with status: %s(%d);\n", + __func__, strerror(-rc), rc); + return rc; +} + +static int +test_dring_sp_mc(void) +{ + int32_t rc; + + printf("%s started;\n", __func__); + rc = test_dring_mt(SINGLE, MULTI, NONE, MULTI); + printf("%s finished with status: %s(%d);\n", + __func__, strerror(-rc), rc); + return rc; +} + +static int +test_dring(void) +{ + int32_t rc; + + rc = test_dring_st(); + if (rc != 0) + return rc; + + rc = test_dring_mp_mc(); + if (rc != 0) + return rc; + + rc = test_dring_mp_sc(); + if (rc != 0) + return rc; + + rc = test_dring_sp_mc(); + if (rc != 0) + return rc; + + return 0; +} + +int +main(int argc, char *argv[]) +{ + int32_t rc; + + rc = rte_eal_init(argc, argv); + if (rc < 0) + rte_exit(EXIT_FAILURE, + "%s: rte_eal_init failed with error code: %d\n", + __func__, rc); + + rc = test_dring(); + if (rc != 0) + printf("TEST FAILED\n"); + else + printf("TEST OK\n"); + + return rc; +} -- cgit 1.2.3-korg