| import os, sys, unittest, getopt, time |
| |
| use_resources = [] |
| |
| import ctypes |
| ctypes_symbols = dir(ctypes) |
| |
| def need_symbol(name): |
| return unittest.skipUnless(name in ctypes_symbols, |
| '{!r} is required'.format(name)) |
| |
| |
| class ResourceDenied(unittest.SkipTest): |
| """Test skipped because it requested a disallowed resource. |
| |
| This is raised when a test calls requires() for a resource that |
| has not be enabled. Resources are defined by test modules. |
| """ |
| |
| def is_resource_enabled(resource): |
| """Test whether a resource is enabled. |
| |
| If the caller's module is __main__ then automatically return True.""" |
| if sys._getframe().f_back.f_globals.get("__name__") == "__main__": |
| return True |
| result = use_resources is not None and \ |
| (resource in use_resources or "*" in use_resources) |
| if not result: |
| _unavail[resource] = None |
| return result |
| |
| _unavail = {} |
| def requires(resource, msg=None): |
| """Raise ResourceDenied if the specified resource is not available. |
| |
| If the caller's module is __main__ then automatically return True.""" |
| # see if the caller's module is __main__ - if so, treat as if |
| # the resource was set |
| if sys._getframe().f_back.f_globals.get("__name__") == "__main__": |
| return |
| if not is_resource_enabled(resource): |
| if msg is None: |
| msg = "Use of the `%s' resource not enabled" % resource |
| raise ResourceDenied(msg) |
| |
| def find_package_modules(package, mask): |
| import fnmatch |
| if (hasattr(package, "__loader__") and |
| hasattr(package.__loader__, '_files')): |
| path = package.__name__.replace(".", os.path.sep) |
| mask = os.path.join(path, mask) |
| for fnm in package.__loader__._files.iterkeys(): |
| if fnmatch.fnmatchcase(fnm, mask): |
| yield os.path.splitext(fnm)[0].replace(os.path.sep, ".") |
| else: |
| path = package.__path__[0] |
| for fnm in os.listdir(path): |
| if fnmatch.fnmatchcase(fnm, mask): |
| yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0]) |
| |
| def get_tests(package, mask, verbosity, exclude=()): |
| """Return a list of skipped test modules, and a list of test cases.""" |
| tests = [] |
| skipped = [] |
| for modname in find_package_modules(package, mask): |
| if modname.split(".")[-1] in exclude: |
| skipped.append(modname) |
| if verbosity > 1: |
| print >> sys.stderr, "Skipped %s: excluded" % modname |
| continue |
| try: |
| mod = __import__(modname, globals(), locals(), ['*']) |
| except (ResourceDenied, unittest.SkipTest) as detail: |
| skipped.append(modname) |
| if verbosity > 1: |
| print >> sys.stderr, "Skipped %s: %s" % (modname, detail) |
| continue |
| for name in dir(mod): |
| if name.startswith("_"): |
| continue |
| o = getattr(mod, name) |
| if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase): |
| tests.append(o) |
| return skipped, tests |
| |
| def usage(): |
| print __doc__ |
| return 1 |
| |
| def test_with_refcounts(runner, verbosity, testcase): |
| """Run testcase several times, tracking reference counts.""" |
| import gc |
| import ctypes |
| ptc = ctypes._pointer_type_cache.copy() |
| cfc = ctypes._c_functype_cache.copy() |
| wfc = ctypes._win_functype_cache.copy() |
| |
| # when searching for refcount leaks, we have to manually reset any |
| # caches that ctypes has. |
| def cleanup(): |
| ctypes._pointer_type_cache = ptc.copy() |
| ctypes._c_functype_cache = cfc.copy() |
| ctypes._win_functype_cache = wfc.copy() |
| gc.collect() |
| |
| test = unittest.makeSuite(testcase) |
| for i in range(5): |
| rc = sys.gettotalrefcount() |
| runner.run(test) |
| cleanup() |
| COUNT = 5 |
| refcounts = [None] * COUNT |
| for i in range(COUNT): |
| rc = sys.gettotalrefcount() |
| runner.run(test) |
| cleanup() |
| refcounts[i] = sys.gettotalrefcount() - rc |
| if filter(None, refcounts): |
| print "%s leaks:\n\t" % testcase, refcounts |
| elif verbosity: |
| print "%s: ok." % testcase |
| |
| class TestRunner(unittest.TextTestRunner): |
| def run(self, test, skipped): |
| "Run the given test case or test suite." |
| # Same as unittest.TextTestRunner.run, except that it reports |
| # skipped tests. |
| result = self._makeResult() |
| startTime = time.time() |
| test(result) |
| stopTime = time.time() |
| timeTaken = stopTime - startTime |
| result.printErrors() |
| self.stream.writeln(result.separator2) |
| run = result.testsRun |
| if _unavail: #skipped: |
| requested = _unavail.keys() |
| requested.sort() |
| self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" % |
| (run, run != 1 and "s" or "", timeTaken, |
| len(skipped), |
| len(skipped) != 1 and "s" or "")) |
| self.stream.writeln("Unavailable resources: %s" % ", ".join(requested)) |
| else: |
| self.stream.writeln("Ran %d test%s in %.3fs" % |
| (run, run != 1 and "s" or "", timeTaken)) |
| self.stream.writeln() |
| if not result.wasSuccessful(): |
| self.stream.write("FAILED (") |
| failed, errored = map(len, (result.failures, result.errors)) |
| if failed: |
| self.stream.write("failures=%d" % failed) |
| if errored: |
| if failed: self.stream.write(", ") |
| self.stream.write("errors=%d" % errored) |
| self.stream.writeln(")") |
| else: |
| self.stream.writeln("OK") |
| return result |
| |
| |
| def main(*packages): |
| try: |
| opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:") |
| except getopt.error: |
| return usage() |
| |
| verbosity = 1 |
| search_leaks = False |
| exclude = [] |
| for flag, value in opts: |
| if flag == "-q": |
| verbosity -= 1 |
| elif flag == "-v": |
| verbosity += 1 |
| elif flag == "-r": |
| try: |
| sys.gettotalrefcount |
| except AttributeError: |
| print >> sys.stderr, "-r flag requires Python debug build" |
| return -1 |
| search_leaks = True |
| elif flag == "-u": |
| use_resources.extend(value.split(",")) |
| elif flag == "-x": |
| exclude.extend(value.split(",")) |
| |
| mask = "test_*.py" |
| if args: |
| mask = args[0] |
| |
| for package in packages: |
| run_tests(package, mask, verbosity, search_leaks, exclude) |
| |
| |
| def run_tests(package, mask, verbosity, search_leaks, exclude): |
| skipped, testcases = get_tests(package, mask, verbosity, exclude) |
| runner = TestRunner(verbosity=verbosity) |
| |
| suites = [unittest.makeSuite(o) for o in testcases] |
| suite = unittest.TestSuite(suites) |
| result = runner.run(suite, skipped) |
| |
| if search_leaks: |
| # hunt for refcount leaks |
| runner = BasicTestRunner() |
| for t in testcases: |
| test_with_refcounts(runner, verbosity, t) |
| |
| return bool(result.errors) |
| |
| class BasicTestRunner: |
| def run(self, test): |
| result = unittest.TestResult() |
| test(result) |
| return result |