aboutsummaryrefslogtreecommitdiffstats
path: root/src/vnet/session
diff options
context:
space:
mode:
Diffstat (limited to 'src/vnet/session')
-rw-r--r--src/vnet/session/segment_manager.c1
-rw-r--r--src/vnet/session/session.c105
-rw-r--r--src/vnet/session/session_node.c26
-rw-r--r--src/vnet/session/stream_session.h2
4 files changed, 101 insertions, 33 deletions
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 262b7faa..43977063 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -224,6 +224,7 @@ segment_manager_del (segment_manager_t * sm)
session = stream_session_get (session_index, thread_index);
/* Instead of directly removing the session call disconnect */
+ session->session_state = SESSION_STATE_CLOSED;
session_send_session_evt_to_thread (stream_session_handle (session),
FIFO_EVENT_DISCONNECT,
thread_index);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 70a5cd83..6fe99047 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -92,38 +92,104 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
return 0;
}
-/** Enqueue buffer chain tail */
+/**
+ * Discards bytes from buffer chain
+ *
+ * It discards n_bytes_to_drop starting at first buffer after chain_b
+ */
+always_inline void
+session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
+ vlib_buffer_t ** chain_b,
+ u32 n_bytes_to_drop)
+{
+ vlib_buffer_t *next = *chain_b;
+ u32 to_drop = n_bytes_to_drop;
+ ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
+ while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
+ {
+ next = vlib_get_buffer (vm, next->next_buffer);
+ if (next->current_length > to_drop)
+ {
+ vlib_buffer_advance (next, to_drop);
+ to_drop = 0;
+ }
+ else
+ {
+ to_drop -= next->current_length;
+ next->current_length = 0;
+ }
+ }
+ *chain_b = next;
+
+ if (to_drop == 0)
+ b->total_length_not_including_first_buffer -= n_bytes_to_drop;
+}
+
+/**
+ * Enqueue buffer chain tail
+ */
always_inline int
session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b,
u32 offset, u8 is_in_order)
{
vlib_buffer_t *chain_b;
- u32 chain_bi = b->next_buffer, len;
+ u32 chain_bi, len, diff;
vlib_main_t *vm = vlib_get_main ();
u8 *data;
- u16 written = 0;
+ u32 written = 0;
int rv = 0;
+ if (is_in_order && offset)
+ {
+ diff = offset - b->current_length;
+ if (diff > b->total_length_not_including_first_buffer)
+ return 0;
+ chain_b = b;
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
+ chain_bi = vlib_get_buffer_index (vm, chain_b);
+ }
+ else
+ chain_bi = b->next_buffer;
+
do
{
chain_b = vlib_get_buffer (vm, chain_bi);
data = vlib_buffer_get_current (chain_b);
len = chain_b->current_length;
+ if (!len)
+ continue;
if (is_in_order)
{
rv = svm_fifo_enqueue_nowait (s->server_rx_fifo, len, data);
- if (rv < len)
+ if (rv == len)
+ {
+ written += rv;
+ }
+ else if (rv < len)
{
return (rv > 0) ? (written + rv) : written;
}
- written += rv;
+ else if (rv > len)
+ {
+ written += rv;
+
+ /* written more than what was left in chain */
+ if (written > b->total_length_not_including_first_buffer)
+ return written;
+
+ /* drop the bytes that have already been delivered */
+ session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
+ }
}
else
{
rv = svm_fifo_enqueue_with_offset (s->server_rx_fifo, offset, len,
data);
if (rv)
- return -1;
+ {
+ clib_warning ("failed to enqueue multi-buffer seg");
+ return -1;
+ }
offset += len;
}
}
@@ -155,22 +221,22 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
u32 offset, u8 queue_event, u8 is_in_order)
{
stream_session_t *s;
- int enqueued = 0, rv;
+ int enqueued = 0, rv, in_order_off;
s = stream_session_get (tc->s_index, tc->thread_index);
if (is_in_order)
{
- enqueued =
- svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length,
- vlib_buffer_get_current (b));
- if (PREDICT_FALSE
- ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued > 0))
+ enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo,
+ b->current_length,
+ vlib_buffer_get_current (b));
+ if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
+ && enqueued >= 0))
{
- rv = session_enqueue_chain_tail (s, b, 0, 1);
- if (rv <= 0)
- return enqueued;
- enqueued += rv;
+ in_order_off = enqueued > b->current_length ? enqueued : 0;
+ rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
+ if (rv > 0)
+ enqueued += rv;
}
}
else
@@ -179,9 +245,10 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
b->current_length,
vlib_buffer_get_current (b));
if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
- rv = session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
- if (rv)
- return -1;
+ session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
+ /* if something was enqueued, report even this as success for ooo
+ * segment handling */
+ return rv;
}
if (queue_event)
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index fac2b852..cd52742b 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -76,7 +76,7 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
u8 thread_index, svm_fifo_t * fifo,
vlib_buffer_t * b0, u32 bi0, u8 n_bufs_per_seg,
u32 left_from_seg, u32 * left_to_snd0,
- u16 * n_bufs, u32 * rx_offset, u16 deq_per_buf,
+ u16 * n_bufs, u32 * tx_offset, u16 deq_per_buf,
u8 peek_data)
{
vlib_buffer_t *chain_b0, *prev_b0;
@@ -104,8 +104,8 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
data0 = vlib_buffer_get_current (chain_b0);
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (fifo, *rx_offset, len_to_deq0, data0);
- *rx_offset += n_bytes_read;
+ n_bytes_read = svm_fifo_peek (fifo, *tx_offset, len_to_deq0, data0);
+ *tx_offset += n_bytes_read;
}
else
{
@@ -126,7 +126,8 @@ session_tx_fifo_chain_tail (session_manager_main_t * smm, vlib_main_t * vm,
if (to_deq == 0)
break;
}
- ASSERT (to_deq == 0);
+ ASSERT (to_deq == 0
+ && b0->total_length_not_including_first_buffer == left_from_seg);
*left_to_snd0 -= left_from_seg;
}
@@ -144,7 +145,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
transport_proto_vft_t *transport_vft;
u32 next_index, next0, *to_next, n_left_to_next, bi0;
vlib_buffer_t *b0;
- u32 rx_offset = 0, max_dequeue0, n_bytes_per_seg;
+ u32 tx_offset = 0, max_dequeue0, n_bytes_per_seg, left_for_seg;
u16 snd_mss0, n_bufs_per_seg, n_bufs;
u8 *data0;
int i, n_bytes_read;
@@ -170,11 +171,11 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
- rx_offset = transport_vft->tx_fifo_offset (tc0);
+ tx_offset = transport_vft->tx_fifo_offset (tc0);
}
/* Check how much we can pull. If buffering, subtract the offset */
- max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset;
+ max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - tx_offset;
/* Nothing to read return */
if (max_dequeue0 == 0)
@@ -193,6 +194,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
}
else
{
+ /* Expectation is that snd_space0 is already a multiple of snd_mss */
max_len_to_snd0 = snd_space0;
}
@@ -265,8 +267,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
b0 = vlib_get_buffer (vm, bi0);
b0->error = 0;
- b0->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID
- | VNET_BUFFER_F_LOCALLY_ORIGINATED;
+ b0->flags = VNET_BUFFER_F_LOCALLY_ORIGINATED;
b0->current_data = 0;
b0->total_length_not_including_first_buffer = 0;
@@ -274,11 +275,11 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
data0 = vlib_buffer_make_headroom (b0, MAX_HDRS_LEN);
if (peek_data)
{
- n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, rx_offset,
+ n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, tx_offset,
len_to_deq0, data0);
/* Keep track of progress locally, transport is also supposed to
* increment it independently when pushing the header */
- rx_offset += n_bytes_read;
+ tx_offset += n_bytes_read;
}
else
{
@@ -299,12 +300,11 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
*/
if (PREDICT_FALSE (n_bufs_per_seg > 1 && left_to_snd0))
{
- u32 left_for_seg;
left_for_seg = clib_min (snd_mss0 - n_bytes_read, left_to_snd0);
session_tx_fifo_chain_tail (smm, vm, thread_index,
s0->server_tx_fifo, b0, bi0,
n_bufs_per_seg, left_for_seg,
- &left_to_snd0, &n_bufs, &rx_offset,
+ &left_to_snd0, &n_bufs, &tx_offset,
deq_per_buf, peek_data);
}
diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h
index 533cf97f..275052d3 100644
--- a/src/vnet/session/stream_session.h
+++ b/src/vnet/session/stream_session.h
@@ -56,7 +56,7 @@ typedef struct _stream_session_t
u8 session_type;
/** State */
- u8 session_state;
+ volatile u8 session_state;
u8 thread_index;