blob: 638198fa9f69f74169360f97b7461bce10c6ffcc [file] [log] [blame]
#! /usr/bin/env python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A wrapper of the servo module from autotest repo."""
import argparse
import logging
import os
import re
import subprocess
import factory_common # pylint: disable=W0611
from cros.factory.factory_flow import common
from cros.factory.test import utils
from cros.factory.utils import process_utils
FLASHROM_LOCK_FILE = '/tmp/factory_flow_flashrom'
FLASHROM_LOCK_TIMEOUT = 600
class Servo(object):
"""A wrapper class for servo.Servo in autotest repo."""
# A board name map to translate board variants to base board.
BoardMap = {
'expresso': 'rambi',
'big': 'nyan_big',
}
def __init__(self, board, host, port=9999, serial=None):
self._InstallRequiredPackages()
# Do autotest imports here to stop it from messing around with logging
# settings.
# pylint: disable=W0612, F0401
import autotest_common
from autotest_lib.server import hosts
from autotest_lib.server.cros.servo import servo
if host in ('localhost', '127.0.0.1'):
if board in self.BoardMap:
board = self.BoardMap[board]
self._servod = process_utils.Spawn(
['servod', '--board', board, '--port', str(port)] +
(['--serialname', serial] if serial else []),
sudo=True, log=True)
def WaitForServod():
REGEXP = re.compile(r'^tcp.*(127\.0\.0\.1|localhost):%d' % port,
re.MULTILINE)
netstat = process_utils.CheckOutput(['netstat', '-nl'])
return bool(REGEXP.search(netstat))
utils.WaitFor(WaitForServod, 10)
if common.OnMoblab():
# Do not try to auto-update if we are running servo host directly on a
# Moblab.
hosts.ServoHost._update_image = lambda _: True
# Create a dummy servod config to make ServoHost happy.
process_utils.Spawn(['touch', '/var/lib/servod/config'],
log=True, sudo=True, check_call=True)
self._servo = servo.Servo(hosts.ServoHost(servo_host=host, servo_port=port),
serial)
def _InstallRequiredPackages(self):
if common.OnMoblab():
# Moblab has all the required packages.
return
# Check if flashrom is installed with ft2232_spi support.
flashrom_equery = process_utils.CheckOutput(
['equery', '--no-color', '--no-pipe', 'uses', 'flashrom'])
use_ft2232_spi = re.search(
r'^\s*([+-])\s*([+-])\s*ft2232_spi', flashrom_equery, re.MULTILINE)
if use_ft2232_spi.group(2) != '+':
logging.info(
'Your flashrom does not have ft2232_spi support. Re-building '
'flashrom package with USE=ft223_spi...')
process_utils.Spawn(
['sudo', '-E', 'emerge', 'flashrom'], env=dict(USE='ft2232_spi'),
log=True, check_call=True, log_stderr_on_error=True,
ignore_stdout=True)
# Check if flash_ec is accessible to root.
try:
process_utils.Spawn(['which', 'flash_ec'], sudo=True, check_call=True)
except subprocess.CalledProcessError:
logging.info('Cannot locate flash_ec through sudo. Creating symbolic '
'link of flash_ec in /usr/local/bin...')
process_utils.Spawn(
['ln', '-s', os.path.join(
os.environ['CROS_WORKON_SRCROOT'], 'src', 'platform', 'ec',
'util', 'flash_ec'),
'/usr/local/bin'], sudo=True, log=True, check_call=True)
def __getattr__(self, name):
"""Delegates getter of all unknown attributes to servo.Servo object."""
return self.__dict__.get(name) or getattr(self._servo, name)
def TearDown(self):
"""Cleans up servod process if we started one."""
if self._servod:
process_utils.TerminateOrKillProcess(self._servod, sudo=True)
def Main():
parser = argparse.ArgumentParser()
parser.add_argument('--board', help='the board name used to start servod')
parser.add_argument('--host', help='the servo host')
parser.add_argument('--port', type=int, default=9999,
help='the port servod listens to')
parser.add_argument('--serial', help='the serial number of the servo board')
parser.add_argument('method_call', help='the method and its args to call')
args = parser.parse_args()
servo = None
try:
servo = Servo(args.board, args.host, port=args.port, serial=args.serial)
print eval('servo.%s' % args.method_call, dict(servo=servo), {})
finally:
if servo:
servo.TearDown()
if __name__ == '__main__':
Main()