#!/usr/bin/env python
# -- Content-Encoding: UTF-8 --
"""
Cached thread pool, inspired by the Pelix/iPOPO Thread Pool

:author: Thomas Calmant
:copyright: Copyright 2015, isandlaTech
:license: Apache License 2.0
:version: 0.2.5
"""

# Documentation strings format
__docformat__ = "restructuredtext en"

# Module version
__version_info__ = (0, 2, 5)
__version__ = ".".join(str(x) for x in __version_info__)

# ------------------------------------------------------------------------------

# Standard library
import logging
import threading

# The queue module has been renamed between Python 2 and Python 3: try the
# Python 3 name first and fall back on the Python 2 one, bound to the same
# local name
try:
    # Python 3
    # pylint: disable=F0401
    import queue
except ImportError:
    # Python 2
    # pylint: disable=F0401
    import Queue as queue

# ------------------------------------------------------------------------------

class EventData(object):
    """
    A threading event with some associated data

    The event is released either with a value (see ``set()``) or with an
    exception (see ``raise_exception()``). In the latter case, the exception
    is raised in the threads calling ``wait()``.
    """
    def __init__(self):
        """
        Sets up the event
        """
        self.__event = threading.Event()
        self.__data = None
        self.__exception = None

    @property
    def data(self):
        """
        Returns the associated value
        """
        return self.__data

    @property
    def exception(self):
        """
        Returns the exception used to stop the wait() method
        """
        return self.__exception

    def clear(self):
        """
        Clears the event, and forgets its associated value and exception
        """
        self.__event.clear()
        self.__data = None
        self.__exception = None

    def is_set(self):
        """
        Checks if the event is set

        :return: True if the event is set, else False
        """
        return self.__event.is_set()

    def set(self, data=None):
        """
        Sets the event

        :param data: The value to associate to the event (optional)
        """
        # Store the value before releasing the waiting threads, to ensure
        # they read it once woken up
        self.__data = data
        self.__exception = None
        self.__event.set()

    def raise_exception(self, exception):
        """
        Raises an exception in wait()

        :param exception: An Exception object
        """
        # Store the exception before releasing the waiting threads
        self.__data = None
        self.__exception = exception
        self.__event.set()

    def wait(self, timeout=None):
        """
        Waits for the event or for the timeout

        :param timeout: Wait timeout (in seconds)
        :return: True if the event has been set, else False
        :raise Exception: The exception given to raise_exception(), if any
        """
        # The 'or' part is for Python 2.6 (Event.wait() returns None there)
        result = self.__event.wait(timeout) or self.__event.is_set()
        if self.__exception is not None:
            # The event has been released by raise_exception()
            # pylint: disable=E0702
            raise self.__exception

        return result

class FutureResult(object):
    """
    An object to wait for the result of a threaded execution
    """
    def __init__(self, logger=None):
        """
        Sets up the FutureResult object

        :param logger: The Logger to use in case of error (optional)
        """
        self._logger = logger or logging.getLogger(__name__)
        self._done_event = EventData()
        self.__callback = None
        self.__extra = None

    def __notify(self):
        """
        Notify the given callback about the result of the execution

        Errors raised by the callback are logged, not propagated.
        """
        if self.__callback is not None:
            try:
                self.__callback(self._done_event.data,
                                self._done_event.exception,
                                self.__extra)
            except Exception as ex:
                self._logger.exception("Error calling back method: %s", ex)

    def set_callback(self, method, extra=None):
        """
        Sets a callback method, called once the result has been computed or in
        case of exception.

        The callback method must have the following signature:
        ``callback(result, exception, extra)``.

        If the execution has already finished, the callback is called
        immediately, in the calling thread.

        :param method: The method to call back in the end of the execution
        :param extra: Extra parameter to be given to the callback method
        """
        self.__callback = method
        self.__extra = extra
        if self._done_event.is_set():
            # The execution has already finished
            self.__notify()

    def execute(self, method, args, kwargs):
        """
        Execute the given method and stores its result.
        The result is considered "done" even if the method raises an exception

        :param method: The method to execute
        :param args: Method positional arguments
        :param kwargs: Method keyword arguments
        :raise Exception: The exception raised by the method
        """
        # Normalize arguments
        if args is None:
            args = []

        if kwargs is None:
            kwargs = {}

        try:
            # Call the method
            result = method(*args, **kwargs)
        except Exception as ex:
            # Something went wrong: propagate to the event and to the caller
            self._done_event.raise_exception(ex)
            raise
        else:
            # Store the result
            self._done_event.set(result)
        finally:
            # In any case: notify the call back (if any)
            self.__notify()

    def done(self):
        """
        Returns True if the job has finished, else False
        """
        return self._done_event.is_set()

    def result(self, timeout=None):
        """
        Waits up to timeout for the result the threaded job.
        Returns immediately the result if the job has already been done.

        :param timeout: The maximum time to wait for a result (in seconds)
        :return: The value returned by the executed method
        :raise OSError: The timeout raised before the job finished
        :raise Exception: The exception encountered during the call, if any
        """
        # wait() raises the exception stored by execute(), if any
        if not self._done_event.wait(timeout):
            raise OSError("Timeout raised")

        return self._done_event.data

