+from __future__ import unicode_literals
+import logging
+from operator import methodcaller
+from ..exceptions import ElasticsearchException, TransportError
+from ..compat import map, string_types
+logger = logging.getLogger('elasticsearch.helpers')
class BulkIndexError(ElasticsearchException):
    """
    Raised by the bulk helpers when one or more actions in a chunk fail.

    Within this module it is always constructed as
    ``BulkIndexError(message, errors)``, so the list of per-action errors is
    the second positional argument.
    """

    @property
    def errors(self):
        """ List of errors from execution of the last chunk. """
        return self.args[1]
class ScanError(ElasticsearchException):
    """
    Raised by :func:`scan` when a scroll request reports failed shards.

    :arg scroll_id: id of the scroll that was in use when the failure was
        detected; stored on the instance as ``scroll_id``.

    All remaining positional and keyword arguments are passed on to the base
    exception unchanged.
    """

    def __init__(self, scroll_id, *args, **kwargs):
        super(ScanError, self).__init__(*args, **kwargs)
        self.scroll_id = scroll_id
def expand_action(data):
    """
    From one document or action definition passed in by the user extract the
    action/data lines needed for elasticsearch's
    :meth:`~elasticsearch.Elasticsearch.bulk` api.

    :arg data: either a string, which is taken to be a raw JSON document to
        index, or a dict. In a dict the optional ``_op_type`` key selects the
        operation (default ``index``) and the known metadata keys (``_index``,
        ``_type``, ``_id``, ...) are moved into the action line.
    :returns: a ``(action, payload)`` tuple. ``payload`` is ``None`` for
        ``delete`` operations; otherwise it is the value of ``_source`` when
        present, or the remaining keys of the dict.
    """
    # when given a string, assume user wants to index raw json
    if isinstance(data, string_types):
        return '{"index":{}}', data

    # make sure we don't alter the action
    data = data.copy()
    op_type = data.pop('_op_type', 'index')
    action = {op_type: {}}
    for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp',
                '_ttl', '_type', '_version', '_version_type', '_id',
                '_retry_on_conflict'):
        if key in data:
            action[op_type][key] = data.pop(key)

    # no data payload for delete
    if op_type == 'delete':
        return action, None

    return action, data.get('_source', data)
def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
    """
    Split actions into chunks by number or size, serialize them into strings in
    the process.

    :arg actions: iterable of ``(action, data)`` tuples; ``data`` may be
        ``None`` when the action has no payload line
    :arg chunk_size: maximum number of actions per chunk
    :arg max_chunk_bytes: maximum accumulated size of one chunk
    :arg serializer: object whose ``dumps`` method turns an action or payload
        into a string

    Yields lists of serialized lines (action line, then payload line if any).
    A single action larger than ``max_chunk_bytes`` is still emitted, alone in
    its own chunk.
    """
    bulk_actions = []
    size, action_count = 0, 0
    for action, data in actions:
        action = serializer.dumps(action)
        # +1 accounts for the newline that terminates each line in the body.
        # NOTE(review): size is measured with len() on whatever ``dumps``
        # returns, so for text output this counts characters, not encoded
        # bytes - confirm this is acceptable for non-ASCII payloads.
        cur_size = len(action) + 1

        if data is not None:
            data = serializer.dumps(data)
            cur_size += len(data) + 1

        # full chunk, send it and start a new one
        if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
            yield bulk_actions
            bulk_actions = []
            size, action_count = 0, 0

        bulk_actions.append(action)
        if data is not None:
            bulk_actions.append(data)
        size += cur_size
        action_count += 1

    # flush whatever is left over after the input is exhausted
    if bulk_actions:
        yield bulk_actions
