blob: bd95b4c4290cc4b9e4a8ef0d9dd23f9913e201d0 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A test to invoke a list of shell commands.
Description
-----------
This test will execute the shell commands given from argument ``commands`` one
by one, and fail if any of the return value is non-zero.
By default the commands are executed on DUT (specified by Device API). If you
need to run on station (usually local), set argument ``is_station`` to True.
Test Procedure
--------------
This is an automated test without user interaction.
Start the test and it will run the shell commands specified, one by one; and
fail if any of the command returns false.
Dependency
----------
The test uses system shell to execute commands, which may be different per
platform. Also, the command may be not always available on target system.
You have to review each command to check if that's provided on DUT.
Examples
--------
To add multiple ports to iptables, add this in test list::
{
"pytest_name": "exec_shell",
"args": {
"commands": [
"iptables -A input -p tcp --dport 4020",
"iptables -A input -p tcp --dport 4021",
"iptables -A input -p tcp --dport 4022"
]
}
}
To load module 'i2c-dev' (and never fails), add this in test list::
{
"pytest_name": "exec_shell",
"args": {
"commands": "modprobe i2c-dev || true"
}
}
"""
import logging
import os
import StringIO
import subprocess
import time
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test.i18n import _
from cros.factory.test import test_ui
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import process_utils
class ExecShell(test_ui.TestCaseWithUI):
"""Runs a list of commands.
Properties:
_commands: A list of CheckItems.
"""
ARGS = [
Arg('commands', (list, str),
'A list (or one simple string) of shell commands to execute.'),
Arg('is_station', bool,
('Run the given commands on station (usually local host) instead of '
'DUT, for example preparing connection configuration.'),
default=False)
]
@staticmethod
def _DisplayedCommand(command, length=50):
"""Returns a possibly truncated command with max length to display."""
return (command[:length] + ' ...') if len(command) > length else command
def UpdateOutput(self, handle, name, output, interval_sec=0.1):
"""Updates output from file handle to given HTML node."""
self.ui.SetHTML('', id=name)
while True:
c = os.read(handle.fileno(), 4096)
if not c:
break
self.ui.SetHTML(
test_ui.Escape(c, preserve_line_breaks=False), append=True, id=name,
autoscroll=True)
output[name].write(c)
time.sleep(interval_sec)
def setUp(self):
self.ui.SetTitle(_('Running shell commands...'))
self._dut = (device_utils.CreateStationInterface()
if self.args.is_station else
device_utils.CreateDUTInterface())
if isinstance(self.args.commands, basestring):
self._commands = [self.args.commands]
else:
self._commands = self.args.commands
def runTest(self):
self.ui.DrawProgressBar(len(self._commands))
for command in self._commands:
self.ui.SetInstruction(self._DisplayedCommand(command))
process = self._dut.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
handles = {
'stdout': process.stdout,
'stderr': process.stderr,
}
output = {name: StringIO.StringIO() for name in handles}
threads = [
process_utils.StartDaemonThread(
target=self.UpdateOutput, args=(handle, name, output))
for name, handle in handles.iteritems()
]
process.wait()
for thread in threads:
thread.join()
stdout = output['stdout'].getvalue()
stderr = output['stderr'].getvalue()
logging.info('Shell command: %r, result=%s, stdout=%r, stderr=%r',
command, process.returncode, stdout, stderr)
if process.returncode != 0:
# More chance so user can see the error.
self.Sleep(3)
self.FailTask('Shell command failed (%d): %s' % (process.returncode,
command))
self.ui.AdvanceProgress()