# ------------------------------------------------------------------------------

class ThreadPool(object):
    """
    Executes the tasks stored in a FIFO in a thread pool

    The pool is created stopped: tasks can be queued at any time, but they
    are only executed once ``start()`` has been called.
    """
    def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60,
                 logname=None):
        """
        Sets up the thread pool.

        Threads are kept alive 60 seconds (timeout argument).

        :param max_threads: Maximum size of the thread pool
        :param min_threads: Minimum size of the thread pool
        :param queue_size: Size of the task queue (0 for infinite)
        :param timeout: Queue timeout (in seconds, 60s by default)
        :param logname: Name of the logger
        :raise ValueError: Invalid number of threads
        """
        # Validate parameters
        try:
            max_threads = int(max_threads)
            if max_threads < 1:
                raise ValueError("Pool size must be greater than 0")
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        try:
            # Keep the minimum size in the [0, max_threads] range
            min_threads = int(min_threads)
            if min_threads < 0:
                min_threads = 0
            elif min_threads > max_threads:
                min_threads = max_threads
        except (TypeError, ValueError) as ex:
            raise ValueError("Invalid pool size: {0}".format(ex))

        # The logger
        self._logger = logging.getLogger(logname or __name__)

        # The loop control event: it is set while the pool is stopped, which
        # is the initial state (start() only acts when this event is set)
        self._done_event = threading.Event()
        self._done_event.set()

        # The task queue
        try:
            queue_size = int(queue_size)
        except (TypeError, ValueError):
            # Not a valid integer
            queue_size = 0

        self._queue = queue.Queue(queue_size)
        self._timeout = timeout
        self.__lock = threading.RLock()

        # The thread pool
        self._min_threads = min_threads
        self._max_threads = max_threads
        self._threads = []

        # Thread count
        self._thread_id = 0

        # Current number of threads, active and alive
        self.__nb_threads = 0
        self.__nb_active_threads = 0

    def start(self):
        """
        Starts the thread pool. Does nothing if the pool is already started.
        """
        if not self._done_event.is_set():
            # Stop event not set: we're running
            return

        # Clear the stop event
        self._done_event.clear()

        # Compute the number of threads to start to handle pending tasks
        nb_pending_tasks = self._queue.qsize()
        if nb_pending_tasks > self._max_threads:
            nb_threads = self._max_threads
        elif nb_pending_tasks < self._min_threads:
            nb_threads = self._min_threads
        else:
            nb_threads = nb_pending_tasks

        # Create the threads
        for _ in range(nb_threads):
            self.__start_thread()

    def __start_thread(self):
        """
        Starts a new thread, if possible

        :return: True if a thread has been started, else False
        """
        with self.__lock:
            if self.__nb_threads >= self._max_threads:
                # Can't create more threads
                return False

            if self._done_event.is_set():
                # We're stopped: do nothing
                return False

            # Prepare thread and start it
            name = "{0}-{1}".format(self._logger.name, self._thread_id)
            self._thread_id += 1

            thread = threading.Thread(target=self.__run, name=name)
            thread.daemon = True
            # Keep a reference to the thread, to wait for it in stop()
            self._threads.append(thread)
            thread.start()
            return True

    def stop(self):
        """
        Stops the thread pool. Does nothing if the pool is already stopped.

        Waits for the threads to end, then empties the task queue.
        """
        if self._done_event.is_set():
            # Stop event set: we're stopped
            return

        # Set the stop event
        self._done_event.set()

        with self.__lock:
            # Add something in the queue (to unlock the join())
            try:
                for _ in self._threads:
                    self._queue.put(self._done_event, True, self._timeout)
            except queue.Full:
                # There is already something in the queue
                pass

            # Copy the list of threads to wait for
            threads = self._threads[:]

        # Join threads outside the lock
        for thread in threads:
            while thread.is_alive():
                # Wait 3 seconds
                thread.join(3)
                if thread.is_alive():
                    # Thread is still alive: something might be wrong
                    self._logger.warning("Thread %s is still alive...",
                                         thread.name)

        # Clear storage
        del self._threads[:]

        # Drop what remains in the queue (including the stop markers which
        # have not been consumed), so that a later start() begins clean
        self.clear()

    def enqueue(self, method, *args, **kwargs):
        """
        Queues a task in the pool

        :param method: Method to call
        :return: A FutureResult object, to get the result of the task
        :raise ValueError: Invalid method
        :raise Full: The task queue is full
        """
        if not hasattr(method, '__call__'):
            # Format the object itself: an invalid method may not have a
            # __name__ attribute
            raise ValueError("{0} has no __call__ member."
                             .format(method))

        # Prepare the future result object
        future = FutureResult(self._logger)

        # Use a lock, as we might be "resetting" the queue
        with self.__lock:
            # Add the task to the queue
            self._queue.put((method, args, kwargs, future), True,
                            self._timeout)

            if self.__nb_active_threads == self.__nb_threads:
                # All threads are taken: start a new one
                self.__start_thread()

        return future

    def clear(self):
        """
        Empties the current queue content.
        Returns once the queue have been emptied.
        """
        with self.__lock:
            # Empty the current queue
            try:
                while True:
                    self._queue.get_nowait()
                    # Keep the count of unfinished tasks consistent
                    self._queue.task_done()
            except queue.Empty:
                # Queue is now empty
                pass

            # Wait for the tasks currently executed
            self.join()

    def join(self, timeout=None):
        """
        Waits for all the tasks to be executed

        :param timeout: Maximum time to wait (in seconds)
        :return: True if the queue has been emptied, else False
        """
        if self._queue.empty():
            # Nothing to wait for...
            return True
        elif timeout is None:
            # Use the original join
            self._queue.join()
            return True
        else:
            # Wait for the condition
            with self._queue.all_tasks_done:
                self._queue.all_tasks_done.wait(timeout)
                return not bool(self._queue.unfinished_tasks)

    def __run(self):
        """
        The main loop

        Executes the queued tasks until the pool is stopped, a stop marker
        is read from the queue, or this thread is found to be in excess of
        the minimum pool size.
        """
        with self.__lock:
            self.__nb_threads += 1

        while not self._done_event.is_set():
            try:
                # Wait for an action (blocking)
                task = self._queue.get(True, self._timeout)
            except queue.Empty:
                # Nothing to do yet
                pass
            else:
                if task is self._done_event:
                    # Stop event in the queue: get out
                    self._queue.task_done()
                    with self.__lock:
                        self.__nb_threads -= 1
                    return

                with self.__lock:
                    self.__nb_active_threads += 1

                # Extract elements
                method, args, kwargs, future = task
                try:
                    # Call the method
                    future.execute(method, args, kwargs)
                except Exception as ex:
                    # Some callables (partial objects, ...) have no __name__
                    self._logger.exception("Error executing %s: %s",
                                           getattr(method, '__name__', method),
                                           ex)
                finally:
                    # Mark the action as executed
                    self._queue.task_done()

                    # Thread is not active anymore (same lock as the
                    # increment, as "-=" is not an atomic operation)
                    with self.__lock:
                        self.__nb_active_threads -= 1

            # Clean up thread if necessary
            with self.__lock:
                if self.__nb_threads > self._min_threads:
                    # No more work for this thread, and we're above the
                    # minimum number of threads: stop this one
                    self.__nb_threads -= 1
                    return

        with self.__lock:
            # Thread stops
            self.__nb_threads -= 1