# blob: fefb91df41e51deb168d6252b28d099e99d251e0 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (C) 2010, 2012 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import re
import sys
import unittest
from webkitcorepy import StringIO
from webkitpy.layout_tests.views.metered_stream import MeteredStream
class RegularTest(unittest.TestCase):
    """Exercise MeteredStream against an in-memory stream.

    The class attributes below select the configuration under test;
    subclasses override them and, where the expected output differs,
    the individual test methods.
    """

    # Passed to MeteredStream as its second positional argument.
    verbose = False
    # Value returned by the fake stream's isatty().
    isatty = False
    # Forwarded to MeteredStream as the print_timestamps keyword.
    print_timestamps = None

    def setUp(self):
        """Build the fake stream, the logger and the meter under test."""
        self.stream = StringIO()
        self.stream.isatty = lambda: self.isatty

        # Configure a logger to test that log calls do normally get included.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False

        # Add a dummy time counter for a default behavior.
        self.times = list(range(10))

        self.meter = MeteredStream(
            self.stream, self.verbose, self.logger, self.time_fn, 8675,
            print_timestamps=self.print_timestamps)

    def tearDown(self):
        """Clean up whichever meter the test left in self.meter."""
        if self.meter:
            self.meter.cleanup()
            self.meter = None

    def time_fn(self):
        """Return the next canned timestamp, consuming it from self.times."""
        return self.times.pop(0)

    def test_logging_not_included(self):
        """Check that nothing is logged when MeteredStream gets no logger."""
        # This test replaces the meter built in setUp(). tearDown() only
        # cleans up whatever is in self.meter at the end, so release the
        # original one here; otherwise cleanup() is never called on it.
        self.meter.cleanup()
        self.meter = None

        logging_stream = StringIO()
        handler = logging.StreamHandler(logging_stream)
        root_logger = logging.getLogger()
        orig_level = root_logger.level

        root_logger.addHandler(handler)
        root_logger.setLevel(logging.DEBUG)
        try:
            self.meter = MeteredStream(
                self.stream, self.verbose, None, self.time_fn, 8675,
                print_timestamps=self.print_timestamps)
            self.meter.write_throttled_update('foo')
            self.meter.write_update('bar')
            self.meter.write('baz')
            self.assertEqual(logging_stream.getvalue(), '')
        finally:
            # Always restore the root logger so other tests are unaffected.
            root_logger.removeHandler(handler)
            root_logger.setLevel(orig_level)

    def _basic(self, times):
        """Run the standard update sequence and return the stream contents.

        `times` replaces the canned timestamps; the sequence is expected to
        consume every one of them.
        """
        self.times = times
        self.meter.write_update('foo')
        self.meter.write_update('bar')
        self.meter.write_throttled_update('baz')
        self.meter.write_throttled_update('baz 2')
        self.meter.writeln('done')
        self.assertEqual(self.times, [])
        return self.stream.getvalue()

    def test_basic(self):
        # With these timestamps the expected output contains 'baz 2' but
        # not the earlier throttled update 'baz'.
        self.assertEqual(self._basic([0, 1, 2, 13, 14]), 'foo\nbar\nbaz 2\ndone\n')

    def _log_after_update(self):
        """Write an update, then log a message; return the stream contents."""
        self.meter.write_update('foo')
        self.logger.info('bar')
        return self.stream.getvalue()

    def test_log_after_update(self):
        self.assertEqual(self._log_after_update(), 'foo\nbar\n')

    def test_log_args(self):
        """Check that %-style logging arguments are formatted into the output."""
        self.logger.info('foo %s %d', 'bar', 2)
        self.assertEqual(self.stream.getvalue(), 'foo bar 2\n')

    def test_unicode(self):
        """Check that non-ASCII log messages reach the stream."""
        self.logger.info('‘example’')
        self.assertEqual(self.stream.getvalue()[-len('‘example’\n'):], '‘example’\n')

        self.logger.info(u'\u2713')
        if sys.version_info > (3, 0):
            self.assertEqual(self.stream.getvalue()[-2:], u'\u2713\n')
        else:
            # Python 2 branch: inspects the last buffered chunk via buflist
            # rather than calling getvalue().
            self.assertEqual(self.stream.buflist[-1][-2:], u'\u2713\n')

    def test_stream_with_encoding(self):
        """Check the output when the stream formats text through str.format."""
        class AsciiStream(StringIO):
            def write(self, s):
                return StringIO.write(self, '{}'.format(s))

        stream = AsciiStream()

        # Same logger object as self.logger (same name), configured the same way.
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        logger.propagate = False

        # Construct the meter outside the try block: if the constructor
        # raises, the finally clause must not fail on an unbound name and
        # mask the real error.
        meter = MeteredStream(
            stream, self.verbose, logger, self.time_fn, 8675,
            print_timestamps=self.print_timestamps)
        try:
            logger.info(u'\u2713')
            if sys.version_info > (3, 0):
                self.assertEqual(stream.getvalue()[-2:], '✓\n')
            else:
                self.assertEqual(stream.getvalue()[-2:], '?\n')
        finally:
            meter.cleanup()
