author     Hanoh Haim <hhaim@cisco.com>    2017-01-12 13:47:39 +0200
committer  Hanoh Haim <hhaim@cisco.com>    2017-01-15 17:10:16 +0200
commit     420216e583706fbd7bf214818fcce0143a05e982 (patch)
tree       0fd39bac06af7e12889406b0f20cd40527e0d18f /scripts/external_libs/elasticsearch/elasticsearch/helpers
parent     4e5a651c8e052cdbcad73f6af5ce065ffd6dbce4 (diff)
add elk
Signed-off-by: Hanoh Haim <hhaim@cisco.com>
Diffstat (limited to 'scripts/external_libs/elasticsearch/elasticsearch/helpers')
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py  375
-rw-r--r--  scripts/external_libs/elasticsearch/elasticsearch/helpers/test.py       60
2 files changed, 435 insertions, 0 deletions
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py b/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py
new file mode 100644
index 00000000..953b644f
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py
@@ -0,0 +1,375 @@
+from __future__ import unicode_literals
+
+import logging
+from operator import methodcaller
+
+from ..exceptions import ElasticsearchException, TransportError
+from ..compat import map, string_types
+
+logger = logging.getLogger('elasticsearch.helpers')
+
+class BulkIndexError(ElasticsearchException):
+    @property
+    def errors(self):
+        """ List of errors from execution of the last chunk. """
+        return self.args[1]
+
+
+class ScanError(ElasticsearchException):
+    def __init__(self, scroll_id, *args, **kwargs):
+        super(ScanError, self).__init__(*args, **kwargs)
+        self.scroll_id = scroll_id
+
+def expand_action(data):
+    """
+    From one document or action definition passed in by the user, extract the
+    action/data lines needed for elasticsearch's
+    :meth:`~elasticsearch.Elasticsearch.bulk` api.
+    """
+    # when given a string, assume the user wants to index raw json
+    if isinstance(data, string_types):
+        return '{"index":{}}', data
+
+    # make sure we don't alter the action
+    data = data.copy()
+    op_type = data.pop('_op_type', 'index')
+    action = {op_type: {}}
+    for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp',
+            '_ttl', '_type', '_version', '_version_type', '_id', '_retry_on_conflict'):
+        if key in data:
+            action[op_type][key] = data.pop(key)
+
+    # no data payload for delete
+    if op_type == 'delete':
+        return action, None
+
+    return action, data.get('_source', data)
+
+def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
+    """
+    Split actions into chunks by number or size, serializing them into strings
+    in the process.
+    """
+    bulk_actions = []
+    size, action_count = 0, 0
+    for action, data in actions:
+        action = serializer.dumps(action)
+        cur_size = len(action) + 1
+
+        if data is not None:
+            data = serializer.dumps(data)
+            cur_size += len(data) + 1
+
+        # full chunk, send it and start a new one
+        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
+            yield bulk_actions
+            bulk_actions = []
+            size, action_count = 0, 0
+
+        bulk_actions.append(action)
+        if data is not None:
+            bulk_actions.append(data)
+        size += cur_size
+        action_count += 1
+
+    if bulk_actions:
+        yield bulk_actions
+
+def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
+    """
+    Send a bulk request to elasticsearch and process the output.
+    """
+    # if raise on error is set, we need to collect errors per chunk before raising them
+    errors = []
+
+    try:
+        # send the actual request
+        resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
+    except TransportError as e:
+        # default behavior - just propagate the exception
+        if raise_on_exception:
+            raise e
+
+        # if we are not propagating, mark all actions in the current chunk as failed
+        err_message = str(e)
+        exc_errors = []
+
+        # deserialize the data back; this is expensive but only runs on
+        # errors if raise_on_exception is false, so it shouldn't be a real
+        # issue
+        bulk_data = map(client.transport.serializer.loads, bulk_actions)
+        while True:
+            try:
+                # collect all the information about failed actions
+                action = next(bulk_data)
+                op_type, action = action.popitem()
+                info = {"error": err_message, "status": e.status_code, "exception": e}
+                if op_type != 'delete':
+                    info['data'] = next(bulk_data)
+                info.update(action)
+                exc_errors.append({op_type: info})
+            except StopIteration:
+                break
+
+        # emulate standard behavior for failed actions
+        if raise_on_error:
+            raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
+        else:
+            for err in exc_errors:
+                yield False, err
+            return
+
+    # go through request-response pairs and detect failures
+    for op_type, item in map(methodcaller('popitem'), resp['items']):
+        ok = 200 <= item.get('status', 500) < 300
+        if not ok and raise_on_error:
+            errors.append({op_type: item})
+
+        if ok or not errors:
+            # if we are not just recording all errors to be able to raise
+            # them all at once, yield items individually
+            yield ok, {op_type: item}
+
+    if errors:
+        raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
+
+def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
+        raise_on_error=True, expand_action_callback=expand_action,
+        raise_on_exception=True, **kwargs):
+    """
+    Streaming bulk consumes actions from the iterable passed in and yields
+    results per action. For non-streaming use cases use
+    :func:`~elasticsearch.helpers.bulk` which is a wrapper around streaming
+    bulk that returns summary information about the bulk operation once the
+    entire input is consumed and sent.
+
+    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+    :arg actions: iterable containing the actions to be executed
+    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
+    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
+        from the execution of the last chunk when some occur. By default we raise.
+    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
+        call to ``bulk`` and just report the items that failed as failed.
+    :arg expand_action_callback: callback executed on each action passed in,
+        should return a tuple containing the action line and the data line
+        (`None` if the data line should be omitted).
+    """
+    actions = map(expand_action_callback, actions)
+
+    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
+        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
+            yield result
+
+def bulk(client, actions, stats_only=False, **kwargs):
+    """
+    Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
+    a more human-friendly interface - it consumes an iterator of actions and
+    sends them to elasticsearch in chunks. It returns a tuple with summary
+    information - the number of successfully executed actions and either a
+    list of errors or the number of errors if `stats_only` is set to `True`.
+
+    See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
+    parameters.
+
+    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+    :arg actions: iterator containing the actions
+    :arg stats_only: if `True` only report the number of successful/failed
+        operations instead of the number of successful operations and a list
+        of error responses
+
+    Any additional keyword arguments will be passed to
+    :func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
+    the operation.
+    """
+    success, failed = 0, 0
+
+    # list of errors to be collected if not stats_only
+    errors = []
+
+    for ok, item in streaming_bulk(client, actions, **kwargs):
+        # go through request-response pairs and detect failures
+        if not ok:
+            if not stats_only:
+                errors.append(item)
+            failed += 1
+        else:
+            success += 1
+
+    return success, failed if stats_only else errors
+
+def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
+        max_chunk_bytes=100 * 1024 * 1024,
+        expand_action_callback=expand_action, **kwargs):
+    """
+    Parallel version of the bulk helper, run in multiple threads at once.
+
+    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+    :arg actions: iterator containing the actions
+    :arg thread_count: size of the threadpool to use for the bulk requests
+    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
+    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
+        from the execution of the last chunk when some occur. By default we raise.
+    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
+        call to ``bulk`` and just report the items that failed as failed.
+    :arg expand_action_callback: callback executed on each action passed in,
+        should return a tuple containing the action line and the data line
+        (`None` if the data line should be omitted).
+    """
+    # Avoid importing multiprocessing unless parallel_bulk is used
+    # to avoid exceptions on restricted environments like App Engine
+    from multiprocessing.dummy import Pool
+    actions = map(expand_action_callback, actions)
+
+    pool = Pool(thread_count)
+
+    try:
+        for result in pool.imap(
+            lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
+            _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
+        ):
+            for item in result:
+                yield item
+
+    finally:
+        pool.close()
+        pool.join()
+
+def scan(client, query=None, scroll='5m', raise_on_error=True,
+        preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, **kwargs):
+    """
+    Simple abstraction on top of the
+    :meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that
+    yields all hits as returned by the underlying scroll requests.
+
+    By default scan does not return results in any pre-determined order. To
+    have a standard order in the returned documents (either by score or
+    explicit sort definition) when scrolling, use ``preserve_order=True``. This
+    may be an expensive operation and will negate the performance benefits of
+    using ``scan``.
+
+    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+    :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
+    :arg scroll: Specify how long a consistent view of the index should be
+        maintained for scrolled search
+    :arg raise_on_error: raises an exception (``ScanError``) if an error is
+        encountered (some shards fail to execute). By default we raise.
+    :arg preserve_order: don't set the ``search_type`` to ``scan`` - this will
+        cause the scroll to paginate while preserving the order. Note that this
+        can be an extremely expensive operation and can easily lead to
+        unpredictable results, use with caution.
+    :arg size: size (per shard) of the batch sent at each iteration.
+    :arg request_timeout: explicit timeout for each call to ``scan``
+    :arg clear_scroll: explicitly calls delete on the scroll id via the clear
+        scroll API at the end of the method on completion or error, defaults
+        to true.
+
+    Any additional keyword arguments will be passed to the initial
+    :meth:`~elasticsearch.Elasticsearch.search` call::
+
+        scan(es,
+            query={"query": {"match": {"title": "python"}}},
+            index="orders-*",
+            doc_type="books"
+        )
+
+    """
+    if not preserve_order:
+        query = query.copy() if query else {}
+        query["sort"] = "_doc"
+    # initial search
+    resp = client.search(body=query, scroll=scroll, size=size,
+        request_timeout=request_timeout, **kwargs)
+
+    scroll_id = resp.get('_scroll_id')
+    if scroll_id is None:
+        return
+
+    try:
+        first_run = True
+        while True:
+            # if we didn't set search_type to scan, the initial search contains data
+            if first_run:
+                first_run = False
+            else:
+                resp = client.scroll(scroll_id, scroll=scroll, request_timeout=request_timeout)
+
+            for hit in resp['hits']['hits']:
+                yield hit
+
+            # check if we have any errors
+            if resp["_shards"]["failed"]:
+                logger.warning(
+                    'Scroll request has failed on %d shards out of %d.',
+                    resp['_shards']['failed'], resp['_shards']['total']
+                )
+                if raise_on_error:
+                    raise ScanError(
+                        scroll_id,
+                        'Scroll request has failed on %d shards out of %d.' %
+                        (resp['_shards']['failed'], resp['_shards']['total'])
+                    )
+
+            scroll_id = resp.get('_scroll_id')
+            # end of scroll
+            if scroll_id is None or not resp['hits']['hits']:
+                break
+    finally:
+        if scroll_id and clear_scroll:
+            client.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))
+
+def reindex(client, source_index, target_index, query=None, target_client=None,
+        chunk_size=500, scroll='5m', scan_kwargs={}, bulk_kwargs={}):
+
+    """
+    Reindex all documents from one index that satisfy a given query
+    to another, potentially (if `target_client` is specified) on a different cluster.
+    If you don't specify the query you will reindex all the documents.
+
+    Since ``2.3`` a :meth:`~elasticsearch.Elasticsearch.reindex` api is
+    available as part of elasticsearch itself. It is recommended to use the api
+    instead of this helper wherever possible. The helper is here mostly for
+    backwards compatibility and for situations where more flexibility is
+    needed.
+
+    .. note::
+
+        This helper doesn't transfer mappings, just the data.
+
+    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
+        read if `target_client` is specified as well)
+    :arg source_index: index (or list of indices) to read documents from
+    :arg target_index: name of the index in the target cluster to populate
+    :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
+    :arg target_client: optional, if specified it will be used for writing
+        (thus enabling reindex between clusters)
+    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+    :arg scroll: Specify how long a consistent view of the index should be
+        maintained for scrolled search
+    :arg scan_kwargs: additional kwargs to be passed to
+        :func:`~elasticsearch.helpers.scan`
+    :arg bulk_kwargs: additional kwargs to be passed to
+        :func:`~elasticsearch.helpers.bulk`
+    """
+    target_client = client if target_client is None else target_client
+
+    docs = scan(client,
+        query=query,
+        index=source_index,
+        scroll=scroll,
+        **scan_kwargs
+    )
+    def _change_doc_index(hits, index):
+        for h in hits:
+            h['_index'] = index
+            if 'fields' in h:
+                h.update(h.pop('fields'))
+            yield h
+
+    kwargs = {
+        'stats_only': True,
+    }
+    kwargs.update(bulk_kwargs)
+    return bulk(target_client, _change_doc_index(docs, target_index),
+        chunk_size=chunk_size, **kwargs)
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/helpers/test.py b/scripts/external_libs/elasticsearch/elasticsearch/helpers/test.py
new file mode 100644
index 00000000..6fdd2ec0
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/helpers/test.py
@@ -0,0 +1,60 @@
+import time
+import os
+try:
+    # python 2.6
+    from unittest2 import TestCase, SkipTest
+except ImportError:
+    from unittest import TestCase, SkipTest
+
+from elasticsearch import Elasticsearch
+from elasticsearch.exceptions import ConnectionError
+
+def get_test_client(nowait=False, **kwargs):
+    # construct kwargs from the environment
+    kw = {'timeout': 30}
+    if 'TEST_ES_CONNECTION' in os.environ:
+        from elasticsearch import connection
+        kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
+
+    kw.update(kwargs)
+    client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)
+
+    # wait for yellow status
+    for _ in range(1 if nowait else 100):
+        try:
+            client.cluster.health(wait_for_status='yellow')
+            return client
+        except ConnectionError:
+            time.sleep(.1)
+    else:
+        # timeout
+        raise SkipTest("Elasticsearch failed to start.")
+
+def _get_version(version_string):
+    if '.' not in version_string:
+        return ()
+    version = version_string.strip().split('.')
+    return tuple(int(v) if v.isdigit() else 999 for v in version)
+
+class ElasticsearchTestCase(TestCase):
+    @staticmethod
+    def _get_client():
+        return get_test_client()
+
+    @classmethod
+    def setUpClass(cls):
+        super(ElasticsearchTestCase, cls).setUpClass()
+        cls.client = cls._get_client()
+
+    def tearDown(self):
+        super(ElasticsearchTestCase, self).tearDown()
+        self.client.indices.delete(index='*', ignore=404)
+        self.client.indices.delete_template(name='*', ignore=404)
+
+    @property
+    def es_version(self):
+        if not hasattr(self, '_es_version'):
+            version_string = self.client.info()['version']['number']
+            self._es_version = _get_version(version_string)
+        return self._es_version
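For reviewers unfamiliar with the helpers vendored by this commit, a minimal usage sketch of the ``bulk`` helper follows. It is illustrative only: the cluster address, index name, and document shape are assumptions, not part of this commit::

    from elasticsearch import Elasticsearch, helpers

    # hypothetical local cluster; point this at a real node
    es = Elasticsearch(['http://localhost:9200'])

    # each action is a plain dict; expand_action() splits out the meta
    # fields (_index, _type, _id, _op_type, ...) and treats the rest
    # (or '_source') as the document body
    actions = ({'_index': 'test-index', '_type': 'doc', '_id': i, 'value': i}
               for i in range(100))

    # bulk() drains the iterator in chunks and returns a summary tuple:
    # (number of successes, list of errors) unless stats_only=True
    success, errors = helpers.bulk(es, actions, chunk_size=50)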
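A similar sketch for ``scan``, extending the example already in its docstring; the index name and query are again made up::

    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch(['http://localhost:9200'])

    # scan() drives the scroll api and yields every matching hit; unless
    # preserve_order=True it forces sort=_doc for cheap pagination
    for hit in helpers.scan(es,
            query={'query': {'match': {'title': 'python'}}},
            index='test-index',
            scroll='5m',
            size=500):
        print(hit['_id'], hit['_source'])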
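And one for ``reindex``, which simply wires ``scan`` into ``bulk``; both index names and the second cluster are hypothetical::

    from elasticsearch import Elasticsearch, helpers

    source = Elasticsearch(['http://localhost:9200'])
    # omit target_client to reindex within the same cluster
    target = Elasticsearch(['http://other-host:9200'])

    # copies documents (not mappings) matching the query; returns the
    # (successes, failures) pair from bulk(..., stats_only=True)
    ok, failed = helpers.reindex(source,
            source_index='old-index',
            target_index='new-index',
            query={'query': {'match_all': {}}},
            target_client=target)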
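Finally, ``test.py`` exists to support integration tests; a subclass might look like the following sketch (the index name and assertion are invented for illustration)::

    from elasticsearch.helpers.test import ElasticsearchTestCase

    class IndexingTest(ElasticsearchTestCase):
        # self.client is built in setUpClass from the TEST_ES_SERVER /
        # TEST_ES_CONNECTION environment variables; tearDown wipes all
        # indices and templates after each test
        def test_roundtrip(self):
            self.client.index(index='test', doc_type='doc', id=1,
                              body={'answer': 42})
            doc = self.client.get(index='test', doc_type='doc', id=1)
            self.assertEqual(42, doc['_source']['answer'])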