blob: 40fa47cfdcb616d3cc81ae744439776f7f067e02 [file] [log] [blame]
# Copyright (C) 2018 Igalia S.L.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with this library; see the file COPYING.LIB. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.
import errno
import os
import select
import socket
import struct
import subprocess
import sys
from signal import alarm, signal, SIGALRM
from io import BytesIO
(LOG_NONE,
LOG_ERROR,
LOG_START_BINARY,
LOG_LIST_CASE,
LOG_SKIP_CASE,
LOG_START_CASE,
LOG_STOP_CASE,
LOG_MIN_RESULT,
LOG_MAX_RESULT,
LOG_MESSAGE,
LOG_START_SUITE,
LOG_STOP_SUITE) = list(range(12))
(TEST_RUN_SUCCESS,
TEST_RUN_SKIPPED,
TEST_RUN_FAILURE,
TEST_RUN_INCOMPLETE) = list(range(4))
class TestTimeout(Exception):
pass
class Message(object):
def __init__(self, log_type, strings, numbers):
self.log_type = log_type
self.strings = strings
self.numbers = numbers
@staticmethod
def create(data):
if len(data) < 5 * 4:
return 0, None
def read_unsigned(bytes, n=1):
values = struct.unpack('%dI' % n, bytes.read(4 * n))
return [socket.ntohl(v) for v in values if v is not None]
def read_double(bytes, n=1):
return struct.unpack('>%dd' % n, bytes.read(8 * n))
def read_string(bytes, n=1):
values = []
for i in range(n):
str_len = read_unsigned(bytes)[0]
values.append(struct.unpack('%ds' % str_len, bytes.read(str_len))[0].decode('utf-8'))
return values
bytes = BytesIO(data)
msg_len = read_unsigned(bytes)[0]
if len(data) < msg_len:
return 0, None
log_type, n_strings, n_numbers, reserved = read_unsigned(bytes, 4)
if reserved != 0:
return 0, None
strings = read_string(bytes, n_strings)
numbers = read_double(bytes, n_numbers)
return msg_len, Message(log_type, strings, numbers)
class GLibTestRunner(object):
def __init__(self, test_binary, timeout, is_slow_function=None, slow_timeout=0):
self._test_binary = test_binary
self._timeout = timeout
if is_slow_function is not None:
self._is_test_slow = is_slow_function
else:
self._is_test_slow = lambda x, y: False
self._slow_timeout = slow_timeout
self._stderr_fd = None
self._subtest = None
self._subtest_messages = []
self._results = {}
def _process_data(self, data):
retval = []
msg_len, msg = Message.create(data)
while msg_len:
retval.append(msg)
data = data[msg_len:]
msg_len, msg = Message.create(data)
return data, retval
def _process_message(self, message):
if message.log_type == LOG_ERROR:
self._subtest_message(message.strings)
elif message.log_type == LOG_START_CASE:
self._subtest_start(message.strings[0])
elif message.log_type == LOG_STOP_CASE:
self._subtest_end(message.numbers[0])
elif message.log_type == LOG_MESSAGE:
self._subtest_message([message.strings[0]])
def _read_from_pipe(self, pipe_r):
data = b''
read_set = [pipe_r]
while read_set:
try:
rlist, _, _ = select.select(read_set, [], [])
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
if pipe_r in rlist:
buffer = os.read(pipe_r, 4096)
if not buffer:
read_set.remove(pipe_r)
data += buffer
data, messages = self._process_data(data)
for message in messages:
self._process_message(message)
def _read_from_stderr(self, fd):
data = ''
read_set = [fd]
while True:
try:
rlist, _, _ = select.select(read_set, [], [], 0)
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
if fd not in rlist:
return data
buffer = os.read(fd, 4096)
if not buffer:
return data
data += buffer.decode('utf-8')
@staticmethod
def _start_timeout(timeout):
if timeout <= 0:
return
def _alarm_handler(signum, frame):
raise TestTimeout
signal(SIGALRM, _alarm_handler)
alarm(timeout)
@staticmethod
def _stop_timeout():
alarm(0)
def _subtest_start(self, subtest):
self._subtest = subtest
message = self._subtest + ':'
sys.stdout.write(' %-68s' % message)
sys.stdout.flush()
timeout = self._timeout
if self._is_test_slow(os.path.basename(self._test_binary), self._subtest):
timeout = self._slow_timeout
self._start_timeout(timeout)
def _subtest_message(self, message):
if self._subtest is None:
sys.stdout.write('%s\n' % '\n'.join(message))
else:
self._subtest_messages.extend(message)
def _subtest_stderr(self, errors):
ignore_next_line = False
for line in errors.rstrip('\n').split('\n'):
if ignore_next_line:
ignore_next_line = False
continue
if line == '**':
ignore_next_line = True
continue
sys.stderr.write('%s\n' % line)
sys.stderr.flush()
def _subtest_end(self, result, did_timeout=False):
self._stop_timeout()
if did_timeout:
self._results[self._subtest] = 'TIMEOUT'
elif result == TEST_RUN_SUCCESS:
self._results[self._subtest] = 'PASS'
elif result == TEST_RUN_SKIPPED:
self._results[self._subtest] = 'SKIP'
elif not self._subtest_messages:
self._results[self._subtest] = 'CRASH'
else:
self._results[self._subtest] = 'FAIL'
sys.stdout.write('%s\n' % self._results[self._subtest])
if self._subtest_messages:
sys.stdout.write('%s\n' % '\n'.join(self._subtest_messages))
sys.stdout.flush()
errors = self._read_from_stderr(self._stderr_fd)
if errors:
self._subtest_stderr(errors)
self._subtest = None
self._subtest_messages = []
def run(self, subtests=[], skipped=[], env=None):
pipe_r, pipe_w = os.pipe()
command = [self._test_binary, '--quiet', '--keep-going', '--GTestLogFD=%d' % pipe_w]
if self._results:
command.append('--GTestSkipCount=%d' % len(self._results))
for subtest in subtests:
command.extend(['-p', subtest])
for skip in skipped:
command.extend(['-s', skip])
if not self._results:
sys.stdout.write('TEST: %s...\n' % self._test_binary)
sys.stdout.flush()
if sys.version_info.major > 2:
p = subprocess.Popen(command, stderr=subprocess.PIPE, env=env, pass_fds=[pipe_w])
else:
p = subprocess.Popen(command, stderr=subprocess.PIPE, env=env)
self._stderr_fd = p.stderr.fileno()
os.close(pipe_w)
need_restart = False
try:
self._read_from_pipe(pipe_r)
p.wait()
except TestTimeout:
self._subtest_end(0, did_timeout=True)
p.terminate()
need_restart = True
finally:
os.close(pipe_r)
if self._subtest is not None:
self._subtest_end(256)
need_restart = True
# Check for errors before any test is run
if not self._results and p.returncode != 0:
errors = self._read_from_stderr(self._stderr_fd)
sys.stdout.write('Test program setup failed.\n')
self._subtest_stderr(errors)
self._results['beforeAll'] = 'CRASH'
return self._results
# Try to read errors from afterAll
if p.returncode != 0 and not need_restart:
errors = self._read_from_stderr(self._stderr_fd)
sys.stdout.write('Test program shutdown failed.')
self._subtest_stderr(errors)
self._results['afterAll'] = 'CRASH'
return self._results
if len(self._results) == 0:
# Normally stderr is checked after a subtest has been parsed. If no subtests have been parsed
# chances are something went wrong with the test executable itself and we should print stderr
# to the user.
sys.stderr.write(self._read_from_stderr(self._stderr_fd))
self._stderr_fd = None
if need_restart:
self.run(subtests, skipped, env)
return self._results
if __name__ == '__main__':
for test in sys.argv[1:]:
runner = GLibTestRunner(test, 5)
runner.run()