summaryrefslogtreecommitdiffstats
path: root/src/vpp-api/vapi/vapi.c
diff options
context:
space:
mode:
authorStanislav Zaikin <stanislav.zaikin@46labs.com>2024-03-06 19:48:30 +0100
committerOle Tr�an <otroan@employees.org>2024-03-18 17:30:07 +0000
commitdc4d21e9ce78a77caa7abfe997021cd735863e0f (patch)
tree17b6636f29b256020eff224cfe7a2b76ad77726e /src/vpp-api/vapi/vapi.c
parent3eb6cbec50a5cbe4c3465d60ba6aea7bf2845cd1 (diff)
vapi: uds transport support
introduce ability to connect over unix socket instead of shared memory Type: improvement Change-Id: Id9042c74e33ad4e418896c4d7ae48bb9106195c9 Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com> Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Diffstat (limited to 'src/vpp-api/vapi/vapi.c')
-rw-r--r--src/vpp-api/vapi/vapi.c816
1 files changed, 688 insertions, 128 deletions
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c
index 46ccf37a8aa..022f023aeb0 100644
--- a/src/vpp-api/vapi/vapi.c
+++ b/src/vpp-api/vapi/vapi.c
@@ -98,8 +98,11 @@ struct vapi_ctx_s
bool connected;
bool handle_keepalives;
pthread_mutex_t requests_mutex;
+ bool use_uds;
svm_queue_t *vl_input_queue;
+ clib_socket_t client_socket;
+ clib_time_t time;
u32 my_client_index;
/** client message index hash table */
uword *msg_index_by_name_and_crc;
@@ -230,8 +233,8 @@ vapi_to_be_freed_validate ()
#endif
-void *
-vapi_msg_alloc (vapi_ctx_t ctx, size_t size)
+static void *
+vapi_shm_msg_alloc (vapi_ctx_t ctx, size_t size)
{
if (!ctx->connected)
{
@@ -245,6 +248,23 @@ vapi_msg_alloc (vapi_ctx_t ctx, size_t size)
return rv;
}
+static void *
+vapi_sock_msg_alloc (size_t size)
+{
+ u8 *rv = 0;
+ vec_validate_init_empty (rv, size - 1, 0);
+ return rv;
+}
+
+void *
+vapi_msg_alloc (vapi_ctx_t ctx, size_t size)
+{
+ if (ctx->use_uds)
+ return vapi_sock_msg_alloc (size);
+
+ return vapi_shm_msg_alloc (ctx, size);
+}
+
void
vapi_msg_free (vapi_ctx_t ctx, void *msg)
{
@@ -252,10 +272,19 @@ vapi_msg_free (vapi_ctx_t ctx, void *msg)
{
return;
}
+
#if VAPI_DEBUG_ALLOC
vapi_trace_free (msg);
#endif
- vl_msg_api_free (msg);
+
+ if (ctx->use_uds)
+ {
+ vec_free (msg);
+ }
+ else
+ {
+ vl_msg_api_free (msg);
+ }
}
vapi_msg_id_t
@@ -294,6 +323,7 @@ vapi_ctx_alloc (vapi_ctx_t * result)
}
pthread_mutex_init (&ctx->requests_mutex, NULL);
*result = ctx;
+ clib_time_init (&ctx->time);
return VAPI_OK;
fail:
vapi_ctx_free (ctx);
@@ -343,6 +373,205 @@ vapi_api_name_and_crc_free (vapi_ctx_t ctx)
hash_free (ctx->msg_index_by_name_and_crc);
}
+static vapi_error_e
+vapi_sock_get_errno (int err)
+{
+ switch (err)
+ {
+ case ENOTSOCK:
+ return VAPI_ENOTSOCK;
+ case EACCES:
+ return VAPI_EACCES;
+ case ECONNRESET:
+ return VAPI_ECONNRESET;
+ default:
+ break;
+ }
+ return VAPI_ESOCK_FAILURE;
+}
+
+static vapi_error_e
+vapi_sock_send (vapi_ctx_t ctx, u8 *msg)
+{
+ size_t n;
+ struct msghdr hdr;
+
+ const size_t len = vec_len (msg);
+ const size_t total_len = len + sizeof (msgbuf_t);
+
+ msgbuf_t msgbuf1 = {
+ .q = 0,
+ .gc_mark_timestamp = 0,
+ .data_len = htonl (len),
+ };
+
+ struct iovec bufs[2] = {
+ [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) },
+ [1] = { .iov_base = msg, .iov_len = len },
+ };
+
+ clib_memset (&hdr, 0, sizeof (hdr));
+ hdr.msg_iov = bufs;
+ hdr.msg_iovlen = 2;
+
+ n = sendmsg (ctx->client_socket.fd, &hdr, 0);
+ if (n < 0)
+ {
+ return vapi_sock_get_errno (errno);
+ }
+
+ if (n < total_len)
+ {
+ return VAPI_EAGAIN;
+ }
+
+ vec_free (msg);
+
+ return VAPI_OK;
+}
+
+static vapi_error_e
+vapi_sock_send2 (vapi_ctx_t ctx, u8 *msg1, u8 *msg2)
+{
+ size_t n;
+ struct msghdr hdr;
+
+ const size_t len1 = vec_len (msg1);
+ const size_t len2 = vec_len (msg2);
+ const size_t total_len = len1 + len2 + 2 * sizeof (msgbuf_t);
+
+ msgbuf_t msgbuf1 = {
+ .q = 0,
+ .gc_mark_timestamp = 0,
+ .data_len = htonl (len1),
+ };
+
+ msgbuf_t msgbuf2 = {
+ .q = 0,
+ .gc_mark_timestamp = 0,
+ .data_len = htonl (len2),
+ };
+
+ struct iovec bufs[4] = {
+ [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) },
+ [1] = { .iov_base = msg1, .iov_len = len1 },
+ [2] = { .iov_base = &msgbuf2, .iov_len = sizeof (msgbuf2) },
+ [3] = { .iov_base = msg2, .iov_len = len2 },
+ };
+
+ clib_memset (&hdr, 0, sizeof (hdr));
+ hdr.msg_iov = bufs;
+ hdr.msg_iovlen = 4;
+
+ n = sendmsg (ctx->client_socket.fd, &hdr, 0);
+ if (n < 0)
+ {
+ return vapi_sock_get_errno (errno);
+ }
+
+ if (n < total_len)
+ {
+ return VAPI_EAGAIN;
+ }
+
+ vec_free (msg1);
+ vec_free (msg2);
+
+ return VAPI_OK;
+}
+
+static vapi_error_e
+vapi_sock_recv_internal (vapi_ctx_t ctx, u8 **vec_msg, u32 timeout)
+{
+ clib_socket_t *sock = &ctx->client_socket;
+ u32 data_len = 0, msg_size;
+ msgbuf_t *mbp = 0;
+ ssize_t n, current_rx_index;
+ f64 deadline;
+ vapi_error_e rv = VAPI_EAGAIN;
+
+ if (ctx->client_socket.fd == 0)
+ return VAPI_ENOTSOCK;
+
+ deadline = clib_time_now (&ctx->time) + timeout;
+
+ while (1)
+ {
+ current_rx_index = vec_len (sock->rx_buffer);
+ while (current_rx_index < sizeof (*mbp))
+ {
+ vec_validate (sock->rx_buffer, sizeof (*mbp) - 1);
+ n = recv (sock->fd, sock->rx_buffer + current_rx_index,
+ sizeof (*mbp) - current_rx_index, MSG_DONTWAIT);
+ if (n < 0)
+ {
+ if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline)
+ return VAPI_EAGAIN;
+
+ if (errno == EAGAIN)
+ continue;
+
+ clib_unix_warning ("socket_read");
+ vec_set_len (sock->rx_buffer, current_rx_index);
+ return vapi_sock_get_errno (errno);
+ }
+ current_rx_index += n;
+ }
+ vec_set_len (sock->rx_buffer, current_rx_index);
+
+ mbp = (msgbuf_t *) (sock->rx_buffer);
+ data_len = ntohl (mbp->data_len);
+ current_rx_index = vec_len (sock->rx_buffer);
+ vec_validate (sock->rx_buffer, current_rx_index + data_len);
+ mbp = (msgbuf_t *) (sock->rx_buffer);
+ msg_size = data_len + sizeof (*mbp);
+
+ while (current_rx_index < msg_size)
+ {
+ n = recv (sock->fd, sock->rx_buffer + current_rx_index,
+ msg_size - current_rx_index, MSG_DONTWAIT);
+ if (n < 0)
+ {
+ if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline)
+ return VAPI_EAGAIN;
+
+ if (errno == EAGAIN)
+ continue;
+
+ clib_unix_warning ("socket_read");
+ vec_set_len (sock->rx_buffer, current_rx_index);
+ return vapi_sock_get_errno (errno);
+ }
+ current_rx_index += n;
+ }
+ vec_set_len (sock->rx_buffer, current_rx_index);
+
+ if (vec_len (sock->rx_buffer) >= data_len + sizeof (*mbp))
+ {
+ if (data_len)
+ {
+ vec_add (*vec_msg, mbp->data, data_len);
+ rv = VAPI_OK;
+ }
+ else
+ {
+ *vec_msg = 0;
+ }
+
+ if (vec_len (sock->rx_buffer) == data_len + sizeof (*mbp))
+ vec_set_len (sock->rx_buffer, 0);
+ else
+ vec_delete (sock->rx_buffer, data_len + sizeof (*mbp), 0);
+ mbp = 0;
+
+ /* Quit if we're out of data, and not expecting a ping reply */
+ if (vec_len (sock->rx_buffer) == 0)
+ break;
+ }
+ }
+ return rv;
+}
+
static void
vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx,
vl_api_memclnt_create_v2_reply_t *mp)
@@ -377,6 +606,28 @@ vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx,
}
static void
+vapi_sockclnt_create_reply_t_handler (vapi_ctx_t ctx,
+ vl_api_sockclnt_create_reply_t *mp)
+{
+ int i;
+ u8 *name_and_crc;
+
+ ctx->my_client_index = mp->index;
+
+ /* Clean out any previous hash table (unlikely) */
+ vapi_api_name_and_crc_free (ctx);
+
+ ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword));
+
+ for (i = 0; i < be16toh (mp->count); i++)
+ {
+ name_and_crc = format (0, "%s%c", mp->message_table[i].name, 0);
+ hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc,
+ be16toh (mp->message_table[i].index));
+ }
+}
+
+static void
vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx,
vl_api_memclnt_delete_reply_t *mp)
{
@@ -389,9 +640,17 @@ vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx,
ctx->vl_input_queue = 0;
}
+static void
+vapi_sockclnt_delete_reply_t_handler (vapi_ctx_t ctx,
+ vl_api_sockclnt_delete_reply_t *mp)
+{
+ ctx->my_client_index = ~0;
+ ctx->vl_input_queue = 0;
+}
+
static int
-vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
- int input_queue_size, bool keepalive)
+vapi_shm_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
+ int input_queue_size, bool keepalive)
{
vl_api_memclnt_create_v2_t *mp;
vl_api_memclnt_create_v2_reply_t *rp;
@@ -406,7 +665,7 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0)
{
clib_warning ("shmem_hdr / input queue NULL");
- return -1;
+ return VAPI_ECON_FAIL;
}
clib_mem_unpoison (shmem_hdr, sizeof (*shmem_hdr));
@@ -449,7 +708,7 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
ts = tsrem;
}
/* Timeout... */
- return -1;
+ return VAPI_ECON_FAIL;
read_one_msg:
VL_MSG_API_UNPOISON (rp);
@@ -465,8 +724,83 @@ vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
return (rv);
}
+static int
+vapi_sock_client_connect (vapi_ctx_t ctx, char *path, const char *name)
+{
+ clib_error_t *error;
+ clib_socket_t *sock;
+ vl_api_sockclnt_create_t *mp;
+ vl_api_sockclnt_create_reply_t *rp;
+ int rv = 0;
+ u8 *msg = 0;
+
+ ctx->my_client_index = ~0;
+
+ if (ctx->client_socket.fd)
+ return VAPI_EINVAL;
+
+ if (name == 0)
+ return VAPI_EINVAL;
+
+ sock = &ctx->client_socket;
+ sock->config = path ? path : API_SOCKET_FILE;
+ sock->flags = CLIB_SOCKET_F_IS_CLIENT;
+
+ if ((error = clib_socket_init (sock)))
+ {
+ clib_error_report (error);
+ return VAPI_ECON_FAIL;
+ }
+
+ mp = vapi_sock_msg_alloc (sizeof (vl_api_sockclnt_create_t));
+ mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_CREATE);
+ strncpy ((char *) mp->name, name, sizeof (mp->name) - 1);
+
+ if (vapi_sock_send (ctx, (void *) mp) != VAPI_OK)
+ {
+ return VAPI_ECON_FAIL;
+ }
+
+ while (1)
+ {
+ int qstatus;
+ struct timespec ts, tsrem;
+ int i;
+
+ /* Wait up to 10 seconds */
+ for (i = 0; i < 1000; i++)
+ {
+ qstatus = vapi_sock_recv_internal (ctx, &msg, 0);
+
+ if (qstatus == 0)
+ goto read_one_msg;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 10000 * 1000; /* 10 ms */
+ while (nanosleep (&ts, &tsrem) < 0)
+ ts = tsrem;
+ }
+ /* Timeout... */
+ return -1;
+
+ read_one_msg:
+ if (vec_len (msg) == 0)
+ continue;
+
+ rp = (void *) msg;
+ if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_CREATE_REPLY)
+ {
+ clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id));
+ continue;
+ }
+ rv = clib_net_to_host_u32 (rp->response);
+ vapi_sockclnt_create_reply_t_handler (ctx, rp);
+ break;
+ }
+ return (rv);
+}
+
static void
-vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
+vapi_shm_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
{
vl_api_memclnt_delete_t *mp;
vl_shmem_hdr_t *shmem_hdr;
@@ -485,8 +819,21 @@ vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp);
}
+static vapi_error_e
+vapi_sock_client_send_disconnect (vapi_ctx_t ctx)
+{
+ vl_api_sockclnt_delete_t *mp;
+
+ mp = vapi_msg_alloc (ctx, sizeof (vl_api_sockclnt_delete_t));
+ clib_memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_DELETE);
+ mp->client_index = ctx->my_client_index;
+
+ return vapi_sock_send (ctx, (void *) mp);
+}
+
static int
-vapi_client_disconnect (vapi_ctx_t ctx)
+vapi_shm_client_disconnect (vapi_ctx_t ctx)
{
vl_api_memclnt_delete_reply_t *rp;
svm_queue_t *vl_input_queue;
@@ -494,7 +841,7 @@ vapi_client_disconnect (vapi_ctx_t ctx)
msgbuf_t *msgbuf;
vl_input_queue = ctx->vl_input_queue;
- vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+ vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
/*
* Have to be careful here, in case the client is disconnecting
@@ -511,7 +858,7 @@ vapi_client_disconnect (vapi_ctx_t ctx)
{
clib_warning ("peer unresponsive, give up");
ctx->my_client_index = ~0;
- return -1;
+ return VAPI_ENORESP;
}
if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0)
continue;
@@ -535,6 +882,65 @@ vapi_client_disconnect (vapi_ctx_t ctx)
return 0;
}
+static vapi_error_e
+vapi_sock_client_disconnect (vapi_ctx_t ctx)
+{
+ vl_api_sockclnt_delete_reply_t *rp;
+ u8 *msg = 0;
+ msgbuf_t *msgbuf;
+ int rv;
+ f64 deadline;
+
+ deadline = clib_time_now (&ctx->time) + 2;
+
+ do
+ {
+ rv = vapi_sock_client_send_disconnect (ctx);
+ }
+ while (clib_time_now (&ctx->time) < deadline && rv != VAPI_OK);
+
+ while (1)
+ {
+ if (clib_time_now (&ctx->time) >= deadline)
+ {
+ clib_warning ("peer unresponsive, give up");
+ ctx->my_client_index = ~0;
+ return VAPI_ENORESP;
+ }
+
+ if (vapi_sock_recv_internal (ctx, &msg, 0) != VAPI_OK)
+ continue;
+
+ msgbuf = (void *) msg;
+ rp = (void *) msgbuf->data;
+ /* drain the queue */
+ if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY)
+ {
+ clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+ msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+ vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+ continue;
+ }
+ msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+ vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+ break;
+ }
+
+ clib_socket_close (&ctx->client_socket);
+ vapi_api_name_and_crc_free (ctx);
+ return VAPI_OK;
+}
+
+int
+vapi_client_disconnect (vapi_ctx_t ctx)
+{
+ if (ctx->use_uds)
+ {
+ return vapi_sock_client_disconnect (ctx);
+ }
+ return vapi_shm_client_disconnect (ctx);
+}
+
u32
vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc)
{
@@ -550,9 +956,9 @@ vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc)
}
vapi_error_e
-vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
- int max_outstanding_requests, int response_queue_size,
- vapi_mode_e mode, bool handle_keepalives)
+vapi_connect_ex (vapi_ctx_t ctx, const char *name, const char *path,
+ int max_outstanding_requests, int response_queue_size,
+ vapi_mode_e mode, bool handle_keepalives, bool use_uds)
{
int rv;
@@ -560,7 +966,8 @@ vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
{
return VAPI_EINVAL;
}
- if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024 * 1024 * 32))
+
+ if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024L * 1024 * 32))
{
return VAPI_ENOMEM;
}
@@ -576,28 +983,39 @@ vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
clib_memset (ctx->requests, 0, size);
/* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */
ctx->requests_start = ctx->requests_count = 0;
+ ctx->use_uds = use_uds;
- if (chroot_prefix)
+ if (use_uds)
{
- VAPI_DBG ("set memory root path `%s'", chroot_prefix);
- vl_set_memory_root_path ((char *) chroot_prefix);
- }
- static char api_map[] = "/vpe-api";
- VAPI_DBG ("client api map `%s'", api_map);
- if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0)
- {
- return VAPI_EMAP_FAIL;
+ if (vapi_sock_client_connect (ctx, (char *) path, name) < 0)
+ {
+ return VAPI_ECON_FAIL;
+ }
}
- VAPI_DBG ("connect client `%s'", name);
- if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size, true) <
- 0)
+ else
{
- vl_client_api_unmap ();
- return VAPI_ECON_FAIL;
- }
+ if (path)
+ {
+ VAPI_DBG ("set memory root path `%s'", path);
+ vl_set_memory_root_path ((char *) path);
+ }
+ static char api_map[] = "/vpe-api";
+ VAPI_DBG ("client api map `%s'", api_map);
+ if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0)
+ {
+ return VAPI_EMAP_FAIL;
+ }
+ VAPI_DBG ("connect client `%s'", name);
+ if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size,
+ true) < 0)
+ {
+ vl_client_api_unmap ();
+ return VAPI_ECON_FAIL;
+ }
#if VAPI_DEBUG_CONNECT
VAPI_DBG ("start probing messages");
#endif
+ }
int i;
for (i = 0; i < __vapi_metadata.count; ++i)
@@ -670,6 +1088,15 @@ fail:
return rv;
}
+vapi_error_e
+vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
+ int max_outstanding_requests, int response_queue_size,
+ vapi_mode_e mode, bool handle_keepalives)
+{
+ return vapi_connect_ex (ctx, name, chroot_prefix, max_outstanding_requests,
+ response_queue_size, mode, handle_keepalives, false);
+}
+
/*
* API client running in the same process as VPP
*/
@@ -680,6 +1107,11 @@ vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name,
{
int rv;
+ if (ctx->use_uds)
+ {
+ return VAPI_ENOTSUP;
+ }
+
if (response_queue_size <= 0 || max_outstanding_requests <= 0)
{
return VAPI_EINVAL;
@@ -698,8 +1130,8 @@ vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name,
ctx->requests_start = ctx->requests_count = 0;
VAPI_DBG ("connect client `%s'", name);
- if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size,
- handle_keepalives) < 0)
+ if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size,
+ handle_keepalives) < 0)
{
return VAPI_ECON_FAIL;
}
@@ -773,11 +1205,17 @@ vapi_disconnect_from_vpp (vapi_ctx_t ctx)
{
return VAPI_EINVAL;
}
+
+ if (ctx->use_uds)
+ {
+ return VAPI_ENOTSUP;
+ }
+
vl_api_memclnt_delete_reply_t *rp;
svm_queue_t *vl_input_queue;
time_t begin;
vl_input_queue = ctx->vl_input_queue;
- vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+ vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
/*
* Have to be careful here, in case the client is disconnecting
@@ -821,19 +1259,14 @@ fail:
return rv;
}
-vapi_error_e
-vapi_disconnect (vapi_ctx_t ctx)
+static vapi_error_e
+vapi_shm_disconnect (vapi_ctx_t ctx)
{
- if (!ctx->connected)
- {
- return VAPI_EINVAL;
- }
-
vl_api_memclnt_delete_reply_t *rp;
svm_queue_t *vl_input_queue;
time_t begin;
vl_input_queue = ctx->vl_input_queue;
- vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+ vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
/*
* Have to be careful here, in case the client is disconnecting
@@ -881,90 +1314,162 @@ fail:
return rv;
}
+static vapi_error_e
+vapi_sock_disconnect (vapi_ctx_t ctx)
+{
+ vl_api_sockclnt_delete_reply_t *rp;
+ time_t begin;
+ u8 *msg = 0;
+
+ vapi_sock_client_send_disconnect (ctx);
+
+ begin = time (0);
+ vapi_error_e rv = VAPI_OK;
+ while (1)
+ {
+ time_t now;
+
+ now = time (0);
+
+ if (now >= (begin + 2))
+ {
+ clib_warning ("peer unresponsive, give up");
+ ctx->my_client_index = ~0;
+ rv = VAPI_ENORESP;
+ goto fail;
+ }
+ if (vapi_sock_recv_internal (ctx, &msg, 0) < 0)
+ continue;
+
+ if (vec_len (msg) == 0)
+ continue;
+
+ rp = (void *) msg;
+
+ /* drain the queue */
+ if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY)
+ {
+ clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+ continue;
+ }
+ vapi_sockclnt_delete_reply_t_handler (
+ ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/);
+ break;
+ }
+fail:
+ clib_socket_close (&ctx->client_socket);
+ vapi_api_name_and_crc_free (ctx);
+
+ ctx->connected = false;
+ return rv;
+}
+
vapi_error_e
-vapi_get_fd (vapi_ctx_t ctx, int *fd)
+vapi_disconnect (vapi_ctx_t ctx)
{
- return VAPI_ENOTSUP;
+ if (!ctx->connected)
+ {
+ return VAPI_EINVAL;
+ }
+
+ if (ctx->use_uds)
+ {
+ return vapi_sock_disconnect (ctx);
+ }
+ return vapi_shm_disconnect (ctx);
}
vapi_error_e
-vapi_send (vapi_ctx_t ctx, void *msg)
+vapi_get_fd (vapi_ctx_t ctx, int *fd)
{
- vapi_error_e rv = VAPI_OK;
- if (!ctx || !msg || !ctx->connected)
+ if (ctx->use_uds && fd)
{
- rv = VAPI_EINVAL;
- goto out;
+ *fd = ctx->client_socket.fd;
+ return VAPI_OK;
}
- int tmp;
- svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
+ return VAPI_ENOTSUP;
+}
+
#if VAPI_DEBUG
+static void
+vapi_debug_log (vapi_ctx_t ctx, void *msg, const char *fun)
+{
unsigned msgid = be16toh (*(u16 *) msg);
if (msgid <= ctx->vl_msg_id_max)
{
vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid];
if (id < __vapi_metadata.count)
{
- VAPI_DBG ("send msg@%p:%u[%s]", msg, msgid,
+ VAPI_DBG ("%s msg@%p:%u[%s]", fun, msg, msgid,
__vapi_metadata.msgs[id]->name);
}
else
{
- VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid);
+ VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid);
}
}
else
{
- VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid);
+ VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid);
}
+}
+#endif
+
+static vapi_error_e
+vapi_shm_send (vapi_ctx_t ctx, void *msg)
+{
+ int rv = VAPI_OK;
+ int tmp;
+ svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
+#if VAPI_DEBUG
+ vapi_debug_log (ctx, msg, "send");
#endif
- tmp = svm_queue_add (q, (u8 *) & msg,
- VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
+ tmp =
+ svm_queue_add (q, (u8 *) &msg, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
if (tmp < 0)
{
rv = VAPI_EAGAIN;
}
else
VL_MSG_API_POISON (msg);
-out:
- VAPI_DBG ("vapi_send() rv = %d", rv);
+
return rv;
}
vapi_error_e
-vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2)
+vapi_send (vapi_ctx_t ctx, void *msg)
{
vapi_error_e rv = VAPI_OK;
- if (!ctx || !msg1 || !msg2 || !ctx->connected)
+ if (!ctx || !msg || !ctx->connected)
{
rv = VAPI_EINVAL;
goto out;
}
- svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
-#if VAPI_DEBUG
- unsigned msgid1 = be16toh (*(u16 *) msg1);
- unsigned msgid2 = be16toh (*(u16 *) msg2);
- const char *name1 = "UNKNOWN";
- const char *name2 = "UNKNOWN";
- if (msgid1 <= ctx->vl_msg_id_max)
+
+ if (ctx->use_uds)
{
- vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid1];
- if (id < __vapi_metadata.count)
- {
- name1 = __vapi_metadata.msgs[id]->name;
- }
+ rv = vapi_sock_send (ctx, msg);
}
- if (msgid2 <= ctx->vl_msg_id_max)
+ else
{
- vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid2];
- if (id < __vapi_metadata.count)
- {
- name2 = __vapi_metadata.msgs[id]->name;
- }
+ rv = vapi_shm_send (ctx, msg);
}
- VAPI_DBG ("send two: %u[%s], %u[%s]", msgid1, name1, msgid2, name2);
+
+out:
+ VAPI_DBG ("vapi_send() rv = %d", rv);
+ return rv;
+}
+
+static vapi_error_e
+vapi_shm_send2 (vapi_ctx_t ctx, void *msg1, void *msg2)
+{
+ vapi_error_e rv = VAPI_OK;
+ svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
+#if VAPI_DEBUG
+ vapi_debug_log (ctx, msg1, "send2");
+ vapi_debug_log (ctx, msg2, "send2");
#endif
- int tmp = svm_queue_add2 (q, (u8 *) & msg1, (u8 *) & msg2,
+ int tmp = svm_queue_add2 (q, (u8 *) &msg1, (u8 *) &msg2,
VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
if (tmp < 0)
{
@@ -972,31 +1477,52 @@ vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2)
}
else
VL_MSG_API_POISON (msg1);
-out:
- VAPI_DBG ("vapi_send() rv = %d", rv);
+
return rv;
}
vapi_error_e
-vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
- svm_q_conditional_wait_t cond, u32 time)
+vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2)
{
- if (!ctx || !ctx->connected || !msg || !msg_size)
+ vapi_error_e rv = VAPI_OK;
+ if (!ctx || !msg1 || !msg2 || !ctx->connected)
{
- return VAPI_EINVAL;
+ rv = VAPI_EINVAL;
+ goto out;
+ }
+
+ if (ctx->use_uds)
+ {
+ rv = vapi_sock_send2 (ctx, msg1, msg2);
+ }
+ else
+ {
+ rv = vapi_shm_send2 (ctx, msg1, msg2);
}
+
+out:
+ VAPI_DBG ("vapi_send() rv = %d", rv);
+ return rv;
+}
+
+static vapi_error_e
+vapi_shm_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size,
+ svm_q_conditional_wait_t cond, u32 time)
+{
vapi_error_e rv = VAPI_OK;
uword data;
svm_queue_t *q = ctx->vl_input_queue;
-again:
VAPI_DBG ("doing shm queue sub");
int tmp = svm_queue_sub (q, (u8 *) & data, cond, time);
- if (tmp == 0)
+ if (tmp != 0)
{
+ return VAPI_EAGAIN;
+ }
+
VL_MSG_API_UNPOISON ((void *) data);
#if VAPI_DEBUG_ALLOC
vapi_add_to_be_freed ((void *) data);
@@ -1010,60 +1536,94 @@ again:
}
*msg = (u8 *) data;
*msg_size = ntohl (msgbuf->data_len);
+
#if VAPI_DEBUG
- unsigned msgid = be16toh (*(u16 *) * msg);
- if (msgid <= ctx->vl_msg_id_max)
- {
- vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid];
- if (id < __vapi_metadata.count)
- {
- VAPI_DBG ("recv msg@%p:%u[%s]", *msg, msgid,
- __vapi_metadata.msgs[id]->name);
- }
- else
- {
- VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid);
- }
- }
- else
- {
- VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid);
- }
+ vapi_debug_log (ctx, msg, "recv");
+#endif
+
+ return rv;
+}
+
+static vapi_error_e
+vapi_sock_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, u32 time)
+{
+ vapi_error_e rv = VAPI_OK;
+ u8 *data = 0;
+ if (time == 0 && ctx->mode == VAPI_MODE_BLOCKING)
+ time = 1;
+
+ rv = vapi_sock_recv_internal (ctx, &data, time);
+
+ if (rv != VAPI_OK)
+ {
+ return rv;
+ }
+
+ *msg = data;
+ *msg_size = vec_len (data);
+
+#if VAPI_DEBUG
+ vapi_debug_log (ctx, msg, "recv");
#endif
- if (ctx->handle_keepalives)
+
+ return rv;
+}
+
+vapi_error_e
+vapi_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size,
+ svm_q_conditional_wait_t cond, u32 time)
+{
+ if (!ctx || !ctx->connected || !msg || !msg_size)
+ {
+ return VAPI_EINVAL;
+ }
+ vapi_error_e rv = VAPI_OK;
+
+again:
+ if (ctx->use_uds)
+ {
+ rv = vapi_sock_recv (ctx, msg, msg_size, time);
+ }
+ else
+ {
+ rv = vapi_shm_recv (ctx, msg, msg_size, cond, time);
+ }
+
+ if (rv != VAPI_OK)
+ return rv;
+
+ if (ctx->handle_keepalives)
+ {
+ unsigned msgid = be16toh (*(u16 *) *msg);
+ if (msgid == vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive))
{
- unsigned msgid = be16toh (*(u16 *) * msg);
- if (msgid ==
- vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive))
+ vapi_msg_memclnt_keepalive_reply *reply = NULL;
+ do
{
- vapi_msg_memclnt_keepalive_reply *reply = NULL;
- do
- {
- reply = vapi_msg_alloc (ctx, sizeof (*reply));
- }
- while (!reply);
- reply->header.context = vapi_get_client_index (ctx);
- reply->header._vl_msg_id =
- vapi_lookup_vl_msg_id (ctx,
- vapi_msg_id_memclnt_keepalive_reply);
- reply->payload.retval = 0;
- vapi_msg_memclnt_keepalive_reply_hton (reply);
- while (VAPI_EAGAIN == vapi_send (ctx, reply));
- vapi_msg_free (ctx, *msg);
- goto again;
+ reply = vapi_msg_alloc (ctx, sizeof (*reply));
}
+ while (!reply);
+ reply->header.context = vapi_get_client_index (ctx);
+ reply->header._vl_msg_id =
+ vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive_reply);
+ reply->payload.retval = 0;
+ vapi_msg_memclnt_keepalive_reply_hton (reply);
+ while (VAPI_EAGAIN == vapi_send (ctx, reply))
+ ;
+ vapi_msg_free (ctx, *msg);
+ goto again;
}
}
- else
- {
- rv = VAPI_EAGAIN;
- }
+
return rv;
}
vapi_error_e
vapi_wait (vapi_ctx_t ctx)
{
+ if (ctx->use_uds)
+ return VAPI_ENOTSUP;
+
svm_queue_lock (ctx->vl_input_queue);
svm_queue_wait (ctx->vl_input_queue);
svm_queue_unlock (ctx->vl_input_queue);