summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test')
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/__init__.py23
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/scaffold.py322
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_daemon.py1744
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_metadata.py380
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_pidfile.py472
-rwxr-xr-xscripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_runner.py675
6 files changed, 3616 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/__init__.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/__init__.py
new file mode 100755
index 00000000..398519f1
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/__init__.py
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+#
+# test/__init__.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test suite for ‘daemon’ package.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/scaffold.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/scaffold.py
new file mode 100755
index 00000000..9a4f1150
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/scaffold.py
@@ -0,0 +1,322 @@
+# -*- coding: utf-8 -*-
+
+# test/scaffold.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2007–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Scaffolding for unit test modules.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+import unittest
+import doctest
+import logging
+import os
+import sys
+import operator
+import textwrap
+from copy import deepcopy
+import functools
+
+try:
+ # Python 2 has both ‘str’ (bytes) and ‘unicode’ (text).
+ basestring = basestring
+ unicode = unicode
+except NameError:
+ # Python 3 names the Unicode data type ‘str’.
+ basestring = str
+ unicode = str
+
+import testscenarios
+import testtools.testcase
+
+
+test_dir = os.path.dirname(os.path.abspath(__file__))
+parent_dir = os.path.dirname(test_dir)
+if not test_dir in sys.path:
+ sys.path.insert(1, test_dir)
+if not parent_dir in sys.path:
+ sys.path.insert(1, parent_dir)
+
+# Disable all but the most critical logging messages.
+logging.disable(logging.CRITICAL)
+
+
+def get_function_signature(func):
+ """ Get the function signature as a mapping of attributes.
+
+ :param func: The function object to interrogate.
+ :return: A mapping of the components of a function signature.
+
+ The signature is constructed as a mapping:
+
+ * 'name': The function's defined name.
+ * 'arg_count': The number of arguments expected by the function.
+ * 'arg_names': A sequence of the argument names, as strings.
+ * 'arg_defaults': A sequence of the default values for the arguments.
+ * 'va_args': The name bound to remaining positional arguments.
+ * 'va_kw_args': The name bound to remaining keyword arguments.
+
+ """
+ try:
+ # Python 3 function attributes.
+ func_code = func.__code__
+ func_defaults = func.__defaults__
+ except AttributeError:
+ # Python 2 function attributes.
+ func_code = func.func_code
+ func_defaults = func.func_defaults
+
+ arg_count = func_code.co_argcount
+ arg_names = func_code.co_varnames[:arg_count]
+
+ arg_defaults = {}
+ if func_defaults is not None:
+ arg_defaults = dict(
+ (name, value)
+ for (name, value) in
+ zip(arg_names[::-1], func_defaults[::-1]))
+
+ signature = {
+ 'name': func.__name__,
+ 'arg_count': arg_count,
+ 'arg_names': arg_names,
+ 'arg_defaults': arg_defaults,
+ }
+
+ non_pos_names = list(func_code.co_varnames[arg_count:])
+ COLLECTS_ARBITRARY_POSITIONAL_ARGS = 0x04
+ if func_code.co_flags & COLLECTS_ARBITRARY_POSITIONAL_ARGS:
+ signature['var_args'] = non_pos_names.pop(0)
+ COLLECTS_ARBITRARY_KEYWORD_ARGS = 0x08
+ if func_code.co_flags & COLLECTS_ARBITRARY_KEYWORD_ARGS:
+ signature['var_kw_args'] = non_pos_names.pop(0)
+
+ return signature
+
+
+def format_function_signature(func):
+ """ Format the function signature as printable text.
+
+ :param func: The function object to interrogate.
+ :return: A formatted text representation of the function signature.
+
+ The signature is rendered a text; for example::
+
+ foo(spam, eggs, ham=True, beans=None, *args, **kwargs)
+
+ """
+ signature = get_function_signature(func)
+
+ args_text = []
+ for arg_name in signature['arg_names']:
+ if arg_name in signature['arg_defaults']:
+ arg_text = "{name}={value!r}".format(
+ name=arg_name, value=signature['arg_defaults'][arg_name])
+ else:
+ arg_text = "{name}".format(
+ name=arg_name)
+ args_text.append(arg_text)
+ if 'var_args' in signature:
+ args_text.append("*{var_args}".format(signature))
+ if 'var_kw_args' in signature:
+ args_text.append("**{var_kw_args}".format(signature))
+ signature_args_text = ", ".join(args_text)
+
+ func_name = signature['name']
+ signature_text = "{name}({args})".format(
+ name=func_name, args=signature_args_text)
+
+ return signature_text
+
+
+class TestCase(testtools.testcase.TestCase):
+ """ Test case behaviour. """
+
+ def failUnlessOutputCheckerMatch(self, want, got, msg=None):
+ """ Fail unless the specified string matches the expected.
+
+ :param want: The desired output pattern.
+ :param got: The actual text to match.
+ :param msg: A message to prefix on the failure message.
+ :return: ``None``.
+ :raises self.failureException: If the text does not match.
+
+ Fail the test unless ``want`` matches ``got``, as determined by
+ a ``doctest.OutputChecker`` instance. This is not an equality
+ check, but a pattern match according to the ``OutputChecker``
+ rules.
+
+ """
+ checker = doctest.OutputChecker()
+ want = textwrap.dedent(want)
+ source = ""
+ example = doctest.Example(source, want)
+ got = textwrap.dedent(got)
+ checker_optionflags = functools.reduce(operator.or_, [
+ doctest.ELLIPSIS,
+ ])
+ if not checker.check_output(want, got, checker_optionflags):
+ if msg is None:
+ diff = checker.output_difference(
+ example, got, checker_optionflags)
+ msg = "\n".join([
+ "Output received did not match expected output",
+ "{diff}",
+ ]).format(
+ diff=diff)
+ raise self.failureException(msg)
+
+ assertOutputCheckerMatch = failUnlessOutputCheckerMatch
+
+ def failUnlessFunctionInTraceback(self, traceback, function, msg=None):
+ """ Fail if the function is not in the traceback.
+
+ :param traceback: The traceback object to interrogate.
+ :param function: The function object to match.
+ :param msg: A message to prefix on the failure message.
+ :return: ``None``.
+
+ :raises self.failureException: If the function is not in the
+ traceback.
+
+ Fail the test if the function ``function`` is not at any of the
+ levels in the traceback object ``traceback``.
+
+ """
+ func_in_traceback = False
+ expected_code = function.func_code
+ current_traceback = traceback
+ while current_traceback is not None:
+ if expected_code is current_traceback.tb_frame.f_code:
+ func_in_traceback = True
+ break
+ current_traceback = current_traceback.tb_next
+
+ if not func_in_traceback:
+ if msg is None:
+ msg = (
+ "Traceback did not lead to original function"
+ " {function}"
+ ).format(
+ function=function)
+ raise self.failureException(msg)
+
+ assertFunctionInTraceback = failUnlessFunctionInTraceback
+
+ def failUnlessFunctionSignatureMatch(self, first, second, msg=None):
+ """ Fail if the function signatures do not match.
+
+ :param first: The first function to compare.
+ :param second: The second function to compare.
+ :param msg: A message to prefix to the failure message.
+ :return: ``None``.
+
+ :raises self.failureException: If the function signatures do
+ not match.
+
+ Fail the test if the function signature does not match between
+ the ``first`` function and the ``second`` function.
+
+ The function signature includes:
+
+ * function name,
+
+ * count of named parameters,
+
+ * sequence of named parameters,
+
+ * default values of named parameters,
+
+ * collector for arbitrary positional arguments,
+
+ * collector for arbitrary keyword arguments.
+
+ """
+ first_signature = get_function_signature(first)
+ second_signature = get_function_signature(second)
+
+ if first_signature != second_signature:
+ if msg is None:
+ first_signature_text = format_function_signature(first)
+ second_signature_text = format_function_signature(second)
+ msg = (textwrap.dedent("""\
+ Function signatures do not match:
+ {first!r} != {second!r}
+ Expected:
+ {first_text}
+ Got:
+ {second_text}""")
+ ).format(
+ first=first_signature,
+ first_text=first_signature_text,
+ second=second_signature,
+ second_text=second_signature_text,
+ )
+ raise self.failureException(msg)
+
+ assertFunctionSignatureMatch = failUnlessFunctionSignatureMatch
+
+
+class TestCaseWithScenarios(testscenarios.WithScenarios, TestCase):
+ """ Test cases run per scenario. """
+
+
+class Exception_TestCase(TestCaseWithScenarios):
+ """ Test cases for exception classes. """
+
+ def test_exception_instance(self):
+ """ Exception instance should be created. """
+ self.assertIsNot(self.instance, None)
+
+ def test_exception_types(self):
+ """ Exception instance should match expected types. """
+ for match_type in self.types:
+ self.assertIsInstance(self.instance, match_type)
+
+
+def make_exception_scenarios(scenarios):
+ """ Make test scenarios for exception classes.
+
+ :param scenarios: Sequence of scenarios.
+ :return: List of scenarios with additional mapping entries.
+
+ Use this with `testscenarios` to adapt `Exception_TestCase`_ for
+ any exceptions that need testing.
+
+ Each scenario is a tuple (`name`, `map`) where `map` is a mapping
+ of attributes to be applied to each test case. Attributes map must
+ contain items for:
+
+ :key exc_type:
+ The exception type to be tested.
+ :key min_args:
+ The minimum argument count for the exception instance
+ initialiser.
+ :key types:
+ Sequence of types that should be superclasses of each
+ instance of the exception type.
+
+ """
+ updated_scenarios = deepcopy(scenarios)
+ for (name, scenario) in updated_scenarios:
+ args = (None,) * scenario['min_args']
+ scenario['args'] = args
+ instance = scenario['exc_type'](*args)
+ scenario['instance'] = instance
+
+ return updated_scenarios
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_daemon.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_daemon.py
new file mode 100755
index 00000000..a911858a
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_daemon.py
@@ -0,0 +1,1744 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_daemon.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘daemon’ module.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+import os
+import sys
+import tempfile
+import resource
+import errno
+import signal
+import socket
+from types import ModuleType
+import collections
+import functools
+try:
+ # Standard library of Python 2.7 and later.
+ from io import StringIO
+except ImportError:
+ # Standard library of Python 2.6 and earlier.
+ from StringIO import StringIO
+
+import mock
+
+from . import scaffold
+from .scaffold import (basestring, unicode)
+from .test_pidfile import (
+ FakeFileDescriptorStringIO,
+ setup_pidfile_fixtures,
+ )
+
+import daemon
+
+
+class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
+ """ Test cases for module exception classes. """
+
+ scenarios = scaffold.make_exception_scenarios([
+ ('daemon.daemon.DaemonError', dict(
+ exc_type = daemon.daemon.DaemonError,
+ min_args = 1,
+ types = [Exception],
+ )),
+ ('daemon.daemon.DaemonOSEnvironmentError', dict(
+ exc_type = daemon.daemon.DaemonOSEnvironmentError,
+ min_args = 1,
+ types = [daemon.daemon.DaemonError, OSError],
+ )),
+ ('daemon.daemon.DaemonProcessDetachError', dict(
+ exc_type = daemon.daemon.DaemonProcessDetachError,
+ min_args = 1,
+ types = [daemon.daemon.DaemonError, OSError],
+ )),
+ ])
+
+
+def setup_daemon_context_fixtures(testcase):
+ """ Set up common test fixtures for DaemonContext test case.
+
+ :param testcase: A ``TestCase`` instance to decorate.
+ :return: ``None``.
+
+ Decorate the `testcase` with fixtures for tests involving
+ `DaemonContext`.
+
+ """
+ setup_streams_fixtures(testcase)
+
+ setup_pidfile_fixtures(testcase)
+
+ testcase.fake_pidfile_path = tempfile.mktemp()
+ testcase.mock_pidlockfile = mock.MagicMock()
+ testcase.mock_pidlockfile.path = testcase.fake_pidfile_path
+
+ testcase.daemon_context_args = dict(
+ stdin=testcase.stream_files_by_name['stdin'],
+ stdout=testcase.stream_files_by_name['stdout'],
+ stderr=testcase.stream_files_by_name['stderr'],
+ )
+ testcase.test_instance = daemon.DaemonContext(
+ **testcase.daemon_context_args)
+
+fake_default_signal_map = object()
+
+@mock.patch.object(
+ daemon.daemon, "is_detach_process_context_required",
+ new=(lambda: True))
+@mock.patch.object(
+ daemon.daemon, "make_default_signal_map",
+ new=(lambda: fake_default_signal_map))
+@mock.patch.object(os, "setgid", new=(lambda x: object()))
+@mock.patch.object(os, "setuid", new=(lambda x: object()))
+class DaemonContext_BaseTestCase(scaffold.TestCase):
+ """ Base class for DaemonContext test case classes. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_BaseTestCase, self).setUp()
+
+ setup_daemon_context_fixtures(self)
+
+
+class DaemonContext_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext class. """
+
+ def test_instantiate(self):
+ """ New instance of DaemonContext should be created. """
+ self.assertIsInstance(
+ self.test_instance, daemon.daemon.DaemonContext)
+
+ def test_minimum_zero_arguments(self):
+ """ Initialiser should not require any arguments. """
+ instance = daemon.daemon.DaemonContext()
+ self.assertIsNot(instance, None)
+
+ def test_has_specified_chroot_directory(self):
+ """ Should have specified chroot_directory option. """
+ args = dict(
+ chroot_directory=object(),
+ )
+ expected_directory = args['chroot_directory']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_directory, instance.chroot_directory)
+
+ def test_has_specified_working_directory(self):
+ """ Should have specified working_directory option. """
+ args = dict(
+ working_directory=object(),
+ )
+ expected_directory = args['working_directory']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_directory, instance.working_directory)
+
+ def test_has_default_working_directory(self):
+ """ Should have default working_directory option. """
+ args = dict()
+ expected_directory = "/"
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_directory, instance.working_directory)
+
+ def test_has_specified_creation_mask(self):
+ """ Should have specified umask option. """
+ args = dict(
+ umask=object(),
+ )
+ expected_mask = args['umask']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_mask, instance.umask)
+
+ def test_has_default_creation_mask(self):
+ """ Should have default umask option. """
+ args = dict()
+ expected_mask = 0
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_mask, instance.umask)
+
+ def test_has_specified_uid(self):
+ """ Should have specified uid option. """
+ args = dict(
+ uid=object(),
+ )
+ expected_id = args['uid']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_id, instance.uid)
+
+ def test_has_derived_uid(self):
+ """ Should have uid option derived from process. """
+ args = dict()
+ expected_id = os.getuid()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_id, instance.uid)
+
+ def test_has_specified_gid(self):
+ """ Should have specified gid option. """
+ args = dict(
+ gid=object(),
+ )
+ expected_id = args['gid']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_id, instance.gid)
+
+ def test_has_derived_gid(self):
+ """ Should have gid option derived from process. """
+ args = dict()
+ expected_id = os.getgid()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_id, instance.gid)
+
+ def test_has_specified_detach_process(self):
+ """ Should have specified detach_process option. """
+ args = dict(
+ detach_process=object(),
+ )
+ expected_value = args['detach_process']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_value, instance.detach_process)
+
+ def test_has_derived_detach_process(self):
+ """ Should have detach_process option derived from environment. """
+ args = dict()
+ func = daemon.daemon.is_detach_process_context_required
+ expected_value = func()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_value, instance.detach_process)
+
+ def test_has_specified_files_preserve(self):
+ """ Should have specified files_preserve option. """
+ args = dict(
+ files_preserve=object(),
+ )
+ expected_files_preserve = args['files_preserve']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_files_preserve, instance.files_preserve)
+
+ def test_has_specified_pidfile(self):
+ """ Should have the specified pidfile. """
+ args = dict(
+ pidfile=object(),
+ )
+ expected_pidfile = args['pidfile']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_pidfile, instance.pidfile)
+
+ def test_has_specified_stdin(self):
+ """ Should have specified stdin option. """
+ args = dict(
+ stdin=object(),
+ )
+ expected_file = args['stdin']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_file, instance.stdin)
+
+ def test_has_specified_stdout(self):
+ """ Should have specified stdout option. """
+ args = dict(
+ stdout=object(),
+ )
+ expected_file = args['stdout']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_file, instance.stdout)
+
+ def test_has_specified_stderr(self):
+ """ Should have specified stderr option. """
+ args = dict(
+ stderr=object(),
+ )
+ expected_file = args['stderr']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_file, instance.stderr)
+
+ def test_has_specified_signal_map(self):
+ """ Should have specified signal_map option. """
+ args = dict(
+ signal_map=object(),
+ )
+ expected_signal_map = args['signal_map']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_signal_map, instance.signal_map)
+
+ def test_has_derived_signal_map(self):
+ """ Should have signal_map option derived from system. """
+ args = dict()
+ expected_signal_map = daemon.daemon.make_default_signal_map()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.assertEqual(expected_signal_map, instance.signal_map)
+
+
+class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.is_open property. """
+
+ def test_begin_false(self):
+ """ Initial value of is_open should be False. """
+ instance = self.test_instance
+ self.assertEqual(False, instance.is_open)
+
+ def test_write_fails(self):
+ """ Writing to is_open should fail. """
+ instance = self.test_instance
+ self.assertRaises(
+ AttributeError,
+ setattr, instance, 'is_open', object())
+
+
+class DaemonContext_open_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.open method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_open_TestCase, self).setUp()
+
+ self.test_instance._is_open = False
+
+ self.mock_module_daemon = mock.MagicMock()
+ daemon_func_patchers = dict(
+ (func_name, mock.patch.object(
+ daemon.daemon, func_name))
+ for func_name in [
+ "detach_process_context",
+ "change_working_directory",
+ "change_root_directory",
+ "change_file_creation_mask",
+ "change_process_owner",
+ "prevent_core_dump",
+ "close_all_open_files",
+ "redirect_stream",
+ "set_signal_handlers",
+ "register_atexit_function",
+ ])
+ for (func_name, patcher) in daemon_func_patchers.items():
+ mock_func = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.mock_module_daemon.attach_mock(mock_func, func_name)
+
+ self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext')
+
+ self.test_files_preserve_fds = object()
+ self.test_signal_handler_map = object()
+ daemoncontext_method_return_values = {
+ '_get_exclude_file_descriptors':
+ self.test_files_preserve_fds,
+ '_make_signal_handler_map':
+ self.test_signal_handler_map,
+ }
+ daemoncontext_func_patchers = dict(
+ (func_name, mock.patch.object(
+ daemon.daemon.DaemonContext,
+ func_name,
+ return_value=return_value))
+ for (func_name, return_value) in
+ daemoncontext_method_return_values.items())
+ for (func_name, patcher) in daemoncontext_func_patchers.items():
+ mock_func = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.mock_module_daemon.DaemonContext.attach_mock(
+ mock_func, func_name)
+
+ def test_performs_steps_in_expected_sequence(self):
+ """ Should perform daemonisation steps in expected sequence. """
+ instance = self.test_instance
+ instance.chroot_directory = object()
+ instance.detach_process = True
+ instance.pidfile = self.mock_pidlockfile
+ self.mock_module_daemon.attach_mock(
+ self.mock_pidlockfile, 'pidlockfile')
+ expected_calls = [
+ mock.call.change_root_directory(mock.ANY),
+ mock.call.prevent_core_dump(),
+ mock.call.change_file_creation_mask(mock.ANY),
+ mock.call.change_working_directory(mock.ANY),
+ mock.call.change_process_owner(mock.ANY, mock.ANY),
+ mock.call.detach_process_context(),
+ mock.call.DaemonContext._make_signal_handler_map(),
+ mock.call.set_signal_handlers(mock.ANY),
+ mock.call.DaemonContext._get_exclude_file_descriptors(),
+ mock.call.close_all_open_files(exclude=mock.ANY),
+ mock.call.redirect_stream(mock.ANY, mock.ANY),
+ mock.call.redirect_stream(mock.ANY, mock.ANY),
+ mock.call.redirect_stream(mock.ANY, mock.ANY),
+ mock.call.pidlockfile.__enter__(),
+ mock.call.register_atexit_function(mock.ANY),
+ ]
+ instance.open()
+ self.mock_module_daemon.assert_has_calls(expected_calls)
+
+ def test_returns_immediately_if_is_open(self):
+ """ Should return immediately if is_open property is true. """
+ instance = self.test_instance
+ instance._is_open = True
+ instance.open()
+ self.assertEqual(0, len(self.mock_module_daemon.mock_calls))
+
+ def test_changes_root_directory_to_chroot_directory(self):
+ """ Should change root directory to `chroot_directory` option. """
+ instance = self.test_instance
+ chroot_directory = object()
+ instance.chroot_directory = chroot_directory
+ instance.open()
+ self.mock_module_daemon.change_root_directory.assert_called_with(
+ chroot_directory)
+
+ def test_omits_chroot_if_no_chroot_directory(self):
+ """ Should omit changing root directory if no `chroot_directory`. """
+ instance = self.test_instance
+ instance.chroot_directory = None
+ instance.open()
+ self.assertFalse(self.mock_module_daemon.change_root_directory.called)
+
+ def test_prevents_core_dump(self):
+ """ Should request prevention of core dumps. """
+ instance = self.test_instance
+ instance.open()
+ self.mock_module_daemon.prevent_core_dump.assert_called_with()
+
+ def test_omits_prevent_core_dump_if_prevent_core_false(self):
+ """ Should omit preventing core dumps if `prevent_core` is false. """
+ instance = self.test_instance
+ instance.prevent_core = False
+ instance.open()
+ self.assertFalse(self.mock_module_daemon.prevent_core_dump.called)
+
+ def test_closes_open_files(self):
+ """ Should close all open files, excluding `files_preserve`. """
+ instance = self.test_instance
+ expected_exclude = self.test_files_preserve_fds
+ instance.open()
+ self.mock_module_daemon.close_all_open_files.assert_called_with(
+ exclude=expected_exclude)
+
+ def test_changes_directory_to_working_directory(self):
+ """ Should change current directory to `working_directory` option. """
+ instance = self.test_instance
+ working_directory = object()
+ instance.working_directory = working_directory
+ instance.open()
+ self.mock_module_daemon.change_working_directory.assert_called_with(
+ working_directory)
+
+ def test_changes_creation_mask_to_umask(self):
+ """ Should change file creation mask to `umask` option. """
+ instance = self.test_instance
+ umask = object()
+ instance.umask = umask
+ instance.open()
+ self.mock_module_daemon.change_file_creation_mask.assert_called_with(
+ umask)
+
+ def test_changes_owner_to_specified_uid_and_gid(self):
+ """ Should change process UID and GID to `uid` and `gid` options. """
+ instance = self.test_instance
+ uid = object()
+ gid = object()
+ instance.uid = uid
+ instance.gid = gid
+ instance.open()
+ self.mock_module_daemon.change_process_owner.assert_called_with(
+ uid, gid)
+
+ def test_detaches_process_context(self):
+ """ Should request detach of process context. """
+ instance = self.test_instance
+ instance.open()
+ self.mock_module_daemon.detach_process_context.assert_called_with()
+
+ def test_omits_process_detach_if_not_required(self):
+ """ Should omit detach of process context if not required. """
+ instance = self.test_instance
+ instance.detach_process = False
+ instance.open()
+ self.assertFalse(self.mock_module_daemon.detach_process_context.called)
+
+ def test_sets_signal_handlers_from_signal_map(self):
+ """ Should set signal handlers according to `signal_map`. """
+ instance = self.test_instance
+ instance.signal_map = object()
+ expected_signal_handler_map = self.test_signal_handler_map
+ instance.open()
+ self.mock_module_daemon.set_signal_handlers.assert_called_with(
+ expected_signal_handler_map)
+
+ def test_redirects_standard_streams(self):
+ """ Should request redirection of standard stream files. """
+ instance = self.test_instance
+ (system_stdin, system_stdout, system_stderr) = (
+ sys.stdin, sys.stdout, sys.stderr)
+ (target_stdin, target_stdout, target_stderr) = (
+ self.stream_files_by_name[name]
+ for name in ['stdin', 'stdout', 'stderr'])
+ expected_calls = [
+ mock.call(system_stdin, target_stdin),
+ mock.call(system_stdout, target_stdout),
+ mock.call(system_stderr, target_stderr),
+ ]
+ instance.open()
+ self.mock_module_daemon.redirect_stream.assert_has_calls(
+ expected_calls, any_order=True)
+
+ def test_enters_pidfile_context(self):
+ """ Should enter the PID file context manager. """
+ instance = self.test_instance
+ instance.pidfile = self.mock_pidlockfile
+ instance.open()
+ self.mock_pidlockfile.__enter__.assert_called_with()
+
+ def test_sets_is_open_true(self):
+ """ Should set the `is_open` property to True. """
+ instance = self.test_instance
+ instance.open()
+ self.assertEqual(True, instance.is_open)
+
+ def test_registers_close_method_for_atexit(self):
+ """ Should register the `close` method for atexit processing. """
+ instance = self.test_instance
+ close_method = instance.close
+ instance.open()
+ self.mock_module_daemon.register_atexit_function.assert_called_with(
+ close_method)
+
+
+class DaemonContext_close_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.close method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_close_TestCase, self).setUp()
+
+ self.test_instance._is_open = True
+
+ def test_returns_immediately_if_not_is_open(self):
+ """ Should return immediately if is_open property is false. """
+ instance = self.test_instance
+ instance._is_open = False
+ instance.pidfile = object()
+ instance.close()
+ self.assertFalse(self.mock_pidlockfile.__exit__.called)
+
+ def test_exits_pidfile_context(self):
+ """ Should exit the PID file context manager. """
+ instance = self.test_instance
+ instance.pidfile = self.mock_pidlockfile
+ instance.close()
+ self.mock_pidlockfile.__exit__.assert_called_with(None, None, None)
+
+ def test_returns_none(self):
+ """ Should return None. """
+ instance = self.test_instance
+ expected_result = None
+ result = instance.close()
+ self.assertIs(result, expected_result)
+
+ def test_sets_is_open_false(self):
+ """ Should set the `is_open` property to False. """
+ instance = self.test_instance
+ instance.close()
+ self.assertEqual(False, instance.is_open)
+
+
+@mock.patch.object(daemon.daemon.DaemonContext, "open")
+class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.__enter__ method. """
+
+ def test_opens_daemon_context(self, mock_func_daemoncontext_open):
+ """ Should open the DaemonContext. """
+ instance = self.test_instance
+ instance.__enter__()
+ mock_func_daemoncontext_open.assert_called_with()
+
+ def test_returns_self_instance(self, mock_func_daemoncontext_open):
+ """ Should return DaemonContext instance. """
+ instance = self.test_instance
+ expected_result = instance
+ result = instance.__enter__()
+ self.assertIs(result, expected_result)
+
+
+@mock.patch.object(daemon.daemon.DaemonContext, "close")
+class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.__exit__ method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_context_manager_exit_TestCase, self).setUp()
+
+ self.test_args = dict(
+ exc_type=object(),
+ exc_value=object(),
+ traceback=object(),
+ )
+
+ def test_closes_daemon_context(self, mock_func_daemoncontext_close):
+ """ Should close the DaemonContext. """
+ instance = self.test_instance
+ args = self.test_args
+ instance.__exit__(**args)
+ mock_func_daemoncontext_close.assert_called_with()
+
+ def test_returns_none(self, mock_func_daemoncontext_close):
+ """ Should return None, indicating exception was not handled. """
+ instance = self.test_instance
+ args = self.test_args
+ expected_result = None
+ result = instance.__exit__(**args)
+ self.assertIs(result, expected_result)
+
+
+class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext.terminate method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_terminate_TestCase, self).setUp()
+
+ self.test_signal = signal.SIGTERM
+ self.test_frame = None
+ self.test_args = (self.test_signal, self.test_frame)
+
+ def test_raises_system_exit(self):
+ """ Should raise SystemExit. """
+ instance = self.test_instance
+ args = self.test_args
+ expected_exception = SystemExit
+ self.assertRaises(
+ expected_exception,
+ instance.terminate, *args)
+
+ def test_exception_message_contains_signal_number(self):
+ """ Should raise exception with a message containing signal number. """
+ instance = self.test_instance
+ args = self.test_args
+ signal_number = self.test_signal
+ expected_exception = SystemExit
+ exc = self.assertRaises(
+ expected_exception,
+ instance.terminate, *args)
+ self.assertIn(unicode(signal_number), unicode(exc))
+
+
+class DaemonContext_get_exclude_file_descriptors_TestCase(
+ DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext._get_exclude_file_descriptors function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(
+ DaemonContext_get_exclude_file_descriptors_TestCase,
+ self).setUp()
+
+ self.test_files = {
+ 2: FakeFileDescriptorStringIO(),
+ 5: 5,
+ 11: FakeFileDescriptorStringIO(),
+ 17: None,
+ 23: FakeFileDescriptorStringIO(),
+ 37: 37,
+ 42: FakeFileDescriptorStringIO(),
+ }
+ for (fileno, item) in self.test_files.items():
+ if hasattr(item, '_fileno'):
+ item._fileno = fileno
+ self.test_file_descriptors = set(
+ fd for (fd, item) in self.test_files.items()
+ if item is not None)
+ self.test_file_descriptors.update(
+ self.stream_files_by_name[name].fileno()
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+ def test_returns_expected_file_descriptors(self):
+ """ Should return expected set of file descriptors. """
+ instance = self.test_instance
+ instance.files_preserve = list(self.test_files.values())
+ expected_result = self.test_file_descriptors
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_returns_stream_redirects_if_no_files_preserve(self):
+ """ Should return only stream redirects if no files_preserve. """
+ instance = self.test_instance
+ instance.files_preserve = None
+ expected_result = set(
+ stream.fileno()
+ for stream in self.stream_files_by_name.values())
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_returns_empty_set_if_no_files(self):
+ """ Should return empty set if no file options. """
+ instance = self.test_instance
+ for name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
+ setattr(instance, name, None)
+ expected_result = set()
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_omits_non_file_streams(self):
+ """ Should omit non-file stream attributes. """
+ instance = self.test_instance
+ instance.files_preserve = list(self.test_files.values())
+ stream_files = self.stream_files_by_name
+ expected_result = self.test_file_descriptors.copy()
+ for (pseudo_stream_name, pseudo_stream) in stream_files.items():
+ test_non_file_object = object()
+ setattr(instance, pseudo_stream_name, test_non_file_object)
+ stream_fd = pseudo_stream.fileno()
+ expected_result.discard(stream_fd)
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_includes_verbatim_streams_without_file_descriptor(self):
+ """ Should include verbatim any stream without a file descriptor. """
+ instance = self.test_instance
+ instance.files_preserve = list(self.test_files.values())
+ stream_files = self.stream_files_by_name
+ mock_fileno_method = mock.MagicMock(
+ spec=sys.__stdin__.fileno,
+ side_effect=ValueError)
+ expected_result = self.test_file_descriptors.copy()
+ for (pseudo_stream_name, pseudo_stream) in stream_files.items():
+ test_non_fd_stream = StringIO()
+ if not hasattr(test_non_fd_stream, 'fileno'):
+ # Python < 3 StringIO doesn't have ‘fileno’ at all.
+ # Add a method which raises an exception.
+ test_non_fd_stream.fileno = mock_fileno_method
+ setattr(instance, pseudo_stream_name, test_non_fd_stream)
+ stream_fd = pseudo_stream.fileno()
+ expected_result.discard(stream_fd)
+ expected_result.add(test_non_fd_stream)
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_omits_none_streams(self):
+ """ Should omit any stream attribute which is None. """
+ instance = self.test_instance
+ instance.files_preserve = list(self.test_files.values())
+ stream_files = self.stream_files_by_name
+ expected_result = self.test_file_descriptors.copy()
+ for (pseudo_stream_name, pseudo_stream) in stream_files.items():
+ setattr(instance, pseudo_stream_name, None)
+ stream_fd = pseudo_stream.fileno()
+ expected_result.discard(stream_fd)
+ result = instance._get_exclude_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+
+class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext._make_signal_handler function. """
+
+ def test_returns_ignore_for_none(self):
+ """ Should return SIG_IGN when None handler specified. """
+ instance = self.test_instance
+ target = None
+ expected_result = signal.SIG_IGN
+ result = instance._make_signal_handler(target)
+ self.assertEqual(expected_result, result)
+
+ def test_returns_method_for_name(self):
+ """ Should return method of DaemonContext when name specified. """
+ instance = self.test_instance
+ target = 'terminate'
+ expected_result = instance.terminate
+ result = instance._make_signal_handler(target)
+ self.assertEqual(expected_result, result)
+
+ def test_raises_error_for_unknown_name(self):
+ """ Should raise AttributeError for unknown method name. """
+ instance = self.test_instance
+ target = 'b0gUs'
+ expected_error = AttributeError
+ self.assertRaises(
+ expected_error,
+ instance._make_signal_handler, target)
+
+ def test_returns_object_for_object(self):
+ """ Should return same object for any other object. """
+ instance = self.test_instance
+ target = object()
+ expected_result = target
+ result = instance._make_signal_handler(target)
+ self.assertEqual(expected_result, result)
+
+
+class DaemonContext_make_signal_handler_map_TestCase(
+ DaemonContext_BaseTestCase):
+ """ Test cases for DaemonContext._make_signal_handler_map function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonContext_make_signal_handler_map_TestCase, self).setUp()
+
+ self.test_instance.signal_map = {
+ object(): object(),
+ object(): object(),
+ object(): object(),
+ }
+
+ self.test_signal_handlers = dict(
+ (key, object())
+ for key in self.test_instance.signal_map.values())
+ self.test_signal_handler_map = dict(
+ (key, self.test_signal_handlers[target])
+ for (key, target) in self.test_instance.signal_map.items())
+
+ def fake_make_signal_handler(target):
+ return self.test_signal_handlers[target]
+
+ func_patcher_make_signal_handler = mock.patch.object(
+ daemon.daemon.DaemonContext, "_make_signal_handler",
+ side_effect=fake_make_signal_handler)
+ self.mock_func_make_signal_handler = (
+ func_patcher_make_signal_handler.start())
+ self.addCleanup(func_patcher_make_signal_handler.stop)
+
+ def test_returns_constructed_signal_handler_items(self):
+ """ Should return items as constructed via make_signal_handler. """
+ instance = self.test_instance
+ expected_result = self.test_signal_handler_map
+ result = instance._make_signal_handler_map()
+ self.assertEqual(expected_result, result)
+
+
+try:
+ FileNotFoundError
+except NameError:
+ # Python 2 uses IOError.
+ FileNotFoundError = functools.partial(IOError, errno.ENOENT)
+
+
+@mock.patch.object(os, "chdir")
+class change_working_directory_TestCase(scaffold.TestCase):
+ """ Test cases for change_working_directory function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(change_working_directory_TestCase, self).setUp()
+
+ self.test_directory = object()
+ self.test_args = dict(
+ directory=self.test_directory,
+ )
+
+ def test_changes_working_directory_to_specified_directory(
+ self,
+ mock_func_os_chdir):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ daemon.daemon.change_working_directory(**args)
+ mock_func_os_chdir.assert_called_with(directory)
+
+ def test_raises_daemon_error_on_os_error(
+ self,
+ mock_func_os_chdir):
+ """ Should raise a DaemonError on receiving an IOError. """
+ args = self.test_args
+ test_error = FileNotFoundError("No such directory")
+ mock_func_os_chdir.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_working_directory, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_error_message_contains_original_error_message(
+ self,
+ mock_func_os_chdir):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = FileNotFoundError("No such directory")
+ mock_func_os_chdir.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_working_directory, **args)
+ self.assertIn(unicode(test_error), unicode(exc))
+
+
+@mock.patch.object(os, "chroot")
+@mock.patch.object(os, "chdir")
+class change_root_directory_TestCase(scaffold.TestCase):
+ """ Test cases for change_root_directory function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(change_root_directory_TestCase, self).setUp()
+
+ self.test_directory = object()
+ self.test_args = dict(
+ directory=self.test_directory,
+ )
+
+ def test_changes_working_directory_to_specified_directory(
+ self,
+ mock_func_os_chdir, mock_func_os_chroot):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ daemon.daemon.change_root_directory(**args)
+ mock_func_os_chdir.assert_called_with(directory)
+
+ def test_changes_root_directory_to_specified_directory(
+ self,
+ mock_func_os_chdir, mock_func_os_chroot):
+ """ Should change root directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ daemon.daemon.change_root_directory(**args)
+ mock_func_os_chroot.assert_called_with(directory)
+
+ def test_raises_daemon_error_on_os_error_from_chdir(
+ self,
+ mock_func_os_chdir, mock_func_os_chroot):
+ """ Should raise a DaemonError on receiving an IOError from chdir. """
+ args = self.test_args
+ test_error = FileNotFoundError("No such directory")
+ mock_func_os_chdir.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_root_directory, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_raises_daemon_error_on_os_error_from_chroot(
+ self,
+ mock_func_os_chdir, mock_func_os_chroot):
+ """ Should raise a DaemonError on receiving an OSError from chroot. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No chroot for you!")
+ mock_func_os_chroot.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_root_directory, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_error_message_contains_original_error_message(
+ self,
+ mock_func_os_chdir, mock_func_os_chroot):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = FileNotFoundError("No such directory")
+ mock_func_os_chdir.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_root_directory, **args)
+ self.assertIn(unicode(test_error), unicode(exc))
+
+
+@mock.patch.object(os, "umask")
+class change_file_creation_mask_TestCase(scaffold.TestCase):
+ """ Test cases for change_file_creation_mask function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(change_file_creation_mask_TestCase, self).setUp()
+
+ self.test_mask = object()
+ self.test_args = dict(
+ mask=self.test_mask,
+ )
+
+ def test_changes_umask_to_specified_mask(self, mock_func_os_umask):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ mask = self.test_mask
+ daemon.daemon.change_file_creation_mask(**args)
+ mock_func_os_umask.assert_called_with(mask)
+
+ def test_raises_daemon_error_on_os_error_from_chdir(
+ self,
+ mock_func_os_umask):
+ """ Should raise a DaemonError on receiving an OSError from umask. """
+ args = self.test_args
+ test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
+ mock_func_os_umask.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_file_creation_mask, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_error_message_contains_original_error_message(
+ self,
+ mock_func_os_umask):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = FileNotFoundError("No such directory")
+ mock_func_os_umask.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_file_creation_mask, **args)
+ self.assertIn(unicode(test_error), unicode(exc))
+
+
+@mock.patch.object(os, "setgid")
+@mock.patch.object(os, "setuid")
+class change_process_owner_TestCase(scaffold.TestCase):
+ """ Test cases for change_process_owner function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(change_process_owner_TestCase, self).setUp()
+
+ self.test_uid = object()
+ self.test_gid = object()
+ self.test_args = dict(
+ uid=self.test_uid,
+ gid=self.test_gid,
+ )
+
+ def test_changes_gid_and_uid_in_order(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should change process GID and UID in correct order.
+
+ Since the process requires appropriate privilege to use
+ either of `setuid` or `setgid`, changing the UID must be
+ done last.
+
+ """
+ args = self.test_args
+ daemon.daemon.change_process_owner(**args)
+ mock_func_os_setuid.assert_called()
+ mock_func_os_setgid.assert_called()
+
+ def test_changes_group_id_to_gid(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should change process GID to specified value. """
+ args = self.test_args
+ gid = self.test_gid
+ daemon.daemon.change_process_owner(**args)
+ mock_func_os_setgid.assert_called(gid)
+
+ def test_changes_user_id_to_uid(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should change process UID to specified value. """
+ args = self.test_args
+ uid = self.test_uid
+ daemon.daemon.change_process_owner(**args)
+ mock_func_os_setuid.assert_called(uid)
+
+ def test_raises_daemon_error_on_os_error_from_setgid(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should raise a DaemonError on receiving an OSError from setgid. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No switching for you!")
+ mock_func_os_setgid.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_process_owner, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_raises_daemon_error_on_os_error_from_setuid(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should raise a DaemonError on receiving an OSError from setuid. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No switching for you!")
+ mock_func_os_setuid.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_process_owner, **args)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_error_message_contains_original_error_message(
+ self,
+ mock_func_os_setuid, mock_func_os_setgid):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
+ mock_func_os_setuid.side_effect = test_error
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.change_process_owner, **args)
+ self.assertIn(unicode(test_error), unicode(exc))
+
+
+RLimitResult = collections.namedtuple('RLimitResult', ['soft', 'hard'])
+
+fake_RLIMIT_CORE = object()
+
+@mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE)
+@mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None))
+@mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None))
+class prevent_core_dump_TestCase(scaffold.TestCase):
+ """ Test cases for prevent_core_dump function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(prevent_core_dump_TestCase, self).setUp()
+
+ def test_sets_core_limit_to_zero(
+ self,
+ mock_func_resource_getrlimit, mock_func_resource_setrlimit):
+ """ Should set the RLIMIT_CORE resource to zero. """
+ expected_resource = fake_RLIMIT_CORE
+ expected_limit = tuple(RLimitResult(soft=0, hard=0))
+ daemon.daemon.prevent_core_dump()
+ mock_func_resource_getrlimit.assert_called_with(expected_resource)
+ mock_func_resource_setrlimit.assert_called_with(
+ expected_resource, expected_limit)
+
+ def test_raises_error_when_no_core_resource(
+ self,
+ mock_func_resource_getrlimit, mock_func_resource_setrlimit):
+ """ Should raise DaemonError if no RLIMIT_CORE resource. """
+ test_error = ValueError("Bogus platform doesn't have RLIMIT_CORE")
+ def fake_getrlimit(res):
+ if res == resource.RLIMIT_CORE:
+ raise test_error
+ else:
+ return None
+ mock_func_resource_getrlimit.side_effect = fake_getrlimit
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.prevent_core_dump)
+ self.assertEqual(test_error, exc.__cause__)
+
+
+@mock.patch.object(os, "close")
+class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
+ """ Test cases for close_file_descriptor_if_open function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(close_file_descriptor_if_open_TestCase, self).setUp()
+
+ self.fake_fd = 274
+
+ def test_requests_file_descriptor_close(self, mock_func_os_close):
+ """ Should request close of file descriptor. """
+ fd = self.fake_fd
+ daemon.daemon.close_file_descriptor_if_open(fd)
+ mock_func_os_close.assert_called_with(fd)
+
+ def test_ignores_badfd_error_on_close(self, mock_func_os_close):
+ """ Should ignore OSError EBADF when closing. """
+ fd = self.fake_fd
+ test_error = OSError(errno.EBADF, "Bad file descriptor")
+ def fake_os_close(fd):
+ raise test_error
+ mock_func_os_close.side_effect = fake_os_close
+ daemon.daemon.close_file_descriptor_if_open(fd)
+ mock_func_os_close.assert_called_with(fd)
+
+ def test_raises_error_if_oserror_on_close(self, mock_func_os_close):
+ """ Should raise DaemonError if an OSError occurs when closing. """
+ fd = self.fake_fd
+ test_error = OSError(object(), "Unexpected error")
+ def fake_os_close(fd):
+ raise test_error
+ mock_func_os_close.side_effect = fake_os_close
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.close_file_descriptor_if_open, fd)
+ self.assertEqual(test_error, exc.__cause__)
+
+ def test_raises_error_if_ioerror_on_close(self, mock_func_os_close):
+ """ Should raise DaemonError if an IOError occurs when closing. """
+ fd = self.fake_fd
+ test_error = IOError(object(), "Unexpected error")
+ def fake_os_close(fd):
+ raise test_error
+ mock_func_os_close.side_effect = fake_os_close
+ expected_error = daemon.daemon.DaemonOSEnvironmentError
+ exc = self.assertRaises(
+ expected_error,
+ daemon.daemon.close_file_descriptor_if_open, fd)
+ self.assertEqual(test_error, exc.__cause__)
+
+
+class maxfd_TestCase(scaffold.TestCase):
+ """ Test cases for module MAXFD constant. """
+
+ def test_positive(self):
+ """ Should be a positive number. """
+ maxfd = daemon.daemon.MAXFD
+ self.assertTrue(maxfd > 0)
+
+ def test_integer(self):
+ """ Should be an integer. """
+ maxfd = daemon.daemon.MAXFD
+ self.assertEqual(int(maxfd), maxfd)
+
+ def test_reasonably_high(self):
+ """ Should be reasonably high for default open files limit.
+
+ If the system reports a limit of “infinity” on maximum
+ file descriptors, we still need a finite number in order
+ to close “all” of them. Ensure this is reasonably high
+ to catch most use cases.
+
+ """
+ expected_minimum = 2048
+ maxfd = daemon.daemon.MAXFD
+ self.assertTrue(
+ expected_minimum <= maxfd,
+ msg=(
+ "MAXFD should be at least {minimum!r}"
+ " (got {maxfd!r})".format(
+ minimum=expected_minimum, maxfd=maxfd)))
+
+
+fake_default_maxfd = 8
+fake_RLIMIT_NOFILE = object()
+fake_RLIM_INFINITY = object()
+fake_rlimit_nofile_large = 2468
+
+def fake_getrlimit_nofile_soft_infinity(resource):
+ result = RLimitResult(soft=fake_RLIM_INFINITY, hard=object())
+ if resource != fake_RLIMIT_NOFILE:
+ result = NotImplemented
+ return result
+
+def fake_getrlimit_nofile_hard_infinity(resource):
+ result = RLimitResult(soft=object(), hard=fake_RLIM_INFINITY)
+ if resource != fake_RLIMIT_NOFILE:
+ result = NotImplemented
+ return result
+
+def fake_getrlimit_nofile_hard_large(resource):
+ result = RLimitResult(soft=object(), hard=fake_rlimit_nofile_large)
+ if resource != fake_RLIMIT_NOFILE:
+ result = NotImplemented
+ return result
+
+@mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd)
+@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
+@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
+@mock.patch.object(
+ resource, "getrlimit",
+ side_effect=fake_getrlimit_nofile_hard_large)
+class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
+ """ Test cases for get_maximum_file_descriptors function. """
+
+ def test_returns_system_hard_limit(self, mock_func_resource_getrlimit):
+ """ Should return process hard limit on number of files. """
+ expected_result = fake_rlimit_nofile_large
+ result = daemon.daemon.get_maximum_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+ def test_returns_module_default_if_hard_limit_infinity(
+ self, mock_func_resource_getrlimit):
+ """ Should return module MAXFD if hard limit is infinity. """
+ mock_func_resource_getrlimit.side_effect = (
+ fake_getrlimit_nofile_hard_infinity)
+ expected_result = fake_default_maxfd
+ result = daemon.daemon.get_maximum_file_descriptors()
+ self.assertEqual(expected_result, result)
+
+
+def fake_get_maximum_file_descriptors():
+ return fake_default_maxfd
+
+@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
+@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
+@mock.patch.object(
+ resource, "getrlimit",
+ new=fake_getrlimit_nofile_soft_infinity)
+@mock.patch.object(
+ daemon.daemon, "get_maximum_file_descriptors",
+ new=fake_get_maximum_file_descriptors)
+@mock.patch.object(daemon.daemon, "close_file_descriptor_if_open")
+class close_all_open_files_TestCase(scaffold.TestCase):
+ """ Test cases for close_all_open_files function. """
+
+ def test_requests_all_open_files_to_close(
+ self, mock_func_close_file_descriptor_if_open):
+ """ Should request close of all open files. """
+ expected_file_descriptors = range(fake_default_maxfd)
+ expected_calls = [
+ mock.call(fd) for fd in expected_file_descriptors]
+ daemon.daemon.close_all_open_files()
+ mock_func_close_file_descriptor_if_open.assert_has_calls(
+ expected_calls, any_order=True)
+
+ def test_requests_all_but_excluded_files_to_close(
+ self, mock_func_close_file_descriptor_if_open):
+ """ Should request close of all open files but those excluded. """
+ test_exclude = set([3, 7])
+ args = dict(
+ exclude=test_exclude,
+ )
+ expected_file_descriptors = set(
+ fd for fd in range(fake_default_maxfd)
+ if fd not in test_exclude)
+ expected_calls = [
+ mock.call(fd) for fd in expected_file_descriptors]
+ daemon.daemon.close_all_open_files(**args)
+ mock_func_close_file_descriptor_if_open.assert_has_calls(
+ expected_calls, any_order=True)
+
+
+class detach_process_context_TestCase(scaffold.TestCase):
+ """ Test cases for detach_process_context function. """
+
+ class FakeOSExit(SystemExit):
+ """ Fake exception raised for os._exit(). """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(detach_process_context_TestCase, self).setUp()
+
+ self.mock_module_os = mock.MagicMock(wraps=os)
+
+ fake_pids = [0, 0]
+ func_patcher_os_fork = mock.patch.object(
+ os, "fork",
+ side_effect=iter(fake_pids))
+ self.mock_func_os_fork = func_patcher_os_fork.start()
+ self.addCleanup(func_patcher_os_fork.stop)
+ self.mock_module_os.attach_mock(self.mock_func_os_fork, "fork")
+
+ func_patcher_os_setsid = mock.patch.object(os, "setsid")
+ self.mock_func_os_setsid = func_patcher_os_setsid.start()
+ self.addCleanup(func_patcher_os_setsid.stop)
+ self.mock_module_os.attach_mock(self.mock_func_os_setsid, "setsid")
+
+ def raise_os_exit(status=None):
+ raise self.FakeOSExit(status)
+
+ func_patcher_os_force_exit = mock.patch.object(
+ os, "_exit",
+ side_effect=raise_os_exit)
+ self.mock_func_os_force_exit = func_patcher_os_force_exit.start()
+ self.addCleanup(func_patcher_os_force_exit.stop)
+ self.mock_module_os.attach_mock(self.mock_func_os_force_exit, "_exit")
+
+ def test_parent_exits(self):
+ """ Parent process should exit. """
+ parent_pid = 23
+ self.mock_func_os_fork.side_effect = iter([parent_pid])
+ self.assertRaises(
+ self.FakeOSExit,
+ daemon.daemon.detach_process_context)
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ mock.call._exit(0),
+ ])
+
+ def test_first_fork_error_raises_error(self):
+ """ Error on first fork should raise DaemonProcessDetachError. """
+ fork_errno = 13
+ fork_strerror = "Bad stuff happened"
+ test_error = OSError(fork_errno, fork_strerror)
+ test_pids_iter = iter([test_error])
+
+ def fake_fork():
+ next_item = next(test_pids_iter)
+ if isinstance(next_item, Exception):
+ raise next_item
+ else:
+ return next_item
+
+ self.mock_func_os_fork.side_effect = fake_fork
+ exc = self.assertRaises(
+ daemon.daemon.DaemonProcessDetachError,
+ daemon.daemon.detach_process_context)
+ self.assertEqual(test_error, exc.__cause__)
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ ])
+
+ def test_child_starts_new_process_group(self):
+ """ Child should start new process group. """
+ daemon.daemon.detach_process_context()
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ mock.call.setsid(),
+ ])
+
+ def test_child_forks_next_parent_exits(self):
+ """ Child should fork, then exit if parent. """
+ fake_pids = [0, 42]
+ self.mock_func_os_fork.side_effect = iter(fake_pids)
+ self.assertRaises(
+ self.FakeOSExit,
+ daemon.daemon.detach_process_context)
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ mock.call.setsid(),
+ mock.call.fork(),
+ mock.call._exit(0),
+ ])
+
+ def test_second_fork_error_reports_to_stderr(self):
+ """ Error on second fork should cause report to stderr. """
+ fork_errno = 17
+ fork_strerror = "Nasty stuff happened"
+ test_error = OSError(fork_errno, fork_strerror)
+ test_pids_iter = iter([0, test_error])
+
+ def fake_fork():
+ next_item = next(test_pids_iter)
+ if isinstance(next_item, Exception):
+ raise next_item
+ else:
+ return next_item
+
+ self.mock_func_os_fork.side_effect = fake_fork
+ exc = self.assertRaises(
+ daemon.daemon.DaemonProcessDetachError,
+ daemon.daemon.detach_process_context)
+ self.assertEqual(test_error, exc.__cause__)
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ mock.call.setsid(),
+ mock.call.fork(),
+ ])
+
+ def test_child_forks_next_child_continues(self):
+ """ Child should fork, then continue if child. """
+ daemon.daemon.detach_process_context()
+ self.mock_module_os.assert_has_calls([
+ mock.call.fork(),
+ mock.call.setsid(),
+ mock.call.fork(),
+ ])
+
+
+@mock.patch("os.getppid", return_value=765)
+class is_process_started_by_init_TestCase(scaffold.TestCase):
+ """ Test cases for is_process_started_by_init function. """
+
+ def test_returns_false_by_default(self, mock_func_os_getppid):
+ """ Should return False under normal circumstances. """
+ expected_result = False
+ result = daemon.daemon.is_process_started_by_init()
+ self.assertIs(result, expected_result)
+
+ def test_returns_true_if_parent_process_is_init(
+ self, mock_func_os_getppid):
+ """ Should return True if parent process is `init`. """
+ init_pid = 1
+ mock_func_os_getppid.return_value = init_pid
+ expected_result = True
+ result = daemon.daemon.is_process_started_by_init()
+ self.assertIs(result, expected_result)
+
+
+class is_socket_TestCase(scaffold.TestCase):
+ """ Test cases for is_socket function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(is_socket_TestCase, self).setUp()
+
+ def fake_getsockopt(level, optname, buflen=None):
+ result = object()
+ if optname is socket.SO_TYPE:
+ result = socket.SOCK_RAW
+ return result
+
+ self.fake_socket_getsockopt_func = fake_getsockopt
+
+ self.fake_socket_error = socket.error(
+ errno.ENOTSOCK,
+ "Socket operation on non-socket")
+
+ self.mock_socket = mock.MagicMock(spec=socket.socket)
+ self.mock_socket.getsockopt.side_effect = self.fake_socket_error
+
+ def fake_socket_fromfd(fd, family, type, proto=None):
+ return self.mock_socket
+
+ func_patcher_socket_fromfd = mock.patch.object(
+ socket, "fromfd",
+ side_effect=fake_socket_fromfd)
+ func_patcher_socket_fromfd.start()
+ self.addCleanup(func_patcher_socket_fromfd.stop)
+
+ def test_returns_false_by_default(self):
+ """ Should return False under normal circumstances. """
+ test_fd = 23
+ expected_result = False
+ result = daemon.daemon.is_socket(test_fd)
+ self.assertIs(result, expected_result)
+
+ def test_returns_true_if_stdin_is_socket(self):
+ """ Should return True if `stdin` is a socket. """
+ test_fd = 23
+ getsockopt = self.mock_socket.getsockopt
+ getsockopt.side_effect = self.fake_socket_getsockopt_func
+ expected_result = True
+ result = daemon.daemon.is_socket(test_fd)
+ self.assertIs(result, expected_result)
+
+ def test_returns_false_if_stdin_socket_raises_error(self):
+ """ Should return True if `stdin` is a socket and raises error. """
+ test_fd = 23
+ getsockopt = self.mock_socket.getsockopt
+ getsockopt.side_effect = socket.error(
+ object(), "Weird socket stuff")
+ expected_result = True
+ result = daemon.daemon.is_socket(test_fd)
+ self.assertIs(result, expected_result)
+
+
+class is_process_started_by_superserver_TestCase(scaffold.TestCase):
+ """ Test cases for is_process_started_by_superserver function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(is_process_started_by_superserver_TestCase, self).setUp()
+
+ def fake_is_socket(fd):
+ if sys.__stdin__.fileno() == fd:
+ result = self.fake_stdin_is_socket_func()
+ else:
+ result = False
+ return result
+
+ self.fake_stdin_is_socket_func = (lambda: False)
+
+ func_patcher_is_socket = mock.patch.object(
+ daemon.daemon, "is_socket",
+ side_effect=fake_is_socket)
+ func_patcher_is_socket.start()
+ self.addCleanup(func_patcher_is_socket.stop)
+
+ def test_returns_false_by_default(self):
+ """ Should return False under normal circumstances. """
+ expected_result = False
+ result = daemon.daemon.is_process_started_by_superserver()
+ self.assertIs(result, expected_result)
+
+ def test_returns_true_if_stdin_is_socket(self):
+ """ Should return True if `stdin` is a socket. """
+ self.fake_stdin_is_socket_func = (lambda: True)
+ expected_result = True
+ result = daemon.daemon.is_process_started_by_superserver()
+ self.assertIs(result, expected_result)
+
+
+@mock.patch.object(
+ daemon.daemon, "is_process_started_by_superserver",
+ return_value=False)
+@mock.patch.object(
+ daemon.daemon, "is_process_started_by_init",
+ return_value=False)
+class is_detach_process_context_required_TestCase(scaffold.TestCase):
+ """ Test cases for is_detach_process_context_required function. """
+
+ def test_returns_true_by_default(
+ self,
+ mock_func_is_process_started_by_init,
+ mock_func_is_process_started_by_superserver):
+ """ Should return True under normal circumstances. """
+ expected_result = True
+ result = daemon.daemon.is_detach_process_context_required()
+ self.assertIs(result, expected_result)
+
+ def test_returns_false_if_started_by_init(
+ self,
+ mock_func_is_process_started_by_init,
+ mock_func_is_process_started_by_superserver):
+ """ Should return False if current process started by init. """
+ mock_func_is_process_started_by_init.return_value = True
+ expected_result = False
+ result = daemon.daemon.is_detach_process_context_required()
+ self.assertIs(result, expected_result)
+
+ def test_returns_true_if_started_by_superserver(
+ self,
+ mock_func_is_process_started_by_init,
+ mock_func_is_process_started_by_superserver):
+ """ Should return False if current process started by superserver. """
+ mock_func_is_process_started_by_superserver.return_value = True
+ expected_result = False
+ result = daemon.daemon.is_detach_process_context_required()
+ self.assertIs(result, expected_result)
+
+
+def setup_streams_fixtures(testcase):
+ """ Set up common test fixtures for standard streams. """
+ testcase.stream_file_paths = dict(
+ stdin=tempfile.mktemp(),
+ stdout=tempfile.mktemp(),
+ stderr=tempfile.mktemp(),
+ )
+
+ testcase.stream_files_by_name = dict(
+ (name, FakeFileDescriptorStringIO())
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+ testcase.stream_files_by_path = dict(
+ (testcase.stream_file_paths[name],
+ testcase.stream_files_by_name[name])
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+
+@mock.patch.object(os, "dup2")
+class redirect_stream_TestCase(scaffold.TestCase):
+ """ Test cases for redirect_stream function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(redirect_stream_TestCase, self).setUp()
+
+ self.test_system_stream = FakeFileDescriptorStringIO()
+ self.test_target_stream = FakeFileDescriptorStringIO()
+ self.test_null_file = FakeFileDescriptorStringIO()
+
+ def fake_os_open(path, flag, mode=None):
+ if path == os.devnull:
+ result = self.test_null_file.fileno()
+ else:
+ raise FileNotFoundError("No such file", path)
+ return result
+
+ func_patcher_os_open = mock.patch.object(
+ os, "open",
+ side_effect=fake_os_open)
+ self.mock_func_os_open = func_patcher_os_open.start()
+ self.addCleanup(func_patcher_os_open.stop)
+
+ def test_duplicates_target_file_descriptor(
+ self, mock_func_os_dup2):
+ """ Should duplicate file descriptor from target to system stream. """
+ system_stream = self.test_system_stream
+ system_fileno = system_stream.fileno()
+ target_stream = self.test_target_stream
+ target_fileno = target_stream.fileno()
+ daemon.daemon.redirect_stream(system_stream, target_stream)
+ mock_func_os_dup2.assert_called_with(target_fileno, system_fileno)
+
+ def test_duplicates_null_file_descriptor_by_default(
+ self, mock_func_os_dup2):
+ """ Should by default duplicate the null file to the system stream. """
+ system_stream = self.test_system_stream
+ system_fileno = system_stream.fileno()
+ target_stream = None
+ null_path = os.devnull
+ null_flag = os.O_RDWR
+ null_file = self.test_null_file
+ null_fileno = null_file.fileno()
+ daemon.daemon.redirect_stream(system_stream, target_stream)
+ self.mock_func_os_open.assert_called_with(null_path, null_flag)
+ mock_func_os_dup2.assert_called_with(null_fileno, system_fileno)
+
+
+class make_default_signal_map_TestCase(scaffold.TestCase):
+ """ Test cases for make_default_signal_map function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(make_default_signal_map_TestCase, self).setUp()
+
+ # Use whatever default string type this Python version needs.
+ signal_module_name = str('signal')
+ self.fake_signal_module = ModuleType(signal_module_name)
+
+ fake_signal_names = [
+ 'SIGHUP',
+ 'SIGCLD',
+ 'SIGSEGV',
+ 'SIGTSTP',
+ 'SIGTTIN',
+ 'SIGTTOU',
+ 'SIGTERM',
+ ]
+ for name in fake_signal_names:
+ setattr(self.fake_signal_module, name, object())
+
+ module_patcher_signal = mock.patch.object(
+ daemon.daemon, "signal", new=self.fake_signal_module)
+ module_patcher_signal.start()
+ self.addCleanup(module_patcher_signal.stop)
+
+ default_signal_map_by_name = {
+ 'SIGTSTP': None,
+ 'SIGTTIN': None,
+ 'SIGTTOU': None,
+ 'SIGTERM': 'terminate',
+ }
+ self.default_signal_map = dict(
+ (getattr(self.fake_signal_module, name), target)
+ for (name, target) in default_signal_map_by_name.items())
+
+ def test_returns_constructed_signal_map(self):
+ """ Should return map per default. """
+ expected_result = self.default_signal_map
+ result = daemon.daemon.make_default_signal_map()
+ self.assertEqual(expected_result, result)
+
+ def test_returns_signal_map_with_only_ids_in_signal_module(self):
+ """ Should return map with only signals in the `signal` module.
+
+ The `signal` module is documented to only define those
+ signals which exist on the running system. Therefore the
+ default map should not contain any signals which are not
+ defined in the `signal` module.
+
+ """
+ del(self.default_signal_map[self.fake_signal_module.SIGTTOU])
+ del(self.fake_signal_module.SIGTTOU)
+ expected_result = self.default_signal_map
+ result = daemon.daemon.make_default_signal_map()
+ self.assertEqual(expected_result, result)
+
+
+@mock.patch.object(daemon.daemon.signal, "signal")
+class set_signal_handlers_TestCase(scaffold.TestCase):
+ """ Test cases for set_signal_handlers function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(set_signal_handlers_TestCase, self).setUp()
+
+ self.signal_handler_map = {
+ signal.SIGQUIT: object(),
+ signal.SIGSEGV: object(),
+ signal.SIGINT: object(),
+ }
+
+ def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal):
+ """ Should set signal handler for each item in map. """
+ signal_handler_map = self.signal_handler_map
+ expected_calls = [
+ mock.call(signal_number, handler)
+ for (signal_number, handler) in signal_handler_map.items()]
+ daemon.daemon.set_signal_handlers(signal_handler_map)
+ self.assertEquals(expected_calls, mock_func_signal_signal.mock_calls)
+
+
+@mock.patch.object(daemon.daemon.atexit, "register")
+class register_atexit_function_TestCase(scaffold.TestCase):
+ """ Test cases for register_atexit_function function. """
+
+ def test_registers_function_for_atexit_processing(
+ self, mock_func_atexit_register):
+ """ Should register specified function for atexit processing. """
+ func = object()
+ daemon.daemon.register_atexit_function(func)
+ mock_func_atexit_register.assert_called_with(func)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_metadata.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_metadata.py
new file mode 100755
index 00000000..692753f4
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_metadata.py
@@ -0,0 +1,380 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_metadata.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘_metadata’ private module.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+import sys
+import errno
+import re
+try:
+ # Python 3 standard library.
+ import urllib.parse as urlparse
+except ImportError:
+ # Python 2 standard library.
+ import urlparse
+import functools
+import collections
+import json
+
+import pkg_resources
+import mock
+import testtools.helpers
+import testtools.matchers
+import testscenarios
+
+from . import scaffold
+from .scaffold import (basestring, unicode)
+
+import daemon._metadata as metadata
+
+
+class HasAttribute(testtools.matchers.Matcher):
+ """ A matcher to assert an object has a named attribute. """
+
+ def __init__(self, name):
+ self.attribute_name = name
+
+ def match(self, instance):
+ """ Assert the object `instance` has an attribute named `name`. """
+ result = None
+ if not testtools.helpers.safe_hasattr(instance, self.attribute_name):
+ result = AttributeNotFoundMismatch(instance, self.attribute_name)
+ return result
+
+
+class AttributeNotFoundMismatch(testtools.matchers.Mismatch):
+ """ The specified instance does not have the named attribute. """
+
+ def __init__(self, instance, name):
+ self.instance = instance
+ self.attribute_name = name
+
+ def describe(self):
+ """ Emit a text description of this mismatch. """
+ text = (
+ "{instance!r}"
+ " has no attribute named {name!r}").format(
+ instance=self.instance, name=self.attribute_name)
+ return text
+
+
+class metadata_value_TestCase(scaffold.TestCaseWithScenarios):
+ """ Test cases for metadata module values. """
+
+ expected_str_attributes = set([
+ 'version_installed',
+ 'author',
+ 'copyright',
+ 'license',
+ 'url',
+ ])
+
+ scenarios = [
+ (name, {'attribute_name': name})
+ for name in expected_str_attributes]
+ for (name, params) in scenarios:
+ if name == 'version_installed':
+ # No duck typing, this attribute might be None.
+ params['ducktype_attribute_name'] = NotImplemented
+ continue
+ # Expect an attribute of ‘str’ to test this value.
+ params['ducktype_attribute_name'] = 'isdigit'
+
+ def test_module_has_attribute(self):
+ """ Metadata should have expected value as a module attribute. """
+ self.assertThat(
+ metadata, HasAttribute(self.attribute_name))
+
+ def test_module_attribute_has_duck_type(self):
+ """ Metadata value should have expected duck-typing attribute. """
+ if self.ducktype_attribute_name == NotImplemented:
+ self.skipTest("Can't assert this attribute's type")
+ instance = getattr(metadata, self.attribute_name)
+ self.assertThat(
+ instance, HasAttribute(self.ducktype_attribute_name))
+
+
+class parse_person_field_TestCase(
+ testscenarios.WithScenarios, testtools.TestCase):
+ """ Test cases for ‘get_latest_version’ function. """
+
+ scenarios = [
+ ('simple', {
+ 'test_person': "Foo Bar <foo.bar@example.com>",
+ 'expected_result': ("Foo Bar", "foo.bar@example.com"),
+ }),
+ ('empty', {
+ 'test_person': "",
+ 'expected_result': (None, None),
+ }),
+ ('none', {
+ 'test_person': None,
+ 'expected_error': TypeError,
+ }),
+ ('no email', {
+ 'test_person': "Foo Bar",
+ 'expected_result': ("Foo Bar", None),
+ }),
+ ]
+
+ def test_returns_expected_result(self):
+ """ Should return expected result. """
+ if hasattr(self, 'expected_error'):
+ self.assertRaises(
+ self.expected_error,
+ metadata.parse_person_field, self.test_person)
+ else:
+ result = metadata.parse_person_field(self.test_person)
+ self.assertEqual(self.expected_result, result)
+
+
+class YearRange_TestCase(scaffold.TestCaseWithScenarios):
+ """ Test cases for ‘YearRange’ class. """
+
+ scenarios = [
+ ('simple', {
+ 'begin_year': 1970,
+ 'end_year': 1979,
+ 'expected_text': "1970–1979",
+ }),
+ ('same year', {
+ 'begin_year': 1970,
+ 'end_year': 1970,
+ 'expected_text': "1970",
+ }),
+ ('no end year', {
+ 'begin_year': 1970,
+ 'end_year': None,
+ 'expected_text': "1970",
+ }),
+ ]
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(YearRange_TestCase, self).setUp()
+
+ self.test_instance = metadata.YearRange(
+ self.begin_year, self.end_year)
+
+ def test_text_representation_as_expected(self):
+ """ Text representation should be as expected. """
+ result = unicode(self.test_instance)
+ self.assertEqual(result, self.expected_text)
+
+
+FakeYearRange = collections.namedtuple('FakeYearRange', ['begin', 'end'])
+
+@mock.patch.object(metadata, 'YearRange', new=FakeYearRange)
+class make_year_range_TestCase(scaffold.TestCaseWithScenarios):
+ """ Test cases for ‘make_year_range’ function. """
+
+ scenarios = [
+ ('simple', {
+ 'begin_year': "1970",
+ 'end_date': "1979-01-01",
+ 'expected_range': FakeYearRange(begin=1970, end=1979),
+ }),
+ ('same year', {
+ 'begin_year': "1970",
+ 'end_date': "1970-01-01",
+ 'expected_range': FakeYearRange(begin=1970, end=1970),
+ }),
+ ('no end year', {
+ 'begin_year': "1970",
+ 'end_date': None,
+ 'expected_range': FakeYearRange(begin=1970, end=None),
+ }),
+ ('end date UNKNOWN token', {
+ 'begin_year': "1970",
+ 'end_date': "UNKNOWN",
+ 'expected_range': FakeYearRange(begin=1970, end=None),
+ }),
+ ('end date FUTURE token', {
+ 'begin_year': "1970",
+ 'end_date': "FUTURE",
+ 'expected_range': FakeYearRange(begin=1970, end=None),
+ }),
+ ]
+
+ def test_result_matches_expected_range(self):
+ """ Result should match expected YearRange. """
+ result = metadata.make_year_range(self.begin_year, self.end_date)
+ self.assertEqual(result, self.expected_range)
+
+
+class metadata_content_TestCase(scaffold.TestCase):
+ """ Test cases for content of metadata. """
+
+ def test_copyright_formatted_correctly(self):
+ """ Copyright statement should be formatted correctly. """
+ regex_pattern = (
+ "Copyright © "
+ "\d{4}" # four-digit year
+ "(?:–\d{4})?" # optional range dash and ending four-digit year
+ )
+ regex_flags = re.UNICODE
+ self.assertThat(
+ metadata.copyright,
+ testtools.matchers.MatchesRegex(regex_pattern, regex_flags))
+
+ def test_author_formatted_correctly(self):
+ """ Author information should be formatted correctly. """
+ regex_pattern = (
+ ".+ " # name
+ "<[^>]+>" # email address, in angle brackets
+ )
+ regex_flags = re.UNICODE
+ self.assertThat(
+ metadata.author,
+ testtools.matchers.MatchesRegex(regex_pattern, regex_flags))
+
+ def test_copyright_contains_author(self):
+ """ Copyright information should contain author information. """
+ self.assertThat(
+ metadata.copyright,
+ testtools.matchers.Contains(metadata.author))
+
+ def test_url_parses_correctly(self):
+ """ Homepage URL should parse correctly. """
+ result = urlparse.urlparse(metadata.url)
+ self.assertIsInstance(
+ result, urlparse.ParseResult,
+ "URL value {url!r} did not parse correctly".format(
+ url=metadata.url))
+
+
+try:
+ FileNotFoundError
+except NameError:
+ # Python 2 uses IOError.
+ FileNotFoundError = functools.partial(IOError, errno.ENOENT)
+
+version_info_filename = "version_info.json"
+
+def fake_func_has_metadata(testcase, resource_name):
+ """ Fake the behaviour of ‘pkg_resources.Distribution.has_metadata’. """
+ if (
+ resource_name != testcase.expected_resource_name
+ or not hasattr(testcase, 'test_version_info')):
+ return False
+ return True
+
+
+def fake_func_get_metadata(testcase, resource_name):
+ """ Fake the behaviour of ‘pkg_resources.Distribution.get_metadata’. """
+ if not fake_func_has_metadata(testcase, resource_name):
+ error = FileNotFoundError(resource_name)
+ raise error
+ content = testcase.test_version_info
+ return content
+
+
+def fake_func_get_distribution(testcase, distribution_name):
+ """ Fake the behaviour of ‘pkg_resources.get_distribution’. """
+ if distribution_name != metadata.distribution_name:
+ raise pkg_resources.DistributionNotFound
+ if hasattr(testcase, 'get_distribution_error'):
+ raise testcase.get_distribution_error
+ mock_distribution = testcase.mock_distribution
+ mock_distribution.has_metadata.side_effect = functools.partial(
+ fake_func_has_metadata, testcase)
+ mock_distribution.get_metadata.side_effect = functools.partial(
+ fake_func_get_metadata, testcase)
+ return mock_distribution
+
+
+@mock.patch.object(metadata, 'distribution_name', new="mock-dist")
+class get_distribution_version_info_TestCase(scaffold.TestCaseWithScenarios):
+ """ Test cases for ‘get_distribution_version_info’ function. """
+
+ default_version_info = {
+ 'release_date': "UNKNOWN",
+ 'version': "UNKNOWN",
+ 'maintainer': "UNKNOWN",
+ }
+
+ scenarios = [
+ ('version 0.0', {
+ 'test_version_info': json.dumps({
+ 'version': "0.0",
+ }),
+ 'expected_version_info': {'version': "0.0"},
+ }),
+ ('version 1.0', {
+ 'test_version_info': json.dumps({
+ 'version': "1.0",
+ }),
+ 'expected_version_info': {'version': "1.0"},
+ }),
+ ('file lorem_ipsum.json', {
+ 'version_info_filename': "lorem_ipsum.json",
+ 'test_version_info': json.dumps({
+ 'version': "1.0",
+ }),
+ 'expected_version_info': {'version': "1.0"},
+ }),
+ ('not installed', {
+ 'get_distribution_error': pkg_resources.DistributionNotFound(),
+ 'expected_version_info': default_version_info,
+ }),
+ ('no version_info', {
+ 'expected_version_info': default_version_info,
+ }),
+ ]
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(get_distribution_version_info_TestCase, self).setUp()
+
+ if hasattr(self, 'expected_resource_name'):
+ self.test_args = {'filename': self.expected_resource_name}
+ else:
+ self.test_args = {}
+ self.expected_resource_name = version_info_filename
+
+ self.mock_distribution = mock.MagicMock()
+ func_patcher_get_distribution = mock.patch.object(
+ pkg_resources, 'get_distribution')
+ func_patcher_get_distribution.start()
+ self.addCleanup(func_patcher_get_distribution.stop)
+ pkg_resources.get_distribution.side_effect = functools.partial(
+ fake_func_get_distribution, self)
+
+ def test_requests_installed_distribution(self):
+ """ The package distribution should be retrieved. """
+ expected_distribution_name = metadata.distribution_name
+ version_info = metadata.get_distribution_version_info(**self.test_args)
+ pkg_resources.get_distribution.assert_called_with(
+ expected_distribution_name)
+
+ def test_requests_specified_filename(self):
+ """ The specified metadata resource name should be requested. """
+ if hasattr(self, 'get_distribution_error'):
+ self.skipTest("No access to distribution")
+ version_info = metadata.get_distribution_version_info(**self.test_args)
+ self.mock_distribution.has_metadata.assert_called_with(
+ self.expected_resource_name)
+
+ def test_result_matches_expected_items(self):
+ """ The result should match the expected items. """
+ version_info = metadata.get_distribution_version_info(**self.test_args)
+ self.assertEqual(self.expected_version_info, version_info)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_pidfile.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_pidfile.py
new file mode 100755
index 00000000..9b636ec8
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_pidfile.py
@@ -0,0 +1,472 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_pidfile.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘pidfile’ module.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+try:
+ # Python 3 standard library.
+ import builtins
+except ImportError:
+ # Python 2 standard library.
+ import __builtin__ as builtins
+import os
+import itertools
+import tempfile
+import errno
+import functools
+try:
+ # Standard library of Python 2.7 and later.
+ from io import StringIO
+except ImportError:
+ # Standard library of Python 2.6 and earlier.
+ from StringIO import StringIO
+
+import mock
+import lockfile
+
+from . import scaffold
+
+import daemon.pidfile
+
+
+class FakeFileDescriptorStringIO(StringIO, object):
+ """ A StringIO class that fakes a file descriptor. """
+
+ _fileno_generator = itertools.count()
+
+ def __init__(self, *args, **kwargs):
+ self._fileno = next(self._fileno_generator)
+ super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs)
+
+ def fileno(self):
+ return self._fileno
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+try:
+ FileNotFoundError
+ PermissionError
+except NameError:
+ # Python 2 uses IOError.
+ FileNotFoundError = functools.partial(IOError, errno.ENOENT)
+ PermissionError = functools.partial(IOError, errno.EPERM)
+
+
+def make_pidlockfile_scenarios():
+ """ Make a collection of scenarios for testing `PIDLockFile` instances.
+
+ :return: A collection of scenarios for tests involving
+ `PIDLockfFile` instances.
+
+ The collection is a mapping from scenario name to a dictionary of
+ scenario attributes.
+
+ """
+
+ fake_current_pid = 235
+ fake_other_pid = 8642
+ fake_pidfile_path = tempfile.mktemp()
+
+ fake_pidfile_empty = FakeFileDescriptorStringIO()
+ fake_pidfile_current_pid = FakeFileDescriptorStringIO(
+ "{pid:d}\n".format(pid=fake_current_pid))
+ fake_pidfile_other_pid = FakeFileDescriptorStringIO(
+ "{pid:d}\n".format(pid=fake_other_pid))
+ fake_pidfile_bogus = FakeFileDescriptorStringIO(
+ "b0gUs")
+
+ scenarios = {
+ 'simple': {},
+ 'not-exist': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'not-exist-write-denied': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'not-exist-write-busy': {
+ 'open_func_name': 'fake_open_nonexist',
+ 'os_open_func_name': 'fake_os_open_nonexist',
+ },
+ 'exist-read-denied': {
+ 'open_func_name': 'fake_open_read_denied',
+ 'os_open_func_name': 'fake_os_open_read_denied',
+ },
+ 'exist-locked-read-denied': {
+ 'locking_pid': fake_other_pid,
+ 'open_func_name': 'fake_open_read_denied',
+ 'os_open_func_name': 'fake_os_open_read_denied',
+ },
+ 'exist-empty': {},
+ 'exist-invalid': {
+ 'pidfile': fake_pidfile_bogus,
+ },
+ 'exist-current-pid': {
+ 'pidfile': fake_pidfile_current_pid,
+ 'pidfile_pid': fake_current_pid,
+ },
+ 'exist-current-pid-locked': {
+ 'pidfile': fake_pidfile_current_pid,
+ 'pidfile_pid': fake_current_pid,
+ 'locking_pid': fake_current_pid,
+ },
+ 'exist-other-pid': {
+ 'pidfile': fake_pidfile_other_pid,
+ 'pidfile_pid': fake_other_pid,
+ },
+ 'exist-other-pid-locked': {
+ 'pidfile': fake_pidfile_other_pid,
+ 'pidfile_pid': fake_other_pid,
+ 'locking_pid': fake_other_pid,
+ },
+ }
+
+ for scenario in scenarios.values():
+ scenario['pid'] = fake_current_pid
+ scenario['pidfile_path'] = fake_pidfile_path
+ if 'pidfile' not in scenario:
+ scenario['pidfile'] = fake_pidfile_empty
+ if 'pidfile_pid' not in scenario:
+ scenario['pidfile_pid'] = None
+ if 'locking_pid' not in scenario:
+ scenario['locking_pid'] = None
+ if 'open_func_name' not in scenario:
+ scenario['open_func_name'] = 'fake_open_okay'
+ if 'os_open_func_name' not in scenario:
+ scenario['os_open_func_name'] = 'fake_os_open_okay'
+
+ return scenarios
+
+
+def setup_pidfile_fixtures(testcase):
+ """ Set up common fixtures for PID file test cases.
+
+ :param testcase: A `TestCase` instance to decorate.
+
+ Decorate the `testcase` with attributes to be fixtures for tests
+ involving `PIDLockFile` instances.
+
+ """
+ scenarios = make_pidlockfile_scenarios()
+ testcase.pidlockfile_scenarios = scenarios
+
+ def get_scenario_option(testcase, key, default=None):
+ value = default
+ try:
+ value = testcase.scenario[key]
+ except (NameError, TypeError, AttributeError, KeyError):
+ pass
+ return value
+
+ func_patcher_os_getpid = mock.patch.object(
+ os, "getpid",
+ return_value=scenarios['simple']['pid'])
+ func_patcher_os_getpid.start()
+ testcase.addCleanup(func_patcher_os_getpid.stop)
+
+ def make_fake_open_funcs(testcase):
+
+ def fake_open_nonexist(filename, mode, buffering):
+ if mode.startswith('r'):
+ error = FileNotFoundError(
+ "No such file {filename!r}".format(
+ filename=filename))
+ raise error
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_open_read_denied(filename, mode, buffering):
+ if mode.startswith('r'):
+ error = PermissionError(
+ "Read denied on {filename!r}".format(
+ filename=filename))
+ raise error
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_open_okay(filename, mode, buffering):
+ result = testcase.scenario['pidfile']
+ return result
+
+ def fake_os_open_nonexist(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ error = FileNotFoundError(
+ "No such file {filename!r}".format(
+ filename=filename))
+ raise error
+ return result
+
+ def fake_os_open_read_denied(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ error = PermissionError(
+ "Read denied on {filename!r}".format(
+ filename=filename))
+ raise error
+ return result
+
+ def fake_os_open_okay(filename, flags, mode):
+ result = testcase.scenario['pidfile'].fileno()
+ return result
+
+ funcs = dict(
+ (name, obj) for (name, obj) in vars().items()
+ if callable(obj))
+
+ return funcs
+
+ testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase)
+
+ def fake_open(filename, mode='rt', buffering=None):
+ scenario_path = get_scenario_option(testcase, 'pidfile_path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['open_func_name']
+ fake_open_func = testcase.fake_pidfile_open_funcs[func_name]
+ result = fake_open_func(filename, mode, buffering)
+ else:
+ result = FakeFileDescriptorStringIO()
+ return result
+
+ mock_open = mock.mock_open()
+ mock_open.side_effect = fake_open
+
+ func_patcher_builtin_open = mock.patch.object(
+ builtins, "open",
+ new=mock_open)
+ func_patcher_builtin_open.start()
+ testcase.addCleanup(func_patcher_builtin_open.stop)
+
+ def fake_os_open(filename, flags, mode=None):
+ scenario_path = get_scenario_option(testcase, 'pidfile_path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['os_open_func_name']
+ fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name]
+ result = fake_os_open_func(filename, flags, mode)
+ else:
+ result = FakeFileDescriptorStringIO().fileno()
+ return result
+
+ mock_os_open = mock.MagicMock(side_effect=fake_os_open)
+
+ func_patcher_os_open = mock.patch.object(
+ os, "open",
+ new=mock_os_open)
+ func_patcher_os_open.start()
+ testcase.addCleanup(func_patcher_os_open.stop)
+
+ def fake_os_fdopen(fd, mode='rt', buffering=None):
+ scenario_pidfile = get_scenario_option(
+ testcase, 'pidfile', FakeFileDescriptorStringIO())
+ if fd == testcase.scenario['pidfile'].fileno():
+ result = testcase.scenario['pidfile']
+ else:
+ raise OSError(errno.EBADF, "Bad file descriptor")
+ return result
+
+ mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen)
+
+ func_patcher_os_fdopen = mock.patch.object(
+ os, "fdopen",
+ new=mock_os_fdopen)
+ func_patcher_os_fdopen.start()
+ testcase.addCleanup(func_patcher_os_fdopen.stop)
+
+
+def make_lockfile_method_fakes(scenario):
+ """ Make common fake methods for lockfile class.
+
+ :param scenario: A scenario for testing with PIDLockFile.
+ :return: A mapping from normal function name to the corresponding
+ fake function.
+
+ Each fake function behaves appropriately for the specified `scenario`.
+
+ """
+
+ def fake_func_read_pid():
+ return scenario['pidfile_pid']
+ def fake_func_is_locked():
+ return (scenario['locking_pid'] is not None)
+ def fake_func_i_am_locking():
+ return (
+ scenario['locking_pid'] == scenario['pid'])
+ def fake_func_acquire(timeout=None):
+ if scenario['locking_pid'] is not None:
+ raise lockfile.AlreadyLocked()
+ scenario['locking_pid'] = scenario['pid']
+ def fake_func_release():
+ if scenario['locking_pid'] is None:
+ raise lockfile.NotLocked()
+ if scenario['locking_pid'] != scenario['pid']:
+ raise lockfile.NotMyLock()
+ scenario['locking_pid'] = None
+ def fake_func_break_lock():
+ scenario['locking_pid'] = None
+
+ fake_methods = dict(
+ (
+ func_name.replace('fake_func_', ''),
+ mock.MagicMock(side_effect=fake_func))
+ for (func_name, fake_func) in vars().items()
+ if func_name.startswith('fake_func_'))
+
+ return fake_methods
+
+
+def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario):
+ """ Apply common fake methods to mock lockfile class.
+
+ :param mock_lockfile: An object providing the `LockFile` interface.
+ :param testcase: The `TestCase` instance providing the context for
+ the patch.
+ :param scenario: The `PIDLockFile` test scenario to use.
+
+ Mock the `LockFile` methods of `mock_lockfile`, by applying fake
+ methods customised for `scenario`. The mock is does by a patch
+ within the context of `testcase`.
+
+ """
+ fake_methods = dict(
+ (func_name, fake_func)
+ for (func_name, fake_func) in
+ make_lockfile_method_fakes(scenario).items()
+ if func_name not in ['read_pid'])
+
+ for (func_name, fake_func) in fake_methods.items():
+ func_patcher = mock.patch.object(
+ mock_lockfile, func_name,
+ new=fake_func)
+ func_patcher.start()
+ testcase.addCleanup(func_patcher.stop)
+
+
+def setup_pidlockfile_fixtures(testcase, scenario_name=None):
+ """ Set up common fixtures for PIDLockFile test cases.
+
+ :param testcase: The `TestCase` instance to decorate.
+ :param scenario_name: The name of the `PIDLockFile` scenario to use.
+
+ Decorate the `testcase` with attributes that are fixtures for test
+ cases involving `PIDLockFile` instances.`
+
+ """
+
+ setup_pidfile_fixtures(testcase)
+
+ for func_name in [
+ 'write_pid_to_pidfile',
+ 'remove_existing_pidfile',
+ ]:
+ func_patcher = mock.patch.object(lockfile.pidlockfile, func_name)
+ func_patcher.start()
+ testcase.addCleanup(func_patcher.stop)
+
+
+class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
+ """ Test cases for ‘TimeoutPIDLockFile’ class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(TimeoutPIDLockFile_TestCase, self).setUp()
+
+ pidlockfile_scenarios = make_pidlockfile_scenarios()
+ self.pidlockfile_scenario = pidlockfile_scenarios['simple']
+ pidfile_path = self.pidlockfile_scenario['pidfile_path']
+
+ for func_name in ['__init__', 'acquire']:
+ func_patcher = mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, func_name)
+ func_patcher.start()
+ self.addCleanup(func_patcher.stop)
+
+ self.scenario = {
+ 'pidfile_path': self.pidlockfile_scenario['pidfile_path'],
+ 'acquire_timeout': self.getUniqueInteger(),
+ }
+
+ self.test_kwargs = dict(
+ path=self.scenario['pidfile_path'],
+ acquire_timeout=self.scenario['acquire_timeout'],
+ )
+ self.test_instance = daemon.pidfile.TimeoutPIDLockFile(
+ **self.test_kwargs)
+
+ def test_inherits_from_pidlockfile(self):
+ """ Should inherit from PIDLockFile. """
+ instance = self.test_instance
+ self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile)
+
+ def test_init_has_expected_signature(self):
+ """ Should have expected signature for ‘__init__’. """
+ def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
+ test_func.__name__ = str('__init__')
+ self.assertFunctionSignatureMatch(
+ test_func,
+ daemon.pidfile.TimeoutPIDLockFile.__init__)
+
+ def test_has_specified_acquire_timeout(self):
+ """ Should have specified ‘acquire_timeout’ value. """
+ instance = self.test_instance
+ expected_timeout = self.test_kwargs['acquire_timeout']
+ self.assertEqual(expected_timeout, instance.acquire_timeout)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "__init__",
+ autospec=True)
+ def test_calls_superclass_init(self, mock_init):
+ """ Should call the superclass ‘__init__’. """
+ expected_path = self.test_kwargs['path']
+ instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs)
+ mock_init.assert_called_with(instance, expected_path)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "acquire",
+ autospec=True)
+ def test_acquire_uses_specified_timeout(self, mock_func_acquire):
+ """ Should call the superclass ‘acquire’ with specified timeout. """
+ instance = self.test_instance
+ test_timeout = self.getUniqueInteger()
+ expected_timeout = test_timeout
+ instance.acquire(test_timeout)
+ mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+ @mock.patch.object(
+ lockfile.pidlockfile.PIDLockFile, "acquire",
+ autospec=True)
+ def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire):
+ """ Should call superclass ‘acquire’ with stored timeout by default. """
+ instance = self.test_instance
+ test_timeout = self.test_kwargs['acquire_timeout']
+ expected_timeout = test_timeout
+ instance.acquire()
+ mock_func_acquire.assert_called_with(instance, expected_timeout)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :
diff --git a/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_runner.py b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_runner.py
new file mode 100755
index 00000000..4c0c714a
--- /dev/null
+++ b/scripts/automation/trex_control_plane/python_lib/python-daemon-2.0.5/test/test_runner.py
@@ -0,0 +1,675 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_runner.py
+# Part of ‘python-daemon’, an implementation of PEP 3143.
+#
+# Copyright © 2009–2015 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Apache License, version 2.0 as published by the
+# Apache Software Foundation.
+# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
+
+""" Unit test for ‘runner’ module.
+ """
+
+from __future__ import (absolute_import, unicode_literals)
+
+try:
+ # Python 3 standard library.
+ import builtins
+except ImportError:
+ # Python 2 standard library.
+ import __builtin__ as builtins
+import os
+import os.path
+import sys
+import tempfile
+import errno
+import signal
+import functools
+
+import lockfile
+import mock
+import testtools
+
+from . import scaffold
+from .scaffold import (basestring, unicode)
+from .test_pidfile import (
+ FakeFileDescriptorStringIO,
+ setup_pidfile_fixtures,
+ make_pidlockfile_scenarios,
+ apply_lockfile_method_mocks,
+ )
+from .test_daemon import (
+ setup_streams_fixtures,
+ )
+
+import daemon.daemon
+import daemon.runner
+import daemon.pidfile
+
+
+class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
+ """ Test cases for module exception classes. """
+
+ scenarios = scaffold.make_exception_scenarios([
+ ('daemon.runner.DaemonRunnerError', dict(
+ exc_type = daemon.runner.DaemonRunnerError,
+ min_args = 1,
+ types = [Exception],
+ )),
+ ('daemon.runner.DaemonRunnerInvalidActionError', dict(
+ exc_type = daemon.runner.DaemonRunnerInvalidActionError,
+ min_args = 1,
+ types = [daemon.runner.DaemonRunnerError, ValueError],
+ )),
+ ('daemon.runner.DaemonRunnerStartFailureError', dict(
+ exc_type = daemon.runner.DaemonRunnerStartFailureError,
+ min_args = 1,
+ types = [daemon.runner.DaemonRunnerError, RuntimeError],
+ )),
+ ('daemon.runner.DaemonRunnerStopFailureError', dict(
+ exc_type = daemon.runner.DaemonRunnerStopFailureError,
+ min_args = 1,
+ types = [daemon.runner.DaemonRunnerError, RuntimeError],
+ )),
+ ])
+
+
+def make_runner_scenarios():
+ """ Make a collection of scenarios for testing `DaemonRunner` instances.
+
+ :return: A collection of scenarios for tests involving
+ `DaemonRunner` instances.
+
+ The collection is a mapping from scenario name to a dictionary of
+ scenario attributes.
+
+ """
+
+ pidlockfile_scenarios = make_pidlockfile_scenarios()
+
+ scenarios = {
+ 'simple': {
+ 'pidlockfile_scenario_name': 'simple',
+ },
+ 'pidfile-locked': {
+ 'pidlockfile_scenario_name': 'exist-other-pid-locked',
+ },
+ }
+
+ for scenario in scenarios.values():
+ if 'pidlockfile_scenario_name' in scenario:
+ pidlockfile_scenario = pidlockfile_scenarios.pop(
+ scenario['pidlockfile_scenario_name'])
+ scenario['pid'] = pidlockfile_scenario['pid']
+ scenario['pidfile_path'] = pidlockfile_scenario['pidfile_path']
+ scenario['pidfile_timeout'] = 23
+ scenario['pidlockfile_scenario'] = pidlockfile_scenario
+
+ return scenarios
+
+
+def set_runner_scenario(testcase, scenario_name):
+ """ Set the DaemonRunner test scenario for the test case.
+
+ :param testcase: The `TestCase` instance to decorate.
+ :param scenario_name: The name of the scenario to use.
+
+ Set the `DaemonRunner` test scenario name and decorate the
+ `testcase` with the corresponding scenario fixtures.
+
+ """
+ scenarios = testcase.runner_scenarios
+ testcase.scenario = scenarios[scenario_name]
+ apply_lockfile_method_mocks(
+ testcase.mock_runner_lockfile,
+ testcase,
+ testcase.scenario['pidlockfile_scenario'])
+
+
+def setup_runner_fixtures(testcase):
+ """ Set up common fixtures for `DaemonRunner` test cases.
+
+ :param testcase: A `TestCase` instance to decorate.
+
+ Decorate the `testcase` with attributes to be fixtures for tests
+ involving `DaemonRunner` instances.
+
+ """
+ setup_pidfile_fixtures(testcase)
+ setup_streams_fixtures(testcase)
+
+ testcase.runner_scenarios = make_runner_scenarios()
+
+ patcher_stderr = mock.patch.object(
+ sys, "stderr",
+ new=FakeFileDescriptorStringIO())
+ testcase.fake_stderr = patcher_stderr.start()
+ testcase.addCleanup(patcher_stderr.stop)
+
+ simple_scenario = testcase.runner_scenarios['simple']
+
+ testcase.mock_runner_lockfile = mock.MagicMock(
+ spec=daemon.pidfile.TimeoutPIDLockFile)
+ apply_lockfile_method_mocks(
+ testcase.mock_runner_lockfile,
+ testcase,
+ simple_scenario['pidlockfile_scenario'])
+ testcase.mock_runner_lockfile.path = simple_scenario['pidfile_path']
+
+ patcher_lockfile_class = mock.patch.object(
+ daemon.pidfile, "TimeoutPIDLockFile",
+ return_value=testcase.mock_runner_lockfile)
+ patcher_lockfile_class.start()
+ testcase.addCleanup(patcher_lockfile_class.stop)
+
+ class TestApp(object):
+
+ def __init__(self):
+ self.stdin_path = testcase.stream_file_paths['stdin']
+ self.stdout_path = testcase.stream_file_paths['stdout']
+ self.stderr_path = testcase.stream_file_paths['stderr']
+ self.pidfile_path = simple_scenario['pidfile_path']
+ self.pidfile_timeout = simple_scenario['pidfile_timeout']
+
+ run = mock.MagicMock(name="TestApp.run")
+
+ testcase.TestApp = TestApp
+
+ patcher_runner_daemoncontext = mock.patch.object(
+ daemon.runner, "DaemonContext", autospec=True)
+ patcher_runner_daemoncontext.start()
+ testcase.addCleanup(patcher_runner_daemoncontext.stop)
+
+ testcase.test_app = testcase.TestApp()
+
+ testcase.test_program_name = "bazprog"
+ testcase.test_program_path = os.path.join(
+ "/foo/bar", testcase.test_program_name)
+ testcase.valid_argv_params = {
+ 'start': [testcase.test_program_path, 'start'],
+ 'stop': [testcase.test_program_path, 'stop'],
+ 'restart': [testcase.test_program_path, 'restart'],
+ }
+
+ def fake_open(filename, mode=None, buffering=None):
+ if filename in testcase.stream_files_by_path:
+ result = testcase.stream_files_by_path[filename]
+ else:
+ result = FakeFileDescriptorStringIO()
+ result.mode = mode
+ result.buffering = buffering
+ return result
+
+ mock_open = mock.mock_open()
+ mock_open.side_effect = fake_open
+
+ func_patcher_builtin_open = mock.patch.object(
+ builtins, "open",
+ new=mock_open)
+ func_patcher_builtin_open.start()
+ testcase.addCleanup(func_patcher_builtin_open.stop)
+
+ func_patcher_os_kill = mock.patch.object(os, "kill")
+ func_patcher_os_kill.start()
+ testcase.addCleanup(func_patcher_os_kill.stop)
+
+ patcher_sys_argv = mock.patch.object(
+ sys, "argv",
+ new=testcase.valid_argv_params['start'])
+ patcher_sys_argv.start()
+ testcase.addCleanup(patcher_sys_argv.stop)
+
+ testcase.test_instance = daemon.runner.DaemonRunner(testcase.test_app)
+
+ testcase.scenario = NotImplemented
+
+
+class DaemonRunner_BaseTestCase(scaffold.TestCase):
+ """ Base class for DaemonRunner test case classes. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_BaseTestCase, self).setUp()
+
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+
+class DaemonRunner_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_TestCase, self).setUp()
+
+ func_patcher_parse_args = mock.patch.object(
+ daemon.runner.DaemonRunner, "parse_args")
+ func_patcher_parse_args.start()
+ self.addCleanup(func_patcher_parse_args.stop)
+
+ # Create a new instance now with our custom patches.
+ self.test_instance = daemon.runner.DaemonRunner(self.test_app)
+
+ def test_instantiate(self):
+ """ New instance of DaemonRunner should be created. """
+ self.assertIsInstance(self.test_instance, daemon.runner.DaemonRunner)
+
+ def test_parses_commandline_args(self):
+ """ Should parse commandline arguments. """
+ self.test_instance.parse_args.assert_called_with()
+
+ def test_has_specified_app(self):
+ """ Should have specified application object. """
+ self.assertIs(self.test_app, self.test_instance.app)
+
+ def test_sets_pidfile_none_when_pidfile_path_is_none(self):
+ """ Should set ‘pidfile’ to ‘None’ when ‘pidfile_path’ is ‘None’. """
+ pidfile_path = None
+ self.test_app.pidfile_path = pidfile_path
+ expected_pidfile = None
+ instance = daemon.runner.DaemonRunner(self.test_app)
+ self.assertIs(expected_pidfile, instance.pidfile)
+
+ def test_error_when_pidfile_path_not_string(self):
+ """ Should raise ValueError when PID file path not a string. """
+ pidfile_path = object()
+ self.test_app.pidfile_path = pidfile_path
+ expected_error = ValueError
+ self.assertRaises(
+ expected_error,
+ daemon.runner.DaemonRunner, self.test_app)
+
+ def test_error_when_pidfile_path_not_absolute(self):
+ """ Should raise ValueError when PID file path not absolute. """
+ pidfile_path = "foo/bar.pid"
+ self.test_app.pidfile_path = pidfile_path
+ expected_error = ValueError
+ self.assertRaises(
+ expected_error,
+ daemon.runner.DaemonRunner, self.test_app)
+
+ def test_creates_lock_with_specified_parameters(self):
+ """ Should create a TimeoutPIDLockFile with specified params. """
+ pidfile_path = self.scenario['pidfile_path']
+ pidfile_timeout = self.scenario['pidfile_timeout']
+ daemon.pidfile.TimeoutPIDLockFile.assert_called_with(
+ pidfile_path, pidfile_timeout)
+
+ def test_has_created_pidfile(self):
+ """ Should have new PID lock file as `pidfile` attribute. """
+ expected_pidfile = self.mock_runner_lockfile
+ instance = self.test_instance
+ self.assertIs(
+ expected_pidfile, instance.pidfile)
+
+ def test_daemon_context_has_created_pidfile(self):
+ """ DaemonContext component should have new PID lock file. """
+ expected_pidfile = self.mock_runner_lockfile
+ daemon_context = self.test_instance.daemon_context
+ self.assertIs(
+ expected_pidfile, daemon_context.pidfile)
+
+ def test_daemon_context_has_specified_stdin_stream(self):
+ """ DaemonContext component should have specified stdin file. """
+ test_app = self.test_app
+ expected_file = self.stream_files_by_name['stdin']
+ daemon_context = self.test_instance.daemon_context
+ self.assertEqual(expected_file, daemon_context.stdin)
+
+ def test_daemon_context_has_stdin_in_read_mode(self):
+ """ DaemonContext component should open stdin file for read. """
+ expected_mode = 'rt'
+ daemon_context = self.test_instance.daemon_context
+ self.assertIn(expected_mode, daemon_context.stdin.mode)
+
+ def test_daemon_context_has_specified_stdout_stream(self):
+ """ DaemonContext component should have specified stdout file. """
+ test_app = self.test_app
+ expected_file = self.stream_files_by_name['stdout']
+ daemon_context = self.test_instance.daemon_context
+ self.assertEqual(expected_file, daemon_context.stdout)
+
+ def test_daemon_context_has_stdout_in_append_mode(self):
+ """ DaemonContext component should open stdout file for append. """
+ expected_mode = 'w+t'
+ daemon_context = self.test_instance.daemon_context
+ self.assertIn(expected_mode, daemon_context.stdout.mode)
+
+ def test_daemon_context_has_specified_stderr_stream(self):
+ """ DaemonContext component should have specified stderr file. """
+ test_app = self.test_app
+ expected_file = self.stream_files_by_name['stderr']
+ daemon_context = self.test_instance.daemon_context
+ self.assertEqual(expected_file, daemon_context.stderr)
+
+ def test_daemon_context_has_stderr_in_append_mode(self):
+ """ DaemonContext component should open stderr file for append. """
+ expected_mode = 'w+t'
+ daemon_context = self.test_instance.daemon_context
+ self.assertIn(expected_mode, daemon_context.stderr.mode)
+
+ def test_daemon_context_has_stderr_with_no_buffering(self):
+ """ DaemonContext component should open stderr file unbuffered. """
+ expected_buffering = 0
+ daemon_context = self.test_instance.daemon_context
+ self.assertEqual(
+ expected_buffering, daemon_context.stderr.buffering)
+
+
+class DaemonRunner_usage_exit_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.usage_exit method. """
+
+ def test_raises_system_exit(self):
+ """ Should raise SystemExit exception. """
+ instance = self.test_instance
+ argv = [self.test_program_path]
+ self.assertRaises(
+ SystemExit,
+ instance._usage_exit, argv)
+
+ def test_message_follows_conventional_format(self):
+ """ Should emit a conventional usage message. """
+ instance = self.test_instance
+ argv = [self.test_program_path]
+ expected_stderr_output = """\
+ usage: {progname} ...
+ """.format(
+ progname=self.test_program_name)
+ self.assertRaises(
+ SystemExit,
+ instance._usage_exit, argv)
+ self.assertOutputCheckerMatch(
+ expected_stderr_output, self.fake_stderr.getvalue())
+
+
+class DaemonRunner_parse_args_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.parse_args method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_parse_args_TestCase, self).setUp()
+
+ func_patcher_usage_exit = mock.patch.object(
+ daemon.runner.DaemonRunner, "_usage_exit",
+ side_effect=NotImplementedError)
+ func_patcher_usage_exit.start()
+ self.addCleanup(func_patcher_usage_exit.stop)
+
+ def test_emits_usage_message_if_insufficient_args(self):
+ """ Should emit a usage message and exit if too few arguments. """
+ instance = self.test_instance
+ argv = [self.test_program_path]
+ exc = self.assertRaises(
+ NotImplementedError,
+ instance.parse_args, argv)
+ daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv)
+
+ def test_emits_usage_message_if_unknown_action_arg(self):
+ """ Should emit a usage message and exit if unknown action. """
+ instance = self.test_instance
+ progname = self.test_program_name
+ argv = [self.test_program_path, 'bogus']
+ exc = self.assertRaises(
+ NotImplementedError,
+ instance.parse_args, argv)
+ daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv)
+
+ def test_should_parse_system_argv_by_default(self):
+ """ Should parse sys.argv by default. """
+ instance = self.test_instance
+ expected_action = 'start'
+ argv = self.valid_argv_params['start']
+ with mock.patch.object(sys, "argv", new=argv):
+ instance.parse_args()
+ self.assertEqual(expected_action, instance.action)
+
+ def test_sets_action_from_first_argument(self):
+ """ Should set action from first commandline argument. """
+ instance = self.test_instance
+ for name, argv in self.valid_argv_params.items():
+ expected_action = name
+ instance.parse_args(argv)
+ self.assertEqual(expected_action, instance.action)
+
+
+try:
+ ProcessLookupError
+except NameError:
+ # Python 2 uses OSError.
+ ProcessLookupError = functools.partial(OSError, errno.ESRCH)
+
+class DaemonRunner_do_action_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.do_action method. """
+
+ def test_raises_error_if_unknown_action(self):
+ """ Should emit a usage message and exit if action is unknown. """
+ instance = self.test_instance
+ instance.action = 'bogus'
+ expected_error = daemon.runner.DaemonRunnerInvalidActionError
+ self.assertRaises(
+ expected_error,
+ instance.do_action)
+
+
+class DaemonRunner_do_action_start_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'start'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_do_action_start_TestCase, self).setUp()
+
+ self.test_instance.action = 'start'
+
+ def test_raises_error_if_pidfile_locked(self):
+ """ Should raise error if PID file is locked. """
+
+ instance = self.test_instance
+ instance.daemon_context.open.side_effect = lockfile.AlreadyLocked
+ pidfile_path = self.scenario['pidfile_path']
+ expected_error = daemon.runner.DaemonRunnerStartFailureError
+ expected_message_content = pidfile_path
+ exc = self.assertRaises(
+ expected_error,
+ instance.do_action)
+ self.assertIn(expected_message_content, unicode(exc))
+
+ def test_breaks_lock_if_no_such_process(self):
+ """ Should request breaking lock if PID file process is not running. """
+ set_runner_scenario(self, 'pidfile-locked')
+ instance = self.test_instance
+ self.mock_runner_lockfile.read_pid.return_value = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+ pidfile_path = self.scenario['pidfile_path']
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expected_signal = signal.SIG_DFL
+ test_error = ProcessLookupError("Not running")
+ os.kill.side_effect = test_error
+ instance.do_action()
+ os.kill.assert_called_with(test_pid, expected_signal)
+ self.mock_runner_lockfile.break_lock.assert_called_with()
+
+ def test_requests_daemon_context_open(self):
+ """ Should request the daemon context to open. """
+ instance = self.test_instance
+ instance.do_action()
+ instance.daemon_context.open.assert_called_with()
+
+ def test_emits_start_message_to_stderr(self):
+ """ Should emit start message to stderr. """
+ instance = self.test_instance
+ expected_stderr = """\
+ started with pid {pid:d}
+ """.format(
+ pid=self.scenario['pid'])
+ instance.do_action()
+ self.assertOutputCheckerMatch(
+ expected_stderr, self.fake_stderr.getvalue())
+
+ def test_requests_app_run(self):
+ """ Should request the application to run. """
+ instance = self.test_instance
+ instance.do_action()
+ self.test_app.run.assert_called_with()
+
+
+class DaemonRunner_do_action_stop_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'stop'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_do_action_stop_TestCase, self).setUp()
+
+ set_runner_scenario(self, 'pidfile-locked')
+
+ self.test_instance.action = 'stop'
+
+ self.mock_runner_lockfile.is_locked.return_value = True
+ self.mock_runner_lockfile.i_am_locking.return_value = False
+ self.mock_runner_lockfile.read_pid.return_value = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+
+ def test_raises_error_if_pidfile_not_locked(self):
+ """ Should raise error if PID file is not locked. """
+ set_runner_scenario(self, 'simple')
+ instance = self.test_instance
+ self.mock_runner_lockfile.is_locked.return_value = False
+ self.mock_runner_lockfile.i_am_locking.return_value = False
+ self.mock_runner_lockfile.read_pid.return_value = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+ pidfile_path = self.scenario['pidfile_path']
+ expected_error = daemon.runner.DaemonRunnerStopFailureError
+ expected_message_content = pidfile_path
+ exc = self.assertRaises(
+ expected_error,
+ instance.do_action)
+ self.assertIn(expected_message_content, unicode(exc))
+
+ def test_breaks_lock_if_pidfile_stale(self):
+ """ Should break lock if PID file is stale. """
+ instance = self.test_instance
+ pidfile_path = self.scenario['pidfile_path']
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expected_signal = signal.SIG_DFL
+ test_error = OSError(errno.ESRCH, "Not running")
+ os.kill.side_effect = test_error
+ instance.do_action()
+ self.mock_runner_lockfile.break_lock.assert_called_with()
+
+ def test_sends_terminate_signal_to_process_from_pidfile(self):
+ """ Should send SIGTERM to the daemon process. """
+ instance = self.test_instance
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expected_signal = signal.SIGTERM
+ instance.do_action()
+ os.kill.assert_called_with(test_pid, expected_signal)
+
+ def test_raises_error_if_cannot_send_signal_to_process(self):
+ """ Should raise error if cannot send signal to daemon process. """
+ instance = self.test_instance
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ pidfile_path = self.scenario['pidfile_path']
+ test_error = OSError(errno.EPERM, "Nice try")
+ os.kill.side_effect = test_error
+ expected_error = daemon.runner.DaemonRunnerStopFailureError
+ expected_message_content = unicode(test_pid)
+ exc = self.assertRaises(
+ expected_error,
+ instance.do_action)
+ self.assertIn(expected_message_content, unicode(exc))
+
+
+@mock.patch.object(daemon.runner.DaemonRunner, "_start")
+@mock.patch.object(daemon.runner.DaemonRunner, "_stop")
+class DaemonRunner_do_action_restart_TestCase(DaemonRunner_BaseTestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'restart'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(DaemonRunner_do_action_restart_TestCase, self).setUp()
+
+ set_runner_scenario(self, 'pidfile-locked')
+
+ self.test_instance.action = 'restart'
+
+ def test_requests_stop_then_start(
+ self,
+ mock_func_daemonrunner_start, mock_func_daemonrunner_stop):
+ """ Should request stop, then start. """
+ instance = self.test_instance
+ instance.do_action()
+ mock_func_daemonrunner_start.assert_called_with()
+ mock_func_daemonrunner_stop.assert_called_with()
+
+
+@mock.patch.object(sys, "stderr")
+class emit_message_TestCase(scaffold.TestCase):
+ """ Test cases for ‘emit_message’ function. """
+
+ def test_writes_specified_message_to_stream(self, mock_stderr):
+ """ Should write specified message to stream. """
+ test_message = self.getUniqueString()
+ expected_content = "{message}\n".format(message=test_message)
+ daemon.runner.emit_message(test_message, stream=mock_stderr)
+ mock_stderr.write.assert_called_with(expected_content)
+
+ def test_writes_to_specified_stream(self, mock_stderr):
+ """ Should write message to specified stream. """
+ test_message = self.getUniqueString()
+ mock_stream = mock.MagicMock()
+ daemon.runner.emit_message(test_message, stream=mock_stream)
+ mock_stream.write.assert_called_with(mock.ANY)
+
+ def test_writes_to_stderr_by_default(self, mock_stderr):
+ """ Should write message to ‘sys.stderr’ by default. """
+ test_message = self.getUniqueString()
+ daemon.runner.emit_message(test_message)
+ mock_stderr.write.assert_called_with(mock.ANY)
+
+
+class is_pidfile_stale_TestCase(scaffold.TestCase):
+ """ Test cases for ‘is_pidfile_stale’ function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ super(is_pidfile_stale_TestCase, self).setUp()
+
+ func_patcher_os_kill = mock.patch.object(os, "kill")
+ func_patcher_os_kill.start()
+ self.addCleanup(func_patcher_os_kill.stop)
+ os.kill.return_value = None
+
+ self.test_pid = self.getUniqueInteger()
+ self.test_pidfile = mock.MagicMock(daemon.pidfile.TimeoutPIDLockFile)
+ self.test_pidfile.read_pid.return_value = self.test_pid
+
+ def test_returns_false_if_no_pid_in_file(self):
+ """ Should return False if the pidfile contains no PID. """
+ self.test_pidfile.read_pid.return_value = None
+ expected_result = False
+ result = daemon.runner.is_pidfile_stale(self.test_pidfile)
+ self.assertEqual(expected_result, result)
+
+ def test_returns_false_if_process_exists(self):
+ """ Should return False if the process with its PID exists. """
+ expected_result = False
+ result = daemon.runner.is_pidfile_stale(self.test_pidfile)
+ self.assertEqual(expected_result, result)
+
+ def test_returns_true_if_process_does_not_exist(self):
+ """ Should return True if the process does not exist. """
+ test_error = ProcessLookupError("No such process")
+ del os.kill.return_value
+ os.kill.side_effect = test_error
+ expected_result = True
+ result = daemon.runner.is_pidfile_stale(self.test_pidfile)
+ self.assertEqual(expected_result, result)
+
+
+# Local variables:
+# coding: utf-8
+# mode: python
+# End:
+# vim: fileencoding=utf-8 filetype=python :