Diffstat (limited to 'scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py')
-rw-r--r--    scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py    375
1 files changed, 375 insertions, 0 deletions
diff --git a/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py b/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py
new file mode 100644
index 00000000..953b644f
--- /dev/null
+++ b/scripts/external_libs/elasticsearch/elasticsearch/helpers/__init__.py
@@ -0,0 +1,375 @@
+from __future__ import unicode_literals
+
+import logging
+from operator import methodcaller
+
+from ..exceptions import ElasticsearchException, TransportError
+from ..compat import map, string_types
+
+logger = logging.getLogger('elasticsearch.helpers')
+
+class BulkIndexError(ElasticsearchException):
+ @property
+ def errors(self):
+ """ List of errors from execution of the last chunk. """
+ return self.args[1]
+
+
+class ScanError(ElasticsearchException):
+ def __init__(self, scroll_id, *args, **kwargs):
+ super(ScanError, self).__init__(*args, **kwargs)
+ self.scroll_id = scroll_id
+
+def expand_action(data):
+ """
+ From one document or action definition passed in by the user extract the
+ action/data lines needed for elasticsearch's
+ :meth:`~elasticsearch.Elasticsearch.bulk` api.
+ """
+ # when given a string, assume user wants to index raw json
+ if isinstance(data, string_types):
+ return '{"index":{}}', data
+
+ # make sure we don't alter the action
+ data = data.copy()
+ op_type = data.pop('_op_type', 'index')
+ action = {op_type: {}}
+ for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp',
+ '_ttl', '_type', '_version', '_version_type', '_id', '_retry_on_conflict'):
+ if key in data:
+ action[op_type][key] = data.pop(key)
+
+ # no data payload for delete
+ if op_type == 'delete':
+ return action, None
+
+ return action, data.get('_source', data)
+
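For reference, a rough sketch of what this helper returns for a typical action dict (the index name, id and field below are made up)::

    from elasticsearch.helpers import expand_action

    action, data = expand_action({
        '_op_type': 'index',     # default when omitted
        '_index': 'test-index',  # hypothetical index name
        '_id': 42,
        'title': 'Hello',
    })
    # action -> {'index': {'_index': 'test-index', '_id': 42}}
    # data   -> {'title': 'Hello'}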
+def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
+ """
+ Split actions into chunks by number or size, serialize them into strings in
+ the process.
+ """
+ bulk_actions = []
+ size, action_count = 0, 0
+ for action, data in actions:
+ action = serializer.dumps(action)
+ cur_size = len(action) + 1
+
+ if data is not None:
+ data = serializer.dumps(data)
+ cur_size += len(data) + 1
+
+ # full chunk, send it and start a new one
+ if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
+ yield bulk_actions
+ bulk_actions = []
+ size, action_count = 0, 0
+
+ bulk_actions.append(action)
+ if data is not None:
+ bulk_actions.append(data)
+ size += cur_size
+ action_count += 1
+
+ if bulk_actions:
+ yield bulk_actions
+
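A minimal sketch of the chunking behaviour, importing the private helpers and the package's JSONSerializer purely for illustration (the documents are hypothetical)::

    from elasticsearch.serializer import JSONSerializer
    from elasticsearch.helpers import expand_action, _chunk_actions

    actions = (expand_action({'_id': i, 'value': i}) for i in range(10))
    chunks = list(_chunk_actions(actions, 3, 100 * 1024 * 1024, JSONSerializer()))
    # 10 actions at 3 per chunk -> 4 chunks; each chunk is a list of
    # serialized action lines followed by their data lines
    assert len(chunks) == 4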
+def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
+ """
+ Send a bulk request to elasticsearch and process the output.
+ """
+ # if raise on error is set, we need to collect errors per chunk before raising them
+ errors = []
+
+ try:
+ # send the actual request
+ resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
+ except TransportError as e:
+ # default behavior - just propagate exception
+ if raise_on_exception:
+ raise e
+
+ # if we are not propagating, mark all actions in current chunk as failed
+ err_message = str(e)
+ exc_errors = []
+
+        # deserialize the data back, this is expensive but only run on

+ # errors if raise_on_exception is false, so shouldn't be a real
+ # issue
+ bulk_data = map(client.transport.serializer.loads, bulk_actions)
+ while True:
+ try:
+ # collect all the information about failed actions
+ action = next(bulk_data)
+ op_type, action = action.popitem()
+ info = {"error": err_message, "status": e.status_code, "exception": e}
+ if op_type != 'delete':
+ info['data'] = next(bulk_data)
+ info.update(action)
+ exc_errors.append({op_type: info})
+ except StopIteration:
+ break
+
+ # emulate standard behavior for failed actions
+ if raise_on_error:
+ raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
+ else:
+ for err in exc_errors:
+ yield False, err
+ return
+
+    # go through request-response pairs and detect failures
+ for op_type, item in map(methodcaller('popitem'), resp['items']):
+ ok = 200 <= item.get('status', 500) < 300
+ if not ok and raise_on_error:
+ errors.append({op_type: item})
+
+ if ok or not errors:
+ # if we are not just recording all errors to be able to raise
+ # them all at once, yield items individually
+ yield ok, {op_type: item}
+
+ if errors:
+ raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
+
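For context, the body sent to ``client.bulk`` is just the serialized lines joined by newlines with a trailing newline; a rough sketch with two hypothetical documents::

    # two index actions, in the shape _chunk_actions emits them
    bulk_actions = [
        '{"index":{"_id":1}}', '{"f":"a"}',
        '{"index":{"_id":2}}', '{"f":"b"}',
    ]
    body = '\n'.join(bulk_actions) + '\n'
    # body is the newline-delimited payload the bulk API expects;
    # _process_bulk_chunk then yields one (ok, {op_type: item}) pair per action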
+def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
+ raise_on_error=True, expand_action_callback=expand_action,
+ raise_on_exception=True, **kwargs):
+ """
+ Streaming bulk consumes actions from the iterable passed in and yields
+    results per action. For non-streaming use cases use
+ :func:`~elasticsearch.helpers.bulk` which is a wrapper around streaming
+ bulk that returns summary information about the bulk operation once the
+ entire input is consumed and sent.
+
+
+ :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+ :arg actions: iterable containing the actions to be executed
+ :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+ :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
+ :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
+ from the execution of the last chunk when some occur. By default we raise.
+ :arg raise_on_exception: if ``False`` then don't propagate exceptions from
+ call to ``bulk`` and just report the items that failed as failed.
+ :arg expand_action_callback: callback executed on each action passed in,
+ should return a tuple containing the action line and the data line
+ (`None` if data line should be omitted).
+ """
+ actions = map(expand_action_callback, actions)
+
+ for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
+ for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
+ yield result
+
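A minimal usage sketch (assumes a node reachable on localhost:9200; the index name and documents are hypothetical)::

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import streaming_bulk

    es = Elasticsearch()  # assumes http://localhost:9200

    def generate_docs():
        for i in range(1000):
            yield {'_index': 'test-index', '_type': 'doc', '_id': i, 'value': i}

    for ok, result in streaming_bulk(es, generate_docs(), chunk_size=500,
                                     raise_on_error=False):
        if not ok:
            print('failed: %r' % (result,))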
+def bulk(client, actions, stats_only=False, **kwargs):
+ """
+ Helper for the :meth:`~elasticsearch.Elasticsearch.bulk` api that provides
+ a more human friendly interface - it consumes an iterator of actions and
+ sends them to elasticsearch in chunks. It returns a tuple with summary
+ information - number of successfully executed actions and either list of
+ errors or number of errors if `stats_only` is set to `True`.
+
+ See :func:`~elasticsearch.helpers.streaming_bulk` for more accepted
+ parameters
+
+ :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+ :arg actions: iterator containing the actions
+ :arg stats_only: if `True` only report number of successful/failed
+        operations instead of the number of successful operations and a list of error responses
+
+ Any additional keyword arguments will be passed to
+ :func:`~elasticsearch.helpers.streaming_bulk` which is used to execute
+ the operation.
+ """
+ success, failed = 0, 0
+
+    # list of errors to be collected if not stats_only
+ errors = []
+
+ for ok, item in streaming_bulk(client, actions, **kwargs):
+        # go through request-response pairs and detect failures
+ if not ok:
+ if not stats_only:
+ errors.append(item)
+ failed += 1
+ else:
+ success += 1
+
+ return success, failed if stats_only else errors
+
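A minimal usage sketch of the wrapper (same assumptions as above: local node, made-up index and documents)::

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    es = Elasticsearch()  # assumes a local node
    docs = [{'_index': 'test-index', '_type': 'doc', '_id': i, 'value': i}
            for i in range(1000)]
    success, errors = bulk(es, docs)
    # success -> number of successfully executed actions
    # errors  -> list of error responses (a count instead if stats_only=True)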
+def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
+ max_chunk_bytes=100 * 1024 * 1024,
+ expand_action_callback=expand_action, **kwargs):
+ """
+ Parallel version of the bulk helper run in multiple threads at once.
+
+ :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+ :arg actions: iterator containing the actions
+ :arg thread_count: size of the threadpool to use for the bulk requests
+ :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+ :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
+ :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
+ from the execution of the last chunk when some occur. By default we raise.
+ :arg raise_on_exception: if ``False`` then don't propagate exceptions from
+ call to ``bulk`` and just report the items that failed as failed.
+ :arg expand_action_callback: callback executed on each action passed in,
+ should return a tuple containing the action line and the data line
+ (`None` if data line should be omitted).
+ """
+ # Avoid importing multiprocessing unless parallel_bulk is used
+ # to avoid exceptions on restricted environments like App Engine
+ from multiprocessing.dummy import Pool
+ actions = map(expand_action_callback, actions)
+
+ pool = Pool(thread_count)
+
+ try:
+ for result in pool.imap(
+ lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
+ _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
+ ):
+ for item in result:
+ yield item
+
+ finally:
+ pool.close()
+ pool.join()
+
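Note that ``parallel_bulk`` is itself a generator, so it must be consumed for anything to be sent; a minimal sketch (local node, hypothetical documents)::

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import parallel_bulk

    es = Elasticsearch()  # assumes a local node
    docs = ({'_index': 'test-index', '_type': 'doc', '_id': i, 'value': i}
            for i in range(10000))
    for ok, result in parallel_bulk(es, docs, thread_count=4, chunk_size=500):
        if not ok:
            print(result)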
+def scan(client, query=None, scroll='5m', raise_on_error=True,
+ preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, **kwargs):
+ """
+ Simple abstraction on top of the
+ :meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that
+    yields all hits as returned by underlying scroll requests.
+
+ By default scan does not return results in any pre-determined order. To
+ have a standard order in the returned documents (either by score or
+ explicit sort definition) when scrolling, use ``preserve_order=True``. This
+ may be an expensive operation and will negate the performance benefits of
+ using ``scan``.
+
+ :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
+ :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
+ :arg scroll: Specify how long a consistent view of the index should be
+ maintained for scrolled search
+ :arg raise_on_error: raises an exception (``ScanError``) if an error is
+ encountered (some shards fail to execute). By default we raise.
+ :arg preserve_order: don't set the ``search_type`` to ``scan`` - this will
+        cause the scroll to paginate while preserving the order. Note that this
+ can be an extremely expensive operation and can easily lead to
+ unpredictable results, use with caution.
+    :arg size: size (per shard) of the batch sent at each iteration.
+ :arg request_timeout: explicit timeout for each call to ``scan``
+ :arg clear_scroll: explicitly calls delete on the scroll id via the clear
+ scroll API at the end of the method on completion or error, defaults
+ to true.
+
+ Any additional keyword arguments will be passed to the initial
+ :meth:`~elasticsearch.Elasticsearch.search` call::
+
+ scan(es,
+ query={"query": {"match": {"title": "python"}}},
+ index="orders-*",
+ doc_type="books"
+ )
+
+ """
+ if not preserve_order:
+ query = query.copy() if query else {}
+ query["sort"] = "_doc"
+ # initial search
+ resp = client.search(body=query, scroll=scroll, size=size,
+ request_timeout=request_timeout, **kwargs)
+
+ scroll_id = resp.get('_scroll_id')
+ if scroll_id is None:
+ return
+
+ try:
+ first_run = True
+ while True:
+            # if we didn't set search_type to scan, the initial search contains data
+ if first_run:
+ first_run = False
+ else:
+ resp = client.scroll(scroll_id, scroll=scroll, request_timeout=request_timeout)
+
+ for hit in resp['hits']['hits']:
+ yield hit
+
+            # check if we have any errors
+ if resp["_shards"]["failed"]:
+ logger.warning(
+ 'Scroll request has failed on %d shards out of %d.',
+ resp['_shards']['failed'], resp['_shards']['total']
+ )
+ if raise_on_error:
+ raise ScanError(
+ scroll_id,
+ 'Scroll request has failed on %d shards out of %d.' %
+ (resp['_shards']['failed'], resp['_shards']['total'])
+ )
+
+ scroll_id = resp.get('_scroll_id')
+ # end of scroll
+ if scroll_id is None or not resp['hits']['hits']:
+ break
+ finally:
+ if scroll_id and clear_scroll:
+ client.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))
+
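Each yielded item is the raw hit dict from the scroll response; a minimal sketch iterating over it (local node, hypothetical index pattern)::

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan

    es = Elasticsearch()  # assumes a local node
    for hit in scan(es, query={"query": {"match_all": {}}}, index="orders-*"):
        # hit carries the usual '_index', '_id' and '_source' keys
        print(hit['_id'], hit['_source'])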
+def reindex(client, source_index, target_index, query=None, target_client=None,
+ chunk_size=500, scroll='5m', scan_kwargs={}, bulk_kwargs={}):
+
+ """
+ Reindex all documents from one index that satisfy a given query
+ to another, potentially (if `target_client` is specified) on a different cluster.
+    If you don't specify the query, you will reindex all the documents.
+
+ Since ``2.3`` a :meth:`~elasticsearch.Elasticsearch.reindex` api is
+ available as part of elasticsearch itself. It is recommended to use the api
+ instead of this helper wherever possible. The helper is here mostly for
+ backwards compatibility and for situations where more flexibility is
+ needed.
+
+ .. note::
+
+ This helper doesn't transfer mappings, just the data.
+
+ :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
+ read if `target_client` is specified as well)
+ :arg source_index: index (or list of indices) to read documents from
+ :arg target_index: name of the index in the target cluster to populate
+ :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
+    :arg target_client: optional, if specified will be used for writing (thus
+ enabling reindex between clusters)
+ :arg chunk_size: number of docs in one chunk sent to es (default: 500)
+ :arg scroll: Specify how long a consistent view of the index should be
+ maintained for scrolled search
+ :arg scan_kwargs: additional kwargs to be passed to
+ :func:`~elasticsearch.helpers.scan`
+ :arg bulk_kwargs: additional kwargs to be passed to
+ :func:`~elasticsearch.helpers.bulk`
+ """
+ target_client = client if target_client is None else target_client
+
+ docs = scan(client,
+ query=query,
+ index=source_index,
+ scroll=scroll,
+ **scan_kwargs
+ )
+ def _change_doc_index(hits, index):
+ for h in hits:
+ h['_index'] = index
+ if 'fields' in h:
+ h.update(h.pop('fields'))
+ yield h
+
+ kwargs = {
+ 'stats_only': True,
+ }
+ kwargs.update(bulk_kwargs)
+ return bulk(target_client, _change_doc_index(docs, target_index),
+ chunk_size=chunk_size, **kwargs)
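A minimal usage sketch, reindexing between two clusters (both endpoints and index names are made up); with the default ``stats_only=True`` the helper returns success/failure counts::

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import reindex

    source = Elasticsearch()                            # assumes a local source cluster
    target = Elasticsearch(['http://other-host:9200'])  # hypothetical target cluster
    success, failed = reindex(source, 'old-index', 'new-index',
                              target_client=target,
                              query={"query": {"term": {"lang": "python"}}})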