def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
    """
    Send a bulk request to elasticsearch and process the output.

    :arg client: client whose ``bulk`` method is called with the joined body
    :arg bulk_actions: list of already serialized action/payload lines
    :arg raise_on_exception: if ``True`` (default) a ``TransportError`` raised
        by ``client.bulk`` is propagated; otherwise every action in the chunk
        is reported as failed
    :arg raise_on_error: if ``True`` (default) failed items are collected and
        raised together as ``BulkIndexError`` after the chunk was processed;
        otherwise they are yielded as ``(False, info)``

    Any additional keyword arguments are passed to ``client.bulk``.

    Yields ``(ok, {op_type: item})`` tuples, one per reported item.
    """
    # if raise on error is set, we need to collect errors per chunk before raising them
    errors = []

    try:
        # send the actual request
        resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
    except TransportError as e:
        # default behavior - just propagate exception
        if raise_on_exception:
            # bare raise keeps the original traceback intact
            raise

        # if we are not propagating, mark all actions in current chunk as failed
        err_message = str(e)
        exc_errors = []

        # deserialize the data back, this is expensive but only run on
        # errors if raise_on_exception is false, so shouldn't be a real
        # issue
        bulk_data = map(client.transport.serializer.loads, bulk_actions)
        while True:
            try:
                # collect all the information about failed actions
                action = next(bulk_data)
                op_type, action = action.popitem()
                info = {"error": err_message, "status": e.status_code, "exception": e}
                # every operation except delete is followed by a payload line
                if op_type != 'delete':
                    info['data'] = next(bulk_data)
                info.update(action)
                exc_errors.append({op_type: info})
            except StopIteration:
                break

        # emulate standard behavior for failed actions
        if raise_on_error:
            raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
        else:
            for err in exc_errors:
                yield False, err
            return

    # go through request-response pairs and detect failures
    for op_type, item in map(methodcaller('popitem'), resp['items']):
        # a missing status is treated as a failure (defaults to 500)
        ok = 200 <= item.get('status', 500) < 300
        if not ok and raise_on_error:
            errors.append({op_type: item})

        if ok or not errors:
            # if we are not just recording all errors to be able to raise
            # them all at once, yield items individually
            yield ok, {op_type: item}

    if errors:
        raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
                   raise_on_error=True, expand_action_callback=expand_action,
                   raise_on_exception=True, **kwargs):
    """
    Streaming bulk consumes actions from the iterable passed in and yields
    results per action. For non-streaming usecases use
    :func:`~elasticsearch.helpers.bulk` which is a wrapper around streaming
    bulk that returns summary information about the bulk operation once the
    entire input is consumed and sent.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg actions: iterable containing the actions to be executed
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
        from the execution of the last chunk when some occur. By default we raise.
    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
        call to ``bulk`` and just report the items that failed as failed.
    :arg expand_action_callback: callback executed on each action passed in,
        should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).

    Any additional keyword arguments are passed on to the ``bulk`` call made
    for each chunk.
    """
    # expansion is lazy, so actions are only consumed as chunks are built
    actions = map(expand_action_callback, actions)

    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
            yield result
def bulk(client, actions, stats_only=False, **kwargs):
    """
    Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
    a more human friendly interface - it consumes an iterator of actions and
    sends them to elasticsearch in chunks. It returns a tuple with summary
    information - number of successfully executed actions and either list of
    errors or number of errors if `stats_only` is set to `True`.

    See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
    parameters

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg actions: iterator containing the actions
    :arg stats_only: if `True` only report number of successful/failed
        operations instead of just number of successful and a list of error responses

    Any additional keyword arguments will be passed to
    :func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
    the operation.
    """
    success, failed = 0, 0

    # list of errors, only collected if not stats_only
    errors = []

    for ok, item in streaming_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    # second element is a count with stats_only, the list of errors otherwise
    return success, (failed if stats_only else errors)
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1024 * 1024,
                  expand_action_callback=expand_action, **kwargs):
    """
    Parallel version of the bulk helper run in multiple threads at once.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg actions: iterator containing the actions
    :arg thread_count: size of the threadpool to use for the bulk requests
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
        from the execution of the last chunk when some occur. By default we raise.
    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
        call to ``bulk`` and just report the items that failed as failed.
    :arg expand_action_callback: callback executed on each action passed in,
        should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).

    ``raise_on_error``, ``raise_on_exception`` and any other keyword arguments
    are forwarded through ``**kwargs`` to the per-chunk processing.
    """
    # Avoid importing multiprocessing unless parallel_bulk is used
    # to avoid exceptions on restricted environments like App Engine
    from multiprocessing.dummy import Pool
    actions = map(expand_action_callback, actions)

    pool = Pool(thread_count)

    try:
        # each worker fully consumes one chunk's results into a list so the
        # request is finished inside the worker thread
        for result in pool.imap(
            lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
            _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
        ):
            for item in result:
                yield item
    finally:
        # always shut the pool down, even if the consumer stops early or a
        # chunk raised
        pool.close()
        pool.join()
