blob: 7f3d802558d078dd69538402a001a2d736b76296 [file]
#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import itertools
import logging
import os
import sys
import tempfile
import unittest
# ROOT_DIR is the parent of the directory containing this file. It is put at
# the front of sys.path so that the `utils` package imported below resolves.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR)
from utils import subprocess42
# Disable pre-set unbuffered output to not interfere with the testing being done
# here. Otherwise everything would test with unbuffered; which is fine but
# that's not what we specifically want to test here.
ENV = os.environ.copy()
ENV.pop('PYTHONUNBUFFERED', None)
# Source of a child program passed to `python -c`. It installs a handler for
# SIGBREAK (win32) or SIGTERM (elsewhere), writes 'hi' to stdout, then polls
# until the handler has recorded a signal and finally writes 'bye'. Every write
# is followed by an explicit flush.
#
# The nested blocks of the embedded program must be indented relative to their
# headers (the body of `try:` deeper than `try:` itself), otherwise the child
# fails to compile.
SCRIPT_OUT = (
  'import signal, sys, time;\n'
  'l = [];\n'
  'def handler(signum, _):\n'
  '  l.append(signum);\n'
  '  sys.stdout.write(\'got signal %%d\\n\' %% signum);\n'
  '  sys.stdout.flush();\n'
  'signal.signal(%s, handler);\n'
  'sys.stdout.write(\'hi\\n\');\n'
  'sys.stdout.flush();\n'
  'while not l:\n'
  '  try:\n'
  '    time.sleep(0.01);\n'
  '  except IOError:\n'
  '    sys.stdout.write(\'ioerror\\n\');\n'
  '    sys.stdout.flush();\n'
  'sys.stdout.write(\'bye\\n\');\n'
  'sys.stdout.flush();\n') % (
    'signal.SIGBREAK' if sys.platform == 'win32' else 'signal.SIGTERM')
# Source of a child program passed to `python -c`. It installs a handler for
# SIGBREAK (win32) or SIGTERM (elsewhere), writes 'hi' to stderr, then polls
# until the handler has recorded a signal and finally writes 'bye'. Every write
# is followed by an explicit flush.
#
# The nested blocks of the embedded program must be indented relative to their
# headers (the body of `try:` deeper than `try:` itself), otherwise the child
# fails to compile.
SCRIPT_ERR = (
  'import signal, sys, time;\n'
  'l = [];\n'
  'def handler(signum, _):\n'
  '  l.append(signum);\n'
  '  sys.stderr.write(\'got signal %%d\\n\' %% signum);\n'
  '  sys.stderr.flush();\n'
  'signal.signal(%s, handler);\n'
  'sys.stderr.write(\'hi\\n\');\n'
  'sys.stderr.flush();\n'
  'while not l:\n'
  '  try:\n'
  '    time.sleep(0.01);\n'
  '  except IOError:\n'
  '    sys.stderr.write(\'ioerror\\n\');\n'
  '    sys.stderr.flush();\n'
  'sys.stderr.write(\'bye\\n\');\n'
  'sys.stderr.flush();\n') % (
    'signal.SIGBREAK' if sys.platform == 'win32' else 'signal.SIGTERM')
# Source of a helper program that is written to a temporary file and run by
# the tests. Each command line argument is one command, executed in order:
#   <number>       sleep for that many seconds.
#   out_<action>   apply <action> to stdout.
#   err_<action>   apply <action> to stderr.
# where <action> is one of print, sleeping, slept, lf or flush. An unknown
# prefix or action makes main() return 1; an OSError makes it return 0.
#
# This is a raw string: the backslash escapes are interpreted by the child
# interpreter, not here. The program's blocks must be properly indented or the
# child cannot compile it.
OUTPUT_SCRIPT = r"""
import re
import sys
import time
def main():
  try:
    for command in sys.argv[1:]:
      if re.match(r'^[0-9\.]+$', command):
        time.sleep(float(command))
        continue
      if command.startswith('out_'):
        pipe = sys.stdout
      elif command.startswith('err_'):
        pipe = sys.stderr
      else:
        return 1
      command = command[4:]
      if command == 'print':
        pipe.write('printing')
      elif command == 'sleeping':
        pipe.write('Sleeping.\n')
      elif command == 'slept':
        pipe.write('Slept.\n')
      elif command == 'lf':
        pipe.write('\n')
      elif command == 'flush':
        pipe.flush()
      else:
        return 1
    return 0
  except OSError:
    return 0
if __name__ == '__main__':
  sys.exit(main())
"""
def to_native_eol(string):
  """Returns string with every LF expanded to CR LF when running on win32.

  On any other platform, and when string is None, the argument is returned
  unchanged.
  """
  if string is not None and sys.platform == 'win32':
    return string.replace('\n', '\r\n')
  return string
