diff options
author | Michele Papalini <micpapal+fdio@cisco.com> | 2017-02-24 08:00:33 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@fd.io> | 2017-02-24 08:00:33 +0000 |
commit | 4df7f4cc98b6288177df256e1db70ddc3f7d00db (patch) | |
tree | 55e71277b419e4830ae641868ab8e751c8b86972 /libparc/parc/concurrent/parc_ThreadPool.c | |
parent | f28308bd99381ef5f1e178e2e1f870f245e35873 (diff) | |
parent | ec688b4723a041044226358bcd4dd6e2da39da49 (diff) |
Merge "Initial commit: cframework. Longbow and Libparc" into cframework/master
Diffstat (limited to 'libparc/parc/concurrent/parc_ThreadPool.c')
-rw-r--r-- | libparc/parc/concurrent/parc_ThreadPool.c | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/libparc/parc/concurrent/parc_ThreadPool.c b/libparc/parc/concurrent/parc_ThreadPool.c new file mode 100644 index 00000000..80b093ba --- /dev/null +++ b/libparc/parc/concurrent/parc_ThreadPool.c @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + */ +#include <config.h> +#include <stdio.h> + +#include <parc/algol/parc_Object.h> +#include <parc/algol/parc_DisplayIndented.h> +#include <parc/algol/parc_Memory.h> + +#include <parc/algol/parc_SortedList.h> +#include <parc/algol/parc_LinkedList.h> + +#include <parc/concurrent/parc_AtomicUint64.h> +#include <parc/concurrent/parc_ThreadPool.h> +#include <parc/concurrent/parc_Thread.h> + +struct PARCThreadPool { + bool continueExistingPeriodicTasksAfterShutdown; + bool executeExistingDelayedTasksAfterShutdown; + bool removeOnCancel; + PARCLinkedList *workQueue; + PARCLinkedList *threads; + int poolSize; + int maximumPoolSize; + long taskCount; + bool isShutdown; + bool isTerminated; + bool isTerminating; + + PARCAtomicUint64 *completedTaskCount; +}; + +static void * +_parcThreadPool_Worker(const PARCThread *thread, const PARCThreadPool *pool) +{ + while (parcThread_IsCancelled(thread) == false && pool->isTerminated == false) { + if (parcLinkedList_Lock(pool->workQueue)) { + PARCFutureTask *task = parcLinkedList_RemoveFirst(pool->workQueue); + if (task != NULL) { + parcAtomicUint64_Increment(pool->completedTaskCount); + parcLinkedList_Unlock(pool->workQueue); + parcFutureTask_Run(task); + parcFutureTask_Release(&task); + parcLinkedList_Lock(pool->workQueue); + + parcLinkedList_Notify(pool->workQueue); + } else { + parcLinkedList_WaitFor(pool->workQueue, 1000000000); + } + parcLinkedList_Unlock(pool->workQueue); + } + } + + return NULL; +} + +static void +_parcThreadPool_CancelAll(const PARCThreadPool *pool) +{ + PARCIterator *iterator = parcLinkedList_CreateIterator(pool->threads); + + while (parcIterator_HasNext(iterator)) { + PARCThread *thread = parcIterator_Next(iterator); + parcThread_Cancel(thread); + } + parcIterator_Release(&iterator); +} + +static void +_parcThreadPool_JoinAll(const PARCThreadPool *pool) +{ + PARCIterator *iterator = parcLinkedList_CreateIterator(pool->threads); + + while (parcIterator_HasNext(iterator)) { + PARCThread *thread = parcIterator_Next(iterator); + parcThread_Join(thread); + } + parcIterator_Release(&iterator); +} + +static bool +_parcThreadPool_Destructor(PARCThreadPool **instancePtr) +{ + assertNotNull(instancePtr, "Parameter must be a non-null pointer to a PARCThreadPool pointer."); + PARCThreadPool *pool = *instancePtr; + + if (pool->isShutdown == false) { + _parcThreadPool_CancelAll(pool); + _parcThreadPool_JoinAll(pool); + } + + parcAtomicUint64_Release(&pool->completedTaskCount); + parcLinkedList_Release(&pool->threads); + + if (parcObject_Lock(pool->workQueue)) { + parcLinkedList_Release(&pool->workQueue); + } + + return true; +} + +parcObject_ImplementAcquire(parcThreadPool, PARCThreadPool); + +parcObject_ImplementRelease(parcThreadPool, PARCThreadPool); + +parcObject_Override(PARCThreadPool, PARCObject, + .isLockable = true, + .destructor = (PARCObjectDestructor *) _parcThreadPool_Destructor, + .copy = (PARCObjectCopy *) parcThreadPool_Copy, + .toString = (PARCObjectToString *) parcThreadPool_ToString, + .equals = (PARCObjectEquals *) parcThreadPool_Equals, + .compare = (PARCObjectCompare *) parcThreadPool_Compare, + .hashCode = (PARCObjectHashCode *) parcThreadPool_HashCode); + +void +parcThreadPool_AssertValid(const PARCThreadPool *instance) +{ + assertTrue(parcThreadPool_IsValid(instance), + "PARCThreadPool is not valid."); +} + + +PARCThreadPool * +parcThreadPool_Create(int poolSize) +{ + PARCThreadPool *result = parcObject_CreateInstance(PARCThreadPool); + + if (result != NULL) { + result->poolSize = poolSize; + result->maximumPoolSize = poolSize; + result->taskCount = 0; + result->isShutdown = false; + result->isTerminated = false; + result->isTerminating = false; + result->workQueue = parcLinkedList_Create(); + result->threads = parcLinkedList_Create(); + + result->completedTaskCount = parcAtomicUint64_Create(0); + + result->continueExistingPeriodicTasksAfterShutdown = false; + result->executeExistingDelayedTasksAfterShutdown = false; + result->removeOnCancel = true; + + if (parcObject_Lock(result)) { + for (int i = 0; i < poolSize; i++) { + PARCThread *thread = parcThread_Create((void *(*)(PARCThread *, PARCObject *))_parcThreadPool_Worker, (PARCObject *) result); + parcLinkedList_Append(result->threads, thread); + parcThread_Start(thread); + parcThread_Release(&thread); + } + parcObject_Unlock(result); + } + } + + return result; +} + +int +parcThreadPool_Compare(const PARCThreadPool *instance, const PARCThreadPool *other) +{ + int result = 0; + + return result; +} + +PARCThreadPool * +parcThreadPool_Copy(const PARCThreadPool *original) +{ + PARCThreadPool *result = parcThreadPool_Create(original->poolSize); + + return result; +} + +void +parcThreadPool_Display(const PARCThreadPool *instance, int indentation) +{ + parcDisplayIndented_PrintLine(indentation, "PARCThreadPool@%p {", instance); + /* Call Display() functions for the fields here. */ + parcDisplayIndented_PrintLine(indentation, "}"); +} + +bool +parcThreadPool_Equals(const PARCThreadPool *x, const PARCThreadPool *y) +{ + bool result = false; + + if (x == y) { + result = true; + } else if (x == NULL || y == NULL) { + result = false; + } else { + /* perform instance specific equality tests here. */ + if (x->poolSize == y->poolSize) { + result = true; + } + } + + return result; +} + +PARCHashCode +parcThreadPool_HashCode(const PARCThreadPool *instance) +{ + PARCHashCode result = 0; + + return result; +} + +bool +parcThreadPool_IsValid(const PARCThreadPool *instance) +{ + bool result = false; + + if (instance != NULL) { + result = true; + } + + return result; +} + +PARCJSON * +parcThreadPool_ToJSON(const PARCThreadPool *instance) +{ + PARCJSON *result = parcJSON_Create(); + + if (result != NULL) { + } + + return result; +} + +char * +parcThreadPool_ToString(const PARCThreadPool *instance) +{ + char *result = parcMemory_Format("PARCThreadPool@%p\n", instance); + + return result; +} + +void +parcThreadPool_SetAllowCoreThreadTimeOut(PARCThreadPool *pool, bool value) +{ +} + +bool +parcThreadPool_GetAllowsCoreThreadTimeOut(const PARCThreadPool *pool) +{ + return false; +} + +bool +parcThreadPool_AwaitTermination(PARCThreadPool *pool, PARCTimeout *timeout) +{ + bool result = false; + + if (pool->isTerminating) { + if (parcLinkedList_Lock(pool->workQueue)) { + while (parcLinkedList_Size(pool->workQueue) > 0) { + if (parcTimeout_IsNever(timeout)) { + parcLinkedList_Wait(pool->workQueue); + } else { + // This is not accurate as this will continue the delay, rather than keep a cumulative amount of delay. + uint64_t delay = parcTimeout_InNanoSeconds(timeout); + parcLinkedList_WaitFor(pool->workQueue, delay); + } + } + result = true; + parcLinkedList_Unlock(pool->workQueue); + } + + parcThreadPool_ShutdownNow(pool); + } + + return result; +} + +bool +parcThreadPool_Execute(PARCThreadPool *pool, PARCFutureTask *task) +{ + bool result = false; + + if (parcThreadPool_Lock(pool)) { + if (pool->isShutdown == false) { + parcThreadPool_Unlock(pool); + if (parcLinkedList_Lock(pool->workQueue)) { + parcLinkedList_Append(pool->workQueue, task); + parcLinkedList_Notify(pool->workQueue); + parcLinkedList_Unlock(pool->workQueue); + result = true; + } + } else { + parcThreadPool_Unlock(pool); + } + } + + return result; +} + +int +parcThreadPool_GetActiveCount(const PARCThreadPool *pool) +{ + return pool->poolSize; +} + +uint64_t +parcThreadPool_GetCompletedTaskCount(const PARCThreadPool *pool) +{ + return parcAtomicUint64_GetValue(pool->completedTaskCount); +} + +int +parcThreadPool_GetCorePoolSize(const PARCThreadPool *pool) +{ + return pool->poolSize; +} + +PARCTimeout * +parcThreadPool_GetKeepAliveTime(const PARCThreadPool *pool) +{ + return PARCTimeout_Never; +} + +int +parcThreadPool_GetLargestPoolSize(const PARCThreadPool *pool) +{ + return pool->poolSize; +} + +int +parcThreadPool_GetMaximumPoolSize(const PARCThreadPool *pool) +{ + return pool->maximumPoolSize; +} + +int +parcThreadPool_GetPoolSize(const PARCThreadPool *pool) +{ + return pool->poolSize; +} + +PARCLinkedList * +parcThreadPool_GetQueue(const PARCThreadPool *pool) +{ + return pool->workQueue; +} + +long +parcThreadPool_GetTaskCount(const PARCThreadPool *pool) +{ + return pool->taskCount; +} + +bool +parcThreadPool_IsShutdown(const PARCThreadPool *pool) +{ + return pool->isShutdown; +} + +bool +parcThreadPool_IsTerminated(const PARCThreadPool *pool) +{ + return pool->isTerminated; +} + +bool +parcThreadPool_IsTerminating(const PARCThreadPool *pool) +{ + return pool->isTerminating; +} + +int +parcThreadPool_PrestartAllCoreThreads(PARCThreadPool *pool) +{ + return 0; +} + +bool +parcThreadPool_PrestartCoreThread(PARCThreadPool *pool) +{ + return 0; +} + +void +parcThreadPool_Purge(PARCThreadPool *pool) +{ +} + +bool +parcThreadPool_Remove(PARCThreadPool *pool, PARCFutureTask *task) +{ + return false; +} + +void +parcThreadPool_SetCorePoolSize(PARCThreadPool *pool, int corePoolSize) +{ +} + +void +parcThreadPool_SetKeepAliveTime(PARCThreadPool *pool, PARCTimeout *timeout) +{ +} + +void +parcThreadPool_SetMaximumPoolSize(PARCThreadPool *pool, int maximumPoolSize) +{ +} + +void +parcThreadPool_Shutdown(PARCThreadPool *pool) +{ + if (parcThreadPool_Lock(pool)) { + pool->isShutdown = true; + pool->isTerminating = true; + parcThreadPool_Unlock(pool); + } +} + +PARCLinkedList * +parcThreadPool_ShutdownNow(PARCThreadPool *pool) +{ + parcThreadPool_Shutdown(pool); + + // Cause all of the worker threads to exit. + _parcThreadPool_CancelAll(pool); + + // Wake them all up so they detect that they are cancelled. + if (parcThreadPool_Lock(pool)) { + parcThreadPool_NotifyAll(pool); + parcThreadPool_Unlock(pool); + } + + if (parcLinkedList_Lock(pool->workQueue)) { + parcLinkedList_NotifyAll(pool->workQueue); + parcLinkedList_Unlock(pool->workQueue); + } + // Join with all of them, thereby cleaning up all of them. + _parcThreadPool_JoinAll(pool); + + pool->isTerminated = true; + return NULL; +} |