diff options
Diffstat (limited to 'resources/tools/presentation/specification_parser.py')
-rw-r--r-- | resources/tools/presentation/specification_parser.py | 507 |
1 files changed, 507 insertions, 0 deletions
diff --git a/resources/tools/presentation/specification_parser.py b/resources/tools/presentation/specification_parser.py new file mode 100644 index 0000000000..ec663f99d9 --- /dev/null +++ b/resources/tools/presentation/specification_parser.py @@ -0,0 +1,507 @@ +# Copyright (c) 2017 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Specification + +Parsing of the specification YAML file. +""" + + +import logging +from yaml import load, YAMLError +from pprint import pformat + +from errors import PresentationError + + +class Specification(object): + """Specification of Presentation and analytics layer. + + - based on specification specified in the specification YAML file + - presentation and analytics layer is model driven + """ + + # Tags are used in specification YAML file and replaced while the file is + # parsed. + TAG_OPENER = "{" + TAG_CLOSER = "}" + + def __init__(self, cfg_file): + """Initialization. + + :param cfg_file: File handler for the specification YAML file. + :type cfg_file: BinaryIO + """ + self._cfg_file = cfg_file + self._cfg_yaml = None + + self._specification = {"environment": dict(), + "debug": dict(), + "static": dict(), + "input": dict(), + "output": dict(), + "tables": list(), + "plots": list(), + "files": list()} + + @property + def specification(self): + """Getter - specification. + + :returns: Specification. + :rtype: dict + """ + return self._specification + + @property + def environment(self): + """Getter - environment. + + :returns: Environment specification. + :rtype: dict + """ + return self._specification["environment"] + + @property + def static(self): + """Getter - static content. + + :returns: Static content specification. + :rtype: dict + """ + return self._specification["static"] + + @property + def debug(self): + """Getter - debug + + :returns: Debug specification + :rtype: dict + """ + return self._specification["debug"] + + @property + def is_debug(self): + """Getter - debug mode + + :returns: True if debug mode is on, otherwise False. + :rtype: bool + """ + + try: + if self.environment["configuration"]["CFG[DEBUG]"] == 1: + return True + else: + return False + except KeyError: + return False + + @property + def input(self): + """Getter - specification - inputs. + - jobs and builds. + + :returns: Inputs. + :rtype: dict + """ + return self._specification["input"] + + @property + def builds(self): + """Getter - builds defined in specification. + + :returns: Builds defined in the specification. + :rtype: dict + """ + return self.input["builds"] + + @property + def output(self): + """Getter - specification - output formats and versions to be generated. + - formats: html, pdf + - versions: full, ... + + :returns: Outputs to be generated. + :rtype: dict + """ + return self._specification["output"] + + @property + def tables(self): + """Getter - tables to be generated. + + :returns: List of specifications of tables to be generated. + :rtype: list + """ + return self._specification["tables"] + + @property + def plots(self): + """Getter - plots to be generated. + + :returns: List of specifications of plots to be generated. + :rtype: list + """ + return self._specification["plots"] + + @property + def files(self): + """Getter - files to be generated. + + :returns: List of specifications of files to be generated. + :rtype: list + """ + return self._specification["files"] + + def set_input_state(self, job, build_nr, state): + """Set the state of input + + :param job: + :param build_nr: + :param state: + :return: + """ + + try: + for build in self._specification["input"]["builds"][job]: + if build["build"] == build_nr: + build["status"] = state + break + else: + raise PresentationError("Build '{}' is not defined for job '{}'" + " in specification file.". + format(build_nr, job)) + except KeyError: + raise PresentationError("Job '{}' and build '{}' is not defined in " + "specification file.".format(job, build_nr)) + + def set_input_file_name(self, job, build_nr, file_name): + """Set the state of input + + :param job: + :param build_nr: + :param file_name: + :return: + """ + + try: + for build in self._specification["input"]["builds"][job]: + if build["build"] == build_nr: + build["file-name"] = file_name + break + else: + raise PresentationError("Build '{}' is not defined for job '{}'" + " in specification file.". + format(build_nr, job)) + except KeyError: + raise PresentationError("Job '{}' and build '{}' is not defined in " + "specification file.".format(job, build_nr)) + + def _get_type_index(self, item_type): + """Get index of item type (environment, input, output, ...) in + specification YAML file. + + :param item_type: Item type: Top level items in specification YAML file, + e.g.: environment, input, output. + :type item_type: str + :returns: Index of the given item type. + :rtype: int + """ + + index = 0 + for item in self._cfg_yaml: + if item["type"] == item_type: + return index + index += 1 + return None + + def _find_tag(self, text): + """Find the first tag in the given text. The tag is enclosed by the + TAG_OPENER and TAG_CLOSER. + + :param text: Text to be searched. + :type text: str + :returns: The tag, or None if not found. + :rtype: str + """ + try: + start = text.index(self.TAG_OPENER) + end = text.index(self.TAG_CLOSER, start + 1) + 1 + return text[start:end] + except ValueError: + return None + + def _replace_tags(self, data, src_data=None): + """Replace tag(s) in the data by their values. + + :param data: The data where the tags will be replaced by their values. + :param src_data: Data where the tags are defined. It is dictionary where + the key is the tag and the value is the tag value. If not given, 'data' + is used instead. + :type data: str or dict + :type src_data: dict + :returns: Data with the tags replaced. + :rtype: str or dict + :raises: PresentationError if it is not possible to replace the tag or + the data is not the supported data type (str, dict). + """ + + if src_data is None: + src_data = data + + if isinstance(data, str): + tag = self._find_tag(data) + if tag is not None: + data = data.replace(tag, src_data[tag[1:-1]]) + + elif isinstance(data, dict): + counter = 0 + for key, value in data.items(): + tag = self._find_tag(value) + if tag is not None: + try: + data[key] = value.replace(tag, src_data[tag[1:-1]]) + counter += 1 + except KeyError: + raise PresentationError("Not possible to replace the " + "tag '{}'".format(tag)) + if counter: + self._replace_tags(data, src_data) + else: + raise PresentationError("Replace tags: Not supported data type.") + + return data + + def _parse_env(self): + """Parse environment specification in the specification YAML file. + """ + + logging.info("Parsing specification file: environment ...") + + idx = self._get_type_index("environment") + if idx is None: + return None + + try: + self._specification["environment"]["configuration"] = \ + self._cfg_yaml[idx]["configuration"] + except KeyError: + self._specification["environment"]["configuration"] = None + + try: + self._specification["environment"]["paths"] = \ + self._replace_tags(self._cfg_yaml[idx]["paths"]) + except KeyError: + self._specification["environment"]["paths"] = None + + try: + self._specification["environment"]["urls"] = \ + self._replace_tags(self._cfg_yaml[idx]["urls"]) + except KeyError: + self._specification["environment"]["urls"] = None + + try: + self._specification["environment"]["make-dirs"] = \ + self._cfg_yaml[idx]["make-dirs"] + except KeyError: + self._specification["environment"]["make-dirs"] = None + + try: + self._specification["environment"]["remove-dirs"] = \ + self._cfg_yaml[idx]["remove-dirs"] + except KeyError: + self._specification["environment"]["remove-dirs"] = None + + try: + self._specification["environment"]["build-dirs"] = \ + self._cfg_yaml[idx]["build-dirs"] + except KeyError: + self._specification["environment"]["build-dirs"] = None + + logging.info("Done.") + + def _parse_debug(self): + """Parse debug specification in the specification YAML file. + """ + + if int(self.environment["configuration"]["CFG[DEBUG]"]) != 1: + return None + + logging.info("Parsing specification file: debug ...") + + idx = self._get_type_index("debug") + if idx is None: + self.environment["configuration"]["CFG[DEBUG]"] = 0 + return None + + try: + for key, value in self._cfg_yaml[idx]["general"].items(): + self._specification["debug"][key] = value + + self._specification["input"]["builds"] = dict() + for job, builds in self._cfg_yaml[idx]["builds"].items(): + if builds: + self._specification["input"]["builds"][job] = list() + for build in builds: + self._specification["input"]["builds"][job].\ + append({"build": build["build"], + "status": "downloaded", + "file-name": self._replace_tags( + build["file"], + self.environment["paths"])}) + else: + logging.warning("No build is defined for the job '{}'. " + "Trying to continue without it.". + format(job)) + + except KeyError: + raise PresentationError("No data to process.") + + def _parse_input(self): + """Parse input specification in the specification YAML file. + + :raises: PresentationError if there are no data to process. + """ + + logging.info("Parsing specification file: input ...") + + idx = self._get_type_index("input") + if idx is None: + raise PresentationError("No data to process.") + + try: + for key, value in self._cfg_yaml[idx]["general"].items(): + self._specification["input"][key] = value + self._specification["input"]["builds"] = dict() + for job, builds in self._cfg_yaml[idx]["builds"].items(): + if builds: + self._specification["input"]["builds"][job] = list() + for build in builds: + self._specification["input"]["builds"][job].\ + append({"build": build, "status": None}) + else: + logging.warning("No build is defined for the job '{}'. " + "Trying to continue without it.". + format(job)) + except KeyError: + raise PresentationError("No data to process.") + + logging.info("Done.") + + def _parse_output(self): + """Parse output specification in the specification YAML file. + + :raises: PresentationError if there is no output defined. + """ + + logging.info("Parsing specification file: output ...") + + idx = self._get_type_index("output") + if idx is None: + raise PresentationError("No output defined.") + + try: + self._specification["output"] = self._cfg_yaml[idx]["format"] + except KeyError: + raise PresentationError("No output defined.") + + logging.info("Done.") + + def _parse_static(self): + """Parse specification of the static content in the specification YAML + file. + """ + + logging.info("Parsing specification file: static content ...") + + idx = self._get_type_index("static") + if idx is None: + logging.warning("No static content specified.") + + for key, value in self._cfg_yaml[idx].items(): + if isinstance(value, str): + try: + self._cfg_yaml[idx][key] = self._replace_tags( + value, self._specification["environment"]["paths"]) + except KeyError: + pass + + self._specification["static"] = self._cfg_yaml[idx] + + logging.info("Done.") + + def _parse_elements(self): + """Parse elements (tables, plots) specification in the specification + YAML file. + """ + + logging.info("Parsing specification file: elements ...") + + count = 1 + for element in self._cfg_yaml: + try: + element["output-file"] = self._replace_tags( + element["output-file"], + self._specification["environment"]["paths"]) + except KeyError: + pass + if element["type"] == "table": + logging.info(" {:3d} Processing a table ...".format(count)) + try: + element["template"] = self._replace_tags( + element["template"], + self._specification["environment"]["paths"]) + except KeyError: + pass + self._specification["tables"].append(element) + count += 1 + elif element["type"] == "plot": + logging.info(" {:3d} Processing a plot ...".format(count)) + self._specification["plots"].append(element) + count += 1 + elif element["type"] == "file": + logging.info(" {:3d} Processing a file ...".format(count)) + try: + element["dir-tables"] = self._replace_tags( + element["dir-tables"], + self._specification["environment"]["paths"]) + except KeyError: + pass + self._specification["files"].append(element) + count += 1 + + logging.info("Done.") + + def read_specification(self): + """Parse specification in the specification YAML file. + + :raises: PresentationError if an error occurred while parsing the + specification file. + """ + try: + self._cfg_yaml = load(self._cfg_file) + except YAMLError as err: + raise PresentationError(msg="An error occurred while parsing the " + "specification file.", + details=str(err)) + + self._parse_env() + self._parse_debug() + if not self.debug: + self._parse_input() + self._parse_output() + self._parse_static() + self._parse_elements() + + logging.debug("Specification: \n{}". + format(pformat(self._specification))) |