diff options
Diffstat (limited to 'vlib-api/vlibmemory/unix_shared_memory_queue.c')
-rw-r--r-- | vlib-api/vlibmemory/unix_shared_memory_queue.c | 389 |
1 files changed, 211 insertions, 178 deletions
diff --git a/vlib-api/vlibmemory/unix_shared_memory_queue.c b/vlib-api/vlibmemory/unix_shared_memory_queue.c index b8182658f65..25d28910706 100644 --- a/vlib-api/vlibmemory/unix_shared_memory_queue.c +++ b/vlib-api/vlibmemory/unix_shared_memory_queue.c @@ -1,4 +1,4 @@ -/* +/* *------------------------------------------------------------------ * unix_shared_memory_queue.c - unidirectional shared-memory queues * @@ -29,7 +29,7 @@ /* * unix_shared_memory_queue_init - * + * * nels = number of elements on the queue * elsize = element size, presumably 4 and cacheline-size will * be popular choices. @@ -46,246 +46,279 @@ * The spp process / main thread allocates one of these * at startup; its main input queue. The spp main input queue * has a pointer to it in the shared memory segment header. - * - * You probably want to be on an svm data heap before calling this + * + * You probably want to be on an svm data heap before calling this * function. */ unix_shared_memory_queue_t * -unix_shared_memory_queue_init(int nels, - int elsize, - int consumer_pid, - int signal_when_queue_non_empty) +unix_shared_memory_queue_init (int nels, + int elsize, + int consumer_pid, + int signal_when_queue_non_empty) { - unix_shared_memory_queue_t *q; - pthread_mutexattr_t attr; - pthread_condattr_t cattr; - - q = clib_mem_alloc_aligned(sizeof(unix_shared_memory_queue_t) - + nels*elsize, CLIB_CACHE_LINE_BYTES); - memset(q, 0, sizeof (*q)); - - q->elsize = elsize; - q->maxsize = nels; - q->consumer_pid = consumer_pid; - q->signal_when_queue_non_empty = signal_when_queue_non_empty; - - memset(&attr,0,sizeof(attr)); - memset(&cattr,0,sizeof(attr)); - - if (pthread_mutexattr_init(&attr)) - clib_unix_warning("mutexattr_init"); - if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) - clib_unix_warning("pthread_mutexattr_setpshared"); - if (pthread_mutex_init(&q->mutex, &attr)) - clib_unix_warning("mutex_init"); - if (pthread_mutexattr_destroy(&attr)) - clib_unix_warning("mutexattr_destroy"); - if (pthread_condattr_init(&cattr)) - clib_unix_warning("condattr_init"); - /* prints funny-looking messages in the Linux target */ - if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED)) - clib_unix_warning("condattr_setpshared"); - if (pthread_cond_init(&q->condvar, &cattr)) - clib_unix_warning("cond_init1"); - if(pthread_condattr_destroy(&cattr)) - clib_unix_warning("cond_init2"); - - return(q); + unix_shared_memory_queue_t *q; + pthread_mutexattr_t attr; + pthread_condattr_t cattr; + + q = clib_mem_alloc_aligned (sizeof (unix_shared_memory_queue_t) + + nels * elsize, CLIB_CACHE_LINE_BYTES); + memset (q, 0, sizeof (*q)); + + q->elsize = elsize; + q->maxsize = nels; + q->consumer_pid = consumer_pid; + q->signal_when_queue_non_empty = signal_when_queue_non_empty; + + memset (&attr, 0, sizeof (attr)); + memset (&cattr, 0, sizeof (attr)); + + if (pthread_mutexattr_init (&attr)) + clib_unix_warning ("mutexattr_init"); + if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED)) + clib_unix_warning ("pthread_mutexattr_setpshared"); + if (pthread_mutex_init (&q->mutex, &attr)) + clib_unix_warning ("mutex_init"); + if (pthread_mutexattr_destroy (&attr)) + clib_unix_warning ("mutexattr_destroy"); + if (pthread_condattr_init (&cattr)) + clib_unix_warning ("condattr_init"); + /* prints funny-looking messages in the Linux target */ + if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED)) + clib_unix_warning ("condattr_setpshared"); + if (pthread_cond_init (&q->condvar, &cattr)) + clib_unix_warning ("cond_init1"); + if (pthread_condattr_destroy (&cattr)) + clib_unix_warning ("cond_init2"); + + return (q); } /* * unix_shared_memory_queue_free */ -void unix_shared_memory_queue_free(unix_shared_memory_queue_t *q) +void +unix_shared_memory_queue_free (unix_shared_memory_queue_t * q) { - (void) pthread_mutex_destroy(&q->mutex); - (void) pthread_cond_destroy(&q->condvar); - clib_mem_free(q); + (void) pthread_mutex_destroy (&q->mutex); + (void) pthread_cond_destroy (&q->condvar); + clib_mem_free (q); } -void unix_shared_memory_queue_lock (unix_shared_memory_queue_t *q) +void +unix_shared_memory_queue_lock (unix_shared_memory_queue_t * q) { - pthread_mutex_lock(&q->mutex); + pthread_mutex_lock (&q->mutex); } -void unix_shared_memory_queue_unlock (unix_shared_memory_queue_t *q) +void +unix_shared_memory_queue_unlock (unix_shared_memory_queue_t * q) { - pthread_mutex_unlock(&q->mutex); + pthread_mutex_unlock (&q->mutex); } -int unix_shared_memory_queue_is_full (unix_shared_memory_queue_t *q) +int +unix_shared_memory_queue_is_full (unix_shared_memory_queue_t * q) { - return q->cursize == q->maxsize; + return q->cursize == q->maxsize; } /* * unix_shared_memory_queue_add_nolock */ -int unix_shared_memory_queue_add_nolock (unix_shared_memory_queue_t *q, - u8 *elem) +int +unix_shared_memory_queue_add_nolock (unix_shared_memory_queue_t * q, + u8 * elem) { - i8 *tailp; - int need_broadcast=0; - - if (PREDICT_FALSE(q->cursize == q->maxsize)) { - while(q->cursize == q->maxsize) { - (void) pthread_cond_wait(&q->condvar, &q->mutex); - } + i8 *tailp; + int need_broadcast = 0; + + if (PREDICT_FALSE (q->cursize == q->maxsize)) + { + while (q->cursize == q->maxsize) + { + (void) pthread_cond_wait (&q->condvar, &q->mutex); + } } - - tailp = (i8 *)(&q->data[0] + q->elsize*q->tail); - clib_memcpy(tailp, elem, q->elsize); - q->tail++; - q->cursize++; + tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); + clib_memcpy (tailp, elem, q->elsize); - need_broadcast = (q->cursize == 1); + q->tail++; + q->cursize++; - if (q->tail == q->maxsize) - q->tail = 0; + need_broadcast = (q->cursize == 1); - if (need_broadcast) { - (void) pthread_cond_broadcast(&q->condvar); - if (q->signal_when_queue_non_empty) - kill (q->consumer_pid, q->signal_when_queue_non_empty); + if (q->tail == q->maxsize) + q->tail = 0; + + if (need_broadcast) + { + (void) pthread_cond_broadcast (&q->condvar); + if (q->signal_when_queue_non_empty) + kill (q->consumer_pid, q->signal_when_queue_non_empty); } - return 0; + return 0; } -int unix_shared_memory_queue_add_raw (unix_shared_memory_queue_t *q, - u8 *elem) +int +unix_shared_memory_queue_add_raw (unix_shared_memory_queue_t * q, u8 * elem) { - i8 *tailp; - - if (PREDICT_FALSE(q->cursize == q->maxsize)) { - while(q->cursize == q->maxsize) - ; + i8 *tailp; + + if (PREDICT_FALSE (q->cursize == q->maxsize)) + { + while (q->cursize == q->maxsize) + ; } - - tailp = (i8 *)(&q->data[0] + q->elsize*q->tail); - clib_memcpy(tailp, elem, q->elsize); - q->tail++; - q->cursize++; + tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); + clib_memcpy (tailp, elem, q->elsize); - if (q->tail == q->maxsize) - q->tail = 0; - return 0; + q->tail++; + q->cursize++; + + if (q->tail == q->maxsize) + q->tail = 0; + return 0; } /* * unix_shared_memory_queue_add */ -int unix_shared_memory_queue_add (unix_shared_memory_queue_t *q, - u8 *elem, int nowait) +int +unix_shared_memory_queue_add (unix_shared_memory_queue_t * q, + u8 * elem, int nowait) { - i8 *tailp; - int need_broadcast=0; - - if (nowait) { - /* zero on success */ - if (pthread_mutex_trylock (&q->mutex)) { - return (-1); - } - } else - pthread_mutex_lock(&q->mutex); - - if (PREDICT_FALSE(q->cursize == q->maxsize)) { - if (nowait) { - pthread_mutex_unlock(&q->mutex); - return (-2); - } - while(q->cursize == q->maxsize) { - (void) pthread_cond_wait(&q->condvar, &q->mutex); - } + i8 *tailp; + int need_broadcast = 0; + + if (nowait) + { + /* zero on success */ + if (pthread_mutex_trylock (&q->mutex)) + { + return (-1); + } + } + else + pthread_mutex_lock (&q->mutex); + + if (PREDICT_FALSE (q->cursize == q->maxsize)) + { + if (nowait) + { + pthread_mutex_unlock (&q->mutex); + return (-2); + } + while (q->cursize == q->maxsize) + { + (void) pthread_cond_wait (&q->condvar, &q->mutex); + } } - - tailp = (i8 *)(&q->data[0] + q->elsize*q->tail); - clib_memcpy(tailp, elem, q->elsize); - q->tail++; - q->cursize++; + tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); + clib_memcpy (tailp, elem, q->elsize); + + q->tail++; + q->cursize++; - need_broadcast = (q->cursize == 1); + need_broadcast = (q->cursize == 1); - if (q->tail == q->maxsize) - q->tail = 0; + if (q->tail == q->maxsize) + q->tail = 0; - if (need_broadcast) { - (void) pthread_cond_broadcast(&q->condvar); - if (q->signal_when_queue_non_empty) - kill (q->consumer_pid, q->signal_when_queue_non_empty); + if (need_broadcast) + { + (void) pthread_cond_broadcast (&q->condvar); + if (q->signal_when_queue_non_empty) + kill (q->consumer_pid, q->signal_when_queue_non_empty); } - pthread_mutex_unlock(&q->mutex); + pthread_mutex_unlock (&q->mutex); - return 0; + return 0; } /* * unix_shared_memory_queue_sub */ -int unix_shared_memory_queue_sub(unix_shared_memory_queue_t *q, - u8 *elem, int nowait) +int +unix_shared_memory_queue_sub (unix_shared_memory_queue_t * q, + u8 * elem, int nowait) { - i8 *headp; - int need_broadcast=0; - - if (nowait) { - /* zero on success */ - if (pthread_mutex_trylock (&q->mutex)) { - return (-1); - } - } else - pthread_mutex_lock(&q->mutex); - - if (PREDICT_FALSE(q->cursize == 0)) { - if (nowait) { - pthread_mutex_unlock(&q->mutex); - return (-2); - } - while (q->cursize == 0) { - (void) pthread_cond_wait(&q->condvar, &q->mutex); - } + i8 *headp; + int need_broadcast = 0; + + if (nowait) + { + /* zero on success */ + if (pthread_mutex_trylock (&q->mutex)) + { + return (-1); + } } - - headp = (i8 *)(&q->data[0] + q->elsize*q->head); - clib_memcpy(elem, headp, q->elsize); - - q->head++; - if (q->cursize == q->maxsize) - need_broadcast = 1; + else + pthread_mutex_lock (&q->mutex); + + if (PREDICT_FALSE (q->cursize == 0)) + { + if (nowait) + { + pthread_mutex_unlock (&q->mutex); + return (-2); + } + while (q->cursize == 0) + { + (void) pthread_cond_wait (&q->condvar, &q->mutex); + } + } + + headp = (i8 *) (&q->data[0] + q->elsize * q->head); + clib_memcpy (elem, headp, q->elsize); - q->cursize--; + q->head++; + if (q->cursize == q->maxsize) + need_broadcast = 1; - if(q->head == q->maxsize) - q->head = 0; + q->cursize--; - if (need_broadcast) - (void) pthread_cond_broadcast(&q->condvar); - - pthread_mutex_unlock(&q->mutex); + if (q->head == q->maxsize) + q->head = 0; - return 0; + if (need_broadcast) + (void) pthread_cond_broadcast (&q->condvar); + + pthread_mutex_unlock (&q->mutex); + + return 0; } -int unix_shared_memory_queue_sub_raw (unix_shared_memory_queue_t *q, - u8 *elem) +int +unix_shared_memory_queue_sub_raw (unix_shared_memory_queue_t * q, u8 * elem) { - i8 *headp; - - if (PREDICT_FALSE(q->cursize == 0)) { - while (q->cursize == 0) - ; + i8 *headp; + + if (PREDICT_FALSE (q->cursize == 0)) + { + while (q->cursize == 0) + ; } - - headp = (i8 *)(&q->data[0] + q->elsize*q->head); - clib_memcpy(elem, headp, q->elsize); - - q->head++; - q->cursize--; - - if(q->head == q->maxsize) - q->head = 0; - return 0; + + headp = (i8 *) (&q->data[0] + q->elsize * q->head); + clib_memcpy (elem, headp, q->elsize); + + q->head++; + q->cursize--; + + if (q->head == q->maxsize) + q->head = 0; + return 0; } + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ |