diff options
Diffstat (limited to 'libparc/parc/concurrent/parc_RingBuffer_NxM.c')
-rwxr-xr-x | libparc/parc/concurrent/parc_RingBuffer_NxM.c | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/libparc/parc/concurrent/parc_RingBuffer_NxM.c b/libparc/parc/concurrent/parc_RingBuffer_NxM.c new file mode 100755 index 00000000..15ec849b --- /dev/null +++ b/libparc/parc/concurrent/parc_RingBuffer_NxM.c @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A thread-safe fixed size ring buffer. + * + * The multiple producer, multiple consumer version uses a pthread mutex around a NxM ring buffer. + * + */ + +#include <config.h> +#include <stdio.h> +#include <stdlib.h> +#include <pthread.h> +#include <errno.h> +#include <string.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_Object.h> +#include <LongBow/runtime.h> + +#include <parc/concurrent/parc_RingBuffer_1x1.h> +#include <parc/concurrent/parc_RingBuffer_NxM.h> + +struct parc_ringbuffer_NxM { + PARCRingBuffer1x1 *onebyone; + + // This protectes the overall data structure for Acquire and Release + pthread_mutex_t allocation_mutex; + + pthread_mutex_t writer_mutex; + pthread_mutex_t reader_mutex; + + RingBufferEntryDestroyer *destroyer; +}; + +/* + * Attemps a lock and returns false if we cannot get it. + * + * @endcode + */ +static bool +_lock(pthread_mutex_t *mutex) +{ + int failure = pthread_mutex_lock(mutex); + assertFalse(failure, "Error locking mutex: (%d) %s\n", errno, strerror(errno)); + return true; +} + +static bool +_unlock(pthread_mutex_t *mutex) +{ + int failure = pthread_mutex_unlock(mutex); + assertFalse(failure, "Error unlocking mutex: (%d) %s\n", errno, strerror(errno)); + return true; +} + +static void +_destroy(PARCRingBufferNxM **ringptr) +{ + PARCRingBufferNxM *ring = *ringptr; + + if (ring->destroyer) { + void *ptr = NULL; + while (parcRingBufferNxM_Get(ring, &ptr)) { + ring->destroyer(&ptr); + } + } + parcRingBuffer1x1_Release(&ring->onebyone); +} + + +parcObject_ExtendPARCObject(PARCRingBufferNxM, _destroy, NULL, NULL, NULL, NULL, NULL, NULL); + +static PARCRingBufferNxM * +_create(uint32_t elements, RingBufferEntryDestroyer *destroyer) +{ + PARCRingBufferNxM *ring = parcObject_CreateInstance(PARCRingBufferNxM); + assertNotNull(ring, "parcObject_Create returned NULL"); + + ring->onebyone = parcRingBuffer1x1_Create(elements, destroyer); + ring->destroyer = destroyer; + pthread_mutex_init(&ring->allocation_mutex, NULL); + pthread_mutex_init(&ring->writer_mutex, NULL); + pthread_mutex_init(&ring->reader_mutex, NULL); + return ring; +} + +PARCRingBufferNxM * +parcRingBufferNxM_Create(uint32_t elements, RingBufferEntryDestroyer *destroyer) +{ + return _create(elements, destroyer); +} + +PARCRingBufferNxM * +parcRingBufferNxM_Acquire(PARCRingBufferNxM *ring) +{ + PARCRingBufferNxM *acquired; + + _lock(&ring->allocation_mutex); + acquired = parcObject_Acquire(ring); + _unlock(&ring->allocation_mutex); + + return acquired; +} + +void +parcRingBufferNxM_Release(PARCRingBufferNxM **ringPtr) +{ + PARCRingBufferNxM *ring = *ringPtr; + _lock(&ring->allocation_mutex); + parcObject_Release((void **) ringPtr); + _unlock(&ring->allocation_mutex); +} + +/** + * Put is protected by the writer mutex. This means that the tail mutex could + * actually increase while this is happening. That's ok. Increasing the tail + * just means there is _more_ room in the ring. We only modify writer_head. + */ +bool +parcRingBufferNxM_Put(PARCRingBufferNxM *ring, void *data) +{ + // **** LOCK + _lock(&ring->writer_mutex); + bool success = parcRingBuffer1x1_Put(ring->onebyone, data); + // **** UNLOCK + _unlock(&ring->writer_mutex); + return success; +} + +bool +parcRingBufferNxM_Get(PARCRingBufferNxM *ring, void **outputDataPtr) +{ + // **** LOCK + _lock(&ring->reader_mutex); + bool success = parcRingBuffer1x1_Get(ring->onebyone, outputDataPtr); + // **** UNLOCK + _unlock(&ring->reader_mutex); + return success; +} + +uint32_t +parcRingBufferNxM_Remaining(PARCRingBufferNxM *ring) +{ + _lock(&ring->writer_mutex); + _lock(&ring->reader_mutex); + + uint32_t remaining = parcRingBuffer1x1_Remaining(ring->onebyone); + + _unlock(&ring->reader_mutex); + _unlock(&ring->writer_mutex); + + return remaining; +} |