summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py')
-rwxr-xr-xscripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py835
1 files changed, 835 insertions, 0 deletions
diff --git a/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py b/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py
new file mode 100755
index 00000000..2cae744a
--- /dev/null
+++ b/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py
@@ -0,0 +1,835 @@
+"""
+Overview
+========
+
+The multiprocess plugin enables you to distribute your test run among a set of
+worker processes that run tests in parallel. This can speed up CPU-bound test
+runs (as long as the number of work processeses is around the number of
+processors or cores available), but is mainly useful for IO-bound tests that
+spend most of their time waiting for data to arrive from someplace else.
+
+.. note ::
+
+ See :doc:`../doc_tests/test_multiprocess/multiprocess` for
+ additional documentation and examples. Use of this plugin on python
+ 2.5 or earlier requires the multiprocessing_ module, also available
+ from PyPI.
+
+.. _multiprocessing : http://code.google.com/p/python-multiprocessing/
+
+How tests are distributed
+=========================
+
+The ideal case would be to dispatch each test to a worker process
+separately. This ideal is not attainable in all cases, however, because many
+test suites depend on context (class, module or package) fixtures.
+
+The plugin can't know (unless you tell it -- see below!) if a context fixture
+can be called many times concurrently (is re-entrant), or if it can be shared
+among tests running in different processes. Therefore, if a context has
+fixtures, the default behavior is to dispatch the entire suite to a worker as
+a unit.
+
+Controlling distribution
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+There are two context-level variables that you can use to control this default
+behavior.
+
+If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
+in the context, and the plugin will dispatch tests in suites bound to that
+context as if the context had no fixtures. This means that the fixtures will
+execute concurrently and multiple times, typically once per test.
+
+If a context's fixtures can be shared by tests running in different processes
+-- such as a package-level fixture that starts an external http server or
+initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
+the context. These fixtures will then execute in the primary nose process, and
+tests in those contexts will be individually dispatched to run in parallel.
+
+How results are collected and reported
+======================================
+
+As each test or suite executes in a worker process, results (failures, errors,
+and specially handled exceptions like SkipTest) are collected in that
+process. When the worker process finishes, it returns results to the main
+nose process. There, any progress output is printed (dots!), and the
+results from the test run are combined into a consolidated result
+set. When results have been received for all dispatched tests, or all
+workers have died, the result summary is output as normal.
+
+Beware!
+=======
+
+Not all test suites will benefit from, or even operate correctly using, this
+plugin. For example, CPU-bound tests will run more slowly if you don't have
+multiple processors. There are also some differences in plugin
+interactions and behaviors due to the way in which tests are dispatched and
+loaded. In general, test loading under this plugin operates as if it were
+always in directed mode instead of discovered mode. For instance, doctests
+in test modules will always be found when using this plugin with the doctest
+plugin.
+
+But the biggest issue you will face is probably concurrency. Unless you
+have kept your tests as religiously pure unit tests, with no side-effects, no
+ordering issues, and no external dependencies, chances are you will experience
+odd, intermittent and unexplainable failures and errors when using this
+plugin. This doesn't necessarily mean the plugin is broken; it may mean that
+your test suite is not safe for concurrency.
+
+New Features in 1.1.0
+=====================
+
+* functions generated by test generators are now added to the worker queue
+ making them multi-threaded.
+* fixed timeout functionality, now functions will be terminated with a
+ TimedOutException exception when they exceed their execution time. The
+ worker processes are not terminated.
+* added ``--process-restartworker`` option to restart workers once they are
+ done, this helps control memory usage. Sometimes memory leaks can accumulate
+ making long runs very difficult.
+* added global _instantiate_plugins to configure which plugins are started
+ on the worker processes.
+
+"""
+
+import logging
+import os
+import sys
+import time
+import traceback
+import unittest
+import pickle
+import signal
+import nose.case
+from nose.core import TextTestRunner
+from nose import failure
+from nose import loader
+from nose.plugins.base import Plugin
+from nose.pyversion import bytes_
+from nose.result import TextTestResult
+from nose.suite import ContextSuite
+from nose.util import test_address
+try:
+ # 2.7+
+ from unittest.runner import _WritelnDecorator
+except ImportError:
+ from unittest import _WritelnDecorator
+from Queue import Empty
+from warnings import warn
+try:
+ from cStringIO import StringIO
+except ImportError:
+ import StringIO
+
+# this is a list of plugin classes that will be checked for and created inside
+# each worker process
+_instantiate_plugins = None
+
+log = logging.getLogger(__name__)
+
+Process = Queue = Pool = Event = Value = Array = None
+
+# have to inherit KeyboardInterrupt to it will interrupt process properly
+class TimedOutException(KeyboardInterrupt):
+ def __init__(self, value = "Timed Out"):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+def _import_mp():
+ global Process, Queue, Pool, Event, Value, Array
+ try:
+ from multiprocessing import Manager, Process
+ #prevent the server process created in the manager which holds Python
+ #objects and allows other processes to manipulate them using proxies
+ #to interrupt on SIGINT (keyboardinterrupt) so that the communication
+ #channel between subprocesses and main process is still usable after
+ #ctrl+C is received in the main process.
+ old=signal.signal(signal.SIGINT, signal.SIG_IGN)
+ m = Manager()
+ #reset it back so main process will receive a KeyboardInterrupt
+ #exception on ctrl+c
+ signal.signal(signal.SIGINT, old)
+ Queue, Pool, Event, Value, Array = (
+ m.Queue, m.Pool, m.Event, m.Value, m.Array
+ )
+ except ImportError:
+ warn("multiprocessing module is not available, multiprocess plugin "
+ "cannot be used", RuntimeWarning)
+
+
+class TestLet:
+ def __init__(self, case):
+ try:
+ self._id = case.id()
+ except AttributeError:
+ pass
+ self._short_description = case.shortDescription()
+ self._str = str(case)
+
+ def id(self):
+ return self._id
+
+ def shortDescription(self):
+ return self._short_description
+
+ def __str__(self):
+ return self._str
+
+class MultiProcess(Plugin):
+ """
+ Run tests in multiple processes. Requires processing module.
+ """
+ score = 1000
+ status = {}
+
+ def options(self, parser, env):
+ """
+ Register command-line options.
+ """
+ parser.add_option("--processes", action="store",
+ default=env.get('NOSE_PROCESSES', 0),
+ dest="multiprocess_workers",
+ metavar="NUM",
+ help="Spread test run among this many processes. "
+ "Set a number equal to the number of processors "
+ "or cores in your machine for best results. "
+ "Pass a negative number to have the number of "
+ "processes automatically set to the number of "
+ "cores. Passing 0 means to disable parallel "
+ "testing. Default is 0 unless NOSE_PROCESSES is "
+ "set. "
+ "[NOSE_PROCESSES]")
+ parser.add_option("--process-timeout", action="store",
+ default=env.get('NOSE_PROCESS_TIMEOUT', 10),
+ dest="multiprocess_timeout",
+ metavar="SECONDS",
+ help="Set timeout for return of results from each "
+ "test runner process. Default is 10. "
+ "[NOSE_PROCESS_TIMEOUT]")
+ parser.add_option("--process-restartworker", action="store_true",
+ default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
+ dest="multiprocess_restartworker",
+ help="If set, will restart each worker process once"
+ " their tests are done, this helps control memory "
+ "leaks from killing the system. "
+ "[NOSE_PROCESS_RESTARTWORKER]")
+
+ def configure(self, options, config):
+ """
+ Configure plugin.
+ """
+ try:
+ self.status.pop('active')
+ except KeyError:
+ pass
+ if not hasattr(options, 'multiprocess_workers'):
+ self.enabled = False
+ return
+ # don't start inside of a worker process
+ if config.worker:
+ return
+ self.config = config
+ try:
+ workers = int(options.multiprocess_workers)
+ except (TypeError, ValueError):
+ workers = 0
+ if workers:
+ _import_mp()
+ if Process is None:
+ self.enabled = False
+ return
+ # Negative number of workers will cause multiprocessing to hang.
+ # Set the number of workers to the CPU count to avoid this.
+ if workers < 0:
+ try:
+ import multiprocessing
+ workers = multiprocessing.cpu_count()
+ except NotImplementedError:
+ self.enabled = False
+ return
+ self.enabled = True
+ self.config.multiprocess_workers = workers
+ t = float(options.multiprocess_timeout)
+ self.config.multiprocess_timeout = t
+ r = int(options.multiprocess_restartworker)
+ self.config.multiprocess_restartworker = r
+ self.status['active'] = True
+
+ def prepareTestLoader(self, loader):
+ """Remember loader class so MultiProcessTestRunner can instantiate
+ the right loader.
+ """
+ self.loaderClass = loader.__class__
+
+ def prepareTestRunner(self, runner):
+ """Replace test runner with MultiProcessTestRunner.
+ """
+ # replace with our runner class
+ return MultiProcessTestRunner(stream=runner.stream,
+ verbosity=self.config.verbosity,
+ config=self.config,
+ loaderClass=self.loaderClass)
+
+def signalhandler(sig, frame):
+ raise TimedOutException()
+
+class MultiProcessTestRunner(TextTestRunner):
+ waitkilltime = 5.0 # max time to wait to terminate a process that does not
+ # respond to SIGILL
+ def __init__(self, **kw):
+ self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
+ super(MultiProcessTestRunner, self).__init__(**kw)
+
+ def collect(self, test, testQueue, tasks, to_teardown, result):
+ # dispatch and collect results
+ # put indexes only on queue because tests aren't picklable
+ for case in self.nextBatch(test):
+ log.debug("Next batch %s (%s)", case, type(case))
+ if (isinstance(case, nose.case.Test) and
+ isinstance(case.test, failure.Failure)):
+ log.debug("Case is a Failure")
+ case(result) # run here to capture the failure
+ continue
+ # handle shared fixtures
+ if isinstance(case, ContextSuite) and case.context is failure.Failure:
+ log.debug("Case is a Failure")
+ case(result) # run here to capture the failure
+ continue
+ elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
+ log.debug("%s has shared fixtures", case)
+ try:
+ case.setUp()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ log.debug("%s setup failed", sys.exc_info())
+ result.addError(case, sys.exc_info())
+ else:
+ to_teardown.append(case)
+ if case.factory:
+ ancestors=case.factory.context.get(case, [])
+ for an in ancestors[:2]:
+ #log.debug('reset ancestor %s', an)
+ if getattr(an, '_multiprocess_shared_', False):
+ an._multiprocess_can_split_=True
+ #an._multiprocess_shared_=False
+ self.collect(case, testQueue, tasks, to_teardown, result)
+
+ else:
+ test_addr = self.addtask(testQueue,tasks,case)
+ log.debug("Queued test %s (%s) to %s",
+ len(tasks), test_addr, testQueue)
+
+ def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
+ keyboardCaught = Event()
+ p = Process(target=runner,
+ args=(iworker, testQueue,
+ resultQueue,
+ currentaddr,
+ currentstart,
+ keyboardCaught,
+ shouldStop,
+ self.loaderClass,
+ result.__class__,
+ pickle.dumps(self.config)))
+ p.currentaddr = currentaddr
+ p.currentstart = currentstart
+ p.keyboardCaught = keyboardCaught
+ old = signal.signal(signal.SIGILL, signalhandler)
+ p.start()
+ signal.signal(signal.SIGILL, old)
+ return p
+
+ def run(self, test):
+ """
+ Execute the test (which may be a test suite). If the test is a suite,
+ distribute it out among as many processes as have been configured, at
+ as fine a level as is possible given the context fixtures defined in
+ the suite or any sub-suites.
+
+ """
+ log.debug("%s.run(%s) (%s)", self, test, os.getpid())
+ wrapper = self.config.plugins.prepareTest(test)
+ if wrapper is not None:
+ test = wrapper
+
+ # plugins can decorate or capture the output stream
+ wrapped = self.config.plugins.setOutputStream(self.stream)
+ if wrapped is not None:
+ self.stream = wrapped
+
+ testQueue = Queue()
+ resultQueue = Queue()
+ tasks = []
+ completed = []
+ workers = []
+ to_teardown = []
+ shouldStop = Event()
+
+ result = self._makeResult()
+ start = time.time()
+
+ self.collect(test, testQueue, tasks, to_teardown, result)
+
+ log.debug("Starting %s workers", self.config.multiprocess_workers)
+ for i in range(self.config.multiprocess_workers):
+ p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
+ workers.append(p)
+ log.debug("Started worker process %s", i+1)
+
+ total_tasks = len(tasks)
+ # need to keep track of the next time to check for timeouts in case
+ # more than one process times out at the same time.
+ nexttimeout=self.config.multiprocess_timeout
+ thrownError = None
+
+ try:
+ while tasks:
+ log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
+ len(completed), total_tasks,nexttimeout)
+ try:
+ iworker, addr, newtask_addrs, batch_result = resultQueue.get(
+ timeout=nexttimeout)
+ log.debug('Results received for worker %d, %s, new tasks: %d',
+ iworker,addr,len(newtask_addrs))
+ try:
+ try:
+ tasks.remove(addr)
+ except ValueError:
+ log.warn('worker %s failed to remove from tasks: %s',
+ iworker,addr)
+ total_tasks += len(newtask_addrs)
+ tasks.extend(newtask_addrs)
+ except KeyError:
+ log.debug("Got result for unknown task? %s", addr)
+ log.debug("current: %s",str(list(tasks)[0]))
+ else:
+ completed.append([addr,batch_result])
+ self.consolidate(result, batch_result)
+ if (self.config.stopOnError
+ and not result.wasSuccessful()):
+ # set the stop condition
+ shouldStop.set()
+ break
+ if self.config.multiprocess_restartworker:
+ log.debug('joining worker %s',iworker)
+ # wait for working, but not that important if worker
+ # cannot be joined in fact, for workers that add to
+ # testQueue, they will not terminate until all their
+ # items are read
+ workers[iworker].join(timeout=1)
+ if not shouldStop.is_set() and not testQueue.empty():
+ log.debug('starting new process on worker %s',iworker)
+ workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ except Empty:
+ log.debug("Timed out with %s tasks pending "
+ "(empty testQueue=%r): %s",
+ len(tasks),testQueue.empty(),str(tasks))
+ any_alive = False
+ for iworker, w in enumerate(workers):
+ if w.is_alive():
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('worker %d has finished its work item, '
+ 'but is not exiting? do we wait for it?',
+ iworker)
+ else:
+ any_alive = True
+ if (len(worker_addr) > 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('timed out worker %s: %s',
+ iworker,worker_addr)
+ w.currentaddr.value = bytes_('')
+ # If the process is in C++ code, sending a SIGILL
+ # might not send a python KeybordInterrupt exception
+ # therefore, send multiple signals until an
+ # exception is caught. If this takes too long, then
+ # terminate the process
+ w.keyboardCaught.clear()
+ startkilltime = time.time()
+ while not w.keyboardCaught.is_set() and w.is_alive():
+ if time.time()-startkilltime > self.waitkilltime:
+ # have to terminate...
+ log.error("terminating worker %s",iworker)
+ w.terminate()
+ # there is a small probability that the
+ # terminated process might send a result,
+ # which has to be specially handled or
+ # else processes might get orphaned.
+ workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ break
+ os.kill(w.pid, signal.SIGILL)
+ time.sleep(0.1)
+ if not any_alive and testQueue.empty():
+ log.debug("All workers dead")
+ break
+ nexttimeout=self.config.multiprocess_timeout
+ for w in workers:
+ if w.is_alive() and len(w.currentaddr.value) > 0:
+ timeprocessing = time.time()-w.currentstart.value
+ if timeprocessing <= self.config.multiprocess_timeout:
+ nexttimeout = min(nexttimeout,
+ self.config.multiprocess_timeout-timeprocessing)
+ log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
+
+ except (KeyboardInterrupt, SystemExit), e:
+ log.info('parent received ctrl-c when waiting for test results')
+ thrownError = e
+ #resultQueue.get(False)
+
+ result.addError(test, sys.exc_info())
+
+ try:
+ for case in to_teardown:
+ log.debug("Tearing down shared fixtures for %s", case)
+ try:
+ case.tearDown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ result.addError(case, sys.exc_info())
+
+ stop = time.time()
+
+ # first write since can freeze on shutting down processes
+ result.printErrors()
+ result.printSummary(start, stop)
+ self.config.plugins.finalize(result)
+
+ if thrownError is None:
+ log.debug("Tell all workers to stop")
+ for w in workers:
+ if w.is_alive():
+ testQueue.put('STOP', block=False)
+
+ # wait for the workers to end
+ for iworker,worker in enumerate(workers):
+ if worker.is_alive():
+ log.debug('joining worker %s',iworker)
+ worker.join()
+ if worker.is_alive():
+ log.debug('failed to join worker %s',iworker)
+ except (KeyboardInterrupt, SystemExit):
+ log.info('parent received ctrl-c when shutting down: stop all processes')
+ for worker in workers:
+ if worker.is_alive():
+ worker.terminate()
+
+ if thrownError: raise thrownError
+ else: raise
+
+ return result
+
+ def addtask(testQueue,tasks,case):
+ arg = None
+ if isinstance(case,nose.case.Test) and hasattr(case.test,'arg'):
+ # this removes the top level descriptor and allows real function
+ # name to be returned
+ case.test.descriptor = None
+ arg = case.test.arg
+ test_addr = MultiProcessTestRunner.address(case)
+ testQueue.put((test_addr,arg), block=False)
+ if arg is not None:
+ test_addr += str(arg)
+ if tasks is not None:
+ tasks.append(test_addr)
+ return test_addr
+ addtask = staticmethod(addtask)
+
+ def address(case):
+ if hasattr(case, 'address'):
+ file, mod, call = case.address()
+ elif hasattr(case, 'context'):
+ file, mod, call = test_address(case.context)
+ else:
+ raise Exception("Unable to convert %s to address" % case)
+ parts = []
+ if file is None:
+ if mod is None:
+ raise Exception("Unaddressable case %s" % case)
+ else:
+ parts.append(mod)
+ else:
+ # strip __init__.py(c) from end of file part
+ # if present, having it there confuses loader
+ dirname, basename = os.path.split(file)
+ if basename.startswith('__init__'):
+ file = dirname
+ parts.append(file)
+ if call is not None:
+ parts.append(call)
+ return ':'.join(map(str, parts))
+ address = staticmethod(address)
+
+ def nextBatch(self, test):
+ # allows tests or suites to mark themselves as not safe
+ # for multiprocess execution
+ if hasattr(test, 'context'):
+ if not getattr(test.context, '_multiprocess_', True):
+ return
+
+ if ((isinstance(test, ContextSuite)
+ and test.hasFixtures(self.checkCanSplit))
+ or not getattr(test, 'can_split', True)
+ or not isinstance(test, unittest.TestSuite)):
+ # regular test case, or a suite with context fixtures
+
+ # special case: when run like nosetests path/to/module.py
+ # the top-level suite has only one item, and it shares
+ # the same context as that item. In that case, we want the
+ # item, not the top-level suite
+ if isinstance(test, ContextSuite):
+ contained = list(test)
+ if (len(contained) == 1
+ and getattr(contained[0],
+ 'context', None) == test.context):
+ test = contained[0]
+ yield test
+ else:
+ # Suite is without fixtures at this level; but it may have
+ # fixtures at any deeper level, so we need to examine it all
+ # the way down to the case level
+ for case in test:
+ for batch in self.nextBatch(case):
+ yield batch
+
+ def checkCanSplit(context, fixt):
+ """
+ Callback that we use to check whether the fixtures found in a
+ context or ancestor are ones we care about.
+
+ Contexts can tell us that their fixtures are reentrant by setting
+ _multiprocess_can_split_. So if we see that, we return False to
+ disregard those fixtures.
+ """
+ if not fixt:
+ return False
+ if getattr(context, '_multiprocess_can_split_', False):
+ return False
+ return True
+ checkCanSplit = staticmethod(checkCanSplit)
+
+ def sharedFixtures(self, case):
+ context = getattr(case, 'context', None)
+ if not context:
+ return False
+ return getattr(context, '_multiprocess_shared_', False)
+
+ def consolidate(self, result, batch_result):
+ log.debug("batch result is %s" , batch_result)
+ try:
+ output, testsRun, failures, errors, errorClasses = batch_result
+ except ValueError:
+ log.debug("result in unexpected format %s", batch_result)
+ failure.Failure(*sys.exc_info())(result)
+ return
+ self.stream.write(output)
+ result.testsRun += testsRun
+ result.failures.extend(failures)
+ result.errors.extend(errors)
+ for key, (storage, label, isfail) in errorClasses.items():
+ if key not in result.errorClasses:
+ # Ordinarily storage is result attribute
+ # but it's only processed through the errorClasses
+ # dict, so it's ok to fake it here
+ result.errorClasses[key] = ([], label, isfail)
+ mystorage, _junk, _junk = result.errorClasses[key]
+ mystorage.extend(storage)
+ log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)
+
+
+def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config):
+ try:
+ try:
+ return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config)
+ except KeyboardInterrupt:
+ log.debug('Worker %s keyboard interrupt, stopping',ix)
+ except Empty:
+ log.debug("Worker %s timed out waiting for tasks", ix)
+
+def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config):
+
+ config = pickle.loads(config)
+ dummy_parser = config.parserClass()
+ if _instantiate_plugins is not None:
+ for pluginclass in _instantiate_plugins:
+ plugin = pluginclass()
+ plugin.addOptions(dummy_parser,{})
+ config.plugins.addPlugin(plugin)
+ config.plugins.configure(config.options,config)
+ config.plugins.begin()
+ log.debug("Worker %s executing, pid=%d", ix,os.getpid())
+ loader = loaderClass(config=config)
+ loader.suiteClass.suiteClass = NoSharedFixtureContextSuite
+
+ def get():
+ return testQueue.get(timeout=config.multiprocess_timeout)
+
+ def makeResult():
+ stream = _WritelnDecorator(StringIO())
+ result = resultClass(stream, descriptions=1,
+ verbosity=config.verbosity,
+ config=config)
+ plug_result = config.plugins.prepareTestResult(result)
+ if plug_result:
+ return plug_result
+ return result
+
+ def batch(result):
+ failures = [(TestLet(c), err) for c, err in result.failures]
+ errors = [(TestLet(c), err) for c, err in result.errors]
+ errorClasses = {}
+ for key, (storage, label, isfail) in result.errorClasses.items():
+ errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
+ label, isfail)
+ return (
+ result.stream.getvalue(),
+ result.testsRun,
+ failures,
+ errors,
+ errorClasses)
+ for test_addr, arg in iter(get, 'STOP'):
+ if shouldStop.is_set():
+ log.exception('Worker %d STOPPED',ix)
+ break
+ result = makeResult()
+ test = loader.loadTestsFromNames([test_addr])
+ test.testQueue = testQueue
+ test.tasks = []
+ test.arg = arg
+ log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
+ try:
+ if arg is not None:
+ test_addr = test_addr + str(arg)
+ currentaddr.value = bytes_(test_addr)
+ currentstart.value = time.time()
+ test(result)
+ currentaddr.value = bytes_('')
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ except KeyboardInterrupt, e: #TimedOutException:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ keyboardCaught.set()
+ if len(currentaddr.value):
+ if timeout:
+ msg = 'Worker %s timed out, failing current test %s'
+ else:
+ msg = 'Worker %s keyboard interrupt, failing current test %s'
+ log.exception(msg,ix,test_addr)
+ currentaddr.value = bytes_('')
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ else:
+ if timeout:
+ msg = 'Worker %s test %s timed out'
+ else:
+ msg = 'Worker %s test %s keyboard interrupt'
+ log.debug(msg,ix,test_addr)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if not timeout:
+ raise
+ except SystemExit:
+ currentaddr.value = bytes_('')
+ log.exception('Worker %s system exit',ix)
+ raise
+ except:
+ currentaddr.value = bytes_('')
+ log.exception("Worker %s error running test or returning "
+ "results",ix)
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if config.multiprocess_restartworker:
+ break
+ log.debug("Worker %s ending", ix)
+
+
+class NoSharedFixtureContextSuite(ContextSuite):
+ """
+ Context suite that never fires shared fixtures.
+
+ When a context sets _multiprocess_shared_, fixtures in that context
+ are executed by the main process. Using this suite class prevents them
+ from executing in the runner process as well.
+
+ """
+ testQueue = None
+ tasks = None
+ arg = None
+ def setupContext(self, context):
+ if getattr(context, '_multiprocess_shared_', False):
+ return
+ super(NoSharedFixtureContextSuite, self).setupContext(context)
+
+ def teardownContext(self, context):
+ if getattr(context, '_multiprocess_shared_', False):
+ return
+ super(NoSharedFixtureContextSuite, self).teardownContext(context)
+ def run(self, result):
+ """Run tests in suite inside of suite fixtures.
+ """
+ # proxy the result for myself
+ log.debug("suite %s (%s) run called, tests: %s",
+ id(self), self, self._tests)
+ if self.resultProxy:
+ result, orig = self.resultProxy(result, self), result
+ else:
+ result, orig = result, result
+ try:
+ #log.debug('setUp for %s', id(self));
+ self.setUp()
+ except KeyboardInterrupt:
+ raise
+ except:
+ self.error_context = 'setup'
+ result.addError(self, self._exc_info())
+ return
+ try:
+ for test in self._tests:
+ if (isinstance(test,nose.case.Test)
+ and self.arg is not None):
+ test.test.arg = self.arg
+ else:
+ test.arg = self.arg
+ test.testQueue = self.testQueue
+ test.tasks = self.tasks
+ if result.shouldStop:
+ log.debug("stopping")
+ break
+ # each nose.case.Test will create its own result proxy
+ # so the cases need the original result, to avoid proxy
+ # chains
+ #log.debug('running test %s in suite %s', test, self);
+ try:
+ test(orig)
+ except KeyboardInterrupt, e:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ msg = 'Timeout when running test %s in suite %s'
+ else:
+ msg = 'KeyboardInterrupt when running test %s in suite %s'
+ log.debug(msg, test, self)
+ err = (TimedOutException,TimedOutException(str(test)),
+ sys.exc_info()[2])
+ test.config.plugins.addError(test,err)
+ orig.addError(test,err)
+ if not timeout:
+ raise
+ finally:
+ self.has_run = True
+ try:
+ #log.debug('tearDown for %s', id(self));
+ self.tearDown()
+ except KeyboardInterrupt:
+ raise
+ except:
+ self.error_context = 'teardown'
+ result.addError(self, self._exc_info())