blob: a9ee6fcde3995b6f8f91091ed2b0daccdac9dc8d [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Runs the touchpad drag latency test using QuickStep v2
# Usage example:
# $ python qstep.py 6
# Input device : /dev/input/event6
# Serial device : /dev/ttyACM1
# Laser log file : /tmp/QuickStep_2016_01_21__1554_22_lsaer.log
# evtest log file: /tmp/QuickStep_2016_01_21__1554_22_evtest.log
# Clock zeroed at 1453409662 (rt 0.259ms)
# ........................................
# Processing data, may take a minute or two...
# Drag latency (min method) = 19.62 ms
# Average Maximum Minimum
# 0.0237723313845 0.0333168506622 0.0167829990387
#
# Note, before running this script, check that evtest can grab the device.
import argparse
import glob
import os
import subprocess
import sys
import tempfile
import time
import serial
import numpy
import latency_measurement as lm
import minimization
# Teensy commands (always singe char). Defined in QuickStep.ino
# TODO(kamrik): link to QuickStep.ino once it's opensourced.
CMD_RESET = 'F'
CMD_SYNC_ZERO = 'Z'
CMD_TIME_NOW = 'T'
CMD_AUTO_LASER_ON = 'L'
CMD_AUTO_LASER_OFF = 'l'
# Globals
debug_mode = False
def log(msg):
if debug_mode:
print(msg)
def sndrcv(ser, data):
""" Send a 1-char command.
Return the reply and how long it took.
"""
t0 = time.time()
ser.write(data)
reply = ser.readline()
t1 = time.time()
dt = (t1 - t0) * 1000
log('sndrcv(): round trip %.3fms, reply=%s' % (dt, reply.strip()))
return dt, reply
def runCommStats(ser, N=100):
"""
Measure the USB serial round trip time.
Send CMD_TIME_NOW to the Teensy N times measuring the round trip each time.
Prints out stats (min, median, max).
"""
log('Running USB comm stats...')
ser.flushInput()
sndrcv(ser, CMD_SYNC_ZERO)
tstart = time.time()
times = numpy.zeros((N, 1))
for i in range(N):
dt, _ = sndrcv(ser, CMD_TIME_NOW)
times[i] = dt
t_tot = (time.time() - tstart)*1000
median = numpy.median(times)
stats = (times.min(), median, times.max(), N)
log('USB comm round trip stats:')
log('min=%.2fms, median=%.2fms, max=%.2fms N=%d' % stats)
if (median > 2):
print('ERROR: the median round trip is too high: %.2f' % median)
sys.exit(2)
def zeroClock(ser, max_delay_ms=1.0, retries=10):
"""
Tell the TeensyUSB to zero its clock (CMD_SYNC_ZERO).
Returns the time when the command was sent.
Verify that the response arrived within max_delay_ms.
This is the simple zeroing used when the round trip is fast.
It does not employ the same method as Android clock sync.
"""
# Check that we get reasonable ping time with Teensy
# this also 'warms up' the comms, first msg is often slower
runCommStats(ser, N=10)
ser.flushInput()
for i in range(retries):
t0 = time.time()
dt, _ = sndrcv(ser, CMD_SYNC_ZERO)
if dt < max_delay_ms:
print('Clock zeroed at %.0f (rt %.3fms)' % (t0, dt))
return t0
print('Error, failed to zero the clock after %d retries')
return -1
def parseTrigger(trigger_line):
""" Parse a trigger line from QuickStep.
Trigger events look like this: "G L 12902345 1 1"
The parts:
* G - common for all trigger events
* L - means laser
* 12902345 is timestamp in us since zeroed
* 1st 1 or 0 is trigger value. 0 = changed to dark, 1 = changed to light,
* 2nd 1 is counter of how many times this trigger happened since last
readout, should always be 1 in our case
"""
parts = trigger_line.strip().split()
if len(parts) != 5:
raise Exception('Malformed laser trigger line:\n' + trigger_line)
t_us = int(parts[2])
val = int(parts[3])
return (t_us / 1e6, val)
def parse_args():
temp_dir = tempfile.gettempdir()
serial = '/dev/ttyACM0'
# Try to autodetect the QuickStep serial port
ls_ttyACM = glob.glob('/dev/ttyACM*')
if len(ls_ttyACM) > 0:
serial = ls_ttyACM[0]
description = "Run the touchpad drag latency test using QuickStep v2"
parser = argparse.ArgumentParser(
description=description,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('input',
help='input device, e.g: 6 or /dev/input/event6')
parser.add_argument('-s', '--serial', default=serial,
help='QuickStep serial port')
parser.add_argument('-l', '--logdir', default=temp_dir,
help='where to store logs')
parser.add_argument('-n', default=40, type=int,
help='Number of laser toggles to read')
parser.add_argument('-d', '--debug', action='store_true',
help='talk more')
args = parser.parse_args()
global debug_mode
debug_mode = args.debug
if args.input.isalnum():
args.input = '/dev/input/event' + args.input
return args
if __name__ == '__main__':
args = parse_args()
# Create names for log files
prefix = time.strftime('QuickStep_%Y_%m_%d__%H%M_%S')
laser_file_name = os.path.join(args.logdir, prefix + '_lsaer.log')
evtest_file_name = os.path.join(args.logdir, prefix + '_evtest.log')
print('Input device : ' + args.input)
print('Serial device : ' + args.serial)
print('Laser log file : ' + laser_file_name)
print('evtest log file: ' + evtest_file_name)
with serial.Serial(args.serial, 115200) as ser:
sndrcv(ser, CMD_RESET)
tstart = time.time()
t_zero = zeroClock(ser)
if t_zero < 0:
print('Error: Couldn\'t zero clock, exitting')
sys.exit(1)
# Fire up the evtest process
cmd = 'evtest %s > %s' % (args.input, evtest_file_name)
evtest = subprocess.Popen(cmd, shell=True)
# Turn on laser trigger auto-sending
sndrcv(ser, CMD_AUTO_LASER_ON)
trigger_count = 0
while trigger_count < args.n:
# The following line blocks until a message from QuickStep arrives
trigger_line = ser.readline()
trigger_count += 1
log('#%d/%d - ' % (trigger_count, args.n) +
trigger_line.strip())
if not debug_mode:
sys.stdout.write('.')
sys.stdout.flush()
t, val = parseTrigger(trigger_line)
t += t_zero
with open(laser_file_name, 'at') as flaser:
flaser.write('%.3f %d\n' % (t, val))
sndrcv(ser, CMD_AUTO_LASER_OFF)
# Send SIGTERM to evtest process
evtest.terminate()
print "\nProcessing data, may take a minute or two..."
lm.main(evtest_file_name, laser_file_name)