def scan(client, query=None, scroll='5m', raise_on_error=True,
         preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, **kwargs):
    """
    Simple abstraction on top of the
    :meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that
    yields all hits as returned by underlining scroll requests.

    By default scan does not return results in any pre-determined order. To
    have a standard order in the returned documents (either by score or
    explicit sort definition) when scrolling, use ``preserve_order=True``. This
    may be an expensive operation and will negate the performance benefits of
    using ``scan``.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
    :arg scroll: Specify how long a consistent view of the index should be
        maintained for scrolled search
    :arg raise_on_error: raises an exception (``ScanError``) if an error is
        encountered (some shards fail to execute). By default we raise.
    :arg preserve_order: don't set the ``search_type`` to ``scan`` - this will
        cause the scroll to paginate with preserving the order. Note that this
        can be an extremely expensive operation and can easily lead to
        unpredictable results, use with caution.
    :arg size: size (per shard) of the batch send at each iteration.
    :arg request_timeout: explicit timeout for each call to ``scan``
    :arg clear_scroll: explicitly calls delete on the scroll id via the clear
        scroll API at the end of the method on completion or error, defaults
        to true.

    Any additional keyword arguments will be passed to the initial
    :meth:`~elasticsearch.Elasticsearch.search` call::

        scan(es,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*",
            doc_type="books"
        )

    """
    if not preserve_order:
        # work on a copy so the caller's query dict is never modified
        query = query.copy() if query else {}
        query["sort"] = "_doc"

    # initial search
    resp = client.search(body=query, scroll=scroll, size=size,
                         request_timeout=request_timeout, **kwargs)

    scroll_id = resp.get('_scroll_id')
    if scroll_id is None:
        return

    try:
        first_run = True
        while True:
            # if we didn't set search_type to scan initial search contains data
            if first_run:
                first_run = False
            else:
                resp = client.scroll(scroll_id, scroll=scroll, request_timeout=request_timeout)

            for hit in resp['hits']['hits']:
                yield hit

            # check if we have any errors
            if resp["_shards"]["failed"]:
                logger.warning(
                    'Scroll request has failed on %d shards out of %d.',
                    resp['_shards']['failed'], resp['_shards']['total']
                )
                if raise_on_error:
                    raise ScanError(
                        scroll_id,
                        'Scroll request has failed on %d shards out of %d.' %
                        (resp['_shards']['failed'], resp['_shards']['total'])
                    )

            scroll_id = resp.get('_scroll_id')
            # end of scroll
            if scroll_id is None or not resp['hits']['hits']:
                break
    finally:
        # runs on normal completion, on error, and when the consumer abandons
        # the generator; a 404 on the scroll id is ignored
        if scroll_id and clear_scroll:
            client.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))
