aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2018-07-27 05:45:06 -0700
committerDave Barach <openvpp@barachs.net>2018-07-31 11:36:54 +0000
commit460dce6e2d017cc7b2151fd0fa61d464570489d7 (patch)
tree19e944702f8b7fcb659a8166f50a03ee5eba9a7a
parente939bf1b508e1fae6929dd8cf0f3effdc2c12549 (diff)
vcl: add read/write udp support
Change-Id: Ie6171c12055cde6915856de340839f5da1b1b1da Signed-off-by: Florin Coras <fcoras@cisco.com>
-rw-r--r--src/vcl/vcl_bapi.c14
-rw-r--r--src/vcl/vcl_cfg.c1
-rw-r--r--src/vcl/vcl_private.h9
-rw-r--r--src/vcl/vcl_test_server.c17
-rw-r--r--src/vcl/vppcom.c142
-rw-r--r--src/vnet/session-apps/echo_server.c10
-rw-r--r--src/vnet/session/application.c6
-rw-r--r--src/vnet/session/application_interface.h39
-rw-r--r--src/vnet/session/session.h4
9 files changed, 149 insertions, 93 deletions
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index 0201cd82966..311df64528c 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -132,6 +132,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
svm_fifo_segment_create_args_t *a = &_a;
int rv;
+ vcm->mounting_segment = 1;
memset (a, 0, sizeof (*a));
a->segment_name = (char *) mp->segment_name;
a->segment_size = mp->segment_size;
@@ -147,6 +148,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp)
VDBG (1, "VCL<%d>: mapped new segment '%s' size %d", getpid (),
mp->segment_name, mp->segment_size);
+ vcm->mounting_segment = 0;
}
static void
@@ -356,6 +358,18 @@ done:
vppcom_session_table_add_listener (mp->handle, session_index);
session->session_state = STATE_LISTEN;
+ if (session->is_dgram)
+ {
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
+ rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
+ rx_fifo->client_session_index = session_index;
+ tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
+ tx_fifo->client_session_index = session_index;
+ session->rx_fifo = rx_fifo;
+ session->tx_fifo = tx_fifo;
+ }
+
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: bind succeeded!",
getpid (), mp->handle, mp->context);
done_unlock:
diff --git a/src/vcl/vcl_cfg.c b/src/vcl/vcl_cfg.c
index 279a975719e..6e6a0ac0da4 100644
--- a/src/vcl/vcl_cfg.c
+++ b/src/vcl/vcl_cfg.c
@@ -21,6 +21,7 @@
*/
static vppcom_main_t _vppcom_main = {
.debug = VPPCOM_DEBUG_INIT,
+ .init = 0,
.my_client_index = ~0
};
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 327a7fc02fe..af58cdc3ac9 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -237,6 +237,9 @@ typedef struct vppcom_main_t_
/** Pool of cut through registrations */
vcl_cut_through_registration_t *cut_through_registrations;
+ /** Flag indicating that a new segment is being mounted */
+ volatile u32 mounting_segment;
+
#ifdef VCL_ELOG
/* VPP Event-logger */
elog_main_t elog_main;
@@ -313,6 +316,12 @@ vcl_session_get_index_from_handle (u64 handle)
return VCL_INVALID_SESSION_INDEX;
}
+static inline u8
+vcl_session_is_ct (vcl_session_t * s)
+{
+ return (s->our_evt_q != 0);
+}
+
static inline int
vppcom_session_at_index (u32 session_index, vcl_session_t * volatile *sess)
{
diff --git a/src/vcl/vcl_test_server.c b/src/vcl/vcl_test_server.c
index 6a2fda0be57..b49383eb73a 100644
--- a/src/vcl/vcl_test_server.c
+++ b/src/vcl/vcl_test_server.c
@@ -425,14 +425,17 @@ main (int argc, char **argv)
return -1;
}
- rv = vppcom_session_listen (ssm->listen_fd, 10);
- if (rv < 0)
+ if (!ssm->cfg.transport_udp)
{
- errno_val = errno = -rv;
- perror ("ERROR in main()");
- fprintf (stderr, "SERVER: ERROR: listen failed "
- "(errno = %d)!\n", errno_val);
- return -1;
+ rv = vppcom_session_listen (ssm->listen_fd, 10);
+ if (rv < 0)
+ {
+ errno_val = errno = -rv;
+ perror ("ERROR in main()");
+ fprintf (stderr, "SERVER: ERROR: listen failed "
+ "(errno = %d)!\n", errno_val);
+ return -1;
+ }
}
ssm->epfd = vppcom_epoll_create ();
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index d1c4413b2a8..a74e55af634 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -33,9 +33,15 @@ static void
vcl_wait_for_memory (void *mem)
{
u8 __clib_unused test;
+ if (vcm->mounting_segment)
+ {
+ while (vcm->mounting_segment)
+ ;
+ return;
+ }
if (1 || vcm->debug)
{
- sleep (1);
+ usleep (1e5);
return;
}
if (signal (SIGSEGV, sigsegv_signal))
@@ -359,6 +365,8 @@ vcl_session_accepted_handler (session_accepted_msg_t * mp)
hash_set (vcm->session_index_by_vpp_handles, mp->handle, session_index);
session->transport.lcl_port = listen_session->transport.lcl_port;
session->transport.lcl_ip = listen_session->transport.lcl_ip;
+ session->session_type = listen_session->session_type;
+ session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
" address %U port %d queue %p!", getpid (), mp->handle, session_index,
@@ -405,13 +413,18 @@ done:
if (rv)
goto done_unlock;
+ rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
+ tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
+ vcl_wait_for_memory (rx_fifo);
+ rx_fifo->client_session_index = session_index;
+ tx_fifo->client_session_index = session_index;
+
if (mp->client_event_queue_address)
{
session->vpp_evt_q = uword_to_pointer (mp->server_event_queue_address,
svm_msg_q_t *);
session->our_evt_q = uword_to_pointer (mp->client_event_queue_address,
svm_msg_q_t *);
- vcl_wait_for_memory (session->vpp_evt_q);
session->ct_registration = vcl_ct_registration_add (session->our_evt_q,
session_index);
}
@@ -419,11 +432,6 @@ done:
session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
svm_msg_q_t *);
- rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
- rx_fifo->client_session_index = session_index;
- tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
- tx_fifo->client_session_index = session_index;
-
session->rx_fifo = rx_fifo;
session->tx_fifo = tx_fifo;
session->vpp_handle = mp->handle;
@@ -762,11 +770,10 @@ vppcom_session_create (u8 proto, u8 is_nonblocking)
session->session_type = proto;
session->session_state = STATE_START;
session->vpp_handle = ~0;
+ session->is_dgram = proto == VPPCOM_PROTO_UDP;
if (is_nonblocking)
VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
- else
- VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
is_nonblocking, session_index);
@@ -929,6 +936,10 @@ vppcom_session_bind (uint32_t session_index, vppcom_endpt_t * ep)
session->session_type ? "UDP" : "TCP");
vcl_evt (VCL_EVT_BIND, session);
VCL_SESSION_UNLOCK ();
+
+ if (session->session_type == VPPCOM_PROTO_UDP)
+ vppcom_session_listen (session_index, 10);
+
done:
return rv;
}
@@ -1225,12 +1236,20 @@ vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
return (e->event_type == SESSION_IO_EVT_CT_TX);
}
+static inline u8
+vcl_session_is_readable (vcl_session_t * s)
+{
+ return ((s->session_state & STATE_OPEN)
+ || (s->session_state == STATE_LISTEN
+ && s->session_type == VPPCOM_PROTO_UDP));
+}
+
static inline int
vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
u8 peek)
{
int n_read = 0, rv, is_nonblocking;
- vcl_session_t *session = 0;
+ vcl_session_t *s = 0;
svm_fifo_t *rx_fifo;
svm_msg_q_msg_t msg;
session_event_t *e;
@@ -1239,9 +1258,9 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
ASSERT (buf);
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
+ VCL_SESSION_LOCK_AND_GET (session_index, &s);
- if (PREDICT_FALSE (session->is_vep))
+ if (PREDICT_FALSE (s->is_vep))
{
VCL_SESSION_UNLOCK ();
clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
@@ -1250,24 +1269,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
goto done;
}
- is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
- rx_fifo = session->rx_fifo;
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
+ rx_fifo = s->rx_fifo;
- if (PREDICT_FALSE (!(session->session_state & STATE_OPEN)))
+ if (PREDICT_FALSE (!vcl_session_is_readable (s)))
{
- session_state_t state = session->session_state;
+ session_state_t state = s->session_state;
VCL_SESSION_UNLOCK ();
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: %s session is not open! "
"state 0x%x (%s), returning %d (%s)",
- getpid (), session->vpp_handle, session_index, state,
+ getpid (), s->vpp_handle, session_index, state,
vppcom_session_state_str (state), rv, vppcom_retval_str (rv));
goto done;
}
VCL_SESSION_UNLOCK ();
- mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
is_full = svm_fifo_is_full (rx_fifo);
if (svm_fifo_is_empty (rx_fifo))
@@ -1286,7 +1305,7 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
if (!vcl_is_rx_evt_for_session (e, session_index,
- session->our_evt_q != 0))
+ s->our_evt_q != 0))
{
vcl_handle_mq_ctrl_event (e);
svm_msg_q_free_msg (mq, &msg);
@@ -1303,27 +1322,24 @@ vppcom_session_read_internal (uint32_t session_index, void *buf, int n,
}
}
- if (peek)
- n_read = svm_fifo_peek (rx_fifo, 0, n, buf);
+ if (s->is_dgram)
+ n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 1, peek);
else
- n_read = svm_fifo_dequeue_nowait (rx_fifo, n, buf);
- ASSERT (n_read > 0);
- svm_fifo_unset_event (rx_fifo);
+ n_read = app_recv_stream_raw (rx_fifo, buf, n, 1, peek);
- if (session->our_evt_q && is_full)
- app_send_io_evt_to_vpp (session->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
+ if (vcl_session_is_ct (s) && is_full)
+ app_send_io_evt_to_vpp (s->vpp_evt_q, rx_fifo, SESSION_IO_EVT_CT_RX,
SVM_Q_WAIT);
-
if (VPPCOM_DEBUG > 2)
{
if (n_read > 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: read %d bytes "
- "from (%p)", getpid (), session->vpp_handle,
+ "from (%p)", getpid (), s->vpp_handle,
session_index, n_read, rx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: nothing read! "
- "returning %d (%s)", getpid (), session->vpp_handle,
+ "returning %d (%s)", getpid (), s->vpp_handle,
session_index, rv, vppcom_retval_str (rv));
}
return n_read;
@@ -1389,45 +1405,46 @@ int
vppcom_session_write (uint32_t session_index, void *buf, size_t n)
{
int rv, n_write, is_nonblocking;
- vcl_session_t *session = 0;
+ vcl_session_t *s = 0;
svm_fifo_t *tx_fifo = 0;
+ session_evt_type_t et;
svm_msg_q_msg_t msg;
session_event_t *e;
svm_msg_q_t *mq;
ASSERT (buf);
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
+ VCL_SESSION_LOCK_AND_GET (session_index, &s);
- tx_fifo = session->tx_fifo;
- is_nonblocking = VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK);
+ tx_fifo = s->tx_fifo;
+ is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
- if (PREDICT_FALSE (session->is_vep))
+ if (PREDICT_FALSE (s->is_vep))
{
VCL_SESSION_UNLOCK ();
clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
"cannot write to an epoll session!",
- getpid (), session->vpp_handle, session_index);
+ getpid (), s->vpp_handle, session_index);
rv = VPPCOM_EBADFD;
goto done;
}
- if (!(session->session_state & STATE_OPEN))
+ if (!(s->session_state & STATE_OPEN))
{
- session_state_t state = session->session_state;
+ session_state_t state = s->session_state;
rv = ((state & STATE_DISCONNECT) ? VPPCOM_ECONNRESET : VPPCOM_ENOTCONN);
VCL_SESSION_UNLOCK ();
VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: session is not open! "
"state 0x%x (%s)",
- getpid (), session->vpp_handle, session_index,
+ getpid (), s->vpp_handle, session_index,
state, vppcom_session_state_str (state));
goto done;
}
VCL_SESSION_UNLOCK ();
- mq = session->our_evt_q ? session->our_evt_q : vcm->app_event_queue;
+ mq = vcl_session_is_ct (s) ? s->our_evt_q : vcm->app_event_queue;
if (svm_fifo_is_full (tx_fifo))
{
if (is_nonblocking)
@@ -1448,7 +1465,7 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
svm_msg_q_sub_w_lock (mq, &msg);
e = svm_msg_q_msg_data (mq, &msg);
if (!vcl_is_tx_evt_for_session (e, session_index,
- session->our_evt_q != 0))
+ s->our_evt_q != 0))
{
vcl_handle_mq_ctrl_event (e);
svm_msg_q_free_msg (mq, &msg);
@@ -1465,31 +1482,27 @@ vppcom_session_write (uint32_t session_index, void *buf, size_t n)
}
}
- n_write = svm_fifo_enqueue_nowait (tx_fifo, n, (void *) buf);
- ASSERT (n_write > 0);
+ ASSERT (FIFO_EVENT_APP_TX + 1 == SESSION_IO_EVT_CT_TX);
+ et = FIFO_EVENT_APP_TX + vcl_session_is_ct (s);
+ if (s->is_dgram)
+ n_write = app_send_dgram_raw (tx_fifo, &s->transport,
+ s->vpp_evt_q, buf, n, et, SVM_Q_WAIT);
+ else
+ n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
+ SVM_Q_WAIT);
- if (svm_fifo_set_event (tx_fifo))
- {
- session_evt_type_t et;
- VCL_SESSION_LOCK_AND_GET (session_index, &session);
- et = session->our_evt_q ? SESSION_IO_EVT_CT_TX : FIFO_EVENT_APP_TX;
- app_send_io_evt_to_vpp (session->vpp_evt_q, tx_fifo, et, SVM_Q_WAIT);
- VCL_SESSION_UNLOCK ();
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: added FIFO_EVENT_APP_TX "
- "to vpp_event_q %p, n_write %d", getpid (),
- session->vpp_handle, session_index, session->vpp_evt_q, n_write);
- }
+ ASSERT (n_write > 0);
if (VPPCOM_DEBUG > 2)
{
if (n_write <= 0)
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
- "FIFO-FULL (%p)", getpid (), session->vpp_handle,
+ "FIFO-FULL (%p)", getpid (), s->vpp_handle,
session_index, tx_fifo);
else
clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: "
"wrote %d bytes tx-fifo: (%p)", getpid (),
- session->vpp_handle, session_index, n_write, tx_fifo);
+ s->vpp_handle, session_index, n_write, tx_fifo);
}
return n_write;
@@ -2885,18 +2898,11 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
VCL_SESSION_UNLOCK ();
VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
getpid (), session_index);
- rv = VPPCOM_EBADFD;
VCL_SESSION_UNLOCK ();
- goto done;
+ return VPPCOM_EBADFD;
}
ep->is_ip4 = session->transport.is_ip4;
ep->port = session->transport.rmt_port;
- if (session->transport.is_ip4)
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
- sizeof (ip4_address_t));
- else
- clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
- sizeof (ip6_address_t));
VCL_SESSION_UNLOCK ();
}
@@ -2908,10 +2914,16 @@ vppcom_session_recvfrom (uint32_t session_index, void *buffer,
{
clib_warning ("VCL<%d>: Unsupport flags for recvfrom %d",
getpid (), flags);
- rv = VPPCOM_EAFNOSUPPORT;
+ return VPPCOM_EAFNOSUPPORT;
}
-done:
+ if (session->transport.is_ip4)
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip4,
+ sizeof (ip4_address_t));
+ else
+ clib_memcpy (ep->ip, &session->transport.rmt_ip.ip6,
+ sizeof (ip6_address_t));
+
return rv;
}
diff --git a/src/vnet/session-apps/echo_server.c b/src/vnet/session-apps/echo_server.c
index 770f4ba7337..14ab36d796c 100644
--- a/src/vnet/session-apps/echo_server.c
+++ b/src/vnet/session-apps/echo_server.c
@@ -211,14 +211,16 @@ echo_server_rx_callback (stream_session_t * s)
actual_transfer = app_recv_stream_raw (rx_fifo,
esm->rx_buf[thread_index],
max_transfer,
- 0 /* don't clear event */ );
+ 0 /* don't clear event */ ,
+ 0 /* peek */ );
}
else
{
actual_transfer = app_recv_dgram_raw (rx_fifo,
esm->rx_buf[thread_index],
max_transfer, &at,
- 0 /* don't clear event */ );
+ 0 /* don't clear event */ ,
+ 0 /* peek */ );
}
ASSERT (actual_transfer == max_transfer);
/* test_bytes (esm, actual_transfer); */
@@ -232,14 +234,14 @@ echo_server_rx_callback (stream_session_t * s)
n_written = app_send_stream_raw (tx_fifo,
esm->vpp_queue[thread_index],
esm->rx_buf[thread_index],
- actual_transfer, 0);
+ actual_transfer, FIFO_EVENT_APP_TX, 0);
}
else
{
n_written = app_send_dgram_raw (tx_fifo, &at,
esm->vpp_queue[s->thread_index],
esm->rx_buf[thread_index],
- actual_transfer, 0);
+ actual_transfer, FIFO_EVENT_APP_TX, 0);
}
if (n_written != max_transfer)
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 99f0dff7fab..757e12e1b58 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -841,7 +841,8 @@ app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
svm_msg_q_msg_t msg;
svm_msg_q_t *mq;
- if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
+ if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY
+ && s->session_state != SESSION_STATE_LISTENING))
{
/* Session is closed so app will never clean up. Flush rx fifo */
if (s->session_state == SESSION_STATE_CLOSED)
@@ -917,8 +918,9 @@ app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
typedef int (app_send_evt_handler_fn) (application_t *app,
stream_session_t *s,
u8 lock);
-static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[3] = {
app_send_io_evt_rx,
+ 0,
app_send_io_evt_tx,
};
/* *INDENT-ON* */
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index ffe2a64c245..daba169a297 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -354,7 +354,8 @@ app_send_io_evt_to_vpp (svm_msg_q_t * mq, svm_fifo_t * f, u8 evt_type,
always_inline int
app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
- svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 noblock)
+ svm_msg_q_t * vpp_evt_q, u8 * data, u32 len, u8 evt_type,
+ u8 noblock)
{
u32 max_enqueue, actual_write;
session_dgram_hdr_t hdr;
@@ -379,7 +380,7 @@ app_send_dgram_raw (svm_fifo_t * f, app_session_transport_t * at,
if ((rv = svm_fifo_enqueue_nowait (f, actual_write, data)) > 0)
{
if (svm_fifo_set_event (f))
- app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
+ app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
}
ASSERT (rv);
return rv;
@@ -389,19 +390,19 @@ always_inline int
app_send_dgram (app_session_t * s, u8 * data, u32 len, u8 noblock)
{
return app_send_dgram_raw (s->tx_fifo, &s->transport, s->vpp_evt_q, data,
- len, noblock);
+ len, FIFO_EVENT_APP_TX, noblock);
}
always_inline int
app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
- u32 len, u8 noblock)
+ u32 len, u8 evt_type, u8 noblock)
{
int rv;
if ((rv = svm_fifo_enqueue_nowait (f, len, data)) > 0)
{
if (svm_fifo_set_event (f))
- app_send_io_evt_to_vpp (vpp_evt_q, f, FIFO_EVENT_APP_TX, noblock);
+ app_send_io_evt_to_vpp (vpp_evt_q, f, evt_type, noblock);
}
return rv;
}
@@ -409,7 +410,8 @@ app_send_stream_raw (svm_fifo_t * f, svm_msg_q_t * vpp_evt_q, u8 * data,
always_inline int
app_send_stream (app_session_t * s, u8 * data, u32 len, u8 noblock)
{
- return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len, noblock);
+ return app_send_stream_raw (s->tx_fifo, s->vpp_evt_q, data, len,
+ FIFO_EVENT_APP_TX, noblock);
}
always_inline int
@@ -422,17 +424,22 @@ app_send (app_session_t * s, u8 * data, u32 len, u8 noblock)
always_inline int
app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
- app_session_transport_t * at, u8 clear_evt)
+ app_session_transport_t * at, u8 clear_evt, u8 peek)
{
session_dgram_pre_hdr_t ph;
u32 max_deq;
int rv;
- if (clear_evt)
- svm_fifo_unset_event (f);
max_deq = svm_fifo_max_dequeue (f);
if (max_deq < sizeof (session_dgram_hdr_t))
- return 0;
+ {
+ if (clear_evt)
+ svm_fifo_unset_event (f);
+ return 0;
+ }
+
+ if (clear_evt)
+ svm_fifo_unset_event (f);
svm_fifo_peek (f, 0, sizeof (ph), (u8 *) & ph);
ASSERT (ph.data_length >= ph.data_offset);
@@ -440,6 +447,8 @@ app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
svm_fifo_peek (f, sizeof (ph), sizeof (*at), (u8 *) at);
len = clib_min (len, ph.data_length - ph.data_offset);
rv = svm_fifo_peek (f, ph.data_offset + SESSION_CONN_HDR_LEN, len, buf);
+ if (peek)
+ return rv;
ph.data_offset += rv;
if (ph.data_offset == ph.data_length)
svm_fifo_dequeue_drop (f, ph.data_length + SESSION_CONN_HDR_LEN);
@@ -451,21 +460,25 @@ app_recv_dgram_raw (svm_fifo_t * f, u8 * buf, u32 len,
always_inline int
app_recv_dgram (app_session_t * s, u8 * buf, u32 len)
{
- return app_recv_dgram_raw (s->rx_fifo, buf, len, &s->transport, 1);
+ return app_recv_dgram_raw (s->rx_fifo, buf, len, &s->transport, 1, 0);
}
always_inline int
-app_recv_stream_raw (svm_fifo_t * f, u8 * buf, u32 len, u8 clear_evt)
+app_recv_stream_raw (svm_fifo_t * f, u8 * buf, u32 len, u8 clear_evt, u8 peek)
{
if (clear_evt)
svm_fifo_unset_event (f);
+
+ if (peek)
+ return svm_fifo_peek (f, 0, len, buf);
+
return svm_fifo_dequeue_nowait (f, len, buf);
}
always_inline int
app_recv_stream (app_session_t * s, u8 * buf, u32 len)
{
- return app_recv_stream_raw (s->rx_fifo, buf, len, 1);
+ return app_recv_stream_raw (s->rx_fifo, buf, len, 1, 0);
}
always_inline int
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 99546cb1347..5e94c41f927 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -33,12 +33,12 @@
typedef enum
{
FIFO_EVENT_APP_RX,
+ SESSION_IO_EVT_CT_RX,
FIFO_EVENT_APP_TX,
+ SESSION_IO_EVT_CT_TX,
FIFO_EVENT_DISCONNECT,
FIFO_EVENT_BUILTIN_RX,
FIFO_EVENT_RPC,
- SESSION_IO_EVT_CT_TX,
- SESSION_IO_EVT_CT_RX,
SESSION_CTRL_EVT_ACCEPTED,
SESSION_CTRL_EVT_ACCEPTED_REPLY,
SESSION_CTRL_EVT_CONNECTED,