path: root/resources/tools/presentation/input_data_parser.py
author     Tibor Frank <tifrank@cisco.com>  2018-05-25 09:13:15 +0200
committer  Tibor Frank <tifrank@cisco.com>  2018-05-25 13:56:35 +0200
commit     6f5de201aadfbb31419c05dfae6495107a745899 (patch)
tree       0caea9de6a25c85ced8bc44e224160fb84a410a4 /resources/tools/presentation/input_data_parser.py
parent     4324c8b7dd1fe1ba8296168f4c4dd43a8c3e6cc0 (diff)
CSIT-1104: Trending: Speed-up plots generation
Change-Id: Ia3916523d4fc9e05ec864af2a858a03dea46c73c
Signed-off-by: Tibor Frank <tifrank@cisco.com>
Diffstat (limited to 'resources/tools/presentation/input_data_parser.py')
-rw-r--r--   resources/tools/presentation/input_data_parser.py   49
1 file changed, 2 insertions, 47 deletions
diff --git a/resources/tools/presentation/input_data_parser.py b/resources/tools/presentation/input_data_parser.py
index 9428b2cdc9..beec34c106 100644
--- a/resources/tools/presentation/input_data_parser.py
+++ b/resources/tools/presentation/input_data_parser.py
@@ -31,6 +31,7 @@ from string import replace
from os import remove
from input_data_files import download_and_unzip_data_file
+from utils import Worker
class ExecutionChecker(ResultVisitor):
@@ -863,12 +864,10 @@ class InputData(object):
logging.info("Downloading and parsing input files ...")
work_queue = multiprocessing.JoinableQueue()
-
manager = multiprocessing.Manager()
-
data_queue = manager.Queue()
-
cpus = multiprocessing.cpu_count()
+
workers = list()
for cpu in range(cpus):
worker = Worker(work_queue,
@@ -1008,9 +1007,6 @@ class InputData(object):
:rtype pandas.Series
"""
- logging.info(" Creating the data set for the {0} '{1}'.".
- format(element.get("type", ""), element.get("title", "")))
-
try:
if element["filter"] in ("all", "template"):
cond = "True"
@@ -1095,44 +1091,3 @@ class InputData(object):
merged_data[ID] = item_data
return merged_data
-
-
-class Worker(multiprocessing.Process):
-    """Worker class used to download and process input files in separate
-    parallel processes.
-    """
-
-    def __init__(self, work_queue, data_queue, func):
-        """Initialization.
-
-        :param work_queue: Queue with items to process.
-        :param data_queue: Shared memory between processes. Queue which keeps
-            the result data. This data is then read by the main process and used
-            in further processing.
-        :param func: Function which is executed by the worker.
-        :type work_queue: multiprocessing.JoinableQueue
-        :type data_queue: multiprocessing.Manager().Queue()
-        :type func: Callable object
-        """
-        super(Worker, self).__init__()
-        self._work_queue = work_queue
-        self._data_queue = data_queue
-        self._func = func
-
-    def run(self):
-        """Method representing the process's activity.
-        """
-
-        while True:
-            try:
-                self.process(self._work_queue.get())
-            finally:
-                self._work_queue.task_done()
-
-    def process(self, item_to_process):
-        """Method executed by the runner.
-
-        :param item_to_process: Data to be processed by the function.
-        :type item_to_process: tuple
-        """
-        self._func(self.pid, self._data_queue, *item_to_process)
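
The Worker class removed above is not dropped: the new "from utils import Worker" line shows it is now imported from the local utils module (the companion change to utils.py is outside this diff, which is limited to input_data_parser.py). Below is a minimal, self-contained sketch of the pattern the caller relies on: one daemonised Worker per CPU pulling job tuples from a multiprocessing.JoinableQueue and pushing results to a multiprocessing.Manager().Queue(). The Worker class in the sketch mirrors the relocated code; the parse_build() job function and the job tuples are hypothetical stand-ins, not the actual CSIT download-and-parse logic.

# Sketch of the worker-pool wiring used by InputData (assumptions noted above).
import multiprocessing


class Worker(multiprocessing.Process):
    """Run func(pid, data_queue, *item) for every item taken from work_queue."""

    def __init__(self, work_queue, data_queue, func):
        super(Worker, self).__init__()
        self._work_queue = work_queue
        self._data_queue = data_queue
        self._func = func

    def run(self):
        # Loop forever; the daemon flag set by the caller ends the process
        # together with the main process.
        while True:
            try:
                self.process(self._work_queue.get())
            finally:
                self._work_queue.task_done()

    def process(self, item_to_process):
        self._func(self.pid, self._data_queue, *item_to_process)


def parse_build(pid, data_queue, job, build):
    """Hypothetical job: 'process' one build and put the result on the shared queue."""
    data_queue.put({"job": job, "build": build, "worker": pid})


if __name__ == "__main__":
    work_queue = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    data_queue = manager.Queue()

    # One daemonised worker per CPU, as in the first hunk above.
    workers = list()
    for _ in range(multiprocessing.cpu_count()):
        worker = Worker(work_queue, data_queue, parse_build)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    # Each queued tuple becomes *item_to_process in Worker.process().
    for build in range(4):
        work_queue.put(("example-job-name", build))

    work_queue.join()  # blocks until every queued item is task_done()

    # Drain the results collected by the workers.
    while not data_queue.empty():
        print(data_queue.get())

Only the queue wiring and the per-CPU fan-out are reproduced here; as the surrounding hunks suggest, the real job function downloads and parses robot output files and the main process later reads data_queue to build its pandas structures.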