/* *------------------------------------------------------------------ * Copyright (c) 2017 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *------------------------------------------------------------------ */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define vl_typedefs /* define message structures */ #include #undef vl_typedefs /* we need to use control pings for some stuff and because we're forced to put * the code in headers, we need a way to be able to grab the ids of these * messages - so declare them here as extern */ vapi_msg_id_t vapi_msg_id_control_ping = 0; vapi_msg_id_t vapi_msg_id_control_ping_reply = 0; DEFINE_VAPI_MSG_IDS_MEMCLNT_API_JSON; DEFINE_VAPI_MSG_IDS_VLIB_API_JSON; struct { size_t count; vapi_message_desc_t **msgs; size_t max_len_name_with_crc; } __vapi_metadata; typedef struct { u32 context; vapi_cb_t callback; void *callback_ctx; vapi_msg_id_t response_id; enum vapi_request_type type; } vapi_req_t; static const u32 context_counter_mask = (1 << 31); typedef struct { vapi_error_e (*cb) (vapi_ctx_t ctx, void *callback_ctx, vapi_msg_id_t id, void *payload); void *ctx; } vapi_generic_cb_with_ctx; typedef struct { vapi_error_e (*cb) (vapi_ctx_t ctx, void *callback_ctx, void *payload); void *ctx; } vapi_event_cb_with_ctx; struct vapi_ctx_s { vapi_mode_e mode; int requests_size; /* size of the requests array (circular queue) */ int requests_start; /* index of first request */ int requests_count; /* number of used slots */ vapi_req_t *requests; u32 context_counter; vapi_generic_cb_with_ctx generic_cb; vapi_event_cb_with_ctx *event_cbs; u16 *vapi_msg_id_t_to_vl_msg_id; u16 vl_msg_id_max; vapi_msg_id_t *vl_msg_id_to_vapi_msg_t; bool connected; bool handle_keepalives; pthread_mutex_t requests_mutex; bool use_uds; svm_queue_t *vl_input_queue; clib_socket_t client_socket; clib_time_t time; u32 my_client_index; /** client message index hash table */ uword *msg_index_by_name_and_crc; }; u32 vapi_gen_req_context (vapi_ctx_t ctx) { ++ctx->context_counter; ctx->context_counter %= context_counter_mask; return ctx->context_counter | context_counter_mask; } size_t vapi_get_request_count (vapi_ctx_t ctx) { return ctx->requests_count; } bool vapi_requests_full (vapi_ctx_t ctx) { return (ctx->requests_count == ctx->requests_size); } bool vapi_requests_empty (vapi_ctx_t ctx) { return (0 == ctx->requests_count); } static int vapi_requests_end (vapi_ctx_t ctx) { return (ctx->requests_start + ctx->requests_count) % ctx->requests_size; } void vapi_store_request (vapi_ctx_t ctx, u32 context, vapi_msg_id_t response_id, enum vapi_request_type request_type, vapi_cb_t callback, void *callback_ctx) { assert (!vapi_requests_full (ctx)); /* if the mutex is not held, bad things will happen */ assert (0 != pthread_mutex_trylock (&ctx->requests_mutex)); const int requests_end = vapi_requests_end (ctx); vapi_req_t *slot = &ctx->requests[requests_end]; slot->type = request_type; slot->response_id = response_id; slot->context = context; slot->callback = callback; slot->callback_ctx = callback_ctx; VAPI_DBG ("stored@%d: context:%x (start is @%d)", requests_end, context, ctx->requests_start); ++ctx->requests_count; assert (!vapi_requests_empty (ctx)); } #if VAPI_DEBUG_ALLOC struct to_be_freed_s; struct to_be_freed_s { void *v; struct to_be_freed_s *next; }; static struct to_be_freed_s *to_be_freed = NULL; void vapi_add_to_be_freed (void *v) { struct to_be_freed_s *prev = NULL; struct to_be_freed_s *tmp; tmp = to_be_freed; while (tmp && tmp->v) { prev = tmp; tmp = tmp->next; } if (!tmp) { if (!prev) { tmp = to_be_freed = calloc (1, sizeof (*to_be_freed)); } else { tmp = prev->next = calloc (1, sizeof (*to_be_freed)); } } VAPI_DBG ("To be freed %p", v); tmp->v = v; } void vapi_trace_free (void *v) { struct to_be_freed_s *tmp = to_be_freed; while (tmp && tmp->v != v) { tmp = tmp->next; } if (tmp && tmp->v == v) { VAPI_DBG ("Freed %p", v); tmp->v = NULL; } else { VAPI_ERR ("Trying to free untracked pointer %p", v); abort (); } } void vapi_to_be_freed_validate () { struct to_be_freed_s *tmp = to_be_freed; while (tmp) { if (tmp->v) { VAPI_ERR ("Unfreed msg %p!", tmp->v); } tmp = tmp->next; } } #endif static void * vapi_shm_msg_alloc (vapi_ctx_t ctx, size_t size) { if (!ctx->connected) { return NULL; } void *rv = vl_msg_api_alloc_as_if_client_or_null (size); if (rv) { clib_memset (rv, 0, size); } return rv; } static void * vapi_sock_msg_alloc (size_t size) { u8 *rv = 0; vec_validate_init_empty (rv, size - 1, 0); return rv; } void * vapi_msg_alloc (vapi_ctx_t ctx, size_t size) { if (ctx->use_uds) return vapi_sock_msg_alloc (size); return vapi_shm_msg_alloc (ctx, size); } void vapi_msg_free (vapi_ctx_t ctx, void *msg) { if (!ctx->connected) { return; } #if VAPI_DEBUG_ALLOC vapi_trace_free (msg); #endif if (ctx->use_uds) { vec_free (msg); } else { vl_msg_api_free (msg); } } vapi_msg_id_t vapi_lookup_vapi_msg_id_t (vapi_ctx_t ctx, u16 vl_msg_id) { if (vl_msg_id <= ctx->vl_msg_id_max) { return ctx->vl_msg_id_to_vapi_msg_t[vl_msg_id]; } return VAPI_INVALID_MSG_ID; } vapi_error_e vapi_ctx_alloc (vapi_ctx_t * result) { vapi_ctx_t ctx = calloc (1, sizeof (struct vapi_ctx_s)); if (!ctx) { return VAPI_ENOMEM; } ctx->context_counter = 0; ctx->vapi_msg_id_t_to_vl_msg_id = malloc (__vapi_metadata.count * sizeof (*ctx->vapi_msg_id_t_to_vl_msg_id)); if (!ctx->vapi_msg_id_t_to_vl_msg_id) { goto fail; } clib_memset (ctx->vapi_msg_id_t_to_vl_msg_id, ~0, __vapi_metadata.count * sizeof (*ctx->vapi_msg_id_t_to_vl_msg_id)); ctx->event_cbs = calloc (__vapi_metadata.count, sizeof (*ctx->event_cbs)); if (!ctx->event_cbs) { goto fail; } pthread_mutex_init (&ctx->requests_mutex, NULL); *result = ctx; clib_time_init (&ctx->time); return VAPI_OK; fail: vapi_ctx_free (ctx); return VAPI_ENOMEM; } void vapi_ctx_free (vapi_ctx_t ctx) { assert (!ctx->connected); free (ctx->requests); free (ctx->vapi_msg_id_t_to_vl_msg_id); free (ctx->event_cbs); free (ctx->vl_msg_id_to_vapi_msg_t); pthread_mutex_destroy (&ctx->requests_mutex); free (ctx); } bool vapi_is_msg_available (vapi_ctx_t ctx, vapi_msg_id_t id) { return vapi_lookup_vl_msg_id (ctx, id) != UINT16_MAX; } /* Cut and paste to avoid adding dependency to client library */ __clib_nosanitize_addr static void VL_API_VEC_UNPOISON (const void *v) { const vec_header_t *vh = &((vec_header_t *) v)[-1]; clib_mem_unpoison (vh, sizeof (*vh) + vec_len (v)); } static void vapi_api_name_and_crc_free (vapi_ctx_t ctx) { int i; u8 **keys = 0; hash_pair_t *hp; if (!ctx->msg_index_by_name_and_crc) return; hash_foreach_pair (hp, ctx->msg_index_by_name_and_crc, ({ vec_add1 (keys, (u8 *) hp->key); })); for (i = 0; i < vec_len (keys); i++) vec_free (keys[i]); vec_free (keys); hash_free (ctx->msg_index_by_name_and_crc); } static vapi_error_e vapi_sock_get_errno (int err) { switch (err) { case ENOTSOCK: return VAPI_ENOTSOCK; case EACCES: return VAPI_EACCES; case ECONNRESET: return VAPI_ECONNRESET; default: break; } return VAPI_ESOCK_FAILURE; } static vapi_error_e vapi_sock_send (vapi_ctx_t ctx, u8 *msg) { size_t n; struct msghdr hdr; const size_t len = vec_len (msg); const size_t total_len = len + sizeof (msgbuf_t); msgbuf_t msgbuf1 = { .q = 0, .gc_mark_timestamp = 0, .data_len = htonl (len), }; struct iovec bufs[2] = { [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) }, [1] = { .iov_base = msg, .iov_len = len }, }; clib_memset (&hdr, 0, sizeof (hdr)); hdr.msg_iov = bufs; hdr.msg_iovlen = 2; n = sendmsg (ctx->client_socket.fd, &hdr, 0); if (n < 0) { return vapi_sock_get_errno (errno); } if (n < total_len) { return VAPI_EAGAIN; } vec_free (msg); return VAPI_OK; } static vapi_error_e vapi_sock_send2 (vapi_ctx_t ctx, u8 *msg1, u8 *msg2) { size_t n; struct msghdr hdr; const size_t len1 = vec_len (msg1); const size_t len2 = vec_len (msg2); const size_t total_len = len1 + len2 + 2 * sizeof (msgbuf_t); msgbuf_t msgbuf1 = { .q = 0, .gc_mark_timestamp = 0, .data_len = htonl (len1), }; msgbuf_t msgbuf2 = { .q = 0, .gc_mark_timestamp = 0, .data_len = htonl (len2), }; struct iovec bufs[4] = { [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) }, [1] = { .iov_base = msg1, .iov_len = len1 }, [2] = { .iov_base = &msgbuf2, .iov_len = sizeof (msgbuf2) }, [3] = { .iov_base = msg2, .iov_len = len2 }, }; clib_memset (&hdr, 0, sizeof (hdr)); hdr.msg_iov = bufs; hdr.msg_iovlen = 4; n = sendmsg (ctx->client_socket.fd, &hdr, 0); if (n < 0) { return vapi_sock_get_errno (errno); } if (n < total_len) { return VAPI_EAGAIN; } vec_free (msg1); vec_free (msg2); return VAPI_OK; } static vapi_error_e vapi_sock_recv_internal (vapi_ctx_t ctx, u8 **vec_msg, u32 timeout) { clib_socket_t *sock = &ctx->client_socket; u32 data_len = 0, msg_size; msgbuf_t *mbp = 0; ssize_t n, current_rx_index; f64 deadline; vapi_error_e rv = VAPI_EAGAIN; if (ctx->client_socket.fd == 0) return VAPI_ENOTSOCK; deadline = clib_time_now (&ctx->time) + timeout; while (1) { current_rx_index = vec_len (sock->rx_buffer); while (current_rx_index < sizeof (*mbp)) { vec_validate (sock->rx_buffer, sizeof (*mbp) - 1); n = recv (sock->fd, sock->rx_buffer + current_rx_index, sizeof (*mbp) - current_rx_index, MSG_DONTWAIT); if (n < 0) { if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline) return VAPI_EAGAIN; if (errno == EAGAIN) continue; clib_unix_warning ("socket_read"); vec_set_len (sock->rx_buffer, current_rx_index); return vapi_sock_get_errno (errno); } current_rx_index += n; } vec_set_len (sock->rx_buffer, current_rx_index); mbp = (msgbuf_t *) (sock->rx_buffer); data_len = ntohl (mbp->data_len); current_rx_index = vec_len (sock->rx_buffer); vec_validate (sock->rx_buffer, current_rx_index + data_len); mbp = (msgbuf_t *) (sock->rx_buffer); msg_size = data_len + sizeof (*mbp); while (current_rx_index < msg_size) { n = recv (sock->fd, sock->rx_buffer + current_rx_index, msg_size - current_rx_index, MSG_DONTWAIT); if (n < 0) { if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline) return VAPI_EAGAIN; if (errno == EAGAIN) continue; clib_unix_warning ("socket_read"); vec_set_len (sock->rx_buffer, current_rx_index); return vapi_sock_get_errno (errno); } current_rx_index += n; } vec_set_len (sock->rx_buffer, current_rx_index); if (vec_len (sock->rx_buffer) >= data_len + sizeof (*mbp)) { if (data_len) { vec_add (*vec_msg, mbp->data, data_len); rv = VAPI_OK; } else { *vec_msg = 0; } if (vec_len (sock->rx_buffer) == data_len + sizeof (*mbp)) vec_set_len (sock->rx_buffer, 0); else vec_delete (sock->rx_buffer, data_len + sizeof (*mbp), 0); mbp = 0; /* Quit if we're out of data, and not expecting a ping reply */ if (vec_len (sock->rx_buffer) == 0) break; } } return rv; } static void vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx, vl_api_memclnt_create_v2_reply_t *mp) { serialize_main_t _sm, *sm = &_sm; u8 *tblv; u32 nmsgs; int i; u8 *name_and_crc; u32 msg_index; ctx->my_client_index = mp->index; /* Clean out any previous hash table (unlikely) */ vapi_api_name_and_crc_free (ctx); ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword)); /* Recreate the vnet-side API message handler table */ tblv = uword_to_pointer (mp->message_table, u8 *); unserialize_open_data (sm, tblv, vec_len (tblv)); unserialize_integer (sm, &nmsgs, sizeof (u32)); VL_API_VEC_UNPOISON (tblv); for (i = 0; i < nmsgs; i++) { msg_index = unserialize_likely_small_unsigned_integer (sm); unserialize_cstring (sm, (char **) &name_and_crc); hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc, msg_index); } } static void vapi_sockclnt_create_reply_t_handler (vapi_ctx_t ctx, vl_api_sockclnt_create_reply_t *mp) { int i; u8 *name_and_crc; ctx->my_client_index = mp->index; /* Clean out any previous hash table (unlikely) */ vapi_api_name_and_crc_free (ctx); ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword)); for (i = 0; i < be16toh (mp->count); i++) { name_and_crc = format (0, "%s%c", mp->message_table[i].name, 0); hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc, be16toh (mp->message_table[i].index)); } } static void vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx, vl_api_memclnt_delete_reply_t *mp) { void *oldheap; oldheap = vl_msg_push_heap (); svm_queue_free (ctx->vl_input_queue); vl_msg_pop_heap (oldheap); ctx->my_client_index = ~0; ctx->vl_input_queue = 0; } static void vapi_sockclnt_delete_reply_t_handler (vapi_ctx_t ctx, vl_api_sockclnt_delete_reply_t *mp) { ctx->my_client_index = ~0; ctx->vl_input_queue = 0; } static int vapi_shm_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota, int input_queue_size, bool keepalive) { vl_api_memclnt_create_v2_t *mp; vl_api_memclnt_create_v2_reply_t *rp; svm_queue_t *vl_input_queue; vl_shmem_hdr_t *shmem_hdr; int rv = 0; void *oldheap; api_main_t *am = vlibapi_get_main (); shmem_hdr = am->shmem_hdr; if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0) { clib_warning ("shmem_hdr / input queue NULL"); return VAPI_ECON_FAIL; } clib_mem_unpoison (shmem_hdr, sizeof (*shmem_hdr)); VL_MSG_API_SVM_QUEUE_UNPOISON (shmem_hdr->vl_input_queue); oldheap = vl_msg_push_heap (); vl_input_queue = svm_queue_alloc_and_init (input_queue_size, sizeof (uword), getpid ()); vl_msg_pop_heap (oldheap); ctx->my_client_index = ~0; ctx->vl_input_queue = vl_input_queue; mp = vl_msg_api_alloc_as_if_client (sizeof (vl_api_memclnt_create_v2_t)); clib_memset (mp, 0, sizeof (*mp)); mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_V2); mp->ctx_quota = ctx_quota; mp->input_queue = (uword) vl_input_queue; strncpy ((char *) mp->name, name, sizeof (mp->name) - 1); mp->keepalive = keepalive; vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp); while (1) { int qstatus; struct timespec ts, tsrem; int i; /* Wait up to 10 seconds */ for (i = 0; i < 1000; i++) { qstatus = svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0); if (qstatus == 0) goto read_one_msg; ts.tv_sec = 0; ts.tv_nsec = 10000 * 1000; /* 10 ms */ while (nanosleep (&ts, &tsrem) < 0) ts = tsrem; } /* Timeout... */ return VAPI_ECON_FAIL; read_one_msg: VL_MSG_API_UNPOISON (rp); if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_CREATE_V2_REPLY) { clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id)); continue; } rv = clib_net_to_host_u32 (rp->response); vapi_memclnt_create_v2_reply_t_handler (ctx, rp); break; } return (rv); } static int vapi_sock_client_connect (vapi_ctx_t ctx, char *path, const char *name) { clib_error_t *error; clib_socket_t *sock; vl_api_sockclnt_create_t *mp; vl_api_sockclnt_create_reply_t *rp; int rv = 0; u8 *msg = 0; ctx->my_client_index = ~0; if (ctx->client_socket.fd) return VAPI_EINVAL; if (name == 0) return VAPI_EINVAL; sock = &ctx->client_socket; sock->config = path ? path : API_SOCKET_FILE; sock->flags = CLIB_SOCKET_F_IS_CLIENT; if ((error = clib_socket_init (sock))) { clib_error_report (error); return VAPI_ECON_FAIL; } mp = vapi_sock_msg_alloc (sizeof (vl_api_sockclnt_create_t)); mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_CREATE); strncpy ((char *) mp->name, name, sizeof (mp->name) - 1); if (vapi_sock_send (ctx, (void *) mp) != VAPI_OK) { return VAPI_ECON_FAIL; } while (1) { int qstatus; struct timespec ts, tsrem; int i; /* Wait up to 10 seconds */ for (i = 0; i < 1000; i++) { qstatus = vapi_sock_recv_internal (ctx, &msg, 0); if (qstatus == 0) goto read_one_msg; ts.tv_sec = 0; ts.tv_nsec = 10000 * 1000; /* 10 ms */ while (nanosleep (&ts, &tsrem) < 0) ts = tsrem; } /* Timeout... */ return -1; read_one_msg: if (vec_len (msg) == 0) continue; rp = (void *) msg; if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_CREATE_REPLY) { clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id)); continue; } rv = clib_net_to_host_u32 (rp->response); vapi_sockclnt_create_reply_t_handler (ctx, rp); break; } return (rv); } static void vapi_shm_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup) { vl_api_memclnt_delete_t *mp; vl_shmem_hdr_t *shmem_hdr; api_main_t *am = vlibapi_get_main (); ASSERT (am->vlib_rp); shmem_hdr = am->shmem_hdr; ASSERT (shmem_hdr && shmem_hdr->vl_input_queue); mp = vl_msg_api_alloc (sizeof (vl_api_memclnt_delete_t)); clib_memset (mp, 0, sizeof (*mp)); mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE); mp->index = ctx->my_client_index; mp->do_cleanup = do_cleanup; vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp); } static vapi_error_e vapi_sock_client_send_disconnect (vapi_ctx_t ctx) { vl_api_sockclnt_delete_t *mp; mp = vapi_msg_alloc (ctx, sizeof (vl_api_sockclnt_delete_t)); clib_memset (mp, 0, sizeof (*mp)); mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_DELETE); mp->client_index = ctx->my_client_index; return vapi_sock_send (ctx, (void *) mp); } static int vapi_shm_client_disconnect (vapi_ctx_t ctx) { vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; time_t begin; msgbuf_t *msgbuf; vl_input_queue = ctx->vl_input_queue; vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting * because e.g. the vlib process died, or is unresponsive. */ begin = time (0); while (1) { time_t now; now = time (0); if (now >= (begin + 2)) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; return VAPI_ENORESP; } if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0) continue; VL_MSG_API_UNPOISON (rp); /* drain the queue */ if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY) { clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); continue; } msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); break; } vapi_api_name_and_crc_free (ctx); return 0; } static vapi_error_e vapi_sock_client_disconnect (vapi_ctx_t ctx) { vl_api_sockclnt_delete_reply_t *rp; u8 *msg = 0; msgbuf_t *msgbuf; int rv; f64 deadline; deadline = clib_time_now (&ctx->time) + 2; do { rv = vapi_sock_client_send_disconnect (ctx); } while (clib_time_now (&ctx->time) < deadline && rv != VAPI_OK); while (1) { if (clib_time_now (&ctx->time) >= deadline) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; return VAPI_ENORESP; } if (vapi_sock_recv_internal (ctx, &msg, 0) != VAPI_OK) continue; msgbuf = (void *) msg; rp = (void *) msgbuf->data; /* drain the queue */ if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY) { clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); continue; } msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data)); vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len)); break; } clib_socket_close (&ctx->client_socket); clib_socket_free (&ctx->client_socket); vapi_api_name_and_crc_free (ctx); return VAPI_OK; } int vapi_client_disconnect (vapi_ctx_t ctx) { if (ctx->use_uds) { return vapi_sock_client_disconnect (ctx); } return vapi_shm_client_disconnect (ctx); } u32 vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc) { uword *p; if (ctx->msg_index_by_name_and_crc) { p = hash_get_mem (ctx->msg_index_by_name_and_crc, name_and_crc); if (p) return p[0]; } return ~0; } vapi_error_e vapi_connect_ex (vapi_ctx_t ctx, const char *name, const char *path, int max_outstanding_requests, int response_queue_size, vapi_mode_e mode, bool handle_keepalives, bool use_uds) { int rv; if (response_queue_size <= 0 || max_outstanding_requests <= 0) { return VAPI_EINVAL; } if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024L * 1024 * 32)) { return VAPI_ENOMEM; } ctx->requests_size = max_outstanding_requests; const size_t size = ctx->requests_size * sizeof (*ctx->requests); void *tmp = realloc (ctx->requests, size); if (!tmp) { return VAPI_ENOMEM; } ctx->requests = tmp; clib_memset (ctx->requests, 0, size); /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */ ctx->requests_start = ctx->requests_count = 0; ctx->use_uds = use_uds; if (use_uds) { if (vapi_sock_client_connect (ctx, (char *) path, name) != VAPI_OK) { return VAPI_ECON_FAIL; } } else { if (path) { VAPI_DBG ("set memory root path `%s'", path); vl_set_memory_root_path ((char *) path); } static char api_map[] = "/vpe-api"; VAPI_DBG ("client api map `%s'", api_map); if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0) { return VAPI_EMAP_FAIL; } VAPI_DBG ("connect client `%s'", name); if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size, true) < 0) { vl_client_api_unmap (); return VAPI_ECON_FAIL; } #if VAPI_DEBUG_CONNECT VAPI_DBG ("start probing messages"); #endif } int i; for (i = 0; i < __vapi_metadata.count; ++i) { vapi_message_desc_t *m = __vapi_metadata.msgs[i]; u8 scratch[m->name_with_crc_len + 1]; memcpy (scratch, m->name_with_crc, m->name_with_crc_len + 1); u32 id = vapi_api_get_msg_index (ctx, scratch); if (VAPI_INVALID_MSG_ID != id) { if (id > UINT16_MAX) { VAPI_ERR ("Returned vl_msg_id `%u' > UINT16MAX `%u'!", id, UINT16_MAX); rv = VAPI_EINVAL; goto fail; } if (id > ctx->vl_msg_id_max) { vapi_msg_id_t *tmp = realloc (ctx->vl_msg_id_to_vapi_msg_t, sizeof (*ctx->vl_msg_id_to_vapi_msg_t) * (id + 1)); if (!tmp) { rv = VAPI_ENOMEM; goto fail; } ctx->vl_msg_id_to_vapi_msg_t = tmp; ctx->vl_msg_id_max = id; } ctx->vl_msg_id_to_vapi_msg_t[id] = m->id; ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = id; #if VAPI_DEBUG_CONNECT VAPI_DBG ("Message `%s' has vl_msg_id `%u'", m->name_with_crc, (unsigned) id); #endif } else { ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = UINT16_MAX; VAPI_DBG ("Message `%s' not available", m->name_with_crc); } } #if VAPI_DEBUG_CONNECT VAPI_DBG ("finished probing messages"); #endif if (!vapi_is_msg_available (ctx, vapi_msg_id_control_ping) || !vapi_is_msg_available (ctx, vapi_msg_id_control_ping_reply)) { VAPI_ERR ( "control ping or control ping reply not available, cannot connect"); rv = VAPI_EINCOMPATIBLE; goto fail; } ctx->mode = mode; ctx->connected = true; if (vapi_is_msg_available (ctx, vapi_msg_id_memclnt_keepalive)) { ctx->handle_keepalives = handle_keepalives; } else { ctx->handle_keepalives = false; } return VAPI_OK; fail: vapi_client_disconnect (ctx); vl_client_api_unmap (); return rv; } vapi_error_e vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix, int max_outstanding_requests, int response_queue_size, vapi_mode_e mode, bool handle_keepalives) { return vapi_connect_ex (ctx, name, chroot_prefix, max_outstanding_requests, response_queue_size, mode, handle_keepalives, false); } /* * API client running in the same process as VPP */ vapi_error_e vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name, int max_outstanding_requests, int response_queue_size, vapi_mode_e mode, bool handle_keepalives) { int rv; if (ctx->use_uds) { return VAPI_ENOTSUP; } if (response_queue_size <= 0 || max_outstanding_requests <= 0) { return VAPI_EINVAL; } ctx->requests_size = max_outstanding_requests; const size_t size = ctx->requests_size * sizeof (*ctx->requests); void *tmp = realloc (ctx->requests, size); if (!tmp) { return VAPI_ENOMEM; } ctx->requests = tmp; clib_memset (ctx->requests, 0, size); /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */ ctx->requests_start = ctx->requests_count = 0; VAPI_DBG ("connect client `%s'", name); if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size, handle_keepalives) < 0) { return VAPI_ECON_FAIL; } int i; for (i = 0; i < __vapi_metadata.count; ++i) { vapi_message_desc_t *m = __vapi_metadata.msgs[i]; u8 scratch[m->name_with_crc_len + 1]; memcpy (scratch, m->name_with_crc, m->name_with_crc_len + 1); u32 id = vapi_api_get_msg_index (ctx, scratch); if (VAPI_INVALID_MSG_ID != id) { if (id > UINT16_MAX) { VAPI_ERR ("Returned vl_msg_id `%u' > UINT16MAX `%u'!", id, UINT16_MAX); rv = VAPI_EINVAL; goto fail; } if (id > ctx->vl_msg_id_max) { vapi_msg_id_t *tmp = realloc (ctx->vl_msg_id_to_vapi_msg_t, sizeof (*ctx->vl_msg_id_to_vapi_msg_t) * (id + 1)); if (!tmp) { rv = VAPI_ENOMEM; goto fail; } ctx->vl_msg_id_to_vapi_msg_t = tmp; ctx->vl_msg_id_max = id; } ctx->vl_msg_id_to_vapi_msg_t[id] = m->id; ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = id; } else { ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = UINT16_MAX; VAPI_DBG ("Message `%s' not available", m->name_with_crc); } } if (!vapi_is_msg_available (ctx, vapi_msg_id_control_ping) || !vapi_is_msg_available (ctx, vapi_msg_id_control_ping_reply)) { VAPI_ERR ( "control ping or control ping reply not available, cannot connect"); rv = VAPI_EINCOMPATIBLE; goto fail; } ctx->mode = mode; ctx->connected = true; if (vapi_is_msg_available (ctx, vapi_msg_id_memclnt_keepalive)) { ctx->handle_keepalives = handle_keepalives; } else { ctx->handle_keepalives = false; } return VAPI_OK; fail: vapi_client_disconnect (ctx); return rv; } vapi_error_e vapi_disconnect_from_vpp (vapi_ctx_t ctx) { if (!ctx->connected) { return VAPI_EINVAL; } if (ctx->use_uds) { return VAPI_ENOTSUP; } vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; time_t begin; vl_input_queue = ctx->vl_input_queue; vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting * because e.g. the vlib process died, or is unresponsive. */ begin = time (0); vapi_error_e rv = VAPI_OK; while (1) { time_t now; now = time (0); if (now >= (begin + 2)) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; rv = VAPI_ENORESP; goto fail; } if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0) continue; VL_MSG_API_UNPOISON (rp); /* drain the queue */ if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY) { clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); vl_msg_api_free (rp); continue; } vapi_memclnt_delete_reply_t_handler ( ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/); break; } fail: vapi_api_name_and_crc_free (ctx); ctx->connected = false; return rv; } static vapi_error_e vapi_shm_disconnect (vapi_ctx_t ctx) { vl_api_memclnt_delete_reply_t *rp; svm_queue_t *vl_input_queue; time_t begin; vl_input_queue = ctx->vl_input_queue; vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */); /* * Have to be careful here, in case the client is disconnecting * because e.g. the vlib process died, or is unresponsive. */ begin = time (0); vapi_error_e rv = VAPI_OK; while (1) { time_t now; now = time (0); if (now >= (begin + 2)) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; rv = VAPI_ENORESP; goto fail; } if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0) continue; VL_MSG_API_UNPOISON (rp); /* drain the queue */ if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY) { clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); vl_msg_api_free (rp); continue; } vapi_memclnt_delete_reply_t_handler ( ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/); break; } fail: vapi_api_name_and_crc_free (ctx); vl_client_api_unmap (); #if VAPI_DEBUG_ALLOC vapi_to_be_freed_validate (); #endif ctx->connected = false; return rv; } static vapi_error_e vapi_sock_disconnect (vapi_ctx_t ctx) { vl_api_sockclnt_delete_reply_t *rp; time_t begin; u8 *msg = 0; vapi_sock_client_send_disconnect (ctx); begin = time (0); vapi_error_e rv = VAPI_OK; while (1) { time_t now; now = time (0); if (now >= (begin + 2)) { clib_warning ("peer unresponsive, give up"); ctx->my_client_index = ~0; rv = VAPI_ENORESP; goto fail; } if (vapi_sock_recv_internal (ctx, &msg, 0) < 0) continue; if (vec_len (msg) == 0) continue; rp = (void *) msg; /* drain the queue */ if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY) { clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id)); continue; } vapi_sockclnt_delete_reply_t_handler ( ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/); break; } fail: clib_socket_close (&ctx->client_socket); clib_socket_free (&ctx->client_socket); vapi_api_name_and_crc_free (ctx); ctx->connected = false; return rv; } vapi_error_e vapi_disconnect (vapi_ctx_t ctx) { if (!ctx->connected) { return VAPI_EINVAL; } if (ctx->use_uds) { return vapi_sock_disconnect (ctx); } return vapi_shm_disconnect (ctx); } vapi_error_e vapi_get_fd (vapi_ctx_t ctx, int *fd) { if (ctx->use_uds && fd) { *fd = ctx->client_socket.fd; return VAPI_OK; } return VAPI_ENOTSUP; } #if VAPI_DEBUG static void vapi_debug_log (vapi_ctx_t ctx, void *msg, const char *fun) { unsigned msgid = be16toh (*(u16 *) msg); if (msgid <= ctx->vl_msg_id_max) { vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid]; if (id < __vapi_metadata.count) { VAPI_DBG ("%s msg@%p:%u[%s]", fun, msg, msgid, __vapi_metadata.msgs[id]->name); } else { VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid); } } else { VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid); } } #endif static vapi_error_e vapi_shm_send (vapi_ctx_t ctx, void *msg) { int rv = VAPI_OK; int tmp; svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; #if VAPI_DEBUG vapi_debug_log (ctx, msg, "send"); #endif tmp = svm_queue_add (q, (u8 *) &msg, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1); if (tmp < 0) { rv = VAPI_EAGAIN; } else VL_MSG_API_POISON (msg); return rv; } vapi_error_e vapi_send (vapi_ctx_t ctx, void *msg) { vapi_error_e rv = VAPI_OK; if (!ctx || !msg || !ctx->connected) { rv = VAPI_EINVAL; goto out; } if (ctx->use_uds) { rv = vapi_sock_send (ctx, msg); } else { rv = vapi_shm_send (ctx, msg); } out: VAPI_DBG ("vapi_send() rv = %d", rv); return rv; } static vapi_error_e vapi_shm_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) { vapi_error_e rv = VAPI_OK; svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue; #if VAPI_DEBUG vapi_debug_log (ctx, msg1, "send2"); vapi_debug_log (ctx, msg2, "send2"); #endif int tmp = svm_queue_add2 (q, (u8 *) &msg1, (u8 *) &msg2, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1); if (tmp < 0) { rv = VAPI_EAGAIN; } else VL_MSG_API_POISON (msg1); return rv; } vapi_error_e vapi_send2 (vapi_ctx_t ctx, void *msg1, void *msg2) { vapi_error_e rv = VAPI_OK; if (!ctx || !msg1 || !msg2 || !ctx->connected) { rv = VAPI_EINVAL; goto out; } if (ctx->use_uds) { rv = vapi_sock_send2 (ctx, msg1, msg2); } else { rv = vapi_shm_send2 (ctx, msg1, msg2); } out: VAPI_DBG ("vapi_send() rv = %d", rv); return rv; } static vapi_error_e vapi_shm_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, svm_q_conditional_wait_t cond, u32 time) { vapi_error_e rv = VAPI_OK; uword data; svm_queue_t *q = ctx->vl_input_queue; VAPI_DBG ("doing shm queue sub"); int tmp = svm_queue_sub (q, (u8 *) & data, cond, time); if (tmp != 0) { return VAPI_EAGAIN; } VL_MSG_API_UNPOISON ((void *) data); #if VAPI_DEBUG_ALLOC vapi_add_to_be_freed ((void *) data); #endif msgbuf_t *msgbuf = (msgbuf_t *) ((u8 *) data - offsetof (msgbuf_t, data)); if (!msgbuf->data_len) { vapi_msg_free (ctx, (u8 *) data); return VAPI_EAGAIN; } *msg = (u8 *) data; *msg_size = ntohl (msgbuf->data_len); #if VAPI_DEBUG vapi_debug_log (ctx, msg, "recv"); #endif return rv; } static vapi_error_e vapi_sock_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, u32 time) { vapi_error_e rv = VAPI_OK; u8 *data = 0; if (time == 0 && ctx->mode == VAPI_MODE_BLOCKING) time = 1; rv = vapi_sock_recv_internal (ctx, &data, time); if (rv != VAPI_OK) { return rv; } *msg = data; *msg_size = vec_len (data); #if VAPI_DEBUG vapi_debug_log (ctx, msg, "recv"); #endif return rv; } vapi_error_e vapi_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, svm_q_conditional_wait_t cond, u32 time) { if (!ctx || !ctx->connected || !msg || !msg_size) { return VAPI_EINVAL; } vapi_error_e rv = VAPI_OK; again: if (ctx->use_uds) { rv = vapi_sock_recv (ctx, msg, msg_size, time); } else { rv = vapi_shm_recv (ctx, msg, msg_size, cond, time); } if (rv != VAPI_OK) return rv; if (ctx->handle_keepalives) { unsigned msgid = be16toh (*(u16 *) *msg); if (msgid == vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive)) { vapi_msg_memclnt_keepalive_reply *reply = NULL; do { reply = vapi_msg_alloc (ctx, sizeof (*reply)); } while (!reply); reply->header.context = vapi_get_client_index (ctx); reply->header._vl_msg_id = vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive_reply); reply->payload.retval = 0; vapi_msg_memclnt_keepalive_reply_hton (reply); while (VAPI_EAGAIN == vapi_send (ctx, reply)) ; vapi_msg_free (ctx, *msg); goto again; } } return rv; } vapi_error_e vapi_wait (vapi_ctx_t ctx) { if (ctx->use_uds) return VAPI_ENOTSUP; svm_queue_lock (ctx->vl_input_queue); svm_queue_wait (ctx->vl_input_queue); svm_queue_unlock (ctx->vl_input_queue); return VAPI_OK; } static vapi_error_e vapi_dispatch_response (vapi_ctx_t ctx, vapi_msg_id_t id, u32 context, void *msg) { int mrv; if (0 != (mrv = pthread_mutex_lock (&ctx->requests_mutex))) { VAPI_DBG ("pthread_mutex_lock() failed, rv=%d:%s", mrv, strerror (mrv)); return VAPI_MUTEX_FAILURE; } int tmp = ctx->requests_start; const int requests_end = vapi_requests_end (ctx); while (ctx->requests[tmp].context != context && tmp != requests_end) { ++tmp; if (tmp == ctx->requests_size) { tmp = 0; } } VAPI_DBG ("dispatch, search from %d, %s at %d", ctx->requests_start, ctx->requests[tmp].context == context ? "matched" : "stopped", tmp); vapi_error_e rv = VAPI_OK; if (ctx->requests[tmp].context == context) { while (ctx->requests_start != tmp) { VAPI_ERR ("No response to req with context=%u", (unsigned) ctx->requests[tmp].context); ctx->requests[ctx->requests_start].callback (ctx, ctx->requests [ctx-> requests_start].callback_ctx, VAPI_ENORESP, true, NULL); clib_memset (&ctx->requests[ctx->requests_start], 0, sizeof (ctx->requests[ctx->requests_start])); ++ctx->requests_start; --ctx->requests_count; if (ctx->requests_start == ctx->requests_size) { ctx->requests_start = 0; } } // now ctx->requests_start == tmp int payload_offset = vapi_get_payload_offset (id); void *payload = ((u8 *) msg) + payload_offset; bool is_last = true; switch (ctx->requests[tmp].type) { case VAPI_REQUEST_STREAM: if (ctx->requests[tmp].response_id == id) { is_last = false; } else { VAPI_DBG ("Stream response ID doesn't match current ID, move to " "next ID"); clib_memset (&ctx->requests[tmp], 0, sizeof (ctx->requests[tmp])); ++ctx->requests_start; --ctx->requests_count; if (ctx->requests_start == ctx->requests_size) { ctx->requests_start = 0; } tmp = ctx->requests_start; if (ctx->requests[tmp].context != context) { VAPI_ERR ("Unexpected context %u, expected context %u!", ctx->requests[tmp].context, context); } } break; case VAPI_REQUEST_DUMP: if (vapi_msg_id_control_ping_reply == id) { payload = NULL; } else { is_last = false; } break; case VAPI_REQUEST_REG: break; } if (payload_offset != -1) { rv = ctx->requests[tmp].callback ( ctx, ctx->requests[tmp].callback_ctx, VAPI_OK, is_last, payload); } else { /* this is a message without payload, so bend the callback a little */ rv = ((vapi_error_e (*)(vapi_ctx_t, void *, vapi_error_e, bool)) ctx->requests[tmp].callback) (ctx, ctx->requests[tmp].callback_ctx, VAPI_OK, is_last); } if (is_last) { clib_memset (&ctx->requests[ctx->requests_start], 0, sizeof (ctx->requests[ctx->requests_start])); ++ctx->requests_start; --ctx->requests_count; if (ctx->requests_start == ctx->requests_size) { ctx->requests_start = 0; } } VAPI_DBG ("after dispatch, req start = %d, end = %d, count = %d", ctx->requests_start, requests_end, ctx->requests_count); } if (0 != (mrv = pthread_mutex_unlock (&ctx->requests_mutex))) { VAPI_DBG ("pthread_mutex_unlock() failed, rv=%d:%s", mrv, strerror (mrv)); abort (); /* this really shouldn't happen */ } return rv; } static vapi_error_e vapi_dispatch_event (vapi_ctx_t ctx, vapi_msg_id_t id, void *msg) { if (ctx->event_cbs[id].cb) { return ctx->event_cbs[id].cb (ctx, ctx->event_cbs[id].ctx, msg); } else if (ctx->generic_cb.cb) { return ctx->generic_cb.cb (ctx, ctx->generic_cb.ctx, id, msg); } else { VAPI_DBG ("No handler/generic handler for msg id %u[%s], message ignored", (unsigned) id, __vapi_metadata.msgs[id]->name); } return VAPI_OK; } bool vapi_msg_is_with_context (vapi_msg_id_t id) { assert (id <= __vapi_metadata.count); return __vapi_metadata.msgs[id]->has_context; } static int vapi_verify_msg_size (vapi_msg_id_t id, void *buf, uword buf_size) { assert (id < __vapi_metadata.count); return __vapi_metadata.msgs[id]->verify_msg_size (buf, buf_size); } vapi_error_e vapi_dispatch_one_timedwait (vapi_ctx_t ctx, u32 wait_time) { VAPI_DBG ("vapi_dispatch_one()"); void *msg; uword size; svm_q_conditional_wait_t cond = vapi_is_nonblocking (ctx) ? (wait_time ? SVM_Q_TIMEDWAIT : SVM_Q_NOWAIT) : SVM_Q_WAIT; vapi_error_e rv = vapi_recv (ctx, &msg, &size, cond, wait_time); if (VAPI_OK != rv) { VAPI_DBG ("vapi_recv failed with rv=%d", rv); return rv; } u16 vpp_id = be16toh (*(u16 *) msg); if (vpp_id > ctx->vl_msg_id_max) { VAPI_ERR ("Unknown msg ID received, id `%u', out of range <0,%u>", (unsigned) vpp_id, (unsigned) ctx->vl_msg_id_max); vapi_msg_free (ctx, msg); return VAPI_EINVAL; } if (VAPI_INVALID_MSG_ID == (unsigned) ctx->vl_msg_id_to_vapi_msg_t[vpp_id]) { VAPI_ERR ("Unknown msg ID received, id `%u' marked as not supported", (unsigned) vpp_id); vapi_msg_free (ctx, msg); return VAPI_EINVAL; } const vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[vpp_id]; vapi_get_swap_to_host_func (id) (msg); if (vapi_verify_msg_size (id, msg, size)) { vapi_msg_free (ctx, msg); return VAPI_EINVAL; } u32 context; if (vapi_msg_is_with_context (id)) { context = *(u32 *) (((u8 *) msg) + vapi_get_context_offset (id)); /* is this a message originating from VAPI? */ VAPI_DBG ("dispatch, context is %x", context); if (context & context_counter_mask) { rv = vapi_dispatch_response (ctx, id, context, msg); goto done; } } rv = vapi_dispatch_event (ctx, id, msg); done: vapi_msg_free (ctx, msg); return rv; } vapi_error_e vapi_dispatch_one (vapi_ctx_t ctx) { return vapi_dispatch_one_timedwait (ctx, 0); } vapi_error_e vapi_dispatch (vapi_ctx_t ctx) { vapi_error_e rv = VAPI_OK; while (!vapi_requests_empty (ctx)) { rv = vapi_dispatch_one (ctx); if (VAPI_OK != rv) { return rv; } } return rv; } void vapi_set_event_cb (vapi_ctx_t ctx, vapi_msg_id_t id, vapi_event_cb callback, void *callback_ctx) { vapi_event_cb_with_ctx *c = &ctx->event_cbs[id]; c->cb = callback; c->ctx = callback_ctx; } void vapi_clear_event_cb (vapi_ctx_t ctx, vapi_msg_id_t id) { vapi_set_event_cb (ctx, id, NULL, NULL); } void vapi_set_generic_event_cb (vapi_ctx_t ctx, vapi_generic_event_cb callback, void *callback_ctx) { ctx->generic_cb.cb = callback; ctx->generic_cb.ctx = callback_ctx; } void vapi_clear_generic_event_cb (vapi_ctx_t ctx) { ctx->generic_cb.cb = NULL; ctx->generic_cb.ctx = NULL; } u16 vapi_lookup_vl_msg_id (vapi_ctx_t ctx, vapi_msg_id_t id) { assert (id < __vapi_metadata.count); return ctx->vapi_msg_id_t_to_vl_msg_id[id]; } int vapi_get_client_index (vapi_ctx_t ctx) { return ctx->my_client_index; } bool vapi_is_nonblocking (vapi_ctx_t ctx) { return (VAPI_MODE_NONBLOCKING == ctx->mode); } size_t vapi_get_max_request_count (vapi_ctx_t ctx) { return ctx->requests_size - 1; } int vapi_get_payload_offset (vapi_msg_id_t id) { assert (id < __vapi_metadata.count); return __vapi_metadata.msgs[id]->payload_offset; } void (*vapi_get_swap_to_host_func (vapi_msg_id_t id)) (void *msg) { assert (id < __vapi_metadata.count); return __vapi_metadata.msgs[id]->swap_to_host; } void (*vapi_get_swap_to_be_func (vapi_msg_id_t id)) (void *msg) { assert (id < __vapi_metadata.count); return __vapi_metadata.msgs[id]->swap_to_be; } size_t vapi_get_context_offset (vapi_msg_id_t id) { assert (id < __vapi_metadata.count); return __vapi_metadata.msgs[id]->context_offset; } vapi_msg_id_t vapi_register_msg (vapi_message_desc_t * msg) { int i = 0; for (i = 0; i < __vapi_metadata.count; ++i) { if (!strcmp (msg->name_with_crc, __vapi_metadata.msgs[i]->name_with_crc)) { /* this happens if somebody is linking together several objects while * using the static inline headers, just fill in the already * assigned id here so that all the objects are in sync */ msg->id = __vapi_metadata.msgs[i]->id; return msg->id; } } vapi_msg_id_t id = __vapi_metadata.count; ++__vapi_metadata.count; __vapi_metadata.msgs = realloc (__vapi_metadata.msgs, sizeof (*__vapi_metadata.msgs) * __vapi_metadata.count); __vapi_metadata.msgs[id] = msg; size_t s = strlen (msg->name_with_crc); if (s > __vapi_metadata.max_len_name_with_crc) { __vapi_metadata.max_len_name_with_crc = s; } msg->id = id; return id; } vapi_error_e vapi_producer_lock (vapi_ctx_t ctx) { int mrv; if (0 != (mrv = pthread_mutex_lock (&ctx->requests_mutex))) { VAPI_DBG ("pthread_mutex_lock() failed, rv=%d:%s", mrv, strerror (mrv)); (void) mrv; /* avoid warning if the above debug is not enabled */ return VAPI_MUTEX_FAILURE; } return VAPI_OK; } vapi_error_e vapi_producer_unlock (vapi_ctx_t ctx) { int mrv; if (0 != (mrv = pthread_mutex_unlock (&ctx->requests_mutex))) { VAPI_DBG ("pthread_mutex_unlock() failed, rv=%d:%s", mrv, strerror (mrv)); (void) mrv; /* avoid warning if the above debug is not enabled */ return VAPI_MUTEX_FAILURE; } return VAPI_OK; } size_t vapi_get_message_count () { return __vapi_metadata.count; } const char * vapi_get_msg_name (vapi_msg_id_t id) { return __vapi_metadata.msgs[id]->name; } void vapi_stop_rx_thread (vapi_ctx_t ctx) { if (!ctx || !ctx->connected || !ctx->vl_input_queue) { return; } vl_client_stop_rx_thread (ctx->vl_input_queue); } /* * fd.io coding-style-patch-verification: ON * * Local Variables: * eval: (c-set-style "gnu") * End: */