| #!/usr/bin/python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| import cStringIO |
| import logging |
| import mox |
| import os |
| import subprocess |
| import sys |
| import time |
| import unittest |
| from logging import handlers |
| |
| import factory_common # pylint: disable=W0611 |
| from cros.factory.utils import file_utils |
| from cros.factory.utils import process_utils |
| from cros.factory.utils.process_utils import CheckOutput, PIPE, Spawn |
| from cros.factory.utils.process_utils import SpawnOutput |
| from cros.factory.utils.process_utils import TerminateOrKillProcess |
| |
| |
| class SpawnTest(unittest.TestCase): |
| |
| def setUp(self): |
| log_entries = self.log_entries = [] |
| |
| class Target(object): |
| |
| def handle(self, record): |
| log_entries.append((record.levelname, record.msg % record.args)) |
| |
| self.handler = handlers.MemoryHandler(capacity=0, target=Target()) |
| logging.getLogger().addHandler(self.handler) |
| |
| process_utils.dev_null = None |
| |
| def tearDown(self): |
| logging.getLogger().removeHandler(self.handler) |
| |
| def testNoShell(self): |
| process = Spawn(['echo', 'f<o>o'], |
| stdout=PIPE, stderr=PIPE, |
| log=True) |
| stdout, stderr = process.communicate() |
| self.assertEquals('f<o>o\n', stdout) |
| self.assertEquals('', stderr) |
| self.assertEquals(0, process.returncode) |
| self.assertEquals([('INFO', |
| '''Running command: "echo \'f<o>o\'"''')], |
| self.log_entries) |
| |
| def testShell(self): |
| process = Spawn('echo foo', shell=True, |
| stdout=PIPE, stderr=PIPE, log=True) |
| stdout, stderr = process.communicate() |
| self.assertEquals('foo\n', stdout) |
| self.assertEquals('', stderr) |
| self.assertEquals(0, process.returncode) |
| self.assertEquals([('INFO', 'Running command: "echo foo"')], |
| self.log_entries) |
| |
| def testCall(self): |
| process = Spawn('echo blah; exit 3', shell=True, call=True) |
| self.assertEquals(3, process.returncode) |
| # stdout/stderr are not trapped |
| self.assertEquals(None, process.stdout) |
| self.assertEquals(None, process.stdout_data) |
| |
| # Would cause a bad buffering situation. |
| self.assertRaises(ValueError, |
| lambda: Spawn('echo', call=True, stdout=PIPE)) |
| self.assertRaises(ValueError, |
| lambda: Spawn('echo', call=True, stderr=PIPE)) |
| |
| def testCheckCall(self): |
| Spawn('exit 0', shell=True, check_call=True) |
| self.assertRaises(subprocess.CalledProcessError, |
| lambda: Spawn('exit 3', shell=True, check_call=True)) |
| |
| self.assertFalse(self.log_entries) |
| self.assertRaises( |
| subprocess.CalledProcessError, |
| lambda: Spawn('exit 3', shell=True, check_call=True, log=True)) |
| self.assertEquals([('INFO', 'Running command: "exit 3"'), |
| ('ERROR', 'Exit code 3 from command: "exit 3"')], |
| self.log_entries) |
| |
| def testCheckCallFunction(self): |
| Spawn('exit 3', shell=True, check_call=lambda code: code == 3) |
| self.assertRaises( |
| subprocess.CalledProcessError, |
| lambda: Spawn('exit 2', shell=True, |
| check_call=lambda code: code == 3)) |
| |
| def testCheckOutput(self): |
| self.assertEquals( |
| 'foo\n', |
| Spawn('echo foo', shell=True, check_output=True).stdout_data) |
| self.assertRaises(subprocess.CalledProcessError, |
| lambda: Spawn('exit 3', shell=True, check_output=True)) |
| |
| def testReadStdout(self): |
| process = Spawn('echo foo; echo bar; exit 3', shell=True, read_stdout=True) |
| self.assertEquals('foo\nbar\n', process.stdout_data) |
| self.assertEquals(['foo\n', 'bar\n'], process.stdout_lines()) |
| self.assertEquals(['foo', 'bar'], process.stdout_lines(strip=True)) |
| self.assertEquals(None, process.stderr_data) |
| self.assertEquals(3, process.returncode) |
| |
| def testReadStderr(self): |
| process = Spawn('(echo bar; echo foo) >& 2', shell=True, read_stderr=True) |
| self.assertEquals(None, process.stdout_data) |
| self.assertEquals('bar\nfoo\n', process.stderr_data) |
| self.assertEquals(['bar\n', 'foo\n'], process.stderr_lines()) |
| self.assertEquals(0, process.returncode) |
| |
| def testReadStdoutAndStderr(self): |
| process = Spawn('echo foo; echo bar >& 2', shell=True, |
| read_stdout=True, read_stderr=True) |
| self.assertEquals('foo\n', process.stdout_data) |
| self.assertEquals('bar\n', process.stderr_data) |
| self.assertEquals(('foo\n', 'bar\n'), process.communicate()) |
| self.assertEquals(0, process.returncode) |
| |
| def testLogStderrOnError(self): |
| Spawn('echo foo >& 2', shell=True, log_stderr_on_error=True) |
| self.assertFalse(self.log_entries) |
| |
| Spawn('echo foo >& 2; exit 3', shell=True, log_stderr_on_error=True) |
| self.assertEquals( |
| [('ERROR', |
| 'Exit code 3 from command: "echo foo >& 2; exit 3"; ' |
| 'stderr: """\nfoo\n\n"""')], |
| self.log_entries) |
| |
| def testIgnoreStdout(self): |
| self.assertFalse(process_utils.dev_null) |
| process = Spawn('echo ignored; echo foo >& 2', shell=True, |
| ignore_stdout=True, read_stderr=True) |
| self.assertTrue(process_utils.dev_null) |
| self.assertEquals('foo\n', process.stderr_data) |
| |
| def testIgnoreStderr(self): |
| self.assertFalse(process_utils.dev_null) |
| process = Spawn('echo foo; echo ignored >& 2', shell=True, |
| read_stdout=True, ignore_stderr=True) |
| self.assertTrue(process_utils.dev_null) |
| self.assertEquals('foo\n', process.stdout_data) |
| |
| def testOpenDevNull(self): |
| self.assertFalse(process_utils.dev_null) |
| dev_null = process_utils.OpenDevNull() |
| self.assertEquals(os.devnull, dev_null.name) |
| self.assertEquals(dev_null, process_utils.OpenDevNull()) |
| |
| |
| _CMD_FOO_SUCCESS = 'echo foo; exit 0' |
| _CMD_FOO_FAILED = 'echo foo; exit 1' |
| |
| |
| class CheckOutputTest(unittest.TestCase): |
| |
| def testCheckOutput(self): |
| self.assertEquals('foo\n', CheckOutput(_CMD_FOO_SUCCESS, shell=True)) |
| self.assertRaises(subprocess.CalledProcessError, |
| lambda: CheckOutput(_CMD_FOO_FAILED, shell=True)) |
| |
| |
| class SpawnOutputTest(unittest.TestCase): |
| |
| def testSpawnOutput(self): |
| self.assertEquals('foo\n', SpawnOutput(_CMD_FOO_SUCCESS, shell=True)) |
| self.assertEquals('foo\n', SpawnOutput(_CMD_FOO_FAILED, shell=True)) |
| self.assertRaises(subprocess.CalledProcessError, |
| lambda: SpawnOutput(_CMD_FOO_FAILED, shell=True, |
| check_output=True)) |
| |
| |
| class TerminateOrKillProcessTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.m = mox.Mox() |
| self.m.StubOutWithMock(logging, 'info') |
| |
| def tearDown(self): |
| self.m.UnsetStubs() |
| |
| def testTerminateProcess(self): |
| process = Spawn(['sleep', '10']) |
| logging.info('Stopping process %d', process.pid) |
| logging.info('Process %d stopped', process.pid) |
| self.m.ReplayAll() |
| TerminateOrKillProcess(process, 2) |
| self.m.VerifyAll() |
| |
| def testKillProcess(self): |
| process = Spawn('trap true SIGTERM SIGKILL; sleep 10', shell=True) |
| # Allow the process some time to execute and setup signal trap. |
| time.sleep(1) |
| logging.info('Stopping process %d', process.pid) |
| logging.info('Sending SIGKILL to process %d', process.pid) |
| logging.info('Process %d stopped', process.pid) |
| self.m.ReplayAll() |
| TerminateOrKillProcess(process, 2) |
| self.m.VerifyAll() |
| |
| def testTerminateSudoProcess(self): |
| process = Spawn(['sleep', '10'], sudo=True) |
| logging.info('Stopping process %d', process.pid) |
| spawn_msg = 'Running command: "kill %d"' % process.pid |
| logging.info(spawn_msg) |
| self.m.ReplayAll() |
| TerminateOrKillProcess(process, sudo=True) |
| self.m.VerifyAll() |
| |
| |
| class TestSpawnTee(unittest.TestCase): |
| |
| def runTest(self): |
| with file_utils.UnopenedTemporaryFile() as stdout, \ |
| file_utils.UnopenedTemporaryFile() as tee_file: |
| # Call SpawnTee which should write to both stdout and tee_file. |
| with open(stdout, 'w') as f: |
| process_utils.SpawnTee(['ls', '/bin/sh'], stdout=f, |
| output_file=tee_file, check_call=True) |
| # Make sure the contents in stdout and tee_file are correct. |
| with open(stdout) as f: |
| self.assertEquals('/bin/sh\n', f.read()) |
| with open(tee_file) as f: |
| self.assertEquals('/bin/sh\n', f.read()) |
| |
| |
| class TestRedirectStdout(unittest.TestCase): |
| def setUp(self): |
| self.saved_stdout = sys.stdout |
| self.mock_stdout = cStringIO.StringIO() |
| sys.stdout = self.mock_stdout |
| |
| def tearDown(self): |
| sys.stdout = self.saved_stdout |
| |
| def testRedirectStdout(self): |
| print 'before' |
| dummy_file = process_utils.DummyFile() |
| with process_utils.RedirectStandardStreams(stdout=dummy_file): |
| print 'SHOULD_NOT_OUTPUT' |
| print 'after' |
| self.assertEquals('before\nafter\n', self.mock_stdout.getvalue()) |
| |
| def testNotRedirectStdout(self): |
| print 'before' |
| with process_utils.RedirectStandardStreams(stdout=None): |
| print 'SHOULD_OUTPUT' |
| print 'after' |
| self.assertEquals('before\nSHOULD_OUTPUT\nafter\n', |
| self.mock_stdout.getvalue()) |
| |
| def testRedirectAgainStdoutWithinContext(self): |
| dummy_file = process_utils.DummyFile() |
| with self.assertRaises(IOError): |
| with process_utils.RedirectStandardStreams(stdout=dummy_file): |
| sys.stdout = process_utils.DummyFile() |
| |
| def testRedirectStdoutWithinContext(self): |
| dummy_file = process_utils.DummyFile() |
| print 'before' |
| with process_utils.RedirectStandardStreams(stdout=None): |
| print 'SHOULD_OUTPUT' |
| sys.stdout = dummy_file |
| print 'SHOULD_NOT_OUTPUT' |
| print 'after' |
| self.assertEquals('before\nSHOULD_OUTPUT\n', self.mock_stdout.getvalue()) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.INFO) |
| unittest.main() |