def get_output_sleep_proc(flush, unbuffered, sleep_duration):
  """Returns process with universal_newlines=True that prints to stdout before
  and after a sleep.

  The child prints 'A', optionally calls sys.stdout.flush(), sleeps for
  sleep_duration seconds, then prints 'B'. Only its stdout is piped.

  Args:
    flush: when true, the child flushes stdout right after printing 'A'.
    unbuffered: when true, '-u' is added to the child's command line.
    sleep_duration: value substituted into the child's time.sleep() call.
  """
  statements = ['import sys,time', "print('A')"]
  if flush:
    # Sadly, this doesn't work otherwise in some combination.
    statements.append('sys.stdout.flush()')
  statements += ['time.sleep(%s)' % sleep_duration, "print('B')"]

  argv = [sys.executable, '-c', ';'.join(statements)]
  if unbuffered:
    # NOTE(review): '-u' lands after the '-c' program, and CPython hands
    # everything that follows the '-c' program to it as sys.argv rather than
    # parsing it as an interpreter option. The child is therefore presumably
    # not unbuffered. Moving the flag would change how the child's output is
    # chunked, so confirm against the tests relying on this helper first.
    argv.append('-u')
  return subprocess42.Popen(
      argv, env=ENV, stdout=subprocess42.PIPE, universal_newlines=True)
def get_output_sleep_proc_err(sleep_duration):
  """Returns process with universal_newlines=True that writes to stderr before
  and after a sleep.

  The child writes 'A' and a newline to stderr, sleeps for sleep_duration
  seconds, then writes 'B' and a newline. Only its stderr is piped.
  """
  statements = [
    'import sys,time',
    "sys.stderr.write('A\\n')",
    'time.sleep(%s)' % sleep_duration,
    "sys.stderr.write('B\\n')",
  ]
  argv = [sys.executable, '-c', ';'.join(statements)]
  return subprocess42.Popen(
      argv, env=ENV, stderr=subprocess42.PIPE, universal_newlines=True)
