blob: 7ce527b7ab3ba476cce045255bbcaeb875f4d6ee [file] [log] [blame]
# Copyright 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A factory test to verify the functionality of bluetooth device.
Description
-----------
A bluetooth test to detect adapter, scan bluetooth device, check average RSSI
value, and connect with the bluetooth input device.
To run this bluetooth test, the DUT most have at least one bluetooth adapter.
If argument ``scan_devices`` is set, there should be at least one remote
bluetooth device.
If argument ``pair_with_match`` is set, there should be a bluetooth input
device like a mouse.
Test Procedure
--------------
1. Setup a bluetooth input device (like a mouse) if needed.
2. Enable the connection ability of bluetooth on DUT.
3. A prompt message will be displayed on the UI if ``prompt_scan_message`` and
``scan_devices`` is set.
4. The bluetooth test will run automatically.
Dependency
----------
- Device API (``cros.factory.device.chromeos.bluetooth``).
- Bluetooth utility (``cros.factory.test.utils.bluetooth_utils``).
Examples
--------
To detect the specified number of bluetooth adapter on DUT, add this in test
list::
{
"pytest_name": "bluetooth",
"args": {
"expected_adapter_count": 1
}
}
To scan remote bluetooth device and try to find at least one deivce whose name
contains 'KEY_WORD'::
{
"pytest_name": "bluetooth",
"args": {
"scan_devices": true,
"keyword": "KEY_WORD"
}
}
To check the the largest average RSSI among all scanned devices is bigger than
threshold::
{
"pytest_name": "bluetooth",
"args": {
"scan_devices": true,
"average_rssi_threshold": -65.0
}
}
To pair, connect with, and disconnect with the bluetooth input device::
{
"pytest_name": "bluetooth",
"args": {
"pair_with_match": true
}
}
"""
import contextlib
import glob
import logging
import os
import shutil
import sys
import threading
import time
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log.
from cros.factory.test.i18n import _
from cros.factory.test import session
from cros.factory.test import state
from cros.factory.test import test_case
from cros.factory.test import test_ui
from cros.factory.test.utils import bluetooth_utils
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import file_utils
from cros.factory.utils import sync_utils
INPUT_MAX_RETRY_TIMES = 10
INPUT_RETRY_INTERVAL = 1
RESET_ADAPTER_SLEEP_TIME = 5
READ_BATTERY_MAX_RETRY_TIMES = 10
BATTERY_LEVEL_KEY = 'battery_level'
READ_BATTERY_STEP_1 = 'read_battery_1'
READ_BATTERY_STEP_2 = 'read_battery_2'
def GetCurrentTime():
"""Get the current time."""
return time.strftime('%Y-%m-%d %H:%M:%S')
def ColonizeMac(mac):
""" Given a MAC address, normalize its colons.
Example: ABCDEF123456 -> AB:CD:EF:12:34:56
"""
mac_no_colons = ''.join(mac.strip().split(':'))
groups = (mac_no_colons[x:x+2] for x in range(0, len(mac_no_colons), 2))
return ':'.join(groups)
def GetInputCount():
"""Returns the number of input devices from probing /dev/input/event*."""
number_input = len(glob.glob('/dev/input/event*'))
logging.info('Found %d input devices.', number_input)
return number_input
def WaitForInputCount(expected_input_count, timeout=10):
"""Waits for the number of input devices to reach the given count.
Args:
expected_input_count: The number of input devices that determines success.
timeout: The maximum time in seconds that we will wait.
Raises:
type_utils.TimeoutError if timeout.
"""
def _CheckInputCount():
return GetInputCount() == expected_input_count
sync_utils.WaitFor(_CheckInputCount, timeout, poll_interval=0.2)
def _AppendLog(log_file, data):
"""Appends the log file on the local station."""
if not log_file:
return
# Prepend the current timestamp to each line.
data = ''.join(GetCurrentTime() + ' ' + line + '\n' if line else '\n'
for line in data.splitlines())
log_dir = os.path.dirname(log_file)
file_utils.TryMakeDirs(log_dir)
with open(log_file, 'a') as log:
log.write(data)
class BluetoothTest(test_case.TestCase):
ARGS = [
Arg('expected_adapter_count', int,
'Number of bluetooth adapters on the machine.',
default=0),
Arg('manufacturer_id', int,
'ID of the manufacturer.',
default=None),
Arg('detect_adapters_retry_times', int,
'Maximum retry time to detect adapters.',
default=10),
Arg('detect_adapters_interval_secs', int,
'Interval in seconds between each retry to detect adapters.',
default=2),
Arg('read_bluetooth_uuid_timeout_secs', int,
'Timeout to read bluetooth characteristics via uuid.',
default=None),
Arg('scan_devices', bool,
'Scan bluetooth device.',
default=False),
Arg('prompt_scan_message', bool,
'Prompts a message to tell user to enable remote devices discovery '
'mode.',
default=True),
Arg('keyword', str,
'Only cares remote devices whose "Name" contains keyword.',
default=None),
Arg('average_rssi_threshold', float,
'Checks the largest average RSSI among scanned device is equal to or '
'greater than average_rssi_threshold.',
default=None),
Arg('scan_counts', int,
'Number of scans to calculate average RSSI.',
default=3),
Arg('scan_timeout_secs', int,
'Timeout to do one scan.',
default=5),
Arg('input_device_mac', str,
'The mac address of bluetooth input device.',
default=None),
Arg('input_device_mac_key', str,
'A key for factory shared data containing the mac address.',
default=None),
Arg('input_device_rssi_key', str,
'A key for factory shared data containing the rssi value.',
default=None),
Arg('firmware_revision_string_key', str,
'A key of factory shared data containing firmware revision string.',
default=None),
Arg('firmware_revision_string', str,
'The firmware revision string.',
default=None),
Arg('average_rssi_lower_threshold', (float, dict),
'Checks the average RSSI of the target mac is equal to or '
'greater than this threshold.',
default=None),
Arg('average_rssi_upper_threshold', (float, dict),
'Checks the average RSSI of the target mac is equal to or '
'less than this threshold.',
default=None),
Arg('pair_with_match', bool,
'Whether to pair with the strongest match.',
default=False),
Arg('finish_after_pair', bool,
'Whether the test should end immediately after pairing completes.',
default=False),
Arg('unpair', bool,
'Whether to unpair matching devices instead of pair.',
default=False),
Arg('check_shift_pair_keys', bool,
'Check if shift-p-a-i-r keys are pressed.',
default=False),
Arg('check_battery_charging', bool,
'Whether to check if the battery is charging.',
default=False),
Arg('read_battery_level', int,
'Read the battery level.',
default=None),
Arg('check_battery_level', bool,
'Whether to check the battery level.',
default=False),
Arg('prompt_into_fixture', bool,
'Prompt the user to place the base into the test fixture.',
default=False),
Arg('use_charge_fixture', bool,
'Whether a charge fixture is employed.',
default=False),
Arg('reset_fixture', bool,
'Whether to reset the fixture.',
default=False),
Arg('start_charging', bool,
'Prompt the user to start charging the base.',
default=False),
Arg('enable_magnet', bool,
'Enable the base.',
default=False),
Arg('reset_magnet', bool,
'Reset the base.',
default=False),
Arg('stop_charging', bool,
'Prompt the user to stop charging the base.',
default=False),
Arg('base_enclosure_serial_number', unicode,
'The base enclosure serial number.',
default=None),
Arg('battery_log', str,
'The battery log file.',
default=None),
Arg('expected_battery_level', int,
'The expected battery level.',
default=100),
Arg('log_path', str,
'The directory of the log on the local test host.',
default=None),
Arg('keep_raw_logs', bool,
'Whether to attach the log by Testlog.',
default=True),
Arg('test_host_id_file', str,
'The file storing the id of the test host.',
default=None),
]
def GetInputDeviceMac(self):
"""Gets the input device MAC to pair with, or None if None
This may be specified in the arguments, or computed at scan time.
"""
if self._input_device_mac:
return self._input_device_mac
else:
return self._strongest_rssi_mac
def setUp(self):
self.dut = device_utils.CreateDUTInterface()
self.ui.ToggleTemplateClass('font-large', True)
self._strongest_rssi_mac = None
if self.args.input_device_mac_key:
self._input_device_mac = (
ColonizeMac(state.DataShelfGetValue(self.args.input_device_mac_key)))
else:
self._input_device_mac = self.args.input_device_mac
self.btmgmt = bluetooth_utils.BtMgmt(self.args.manufacturer_id)
self.hci_device = self.btmgmt.GetHciDevice()
self.host_mac = self.btmgmt.GetMac()
logging.info('manufacturer_id %s: %s %s',
self.args.manufacturer_id, self.hci_device, self.host_mac)
self.log_file = None
self.log_tmp_file = None
if self.args.base_enclosure_serial_number:
self.log_tmp_file = file_utils.CreateTemporaryFile()
if (self.args.test_host_id_file and
os.path.isfile(self.args.test_host_id_file)):
with open(self.args.test_host_id_file) as f:
test_host_id = f.read().strip()
else:
test_host_id = None
filename = '.'.join([self.args.base_enclosure_serial_number,
str(test_host_id)])
if self.args.log_path:
self.log_file = os.path.join(self.args.log_path, filename)
self.fixture = None
if self.args.use_charge_fixture:
# Import this module only when a test station needs it.
# A base SMT test station does not need to use the charge fixture.
# pylint: disable=no-name-in-module
from cros.factory.test.fixture import base_charge_fixture
# Note: only reset the fixture in InitializeFixture test.
# This will stop charging and disable the magnet initially.
# For the following tests, do not reset the fixture so that
# the charging could be continued across tests in the test list
# defined in the base_host. The purpose is to keep charging the
# battery while executing other tests.
self.fixture = base_charge_fixture.BaseChargeFixture(
reset=self.args.reset_fixture)
if self.args.expected_adapter_count:
self.AddTask(self.DetectAdapter, self.args.expected_adapter_count)
if self.args.scan_devices:
if self.args.prompt_scan_message:
self.AddTask(self.WaitKeyPressed,
_('Enable the connection ability of bluetooth device '
'and press Enter'))
self.AddTask(self.ScanDevices)
if self.args.input_device_rssi_key:
self.AddTask(self.DetectRSSIofTargetMAC)
if self.args.prompt_into_fixture:
self.AddTask(self.WaitKeyPressed,
_('Place the base into the fixture, '
'and press the space key on the test host.'),
test_ui.SPACE_KEY)
if self.args.read_battery_level == 1:
self.AddTask(self.ReadBatteryLevel, self._input_device_mac,
READ_BATTERY_STEP_1)
if self.args.enable_magnet and self.args.use_charge_fixture:
self.AddTask(self.FixtureControl, 'ENABLE_MAGNET')
if self.args.reset_magnet:
if self.args.use_charge_fixture:
self.AddTask(self.FixtureControl, 'DISABLE_MAGNET', post_sleep=1)
self.AddTask(self.FixtureControl, 'ENABLE_MAGNET')
else:
self.AddTask(self.WaitKeyPressed,
_('Please re-attach the magnet, '
'and press the space key on the test host.'),
test_ui.SPACE_KEY)
if self.args.start_charging:
if self.args.use_charge_fixture:
# Let it charge for a little while.
self.AddTask(self.FixtureControl, 'START_CHARGING')
else:
self.AddTask(self.WaitKeyPressed,
_('Turn on charging by pressing the green button, '
'take the keyboard out and put it back, '
'and press the space key on the test host.'),
test_ui.SPACE_KEY)
if self.args.check_shift_pair_keys:
self.AddTask(self.CheckDisconnectionOfPairedDevice,
self._input_device_mac)
if self.args.unpair:
self.AddTask(self.Unpair, self._input_device_mac, self.args.keyword)
if self.args.firmware_revision_string:
self.AddTask(self.CheckFirmwareRevision, self._input_device_mac)
if self.args.pair_with_match:
self.AddTask(self.TestInput, self.args.finish_after_pair)
if self.args.read_battery_level == 2:
self.AddTask(self.ReadBatteryLevel, self._input_device_mac,
READ_BATTERY_STEP_2)
if self.args.check_battery_level:
self.AddTask(self.CheckBatteryLevel)
if self.args.stop_charging:
if self.args.use_charge_fixture:
self.AddTask(self.FixtureControl, 'STOP_CHARGING')
else:
self.AddTask(self.WaitKeyPressed,
_('Press the green button again to stop charging, '
'and press the space key on the test host.'),
test_ui.SPACE_KEY)
# Group checker for Testlog.
self.group_checker = testlog.GroupParam('avg_rssi', ['mac', 'average_rssi'])
def tearDown(self):
"""Close the charge test fixture."""
if self.args.use_charge_fixture:
self.fixture.Close()
if self.log_file:
shutil.copyfile(self.log_tmp_file, self.log_file)
if self.log_tmp_file:
if self.args.keep_raw_logs:
testlog.AttachFile(
path=self.log_tmp_file,
mime_type='text/plain',
name='bluetooth.log',
description='plain text log of bluetooth',
delete=False)
os.remove(self.log_tmp_file)
def WaitKeyPressed(self, message, key=test_ui.ENTER_KEY):
"""Ask operator to turn on bluetooth device and press given key.
Args:
message: Html code containing message to show on the screen.
key: The key to be pressed.
"""
self.ui.SetState(message)
logging.info('wait for the user to press key %s', key)
self.ui.WaitKeysOnce(key)
def CheckFirmwareRevision(self, mac):
"""A task to read firmware revision string."""
self.ui.SetState(_('Read firmware revision string.'))
session.console.info('Begin reading firmware revision string via %s...',
self.hci_device)
try:
fw = self.RetryWithProgress(
'reading firmware', INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bluetooth_utils.GattTool.GetDeviceInfo, mac,
'firmware revision string', hci_device=self.hci_device,
timeout=self.args.read_bluetooth_uuid_timeout_secs)
except bluetooth_utils.BluetoothUtilsError as e:
self.FailTask('Failed to get firmware revision string: %s' % e)
session.console.info('Expected firmware: %s',
self.args.firmware_revision_string)
session.console.info('Actual firmware: %s', fw)
state.DataShelfSetValue(self.args.firmware_revision_string_key, fw)
_AppendLog(self.log_tmp_file, 'FW: %s\n' % fw)
self.assertEqual(self.args.firmware_revision_string, fw,
'Expected firmware: %s, actual firmware: %s' %
(self.args.firmware_revision_string, fw))
def CheckBatteryLevel(self):
"""Checks whether the following conditions are satisfied:
1. The battery levels are read twice.
2. battery_level_1 < battery_level_2
3. battery_level_1 >= expected_battery_level
"""
self.ui.SetState(
_('Check if the battery has charged to a higher percentage'))
battery_level_1 = state.DataShelfGetValue(READ_BATTERY_STEP_1)
battery_level_2 = state.DataShelfGetValue(READ_BATTERY_STEP_2)
session.console.info('%s: %s', READ_BATTERY_STEP_1, battery_level_1)
session.console.info('%s: %s', READ_BATTERY_STEP_2, battery_level_2)
if not battery_level_1 or not battery_level_2:
fail_msg = 'Battery levels should be read twice. read_1: %s, read_2: %s'
elif (battery_level_1 > battery_level_2 or
(battery_level_1 == battery_level_2 and battery_level_1 < 100)):
fail_msg = 'Base battery is not charged up. read_1: %s, read_2: %s'
elif battery_level_1 < self.args.expected_battery_level:
# Note: battery_level_1 instead of battery_level_2 should be larger than
# the expected_battery_level since battery_level_2 is read while
# charging and its value is usually larger than its actual value.
fail_msg = 'Measured battery level %s is less than the expected level %s.'
else:
return
self.FailTask(fail_msg % (battery_level_1, battery_level_2))
def ReadBatteryLevel(self, mac, step):
"""Read battery level."""
msg = {
READ_BATTERY_STEP_1: _('Read battery level for the 1st time.'),
READ_BATTERY_STEP_2: _('Read battery level for the 2nd time.')
}[step]
self.ui.SetState(msg)
session.console.info('%s via %s ...', step, self.hci_device)
try:
battery_level = int(self.RetryWithProgress(
step, INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bluetooth_utils.GattTool.GetDeviceInfo,
mac, 'battery level', hci_device=self.hci_device,
timeout=self.args.read_bluetooth_uuid_timeout_secs))
session.console.info('%s: %d', step, battery_level)
except bluetooth_utils.BluetoothUtilsError as e:
self.FailTask('%s failed to get battery level: %s' % (step, e))
old_battery_level = state.DataShelfGetValue(step)
if (step == READ_BATTERY_STEP_1 and
(old_battery_level is None or battery_level < old_battery_level)):
# If the battery level at step 1 becomes higher over different rounds
# (when the operator keeps retesting it for any reasons),
# we only keep the lowest one. This is because we want to test if the
# battery could charge to a higher level at step 2 than step 1.
state.DataShelfSetValue(step, battery_level)
elif step == READ_BATTERY_STEP_2:
# We keep the latest battery level read at step 2.
state.DataShelfSetValue(step, battery_level)
if step == READ_BATTERY_STEP_1:
data = '\nSN: %s\nMAC: %s\n' % (self.args.base_enclosure_serial_number,
mac)
else:
data = ''
data += '%s: %s\n' % (step, battery_level)
_AppendLog(self.log_tmp_file, data)
if self.args.battery_log:
with open(self.args.battery_log, 'a') as f:
f.write('%s %s %s [%s]: %s\n' %
(GetCurrentTime(), self.args.base_enclosure_serial_number, mac,
step, battery_level))
def FixtureControl(self, operation, post_sleep=0):
"""Control the charge test fixture.
Args:
operation: the operation to be performed by the test fixture.
"""
try:
# An operation is mapped to its corresponding fixture method defined in
# base_charge_fixture.BaseChargeFixture class.
FIXTURE_METHOD_DICT = {'START_CHARGING': 'StartCharging',
'STOP_CHARGING': 'StopCharging',
'ENABLE_MAGNET': 'EnableMagnet',
'DISABLE_MAGNET': 'DisableMagnet'}
fixture_method = getattr(self.fixture, FIXTURE_METHOD_DICT.get(operation))
session.console.info('Executing fixture method: %s',
fixture_method.__name__)
fixture_method()
self.Sleep(post_sleep)
except Exception as e:
self.FailTask('error in executing %s (%s)' % (operation, e))
def DetectAdapter(self, expected_adapter_count):
"""Check number of adapters.
Detects adapters from dbus and checks if the number of adapters matches the
expected number.
Args:
expected_adapter_count: The expected number of bluetooth adapters.
"""
self.ui.SetState(_('Detect bluetooth adapter'))
adapters = self.dut.bluetooth.GetAdapters(
self.args.detect_adapters_retry_times,
self.args.detect_adapters_interval_secs)
self.assertEqual(
len(adapters), expected_adapter_count,
'DetectAdapter: expect %d and find %d adapter(s).' %
(expected_adapter_count, len(adapters)))
def Unpair(self, device_mac, name_fragment):
"""Unpair from bluetooth devices.
Args:
device_mac: None, or the MAC address of the device to unpair.
name_fragment: None, or substring of the name of the device(s) to unpair.
"""
def _ShouldUnpairDevice(device_props):
"""Indicate if a device matches the filter, and so should be unpaired.
If a name fragment or MAC address is given, the corresponding property
must match. If neither is given, all devices should be unpaired.
"""
if device_mac is not None and device_props['Address'] != device_mac:
return False
if (name_fragment is not None and
name_fragment not in device_props.get('Name', '')):
return False
return device_props['Paired']
self.ui.SetState(_('Unpairing'))
input_count_before_unpair = GetInputCount()
bluetooth_manager = self.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self.host_mac)
devices = bluetooth_manager.GetAllDevices(adapter).values()
devices_to_unpair = filter(_ShouldUnpairDevice, devices)
logging.info('Unpairing %d device(s)', len(devices_to_unpair))
for device_to_unpair in devices_to_unpair:
address = device_to_unpair['Address']
bluetooth_manager.DisconnectAndUnpairDevice(adapter, address)
bluetooth_manager.RemovePairedDevice(adapter, address)
# Check that we unpaired what we thought we did
expected_input_count = input_count_before_unpair - len(devices_to_unpair)
WaitForInputCount(expected_input_count)
def ScanDevices(self):
"""Scan bluetooth devices around.
In this task, the test will control the first adapter from BluetoothManager
and scan devices around for timeout_secs. The task passed if there is at
least one device.
If target_addresses is provided, the test will also check if it can find
at least one device specified in target_addresses list.
This sets the strongest matching device mac to self._strongest_rssi_mac.
Note: this task is intended to be executed on a DUT, i.e., a chromebook, to
test its bluetooth module. A typical test case is to see if it can detect a
bluetooth mouse placed around it.
"""
keyword = self.args.keyword
average_rssi_threshold = self.args.average_rssi_threshold
scan_counts = self.args.scan_counts
timeout_secs = self.args.scan_timeout_secs
def FilterByKeyword(devices):
"""Returns the devices filtered by keyword.
If keyword is None, leave devices as it is.
"""
if keyword is None:
return devices
filtered_devices = {}
for mac, props in devices.iteritems():
if 'Name' not in props:
logging.warning('Device %s: %s does not have "Name" property.',
mac, props)
continue
if keyword in props['Name']:
filtered_devices[mac] = props
logging.info('Device %s: "Name" property %s matches keyword %s.',
mac, props['Name'], keyword)
return filtered_devices
def UpdateRssi(devices_rssis, devices):
"""Updates devices_rssis using RSSI property in devices.
Args:
devices_rssis: A dict. Keys are mac addresses and values are lists of
scanned RSSI value.
devices: A dict. Keys are mac addresses and values are dicts of
properties.
"""
for mac, props in devices.iteritems():
if 'RSSI' not in props:
logging.warning('Device %s: %s does not have "RSSI" property.',
mac, props)
continue
# typecast to str to avoid the weird dbus.String type
devices_rssis.setdefault(str(mac), []).append(int(props['RSSI']))
logging.info('UpdateRssi: %s', devices_rssis)
mac_to_scan = self.GetInputDeviceMac()
def HasScannedTargetMac():
"""Helper to check if the target MAC has been scanned."""
return mac_to_scan and mac_to_scan in candidate_rssis
bluetooth_manager = self.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self.host_mac)
# Records RSSI of each scan and calculates average rssi.
candidate_rssis = {}
for unused_count in xrange(scan_counts):
self.ui.SetState(_('Scanning...'))
with self.TimedProgressBar(timeout_secs):
devices = bluetooth_manager.ScanDevices(adapter, timeout_secs)
logging.info('Found %d device(s).', len(devices))
for mac, props in devices.iteritems():
try:
logging.info('Device found: %s. Name: %s, RSSI: %d',
mac, props['Name'], props['RSSI'])
except KeyError:
logging.exception('Name or RSSI is not available in %s', mac)
UpdateRssi(candidate_rssis, FilterByKeyword(devices))
# Optimization: if we are only interested in one particular address,
# then we can early-out as soon as we find it
if average_rssi_threshold is None and HasScannedTargetMac():
logging.info("Address found, ending scan early")
break
logging.info('Found %d candidate device(s) in %s scans.',
len(candidate_rssis), scan_counts)
session.console.info('Candidate devices scan results: %s', candidate_rssis)
if not candidate_rssis:
self.FailTask('ScanDevicesTask: Fail to find any candidate device.')
# Calculates maximum average RSSI.
max_average_rssi_mac, max_average_rssi = None, -sys.float_info.max
for mac, rssis in candidate_rssis.iteritems():
average_rssi = float(sum(rssis)) / len(rssis)
logging.info('Device %s has average RSSI: %f', mac, average_rssi)
event_log.Log('avg_rssi', mac=mac, average_rssi=average_rssi)
with self.group_checker:
testlog.LogParam('mac', mac)
testlog.LogParam('average_rssi', average_rssi)
if average_rssi > max_average_rssi:
max_average_rssi_mac, max_average_rssi = mac, average_rssi
logging.info('Device %s has the largest average RSSI: %f',
max_average_rssi_mac, max_average_rssi)
event_log.Log('bluetooth_scan_device', mac=max_average_rssi_mac,
rssi=max_average_rssi,
meet=max_average_rssi >= average_rssi_threshold)
testlog.LogParam('max_average_rssi_mac', max_average_rssi_mac)
testlog.CheckNumericParam('max_average_rssi', max_average_rssi,
min=average_rssi_threshold)
self._strongest_rssi_mac = max_average_rssi_mac
if mac_to_scan and not HasScannedTargetMac():
found_addresses = candidate_rssis.keys()
self.FailTask('Failed to find MAC address %s.'
'Scanned addresses: %s' % (mac_to_scan, found_addresses))
if average_rssi_threshold is None:
# Test is uninterested in RSSI thresholds
pass
elif average_rssi_threshold > max_average_rssi:
session.console.error('The largest average RSSI %f does not meet'
' threshold %f. Please ensure that the test BT '
"device is 'visible' and close to the DUT "
'antenna.',
max_average_rssi, average_rssi_threshold)
self.FailTask(
'ScanDeviceTask: The largest average RSSI %f of device %s does'
' not meet threshold %f.' %
(max_average_rssi, max_average_rssi_mac, average_rssi_threshold))
else:
session.console.info('The largest average RSSI %f meets threshold %f.',
max_average_rssi, average_rssi_threshold)
def CheckDisconnectionOfPairedDevice(self, device_mac):
"""Check whether a paired device has disconnected.
Args:
device_mac: None, or the MAC address of the device to unpair
"""
def _ConnectedDevice(device_props):
"""Indicates if a connected device matches the device_mac."""
return (device_props["Address"] == device_mac and
int(device_props["Connected"]) >= 1)
def _CheckDisconnection():
bluetooth_manager = self.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self.host_mac)
devices = bluetooth_manager.GetAllDevices(adapter).values()
connected_devices = filter(_ConnectedDevice, devices)
logging.info('Connected and paired %d device(s)', len(connected_devices))
return not connected_devices
self.ui.SetState(_('Press shift-p-a-i-r simultaneously on the base.'))
disconnected = self.RetryWithProgress(
'Check disconnection of the paired base', INPUT_MAX_RETRY_TIMES,
INPUT_RETRY_INTERVAL, _CheckDisconnection)
if disconnected:
msg = 'Shift-P-A-I-R: done'
else:
msg = 'Shift-P-A-I-R: not done'
session.console.info(msg)
_AppendLog(self.log_tmp_file, msg)
if not disconnected:
self.FailTask(msg)
def DetectRSSIofTargetMAC(self):
"""Detect the RSSI strength at a given target MAC address.
In this task, a generic test host uses the first adapter from
BluetoothManager and scans devices around for timeout_secs. The task
passed if it can detect the RSSI strength at the target MAC.
Note: this task is intended to be executed on a generic test host to test
if the RSSI of a target device, e.g., a Ryu base, could be detected.
"""
mac_to_scan = self.GetInputDeviceMac()
scan_counts = self.args.scan_counts
timeout_secs = self.args.scan_timeout_secs
input_device_rssi_key = self.args.input_device_rssi_key
fail_msg = []
def _DeriveRSSIThreshold(threshold, fid):
if isinstance(threshold, (int, float)):
return threshold
elif isinstance(threshold, dict):
if fid in threshold:
return threshold.get(fid)
else:
fail_msg.append('Fixture ID "%s" is not legitimate!\n' % fid)
else:
fail_msg.append('Wrong type of RSSI threshold: %s\n' % threshold)
return None
fid = session.GetDeviceID()
average_rssi_lower_threshold = _DeriveRSSIThreshold(
self.args.average_rssi_lower_threshold, fid)
average_rssi_upper_threshold = _DeriveRSSIThreshold(
self.args.average_rssi_upper_threshold, fid)
if fail_msg:
fail_msg = ''.join(fail_msg)
session.console.error(fail_msg)
self.FailTask(fail_msg)
bluetooth_manager = self.dut.bluetooth
adapter = bluetooth_manager.GetFirstAdapter(self.host_mac)
logging.info('mac (%s): %s', self.host_mac, adapter)
rssis = []
for i in xrange(1, 1 + scan_counts):
self.ui.SetState(
_('Detect RSSI (count {count}/{total})', count=i, total=scan_counts))
with self.TimedProgressBar(timeout_secs):
devices = bluetooth_manager.ScanDevices(adapter,
timeout_secs=timeout_secs,
match_address=mac_to_scan)
for mac, props in devices.iteritems():
if mac == mac_to_scan and 'RSSI' in props:
session.console.info('RSSI of count %d: %.2f', i, props['RSSI'])
rssis.append(props['RSSI'])
if not rssis:
self.FailTask(
'DetectRSSIofTargetMAC: Fail to get RSSI from device %s.' %
mac_to_scan)
average_rssi = float(sum(rssis)) / len(rssis)
state.DataShelfSetValue(input_device_rssi_key, average_rssi)
logging.info('RSSIs at MAC %s: %s', mac_to_scan, rssis)
session.console.info('Average RSSI: %.2f', average_rssi)
fail_msg = ''
if (average_rssi_lower_threshold is not None and
average_rssi < average_rssi_lower_threshold):
fail_msg += ('Average RSSI %.2f less than the lower threshold %.2f\n' %
(average_rssi, average_rssi_lower_threshold))
if (average_rssi_upper_threshold is not None and
average_rssi > average_rssi_upper_threshold):
fail_msg += ('Average RSSI %.2f greater than the upper threshold %.2f' %
(average_rssi, average_rssi_upper_threshold))
# Convert dbus.Int16 in rssis below to regular integers.
status = (('pass' if fail_msg == '' else 'fail') +
' exp: [%.2f, %.2f]' % (average_rssi_lower_threshold,
average_rssi_upper_threshold))
data = ('Average RSSI: %.2f %s (%s)\n' %
(average_rssi, map(int, rssis), status))
_AppendLog(self.log_tmp_file, data)
if fail_msg:
session.console.error(fail_msg)
self.FailTask(fail_msg)
def TestInput(self, finish_after_pair):
"""Test bluetooth input device functionality.
The task will try to pair with the device given by the test,
and make the connection.
After the connection, the number of input event should plus one.
If it does not plus one, the task fails.
After connection, operator can try to use the input device and press Enter
to pass checking or Esc to fail the task.
In the end of test, the task will try to disconnect the device and remove
the device. If these procedures fail, the task fails.
Args:
finish_after_pair: Whether to end the test after pairing. If false,
the operator is prompted to test input, and then
the device is unpaired.
"""
def RemoveInput():
"""Disconnects the input device and removes it.
Returns:
If disconnection and removal both succeeded, return True, return False
otherwise.
"""
return_value = True
try:
bt_manager.SetDeviceConnected(adapter, target_mac, False)
logging.info('Turned off the connection')
except bt_manager.Error:
logging.exception('Fail to turn off the connection.')
return_value = False
try:
bt_manager.RemovePairedDevice(adapter, target_mac)
logging.info('Remove the device')
except bt_manager.Error:
logging.exception('Fail to remove the device.')
return_value = False
return return_value
def SaveLogAndFail(fail_reason):
"""Save the fail log and invoke Fail()."""
data = 'Pairing fail: %s\n' % fail_reason
_AppendLog(self.log_tmp_file, data)
self.FailTask(fail_reason)
def DisplayPasskey(self, passkey):
logging.info("Displaying passkey %s", passkey)
self.ui.SetState(
_('Enter passkey {key} then press enter on the base.', key=passkey))
def AuthenticationCancelled(self):
self.ui.SetState(_('Authentication failed, retrying...'))
need_to_cleanup = True
try:
input_count_before_connection = GetInputCount()
bt_manager = self.dut.bluetooth
adapter = bt_manager.GetFirstAdapter(self.host_mac)
target_mac = self.GetInputDeviceMac()
if not target_mac:
SaveLogAndFail('InputTestTask: No MAC with which to pair')
logging.info('Attempting pair with %s', target_mac)
bt_manager.DisconnectAndUnpairDevice(adapter, target_mac)
self.ui.SetState(_('Pairing to input device now...'))
success_create_device = self.RetryWithProgress(
'create paired device', INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bt_manager.CreatePairedDevice, adapter, target_mac,
DisplayPasskey, AuthenticationCancelled)
if not success_create_device:
SaveLogAndFail('InputTestTask: Fail to create paired device.')
self.ui.SetState(_('Connecting to input device now...'))
success_connect_device = self.RetryWithProgress(
'connect input device', INPUT_MAX_RETRY_TIMES, INPUT_RETRY_INTERVAL,
bt_manager.SetDeviceConnected, adapter, target_mac, True)
if not success_connect_device:
SaveLogAndFail('InputTestTask: Fail to connect device.')
WaitForInputCount(input_count_before_connection + 1)
if finish_after_pair:
# We leave the device paired
need_to_cleanup = False
_AppendLog(self.log_tmp_file, 'Pairing finished\n')
return
logging.info('InputTestTask: Test the input by operator now')
self.ui.SetState(
_('Please test input. Press Escape to fail and Enter to pass'))
key = self.ui.WaitKeysOnce([test_ui.ENTER_KEY, test_ui.ESCAPE_KEY])
passed = key == test_ui.ENTER_KEY
success_to_remove = RemoveInput()
# No need to cleanup again after the task if removal succeeds here.
need_to_cleanup = not success_to_remove
if passed:
if success_to_remove:
return
else:
self.FailTask('InputTestTask: Fail to remove input')
else:
self.FailTask('Failed by operator')
finally:
if need_to_cleanup:
success_to_remove = RemoveInput()
if not success_to_remove:
logging.error('Fail to remove input in Cleanup')
@contextlib.contextmanager
def TimedProgressBar(self, timeout_secs):
"""Show timeout on progress bar."""
self.ui.DrawProgressBar(timeout_secs)
start_time = time.time()
stop_event = threading.Event()
def UpdateProgressBar():
elapsed_time = time.time() - start_time
if stop_event.isSet() or elapsed_time >= timeout_secs:
self.ui.SetProgress(timeout_secs)
raise StopIteration
self.ui.SetProgress(elapsed_time)
self.event_loop.AddTimedHandler(UpdateProgressBar, 0.2, repeat=True)
try:
yield
finally:
stop_event.set()
def RetryWithProgress(self, action_string, max_retry_times, retry_interval,
target, *args, **kwargs):
"""Runs target function with retries and shows retry times on progress bar.
Args:
action_string: The string to describe the action in logging.
max_retry_times: the maximal retry times.
retry_interval: the interval between retries.
target: The target function. *args and **kwargs will be passed to target.
Returns:
Return the return value of the target function.
"""
self.ui.DrawProgressBar(max_retry_times)
result = None
for unused_retry in xrange(max_retry_times):
try:
result = target(*args, **kwargs)
except Exception:
pass
self.ui.AdvanceProgress()
if result:
break
self.Sleep(retry_interval)
self.ui.SetProgress(max_retry_times)
logging.info('%s was done.' if result else '%s failed.', action_string)
return result