summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py')
-rw-r--r--scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py472
1 files changed, 472 insertions, 0 deletions
diff --git a/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py b/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py
new file mode 100644
index 00000000..9b636ec8
--- /dev/null
+++ b/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py
@@ -0,0 +1,472 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_pidfile.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘pidfile’ module.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+try:
+ # Python 3 standard library.
+ import builtins
+except ImportError:
+ # Python 2 standard library.
+ import __builtin__ as builtins
+import os
+import itertools
+import tempfile
+import errno
+import functools
+try:
+ # Standard library of Python 2.7 and later.
+ from io import StringIO
+except ImportError:
+ # Standard library of Python 2.6 and earlier.
+ from StringIO import StringIO
+
+import mock
+import lockfile
+
+from . import scaffold
+
+import daemon.pidfile
+
+
+class FakeFileDescriptorStringIO(StringIO, object):
+ """ A StringIO class that fakes a file descriptor. """
+
+ _fileno_generator = itertools.count()
+
+ def __init__(self, *args, **kwargs):
+ self._fileno = next(self._fileno_generator)
+ super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)
+
+ def fileno(self):
+ return self._fileno
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+try:
+ FileNotFoundError
+ PermissionError
+except NameError:
+ # Python 2 uses IOError.
+ FileNotFoundError = functools.partial(IOError, errno.ENOENT)
+ PermissionError = functools.partial(IOError, errno.EPERM)
+
+
+def make_pidlockfile_scenarios():
+ """ Make a collection of scenarios for testing `PIDLockFile` instances.
+
+ :return: A collection of scenarios for tests involving
+ `PIDLockfFile` instances.
+
+ The collection is a mapping from scenario name to a dictionary of
+ scenario attributes.
+
+ """
+
+ fake_current_pid = 235
+ fake_other_pid = 8642
+ fake_pidfile_path = tempfile.mktemp()
+
+ fake_pidfile_empty = FakeFileDescriptorStringIO()
+ fake_pidfile_current_pid = FakeFileDescriptorStringIO(
+ "{pid:d}\n".format(pid=fake_current_pid))
+ fake_pidfile_other_pid = FakeFileDescriptorStringIO(
+ "{pid:d}\n".format(pid=fake_other_pid))
+ fake_pidfile_bogus = FakeFileDescriptorStringIO(
+ "b0gUs")
+
+ scenarios = {
+ 'simple': {},
+ 'not-exist': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'not-exist-write-denied': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'not-exist-write-busy': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'exist-read-denied': {
+ 'open_func_name': 'fake_open_read_denied',
+ 'os_open_func_name': 'fake_os_open_read_denied',
+ },
+ 'exist-locked-read-denied': {
+ 'locking_pid': fake_other_pid,
+ 'open_func_name': 'fake_open_read_denied',
+ 'os_open_func_name': 'fake_os_open_read_denied',
+ },
+ 'exist-empty': {},
+ 'exist-invalid': {
+ 'pidfile': fake_pidfile_bogus,
+ },
+ 'exist-current-pid': {
+ 'pidfile': fake_pidfile_current_pid,
+ 'pidfile_pid': fake_current_pid,
+ },
+ 'exist-current-pid-locked': {
+ 'pidfile': fake_pidfile_current_pid,
+ 'pidfile_pid': fake_current_pid,
+ 'locking_pid': fake_current_pid,
+ },
+ 'exist-other-pid': {
+ 'pidfile': fake_pidfile_other_pid,
+ 'pidfile_pid': fake_other_pid,
+ },
+ 'exist-other-pid-locked': {
+ 'pidfile': fake_pidfile_other_pid,
+ 'pidfile_pid': fake_other_pid,
+ 'locking_pid': fake_other_pid,
+ },
+ }
+
+ for scenario in scenarios.values():
+ scenario['pid'] = fake_current_pid
+ scenario['pidfile_path'] = fake_pidfile_path
+ if 'pidfile' not in scenario:
+ scenario['pidfile'] = fake_pidfile_empty
+ if 'pidfile_pid' not in scenario:
+ scenario['pidfile_pid'] = None
+ if 'locking_pid' not in scenario:
+ scenario['locking_pid'] = None
+ if 'open_func_name' not in scenario:
+ scenario['open_func_name'] = 'fake_open_okay'
+ if 'os_open_func_name' not in scenario:
+ scenario['os_open_func_name'] = 'fake_os_open_okay'
+
+ return scenarios
+
+
+def setup_pidfile_fixtures(testcase):
+ """ Set up common fixtures for PID file test cases.
+
+ :param testcase: A `TestCase` instance to decorate.
+
+ Decorate the `testcase` with attributes to be fixtures for tests
+ involving `PIDLockFile` instances.
+
+ """
+ scenarios = make_pidlockfile_scenarios()
+ testcase.pidlockfile_scenarios = scenarios
+
+ def get_scenario_option(testcase, key, default=None):
+ value = default
+ try:
+ value = testcase.scenario[key]
+ except (NameError, TypeError, AttributeError, KeyError):
+ pass
+ return value
+
+ func_patcher_os_getpid = mock.patch.object(
+ os, "getpid",
+ return_value=scenarios['simple']['pid'])
+ func_patcher_os_getpid.start()
+ testcase.addCleanup(func_patcher_os_getpid.stop)
+
+ def make_fake_open_funcs(testcase):
+
+ def fake_open_nonexist(filename, mode, buffering):
+ if mode.startswith('r'):
+ error = FileNotFoundError(
+ "No such file {filename!r}".format(
+ filename=filename))
+ raise error
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_open_read_denied(filename, mode, buffering):
+ if mode.startswith('r'):
+ error = PermissionError(
+ "Read denied on {filename!r}".format(
+ filename=filename))
+ raise error
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_open_okay(filename, mode, buffering):
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_os_open_nonexist(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ error = FileNotFoundError(
+ "No such file {filename!r}".format(
+ filename=filename))
+ raise error
+ return result
+
+ def fake_os_open_read_denied(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ error = PermissionError(
+ "Read denied on {filename!r}".format(
+ filename=filename))
+ raise error
+ return result
+
+ def fake_os_open_okay(filename, flags, mode):
+ result = testcase.scenario['pidfile'].fileno()
+ return result
+
+ funcs = dict(
+ (name, obj) for (name, obj) in vars().items()
+ if callable(obj))
+
+ return funcs
+
+ testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)
+
+ def fake_open(filename, mode='rt', buffering=None):
+ scenario_path = get_scenario_option(testcase, 'pidfile_path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['open_func_name']
+ fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
+ result = fake_open_func(filename, mode, buffering)
+ else:
+ result = FakeFileDescriptorStringIO()
+ return result
+
+ mock_open = mock.mock_open()
+ mock_open.side_effect = fake_open
+
+ func_patcher_builtin_open = mock.patch.object(
+ builtins, "open",
+ new=mock_open)
+ func_patcher_builtin_open.start()
+ testcase.addCleanup(func_patcher_builtin_open.stop)
+
+ def fake_os_open(filename, flags, mode=None):
+ scenario_path = get_scenario_option(testcase, 'pidfile_path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['os_open_func_name']
+ fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
+ result = fake_os_open_func(filename, flags, mode)
+ else:
+ result = FakeFileDescriptorStringIO().fileno()
+ return result
+
+ mock_os_open = mock.MagicMock(side_effect=fake_os_open)
+
+ func_patcher_os_open = mock.patch.object(
+ os, "open",
+ new=mock_os_open)
+ func_patcher_os_open.start()
+ testcase.addCleanup(func_patcher_os_open.stop)
+
+ def fake_os_fdopen(fd, mode='rt', buffering=None):
+ scenario_pidfile = get_scenario_option(
+ testcase, 'pidfile', FakeFileDescriptorStringIO())
+ if fd == testcase.scenario['pidfile'].fileno():
+ result = testcase.scenario['pidfile']
+ else:
+ raise OSError(errno.EBADF, "Bad file descriptor")
+ return result
+
+ mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)
+
+ func_patcher_os_fdopen = mock.patch.object(
+ os, "fdopen",
+ new=mock_os_fdopen)
+ func_patcher_os_fdopen.start()
+ testcase.addCleanup(func_patcher_os_fdopen.stop)
+
+
+def make_lockfile_method_fakes(scenario):
+ """ Make common fake methods for lockfile class.
+
+ :param scenario: A scenario for testing with PIDLockFile.
+ :return: A mapping from normal function name to the corresponding
+ fake function.
+
+ Each fake function behaves appropriately for the specified `scenario`.
+
+ """
+
+ def fake_func_read_pid():
+ return scenario['pidfile_pid']
+ def fake_func_is_locked():
+ return (scenario['locking_pid'] is not None)
+ def fake_func_i_am_locking():
+ return (
+ scenario['locking_pid'] == scenario['pid'])
+ def fake_func_acquire(timeout=None):
+ if scenario['locking_pid'] is not None:
+ raise lockfile.AlreadyLocked()
+ scenario['locking_pid'] = scenario['pid']
+ def fake_func_release():
+ if scenario['locking_pid'] is None:
+ raise lockfile.NotLocked()
+ if scenario['locking_pid'] != scenario['pid']:
+ raise lockfile.NotMyLock()
+ scenario['locking_pid'] = None
+ def fake_func_break_lock():
+ scenario['locking_pid'] = None
+
+ fake_methods = dict(
+ (
+ func_name.replace('fake_func_', ''),
+ mock.MagicMock(side_effect=fake_func))
+ for (func_name, fake_func) in vars().items()
+ if func_name.startswith('fake_func_'))
+
+ return fake_methods
+
+
+def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
+ """ Apply common fake methods to mock lockfile class.
+
+ :param mock_lockfile: An object providing the `LockFile` interface.
+ :param testcase: The `TestCase` instance providing the context for
+ the patch.
+ :param scenario: The `PIDLockFile` test scenario to use.
+
+ Mock the `LockFile` methods of `mock_lockfile`, by applying fake
+ methods customised for `scenario`. The mock is does by a patch
+ within the context of `testcase`.
+
+ """
+ fake_methods = dict(
+ (func_name, fake_func)
+ for (func_name, fake_func) in
+ make_lockfile_method_fakes(scenario).items()
+ if func_name not in ['read_pid'])
+
+ for (func_name, fake_func) in fake_methods.items():
+ func_patcher = mock.patch.object(
+ mock_lockfile, func_name,
+ new=fake_func)
+ func_patcher.start()
+ testcase.addCleanup(func_patcher.stop)
+
+
+def setup_pidlockfile_fixtures(testcase, scenario_name=None):
+ """ Set up common fixtures for PIDLockFile test cases.
+
+ :param testcase: The `TestCase` instance to decorate.
+ :param scenario_name: The name of the `PIDLockFile` scenario to use.
+
+ Decorate the `testcase` with attributes that are fixtures for test
+ cases involving `PIDLockFile` instances.`
+
+ """
+
+ setup_pidfile_fixtures(testcase)
+
+ for func_name in [
+ 'write_pid_to_pidfile',
+ 'remove_existing_pidfile',
+ ]:
+ func_patcher = mock.patch.object(lockfile.pidlockfile, func_name)
+ func_patcher.start()
+ testcase.addCleanup(func_patcher.stop)
+
+
+class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
+ """ Test cases for ‘TimeoutPIDLockFile’ class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(TimeoutPIDLockFile_TestCase, self).setUp()
+
+ pidlockfile_scenarios = make_pidlockfile_scenarios()
+ self.pidlockfile_scenario = pidlockfile_scenarios['simple']
+ pidfile_path = self.pidlockfile_scenario['pidfile_path']
+
+ for func_name in ['__init__', 'acquire']:
+ func_patcher = mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, func_name)
+ func_patcher.start()
+ self.addCleanup(func_patcher.stop)
+
+ self.scenario = {
+ 'pidfile_path': self.pidlockfile_scenario['pidfile_path'],
+ 'acquire_timeout': self.getUniqueInteger(),
+ }
+
+ self.test_kwargs = dict(
+ path=self.scenario['pidfile_path'],
+ acquire_timeout=self.scenario['acquire_timeout'],
+ )
+ self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
+ **self.test_kwargs)
+
+ def test_inherits_from_pidlockfile(self):
+ """ Should inherit from PIDLockFile. """
+ instance = self.test_instance
+ self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile)
+
+ def test_init_has_expected_signature(self):
+ """ Should have expected signature for ‘__init__’. """
+ def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
+ test_func.__name__ = str('__init__')
+ self.assertFunctionSignatureMatch(
+ test_func,
+ daemon.pidfile.TimeoutPIDLockFile.__init__)
+
+ def test_has_specified_acquire_timeout(self):
+ """ Should have specified ‘acquire_timeout’ value. """
+ instance = self.test_instance
+ expected_timeout = self.test_kwargs['acquire_timeout']
+ self.assertEqual(expected_timeout, instance.acquire_timeout)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "__init__",
+ autospec=True)
+ def test_calls_superclass_init(self, mock_init):
+ """ Should call the superclass ‘__init__’. """
+ expected_path = self.test_kwargs['path']
+ instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
+ mock_init.assert_called_with(instance, expected_path)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "acquire",
+ autospec=True)
+ def test_acquire_uses_specified_timeout(self, mock_func_acquire):
+ """ Should call the superclass ‘acquire’ with specified timeout. """
+ instance = self.test_instance
+ test_timeout = self.getUniqueInteger()
+ expected_timeout = test_timeout
+ instance.acquire(test_timeout)
+ mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "acquire",
+ autospec=True)
+ def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
+ """ Should call superclass ‘acquire’ with stored timeout by default. """
+ instance = self.test_instance
+ test_timeout = self.test_kwargs['acquire_timeout']
+ expected_timeout = test_timeout
+ instance.acquire()
+ mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :