blob: 247296271658ab06381a47a264057895b04ef1e7 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for shop floor server."""
from __future__ import print_function
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import unittest
import xmlrpclib
import factory_common # pylint: disable=W0611
from cros.factory import shopfloor
from cros.factory.shopfloor import factory_update_server
from cros.factory.test import factory
from cros.factory.test import utils
from cros.factory.umpire.client import umpire_server_proxy
from cros.factory.utils import net_utils
from cros.factory.utils.process_utils import Spawn
class ShopFloorServerTest(unittest.TestCase):
def setUp(self):
"""Starts shop floor server and creates client proxy."""
# pylint: disable=W0212
self.server_port = net_utils.FindUnusedTCPPort()
self.base_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
self.data_dir = tempfile.mkdtemp(prefix='shopfloor_data.')
self.auto_archive_logs = os.path.join(self.data_dir, 'auto-archive-logs')
self.logs_dir = os.path.join(self.data_dir, time.strftime('logs.%Y%m%d'))
self.reports_dir = os.path.join(
self.data_dir, shopfloor.REPORTS_DIR,
time.strftime(shopfloor.LOGS_DIR_FORMAT))
self.aux_logs_dir = os.path.join(
self.data_dir, shopfloor.AUX_LOGS_DIR,
time.strftime(shopfloor.LOGS_DIR_FORMAT))
self.events_dir = os.path.join(
self.data_dir, shopfloor.EVENTS_DIR)
self.parameters_dir = os.path.join(self.data_dir, 'parameters')
self.registration_code_log = (
os.path.join(self.data_dir, shopfloor.REGISTRATION_CODE_LOG_CSV))
csv_source = os.path.join(self.base_dir, 'testdata', 'devices.csv')
csv_work = os.path.join(self.data_dir, 'devices.csv')
aux_csv_source = os.path.join(self.base_dir, 'testdata', 'aux_mlb.csv')
aux_csv_work = os.path.join(self.data_dir, 'aux_mlb.csv')
shutil.copyfile(csv_source, csv_work)
shutil.copyfile(aux_csv_source, aux_csv_work)
os.mkdir(os.path.join(self.data_dir, shopfloor.UPDATE_DIR))
os.mkdir(os.path.join(self.data_dir, shopfloor.UPDATE_DIR, 'factory'))
factory_update_server.poll_interval_sec = 0.1
# Use shopfloor_server.py (or the SHOPFLOOR_SERVER_CMD environment
# variable if set).
cmd = os.environ.get(
'SHOPFLOOR_SERVER_CMD',
'python %s' % os.path.join(
self.base_dir, 'shopfloor_server.py')).split(' ')
cmd.extend([
'-q', '-a', net_utils.LOCALHOST, '-p', str(self.server_port),
'-m', 'cros.factory.shopfloor.simple_shopfloor',
'-d', self.data_dir,
'--auto-archive-logs', os.path.join(self.auto_archive_logs,
'logs.DATE.tar.bz2')])
self.process = Spawn(cmd, log=True)
self.proxy = umpire_server_proxy.TimeoutUmpireServerProxy(
'http://%s:%s' % (net_utils.LOCALHOST, self.server_port),
allow_none=True)
# Waits the server to be ready, up to 1 second.
for _ in xrange(10):
try:
self.proxy.Ping()
break
except: # pylint: disable=W0702
time.sleep(0.1)
continue
else:
self.fail('Server never came up')
def tearDown(self):
"""Terminates shop floor server"""
self.process.terminate()
self.process.wait()
shutil.rmtree(self.data_dir)
def testCheckSN(self):
# Valid serial numbers range from CR001001 to CR001025
for i in range(25):
serial = 'CR0010%02d' % (i + 1)
self.assertTrue(self.proxy.CheckSN(serial))
# Test invalid serial numbers
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, '0000')
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, 'garbage')
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, '')
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, None)
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, 'CR001000')
self.assertRaises(xmlrpclib.Fault, self.proxy.CheckSN, 'CR001026')
def _CreateFileAndContextAsFilename(self, filename):
utils.TryMakeDirs(os.path.dirname(filename))
with open(filename, 'w') as fd:
fd.write(os.path.basename(filename))
def testListParameters(self):
# Make few temporary files.
wifi_production = set(['rf/wifi/parameters.production'])
wifi_calibration = set(['rf/wifi/calibration_config.1',
'rf/wifi/calibration_config.2'])
cell_production = set(['rf/cell/parameters.production'])
for filename in (wifi_production | wifi_calibration | cell_production):
self._CreateFileAndContextAsFilename(
os.path.join(self.parameters_dir, filename))
self.assertEqual(set(self.proxy.ListParameters('rf/wifi/*')),
(wifi_production | wifi_calibration))
self.assertEqual(set(self.proxy.ListParameters('rf/wifi/calibration*')),
wifi_calibration)
# Because ListParameters is not recursive, this should be empty set.
self.assertEqual(set(self.proxy.ListParameters('*')), set())
self.assertEqual(set(self.proxy.ListParameters('rf/*/parameters.*')),
(wifi_production | cell_production))
# Listing files outside parameters directory should raise an exception.
self.assertRaises(xmlrpclib.Fault, self.proxy.ListParameters, 'rf/../../*')
def testGetParameter(self):
wifi_production = 'parameters.production'
relpath = os.path.join('rf/wifi/', wifi_production)
self._CreateFileAndContextAsFilename(
os.path.join(self.parameters_dir, relpath))
# Get valid parameter.
self.assertEquals(wifi_production, self.proxy.GetParameter(relpath).data)
# Get parameter that doesn't exist
self.assertRaises(xmlrpclib.Fault, self.proxy.GetParameter, relpath + 'foo')
self.assertRaises(xmlrpclib.Fault, self.proxy.GetParameter, 'rf/wifi')
# Get parameter outside parameters folder.
self.assertRaises(
xmlrpclib.Fault, self.proxy.GetParameter, '../devices.csv')
def testGetHWID(self):
# Valid HWIDs range from CR001001 to CR001025
for i in range(25):
serial = 'CR0010%02d' % (i + 1)
result = self.proxy.GetHWID(serial)
self.assertTrue(result.startswith('MAGICA '))
self.assertEqual(len(result.split(' ')), 4)
def testGetHWIDUpdater(self):
self.assertEquals(None, self.proxy.GetHWIDUpdater())
# Add a HWID updater; the update server will start serving it within
# a second.
with open(os.path.join(self.data_dir, shopfloor.UPDATE_DIR,
'hwid_updater.sh'), 'w') as f:
f.write('foobar')
for _ in xrange(20):
updater = self.proxy.GetHWIDUpdater()
if updater:
self.assertEqual('foobar', updater.data)
break
time.sleep(0.1)
else:
self.fail('HWID updater was never picked up')
# Add another file; now there should be no updater returned since
# this is an invalid state.
open(os.path.join(self.data_dir, shopfloor.UPDATE_DIR,
'hwid_updater2.sh'), 'w').close()
for _ in xrange(20):
if self.proxy.GetHWIDUpdater() is None:
break # Good!
time.sleep(0.1)
else:
self.fail('HWID updater never reverted to None')
def testGetVPD(self):
# VPD fields defined in simple.csv
RO_FIELDS = ('region', 'serial_number')
RW_FIELDS_SET1 = ('wifi_mac', 'cellular_mac')
RW_FIELDS_SET2 = ('wifi_mac', )
vpd = self.proxy.GetVPD('CR001005')
for field in RO_FIELDS:
self.assertTrue(field in vpd['ro'] and vpd['ro'][field])
for field in RW_FIELDS_SET1:
self.assertTrue(field in vpd['rw'] and vpd['rw'][field])
self.assertEqual(vpd['ro']['region'], 'us')
self.assertEqual(vpd['rw']['wifi_mac'], '0b:ad:f0:0d:15:05')
self.assertEqual(vpd['rw']['cellular_mac'], '70:75:65:6c:6c:65')
vpd = self.proxy.GetVPD('CR001016')
for field in RO_FIELDS:
self.assertTrue(field in vpd['ro'] and vpd['ro'][field])
for field in RW_FIELDS_SET2:
self.assertTrue(field in vpd['rw'] and vpd['rw'][field])
self.assertEqual(vpd['ro']['region'], 'nl')
self.assertEqual(vpd['rw']['wifi_mac'], '0b:ad:f0:0d:15:10')
self.assertEqual(vpd['rw']['cellular_mac'], '')
# Checks MAC addresses
for i in range(25):
serial = 'CR0010%02d' % (i + 1)
vpd = self.proxy.GetVPD(serial)
wifi_mac = vpd['rw']['wifi_mac']
self.assertEqual(wifi_mac, '0b:ad:f0:0d:15:%02x' % (i + 1))
if i < 5:
cellular_mac = vpd['rw']['cellular_mac']
self.assertEqual(cellular_mac, '70:75:65:6c:6c:%02x' % (i + 0x61))
# Checks invalid serial numbers
self.assertRaises(xmlrpclib.Fault, self.proxy.GetVPD, 'MAGICA')
return True
def testSaveAuxLog(self):
self.proxy.SaveAuxLog('foo/bar', shopfloor.Binary('Blob'))
self.assertEquals(
'Blob',
open(os.path.join(self.aux_logs_dir, 'foo/bar')).read())
def _MakeTarFile(self, content_path, compress=True):
"""Makes a tar archive containing a single empty file.
Args:
content_path: The path to the empty file within the archive.
Returns: The tar archive contents as a string.
"""
tmp = tempfile.mkdtemp('tar')
try:
factory_log_path = os.path.join(
tmp, content_path.lstrip('/'))
utils.TryMakeDirs(os.path.dirname(factory_log_path))
open(factory_log_path, 'w').close()
return Spawn([
'tar', '-c' + ('j' if compress else '') + 'f', '-', '-C', tmp,
content_path.lstrip('/')],
check_output=True).stdout_data
finally:
shutil.rmtree(tmp)
def testUploadCorruptReport_Empty(self):
self.assertRaisesRegexp(
xmlrpclib.Fault,
'This does not look like a tar archive',
self.proxy.UploadReport, 'CR001020', shopfloor.Binary(''), 'foo')
def testUploadCorruptReport_MissingLog(self):
self.assertRaisesRegexp(
xmlrpclib.Fault,
factory.FACTORY_LOG_PATH_ON_DEVICE.lstrip('/') + ' missing',
self.proxy.UploadReport, 'CR001020',
shopfloor.Binary(self._MakeTarFile('foo')), 'foo')
def testUploadCorruptReport_CorruptBZ2(self):
tbz2 = self._MakeTarFile('foo')
tbz2 = tbz2[:-1] # Truncate the file
self.assertRaisesRegexp(
xmlrpclib.Fault,
'Compressed file ends unexpectedly',
self.proxy.UploadReport, 'CR001020',
shopfloor.Binary(tbz2), 'foo')
def testUploadReportWithHourlyRotation(self):
# Other things should be well covered by the testUploadReport.
# We only test if the hourly rotating directory created as expected.
blob = self._MakeTarFile(factory.FACTORY_LOG_PATH_ON_DEVICE)
report_name = 'hourly_report_blob.rpt.bz2'
# Test the SetReportHourlyRotation is working.
self.assertFalse(self.proxy.SetReportHourlyRotation(False))
self.assertTrue(self.proxy.SetReportHourlyRotation(True))
expected_report_path = os.path.join(
self.proxy.GetReportsDir(), report_name)
# TODO(itspeter): It might be a risk of low possibility that hour of
# getting the path are different when the RPC call is actually made.
# A re-run of make test should immediate fix that : )
self.proxy.UploadReport('CR001020', shopfloor.Binary(blob),
report_name)
self.assertEquals(blob, open(expected_report_path).read())
def testUploadEventWithHourlyRotataion(self):
# Other things should be well covered by the testUploadEvent.
# We only test if the hourly rotating directory created as expected.
# Test the SetEventHourlyRotation is working.
self.assertFalse(self.proxy.SetEventHourlyRotation(False))
self.assertTrue(self.proxy.SetEventHourlyRotation(True))
incremental_event_file = os.path.join(
self.proxy.GetIncrementalEventsDir(), 'LOG_tradasai')
self.assertTrue(self.proxy.UploadEvent('LOG_tradasai',
'PREAMBLE\n---\nEVENT_1\n'))
# There is a low possibility flaky that hour of getting the path
# are different when the RPC call is actually made.
# A re-run of make test should immediate fix that : )
self.assertTrue(os.path.isfile(incremental_event_file))
with open(incremental_event_file, 'r') as f:
events = [event.strip() for event in f.read().split('---')]
self.assertEqual(events[0], 'PREAMBLE')
self.assertEqual(events[1], 'EVENT_1')
def testUploadReport(self):
# Upload simple blob
blob = self._MakeTarFile(factory.FACTORY_LOG_PATH_ON_DEVICE)
report_name = 'simple_blob.rpt.bz2'
report_path = os.path.join(self.reports_dir, report_name)
self.proxy.UploadReport('CR001020', shopfloor.Binary(blob),
report_name)
self.assertEquals(blob, open(report_path).read())
self.assertTrue(re.match(r'^[0-9a-f]{32}\s',
open(report_path + '.md5').read()))
# Try to upload to invalid serial number
self.assertRaises(xmlrpclib.Fault, self.proxy.UploadReport, 'CR00200', blob)
# Move the report to yesterday's dir. "Insert" some media and
# check that the logs are archived.
yesterday_localtime = time.localtime(time.time() - 24 * 60 * 60)
yesterday = time.strftime(shopfloor.LOGS_DIR_FORMAT, yesterday_localtime)
shutil.move(self.reports_dir,
os.path.join(self.data_dir, shopfloor.REPORTS_DIR, yesterday))
os.makedirs(self.auto_archive_logs)
dest_path = os.path.join(
self.auto_archive_logs,
time.strftime('logs.%Y%m%d.tar.bz2', yesterday_localtime))
for _ in xrange(20):
if os.path.exists(dest_path):
break
time.sleep(.1)
else:
self.fail('%s was never created' % dest_path)
def testFinalize(self):
self.proxy.Finalize('CR001024')
self.assertRaises(xmlrpclib.Fault, self.proxy.Finalize, '0999')
def testGetTestMd5sum(self):
shutil.copyfile(os.path.join(os.path.dirname(__file__),
'testdata', 'factory.tar.bz2'),
os.path.join(self.data_dir, shopfloor.UPDATE_DIR,
'factory.tar.bz2'))
# It should be unpacked within a second.
for _ in xrange(20):
md5sum = self.proxy.GetTestMd5sum()
if md5sum:
self.assertEqual('18cac06201e65e060f757193c153cacb', md5sum)
break
time.sleep(0.1)
else:
self.fail('No update found')
def testGetTestMd5sumWithoutMd5sumFile(self):
self.assertTrue(self.proxy.GetTestMd5sum() is None)
def testGetRegistrationCodeMap(self):
self.assertEquals(
{'user': ('000000000000000000000000000000000000'
'0000000000000000000000000000190a55ad'),
'group': ('010101010101010101010101010101010101'
'010101010101010101010101010162319fcc')},
self.proxy.GetRegistrationCodeMap('CR001001'))
# Make sure it was logged.
log = open(self.registration_code_log).read()
self.assertTrue(re.match(
r'^MAGICA,'
r'000000000000000000000000000000000000'
r'0000000000000000000000000000190a55ad,'
r'010101010101010101010101010101010101'
r'010101010101010101010101010162319fcc,'
r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,'
r'MAGICA MADOKA A-A 1214\n', log), repr(log))
def testLogRegistrationCode(self):
valid_code = ('000000000000000000000000000000000000'
'0000000000000000000000000000190a55ad')
invalid_code = '1' + valid_code[1:]
# This should work.
self.proxy.LogRegistrationCodeMap(
'MAGICA MADOKA A-A 1214', {'user': valid_code, 'group': valid_code})
for invalid_map in ({'user': invalid_code, 'group': valid_code},
{'user': valid_code, 'group': invalid_code}):
self.assertRaisesRegexp(
Exception, "CRC of '10+190a55ad' is invalid",
self.proxy.LogRegistrationCodeMap,
'MAGICA MADOKA A-A 1214', invalid_map)
def testUploadEvent(self):
# A new event file should be created.
self.assertTrue(self.proxy.UploadEvent('LOG_C835C718',
'PREAMBLE\n---\nEVENT_1\n'))
event_file = os.path.join(self.events_dir, 'LOG_C835C718')
self.assertTrue(os.path.isfile(event_file))
# Additional events should be appended to existing event files.
self.assertTrue(self.proxy.UploadEvent('LOG_C835C718',
'---\nEVENT_2\n'))
with open(event_file, 'r') as f:
events = [event.strip() for event in f.read().split('---')]
self.assertEqual(events[0], 'PREAMBLE')
self.assertEqual(events[1], 'EVENT_1')
self.assertEqual(events[2], 'EVENT_2')
def testGetDeviceData(self):
self.assertEqual({'serial_number': 'MLB00001',
'has_lte': True},
self.proxy.GetAuxData('mlb', 'MLB00001'))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
unittest.main()