diff options
author | Tibor Frank <tifrank@cisco.com> | 2018-05-25 09:13:15 +0200 |
---|---|---|
committer | Tibor Frank <tifrank@cisco.com> | 2018-05-25 13:56:35 +0200 |
commit | 6f5de201aadfbb31419c05dfae6495107a745899 (patch) | |
tree | 0caea9de6a25c85ced8bc44e224160fb84a410a4 /resources/tools/presentation/utils.py | |
parent | 4324c8b7dd1fe1ba8296168f4c4dd43a8c3e6cc0 (diff) |
CSIT-1104: Trending: Speed-up plots generation
Change-Id: Ia3916523d4fc9e05ec864af2a858a03dea46c73c
Signed-off-by: Tibor Frank <tifrank@cisco.com>
Diffstat (limited to 'resources/tools/presentation/utils.py')
-rw-r--r-- | resources/tools/presentation/utils.py | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py index ab86bafdd7..f32019dc2e 100644 --- a/resources/tools/presentation/utils.py +++ b/resources/tools/presentation/utils.py @@ -14,6 +14,7 @@ """General purpose utilities. """ +import multiprocessing import subprocess import numpy as np import pandas as pd @@ -271,3 +272,43 @@ def archive_input_data(spec): str(err)) logging.info(" Done.") + + +class Worker(multiprocessing.Process): + """Worker class used to process tasks in separate parallel processes. + """ + + def __init__(self, work_queue, data_queue, func): + """Initialization. + + :param work_queue: Queue with items to process. + :param data_queue: Shared memory between processes. Queue which keeps + the result data. This data is then read by the main process and used + in further processing. + :param func: Function which is executed by the worker. + :type work_queue: multiprocessing.JoinableQueue + :type data_queue: multiprocessing.Manager().Queue() + :type func: Callable object + """ + super(Worker, self).__init__() + self._work_queue = work_queue + self._data_queue = data_queue + self._func = func + + def run(self): + """Method representing the process's activity. + """ + + while True: + try: + self.process(self._work_queue.get()) + finally: + self._work_queue.task_done() + + def process(self, item_to_process): + """Method executed by the runner. + + :param item_to_process: Data to be processed by the function. + :type item_to_process: tuple + """ + self._func(self.pid, self._data_queue, *item_to_process) |