diff options
author | 2015-08-26 15:50:06 +0300 | |
---|---|---|
committer | 2015-08-26 15:50:06 +0300 | |
commit | 53f0e28d7f30c7175cbb15884c309613593859d8 (patch) | |
tree | e75ff499233519baa6c50c60fa0c726da7b42e03 /scripts/external_libs/python-daemon-2.0.5/test | |
parent | a628a35b10fbd38211c353f506a8c49c2cc3dd7e (diff) | |
parent | 00d6f001971b324bb9c884aaf0384a4cee076550 (diff) |
Merge branch 'lib_fix'
Diffstat (limited to 'scripts/external_libs/python-daemon-2.0.5/test')
6 files changed, 3616 insertions, 0 deletions
diff --git a/scripts/external_libs/python-daemon-2.0.5/test/__init__.py b/scripts/external_libs/python-daemon-2.0.5/test/__init__.py new file mode 100644 index 00000000..398519f1 --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/__init__.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# +# test/__init__.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Unit test suite for ‘daemon’ package. + """ + +from __future__ import (absolute_import, unicode_literals) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : diff --git a/scripts/external_libs/python-daemon-2.0.5/test/scaffold.py b/scripts/external_libs/python-daemon-2.0.5/test/scaffold.py new file mode 100644 index 00000000..9a4f1150 --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/scaffold.py @@ -0,0 +1,322 @@ +# -*- coding: utf-8 -*- + +# test/scaffold.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2007–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Scaffolding for unit test modules. + """ + +from __future__ import (absolute_import, unicode_literals) + +import unittest +import doctest +import logging +import os +import sys +import operator +import textwrap +from copy import deepcopy +import functools + +try: + # Python 2 has both ‘str’ (bytes) and ‘unicode’ (text). + basestring = basestring + unicode = unicode +except NameError: + # Python 3 names the Unicode data type ‘str’. + basestring = str + unicode = str + +import testscenarios +import testtools.testcase + + +test_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(test_dir) +if not test_dir in sys.path: + sys.path.insert(1, test_dir) +if not parent_dir in sys.path: + sys.path.insert(1, parent_dir) + +# Disable all but the most critical logging messages. +logging.disable(logging.CRITICAL) + + +def get_function_signature(func): + """ Get the function signature as a mapping of attributes. + + :param func: The function object to interrogate. + :return: A mapping of the components of a function signature. + + The signature is constructed as a mapping: + + * 'name': The function's defined name. + * 'arg_count': The number of arguments expected by the function. + * 'arg_names': A sequence of the argument names, as strings. + * 'arg_defaults': A sequence of the default values for the arguments. + * 'va_args': The name bound to remaining positional arguments. + * 'va_kw_args': The name bound to remaining keyword arguments. + + """ + try: + # Python 3 function attributes. + func_code = func.__code__ + func_defaults = func.__defaults__ + except AttributeError: + # Python 2 function attributes. + func_code = func.func_code + func_defaults = func.func_defaults + + arg_count = func_code.co_argcount + arg_names = func_code.co_varnames[:arg_count] + + arg_defaults = {} + if func_defaults is not None: + arg_defaults = dict( + (name, value) + for (name, value) in + zip(arg_names[::-1], func_defaults[::-1])) + + signature = { + 'name': func.__name__, + 'arg_count': arg_count, + 'arg_names': arg_names, + 'arg_defaults': arg_defaults, + } + + non_pos_names = list(func_code.co_varnames[arg_count:]) + COLLECTS_ARBITRARY_POSITIONAL_ARGS = 0x04 + if func_code.co_flags & COLLECTS_ARBITRARY_POSITIONAL_ARGS: + signature['var_args'] = non_pos_names.pop(0) + COLLECTS_ARBITRARY_KEYWORD_ARGS = 0x08 + if func_code.co_flags & COLLECTS_ARBITRARY_KEYWORD_ARGS: + signature['var_kw_args'] = non_pos_names.pop(0) + + return signature + + +def format_function_signature(func): + """ Format the function signature as printable text. + + :param func: The function object to interrogate. + :return: A formatted text representation of the function signature. + + The signature is rendered a text; for example:: + + foo(spam, eggs, ham=True, beans=None, *args, **kwargs) + + """ + signature = get_function_signature(func) + + args_text = [] + for arg_name in signature['arg_names']: + if arg_name in signature['arg_defaults']: + arg_text = "{name}={value!r}".format( + name=arg_name, value=signature['arg_defaults'][arg_name]) + else: + arg_text = "{name}".format( + name=arg_name) + args_text.append(arg_text) + if 'var_args' in signature: + args_text.append("*{var_args}".format(signature)) + if 'var_kw_args' in signature: + args_text.append("**{var_kw_args}".format(signature)) + signature_args_text = ", ".join(args_text) + + func_name = signature['name'] + signature_text = "{name}({args})".format( + name=func_name, args=signature_args_text) + + return signature_text + + +class TestCase(testtools.testcase.TestCase): + """ Test case behaviour. """ + + def failUnlessOutputCheckerMatch(self, want, got, msg=None): + """ Fail unless the specified string matches the expected. + + :param want: The desired output pattern. + :param got: The actual text to match. + :param msg: A message to prefix on the failure message. + :return: ``None``. + :raises self.failureException: If the text does not match. + + Fail the test unless ``want`` matches ``got``, as determined by + a ``doctest.OutputChecker`` instance. This is not an equality + check, but a pattern match according to the ``OutputChecker`` + rules. + + """ + checker = doctest.OutputChecker() + want = textwrap.dedent(want) + source = "" + example = doctest.Example(source, want) + got = textwrap.dedent(got) + checker_optionflags = functools.reduce(operator.or_, [ + doctest.ELLIPSIS, + ]) + if not checker.check_output(want, got, checker_optionflags): + if msg is None: + diff = checker.output_difference( + example, got, checker_optionflags) + msg = "\n".join([ + "Output received did not match expected output", + "{diff}", + ]).format( + diff=diff) + raise self.failureException(msg) + + assertOutputCheckerMatch = failUnlessOutputCheckerMatch + + def failUnlessFunctionInTraceback(self, traceback, function, msg=None): + """ Fail if the function is not in the traceback. + + :param traceback: The traceback object to interrogate. + :param function: The function object to match. + :param msg: A message to prefix on the failure message. + :return: ``None``. + + :raises self.failureException: If the function is not in the + traceback. + + Fail the test if the function ``function`` is not at any of the + levels in the traceback object ``traceback``. + + """ + func_in_traceback = False + expected_code = function.func_code + current_traceback = traceback + while current_traceback is not None: + if expected_code is current_traceback.tb_frame.f_code: + func_in_traceback = True + break + current_traceback = current_traceback.tb_next + + if not func_in_traceback: + if msg is None: + msg = ( + "Traceback did not lead to original function" + " {function}" + ).format( + function=function) + raise self.failureException(msg) + + assertFunctionInTraceback = failUnlessFunctionInTraceback + + def failUnlessFunctionSignatureMatch(self, first, second, msg=None): + """ Fail if the function signatures do not match. + + :param first: The first function to compare. + :param second: The second function to compare. + :param msg: A message to prefix to the failure message. + :return: ``None``. + + :raises self.failureException: If the function signatures do + not match. + + Fail the test if the function signature does not match between + the ``first`` function and the ``second`` function. + + The function signature includes: + + * function name, + + * count of named parameters, + + * sequence of named parameters, + + * default values of named parameters, + + * collector for arbitrary positional arguments, + + * collector for arbitrary keyword arguments. + + """ + first_signature = get_function_signature(first) + second_signature = get_function_signature(second) + + if first_signature != second_signature: + if msg is None: + first_signature_text = format_function_signature(first) + second_signature_text = format_function_signature(second) + msg = (textwrap.dedent("""\ + Function signatures do not match: + {first!r} != {second!r} + Expected: + {first_text} + Got: + {second_text}""") + ).format( + first=first_signature, + first_text=first_signature_text, + second=second_signature, + second_text=second_signature_text, + ) + raise self.failureException(msg) + + assertFunctionSignatureMatch = failUnlessFunctionSignatureMatch + + +class TestCaseWithScenarios(testscenarios.WithScenarios, TestCase): + """ Test cases run per scenario. """ + + +class Exception_TestCase(TestCaseWithScenarios): + """ Test cases for exception classes. """ + + def test_exception_instance(self): + """ Exception instance should be created. """ + self.assertIsNot(self.instance, None) + + def test_exception_types(self): + """ Exception instance should match expected types. """ + for match_type in self.types: + self.assertIsInstance(self.instance, match_type) + + +def make_exception_scenarios(scenarios): + """ Make test scenarios for exception classes. + + :param scenarios: Sequence of scenarios. + :return: List of scenarios with additional mapping entries. + + Use this with `testscenarios` to adapt `Exception_TestCase`_ for + any exceptions that need testing. + + Each scenario is a tuple (`name`, `map`) where `map` is a mapping + of attributes to be applied to each test case. Attributes map must + contain items for: + + :key exc_type: + The exception type to be tested. + :key min_args: + The minimum argument count for the exception instance + initialiser. + :key types: + Sequence of types that should be superclasses of each + instance of the exception type. + + """ + updated_scenarios = deepcopy(scenarios) + for (name, scenario) in updated_scenarios: + args = (None,) * scenario['min_args'] + scenario['args'] = args + instance = scenario['exc_type'](*args) + scenario['instance'] = instance + + return updated_scenarios + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : diff --git a/scripts/external_libs/python-daemon-2.0.5/test/test_daemon.py b/scripts/external_libs/python-daemon-2.0.5/test/test_daemon.py new file mode 100644 index 00000000..a911858a --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/test_daemon.py @@ -0,0 +1,1744 @@ +# -*- coding: utf-8 -*- +# +# test/test_daemon.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Unit test for ‘daemon’ module. + """ + +from __future__ import (absolute_import, unicode_literals) + +import os +import sys +import tempfile +import resource +import errno +import signal +import socket +from types import ModuleType +import collections +import functools +try: + # Standard library of Python 2.7 and later. + from io import StringIO +except ImportError: + # Standard library of Python 2.6 and earlier. + from StringIO import StringIO + +import mock + +from . import scaffold +from .scaffold import (basestring, unicode) +from .test_pidfile import ( + FakeFileDescriptorStringIO, + setup_pidfile_fixtures, + ) + +import daemon + + +class ModuleExceptions_TestCase(scaffold.Exception_TestCase): + """ Test cases for module exception classes. """ + + scenarios = scaffold.make_exception_scenarios([ + ('daemon.daemon.DaemonError', dict( + exc_type = daemon.daemon.DaemonError, + min_args = 1, + types = [Exception], + )), + ('daemon.daemon.DaemonOSEnvironmentError', dict( + exc_type = daemon.daemon.DaemonOSEnvironmentError, + min_args = 1, + types = [daemon.daemon.DaemonError, OSError], + )), + ('daemon.daemon.DaemonProcessDetachError', dict( + exc_type = daemon.daemon.DaemonProcessDetachError, + min_args = 1, + types = [daemon.daemon.DaemonError, OSError], + )), + ]) + + +def setup_daemon_context_fixtures(testcase): + """ Set up common test fixtures for DaemonContext test case. + + :param testcase: A ``TestCase`` instance to decorate. + :return: ``None``. + + Decorate the `testcase` with fixtures for tests involving + `DaemonContext`. + + """ + setup_streams_fixtures(testcase) + + setup_pidfile_fixtures(testcase) + + testcase.fake_pidfile_path = tempfile.mktemp() + testcase.mock_pidlockfile = mock.MagicMock() + testcase.mock_pidlockfile.path = testcase.fake_pidfile_path + + testcase.daemon_context_args = dict( + stdin=testcase.stream_files_by_name['stdin'], + stdout=testcase.stream_files_by_name['stdout'], + stderr=testcase.stream_files_by_name['stderr'], + ) + testcase.test_instance = daemon.DaemonContext( + **testcase.daemon_context_args) + +fake_default_signal_map = object() + +@mock.patch.object( + daemon.daemon, "is_detach_process_context_required", + new=(lambda: True)) +@mock.patch.object( + daemon.daemon, "make_default_signal_map", + new=(lambda: fake_default_signal_map)) +@mock.patch.object(os, "setgid", new=(lambda x: object())) +@mock.patch.object(os, "setuid", new=(lambda x: object())) +class DaemonContext_BaseTestCase(scaffold.TestCase): + """ Base class for DaemonContext test case classes. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_BaseTestCase, self).setUp() + + setup_daemon_context_fixtures(self) + + +class DaemonContext_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext class. """ + + def test_instantiate(self): + """ New instance of DaemonContext should be created. """ + self.assertIsInstance( + self.test_instance, daemon.daemon.DaemonContext) + + def test_minimum_zero_arguments(self): + """ Initialiser should not require any arguments. """ + instance = daemon.daemon.DaemonContext() + self.assertIsNot(instance, None) + + def test_has_specified_chroot_directory(self): + """ Should have specified chroot_directory option. """ + args = dict( + chroot_directory=object(), + ) + expected_directory = args['chroot_directory'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_directory, instance.chroot_directory) + + def test_has_specified_working_directory(self): + """ Should have specified working_directory option. """ + args = dict( + working_directory=object(), + ) + expected_directory = args['working_directory'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_directory, instance.working_directory) + + def test_has_default_working_directory(self): + """ Should have default working_directory option. """ + args = dict() + expected_directory = "/" + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_directory, instance.working_directory) + + def test_has_specified_creation_mask(self): + """ Should have specified umask option. """ + args = dict( + umask=object(), + ) + expected_mask = args['umask'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_mask, instance.umask) + + def test_has_default_creation_mask(self): + """ Should have default umask option. """ + args = dict() + expected_mask = 0 + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_mask, instance.umask) + + def test_has_specified_uid(self): + """ Should have specified uid option. """ + args = dict( + uid=object(), + ) + expected_id = args['uid'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_id, instance.uid) + + def test_has_derived_uid(self): + """ Should have uid option derived from process. """ + args = dict() + expected_id = os.getuid() + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_id, instance.uid) + + def test_has_specified_gid(self): + """ Should have specified gid option. """ + args = dict( + gid=object(), + ) + expected_id = args['gid'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_id, instance.gid) + + def test_has_derived_gid(self): + """ Should have gid option derived from process. """ + args = dict() + expected_id = os.getgid() + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_id, instance.gid) + + def test_has_specified_detach_process(self): + """ Should have specified detach_process option. """ + args = dict( + detach_process=object(), + ) + expected_value = args['detach_process'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_value, instance.detach_process) + + def test_has_derived_detach_process(self): + """ Should have detach_process option derived from environment. """ + args = dict() + func = daemon.daemon.is_detach_process_context_required + expected_value = func() + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_value, instance.detach_process) + + def test_has_specified_files_preserve(self): + """ Should have specified files_preserve option. """ + args = dict( + files_preserve=object(), + ) + expected_files_preserve = args['files_preserve'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_files_preserve, instance.files_preserve) + + def test_has_specified_pidfile(self): + """ Should have the specified pidfile. """ + args = dict( + pidfile=object(), + ) + expected_pidfile = args['pidfile'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_pidfile, instance.pidfile) + + def test_has_specified_stdin(self): + """ Should have specified stdin option. """ + args = dict( + stdin=object(), + ) + expected_file = args['stdin'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_file, instance.stdin) + + def test_has_specified_stdout(self): + """ Should have specified stdout option. """ + args = dict( + stdout=object(), + ) + expected_file = args['stdout'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_file, instance.stdout) + + def test_has_specified_stderr(self): + """ Should have specified stderr option. """ + args = dict( + stderr=object(), + ) + expected_file = args['stderr'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_file, instance.stderr) + + def test_has_specified_signal_map(self): + """ Should have specified signal_map option. """ + args = dict( + signal_map=object(), + ) + expected_signal_map = args['signal_map'] + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_signal_map, instance.signal_map) + + def test_has_derived_signal_map(self): + """ Should have signal_map option derived from system. """ + args = dict() + expected_signal_map = daemon.daemon.make_default_signal_map() + instance = daemon.daemon.DaemonContext(**args) + self.assertEqual(expected_signal_map, instance.signal_map) + + +class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.is_open property. """ + + def test_begin_false(self): + """ Initial value of is_open should be False. """ + instance = self.test_instance + self.assertEqual(False, instance.is_open) + + def test_write_fails(self): + """ Writing to is_open should fail. """ + instance = self.test_instance + self.assertRaises( + AttributeError, + setattr, instance, 'is_open', object()) + + +class DaemonContext_open_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.open method. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_open_TestCase, self).setUp() + + self.test_instance._is_open = False + + self.mock_module_daemon = mock.MagicMock() + daemon_func_patchers = dict( + (func_name, mock.patch.object( + daemon.daemon, func_name)) + for func_name in [ + "detach_process_context", + "change_working_directory", + "change_root_directory", + "change_file_creation_mask", + "change_process_owner", + "prevent_core_dump", + "close_all_open_files", + "redirect_stream", + "set_signal_handlers", + "register_atexit_function", + ]) + for (func_name, patcher) in daemon_func_patchers.items(): + mock_func = patcher.start() + self.addCleanup(patcher.stop) + self.mock_module_daemon.attach_mock(mock_func, func_name) + + self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext') + + self.test_files_preserve_fds = object() + self.test_signal_handler_map = object() + daemoncontext_method_return_values = { + '_get_exclude_file_descriptors': + self.test_files_preserve_fds, + '_make_signal_handler_map': + self.test_signal_handler_map, + } + daemoncontext_func_patchers = dict( + (func_name, mock.patch.object( + daemon.daemon.DaemonContext, + func_name, + return_value=return_value)) + for (func_name, return_value) in + daemoncontext_method_return_values.items()) + for (func_name, patcher) in daemoncontext_func_patchers.items(): + mock_func = patcher.start() + self.addCleanup(patcher.stop) + self.mock_module_daemon.DaemonContext.attach_mock( + mock_func, func_name) + + def test_performs_steps_in_expected_sequence(self): + """ Should perform daemonisation steps in expected sequence. """ + instance = self.test_instance + instance.chroot_directory = object() + instance.detach_process = True + instance.pidfile = self.mock_pidlockfile + self.mock_module_daemon.attach_mock( + self.mock_pidlockfile, 'pidlockfile') + expected_calls = [ + mock.call.change_root_directory(mock.ANY), + mock.call.prevent_core_dump(), + mock.call.change_file_creation_mask(mock.ANY), + mock.call.change_working_directory(mock.ANY), + mock.call.change_process_owner(mock.ANY, mock.ANY), + mock.call.detach_process_context(), + mock.call.DaemonContext._make_signal_handler_map(), + mock.call.set_signal_handlers(mock.ANY), + mock.call.DaemonContext._get_exclude_file_descriptors(), + mock.call.close_all_open_files(exclude=mock.ANY), + mock.call.redirect_stream(mock.ANY, mock.ANY), + mock.call.redirect_stream(mock.ANY, mock.ANY), + mock.call.redirect_stream(mock.ANY, mock.ANY), + mock.call.pidlockfile.__enter__(), + mock.call.register_atexit_function(mock.ANY), + ] + instance.open() + self.mock_module_daemon.assert_has_calls(expected_calls) + + def test_returns_immediately_if_is_open(self): + """ Should return immediately if is_open property is true. """ + instance = self.test_instance + instance._is_open = True + instance.open() + self.assertEqual(0, len(self.mock_module_daemon.mock_calls)) + + def test_changes_root_directory_to_chroot_directory(self): + """ Should change root directory to `chroot_directory` option. """ + instance = self.test_instance + chroot_directory = object() + instance.chroot_directory = chroot_directory + instance.open() + self.mock_module_daemon.change_root_directory.assert_called_with( + chroot_directory) + + def test_omits_chroot_if_no_chroot_directory(self): + """ Should omit changing root directory if no `chroot_directory`. """ + instance = self.test_instance + instance.chroot_directory = None + instance.open() + self.assertFalse(self.mock_module_daemon.change_root_directory.called) + + def test_prevents_core_dump(self): + """ Should request prevention of core dumps. """ + instance = self.test_instance + instance.open() + self.mock_module_daemon.prevent_core_dump.assert_called_with() + + def test_omits_prevent_core_dump_if_prevent_core_false(self): + """ Should omit preventing core dumps if `prevent_core` is false. """ + instance = self.test_instance + instance.prevent_core = False + instance.open() + self.assertFalse(self.mock_module_daemon.prevent_core_dump.called) + + def test_closes_open_files(self): + """ Should close all open files, excluding `files_preserve`. """ + instance = self.test_instance + expected_exclude = self.test_files_preserve_fds + instance.open() + self.mock_module_daemon.close_all_open_files.assert_called_with( + exclude=expected_exclude) + + def test_changes_directory_to_working_directory(self): + """ Should change current directory to `working_directory` option. """ + instance = self.test_instance + working_directory = object() + instance.working_directory = working_directory + instance.open() + self.mock_module_daemon.change_working_directory.assert_called_with( + working_directory) + + def test_changes_creation_mask_to_umask(self): + """ Should change file creation mask to `umask` option. """ + instance = self.test_instance + umask = object() + instance.umask = umask + instance.open() + self.mock_module_daemon.change_file_creation_mask.assert_called_with( + umask) + + def test_changes_owner_to_specified_uid_and_gid(self): + """ Should change process UID and GID to `uid` and `gid` options. """ + instance = self.test_instance + uid = object() + gid = object() + instance.uid = uid + instance.gid = gid + instance.open() + self.mock_module_daemon.change_process_owner.assert_called_with( + uid, gid) + + def test_detaches_process_context(self): + """ Should request detach of process context. """ + instance = self.test_instance + instance.open() + self.mock_module_daemon.detach_process_context.assert_called_with() + + def test_omits_process_detach_if_not_required(self): + """ Should omit detach of process context if not required. """ + instance = self.test_instance + instance.detach_process = False + instance.open() + self.assertFalse(self.mock_module_daemon.detach_process_context.called) + + def test_sets_signal_handlers_from_signal_map(self): + """ Should set signal handlers according to `signal_map`. """ + instance = self.test_instance + instance.signal_map = object() + expected_signal_handler_map = self.test_signal_handler_map + instance.open() + self.mock_module_daemon.set_signal_handlers.assert_called_with( + expected_signal_handler_map) + + def test_redirects_standard_streams(self): + """ Should request redirection of standard stream files. """ + instance = self.test_instance + (system_stdin, system_stdout, system_stderr) = ( + sys.stdin, sys.stdout, sys.stderr) + (target_stdin, target_stdout, target_stderr) = ( + self.stream_files_by_name[name] + for name in ['stdin', 'stdout', 'stderr']) + expected_calls = [ + mock.call(system_stdin, target_stdin), + mock.call(system_stdout, target_stdout), + mock.call(system_stderr, target_stderr), + ] + instance.open() + self.mock_module_daemon.redirect_stream.assert_has_calls( + expected_calls, any_order=True) + + def test_enters_pidfile_context(self): + """ Should enter the PID file context manager. """ + instance = self.test_instance + instance.pidfile = self.mock_pidlockfile + instance.open() + self.mock_pidlockfile.__enter__.assert_called_with() + + def test_sets_is_open_true(self): + """ Should set the `is_open` property to True. """ + instance = self.test_instance + instance.open() + self.assertEqual(True, instance.is_open) + + def test_registers_close_method_for_atexit(self): + """ Should register the `close` method for atexit processing. """ + instance = self.test_instance + close_method = instance.close + instance.open() + self.mock_module_daemon.register_atexit_function.assert_called_with( + close_method) + + +class DaemonContext_close_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.close method. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_close_TestCase, self).setUp() + + self.test_instance._is_open = True + + def test_returns_immediately_if_not_is_open(self): + """ Should return immediately if is_open property is false. """ + instance = self.test_instance + instance._is_open = False + instance.pidfile = object() + instance.close() + self.assertFalse(self.mock_pidlockfile.__exit__.called) + + def test_exits_pidfile_context(self): + """ Should exit the PID file context manager. """ + instance = self.test_instance + instance.pidfile = self.mock_pidlockfile + instance.close() + self.mock_pidlockfile.__exit__.assert_called_with(None, None, None) + + def test_returns_none(self): + """ Should return None. """ + instance = self.test_instance + expected_result = None + result = instance.close() + self.assertIs(result, expected_result) + + def test_sets_is_open_false(self): + """ Should set the `is_open` property to False. """ + instance = self.test_instance + instance.close() + self.assertEqual(False, instance.is_open) + + +@mock.patch.object(daemon.daemon.DaemonContext, "open") +class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.__enter__ method. """ + + def test_opens_daemon_context(self, mock_func_daemoncontext_open): + """ Should open the DaemonContext. """ + instance = self.test_instance + instance.__enter__() + mock_func_daemoncontext_open.assert_called_with() + + def test_returns_self_instance(self, mock_func_daemoncontext_open): + """ Should return DaemonContext instance. """ + instance = self.test_instance + expected_result = instance + result = instance.__enter__() + self.assertIs(result, expected_result) + + +@mock.patch.object(daemon.daemon.DaemonContext, "close") +class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.__exit__ method. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_context_manager_exit_TestCase, self).setUp() + + self.test_args = dict( + exc_type=object(), + exc_value=object(), + traceback=object(), + ) + + def test_closes_daemon_context(self, mock_func_daemoncontext_close): + """ Should close the DaemonContext. """ + instance = self.test_instance + args = self.test_args + instance.__exit__(**args) + mock_func_daemoncontext_close.assert_called_with() + + def test_returns_none(self, mock_func_daemoncontext_close): + """ Should return None, indicating exception was not handled. """ + instance = self.test_instance + args = self.test_args + expected_result = None + result = instance.__exit__(**args) + self.assertIs(result, expected_result) + + +class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext.terminate method. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_terminate_TestCase, self).setUp() + + self.test_signal = signal.SIGTERM + self.test_frame = None + self.test_args = (self.test_signal, self.test_frame) + + def test_raises_system_exit(self): + """ Should raise SystemExit. """ + instance = self.test_instance + args = self.test_args + expected_exception = SystemExit + self.assertRaises( + expected_exception, + instance.terminate, *args) + + def test_exception_message_contains_signal_number(self): + """ Should raise exception with a message containing signal number. """ + instance = self.test_instance + args = self.test_args + signal_number = self.test_signal + expected_exception = SystemExit + exc = self.assertRaises( + expected_exception, + instance.terminate, *args) + self.assertIn(unicode(signal_number), unicode(exc)) + + +class DaemonContext_get_exclude_file_descriptors_TestCase( + DaemonContext_BaseTestCase): + """ Test cases for DaemonContext._get_exclude_file_descriptors function. """ + + def setUp(self): + """ Set up test fixtures. """ + super( + DaemonContext_get_exclude_file_descriptors_TestCase, + self).setUp() + + self.test_files = { + 2: FakeFileDescriptorStringIO(), + 5: 5, + 11: FakeFileDescriptorStringIO(), + 17: None, + 23: FakeFileDescriptorStringIO(), + 37: 37, + 42: FakeFileDescriptorStringIO(), + } + for (fileno, item) in self.test_files.items(): + if hasattr(item, '_fileno'): + item._fileno = fileno + self.test_file_descriptors = set( + fd for (fd, item) in self.test_files.items() + if item is not None) + self.test_file_descriptors.update( + self.stream_files_by_name[name].fileno() + for name in ['stdin', 'stdout', 'stderr'] + ) + + def test_returns_expected_file_descriptors(self): + """ Should return expected set of file descriptors. """ + instance = self.test_instance + instance.files_preserve = list(self.test_files.values()) + expected_result = self.test_file_descriptors + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + def test_returns_stream_redirects_if_no_files_preserve(self): + """ Should return only stream redirects if no files_preserve. """ + instance = self.test_instance + instance.files_preserve = None + expected_result = set( + stream.fileno() + for stream in self.stream_files_by_name.values()) + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + def test_returns_empty_set_if_no_files(self): + """ Should return empty set if no file options. """ + instance = self.test_instance + for name in ['files_preserve', 'stdin', 'stdout', 'stderr']: + setattr(instance, name, None) + expected_result = set() + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + def test_omits_non_file_streams(self): + """ Should omit non-file stream attributes. """ + instance = self.test_instance + instance.files_preserve = list(self.test_files.values()) + stream_files = self.stream_files_by_name + expected_result = self.test_file_descriptors.copy() + for (pseudo_stream_name, pseudo_stream) in stream_files.items(): + test_non_file_object = object() + setattr(instance, pseudo_stream_name, test_non_file_object) + stream_fd = pseudo_stream.fileno() + expected_result.discard(stream_fd) + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + def test_includes_verbatim_streams_without_file_descriptor(self): + """ Should include verbatim any stream without a file descriptor. """ + instance = self.test_instance + instance.files_preserve = list(self.test_files.values()) + stream_files = self.stream_files_by_name + mock_fileno_method = mock.MagicMock( + spec=sys.__stdin__.fileno, + side_effect=ValueError) + expected_result = self.test_file_descriptors.copy() + for (pseudo_stream_name, pseudo_stream) in stream_files.items(): + test_non_fd_stream = StringIO() + if not hasattr(test_non_fd_stream, 'fileno'): + # Python < 3 StringIO doesn't have ‘fileno’ at all. + # Add a method which raises an exception. + test_non_fd_stream.fileno = mock_fileno_method + setattr(instance, pseudo_stream_name, test_non_fd_stream) + stream_fd = pseudo_stream.fileno() + expected_result.discard(stream_fd) + expected_result.add(test_non_fd_stream) + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + def test_omits_none_streams(self): + """ Should omit any stream attribute which is None. """ + instance = self.test_instance + instance.files_preserve = list(self.test_files.values()) + stream_files = self.stream_files_by_name + expected_result = self.test_file_descriptors.copy() + for (pseudo_stream_name, pseudo_stream) in stream_files.items(): + setattr(instance, pseudo_stream_name, None) + stream_fd = pseudo_stream.fileno() + expected_result.discard(stream_fd) + result = instance._get_exclude_file_descriptors() + self.assertEqual(expected_result, result) + + +class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase): + """ Test cases for DaemonContext._make_signal_handler function. """ + + def test_returns_ignore_for_none(self): + """ Should return SIG_IGN when None handler specified. """ + instance = self.test_instance + target = None + expected_result = signal.SIG_IGN + result = instance._make_signal_handler(target) + self.assertEqual(expected_result, result) + + def test_returns_method_for_name(self): + """ Should return method of DaemonContext when name specified. """ + instance = self.test_instance + target = 'terminate' + expected_result = instance.terminate + result = instance._make_signal_handler(target) + self.assertEqual(expected_result, result) + + def test_raises_error_for_unknown_name(self): + """ Should raise AttributeError for unknown method name. """ + instance = self.test_instance + target = 'b0gUs' + expected_error = AttributeError + self.assertRaises( + expected_error, + instance._make_signal_handler, target) + + def test_returns_object_for_object(self): + """ Should return same object for any other object. """ + instance = self.test_instance + target = object() + expected_result = target + result = instance._make_signal_handler(target) + self.assertEqual(expected_result, result) + + +class DaemonContext_make_signal_handler_map_TestCase( + DaemonContext_BaseTestCase): + """ Test cases for DaemonContext._make_signal_handler_map function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonContext_make_signal_handler_map_TestCase, self).setUp() + + self.test_instance.signal_map = { + object(): object(), + object(): object(), + object(): object(), + } + + self.test_signal_handlers = dict( + (key, object()) + for key in self.test_instance.signal_map.values()) + self.test_signal_handler_map = dict( + (key, self.test_signal_handlers[target]) + for (key, target) in self.test_instance.signal_map.items()) + + def fake_make_signal_handler(target): + return self.test_signal_handlers[target] + + func_patcher_make_signal_handler = mock.patch.object( + daemon.daemon.DaemonContext, "_make_signal_handler", + side_effect=fake_make_signal_handler) + self.mock_func_make_signal_handler = ( + func_patcher_make_signal_handler.start()) + self.addCleanup(func_patcher_make_signal_handler.stop) + + def test_returns_constructed_signal_handler_items(self): + """ Should return items as constructed via make_signal_handler. """ + instance = self.test_instance + expected_result = self.test_signal_handler_map + result = instance._make_signal_handler_map() + self.assertEqual(expected_result, result) + + +try: + FileNotFoundError +except NameError: + # Python 2 uses IOError. + FileNotFoundError = functools.partial(IOError, errno.ENOENT) + + +@mock.patch.object(os, "chdir") +class change_working_directory_TestCase(scaffold.TestCase): + """ Test cases for change_working_directory function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(change_working_directory_TestCase, self).setUp() + + self.test_directory = object() + self.test_args = dict( + directory=self.test_directory, + ) + + def test_changes_working_directory_to_specified_directory( + self, + mock_func_os_chdir): + """ Should change working directory to specified directory. """ + args = self.test_args + directory = self.test_directory + daemon.daemon.change_working_directory(**args) + mock_func_os_chdir.assert_called_with(directory) + + def test_raises_daemon_error_on_os_error( + self, + mock_func_os_chdir): + """ Should raise a DaemonError on receiving an IOError. """ + args = self.test_args + test_error = FileNotFoundError("No such directory") + mock_func_os_chdir.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_working_directory, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_error_message_contains_original_error_message( + self, + mock_func_os_chdir): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = FileNotFoundError("No such directory") + mock_func_os_chdir.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_working_directory, **args) + self.assertIn(unicode(test_error), unicode(exc)) + + +@mock.patch.object(os, "chroot") +@mock.patch.object(os, "chdir") +class change_root_directory_TestCase(scaffold.TestCase): + """ Test cases for change_root_directory function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(change_root_directory_TestCase, self).setUp() + + self.test_directory = object() + self.test_args = dict( + directory=self.test_directory, + ) + + def test_changes_working_directory_to_specified_directory( + self, + mock_func_os_chdir, mock_func_os_chroot): + """ Should change working directory to specified directory. """ + args = self.test_args + directory = self.test_directory + daemon.daemon.change_root_directory(**args) + mock_func_os_chdir.assert_called_with(directory) + + def test_changes_root_directory_to_specified_directory( + self, + mock_func_os_chdir, mock_func_os_chroot): + """ Should change root directory to specified directory. """ + args = self.test_args + directory = self.test_directory + daemon.daemon.change_root_directory(**args) + mock_func_os_chroot.assert_called_with(directory) + + def test_raises_daemon_error_on_os_error_from_chdir( + self, + mock_func_os_chdir, mock_func_os_chroot): + """ Should raise a DaemonError on receiving an IOError from chdir. """ + args = self.test_args + test_error = FileNotFoundError("No such directory") + mock_func_os_chdir.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_root_directory, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_raises_daemon_error_on_os_error_from_chroot( + self, + mock_func_os_chdir, mock_func_os_chroot): + """ Should raise a DaemonError on receiving an OSError from chroot. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No chroot for you!") + mock_func_os_chroot.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_root_directory, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_error_message_contains_original_error_message( + self, + mock_func_os_chdir, mock_func_os_chroot): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = FileNotFoundError("No such directory") + mock_func_os_chdir.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_root_directory, **args) + self.assertIn(unicode(test_error), unicode(exc)) + + +@mock.patch.object(os, "umask") +class change_file_creation_mask_TestCase(scaffold.TestCase): + """ Test cases for change_file_creation_mask function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(change_file_creation_mask_TestCase, self).setUp() + + self.test_mask = object() + self.test_args = dict( + mask=self.test_mask, + ) + + def test_changes_umask_to_specified_mask(self, mock_func_os_umask): + """ Should change working directory to specified directory. """ + args = self.test_args + mask = self.test_mask + daemon.daemon.change_file_creation_mask(**args) + mock_func_os_umask.assert_called_with(mask) + + def test_raises_daemon_error_on_os_error_from_chdir( + self, + mock_func_os_umask): + """ Should raise a DaemonError on receiving an OSError from umask. """ + args = self.test_args + test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") + mock_func_os_umask.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_file_creation_mask, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_error_message_contains_original_error_message( + self, + mock_func_os_umask): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = FileNotFoundError("No such directory") + mock_func_os_umask.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_file_creation_mask, **args) + self.assertIn(unicode(test_error), unicode(exc)) + + +@mock.patch.object(os, "setgid") +@mock.patch.object(os, "setuid") +class change_process_owner_TestCase(scaffold.TestCase): + """ Test cases for change_process_owner function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(change_process_owner_TestCase, self).setUp() + + self.test_uid = object() + self.test_gid = object() + self.test_args = dict( + uid=self.test_uid, + gid=self.test_gid, + ) + + def test_changes_gid_and_uid_in_order( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should change process GID and UID in correct order. + + Since the process requires appropriate privilege to use + either of `setuid` or `setgid`, changing the UID must be + done last. + + """ + args = self.test_args + daemon.daemon.change_process_owner(**args) + mock_func_os_setuid.assert_called() + mock_func_os_setgid.assert_called() + + def test_changes_group_id_to_gid( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should change process GID to specified value. """ + args = self.test_args + gid = self.test_gid + daemon.daemon.change_process_owner(**args) + mock_func_os_setgid.assert_called(gid) + + def test_changes_user_id_to_uid( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should change process UID to specified value. """ + args = self.test_args + uid = self.test_uid + daemon.daemon.change_process_owner(**args) + mock_func_os_setuid.assert_called(uid) + + def test_raises_daemon_error_on_os_error_from_setgid( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should raise a DaemonError on receiving an OSError from setgid. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No switching for you!") + mock_func_os_setgid.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_process_owner, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_raises_daemon_error_on_os_error_from_setuid( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should raise a DaemonError on receiving an OSError from setuid. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No switching for you!") + mock_func_os_setuid.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_process_owner, **args) + self.assertEqual(test_error, exc.__cause__) + + def test_error_message_contains_original_error_message( + self, + mock_func_os_setuid, mock_func_os_setgid): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") + mock_func_os_setuid.side_effect = test_error + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.change_process_owner, **args) + self.assertIn(unicode(test_error), unicode(exc)) + + +RLimitResult = collections.namedtuple('RLimitResult', ['soft', 'hard']) + +fake_RLIMIT_CORE = object() + +@mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE) +@mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None)) +@mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None)) +class prevent_core_dump_TestCase(scaffold.TestCase): + """ Test cases for prevent_core_dump function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(prevent_core_dump_TestCase, self).setUp() + + def test_sets_core_limit_to_zero( + self, + mock_func_resource_getrlimit, mock_func_resource_setrlimit): + """ Should set the RLIMIT_CORE resource to zero. """ + expected_resource = fake_RLIMIT_CORE + expected_limit = tuple(RLimitResult(soft=0, hard=0)) + daemon.daemon.prevent_core_dump() + mock_func_resource_getrlimit.assert_called_with(expected_resource) + mock_func_resource_setrlimit.assert_called_with( + expected_resource, expected_limit) + + def test_raises_error_when_no_core_resource( + self, + mock_func_resource_getrlimit, mock_func_resource_setrlimit): + """ Should raise DaemonError if no RLIMIT_CORE resource. """ + test_error = ValueError("Bogus platform doesn't have RLIMIT_CORE") + def fake_getrlimit(res): + if res == resource.RLIMIT_CORE: + raise test_error + else: + return None + mock_func_resource_getrlimit.side_effect = fake_getrlimit + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.prevent_core_dump) + self.assertEqual(test_error, exc.__cause__) + + +@mock.patch.object(os, "close") +class close_file_descriptor_if_open_TestCase(scaffold.TestCase): + """ Test cases for close_file_descriptor_if_open function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(close_file_descriptor_if_open_TestCase, self).setUp() + + self.fake_fd = 274 + + def test_requests_file_descriptor_close(self, mock_func_os_close): + """ Should request close of file descriptor. """ + fd = self.fake_fd + daemon.daemon.close_file_descriptor_if_open(fd) + mock_func_os_close.assert_called_with(fd) + + def test_ignores_badfd_error_on_close(self, mock_func_os_close): + """ Should ignore OSError EBADF when closing. """ + fd = self.fake_fd + test_error = OSError(errno.EBADF, "Bad file descriptor") + def fake_os_close(fd): + raise test_error + mock_func_os_close.side_effect = fake_os_close + daemon.daemon.close_file_descriptor_if_open(fd) + mock_func_os_close.assert_called_with(fd) + + def test_raises_error_if_oserror_on_close(self, mock_func_os_close): + """ Should raise DaemonError if an OSError occurs when closing. """ + fd = self.fake_fd + test_error = OSError(object(), "Unexpected error") + def fake_os_close(fd): + raise test_error + mock_func_os_close.side_effect = fake_os_close + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.close_file_descriptor_if_open, fd) + self.assertEqual(test_error, exc.__cause__) + + def test_raises_error_if_ioerror_on_close(self, mock_func_os_close): + """ Should raise DaemonError if an IOError occurs when closing. """ + fd = self.fake_fd + test_error = IOError(object(), "Unexpected error") + def fake_os_close(fd): + raise test_error + mock_func_os_close.side_effect = fake_os_close + expected_error = daemon.daemon.DaemonOSEnvironmentError + exc = self.assertRaises( + expected_error, + daemon.daemon.close_file_descriptor_if_open, fd) + self.assertEqual(test_error, exc.__cause__) + + +class maxfd_TestCase(scaffold.TestCase): + """ Test cases for module MAXFD constant. """ + + def test_positive(self): + """ Should be a positive number. """ + maxfd = daemon.daemon.MAXFD + self.assertTrue(maxfd > 0) + + def test_integer(self): + """ Should be an integer. """ + maxfd = daemon.daemon.MAXFD + self.assertEqual(int(maxfd), maxfd) + + def test_reasonably_high(self): + """ Should be reasonably high for default open files limit. + + If the system reports a limit of “infinity” on maximum + file descriptors, we still need a finite number in order + to close “all” of them. Ensure this is reasonably high + to catch most use cases. + + """ + expected_minimum = 2048 + maxfd = daemon.daemon.MAXFD + self.assertTrue( + expected_minimum <= maxfd, + msg=( + "MAXFD should be at least {minimum!r}" + " (got {maxfd!r})".format( + minimum=expected_minimum, maxfd=maxfd))) + + +fake_default_maxfd = 8 +fake_RLIMIT_NOFILE = object() +fake_RLIM_INFINITY = object() +fake_rlimit_nofile_large = 2468 + +def fake_getrlimit_nofile_soft_infinity(resource): + result = RLimitResult(soft=fake_RLIM_INFINITY, hard=object()) + if resource != fake_RLIMIT_NOFILE: + result = NotImplemented + return result + +def fake_getrlimit_nofile_hard_infinity(resource): + result = RLimitResult(soft=object(), hard=fake_RLIM_INFINITY) + if resource != fake_RLIMIT_NOFILE: + result = NotImplemented + return result + +def fake_getrlimit_nofile_hard_large(resource): + result = RLimitResult(soft=object(), hard=fake_rlimit_nofile_large) + if resource != fake_RLIMIT_NOFILE: + result = NotImplemented + return result + +@mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd) +@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE) +@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY) +@mock.patch.object( + resource, "getrlimit", + side_effect=fake_getrlimit_nofile_hard_large) +class get_maximum_file_descriptors_TestCase(scaffold.TestCase): + """ Test cases for get_maximum_file_descriptors function. """ + + def test_returns_system_hard_limit(self, mock_func_resource_getrlimit): + """ Should return process hard limit on number of files. """ + expected_result = fake_rlimit_nofile_large + result = daemon.daemon.get_maximum_file_descriptors() + self.assertEqual(expected_result, result) + + def test_returns_module_default_if_hard_limit_infinity( + self, mock_func_resource_getrlimit): + """ Should return module MAXFD if hard limit is infinity. """ + mock_func_resource_getrlimit.side_effect = ( + fake_getrlimit_nofile_hard_infinity) + expected_result = fake_default_maxfd + result = daemon.daemon.get_maximum_file_descriptors() + self.assertEqual(expected_result, result) + + +def fake_get_maximum_file_descriptors(): + return fake_default_maxfd + +@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE) +@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY) +@mock.patch.object( + resource, "getrlimit", + new=fake_getrlimit_nofile_soft_infinity) +@mock.patch.object( + daemon.daemon, "get_maximum_file_descriptors", + new=fake_get_maximum_file_descriptors) +@mock.patch.object(daemon.daemon, "close_file_descriptor_if_open") +class close_all_open_files_TestCase(scaffold.TestCase): + """ Test cases for close_all_open_files function. """ + + def test_requests_all_open_files_to_close( + self, mock_func_close_file_descriptor_if_open): + """ Should request close of all open files. """ + expected_file_descriptors = range(fake_default_maxfd) + expected_calls = [ + mock.call(fd) for fd in expected_file_descriptors] + daemon.daemon.close_all_open_files() + mock_func_close_file_descriptor_if_open.assert_has_calls( + expected_calls, any_order=True) + + def test_requests_all_but_excluded_files_to_close( + self, mock_func_close_file_descriptor_if_open): + """ Should request close of all open files but those excluded. """ + test_exclude = set([3, 7]) + args = dict( + exclude=test_exclude, + ) + expected_file_descriptors = set( + fd for fd in range(fake_default_maxfd) + if fd not in test_exclude) + expected_calls = [ + mock.call(fd) for fd in expected_file_descriptors] + daemon.daemon.close_all_open_files(**args) + mock_func_close_file_descriptor_if_open.assert_has_calls( + expected_calls, any_order=True) + + +class detach_process_context_TestCase(scaffold.TestCase): + """ Test cases for detach_process_context function. """ + + class FakeOSExit(SystemExit): + """ Fake exception raised for os._exit(). """ + + def setUp(self): + """ Set up test fixtures. """ + super(detach_process_context_TestCase, self).setUp() + + self.mock_module_os = mock.MagicMock(wraps=os) + + fake_pids = [0, 0] + func_patcher_os_fork = mock.patch.object( + os, "fork", + side_effect=iter(fake_pids)) + self.mock_func_os_fork = func_patcher_os_fork.start() + self.addCleanup(func_patcher_os_fork.stop) + self.mock_module_os.attach_mock(self.mock_func_os_fork, "fork") + + func_patcher_os_setsid = mock.patch.object(os, "setsid") + self.mock_func_os_setsid = func_patcher_os_setsid.start() + self.addCleanup(func_patcher_os_setsid.stop) + self.mock_module_os.attach_mock(self.mock_func_os_setsid, "setsid") + + def raise_os_exit(status=None): + raise self.FakeOSExit(status) + + func_patcher_os_force_exit = mock.patch.object( + os, "_exit", + side_effect=raise_os_exit) + self.mock_func_os_force_exit = func_patcher_os_force_exit.start() + self.addCleanup(func_patcher_os_force_exit.stop) + self.mock_module_os.attach_mock(self.mock_func_os_force_exit, "_exit") + + def test_parent_exits(self): + """ Parent process should exit. """ + parent_pid = 23 + self.mock_func_os_fork.side_effect = iter([parent_pid]) + self.assertRaises( + self.FakeOSExit, + daemon.daemon.detach_process_context) + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + mock.call._exit(0), + ]) + + def test_first_fork_error_raises_error(self): + """ Error on first fork should raise DaemonProcessDetachError. """ + fork_errno = 13 + fork_strerror = "Bad stuff happened" + test_error = OSError(fork_errno, fork_strerror) + test_pids_iter = iter([test_error]) + + def fake_fork(): + next_item = next(test_pids_iter) + if isinstance(next_item, Exception): + raise next_item + else: + return next_item + + self.mock_func_os_fork.side_effect = fake_fork + exc = self.assertRaises( + daemon.daemon.DaemonProcessDetachError, + daemon.daemon.detach_process_context) + self.assertEqual(test_error, exc.__cause__) + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + ]) + + def test_child_starts_new_process_group(self): + """ Child should start new process group. """ + daemon.daemon.detach_process_context() + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + mock.call.setsid(), + ]) + + def test_child_forks_next_parent_exits(self): + """ Child should fork, then exit if parent. """ + fake_pids = [0, 42] + self.mock_func_os_fork.side_effect = iter(fake_pids) + self.assertRaises( + self.FakeOSExit, + daemon.daemon.detach_process_context) + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + mock.call.setsid(), + mock.call.fork(), + mock.call._exit(0), + ]) + + def test_second_fork_error_reports_to_stderr(self): + """ Error on second fork should cause report to stderr. """ + fork_errno = 17 + fork_strerror = "Nasty stuff happened" + test_error = OSError(fork_errno, fork_strerror) + test_pids_iter = iter([0, test_error]) + + def fake_fork(): + next_item = next(test_pids_iter) + if isinstance(next_item, Exception): + raise next_item + else: + return next_item + + self.mock_func_os_fork.side_effect = fake_fork + exc = self.assertRaises( + daemon.daemon.DaemonProcessDetachError, + daemon.daemon.detach_process_context) + self.assertEqual(test_error, exc.__cause__) + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + mock.call.setsid(), + mock.call.fork(), + ]) + + def test_child_forks_next_child_continues(self): + """ Child should fork, then continue if child. """ + daemon.daemon.detach_process_context() + self.mock_module_os.assert_has_calls([ + mock.call.fork(), + mock.call.setsid(), + mock.call.fork(), + ]) + + +@mock.patch("os.getppid", return_value=765) +class is_process_started_by_init_TestCase(scaffold.TestCase): + """ Test cases for is_process_started_by_init function. """ + + def test_returns_false_by_default(self, mock_func_os_getppid): + """ Should return False under normal circumstances. """ + expected_result = False + result = daemon.daemon.is_process_started_by_init() + self.assertIs(result, expected_result) + + def test_returns_true_if_parent_process_is_init( + self, mock_func_os_getppid): + """ Should return True if parent process is `init`. """ + init_pid = 1 + mock_func_os_getppid.return_value = init_pid + expected_result = True + result = daemon.daemon.is_process_started_by_init() + self.assertIs(result, expected_result) + + +class is_socket_TestCase(scaffold.TestCase): + """ Test cases for is_socket function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(is_socket_TestCase, self).setUp() + + def fake_getsockopt(level, optname, buflen=None): + result = object() + if optname is socket.SO_TYPE: + result = socket.SOCK_RAW + return result + + self.fake_socket_getsockopt_func = fake_getsockopt + + self.fake_socket_error = socket.error( + errno.ENOTSOCK, + "Socket operation on non-socket") + + self.mock_socket = mock.MagicMock(spec=socket.socket) + self.mock_socket.getsockopt.side_effect = self.fake_socket_error + + def fake_socket_fromfd(fd, family, type, proto=None): + return self.mock_socket + + func_patcher_socket_fromfd = mock.patch.object( + socket, "fromfd", + side_effect=fake_socket_fromfd) + func_patcher_socket_fromfd.start() + self.addCleanup(func_patcher_socket_fromfd.stop) + + def test_returns_false_by_default(self): + """ Should return False under normal circumstances. """ + test_fd = 23 + expected_result = False + result = daemon.daemon.is_socket(test_fd) + self.assertIs(result, expected_result) + + def test_returns_true_if_stdin_is_socket(self): + """ Should return True if `stdin` is a socket. """ + test_fd = 23 + getsockopt = self.mock_socket.getsockopt + getsockopt.side_effect = self.fake_socket_getsockopt_func + expected_result = True + result = daemon.daemon.is_socket(test_fd) + self.assertIs(result, expected_result) + + def test_returns_false_if_stdin_socket_raises_error(self): + """ Should return True if `stdin` is a socket and raises error. """ + test_fd = 23 + getsockopt = self.mock_socket.getsockopt + getsockopt.side_effect = socket.error( + object(), "Weird socket stuff") + expected_result = True + result = daemon.daemon.is_socket(test_fd) + self.assertIs(result, expected_result) + + +class is_process_started_by_superserver_TestCase(scaffold.TestCase): + """ Test cases for is_process_started_by_superserver function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(is_process_started_by_superserver_TestCase, self).setUp() + + def fake_is_socket(fd): + if sys.__stdin__.fileno() == fd: + result = self.fake_stdin_is_socket_func() + else: + result = False + return result + + self.fake_stdin_is_socket_func = (lambda: False) + + func_patcher_is_socket = mock.patch.object( + daemon.daemon, "is_socket", + side_effect=fake_is_socket) + func_patcher_is_socket.start() + self.addCleanup(func_patcher_is_socket.stop) + + def test_returns_false_by_default(self): + """ Should return False under normal circumstances. """ + expected_result = False + result = daemon.daemon.is_process_started_by_superserver() + self.assertIs(result, expected_result) + + def test_returns_true_if_stdin_is_socket(self): + """ Should return True if `stdin` is a socket. """ + self.fake_stdin_is_socket_func = (lambda: True) + expected_result = True + result = daemon.daemon.is_process_started_by_superserver() + self.assertIs(result, expected_result) + + +@mock.patch.object( + daemon.daemon, "is_process_started_by_superserver", + return_value=False) +@mock.patch.object( + daemon.daemon, "is_process_started_by_init", + return_value=False) +class is_detach_process_context_required_TestCase(scaffold.TestCase): + """ Test cases for is_detach_process_context_required function. """ + + def test_returns_true_by_default( + self, + mock_func_is_process_started_by_init, + mock_func_is_process_started_by_superserver): + """ Should return True under normal circumstances. """ + expected_result = True + result = daemon.daemon.is_detach_process_context_required() + self.assertIs(result, expected_result) + + def test_returns_false_if_started_by_init( + self, + mock_func_is_process_started_by_init, + mock_func_is_process_started_by_superserver): + """ Should return False if current process started by init. """ + mock_func_is_process_started_by_init.return_value = True + expected_result = False + result = daemon.daemon.is_detach_process_context_required() + self.assertIs(result, expected_result) + + def test_returns_true_if_started_by_superserver( + self, + mock_func_is_process_started_by_init, + mock_func_is_process_started_by_superserver): + """ Should return False if current process started by superserver. """ + mock_func_is_process_started_by_superserver.return_value = True + expected_result = False + result = daemon.daemon.is_detach_process_context_required() + self.assertIs(result, expected_result) + + +def setup_streams_fixtures(testcase): + """ Set up common test fixtures for standard streams. """ + testcase.stream_file_paths = dict( + stdin=tempfile.mktemp(), + stdout=tempfile.mktemp(), + stderr=tempfile.mktemp(), + ) + + testcase.stream_files_by_name = dict( + (name, FakeFileDescriptorStringIO()) + for name in ['stdin', 'stdout', 'stderr'] + ) + + testcase.stream_files_by_path = dict( + (testcase.stream_file_paths[name], + testcase.stream_files_by_name[name]) + for name in ['stdin', 'stdout', 'stderr'] + ) + + +@mock.patch.object(os, "dup2") +class redirect_stream_TestCase(scaffold.TestCase): + """ Test cases for redirect_stream function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(redirect_stream_TestCase, self).setUp() + + self.test_system_stream = FakeFileDescriptorStringIO() + self.test_target_stream = FakeFileDescriptorStringIO() + self.test_null_file = FakeFileDescriptorStringIO() + + def fake_os_open(path, flag, mode=None): + if path == os.devnull: + result = self.test_null_file.fileno() + else: + raise FileNotFoundError("No such file", path) + return result + + func_patcher_os_open = mock.patch.object( + os, "open", + side_effect=fake_os_open) + self.mock_func_os_open = func_patcher_os_open.start() + self.addCleanup(func_patcher_os_open.stop) + + def test_duplicates_target_file_descriptor( + self, mock_func_os_dup2): + """ Should duplicate file descriptor from target to system stream. """ + system_stream = self.test_system_stream + system_fileno = system_stream.fileno() + target_stream = self.test_target_stream + target_fileno = target_stream.fileno() + daemon.daemon.redirect_stream(system_stream, target_stream) + mock_func_os_dup2.assert_called_with(target_fileno, system_fileno) + + def test_duplicates_null_file_descriptor_by_default( + self, mock_func_os_dup2): + """ Should by default duplicate the null file to the system stream. """ + system_stream = self.test_system_stream + system_fileno = system_stream.fileno() + target_stream = None + null_path = os.devnull + null_flag = os.O_RDWR + null_file = self.test_null_file + null_fileno = null_file.fileno() + daemon.daemon.redirect_stream(system_stream, target_stream) + self.mock_func_os_open.assert_called_with(null_path, null_flag) + mock_func_os_dup2.assert_called_with(null_fileno, system_fileno) + + +class make_default_signal_map_TestCase(scaffold.TestCase): + """ Test cases for make_default_signal_map function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(make_default_signal_map_TestCase, self).setUp() + + # Use whatever default string type this Python version needs. + signal_module_name = str('signal') + self.fake_signal_module = ModuleType(signal_module_name) + + fake_signal_names = [ + 'SIGHUP', + 'SIGCLD', + 'SIGSEGV', + 'SIGTSTP', + 'SIGTTIN', + 'SIGTTOU', + 'SIGTERM', + ] + for name in fake_signal_names: + setattr(self.fake_signal_module, name, object()) + + module_patcher_signal = mock.patch.object( + daemon.daemon, "signal", new=self.fake_signal_module) + module_patcher_signal.start() + self.addCleanup(module_patcher_signal.stop) + + default_signal_map_by_name = { + 'SIGTSTP': None, + 'SIGTTIN': None, + 'SIGTTOU': None, + 'SIGTERM': 'terminate', + } + self.default_signal_map = dict( + (getattr(self.fake_signal_module, name), target) + for (name, target) in default_signal_map_by_name.items()) + + def test_returns_constructed_signal_map(self): + """ Should return map per default. """ + expected_result = self.default_signal_map + result = daemon.daemon.make_default_signal_map() + self.assertEqual(expected_result, result) + + def test_returns_signal_map_with_only_ids_in_signal_module(self): + """ Should return map with only signals in the `signal` module. + + The `signal` module is documented to only define those + signals which exist on the running system. Therefore the + default map should not contain any signals which are not + defined in the `signal` module. + + """ + del(self.default_signal_map[self.fake_signal_module.SIGTTOU]) + del(self.fake_signal_module.SIGTTOU) + expected_result = self.default_signal_map + result = daemon.daemon.make_default_signal_map() + self.assertEqual(expected_result, result) + + +@mock.patch.object(daemon.daemon.signal, "signal") +class set_signal_handlers_TestCase(scaffold.TestCase): + """ Test cases for set_signal_handlers function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(set_signal_handlers_TestCase, self).setUp() + + self.signal_handler_map = { + signal.SIGQUIT: object(), + signal.SIGSEGV: object(), + signal.SIGINT: object(), + } + + def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal): + """ Should set signal handler for each item in map. """ + signal_handler_map = self.signal_handler_map + expected_calls = [ + mock.call(signal_number, handler) + for (signal_number, handler) in signal_handler_map.items()] + daemon.daemon.set_signal_handlers(signal_handler_map) + self.assertEquals(expected_calls, mock_func_signal_signal.mock_calls) + + +@mock.patch.object(daemon.daemon.atexit, "register") +class register_atexit_function_TestCase(scaffold.TestCase): + """ Test cases for register_atexit_function function. """ + + def test_registers_function_for_atexit_processing( + self, mock_func_atexit_register): + """ Should register specified function for atexit processing. """ + func = object() + daemon.daemon.register_atexit_function(func) + mock_func_atexit_register.assert_called_with(func) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : diff --git a/scripts/external_libs/python-daemon-2.0.5/test/test_metadata.py b/scripts/external_libs/python-daemon-2.0.5/test/test_metadata.py new file mode 100644 index 00000000..692753f4 --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/test_metadata.py @@ -0,0 +1,380 @@ +# -*- coding: utf-8 -*- +# +# test/test_metadata.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Unit test for ‘_metadata’ private module. + """ + +from __future__ import (absolute_import, unicode_literals) + +import sys +import errno +import re +try: + # Python 3 standard library. + import urllib.parse as urlparse +except ImportError: + # Python 2 standard library. + import urlparse +import functools +import collections +import json + +import pkg_resources +import mock +import testtools.helpers +import testtools.matchers +import testscenarios + +from . import scaffold +from .scaffold import (basestring, unicode) + +import daemon._metadata as metadata + + +class HasAttribute(testtools.matchers.Matcher): + """ A matcher to assert an object has a named attribute. """ + + def __init__(self, name): + self.attribute_name = name + + def match(self, instance): + """ Assert the object `instance` has an attribute named `name`. """ + result = None + if not testtools.helpers.safe_hasattr(instance, self.attribute_name): + result = AttributeNotFoundMismatch(instance, self.attribute_name) + return result + + +class AttributeNotFoundMismatch(testtools.matchers.Mismatch): + """ The specified instance does not have the named attribute. """ + + def __init__(self, instance, name): + self.instance = instance + self.attribute_name = name + + def describe(self): + """ Emit a text description of this mismatch. """ + text = ( + "{instance!r}" + " has no attribute named {name!r}").format( + instance=self.instance, name=self.attribute_name) + return text + + +class metadata_value_TestCase(scaffold.TestCaseWithScenarios): + """ Test cases for metadata module values. """ + + expected_str_attributes = set([ + 'version_installed', + 'author', + 'copyright', + 'license', + 'url', + ]) + + scenarios = [ + (name, {'attribute_name': name}) + for name in expected_str_attributes] + for (name, params) in scenarios: + if name == 'version_installed': + # No duck typing, this attribute might be None. + params['ducktype_attribute_name'] = NotImplemented + continue + # Expect an attribute of ‘str’ to test this value. + params['ducktype_attribute_name'] = 'isdigit' + + def test_module_has_attribute(self): + """ Metadata should have expected value as a module attribute. """ + self.assertThat( + metadata, HasAttribute(self.attribute_name)) + + def test_module_attribute_has_duck_type(self): + """ Metadata value should have expected duck-typing attribute. """ + if self.ducktype_attribute_name == NotImplemented: + self.skipTest("Can't assert this attribute's type") + instance = getattr(metadata, self.attribute_name) + self.assertThat( + instance, HasAttribute(self.ducktype_attribute_name)) + + +class parse_person_field_TestCase( + testscenarios.WithScenarios, testtools.TestCase): + """ Test cases for ‘get_latest_version’ function. """ + + scenarios = [ + ('simple', { + 'test_person': "Foo Bar <foo.bar@example.com>", + 'expected_result': ("Foo Bar", "foo.bar@example.com"), + }), + ('empty', { + 'test_person': "", + 'expected_result': (None, None), + }), + ('none', { + 'test_person': None, + 'expected_error': TypeError, + }), + ('no email', { + 'test_person': "Foo Bar", + 'expected_result': ("Foo Bar", None), + }), + ] + + def test_returns_expected_result(self): + """ Should return expected result. """ + if hasattr(self, 'expected_error'): + self.assertRaises( + self.expected_error, + metadata.parse_person_field, self.test_person) + else: + result = metadata.parse_person_field(self.test_person) + self.assertEqual(self.expected_result, result) + + +class YearRange_TestCase(scaffold.TestCaseWithScenarios): + """ Test cases for ‘YearRange’ class. """ + + scenarios = [ + ('simple', { + 'begin_year': 1970, + 'end_year': 1979, + 'expected_text': "1970–1979", + }), + ('same year', { + 'begin_year': 1970, + 'end_year': 1970, + 'expected_text': "1970", + }), + ('no end year', { + 'begin_year': 1970, + 'end_year': None, + 'expected_text': "1970", + }), + ] + + def setUp(self): + """ Set up test fixtures. """ + super(YearRange_TestCase, self).setUp() + + self.test_instance = metadata.YearRange( + self.begin_year, self.end_year) + + def test_text_representation_as_expected(self): + """ Text representation should be as expected. """ + result = unicode(self.test_instance) + self.assertEqual(result, self.expected_text) + + +FakeYearRange = collections.namedtuple('FakeYearRange', ['begin', 'end']) + +@mock.patch.object(metadata, 'YearRange', new=FakeYearRange) +class make_year_range_TestCase(scaffold.TestCaseWithScenarios): + """ Test cases for ‘make_year_range’ function. """ + + scenarios = [ + ('simple', { + 'begin_year': "1970", + 'end_date': "1979-01-01", + 'expected_range': FakeYearRange(begin=1970, end=1979), + }), + ('same year', { + 'begin_year': "1970", + 'end_date': "1970-01-01", + 'expected_range': FakeYearRange(begin=1970, end=1970), + }), + ('no end year', { + 'begin_year': "1970", + 'end_date': None, + 'expected_range': FakeYearRange(begin=1970, end=None), + }), + ('end date UNKNOWN token', { + 'begin_year': "1970", + 'end_date': "UNKNOWN", + 'expected_range': FakeYearRange(begin=1970, end=None), + }), + ('end date FUTURE token', { + 'begin_year': "1970", + 'end_date': "FUTURE", + 'expected_range': FakeYearRange(begin=1970, end=None), + }), + ] + + def test_result_matches_expected_range(self): + """ Result should match expected YearRange. """ + result = metadata.make_year_range(self.begin_year, self.end_date) + self.assertEqual(result, self.expected_range) + + +class metadata_content_TestCase(scaffold.TestCase): + """ Test cases for content of metadata. """ + + def test_copyright_formatted_correctly(self): + """ Copyright statement should be formatted correctly. """ + regex_pattern = ( + "Copyright © " + "\d{4}" # four-digit year + "(?:–\d{4})?" # optional range dash and ending four-digit year + ) + regex_flags = re.UNICODE + self.assertThat( + metadata.copyright, + testtools.matchers.MatchesRegex(regex_pattern, regex_flags)) + + def test_author_formatted_correctly(self): + """ Author information should be formatted correctly. """ + regex_pattern = ( + ".+ " # name + "<[^>]+>" # email address, in angle brackets + ) + regex_flags = re.UNICODE + self.assertThat( + metadata.author, + testtools.matchers.MatchesRegex(regex_pattern, regex_flags)) + + def test_copyright_contains_author(self): + """ Copyright information should contain author information. """ + self.assertThat( + metadata.copyright, + testtools.matchers.Contains(metadata.author)) + + def test_url_parses_correctly(self): + """ Homepage URL should parse correctly. """ + result = urlparse.urlparse(metadata.url) + self.assertIsInstance( + result, urlparse.ParseResult, + "URL value {url!r} did not parse correctly".format( + url=metadata.url)) + + +try: + FileNotFoundError +except NameError: + # Python 2 uses IOError. + FileNotFoundError = functools.partial(IOError, errno.ENOENT) + +version_info_filename = "version_info.json" + +def fake_func_has_metadata(testcase, resource_name): + """ Fake the behaviour of ‘pkg_resources.Distribution.has_metadata’. """ + if ( + resource_name != testcase.expected_resource_name + or not hasattr(testcase, 'test_version_info')): + return False + return True + + +def fake_func_get_metadata(testcase, resource_name): + """ Fake the behaviour of ‘pkg_resources.Distribution.get_metadata’. """ + if not fake_func_has_metadata(testcase, resource_name): + error = FileNotFoundError(resource_name) + raise error + content = testcase.test_version_info + return content + + +def fake_func_get_distribution(testcase, distribution_name): + """ Fake the behaviour of ‘pkg_resources.get_distribution’. """ + if distribution_name != metadata.distribution_name: + raise pkg_resources.DistributionNotFound + if hasattr(testcase, 'get_distribution_error'): + raise testcase.get_distribution_error + mock_distribution = testcase.mock_distribution + mock_distribution.has_metadata.side_effect = functools.partial( + fake_func_has_metadata, testcase) + mock_distribution.get_metadata.side_effect = functools.partial( + fake_func_get_metadata, testcase) + return mock_distribution + + +@mock.patch.object(metadata, 'distribution_name', new="mock-dist") +class get_distribution_version_info_TestCase(scaffold.TestCaseWithScenarios): + """ Test cases for ‘get_distribution_version_info’ function. """ + + default_version_info = { + 'release_date': "UNKNOWN", + 'version': "UNKNOWN", + 'maintainer': "UNKNOWN", + } + + scenarios = [ + ('version 0.0', { + 'test_version_info': json.dumps({ + 'version': "0.0", + }), + 'expected_version_info': {'version': "0.0"}, + }), + ('version 1.0', { + 'test_version_info': json.dumps({ + 'version': "1.0", + }), + 'expected_version_info': {'version': "1.0"}, + }), + ('file lorem_ipsum.json', { + 'version_info_filename': "lorem_ipsum.json", + 'test_version_info': json.dumps({ + 'version': "1.0", + }), + 'expected_version_info': {'version': "1.0"}, + }), + ('not installed', { + 'get_distribution_error': pkg_resources.DistributionNotFound(), + 'expected_version_info': default_version_info, + }), + ('no version_info', { + 'expected_version_info': default_version_info, + }), + ] + + def setUp(self): + """ Set up test fixtures. """ + super(get_distribution_version_info_TestCase, self).setUp() + + if hasattr(self, 'expected_resource_name'): + self.test_args = {'filename': self.expected_resource_name} + else: + self.test_args = {} + self.expected_resource_name = version_info_filename + + self.mock_distribution = mock.MagicMock() + func_patcher_get_distribution = mock.patch.object( + pkg_resources, 'get_distribution') + func_patcher_get_distribution.start() + self.addCleanup(func_patcher_get_distribution.stop) + pkg_resources.get_distribution.side_effect = functools.partial( + fake_func_get_distribution, self) + + def test_requests_installed_distribution(self): + """ The package distribution should be retrieved. """ + expected_distribution_name = metadata.distribution_name + version_info = metadata.get_distribution_version_info(**self.test_args) + pkg_resources.get_distribution.assert_called_with( + expected_distribution_name) + + def test_requests_specified_filename(self): + """ The specified metadata resource name should be requested. """ + if hasattr(self, 'get_distribution_error'): + self.skipTest("No access to distribution") + version_info = metadata.get_distribution_version_info(**self.test_args) + self.mock_distribution.has_metadata.assert_called_with( + self.expected_resource_name) + + def test_result_matches_expected_items(self): + """ The result should match the expected items. """ + version_info = metadata.get_distribution_version_info(**self.test_args) + self.assertEqual(self.expected_version_info, version_info) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : diff --git a/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py b/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py new file mode 100644 index 00000000..9b636ec8 --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/test_pidfile.py @@ -0,0 +1,472 @@ +# -*- coding: utf-8 -*- +# +# test/test_pidfile.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Unit test for ‘pidfile’ module. + """ + +from __future__ import (absolute_import, unicode_literals) + +try: + # Python 3 standard library. + import builtins +except ImportError: + # Python 2 standard library. + import __builtin__ as builtins +import os +import itertools +import tempfile +import errno +import functools +try: + # Standard library of Python 2.7 and later. + from io import StringIO +except ImportError: + # Standard library of Python 2.6 and earlier. + from StringIO import StringIO + +import mock +import lockfile + +from . import scaffold + +import daemon.pidfile + + +class FakeFileDescriptorStringIO(StringIO, object): + """ A StringIO class that fakes a file descriptor. """ + + _fileno_generator = itertools.count() + + def __init__(self, *args, **kwargs): + self._fileno = next(self._fileno_generator) + super(FakeFileDescriptorStringIO, self).__init__(*args, **kwargs) + + def fileno(self): + return self._fileno + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +try: + FileNotFoundError + PermissionError +except NameError: + # Python 2 uses IOError. + FileNotFoundError = functools.partial(IOError, errno.ENOENT) + PermissionError = functools.partial(IOError, errno.EPERM) + + +def make_pidlockfile_scenarios(): + """ Make a collection of scenarios for testing `PIDLockFile` instances. + + :return: A collection of scenarios for tests involving + `PIDLockfFile` instances. + + The collection is a mapping from scenario name to a dictionary of + scenario attributes. + + """ + + fake_current_pid = 235 + fake_other_pid = 8642 + fake_pidfile_path = tempfile.mktemp() + + fake_pidfile_empty = FakeFileDescriptorStringIO() + fake_pidfile_current_pid = FakeFileDescriptorStringIO( + "{pid:d}\n".format(pid=fake_current_pid)) + fake_pidfile_other_pid = FakeFileDescriptorStringIO( + "{pid:d}\n".format(pid=fake_other_pid)) + fake_pidfile_bogus = FakeFileDescriptorStringIO( + "b0gUs") + + scenarios = { + 'simple': {}, + 'not-exist': { + 'open_func_name': 'fake_open_nonexist', + 'os_open_func_name': 'fake_os_open_nonexist', + }, + 'not-exist-write-denied': { + 'open_func_name': 'fake_open_nonexist', + 'os_open_func_name': 'fake_os_open_nonexist', + }, + 'not-exist-write-busy': { + 'open_func_name': 'fake_open_nonexist', + 'os_open_func_name': 'fake_os_open_nonexist', + }, + 'exist-read-denied': { + 'open_func_name': 'fake_open_read_denied', + 'os_open_func_name': 'fake_os_open_read_denied', + }, + 'exist-locked-read-denied': { + 'locking_pid': fake_other_pid, + 'open_func_name': 'fake_open_read_denied', + 'os_open_func_name': 'fake_os_open_read_denied', + }, + 'exist-empty': {}, + 'exist-invalid': { + 'pidfile': fake_pidfile_bogus, + }, + 'exist-current-pid': { + 'pidfile': fake_pidfile_current_pid, + 'pidfile_pid': fake_current_pid, + }, + 'exist-current-pid-locked': { + 'pidfile': fake_pidfile_current_pid, + 'pidfile_pid': fake_current_pid, + 'locking_pid': fake_current_pid, + }, + 'exist-other-pid': { + 'pidfile': fake_pidfile_other_pid, + 'pidfile_pid': fake_other_pid, + }, + 'exist-other-pid-locked': { + 'pidfile': fake_pidfile_other_pid, + 'pidfile_pid': fake_other_pid, + 'locking_pid': fake_other_pid, + }, + } + + for scenario in scenarios.values(): + scenario['pid'] = fake_current_pid + scenario['pidfile_path'] = fake_pidfile_path + if 'pidfile' not in scenario: + scenario['pidfile'] = fake_pidfile_empty + if 'pidfile_pid' not in scenario: + scenario['pidfile_pid'] = None + if 'locking_pid' not in scenario: + scenario['locking_pid'] = None + if 'open_func_name' not in scenario: + scenario['open_func_name'] = 'fake_open_okay' + if 'os_open_func_name' not in scenario: + scenario['os_open_func_name'] = 'fake_os_open_okay' + + return scenarios + + +def setup_pidfile_fixtures(testcase): + """ Set up common fixtures for PID file test cases. + + :param testcase: A `TestCase` instance to decorate. + + Decorate the `testcase` with attributes to be fixtures for tests + involving `PIDLockFile` instances. + + """ + scenarios = make_pidlockfile_scenarios() + testcase.pidlockfile_scenarios = scenarios + + def get_scenario_option(testcase, key, default=None): + value = default + try: + value = testcase.scenario[key] + except (NameError, TypeError, AttributeError, KeyError): + pass + return value + + func_patcher_os_getpid = mock.patch.object( + os, "getpid", + return_value=scenarios['simple']['pid']) + func_patcher_os_getpid.start() + testcase.addCleanup(func_patcher_os_getpid.stop) + + def make_fake_open_funcs(testcase): + + def fake_open_nonexist(filename, mode, buffering): + if mode.startswith('r'): + error = FileNotFoundError( + "No such file {filename!r}".format( + filename=filename)) + raise error + else: + result = testcase.scenario['pidfile'] + return result + + def fake_open_read_denied(filename, mode, buffering): + if mode.startswith('r'): + error = PermissionError( + "Read denied on {filename!r}".format( + filename=filename)) + raise error + else: + result = testcase.scenario['pidfile'] + return result + + def fake_open_okay(filename, mode, buffering): + result = testcase.scenario['pidfile'] + return result + + def fake_os_open_nonexist(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + error = FileNotFoundError( + "No such file {filename!r}".format( + filename=filename)) + raise error + return result + + def fake_os_open_read_denied(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + error = PermissionError( + "Read denied on {filename!r}".format( + filename=filename)) + raise error + return result + + def fake_os_open_okay(filename, flags, mode): + result = testcase.scenario['pidfile'].fileno() + return result + + funcs = dict( + (name, obj) for (name, obj) in vars().items() + if callable(obj)) + + return funcs + + testcase.fake_pidfile_open_funcs = make_fake_open_funcs(testcase) + + def fake_open(filename, mode='rt', buffering=None): + scenario_path = get_scenario_option(testcase, 'pidfile_path') + if filename == scenario_path: + func_name = testcase.scenario['open_func_name'] + fake_open_func = testcase.fake_pidfile_open_funcs[func_name] + result = fake_open_func(filename, mode, buffering) + else: + result = FakeFileDescriptorStringIO() + return result + + mock_open = mock.mock_open() + mock_open.side_effect = fake_open + + func_patcher_builtin_open = mock.patch.object( + builtins, "open", + new=mock_open) + func_patcher_builtin_open.start() + testcase.addCleanup(func_patcher_builtin_open.stop) + + def fake_os_open(filename, flags, mode=None): + scenario_path = get_scenario_option(testcase, 'pidfile_path') + if filename == scenario_path: + func_name = testcase.scenario['os_open_func_name'] + fake_os_open_func = testcase.fake_pidfile_open_funcs[func_name] + result = fake_os_open_func(filename, flags, mode) + else: + result = FakeFileDescriptorStringIO().fileno() + return result + + mock_os_open = mock.MagicMock(side_effect=fake_os_open) + + func_patcher_os_open = mock.patch.object( + os, "open", + new=mock_os_open) + func_patcher_os_open.start() + testcase.addCleanup(func_patcher_os_open.stop) + + def fake_os_fdopen(fd, mode='rt', buffering=None): + scenario_pidfile = get_scenario_option( + testcase, 'pidfile', FakeFileDescriptorStringIO()) + if fd == testcase.scenario['pidfile'].fileno(): + result = testcase.scenario['pidfile'] + else: + raise OSError(errno.EBADF, "Bad file descriptor") + return result + + mock_os_fdopen = mock.MagicMock(side_effect=fake_os_fdopen) + + func_patcher_os_fdopen = mock.patch.object( + os, "fdopen", + new=mock_os_fdopen) + func_patcher_os_fdopen.start() + testcase.addCleanup(func_patcher_os_fdopen.stop) + + +def make_lockfile_method_fakes(scenario): + """ Make common fake methods for lockfile class. + + :param scenario: A scenario for testing with PIDLockFile. + :return: A mapping from normal function name to the corresponding + fake function. + + Each fake function behaves appropriately for the specified `scenario`. + + """ + + def fake_func_read_pid(): + return scenario['pidfile_pid'] + def fake_func_is_locked(): + return (scenario['locking_pid'] is not None) + def fake_func_i_am_locking(): + return ( + scenario['locking_pid'] == scenario['pid']) + def fake_func_acquire(timeout=None): + if scenario['locking_pid'] is not None: + raise lockfile.AlreadyLocked() + scenario['locking_pid'] = scenario['pid'] + def fake_func_release(): + if scenario['locking_pid'] is None: + raise lockfile.NotLocked() + if scenario['locking_pid'] != scenario['pid']: + raise lockfile.NotMyLock() + scenario['locking_pid'] = None + def fake_func_break_lock(): + scenario['locking_pid'] = None + + fake_methods = dict( + ( + func_name.replace('fake_func_', ''), + mock.MagicMock(side_effect=fake_func)) + for (func_name, fake_func) in vars().items() + if func_name.startswith('fake_func_')) + + return fake_methods + + +def apply_lockfile_method_mocks(mock_lockfile, testcase, scenario): + """ Apply common fake methods to mock lockfile class. + + :param mock_lockfile: An object providing the `LockFile` interface. + :param testcase: The `TestCase` instance providing the context for + the patch. + :param scenario: The `PIDLockFile` test scenario to use. + + Mock the `LockFile` methods of `mock_lockfile`, by applying fake + methods customised for `scenario`. The mock is does by a patch + within the context of `testcase`. + + """ + fake_methods = dict( + (func_name, fake_func) + for (func_name, fake_func) in + make_lockfile_method_fakes(scenario).items() + if func_name not in ['read_pid']) + + for (func_name, fake_func) in fake_methods.items(): + func_patcher = mock.patch.object( + mock_lockfile, func_name, + new=fake_func) + func_patcher.start() + testcase.addCleanup(func_patcher.stop) + + +def setup_pidlockfile_fixtures(testcase, scenario_name=None): + """ Set up common fixtures for PIDLockFile test cases. + + :param testcase: The `TestCase` instance to decorate. + :param scenario_name: The name of the `PIDLockFile` scenario to use. + + Decorate the `testcase` with attributes that are fixtures for test + cases involving `PIDLockFile` instances.` + + """ + + setup_pidfile_fixtures(testcase) + + for func_name in [ + 'write_pid_to_pidfile', + 'remove_existing_pidfile', + ]: + func_patcher = mock.patch.object(lockfile.pidlockfile, func_name) + func_patcher.start() + testcase.addCleanup(func_patcher.stop) + + +class TimeoutPIDLockFile_TestCase(scaffold.TestCase): + """ Test cases for ‘TimeoutPIDLockFile’ class. """ + + def setUp(self): + """ Set up test fixtures. """ + super(TimeoutPIDLockFile_TestCase, self).setUp() + + pidlockfile_scenarios = make_pidlockfile_scenarios() + self.pidlockfile_scenario = pidlockfile_scenarios['simple'] + pidfile_path = self.pidlockfile_scenario['pidfile_path'] + + for func_name in ['__init__', 'acquire']: + func_patcher = mock.patch.object( + lockfile.pidlockfile.PIDLockFile, func_name) + func_patcher.start() + self.addCleanup(func_patcher.stop) + + self.scenario = { + 'pidfile_path': self.pidlockfile_scenario['pidfile_path'], + 'acquire_timeout': self.getUniqueInteger(), + } + + self.test_kwargs = dict( + path=self.scenario['pidfile_path'], + acquire_timeout=self.scenario['acquire_timeout'], + ) + self.test_instance = daemon.pidfile.TimeoutPIDLockFile( + **self.test_kwargs) + + def test_inherits_from_pidlockfile(self): + """ Should inherit from PIDLockFile. """ + instance = self.test_instance + self.assertIsInstance(instance, lockfile.pidlockfile.PIDLockFile) + + def test_init_has_expected_signature(self): + """ Should have expected signature for ‘__init__’. """ + def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass + test_func.__name__ = str('__init__') + self.assertFunctionSignatureMatch( + test_func, + daemon.pidfile.TimeoutPIDLockFile.__init__) + + def test_has_specified_acquire_timeout(self): + """ Should have specified ‘acquire_timeout’ value. """ + instance = self.test_instance + expected_timeout = self.test_kwargs['acquire_timeout'] + self.assertEqual(expected_timeout, instance.acquire_timeout) + + @mock.patch.object( + lockfile.pidlockfile.PIDLockFile, "__init__", + autospec=True) + def test_calls_superclass_init(self, mock_init): + """ Should call the superclass ‘__init__’. """ + expected_path = self.test_kwargs['path'] + instance = daemon.pidfile.TimeoutPIDLockFile(**self.test_kwargs) + mock_init.assert_called_with(instance, expected_path) + + @mock.patch.object( + lockfile.pidlockfile.PIDLockFile, "acquire", + autospec=True) + def test_acquire_uses_specified_timeout(self, mock_func_acquire): + """ Should call the superclass ‘acquire’ with specified timeout. """ + instance = self.test_instance + test_timeout = self.getUniqueInteger() + expected_timeout = test_timeout + instance.acquire(test_timeout) + mock_func_acquire.assert_called_with(instance, expected_timeout) + + @mock.patch.object( + lockfile.pidlockfile.PIDLockFile, "acquire", + autospec=True) + def test_acquire_uses_stored_timeout_by_default(self, mock_func_acquire): + """ Should call superclass ‘acquire’ with stored timeout by default. """ + instance = self.test_instance + test_timeout = self.test_kwargs['acquire_timeout'] + expected_timeout = test_timeout + instance.acquire() + mock_func_acquire.assert_called_with(instance, expected_timeout) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : diff --git a/scripts/external_libs/python-daemon-2.0.5/test/test_runner.py b/scripts/external_libs/python-daemon-2.0.5/test/test_runner.py new file mode 100644 index 00000000..4c0c714a --- /dev/null +++ b/scripts/external_libs/python-daemon-2.0.5/test/test_runner.py @@ -0,0 +1,675 @@ +# -*- coding: utf-8 -*- +# +# test/test_runner.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2009–2015 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details. + +""" Unit test for ‘runner’ module. + """ + +from __future__ import (absolute_import, unicode_literals) + +try: + # Python 3 standard library. + import builtins +except ImportError: + # Python 2 standard library. + import __builtin__ as builtins +import os +import os.path +import sys +import tempfile +import errno +import signal +import functools + +import lockfile +import mock +import testtools + +from . import scaffold +from .scaffold import (basestring, unicode) +from .test_pidfile import ( + FakeFileDescriptorStringIO, + setup_pidfile_fixtures, + make_pidlockfile_scenarios, + apply_lockfile_method_mocks, + ) +from .test_daemon import ( + setup_streams_fixtures, + ) + +import daemon.daemon +import daemon.runner +import daemon.pidfile + + +class ModuleExceptions_TestCase(scaffold.Exception_TestCase): + """ Test cases for module exception classes. """ + + scenarios = scaffold.make_exception_scenarios([ + ('daemon.runner.DaemonRunnerError', dict( + exc_type = daemon.runner.DaemonRunnerError, + min_args = 1, + types = [Exception], + )), + ('daemon.runner.DaemonRunnerInvalidActionError', dict( + exc_type = daemon.runner.DaemonRunnerInvalidActionError, + min_args = 1, + types = [daemon.runner.DaemonRunnerError, ValueError], + )), + ('daemon.runner.DaemonRunnerStartFailureError', dict( + exc_type = daemon.runner.DaemonRunnerStartFailureError, + min_args = 1, + types = [daemon.runner.DaemonRunnerError, RuntimeError], + )), + ('daemon.runner.DaemonRunnerStopFailureError', dict( + exc_type = daemon.runner.DaemonRunnerStopFailureError, + min_args = 1, + types = [daemon.runner.DaemonRunnerError, RuntimeError], + )), + ]) + + +def make_runner_scenarios(): + """ Make a collection of scenarios for testing `DaemonRunner` instances. + + :return: A collection of scenarios for tests involving + `DaemonRunner` instances. + + The collection is a mapping from scenario name to a dictionary of + scenario attributes. + + """ + + pidlockfile_scenarios = make_pidlockfile_scenarios() + + scenarios = { + 'simple': { + 'pidlockfile_scenario_name': 'simple', + }, + 'pidfile-locked': { + 'pidlockfile_scenario_name': 'exist-other-pid-locked', + }, + } + + for scenario in scenarios.values(): + if 'pidlockfile_scenario_name' in scenario: + pidlockfile_scenario = pidlockfile_scenarios.pop( + scenario['pidlockfile_scenario_name']) + scenario['pid'] = pidlockfile_scenario['pid'] + scenario['pidfile_path'] = pidlockfile_scenario['pidfile_path'] + scenario['pidfile_timeout'] = 23 + scenario['pidlockfile_scenario'] = pidlockfile_scenario + + return scenarios + + +def set_runner_scenario(testcase, scenario_name): + """ Set the DaemonRunner test scenario for the test case. + + :param testcase: The `TestCase` instance to decorate. + :param scenario_name: The name of the scenario to use. + + Set the `DaemonRunner` test scenario name and decorate the + `testcase` with the corresponding scenario fixtures. + + """ + scenarios = testcase.runner_scenarios + testcase.scenario = scenarios[scenario_name] + apply_lockfile_method_mocks( + testcase.mock_runner_lockfile, + testcase, + testcase.scenario['pidlockfile_scenario']) + + +def setup_runner_fixtures(testcase): + """ Set up common fixtures for `DaemonRunner` test cases. + + :param testcase: A `TestCase` instance to decorate. + + Decorate the `testcase` with attributes to be fixtures for tests + involving `DaemonRunner` instances. + + """ + setup_pidfile_fixtures(testcase) + setup_streams_fixtures(testcase) + + testcase.runner_scenarios = make_runner_scenarios() + + patcher_stderr = mock.patch.object( + sys, "stderr", + new=FakeFileDescriptorStringIO()) + testcase.fake_stderr = patcher_stderr.start() + testcase.addCleanup(patcher_stderr.stop) + + simple_scenario = testcase.runner_scenarios['simple'] + + testcase.mock_runner_lockfile = mock.MagicMock( + spec=daemon.pidfile.TimeoutPIDLockFile) + apply_lockfile_method_mocks( + testcase.mock_runner_lockfile, + testcase, + simple_scenario['pidlockfile_scenario']) + testcase.mock_runner_lockfile.path = simple_scenario['pidfile_path'] + + patcher_lockfile_class = mock.patch.object( + daemon.pidfile, "TimeoutPIDLockFile", + return_value=testcase.mock_runner_lockfile) + patcher_lockfile_class.start() + testcase.addCleanup(patcher_lockfile_class.stop) + + class TestApp(object): + + def __init__(self): + self.stdin_path = testcase.stream_file_paths['stdin'] + self.stdout_path = testcase.stream_file_paths['stdout'] + self.stderr_path = testcase.stream_file_paths['stderr'] + self.pidfile_path = simple_scenario['pidfile_path'] + self.pidfile_timeout = simple_scenario['pidfile_timeout'] + + run = mock.MagicMock(name="TestApp.run") + + testcase.TestApp = TestApp + + patcher_runner_daemoncontext = mock.patch.object( + daemon.runner, "DaemonContext", autospec=True) + patcher_runner_daemoncontext.start() + testcase.addCleanup(patcher_runner_daemoncontext.stop) + + testcase.test_app = testcase.TestApp() + + testcase.test_program_name = "bazprog" + testcase.test_program_path = os.path.join( + "/foo/bar", testcase.test_program_name) + testcase.valid_argv_params = { + 'start': [testcase.test_program_path, 'start'], + 'stop': [testcase.test_program_path, 'stop'], + 'restart': [testcase.test_program_path, 'restart'], + } + + def fake_open(filename, mode=None, buffering=None): + if filename in testcase.stream_files_by_path: + result = testcase.stream_files_by_path[filename] + else: + result = FakeFileDescriptorStringIO() + result.mode = mode + result.buffering = buffering + return result + + mock_open = mock.mock_open() + mock_open.side_effect = fake_open + + func_patcher_builtin_open = mock.patch.object( + builtins, "open", + new=mock_open) + func_patcher_builtin_open.start() + testcase.addCleanup(func_patcher_builtin_open.stop) + + func_patcher_os_kill = mock.patch.object(os, "kill") + func_patcher_os_kill.start() + testcase.addCleanup(func_patcher_os_kill.stop) + + patcher_sys_argv = mock.patch.object( + sys, "argv", + new=testcase.valid_argv_params['start']) + patcher_sys_argv.start() + testcase.addCleanup(patcher_sys_argv.stop) + + testcase.test_instance = daemon.runner.DaemonRunner(testcase.test_app) + + testcase.scenario = NotImplemented + + +class DaemonRunner_BaseTestCase(scaffold.TestCase): + """ Base class for DaemonRunner test case classes. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_BaseTestCase, self).setUp() + + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + +class DaemonRunner_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner class. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_TestCase, self).setUp() + + func_patcher_parse_args = mock.patch.object( + daemon.runner.DaemonRunner, "parse_args") + func_patcher_parse_args.start() + self.addCleanup(func_patcher_parse_args.stop) + + # Create a new instance now with our custom patches. + self.test_instance = daemon.runner.DaemonRunner(self.test_app) + + def test_instantiate(self): + """ New instance of DaemonRunner should be created. """ + self.assertIsInstance(self.test_instance, daemon.runner.DaemonRunner) + + def test_parses_commandline_args(self): + """ Should parse commandline arguments. """ + self.test_instance.parse_args.assert_called_with() + + def test_has_specified_app(self): + """ Should have specified application object. """ + self.assertIs(self.test_app, self.test_instance.app) + + def test_sets_pidfile_none_when_pidfile_path_is_none(self): + """ Should set ‘pidfile’ to ‘None’ when ‘pidfile_path’ is ‘None’. """ + pidfile_path = None + self.test_app.pidfile_path = pidfile_path + expected_pidfile = None + instance = daemon.runner.DaemonRunner(self.test_app) + self.assertIs(expected_pidfile, instance.pidfile) + + def test_error_when_pidfile_path_not_string(self): + """ Should raise ValueError when PID file path not a string. """ + pidfile_path = object() + self.test_app.pidfile_path = pidfile_path + expected_error = ValueError + self.assertRaises( + expected_error, + daemon.runner.DaemonRunner, self.test_app) + + def test_error_when_pidfile_path_not_absolute(self): + """ Should raise ValueError when PID file path not absolute. """ + pidfile_path = "foo/bar.pid" + self.test_app.pidfile_path = pidfile_path + expected_error = ValueError + self.assertRaises( + expected_error, + daemon.runner.DaemonRunner, self.test_app) + + def test_creates_lock_with_specified_parameters(self): + """ Should create a TimeoutPIDLockFile with specified params. """ + pidfile_path = self.scenario['pidfile_path'] + pidfile_timeout = self.scenario['pidfile_timeout'] + daemon.pidfile.TimeoutPIDLockFile.assert_called_with( + pidfile_path, pidfile_timeout) + + def test_has_created_pidfile(self): + """ Should have new PID lock file as `pidfile` attribute. """ + expected_pidfile = self.mock_runner_lockfile + instance = self.test_instance + self.assertIs( + expected_pidfile, instance.pidfile) + + def test_daemon_context_has_created_pidfile(self): + """ DaemonContext component should have new PID lock file. """ + expected_pidfile = self.mock_runner_lockfile + daemon_context = self.test_instance.daemon_context + self.assertIs( + expected_pidfile, daemon_context.pidfile) + + def test_daemon_context_has_specified_stdin_stream(self): + """ DaemonContext component should have specified stdin file. """ + test_app = self.test_app + expected_file = self.stream_files_by_name['stdin'] + daemon_context = self.test_instance.daemon_context + self.assertEqual(expected_file, daemon_context.stdin) + + def test_daemon_context_has_stdin_in_read_mode(self): + """ DaemonContext component should open stdin file for read. """ + expected_mode = 'rt' + daemon_context = self.test_instance.daemon_context + self.assertIn(expected_mode, daemon_context.stdin.mode) + + def test_daemon_context_has_specified_stdout_stream(self): + """ DaemonContext component should have specified stdout file. """ + test_app = self.test_app + expected_file = self.stream_files_by_name['stdout'] + daemon_context = self.test_instance.daemon_context + self.assertEqual(expected_file, daemon_context.stdout) + + def test_daemon_context_has_stdout_in_append_mode(self): + """ DaemonContext component should open stdout file for append. """ + expected_mode = 'w+t' + daemon_context = self.test_instance.daemon_context + self.assertIn(expected_mode, daemon_context.stdout.mode) + + def test_daemon_context_has_specified_stderr_stream(self): + """ DaemonContext component should have specified stderr file. """ + test_app = self.test_app + expected_file = self.stream_files_by_name['stderr'] + daemon_context = self.test_instance.daemon_context + self.assertEqual(expected_file, daemon_context.stderr) + + def test_daemon_context_has_stderr_in_append_mode(self): + """ DaemonContext component should open stderr file for append. """ + expected_mode = 'w+t' + daemon_context = self.test_instance.daemon_context + self.assertIn(expected_mode, daemon_context.stderr.mode) + + def test_daemon_context_has_stderr_with_no_buffering(self): + """ DaemonContext component should open stderr file unbuffered. """ + expected_buffering = 0 + daemon_context = self.test_instance.daemon_context + self.assertEqual( + expected_buffering, daemon_context.stderr.buffering) + + +class DaemonRunner_usage_exit_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.usage_exit method. """ + + def test_raises_system_exit(self): + """ Should raise SystemExit exception. """ + instance = self.test_instance + argv = [self.test_program_path] + self.assertRaises( + SystemExit, + instance._usage_exit, argv) + + def test_message_follows_conventional_format(self): + """ Should emit a conventional usage message. """ + instance = self.test_instance + argv = [self.test_program_path] + expected_stderr_output = """\ + usage: {progname} ... + """.format( + progname=self.test_program_name) + self.assertRaises( + SystemExit, + instance._usage_exit, argv) + self.assertOutputCheckerMatch( + expected_stderr_output, self.fake_stderr.getvalue()) + + +class DaemonRunner_parse_args_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.parse_args method. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_parse_args_TestCase, self).setUp() + + func_patcher_usage_exit = mock.patch.object( + daemon.runner.DaemonRunner, "_usage_exit", + side_effect=NotImplementedError) + func_patcher_usage_exit.start() + self.addCleanup(func_patcher_usage_exit.stop) + + def test_emits_usage_message_if_insufficient_args(self): + """ Should emit a usage message and exit if too few arguments. """ + instance = self.test_instance + argv = [self.test_program_path] + exc = self.assertRaises( + NotImplementedError, + instance.parse_args, argv) + daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv) + + def test_emits_usage_message_if_unknown_action_arg(self): + """ Should emit a usage message and exit if unknown action. """ + instance = self.test_instance + progname = self.test_program_name + argv = [self.test_program_path, 'bogus'] + exc = self.assertRaises( + NotImplementedError, + instance.parse_args, argv) + daemon.runner.DaemonRunner._usage_exit.assert_called_with(argv) + + def test_should_parse_system_argv_by_default(self): + """ Should parse sys.argv by default. """ + instance = self.test_instance + expected_action = 'start' + argv = self.valid_argv_params['start'] + with mock.patch.object(sys, "argv", new=argv): + instance.parse_args() + self.assertEqual(expected_action, instance.action) + + def test_sets_action_from_first_argument(self): + """ Should set action from first commandline argument. """ + instance = self.test_instance + for name, argv in self.valid_argv_params.items(): + expected_action = name + instance.parse_args(argv) + self.assertEqual(expected_action, instance.action) + + +try: + ProcessLookupError +except NameError: + # Python 2 uses OSError. + ProcessLookupError = functools.partial(OSError, errno.ESRCH) + +class DaemonRunner_do_action_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.do_action method. """ + + def test_raises_error_if_unknown_action(self): + """ Should emit a usage message and exit if action is unknown. """ + instance = self.test_instance + instance.action = 'bogus' + expected_error = daemon.runner.DaemonRunnerInvalidActionError + self.assertRaises( + expected_error, + instance.do_action) + + +class DaemonRunner_do_action_start_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.do_action method, action 'start'. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_do_action_start_TestCase, self).setUp() + + self.test_instance.action = 'start' + + def test_raises_error_if_pidfile_locked(self): + """ Should raise error if PID file is locked. """ + + instance = self.test_instance + instance.daemon_context.open.side_effect = lockfile.AlreadyLocked + pidfile_path = self.scenario['pidfile_path'] + expected_error = daemon.runner.DaemonRunnerStartFailureError + expected_message_content = pidfile_path + exc = self.assertRaises( + expected_error, + instance.do_action) + self.assertIn(expected_message_content, unicode(exc)) + + def test_breaks_lock_if_no_such_process(self): + """ Should request breaking lock if PID file process is not running. """ + set_runner_scenario(self, 'pidfile-locked') + instance = self.test_instance + self.mock_runner_lockfile.read_pid.return_value = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + pidfile_path = self.scenario['pidfile_path'] + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expected_signal = signal.SIG_DFL + test_error = ProcessLookupError("Not running") + os.kill.side_effect = test_error + instance.do_action() + os.kill.assert_called_with(test_pid, expected_signal) + self.mock_runner_lockfile.break_lock.assert_called_with() + + def test_requests_daemon_context_open(self): + """ Should request the daemon context to open. """ + instance = self.test_instance + instance.do_action() + instance.daemon_context.open.assert_called_with() + + def test_emits_start_message_to_stderr(self): + """ Should emit start message to stderr. """ + instance = self.test_instance + expected_stderr = """\ + started with pid {pid:d} + """.format( + pid=self.scenario['pid']) + instance.do_action() + self.assertOutputCheckerMatch( + expected_stderr, self.fake_stderr.getvalue()) + + def test_requests_app_run(self): + """ Should request the application to run. """ + instance = self.test_instance + instance.do_action() + self.test_app.run.assert_called_with() + + +class DaemonRunner_do_action_stop_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.do_action method, action 'stop'. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_do_action_stop_TestCase, self).setUp() + + set_runner_scenario(self, 'pidfile-locked') + + self.test_instance.action = 'stop' + + self.mock_runner_lockfile.is_locked.return_value = True + self.mock_runner_lockfile.i_am_locking.return_value = False + self.mock_runner_lockfile.read_pid.return_value = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + + def test_raises_error_if_pidfile_not_locked(self): + """ Should raise error if PID file is not locked. """ + set_runner_scenario(self, 'simple') + instance = self.test_instance + self.mock_runner_lockfile.is_locked.return_value = False + self.mock_runner_lockfile.i_am_locking.return_value = False + self.mock_runner_lockfile.read_pid.return_value = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + pidfile_path = self.scenario['pidfile_path'] + expected_error = daemon.runner.DaemonRunnerStopFailureError + expected_message_content = pidfile_path + exc = self.assertRaises( + expected_error, + instance.do_action) + self.assertIn(expected_message_content, unicode(exc)) + + def test_breaks_lock_if_pidfile_stale(self): + """ Should break lock if PID file is stale. """ + instance = self.test_instance + pidfile_path = self.scenario['pidfile_path'] + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expected_signal = signal.SIG_DFL + test_error = OSError(errno.ESRCH, "Not running") + os.kill.side_effect = test_error + instance.do_action() + self.mock_runner_lockfile.break_lock.assert_called_with() + + def test_sends_terminate_signal_to_process_from_pidfile(self): + """ Should send SIGTERM to the daemon process. """ + instance = self.test_instance + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expected_signal = signal.SIGTERM + instance.do_action() + os.kill.assert_called_with(test_pid, expected_signal) + + def test_raises_error_if_cannot_send_signal_to_process(self): + """ Should raise error if cannot send signal to daemon process. """ + instance = self.test_instance + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + pidfile_path = self.scenario['pidfile_path'] + test_error = OSError(errno.EPERM, "Nice try") + os.kill.side_effect = test_error + expected_error = daemon.runner.DaemonRunnerStopFailureError + expected_message_content = unicode(test_pid) + exc = self.assertRaises( + expected_error, + instance.do_action) + self.assertIn(expected_message_content, unicode(exc)) + + +@mock.patch.object(daemon.runner.DaemonRunner, "_start") +@mock.patch.object(daemon.runner.DaemonRunner, "_stop") +class DaemonRunner_do_action_restart_TestCase(DaemonRunner_BaseTestCase): + """ Test cases for DaemonRunner.do_action method, action 'restart'. """ + + def setUp(self): + """ Set up test fixtures. """ + super(DaemonRunner_do_action_restart_TestCase, self).setUp() + + set_runner_scenario(self, 'pidfile-locked') + + self.test_instance.action = 'restart' + + def test_requests_stop_then_start( + self, + mock_func_daemonrunner_start, mock_func_daemonrunner_stop): + """ Should request stop, then start. """ + instance = self.test_instance + instance.do_action() + mock_func_daemonrunner_start.assert_called_with() + mock_func_daemonrunner_stop.assert_called_with() + + +@mock.patch.object(sys, "stderr") +class emit_message_TestCase(scaffold.TestCase): + """ Test cases for ‘emit_message’ function. """ + + def test_writes_specified_message_to_stream(self, mock_stderr): + """ Should write specified message to stream. """ + test_message = self.getUniqueString() + expected_content = "{message}\n".format(message=test_message) + daemon.runner.emit_message(test_message, stream=mock_stderr) + mock_stderr.write.assert_called_with(expected_content) + + def test_writes_to_specified_stream(self, mock_stderr): + """ Should write message to specified stream. """ + test_message = self.getUniqueString() + mock_stream = mock.MagicMock() + daemon.runner.emit_message(test_message, stream=mock_stream) + mock_stream.write.assert_called_with(mock.ANY) + + def test_writes_to_stderr_by_default(self, mock_stderr): + """ Should write message to ‘sys.stderr’ by default. """ + test_message = self.getUniqueString() + daemon.runner.emit_message(test_message) + mock_stderr.write.assert_called_with(mock.ANY) + + +class is_pidfile_stale_TestCase(scaffold.TestCase): + """ Test cases for ‘is_pidfile_stale’ function. """ + + def setUp(self): + """ Set up test fixtures. """ + super(is_pidfile_stale_TestCase, self).setUp() + + func_patcher_os_kill = mock.patch.object(os, "kill") + func_patcher_os_kill.start() + self.addCleanup(func_patcher_os_kill.stop) + os.kill.return_value = None + + self.test_pid = self.getUniqueInteger() + self.test_pidfile = mock.MagicMock(daemon.pidfile.TimeoutPIDLockFile) + self.test_pidfile.read_pid.return_value = self.test_pid + + def test_returns_false_if_no_pid_in_file(self): + """ Should return False if the pidfile contains no PID. """ + self.test_pidfile.read_pid.return_value = None + expected_result = False + result = daemon.runner.is_pidfile_stale(self.test_pidfile) + self.assertEqual(expected_result, result) + + def test_returns_false_if_process_exists(self): + """ Should return False if the process with its PID exists. """ + expected_result = False + result = daemon.runner.is_pidfile_stale(self.test_pidfile) + self.assertEqual(expected_result, result) + + def test_returns_true_if_process_does_not_exist(self): + """ Should return True if the process does not exist. """ + test_error = ProcessLookupError("No such process") + del os.kill.return_value + os.kill.side_effect = test_error + expected_result = True + result = daemon.runner.is_pidfile_stale(self.test_pidfile) + self.assertEqual(expected_result, result) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : |