Diffstat (limited to 'scripts/external_libs/elasticsearch/elasticsearch/connection')
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/connection/__init__.py        3
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/connection/base.py          124
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/connection/http_requests.py   96
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/connection/http_urllib3.py   135
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/connection/pooling.py         33
5 files changed, 391 insertions, 0 deletions
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/connection/__init__.py b/scripts/external_libs/elasticsearch/elasticsearch/connection/__init__.py
new file mode 100644
index 00000000..c2f484a1
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/connection/__init__.py
@@ -0,0 +1,3 @@
+from .base import Connection
+from .http_requests import RequestsHttpConnection
+from .http_urllib3 import Urllib3HttpConnection
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/connection/base.py b/scripts/external_libs/elasticsearch/elasticsearch/connection/base.py
new file mode 100644
index 00000000..c0206b2e
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/connection/base.py
@@ -0,0 +1,124 @@
+import logging
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+from ..exceptions import TransportError, HTTP_EXCEPTIONS
+
+logger = logging.getLogger('elasticsearch')
+
+# create the elasticsearch.trace logger, but only set propagate to False if the
+# logger hasn't already been configured
+_tracer_already_configured = 'elasticsearch.trace' in logging.Logger.manager.loggerDict
+tracer = logging.getLogger('elasticsearch.trace')
+if not _tracer_already_configured:
+ tracer.propagate = False
+
+
+class Connection(object):
+ """
+ Class responsible for maintaining a connection to an Elasticsearch node. It
+ holds a persistent connection pool to the node, and its main interface
+ (`perform_request`) is thread-safe.
+
+ It is also responsible for logging.
+ """
+ transport_schema = 'http'
+
+ def __init__(self, host='localhost', port=9200, use_ssl=False, url_prefix='', timeout=10, **kwargs):
+ """
+ :arg host: hostname of the node (default: localhost)
+ :arg port: port to use (integer, default: 9200)
+ :arg url_prefix: optional url prefix for elasticsearch
+ :arg timeout: default timeout in seconds (float, default: 10)
+ """
+ scheme = self.transport_schema
+ if use_ssl:
+ scheme += 's'
+ self.host = '%s://%s:%s' % (scheme, host, port)
+ if url_prefix:
+ url_prefix = '/' + url_prefix.strip('/')
+ self.url_prefix = url_prefix
+ self.timeout = timeout
+
+ def __repr__(self):
+ return '<%s: %s>' % (self.__class__.__name__, self.host)
+
+ def _pretty_json(self, data):
+ # pretty JSON in tracer curl logs
+ try:
+ return json.dumps(json.loads(data), sort_keys=True, indent=2, separators=(',', ': ')).replace("'", r'\u0027')
+ except (ValueError, TypeError):
+ # non-json data or a bulk request
+ return data
+
+ def _log_trace(self, method, path, body, status_code, response, duration):
+ if not tracer.isEnabledFor(logging.INFO) or not tracer.handlers:
+ return
+
+ # include pretty in trace curls
+ path = path.replace('?', '?pretty&', 1) if '?' in path else path + '?pretty'
+ if self.url_prefix:
+ path = path.replace(self.url_prefix, '', 1)
+ tracer.info("curl -X%s 'http://localhost:9200%s' -d '%s'", method, path, self._pretty_json(body) if body else '')
+
+ if tracer.isEnabledFor(logging.DEBUG):
+ tracer.debug('#[%s] (%.3fs)\n#%s', status_code, duration, self._pretty_json(response).replace('\n', '\n#') if response else '')
+
+ def log_request_success(self, method, full_url, path, body, status_code, response, duration):
+ """ Log a successful API call. """
+ # TODO: optionally pass in params instead of full_url and do urlencode only when needed
+
+ # body has already been serialized to utf-8, deserialize it for logging
+ # TODO: find a better way to avoid (de)encoding the body back and forth
+ if body:
+ body = body.decode('utf-8')
+
+ logger.info(
+ '%s %s [status:%s request:%.3fs]', method, full_url,
+ status_code, duration
+ )
+ logger.debug('> %s', body)
+ logger.debug('< %s', response)
+
+ self._log_trace(method, path, body, status_code, response, duration)
+
+ def log_request_fail(self, method, full_url, path, body, duration, status_code=None, response=None, exception=None):
+ """ Log an unsuccessful API call. """
+ # do not log 404s on HEAD requests
+ if method == 'HEAD' and status_code == 404:
+ return
+ logger.warning(
+ '%s %s [status:%s request:%.3fs]', method, full_url,
+ status_code or 'N/A', duration, exc_info=exception is not None
+ )
+
+ # body has already been serialized to utf-8, deserialize it for logging
+ # TODO: find a better way to avoid (de)encoding the body back and forth
+ if body:
+ body = body.decode('utf-8')
+
+ logger.debug('> %s', body)
+
+ self._log_trace(method, path, body, status_code, response, duration)
+
+ if response is not None:
+ logger.debug('< %s', response)
+
+ def _raise_error(self, status_code, raw_data):
+ """ Locate appropriate exception and raise it. """
+ error_message = raw_data
+ additional_info = None
+ try:
+ if raw_data:
+ additional_info = json.loads(raw_data)
+ error_message = additional_info.get('error', error_message)
+ if isinstance(error_message, dict) and 'type' in error_message:
+ error_message = error_message['type']
+ except (ValueError, TypeError) as err:
+ logger.warning('Undecodable raw error response from server: %s', err)
+
+ raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
+
+
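base.py only emits the curl trace when the 'elasticsearch.trace' logger is enabled for INFO and has at least one handler attached (DEBUG additionally logs the pretty-printed response). A minimal sketch of enabling it from application code, with the handler choice and log path picked purely for illustration:

    import logging

    # the library's own loggers, as created in base.py
    logger = logging.getLogger('elasticsearch')
    tracer = logging.getLogger('elasticsearch.trace')

    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())

    # INFO prints the reproducible curl line, DEBUG adds '#'-prefixed responses
    tracer.setLevel(logging.DEBUG)
    tracer.addHandler(logging.FileHandler('/tmp/es_trace.log'))  # hypothetical path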
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/connection/http_requests.py b/scripts/external_libs/elasticsearch/elasticsearch/connection/http_requests.py
new file mode 100644
index 00000000..6c9a924b
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/connection/http_requests.py
@@ -0,0 +1,96 @@
+import time
+import warnings
+try:
+ import requests
+ REQUESTS_AVAILABLE = True
+except ImportError:
+ REQUESTS_AVAILABLE = False
+
+from .base import Connection
+from ..exceptions import ConnectionError, ImproperlyConfigured, ConnectionTimeout, SSLError
+from ..compat import urlencode, string_types
+
+class RequestsHttpConnection(Connection):
+ """
+ Connection using the `requests` library.
+
+ :arg http_auth: optional http auth information as either ':' separated
+ string or a tuple. Any value will be passed into requests as `auth`.
+ :arg use_ssl: use ssl for the connection if `True`
+ :arg verify_certs: whether to verify SSL certificates
+ :arg ca_certs: optional path to a CA bundle. By default the standard
+ bundle shipped with requests will be used.
+ :arg client_cert: path to the file containing the private key and the
+ certificate, or cert only if using client_key
+ :arg client_key: path to the file containing the private key if using
+ separate cert and key files (client_cert will contain only the cert)
+ :arg headers: any custom http headers to be added to requests
+ """
+ def __init__(self, host='localhost', port=9200, http_auth=None,
+ use_ssl=False, verify_certs=True, ca_certs=None, client_cert=None,
+ client_key=None, headers=None, **kwargs):
+ if not REQUESTS_AVAILABLE:
+ raise ImproperlyConfigured("Please install requests to use RequestsHttpConnection.")
+
+ super(RequestsHttpConnection, self).__init__(host=host, port=port, **kwargs)
+ self.session = requests.Session()
+ self.session.headers = headers
+ if http_auth is not None:
+ if isinstance(http_auth, (tuple, list)):
+ http_auth = tuple(http_auth)
+ elif isinstance(http_auth, string_types):
+ http_auth = tuple(http_auth.split(':', 1))
+ self.session.auth = http_auth
+ self.base_url = 'http%s://%s:%d%s' % (
+ 's' if use_ssl else '',
+ host, port, self.url_prefix
+ )
+ self.session.verify = verify_certs
+ if not client_key:
+ self.session.cert = client_cert
+ elif client_cert:
+ # cert is a tuple of (certfile, keyfile)
+ self.session.cert = (client_cert, client_key)
+ if ca_certs:
+ if not verify_certs:
+ raise ImproperlyConfigured("You cannot pass CA certificates when verify SSL is off.")
+ self.session.verify = ca_certs
+
+ if use_ssl and not verify_certs:
+ warnings.warn(
+ 'Connecting to %s using SSL with verify_certs=False is insecure.' % self.base_url)
+
+ def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
+ url = self.base_url + url
+ if params:
+ url = '%s?%s' % (url, urlencode(params or {}))
+
+ start = time.time()
+ try:
+ response = self.session.request(method, url, data=body, timeout=timeout or self.timeout)
+ duration = time.time() - start
+ raw_data = response.text
+ except requests.exceptions.SSLError as e:
+ self.log_request_fail(method, url, response.request.path_url, body, time.time() - start, exception=e)
+ raise SSLError('N/A', str(e), e)
+ except requests.Timeout as e:
+ self.log_request_fail(method, url, response.request.path_url, body, time.time() - start, exception=e)
+ raise ConnectionTimeout('TIMEOUT', str(e), e)
+ except requests.ConnectionError as e:
+ self.log_request_fail(method, url, response.request.path_url, body, time.time() - start, exception=e)
+ raise ConnectionError('N/A', str(e), e)
+
+ # raise errors based on http status codes, let the client handle those if needed
+ if not (200 <= response.status_code < 300) and response.status_code not in ignore:
+ self.log_request_fail(method, url, response.request.path_url, body, duration, response.status_code, raw_data)
+ self._raise_error(response.status_code, raw_data)
+
+ self.log_request_success(method, url, response.request.path_url, body, response.status_code, raw_data, duration)
+
+ return response.status_code, response.headers, raw_data
+
+ def close(self):
+ """
+ Explicitly closes connections
+ """
+ self.session.close()
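These connection classes are normally created and driven by the Transport layer, but http_requests.py can also be exercised directly. A rough sketch, assuming the requests package is installed and with host, credentials and endpoint as placeholder values:

    from elasticsearch.connection import RequestsHttpConnection

    conn = RequestsHttpConnection(host='localhost', port=9200,
                                  http_auth='user:secret',  # ':'-separated string or a tuple
                                  use_ssl=False)
    # returns (status_code, headers, raw_data); non-2xx codes raise unless listed in `ignore`
    status, headers, raw = conn.perform_request(
        'GET', '/_cluster/health', params={'timeout': '5s'}, ignore=(404,))
    print(status, raw)
    conn.close()  # closes the underlying requests.Session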
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/connection/http_urllib3.py b/scripts/external_libs/elasticsearch/elasticsearch/connection/http_urllib3.py
new file mode 100644
index 00000000..066466cd
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/connection/http_urllib3.py
@@ -0,0 +1,135 @@
+import time
+import urllib3
+from urllib3.exceptions import ReadTimeoutError, SSLError as UrllibSSLError
+import warnings
+
+CA_CERTS = None
+
+try:
+ import certifi
+ CA_CERTS = certifi.where()
+except ImportError:
+ pass
+
+from .base import Connection
+from ..exceptions import ConnectionError, ImproperlyConfigured, ConnectionTimeout, SSLError
+from ..compat import urlencode
+
+class Urllib3HttpConnection(Connection):
+ """
+ Default connection class using the `urllib3` library and the http protocol.
+
+ :arg host: hostname of the node (default: localhost)
+ :arg port: port to use (integer, default: 9200)
+ :arg url_prefix: optional url prefix for elasticsearch
+ :arg timeout: default timeout in seconds (float, default: 10)
+ :arg http_auth: optional http auth information as either ':' separated
+ string or a tuple
+ :arg use_ssl: use ssl for the connection if `True`
+ :arg verify_certs: whether to verify SSL certificates
+ :arg ca_certs: optional path to CA bundle. See
+ https://urllib3.readthedocs.io/en/latest/security.html#using-certifi-with-urllib3
+ for instructions on how to get the default set
+ :arg client_cert: path to the file containing the private key and the
+ certificate, or cert only if using client_key
+ :arg client_key: path to the file containing the private key if using
+ separate cert and key files (client_cert will contain only the cert)
+ :arg ssl_version: version of the SSL protocol to use. Choices are:
+ SSLv23 (default), SSLv2, SSLv3, TLSv1 (see the ``PROTOCOL_*`` constants in
+ the ``ssl`` module for the exact options available in your environment).
+ :arg ssl_assert_hostname: use hostname verification if not `False`
+ :arg ssl_assert_fingerprint: verify the supplied certificate fingerprint if not `None`
+ :arg maxsize: the number of connections which will be kept open to this
+ host. See https://urllib3.readthedocs.io/en/1.4/pools.html#api for more
+ information.
+ :arg headers: any custom http headers to be added to requests
+ """
+ def __init__(self, host='localhost', port=9200, http_auth=None,
+ use_ssl=False, verify_certs=True, ca_certs=None, client_cert=None,
+ client_key=None, ssl_version=None, ssl_assert_hostname=None,
+ ssl_assert_fingerprint=None, maxsize=10, headers=None, **kwargs):
+
+ super(Urllib3HttpConnection, self).__init__(host=host, port=port, use_ssl=use_ssl, **kwargs)
+ self.headers = headers.copy() if headers else {}
+ self.headers.update(urllib3.make_headers(keep_alive=True))
+ if http_auth is not None:
+ if isinstance(http_auth, (tuple, list)):
+ http_auth = ':'.join(http_auth)
+ self.headers.update(urllib3.make_headers(basic_auth=http_auth))
+
+ ca_certs = CA_CERTS if ca_certs is None else ca_certs
+ pool_class = urllib3.HTTPConnectionPool
+ kw = {}
+ if use_ssl:
+ pool_class = urllib3.HTTPSConnectionPool
+ kw.update({
+ 'ssl_version': ssl_version,
+ 'assert_hostname': ssl_assert_hostname,
+ 'assert_fingerprint': ssl_assert_fingerprint,
+ })
+
+ if verify_certs:
+ if not ca_certs:
+ raise ImproperlyConfigured("Root certificates are missing for certificate "
+ "validation. Either pass them in using the ca_certs parameter or "
+ "install certifi to use it automatically.")
+
+ kw.update({
+ 'cert_reqs': 'CERT_REQUIRED',
+ 'ca_certs': ca_certs,
+ 'cert_file': client_cert,
+ 'key_file': client_key,
+ })
+ else:
+ warnings.warn(
+ 'Connecting to %s using SSL with verify_certs=False is insecure.' % host)
+
+ self.pool = pool_class(host, port=port, timeout=self.timeout, maxsize=maxsize, **kw)
+
+ def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
+ url = self.url_prefix + url
+ if params:
+ url = '%s?%s' % (url, urlencode(params))
+ full_url = self.host + url
+
+ start = time.time()
+ try:
+ kw = {}
+ if timeout:
+ kw['timeout'] = timeout
+
+ # in python2 we need to make sure the url and method are not
+ # unicode. Otherwise the body will be decoded into unicode too and
+ # that will fail (#133, #201).
+ if not isinstance(url, str):
+ url = url.encode('utf-8')
+ if not isinstance(method, str):
+ method = method.encode('utf-8')
+
+ response = self.pool.urlopen(method, url, body, retries=False, headers=self.headers, **kw)
+ duration = time.time() - start
+ raw_data = response.data.decode('utf-8')
+ except UrllibSSLError as e:
+ self.log_request_fail(method, full_url, url, body, time.time() - start, exception=e)
+ raise SSLError('N/A', str(e), e)
+ except ReadTimeoutError as e:
+ self.log_request_fail(method, full_url, url, body, time.time() - start, exception=e)
+ raise ConnectionTimeout('TIMEOUT', str(e), e)
+ except Exception as e:
+ self.log_request_fail(method, full_url, url, body, time.time() - start, exception=e)
+ raise ConnectionError('N/A', str(e), e)
+
+ if not (200 <= response.status < 300) and response.status not in ignore:
+ self.log_request_fail(method, full_url, url, body, duration, response.status, raw_data)
+ self._raise_error(response.status, raw_data)
+
+ self.log_request_success(method, full_url, url, body, response.status,
+ raw_data, duration)
+
+ return response.status, response.getheaders(), raw_data
+
+ def close(self):
+ """
+ Explicitly closes connection
+ """
+ self.pool.close()
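The urllib3-backed class exposes the same perform_request contract; the main differences are that it keeps its own urllib3 connection pool (sized by maxsize) and needs certifi or an explicit ca_certs path when use_ssl and verify_certs are both enabled. A short sketch over plain http, with the index name and query body as illustrative assumptions:

    from elasticsearch.connection import Urllib3HttpConnection

    conn = Urllib3HttpConnection(host='localhost', port=9200, use_ssl=False, maxsize=10)
    body = b'{"query": {"match_all": {}}}'  # already utf-8 encoded, as the logging code expects
    status, headers, raw = conn.perform_request(
        'GET', '/my-index/_search', params={'size': 10}, body=body)
    print(status, raw)
    conn.close()  # releases the urllib3 pool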
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/connection/pooling.py b/scripts/external_libs/elasticsearch/elasticsearch/connection/pooling.py
new file mode 100644
index 00000000..3115e7c2
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/connection/pooling.py
@@ -0,0 +1,33 @@
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+from .base import Connection
+
+
+class PoolingConnection(Connection):
+ """
+ Base connection class for connections backed by libraries that are not
+ thread-safe and have no built-in connection pooling. To use it, just
+ implement a ``_make_connection`` method that constructs a new connection
+ and returns it.
+ """
+ def __init__(self, *args, **kwargs):
+ self._free_connections = queue.Queue()
+ super(PoolingConnection, self).__init__(*args, **kwargs)
+
+ def _get_connection(self):
+ try:
+ return self._free_connections.get_nowait()
+ except queue.Empty:
+ return self._make_connection()
+
+ def _release_connection(self, con):
+ self._free_connections.put(con)
+
+ def close(self):
+ """
+ Explicitly close connection
+ """
+ pass
+
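pooling.py itself never opens a connection; a subclass only has to provide _make_connection, and can then check connections out with _get_connection and hand them back with _release_connection. A hypothetical sketch of that contract (the dummy connection object and canned response are assumptions, not part of this diff):

    from elasticsearch.connection.pooling import PoolingConnection

    class DummyPoolingConnection(PoolingConnection):
        def _make_connection(self):
            # a real subclass would construct e.g. a thrift client here
            return object()

        def perform_request(self, method, url, params=None, body=None,
                            timeout=None, ignore=()):
            con = self._get_connection()  # reuse a queued connection or make a new one
            try:
                # transport-specific request/response handling would go here
                return 200, {}, '{}'
            finally:
                self._release_connection(con)  # always return it to the free queue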