blob: d19448e8b7001b78ed50f1f01a9dd0968d906437 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for the cmd_helper module."""
import unittest
import subprocess
import sys
import time
from devil import devil_env
from devil.utils import cmd_helper
with devil_env.SysPath(devil_env.PYMOCK_PATH):
import mock # pylint: disable=import-error
class CmdHelperSingleQuoteTest(unittest.TestCase):
def testSingleQuote_basic(self):
self.assertEqual('hello', cmd_helper.SingleQuote('hello'))
def testSingleQuote_withSpaces(self):
self.assertEqual("'hello world'", cmd_helper.SingleQuote('hello world'))
def testSingleQuote_withUnsafeChars(self):
self.assertEqual("""'hello'"'"'; rm -rf /'""",
cmd_helper.SingleQuote("hello'; rm -rf /"))
def testSingleQuote_dontExpand(self):
test_string = 'hello $TEST_VAR'
cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
self.assertEqual(test_string,
cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
class CmdHelperGetCmdStatusAndOutputTest(unittest.TestCase):
def testGetCmdStatusAndOutput_success(self):
cmd = 'echo "Hello World"'
status, output = cmd_helper.GetCmdStatusAndOutput(cmd, shell=True)
self.assertEqual(status, 0)
self.assertEqual(output.rstrip(), "Hello World")
def testGetCmdStatusAndOutput_unicode(self):
# pylint: disable=no-self-use
cmd = 'echo "\x80\x31Hello World\n"'
cmd_helper.GetCmdStatusAndOutput(cmd, shell=True)
class CmdHelperDoubleQuoteTest(unittest.TestCase):
def testDoubleQuote_basic(self):
self.assertEqual('hello', cmd_helper.DoubleQuote('hello'))
def testDoubleQuote_withSpaces(self):
self.assertEqual('"hello world"', cmd_helper.DoubleQuote('hello world'))
def testDoubleQuote_withUnsafeChars(self):
self.assertEqual('''"hello\\"; rm -rf /"''',
cmd_helper.DoubleQuote('hello"; rm -rf /'))
def testSingleQuote_doExpand(self):
test_string = 'hello $TEST_VAR'
cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
self.assertEqual('hello world',
cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
class CmdHelperShinkToSnippetTest(unittest.TestCase):
def testShrinkToSnippet_noArgs(self):
self.assertEqual('foo', cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
self.assertEqual("'foo foo'",
cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
self.assertEqual('"$a"\' bar\'',
cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
self.assertEqual('\'foo \'"$a"',
cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
self.assertEqual('foo"$a"',
cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))
def testShrinkToSnippet_singleArg(self):
self.assertEqual("foo ''",
cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
self.assertEqual("foo foo",
cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
self.assertEqual('"$a" "$a"',
cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
self.assertEqual('foo "$a""$a"',
cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
self.assertEqual('foo "$a"\' \'"$a"',
cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
self.assertEqual('foo "$a""$a"\' \'',
cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
self.assertEqual(
'foo \' \'"$a""$a"\' \'',
cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
_DEFAULT = 'DEFAULT'
class _ProcessOutputEvent(object):
def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
self.select_fds = select_fds
self.read_contents = read_contents
self.ts = ts
class _MockProcess(object):
def __init__(self, output_sequence=None, return_value=0):
# Arbitrary.
fake_stdout_fileno = 25
self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
self.mock_proc.stdout = mock.MagicMock()
self.mock_proc.stdout.fileno = mock.MagicMock(
return_value=fake_stdout_fileno)
self.mock_proc.returncode = None
self._return_value = return_value
# This links the behavior of os.read, select.select, time.time, and
# <process>.poll. The output sequence can be thought of as a list of
# return values for select.select with corresponding return values for
# the other calls at any time between that select call and the following
# one. We iterate through the sequence only on calls to select.select.
#
# os.read is a special case, though, where we only return a given chunk
# of data *once* after a given call to select.
if not output_sequence:
output_sequence = []
# Use an leading element to make the iteration logic work.
initial_seq_element = _ProcessOutputEvent(
_DEFAULT, '', output_sequence[0].ts if output_sequence else _DEFAULT)
output_sequence.insert(0, initial_seq_element)
for o in output_sequence:
if o.select_fds == _DEFAULT:
if o.read_contents is None:
o.select_fds = []
else:
o.select_fds = [fake_stdout_fileno]
if o.ts == _DEFAULT:
o.ts = time.time()
self._output_sequence = output_sequence
self._output_seq_index = 0
self._read_flags = [False] * len(output_sequence)
def read_side_effect(*_args, **_kwargs):
if self._read_flags[self._output_seq_index]:
return None
self._read_flags[self._output_seq_index] = True
return self._output_sequence[self._output_seq_index].read_contents
def select_side_effect(*_args, **_kwargs):
if self._output_seq_index is None:
self._output_seq_index = 0
else:
self._output_seq_index += 1
if self._output_seq_index < len(self._output_sequence):
return (self._output_sequence[self._output_seq_index].select_fds, None,
None)
return ([], None, None)
def time_side_effect(*_args, **_kwargs):
return self._output_sequence[self._output_seq_index].ts
def poll_side_effect(*_args, **_kwargs):
if self._output_seq_index >= len(self._output_sequence) - 1:
self.mock_proc.returncode = self._return_value
return self.mock_proc.returncode
mock_read = mock.MagicMock(side_effect=read_side_effect)
mock_select = mock.MagicMock(side_effect=select_side_effect)
mock_time = mock.MagicMock(side_effect=time_side_effect)
self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)
# Set up but *do not start* the mocks.
self._mocks = [
mock.patch('os.read', new=mock_read),
mock.patch('select.select', new=mock_select),
mock.patch('time.time', new=mock_time),
]
if sys.platform != 'win32':
self._mocks.append(mock.patch('fcntl.fcntl'))
def __enter__(self):
for m in self._mocks:
m.__enter__()
return self.mock_proc
def __exit__(self, exc_type, exc_val, exc_tb):
for m in reversed(self._mocks):
m.__exit__(exc_type, exc_val, exc_tb)
class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
"""Test IterCmdOutputLines with some calls to the unix 'seq' command."""
# This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
# can mock the process.
# pylint: disable=protected-access
_SIMPLE_OUTPUT_SEQUENCE = [
_ProcessOutputEvent(read_contents=b'1\n2\n'),
]
def testIterCmdOutputLines_success(self):
with _MockProcess(
output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
for num, line in enumerate(
cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
self.assertEqual(num, int(line))
def testIterCmdOutputLines_unicode(self):
output_sequence = [
_ProcessOutputEvent(read_contents=b'\x80\x31\nHello\n\xE2\x98\xA0')
]
with _MockProcess(output_sequence=output_sequence) as mock_proc:
lines = list(cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'))
self.assertEqual(lines[1], "Hello")
def testIterCmdOutputLines_exitStatusFail(self):
with self.assertRaises(subprocess.CalledProcessError):
with _MockProcess(
output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
return_value=1) as mock_proc:
for num, line in enumerate(
cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
self.assertEqual(num, int(line))
# after reading all the output we get an exit status of 1
def testIterCmdOutputLines_exitStatusIgnored(self):
with _MockProcess(
output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
return_value=1) as mock_proc:
for num, line in enumerate(
cmd_helper._IterCmdOutputLines(
mock_proc, 'mock_proc', check_status=False), 1):
self.assertEqual(num, int(line))
def testIterCmdOutputLines_exitStatusSkipped(self):
with _MockProcess(
output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
return_value=1) as mock_proc:
for num, line in enumerate(
cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
self.assertEqual(num, int(line))
# no exception will be raised because we don't attempt to read past
# the end of the output and, thus, the status never gets checked
if num == 2:
break
def testIterCmdOutputLines_delay(self):
output_sequence = [
_ProcessOutputEvent(read_contents=b'1\n2\n', ts=1),
_ProcessOutputEvent(read_contents=None, ts=2),
_ProcessOutputEvent(read_contents=b'Awake', ts=10),
]
with _MockProcess(output_sequence=output_sequence) as mock_proc:
for num, line in enumerate(
cmd_helper._IterCmdOutputLines(
mock_proc, 'mock_proc', iter_timeout=5), 1):
if num <= 2:
self.assertEqual(num, int(line))
elif num == 3:
self.assertEqual(None, line)
elif num == 4:
self.assertEqual('Awake', line)
else:
self.fail()
if __name__ == '__main__':
unittest.main()