class TtyTest(RegularTest):
    """Re-run the RegularTest scenarios with the stream reporting a tty.

    In this configuration each update is expected to be followed by
    MeteredStream._erasure() of the previous text instead of a newline.
    """

    verbose = False
    isatty = True

    def test_basic(self):
        actual = self._basic([0, 1, 1.05, 1.1, 2])

        erase = MeteredStream._erasure
        expected = (
            'foo'
            + erase('foo') + 'bar'
            + erase('bar') + 'baz 2'
            + erase('baz 2') + 'done\n'
        )
        self.assertEqual(actual, expected)

    def test_log_after_update(self):
        actual = self._log_after_update()

        # The pending 'foo' update is erased before the log line is written.
        expected = ''.join(['foo', MeteredStream._erasure('foo'), 'bar\n'])
        self.assertEqual(actual, expected)
class VerboseTest(RegularTest):
    """Re-run the RegularTest scenarios with verbose output on a non-tty.

    The expectations below require every line to start with a timestamp
    and a pid before the message text.
    """

    isatty = False
    verbose = True

    def _assert_line_matches(self, pattern, line):
        """Assert that `line` starts with a match for `pattern`."""
        self.assertTrue(
            re.match(pattern, line),
            '%r does not match %r' % (line, pattern))

    def test_basic(self):
        lines = self._basic([0, 1, 2.1, 13, 14.1234]).splitlines()

        # Check the count first so that short output is reported as an
        # assertion failure rather than an IndexError below.
        self.assertEqual(len(lines), 4)

        # We don't bother to match the hours and minutes of the timestamp since
        # the local timezone can vary and we can't set that portably and easily.
        # The dot separating seconds from milliseconds is escaped so that it
        # only matches a literal '.'.
        self._assert_line_matches(r'\d\d:\d\d:00\.000 8675 foo', lines[0])
        self._assert_line_matches(r'\d\d:\d\d:01\.000 8675 bar', lines[1])
        self._assert_line_matches(r'\d\d:\d\d:13\.000 8675 baz 2', lines[2])
        self._assert_line_matches(r'\d\d:\d\d:14\.123 8675 done', lines[3])

    def test_log_after_update(self):
        lines = self._log_after_update().splitlines()
        self.assertEqual(len(lines), 2)

        self._assert_line_matches(r'\d\d:\d\d:00\.000 8675 foo', lines[0])

        # The second line should have a real timestamp and pid, so we just check the format.
        self._assert_line_matches(r'\d\d:\d\d:\d\d\.\d\d\d \d+ bar', lines[1])

    def test_log_args(self):
        self.logger.info('foo %s %d', 'bar', 2)
        # Only the tail is compared because the line starts with a prefix
        # that is not fixed.
        self.assertEqual(len(self.stream.getvalue().splitlines()), 1)
        self.assertTrue(self.stream.getvalue().endswith('foo bar 2\n'))
class VerboseWithOutTimestamp(RegularTest):
    """Re-run the RegularTest scenarios verbose, on a tty, with timestamps off.

    Each output line is expected to begin directly with the message text.
    """

    isatty = True
    verbose = True
    print_timestamps = False

    def test_basic(self):
        output = self._basic([0, 1, 2.1, 13, 14.1234])
        lines = output.splitlines()

        # re.match anchors at the start of the line, so each message must
        # appear with nothing in front of it.
        for position, message in enumerate(('foo', 'bar', 'baz 2', 'done')):
            self.assertTrue(re.match(message, lines[position]))

        self.assertEqual(len(lines), 4)