blob: bf213df527d0769015ca8d756cd098af22aaca88 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests and calibrates ambient light sensor with fixture.
This test tests the ambient light sensor by setting different light
intensity in the light chamber. Calibration is done by comparing the light
intensity detected from the ALS sensor and the preset value.
Usage examples::
# Saving parameter file in USB.
OperatorTest(
id='ALSCalibration',
pytest_name='light_sensor_fixture',
dargs=dict(
data_method='USB',
param_pathname='als.params'))
# Saving parameter file in shopfloor.
OperatorTest(
id='ALSCalibration',
pytest_name='light_sensor_fixture',
dargs=dict(
data_method='SF',
param_pathname='als.params'
shopfloor_ip='10.0.0.1'))
"""
from __future__ import print_function
from collections import namedtuple
import datetime
import logging
import os
import Queue
import re
import serial
import threading
import time
import traceback
import unittest
import xmlrpclib
import factory_common # pylint: disable=W0611
from cros.factory.system.i2cbus import I2CBus
from cros.factory.test.args import Arg
from cros.factory.test import factory
from cros.factory.test import leds
from cros.factory.test.media_util import MediaMonitor, MountedMedia
from cros.factory.test import network
from cros.factory.test import shopfloor
from cros.factory.test.serial_utils import OpenSerial, FindTtyByDriver
from cros.factory.test import test_ui
from cros.factory.test.utils import Enum
from cros.factory.utils.process_utils import Spawn
DataMethod = Enum(['USB', 'SF'])
EventType = Enum(['START_TEST', 'EXIT_TEST'])
FailCause = Enum(['ReadSerial', 'ModuleSN', 'CalibData', 'WriteVPD',
'ALSInit', 'ALSValue', 'ALSOrder', 'Unknown'])
InternalEvent = namedtuple('InternalEvent', 'event_type aux_data')
# Fake value for ALS to return.
_ALS_MOCK_VALUE = 10
_ALS_MOCK_SCALE_FACTOR = 0.5
_LED_PATTERN = ((leds.LED_NUM|leds.LED_CAP, 0.05), (0, 0.05))
class FixtureException(Exception):
pass
class Fixture(object):
"""Communications with the test fixture."""
def __init__(self, params, mock_mode=False):
self._tty = None
self._mock_mode = mock_mode
# Load parameters.
self._driver = params['driver']
self._serial_params = params['serial_params']
self._serial_delay = params['serial_delay']
self._light_delay = params['light_delay']
self._light_seq = params['light_seq']
self._fixture_echo = params['echo']
self._light_off = params['off']
def Connect(self):
"""Setups the serial port communication."""
if self._mock_mode:
return
port = FindTtyByDriver(self._driver)
if not port:
raise FixtureException('Cannot find TTY with driver %s' % self._driver)
self._tty = OpenSerial(port=port, **self._serial_params)
self._tty.flush()
def Send(self, msg):
"""Sends control messages to the fixture."""
if self._mock_mode:
return
for c in msg:
self._tty.write(str(c))
self._tty.flush()
# The fixture needs some time to process each incoming character.
time.sleep(self._serial_delay)
def _Read(self):
return self._tty.read(self._tty.inWaiting())
def AssertSuccess(self):
"""Checks if the returned value from the fixture is OK."""
if self._mock_mode:
return
ret = self._Read()
if not re.search(self._fixture_echo, ret):
raise FixtureException('The communication with fixture was broken')
def SetLight(self, idx):
self.Send(self._light_seq[idx])
def TurnOffLight(self):
self.Send(self._light_off)
def WaitForLightSwitch(self):
time.sleep(self._light_delay)
class ALS(object):
"""Interfaces the ambient light sensor over iio."""
# Default min delay seconds.
_DEFAULT_MIN_DELAY = 0.178
def __init__(self, val_path, scale_path, _mock_mode=False):
self._mock_mode = _mock_mode
self.detected = True
if _mock_mode:
return
if (not os.path.isfile(val_path) or
not os.path.isfile(scale_path)):
self.detected = False
return
self._val_path = val_path
self._scale_path = scale_path
def _ReadCore(self):
with open(self._val_path, 'r') as f:
val = int(f.readline().rstrip())
return val
def _Read(self, delay=None, samples=1):
"""Reads the light sensor value.
Args:
delay: Delay between samples in seconds. 0 means as fast as
possible.
samples: Total samples to read.
Returns:
The light sensor values in a list.
"""
if self._mock_mode:
return _ALS_MOCK_VALUE
if samples < 1:
samples = 1
if delay is None:
delay = self._DEFAULT_MIN_DELAY
buf = []
# The first value might be contaminated by previous settings.
# We need to skip it for better accuracy.
self._ReadCore()
for _ in range(samples):
time.sleep(delay)
val = self._ReadCore()
buf.append(val)
return buf
def ReadMean(self, delay=None, samples=1):
if self._mock_mode:
return _ALS_MOCK_VALUE
if not self.detected:
return None
buf = self._Read(delay, samples)
return int(round(float(sum(buf)) / len(buf)))
def SetScaleFactor(self, scale):
if self._mock_mode:
return
if not self.detected:
return
with open(self._scale_path, 'w') as f:
f.write(str(int(round(scale))))
return
def GetScaleFactor(self):
if self._mock_mode:
return _ALS_MOCK_SCALE_FACTOR
if not self.detected:
return None
with open(self._scale_path, 'r') as f:
s = int(f.readline().rstrip())
return s
class LightSensorFixtureTest(unittest.TestCase):
"""Tests light sensor."""
ARGS = [
Arg('mock_mode', bool, 'Mock mode allows testing without a fixture.',
default=False),
Arg('data_method', str, 'How to read parameters and save test results. '
'Supported types: SF and USB.', default='USB'),
Arg('param_pathname', str, 'Pathname of parameter file on '
'USB drive or shopfloor.', default='camera.params'),
Arg('shopfloor_ip', str, 'Local IP address for connecting shopfloor. '
'When data_method = Shopfloor, set as None to use DHCP.',
default=None, optional=True),
]
# CSS style classes defined in the corresponding HTML file.
_STYLE_INFO = 'color_idle'
_STYLE_PASS = 'color_good'
_STYLE_FAIL = 'color_bad'
def setUp(self):
self.ui = test_ui.UI()
self.internal_queue = Queue.Queue()
self._fail_cause = None
self._fixture = None
self._ignore_enter_key = False
self._light_state = 0
self._logs = []
self._params_loaded = False
self._params = None
self._progress = 0
self._result_status = None
self._usb_dev_path = None
self._usb_ready_event = None
self._module_sn = None
self._all_time = 0
self.ui.AddEventHandler(
'start_test_button_clicked',
lambda js_args: self._PostInternalQueue(EventType.START_TEST, js_args))
self.ui.AddEventHandler(
'exit_test_button_clicked',
lambda _: self._PostInternalQueue(EventType.EXIT_TEST))
self.ui.BindKey(
test_ui.ESCAPE_KEY,
lambda _: self._PostInternalQueue(EventType.EXIT_TEST))
def _PostInternalQueue(self, event_type, aux_data=None):
"""Posts an event to internal queue.
Args:
event_type: EventType.
aux_data: Extra data.
"""
self.internal_queue.put(InternalEvent(event_type, aux_data))
def _PopInternalQueue(self, wait):
"""Pops an event from internal queue.
Args:
wait: A bool flag to wait forever until internal queue has something.
Returns:
The first InternalEvent in internal queue. None if 'wait' is False and
internal queue is empty.
"""
if wait:
return self.internal_queue.get(block=True, timeout=None)
else:
try:
return self.internal_queue.get_nowait()
except Queue.Empty:
return None
def _Log(self, text):
logging.info(text)
self._logs.append(text)
def _MakePassLabel(self, msg):
return test_ui.MakeLabel(msg, css_class=self._STYLE_PASS)
def _MakeFailLabel(self, msg):
return test_ui.MakeLabel(msg, css_class=self._STYLE_FAIL)
def _ResetData(self):
self._progress = 0
self.ui.CallJSFunction('ResetUiData', "")
def _GetModuleSN(self):
"""Read module serial number.
The module serial number can be read from sysfs for USB camera or from
i2c for MIPI camera.
"""
if self._params['sn']['source'] == 'sysfs':
return self._GetModuleSNSysfs()
elif self._params['sn']['source'] == 'i2c':
return self._GetModuleSNI2C()
def _GetModuleSNSysfs(self):
success, input_sn = self._ReadSysfs(self._params['sn']['sysfs_path'])
if success:
self._Log('Serial number: %s' % input_sn)
if not re.match(self._params['sn']['format'], input_sn):
self._Log('Error: invalid serial number.')
return None
else:
return None
return input_sn
def _GetModuleSNI2C(self):
i2c_param = self._params['sn']['i2c_param']
data_addr = i2c_param['data_addr']
try:
# Power on camera so we can read from I2C
fd = os.open(i2c_param['dev_node'], os.O_RDWR)
bus = I2CBus('/dev/i2c-%d' % i2c_param['bus'])
ret = bus.wr_rd(i2c_param['chip_addr'],
[data_addr >> 8, data_addr & 0xff], i2c_param['length'])
return ''.join([chr(x) for x in reversed(ret)])
finally:
os.close(fd)
def _ReadSysfs(self, pathname):
"""Read single-line data from sysfs.
Args:
pathname: Pathname in sysfs.
Returns:
Tuple of (success, read data).
"""
try:
with open(pathname, 'r') as f:
read_data = f.read().rstrip()
except IOError as e:
self._Log('Fail to read %r: %r' % (pathname, e))
return False, None
if read_data.find('\n') >= 0:
self._Log('%r contains multi-line data: %r' % (pathname, read_data))
return False, None
return True, read_data
def _GetLogFilePrefix(self):
device_sn = shopfloor.get_serial_number() or 'MISSING_SN'
module_sn = self._module_sn or 'MISSING_MODULE_SN'
return '_'.join([re.sub(r'\W+', '_', x) for x in
[os.environ.get('CROS_FACTORY_TEST_PATH'),
device_sn, module_sn]])
def _SaveTestData(self):
"""Saves test data to USB drive or shopfloor."""
log_prefix = self._GetLogFilePrefix()
self._logs.append('') # add tailing newline
data_files = [(log_prefix + '.txt', '\n'.join(self._logs))]
# Skip saving test data for DataMethod.SIMPLE.
if self.args.data_method == DataMethod.USB:
self._SaveTestDataToUSB(data_files)
elif self.args.data_method == DataMethod.SF:
self._SaveTestDataToShopfloor(data_files)
def _SaveTestDataToUSB(self, data_files):
"""Saves test data to USB drive.
Args:
data_files: list of (filename, file data) pairs.
Returns:
Success or not.
"""
self._UpdateStatus(mid='save_to_usb')
self._usb_ready_event.wait()
with MountedMedia(self._usb_dev_path, 1) as mount_point:
folder_path = os.path.join(mount_point,
datetime.date.today().strftime('%Y%m%d'))
if os.path.exists(folder_path):
if not os.path.isdir(folder_path):
factory.console.info('Error: fail to create folder %r' % folder_path)
return False
else:
os.mkdir(folder_path)
for filename, data in data_files:
file_path = os.path.join(folder_path, filename)
mode = 'ab' if '.txt' in filename else 'wb'
try:
with open(file_path, mode) as f:
f.write(data)
except IOError as e:
self._Log('Error: fail to save %r: %r' % (file_path, e))
return False
self._UpdateProgress(pid='save_to_usb')
return True
def _SaveTestDataToShopfloor(self, data_files):
"""Saves test data to shopfloor.
Args:
data_files: list of (filename, file data) pairs.
"""
self._UpdateStatus(mid='save_to_shopfloor')
network.PrepareNetwork(ip=self.args.shopfloor_ip, force_new_ip=False)
shopfloor_client = shopfloor.GetShopfloorConnection()
for filename, data in data_files:
start_time = time.time()
shopfloor_client.SaveAuxLog(filename, xmlrpclib.Binary(data))
factory.console.info('Successfully uploaded %r in %.03f s',
filename, time.time() - start_time)
self._UpdateProgress(pid='save_to_shopfloor')
def _LoadParams(self):
"""Loads parameters and then shows main test screen."""
if self.args.data_method == DataMethod.USB:
self._params = self._LoadParamsFromUSB()
self._ResetData()
self.ui.CallJSFunction('OnUSBInit')
elif self.args.data_method == DataMethod.SF:
self._params = self._LoadParamsFromShopfloor()
# Sum all check point time to get total time
self._all_time = sum(self._params['chk_point'].values())
# Basic pre-processing of the parameters.
self.SyncFixture()
self._Log('Parameter version: %s\n' % self._params['version'])
def _LoadParamsFromUSB(self):
"""Loads parameters from USB drive."""
self._usb_ready_event = threading.Event()
MediaMonitor().Start(on_insert=self._OnUSBInsertion,
on_remove=self._OnUSBRemoval)
while self._usb_ready_event.wait():
with MountedMedia(self._usb_dev_path, 1) as mount_point:
pathname = os.path.join(mount_point, self.args.param_pathname)
try:
with open(pathname , 'r') as f:
data = eval(f.read())
self._params_loaded = True
return data
except IOError as e:
self._Log('Error: fail to read %r: %r' % (pathname, e))
time.sleep(0.5)
def _LoadParamsFromShopfloor(self):
"""Loads parameters from shopfloor."""
network.PrepareNetwork(ip=self.args.shopfloor_ip, force_new_ip=True)
self.ui.CallJSFunction("OnShopfloorInit")
factory.console.info('Reading %s from shopfloor', self.args.param_pathname)
shopfloor_client = shopfloor.GetShopfloorConnection()
data = eval(shopfloor_client.GetParameter(self.args.param_pathname).data)
self._params_loaded = True
return data
def _OnUSBInsertion(self, dev_path):
self._usb_dev_path = dev_path
self._usb_ready_event.set()
self.ui.CallJSFunction('OnUSBInsertion')
def _OnUSBRemoval(self, unused_dev_path):
self._usb_ready_event.clear()
self._usb_dev_path = None
self.ui.CallJSFunction('OnUSBRemoval')
def _OnU2SInsertion(self, _):
if self._params_loaded:
self.SyncFixture()
def _OnU2SRemoval(self, _):
if self._params_loaded:
self.ui.CallJSFunction('OnRemoveFixtureConnection')
def SyncFixture(self, _=None):
self.ui.CallJSFunction('OnDetectFixtureConnection')
cnt = 0
while not self._SetupFixture():
cnt += 1
if cnt >= self._params['fixture']['n_retry']:
self.ui.CallJSFunction('OnRemoveFixtureConnection')
return
time.sleep(self._params['fixture']['retry_delay'])
self.ui.CallJSFunction('OnAddFixtureConnection')
def _SetupFixture(self):
"""Initialize the communication with the fixture."""
try:
self._fixture = Fixture(self._params['fixture'], self.args.mock_mode)
self._fixture.Connect()
# Go with the default(first) lighting intensity.
self._light_state = 0
self._fixture.SetLight(self._light_state)
self._fixture.AssertSuccess()
except Exception as e:
self._fixture = None
self._Log(str(e) + '\n')
self._Log('Failed to initialize the test fixture.\n')
return False
self._Log('Test fixture successfully initialized.\n')
return True
def _WriteVPD(self, calib_result):
self._UpdateStatus(mid='dump_to_vpd')
conf = self._params['als']
if not calib_result:
self._UpdateResult(False, FailCause.CalibData)
self._Log('ALS calibration data is incorrect.\n')
return False
if (not self.args.mock_mode and
Spawn(conf['save_vpd'] % calib_result, shell=True)):
self._UpdateResult(False, FailCause.WriteVPD)
self._Log('Writing VPD data failed!\n')
return False
self._Log('Successfully calibrated ALS scales.\n')
self._UpdateProgress(pid='dump_to_vpd')
return True
def StartTest(self, _=None):
self._ResetData()
self._UpdateStatus(mid='start_test')
if not self._SetupFixture():
self._UpdateStatus(mid='fixture_fail')
self.ui.CallJSFunction('OnRemoveFixtureConnection')
return
self._UpdateProgress(pid='start_test')
self._StartALSTest()
self._FinalizeTest()
def _StartALSTest(self):
self._module_sn = self._GetModuleSN()
if not self._module_sn:
self._UpdateResult(False, FailCause.ModuleSN)
self._Log('Failed to get module serial number')
return
self._UpdateStatus(mid='init_als')
conf = self._params['als']
als = ALS(conf['val_path'], conf['scale_path'], self.args.mock_mode)
if not als.detected:
self._UpdateResult(False, FailCause.ALSInit)
self._Log('Failed to initialize the ALS.\n')
return
als.SetScaleFactor(conf['calibscale'])
self._UpdateProgress(pid='init_als')
# Go through all different lighting settings
# and record ALS values.
calib_result = 0
try:
vals = []
while True:
# Get ALS values.
self._UpdateStatus(mid='read_als%d' % self._light_state)
scale = als.GetScaleFactor()
val = als.ReadMean(delay=conf['read_delay'], samples=conf['n_samples'])
vals.append(val)
self._Log('Lighting preset lux value: %d\n' %
conf['luxs'][self._light_state])
self._Log('ALS value: %d\n' % val)
self._Log('ALS calibration scale: %d\n' % scale)
# Check if it is a false read.
if not val:
self._UpdateResult(False, FailCause.ALSValue)
self._Log('The ALS value is stuck at zero.\n')
return
# Compute calibration data if it is the calibration target.
if conf['luxs'][self._light_state] == conf['calib_lux']:
calib_result = int(round(float(conf['calib_target']) / val * scale))
self._Log('ALS calibration data will be %d\n' %
calib_result)
self._UpdateProgress(pid='read_als%d' % self._light_state)
# Go to the next lighting preset.
if not self._SwitchToNextLight():
break
# Check value ordering
# Skipping value ordering check when in mock mode since we don't have
# real ALS device
if not self.args.mock_mode:
for i, li in enumerate(conf['luxs']):
for j in range(i):
if ((li > conf['luxs'][j] and vals[j] >= vals[i]) or
(li < conf['luxs'][j] and vals[j] <= vals[i])):
self._UpdateResult(False, FailCause.ALSOrder)
self._Log('The ordering of ALS values is wrong.\n')
return
except (FixtureException, serial.serialutil.SerialException):
self._fixture = None
self._UpdateResult(None)
self._Log('The test fixture was disconnected!\n')
self.ui.CallJSFunction('OnRemoveFixtureConnection')
return
except:
self._Log(traceback.format_exc())
self._UpdateResult(False, FailCause.Unknown)
self._Log('Failed to read values from ALS or unknown error.\n' +
traceback.format_exc())
return
self._Log('Successfully recorded ALS values.\n')
# Save ALS values to vpd for FATP test.
if not self._WriteVPD(calib_result):
return
self._UpdateResult(True)
def _FinalizeTest(self):
result_map = {
True: 'PASSED',
False: 'FAILED',
None: 'UNTESTED'
}
self._UpdateStatus(mid='end_test')
self._Log('Result in summary:\n%s%s\n' %
(result_map[self._result_status],
(': %s' % self._fail_cause) if not self._result_status else ''))
self._UpdateProgress(pid='end_test')
self._SaveTestData()
# Display final result on UI
def get_str(ret, prefix, cause=None):
if ret:
return self._MakePassLabel(prefix + 'PASS')
if ret is None:
return self._MakeFailLabel(prefix + 'UNFINISHED: ' + cause)
return self._MakeFailLabel(prefix + 'FAIL:' + cause)
als_result = get_str(self._result_status, 'ALS: ', self._fail_cause)
self._UpdateStatus(msg=als_result)
self._UpdateProgress(value=100)
def _SwitchToNextLight(self):
self._UpdateStatus(mid='adjust_light')
conf = self._params['als']
self._light_state += 1
self._fixture.SetLight(self._light_state)
self._UpdateProgress(pid='adjust_light')
self._fixture.AssertSuccess()
if self._light_state >= len(conf['luxs']):
return False
self._UpdateStatus(mid='wait_fixture')
self._fixture.WaitForLightSwitch()
self._UpdateProgress(pid='wait_fixture')
return True
def _RegisterEvents(self, events):
for event in events:
assert hasattr(self, event)
self.ui.AddEventHandler(event, getattr(self, event))
def _UpdateStatus(self, mid=None, msg=None):
message = ''
if msg:
message = msg
elif mid:
message = test_ui.MakeLabel(self._params['message'][mid + '_en'],
self._params['message'][mid + '_zh'],
self._params['msg_style'][mid])
self.ui.CallJSFunction('UpdateTestStatus', message)
def _UpdateProgress(self, pid=None, value=None, add=True):
if value:
percent = value
elif pid:
if add:
self._progress += self._params['chk_point'][pid]
else:
self._progress = self._params['chk_point'][pid]
percent = int(round((float(self._progress) / self._all_time) * 100))
self.ui.CallJSFunction('UpdatePrograssBar', '%d%%' % percent)
def _UpdateResult(self, result, cause=None):
self._result_status = result
self._fail_cause = cause
def _StartSerialMonitor(self):
MediaMonitor('usb-serial').Start(on_insert=self._OnU2SInsertion,
on_remove=self._OnU2SRemoval)
def runTest(self):
ui_thread = self.ui.Run(blocking=False)
self.ui.CallJSFunction('InitLayout',
self.args.data_method == 'shopfloor',
self._ignore_enter_key)
self._LoadParams()
self._StartSerialMonitor()
while True:
event = self._PopInternalQueue(wait=True)
if event.event_type == EventType.START_TEST:
with leds.Blinker(_LED_PATTERN):
self.StartTest()
elif event.event_type == EventType.EXIT_TEST:
factory.log('%s test finished' % self.__class__)
if self._result_status:
self.ui.Pass()
else:
self.ui.Fail('ALS test failed.')
break
else:
raise ValueError('Invalid event type.')
ui_thread.join()