diff options
Diffstat (limited to 'src/uri/uri_tcp_test.c')
-rw-r--r-- | src/uri/uri_tcp_test.c | 161 |
1 files changed, 110 insertions, 51 deletions
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c index 406a5f4e..e2834817 100644 --- a/src/uri/uri_tcp_test.c +++ b/src/uri/uri_tcp_test.c @@ -116,6 +116,7 @@ typedef struct pthread_t client_rx_thread_handle; u32 client_bytes_received; u8 test_return_packets; + u32 bytes_to_send; /* convenience */ svm_fifo_segment_main_t *segment_main; @@ -313,11 +314,16 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm, rx_fifo = e->fifo; - bytes = e->enqueue_length; + bytes = svm_fifo_max_dequeue (rx_fifo); + /* Allow enqueuing of new event */ + svm_fifo_unset_event (rx_fifo); + + /* Read the bytes */ do { - n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf), - utm->rx_buf); + n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, + clib_min (vec_len (utm->rx_buf), + bytes), utm->rx_buf); if (n_read > 0) { bytes -= n_read; @@ -333,9 +339,17 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm, } utm->client_bytes_received += n_read; } + else + { + if (n_read == -2) + { + clib_warning ("weird!"); + break; + } + } } - while (n_read < 0 || bytes > 0); + while (bytes > 0); } void @@ -479,47 +493,41 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp) } } -void -client_send_data (uri_tcp_test_main_t * utm) +static void +send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, + u32 bytes) { u8 *test_data = utm->connect_test_data; u64 bytes_sent = 0; - int rv; - int mypid = getpid (); - session_t *session; - svm_fifo_t *tx_fifo; - int buffer_offset, bytes_to_send = 0; + int test_buf_offset = 0; + u32 bytes_to_snd; + u32 queue_max_chunk = 64 << 10, actual_write; session_fifo_event_t evt; static int serial_number = 0; - int i; - u32 max_chunk = 64 << 10, write; - - session = pool_elt_at_index (utm->sessions, utm->connected_session_index); - tx_fifo = session->server_tx_fifo; + int rv; - vec_validate (utm->rx_buf, vec_len (test_data) - 1); + bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; + if (bytes_to_snd > vec_len (test_data)) + bytes_to_snd = vec_len (test_data); - for (i = 0; i < 1; i++) + while (bytes_to_snd > 0) { - bytes_to_send = vec_len (test_data); - buffer_offset = 0; - while (bytes_to_send > 0) + actual_write = + bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd; + rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write, + test_data + test_buf_offset); + + if (rv > 0) { - write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send; - rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write, - test_data + buffer_offset); + bytes_to_snd -= rv; + test_buf_offset += rv; + bytes_sent += rv; - if (rv > 0) + if (svm_fifo_set_event (tx_fifo)) { - bytes_to_send -= rv; - buffer_offset += rv; - bytes_sent += rv; - /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_SERVER_TX; - /* $$$$ for event logging */ - evt.enqueue_length = rv; evt.event_id = serial_number++; unix_shared_memory_queue_add (utm->vpp_event_queue, @@ -528,13 +536,40 @@ client_send_data (uri_tcp_test_main_t * utm) } } } +} + +void +client_send_data (uri_tcp_test_main_t * utm) +{ + u8 *test_data = utm->connect_test_data; + int mypid = getpid (); + session_t *session; + svm_fifo_t *tx_fifo; + u32 n_iterations, leftover; + int i; + + session = pool_elt_at_index (utm->sessions, utm->connected_session_index); + tx_fifo = session->server_tx_fifo; + + vec_validate (utm->rx_buf, vec_len (test_data) - 1); + n_iterations = utm->bytes_to_send / vec_len (test_data); + + for (i = 0; i < n_iterations; i++) + { + send_test_chunk (utm, tx_fifo, mypid, 0); + } + + leftover = utm->bytes_to_send % vec_len (test_data); + if (leftover) + send_test_chunk (utm, tx_fifo, mypid, leftover); if (utm->test_return_packets) { f64 timeout = clib_time_now (&utm->clib_time) + 2; /* Wait for the outstanding packets */ - while (utm->client_bytes_received < vec_len (test_data)) + while (utm->client_bytes_received < + vec_len (test_data) * n_iterations + leftover) { if (clib_time_now (&utm->clib_time) > timeout) { @@ -542,9 +577,8 @@ client_send_data (uri_tcp_test_main_t * utm) break; } } - - utm->time_to_stop = 1; } + utm->time_to_stop = 1; } void @@ -599,6 +633,11 @@ client_test (uri_tcp_test_main_t * utm) /* Disconnect */ client_disconnect (utm); + + if (wait_for_state_change (utm, STATE_START)) + { + return; + } } static void @@ -714,7 +753,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, { svm_fifo_t *rx_fifo, *tx_fifo; int n_read; - session_fifo_event_t evt; unix_shared_memory_queue_t *q; int rv, bytes; @@ -722,34 +760,46 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, rx_fifo = e->fifo; tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo; - bytes = e->enqueue_length; + bytes = svm_fifo_max_dequeue (rx_fifo); + /* Allow enqueuing of a new event */ + svm_fifo_unset_event (rx_fifo); + + if (bytes == 0) + return; + + /* Read the bytes */ do { n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf), utm->rx_buf); + if (n_read > 0) + bytes -= n_read; + + if (utm->drop_packets) + continue; /* Reflect if a non-drop session */ - if (!utm->drop_packets && n_read > 0) + if (n_read > 0) { do { rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf); } - while (rv == -2 && !utm->time_to_stop); - - /* Fabricate TX event, send to vpp */ - evt.fifo = tx_fifo; - evt.event_type = FIFO_EVENT_SERVER_TX; - /* $$$$ for event logging */ - evt.enqueue_length = n_read; - evt.event_id = e->event_id; - q = utm->vpp_event_queue; - unix_shared_memory_queue_add (q, (u8 *) & evt, - 0 /* do wait for mutex */ ); - } + while (rv <= 0 && !utm->time_to_stop); - if (n_read > 0) - bytes -= n_read; + /* If event wasn't set, add one */ + if (svm_fifo_set_event (tx_fifo)) + { + /* Fabricate TX event, send to vpp */ + evt.fifo = tx_fifo; + evt.event_type = FIFO_EVENT_SERVER_TX; + evt.event_id = e->event_id; + + q = utm->vpp_event_queue; + unix_shared_memory_queue_add (q, (u8 *) & evt, + 0 /* do wait for mutex */ ); + } + } } while ((n_read < 0 || bytes > 0) && !utm->time_to_stop); } @@ -852,7 +902,10 @@ static void vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t * mp) { + uri_tcp_test_main_t *utm = &uri_tcp_test_main; + clib_warning ("retval %d", ntohl (mp->retval)); + utm->state = STATE_START; } #define foreach_uri_msg \ @@ -888,6 +941,7 @@ main (int argc, char **argv) u8 *heap, *uri = 0; u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234"; u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234"; + u32 bytes_to_send = 64 << 10, mbytes; u32 tmp; mheap_t *h; session_t *session; @@ -934,6 +988,10 @@ main (int argc, char **argv) drop_packets = 1; else if (unformat (a, "test")) test_return_packets = 1; + else if (unformat (a, "mbytes %d", &mbytes)) + { + bytes_to_send = mbytes << 20; + } else { fformat (stderr, "%s: usage [master|slave]\n"); @@ -956,6 +1014,7 @@ main (int argc, char **argv) utm->segment_main = &svm_fifo_segment_main; utm->drop_packets = drop_packets; utm->test_return_packets = test_return_packets; + utm->bytes_to_send = bytes_to_send; setup_signal_handlers (); uri_api_hookup (utm); |