path: root/libparc/parc/concurrent/parc_ThreadPool.c
author    Michele Papalini <micpapal+fdio@cisco.com>    2017-02-24 08:00:33 +0000
committer Gerrit Code Review <gerrit@fd.io>             2017-02-24 08:00:33 +0000
commit    4df7f4cc98b6288177df256e1db70ddc3f7d00db (patch)
tree      55e71277b419e4830ae641868ab8e751c8b86972 /libparc/parc/concurrent/parc_ThreadPool.c
parent    f28308bd99381ef5f1e178e2e1f870f245e35873 (diff)
parent    ec688b4723a041044226358bcd4dd6e2da39da49 (diff)
Merge "Initial commit: cframework. Longbow and Libparc" into cframework/master
Diffstat (limited to 'libparc/parc/concurrent/parc_ThreadPool.c')
-rw-r--r--    libparc/parc/concurrent/parc_ThreadPool.c    457
1 files changed, 457 insertions, 0 deletions
diff --git a/libparc/parc/concurrent/parc_ThreadPool.c b/libparc/parc/concurrent/parc_ThreadPool.c
new file mode 100644
index 00000000..80b093ba
--- /dev/null
+++ b/libparc/parc/concurrent/parc_ThreadPool.c
@@ -0,0 +1,457 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file parc_ThreadPool.c
+ * @brief A fixed-size pool of worker threads that executes queued PARCFutureTask instances.
+ */
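+/*
+ * Typical use, as a sketch (the PARCFutureTask is assumed to have been built
+ * elsewhere with the parc_FutureTask API):
+ *
+ *     PARCThreadPool *pool = parcThreadPool_Create(4);
+ *     parcThreadPool_Execute(pool, task);
+ *     parcThreadPool_Shutdown(pool);
+ *     parcThreadPool_AwaitTermination(pool, PARCTimeout_Never);
+ *     parcThreadPool_Release(&pool);
+ */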
+#include <config.h>
+#include <stdio.h>
+
+#include <parc/algol/parc_Object.h>
+#include <parc/algol/parc_DisplayIndented.h>
+#include <parc/algol/parc_Memory.h>
+
+#include <parc/algol/parc_SortedList.h>
+#include <parc/algol/parc_LinkedList.h>
+
+#include <parc/concurrent/parc_AtomicUint64.h>
+#include <parc/concurrent/parc_ThreadPool.h>
+#include <parc/concurrent/parc_Thread.h>
+
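+/*
+ * A PARCThreadPool pairs a lockable work queue of PARCFutureTask instances with a
+ * fixed list of PARCThread workers that drain it.  The completed-task counter is a
+ * PARCAtomicUint64 so it can be read without taking the pool lock.
+ */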
+struct PARCThreadPool {
+ bool continueExistingPeriodicTasksAfterShutdown;
+ bool executeExistingDelayedTasksAfterShutdown;
+ bool removeOnCancel;
+ PARCLinkedList *workQueue;
+ PARCLinkedList *threads;
+ int poolSize;
+ int maximumPoolSize;
+ long taskCount;
+ bool isShutdown;
+ bool isTerminated;
+ bool isTerminating;
+
+ PARCAtomicUint64 *completedTaskCount;
+};
+
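+/*
+ * The loop run by every worker thread: dequeue the next task, run it with the
+ * queue unlocked, and otherwise wait (with a one-second timeout) for work to
+ * arrive.  The loop exits once the thread is cancelled or the pool terminates.
+ */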
+static void *
+_parcThreadPool_Worker(const PARCThread *thread, const PARCThreadPool *pool)
+{
+ while (parcThread_IsCancelled(thread) == false && pool->isTerminated == false) {
+ if (parcLinkedList_Lock(pool->workQueue)) {
+ PARCFutureTask *task = parcLinkedList_RemoveFirst(pool->workQueue);
+ if (task != NULL) {
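+ // Note that the completed-task count is incremented when the task is dequeued,
+ // before parcFutureTask_Run() has actually finished it.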
+ parcAtomicUint64_Increment(pool->completedTaskCount);
+ parcLinkedList_Unlock(pool->workQueue);
+ parcFutureTask_Run(task);
+ parcFutureTask_Release(&task);
+ parcLinkedList_Lock(pool->workQueue);
+
+ parcLinkedList_Notify(pool->workQueue);
+ } else {
+ parcLinkedList_WaitFor(pool->workQueue, 1000000000);
+ }
+ parcLinkedList_Unlock(pool->workQueue);
+ }
+ }
+
+ return NULL;
+}
+
+static void
+_parcThreadPool_CancelAll(const PARCThreadPool *pool)
+{
+ PARCIterator *iterator = parcLinkedList_CreateIterator(pool->threads);
+
+ while (parcIterator_HasNext(iterator)) {
+ PARCThread *thread = parcIterator_Next(iterator);
+ parcThread_Cancel(thread);
+ }
+ parcIterator_Release(&iterator);
+}
+
+static void
+_parcThreadPool_JoinAll(const PARCThreadPool *pool)
+{
+ PARCIterator *iterator = parcLinkedList_CreateIterator(pool->threads);
+
+ while (parcIterator_HasNext(iterator)) {
+ PARCThread *thread = parcIterator_Next(iterator);
+ parcThread_Join(thread);
+ }
+ parcIterator_Release(&iterator);
+}
+
+static bool
+_parcThreadPool_Destructor(PARCThreadPool **instancePtr)
+{
+ assertNotNull(instancePtr, "Parameter must be a non-null pointer to a PARCThreadPool pointer.");
+ PARCThreadPool *pool = *instancePtr;
+
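+ // If the pool was never shut down, cancel the workers and join them so their
+ // threads do not outlive the pool.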
+ if (pool->isShutdown == false) {
+ _parcThreadPool_CancelAll(pool);
+ _parcThreadPool_JoinAll(pool);
+ }
+
+ parcAtomicUint64_Release(&pool->completedTaskCount);
+ parcLinkedList_Release(&pool->threads);
+
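+ // The queue is released while holding its lock, ensuring no worker is still using it.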
+ if (parcObject_Lock(pool->workQueue)) {
+ parcLinkedList_Release(&pool->workQueue);
+ }
+
+ return true;
+}
+
+parcObject_ImplementAcquire(parcThreadPool, PARCThreadPool);
+
+parcObject_ImplementRelease(parcThreadPool, PARCThreadPool);
+
+parcObject_Override(PARCThreadPool, PARCObject,
+ .isLockable = true,
+ .destructor = (PARCObjectDestructor *) _parcThreadPool_Destructor,
+ .copy = (PARCObjectCopy *) parcThreadPool_Copy,
+ .toString = (PARCObjectToString *) parcThreadPool_ToString,
+ .equals = (PARCObjectEquals *) parcThreadPool_Equals,
+ .compare = (PARCObjectCompare *) parcThreadPool_Compare,
+ .hashCode = (PARCObjectHashCode *) parcThreadPool_HashCode);
+
+void
+parcThreadPool_AssertValid(const PARCThreadPool *instance)
+{
+ assertTrue(parcThreadPool_IsValid(instance),
+ "PARCThreadPool is not valid.");
+}
+
+
+PARCThreadPool *
+parcThreadPool_Create(int poolSize)
+{
+ PARCThreadPool *result = parcObject_CreateInstance(PARCThreadPool);
+
+ if (result != NULL) {
+ result->poolSize = poolSize;
+ result->maximumPoolSize = poolSize;
+ result->taskCount = 0;
+ result->isShutdown = false;
+ result->isTerminated = false;
+ result->isTerminating = false;
+ result->workQueue = parcLinkedList_Create();
+ result->threads = parcLinkedList_Create();
+
+ result->completedTaskCount = parcAtomicUint64_Create(0);
+
+ result->continueExistingPeriodicTasksAfterShutdown = false;
+ result->executeExistingDelayedTasksAfterShutdown = false;
+ result->removeOnCancel = true;
+
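+ // The pool is locked while its worker threads are created and started; each
+ // thread is appended to the threads list (which holds its own reference), so
+ // the local reference can be released immediately.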
+ if (parcObject_Lock(result)) {
+ for (int i = 0; i < poolSize; i++) {
+ PARCThread *thread = parcThread_Create((void *(*)(PARCThread *, PARCObject *))_parcThreadPool_Worker, (PARCObject *) result);
+ parcLinkedList_Append(result->threads, thread);
+ parcThread_Start(thread);
+ parcThread_Release(&thread);
+ }
+ parcObject_Unlock(result);
+ }
+ }
+
+ return result;
+}
+
+int
+parcThreadPool_Compare(const PARCThreadPool *instance, const PARCThreadPool *other)
+{
+ int result = 0;
+
+ return result;
+}
+
+PARCThreadPool *
+parcThreadPool_Copy(const PARCThreadPool *original)
+{
+ PARCThreadPool *result = parcThreadPool_Create(original->poolSize);
+
+ return result;
+}
+
+void
+parcThreadPool_Display(const PARCThreadPool *instance, int indentation)
+{
+ parcDisplayIndented_PrintLine(indentation, "PARCThreadPool@%p {", instance);
+ /* Call Display() functions for the fields here. */
+ parcDisplayIndented_PrintLine(indentation, "}");
+}
+
+bool
+parcThreadPool_Equals(const PARCThreadPool *x, const PARCThreadPool *y)
+{
+ bool result = false;
+
+ if (x == y) {
+ result = true;
+ } else if (x == NULL || y == NULL) {
+ result = false;
+ } else {
+ /* perform instance specific equality tests here. */
+ if (x->poolSize == y->poolSize) {
+ result = true;
+ }
+ }
+
+ return result;
+}
+
+PARCHashCode
+parcThreadPool_HashCode(const PARCThreadPool *instance)
+{
+ PARCHashCode result = 0;
+
+ return result;
+}
+
+bool
+parcThreadPool_IsValid(const PARCThreadPool *instance)
+{
+ bool result = false;
+
+ if (instance != NULL) {
+ result = true;
+ }
+
+ return result;
+}
+
+PARCJSON *
+parcThreadPool_ToJSON(const PARCThreadPool *instance)
+{
+ PARCJSON *result = parcJSON_Create();
+
+ if (result != NULL) {
+ }
+
+ return result;
+}
+
+char *
+parcThreadPool_ToString(const PARCThreadPool *instance)
+{
+ char *result = parcMemory_Format("PARCThreadPool@%p\n", instance);
+
+ return result;
+}
+
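+// Core-thread time-out is not supported by this implementation: the setter below
+// is a no-op and the getter always reports false.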
+void
+parcThreadPool_SetAllowCoreThreadTimeOut(PARCThreadPool *pool, bool value)
+{
+}
+
+bool
+parcThreadPool_GetAllowsCoreThreadTimeOut(const PARCThreadPool *pool)
+{
+ return false;
+}
+
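+/*
+ * Wait for the work queue to drain, subject to the timeout, and then force the
+ * pool down with parcThreadPool_ShutdownNow().  This only has an effect once
+ * parcThreadPool_Shutdown() has marked the pool as terminating.
+ */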
+bool
+parcThreadPool_AwaitTermination(PARCThreadPool *pool, PARCTimeout *timeout)
+{
+ bool result = false;
+
+ if (pool->isTerminating) {
+ if (parcLinkedList_Lock(pool->workQueue)) {
+ while (parcLinkedList_Size(pool->workQueue) > 0) {
+ if (parcTimeout_IsNever(timeout)) {
+ parcLinkedList_Wait(pool->workQueue);
+ } else {
+ // Note: the full delay is restarted on every wakeup rather than tracked
+ // cumulatively, so the total wait can exceed the requested timeout.
+ uint64_t delay = parcTimeout_InNanoSeconds(timeout);
+ parcLinkedList_WaitFor(pool->workQueue, delay);
+ }
+ }
+ result = true;
+ parcLinkedList_Unlock(pool->workQueue);
+ }
+
+ parcThreadPool_ShutdownNow(pool);
+ }
+
+ return result;
+}
+
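+/*
+ * Submit a task for execution.  Tasks offered after parcThreadPool_Shutdown()
+ * are rejected; otherwise the task is appended to the work queue and a waiting
+ * worker is notified.
+ */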
+bool
+parcThreadPool_Execute(PARCThreadPool *pool, PARCFutureTask *task)
+{
+ bool result = false;
+
+ if (parcThreadPool_Lock(pool)) {
+ if (pool->isShutdown == false) {
+ parcThreadPool_Unlock(pool);
+ if (parcLinkedList_Lock(pool->workQueue)) {
+ parcLinkedList_Append(pool->workQueue, task);
+ parcLinkedList_Notify(pool->workQueue);
+ parcLinkedList_Unlock(pool->workQueue);
+ result = true;
+ }
+ } else {
+ parcThreadPool_Unlock(pool);
+ }
+ }
+
+ return result;
+}
+
+int
+parcThreadPool_GetActiveCount(const PARCThreadPool *pool)
+{
+ return pool->poolSize;
+}
+
+uint64_t
+parcThreadPool_GetCompletedTaskCount(const PARCThreadPool *pool)
+{
+ return parcAtomicUint64_GetValue(pool->completedTaskCount);
+}
+
+int
+parcThreadPool_GetCorePoolSize(const PARCThreadPool *pool)
+{
+ return pool->poolSize;
+}
+
+PARCTimeout *
+parcThreadPool_GetKeepAliveTime(const PARCThreadPool *pool)
+{
+ return PARCTimeout_Never;
+}
+
+int
+parcThreadPool_GetLargestPoolSize(const PARCThreadPool *pool)
+{
+ return pool->poolSize;
+}
+
+int
+parcThreadPool_GetMaximumPoolSize(const PARCThreadPool *pool)
+{
+ return pool->maximumPoolSize;
+}
+
+int
+parcThreadPool_GetPoolSize(const PARCThreadPool *pool)
+{
+ return pool->poolSize;
+}
+
+PARCLinkedList *
+parcThreadPool_GetQueue(const PARCThreadPool *pool)
+{
+ return pool->workQueue;
+}
+
+long
+parcThreadPool_GetTaskCount(const PARCThreadPool *pool)
+{
+ return pool->taskCount;
+}
+
+bool
+parcThreadPool_IsShutdown(const PARCThreadPool *pool)
+{
+ return pool->isShutdown;
+}
+
+bool
+parcThreadPool_IsTerminated(const PARCThreadPool *pool)
+{
+ return pool->isTerminated;
+}
+
+bool
+parcThreadPool_IsTerminating(const PARCThreadPool *pool)
+{
+ return pool->isTerminating;
+}
+
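+// The operations below are currently placeholders: pre-starting threads, purging,
+// removing a task, and the pool-size and keep-alive setters have no effect.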
+int
+parcThreadPool_PrestartAllCoreThreads(PARCThreadPool *pool)
+{
+ return 0;
+}
+
+bool
+parcThreadPool_PrestartCoreThread(PARCThreadPool *pool)
+{
+ return false;
+}
+
+void
+parcThreadPool_Purge(PARCThreadPool *pool)
+{
+}
+
+bool
+parcThreadPool_Remove(PARCThreadPool *pool, PARCFutureTask *task)
+{
+ return false;
+}
+
+void
+parcThreadPool_SetCorePoolSize(PARCThreadPool *pool, int corePoolSize)
+{
+}
+
+void
+parcThreadPool_SetKeepAliveTime(PARCThreadPool *pool, PARCTimeout *timeout)
+{
+}
+
+void
+parcThreadPool_SetMaximumPoolSize(PARCThreadPool *pool, int maximumPoolSize)
+{
+}
+
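+/*
+ * Mark the pool as shut down: new tasks are rejected, but the workers keep
+ * draining the tasks already queued until parcThreadPool_ShutdownNow() or
+ * parcThreadPool_AwaitTermination() runs.
+ */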
+void
+parcThreadPool_Shutdown(PARCThreadPool *pool)
+{
+ if (parcThreadPool_Lock(pool)) {
+ pool->isShutdown = true;
+ pool->isTerminating = true;
+ parcThreadPool_Unlock(pool);
+ }
+}
+
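+/*
+ * Stop the pool immediately: cancel every worker, wake any that are waiting on
+ * the pool or the work queue, and join them.  Tasks still queued are not handed
+ * back to the caller; the return value is always NULL.
+ */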
+PARCLinkedList *
+parcThreadPool_ShutdownNow(PARCThreadPool *pool)
+{
+ parcThreadPool_Shutdown(pool);
+
+ // Cause all of the worker threads to exit.
+ _parcThreadPool_CancelAll(pool);
+
+ // Wake them all up so they detect that they are cancelled.
+ if (parcThreadPool_Lock(pool)) {
+ parcThreadPool_NotifyAll(pool);
+ parcThreadPool_Unlock(pool);
+ }
+
+ if (parcLinkedList_Lock(pool->workQueue)) {
+ parcLinkedList_NotifyAll(pool->workQueue);
+ parcLinkedList_Unlock(pool->workQueue);
+ }
+ // Join with each worker thread so that its resources are reclaimed.
+ _parcThreadPool_JoinAll(pool);
+
+ pool->isTerminated = true;
+ return NULL;
+}