aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools/presentation/specification_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/tools/presentation/specification_parser.py')
-rw-r--r--resources/tools/presentation/specification_parser.py507
1 files changed, 507 insertions, 0 deletions
diff --git a/resources/tools/presentation/specification_parser.py b/resources/tools/presentation/specification_parser.py
new file mode 100644
index 0000000000..ec663f99d9
--- /dev/null
+++ b/resources/tools/presentation/specification_parser.py
@@ -0,0 +1,507 @@
+# Copyright (c) 2017 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Specification
+
+Parsing of the specification YAML file.
+"""
+
+
+import logging
+from yaml import load, YAMLError
+from pprint import pformat
+
+from errors import PresentationError
+
+
+class Specification(object):
+ """Specification of Presentation and analytics layer.
+
+ - based on specification specified in the specification YAML file
+ - presentation and analytics layer is model driven
+ """
+
+ # Tags are used in specification YAML file and replaced while the file is
+ # parsed.
+ TAG_OPENER = "{"
+ TAG_CLOSER = "}"
+
+ def __init__(self, cfg_file):
+ """Initialization.
+
+ :param cfg_file: File handler for the specification YAML file.
+ :type cfg_file: BinaryIO
+ """
+ self._cfg_file = cfg_file
+ self._cfg_yaml = None
+
+ self._specification = {"environment": dict(),
+ "debug": dict(),
+ "static": dict(),
+ "input": dict(),
+ "output": dict(),
+ "tables": list(),
+ "plots": list(),
+ "files": list()}
+
+ @property
+ def specification(self):
+ """Getter - specification.
+
+ :returns: Specification.
+ :rtype: dict
+ """
+ return self._specification
+
+ @property
+ def environment(self):
+ """Getter - environment.
+
+ :returns: Environment specification.
+ :rtype: dict
+ """
+ return self._specification["environment"]
+
+ @property
+ def static(self):
+ """Getter - static content.
+
+ :returns: Static content specification.
+ :rtype: dict
+ """
+ return self._specification["static"]
+
+ @property
+ def debug(self):
+ """Getter - debug
+
+ :returns: Debug specification
+ :rtype: dict
+ """
+ return self._specification["debug"]
+
+ @property
+ def is_debug(self):
+ """Getter - debug mode
+
+ :returns: True if debug mode is on, otherwise False.
+ :rtype: bool
+ """
+
+ try:
+ if self.environment["configuration"]["CFG[DEBUG]"] == 1:
+ return True
+ else:
+ return False
+ except KeyError:
+ return False
+
+ @property
+ def input(self):
+ """Getter - specification - inputs.
+ - jobs and builds.
+
+ :returns: Inputs.
+ :rtype: dict
+ """
+ return self._specification["input"]
+
+ @property
+ def builds(self):
+ """Getter - builds defined in specification.
+
+ :returns: Builds defined in the specification.
+ :rtype: dict
+ """
+ return self.input["builds"]
+
+ @property
+ def output(self):
+ """Getter - specification - output formats and versions to be generated.
+ - formats: html, pdf
+ - versions: full, ...
+
+ :returns: Outputs to be generated.
+ :rtype: dict
+ """
+ return self._specification["output"]
+
+ @property
+ def tables(self):
+ """Getter - tables to be generated.
+
+ :returns: List of specifications of tables to be generated.
+ :rtype: list
+ """
+ return self._specification["tables"]
+
+ @property
+ def plots(self):
+ """Getter - plots to be generated.
+
+ :returns: List of specifications of plots to be generated.
+ :rtype: list
+ """
+ return self._specification["plots"]
+
+ @property
+ def files(self):
+ """Getter - files to be generated.
+
+ :returns: List of specifications of files to be generated.
+ :rtype: list
+ """
+ return self._specification["files"]
+
+ def set_input_state(self, job, build_nr, state):
+ """Set the state of input
+
+ :param job:
+ :param build_nr:
+ :param state:
+ :return:
+ """
+
+ try:
+ for build in self._specification["input"]["builds"][job]:
+ if build["build"] == build_nr:
+ build["status"] = state
+ break
+ else:
+ raise PresentationError("Build '{}' is not defined for job '{}'"
+ " in specification file.".
+ format(build_nr, job))
+ except KeyError:
+ raise PresentationError("Job '{}' and build '{}' is not defined in "
+ "specification file.".format(job, build_nr))
+
+ def set_input_file_name(self, job, build_nr, file_name):
+ """Set the state of input
+
+ :param job:
+ :param build_nr:
+ :param file_name:
+ :return:
+ """
+
+ try:
+ for build in self._specification["input"]["builds"][job]:
+ if build["build"] == build_nr:
+ build["file-name"] = file_name
+ break
+ else:
+ raise PresentationError("Build '{}' is not defined for job '{}'"
+ " in specification file.".
+ format(build_nr, job))
+ except KeyError:
+ raise PresentationError("Job '{}' and build '{}' is not defined in "
+ "specification file.".format(job, build_nr))
+
+ def _get_type_index(self, item_type):
+ """Get index of item type (environment, input, output, ...) in
+ specification YAML file.
+
+ :param item_type: Item type: Top level items in specification YAML file,
+ e.g.: environment, input, output.
+ :type item_type: str
+ :returns: Index of the given item type.
+ :rtype: int
+ """
+
+ index = 0
+ for item in self._cfg_yaml:
+ if item["type"] == item_type:
+ return index
+ index += 1
+ return None
+
+ def _find_tag(self, text):
+ """Find the first tag in the given text. The tag is enclosed by the
+ TAG_OPENER and TAG_CLOSER.
+
+ :param text: Text to be searched.
+ :type text: str
+ :returns: The tag, or None if not found.
+ :rtype: str
+ """
+ try:
+ start = text.index(self.TAG_OPENER)
+ end = text.index(self.TAG_CLOSER, start + 1) + 1
+ return text[start:end]
+ except ValueError:
+ return None
+
+ def _replace_tags(self, data, src_data=None):
+ """Replace tag(s) in the data by their values.
+
+ :param data: The data where the tags will be replaced by their values.
+ :param src_data: Data where the tags are defined. It is dictionary where
+ the key is the tag and the value is the tag value. If not given, 'data'
+ is used instead.
+ :type data: str or dict
+ :type src_data: dict
+ :returns: Data with the tags replaced.
+ :rtype: str or dict
+ :raises: PresentationError if it is not possible to replace the tag or
+ the data is not the supported data type (str, dict).
+ """
+
+ if src_data is None:
+ src_data = data
+
+ if isinstance(data, str):
+ tag = self._find_tag(data)
+ if tag is not None:
+ data = data.replace(tag, src_data[tag[1:-1]])
+
+ elif isinstance(data, dict):
+ counter = 0
+ for key, value in data.items():
+ tag = self._find_tag(value)
+ if tag is not None:
+ try:
+ data[key] = value.replace(tag, src_data[tag[1:-1]])
+ counter += 1
+ except KeyError:
+ raise PresentationError("Not possible to replace the "
+ "tag '{}'".format(tag))
+ if counter:
+ self._replace_tags(data, src_data)
+ else:
+ raise PresentationError("Replace tags: Not supported data type.")
+
+ return data
+
+ def _parse_env(self):
+ """Parse environment specification in the specification YAML file.
+ """
+
+ logging.info("Parsing specification file: environment ...")
+
+ idx = self._get_type_index("environment")
+ if idx is None:
+ return None
+
+ try:
+ self._specification["environment"]["configuration"] = \
+ self._cfg_yaml[idx]["configuration"]
+ except KeyError:
+ self._specification["environment"]["configuration"] = None
+
+ try:
+ self._specification["environment"]["paths"] = \
+ self._replace_tags(self._cfg_yaml[idx]["paths"])
+ except KeyError:
+ self._specification["environment"]["paths"] = None
+
+ try:
+ self._specification["environment"]["urls"] = \
+ self._replace_tags(self._cfg_yaml[idx]["urls"])
+ except KeyError:
+ self._specification["environment"]["urls"] = None
+
+ try:
+ self._specification["environment"]["make-dirs"] = \
+ self._cfg_yaml[idx]["make-dirs"]
+ except KeyError:
+ self._specification["environment"]["make-dirs"] = None
+
+ try:
+ self._specification["environment"]["remove-dirs"] = \
+ self._cfg_yaml[idx]["remove-dirs"]
+ except KeyError:
+ self._specification["environment"]["remove-dirs"] = None
+
+ try:
+ self._specification["environment"]["build-dirs"] = \
+ self._cfg_yaml[idx]["build-dirs"]
+ except KeyError:
+ self._specification["environment"]["build-dirs"] = None
+
+ logging.info("Done.")
+
+ def _parse_debug(self):
+ """Parse debug specification in the specification YAML file.
+ """
+
+ if int(self.environment["configuration"]["CFG[DEBUG]"]) != 1:
+ return None
+
+ logging.info("Parsing specification file: debug ...")
+
+ idx = self._get_type_index("debug")
+ if idx is None:
+ self.environment["configuration"]["CFG[DEBUG]"] = 0
+ return None
+
+ try:
+ for key, value in self._cfg_yaml[idx]["general"].items():
+ self._specification["debug"][key] = value
+
+ self._specification["input"]["builds"] = dict()
+ for job, builds in self._cfg_yaml[idx]["builds"].items():
+ if builds:
+ self._specification["input"]["builds"][job] = list()
+ for build in builds:
+ self._specification["input"]["builds"][job].\
+ append({"build": build["build"],
+ "status": "downloaded",
+ "file-name": self._replace_tags(
+ build["file"],
+ self.environment["paths"])})
+ else:
+ logging.warning("No build is defined for the job '{}'. "
+ "Trying to continue without it.".
+ format(job))
+
+ except KeyError:
+ raise PresentationError("No data to process.")
+
+ def _parse_input(self):
+ """Parse input specification in the specification YAML file.
+
+ :raises: PresentationError if there are no data to process.
+ """
+
+ logging.info("Parsing specification file: input ...")
+
+ idx = self._get_type_index("input")
+ if idx is None:
+ raise PresentationError("No data to process.")
+
+ try:
+ for key, value in self._cfg_yaml[idx]["general"].items():
+ self._specification["input"][key] = value
+ self._specification["input"]["builds"] = dict()
+ for job, builds in self._cfg_yaml[idx]["builds"].items():
+ if builds:
+ self._specification["input"]["builds"][job] = list()
+ for build in builds:
+ self._specification["input"]["builds"][job].\
+ append({"build": build, "status": None})
+ else:
+ logging.warning("No build is defined for the job '{}'. "
+ "Trying to continue without it.".
+ format(job))
+ except KeyError:
+ raise PresentationError("No data to process.")
+
+ logging.info("Done.")
+
+ def _parse_output(self):
+ """Parse output specification in the specification YAML file.
+
+ :raises: PresentationError if there is no output defined.
+ """
+
+ logging.info("Parsing specification file: output ...")
+
+ idx = self._get_type_index("output")
+ if idx is None:
+ raise PresentationError("No output defined.")
+
+ try:
+ self._specification["output"] = self._cfg_yaml[idx]["format"]
+ except KeyError:
+ raise PresentationError("No output defined.")
+
+ logging.info("Done.")
+
+ def _parse_static(self):
+ """Parse specification of the static content in the specification YAML
+ file.
+ """
+
+ logging.info("Parsing specification file: static content ...")
+
+ idx = self._get_type_index("static")
+ if idx is None:
+ logging.warning("No static content specified.")
+
+ for key, value in self._cfg_yaml[idx].items():
+ if isinstance(value, str):
+ try:
+ self._cfg_yaml[idx][key] = self._replace_tags(
+ value, self._specification["environment"]["paths"])
+ except KeyError:
+ pass
+
+ self._specification["static"] = self._cfg_yaml[idx]
+
+ logging.info("Done.")
+
+ def _parse_elements(self):
+ """Parse elements (tables, plots) specification in the specification
+ YAML file.
+ """
+
+ logging.info("Parsing specification file: elements ...")
+
+ count = 1
+ for element in self._cfg_yaml:
+ try:
+ element["output-file"] = self._replace_tags(
+ element["output-file"],
+ self._specification["environment"]["paths"])
+ except KeyError:
+ pass
+ if element["type"] == "table":
+ logging.info(" {:3d} Processing a table ...".format(count))
+ try:
+ element["template"] = self._replace_tags(
+ element["template"],
+ self._specification["environment"]["paths"])
+ except KeyError:
+ pass
+ self._specification["tables"].append(element)
+ count += 1
+ elif element["type"] == "plot":
+ logging.info(" {:3d} Processing a plot ...".format(count))
+ self._specification["plots"].append(element)
+ count += 1
+ elif element["type"] == "file":
+ logging.info(" {:3d} Processing a file ...".format(count))
+ try:
+ element["dir-tables"] = self._replace_tags(
+ element["dir-tables"],
+ self._specification["environment"]["paths"])
+ except KeyError:
+ pass
+ self._specification["files"].append(element)
+ count += 1
+
+ logging.info("Done.")
+
+ def read_specification(self):
+ """Parse specification in the specification YAML file.
+
+ :raises: PresentationError if an error occurred while parsing the
+ specification file.
+ """
+ try:
+ self._cfg_yaml = load(self._cfg_file)
+ except YAMLError as err:
+ raise PresentationError(msg="An error occurred while parsing the "
+ "specification file.",
+ details=str(err))
+
+ self._parse_env()
+ self._parse_debug()
+ if not self.debug:
+ self._parse_input()
+ self._parse_output()
+ self._parse_static()
+ self._parse_elements()
+
+ logging.debug("Specification: \n{}".
+ format(pformat(self._specification)))