aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools/presentation/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/tools/presentation/utils.py')
-rw-r--r--resources/tools/presentation/utils.py41
1 files changed, 41 insertions, 0 deletions
diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py
index ab86bafdd7..f32019dc2e 100644
--- a/resources/tools/presentation/utils.py
+++ b/resources/tools/presentation/utils.py
@@ -14,6 +14,7 @@
"""General purpose utilities.
"""
+import multiprocessing
import subprocess
import numpy as np
import pandas as pd
@@ -271,3 +272,43 @@ def archive_input_data(spec):
str(err))
logging.info(" Done.")
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+ :param data_queue: Shared memory between processes. Queue which keeps
+ the result data. This data is then read by the main process and used
+ in further processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+ """Method executed by the runner.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)