aboutsummaryrefslogtreecommitdiffstats
path: root/hicn-light/src/hicn/io
diff options
context:
space:
mode:
authorJordan Augé <jordan.auge+fdio@cisco.com>2019-04-16 14:39:50 +0200
committerJordan Augé <jordan.auge+fdio@cisco.com>2019-04-16 15:37:53 +0200
commit3ed2badca49b8c7e636d8f8c40f532a7d3a7ba29 (patch)
treef865fe39515b3a496bfeee7ad8ef73916104caf4 /hicn-light/src/hicn/io
parentc365689250216861fd7727203ee6ba1049ad5778 (diff)
[HICN-177] Provide helpers to send, receive and process control messages
Change-Id: I5f7270568eaf24858346edebc638cf51e28cc5ad Signed-off-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Diffstat (limited to 'hicn-light/src/hicn/io')
-rw-r--r--hicn-light/src/hicn/io/hicnConnection.c58
-rw-r--r--hicn-light/src/hicn/io/hicnListener.c12
-rw-r--r--hicn-light/src/hicn/io/ioOperations.c7
-rw-r--r--hicn-light/src/hicn/io/ioOperations.h10
-rw-r--r--hicn-light/src/hicn/io/streamConnection.c93
-rw-r--r--hicn-light/src/hicn/io/streamConnection.h3
-rw-r--r--hicn-light/src/hicn/io/udpConnection.c41
-rw-r--r--hicn-light/src/hicn/io/udpListener.c11
8 files changed, 137 insertions, 98 deletions
diff --git a/hicn-light/src/hicn/io/hicnConnection.c b/hicn-light/src/hicn/io/hicnConnection.c
index d9797786a..3c6f0612c 100644
--- a/hicn-light/src/hicn/io/hicnConnection.c
+++ b/hicn-light/src/hicn/io/hicnConnection.c
@@ -24,6 +24,7 @@
#include <src/hicn/config.h>
#include <stdio.h>
#include <string.h>
+#include <sys/uio.h>
#include <unistd.h>
#include <hicn/core/message.h>
@@ -75,7 +76,8 @@ typedef struct hicn_state {
// Prototypes
static bool _send(IoOperations *ops, const Address *nexthop, Message *message);
-static bool _sendCommandResponse(IoOperations *ops, struct iovec *message);
+static bool _sendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size);
static const Address *_getRemoteAddress(const IoOperations *ops);
static const AddressPair *_getAddressPair(const IoOperations *ops);
static unsigned _getConnectionId(const IoOperations *ops);
@@ -99,18 +101,20 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
return _ioOperationsGuid;
}
-static IoOperations _template = {.closure = NULL,
- .send = &_send,
- .sendCommandResponse = &_sendCommandResponse,
- .getRemoteAddress = &_getRemoteAddress,
- .getAddressPair = &_getAddressPair,
- .getConnectionId = &_getConnectionId,
- .isUp = &_isUp,
- .isLocal = &_isLocal,
- .destroy = &_destroy,
- .class = &_streamConnection_Class,
- .getConnectionType = &_getConnectionType,
- .sendProbe = &_sendProbe};
+static IoOperations _template = {
+ .closure = NULL,
+ .send = &_send,
+ .sendIOVBuffer = &_sendIOVBuffer,
+ .getRemoteAddress = &_getRemoteAddress,
+ .getAddressPair = &_getAddressPair,
+ .getConnectionId = &_getConnectionId,
+ .isUp = &_isUp,
+ .isLocal = &_isLocal,
+ .destroy = &_destroy,
+ .class = &_streamConnection_Class,
+ .getConnectionType = &_getConnectionType,
+ .sendProbe = &_sendProbe,
+};
// =================================================================
@@ -329,10 +333,30 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
return true;
}
-static bool _sendCommandResponse(IoOperations *ops, struct iovec *message) {
- //XXX this should be nerver called since we do not handle control messages
- //with hicn connections, so nothing to do here!
- return false;
+static bool _sendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size) {
+ parcAssertNotNull(ops, "Parameter ops must be non-null");
+ parcAssertNotNull(message, "Parameter message must be non-null");
+
+ _HicnState *hicnConnState = (_HicnState *)ioOperations_GetClosure(ops);
+
+
+ ssize_t n = writev(hicnConnState->hicnListenerSocket, message, size);
+ if (n < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ if (logger_IsLoggable(hicnConnState->logger, LoggerFacility_IO,
+ PARCLogLevel_Error)) {
+ size_t length = 0;
+ for (int i = 0; i < size; i++)
+ length += message[i].iov_len;
+ logger_Log(hicnConnState->logger, LoggerFacility_IO, PARCLogLevel_Error,
+ __func__, "Incorrect write length %zd, expected %zd: (%d) %s\n",
+ n, length, errno, strerror(errno));
+ }
+ }
+ return false;
+ }
+ return true;
}
static list_connections_type _getConnectionType(const IoOperations *ops) {
diff --git a/hicn-light/src/hicn/io/hicnListener.c b/hicn-light/src/hicn/io/hicnListener.c
index 3f6cab94e..525cae31c 100644
--- a/hicn-light/src/hicn/io/hicnListener.c
+++ b/hicn-light/src/hicn/io/hicnListener.c
@@ -500,6 +500,7 @@ const Connection *_findConnectionFromPacket(HicnListener *hicn,
return conn;
}
+#if 0
static Address *_createAddressFromPacket(uint8_t *msgBuffer) {
Address *packetAddr = NULL;
if (messageHandler_GetIPPacketType(msgBuffer) == IPv6_TYPE) {
@@ -521,6 +522,7 @@ static Address *_createAddressFromPacket(uint8_t *msgBuffer) {
}
return packetAddr;
}
+#endif
static void _handleProbeMessage(HicnListener *hicn, uint8_t *msgBuffer) {
Address *packetAddr = _createAddressFromPacket(msgBuffer);
@@ -691,16 +693,20 @@ static Message *_readMessage(HicnListener *hicn, int fd, uint8_t *msgBuffer) {
_handleWldrNotification(hicn, msgBuffer);
} else if (messageHandler_IsLoadBalancerProbe(msgBuffer)) {
_handleProbeMessage(hicn, msgBuffer);
- }
#ifdef WITH_MAPME
- else if (mapMe_isMapMe(msgBuffer)) {
+ } else if (mapMe_isMapMe(msgBuffer)) {
/* This function triggers the handling of the MAP-Me message, and we
* will return NULL so as to terminate the processing of this
* msgBuffer. */
_handleMapMe(hicn, fd, msgBuffer);
- }
#endif /* WITH_MAPME */
+ }
+
+ if (messageHandler_handleHooks(hicn->forwarder, hicn->connection_id,
+ hicn->localAddress, msgBuffer))
+ goto END;
+END:
return message;
}
diff --git a/hicn-light/src/hicn/io/ioOperations.c b/hicn-light/src/hicn/io/ioOperations.c
index 53adc47c5..a9b763448 100644
--- a/hicn-light/src/hicn/io/ioOperations.c
+++ b/hicn-light/src/hicn/io/ioOperations.c
@@ -28,12 +28,11 @@ bool ioOperations_Send(IoOperations *ops, const Address *nexthop,
return ops->send(ops, nexthop, message);
}
-bool ioOperations_SendCommandResponse(IoOperations *ops,
- struct iovec *message) {
- return ops->sendCommandResponse(ops, message);
+bool ioOperations_SendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size) {
+ return ops->sendIOVBuffer(ops, message, size);
}
-
const Address *ioOperations_GetRemoteAddress(const IoOperations *ops) {
return ops->getRemoteAddress(ops);
}
diff --git a/hicn-light/src/hicn/io/ioOperations.h b/hicn-light/src/hicn/io/ioOperations.h
index 115b017dc..c5e58d8b2 100644
--- a/hicn-light/src/hicn/io/ioOperations.h
+++ b/hicn-light/src/hicn/io/ioOperations.h
@@ -63,7 +63,8 @@ typedef struct io_ops IoOperations;
struct io_ops {
void *closure;
bool (*send)(IoOperations *ops, const Address *nexthop, Message *message);
- bool (*sendCommandResponse)(IoOperations *ops, struct iovec *message);
+ bool (*sendIOVBuffer)(IoOperations *ops, struct iovec *message, size_t
+ size);
const Address *(*getRemoteAddress)(const IoOperations *ops);
const AddressPair *(*getAddressPair)(const IoOperations *ops);
bool (*isUp)(const IoOperations *ops);
@@ -168,10 +169,10 @@ void ioOperations_Release(IoOperations **opsPtr);
* @endcode
*/
bool ioOperations_Send(IoOperations *ops, const Address *nexthop,
- Message *message);
+ Message *message);
-bool ioOperations_SendCommandResponse(IoOperations *ops,
- struct iovec *message);
+bool ioOperations_SendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size);
/**
* A connection is made up of a local and a remote address. This function
@@ -363,4 +364,5 @@ list_connections_type ioOperations_GetConnectionType(const IoOperations *ops);
Ticks ioOperations_SendProbe(IoOperations *ops, unsigned probeType,
uint8_t *message);
+
#endif // io_h
diff --git a/hicn-light/src/hicn/io/streamConnection.c b/hicn-light/src/hicn/io/streamConnection.c
index d9ba5a355..465e0c326 100644
--- a/hicn-light/src/hicn/io/streamConnection.c
+++ b/hicn-light/src/hicn/io/streamConnection.c
@@ -64,9 +64,9 @@ typedef struct stream_state {
// Prototypes
static bool _streamConnection_Send(IoOperations *ops, const Address *nexthop,
- Message *message);
-static bool _streamConnection_SendCommandResponse(IoOperations *ops,
- struct iovec *msg);
+ Message *message);
+static bool _streamConnection_SendIOVBuffer(IoOperations *ops, struct
+ iovec *msg, size_t size);
static const Address *_streamConnection_GetRemoteAddress(
const IoOperations *ops);
static const AddressPair *_streamConnection_GetAddressPair(
@@ -98,7 +98,7 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
static IoOperations _template = {
.closure = NULL,
.send = &_streamConnection_Send,
- .sendCommandResponse = &_streamConnection_SendCommandResponse,
+ .sendIOVBuffer = &_streamConnection_SendIOVBuffer,
.getRemoteAddress = &_streamConnection_GetRemoteAddress,
.getAddressPair = &_streamConnection_GetAddressPair,
.getConnectionId = &_streamConnection_GetConnectionId,
@@ -286,52 +286,14 @@ static unsigned _streamConnection_GetConnectionId(const IoOperations *ops) {
return stream->id;
}
-bool _streamConnection_SendCommandResponse(IoOperations *ops,
- struct iovec *response) {
+bool _streamConnection_SendIOVBuffer(IoOperations *ops,
+ struct iovec * message, size_t size) {
parcAssertNotNull(ops, "Parameter ops must be non-null");
- parcAssertNotNull(response, "Parameter message must be non-null");
- _StreamState *conn = (_StreamState *)ioOperations_GetClosure(ops);
-
- bool success = false;
- if (conn->isUp) {
- PARCEventBuffer *buffer =
- parcEventBuffer_GetQueueBufferOutput(conn->bufferEventVector);
- size_t buffer_backlog = parcEventBuffer_GetLength(buffer);
- parcEventBuffer_Destroy(&buffer);
-
- if (buffer_backlog < OUTPUT_QUEUE_BYTES) {
- if (logger_IsLoggable(conn->logger, LoggerFacility_IO,
- PARCLogLevel_Debug)) {
- logger_Log(
- conn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
- "connid %u Writing %zu bytes to buffer with backlog %zu bytes",
- conn->id,
- (response[0].iov_len +
- response[1].iov_len), // NEW: take total lenght
- buffer_backlog);
- }
+ parcAssertNotNull(message, "Parameter message must be non-null");
- // NEW: write directly ino the parcEventQueue without passing through
- // message
- int failure =
- parcEventQueue_Write(conn->bufferEventVector, response[0].iov_base,
- response[0].iov_len) +
- parcEventQueue_Write(conn->bufferEventVector, response[1].iov_base,
- response[1].iov_len);
+ _StreamState *conn = (_StreamState *)ioOperations_GetClosure(ops);
- if (failure == 0) {
- success = true;
- }
- } else {
- if (logger_IsLoggable(conn->logger, LoggerFacility_IO,
- PARCLogLevel_Warning)) {
- logger_Log(conn->logger, LoggerFacility_IO, PARCLogLevel_Warning,
- __func__,
- "connid %u Writing to buffer backlog %zu bytes DROP MESSAGE",
- conn->id, buffer_backlog);
- }
- }
- } else {
+ if (!conn->isUp) {
if (logger_IsLoggable(conn->logger, LoggerFacility_IO,
PARCLogLevel_Error)) {
logger_Log(
@@ -339,9 +301,44 @@ bool _streamConnection_SendCommandResponse(IoOperations *ops,
"connid %u tried to send to down connection (isUp %d isClosed %d)",
conn->id, conn->isUp, conn->isClosed);
}
+ return false;
}
- return success;
+ PARCEventBuffer *buffer =
+ parcEventBuffer_GetQueueBufferOutput(conn->bufferEventVector);
+ size_t buffer_backlog = parcEventBuffer_GetLength(buffer);
+ parcEventBuffer_Destroy(&buffer);
+
+ if (buffer_backlog >= OUTPUT_QUEUE_BYTES) {
+ if (logger_IsLoggable(conn->logger, LoggerFacility_IO,
+ PARCLogLevel_Warning)) {
+ logger_Log(conn->logger, LoggerFacility_IO, PARCLogLevel_Warning,
+ __func__,
+ "connid %u Writing to buffer backlog %zu bytes DROP MESSAGE",
+ conn->id, buffer_backlog);
+ }
+ return false;
+ }
+
+ if (logger_IsLoggable(conn->logger, LoggerFacility_IO,
+ PARCLogLevel_Debug)) {
+ size_t length = 0;
+ for (int i = 0; i < size; i++)
+ length += message[i].iov_len;
+
+ logger_Log( conn->logger, LoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "connid %u Writing %zu bytes to buffer with backlog %zu bytes",
+ conn->id, length, buffer_backlog);
+ }
+
+ /* Write directly into the parcEventQueue without passing through message */
+ for (int i = 0; i < size; i++) {
+ if (parcEventQueue_Write(conn->bufferEventVector, message[i].iov_base,
+ message[i].iov_len) != 0)
+ return false;
+ }
+
+ return true;
}
/**
diff --git a/hicn-light/src/hicn/io/streamConnection.h b/hicn-light/src/hicn/io/streamConnection.h
index 2b312e771..f483d0b82 100644
--- a/hicn-light/src/hicn/io/streamConnection.h
+++ b/hicn-light/src/hicn/io/streamConnection.h
@@ -70,6 +70,7 @@ IoOperations *streamConnection_AcceptConnection(Forwarder *forwarder, int fd,
IoOperations *streamConnection_OpenConnection(Forwarder *forwarder,
AddressPair *pair, bool isLocal);
-bool streamState_SendCommandResponse(IoOperations *ops, struct iovec *response);
+bool streamState_SendIOVBuffer(IoOperations *ops, struct iovec *response,
+ size_t size);
#endif // streamConnection_h
diff --git a/hicn-light/src/hicn/io/udpConnection.c b/hicn-light/src/hicn/io/udpConnection.c
index b057f03ff..69cbea0e1 100644
--- a/hicn-light/src/hicn/io/udpConnection.c
+++ b/hicn-light/src/hicn/io/udpConnection.c
@@ -61,7 +61,8 @@ typedef struct udp_state {
// Prototypes
static bool _send(IoOperations *ops, const Address *nexthop, Message *message);
-static bool _sendCommandResponse(IoOperations *ops, struct iovec *message);
+static bool _sendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size);
static const Address *_getRemoteAddress(const IoOperations *ops);
static const AddressPair *_getAddressPair(const IoOperations *ops);
static unsigned _getConnectionId(const IoOperations *ops);
@@ -71,6 +72,7 @@ static void _destroy(IoOperations **opsPtr);
static list_connections_type _getConnectionType(const IoOperations *ops);
static Ticks _sendProbe(IoOperations *ops, unsigned probeType,
uint8_t *message);
+
/*
* This assigns a unique pointer to the void * which we use
* as a GUID for this class.
@@ -84,18 +86,20 @@ static const void *_streamConnection_Class(const IoOperations *ops) {
return _IoOperationsGuid;
}
-static IoOperations _template = {.closure = NULL,
- .send = &_send,
- .sendCommandResponse = &_sendCommandResponse,
- .getRemoteAddress = &_getRemoteAddress,
- .getAddressPair = &_getAddressPair,
- .getConnectionId = &_getConnectionId,
- .isUp = &_isUp,
- .isLocal = &_isLocal,
- .destroy = &_destroy,
- .class = &_streamConnection_Class,
- .getConnectionType = &_getConnectionType,
- .sendProbe = &_sendProbe};
+static IoOperations _template = {
+ .closure = NULL,
+ .send = &_send,
+ .sendIOVBuffer = &_sendIOVBuffer,
+ .getRemoteAddress = &_getRemoteAddress,
+ .getAddressPair = &_getAddressPair,
+ .getConnectionId = &_getConnectionId,
+ .isUp = &_isUp,
+ .isLocal = &_isLocal,
+ .destroy = &_destroy,
+ .class = &_streamConnection_Class,
+ .getConnectionType = &_getConnectionType,
+ .sendProbe = &_sendProbe,
+};
// =================================================================
@@ -263,7 +267,8 @@ static bool _send(IoOperations *ops, const Address *dummy, Message *message) {
return true;
}
-static bool _sendCommandResponse(IoOperations *ops, struct iovec *message){
+static bool _sendIOVBuffer(IoOperations *ops, struct iovec *message,
+ size_t size) {
parcAssertNotNull(ops, "Parameter ops must be non-null");
parcAssertNotNull(message, "Parameter message must be non-null");
_UdpState *udpConnState = (_UdpState *)ioOperations_GetClosure(ops);
@@ -276,7 +281,7 @@ static bool _sendCommandResponse(IoOperations *ops, struct iovec *message){
udpConnState->peerAddress,
udpConnState->peerAddressLength);
- ssize_t writeLength = writev(udpConnState->udpListenerSocket, message, 2);
+ ssize_t writeLength = writev(udpConnState->udpListenerSocket, message, size);
struct sockaddr any_address = {0};
any_address.sa_family = AF_UNSPEC;
@@ -287,15 +292,15 @@ static bool _sendCommandResponse(IoOperations *ops, struct iovec *message){
return false;
}
#else
- WSABUF dataBuf[2];
+ WSABUF dataBuf[ARRAY_SIZE(message)];
DWORD BytesSent = 0;
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < ARRAY_SIZE(message); i++) {
dataBuf[i].buf = message[i].iov_base;
dataBuf[i].len = (ULONG)message[i].iov_len;
}
- int rc = WSASendTo(udpConnState->udpListenerSocket, dataBuf, 2,
+ int rc = WSASendTo(udpConnState->udpListenerSocket, dataBuf, ARRAY_SIZE(message),
&BytesSent, 0, (SOCKADDR *)udpConnState->peerAddress,
udpConnState->peerAddressLength, NULL, NULL);
diff --git a/hicn-light/src/hicn/io/udpListener.c b/hicn-light/src/hicn/io/udpListener.c
index e73896372..9b7cf60af 100644
--- a/hicn-light/src/hicn/io/udpListener.c
+++ b/hicn-light/src/hicn/io/udpListener.c
@@ -451,13 +451,18 @@ static Message *_readMessage(UdpListener *udp, int fd,
} else if (messageHandler_IsLoadBalancerProbe(packet)) {
*processed = true;
_handleProbeMessage(udp, packet);
- }
#ifdef WITH_MAPME
- else if (mapMe_isMapMe(packet)) {
+ } else if (mapMe_isMapMe(packet)) {
*processed = true;
forwarder_ProcessMapMe(udp->forwarder, packet, connid);
- }
#endif /* WITH_MAPME */
+ }
+
+ /* Generic hook handler */
+ if (messageHandler_handleHooks(udp->forwarder, CONNECTION_ID_UNDEFINED,
+ udp->localAddress, packet))
+ goto END;
+END:
return message;
}