aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools/storage/storage.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/tools/storage/storage.py')
-rwxr-xr-xresources/tools/storage/storage.py270
1 files changed, 270 insertions, 0 deletions
diff --git a/resources/tools/storage/storage.py b/resources/tools/storage/storage.py
new file mode 100755
index 0000000000..9932bc3bb5
--- /dev/null
+++ b/resources/tools/storage/storage.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env/env python3
+
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Storage Backend Class."""
+
+from json import loads
+from struct import unpack
+from gzip import GzipFile
+
+from boto3 import Session
+from botocore import exceptions
+
+S3_API_LIMIT = 1048576
+
+
+class Storage:
+ """Class implementing storage object retrieval.
+ S3 Select API allows us to retrieve a subset of data by using simple SQL
+ expressions. By using Select API to retrieve only the data needed by the
+ application, drastic performance improvements can be achieved.
+ """
+ def __init__(self, endpoint_url, bucket, profile_name):
+ """Class init function to create S3 client object.
+
+ :param endpoint_url: S3 storage endpoint url.
+ :param bucket: S3 parent bucket.
+ :param profile_name: S3 storage configuration.
+ :type endpoint_url: str
+ :type bucket: str
+ :type profile_name: str
+ """
+ self.endpoint_url = endpoint_url
+ self.bucket = bucket
+ self.profile_name = profile_name
+
+ self.session = Session(profile_name=self.profile_name)
+ self.client = self.session.client(
+ service_name=u"s3", endpoint_url=self.endpoint_url
+ )
+ self.resource = self.session.resource(
+ service_name=u"s3", endpoint_url=self.endpoint_url
+ )
+
+ def __repr__(self):
+ """Return a string executable as Python constructor call.
+
+ :returns: Executable constructor call.
+ :rtype: str
+ """
+ return (
+ f"Storage(endpoint_url={self.endpoint_url!r}, "
+ f"bucket={self.bucket!r}, "
+ f"profile_name={self.profile_name!r})"
+ )
+
+ def _get_matching_s3_keys(
+ self, bucket, prefix=u"", suffix=u""):
+ """This function generates the keys in an S3 bucket. Function act as
+ a Python generator object.
+
+ :param bucket: Name of the S3 bucket.
+ :param prefix: Only fetch keys that start with this prefix (optional).
+ :param suffix: Only fetch keys that end with this suffix (optional).
+ :type bucket: str
+ :type prefix: str
+ :type suffix: str
+ :raises RuntimeError: If connection to storage fails.
+ """
+ kwargs = {
+ u"Bucket": bucket
+ }
+
+ prefixes = (prefix, ) if isinstance(prefix, str) else prefix
+
+ for key_prefix in prefixes:
+ kwargs[u"Prefix"] = key_prefix
+ try:
+ paginator = self.client.get_paginator(u"list_objects_v2")
+ for page in paginator.paginate(**kwargs):
+ try:
+ contents = page[u"Contents"]
+ except KeyError:
+ break
+
+ for obj in contents:
+ key = obj[u"Key"]
+ if key.endswith(suffix):
+ yield obj
+ except exceptions.EndpointConnectionError:
+ raise RuntimeError(
+ u"Connection Error!"
+ )
+
+ def _get_matching_s3_content(
+ self, key, expression):
+ """This function filters the contents of an S3 object based on a simple
+ structured query language (SQL) statement. In the request, along with
+ the SQL expression, we are specifying JSON serialization of the object.
+ S3 uses this format to parse object data into records, and returns only
+ records that match the specified SQL expression. Data serialization
+ format for the response is set to JSON.
+
+ :param key: S3 Key (file path).
+ :param expression: S3 compatible SQL query.
+ :type key: str
+ :type expression: str
+ :returns: JSON content of interest.
+ :rtype: str
+ :raises RuntimeError: If connection to storage fails.
+ :raises ValueError: If JSON reading fails.
+ """
+ try:
+ content = self.client.select_object_content(
+ Bucket=self.bucket,
+ Key=key,
+ ExpressionType=u"SQL",
+ Expression=expression,
+ InputSerialization={
+ u"JSON": {
+ u"Type": u"Document"
+ },
+ u"CompressionType": u"GZIP"
+ },
+ OutputSerialization={
+ u"JSON": {
+ u"RecordDelimiter": u""
+ }
+ }
+ )
+ records = u""
+ for event in content[u"Payload"]:
+ if u"Records" in event:
+ records = event[u"Records"][u"Payload"].decode(u"utf-8")
+ return records
+ except exceptions.EndpointConnectionError:
+ raise RuntimeError(
+ u"Connection Error!"
+ )
+ except exceptions.EventStreamError:
+ raise ValueError(
+ u"Malformed JSON content!"
+ )
+
+ def _get_matching_s3_object(
+ self, key):
+ """Gets full S3 object. If the file is gzip'd it will be unpacked.
+
+ :param key: Name of the S3 key (file).
+ :type key: str
+ :returns: JSON file of interest.
+ :rtype: str
+ :raises RuntimeError: If connection to storage fails.
+ """
+ try:
+ streaming_object = self.client.get_object(
+ Bucket=self.bucket,
+ Key=key
+ )[u"Body"]
+ with GzipFile(fileobj=streaming_object) as gzipfile:
+ content = gzipfile.read()
+ return content
+ except exceptions.EndpointConnectionError:
+ raise RuntimeError(
+ u"Connection Error!"
+ )
+
+ def _get_matching_s3_length(
+ self, key):
+ """Gets the file size of S3 object. If the file is gzip'd the packed
+ size is reported.
+
+ :param key: Name of the S3 key (file).
+ :type key: str
+ :returns: File size in bytes. Defaults to 0 if any error.
+ :rtype: int
+ :raises RuntimeError: If connection to storage fails.
+ """
+ try:
+ compressed_size = self.client.get_object(
+ Bucket=self.bucket,
+ Key=key
+ )[u"ContentLength"]
+ last_four_bytes = self.client.get_object(
+ Bucket=self.bucket,
+ Key=key,
+ Range=f"bytes={compressed_size-4}-{compressed_size}"
+ )[u"Body"]
+ return unpack(u"I", last_four_bytes.read(4))[0]
+ except exceptions.EndpointConnectionError:
+ return 0
+
+ def is_large_file(
+ self, key):
+ """Returns True if file is larger then 1MB that S3 select allows.
+
+ :param key: Name of the S3 key (file).
+ :type key: str
+ :returns: Returns True if file is large then 1MB that S3 select allows.
+ :rtype: bool
+ """
+ return bool(
+ self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT
+ )
+
+ def s3_file_processing(
+ self, prefix=u"", suffix=u"json.gz",
+ expression=u"select * from s3object s"):
+ """Batch S3 key processing. Function retrieves list of files and use
+ S3 Select API to query content.
+
+ :param prefix: Only fetch keys that start with this prefix (optional).
+ :param suffix: Only fetch keys that end with this suffix (optional).
+ :param expression: S3 compatible SQL query (optional).
+ :type prefix: str
+ :type suffix: str
+ :type expression: str
+ """
+ key_iterator = self._get_matching_s3_keys(
+ bucket=self.bucket,
+ prefix=prefix,
+ suffix=suffix
+ )
+
+ for key in key_iterator:
+ try:
+ yield key[u"Key"], loads(
+ self._get_matching_s3_content(
+ key=key[u"Key"], expression=expression
+ )
+ )
+ except ValueError:
+ return
+
+ def s3_dump_file_processing(
+ self, prefix=u"", suffix=u"json.gz"):
+ """Batch S3 key processing. Function retrieves list of files and use
+ S3 Get Object API to query content.
+
+ :param prefix: Only fetch keys that start with this prefix (optional).
+ :param suffix: Only fetch keys that end with this suffix (optional).
+ :type prefix: str
+ :type suffix: str
+ """
+ key_iterator = self._get_matching_s3_keys(
+ bucket=self.bucket,
+ prefix=prefix,
+ suffix=suffix
+ )
+
+ for key in key_iterator:
+ try:
+ yield loads(
+ self._get_matching_s3_object(
+ key=key[u"Key"]
+ )
+ )
+ except ValueError:
+ return