aboutsummaryrefslogtreecommitdiffstats
path: root/src/uri
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2017-10-02 00:18:51 -0700
committerDave Barach <openvpp@barachs.net>2017-10-16 21:41:11 +0000
commit3cbc04bea02fc60471dfe0c671ede3ca42c118c3 (patch)
tree6128beab7dfb01c6221da2f675078078170e75ac /src/uri
parent0cb01bde499979066389975ba81670764914cbc2 (diff)
udp: refactor udp code
Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80 Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src/uri')
-rwxr-xr-xsrc/uri/uri_tcp_test.c7
-rw-r--r--src/uri/uri_udp_test.c204
-rw-r--r--src/uri/vppcom.c2
3 files changed, 171 insertions, 42 deletions
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index 41d3d4c1f42..89f070f72cd 100755
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -327,6 +327,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
svm_fifo_segment_create_args_t _a, *a = &_a;
int rv;
+ memset (a, 0, sizeof (*a));
a->segment_name = (char *) mp->segment_name;
a->segment_size = mp->segment_size;
/* Attach to the segment vpp created */
@@ -590,7 +591,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
u32 bytes_to_snd;
u32 queue_max_chunk = 128 << 10, actual_write;
session_fifo_event_t evt;
- static int serial_number = 0;
int rv;
bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
@@ -615,7 +615,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
- evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
(u8 *) & evt,
@@ -918,6 +917,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
rmp->handle = mp->handle;
+ rmp->context = mp->context;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
session->bytes_received = 0;
@@ -983,7 +983,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
- evt.event_id = e->event_id;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
@@ -997,7 +996,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm,
void
server_handle_event_queue (uri_tcp_test_main_t * utm)
{
- session_fifo_event_t _e, *e = &_e;;
+ session_fifo_event_t _e, *e = &_e;
while (1)
{
diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c
index d559d5726c7..27e70cf944c 100644
--- a/src/uri/uri_udp_test.c
+++ b/src/uri/uri_udp_test.c
@@ -50,6 +50,7 @@
typedef enum
{
STATE_START,
+ STATE_BOUND,
STATE_READY,
STATE_FAILED,
STATE_DISCONNECTING,
@@ -97,6 +98,7 @@ typedef struct
/* $$$$ hack: cut-through session index */
volatile u32 cut_through_session_index;
+ volatile u32 connected_session;
/* unique segment name counter */
u32 unique_segment_index;
@@ -123,6 +125,7 @@ typedef struct
/* convenience */
svm_fifo_segment_main_t *segment_main;
+ u8 *connect_test_data;
} uri_udp_test_main_t;
#if CLIB_DEBUG > 0
@@ -163,7 +166,7 @@ void
application_send_attach (uri_udp_test_main_t * utm)
{
vl_api_application_attach_t *bmp;
- u32 fifo_size = 3 << 20;
+ u32 fifo_size = 1 << 20;
bmp = vl_msg_api_alloc (sizeof (*bmp));
memset (bmp, 0, sizeof (*bmp));
@@ -172,11 +175,12 @@ application_send_attach (uri_udp_test_main_t * utm)
bmp->context = ntohl (0xfeedface);
bmp->options[APP_OPTIONS_FLAGS] =
APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT;
- bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
+ bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2;
bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size;
bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size;
bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20;
bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20;
+ bmp->options[APP_EVT_QUEUE_SIZE] = 16768;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp);
}
@@ -348,7 +352,7 @@ udp_client_connect (uri_udp_test_main_t * utm)
}
static void
-client_send (uri_udp_test_main_t * utm, session_t * session)
+client_send_cut_through (uri_udp_test_main_t * utm, session_t * session)
{
int i;
u8 *test_data = 0;
@@ -391,7 +395,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
}
bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
-
bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ?
bytes_to_read : vec_len (utm->rx_buf);
@@ -451,7 +454,114 @@ client_send (uri_udp_test_main_t * utm, session_t * session)
}
static void
-uri_udp_client_test (uri_udp_test_main_t * utm)
+send_test_chunk (uri_udp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+ u32 bytes)
+{
+ u8 *test_data = utm->connect_test_data;
+ u64 bytes_sent = 0;
+ int test_buf_offset = 0;
+ u32 bytes_to_snd;
+ u32 queue_max_chunk = 128 << 10, actual_write;
+ session_fifo_event_t evt;
+ int rv;
+
+ bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+ if (bytes_to_snd > vec_len (test_data))
+ bytes_to_snd = vec_len (test_data);
+
+ while (bytes_to_snd > 0 && !utm->time_to_stop)
+ {
+ actual_write = (bytes_to_snd > queue_max_chunk) ?
+ queue_max_chunk : bytes_to_snd;
+ rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write,
+ test_data + test_buf_offset);
+
+ if (rv > 0)
+ {
+ bytes_to_snd -= rv;
+ test_buf_offset += rv;
+ bytes_sent += rv;
+
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_APP_TX;
+
+ unix_shared_memory_queue_add (utm->vpp_event_queue,
+ (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
+ }
+ }
+}
+
+static void
+recv_test_chunk (uri_udp_test_main_t * utm, session_t * session)
+{
+ svm_fifo_t *rx_fifo;
+ int buffer_offset, bytes_to_read = 0, rv;
+
+ rx_fifo = session->server_rx_fifo;
+ bytes_to_read = svm_fifo_max_dequeue (rx_fifo);
+ bytes_to_read =
+ vec_len (utm->rx_buf) > bytes_to_read ?
+ bytes_to_read : vec_len (utm->rx_buf);
+
+ buffer_offset = 0;
+ while (bytes_to_read > 0)
+ {
+ rv = svm_fifo_dequeue_nowait (rx_fifo, bytes_to_read,
+ utm->rx_buf + buffer_offset);
+ if (rv > 0)
+ {
+ bytes_to_read -= rv;
+ buffer_offset += rv;
+ }
+ }
+}
+
+void
+client_send_data (uri_udp_test_main_t * utm)
+{
+ u8 *test_data;
+ int mypid = getpid ();
+ session_t *session;
+ svm_fifo_t *tx_fifo;
+ u32 n_iterations;
+ int i;
+
+ vec_validate (utm->connect_test_data, 64 * 1024 - 1);
+ for (i = 0; i < vec_len (utm->connect_test_data); i++)
+ utm->connect_test_data[i] = i & 0xff;
+
+ test_data = utm->connect_test_data;
+ session = pool_elt_at_index (utm->sessions, utm->connected_session);
+ tx_fifo = session->server_tx_fifo;
+
+ ASSERT (vec_len (test_data) > 0);
+
+ vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+ n_iterations = NITER;
+
+ for (i = 0; i < n_iterations; i++)
+ {
+ send_test_chunk (utm, tx_fifo, mypid, 0);
+ recv_test_chunk (utm, session);
+ if (utm->time_to_stop)
+ break;
+ }
+
+ f64 timeout = clib_time_now (&utm->clib_time) + 5;
+ while (clib_time_now (&utm->clib_time) < timeout)
+ {
+ recv_test_chunk (utm, session);
+ }
+
+}
+
+static void
+client_test (uri_udp_test_main_t * utm)
{
session_t *session;
@@ -464,10 +574,18 @@ uri_udp_client_test (uri_udp_test_main_t * utm)
return;
}
- /* Only works with cut through sessions */
- session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index);
+ if (utm->cut_through_session_index != ~0)
+ {
+ session = pool_elt_at_index (utm->sessions,
+ utm->cut_through_session_index);
+ client_send_cut_through (utm, session);
+ }
+ else
+ {
+ session = pool_elt_at_index (utm->sessions, utm->connected_session);
+ client_send_data (utm);
+ }
- client_send (utm, session);
application_detach (utm);
}
@@ -483,7 +601,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp)
return;
}
- utm->state = STATE_READY;
+ utm->state = STATE_BOUND;
}
static void
@@ -492,6 +610,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
svm_fifo_segment_create_args_t _a, *a = &_a;
int rv;
+ memset (a, 0, sizeof (*a));
a->segment_name = (char *) mp->segment_name;
a->segment_size = mp->segment_size;
/* Attach to the segment vpp created */
@@ -625,8 +744,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
hash_set (utm->session_index_by_vpp_handles, mp->handle,
session - utm->sessions);
- utm->state = STATE_READY;
-
if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0)
{
f64 now = clib_time_now (&utm->clib_time);
@@ -639,7 +756,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp)
memset (rmp, 0, sizeof (*rmp));
rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
rmp->handle = mp->handle;
+ rmp->context = mp->context;
vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp);
+
+ CLIB_MEMORY_BARRIER ();
+ utm->state = STATE_READY;
}
static void
@@ -677,16 +798,22 @@ static void
vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
{
uri_udp_test_main_t *utm = &uri_udp_test_main;
+ session_t *session;
ASSERT (utm->i_am_master == 0);
+ if (mp->retval)
+ {
+ clib_warning ("failed connect");
+ return;
+ }
+
/* We've been redirected */
if (mp->segment_name_length > 0)
{
svm_fifo_segment_main_t *sm = &svm_fifo_segment_main;
svm_fifo_segment_create_args_t _a, *a = &_a;
u32 segment_index;
- session_t *session;
svm_fifo_segment_private_t *seg;
int rv;
@@ -707,20 +834,24 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp)
vec_add2 (utm->seg, seg, 1);
memcpy (seg, sm->segments + segment_index, sizeof (*seg));
sleep (1);
-
- pool_get (utm->sessions, session);
- utm->cut_through_session_index = session - utm->sessions;
-
- session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
- svm_fifo_t *);
- ASSERT (session->server_rx_fifo);
- session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
- svm_fifo_t *);
- ASSERT (session->server_tx_fifo);
}
- /* security: could unlink /dev/shm/<mp->segment_name> here, maybe */
+ pool_get (utm->sessions, session);
+ session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo,
+ svm_fifo_t *);
+ ASSERT (session->server_rx_fifo);
+ session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo,
+ svm_fifo_t *);
+ ASSERT (session->server_tx_fifo);
+ if (mp->segment_name_length > 0)
+ utm->cut_through_session_index = session - utm->sessions;
+ else
+ {
+ utm->connected_session = session - utm->sessions;
+ utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address,
+ unix_shared_memory_queue_t *);
+ }
utm->state = STATE_READY;
}
@@ -789,13 +920,13 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
{
svm_fifo_t *rx_fifo, *tx_fifo;
int nbytes;
-
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
int rv;
rx_fifo = e->fifo;
tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
+ svm_fifo_unset_event (rx_fifo);
do
{
@@ -809,13 +940,11 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm,
}
while (rv == -2);
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- evt.event_id = e->event_id;
-
if (svm_fifo_set_event (tx_fifo))
{
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_APP_TX;
q = utm->vpp_event_queue;
unix_shared_memory_queue_add (q, (u8 *) & evt,
0 /* do wait for mutex */ );
@@ -827,6 +956,9 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
{
session_fifo_event_t _e, *e = &_e;
+ while (utm->state != STATE_READY)
+ sleep (5);
+
while (1)
{
unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e,
@@ -845,7 +977,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm)
break;
}
if (PREDICT_FALSE (utm->time_to_stop == 1))
- break;
+ return;
if (PREDICT_FALSE (utm->time_to_print_stats == 1))
{
utm->time_to_print_stats = 0;
@@ -869,7 +1001,7 @@ server_unbind (uri_udp_test_main_t * utm)
}
static void
-server_listen (uri_udp_test_main_t * utm)
+server_bind (uri_udp_test_main_t * utm)
{
vl_api_bind_uri_t *bmp;
@@ -890,11 +1022,11 @@ udp_server_test (uri_udp_test_main_t * utm)
application_send_attach (utm);
/* Bind to uri */
- server_listen (utm);
+ server_bind (utm);
- if (wait_for_state_change (utm, STATE_READY))
+ if (wait_for_state_change (utm, STATE_BOUND))
{
- clib_warning ("timeout waiting for STATE_READY");
+ clib_warning ("timeout waiting for STATE_BOUND");
return;
}
@@ -976,7 +1108,7 @@ main (int argc, char **argv)
utm->i_am_master = i_am_master;
utm->segment_main = &svm_fifo_segment_main;
- utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0);
+ utm->connect_uri = format (0, "udp://6.0.1.2/1234%c", 0);
setup_signal_handlers ();
@@ -991,7 +1123,7 @@ main (int argc, char **argv)
if (i_am_master == 0)
{
- uri_udp_client_test (utm);
+ client_test (utm);
exit (0);
}
diff --git a/src/uri/vppcom.c b/src/uri/vppcom.c
index a8e3a5005ba..f0bd2f86c64 100644
--- a/src/uri/vppcom.c
+++ b/src/uri/vppcom.c
@@ -136,7 +136,6 @@ typedef struct vppcom_main_t_
u8 init;
u32 *client_session_index_fifo;
volatile u32 bind_session_index;
- u32 tx_event_id;
int main_cpu;
/* vpe input queue */
@@ -2328,7 +2327,6 @@ vppcom_session_write (uint32_t session_index, void *buf, int n)
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_APP_TX;
- evt.event_id = vcm->tx_event_id++;
rval = vppcom_session_at_index (session_index, &session);
if (PREDICT_FALSE (rval))