aboutsummaryrefslogtreecommitdiffstats
path: root/libparc/parc/algol/parc_EventQueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'libparc/parc/algol/parc_EventQueue.c')
-rwxr-xr-xlibparc/parc/algol/parc_EventQueue.c353
1 files changed, 353 insertions, 0 deletions
diff --git a/libparc/parc/algol/parc_EventQueue.c b/libparc/parc/algol/parc_EventQueue.c
new file mode 100755
index 00000000..5d656a12
--- /dev/null
+++ b/libparc/parc/algol/parc_EventQueue.c
@@ -0,0 +1,353 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ */
+#include <config.h>
+#include <errno.h>
+
+#include <LongBow/runtime.h>
+
+#include "internal_parc_Event.h"
+#include <parc/algol/parc_EventScheduler.h>
+#include <parc/algol/parc_EventQueue.h>
+#include <parc/algol/parc_FileOutputStream.h>
+#include <parc/logging/parc_Log.h>
+#include <parc/logging/parc_LogReporterFile.h>
+
+static int _parc_event_queue_debug_enabled = 0;
+
+#define parcEventQueue_LogDebug(parcEventQueue, ...) \
+ if (_parc_event_queue_debug_enabled) \
+ parcLog_Debug(parcEventScheduler_GetLogger(parcEventQueue->eventScheduler), __VA_ARGS__)
+
+/**
+ * Current implementation based on top of libevent2
+ */
+
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+
+/**
+ * @typedef PARCEventQueue
+ * @brief A structure containing private event state
+ */
+struct PARCEventQueue {
+ // Event scheduler we have been queued with
+ PARCEventScheduler *eventScheduler;
+
+ struct bufferevent *buffereventBuffer;
+ // Interpose on bufferevent callbacks
+ PARCEventQueue_Callback *readCallback;
+ void *readUserData;
+ PARCEventQueue_Callback *writeCallback;
+ void *writeUserData;
+ PARCEventQueue_EventCallback *eventCallback;
+ void *eventUserData;
+};
+
+struct PARCEventQueuePair {
+ PARCEventQueue *up;
+ PARCEventQueue *down;
+};
+
+static void
+_parc_queue_read_callback(struct bufferevent *bev, void *ptr)
+{
+ PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr;
+ parcEventQueue_LogDebug(parcEventQueue,
+ "_parc_queue_read_callback(bev=%p,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n",
+ bev, parcEventQueue->buffereventBuffer, parcEventQueue);
+ assertNotNull(parcEventQueue->readCallback, "parcEvent read callback called when NULL");
+
+ parcEventQueue->readCallback(parcEventQueue, PARCEventType_Read, parcEventQueue->readUserData);
+}
+
+static void
+_parc_queue_write_callback(struct bufferevent *bev, void *ptr)
+{
+ PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr;
+ parcEventQueue_LogDebug(parcEventQueue,
+ "_parc_queue_write_callback(bev=%p,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n",
+ bev, parcEventQueue->buffereventBuffer, parcEventQueue);
+ assertNotNull(parcEventQueue->writeCallback, "parcEvent write callback called when NULL");
+
+ parcEventQueue->writeCallback(parcEventQueue, PARCEventType_Write, parcEventQueue->writeUserData);
+}
+
+static void
+_parc_queue_event_callback(struct bufferevent *bev, short events, void *ptr)
+{
+ PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr;
+ int errno_forwarded = errno;
+ parcEventQueue_LogDebug(parcEventQueue,
+ "_parc_queue_event_callback(bev=%p,events=%x,errno=%d,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n",
+ bev, events, errno, parcEventQueue->buffereventBuffer, parcEventQueue);
+ assertNotNull(parcEventQueue->eventCallback, "parcEvent event callback called when NULL");
+
+ errno = errno_forwarded;
+ parcEventQueue->eventCallback(parcEventQueue, internal_bufferevent_type_to_PARCEventQueueEventType(events), parcEventQueue->eventUserData);
+}
+
+void
+parcEventQueue_SetCallbacks(PARCEventQueue *parcEventQueue,
+ PARCEventQueue_Callback *readCallback,
+ PARCEventQueue_Callback *writeCallback,
+ PARCEventQueue_EventCallback *eventCallback,
+ void *user_data)
+{
+ parcEventQueue_LogDebug(parcEventQueue,
+ "parcEventQueue_SetCallbacks(event=%p(buffer=%p),readcb=%p,writecb=%p,eventcb=%p,user_data=%p)\n",
+ parcEventQueue, parcEventQueue->buffereventBuffer,
+ readCallback, writeCallback, eventCallback,
+ user_data);
+
+ parcEventQueue->readCallback = readCallback;
+ parcEventQueue->readUserData = user_data;
+ parcEventQueue->writeCallback = writeCallback;
+ parcEventQueue->writeUserData = user_data;
+ parcEventQueue->eventCallback = eventCallback;
+ parcEventQueue->eventUserData = user_data;
+ bufferevent_setcb(parcEventQueue->buffereventBuffer,
+ (readCallback) ? _parc_queue_read_callback : NULL,
+ (writeCallback) ? _parc_queue_write_callback : NULL,
+ (eventCallback) ? _parc_queue_event_callback : NULL,
+ parcEventQueue);
+}
+
+PARCEventQueue *
+parcEventQueue_Create(PARCEventScheduler *eventScheduler, int fd, PARCEventQueueOption flags)
+{
+ assertNotNull(eventScheduler, "parcEventQueue_Create passed a NULL scheduler instance.");
+ PARCEventQueue *parcEventQueue = parcMemory_AllocateAndClear(sizeof(PARCEventQueue));
+ assertNotNull(parcEventQueue, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue));
+ parcEventQueue->eventScheduler = eventScheduler;
+
+ //
+ // PARCEventQueue_CloseOnFree
+ // we close the underlying file descriptor/bufferevent/whatever
+ // when this bufferevent is freed.
+ //
+ // PARCEventQueue_DeferCallbacks
+ // callbacks are run deferred in the event loop.
+ //
+ parcEventQueue->buffereventBuffer = bufferevent_socket_new(parcEventScheduler_GetEvBase(eventScheduler), fd,
+ internal_PARCEventQueueOption_to_bufferevent_options(flags));
+ assertNotNull(parcEventQueue->buffereventBuffer,
+ "Got null from bufferevent_socket_new for socket %d", fd);
+
+ parcEventQueue_LogDebug(parcEventQueue,
+ "parcEventQueue_Create(eventScheduler=%p,libevent_base=%p) = %p\n",
+ eventScheduler,
+ parcEventScheduler_GetEvBase(eventScheduler),
+ parcEventQueue);
+
+ return parcEventQueue;
+}
+
+void
+parcEventQueue_Destroy(PARCEventQueue **parcEventQueue)
+{
+ parcEventQueue_LogDebug((*parcEventQueue), "parcEventQueue_Destroy(ptr=%p)\n", *parcEventQueue);
+ assertNotNull((*parcEventQueue)->buffereventBuffer, "parcEventQueue_Destroy passed a null buffer!");
+
+ bufferevent_free((*parcEventQueue)->buffereventBuffer);
+ parcMemory_Deallocate((void *) parcEventQueue);
+}
+
+int
+parcEventQueue_SetFileDescriptor(PARCEventQueue *parcEventQueue, int fd)
+{
+ return bufferevent_setfd(parcEventQueue->buffereventBuffer, fd);
+}
+
+int
+parcEventQueue_GetFileDescriptor(PARCEventQueue *parcEventQueue)
+{
+ return bufferevent_getfd(parcEventQueue->buffereventBuffer);
+}
+
+PARCEventType
+parcEventQueue_GetEnabled(PARCEventQueue *event)
+{
+ return internal_libevent_type_to_PARCEventType(bufferevent_get_enabled(event->buffereventBuffer));
+}
+
+void
+parcEventQueue_Enable(PARCEventQueue *parcEventQueue, PARCEventType types)
+{
+ bufferevent_enable(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types));
+}
+
+void
+parcEventQueue_Disable(PARCEventQueue *parcEventQueue, PARCEventType types)
+{
+ bufferevent_disable(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types));
+}
+
+int
+parcEventQueue_ConnectSocket(PARCEventQueue *instance, struct sockaddr *address, int addrlen)
+{
+ return bufferevent_socket_connect(instance->buffereventBuffer, address, addrlen);
+}
+
+int
+parcEventQueue_Flush(PARCEventQueue *parcEventQueue, PARCEventType types)
+{
+ return bufferevent_flush(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), BEV_NORMAL);
+}
+
+int
+parcEventQueue_Finished(PARCEventQueue *parcEventQueue, PARCEventType types)
+{
+ return bufferevent_flush(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), BEV_FINISHED);
+}
+
+void
+parcEventQueue_SetWatermark(PARCEventQueue *parcEventQueue, PARCEventType types, size_t low, size_t high)
+{
+ parcEventQueue_LogDebug(parcEventQueue, "parcEventQueue->buffereventBuffer=%p\n", parcEventQueue->buffereventBuffer);
+ bufferevent_setwatermark(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), low, high);
+}
+
+int
+parcEventQueue_Printf(PARCEventQueue *parcEventQueue, const char *fmt, ...)
+{
+ struct evbuffer *buffer = bufferevent_get_output(parcEventQueue->buffereventBuffer);
+ assertNotNull(buffer, "bufferevent_get_output returned NULL");
+
+ va_list ap;
+
+ va_start(ap, fmt);
+ int result = evbuffer_add_vprintf(buffer, fmt, ap);
+ va_end(ap);
+ return result;
+}
+
+int
+parcEventQueue_Read(PARCEventQueue *parcEventQueue, void *data, size_t dataLength)
+{
+ return bufferevent_read(parcEventQueue->buffereventBuffer, data, dataLength);
+}
+
+int
+parcEventQueue_Write(PARCEventQueue *parcEventQueue, void *data, size_t dataLength)
+{
+ return bufferevent_write(parcEventQueue->buffereventBuffer, data, dataLength);
+}
+
+int
+parcEventQueue_SetPriority(PARCEventQueue *eventQueue, PARCEventPriority priority)
+{
+ bufferevent_priority_set(eventQueue->buffereventBuffer, internal_PARCEventPriority_to_libevent_priority(priority));
+ return 0;
+}
+
+PARCEventQueuePair *
+parcEventQueue_CreateConnectedPair(PARCEventScheduler *eventScheduler)
+{
+ assertNotNull(eventScheduler, "parcEventQueue_CreateConnectedPair must be passed a valid Event Scheduler");
+ PARCEventQueuePair *parcEventQueuePair = parcMemory_AllocateAndClear(sizeof(PARCEventQueuePair));
+ assertNotNull(parcEventQueuePair, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueuePair));
+
+ parcEventQueuePair->up = parcMemory_AllocateAndClear(sizeof(PARCEventQueue));
+ parcEventQueuePair->up->eventScheduler = eventScheduler;
+ parcEventQueue_LogDebug(parcEventQueuePair->up,
+ "up instance parcEventQueue_Create(eventScheduler=%p,libevent_parcEventQueue=%p) = %p\n",
+ eventScheduler,
+ parcEventScheduler_GetEvBase(eventScheduler),
+ parcEventQueuePair->up);
+ assertNotNull(parcEventQueuePair->up, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue));
+
+ parcEventQueuePair->down = parcMemory_AllocateAndClear(sizeof(PARCEventQueue));
+ parcEventQueuePair->down->eventScheduler = eventScheduler;
+ parcEventQueue_LogDebug(parcEventQueuePair->down,
+ "down instance parcEventQueue_Create(eventScheduler=%p,libevent_parcEventQueue=%p) = %p\n",
+ eventScheduler,
+ parcEventScheduler_GetEvBase(eventScheduler),
+ parcEventQueuePair->down);
+ assertNotNull(parcEventQueuePair->down, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue));
+
+ struct bufferevent *evpair[2];
+ int result = bufferevent_pair_new(parcEventScheduler_GetEvBase(eventScheduler), 0, evpair);
+ if (result != 0) {
+ parcMemory_Deallocate((void **) &(parcEventQueuePair->up));
+ parcMemory_Deallocate((void **) &(parcEventQueuePair->down));
+ parcMemory_Deallocate((void **) &parcEventQueuePair);
+ return NULL;
+ }
+
+ parcEventQueuePair->up->buffereventBuffer = evpair[0];
+ parcEventQueuePair->down->buffereventBuffer = evpair[1];
+
+ (void) parcEventQueue_SetPriority(parcEventQueuePair->up, PARCEventPriority_Normal);
+ (void) parcEventQueue_SetPriority(parcEventQueuePair->down, PARCEventPriority_Normal);
+
+ return parcEventQueuePair;
+}
+
+void
+parcEventQueue_DestroyConnectedPair(PARCEventQueuePair **queuePair)
+{
+ parcEventQueue_LogDebug((*queuePair)->up,
+ "parcEventQueue_DestroyPair(up ptr=%p)\n",
+ (*queuePair)->up);
+ parcEventQueue_LogDebug((*queuePair)->down,
+ "parcEventQueue_DestroyPair(down ptr=%p)\n",
+ (*queuePair)->down);
+
+ bufferevent_free((*queuePair)->up->buffereventBuffer);
+ bufferevent_free((*queuePair)->down->buffereventBuffer);
+
+ parcMemory_Deallocate((void **) &((*queuePair)->up));
+ parcMemory_Deallocate((void **) &((*queuePair)->down));
+ parcMemory_Deallocate((void **) queuePair);
+}
+
+PARCEventQueue *
+parcEventQueue_GetConnectedUpQueue(PARCEventQueuePair *queuePair)
+{
+ return queuePair->up;
+}
+
+PARCEventQueue *
+parcEventQueue_GetConnectedDownQueue(PARCEventQueuePair *queuePair)
+{
+ return queuePair->down;
+}
+
+struct evbuffer *
+internal_parcEventQueue_GetEvInputBuffer(PARCEventQueue *queue)
+{
+ return bufferevent_get_input(queue->buffereventBuffer);
+}
+
+struct evbuffer *
+internal_parcEventQueue_GetEvOutputBuffer(PARCEventQueue *queue)
+{
+ return bufferevent_get_output(queue->buffereventBuffer);
+}
+
+void
+parcEventQueue_EnableDebug(void)
+{
+ _parc_event_queue_debug_enabled = 1;
+}
+
+void
+parcEventQueue_DisableDebug(void)
+{
+ _parc_event_queue_debug_enabled = 0;
+}