aboutsummaryrefslogtreecommitdiffstats
path: root/metis/ccnx/forwarder/metis/io/metis_StreamConnection.c
diff options
context:
space:
mode:
Diffstat (limited to 'metis/ccnx/forwarder/metis/io/metis_StreamConnection.c')
-rw-r--r--metis/ccnx/forwarder/metis/io/metis_StreamConnection.c570
1 files changed, 570 insertions, 0 deletions
diff --git a/metis/ccnx/forwarder/metis/io/metis_StreamConnection.c b/metis/ccnx/forwarder/metis/io/metis_StreamConnection.c
new file mode 100644
index 00000000..225e6d0d
--- /dev/null
+++ b/metis/ccnx/forwarder/metis/io/metis_StreamConnection.c
@@ -0,0 +1,570 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common activity for STREAM based listeners.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <string.h>
+#include <errno.h>
+
+#include <ccnx/forwarder/metis/io/metis_StreamConnection.h>
+#include <ccnx/forwarder/metis/core/metis_Forwarder.h>
+#include <ccnx/forwarder/metis/core/metis_Connection.h>
+#include <ccnx/forwarder/metis/core/metis_Message.h>
+#include <parc/algol/parc_Hash.h>
+
+#include <LongBow/runtime.h>
+#include <parc/algol/parc_Memory.h>
+#include <ccnx/forwarder/metis/tlv/metis_Tlv.h>
+
+// 128 KB output queue
+#define OUTPUT_QUEUE_BYTES (128 * 1024)
+
+static void
+_conn_readcb(PARCEventQueue *bufferEventVector, PARCEventType type, void *ioOpsVoid);
+
+static void
+_conn_eventcb(PARCEventQueue *bufferEventVector, PARCEventQueueEventType events, void *ioOpsVoid);
+
+typedef struct metis_stream_state {
+ MetisForwarder *metis;
+ MetisLogger *logger;
+
+ int fd;
+
+ MetisAddressPair *addressPair;
+ PARCEventQueue *bufferEventVector;
+
+ bool isLocal;
+ bool isUp;
+ bool isClosed;
+ unsigned id;
+
+ size_t nextMessageLength;
+} _MetisStreamState;
+
+// Prototypes
+static bool _metisStreamConnection_Send(MetisIoOperations *ops, const CPIAddress *nexthop, MetisMessage *message);
+static const CPIAddress *_metisStreamConnection_GetRemoteAddress(const MetisIoOperations *ops);
+static const MetisAddressPair *_metisStreamConnection_GetAddressPair(const MetisIoOperations *ops);
+static unsigned _metisStreamConnection_GetConnectionId(const MetisIoOperations *ops);
+static bool _metisStreamConnection_IsUp(const MetisIoOperations *ops);
+static bool _metisStreamConnection_IsLocal(const MetisIoOperations *ops);
+static void _metisStreamConnection_DestroyOperations(MetisIoOperations **opsPtr);
+
+static void _setConnectionState(_MetisStreamState *stream, bool isUp);
+static CPIConnectionType _metisStreamConnection_GetConnectionType(const MetisIoOperations *ops);
+static MetisTicks _sendProbe(MetisIoOperations *ops, unsigned probeType);
+
+/*
+ * This assigns a unique pointer to the void * which we use
+ * as a GUID for this class.
+ */
+static const void *_metisIoOperationsGuid = __FILE__;
+
+/*
+ * Return our GUID
+ */
+static const void *
+_metisStreamConnection_Class(const MetisIoOperations *ops)
+{
+ return _metisIoOperationsGuid;
+}
+
+static MetisIoOperations _template = {
+ .closure = NULL,
+ .send = &_metisStreamConnection_Send,
+ .getRemoteAddress = &_metisStreamConnection_GetRemoteAddress,
+ .getAddressPair = &_metisStreamConnection_GetAddressPair,
+ .getConnectionId = &_metisStreamConnection_GetConnectionId,
+ .isUp = &_metisStreamConnection_IsUp,
+ .isLocal = &_metisStreamConnection_IsLocal,
+ .destroy = &_metisStreamConnection_DestroyOperations,
+ .class = &_metisStreamConnection_Class,
+ .getConnectionType = &_metisStreamConnection_GetConnectionType,
+ .sendProbe = &_sendProbe,
+};
+
+MetisIoOperations *
+metisStreamConnection_AcceptConnection(MetisForwarder *metis, int fd, MetisAddressPair *pair, bool isLocal)
+{
+ _MetisStreamState *stream = parcMemory_AllocateAndClear(sizeof(_MetisStreamState));
+ assertNotNull(stream, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(_MetisStreamState));
+
+ MetisDispatcher *dispatcher = metisForwarder_GetDispatcher(metis);
+ PARCEventScheduler *eventBase = metisDispatcher_GetEventScheduler(dispatcher);
+ stream->bufferEventVector = parcEventQueue_Create(eventBase, fd, PARCEventQueueOption_CloseOnFree | PARCEventQueueOption_DeferCallbacks);
+
+ stream->metis = metis;
+ stream->logger = metisLogger_Acquire(metisForwarder_GetLogger(metis));
+ stream->fd = fd;
+ stream->id = metisForwarder_GetNextConnectionId(metis);
+ stream->addressPair = pair;
+ stream->isClosed = false;
+
+ // allocate a connection
+ MetisIoOperations *io_ops = parcMemory_AllocateAndClear(sizeof(MetisIoOperations));
+ assertNotNull(io_ops, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(MetisIoOperations));
+ memcpy(io_ops, &_template, sizeof(MetisIoOperations));
+ io_ops->closure = stream;
+ stream->isLocal = isLocal;
+
+ parcEventQueue_SetCallbacks(stream->bufferEventVector, _conn_readcb, NULL, _conn_eventcb, (void *) io_ops);
+ parcEventQueue_Enable(stream->bufferEventVector, PARCEventType_Read);
+
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionCreate, stream->id));
+
+ // As we are acceting a connection, we begin in the UP state
+ _setConnectionState(stream, true);
+
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug)) {
+ char *pair_str = metisAddressPair_ToString(pair);
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "StreamConnection %p accept for address pair %s", (void *) stream, pair_str);
+ free(pair_str);
+ }
+
+ return io_ops;
+}
+
+MetisIoOperations *
+metisStreamConnection_OpenConnection(MetisForwarder *metis, MetisAddressPair *pair, bool isLocal)
+{
+ assertNotNull(metis, "Parameter metis must be non-null");
+ assertNotNull(pair, "Parameter pair must be non-null");
+
+ // if there's an error on the bind or connect, will return NULL
+ PARCEventQueue *bufferEventVector = metisDispatcher_StreamBufferConnect(metisForwarder_GetDispatcher(metis), pair);
+ if (bufferEventVector == NULL) {
+ // error opening connection
+ return NULL;
+ }
+
+ _MetisStreamState *stream = parcMemory_AllocateAndClear(sizeof(_MetisStreamState));
+ assertNotNull(stream, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(_MetisStreamState));
+
+ stream->metis = metis;
+ stream->logger = metisLogger_Acquire(metisForwarder_GetLogger(metis));
+ stream->fd = parcEventQueue_GetFileDescriptor(bufferEventVector);
+ stream->bufferEventVector = bufferEventVector;
+ stream->id = metisForwarder_GetNextConnectionId(metis);
+ stream->addressPair = pair;
+ stream->isClosed = false;
+
+ // allocate a connection
+ MetisIoOperations *io_ops = parcMemory_AllocateAndClear(sizeof(MetisIoOperations));
+ assertNotNull(io_ops, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(MetisIoOperations));
+ memcpy(io_ops, &_template, sizeof(MetisIoOperations));
+ io_ops->closure = stream;
+ stream->isLocal = isLocal;
+
+ parcEventQueue_SetCallbacks(stream->bufferEventVector, _conn_readcb, NULL, _conn_eventcb, (void *) io_ops);
+ parcEventQueue_Enable(stream->bufferEventVector, PARCEventType_Read);
+
+ // we start in DOWN state, until remote side answers
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionCreate, stream->id));
+ _setConnectionState(stream, false);
+
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info)) {
+ char *pair_str = metisAddressPair_ToString(pair);
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info, __func__,
+ "StreamConnection %p connect for address pair %s", (void *) stream, pair_str);
+ free(pair_str);
+ }
+
+ return io_ops;
+}
+
+static void
+_metisStreamConnection_DestroyOperations(MetisIoOperations **opsPtr)
+{
+ assertNotNull(opsPtr, "Parameter opsPtr must be non-null double pointer");
+ assertNotNull(*opsPtr, "Parameter opsPtr must dereference to non-null pointer");
+
+ MetisIoOperations *ops = *opsPtr;
+ assertNotNull(metisIoOperations_GetClosure(ops), "ops->context must not be null");
+
+ _MetisStreamState *stream = (_MetisStreamState *) metisIoOperations_GetClosure(ops);
+
+ parcEventQueue_Destroy(&stream->bufferEventVector);
+
+ metisAddressPair_Release(&stream->addressPair);
+
+ if (!stream->isClosed) {
+ stream->isClosed = true;
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionClosed, stream->id));
+ }
+
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionDestroyed, stream->id));
+
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info, __func__,
+ "StreamConnection %p destroyed", (void *) stream);
+ }
+
+ metisLogger_Release(&stream->logger);
+ parcMemory_Deallocate((void **) &stream);
+ parcMemory_Deallocate((void **) &ops);
+
+ *opsPtr = NULL;
+}
+
+static bool
+_metisStreamConnection_IsUp(const MetisIoOperations *ops)
+{
+ assertNotNull(ops, "Parameter must be non-null");
+ const _MetisStreamState *stream = (const _MetisStreamState *) metisIoOperations_GetClosure(ops);
+ return stream->isUp;
+}
+
+static bool
+_metisStreamConnection_IsLocal(const MetisIoOperations *ops)
+{
+ assertNotNull(ops, "Parameter must be non-null");
+ const _MetisStreamState *stream = (const _MetisStreamState *) metisIoOperations_GetClosure(ops);
+ return stream->isLocal;
+}
+
+static const CPIAddress *
+_metisStreamConnection_GetRemoteAddress(const MetisIoOperations *ops)
+{
+ assertNotNull(ops, "Parameter must be non-null");
+ const _MetisStreamState *stream = (const _MetisStreamState *) metisIoOperations_GetClosure(ops);
+ return metisAddressPair_GetRemote(stream->addressPair);
+}
+
+static const MetisAddressPair *
+_metisStreamConnection_GetAddressPair(const MetisIoOperations *ops)
+{
+ assertNotNull(ops, "Parameter must be non-null");
+ const _MetisStreamState *stream = (const _MetisStreamState *) metisIoOperations_GetClosure(ops);
+ return stream->addressPair;
+}
+
+static unsigned
+_metisStreamConnection_GetConnectionId(const MetisIoOperations *ops)
+{
+ assertNotNull(ops, "Parameter must be non-null");
+ const _MetisStreamState *stream = (const _MetisStreamState *) metisIoOperations_GetClosure(ops);
+ return stream->id;
+}
+
+/**
+ * @function metisStreamConnection_Send
+ * @abstract Non-destructive send of the message.
+ * @discussion
+ * Send uses metisMessage_CopyToStreamBuffer, which is a non-destructive write.
+ * The send may fail if there's no buffer space in the output queue.
+ *
+ * @param dummy is ignored. A stream has only one peer.
+ * @return <#return#>
+ */
+static bool
+_metisStreamConnection_Send(MetisIoOperations *ops, const CPIAddress *dummy, MetisMessage *message)
+{
+ assertNotNull(ops, "Parameter ops must be non-null");
+ assertNotNull(message, "Parameter message must be non-null");
+ _MetisStreamState *stream = (_MetisStreamState *) metisIoOperations_GetClosure(ops);
+
+ bool success = false;
+ if (stream->isUp) {
+ PARCEventBuffer *buffer = parcEventBuffer_GetQueueBufferOutput(stream->bufferEventVector);
+ size_t buffer_backlog = parcEventBuffer_GetLength(buffer);
+ parcEventBuffer_Destroy(&buffer);
+
+ if (buffer_backlog < OUTPUT_QUEUE_BYTES) {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "connid %u Writing %zu bytes to buffer with backlog %zu bytes",
+ stream->id,
+ metisMessage_Length(message),
+ buffer_backlog);
+ }
+
+ int failure = metisMessage_Write(stream->bufferEventVector, message);
+ if (failure == 0) {
+ success = true;
+ }
+ } else {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Warning)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Warning, __func__,
+ "connid %u Writing to buffer backlog %zu bytes DROP MESSAGE",
+ stream->id,
+ buffer_backlog);
+ }
+ }
+ } else {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Error)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Error, __func__,
+ "connid %u tried to send to down connection (isUp %d isClosed %d)",
+ stream->id,
+ stream->isUp,
+ stream->isClosed);
+ }
+ }
+
+ return success;
+}
+
+static CPIConnectionType
+_metisStreamConnection_GetConnectionType(const MetisIoOperations *ops)
+{
+ return cpiConnection_TCP;
+}
+
+static MetisTicks
+_sendProbe(MetisIoOperations *ops, unsigned probeType)
+{
+ //TODO
+ return 0;
+}
+
+// =================================================================
+// the actual I/O functions
+
+/**
+ * @function startNewMessage
+ * @abstract Peek at the fixed header and set the stream->nextMessageLength
+ * @discussion
+ * This function manipulates the stream->nextMessageLength. After reading a FixedHeader, set nextMessageLength
+ * to the total length of the message.
+ *
+ * PRECONDITION: stream->nextMessageLength == 0
+ * PRECONDITION: inputBytesAvailable >= FIXED_HEADER_LEN
+ *
+ * @param stream is the stream begin parsed.
+ * @param input is the input PARCEventBuffer (corresponds to the buffer event's input)
+ * @param inputBytesAvailable is the number of bytes available in the input PARCEventBuffer.
+ * @return <#return#>
+ */
+static void
+_startNewMessage(_MetisStreamState *stream, PARCEventBuffer *input, size_t inputBytesAvailable)
+{
+ assertTrue(stream->nextMessageLength == 0, "Invalid state, nextMessageLength not zero: %zu", stream->nextMessageLength);
+ assertTrue(inputBytesAvailable >= metisTlv_FixedHeaderLength(), "read_length not a whole fixed header!: %zd", inputBytesAvailable);
+
+ // this linearizes the first FIXED_HEADER_LEN bytes of the input buffer's iovecs and
+ // returns a pointer to it.
+ uint8_t *fh = parcEventBuffer_Pullup(input, metisTlv_FixedHeaderLength());
+
+ // Calculate the total message size based on the fixed header
+ stream->nextMessageLength = metisTlv_TotalPacketLength(fh);
+}
+
+/**
+ * @function readMessage
+ * @abstract Read the complete message from the input
+ * @discussion
+ * Called to read a complete message from the input and return a MetisMessage
+ *
+ * PRECONDITION: There are at least <code>stream->nextMessageLength</code>
+ * bytes available on the input PARCEventBuffer.
+ *
+ * @param stream is the stream being parsed
+ * @param time is the current forwarder time (metisForwarder_GetTicks(stream->metis))
+ * @param input is the input PARCEventBuffer to readessage bytes.
+ * @return <#return#>
+ */
+static MetisMessage *
+_readMessage(_MetisStreamState *stream, MetisTicks time, PARCEventBuffer *input)
+{
+ MetisMessage *message = metisMessage_ReadFromBuffer(stream->id, time, input, stream->nextMessageLength, stream->logger);
+
+ return message;
+}
+
+/**
+ * @function single_read
+ * @abstract Read at most 1 message from the network
+ * @discussion
+ * If a complete message is ready on the input buffer, will allocate and return it.
+ *
+ * This function manipulates the stream->nextMessageLength. (1) Initializes with nextMessageLength = 0,
+ * which means we have not started parsing a packet. (2) After reading a FixedHeader, set nextMessageLength
+ * to the total length of the message. (3) After reading nextMessageLength bytes, return the outputBuffer
+ * and reset nextMessageLength to 0.
+ *
+ * @param input is the PARCEventBuffer to read
+ * @param stream is the related stream state for the input
+ * @return true if there's more to read after this message.
+ */
+static MetisMessage *
+_single_read(PARCEventBuffer *input, _MetisStreamState *stream)
+{
+ size_t bytesAvailable = parcEventBuffer_GetLength(input);
+
+ assertTrue(bytesAvailable >= metisTlv_FixedHeaderLength(), "Called with too short an input: %zu", bytesAvailable);
+
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "connid %u read %zu bytes",
+ stream->id, bytesAvailable);
+ }
+
+ if (stream->nextMessageLength == 0) {
+ _startNewMessage(stream, input, bytesAvailable);
+ }
+
+ // This is not an ELSE statement. We can both start a new message then
+ // check if there's enough bytes to read the whole thing.
+
+ if (bytesAvailable >= stream->nextMessageLength) {
+ MetisMessage *message = _readMessage(stream, metisForwarder_GetTicks(stream->metis), input);
+
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Debug, __func__,
+ "connid %u msg_length %zu read_length %zu, resetting parser",
+ stream->id,
+ stream->nextMessageLength,
+ bytesAvailable);
+ }
+
+ // now reset message length for next packet
+ stream->nextMessageLength = 0;
+
+ return message;
+ }
+
+ return NULL;
+}
+
+/**
+ * @function conn_readcb
+ * @abstract Event callback for reads
+ * @discussion
+ * Will read messages off the input. Continues reading as long as we
+ * can get a header to determine the next message length or as long as we
+ * can read a complete message.
+ *
+ * This function manipulates the read low water mark. (1) read a fixed header plus complete message,
+ * then set the low water mark to FIXED_HEADER_LEN. (2) read a fixed header, but not a complete
+ * message, then set low water mark to the total mesage length. Using the low water mark like this
+ * means the buffer event will only trigger on meaningful byte boundaries when we can get actual
+ * work done.
+ *
+ * @param <#param1#>
+ * @return <#return#>
+ */
+static void
+_conn_readcb(PARCEventQueue *event, PARCEventType type, void *ioOpsVoid)
+{
+ MetisIoOperations *ops = (MetisIoOperations *) ioOpsVoid;
+ _MetisStreamState *stream = (_MetisStreamState *) metisIoOperations_GetClosure(ops);
+
+ PARCEventBuffer *input = parcEventBuffer_GetQueueBufferInput(event);
+
+ // drain the input buffer
+ while (parcEventBuffer_GetLength(input) >= metisTlv_FixedHeaderLength() && parcEventBuffer_GetLength(input) >= stream->nextMessageLength) {
+ // this may set the stream->nextMessageLength
+ MetisMessage *message = _single_read(input, stream);
+
+ if (message) {
+ metisForwarder_Receive(stream->metis, message);
+ }
+ }
+
+ if (stream->nextMessageLength == 0) {
+ // we don't have the next header, so set it to the header length
+ metisStreamBuffer_SetWatermark(event, true, false, metisTlv_FixedHeaderLength(), 0);
+ } else {
+ // set it to the packet length
+ metisStreamBuffer_SetWatermark(event, true, false, stream->nextMessageLength, 0);
+ }
+ parcEventBuffer_Destroy(&input);
+}
+
+static void
+_setConnectionState(_MetisStreamState *stream, bool isUp)
+{
+ assertNotNull(stream, "Parameter stream must be non-null");
+
+ MetisMessenger *messenger = metisForwarder_GetMessenger(stream->metis);
+
+ bool oldStateIsUp = stream->isUp;
+ stream->isUp = isUp;
+
+ if (oldStateIsUp && !isUp) {
+ // bring connection DOWN
+ MetisMissive *missive = metisMissive_Create(MetisMissiveType_ConnectionDown, stream->id);
+ metisMessenger_Send(messenger, missive);
+ return;
+ }
+
+
+ if (!oldStateIsUp && isUp) {
+ // bring connection UP
+ MetisMissive *missive = metisMissive_Create(MetisMissiveType_ConnectionUp, stream->id);
+ metisMessenger_Send(messenger, missive);
+ return;
+ }
+}
+
+static void
+_conn_eventcb(PARCEventQueue *event, PARCEventQueueEventType events, void *ioOpsVoid)
+{
+ MetisIoOperations *ops = (MetisIoOperations *) ioOpsVoid;
+ _MetisStreamState *stream = (_MetisStreamState *) metisIoOperations_GetClosure(ops);
+
+ if (events & PARCEventQueueEventType_Connected) {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info, __func__,
+ "Connection %u is connected", stream->id);
+ }
+
+ // if the stream was closed, do not transition to an UP state
+ if (!stream->isClosed) {
+ _setConnectionState(stream, true);
+ }
+ } else
+ if (events & PARCEventQueueEventType_EOF) {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Info, __func__,
+ "connid %u closed.",
+ stream->id);
+ }
+
+ parcEventQueue_Disable(stream->bufferEventVector, PARCEventType_Read);
+
+ _setConnectionState(stream, false);
+
+ if (!stream->isClosed) {
+ stream->isClosed = true;
+ // this will cause the connection manager to destroy the connection later
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionClosed, stream->id));
+ }
+ } else
+ if (events & PARCEventQueueEventType_Error) {
+ if (metisLogger_IsLoggable(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Error)) {
+ metisLogger_Log(stream->logger, MetisLoggerFacility_IO, PARCLogLevel_Error, __func__,
+ "Got an error on the connection %u: %s", stream->id, strerror(errno));
+ }
+
+ parcEventQueue_Disable(stream->bufferEventVector, PARCEventType_Read | PARCEventType_Write);
+
+ _setConnectionState(stream, false);
+
+ if (!stream->isClosed) {
+ stream->isClosed = true;
+ // this will cause the connection manager to destroy the connection later
+ metisMessenger_Send(metisForwarder_GetMessenger(stream->metis), metisMissive_Create(MetisMissiveType_ConnectionClosed, stream->id));
+ }
+ }
+ /* None of the other events can happen here, since we haven't enabled
+ * timeouts */
+}