diff options
author | pmikus <pmikus@cisco.com> | 2021-12-02 14:17:15 +0100 |
---|---|---|
committer | pmikus <pmikus@cisco.com> | 2021-12-02 14:17:15 +0100 |
commit | 69eef7c73337c331e30a5b6e15b76a10c580e093 (patch) | |
tree | d1504ff257be8af55e5f08292b06d3f24d714637 /resources/tools/storage/storage.py | |
parent | aab8ce333a2243ef4ae738dc5e00b5d338f80cfa (diff) |
feat(AWS): Remove obsolete code
Signed-off-by: pmikus <pmikus@cisco.com>
Change-Id: I97bba8f2092bc0554702a3acac10594426bd79c3
Diffstat (limited to 'resources/tools/storage/storage.py')
-rwxr-xr-x | resources/tools/storage/storage.py | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/resources/tools/storage/storage.py b/resources/tools/storage/storage.py deleted file mode 100755 index 9932bc3bb5..0000000000 --- a/resources/tools/storage/storage.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/env/env python3 - -# Copyright (c) 2021 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Storage Backend Class.""" - -from json import loads -from struct import unpack -from gzip import GzipFile - -from boto3 import Session -from botocore import exceptions - -S3_API_LIMIT = 1048576 - - -class Storage: - """Class implementing storage object retrieval. - S3 Select API allows us to retrieve a subset of data by using simple SQL - expressions. By using Select API to retrieve only the data needed by the - application, drastic performance improvements can be achieved. - """ - def __init__(self, endpoint_url, bucket, profile_name): - """Class init function to create S3 client object. - - :param endpoint_url: S3 storage endpoint url. - :param bucket: S3 parent bucket. - :param profile_name: S3 storage configuration. - :type endpoint_url: str - :type bucket: str - :type profile_name: str - """ - self.endpoint_url = endpoint_url - self.bucket = bucket - self.profile_name = profile_name - - self.session = Session(profile_name=self.profile_name) - self.client = self.session.client( - service_name=u"s3", endpoint_url=self.endpoint_url - ) - self.resource = self.session.resource( - service_name=u"s3", endpoint_url=self.endpoint_url - ) - - def __repr__(self): - """Return a string executable as Python constructor call. - - :returns: Executable constructor call. - :rtype: str - """ - return ( - f"Storage(endpoint_url={self.endpoint_url!r}, " - f"bucket={self.bucket!r}, " - f"profile_name={self.profile_name!r})" - ) - - def _get_matching_s3_keys( - self, bucket, prefix=u"", suffix=u""): - """This function generates the keys in an S3 bucket. Function act as - a Python generator object. - - :param bucket: Name of the S3 bucket. - :param prefix: Only fetch keys that start with this prefix (optional). - :param suffix: Only fetch keys that end with this suffix (optional). - :type bucket: str - :type prefix: str - :type suffix: str - :raises RuntimeError: If connection to storage fails. - """ - kwargs = { - u"Bucket": bucket - } - - prefixes = (prefix, ) if isinstance(prefix, str) else prefix - - for key_prefix in prefixes: - kwargs[u"Prefix"] = key_prefix - try: - paginator = self.client.get_paginator(u"list_objects_v2") - for page in paginator.paginate(**kwargs): - try: - contents = page[u"Contents"] - except KeyError: - break - - for obj in contents: - key = obj[u"Key"] - if key.endswith(suffix): - yield obj - except exceptions.EndpointConnectionError: - raise RuntimeError( - u"Connection Error!" - ) - - def _get_matching_s3_content( - self, key, expression): - """This function filters the contents of an S3 object based on a simple - structured query language (SQL) statement. In the request, along with - the SQL expression, we are specifying JSON serialization of the object. - S3 uses this format to parse object data into records, and returns only - records that match the specified SQL expression. Data serialization - format for the response is set to JSON. - - :param key: S3 Key (file path). - :param expression: S3 compatible SQL query. - :type key: str - :type expression: str - :returns: JSON content of interest. - :rtype: str - :raises RuntimeError: If connection to storage fails. - :raises ValueError: If JSON reading fails. - """ - try: - content = self.client.select_object_content( - Bucket=self.bucket, - Key=key, - ExpressionType=u"SQL", - Expression=expression, - InputSerialization={ - u"JSON": { - u"Type": u"Document" - }, - u"CompressionType": u"GZIP" - }, - OutputSerialization={ - u"JSON": { - u"RecordDelimiter": u"" - } - } - ) - records = u"" - for event in content[u"Payload"]: - if u"Records" in event: - records = event[u"Records"][u"Payload"].decode(u"utf-8") - return records - except exceptions.EndpointConnectionError: - raise RuntimeError( - u"Connection Error!" - ) - except exceptions.EventStreamError: - raise ValueError( - u"Malformed JSON content!" - ) - - def _get_matching_s3_object( - self, key): - """Gets full S3 object. If the file is gzip'd it will be unpacked. - - :param key: Name of the S3 key (file). - :type key: str - :returns: JSON file of interest. - :rtype: str - :raises RuntimeError: If connection to storage fails. - """ - try: - streaming_object = self.client.get_object( - Bucket=self.bucket, - Key=key - )[u"Body"] - with GzipFile(fileobj=streaming_object) as gzipfile: - content = gzipfile.read() - return content - except exceptions.EndpointConnectionError: - raise RuntimeError( - u"Connection Error!" - ) - - def _get_matching_s3_length( - self, key): - """Gets the file size of S3 object. If the file is gzip'd the packed - size is reported. - - :param key: Name of the S3 key (file). - :type key: str - :returns: File size in bytes. Defaults to 0 if any error. - :rtype: int - :raises RuntimeError: If connection to storage fails. - """ - try: - compressed_size = self.client.get_object( - Bucket=self.bucket, - Key=key - )[u"ContentLength"] - last_four_bytes = self.client.get_object( - Bucket=self.bucket, - Key=key, - Range=f"bytes={compressed_size-4}-{compressed_size}" - )[u"Body"] - return unpack(u"I", last_four_bytes.read(4))[0] - except exceptions.EndpointConnectionError: - return 0 - - def is_large_file( - self, key): - """Returns True if file is larger then 1MB that S3 select allows. - - :param key: Name of the S3 key (file). - :type key: str - :returns: Returns True if file is large then 1MB that S3 select allows. - :rtype: bool - """ - return bool( - self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT - ) - - def s3_file_processing( - self, prefix=u"", suffix=u"json.gz", - expression=u"select * from s3object s"): - """Batch S3 key processing. Function retrieves list of files and use - S3 Select API to query content. - - :param prefix: Only fetch keys that start with this prefix (optional). - :param suffix: Only fetch keys that end with this suffix (optional). - :param expression: S3 compatible SQL query (optional). - :type prefix: str - :type suffix: str - :type expression: str - """ - key_iterator = self._get_matching_s3_keys( - bucket=self.bucket, - prefix=prefix, - suffix=suffix - ) - - for key in key_iterator: - try: - yield key[u"Key"], loads( - self._get_matching_s3_content( - key=key[u"Key"], expression=expression - ) - ) - except ValueError: - return - - def s3_dump_file_processing( - self, prefix=u"", suffix=u"json.gz"): - """Batch S3 key processing. Function retrieves list of files and use - S3 Get Object API to query content. - - :param prefix: Only fetch keys that start with this prefix (optional). - :param suffix: Only fetch keys that end with this suffix (optional). - :type prefix: str - :type suffix: str - """ - key_iterator = self._get_matching_s3_keys( - bucket=self.bucket, - prefix=prefix, - suffix=suffix - ) - - for key in key_iterator: - try: - yield loads( - self._get_matching_s3_object( - key=key[u"Key"] - ) - ) - except ValueError: - return |