diff options
author | Tibor Frank <tifrank@cisco.com> | 2018-05-25 09:13:15 +0200 |
---|---|---|
committer | Tibor Frank <tifrank@cisco.com> | 2018-05-25 13:56:35 +0200 |
commit | 6f5de201aadfbb31419c05dfae6495107a745899 (patch) | |
tree | 0caea9de6a25c85ced8bc44e224160fb84a410a4 /resources/tools/presentation/input_data_parser.py | |
parent | 4324c8b7dd1fe1ba8296168f4c4dd43a8c3e6cc0 (diff) |
CSIT-1104: Trending: Speed-up plots generation
Change-Id: Ia3916523d4fc9e05ec864af2a858a03dea46c73c
Signed-off-by: Tibor Frank <tifrank@cisco.com>
Diffstat (limited to 'resources/tools/presentation/input_data_parser.py')
-rw-r--r-- | resources/tools/presentation/input_data_parser.py | 49 |
1 files changed, 2 insertions, 47 deletions
diff --git a/resources/tools/presentation/input_data_parser.py b/resources/tools/presentation/input_data_parser.py index 9428b2cdc9..beec34c106 100644 --- a/resources/tools/presentation/input_data_parser.py +++ b/resources/tools/presentation/input_data_parser.py @@ -31,6 +31,7 @@ from string import replace from os import remove from input_data_files import download_and_unzip_data_file +from utils import Worker class ExecutionChecker(ResultVisitor): @@ -863,12 +864,10 @@ class InputData(object): logging.info("Downloading and parsing input files ...") work_queue = multiprocessing.JoinableQueue() - manager = multiprocessing.Manager() - data_queue = manager.Queue() - cpus = multiprocessing.cpu_count() + workers = list() for cpu in range(cpus): worker = Worker(work_queue, @@ -1008,9 +1007,6 @@ class InputData(object): :rtype pandas.Series """ - logging.info(" Creating the data set for the {0} '{1}'.". - format(element.get("type", ""), element.get("title", ""))) - try: if element["filter"] in ("all", "template"): cond = "True" @@ -1095,44 +1091,3 @@ class InputData(object): merged_data[ID] = item_data return merged_data - - -class Worker(multiprocessing.Process): - """Worker class used to download and process input files in separate - parallel processes. - """ - - def __init__(self, work_queue, data_queue, func): - """Initialization. - - :param work_queue: Queue with items to process. - :param data_queue: Shared memory between processes. Queue which keeps - the result data. This data is then read by the main process and used - in further processing. - :param func: Function which is executed by the worker. - :type work_queue: multiprocessing.JoinableQueue - :type data_queue: multiprocessing.Manager().Queue() - :type func: Callable object - """ - super(Worker, self).__init__() - self._work_queue = work_queue - self._data_queue = data_queue - self._func = func - - def run(self): - """Method representing the process's activity. - """ - - while True: - try: - self.process(self._work_queue.get()) - finally: - self._work_queue.task_done() - - def process(self, item_to_process): - """Method executed by the runner. - - :param item_to_process: Data to be processed by the function. - :type item_to_process: tuple - """ - self._func(self.pid, self._data_queue, *item_to_process) |