class Subprocess42Test(unittest.TestCase):
  """Tests subprocess42 by spawning real python child processes.

  Children are either one-liners passed with -c, the signal handling scripts
  SCRIPT_OUT / SCRIPT_ERR, or OUTPUT_SCRIPT written to a temporary file (see
  the output_script property).
  """

  def setUp(self):
    # Path of the temporary copy of OUTPUT_SCRIPT; created lazily by the
    # output_script property and removed in tearDown().
    self._output_script = None
    super(Subprocess42Test, self).setUp()

  def tearDown(self):
    try:
      if self._output_script:
        os.remove(self._output_script)
    finally:
      super(Subprocess42Test, self).tearDown()

  @property
  def output_script(self):
    """Path of a temporary file containing OUTPUT_SCRIPT.

    The file is created the first time the property is read during a test and
    reused afterwards.
    """
    if not self._output_script:
      handle, self._output_script = tempfile.mkstemp(
          prefix='subprocess42', suffix='.py')
      try:
        # os.write() requires bytes on Python 3. OUTPUT_SCRIPT is pure ASCII,
        # so encoding leaves the content unchanged on Python 2.
        os.write(handle, OUTPUT_SCRIPT.encode('ascii'))
      finally:
        # Do not leak the descriptor if the write fails.
        os.close(handle)
    return self._output_script

  def _communicate_output_script(self, args, errpipe, timeout, **kwargs):
    """Runs the output script and calls communicate() with a timeout.

    When communicate() raises TimeoutExpired, the output attached to the
    exception is used instead, the child is killed and waited for.

    Args:
      args: commands passed to the output script.
      errpipe: value for the stderr argument of Popen.
      timeout: value for the timeout argument of communicate().
      **kwargs: extra keyword arguments for Popen.

    Returns:
      Tuple (stdout, stderr, returncode, duration).
    """
    proc = subprocess42.Popen(
        [sys.executable, self.output_script] + args,
        env=ENV,
        stdout=subprocess42.PIPE,
        stderr=errpipe,
        **kwargs)
    try:
      stdout, stderr = proc.communicate(timeout=timeout)
      code = proc.returncode
    except subprocess42.TimeoutExpired as e:
      stdout = e.output
      stderr = e.stderr
      self.assertTrue(proc.kill())
      code = proc.wait()
    finally:
      duration = proc.duration()
    return stdout, stderr, code, duration

  def _retry_with_longer_sleeps(self, check):
    """Calls check(duration) with increasing durations until one passes.

    These checks are timing sensitive. An AssertionError raised for any
    duration but the longest one is reported on stdout and the next duration
    is tried; for the longest duration it propagates.
    """
    for duration in (0.05, 0.1, 0.5, 2):
      try:
        check(duration)
        return
      except AssertionError:
        if duration == 2:
          raise
        print('Sleeping rocks. Trying slower.')

  def test_communicate_timeout(self):
    """communicate(timeout=...) returns, or reports, the output seen so far."""
    # Return code expected once a timed out child has been killed.
    timedout = 1 if sys.platform == 'win32' else -9
    # Format is:
    # ( (cmd, stderr_pipe, timeout), (stdout, stderr, returncode) ), ...
    # See OUTPUT_SCRIPT for the meaning of the commands.
    test_data = [
      # 0 means no timeout, like None.
      (
        (['out_sleeping', '0.001', 'out_slept', 'err_print'], None, 0),
        ('Sleeping.\nSlept.\n', None, 0),
      ),
      (
        (['err_print'], subprocess42.STDOUT, 0),
        ('printing', None, 0),
      ),
      (
        (['err_print'], subprocess42.PIPE, 0),
        ('', 'printing', 0),
      ),
      # On a loaded system, this can be tight.
      (
        (['out_sleeping', 'out_flush', '60', 'out_slept'], None, 0.5),
        ('Sleeping.\n', None, timedout),
      ),
      (
        (
          # Note that err_flush is necessary on Windows but not on the other
          # OSes. This means the likelihood of missing stderr output from a
          # killed child process on Windows is much higher than on other OSes.
          [
            'out_sleeping', 'out_flush', 'err_print', 'err_flush', '60',
            'out_slept',
          ],
          subprocess42.PIPE,
          0.5),
        ('Sleeping.\n', 'printing', timedout),
      ),
      (
        (['out_sleeping', '0.001', 'out_slept'], None, 60),
        ('Sleeping.\nSlept.\n', None, 0),
      ),
    ]
    for i, ((args, errpipe, timeout), expected) in enumerate(test_data):
      stdout, stderr, code, duration = self._communicate_output_script(
          args, errpipe, timeout)
      # Only the cases expected to time out have a meaningful lower bound.
      expected_duration = 0.0001 if not timeout or timeout == 60 else timeout
      self.assertGreaterEqual(
          duration, expected_duration, (i, expected_duration))
      self.assertEqual(
          (i, stdout, stderr, code),
          (i,
           to_native_eol(expected[0]),
           to_native_eol(expected[1]),
           expected[2]))

      # Try again with universal_newlines=True.
      stdout, stderr, code, duration = self._communicate_output_script(
          args, errpipe, timeout, universal_newlines=True)
      self.assertGreaterEqual(
          duration, expected_duration, (i, expected_duration))
      self.assertEqual(
          (i, stdout, stderr, code),
          (i,) + expected)

  def test_communicate_input(self):
    """communicate(input=...) feeds stdin and returns what the child echoes."""
    cmd = [
      sys.executable, '-u', '-c',
      'import sys; sys.stdout.write(sys.stdin.read(5))',
    ]
    proc = subprocess42.Popen(
        cmd, stdin=subprocess42.PIPE, stdout=subprocess42.PIPE)
    out, err = proc.communicate(input='12345')
    self.assertEqual('12345', out)
    self.assertEqual(None, err)

  def test_communicate_input_timeout(self):
    """communicate() times out when only stdin is piped."""
    cmd = [sys.executable, '-u', '-c', 'import time; time.sleep(60)']
    proc = subprocess42.Popen(cmd, stdin=subprocess42.PIPE)
    with self.assertRaises(subprocess42.TimeoutExpired) as cm:
      proc.communicate(input='12345', timeout=0.5)
    self.assertEqual(None, cm.exception.output)
    self.assertEqual(None, cm.exception.stderr)
    self.assertTrue(proc.kill())
    proc.wait()
    self.assertLessEqual(0.5, proc.duration())

  def test_communicate_input_stdout_timeout(self):
    """On timeout, the exception carries the stdout data read so far."""
    cmd = [
      sys.executable, '-u', '-c',
      'import sys, time; sys.stdout.write(sys.stdin.read(5)); time.sleep(60)',
    ]
    proc = subprocess42.Popen(
        cmd, stdin=subprocess42.PIPE, stdout=subprocess42.PIPE)
    with self.assertRaises(subprocess42.TimeoutExpired) as cm:
      proc.communicate(input='12345', timeout=0.5)
    self.assertEqual('12345', cm.exception.output)
    self.assertEqual(None, cm.exception.stderr)
    self.assertTrue(proc.kill())
    proc.wait()
    self.assertLessEqual(0.5, proc.duration())

  def test_communicate_timeout_no_pipe(self):
    """communicate() times out when no pipe at all is used."""
    # In this case, it's effectively a wait() call.
    cmd = [sys.executable, '-u', '-c', 'import time; time.sleep(60)']
    proc = subprocess42.Popen(cmd)
    with self.assertRaises(subprocess42.TimeoutExpired) as cm:
      proc.communicate(timeout=0.5)
    self.assertEqual(None, cm.exception.output)
    self.assertEqual(None, cm.exception.stderr)
    self.assertTrue(proc.kill())
    proc.wait()
    self.assertLessEqual(0.5, proc.duration())

  def test_call(self):
    """call() returns the exit code of the child."""
    cmd = [sys.executable, '-u', '-c', 'import sys; sys.exit(0)']
    self.assertEqual(0, subprocess42.call(cmd))
    cmd = [sys.executable, '-u', '-c', 'import sys; sys.exit(1)']
    self.assertEqual(1, subprocess42.call(cmd))

  def test_check_call(self):
    """check_call() returns 0 on success and raises on a non-zero exit."""
    cmd = [sys.executable, '-u', '-c', 'import sys; sys.exit(0)']
    self.assertEqual(0, subprocess42.check_call(cmd))
    cmd = [sys.executable, '-u', '-c', 'import sys; sys.exit(1)']
    with self.assertRaises(subprocess42.CalledProcessError) as cm:
      subprocess42.check_call(cmd)
    self.assertEqual(None, cm.exception.output)

  def test_check_output(self):
    """check_output() returns stdout, or attaches it to the raised error."""
    cmd = [sys.executable, '-u', '-c', 'print(\'.\')']
    self.assertEqual(
        '.\n',
        subprocess42.check_output(cmd, universal_newlines=True))
    cmd = [sys.executable, '-u', '-c', 'import sys; print(\'.\'); sys.exit(1)']
    with self.assertRaises(subprocess42.CalledProcessError) as cm:
      subprocess42.check_output(cmd, universal_newlines=True)
    self.assertEqual('.\n', cm.exception.output)

  def test_recv_any(self):
    """recv_any() reports data under the name of the pipe it came from."""
    # Test all pipe direction and output scenarios.
    combinations = [
      {
        'cmd': ['out_print', 'err_print'],
        'stdout': None,
        'stderr': None,
        'expected': {},
      },
      {
        'cmd': ['out_print', 'err_print'],
        'stdout': None,
        'stderr': subprocess42.STDOUT,
        'expected': {},
      },
      {
        'cmd': ['out_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.PIPE,
        'expected': {'stdout': 'printing'},
      },
      {
        'cmd': ['out_print'],
        'stdout': subprocess42.PIPE,
        'stderr': None,
        'expected': {'stdout': 'printing'},
      },
      {
        'cmd': ['out_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.STDOUT,
        'expected': {'stdout': 'printing'},
      },
      {
        'cmd': ['err_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.PIPE,
        'expected': {'stderr': 'printing'},
      },
      {
        'cmd': ['err_print'],
        'stdout': None,
        'stderr': subprocess42.PIPE,
        'expected': {'stderr': 'printing'},
      },
      {
        'cmd': ['err_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.STDOUT,
        'expected': {'stdout': 'printing'},
      },
      {
        'cmd': ['out_print', 'err_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.PIPE,
        'expected': {'stderr': 'printing', 'stdout': 'printing'},
      },
      {
        'cmd': ['out_print', 'err_print'],
        'stdout': subprocess42.PIPE,
        'stderr': subprocess42.STDOUT,
        'expected': {'stdout': 'printingprinting'},
      },
    ]
    for i, testcase in enumerate(combinations):
      cmd = [sys.executable, self.output_script] + testcase['cmd']
      p = subprocess42.Popen(
          cmd, env=ENV, stdout=testcase['stdout'], stderr=testcase['stderr'])
      # Maps pipe name to all the data received on it.
      actual = {}
      while p.poll() is None:
        pipe, data = p.recv_any()
        if data:
          actual.setdefault(pipe, '')
          actual[pipe] += data

      # The process exited, read any remaining data in the pipes.
      while True:
        pipe, data = p.recv_any()
        if pipe is None:
          break
        actual.setdefault(pipe, '')
        actual[pipe] += data
      self.assertEqual(
          testcase['expected'],
          actual,
          (i, testcase['cmd'], testcase['expected'], actual))
      self.assertEqual((None, None), p.recv_any())
      self.assertEqual(0, p.returncode)

  def test_recv_any_different_buffering(self):
    """recv_any() without timeout gets all output for each flag combination."""
    # Specifically test all buffering scenarios.
    for flush, unbuffered in itertools.product([True, False], [True, False]):
      actual = ''
      proc = get_output_sleep_proc(flush, unbuffered, 0.5)
      while True:
        p, data = proc.recv_any()
        if not p:
          break
        self.assertEqual('stdout', p)
        self.assertTrue(data, (p, data))
        actual += data

      self.assertEqual('A\nB\n', actual)
      # Contrary to yield_any() or recv_any(0), wait() needs to be used here.
      proc.wait()
      self.assertEqual(0, proc.returncode)

  def test_recv_any_timeout_0(self):
    """recv_any(timeout=0) returns no data while the child is sleeping."""
    # recv_any() is expected to timeout and return None with no data pending at
    # least once, due to the sleep of 'duration' and the use of timeout=0.
    for flush, unbuffered in itertools.product([True, False], [True, False]):
      def check(duration, flush=flush, unbuffered=unbuffered):
        actual = ''
        proc = get_output_sleep_proc(flush, unbuffered, duration)
        try:
          got_none = False
          while True:
            p, data = proc.recv_any(timeout=0)
            if not p:
              if proc.poll() is None:
                # Nothing pending but the child is still running.
                got_none = True
                continue
              break
            self.assertEqual('stdout', p)
            self.assertTrue(data, (p, data))
            actual += data

          self.assertEqual('A\nB\n', actual)
          self.assertEqual(0, proc.returncode)
          self.assertEqual(True, got_none)
        finally:
          proc.kill()
          proc.wait()

      self._retry_with_longer_sleeps(check)

  def test_yield_any_no_timeout(self):
    """yield_any() without timeout yields each print as its own item."""
    def check(duration):
      proc = get_output_sleep_proc(True, True, duration)
      try:
        expected = [
          'A\n',
          'B\n',
        ]
        for p, data in proc.yield_any():
          self.assertEqual('stdout', p)
          self.assertEqual(expected.pop(0), data)
        self.assertEqual(0, proc.returncode)
        self.assertEqual([], expected)
      finally:
        proc.kill()
        proc.wait()

    self._retry_with_longer_sleeps(check)

  def test_yield_any_timeout_0(self):
    """yield_any(timeout=0) yields an empty item while the child sleeps."""
    # yield_any() is expected to timeout and yield None with no data pending at
    # least once, due to the sleep of 'duration' and the use of timeout=0.
    def check(duration):
      proc = get_output_sleep_proc(True, True, duration)
      try:
        expected = [
          'A\n',
          'B\n',
        ]
        got_none = False
        for p, data in proc.yield_any(timeout=0):
          if not p:
            got_none = True
            continue
          self.assertEqual('stdout', p)
          self.assertEqual(expected.pop(0), data)
        self.assertEqual(0, proc.returncode)
        self.assertEqual([], expected)
        self.assertEqual(True, got_none)
      finally:
        proc.kill()
        proc.wait()

    self._retry_with_longer_sleeps(check)

  def test_yield_any_timeout_0_called(self):
    """yield_any() accepts a callable timeout and calls it."""
    # yield_any() is expected to timeout and yield None with no data pending at
    # least once, due to the sleep of 'duration' and the use of a timeout
    # callback returning 0.
    def check(duration):
      proc = get_output_sleep_proc(True, True, duration)
      try:
        expected = [
          'A\n',
          'B\n',
        ]
        got_none = False
        # Gets one entry per call to timeout().
        called = []
        def timeout():
          called.append(0)
          return 0
        for p, data in proc.yield_any(timeout=timeout):
          if not p:
            got_none = True
            continue
          self.assertEqual('stdout', p)
          self.assertEqual(expected.pop(0), data)
        self.assertEqual(0, proc.returncode)
        self.assertEqual([], expected)
        self.assertEqual(True, got_none)
        self.assertTrue(called)
      finally:
        proc.kill()
        proc.wait()

    self._retry_with_longer_sleeps(check)

  def test_yield_any_returncode(self):
    """The exit code stays the same across wait(), poll() and returncode."""
    proc = subprocess42.Popen(
        [sys.executable, '-c', 'import sys;sys.stdout.write("yo");sys.exit(1)'],
        stdout=subprocess42.PIPE)
    for p, d in proc.yield_any():
      self.assertEqual('stdout', p)
      self.assertEqual('yo', d)
    # There was a bug where the second call to wait() would overwrite
    # proc.returncode with 0 when timeout is not None.
    self.assertEqual(1, proc.wait())
    self.assertEqual(1, proc.wait(timeout=0))
    self.assertEqual(1, proc.poll())
    self.assertEqual(1, proc.returncode)
    # On Windows, the clock resolution is 15ms so Popen.duration() will likely
    # be 0.
    self.assertLessEqual(0, proc.duration())

  def _wait_for_hi(self, proc, err):
    """Reads from the child until exactly 'hi' and a line ending was received.

    Reads stderr when err is true, stdout otherwise. Fails the test, reporting
    what was received so far, as soon as a read returns no data.
    """
    recv = proc.recv_err if err else proc.recv_out
    actual = ''
    while True:
      data = recv(timeout=5)
      if not data:
        self.fail('%r' % actual)
      actual += data
      if actual in ('hi\n', 'hi\r\n'):
        break

  def _proc(self, err, **kwargs):
    """Starts SCRIPT_ERR with stderr piped, or SCRIPT_OUT with stdout piped.

    Extra keyword arguments are forwarded to subprocess42.Popen.
    """
    # Do not use the -u flag here, we want to test when it is buffered by
    # default. See reference above about PYTHONUNBUFFERED.
    # That's why the two scripts uses .flush(). Sadly, the flush() call is
    # needed on Windows even for sys.stderr (!)
    cmd = [sys.executable, '-c', SCRIPT_ERR if err else SCRIPT_OUT]
    # TODO(maruel): Make universal_newlines=True work and not hang.
    if err:
      kwargs['stderr'] = subprocess42.PIPE
    else:
      kwargs['stdout'] = subprocess42.PIPE
    return subprocess42.Popen(cmd, **kwargs)

  def test_detached(self):
    self._test_detached(False)
    self._test_detached(True)

  def _test_detached(self, err):
    """terminate() on a detached child lets its signal handler run."""
    is_win = (sys.platform == 'win32')
    key = 'stderr' if err else 'stdout'
    proc = self._proc(err, detached=True)
    try:
      self._wait_for_hi(proc, err)
      proc.terminate()
      if is_win:
        # What happens on Windows is that the process is immediately killed
        # after handling SIGBREAK.
        self.assertEqual(0, proc.wait())
        # Windows...
        self.assertIn(
            proc.recv_any(),
            (
              (key, 'got signal 21\r\nioerror\r\nbye\r\n'),
              (key, 'got signal 21\nioerror\nbye\n'),
              (key, 'got signal 21\r\nbye\r\n'),
              (key, 'got signal 21\nbye\n'),
            ))
      else:
        self.assertEqual(0, proc.wait())
        self.assertEqual((key, 'got signal 15\nbye\n'), proc.recv_any())
    finally:
      # In case the test fails.
      proc.kill()
      proc.wait()

  def test_attached(self):
    self._test_attached(False)
    self._test_attached(True)

  def _test_attached(self, err):
    """terminate() on an attached child: handled signal, or hard kill on win32."""
    is_win = (sys.platform == 'win32')
    key = 'stderr' if err else 'stdout'
    proc = self._proc(err, detached=False)
    try:
      self._wait_for_hi(proc, err)
      proc.terminate()
      if is_win:
        # If attached, it's hard killed.
        self.assertEqual(1, proc.wait())
        self.assertEqual((None, None), proc.recv_any())
      else:
        self.assertEqual(0, proc.wait())
        self.assertEqual((key, 'got signal 15\nbye\n'), proc.recv_any())
    finally:
      # In case the test fails.
      proc.kill()
      proc.wait()

  def test_split(self):
    """split() turns (pipe, chunk) pairs into (pipe, line) pairs."""
    data = [
      ('stdout', 'o1\no2\no3\n'),
      ('stderr', 'e1\ne2\ne3\n'),
      ('stdout', '\n\n'),
      ('stdout', '\n'),
      ('stdout', 'o4\no5'),
      ('stdout', '_sameline\npart1 of one line '),
      ('stderr', 'err inserted between two parts of stdout\n'),
      ('stdout', 'part2 of one line\n'),
      ('stdout', 'incomplete last stdout'),
      ('stderr', 'incomplete last stderr'),
    ]
    # assertEqual rather than its deprecated alias assertEquals, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(list(subprocess42.split(data)), [
      ('stdout', 'o1'),
      ('stdout', 'o2'),
      ('stdout', 'o3'),
      ('stderr', 'e1'),
      ('stderr', 'e2'),
      ('stderr', 'e3'),
      ('stdout', ''),
      ('stdout', ''),
      ('stdout', ''),
      ('stdout', 'o4'),
      ('stdout', 'o5_sameline'),
      ('stderr', 'err inserted between two parts of stdout'),
      ('stdout', 'part1 of one line part2 of one line'),
      ('stderr', 'incomplete last stderr'),
      ('stdout', 'incomplete last stdout'),
    ])
if __name__ == '__main__':
  # With -v on the command line, assertion diffs are not truncated and logging
  # is switched from ERROR to DEBUG.
  verbose = '-v' in sys.argv
  if verbose:
    unittest.TestCase.maxDiff = None
  log_level = logging.DEBUG if verbose else logging.ERROR
  logging.basicConfig(level=log_level)
  unittest.main()