def reindex(client, source_index, target_index, query=None, target_client=None,
            chunk_size=500, scroll='5m', scan_kwargs={}, bulk_kwargs={}):
    """
    Reindex all documents from one index that satisfy a given query
    to another, potentially (if `target_client` is specified) on a different cluster.
    If you don't specify the query you will reindex all the documents.

    Since ``2.3`` a :meth:`~elasticsearch.Elasticsearch.reindex` api is
    available as part of elasticsearch itself. It is recommended to use the api
    instead of this helper wherever possible. The helper is here mostly for
    backwards compatibility and for situations where more flexibility is
    needed.

    .. note::

        This helper doesn't transfer mappings, just the data.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
        read if `target_client` is specified as well)
    :arg source_index: index (or list of indices) to read documents from
    :arg target_index: name of the index in the target cluster to populate
    :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
    :arg target_client: optional, is specified will be used for writing (thus
        enabling reindex between clusters)
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg scroll: Specify how long a consistent view of the index should be
        maintained for scrolled search
    :arg scan_kwargs: additional kwargs to be passed to
        :func:`~elasticsearch.helpers.scan`
    :arg bulk_kwargs: additional kwargs to be passed to
        :func:`~elasticsearch.helpers.bulk`

    Returns whatever :func:`~elasticsearch.helpers.bulk` returns; ``stats_only``
    defaults to ``True`` here unless overridden through ``bulk_kwargs``.
    """
    # NOTE: the dict defaults for scan_kwargs / bulk_kwargs are only ever read
    # (unpacked / copied from), never mutated, so sharing them across calls is
    # harmless.
    target_client = client if target_client is None else target_client

    docs = scan(client,
                query=query,
                index=source_index,
                scroll=scroll,
                **scan_kwargs)

    def _change_doc_index(hits, index):
        # retarget every hit to the destination index; when the hit carries a
        # 'fields' mapping its entries are lifted to the top level of the hit
        for h in hits:
            h['_index'] = index
            if 'fields' in h:
                h.update(h.pop('fields'))
            yield h

    kwargs = {
        'stats_only': True,
    }
    # caller supplied options win over the default above
    kwargs.update(bulk_kwargs)
    return bulk(target_client, _change_doc_index(docs, target_index),
                chunk_size=chunk_size, **kwargs)
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/helpers/ b/scripts/external_libs/elasticsearch/elasticsearch/helpers/
new file mode 100644
index 00000000..6fdd2ec0
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/helpers/
@@ -0,0 +1,60 @@
import time
import os
try:
    # python 2.6
    from unittest2 import TestCase, SkipTest
except ImportError:
    # unittest2 is not installed, fall back to the standard library
    from unittest import TestCase, SkipTest

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError
def get_test_client(nowait=False, **kwargs):
    """
    Build an ``Elasticsearch`` client for tests and wait for the cluster.

    Configuration is read from the environment:

    * ``TEST_ES_CONNECTION`` - name of a class in ``elasticsearch.connection``
      to use as ``connection_class``
    * ``TEST_ES_SERVER`` - host passed to the client (``{}`` when unset)

    :arg nowait: if ``True`` only a single connection attempt is made instead
        of up to 100 attempts spaced 0.1s apart
    :arg kwargs: extra keyword arguments for the client; they override the
        defaults built here

    :raises SkipTest: when no attempt managed to reach the cluster
    """
    # construct kwargs from the environment
    kw = {'timeout': 30}
    if 'TEST_ES_CONNECTION' in os.environ:
        from elasticsearch import connection
        kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])

    kw.update(kwargs)
    client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)

    # wait for yellow status
    for _ in range(1 if nowait else 100):
        try:
            # the health call is what can raise ConnectionError while the
            # node is still starting; without it the retry loop is a no-op
            client.cluster.health(wait_for_status='yellow')
            return client
        except ConnectionError:
            time.sleep(.1)
    else:
        # timeout
        raise SkipTest("Elasticsearch failed to start.")
def _get_version(version_string):
    """
    Parse a dotted version string into a tuple of ints.

    Components that are not purely numeric (e.g. ``0-beta1``) are mapped to
    ``999``. A string without any dot yields an empty tuple.
    """
    if '.' not in version_string:
        return ()
    version = version_string.strip().split('.')
    return tuple(int(v) if v.isdigit() else 999 for v in version)
class ElasticsearchTestCase(TestCase):
    """
    Base test case that shares one client per class and cleans the cluster
    after every test.

    Subclasses can override ``_get_client`` to supply a differently
    configured client.
    """

    @staticmethod
    def _get_client():
        return get_test_client()

    @classmethod
    def setUpClass(cls):
        super(ElasticsearchTestCase, cls).setUpClass()
        cls.client = cls._get_client()

    def tearDown(self):
        super(ElasticsearchTestCase, self).tearDown()
        # wipe all indices and templates so tests don't leak state into each
        # other; a 404 (nothing to delete) is ignored
        self.client.indices.delete(index='*', ignore=404)
        self.client.indices.delete_template(name='*', ignore=404)

    @property
    def es_version(self):
        """ Version of the server as a tuple of ints, fetched once and cached. """
        if not hasattr(self, '_es_version'):
            version_string = self.client.info()['version']['number']
            self._es_version = _get_version(version_string)
        return self._es_version