From 3fca567ff438145e28dd1318ad5b1734c1091257 Mon Sep 17 00:00:00 2001 From: Mohsin Kazmi Date: Thu, 4 Jan 2018 18:57:26 +0100 Subject: svm: queue sub: Add conditional timed wait On reviece side svm queue only permits blocking and non-blocking calls. This patch adds timed wait blocking functionality which returns either on signal/event or on given time out. It also preserves the original behavior, so it will not hurt client applications which are using svm queue. Change-Id: Ic10632170330a80afb8bc781d4ccddfe4da2c69a Signed-off-by: Mohsin Kazmi --- src/svm/queue.c | 31 ++++++++++++++++++++++++++----- src/svm/queue.h | 21 ++++++++++++++++++++- src/tests/vnet/session/tcp_echo.c | 6 +++--- src/tests/vnet/session/udp_echo.c | 2 +- src/vlibapi/api_shared.c | 2 +- src/vlibmemory/memory_client.c | 4 ++-- src/vlibmemory/memory_shared.c | 2 +- src/vpp-api/client/client.c | 5 +++-- src/vpp-api/vapi/vapi.c | 9 ++++++--- src/vpp-api/vapi/vapi.h | 6 +++++- src/vpp-api/vapi/vapi.hpp | 5 +++-- test/ext/vapi_c_test.c | 8 ++++---- test/ext/vapi_cpp_test.cpp | 32 ++++++++++++++++++++------------ 13 files changed, 95 insertions(+), 38 deletions(-) diff --git a/src/svm/queue.c b/src/svm/queue.c index aef409277db..96e40fc2aec 100644 --- a/src/svm/queue.c +++ b/src/svm/queue.c @@ -26,6 +26,7 @@ #include #include #include +#include #include /* @@ -299,12 +300,14 @@ svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait) * svm_queue_sub */ int -svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait) +svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, + u32 time) { i8 *headp; int need_broadcast = 0; + int rc = 0; - if (nowait) + if (cond == SVM_Q_NOWAIT) { /* zero on success */ if (pthread_mutex_trylock (&q->mutex)) @@ -317,14 +320,32 @@ svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait) if (PREDICT_FALSE (q->cursize == 0)) { - if (nowait) + if (cond == SVM_Q_NOWAIT) { pthread_mutex_unlock (&q->mutex); return (-2); } - while (q->cursize == 0) + else if (cond == SVM_Q_TIMEDWAIT) { - (void) pthread_cond_wait (&q->condvar, &q->mutex); + struct timespec ts; + ts.tv_sec = unix_time_now () + time; + ts.tv_nsec = 0; + while (q->cursize == 0 && rc == 0) + { + rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts); + } + if (rc == ETIMEDOUT) + { + pthread_mutex_unlock (&q->mutex); + return ETIMEDOUT; + } + } + else + { + while (q->cursize == 0) + { + (void) pthread_cond_wait (&q->condvar, &q->mutex); + } } } diff --git a/src/svm/queue.h b/src/svm/queue.h index dc1fc36e002..856c17237d1 100644 --- a/src/svm/queue.h +++ b/src/svm/queue.h @@ -36,6 +36,24 @@ typedef struct _svm_queue char data[0]; } svm_queue_t; +typedef enum +{ + /** + * blocking call + */ + SVM_Q_WAIT = 0, + + /** + * non-blocking call + */ + SVM_Q_NOWAIT, + + /** + * blocking call, return on signal or time-out + */ + SVM_Q_TIMEDWAIT, +} svm_q_conditional_wait_t; + svm_queue_t *svm_queue_init (int nels, int elsize, int consumer_pid, @@ -43,7 +61,8 @@ svm_queue_t *svm_queue_init (int nels, void svm_queue_free (svm_queue_t * q); int svm_queue_add (svm_queue_t * q, u8 * elem, int nowait); int svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait); -int svm_queue_sub (svm_queue_t * q, u8 * elem, int nowait); +int svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, + u32 time); int svm_queue_sub2 (svm_queue_t * q, u8 * elem); void svm_queue_lock (svm_queue_t * q); void svm_queue_unlock (svm_queue_t * q); diff --git a/src/tests/vnet/session/tcp_echo.c b/src/tests/vnet/session/tcp_echo.c index ed9e2f7b780..3fef65ae304 100644 --- a/src/tests/vnet/session/tcp_echo.c +++ b/src/tests/vnet/session/tcp_echo.c @@ -505,7 +505,7 @@ client_handle_event_queue (uri_tcp_test_main_t * utm) { session_fifo_event_t _e, *e = &_e;; - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -530,7 +530,7 @@ client_rx_thread_fn (void *arg) utm->client_bytes_received = 0; while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: @@ -1027,7 +1027,7 @@ server_handle_event_queue (uri_tcp_test_main_t * utm) while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: diff --git a/src/tests/vnet/session/udp_echo.c b/src/tests/vnet/session/udp_echo.c index 4e17a1748da..07e72376732 100644 --- a/src/tests/vnet/session/udp_echo.c +++ b/src/tests/vnet/session/udp_echo.c @@ -956,7 +956,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm) while (1) { - svm_queue_sub (utm->our_event_queue, (u8 *) e, 0 /* nowait */ ); + svm_queue_sub (utm->our_event_queue, (u8 *) e, SVM_Q_WAIT, 0); switch (e->event_type) { case FIFO_EVENT_APP_RX: diff --git a/src/vlibapi/api_shared.c b/src/vlibapi/api_shared.c index d4196264064..3b9c0f47b66 100644 --- a/src/vlibapi/api_shared.c +++ b/src/vlibapi/api_shared.c @@ -758,7 +758,7 @@ vl_msg_api_queue_handler (svm_queue_t * q) { uword msg; - while (!svm_queue_sub (q, (u8 *) & msg, 0)) + while (!svm_queue_sub (q, (u8 *) & msg, SVM_Q_WAIT, 0)) vl_msg_api_handler ((void *) msg); } diff --git a/src/vlibmemory/memory_client.c b/src/vlibmemory/memory_client.c index a0b5cc99cd4..deb913b63e3 100644 --- a/src/vlibmemory/memory_client.c +++ b/src/vlibmemory/memory_client.c @@ -218,7 +218,7 @@ vl_client_connect (const char *name, int ctx_quota, int input_queue_size) for (i = 0; i < 1000; i++) { qstatus = svm_queue_sub (vl_input_queue, (u8 *) & rp, - 1 /* nowait */ ); + SVM_Q_NOWAIT, 0); if (qstatus == 0) goto read_one_msg; ts.tv_sec = 0; @@ -305,7 +305,7 @@ vl_client_disconnect (void) am->shmem_hdr = 0; break; } - if (svm_queue_sub (vl_input_queue, (u8 *) & rp, 1) < 0) + if (svm_queue_sub (vl_input_queue, (u8 *) & rp, SVM_Q_NOWAIT, 0) < 0) continue; /* drain the queue */ diff --git a/src/vlibmemory/memory_shared.c b/src/vlibmemory/memory_shared.c index c9ace1b141d..5b7d735dab9 100644 --- a/src/vlibmemory/memory_shared.c +++ b/src/vlibmemory/memory_shared.c @@ -577,7 +577,7 @@ vl_map_shmem (const char *region_name, int is_vlib) mutex_ok: am->vlib_rp = vlib_rp; - while (svm_queue_sub (q, (u8 *) & old_msg, 1 /* nowait */ ) + while (svm_queue_sub (q, (u8 *) & old_msg, SVM_Q_NOWAIT, 0) != -2 /* queue underflow */ ) { vl_msg_api_free_nolock ((void *) old_msg); diff --git a/src/vpp-api/client/client.c b/src/vpp-api/client/client.c index f1372230195..fd2c4174285 100644 --- a/src/vpp-api/client/client.c +++ b/src/vpp-api/client/client.c @@ -143,7 +143,7 @@ vac_rx_thread_fn (void *arg) q = am->vl_input_queue; while (1) - while (!svm_queue_sub(q, (u8 *)&msg, 0)) + while (!svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0)) { u16 id = ntohs(*((u16 *)msg)); switch (id) { @@ -404,7 +404,8 @@ vac_read (char **p, int *l, u16 timeout) q = am->vl_input_queue; again: - rv = svm_queue_sub(q, (u8 *)&msg, 0); + rv = svm_queue_sub(q, (u8 *)&msg, SVM_Q_WAIT, 0); + if (rv == 0) { u16 msg_id = ntohs(*((u16 *)msg)); switch (msg_id) { diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c index 7aa346bf083..754c89cf561 100644 --- a/src/vpp-api/vapi/vapi.c +++ b/src/vpp-api/vapi/vapi.c @@ -502,7 +502,8 @@ out: } vapi_error_e -vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size) +vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size, + svm_q_conditional_wait_t cond, u32 time) { if (!ctx || !ctx->connected || !msg || !msg_size) { @@ -519,7 +520,9 @@ vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size) svm_queue_t *q = am->vl_input_queue; VAPI_DBG ("doing shm queue sub"); - int tmp = svm_queue_sub (q, (u8 *) & data, 0); + + int tmp = svm_queue_sub (q, (u8 *) & data, cond, time); + if (tmp == 0) { #if VAPI_DEBUG_ALLOC @@ -700,7 +703,7 @@ vapi_dispatch_one (vapi_ctx_t ctx) VAPI_DBG ("vapi_dispatch_one()"); void *msg; size_t size; - vapi_error_e rv = vapi_recv (ctx, &msg, &size); + vapi_error_e rv = vapi_recv (ctx, &msg, &size, SVM_Q_WAIT, 0); if (VAPI_OK != rv) { VAPI_DBG ("vapi_recv failed with rv=%d", rv); diff --git a/src/vpp-api/vapi/vapi.h b/src/vpp-api/vapi/vapi.h index 245bf654e8a..fc48e7d402c 100644 --- a/src/vpp-api/vapi/vapi.h +++ b/src/vpp-api/vapi/vapi.h @@ -22,6 +22,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" @@ -162,10 +163,13 @@ extern "C" * @param ctx opaque vapi context * @param[out] msg pointer to result variable containing message * @param[out] msg_size pointer to result variable containing message size + * @param cond enum type for blocking, non-blocking or timed wait call + * @param time in sec for timed wait * * @return VAPI_OK on success, other error code on error */ - vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size); + vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size, + svm_q_conditional_wait_t cond, u32 time); /** * @brief wait for connection to become readable or writable diff --git a/src/vpp-api/vapi/vapi.hpp b/src/vpp-api/vapi/vapi.hpp index 893851a0dbb..28357db420c 100644 --- a/src/vpp-api/vapi/vapi.hpp +++ b/src/vpp-api/vapi/vapi.hpp @@ -245,7 +245,7 @@ public: * * @return VAPI_OK on success, other error code on error */ - vapi_error_e dispatch (const Common_req *limit = nullptr) + vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5) { std::lock_guard lock (dispatch_mutex); vapi_error_e rv = VAPI_OK; @@ -254,7 +254,8 @@ public: { void *shm_data; size_t shm_data_size; - rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size); + rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT, + time); if (VAPI_OK != rv) { return rv; diff --git a/test/ext/vapi_c_test.c b/test/ext/vapi_c_test.c index ad75ad6e8b8..52a939f72a5 100644 --- a/test/ext/vapi_c_test.c +++ b/test/ext/vapi_c_test.c @@ -66,12 +66,12 @@ START_TEST (test_invalid_values) ck_assert_int_eq (VAPI_EINVAL, rv); rv = vapi_send (NULL, NULL); ck_assert_int_eq (VAPI_EINVAL, rv); - rv = vapi_recv (NULL, NULL, NULL); + rv = vapi_recv (NULL, NULL, NULL, 0, 0); ck_assert_int_eq (VAPI_EINVAL, rv); - rv = vapi_recv (ctx, NULL, NULL); + rv = vapi_recv (ctx, NULL, NULL, 0, 0); ck_assert_int_eq (VAPI_EINVAL, rv); vapi_msg_show_version_reply *reply; - rv = vapi_recv (ctx, (void **) &reply, NULL); + rv = vapi_recv (ctx, (void **) &reply, NULL, 0, 0); ck_assert_int_eq (VAPI_EINVAL, rv); rv = vapi_disconnect (ctx); ck_assert_int_eq (VAPI_OK, rv); @@ -531,7 +531,7 @@ START_TEST (test_show_version_1) ck_assert_int_eq (VAPI_OK, rv); vapi_msg_show_version_reply *resp; size_t size; - rv = vapi_recv (ctx, (void *) &resp, &size); + rv = vapi_recv (ctx, (void *) &resp, &size, 0, 0); ck_assert_int_eq (VAPI_OK, rv); int dummy; show_version_cb (NULL, &dummy, VAPI_OK, true, &resp->payload); diff --git a/test/ext/vapi_cpp_test.cpp b/test/ext/vapi_cpp_test.cpp index 14c35d5bedd..25ea9cc7f7b 100644 --- a/test/ext/vapi_cpp_test.cpp +++ b/test/ext/vapi_cpp_test.cpp @@ -37,6 +37,13 @@ static char *api_prefix = nullptr; static const int max_outstanding_requests = 32; static const int response_queue_size = 32; +#define WAIT_FOR_RESPONSE(param, ret) \ + do \ + { \ + ret = con.wait_for_response (param); \ + } \ + while (ret == VAPI_EAGAIN) + using namespace vapi; void verify_show_version_reply (const Show_version_reply &r) @@ -68,7 +75,7 @@ START_TEST (test_show_version_1) Show_version sv (con); vapi_error_e rv = sv.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (sv); + WAIT_FOR_RESPONSE (sv, rv); ck_assert_int_eq (VAPI_OK, rv); auto &r = sv.get_response (); verify_show_version_reply (r); @@ -122,7 +129,8 @@ START_TEST (test_loopbacks_1) memcpy (p.mac_address, mac_addresses[i], sizeof (p.mac_address)); auto e = cl.execute (); ck_assert_int_eq (VAPI_OK, e); - vapi_error_e rv = con.wait_for_response (cl); + vapi_error_e rv; + WAIT_FOR_RESPONSE (cl, rv); ck_assert_int_eq (VAPI_OK, rv); auto &rp = cl.get_response ().get_payload (); ck_assert_int_eq (0, rp.retval); @@ -145,7 +153,7 @@ START_TEST (test_loopbacks_1) memset (p.name_filter, 0, sizeof (p.name_filter)); auto rv = d.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (d); + WAIT_FOR_RESPONSE (d, rv); ck_assert_int_eq (VAPI_OK, rv); auto &rs = d.get_result_set (); for (auto &r : rs) @@ -172,7 +180,7 @@ START_TEST (test_loopbacks_1) dl.get_request ().get_payload ().sw_if_index = sw_if_indexes[i]; auto rv = dl.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (dl); + WAIT_FOR_RESPONSE (dl, rv); ck_assert_int_eq (VAPI_OK, rv); auto &response = dl.get_response (); auto rp = response.get_payload (); @@ -187,7 +195,7 @@ START_TEST (test_loopbacks_1) memset (p.name_filter, 0, sizeof (p.name_filter)); auto rv = d.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (d); + WAIT_FOR_RESPONSE (d, rv); ck_assert_int_eq (VAPI_OK, rv); auto &rs = d.get_result_set (); for (auto &r : rs) @@ -305,7 +313,7 @@ START_TEST (test_loopbacks_2) memset (p.name_filter, 0, sizeof (p.name_filter)); auto rv = d.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (d); + WAIT_FOR_RESPONSE (d, rv); ck_assert_int_eq (VAPI_OK, rv); ck_assert_int_ne (0, swdcb.called); std::array dcbs; @@ -334,7 +342,7 @@ START_TEST (test_loopbacks_2) memset (p.name_filter, 0, sizeof (p.name_filter)); auto rv = d.execute (); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (d); + WAIT_FOR_RESPONSE (d, rv); ck_assert_int_eq (VAPI_OK, rv); auto &rs = d.get_result_set (); for (auto &r : rs) @@ -360,7 +368,7 @@ START_TEST (test_stats_1) auto rv = ws.execute (); ck_assert_int_eq (VAPI_OK, rv); Event_registration sc (con); - rv = con.wait_for_response (sc); + WAIT_FOR_RESPONSE (sc, rv); ck_assert_int_eq (VAPI_OK, rv); auto &rs = sc.get_result_set (); int count = 0; @@ -407,7 +415,7 @@ START_TEST (test_stats_2) ck_assert_int_eq (VAPI_OK, rv); Vnet_interface_simple_counters_cb cb; Event_registration sc (con, std::ref (cb)); - rv = con.wait_for_response (sc); + WAIT_FOR_RESPONSE (sc, rv); ck_assert_int_eq (VAPI_OK, rv); ck_assert_int_ne (0, cb.called); } @@ -452,7 +460,7 @@ START_TEST (test_stats_3) Event_registration sc (con, std::ref (cb)); for (int i = 0; i < 5; ++i) { - rv = con.wait_for_response (sc); + WAIT_FOR_RESPONSE (sc, rv); } ck_assert_int_eq (VAPI_OK, rv); ck_assert_int_eq (5, cb.called); @@ -472,9 +480,9 @@ START_TEST (test_stats_4) ck_assert_int_eq (VAPI_OK, rv); Event_registration sc (con); Event_registration cc (con); - rv = con.wait_for_response (sc); + WAIT_FOR_RESPONSE (sc, rv); ck_assert_int_eq (VAPI_OK, rv); - rv = con.wait_for_response (cc); + WAIT_FOR_RESPONSE (cc, rv); ck_assert_int_eq (VAPI_OK, rv); int count = 0; for (auto &r : sc.get_result_set ()) -- cgit 1.2.3-korg