aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools/presentation/specification_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/tools/presentation/specification_parser.py')
-rw-r--r--resources/tools/presentation/specification_parser.py796
1 files changed, 480 insertions, 316 deletions
diff --git a/resources/tools/presentation/specification_parser.py b/resources/tools/presentation/specification_parser.py
index 9852d905ba..302ce037ab 100644
--- a/resources/tools/presentation/specification_parser.py
+++ b/resources/tools/presentation/specification_parser.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -18,15 +18,17 @@ Parsing of the specification YAML file.
import logging
-from yaml import load, YAMLError
from pprint import pformat
-from errors import PresentationError
-from utils import get_last_successful_build_number
-from utils import get_last_completed_build_number
+from yaml import load, FullLoader, YAMLError
+from pal_errors import PresentationError
+from pal_utils import (
+ get_last_successful_build_nr, get_last_completed_build_number
+)
-class Specification(object):
+
+class Specification:
"""Specification of Presentation and analytics layer.
- based on specification specified in the specification YAML file
@@ -35,8 +37,8 @@ class Specification(object):
# Tags are used in specification YAML file and replaced while the file is
# parsed.
- TAG_OPENER = "{"
- TAG_CLOSER = "}"
+ TAG_OPENER = u"{"
+ TAG_CLOSER = u"}"
def __init__(self, cfg_file):
"""Initialization.
@@ -47,15 +49,17 @@ class Specification(object):
self._cfg_file = cfg_file
self._cfg_yaml = None
- self._specification = {"environment": dict(),
- "configuration": dict(),
- "static": dict(),
- "input": dict(),
- "output": dict(),
- "tables": list(),
- "plots": list(),
- "files": list(),
- "cpta": dict()}
+ self._specification = {
+ u"environment": dict(),
+ u"configuration": dict(),
+ u"static": dict(),
+ u"input": dict(),
+ u"output": dict(),
+ u"tables": list(),
+ u"plots": list(),
+ u"files": list(),
+ u"cpta": dict()
+ }
@property
def specification(self):
@@ -73,7 +77,7 @@ class Specification(object):
:returns: Environment specification.
:rtype: dict
"""
- return self._specification["environment"]
+ return self._specification[u"environment"]
@property
def configuration(self):
@@ -82,7 +86,7 @@ class Specification(object):
:returns: Configuration of PAL.
:rtype: dict
"""
- return self._specification["configuration"]
+ return self._specification[u"configuration"]
@property
def static(self):
@@ -91,7 +95,7 @@ class Specification(object):
:returns: Static content specification.
:rtype: dict
"""
- return self._specification["static"]
+ return self._specification[u"static"]
@property
def mapping(self):
@@ -101,7 +105,7 @@ class Specification(object):
one.
:rtype: dict
"""
- return self._specification["configuration"]["mapping"]
+ return self._specification[u"configuration"][u"mapping"]
@property
def ignore(self):
@@ -110,7 +114,7 @@ class Specification(object):
:returns: List of ignored test cases.
:rtype: list
"""
- return self._specification["configuration"]["ignore"]
+ return self._specification[u"configuration"][u"ignore"]
@property
def alerting(self):
@@ -119,7 +123,7 @@ class Specification(object):
:returns: Specification of alerts.
:rtype: dict
"""
- return self._specification["configuration"]["alerting"]
+ return self._specification[u"configuration"][u"alerting"]
@property
def input(self):
@@ -129,7 +133,16 @@ class Specification(object):
:returns: Inputs.
:rtype: dict
"""
- return self._specification["input"]
+ return self._specification[u"input"]
+
+ @input.setter
+ def input(self, new_value):
+ """Setter - specification - inputs.
+
+ :param new_value: New value to be set.
+ :type new_value: dict
+ """
+ self._specification[u"input"] = new_value
@property
def builds(self):
@@ -138,7 +151,28 @@ class Specification(object):
:returns: Builds defined in the specification.
:rtype: dict
"""
- return self.input["builds"]
+ return self.input[u"builds"]
+
+ @builds.setter
+ def builds(self, new_value):
+ """Setter - builds defined in specification.
+
+ :param new_value: New value to be set.
+ :type new_value: dict
+ """
+ self.input[u"builds"] = new_value
+
+ def add_build(self, job, build):
+ """Add a build to the specification.
+
+ :param job: The job which run the build.
+ :param build: The build to be added.
+ :type job: str
+ :type build: dict
+ """
+ if self._specification[u"input"][u"builds"].get(job, None) is None:
+ self._specification[u"input"][u"builds"][job] = list()
+ self._specification[u"input"][u"builds"][job].append(build)
@property
def output(self):
@@ -149,7 +183,7 @@ class Specification(object):
:returns: Outputs to be generated.
:rtype: dict
"""
- return self._specification["output"]
+ return self._specification[u"output"]
@property
def tables(self):
@@ -158,7 +192,7 @@ class Specification(object):
:returns: List of specifications of tables to be generated.
:rtype: list
"""
- return self._specification["tables"]
+ return self._specification[u"tables"]
@property
def plots(self):
@@ -167,7 +201,7 @@ class Specification(object):
:returns: List of specifications of plots to be generated.
:rtype: list
"""
- return self._specification["plots"]
+ return self._specification[u"plots"]
@property
def files(self):
@@ -176,7 +210,7 @@ class Specification(object):
:returns: List of specifications of files to be generated.
:rtype: list
"""
- return self._specification["files"]
+ return self._specification[u"files"]
@property
def cpta(self):
@@ -184,54 +218,66 @@ class Specification(object):
generated.
:returns: List of specifications of Continuous Performance Trending and
- Analysis to be generated.
+ Analysis to be generated.
:rtype: list
"""
- return self._specification["cpta"]
+ return self._specification[u"cpta"]
def set_input_state(self, job, build_nr, state):
"""Set the state of input
- :param job:
- :param build_nr:
- :param state:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param state: The new input state.
+ :type job: str
+ :type build_nr: int
+ :type state: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
- for build in self._specification["input"]["builds"][job]:
- if build["build"] == build_nr:
- build["status"] = state
+ for build in self._specification[u"input"][u"builds"][job]:
+ if build[u"build"] == build_nr:
+ build[u"status"] = state
break
else:
- raise PresentationError("Build '{}' is not defined for job '{}'"
- " in specification file.".
- format(build_nr, job))
+ raise PresentationError(
+ f"Build {build_nr} is not defined for job {job} in "
+ f"specification file."
+ )
except KeyError:
- raise PresentationError("Job '{}' and build '{}' is not defined in "
- "specification file.".format(job, build_nr))
+ raise PresentationError(
+ f"Job {job} and build {build_nr} is not defined in "
+ f"specification file."
+ )
def set_input_file_name(self, job, build_nr, file_name):
"""Set the state of input
- :param job:
- :param build_nr:
- :param file_name:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param file_name: The new file name.
+ :type job: str
+ :type build_nr: int
+ :type file_name: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
- for build in self._specification["input"]["builds"][job]:
- if build["build"] == build_nr:
- build["file-name"] = file_name
+ for build in self._specification[u"input"][u"builds"][job]:
+ if build[u"build"] == build_nr:
+ build[u"file-name"] = file_name
break
else:
- raise PresentationError("Build '{}' is not defined for job '{}'"
- " in specification file.".
- format(build_nr, job))
+ raise PresentationError(
+ f"Build {build_nr} is not defined for job {job} in "
+ f"specification file."
+ )
except KeyError:
- raise PresentationError("Job '{}' and build '{}' is not defined in "
- "specification file.".format(job, build_nr))
+ raise PresentationError(
+ f"Job {job} and build {build_nr} is not defined in "
+ f"specification file."
+ )
def _get_build_number(self, job, build_type):
"""Get the number of the job defined by its name:
@@ -244,39 +290,40 @@ class Specification(object):
- lastCompletedBuild
:type job" str
:raises PresentationError: If it is not possible to get the build
- number.
+ number.
:returns: The build number.
:rtype: int
"""
# defined as a range <start, end>
- if build_type == "lastSuccessfulBuild":
+ if build_type == u"lastSuccessfulBuild":
# defined as a range <start, lastSuccessfulBuild>
- ret_code, build_nr, _ = get_last_successful_build_number(
- self.environment["urls"]["URL[JENKINS,CSIT]"], job)
- elif build_type == "lastCompletedBuild":
+ ret_code, build_nr, _ = get_last_successful_build_nr(
+ self.environment[u"urls"][u"URL[JENKINS,CSIT]"], job)
+ elif build_type == u"lastCompletedBuild":
# defined as a range <start, lastCompletedBuild>
ret_code, build_nr, _ = get_last_completed_build_number(
- self.environment["urls"]["URL[JENKINS,CSIT]"], job)
+ self.environment[u"urls"][u"URL[JENKINS,CSIT]"], job)
else:
- raise PresentationError("Not supported build type: '{0}'".
- format(build_type))
+ raise PresentationError(f"Not supported build type: {build_type}")
if ret_code != 0:
- raise PresentationError("Not possible to get the number of the "
- "build number.")
+ raise PresentationError(u"Not possible to get the number of the "
+ u"build number.")
try:
build_nr = int(build_nr)
return build_nr
except ValueError as err:
- raise PresentationError("Not possible to get the number of the "
- "build number.\nReason: {0}".format(err))
+ raise PresentationError(
+ f"Not possible to get the number of the build number. Reason:\n"
+ f"{repr(err)}"
+ )
def _get_type_index(self, item_type):
"""Get index of item type (environment, input, output, ...) in
specification YAML file.
:param item_type: Item type: Top level items in specification YAML file,
- e.g.: environment, input, output.
+ e.g.: environment, input, output.
:type item_type: str
:returns: Index of the given item type.
:rtype: int
@@ -284,7 +331,7 @@ class Specification(object):
index = 0
for item in self._cfg_yaml:
- if item["type"] == item_type:
+ if item[u"type"] == item_type:
return index
index += 1
return None
@@ -310,14 +357,14 @@ class Specification(object):
:param data: The data where the tags will be replaced by their values.
:param src_data: Data where the tags are defined. It is dictionary where
- the key is the tag and the value is the tag value. If not given, 'data'
- is used instead.
- :type data: str or dict
+ the key is the tag and the value is the tag value. If not given,
+ 'data' is used instead.
+ :type data: str, list or dict
:type src_data: dict
:returns: Data with the tags replaced.
- :rtype: str or dict
+ :rtype: str, list or dict
:raises: PresentationError if it is not possible to replace the tag or
- the data is not the supported data type (str, dict).
+ the data is not the supported data type (str, list or dict).
"""
if src_data is None:
@@ -327,8 +374,15 @@ class Specification(object):
tag = self._find_tag(data)
if tag is not None:
data = data.replace(tag, src_data[tag[1:-1]])
+ return data
- elif isinstance(data, dict):
+ if isinstance(data, list):
+ new_list = list()
+ for item in data:
+ new_list.append(self._replace_tags(item, src_data))
+ return new_list
+
+ if isinstance(data, dict):
counter = 0
for key, value in data.items():
tag = self._find_tag(value)
@@ -337,171 +391,187 @@ class Specification(object):
data[key] = value.replace(tag, src_data[tag[1:-1]])
counter += 1
except KeyError:
- raise PresentationError("Not possible to replace the "
- "tag '{}'".format(tag))
+ raise PresentationError(
+ f"Not possible to replace the tag {tag}"
+ )
if counter:
self._replace_tags(data, src_data)
- else:
- raise PresentationError("Replace tags: Not supported data type.")
+ return data
- return data
+ raise PresentationError(u"Replace tags: Not supported data type.")
def _parse_env(self):
"""Parse environment specification in the specification YAML file.
"""
- logging.info("Parsing specification file: environment ...")
+ logging.info(u"Parsing specification file: environment ...")
- idx = self._get_type_index("environment")
+ idx = self._get_type_index(u"environment")
if idx is None:
- return None
+ return
try:
- self._specification["environment"]["configuration"] = \
- self._cfg_yaml[idx]["configuration"]
+ self._specification[u"environment"][u"configuration"] = \
+ self._cfg_yaml[idx][u"configuration"]
except KeyError:
- self._specification["environment"]["configuration"] = None
+ self._specification[u"environment"][u"configuration"] = None
try:
- self._specification["environment"]["paths"] = \
- self._replace_tags(self._cfg_yaml[idx]["paths"])
+ self._specification[u"environment"][u"paths"] = \
+ self._replace_tags(self._cfg_yaml[idx][u"paths"])
except KeyError:
- self._specification["environment"]["paths"] = None
+ self._specification[u"environment"][u"paths"] = None
try:
- self._specification["environment"]["urls"] = \
- self._cfg_yaml[idx]["urls"]
+ self._specification[u"environment"][u"urls"] = \
+ self._cfg_yaml[idx][u"urls"]
except KeyError:
- self._specification["environment"]["urls"] = None
+ self._specification[u"environment"][u"urls"] = None
try:
- self._specification["environment"]["make-dirs"] = \
- self._cfg_yaml[idx]["make-dirs"]
+ self._specification[u"environment"][u"make-dirs"] = \
+ self._cfg_yaml[idx][u"make-dirs"]
except KeyError:
- self._specification["environment"]["make-dirs"] = None
+ self._specification[u"environment"][u"make-dirs"] = None
try:
- self._specification["environment"]["remove-dirs"] = \
- self._cfg_yaml[idx]["remove-dirs"]
+ self._specification[u"environment"][u"remove-dirs"] = \
+ self._cfg_yaml[idx][u"remove-dirs"]
except KeyError:
- self._specification["environment"]["remove-dirs"] = None
+ self._specification[u"environment"][u"remove-dirs"] = None
try:
- self._specification["environment"]["build-dirs"] = \
- self._cfg_yaml[idx]["build-dirs"]
+ self._specification[u"environment"][u"build-dirs"] = \
+ self._cfg_yaml[idx][u"build-dirs"]
except KeyError:
- self._specification["environment"]["build-dirs"] = None
+ self._specification[u"environment"][u"build-dirs"] = None
try:
- self._specification["environment"]["testbeds"] = \
- self._cfg_yaml[idx]["testbeds"]
+ self._specification[u"environment"][u"testbeds"] = \
+ self._cfg_yaml[idx][u"testbeds"]
except KeyError:
- self._specification["environment"]["testbeds"] = None
+ self._specification[u"environment"][u"testbeds"] = None
- logging.info("Done.")
+ logging.info(u"Done.")
+
+ def _load_mapping_table(self):
+ """Load a mapping table if it is specified. If not, use empty list.
+ """
+
+ mapping_file_name = self._specification[u"configuration"].\
+ get(u"mapping-file", None)
+ if mapping_file_name:
+ try:
+ with open(mapping_file_name, u'r') as mfile:
+ mapping = load(mfile, Loader=FullLoader)
+ # Make sure everything is lowercase
+ self._specification[u"configuration"][u"mapping"] = \
+ {key.lower(): val.lower() for key, val in
+ mapping.items()}
+ logging.debug(f"Loaded mapping table:\n{mapping}")
+ except (YAMLError, IOError) as err:
+ raise PresentationError(
+ msg=f"An error occurred while parsing the mapping file "
+ f"{mapping_file_name}",
+ details=repr(err)
+ )
+ else:
+ self._specification[u"configuration"][u"mapping"] = dict()
+
+ def _load_ignore_list(self):
+ """Load an ignore list if it is specified. If not, use empty list.
+ """
+
+ ignore_list_name = self._specification[u"configuration"].\
+ get(u"ignore-list", None)
+ if ignore_list_name:
+ try:
+ with open(ignore_list_name, u'r') as ifile:
+ ignore = load(ifile, Loader=FullLoader)
+ # Make sure everything is lowercase
+ self._specification[u"configuration"][u"ignore"] = \
+ [item.lower() for item in ignore]
+ logging.debug(f"Loaded ignore list:\n{ignore}")
+ except (YAMLError, IOError) as err:
+ raise PresentationError(
+ msg=f"An error occurred while parsing the ignore list file "
+ f"{ignore_list_name}.",
+ details=repr(err)
+ )
+ else:
+ self._specification[u"configuration"][u"ignore"] = list()
def _parse_configuration(self):
"""Parse configuration of PAL in the specification YAML file.
"""
- logging.info("Parsing specification file: configuration ...")
+ logging.info(u"Parsing specification file: configuration ...")
idx = self._get_type_index("configuration")
if idx is None:
- logging.warning("No configuration information in the specification "
- "file.")
- return None
+ logging.warning(
+ u"No configuration information in the specification file."
+ )
+ return
try:
- self._specification["configuration"] = self._cfg_yaml[idx]
-
+ self._specification[u"configuration"] = self._cfg_yaml[idx]
except KeyError:
- raise PresentationError("No configuration defined.")
+ raise PresentationError(u"No configuration defined.")
# Data sets: Replace ranges by lists
- for set_name, data_set in self.configuration["data-sets"].items():
+ for set_name, data_set in self.configuration[u"data-sets"].items():
if not isinstance(data_set, dict):
continue
for job, builds in data_set.items():
- if builds:
- if isinstance(builds, dict):
- build_end = builds.get("end", None)
+ if not builds:
+ continue
+ if isinstance(builds, dict):
+ build_end = builds.get(u"end", None)
+ max_builds = builds.get(u"max-builds", None)
+ reverse = builds.get(u"reverse", False)
+ try:
+ build_end = int(build_end)
+ except ValueError:
+ # defined as a range <start, build_type>
+ build_end = self._get_build_number(job, build_end)
+ builds = [x for x in range(builds[u"start"], build_end + 1)]
+ if max_builds and max_builds < len(builds):
+ builds = builds[-max_builds:]
+ if reverse:
+ builds.reverse()
+ self.configuration[u"data-sets"][set_name][job] = builds
+ elif isinstance(builds, list):
+ for idx, item in enumerate(builds):
try:
- build_end = int(build_end)
+ builds[idx] = int(item)
except ValueError:
- # defined as a range <start, build_type>
- build_end = self._get_build_number(job, build_end)
- builds = [x for x in range(builds["start"], build_end+1)
- if x not in builds.get("skip", list())]
- self.configuration["data-sets"][set_name][job] = builds
- elif isinstance(builds, list):
- for idx, item in enumerate(builds):
- try:
- builds[idx] = int(item)
- except ValueError:
- # defined as a range <build_type>
- builds[idx] = self._get_build_number(job, item)
+ # defined as a range <build_type>
+ builds[idx] = self._get_build_number(job, item)
# Data sets: add sub-sets to sets (only one level):
- for set_name, data_set in self.configuration["data-sets"].items():
+ for set_name, data_set in self.configuration[u"data-sets"].items():
if isinstance(data_set, list):
new_set = dict()
for item in data_set:
try:
- for key, val in self.configuration["data-sets"][item].\
+ for key, val in self.configuration[u"data-sets"][item].\
items():
new_set[key] = val
except KeyError:
raise PresentationError(
- "Data set {0} is not defined in "
- "the configuration section.".format(item))
- self.configuration["data-sets"][set_name] = new_set
+ f"Data set {item} is not defined in "
+ f"the configuration section."
+ )
+ self.configuration[u"data-sets"][set_name] = new_set
# Mapping table:
- mapping = None
- mapping_file_name = self._specification["configuration"].\
- get("mapping-file", None)
- if mapping_file_name:
- logging.debug("Mapping file: '{0}'".format(mapping_file_name))
- try:
- with open(mapping_file_name, 'r') as mfile:
- mapping = load(mfile)
- logging.debug("Loaded mapping table:\n{0}".format(mapping))
- except (YAMLError, IOError) as err:
- raise PresentationError(
- msg="An error occurred while parsing the mapping file "
- "'{0}'.".format(mapping_file_name),
- details=repr(err))
- # Make sure everything is lowercase
- if mapping:
- self._specification["configuration"]["mapping"] = \
- {key.lower(): val.lower() for key, val in mapping.iteritems()}
- else:
- self._specification["configuration"]["mapping"] = dict()
+ self._load_mapping_table()
# Ignore list:
- ignore = None
- ignore_list_name = self._specification["configuration"].\
- get("ignore-list", None)
- if ignore_list_name:
- logging.debug("Ignore list file: '{0}'".format(ignore_list_name))
- try:
- with open(ignore_list_name, 'r') as ifile:
- ignore = load(ifile)
- logging.debug("Loaded ignore list:\n{0}".format(ignore))
- except (YAMLError, IOError) as err:
- raise PresentationError(
- msg="An error occurred while parsing the ignore list file "
- "'{0}'.".format(ignore_list_name),
- details=repr(err))
- # Make sure everything is lowercase
- if ignore:
- self._specification["configuration"]["ignore"] = \
- [item.lower() for item in ignore]
- else:
- self._specification["configuration"]["ignore"] = list()
+ self._load_ignore_list()
- logging.info("Done.")
+ logging.info(u"Done.")
def _parse_input(self):
"""Parse input specification in the specification YAML file.
@@ -509,41 +579,53 @@ class Specification(object):
:raises: PresentationError if there are no data to process.
"""
- logging.info("Parsing specification file: input ...")
+ logging.info(u"Parsing specification file: input ...")
- idx = self._get_type_index("input")
+ idx = self._get_type_index(u"input")
if idx is None:
- raise PresentationError("No data to process.")
+ raise PresentationError(u"No data to process.")
try:
- for key, value in self._cfg_yaml[idx]["general"].items():
- self._specification["input"][key] = value
- self._specification["input"]["builds"] = dict()
+ for key, value in self._cfg_yaml[idx][u"general"].items():
+ self._specification[u"input"][key] = value
+ self._specification[u"input"][u"builds"] = dict()
- for job, builds in self._cfg_yaml[idx]["builds"].items():
+ for job, builds in self._cfg_yaml[idx][u"builds"].items():
if builds:
if isinstance(builds, dict):
- build_end = builds.get("end", None)
+ build_end = builds.get(u"end", None)
+ max_builds = builds.get(u"max-builds", None)
+ reverse = bool(builds.get(u"reverse", False))
try:
build_end = int(build_end)
except ValueError:
# defined as a range <start, build_type>
+ if build_end in (u"lastCompletedBuild",
+ u"lastSuccessfulBuild"):
+ reverse = True
build_end = self._get_build_number(job, build_end)
- builds = [x for x in range(builds["start"], build_end+1)
- if x not in builds.get("skip", list())]
- self._specification["input"]["builds"][job] = list()
+ builds = [x for x in range(builds[u"start"],
+ build_end + 1)
+ if x not in builds.get(u"skip", list())]
+ if reverse:
+ builds.reverse()
+ if max_builds and max_builds < len(builds):
+ builds = builds[:max_builds]
+ self._specification[u"input"][u"builds"][job] = list()
for build in builds:
- self._specification["input"]["builds"][job]. \
- append({"build": build, "status": None})
+ self._specification[u"input"][u"builds"][job]. \
+ append({u"build": build, u"status": None})
else:
- logging.warning("No build is defined for the job '{}'. "
- "Trying to continue without it.".
- format(job))
+ logging.warning(
+ f"No build is defined for the job {job}. Trying to "
+ f"continue without it."
+ )
+
except KeyError:
- raise PresentationError("No data to process.")
+ raise PresentationError(u"No data to process.")
- logging.info("Done.")
+ logging.info(u"Done.")
def _parse_output(self):
"""Parse output specification in the specification YAML file.
@@ -551,189 +633,272 @@ class Specification(object):
:raises: PresentationError if there is no output defined.
"""
- logging.info("Parsing specification file: output ...")
+ logging.info(u"Parsing specification file: output ...")
- idx = self._get_type_index("output")
+ idx = self._get_type_index(u"output")
if idx is None:
- raise PresentationError("No output defined.")
+ raise PresentationError(u"No output defined.")
try:
- self._specification["output"] = self._cfg_yaml[idx]
+ self._specification[u"output"] = self._cfg_yaml[idx]
except (KeyError, IndexError):
- raise PresentationError("No output defined.")
+ raise PresentationError(u"No output defined.")
- logging.info("Done.")
+ logging.info(u"Done.")
def _parse_static(self):
"""Parse specification of the static content in the specification YAML
file.
"""
- logging.info("Parsing specification file: static content ...")
+ logging.info(u"Parsing specification file: static content ...")
- idx = self._get_type_index("static")
+ idx = self._get_type_index(u"static")
if idx is None:
- logging.warning("No static content specified.")
+ logging.warning(u"No static content specified.")
for key, value in self._cfg_yaml[idx].items():
if isinstance(value, str):
try:
self._cfg_yaml[idx][key] = self._replace_tags(
- value, self._specification["environment"]["paths"])
+ value, self._specification[u"environment"][u"paths"])
except KeyError:
pass
- self._specification["static"] = self._cfg_yaml[idx]
+ self._specification[u"static"] = self._cfg_yaml[idx]
- logging.info("Done.")
+ logging.info(u"Done.")
+
+ def _parse_elements_tables(self, table):
+ """Parse tables from the specification YAML file.
+
+ :param table: Table to be parsed from the specification file.
+ :type table: dict
+ :raises PresentationError: If wrong data set is used.
+ """
+
+ try:
+ table[u"template"] = self._replace_tags(
+ table[u"template"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
+
+ # Add data sets
+ try:
+ for item in (u"reference", u"compare"):
+ if table.get(item, None):
+ data_set = table[item].get(u"data", None)
+ if isinstance(data_set, str):
+ table[item][u"data"] = \
+ self.configuration[u"data-sets"][data_set]
+ data_set = table[item].get(u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[item][u"data-replacement"] = \
+ self.configuration[u"data-sets"][data_set]
+
+ if table.get(u"history", None):
+ for i in range(len(table[u"history"])):
+ data_set = table[u"history"][i].get(u"data", None)
+ if isinstance(data_set, str):
+ table[u"history"][i][u"data"] = \
+ self.configuration[u"data-sets"][data_set]
+ data_set = table[u"history"][i].get(
+ u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[u"history"][i][u"data-replacement"] = \
+ self.configuration[u"data-sets"][data_set]
+
+ if table.get(u"columns", None):
+ for i in range(len(table[u"columns"])):
+ data_set = table[u"columns"][i].get(u"data-set", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-set"] = \
+ self.configuration[u"data-sets"][data_set]
+ data_set = table[u"columns"][i].get(
+ u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-replacement"] = \
+ self.configuration[u"data-sets"][data_set]
+
+ except KeyError:
+ raise PresentationError(
+ f"Wrong data set used in {table.get(u'title', u'')}."
+ )
+
+ self._specification[u"tables"].append(table)
+
+ def _parse_elements_plots(self, plot):
+ """Parse plots from the specification YAML file.
+
+ :param plot: Plot to be parsed from the specification file.
+ :type plot: dict
+ :raises PresentationError: If plot layout is not defined.
+ """
+
+ # Add layout to the plots:
+ layout = plot[u"layout"].get(u"layout", None)
+ if layout is not None:
+ plot[u"layout"].pop(u"layout")
+ try:
+ for key, val in (self.configuration[u"plot-layouts"]
+ [layout].items()):
+ plot[u"layout"][key] = val
+ except KeyError:
+ raise PresentationError(
+ f"Layout {layout} is not defined in the "
+ f"configuration section."
+ )
+ self._specification[u"plots"].append(plot)
+
+ def _parse_elements_files(self, file):
+ """Parse files from the specification YAML file.
+
+ :param file: File to be parsed from the specification file.
+ :type file: dict
+ """
+
+ try:
+ file[u"dir-tables"] = self._replace_tags(
+ file[u"dir-tables"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
+ self._specification[u"files"].append(file)
+
+ def _parse_elements_cpta(self, cpta):
+ """Parse cpta from the specification YAML file.
+
+ :param cpta: cpta to be parsed from the specification file.
+ :type cpta: dict
+ :raises PresentationError: If wrong data set is used or if plot layout
+ is not defined.
+ """
+
+ for plot in cpta[u"plots"]:
+ # Add layout to the plots:
+ layout = plot.get(u"layout", None)
+ if layout is not None:
+ try:
+ plot[u"layout"] = \
+ self.configuration[u"plot-layouts"][layout]
+ except KeyError:
+ raise PresentationError(
+ f"Layout {layout} is not defined in the "
+ f"configuration section."
+ )
+ # Add data sets:
+ if isinstance(plot.get(u"data", None), str):
+ data_set = plot[u"data"]
+ try:
+ plot[u"data"] = \
+ self.configuration[u"data-sets"][data_set]
+ except KeyError:
+ raise PresentationError(
+ f"Data set {data_set} is not defined in "
+ f"the configuration section."
+ )
+ self._specification[u"cpta"] = cpta
def _parse_elements(self):
- """Parse elements (tables, plots) specification in the specification
+ """Parse elements (tables, plots, ..) specification in the specification
YAML file.
"""
- logging.info("Parsing specification file: elements ...")
+ logging.info(u"Parsing specification file: elements ...")
count = 1
for element in self._cfg_yaml:
+
+ # Replace tags:
try:
- element["output-file"] = self._replace_tags(
- element["output-file"],
- self._specification["environment"]["paths"])
+ element[u"output-file"] = self._replace_tags(
+ element[u"output-file"],
+ self._specification[u"environment"][u"paths"])
except KeyError:
pass
try:
- element["input-file"] = self._replace_tags(
- element["input-file"],
- self._specification["environment"]["paths"])
+ element[u"input-file"] = self._replace_tags(
+ element[u"input-file"],
+ self._specification[u"environment"][u"paths"])
except KeyError:
pass
- # add data sets to the elements:
- if isinstance(element.get("data", None), str):
- data_set = element["data"]
- try:
- element["data"] = self.configuration["data-sets"][data_set]
- except KeyError:
- raise PresentationError("Data set {0} is not defined in "
- "the configuration section.".
- format(data_set))
+ try:
+ element[u"output-file-links"] = self._replace_tags(
+ element[u"output-file-links"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
- if element["type"] == "table":
- logging.info(" {:3d} Processing a table ...".format(count))
+ # Add data sets to the elements:
+ if isinstance(element.get(u"data", None), str):
+ data_set = element[u"data"]
try:
- element["template"] = self._replace_tags(
- element["template"],
- self._specification["environment"]["paths"])
+ element[u"data"] = \
+ self.configuration[u"data-sets"][data_set]
except KeyError:
- pass
-
- # add data sets
- try:
- for item in ("reference", "compare"):
- if element.get(item, None):
- data_set = element[item].get("data", None)
- if isinstance(data_set, str):
- element[item]["data"] = \
- self.configuration["data-sets"][data_set]
- data_set = element[item].get("data-replacement",
- None)
- if isinstance(data_set, str):
- element[item]["data-replacement"] = \
- self.configuration["data-sets"][data_set]
-
- if element.get("history", None):
- for i in range(len(element["history"])):
- data_set = element["history"][i].get("data", None)
- if isinstance(data_set, str):
- element["history"][i]["data"] = \
- self.configuration["data-sets"][data_set]
- data_set = element["history"][i].get(
- "data-replacement", None)
- if isinstance(data_set, str):
- element["history"][i]["data-replacement"] = \
- self.configuration["data-sets"][data_set]
+ raise PresentationError(
+ f"Data set {data_set} is not defined in the "
+ f"configuration section."
+ )
+ elif isinstance(element.get(u"data", None), list):
+ new_list = list()
+ for item in element[u"data"]:
+ try:
+ new_list.append(
+ self.configuration[u"data-sets"][item]
+ )
+ except KeyError:
+ raise PresentationError(
+ f"Data set {item} is not defined in the "
+ f"configuration section."
+ )
+ element[u"data"] = new_list
- except KeyError:
- raise PresentationError("Wrong data set used in {0}.".
- format(element.get("title", "")))
+ # Parse elements:
+ if element[u"type"] == u"table":
- self._specification["tables"].append(element)
+ logging.info(f" {count:3d} Processing a table ...")
+ self._parse_elements_tables(element)
count += 1
- elif element["type"] == "plot":
- logging.info(" {:3d} Processing a plot ...".format(count))
+ elif element[u"type"] == u"plot":
- # Add layout to the plots:
- layout = element["layout"].get("layout", None)
- if layout is not None:
- element["layout"].pop("layout")
- try:
- for key, val in (self.configuration["plot-layouts"]
- [layout].items()):
- element["layout"][key] = val
- except KeyError:
- raise PresentationError("Layout {0} is not defined in "
- "the configuration section.".
- format(layout))
- self._specification["plots"].append(element)
+ logging.info(f" {count:3d} Processing a plot ...")
+ self._parse_elements_plots(element)
count += 1
- elif element["type"] == "file":
- logging.info(" {:3d} Processing a file ...".format(count))
- try:
- element["dir-tables"] = self._replace_tags(
- element["dir-tables"],
- self._specification["environment"]["paths"])
- except KeyError:
- pass
- self._specification["files"].append(element)
+ elif element[u"type"] == u"file":
+
+ logging.info(f" {count:3d} Processing a file ...")
+ self._parse_elements_files(element)
count += 1
- elif element["type"] == "cpta":
- logging.info(" {:3d} Processing Continuous Performance "
- "Trending and Analysis ...".format(count))
+ elif element[u"type"] == u"cpta":
- for plot in element["plots"]:
- # Add layout to the plots:
- layout = plot.get("layout", None)
- if layout is not None:
- try:
- plot["layout"] = \
- self.configuration["plot-layouts"][layout]
- except KeyError:
- raise PresentationError(
- "Layout {0} is not defined in the "
- "configuration section.".format(layout))
- # Add data sets:
- if isinstance(plot.get("data", None), str):
- data_set = plot["data"]
- try:
- plot["data"] = \
- self.configuration["data-sets"][data_set]
- except KeyError:
- raise PresentationError(
- "Data set {0} is not defined in "
- "the configuration section.".
- format(data_set))
- self._specification["cpta"] = element
+ logging.info(
+ f" {count:3d} Processing Continuous Performance Trending "
+ f"and Analysis ..."
+ )
+ self._parse_elements_cpta(element)
count += 1
- logging.info("Done.")
+ logging.info(u"Done.")
def read_specification(self):
"""Parse specification in the specification YAML file.
:raises: PresentationError if an error occurred while parsing the
- specification file.
+ specification file.
"""
try:
- self._cfg_yaml = load(self._cfg_file)
+ self._cfg_yaml = load(self._cfg_file, Loader=FullLoader)
except YAMLError as err:
- raise PresentationError(msg="An error occurred while parsing the "
- "specification file.",
- details=str(err))
+ raise PresentationError(msg=u"An error occurred while parsing the "
+ u"specification file.",
+ details=repr(err))
self._parse_env()
self._parse_configuration()
@@ -742,5 +907,4 @@ class Specification(object):
self._parse_static()
self._parse_elements()
- logging.debug("Specification: \n{}".
- format(pformat(self._specification)))
+ logging.debug(f"Specification: \n{pformat(self._specification)}")