blob: 42366dc6aad9885a51b397695fe25196f61e4565 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import test_env
import os
import sys
import unittest
import Queue
import StringIO
from tools import annotee_indenter
class AnnoteeNesterTest(unittest.TestCase):
  """Exercises annotee_indenter's indent(), line_generator() and run()."""

  # Absolute directory of this test file; output_generator.py is looked up
  # relative to it. A plain os.path.dirname(__file__) yields '' when the
  # script is launched by bare file name from its own directory, and an empty
  # string is not a usable working directory for a child process (assuming
  # run() hands cwd to the process it spawns -- TODO confirm). Resolving it
  # once here also keeps it stable if the working directory changes later.
  _TEST_DIR = os.path.dirname(os.path.abspath(__file__))

  def _indent_test(self, data_in, data_out_expected, base_level=2):
    """Run annotee_indenter.indent() on data_in and compare its output.

    Args:
      data_in: text that is split into lines and fed to indent().
      data_out_expected: text the output stream must hold afterwards.
      base_level: nest level handed to indent() unchanged.
    """
    def stripped_lines(text):
      # Line endings are kept (splitlines(True)); leading whitespace is
      # dropped so callers may indent multi-line literals for readability.
      return [line.lstrip() for line in text.splitlines(True)]

    output_stream = StringIO.StringIO()
    annotee_indenter.indent(base_level, stripped_lines(data_in), output_stream)
    output_lines = output_stream.getvalue().splitlines(True)
    self.assertListEqual(output_lines, stripped_lines(data_out_expected))

  def test_basic(self):
    """A started step gets a STEP_NEST_LEVEL line with the base level."""
    self._indent_test(
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_CLOSED@@@',
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_NEST_LEVEL@2@@@\n'
        '@@@STEP_CLOSED@@@',
        base_level=2)

  def test_already_indented(self):
    """An existing nest level of 1 is expected to come out as 3 (base 2)."""
    self._indent_test(
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_NEST_LEVEL@1@@@\n'
        '@@@STEP_CLOSED@@@',
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_NEST_LEVEL@2@@@\n'
        '@@@STEP_NEST_LEVEL@3@@@\n'
        '@@@STEP_CLOSED@@@',
        base_level=2)

  def test_kind_of_nested(self):
    """A step started before the previous one closed still gets level 2."""
    self._indent_test(
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_CLOSED@@@\n'
        '@@@STEP_CLOSED@@@',
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_NEST_LEVEL@2@@@\n'
        '@@@STEP_STARTED@@@\n'
        '@@@STEP_NEST_LEVEL@2@@@\n'
        '@@@STEP_CLOSED@@@\n'
        '@@@STEP_CLOSED@@@',
        base_level=2)

  def test_line_generator_basic(self):
    """Chunks split at arbitrary points are reassembled into whole lines."""
    q = Queue.Queue()
    # The same chunk sequence is queued for both streams, interleaved.
    for chunk in ['1st\n', '2nd\n3', 'r', 'd', '\n4t', 'h', '\n', '5', 'th']:
      q.put(('stdout', chunk))
      q.put(('stderr', chunk))
    q.put(annotee_indenter.QUEUE_TERMINATE_ITEM)

    stderr_stream = StringIO.StringIO()
    res = list(annotee_indenter.line_generator(q, stderr_stream))

    # stdout chunks are expected back as yielded lines, stderr chunks in the
    # stream; the final line has no trailing newline in either case.
    expected = ['1st\n', '2nd\n', '3rd\n', '4th\n', '5th']
    self.assertEqual(res, expected)
    self.assertEqual(stderr_stream.getvalue().splitlines(True), expected)

  def _communicate(self, args, cwd):
    """Run args through annotee_indenter.run() and capture what it produced.

    Args:
      args: command line (list) handed to run().
      cwd: working directory handed to run().

    Returns:
      (stdout lines as list, stderr as one string, return code).
    """
    captured = []

    def target(q, base_level):
      # base_level is unused here; the two-argument signature mirrors what
      # run() presumably passes to its target callback.
      stderr_stream = StringIO.StringIO()
      captured.append(list(annotee_indenter.line_generator(q, stderr_stream)))
      captured.append(stderr_stream.getvalue())

    ret_code = annotee_indenter.run(args, cwd, base_level=1, target=target)
    # Exactly two entries means target ran once, to completion.
    self.assertEqual(len(captured), 2)
    out_list, err = captured  # pylint: disable=W0632
    return out_list, err, ret_code

  def _communicate_test(self, name, ret_code):
    """Run ./output_generator.py with the given name and ret_code."""
    return self._communicate(
        [sys.executable, 'output_generator.py', name, str(ret_code)],
        cwd=self._TEST_DIR)

  def test_run_stdout(self):
    """Only stdout is produced; the exit code is 0."""
    out, err, ret = self._communicate_test('simple_out', 0)
    self.assertEqual(err, '')
    self.assertEqual(out, ['simple' + os.linesep])
    self.assertEqual(ret, 0)

  def test_run_stderr(self):
    """Only stderr is produced; the non-zero exit code is passed through."""
    out, err, ret = self._communicate_test('simple_err', 2)
    self.assertEqual(out, [])
    self.assertEqual(err, 'simple' + os.linesep)
    self.assertEqual(ret, 2)

  def test_run_both(self):
    """One line arrives on each of stdout and stderr."""
    out, err, ret = self._communicate_test('simple_both', 0)
    self.assertEqual(out, ['simple' + os.linesep])
    self.assertEqual(err, 'simple' + os.linesep)
    self.assertEqual(ret, 0)

  def test_run_both_x_100000(self):
    """100000 lines on each stream are all captured."""
    out, err, ret = self._communicate_test('both_x_100000', 0)
    self.assertEqual(out, ['simple' + os.linesep] * 100000)
    self.assertEqual(err, ('simple' + os.linesep) * 100000)
    self.assertEqual(ret, 0)

  def test_run_overload_buffers(self):
    """10000 lines of 99 'x' characters per stream are all captured."""
    # Raise the diff limit so a mismatch in this large output is reported
    # rather than truncated.
    self.maxDiff = 1000000
    out, err, ret = self._communicate_test('overload_buffers', 0)
    s = 'x' * 99 + os.linesep
    self.assertEqual(out, [s] * (10 * 1000))
    self.assertEqual(err, s * (10 * 1000))
    self.assertEqual(ret, 0)
if __name__ == '__main__':
  # Executed as a script: discover and run every test case in this module.
  unittest.main()