summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/elasticsearch/elasticsearch/connection_pool.py
blob: 3ec43edfb24614e02490142b5aa343cdf7ca125e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import time
import random
import logging

try:
    from Queue import PriorityQueue, Empty
except ImportError:
    from queue import PriorityQueue, Empty

from .exceptions import ImproperlyConfigured

logger = logging.getLogger('elasticsearch')

class ConnectionSelector(object):
    """
    Base class for strategies that pick one connection out of the list of
    connections that are live at the moment of the call.

    At construction time the selector receives a dictionary with the options
    of every connection, which it may consult later while selecting. Each call
    to `select` is handed the list of *currently* live connections.

    The options dictionary is the one passed to
    :class:`~elasticsearch.Transport` as the `hosts` param, i.e. the same data
    used to build the Connection object itself. For connections created from
    information obtained from the cluster through sniffing, it is the
    dictionary returned by the `host_info_callback`.

    A typical use case is a zone-aware selector that prefers connections from
    its own zone and only falls back to other connections when its own zone
    has none available.
    """
    def __init__(self, opts):
        """
        :arg opts: dictionary of connection instances and their options
        """
        self.connection_opts = opts

    def select(self, connections):
        """
        Pick one connection out of the supplied list.

        The base implementation makes no choice and returns ``None``;
        subclasses override this method with an actual strategy.

        :arg connections: list of live connections to choose from
        """
        return None


class RandomSelector(ConnectionSelector):
    """
    Selector that picks one of the live connections at random.
    """
    def select(self, connections):
        """
        Return a randomly chosen element of `connections`.

        :arg connections: list of live connections to choose from
        """
        picked = random.choice(connections)
        return picked


class RoundRobinSelector(ConnectionSelector):
    """
    Selector that cycles through the live connections in order
    (round-robin).
    """
    def __init__(self, opts):
        """
        :arg opts: dictionary of connection instances and their options
        """
        super(RoundRobinSelector, self).__init__(opts)
        # position of the most recently returned connection; starts at -1 so
        # that the very first call to `select` yields index 0
        self.rr = -1

    def select(self, connections):
        """
        Return the connection following the one returned by the previous
        call, wrapping around at the end of the list.

        :arg connections: list of live connections to choose from
        """
        # Work on a local index and use that same local for the lookup.
        # Re-reading self.rr after updating it would let another thread bump
        # the counter in between, producing an index past the end of
        # `connections`.
        index = (self.rr + 1) % len(connections)
        self.rr = index
        return connections[index]

