aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session/session.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session/session.c')
-rw-r--r--src/vnet/session/session.c54
1 files changed, 46 insertions, 8 deletions
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c56712bbf87..efd1d73c4a0 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -518,6 +518,30 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
return svm_fifo_dequeue_drop (s->server_tx_fifo, max_bytes);
}
+static inline int
+session_notify_subscribers (u32 app_index, stream_session_t * s,
+ svm_fifo_t * f, session_evt_type_t evt_type)
+{
+ app_worker_t *app_wrk;
+ application_t *app;
+ int i;
+
+ app = application_get (app_index);
+ if (!app)
+ return -1;
+
+ for (i = 0; i < f->n_subscribers; i++)
+ {
+ app_wrk = application_get_worker (app, f->subscribers[i]);
+ if (!app_wrk)
+ continue;
+ if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
+ return -1;
+ }
+
+ return 0;
+}
+
/**
* Notify session peer that new data has been enqueued.
*
@@ -529,10 +553,10 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
static inline int
session_enqueue_notify (stream_session_t * s)
{
- app_worker_t *app;
+ app_worker_t *app_wrk;
- app = app_worker_get_if_valid (s->app_wrk_index);
- if (PREDICT_FALSE (!app))
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app_wrk))
{
SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
return 0;
@@ -545,22 +569,36 @@ session_enqueue_notify (stream_session_t * s)
}));
/* *INDENT-ON* */
- return app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_RX);
+ if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
+ FIFO_EVENT_APP_RX)))
+ return -1;
+
+ if (PREDICT_FALSE (svm_fifo_n_subscribers (s->server_rx_fifo)))
+ return session_notify_subscribers (app_wrk->app_index, s,
+ s->server_rx_fifo, FIFO_EVENT_APP_RX);
+
+ return 0;
}
int
session_dequeue_notify (stream_session_t * s)
{
- app_worker_t *app;
+ app_worker_t *app_wrk;
- app = app_worker_get_if_valid (s->app_wrk_index);
- if (PREDICT_FALSE (!app))
+ app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+ if (PREDICT_FALSE (!app_wrk))
return -1;
- if (app_worker_lock_and_send_event (app, s, FIFO_EVENT_APP_TX))
+ if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
+ FIFO_EVENT_APP_TX)))
return -1;
+ if (PREDICT_FALSE (s->server_tx_fifo->n_subscribers))
+ return session_notify_subscribers (app_wrk->app_index, s,
+ s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
svm_fifo_clear_tx_ntf (s->server_tx_fifo);
+
return 0;
}