| """ terminal reporting of the full testing process. |
| |
| This is a good source for looking at the various reporting hooks. |
| """ |
| from __future__ import absolute_import, division, print_function |
| |
| import itertools |
| import platform |
| import sys |
| import time |
| |
| import pluggy |
| import py |
| import six |
| |
| import pytest |
| from _pytest import nodes |
| from _pytest.main import EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, \ |
| EXIT_USAGEERROR, EXIT_NOTESTSCOLLECTED |
| |
| |
| def pytest_addoption(parser): |
| group = parser.getgroup("terminal reporting", "reporting", after="general") |
| group._addoption('-v', '--verbose', action="count", |
| dest="verbose", default=0, help="increase verbosity."), |
| group._addoption('-q', '--quiet', action="count", |
| dest="quiet", default=0, help="decrease verbosity."), |
| group._addoption('-r', |
| action="store", dest="reportchars", default='', metavar="chars", |
| help="show extra test summary info as specified by chars (f)ailed, " |
| "(E)error, (s)skipped, (x)failed, (X)passed, " |
| "(p)passed, (P)passed with output, (a)all except pP. " |
| "Warnings are displayed at all times except when " |
| "--disable-warnings is set") |
| group._addoption('--disable-warnings', '--disable-pytest-warnings', default=False, |
| dest='disable_warnings', action='store_true', |
| help='disable warnings summary') |
| group._addoption('-l', '--showlocals', |
| action="store_true", dest="showlocals", default=False, |
| help="show locals in tracebacks (disabled by default).") |
| group._addoption('--tb', metavar="style", |
| action="store", dest="tbstyle", default='auto', |
| choices=['auto', 'long', 'short', 'no', 'line', 'native'], |
| help="traceback print mode (auto/long/short/line/native/no).") |
| group._addoption('--fulltrace', '--full-trace', |
| action="store_true", default=False, |
| help="don't cut any tracebacks (default is to cut).") |
| group._addoption('--color', metavar="color", |
| action="store", dest="color", default='auto', |
| choices=['yes', 'no', 'auto'], |
| help="color terminal output (yes/no/auto).") |
| |
| parser.addini("console_output_style", |
| help="console output: classic or with additional progress information (classic|progress).", |
| default='progress') |
| |
| |
| def pytest_configure(config): |
| config.option.verbose -= config.option.quiet |
| reporter = TerminalReporter(config, sys.stdout) |
| config.pluginmanager.register(reporter, 'terminalreporter') |
| if config.option.debug or config.option.traceconfig: |
| def mywriter(tags, args): |
| msg = " ".join(map(str, args)) |
| reporter.write_line("[traceconfig] " + msg) |
| config.trace.root.setprocessor("pytest:config", mywriter) |
| |
| |
| def getreportopt(config): |
| reportopts = "" |
| reportchars = config.option.reportchars |
| if not config.option.disable_warnings and 'w' not in reportchars: |
| reportchars += 'w' |
| elif config.option.disable_warnings and 'w' in reportchars: |
| reportchars = reportchars.replace('w', '') |
| if reportchars: |
| for char in reportchars: |
| if char not in reportopts and char != 'a': |
| reportopts += char |
| elif char == 'a': |
| reportopts = 'fEsxXw' |
| return reportopts |
| |
| |
| def pytest_report_teststatus(report): |
| if report.passed: |
| letter = "." |
| elif report.skipped: |
| letter = "s" |
| elif report.failed: |
| letter = "F" |
| if report.when != "call": |
| letter = "f" |
| return report.outcome, letter, report.outcome.upper() |
| |
| |
| class WarningReport: |
| """ |
| Simple structure to hold warnings information captured by ``pytest_logwarning``. |
| """ |
| |
| def __init__(self, code, message, nodeid=None, fslocation=None): |
| """ |
| :param code: unused |
| :param str message: user friendly message about the warning |
| :param str|None nodeid: node id that generated the warning (see ``get_location``). |
| :param tuple|py.path.local fslocation: |
| file system location of the source of the warning (see ``get_location``). |
| """ |
| self.code = code |
| self.message = message |
| self.nodeid = nodeid |
| self.fslocation = fslocation |
| |
| def get_location(self, config): |
| """ |
| Returns the more user-friendly information about the location |
| of a warning, or None. |
| """ |
| if self.nodeid: |
| return self.nodeid |
| if self.fslocation: |
| if isinstance(self.fslocation, tuple) and len(self.fslocation) >= 2: |
| filename, linenum = self.fslocation[:2] |
| relpath = py.path.local(filename).relto(config.invocation_dir) |
| return '%s:%s' % (relpath, linenum) |
| else: |
| return str(self.fslocation) |
| return None |
| |
| |
| class TerminalReporter: |
| def __init__(self, config, file=None): |
| import _pytest.config |
| self.config = config |
| self.verbosity = self.config.option.verbose |
| self.showheader = self.verbosity >= 0 |
| self.showfspath = self.verbosity >= 0 |
| self.showlongtestinfo = self.verbosity > 0 |
| self._numcollected = 0 |
| self._session = None |
| |
| self.stats = {} |
| self.startdir = py.path.local() |
| if file is None: |
| file = sys.stdout |
| self._tw = _pytest.config.create_terminal_writer(config, file) |
| # self.writer will be deprecated in pytest-3.4 |
| self.writer = self._tw |
| self._screen_width = self._tw.fullwidth |
| self.currentfspath = None |
| self.reportchars = getreportopt(config) |
| self.hasmarkup = self._tw.hasmarkup |
| self.isatty = file.isatty() |
| self._progress_items_reported = 0 |
| self._show_progress_info = self.config.getini('console_output_style') == 'progress' |
| |
| def hasopt(self, char): |
| char = {'xfailed': 'x', 'skipped': 's'}.get(char, char) |
| return char in self.reportchars |
| |
| def write_fspath_result(self, nodeid, res): |
| fspath = self.config.rootdir.join(nodeid.split("::")[0]) |
| if fspath != self.currentfspath: |
| if self.currentfspath is not None: |
| self._write_progress_information_filling_space() |
| self.currentfspath = fspath |
| fspath = self.startdir.bestrelpath(fspath) |
| self._tw.line() |
| self._tw.write(fspath + " ") |
| self._tw.write(res) |
| |
| def write_ensure_prefix(self, prefix, extra="", **kwargs): |
| if self.currentfspath != prefix: |
| self._tw.line() |
| self.currentfspath = prefix |
| self._tw.write(prefix) |
| if extra: |
| self._tw.write(extra, **kwargs) |
| self.currentfspath = -2 |
| self._write_progress_information_filling_space() |
| |
| def ensure_newline(self): |
| if self.currentfspath: |
| self._tw.line() |
| self.currentfspath = None |
| |
| def write(self, content, **markup): |
| self._tw.write(content, **markup) |
| |
| def write_line(self, line, **markup): |
| if not isinstance(line, six.text_type): |
| line = six.text_type(line, errors="replace") |
| self.ensure_newline() |
| self._tw.line(line, **markup) |
| |
| def rewrite(self, line, **markup): |
| """ |
| Rewinds the terminal cursor to the beginning and writes the given line. |
| |
| :kwarg erase: if True, will also add spaces until the full terminal width to ensure |
| previous lines are properly erased. |
| |
| The rest of the keyword arguments are markup instructions. |
| """ |
| erase = markup.pop('erase', False) |
| if erase: |
| fill_count = self._tw.fullwidth - len(line) - 1 |
| fill = ' ' * fill_count |
| else: |
| fill = '' |
| line = str(line) |
| self._tw.write("\r" + line + fill, **markup) |
| |
| def write_sep(self, sep, title=None, **markup): |
| self.ensure_newline() |
| self._tw.sep(sep, title, **markup) |
| |
| def section(self, title, sep="=", **kw): |
| self._tw.sep(sep, title, **kw) |
| |
| def line(self, msg, **kw): |
| self._tw.line(msg, **kw) |
| |
| def pytest_internalerror(self, excrepr): |
| for line in six.text_type(excrepr).split("\n"): |
| self.write_line("INTERNALERROR> " + line) |
| return 1 |
| |
| def pytest_logwarning(self, code, fslocation, message, nodeid): |
| warnings = self.stats.setdefault("warnings", []) |
| warning = WarningReport(code=code, fslocation=fslocation, |
| message=message, nodeid=nodeid) |
| warnings.append(warning) |
| |
| def pytest_plugin_registered(self, plugin): |
| if self.config.option.traceconfig: |
| msg = "PLUGIN registered: %s" % (plugin,) |
| # XXX this event may happen during setup/teardown time |
| # which unfortunately captures our output here |
| # which garbles our output if we use self.write_line |
| self.write_line(msg) |
| |
| def pytest_deselected(self, items): |
| self.stats.setdefault('deselected', []).extend(items) |
| |
| def pytest_runtest_logstart(self, nodeid, location): |
| # ensure that the path is printed before the |
| # 1st test of a module starts running |
| if self.showlongtestinfo: |
| line = self._locationline(nodeid, *location) |
| self.write_ensure_prefix(line, "") |
| elif self.showfspath: |
| fsid = nodeid.split("::")[0] |
| self.write_fspath_result(fsid, "") |
| |
| def pytest_runtest_logreport(self, report): |
| rep = report |
| res = self.config.hook.pytest_report_teststatus(report=rep) |
| cat, letter, word = res |
| if isinstance(word, tuple): |
| word, markup = word |
| else: |
| markup = None |
| self.stats.setdefault(cat, []).append(rep) |
| self._tests_ran = True |
| if not letter and not word: |
| # probably passed setup/teardown |
| return |
| running_xdist = hasattr(rep, 'node') |
| self._progress_items_reported += 1 |
| if self.verbosity <= 0: |
| if not running_xdist and self.showfspath: |
| self.write_fspath_result(rep.nodeid, letter) |
| else: |
| self._tw.write(letter) |
| self._write_progress_if_past_edge() |
| else: |
| if markup is None: |
| if rep.passed: |
| markup = {'green': True} |
| elif rep.failed: |
| markup = {'red': True} |
| elif rep.skipped: |
| markup = {'yellow': True} |
| else: |
| markup = {} |
| line = self._locationline(rep.nodeid, *rep.location) |
| if not running_xdist: |
| self.write_ensure_prefix(line, word, **markup) |
| else: |
| self.ensure_newline() |
| self._tw.write("[%s]" % rep.node.gateway.id) |
| if self._show_progress_info: |
| self._tw.write(self._get_progress_information_message() + " ", cyan=True) |
| else: |
| self._tw.write(' ') |
| self._tw.write(word, **markup) |
| self._tw.write(" " + line) |
| self.currentfspath = -2 |
| |
| def _write_progress_if_past_edge(self): |
| if not self._show_progress_info: |
| return |
| last_item = self._progress_items_reported == self._session.testscollected |
| if last_item: |
| self._write_progress_information_filling_space() |
| return |
| |
| past_edge = self._tw.chars_on_current_line + self._PROGRESS_LENGTH + 1 >= self._screen_width |
| if past_edge: |
| msg = self._get_progress_information_message() |
| self._tw.write(msg + '\n', cyan=True) |
| |
| _PROGRESS_LENGTH = len(' [100%]') |
| |
| def _get_progress_information_message(self): |
| collected = self._session.testscollected |
| if collected: |
| progress = self._progress_items_reported * 100 // collected |
| return ' [{:3d}%]'.format(progress) |
| return ' [100%]' |
| |
| def _write_progress_information_filling_space(self): |
| if not self._show_progress_info: |
| return |
| msg = self._get_progress_information_message() |
| fill = ' ' * (self._tw.fullwidth - self._tw.chars_on_current_line - len(msg) - 1) |
| self.write(fill + msg, cyan=True) |
| |
| def pytest_collection(self): |
| if not self.isatty and self.config.option.verbose >= 1: |
| self.write("collecting ... ", bold=True) |
| |
| def pytest_collectreport(self, report): |
| if report.failed: |
| self.stats.setdefault("error", []).append(report) |
| elif report.skipped: |
| self.stats.setdefault("skipped", []).append(report) |
| items = [x for x in report.result if isinstance(x, pytest.Item)] |
| self._numcollected += len(items) |
| if self.isatty: |
| # self.write_fspath_result(report.nodeid, 'E') |
| self.report_collect() |
| |
| def report_collect(self, final=False): |
| if self.config.option.verbose < 0: |
| return |
| |
| errors = len(self.stats.get('error', [])) |
| skipped = len(self.stats.get('skipped', [])) |
| if final: |
| line = "collected " |
| else: |
| line = "collecting " |
| line += str(self._numcollected) + " item" + ('' if self._numcollected == 1 else 's') |
| if errors: |
| line += " / %d errors" % errors |
| if skipped: |
| line += " / %d skipped" % skipped |
| if self.isatty: |
| self.rewrite(line, bold=True, erase=True) |
| if final: |
| self.write('\n') |
| else: |
| self.write_line(line) |
| |
| def pytest_collection_modifyitems(self): |
| self.report_collect(True) |
| |
| @pytest.hookimpl(trylast=True) |
| def pytest_sessionstart(self, session): |
| self._session = session |
| self._sessionstarttime = time.time() |
| if not self.showheader: |
| return |
| self.write_sep("=", "test session starts", bold=True) |
| verinfo = platform.python_version() |
| msg = "platform %s -- Python %s" % (sys.platform, verinfo) |
| if hasattr(sys, 'pypy_version_info'): |
| verinfo = ".".join(map(str, sys.pypy_version_info[:3])) |
| msg += "[pypy-%s-%s]" % (verinfo, sys.pypy_version_info[3]) |
| msg += ", pytest-%s, py-%s, pluggy-%s" % ( |
| pytest.__version__, py.__version__, pluggy.__version__) |
| if self.verbosity > 0 or self.config.option.debug or \ |
| getattr(self.config.option, 'pastebin', None): |
| msg += " -- " + str(sys.executable) |
| self.write_line(msg) |
| lines = self.config.hook.pytest_report_header( |
| config=self.config, startdir=self.startdir) |
| self._write_report_lines_from_hooks(lines) |
| |
| def _write_report_lines_from_hooks(self, lines): |
| lines.reverse() |
| for line in flatten(lines): |
| self.write_line(line) |
| |
| def pytest_report_header(self, config): |
| inifile = "" |
| if config.inifile: |
| inifile = " " + config.rootdir.bestrelpath(config.inifile) |
| lines = ["rootdir: %s, inifile:%s" % (config.rootdir, inifile)] |
| |
| plugininfo = config.pluginmanager.list_plugin_distinfo() |
| if plugininfo: |
| |
| lines.append( |
| "plugins: %s" % ", ".join(_plugin_nameversions(plugininfo))) |
| return lines |
| |
| def pytest_collection_finish(self, session): |
| if self.config.option.collectonly: |
| self._printcollecteditems(session.items) |
| if self.stats.get('failed'): |
| self._tw.sep("!", "collection failures") |
| for rep in self.stats.get('failed'): |
| rep.toterminal(self._tw) |
| return 1 |
| return 0 |
| lines = self.config.hook.pytest_report_collectionfinish( |
| config=self.config, startdir=self.startdir, items=session.items) |
| self._write_report_lines_from_hooks(lines) |
| |
| def _printcollecteditems(self, items): |
| # to print out items and their parent collectors |
| # we take care to leave out Instances aka () |
| # because later versions are going to get rid of them anyway |
| if self.config.option.verbose < 0: |
| if self.config.option.verbose < -1: |
| counts = {} |
| for item in items: |
| name = item.nodeid.split('::', 1)[0] |
| counts[name] = counts.get(name, 0) + 1 |
| for name, count in sorted(counts.items()): |
| self._tw.line("%s: %d" % (name, count)) |
| else: |
| for item in items: |
| nodeid = item.nodeid |
| nodeid = nodeid.replace("::()::", "::") |
| self._tw.line(nodeid) |
| return |
| stack = [] |
| indent = "" |
| for item in items: |
| needed_collectors = item.listchain()[1:] # strip root node |
| while stack: |
| if stack == needed_collectors[:len(stack)]: |
| break |
| stack.pop() |
| for col in needed_collectors[len(stack):]: |
| stack.append(col) |
| # if col.name == "()": |
| # continue |
| indent = (len(stack) - 1) * " " |
| self._tw.line("%s%s" % (indent, col)) |
| |
| @pytest.hookimpl(hookwrapper=True) |
| def pytest_sessionfinish(self, exitstatus): |
| outcome = yield |
| outcome.get_result() |
| self._tw.line("") |
| summary_exit_codes = ( |
| EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, EXIT_USAGEERROR, |
| EXIT_NOTESTSCOLLECTED) |
| if exitstatus in summary_exit_codes: |
| self.config.hook.pytest_terminal_summary(terminalreporter=self, |
| exitstatus=exitstatus) |
| self.summary_errors() |
| self.summary_failures() |
| self.summary_warnings() |
| self.summary_passes() |
| if exitstatus == EXIT_INTERRUPTED: |
| self._report_keyboardinterrupt() |
| del self._keyboardinterrupt_memo |
| self.summary_deselected() |
| self.summary_stats() |
| |
| def pytest_keyboard_interrupt(self, excinfo): |
| self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True) |
| |
| def pytest_unconfigure(self): |
| if hasattr(self, '_keyboardinterrupt_memo'): |
| self._report_keyboardinterrupt() |
| |
| def _report_keyboardinterrupt(self): |
| excrepr = self._keyboardinterrupt_memo |
| msg = excrepr.reprcrash.message |
| self.write_sep("!", msg) |
| if "KeyboardInterrupt" in msg: |
| if self.config.option.fulltrace: |
| excrepr.toterminal(self._tw) |
| else: |
| self._tw.line("to show a full traceback on KeyboardInterrupt use --fulltrace", yellow=True) |
| excrepr.reprcrash.toterminal(self._tw) |
| |
| def _locationline(self, nodeid, fspath, lineno, domain): |
| def mkrel(nodeid): |
| line = self.config.cwd_relative_nodeid(nodeid) |
| if domain and line.endswith(domain): |
| line = line[:-len(domain)] |
| values = domain.split("[") |
| values[0] = values[0].replace('.', '::') # don't replace '.' in params |
| line += "[".join(values) |
| return line |
| # collect_fspath comes from testid which has a "/"-normalized path |
| |
| if fspath: |
| res = mkrel(nodeid).replace("::()", "") # parens-normalization |
| if nodeid.split("::")[0] != fspath.replace("\\", nodes.SEP): |
| res += " <- " + self.startdir.bestrelpath(fspath) |
| else: |
| res = "[location]" |
| return res + " " |
| |
| def _getfailureheadline(self, rep): |
| if hasattr(rep, 'location'): |
| fspath, lineno, domain = rep.location |
| return domain |
| else: |
| return "test session" # XXX? |
| |
| def _getcrashline(self, rep): |
| try: |
| return str(rep.longrepr.reprcrash) |
| except AttributeError: |
| try: |
| return str(rep.longrepr)[:50] |
| except AttributeError: |
| return "" |
| |
| # |
| # summaries for sessionfinish |
| # |
| def getreports(self, name): |
| values = [] |
| for x in self.stats.get(name, []): |
| if not hasattr(x, '_pdbshown'): |
| values.append(x) |
| return values |
| |
| def summary_warnings(self): |
| if self.hasopt("w"): |
| all_warnings = self.stats.get("warnings") |
| if not all_warnings: |
| return |
| |
| grouped = itertools.groupby(all_warnings, key=lambda wr: wr.get_location(self.config)) |
| |
| self.write_sep("=", "warnings summary", yellow=True, bold=False) |
| for location, warning_records in grouped: |
| self._tw.line(str(location) or '<undetermined location>') |
| for w in warning_records: |
| lines = w.message.splitlines() |
| indented = '\n'.join(' ' + x for x in lines) |
| self._tw.line(indented) |
| self._tw.line() |
| self._tw.line('-- Docs: http://doc.pytest.org/en/latest/warnings.html') |
| |
| def summary_passes(self): |
| if self.config.option.tbstyle != "no": |
| if self.hasopt("P"): |
| reports = self.getreports('passed') |
| if not reports: |
| return |
| self.write_sep("=", "PASSES") |
| for rep in reports: |
| msg = self._getfailureheadline(rep) |
| self.write_sep("_", msg) |
| self._outrep_summary(rep) |
| |
| def print_teardown_sections(self, rep): |
| for secname, content in rep.sections: |
| if 'teardown' in secname: |
| self._tw.sep('-', secname) |
| if content[-1:] == "\n": |
| content = content[:-1] |
| self._tw.line(content) |
| |
| def summary_failures(self): |
| if self.config.option.tbstyle != "no": |
| reports = self.getreports('failed') |
| if not reports: |
| return |
| self.write_sep("=", "FAILURES") |
| for rep in reports: |
| if self.config.option.tbstyle == "line": |
| line = self._getcrashline(rep) |
| self.write_line(line) |
| else: |
| msg = self._getfailureheadline(rep) |
| markup = {'red': True, 'bold': True} |
| self.write_sep("_", msg, **markup) |
| self._outrep_summary(rep) |
| for report in self.getreports(''): |
| if report.nodeid == rep.nodeid and report.when == 'teardown': |
| self.print_teardown_sections(report) |
| |
| def summary_errors(self): |
| if self.config.option.tbstyle != "no": |
| reports = self.getreports('error') |
| if not reports: |
| return |
| self.write_sep("=", "ERRORS") |
| for rep in self.stats['error']: |
| msg = self._getfailureheadline(rep) |
| if not hasattr(rep, 'when'): |
| # collect |
| msg = "ERROR collecting " + msg |
| elif rep.when == "setup": |
| msg = "ERROR at setup of " + msg |
| elif rep.when == "teardown": |
| msg = "ERROR at teardown of " + msg |
| self.write_sep("_", msg) |
| self._outrep_summary(rep) |
| |
| def _outrep_summary(self, rep): |
| rep.toterminal(self._tw) |
| for secname, content in rep.sections: |
| self._tw.sep("-", secname) |
| if content[-1:] == "\n": |
| content = content[:-1] |
| self._tw.line(content) |
| |
| def summary_stats(self): |
| session_duration = time.time() - self._sessionstarttime |
| (line, color) = build_summary_stats_line(self.stats) |
| msg = "%s in %.2f seconds" % (line, session_duration) |
| markup = {color: True, 'bold': True} |
| |
| if self.verbosity >= 0: |
| self.write_sep("=", msg, **markup) |
| if self.verbosity == -1: |
| self.write_line(msg, **markup) |
| |
| def summary_deselected(self): |
| if 'deselected' in self.stats: |
| self.write_sep("=", "%d tests deselected" % ( |
| len(self.stats['deselected'])), bold=True) |
| |
| |
| def repr_pythonversion(v=None): |
| if v is None: |
| v = sys.version_info |
| try: |
| return "%s.%s.%s-%s-%s" % v |
| except (TypeError, ValueError): |
| return str(v) |
| |
| |
| def flatten(values): |
| for x in values: |
| if isinstance(x, (list, tuple)): |
| for y in flatten(x): |
| yield y |
| else: |
| yield x |
| |
| |
| def build_summary_stats_line(stats): |
| keys = ("failed passed skipped deselected " |
| "xfailed xpassed warnings error").split() |
| unknown_key_seen = False |
| for key in stats.keys(): |
| if key not in keys: |
| if key: # setup/teardown reports have an empty key, ignore them |
| keys.append(key) |
| unknown_key_seen = True |
| parts = [] |
| for key in keys: |
| val = stats.get(key, None) |
| if val: |
| parts.append("%d %s" % (len(val), key)) |
| |
| if parts: |
| line = ", ".join(parts) |
| else: |
| line = "no tests ran" |
| |
| if 'failed' in stats or 'error' in stats: |
| color = 'red' |
| elif 'warnings' in stats or unknown_key_seen: |
| color = 'yellow' |
| elif 'passed' in stats: |
| color = 'green' |
| else: |
| color = 'yellow' |
| |
| return (line, color) |
| |
| |
| def _plugin_nameversions(plugininfo): |
| values = [] |
| for plugin, dist in plugininfo: |
| # gets us name and version! |
| name = '{dist.project_name}-{dist.version}'.format(dist=dist) |
| # questionable convenience, but it keeps things short |
| if name.startswith("pytest-"): |
| name = name[7:] |
| # we decided to print python package names |
| # they can have more than one plugin |
| if name not in values: |
| values.append(name) |
| return values |