diff options
author | Florin Coras <fcoras@cisco.com> | 2017-10-02 00:18:51 -0700 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2017-10-16 21:41:11 +0000 |
commit | 3cbc04bea02fc60471dfe0c671ede3ca42c118c3 (patch) | |
tree | 6128beab7dfb01c6221da2f675078078170e75ac /src/uri | |
parent | 0cb01bde499979066389975ba81670764914cbc2 (diff) |
udp: refactor udp code
Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80
Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/uri')
-rwxr-xr-x | src/uri/uri_tcp_test.c | 7 | ||||
-rw-r--r-- | src/uri/uri_udp_test.c | 204 | ||||
-rw-r--r-- | src/uri/vppcom.c | 2 |
3 files changed, 171 insertions, 42 deletions
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c index 41d3d4c1f42..89f070f72cd 100755 --- a/src/uri/uri_tcp_test.c +++ b/src/uri/uri_tcp_test.c @@ -327,6 +327,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) svm_fifo_segment_create_args_t _a, *a = &_a; int rv; + memset (a, 0, sizeof (*a)); a->segment_name = (char *) mp->segment_name; a->segment_size = mp->segment_size; /* Attach to the segment vpp created */ @@ -590,7 +591,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, u32 bytes_to_snd; u32 queue_max_chunk = 128 << 10, actual_write; session_fifo_event_t evt; - static int serial_number = 0; int rv; bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; @@ -615,7 +615,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = serial_number++; unix_shared_memory_queue_add (utm->vpp_event_queue, (u8 *) & evt, @@ -918,6 +917,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) memset (rmp, 0, sizeof (*rmp)); rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); rmp->handle = mp->handle; + rmp->context = mp->context; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp); session->bytes_received = 0; @@ -983,7 +983,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = e->event_id; q = utm->vpp_event_queue; unix_shared_memory_queue_add (q, (u8 *) & evt, @@ -997,7 +996,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, void server_handle_event_queue (uri_tcp_test_main_t * utm) { - session_fifo_event_t _e, *e = &_e;; + session_fifo_event_t _e, *e = &_e; while (1) { diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c index d559d5726c7..27e70cf944c 100644 --- a/src/uri/uri_udp_test.c +++ b/src/uri/uri_udp_test.c @@ -50,6 +50,7 @@ typedef enum { STATE_START, + STATE_BOUND, STATE_READY, STATE_FAILED, STATE_DISCONNECTING, @@ -97,6 +98,7 @@ typedef struct /* $$$$ hack: cut-through session index */ volatile u32 cut_through_session_index; + volatile u32 connected_session; /* unique segment name counter */ u32 unique_segment_index; @@ -123,6 +125,7 @@ typedef struct /* convenience */ svm_fifo_segment_main_t *segment_main; + u8 *connect_test_data; } uri_udp_test_main_t; #if CLIB_DEBUG > 0 @@ -163,7 +166,7 @@ void application_send_attach (uri_udp_test_main_t * utm) { vl_api_application_attach_t *bmp; - u32 fifo_size = 3 << 20; + u32 fifo_size = 1 << 20; bmp = vl_msg_api_alloc (sizeof (*bmp)); memset (bmp, 0, sizeof (*bmp)); @@ -172,11 +175,12 @@ application_send_attach (uri_udp_test_main_t * utm) bmp->context = ntohl (0xfeedface); bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT; - bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16; + bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2; bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size; bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size; bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20; bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20; + bmp->options[APP_EVT_QUEUE_SIZE] = 16768; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp); } @@ -348,7 +352,7 @@ udp_client_connect (uri_udp_test_main_t * utm) } static void -client_send (uri_udp_test_main_t * utm, session_t * session) +client_send_cut_through (uri_udp_test_main_t * utm, session_t * session) { int i; u8 *test_data = 0; @@ -391,7 +395,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session) } bytes_to_read = svm_fifo_max_dequeue (rx_fifo); - bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ? bytes_to_read : vec_len (utm->rx_buf); @@ -451,7 +454,114 @@ client_send (uri_udp_test_main_t * utm, session_t * session) } static void -uri_udp_client_test (uri_udp_test_main_t * utm) +send_test_chunk (uri_udp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, + u32 bytes) +{ + u8 *test_data = utm->connect_test_data; + u64 bytes_sent = 0; + int test_buf_offset = 0; + u32 bytes_to_snd; + u32 queue_max_chunk = 128 << 10, actual_write; + session_fifo_event_t evt; + int rv; + + bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; + if (bytes_to_snd > vec_len (test_data)) + bytes_to_snd = vec_len (test_data); + + while (bytes_to_snd > 0 && !utm->time_to_stop) + { + actual_write = (bytes_to_snd > queue_max_chunk) ? + queue_max_chunk : bytes_to_snd; + rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write, + test_data + test_buf_offset); + + if (rv > 0) + { + bytes_to_snd -= rv; + test_buf_offset += rv; + bytes_sent += rv; + + if (svm_fifo_set_event (tx_fifo)) + { + /* Fabricate TX event, send to vpp */ + evt.fifo = tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; + + unix_shared_memory_queue_add (utm->vpp_event_queue, + (u8 *) & evt, + 0 /* do wait for mutex */ ); + } + } + } +} + +static void +recv_test_chunk (uri_udp_test_main_t * utm, session_t * session) +{ + svm_fifo_t *rx_fifo; + int buffer_offset, bytes_to_read = 0, rv; + + rx_fifo = session->server_rx_fifo; + bytes_to_read = svm_fifo_max_dequeue (rx_fifo); + bytes_to_read = + vec_len (utm->rx_buf) > bytes_to_read ? + bytes_to_read : vec_len (utm->rx_buf); + + buffer_offset = 0; + while (bytes_to_read > 0) + { + rv = svm_fifo_dequeue_nowait (rx_fifo, bytes_to_read, + utm->rx_buf + buffer_offset); + if (rv > 0) + { + bytes_to_read -= rv; + buffer_offset += rv; + } + } +} + +void +client_send_data (uri_udp_test_main_t * utm) +{ + u8 *test_data; + int mypid = getpid (); + session_t *session; + svm_fifo_t *tx_fifo; + u32 n_iterations; + int i; + + vec_validate (utm->connect_test_data, 64 * 1024 - 1); + for (i = 0; i < vec_len (utm->connect_test_data); i++) + utm->connect_test_data[i] = i & 0xff; + + test_data = utm->connect_test_data; + session = pool_elt_at_index (utm->sessions, utm->connected_session); + tx_fifo = session->server_tx_fifo; + + ASSERT (vec_len (test_data) > 0); + + vec_validate (utm->rx_buf, vec_len (test_data) - 1); + n_iterations = NITER; + + for (i = 0; i < n_iterations; i++) + { + send_test_chunk (utm, tx_fifo, mypid, 0); + recv_test_chunk (utm, session); + if (utm->time_to_stop) + break; + } + + f64 timeout = clib_time_now (&utm->clib_time) + 5; + while (clib_time_now (&utm->clib_time) < timeout) + { + recv_test_chunk (utm, session); + } + +} + +static void +client_test (uri_udp_test_main_t * utm) { session_t *session; @@ -464,10 +574,18 @@ uri_udp_client_test (uri_udp_test_main_t * utm) return; } - /* Only works with cut through sessions */ - session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index); + if (utm->cut_through_session_index != ~0) + { + session = pool_elt_at_index (utm->sessions, + utm->cut_through_session_index); + client_send_cut_through (utm, session); + } + else + { + session = pool_elt_at_index (utm->sessions, utm->connected_session); + client_send_data (utm); + } - client_send (utm, session); application_detach (utm); } @@ -483,7 +601,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) return; } - utm->state = STATE_READY; + utm->state = STATE_BOUND; } static void @@ -492,6 +610,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) svm_fifo_segment_create_args_t _a, *a = &_a; int rv; + memset (a, 0, sizeof (*a)); a->segment_name = (char *) mp->segment_name; a->segment_size = mp->segment_size; /* Attach to the segment vpp created */ @@ -625,8 +744,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) hash_set (utm->session_index_by_vpp_handles, mp->handle, session - utm->sessions); - utm->state = STATE_READY; - if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0) { f64 now = clib_time_now (&utm->clib_time); @@ -639,7 +756,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) memset (rmp, 0, sizeof (*rmp)); rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); rmp->handle = mp->handle; + rmp->context = mp->context; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp); + + CLIB_MEMORY_BARRIER (); + utm->state = STATE_READY; } static void @@ -677,16 +798,22 @@ static void vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) { uri_udp_test_main_t *utm = &uri_udp_test_main; + session_t *session; ASSERT (utm->i_am_master == 0); + if (mp->retval) + { + clib_warning ("failed connect"); + return; + } + /* We've been redirected */ if (mp->segment_name_length > 0) { svm_fifo_segment_main_t *sm = &svm_fifo_segment_main; svm_fifo_segment_create_args_t _a, *a = &_a; u32 segment_index; - session_t *session; svm_fifo_segment_private_t *seg; int rv; @@ -707,20 +834,24 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) vec_add2 (utm->seg, seg, 1); memcpy (seg, sm->segments + segment_index, sizeof (*seg)); sleep (1); - - pool_get (utm->sessions, session); - utm->cut_through_session_index = session - utm->sessions; - - session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo, - svm_fifo_t *); - ASSERT (session->server_rx_fifo); - session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo, - svm_fifo_t *); - ASSERT (session->server_tx_fifo); } - /* security: could unlink /dev/shm/<mp->segment_name> here, maybe */ + pool_get (utm->sessions, session); + session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo, + svm_fifo_t *); + ASSERT (session->server_rx_fifo); + session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo, + svm_fifo_t *); + ASSERT (session->server_tx_fifo); + if (mp->segment_name_length > 0) + utm->cut_through_session_index = session - utm->sessions; + else + { + utm->connected_session = session - utm->sessions; + utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, + unix_shared_memory_queue_t *); + } utm->state = STATE_READY; } @@ -789,13 +920,13 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm, { svm_fifo_t *rx_fifo, *tx_fifo; int nbytes; - session_fifo_event_t evt; unix_shared_memory_queue_t *q; int rv; rx_fifo = e->fifo; tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo; + svm_fifo_unset_event (rx_fifo); do { @@ -809,13 +940,11 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm, } while (rv == -2); - /* Fabricate TX event, send to vpp */ - evt.fifo = tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = e->event_id; - if (svm_fifo_set_event (tx_fifo)) { + /* Fabricate TX event, send to vpp */ + evt.fifo = tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; q = utm->vpp_event_queue; unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); @@ -827,6 +956,9 @@ server_handle_event_queue (uri_udp_test_main_t * utm) { session_fifo_event_t _e, *e = &_e; + while (utm->state != STATE_READY) + sleep (5); + while (1) { unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e, @@ -845,7 +977,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm) break; } if (PREDICT_FALSE (utm->time_to_stop == 1)) - break; + return; if (PREDICT_FALSE (utm->time_to_print_stats == 1)) { utm->time_to_print_stats = 0; @@ -869,7 +1001,7 @@ server_unbind (uri_udp_test_main_t * utm) } static void -server_listen (uri_udp_test_main_t * utm) +server_bind (uri_udp_test_main_t * utm) { vl_api_bind_uri_t *bmp; @@ -890,11 +1022,11 @@ udp_server_test (uri_udp_test_main_t * utm) application_send_attach (utm); /* Bind to uri */ - server_listen (utm); + server_bind (utm); - if (wait_for_state_change (utm, STATE_READY)) + if (wait_for_state_change (utm, STATE_BOUND)) { - clib_warning ("timeout waiting for STATE_READY"); + clib_warning ("timeout waiting for STATE_BOUND"); return; } @@ -976,7 +1108,7 @@ main (int argc, char **argv) utm->i_am_master = i_am_master; utm->segment_main = &svm_fifo_segment_main; - utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0); + utm->connect_uri = format (0, "udp://6.0.1.2/1234%c", 0); setup_signal_handlers (); @@ -991,7 +1123,7 @@ main (int argc, char **argv) if (i_am_master == 0) { - uri_udp_client_test (utm); + client_test (utm); exit (0); } diff --git a/src/uri/vppcom.c b/src/uri/vppcom.c index a8e3a5005ba..f0bd2f86c64 100644 --- a/src/uri/vppcom.c +++ b/src/uri/vppcom.c @@ -136,7 +136,6 @@ typedef struct vppcom_main_t_ u8 init; u32 *client_session_index_fifo; volatile u32 bind_session_index; - u32 tx_event_id; int main_cpu; /* vpe input queue */ @@ -2328,7 +2327,6 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = vcm->tx_event_id++; rval = vppcom_session_at_index (session_index, &session); if (PREDICT_FALSE (rval)) |