servo v4: manufacturing scripts
This directory contains manufacturing scripts for
servo v4 and servo micro. Also included are the
RTM firmware versions.
BUG=chromium:571477
TEST=successfully flashes servo v4 and servo micro.
Change-Id: I7cc90741ef9f77350a9086e7c50510da9f311f84
Reviewed-on: https://chromium-review.googlesource.com/415600
Commit-Ready: Nick Sanders <nsanders@chromium.org>
Tested-by: Nick Sanders <nsanders@chromium.org>
Reviewed-by: Todd Broch <tbroch@chromium.org>
diff --git a/servo/scripts/servo_mfg/binfiles/Keyboard.hex b/servo/scripts/servo_mfg/binfiles/Keyboard.hex
new file mode 120000
index 0000000..99c4e46
--- /dev/null
+++ b/servo/scripts/servo_mfg/binfiles/Keyboard.hex
@@ -0,0 +1 @@
+../../../firmware/usbkm/KeyboardSerial/Keyboard.hex
\ No newline at end of file
diff --git a/servo/scripts/servo_mfg/binfiles/servo_micro.bin b/servo/scripts/servo_mfg/binfiles/servo_micro.bin
new file mode 120000
index 0000000..b89ae25
--- /dev/null
+++ b/servo/scripts/servo_mfg/binfiles/servo_micro.bin
@@ -0,0 +1 @@
+servo_micro_firmware-servo-9040.B.bin
\ No newline at end of file
diff --git a/servo/scripts/servo_mfg/binfiles/servo_micro_firmware-servo-9040.B.bin b/servo/scripts/servo_mfg/binfiles/servo_micro_firmware-servo-9040.B.bin
new file mode 100755
index 0000000..ea69c32
--- /dev/null
+++ b/servo/scripts/servo_mfg/binfiles/servo_micro_firmware-servo-9040.B.bin
Binary files differ
diff --git a/servo/scripts/servo_mfg/binfiles/servo_v4.bin b/servo/scripts/servo_mfg/binfiles/servo_v4.bin
new file mode 120000
index 0000000..02f1a26
--- /dev/null
+++ b/servo/scripts/servo_mfg/binfiles/servo_v4.bin
@@ -0,0 +1 @@
+servo_v4_firmware-servo-9040.B.bin
\ No newline at end of file
diff --git a/servo/scripts/servo_mfg/binfiles/servo_v4_firmware-servo-9040.B.bin b/servo/scripts/servo_mfg/binfiles/servo_v4_firmware-servo-9040.B.bin
new file mode 100755
index 0000000..a2378b4
--- /dev/null
+++ b/servo/scripts/servo_mfg/binfiles/servo_v4_firmware-servo-9040.B.bin
Binary files differ
diff --git a/servo/scripts/servo_mfg/dfu-util b/servo/scripts/servo_mfg/dfu-util
new file mode 100755
index 0000000..dc53f77
--- /dev/null
+++ b/servo/scripts/servo_mfg/dfu-util
Binary files differ
diff --git a/servo/scripts/servo_mfg/flash_stm32.sh b/servo/scripts/servo_mfg/flash_stm32.sh
new file mode 100755
index 0000000..6d2958b
--- /dev/null
+++ b/servo/scripts/servo_mfg/flash_stm32.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+FLAGS_timeout=600
+IMG=${1:-sweetberry.bin}
+
+function flash_stm32_dfu() {
+ DFU_DEVICE=0483:df11
+ ADDR=0x08000000
+ DFU_UTIL='./dfu-util'
+ which $DFU_UTIL &> /dev/null || die \
+ "no dfu-util util found. Did you 'sudo emerge dfu-util'"
+
+ dev_cnt=$(lsusb -d $DFU_DEVICE | wc -l)
+ if [ $dev_cnt -eq 0 ] ; then
+ die "unable to locate dfu device at $DFU_DEVICE"
+ elif [ $dev_cnt -ne 1 ] ; then
+ die "too many dfu devices (${dev_cnt}). Disconnect all but one."
+ fi
+
+ SIZE=$(wc -c ${IMG} | cut -d' ' -f1)
+ # Remove read protection
+ timeout -k 10 -s 9 "${FLAGS_timeout}" \
+ $DFU_UTIL -a 0 -s ${ADDR}:${SIZE}:force:unprotect -D "${IMG}"
+ # Wait for mass-erase and reboot after unprotection
+ sleep 1
+ # Actual image flashing
+ timeout -k 10 -s 9 "${FLAGS_timeout}" \
+ $DFU_UTIL -a 0 -s ${ADDR}:${SIZE} -D "${IMG}"
+}
+
+
+flash_stm32_dfu
+
diff --git a/servo/scripts/servo_mfg/mfg_servo_common.py b/servo/scripts/servo_mfg/mfg_servo_common.py
new file mode 100755
index 0000000..4baaea4
--- /dev/null
+++ b/servo/scripts/servo_mfg/mfg_servo_common.py
@@ -0,0 +1,159 @@
+#!/usr/bin/python
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Script to test and flash servo v4 boards.
+
+This script holds functionality shared between
+various servo manufacturiong scripts.
+"""
+
+import errno
+import os
+import subprocess
+import time
+
+import pty_driver
+import stm32uart
+
+logfile = None
+testerlogfile = None
+
+def open_logfile(filename):
+ """Open a file and create directory structure if needed."""
+ if not os.path.exists(os.path.dirname(filename)):
+ try:
+ os.makedirs(os.path.dirname(filename))
+ except OSError as exc:
+ if exc.errno != errno.EEXIST:
+ raise
+ return open(filename, "a");
+
+def finish_logfile():
+ """Finish a logfile and detetch logging."""
+ global logfile
+ logfile = None
+
+def setup_logfile(logname, serial):
+ """Open a logfile for this servo device."""
+ global logfile
+ if logfile:
+ logfile.flush()
+ logfile.close()
+
+ filename = "%s_%s_%s.log" % (logname, serial, time.time())
+ logfile = open_logfile(filename);
+
+def setup_tester_logfile(testerlogname):
+ """Open a logfile for this test session."""
+ global testerlogfile
+
+ filename = "%s_%s.log" % (testerlogname, time.time())
+ testerlogfile = open_logfile(filename);
+
+
+def log(output):
+ """Print output to console, and any open logfiles."""
+ global logfile
+ global testerlogfile
+ print output
+ if logfile:
+ logfile.write(output)
+ logfile.write("\n")
+ logfile.flush()
+ if testerlogfile:
+ testerlogfile.write(output)
+ testerlogfile.write("\n")
+ testerlogfile.flush()
+
+
+def check_usb(vidpid):
+ """Check if vidpid is present on the system's USB."""
+ if subprocess.call("lsusb -d %s > /dev/null" % vidpid, shell=True):
+ return False
+ return True
+
+def check_usb_sn(vidpid):
+ """Return the serial number of the first USB device with VID:PID vidpid,
+ or None if no device is found.
+
+ This will not work well with two of the same device attached.
+ """
+ sn = None
+ try:
+ lsusbstr = subprocess.check_output("lsusb -d %s -v | grep iSerial" % vidpid, shell=True)
+ sn = lsusbstr.split()[2]
+ except:
+ pass
+
+ return sn
+
+def wait_for_usb_remove(vidpid):
+ """Wait for USB device with vidpid to be removed."""
+ while check_usb(vidpid):
+ time.sleep(1)
+
+def wait_for_usb(vidpid):
+ """Wait for usb device with vidpid to be present."""
+ while not check_usb(vidpid):
+ time.sleep(1)
+
+def do_dfu(bin_name):
+ """Flash file 'bin_name" to an stm32 DFU target.
+
+ Must have 'dfu-util' and 'flash_stm32.sh' present
+ in the current directory to work.
+ """
+ if subprocess.call("./flash_stm32.sh %s" % bin_name, shell=True):
+ log("Flash, Failed to flash stm32: %s" % bin_name)
+ raise Exception("Flash", "Failed to flash %s" % bin_name)
+
+def do_atmega(bin_name):
+ """Flash file 'bin_name" to an atmega DFU target.
+
+ Must have 'dfu-programmer' present in the current directory to work.
+ """
+ if subprocess.call("dfu-programmer atmega32u4 erase --force", shell=True):
+ log("Flash, Failed to erase atmega")
+ raise Exception("Flash", "Failed to erase atmega")
+ if subprocess.call("dfu-programmer atmega32u4 flash %s" % bin_name,
+ shell=True):
+ log("Flash, Failed to flash atmega: %s" % bin_name)
+ raise Exception("Flash", "Failed to flash atmega: %s" % bin_name)
+
+def do_serialno(serialno, pty):
+ """Set serialnumber 'serialno' via ec console 'pty'.
+
+ Commands are:
+ # > serialno set 1234
+ # Saving serial number
+ # Serial number: 1234
+ """
+ cmd = "serialno set %s" % serialno
+ regex = "Serial number: (.*)$"
+
+ results = pty._issue_cmd_get_results(cmd, [regex])[0]
+ sn = results[1].strip().strip('\n\r')
+
+ if sn == serialno:
+ log("Success !")
+ else:
+ log("Serial number set to %s but saved as %s." % (serialno, sn))
+ raise Exception("Serial Number",
+ "Serial number set to %s but saved as %s." % (serialno, sn))
+ log("Serial set to %s" % sn )
+
+def setup_tinyservod(vidpid, interface):
+ """Set up a pty to the servo v4's ec console in order
+ to send commands. Returns a pty_driver object.
+ """
+ vidstr, pidstr = vidpid.split(":")
+ vid = int(vidstr, 16)
+ pid = int(pidstr, 16)
+ suart = stm32uart.Suart(vendor=vid, product=pid,
+ interface=interface, serialname=None)
+ suart.run()
+ pty = pty_driver.ptyDriver(suart, [])
+
+ return pty
diff --git a/servo/scripts/servo_mfg/mfg_servo_micro.py b/servo/scripts/servo_mfg/mfg_servo_micro.py
new file mode 100755
index 0000000..7ce23db
--- /dev/null
+++ b/servo/scripts/servo_mfg/mfg_servo_micro.py
@@ -0,0 +1,85 @@
+#!/usr/bin/python
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Script to test and flash servo micro boards.
+
+This script will continuously flash new boards in a loop,
+consisting of barcode scan, flash, provision, test.
+
+It will produce logfiles in a logfile/ directory for
+each servo and for the full run.
+"""
+
+import argparse
+import re
+
+import mfg_servo_common as c
+
+BIN_NAME="binfiles/servo_micro.bin"
+STM_DFU_VIDPID = "0483:df11"
+STM_VIDPID = "18d1:501a"
+serialno = "Uninitialized"
+LOGNAME = "logfiles/mfg_servo_micro"
+TESTERLOGNAME="logfiles/mfg_servo_micro_run"
+
+RE_SERIALNO = re.compile('^SM(C[0-9]{10}|N[PDQ][0-9]{5})$')
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Image a servo micro device")
+ parser.add_argument('-s', '--serialno', type=str,
+ help="serial number to program", default=None)
+ args = parser.parse_args()
+
+ serialno = args.serialno
+
+ c.setup_tester_logfile(TESTERLOGNAME)
+
+ while(True):
+ # Fetch barcode values
+ if not serialno:
+ done = False
+ while not done:
+ serialno = raw_input("Scan serial number barcode: ")
+ if RE_SERIALNO.match(serialno):
+ print "Scanned sn %s" % serialno
+ done = True
+
+ c.setup_logfile(LOGNAME, serialno)
+ c.log("Scanned sn %s" % serialno)
+
+ c.log("\n\n************************************************\n")
+ c.log("Plug in servo_micro via OTG adapter")
+ c.wait_for_usb(STM_DFU_VIDPID)
+ c.log("Found DFU target")
+ c.do_dfu(BIN_NAME)
+
+ c.log("\n\n************************************************\n")
+ c.log("Plug in servo_micro via normal cable")
+ c.wait_for_usb(STM_VIDPID)
+
+ c.log("Programming sn:%s" % serialno)
+ pty = c.setup_tinyservod(STM_VIDPID, 3)
+ c.do_serialno(serialno, pty)
+ c.log("Done programming serialno")
+ c.log("")
+
+ c.log("Finished programming.")
+ c.log("\n\n************************************************\n")
+ c.log("PASS")
+ c.log("Servo_Micro, %s, PASS" % serialno)
+ c.log("\n\n************************************************\n")
+
+ c.finish_logfile()
+ print "Finished programming."
+
+ if args.serialno:
+ break
+
+ serialno = None
+
+
+if __name__ == "__main__":
+ main()
diff --git a/servo/scripts/servo_mfg/mfg_servo_v4.py b/servo/scripts/servo_mfg/mfg_servo_v4.py
new file mode 100755
index 0000000..7c22e25
--- /dev/null
+++ b/servo/scripts/servo_mfg/mfg_servo_v4.py
@@ -0,0 +1,212 @@
+#!/usr/bin/python
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Script to test and flash servo v4 boards.
+
+This script will continuously flash new boards in a loop,
+consisting of barcode scan, flash, provision, test.
+
+It will produce logfiles in a logfile/ directory for
+each servo and for the full run.
+"""
+
+
+import argparse
+import subprocess
+import time
+import re
+
+import mfg_servo_common as c
+
+BIN_NAME="binfiles/servo_v4.bin"
+STM_DFU_VIDPID = "0483:df11"
+STM_VIDPID = "18d1:501b"
+ATM_BIN = "binfiles/Keyboard.hex"
+ATM_LUFA_VIDPID = "03eb:2042"
+ATM_DFU_VIDPID = "03eb:2ff4"
+CYPRESS_DUT_VIDPID = "04b4:6502"
+
+MACADDR="00:E0:4C:36:00:02"
+LOGNAME="logfiles/mfg_servo_v4"
+TESTERLOGNAME="logfiles/mfg_servo_v4_run"
+
+RE_MACADDR = re.compile('^([0-9A-Fa-f]{2}[:-]){5}([0-9A-F]{2})$')
+RE_SERIALNO = re.compile('^(C[0-9]{10}|N[PDQ][0-9]{5})$')
+
+
+def do_macaddr(macaddr, usemodule=False):
+ """Provision macaddr to Realtek r8152 chip.
+
+ Process is as follows for rtk programming kernel module:
+ # lsmod | grep mii | grep r8152
+ # sudo rmmod r815x cdc_ether r8152
+ # sudo insmod r8152.ko
+
+ This is not needed for a kernel built with the RTK r8152
+ firmware provisioning driver.
+ https://chromium-review.googlesource.com/#/c/411325/
+
+ Actual mac provisioning uses:
+ # sudo ./rtunicpg-x86_64 /# 0 /efuse /nodeid 00E04C360001
+
+ Must have 'rtunicpg-x86_64' in the current directory to function.
+ """
+ if subprocess.call('ifconfig | grep eth0 > /dev/null', shell=True):
+ c.log("Waiting for enet")
+ time.sleep(3)
+ if not subprocess.call('ifconfig | grep -i "%s" > /dev/null' % macaddr, shell=True):
+ c.log("Macaddr already set to %s" % macaddr)
+ return
+
+ if not subprocess.call("lsmod | grep mii | grep r815x", shell=True):
+ if subprocess.call("rmmod r815x cdc_ether", shell=True):
+ c.log("Failed to remove r815x cdc_ether")
+ raise Exception("Enet", "Failed remove r815x cdc_ether")
+ if usemodule:
+ if subprocess.call("lsmod | grep mii | grep r8152", shell=True):
+ if subprocess.call("sudo insmod r8152.ko", shell=True):
+ c.log("Failed to add r8152 ko")
+ raise Exception("Enet", "Failed to add r8152 ko")
+ if subprocess.call("./rtunicpg-x86_64 /# 0 /efuse /nodeid %s"
+ % macaddr.replace(":", ""), shell=True):
+ c.log("Failed to set mac %s" % macaddr)
+ raise Exception("Enet", "Failed to set mac %s" % macaddr)
+
+ c.log("Set macaddr")
+
+def do_enable_atmega(pty):
+ """Enable the atmega by deasserting reset on the servo v4's ioexpander
+ via ec console 'pty'.
+ """
+ cmdrd = "i2cxfer r 1 0x40 2"
+ cmdwr = "i2cxfer w 1 0x40 2 0x82"
+ regex = "^(.+)$"
+ regex_atm = "(0x8[02] .1[23][80].)"
+
+ results = pty._issue_cmd_get_results(cmdrd, [regex_atm])[0]
+ rd = results[1].strip().strip('\n\r')
+ if rd == "0x82 [130]":
+ c.log("Atmega already enabled")
+ return True
+
+ if rd != "0x80 [128]":
+ c.log("Check atmega enabled failed: [%s]" % rd)
+ raise Exception("Atmega", "Check atmega enabled failed: [%s]" % rd)
+
+ # Issue i2c write to enable atmega
+ pty._issue_cmd(cmdwr)
+
+ # Check if it stuck
+ results = pty._issue_cmd_get_results(cmdrd, [regex_atm])[0]
+ rd = results[1].strip().strip('\n\r')
+ if rd != "0x82 [130]":
+ c.log("Enable atmega failed: %s" % rd)
+ raise Exception("Atmega", "Enable atmega failed: [%s]" % rd)
+
+ return True
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Image a servo v4 device")
+ parser.add_argument('-s', '--serialno', type=str,
+ help="serial number to program", default=None)
+ parser.add_argument('-m', '--macaddr', type=str,
+ help="macaddr to program", default=None)
+ parser.add_argument('--no_flash', action="store_true",
+ help="Skip DFU step")
+
+ args = parser.parse_args()
+
+ serialno = args.serialno
+ macaddr = args.macaddr
+
+ c.setup_tester_logfile(TESTERLOGNAME)
+
+ while(True):
+ # Fetch barcode values
+ if not serialno:
+ done = False
+ while not done:
+ serialno = raw_input("Scan serial number barcode: ")
+ if RE_SERIALNO.match(serialno):
+ print "Scanned sn %s" % serialno
+ done = True
+ if not macaddr:
+ done = False
+ while not done:
+ macaddr = raw_input("Scan mac addr barcode: ")
+ if RE_MACADDR.match(macaddr):
+ print "Scanned mac %s" % macaddr
+ done = True
+
+ c.setup_logfile(LOGNAME, serialno)
+
+ c.log("Scanned sn %s" % serialno)
+ c.log("Scanned mac %s" % macaddr)
+
+ if not args.no_flash:
+ c.log("\n\n************************************************\n")
+ c.log("Plug in servo_v4 via OTG adapter")
+ c.wait_for_usb(STM_DFU_VIDPID)
+ c.log("Found DFU target")
+ c.do_dfu(BIN_NAME)
+
+ c.log("\n\n************************************************\n")
+ c.log("Plug in servo_v4 via normal cable")
+ c.wait_for_usb(STM_VIDPID)
+
+ c.log("\n\n************************************************\n")
+ c.log("Plug in servo_v4 via DUT cable")
+ # Wait for cypress USB hub
+ c.wait_for_usb(CYPRESS_DUT_VIDPID)
+
+ c.log("Programming sn:%s mac:%s" % (serialno, macaddr))
+
+ c.log("Programming sn:%s" % serialno)
+ pty = c.setup_tinyservod(STM_VIDPID, 0)
+ c.do_serialno(serialno, pty)
+ c.log("Done programming serialno")
+ c.log("")
+
+
+ # Wait for atmega dfu, if not already programmed
+ c.log("Programming Atmega")
+ do_enable_atmega(pty)
+ # Wait for atmega boot.
+ time.sleep(1)
+ if not c.check_usb(ATM_LUFA_VIDPID):
+ c.wait_for_usb(ATM_DFU_VIDPID)
+ c.do_atmega(ATM_BIN)
+ c.log("Done programming Atmega")
+ else:
+ c.log("Atmega already programmed")
+ c.log("")
+
+ c.log("Programming mac:%s" % macaddr)
+ do_macaddr(macaddr)
+ c.log("Done programming mac")
+ c.log("")
+
+ c.log("Finished programming.")
+ c.log("\n\n************************************************\n")
+ c.log("PASS")
+ c.log("ServoV4, %s, %s, PASS" % (serialno, macaddr))
+ c.log("\n\n************************************************\n")
+
+ c.finish_logfile()
+
+ # If we have specified by command line don't loop again.
+ if args.macaddr or args.serialno:
+ break
+ macaddr = None
+ serialno = None
+
+ print "\n\n************************************************\n"
+ print "Make sure no servo_v4 is plugged"
+ c.wait_for_usb_remove(STM_VIDPID)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/servo/scripts/servo_mfg/pty_driver.py b/servo/scripts/servo_mfg/pty_driver.py
new file mode 100644
index 0000000..790fbed
--- /dev/null
+++ b/servo/scripts/servo_mfg/pty_driver.py
@@ -0,0 +1,265 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import ast
+import errno
+import os
+import pexpect
+from pexpect import fdpexpect
+
+DEFAULT_UART_TIMEOUT = 3 # 3 seconds is plenty even for slow platforms
+
+class ptyError(Exception):
+ """Exception class for pty errors."""
+
+UART_PARAMS = {'uart_cmd': None,
+ 'uart_multicmd': None,
+ 'uart_regexp': None,
+ 'uart_timeout': DEFAULT_UART_TIMEOUT
+ }
+
+class ptyDriver():
+ """."""
+ def __init__(self, interface, params, fast=False):
+ """."""
+ self._child = None
+ self._fd = None
+ self._interface = interface
+ self._pty_path = self._interface.get_pty()
+ self._dict = UART_PARAMS
+ self._fast = fast
+
+ def _open(self):
+ """Connect to serial device and create pexpect interface."""
+ self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
+ self._child = fdpexpect.fdspawn(self._fd)
+ # pexpect dafaults to a 100ms delay before sending characters, to
+ # work around race conditions in ssh. We don't need this feature
+ # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
+ if self._fast:
+ self._child.delaybeforesend = 0.001
+
+ def _close(self):
+ """Close serial device connection."""
+ os.close(self._fd)
+ self._fd = None
+ self._child = None
+
+ def _flush(self):
+ """Flush device output to prevent previous messages interfering."""
+ if self._child.sendline("") != 1:
+ raise ptyError("Failed to send newline.")
+ while True:
+ try:
+ self._child.expect(".", timeout=0.01)
+ except (pexpect.TIMEOUT, pexpect.EOF):
+ break
+ except OSError, e:
+ # EAGAIN indicates no data available, maybe we didn't wait long enough
+ if e.errno != errno.EAGAIN:
+ raise
+ break
+
+ def _send(self, cmds):
+ """Send command to EC.
+
+ This function always flushes serial device before sending, and is used as
+ a wrapper function to make sure the channel is always flushed before
+ sending commands.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+
+ Raises:
+ ptyError: Raised when writing to the device fails.
+ """
+ self._flush()
+ if not isinstance(cmds, list):
+ cmds = [cmds]
+ for cmd in cmds:
+ if self._child.sendline(cmd) != len(cmd) + 1:
+ raise ptyError("Failed to send command.")
+
+ def _issue_cmd(self, cmds):
+ """Send command to the device and do not wait for response.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+ """
+ self._issue_cmd_get_results(cmds, [])
+
+ def _issue_cmd_get_results(self, cmds,
+ regex_list, timeout=DEFAULT_UART_TIMEOUT):
+ """Send command to the device and wait for response.
+
+ This function waits for response message matching a regular
+ expressions.
+
+ Args:
+ cmds: The commands issued, either a list or a string.
+ regex_list: List of Regular expressions used to match response message.
+ Note1, list must be ordered.
+ Note2, empty list sends and returns.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ For example:
+ response of the given command:
+ High temp: 37.2
+ Low temp: 36.4
+ regex_list:
+ ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
+ returns:
+ [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
+
+ Raises:
+ ptyError: If timed out waiting for a response
+ """
+ result_list = []
+ self._open()
+ try:
+ self._send(cmds)
+ for regex in regex_list:
+ self._child.expect(regex, timeout)
+ match = self._child.match
+ lastindex = match.lastindex if match and match.lastindex else 0
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = match.group(*range(lastindex + 1)) if match else None
+ result_list.append(result)
+ except pexpect.TIMEOUT:
+ raise ptyError("Timeout waiting for response.")
+ finally:
+ self._close()
+ return result_list
+
+ def _issue_cmd_get_multi_results(self, cmd, regex):
+ """Send command to the device and wait for multiple response.
+
+ This function waits for arbitary number of response message
+ matching a regular expression.
+
+ Args:
+ cmd: The command issued.
+ regex: Regular expression used to match response message.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ """
+ result_list = []
+ self._open()
+ try:
+ self._send(cmd)
+ while True:
+ try:
+ self._child.expect(regex, timeout=0.1)
+ match = self._child.match
+ lastindex = match.lastindex if match and match.lastindex else 0
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = match.group(*range(lastindex + 1)) if match else None
+ result_list.append(result)
+ except pexpect.TIMEOUT:
+ break
+ finally:
+ self._close()
+ return result_list
+
+ def _Set_uart_timeout(self, timeout):
+ """Set timeout value for waiting for the device response.
+
+ Args:
+ timeout: Timeout value in second.
+ """
+ self._dict['uart_timeout'] = timeout
+
+ def _Get_uart_timeout(self):
+ """Get timeout value for waiting for the device response.
+
+ Returns:
+ Timeout value in second.
+ """
+ return self._dict['uart_timeout']
+
+ def _Set_uart_regexp(self, regexp):
+ """Set the list of regular expressions which matches the command response.
+
+ Args:
+ regexp: A string which contains a list of regular expressions.
+ """
+ if not isinstance(regexp, str):
+ raise ecError('The argument regexp should be a string.')
+ self._dict['uart_regexp'] = ast.literal_eval(regexp)
+
+ def _Get_uart_regexp(self):
+ """Get the list of regular expressions which matches the command response.
+
+ Returns:
+ A string which contains a list of regular expressions.
+ """
+ return str(self._dict['uart_regexp'])
+
+ def _Set_uart_cmd(self, cmd):
+ """Set the UART command and send it to the device.
+
+ If ec_uart_regexp is 'None', the command is just sent and it doesn't care
+ about its response.
+
+ If ec_uart_regexp is not 'None', the command is send and its response,
+ which matches the regular expression of ec_uart_regexp, will be kept.
+ Use its getter to obtain this result. If no match after ec_uart_timeout
+ seconds, a timeout error will be raised.
+
+ Args:
+ cmd: A string of UART command.
+ """
+ if self._dict['uart_regexp']:
+ self._dict['uart_cmd'] = self._issue_cmd_get_results(
+ cmd,
+ self._dict['uart_regexp'],
+ self._dict['uart_timeout'])
+ else:
+ self._dict['uart_cmd'] = None
+ self._issue_cmd(cmd)
+
+ def _Set_uart_multicmd(self, cmds):
+ """Set multiple UART commands and send them to the device.
+
+ Note that ec_uart_regexp is not supported to match the results.
+
+ Args:
+ cmds: A semicolon-separated string of UART commands.
+ """
+ self._issue_cmd(cmds.split(';'))
+
+ def _Get_uart_cmd(self):
+ """Get the result of the latest UART command.
+
+ Returns:
+ A string which contains a list of tuples, each of which contains the
+ entire matched string and all the subgroups of the match. 'None' if
+ the ec_uart_regexp is 'None'.
+ """
+ return str(self._dict['uart_cmd'])
+
+ def _Set_uart_capture(self, cmd):
+ """Set UART capture mode (on or off).
+
+ Once capture is enabled, UART output could be collected periodically by
+ invoking _Get_uart_stream() below.
+
+ Args:
+ cmd: an int, 1 of on, 0 for off
+ """
+ self._interface.set_capture_active(cmd)
+
+ def _Get_uart_capture(self):
+ """Get the UART capture mode (on or off)."""
+ return self._interface.get_capture_active()
+
+ def _Get_uart_stream(self):
+ """Get uart stream generated since last time."""
+ return self._interface.get_stream()
diff --git a/servo/scripts/servo_mfg/rtunicpg-x86_64 b/servo/scripts/servo_mfg/rtunicpg-x86_64
new file mode 100755
index 0000000..2555ceb
--- /dev/null
+++ b/servo/scripts/servo_mfg/rtunicpg-x86_64
Binary files differ
diff --git a/servo/scripts/servo_mfg/stm32uart.py b/servo/scripts/servo_mfg/stm32uart.py
new file mode 100644
index 0000000..297ecff
--- /dev/null
+++ b/servo/scripts/servo_mfg/stm32uart.py
@@ -0,0 +1,214 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Allow creation of uart/console interface via stm32 usb endpoint."""
+import errno
+import exceptions
+import os
+import pty
+import select
+import sys
+import termios
+import threading
+import time
+import tty
+import usb
+
+import stm32usb
+
+
+class SuartError(Exception):
+ """Class for exceptions of Suart."""
+ def __init__(self, msg, value=0):
+ """SuartError constructor.
+
+ Args:
+ msg: string, message describing error in detail
+ value: integer, value of error when non-zero status returned. Default=0
+ """
+ super(SuartError, self).__init__(msg, value)
+ self.msg = msg
+ self.value = value
+
+
+class Suart():
+ """Provide interface to stm32 serial usb endpoint."""
+ def __init__(self, vendor=0x18d1, product=0x501a, interface=0,
+ serialname=None, ftdi_context=None):
+ """Suart contstructor.
+
+ Initializes stm32 USB stream interface.
+
+ Args:
+ vendor: usb vendor id of stm32 device
+ product: usb product id of stm32 device
+ interface: interface number of stm32 device to use
+ serialname: n/a. Defaults to None.
+ ftdi_context: n/a. Defaults to None.
+
+ Raises:
+ SuartError: If init fails
+ """
+ self._susb = stm32usb.Susb(vendor=vendor, product=product,
+ interface=interface, serialname=serialname)
+
+
+ def __del__(self):
+ """Suart destructor."""
+ pass
+
+ def run_rx_thread(self):
+ ep = select.epoll()
+ ep.register(self._ptym, select.EPOLLHUP)
+ while True:
+ events = ep.poll(0)
+ # Check if the pty is connected to anything, or hungup.
+ if not events:
+ try:
+ r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
+ if r:
+ os.write(self._ptym, r)
+
+ except Exception as e:
+ # If we miss some characters on pty disconnect, that's fine.
+ # ep.read() also throws USBError on timeout, which we discard.
+ if type(e) not in [exceptions.OSError, usb.core.USBError]:
+ pass
+ else:
+ time.sleep(.1)
+
+ def run_tx_thread(self):
+ ep = select.epoll()
+ ep.register(self._ptym, select.EPOLLHUP)
+ while True:
+ events = ep.poll(0)
+ # Check if the pty is connected to anything, or hungup.
+ if not events:
+ try:
+ r = os.read(self._ptym, 64)
+ if r:
+ self._susb._write_ep.write(r, self._susb.TIMEOUT_MS)
+
+ except Exception as e:
+ pass
+ else:
+ time.sleep(.1)
+
+
+ def run(self):
+ """Creates pthreads to poll stm32 & PTY for data.
+ """
+
+ m, s = os.openpty()
+ self._ptyname = os.ttyname(s)
+
+ self._ptym = m
+ self._ptys = s
+
+ os.fchmod(s, 0o660)
+
+ # Change the owner and group of the PTY to the user who started servod.
+ try:
+ uid = int(os.environ.get('SUDO_UID', -1))
+ except TypeError:
+ uid = -1
+
+ try:
+ gid = int(os.environ.get('SUDO_GID', -1))
+ except TypeError:
+ gid = -1
+ os.fchown(s, uid, gid)
+
+ tty.setraw(self._ptym, termios.TCSADRAIN)
+
+ # Generate a HUP flag on pty slave fd.
+ os.fdopen(s).close()
+
+ self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[])
+ self._rx_thread.daemon = True
+ self._rx_thread.start()
+
+ self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[])
+ self._tx_thread.daemon = True
+ self._tx_thread.start()
+
+
+ def get_uart_props(self):
+ """Get the uart's properties.
+
+ Returns:
+ dict where:
+ baudrate: integer of uarts baudrate
+ bits: integer, number of bits of data Can be 5|6|7|8 inclusive
+ parity: integer, parity of 0-2 inclusive where:
+ 0: no parity
+ 1: odd parity
+ 2: even parity
+ sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
+ 0: 1 stop bit
+ 1: 1.5 stop bits
+ 2: 2 stop bits
+ """
+ return {'baudrate': 115200,
+ 'bits': 8,
+ 'parity': 0,
+ 'sbits': 1}
+
+
+ def set_uart_props(self, line_props):
+ """Set the uart's properties. Note that Suart cannot set properties
+ and will fail if the properties are not the default 115200,8n1.
+
+ Args:
+ line_props: dict where:
+ baudrate: integer of uarts baudrate
+ bits: integer, number of bits of data ( prior to stop bit)
+ parity: integer, parity of 0-2 inclusive where
+ 0: no parity
+ 1: odd parity
+ 2: even parity
+ sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
+ 0: 1 stop bit
+ 1: 1.5 stop bits
+ 2: 2 stop bits
+
+ Raises:
+ SuartError: If requested line properties are not the default.
+ """
+ curr_props = self.get_uart_props()
+ for prop in line_props:
+ if line_props[prop] != curr_props[prop]:
+ raise SuartError("Line property %s cannot be set from %s to %s" % (
+ prop, curr_props[prop], line_props[prop]))
+ return True
+
+
+ def get_pty(self):
+ """Gets path to pty for communication to/from uart.
+
+ Returns:
+ String path to the pty connected to the uart
+ """
+ return self._ptyname
+
+
+def test():
+ format='%(asctime)s - %(name)s - %(levelname)s'
+ if True:
+ format += ' - %(filename)s:%(lineno)d:%(funcName)s'
+ format += ' - %(message)s'
+
+ sobj = Suart()
+ sobj.run()
+
+ # run() is a thread so just busy wait to mimic server
+ while True:
+ # ours sleeps to eleven!
+ time.sleep(11)
+
+if __name__ == '__main__':
+ try:
+ test()
+ except KeyboardInterrupt:
+ sys.exit(0)
diff --git a/servo/scripts/servo_mfg/stm32usb.py b/servo/scripts/servo_mfg/stm32usb.py
new file mode 100644
index 0000000..dc823ea
--- /dev/null
+++ b/servo/scripts/servo_mfg/stm32usb.py
@@ -0,0 +1,91 @@
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Allows creation of an interface via stm32 usb."""
+
+import usb
+
+
+class SusbError(Exception):
+ """Class for exceptions of Susb."""
+ def __init__(self, msg, value=0):
+ """SusbError constructor.
+
+ Args:
+ msg: string, message describing error in detail
+ value: integer, value of error when non-zero status returned. Default=0
+ """
+ super(SusbError, self).__init__(msg, value)
+ self.msg = msg
+ self.value = value
+
+
+class Susb():
+ """Provide stm32 USB functionality.
+
+ Instance Variables:
+ _logger: S.* tagged log output
+ _dev: pyUSB device object
+ _read_ep: pyUSB read endpoint for this interface
+ _write_ep: pyUSB write endpoint for this interface
+ """
+ READ_ENDPOINT = 0x81
+ WRITE_ENDPOINT = 0x1
+ TIMEOUT_MS = 100
+
+ def __init__(self, vendor=0x18d1,
+ product=0x500f, interface=1, serialname=None, logger=None):
+ """Susb constructor.
+
+ Disconvers and connects to stm32 USB endpoints.
+
+ Args:
+ vendor : usb vendor id of stm32 device
+ product : usb product id of stm32 device
+ interface : interface number ( 1 - 4 ) of stm32 device to use
+ serialname: string of device serialname.
+
+ Raises:
+ SusbError: An error accessing Susb object
+ """
+
+ # Find the stm32.
+ dev = usb.core.find(idVendor=vendor, idProduct=product)
+ if dev is None:
+ raise SusbError("USB device not found")
+
+ serial = '(%s)' % serialname if serialname else ''
+ # If we can't set configuration, it's already been set.
+ try:
+ dev.set_configuration()
+ except usb.core.USBError:
+ pass
+
+ # Get an endpoint instance.
+ cfg = dev.get_active_configuration()
+ intf = usb.util.find_descriptor(cfg, bInterfaceNumber=interface)
+ self._intf = intf
+
+ if not intf:
+ raise SusbError("Interface not found")
+
+ # Detatch raiden.ko if it is loaded.
+ if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True:
+ dev.detach_kernel_driver(intf.bInterfaceNumber)
+ #self._logger.debug("InterfaceNumber: %s" % intf.bInterfaceNumber)
+
+ read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
+ read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ self._read_ep = read_ep
+ #self._logger.debug("Reader endpoint: 0x%x" % read_ep.bEndpointAddress)
+
+ write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
+ write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ self._write_ep = write_ep
+ #self._logger.debug("Writer endpoint: 0x%x" % write_ep.bEndpointAddress)
+
+ #self._logger.debug("Set up stm32 usb")
+
+ def __del__(self):
+ """Sgpio destructor."""