aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorFlorin Coras <fcoras@cisco.com>2019-02-16 20:47:32 -0800
committerDamjan Marion <dmarion@me.com>2019-02-18 20:43:33 +0000
commit222e1f4160a5828bb2b5bf62716cd76664f6100b (patch)
treeeeb8b47fa94dc23152f1652e9dbd14c384d910ff /src
parenta333795d1c425877645754a384af47755a80712e (diff)
tcp: harden for high scale scenarios
- Better handle buffer starvation scenarios - Handle case when both peers enter recovery due to packet loss. - Fix passive open establish cleanup Change-Id: I2f28baa2ff0383bb8f5f6d2452b49aa38ce69bce Signed-off-by: Florin Coras <fcoras@cisco.com>
Diffstat (limited to 'src')
-rw-r--r--src/vnet/session-apps/echo_client.c10
-rw-r--r--src/vnet/session/application.c1
-rw-r--r--src/vnet/session/session.c7
-rw-r--r--src/vnet/session/session_types.h1
-rw-r--r--src/vnet/tcp/tcp.c8
-rw-r--r--src/vnet/tcp/tcp.h2
-rw-r--r--src/vnet/tcp/tcp_error.def1
-rw-r--r--src/vnet/tcp/tcp_input.c44
-rw-r--r--src/vnet/tcp/tcp_output.c43
9 files changed, 82 insertions, 35 deletions
diff --git a/src/vnet/session-apps/echo_client.c b/src/vnet/session-apps/echo_client.c
index 8b7788fa266..cb93e81054c 100644
--- a/src/vnet/session-apps/echo_client.c
+++ b/src/vnet/session-apps/echo_client.c
@@ -605,13 +605,11 @@ echo_clients_connect (vlib_main_t * vm, u32 n_clients)
return clib_error_return (0, "connect returned: %d", rv);
/* Crude pacing for call setups */
- if ((i % 4) == 0)
- vlib_process_suspend (vm, 10e-6);
+ if ((i % 16) == 0)
+ vlib_process_suspend (vm, 100e-6);
ASSERT (i + 1 >= ecm->ready_connections);
- while (i + 1 - ecm->ready_connections > 1000)
- {
- vlib_process_suspend (vm, 100e-6);
- }
+ while (i + 1 - ecm->ready_connections > 128)
+ vlib_process_suspend (vm, 1e-3);
}
return 0;
}
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index e6292157dfa..ee69cf88c88 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -73,6 +73,7 @@ application_local_listen_session_alloc (application_t * app)
ll->session_index = ll - app->local_listen_sessions;
ll->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE, 0);
ll->app_index = app->app_index;
+ ll->session_state = SESSION_STATE_LISTENING;
return ll;
}
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index a94a0c40882..58e085ee29b 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -862,6 +862,12 @@ session_transport_delete_notify (transport_connection_t * tc)
switch (s->session_state)
{
+ case SESSION_STATE_CREATED:
+ /* Session was created but accept notification was not yet sent to the
+ * app. Cleanup everything. */
+ session_lookup_del_session (s);
+ session_free_w_fifos (s);
+ break;
case SESSION_STATE_ACCEPTING:
case SESSION_STATE_TRANSPORT_CLOSING:
/* If transport finishes or times out before we get a reply
@@ -963,6 +969,7 @@ session_stream_accept (transport_connection_t * tc, u32 listener_index,
s->app_wrk_index = app_wrk->wrk_index;
s->listener_index = listener_index;
+ s->session_state = SESSION_STATE_CREATED;
/* Shoulder-tap the server */
if (notify)
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 846af25c777..e10dceafa16 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -113,6 +113,7 @@ typedef u64 session_handle_t;
*/
typedef enum
{
+ SESSION_STATE_CREATED,
SESSION_STATE_LISTENING,
SESSION_STATE_CONNECTING,
SESSION_STATE_ACCEPTING,
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 81f209b5d7c..c51224447fc 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Cisco and/or its affiliates.
+ * Copyright (c) 2016-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -1244,11 +1244,13 @@ tcp_timer_establish_handler (u32 conn_index)
if (PREDICT_FALSE (tc == 0))
return;
ASSERT (tc->state == TCP_STATE_SYN_RCVD);
+ tc->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
+ tcp_connection_set_state (tc, TCP_STATE_CLOSED);
/* Start cleanup. App wasn't notified yet so use delete notify as
* opposed to delete to cleanup session layer state. */
+ tcp_connection_timers_reset (tc);
session_transport_delete_notify (&tc->connection);
- tc->timers[TCP_TIMER_ESTABLISH] = TCP_TIMER_HANDLE_INVALID;
- tcp_connection_cleanup (tc);
+ tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
}
static void
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 68750ce373f..d1fbf156ed5 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Cisco and/or its affiliates.
+ * Copyright (c) 2016-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/src/vnet/tcp/tcp_error.def b/src/vnet/tcp/tcp_error.def
index 7bed10f1840..7dbe952e104 100644
--- a/src/vnet/tcp/tcp_error.def
+++ b/src/vnet/tcp/tcp_error.def
@@ -28,6 +28,7 @@ tcp_error (SEGMENT_INVALID, "Invalid segments")
tcp_error (SYNS_RCVD, "SYNs received")
tcp_error (SPURIOUS_SYN, "Spurious SYNs received")
tcp_error (SYN_ACKS_RCVD, "SYN-ACKs received")
+tcp_error (SPURIOUS_SYN_ACK, "Spurious SYN-ACKs received")
tcp_error (MSG_QUEUE_FULL, "Events not sent for lack of msg queue space")
tcp_error (CREATE_SESSION_FAIL, "Sessions couldn't be allocated")
tcp_error (ACK_OK, "Pure ACKs received")
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 6514dca4b4a..3d8ae890372 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -350,20 +350,34 @@ tcp_segment_validate (tcp_worker_ctx_t * wrk, tcp_connection_t * tc0,
goto error;
}
- *error0 = TCP_ERROR_RCV_WND;
/* If our window is 0 and the packet is in sequence, let it pass
* through for ack processing. It should be dropped later. */
- if (!(tc0->rcv_wnd == 0
- && tc0->rcv_nxt == vnet_buffer (b0)->tcp.seq_number))
+ if (tc0->rcv_wnd == 0
+ && tc0->rcv_nxt == vnet_buffer (b0)->tcp.seq_number)
+ goto check_reset;
+
+ /* If we entered recovery and peer did so as well, there's a chance that
+ * dup acks won't be acceptable on either end because seq_end may be less
+ * than rcv_las. This can happen if acks are lost in both directions. */
+ if (tcp_in_recovery (tc0)
+ && seq_geq (vnet_buffer (b0)->tcp.seq_number,
+ tc0->rcv_las - tc0->rcv_wnd)
+ && seq_leq (vnet_buffer (b0)->tcp.seq_end,
+ tc0->rcv_nxt + tc0->rcv_wnd))
+ goto check_reset;
+
+ *error0 = TCP_ERROR_RCV_WND;
+
+ /* If not RST, send dup ack */
+ if (!tcp_rst (th0))
{
- /* If not RST, send dup ack */
- if (!tcp_rst (th0))
- {
- tcp_program_dupack (wrk, tc0);
- TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
- }
- goto error;
+ tcp_program_dupack (wrk, tc0);
+ TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
}
+ goto error;
+
+ check_reset:
+ ;
}
/* 2nd: check the RST bit */
@@ -507,6 +521,7 @@ tcp_estimate_initial_rtt (tcp_connection_t * tc)
if (tc->rtt_ts)
{
tc->mrtt_us = tcp_time_now_us (thread_index) - tc->rtt_ts;
+ tc->mrtt_us = clib_max (tc->mrtt_us, 0.0001);
mrtt = clib_max ((u32) (tc->mrtt_us * THZ), 1);
tc->rtt_ts = 0;
}
@@ -514,6 +529,9 @@ tcp_estimate_initial_rtt (tcp_connection_t * tc)
{
mrtt = tcp_time_now_w_thread (thread_index) - tc->rcv_opts.tsecr;
mrtt = clib_max (mrtt, 1);
+ /* Due to retransmits we don't know the initial mrtt */
+ if (tc->rto_boff && mrtt > 1 * THZ)
+ mrtt = 1 * THZ;
tc->mrtt_us = (f64) mrtt *TCP_TICK;
}
@@ -2360,6 +2378,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Make sure the connection actually exists */
ASSERT (tcp_lookup_connection (tc0->c_fib_index, b0,
my_thread_index, is_ip4));
+ error0 = TCP_ERROR_SPURIOUS_SYN_ACK;
goto drop;
}
@@ -2441,7 +2460,6 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
/* No SYN flag. Drop. */
if (!tcp_syn (tcp0))
{
- clib_warning ("not synack");
error0 = TCP_ERROR_SEGMENT_INVALID;
goto drop;
}
@@ -2449,7 +2467,6 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
/* Parse options */
if (tcp_options_parse (tcp0, &tc0->rcv_opts, 1))
{
- clib_warning ("options parse fail");
error0 = TCP_ERROR_OPTIONS;
goto drop;
}
@@ -3676,6 +3693,9 @@ do { \
_(SYN_SENT, TCP_FLAG_RST, TCP_INPUT_NEXT_SYN_SENT, TCP_ERROR_NONE);
_(SYN_SENT, TCP_FLAG_RST | TCP_FLAG_ACK, TCP_INPUT_NEXT_SYN_SENT,
TCP_ERROR_NONE);
+ _(SYN_SENT, TCP_FLAG_FIN, TCP_INPUT_NEXT_SYN_SENT, TCP_ERROR_NONE);
+ _(SYN_SENT, TCP_FLAG_FIN | TCP_FLAG_ACK, TCP_INPUT_NEXT_SYN_SENT,
+ TCP_ERROR_NONE);
/* ACK for for established connection -> tcp-established. */
_(ESTABLISHED, TCP_FLAG_ACK, TCP_INPUT_NEXT_ESTABLISHED, TCP_ERROR_NONE);
/* FIN for for established connection -> tcp-established. */
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 9f38851b026..725ffec0852 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 Cisco and/or its affiliates.
+ * Copyright (c) 2016-2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -587,11 +587,6 @@ tcp_make_synack (tcp_connection_t * tc, vlib_buffer_t * b)
tcp_options_write ((u8 *) (th + 1), snd_opts);
vnet_buffer (b)->tcp.connection_index = tc->c_c_index;
-
- /* Init retransmit timer. Use update instead of set because of
- * retransmissions */
- tcp_retransmit_timer_force_update (tc);
- TCP_EVT_DBG (TCP_EVT_SYNACK_SENT, tc);
}
always_inline void
@@ -951,7 +946,10 @@ tcp_send_syn (tcp_connection_t * tc)
tc->rto * TCP_TO_TIMER_TICK);
if (PREDICT_FALSE (!vlib_buffer_alloc (vm, &bi, 1)))
- return;
+ {
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT_SYN, 1);
+ return;
+ }
b = vlib_get_buffer (vm, bi);
tcp_init_buffer (vm, b);
@@ -975,14 +973,20 @@ tcp_send_synack (tcp_connection_t * tc)
vlib_buffer_t *b;
u32 bi;
+ tcp_retransmit_timer_force_update (tc);
+
if (PREDICT_FALSE (!vlib_buffer_alloc (vm, &bi, 1)))
- return;
+ {
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, 1);
+ return;
+ }
tc->rtt_ts = tcp_time_now_us (tc->c_thread_index);
b = vlib_get_buffer (vm, bi);
tcp_init_buffer (vm, b);
tcp_make_synack (tc, b);
tcp_enqueue_to_output (wrk, b, bi, tc->c_is_ip4);
+ TCP_EVT_DBG (TCP_EVT_SYNACK_SENT, tc);
}
/**
@@ -1050,6 +1054,9 @@ tcp_send_fin (tcp_connection_t * tc)
tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, 1);
if (fin_snt)
tc->snd_nxt = tc->snd_una_max;
+ else
+ /* Make sure retransmit retries a fin not data */
+ tc->flags |= TCP_CONN_FINSNT;
return;
}
@@ -1195,7 +1202,10 @@ tcp_send_ack (tcp_connection_t * tc)
u32 bi;
if (PREDICT_FALSE (!vlib_buffer_alloc (vm, &bi, 1)))
- return;
+ {
+ tcp_update_rcv_wnd (tc);
+ return;
+ }
b = vlib_get_buffer (vm, bi);
tcp_init_buffer (vm, b);
tcp_make_ack (tc, b);
@@ -1458,6 +1468,8 @@ tcp_rxt_timeout_cc (tcp_connection_t * tc)
scoreboard_clear (&tc->sack_sb);
tcp_cc_fastrecovery_exit (tc);
}
+ else
+ tc->rcv_dupacks = 0;
/* Start again from the beginning */
tc->cc_algo->congestion (tc);
@@ -1504,7 +1516,7 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 2);
/* Lost FIN, retransmit and return */
- if (tcp_is_lost_fin (tc))
+ if (tc->flags & TCP_CONN_FINSNT)
{
tcp_send_fin (tc);
tc->rto_boff += 1;
@@ -1553,7 +1565,7 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
n_bytes = tcp_prepare_retransmit_segment (wrk, tc, 0, tc->snd_mss, &b);
if (!n_bytes)
{
- tcp_retransmit_timer_force_update (tc);
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, 1);
return;
}
@@ -1591,7 +1603,10 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
tc->rto * TCP_TO_TIMER_TICK);
if (PREDICT_FALSE (!vlib_buffer_alloc (vm, &bi, 1)))
- return;
+ {
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT_SYN, 1);
+ return;
+ }
b = vlib_get_buffer (vm, bi);
tcp_init_buffer (vm, b);
@@ -1614,9 +1629,11 @@ tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
tc->rto = clib_min (tc->rto << 1, TCP_RTO_MAX);
tc->rtt_ts = 0;
+ tcp_retransmit_timer_force_update (tc);
+
if (PREDICT_FALSE (!vlib_buffer_alloc (vm, &bi, 1)))
{
- tcp_retransmit_timer_force_update (tc);
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, 1);
return;
}
"dut_version", "passed", "test_name_long", "test_name_short", "test_type", "version" ] # load schemas with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: schema = StructType.fromJson(load(f_schema)) # create empty DF out of schemas sdf = spark.createDataFrame([], schema) # filter list filtered = [path for path in paths if schema_name in path] # select for path in filtered: print(path) sdf_loaded = spark \ .read \ .option("multiline", "true") \ .schema(schema) \ .json(path) \ .withColumn("job", lit(path.split("/")[4])) \ .withColumn("build", lit(path.split("/")[5])) sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) # drop rows with all nulls and drop rows with null in critical frames sdf = sdf.na.drop(how="all") sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) # flatten frame sdf = flatten_frame(sdf) return sdf # create SparkContext and GlueContext spark_context = SparkContext.getOrCreate() spark_context.setLogLevel("WARN") glue_context = GlueContext(spark_context) spark = glue_context.spark_session # files of interest paths = wr.s3.list_objects( path=PATH, suffix=SUFFIX, last_modified_begin=LAST_MODIFIED_BEGIN, last_modified_end=LAST_MODIFIED_END, ignore_suffix=IGNORE_SUFFIX, ignore_empty=True ) filtered_paths = [path for path in paths if "report-coverage-2306" in path] out_sdf = process_json_to_dataframe("device", filtered_paths) out_sdf.printSchema() out_sdf = out_sdf \ .withColumn("year", lit(datetime.now().year)) \ .withColumn("month", lit(datetime.now().month)) \ .withColumn("day", lit(datetime.now().day)) \ .repartition(1) try: wr.s3.to_parquet( df=out_sdf.toPandas(), path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", dataset=True, partition_cols=["test_type", "year", "month", "day"], compression="snappy", use_threads=True, mode="overwrite_partitions", boto3_session=session.Session( aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], region_name=environ["OUT_AWS_DEFAULT_REGION"] ) ) except EmptyDataFrame: pass