From 15ad172a847fa667c57a4594ef4158405db9a984 Mon Sep 17 00:00:00 2001 From: Angelo Mantellini Date: Tue, 31 Mar 2020 17:50:43 +0200 Subject: [HICN-554] hicn-light refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I36f2d393741d4502ce14d3791158e43e3e9cd4cf Signed-off-by: Jordan Augé --- hicn-light/src/hicn/io/tcp.c | 563 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 563 insertions(+) create mode 100644 hicn-light/src/hicn/io/tcp.c (limited to 'hicn-light/src/hicn/io/tcp.c') diff --git a/hicn-light/src/hicn/io/tcp.c b/hicn-light/src/hicn/io/tcp.c new file mode 100644 index 000000000..03911e556 --- /dev/null +++ b/hicn-light/src/hicn/io/tcp.c @@ -0,0 +1,563 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Common activity for STREAM based listeners. + */ + +#include +#ifndef _WIN32 +#include // fcntl +#endif /* _WIN32 */ +#include // fcntl +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +// 128 KB output queue +#define OUTPUT_QUEUE_BYTES (128 * 1024) + +#define RECV_BUFLEN 8192 +#define MTU 1500 + +// XXX TODO what is exactly an eventqueue +// XXX TODO can't we write directly to the socket ? + +/****************************************************************************** + * Listener + ******************************************************************************/ + +typedef struct { +} listener_tcp_data_t; + +static +int +listener_tcp_initialize(listener_t * listener) +{ + + ERROR("[listener_tcp_initialize] Not implemented"); + + return 0; +} + +static +void +listener_tcp_finalize(listener_t * listener) +{ + + ERROR("[listener_tcp_finalize] Not implemented"); +} + +static +int +listener_tcp_punt(const listener_t * listener, const char * prefix_s) +{ + ERROR("[listener_tcp_punt] Not implemented"); + return -1; +} + +static +int +listener_tcp_get_socket(const listener_t * listener, const address_t * local, + const address_t * remote, const char * interface_name) +{ + + ERROR("[listener_tcp_get_socket] Not implemented"); + return -1; + +} + +static +void +listener_tcp_read_callback(listener_t * listener, int fd, void * data) +{ + ERROR("[listener_tcp_read_callback] Not implemented"); + +} + +DECLARE_LISTENER(tcp); + +/****************************************************************************** + * Connection + ******************************************************************************/ + + +typedef struct { + /* Partial receive buffer */ + u8 buf[RECV_BUFLEN]; + size_t roff; /**< Read offset */ + size_t woff; /**< Write offset */ + + + // XXX this should be initialized with the header length, and tells what we + // expect to receive next... + size_t next_len; + struct bufferevent * bufferevent; +} connection_tcp_data_t; + +// XXX This seems more a listener code ! +// XXX we must have accept to the connection table to spawn new ones !! +// XXX equivalent to initialize +int +connection_tcp_accept(connection_t * connection, forwarder_t *forwarder, int fd, + address_pair_t *pair, bool local, unsigned connection_id) +{ + assert(connection); + assert(forwarder); + + *connection = (connection_t) { + .id = connection_id, + .interface_name = NULL, + .type = FACE_TYPE_TCP, + .pair = *pair, + .fd = fd, + .local = local, + // As we are accepting a connection, we begin in the UP state + .state = FACE_STATE_UP, + .admin_state = FACE_STATE_UP, +#ifdef WITH_POLICY + .priority = 0, +#endif /* WITH_POLICY */ + + .forwarder = forwarder, + .closed = false, + }; + + // XXX this new connection needs to be registered + //char *str = pair_ToString(udp->pair); + INFO("%s connection %p created for address %s (local=%s)", + face_type_str(connection->type), connection, "N/A", + connection_is_local(connection) ? "true" : "false"); + //free(str); + + return 0; +} + +int +make_socket(address_pair_t * pair) +{ +#ifndef _WIN32 + int fd = socket(address_family(&pair->local), SOCK_STREAM, 0); + if (fd < 0) { + perror("socket"); + goto ERR_SOCKET; + } +#else + SOCKET fd = socket(address_family(&pair->local), SOCK_STREAM, 0); + if (fd == INVALID_SOCKET) { + perror("socket"); + goto ERR_SOCKET; + } +#endif /* _WIN32 */ + + /* Set non-blocking flag */ +#ifndef _WIN32 + int flags = fcntl(fd, F_GETFL, NULL); + if (flags == -1) { + perror("F_GETFL"); + goto ERR; + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + perror("F_SETFL"); + goto ERR; + } +#else + if (ioctlsocket(fd, FIONBIO, &(u_long){1}) != NO_ERROR) { + perror("ioctlsocket"); + goto ERR; + } +#endif /* _WIN32 */ + + if (bind(fd, address_sa(&pair->local), address_socklen(&pair->local)) == -1) { + perror("bind"); + goto ERR; + } + + if (connect(fd, address_sa(&pair->remote), address_socklen(&pair->remote)) < 0) { + perror("connect"); + goto ERR; + } + + + return 0; + +ERR: +#ifndef _WIN32 + close(fd); +#else + closesocket(fd); +#endif + +ERR_SOCKET: + return -1; + +} + +static +int +connection_tcp_initialize(connection_t * connection) +{ + assert(connection); + assert(connection->type = FACE_TYPE_TCP); + + connection_tcp_data_t * data = connection->data; + assert(data); + + data->roff = 0; + data->woff = 0; + + connection->fd = make_socket(connection_get_pair(connection)); + if (connection->fd < 0) { + ERROR("Error creating TCP socket"); + return -1; + } + + //char *pair_str = address_pair_ToString(pair); + INFO("%s connection %p connect for address pair %s", + face_type_str(connection->type), connection, "N/A"); + //free(pair_str); + + return 0; +} + +// XXX a part needs to be handled in the connection.c +static +void +connection_tcp_finalize(connection_t * connection) +{ + if (!connection->closed) { + connection->closed = true; + // XXX need to delete the connection like previously in the connection + // manager + } + + INFO("%s connection %p destroyed", face_type_str(connection->type), + connection); + // XXX need to release the "connection" +} + +#if 0 +static +bool +connection_tcp_sendv(connnection_t * connection, struct iovec * iov, + size_t size) +{ + assert(connection); + assert(iov); + + if (!connection_is_up(connection)) { + ERROR("Connection #%u tried to send to down connection (up=%d closed=%d)", + connection_get_id(connection), + connection_get_up(connection) ? "true" : "false", + connection_get_closed(connection) ? "true" : "false"); + return false; + } + + PARCEventBuffer *buffer = + parcEventBuffer_GetQueueBufferOutput(connection->events); + size_t buffer_backlog = parcEventBuffer_GetLength(buffer); + parcEventBuffer_Destroy(&buffer); + + if (buffer_backlog >= OUTPUT_QUEUE_BYTES) { + WARN("Connection #%u Writing to buffer backlog %zu bytes DROP MESSAGE", + connection_get_id(connection), buffer_backlog); + return false; + } + +#if 0 + /* DEBUG */ + size_t length = 0; + for (int i = 0; i < size; i++) + length += message[i].iov_len; + DEBUG("Connection #%u writing %zu bytes to buffer with backlog %zu bytes", + connection_get_id(connection), length, buffer_backlog); +#endif + + /* Write directly into the parcEventQueue without passing through message */ + for (int i = 0; i < size; i++) { + if (parcEventQueue_Write(conn->events, iov[i].iov_base, + iov[i].iov_len) != 0) + return false; + } + + return true; +} +#endif + +/** + * @function streamConnection_Send + * @abstract Non-destructive send of the message. + * @discussion + * Send uses message_CopyToStreamBuffer, which is a non-destructive write. + * The send may fail if there's no buffer space in the output queue. + * + * @param dummy is ignored. A stream has only one peer. + * @return <#return#> + */ +// XXX address not used anywhere +// XXX too much repeated code with sendv here +static +int +connection_tcp_send(const connection_t * connection, //const address_t * address, + msgbuf_t * msgbuf, bool queue) +{ + assert(connection); + assert(address); + /* msgbuf can be NULL */ + + /* No need to flush */ + if (!msgbuf) + return true; + + if (!connection_is_up(connection)) { + ERROR("Connection #%u tried to send to down connection (up=%d closed=%d)", + connection_get_id(connection), + connection_is_up(connection) ? "true" : "false", + connection_is_closed(connection) ? "true" : "false"); + return false; + } + + // XXX TODO write to fd +#if 0 + PARCEventBuffer *buffer = + parcEventBuffer_GetQueueBufferOutput(connection->events); + size_t buffer_backlog = parcEventBuffer_GetLength(buffer); + parcEventBuffer_Destroy(&buffer); + + if (buffer_backlog >= OUTPUT_QUEUE_BYTES) { + WARN("Connection #%u Writing to buffer backlog %zu bytes DROP MESSAGE", + connection_get_id(connection), buffer_backlog); + return false; + } + + DEBUG("Connection #%u Writing %zu bytes to buffer with backlog %zu bytes", + connection_get_id(connection), msgbuf_len(message), buffer_backlog); + + return (parcEventQueue_Write(connection->events, msgbuf_packet(message), + msgbuf_len(message)) == 0); +#endif + return true; +} + +static +int +connection_tcp_send_packet(const connection_t * connection, + const uint8_t * packet, size_t size) +{ + /* Not implemented for local connections */ + // XXX shall we set the pointer to NULL and add a check ? + + ERROR("[connection_tcp_send_packet] Not implemented"); + + return -1; +} + +// ================================================================= +// the actual I/O functions + +// not needed anymore ? +#if 0 +// XXX this is called iif there is sufficient data to read, otherwise it raises +// an assertion error. This was a problem before I guess +static +int +connection_tcp_read_message(connection_t * connection, msgbuf_t * msgbuf) +{ + assert(connection); + assert(msgbuf); + + connection_tcp_data_t * data = connection->data; + assert(data); + + size_t n = evbuffer_get_length(data->evbuffer); + // XXX this check was wrong + // parcAssertTrue(n >= sizeof(header_control_message), + "Called with too short an input: %zu", n); + + // XXX WTF + if (stream->next_len == 0) { + // this linearizes the first messageHandler_GetIPv6HeaderLength() bytes of the + // input buffer's iovecs and returns a pointer to it. + uint8_t *fh = parcEventBuffer_Pullup(data->evbuffer, sizeof(header_control_message)); + + // Calculate the total message size based on the fixed header + stream->next_len = messageHandler_GetTotalPacketLength(fh); + } + // This is not an ELSE statement. We can both start a new message then + // check if there's enough bytes to read the whole thing. + + if (n < stream->next_len) + return -1; + + uint8_t packet_type; + if (messageHandler_IsInterest(packet)) { + packet_type = MESSAGE_TYPE_INTEREST; + } else if (messageHandler_IsData(packet)) { + packet_type = MESSAGE_TYPE_DATA; + } else { + ERROR("Dropped packet that is not interest nor data"); + goto ERR; + } + + if (evbuffer_remove(data->evbuffer, data->packet, data->next_len) < 0) + return -1; + + msgbuf_from_packet(msgbuf, data->packet, packet_type, stream->id, + ticks_now()); + + // now reset message length for next packet + data->next_len = 0; + return 0; + +ERR: + evbuffer_drain(data->evbuffer, data->next_len); + return -1; +} +#endif + +/** + * @function conn_readcb + * @abstract Event callback for reads + * @discussion + * Will read messages off the input. Continues reading as long as we + * can get a header to determine the next message length or as long as we + * can read a complete message. + * + * This function manipulates the read low water mark. (1) read a fixed header + * plus complete message, then set the low water mark to FIXED_HEADER_LEN. (2) + * read a fixed header, but not a complete message, then set low water mark to + * the total mesage length. Using the low water mark like this means the buffer + * event will only trigger on meaningful byte boundaries when we can get actual + * work done. + * + * @param <#param1#> + * @return <#return#> + */ +static +void +connection_tcp_read_callback(connection_t * connection, int fd, void * user_data) +{ + assert(!!(what & PARCEventType_Read)); + assert(connection_void); + + connection_tcp_data_t * data = connection->data; + assert(RECV_BUFLEN - data->woff > MTU); + + /* No batching here as code is not expected to receive much throughput */ + for (;;) { + ssize_t n = recv(connection->fd, data->buf + data->woff, + RECV_BUFLEN - data->woff, 0); + if (n == 0) /* EOF */ + return; // XXX close connection + if (n < 0) { + if (errno == EWOULDBLOCK) + break; + perror("recv"); + return; // XXX close connection + } + data->woff += n; + + /* Process */ + uint8_t * packet = data->buf + data->roff; + size_t size = data->woff - data->roff; /* > 0 */ + + ssize_t used = listener_read_callback(connection->forwarder, NULL, fd, + address_pair_get_local(&connection->pair), packet, size); + if (used < 0) + return; // XXX close connection ? + if (used == 0) + break; /* We would have received more if there was still packets to be read */ + data->roff += used; + assert(data->roff <= data->woff); + + if (data->roff == data->woff) { + /* Reset state whenever possible to avoid memcpy's */ + data->roff = 0; + data->woff = 0; + return; + } + } + + /* Make sure there is enough remaining space in the buffer */ + if (RECV_BUFLEN - data->woff < MTU) { + /* + * There should be no overlap provided a sufficiently large BUFLEN, but + * who knows. + */ + memmove(data->buf, data->buf + data->roff, data->woff - data->roff); + data->woff -= data->roff; + data->roff = 0; + } + + return; +} + +#if 0 +static +void +connection_tcp_callback(connection_t * connection, int fd, void * user_data) +{ + if (events & PARCEventQueueEventType_Connected) { + INFO("Connection %u is connected", stream->id); + + // if the stream was closed, do not transition to an UP state + if (!stream->isClosed) { + _setConnectionState(stream, true); + } + } else if (events & PARCEventQueueEventType_EOF) { + INFO("connid %u closed.", stream->id); + + parcEventQueue_Disable(stream->events, PARCEventType_Read); + + _setConnectionState(stream, false); + + if (!stream->isClosed) { + stream->isClosed = true; + // XXX TODO destroy the connection + } + } else if (events & PARCEventQueueEventType_Error) { + ERROR("Got an error on the connection %u: %s", stream->id, + strerror(errno)); + + parcEventQueue_Disable(stream->events, + PARCEventType_Read | PARCEventType_Write); + + _setConnectionState(stream, false); + + if (!stream->isClosed) { + stream->isClosed = true; + // XXX TODO destroy the connection + } + } + /* None of the other events can happen here, since we haven't enabled + * timeouts */ +} +#endif + +DECLARE_CONNECTION(tcp) -- cgit 1.2.3-korg