#!/usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Extracts data from firmware binaries and sends it to the shopfloor server.
./ --firmware bios.bin --rma RMA00001234 --outdir /tmp/
./ --ro_vpd ro_vpd.bin --rw_vpd rw_vpd.bin --gbb gbb.bin \
--rma RMA00001234 --outdir /tmp/
import logging
import optparse
import os
import re
import sys
import xmlrpclib
from httplib import socket
from subprocess import Popen, PIPE
_GBB_UTILITY_LOCATION = '/usr/bin/gbb_utility'
_VPD_UTILITY_LOCATION = '/usr/sbin/vpd'
# These are standard required fields, this can be added to as needed
_REQUIRED_RO_VPD_FIELDS = ('region', 'serial_number')
_options = None
class ServerFault(Exception):
def _server_api(call):
"""Decorator of calls to remote server.
Converts xmlrpclib.Fault generated during remote procedural call to
a simplified form (shopfloor.ServerFault).
def wrapped_call(*args, **kargs):
return call(*args, **kargs)
except xmlrpclib.Fault as e:
logging.exception('Shopfloor server:')
raise ServerFault(e.faultString.partition(':')[2])
wrapped_call.__name__ = call.__name__
return wrapped_call
def get_instance(shopfloor_url):
"""Gets an instance (for client side) to access the shop floor server.
shopfloor_url - String. URL (including port) of the shopfloor server.
xmlrpc proxy object to the shopfloor server.
return xmlrpclib.ServerProxy(shopfloor_url)
def server_is_up(server):
"""Checks if the given instance is successfully connected.
server - xmlrpc proxy object to hte shopfloor server.
Bool - True if the server responds, False otherwise.
except socket.error:
return False
return True
def SaveDeviceData(shopfloor, device_data, overwrite=False):
"""Sends device data to the shopfloor server.
shopfloor - xmlrpc proxy to shopfloor server.
device_data - dictionary containing device information.
overwrite - Bool. Whether to replace data if it exists on
the server.
dictionary - Contents defined by RMA shopfloor module.
return shopfloor.SaveDeviceData(device_data, overwrite)
class Obj(object):
"""Generic wrapper allowing dot-notation dict access."""
def __init__(self, **field_dict):
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return repr(self.__dict__)
def is_valid(self):
return False if None in self.__dict__.values() else True
class DeviceData(object):
"""Container for device-specific data."""
def __init__(self, serial_number, vpd, hwid):
self.serial_number = serial_number
self.vpd = vpd
self.hwid = hwid
def FromDictionary(cls, dictionary):
vpd = VPD(dictionary['vpd']['ro'], dictionary['vpd']['rw'])
return DeviceData(dictionary['rma_number'], vpd, dictionary['hwid'])
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return repr(self.__dict__)
def __str__(self):
pretty_str = 'RMA Number: %s\n' % self.serial_number
pretty_str += 'HWID: %s\n' % self.hwid
pretty_str += str(self.vpd) + '\n'
return pretty_str
def is_valid(self):
if self.serial_number is None:
return False
if not self.vpd.is_valid():
return False
# This checks for default v2 TEST HWIDs, the regex spaces are required to
# avoid collision with HWIDv3s which could validly contain 'TEST'.
if self.hwid is None or' TEST ', self.hwid):
return False
return True
class VPD(object):
"""Vital Product Data container."""
def __init__(self, ro_vpd, rw_vpd): = ro_vpd = rw_vpd
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
censored_dict = self.__dict__.copy()
for key in censored_dict['rw']:
if key in ['ubind_attribute', 'gbind_attribute']:
censored_dict['rw'][key] = 'REDACTED'
return repr(censored_dict)
def __str__(self):
def format_vpd_str(vpd_dict):
keys = vpd_dict.keys()
vpd_str = ''
for key in keys:
vpd_str += ' %s: %s\n' % (key, vpd_dict[key])
return vpd_str
vpd_str = 'RO VPD:\n'
vpd_str += format_vpd_str(
vpd_str += 'RW VPD:\n'
vpd_str += format_vpd_str(
return vpd_str
def is_valid(self):
if None in self.__dict__.values():
return False
if field not in
return False
return True
def Shell(command, stdin=None):
"""Convenience wrapper for running a shell command.
This is defined here instead of using the existing factory
implementation to allow portability outside of a factory image.
process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = process.communicate(input=stdin)
status = process.poll()
return Obj(stdout=stdout, stderr=stderr, status=status, success=(status == 0))
def ExtractVPD(partition, filename):
"""Returns the VPD from a firmware or vpd binary."""
vpd_dict = {}
vpd_response = Shell(_options.vpd_utility + ' -l -i %s -f %s' %
(partition, filename))
raw_vpd = vpd_response.stdout # pylint: disable=E1101
for line in raw_vpd.splitlines():
match = re.match(r'"(.*)"="(.*)"$', line.strip())
(name, value) = (,
vpd_dict[name] = value
return vpd_dict
def GetHWID(filename):
"""Returns the HWID from a firmware or gbb binary."""
gbb_response = Shell(_GBB_UTILITY_LOCATION + ' -g --hwid %s' % filename)
raw_gbb = gbb_response.stdout # pylint: disable=E1101
match = re.match(r'hardware_id: (.*)', raw_gbb.strip())
if match is None:
logging.warning('Unable to find HWID in gbb: %s', filename)
return None
def DeviceDataFromFirmwareRegions(ro_vpd_file, rw_vpd_file, gbb_file,
"""Read device data from a device's firmware.
ro_vpd_file: Path to a file containing either a RO_VPD region
or entire firmware image.
rw_vpd_file: Path to a file containing either a RW_VPD region
or entire firmware image.
gbb_file: Path to a file containing either a GBB region
or entire firmware image.
rma_number: The RMA number for the device.
A DeviceData object containing VPD and HWID info extracted from
the firmware files.
serial_number = rma_number
ro_vpd = ExtractVPD('RO_VPD', ro_vpd_file)
rw_vpd = ExtractVPD('RW_VPD', rw_vpd_file)
vpd = VPD(ro_vpd, rw_vpd)
hwid = GetHWID(gbb_file)
return DeviceData(serial_number, vpd, hwid)
def prompt(question):
"""Display the question to the user and wait for input."""
answer = raw_input().lower()
if answer in ['y', 'yes']:
return True
elif answer in ['n', 'no']:
return False
return prompt(question)
def option_parser():
parser = optparse.OptionParser()
parser.add_option('-f', '--firmware', dest='firmware', metavar='FILE',
help='Location of source firmware binary.')
parser.add_option('--gbb', dest='gbb', metavar='FILE',
help='Location of file containing the Google binary block.')
parser.add_option('--gbb_utility', dest='gbb_utility', metavar='FILE',
help='Location of gbb utility.')
parser.add_option('-o', '--outdir', dest='output_directory', metavar='DIR',
help=('Location of output directory. '
'Defaults to the current directory.'))
parser.add_option('-r', '--rma', dest='rma_number', metavar='RMAxxxxxxxx',
help='RMA number.')
parser.add_option('--ro_vpd', dest='ro_vpd', metavar='FILE',
help='Location of file containing the read-only VPD.')
parser.add_option('--rw_vpd', dest='rw_vpd', metavar='FILE',
help='Location of file containing the read-write VPD.')
parser.add_option('-s', '--shopfloor', dest='shopfloor_url', metavar='URL',
help='URL of shopfloor server to send data.')
parser.add_option('-v', '--verbose', dest='verbose', action='store_true',
help='Enable debug logging.')
parser.add_option('--vpd_utility', dest='vpd_utility', metavar='FILE',
help='Location of vpd utility.')
return parser
def main():
"""Main entry when invoked from the command line."""
global _options # pylint: disable=W0603
parser = option_parser()
(_options, args) = parser.parse_args()
if _options.verbose:
level=logging.DEBUG if _options.verbose else logging.INFO)
if args:
parser.error('Invalid args: %s' % ' '.join(args))
if not os.path.exists(_options.gbb_utility):
parser.error('Unable to find GBB utility: %s' % _options.gbb_utility)
if not os.path.exists(_options.vpd_utility):
parser.error('Unable to find VPD utility: %s' % _options.vpd_utility)
shopfloor = get_instance(_options.shopfloor_url)
if not server_is_up(shopfloor):
logging.fatal('Unable to connect with shopfloor server: %s',
if not _options.firmware and (not _options.ro_vpd or not _options.rw_vpd or
not _options.gbb):
parser.error('You must specify a firmware file. (-f)')
if not _options.rma_number:
parser.error('You must specify an RMA number. (-r)')
ro_vpd = _options.ro_vpd if _options.ro_vpd else _options.firmware
rw_vpd = _options.rw_vpd if _options.rw_vpd else _options.firmware
gbb = _options.gbb if _options.gbb else _options.firmware
device_data = DeviceDataFromFirmwareRegions(ro_vpd, rw_vpd, gbb,
if not device_data.is_valid():
logging.warning('Device data is not valid. Not sending data to shopfloor '
'server: %r', device_data)
sys.exit(0) # Return success to proceed with flashing netboot firmware.
reply = SaveDeviceData(shopfloor, device_data)
if reply['status'] == 'success':'Successfully sent device data to shopfloor server.')
elif reply['status'] == 'conflict':
existing_device_data = DeviceData.FromDictionary(reply['data'])
if device_data == existing_device_data:'Found existing identical device data.')
if prompt(
'RMA data for %s already exists.\n'
'Existing data:\n'
'New data:\n'
'Do you want to replace it (Y/N)? ' %
(_options.rma_number, existing_device_data, device_data)):
reply = SaveDeviceData(shopfloor, device_data, overwrite=True)
if reply['status'] != 'success':
logging.fatal('Unexpected error while overwriting data: %s', reply)
logging.fatal('Invalid reply from shopfloor server: %s', reply)
if __name__ == '__main__':