aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/hs_apps/echo_client.c576
-rw-r--r--src/plugins/hs_apps/echo_client.h61
2 files changed, 319 insertions, 318 deletions
diff --git a/src/plugins/hs_apps/echo_client.c b/src/plugins/hs_apps/echo_client.c
index d2ae252cd98..1d33a855240 100644
--- a/src/plugins/hs_apps/echo_client.c
+++ b/src/plugins/hs_apps/echo_client.c
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-#include <vnet/vnet.h>
-#include <vlibapi/api.h>
-#include <vlibmemory/api.h>
#include <hs_apps/echo_client.h>
-echo_client_main_t echo_client_main;
+static ec_main_t ec_main;
-#define ECHO_CLIENT_DBG (0)
-#define DBG(_fmt, _args...) \
- if (ECHO_CLIENT_DBG) \
- clib_warning (_fmt, ##_args)
+#define EC_DBG (0)
+#define DBG(_fmt, _args...) \
+ if (EC_DBG) \
+ clib_warning (_fmt, ##_args)
static void
signal_evt_to_cli_i (int *code)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
ASSERT (vlib_get_thread_index () == 0);
vlib_process_signal_event (ecm->vlib_main, ecm->cli_node_index, *code, 0);
}
@@ -45,8 +42,32 @@ signal_evt_to_cli (int code)
signal_evt_to_cli_i (&code);
}
+static inline ec_worker_t *
+ec_worker_get (u32 thread_index)
+{
+ return vec_elt_at_index (ec_main.wrk, thread_index);
+}
+
+static inline ec_session_t *
+ec_session_alloc (ec_worker_t *wrk)
+{
+ ec_session_t *ecs;
+
+ pool_get_zero (wrk->sessions, ecs);
+ ecs->data.session_index = ecs - wrk->sessions;
+ ecs->thread_index = wrk->thread_index;
+
+ return ecs;
+}
+
+static inline ec_session_t *
+ec_session_get (ec_worker_t *wrk, u32 ec_index)
+{
+ return pool_elt_at_index (wrk->sessions, ec_index);
+}
+
static void
-send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
+send_data_chunk (ec_main_t *ecm, ec_session_t *es)
{
u8 *test_data = ecm->connect_test_data;
int test_buf_len, test_buf_offset, rv;
@@ -54,27 +75,27 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
test_buf_len = vec_len (test_data);
ASSERT (test_buf_len > 0);
- test_buf_offset = s->bytes_sent % test_buf_len;
- bytes_this_chunk = clib_min (test_buf_len - test_buf_offset,
- s->bytes_to_send);
+ test_buf_offset = es->bytes_sent % test_buf_len;
+ bytes_this_chunk =
+ clib_min (test_buf_len - test_buf_offset, es->bytes_to_send);
if (!ecm->is_dgram)
{
if (ecm->no_copy)
{
- svm_fifo_t *f = s->data.tx_fifo;
+ svm_fifo_t *f = es->data.tx_fifo;
rv = clib_min (svm_fifo_max_enqueue_prod (f), bytes_this_chunk);
svm_fifo_enqueue_nocopy (f, rv);
session_send_io_evt_to_thread_custom (
- &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX);
+ &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
}
else
- rv = app_send_stream (&s->data, test_data + test_buf_offset,
+ rv = app_send_stream (&es->data, test_data + test_buf_offset,
bytes_this_chunk, 0);
}
else
{
- svm_fifo_t *f = s->data.tx_fifo;
+ svm_fifo_t *f = es->data.tx_fifo;
u32 max_enqueue = svm_fifo_max_enqueue_prod (f);
if (max_enqueue < sizeof (session_dgram_hdr_t))
@@ -85,7 +106,7 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
if (ecm->no_copy)
{
session_dgram_hdr_t hdr;
- app_session_transport_t *at = &s->data.transport;
+ app_session_transport_t *at = &es->data.transport;
rv = clib_min (max_enqueue, bytes_this_chunk);
@@ -101,12 +122,12 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
svm_fifo_enqueue (f, sizeof (hdr), (u8 *) & hdr);
svm_fifo_enqueue_nocopy (f, rv);
session_send_io_evt_to_thread_custom (
- &f->shr->master_session_index, s->thread_index, SESSION_IO_EVT_TX);
+ &es->vpp_session_index, es->thread_index, SESSION_IO_EVT_TX);
}
else
{
bytes_this_chunk = clib_min (bytes_this_chunk, max_enqueue);
- rv = app_send_dgram (&s->data, test_data + test_buf_offset,
+ rv = app_send_dgram (&es->data, test_data + test_buf_offset,
bytes_this_chunk, 0);
}
}
@@ -115,45 +136,43 @@ send_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
if (rv > 0)
{
/* Account for it... */
- s->bytes_to_send -= rv;
- s->bytes_sent += rv;
+ es->bytes_to_send -= rv;
+ es->bytes_sent += rv;
- if (ECHO_CLIENT_DBG)
+ if (EC_DBG)
{
- /* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "tx-enq: xfer %d bytes, sent %u remain %u",
.format_args = "i4i4i4",
};
- /* *INDENT-ON* */
struct
{
u32 data[3];
} *ed;
ed = ELOG_DATA (&vlib_global_main.elog_main, e);
ed->data[0] = rv;
- ed->data[1] = s->bytes_sent;
- ed->data[2] = s->bytes_to_send;
+ ed->data[1] = es->bytes_sent;
+ ed->data[2] = es->bytes_to_send;
}
}
}
static void
-receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
+receive_data_chunk (ec_worker_t *wrk, ec_session_t *es)
{
- svm_fifo_t *rx_fifo = s->data.rx_fifo;
- u32 thread_index = vlib_get_thread_index ();
+ ec_main_t *ecm = &ec_main;
+ svm_fifo_t *rx_fifo = es->data.rx_fifo;
int n_read, i;
if (ecm->test_bytes)
{
if (!ecm->is_dgram)
- n_read = app_recv_stream (&s->data, ecm->rx_buf[thread_index],
- vec_len (ecm->rx_buf[thread_index]));
+ n_read =
+ app_recv_stream (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
else
- n_read = app_recv_dgram (&s->data, ecm->rx_buf[thread_index],
- vec_len (ecm->rx_buf[thread_index]));
+ n_read =
+ app_recv_dgram (&es->data, wrk->rx_buf, vec_len (wrk->rx_buf));
}
else
{
@@ -163,15 +182,13 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
if (n_read > 0)
{
- if (ECHO_CLIENT_DBG)
+ if (EC_DBG)
{
- /* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "rx-deq: %d bytes",
.format_args = "i4",
};
- /* *INDENT-ON* */
struct
{
u32 data[1];
@@ -184,65 +201,64 @@ receive_data_chunk (echo_client_main_t * ecm, eclient_session_t * s)
{
for (i = 0; i < n_read; i++)
{
- if (ecm->rx_buf[thread_index][i]
- != ((s->bytes_received + i) & 0xff))
+ if (wrk->rx_buf[i] != ((es->bytes_received + i) & 0xff))
{
clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
- n_read, s->bytes_received + i,
- ecm->rx_buf[thread_index][i],
- ((s->bytes_received + i) & 0xff));
+ n_read, es->bytes_received + i, wrk->rx_buf[i],
+ ((es->bytes_received + i) & 0xff));
ecm->test_failed = 1;
}
}
}
- ASSERT (n_read <= s->bytes_to_receive);
- s->bytes_to_receive -= n_read;
- s->bytes_received += n_read;
+ ASSERT (n_read <= es->bytes_to_receive);
+ es->bytes_to_receive -= n_read;
+ es->bytes_received += n_read;
}
}
static uword
-echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
- vlib_frame_t * frame)
+ec_node_fn (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
{
- echo_client_main_t *ecm = &echo_client_main;
- int my_thread_index = vlib_get_thread_index ();
- eclient_session_t *sp;
- int i;
- int delete_session;
- u32 *connection_indices;
- u32 *connections_this_batch;
- u32 nconnections_this_batch;
-
- connection_indices = ecm->connection_index_by_thread[my_thread_index];
- connections_this_batch =
- ecm->connections_this_batch_by_thread[my_thread_index];
-
- if ((ecm->run_test != ECHO_CLIENTS_RUNNING) ||
- ((vec_len (connection_indices) == 0)
- && vec_len (connections_this_batch) == 0))
+ u32 *conn_indices, *conns_this_batch, nconns_this_batch;
+ int thread_index = vm->thread_index, i, delete_session;
+ ec_main_t *ecm = &ec_main;
+ ec_worker_t *wrk;
+ ec_session_t *es;
+ session_t *s;
+
+ if (ecm->run_test != EC_RUNNING)
+ return 0;
+
+ wrk = ec_worker_get (thread_index);
+ conn_indices = wrk->conn_indices;
+ conns_this_batch = wrk->conns_this_batch;
+
+ if (((vec_len (conn_indices) == 0) && vec_len (conns_this_batch) == 0))
return 0;
/* Grab another pile of connections */
- if (PREDICT_FALSE (vec_len (connections_this_batch) == 0))
+ if (PREDICT_FALSE (vec_len (conns_this_batch) == 0))
{
- nconnections_this_batch =
- clib_min (ecm->connections_per_batch, vec_len (connection_indices));
-
- ASSERT (nconnections_this_batch > 0);
- vec_validate (connections_this_batch, nconnections_this_batch - 1);
- clib_memcpy_fast (connections_this_batch,
- connection_indices + vec_len (connection_indices)
- - nconnections_this_batch,
- nconnections_this_batch * sizeof (u32));
- _vec_len (connection_indices) -= nconnections_this_batch;
+ nconns_this_batch =
+ clib_min (ecm->connections_per_batch, vec_len (conn_indices));
+
+ ASSERT (nconns_this_batch > 0);
+ vec_validate (conns_this_batch, nconns_this_batch - 1);
+ clib_memcpy_fast (conns_this_batch,
+ conn_indices + vec_len (conn_indices) -
+ nconns_this_batch,
+ nconns_this_batch * sizeof (u32));
+ _vec_len (conn_indices) -= nconns_this_batch;
}
- if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch
- && ecm->prev_conns == vec_len (connections_this_batch)))
+ /*
+ * Track progress
+ */
+ if (PREDICT_FALSE (ecm->prev_conns != ecm->connections_per_batch &&
+ ecm->prev_conns == vec_len (conns_this_batch)))
{
ecm->repeats++;
- ecm->prev_conns = vec_len (connections_this_batch);
+ ecm->prev_conns = vec_len (conns_this_batch);
if (ecm->repeats == 500000)
{
clib_warning ("stuck clients");
@@ -250,32 +266,35 @@ echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
}
else
{
- ecm->prev_conns = vec_len (connections_this_batch);
+ ecm->prev_conns = vec_len (conns_this_batch);
ecm->repeats = 0;
}
- for (i = 0; i < vec_len (connections_this_batch); i++)
+ /*
+ * Handle connections in this batch
+ */
+ for (i = 0; i < vec_len (conns_this_batch); i++)
{
- delete_session = 1;
+ es = ec_session_get (wrk, conns_this_batch[i]);
- sp = pool_elt_at_index (ecm->sessions, connections_this_batch[i]);
+ delete_session = 1;
- if (sp->bytes_to_send > 0)
+ if (es->bytes_to_send > 0)
{
- send_data_chunk (ecm, sp);
+ send_data_chunk (ecm, es);
delete_session = 0;
}
- if (sp->bytes_to_receive > 0)
+
+ if (es->bytes_to_receive > 0)
{
delete_session = 0;
}
+
if (PREDICT_FALSE (delete_session == 1))
{
- session_t *s;
-
- clib_atomic_fetch_add (&ecm->tx_total, sp->bytes_sent);
- clib_atomic_fetch_add (&ecm->rx_total, sp->bytes_received);
- s = session_get_from_handle_if_valid (sp->vpp_session_handle);
+ clib_atomic_fetch_add (&ecm->tx_total, es->bytes_sent);
+ clib_atomic_fetch_add (&ecm->rx_total, es->bytes_received);
+ s = session_get_from_handle_if_valid (es->vpp_session_handle);
if (s)
{
@@ -284,40 +303,38 @@ echo_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
a->app_index = ecm->app_index;
vnet_disconnect_session (a);
- vec_delete (connections_this_batch, 1, i);
+ vec_delete (conns_this_batch, 1, i);
i--;
clib_atomic_fetch_add (&ecm->ready_connections, -1);
}
else
{
clib_warning ("session AWOL?");
- vec_delete (connections_this_batch, 1, i);
+ vec_delete (conns_this_batch, 1, i);
}
/* Kick the debug CLI process */
if (ecm->ready_connections == 0)
{
- signal_evt_to_cli (2);
+ signal_evt_to_cli (EC_CLI_TEST_DONE);
}
}
}
- ecm->connection_index_by_thread[my_thread_index] = connection_indices;
- ecm->connections_this_batch_by_thread[my_thread_index] =
- connections_this_batch;
+ wrk->conn_indices = conn_indices;
+ wrk->conns_this_batch = conns_this_batch;
return 0;
}
-VLIB_REGISTER_NODE (echo_clients_node) =
-{
- .function = echo_client_node_fn,
+VLIB_REGISTER_NODE (echo_clients_node) = {
+ .function = ec_node_fn,
.name = "echo-clients",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_DISABLED,
};
static void
-ec_reset_runtime_config (echo_client_main_t *ecm)
+ec_reset_runtime_config (ec_main_t *ecm)
{
ecm->n_clients = 1;
ecm->quic_streams = 1;
@@ -332,7 +349,7 @@ ec_reset_runtime_config (echo_client_main_t *ecm)
ecm->test_failed = 0;
ecm->tls_engine = CRYPTO_ENGINE_OPENSSL;
ecm->no_copy = 0;
- ecm->run_test = ECHO_CLIENTS_STARTING;
+ ecm->run_test = EC_STARTING;
ecm->ready_connections = 0;
ecm->connect_conn_index = 0;
ecm->rx_total = 0;
@@ -349,10 +366,11 @@ ec_reset_runtime_config (echo_client_main_t *ecm)
}
static int
-echo_clients_init (vlib_main_t * vm)
+ec_init (vlib_main_t *vm)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
+ ec_worker_t *wrk;
u32 num_threads;
int i;
@@ -380,32 +398,30 @@ echo_clients_init (vlib_main_t * vm)
*/
vlib_node_set_state (vm, session_queue_node.index,
VLIB_NODE_STATE_POLLING);
-
- clib_spinlock_init (&ecm->sessions_lock);
}
/* App init done only once */
if (ecm->app_is_init)
return 0;
- num_threads = 1 /* main thread */ + vtm->n_threads;
/* Init test data. Big buffer */
vec_validate (ecm->connect_test_data, 4 * 1024 * 1024 - 1);
for (i = 0; i < vec_len (ecm->connect_test_data); i++)
ecm->connect_test_data[i] = i & 0xff;
- vec_validate (ecm->rx_buf, num_threads - 1);
- for (i = 0; i < num_threads; i++)
- vec_validate (ecm->rx_buf[i], vec_len (ecm->connect_test_data) - 1);
+ num_threads = 1 /* main thread */ + vtm->n_threads;
+ vec_validate (ecm->wrk, num_threads);
+ vec_foreach (wrk, ecm->wrk)
+ {
+ vec_validate (wrk->rx_buf, vec_len (ecm->connect_test_data) - 1);
+ wrk->thread_index = wrk - ecm->wrk;
+ wrk->vpp_event_queue =
+ session_main_get_vpp_event_queue (wrk->thread_index);
+ }
ecm->app_is_init = 1;
- vec_validate (ecm->connection_index_by_thread, vtm->n_vlib_mains);
- vec_validate (ecm->connections_this_batch_by_thread, vtm->n_vlib_mains);
- vec_validate (ecm->quic_session_index_by_thread, vtm->n_vlib_mains);
- vec_validate (ecm->vpp_event_queue, vtm->n_vlib_mains);
-
vlib_worker_thread_barrier_sync (vm);
vnet_session_enable_disable (vm, 1 /* turn on session and transports */);
vlib_worker_thread_barrier_release (vm);
@@ -419,38 +435,51 @@ echo_clients_init (vlib_main_t * vm)
}
static void
-echo_clients_cleanup (echo_client_main_t *ecm)
+ec_prealloc_sessions (ec_main_t *ecm)
{
- int i;
+ u32 sessions_per_wrk, n_wrks;
+ ec_worker_t *wrk;
- for (i = 0; i < vec_len (ecm->connection_index_by_thread); i++)
- {
- vec_reset_length (ecm->connection_index_by_thread[i]);
- vec_reset_length (ecm->connections_this_batch_by_thread[i]);
- vec_reset_length (ecm->quic_session_index_by_thread[i]);
- }
+ n_wrks = vlib_num_workers () ? vlib_num_workers () : 1;
+
+ sessions_per_wrk = ecm->n_clients / n_wrks;
+ vec_foreach (wrk, ecm->wrk)
+ pool_init_fixed (wrk->sessions, 1.1 * sessions_per_wrk);
+}
+
+static void
+ec_worker_cleanup (ec_worker_t *wrk)
+{
+ pool_free (wrk->sessions);
+ vec_free (wrk->conn_indices);
+ vec_free (wrk->conns_this_batch);
+}
+
+static void
+ec_cleanup (ec_main_t *ecm)
+{
+ ec_worker_t *wrk;
+
+ vec_foreach (wrk, ecm->wrk)
+ ec_worker_cleanup (wrk);
- pool_free (ecm->sessions);
vec_free (ecm->connect_uri);
vec_free (ecm->appns_id);
- clib_spinlock_free (&ecm->sessions_lock);
if (ecm->barrier_acq_needed)
vlib_worker_thread_barrier_sync (ecm->vlib_main);
}
static int
-quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
- session_t * s,
- session_error_t err)
+quic_ec_qsession_connected_callback (u32 app_index, u32 api_context,
+ session_t *s, session_error_t err)
{
- echo_client_main_t *ecm = &echo_client_main;
- vnet_connect_args_t *a = 0;
- int rv;
- u8 thread_index = vlib_get_thread_index ();
session_endpoint_cfg_t sep = SESSION_ENDPOINT_CFG_NULL;
- u32 stream_n;
+ ec_main_t *ecm = &ec_main;
+ vnet_connect_args_t *a = 0;
session_handle_t handle;
+ u32 stream_n;
+ int rv;
DBG ("QUIC Connection handle %d", session_handle (s));
@@ -479,102 +508,94 @@ quic_echo_clients_qsession_connected_callback (u32 app_index, u32 api_context,
* 's' is no longer valid, its underlying pool could have been moved in
* vnet_connect()
*/
- vec_add1 (ecm->quic_session_index_by_thread[thread_index], handle);
vec_free (a);
return 0;
}
static int
-quic_echo_clients_session_connected_callback (u32 app_index, u32 api_context,
- session_t * s,
- session_error_t err)
+quic_ec_session_connected_callback (u32 app_index, u32 api_context,
+ session_t *s, session_error_t err)
{
- echo_client_main_t *ecm = &echo_client_main;
- eclient_session_t *session;
- u32 session_index;
- u8 thread_index;
+ ec_main_t *ecm = &ec_main;
+ ec_session_t *es;
+ ec_worker_t *wrk;
+ u32 thread_index;
- if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
+ if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
return -1;
if (err)
{
clib_warning ("connection %d failed!", api_context);
- ecm->run_test = ECHO_CLIENTS_EXITING;
- signal_evt_to_cli (-1);
+ ecm->run_test = EC_EXITING;
+ signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
return 0;
}
if (s->listener_handle == SESSION_INVALID_HANDLE)
- return quic_echo_clients_qsession_connected_callback (app_index,
- api_context, s,
- err);
+ return quic_ec_qsession_connected_callback (app_index, api_context, s,
+ err);
DBG ("STREAM Connection callback %d", api_context);
thread_index = s->thread_index;
ASSERT (thread_index == vlib_get_thread_index ()
|| session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
- if (!ecm->vpp_event_queue[thread_index])
- ecm->vpp_event_queue[thread_index] =
- session_main_get_vpp_event_queue (thread_index);
+ wrk = ec_worker_get (thread_index);
/*
* Setup session
*/
- clib_spinlock_lock_if_init (&ecm->sessions_lock);
- pool_get (ecm->sessions, session);
- clib_spinlock_unlock_if_init (&ecm->sessions_lock);
-
- clib_memset (session, 0, sizeof (*session));
- session_index = session - ecm->sessions;
- session->bytes_to_send = ecm->bytes_to_send;
- session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
- session->data.rx_fifo = s->rx_fifo;
- session->data.rx_fifo->shr->client_session_index = session_index;
- session->data.tx_fifo = s->tx_fifo;
- session->data.tx_fifo->shr->client_session_index = session_index;
- session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
- session->vpp_session_handle = session_handle (s);
+ es = ec_session_alloc (wrk);
+
+ es->bytes_to_send = ecm->bytes_to_send;
+ es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
+ es->data.rx_fifo = s->rx_fifo;
+ es->data.rx_fifo->shr->client_session_index = es->data.session_index;
+ es->data.tx_fifo = s->tx_fifo;
+ es->data.tx_fifo->shr->client_session_index = es->data.session_index;
+ es->data.vpp_evt_q = wrk->vpp_event_queue;
+ es->vpp_session_handle = session_handle (s);
+ es->vpp_session_index = s->session_index;
+ s->opaque = es->data.session_index;
if (ecm->is_dgram)
{
transport_connection_t *tc;
tc = session_get_transport (s);
- clib_memcpy_fast (&session->data.transport, tc,
- sizeof (session->data.transport));
- session->data.is_dgram = 1;
+ clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
+ es->data.is_dgram = 1;
}
- vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
+ vec_add1 (wrk->conn_indices, es->data.session_index);
clib_atomic_fetch_add (&ecm->ready_connections, 1);
if (ecm->ready_connections == ecm->expected_connections)
{
- ecm->run_test = ECHO_CLIENTS_RUNNING;
+ ecm->run_test = EC_RUNNING;
/* Signal the CLI process that the action is starting... */
- signal_evt_to_cli (1);
+ signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
}
return 0;
}
static int
-echo_clients_session_connected_callback (u32 app_index, u32 api_context,
- session_t * s, session_error_t err)
+ec_session_connected_callback (u32 app_index, u32 api_context, session_t *s,
+ session_error_t err)
{
- echo_client_main_t *ecm = &echo_client_main;
- eclient_session_t *session;
- u32 session_index;
- u8 thread_index;
+ ec_main_t *ecm = &ec_main;
+ ec_session_t *es;
+ u32 thread_index;
+ ec_worker_t *wrk;
- if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_STARTING))
+ if (PREDICT_FALSE (ecm->run_test != EC_STARTING))
return -1;
if (err)
{
clib_warning ("connection %d failed!", api_context);
- ecm->run_test = ECHO_CLIENTS_EXITING;
- signal_evt_to_cli (-1);
+ ecm->run_test = EC_EXITING;
+ signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
return 0;
}
@@ -582,53 +603,48 @@ echo_clients_session_connected_callback (u32 app_index, u32 api_context,
ASSERT (thread_index == vlib_get_thread_index ()
|| session_transport_service_type (s) == TRANSPORT_SERVICE_CL);
- if (!ecm->vpp_event_queue[thread_index])
- ecm->vpp_event_queue[thread_index] =
- session_main_get_vpp_event_queue (thread_index);
+ wrk = ec_worker_get (thread_index);
/*
* Setup session
*/
- clib_spinlock_lock_if_init (&ecm->sessions_lock);
- pool_get (ecm->sessions, session);
- clib_spinlock_unlock_if_init (&ecm->sessions_lock);
-
- clib_memset (session, 0, sizeof (*session));
- session_index = session - ecm->sessions;
- session->bytes_to_send = ecm->bytes_to_send;
- session->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
- session->data.rx_fifo = s->rx_fifo;
- session->data.rx_fifo->shr->client_session_index = session_index;
- session->data.tx_fifo = s->tx_fifo;
- session->data.tx_fifo->shr->client_session_index = session_index;
- session->data.vpp_evt_q = ecm->vpp_event_queue[thread_index];
- session->vpp_session_handle = session_handle (s);
+ es = ec_session_alloc (wrk);
+
+ es->bytes_to_send = ecm->bytes_to_send;
+ es->bytes_to_receive = ecm->no_return ? 0ULL : ecm->bytes_to_send;
+ es->data.rx_fifo = s->rx_fifo;
+ es->data.rx_fifo->shr->client_session_index = es->data.session_index;
+ es->data.tx_fifo = s->tx_fifo;
+ es->data.tx_fifo->shr->client_session_index = es->data.session_index;
+ es->data.vpp_evt_q = wrk->vpp_event_queue;
+ es->vpp_session_handle = session_handle (s);
+ es->vpp_session_index = s->session_index;
+ s->opaque = es->data.session_index;
if (ecm->is_dgram)
{
transport_connection_t *tc;
tc = session_get_transport (s);
- clib_memcpy_fast (&session->data.transport, tc,
- sizeof (session->data.transport));
- session->data.is_dgram = 1;
+ clib_memcpy_fast (&es->data.transport, tc, sizeof (es->data.transport));
+ es->data.is_dgram = 1;
}
- vec_add1 (ecm->connection_index_by_thread[thread_index], session_index);
+ vec_add1 (wrk->conn_indices, es->data.session_index);
clib_atomic_fetch_add (&ecm->ready_connections, 1);
if (ecm->ready_connections == ecm->expected_connections)
{
- ecm->run_test = ECHO_CLIENTS_RUNNING;
+ ecm->run_test = EC_RUNNING;
/* Signal the CLI process that the action is starting... */
- signal_evt_to_cli (1);
+ signal_evt_to_cli (EC_CLI_CONNECTS_DONE);
}
return 0;
}
static void
-echo_clients_session_reset_callback (session_t * s)
+ec_session_reset_callback (session_t *s)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
if (s->session_state == SESSION_STATE_READY)
@@ -641,15 +657,15 @@ echo_clients_session_reset_callback (session_t * s)
}
static int
-echo_clients_session_create_callback (session_t * s)
+ec_session_accept_callback (session_t *s)
{
return 0;
}
static void
-echo_clients_session_disconnect_callback (session_t * s)
+ec_session_disconnect_callback (session_t *s)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
a->handle = session_handle (s);
a->app_index = ecm->app_index;
@@ -658,9 +674,9 @@ echo_clients_session_disconnect_callback (session_t * s)
}
void
-echo_clients_session_disconnect (session_t * s)
+ec_session_disconnect (session_t *s)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_disconnect_args_t _a = { 0 }, *a = &_a;
a->handle = session_handle (s);
a->app_index = ecm->app_index;
@@ -668,20 +684,22 @@ echo_clients_session_disconnect (session_t * s)
}
static int
-echo_clients_rx_callback (session_t * s)
+ec_session_rx_callback (session_t *s)
{
- echo_client_main_t *ecm = &echo_client_main;
- eclient_session_t *sp;
+ ec_main_t *ecm = &ec_main;
+ ec_worker_t *wrk;
+ ec_session_t *es;
- if (PREDICT_FALSE (ecm->run_test != ECHO_CLIENTS_RUNNING))
+ if (PREDICT_FALSE (ecm->run_test != EC_RUNNING))
{
- echo_clients_session_disconnect (s);
+ ec_session_disconnect (s);
return -1;
}
- sp =
- pool_elt_at_index (ecm->sessions, s->rx_fifo->shr->client_session_index);
- receive_data_chunk (ecm, sp);
+ wrk = ec_worker_get (s->thread_index);
+ es = ec_session_get (wrk, s->opaque);
+
+ receive_data_chunk (wrk, es);
if (svm_fifo_max_dequeue_cons (s->rx_fifo))
{
@@ -691,27 +709,34 @@ echo_clients_rx_callback (session_t * s)
return 0;
}
-int
-echo_client_add_segment_callback (u32 client_index, u64 segment_handle)
+static int
+ec_add_segment_callback (u32 app_index, u64 segment_handle)
+{
+ /* New segments may be added */
+ return 0;
+}
+
+static int
+ec_del_segment_callback (u32 app_index, u64 segment_handle)
{
- /* New heaps may be added */
return 0;
}
-static session_cb_vft_t echo_clients = {
- .session_reset_callback = echo_clients_session_reset_callback,
- .session_connected_callback = echo_clients_session_connected_callback,
- .session_accept_callback = echo_clients_session_create_callback,
- .session_disconnect_callback = echo_clients_session_disconnect_callback,
- .builtin_app_rx_callback = echo_clients_rx_callback,
- .add_segment_callback = echo_client_add_segment_callback
+static session_cb_vft_t ec_cb_vft = {
+ .session_reset_callback = ec_session_reset_callback,
+ .session_connected_callback = ec_session_connected_callback,
+ .session_accept_callback = ec_session_accept_callback,
+ .session_disconnect_callback = ec_session_disconnect_callback,
+ .builtin_app_rx_callback = ec_session_rx_callback,
+ .add_segment_callback = ec_add_segment_callback,
+ .del_segment_callback = ec_del_segment_callback,
};
static clib_error_t *
-echo_clients_attach ()
+ec_attach ()
{
vnet_app_add_cert_key_pair_args_t _ck_pair, *ck_pair = &_ck_pair;
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_app_attach_args_t _a, *a = &_a;
u32 prealloc_fifos;
u64 options[18];
@@ -723,9 +748,8 @@ echo_clients_attach ()
a->api_client_index = ~0;
a->name = format (0, "echo_client");
if (ecm->transport_proto == TRANSPORT_PROTO_QUIC)
- echo_clients.session_connected_callback =
- quic_echo_clients_session_connected_callback;
- a->session_cb_vft = &echo_clients;
+ ec_cb_vft.session_connected_callback = quic_ec_session_connected_callback;
+ a->session_cb_vft = &ec_cb_vft;
prealloc_fifos = ecm->prealloc_fifos ? ecm->expected_connections : 1;
@@ -767,9 +791,9 @@ echo_clients_attach ()
}
static int
-echo_clients_detach ()
+ec_detach ()
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_app_detach_args_t _da, *da = &_da;
int rv;
@@ -786,30 +810,6 @@ echo_clients_detach ()
return rv;
}
-static void *
-echo_client_thread_fn (void *arg)
-{
- return 0;
-}
-
-/** Start a transmit thread */
-int
-echo_clients_start_tx_pthread (echo_client_main_t * ecm)
-{
- if (ecm->client_thread_handle == 0)
- {
- int rv = pthread_create (&ecm->client_thread_handle,
- NULL /*attr */ ,
- echo_client_thread_fn, 0);
- if (rv)
- {
- ecm->client_thread_handle = 0;
- return -1;
- }
- }
- return 0;
-}
-
static int
ec_transport_needs_crypto (transport_proto_t proto)
{
@@ -817,10 +817,10 @@ ec_transport_needs_crypto (transport_proto_t proto)
proto == TRANSPORT_PROTO_QUIC;
}
-int
-echo_clients_connect_rpc (void *args)
+static int
+ec_connect_rpc (void *args)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
vnet_connect_args_t _a = {}, *a = &_a;
vlib_main_t *vm = vlib_get_main ();
int rv, needs_crypto;
@@ -860,7 +860,8 @@ echo_clients_connect_rpc (void *args)
if (rv)
{
clib_warning ("connect returned: %U", format_session_error, rv);
- signal_evt_to_cli (2);
+ ecm->run_test = EC_EXITING;
+ signal_evt_to_cli (EC_CLI_CONNECTS_FAILED);
break;
}
@@ -870,15 +871,15 @@ echo_clients_connect_rpc (void *args)
vlib_worker_thread_barrier_release (vm);
if (ci < ecm->expected_connections)
- echo_clients_program_connects ();
+ ec_program_connects ();
return 0;
}
void
-echo_clients_program_connects (void)
+ec_program_connects (void)
{
- session_send_rpc_evt_to_thread_force (0, echo_clients_connect_rpc, 0);
+ session_send_rpc_evt_to_thread_force (0, ec_connect_rpc, 0);
}
#define ec_cli(_fmt, _args...) \
@@ -886,12 +887,12 @@ echo_clients_program_connects (void)
vlib_cli_output (vm, _fmt, ##_args)
static clib_error_t *
-echo_clients_command_fn (vlib_main_t *vm, unformat_input_t *input,
- vlib_cli_command_t *cmd)
+ec_command_fn (vlib_main_t *vm, unformat_input_t *input,
+ vlib_cli_command_t *cmd)
{
unformat_input_t _line_input, *line_input = &_line_input;
char *default_uri = "tcp://6.0.1.1/1234", *transfer_type;
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
uword *event_data = 0, event_type;
clib_error_t *error = 0;
int rv, had_config = 1;
@@ -901,7 +902,7 @@ echo_clients_command_fn (vlib_main_t *vm, unformat_input_t *input,
if (ecm->test_client_attached)
return clib_error_return (0, "failed: already running!");
- if (echo_clients_init (vm))
+ if (ec_init (vm))
{
error = clib_error_return (0, "failed init");
goto cleanup;
@@ -993,9 +994,9 @@ parse_config:
ecm->is_dgram = (ecm->transport_proto == TRANSPORT_PROTO_UDP);
if (ecm->prealloc_sessions)
- pool_init_fixed (ecm->sessions, 1.1 * ecm->n_clients);
+ ec_prealloc_sessions (ecm);
- if ((error = echo_clients_attach ()))
+ if ((error = ec_attach ()))
{
clib_error_report (error);
goto cleanup;
@@ -1006,7 +1007,7 @@ parse_config:
*/
ecm->syn_start_time = vlib_time_now (vm);
- echo_clients_program_connects ();
+ ec_program_connects ();
/*
* Park until the sessions come up, or syn_timeout seconds pass
@@ -1023,16 +1024,17 @@ parse_config:
ecm->ready_connections);
goto cleanup;
- case 1:
+ case EC_CLI_CONNECTS_DONE:
delta = vlib_time_now (vm) - ecm->syn_start_time;
if (delta != 0.0)
ec_cli ("%d three-way handshakes in %.2f seconds %.2f/s",
ecm->n_clients, delta, ((f64) ecm->n_clients) / delta);
break;
- case 2:
+ case EC_CLI_CONNECTS_FAILED:
error = clib_error_return (0, "failed: connect returned");
goto cleanup;
+
default:
ec_cli ("unexpected event(1): %d", event_type);
error = clib_error_return (0, "failed: unexpected event(1): %d",
@@ -1056,7 +1058,7 @@ parse_config:
ecm->ready_connections);
goto cleanup;
- case 2:
+ case EC_CLI_TEST_DONE:
ecm->test_end_time = vlib_time_now (vm);
ec_cli ("Test finished at %.6f", ecm->test_end_time);
break;
@@ -1096,17 +1098,17 @@ cleanup:
/*
* Cleanup
*/
- ecm->run_test = ECHO_CLIENTS_EXITING;
+ ecm->run_test = EC_EXITING;
vlib_process_wait_for_event_or_clock (vm, 10e-3);
/* Detach the application, so we can use different fifo sizes next time */
- if (echo_clients_detach ())
+ if (ec_detach ())
{
error = clib_error_return (0, "failed: app detach");
ec_cli ("WARNING: app detach failed...");
}
- echo_clients_cleanup (ecm);
+ ec_cleanup (ecm);
if (had_config)
unformat_free (line_input);
@@ -1116,27 +1118,27 @@ cleanup:
return error;
}
-VLIB_CLI_COMMAND (echo_clients_command, static) =
-{
+VLIB_CLI_COMMAND (ec_command, static) = {
.path = "test echo clients",
- .short_help = "test echo clients [nclients %d][[m|g]bytes <bytes>]"
- "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
- "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
- "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
- "[uri <tcp://ip/port>][test-bytes][no-output]",
- .function = echo_clients_command_fn,
+ .short_help =
+ "test echo clients [nclients %d][[m|g]bytes <bytes>]"
+ "[test-timeout <time>][syn-timeout <time>][no-return][fifo-size <size>]"
+ "[private-segment-count <count>][private-segment-size <bytes>[m|g]]"
+ "[preallocate-fifos][preallocate-sessions][client-batch <batch-size>]"
+ "[uri <tcp://ip/port>][test-bytes][no-output]",
+ .function = ec_command_fn,
.is_mp_safe = 1,
};
clib_error_t *
-echo_clients_main_init (vlib_main_t * vm)
+ec_main_init (vlib_main_t *vm)
{
- echo_client_main_t *ecm = &echo_client_main;
+ ec_main_t *ecm = &ec_main;
ecm->app_is_init = 0;
return 0;
}
-VLIB_INIT_FUNCTION (echo_clients_main_init);
+VLIB_INIT_FUNCTION (ec_main_init);
/*
* fd.io coding-style-patch-verification: ON
diff --git a/src/plugins/hs_apps/echo_client.h b/src/plugins/hs_apps/echo_client.h
index a0e844d7e6d..b5f6c981f7b 100644
--- a/src/plugins/hs_apps/echo_client.h
+++ b/src/plugins/hs_apps/echo_client.h
@@ -18,42 +18,39 @@
#ifndef __included_echo_client_h__
#define __included_echo_client_h__
-#include <vnet/vnet.h>
-#include <vnet/ip/ip.h>
-#include <vnet/ethernet/ethernet.h>
-
-#include <vppinfra/hash.h>
-#include <vppinfra/error.h>
#include <vnet/session/session.h>
#include <vnet/session/application_interface.h>
-typedef struct
+typedef struct ec_session_
{
CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
app_session_t data;
+ u32 vpp_session_index;
+ u32 thread_index;
u64 bytes_to_send;
u64 bytes_sent;
u64 bytes_to_receive;
u64 bytes_received;
u64 vpp_session_handle;
- u8 thread_index;
-} eclient_session_t;
+} ec_session_t;
+
+typedef struct ec_worker_
+{
+ CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
+ ec_session_t *sessions; /**< session pool */
+ u8 *rx_buf; /**< prealloced rx buffer */
+ u32 *conn_indices; /**< sessions handled by worker */
+ u32 *conns_this_batch; /**< sessions handled in batch */
+ svm_msg_q_t *vpp_event_queue; /**< session layer worker mq */
+ u32 thread_index; /**< thread index for worker */
+} ec_worker_t;
typedef struct
{
- /*
- * Test state variables
- */
- eclient_session_t *sessions; /**< Session pool, shared */
- clib_spinlock_t sessions_lock; /**< Session pool lock */
- u8 **rx_buf; /**< intermediate rx buffers */
+ ec_worker_t *wrk; /**< Per-thread state */
u8 *connect_test_data; /**< Pre-computed test data */
- u32 **quic_session_index_by_thread;
- u32 **connection_index_by_thread;
- u32 **connections_this_batch_by_thread; /**< active connection batch */
volatile u32 ready_connections;
- volatile u32 finished_connections;
volatile u64 rx_total;
volatile u64 tx_total;
volatile int run_test; /**< Signal start of test */
@@ -64,16 +61,14 @@ typedef struct
u32 prev_conns;
u32 repeats;
- u32 connect_conn_index; /**< Conencts attempted progress */
+ u32 connect_conn_index; /**< Connects attempted progress */
/*
* Application setup parameters
*/
- svm_msg_q_t **vpp_event_queue;
u32 cli_node_index; /**< cli process node index */
u32 app_index; /**< app index after attach */
- pthread_t client_thread_handle;
/*
* Configuration params
@@ -116,19 +111,23 @@ typedef struct
u8 barrier_acq_needed;
vlib_main_t *vlib_main;
-} echo_client_main_t;
+} ec_main_t;
-enum
+typedef enum ec_state_
{
- ECHO_CLIENTS_STARTING,
- ECHO_CLIENTS_RUNNING,
- ECHO_CLIENTS_EXITING
-} echo_clients_test_state_e;
+ EC_STARTING,
+ EC_RUNNING,
+ EC_EXITING
+} ec_state_t;
-extern echo_client_main_t echo_client_main;
-vlib_node_registration_t echo_clients_node;
+typedef enum ec_cli_signal_
+{
+ EC_CLI_CONNECTS_DONE = 1,
+ EC_CLI_CONNECTS_FAILED,
+ EC_CLI_TEST_DONE
+} ec_cli_signal_t;
-void echo_clients_program_connects (void);
+void ec_program_connects (void);
#endif /* __included_echo_client_h__ */