blob: bc9eab1996cc311c88a72b30b16f1847b2be7723 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Flash PD PSU RW firmware over the USBPD comm channel using console.
Example:
util/flash_pd.py ./build/zinger/ec.RW.bin
"""
import array
import errno
import logging
import optparse
import os
import re
import socket
import sys
import time
import serial
# TODO(tbroch): Discuss adding hdctools as an EC package RDEPENDS
from servo import client
from servo import multiservo
# Version of this utility; appended to '%prog ' for the option parser's
# version string.
VERSION = '0.0.2'
# Required size in bytes of the RW firmware image.  The RW area is half of
# the 32-kB part (presumably the MCU flash -- TODO confirm).
MAX_FW_SIZE = 16 * 1024
# First 20 bytes of the SHA-256 of the RW area when erased (set to all F's),
# written as five 32-bit hex words.  It is matched against the console output
# of the 'hash' command to confirm that an erase succeeded.
ERASED_RW_HASH = 'd86670be 559860c7 2b2149e8 d2ae1104 9550e093'
class FlashPDError(Exception):
    """Error raised by the flash_pd utility for any failure it reports."""
    pass
class FlashPD(client.ServoClient):
    """Flash a PD MCU through its serial console.

    Note, some designs (samus) have multiple embedded MCUs.  In that case
    the convention is to name the pty associated with usbpd as
    'usbpd_uart_pty'.  In the case where there is only one MCU we prefer
    'usbpd_uart_pty' but will also associate 'ec_uart_pty' with having
    capability to flash the USB-PD capable PSU (zinger).

    Attributes:
      _options : Values instance from optparse.
      _serial : serial.Serial connection to the console pty, or None until
                the connection has been opened.

    Public Methods:
      expect : Examine console output for an expected response.
      pd_command : Write a 'pd <port>' command and interrogate its result.
      flash_command : Write a PD flash command and interrogate its result.
      get_version : Retrieve current version of PD FW.
      reboot : Reboot the PSU and wait for it to answer again.
    """

    def __init__(self, options):
        """Constructor.

        Connects to servod, looks up the console pty and opens it.

        Args:
          options : Values instance from optparse.  Reads 'server' and
                    'port' here; 'multiport' and (optionally) 'timeout'
                    are read later by pd_command.

        Raises:
          FlashPDError: If unable to determine or open the console pty.
        """
        super(FlashPD, self).__init__(host=options.server, port=options.port)
        self._options = options
        self._serial = None
        try:
            pty = self.get('usbpd_uart_pty')
        except socket.error as e:
            raise FlashPDError('Can\'t connect to servod :: %s' % e)
        except client.ServoClientError:
            # Single-MCU designs only expose the EC console.  Report a
            # failed fallback as FlashPDError, as documented, instead of
            # leaking the servo client's own exception type.
            try:
                pty = self.get('ec_uart_pty')
            except client.ServoClientError as e:
                raise FlashPDError(
                    'Unable to determine EC uart from servod :: %s' % e)
        if not pty:
            raise FlashPDError('Unable to determine EC uart from servod')

        logging.debug('Opening serial connection to %s', pty)
        try:
            self._serial = serial.Serial(pty, timeout=1)
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise FlashPDError('%s' % e)
            # try twice if already open EAGAIN failure causes disconnect.
            self._serial = serial.Serial(pty, timeout=1)

        # quiet other channels that might pollute console.
        self._serial.write('chan 1\n')
        self._serial.flushOutput()
        self._serial.flushInput()

    def __del__(self):
        """Destructor: drain the console, restore channels and close."""
        # getattr: __init__ may have raised before _serial was assigned.
        console = getattr(self, '_serial', None)
        if not console:
            return
        for l in console:
            logging.debug('flash: %s', l)
        console.write('chan 0xffffffff\n')
        console.write('chan restore\n')
        console.close()

    def expect(self, val, timeout=5):
        """Scan serial output for particular string.

        Args:
          val : string to look for
          timeout : integer seconds to look before timing out.

        Returns:
          tuple : boolean if 'val' found in console output.
                  string of the last line examined (the matching line when
                  found), or None if the final read pass produced no line.
        """
        done = False
        # Bound up front so a non-positive timeout returns (False, None)
        # instead of tripping over an unassigned local.
        l = None
        deadline = time.time() + timeout
        while not done and (time.time() < deadline):
            l = None
            for l in self._serial:
                done = val in l
                logging.debug('Is %s in: %s', val, l)
                if done or time.time() > deadline:
                    break
        if not done:
            logging.debug("Expect '%s' missing", val)
        return (done, l)

    def pd_command(self, cmd, expect='DONE 0', retries=2, ignore_fail=False):
        """Send PD command and interrogate output.

        Args:
          cmd : string of 'pd <port>' command to execute
          expect : string of expected response after 'cmd'
          retries : integer number of times to repeat command if it fails.
          ignore_fail : boolean to ignore failure

        Returns:
          tuple :
            found : boolean, whether response matches expected.
            line : string of line returned by expect method.

        Raises:
          FlashPDError: if command failed to match expected return string
            after retries.
        """
        # Honor the --timeout option; fall back to expect()'s default when
        # the options object does not carry one.
        timeout = getattr(self._options, 'timeout', 5)
        found, line = False, None
        for attempt in range(retries + 1):
            if attempt:
                # Pause before re-sending, so that a retry which succeeds
                # is not followed by a pointless delay.
                time.sleep(1)
                logging.debug("pd cmd Retry%d for '%s'", attempt, cmd)
            self._serial.write('pd %d %s\n' % (self._options.multiport, cmd))
            (found, line) = self.expect(expect, timeout=timeout)
            if found:
                break
        if not found and not ignore_fail:
            raise FlashPDError("Failed pd cmd: '%s' after %d retries\n" %
                               (cmd, retries))
        return (found, line)

    def flash_command(self, cmd, expect='DONE 0', retries=2,
                      ignore_fail=False):
        """Send 'flash <cmd>' through pd_command.

        Arguments, return value and exceptions are those of pd_command.
        """
        flash_cmd = 'flash %s' % cmd
        return self.pd_command(flash_cmd, expect, retries, ignore_fail)

    def get_version(self):
        """Retrieve PSU firmware version.

        Looks like: 'version: zinger_v1.1.1917-bfd'

        Returns:
          version : string of version

        Raises:
          FlashPDError : if can't determine version
        """
        (found, line) = self.flash_command('version', expect='version:')
        logging.debug('is version in: %s', line)
        m = None
        if found:
            m = re.match(r'.*version:\s+(\w+_v\d+\.\d+\.\d+-\S+).*', line)
        if not m:
            raise FlashPDError('Unable to determine PD FW version')
        return m.group(1)

    def reboot(self):
        """Reboot PSU.

        Use 'version' to poll for success after DONE encountered.

        Raises:
          FlashPDError : If unable to reboot
        """
        self.flash_command('reboot', expect=r'DONE', retries=0)
        self.flash_command('version', retries=10)
def flash_pd(options):
    """Flash power delivery firmware.

    Reads the firmware image, reports the current version, then (unless
    only the version was requested) invalidates the RW signature, reboots
    into RO, erases RW, verifies the erase, writes the image six 32-bit
    words per console command and reboots into the new RW.

    Args:
      options : Values instance from optparse.  Reads 'firmware',
                'versiononly' and 'eraseonly', plus 'timeout' when present;
                the rest is consumed by FlashPD.

    Raises:
      FlashPDError : on a bad image size, a failed console command, finding
                     the PSU still in RW after erase, or a failed erase.
    """
    words_per_cmd = 6

    ec = FlashPD(options)

    # The image is raw binary: read it as bytes so that no newline
    # translation or text decoding can alter it.
    with open(options.firmware, 'rb') as fd:
        fw = fd.read()
    fw_size = len(fw)
    # The RW firmware should be already padded and signed
    if fw_size != MAX_FW_SIZE:
        raise FlashPDError('Bad RW firmware size %d/%d' %
                           (fw_size, MAX_FW_SIZE))
    words = array.array('I', fw)

    logging.info('Current PD FW version is %s', ec.get_version())
    if options.versiononly:
        return

    logging.info('Flashing %d bytes', fw_size)
    # reset flashed signature to reboot in RO
    ec.flash_command('signature')
    # reboot in RO
    ec.reboot()
    # erase all RW partition
    ec.flash_command('erase')

    # try info command and guarantee we're in RO
    (done, line) = ec.flash_command('info', expect=r'INFO')
    m = re.match(r'INFO.*(18d1\S{4})', line)
    if done and m:
        # Consume the trailing status line of the info command; its result
        # is not needed.
        ec.expect('DONE 0', getattr(options, 'timeout', 5))
        # Bit 0 of the captured hex word is set when running from RW.
        in_rw = int(m.group(1), 16) & 0x1
        if in_rw:
            raise FlashPDError('Not in RO after erase')

    # Google UFP devices share their hash to DFP after info command so check
    # it.  ignore_fail lets the specific 'Erase failed' error below be
    # reported rather than pd_command's generic failure.
    (done, _) = ec.pd_command('hash', expect=ERASED_RW_HASH,
                              ignore_fail=True)
    if not done:
        raise FlashPDError('Erase failed')
    logging.info('Successfully erased flash.')

    if options.eraseonly:
        ec.reboot()
        logging.info('After erase, FW version is %s', ec.get_version())
        return

    # write firmware content
    full_chunks = len(words) // words_per_cmd
    for i in range(full_chunks):
        chunk = words[i * words_per_cmd: (i + 1) * words_per_cmd]
        cmd = ' '.join(['%08x' % (w) for w in chunk])
        ec.flash_command(cmd)
        if not i % 0x10:
            logging.info('Chunk %d of %d done.', i, full_chunks)
    # write the remaining words
    chunk = words[full_chunks * words_per_cmd:]
    cmd = ' '.join(['%08x' % (w) for w in chunk])
    ec.flash_command(cmd)

    # reboot in RW
    ec.reboot()
    logging.info('Flashing DONE.')
    logging.info('New PD FW version is %s', ec.get_version())
def parse_args():
    """Parse commandline arguments.

    Note, reads sys.argv directly

    Returns:
      options : Values instance from optparse, with 'firmware' set to the
                path given as the single positional argument.

    Raises:
      FlashPDError : If problems with arguments
      NotImplementedError : If a multiservo name was supplied.
    """
    examples = (
        '\nExamples:\n'
        ' %prog build/zinger/ec.RW.flat\n'
    )
    parser = optparse.OptionParser(
        version='%prog ' + VERSION,
        description=(
            '%prog [<switch args>] <firmware.bin>'
            '\n'
            '%prog is a utility for flashing the USB-PD charger RW firmware '
            'over '
            'the USB-PD communication channel using PD MCU console commands.'
        ))

    add = parser.add_option
    add('-d', '--debug', action='store_true', default=False,
        help='enable debug messages.')
    add('-s', '--server', help='host where servod is running',
        default=client.DEFAULT_HOST)
    add('-p', '--port', default=client.DEFAULT_PORT, type=int,
        help='port servod is listening on.')
    add('-m', '--multiport', default=0, type=int,
        help='If design has multiple type-C ports, this identifies '
        'which one has USB PD PSU.')
    add('', '--timeout', default=5, type=int,
        help='Timeout seconds to wait for console output.')
    add('', '--eraseonly', action='store_true', default=False,
        help='Only erase RW portion and exit.')
    add('-V', '--versiononly', action='store_true', default=False,
        help='Only read version and exit.')
    multiservo.add_multiservo_parser_options(parser)
    parser.set_usage(parser.get_usage() + examples)

    (options, positional) = parser.parse_args()

    # TODO(tbroch) Add this once we refactor module to ease use in scripts.
    if options.name:
        raise NotImplementedError('Multiservo support TBD')

    # Add after to enumerate options.firmware but outside 'help' generation
    add('-f', '', action='store', type='string', dest='firmware')

    if len(positional) != 1:
        raise FlashPDError('Must supply power delivery firmware to write.')
    firmware_path = positional[0]
    options.firmware = firmware_path
    if not os.path.exists(firmware_path):
        raise FlashPDError('Unable to find file %s' % options.firmware)

    image_size = os.path.getsize(firmware_path)
    if image_size > MAX_FW_SIZE:
        raise FlashPDError('Firmware too large %d/%d' %
                           (image_size, MAX_FW_SIZE))
    return options
def main_function():
    """Parse the command line, set up logging and run the flash sequence.

    With --debug the log level drops to DEBUG and each record also carries
    the file name, line number and function name of its origin.
    """
    options = parse_args()
    verbose = options.debug

    log_format = '%(asctime)s - %(name)s - %(levelname)s'
    if verbose:
        log_format += ' - %(filename)s:%(lineno)d:%(funcName)s'
    log_format += ' - %(message)s'

    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO,
                        format=log_format)
    flash_pd(options)
def main():
    """Main function wrapper to catch exceptions properly.

    Exits with status 0 when interrupted from the keyboard and with status 1
    after printing the message of a FlashPDError.  Any other exception
    propagates unchanged.
    """
    try:
        main_function()
    except KeyboardInterrupt:
        sys.exit(0)
    except FlashPDError as e:
        # Format the exception itself instead of reading the deprecated
        # 'message' attribute, which is empty unless the exception was
        # built with exactly one argument.  The parenthesized form also
        # parses as both a print statement and a print() call.
        print('Error: %s' % e)
        sys.exit(1)


# Run the utility only when executed as a script, not when imported.
if __name__ == '__main__':
    main()