diff options
author | Stanislav Zaikin <stanislav.zaikin@46labs.com> | 2024-03-06 19:48:30 +0100 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2024-03-18 17:30:07 +0000 |
commit | dc4d21e9ce78a77caa7abfe997021cd735863e0f (patch) | |
tree | 17b6636f29b256020eff224cfe7a2b76ad77726e /src/vpp-api/vapi/vapi.c | |
parent | 3eb6cbec50a5cbe4c3465d60ba6aea7bf2845cd1 (diff) |
vapi: uds transport support
introduce ability to connect over unix socket instead of shared memory
Type: improvement
Change-Id: Id9042c74e33ad4e418896c4d7ae48bb9106195c9
Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com>
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Diffstat (limited to 'src/vpp-api/vapi/vapi.c')
-rw-r--r-- | src/vpp-api/vapi/vapi.c | 816 |
1 files changed, 688 insertions, 128 deletions
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c index 46ccf37a8aa..022f023aeb0 100644 --- a/src/vpp-api/vapi/vapi.c +++ b/src/vpp-api/vapi/vapi.c @@ -98,8 +98,11 @@ struct vapi_ctx_s bool connected; bool handle_keepalives; pthread_mutex_t requests_mutex; + bool use_uds; svm_queue_t *vl_input_queue; + clib_socket_t client_socket; + clib_time_t time; u32 my_client_index; /** client message index hash table */ uword *msg_index_by_name_and_crc; @@ -230,8 +233,8 @@ vapi_to_be_freed_validate () #endif -void * -vapi_msg_alloc (vapi_ctx_t ctx, size_t size) +static void * +vapi_shm_msg_alloc (vapi_ctx_t ctx, size_t size) { if (!ctx->connected) { @@ -245,6 +248,23 @@ vapi_msg_alloc (vapi_ctx_t ctx, size_t size) return rv; } +static void * +vapi_sock_msg_alloc (size_t size) +{ + u8 *rv = 0; + vec_validate_init_empty (rv, size - 1, 0); + return rv; +} + +void * +vapi_msg_alloc (vapi_ctx_t ctx, size_t size) +{ + if (ctx->use_uds) + return vapi_sock_msg_alloc (size); + + return vapi_shm_msg_alloc (ctx, size); +} + void vapi_msg_free (vapi_ctx_t ctx, void *msg) { @@ -252,10 +272,19 @@ vapi_msg_free (vapi_ctx_t ctx, void *msg) { return; } + #if VAPI_DEBUG_ALLOC vapi_trace_free (msg); #endif - vl_msg_api_free (msg); + + if (ctx->use_uds) + { + vec_free (msg); + } + else + { + vl_msg_api_free (msg); + } } vapi_msg_id_t @@ -294,6 +323,7 @@ vapi_ctx_alloc (vapi_ctx_t * result) } pthread_mutex_init (&ctx->requests_mutex, NULL); *result = ctx; + clib_time_init (&ctx->time); return VAPI_OK; fail: vapi_ctx_free (ctx); @@ -343,6 +373,205 @@ vapi_api_name_and_crc_free (vapi_ctx_t ctx) hash_free (ctx->msg_index_by_name_and_crc); } +static vapi_error_e +vapi_sock_get_errno (int err) +{ + switch (err) + { + case ENOTSOCK: + return VAPI_ENOTSOCK; + case EACCES: + return VAPI_EACCES; + case ECONNRESET: + return VAPI_ECONNRESET; + default: + break; + } + return VAPI_ESOCK_FAILURE; +} + +static vapi_error_e +vapi_sock_send (vapi_ctx_t ctx, u8 *msg) +{ + size_t n; + struct msghdr hdr; + + const size_t len = vec_len (msg); + const size_t total_len = len + sizeof (msgbuf_t); + + msgbuf_t msgbuf1 = { + .q = 0, + .gc_mark_timestamp = 0, + .data_len = htonl (len), + }; + + struct iovec bufs[2] = { + [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) }, + [1] = { .iov_base = msg, .iov_len = len }, + }; + + clib_memset (&hdr, 0, sizeof (hdr)); + hdr.msg_iov = bufs; + hdr.msg_iovlen = 2; + + n = sendmsg (ctx->client_socket.fd, &hdr, 0); + if (n < 0) + { + return vapi_sock_get_errno (errno); + } + + if (n < total_len) + { + return VAPI_EAGAIN; + } + + vec_free (msg); + + return VAPI_OK; +} + +static vapi_error_e +vapi_sock_send2 (vapi_ctx_t ctx, u8 *msg1, u8 *msg2) +{ + size_t n; + struct msghdr hdr; + + const size_t len1 = vec_len (msg1); + const size_t len2 = vec_len (msg2); + const size_t total_len = len1 + len2 + 2 * sizeof (msgbuf_t); + + msgbuf_t msgbuf1 = { + .q = 0, + .gc_mark_timestamp = 0, + .data_len = htonl (len1), + }; + + msgbuf_t msgbuf2 = { + .q = 0, + .gc_mark_timestamp = 0, + .data_len = htonl (len2), + }; + + struct iovec bufs[4] = { + [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) }, + [1] = { .iov_base = msg1, .iov_len = len1 }, + [2] = { .iov_base = &msgbuf2, .iov_len = sizeof (msgbuf2) }, + [3] = { .iov_base = msg2, .iov_len = len2 }, + }; + + clib_memset (&hdr, 0, sizeof (hdr)); + hdr.msg_iov = bufs; + hdr.msg_iovlen = 4; + + n = sendmsg (ctx->client_socket.fd, &hdr, 0); + if (n < 0) + { + return vapi_sock_get_errno (errno); + } + + if (n < total_len) + { + return VAPI_EAGAIN; + } + + vec_free (msg1); + vec_free (msg2); + + return VAPI_OK; +} + +static vapi_error_e +vapi_sock_recv_internal (vapi_ctx_t ctx, u8 **vec_msg, u32 timeout) +{ + clib_socket_t *sock = &ctx->client_socket; + u32 data_len = 0, msg_size; + msgbuf_t *mbp = 0; + ssize_t n, current_rx_index; + f64 deadline; + vapi_error_e rv = VAPI_EAGAIN; + + if (ctx->client_socket.fd == 0) + return VAPI_ENOTSOCK; + + deadline = clib_time_now (&ctx->time) + timeout; + + while (1) + { + current_rx_index = vec_len (sock->rx_buffer); + while (current_rx_index < sizeof (*mbp)) + { + vec_validate (sock->rx_buffer, sizeof (*mbp) - 1); + n = recv (sock->fd, sock->rx_buffer + current_rx_index, + sizeof (*mbp) - current_rx_index, MSG_DONTWAIT); + if (n < 0) + { + if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline) + return VAPI_EAGAIN; + + if (errno == EAGAIN) + continue; + + clib_unix_warning ("socket_read"); + vec_set_len (sock->rx_buffer, current_rx_index); + return vapi_sock_get_errno (errno); + } + current_rx_index += n; + } + vec_set_len (sock->rx_buffer, current_rx_index); + + mbp = (msgbuf_t *) (sock->rx_buffer); + data_len = ntohl (mbp->data_len); + current_rx_index = vec_len (sock->rx_buffer); + vec_validate (sock->rx_buffer, current_rx_index + data_len); + mbp = (msgbuf_t *) (sock->rx_buffer); + msg_size = data_len + sizeof (*mbp); + + while (current_rx_index < msg_size) + { + n = recv (sock->fd, sock->rx_buffer + current_rx_index, + msg_size - current_rx_index, MSG_DONTWAIT); + if (n < 0) + { + if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline) + return VAPI_EAGAIN; + + if (errno == EAGAIN) + continue; + + clib_unix_warning ("socket_read"); + vec_set_len (sock->rx_buffer, current_rx_index); + return vapi_sock_get_errno (errno); + } + current_rx_index += n; + } + vec_set_len (sock->rx_buffer, current_rx_index); + + if (vec_len (sock->rx_buffer) >= data_len + sizeof (*mbp)) + { + if (data_len) + { + vec_add (*vec_msg, mbp->data, data_len); + rv = VAPI_OK; + } + else + { + *vec_msg = 0; + } + + if (vec_len (sock->rx_buffer) == data_len + sizeof (*mbp)) + vec_set_len (sock->rx_buffer, 0); + else + vec_delete (sock->rx_buffer, data_len + sizeof (*mbp), 0); + mbp = 0; + + /* Quit if we're out of data, and not expecting a ping reply */ + if (vec_len (sock->rx_buffer) == 0) + break; + } + } + return rv; +} + static void vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx, vl_api_memclnt_create_v2_reply_t *mp) @@ -377,6 +606,28 @@ vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx, } static void +vapi_sockclnt_create_reply_t_handler (vapi_ctx_t ctx, + vl_api_sockclnt_create_reply_t *mp) +{ + int i; + u8 *name_and_crc; + + ctx->my_client_index = mp->index; + + /* Clean out any previous hash table (unlikely) */ + vapi_api_name_and_crc_free (ctx); + + ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword)); + + for (i = 0; i < be16toh (mp->count); i++) + { + name_and_crc = format (0, "%s%c", mp->message_table[i].name, 0); + hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc, + be16toh (mp->message_table[i].index)); + } +} + +static void vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx, vl_api_memclnt_delete_reply_t *mp) { @@ -389,9 +640,17 @@ vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx, ctx->vl_input_queue = 0; } +static void +vapi_sockclnt_delete_reply_t_handler (vapi_ctx_t ctx, + vl_api_sockclnt_delete_reply_t *mp) +{ + ctx->my_client_index = ~0; + ctx->vl_input_queue = 0; +} + static int -vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, - int input_queue_size, bool keepalive) +vapi_shm_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, + int input_queue_size, bool keepalive) { vl_api_memclnt_create_v2_t *mp; vl_api_memclnt_create_v2_reply_t *rp; @@ -406,7 +665,7 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0) { clib_warning ("shmem_hdr / input queue NULL"); - return -1; + return VAPI_ECON_FAIL; } clib_mem_unpoison (shmem_hdr, sizeof (*shmem_hdr)); @@ -449,7 +708,7 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, ts = tsrem; } /* Timeout... */ - return -1; + return VAPI_ECON_FAIL; read_one_msg: VL_MSG_API_UNPOISON (rp); @@ -465,8 +724,83 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, return (rv); } +static int +vapi_sock_client_connect (vapi_ctx_t ctx, char *path, const char *name) +{ + clib_error_t *error; + clib_socket_t *sock; + vl_api_sockclnt_create_t *mp; + vl_api_sockclnt_create_reply_t *rp; + int rv = 0; + u8 *msg = 0; + + ctx->my_client_index = ~0; + + if (ctx->client_socket.fd) + return VAPI_EINVAL; + + if (name == 0) + return VAPI_EINVAL; + + sock = &ctx->client_socket; + sock->config = path ? path : API_SOCKET_FILE; + sock->flags = CLIB_SOCKET_F_IS_CLIENT; + + if ((error = clib_socket_init (sock))) + { + clib_error_report (error); + return VAPI_ECON_FAIL; + } + + mp = vapi_sock_msg_alloc (sizeof (vl_api_sockclnt_create_t)); + mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_CREATE); + strncpy ((char *) mp->name, name, sizeof (mp->name) - 1); + + if (vapi_sock_send (ctx, (void *) mp) != VAPI_OK) + { + return VAPI_ECON_FAIL; + } + + while (1) + { + int qstatus; + struct timespec ts, tsrem; + int i; + + /* Wait up to 10 seconds */ + for (i = 0; i < 1000; i++) + { + qstatus = vapi_sock_recv_internal (ctx, &msg, 0); + + if (qstatus == 0) + goto read_one_msg; + ts.tv_sec = 0; + ts.tv_nsec = 10000 * 1000; /* 10 ms */ + while (nanosleep (&ts, &tsrem) < 0) + ts = tsrem; + } + /* Timeout... */ + return -1; + + read_one_msg: + if (vec_len (msg) == 0) + continue; + + rp = (void *) msg; + if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_CREATE_REPLY) + { + clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id)); + continue; + } + rv = clib_net_to_host_u32 (rp->response); + vapi_sockclnt_create_reply_t_handler (ctx, rp); + break; + } + return (rv); +} + static void -vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup) +vapi_shm_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup) { vl_api_memclnt_delete_t *mp; vl_shmem_hdr_t *shmem_hdr; @@ -485,8 +819,21 @@ vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup) vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp); } +static vapi_error_e +vapi_sock_client_send_disconnect (vapi_ctx_t ctx) +{ + vl_api_sockclnt_delete_t *mp; + + mp = vapi_msg_alloc (ctx, sizeof (vl_api_sockclnt_delete_t)); + clib_memset (mp, 0, sizeof (*mp)); + mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_DELETE); + mp->client_index = ctx->my_client_index; + + return vapi_sock_send (ctx, (void *) mp); +} + static int -vapi_client_disconnect (vapi_ctx_t ctx) +vapi_shm_client_disconnect (vapi_ctx_t ctx) { vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; @@ -494,7 +841,7 @@ vapi_client_disconnect (vapi_ctx_t ctx) msgbuf_t *msgbuf; vl_input_queue = ctx->vl_input_queue; - vapi_client_send_disconnect (ctx, 0 /* wait for reply */); + vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting @@ -511,7 +858,7 @@ vapi_client_disconnect (vapi_ctx_t ctx) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; - return -1; + return VAPI_ENORESP; } if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0) continue; @@ -535,6 +882,65 @@ vapi_client_disconnect (vapi_ctx_t ctx) return 0; } +static vapi_error_e +vapi_sock_client_disconnect (vapi_ctx_t ctx) +{ + vl_api_sockclnt_delete_reply_t *rp; + u8 *msg = 0; + msgbuf_t *msgbuf; + int rv; + f64 deadline; + + deadline = clib_time_now (&ctx->time) + 2; + + do + { + rv = vapi_sock_client_send_disconnect (ctx); + } + while (clib_time_now (&ctx->time) < deadline && rv != VAPI_OK); + + while (1) + { + if (clib_time_now (&ctx->time) >= deadline) + { + clib_warning ("peer unresponsive, give up"); + ctx->my_client_index = ~0; + return VAPI_ENORESP; + } + + if (vapi_sock_recv_internal (ctx, &msg, 0) != VAPI_OK) + continue; + + msgbuf = (void *) msg; + rp = (void *) msgbuf->data; + /* drain the queue */ + if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY) + { + clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); + msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); + vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); + continue; + } + msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); + vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); + break; + } + + clib_socket_close (&ctx->client_socket); + vapi_api_name_and_crc_free (ctx); + return VAPI_OK; +} + +int +vapi_client_disconnect (vapi_ctx_t ctx) +{ + if (ctx->use_uds) + { + return vapi_sock_client_disconnect (ctx); + } + return vapi_shm_client_disconnect (ctx); +} + u32 vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc) { @@ -550,9 +956,9 @@ vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc) } vapi_error_e -vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix, - int max_outstanding_requests, int response_queue_size, - vapi_mode_e mode, bool handle_keepalives) +vapi_connect_ex (vapi_ctx_t ctx, const char *name, const char *path, + int max_outstanding_requests, int response_queue_size, + vapi_mode_e mode, bool handle_keepalives, bool use_uds) { int rv; @@ -560,7 +966,8 @@ vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix, { return VAPI_EINVAL; } - if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024 * 1024 * 32)) + + if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024L * 1024 * 32)) { return VAPI_ENOMEM; } @@ -576,28 +983,39 @@ vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix, clib_memset (ctx->requests, 0, size); /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */ ctx->requests_start = ctx->requests_count = 0; + ctx->use_uds = use_uds; - if (chroot_prefix) + if (use_uds) { - VAPI_DBG ("set memory root path `%s'", chroot_prefix); - vl_set_memory_root_path ((char *) chroot_prefix); - } - static char api_map[] = "/vpe-api"; - VAPI_DBG ("client api map `%s'", api_map); - if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0) - { - return VAPI_EMAP_FAIL; + if (vapi_sock_client_connect (ctx, (char *) path, name) < 0) + { + return VAPI_ECON_FAIL; + } } - VAPI_DBG ("connect client `%s'", name); - if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size, true) < - 0) + else { - vl_client_api_unmap (); - return VAPI_ECON_FAIL; - } + if (path) + { + VAPI_DBG ("set memory root path `%s'", path); + vl_set_memory_root_path ((char *) path); + } + static char api_map[] = "/vpe-api"; + VAPI_DBG ("client api map `%s'", api_map); + if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0) + { + return VAPI_EMAP_FAIL; + } + VAPI_DBG ("connect client `%s'", name); + if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size, + true) < 0) + { + vl_client_api_unmap (); + return VAPI_ECON_FAIL; + } #if VAPI_DEBUG_CONNECT VAPI_DBG ("start probing messages"); #endif + } int i; for (i = 0; i < __vapi_metadata.count; ++i) @@ -670,6 +1088,15 @@ fail: return rv; } +vapi_error_e +vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix, + int max_outstanding_requests, int response_queue_size, + vapi_mode_e mode, bool handle_keepalives) +{ + return vapi_connect_ex (ctx, name, chroot_prefix, max_outstanding_requests, + response_queue_size, mode, handle_keepalives, false); +} + /* * API client running in the same process as VPP */ @@ -680,6 +1107,11 @@ vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name, { int rv; + if (ctx->use_uds) + { + return VAPI_ENOTSUP; + } + if (response_queue_size <= 0 || max_outstanding_requests <= 0) { return VAPI_EINVAL; @@ -698,8 +1130,8 @@ vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name, ctx->requests_start = ctx->requests_count = 0; VAPI_DBG ("connect client `%s'", name); - if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size, - handle_keepalives) < 0) + if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size, + handle_keepalives) < 0) { return VAPI_ECON_FAIL; } @@ -773,11 +1205,17 @@ vapi_disconnect_from_vpp (vapi_ctx_t ctx) { return VAPI_EINVAL; } + + if (ctx->use_uds) + { + return VAPI_ENOTSUP; + } + vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; time_t begin; vl_input_queue = ctx->vl_input_queue; - vapi_client_send_disconnect (ctx, 0 /* wait for reply */); + vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting @@ -821,19 +1259,14 @@ fail: return rv; } -vapi_error_e -vapi_disconnect (vapi_ctx_t ctx) +static vapi_error_e +vapi_shm_disconnect (vapi_ctx_t ctx) { - if (!ctx->connected) - { - return VAPI_EINVAL; - } - vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; time_t begin; vl_input_queue = ctx->vl_input_queue; - vapi_client_send_disconnect (ctx, 0 /* wait for reply */); + vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting @@ -881,90 +1314,162 @@ fail: return rv; } +static vapi_error_e +vapi_sock_disconnect (vapi_ctx_t ctx) +{ + vl_api_sockclnt_delete_reply_t *rp; + time_t begin; + u8 *msg = 0; + + vapi_sock_client_send_disconnect (ctx); + + begin = time (0); + vapi_error_e rv = VAPI_OK; + while (1) + { + time_t now; + + now = time (0); + + if (now >= (begin + 2)) + { + clib_warning ("peer unresponsive, give up"); + ctx->my_client_index = ~0; + rv = VAPI_ENORESP; + goto fail; + } + if (vapi_sock_recv_internal (ctx, &msg, 0) < 0) + continue; + + if (vec_len (msg) == 0) + continue; + + rp = (void *) msg; + + /* drain the queue */ + if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY) + { + clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); + continue; + } + vapi_sockclnt_delete_reply_t_handler ( + ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/); + break; + } +fail: + clib_socket_close (&ctx->client_socket); + vapi_api_name_and_crc_free (ctx); + + ctx->connected = false; + return rv; +} + vapi_error_e -vapi_get_fd (vapi_ctx_t ctx, int *fd) +vapi_disconnect (vapi_ctx_t ctx) { - return VAPI_ENOTSUP; + if (!ctx->connected) + { + return VAPI_EINVAL; + } + + if (ctx->use_uds) + { + return vapi_sock_disconnect (ctx); + } + return vapi_shm_disconnect (ctx); } vapi_error_e -vapi_send (vapi_ctx_t ctx, void *msg) +vapi_get_fd (vapi_ctx_t ctx, int *fd) { - vapi_error_e rv = VAPI_OK; - if (!ctx || !msg || !ctx->connected) + if (ctx->use_uds && fd) { - rv = VAPI_EINVAL; - goto out; + *fd = ctx->client_socket.fd; + return VAPI_OK; } - int tmp; - svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; + return VAPI_ENOTSUP; +} + #if VAPI_DEBUG +static void +vapi_debug_log (vapi_ctx_t ctx, void *msg, const char *fun) +{ unsigned msgid = be16toh (*(u16 *) msg); if (msgid <= ctx->vl_msg_id_max) { vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid]; if (id < __vapi_metadata.count) { - VAPI_DBG ("send msg@%p:%u[%s]", msg, msgid, + VAPI_DBG ("%s msg@%p:%u[%s]", fun, msg, msgid, __vapi_metadata.msgs[id]->name); } else { - VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid); + VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid); } } else { - VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid); + VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid); } +} +#endif + +static vapi_error_e +vapi_shm_send (vapi_ctx_t ctx, void *msg) +{ + int rv = VAPI_OK; + int tmp; + svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; +#if VAPI_DEBUG + vapi_debug_log (ctx, msg, "send"); #endif - tmp = svm_queue_add (q, (u8 *) & msg, - VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1); + tmp = + svm_queue_add (q, (u8 *) &msg, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1); if (tmp < 0) { rv = VAPI_EAGAIN; } else VL_MSG_API_POISON (msg); -out: - VAPI_DBG ("vapi_send() rv = %d", rv); + return rv; } vapi_error_e -vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) +vapi_send (vapi_ctx_t ctx, void *msg) { vapi_error_e rv = VAPI_OK; - if (!ctx || !msg1 || !msg2 || !ctx->connected) + if (!ctx || !msg || !ctx->connected) { rv = VAPI_EINVAL; goto out; } - svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; -#if VAPI_DEBUG - unsigned msgid1 = be16toh (*(u16 *) msg1); - unsigned msgid2 = be16toh (*(u16 *) msg2); - const char *name1 = "UNKNOWN"; - const char *name2 = "UNKNOWN"; - if (msgid1 <= ctx->vl_msg_id_max) + + if (ctx->use_uds) { - vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid1]; - if (id < __vapi_metadata.count) - { - name1 = __vapi_metadata.msgs[id]->name; - } + rv = vapi_sock_send (ctx, msg); } - if (msgid2 <= ctx->vl_msg_id_max) + else { - vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid2]; - if (id < __vapi_metadata.count) - { - name2 = __vapi_metadata.msgs[id]->name; - } + rv = vapi_shm_send (ctx, msg); } - VAPI_DBG ("send two: %u[%s], %u[%s]", msgid1, name1, msgid2, name2); + +out: + VAPI_DBG ("vapi_send() rv = %d", rv); + return rv; +} + +static vapi_error_e +vapi_shm_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) +{ + vapi_error_e rv = VAPI_OK; + svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; +#if VAPI_DEBUG + vapi_debug_log (ctx, msg1, "send2"); + vapi_debug_log (ctx, msg2, "send2"); #endif - int tmp = svm_queue_add2 (q, (u8 *) & msg1, (u8 *) & msg2, + int tmp = svm_queue_add2 (q, (u8 *) &msg1, (u8 *) &msg2, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1); if (tmp < 0) { @@ -972,31 +1477,52 @@ vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) } else VL_MSG_API_POISON (msg1); -out: - VAPI_DBG ("vapi_send() rv = %d", rv); + return rv; } vapi_error_e -vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size, - svm_q_conditional_wait_t cond, u32 time) +vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) { - if (!ctx || !ctx->connected || !msg || !msg_size) + vapi_error_e rv = VAPI_OK; + if (!ctx || !msg1 || !msg2 || !ctx->connected) { - return VAPI_EINVAL; + rv = VAPI_EINVAL; + goto out; + } + + if (ctx->use_uds) + { + rv = vapi_sock_send2 (ctx, msg1, msg2); + } + else + { + rv = vapi_shm_send2 (ctx, msg1, msg2); } + +out: + VAPI_DBG ("vapi_send() rv = %d", rv); + return rv; +} + +static vapi_error_e +vapi_shm_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, + svm_q_conditional_wait_t cond, u32 time) +{ vapi_error_e rv = VAPI_OK; uword data; svm_queue_t *q = ctx->vl_input_queue; -again: VAPI_DBG ("doing shm queue sub"); int tmp = svm_queue_sub (q, (u8 *) & data, cond, time); - if (tmp == 0) + if (tmp != 0) { + return VAPI_EAGAIN; + } + VL_MSG_API_UNPOISON ((void *) data); #if VAPI_DEBUG_ALLOC vapi_add_to_be_freed ((void *) data); @@ -1010,60 +1536,94 @@ again: } *msg = (u8 *) data; *msg_size = ntohl (msgbuf->data_len); + #if VAPI_DEBUG - unsigned msgid = be16toh (*(u16 *) * msg); - if (msgid <= ctx->vl_msg_id_max) - { - vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid]; - if (id < __vapi_metadata.count) - { - VAPI_DBG ("recv msg@%p:%u[%s]", *msg, msgid, - __vapi_metadata.msgs[id]->name); - } - else - { - VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid); - } - } - else - { - VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid); - } + vapi_debug_log (ctx, msg, "recv"); +#endif + + return rv; +} + +static vapi_error_e +vapi_sock_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, u32 time) +{ + vapi_error_e rv = VAPI_OK; + u8 *data = 0; + if (time == 0 && ctx->mode == VAPI_MODE_BLOCKING) + time = 1; + + rv = vapi_sock_recv_internal (ctx, &data, time); + + if (rv != VAPI_OK) + { + return rv; + } + + *msg = data; + *msg_size = vec_len (data); + +#if VAPI_DEBUG + vapi_debug_log (ctx, msg, "recv"); #endif - if (ctx->handle_keepalives) + + return rv; +} + +vapi_error_e +vapi_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, + svm_q_conditional_wait_t cond, u32 time) +{ + if (!ctx || !ctx->connected || !msg || !msg_size) + { + return VAPI_EINVAL; + } + vapi_error_e rv = VAPI_OK; + +again: + if (ctx->use_uds) + { + rv = vapi_sock_recv (ctx, msg, msg_size, time); + } + else + { + rv = vapi_shm_recv (ctx, msg, msg_size, cond, time); + } + + if (rv != VAPI_OK) + return rv; + + if (ctx->handle_keepalives) + { + unsigned msgid = be16toh (*(u16 *) *msg); + if (msgid == vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive)) { - unsigned msgid = be16toh (*(u16 *) * msg); - if (msgid == - vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive)) + vapi_msg_memclnt_keepalive_reply *reply = NULL; + do { - vapi_msg_memclnt_keepalive_reply *reply = NULL; - do - { - reply = vapi_msg_alloc (ctx, sizeof (*reply)); - } - while (!reply); - reply->header.context = vapi_get_client_index (ctx); - reply->header._vl_msg_id = - vapi_lookup_vl_msg_id (ctx, - vapi_msg_id_memclnt_keepalive_reply); - reply->payload.retval = 0; - vapi_msg_memclnt_keepalive_reply_hton (reply); - while (VAPI_EAGAIN == vapi_send (ctx, reply)); - vapi_msg_free (ctx, *msg); - goto again; + reply = vapi_msg_alloc (ctx, sizeof (*reply)); } + while (!reply); + reply->header.context = vapi_get_client_index (ctx); + reply->header._vl_msg_id = + vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive_reply); + reply->payload.retval = 0; + vapi_msg_memclnt_keepalive_reply_hton (reply); + while (VAPI_EAGAIN == vapi_send (ctx, reply)) + ; + vapi_msg_free (ctx, *msg); + goto again; } } - else - { - rv = VAPI_EAGAIN; - } + return rv; } vapi_error_e vapi_wait (vapi_ctx_t ctx) { + if (ctx->use_uds) + return VAPI_ENOTSUP; + svm_queue_lock (ctx->vl_input_queue); svm_queue_wait (ctx->vl_input_queue); svm_queue_unlock (ctx->vl_input_queue); |