diff options
Diffstat (limited to 'scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py')
-rwxr-xr-x | scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py | 835 |
1 files changed, 0 insertions, 835 deletions
diff --git a/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py b/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py deleted file mode 100755 index 2cae744a..00000000 --- a/scripts/external_libs/nose-1.3.4/nose/plugins/multiprocess.py +++ /dev/null @@ -1,835 +0,0 @@ -""" -Overview -======== - -The multiprocess plugin enables you to distribute your test run among a set of -worker processes that run tests in parallel. This can speed up CPU-bound test -runs (as long as the number of work processeses is around the number of -processors or cores available), but is mainly useful for IO-bound tests that -spend most of their time waiting for data to arrive from someplace else. - -.. note :: - - See :doc:`../doc_tests/test_multiprocess/multiprocess` for - additional documentation and examples. Use of this plugin on python - 2.5 or earlier requires the multiprocessing_ module, also available - from PyPI. - -.. _multiprocessing : http://code.google.com/p/python-multiprocessing/ - -How tests are distributed -========================= - -The ideal case would be to dispatch each test to a worker process -separately. This ideal is not attainable in all cases, however, because many -test suites depend on context (class, module or package) fixtures. - -The plugin can't know (unless you tell it -- see below!) if a context fixture -can be called many times concurrently (is re-entrant), or if it can be shared -among tests running in different processes. Therefore, if a context has -fixtures, the default behavior is to dispatch the entire suite to a worker as -a unit. - -Controlling distribution -^^^^^^^^^^^^^^^^^^^^^^^^ - -There are two context-level variables that you can use to control this default -behavior. - -If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` -in the context, and the plugin will dispatch tests in suites bound to that -context as if the context had no fixtures. This means that the fixtures will -execute concurrently and multiple times, typically once per test. - -If a context's fixtures can be shared by tests running in different processes --- such as a package-level fixture that starts an external http server or -initializes a shared database -- then set ``_multiprocess_shared_ = True`` in -the context. These fixtures will then execute in the primary nose process, and -tests in those contexts will be individually dispatched to run in parallel. - -How results are collected and reported -====================================== - -As each test or suite executes in a worker process, results (failures, errors, -and specially handled exceptions like SkipTest) are collected in that -process. When the worker process finishes, it returns results to the main -nose process. There, any progress output is printed (dots!), and the -results from the test run are combined into a consolidated result -set. When results have been received for all dispatched tests, or all -workers have died, the result summary is output as normal. - -Beware! -======= - -Not all test suites will benefit from, or even operate correctly using, this -plugin. For example, CPU-bound tests will run more slowly if you don't have -multiple processors. There are also some differences in plugin -interactions and behaviors due to the way in which tests are dispatched and -loaded. In general, test loading under this plugin operates as if it were -always in directed mode instead of discovered mode. For instance, doctests -in test modules will always be found when using this plugin with the doctest -plugin. - -But the biggest issue you will face is probably concurrency. Unless you -have kept your tests as religiously pure unit tests, with no side-effects, no -ordering issues, and no external dependencies, chances are you will experience -odd, intermittent and unexplainable failures and errors when using this -plugin. This doesn't necessarily mean the plugin is broken; it may mean that -your test suite is not safe for concurrency. - -New Features in 1.1.0 -===================== - -* functions generated by test generators are now added to the worker queue - making them multi-threaded. -* fixed timeout functionality, now functions will be terminated with a - TimedOutException exception when they exceed their execution time. The - worker processes are not terminated. -* added ``--process-restartworker`` option to restart workers once they are - done, this helps control memory usage. Sometimes memory leaks can accumulate - making long runs very difficult. -* added global _instantiate_plugins to configure which plugins are started - on the worker processes. - -""" - -import logging -import os -import sys -import time -import traceback -import unittest -import pickle -import signal -import nose.case -from nose.core import TextTestRunner -from nose import failure -from nose import loader -from nose.plugins.base import Plugin -from nose.pyversion import bytes_ -from nose.result import TextTestResult -from nose.suite import ContextSuite -from nose.util import test_address -try: - # 2.7+ - from unittest.runner import _WritelnDecorator -except ImportError: - from unittest import _WritelnDecorator -from Queue import Empty -from warnings import warn -try: - from cStringIO import StringIO -except ImportError: - import StringIO - -# this is a list of plugin classes that will be checked for and created inside -# each worker process -_instantiate_plugins = None - -log = logging.getLogger(__name__) - -Process = Queue = Pool = Event = Value = Array = None - -# have to inherit KeyboardInterrupt to it will interrupt process properly -class TimedOutException(KeyboardInterrupt): - def __init__(self, value = "Timed Out"): - self.value = value - def __str__(self): - return repr(self.value) - -def _import_mp(): - global Process, Queue, Pool, Event, Value, Array - try: - from multiprocessing import Manager, Process - #prevent the server process created in the manager which holds Python - #objects and allows other processes to manipulate them using proxies - #to interrupt on SIGINT (keyboardinterrupt) so that the communication - #channel between subprocesses and main process is still usable after - #ctrl+C is received in the main process. - old=signal.signal(signal.SIGINT, signal.SIG_IGN) - m = Manager() - #reset it back so main process will receive a KeyboardInterrupt - #exception on ctrl+c - signal.signal(signal.SIGINT, old) - Queue, Pool, Event, Value, Array = ( - m.Queue, m.Pool, m.Event, m.Value, m.Array - ) - except ImportError: - warn("multiprocessing module is not available, multiprocess plugin " - "cannot be used", RuntimeWarning) - - -class TestLet: - def __init__(self, case): - try: - self._id = case.id() - except AttributeError: - pass - self._short_description = case.shortDescription() - self._str = str(case) - - def id(self): - return self._id - - def shortDescription(self): - return self._short_description - - def __str__(self): - return self._str - -class MultiProcess(Plugin): - """ - Run tests in multiple processes. Requires processing module. - """ - score = 1000 - status = {} - - def options(self, parser, env): - """ - Register command-line options. - """ - parser.add_option("--processes", action="store", - default=env.get('NOSE_PROCESSES', 0), - dest="multiprocess_workers", - metavar="NUM", - help="Spread test run among this many processes. " - "Set a number equal to the number of processors " - "or cores in your machine for best results. " - "Pass a negative number to have the number of " - "processes automatically set to the number of " - "cores. Passing 0 means to disable parallel " - "testing. Default is 0 unless NOSE_PROCESSES is " - "set. " - "[NOSE_PROCESSES]") - parser.add_option("--process-timeout", action="store", - default=env.get('NOSE_PROCESS_TIMEOUT', 10), - dest="multiprocess_timeout", - metavar="SECONDS", - help="Set timeout for return of results from each " - "test runner process. Default is 10. " - "[NOSE_PROCESS_TIMEOUT]") - parser.add_option("--process-restartworker", action="store_true", - default=env.get('NOSE_PROCESS_RESTARTWORKER', False), - dest="multiprocess_restartworker", - help="If set, will restart each worker process once" - " their tests are done, this helps control memory " - "leaks from killing the system. " - "[NOSE_PROCESS_RESTARTWORKER]") - - def configure(self, options, config): - """ - Configure plugin. - """ - try: - self.status.pop('active') - except KeyError: - pass - if not hasattr(options, 'multiprocess_workers'): - self.enabled = False - return - # don't start inside of a worker process - if config.worker: - return - self.config = config - try: - workers = int(options.multiprocess_workers) - except (TypeError, ValueError): - workers = 0 - if workers: - _import_mp() - if Process is None: - self.enabled = False - return - # Negative number of workers will cause multiprocessing to hang. - # Set the number of workers to the CPU count to avoid this. - if workers < 0: - try: - import multiprocessing - workers = multiprocessing.cpu_count() - except NotImplementedError: - self.enabled = False - return - self.enabled = True - self.config.multiprocess_workers = workers - t = float(options.multiprocess_timeout) - self.config.multiprocess_timeout = t - r = int(options.multiprocess_restartworker) - self.config.multiprocess_restartworker = r - self.status['active'] = True - - def prepareTestLoader(self, loader): - """Remember loader class so MultiProcessTestRunner can instantiate - the right loader. - """ - self.loaderClass = loader.__class__ - - def prepareTestRunner(self, runner): - """Replace test runner with MultiProcessTestRunner. - """ - # replace with our runner class - return MultiProcessTestRunner(stream=runner.stream, - verbosity=self.config.verbosity, - config=self.config, - loaderClass=self.loaderClass) - -def signalhandler(sig, frame): - raise TimedOutException() - -class MultiProcessTestRunner(TextTestRunner): - waitkilltime = 5.0 # max time to wait to terminate a process that does not - # respond to SIGILL - def __init__(self, **kw): - self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader) - super(MultiProcessTestRunner, self).__init__(**kw) - - def collect(self, test, testQueue, tasks, to_teardown, result): - # dispatch and collect results - # put indexes only on queue because tests aren't picklable - for case in self.nextBatch(test): - log.debug("Next batch %s (%s)", case, type(case)) - if (isinstance(case, nose.case.Test) and - isinstance(case.test, failure.Failure)): - log.debug("Case is a Failure") - case(result) # run here to capture the failure - continue - # handle shared fixtures - if isinstance(case, ContextSuite) and case.context is failure.Failure: - log.debug("Case is a Failure") - case(result) # run here to capture the failure - continue - elif isinstance(case, ContextSuite) and self.sharedFixtures(case): - log.debug("%s has shared fixtures", case) - try: - case.setUp() - except (KeyboardInterrupt, SystemExit): - raise - except: - log.debug("%s setup failed", sys.exc_info()) - result.addError(case, sys.exc_info()) - else: - to_teardown.append(case) - if case.factory: - ancestors=case.factory.context.get(case, []) - for an in ancestors[:2]: - #log.debug('reset ancestor %s', an) - if getattr(an, '_multiprocess_shared_', False): - an._multiprocess_can_split_=True - #an._multiprocess_shared_=False - self.collect(case, testQueue, tasks, to_teardown, result) - - else: - test_addr = self.addtask(testQueue,tasks,case) - log.debug("Queued test %s (%s) to %s", - len(tasks), test_addr, testQueue) - - def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result): - currentaddr = Value('c',bytes_('')) - currentstart = Value('d',time.time()) - keyboardCaught = Event() - p = Process(target=runner, - args=(iworker, testQueue, - resultQueue, - currentaddr, - currentstart, - keyboardCaught, - shouldStop, - self.loaderClass, - result.__class__, - pickle.dumps(self.config))) - p.currentaddr = currentaddr - p.currentstart = currentstart - p.keyboardCaught = keyboardCaught - old = signal.signal(signal.SIGILL, signalhandler) - p.start() - signal.signal(signal.SIGILL, old) - return p - - def run(self, test): - """ - Execute the test (which may be a test suite). If the test is a suite, - distribute it out among as many processes as have been configured, at - as fine a level as is possible given the context fixtures defined in - the suite or any sub-suites. - - """ - log.debug("%s.run(%s) (%s)", self, test, os.getpid()) - wrapper = self.config.plugins.prepareTest(test) - if wrapper is not None: - test = wrapper - - # plugins can decorate or capture the output stream - wrapped = self.config.plugins.setOutputStream(self.stream) - if wrapped is not None: - self.stream = wrapped - - testQueue = Queue() - resultQueue = Queue() - tasks = [] - completed = [] - workers = [] - to_teardown = [] - shouldStop = Event() - - result = self._makeResult() - start = time.time() - - self.collect(test, testQueue, tasks, to_teardown, result) - - log.debug("Starting %s workers", self.config.multiprocess_workers) - for i in range(self.config.multiprocess_workers): - p = self.startProcess(i, testQueue, resultQueue, shouldStop, result) - workers.append(p) - log.debug("Started worker process %s", i+1) - - total_tasks = len(tasks) - # need to keep track of the next time to check for timeouts in case - # more than one process times out at the same time. - nexttimeout=self.config.multiprocess_timeout - thrownError = None - - try: - while tasks: - log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs", - len(completed), total_tasks,nexttimeout) - try: - iworker, addr, newtask_addrs, batch_result = resultQueue.get( - timeout=nexttimeout) - log.debug('Results received for worker %d, %s, new tasks: %d', - iworker,addr,len(newtask_addrs)) - try: - try: - tasks.remove(addr) - except ValueError: - log.warn('worker %s failed to remove from tasks: %s', - iworker,addr) - total_tasks += len(newtask_addrs) - tasks.extend(newtask_addrs) - except KeyError: - log.debug("Got result for unknown task? %s", addr) - log.debug("current: %s",str(list(tasks)[0])) - else: - completed.append([addr,batch_result]) - self.consolidate(result, batch_result) - if (self.config.stopOnError - and not result.wasSuccessful()): - # set the stop condition - shouldStop.set() - break - if self.config.multiprocess_restartworker: - log.debug('joining worker %s',iworker) - # wait for working, but not that important if worker - # cannot be joined in fact, for workers that add to - # testQueue, they will not terminate until all their - # items are read - workers[iworker].join(timeout=1) - if not shouldStop.is_set() and not testQueue.empty(): - log.debug('starting new process on worker %s',iworker) - workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result) - except Empty: - log.debug("Timed out with %s tasks pending " - "(empty testQueue=%r): %s", - len(tasks),testQueue.empty(),str(tasks)) - any_alive = False - for iworker, w in enumerate(workers): - if w.is_alive(): - worker_addr = bytes_(w.currentaddr.value,'ascii') - timeprocessing = time.time() - w.currentstart.value - if ( len(worker_addr) == 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): - log.debug('worker %d has finished its work item, ' - 'but is not exiting? do we wait for it?', - iworker) - else: - any_alive = True - if (len(worker_addr) > 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): - log.debug('timed out worker %s: %s', - iworker,worker_addr) - w.currentaddr.value = bytes_('') - # If the process is in C++ code, sending a SIGILL - # might not send a python KeybordInterrupt exception - # therefore, send multiple signals until an - # exception is caught. If this takes too long, then - # terminate the process - w.keyboardCaught.clear() - startkilltime = time.time() - while not w.keyboardCaught.is_set() and w.is_alive(): - if time.time()-startkilltime > self.waitkilltime: - # have to terminate... - log.error("terminating worker %s",iworker) - w.terminate() - # there is a small probability that the - # terminated process might send a result, - # which has to be specially handled or - # else processes might get orphaned. - workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result) - break - os.kill(w.pid, signal.SIGILL) - time.sleep(0.1) - if not any_alive and testQueue.empty(): - log.debug("All workers dead") - break - nexttimeout=self.config.multiprocess_timeout - for w in workers: - if w.is_alive() and len(w.currentaddr.value) > 0: - timeprocessing = time.time()-w.currentstart.value - if timeprocessing <= self.config.multiprocess_timeout: - nexttimeout = min(nexttimeout, - self.config.multiprocess_timeout-timeprocessing) - log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks)) - - except (KeyboardInterrupt, SystemExit), e: - log.info('parent received ctrl-c when waiting for test results') - thrownError = e - #resultQueue.get(False) - - result.addError(test, sys.exc_info()) - - try: - for case in to_teardown: - log.debug("Tearing down shared fixtures for %s", case) - try: - case.tearDown() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(case, sys.exc_info()) - - stop = time.time() - - # first write since can freeze on shutting down processes - result.printErrors() - result.printSummary(start, stop) - self.config.plugins.finalize(result) - - if thrownError is None: - log.debug("Tell all workers to stop") - for w in workers: - if w.is_alive(): - testQueue.put('STOP', block=False) - - # wait for the workers to end - for iworker,worker in enumerate(workers): - if worker.is_alive(): - log.debug('joining worker %s',iworker) - worker.join() - if worker.is_alive(): - log.debug('failed to join worker %s',iworker) - except (KeyboardInterrupt, SystemExit): - log.info('parent received ctrl-c when shutting down: stop all processes') - for worker in workers: - if worker.is_alive(): - worker.terminate() - - if thrownError: raise thrownError - else: raise - - return result - - def addtask(testQueue,tasks,case): - arg = None - if isinstance(case,nose.case.Test) and hasattr(case.test,'arg'): - # this removes the top level descriptor and allows real function - # name to be returned - case.test.descriptor = None - arg = case.test.arg - test_addr = MultiProcessTestRunner.address(case) - testQueue.put((test_addr,arg), block=False) - if arg is not None: - test_addr += str(arg) - if tasks is not None: - tasks.append(test_addr) - return test_addr - addtask = staticmethod(addtask) - - def address(case): - if hasattr(case, 'address'): - file, mod, call = case.address() - elif hasattr(case, 'context'): - file, mod, call = test_address(case.context) - else: - raise Exception("Unable to convert %s to address" % case) - parts = [] - if file is None: - if mod is None: - raise Exception("Unaddressable case %s" % case) - else: - parts.append(mod) - else: - # strip __init__.py(c) from end of file part - # if present, having it there confuses loader - dirname, basename = os.path.split(file) - if basename.startswith('__init__'): - file = dirname - parts.append(file) - if call is not None: - parts.append(call) - return ':'.join(map(str, parts)) - address = staticmethod(address) - - def nextBatch(self, test): - # allows tests or suites to mark themselves as not safe - # for multiprocess execution - if hasattr(test, 'context'): - if not getattr(test.context, '_multiprocess_', True): - return - - if ((isinstance(test, ContextSuite) - and test.hasFixtures(self.checkCanSplit)) - or not getattr(test, 'can_split', True) - or not isinstance(test, unittest.TestSuite)): - # regular test case, or a suite with context fixtures - - # special case: when run like nosetests path/to/module.py - # the top-level suite has only one item, and it shares - # the same context as that item. In that case, we want the - # item, not the top-level suite - if isinstance(test, ContextSuite): - contained = list(test) - if (len(contained) == 1 - and getattr(contained[0], - 'context', None) == test.context): - test = contained[0] - yield test - else: - # Suite is without fixtures at this level; but it may have - # fixtures at any deeper level, so we need to examine it all - # the way down to the case level - for case in test: - for batch in self.nextBatch(case): - yield batch - - def checkCanSplit(context, fixt): - """ - Callback that we use to check whether the fixtures found in a - context or ancestor are ones we care about. - - Contexts can tell us that their fixtures are reentrant by setting - _multiprocess_can_split_. So if we see that, we return False to - disregard those fixtures. - """ - if not fixt: - return False - if getattr(context, '_multiprocess_can_split_', False): - return False - return True - checkCanSplit = staticmethod(checkCanSplit) - - def sharedFixtures(self, case): - context = getattr(case, 'context', None) - if not context: - return False - return getattr(context, '_multiprocess_shared_', False) - - def consolidate(self, result, batch_result): - log.debug("batch result is %s" , batch_result) - try: - output, testsRun, failures, errors, errorClasses = batch_result - except ValueError: - log.debug("result in unexpected format %s", batch_result) - failure.Failure(*sys.exc_info())(result) - return - self.stream.write(output) - result.testsRun += testsRun - result.failures.extend(failures) - result.errors.extend(errors) - for key, (storage, label, isfail) in errorClasses.items(): - if key not in result.errorClasses: - # Ordinarily storage is result attribute - # but it's only processed through the errorClasses - # dict, so it's ok to fake it here - result.errorClasses[key] = ([], label, isfail) - mystorage, _junk, _junk = result.errorClasses[key] - mystorage.extend(storage) - log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun) - - -def runner(ix, testQueue, resultQueue, currentaddr, currentstart, - keyboardCaught, shouldStop, loaderClass, resultClass, config): - try: - try: - return __runner(ix, testQueue, resultQueue, currentaddr, currentstart, - keyboardCaught, shouldStop, loaderClass, resultClass, config) - except KeyboardInterrupt: - log.debug('Worker %s keyboard interrupt, stopping',ix) - except Empty: - log.debug("Worker %s timed out waiting for tasks", ix) - -def __runner(ix, testQueue, resultQueue, currentaddr, currentstart, - keyboardCaught, shouldStop, loaderClass, resultClass, config): - - config = pickle.loads(config) - dummy_parser = config.parserClass() - if _instantiate_plugins is not None: - for pluginclass in _instantiate_plugins: - plugin = pluginclass() - plugin.addOptions(dummy_parser,{}) - config.plugins.addPlugin(plugin) - config.plugins.configure(config.options,config) - config.plugins.begin() - log.debug("Worker %s executing, pid=%d", ix,os.getpid()) - loader = loaderClass(config=config) - loader.suiteClass.suiteClass = NoSharedFixtureContextSuite - - def get(): - return testQueue.get(timeout=config.multiprocess_timeout) - - def makeResult(): - stream = _WritelnDecorator(StringIO()) - result = resultClass(stream, descriptions=1, - verbosity=config.verbosity, - config=config) - plug_result = config.plugins.prepareTestResult(result) - if plug_result: - return plug_result - return result - - def batch(result): - failures = [(TestLet(c), err) for c, err in result.failures] - errors = [(TestLet(c), err) for c, err in result.errors] - errorClasses = {} - for key, (storage, label, isfail) in result.errorClasses.items(): - errorClasses[key] = ([(TestLet(c), err) for c, err in storage], - label, isfail) - return ( - result.stream.getvalue(), - result.testsRun, - failures, - errors, - errorClasses) - for test_addr, arg in iter(get, 'STOP'): - if shouldStop.is_set(): - log.exception('Worker %d STOPPED',ix) - break - result = makeResult() - test = loader.loadTestsFromNames([test_addr]) - test.testQueue = testQueue - test.tasks = [] - test.arg = arg - log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) - try: - if arg is not None: - test_addr = test_addr + str(arg) - currentaddr.value = bytes_(test_addr) - currentstart.value = time.time() - test(result) - currentaddr.value = bytes_('') - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except KeyboardInterrupt, e: #TimedOutException: - timeout = isinstance(e, TimedOutException) - if timeout: - keyboardCaught.set() - if len(currentaddr.value): - if timeout: - msg = 'Worker %s timed out, failing current test %s' - else: - msg = 'Worker %s keyboard interrupt, failing current test %s' - log.exception(msg,ix,test_addr) - currentaddr.value = bytes_('') - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - else: - if timeout: - msg = 'Worker %s test %s timed out' - else: - msg = 'Worker %s test %s keyboard interrupt' - log.debug(msg,ix,test_addr) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - if not timeout: - raise - except SystemExit: - currentaddr.value = bytes_('') - log.exception('Worker %s system exit',ix) - raise - except: - currentaddr.value = bytes_('') - log.exception("Worker %s error running test or returning " - "results",ix) - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - if config.multiprocess_restartworker: - break - log.debug("Worker %s ending", ix) - - -class NoSharedFixtureContextSuite(ContextSuite): - """ - Context suite that never fires shared fixtures. - - When a context sets _multiprocess_shared_, fixtures in that context - are executed by the main process. Using this suite class prevents them - from executing in the runner process as well. - - """ - testQueue = None - tasks = None - arg = None - def setupContext(self, context): - if getattr(context, '_multiprocess_shared_', False): - return - super(NoSharedFixtureContextSuite, self).setupContext(context) - - def teardownContext(self, context): - if getattr(context, '_multiprocess_shared_', False): - return - super(NoSharedFixtureContextSuite, self).teardownContext(context) - def run(self, result): - """Run tests in suite inside of suite fixtures. - """ - # proxy the result for myself - log.debug("suite %s (%s) run called, tests: %s", - id(self), self, self._tests) - if self.resultProxy: - result, orig = self.resultProxy(result, self), result - else: - result, orig = result, result - try: - #log.debug('setUp for %s', id(self)); - self.setUp() - except KeyboardInterrupt: - raise - except: - self.error_context = 'setup' - result.addError(self, self._exc_info()) - return - try: - for test in self._tests: - if (isinstance(test,nose.case.Test) - and self.arg is not None): - test.test.arg = self.arg - else: - test.arg = self.arg - test.testQueue = self.testQueue - test.tasks = self.tasks - if result.shouldStop: - log.debug("stopping") - break - # each nose.case.Test will create its own result proxy - # so the cases need the original result, to avoid proxy - # chains - #log.debug('running test %s in suite %s', test, self); - try: - test(orig) - except KeyboardInterrupt, e: - timeout = isinstance(e, TimedOutException) - if timeout: - msg = 'Timeout when running test %s in suite %s' - else: - msg = 'KeyboardInterrupt when running test %s in suite %s' - log.debug(msg, test, self) - err = (TimedOutException,TimedOutException(str(test)), - sys.exc_info()[2]) - test.config.plugins.addError(test,err) - orig.addError(test,err) - if not timeout: - raise - finally: - self.has_run = True - try: - #log.debug('tearDown for %s', id(self)); - self.tearDown() - except KeyboardInterrupt: - raise - except: - self.error_context = 'teardown' - result.addError(self, self._exc_info()) |