From dfc9b7cac857a3a49555f9fc448bd2c6aa3400a6 Mon Sep 17 00:00:00 2001 From: Ole Troan Date: Mon, 6 Mar 2017 23:51:57 +0100 Subject: Python API: Synchronous mode. Change-Id: Ic8f186dbb35bb4e2e191d311cab51315a88a2d81 Signed-off-by: Ole Troan --- src/vpp-api/python/pneum/pneum.c | 273 ++++++++++++++++++++++++++++++++++----- src/vpp-api/python/pneum/pneum.h | 4 +- 2 files changed, 243 insertions(+), 34 deletions(-) (limited to 'src/vpp-api/python/pneum') diff --git a/src/vpp-api/python/pneum/pneum.c b/src/vpp-api/python/pneum/pneum.c index 37c8d8fe..da9d69df 100644 --- a/src/vpp-api/python/pneum/pneum.c +++ b/src/vpp-api/python/pneum/pneum.c @@ -22,9 +22,7 @@ #include #include #include -#include #include - #include #include #include @@ -35,6 +33,16 @@ #include "pneum.h" +/* + * Asynchronous mode: + * Client registers a callback. All messages are sent to the callback. + * Synchronous mode: + * Client calls blocking read(). + * Clients are expected to collate events on a queue. + * pneum_write() -> suspends RX thread + * pneum_read() -> resumes RX thread + */ + #define vl_typedefs /* define message structures */ #include #undef vl_typedefs @@ -47,15 +55,50 @@ vlib_main_t vlib_global_main; vlib_main_t **vlib_mains; typedef struct { - u8 rx_thread_jmpbuf_valid; u8 connected_to_vlib; - jmp_buf rx_thread_jmpbuf; pthread_t rx_thread_handle; + pthread_t timeout_thread_handle; + pthread_mutex_t queue_lock; + pthread_cond_t suspend_cv; + pthread_cond_t resume_cv; + pthread_mutex_t timeout_lock; + pthread_cond_t timeout_cv; + pthread_cond_t timeout_cancel_cv; + pthread_cond_t terminate_cv; } pneum_main_t; pneum_main_t pneum_main; - pneum_callback_t pneum_callback; +u16 read_timeout = 0; +bool rx_is_running = false; + +static void +init (void) +{ + pneum_main_t *pm = &pneum_main; + memset(pm, 0, sizeof(*pm)); + pthread_mutex_init(&pm->queue_lock, NULL); + pthread_cond_init(&pm->suspend_cv, NULL); + pthread_cond_init(&pm->resume_cv, NULL); + pthread_mutex_init(&pm->timeout_lock, NULL); + pthread_cond_init(&pm->timeout_cv, NULL); + pthread_cond_init(&pm->timeout_cancel_cv, NULL); + pthread_cond_init(&pm->terminate_cv, NULL); +} + +static void +cleanup (void) +{ + pneum_main_t *pm = &pneum_main; + pthread_cond_destroy(&pm->suspend_cv); + pthread_cond_destroy(&pm->resume_cv); + pthread_cond_destroy(&pm->timeout_cv); + pthread_cond_destroy(&pm->timeout_cancel_cv); + pthread_cond_destroy(&pm->terminate_cv); + pthread_mutex_destroy(&pm->queue_lock); + pthread_mutex_destroy(&pm->timeout_lock); + memset (pm, 0, sizeof (*pm)); +} /* * Satisfy external references when -lvlib is not available. @@ -75,11 +118,6 @@ static void pneum_api_handler (void *msg) { u16 id = ntohs(*((u16 *)msg)); - if (id == VL_API_RX_THREAD_EXIT) { - pneum_main_t *pm = &pneum_main; - vl_msg_api_free(msg); - longjmp(pm->rx_thread_jmpbuf, 1); - } msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); int l = ntohl(msgbuf->data_len); if (l == 0) @@ -101,16 +139,108 @@ pneum_rx_thread_fn (void *arg) q = am->vl_input_queue; - /* So we can make the rx thread terminate cleanly */ - if (setjmp(pm->rx_thread_jmpbuf) == 0) { - pm->rx_thread_jmpbuf_valid = 1; - while (1) - while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0)) - pneum_api_handler((void *)msg); - } + while (1) + while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0)) + { + u16 id = ntohs(*((u16 *)msg)); + switch (id) { + case VL_API_RX_THREAD_EXIT: + vl_msg_api_free((void *) msg); + /* signal waiting threads that this thread is about to terminate */ + pthread_mutex_lock(&pm->queue_lock); + pthread_cond_signal(&pm->terminate_cv); + pthread_mutex_unlock(&pm->queue_lock); + pthread_exit(0); + return 0; + break; + + case VL_API_MEMCLNT_RX_THREAD_SUSPEND: + vl_msg_api_free((void * )msg); + /* Suspend thread and signal reader */ + pthread_mutex_lock(&pm->queue_lock); + pthread_cond_signal(&pm->suspend_cv); + /* Wait for the resume signal */ + pthread_cond_wait (&pm->resume_cv, &pm->queue_lock); + pthread_mutex_unlock(&pm->queue_lock); + break; + + case VL_API_MEMCLNT_READ_TIMEOUT: + clib_warning("Received read timeout in async thread\n"); + vl_msg_api_free((void *) msg); + break; + + default: + pneum_api_handler((void *)msg); + } + } +} + +static void * +pneum_timeout_thread_fn (void *arg) +{ + vl_api_memclnt_read_timeout_t *ep; + pneum_main_t *pm = &pneum_main; + api_main_t *am = &api_main; + struct timespec ts; + struct timeval tv; + u16 timeout; + int rv; + + while (1) + { + /* Wait for poke */ + pthread_mutex_lock(&pm->timeout_lock); + pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock); + timeout = read_timeout; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + timeout; + ts.tv_nsec = 0; + rv = pthread_cond_timedwait (&pm->timeout_cancel_cv, + &pm->timeout_lock, &ts); + pthread_mutex_unlock(&pm->timeout_lock); + if (rv == ETIMEDOUT) + { + ep = vl_msg_api_alloc (sizeof (*ep)); + ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT); + vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); + } + } pthread_exit(0); } +void +pneum_rx_suspend (void) +{ + api_main_t *am = &api_main; + pneum_main_t *pm = &pneum_main; + vl_api_memclnt_rx_thread_suspend_t *ep; + + if (!pm->rx_thread_handle) return; + pthread_mutex_lock(&pm->queue_lock); + if (rx_is_running) + { + ep = vl_msg_api_alloc (sizeof (*ep)); + ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND); + vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); + /* Wait for RX thread to tell us it has suspendend */ + pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock); + rx_is_running = false; + } + pthread_mutex_unlock(&pm->queue_lock); +} + +void +pneum_rx_resume (void) +{ + pneum_main_t *pm = &pneum_main; + if (!pm->rx_thread_handle) return; + pthread_mutex_lock(&pm->queue_lock); + if (rx_is_running) return; + pthread_cond_signal(&pm->resume_cv); + rx_is_running = true; + pthread_mutex_unlock(&pm->queue_lock); +} + uword * pneum_msg_table_get_hash (void) { @@ -126,12 +256,13 @@ pneum_msg_table_size(void) } int -pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, +pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, int rx_qlen) { int rv = 0; pneum_main_t *pm = &pneum_main; + init(); if (chroot_prefix != NULL) vl_set_memory_root_path (chroot_prefix); @@ -154,6 +285,16 @@ pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, return (-1); } pneum_callback = cb; + rx_is_running = true; + } + + /* Start read timeout thread */ + rv = pthread_create(&pm->timeout_thread_handle, NULL, + pneum_timeout_thread_fn, 0); + if (rv) { + clib_warning("pthread_create returned %d", rv); + vl_client_api_unmap(); + return (-1); } pm->connected_to_vlib = 1; @@ -167,31 +308,69 @@ pneum_disconnect (void) api_main_t *am = &api_main; pneum_main_t *pm = &pneum_main; - if (pm->rx_thread_jmpbuf_valid) { + if (!pm->connected_to_vlib) return 0; + + if (pm->rx_thread_handle) { vl_api_rx_thread_exit_t *ep; uword junk; ep = vl_msg_api_alloc (sizeof (*ep)); ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT); vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); - pthread_join(pm->rx_thread_handle, (void **) &junk); - } - if (pm->connected_to_vlib) { - vl_client_disconnect(); - vl_client_api_unmap(); - pneum_callback = 0; + + /* wait (with timeout) until RX thread has finished */ + struct timespec ts; + struct timeval tv; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + 5; + ts.tv_nsec = 0; + pthread_mutex_lock(&pm->queue_lock); + int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts); + pthread_mutex_unlock(&pm->queue_lock); + /* now join so we wait until thread has -really- finished */ + if (rv == ETIMEDOUT) + pthread_cancel(pm->rx_thread_handle); + else + pthread_join(pm->rx_thread_handle, (void **) &junk); } - memset (pm, 0, sizeof (*pm)); + if (pm->timeout_thread_handle) + pthread_cancel(pm->timeout_thread_handle); + + vl_client_disconnect(); + vl_client_api_unmap(); + pneum_callback = 0; + + cleanup(); return (0); } +static void +set_timeout (unsigned short timeout) +{ + pneum_main_t *pm = &pneum_main; + pthread_mutex_lock(&pm->timeout_lock); + read_timeout = timeout; + pthread_cond_signal(&pm->timeout_cv); + pthread_mutex_unlock(&pm->timeout_lock); +} + +static void +unset_timeout (void) +{ + pneum_main_t *pm = &pneum_main; + pthread_mutex_lock(&pm->timeout_lock); + pthread_cond_signal(&pm->timeout_cancel_cv); + pthread_mutex_unlock(&pm->timeout_lock); +} + int -pneum_read (char **p, int *l) +pneum_read (char **p, int *l, u16 timeout) { unix_shared_memory_queue_t *q; api_main_t *am = &api_main; pneum_main_t *pm = &pneum_main; uword msg; + msgbuf_t *msgbuf; if (!pm->connected_to_vlib) return -1; @@ -199,21 +378,48 @@ pneum_read (char **p, int *l) if (am->our_pid == 0) return (-1); + /* Poke timeout thread */ + if (timeout) + set_timeout(timeout); + q = am->vl_input_queue; int rv = unix_shared_memory_queue_sub(q, (u8 *)&msg, 0); if (rv == 0) { u16 msg_id = ntohs(*((u16 *)msg)); - msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); - *l = ntohl(msgbuf->data_len); - if (*l == 0) { - printf("Unregistered API message: %d\n", msg_id); - return (-1); + switch (msg_id) { + case VL_API_RX_THREAD_EXIT: + printf("Received thread exit\n"); + return -1; + case VL_API_MEMCLNT_RX_THREAD_SUSPEND: + printf("Received thread suspend\n"); + goto error; + case VL_API_MEMCLNT_READ_TIMEOUT: + printf("Received read timeout %ds\n", timeout); + goto error; + + default: + msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); + *l = ntohl(msgbuf->data_len); + if (*l == 0) { + printf("Unregistered API message: %d\n", msg_id); + goto error; + } } *p = (char *)msg; + + /* Let timeout notification thread know we're done */ + unset_timeout(); + } else { printf("Read failed with %d\n", rv); } return (rv); + + error: + vl_msg_api_free((void *) msg); + /* Client might forget to resume RX thread on failure */ + pneum_rx_resume (); + return -1; } /* @@ -241,12 +447,13 @@ pneum_write (char *p, int l) if (!pm->connected_to_vlib) return -1; if (!mp) return (-1); + memcpy(mp, p, l); mp->client_index = pneum_client_index(); q = am->shmem_hdr->vl_input_queue; rv = unix_shared_memory_queue_add(q, (u8 *)&mp, 0); if (rv != 0) { - printf("vpe_api_write fails: %d\n", rv); + clib_warning("vpe_api_write fails: %d\n", rv); /* Clear message */ pneum_free(mp); } diff --git a/src/vpp-api/python/pneum/pneum.h b/src/vpp-api/python/pneum/pneum.h index 9312eb47..c4b55ae0 100644 --- a/src/vpp-api/python/pneum/pneum.h +++ b/src/vpp-api/python/pneum/pneum.h @@ -22,11 +22,13 @@ typedef void (*pneum_callback_t)(unsigned char * data, int len); int pneum_connect(char * name, char * chroot_prefix, pneum_callback_t cb, int rx_qlen); int pneum_disconnect(void); -int pneum_read(char **data, int *l); +int pneum_read(char **data, int *l, unsigned short timeout); int pneum_write(char *data, int len); void pneum_free(void * msg); uword * pneum_msg_table_get_hash (void); int pneum_msg_table_size(void); uint32_t pneum_get_msg_index(unsigned char * name); +void pneum_rx_suspend (void); +void pneum_rx_resume (void); #endif -- cgit 1.2.3-korg