From 3fca567ff438145e28dd1318ad5b1734c1091257 Mon Sep 17 00:00:00 2001 From: Mohsin Kazmi Date: Thu, 4 Jan 2018 18:57:26 +0100 Subject: svm: queue sub: Add conditional timed wait On reviece side svm queue only permits blocking and non-blocking calls. This patch adds timed wait blocking functionality which returns either on signal/event or on given time out. It also preserves the original behavior, so it will not hurt client applications which are using svm queue. Change-Id: Ic10632170330a80afb8bc781d4ccddfe4da2c69a Signed-off-by: Mohsin Kazmi --- src/svm/queue.c | 31 ++++++++++++++++++++++++++----- src/svm/queue.h | 21 ++++++++++++++++++++- src/tests/vnet/session/tcp_echo.c | 6 +++--- src/tests/vnet/session/udp_echo.c | 2 +- src/vlibapi/api_shared.c | 2 +- src/vlibmemory/memory_client.c | 4 ++-- src/vlibmemory/memory_shared.c | 2 +- src/vpp-api/client/client.c | 5 +++-- src/vpp-api/vapi/vapi.c | 9 ++++++--- src/vpp-api/vapi/vapi.h | 6 +++++- src/vpp-api/vapi/vapi.hpp | 5 +++-- 11 files changed, 71 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/svm/queue.c b/src/svm/queue.c index aef409277db..96e40fc2aec 100644 --- a/src/svm/queue.c +++ b/src/svm/queue.c @@ -26,6 +26,7 @@ #include #include #include +#include #include /* @@ -299,12 +300,14 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait) * svm_queue_sub */ int -svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait) +svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, + u32 time) { i8 *headp; int need_broadcast = 0; + int rc = 0; - if (nowait) + if (cond == SVM_Q_NOWAIT) { /* zero on success */ if (pthread_mutex_trylock (&q->mutex)) @@ -317,14 +320,32 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait) if (PREDICT_FALSE (q->cursize == 0)) { - if (nowait) + if (cond == SVM_Q_NOWAIT) { pthread_mutex_unlock (&q->mutex); return (-2); } - while (q->cursize == 0) + else if (cond == SVM_Q_TIMEDWAIT) { - (void) pthread_cond_wait (&q->condvar, &q->mutex); + struct timespec ts; + ts.tv_sec = unix_time_now () + time; + ts.tv_nsec = 0; + while (q->cursize == 0 && rc == 0) + { + rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts); + } + if (rc == ETIMEDOUT) + { + pthread_mutex_unlock (&q->mutex); + return ETIMEDOUT; + } + } + else + { + while (q->cursize == 0) + { + (void) pthread_cond_wait (&q->condvar, &q->mutex); + } } } diff --git a/src/svm/queue.h b/src/svm/queue.h index dc1fc36e002..856c17237d1 100644 --- a/src/svm/queue.h +++ b/src/svm/queue.h @@ -36,6 +36,24 @@ typedef struct _svm_queue char data[0]; } svm_queue_t; +typedef enum +{ + /** + * blocking call + */ + SVM_Q_WAIT = 0, + + /** + * non-blocking call + */ + SVM_Q_NOWAIT, + + /** + * blocking call, return on signal or time-out + */ + SVM_Q_TIMEDWAIT, +} svm_q_conditional_wait_t; + svm_queue_t *svm_queue_init (int nels, int elsize, int consumer_pid, @@ -43,7 +61,8 @@ svm_queue_t *svm_queue_init (int nels, void svm_queue_free (svm_queue_t * q); int svm_queue_add (svm_queue_t * q, u8 * elem, int nowait); int svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait); -int svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait); +int svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, + u32 time); int svm_queue_sub2 (svm_queue_t * q, u8 * elem); void svm_queue_lock (svm_queue_t * q); void svm_queue_unlock (svm_queue_t * q); diff --git a/src/tests/vnet/session/tcp_echo.c b/src/tests/vnet/session/tcp_echo.c index ed9e2f7b780..3fef65ae304 100644 --- a/src/tests/vnet/session/tcp_echo.c +++ b/src/tests/vnet/session/tcp_echo.c @@ -505,7 +505,7 @@ client_handle_event_queue (uri_tcp_test_main_t * utm) { session_fifo_event_t _e, *e = &_e;; - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -530,7 +530,7 @@ client_rx_thread_fn (void *arg) utm->client_bytes_received = 0; while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -1027,7 +1027,7 @@ server_handle_event_queue (uri_tcp_test_main_t * utm) while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c index 4e17a1748da..07e72376732 100644 --- a/src/tests/vnet/session/udp_echo.c +++ b/src/tests/vnet/session/udp_echo.c @@ -956,7 +956,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm) while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: diff --git a/src/vlibapi/api_shared.c b/src/vlibapi/api_shared.c index d4196264064..3b9c0f47b66 100644 --- a/src/vlibapi/api_shared.c +++ b/src/vlibapi/api_shared.c @@ -758,7 +758,7 @@ vl_msg_api_queue_handler (svm_queue_t * q) { uword msg; - while (!svm_queue_sub (q, (u8 *) & msg, 0)) + while (!svm_queue_sub (q, (u8 *) & msg, SVM_Q_WAIT, 0)) vl_msg_api_handler ((void *) msg); } diff --git a/src/vlibmemory/memory_client.c b/src/vlibmemory/memory_client.c index a0b5cc99cd4..deb913b63e3 100644 --- a/src/vlibmemory/memory_client.c +++ b/src/vlibmemory/memory_client.c @@ -218,7 +218,7 @@ vl_client_connect (const char *name, int ctx_quota, int input_queue_size) for (i = 0; i < 1000; i++) { qstatus = svm_queue_sub (vl_input_queue, (u8 *) & rp, - 1 /* nowait */ ); + SVM_Q_NOWAIT, 0); if (qstatus == 0) goto read_one_msg; ts.tv_sec = 0; @@ -305,7 +305,7 @@ vl_client_disconnect (void) am->shmem_hdr = 0; break; } - if (svm_queue_sub (vl_input_queue, (u8 *) & rp, 1) < 0) + if (svm_queue_sub (vl_input_queue, (u8 *) & rp, SVM_Q_NOWAIT, 0) < 0) continue; /* drain the queue */ diff --git a/src/vlibmemory/memory_shared.c b/src/vlibmemory/memory_shared.c index c9ace1b141d..5b7d735dab9 100644 --- a/src/vlibmemory/memory_shared.c +++ b/src/vlibmemory/memory_shared.c @@ -577,7 +577,7 @@ vl_map_shmem (const char *region_name, int is_vlib) mutex_ok: am->vlib_rp = vlib_rp; - while (svm_queue_sub (q, (u8 *) & old_msg, 1 /* nowait */ ) + while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0) != -2 /* queue underflow */ ) { vl_msg_api_free_nolock ((void *) old_msg); diff --git a/src/vpp-api/client/client.c b/src/vpp-api/client/client.c index f1372230195..fd2c4174285 100644 --- a/src/vpp-api/client/client.c +++ b/src/vpp-api/client/client.c @@ -143,7 +143,7 @@ vac_rx_thread_fn (void *arg) q = am->vl_input_queue; while (1) - while (!svm_queue_sub(q, (u8 *)&msg, 0)) + while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0)) { u16 id = ntohs(*((u16 *)msg)); switch (id) { @@ -404,7 +404,8 @@ vac_read (char **p, int *l, u16 timeout) q = am->vl_input_queue; again: - rv = svm_queue_sub(q, (u8 *)&msg, 0); + rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0); + if (rv == 0) { u16 msg_id = ntohs(*((u16 *)msg)); switch (msg_id) { diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c index 7aa346bf083..754c89cf561 100644 --- a/src/vpp-api/vapi/vapi.c +++ b/src/vpp-api/vapi/vapi.c @@ -502,7 +502,8 @@ out: } vapi_error_e -vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size) +vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size, + svm_q_conditional_wait_t cond, u32 time) { if (!ctx || !ctx->connected || !msg || !msg_size) { @@ -519,7 +520,9 @@ vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size) svm_queue_t *q = am->vl_input_queue; VAPI_DBG ("doing shm queue sub"); - int tmp = svm_queue_sub (q, (u8 *) & data, 0); + + int tmp = svm_queue_sub (q, (u8 *) & data, cond, time); + if (tmp == 0) { #if VAPI_DEBUG_ALLOC @@ -700,7 +703,7 @@ vapi_dispatch_one (vapi_ctx_t ctx) VAPI_DBG ("vapi_dispatch_one()"); void *msg; size_t size; - vapi_error_e rv = vapi_recv (ctx, &msg, &size); + vapi_error_e rv = vapi_recv (ctx, &msg, &size, SVM_Q_WAIT, 0); if (VAPI_OK != rv) { VAPI_DBG ("vapi_recv failed with rv=%d", rv); diff --git a/src/vpp-api/vapi/vapi.h b/src/vpp-api/vapi/vapi.h index 245bf654e8a..fc48e7d402c 100644 --- a/src/vpp-api/vapi/vapi.h +++ b/src/vpp-api/vapi/vapi.h @@ -22,6 +22,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" @@ -162,10 +163,13 @@ extern "C" * @param ctx opaque vapi context * @param[out] msg pointer to result variable containing message * @param[out] msg_size pointer to result variable containing message size + * @param cond enum type for blocking, non-blocking or timed wait call + * @param time in sec for timed wait * * @return VAPI_OK on success, other error code on error */ - vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size); + vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size, + svm_q_conditional_wait_t cond, u32 time); /** * @brief wait for connection to become readable or writable diff --git a/src/vpp-api/vapi/vapi.hpp b/src/vpp-api/vapi/vapi.hpp index 893851a0dbb..28357db420c 100644 --- a/src/vpp-api/vapi/vapi.hpp +++ b/src/vpp-api/vapi/vapi.hpp @@ -245,7 +245,7 @@ public: * * @return VAPI_OK on success, other error code on error */ - vapi_error_e dispatch (const Common_req *limit = nullptr) + vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5) { std::lock_guard lock (dispatch_mutex); vapi_error_e rv = VAPI_OK; @@ -254,7 +254,8 @@ public: { void *shm_data; size_t shm_data_size; - rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size); + rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT, + time); if (VAPI_OK != rv) { return rv; -- cgit 1.2.3-korg