}
}
@@ -278,7 +276,6 @@ active_open_connected_callback (u32 app_index, u32 opaque,
proxy_main_t *pm = &proxy_main;
proxy_session_t *ps;
u8 thread_index = vlib_get_thread_index ();
- session_fifo_event_t evt;
if (is_fail)
{
@@ -320,15 +317,9 @@ active_open_connected_callback (u32 app_index, u32 opaque,
/*
* Send event for active open tx fifo
*/
+ ASSERT (s->thread_index == thread_index);
if (svm_fifo_set_event (s->server_tx_fifo))
- {
- evt.fifo = s->server_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- if (svm_queue_add
- (pm->active_open_event_queue[thread_index], (u8 *) & evt,
- 0 /* do wait for mutex */ ))
- clib_warning ("failed to enqueue tx evt");
- }
+ session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
return 0;
}
@@ -354,8 +345,6 @@ active_open_disconnect_callback (stream_session_t * s)
static int
active_open_rx_callback (stream_session_t * s)
{
- proxy_main_t *pm = &proxy_main;
- session_fifo_event_t evt;
svm_fifo_t *proxy_tx_fifo;
proxy_tx_fifo = s->server_rx_fifo;
@@ -365,12 +354,10 @@ active_open_rx_callback (stream_session_t * s)
*/
if (svm_fifo_set_event (proxy_tx_fifo))
{
- u32 p_thread_index = proxy_tx_fifo->master_thread_index;
- evt.fifo = proxy_tx_fifo;
- evt.event_type = FIFO_EVENT_APP_TX;
- if (svm_queue_add (pm->server_event_queue[p_thread_index], (u8 *) & evt,
- 0 /* do wait for mutex */ ))
- clib_warning ("failed to enqueue server rx evt");
+ u8 thread_index = proxy_tx_fifo->master_thread_index;
+ return session_send_io_evt_to_thread_custom (proxy_tx_fifo,
+ thread_index,
+ FIFO_EVENT_APP_TX);
}
return 0;
@@ -40,8 +40,8 @@ typedef struct
{
svm_queue_t *vl_input_queue; /**< vpe input queue */
/** per-thread vectors */
- svm_queue_t **server_event_queue;
- svm_queue_t **active_open_event_queue;
+ svm_msg_q_t **server_event_queue;
+ svm_msg_q_t **active_open_event_queue;
u8 **rx_buf; /**< intermediate rx buffers */
u32 cli_node_index; /**< cli process node index */