/* * Copyright (c) 2017 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** */ #include #include #include #include "internal_parc_Event.h" #include #include #include #include #include static int _parc_event_queue_debug_enabled = 0; #define parcEventQueue_LogDebug(parcEventQueue, ...) \ if (_parc_event_queue_debug_enabled) \ parcLog_Debug(parcEventScheduler_GetLogger(parcEventQueue->eventScheduler), __VA_ARGS__) /** * Current implementation based on top of libevent2 */ #include #include /** * @typedef PARCEventQueue * @brief A structure containing private event state */ struct PARCEventQueue { // Event scheduler we have been queued with PARCEventScheduler *eventScheduler; struct bufferevent *buffereventBuffer; // Interpose on bufferevent callbacks PARCEventQueue_Callback *readCallback; void *readUserData; PARCEventQueue_Callback *writeCallback; void *writeUserData; PARCEventQueue_EventCallback *eventCallback; void *eventUserData; }; struct PARCEventQueuePair { PARCEventQueue *up; PARCEventQueue *down; }; static void _parc_queue_read_callback(struct bufferevent *bev, void *ptr) { PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr; parcEventQueue_LogDebug(parcEventQueue, "_parc_queue_read_callback(bev=%p,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n", bev, parcEventQueue->buffereventBuffer, parcEventQueue); assertNotNull(parcEventQueue->readCallback, "parcEvent read callback called when NULL"); parcEventQueue->readCallback(parcEventQueue, PARCEventType_Read, parcEventQueue->readUserData); } static void _parc_queue_write_callback(struct bufferevent *bev, void *ptr) { PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr; parcEventQueue_LogDebug(parcEventQueue, "_parc_queue_write_callback(bev=%p,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n", bev, parcEventQueue->buffereventBuffer, parcEventQueue); assertNotNull(parcEventQueue->writeCallback, "parcEvent write callback called when NULL"); parcEventQueue->writeCallback(parcEventQueue, PARCEventType_Write, parcEventQueue->writeUserData); } static void _parc_queue_event_callback(struct bufferevent *bev, short events, void *ptr) { PARCEventQueue *parcEventQueue = (PARCEventQueue *) ptr; int errno_forwarded = errno; parcEventQueue_LogDebug(parcEventQueue, "_parc_queue_event_callback(bev=%p,events=%x,errno=%d,ptr->buffereventBuffer=%p,parcEventQueue=%p)\n", bev, events, errno, parcEventQueue->buffereventBuffer, parcEventQueue); assertNotNull(parcEventQueue->eventCallback, "parcEvent event callback called when NULL"); errno = errno_forwarded; parcEventQueue->eventCallback(parcEventQueue, internal_bufferevent_type_to_PARCEventQueueEventType(events), parcEventQueue->eventUserData); } void parcEventQueue_SetCallbacks(PARCEventQueue *parcEventQueue, PARCEventQueue_Callback *readCallback, PARCEventQueue_Callback *writeCallback, PARCEventQueue_EventCallback *eventCallback, void *user_data) { parcEventQueue_LogDebug(parcEventQueue, "parcEventQueue_SetCallbacks(event=%p(buffer=%p),readcb=%p,writecb=%p,eventcb=%p,user_data=%p)\n", parcEventQueue, parcEventQueue->buffereventBuffer, readCallback, writeCallback, eventCallback, user_data); parcEventQueue->readCallback = readCallback; parcEventQueue->readUserData = user_data; parcEventQueue->writeCallback = writeCallback; parcEventQueue->writeUserData = user_data; parcEventQueue->eventCallback = eventCallback; parcEventQueue->eventUserData = user_data; bufferevent_setcb(parcEventQueue->buffereventBuffer, (readCallback) ? _parc_queue_read_callback : NULL, (writeCallback) ? _parc_queue_write_callback : NULL, (eventCallback) ? _parc_queue_event_callback : NULL, parcEventQueue); } PARCEventQueue * parcEventQueue_Create(PARCEventScheduler *eventScheduler, int fd, PARCEventQueueOption flags) { assertNotNull(eventScheduler, "parcEventQueue_Create passed a NULL scheduler instance."); PARCEventQueue *parcEventQueue = parcMemory_AllocateAndClear(sizeof(PARCEventQueue)); assertNotNull(parcEventQueue, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue)); parcEventQueue->eventScheduler = eventScheduler; // // PARCEventQueue_CloseOnFree // we close the underlying file descriptor/bufferevent/whatever // when this bufferevent is freed. // // PARCEventQueue_DeferCallbacks // callbacks are run deferred in the event loop. // parcEventQueue->buffereventBuffer = bufferevent_socket_new(parcEventScheduler_GetEvBase(eventScheduler), fd, internal_PARCEventQueueOption_to_bufferevent_options(flags)); assertNotNull(parcEventQueue->buffereventBuffer, "Got null from bufferevent_socket_new for socket %d", fd); parcEventQueue_LogDebug(parcEventQueue, "parcEventQueue_Create(eventScheduler=%p,libevent_base=%p) = %p\n", eventScheduler, parcEventScheduler_GetEvBase(eventScheduler), parcEventQueue); return parcEventQueue; } void parcEventQueue_Destroy(PARCEventQueue **parcEventQueue) { parcEventQueue_LogDebug((*parcEventQueue), "parcEventQueue_Destroy(ptr=%p)\n", *parcEventQueue); assertNotNull((*parcEventQueue)->buffereventBuffer, "parcEventQueue_Destroy passed a null buffer!"); bufferevent_free((*parcEventQueue)->buffereventBuffer); parcMemory_Deallocate((void *) parcEventQueue); } int parcEventQueue_SetFileDescriptor(PARCEventQueue *parcEventQueue, int fd) { return bufferevent_setfd(parcEventQueue->buffereventBuffer, fd); } int parcEventQueue_GetFileDescriptor(PARCEventQueue *parcEventQueue) { return bufferevent_getfd(parcEventQueue->buffereventBuffer); } PARCEventType parcEventQueue_GetEnabled(PARCEventQueue *event) { return internal_libevent_type_to_PARCEventType(bufferevent_get_enabled(event->buffereventBuffer)); } void parcEventQueue_Enable(PARCEventQueue *parcEventQueue, PARCEventType types) { bufferevent_enable(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types)); } void parcEventQueue_Disable(PARCEventQueue *parcEventQueue, PARCEventType types) { bufferevent_disable(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types)); } int parcEventQueue_ConnectSocket(PARCEventQueue *instance, struct sockaddr *address, int addrlen) { return bufferevent_socket_connect(instance->buffereventBuffer, address, addrlen); } int parcEventQueue_Flush(PARCEventQueue *parcEventQueue, PARCEventType types) { return bufferevent_flush(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), BEV_NORMAL); } int parcEventQueue_Finished(PARCEventQueue *parcEventQueue, PARCEventType types) { return bufferevent_flush(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), BEV_FINISHED); } void parcEventQueue_SetWatermark(PARCEventQueue *parcEventQueue, PARCEventType types, size_t low, size_t high) { parcEventQueue_LogDebug(parcEventQueue, "parcEventQueue->buffereventBuffer=%p\n", parcEventQueue->buffereventBuffer); bufferevent_setwatermark(parcEventQueue->buffereventBuffer, internal_PARCEventType_to_libevent_type(types), low, high); } int parcEventQueue_Printf(PARCEventQueue *parcEventQueue, const char *fmt, ...) { struct evbuffer *buffer = bufferevent_get_output(parcEventQueue->buffereventBuffer); assertNotNull(buffer, "bufferevent_get_output returned NULL"); va_list ap; va_start(ap, fmt); int result = evbuffer_add_vprintf(buffer, fmt, ap); va_end(ap); return result; } int parcEventQueue_Read(PARCEventQueue *parcEventQueue, void *data, size_t dataLength) { return bufferevent_read(parcEventQueue->buffereventBuffer, data, dataLength); } int parcEventQueue_Write(PARCEventQueue *parcEventQueue, void *data, size_t dataLength) { return bufferevent_write(parcEventQueue->buffereventBuffer, data, dataLength); } int parcEventQueue_SetPriority(PARCEventQueue *eventQueue, PARCEventPriority priority) { bufferevent_priority_set(eventQueue->buffereventBuffer, internal_PARCEventPriority_to_libevent_priority(priority)); return 0; } PARCEventQueuePair * parcEventQueue_CreateConnectedPair(PARCEventScheduler *eventScheduler) { assertNotNull(eventScheduler, "parcEventQueue_CreateConnectedPair must be passed a valid Event Scheduler"); PARCEventQueuePair *parcEventQueuePair = parcMemory_AllocateAndClear(sizeof(PARCEventQueuePair)); assertNotNull(parcEventQueuePair, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueuePair)); parcEventQueuePair->up = parcMemory_AllocateAndClear(sizeof(PARCEventQueue)); parcEventQueuePair->up->eventScheduler = eventScheduler; parcEventQueue_LogDebug(parcEventQueuePair->up, "up instance parcEventQueue_Create(eventScheduler=%p,libevent_parcEventQueue=%p) = %p\n", eventScheduler, parcEventScheduler_GetEvBase(eventScheduler), parcEventQueuePair->up); assertNotNull(parcEventQueuePair->up, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue)); parcEventQueuePair->down = parcMemory_AllocateAndClear(sizeof(PARCEventQueue)); parcEventQueuePair->down->eventScheduler = eventScheduler; parcEventQueue_LogDebug(parcEventQueuePair->down, "down instance parcEventQueue_Create(eventScheduler=%p,libevent_parcEventQueue=%p) = %p\n", eventScheduler, parcEventScheduler_GetEvBase(eventScheduler), parcEventQueuePair->down); assertNotNull(parcEventQueuePair->down, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(PARCEventQueue)); struct bufferevent *evpair[2]; int result = bufferevent_pair_new(parcEventScheduler_GetEvBase(eventScheduler), 0, evpair); if (result != 0) { parcMemory_Deallocate((void **) &(parcEventQueuePair->up)); parcMemory_Deallocate((void **) &(parcEventQueuePair->down)); parcMemory_Deallocate((void **) &parcEventQueuePair); return NULL; } parcEventQueuePair->up->buffereventBuffer = evpair[0]; parcEventQueuePair->down->buffereventBuffer = evpair[1]; (void) parcEventQueue_SetPriority(parcEventQueuePair->up, PARCEventPriority_Normal); (void) parcEventQueue_SetPriority(parcEventQueuePair->down, PARCEventPriority_Normal); return parcEventQueuePair; } void parcEventQueue_DestroyConnectedPair(PARCEventQueuePair **queuePair) { parcEventQueue_LogDebug((*queuePair)->up, "parcEventQueue_DestroyPair(up ptr=%p)\n", (*queuePair)->up); parcEventQueue_LogDebug((*queuePair)->down, "parcEventQueue_DestroyPair(down ptr=%p)\n", (*queuePair)->down); bufferevent_free((*queuePair)->up->buffereventBuffer); bufferevent_free((*queuePair)->down->buffereventBuffer); parcMemory_Deallocate((void **) &((*queuePair)->up)); parcMemory_Deallocate((void **) &((*queuePair)->down)); parcMemory_Deallocate((void **) queuePair); } PARCEventQueue * parcEventQueue_GetConnectedUpQueue(PARCEventQueuePair *queuePair) { return queuePair->up; } PARCEventQueue * parcEventQueue_GetConnectedDownQueue(PARCEventQueuePair *queuePair) { return queuePair->down; } struct evbuffer * internal_parcEventQueue_GetEvInputBuffer(PARCEventQueue *queue) { return bufferevent_get_input(queue->buffereventBuffer); } struct evbuffer * internal_parcEventQueue_GetEvOutputBuffer(PARCEventQueue *queue) { return bufferevent_get_output(queue->buffereventBuffer); } void parcEventQueue_EnableDebug(void) { _parc_event_queue_debug_enabled = 1; } void parcEventQueue_DisableDebug(void) { _parc_event_queue_debug_enabled = 0; }