summaryrefslogtreecommitdiffstats
path: root/libparc/parc/concurrent/parc_RingBuffer_1x1.c
diff options
context:
space:
mode:
Diffstat (limited to 'libparc/parc/concurrent/parc_RingBuffer_1x1.c')
-rwxr-xr-xlibparc/parc/concurrent/parc_RingBuffer_1x1.c235
1 files changed, 235 insertions, 0 deletions
diff --git a/libparc/parc/concurrent/parc_RingBuffer_1x1.c b/libparc/parc/concurrent/parc_RingBuffer_1x1.c
new file mode 100755
index 00000000..a023f4d7
--- /dev/null
+++ b/libparc/parc/concurrent/parc_RingBuffer_1x1.c
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A thread-safe fixed size ring buffer.
+ *
+ * A single-producer/single-consumer version is lock-free, along the lines of Lamport, "Proving the
+ * Correctness of Multiprocess Programs," IEEE Trans on Software Engineering 3(2), Mar 1977, which
+ * is based on reading/writing native types upto the data bus width being atomic operations.
+ *
+ * It can hold (elements-1) data items. elements must be a power of 2.
+ *
+ * The writer_head is where the next element should be inserted. The reader_tail is where the next element
+ * should be read.
+ *
+ * All index variables are unbounded uint32_t. This means they just keep counting up. To get the actual
+ * index in the ring, we mask with (elements-1). For example, a ring with 16 elements will be masked with
+ * 0x0000000F. We call this the "ring_mask".
+ *
+ * Because we never let the writer_head and reader_tail differ by more than (elements-1), this technique of
+ * masking works just the same as taking the modulus. There's no problems at the uint32_t wraparound either.
+ * The only math operation we are doing is "+1" which works just fine to wrap a uint32_t.
+ *
+ * Let's look at some exampls. I'm going to use a uint16_t so its easier to write the numbers. Let's assume
+ * that the ring size is 16, so the first ring is (0 - 15).
+ * head tail
+ * initialize 0 0
+ * put x 3 3 0
+ * get x 2 3 2
+ * put x 13 16 2
+ * put x 1 17 2
+ * put x 1 blocks # (0x11 + 1) & 0x0F == tail & 0x0F
+ * get x 14 17 16
+ * get x 1 17 17 # ring is now empty
+ * ...
+ * empty 65534 65534 # 0xFFFE 0xFFFE masked = 14 14
+ * put x1 65535 65534 # 0xFFFF 0xFFFE masked = 15 14
+ * put x1 0 65534 # 0x0000 0xFFFE masked = 0 14
+ * ...
+ *
+ * The number of remaining available items is (ring_mask + reader_tail - writer_head) & ring_mask.
+ * head tail remaining
+ * initialize 0 0 15 + 0 - 0 = 15
+ * put x 3 3 0 15 + 0 - 3 = 12
+ * get x 2 3 2
+ * put x 13 16 2 15 + 2 - 16 = 1
+ * put x 1 17 2 15 + 2 - 17 = 0
+ * put x 1 blocks
+ * get x 14 17 16 15 + 16 - 17 = 14
+ * get x 1 17 17 15 + 17 - 17 = 15
+ * ...
+ * empty 65534 65534 15 + 65534 - 65534 = 13 - 65534 = 13 - (-2) = 15
+ * put x1 65535 65534 15 + 65534 - 65535 = 13 - 65535 = 13 - (-1) = 14
+ * put x1 0 65534 15 + 65534 - 0 = 13 - 65535 = 13 - ( 0) = 13
+ * ...
+ *
+ * If (writer_head + 1) & ring_mask == reader_tail, then the ring is full.
+ * If writer_head == reader_tail, then the ring is empty.
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_Object.h>
+#include <LongBow/runtime.h>
+
+#include <parc/concurrent/parc_RingBuffer_1x1.h>
+
+#ifdef __GNUC__
+
+// on x86 or x86_64, simple assignment will work
+#if (__x86_64__ || __i386__)
+#define ATOMIC_ADD_AND_FETCH(ptr, increment) __sync_add_and_fetch(ptr, increment)
+#define ATOMIC_BOOL_CAS(ptr, oldvalue, newvalue) __sync_bool_compare_and_swap(ptr, oldvalue, newvalue)
+#define ATOMIC_FETCH(ptr) *(ptr)
+#define ATOMIC_SET(ptr, oldvalue, newvalue) *(ptr) = newvalue
+#else
+#define ATOMIC_ADD_AND_FETCH(ptr, increment) __sync_add_and_fetch(ptr, increment)
+#define ATOMIC_BOOL_CAS(ptr, oldvalue, newvalue) __sync_bool_compare_and_swap(ptr, oldvalue, newvalue)
+#define ATOMIC_FETCH(ptr) ATOMIC_ADD_AND_FETCH(ptr, 0)
+#define ATOMIC_SET(ptr, oldvalue, newvalue) ATOMIC_BOOL_CAS(ptr, oldvalue, newvalue)
+#endif
+
+#else
+#error "Only GNUC supported, we need atomic operations"
+#endif
+
+struct parc_ringbuffer_1x1 {
+ // LP64 LP32
+ volatile uint32_t writer_head; // 0- 3 0
+ volatile uint32_t reader_tail; // 4- 7 4
+ uint32_t elements; // 8-11 8
+ uint32_t ring_mask; // 12-15 12
+
+ RingBufferEntryDestroyer *destroyer; // 16-23 16
+ void **buffer; // 24-31 24
+};
+
+static bool
+_isPowerOfTwo(uint32_t x)
+{
+ return ((x != 0) && !(x & (x - 1)));
+}
+
+static void
+_destroy(PARCRingBuffer1x1 **ringptr)
+{
+ PARCRingBuffer1x1 *ring = *ringptr;
+
+ if (ring->destroyer) {
+ void *ptr = NULL;
+ while (parcRingBuffer1x1_Get(ring, &ptr)) {
+ ring->destroyer(&ptr);
+ }
+ }
+ parcMemory_Deallocate((void **) &(ring->buffer));
+}
+
+parcObject_ExtendPARCObject(PARCRingBuffer1x1, _destroy, NULL, NULL, NULL, NULL, NULL, NULL);
+
+static PARCRingBuffer1x1 *
+_create(uint32_t elements, RingBufferEntryDestroyer *destroyer)
+{
+ PARCRingBuffer1x1 *ring = parcObject_CreateInstance(PARCRingBuffer1x1);
+ assertNotNull(ring, "parcObject_Create returned NULL");
+
+ ring->buffer = parcMemory_AllocateAndClear(sizeof(void *) * elements);
+ assertNotNull((ring->buffer), "parcMemory_AllocateAndClear() failed to allocate array of %u pointers", elements);
+
+ ring->writer_head = 0;
+ ring->reader_tail = 0;
+ ring->elements = elements;
+ ring->destroyer = destroyer;
+ ring->ring_mask = elements - 1;
+
+ return ring;
+}
+
+PARCRingBuffer1x1 *
+parcRingBuffer1x1_Create(uint32_t elements, RingBufferEntryDestroyer *destroyer)
+{
+ assertTrue(_isPowerOfTwo(elements), "Parameter elements must be a power of 2, got %u", elements);
+ return _create(elements, destroyer);
+}
+
+
+parcObject_ImplementAcquire(parcRingBuffer1x1, PARCRingBuffer1x1);
+
+parcObject_ImplementRelease(parcRingBuffer1x1, PARCRingBuffer1x1);
+
+/**
+ * Put is protected by the writer mutex. This means that the tail mutex could
+ * actually increase while this is happening. That's ok. Increasing the tail
+ * just means there is _more_ room in the ring. We only modify writer_head.
+ */
+bool
+parcRingBuffer1x1_Put(PARCRingBuffer1x1 *ring, void *data)
+{
+ // Our speculative operation
+ // The consumer modifies reader_tail, so make sure that's an atomic read.
+ // only the prodcuer modifies writer_head, so there's only us
+
+ uint32_t writer_head = ring->writer_head;
+ uint32_t reader_tail = ATOMIC_FETCH(&ring->reader_tail);
+
+ uint32_t writer_next = (writer_head + 1) & ring->ring_mask;
+
+ // ring is full
+ if (writer_next == reader_tail) {
+ return false;
+ }
+
+ assertNull(ring->buffer[writer_head], "Ring index %u is not null!", writer_head);
+ ring->buffer[writer_head] = data;
+
+ // we're using this just for atomic write to the integer
+ ATOMIC_SET(&ring->writer_head, writer_head, writer_next);
+
+ return true;
+}
+
+bool
+parcRingBuffer1x1_Get(PARCRingBuffer1x1 *ring, void **outputDataPtr)
+{
+ // do our speculative operation.
+ // The producer modifies writer_head, so make sure that's an atomic read.
+ // only the consumer modifies reader_tail, so there's only us
+
+ uint32_t writer_head = ATOMIC_FETCH(&ring->writer_head); // native type assignment is atomic
+ uint32_t reader_tail = ring->reader_tail;
+ uint32_t reader_next = (reader_tail + 1) & ring->ring_mask;
+
+ // ring is empty
+ if (writer_head == reader_tail) {
+ return false;
+ }
+
+ // now try to commit it
+ ATOMIC_SET(&ring->reader_tail, reader_tail, reader_next);
+
+ *outputDataPtr = ring->buffer[reader_tail];
+
+ // for sanity's sake
+ ring->buffer[reader_tail] = NULL;
+
+ return true;
+}
+
+uint32_t
+parcRingBuffer1x1_Remaining(PARCRingBuffer1x1 *ring)
+{
+ uint32_t writer_head = ATOMIC_FETCH(&ring->writer_head);
+ uint32_t reader_tail = ATOMIC_FETCH(&ring->reader_tail);
+
+ return (ring->ring_mask + reader_tail - writer_head) & ring->ring_mask;
+}