blob: 1f2d9fa1c07d68fc4693ce6d728cd4e01a43949e [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Allows testing and verification of GPS chips on Android DUTs.
GPS devices have a standard text-based output format called NMEA sentences.
This test reads those NMEA sentences for certain values, and checks that they
are within configurable ranges.
Here's an example of input arguments::
ARGS={
'station_name': 'gps_fatp',
'fixture_id': session.GetDeviceID(),
'timeout': 30,
'init_timeout': 30,
'warmup_count': 1,
'gps_config_file': 'gpsconfig_jobs.xml',
'gps_config_job': 'Factory_Track_Test',
'nmea_out_path': '/data/gps/nmea_out',
'nmea_prefix': '$PREFIX,1,',
'nmea_fields': {'signal_strength': 4,
'carrier_to_noise': 7,
'ref_clock_offset': 12},
'limits': [('signal_strength', 'count', '>', 25),
('signal_strength', 'mean', '>', -120),
('ref_clock_offset', 'mean', '>', -3000),
('ref_clock_offset', 'mean', '<', 3000)]}
"""
from __future__ import print_function
import logging
import os
import re
import subprocess
import sys
import threading
import unittest
import numpy
import factory_common # pylint: disable=unused-import
from cros.factory import device
from cros.factory.device import device_utils
from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log.
from cros.factory.test import session
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import sync_utils
from cros.factory.utils import time_utils
# Seconds to poll for glgps to be running and producing NMEA output.
START_GLGPS_TIMEOUT = 5
# Default value (seconds) of the 'init_timeout' test argument.
DEFAULT_INIT_TIMEOUT = 10
# Default value (seconds) of the 'timeout' test argument.
DEFAULT_TIMEOUT = 15
# Path of the job configuration file that is handed to glgps on the device.
DEVICE_GPS_CONFIG_PATH = '/data/gps/gpsconfig_jobs.xml'
# Event name used when logging results through event_log.
EVENT_LOG_NAME = 'gps'
# Name of the GPS daemon binary on the device.
GLGPS_BINARY = 'glgps'
# Placeholder used when the device reports no serial number.
SERIAL_NOT_AVAILABLE = 'NOT_AVAILABLE'
_LOGPARSER_PASS = 'PASSED'
_LOGPARSER_FAIL = 'FAILED'

# Statistic name -> function computing that statistic over a list of numbers.
# Apart from 'count', results are converted to plain Python floats.
STAT_FNS = {
    'count': len,
    'min': lambda values: float(numpy.min(values)),
    'max': lambda values: float(numpy.max(values)),
    'median': lambda values: float(numpy.median(values)),
    'mean': lambda values: float(numpy.mean(values)),
    'std': lambda values: float(numpy.std(values)),
}

# Comparison operator string -> predicate(measured, limit).
CMP_FNS = {
    '<': lambda measured, limit: measured < limit,
    '<=': lambda measured, limit: measured <= limit,
    '>': lambda measured, limit: measured > limit,
    '>=': lambda measured, limit: measured >= limit,
}
class GPS(unittest.TestCase):
  """Factory test that checks GPS NMEA output against configured limits.

  The test writes a glgps job configuration to the device, stops gpsd, starts
  the requested glgps job, reads NMEA sentences from the device's NMEA output
  file via ``adb shell cat``, computes statistics (see STAT_FNS) over the
  configured fields and compares them against ``self.args.limits``.  Results
  are written to the event log and to testlog.
  """
  ARGS = [
      Arg('station_name', str,
          'Name of the station. We might want to run the GPS test at '
          'different points in the factory. This can be used to identify '
          'them, and will be saved in event_logs. e.g. "gps_smt"'),
      Arg('fixture_id', str,
          'Name of the fixture. This will be saved in event_logs.'),
      Arg('init_timeout', (int, float),
          'How long to poll for good data before giving up. '
          'Default %d seconds.' % DEFAULT_INIT_TIMEOUT,
          default=DEFAULT_INIT_TIMEOUT),
      Arg('timeout', (int, float),
          'How long to run the test. Default %d seconds.' % DEFAULT_TIMEOUT,
          default=DEFAULT_TIMEOUT),
      Arg('warmup_count', int,
          'How many initial matching NMEA sentences to ignore before starting '
          'to record data.',
          default=0),
      Arg('gps_config_file', str,
          'Relative or absolute path to GPS configuration file to upload '
          'to device. If relative, searches both in directory of this test '
          'file, and in directory of currently executing Python script.'),
      Arg('gps_config_job', str,
          'Name of the job within gps_config_file to run. This will be passed '
          'as an argument to glgps to start the job during test execution.'),
      Arg('nmea_out_path', str,
          'Path to the nmea_out file on the device. This should be a named '
          'pipe which is specified in gps_config_file in the <hal> element, '
          'like so: <hal NmeaOutName="/data/gps/nmea_out">'),
      Arg('nmea_prefix', str,
          'Prefix of NMEA sentences for which to filter. Only these sentences '
          'will be parsed based on nmea_fields.'),
      Arg('nmea_fields', dict,
          'Dictionary of fields to pull from the NMEA sentence. '
          'Key is a string representing the name of the field, and '
          'value is its comma-separated index.'),
      # sorted() keeps the help text deterministic and renders a plain list
      # regardless of the Python version's dict.keys() representation.
      Arg('limits', list,
          'List of limits, in the format ("nmea_field", "fn", "cmp", value), '
          'where fn can be any of %s, and cmp can be any of %s.'
          % (sorted(STAT_FNS), sorted(CMP_FNS)),
          default=[])
  ]

  def setUp(self):
    """Creates the DUT interface and caches the device serial numbers."""
    self.dut = device_utils.CreateDUTInterface()

    # Store the serial numbers for later use.
    self._mlb_serial_number = (self.dut.info.mlb_serial_number or
                               SERIAL_NOT_AVAILABLE)
    self._serial_number = (self.dut.info.serial_number or SERIAL_NOT_AVAILABLE)
    session.console.info('Got MLB serial number: %s', self._mlb_serial_number)
    session.console.info('Got serial number: %s', self._serial_number)

    # Create a list for test-level failures.
    self._failures = []

  def _PushConfigFile(self):
    """Locates the GPS job config on the host and writes it to the device.

    An absolute ``self.args.gps_config_file`` is used as-is.  A relative one
    is looked up in the directory of this test file, then in the directory of
    the currently executing Python script.

    Raises:
      AssertionError: (via self.fail) if the config file cannot be found.
    """
    # Push the jobs config so we can start the job <self.args.gps_config_job>.
    session.console.info('Pushing config file...')

    config_file = self.args.gps_config_file
    if os.path.isabs(config_file):
      candidates = [config_file]
    else:
      # May be in the directory of this file, or the executing Python script.
      candidates = [
          os.path.join(os.path.dirname(os.path.abspath(path)), config_file)
          for path in (__file__, sys.argv[0])]

    config_path = next(
        (path for path in candidates if os.path.isfile(path)), None)
    if not config_path:
      self.fail('Config file %s could not be found' % self.args.gps_config_file)

    with open(config_path) as f:
      # glgps is launched with DEVICE_GPS_CONFIG_PATH (see runTest), so the
      # config has to be written to that exact file rather than to its parent
      # directory.
      self.dut.WriteFile(DEVICE_GPS_CONFIG_PATH, f.read())

  def _ParseNMEAStream(self, file_stream):
    """Parses NMEA sentences from a stream and collects the configured fields.

    Only sentences starting with ``self.args.nmea_prefix`` are considered.
    Parsing goes through three phases:

    1. Initialization: wait for the first sentence in which every configured
       field is non-empty.
    2. Warmup: ignore ``self.args.warmup_count`` valid sentences (the
       initializing sentence counts as the first of them).
    3. Recording: convert every configured field to float and collect it.

    Phases 1 and 2 share a budget of ``self.args.init_timeout`` seconds; the
    recording phase then runs for ``self.args.timeout`` seconds.  The deadline
    is only checked between reads, so a blocking ``readline()`` can overrun
    it.

    If an empty or non-numeric field is seen during recording, a message is
    appended to ``self._failures`` and parsing stops immediately.  Reading
    also stops if the stream reaches end-of-file.

    Args:
      file_stream: Object whose ``readline()`` returns successive lines of
        NMEA output, and an empty string at end-of-file.

    Returns:
      Dictionary, where keys are from self.args.nmea_fields, and values
      are lists of parsed values in NMEA sentences.
    """
    fields = self.args.nmea_fields
    all_values = {key: [] for key in fields}

    # (index, key) pairs in ascending index order, so that values are always
    # examined deterministically from left to right.  These do not depend on
    # the sentence, so compute them once rather than per line.
    indexed_keys = sorted((index, key) for key, index in fields.items())
    # Number of comma-separated columns a sentence needs to cover all fields.
    min_columns = indexed_keys[-1][0] + 1 if indexed_keys else 0

    start_time = time_utils.MonotonicTime()
    timeout = self.args.init_timeout  # Initial timeout to get valid data.
    warmup_count = self.args.warmup_count  # Initial sentences to ignore.
    init = False
    warmup_init = False

    while True:
      time_left = timeout - (time_utils.MonotonicTime() - start_time)
      if time_left <= 0:
        break

      raw_line = file_stream.readline()
      if not raw_line:
        # End of stream: no further data will ever arrive, so stop instead of
        # spinning on empty reads until the timeout expires.
        logging.warning('NMEA stream ended with %d secs left', time_left)
        return all_values

      # Make sure we have a useful NMEA line to work with.
      nmea_line = raw_line.rstrip()
      if not nmea_line.startswith(self.args.nmea_prefix):
        continue
      logging.debug('[%d] nmea_line: %s', time_left, nmea_line)

      # Split the comma-separated data.  A sentence with fewer columns than
      # required is padded so that its missing fields read as empty values
      # instead of raising IndexError.
      nmea_values = nmea_line.split(',')
      nmea_values.extend([''] * (min_columns - len(nmea_values)))

      # Initialization.
      if not init:
        if all(nmea_values[index] != '' for index, _ in indexed_keys):
          # This is the first set of valid values we are getting.
          init = True
        else:
          session.console.info('[%d] Waiting for initialization...', time_left)
          continue

      # Warmup.
      if not warmup_init:
        if warmup_count <= 0:
          # This is the first warmed up value we are getting.  Restart the
          # clock with the recording timeout.
          warmup_init = True
          start_time = time_utils.MonotonicTime()
          time_left = timeout = self.args.timeout
        else:
          session.console.info('[%d] Warming up...', time_left)
          warmup_count -= 1
          continue

      # Check each value.  Parse the whole sentence before recording anything
      # so that a bad field never leaves the per-field lists with different
      # lengths.
      current_values = []
      for index, key in indexed_keys:
        raw_value = nmea_values[index]
        if raw_value == '':
          self._failures.append(
              'Empty value for %s encountered after initialization' % key)
          return all_values
        try:
          parsed_value = float(raw_value)
        except ValueError:
          self._failures.append('Non-numeric value encountered for %s: %s'
                                % (key, raw_value))
          return all_values
        current_values.append((key, parsed_value))

      # Record the values.
      for key, parsed_value in current_values:
        all_values[key].append(parsed_value)

      # Print parsed values.
      current_values_str = ' '.join(
          '%s=%.1f' % (key, value) for key, value in current_values)
      session.console.info('[%d] %s', time_left, current_values_str)

    # Report the timeout that was actually in effect (the init timeout if
    # recording never started).
    logging.debug('Timeout has been reached (%d secs)', timeout)
    return all_values

  def _CheckLimits(self, all_values):
    """Check limits specified by self.args.limits.

    If any field has no recorded values, 'Never initialized' is appended to
    ``self._failures``, no statistics are computed and every limit is
    reported as failed.

    Args:
      all_values: Dictionary mapping each NMEA field name to its list of
        recorded values, as returned by _ParseNMEAStream.

    Returns:
      The tuple (field_stats, limit_results, limit_failures_str), where:
        field_stats: A dictionary mapping each NMEA field.  Values are
          dictionaries of STAT_FNS.keys() to its corresponding calculated
          value.  None if statistics could not be computed.
        limit_results: A dictionary mapping each limit (from self.args.limits)
          to a dictionary with the tested value and a boolean representing
          its success/failure.
        limit_failures_str: A list of strings, where each string represents a
          human-readable representation of a limit failure.

    Raises:
      KeyError: if a limit names an unknown field, statistic or comparison.
    """
    # Are there values to calculate stats?
    if not all(all_values.values()):
      self._failures.append('Never initialized')
      field_stats = None
    else:
      field_stats = {}
      for key, values in all_values.items():
        field_stats[key] = {
            fn_name: fn(values) for fn_name, fn in STAT_FNS.items()}
        field_stats_str = ' '.join(
            '%s=%.1f' % (k, v) for k, v in field_stats[key].items())
        session.console.info('%s: %s', key, field_stats_str)

    # Do limit testing.
    limit_results = {}
    limit_failures_str = []
    for nmea_field, stat_fn, cmp_fn_key, limit_value in self.args.limits:
      cmp_fn = CMP_FNS[cmp_fn_key]
      if not field_stats:
        # No statistics available: the limit cannot be satisfied.
        test_value = None
        passed = False
      else:
        test_value = field_stats[nmea_field][stat_fn]
        passed = cmp_fn(test_value, limit_value)

      passed_str = 'PASS' if passed else 'FAIL'
      limit_str = ('%s.%s %s %.1f'
                   % (nmea_field, stat_fn, cmp_fn_key, limit_value))
      result_str = '%s %s' % (passed_str, limit_str)
      limit_results[limit_str] = {'test_value': test_value, 'passed': passed}
      session.console.info(result_str)
      if not passed:
        limit_failures_str.append(result_str)

    logging.debug('Results to be logged: %s', limit_results)
    return (field_stats, limit_results, limit_failures_str)

  def runTest(self):
    """Runs the GPS job, collects NMEA data, logs results and checks limits.

    Raises:
      AssertionError: (via self.fail) if the config file is missing, glgps
        does not start, parsing recorded a failure, or a limit is violated.
    """
    self._PushConfigFile()

    # Stop glgps if it's already running.
    self._KillGLGPS()

    # Stop gpsd if it's already running.
    session.console.info('Stopping gpsd...')
    self.dut.Call('stop gpsd')

    # Run glgps for <args.timeout> seconds with <self.args.gps_config_job>.
    session.console.info('Starting %s job...', self.args.gps_config_job)

    def StartGLGPS():
      self.dut.Call([GLGPS_BINARY,
                     DEVICE_GPS_CONFIG_PATH,
                     self.args.gps_config_job])

    glgps_thread = threading.Thread(target=StartGLGPS)
    glgps_thread.daemon = True
    glgps_thread.start()

    # Check that glgps is running and is writing to <self.args.nmea_out_path>.
    def CheckGLGPSRunning():
      try:
        self.dut.CheckCall('ps | grep %s' % GLGPS_BINARY)
        self.dut.CheckCall('[[ -n `timeout 1 cat %s` ]]'
                           % self.args.nmea_out_path)
        return True
      except device.CalledProcessError:
        return False

    if not sync_utils.PollForCondition(poll_method=CheckGLGPSRunning,
                                       timeout_secs=START_GLGPS_TIMEOUT,
                                       poll_interval_secs=0):
      self.fail('%s was not running' % GLGPS_BINARY)

    # Get the latest readings from the NMEA output file.
    session.console.info('Reading from NMEA output file...')
    # TODO(kitching): Move this into AdbTarget so we can use something like
    #                 self.dut.Popen() instead of calling adb directly.
    cat_process = subprocess.Popen(
        ['adb', 'shell', 'cat %s' % self.args.nmea_out_path],
        stdout=subprocess.PIPE)
    try:
      all_values = self._ParseNMEAStream(cat_process.stdout)
    finally:
      # `cat` on the NMEA pipe does not exit by itself; terminate and reap it
      # so that neither the process nor its pipe is leaked.
      if cat_process.poll() is None:
        cat_process.kill()
      cat_process.stdout.close()
      cat_process.wait()

    field_stats, limit_results, limit_failures_str = (
        self._CheckLimits(all_values))

    # Log directly to event_log.
    # The 'results' value format is:
    #   {'signal_strength.min > -130.0': {'test_value': -120.0,
    #                                     'passed': True},
    #    ... }
    log_dict = {
        'station_name': self.args.station_name,
        'fixture_id': self.args.fixture_id,
        'mlb_serial_number': self._mlb_serial_number,
        'serial_number': self._serial_number,
        'passed': not self._failures and not limit_failures_str,
        'failures': self._failures,
        'stats': field_stats,
        'results': limit_results}
    event_log.Log(EVENT_LOG_NAME, **log_dict)
    testlog.LogParam('stats', field_stats)
    testlog.LogParam('results', limit_results)

    # Check for failures.  Test-level failures (e.g. 'Never initialized') must
    # fail the test as well, so that the verdict agrees with the 'passed'
    # value logged above even when no limits are configured.
    all_failures = self._failures + limit_failures_str
    if all_failures:
      self.fail('\n'.join(all_failures))

  def tearDown(self):
    """Stops glgps and restarts the normal gpsd service."""
    # Kill glgps.
    self._KillGLGPS()

    # Restart normal gpsd operation.
    session.console.info('Restarting normal gpsd...')
    self.dut.Call('start gpsd')

  def _KillGLGPS(self):
    """Kills the glgps process on the device if `ps` lists one.

    The PID is taken from the second whitespace-separated column of the first
    line of ``ps | grep glgps`` output.
    """
    # Stop the glgps with Factory_Test_Track.
    try:
      ps_output = self.dut.CheckOutput('ps | grep %s' % GLGPS_BINARY)
    except device.CalledProcessError:
      # Process is not running.  Don't kill it!
      session.console.info('%s already stopped', GLGPS_BINARY)
      return

    # str.split() collapses runs of whitespace and ignores leading whitespace.
    # (re.split(r' *', ...) also splits on empty matches on Python 3.7+, which
    # would yield single characters instead of columns.)
    ps_columns = ps_output.split()
    if len(ps_columns) < 2:
      # Process is not running.  Don't kill it!
      session.console.info('%s already stopped', GLGPS_BINARY)
      return

    glgps_pid = ps_columns[1]
    session.console.info('Killing %s pid %d...', GLGPS_BINARY, int(glgps_pid))
    self.dut.CheckOutput(['kill', glgps_pid])
    # TODO(kitching): Join the GLGPS thread before sending a kill signal?