diff options
Diffstat (limited to 'metis/ccnx/forwarder/metis/messenger/metis_Messenger.c')
-rw-r--r-- | metis/ccnx/forwarder/metis/messenger/metis_Messenger.c | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/metis/ccnx/forwarder/metis/messenger/metis_Messenger.c b/metis/ccnx/forwarder/metis/messenger/metis_Messenger.c new file mode 100644 index 00000000..6313a3c1 --- /dev/null +++ b/metis/ccnx/forwarder/metis/messenger/metis_Messenger.c @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + * The messenger is contructued with a reference to the forwarder's dispatcher so it can + * schedule future events. When someone calls metisMessenger_Send(...), it will put the + * message on a queue. If the queue was empty, it will scheudle itself to be run. + * By running the queue in a future dispatcher slice, it guarantees that there will be + * no re-entrant behavior between callers and message listeners. + * + * A recipient will receive a reference counted copy of the missive, so it must call + * {@link metisMissive_Release} on it. + * + */ + +#include <config.h> +#include <stdio.h> +#include <LongBow/runtime.h> +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_ArrayList.h> +#include <parc/algol/parc_EventScheduler.h> +#include <parc/algol/parc_Event.h> + +#include <ccnx/forwarder/metis/messenger/metis_Messenger.h> +#include <ccnx/forwarder/metis/messenger/metis_MissiveDeque.h> + +struct metis_messenger { + PARCArrayList *callbacklist; + MetisDispatcher *dispatcher; + MetisMissiveDeque *eventQueue; + + PARCEventTimer *timerEvent; +}; + +static void metisMessenger_Dequeue(int fd, PARCEventType which_event, void *messengerVoidPtr); + +// ========================================= +// Public API + +MetisMessenger * +metisMessenger_Create(MetisDispatcher *dispatcher) +{ + MetisMessenger *messenger = parcMemory_AllocateAndClear(sizeof(MetisMessenger)); + assertNotNull(messenger, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(MetisMessenger)); + + // NULL destroyer because we're storing structures owned by the caller + messenger->dispatcher = dispatcher; + messenger->callbacklist = parcArrayList_Create(NULL); + messenger->eventQueue = metisMissiveDeque_Create(); + + // creates the timer, but does not start it + messenger->timerEvent = metisDispatcher_CreateTimer(dispatcher, false, metisMessenger_Dequeue, messenger); + + return messenger; +} + +void +metisMessenger_Destroy(MetisMessenger **messengerPtr) +{ + assertNotNull(messengerPtr, "Parameter must be non-null double pointer"); + assertNotNull(*messengerPtr, "Parameter must dereference to non-null pointer"); + + MetisMessenger *messenger = *messengerPtr; + parcArrayList_Destroy(&messenger->callbacklist); + metisMissiveDeque_Release(&messenger->eventQueue); + metisDispatcher_DestroyTimerEvent(messenger->dispatcher, &messenger->timerEvent); + parcMemory_Deallocate((void **) &messenger); + *messengerPtr = NULL; +} + +void +metisMessenger_Send(MetisMessenger *messenger, MetisMissive *missive) +{ + assertNotNull(messenger, "Parameter messenger must be non-null"); + assertNotNull(missive, "Parameter event must be non-null"); + + metisMissiveDeque_Append(messenger->eventQueue, missive); + if (metisMissiveDeque_Size(messenger->eventQueue) == 1) { + // We need to scheudle ourself when an event is added to an empty queue + + // precondition: timer should not be running. + struct timeval immediateTimeout = { 0, 0 }; + metisDispatcher_StartTimer(messenger->dispatcher, messenger->timerEvent, &immediateTimeout); + } +} + +static void +removeRecipient(MetisMessenger *messenger, const MetisMessengerRecipient *recipient) +{ + // don't increment i in the loop + for (size_t i = 0; i < parcArrayList_Size(messenger->callbacklist); ) { + const void *p = parcArrayList_Get(messenger->callbacklist, i); + if (p == recipient) { + // removing will compact the list, so next element will also be at i. + parcArrayList_RemoveAndDestroyAtIndex(messenger->callbacklist, i); + } else { + i++; + } + } +} + +/** + * @function metisEventMessenger_Register + * @abstract Receive all event messages + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +void +metisMessenger_Register(MetisMessenger *messenger, const MetisMessengerRecipient *recipient) +{ + assertNotNull(messenger, "Parameter messenger must be non-null"); + assertNotNull(recipient, "Parameter recipient must be non-null"); + + // do not allow duplicates + removeRecipient(messenger, recipient); + + parcArrayList_Add(messenger->callbacklist, recipient); +} + +/** + * @function metisEventMessenger_Unregister + * @abstract Stop receiving event messages + * @discussion + * <#Discussion#> + * + * @param <#param1#> + * @return <#return#> + */ +void +metisMessenger_Unregister(MetisMessenger *messenger, const MetisMessengerRecipient *recipient) +{ + assertNotNull(messenger, "Parameter messenger must be non-null"); + assertNotNull(recipient, "Parameter recipient must be non-null"); + + removeRecipient(messenger, recipient); +} + +/** + * Called by event scheduler to give us a slice in which to dequeue events + * + * Called inside an event callback, so we now have exclusive access to the system. + * Dequeues all pending events and calls all the listeners for each one. + * + * @param [in] fd unused, required for compliance with function prototype + * @param [in] which_event unused, required for compliance with function prototype + * @param [in] messengerVoidPtr A void* to MetisMessenger + * + * @return <#value#> <#explanation#> + * + * Example: + * @code + * <#example#> + * @endcode + */ +static void +metisMessenger_Dequeue(int fd, PARCEventType which_event, void *messengerVoidPtr) +{ + MetisMessenger *messenger = (MetisMessenger *) messengerVoidPtr; + assertNotNull(messenger, "Called with null messenger pointer"); + + MetisMissive *missive; + while ((missive = metisMissiveDeque_RemoveFirst(messenger->eventQueue)) != NULL) { + for (size_t i = 0; i < parcArrayList_Size(messenger->callbacklist); i++) { + MetisMessengerRecipient *recipient = parcArrayList_Get(messenger->callbacklist, i); + assertTrue(recipient, "Recipient is null at index %zu", i); + + metisMessengerRecipient_Deliver(recipient, metisMissive_Acquire(missive)); + } + + // now let go of our reference to the missive + metisMissive_Release(&missive); + } +} |