summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2017-07-24 17:40:28 -0700
committerDave Barach <openvpp@barachs.net>2017-07-30 15:04:26 +0000
commit68810624f84467503482b82662c980e8f0e36deb (patch)
treee2365cc66a5b6d28700e6b167e0e7038fdafbda0 /src
parent03a6213fb5022d37ea92f974a1814db1c70bcbdf (diff)
Make tcp active open data structures thread safe
- Cleanup half-open connections and timers on the right thread - Ensure half-open connection and transport endpoint pools are thread safe - Enqueue TX events to the correct vpp thread in the builtin client - Use transport proto in transport connections instead of session type Change-Id: Id13239a206afbff6f34a38afa510fe014e4b2049 Signed-off-by: Florin Coras <fcoras@cisco.com> Signed-off-by: Dave Barach <dave@barachs.net>
Diffstat (limited to 'src')
-rw-r--r--src/svm/svm_fifo_segment.c6
-rw-r--r--src/vnet/session/session.c14
-rw-r--r--src/vnet/session/session.h3
-rwxr-xr-xsrc/vnet/session/session_cli.c2
-rw-r--r--src/vnet/session/session_lookup.c103
-rw-r--r--src/vnet/session/session_lookup.h5
-rw-r--r--src/vnet/session/transport.h6
-rw-r--r--src/vnet/session/transport_interface.c5
-rw-r--r--src/vnet/session/transport_interface.h2
-rw-r--r--src/vnet/tcp/builtin_client.c125
-rw-r--r--src/vnet/tcp/builtin_client.h9
-rw-r--r--src/vnet/tcp/tcp.c166
-rw-r--r--src/vnet/tcp/tcp.h18
-rwxr-xr-xsrc/vnet/tcp/tcp_debug.h32
-rw-r--r--src/vnet/tcp/tcp_input.c36
-rw-r--r--src/vnet/tcp/tcp_output.c19
-rw-r--r--src/vnet/tcp/tcp_test.c14
-rw-r--r--src/vnet/udp/udp.c8
-rw-r--r--src/vnet/udp/udp_input.c2
19 files changed, 321 insertions, 254 deletions
diff --git a/src/svm/svm_fifo_segment.c b/src/svm/svm_fifo_segment.c
index c80374a7ffe..a01e26e45e5 100644
--- a/src/svm/svm_fifo_segment.c
+++ b/src/svm/svm_fifo_segment.c
@@ -376,6 +376,12 @@ svm_fifo_segment_free_fifo (svm_fifo_segment_private_t * s, svm_fifo_t * f,
break;
}
+ if (CLIB_DEBUG)
+ {
+ f->master_session_index = ~0;
+ f->master_thread_index = ~0;
+ }
+
ssvm_pop_heap (oldheap);
ssvm_unlock_non_recursive (sh);
}
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 48000a6fee6..004c7193b94 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -64,7 +64,8 @@ stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc,
s->server_tx_fifo = server_tx_fifo;
/* Initialize state machine, such as it is... */
- s->session_type = tc->proto;
+ s->session_type = session_type_from_proto_and_ip (tc->transport_proto,
+ tc->is_ip4);
s->session_state = SESSION_STATE_CONNECTING;
s->svm_segment_index = fifo_segment_index;
s->thread_index = thread_index;
@@ -354,8 +355,7 @@ stream_session_init_fifos_pointers (transport_connection_t * tc,
}
int
-stream_session_connect_notify (transport_connection_t * tc, u8 sst,
- u8 is_fail)
+stream_session_connect_notify (transport_connection_t * tc, u8 is_fail)
{
application_t *app;
stream_session_t *new_s = 0;
@@ -365,7 +365,7 @@ stream_session_connect_notify (transport_connection_t * tc, u8 sst,
handle = stream_session_half_open_lookup_handle (&tc->lcl_ip, &tc->rmt_ip,
tc->lcl_port, tc->rmt_port,
- tc->proto);
+ tc->transport_proto);
if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
{
clib_warning ("This can't be good!");
@@ -391,7 +391,7 @@ stream_session_connect_notify (transport_connection_t * tc, u8 sst,
new_s->app_index = app->index;
}
- /* Notify client */
+ /* Notify client application */
if (app->cb_fns.session_connected_callback (app->index, api_context, new_s,
is_fail))
{
@@ -406,7 +406,7 @@ stream_session_connect_notify (transport_connection_t * tc, u8 sst,
}
/* Cleanup session lookup */
- stream_session_half_open_table_del (sst, tc);
+ stream_session_half_open_table_del (tc);
return error;
}
@@ -567,7 +567,7 @@ stream_session_open (u32 app_index, session_type_t st,
handle = (((u64) app_index) << 32) | (u64) tc->c_index;
/* Add to the half-open lookup table */
- stream_session_half_open_table_add (st, tc, handle);
+ stream_session_half_open_table_add (tc, handle);
*res = tc;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index bb22f100529..180b9f8a496 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -274,8 +274,7 @@ stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);
u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
-int stream_session_connect_notify (transport_connection_t * tc, u8 sst,
- u8 is_fail);
+int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail);
void stream_session_init_fifos_pointers (transport_connection_t * tc,
u32 rx_pointer, u32 tx_pointer);
diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c
index 4d432977356..de564ea7272 100755
--- a/src/vnet/session/session_cli.c
+++ b/src/vnet/session/session_cli.c
@@ -57,7 +57,7 @@ format_stream_session (u8 * s, va_list * args)
u8 *str = 0;
tp_vft = session_get_transport_vft (ss->session_type);
- if (verbose == 1)
+ if (verbose == 1 && ss->session_state >= SESSION_STATE_ACCEPTING)
str = format (0, "%-10u%-10u%-10lld",
svm_fifo_max_dequeue (ss->server_rx_fifo),
svm_fifo_max_enqueue (ss->server_tx_fifo),
diff --git a/src/vnet/session/session_lookup.c b/src/vnet/session/session_lookup.c
index b3862ee3920..1ce22f808e5 100644
--- a/src/vnet/session/session_lookup.c
+++ b/src/vnet/session/session_lookup.c
@@ -107,7 +107,7 @@ always_inline void
make_v4_ss_kv_from_tc (session_kv4_t * kv, transport_connection_t * t)
{
return make_v4_ss_kv (kv, &t->lcl_ip.ip4, &t->rmt_ip.ip4, t->lcl_port,
- t->rmt_port, t->proto);
+ t->rmt_port, t->transport_proto);
}
always_inline void
@@ -150,7 +150,7 @@ always_inline void
make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t)
{
make_v6_ss_kv (kv, &t->lcl_ip.ip6, &t->rmt_ip.ip6, t->lcl_port,
- t->rmt_port, t->proto);
+ t->rmt_port, t->transport_proto);
}
/*
@@ -164,23 +164,17 @@ stream_session_table_add_for_tc (transport_connection_t * tc, u64 value)
session_kv4_t kv4;
session_kv6_t kv6;
- switch (tc->proto)
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
kv4.value = value;
clib_bihash_add_del_16_8 (&sl->v4_session_hash, &kv4, 1 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
kv6.value = value;
clib_bihash_add_del_48_8 (&sl->v6_session_hash, &kv6, 1 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
}
@@ -195,59 +189,24 @@ stream_session_table_add (session_manager_main_t * smm, stream_session_t * s,
stream_session_table_add_for_tc (tc, value);
}
-void
-stream_session_half_open_table_add (session_type_t sst,
- transport_connection_t * tc, u64 value)
-{
- session_lookup_t *sl = &session_lookup;
- session_kv4_t kv4;
- session_kv6_t kv6;
-
- switch (sst)
- {
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
- make_v4_ss_kv_from_tc (&kv4, tc);
- kv4.value = value;
- clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
- 1 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
- make_v6_ss_kv_from_tc (&kv6, tc);
- kv6.value = value;
- clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
- 1 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
- }
-}
-
int
stream_session_table_del_for_tc (transport_connection_t * tc)
{
session_lookup_t *sl = &session_lookup;
session_kv4_t kv4;
session_kv6_t kv6;
- switch (tc->proto)
+
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
return clib_bihash_add_del_16_8 (&sl->v4_session_hash, &kv4,
0 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
return clib_bihash_add_del_48_8 (&sl->v6_session_hash, &kv6,
0 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
return 0;
@@ -262,30 +221,48 @@ stream_session_table_del (stream_session_t * s)
return stream_session_table_del_for_tc (ts);
}
+
void
-stream_session_half_open_table_del (u8 sst, transport_connection_t * tc)
+stream_session_half_open_table_add (transport_connection_t * tc, u64 value)
{
session_lookup_t *sl = &session_lookup;
session_kv4_t kv4;
session_kv6_t kv6;
- switch (sst)
+ if (tc->is_ip4)
+ {
+ make_v4_ss_kv_from_tc (&kv4, tc);
+ kv4.value = value;
+ clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
+ 1 /* is_add */ );
+ }
+ else
+ {
+ make_v6_ss_kv_from_tc (&kv6, tc);
+ kv6.value = value;
+ clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
+ 1 /* is_add */ );
+ }
+}
+
+void
+stream_session_half_open_table_del (transport_connection_t * tc)
+{
+ session_lookup_t *sl = &session_lookup;
+ session_kv4_t kv4;
+ session_kv6_t kv6;
+
+ if (tc->is_ip4)
{
- case SESSION_TYPE_IP4_UDP:
- case SESSION_TYPE_IP4_TCP:
make_v4_ss_kv_from_tc (&kv4, tc);
clib_bihash_add_del_16_8 (&sl->v4_half_open_hash, &kv4,
0 /* is_add */ );
- break;
- case SESSION_TYPE_IP6_UDP:
- case SESSION_TYPE_IP6_TCP:
+ }
+ else
+ {
make_v6_ss_kv_from_tc (&kv6, tc);
clib_bihash_add_del_48_8 (&sl->v6_half_open_hash, &kv6,
0 /* is_add */ );
- break;
- default:
- clib_warning ("Session type not supported");
- ASSERT (0);
}
}
diff --git a/src/vnet/session/session_lookup.h b/src/vnet/session/session_lookup.h
index 9e92dab1b1d..cf1dc01356e 100644
--- a/src/vnet/session/session_lookup.h
+++ b/src/vnet/session/session_lookup.h
@@ -83,9 +83,8 @@ transport_connection_t *stream_session_half_open_lookup (ip46_address_t * lcl,
void stream_session_table_add_for_tc (transport_connection_t * tc, u64 value);
int stream_session_table_del_for_tc (transport_connection_t * tc);
int stream_session_table_del (stream_session_t * s);
-void stream_session_half_open_table_del (u8 sst, transport_connection_t * tc);
-void stream_session_half_open_table_add (session_type_t sst,
- transport_connection_t * tc,
+void stream_session_half_open_table_del (transport_connection_t * tc);
+void stream_session_half_open_table_add (transport_connection_t * tc,
u64 value);
void session_lookup_init (void);
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 3895a60af48..e56be3386ed 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -31,12 +31,12 @@ typedef struct _transport_connection
ip46_address_t lcl_ip; /**< Local IP */
u16 lcl_port; /**< Local port */
u16 rmt_port; /**< Remote port */
- u8 proto; /**< Protocol id (also session type) */
+ u8 transport_proto; /**< Protocol id */
+ u8 is_ip4; /**< Flag if IP4 connection */
u32 vrf; /**< FIB table id */
u32 s_index; /**< Parent session index */
u32 c_index; /**< Connection index in transport pool */
- u8 is_ip4; /**< Flag if IP4 connection */
u32 thread_index; /**< Worker-thread index */
fib_node_index_t rmt_fei; /**< FIB entry index for rmt */
@@ -56,7 +56,7 @@ typedef struct _transport_connection
#define c_rmt_ip6 connection.rmt_ip.ip6
#define c_lcl_port connection.lcl_port
#define c_rmt_port connection.rmt_port
-#define c_proto connection.proto
+#define c_transport_proto connection.transport_proto
#define c_vrf connection.vrf
#define c_state connection.state
#define c_s_index connection.s_index
diff --git a/src/vnet/session/transport_interface.c b/src/vnet/session/transport_interface.c
index eb12aa69475..ef8d1e49524 100644
--- a/src/vnet/session/transport_interface.c
+++ b/src/vnet/session/transport_interface.c
@@ -73,9 +73,12 @@ transport_endpoint_table_del (transport_endpoint_table_t * ht,
* @param vft - virtual function table
*/
void
-session_register_transport (u8 session_type,
+session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft)
{
+ u8 session_type;
+ session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
+
vec_validate (tp_vfts, session_type);
tp_vfts[session_type] = *vft;
diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h
index b7e86ee7960..661221c484a 100644
--- a/src/vnet/session/transport_interface.h
+++ b/src/vnet/session/transport_interface.h
@@ -67,7 +67,7 @@ void transport_endpoint_table_add (transport_endpoint_table_t * ht,
void transport_endpoint_table_del (transport_endpoint_table_t * ht,
transport_endpoint_t * te);
-void session_register_transport (u8 session_type,
+void session_register_transport (transport_proto_t transport_proto, u8 is_ip4,
const transport_proto_vft_t * vft);
transport_proto_vft_t *session_get_transport_vft (u8 session_type);
diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c
index 744f50e7db2..27e20f8e8e5 100644
--- a/src/vnet/tcp/builtin_client.c
+++ b/src/vnet/tcp/builtin_client.c
@@ -46,6 +46,24 @@
#define TCP_BUILTIN_CLIENT_DBG (0)
static void
+signal_evt_to_cli_i (int *code)
+{
+ tclient_main_t *tm = &tclient_main;
+ ASSERT (vlib_get_thread_index () == 0);
+ vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, *code, 0);
+}
+
+static void
+signal_evt_to_cli (int code)
+{
+ if (vlib_get_thread_index () != 0)
+ vl_api_rpc_call_main_thread (signal_evt_to_cli_i, (u8 *) & code,
+ sizeof (code));
+ else
+ signal_evt_to_cli_i (&code);
+}
+
+static void
send_test_chunk (tclient_main_t * tm, session_t * s)
{
u8 *test_data = tm->connect_test_data;
@@ -53,6 +71,7 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
u32 bytes_this_chunk;
session_fifo_event_t evt;
static int serial_number = 0;
+ svm_fifo_t *txf;
int rv;
ASSERT (vec_len (test_data) > 0);
@@ -63,7 +82,8 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
bytes_this_chunk = bytes_this_chunk < s->bytes_to_send
? bytes_this_chunk : s->bytes_to_send;
- rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, bytes_this_chunk,
+ txf = s->server_tx_fifo;
+ rv = svm_fifo_enqueue_nowait (txf, bytes_this_chunk,
test_data + test_buf_offset);
/* If we managed to enqueue data... */
@@ -93,15 +113,16 @@ send_test_chunk (tclient_main_t * tm, session_t * s)
}
/* Poke the session layer */
- if (svm_fifo_set_event (s->server_tx_fifo))
+ if (svm_fifo_set_event (txf))
{
/* Fabricate TX event, send to vpp */
- evt.fifo = s->server_tx_fifo;
+ evt.fifo = txf;
evt.event_type = FIFO_EVENT_APP_TX;
evt.event_id = serial_number++;
- if (unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ ))
+ if (unix_shared_memory_queue_add
+ (tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt,
+ 0 /* do wait for mutex */ ))
clib_warning ("could not enqueue event");
}
}
@@ -112,14 +133,16 @@ receive_test_chunk (tclient_main_t * tm, session_t * s)
{
svm_fifo_t *rx_fifo = s->server_rx_fifo;
int n_read, test_bytes = 0;
+ u32 my_thread_index = vlib_get_thread_index ();
/* Allow enqueuing of new event */
// svm_fifo_unset_event (rx_fifo);
if (test_bytes)
{
- n_read = svm_fifo_dequeue_nowait (rx_fifo, vec_len (tm->rx_buf),
- tm->rx_buf);
+ n_read = svm_fifo_dequeue_nowait (rx_fifo,
+ vec_len (tm->rx_buf[my_thread_index]),
+ tm->rx_buf[my_thread_index]);
}
else
{
@@ -151,10 +174,12 @@ receive_test_chunk (tclient_main_t * tm, session_t * s)
int i;
for (i = 0; i < n_read; i++)
{
- if (tm->rx_buf[i] != ((s->bytes_received + i) & 0xff))
+ if (tm->rx_buf[my_thread_index][i]
+ != ((s->bytes_received + i) & 0xff))
{
clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
- n_read, s->bytes_received + i, tm->rx_buf[i],
+ n_read, s->bytes_received + i,
+ tm->rx_buf[my_thread_index][i],
((s->bytes_received + i) & 0xff));
}
}
@@ -247,7 +272,11 @@ builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
if (s)
{
- stream_session_disconnect (s);
+ vnet_disconnect_args_t _a, *a = &_a;
+ a->handle = stream_session_handle (s);
+ a->app_index = tm->app_index;
+ vnet_disconnect_session (a);
+
vec_delete (connections_this_batch, 1, i);
i--;
__sync_fetch_and_add (&tm->ready_connections, -1);
@@ -258,9 +287,7 @@ builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Kick the debug CLI process */
if (tm->ready_connections == 0)
{
- tm->test_end_time = vlib_time_now (vm);
- vlib_process_signal_event (vm, tm->cli_node_index,
- 2, 0 /* data */ );
+ signal_evt_to_cli (2);
}
}
}
@@ -369,27 +396,31 @@ static int
tcp_test_clients_init (vlib_main_t * vm)
{
tclient_main_t *tm = &tclient_main;
- vlib_thread_main_t *thread_main = vlib_get_thread_main ();
+ vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ u32 num_threads;
int i;
tclient_api_hookup (vm);
if (create_api_loopback (tm))
return -1;
+ num_threads = 1 /* main thread */ + vtm->n_threads;
+
/* Init test data. Big buffer */
vec_validate (tm->connect_test_data, 1024 * 1024 - 1);
for (i = 0; i < vec_len (tm->connect_test_data); i++)
tm->connect_test_data[i] = i & 0xff;
- tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
- vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
+ vec_validate (tm->rx_buf, num_threads - 1);
+ for (i = 0; i < num_threads; i++)
+ vec_validate (tm->rx_buf[i], vec_len (tm->connect_test_data) - 1);
tm->is_init = 1;
- tm->vlib_main = vm;
- vec_validate (tm->connection_index_by_thread, thread_main->n_vlib_mains);
- vec_validate (tm->connections_this_batch_by_thread,
- thread_main->n_vlib_mains);
+ vec_validate (tm->connection_index_by_thread, vtm->n_vlib_mains);
+ vec_validate (tm->connections_this_batch_by_thread, vtm->n_vlib_mains);
+ vec_validate (tm->vpp_event_queue, vtm->n_vlib_mains);
+
return 0;
}
@@ -400,23 +431,28 @@ builtin_session_connected_callback (u32 app_index, u32 api_context,
tclient_main_t *tm = &tclient_main;
session_t *session;
u32 session_index;
- int i;
+ u8 thread_index = vlib_get_thread_index ();
+
+ ASSERT (s->thread_index == thread_index);
if (is_fail)
{
clib_warning ("connection %d failed!", api_context);
- vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, -1,
- 0 /* data */ );
- return -1;
+ signal_evt_to_cli (-1);
+ return 0;
}
- tm->our_event_queue = session_manager_get_vpp_event_queue (s->thread_index);
- tm->vpp_event_queue = session_manager_get_vpp_event_queue (s->thread_index);
+ if (!tm->vpp_event_queue[thread_index])
+ tm->vpp_event_queue[thread_index] =
+ session_manager_get_vpp_event_queue (thread_index);
/*
* Setup session
*/
+ clib_spinlock_lock_if_init (&tm->sessions_lock);
pool_get (tm->sessions, session);
+ clib_spinlock_unlock_if_init (&tm->sessions_lock);
+
memset (session, 0, sizeof (*session));
session_index = session - tm->sessions;
session->bytes_to_send = tm->bytes_to_send;
@@ -427,32 +463,13 @@ builtin_session_connected_callback (u32 app_index, u32 api_context,
session->server_tx_fifo->client_session_index = session_index;
session->vpp_session_handle = stream_session_handle (s);
- /* Add it to the session lookup table */
- hash_set (tm->session_index_by_vpp_handles, session->vpp_session_handle,
- session_index);
-
- if (tm->ready_connections == tm->expected_connections - 1)
- {
- vlib_thread_main_t *thread_main = vlib_get_thread_main ();
- int thread_index;
-
- thread_index = 0;
- for (i = 0; i < pool_elts (tm->sessions); i++)
- {
- vec_add1 (tm->connection_index_by_thread[thread_index], i);
- thread_index++;
- if (thread_index == thread_main->n_vlib_mains)
- thread_index = 0;
- }
- }
+ vec_add1 (tm->connection_index_by_thread[thread_index], session_index);
__sync_fetch_and_add (&tm->ready_connections, 1);
if (tm->ready_connections == tm->expected_connections)
{
tm->run_test = 1;
- tm->test_start_time = vlib_time_now (tm->vlib_main);
/* Signal the CLI process that the action is starting... */
- vlib_process_signal_event (tm->vlib_main, tm->cli_node_index, 1,
- 0 /* data */ );
+ signal_evt_to_cli (1);
}
return 0;
@@ -606,7 +623,9 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
tm->connections_per_batch = 1000;
tm->private_segment_count = 0;
tm->private_segment_size = 0;
-
+ tm->vlib_main = vm;
+ if (thread_main->n_vlib_mains > 1)
+ clib_spinlock_init (&tm->sessions_lock);
vec_free (tm->connect_uri);
while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
@@ -668,7 +687,9 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
start_tx_pthread ();
#endif
+ vlib_worker_thread_barrier_sync (vm);
vnet_session_enable_disable (vm, 1 /* turn on TCP, etc. */ );
+ vlib_worker_thread_barrier_release (vm);
if (tm->test_client_attached == 0)
{
@@ -688,9 +709,8 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
clients_connect (vm, uri, n_clients);
/* Park until the sessions come up, or ten seconds elapse... */
- vlib_process_wait_for_event_or_clock (vm, 10.0 /* timeout, seconds */ );
+ vlib_process_wait_for_event_or_clock (vm, 10 /* timeout, seconds */ );
event_type = vlib_process_get_events (vm, &event_data);
-
switch (event_type)
{
case ~0:
@@ -699,6 +719,7 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
goto cleanup;
case 1:
+ tm->test_start_time = vlib_time_now (tm->vlib_main);
vlib_cli_output (vm, "Test started at %.6f", tm->test_start_time);
break;
@@ -710,7 +731,6 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
/* Now wait for the sessions to finish... */
vlib_process_wait_for_event_or_clock (vm, cli_timeout);
event_type = vlib_process_get_events (vm, &event_data);
-
switch (event_type)
{
case ~0:
@@ -719,6 +739,7 @@ test_tcp_clients_command_fn (vlib_main_t * vm,
goto cleanup;
case 2:
+ tm->test_end_time = vlib_time_now (vm);
vlib_cli_output (vm, "Test finished at %.6f", tm->test_end_time);
break;
@@ -753,6 +774,7 @@ cleanup:
vec_reset_length (tm->connection_index_by_thread[i]);
vec_reset_length (tm->connections_this_batch_by_thread[i]);
}
+
pool_free (tm->sessions);
return 0;
@@ -765,6 +787,7 @@ VLIB_CLI_COMMAND (test_clients_command, static) =
.short_help = "test tcp clients [nclients %d]"
"[iterations %d] [bytes %d] [uri tcp://6.0.1.1/1234]",
.function = test_tcp_clients_command_fn,
+ .is_mp_safe = 1,
};
/* *INDENT-ON* */
diff --git a/src/vnet/tcp/builtin_client.h b/src/vnet/tcp/builtin_client.h
index 38af231dec3..06d239efe14 100644
--- a/src/vnet/tcp/builtin_client.h
+++ b/src/vnet/tcp/builtin_client.h
@@ -48,8 +48,7 @@ typedef struct
* Application setup parameters
*/
unix_shared_memory_queue_t *vl_input_queue; /**< vpe input queue */
- unix_shared_memory_queue_t *our_event_queue; /**< Our event queue */
- unix_shared_memory_queue_t *vpp_event_queue; /**< $$$ single thread */
+ unix_shared_memory_queue_t **vpp_event_queue;
u32 cli_node_index; /**< cli process node index */
u32 my_client_index; /**< loopback API client handle */
@@ -70,9 +69,9 @@ typedef struct
/*
* Test state variables
*/
- session_t *sessions; /**< Sessions pool */
- u8 *rx_buf; /**< intermediate rx buffer */
- uword *session_index_by_vpp_handles; /**< Hash table for disconnecting */
+ session_t *sessions; /**< Session pool, shared */
+ clib_spinlock_t sessions_lock;
+ u8 **rx_buf; /**< intermediate rx buffers */
u8 *connect_test_data; /**< Pre-computed test data */
u32 **connection_index_by_thread;
u32 **connections_this_batch_by_thread; /**< active connection batch */
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 6d1cfa07749..59b20747da6 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -37,15 +37,14 @@ tcp_connection_bind (u32 session_index, transport_endpoint_t * lcl)
{
listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32;
listener->c_is_ip4 = 1;
- listener->c_proto = SESSION_TYPE_IP4_TCP;
}
else
{
clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6,
sizeof (ip6_address_t));
- listener->c_proto = SESSION_TYPE_IP6_TCP;
- }
+ }
+ listener->c_transport_proto = TRANSPORT_PROTO_TCP;
listener->c_s_index = session_index;
listener->state = TCP_STATE_LISTEN;
@@ -95,6 +94,71 @@ tcp_session_get_listener (u32 listener_index)
return &tc->connection;
}
+always_inline void
+transport_endpoint_del (u32 tepi)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
+ pool_put_index (tm->local_endpoints, tepi);
+ clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
+}
+
+always_inline transport_endpoint_t *
+transport_endpoint_new (void)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ transport_endpoint_t *tep;
+ pool_get (tm->local_endpoints, tep);
+ return tep;
+}
+
+/**
+ * Cleanup half-open connection
+ *
+ */
+void
+tcp_half_open_connection_del (tcp_connection_t * tc)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ clib_spinlock_lock_if_init (&tm->half_open_lock);
+ pool_put_index (tm->half_open_connections, tc->c_c_index);
+ if (CLIB_DEBUG)
+ memset (tc, 0xFA, sizeof (*tc));
+ clib_spinlock_unlock_if_init (&tm->half_open_lock);
+}
+
+/**
+ * Try to cleanup half-open connection
+ *
+ * If called from a thread that doesn't own tc, the call won't have any
+ * effect.
+ *
+ * @param tc - connection to be cleaned up
+ * @return non-zero if cleanup failed.
+ */
+int
+tcp_half_open_connection_cleanup (tcp_connection_t * tc)
+{
+ /* Make sure this is the owning thread */
+ if (tc->c_thread_index != vlib_get_thread_index ())
+ return 1;
+ tcp_timer_reset (tc, TCP_TIMER_ESTABLISH);
+ tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT_SYN);
+ tcp_half_open_connection_del (tc);
+ return 0;
+}
+
+tcp_connection_t *
+tcp_half_open_connection_new (void)
+{
+ tcp_main_t *tm = vnet_get_tcp_main ();
+ tcp_connection_t *tc = 0;
+ pool_get (tm->half_open_connections, tc);
+ memset (tc, 0, sizeof (*tc));
+ tc->c_c_index = tc - tm->half_open_connections;
+ return tc;
+}
+
/**
* Cleans up connection state.
*
@@ -110,26 +174,28 @@ tcp_connection_cleanup (tcp_connection_t * tc)
/* Cleanup local endpoint if this was an active connect */
tepi = transport_endpoint_lookup (&tm->local_endpoints_table, &tc->c_lcl_ip,
tc->c_lcl_port);
-
- /*XXX lock */
if (tepi != TRANSPORT_ENDPOINT_INVALID_INDEX)
{
tep = pool_elt_at_index (tm->local_endpoints, tepi);
transport_endpoint_table_del (&tm->local_endpoints_table, tep);
- pool_put (tm->local_endpoints, tep);
+ transport_endpoint_del (tepi);
}
- /* Make sure all timers are cleared */
- tcp_connection_timers_reset (tc);
-
- /* Check if half-open */
+ /* Check if connection is not yet fully established */
if (tc->state == TCP_STATE_SYN_SENT)
{
- tcp_half_open_connection_del (tc);
+ /* Try to remove the half-open connection. If this is not the owning
+ * thread, tc won't be removed. Retransmit or establish timers will
+ * eventually expire and call again cleanup on the right thread. */
+ tcp_half_open_connection_cleanup (tc);
}
else
{
int thread_index = tc->c_thread_index;
+
+ /* Make sure all timers are cleared */
+ tcp_connection_timers_reset (tc);
+
/* Poison the entry */
if (CLIB_DEBUG > 0)
memset (tc, 0xFA, sizeof (*tc));
@@ -152,32 +218,6 @@ tcp_connection_del (tcp_connection_t * tc)
tcp_connection_cleanup (tc);
}
-/**
- * Cleanup half-open connection
- */
-void
-tcp_half_open_connection_del (tcp_connection_t * tc)
-{
- tcp_main_t *tm = vnet_get_tcp_main ();
- if (CLIB_DEBUG)
- memset (tc, 0xFA, sizeof (*tc));
- clib_spinlock_lock_if_init (&tm->half_open_lock);
- pool_put (tm->half_open_connections, tc);
- clib_spinlock_unlock_if_init (&tm->half_open_lock);
-}
-
-tcp_connection_t *
-tcp_half_open_connection_new ()
-{
- tcp_main_t *tm = vnet_get_tcp_main ();
- tcp_connection_t *tc = 0;
- clib_spinlock_lock_if_init (&tm->half_open_lock);
- pool_get (tm->half_open_connections, tc);
- clib_spinlock_unlock_if_init (&tm->half_open_lock);
- memset (tc, 0, sizeof (*tc));
- return tc;
-}
-
tcp_connection_t *
tcp_connection_new (u8 thread_index)
{
@@ -207,9 +247,7 @@ tcp_connection_reset (tcp_connection_t * tc)
tcp_connection_cleanup (tc);
break;
case TCP_STATE_SYN_SENT:
- /* XXX remove sst from call */
- stream_session_connect_notify (&tc->connection, tc->connection.proto,
- 1 /* fail */ );
+ stream_session_connect_notify (&tc->connection, 1 /* fail */ );
tcp_connection_cleanup (tc);
break;
case TCP_STATE_ESTABLISHED:
@@ -225,7 +263,7 @@ tcp_connection_reset (tcp_connection_t * tc)
stream_session_reset_notify (&tc->connection);
/* Wait for cleanup from session layer but not forever */
- tcp_timer_set (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
+ tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
break;
case TCP_STATE_CLOSED:
return;
@@ -325,8 +363,9 @@ ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4)
* table to mark the pair as used.
*/
int
-tcp_allocate_local_port (tcp_main_t * tm, ip46_address_t * ip)
+tcp_allocate_local_port (ip46_address_t * ip)
{
+ tcp_main_t *tm = vnet_get_tcp_main ();
transport_endpoint_t *tep;
u32 time_now, tei;
u16 min = 1024, max = 65535; /* XXX configurable ? */
@@ -338,10 +377,6 @@ tcp_allocate_local_port (tcp_main_t * tm, ip46_address_t * ip)
/* Only support active opens from thread 0 */
ASSERT (vlib_get_thread_index () == 0);
- /* Start at random point or max */
- pool_get (tm->local_endpoints, tep);
- clib_memcpy (&tep->ip, ip, sizeof (*ip));
-
/* Search for first free slot */
for (; tries >= 0; tries--)
{
@@ -355,21 +390,22 @@ tcp_allocate_local_port (tcp_main_t * tm, ip46_address_t * ip)
break;
}
- tep->port = port;
-
/* Look it up */
- tei = transport_endpoint_lookup (&tm->local_endpoints_table, &tep->ip,
- tep->port);
+ tei = transport_endpoint_lookup (&tm->local_endpoints_table, ip, port);
/* If not found, we're done */
if (tei == TRANSPORT_ENDPOINT_INVALID_INDEX)
{
+ clib_spinlock_lock_if_init (&tm->local_endpoints_lock);
+ tep = transport_endpoint_new ();
+ clib_memcpy (&tep->ip, ip, sizeof (*ip));
+ tep->port = port;
transport_endpoint_table_add (&tm->local_endpoints_table, tep,
tep - tm->local_endpoints);
+ clib_spinlock_unlock_if_init (&tm->local_endpoints_lock);
+
return tep->port;
}
}
- /* No free ports */
- pool_put (tm->local_endpoints, tep);
return -1;
}
@@ -592,7 +628,7 @@ tcp_connection_open (transport_endpoint_t * rmt)
}
/* Allocate source port */
- lcl_port = tcp_allocate_local_port (tm, &lcl_addr);
+ lcl_port = tcp_allocate_local_port (&lcl_addr);
if (lcl_port < 1)
{
clib_warning ("Failed to allocate src port");
@@ -602,16 +638,14 @@ tcp_connection_open (transport_endpoint_t * rmt)
/*
* Create connection and send SYN
*/
-
+ clib_spinlock_lock_if_init (&tm->half_open_lock);
tc = tcp_half_open_connection_new ();
-
clib_memcpy (&tc->c_rmt_ip, &rmt->ip, sizeof (ip46_address_t));
clib_memcpy (&tc->c_lcl_ip, &lcl_addr, sizeof (ip46_address_t));
tc->c_rmt_port = clib_host_to_net_u16 (rmt->port);
tc->c_lcl_port = clib_host_to_net_u16 (lcl_port);
- tc->c_c_index = tc - tm->half_open_connections;
tc->c_is_ip4 = rmt->is_ip4;
- tc->c_proto = rmt->is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
+ tc->c_transport_proto = TRANSPORT_PROTO_TCP;
tc->c_vrf = rmt->vrf;
/* The other connection vars will be initialized after SYN ACK */
tcp_connection_timers_init (tc);
@@ -619,6 +653,7 @@ tcp_connection_open (transport_endpoint_t * rmt)
TCP_EVT_DBG (TCP_EVT_OPEN, tc);
tc->state = TCP_STATE_SYN_SENT;
tcp_send_syn (tc);
+ clib_spinlock_unlock_if_init (&tm->half_open_lock);
return tc->c_c_index;
}
@@ -1057,16 +1092,12 @@ void
tcp_timer_establish_handler (u32 conn_index)
{
tcp_connection_t *tc;
- u8 sst;
tc = tcp_half_open_connection_get (conn_index);
tc->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
ASSERT (tc->state == TCP_STATE_SYN_SENT);
-
- sst = tc->c_is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
- stream_session_connect_notify (&tc->connection, sst, 1 /* fail */ );
-
+ stream_session_connect_notify (&tc->connection, 1 /* fail */ );
tcp_connection_cleanup (tc);
}
@@ -1077,6 +1108,8 @@ tcp_timer_waitclose_handler (u32 conn_index)
tcp_connection_t *tc;
tc = tcp_connection_get (conn_index, thread_index);
+ if (!tc)
+ return;
tc->timers[TCP_TIMER_WAITCLOSE] = TCP_TIMER_HANDLE_INVALID;
/* Session didn't come back with a close(). Send FIN either way
@@ -1180,8 +1213,8 @@ tcp_main_enable (vlib_main_t * vm)
ip4_register_protocol (IP_PROTOCOL_TCP, tcp4_input_node.index);
/* Register as transport with session layer */
- session_register_transport (SESSION_TYPE_IP4_TCP, &tcp_proto);
- session_register_transport (SESSION_TYPE_IP6_TCP, &tcp_proto);
+ session_register_transport (TRANSPORT_PROTO_TCP, 1, &tcp_proto);
+ session_register_transport (TRANSPORT_PROTO_TCP, 0, &tcp_proto);
/*
* Initialize data structures
@@ -1227,7 +1260,10 @@ tcp_main_enable (vlib_main_t * vm)
200000 /* $$$$ config parameter nbuckets */ ,
(64 << 20) /*$$$ config parameter table size */ );
if (num_threads > 1)
- clib_spinlock_init (&tm->half_open_lock);
+ {
+ clib_spinlock_init (&tm->half_open_lock);
+ clib_spinlock_init (&tm->local_endpoints_lock);
+ }
return error;
}
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 89c30616365..4fa681f8cc1 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -115,7 +115,8 @@ extern timer_expiration_handler tcp_timer_retransmit_syn_handler;
_(SENT_RCV_WND0, "Sent 0 receive window") \
_(RECOVERY, "Recovery on") \
_(FAST_RECOVERY, "Fast Recovery on") \
- _(FR_1_SMSS, "Sent 1 SMSS")
+ _(FR_1_SMSS, "Sent 1 SMSS") \
+ _(HALF_OPEN_DONE, "Half-open completed")
typedef enum _tcp_connection_flag_bits
{
@@ -381,6 +382,7 @@ typedef struct _tcp_main
/* Local endpoints lookup table */
transport_endpoint_table_t local_endpoints_table;
+ clib_spinlock_t local_endpoints_lock;
/* Congestion control algorithms registered */
tcp_cc_algorithm_t *cc_algos;
@@ -430,7 +432,8 @@ clib_error_t *vnet_tcp_enable_disable (vlib_main_t * vm, u8 is_en);
always_inline tcp_connection_t *
tcp_connection_get (u32 conn_index, u32 thread_index)
{
- if (pool_is_free_index (tcp_main.connections[thread_index], conn_index))
+ if (PREDICT_FALSE
+ (pool_is_free_index (tcp_main.connections[thread_index], conn_index)))
return 0;
return pool_elt_at_index (tcp_main.connections[thread_index], conn_index);
}
@@ -454,7 +457,7 @@ tcp_get_connection_from_transport (transport_connection_t * tconn)
void tcp_connection_close (tcp_connection_t * tc);
void tcp_connection_cleanup (tcp_connection_t * tc);
void tcp_connection_del (tcp_connection_t * tc);
-void tcp_half_open_connection_del (tcp_connection_t * tc);
+int tcp_half_open_connection_cleanup (tcp_connection_t * tc);
tcp_connection_t *tcp_connection_new (u8 thread_index);
void tcp_connection_reset (tcp_connection_t * tc);
@@ -473,9 +476,12 @@ tcp_listener_get (u32 tli)
always_inline tcp_connection_t *
tcp_half_open_connection_get (u32 conn_index)
{
- if (pool_is_free_index (tcp_main.half_open_connections, conn_index))
- return 0;
- return pool_elt_at_index (tcp_main.half_open_connections, conn_index);
+ tcp_connection_t *tc = 0;
+ clib_spinlock_lock_if_init (&tcp_main.half_open_lock);
+ if (!pool_is_free_index (tcp_main.half_open_connections, conn_index))
+ tc = pool_elt_at_index (tcp_main.half_open_connections, conn_index);
+ clib_spinlock_unlock_if_init (&tcp_main.half_open_lock);
+ return tc;
}
void tcp_make_ack (tcp_connection_t * ts, vlib_buffer_t * b);
diff --git a/src/vnet/tcp/tcp_debug.h b/src/vnet/tcp/tcp_debug.h
index e3da56f4a43..fc36eb29afd 100755
--- a/src/vnet/tcp/tcp_debug.h
+++ b/src/vnet/tcp/tcp_debug.h
@@ -19,9 +19,9 @@
#include <vlib/vlib.h>
#define TCP_DEBUG (1)
-#define TCP_DEBUG_SM (2)
-#define TCP_DEBUG_CC (0)
-#define TCP_DEBUG_CC_STAT (0)
+#define TCP_DEBUG_SM (0)
+#define TCP_DEBUG_CC (1)
+#define TCP_DEBUG_CC_STAT (1)
#define foreach_tcp_dbg_evt \
_(INIT, "") \
@@ -197,6 +197,19 @@ typedef enum _tcp_dbg_evt
ed->data[0] = _tc->c_c_index; \
}
+#define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...) \
+{ \
+ TCP_EVT_INIT_HANDLER(_tc, 0); \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "syn-rx: irs %u", \
+ .format_args = "i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 1); \
+ ed->data[0] = _tc->irs; \
+ TCP_EVT_STATE_CHANGE_HANDLER(_tc); \
+}
+
#define TCP_EVT_UNBIND_HANDLER(_tc, ...) \
{ \
TCP_EVT_DEALLOC_HANDLER(_tc); \
@@ -258,19 +271,6 @@ typedef enum _tcp_dbg_evt
ed->data[0] = _tc->state; \
}
-#define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...) \
-{ \
- TCP_EVT_INIT_HANDLER(_tc, 0); \
- ELOG_TYPE_DECLARE (_e) = \
- { \
- .format = "syn-rx: irs %u", \
- .format_args = "i4", \
- }; \
- DECLARE_ETD(_tc, _e, 1); \
- ed->data[0] = _tc->irs; \
- TCP_EVT_STATE_CHANGE_HANDLER(_tc); \
-}
-
#define TCP_EVT_SYN_SENT_HANDLER(_tc, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index d32b4fc89a5..6c59d70f072 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -1724,9 +1724,13 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
* in CLOSE-WAIT, set timer (reuse WAITCLOSE). */
tc0->state = TCP_STATE_CLOSE_WAIT;
TCP_EVT_DBG (TCP_EVT_FIN_RCVD, tc0);
- tc0->rcv_nxt += (vnet_buffer (b0)->tcp.data_len == 0);
+ if (vnet_buffer (b0)->tcp.data_len == 0)
+ {
+ tc0->rcv_nxt += 1;
+ next0 = TCP_ESTABLISHED_NEXT_DROP;
+ }
stream_session_disconnect_notify (&tc0->connection);
- tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
+ tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
}
done:
@@ -1819,7 +1823,6 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
tcp_main_t *tm = vnet_get_tcp_main ();
u32 n_left_from, next_index, *from, *to_next;
u32 my_thread_index = vm->thread_index, errors = 0;
- u8 sst = is_ip4 ? SESSION_TYPE_IP4_TCP : SESSION_TYPE_IP6_TCP;
from = vlib_frame_vector_args (from_frame);
n_left_from = from_frame->n_vectors;
@@ -1936,10 +1939,6 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
if (tcp_options_parse (tcp0, &tc0->rcv_opts))
goto drop;
- /* Stop connection establishment and retransmit timers */
- tcp_timer_reset (tc0, TCP_TIMER_ESTABLISH);
- tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT_SYN);
-
/* Valid SYN or SYN-ACK. Move connection from half-open pool to
* current thread pool. */
pool_get (tm->connections[my_thread_index], new_tc0);
@@ -1948,7 +1947,14 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
new_tc0->c_thread_index = my_thread_index;
new_tc0->rcv_nxt = vnet_buffer (b0)->tcp.seq_end;
new_tc0->irs = seq0;
- tcp_half_open_connection_del (tc0);
+ new_tc0->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
+ new_tc0->timers[TCP_TIMER_RETRANSMIT_SYN] =
+ TCP_TIMER_HANDLE_INVALID;
+
+ /* If this is not the owning thread, wait for syn retransmit to
+ * expire and cleanup then */
+ if (tcp_half_open_connection_cleanup (tc0))
+ tc0->flags |= TCP_CONN_HALF_OPEN_DONE;
if (tcp_opts_tstamp (&new_tc0->rcv_opts))
{
@@ -1980,11 +1986,10 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Notify app that we have connection. If session layer can't
* allocate session send reset */
- if (stream_session_connect_notify (&new_tc0->connection, sst,
- 0))
+ if (stream_session_connect_notify (&new_tc0->connection, 0))
{
+ tcp_send_reset (new_tc0, b0, is_ip4);
tcp_connection_cleanup (new_tc0);
- tcp_send_reset (tc0, b0, is_ip4);
goto drop;
}
@@ -2002,8 +2007,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
new_tc0->state = TCP_STATE_SYN_RCVD;
/* Notify app that we have connection */
- if (stream_session_connect_notify
- (&new_tc0->connection, sst, 0))
+ if (stream_session_connect_notify (&new_tc0->connection, 0))
{
tcp_connection_cleanup (new_tc0);
tcp_send_reset (tc0, b0, is_ip4);
@@ -2250,6 +2254,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
if (tc0->snd_una == tc0->snd_una_max)
{
ASSERT (tcp_fin (tcp0));
+ tc0->rcv_nxt += 1;
tc0->state = TCP_STATE_FIN_WAIT_2;
TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc0);
@@ -2263,6 +2268,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
* acknowledged ("ok") but do not delete the TCB. */
if (tcp_rcv_ack (tc0, b0, tcp0, &next0, &error0))
goto drop;
+
/* check if rtx queue is empty and ack CLOSE TODO */
break;
case TCP_STATE_CLOSE_WAIT:
@@ -2384,7 +2390,7 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Got FIN, send ACK! */
tc0->state = TCP_STATE_TIME_WAIT;
tcp_connection_timers_reset (tc0);
- tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
+ tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
tcp_make_ack (tc0, b0);
next0 = tcp_next_output (is_ip4);
TCP_EVT_DBG (TCP_EVT_STATE_CHANGE, tc0);
@@ -2745,7 +2751,7 @@ tcp_lookup_is_valid (tcp_connection_t * tc, tcp_header_t * hdr)
if ((tmp =
stream_session_half_open_lookup (&tc->c_lcl_ip, &tc->c_rmt_ip,
tc->c_lcl_port, tc->c_rmt_port,
- tc->c_proto)))
+ tc->c_transport_proto)))
{
if (tmp->lcl_port == hdr->dst_port
&& tmp->rmt_port == hdr->src_port)
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 5e9ecf114a7..1ecb6ce6071 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -1087,15 +1087,14 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
if (is_syn)
{
tc = tcp_half_open_connection_get (index);
+ tc->timers[TCP_TIMER_RETRANSMIT_SYN] = TCP_TIMER_HANDLE_INVALID;
}
else
{
tc = tcp_connection_get (index, thread_index);
+ tc->timers[TCP_TIMER_RETRANSMIT] = TCP_TIMER_HANDLE_INVALID;
}
- /* Make sure timer handle is set to invalid */
- tc->timers[TCP_TIMER_RETRANSMIT] = TCP_TIMER_HANDLE_INVALID;
-
if (!tcp_in_recovery (tc) && tc->rto_boff > 0
&& tc->state >= TCP_STATE_ESTABLISHED)
{
@@ -1154,6 +1153,20 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
/* Retransmit for SYN/SYNACK */
else if (tc->state == TCP_STATE_SYN_RCVD || tc->state == TCP_STATE_SYN_SENT)
{
+ /* Half-open connection actually moved to established but we were
+ * waiting for syn retransmit to pop to call cleanup from the right
+ * thread. */
+ if (tc->flags & TCP_CONN_HALF_OPEN_DONE)
+ {
+ ASSERT (tc->state == TCP_STATE_SYN_SENT);
+ if (tcp_half_open_connection_cleanup (tc))
+ {
+ clib_warning ("could not remove half-open connection");
+ ASSERT (0);
+ }
+ return;
+ }
+
/* Try without increasing RTO a number of times. If this fails,
* start growing RTO exponentially */
if (tc->rto_boff > TCP_RTO_SYN_RETRIES)
diff --git a/src/vnet/tcp/tcp_test.c b/src/vnet/tcp/tcp_test.c
index 5c40ddf9ceb..37640cc61b9 100644
--- a/src/vnet/tcp/tcp_test.c
+++ b/src/vnet/tcp/tcp_test.c
@@ -1574,7 +1574,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000103);
tc->connection.lcl_port = 35051;
tc->connection.rmt_port = 53764;
- tc->connection.proto = 0;
+ tc->connection.transport_proto = 0;
clib_memcpy (tc1, &tc->connection, sizeof (*tc1));
pool_get (session_manager_main.sessions[0], s);
@@ -1590,7 +1590,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000102);
tc->connection.lcl_port = 38225;
tc->connection.rmt_port = 53764;
- tc->connection.proto = 0;
+ tc->connection.transport_proto = 0;
clib_memcpy (tc2, &tc->connection, sizeof (*tc2));
/*
@@ -1601,7 +1601,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tconn = stream_session_lookup_transport_wt4 (&tc1->lcl_ip.ip4,
&tc1->rmt_ip.ip4,
tc1->lcl_port, tc1->rmt_port,
- tc1->proto, 0);
+ tc1->transport_proto, 0);
cmp = (memcmp (&tconn->rmt_ip, &tc1->rmt_ip, sizeof (tc1->rmt_ip)) == 0);
TCP_TEST ((cmp), "rmt ip is identical %d", cmp);
TCP_TEST ((tconn->lcl_port == tc1->lcl_port),
@@ -1614,7 +1614,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
/*
@@ -1624,12 +1624,12 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tconn = stream_session_lookup_transport_wt4 (&tc1->lcl_ip.ip4,
&tc1->rmt_ip.ip4,
tc1->lcl_port, tc1->rmt_port,
- tc1->proto, 0);
+ tc1->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
/*
@@ -1639,7 +1639,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input)
tconn = stream_session_lookup_transport_wt4 (&tc2->lcl_ip.ip4,
&tc2->rmt_ip.ip4,
tc2->lcl_port, tc2->rmt_port,
- tc2->proto, 0);
+ tc2->transport_proto, 0);
TCP_TEST ((tconn == 0), "lookup result should be null");
return 0;
diff --git a/src/vnet/udp/udp.c b/src/vnet/udp/udp.c
index ff76a82e4a4..fedf2cc0224 100644
--- a/src/vnet/udp/udp.c
+++ b/src/vnet/udp/udp.c
@@ -34,7 +34,7 @@ udp_session_bind_ip4 (u32 session_index, transport_endpoint_t * lcl)
memset (listener, 0, sizeof (udp_connection_t));
listener->c_lcl_port = clib_host_to_net_u16 (lcl->port);
listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32;
- listener->c_proto = SESSION_TYPE_IP4_UDP;
+ listener->c_transport_proto = TRANSPORT_PROTO_UDP;
udp_register_dst_port (um->vlib_main, lcl->port, udp4_uri_input_node.index,
1 /* is_ipv4 */ );
return 0;
@@ -49,7 +49,7 @@ udp_session_bind_ip6 (u32 session_index, transport_endpoint_t * lcl)
pool_get (um->udp_listeners, listener);
listener->c_lcl_port = clib_host_to_net_u16 (lcl->port);
clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6, sizeof (ip6_address_t));
- listener->c_proto = SESSION_TYPE_IP6_UDP;
+ listener->c_transport_proto = TRANSPORT_PROTO_UDP;
udp_register_dst_port (um->vlib_main, lcl->port,
udp4_uri_input_node.index, 0 /* is_ipv4 */ );
return 0;
@@ -318,8 +318,8 @@ udp_init (vlib_main_t * vm)
/* Register as transport with URI */
- session_register_transport (SESSION_TYPE_IP4_UDP, &udp4_proto);
- session_register_transport (SESSION_TYPE_IP6_UDP, &udp6_proto);
+ session_register_transport (TRANSPORT_PROTO_UDP, 1, &udp4_proto);
+ session_register_transport (TRANSPORT_PROTO_UDP, 0, &udp6_proto);
/*
* Initialize data structures
diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c
index 9a8ff076fb1..6ccb1e52942 100644
--- a/src/vnet/udp/udp_input.c
+++ b/src/vnet/udp/udp_input.c
@@ -179,7 +179,7 @@ udp4_uri_input_node_fn (vlib_main_t * vm,
us->c_rmt_ip4.as_u32 = ip0->src_address.as_u32;
us->c_lcl_port = udp0->dst_port;
us->c_rmt_port = udp0->src_port;
- us->c_proto = SESSION_TYPE_IP4_UDP;
+ us->c_transport_proto = TRANSPORT_PROTO_UDP;
us->c_c_index = us - um->udp_sessions[my_thread_index];
/*