blob: 3de9a543d8c1cbddd73085ff3c01e7b994e8f22c [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Simple test case management and execution framework.
Provides the TestCase and TestRunner classes. The TestRunner can be fed
with multiple TestCases and executes them on the robot fully automatically,
handling potential error cases by resetting or recalibrating the device.
"""
from collections import defaultdict
from contracts import contract
from fnmatch import fnmatch
import itertools
import numpy as np
import os
import sys
import time
import traceback
from optofidelity.system import TestApp, CameraCalibration
from optofidelity.system import DUTException
from optofidelity.videoproc import VideoException
from optofidelity.test_run import TestClass, TestRun
def IterGroups(iterable, key):
  """Sort iterable by key and group neighbouring items with equal keys.

  Args:
    iterable: the items to group.
    key: function computing the grouping (and sorting) key of an item.

  Returns:
    An itertools.groupby iterator yielding (key value, group iterator) pairs.
    Each group iterator can only be consumed once, and only until the next
    pair is requested.
  """
  # groupby only merges adjacent items, hence the sort with the same key.
  ordered = sorted(iterable, key=key)
  return itertools.groupby(ordered, key=key)
def PrintPerf(msg, start_t, debug):
  """Print the wall-clock time elapsed since start_t if perf debug is on.

  Args:
    msg: label identifying what was measured.
    start_t: time.time() value taken when the measured work started.
    debug: dict of debug flags; only debug["perf"] is read here.
  """
  if not debug["perf"]:
    return
  elapsed = time.time() - start_t
  print("PERF: %s: %f s" % (msg, elapsed))
class TestCase(object):
  """Pairs a TestApp with the TestClass that is to be executed on it."""

  @contract(app=TestApp, test_class=TestClass)
  def __init__(self, app, test_class):
    """Store the app and the test class that make up this test case."""
    self.test_class = test_class
    self.app = app

  def __str__(self):
    """Return the case label in the form "<app>/<test class name>"."""
    label_parts = (self.app, self.test_class.name)
    return "%s/%s" % label_parts
class TestResult(object):
  """Values collected for one named measurement of one test case."""

  def __init__(self, case, value_name):
    """Copy the identifying names out of case and start with no values.

    Args:
      case: object providing app.name, app.dut.name and test_class.name.
      value_name: name of the measurement the values belong to.
    """
    app = case.app
    self.app = app.name
    self.dut = app.dut.name
    self.case_name = case.test_class.name
    self.value_name = value_name
    # Filled by the owner of this object; starts out empty.
    self.values = []
class TestResults(object):
  """Accumulates measurement values over many test runs and prints them."""

  def __init__(self):
    # Maps "<case>/<measurement name>" to the TestResult holding its values.
    self.results = {}

  def AddTestResults(self, case, results_map):
    """Append the values of one test run to the accumulated results.

    Args:
      case: the test case the values belong to; its string form is used as
          the first part of the result key.
      results_map: dict mapping measurement name to a list of values.
    """
    for name, values in results_map.items():
      key = "%s/%s" % (case, name)
      entry = self.results.get(key)
      if entry is None:
        # First time this measurement is seen for this case.
        entry = TestResult(case, name)
        self.results[key] = entry
      entry.values.extend(values)

  def PrintResults(self, ms_per_frame):
    """Print mean and standard deviation of every measurement.

    Output is grouped by test case name, then by measurement name; within a
    measurement the entries are ordered by ascending mean.

    Args:
      ms_per_frame: factor applied to the stored values to print them in ms.
    """
    by_case = lambda r: r.case_name
    by_value = lambda r: r.value_name
    by_mean = lambda r: np.mean(r.values)
    line_format = " %s: %.2f ms (std=%.2f ms, N=%d)"

    for case_name, case_results in IterGroups(self.results.values(), by_case):
      print("Test Case %s:" % case_name)
      for value_name, value_results in IterGroups(case_results, by_value):
        print(" Measurement %s:" % value_name)
        for result in sorted(value_results, key=by_mean):
          values = result.values
          std = np.std(values) * ms_per_frame
          mean = np.mean(values) * ms_per_frame
          print(line_format % (result.dut, mean, std, len(values)))
class TestRunner(object):
  """Runs a collection of TestCases and handles potential error cases.

  If an error happens during the execution, the robot will first try to reset,
  then re-calibrate if the errors continue.

  During execution, the test cases are sorted in a way that involves as little
  app and dut switching as possible.
  """

  # Registry shared by all runners: test class name -> test class instance.
  test_classes = {}

  # Consecutive failures after which a test case is given up on.
  max_tries = 3

  def __init__(self, system, path, dump_all=False, skip_calib=False):
    """Collect all apps of all DUTs in system.

    Args:
      system: iterated to obtain the DUTs, each of which is iterated to
          obtain its apps; its camera attribute is used for calibration.
      path: base directory; results go to <path>/results and data of failed
          runs to <path>/failures.
      dump_all: if true, the calibration video is cached and the data of
          every successful run is saved, not only that of failed runs.
      skip_calib: if true, an app that already has a calibration is not
          calibrated again (unless running in safe mode).
    """
    self.test_apps = {}
    self.system = system
    self.dump_all = dump_all
    self.skip_calib = skip_calib
    self.results_path = os.path.join(path, "results")
    self.failures_path = os.path.join(path, "failures")
    for dut in system:
      for app in dut:
        self.test_apps[str(app)] = app

  @classmethod
  @contract(test_class=type)
  def RegisterTestClass(cls, test_class):
    """Instantiate test_class and register it under its name attribute."""
    cls.test_classes[test_class.name] = test_class()

  def RunTests(self, glob, debug):
    """Run every registered test class on every app, filtered by glob.

    Args:
      glob: fnmatch pattern matched against "<app>/<test class name>"; a
          falsy value selects all test cases.
      debug: dict of debug flags. "robot" is read here; "perf" and
          "calibration" are read by the helpers, and the whole dict is
          handed on to the video and trace processing.

    Returns:
      The TestResults with all collected measurements, or an empty dict when
      debug["robot"] is set (the tests are then only executed on the robot,
      nothing is recorded or processed).
    """
    # Discover all test cases
    test_cases = [TestCase(app, test_class)
                  for app in self.test_apps.values()
                  for test_class in self.test_classes.values()]
    if glob:
      test_cases = [c for c in test_cases if fnmatch(str(c), glob)]

    if debug["robot"]:
      for case in test_cases:
        case.app.LocateButtons()
        case.test_class.ExecuteTest(case.app, case.app.dut, self.system.camera)
      return {}

    print("Running the following tests:")
    for case in test_cases:
      print(" %s" % str(case))
    print("")

    # Group test cases by DUT and run all of them
    start_t = time.time()
    results = TestResults()
    for dut, cases in IterGroups(test_cases, lambda c: c.app.dut):
      print("Running tests for: " + str(dut))
      self._RunDUTTests(results, dut, cases, debug, False)
    PrintPerf("All tests", start_t, debug)
    return results

  def _RunDUTTests(self, results, dut, test_cases, debug, safe):
    """Run all test cases of one DUT, retrying once in safe mode on error.

    Args:
      results: TestResults that the measurements are added to.
      dut: the device all test_cases run on.
      test_cases: iterable of TestCases for this DUT (may be one-shot).
      debug: dict of debug flags, passed through.
      safe: if true, the DUT is reset instead of homed and button location
          and calibration are forced.

    Returns:
      results on success; an empty dict if the safe-mode run failed as well.

    Raises:
      KeyboardInterrupt: re-raised after resetting the DUT.
    """
    # RunTests hands in a one-shot itertools.groupby group, which the
    # grouping below exhausts. Keep a list so the safe-mode retry still has
    # the cases to run.
    test_cases = list(test_cases)

    # Reset device
    if safe:
      dut.Reset()
    else:
      dut.Home()

    mode = "(safe mode)" if safe else ""
    # Group test cases by app and run all of them
    try:
      dut.LocateButtons(force=safe)
      for app, app_cases in IterGroups(test_cases, lambda c: c.app):
        print("Running tests for: %s %s" % (str(app), mode))
        dut.EnterApp(app)
        self._RunAppTests(results, dut, app, app_cases, debug, safe)
        dut.ExitApp()
      return results
    except KeyboardInterrupt:
      print("Keyboard interrupt received.")
      print("Resetting device before exiting app.")
      dut.Reset()
      raise
    except Exception:
      traceback.print_exc()
      if not safe:
        # Retry everything for this DUT in safe mode. Apps that already
        # completed are run again and contribute additional values.
        return self._RunDUTTests(results, dut, test_cases, debug, True)
      return {}

  def _RunAppTests(self, results, dut, app, cases, debug, safe):
    """Calibrate the camera for app and run all given test cases on it.

    Args:
      results: TestResults that the measurements are added to.
      dut: the device the app runs on.
      app: the app under test; its calibration attribute may be replaced.
      cases: iterable of TestCases to run on this app.
      debug: dict of debug flags; "calibration" and "perf" are read here.
      safe: if true, button location and calibration are forced.
    """
    app.LocateButtons(force=safe)

    # Calibrate camera by making the screen flash
    start_t = time.time()
    if self.skip_calib and app.calibration and not safe:
      print("Skipping screen calibration")
    else:
      print("Calibrating screen")
      app.calibration = CameraCalibration()
      app.calibration.Calibrate(app, self.system.camera, debug["calibration"])
      PrintPerf("Calibration", start_t, debug)

      # Cache calibration video to disk. The video on the camera will get
      # overwritten by the test runs.
      if self.dump_all:
        app.calibration.CacheVideo()

    mode = "(safe mode)" if safe else ""
    # Run all test cases on this app
    for case in cases:
      # Only reset on success: once max_tries consecutive failures are
      # reached, the remaining repetitions of this case are skipped too.
      num_failures = 0
      # Each test class defines how often the test should be repeated
      for i in range(case.test_class.repetitions):
        # Retry until it failed max_tries consecutive times
        while num_failures < self.max_tries:
          try:
            # Run test and collect results
            print("Running test: %s (rep %d retry %d) %s" % (
                case, i + 1, num_failures, mode))
            self._RunTest(results, dut, app, case, debug)
          except Exception:
            # KeyboardInterrupt and SystemExit are not Exceptions and
            # propagate to the caller.
            num_failures += 1
            traceback.print_exc()
          else:
            num_failures = 0
            break

  def _RunTest(self, results, dut, app, case, debug):
    """Record one run of case on the robot and process it into results.

    Args:
      results: TestResults that the measurements are added to.
      dut: unused here.
      app: the app the test is executed on.
      case: the TestCase to run.
      debug: dict of debug flags, passed to the processing steps.

    Raises:
      Exception: whatever the processing raised, after the data of the run
          has been saved to the failures directory.
    """
    # Path to store results in
    # NOTE(review): the timestamp has minute resolution, so runs of the same
    # case within one minute share these paths - confirm SaveAll copes.
    timestamp = time.strftime("%Y_%m_%d-%H%M")
    case_name = str(case)
    result_path = os.path.join(self.results_path, case_name, timestamp)
    failure_path = os.path.join(self.failures_path, case_name, timestamp)

    # Record test video on the robot
    start_t = time.time()
    # NOTE(review): TestRun gets the base results directory, not
    # result_path - confirm this is intended.
    run = TestRun(case.test_class, app, self.results_path)
    run.RecordTestVideo(self.system)
    try:
      # Process video and trace, return latency results
      run.ProcessTestVideo(debug)
      run.ProcessTestTrace(debug)
      if self.dump_all:
        run.SaveAll(result_path)
      print("Results:")
      for key, value in run.test_results.items():
        print(" %s: %s" % (key, value))
      results.AddTestResults(case, run.test_results)
      PrintPerf(case_name, start_t, debug)
    except Exception as e:
      print("%s: Processing failed: %s" % (case, e))
      traceback.print_exc()
      run.SaveAll(failure_path)
      # Bare raise keeps the original traceback.
      raise