| # Copyright (C) 2017 Igalia S.L. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions |
| # are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND ANY |
| # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
| # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
| # DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY |
| # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
| # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
| # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON |
| # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| import contextlib |
| import errno |
| import os |
| import shutil |
| import sys |
| import tempfile |
| |
| from webkitpy.common.system.filesystem import FileSystem |
| from webkitpy.common.webkit_finder import WebKitFinder |
| import pytest |
| from _pytest.config import ExitCode |
| |
| |
| class TemporaryDirectory(object): |
| |
| def __enter__(self): |
| self.path = tempfile.mkdtemp(prefix="pytest-") |
| return self.path |
| |
| def __exit__(self, *args): |
| try: |
| shutil.rmtree(self.path) |
| except OSError as e: |
| # no such file or directory |
| if e.errno != errno.ENOENT: |
| raise |
| |
| |
| def get_item_name(item, ignore_param): |
| if ignore_param is None: |
| return item.name |
| |
| single_param = '[%s]' % ignore_param |
| if item.name.endswith(single_param): |
| return item.name[:-len(single_param)] |
| |
| param = '[%s-' % ignore_param |
| if param in item.name: |
| return item.name.replace('%s-' % ignore_param, '') |
| |
| return item.name |
| |
| |
| class CollectRecorder(object): |
| |
| def __init__(self, ignore_param): |
| self._ignore_param = ignore_param |
| self.tests = {} |
| |
| def pytest_collectreport(self, report): |
| if report.nodeid and report.result: |
| self.tests.setdefault(report.nodeid, []) |
| for subtest in report.result: |
| self.tests[report.nodeid].append(get_item_name(subtest, self._ignore_param)) |
| |
| |
| class HarnessResultRecorder(object): |
| |
| def __init__(self): |
| self.outcome = ('OK', None) |
| |
| def pytest_collectreport(self, report): |
| if report.outcome == 'failed': |
| self.outcome = ('ERROR', None) |
| elif report.outcome == 'skipped': |
| self.outcome = ('SKIP', None) |
| |
| |
| class SubtestResultRecorder(object): |
| |
| def __init__(self): |
| self.results = [] |
| |
| def pytest_runtest_logreport(self, report): |
| if report.passed and report.when == 'call': |
| self.record_pass(report) |
| elif report.failed: |
| if report.when != 'call': |
| self.record_error(report) |
| else: |
| self.record_fail(report) |
| elif report.skipped: |
| self.record_skip(report) |
| |
| def _was_timeout(self, report): |
| return hasattr(report.longrepr, 'reprcrash') and report.longrepr.reprcrash.message.startswith('Failed: Timeout >') |
| |
| def record_pass(self, report): |
| if hasattr(report, 'wasxfail'): |
| if report.wasxfail == 'Timeout': |
| self.record(report.nodeid, 'XPASS_TIMEOUT') |
| else: |
| self.record(report.nodeid, 'XPASS') |
| else: |
| self.record(report.nodeid, 'PASS') |
| |
| def record_fail(self, report): |
| if self._was_timeout(report): |
| self.record(report.nodeid, 'TIMEOUT', stack=report.longrepr) |
| else: |
| self.record(report.nodeid, 'FAIL', stack=report.longrepr) |
| |
| def record_error(self, report): |
| # error in setup/teardown |
| if report.when != 'call': |
| message = '%s error' % report.when |
| self.record(report.nodeid, 'ERROR', message, report.longrepr) |
| |
| def record_skip(self, report): |
| if hasattr(report, 'wasxfail'): |
| if self._was_timeout(report) and report.wasxfail != 'Timeout': |
| self.record(report.nodeid, 'TIMEOUT', stack=report.longrepr) |
| else: |
| self.record(report.nodeid, 'XFAIL') |
| else: |
| self.record(report.nodeid, 'SKIP') |
| |
| def record(self, test, status, message=None, stack=None): |
| if stack is not None: |
| stack = str(stack) |
| new_result = (test, status, message, stack) |
| self.results.append(new_result) |
| |
| |
| class TestExpectationsMarker(object): |
| |
| def __init__(self, expectations, timeout, ignore_param): |
| self._expectations = expectations |
| self._timeout = timeout |
| self._ignore_param = ignore_param |
| self._base_dir = WebKitFinder(FileSystem()).path_from_webkit_base('WebDriverTests') |
| |
| def pytest_collection_modifyitems(self, session, config, items): |
| for item in items: |
| test = os.path.relpath(str(item.fspath), self._base_dir) |
| item_name = get_item_name(item, self._ignore_param) |
| if self._expectations.is_slow(test, item_name): |
| item.add_marker(pytest.mark.timeout(self._timeout * 5)) |
| expected = self._expectations.get_expectation(test, item_name)[0] |
| if expected == 'FAIL': |
| item.add_marker(pytest.mark.xfail) |
| elif expected == 'TIMEOUT': |
| item.add_marker(pytest.mark.xfail(reason="Timeout")) |
| elif expected == 'SKIP': |
| item.add_marker(pytest.mark.skip) |
| |
| |
| def collect(directory, args, ignore_param=None): |
| collect_recorder = CollectRecorder(ignore_param) |
| with open(os.devnull, 'w') as f: |
| with contextlib.redirect_stdout(f): |
| with TemporaryDirectory() as cache_directory: |
| cmd = ['--collect-only', |
| '--basetemp', cache_directory] |
| cmd.extend(args) |
| cmd.append(directory) |
| pytest.main(cmd, plugins=[collect_recorder]) |
| return collect_recorder.tests |
| |
| |
| def run(path, args, timeout, env, expectations, ignore_param=None): |
| harness_recorder = HarnessResultRecorder() |
| subtests_recorder = SubtestResultRecorder() |
| expectations_marker = TestExpectationsMarker(expectations, timeout, ignore_param) |
| _environ = dict(os.environ) |
| os.environ.clear() |
| os.environ.update(env) |
| |
| with TemporaryDirectory() as cache_directory: |
| try: |
| cmd = ['-vv', |
| '--capture', 'no', |
| '--basetemp', cache_directory, |
| '--showlocals', |
| '--timeout', str(timeout), |
| '-p', 'no:cacheprovider'] |
| cmd.extend(args) |
| cmd.append(path) |
| result = pytest.main(cmd, plugins=[harness_recorder, subtests_recorder, expectations_marker]) |
| |
| if result == ExitCode.INTERNAL_ERROR: |
| harness_recorder.outcome = ('ERROR', None) |
| except Exception as e: |
| harness_recorder.outcome = ('ERROR', str(e)) |
| |
| os.environ.clear() |
| os.environ.update(_environ) |
| |
| return harness_recorder.outcome, subtests_recorder.results |