From 3cbc04bea02fc60471dfe0c671ede3ca42c118c3 Mon Sep 17 00:00:00 2001 From: Florin Coras Date: Mon, 2 Oct 2017 00:18:51 -0700 Subject: udp: refactor udp code Change-Id: I44d5c9df7c49b8d4d5677c6d319033b2da3e6b80 Signed-off-by: Florin Coras --- src/scripts/vnet/uri/dummy_app.py | 91 ++++-- src/scripts/vnet/uri/udp | 6 + src/uri/uri_tcp_test.c | 7 +- src/uri/uri_udp_test.c | 204 +++++++++++--- src/uri/vppcom.c | 2 - src/vnet.am | 2 +- src/vnet/ip/ip.c | 57 +++- src/vnet/ip/ip.h | 1 + src/vnet/session/application.c | 7 +- src/vnet/session/application_interface.c | 32 ++- src/vnet/session/application_interface.h | 21 +- src/vnet/session/segment_manager.c | 2 +- src/vnet/session/session.c | 461 ++++++++++++++++++++++--------- src/vnet/session/session.h | 149 ++++++++-- src/vnet/session/session_api.c | 23 +- src/vnet/session/session_cli.c | 21 +- src/vnet/session/session_lookup.c | 96 +++++-- src/vnet/session/session_lookup.h | 16 +- src/vnet/session/session_node.c | 20 +- src/vnet/session/session_table.h | 1 + src/vnet/session/session_test.c | 25 +- src/vnet/session/stream_session.h | 1 + src/vnet/session/transport.c | 306 ++++++++++++++++++++ src/vnet/session/transport.h | 34 ++- src/vnet/session/transport_interface.c | 109 -------- src/vnet/session/transport_interface.h | 18 +- src/vnet/tcp/builtin_client.c | 10 +- src/vnet/tcp/builtin_http_server.c | 5 +- src/vnet/tcp/builtin_proxy.c | 17 +- src/vnet/tcp/builtin_server.c | 5 +- src/vnet/tcp/tcp.c | 277 ++++--------------- src/vnet/tcp/tcp.h | 10 - src/vnet/tcp/tcp_input.c | 92 +++--- src/vnet/tcp/tcp_test.c | 14 +- src/vnet/udp/builtin_server.c | 1 - src/vnet/udp/udp.c | 393 ++++++++++++++------------ src/vnet/udp/udp.h | 177 ++++++++---- src/vnet/udp/udp_error.def | 6 + src/vnet/udp/udp_input.c | 348 +++++++++++------------ src/vnet/udp/udp_local.c | 117 ++++---- 40 files changed, 1941 insertions(+), 1243 deletions(-) mode change 100644 => 100755 src/scripts/vnet/uri/dummy_app.py create mode 100644 src/vnet/session/transport.c delete mode 100644 src/vnet/session/transport_interface.c (limited to 'src') diff --git a/src/scripts/vnet/uri/dummy_app.py b/src/scripts/vnet/uri/dummy_app.py old mode 100644 new mode 100755 index ff00f2fc8c6..85ea4dcd474 --- a/src/scripts/vnet/uri/dummy_app.py +++ b/src/scripts/vnet/uri/dummy_app.py @@ -3,6 +3,7 @@ import socket import sys import time +import argparse # action can be reflect or drop action = "drop" @@ -32,37 +33,52 @@ def handle_connection (connection, client_address): connection.sendall(data) finally: connection.close() - -def run_server(ip, port): - print("Starting server {}:{}".format(repr(ip), repr(port))) +def run_tcp_server(ip, port): + print("Starting TCP server {}:{}".format(repr(ip), repr(port))) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = (ip, int(port)) sock.bind(server_address) sock.listen(1) - while True: connection, client_address = sock.accept() handle_connection (connection, client_address) +def run_udp_server(ip, port): + print("Starting UDP server {}:{}".format(repr(ip), repr(port))) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_address = (ip, int(port)) + sock.bind(server_address) + while True: + data, addr = sock.recvfrom(4096) + if (action != "drop"): + #snd_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto (data, addr) -def prepare_data(): +def run_server(ip, port, proto): + if (proto == "tcp"): + run_tcp_server(ip, port) + elif (proto == "udp"): + run_udp_server(ip, port) + +def prepare_data(power): buf = [] - for i in range (0, pow(2, 16)): + for i in range (0, pow(2, power)): buf.append(i & 0xff) return bytearray(buf) -def run_client(ip, port): - print("Starting client {}:{}".format(repr(ip), repr(port))) +def run_tcp_client(ip, port): + print("Starting TCP client {}:{}".format(repr(ip), repr(port))) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_address = (ip, port) + server_address = (ip, int(port)) sock.connect(server_address) - - data = prepare_data() + + data = prepare_data(16) n_rcvd = 0 n_sent = len (data) try: sock.sendall(data) - + timeout = time.time() + 2 while n_rcvd < n_sent and time.time() < timeout: tmp = sock.recv(1500) @@ -73,28 +89,53 @@ def run_client(ip, port): print("Difference at byte {}. Sent {} got {}" .format(n_rcvd + i, data[n_rcvd + i], tmp[i])) n_rcvd += n_read - + if (n_rcvd < n_sent or n_rcvd > n_sent): print("Sent {} and got back {}".format(n_sent, n_rcvd)) else: print("Got back what we've sent!!"); - + finally: sock.close() - -def run(mode, ip, port): +def run_udp_client(ip, port): + print("Starting UDP client {}:{}".format(repr(ip), repr(port))) + n_packets = 100 + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_address = (ip, int(port)) + data = prepare_data(10) + try: + for i in range (0, n_packets): + sock.sendto(data, server_address) + finally: + sock.close() +def run_client(ip, port, proto): + if (proto == "tcp"): + run_tcp_client(ip, port) + elif (proto == "udp"): + run_udp_client(ip, port) +def run(mode, ip, port, proto): if (mode == "server"): - run_server (ip, port) + run_server (ip, port, proto) elif (mode == "client"): - run_client (ip, port) + run_client (ip, port, proto) else: raise Exception("Unknown mode. Only client and server supported") if __name__ == "__main__": - if (len(sys.argv)) < 4: - raise Exception("Usage: ./dummy_app [ ]") - if (len(sys.argv) == 6): - action = sys.argv[4] - test = int(sys.argv[5]) - - run (sys.argv[1], sys.argv[2], int(sys.argv[3])) + parser = argparse.ArgumentParser() + parser.add_argument('-m', action='store', dest='mode') + parser.add_argument('-i', action='store', dest='ip') + parser.add_argument('-p', action='store', dest='port') + parser.add_argument('-proto', action='store', dest='proto') + parser.add_argument('-a', action='store', dest='action') + parser.add_argument('-t', action='store', dest='test') + results = parser.parse_args() + action = results.action + test = results.test + run(results.mode, results.ip, results.port, results.proto) + #if (len(sys.argv)) < 4: + # raise Exception("Usage: ./dummy_app [ ]") + #if (len(sys.argv) == 6): + # action = sys.argv[4] + # test = int(sys.argv[5]) + #run (sys.argv[1], sys.argv[2], int(sys.argv[3])) diff --git a/src/scripts/vnet/uri/udp b/src/scripts/vnet/uri/udp index c7628f49b13..3ab4292fce6 100644 --- a/src/scripts/vnet/uri/udp +++ b/src/scripts/vnet/uri/udp @@ -1,6 +1,12 @@ loop create set int ip address loop0 6.0.0.1/32 set int state loop0 up +set int state GigabitEthernet1b/0/0 up +set int ip address GigabitEthernet1b/0/0 192.168.1.1/24 + +create host-interface name vpp1 +set int state host-vpp1 up +set int ip address host-vpp1 6.0.1.1/24 packet-generator new { name udp diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c index 41d3d4c1f42..89f070f72cd 100755 --- a/src/uri/uri_tcp_test.c +++ b/src/uri/uri_tcp_test.c @@ -327,6 +327,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) svm_fifo_segment_create_args_t _a, *a = &_a; int rv; + memset (a, 0, sizeof (*a)); a->segment_name = (char *) mp->segment_name; a->segment_size = mp->segment_size; /* Attach to the segment vpp created */ @@ -590,7 +591,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, u32 bytes_to_snd; u32 queue_max_chunk = 128 << 10, actual_write; session_fifo_event_t evt; - static int serial_number = 0; int rv; bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; @@ -615,7 +615,6 @@ send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = serial_number++; unix_shared_memory_queue_add (utm->vpp_event_queue, (u8 *) & evt, @@ -918,6 +917,7 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) memset (rmp, 0, sizeof (*rmp)); rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); rmp->handle = mp->handle; + rmp->context = mp->context; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp); session->bytes_received = 0; @@ -983,7 +983,6 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = e->event_id; q = utm->vpp_event_queue; unix_shared_memory_queue_add (q, (u8 *) & evt, @@ -997,7 +996,7 @@ server_handle_fifo_event_rx (uri_tcp_test_main_t * utm, void server_handle_event_queue (uri_tcp_test_main_t * utm) { - session_fifo_event_t _e, *e = &_e;; + session_fifo_event_t _e, *e = &_e; while (1) { diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c index d559d5726c7..27e70cf944c 100644 --- a/src/uri/uri_udp_test.c +++ b/src/uri/uri_udp_test.c @@ -50,6 +50,7 @@ typedef enum { STATE_START, + STATE_BOUND, STATE_READY, STATE_FAILED, STATE_DISCONNECTING, @@ -97,6 +98,7 @@ typedef struct /* $$$$ hack: cut-through session index */ volatile u32 cut_through_session_index; + volatile u32 connected_session; /* unique segment name counter */ u32 unique_segment_index; @@ -123,6 +125,7 @@ typedef struct /* convenience */ svm_fifo_segment_main_t *segment_main; + u8 *connect_test_data; } uri_udp_test_main_t; #if CLIB_DEBUG > 0 @@ -163,7 +166,7 @@ void application_send_attach (uri_udp_test_main_t * utm) { vl_api_application_attach_t *bmp; - u32 fifo_size = 3 << 20; + u32 fifo_size = 1 << 20; bmp = vl_msg_api_alloc (sizeof (*bmp)); memset (bmp, 0, sizeof (*bmp)); @@ -172,11 +175,12 @@ application_send_attach (uri_udp_test_main_t * utm) bmp->context = ntohl (0xfeedface); bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT | APP_OPTIONS_FLAGS_ADD_SEGMENT; - bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16; + bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 2; bmp->options[SESSION_OPTIONS_RX_FIFO_SIZE] = fifo_size; bmp->options[SESSION_OPTIONS_TX_FIFO_SIZE] = fifo_size; bmp->options[SESSION_OPTIONS_ADD_SEGMENT_SIZE] = 128 << 20; bmp->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 20; + bmp->options[APP_EVT_QUEUE_SIZE] = 16768; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & bmp); } @@ -348,7 +352,7 @@ udp_client_connect (uri_udp_test_main_t * utm) } static void -client_send (uri_udp_test_main_t * utm, session_t * session) +client_send_cut_through (uri_udp_test_main_t * utm, session_t * session) { int i; u8 *test_data = 0; @@ -391,7 +395,6 @@ client_send (uri_udp_test_main_t * utm, session_t * session) } bytes_to_read = svm_fifo_max_dequeue (rx_fifo); - bytes_to_read = vec_len (utm->rx_buf) > bytes_to_read ? bytes_to_read : vec_len (utm->rx_buf); @@ -451,7 +454,114 @@ client_send (uri_udp_test_main_t * utm, session_t * session) } static void -uri_udp_client_test (uri_udp_test_main_t * utm) +send_test_chunk (uri_udp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid, + u32 bytes) +{ + u8 *test_data = utm->connect_test_data; + u64 bytes_sent = 0; + int test_buf_offset = 0; + u32 bytes_to_snd; + u32 queue_max_chunk = 128 << 10, actual_write; + session_fifo_event_t evt; + int rv; + + bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes; + if (bytes_to_snd > vec_len (test_data)) + bytes_to_snd = vec_len (test_data); + + while (bytes_to_snd > 0 && !utm->time_to_stop) + { + actual_write = (bytes_to_snd > queue_max_chunk) ? + queue_max_chunk : bytes_to_snd; + rv = svm_fifo_enqueue_nowait (tx_fifo, actual_write, + test_data + test_buf_offset); + + if (rv > 0) + { + bytes_to_snd -= rv; + test_buf_offset += rv; + bytes_sent += rv; + + if (svm_fifo_set_event (tx_fifo)) + { + /* Fabricate TX event, send to vpp */ + evt.fifo = tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; + + unix_shared_memory_queue_add (utm->vpp_event_queue, + (u8 *) & evt, + 0 /* do wait for mutex */ ); + } + } + } +} + +static void +recv_test_chunk (uri_udp_test_main_t * utm, session_t * session) +{ + svm_fifo_t *rx_fifo; + int buffer_offset, bytes_to_read = 0, rv; + + rx_fifo = session->server_rx_fifo; + bytes_to_read = svm_fifo_max_dequeue (rx_fifo); + bytes_to_read = + vec_len (utm->rx_buf) > bytes_to_read ? + bytes_to_read : vec_len (utm->rx_buf); + + buffer_offset = 0; + while (bytes_to_read > 0) + { + rv = svm_fifo_dequeue_nowait (rx_fifo, bytes_to_read, + utm->rx_buf + buffer_offset); + if (rv > 0) + { + bytes_to_read -= rv; + buffer_offset += rv; + } + } +} + +void +client_send_data (uri_udp_test_main_t * utm) +{ + u8 *test_data; + int mypid = getpid (); + session_t *session; + svm_fifo_t *tx_fifo; + u32 n_iterations; + int i; + + vec_validate (utm->connect_test_data, 64 * 1024 - 1); + for (i = 0; i < vec_len (utm->connect_test_data); i++) + utm->connect_test_data[i] = i & 0xff; + + test_data = utm->connect_test_data; + session = pool_elt_at_index (utm->sessions, utm->connected_session); + tx_fifo = session->server_tx_fifo; + + ASSERT (vec_len (test_data) > 0); + + vec_validate (utm->rx_buf, vec_len (test_data) - 1); + n_iterations = NITER; + + for (i = 0; i < n_iterations; i++) + { + send_test_chunk (utm, tx_fifo, mypid, 0); + recv_test_chunk (utm, session); + if (utm->time_to_stop) + break; + } + + f64 timeout = clib_time_now (&utm->clib_time) + 5; + while (clib_time_now (&utm->clib_time) < timeout) + { + recv_test_chunk (utm, session); + } + +} + +static void +client_test (uri_udp_test_main_t * utm) { session_t *session; @@ -464,10 +574,18 @@ uri_udp_client_test (uri_udp_test_main_t * utm) return; } - /* Only works with cut through sessions */ - session = pool_elt_at_index (utm->sessions, utm->cut_through_session_index); + if (utm->cut_through_session_index != ~0) + { + session = pool_elt_at_index (utm->sessions, + utm->cut_through_session_index); + client_send_cut_through (utm, session); + } + else + { + session = pool_elt_at_index (utm->sessions, utm->connected_session); + client_send_data (utm); + } - client_send (utm, session); application_detach (utm); } @@ -483,7 +601,7 @@ vl_api_bind_uri_reply_t_handler (vl_api_bind_uri_reply_t * mp) return; } - utm->state = STATE_READY; + utm->state = STATE_BOUND; } static void @@ -492,6 +610,7 @@ vl_api_map_another_segment_t_handler (vl_api_map_another_segment_t * mp) svm_fifo_segment_create_args_t _a, *a = &_a; int rv; + memset (a, 0, sizeof (*a)); a->segment_name = (char *) mp->segment_name; a->segment_size = mp->segment_size; /* Attach to the segment vpp created */ @@ -625,8 +744,6 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) hash_set (utm->session_index_by_vpp_handles, mp->handle, session - utm->sessions); - utm->state = STATE_READY; - if (pool_elts (utm->sessions) && (pool_elts (utm->sessions) % 20000) == 0) { f64 now = clib_time_now (&utm->clib_time); @@ -639,7 +756,11 @@ vl_api_accept_session_t_handler (vl_api_accept_session_t * mp) memset (rmp, 0, sizeof (*rmp)); rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY); rmp->handle = mp->handle; + rmp->context = mp->context; vl_msg_api_send_shmem (utm->vl_input_queue, (u8 *) & rmp); + + CLIB_MEMORY_BARRIER (); + utm->state = STATE_READY; } static void @@ -677,16 +798,22 @@ static void vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) { uri_udp_test_main_t *utm = &uri_udp_test_main; + session_t *session; ASSERT (utm->i_am_master == 0); + if (mp->retval) + { + clib_warning ("failed connect"); + return; + } + /* We've been redirected */ if (mp->segment_name_length > 0) { svm_fifo_segment_main_t *sm = &svm_fifo_segment_main; svm_fifo_segment_create_args_t _a, *a = &_a; u32 segment_index; - session_t *session; svm_fifo_segment_private_t *seg; int rv; @@ -707,20 +834,24 @@ vl_api_connect_session_reply_t_handler (vl_api_connect_session_reply_t * mp) vec_add2 (utm->seg, seg, 1); memcpy (seg, sm->segments + segment_index, sizeof (*seg)); sleep (1); - - pool_get (utm->sessions, session); - utm->cut_through_session_index = session - utm->sessions; - - session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo, - svm_fifo_t *); - ASSERT (session->server_rx_fifo); - session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo, - svm_fifo_t *); - ASSERT (session->server_tx_fifo); } - /* security: could unlink /dev/shm/segment_name> here, maybe */ + pool_get (utm->sessions, session); + session->server_rx_fifo = uword_to_pointer (mp->server_rx_fifo, + svm_fifo_t *); + ASSERT (session->server_rx_fifo); + session->server_tx_fifo = uword_to_pointer (mp->server_tx_fifo, + svm_fifo_t *); + ASSERT (session->server_tx_fifo); + if (mp->segment_name_length > 0) + utm->cut_through_session_index = session - utm->sessions; + else + { + utm->connected_session = session - utm->sessions; + utm->vpp_event_queue = uword_to_pointer (mp->vpp_event_queue_address, + unix_shared_memory_queue_t *); + } utm->state = STATE_READY; } @@ -789,13 +920,13 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm, { svm_fifo_t *rx_fifo, *tx_fifo; int nbytes; - session_fifo_event_t evt; unix_shared_memory_queue_t *q; int rv; rx_fifo = e->fifo; tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo; + svm_fifo_unset_event (rx_fifo); do { @@ -809,13 +940,11 @@ server_handle_fifo_event_rx (uri_udp_test_main_t * utm, } while (rv == -2); - /* Fabricate TX event, send to vpp */ - evt.fifo = tx_fifo; - evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = e->event_id; - if (svm_fifo_set_event (tx_fifo)) { + /* Fabricate TX event, send to vpp */ + evt.fifo = tx_fifo; + evt.event_type = FIFO_EVENT_APP_TX; q = utm->vpp_event_queue; unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); @@ -827,6 +956,9 @@ server_handle_event_queue (uri_udp_test_main_t * utm) { session_fifo_event_t _e, *e = &_e; + while (utm->state != STATE_READY) + sleep (5); + while (1) { unix_shared_memory_queue_sub (utm->our_event_queue, (u8 *) e, @@ -845,7 +977,7 @@ server_handle_event_queue (uri_udp_test_main_t * utm) break; } if (PREDICT_FALSE (utm->time_to_stop == 1)) - break; + return; if (PREDICT_FALSE (utm->time_to_print_stats == 1)) { utm->time_to_print_stats = 0; @@ -869,7 +1001,7 @@ server_unbind (uri_udp_test_main_t * utm) } static void -server_listen (uri_udp_test_main_t * utm) +server_bind (uri_udp_test_main_t * utm) { vl_api_bind_uri_t *bmp; @@ -890,11 +1022,11 @@ udp_server_test (uri_udp_test_main_t * utm) application_send_attach (utm); /* Bind to uri */ - server_listen (utm); + server_bind (utm); - if (wait_for_state_change (utm, STATE_READY)) + if (wait_for_state_change (utm, STATE_BOUND)) { - clib_warning ("timeout waiting for STATE_READY"); + clib_warning ("timeout waiting for STATE_BOUND"); return; } @@ -976,7 +1108,7 @@ main (int argc, char **argv) utm->i_am_master = i_am_master; utm->segment_main = &svm_fifo_segment_main; - utm->connect_uri = format (0, "udp://6.0.0.1/1234%c", 0); + utm->connect_uri = format (0, "udp://6.0.1.2/1234%c", 0); setup_signal_handlers (); @@ -991,7 +1123,7 @@ main (int argc, char **argv) if (i_am_master == 0) { - uri_udp_client_test (utm); + client_test (utm); exit (0); } diff --git a/src/uri/vppcom.c b/src/uri/vppcom.c index a8e3a5005ba..f0bd2f86c64 100644 --- a/src/uri/vppcom.c +++ b/src/uri/vppcom.c @@ -136,7 +136,6 @@ typedef struct vppcom_main_t_ u8 init; u32 *client_session_index_fifo; volatile u32 bind_session_index; - u32 tx_event_id; int main_cpu; /* vpe input queue */ @@ -2328,7 +2327,6 @@ vppcom_session_write (uint32_t session_index, void *buf, int n) /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = vcm->tx_event_id++; rval = vppcom_session_at_index (session_index, &session); if (PREDICT_FALSE (rval)) diff --git a/src/vnet.am b/src/vnet.am index 520bee45727..97964c5132e 100644 --- a/src/vnet.am +++ b/src/vnet.am @@ -877,7 +877,7 @@ libvnet_la_SOURCES += \ vnet/session/session_table.c \ vnet/session/session_lookup.c \ vnet/session/session_node.c \ - vnet/session/transport_interface.c \ + vnet/session/transport.c \ vnet/session/application.c \ vnet/session/session_cli.c \ vnet/session/application_interface.c \ diff --git a/src/vnet/ip/ip.c b/src/vnet/ip/ip.c index caa553dd6d7..bd9706b846d 100644 --- a/src/vnet/ip/ip.c +++ b/src/vnet/ip/ip.c @@ -63,6 +63,24 @@ ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4) return (flags & FIB_ENTRY_FLAG_LOCAL); } +void +ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4) +{ + if (is_ip4) + dst->ip4.as_u32 = src->ip4.as_u32; + else + clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t)); +} + +void +ip_set (ip46_address_t * dst, void *src, u8 is_ip4) +{ + if (is_ip4) + dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32; + else + clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t)); +} + u8 ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4) { @@ -97,22 +115,37 @@ ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4) return 0; } -void -ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4) +void * +ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4) { - if (is_ip4) - dst->ip4.as_u32 = src->ip4.as_u32; - else - clib_memcpy (&dst->ip6, &src->ip6, sizeof (ip6_address_t)); -} + ip_lookup_main_t *lm4 = &ip4_main.lookup_main; + ip_lookup_main_t *lm6 = &ip6_main.lookup_main; + ip_interface_address_t *ia = 0; -void -ip_set (ip46_address_t * dst, void *src, u8 is_ip4) -{ if (is_ip4) - dst->ip4.as_u32 = ((ip4_address_t *) src)->as_u32; + { + /* *INDENT-OFF* */ + foreach_ip_interface_address (lm4, ia, sw_if_index, 1 /* unnumbered */ , + ({ + return ip_interface_address_get_address (lm4, ia); + })); + /* *INDENT-ON* */ + } else - clib_memcpy (&dst->ip6, (ip6_address_t *) src, sizeof (ip6_address_t)); + { + /* *INDENT-OFF* */ + foreach_ip_interface_address (lm6, ia, sw_if_index, 1 /* unnumbered */ , + ({ + ip6_address_t *rv; + rv = ip_interface_address_get_address (lm6, ia); + /* Trying to use a link-local ip6 src address is a fool's errand */ + if (!ip6_address_is_link_local_unicast (rv)) + return rv; + })); + /* *INDENT-ON* */ + } + + return 0; } /* diff --git a/src/vnet/ip/ip.h b/src/vnet/ip/ip.h index 3b3a465d042..9387ba39099 100644 --- a/src/vnet/ip/ip.h +++ b/src/vnet/ip/ip.h @@ -198,6 +198,7 @@ u8 ip_is_local (u32 fib_index, ip46_address_t * ip46_address, u8 is_ip4); u8 ip_interface_has_address (u32 sw_if_index, ip46_address_t * ip, u8 is_ip4); void ip_copy (ip46_address_t * dst, ip46_address_t * src, u8 is_ip4); void ip_set (ip46_address_t * dst, void *src, u8 is_ip4); +void *ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4); #endif /* included_ip_main_h */ diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c index 75d3cfb2e33..c6fd1197304 100644 --- a/src/vnet/session/application.c +++ b/src/vnet/session/application.c @@ -415,7 +415,6 @@ application_open_session (application_t * app, session_endpoint_t * sep, u32 api_context) { segment_manager_t *sm; - transport_connection_t *tc = 0; int rv; /* Make sure we have a segment manager for connects */ @@ -427,13 +426,9 @@ application_open_session (application_t * app, session_endpoint_t * sep, app->connects_seg_manager = segment_manager_index (sm); } - if ((rv = stream_session_open (app->index, sep, &tc))) + if ((rv = session_open (app->index, sep, api_context))) return rv; - /* Store api_context for when the reply comes. Not the nicest thing - * but better than allocating a separate half-open pool. */ - tc->s_index = api_context; - return 0; } diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c index a0dff90565a..8599c74fe46 100644 --- a/src/vnet/session/application_interface.c +++ b/src/vnet/session/application_interface.c @@ -92,7 +92,8 @@ static int vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle) { application_t *app; - u32 table_index, listener_index; + u32 table_index; + u64 listener; int rv, have_local = 0; app = application_get_if_valid (app_index); @@ -108,8 +109,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle) table_index = application_session_table (app, session_endpoint_fib_proto (sep)); - listener_index = session_lookup_session_endpoint (table_index, sep); - if (listener_index != SESSION_INVALID_INDEX) + listener = session_lookup_session_endpoint (table_index, sep); + if (listener != SESSION_INVALID_HANDLE) return VNET_API_ERROR_ADDRESS_IN_USE; /* @@ -119,8 +120,8 @@ vnet_bind_i (u32 app_index, session_endpoint_t * sep, u64 * handle) if (application_has_local_scope (app) && session_endpoint_is_zero (sep)) { table_index = application_local_session_table (app); - listener_index = session_lookup_session_endpoint (table_index, sep); - if (listener_index != SESSION_INVALID_INDEX) + listener = session_lookup_session_endpoint (table_index, sep); + if (listener != SESSION_INVALID_HANDLE) return VNET_API_ERROR_ADDRESS_IN_USE; session_lookup_add_session_endpoint (table_index, sep, app->index); *handle = session_lookup_local_listener_make_handle (sep); @@ -206,6 +207,7 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep, { application_t *server, *app; u32 table_index; + stream_session_t *listener; if (session_endpoint_is_zero (sep)) return VNET_API_ERROR_INVALID_VALUE; @@ -243,10 +245,13 @@ vnet_connect_i (u32 app_index, u32 api_context, session_endpoint_t * sep, table_index = application_session_table (app, session_endpoint_fib_proto (sep)); - app_index = session_lookup_session_endpoint (table_index, sep); - server = application_get (app_index); - if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT)) - return app_connect_redirect (server, mp); + listener = session_lookup_listener (table_index, sep); + if (listener) + { + server = application_get (listener->app_index); + if (server && (server->flags & APP_OPTIONS_FLAGS_ACCEPT_REDIRECT)) + return app_connect_redirect (server, mp); + } /* * Not connecting to a local server, propagate to transport @@ -470,14 +475,15 @@ vnet_unbind_uri (vnet_unbind_args_t * a) clib_error_t * vnet_connect_uri (vnet_connect_args_t * a) { - session_endpoint_t sep = SESSION_ENDPOINT_NULL; + session_endpoint_t sep_null = SESSION_ENDPOINT_NULL; int rv; /* Parse uri */ - rv = parse_uri (a->uri, &sep); + a->sep = sep_null; + rv = parse_uri (a->uri, &a->sep); if (rv) return clib_error_return_code (0, rv, 0, "app init: %d", rv); - if ((rv = vnet_connect_i (a->app_index, a->api_context, &sep, a->mp))) + if ((rv = vnet_connect_i (a->app_index, a->api_context, &a->sep, a->mp))) return clib_error_return_code (0, rv, 0, "connect failed"); return 0; } @@ -489,7 +495,7 @@ vnet_disconnect_session (vnet_disconnect_args_t * a) stream_session_t *s; session_parse_handle (a->handle, &index, &thread_index); - s = stream_session_get_if_valid (index, thread_index); + s = session_get_if_valid (index, thread_index); if (!s || s->app_index != a->app_index) return VNET_API_ERROR_INVALID_VALUE; diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h index 5e1fe8ee528..0251c3bc311 100644 --- a/src/vnet/session/application_interface.h +++ b/src/vnet/session/application_interface.h @@ -56,11 +56,7 @@ typedef struct _vnet_bind_args_t union { char *uri; - struct - { - session_endpoint_t sep; - transport_proto_t proto; - }; + session_endpoint_t sep; }; u32 app_index; @@ -86,23 +82,14 @@ typedef struct _vnet_unbind_args_t typedef struct _vnet_connect_args { - union - { - char *uri; - struct - { - session_endpoint_t sep; - transport_proto_t proto; - }; - }; + char *uri; + session_endpoint_t sep; u32 app_index; u32 api_context; /* Used for redirects */ void *mp; - - /* used for proxy connections */ - u64 server_handle; + u64 session_handle; } vnet_connect_args_t; typedef struct _vnet_disconnect_args_t diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c index f35dec72d88..cb83d8e53bc 100644 --- a/src/vnet/session/segment_manager.c +++ b/src/vnet/session/segment_manager.c @@ -273,7 +273,7 @@ segment_manager_del_sessions (segment_manager_t * sm) if (session->session_state != SESSION_STATE_CLOSED) { session->session_state = SESSION_STATE_CLOSED; - session_send_session_evt_to_thread (stream_session_handle + session_send_session_evt_to_thread (session_handle (session), FIFO_EVENT_DISCONNECT, thread_index); diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c index 88b38f15a61..7f28a3992ed 100644 --- a/src/vnet/session/session.c +++ b/src/vnet/session/session.c @@ -28,67 +28,151 @@ session_manager_main_t session_manager_main; extern transport_proto_vft_t *tp_vfts; -int -stream_session_create_i (segment_manager_t * sm, transport_connection_t * tc, - u8 alloc_fifos, stream_session_t ** ret_s) +static void +session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type, + u32 thread_index, void *fp, void *rpc_args) +{ + u32 tries = 0; + session_fifo_event_t evt = { {0}, }; + unix_shared_memory_queue_t *q; + + evt.event_type = evt_type; + if (evt_type == FIFO_EVENT_RPC) + { + evt.rpc_args.fp = fp; + evt.rpc_args.arg = rpc_args; + } + else + evt.session_handle = session_handle; + + q = session_manager_get_vpp_event_queue (thread_index); + while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1)) + { + if (tries++ == 3) + { + SESSION_DBG ("failed to enqueue evt"); + break; + } + } +} + +void +session_send_session_evt_to_thread (u64 session_handle, + fifo_event_type_t evt_type, + u32 thread_index) +{ + session_send_evt_to_thread (session_handle, evt_type, thread_index, 0, 0); +} + +void +session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args) +{ + if (thread_index != vlib_get_thread_index ()) + session_send_evt_to_thread (0, FIFO_EVENT_RPC, thread_index, fp, + rpc_args); + else + { + void (*fnp) (void *) = fp; + fnp (rpc_args); + } +} + +stream_session_t * +session_alloc (u32 thread_index) { session_manager_main_t *smm = &session_manager_main; + stream_session_t *s; + u8 will_expand = 0; + pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand, + CLIB_CACHE_LINE_BYTES); + /* If we have peekers, let them finish */ + if (PREDICT_FALSE (will_expand)) + { + clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]); + pool_get_aligned (session_manager_main.sessions[thread_index], s, + CLIB_CACHE_LINE_BYTES); + clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]); + } + else + { + pool_get_aligned (session_manager_main.sessions[thread_index], s, + CLIB_CACHE_LINE_BYTES); + } + memset (s, 0, sizeof (*s)); + s->session_index = s - session_manager_main.sessions[thread_index]; + s->thread_index = thread_index; + return s; +} + +static void +session_free (stream_session_t * s) +{ + pool_put (session_manager_main.sessions[s->thread_index], s); + if (CLIB_DEBUG) + memset (s, 0xFA, sizeof (*s)); +} + +static int +session_alloc_fifos (segment_manager_t * sm, stream_session_t * s) +{ svm_fifo_t *server_rx_fifo = 0, *server_tx_fifo = 0; u32 fifo_segment_index; - u32 pool_index; - stream_session_t *s; - u64 value; - u32 thread_index = tc->thread_index; int rv; - ASSERT (thread_index == vlib_get_thread_index ()); + if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo, + &server_tx_fifo, + &fifo_segment_index))) + return rv; + /* Initialize backpointers */ + server_rx_fifo->master_session_index = s->session_index; + server_rx_fifo->master_thread_index = s->thread_index; - /* Create the session */ - pool_get_aligned (smm->sessions[thread_index], s, CLIB_CACHE_LINE_BYTES); - memset (s, 0, sizeof (*s)); - pool_index = s - smm->sessions[thread_index]; + server_tx_fifo->master_session_index = s->session_index; + server_tx_fifo->master_thread_index = s->thread_index; - /* Allocate fifos */ - if (alloc_fifos) - { - if ((rv = segment_manager_alloc_session_fifos (sm, &server_rx_fifo, - &server_tx_fifo, - &fifo_segment_index))) - { - pool_put (smm->sessions[thread_index], s); - return rv; - } - /* Initialize backpointers */ - server_rx_fifo->master_session_index = pool_index; - server_rx_fifo->master_thread_index = thread_index; + s->server_rx_fifo = server_rx_fifo; + s->server_tx_fifo = server_tx_fifo; + s->svm_segment_index = fifo_segment_index; + return 0; +} - server_tx_fifo->master_session_index = pool_index; - server_tx_fifo->master_thread_index = thread_index; +static stream_session_t * +session_alloc_for_connection (transport_connection_t * tc) +{ + stream_session_t *s; + u32 thread_index = tc->thread_index; - s->server_rx_fifo = server_rx_fifo; - s->server_tx_fifo = server_tx_fifo; - s->svm_segment_index = fifo_segment_index; - } + ASSERT (thread_index == vlib_get_thread_index ()); - /* Initialize state machine, such as it is... */ - s->session_type = session_type_from_proto_and_ip (tc->transport_proto, - tc->is_ip4); + s = session_alloc (thread_index); + s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); s->session_state = SESSION_STATE_CONNECTING; s->thread_index = thread_index; - s->session_index = pool_index; - /* Attach transport to session */ + /* Attach transport to session and vice versa */ s->connection_index = tc->c_index; - - /* Attach session to transport */ tc->s_index = s->session_index; + return s; +} + +static int +session_alloc_and_init (segment_manager_t * sm, transport_connection_t * tc, + u8 alloc_fifos, stream_session_t ** ret_s) +{ + stream_session_t *s; + int rv; + + s = session_alloc_for_connection (tc); + if (alloc_fifos && (rv = session_alloc_fifos (sm, s))) + { + session_free (s); + return rv; + } /* Add to the main lookup table */ - value = stream_session_handle (s); - session_lookup_add_connection (tc, value); + session_lookup_add_connection (tc, session_handle (s)); *ret_s = s; - return 0; } @@ -217,8 +301,9 @@ session_enqueue_chain_tail (stream_session_t * s, vlib_buffer_t * b, * @return Number of bytes enqueued or a negative value if enqueueing failed. */ int -stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, - u32 offset, u8 queue_event, u8 is_in_order) +session_enqueue_stream_connection (transport_connection_t * tc, + vlib_buffer_t * b, u32 offset, + u8 queue_event, u8 is_in_order) { stream_session_t *s; int enqueued = 0, rv, in_order_off; @@ -257,12 +342,12 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, * by calling stream_server_flush_enqueue_events () */ session_manager_main_t *smm = vnet_get_session_manager_main (); u32 thread_index = s->thread_index; - u32 my_enqueue_epoch = smm->current_enqueue_epoch[thread_index]; + u32 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index]; - if (s->enqueue_epoch != my_enqueue_epoch) + if (s->enqueue_epoch != enqueue_epoch) { - s->enqueue_epoch = my_enqueue_epoch; - vec_add1 (smm->session_indices_to_enqueue_by_thread[thread_index], + s->enqueue_epoch = enqueue_epoch; + vec_add1 (smm->session_to_enqueue[tc->proto][thread_index], s - smm->sessions[thread_index]); } } @@ -270,6 +355,41 @@ stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, return enqueued; } +int +session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, + u8 proto, u8 queue_event) +{ + int enqueued = 0, rv, in_order_off; + + if (svm_fifo_max_enqueue (s->server_rx_fifo) < b->current_length) + return -1; + enqueued = svm_fifo_enqueue_nowait (s->server_rx_fifo, b->current_length, + vlib_buffer_get_current (b)); + if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0)) + { + in_order_off = enqueued > b->current_length ? enqueued : 0; + rv = session_enqueue_chain_tail (s, b, in_order_off, 1); + if (rv > 0) + enqueued += rv; + } + if (queue_event) + { + /* Queue RX event on this fifo. Eventually these will need to be flushed + * by calling stream_server_flush_enqueue_events () */ + session_manager_main_t *smm = vnet_get_session_manager_main (); + u32 thread_index = s->thread_index; + u32 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index]; + + if (s->enqueue_epoch != enqueue_epoch) + { + s->enqueue_epoch = enqueue_epoch; + vec_add1 (smm->session_to_enqueue[proto][thread_index], + s - smm->sessions[thread_index]); + } + } + return enqueued; +} + /** Check if we have space in rx fifo to push more bytes */ u8 stream_session_no_space (transport_connection_t * tc, u32 thread_index, @@ -319,12 +439,11 @@ stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes) * @return 0 on succes or negative number if failed to send notification. */ static int -stream_session_enqueue_notify (stream_session_t * s, u8 block) +session_enqueue_notify (stream_session_t * s, u8 block) { application_t *app; session_fifo_event_t evt; unix_shared_memory_queue_t *q; - static u32 serial_number; if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED)) { @@ -354,7 +473,6 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block) /* Fabricate event */ evt.fifo = s->server_rx_fifo; evt.event_type = FIFO_EVENT_APP_RX; - evt.event_id = serial_number++; /* Add event to server's event queue */ q = app->event_queue; @@ -389,35 +507,25 @@ stream_session_enqueue_notify (stream_session_t * s, u8 block) * failures due to API queue being full. */ int -session_manager_flush_enqueue_events (u32 thread_index) +session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index) { session_manager_main_t *smm = &session_manager_main; - u32 *session_indices_to_enqueue; + u32 *indices; + stream_session_t *s; int i, errors = 0; - session_indices_to_enqueue = - smm->session_indices_to_enqueue_by_thread[thread_index]; + indices = smm->session_to_enqueue[transport_proto][thread_index]; - for (i = 0; i < vec_len (session_indices_to_enqueue); i++) + for (i = 0; i < vec_len (indices); i++) { - stream_session_t *s0; - - /* Get session */ - s0 = stream_session_get_if_valid (session_indices_to_enqueue[i], - thread_index); - if (s0 == 0 || stream_session_enqueue_notify (s0, 0 /* don't block */ )) - { - errors++; - } + s = session_get_if_valid (indices[i], thread_index); + if (s == 0 || session_enqueue_notify (s, 0 /* don't block */ )) + errors++; } - vec_reset_length (session_indices_to_enqueue); - - smm->session_indices_to_enqueue_by_thread[thread_index] = - session_indices_to_enqueue; - - /* Increment enqueue epoch for next round */ - smm->current_enqueue_epoch[thread_index]++; + vec_reset_length (indices); + smm->session_to_enqueue[transport_proto][thread_index] = indices; + smm->current_enqueue_epoch[transport_proto][thread_index]++; return errors; } @@ -438,22 +546,25 @@ stream_session_init_fifos_pointers (transport_connection_t * tc, } int -stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) +session_stream_connect_notify (transport_connection_t * tc, u8 is_fail) { application_t *app; stream_session_t *new_s = 0; u64 handle; u32 opaque = 0; int error = 0; + segment_manager_t *sm; + u8 alloc_fifos; + /* + * Find connection handle and cleanup half-open table + */ handle = session_lookup_half_open_handle (tc); if (handle == HALF_OPEN_LOOKUP_INVALID_VALUE) { SESSION_DBG ("half-open was removed!"); return -1; } - - /* Cleanup half-open table */ session_lookup_del_half_open (tc); /* Get the app's index from the handle we stored when opening connection @@ -462,17 +573,16 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) app = application_get_if_valid (handle >> 32); if (!app) return -1; - opaque = tc->s_index; + /* + * Allocate new session with fifos (svm segments are allocated if needed) + */ if (!is_fail) { - segment_manager_t *sm; - u8 alloc_fifos; sm = application_get_connect_segment_manager (app); alloc_fifos = application_is_proxy (app); - /* Create new session (svm segments are allocated if needed) */ - if (stream_session_create_i (sm, tc, alloc_fifos, &new_s)) + if (session_alloc_and_init (sm, tc, alloc_fifos, &new_s)) { is_fail = 1; error = -1; @@ -481,7 +591,9 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) new_s->app_index = app->index; } - /* Notify client application */ + /* + * Notify client application + */ if (app->cb_fns.session_connected_callback (app->index, opaque, new_s, is_fail)) { @@ -498,6 +610,67 @@ stream_session_connect_notify (transport_connection_t * tc, u8 is_fail) return error; } +typedef struct _session_switch_pool_args +{ + u32 session_index; + u32 thread_index; + u32 new_thread_index; + u32 new_session_index; +} session_switch_pool_args_t; + +static void +session_switch_pool (void *cb_args) +{ + session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args; + stream_session_t *s; + ASSERT (args->thread_index == vlib_get_thread_index ()); + s = session_get (args->session_index, args->thread_index); + s->server_tx_fifo->master_session_index = args->new_session_index; + s->server_tx_fifo->master_thread_index = args->new_thread_index; + tp_vfts[s->session_type].cleanup (s->connection_index, s->thread_index); + session_free (s); + clib_mem_free (cb_args); +} + +/** + * Move dgram session to the right thread + */ +int +session_dgram_connect_notify (transport_connection_t * tc, + u32 old_thread_index, + stream_session_t ** new_session) +{ + stream_session_t *new_s; + session_switch_pool_args_t *rpc_args; + + /* + * Clone half-open session to the right thread. + */ + new_s = session_clone_safe (tc->s_index, old_thread_index); + new_s->connection_index = tc->c_index; + new_s->server_rx_fifo->master_session_index = new_s->session_index; + new_s->server_rx_fifo->master_thread_index = new_s->thread_index; + new_s->session_state = SESSION_STATE_READY; + session_lookup_add_connection (tc, session_handle (new_s)); + + /* + * Ask thread owning the old session to clean it up and make us the tx + * fifo owner + */ + rpc_args = clib_mem_alloc (sizeof (*rpc_args)); + rpc_args->new_session_index = new_s->session_index; + rpc_args->new_thread_index = new_s->thread_index; + rpc_args->session_index = tc->s_index; + rpc_args->thread_index = old_thread_index; + session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool, + rpc_args); + + tc->s_index = new_s->session_index; + new_s->connection_index = tc->c_index; + *new_session = new_s; + return 0; +} + void stream_session_accept_notify (transport_connection_t * tc) { @@ -533,7 +706,6 @@ stream_session_disconnect_notify (transport_connection_t * tc) void stream_session_delete (stream_session_t * s) { - session_manager_main_t *smm = vnet_get_session_manager_main (); int rv; /* Delete from the main lookup table. */ @@ -543,10 +715,7 @@ stream_session_delete (stream_session_t * s) /* Cleanup fifo segments */ segment_manager_dealloc_fifos (s->svm_segment_index, s->server_rx_fifo, s->server_tx_fifo); - - pool_put (smm->sessions[s->thread_index], s); - if (CLIB_DEBUG) - memset (s, 0xFA, sizeof (*s)); + session_free (s); } /** @@ -563,7 +732,7 @@ stream_session_delete_notify (transport_connection_t * tc) stream_session_t *s; /* App might've been removed already */ - s = stream_session_get_if_valid (tc->s_index, tc->thread_index); + s = session_get_if_valid (tc->s_index, tc->thread_index); if (!s) return; stream_session_delete (s); @@ -596,14 +765,14 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, session_type_t sst; int rv; - sst = session_type_from_proto_and_ip (tc->transport_proto, tc->is_ip4); + sst = session_type_from_proto_and_ip (tc->proto, tc->is_ip4); /* Find the server */ listener = listen_session_get (sst, listener_index); server = application_get (listener->app_index); sm = application_get_listen_segment_manager (server, listener); - if ((rv = stream_session_create_i (sm, tc, 1, &s))) + if ((rv = session_alloc_and_init (sm, tc, 1, &s))) return rv; s->app_index = server->index; @@ -629,14 +798,17 @@ stream_session_accept (transport_connection_t * tc, u32 listener_index, * @param app_index Index of the application requesting the connect * @param st Session type requested. * @param tep Remote transport endpoint - * @param res Resulting transport connection . + * @param opaque Opaque data (typically, api_context) the application expects + * on open completion. */ int -stream_session_open (u32 app_index, session_endpoint_t * rmt, - transport_connection_t ** res) +session_open (u32 app_index, session_endpoint_t * rmt, u32 opaque) { transport_connection_t *tc; session_type_t sst; + segment_manager_t *sm; + stream_session_t *s; + application_t *app; int rv; u64 handle; @@ -644,22 +816,45 @@ stream_session_open (u32 app_index, session_endpoint_t * rmt, rv = tp_vfts[sst].open (session_endpoint_to_transport (rmt)); if (rv < 0) { - clib_warning ("Transport failed to open connection."); + SESSION_DBG ("Transport failed to open connection."); return VNET_API_ERROR_SESSION_CONNECT; } tc = tp_vfts[sst].get_half_open ((u32) rv); - /* Save app and tc index. The latter is needed to help establish the - * connection while the former is needed when the connect notify comes - * and we have to notify the external app */ - handle = (((u64) app_index) << 32) | (u64) tc->c_index; - - /* Add to the half-open lookup table */ - session_lookup_add_half_open (tc, handle); + /* If transport offers a stream service, only allocate session once the + * connection has been established. + */ + if (transport_is_stream (rmt->transport_proto)) + { + /* Add connection to half-open table and save app and tc index. The + * latter is needed to help establish the connection while the former + * is needed when the connect notify comes and we have to notify the + * external app + */ + handle = (((u64) app_index) << 32) | (u64) tc->c_index; + session_lookup_add_half_open (tc, handle); + + /* Store api_context (opaque) for when the reply comes. Not the nicest + * thing but better than allocating a separate half-open pool. + */ + tc->s_index = opaque; + } + /* For dgram type of service, allocate session and fifos now. + */ + else + { + app = application_get (app_index); + sm = application_get_connect_segment_manager (app); - *res = tc; + if (session_alloc_and_init (sm, tc, 1, &s)) + return -1; + s->app_index = app->index; + s->session_state = SESSION_STATE_CONNECTING_READY; + /* Tell the app about the new event fifo for this session */ + app->cb_fns.session_connected_callback (app->index, opaque, s, 0); + } return 0; } @@ -672,14 +867,14 @@ stream_session_open (u32 app_index, session_endpoint_t * rmt, * @param tep Local endpoint to be listened on. */ int -stream_session_listen (stream_session_t * s, session_endpoint_t * tep) +stream_session_listen (stream_session_t * s, session_endpoint_t * sep) { transport_connection_t *tc; u32 tci; /* Transport bind/listen */ tci = tp_vfts[s->session_type].bind (s->session_index, - session_endpoint_to_transport (tep)); + session_endpoint_to_transport (sep)); if (tci == (u32) ~ 0) return -1; @@ -694,7 +889,6 @@ stream_session_listen (stream_session_t * s, session_endpoint_t * tep) /* Add to the main lookup table */ session_lookup_add_connection (tc, s->session_index); - return 0; } @@ -726,32 +920,6 @@ stream_session_stop_listen (stream_session_t * s) return 0; } -void -session_send_session_evt_to_thread (u64 session_handle, - fifo_event_type_t evt_type, - u32 thread_index) -{ - static u16 serial_number = 0; - u32 tries = 0; - session_fifo_event_t evt; - unix_shared_memory_queue_t *q; - - /* Fabricate event */ - evt.session_handle = session_handle; - evt.event_type = evt_type; - evt.event_id = serial_number++; - - q = session_manager_get_vpp_event_queue (thread_index); - while (unix_shared_memory_queue_add (q, (u8 *) & evt, 1)) - { - if (tries++ == 3) - { - TCP_DBG ("failed to enqueue evt"); - break; - } - } -} - /** * Disconnect session and propagate to transport. This should eventually * result in a delete notification that allows us to cleanup session state. @@ -837,6 +1005,21 @@ session_type_from_proto_and_ip (transport_proto_t proto, u8 is_ip4) return SESSION_N_TYPES; } +transport_connection_t * +session_get_transport (stream_session_t * s) +{ + if (s->session_state >= SESSION_STATE_READY) + return tp_vfts[s->session_type].get_connection (s->connection_index, + s->thread_index); + return 0; +} + +transport_connection_t * +listen_session_get_transport (stream_session_t * s) +{ + return tp_vfts[s->session_type].get_listener (s->connection_index); +} + int listen_session_get_local_session_endpoint (stream_session_t * listener, session_endpoint_t * sep) @@ -852,7 +1035,7 @@ listen_session_get_local_session_endpoint (stream_session_t * listener, /* N.B. The ip should not be copied because this is the local endpoint */ sep->port = tc->lcl_port; - sep->transport_proto = tc->transport_proto; + sep->transport_proto = tc->proto; sep->is_ip4 = tc->is_ip4; return 0; } @@ -864,7 +1047,7 @@ session_manager_main_enable (vlib_main_t * vm) vlib_thread_main_t *vtm = vlib_get_thread_main (); u32 num_threads; u32 preallocated_sessions_per_worker; - int i; + int i, j; num_threads = 1 /* main thread */ + vtm->n_threads; @@ -877,12 +1060,21 @@ session_manager_main_enable (vlib_main_t * vm) /* configure per-thread ** vectors */ vec_validate (smm->sessions, num_threads - 1); - vec_validate (smm->session_indices_to_enqueue_by_thread, num_threads - 1); vec_validate (smm->tx_buffers, num_threads - 1); vec_validate (smm->pending_event_vector, num_threads - 1); + vec_validate (smm->pending_disconnects, num_threads - 1); vec_validate (smm->free_event_vector, num_threads - 1); - vec_validate (smm->current_enqueue_epoch, num_threads - 1); vec_validate (smm->vpp_event_queues, num_threads - 1); + vec_validate (smm->session_peekers, num_threads - 1); + vec_validate (smm->peekers_readers_locks, num_threads - 1); + vec_validate (smm->peekers_write_locks, num_threads - 1); + + for (i = 0; i < TRANSPORT_N_PROTO; i++) + for (j = 0; j < num_threads; j++) + { + vec_validate (smm->session_to_enqueue[i], num_threads - 1); + vec_validate (smm->current_enqueue_epoch[i], num_threads - 1); + } for (i = 0; i < num_threads; i++) { @@ -890,6 +1082,8 @@ session_manager_main_enable (vlib_main_t * vm) _vec_len (smm->free_event_vector[i]) = 0; vec_validate (smm->pending_event_vector[i], 0); _vec_len (smm->pending_event_vector[i]) = 0; + vec_validate (smm->pending_disconnects[i], 0); + _vec_len (smm->pending_disconnects[i]) = 0; } #if SESSION_DBG @@ -924,6 +1118,7 @@ session_manager_main_enable (vlib_main_t * vm) session_lookup_init (); app_namespaces_init (); + transport_init (); smm->is_enabled = 1; diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h index b1a03d213e9..bd854d4b4c5 100644 --- a/src/vnet/session/session.h +++ b/src/vnet/session/session.h @@ -105,7 +105,7 @@ typedef CLIB_PACKED (struct { rpc_args_t rpc_args; }; u8 event_type; - u16 event_id; + u8 postponed; }) session_fifo_event_t; /* *INDENT-ON* */ @@ -128,17 +128,21 @@ struct _session_manager_main /** Per worker thread session pools */ stream_session_t **sessions; + /** Per worker-thread count of threads peeking into the session pool */ + u32 *session_peekers; + + /** Per worker-thread rw peekers locks */ + clib_spinlock_t *peekers_readers_locks; + clib_spinlock_t *peekers_write_locks; + /** Pool of listen sessions. Same type as stream sessions to ease lookups */ stream_session_t *listen_sessions[SESSION_N_TYPES]; - /** Sparse vector to map dst port to stream server */ - u16 *stream_server_by_dst_port[SESSION_N_TYPES]; - - /** per-worker enqueue epoch counters */ - u8 *current_enqueue_epoch; + /** Per-proto, per-worker enqueue epoch counters */ + u8 *current_enqueue_epoch[TRANSPORT_N_PROTO]; - /** Per-worker thread vector of sessions to enqueue */ - u32 **session_indices_to_enqueue_by_thread; + /** Per-proto, per-worker thread vector of sessions to enqueue */ + u32 **session_to_enqueue[TRANSPORT_N_PROTO]; /** per-worker tx buffer free lists */ u32 **tx_buffers; @@ -149,6 +153,9 @@ struct _session_manager_main /** per-worker active event vectors */ session_fifo_event_t **pending_event_vector; + /** per-worker postponed disconnects */ + session_fifo_event_t **pending_disconnects; + /** vpp fifo event queue */ unix_shared_memory_queue_t **vpp_event_queues; @@ -213,6 +220,8 @@ stream_session_is_valid (u32 si, u8 thread_index) return 1; } +stream_session_t *session_alloc (u32 thread_index); + always_inline stream_session_t * session_get (u32 si, u32 thread_index) { @@ -221,7 +230,7 @@ session_get (u32 si, u32 thread_index) } always_inline stream_session_t * -stream_session_get_if_valid (u64 si, u32 thread_index) +session_get_if_valid (u64 si, u32 thread_index) { if (thread_index >= vec_len (session_manager_main.sessions)) return 0; @@ -234,7 +243,7 @@ stream_session_get_if_valid (u64 si, u32 thread_index) } always_inline u64 -stream_session_handle (stream_session_t * s) +session_handle (stream_session_t * s) { return ((u64) s->thread_index << 32) | (u64) s->session_index; } @@ -267,6 +276,66 @@ session_get_from_handle (u64 handle) session_index_from_handle (handle)); } +/** + * Acquires a lock that blocks a session pool from expanding. + * + * This is typically used for safely peeking into other threads' + * pools in order to clone elements. Lock should be dropped as soon + * as possible by calling @ref session_pool_remove_peeker. + * + * NOTE: Avoid using pool_elt_at_index while the lock is held because + * it may lead to free elt bitmap expansion/contraction! + */ +always_inline void +session_pool_add_peeker (u32 thread_index) +{ + session_manager_main_t *smm = &session_manager_main; + if (thread_index == vlib_get_thread_index ()) + return; + clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]); + smm->session_peekers[thread_index] += 1; + if (smm->session_peekers[thread_index] == 1) + clib_spinlock_lock_if_init (&smm->peekers_write_locks[thread_index]); + clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]); +} + +always_inline void +session_pool_remove_peeker (u32 thread_index) +{ + session_manager_main_t *smm = &session_manager_main; + if (thread_index == vlib_get_thread_index ()) + return; + ASSERT (session_manager_main.session_peekers[thread_index] > 0); + clib_spinlock_lock_if_init (&smm->peekers_readers_locks[thread_index]); + smm->session_peekers[thread_index] -= 1; + if (smm->session_peekers[thread_index] == 0) + clib_spinlock_unlock_if_init (&smm->peekers_write_locks[thread_index]); + clib_spinlock_unlock_if_init (&smm->peekers_readers_locks[thread_index]); +} + +/** + * Get session from handle and 'lock' pool resize if not in same thread + * + * Caller should drop the peek 'lock' as soon as possible. + */ +always_inline stream_session_t * +session_get_from_handle_safe (u64 handle) +{ + session_manager_main_t *smm = &session_manager_main; + u32 thread_index = session_thread_from_handle (handle); + if (thread_index == vlib_get_thread_index ()) + { + return pool_elt_at_index (smm->sessions[thread_index], + session_index_from_handle (handle)); + } + else + { + session_pool_add_peeker (thread_index); + /* Don't use pool_elt_at index. See @ref session_pool_add_peeker */ + return smm->sessions[thread_index] + session_index_from_handle (handle); + } +} + always_inline stream_session_t * stream_session_listener_get (u8 sst, u64 si) { @@ -296,17 +365,52 @@ stream_session_rx_fifo_size (transport_connection_t * tc) return s->server_rx_fifo->nitems; } +always_inline u32 +session_get_index (stream_session_t * s) +{ + return (s - session_manager_main.sessions[s->thread_index]); +} + +always_inline stream_session_t * +session_clone_safe (u32 session_index, u32 thread_index) +{ + stream_session_t *old_s, *new_s; + u32 current_thread_index = vlib_get_thread_index (); + + /* If during the memcpy pool is reallocated AND the memory allocator + * decides to give the old chunk of memory to somebody in a hurry to + * scribble something on it, we have a problem. So add this thread as + * a session pool peeker. + */ + session_pool_add_peeker (thread_index); + new_s = session_alloc (current_thread_index); + old_s = session_manager_main.sessions[thread_index] + session_index; + clib_memcpy (new_s, old_s, sizeof (*new_s)); + session_pool_remove_peeker (thread_index); + new_s->thread_index = current_thread_index; + new_s->session_index = session_get_index (new_s); + return new_s; +} + +transport_connection_t *session_get_transport (stream_session_t * s); + u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc); +stream_session_t *session_alloc (u32 thread_index); int -stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b, - u32 offset, u8 queue_event, u8 is_in_order); -int -stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, - u32 offset, u32 max_bytes); +session_enqueue_stream_connection (transport_connection_t * tc, + vlib_buffer_t * b, u32 offset, + u8 queue_event, u8 is_in_order); +int session_enqueue_dgram_connection (stream_session_t * s, vlib_buffer_t * b, + u8 proto, u8 queue_event); +int stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer, + u32 offset, u32 max_bytes); u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes); -int stream_session_connect_notify (transport_connection_t * tc, u8 is_fail); +int session_stream_connect_notify (transport_connection_t * tc, u8 is_fail); +int session_dgram_connect_notify (transport_connection_t * tc, + u32 old_thread_index, + stream_session_t ** new_session); void stream_session_init_fifos_pointers (transport_connection_t * tc, u32 rx_pointer, u32 tx_pointer); @@ -314,12 +418,9 @@ void stream_session_accept_notify (transport_connection_t * tc); void stream_session_disconnect_notify (transport_connection_t * tc); void stream_session_delete_notify (transport_connection_t * tc); void stream_session_reset_notify (transport_connection_t * tc); -int -stream_session_accept (transport_connection_t * tc, u32 listener_index, - u8 notify); -int -stream_session_open (u32 app_index, session_endpoint_t * tep, - transport_connection_t ** tc); +int stream_session_accept (transport_connection_t * tc, u32 listener_index, + u8 notify); +int session_open (u32 app_index, session_endpoint_t * tep, u32 opaque); int stream_session_listen (stream_session_t * s, session_endpoint_t * tep); int stream_session_stop_listen (stream_session_t * s); void stream_session_disconnect (stream_session_t * s); @@ -346,7 +447,7 @@ session_manager_get_vpp_event_queue (u32 thread_index) return session_manager_main.vpp_event_queues[thread_index]; } -int session_manager_flush_enqueue_events (u32 thread_index); +int session_manager_flush_enqueue_events (u8 proto, u32 thread_index); always_inline u64 listen_session_get_handle (stream_session_t * s) @@ -400,6 +501,8 @@ listen_session_del (stream_session_t * s) pool_put (session_manager_main.listen_sessions[s->session_type], s); } +transport_connection_t *listen_session_get_transport (stream_session_t * s); + int listen_session_get_local_session_endpoint (stream_session_t * listener, session_endpoint_t * sep); diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c index 5bfca7be6fc..432c7ba6013 100755 --- a/src/vnet/session/session_api.c +++ b/src/vnet/session/session_api.c @@ -99,10 +99,10 @@ send_session_accept_callback (stream_session_t * s) mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_ACCEPT_SESSION); mp->context = server->index; listener = listen_session_get (s->session_type, s->listener_index); - tp_vft = session_get_transport_vft (s->session_type); + tp_vft = transport_protocol_get_vft (s->session_type); tc = tp_vft->get_connection (s->connection_index, s->thread_index); mp->listener_handle = listen_session_get_handle (listener); - mp->handle = stream_session_handle (s); + mp->handle = session_handle (s); mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo); mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo); mp->vpp_event_queue_address = pointer_to_uword (vpp_queue); @@ -129,7 +129,7 @@ send_session_disconnect_callback (stream_session_t * s) mp = vl_msg_api_alloc (sizeof (*mp)); memset (mp, 0, sizeof (*mp)); mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_DISCONNECT_SESSION); - mp->handle = stream_session_handle (s); + mp->handle = session_handle (s); vl_msg_api_send_shmem (q, (u8 *) & mp); } @@ -148,7 +148,7 @@ send_session_reset_callback (stream_session_t * s) mp = vl_msg_api_alloc (sizeof (*mp)); memset (mp, 0, sizeof (*mp)); mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_RESET_SESSION); - mp->handle = stream_session_handle (s); + mp->handle = session_handle (s); vl_msg_api_send_shmem (q, (u8 *) & mp); } @@ -175,7 +175,7 @@ send_session_connected_callback (u32 app_index, u32 api_context, vpp_queue = session_manager_get_vpp_event_queue (s->thread_index); mp->server_rx_fifo = pointer_to_uword (s->server_rx_fifo); mp->server_tx_fifo = pointer_to_uword (s->server_tx_fifo); - mp->handle = stream_session_handle (s); + mp->handle = session_handle (s); mp->vpp_event_queue_address = pointer_to_uword (vpp_queue); mp->retval = 0; } @@ -463,11 +463,14 @@ vl_api_connect_uri_t_handler (vl_api_connect_uri_t * mp) rv = VNET_API_ERROR_APPLICATION_NOT_ATTACHED; } + /* + * Don't reply to stream (tcp) connects. The reply will come once + * the connection is established. In case of the redirects, the reply + * will come from the server app. + */ if (rv == 0 || rv == VNET_API_ERROR_SESSION_REDIRECT) return; - /* Got some error, relay it */ - done: /* *INDENT-OFF* */ REPLY_MACRO (VL_API_CONNECT_SESSION_REPLY); @@ -540,7 +543,7 @@ vl_api_reset_session_reply_t_handler (vl_api_reset_session_reply_t * mp) return; session_parse_handle (mp->handle, &index, &thread_index); - s = stream_session_get_if_valid (index, thread_index); + s = session_get_if_valid (index, thread_index); if (s == 0 || app->index != s->app_index) { clib_warning ("Invalid session!"); @@ -576,7 +579,7 @@ vl_api_accept_session_reply_t_handler (vl_api_accept_session_reply_t * mp) else { session_parse_handle (mp->handle, &session_index, &thread_index); - s = stream_session_get_if_valid (session_index, thread_index); + s = session_get_if_valid (session_index, thread_index); if (!s) { clib_warning ("session doesn't exist"); @@ -623,8 +626,8 @@ vl_api_bind_sock_t_handler (vl_api_bind_sock_t * mp) a->sep.port = mp->port; a->sep.fib_index = mp->vrf; a->sep.sw_if_index = ENDPOINT_INVALID_INDEX; + a->sep.transport_proto = mp->proto; a->app_index = app->index; - a->proto = mp->proto; if ((error = vnet_bind (a))) { diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c index 588cb603d39..f0f490daa21 100755 --- a/src/vnet/session/session_cli.c +++ b/src/vnet/session/session_cli.c @@ -55,7 +55,7 @@ format_stream_session (u8 * s, va_list * args) int verbose = va_arg (*args, int); transport_proto_vft_t *tp_vft; u8 *str = 0; - tp_vft = session_get_transport_vft (ss->session_type); + tp_vft = transport_protocol_get_vft (ss->session_type); if (verbose == 1 && ss->session_state >= SESSION_STATE_ACCEPTING) str = format (0, "%-10u%-10u%-10lld", @@ -63,9 +63,7 @@ format_stream_session (u8 * s, va_list * args) svm_fifo_max_enqueue (ss->server_tx_fifo), stream_session_get_index (ss)); - if (ss->session_state == SESSION_STATE_READY - || ss->session_state == SESSION_STATE_ACCEPTING - || ss->session_state == SESSION_STATE_CLOSED) + if (ss->session_state >= SESSION_STATE_ACCEPTING) { s = format (s, "%U", tp_vft->format_connection, ss->connection_index, ss->thread_index, verbose); @@ -146,16 +144,17 @@ unformat_stream_session (unformat_input_t * input, va_list * args) return 0; if (is_ip4) - s = session_lookup4 (fib_index, &lcl.ip4, &rmt.ip4, - clib_host_to_net_u16 (lcl_port), - clib_host_to_net_u16 (rmt_port), proto); + s = session_lookup_safe4 (fib_index, &lcl.ip4, &rmt.ip4, + clib_host_to_net_u16 (lcl_port), + clib_host_to_net_u16 (rmt_port), proto); else - s = session_lookup6 (fib_index, &lcl.ip6, &rmt.ip6, - clib_host_to_net_u16 (lcl_port), - clib_host_to_net_u16 (rmt_port), proto); + s = session_lookup_safe6 (fib_index, &lcl.ip6, &rmt.ip6, + clib_host_to_net_u16 (lcl_port), + clib_host_to_net_u16 (rmt_port), proto); if (s) { *result = s; + session_pool_remove_peeker (s->thread_index); return 1; } return 0; @@ -324,7 +323,7 @@ clear_session_command_fn (vlib_main_t * vm, unformat_input_t * input, if (session_index != ~0) { - session = stream_session_get_if_valid (session_index, thread_index); + session = session_get_if_valid (session_index, thread_index); if (!session) return clib_error_return (0, "no session %d on thread %d", session_index, thread_index); diff --git a/src/vnet/session/session_lookup.c b/src/vnet/session/session_lookup.c index 796d93ec33e..740c5a6d533 100644 --- a/src/vnet/session/session_lookup.c +++ b/src/vnet/session/session_lookup.c @@ -116,7 +116,7 @@ always_inline void make_v4_ss_kv_from_tc (session_kv4_t * kv, transport_connection_t * t) { make_v4_ss_kv (kv, &t->lcl_ip.ip4, &t->rmt_ip.ip4, t->lcl_port, t->rmt_port, - session_type_from_proto_and_ip (t->transport_proto, 1)); + session_type_from_proto_and_ip (t->proto, 1)); } always_inline void @@ -159,7 +159,7 @@ always_inline void make_v6_ss_kv_from_tc (session_kv6_t * kv, transport_connection_t * t) { make_v6_ss_kv (kv, &t->lcl_ip.ip6, &t->rmt_ip.ip6, t->lcl_port, t->rmt_port, - session_type_from_proto_and_ip (t->transport_proto, 0)); + session_type_from_proto_and_ip (t->proto, 0)); } @@ -339,7 +339,7 @@ session_lookup_del_session (stream_session_t * s) return session_lookup_del_connection (ts); } -u32 +u64 session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep) { session_table_t *st; @@ -349,14 +349,14 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep) st = session_table_get (table_index); if (!st) - return SESSION_INVALID_INDEX; + return SESSION_INVALID_HANDLE; if (sep->is_ip4) { make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port, sep->transport_proto); rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4); if (rv == 0) - return (u32) kv4.value; + return kv4.value; } else { @@ -364,9 +364,43 @@ session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep) sep->transport_proto); rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6); if (rv == 0) - return (u32) kv6.value; + return kv6.value; } - return SESSION_INVALID_INDEX; + return SESSION_INVALID_HANDLE; +} + +stream_session_t * +session_lookup_global_session_endpoint (session_endpoint_t * sep) +{ + session_table_t *st; + session_kv4_t kv4; + session_kv6_t kv6; + u8 fib_proto; + u32 table_index; + int rv; + + fib_proto = session_endpoint_fib_proto (sep); + table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index); + st = session_table_get (table_index); + if (!st) + return 0; + if (sep->is_ip4) + { + make_v4_listener_kv (&kv4, &sep->ip.ip4, sep->port, + sep->transport_proto); + rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4); + if (rv == 0) + return session_get_from_handle (kv4.value); + } + else + { + make_v6_listener_kv (&kv6, &sep->ip.ip6, sep->port, + sep->transport_proto); + rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6); + if (rv == 0) + return session_get_from_handle (kv6.value); + } + return 0; } u32 @@ -562,7 +596,7 @@ session_lookup_half_open_handle (transport_connection_t * tc) if (tc->is_ip4) { make_v4_ss_kv (&kv4, &tc->lcl_ip.ip4, &tc->rmt_ip.ip4, tc->lcl_port, - tc->rmt_port, tc->transport_proto); + tc->rmt_port, tc->proto); rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4); if (rv == 0) return kv4.value; @@ -570,7 +604,7 @@ session_lookup_half_open_handle (transport_connection_t * tc) else { make_v6_ss_kv (&kv6, &tc->lcl_ip.ip6, &tc->rmt_ip.ip6, tc->lcl_port, - tc->rmt_port, tc->transport_proto); + tc->rmt_port, tc->proto); rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6); if (rv == 0) return kv6.value; @@ -713,12 +747,19 @@ session_lookup_connection4 (u32 fib_index, ip4_address_t * lcl, /** * Lookup session with ip4 and transport layer information * - * Lookup logic is identical to that of @ref session_lookup_connection_wt4 but - * this returns a session as opposed to a transport connection; + * Important note: this may look into another thread's pool table and + * register as 'peeker'. Caller should call @ref session_pool_remove_peeker as + * if needed as soon as possible. + * + * Lookup logic is similar to that of @ref session_lookup_connection_wt4 but + * this returns a session as opposed to a transport connection and it does not + * try to lookup half-open sessions. + * + * Typically used by dgram connections */ stream_session_t * -session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt, - u16 lcl_port, u16 rmt_port, u8 proto) +session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt, + u16 lcl_port, u16 rmt_port, u8 proto) { session_table_t *st; session_kv4_t kv4; @@ -733,16 +774,11 @@ session_lookup4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt, make_v4_ss_kv (&kv4, lcl, rmt, lcl_port, rmt_port, proto); rv = clib_bihash_search_inline_16_8 (&st->v4_session_hash, &kv4); if (rv == 0) - return session_get_from_handle (kv4.value); + return session_get_from_handle_safe (kv4.value); /* If nothing is found, check if any listener is available */ if ((s = session_lookup_listener4_i (st, lcl, lcl_port, proto))) return s; - - /* Finally, try half-open connections */ - rv = clib_bihash_search_inline_16_8 (&st->v4_half_open_hash, &kv4); - if (rv == 0) - return session_get_from_handle (kv4.value); return 0; } @@ -868,12 +904,19 @@ session_lookup_connection6 (u32 fib_index, ip6_address_t * lcl, /** * Lookup session with ip6 and transport layer information * - * Lookup logic is identical to that of @ref session_lookup_connection_wt6 but - * this returns a session as opposed to a transport connection; + * Important note: this may look into another thread's pool table and + * register as 'peeker'. Caller should call @ref session_pool_remove_peeker as + * if needed as soon as possible. + * + * Lookup logic is similar to that of @ref session_lookup_connection_wt6 but + * this returns a session as opposed to a transport connection and it does not + * try to lookup half-open sessions. + * + * Typically used by dgram connections */ stream_session_t * -session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt, - u16 lcl_port, u16 rmt_port, u8 proto) +session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt, + u16 lcl_port, u16 rmt_port, u8 proto) { session_table_t *st; session_kv6_t kv6; @@ -887,16 +930,11 @@ session_lookup6 (u32 fib_index, ip6_address_t * lcl, ip6_address_t * rmt, make_v6_ss_kv (&kv6, lcl, rmt, lcl_port, rmt_port, proto); rv = clib_bihash_search_inline_48_8 (&st->v6_session_hash, &kv6); if (rv == 0) - return session_get_from_handle (kv6.value); + return session_get_from_handle_safe (kv6.value); /* If nothing is found, check if any listener is available */ if ((s = session_lookup_listener6_i (st, lcl, lcl_port, proto))) return s; - - /* Finally, try half-open connections */ - rv = clib_bihash_search_inline_48_8 (&st->v6_half_open_hash, &kv6); - if (rv == 0) - return session_get_from_handle (kv6.value); return 0; } diff --git a/src/vnet/session/session_lookup.h b/src/vnet/session/session_lookup.h index 20cbaf2acd6..449f8f4e2d2 100644 --- a/src/vnet/session/session_lookup.h +++ b/src/vnet/session/session_lookup.h @@ -20,12 +20,12 @@ #include #include -stream_session_t *session_lookup4 (u32 fib_index, ip4_address_t * lcl, - ip4_address_t * rmt, u16 lcl_port, - u16 rmt_port, u8 proto); -stream_session_t *session_lookup6 (u32 fib_index, ip6_address_t * lcl, - ip6_address_t * rmt, u16 lcl_port, - u16 rmt_port, u8 proto); +stream_session_t *session_lookup_safe4 (u32 fib_index, ip4_address_t * lcl, + ip4_address_t * rmt, u16 lcl_port, + u16 rmt_port, u8 proto); +stream_session_t *session_lookup_safe6 (u32 fib_index, ip6_address_t * lcl, + ip6_address_t * rmt, u16 lcl_port, + u16 rmt_port, u8 proto); transport_connection_t *session_lookup_connection_wt4 (u32 fib_index, ip4_address_t * lcl, ip4_address_t * rmt, @@ -58,10 +58,12 @@ stream_session_t *session_lookup_listener (u32 table_index, session_endpoint_t * sep); int session_lookup_add_connection (transport_connection_t * tc, u64 value); int session_lookup_del_connection (transport_connection_t * tc); -u32 session_lookup_session_endpoint (u32 table_index, +u64 session_lookup_session_endpoint (u32 table_index, session_endpoint_t * sep); u32 session_lookup_local_session_endpoint (u32 table_index, session_endpoint_t * sep); +stream_session_t *session_lookup_global_session_endpoint (session_endpoint_t + *); int session_lookup_add_session_endpoint (u32 table_index, session_endpoint_t * sep, u64 value); int session_lookup_del_session_endpoint (u32 table_index, diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c index d2291fa38de..cbe936ccd86 100644 --- a/src/vnet/session/session_node.c +++ b/src/vnet/session/session_node.c @@ -154,7 +154,7 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node, next_index = next0 = session_type_to_next[s0->session_type]; - transport_vft = session_get_transport_vft (s0->session_type); + transport_vft = transport_protocol_get_vft (s0->session_type); tc0 = transport_vft->get_connection (s0->connection_index, thread_index); /* Make sure we have space to send and there's something to dequeue */ @@ -401,8 +401,7 @@ session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node, always_inline stream_session_t * session_event_get_session (session_fifo_event_t * e, u8 thread_index) { - return stream_session_get_if_valid (e->fifo->master_session_index, - thread_index); + return session_get_if_valid (e->fifo->master_session_index, thread_index); } void @@ -540,7 +539,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * frame) { session_manager_main_t *smm = vnet_get_session_manager_main (); - session_fifo_event_t *my_pending_event_vector, *e; + session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e; session_fifo_event_t *my_fifo_events; u32 n_to_dequeue, n_events; unix_shared_memory_queue_t *q; @@ -570,8 +569,10 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, /* min number of events we can dequeue without blocking */ n_to_dequeue = q->cursize; my_pending_event_vector = smm->pending_event_vector[my_thread_index]; + pending_disconnects = smm->pending_disconnects[my_thread_index]; - if (n_to_dequeue == 0 && vec_len (my_pending_event_vector) == 0) + if (!n_to_dequeue && !vec_len (my_pending_event_vector) + && !vec_len (pending_disconnects)) return 0; SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0); @@ -603,9 +604,11 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, pthread_mutex_unlock (&q->mutex); vec_append (my_fifo_events, my_pending_event_vector); + vec_append (my_fifo_events, smm->pending_disconnects[my_thread_index]); _vec_len (my_pending_event_vector) = 0; smm->pending_event_vector[my_thread_index] = my_pending_event_vector; + _vec_len (smm->pending_disconnects[my_thread_index]) = 0; skip_dequeue: n_events = vec_len (my_fifo_events); @@ -644,6 +647,13 @@ skip_dequeue: } break; case FIFO_EVENT_DISCONNECT: + /* Make sure disconnects run after the pending list is drained */ + if (!e0->postponed) + { + e0->postponed = 1; + vec_add1 (smm->pending_disconnects[my_thread_index], *e0); + continue; + } s0 = session_get_from_handle (e0->session_handle); stream_session_disconnect (s0); break; diff --git a/src/vnet/session/session_table.h b/src/vnet/session/session_table.h index ce0b4a2ff25..5e0564043f3 100644 --- a/src/vnet/session/session_table.h +++ b/src/vnet/session/session_table.h @@ -37,6 +37,7 @@ typedef struct _session_lookup_table #define SESSION_TABLE_INVALID_INDEX ((u32)~0) #define SESSION_LOCAL_TABLE_PREFIX ((u32)~0) #define SESSION_INVALID_INDEX ((u32)~0) +#define SESSION_INVALID_HANDLE ((u64)~0) typedef int (*ip4_session_table_walk_fn_t) (clib_bihash_kv_16_8_t * kvp, void *ctx); diff --git a/src/vnet/session/session_test.c b/src/vnet/session/session_test.c index b46b33d1396..433c20e5e1d 100644 --- a/src/vnet/session/session_test.c +++ b/src/vnet/session/session_test.c @@ -260,8 +260,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) SESSION_TEST ((s->app_index == server_index), "app_index should be that of " "the server"); server_local_st_index = application_local_session_table (server); - local_listener = session_lookup_session_endpoint (server_local_st_index, - &server_sep); + local_listener = + session_lookup_local_session_endpoint (server_local_st_index, + &server_sep); SESSION_TEST ((local_listener != SESSION_INVALID_INDEX), "listener should exist in local table"); @@ -312,8 +313,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) s = session_lookup_listener (server_st_index, &server_sep); SESSION_TEST ((s == 0), "listener should not exist in global table"); - local_listener = session_lookup_session_endpoint (server_local_st_index, - &server_sep); + local_listener = + session_lookup_local_session_endpoint (server_local_st_index, + &server_sep); SESSION_TEST ((s == 0), "listener should not exist in local table"); detach_args.app_index = server_index; @@ -337,8 +339,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) s = session_lookup_listener (server_st_index, &server_sep); SESSION_TEST ((s == 0), "listener should not exist in global table"); server_local_st_index = application_local_session_table (server); - local_listener = session_lookup_session_endpoint (server_local_st_index, - &server_sep); + local_listener = + session_lookup_local_session_endpoint (server_local_st_index, + &server_sep); SESSION_TEST ((local_listener != SESSION_INVALID_INDEX), "listener should exist in local table"); @@ -346,8 +349,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) error = vnet_unbind (&unbind_args); SESSION_TEST ((error == 0), "unbind should work"); - local_listener = session_lookup_session_endpoint (server_local_st_index, - &server_sep); + local_listener = + session_lookup_local_session_endpoint (server_local_st_index, + &server_sep); SESSION_TEST ((local_listener == SESSION_INVALID_INDEX), "listener should not exist in local table"); @@ -417,8 +421,9 @@ session_test_namespace (vlib_main_t * vm, unformat_input_t * input) SESSION_TEST ((s->app_index == server_index), "app_index should be that of " "the server"); server_local_st_index = application_local_session_table (server); - local_listener = session_lookup_session_endpoint (server_local_st_index, - &server_sep); + local_listener = + session_lookup_local_session_endpoint (server_local_st_index, + &server_sep); SESSION_TEST ((local_listener != SESSION_INVALID_INDEX), "zero listener should exist in local table"); detach_args.app_index = server_index; diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h index 1ed6e0b9eec..51d5065059b 100644 --- a/src/vnet/session/stream_session.h +++ b/src/vnet/session/stream_session.h @@ -43,6 +43,7 @@ typedef enum SESSION_STATE_CONNECTING, SESSION_STATE_ACCEPTING, SESSION_STATE_READY, + SESSION_STATE_CONNECTING_READY, SESSION_STATE_CLOSED, SESSION_STATE_N_STATES, } stream_session_state_t; diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c new file mode 100644 index 00000000000..fc722e45668 --- /dev/null +++ b/src/vnet/session/transport.c @@ -0,0 +1,306 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +/** + * Per-type vector of transport protocol virtual function tables + */ +transport_proto_vft_t *tp_vfts; + +/* + * Port allocator seed + */ +static u32 port_allocator_seed; + +/* + * Local endpoints table + */ +static transport_endpoint_table_t local_endpoints_table; + +/* + * Pool of local endpoints + */ +static transport_endpoint_t *local_endpoints; + +/* + * Local endpoints pool lock + */ +static clib_spinlock_t local_endpoints_lock; + + +u32 +transport_endpoint_lookup (transport_endpoint_table_t * ht, u8 proto, + ip46_address_t * ip, u16 port) +{ + clib_bihash_kv_24_8_t kv; + int rv; + + kv.key[0] = ip->as_u64[0]; + kv.key[1] = ip->as_u64[1]; + kv.key[2] = (u64) port << 8 | (u64) proto; + + rv = clib_bihash_search_inline_24_8 (ht, &kv); + if (rv == 0) + return kv.value; + + return ENDPOINT_INVALID_INDEX; +} + +void +transport_endpoint_table_add (transport_endpoint_table_t * ht, u8 proto, + transport_endpoint_t * te, u32 value) +{ + clib_bihash_kv_24_8_t kv; + + kv.key[0] = te->ip.as_u64[0]; + kv.key[1] = te->ip.as_u64[1]; + kv.key[2] = (u64) te->port << 8 | (u64) proto; + kv.value = value; + + clib_bihash_add_del_24_8 (ht, &kv, 1); +} + +void +transport_endpoint_table_del (transport_endpoint_table_t * ht, u8 proto, + transport_endpoint_t * te) +{ + clib_bihash_kv_24_8_t kv; + + kv.key[0] = te->ip.as_u64[0]; + kv.key[1] = te->ip.as_u64[1]; + kv.key[2] = (u64) te->port << 8 | (u64) proto; + + clib_bihash_add_del_24_8 (ht, &kv, 0); +} + +/** + * Register transport virtual function table. + * + * @param type - session type (not protocol type) + * @param vft - virtual function table + */ +void +transport_register_protocol (transport_proto_t transport_proto, u8 is_ip4, + const transport_proto_vft_t * vft) +{ + u8 session_type; + session_type = session_type_from_proto_and_ip (transport_proto, is_ip4); + + vec_validate (tp_vfts, session_type); + tp_vfts[session_type] = *vft; + + /* If an offset function is provided, then peek instead of dequeue */ + session_manager_set_transport_rx_fn (session_type, + vft->tx_fifo_offset != 0); +} + +/** + * Get transport virtual function table + * + * @param type - session type (not protocol type) + */ +transport_proto_vft_t * +transport_protocol_get_vft (u8 session_type) +{ + if (session_type >= vec_len (tp_vfts)) + return 0; + return &tp_vfts[session_type]; +} + +#define PORT_MASK ((1 << 16)- 1) + +void +transport_endpoint_del (u32 tepi) +{ + clib_spinlock_lock_if_init (&local_endpoints_lock); + pool_put_index (local_endpoints, tepi); + clib_spinlock_unlock_if_init (&local_endpoints_lock); +} + +always_inline transport_endpoint_t * +transport_endpoint_new (void) +{ + transport_endpoint_t *tep; + pool_get (local_endpoints, tep); + return tep; +} + +void +transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port) +{ + u32 tepi; + transport_endpoint_t *tep; + + /* Cleanup local endpoint if this was an active connect */ + tepi = transport_endpoint_lookup (&local_endpoints_table, proto, lcl_ip, + clib_net_to_host_u16 (port)); + if (tepi != ENDPOINT_INVALID_INDEX) + { + tep = pool_elt_at_index (local_endpoints, tepi); + transport_endpoint_table_del (&local_endpoints_table, proto, tep); + transport_endpoint_del (tepi); + } +} + +/** + * Allocate local port and add if successful add entry to local endpoint + * table to mark the pair as used. + */ +int +transport_alloc_local_port (u8 proto, ip46_address_t * ip) +{ + transport_endpoint_t *tep; + u32 tei; + u16 min = 1024, max = 65535; /* XXX configurable ? */ + int tries, limit; + + limit = max - min; + + /* Only support active opens from thread 0 */ + ASSERT (vlib_get_thread_index () == 0); + + /* Search for first free slot */ + for (tries = 0; tries < limit; tries++) + { + u16 port = 0; + + /* Find a port in the specified range */ + while (1) + { + port = random_u32 (&port_allocator_seed) & PORT_MASK; + if (PREDICT_TRUE (port >= min && port < max)) + break; + } + + /* Look it up. If not found, we're done */ + tei = transport_endpoint_lookup (&local_endpoints_table, proto, ip, + port); + if (tei == ENDPOINT_INVALID_INDEX) + { + clib_spinlock_lock_if_init (&local_endpoints_lock); + tep = transport_endpoint_new (); + clib_memcpy (&tep->ip, ip, sizeof (*ip)); + tep->port = port; + transport_endpoint_table_add (&local_endpoints_table, proto, tep, + tep - local_endpoints); + clib_spinlock_unlock_if_init (&local_endpoints_lock); + + return tep->port; + } + } + return -1; +} + +int +transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt, + ip46_address_t * lcl_addr, u16 * lcl_port) +{ + fib_prefix_t prefix; + fib_node_index_t fei; + u32 sw_if_index; + int port; + + /* + * Find the local address and allocate port + */ + + /* Find a FIB path to the destination */ + clib_memcpy (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip)); + prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; + prefix.fp_len = rmt->is_ip4 ? 32 : 128; + + ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX); + fei = fib_table_lookup (rmt->fib_index, &prefix); + + /* Couldn't find route to destination. Bail out. */ + if (fei == FIB_NODE_INDEX_INVALID) + { + clib_warning ("no route to destination"); + return -1; + } + + sw_if_index = rmt->sw_if_index; + if (sw_if_index == ENDPOINT_INVALID_INDEX) + sw_if_index = fib_entry_get_resolving_interface (fei); + + if (sw_if_index == ENDPOINT_INVALID_INDEX) + { + clib_warning ("no resolving interface for %U", format_ip46_address, + &rmt->ip, (rmt->is_ip4 == 0) + 1); + return -1; + } + + memset (lcl_addr, 0, sizeof (*lcl_addr)); + + if (rmt->is_ip4) + { + ip4_address_t *ip4; + ip4 = ip_interface_get_first_ip (sw_if_index, 1); + lcl_addr->ip4.as_u32 = ip4->as_u32; + } + else + { + ip6_address_t *ip6; + ip6 = ip_interface_get_first_ip (sw_if_index, 0); + if (ip6 == 0) + { + clib_warning ("no routable ip6 addresses on %U", + format_vnet_sw_if_index_name, vnet_get_main (), + sw_if_index); + return -1; + } + clib_memcpy (&lcl_addr->ip6, ip6, sizeof (*ip6)); + } + + /* Allocate source port */ + port = transport_alloc_local_port (proto, lcl_addr); + if (port < 1) + { + clib_warning ("Failed to allocate src port"); + return -1; + } + *lcl_port = port; + return 0; +} + +void +transport_init (void) +{ + vlib_thread_main_t *vtm = vlib_get_thread_main (); + u32 local_endpoints_table_buckets = 250000; + u32 local_endpoints_table_memory = 512 << 20; + u32 num_threads; + + /* Initialize [port-allocator] random number seed */ + port_allocator_seed = (u32) clib_cpu_time_now (); + + clib_bihash_init_24_8 (&local_endpoints_table, "local endpoints table", + local_endpoints_table_buckets, + local_endpoints_table_memory); + num_threads = 1 /* main thread */ + vtm->n_threads; + if (num_threads > 1) + clib_spinlock_init (&local_endpoints_lock); +} + +/* + * fd.io coding-style-patch-verification: ON + * + * Local Variables: + * eval: (c-set-style "gnu") + * End: + */ diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h index 8c299c46490..f2cc80bb23a 100644 --- a/src/vnet/session/transport.h +++ b/src/vnet/session/transport.h @@ -29,7 +29,7 @@ typedef struct _transport_connection ip46_address_t lcl_ip; /**< Local IP */ u16 lcl_port; /**< Local port */ u16 rmt_port; /**< Remote port */ - u8 transport_proto; /**< Protocol id */ + u8 proto; /**< Protocol id */ u8 is_ip4; /**< Flag if IP4 connection */ u32 fib_index; /**< Network namespace */ @@ -54,7 +54,7 @@ typedef struct _transport_connection #define c_rmt_ip6 connection.rmt_ip.ip6 #define c_lcl_port connection.lcl_port #define c_rmt_port connection.rmt_port -#define c_transport_proto connection.transport_proto +#define c_proto connection.proto #define c_fib_index connection.fib_index #define c_s_index connection.s_index #define c_c_index connection.c_index @@ -69,7 +69,8 @@ typedef struct _transport_connection typedef enum _transport_proto { TRANSPORT_PROTO_TCP, - TRANSPORT_PROTO_UDP + TRANSPORT_PROTO_UDP, + TRANSPORT_N_PROTO } transport_proto_t; #define foreach_transport_connection_fields \ @@ -86,6 +87,8 @@ typedef struct _transport_endpoint #undef _ } transport_endpoint_t; +typedef clib_bihash_24_8_t transport_endpoint_table_t; + #define ENDPOINT_INVALID_INDEX ((u32)~0) always_inline u8 @@ -94,6 +97,31 @@ transport_connection_fib_proto (transport_connection_t * tc) return tc->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; } +always_inline u8 +transport_endpoint_fib_proto (transport_endpoint_t * tep) +{ + return tep->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; +} + +always_inline u8 +transport_is_stream (u8 proto) +{ + return (proto == TRANSPORT_PROTO_TCP); +} + +always_inline u8 +transport_is_dgram (u8 proto) +{ + return (proto == TRANSPORT_PROTO_UDP); +} + +int transport_alloc_local_port (u8 proto, ip46_address_t * ip); +int transport_alloc_local_endpoint (u8 proto, transport_endpoint_t * rmt, + ip46_address_t * lcl_addr, + u16 * lcl_port); +void transport_endpoint_cleanup (u8 proto, ip46_address_t * lcl_ip, u16 port); +void transport_init (void); + #endif /* VNET_VNET_URI_TRANSPORT_H_ */ /* diff --git a/src/vnet/session/transport_interface.c b/src/vnet/session/transport_interface.c deleted file mode 100644 index ef8d1e49524..00000000000 --- a/src/vnet/session/transport_interface.c +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2017 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -/** - * Per-type vector of transport protocol virtual function tables - */ -transport_proto_vft_t *tp_vfts; - -u32 -transport_endpoint_lookup (transport_endpoint_table_t * ht, - ip46_address_t * ip, u16 port) -{ - clib_bihash_kv_24_8_t kv; - int rv; - - kv.key[0] = ip->as_u64[0]; - kv.key[1] = ip->as_u64[1]; - kv.key[2] = port; - - rv = clib_bihash_search_inline_24_8 (ht, &kv); - if (rv == 0) - return kv.value; - - return TRANSPORT_ENDPOINT_INVALID_INDEX; -} - -void -transport_endpoint_table_add (transport_endpoint_table_t * ht, - transport_endpoint_t * te, u32 value) -{ - clib_bihash_kv_24_8_t kv; - - kv.key[0] = te->ip.as_u64[0]; - kv.key[1] = te->ip.as_u64[1]; - kv.key[2] = te->port; - kv.value = value; - - clib_bihash_add_del_24_8 (ht, &kv, 1); -} - -void -transport_endpoint_table_del (transport_endpoint_table_t * ht, - transport_endpoint_t * te) -{ - clib_bihash_kv_24_8_t kv; - - kv.key[0] = te->ip.as_u64[0]; - kv.key[1] = te->ip.as_u64[1]; - kv.key[2] = te->port; - - clib_bihash_add_del_24_8 (ht, &kv, 0); -} - -/** - * Register transport virtual function table. - * - * @param type - session type (not protocol type) - * @param vft - virtual function table - */ -void -session_register_transport (transport_proto_t transport_proto, u8 is_ip4, - const transport_proto_vft_t * vft) -{ - u8 session_type; - session_type = session_type_from_proto_and_ip (transport_proto, is_ip4); - - vec_validate (tp_vfts, session_type); - tp_vfts[session_type] = *vft; - - /* If an offset function is provided, then peek instead of dequeue */ - session_manager_set_transport_rx_fn (session_type, - vft->tx_fifo_offset != 0); -} - -/** - * Get transport virtual function table - * - * @param type - session type (not protocol type) - */ -transport_proto_vft_t * -session_get_transport_vft (u8 session_type) -{ - if (session_type >= vec_len (tp_vfts)) - return 0; - return &tp_vfts[session_type]; -} - -/* - * fd.io coding-style-patch-verification: ON - * - * Local Variables: - * eval: (c-set-style "gnu") - * End: - */ diff --git a/src/vnet/session/transport_interface.h b/src/vnet/session/transport_interface.h index 661221c484a..079e6464268 100644 --- a/src/vnet/session/transport_interface.h +++ b/src/vnet/session/transport_interface.h @@ -56,20 +56,10 @@ typedef struct _transport_proto_vft u8 *(*format_half_open) (u8 * s, va_list * args); } transport_proto_vft_t; -typedef clib_bihash_24_8_t transport_endpoint_table_t; - -#define TRANSPORT_ENDPOINT_INVALID_INDEX ((u32)~0) - -u32 transport_endpoint_lookup (transport_endpoint_table_t * ht, - ip46_address_t * ip, u16 port); -void transport_endpoint_table_add (transport_endpoint_table_t * ht, - transport_endpoint_t * te, u32 value); -void transport_endpoint_table_del (transport_endpoint_table_t * ht, - transport_endpoint_t * te); - -void session_register_transport (transport_proto_t transport_proto, u8 is_ip4, - const transport_proto_vft_t * vft); -transport_proto_vft_t *session_get_transport_vft (u8 session_type); +void transport_register_protocol (transport_proto_t transport_proto, + u8 is_ip4, + const transport_proto_vft_t * vft); +transport_proto_vft_t *transport_protocol_get_vft (u8 session_type); #endif /* SRC_VNET_SESSION_TRANSPORT_INTERFACE_H_ */ diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c index 7a0d2ea1744..4258fc43010 100644 --- a/src/vnet/tcp/builtin_client.c +++ b/src/vnet/tcp/builtin_client.c @@ -50,7 +50,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s) int test_buf_offset; u32 bytes_this_chunk; session_fifo_event_t evt; - static int serial_number = 0; svm_fifo_t *txf; int rv; @@ -98,7 +97,6 @@ send_test_chunk (tclient_main_t * tm, session_t * s) /* Fabricate TX event, send to vpp */ evt.fifo = txf; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = serial_number++; if (unix_shared_memory_queue_add (tm->vpp_event_queue[txf->master_thread_index], (u8 *) & evt, @@ -248,12 +246,12 @@ builtin_client_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, session_parse_handle (sp->vpp_session_handle, &index, &thread_index); - s = stream_session_get_if_valid (index, thread_index); + s = session_get_if_valid (index, thread_index); if (s) { vnet_disconnect_args_t _a, *a = &_a; - a->handle = stream_session_handle (s); + a->handle = session_handle (s); a->app_index = tm->app_index; vnet_disconnect_session (a); @@ -369,7 +367,7 @@ builtin_session_connected_callback (u32 app_index, u32 api_context, session->server_rx_fifo->client_session_index = session_index; session->server_tx_fifo = s->server_tx_fifo; session->server_tx_fifo->client_session_index = session_index; - session->vpp_session_handle = stream_session_handle (s); + session->vpp_session_handle = session_handle (s); vec_add1 (tm->connection_index_by_thread[thread_index], session_index); __sync_fetch_and_add (&tm->ready_connections, 1); @@ -403,7 +401,7 @@ builtin_session_disconnect_callback (stream_session_t * s) { tclient_main_t *tm = &tclient_main; vnet_disconnect_args_t _a, *a = &_a; - a->handle = stream_session_handle (s); + a->handle = session_handle (s); a->app_index = tm->app_index; vnet_disconnect_session (a); return; diff --git a/src/vnet/tcp/builtin_http_server.c b/src/vnet/tcp/builtin_http_server.c index f808400d63d..f307880bcfd 100644 --- a/src/vnet/tcp/builtin_http_server.c +++ b/src/vnet/tcp/builtin_http_server.c @@ -166,7 +166,6 @@ send_data (builtin_http_server_args * args, u8 * data) /* Fabricate TX event, send to vpp */ evt.fifo = s->server_tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = 0; unix_shared_memory_queue_add (hsm->vpp_queue[s->thread_index], (u8 *) & evt, @@ -346,7 +345,7 @@ http_server_rx_callback (stream_session_t * s) /* send the command to a new/recycled vlib process */ args = clib_mem_alloc (sizeof (*args)); args->data = vec_dup (hsm->rx_buf); - args->session_handle = stream_session_handle (s); + args->session_handle = session_handle (s); /* Send an RPC request via the thread-0 input node */ if (vlib_get_thread_index () != 0) @@ -382,7 +381,7 @@ builtin_session_disconnect_callback (stream_session_t * s) http_server_main_t *bsm = &http_server_main; vnet_disconnect_args_t _a, *a = &_a; - a->handle = stream_session_handle (s); + a->handle = session_handle (s); a->app_index = bsm->app_index; vnet_disconnect_session (a); } diff --git a/src/vnet/tcp/builtin_proxy.c b/src/vnet/tcp/builtin_proxy.c index a51a812ca16..e1e0198e695 100644 --- a/src/vnet/tcp/builtin_proxy.c +++ b/src/vnet/tcp/builtin_proxy.c @@ -32,7 +32,7 @@ delete_proxy_session (stream_session_t * s, int is_active_open) uword *p; u64 handle; - handle = stream_session_handle (s); + handle = session_handle (s); clib_spinlock_lock_if_init (&bpm->sessions_lock); if (is_active_open) @@ -88,19 +88,19 @@ delete_proxy_session (stream_session_t * s, int is_active_open) if (active_open_session) { - a->handle = stream_session_handle (active_open_session); + a->handle = session_handle (active_open_session); a->app_index = bpm->active_open_app_index; hash_unset (bpm->proxy_session_by_active_open_handle, - stream_session_handle (active_open_session)); + session_handle (active_open_session)); vnet_disconnect_session (a); } if (server_session) { - a->handle = stream_session_handle (server_session); + a->handle = session_handle (server_session); a->app_index = bpm->server_app_index; hash_unset (bpm->proxy_session_by_server_handle, - stream_session_handle (server_session)); + session_handle (server_session)); vnet_disconnect_session (a); } } @@ -171,8 +171,7 @@ server_rx_callback (stream_session_t * s) ASSERT (s->thread_index == thread_index); clib_spinlock_lock_if_init (&bpm->sessions_lock); - p = - hash_get (bpm->proxy_session_by_server_handle, stream_session_handle (s)); + p = hash_get (bpm->proxy_session_by_server_handle, session_handle (s)); if (PREDICT_TRUE (p != 0)) { @@ -218,7 +217,7 @@ server_rx_callback (stream_session_t * s) memset (ps, 0, sizeof (*ps)); ps->server_rx_fifo = rx_fifo; ps->server_tx_fifo = tx_fifo; - ps->vpp_server_handle = stream_session_handle (s); + ps->vpp_server_handle = session_handle (s); proxy_index = ps - bpm->sessions; @@ -268,7 +267,7 @@ active_open_connected_callback (u32 app_index, u32 opaque, clib_spinlock_lock_if_init (&bpm->sessions_lock); ps = pool_elt_at_index (bpm->sessions, opaque); - ps->vpp_active_open_handle = stream_session_handle (s); + ps->vpp_active_open_handle = session_handle (s); s->server_tx_fifo = ps->server_rx_fifo; s->server_rx_fifo = ps->server_tx_fifo; diff --git a/src/vnet/tcp/builtin_server.c b/src/vnet/tcp/builtin_server.c index b4a52c67fcd..27c4370389f 100644 --- a/src/vnet/tcp/builtin_server.c +++ b/src/vnet/tcp/builtin_server.c @@ -73,7 +73,7 @@ builtin_session_disconnect_callback (stream_session_t * s) builtin_server_main_t *bsm = &builtin_server_main; vnet_disconnect_args_t _a, *a = &_a; - a->handle = stream_session_handle (s); + a->handle = session_handle (s); a->app_index = bsm->app_index; vnet_disconnect_session (a); } @@ -158,7 +158,6 @@ builtin_server_rx_callback (stream_session_t * s) svm_fifo_t *tx_fifo, *rx_fifo; builtin_server_main_t *bsm = &builtin_server_main; session_fifo_event_t evt; - static int serial_number = 0; u32 thread_index = vlib_get_thread_index (); ASSERT (s->thread_index == thread_index); @@ -190,7 +189,6 @@ builtin_server_rx_callback (stream_session_t * s) unix_shared_memory_queue_t *q; evt.fifo = rx_fifo; evt.event_type = FIFO_EVENT_BUILTIN_RX; - evt.event_id = 0; q = bsm->vpp_queue[thread_index]; if (PREDICT_FALSE (q->cursize == q->maxsize)) @@ -232,7 +230,6 @@ builtin_server_rx_callback (stream_session_t * s) /* Fabricate TX event, send to vpp */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = serial_number++; if (unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index], (u8 *) & evt, diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c index 34c901eb1df..1c44ef04f85 100644 --- a/src/vnet/tcp/tcp.c +++ b/src/vnet/tcp/tcp.c @@ -28,39 +28,6 @@ tcp_main_t tcp_main; -void * -ip_interface_get_first_ip (u32 sw_if_index, u8 is_ip4) -{ - ip_lookup_main_t *lm4 = &ip4_main.lookup_main; - ip_lookup_main_t *lm6 = &ip6_main.lookup_main; - ip_interface_address_t *ia = 0; - - if (is_ip4) - { - /* *INDENT-OFF* */ - foreach_ip_interface_address (lm4, ia, sw_if_index, 1 /* unnumbered */ , - ({ - return ip_interface_address_get_address (lm4, ia); - })); - /* *INDENT-ON* */ - } - else - { - /* *INDENT-OFF* */ - foreach_ip_interface_address (lm6, ia, sw_if_index, 1 /* unnumbered */ , - ({ - ip6_address_t *rv; - rv = ip_interface_address_get_address (lm6, ia); - /* Trying to use a link-local ip6 src address is a fool's errand */ - if (!ip6_address_is_link_local_unicast (rv)) - return rv; - })); - /* *INDENT-ON* */ - } - - return 0; -} - static u32 tcp_connection_bind (u32 session_index, transport_endpoint_t * lcl) { @@ -83,7 +50,7 @@ tcp_connection_bind (u32 session_index, transport_endpoint_t * lcl) } ip_copy (&listener->c_lcl_ip, &lcl->ip, lcl->is_ip4); listener->c_is_ip4 = lcl->is_ip4; - listener->c_transport_proto = TRANSPORT_PROTO_TCP; + listener->c_proto = TRANSPORT_PROTO_TCP; listener->c_s_index = session_index; listener->c_fib_index = lcl->fib_index; listener->state = TCP_STATE_LISTEN; @@ -134,24 +101,6 @@ tcp_session_get_listener (u32 listener_index) return &tc->connection; } -always_inline void -transport_endpoint_del (u32 tepi) -{ - tcp_main_t *tm = vnet_get_tcp_main (); - clib_spinlock_lock_if_init (&tm->local_endpoints_lock); - pool_put_index (tm->local_endpoints, tepi); - clib_spinlock_unlock_if_init (&tm->local_endpoints_lock); -} - -always_inline transport_endpoint_t * -transport_endpoint_new (void) -{ - tcp_main_t *tm = vnet_get_tcp_main (); - transport_endpoint_t *tep; - pool_get (tm->local_endpoints, tep); - return tep; -} - /** * Cleanup half-open connection * @@ -209,18 +158,10 @@ void tcp_connection_cleanup (tcp_connection_t * tc) { tcp_main_t *tm = &tcp_main; - u32 tepi; - transport_endpoint_t *tep; /* Cleanup local endpoint if this was an active connect */ - tepi = transport_endpoint_lookup (&tm->local_endpoints_table, &tc->c_lcl_ip, - clib_net_to_host_u16 (tc->c_lcl_port)); - if (tepi != TRANSPORT_ENDPOINT_INVALID_INDEX) - { - tep = pool_elt_at_index (tm->local_endpoints, tepi); - transport_endpoint_table_del (&tm->local_endpoints_table, tep); - transport_endpoint_del (tepi); - } + transport_endpoint_cleanup (TRANSPORT_PROTO_TCP, &tc->c_lcl_ip, + tc->c_lcl_port); /* Check if connection is not yet fully established */ if (tc->state == TCP_STATE_SYN_SENT) @@ -288,7 +229,7 @@ tcp_connection_reset (tcp_connection_t * tc) tcp_connection_cleanup (tc); break; case TCP_STATE_SYN_SENT: - stream_session_connect_notify (&tc->connection, 1 /* fail */ ); + session_stream_connect_notify (&tc->connection, 1 /* fail */ ); tcp_connection_cleanup (tc); break; case TCP_STATE_ESTABLISHED: @@ -388,57 +329,6 @@ tcp_session_cleanup (u32 conn_index, u32 thread_index) tcp_timer_update (tc, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME); } -#define PORT_MASK ((1 << 16)- 1) -/** - * Allocate local port and add if successful add entry to local endpoint - * table to mark the pair as used. - */ -int -tcp_allocate_local_port (ip46_address_t * ip) -{ - tcp_main_t *tm = vnet_get_tcp_main (); - transport_endpoint_t *tep; - u32 tei; - u16 min = 1024, max = 65535; /* XXX configurable ? */ - int tries, limit; - - limit = max - min; - - /* Only support active opens from thread 0 */ - ASSERT (vlib_get_thread_index () == 0); - - /* Search for first free slot */ - for (tries = 0; tries < limit; tries++) - { - u16 port = 0; - - /* Find a port in the specified range */ - while (1) - { - port = random_u32 (&tm->port_allocator_seed) & PORT_MASK; - if (PREDICT_TRUE (port >= min && port < max)) - break; - } - - /* Look it up */ - tei = transport_endpoint_lookup (&tm->local_endpoints_table, ip, port); - /* If not found, we're done */ - if (tei == TRANSPORT_ENDPOINT_INVALID_INDEX) - { - clib_spinlock_lock_if_init (&tm->local_endpoints_lock); - tep = transport_endpoint_new (); - clib_memcpy (&tep->ip, ip, sizeof (*ip)); - tep->port = port; - transport_endpoint_table_add (&tm->local_endpoints_table, tep, - tep - tm->local_endpoints); - clib_spinlock_unlock_if_init (&tm->local_endpoints_lock); - - return tep->port; - } - } - return -1; -} - /** * Initialize all connection timers as invalid */ @@ -574,9 +464,15 @@ tcp_init_snd_vars (tcp_connection_t * tc) { u32 time_now; - /* Set random initial sequence */ - tcp_set_time_now (0); + /* + * We use the time to randomize iss and for setting up the initial + * timestamp. Make sure it's updated otherwise syn and ack in the + * handshake may make it look as if time has flown in the opposite + * direction for us. + */ + tcp_set_time_now (vlib_get_thread_index ()); time_now = tcp_time_now (); + tc->iss = random_u32 (&time_now); tc->snd_una = tc->iss; tc->snd_nxt = tc->iss + 1; @@ -600,112 +496,70 @@ tcp_connection_init_vars (tcp_connection_t * tc) // tcp_connection_fib_attach (tc); } +static int +tcp_alloc_custom_local_endpoint (tcp_main_t * tm, ip46_address_t * lcl_addr, + u16 * lcl_port, u8 is_ip4) +{ + int index, port; + if (is_ip4) + { + index = tm->last_v4_address_rotor++; + if (tm->last_v4_address_rotor >= vec_len (tm->ip4_src_addresses)) + tm->last_v4_address_rotor = 0; + lcl_addr->ip4.as_u32 = tm->ip4_src_addresses[index].as_u32; + } + else + { + index = tm->last_v6_address_rotor++; + if (tm->last_v6_address_rotor >= vec_len (tm->ip6_src_addresses)) + tm->last_v6_address_rotor = 0; + clib_memcpy (&lcl_addr->ip6, &tm->ip6_src_addresses[index], + sizeof (ip6_address_t)); + } + port = transport_alloc_local_port (TRANSPORT_PROTO_TCP, lcl_addr); + if (port < 1) + { + clib_warning ("Failed to allocate src port"); + return -1; + } + *lcl_port = port; + return 0; +} + int tcp_connection_open (transport_endpoint_t * rmt) { tcp_main_t *tm = vnet_get_tcp_main (); tcp_connection_t *tc; - fib_prefix_t prefix; - fib_node_index_t fei; - u32 sw_if_index; ip46_address_t lcl_addr; - int lcl_port; + u16 lcl_port; + int rv; /* - * Find the local address and allocate port + * Allocate local endpoint */ - memset (&lcl_addr, 0, sizeof (lcl_addr)); - - /* Find a FIB path to the destination */ - clib_memcpy (&prefix.fp_addr, &rmt->ip, sizeof (rmt->ip)); - prefix.fp_proto = rmt->is_ip4 ? FIB_PROTOCOL_IP4 : FIB_PROTOCOL_IP6; - prefix.fp_len = rmt->is_ip4 ? 32 : 128; - - ASSERT (rmt->fib_index != ENDPOINT_INVALID_INDEX); - fei = fib_table_lookup (rmt->fib_index, &prefix); - - /* Couldn't find route to destination. Bail out. */ - if (fei == FIB_NODE_INDEX_INVALID) - { - clib_warning ("no route to destination"); - return -1; - } - - sw_if_index = rmt->sw_if_index; - if (sw_if_index == ENDPOINT_INVALID_INDEX) - sw_if_index = fib_entry_get_resolving_interface (fei); - - if (sw_if_index == ENDPOINT_INVALID_INDEX) - { - clib_warning ("no resolving interface for %U", format_ip46_address, - &rmt->ip, (rmt->is_ip4 == 0) + 1); - return -1; - } - - if (rmt->is_ip4) - { - ip4_address_t *ip4; - int index; - if (vec_len (tm->ip4_src_addresses)) - { - index = tm->last_v4_address_rotor++; - if (tm->last_v4_address_rotor >= vec_len (tm->ip4_src_addresses)) - tm->last_v4_address_rotor = 0; - lcl_addr.ip4.as_u32 = tm->ip4_src_addresses[index].as_u32; - } - else - { - ip4 = ip_interface_get_first_ip (sw_if_index, 1); - lcl_addr.ip4.as_u32 = ip4->as_u32; - } - } + if ((rmt->is_ip4 && vec_len (tm->ip4_src_addresses)) + || (!rmt->is_ip4 && vec_len (tm->ip6_src_addresses))) + rv = tcp_alloc_custom_local_endpoint (tm, &lcl_addr, &lcl_port, + rmt->is_ip4); else - { - ip6_address_t *ip6; - int index; - - if (vec_len (tm->ip6_src_addresses)) - { - index = tm->last_v6_address_rotor++; - if (tm->last_v6_address_rotor >= vec_len (tm->ip6_src_addresses)) - tm->last_v6_address_rotor = 0; - clib_memcpy (&lcl_addr.ip6, &tm->ip6_src_addresses[index], - sizeof (*ip6)); - } - else - { - ip6 = ip_interface_get_first_ip (sw_if_index, 0); - if (ip6 == 0) - { - clib_warning ("no routable ip6 addresses on %U", - format_vnet_sw_if_index_name, vnet_get_main (), - sw_if_index); - return -1; - } - - clib_memcpy (&lcl_addr.ip6, ip6, sizeof (*ip6)); - } - } + rv = transport_alloc_local_endpoint (TRANSPORT_PROTO_TCP, + rmt, &lcl_addr, &lcl_port); - /* Allocate source port */ - lcl_port = tcp_allocate_local_port (&lcl_addr); - if (lcl_port < 1) - { - clib_warning ("Failed to allocate src port"); - return -1; - } + if (rv) + return -1; /* * Create connection and send SYN */ clib_spinlock_lock_if_init (&tm->half_open_lock); tc = tcp_half_open_connection_new (); - clib_memcpy (&tc->c_rmt_ip, &rmt->ip, sizeof (ip46_address_t)); - clib_memcpy (&tc->c_lcl_ip, &lcl_addr, sizeof (ip46_address_t)); + ip_copy (&tc->c_rmt_ip, &rmt->ip, rmt->is_ip4); + ip_copy (&tc->c_lcl_ip, &lcl_addr, rmt->is_ip4); tc->c_rmt_port = rmt->port; tc->c_lcl_port = clib_host_to_net_u16 (lcl_port); tc->c_is_ip4 = rmt->is_ip4; - tc->c_transport_proto = TRANSPORT_PROTO_TCP; + tc->c_proto = TRANSPORT_PROTO_TCP; tc->c_fib_index = rmt->fib_index; /* The other connection vars will be initialized after SYN ACK */ tcp_connection_timers_init (tc); @@ -1195,7 +1049,7 @@ tcp_timer_establish_handler (u32 conn_index) if (tc) { ASSERT (tc->state == TCP_STATE_SYN_SENT); - stream_session_connect_notify (&tc->connection, 1 /* fail */ ); + session_stream_connect_notify (&tc->connection, 1 /* fail */ ); TCP_DBG ("establish pop: %U", format_tcp_connection, tc, 2); } else @@ -1328,8 +1182,8 @@ tcp_main_enable (vlib_main_t * vm) ip6_register_protocol (IP_PROTOCOL_TCP, tcp6_input_node.index); /* Register as transport with session layer */ - session_register_transport (TRANSPORT_PROTO_TCP, 1, &tcp_proto); - session_register_transport (TRANSPORT_PROTO_TCP, 0, &tcp_proto); + transport_register_protocol (TRANSPORT_PROTO_TCP, 1, &tcp_proto); + transport_register_protocol (TRANSPORT_PROTO_TCP, 0, &tcp_proto); /* * Initialize data structures @@ -1379,22 +1233,9 @@ tcp_main_enable (vlib_main_t * vm) tm->tstamp_ticks_per_clock = vm->clib_time.seconds_per_clock / TCP_TSTAMP_RESOLUTION; - if (tm->local_endpoints_table_buckets == 0) - tm->local_endpoints_table_buckets = 250000; - if (tm->local_endpoints_table_memory == 0) - tm->local_endpoints_table_memory = 512 << 20; - - clib_bihash_init_24_8 (&tm->local_endpoints_table, "local endpoint table", - tm->local_endpoints_table_buckets, - tm->local_endpoints_table_memory); - - /* Initialize [port-allocator] random number seed */ - tm->port_allocator_seed = (u32) clib_cpu_time_now (); - if (num_threads > 1) { clib_spinlock_init (&tm->half_open_lock); - clib_spinlock_init (&tm->local_endpoints_lock); } vec_validate (tm->tx_frames[0], num_threads - 1); diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h index 2a65dfae784..b057b883fd8 100644 --- a/src/vnet/tcp/tcp.h +++ b/src/vnet/tcp/tcp.h @@ -385,13 +385,6 @@ typedef struct _tcp_main tcp_connection_t *half_open_connections; clib_spinlock_t half_open_lock; - /* Pool of local TCP endpoints */ - transport_endpoint_t *local_endpoints; - - /* Local endpoints lookup table */ - transport_endpoint_table_t local_endpoints_table; - clib_spinlock_t local_endpoints_lock; - /* Congestion control algorithms registered */ tcp_cc_algorithm_t *cc_algos; @@ -412,9 +405,6 @@ typedef struct _tcp_main u32 last_v6_address_rotor; ip6_address_t *ip6_src_addresses; - /** Port allocator random number generator seed */ - u32 port_allocator_seed; - /** vlib buffer size */ u32 bytes_per_buffer; diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c index 3a32e62d243..73642df8ca9 100644 --- a/src/vnet/tcp/tcp_input.c +++ b/src/vnet/tcp/tcp_input.c @@ -1388,8 +1388,8 @@ tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b, return TCP_ERROR_PURE_ACK; } - written = stream_session_enqueue_data (&tc->connection, b, 0, - 1 /* queue event */ , 1); + written = session_enqueue_stream_connection (&tc->connection, b, 0, + 1 /* queue event */ , 1); TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written); @@ -1450,9 +1450,10 @@ tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b, } /* Enqueue out-of-order data with relative offset */ - rv = stream_session_enqueue_data (&tc->connection, b, - vnet_buffer (b)->tcp.seq_number - - tc->rcv_nxt, 0 /* queue event */ , 0); + rv = session_enqueue_stream_connection (&tc->connection, b, + vnet_buffer (b)->tcp.seq_number - + tc->rcv_nxt, 0 /* queue event */ , + 0); /* Nothing written */ if (rv) @@ -1669,15 +1670,16 @@ tcp_set_rx_trace_data (tcp_rx_trace_t * t0, tcp_connection_t * tc0, } always_inline void -tcp_established_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val) +tcp_node_inc_counter (vlib_main_t * vm, u32 tcp4_node, u32 tcp6_node, + u8 is_ip4, u8 evt, u8 val) { if (PREDICT_TRUE (!val)) return; if (is_ip4) - vlib_node_increment_counter (vm, tcp4_established_node.index, evt, val); + vlib_node_increment_counter (vm, tcp4_node, evt, val); else - vlib_node_increment_counter (vm, tcp6_established_node.index, evt, val); + vlib_node_increment_counter (vm, tcp6_node, evt, val); } always_inline uword @@ -1787,8 +1789,11 @@ tcp46_established_inline (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_put_next_frame (vm, node, next_index, n_left_to_next); } - errors = session_manager_flush_enqueue_events (my_thread_index); - tcp_established_inc_counter (vm, is_ip4, TCP_ERROR_EVENT_FIFO_FULL, errors); + errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP, + my_thread_index); + tcp_node_inc_counter (vm, is_ip4, tcp4_established_node.index, + tcp6_established_node.index, + TCP_ERROR_EVENT_FIFO_FULL, errors); tcp_flush_frame_to_output (vm, my_thread_index, is_ip4); return from_frame->n_vectors; @@ -1873,8 +1878,7 @@ tcp_lookup_is_valid (tcp_connection_t * tc, tcp_header_t * hdr) { handle = session_lookup_half_open_handle (&tc->connection); tmp = session_lookup_half_open_connection (handle & 0xFFFFFFFF, - tc->c_transport_proto, - tc->c_is_ip4); + tc->c_proto, tc->c_is_ip4); if (tmp) { @@ -2117,7 +2121,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node, /* Notify app that we have connection. If session layer can't * allocate session send reset */ - if (stream_session_connect_notify (&new_tc0->connection, 0)) + if (session_stream_connect_notify (&new_tc0->connection, 0)) { clib_warning ("connect notify fail"); tcp_send_reset_w_pkt (new_tc0, b0, is_ip4); @@ -2138,7 +2142,7 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node, new_tc0->state = TCP_STATE_SYN_RCVD; /* Notify app that we have connection */ - if (stream_session_connect_notify (&new_tc0->connection, 0)) + if (session_stream_connect_notify (&new_tc0->connection, 0)) { tcp_connection_cleanup (new_tc0); tcp_send_reset_w_pkt (tc0, b0, is_ip4); @@ -2187,17 +2191,11 @@ tcp46_syn_sent_inline (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_put_next_frame (vm, node, next_index, n_left_to_next); } - errors = session_manager_flush_enqueue_events (my_thread_index); - if (errors) - { - if (is_ip4) - vlib_node_increment_counter (vm, tcp4_established_node.index, - TCP_ERROR_EVENT_FIFO_FULL, errors); - else - vlib_node_increment_counter (vm, tcp6_established_node.index, - TCP_ERROR_EVENT_FIFO_FULL, errors); - } - + errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP, + my_thread_index); + tcp_node_inc_counter (vm, is_ip4, tcp4_syn_sent_node.index, + tcp6_syn_sent_node.index, + TCP_ERROR_EVENT_FIFO_FULL, errors); return from_frame->n_vectors; } @@ -2259,6 +2257,9 @@ VLIB_REGISTER_NODE (tcp6_syn_sent_node) = VLIB_NODE_FUNCTION_MULTIARCH (tcp6_syn_sent_node, tcp6_syn_sent_rcv); +vlib_node_registration_t tcp4_rcv_process_node; +vlib_node_registration_t tcp6_rcv_process_node; + /** * Handles reception for all states except LISTEN, SYN-SENT and ESTABLISHED * as per RFC793 p. 64 @@ -2583,16 +2584,11 @@ tcp46_rcv_process_inline (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_put_next_frame (vm, node, next_index, n_left_to_next); } - errors = session_manager_flush_enqueue_events (my_thread_index); - if (errors) - { - if (is_ip4) - vlib_node_increment_counter (vm, tcp4_established_node.index, - TCP_ERROR_EVENT_FIFO_FULL, errors); - else - vlib_node_increment_counter (vm, tcp6_established_node.index, - TCP_ERROR_EVENT_FIFO_FULL, errors); - } + errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_TCP, + my_thread_index); + tcp_node_inc_counter (vm, is_ip4, tcp4_rcv_process_node.index, + tcp6_rcv_process_node.index, + TCP_ERROR_EVENT_FIFO_FULL, errors); return from_frame->n_vectors; } @@ -2966,12 +2962,13 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, + tcp_header_bytes (tcp0)); n_data_bytes0 = clib_net_to_host_u16 (ip40->length) - n_advance_bytes0; - tconn = - session_lookup_connection_wt4 (fib_index0, &ip40->dst_address, - &ip40->src_address, - tcp0->dst_port, tcp0->src_port, - TRANSPORT_PROTO_TCP, - my_thread_index); + tconn = session_lookup_connection_wt4 (fib_index0, + &ip40->dst_address, + &ip40->src_address, + tcp0->dst_port, + tcp0->src_port, + TRANSPORT_PROTO_TCP, + my_thread_index); tc0 = tcp_get_connection_from_transport (tconn); ASSERT (tcp_lookup_is_valid (tc0, tcp0)); } @@ -2983,12 +2980,13 @@ tcp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, n_data_bytes0 = clib_net_to_host_u16 (ip60->payload_length) - n_advance_bytes0; n_advance_bytes0 += sizeof (ip60[0]); - tconn = - session_lookup_connection_wt6 (fib_index0, &ip60->dst_address, - &ip60->src_address, - tcp0->dst_port, tcp0->src_port, - TRANSPORT_PROTO_TCP, - my_thread_index); + tconn = session_lookup_connection_wt6 (fib_index0, + &ip60->dst_address, + &ip60->src_address, + tcp0->dst_port, + tcp0->src_port, + TRANSPORT_PROTO_TCP, + my_thread_index); tc0 = tcp_get_connection_from_transport (tconn); ASSERT (tcp_lookup_is_valid (tc0, tcp0)); } diff --git a/src/vnet/tcp/tcp_test.c b/src/vnet/tcp/tcp_test.c index 2018855840f..021f416cb76 100644 --- a/src/vnet/tcp/tcp_test.c +++ b/src/vnet/tcp/tcp_test.c @@ -1574,7 +1574,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000103); tc->connection.lcl_port = 35051; tc->connection.rmt_port = 53764; - tc->connection.transport_proto = 0; + tc->connection.proto = 0; clib_memcpy (tc1, &tc->connection, sizeof (*tc1)); pool_get (session_manager_main.sessions[0], s); @@ -1590,7 +1590,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tc->connection.rmt_ip.ip4.as_u32 = clib_host_to_net_u32 (0x06000102); tc->connection.lcl_port = 38225; tc->connection.rmt_port = 53764; - tc->connection.transport_proto = 0; + tc->connection.proto = 0; clib_memcpy (tc2, &tc->connection, sizeof (*tc2)); /* @@ -1601,7 +1601,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tconn = session_lookup_connection_wt4 (0, &tc1->lcl_ip.ip4, &tc1->rmt_ip.ip4, tc1->lcl_port, tc1->rmt_port, - tc1->transport_proto, 0); + tc1->proto, 0); cmp = (memcmp (&tconn->rmt_ip, &tc1->rmt_ip, sizeof (tc1->rmt_ip)) == 0); TCP_TEST ((cmp), "rmt ip is identical %d", cmp); TCP_TEST ((tconn->lcl_port == tc1->lcl_port), @@ -1614,7 +1614,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4, &tc2->rmt_ip.ip4, tc2->lcl_port, tc2->rmt_port, - tc2->transport_proto, 0); + tc2->proto, 0); TCP_TEST ((tconn == 0), "lookup result should be null"); /* @@ -1624,12 +1624,12 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tconn = session_lookup_connection_wt4 (0, &tc1->lcl_ip.ip4, &tc1->rmt_ip.ip4, tc1->lcl_port, tc1->rmt_port, - tc1->transport_proto, 0); + tc1->proto, 0); TCP_TEST ((tconn == 0), "lookup result should be null"); tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4, &tc2->rmt_ip.ip4, tc2->lcl_port, tc2->rmt_port, - tc2->transport_proto, 0); + tc2->proto, 0); TCP_TEST ((tconn == 0), "lookup result should be null"); /* @@ -1639,7 +1639,7 @@ tcp_test_lookup (vlib_main_t * vm, unformat_input_t * input) tconn = session_lookup_connection_wt4 (0, &tc2->lcl_ip.ip4, &tc2->rmt_ip.ip4, tc2->lcl_port, tc2->rmt_port, - tc2->transport_proto, 0); + tc2->proto, 0); TCP_TEST ((tconn == 0), "lookup result should be null"); return 0; diff --git a/src/vnet/udp/builtin_server.c b/src/vnet/udp/builtin_server.c index 23f0854fb53..593642cb09c 100644 --- a/src/vnet/udp/builtin_server.c +++ b/src/vnet/udp/builtin_server.c @@ -73,7 +73,6 @@ builtin_server_rx_callback (stream_session_t * s) /* Fabricate TX event, send to ourselves */ evt.fifo = tx_fifo; evt.event_type = FIFO_EVENT_APP_TX; - evt.event_id = 0; q = session_manager_get_vpp_event_queue (s->thread_index); unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ ); diff --git a/src/vnet/udp/udp.c b/src/vnet/udp/udp.c index 0e0336b5919..c12e837a74a 100644 --- a/src/vnet/udp/udp.c +++ b/src/vnet/udp/udp.c @@ -22,61 +22,92 @@ #include #include -udp_uri_main_t udp_uri_main; - -u32 -udp_session_bind_ip4 (u32 session_index, transport_endpoint_t * lcl) +udp_connection_t * +udp_connection_alloc (u32 thread_index) { - udp_uri_main_t *um = vnet_get_udp_main (); - udp_connection_t *listener; + udp_main_t *um = &udp_main; + udp_connection_t *uc; + u32 will_expand = 0; + pool_get_aligned_will_expand (um->connections[thread_index], will_expand, + CLIB_CACHE_LINE_BYTES); - pool_get (um->udp_listeners, listener); - memset (listener, 0, sizeof (udp_connection_t)); - listener->c_lcl_port = lcl->port; - listener->c_lcl_ip4.as_u32 = lcl->ip.ip4.as_u32; - listener->c_transport_proto = TRANSPORT_PROTO_UDP; - udp_register_dst_port (um->vlib_main, clib_net_to_host_u16 (lcl->port), - udp4_uri_input_node.index, 1 /* is_ipv4 */ ); - return 0; + if (PREDICT_FALSE (will_expand)) + { + clib_spinlock_lock_if_init (&udp_main.peekers_write_locks + [thread_index]); + pool_get_aligned (udp_main.connections[thread_index], uc, + CLIB_CACHE_LINE_BYTES); + clib_spinlock_unlock_if_init (&udp_main.peekers_write_locks + [thread_index]); + } + else + { + pool_get_aligned (um->connections[thread_index], uc, + CLIB_CACHE_LINE_BYTES); + } + memset (uc, 0, sizeof (*uc)); + uc->c_c_index = uc - um->connections[thread_index]; + uc->c_thread_index = thread_index; + uc->c_proto = TRANSPORT_PROTO_UDP; + return uc; } -u32 -udp_session_bind_ip6 (u32 session_index, transport_endpoint_t * lcl) +void +udp_connection_free (udp_connection_t * uc) { - udp_uri_main_t *um = vnet_get_udp_main (); - udp_connection_t *listener; - - pool_get (um->udp_listeners, listener); - listener->c_lcl_port = lcl->port; - clib_memcpy (&listener->c_lcl_ip6, &lcl->ip.ip6, sizeof (ip6_address_t)); - listener->c_transport_proto = TRANSPORT_PROTO_UDP; - udp_register_dst_port (um->vlib_main, clib_net_to_host_u16 (lcl->port), - udp4_uri_input_node.index, 0 /* is_ipv4 */ ); - return 0; + pool_put (udp_main.connections[uc->c_thread_index], uc); + if (CLIB_DEBUG) + memset (uc, 0xFA, sizeof (*uc)); } u32 -udp_session_unbind_ip4 (u32 listener_index) +udp_session_bind (u32 session_index, transport_endpoint_t * lcl) { + udp_main_t *um = vnet_get_udp_main (); vlib_main_t *vm = vlib_get_main (); udp_connection_t *listener; - listener = udp_listener_get (listener_index); + u32 node_index; + void *iface_ip; + udp_dst_port_info_t *pi; - /* deregister the udp_local mapping */ - udp_unregister_dst_port (vm, listener->c_lcl_port, 1 /* is_ipv4 */ ); - return 0; + pi = udp_get_dst_port_info (um, lcl->port, lcl->is_ip4); + if (pi) + return -1; + + pool_get (um->listener_pool, listener); + memset (listener, 0, sizeof (udp_connection_t)); + + listener->c_lcl_port = lcl->port; + listener->c_c_index = listener - um->listener_pool; + + /* If we are provided a sw_if_index, bind using one of its ips */ + if (ip_is_zero (&lcl->ip, 1) && lcl->sw_if_index != ENDPOINT_INVALID_INDEX) + { + if ((iface_ip = ip_interface_get_first_ip (lcl->sw_if_index, + lcl->is_ip4))) + ip_set (&lcl->ip, iface_ip, lcl->is_ip4); + } + ip_copy (&listener->c_lcl_ip, &lcl->ip, lcl->is_ip4); + listener->c_is_ip4 = lcl->is_ip4; + listener->c_proto = TRANSPORT_PROTO_UDP; + listener->c_s_index = session_index; + listener->c_fib_index = lcl->fib_index; + + node_index = lcl->is_ip4 ? udp4_input_node.index : udp6_input_node.index; + udp_register_dst_port (vm, clib_net_to_host_u16 (lcl->port), node_index, + 1 /* is_ipv4 */ ); + return listener->c_c_index; } u32 -udp_session_unbind_ip6 (u32 listener_index) +udp_session_unbind (u32 listener_index) { vlib_main_t *vm = vlib_get_main (); - udp_connection_t *listener; + udp_connection_t *listener; listener = udp_listener_get (listener_index); - - /* deregister the udp_local mapping */ - udp_unregister_dst_port (vm, listener->c_lcl_port, 0 /* is_ipv4 */ ); + udp_unregister_dst_port (vm, clib_net_to_host_u16 (listener->c_lcl_port), + listener->c_is_ip4); return 0; } @@ -90,214 +121,220 @@ udp_session_get_listener (u32 listener_index) } u32 -udp_push_header (transport_connection_t * tconn, vlib_buffer_t * b) +udp_push_header (transport_connection_t * tc, vlib_buffer_t * b) { - udp_connection_t *us; - u8 *data; - udp_header_t *udp; + udp_connection_t *uc; + vlib_main_t *vm = vlib_get_main (); - us = (udp_connection_t *) tconn; + uc = udp_get_connection_from_transport (tc); - if (tconn->is_ip4) - { - ip4_header_t *ip; - - data = vlib_buffer_get_current (b); - udp = (udp_header_t *) (data - sizeof (udp_header_t)); - ip = (ip4_header_t *) ((u8 *) udp - sizeof (ip4_header_t)); - - /* Build packet header, swap rx key src + dst fields */ - ip->src_address.as_u32 = us->c_lcl_ip4.as_u32; - ip->dst_address.as_u32 = us->c_rmt_ip4.as_u32; - ip->ip_version_and_header_length = 0x45; - ip->ttl = 254; - ip->protocol = IP_PROTOCOL_UDP; - ip->length = clib_host_to_net_u16 (b->current_length + sizeof (*udp)); - ip->checksum = ip4_header_checksum (ip); - - udp->src_port = us->c_lcl_port; - udp->dst_port = us->c_rmt_port; - udp->length = clib_host_to_net_u16 (b->current_length); - udp->checksum = 0; - - b->current_length = sizeof (*ip) + sizeof (*udp); - return SESSION_QUEUE_NEXT_IP4_LOOKUP; - } + vlib_buffer_push_udp (b, uc->c_lcl_port, uc->c_rmt_port, 1); + if (tc->is_ip4) + vlib_buffer_push_ip4 (vm, b, &uc->c_lcl_ip4, &uc->c_rmt_ip4, + IP_PROTOCOL_UDP, 1); else { - vlib_main_t *vm = vlib_get_main (); - ip6_header_t *ip; - u16 payload_length; - int bogus = ~0; - - data = vlib_buffer_get_current (b); - udp = (udp_header_t *) (data - sizeof (udp_header_t)); - ip = (ip6_header_t *) ((u8 *) udp - sizeof (ip6_header_t)); - - /* Build packet header, swap rx key src + dst fields */ - clib_memcpy (&ip->src_address, &us->c_lcl_ip6, sizeof (ip6_address_t)); - clib_memcpy (&ip->dst_address, &us->c_rmt_ip6, sizeof (ip6_address_t)); - - ip->ip_version_traffic_class_and_flow_label = - clib_host_to_net_u32 (0x6 << 28); - - ip->hop_limit = 0xff; - ip->protocol = IP_PROTOCOL_UDP; - - payload_length = vlib_buffer_length_in_chain (vm, b); - payload_length -= sizeof (*ip); - - ip->payload_length = clib_host_to_net_u16 (payload_length); - - udp->checksum = ip6_tcp_udp_icmp_compute_checksum (vm, b, ip, &bogus); - ASSERT (!bogus); - - udp->src_port = us->c_lcl_port; - udp->dst_port = us->c_rmt_port; - udp->length = clib_host_to_net_u16 (b->current_length); - udp->checksum = 0; - - b->current_length = sizeof (*ip) + sizeof (*udp); - - return SESSION_QUEUE_NEXT_IP6_LOOKUP; + ip6_header_t *ih; + ih = vlib_buffer_push_ip6 (vm, b, &uc->c_lcl_ip6, &uc->c_rmt_ip6, + IP_PROTOCOL_UDP); + vnet_buffer (b)->l3_hdr_offset = (u8 *) ih - b->data; } + vnet_buffer (b)->sw_if_index[VLIB_RX] = 0; + vnet_buffer (b)->sw_if_index[VLIB_TX] = ~0; + b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED; + + return 0; } transport_connection_t * -udp_session_get (u32 connection_index, u32 my_thread_index) +udp_session_get (u32 connection_index, u32 thread_index) { - udp_uri_main_t *um = vnet_get_udp_main (); + udp_connection_t *uc; + uc = udp_connection_get (connection_index, thread_index); + if (uc) + return &uc->connection; + return 0; +} - udp_connection_t *us; - us = - pool_elt_at_index (um->udp_sessions[my_thread_index], connection_index); - return &us->connection; +void +udp_session_close (u32 connection_index, u32 thread_index) +{ + vlib_main_t *vm = vlib_get_main (); + udp_connection_t *uc; + uc = udp_connection_get (connection_index, thread_index); + if (uc) + { + udp_unregister_dst_port (vm, clib_net_to_host_u16 (uc->c_lcl_port), + uc->c_is_ip4); + stream_session_delete_notify (&uc->connection); + udp_connection_free (uc); + } } void -udp_session_close (u32 connection_index, u32 my_thread_index) +udp_session_cleanup (u32 connection_index, u32 thread_index) { - udp_uri_main_t *um = vnet_get_udp_main (); - pool_put_index (um->udp_sessions[my_thread_index], connection_index); + udp_connection_t *uc; + uc = udp_connection_get (connection_index, thread_index); + if (uc) + udp_connection_free (uc); } u8 * -format_udp_session_ip4 (u8 * s, va_list * args) +format_udp_connection_id (u8 * s, va_list * args) { - u32 uci = va_arg (*args, u32); - u32 thread_index = va_arg (*args, u32); - udp_connection_t *u4; - - u4 = udp_connection_get (uci, thread_index); + udp_connection_t *uc = va_arg (*args, udp_connection_t *); + if (!uc) + return s; + if (uc->c_is_ip4) + s = format (s, "[#%d][%s] %U:%d->%U:%d", uc->c_thread_index, "U", + format_ip4_address, &uc->c_lcl_ip4, + clib_net_to_host_u16 (uc->c_lcl_port), format_ip4_address, + &uc->c_rmt_ip4, clib_net_to_host_u16 (uc->c_rmt_port)); + else + s = format (s, "[#%d][%s] %U:%d->%U:%d", uc->c_thread_index, "U", + format_ip6_address, &uc->c_lcl_ip6, + clib_net_to_host_u16 (uc->c_lcl_port), format_ip6_address, + &uc->c_rmt_ip6, clib_net_to_host_u16 (uc->c_rmt_port)); + return s; +} - s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip4_address, - &u4->c_lcl_ip4, clib_net_to_host_u16 (u4->c_lcl_port), - format_ip4_address, &u4->c_rmt_ip4, - clib_net_to_host_u16 (u4->c_rmt_port)); +u8 * +format_udp_connection (u8 * s, va_list * args) +{ + udp_connection_t *uc = va_arg (*args, udp_connection_t *); + u32 verbose = va_arg (*args, u32); + if (!uc) + return s; + s = format (s, "%-50U", format_udp_connection_id, uc); + if (verbose) + { + if (verbose == 1) + s = format (s, "%-15s", "-"); + else + s = format (s, "\n"); + } return s; } u8 * -format_udp_session_ip6 (u8 * s, va_list * args) +format_udp_session (u8 * s, va_list * args) { u32 uci = va_arg (*args, u32); u32 thread_index = va_arg (*args, u32); - udp_connection_t *tc = udp_connection_get (uci, thread_index); - s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip6_address, - &tc->c_lcl_ip6, clib_net_to_host_u16 (tc->c_lcl_port), - format_ip6_address, &tc->c_rmt_ip6, - clib_net_to_host_u16 (tc->c_rmt_port)); - return s; + u32 verbose = va_arg (*args, u32); + udp_connection_t *uc; + + uc = udp_connection_get (uci, thread_index); + return format (s, "%U", format_udp_connection, uc, verbose); } u8 * -format_udp_listener_session_ip4 (u8 * s, va_list * args) +format_udp_half_open_session (u8 * s, va_list * args) { - u32 tci = va_arg (*args, u32); - udp_connection_t *tc = udp_listener_get (tci); - s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip4_address, - &tc->c_lcl_ip4, clib_net_to_host_u16 (tc->c_lcl_port), - format_ip4_address, &tc->c_rmt_ip4, - clib_net_to_host_u16 (tc->c_rmt_port)); - return s; + clib_warning ("BUG"); + return 0; } u8 * -format_udp_listener_session_ip6 (u8 * s, va_list * args) +format_udp_listener_session (u8 * s, va_list * args) { u32 tci = va_arg (*args, u32); - udp_connection_t *tc = udp_listener_get (tci); - s = format (s, "[%s] %U:%d->%U:%d", "udp", format_ip6_address, - &tc->c_lcl_ip6, clib_net_to_host_u16 (tc->c_lcl_port), - format_ip6_address, &tc->c_rmt_ip6, - clib_net_to_host_u16 (tc->c_rmt_port)); - return s; + udp_connection_t *uc = udp_listener_get (tci); + return format (s, "%U", format_udp_connection, uc); } u16 -udp_send_mss_uri (transport_connection_t * t) +udp_send_mss (transport_connection_t * t) { /* TODO figure out MTU of output interface */ - return 400; + return 1460; } u32 -udp_send_space_uri (transport_connection_t * t) +udp_send_space (transport_connection_t * t) { /* No constraint on TX window */ return ~0; } int -udp_open_connection (transport_endpoint_t * tep) +udp_open_connection (transport_endpoint_t * rmt) { - clib_warning ("Not implemented"); - return 0; + udp_main_t *um = vnet_get_udp_main (); + vlib_main_t *vm = vlib_get_main (); + u32 thread_index = vlib_get_thread_index (); + udp_connection_t *uc; + ip46_address_t lcl_addr; + u32 node_index; + u16 lcl_port; + + if (transport_alloc_local_endpoint (TRANSPORT_PROTO_UDP, rmt, &lcl_addr, + &lcl_port)) + return -1; + + while (udp_get_dst_port_info (um, lcl_port, rmt->is_ip4)) + { + lcl_port = transport_alloc_local_port (TRANSPORT_PROTO_UDP, &lcl_addr); + if (lcl_port < 1) + { + clib_warning ("Failed to allocate src port"); + return -1; + } + } + + node_index = rmt->is_ip4 ? udp4_input_node.index : udp6_input_node.index; + udp_register_dst_port (vm, lcl_port, node_index, 1 /* is_ipv4 */ ); + + uc = udp_connection_alloc (thread_index); + ip_copy (&uc->c_rmt_ip, &rmt->ip, rmt->is_ip4); + ip_copy (&uc->c_lcl_ip, &lcl_addr, rmt->is_ip4); + uc->c_rmt_port = rmt->port; + uc->c_lcl_port = clib_host_to_net_u16 (lcl_port); + uc->c_is_ip4 = rmt->is_ip4; + uc->c_proto = TRANSPORT_PROTO_UDP; + uc->c_fib_index = rmt->fib_index; + + return uc->c_c_index; } -/* *INDENT-OFF* */ -const static transport_proto_vft_t udp4_proto = { - .bind = udp_session_bind_ip4, - .open = udp_open_connection, - .unbind = udp_session_unbind_ip4, - .push_header = udp_push_header, - .get_connection = udp_session_get, - .get_listener = udp_session_get_listener, - .close = udp_session_close, - .send_mss = udp_send_mss_uri, - .send_space = udp_send_space_uri, - .format_connection = format_udp_session_ip4, - .format_listener = format_udp_listener_session_ip4 -}; +transport_connection_t * +udp_half_open_session_get_transport (u32 conn_index) +{ + udp_connection_t *uc; + uc = udp_connection_get (conn_index, vlib_get_thread_index ()); + return &uc->connection; +} -const static transport_proto_vft_t udp6_proto = { - .bind = udp_session_bind_ip6, +/* *INDENT-OFF* */ +const static transport_proto_vft_t udp_proto = { + .bind = udp_session_bind, .open = udp_open_connection, - .unbind = udp_session_unbind_ip6, + .unbind = udp_session_unbind, .push_header = udp_push_header, .get_connection = udp_session_get, .get_listener = udp_session_get_listener, + .get_half_open = udp_half_open_session_get_transport, .close = udp_session_close, - .send_mss = udp_send_mss_uri, - .send_space = udp_send_space_uri, - .format_connection = format_udp_session_ip6, - .format_listener = format_udp_listener_session_ip6 + .cleanup = udp_session_cleanup, + .send_mss = udp_send_mss, + .send_space = udp_send_space, + .format_connection = format_udp_session, + .format_half_open = format_udp_half_open_session, + .format_listener = format_udp_listener_session }; /* *INDENT-ON* */ static clib_error_t * udp_init (vlib_main_t * vm) { - udp_uri_main_t *um = vnet_get_udp_main (); + udp_main_t *um = vnet_get_udp_main (); ip_main_t *im = &ip_main; vlib_thread_main_t *tm = vlib_get_thread_main (); u32 num_threads; clib_error_t *error = 0; ip_protocol_info_t *pi; - um->vlib_main = vm; - um->vnet_main = vnet_get_main (); - if ((error = vlib_call_init_function (vm, ip_main_init))) return error; if ((error = vlib_call_init_function (vm, ip4_lookup_init))) @@ -318,16 +355,18 @@ udp_init (vlib_main_t * vm) /* Register as transport with URI */ - session_register_transport (TRANSPORT_PROTO_UDP, 1, &udp4_proto); - session_register_transport (TRANSPORT_PROTO_UDP, 0, &udp6_proto); + transport_register_protocol (TRANSPORT_PROTO_UDP, 1, &udp_proto); + transport_register_protocol (TRANSPORT_PROTO_UDP, 0, &udp_proto); /* * Initialize data structures */ num_threads = 1 /* main thread */ + tm->n_threads; - vec_validate (um->udp_sessions, num_threads - 1); - + vec_validate (um->connections, num_threads - 1); + vec_validate (um->connection_peekers, num_threads - 1); + vec_validate (um->peekers_readers_locks, num_threads - 1); + vec_validate (um->peekers_write_locks, num_threads - 1); return error; } diff --git a/src/vnet/udp/udp.h b/src/vnet/udp/udp.h index aa37701106c..920ef963ace 100644 --- a/src/vnet/udp/udp.h +++ b/src/vnet/udp/udp.h @@ -26,49 +26,6 @@ #include #include -typedef struct -{ - transport_connection_t connection; /** must be first */ - - /** ersatz MTU to limit fifo pushes to test data size */ - u32 mtu; -} udp_connection_t; - -typedef struct _udp_uri_main -{ - /* Per-worker thread udp connection pools */ - udp_connection_t **udp_sessions; - udp_connection_t *udp_listeners; - - /* convenience */ - vlib_main_t *vlib_main; - vnet_main_t *vnet_main; - ip4_main_t *ip4_main; - ip6_main_t *ip6_main; -} udp_uri_main_t; - -extern udp_uri_main_t udp_uri_main; -extern vlib_node_registration_t udp4_uri_input_node; - -always_inline udp_uri_main_t * -vnet_get_udp_main () -{ - return &udp_uri_main; -} - -always_inline udp_connection_t * -udp_connection_get (u32 conn_index, u32 thread_index) -{ - return pool_elt_at_index (udp_uri_main.udp_sessions[thread_index], - conn_index); -} - -always_inline udp_connection_t * -udp_listener_get (u32 conn_index) -{ - return pool_elt_at_index (udp_uri_main.udp_listeners, conn_index); -} - typedef enum { #define udp_error(n,s) UDP_ERROR_##n, @@ -77,6 +34,13 @@ typedef enum UDP_N_ERROR, } udp_error_t; +typedef struct +{ + transport_connection_t connection; /** must be first */ + /** ersatz MTU to limit fifo pushes to test data size */ + u32 mtu; +} udp_connection_t; + #define foreach_udp4_dst_port \ _ (53, dns) \ _ (67, dhcp_to_server) \ @@ -161,10 +125,108 @@ typedef struct u8 punt_unknown4; u8 punt_unknown6; - /* convenience */ - vlib_main_t *vlib_main; + /* + * Per-worker thread udp connection pools used with session layer + */ + udp_connection_t **connections; + u32 *connection_peekers; + clib_spinlock_t *peekers_readers_locks; + clib_spinlock_t *peekers_write_locks; + udp_connection_t *listener_pool; + } udp_main_t; +extern udp_main_t udp_main; +extern vlib_node_registration_t udp4_input_node; +extern vlib_node_registration_t udp6_input_node; + +always_inline udp_connection_t * +udp_connection_get (u32 conn_index, u32 thread_index) +{ + if (pool_is_free_index (udp_main.connections[thread_index], conn_index)) + return 0; + return pool_elt_at_index (udp_main.connections[thread_index], conn_index); +} + +always_inline udp_connection_t * +udp_listener_get (u32 conn_index) +{ + return pool_elt_at_index (udp_main.listener_pool, conn_index); +} + +always_inline udp_main_t * +vnet_get_udp_main () +{ + return &udp_main; +} + +always_inline udp_connection_t * +udp_get_connection_from_transport (transport_connection_t * tc) +{ + return ((udp_connection_t *) tc); +} + +always_inline u32 +udp_connection_index (udp_connection_t * uc) +{ + return (uc - udp_main.connections[uc->c_thread_index]); +} + +udp_connection_t *udp_connection_alloc (u32 thread_index); + +/** + * Acquires a lock that blocks a connection pool from expanding. + */ +always_inline void +udp_pool_add_peeker (u32 thread_index) +{ + if (thread_index != vlib_get_thread_index ()) + return; + clib_spinlock_lock_if_init (&udp_main.peekers_readers_locks[thread_index]); + udp_main.connection_peekers[thread_index] += 1; + if (udp_main.connection_peekers[thread_index] == 1) + clib_spinlock_lock_if_init (&udp_main.peekers_write_locks[thread_index]); + clib_spinlock_unlock_if_init (&udp_main.peekers_readers_locks + [thread_index]); +} + +always_inline void +udp_pool_remove_peeker (u32 thread_index) +{ + if (thread_index != vlib_get_thread_index ()) + return; + ASSERT (udp_main.connection_peekers[thread_index] > 0); + clib_spinlock_lock_if_init (&udp_main.peekers_readers_locks[thread_index]); + udp_main.connection_peekers[thread_index] -= 1; + if (udp_main.connection_peekers[thread_index] == 0) + clib_spinlock_unlock_if_init (&udp_main.peekers_write_locks + [thread_index]); + clib_spinlock_unlock_if_init (&udp_main.peekers_readers_locks + [thread_index]); +} + +always_inline udp_connection_t * +udp_conenction_clone_safe (u32 connection_index, u32 thread_index) +{ + udp_connection_t *old_c, *new_c; + u32 current_thread_index = vlib_get_thread_index (); + new_c = udp_connection_alloc (current_thread_index); + + /* If during the memcpy pool is reallocated AND the memory allocator + * decides to give the old chunk of memory to somebody in a hurry to + * scribble something on it, we have a problem. So add this thread as + * a session pool peeker. + */ + udp_pool_add_peeker (thread_index); + old_c = udp_main.connections[thread_index] + connection_index; + clib_memcpy (new_c, old_c, sizeof (*new_c)); + udp_pool_remove_peeker (thread_index); + new_c->c_thread_index = current_thread_index; + new_c->c_c_index = udp_connection_index (new_c); + return new_c; +} + + always_inline udp_dst_port_info_t * udp_get_dst_port_info (udp_main_t * um, udp_dst_port_t dst_port, u8 is_ip4) { @@ -174,19 +236,34 @@ udp_get_dst_port_info (udp_main_t * um, udp_dst_port_t dst_port, u8 is_ip4) format_function_t format_udp_header; format_function_t format_udp_rx_trace; - unformat_function_t unformat_udp_header; void udp_register_dst_port (vlib_main_t * vm, udp_dst_port_t dst_port, u32 node_index, u8 is_ip4); - -void -udp_unregister_dst_port (vlib_main_t * vm, - udp_dst_port_t dst_port, u8 is_ip4); +void udp_unregister_dst_port (vlib_main_t * vm, + udp_dst_port_t dst_port, u8 is_ip4); void udp_punt_unknown (vlib_main_t * vm, u8 is_ip4, u8 is_add); +always_inline void * +vlib_buffer_push_udp (vlib_buffer_t * b, u16 sp, u16 dp, u8 offload_csum) +{ + udp_header_t *uh; + + uh = vlib_buffer_push_uninit (b, sizeof (udp_header_t)); + uh->src_port = sp; + uh->dst_port = dp; + uh->checksum = 0; + uh->length = clib_host_to_net_u16 (b->current_length); + if (offload_csum) + { + b->flags |= VNET_BUFFER_F_OFFLOAD_UDP_CKSUM; + vnet_buffer (b)->l4_hdr_offset = (u8 *) uh - b->data; + } + return uh; +} + always_inline void ip_udp_fixup_one (vlib_main_t * vm, vlib_buffer_t * b0, u8 is_ip4) { diff --git a/src/vnet/udp/udp_error.def b/src/vnet/udp/udp_error.def index bfdae0acc77..488e7bee7df 100644 --- a/src/vnet/udp/udp_error.def +++ b/src/vnet/udp/udp_error.def @@ -19,3 +19,9 @@ udp_error (NONE, "no error") udp_error (NO_LISTENER, "no listener for dst port") udp_error (LENGTH_ERROR, "UDP packets with length errors") udp_error (PUNT, "no listener punt") +udp_error (ENQUEUED, "UDP packets enqueued") +udp_error (FIFO_FULL, "UDP fifo full") +udp_error (NOT_READY, "UDP connection not ready") +udp_error (LISTENER, "UDP connected session") +udp_error (CREATE_SESSION, "Failed to create UDP session") +udp_error (EVENT_FIFO_FULL, "UDP session event fifo full") diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c index 5d3a1855269..8170cfbe49a 100644 --- a/src/vnet/udp/udp_input.c +++ b/src/vnet/udp/udp_input.c @@ -28,55 +28,62 @@ #include #include "../session/application_interface.h" -vlib_node_registration_t udp4_uri_input_node; +static char *udp_error_strings[] = { +#define udp_error(n,s) s, +#include "udp_error.def" +#undef udp_error +}; typedef struct { - u32 session; + u32 connection; u32 disposition; u32 thread_index; -} udp4_uri_input_trace_t; +} udp_input_trace_t; /* packet trace format function */ static u8 * -format_udp4_uri_input_trace (u8 * s, va_list * args) +format_udp_input_trace (u8 * s, va_list * args) { CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - udp4_uri_input_trace_t *t = va_arg (*args, udp4_uri_input_trace_t *); + udp_input_trace_t *t = va_arg (*args, udp_input_trace_t *); - s = format (s, "UDP4_URI_INPUT: session %d, disposition %d, thread %d", - t->session, t->disposition, t->thread_index); + s = format (s, "UDP_INPUT: connection %d, disposition %d, thread %d", + t->connection, t->disposition, t->thread_index); return s; } +#define foreach_udp_input_next \ + _ (DROP, "error-drop") + typedef enum { - UDP4_URI_INPUT_NEXT_DROP, - UDP4_URI_INPUT_N_NEXT, -} udp4_uri_input_next_t; - -static char *udp4_uri_input_error_strings[] = { -#define _(sym,string) string, - foreach_session_input_error +#define _(s, n) UDP_INPUT_NEXT_##s, + foreach_udp_input_next #undef _ -}; + UDP_INPUT_N_NEXT, +} udp_input_next_t; -static uword -udp4_uri_input_node_fn (vlib_main_t * vm, - vlib_node_runtime_t * node, vlib_frame_t * frame) +always_inline void +udp_input_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val) +{ + if (PREDICT_TRUE (!val)) + return; + + if (is_ip4) + vlib_node_increment_counter (vm, udp4_input_node.index, evt, val); + else + vlib_node_increment_counter (vm, udp6_input_node.index, evt, val); +} + +always_inline uword +udp46_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame, u8 is_ip4) { u32 n_left_from, *from, *to_next; - udp4_uri_input_next_t next_index; - udp_uri_main_t *um = vnet_get_udp_main (); - session_manager_main_t *smm = vnet_get_session_manager_main (); + u32 next_index, errors; u32 my_thread_index = vm->thread_index; - u8 my_enqueue_epoch; - u32 *session_indices_to_enqueue; - static u32 serial_number; - int i; - - my_enqueue_epoch = ++smm->current_enqueue_epoch[my_thread_index]; from = vlib_frame_vector_args (frame); n_left_from = frame->n_vectors; @@ -90,16 +97,18 @@ udp4_uri_input_node_fn (vlib_main_t * vm, while (n_left_from > 0 && n_left_to_next > 0) { - u32 bi0; + u32 bi0, fib_index0; vlib_buffer_t *b0; - u32 next0 = UDP4_URI_INPUT_NEXT_DROP; - u32 error0 = SESSION_ERROR_ENQUEUED; + u32 next0 = UDP_INPUT_NEXT_DROP; + u32 error0 = UDP_ERROR_ENQUEUED; udp_header_t *udp0; - ip4_header_t *ip0; - stream_session_t *s0; - svm_fifo_t *f0; - u16 udp_len0; + ip4_header_t *ip40; + ip6_header_t *ip60; u8 *data0; + stream_session_t *s0; + transport_connection_t *tc0 = 0; + udp_connection_t *child0, *new_uc0; + int written0; /* speculatively enqueue b0 to the current next frame */ bi0 = from[0]; @@ -112,89 +121,97 @@ udp4_uri_input_node_fn (vlib_main_t * vm, b0 = vlib_get_buffer (vm, bi0); /* udp_local hands us a pointer to the udp data */ - data0 = vlib_buffer_get_current (b0); udp0 = (udp_header_t *) (data0 - sizeof (*udp0)); + fib_index0 = vnet_buffer (b0)->ip.fib_index; - /* $$$$ fixme: udp_local doesn't do ip options correctly anyhow */ - ip0 = (ip4_header_t *) (((u8 *) udp0) - sizeof (*ip0)); - s0 = 0; - - /* lookup session */ - s0 = session_lookup4 (0, &ip0->dst_address, &ip0->src_address, - udp0->dst_port, udp0->src_port, - TRANSPORT_PROTO_UDP); + if (is_ip4) + { + /* $$$$ fixme: udp_local doesn't do ip options correctly anyhow */ + ip40 = (ip4_header_t *) (((u8 *) udp0) - sizeof (*ip40)); + s0 = session_lookup_safe4 (fib_index0, &ip40->dst_address, + &ip40->src_address, udp0->dst_port, + udp0->src_port, TRANSPORT_PROTO_UDP); + } + else + { + ip60 = (ip6_header_t *) (((u8 *) udp0) - sizeof (*ip60)); + s0 = session_lookup_safe6 (fib_index0, &ip60->dst_address, + &ip60->src_address, udp0->dst_port, + udp0->src_port, TRANSPORT_PROTO_UDP); + } - /* no listener */ if (PREDICT_FALSE (s0 == 0)) { - error0 = SESSION_ERROR_NO_LISTENER; + error0 = UDP_ERROR_NO_LISTENER; goto trace0; } - f0 = s0->server_rx_fifo; - - /* established hit */ if (PREDICT_TRUE (s0->session_state == SESSION_STATE_READY)) { - udp_len0 = clib_net_to_host_u16 (udp0->length); - - if (PREDICT_FALSE (udp_len0 > svm_fifo_max_enqueue (f0))) - { - error0 = SESSION_ERROR_FIFO_FULL; - goto trace0; - } - - svm_fifo_enqueue_nowait (f0, udp_len0 - sizeof (*udp0), - (u8 *) (udp0 + 1)); - - b0->error = node->errors[SESSION_ERROR_ENQUEUED]; - - /* We need to send an RX event on this fifo */ - if (s0->enqueue_epoch != my_enqueue_epoch) - { - s0->enqueue_epoch = my_enqueue_epoch; - - vec_add1 (smm->session_indices_to_enqueue_by_thread - [my_thread_index], - s0 - smm->sessions[my_thread_index]); - } + tc0 = session_get_transport (s0); } - /* listener hit */ - else if (s0->session_state == SESSION_STATE_LISTENING) + else if (s0->session_state == SESSION_STATE_CONNECTING_READY) { - udp_connection_t *us; - int rv; - - error0 = SESSION_ERROR_NOT_READY; - /* - * create udp transport session + * Clone the transport. It will be cleaned up with the + * session once we notify the session layer. */ - pool_get (um->udp_sessions[my_thread_index], us); - - us->mtu = 1024; /* $$$$ policy */ - - us->c_lcl_ip4.as_u32 = ip0->dst_address.as_u32; - us->c_rmt_ip4.as_u32 = ip0->src_address.as_u32; - us->c_lcl_port = udp0->dst_port; - us->c_rmt_port = udp0->src_port; - us->c_transport_proto = TRANSPORT_PROTO_UDP; - us->c_c_index = us - um->udp_sessions[my_thread_index]; + new_uc0 = udp_conenction_clone_safe (s0->connection_index, + s0->thread_index); + ASSERT (s0->session_index == new_uc0->c_s_index); /* - * create stream session and attach the udp session to it + * Drop the 'lock' on pool resize */ - rv = stream_session_accept (&us->connection, s0->session_index, - 1 /*notify */ ); - if (rv) - error0 = rv; + session_pool_remove_peeker (s0->thread_index); + session_dgram_connect_notify (&new_uc0->connection, + s0->thread_index, &s0); + tc0 = &new_uc0->connection; + } + else if (s0->session_state == SESSION_STATE_LISTENING) + { + tc0 = listen_session_get_transport (s0); + + child0 = udp_connection_alloc (my_thread_index); + if (is_ip4) + { + ip_set (&child0->c_lcl_ip, &ip40->dst_address, 1); + ip_set (&child0->c_rmt_ip, &ip40->src_address, 1); + } + else + { + ip_set (&child0->c_lcl_ip, &ip60->dst_address, 0); + ip_set (&child0->c_rmt_ip, &ip60->src_address, 0); + } + child0->c_lcl_port = udp0->dst_port; + child0->c_rmt_port = udp0->src_port; + child0->c_is_ip4 = is_ip4; + child0->mtu = 1460; /* $$$$ policy */ + + if (stream_session_accept + (&child0->connection, tc0->s_index, 1)) + { + error0 = UDP_ERROR_CREATE_SESSION; + goto trace0; + } + s0 = session_get (child0->c_s_index, child0->c_thread_index); + s0->session_state = SESSION_STATE_READY; + tc0 = &child0->connection; + error0 = UDP_ERROR_LISTENER; } else { + error0 = UDP_ERROR_NOT_READY; + goto trace0; + } - error0 = SESSION_ERROR_NOT_READY; + written0 = session_enqueue_dgram_connection (s0, b0, tc0->proto, + 1 /* queue evt */ ); + if (PREDICT_FALSE (written0 < 0)) + { + error0 = UDP_ERROR_FIFO_FULL; goto trace0; } @@ -204,17 +221,14 @@ udp4_uri_input_node_fn (vlib_main_t * vm, if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE) && (b0->flags & VLIB_BUFFER_IS_TRACED))) { - udp4_uri_input_trace_t *t = - vlib_add_trace (vm, node, b0, sizeof (*t)); + udp_input_trace_t *t = vlib_add_trace (vm, node, b0, + sizeof (*t)); - t->session = ~0; - if (s0) - t->session = s0 - smm->sessions[my_thread_index]; + t->connection = tc0 ? tc0->c_index : ~0; t->disposition = error0; t->thread_index = my_thread_index; } - /* verify speculative enqueue, maybe switch current next frame */ vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, n_left_to_next, bi0, next0); @@ -223,94 +237,66 @@ udp4_uri_input_node_fn (vlib_main_t * vm, vlib_put_next_frame (vm, node, next_index, n_left_to_next); } - /* Send enqueue events */ - - session_indices_to_enqueue = - smm->session_indices_to_enqueue_by_thread[my_thread_index]; - - for (i = 0; i < vec_len (session_indices_to_enqueue); i++) - { - session_fifo_event_t evt; - unix_shared_memory_queue_t *q; - stream_session_t *s0; - application_t *server0; - - /* Get session */ - s0 = pool_elt_at_index (smm->sessions[my_thread_index], - session_indices_to_enqueue[i]); - - /* Get session's server */ - server0 = application_get (s0->app_index); - - /* Built-in server? Deliver the goods... */ - if (server0->cb_fns.builtin_server_rx_callback) - { - server0->cb_fns.builtin_server_rx_callback (s0); - continue; - } - - if (svm_fifo_set_event (s0->server_rx_fifo)) - { - /* Fabricate event */ - evt.fifo = s0->server_rx_fifo; - evt.event_type = FIFO_EVENT_APP_RX; - evt.event_id = serial_number++; - - /* Add event to server's event queue */ - q = server0->event_queue; - - /* Don't block for lack of space */ - if (PREDICT_TRUE (q->cursize < q->maxsize)) - { - unix_shared_memory_queue_add (server0->event_queue, - (u8 *) & evt, - 0 /* do wait for mutex */ ); - } - else - { - vlib_node_increment_counter (vm, udp4_uri_input_node.index, - SESSION_ERROR_FIFO_FULL, 1); - } - } - /* *INDENT-OFF* */ - if (1) - { - ELOG_TYPE_DECLARE (e) = - { - .format = "evt-enqueue: id %d length %d", - .format_args = "i4i4",}; - struct - { - u32 data[2]; - } *ed; - ed = ELOG_DATA (&vlib_global_main.elog_main, e); - ed->data[0] = evt.event_id; - ed->data[1] = svm_fifo_max_dequeue (s0->server_rx_fifo); - } - /* *INDENT-ON* */ + errors = session_manager_flush_enqueue_events (TRANSPORT_PROTO_UDP, + my_thread_index); + udp_input_inc_counter (vm, is_ip4, UDP_ERROR_EVENT_FIFO_FULL, errors); + return frame->n_vectors; +} - } +vlib_node_registration_t udp4_input_node; +vlib_node_registration_t udp6_input_node; - vec_reset_length (session_indices_to_enqueue); +static uword +udp4_input (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + return udp46_input_inline (vm, node, frame, 1); +} - smm->session_indices_to_enqueue_by_thread[my_thread_index] = - session_indices_to_enqueue; +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (udp4_input_node) = +{ + .function = udp4_input, + .name = "udp4-input", + .vector_size = sizeof (u32), + .format_trace = format_udp_input_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN (udp_error_strings), + .error_strings = udp_error_strings, + .n_next_nodes = UDP_INPUT_N_NEXT, + .next_nodes = { +#define _(s, n) [UDP_INPUT_NEXT_##s] = n, + foreach_udp_input_next +#undef _ + }, +}; +/* *INDENT-ON* */ - return frame->n_vectors; +static uword +udp6_input (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame) +{ + return udp46_input_inline (vm, node, frame, 0); } -VLIB_REGISTER_NODE (udp4_uri_input_node) = +/* *INDENT-OFF* */ +VLIB_REGISTER_NODE (udp6_input_node) = { - .function = udp4_uri_input_node_fn,.name = "udp4-uri-input",.vector_size = - sizeof (u32),.format_trace = format_udp4_uri_input_trace,.type = - VLIB_NODE_TYPE_INTERNAL,.n_errors = - ARRAY_LEN (udp4_uri_input_error_strings),.error_strings = - udp4_uri_input_error_strings,.n_next_nodes = UDP4_URI_INPUT_N_NEXT, - /* edit / add dispositions here */ - .next_nodes = - { - [UDP4_URI_INPUT_NEXT_DROP] = "error-drop",} -,}; + .function = udp6_input, + .name = "udp6-input", + .vector_size = sizeof (u32), + .format_trace = format_udp_input_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + .n_errors = ARRAY_LEN (udp_error_strings), + .error_strings = udp_error_strings, + .n_next_nodes = UDP_INPUT_N_NEXT, + .next_nodes = { +#define _(s, n) [UDP_INPUT_NEXT_##s] = n, + foreach_udp_input_next +#undef _ + }, +}; +/* *INDENT-ON* */ /* * fd.io coding-style-patch-verification: ON diff --git a/src/vnet/udp/udp_local.c b/src/vnet/udp/udp_local.c index 8c0ac465e04..ce9bb029788 100644 --- a/src/vnet/udp/udp_local.c +++ b/src/vnet/udp/udp_local.c @@ -23,7 +23,7 @@ udp_main_t udp_main; -#define foreach_udp_input_next \ +#define foreach_udp_local_next \ _ (PUNT, "error-punt") \ _ (DROP, "error-drop") \ _ (ICMP4_ERROR, "ip4-icmp-error") \ @@ -31,25 +31,25 @@ udp_main_t udp_main; typedef enum { -#define _(s,n) UDP_INPUT_NEXT_##s, - foreach_udp_input_next +#define _(s,n) UDP_LOCAL_NEXT_##s, + foreach_udp_local_next #undef _ - UDP_INPUT_N_NEXT, -} udp_input_next_t; + UDP_LOCAL_N_NEXT, +} udp_local_next_t; typedef struct { u16 src_port; u16 dst_port; u8 bound; -} udp_rx_trace_t; +} udp_local_rx_trace_t; u8 * format_udp_rx_trace (u8 * s, va_list * args) { CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - udp_rx_trace_t *t = va_arg (*args, udp_rx_trace_t *); + udp_local_rx_trace_t *t = va_arg (*args, udp_local_rx_trace_t *); s = format (s, "UDP: src-port %d dst-port %d%s", clib_net_to_host_u16 (t->src_port), @@ -58,11 +58,11 @@ format_udp_rx_trace (u8 * s, va_list * args) return s; } -vlib_node_registration_t udp4_input_node; -vlib_node_registration_t udp6_input_node; +vlib_node_registration_t udp4_local_node; +vlib_node_registration_t udp6_local_node; always_inline uword -udp46_input_inline (vlib_main_t * vm, +udp46_local_inline (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * from_frame, int is_ip4) { @@ -132,7 +132,7 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (b0->current_length < advance0 + sizeof (*h0))) { error0 = UDP_ERROR_LENGTH_ERROR; - next0 = UDP_INPUT_NEXT_DROP; + next0 = UDP_LOCAL_NEXT_DROP; } else { @@ -143,14 +143,14 @@ udp46_input_inline (vlib_main_t * vm, vlib_buffer_length_in_chain (vm, b0))) { error0 = UDP_ERROR_LENGTH_ERROR; - next0 = UDP_INPUT_NEXT_DROP; + next0 = UDP_LOCAL_NEXT_DROP; } } if (PREDICT_FALSE (b1->current_length < advance1 + sizeof (*h1))) { error1 = UDP_ERROR_LENGTH_ERROR; - next1 = UDP_INPUT_NEXT_DROP; + next1 = UDP_LOCAL_NEXT_DROP; } else { @@ -161,7 +161,7 @@ udp46_input_inline (vlib_main_t * vm, vlib_buffer_length_in_chain (vm, b1))) { error1 = UDP_ERROR_LENGTH_ERROR; - next1 = UDP_INPUT_NEXT_DROP; + next1 = UDP_LOCAL_NEXT_DROP; } } @@ -187,7 +187,7 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (punt_unknown)) { b0->error = node->errors[UDP_ERROR_PUNT]; - next0 = UDP_INPUT_NEXT_PUNT; + next0 = UDP_LOCAL_NEXT_PUNT; } else if (is_ip4) { @@ -195,7 +195,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP4_destination_unreachable, ICMP4_destination_unreachable_port_unreachable, 0); - next0 = UDP_INPUT_NEXT_ICMP4_ERROR; + next0 = UDP_LOCAL_NEXT_ICMP4_ERROR; n_no_listener++; } else @@ -204,7 +204,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP6_destination_unreachable, ICMP6_destination_unreachable_port_unreachable, 0); - next0 = UDP_INPUT_NEXT_ICMP6_ERROR; + next0 = UDP_LOCAL_NEXT_ICMP6_ERROR; n_no_listener++; } } @@ -224,7 +224,7 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (punt_unknown)) { b1->error = node->errors[UDP_ERROR_PUNT]; - next1 = UDP_INPUT_NEXT_PUNT; + next1 = UDP_LOCAL_NEXT_PUNT; } else if (is_ip4) { @@ -232,7 +232,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP4_destination_unreachable, ICMP4_destination_unreachable_port_unreachable, 0); - next1 = UDP_INPUT_NEXT_ICMP4_ERROR; + next1 = UDP_LOCAL_NEXT_ICMP4_ERROR; n_no_listener++; } else @@ -241,7 +241,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP6_destination_unreachable, ICMP6_destination_unreachable_port_unreachable, 0); - next1 = UDP_INPUT_NEXT_ICMP6_ERROR; + next1 = UDP_LOCAL_NEXT_ICMP6_ERROR; n_no_listener++; } } @@ -254,26 +254,26 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) { - udp_rx_trace_t *tr = vlib_add_trace (vm, node, - b0, sizeof (*tr)); + udp_local_rx_trace_t *tr = vlib_add_trace (vm, node, + b0, sizeof (*tr)); if (b0->error != node->errors[UDP_ERROR_LENGTH_ERROR]) { tr->src_port = h0 ? h0->src_port : 0; tr->dst_port = h0 ? h0->dst_port : 0; - tr->bound = (next0 != UDP_INPUT_NEXT_ICMP4_ERROR && - next0 != UDP_INPUT_NEXT_ICMP6_ERROR); + tr->bound = (next0 != UDP_LOCAL_NEXT_ICMP4_ERROR && + next0 != UDP_LOCAL_NEXT_ICMP6_ERROR); } } if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED)) { - udp_rx_trace_t *tr = vlib_add_trace (vm, node, - b1, sizeof (*tr)); + udp_local_rx_trace_t *tr = vlib_add_trace (vm, node, + b1, sizeof (*tr)); if (b1->error != node->errors[UDP_ERROR_LENGTH_ERROR]) { tr->src_port = h1 ? h1->src_port : 0; tr->dst_port = h1 ? h1->dst_port : 0; - tr->bound = (next1 != UDP_INPUT_NEXT_ICMP4_ERROR && - next1 != UDP_INPUT_NEXT_ICMP6_ERROR); + tr->bound = (next1 != UDP_LOCAL_NEXT_ICMP4_ERROR && + next1 != UDP_LOCAL_NEXT_ICMP6_ERROR); } } @@ -308,7 +308,7 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (b0->current_length < advance0 + sizeof (*h0))) { b0->error = node->errors[UDP_ERROR_LENGTH_ERROR]; - next0 = UDP_INPUT_NEXT_DROP; + next0 = UDP_LOCAL_NEXT_DROP; goto trace_x1; } @@ -333,7 +333,7 @@ udp46_input_inline (vlib_main_t * vm, if (PREDICT_FALSE (punt_unknown)) { b0->error = node->errors[UDP_ERROR_PUNT]; - next0 = UDP_INPUT_NEXT_PUNT; + next0 = UDP_LOCAL_NEXT_PUNT; } else if (is_ip4) { @@ -341,7 +341,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP4_destination_unreachable, ICMP4_destination_unreachable_port_unreachable, 0); - next0 = UDP_INPUT_NEXT_ICMP4_ERROR; + next0 = UDP_LOCAL_NEXT_ICMP4_ERROR; n_no_listener++; } else @@ -350,7 +350,7 @@ udp46_input_inline (vlib_main_t * vm, ICMP6_destination_unreachable, ICMP6_destination_unreachable_port_unreachable, 0); - next0 = UDP_INPUT_NEXT_ICMP6_ERROR; + next0 = UDP_LOCAL_NEXT_ICMP6_ERROR; n_no_listener++; } } @@ -364,20 +364,20 @@ udp46_input_inline (vlib_main_t * vm, else { b0->error = node->errors[UDP_ERROR_LENGTH_ERROR]; - next0 = UDP_INPUT_NEXT_DROP; + next0 = UDP_LOCAL_NEXT_DROP; } trace_x1: if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) { - udp_rx_trace_t *tr = vlib_add_trace (vm, node, - b0, sizeof (*tr)); + udp_local_rx_trace_t *tr = vlib_add_trace (vm, node, + b0, sizeof (*tr)); if (b0->error != node->errors[UDP_ERROR_LENGTH_ERROR]) { tr->src_port = h0->src_port; tr->dst_port = h0->dst_port; - tr->bound = (next0 != UDP_INPUT_NEXT_ICMP4_ERROR && - next0 != UDP_INPUT_NEXT_ICMP6_ERROR); + tr->bound = (next0 != UDP_LOCAL_NEXT_ICMP4_ERROR && + next0 != UDP_LOCAL_NEXT_ICMP6_ERROR); } } @@ -400,23 +400,22 @@ static char *udp_error_strings[] = { }; static uword -udp4_input (vlib_main_t * vm, +udp4_local (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * from_frame) { - return udp46_input_inline (vm, node, from_frame, 1 /* is_ip4 */ ); + return udp46_local_inline (vm, node, from_frame, 1 /* is_ip4 */ ); } static uword -udp6_input (vlib_main_t * vm, +udp6_local (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * from_frame) { - return udp46_input_inline (vm, node, from_frame, 0 /* is_ip4 */ ); + return udp46_local_inline (vm, node, from_frame, 0 /* is_ip4 */ ); } - /* *INDENT-OFF* */ -VLIB_REGISTER_NODE (udp4_input_node) = { - .function = udp4_input, +VLIB_REGISTER_NODE (udp4_local_node) = { + .function = udp4_local, .name = "ip4-udp-lookup", /* Takes a vector of packets. */ .vector_size = sizeof (u32), @@ -424,10 +423,10 @@ VLIB_REGISTER_NODE (udp4_input_node) = { .n_errors = UDP_N_ERROR, .error_strings = udp_error_strings, - .n_next_nodes = UDP_INPUT_N_NEXT, + .n_next_nodes = UDP_LOCAL_N_NEXT, .next_nodes = { -#define _(s,n) [UDP_INPUT_NEXT_##s] = n, - foreach_udp_input_next +#define _(s,n) [UDP_LOCAL_NEXT_##s] = n, + foreach_udp_local_next #undef _ }, @@ -437,11 +436,11 @@ VLIB_REGISTER_NODE (udp4_input_node) = { }; /* *INDENT-ON* */ -VLIB_NODE_FUNCTION_MULTIARCH (udp4_input_node, udp4_input); +VLIB_NODE_FUNCTION_MULTIARCH (udp4_local_node, udp4_local); /* *INDENT-OFF* */ -VLIB_REGISTER_NODE (udp6_input_node) = { - .function = udp6_input, +VLIB_REGISTER_NODE (udp6_local_node) = { + .function = udp6_local, .name = "ip6-udp-lookup", /* Takes a vector of packets. */ .vector_size = sizeof (u32), @@ -449,10 +448,10 @@ VLIB_REGISTER_NODE (udp6_input_node) = { .n_errors = UDP_N_ERROR, .error_strings = udp_error_strings, - .n_next_nodes = UDP_INPUT_N_NEXT, + .n_next_nodes = UDP_LOCAL_N_NEXT, .next_nodes = { -#define _(s,n) [UDP_INPUT_NEXT_##s] = n, - foreach_udp_input_next +#define _(s,n) [UDP_LOCAL_NEXT_##s] = n, + foreach_udp_local_next #undef _ }, @@ -462,7 +461,7 @@ VLIB_REGISTER_NODE (udp6_input_node) = { }; /* *INDENT-ON* */ -VLIB_NODE_FUNCTION_MULTIARCH (udp6_input_node, udp6_input); +VLIB_NODE_FUNCTION_MULTIARCH (udp6_local_node, udp6_local); static void add_dst_port (udp_main_t * um, @@ -508,8 +507,8 @@ udp_register_dst_port (vlib_main_t * vm, pi->node_index = node_index; pi->next_index = vlib_node_add_next (vm, - is_ip4 ? udp4_input_node.index - : udp6_input_node.index, node_index); + is_ip4 ? udp4_local_node.index + : udp6_local_node.index, node_index); /* Setup udp protocol -> next index sparse vector mapping. */ if (is_ip4) @@ -620,8 +619,8 @@ udp_local_init (vlib_main_t * vm) um->dst_port_info_by_dst_port[i] = hash_create (0, sizeof (uword)); } - udp_setup_node (vm, udp4_input_node.index); - udp_setup_node (vm, udp6_input_node.index); + udp_setup_node (vm, udp4_local_node.index); + udp_setup_node (vm, udp6_local_node.index); um->punt_unknown4 = 0; um->punt_unknown6 = 0; @@ -640,7 +639,7 @@ udp_local_init (vlib_main_t * vm) #define _(n,s) add_dst_port (um, UDP_DST_PORT_##s, #s, 0 /* is_ip4 */); foreach_udp6_dst_port #undef _ - ip4_register_protocol (IP_PROTOCOL_UDP, udp4_input_node.index); + ip4_register_protocol (IP_PROTOCOL_UDP, udp4_local_node.index); /* Note: ip6 differs from ip4, UDP is hotwired to ip6-udp-lookup */ return 0; } -- cgit 1.2.3-korg