summaryrefslogtreecommitdiffstats
path: root/external_libs/python/jsonrpclib-pelix-0.2.5/jsonrpclib/threadpool.py
blob: a38b5b8330be14eb69c32c52ba77b6097bdd11b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
#!/usr/bin/env python
# -- Content-Encoding: UTF-8 --
"""
Cached thread pool, inspired from Pelix/iPOPO Thread Pool

:author: Thomas Calmant
:copyright: Copyright 2015, isandlaTech
:license: Apache License 2.0
:version: 0.2.5

..

    Copyright 2015 isandlaTech

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

# Documentation strings format
__docformat__ = "restructuredtext en"

# Module version
__version_info__ = (0, 2, 5)
__version__ = ".".join(str(x) for x in __version_info__)

# ------------------------------------------------------------------------------

# Standard library
import logging
import threading

try:
    # Python 3
    # pylint: disable=F0401
    import queue
except ImportError:
    # Python 2
    # pylint: disable=F0401
    import Queue as queue

# ------------------------------------------------------------------------------


class EventData(object):
    """
    A threading event with some associated data
    """
    def __init__(self):
        """
        Sets up the event
        """
        self.__event = threading.Event()
        self.__data = None
        self.__exception = None

    @property
    def data(self):
        """
        Returns the associated value
        """
        return self.__data

    @property
    def exception(self):
        """
        Returns the exception used to stop the wait() method
        """
        return self.__exception

    def clear(self):
        """
        Clears the event
        """
        self.__event.clear()
        self.__data = None
        self.__exception = None

    def is_set(self):
        """
        Checks if the event is set
        """
        return self.__event.is_set()

    def set(self, data=None):
        """
        Sets the event
        """
        self.__data = data
        self.__exception = None
        self.__event.set()

    def raise_exception(self, exception):
        """
        Raises an exception in wait()

        :param exception: An Exception object
        """
        self.__data = None
        self.__exception = exception
        self.__event.set()

    def wait(self, timeout=None):
        """
        Waits for the event or for the timeout

        :param timeout: Wait timeout (in seconds)
        :return: True if the event as been set, else False
        """
        # The 'or' part is for Python 2.6
        result = self.__event.wait(timeout) or self.__event.is_set()
        # pylint: disable=E0702
        # Pylint seems to miss the "is None" check below
        if self.__exception is None:
            return result
        else:
            raise self.__exception


class FutureResult(object):
    """
    An object to wait for the result of a threaded execution
    """
    def __init__(self, logger=None):
        """
        Sets up the FutureResult object

        :param logger: The Logger to use in case of error (optional)
        """
        self._logger = logger or logging.getLogger(__name__)
        self._done_event = EventData()
        self.__callback = None
        self.__extra = None

    def __notify(self):
        """
        Notify the given callback about the result of the execution
        """
        if self.__callback is not None:
            try:
                self.__callback(self._done_event.data,
                                self._done_event.exception,
                                self.__extra)
            except Exception as ex:
                self._logger.exception("Error calling back method: %s", ex)

    def set_callback(self, method, extra=None):
        """
        Sets a callback method, called once the result has been computed or in
        case of exception.

        The callback method must have the following signature:
        ``callback(result, exception, extra)``.

        :param method: The method to call back in the end of the execution
        :param extra: Extra parameter to be given to the callback method
        """
        self.__callback = method
        self.__extra = extra
        if self._done_event.is_set():
            # The execution has already finished
            self.__notify()

    def execute(self, method, args, kwargs):
        """
        Execute the given method and stores its result.
        The result is considered "done" even if the method raises an exception

        :param method: The method to execute
        :param args: Method positional arguments
        :param kwargs: Method keyword arguments
        :raise Exception: The exception raised by the method
        """
        # Normalize arguments
        if args is None:
            args = []

        if kwargs is None:
            kwargs = {}

        try:
            # Call the method
            result = method(*args, **kwargs)
        except Exception as ex:
            # Something went wrong: propagate to the event and to the caller
            self._done_event.raise_exception(ex)
            raise
        else:
            # Store the result
            self._done_event.set(result)
        finally:
            # In any case: notify the call back (if any)
            self.__notify()

    def done(self):
        """
        Returns True if the job has finished, else False
        """
        return self._done_event.is_set()

    def result(self, timeout=None):
        """
        Waits up to timeout for the result the threaded job.
        Returns immediately the result if the job has already been done.

        :param timeout: The maximum time to wait for a result (in seconds)
        :raise OSError: The timeout raised before the job finished
        :raise Exception: The exception encountered during the call, if any
        """
        if self._done_event.wait(timeout):
            return self._done_event.data
        else:
            raise OSError("Timeout raised")

# ------------------------------------------------------------------------------


class ThreadPool(object):
    """
    Executes the tasks stored in a FIFO in a thread pool
    """
    def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60,
                 logname=None):
        """
        Sets up the thread pool.

        Threads are kept alive 60 seconds (timeout argument).

        :param max_threads: Maximum size of the thread pool
        :param min_threads: Minimum size of the thread pool
        :param queue_size: Size of the task queue (0 for infinite)
        :param timeout: Queue timeout (in seconds, 60s by default)
        :param logname: Name of the logger
        :raise ValueError: Invalid number of threads
        """
        # Validate parameters
        try:
            max_threads = int(max_threads)
            if max_threads < 1:
                raise ValueError("Pool size must be greater than 0")
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        try:
            min_threads = int(min_threads)
            if min_threads < 0:
                min_threads = 0
            elif min_threads > max_threads:
                min_threads = max_threads
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        # The logger
        self._logger = logging.getLogger(logname or __name__)

        # The loop control event
        self._done_event = threading.Event()
        self._done_event.set()

        # The task queue
        try:
            queue_size = int(queue_size)
        except (TypeError, ValueError):
            # Not a valid integer
            queue_size = 0

        self._queue = queue.Queue(queue_size)
        self._timeout = timeout
        self.__lock = threading.RLock()

        # The thread pool
        self._min_threads = min_threads
        self._max_threads = max_threads
        self._threads = []

        # Thread count
        self._thread_id = 0

        # Current number of threads, active and alive
        self.__nb_threads = 0
        self.__nb_active_threads = 0

    def start(self):
        """
        Starts the thread pool. Does nothing if the pool is already started.
        """
        if not self._done_event.is_set():
            # Stop event not set: we're running
            return

        # Clear the stop event
        self._done_event.clear()

        # Compute the number of threads to start to handle pending tasks
        nb_pending_tasks = self._queue.qsize()
        if nb_pending_tasks > self._max_threads:
            nb_threads = self._max_threads
        elif nb_pending_tasks < self._min_threads:
            nb_threads = self._min_threads
        else:
            nb_threads = nb_pending_tasks

        # Create the threads
        for _ in range(nb_threads):
            self.__start_thread()

    def __start_thread(self):
        """
        Starts a new thread, if possible
        """
        with self.__lock:
            if self.__nb_threads >= self._max_threads:
                # Can't create more threads
                return False

            if self._done_event.is_set():
                # We're stopped: do nothing
                return False

            # Prepare thread and start it
            name = "{0}-{1}".format(self._logger.name, self._thread_id)
            self._thread_id += 1

            thread = threading.Thread(target=self.__run, name=name)
            thread.daemon = True
            self._threads.append(thread)
            thread.start()
            return True

    def stop(self):
        """
        Stops the thread pool. Does nothing if the pool is already stopped.
        """
        if self._done_event.is_set():
            # Stop event set: we're stopped
            return

        # Set the stop event
        self._done_event.set()

        with self.__lock:
            # Add something in the queue (to unlock the join())
            try:
                for _ in self._threads:
                    self._queue.put(self._done_event, True, self._timeout)
            except queue.Full:
                # There is already something in the queue
                pass

            # Copy the list of threads to wait for
            threads = self._threads[:]

        # Join threads outside the lock
        for thread in threads:
            while thread.is_alive():
                # Wait 3 seconds
                thread.join(3)
                if thread.is_alive():
                    # Thread is still alive: something might be wrong
                    self._logger.warning("Thread %s is still alive...",
                                         thread.name)

        # Clear storage
        del self._threads[:]
        self.clear()

    def enqueue(self, method, *args, **kwargs):
        """
        Queues a task in the pool

        :param method: Method to call
        :return: A FutureResult object, to get the result of the task
        :raise ValueError: Invalid method
        :raise Full: The task queue is full
        """
        if not hasattr(method, '__call__'):
            raise ValueError("{0} has no __call__ member."
                             .format(method.__name__))

        # Prepare the future result object
        future = FutureResult(self._logger)

        # Use a lock, as we might be "resetting" the queue
        with self.__lock:
            # Add the task to the queue
            self._queue.put((method, args, kwargs, future), True,
                            self._timeout)

            if self.__nb_active_threads == self.__nb_threads:
                # All threads are taken: start a new one
                self.__start_thread()

        return future

    def clear(self):
        """
        Empties the current queue content.
        Returns once the queue have been emptied.
        """
        with self.__lock:
            # Empty the current queue
            try:
                while True:
                    self._queue.get_nowait()
                    self._queue.task_done()
            except queue.Empty:
                # Queue is now empty
                pass

            # Wait for the tasks currently executed
            self.join()

    def join(self, timeout=None):
        """
        Waits for all the tasks to be executed

        :param timeout: Maximum time to wait (in seconds)
        :return: True if the queue has been emptied, else False
        """
        if self._queue.empty():
            # Nothing to wait for...
            return True
        elif timeout is None:
            # Use the original join
            self._queue.join()
            return True
        else:
            # Wait for the condition
            with self._queue.all_tasks_done:
                self._queue.all_tasks_done.wait(timeout)
                return not bool(self._queue.unfinished_tasks)

    def __run(self):
        """
        The main loop
        """
        with self.__lock:
            self.__nb_threads += 1

        while not self._done_event.is_set():
            try:
                # Wait for an action (blocking)
                task = self._queue.get(True, self._timeout)
                if task is self._done_event:
                    # Stop event in the queue: get out
                    self._queue.task_done()
                    with self.__lock:
                        self.__nb_threads -= 1
                        return
            except queue.Empty:
                # Nothing to do yet
                pass
            else:
                with self.__lock:
                    self.__nb_active_threads += 1

                # Extract elements
                method, args, kwargs, future = task
                try:
                    # Call the method
                    future.execute(method, args, kwargs)
                except Exception as ex:
                    self._logger.exception("Error executing %s: %s",
                                           method.__name__, ex)
                finally:
                    # Mark the action as executed
                    self._queue.task_done()

                    # Thread is not active anymore
                    self.__nb_active_threads -= 1

            # Clean up thread if necessary
            with self.__lock:
                if self.__nb_threads > self._min_threads:
                    # No more work for this thread, and we're above the
                    # minimum number of threads: stop this one
                    self.__nb_threads -= 1
                    return

        with self.__lock:
            # Thread stops
            self.__nb_threads -= 1