aboutsummaryrefslogtreecommitdiffstats
path: root/src/uri/uri_tcp_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/uri/uri_tcp_test.c')
-rw-r--r--src/uri/uri_tcp_test.c161
1 files changed, 110 insertions, 51 deletions
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index 406a5f4eac7..e2834817208 100644
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -116,6 +116,7 @@ typedef struct
pthread_t client_rx_thread_handle;
u32 client_bytes_received;
u8 test_return_packets;
+ u32 bytes_to_send;
/* convenience */
svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
rx_fifo = e->fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ /* Read the bytes */
do
{
- n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
- utm->rx_buf);
+ n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+ clib_min (vec_len (utm->rx_buf),
+ bytes), utm->rx_buf);
if (n_read > 0)
{
bytes -= n_read;
@@ -333,9 +339,17 @@ client_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
}
utm->client_bytes_received += n_read;
}
+ else
+ {
+ if (n_read == -2)
+ {
+ clib_warning ("weird!");
+ break;
+ }
+ }
}
- while (n_read < 0 || bytes > 0);
+ while (bytes > 0);
}
void
@@ -479,47 +493,41 @@ vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
}
}
-void
-client_send_data (uri_tcp_test_main_t * utm)
+static void
+send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+ u32 bytes)
{
u8 *test_data = utm->connect_test_data;
u64 bytes_sent = 0;
- int rv;
- int mypid = getpid ();
- session_t *session;
- svm_fifo_t *tx_fifo;
- int buffer_offset, bytes_to_send = 0;
+ int test_buf_offset = 0;
+ u32 bytes_to_snd;
+ u32 queue_max_chunk = 64 << 10, actual_write;
session_fifo_event_t evt;
static int serial_number = 0;
- int i;
- u32 max_chunk = 64 << 10, write;
-
- session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
- tx_fifo = session->server_tx_fifo;
+ int rv;
- vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+ bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+ if (bytes_to_snd > vec_len (test_data))
+ bytes_to_snd = vec_len (test_data);
- for (i = 0; i < 1; i++)
+ while (bytes_to_snd > 0)
{
- bytes_to_send = vec_len (test_data);
- buffer_offset = 0;
- while (bytes_to_send > 0)
+ actual_write =
+ bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
+ rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+ test_data + test_buf_offset);
+
+ if (rv > 0)
{
- write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
- rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
- test_data + buffer_offset);
+ bytes_to_snd -= rv;
+ test_buf_offset += rv;
+ bytes_sent += rv;
- if (rv > 0)
+ if (svm_fifo_set_event (tx_fifo))
{
- bytes_to_send -= rv;
- buffer_offset += rv;
- bytes_sent += rv;
-
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = rv;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@ client_send_data (uri_tcp_test_main_t * utm)
}
}
}
+}
+
+void
+client_send_data (uri_tcp_test_main_t * utm)
+{
+ u8 *test_data = utm->connect_test_data;
+ int mypid = getpid ();
+ session_t *session;
+ svm_fifo_t *tx_fifo;
+ u32 n_iterations, leftover;
+ int i;
+
+ session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
+ tx_fifo = session->server_tx_fifo;
+
+ vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+ n_iterations = utm->bytes_to_send / vec_len (test_data);
+
+ for (i = 0; i < n_iterations; i++)
+ {
+ send_test_chunk (utm, tx_fifo, mypid, 0);
+ }
+
+ leftover = utm->bytes_to_send % vec_len (test_data);
+ if (leftover)
+ send_test_chunk (utm, tx_fifo, mypid, leftover);
if (utm->test_return_packets)
{
f64 timeout = clib_time_now (&utm->clib_time) + 2;
/* Wait for the outstanding packets */
- while (utm->client_bytes_received < vec_len (test_data))
+ while (utm->client_bytes_received <
+ vec_len (test_data) * n_iterations + leftover)
{
if (clib_time_now (&utm->clib_time) > timeout)
{
@@ -542,9 +577,8 @@ client_send_data (uri_tcp_test_main_t * utm)
break;
}
}
-
- utm->time_to_stop = 1;
}
+ utm->time_to_stop = 1;
}
void
@@ -599,6 +633,11 @@ client_test (uri_tcp_test_main_t * utm)
/* Disconnect */
client_disconnect (utm);
+
+ if (wait_for_state_change (utm, STATE_START))
+ {
+ return;
+ }
}
static void
@@ -714,7 +753,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
{
svm_fifo_t *rx_fifo, *tx_fifo;
int n_read;
-
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
int rv, bytes;
@@ -722,34 +760,46 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
rx_fifo = e->fifo;
tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of a new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ if (bytes == 0)
+ return;
+
+ /* Read the bytes */
do
{
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
utm->rx_buf);
+ if (n_read > 0)
+ bytes -= n_read;
+
+ if (utm->drop_packets)
+ continue;
/* Reflect if a non-drop session */
- if (!utm->drop_packets && n_read > 0)
+ if (n_read > 0)
{
do
{
rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
}
- while (rv == -2 && !utm->time_to_stop);
-
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = n_read;
- evt.event_id = e->event_id;
- q = utm->vpp_event_queue;
- unix_shared_memory_queue_add (q, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- }
+ while (rv <= 0 && !utm->time_to_stop);
- if (n_read > 0)
- bytes -= n_read;
+ /* If event wasn't set, add one */
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = e->event_id;
+
+ q = utm->vpp_event_queue;
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
+ }
}
while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
}
@@ -852,7 +902,10 @@ static void
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
{
+ uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+
clib_warning ("retval %d", ntohl (mp->retval));
+ utm->state = STATE_START;
}
#define foreach_uri_msg \
@@ -888,6 +941,7 @@ main (int argc, char **argv)
u8 *heap, *uri = 0;
u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+ u32 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
session_t *session;
@@ -934,6 +988,10 @@ main (int argc, char **argv)
drop_packets = 1;
else if (unformat (a, "test"))
test_return_packets = 1;
+ else if (unformat (a, "mbytes %d", &mbytes))
+ {
+ bytes_to_send = mbytes << 20;
+ }
else
{
fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@ main (int argc, char **argv)
utm->segment_main = &svm_fifo_segment_main;
utm->drop_packets = drop_packets;
utm->test_return_packets = test_return_packets;
+ utm->bytes_to_send = bytes_to_send;
setup_signal_handlers ();
uri_api_hookup (utm);