Diffstat (limited to 'resources/tools/presentation/utils.py')
 resources/tools/presentation/utils.py | 161 ++++++++++++++++++++++------
 1 file changed, 127 insertions(+), 34 deletions(-)
diff --git a/resources/tools/presentation/utils.py b/resources/tools/presentation/utils.py
index 8365bfad5c..0a9d985a88 100644
--- a/resources/tools/presentation/utils.py
+++ b/resources/tools/presentation/utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Cisco and/or its affiliates.
+# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -14,6 +14,7 @@
"""General purpose utilities.
"""
+import multiprocessing
import subprocess
import numpy as np
import pandas as pd
@@ -21,7 +22,7 @@ import logging
from os import walk, makedirs, environ
from os.path import join, isdir
-from shutil import copy, Error
+from shutil import move, Error
from math import sqrt
from errors import PresentationError
@@ -68,58 +69,69 @@ def relative_change(nr1, nr2):
return float(((nr2 - nr1) / nr1) * 100)
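
For reference, relative_change() expresses the difference as a percentage of
the first argument. A quick worked example (the values are made up):

    relative_change(10.0, 12.0)   # ((12 - 10) / 10) * 100 ->  20.0
    relative_change(12.0, 10.0)   # ((10 - 12) / 12) * 100 -> -16.67 (approx.)
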
-def remove_outliers(input_data, outlier_const):
- """
+def remove_outliers(input_list, outlier_const=1.5, window=14):
+ """Return list with outliers removed, using split_outliers.
- :param input_data: Data from which the outliers will be removed.
+ :param input_list: Data from which the outliers will be removed.
:param outlier_const: Outlier constant.
- :type input_data: list
+    :param window: How many preceding values to take into account (unused).
+ :type input_list: list of floats
:type outlier_const: float
+ :type window: int
:returns: The input list without outliers.
- :rtype: list
+ :rtype: list of floats
"""
- data = np.array(input_data)
+ data = np.array(input_list)
upper_quartile = np.percentile(data, 75)
lower_quartile = np.percentile(data, 25)
iqr = (upper_quartile - lower_quartile) * outlier_const
quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
result_lst = list()
- for y in data.tolist():
+ for y in input_list:
if quartile_set[0] <= y <= quartile_set[1]:
result_lst.append(y)
return result_lst
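
A quick sanity check of the IQR logic above; the sample values are made up
(note that the new window argument is accepted but not used by this body):

    # Values outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR] are dropped.
    samples = [10.1, 10.2, 9.9, 10.0, 10.3, 55.0]
    remove_outliers(samples)  # -> [10.1, 10.2, 9.9, 10.0, 10.3]
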
-def find_outliers(input_data, outlier_const=1.5):
+def split_outliers(input_series, outlier_const=1.5, window=14):
"""Go through the input data and generate two pandas series:
- - input data without outliers
+    - input data with outliers replaced by NaN
- outliers.
The function uses IQR to detect outliers.
- :param input_data: Data to be examined for outliers.
+ :param input_series: Data to be examined for outliers.
:param outlier_const: Outlier constant.
- :type input_data: pandas.Series
+ :param window: How many preceding values to take into account.
+ :type input_series: pandas.Series
:type outlier_const: float
- :returns: Tuple: input data with outliers removed; Outliers.
- :rtype: tuple (trimmed_data, outliers)
+ :type window: int
+    :returns: Input data with outliers replaced by NaN, and the outliers.
+ :rtype: (pandas.Series, pandas.Series)
"""
- upper_quartile = input_data.quantile(q=0.75)
- lower_quartile = input_data.quantile(q=0.25)
- iqr = (upper_quartile - lower_quartile) * outlier_const
- low = lower_quartile - iqr
- high = upper_quartile + iqr
+ list_data = list(input_series.items())
+ head_size = min(window, len(list_data))
+ head_list = list_data[:head_size]
trimmed_data = pd.Series()
outliers = pd.Series()
- for item in input_data.items():
- item_pd = pd.Series([item[1], ], index=[item[0], ])
- if low <= item[1] <= high:
+ for item_x, item_y in head_list:
+ item_pd = pd.Series([item_y, ], index=[item_x, ])
+ trimmed_data = trimmed_data.append(item_pd)
+ for index, (item_x, item_y) in list(enumerate(list_data))[head_size:]:
+ y_rolling_list = [y for (x, y) in list_data[index - head_size:index]]
+ y_rolling_array = np.array(y_rolling_list)
+ q1 = np.percentile(y_rolling_array, 25)
+ q3 = np.percentile(y_rolling_array, 75)
+ iqr = (q3 - q1) * outlier_const
+ low = q1 - iqr
+ item_pd = pd.Series([item_y, ], index=[item_x, ])
+ if low <= item_y:
trimmed_data = trimmed_data.append(item_pd)
else:
- trimmed_data = trimmed_data.append(pd.Series([np.nan, ],
- index=[item[0], ]))
outliers = outliers.append(item_pd)
+ nan_pd = pd.Series([np.nan, ], index=[item_x, ])
+ trimmed_data = trimmed_data.append(nan_pd)
return trimmed_data, outliers
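
A minimal usage sketch of split_outliers (series values and index are made
up). The first window samples are always kept, and only values falling below
Q1 - outlier_const * IQR of the preceding window are treated as outliers:

    import pandas as pd

    series = pd.Series([10.0, 10.2, 9.9, 10.1, 2.0, 10.3],
                       index=["b1", "b2", "b3", "b4", "b5", "b6"])
    trimmed, outliers = split_outliers(series, outlier_const=1.5, window=4)
    # trimmed keeps the original index, with NaN in place of "b5";
    # outliers holds only the "b5" sample (2.0).
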
@@ -129,7 +141,7 @@ def get_files(path, extension=None, full_path=True):
:param path: Path to files.
:param extension: Extension of files to process. If it is the empty string,
- all files will be processed.
+ all files will be processed.
:param full_path: If True, the files with full path are generated.
:type path: str
:type extension: str
@@ -187,8 +199,10 @@ def execute_command(cmd):
stdout, stderr = proc.communicate()
- logging.info(stdout)
- logging.info(stderr)
+ if stdout:
+ logging.info(stdout)
+ if stderr:
+ logging.info(stderr)
if proc.returncode != 0:
logging.error(" Command execution failed.")
@@ -239,10 +253,7 @@ def archive_input_data(spec):
logging.info(" Archiving the input data files ...")
- if spec.is_debug:
- extension = spec.debug["input-format"]
- else:
- extension = spec.input["file-format"]
+ extension = spec.input["file-format"]
data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
extension=extension)
dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
@@ -253,11 +264,93 @@ def archive_input_data(spec):
makedirs(dst)
for data_file in data_files:
- logging.info(" Copying the file: {0} ...".format(data_file))
- copy(data_file, dst)
+ logging.info(" Moving the file: {0} ...".format(data_file))
+ move(data_file, dst)
except (Error, OSError) as err:
raise PresentationError("Not possible to archive the input data.",
str(err))
logging.info(" Done.")
+
+
+def classify_anomalies(data, window):
+ """Evaluates if the sample value is an outlier, regression, normal or
+ progression compared to the previous data within the window.
+ We use the intervals defined as:
+ - regress: less than trimmed moving median - 3 * stdev
+ - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
+ - progress: more than trimmed moving median + 3 * stdev
+ where stdev is trimmed moving standard deviation.
+
+ :param data: Full data set with the outliers replaced by nan.
+ :param window: Window size used to calculate moving average and moving
+ stdev.
+ :type data: pandas.Series
+ :type window: int
+    :returns: Classification of each sample, or None if there are fewer
+        than three samples.
+    :rtype: list of str, or None
+ """
+
+ if data.size < 3:
+ return None
+
+ win_size = data.size if data.size < window else window
+ tmm = data.rolling(window=win_size, min_periods=2).median()
+ tmstd = data.rolling(window=win_size, min_periods=2).std()
+
+ classification = ["normal", ]
+ first = True
+ for build, value in data.iteritems():
+ if first:
+ first = False
+ continue
+ if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
+ classification.append("outlier")
+ elif value < (tmm[build] - 3 * tmstd[build]):
+ classification.append("regression")
+ elif value > (tmm[build] + 3 * tmstd[build]):
+ classification.append("progression")
+ else:
+ classification.append("normal")
+ return classification
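
A sketch of how classify_anomalies() behaves on invented data. The first
sample is always labelled "normal" and NaN samples come out as "outlier";
with eleven samples the rolling window spans the whole series:

    import numpy as np
    import pandas as pd

    trend = pd.Series([10.0] * 5 + [np.nan] + [10.0] * 4 + [5.0])
    classify_anomalies(trend, window=14)
    # -> ['normal', 'normal', 'normal', 'normal', 'normal',
    #     'outlier', 'normal', 'normal', 'normal', 'normal',
    #     'regression']
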
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+ :param data_queue: Shared memory between processes. Queue which keeps
+ the result data. This data is then read by the main process and used
+ in further processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+ """Method executed by the runner.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)
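
A minimal sketch of how this Worker class is meant to be wired up. The task
function and the queue items below are hypothetical, not part of this patch;
since run() loops forever, the workers are daemonized and terminated once the
work queue has been drained:

    # Hypothetical task function; real callers supply their own. Items put
    # on the work queue are unpacked into its trailing arguments.
    # (Wrap the code below in a __main__ guard on platforms that spawn.)
    def _task(pid, data_queue, job_id, payload):
        data_queue.put((job_id, payload.upper()))

    manager = multiprocessing.Manager()
    data_queue = manager.Queue()
    work_queue = multiprocessing.JoinableQueue()

    workers = [Worker(work_queue, data_queue, _task) for _ in range(2)]
    for worker in workers:
        worker.daemon = True
        worker.start()

    for item in ((1, "abc"), (2, "def")):
        work_queue.put(item)
    work_queue.join()        # blocks until task_done() was called per item

    while not data_queue.empty():
        print(data_queue.get())
    for worker in workers:
        worker.terminate()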