# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Test the interaction between trial and errors logged during test run. """ import time from twisted.internet import reactor, task from twisted.python import failure, log from twisted.trial import _synctest, reporter, unittest def makeFailure(): """ Return a new, realistic failure. """ try: 1 / 0 except ZeroDivisionError: f = failure.Failure() return f class Mask: """ Hide C{MockTest}s from Trial's automatic test finder. """ class FailureLoggingMixin: def test_silent(self): """ Don't log any errors. """ def test_single(self): """ Log a single error. """ log.err(makeFailure()) def test_double(self): """ Log two errors. """ log.err(makeFailure()) log.err(makeFailure()) def test_singleThenFail(self): """ Log a single error, then fail. """ log.err(makeFailure()) 1 + None class SynchronousFailureLogging(FailureLoggingMixin, unittest.SynchronousTestCase): pass class AsynchronousFailureLogging(FailureLoggingMixin, unittest.TestCase): def test_inCallback(self): """ Log an error in an asynchronous callback. """ return task.deferLater(reactor, 0, lambda: log.err(makeFailure())) class ObserverTests(unittest.SynchronousTestCase): """ Tests for L{_synctest._LogObserver}, a helper for the implementation of L{SynchronousTestCase.flushLoggedErrors}. """ def setUp(self): self.result = reporter.TestResult() self.observer = _synctest._LogObserver() def test_msg(self): """ Test that a standard log message doesn't go anywhere near the result. """ self.observer.gotEvent( { "message": ("some message",), "time": time.time(), "isError": 0, "system": "-", } ) self.assertEqual(self.observer.getErrors(), []) def test_error(self): """ Test that an observed error gets added to the result """ f = makeFailure() self.observer.gotEvent( { "message": (), "time": time.time(), "isError": 1, "system": "-", "failure": f, "why": None, } ) self.assertEqual(self.observer.getErrors(), [f]) def test_flush(self): """ Check that flushing the observer with no args removes all errors. """ self.test_error() flushed = self.observer.flushErrors() self.assertEqual(self.observer.getErrors(), []) self.assertEqual(len(flushed), 1) self.assertTrue(flushed[0].check(ZeroDivisionError)) def _makeRuntimeFailure(self): return failure.Failure(RuntimeError("test error")) def test_flushByType(self): """ Check that flushing the observer remove all failures of the given type. """ self.test_error() # log a ZeroDivisionError to the observer f = self._makeRuntimeFailure() self.observer.gotEvent( dict( message=(), time=time.time(), isError=1, system="-", failure=f, why=None ) ) flushed = self.observer.flushErrors(ZeroDivisionError) self.assertEqual(self.observer.getErrors(), [f]) self.assertEqual(len(flushed), 1) self.assertTrue(flushed[0].check(ZeroDivisionError)) def test_ignoreErrors(self): """ Check that C{_ignoreErrors} actually causes errors to be ignored. """ self.observer._ignoreErrors(ZeroDivisionError) f = makeFailure() self.observer.gotEvent( { "message": (), "time": time.time(), "isError": 1, "system": "-", "failure": f, "why": None, } ) self.assertEqual(self.observer.getErrors(), []) def test_clearIgnores(self): """ Check that C{_clearIgnores} ensures that previously ignored errors get captured. """ self.observer._ignoreErrors(ZeroDivisionError) self.observer._clearIgnores() f = makeFailure() self.observer.gotEvent( { "message": (), "time": time.time(), "isError": 1, "system": "-", "failure": f, "why": None, } ) self.assertEqual(self.observer.getErrors(), [f]) class LogErrorsMixin: """ High-level tests demonstrating the expected behaviour of logged errors during tests. """ def setUp(self): self.result = reporter.TestResult() def tearDown(self): self.flushLoggedErrors(ZeroDivisionError) def test_singleError(self): """ Test that a logged error gets reported as a test error. """ test = self.MockTest("test_single") test(self.result) self.assertEqual(len(self.result.errors), 1) self.assertTrue( self.result.errors[0][1].check(ZeroDivisionError), self.result.errors[0][1] ) self.assertEqual(0, self.result.successes) def test_twoErrors(self): """ Test that when two errors get logged, they both get reported as test errors. """ test = self.MockTest("test_double") test(self.result) self.assertEqual(len(self.result.errors), 2) self.assertEqual(0, self.result.successes) def test_errorsIsolated(self): """ Check that an error logged in one test doesn't fail the next test. """ t1 = self.MockTest("test_single") t2 = self.MockTest("test_silent") t1(self.result) t2(self.result) self.assertEqual(len(self.result.errors), 1) self.assertEqual(self.result.errors[0][0], t1) self.assertEqual(1, self.result.successes) def test_errorsIsolatedWhenTestFails(self): """ An error logged in a failed test doesn't fail the next test. """ t1 = self.MockTest("test_singleThenFail") t2 = self.MockTest("test_silent") t1(self.result) t2(self.result) self.assertEqual(len(self.result.errors), 2) self.assertEqual(self.result.errors[0][0], t1) self.result.errors[0][1].trap(TypeError) self.assertEqual(self.result.errors[1][0], t1) self.result.errors[1][1].trap(ZeroDivisionError) self.assertEqual(1, self.result.successes) def test_boundedObservers(self): """ There are no extra log observers after a test runs. """ # XXX trial is *all about* global log state. It should really be fixed. observer = _synctest._LogObserver() self.patch(_synctest, "_logObserver", observer) observers = log.theLogPublisher.observers[:] test = self.MockTest() test(self.result) self.assertEqual(observers, log.theLogPublisher.observers) class SynchronousLogErrorsTests(LogErrorsMixin, unittest.SynchronousTestCase): MockTest = Mask.SynchronousFailureLogging class AsynchronousLogErrorsTests(LogErrorsMixin, unittest.TestCase): MockTest = Mask.AsynchronousFailureLogging def test_inCallback(self): """ Test that errors logged in callbacks get reported as test errors. """ test = self.MockTest("test_inCallback") test(self.result) self.assertEqual(len(self.result.errors), 1) self.assertTrue( self.result.errors[0][1].check(ZeroDivisionError), self.result.errors[0][1] )