blob: 8c24d0b767fbb0b18adab55f0ff0bc2737e670c4 [file] [log] [blame]
#!/usr/bin/trial --temp-directory=/tmp/_trial_temp/
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import glob
import logging
import mox
import os
import shutil
import time
from twisted.internet import reactor
from twisted.trial import unittest
from twisted.web import server, xmlrpc
import xmlrpclib
import factory_common # pylint: disable=W0611
from cros.factory.umpire.bundle_selector import SelectRuleset
from cros.factory.umpire.rpc_dut import (
LogDUTCommands,
RootDUTCommands,
UmpireDUTCommands,
FACTORY_STAGES)
from cros.factory.umpire.umpire_env import UmpireEnvForTest
from cros.factory.umpire.utils import ConcentrateDeferreds
from cros.factory.umpire.version import UMPIRE_VERSION_MAJOR
from cros.factory.umpire.web.xmlrpc import XMLRPCContainer
from cros.factory.utils import file_utils
from cros.factory.utils import net_utils
TEST_RPC_PORT = net_utils.GetUnusedPort()
TESTDIR = os.path.abspath(os.path.join(os.path.split(__file__)[0], 'testdata'))
TESTCONFIG = os.path.join(TESTDIR, 'enable_update.yaml')
class DUTRPCTest(unittest.TestCase):
def setUp(self):
self.env = UmpireEnvForTest()
shutil.copy(TESTCONFIG, self.env.active_config_file)
# Create empty files for resources.
for res in ['complete.gz##d41d8cd9',
'install_factory_toolkit.run#ftk_v0.1#d41d8cd9',
'efi.gz##d41d8cd9',
'firmware.gz#bios_v0.3:ec_v0.2#d41d8cd9',
'hwid.gz##d41d8cd9',
'vmlinux##d41d8cd9',
'oem.gz##d41d8cd9',
'rootfs-release.gz#release_v9876.0.0#d41d8cd9',
'rootfs-test.gz#test_v5432.0.0#d41d8cd9',
'install_factory_toolkit.run#ftk_v0.4#d41d8cd9',
'state.gz##d41d8cd9']:
file_utils.TouchFile(os.path.join(self.env.resources_dir, res))
self.env.LoadConfig()
self.mox = mox.Mox()
self.proxy = xmlrpc.Proxy('http://localhost:%d' % TEST_RPC_PORT,
allowNone=True)
root_commands = RootDUTCommands(self.env)
umpire_dut_commands = UmpireDUTCommands(self.env)
log_dut_commands = LogDUTCommands(self.env)
xmlrpc_resource = XMLRPCContainer()
xmlrpc_resource.AddHandler(root_commands)
xmlrpc_resource.AddHandler(umpire_dut_commands)
xmlrpc_resource.AddHandler(log_dut_commands)
self.twisted_port = reactor.listenTCP( # pylint: disable=E1101
TEST_RPC_PORT, server.Site(xmlrpc_resource))
# The device info that matches TESTCONFIG
self.device_info = {
'x_umpire_dut': {
'mac': 'aa:bb:cc:dd:ee:ff',
'sn': '0C1234567890',
'mlb_sn': 'SN001',
'stage': 'SMT'},
'components': {
'device_factory_toolkit': 'd41d8cd9',
'rootfs_release': 'release_v9876.0.0',
'rootfs_test': 'test_v5432.0.0',
'firmware_ec': 'ec_v0.2',
'firmware_bios': 'bios_v0.3'}}
def tearDown(self):
self.twisted_port.stopListening()
self.mox.UnsetStubs()
self.mox.VerifyAll()
def Call(self, function, *args):
return self.proxy.callRemote(function, *args)
def testPing(self):
def CheckResult(result):
self.assertEqual(result, {'version': UMPIRE_VERSION_MAJOR})
return result
d = self.Call('Ping')
d.addCallback(CheckResult)
return d
def testTime(self):
def CheckTime(result):
timediff = time.time() - result
logging.debug('testTime timediff = %f', timediff)
self.assertTrue(timediff <= 0.1)
return result
d = self.Call('GetTime')
d.addCallback(CheckTime)
return d
def testListParameters(self):
def CheckList(result):
logging.debug('list parameters = %s', str(result))
self.assertEqual(result, [])
return result
d = self.Call('ListParameters', 'parameters_*')
d.addCallback(CheckList)
return d
def testGetUpdateNoUpdate(self):
def CheckNoUpdate(result):
logging.debug('no update result: %s', str(result))
for unused_component, update_info in result.iteritems():
self.assertFalse(update_info['needs_update'])
return result
deferreds = []
for stage in FACTORY_STAGES:
noupdate_info = copy.deepcopy(self.device_info)
noupdate_info['x_umpire_dut']['stage'] = stage
d = self.Call('GetUpdate', noupdate_info)
d.addCallback(CheckNoUpdate)
deferreds.append(d)
return ConcentrateDeferreds(deferreds)
def testGetUpdate(self):
def CheckSingleComponentUpdate(result):
logging.debug('update result:\n\t%r', result)
self.assertEqual(1, sum(result[component]['needs_update'] for component in
result))
return result
update_info = copy.deepcopy(self.device_info)
ruleset = SelectRuleset(self.env.config, update_info['x_umpire_dut'])
logging.debug('selected ruleset: %s', str(ruleset))
deferreds = []
for component, stage_range in ruleset['enable_update'].iteritems():
# Make a new copy.
update_info = copy.deepcopy(self.device_info)
update_info['x_umpire_dut']['stage'] = stage_range[0]
update_info['components'][component] = 'force update'
deferred = self.Call('GetUpdate', update_info)
# There's only one component needs update.
deferred.addCallback(CheckSingleComponentUpdate)
deferreds.append(deferred)
return ConcentrateDeferreds(deferreds)
def testUploadReport(self):
def CheckTrue(result):
self.assertEqual(result, True)
return result
def CheckReport(content, namestrings=None):
if namestrings is None:
namestrings = []
report_files = glob.glob(os.path.join(
self.env.umpire_data_dir, 'report', '*', '*'))
logging.debug('report files: %r', report_files)
self.assertTrue(report_files)
report_path = report_files[0]
with open(report_path, 'rb') as f:
self.assertEqual(f.read(), content)
for name in namestrings:
self.assertIn(name, report_path)
return True
d = self.Call('UploadReport', 'serial1234', xmlrpclib.Binary('content'),
'rpt_name5678', 'stage90')
d.addCallback(CheckTrue)
d.addCallback(lambda _: CheckReport(
'content', namestrings=['serial1234', 'rpt_name5678', 'stage90',
'report', 'rpt.xz']))
return d
def testUploadEvent(self):
def CheckTrue(result):
self.assertEqual(result, True)
return result
def CheckEvent(content):
event_files = glob.glob(os.path.join(
self.env.umpire_data_dir, 'eventlog', '*', '*'))
logging.debug('event files: %r', event_files)
self.assertTrue(event_files)
event_path = event_files[0]
with open(event_path, 'r') as f:
self.assertEqual(f.read(), content)
return True
d = self.Call('UploadEvent', 'event_log_name', '123')
d.addCallback(CheckTrue)
d.addCallback(lambda _: CheckEvent('123'))
d.addCallback(lambda _: self.Call('UploadEvent', 'event_log_name', '456'))
d.addCallback(CheckTrue)
d.addCallback(lambda _: CheckEvent('123456'))
return d
if os.environ.get('LOG_LEVEL'):
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)5s %(message)s')
else:
logging.disable(logging.CRITICAL)