aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/io
diff options
context:
space:
mode:
authorJordan Augé <jordan.auge+fdio@cisco.com>2020-09-23 17:50:52 +0200
committerJordan Augé <jordan.auge+fdio@cisco.com>2020-09-28 18:25:30 +0200
commit32dccec98e4c7d7e4ce902e19ba8d1b29b823758 (patch)
tree95c5dec2083a3774c13bd5f896743cd6c5c42a7a /hicn-light/src/hicn/io
parent7356408ca1554468c9d7b9840aaaee28b4341c8d (diff)
[HICN-570] Message buffer (incl. CS and PIT changes)
Change-Id: I4c508e4b04dee3acbfc3da1d26e1770cb826f22b Signed-off-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Diffstat (limited to 'hicn-light/src/hicn/io')
-rw-r--r--hicn-light/src/hicn/io/CMakeLists.txt1
-rw-r--r--hicn-light/src/hicn/io/base.c128
-rw-r--r--hicn-light/src/hicn/io/base.h38
-rw-r--r--hicn-light/src/hicn/io/hicn.c95
-rw-r--r--hicn-light/src/hicn/io/tcp.c42
-rw-r--r--hicn-light/src/hicn/io/udp.c113
6 files changed, 272 insertions, 145 deletions
diff --git a/hicn-light/src/hicn/io/CMakeLists.txt b/hicn-light/src/hicn/io/CMakeLists.txt
index d7e6977d6..80a1ba867 100644
--- a/hicn-light/src/hicn/io/CMakeLists.txt
+++ b/hicn-light/src/hicn/io/CMakeLists.txt
@@ -17,6 +17,7 @@ list(APPEND HEADER_FILES
)
list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/base.c
${CMAKE_CURRENT_SOURCE_DIR}/hicn.c
${CMAKE_CURRENT_SOURCE_DIR}/tcp.c
${CMAKE_CURRENT_SOURCE_DIR}/udp.c
diff --git a/hicn-light/src/hicn/io/base.c b/hicn-light/src/hicn/io/base.c
new file mode 100644
index 000000000..35d8915a8
--- /dev/null
+++ b/hicn-light/src/hicn/io/base.c
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file base.c
+ * #brief Implementation of base IO functions.
+ */
+
+#include <hicn/util/log.h>
+
+#include "base.h"
+
+/**
+ * @brief Helper function for listener to read a single packet on a socket
+ */
+ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf, address_t * address)
+{
+ uint8_t * packet = msgbuf_get_packet(msgbuf);
+ size_t size = msgbuf_get_len(msgbuf);
+
+ for(;;) {
+ ssize_t n = read(fd, packet, size);
+ if (n == 0)
+ return n;
+ if (n < 0) {
+ if (errno == EINTR)
+ continue; // XXX was break;
+ ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno));
+ return -1;
+ }
+
+ msgbuf->length = n;
+ *address = ADDRESS_ANY(AF_UNSPEC, 0); // XXX placeholder, see hicn.c
+ }
+
+ return 1;
+}
+
+ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf, address_t * address)
+{
+
+ struct sockaddr_storage * sa = (struct sockaddr_storage *)address;
+ socklen_t sa_len = sizeof(sa);
+
+ uint8_t * packet = msgbuf_get_packet(msgbuf);
+
+ for (;;) {
+ ssize_t n = recvfrom(fd, packet, MTU, 0, (struct sockaddr *)&sa, &sa_len);
+ if (n == 0)
+ return n;
+ if (n < 0) {
+ if (errno == EINTR)
+ continue; // XXX was break;
+ ERROR("recvfrom failed %d: (%d) %s", fd, errno, strerror(errno));
+ return -1;
+ }
+
+ msgbuf->length = n;
+ }
+
+ return 1;
+}
+
+#ifdef __linux__
+ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf,
+ address_t ** address, size_t batch_size)
+{
+ struct mmsghdr msghdr[batch_size];
+ struct iovec iovecs[batch_size];
+ struct sockaddr_storage addrs[batch_size];
+
+ /* Prepare the mmghdr struct for recvmmsg */
+ for (unsigned i = 0; i < MAX_MSG; i++) {
+ struct mmsghdr *msg = &msghdr[i];
+ *msg = (struct mmsghdr) {
+ .msg_hdr = {
+ .msg_iov = &iovecs[i],
+ .msg_iovlen = 1,
+ .msg_name = &addrs[i],
+ .msg_namelen = sizeof(struct sockaddr_storage),
+ .msg_control = NULL,
+ .msg_controllen = 0,
+ },
+ };
+
+ iovecs[i] = (struct iovec) {
+ .iov_base = msgbuf_get_packet(msgbuf[i]),
+ .iov_len = MTU,
+ };
+ }
+
+ int n;
+ for (;;) {
+ n = recvmmsg(fd, msghdr, batch_size, /* flags */ 0,
+ /* timeout */ NULL);
+ if (n == 0)
+ return 0;
+ if (n < 0) {
+ if (errno == EINTR)
+ continue; // XXX was break;
+ ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno));
+ return (ssize_t) n;
+ }
+
+ /* Assign size to msgbuf, and build address pair */
+ for (int i = 0; i < n; i++) {
+ struct mmsghdr *msg = &msghdr[i];
+ msgbuf[i]->length = msg->msg_hdr.msg_iovlen;
+ **address = *(address_t*)msg->msg_hdr.msg_name;
+ }
+ }
+
+ return n;
+}
+#endif /* __linux__ */
+
diff --git a/hicn-light/src/hicn/io/base.h b/hicn-light/src/hicn/io/base.h
new file mode 100644
index 000000000..639334072
--- /dev/null
+++ b/hicn-light/src/hicn/io/base.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file base.h
+ * #brief Base IO functions.
+ */
+
+#ifndef HICNLIGHT_IO_BASE
+#define HICNLIGHT_IO_BASE
+
+#include "../core/address_pair.h"
+#include "../core/msgbuf.h"
+
+#define MAX_MSG 64 //16 //32
+
+ssize_t io_read_single_fd(int fd, msgbuf_t * msgbuf,
+ address_t * address);
+
+ssize_t io_read_single_socket(int fd, msgbuf_t * msgbuf,
+ address_t * address);
+
+ssize_t io_read_batch_socket(int fd, msgbuf_t ** msgbuf,
+ address_t ** address, size_t n);
+
+#endif /* HICNLIGHT_IO_BASE */
diff --git a/hicn-light/src/hicn/io/hicn.c b/hicn-light/src/hicn/io/hicn.c
index 26c641e51..02b6728a9 100644
--- a/hicn-light/src/hicn/io/hicn.c
+++ b/hicn-light/src/hicn/io/hicn.c
@@ -26,17 +26,18 @@
#include <string.h>
#include <unistd.h>
-#include <hicn/core/listener.h>
-#include <hicn/core/listener_vft.h>
-#include <hicn/core/connection.h>
-#include <hicn/core/connection_vft.h>
-#include <hicn/core/connection_table.h>
-#include <hicn/core/forwarder.h>
-#include <hicn/core/mapme.h>
-#include <hicn/core/messagePacketType.h>
-#include <hicn/socket/api.h>
#include <hicn/util/log.h>
+#include "base.h"
+#include "../core/listener.h"
+#include "../core/listener_vft.h"
+#include "../core/connection.h"
+#include "../core/connection_vft.h"
+#include "../core/connection_table.h"
+#include "../core/forwarder.h"
+#include "../core/mapme.h"
+#include "../socket/api.h"
+
#define IPv6 6
#define IPv4 4
#define MTU_SIZE 1500 // bytes
@@ -242,65 +243,7 @@ typedef struct {
} listener_hicn_data_t;
-static
-void
-listener_hicn_read_callback(listener_t * listener, int fd, void * data)
-{
- assert(listener);
- assert(!data); /* No user data */
- uint8_t packet[MTU_SIZE];
-
- int family = address_family(&listener->address);
- if ((family != AF_INET) && (family != AF_INET6)) {
- /*
- * We need to discard the frame. Read 1 byte. This will clear it off
- * the stack.
- */
- int nread = read(fd, packet, 1);
-
- if (nread > 0) {
- DEBUG("Discarded frame from fd %d", fd);
- } else if (nread < 0) {
- ERROR("Error trying to discard frame from fd %d: (%d) %s", fd, errno,
- strerror(errno));
- }
- return;
- }
-
-#if 0
- if (!(what & PARCEventType_Read))
- return;
-#endif
-
- ssize_t n = read(fd, packet, MTU_SIZE);
- if (n < 0) {
- ERROR("read failed %d: (%d) %s", fd, errno, strerror(errno));
- return;
- }
-
-#if 0
- address_t packet_addr;
- if (_createAddressFromPacket(packet, &packet_addr) < 0)
- return;
-
- address_pair_t pair_find = {
- .local = packet_addr,
- .remote = /* dummy */ hicn->localAddress,
- };
- const Connection *conn = _lookupConnection(listener, &pair_find);
- if (!conn) {
- address_pair_t pair = {
- .local = hicn->localAddress,
- .remote = packet_addr,
- };
- connid = _createNewConnection(listener, fd, &pair);
- } else {
- connid = connection_GetConnectionId(conn);
- }
-#endif
-
- listener_read_callback(listener->forwarder, listener, fd, &listener->address, packet, n);
-}
+#define listener_hicn_read_callback listener_read_callback
bool
listener_hicn_bind(listener_t * listener, const address_t * address)
@@ -426,7 +369,8 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local,
{
assert(listener);
assert(listener_get_type(listener) == FACE_TYPE_HICN);
- // assert(pair);
+ assert(local);
+ assert(remote);
/* ... */
@@ -434,6 +378,9 @@ listener_hicn_get_socket(const listener_t * listener, const address_t * local,
}
+#define listener_hicn_read_single io_read_single_fd
+#define listener_hicn_read_batch NULL
+
DECLARE_LISTENER(hicn);
/******************************************************************************
@@ -503,7 +450,7 @@ static
int
connection_hicn_send_packet(const connection_t * connection, const uint8_t * packet, size_t size)
{
- // assert(ops);
+ assert(connection);
assert(packet);
/* ... */
@@ -511,12 +458,4 @@ connection_hicn_send_packet(const connection_t * connection, const uint8_t * pac
return 0;
}
-static
-void
-connection_hicn_read_callback(connection_t * connection, int fd, void * data)
-{
- ERROR("Unexpected read callback for hicn connection");
- return;
-}
-
DECLARE_CONNECTION(hicn);
diff --git a/hicn-light/src/hicn/io/tcp.c b/hicn-light/src/hicn/io/tcp.c
index 0ed9b4650..e4e2b06af 100644
--- a/hicn-light/src/hicn/io/tcp.c
+++ b/hicn-light/src/hicn/io/tcp.c
@@ -27,19 +27,19 @@
#include <stdio.h>
#include <string.h>
-#include <hicn/core/connection.h>
-#include <hicn/core/connection_vft.h>
-#include <hicn/core/listener.h>
-#include <hicn/core/listener_vft.h>
-#include <hicn/core/msgbuf.h>
-#include <hicn/core/forwarder.h>
-
-#include <hicn/core/messageHandler.h>
-
-#include <hicn/utils/commands.h>
+#include <hicn/hicn.h>
#include <hicn/util/log.h>
-#include <hicn/hicn.h>
+#include "base.h"
+#include "../core/connection.h"
+#include "../core/connection_vft.h"
+#include "../core/listener.h"
+#include "../core/listener_vft.h"
+#include "../core/msgbuf.h"
+#include "../core/forwarder.h"
+#include "../core/messageHandler.h"
+#include "../utils/commands.h"
+
// 128 KB output queue
#define OUTPUT_QUEUE_BYTES (128 * 1024)
@@ -93,13 +93,8 @@ listener_tcp_get_socket(const listener_t * listener, const address_t * local,
}
-static
-void
-listener_tcp_read_callback(listener_t * listener, int fd, void * data)
-{
- ERROR("[listener_tcp_read_callback] Not implemented");
-
-}
+#define listener_tcp_read_single io_read_single_socket
+#define listener_tcp_read_batch NULL
DECLARE_LISTENER(tcp);
@@ -324,11 +319,10 @@ connection_tcp_sendv(connnection_t * connection, struct iovec * iov,
// XXX too much repeated code with sendv here
static
int
-connection_tcp_send(const connection_t * connection, //const address_t * address,
- msgbuf_t * msgbuf, bool queue)
+connection_tcp_send(const connection_t * connection, msgbuf_t * msgbuf,
+ bool queue)
{
assert(connection);
- // assert(address);
/* msgbuf can be NULL */
/* No need to flush */
@@ -459,12 +453,13 @@ ERR:
* @param <#param1#>
* @return <#return#>
*/
+#if 0
static
void
connection_tcp_read_callback(connection_t * connection, int fd, void * user_data)
{
- // assert(!!(what & PARCEventType_Read));
assert(connection);
+ /* user_data can be NULL */
connection_tcp_data_t * data = connection->data;
assert(RECV_BUFLEN - data->woff > MTU);
@@ -487,7 +482,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data
uint8_t * packet = data->buf + data->roff;
size_t size = data->woff - data->roff; /* > 0 */
- ssize_t used = listener_read_callback(connection->forwarder, NULL, fd,
+ ssize_t used = listener_read_callback(NULL, fd,
address_pair_get_local(&connection->pair), packet, size);
if (used < 0)
return; // XXX close connection ?
@@ -517,6 +512,7 @@ connection_tcp_read_callback(connection_t * connection, int fd, void * user_data
return;
}
+#endif
#if 0
static
diff --git a/hicn-light/src/hicn/io/udp.c b/hicn-light/src/hicn/io/udp.c
index f0eaf8fd0..38d643838 100644
--- a/hicn-light/src/hicn/io/udp.c
+++ b/hicn-light/src/hicn/io/udp.c
@@ -31,6 +31,7 @@
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <sys/uio.h>
@@ -46,19 +47,20 @@
#define UDP_GRO 104
#endif /* WITH_GSO */
-#include <hicn/core/address_pair.h>
-#include <hicn/core/connection.h>
-#include <hicn/core/connection_vft.h>
-#include <hicn/core/listener.h>
-#include <hicn/core/listener_vft.h>
-#include <hicn/base/loop.h>
-#include <hicn/core/msgbuf.h>
-#include <hicn/core/forwarder.h>
-#include <hicn/core/messageHandler.h>
-#include <hicn/core/messagePacketType.h>
-#include <hicn/hicn-light/config.h>
#include <hicn/util/log.h>
+#include "base.h"
+#include "../base/loop.h"
+#include "../core/address_pair.h"
+#include "../core/connection.h"
+#include "../core/connection_vft.h"
+#include "../core/forwarder.h"
+#include "../core/listener.h"
+#include "../core/listener_vft.h"
+#include "../core/messageHandler.h"
+#include "../core/msgbuf.h"
+//#include "../hicn-light/config.h"
+
// Batching based on recvmmsg is also generic
// the difference is the handling of packet as in tcp we need to go through the
// ring buffer first to do the framing, while in UDP this is already done
@@ -75,8 +77,6 @@
typedef struct {
uint16_t port; // in address ?
-
- batch_buffer_t bb;
} listener_udp_data_t;
#ifdef __ANDROID__
@@ -211,10 +211,10 @@ listener_udp_initialize(listener_t * listener)
{
assert(listener);
+#if 0
listener_udp_data_t * data = listener->data;
assert(data);
-
- init_batch_buffers(&data->bb);
+#endif
// XXX Socket creation should be a function per-se and not be called in
// initialize !
@@ -287,18 +287,13 @@ ERR_SOCKET:
return -1;
}
-static
-void
-listener_udp_read_callback(listener_t * listener, int fd, void * user_data)
-{
- assert(listener);
- assert(!user_data);
-
- listener_udp_data_t * data = listener->data;
- assert(data);
+#define listener_udp_read_single io_read_single_socket
- listener_batch_read_callback(listener->forwarder, listener, fd, &listener->address, &data->bb);
-}
+#ifdef __linux__
+#define listener_udp_read_batch io_read_batch_socket
+#else
+#define listener_udp_read_batch NULL
+#endif /* __linux__ */
DECLARE_LISTENER(udp);
@@ -307,7 +302,8 @@ DECLARE_LISTENER(udp);
******************************************************************************/
typedef struct {
- batch_buffer_t bb;
+ // XXX queue storage : msfbuf id
+ off_t queue[MAX_MSG]; // sized according to the max batch
int queue_len;
} connection_udp_data_t;
@@ -323,8 +319,6 @@ connection_udp_initialize(connection_t * connection)
connection_udp_data_t * data = connection->data;
assert(data);
- init_batch_buffers(&data->bb);
-
data->queue_len = 0;
return 0;
@@ -358,6 +352,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que
connection_udp_data_t * data = connection->data;
assert(data);
+ forwarder_t * forwarder = connection->forwarder;
+ msgbuf_pool_t * msgbuf_pool = forwarder_get_msgbuf_pool(forwarder);
+
+
/* Flush if required or if queue is full */
if ((!msgbuf) || (queue && (data->queue_len > MAX_MSG))) {
/* Flush operation */
@@ -366,7 +364,38 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que
#else
int flags = 0;
#endif /* WITH_ZEROCOPY */
- int n = sendmmsg(connection->fd, data->bb.msghdr, data->queue_len, flags);
+
+
+ // BEGIN : send batch
+
+ /* Preparing the struct mmsghdr for batch sending */
+ struct mmsghdr msghdr[MAX_MSG];
+ struct iovec iovecs[MAX_MSG];
+
+ /* Prepare the mmghdr struct for recvmmsg */
+ for (unsigned i = 0; i < data->queue_len; i++) {
+ struct mmsghdr *msg = &msghdr[i];
+ *msg = (struct mmsghdr) {
+ .msg_hdr = {
+ .msg_iov = &iovecs[i],
+ .msg_iovlen = 1,
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_control = NULL,
+ .msg_controllen = 0,
+ },
+ };
+
+ msgbuf_t * msgbuf = msgbuf_pool_at(msgbuf_pool, data->queue[i]);
+ iovecs[i] = (struct iovec) {
+ .iov_base = msgbuf_get_packet(msgbuf),
+ .iov_len = msgbuf_get_len(msgbuf),
+ };
+ }
+
+ // XXX build mmsghdr from the msgbuf queue
+
+ int n = sendmmsg(connection->fd, msghdr, data->queue_len, flags);
if (n == -1) {
perror("sendmmsg()");
data->queue_len = 0;
@@ -384,11 +413,10 @@ connection_udp_send(const connection_t * connection, msgbuf_t * msgbuf, bool que
}
if (queue) {
- struct iovec *iovec = &data->bb.iovecs[data->queue_len++];
- iovec->iov_base = msgbuf_get_packet(msgbuf);
- iovec->iov_len = msgbuf_get_len(msgbuf);
-
+ /* Queue packet */
+ data->queue[data->queue_len++] = msgbuf_pool_get_id(msgbuf_pool, msgbuf);
} else {
+ /* Send one */
ssize_t writeLength = write(connection->fd, msgbuf_get_packet(msgbuf),
msgbuf_get_len(msgbuf));
@@ -467,16 +495,13 @@ connection_udp_send_packet(const connection_t * connection, const uint8_t * pack
return 0;
}
-static
-void
-connection_udp_read_callback(connection_t * connection, int fd, void * user_data)
-{
- assert(connection);
- assert(!user_data);
+#define connection_udp_read_single listener_read_batch_socket listener_single_socket
+
+#ifdef __linux__
+#define connection_udp_read_batch listener_read_batch_socket
+#else
+#define connection_udp_read_batch NULL
+#endif /* __linux__ */
- connection_udp_data_t * data = connection->data;
- assert(data);
- listener_batch_read_callback(connection->forwarder, NULL, fd, address_pair_get_local(&connection->pair), &data->bb);
-}
DECLARE_CONNECTION(udp);