class ConnectionPool(object):
    """
    Container holding the :class:`~elasticsearch.Connection` instances,
    managing the selection process (via a
    :class:`~elasticsearch.ConnectionSelector`) and dead connections.

    It's only interactions are with the :class:`~elasticsearch.Transport` class
    that drives all the actions within `ConnectionPool`.

    Initially connections are stored on the class as a list and, along with the
    connection options, get passed to the `ConnectionSelector` instance for
    future reference.

    Upon each request the `Transport` will ask for a `Connection` via the
    `get_connection` method. If the connection fails (it's `perform_request`
    raises a `ConnectionError`) it will be marked as dead (via `mark_dead`) and
    put on a timeout (if it fails N times in a row the timeout is exponentially
    longer - the formula is `default_timeout * 2 ** (fail_count - 1)`). When
    the timeout is over the connection will be resurrected and returned to the
    live pool. A connection that has been peviously marked as dead and
    succeedes will be marked as live (it's fail count will be deleted).
    """
    def __init__(self, connections, dead_timeout=60, timeout_cutoff=5,
        selector_class=RoundRobinSelector, randomize_hosts=True, **kwargs):
        """
        :arg connections: list of tuples containing the
            :class:`~elasticsearch.Connection` instance and it's options
        :arg dead_timeout: number of seconds a connection should be retired for
            after a failure, increases on consecutive failures
        :arg timeout_cutoff: number of consecutive failures after which the
            timeout doesn't increase
        :arg selector_class: :class:`~elasticsearch.ConnectionSelector`
            subclass to use if more than one connection is live
        :arg randomize_hosts: shuffle the list of connections upon arrival to
            avoid dog piling effect across processes
        """
        if not connections:
            raise ImproperlyConfigured("No defined connections, you need to "
                    "specify at least one host.")
        self.connection_opts = connections
        self.connections = [c for (c, opts) in connections]
        # remember original connection list for resurrect(force=True)
        self.orig_connections = tuple(self.connections)
        # PriorityQueue for thread safety and ease of timeout management
        self.dead = PriorityQueue(len(self.connections))
        self.dead_count = {}

        if randomize_hosts:
            # randomize the connection list to avoid all clients hitting same node
            # after startup/restart
            random.shuffle(self.connections)

        # default timeout after which to try resurrecting a connection
        self.dead_timeout = dead_timeout
        self.timeout_cutoff = timeout_cutoff

        self.selector = selector_class(dict(connections))

    def mark_dead(self, connection, now=None):
        """
        Mark the connection as dead (failed). Remove it from the live pool and
        put it on a timeout.

        :arg connection: the failed instance
        """
        # allow inject for testing purposes
        now = now if now else time.time()
        try:
            self.connections.remove(connection)
        except ValueError:
            # connection not alive or another thread marked it already, ignore
            return
        else:
            dead_count = self.dead_count.get(connection, 0) + 1
            self.dead_count[connection] = dead_count
            timeout = self.dead_timeout * 2 ** min(dead_count - 1, self.timeout_cutoff)
            self.dead.put((now + timeout, connection))
            logger.warning(
                'Connection %r has failed for %i times in a row, putting on %i second timeout.',
                connection, dead_count, timeout
            )

    def mark_live(self, connection):
        """
        Mark connection as healthy after a resurrection. Resets the fail
        counter for the connection.

        :arg connection: the connection to redeem
        """
        try:
            del self.dead_count[connection]
        except KeyError:
            # race condition, safe to ignore
            pass

    def resurrect(self, force=False):
        """
        Attempt to resurrect a connection from the dead pool. It will try to
        locate one (not all) eligible (it's timeout is over) connection to
        return to the live pool. Any resurrected connection is also returned.

        :arg force: resurrect a connection even if there is none eligible (used
            when we have no live connections). If force is specified resurrect
            always returns a connection.

        """
        # no dead connections
        if self.dead.empty():
            # we are forced to return a connection, take one from the original
            # list. This is to avoid a race condition where get_connection can
            # see no live connections but when it calls resurrect self.dead is
            # also empty. We assume that other threat has resurrected all
            # available connections so we can safely return one at random.
            if force:
                return random.choice(self.orig_connections)
            return

        try:
            # retrieve a connection to check
            timeout, connection = self.dead.get(block=False)
        except Empty:
            # other thread has been faster and the queue is now empty. If we
            # are forced, return a connection at random again.
            if force:
                return random.choice(self.orig_connections)
            return

        if not force and timeout > time.time():
            # return it back if not eligible and not forced
            self.dead.put((timeout, connection))
            return

        # either we were forced or the connection is elligible to be retried
        self.connections.append(connection)
        logger.info('Resurrecting connection %r (force=%s).', connection, force)
        return connection

    def get_connection(self):
        """
        Return a connection from the pool using the `ConnectionSelector`
        instance.

        It tries to resurrect eligible connections, forces a resurrection when
        no connections are availible and passes the list of live connections to
        the selector instance to choose from.

        Returns a connection instance and it's current fail count.
        """
        self.resurrect()
        connections = self.connections[:]

        # no live nodes, resurrect one by force and return it
        if not connections:
            return self.resurrect(True)

        # only call selector if we have a selection
        if len(connections) > 1:
            return self.selector.select(connections)

        # only one connection, no need for a selector
        return connections[0]

    def close(self):
        """
        Explicitly closes connections
        """
        for conn in self.orig_connections:
            conn.close()

class DummyConnectionPool(ConnectionPool):
    """
    Pool variant wrapping exactly one connection: `get_connection` always
    hands back that connection and the dead/live bookkeeping methods do
    nothing.
    """
    def __init__(self, connections, **kwargs):
        """
        :arg connections: list holding exactly one tuple of connection
            instance and its options
        :arg kwargs: accepted and ignored

        Raises `ImproperlyConfigured` unless exactly one entry is given.
        """
        if len(connections) != 1:
            raise ImproperlyConfigured("DummyConnectionPool needs exactly one "
                    "connection defined.")
        sole_entry = connections[0]
        self.connection = sole_entry[0]
        self.connections = (self.connection, )
        # we need connection opts for sniffing logic
        self.connection_opts = connections

    def get_connection(self):
        """
        Return the single wrapped connection.
        """
        return self.connection

    def close(self):
        """
        Explicitly closes connections
        """
        self.connection.close()

    def _noop(self, *args, **kwargs):
        """
        Accept any arguments, do nothing and return ``None``.
        """
        return None

    # with a single connection there is nothing to retire or bring back
    mark_dead = _noop
    mark_live = _noop
    resurrect = _noop