blob: 5cf6a0008bae00cd26f820173670f5e4aa182107 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION:
# This test is used to verify the functionality of bluetooth device.
# The functionality under test are:
# 1. Detect the specified number of bluetooth adapter on dut.
# 2. Scan remote bluetooth device and try to find at least one device.
# 3. If a remote device keyword is given, the test will only care
# the devices whose 'Name' contains keyword. This applies to item 4 as well.
# 4. If an RSSI threshold value is given, check that the largest average RSSI
# among all scanned devices >= threshold.
# 5. Try to pair and connect with the bluetooth input device. Now it supports
# mouse.
# Check the ARGS in BluetoothTest for the detail of arguments.
import glob
import logging
import os
import sys
import threading
import time
import unittest
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test import event_log
from cros.factory.test import factory
from cros.factory.test import factory_task
from cros.factory.test.i18n import test_ui as i18n_test_ui
from cros.factory.test import shopfloor
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.test.utils import bluetooth_utils
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import process_utils
from cros.factory.utils import sync_utils
_TEST_TITLE = i18n_test_ui.MakeI18nLabel('Bluetooth functional Test')
_MSG_DETECT_ADAPTER = i18n_test_ui.MakeI18nLabel('Detect bluetooth adapter')
_MSG_TURN_ON_DEVICE = i18n_test_ui.MakeI18nLabelWithClass(
'Enable the connection ability of bluetooth device and press Enter',
'start-font-size')
_MSG_INTO_FIXTURE = i18n_test_ui.MakeI18nLabelWithClass(
'Place the base into the fixture, '
'and press the space key on the test host.', 'start-font-size')
_MSG_RESET_MAGNET = i18n_test_ui.MakeI18nLabelWithClass(
'Please re-attach the magnet, and press the space key on the test host.',
'start-font-size')
_MSG_START_CHARGE = i18n_test_ui.MakeI18nLabelWithClass(
'Turn on charging by pressing the green button, '
'take the keyboard out and put it back, '
'and press the space key on the test host.', 'start-font-size')
_MSG_READ_BATTERY_1 = i18n_test_ui.MakeI18nLabelWithClass(
'Read battery level for the 1st time.', 'start-font-size')
_MSG_READ_BATTERY_2 = i18n_test_ui.MakeI18nLabelWithClass(
'Read battery level for the 2nd time.', 'start-font-size')
_MSG_BATTERY_CHARGE_TEST = i18n_test_ui.MakeI18nLabelWithClass(
'Check if the battery has charged to a higher percentage',
'start-font-size')
_MSG_CHECK_BATTERY_LEVEL = i18n_test_ui.MakeI18nLabelWithClass(
'Check battery level.', 'start-font-size')
_MSG_STOP_CHARGE = i18n_test_ui.MakeI18nLabelWithClass(
'Press the green button again to stop charging, '
'and press the space key on the test host.', 'start-font-size')
_MSG_OUT_OF_FIXTURE = i18n_test_ui.MakeI18nLabelWithClass(
'Take the base out of the fixture, '
'and press the space key on the test host.', 'start-font-size')
_MSG_READ_FIRMWARE_REVISION_STRING = i18n_test_ui.MakeI18nLabelWithClass(
'Read firmware revision string.', 'start-font-size')
_MSG_SCAN_DEVICE = i18n_test_ui.MakeI18nLabelWithClass(
'Scanning...', 'start-font-size')
_MSG_DETECT_RSSI = lambda count, total: (
i18n_test_ui.MakeI18nLabelWithClass(
'Detect RSSI (count {count}/{total})',
'start-font-size', count=count, total=total))
_MSG_TURN_ON_INPUT_DEVICE = i18n_test_ui.MakeI18nLabelWithClass(
'Enable the connection ability of input bluetooth device and press Enter',
'start-font-size')
_MSG_PAIR_INPUT_DEVICE = i18n_test_ui.MakeI18nLabelWithClass(
'Pairing to input device now...', 'start-font-size')
_MSG_UNPAIR = i18n_test_ui.MakeI18nLabelWithClass(
'Press shift-p-a-i-r simultaneously on the base.', 'start-font-size')
_MSG_CONNECT_INPUT_DEVICE = i18n_test_ui.MakeI18nLabelWithClass(
'Connecting to input device now...', 'start-font-size')
_MSG_TEST_INPUT = i18n_test_ui.MakeI18nLabelWithClass(
'Please test input. Press Escape to fail and Enter to pass',
'start-font-size')
_MSG_UNPAIRING = i18n_test_ui.MakeI18nLabelWithClass(
'Unpairing', 'start-font-size')
_MSG_AUTH_FAILED = i18n_test_ui.MakeI18nLabelWithClass(
'Authentication failed, retrying...', 'start-font-size')
INPUT_MAX_RETRY_TIMES = 10
INPUT_RETRY_INTERVAL = 1
RESET_ADAPTER_SLEEP_TIME = 5
READ_BATTERY_MAX_RETRY_TIMES = 10
BATTERY_LEVEL_KEY = 'battery_level'
READ_BATTERY_STEP_1 = 'read_battery_1'
READ_BATTERY_STEP_2 = 'read_battery_2'
def GetCurrentTime():
"""Get the current time."""
return time.strftime('%Y-%m-%d %H:%M:%S')
def ColonizeMac(mac):
""" Given a MAC address, normalize its colons.
Example: ABCDEF123456 -> AB:CD:EF:12:34:56
"""
mac_no_colons = ''.join(mac.strip().split(':'))
groups = [mac_no_colons[x:x+2] for x in range(0, len(mac_no_colons), 2)]
return ':'.join(groups)
def MakePasskeyLabelPrompt(passkey):
"""Creates a label prompting the operator to enter a passkey"""
return i18n_test_ui.MakeI18nLabelWithClass(
'Enter passkey {key} then press enter on the base.',
'start-font-size',
key=passkey)
def CheckInputCount():
"""Returns the number of input devices from probing /dev/input/event*."""
number_input = len(glob.glob('/dev/input/event*'))
logging.info('Found %d input devices.', number_input)
return number_input
def WaitForInputCount(task, expected_input_count, timeout=10):
""" Waits for the number of input devices to reach the given count.
Returns true if the input count reaches the given amount, false
otherwise. On failure it fails the task.
Args:
task: The task that may be Failed
expected_input_count: The number of input devices that determines success
timeout: The maximum time in seconds that we will wait
"""
end_time = time.time() + timeout
while time.time() < end_time:
input_count = CheckInputCount()
if input_count == expected_input_count:
return True
time.sleep(0.2)
task.Fail('Input device count %d is different than expected %d.' %
(input_count, expected_input_count))
return False
def _ResetAdapter():
"""Reset the adapter every time when using the BT device.
This is because the adapter may be down anytime for some unknown reason.
"""
cmd = 'hciconfig hci0 reset'
factory.console.info('Reset adapter and wait 5 seconds....: %s', cmd)
process_utils.Spawn(cmd.split(), log=True, check_call=True)
time.sleep(RESET_ADAPTER_SLEEP_TIME)
def _SaveLocalLog(log_file, data):
"""Save the log locally on a test host."""
log_dir = os.path.dirname(log_file)
if not os.path.isdir(log_dir):
os.makedirs(log_dir)
with open(log_file, 'a') as log:
log.write(str(data))
def _SaveAuxLogOnShopfloor(aux_log_file, data):
"""Save the local log file to shopfloor."""
try:
shopfloor_client = shopfloor.GetShopfloorConnection()
shopfloor_client.SaveAuxLog(aux_log_file, str(data))
except Exception as e:
# It is only a logging error. Do not fail the test.
logging.warning('Save aux log failure: %s', e)
def _SaveLogs(log_file, aux_log_file, data):
"""Save the log files on the local test host and on the shopfloor."""
# Prepend the current timestamp to each line.
data = ''.join([(GetCurrentTime() + ' ' + line + '\n') if line else '\n'
for line in data.splitlines()])
if log_file:
_SaveLocalLog(log_file, data)
if aux_log_file:
with open(log_file) as log:
_SaveAuxLogOnShopfloor(aux_log_file, log.read())
def RetryWithProgress(template, template_message, action_string,
max_retry_times, retry_interval, target, *args, **kwargs):
"""Runs target function with retries and shows retry times on progress bar.
Args:
template: an ui_template
template_message: The message to show when target function is running.
action_string: The string to describe the action in logging.
max_retry_times: the maximal retry times
retry_interval: the interval between retries
target: The target function. *args and **kwargs will be passed to target.
Returns:
Return the return value of the target function.
"""
def _UpdateProgressBar(retry_time, max_retry_time):
"""Updates the progress bar according to retry_time and max_retry_time."""
msg = 'Update progress bar with retry time: %d, max retry time: %d.'
logging.info(msg, retry_time, max_retry_time)
template.SetProgressBarValue(int(100 * retry_time / max_retry_time))
template.SetState(template_message)
template.DrawProgressBar()
target_result = sync_utils.Retry(max_retry_times, retry_interval,
_UpdateProgressBar, target, *args, **kwargs)
template.SetProgressBarValue(100)
log_msg = '%s was done.' if target_result else '%s failed.'
logging.info(log_msg, action_string)
return target_result
class DetectAdapterTask(factory_task.FactoryTask):
"""The task checking number of adapters.
Detects adapters from dbus and checks if the number of adapters matches the
expected number.
Args:
expected_adapter_count: The expected number of bluetooth adapters.
"""
def __init__(self, test, expected_adapter_count):
super(DetectAdapterTask, self).__init__()
self._test = test
self._expected_adapter_count = expected_adapter_count
def Run(self):
self._test.template.SetState(_MSG_DETECT_ADAPTER)
adapters = self._test.dut.bluetooth.GetAdapters(
self._test.args.detect_adapters_retry_times,
self._test.args.detect_adapters_interval_secs)
if len(adapters) == self._expected_adapter_count:
self.Pass()
else:
self.Fail('DetectAdapter: expect %d and find %d adapter(s).' %
(self._expected_adapter_count, len(adapters)))
class TurnOnTask(factory_task.FactoryTask):
"""The task to ask operator to turn on bluetooth device and press enter.
Args:
message: Html code containing message to show on the screen.
"""
def __init__(self, test, message, key=test_ui.ENTER_KEY):
super(TurnOnTask, self).__init__()
self._test = test
self._message = message
self._key = key
def Cleanup(self):
"""Unbinds the Enter key after this task is done."""
self._test.ui.UnbindKey(self._key)
def Run(self):
self._test.template.SetState(self._message)
self._test.ui.AppendCSS('.start-font-size {font-size: 2em;}')
self._test.ui.BindKey(self._key, lambda _: self.Pass())
logging.info('wait for the user to press a key')
def SetAndStartScanProgressBar(template, timeout_secs, scan_event=None):
"""Control progress bar fo a duration of timeout_secs."""
def UpdateProgressBar():
"""Updates progress bar for a duration of timeout_secs"""
start_time = time.time()
end_time = start_time + timeout_secs
while time.time() < end_time:
if scan_event and scan_event.isSet():
break
template.SetProgressBarValue(int(
100 * (time.time() - start_time) / timeout_secs))
time.sleep(0.2)
template.SetProgressBarValue(100)
logging.debug('UpdateProgressBar stopped.')
template.DrawProgressBar()
return process_utils.StartDaemonThread(
target=UpdateProgressBar, name='ProgressThread')
class ScanDevicesTask(factory_task.FactoryTask):
"""The task to scan bluetooth devices around.
In this task, the test will control the first adapter from BluetoothManager
and scan devices around for timeout_secs. The task passed if there is at least
one device.
If target_addresses is provided, the test will also check if it can find
at least one device specified in target_addresses list.
This passes the strongest matching device mac to _test.SetStrongestRssiMac
Note: this task is intended to be executed on a DUT, i.e., a chromebook,
to test its bluetooth module. A typical test case is to see if it can detect
a bluetooth mouse placed around it.
"""
def __init__(self, test):
super(ScanDevicesTask, self).__init__()
self._test = test
self._keyword = test.args.keyword
self._average_rssi_threshold = test.args.average_rssi_threshold
self._mac_to_scan = test.GetInputDeviceMac()
self._scan_counts = test.args.scan_counts
self._timeout_secs = test.args.scan_timeout_secs
self._progress_thread = None
def FilterByKeyword(self, devices):
"""Returns the devices filtered by self._keyword.
If self._keyword is None, leave devices as it is.
"""
if self._keyword is None:
return devices
filtered_devices = {}
for mac, props in devices.iteritems():
if 'Name' not in props:
logging.warning('Device %s: %s does not have "Name" property.',
mac, props)
continue
if self._keyword in props['Name']:
filtered_devices[mac] = props
logging.info('Device %s: "Name" property %s matches keyword %s.',
mac, props['Name'], self._keyword)
return filtered_devices
def UpdateRssi(self, devices_rssis, devices):
"""Updates devices_rssis using RSSI property in devices.
Args:
devices_rssis: A dict. Keys are mac addresses and values are lists of
scanned RSSI value.
devices: A dict. Keys are mac addresses and values are dicts of
properties.
"""
for mac, props in devices.iteritems():
if 'RSSI' not in props:
logging.warning('Device %s: %s does not have "RSSI" property.',
mac, props)
continue
if mac in devices_rssis:
devices_rssis[mac].append(props['RSSI'])
else:
devices_rssis[mac] = [props['RSSI']]
logging.info('UpdateRssi: %s', devices_rssis)
def Run(self):
bluetooth_manager = self._test.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self._test.host_mac)
# Records RSSI of each scan and calculates average rssi.
candidate_rssis = {}
# Helper to check if the target MAC has been scanned
def has_scanned_target_mac():
return self._mac_to_scan and self._mac_to_scan in candidate_rssis
for _ in xrange(self._scan_counts):
self._test.template.SetState(_MSG_SCAN_DEVICE)
self._progress_thread = SetAndStartScanProgressBar(self._test.template,
self._timeout_secs)
devices = bluetooth_manager.ScanDevices(adapter, self._timeout_secs)
self._progress_thread.join()
logging.info('Found %d device(s).', len(devices))
for mac, props in devices.iteritems():
try:
logging.info('Device found: %s. Name: %s, RSSI: %d',
mac, props['Name'], props['RSSI'])
except KeyError:
logging.exception('Name or RSSI is not available in %s', mac)
self.UpdateRssi(candidate_rssis, self.FilterByKeyword(devices))
# Optimization: if we are only interested in one particular address,
# then we can early-out as soon as we find it
if self._average_rssi_threshold is None and has_scanned_target_mac():
logging.info("Address found, ending scan early")
break
logging.info('Found %d candidate device(s) in %s scans.',
len(candidate_rssis), self._scan_counts)
factory.console.info('Candidate devices scan results: %s',
dict((str(k), [int(r) for r in v])
for k, v in candidate_rssis.iteritems()))
if len(candidate_rssis) == 0:
self.Fail('ScanDevicesTask: Fail to find any candidate device.')
return
# Calculates maximum average RSSI.
max_average_rssi_mac, max_average_rssi = None, -sys.float_info.max
for mac, rssis in candidate_rssis.iteritems():
# typecast to str to avoid the weird dbus.String type
mac = str(mac)
average_rssi = float(sum(rssis)) / len(rssis)
logging.info('Device %s has average RSSI: %f', mac, average_rssi)
event_log.Log('avg_rssi', mac=mac, average_rssi=average_rssi)
if average_rssi > max_average_rssi:
max_average_rssi_mac, max_average_rssi = mac, average_rssi
logging.info('Device %s has the largest average RSSI: %f',
max_average_rssi_mac, max_average_rssi)
event_log.Log('bluetooth_scan_device', mac=str(max_average_rssi_mac),
rssi=float(max_average_rssi),
meet=max_average_rssi >= self._average_rssi_threshold)
self._test.SetStrongestRssiMac(max_average_rssi_mac)
if self._mac_to_scan and not has_scanned_target_mac():
found_addresses = [str(k) for k in candidate_rssis]
self.Fail('Failed to find MAC address %s.'
'Scanned addresses: %s' % (self._mac_to_scan, found_addresses))
return
if self._average_rssi_threshold is None:
# Test is uninterested in RSSI thresholds
self.Pass()
elif self._average_rssi_threshold > max_average_rssi:
factory.console.error('The largest average RSSI %f does not meet'
' threshold %f. Please ensure that the test BT '
'device is \'visible\' and close to the DUT '
'antenna.',
max_average_rssi, self._average_rssi_threshold)
self.Fail('ScanDeviceTask: The largest average RSSI %f of device %s does'
' not meet threshold %f.' % (
max_average_rssi, str(max_average_rssi_mac),
self._average_rssi_threshold))
else:
factory.console.info('The largest average RSSI %f meets threshold %f.',
max_average_rssi, self._average_rssi_threshold)
self.Pass()
class DetectRSSIofTargetMACTask(factory_task.FactoryTask):
"""The task to detect the RSSI strength at a given target MAC address.
In this task, a generic test host uses the first adapter from
BluetoothManager and scans devices around for timeout_secs. The task
passed if it can detect the RSSI strength at the target MAC.
Note: this task is intended to be executed on a generic test host to test
if the RSSI of a target device, e.g., a Ryu base, could be detected.
"""
def __init__(self, test):
super(DetectRSSIofTargetMACTask, self).__init__()
self._test = test
self._mac_to_scan = test.GetInputDeviceMac()
self._scan_counts = test.args.scan_counts
self._timeout_secs = test.args.scan_timeout_secs
self._input_device_rssi_key = test.args.input_device_rssi_key
self._progress_thread = None
self._scan_rssi_event = threading.Event()
self.fail_msg = ''
self._average_rssi_lower_threshold = None
self._average_rssi_upper_threshold = None
def _DeriveRSSIThreshold(self, threshold, fid):
if isinstance(threshold, (int, float)):
return threshold
elif isinstance(threshold, dict):
if fid in threshold:
return threshold.get(fid)
else:
self.fail_msg += 'Fixture ID "%s" is not legitimate!\n' % fid
else:
self.fail_msg += 'Wrong type of RSSI threshold: %s\n' % str(threshold)
def Run(self):
fid = event_log.GetDeviceId()
self._average_rssi_lower_threshold = self._DeriveRSSIThreshold(
self._test.args.average_rssi_lower_threshold, fid)
self._average_rssi_upper_threshold = self._DeriveRSSIThreshold(
self._test.args.average_rssi_upper_threshold, fid)
if self.fail_msg:
factory.console.error(self.fail_msg)
self.Fail(self.fail_msg)
return
bluetooth_manager = self._test.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self._test.host_mac)
logging.info('mac (%s): %s', self._test.host_mac, adapter)
rssis = []
for i in xrange(1, 1 + self._scan_counts):
label = _MSG_DETECT_RSSI(i, self._scan_counts)
self._test.template.SetState(label)
self._scan_rssi_event.clear()
self._progress_thread = SetAndStartScanProgressBar(self._test.template,
self._timeout_secs,
self._scan_rssi_event)
devices = bluetooth_manager.ScanDevices(adapter,
timeout_secs=self._timeout_secs,
match_address=self._mac_to_scan)
self._scan_rssi_event.set()
self._progress_thread.join()
for mac, props in devices.iteritems():
if mac == self._mac_to_scan and 'RSSI' in props:
factory.console.info('RSSI of count %d: %.2f', i, props['RSSI'])
rssis.append(props['RSSI'])
if len(rssis) == 0:
self.Fail('DetectRSSIofTargetMACTask: Fail to get RSSI from device %s.' %
self._mac_to_scan)
else:
average_rssi = float(sum(rssis)) / len(rssis)
factory.set_shared_data(self._input_device_rssi_key, average_rssi)
logging.info('RSSIs at MAC %s: %s', self._mac_to_scan, rssis)
factory.console.info('Average RSSI: %.2f', average_rssi)
fail_msg = ''
if (self._average_rssi_lower_threshold is not None and
average_rssi < self._average_rssi_lower_threshold):
fail_msg += ('Average RSSI %.2f less than the lower threshold %.2f\n' %
(average_rssi, self._average_rssi_lower_threshold))
if (self._average_rssi_upper_threshold is not None and
average_rssi > self._average_rssi_upper_threshold):
fail_msg += ('Average RSSI %.2f greater than the upper threshold %.2f' %
(average_rssi, self._average_rssi_upper_threshold))
# Convert dbus.Int16 in rssis below to regular integers.
status = (('pass' if fail_msg == '' else 'fail') +
' exp: [%.2f, %.2f]' % (self._average_rssi_lower_threshold,
self._average_rssi_upper_threshold))
data = ('Average RSSI: %.2f %s (%s)\n' %
(average_rssi, map(int, rssis), status))
_SaveLogs(self._test.log_file, self._test.aux_log_file, data)
if fail_msg:
factory.console.error(fail_msg)
self.Fail(fail_msg)
else:
self.Pass()
class UnpairTask(factory_task.FactoryTask):
"""A task to unpair from bluetooth devices.
Args:
device_mac: None, or the MAC address of the device to unpair
name_fragment: None, or a substring of the name of the device(s) to unpair
"""
def __init__(self, test, device_mac, name_fragment):
super(UnpairTask, self).__init__()
self._test = test
self._device_mac = device_mac
self._name_fragment = name_fragment
def _ShouldUnpairDevice(self, device_props):
"""Indicate if a device matches the filter, and so should be unpaired
If a name fragment or MAC address is given, the corresponding property
must match. If neither is given, all devices should be unpaired.
"""
if self._device_mac and device_props["Address"] != self._device_mac:
return False
if (self._name_fragment and
self._name_fragment not in device_props.get('Name', '')):
return False
return device_props["Paired"]
def Run(self):
self._test.template.SetState(_MSG_UNPAIRING)
self._test.ui.AppendCSS('.start-font-size {font-size: 2em;}')
input_count_before_unpair = CheckInputCount()
bluetooth_manager = self._test.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self._test.host_mac)
devices = bluetooth_manager.GetAllDevices(adapter).values()
devices_to_unpair = filter(self._ShouldUnpairDevice, devices)
logging.info('Unpairing %d device(s)', len(devices_to_unpair))
for device_to_unpair in devices_to_unpair:
address = device_to_unpair["Address"]
bluetooth_manager.DisconnectAndUnpairDevice(adapter, address)
bluetooth_manager.RemovePairedDevice(adapter, address)
# Check that we unpaired what we thought we did
expected_input_count = input_count_before_unpair - len(devices_to_unpair)
if WaitForInputCount(self, expected_input_count):
self.Pass()
class CheckDisconnectionOfPairedDeviceTask(factory_task.FactoryTask):
"""A task to check whether a paired device has disconnected.
Args:
device_mac: None, or the MAC address of the device to unpair
name_fragment: None, or a substring of the name of the device(s) to unpair
"""
def __init__(self, test, device_mac):
super(CheckDisconnectionOfPairedDeviceTask, self).__init__()
self._test = test
self._device_mac = device_mac
def _ConnectedDevice(self, device_props):
"""Indicates if a device matches the filter, and so should be unpaired
If a name fragment or MAC address is given, the corresponding property
must match. If neither is given, all devices should be unpaired.
"""
return (device_props["Address"] == self._device_mac and
int(device_props["Connected"]) >= 1)
def _CheckDisconnection(self):
bluetooth_manager = self._test.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self._test.host_mac)
devices = bluetooth_manager.GetAllDevices(adapter).values()
connected_devices = filter(self._ConnectedDevice, devices)
logging.info('Connected and paired %d device(s)', len(connected_devices))
return len(connected_devices) == 0
def Run(self):
flag_disconnection = RetryWithProgress(
self._test.template, _MSG_UNPAIR,
'Check disconnection of the paired base',
INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
self._CheckDisconnection)
if flag_disconnection:
msg = 'Shift-P-A-I-R: done'
self.Pass()
else:
msg = 'Shift-P-A-I-R: not done'
self.Fail(msg)
factory.console.info(msg)
_SaveLogs(self._test.log_file, self._test.aux_log_file, msg)
def _ExecuteFixtureMethod(fixture, operation, post_sleep=0):
"""Execute a method of the charge test fixture."""
# An operation is mapped to its corresponding fixture method defined in
# base_charge_fixture.BaseChargeFixture class.
FIXTURE_METHOD_DICT = {'START_CHARGING': 'StartCharging',
'STOP_CHARGING': 'StopCharging',
'ENABLE_MAGNET': 'EnableMagnet',
'DISABLE_MAGNET': 'DisableMagnet'}
fixture_method = getattr(fixture, FIXTURE_METHOD_DICT.get(operation))
factory.console.info('Executing fixture method: %s', fixture_method.__name__)
fixture_method()
time.sleep(post_sleep)
class FixtureControlTask(factory_task.FactoryTask):
"""The task to control the charge test fixture.
Args:
operation: the operation to be performed by the test fixture.
"""
def __init__(self, test, operation, post_sleep=0):
# pylint: disable=super-init-not-called
self._fixture = test.fixture
self._operation = operation
self._post_sleep = post_sleep
def Run(self):
try:
_ExecuteFixtureMethod(self._fixture, self._operation,
post_sleep=self._post_sleep)
self.Pass()
except Exception as e:
self.Fail('error in executing %s (%s)' % (self._operation, e))
def _SaveLocalBatteryLog(base_enclosure_serial_number, mac, step,
battery_level, log_filename):
"""Save the battery log on the local test host."""
with open(log_filename, 'a') as f:
f.write('%s %s %s [%s]: %s\n' %
(GetCurrentTime(), base_enclosure_serial_number, mac, step,
battery_level))
class ReadBatteryLevelTask(factory_task.FactoryTask):
"""A class to read battery level."""
MSG_DICT = {READ_BATTERY_STEP_1: _MSG_READ_BATTERY_1,
READ_BATTERY_STEP_2: _MSG_READ_BATTERY_2}
def __init__(self, test, mac, step):
super(ReadBatteryLevelTask, self).__init__()
self._test = test
self._mac = mac
self._step = step
def Run(self):
self._test.template.SetState(self.MSG_DICT.get(self._step))
factory.console.info('%s via %s ...', self._step, self._test.hci_device)
try:
battery_level = int(RetryWithProgress(
self._test.template, self.MSG_DICT.get(self._step), self._step,
INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bluetooth_utils.GattTool.GetDeviceInfo,
self._mac, 'battery level', hci_device=self._test.hci_device,
timeout=self._test.args.read_bluetooth_uuid_timeout_secs))
factory.console.info('%s: %d', self._step, battery_level)
except bluetooth_utils.BluetoothUtilsError as e:
self.Fail('%s failed to get battery level: %s' % (self._step, e))
return
old_battery_level = factory.get_shared_data(self._step)
if (self._step == READ_BATTERY_STEP_1 and
(old_battery_level is None or battery_level < old_battery_level)):
# If the battery level at step 1 becomes higher over different rounds
# (when the operator keeps retesting it for any reasons),
# we only keep the lowest one. This is because we want to test if the
# battery could charge to a higher level at step 2 than step 1.
factory.set_shared_data(self._step, battery_level)
elif self._step == READ_BATTERY_STEP_2:
# We keep the latest battery level read at step 2.
factory.set_shared_data(self._step, battery_level)
if self._step == READ_BATTERY_STEP_1:
data = ('\nSN: %s\nMAC: %s\n' %
(self._test.args.base_enclosure_serial_number, self._mac))
else:
data = ''
data += '%s: %s\n' % (self._step, battery_level)
_SaveLogs(self._test.log_file, self._test.aux_log_file, data)
if self._test.args.battery_log:
_SaveLocalBatteryLog(self._test.args.base_enclosure_serial_number,
self._mac, self._step, battery_level,
self._test.args.battery_log)
self.Pass()
class CheckBatteryLevelTask(factory_task.FactoryTask):
"""This battery level test checks whether the following condistions
are satisfied:
1. The battery levels are read twice.
2. battery_level_1 < battery_level_2
3. battery_level_1 >= expected_battery_level
"""
def __init__(self, test):
super(CheckBatteryLevelTask, self).__init__()
self._test = test
def Run(self):
self._test.template.SetState(_MSG_BATTERY_CHARGE_TEST)
battery_level_1 = factory.get_shared_data(READ_BATTERY_STEP_1)
battery_level_2 = factory.get_shared_data(READ_BATTERY_STEP_2)
factory.console.info('%s: %s', READ_BATTERY_STEP_1, str(battery_level_1))
factory.console.info('%s: %s', READ_BATTERY_STEP_2, str(battery_level_2))
if not battery_level_1 or not battery_level_2:
fail_msg = 'Battery levels should be read twice. read_1: %s, read_2: %s'
elif (battery_level_1 > battery_level_2 or
(battery_level_1 == battery_level_2 and battery_level_1 < 100)):
fail_msg = 'Base battery is not charged up. read_1: %s, read_2: %s'
elif battery_level_1 < self._test.args.expected_battery_level:
# Note: battery_level_1 instead of battery_level_2 should be larger than
# the expected_battery_level since battery_level_2 is read while
# charging and its value is usually larger than its actual value.
fail_msg = 'Measured battery level %s is less than the expected level %s.'
else:
self.Pass()
return
self.Fail(fail_msg % (battery_level_1, battery_level_2))
class ChargeTestTask(factory_task.FactoryTask):
def __init__(self, test, mac, step):
super(ChargeTestTask, self).__init__()
self._test = test
self._mac = mac
self._step = step
def ReadBatteryLevel(self, step):
_ResetAdapter()
if self._test.args.use_charge_fixture:
_ExecuteFixtureMethod(self._test.fixture, 'ENABLE_MAGNET')
factory.console.info('Begin reading battery level...')
value = bluetooth_utils.GattTool.GetDeviceInfo(
self._mac, 'battery level', hci_device=self._test.hci_device,
timeout=self._test.args.read_bluetooth_uuid_timeout_secs)
if self._test.args.use_charge_fixture:
_ExecuteFixtureMethod(self._test.fixture, 'DISABLE_MAGNET')
factory.console.info('%s: %s', step, value)
return int(value)
def Run(self):
if self._step == READ_BATTERY_STEP_1:
self._test.template.SetState(_MSG_READ_BATTERY_1)
battery_level = self.ReadBatteryLevel(self._step)
factory.set_shared_data(BATTERY_LEVEL_KEY, battery_level)
self.Pass()
elif self._step == READ_BATTERY_STEP_2:
def _ReadAndCheckBatteryLevel():
battery_level2 = self.ReadBatteryLevel(self._step)
result = battery_level2 > battery_level1
if result:
factory.set_shared_data(BATTERY_LEVEL_KEY, battery_level2)
return result
self._test.template.SetState(_MSG_READ_BATTERY_2)
battery_level1 = factory.get_shared_data(BATTERY_LEVEL_KEY)
# Check if the battery is charging for up to READ_BATTERY_MAX_RETRY_TIMES.
# Note: the magnet needs to be taken away and re-applied each time.
# This operation could be performed automatically with a charging
# test fixture; otherwise, it must be performed manually. Also
# note that there is a 5-second delay at reading the battery level.
count = 0
success_increased_level = False
while (not success_increased_level and
count < READ_BATTERY_MAX_RETRY_TIMES):
success_increased_level = _ReadAndCheckBatteryLevel()
count += 1
if success_increased_level:
self.Pass()
else:
self.Fail('ChargeTestTask: the battery is not charging!')
class CheckFirmwareRevisionTestTask(factory_task.FactoryTask):
"""A factory task class to read firmware revision string."""
def __init__(self, test, mac):
super(CheckFirmwareRevisionTestTask, self).__init__()
self._test = test
self._mac = mac
def Run(self):
self._test.template.SetState(_MSG_READ_FIRMWARE_REVISION_STRING)
factory.console.info('Begin reading firmware revision string via %s...',
self._test.hci_device)
try:
fw = RetryWithProgress(
self._test.template, _MSG_READ_FIRMWARE_REVISION_STRING,
'reading firmware', INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bluetooth_utils.GattTool.GetDeviceInfo, self._mac,
'firmware revision string', hci_device=self._test.hci_device,
timeout=self._test.args.read_bluetooth_uuid_timeout_secs)
except bluetooth_utils.BluetoothUtilsError as e:
self.Fail('Failed to get firmware revision string: %s' % e)
return
factory.console.info('Expected firmware: %s',
self._test.args.firmware_revision_string)
factory.console.info('Actual firmware: %s', fw)
factory.set_shared_data(self._test.args.firmware_revision_string_key, fw)
data = 'FW: %s\n' % fw
_SaveLogs(self._test.log_file, self._test.aux_log_file, data)
if fw == self._test.args.firmware_revision_string:
self.Pass()
else:
self.Fail('Expected firmware: %s, actual firmware: %s' %
(self._test.args.firmware_revision_string, fw))
class InputTestTask(factory_task.FactoryTask):
"""The task to test bluetooth input device functionality.
The task will try to pair with the device given by the test,
and make the connection.
After the connection, the number of input event should plus one.
If it does not plus one, the task fails.
After connection, operator can try to use the input device and press Enter
to pass checking or Esc to fail the task.
In the end of test, the task will try to disconnect the device and remove the
device. If these procedures fail, the task fails.
Args:
finish_after_pair: Whether to end the test after pairing. If false,
the operator is prompted to test input, and then
the device is unpaired
"""
def __init__(self, test, finish_after_pair):
super(InputTestTask, self).__init__()
self._test = test
self._target_mac = None
self._bt_manager = None
self._adapter = None
self._need_to_cleanup = True
self._finish_after_pair = finish_after_pair
def Cleanup(self):
"""Cleans up input device if it was not cleaned"""
if self._need_to_cleanup:
success_to_remove = self.RemoveInput()
if not success_to_remove:
logging.error('Fail to remove input in Cleanup')
def RemoveInput(self):
"""Disconnects the input device and removes it.
Returns:
If disconnection and removal are both succeeded, return True, return False
otherwise.
"""
return_value = True
try:
self._bt_manager.SetDeviceConnected(self._adapter, self._target_mac,
False)
logging.info('Turned off the connection')
except self._bt_manager.Error:
logging.exception('Fail to turn off the connection.')
return_value = False
try:
self._bt_manager.RemovePairedDevice(self._adapter, self._target_mac)
logging.info('Remove the device')
except self._bt_manager.Error:
logging.exception('Fail to remove the device.')
return_value = False
return return_value
def RemoveInputAndQuit(self, success):
"""Removes the input device and quits the task.
Args:
success: The task is passed by operator or not.
Returns:
If the task is passed by operator and input has been removed successfully,
pass the task, fail the task otherwise.
"""
success_to_remove = self.RemoveInput()
# No need to cleanup again after the task does Pass() or Fail() if removal
# succeeds here.
self._need_to_cleanup = not success_to_remove
if success:
if success_to_remove:
self.Pass()
else:
self.Fail('InputTestTask: Fail to remove input')
else:
self.Fail('Failed by operator')
def OperatorTestInput(self):
"""Lets operator test the input and press key to pass/fail the task."""
logging.info('InputTestTask: Test the input by operator now')
self._test.template.SetState(_MSG_TEST_INPUT)
self._test.ui.BindKey(test_ui.ENTER_KEY,
lambda _: self.RemoveInputAndQuit(True))
self._test.ui.BindKey(test_ui.ESCAPE_KEY,
lambda _: self.RemoveInputAndQuit(False))
def Run(self):
def SaveLogAndFail(fail_reason):
"""Save the fail log and invoke Fail()."""
data = 'Pairing fail: %s\n' % fail_reason
_SaveLogs(self._test.log_file, self._test.aux_log_file, data)
self.Fail(fail_reason)
input_count_before_connection = CheckInputCount()
self._bt_manager = self._test.dut.bluetooth
self._adapter = self._bt_manager.GetFirstAdapter(self._test.host_mac)
self._target_mac = self._test.GetInputDeviceMac()
if not self._target_mac:
SaveLogAndFail('InputTestTask: No MAC with which to pair')
logging.info('Attempting pair with %s', self._target_mac)
self._bt_manager.DisconnectAndUnpairDevice(self._adapter, self._target_mac)
success_create_device = RetryWithProgress(
self._test.template, _MSG_PAIR_INPUT_DEVICE, 'create paired device',
INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
self._bt_manager.CreatePairedDevice, self._adapter,
self._target_mac, self._DisplayPasskey, self._AuthenticationCancelled)
if not success_create_device:
SaveLogAndFail('InputTestTask: Fail to create paired device.')
return
success_connect_device = RetryWithProgress(
self._test.template, _MSG_CONNECT_INPUT_DEVICE, 'connect input device',
INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
self._bt_manager.SetDeviceConnected, self._adapter,
self._target_mac, True)
if not success_connect_device:
SaveLogAndFail('InputTestTask: Fail to connect device.')
return
if not WaitForInputCount(self, input_count_before_connection + 1):
return
if self._finish_after_pair:
# We leave the device paired
self._need_to_cleanup = False
data = 'Pairing finished\n'
_SaveLogs(self._test.log_file, self._test.aux_log_file, data)
self.Pass()
return
self.OperatorTestInput()
def _DisplayPasskey(self, passkey):
logging.info("Displaying passkey %s", passkey)
label = MakePasskeyLabelPrompt(passkey)
self._test.template.SetState(label)
def _AuthenticationCancelled(self):
self._test.template.SetState(_MSG_AUTH_FAILED)
class BluetoothTest(unittest.TestCase):
ARGS = [
Arg('expected_adapter_count', int, 'Number of bluetooth adapters'
' on the machine.', default=0),
Arg('manufacturer_id', int, 'manufacturer id', optional=True),
Arg('detect_adapters_retry_times', int, 'Maximum retry time to'
' detect adapters', default=10),
Arg('detect_adapters_interval_secs', int, 'Interval in seconds between'
' each retry to detect adapters', default=2),
Arg('read_bluetooth_uuid_timeout_secs', int,
'Timeout to read bluetooth characteristics via uuid', default=None,
optional=True),
Arg('scan_devices', bool, 'Scan bluetooth device.',
default=False),
Arg('prompt_scan_message', bool, 'Prompts a message to tell user to'
' enable remote devices discovery mode', default=True),
Arg('keyword', str, 'Only cares remote devices whose "Name" contains'
' keyword.', default=None, optional=True),
Arg('average_rssi_threshold', float, 'Checks the largest average RSSI'
' among scanned device is equal to or greater than '
' average_rssi_threshold.',
default=None, optional=True),
Arg('scan_counts', int, 'Number of scans to calculate average RSSI',
default=3),
Arg('scan_timeout_secs', int, 'Timeout to do one scan', default=5),
Arg('input_device_mac', str, 'The mac address of bluetooth input device',
default=None, optional=True),
Arg('input_device_mac_key', str, 'A key for factory shared data '
'containing the mac address', default=None, optional=True),
Arg('input_device_rssi_key', str, 'A key for factory shared data '
'containing the rssi value', default=None, optional=True),
Arg('firmware_revision_string_key', str,
'A key of factory shared data containing firmware revision string',
optional=True),
Arg('firmware_revision_string', str,
'the firmware revision string', optional=True),
Arg('average_rssi_lower_threshold', (float, dict), 'Checks the average'
' RSSI of the target mac is equal to or greater than this threshold.',
default=None, optional=True),
Arg('average_rssi_upper_threshold', (float, dict), 'Checks the average'
' RSSI of the target mac is equal to or less than this threshold.',
default=None, optional=True),
Arg('pair_with_match', bool, 'Whether to pair with the strongest match.',
default=False, optional=True),
Arg('finish_after_pair', bool, 'Whether the test should end immediately '
'after pairing completes', default=False),
Arg('unpair', bool, 'Whether to unpair matching devices instead of pair',
default=False, optional=True),
Arg('check_shift_pair_keys', bool,
'check if shift-p-a-i-r keys are pressed.',
default=False, optional=True),
Arg('check_battery_charging', bool,
'Whether to check if the battery is charging',
default=False, optional=True),
Arg('read_battery_level', int, 'read the battery level',
default=None, optional=True),
Arg('check_battery_level', bool, 'Whether to check the battery level',
default=False, optional=True),
Arg('prompt_into_fixture', bool, 'Prompt the user to place the base into '
'the test fixture', default=False, optional=True),
Arg('use_charge_fixture', bool, 'whether a charge fixture is employed',
default=False, optional=True),
Arg('reset_fixture', bool, 'whether to reset the fixture',
default=False, optional=True),
Arg('start_charging', bool, 'Prompt the user to start charging the base',
default=False, optional=True),
Arg('enable_magnet', bool, 'enable the base',
default=False, optional=True),
Arg('reset_magnet', bool, 'reset the base',
default=False, optional=True),
Arg('stop_charging', bool, 'Prompt the user to stop charging the base',
default=False, optional=True),
Arg('base_enclosure_serial_number', unicode,
'the base enclosure serial number', default=None, optional=True),
Arg('battery_log', str,
'the battery log file', default=None, optional=True),
Arg('expected_battery_level', int,
'the expected battery level', default=100, optional=True),
Arg('log_path', str, 'the directory of the log on the local test host',
optional=True),
Arg('aux_log_path', str, 'the path of the aux log on shopfloor',
optional=True),
Arg('test_host_id_file', str, 'the file storing the id of the test host',
optional=True),
]
def SetStrongestRssiMac(self, mac_addr):
self._strongest_rssi_mac = mac_addr
def GetInputDeviceMac(self):
"""Gets the input device MAC to pair with, or None if None
This may be specified in the arguments, or computed at scan time.
"""
if self._input_device_mac:
return self._input_device_mac
else:
return self._strongest_rssi_mac
def setUp(self):
self.dut = device_utils.CreateDUTInterface()
self.ui = test_ui.UI()
self.ui.AppendCSS('.start-font-size {font-size: 2em;}')
self.template = ui_templates.TwoSections(self.ui)
self.template.SetTitle(_TEST_TITLE)
self._task_list = []
self._strongest_rssi_mac = None
self.fixture = None
if self.args.input_device_mac_key:
self._input_device_mac = (
ColonizeMac(factory.get_shared_data(self.args.input_device_mac_key)))
else:
self._input_device_mac = self.args.input_device_mac
self.btmgmt = bluetooth_utils.BtMgmt(self.args.manufacturer_id)
self.hci_device = self.btmgmt.GetHciDevice()
self.host_mac = self.btmgmt.GetMac()
logging.info('manufacturer_id %s: %s %s',
self.args.manufacturer_id, self.hci_device, self.host_mac)
if self.args.base_enclosure_serial_number:
if (self.args.test_host_id_file and
os.path.isfile(self.args.test_host_id_file)):
with open(self.args.test_host_id_file) as f:
test_host_id = f.read().strip()
else:
test_host_id = None
filename = '.'.join([self.args.base_enclosure_serial_number,
str(test_host_id)])
self.log_file = None
self.aux_log_file = None
if self.args.log_path:
self.log_file = os.path.join(self.args.log_path, filename)
# Note: aux_log_file is generated from log_file
# Not all projects would generate aux_log_file.
if self.args.aux_log_path:
self.aux_log_file = os.path.join(self.args.aux_log_path, filename)
def tearDown(self):
"""Close the charge test fixture."""
if self.args.use_charge_fixture:
self.fixture.Close()
def runTest(self):
if self.args.use_charge_fixture:
# Import this module only when a test station needs it.
# A base SMT test station does not need to use the charge fixture.
# pylint: disable=E0611
from cros.factory.test.fixture import base_charge_fixture
# Note: only reset the fixture in InitializeFixture test.
# This will stop charging and disable the magnet initially.
# For the following tests, do not reset the fixture so that
# the charging could be continued across tests in the test list
# defined in the base_host. The purpose is to keep charing the
# battery while executing other tests.
self.fixture = base_charge_fixture.BaseChargeFixture(
reset=self.args.reset_fixture)
if self.args.expected_adapter_count:
self._task_list.append(DetectAdapterTask(
self, self.args.expected_adapter_count))
if self.args.scan_devices:
if self.args.prompt_scan_message:
self._task_list.append(TurnOnTask(self, _MSG_TURN_ON_DEVICE))
self._task_list.append(ScanDevicesTask(self))
if self.args.input_device_rssi_key:
self._task_list.append(DetectRSSIofTargetMACTask(self))
if self.args.prompt_into_fixture:
self._task_list.append(
TurnOnTask(self, _MSG_INTO_FIXTURE, test_ui.SPACE_KEY))
if self.args.read_battery_level == 1:
self._task_list.append(
ReadBatteryLevelTask(self, self._input_device_mac,
READ_BATTERY_STEP_1))
if self.args.enable_magnet and self.args.use_charge_fixture:
self._task_list.append(FixtureControlTask(self, 'ENABLE_MAGNET'))
if self.args.reset_magnet:
if self.args.use_charge_fixture:
self._task_list.append(
FixtureControlTask(self, 'DISABLE_MAGNET', post_sleep=1))
self._task_list.append(FixtureControlTask(self, 'ENABLE_MAGNET'))
else:
self._task_list.append(
TurnOnTask(self, _MSG_RESET_MAGNET, test_ui.SPACE_KEY))
if self.args.start_charging:
if self.args.use_charge_fixture:
self._task_list.append(
# Let it charge for a little while.
FixtureControlTask(self, 'START_CHARGING'))
else:
self._task_list.append(
TurnOnTask(self, _MSG_START_CHARGE, test_ui.SPACE_KEY))
if self.args.check_shift_pair_keys:
self._task_list.append(
CheckDisconnectionOfPairedDeviceTask(self, self._input_device_mac))
if self.args.unpair:
self._task_list.append(
UnpairTask(self, self._input_device_mac, self.args.keyword))
if self.args.firmware_revision_string:
self._task_list.append(
CheckFirmwareRevisionTestTask(self, self._input_device_mac))
if self.args.pair_with_match:
self._task_list.append(InputTestTask(self, self.args.finish_after_pair))
if self.args.read_battery_level == 2:
self._task_list.append(
ReadBatteryLevelTask(self, self._input_device_mac,
READ_BATTERY_STEP_2))
if self.args.check_battery_level:
self._task_list.append(CheckBatteryLevelTask(self))
if self.args.stop_charging:
if self.args.use_charge_fixture:
self._task_list.append(FixtureControlTask(self, 'STOP_CHARGING'))
else:
self._task_list.append(
TurnOnTask(self, _MSG_STOP_CHARGE, test_ui.SPACE_KEY))
factory_task.FactoryTaskManager(self.ui, self._task_list).Run()