summaryrefslogtreecommitdiffstats
path: root/external_libs/python/python-daemon-2.0.5/test
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/python-daemon-2.0.5/test')
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/__init__.py23
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/scaffold.py322
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/test_daemon.py1744
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/test_metadata.py380
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/test_pidfile.py472
-rw-r--r--external_libs/python/python-daemon-2.0.5/test/test_runner.py675
6 files changed, 0 insertions, 3616 deletions
diff --git a/external_libs/python/python-daemon-2.0.5/test/__init__.py b/external_libs/python/python-daemon-2.0.5/test/__init__.py
deleted file mode 100644
index 398519f1..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# test/__init__.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Unit test suite for ‘daemon’ package.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :
diff --git a/external_libs/python/python-daemon-2.0.5/test/scaffold.py b/external_libs/python/python-daemon-2.0.5/test/scaffold.py
deleted file mode 100644
index 9a4f1150..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/scaffold.py
+++ /dev/null
@@ -1,322 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# test/scaffold.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2007–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Scaffolding for unit test modules.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-import unittest
-import doctest
-import logging
-import os
-import sys
-import operator
-import textwrap
-from copy import deepcopy
-import functools
-
-try:
- # Python 2 has both ‘str’ (bytes) and ‘unicode’ (text).
- basestring = basestring
- unicode = unicode
-except NameError:
- # Python 3 names the Unicode data type ‘str’.
- basestring = str
- unicode = str
-
-import testscenarios
-import testtools.testcase
-
-
-test_dir = os.path.dirname(os.path.abspath(__file__))
-parent_dir = os.path.dirname(test_dir)
-if not test_dir in sys.path:
- sys.path.insert(1, test_dir)
-if not parent_dir in sys.path:
- sys.path.insert(1, parent_dir)
-
-# Disable all but the most critical logging messages.
-logging.disable(logging.CRITICAL)
-
-
-def get_function_signature(func):
- """ Get the function signature as a mapping of attributes.
-
- :param func: The function object to interrogate.
- :return: A mapping of the components of a function signature.
-
- The signature is constructed as a mapping:
-
- * 'name': The function's defined name.
- * 'arg_count': The number of arguments expected by the function.
- * 'arg_names': A sequence of the argument names, as strings.
- * 'arg_defaults': A sequence of the default values for the arguments.
- * 'va_args': The name bound to remaining positional arguments.
- * 'va_kw_args': The name bound to remaining keyword arguments.
-
- """
- try:
- # Python 3 function attributes.
- func_code = func.__code__
- func_defaults = func.__defaults__
- except AttributeError:
- # Python 2 function attributes.
- func_code = func.func_code
- func_defaults = func.func_defaults
-
- arg_count = func_code.co_argcount
- arg_names = func_code.co_varnames[:arg_count]
-
- arg_defaults = {}
- if func_defaults is not None:
- arg_defaults = dict(
- (name, value)
- for (name, value) in
- zip(arg_names[::-1], func_defaults[::-1]))
-
- signature = {
- 'name': func.__name__,
- 'arg_count': arg_count,
- 'arg_names': arg_names,
- 'arg_defaults': arg_defaults,
- }
-
- non_pos_names = list(func_code.co_varnames[arg_count:])
- COLLECTS_ARBITRARY_POSITIONAL_ARGS = 0x04
- if func_code.co_flags & COLLECTS_ARBITRARY_POSITIONAL_ARGS:
- signature['var_args'] = non_pos_names.pop(0)
- COLLECTS_ARBITRARY_KEYWORD_ARGS = 0x08
- if func_code.co_flags & COLLECTS_ARBITRARY_KEYWORD_ARGS:
- signature['var_kw_args'] = non_pos_names.pop(0)
-
- return signature
-
-
-def format_function_signature(func):
- """ Format the function signature as printable text.
-
- :param func: The function object to interrogate.
- :return: A formatted text representation of the function signature.
-
- The signature is rendered a text; for example::
-
- foo(spam, eggs, ham=True, beans=None, *args, **kwargs)
-
- """
- signature = get_function_signature(func)
-
- args_text = []
- for arg_name in signature['arg_names']:
- if arg_name in signature['arg_defaults']:
- arg_text = "{name}={value!r}".format(
- name=arg_name, value=signature['arg_defaults'][arg_name])
- else:
- arg_text = "{name}".format(
- name=arg_name)
- args_text.append(arg_text)
- if 'var_args' in signature:
- args_text.append("*{var_args}".format(signature))
- if 'var_kw_args' in signature:
- args_text.append("**{var_kw_args}".format(signature))
- signature_args_text = ", ".join(args_text)
-
- func_name = signature['name']
- signature_text = "{name}({args})".format(
- name=func_name, args=signature_args_text)
-
- return signature_text
-
-
-class TestCase(testtools.testcase.TestCase):
- """ Test case behaviour. """
-
- def failUnlessOutputCheckerMatch(self, want, got, msg=None):
- """ Fail unless the specified string matches the expected.
-
- :param want: The desired output pattern.
- :param got: The actual text to match.
- :param msg: A message to prefix on the failure message.
- :return: ``None``.
- :raises self.failureException: If the text does not match.
-
- Fail the test unless ``want`` matches ``got``, as determined by
- a ``doctest.OutputChecker`` instance. This is not an equality
- check, but a pattern match according to the ``OutputChecker``
- rules.
-
- """
- checker = doctest.OutputChecker()
- want = textwrap.dedent(want)
- source = ""
- example = doctest.Example(source, want)
- got = textwrap.dedent(got)
- checker_optionflags = functools.reduce(operator.or_, [
- doctest.ELLIPSIS,
- ])
- if not checker.check_output(want, got, checker_optionflags):
- if msg is None:
- diff = checker.output_difference(
- example, got, checker_optionflags)
- msg = "\n".join([
- "Output received did not match expected output",
- "{diff}",
- ]).format(
- diff=diff)
- raise self.failureException(msg)
-
- assertOutputCheckerMatch = failUnlessOutputCheckerMatch
-
- def failUnlessFunctionInTraceback(self, traceback, function, msg=None):
- """ Fail if the function is not in the traceback.
-
- :param traceback: The traceback object to interrogate.
- :param function: The function object to match.
- :param msg: A message to prefix on the failure message.
- :return: ``None``.
-
- :raises self.failureException: If the function is not in the
- traceback.
-
- Fail the test if the function ``function`` is not at any of the
- levels in the traceback object ``traceback``.
-
- """
- func_in_traceback = False
- expected_code = function.func_code
- current_traceback = traceback
- while current_traceback is not None:
- if expected_code is current_traceback.tb_frame.f_code:
- func_in_traceback = True
- break
- current_traceback = current_traceback.tb_next
-
- if not func_in_traceback:
- if msg is None:
- msg = (
- "Traceback did not lead to original function"
- " {function}"
- ).format(
- function=function)
- raise self.failureException(msg)
-
- assertFunctionInTraceback = failUnlessFunctionInTraceback
-
- def failUnlessFunctionSignatureMatch(self, first, second, msg=None):
- """ Fail if the function signatures do not match.
-
- :param first: The first function to compare.
- :param second: The second function to compare.
- :param msg: A message to prefix to the failure message.
- :return: ``None``.
-
- :raises self.failureException: If the function signatures do
- not match.
-
- Fail the test if the function signature does not match between
- the ``first`` function and the ``second`` function.
-
- The function signature includes:
-
- * function name,
-
- * count of named parameters,
-
- * sequence of named parameters,
-
- * default values of named parameters,
-
- * collector for arbitrary positional arguments,
-
- * collector for arbitrary keyword arguments.
-
- """
- first_signature = get_function_signature(first)
- second_signature = get_function_signature(second)
-
- if first_signature != second_signature:
- if msg is None:
- first_signature_text = format_function_signature(first)
- second_signature_text = format_function_signature(second)
- msg = (textwrap.dedent("""\
- Function signatures do not match:
- {first!r} != {second!r}
- Expected:
- {first_text}
- Got:
- {second_text}""")
- ).format(
- first=first_signature,
- first_text=first_signature_text,
- second=second_signature,
- second_text=second_signature_text,
- )
- raise self.failureException(msg)
-
- assertFunctionSignatureMatch = failUnlessFunctionSignatureMatch
-
-
-class TestCaseWithScenarios(testscenarios.WithScenarios, TestCase):
- """ Test cases run per scenario. """
-
-
-class Exception_TestCase(TestCaseWithScenarios):
- """ Test cases for exception classes. """
-
- def test_exception_instance(self):
- """ Exception instance should be created. """
- self.assertIsNot(self.instance, None)
-
- def test_exception_types(self):
- """ Exception instance should match expected types. """
- for match_type in self.types:
- self.assertIsInstance(self.instance, match_type)
-
-
-def make_exception_scenarios(scenarios):
- """ Make test scenarios for exception classes.
-
- :param scenarios: Sequence of scenarios.
- :return: List of scenarios with additional mapping entries.
-
- Use this with `testscenarios` to adapt `Exception_TestCase`_ for
- any exceptions that need testing.
-
- Each scenario is a tuple (`name`, `map`) where `map` is a mapping
- of attributes to be applied to each test case. Attributes map must
- contain items for:
-
- :key exc_type:
- The exception type to be tested.
- :key min_args:
- The minimum argument count for the exception instance
- initialiser.
- :key types:
- Sequence of types that should be superclasses of each
- instance of the exception type.
-
- """
- updated_scenarios = deepcopy(scenarios)
- for (name, scenario) in updated_scenarios:
- args = (None,) * scenario['min_args']
- scenario['args'] = args
- instance = scenario['exc_type'](*args)
- scenario['instance'] = instance
-
- return updated_scenarios
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :
diff --git a/external_libs/python/python-daemon-2.0.5/test/test_daemon.py b/external_libs/python/python-daemon-2.0.5/test/test_daemon.py
deleted file mode 100644
index a911858a..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/test_daemon.py
+++ /dev/null
@@ -1,1744 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# test/test_daemon.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Unit test for ‘daemon’ module.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-import os
-import sys
-import tempfile
-import resource
-import errno
-import signal
-import socket
-from types import ModuleType
-import collections
-import functools
-try:
- # Standard library of Python 2.7 and later.
- from io import StringIO
-except ImportError:
- # Standard library of Python 2.6 and earlier.
- from StringIO import StringIO
-
-import mock
-
-from . import scaffold
-from .scaffold import (basestring, unicode)
-from .test_pidfile import (
- FakeFileDescriptorStringIO,
- setup_pidfile_fixtures,
- )
-
-import daemon
-
-
-class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
- """ Test cases for module exception classes. """
-
- scenarios = scaffold.make_exception_scenarios([
- ('daemon.daemon.DaemonError', dict(
- exc_type = daemon.daemon.DaemonError,
- min_args = 1,
- types = [Exception],
- )),
- ('daemon.daemon.DaemonOSEnvironmentError', dict(
- exc_type = daemon.daemon.DaemonOSEnvironmentError,
- min_args = 1,
- types = [daemon.daemon.DaemonError, OSError],
- )),
- ('daemon.daemon.DaemonProcessDetachError', dict(
- exc_type = daemon.daemon.DaemonProcessDetachError,
- min_args = 1,
- types = [daemon.daemon.DaemonError, OSError],
- )),
- ])
-
-
-def setup_daemon_context_fixtures(testcase):
- """ Set up common test fixtures for DaemonContext test case.
-
- :param testcase: A ``TestCase`` instance to decorate.
- :return: ``None``.
-
- Decorate the `testcase` with fixtures for tests involving
- `DaemonContext`.
-
- """
- setup_streams_fixtures(testcase)
-
- setup_pidfile_fixtures(testcase)
-
- testcase.fake_pidfile_path = tempfile.mktemp()
- testcase.mock_pidlockfile = mock.MagicMock()
- testcase.mock_pidlockfile.path = testcase.fake_pidfile_path
-
- testcase.daemon_context_args = dict(
- stdin=testcase.stream_files_by_name['stdin'],
- stdout=testcase.stream_files_by_name['stdout'],
- stderr=testcase.stream_files_by_name['stderr'],
- )
- testcase.test_instance = daemon.DaemonContext(
- **testcase.daemon_context_args)
-
-fake_default_signal_map = object()
-
-@mock.patch.object(
- daemon.daemon, "is_detach_process_context_required",
- new=(lambda: True))
-@mock.patch.object(
- daemon.daemon, "make_default_signal_map",
- new=(lambda: fake_default_signal_map))
-@mock.patch.object(os, "setgid", new=(lambda x: object()))
-@mock.patch.object(os, "setuid", new=(lambda x: object()))
-class DaemonContext_BaseTestCase(scaffold.TestCase):
- """ Base class for DaemonContext test case classes. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_BaseTestCase, self).setUp()
-
- setup_daemon_context_fixtures(self)
-
-
-class DaemonContext_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext class. """
-
- def test_instantiate(self):
- """ New instance of DaemonContext should be created. """
- self.assertIsInstance(
- self.test_instance, daemon.daemon.DaemonContext)
-
- def test_minimum_zero_arguments(self):
- """ Initialiser should not require any arguments. """
- instance = daemon.daemon.DaemonContext()
- self.assertIsNot(instance, None)
-
- def test_has_specified_chroot_directory(self):
- """ Should have specified chroot_directory option. """
- args = dict(
- chroot_directory=object(),
- )
- expected_directory = args['chroot_directory']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_directory, instance.chroot_directory)
-
- def test_has_specified_working_directory(self):
- """ Should have specified working_directory option. """
- args = dict(
- working_directory=object(),
- )
- expected_directory = args['working_directory']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_directory, instance.working_directory)
-
- def test_has_default_working_directory(self):
- """ Should have default working_directory option. """
- args = dict()
- expected_directory = "/"
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_directory, instance.working_directory)
-
- def test_has_specified_creation_mask(self):
- """ Should have specified umask option. """
- args = dict(
- umask=object(),
- )
- expected_mask = args['umask']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_mask, instance.umask)
-
- def test_has_default_creation_mask(self):
- """ Should have default umask option. """
- args = dict()
- expected_mask = 0
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_mask, instance.umask)
-
- def test_has_specified_uid(self):
- """ Should have specified uid option. """
- args = dict(
- uid=object(),
- )
- expected_id = args['uid']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_id, instance.uid)
-
- def test_has_derived_uid(self):
- """ Should have uid option derived from process. """
- args = dict()
- expected_id = os.getuid()
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_id, instance.uid)
-
- def test_has_specified_gid(self):
- """ Should have specified gid option. """
- args = dict(
- gid=object(),
- )
- expected_id = args['gid']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_id, instance.gid)
-
- def test_has_derived_gid(self):
- """ Should have gid option derived from process. """
- args = dict()
- expected_id = os.getgid()
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_id, instance.gid)
-
- def test_has_specified_detach_process(self):
- """ Should have specified detach_process option. """
- args = dict(
- detach_process=object(),
- )
- expected_value = args['detach_process']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_value, instance.detach_process)
-
- def test_has_derived_detach_process(self):
- """ Should have detach_process option derived from environment. """
- args = dict()
- func = daemon.daemon.is_detach_process_context_required
- expected_value = func()
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_value, instance.detach_process)
-
- def test_has_specified_files_preserve(self):
- """ Should have specified files_preserve option. """
- args = dict(
- files_preserve=object(),
- )
- expected_files_preserve = args['files_preserve']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_files_preserve, instance.files_preserve)
-
- def test_has_specified_pidfile(self):
- """ Should have the specified pidfile. """
- args = dict(
- pidfile=object(),
- )
- expected_pidfile = args['pidfile']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_pidfile, instance.pidfile)
-
- def test_has_specified_stdin(self):
- """ Should have specified stdin option. """
- args = dict(
- stdin=object(),
- )
- expected_file = args['stdin']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_file, instance.stdin)
-
- def test_has_specified_stdout(self):
- """ Should have specified stdout option. """
- args = dict(
- stdout=object(),
- )
- expected_file = args['stdout']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_file, instance.stdout)
-
- def test_has_specified_stderr(self):
- """ Should have specified stderr option. """
- args = dict(
- stderr=object(),
- )
- expected_file = args['stderr']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_file, instance.stderr)
-
- def test_has_specified_signal_map(self):
- """ Should have specified signal_map option. """
- args = dict(
- signal_map=object(),
- )
- expected_signal_map = args['signal_map']
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_signal_map, instance.signal_map)
-
- def test_has_derived_signal_map(self):
- """ Should have signal_map option derived from system. """
- args = dict()
- expected_signal_map = daemon.daemon.make_default_signal_map()
- instance = daemon.daemon.DaemonContext(**args)
- self.assertEqual(expected_signal_map, instance.signal_map)
-
-
-class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.is_open property. """
-
- def test_begin_false(self):
- """ Initial value of is_open should be False. """
- instance = self.test_instance
- self.assertEqual(False, instance.is_open)
-
- def test_write_fails(self):
- """ Writing to is_open should fail. """
- instance = self.test_instance
- self.assertRaises(
- AttributeError,
- setattr, instance, 'is_open', object())
-
-
-class DaemonContext_open_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.open method. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_open_TestCase, self).setUp()
-
- self.test_instance._is_open = False
-
- self.mock_module_daemon = mock.MagicMock()
- daemon_func_patchers = dict(
- (func_name, mock.patch.object(
- daemon.daemon, func_name))
- for func_name in [
- "detach_process_context",
- "change_working_directory",
- "change_root_directory",
- "change_file_creation_mask",
- "change_process_owner",
- "prevent_core_dump",
- "close_all_open_files",
- "redirect_stream",
- "set_signal_handlers",
- "register_atexit_function",
- ])
- for (func_name, patcher) in daemon_func_patchers.items():
- mock_func = patcher.start()
- self.addCleanup(patcher.stop)
- self.mock_module_daemon.attach_mock(mock_func, func_name)
-
- self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext')
-
- self.test_files_preserve_fds = object()
- self.test_signal_handler_map = object()
- daemoncontext_method_return_values = {
- '_get_exclude_file_descriptors':
- self.test_files_preserve_fds,
- '_make_signal_handler_map':
- self.test_signal_handler_map,
- }
- daemoncontext_func_patchers = dict(
- (func_name, mock.patch.object(
- daemon.daemon.DaemonContext,
- func_name,
- return_value=return_value))
- for (func_name, return_value) in
- daemoncontext_method_return_values.items())
- for (func_name, patcher) in daemoncontext_func_patchers.items():
- mock_func = patcher.start()
- self.addCleanup(patcher.stop)
- self.mock_module_daemon.DaemonContext.attach_mock(
- mock_func, func_name)
-
- def test_performs_steps_in_expected_sequence(self):
- """ Should perform daemonisation steps in expected sequence. """
- instance = self.test_instance
- instance.chroot_directory = object()
- instance.detach_process = True
- instance.pidfile = self.mock_pidlockfile
- self.mock_module_daemon.attach_mock(
- self.mock_pidlockfile, 'pidlockfile')
- expected_calls = [
- mock.call.change_root_directory(mock.ANY),
- mock.call.prevent_core_dump(),
- mock.call.change_file_creation_mask(mock.ANY),
- mock.call.change_working_directory(mock.ANY),
- mock.call.change_process_owner(mock.ANY, mock.ANY),
- mock.call.detach_process_context(),
- mock.call.DaemonContext._make_signal_handler_map(),
- mock.call.set_signal_handlers(mock.ANY),
- mock.call.DaemonContext._get_exclude_file_descriptors(),
- mock.call.close_all_open_files(exclude=mock.ANY),
- mock.call.redirect_stream(mock.ANY, mock.ANY),
- mock.call.redirect_stream(mock.ANY, mock.ANY),
- mock.call.redirect_stream(mock.ANY, mock.ANY),
- mock.call.pidlockfile.__enter__(),
- mock.call.register_atexit_function(mock.ANY),
- ]
- instance.open()
- self.mock_module_daemon.assert_has_calls(expected_calls)
-
- def test_returns_immediately_if_is_open(self):
- """ Should return immediately if is_open property is true. """
- instance = self.test_instance
- instance._is_open = True
- instance.open()
- self.assertEqual(0, len(self.mock_module_daemon.mock_calls))
-
- def test_changes_root_directory_to_chroot_directory(self):
- """ Should change root directory to `chroot_directory` option. """
- instance = self.test_instance
- chroot_directory = object()
- instance.chroot_directory = chroot_directory
- instance.open()
- self.mock_module_daemon.change_root_directory.assert_called_with(
- chroot_directory)
-
- def test_omits_chroot_if_no_chroot_directory(self):
- """ Should omit changing root directory if no `chroot_directory`. """
- instance = self.test_instance
- instance.chroot_directory = None
- instance.open()
- self.assertFalse(self.mock_module_daemon.change_root_directory.called)
-
- def test_prevents_core_dump(self):
- """ Should request prevention of core dumps. """
- instance = self.test_instance
- instance.open()
- self.mock_module_daemon.prevent_core_dump.assert_called_with()
-
- def test_omits_prevent_core_dump_if_prevent_core_false(self):
- """ Should omit preventing core dumps if `prevent_core` is false. """
- instance = self.test_instance
- instance.prevent_core = False
- instance.open()
- self.assertFalse(self.mock_module_daemon.prevent_core_dump.called)
-
- def test_closes_open_files(self):
- """ Should close all open files, excluding `files_preserve`. """
- instance = self.test_instance
- expected_exclude = self.test_files_preserve_fds
- instance.open()
- self.mock_module_daemon.close_all_open_files.assert_called_with(
- exclude=expected_exclude)
-
- def test_changes_directory_to_working_directory(self):
- """ Should change current directory to `working_directory` option. """
- instance = self.test_instance
- working_directory = object()
- instance.working_directory = working_directory
- instance.open()
- self.mock_module_daemon.change_working_directory.assert_called_with(
- working_directory)
-
- def test_changes_creation_mask_to_umask(self):
- """ Should change file creation mask to `umask` option. """
- instance = self.test_instance
- umask = object()
- instance.umask = umask
- instance.open()
- self.mock_module_daemon.change_file_creation_mask.assert_called_with(
- umask)
-
- def test_changes_owner_to_specified_uid_and_gid(self):
- """ Should change process UID and GID to `uid` and `gid` options. """
- instance = self.test_instance
- uid = object()
- gid = object()
- instance.uid = uid
- instance.gid = gid
- instance.open()
- self.mock_module_daemon.change_process_owner.assert_called_with(
- uid, gid)
-
- def test_detaches_process_context(self):
- """ Should request detach of process context. """
- instance = self.test_instance
- instance.open()
- self.mock_module_daemon.detach_process_context.assert_called_with()
-
- def test_omits_process_detach_if_not_required(self):
- """ Should omit detach of process context if not required. """
- instance = self.test_instance
- instance.detach_process = False
- instance.open()
- self.assertFalse(self.mock_module_daemon.detach_process_context.called)
-
- def test_sets_signal_handlers_from_signal_map(self):
- """ Should set signal handlers according to `signal_map`. """
- instance = self.test_instance
- instance.signal_map = object()
- expected_signal_handler_map = self.test_signal_handler_map
- instance.open()
- self.mock_module_daemon.set_signal_handlers.assert_called_with(
- expected_signal_handler_map)
-
- def test_redirects_standard_streams(self):
- """ Should request redirection of standard stream files. """
- instance = self.test_instance
- (system_stdin, system_stdout, system_stderr) = (
- sys.stdin, sys.stdout, sys.stderr)
- (target_stdin, target_stdout, target_stderr) = (
- self.stream_files_by_name[name]
- for name in ['stdin', 'stdout', 'stderr'])
- expected_calls = [
- mock.call(system_stdin, target_stdin),
- mock.call(system_stdout, target_stdout),
- mock.call(system_stderr, target_stderr),
- ]
- instance.open()
- self.mock_module_daemon.redirect_stream.assert_has_calls(
- expected_calls, any_order=True)
-
- def test_enters_pidfile_context(self):
- """ Should enter the PID file context manager. """
- instance = self.test_instance
- instance.pidfile = self.mock_pidlockfile
- instance.open()
- self.mock_pidlockfile.__enter__.assert_called_with()
-
- def test_sets_is_open_true(self):
- """ Should set the `is_open` property to True. """
- instance = self.test_instance
- instance.open()
- self.assertEqual(True, instance.is_open)
-
- def test_registers_close_method_for_atexit(self):
- """ Should register the `close` method for atexit processing. """
- instance = self.test_instance
- close_method = instance.close
- instance.open()
- self.mock_module_daemon.register_atexit_function.assert_called_with(
- close_method)
-
-
-class DaemonContext_close_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.close method. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_close_TestCase, self).setUp()
-
- self.test_instance._is_open = True
-
- def test_returns_immediately_if_not_is_open(self):
- """ Should return immediately if is_open property is false. """
- instance = self.test_instance
- instance._is_open = False
- instance.pidfile = object()
- instance.close()
- self.assertFalse(self.mock_pidlockfile.__exit__.called)
-
- def test_exits_pidfile_context(self):
- """ Should exit the PID file context manager. """
- instance = self.test_instance
- instance.pidfile = self.mock_pidlockfile
- instance.close()
- self.mock_pidlockfile.__exit__.assert_called_with(None, None, None)
-
- def test_returns_none(self):
- """ Should return None. """
- instance = self.test_instance
- expected_result = None
- result = instance.close()
- self.assertIs(result, expected_result)
-
- def test_sets_is_open_false(self):
- """ Should set the `is_open` property to False. """
- instance = self.test_instance
- instance.close()
- self.assertEqual(False, instance.is_open)
-
-
-@mock.patch.object(daemon.daemon.DaemonContext, "open")
-class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.__enter__ method. """
-
- def test_opens_daemon_context(self, mock_func_daemoncontext_open):
- """ Should open the DaemonContext. """
- instance = self.test_instance
- instance.__enter__()
- mock_func_daemoncontext_open.assert_called_with()
-
- def test_returns_self_instance(self, mock_func_daemoncontext_open):
- """ Should return DaemonContext instance. """
- instance = self.test_instance
- expected_result = instance
- result = instance.__enter__()
- self.assertIs(result, expected_result)
-
-
-@mock.patch.object(daemon.daemon.DaemonContext, "close")
-class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.__exit__ method. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_context_manager_exit_TestCase, self).setUp()
-
- self.test_args = dict(
- exc_type=object(),
- exc_value=object(),
- traceback=object(),
- )
-
- def test_closes_daemon_context(self, mock_func_daemoncontext_close):
- """ Should close the DaemonContext. """
- instance = self.test_instance
- args = self.test_args
- instance.__exit__(**args)
- mock_func_daemoncontext_close.assert_called_with()
-
- def test_returns_none(self, mock_func_daemoncontext_close):
- """ Should return None, indicating exception was not handled. """
- instance = self.test_instance
- args = self.test_args
- expected_result = None
- result = instance.__exit__(**args)
- self.assertIs(result, expected_result)
-
-
-class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext.terminate method. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_terminate_TestCase, self).setUp()
-
- self.test_signal = signal.SIGTERM
- self.test_frame = None
- self.test_args = (self.test_signal, self.test_frame)
-
- def test_raises_system_exit(self):
- """ Should raise SystemExit. """
- instance = self.test_instance
- args = self.test_args
- expected_exception = SystemExit
- self.assertRaises(
- expected_exception,
- instance.terminate, *args)
-
- def test_exception_message_contains_signal_number(self):
- """ Should raise exception with a message containing signal number. """
- instance = self.test_instance
- args = self.test_args
- signal_number = self.test_signal
- expected_exception = SystemExit
- exc = self.assertRaises(
- expected_exception,
- instance.terminate, *args)
- self.assertIn(unicode(signal_number), unicode(exc))
-
-
-class DaemonContext_get_exclude_file_descriptors_TestCase(
- DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext._get_exclude_file_descriptors function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(
- DaemonContext_get_exclude_file_descriptors_TestCase,
- self).setUp()
-
- self.test_files = {
- 2: FakeFileDescriptorStringIO(),
- 5: 5,
- 11: FakeFileDescriptorStringIO(),
- 17: None,
- 23: FakeFileDescriptorStringIO(),
- 37: 37,
- 42: FakeFileDescriptorStringIO(),
- }
- for (fileno, item) in self.test_files.items():
- if hasattr(item, '_fileno'):
- item._fileno = fileno
- self.test_file_descriptors = set(
- fd for (fd, item) in self.test_files.items()
- if item is not None)
- self.test_file_descriptors.update(
- self.stream_files_by_name[name].fileno()
- for name in ['stdin', 'stdout', 'stderr']
- )
-
- def test_returns_expected_file_descriptors(self):
- """ Should return expected set of file descriptors. """
- instance = self.test_instance
- instance.files_preserve = list(self.test_files.values())
- expected_result = self.test_file_descriptors
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_returns_stream_redirects_if_no_files_preserve(self):
- """ Should return only stream redirects if no files_preserve. """
- instance = self.test_instance
- instance.files_preserve = None
- expected_result = set(
- stream.fileno()
- for stream in self.stream_files_by_name.values())
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_returns_empty_set_if_no_files(self):
- """ Should return empty set if no file options. """
- instance = self.test_instance
- for name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
- setattr(instance, name, None)
- expected_result = set()
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_omits_non_file_streams(self):
- """ Should omit non-file stream attributes. """
- instance = self.test_instance
- instance.files_preserve = list(self.test_files.values())
- stream_files = self.stream_files_by_name
- expected_result = self.test_file_descriptors.copy()
- for (pseudo_stream_name, pseudo_stream) in stream_files.items():
- test_non_file_object = object()
- setattr(instance, pseudo_stream_name, test_non_file_object)
- stream_fd = pseudo_stream.fileno()
- expected_result.discard(stream_fd)
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_includes_verbatim_streams_without_file_descriptor(self):
- """ Should include verbatim any stream without a file descriptor. """
- instance = self.test_instance
- instance.files_preserve = list(self.test_files.values())
- stream_files = self.stream_files_by_name
- mock_fileno_method = mock.MagicMock(
- spec=sys.__stdin__.fileno,
- side_effect=ValueError)
- expected_result = self.test_file_descriptors.copy()
- for (pseudo_stream_name, pseudo_stream) in stream_files.items():
- test_non_fd_stream = StringIO()
- if not hasattr(test_non_fd_stream, 'fileno'):
- # Python < 3 StringIO doesn't have ‘fileno’ at all.
- # Add a method which raises an exception.
- test_non_fd_stream.fileno = mock_fileno_method
- setattr(instance, pseudo_stream_name, test_non_fd_stream)
- stream_fd = pseudo_stream.fileno()
- expected_result.discard(stream_fd)
- expected_result.add(test_non_fd_stream)
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_omits_none_streams(self):
- """ Should omit any stream attribute which is None. """
- instance = self.test_instance
- instance.files_preserve = list(self.test_files.values())
- stream_files = self.stream_files_by_name
- expected_result = self.test_file_descriptors.copy()
- for (pseudo_stream_name, pseudo_stream) in stream_files.items():
- setattr(instance, pseudo_stream_name, None)
- stream_fd = pseudo_stream.fileno()
- expected_result.discard(stream_fd)
- result = instance._get_exclude_file_descriptors()
- self.assertEqual(expected_result, result)
-
-
-class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext._make_signal_handler function. """
-
- def test_returns_ignore_for_none(self):
- """ Should return SIG_IGN when None handler specified. """
- instance = self.test_instance
- target = None
- expected_result = signal.SIG_IGN
- result = instance._make_signal_handler(target)
- self.assertEqual(expected_result, result)
-
- def test_returns_method_for_name(self):
- """ Should return method of DaemonContext when name specified. """
- instance = self.test_instance
- target = 'terminate'
- expected_result = instance.terminate
- result = instance._make_signal_handler(target)
- self.assertEqual(expected_result, result)
-
- def test_raises_error_for_unknown_name(self):
- """ Should raise AttributeError for unknown method name. """
- instance = self.test_instance
- target = 'b0gUs'
- expected_error = AttributeError
- self.assertRaises(
- expected_error,
- instance._make_signal_handler, target)
-
- def test_returns_object_for_object(self):
- """ Should return same object for any other object. """
- instance = self.test_instance
- target = object()
- expected_result = target
- result = instance._make_signal_handler(target)
- self.assertEqual(expected_result, result)
-
-
-class DaemonContext_make_signal_handler_map_TestCase(
- DaemonContext_BaseTestCase):
- """ Test cases for DaemonContext._make_signal_handler_map function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonContext_make_signal_handler_map_TestCase, self).setUp()
-
- self.test_instance.signal_map = {
- object(): object(),
- object(): object(),
- object(): object(),
- }
-
- self.test_signal_handlers = dict(
- (key, object())
- for key in self.test_instance.signal_map.values())
- self.test_signal_handler_map = dict(
- (key, self.test_signal_handlers[target])
- for (key, target) in self.test_instance.signal_map.items())
-
- def fake_make_signal_handler(target):
- return self.test_signal_handlers[target]
-
- func_patcher_make_signal_handler = mock.patch.object(
- daemon.daemon.DaemonContext, "_make_signal_handler",
- side_effect=fake_make_signal_handler)
- self.mock_func_make_signal_handler = (
- func_patcher_make_signal_handler.start())
- self.addCleanup(func_patcher_make_signal_handler.stop)
-
- def test_returns_constructed_signal_handler_items(self):
- """ Should return items as constructed via make_signal_handler. """
- instance = self.test_instance
- expected_result = self.test_signal_handler_map
- result = instance._make_signal_handler_map()
- self.assertEqual(expected_result, result)
-
-
-try:
- FileNotFoundError
-except NameError:
- # Python 2 uses IOError.
- FileNotFoundError = functools.partial(IOError, errno.ENOENT)
-
-
-@mock.patch.object(os, "chdir")
-class change_working_directory_TestCase(scaffold.TestCase):
- """ Test cases for change_working_directory function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(change_working_directory_TestCase, self).setUp()
-
- self.test_directory = object()
- self.test_args = dict(
- directory=self.test_directory,
- )
-
- def test_changes_working_directory_to_specified_directory(
- self,
- mock_func_os_chdir):
- """ Should change working directory to specified directory. """
- args = self.test_args
- directory = self.test_directory
- daemon.daemon.change_working_directory(**args)
- mock_func_os_chdir.assert_called_with(directory)
-
- def test_raises_daemon_error_on_os_error(
- self,
- mock_func_os_chdir):
- """ Should raise a DaemonError on receiving an IOError. """
- args = self.test_args
- test_error = FileNotFoundError("No such directory")
- mock_func_os_chdir.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_working_directory, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_error_message_contains_original_error_message(
- self,
- mock_func_os_chdir):
- """ Should raise a DaemonError with original message. """
- args = self.test_args
- test_error = FileNotFoundError("No such directory")
- mock_func_os_chdir.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_working_directory, **args)
- self.assertIn(unicode(test_error), unicode(exc))
-
-
-@mock.patch.object(os, "chroot")
-@mock.patch.object(os, "chdir")
-class change_root_directory_TestCase(scaffold.TestCase):
- """ Test cases for change_root_directory function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(change_root_directory_TestCase, self).setUp()
-
- self.test_directory = object()
- self.test_args = dict(
- directory=self.test_directory,
- )
-
- def test_changes_working_directory_to_specified_directory(
- self,
- mock_func_os_chdir, mock_func_os_chroot):
- """ Should change working directory to specified directory. """
- args = self.test_args
- directory = self.test_directory
- daemon.daemon.change_root_directory(**args)
- mock_func_os_chdir.assert_called_with(directory)
-
- def test_changes_root_directory_to_specified_directory(
- self,
- mock_func_os_chdir, mock_func_os_chroot):
- """ Should change root directory to specified directory. """
- args = self.test_args
- directory = self.test_directory
- daemon.daemon.change_root_directory(**args)
- mock_func_os_chroot.assert_called_with(directory)
-
- def test_raises_daemon_error_on_os_error_from_chdir(
- self,
- mock_func_os_chdir, mock_func_os_chroot):
- """ Should raise a DaemonError on receiving an IOError from chdir. """
- args = self.test_args
- test_error = FileNotFoundError("No such directory")
- mock_func_os_chdir.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_root_directory, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_raises_daemon_error_on_os_error_from_chroot(
- self,
- mock_func_os_chdir, mock_func_os_chroot):
- """ Should raise a DaemonError on receiving an OSError from chroot. """
- args = self.test_args
- test_error = OSError(errno.EPERM, "No chroot for you!")
- mock_func_os_chroot.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_root_directory, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_error_message_contains_original_error_message(
- self,
- mock_func_os_chdir, mock_func_os_chroot):
- """ Should raise a DaemonError with original message. """
- args = self.test_args
- test_error = FileNotFoundError("No such directory")
- mock_func_os_chdir.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_root_directory, **args)
- self.assertIn(unicode(test_error), unicode(exc))
-
-
-@mock.patch.object(os, "umask")
-class change_file_creation_mask_TestCase(scaffold.TestCase):
- """ Test cases for change_file_creation_mask function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(change_file_creation_mask_TestCase, self).setUp()
-
- self.test_mask = object()
- self.test_args = dict(
- mask=self.test_mask,
- )
-
- def test_changes_umask_to_specified_mask(self, mock_func_os_umask):
- """ Should change working directory to specified directory. """
- args = self.test_args
- mask = self.test_mask
- daemon.daemon.change_file_creation_mask(**args)
- mock_func_os_umask.assert_called_with(mask)
-
- def test_raises_daemon_error_on_os_error_from_chdir(
- self,
- mock_func_os_umask):
- """ Should raise a DaemonError on receiving an OSError from umask. """
- args = self.test_args
- test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
- mock_func_os_umask.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_file_creation_mask, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_error_message_contains_original_error_message(
- self,
- mock_func_os_umask):
- """ Should raise a DaemonError with original message. """
- args = self.test_args
- test_error = FileNotFoundError("No such directory")
- mock_func_os_umask.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_file_creation_mask, **args)
- self.assertIn(unicode(test_error), unicode(exc))
-
-
-@mock.patch.object(os, "setgid")
-@mock.patch.object(os, "setuid")
-class change_process_owner_TestCase(scaffold.TestCase):
- """ Test cases for change_process_owner function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(change_process_owner_TestCase, self).setUp()
-
- self.test_uid = object()
- self.test_gid = object()
- self.test_args = dict(
- uid=self.test_uid,
- gid=self.test_gid,
- )
-
- def test_changes_gid_and_uid_in_order(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should change process GID and UID in correct order.
-
- Since the process requires appropriate privilege to use
- either of `setuid` or `setgid`, changing the UID must be
- done last.
-
- """
- args = self.test_args
- daemon.daemon.change_process_owner(**args)
- mock_func_os_setuid.assert_called()
- mock_func_os_setgid.assert_called()
-
- def test_changes_group_id_to_gid(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should change process GID to specified value. """
- args = self.test_args
- gid = self.test_gid
- daemon.daemon.change_process_owner(**args)
- mock_func_os_setgid.assert_called(gid)
-
- def test_changes_user_id_to_uid(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should change process UID to specified value. """
- args = self.test_args
- uid = self.test_uid
- daemon.daemon.change_process_owner(**args)
- mock_func_os_setuid.assert_called(uid)
-
- def test_raises_daemon_error_on_os_error_from_setgid(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should raise a DaemonError on receiving an OSError from setgid. """
- args = self.test_args
- test_error = OSError(errno.EPERM, "No switching for you!")
- mock_func_os_setgid.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_process_owner, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_raises_daemon_error_on_os_error_from_setuid(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should raise a DaemonError on receiving an OSError from setuid. """
- args = self.test_args
- test_error = OSError(errno.EPERM, "No switching for you!")
- mock_func_os_setuid.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_process_owner, **args)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_error_message_contains_original_error_message(
- self,
- mock_func_os_setuid, mock_func_os_setgid):
- """ Should raise a DaemonError with original message. """
- args = self.test_args
- test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
- mock_func_os_setuid.side_effect = test_error
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.change_process_owner, **args)
- self.assertIn(unicode(test_error), unicode(exc))
-
-
-RLimitResult = collections.namedtuple('RLimitResult', ['soft', 'hard'])
-
-fake_RLIMIT_CORE = object()
-
-@mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE)
-@mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None))
-@mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None))
-class prevent_core_dump_TestCase(scaffold.TestCase):
- """ Test cases for prevent_core_dump function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(prevent_core_dump_TestCase, self).setUp()
-
- def test_sets_core_limit_to_zero(
- self,
- mock_func_resource_getrlimit, mock_func_resource_setrlimit):
- """ Should set the RLIMIT_CORE resource to zero. """
- expected_resource = fake_RLIMIT_CORE
- expected_limit = tuple(RLimitResult(soft=0, hard=0))
- daemon.daemon.prevent_core_dump()
- mock_func_resource_getrlimit.assert_called_with(expected_resource)
- mock_func_resource_setrlimit.assert_called_with(
- expected_resource, expected_limit)
-
- def test_raises_error_when_no_core_resource(
- self,
- mock_func_resource_getrlimit, mock_func_resource_setrlimit):
- """ Should raise DaemonError if no RLIMIT_CORE resource. """
- test_error = ValueError("Bogus platform doesn't have RLIMIT_CORE")
- def fake_getrlimit(res):
- if res == resource.RLIMIT_CORE:
- raise test_error
- else:
- return None
- mock_func_resource_getrlimit.side_effect = fake_getrlimit
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.prevent_core_dump)
- self.assertEqual(test_error, exc.__cause__)
-
-
-@mock.patch.object(os, "close")
-class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
- """ Test cases for close_file_descriptor_if_open function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(close_file_descriptor_if_open_TestCase, self).setUp()
-
- self.fake_fd = 274
-
- def test_requests_file_descriptor_close(self, mock_func_os_close):
- """ Should request close of file descriptor. """
- fd = self.fake_fd
- daemon.daemon.close_file_descriptor_if_open(fd)
- mock_func_os_close.assert_called_with(fd)
-
- def test_ignores_badfd_error_on_close(self, mock_func_os_close):
- """ Should ignore OSError EBADF when closing. """
- fd = self.fake_fd
- test_error = OSError(errno.EBADF, "Bad file descriptor")
- def fake_os_close(fd):
- raise test_error
- mock_func_os_close.side_effect = fake_os_close
- daemon.daemon.close_file_descriptor_if_open(fd)
- mock_func_os_close.assert_called_with(fd)
-
- def test_raises_error_if_oserror_on_close(self, mock_func_os_close):
- """ Should raise DaemonError if an OSError occurs when closing. """
- fd = self.fake_fd
- test_error = OSError(object(), "Unexpected error")
- def fake_os_close(fd):
- raise test_error
- mock_func_os_close.side_effect = fake_os_close
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.close_file_descriptor_if_open, fd)
- self.assertEqual(test_error, exc.__cause__)
-
- def test_raises_error_if_ioerror_on_close(self, mock_func_os_close):
- """ Should raise DaemonError if an IOError occurs when closing. """
- fd = self.fake_fd
- test_error = IOError(object(), "Unexpected error")
- def fake_os_close(fd):
- raise test_error
- mock_func_os_close.side_effect = fake_os_close
- expected_error = daemon.daemon.DaemonOSEnvironmentError
- exc = self.assertRaises(
- expected_error,
- daemon.daemon.close_file_descriptor_if_open, fd)
- self.assertEqual(test_error, exc.__cause__)
-
-
-class maxfd_TestCase(scaffold.TestCase):
- """ Test cases for module MAXFD constant. """
-
- def test_positive(self):
- """ Should be a positive number. """
- maxfd = daemon.daemon.MAXFD
- self.assertTrue(maxfd > 0)
-
- def test_integer(self):
- """ Should be an integer. """
- maxfd = daemon.daemon.MAXFD
- self.assertEqual(int(maxfd), maxfd)
-
- def test_reasonably_high(self):
- """ Should be reasonably high for default open files limit.
-
- If the system reports a limit of “infinity” on maximum
- file descriptors, we still need a finite number in order
- to close “all” of them. Ensure this is reasonably high
- to catch most use cases.
-
- """
- expected_minimum = 2048
- maxfd = daemon.daemon.MAXFD
- self.assertTrue(
- expected_minimum <= maxfd,
- msg=(
- "MAXFD should be at least {minimum!r}"
- " (got {maxfd!r})".format(
- minimum=expected_minimum, maxfd=maxfd)))
-
-
-fake_default_maxfd = 8
-fake_RLIMIT_NOFILE = object()
-fake_RLIM_INFINITY = object()
-fake_rlimit_nofile_large = 2468
-
-def fake_getrlimit_nofile_soft_infinity(resource):
- result = RLimitResult(soft=fake_RLIM_INFINITY, hard=object())
- if resource != fake_RLIMIT_NOFILE:
- result = NotImplemented
- return result
-
-def fake_getrlimit_nofile_hard_infinity(resource):
- result = RLimitResult(soft=object(), hard=fake_RLIM_INFINITY)
- if resource != fake_RLIMIT_NOFILE:
- result = NotImplemented
- return result
-
-def fake_getrlimit_nofile_hard_large(resource):
- result = RLimitResult(soft=object(), hard=fake_rlimit_nofile_large)
- if resource != fake_RLIMIT_NOFILE:
- result = NotImplemented
- return result
-
-@mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd)
-@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
-@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
-@mock.patch.object(
- resource, "getrlimit",
- side_effect=fake_getrlimit_nofile_hard_large)
-class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
- """ Test cases for get_maximum_file_descriptors function. """
-
- def test_returns_system_hard_limit(self, mock_func_resource_getrlimit):
- """ Should return process hard limit on number of files. """
- expected_result = fake_rlimit_nofile_large
- result = daemon.daemon.get_maximum_file_descriptors()
- self.assertEqual(expected_result, result)
-
- def test_returns_module_default_if_hard_limit_infinity(
- self, mock_func_resource_getrlimit):
- """ Should return module MAXFD if hard limit is infinity. """
- mock_func_resource_getrlimit.side_effect = (
- fake_getrlimit_nofile_hard_infinity)
- expected_result = fake_default_maxfd
- result = daemon.daemon.get_maximum_file_descriptors()
- self.assertEqual(expected_result, result)
-
-
-def fake_get_maximum_file_descriptors():
- return fake_default_maxfd
-
-@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
-@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
-@mock.patch.object(
- resource, "getrlimit",
- new=fake_getrlimit_nofile_soft_infinity)
-@mock.patch.object(
- daemon.daemon, "get_maximum_file_descriptors",
- new=fake_get_maximum_file_descriptors)
-@mock.patch.object(daemon.daemon, "close_file_descriptor_if_open")
-class close_all_open_files_TestCase(scaffold.TestCase):
- """ Test cases for close_all_open_files function. """
-
- def test_requests_all_open_files_to_close(
- self, mock_func_close_file_descriptor_if_open):
- """ Should request close of all open files. """
- expected_file_descriptors = range(fake_default_maxfd)
- expected_calls = [
- mock.call(fd) for fd in expected_file_descriptors]
- daemon.daemon.close_all_open_files()
- mock_func_close_file_descriptor_if_open.assert_has_calls(
- expected_calls, any_order=True)
-
- def test_requests_all_but_excluded_files_to_close(
- self, mock_func_close_file_descriptor_if_open):
- """ Should request close of all open files but those excluded. """
- test_exclude = set([3, 7])
- args = dict(
- exclude=test_exclude,
- )
- expected_file_descriptors = set(
- fd for fd in range(fake_default_maxfd)
- if fd not in test_exclude)
- expected_calls = [
- mock.call(fd) for fd in expected_file_descriptors]
- daemon.daemon.close_all_open_files(**args)
- mock_func_close_file_descriptor_if_open.assert_has_calls(
- expected_calls, any_order=True)
-
-
-class detach_process_context_TestCase(scaffold.TestCase):
- """ Test cases for detach_process_context function. """
-
- class FakeOSExit(SystemExit):
- """ Fake exception raised for os._exit(). """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(detach_process_context_TestCase, self).setUp()
-
- self.mock_module_os = mock.MagicMock(wraps=os)
-
- fake_pids = [0, 0]
- func_patcher_os_fork = mock.patch.object(
- os, "fork",
- side_effect=iter(fake_pids))
- self.mock_func_os_fork = func_patcher_os_fork.start()
- self.addCleanup(func_patcher_os_fork.stop)
- self.mock_module_os.attach_mock(self.mock_func_os_fork, "fork")
-
- func_patcher_os_setsid = mock.patch.object(os, "setsid")
- self.mock_func_os_setsid = func_patcher_os_setsid.start()
- self.addCleanup(func_patcher_os_setsid.stop)
- self.mock_module_os.attach_mock(self.mock_func_os_setsid, "setsid")
-
- def raise_os_exit(status=None):
- raise self.FakeOSExit(status)
-
- func_patcher_os_force_exit = mock.patch.object(
- os, "_exit",
- side_effect=raise_os_exit)
- self.mock_func_os_force_exit = func_patcher_os_force_exit.start()
- self.addCleanup(func_patcher_os_force_exit.stop)
- self.mock_module_os.attach_mock(self.mock_func_os_force_exit, "_exit")
-
- def test_parent_exits(self):
- """ Parent process should exit. """
- parent_pid = 23
- self.mock_func_os_fork.side_effect = iter([parent_pid])
- self.assertRaises(
- self.FakeOSExit,
- daemon.daemon.detach_process_context)
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- mock.call._exit(0),
- ])
-
- def test_first_fork_error_raises_error(self):
- """ Error on first fork should raise DaemonProcessDetachError. """
- fork_errno = 13
- fork_strerror = "Bad stuff happened"
- test_error = OSError(fork_errno, fork_strerror)
- test_pids_iter = iter([test_error])
-
- def fake_fork():
- next_item = next(test_pids_iter)
- if isinstance(next_item, Exception):
- raise next_item
- else:
- return next_item
-
- self.mock_func_os_fork.side_effect = fake_fork
- exc = self.assertRaises(
- daemon.daemon.DaemonProcessDetachError,
- daemon.daemon.detach_process_context)
- self.assertEqual(test_error, exc.__cause__)
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- ])
-
- def test_child_starts_new_process_group(self):
- """ Child should start new process group. """
- daemon.daemon.detach_process_context()
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- mock.call.setsid(),
- ])
-
- def test_child_forks_next_parent_exits(self):
- """ Child should fork, then exit if parent. """
- fake_pids = [0, 42]
- self.mock_func_os_fork.side_effect = iter(fake_pids)
- self.assertRaises(
- self.FakeOSExit,
- daemon.daemon.detach_process_context)
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- mock.call.setsid(),
- mock.call.fork(),
- mock.call._exit(0),
- ])
-
- def test_second_fork_error_reports_to_stderr(self):
- """ Error on second fork should cause report to stderr. """
- fork_errno = 17
- fork_strerror = "Nasty stuff happened"
- test_error = OSError(fork_errno, fork_strerror)
- test_pids_iter = iter([0, test_error])
-
- def fake_fork():
- next_item = next(test_pids_iter)
- if isinstance(next_item, Exception):
- raise next_item
- else:
- return next_item
-
- self.mock_func_os_fork.side_effect = fake_fork
- exc = self.assertRaises(
- daemon.daemon.DaemonProcessDetachError,
- daemon.daemon.detach_process_context)
- self.assertEqual(test_error, exc.__cause__)
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- mock.call.setsid(),
- mock.call.fork(),
- ])
-
- def test_child_forks_next_child_continues(self):
- """ Child should fork, then continue if child. """
- daemon.daemon.detach_process_context()
- self.mock_module_os.assert_has_calls([
- mock.call.fork(),
- mock.call.setsid(),
- mock.call.fork(),
- ])
-
-
-@mock.patch("os.getppid", return_value=765)
-class is_process_started_by_init_TestCase(scaffold.TestCase):
- """ Test cases for is_process_started_by_init function. """
-
- def test_returns_false_by_default(self, mock_func_os_getppid):
- """ Should return False under normal circumstances. """
- expected_result = False
- result = daemon.daemon.is_process_started_by_init()
- self.assertIs(result, expected_result)
-
- def test_returns_true_if_parent_process_is_init(
- self, mock_func_os_getppid):
- """ Should return True if parent process is `init`. """
- init_pid = 1
- mock_func_os_getppid.return_value = init_pid
- expected_result = True
- result = daemon.daemon.is_process_started_by_init()
- self.assertIs(result, expected_result)
-
-
-class is_socket_TestCase(scaffold.TestCase):
- """ Test cases for is_socket function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(is_socket_TestCase, self).setUp()
-
- def fake_getsockopt(level, optname, buflen=None):
- result = object()
- if optname is socket.SO_TYPE:
- result = socket.SOCK_RAW
- return result
-
- self.fake_socket_getsockopt_func = fake_getsockopt
-
- self.fake_socket_error = socket.error(
- errno.ENOTSOCK,
- "Socket operation on non-socket")
-
- self.mock_socket = mock.MagicMock(spec=socket.socket)
- self.mock_socket.getsockopt.side_effect = self.fake_socket_error
-
- def fake_socket_fromfd(fd, family, type, proto=None):
- return self.mock_socket
-
- func_patcher_socket_fromfd = mock.patch.object(
- socket, "fromfd",
- side_effect=fake_socket_fromfd)
- func_patcher_socket_fromfd.start()
- self.addCleanup(func_patcher_socket_fromfd.stop)
-
- def test_returns_false_by_default(self):
- """ Should return False under normal circumstances. """
- test_fd = 23
- expected_result = False
- result = daemon.daemon.is_socket(test_fd)
- self.assertIs(result, expected_result)
-
- def test_returns_true_if_stdin_is_socket(self):
- """ Should return True if `stdin` is a socket. """
- test_fd = 23
- getsockopt = self.mock_socket.getsockopt
- getsockopt.side_effect = self.fake_socket_getsockopt_func
- expected_result = True
- result = daemon.daemon.is_socket(test_fd)
- self.assertIs(result, expected_result)
-
- def test_returns_false_if_stdin_socket_raises_error(self):
- """ Should return True if `stdin` is a socket and raises error. """
- test_fd = 23
- getsockopt = self.mock_socket.getsockopt
- getsockopt.side_effect = socket.error(
- object(), "Weird socket stuff")
- expected_result = True
- result = daemon.daemon.is_socket(test_fd)
- self.assertIs(result, expected_result)
-
-
-class is_process_started_by_superserver_TestCase(scaffold.TestCase):
- """ Test cases for is_process_started_by_superserver function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(is_process_started_by_superserver_TestCase, self).setUp()
-
- def fake_is_socket(fd):
- if sys.__stdin__.fileno() == fd:
- result = self.fake_stdin_is_socket_func()
- else:
- result = False
- return result
-
- self.fake_stdin_is_socket_func = (lambda: False)
-
- func_patcher_is_socket = mock.patch.object(
- daemon.daemon, "is_socket",
- side_effect=fake_is_socket)
- func_patcher_is_socket.start()
- self.addCleanup(func_patcher_is_socket.stop)
-
- def test_returns_false_by_default(self):
- """ Should return False under normal circumstances. """
- expected_result = False
- result = daemon.daemon.is_process_started_by_superserver()
- self.assertIs(result, expected_result)
-
- def test_returns_true_if_stdin_is_socket(self):
- """ Should return True if `stdin` is a socket. """
- self.fake_stdin_is_socket_func = (lambda: True)
- expected_result = True
- result = daemon.daemon.is_process_started_by_superserver()
- self.assertIs(result, expected_result)
-
-
-@mock.patch.object(
- daemon.daemon, "is_process_started_by_superserver",
- return_value=False)
-@mock.patch.object(
- daemon.daemon, "is_process_started_by_init",
- return_value=False)
-class is_detach_process_context_required_TestCase(scaffold.TestCase):
- """ Test cases for is_detach_process_context_required function. """
-
- def test_returns_true_by_default(
- self,
- mock_func_is_process_started_by_init,
- mock_func_is_process_started_by_superserver):
- """ Should return True under normal circumstances. """
- expected_result = True
- result = daemon.daemon.is_detach_process_context_required()
- self.assertIs(result, expected_result)
-
- def test_returns_false_if_started_by_init(
- self,
- mock_func_is_process_started_by_init,
- mock_func_is_process_started_by_superserver):
- """ Should return False if current process started by init. """
- mock_func_is_process_started_by_init.return_value = True
- expected_result = False
- result = daemon.daemon.is_detach_process_context_required()
- self.assertIs(result, expected_result)
-
- def test_returns_true_if_started_by_superserver(
- self,
- mock_func_is_process_started_by_init,
- mock_func_is_process_started_by_superserver):
- """ Should return False if current process started by superserver. """
- mock_func_is_process_started_by_superserver.return_value = True
- expected_result = False
- result = daemon.daemon.is_detach_process_context_required()
- self.assertIs(result, expected_result)
-
-
-def setup_streams_fixtures(testcase):
- """ Set up common test fixtures for standard streams. """
- testcase.stream_file_paths = dict(
- stdin=tempfile.mktemp(),
- stdout=tempfile.mktemp(),
- stderr=tempfile.mktemp(),
- )
-
- testcase.stream_files_by_name = dict(
- (name, FakeFileDescriptorStringIO())
- for name in ['stdin', 'stdout', 'stderr']
- )
-
- testcase.stream_files_by_path = dict(
- (testcase.stream_file_paths[name],
- testcase.stream_files_by_name[name])
- for name in ['stdin', 'stdout', 'stderr']
- )
-
-
-@mock.patch.object(os, "dup2")
-class redirect_stream_TestCase(scaffold.TestCase):
- """ Test cases for redirect_stream function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(redirect_stream_TestCase, self).setUp()
-
- self.test_system_stream = FakeFileDescriptorStringIO()
- self.test_target_stream = FakeFileDescriptorStringIO()
- self.test_null_file = FakeFileDescriptorStringIO()
-
- def fake_os_open(path, flag, mode=None):
- if path == os.devnull:
- result = self.test_null_file.fileno()
- else:
- raise FileNotFoundError("No such file", path)
- return result
-
- func_patcher_os_open = mock.patch.object(
- os, "open",
- side_effect=fake_os_open)
- self.mock_func_os_open = func_patcher_os_open.start()
- self.addCleanup(func_patcher_os_open.stop)
-
- def test_duplicates_target_file_descriptor(
- self, mock_func_os_dup2):
- """ Should duplicate file descriptor from target to system stream. """
- system_stream = self.test_system_stream
- system_fileno = system_stream.fileno()
- target_stream = self.test_target_stream
- target_fileno = target_stream.fileno()
- daemon.daemon.redirect_stream(system_stream, target_stream)
- mock_func_os_dup2.assert_called_with(target_fileno, system_fileno)
-
- def test_duplicates_null_file_descriptor_by_default(
- self, mock_func_os_dup2):
- """ Should by default duplicate the null file to the system stream. """
- system_stream = self.test_system_stream
- system_fileno = system_stream.fileno()
- target_stream = None
- null_path = os.devnull
- null_flag = os.O_RDWR
- null_file = self.test_null_file
- null_fileno = null_file.fileno()
- daemon.daemon.redirect_stream(system_stream, target_stream)
- self.mock_func_os_open.assert_called_with(null_path, null_flag)
- mock_func_os_dup2.assert_called_with(null_fileno, system_fileno)
-
-
-class make_default_signal_map_TestCase(scaffold.TestCase):
- """ Test cases for make_default_signal_map function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(make_default_signal_map_TestCase, self).setUp()
-
- # Use whatever default string type this Python version needs.
- signal_module_name = str('signal')
- self.fake_signal_module = ModuleType(signal_module_name)
-
- fake_signal_names = [
- 'SIGHUP',
- 'SIGCLD',
- 'SIGSEGV',
- 'SIGTSTP',
- 'SIGTTIN',
- 'SIGTTOU',
- 'SIGTERM',
- ]
- for name in fake_signal_names:
- setattr(self.fake_signal_module, name, object())
-
- module_patcher_signal = mock.patch.object(
- daemon.daemon, "signal", new=self.fake_signal_module)
- module_patcher_signal.start()
- self.addCleanup(module_patcher_signal.stop)
-
- default_signal_map_by_name = {
- 'SIGTSTP': None,
- 'SIGTTIN': None,
- 'SIGTTOU': None,
- 'SIGTERM': 'terminate',
- }
- self.default_signal_map = dict(
- (getattr(self.fake_signal_module, name), target)
- for (name, target) in default_signal_map_by_name.items())
-
- def test_returns_constructed_signal_map(self):
- """ Should return map per default. """
- expected_result = self.default_signal_map
- result = daemon.daemon.make_default_signal_map()
- self.assertEqual(expected_result, result)
-
- def test_returns_signal_map_with_only_ids_in_signal_module(self):
- """ Should return map with only signals in the `signal` module.
-
- The `signal` module is documented to only define those
- signals which exist on the running system. Therefore the
- default map should not contain any signals which are not
- defined in the `signal` module.
-
- """
- del(self.default_signal_map[self.fake_signal_module.SIGTTOU])
- del(self.fake_signal_module.SIGTTOU)
- expected_result = self.default_signal_map
- result = daemon.daemon.make_default_signal_map()
- self.assertEqual(expected_result, result)
-
-
-@mock.patch.object(daemon.daemon.signal, "signal")
-class set_signal_handlers_TestCase(scaffold.TestCase):
- """ Test cases for set_signal_handlers function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(set_signal_handlers_TestCase, self).setUp()
-
- self.signal_handler_map = {
- signal.SIGQUIT: object(),
- signal.SIGSEGV: object(),
- signal.SIGINT: object(),
- }
-
- def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal):
- """ Should set signal handler for each item in map. """
- signal_handler_map = self.signal_handler_map
- expected_calls = [
- mock.call(signal_number, handler)
- for (signal_number, handler) in signal_handler_map.items()]
- daemon.daemon.set_signal_handlers(signal_handler_map)
- self.assertEquals(expected_calls, mock_func_signal_signal.mock_calls)
-
-
-@mock.patch.object(daemon.daemon.atexit, "register")
-class register_atexit_function_TestCase(scaffold.TestCase):
- """ Test cases for register_atexit_function function. """
-
- def test_registers_function_for_atexit_processing(
- self, mock_func_atexit_register):
- """ Should register specified function for atexit processing. """
- func = object()
- daemon.daemon.register_atexit_function(func)
- mock_func_atexit_register.assert_called_with(func)
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :
diff --git a/external_libs/python/python-daemon-2.0.5/test/test_metadata.py b/external_libs/python/python-daemon-2.0.5/test/test_metadata.py
deleted file mode 100644
index 692753f4..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/test_metadata.py
+++ /dev/null
@@ -1,380 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# test/test_metadata.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Unit test for ‘_metadata’ private module.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-import sys
-import errno
-import re
-try:
- # Python 3 standard library.
- import urllib.parse as urlparse
-except ImportError:
- # Python 2 standard library.
- import urlparse
-import functools
-import collections
-import json
-
-import pkg_resources
-import mock
-import testtools.helpers
-import testtools.matchers
-import testscenarios
-
-from . import scaffold
-from .scaffold import (basestring, unicode)
-
-import daemon._metadata as metadata
-
-
-class HasAttribute(testtools.matchers.Matcher):
- """ A matcher to assert an object has a named attribute. """
-
- def __init__(self, name):
- self.attribute_name = name
-
- def match(self, instance):
- """ Assert the object `instance` has an attribute named `name`. """
- result = None
- if not testtools.helpers.safe_hasattr(instance, self.attribute_name):
- result = AttributeNotFoundMismatch(instance, self.attribute_name)
- return result
-
-
-class AttributeNotFoundMismatch(testtools.matchers.Mismatch):
- """ The specified instance does not have the named attribute. """
-
- def __init__(self, instance, name):
- self.instance = instance
- self.attribute_name = name
-
- def describe(self):
- """ Emit a text description of this mismatch. """
- text = (
- "{instance!r}"
- " has no attribute named {name!r}").format(
- instance=self.instance, name=self.attribute_name)
- return text
-
-
-class metadata_value_TestCase(scaffold.TestCaseWithScenarios):
- """ Test cases for metadata module values. """
-
- expected_str_attributes = set([
- 'version_installed',
- 'author',
- 'copyright',
- 'license',
- 'url',
- ])
-
- scenarios = [
- (name, {'attribute_name': name})
- for name in expected_str_attributes]
- for (name, params) in scenarios:
- if name == 'version_installed':
- # No duck typing, this attribute might be None.
- params['ducktype_attribute_name'] = NotImplemented
- continue
- # Expect an attribute of ‘str’ to test this value.
- params['ducktype_attribute_name'] = 'isdigit'
-
- def test_module_has_attribute(self):
- """ Metadata should have expected value as a module attribute. """
- self.assertThat(
- metadata, HasAttribute(self.attribute_name))
-
- def test_module_attribute_has_duck_type(self):
- """ Metadata value should have expected duck-typing attribute. """
- if self.ducktype_attribute_name == NotImplemented:
- self.skipTest("Can't assert this attribute's type")
- instance = getattr(metadata, self.attribute_name)
- self.assertThat(
- instance, HasAttribute(self.ducktype_attribute_name))
-
-
-class parse_person_field_TestCase(
- testscenarios.WithScenarios, testtools.TestCase):
- """ Test cases for ‘get_latest_version’ function. """
-
- scenarios = [
- ('simple', {
- 'test_person': "Foo Bar <foo.bar@example.com>",
- 'expected_result': ("Foo Bar", "foo.bar@example.com"),
- }),
- ('empty', {
- 'test_person': "",
- 'expected_result': (None, None),
- }),
- ('none', {
- 'test_person': None,
- 'expected_error': TypeError,
- }),
- ('no email', {
- 'test_person': "Foo Bar",
- 'expected_result': ("Foo Bar", None),
- }),
- ]
-
- def test_returns_expected_result(self):
- """ Should return expected result. """
- if hasattr(self, 'expected_error'):
- self.assertRaises(
- self.expected_error,
- metadata.parse_person_field, self.test_person)
- else:
- result = metadata.parse_person_field(self.test_person)
- self.assertEqual(self.expected_result, result)
-
-
-class YearRange_TestCase(scaffold.TestCaseWithScenarios):
- """ Test cases for ‘YearRange’ class. """
-
- scenarios = [
- ('simple', {
- 'begin_year': 1970,
- 'end_year': 1979,
- 'expected_text': "1970–1979",
- }),
- ('same year', {
- 'begin_year': 1970,
- 'end_year': 1970,
- 'expected_text': "1970",
- }),
- ('no end year', {
- 'begin_year': 1970,
- 'end_year': None,
- 'expected_text': "1970",
- }),
- ]
-
- def setUp(self):
- """ Set up test fixtures. """
- super(YearRange_TestCase, self).setUp()
-
- self.test_instance = metadata.YearRange(
- self.begin_year, self.end_year)
-
- def test_text_representation_as_expected(self):
- """ Text representation should be as expected. """
- result = unicode(self.test_instance)
- self.assertEqual(result, self.expected_text)
-
-
-FakeYearRange = collections.namedtuple('FakeYearRange', ['begin', 'end'])
-
-@mock.patch.object(metadata, 'YearRange', new=FakeYearRange)
-class make_year_range_TestCase(scaffold.TestCaseWithScenarios):
- """ Test cases for ‘make_year_range’ function. """
-
- scenarios = [
- ('simple', {
- 'begin_year': "1970",
- 'end_date': "1979-01-01",
- 'expected_range': FakeYearRange(begin=1970, end=1979),
- }),
- ('same year', {
- 'begin_year': "1970",
- 'end_date': "1970-01-01",
- 'expected_range': FakeYearRange(begin=1970, end=1970),
- }),
- ('no end year', {
- 'begin_year': "1970",
- 'end_date': None,
- 'expected_range': FakeYearRange(begin=1970, end=None),
- }),
- ('end date UNKNOWN token', {
- 'begin_year': "1970",
- 'end_date': "UNKNOWN",
- 'expected_range': FakeYearRange(begin=1970, end=None),
- }),
- ('end date FUTURE token', {
- 'begin_year': "1970",
- 'end_date': "FUTURE",
- 'expected_range': FakeYearRange(begin=1970, end=None),
- }),
- ]
-
- def test_result_matches_expected_range(self):
- """ Result should match expected YearRange. """
- result = metadata.make_year_range(self.begin_year, self.end_date)
- self.assertEqual(result, self.expected_range)
-
-
-class metadata_content_TestCase(scaffold.TestCase):
- """ Test cases for content of metadata. """
-
- def test_copyright_formatted_correctly(self):
- """ Copyright statement should be formatted correctly. """
- regex_pattern = (
- "Copyright © "
- "\d{4}" # four-digit year
- "(?:–\d{4})?" # optional range dash and ending four-digit year
- )
- regex_flags = re.UNICODE
- self.assertThat(
- metadata.copyright,
- testtools.matchers.MatchesRegex(regex_pattern, regex_flags))
-
- def test_author_formatted_correctly(self):
- """ Author information should be formatted correctly. """
- regex_pattern = (
- ".+ " # name
- "<[^>]+>" # email address, in angle brackets
- )
- regex_flags = re.UNICODE
- self.assertThat(
- metadata.author,
- testtools.matchers.MatchesRegex(regex_pattern, regex_flags))
-
- def test_copyright_contains_author(self):
- """ Copyright information should contain author information. """
- self.assertThat(
- metadata.copyright,
- testtools.matchers.Contains(metadata.author))
-
- def test_url_parses_correctly(self):
- """ Homepage URL should parse correctly. """
- result = urlparse.urlparse(metadata.url)
- self.assertIsInstance(
- result, urlparse.ParseResult,
- "URL value {url!r} did not parse correctly".format(
- url=metadata.url))
-
-
-try:
- FileNotFoundError
-except NameError:
- # Python 2 uses IOError.
- FileNotFoundError = functools.partial(IOError, errno.ENOENT)
-
-version_info_filename = "version_info.json"
-
-def fake_func_has_metadata(testcase, resource_name):
- """ Fake the behaviour of ‘pkg_resources.Distribution.has_metadata’. """
- if (
- resource_name != testcase.expected_resource_name
- or not hasattr(testcase, 'test_version_info')):
- return False
- return True
-
-
-def fake_func_get_metadata(testcase, resource_name):
- """ Fake the behaviour of ‘pkg_resources.Distribution.get_metadata’. """
- if not fake_func_has_metadata(testcase, resource_name):
- error = FileNotFoundError(resource_name)
- raise error
- content = testcase.test_version_info
- return content
-
-
-def fake_func_get_distribution(testcase, distribution_name):
- """ Fake the behaviour of ‘pkg_resources.get_distribution’. """
- if distribution_name != metadata.distribution_name:
- raise pkg_resources.DistributionNotFound
- if hasattr(testcase, 'get_distribution_error'):
- raise testcase.get_distribution_error
- mock_distribution = testcase.mock_distribution
- mock_distribution.has_metadata.side_effect = functools.partial(
- fake_func_has_metadata, testcase)
- mock_distribution.get_metadata.side_effect = functools.partial(
- fake_func_get_metadata, testcase)
- return mock_distribution
-
-
-@mock.patch.object(metadata, 'distribution_name', new="mock-dist")
-class get_distribution_version_info_TestCase(scaffold.TestCaseWithScenarios):
- """ Test cases for ‘get_distribution_version_info’ function. """
-
- default_version_info = {
- 'release_date': "UNKNOWN",
- 'version': "UNKNOWN",
- 'maintainer': "UNKNOWN",
- }
-
- scenarios = [
- ('version 0.0', {
- 'test_version_info': json.dumps({
- 'version': "0.0",
- }),
- 'expected_version_info': {'version': "0.0"},
- }),
- ('version 1.0', {
- 'test_version_info': json.dumps({
- 'version': "1.0",
- }),
- 'expected_version_info': {'version': "1.0"},
- }),
- ('file lorem_ipsum.json', {
- 'version_info_filename': "lorem_ipsum.json",
- 'test_version_info': json.dumps({
- 'version': "1.0",
- }),
- 'expected_version_info': {'version': "1.0"},
- }),
- ('not installed', {
- 'get_distribution_error': pkg_resources.DistributionNotFound(),
- 'expected_version_info': default_version_info,
- }),
- ('no version_info', {
- 'expected_version_info': default_version_info,
- }),
- ]
-
- def setUp(self):
- """ Set up test fixtures. """
- super(get_distribution_version_info_TestCase, self).setUp()
-
- if hasattr(self, 'expected_resource_name'):
- self.test_args = {'filename': self.expected_resource_name}
- else:
- self.test_args = {}
- self.expected_resource_name = version_info_filename
-
- self.mock_distribution = mock.MagicMock()
- func_patcher_get_distribution = mock.patch.object(
- pkg_resources, 'get_distribution')
- func_patcher_get_distribution.start()
- self.addCleanup(func_patcher_get_distribution.stop)
- pkg_resources.get_distribution.side_effect = functools.partial(
- fake_func_get_distribution, self)
-
- def test_requests_installed_distribution(self):
- """ The package distribution should be retrieved. """
- expected_distribution_name = metadata.distribution_name
- version_info = metadata.get_distribution_version_info(**self.test_args)
- pkg_resources.get_distribution.assert_called_with(
- expected_distribution_name)
-
- def test_requests_specified_filename(self):
- """ The specified metadata resource name should be requested. """
- if hasattr(self, 'get_distribution_error'):
- self.skipTest("No access to distribution")
- version_info = metadata.get_distribution_version_info(**self.test_args)
- self.mock_distribution.has_metadata.assert_called_with(
- self.expected_resource_name)
-
- def test_result_matches_expected_items(self):
- """ The result should match the expected items. """
- version_info = metadata.get_distribution_version_info(**self.test_args)
- self.assertEqual(self.expected_version_info, version_info)
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :
diff --git a/external_libs/python/python-daemon-2.0.5/test/test_pidfile.py b/external_libs/python/python-daemon-2.0.5/test/test_pidfile.py
deleted file mode 100644
index 9b636ec8..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/test_pidfile.py
+++ /dev/null
@@ -1,472 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# test/test_pidfile.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Unit test for ‘pidfile’ module.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-try:
- # Python 3 standard library.
- import builtins
-except ImportError:
- # Python 2 standard library.
- import __builtin__ as builtins
-import os
-import itertools
-import tempfile
-import errno
-import functools
-try:
- # Standard library of Python 2.7 and later.
- from io import StringIO
-except ImportError:
- # Standard library of Python 2.6 and earlier.
- from StringIO import StringIO
-
-import mock
-import lockfile
-
-from . import scaffold
-
-import daemon.pidfile
-
-
-class FakeFileDescriptorStringIO(StringIO, object):
- """ A StringIO class that fakes a file descriptor. """
-
- _fileno_generator = itertools.count()
-
- def __init__(self, *args, **kwargs):
- self._fileno = next(self._fileno_generator)
- super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)
-
- def fileno(self):
- return self._fileno
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
-
-try:
- FileNotFoundError
- PermissionError
-except NameError:
- # Python 2 uses IOError.
- FileNotFoundError = functools.partial(IOError, errno.ENOENT)
- PermissionError = functools.partial(IOError, errno.EPERM)
-
-
-def make_pidlockfile_scenarios():
- """ Make a collection of scenarios for testing `PIDLockFile` instances.
-
- :return: A collection of scenarios for tests involving
- `PIDLockfFile` instances.
-
- The collection is a mapping from scenario name to a dictionary of
- scenario attributes.
-
- """
-
- fake_current_pid = 235
- fake_other_pid = 8642
- fake_pidfile_path = tempfile.mktemp()
-
- fake_pidfile_empty = FakeFileDescriptorStringIO()
- fake_pidfile_current_pid = FakeFileDescriptorStringIO(
- "{pid:d}\n".format(pid=fake_current_pid))
- fake_pidfile_other_pid = FakeFileDescriptorStringIO(
- "{pid:d}\n".format(pid=fake_other_pid))
- fake_pidfile_bogus = FakeFileDescriptorStringIO(
- "b0gUs")
-
- scenarios = {
- 'simple': {},
- 'not-exist': {
- 'open_func_name': 'fake_open_nonexist',
- 'os_open_func_name': 'fake_os_open_nonexist',
- },
- 'not-exist-write-denied': {
- 'open_func_name': 'fake_open_nonexist',
- 'os_open_func_name': 'fake_os_open_nonexist',
- },
- 'not-exist-write-busy': {
- 'open_func_name': 'fake_open_nonexist',
- 'os_open_func_name': 'fake_os_open_nonexist',
- },
- 'exist-read-denied': {
- 'open_func_name': 'fake_open_read_denied',
- 'os_open_func_name': 'fake_os_open_read_denied',
- },
- 'exist-locked-read-denied': {
- 'locking_pid': fake_other_pid,
- 'open_func_name': 'fake_open_read_denied',
- 'os_open_func_name': 'fake_os_open_read_denied',
- },
- 'exist-empty': {},
- 'exist-invalid': {
- 'pidfile': fake_pidfile_bogus,
- },
- 'exist-current-pid': {
- 'pidfile': fake_pidfile_current_pid,
- 'pidfile_pid': fake_current_pid,
- },
- 'exist-current-pid-locked': {
- 'pidfile': fake_pidfile_current_pid,
- 'pidfile_pid': fake_current_pid,
- 'locking_pid': fake_current_pid,
- },
- 'exist-other-pid': {
- 'pidfile': fake_pidfile_other_pid,
- 'pidfile_pid': fake_other_pid,
- },
- 'exist-other-pid-locked': {
- 'pidfile': fake_pidfile_other_pid,
- 'pidfile_pid': fake_other_pid,
- 'locking_pid': fake_other_pid,
- },
- }
-
- for scenario in scenarios.values():
- scenario['pid'] = fake_current_pid
- scenario['pidfile_path'] = fake_pidfile_path
- if 'pidfile' not in scenario:
- scenario['pidfile'] = fake_pidfile_empty
- if 'pidfile_pid' not in scenario:
- scenario['pidfile_pid'] = None
- if 'locking_pid' not in scenario:
- scenario['locking_pid'] = None
- if 'open_func_name' not in scenario:
- scenario['open_func_name'] = 'fake_open_okay'
- if 'os_open_func_name' not in scenario:
- scenario['os_open_func_name'] = 'fake_os_open_okay'
-
- return scenarios
-
-
-def setup_pidfile_fixtures(testcase):
- """ Set up common fixtures for PID file test cases.
-
- :param testcase: A `TestCase` instance to decorate.
-
- Decorate the `testcase` with attributes to be fixtures for tests
- involving `PIDLockFile` instances.
-
- """
- scenarios = make_pidlockfile_scenarios()
- testcase.pidlockfile_scenarios = scenarios
-
- def get_scenario_option(testcase, key, default=None):
- value = default
- try:
- value = testcase.scenario[key]
- except (NameError, TypeError, AttributeError, KeyError):
- pass
- return value
-
- func_patcher_os_getpid = mock.patch.object(
- os, "getpid",
- return_value=scenarios['simple']['pid'])
- func_patcher_os_getpid.start()
- testcase.addCleanup(func_patcher_os_getpid.stop)
-
- def make_fake_open_funcs(testcase):
-
- def fake_open_nonexist(filename, mode, buffering):
- if mode.startswith('r'):
- error = FileNotFoundError(
- "No such file {filename!r}".format(
- filename=filename))
- raise error
- else:
- result = testcase.scenario['pidfile']
- return result
-
- def fake_open_read_denied(filename, mode, buffering):
- if mode.startswith('r'):
- error = PermissionError(
- "Read denied on {filename!r}".format(
- filename=filename))
- raise error
- else:
- result = testcase.scenario['pidfile']
- return result
-
- def fake_open_okay(filename, mode, buffering):
- result = testcase.scenario['pidfile']
- return result
-
- def fake_os_open_nonexist(filename, flags, mode):
- if (flags & os.O_CREAT):
- result = testcase.scenario['pidfile'].fileno()
- else:
- error = FileNotFoundError(
- "No such file {filename!r}".format(
- filename=filename))
- raise error
- return result
-
- def fake_os_open_read_denied(filename, flags, mode):
- if (flags & os.O_CREAT):
- result = testcase.scenario['pidfile'].fileno()
- else:
- error = PermissionError(
- "Read denied on {filename!r}".format(
- filename=filename))
- raise error
- return result
-
- def fake_os_open_okay(filename, flags, mode):
- result = testcase.scenario['pidfile'].fileno()
- return result
-
- funcs = dict(
- (name, obj) for (name, obj) in vars().items()
- if callable(obj))
-
- return funcs
-
- testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)
-
- def fake_open(filename, mode='rt', buffering=None):
- scenario_path = get_scenario_option(testcase, 'pidfile_path')
- if filename == scenario_path:
- func_name = testcase.scenario['open_func_name']
- fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
- result = fake_open_func(filename, mode, buffering)
- else:
- result = FakeFileDescriptorStringIO()
- return result
-
- mock_open = mock.mock_open()
- mock_open.side_effect = fake_open
-
- func_patcher_builtin_open = mock.patch.object(
- builtins, "open",
- new=mock_open)
- func_patcher_builtin_open.start()
- testcase.addCleanup(func_patcher_builtin_open.stop)
-
- def fake_os_open(filename, flags, mode=None):
- scenario_path = get_scenario_option(testcase, 'pidfile_path')
- if filename == scenario_path:
- func_name = testcase.scenario['os_open_func_name']
- fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
- result = fake_os_open_func(filename, flags, mode)
- else:
- result = FakeFileDescriptorStringIO().fileno()
- return result
-
- mock_os_open = mock.MagicMock(side_effect=fake_os_open)
-
- func_patcher_os_open = mock.patch.object(
- os, "open",
- new=mock_os_open)
- func_patcher_os_open.start()
- testcase.addCleanup(func_patcher_os_open.stop)
-
- def fake_os_fdopen(fd, mode='rt', buffering=None):
- scenario_pidfile = get_scenario_option(
- testcase, 'pidfile', FakeFileDescriptorStringIO())
- if fd == testcase.scenario['pidfile'].fileno():
- result = testcase.scenario['pidfile']
- else:
- raise OSError(errno.EBADF, "Bad file descriptor")
- return result
-
- mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)
-
- func_patcher_os_fdopen = mock.patch.object(
- os, "fdopen",
- new=mock_os_fdopen)
- func_patcher_os_fdopen.start()
- testcase.addCleanup(func_patcher_os_fdopen.stop)
-
-
-def make_lockfile_method_fakes(scenario):
- """ Make common fake methods for lockfile class.
-
- :param scenario: A scenario for testing with PIDLockFile.
- :return: A mapping from normal function name to the corresponding
- fake function.
-
- Each fake function behaves appropriately for the specified `scenario`.
-
- """
-
- def fake_func_read_pid():
- return scenario['pidfile_pid']
- def fake_func_is_locked():
- return (scenario['locking_pid'] is not None)
- def fake_func_i_am_locking():
- return (
- scenario['locking_pid'] == scenario['pid'])
- def fake_func_acquire(timeout=None):
- if scenario['locking_pid'] is not None:
- raise lockfile.AlreadyLocked()
- scenario['locking_pid'] = scenario['pid']
- def fake_func_release():
- if scenario['locking_pid'] is None:
- raise lockfile.NotLocked()
- if scenario['locking_pid'] != scenario['pid']:
- raise lockfile.NotMyLock()
- scenario['locking_pid'] = None
- def fake_func_break_lock():
- scenario['locking_pid'] = None
-
- fake_methods = dict(
- (
- func_name.replace('fake_func_', ''),
- mock.MagicMock(side_effect=fake_func))
- for (func_name, fake_func) in vars().items()
- if func_name.startswith('fake_func_'))
-
- return fake_methods
-
-
-def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
- """ Apply common fake methods to mock lockfile class.
-
- :param mock_lockfile: An object providing the `LockFile` interface.
- :param testcase: The `TestCase` instance providing the context for
- the patch.
- :param scenario: The `PIDLockFile` test scenario to use.
-
- Mock the `LockFile` methods of `mock_lockfile`, by applying fake
- methods customised for `scenario`. The mock is does by a patch
- within the context of `testcase`.
-
- """
- fake_methods = dict(
- (func_name, fake_func)
- for (func_name, fake_func) in
- make_lockfile_method_fakes(scenario).items()
- if func_name not in ['read_pid'])
-
- for (func_name, fake_func) in fake_methods.items():
- func_patcher = mock.patch.object(
- mock_lockfile, func_name,
- new=fake_func)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
-
-
-def setup_pidlockfile_fixtures(testcase, scenario_name=None):
- """ Set up common fixtures for PIDLockFile test cases.
-
- :param testcase: The `TestCase` instance to decorate.
- :param scenario_name: The name of the `PIDLockFile` scenario to use.
-
- Decorate the `testcase` with attributes that are fixtures for test
- cases involving `PIDLockFile` instances.`
-
- """
-
- setup_pidfile_fixtures(testcase)
-
- for func_name in [
- 'write_pid_to_pidfile',
- 'remove_existing_pidfile',
- ]:
- func_patcher = mock.patch.object(lockfile.pidlockfile, func_name)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
-
-
-class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
- """ Test cases for ‘TimeoutPIDLockFile’ class. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(TimeoutPIDLockFile_TestCase, self).setUp()
-
- pidlockfile_scenarios = make_pidlockfile_scenarios()
- self.pidlockfile_scenario = pidlockfile_scenarios['simple']
- pidfile_path = self.pidlockfile_scenario['pidfile_path']
-
- for func_name in ['__init__', 'acquire']:
- func_patcher = mock.patch.object(
- lockfile.pidlockfile.PIDLockFile, func_name)
- func_patcher.start()
- self.addCleanup(func_patcher.stop)
-
- self.scenario = {
- 'pidfile_path': self.pidlockfile_scenario['pidfile_path'],
- 'acquire_timeout': self.getUniqueInteger(),
- }
-
- self.test_kwargs = dict(
- path=self.scenario['pidfile_path'],
- acquire_timeout=self.scenario['acquire_timeout'],
- )
- self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
- **self.test_kwargs)
-
- def test_inherits_from_pidlockfile(self):
- """ Should inherit from PIDLockFile. """
- instance = self.test_instance
- self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile)
-
- def test_init_has_expected_signature(self):
- """ Should have expected signature for ‘__init__’. """
- def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
- test_func.__name__ = str('__init__')
- self.assertFunctionSignatureMatch(
- test_func,
- daemon.pidfile.TimeoutPIDLockFile.__init__)
-
- def test_has_specified_acquire_timeout(self):
- """ Should have specified ‘acquire_timeout’ value. """
- instance = self.test_instance
- expected_timeout = self.test_kwargs['acquire_timeout']
- self.assertEqual(expected_timeout, instance.acquire_timeout)
-
- @mock.patch.object(
- lockfile.pidlockfile.PIDLockFile, "__init__",
- autospec=True)
- def test_calls_superclass_init(self, mock_init):
- """ Should call the superclass ‘__init__’. """
- expected_path = self.test_kwargs['path']
- instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
- mock_init.assert_called_with(instance, expected_path)
-
- @mock.patch.object(
- lockfile.pidlockfile.PIDLockFile, "acquire",
- autospec=True)
- def test_acquire_uses_specified_timeout(self, mock_func_acquire):
- """ Should call the superclass ‘acquire’ with specified timeout. """
- instance = self.test_instance
- test_timeout = self.getUniqueInteger()
- expected_timeout = test_timeout
- instance.acquire(test_timeout)
- mock_func_acquire.assert_called_with(instance, expected_timeout)
-
- @mock.patch.object(
- lockfile.pidlockfile.PIDLockFile, "acquire",
- autospec=True)
- def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
- """ Should call superclass ‘acquire’ with stored timeout by default. """
- instance = self.test_instance
- test_timeout = self.test_kwargs['acquire_timeout']
- expected_timeout = test_timeout
- instance.acquire()
- mock_func_acquire.assert_called_with(instance, expected_timeout)
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :
diff --git a/external_libs/python/python-daemon-2.0.5/test/test_runner.py b/external_libs/python/python-daemon-2.0.5/test/test_runner.py
deleted file mode 100644
index 4c0c714a..00000000
--- a/external_libs/python/python-daemon-2.0.5/test/test_runner.py
+++ /dev/null
@@ -1,675 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# test/test_runner.py
-# Part of ‘python-daemon’, an implementation of PEP 3143.
-#
-# Copyright © 2009–2015 Ben Finney <ben+python@benfinney.id.au>
-#
-# This is free software: you may copy, modify, and/or distribute this work
-# under the terms of the Apache License, version 2.0 as published by the
-# Apache Software Foundation.
-# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
-
-""" Unit test for ‘runner’ module.
- """
-
-from __future__ import (absolute_import, unicode_literals)
-
-try:
- # Python 3 standard library.
- import builtins
-except ImportError:
- # Python 2 standard library.
- import __builtin__ as builtins
-import os
-import os.path
-import sys
-import tempfile
-import errno
-import signal
-import functools
-
-import lockfile
-import mock
-import testtools
-
-from . import scaffold
-from .scaffold import (basestring, unicode)
-from .test_pidfile import (
- FakeFileDescriptorStringIO,
- setup_pidfile_fixtures,
- make_pidlockfile_scenarios,
- apply_lockfile_method_mocks,
- )
-from .test_daemon import (
- setup_streams_fixtures,
- )
-
-import daemon.daemon
-import daemon.runner
-import daemon.pidfile
-
-
-class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
- """ Test cases for module exception classes. """
-
- scenarios = scaffold.make_exception_scenarios([
- ('daemon.runner.DaemonRunnerError', dict(
- exc_type = daemon.runner.DaemonRunnerError,
- min_args = 1,
- types = [Exception],
- )),
- ('daemon.runner.DaemonRunnerInvalidActionError', dict(
- exc_type = daemon.runner.DaemonRunnerInvalidActionError,
- min_args = 1,
- types = [daemon.runner.DaemonRunnerError, ValueError],
- )),
- ('daemon.runner.DaemonRunnerStartFailureError', dict(
- exc_type = daemon.runner.DaemonRunnerStartFailureError,
- min_args = 1,
- types = [daemon.runner.DaemonRunnerError, RuntimeError],
- )),
- ('daemon.runner.DaemonRunnerStopFailureError', dict(
- exc_type = daemon.runner.DaemonRunnerStopFailureError,
- min_args = 1,
- types = [daemon.runner.DaemonRunnerError, RuntimeError],
- )),
- ])
-
-
-def make_runner_scenarios():
- """ Make a collection of scenarios for testing `DaemonRunner` instances.
-
- :return: A collection of scenarios for tests involving
- `DaemonRunner` instances.
-
- The collection is a mapping from scenario name to a dictionary of
- scenario attributes.
-
- """
-
- pidlockfile_scenarios = make_pidlockfile_scenarios()
-
- scenarios = {
- 'simple': {
- 'pidlockfile_scenario_name': 'simple',
- },
- 'pidfile-locked': {
- 'pidlockfile_scenario_name': 'exist-other-pid-locked',
- },
- }
-
- for scenario in scenarios.values():
- if 'pidlockfile_scenario_name' in scenario:
- pidlockfile_scenario = pidlockfile_scenarios.pop(
- scenario['pidlockfile_scenario_name'])
- scenario['pid'] = pidlockfile_scenario['pid']
- scenario['pidfile_path'] = pidlockfile_scenario['pidfile_path']
- scenario['pidfile_timeout'] = 23
- scenario['pidlockfile_scenario'] = pidlockfile_scenario
-
- return scenarios
-
-
-def set_runner_scenario(testcase, scenario_name):
- """ Set the DaemonRunner test scenario for the test case.
-
- :param testcase: The `TestCase` instance to decorate.
- :param scenario_name: The name of the scenario to use.
-
- Set the `DaemonRunner` test scenario name and decorate the
- `testcase` with the corresponding scenario fixtures.
-
- """
- scenarios = testcase.runner_scenarios
- testcase.scenario = scenarios[scenario_name]
- apply_lockfile_method_mocks(
- testcase.mock_runner_lockfile,
- testcase,
- testcase.scenario['pidlockfile_scenario'])
-
-
-def setup_runner_fixtures(testcase):
- """ Set up common fixtures for `DaemonRunner` test cases.
-
- :param testcase: A `TestCase` instance to decorate.
-
- Decorate the `testcase` with attributes to be fixtures for tests
- involving `DaemonRunner` instances.
-
- """
- setup_pidfile_fixtures(testcase)
- setup_streams_fixtures(testcase)
-
- testcase.runner_scenarios = make_runner_scenarios()
-
- patcher_stderr = mock.patch.object(
- sys, "stderr",
- new=FakeFileDescriptorStringIO())
- testcase.fake_stderr = patcher_stderr.start()
- testcase.addCleanup(patcher_stderr.stop)
-
- simple_scenario = testcase.runner_scenarios['simple']
-
- testcase.mock_runner_lockfile = mock.MagicMock(
- spec=daemon.pidfile.TimeoutPIDLockFile)
- apply_lockfile_method_mocks(
- testcase.mock_runner_lockfile,
- testcase,
- simple_scenario['pidlockfile_scenario'])
- testcase.mock_runner_lockfile.path = simple_scenario['pidfile_path']
-
- patcher_lockfile_class = mock.patch.object(
- daemon.pidfile, "TimeoutPIDLockFile",
- return_value=testcase.mock_runner_lockfile)
- patcher_lockfile_class.start()
- testcase.addCleanup(patcher_lockfile_class.stop)
-
- class TestApp(object):
-
- def __init__(self):
- self.stdin_path = testcase.stream_file_paths['stdin']
- self.stdout_path = testcase.stream_file_paths['stdout']
- self.stderr_path = testcase.stream_file_paths['stderr']
- self.pidfile_path = simple_scenario['pidfile_path']
- self.pidfile_timeout = simple_scenario['pidfile_timeout']
-
- run = mock.MagicMock(name="TestApp.run")
-
- testcase.TestApp = TestApp
-
- patcher_runner_daemoncontext = mock.patch.object(
- daemon.runner, "DaemonContext", autospec=True)
- patcher_runner_daemoncontext.start()
- testcase.addCleanup(patcher_runner_daemoncontext.stop)
-
- testcase.test_app = testcase.TestApp()
-
- testcase.test_program_name = "bazprog"
- testcase.test_program_path = os.path.join(
- "/foo/bar", testcase.test_program_name)
- testcase.valid_argv_params = {
- 'start': [testcase.test_program_path, 'start'],
- 'stop': [testcase.test_program_path, 'stop'],
- 'restart': [testcase.test_program_path, 'restart'],
- }
-
- def fake_open(filename, mode=None, buffering=None):
- if filename in testcase.stream_files_by_path:
- result = testcase.stream_files_by_path[filename]
- else:
- result = FakeFileDescriptorStringIO()
- result.mode = mode
- result.buffering = buffering
- return result
-
- mock_open = mock.mock_open()
- mock_open.side_effect = fake_open
-
- func_patcher_builtin_open = mock.patch.object(
- builtins, "open",
- new=mock_open)
- func_patcher_builtin_open.start()
- testcase.addCleanup(func_patcher_builtin_open.stop)
-
- func_patcher_os_kill = mock.patch.object(os, "kill")
- func_patcher_os_kill.start()
- testcase.addCleanup(func_patcher_os_kill.stop)
-
- patcher_sys_argv = mock.patch.object(
- sys, "argv",
- new=testcase.valid_argv_params['start'])
- patcher_sys_argv.start()
- testcase.addCleanup(patcher_sys_argv.stop)
-
- testcase.test_instance = daemon.runner.DaemonRunner(testcase.test_app)
-
- testcase.scenario = NotImplemented
-
-
-class DaemonRunner_BaseTestCase(scaffold.TestCase):
- """ Base class for DaemonRunner test case classes. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_BaseTestCase, self).setUp()
-
- setup_runner_fixtures(self)
- set_runner_scenario(self, 'simple')
-
-
-class DaemonRunner_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner class. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_TestCase, self).setUp()
-
- func_patcher_parse_args = mock.patch.object(
- daemon.runner.DaemonRunner, "parse_args")
- func_patcher_parse_args.start()
- self.addCleanup(func_patcher_parse_args.stop)
-
- # Create a new instance now with our custom patches.
- self.test_instance = daemon.runner.DaemonRunner(self.test_app)
-
- def test_instantiate(self):
- """ New instance of DaemonRunner should be created. """
- self.assertIsInstance(self.test_instance, daemon.runner.DaemonRunner)
-
- def test_parses_commandline_args(self):
- """ Should parse commandline arguments. """
- self.test_instance.parse_args.assert_called_with()
-
- def test_has_specified_app(self):
- """ Should have specified application object. """
- self.assertIs(self.test_app, self.test_instance.app)
-
- def test_sets_pidfile_none_when_pidfile_path_is_none(self):
- """ Should set ‘pidfile’ to ‘None’ when ‘pidfile_path’ is ‘None’. """
- pidfile_path = None
- self.test_app.pidfile_path = pidfile_path
- expected_pidfile = None
- instance = daemon.runner.DaemonRunner(self.test_app)
- self.assertIs(expected_pidfile, instance.pidfile)
-
- def test_error_when_pidfile_path_not_string(self):
- """ Should raise ValueError when PID file path not a string. """
- pidfile_path = object()
- self.test_app.pidfile_path = pidfile_path
- expected_error = ValueError
- self.assertRaises(
- expected_error,
- daemon.runner.DaemonRunner, self.test_app)
-
- def test_error_when_pidfile_path_not_absolute(self):
- """ Should raise ValueError when PID file path not absolute. """
- pidfile_path = "foo/bar.pid"
- self.test_app.pidfile_path = pidfile_path
- expected_error = ValueError
- self.assertRaises(
- expected_error,
- daemon.runner.DaemonRunner, self.test_app)
-
- def test_creates_lock_with_specified_parameters(self):
- """ Should create a TimeoutPIDLockFile with specified params. """
- pidfile_path = self.scenario['pidfile_path']
- pidfile_timeout = self.scenario['pidfile_timeout']
- daemon.pidfile.TimeoutPIDLockFile.assert_called_with(
- pidfile_path, pidfile_timeout)
-
- def test_has_created_pidfile(self):
- """ Should have new PID lock file as `pidfile` attribute. """
- expected_pidfile = self.mock_runner_lockfile
- instance = self.test_instance
- self.assertIs(
- expected_pidfile, instance.pidfile)
-
- def test_daemon_context_has_created_pidfile(self):
- """ DaemonContext component should have new PID lock file. """
- expected_pidfile = self.mock_runner_lockfile
- daemon_context = self.test_instance.daemon_context
- self.assertIs(
- expected_pidfile, daemon_context.pidfile)
-
- def test_daemon_context_has_specified_stdin_stream(self):
- """ DaemonContext component should have specified stdin file. """
- test_app = self.test_app
- expected_file = self.stream_files_by_name['stdin']
- daemon_context = self.test_instance.daemon_context
- self.assertEqual(expected_file, daemon_context.stdin)
-
- def test_daemon_context_has_stdin_in_read_mode(self):
- """ DaemonContext component should open stdin file for read. """
- expected_mode = 'rt'
- daemon_context = self.test_instance.daemon_context
- self.assertIn(expected_mode, daemon_context.stdin.mode)
-
- def test_daemon_context_has_specified_stdout_stream(self):
- """ DaemonContext component should have specified stdout file. """
- test_app = self.test_app
- expected_file = self.stream_files_by_name['stdout']
- daemon_context = self.test_instance.daemon_context
- self.assertEqual(expected_file, daemon_context.stdout)
-
- def test_daemon_context_has_stdout_in_append_mode(self):
- """ DaemonContext component should open stdout file for append. """
- expected_mode = 'w+t'
- daemon_context = self.test_instance.daemon_context
- self.assertIn(expected_mode, daemon_context.stdout.mode)
-
- def test_daemon_context_has_specified_stderr_stream(self):
- """ DaemonContext component should have specified stderr file. """
- test_app = self.test_app
- expected_file = self.stream_files_by_name['stderr']
- daemon_context = self.test_instance.daemon_context
- self.assertEqual(expected_file, daemon_context.stderr)
-
- def test_daemon_context_has_stderr_in_append_mode(self):
- """ DaemonContext component should open stderr file for append. """
- expected_mode = 'w+t'
- daemon_context = self.test_instance.daemon_context
- self.assertIn(expected_mode, daemon_context.stderr.mode)
-
- def test_daemon_context_has_stderr_with_no_buffering(self):
- """ DaemonContext component should open stderr file unbuffered. """
- expected_buffering = 0
- daemon_context = self.test_instance.daemon_context
- self.assertEqual(
- expected_buffering, daemon_context.stderr.buffering)
-
-
-class DaemonRunner_usage_exit_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.usage_exit method. """
-
- def test_raises_system_exit(self):
- """ Should raise SystemExit exception. """
- instance = self.test_instance
- argv = [self.test_program_path]
- self.assertRaises(
- SystemExit,
- instance._usage_exit, argv)
-
- def test_message_follows_conventional_format(self):
- """ Should emit a conventional usage message. """
- instance = self.test_instance
- argv = [self.test_program_path]
- expected_stderr_output = """\
- usage: {progname} ...
- """.format(
- progname=self.test_program_name)
- self.assertRaises(
- SystemExit,
- instance._usage_exit, argv)
- self.assertOutputCheckerMatch(
- expected_stderr_output, self.fake_stderr.getvalue())
-
-
-class DaemonRunner_parse_args_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.parse_args method. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_parse_args_TestCase, self).setUp()
-
- func_patcher_usage_exit = mock.patch.object(
- daemon.runner.DaemonRunner, "_usage_exit",
- side_effect=NotImplementedError)
- func_patcher_usage_exit.start()
- self.addCleanup(func_patcher_usage_exit.stop)
-
- def test_emits_usage_message_if_insufficient_args(self):
- """ Should emit a usage message and exit if too few arguments. """
- instance = self.test_instance
- argv = [self.test_program_path]
- exc = self.assertRaises(
- NotImplementedError,
- instance.parse_args, argv)
- daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv)
-
- def test_emits_usage_message_if_unknown_action_arg(self):
- """ Should emit a usage message and exit if unknown action. """
- instance = self.test_instance
- progname = self.test_program_name
- argv = [self.test_program_path, 'bogus']
- exc = self.assertRaises(
- NotImplementedError,
- instance.parse_args, argv)
- daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv)
-
- def test_should_parse_system_argv_by_default(self):
- """ Should parse sys.argv by default. """
- instance = self.test_instance
- expected_action = 'start'
- argv = self.valid_argv_params['start']
- with mock.patch.object(sys, "argv", new=argv):
- instance.parse_args()
- self.assertEqual(expected_action, instance.action)
-
- def test_sets_action_from_first_argument(self):
- """ Should set action from first commandline argument. """
- instance = self.test_instance
- for name, argv in self.valid_argv_params.items():
- expected_action = name
- instance.parse_args(argv)
- self.assertEqual(expected_action, instance.action)
-
-
-try:
- ProcessLookupError
-except NameError:
- # Python 2 uses OSError.
- ProcessLookupError = functools.partial(OSError, errno.ESRCH)
-
-class DaemonRunner_do_action_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.do_action method. """
-
- def test_raises_error_if_unknown_action(self):
- """ Should emit a usage message and exit if action is unknown. """
- instance = self.test_instance
- instance.action = 'bogus'
- expected_error = daemon.runner.DaemonRunnerInvalidActionError
- self.assertRaises(
- expected_error,
- instance.do_action)
-
-
-class DaemonRunner_do_action_start_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.do_action method, action 'start'. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_do_action_start_TestCase, self).setUp()
-
- self.test_instance.action = 'start'
-
- def test_raises_error_if_pidfile_locked(self):
- """ Should raise error if PID file is locked. """
-
- instance = self.test_instance
- instance.daemon_context.open.side_effect = lockfile.AlreadyLocked
- pidfile_path = self.scenario['pidfile_path']
- expected_error = daemon.runner.DaemonRunnerStartFailureError
- expected_message_content = pidfile_path
- exc = self.assertRaises(
- expected_error,
- instance.do_action)
- self.assertIn(expected_message_content, unicode(exc))
-
- def test_breaks_lock_if_no_such_process(self):
- """ Should request breaking lock if PID file process is not running. """
- set_runner_scenario(self, 'pidfile-locked')
- instance = self.test_instance
- self.mock_runner_lockfile.read_pid.return_value = (
- self.scenario['pidlockfile_scenario']['pidfile_pid'])
- pidfile_path = self.scenario['pidfile_path']
- test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
- expected_signal = signal.SIG_DFL
- test_error = ProcessLookupError("Not running")
- os.kill.side_effect = test_error
- instance.do_action()
- os.kill.assert_called_with(test_pid, expected_signal)
- self.mock_runner_lockfile.break_lock.assert_called_with()
-
- def test_requests_daemon_context_open(self):
- """ Should request the daemon context to open. """
- instance = self.test_instance
- instance.do_action()
- instance.daemon_context.open.assert_called_with()
-
- def test_emits_start_message_to_stderr(self):
- """ Should emit start message to stderr. """
- instance = self.test_instance
- expected_stderr = """\
- started with pid {pid:d}
- """.format(
- pid=self.scenario['pid'])
- instance.do_action()
- self.assertOutputCheckerMatch(
- expected_stderr, self.fake_stderr.getvalue())
-
- def test_requests_app_run(self):
- """ Should request the application to run. """
- instance = self.test_instance
- instance.do_action()
- self.test_app.run.assert_called_with()
-
-
-class DaemonRunner_do_action_stop_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.do_action method, action 'stop'. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_do_action_stop_TestCase, self).setUp()
-
- set_runner_scenario(self, 'pidfile-locked')
-
- self.test_instance.action = 'stop'
-
- self.mock_runner_lockfile.is_locked.return_value = True
- self.mock_runner_lockfile.i_am_locking.return_value = False
- self.mock_runner_lockfile.read_pid.return_value = (
- self.scenario['pidlockfile_scenario']['pidfile_pid'])
-
- def test_raises_error_if_pidfile_not_locked(self):
- """ Should raise error if PID file is not locked. """
- set_runner_scenario(self, 'simple')
- instance = self.test_instance
- self.mock_runner_lockfile.is_locked.return_value = False
- self.mock_runner_lockfile.i_am_locking.return_value = False
- self.mock_runner_lockfile.read_pid.return_value = (
- self.scenario['pidlockfile_scenario']['pidfile_pid'])
- pidfile_path = self.scenario['pidfile_path']
- expected_error = daemon.runner.DaemonRunnerStopFailureError
- expected_message_content = pidfile_path
- exc = self.assertRaises(
- expected_error,
- instance.do_action)
- self.assertIn(expected_message_content, unicode(exc))
-
- def test_breaks_lock_if_pidfile_stale(self):
- """ Should break lock if PID file is stale. """
- instance = self.test_instance
- pidfile_path = self.scenario['pidfile_path']
- test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
- expected_signal = signal.SIG_DFL
- test_error = OSError(errno.ESRCH, "Not running")
- os.kill.side_effect = test_error
- instance.do_action()
- self.mock_runner_lockfile.break_lock.assert_called_with()
-
- def test_sends_terminate_signal_to_process_from_pidfile(self):
- """ Should send SIGTERM to the daemon process. """
- instance = self.test_instance
- test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
- expected_signal = signal.SIGTERM
- instance.do_action()
- os.kill.assert_called_with(test_pid, expected_signal)
-
- def test_raises_error_if_cannot_send_signal_to_process(self):
- """ Should raise error if cannot send signal to daemon process. """
- instance = self.test_instance
- test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
- pidfile_path = self.scenario['pidfile_path']
- test_error = OSError(errno.EPERM, "Nice try")
- os.kill.side_effect = test_error
- expected_error = daemon.runner.DaemonRunnerStopFailureError
- expected_message_content = unicode(test_pid)
- exc = self.assertRaises(
- expected_error,
- instance.do_action)
- self.assertIn(expected_message_content, unicode(exc))
-
-
-@mock.patch.object(daemon.runner.DaemonRunner, "_start")
-@mock.patch.object(daemon.runner.DaemonRunner, "_stop")
-class DaemonRunner_do_action_restart_TestCase(DaemonRunner_BaseTestCase):
- """ Test cases for DaemonRunner.do_action method, action 'restart'. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(DaemonRunner_do_action_restart_TestCase, self).setUp()
-
- set_runner_scenario(self, 'pidfile-locked')
-
- self.test_instance.action = 'restart'
-
- def test_requests_stop_then_start(
- self,
- mock_func_daemonrunner_start, mock_func_daemonrunner_stop):
- """ Should request stop, then start. """
- instance = self.test_instance
- instance.do_action()
- mock_func_daemonrunner_start.assert_called_with()
- mock_func_daemonrunner_stop.assert_called_with()
-
-
-@mock.patch.object(sys, "stderr")
-class emit_message_TestCase(scaffold.TestCase):
- """ Test cases for ‘emit_message’ function. """
-
- def test_writes_specified_message_to_stream(self, mock_stderr):
- """ Should write specified message to stream. """
- test_message = self.getUniqueString()
- expected_content = "{message}\n".format(message=test_message)
- daemon.runner.emit_message(test_message, stream=mock_stderr)
- mock_stderr.write.assert_called_with(expected_content)
-
- def test_writes_to_specified_stream(self, mock_stderr):
- """ Should write message to specified stream. """
- test_message = self.getUniqueString()
- mock_stream = mock.MagicMock()
- daemon.runner.emit_message(test_message, stream=mock_stream)
- mock_stream.write.assert_called_with(mock.ANY)
-
- def test_writes_to_stderr_by_default(self, mock_stderr):
- """ Should write message to ‘sys.stderr’ by default. """
- test_message = self.getUniqueString()
- daemon.runner.emit_message(test_message)
- mock_stderr.write.assert_called_with(mock.ANY)
-
-
-class is_pidfile_stale_TestCase(scaffold.TestCase):
- """ Test cases for ‘is_pidfile_stale’ function. """
-
- def setUp(self):
- """ Set up test fixtures. """
- super(is_pidfile_stale_TestCase, self).setUp()
-
- func_patcher_os_kill = mock.patch.object(os, "kill")
- func_patcher_os_kill.start()
- self.addCleanup(func_patcher_os_kill.stop)
- os.kill.return_value = None
-
- self.test_pid = self.getUniqueInteger()
- self.test_pidfile = mock.MagicMock(daemon.pidfile.TimeoutPIDLockFile)
- self.test_pidfile.read_pid.return_value = self.test_pid
-
- def test_returns_false_if_no_pid_in_file(self):
- """ Should return False if the pidfile contains no PID. """
- self.test_pidfile.read_pid.return_value = None
- expected_result = False
- result = daemon.runner.is_pidfile_stale(self.test_pidfile)
- self.assertEqual(expected_result, result)
-
- def test_returns_false_if_process_exists(self):
- """ Should return False if the process with its PID exists. """
- expected_result = False
- result = daemon.runner.is_pidfile_stale(self.test_pidfile)
- self.assertEqual(expected_result, result)
-
- def test_returns_true_if_process_does_not_exist(self):
- """ Should return True if the process does not exist. """
- test_error = ProcessLookupError("No such process")
- del os.kill.return_value
- os.kill.side_effect = test_error
- expected_result = True
- result = daemon.runner.is_pidfile_stale(self.test_pidfile)
- self.assertEqual(expected_result, result)
-
-
-# Local variables:
-# coding: utf-8
-# mode: python
-# End:
-# vim: fileencoding=utf-8 filetype=python :