blob: 61acb6b6574e8d098352b8b6f71454c8bbe7cafb [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" gft_report.py: Device detail reports for factory process.
A factory report is currently a native python dict.
You can use CreateReport() to create a native report,
ValidateReport() to check if a native report is valid,
FormatReport() to print a native report in text form,
EncodeReport() to transform a report to ASCII-safe format,
DecodeReport() to convert from an encoded string.
"""
import base64
import gzip
import os
import pprint
import re
import StringIO
import sys
import time
import crosfw
import gft_common
from gft_common import ErrorMsg, VerboseMsg, DebugMsg, ErrorDie
# TODO(tammo): It would be nice to be less python-centric, and maybe
# to be more flexible. Consider using YAML for the primary structured
# portion of the data. Also consider collecting log data
# incrementally on disk. The routines here could be called by other
# modules to add log data (either structured or non-structured), and
# then the final log creation just serializes the collection.
# Consider using MIME for this serialization (and MIME types could be
# indicated when data is registered to be included).
# Report format version, stored (as a string) in the 'version' field of every
# report by CreateReport(). Update this if any field names (or formats) have
# been changed.
REPORT_VERSION = 4
# Key in the 'info' dict filled in by DecodeReport(); its value is the list of
# decoder tags ('b64', 'gz', 'rpt') applied while decoding a report.
KEY_INFO_DECODERS = 'decoders'
def ParseKeyValueData(pattern, data):
  """Parses line-oriented key/value text into a dictionary.

  Every line of data is stripped of surrounding whitespace and matched
  against pattern; capture group 1 is used as the key and capture group 2
  as the value.

  Args:
    pattern: A regex pattern with two groups (key first, then value).
    data: The text to be parsed, one entry per line.

  Returns:
    A { key: value, ... } dict.

  Raises:
    ValueError: When a line does not match pattern, or when the same key
      shows up more than once.
  """
  entries = {}
  for raw_line in data.splitlines():
    found = re.match(pattern, raw_line.strip())
    if found is None:
      # Report the line as it was given, not the stripped copy.
      raise ValueError("Invalid data: %s" % raw_line)
    key, value = found.group(1, 2)
    if key in entries:
      raise ValueError("Duplicated entry: %s" % key)
    entries[key] = value
  return entries
def ParseVPDOutput(output):
  """Parses VPD listing output into a { name: value } dict.

  Each line is expected to be a quoted name, an equal sign and a quoted
  value, for example: "serial_number"="1234" (quotes are not kept).
  """
  vpd_entry_pattern = '"(.*)"="(.*)"$'
  return ParseKeyValueData(vpd_entry_pattern, output)
def ParseCrossystemOutput(output):
  """Parses crossystem output into a { name: value } dict.

  Each line is expected to look like: name = value # comment
  The trailing comment is dropped, and the spaces between the value and the
  comment marker are not part of the value.
  """
  crossystem_entry_pattern = "^([^ =]*) *= *(.*[^ ]) *# [^#]*$"
  return ParseKeyValueData(crossystem_entry_pattern, output)
def EncodeReport(native_report, text_armed=True):
  """Serializes a native report into gzip data, optionally base64 armored.

  The report is first rendered by FormatReport() (plus a trailing newline)
  and then gzipped.

  Args:
    native_report: The native python-dict report to encode.
    text_armed: When true the gzipped data is additionally base64 encoded so
      the result is ASCII friendly; otherwise the gzip data is returned.

  Returns:
    The encoded report data.
  """
  report_text = FormatReport(native_report) + "\n"
  stream = StringIO.StringIO()
  compressor = gzip.GzipFile(fileobj=stream, mode='wb')
  compressor.write(report_text)
  # Collect the buffer only after close(), once the gzip stream is finished.
  compressor.close()
  payload = stream.getvalue()
  if not text_armed:
    return payload
  return base64.encodestring(payload)
def DecodeReport(blob, info=None):
  """Decodes a report in blob (or ASCII) form into native python dict.

  The blob may be base64-armored gzip data, plain gzip data, or the bare
  report text. Each form is tried in that order and the first one that
  decodes without error is returned.

  Args:
    blob: A data containing encoded report.
    info: Optional dictionary object to receive information collected during
      decode. info[KEY_INFO_DECODERS] is set to the list of decoder tags
      ('b64', 'gz', 'rpt') invoked by the last decoding attempt.

  Returns:
    The object obtained by evaluating the report text (a dict for reports
    produced by EncodeReport).

  Raises:
    ValueError: When none of the known encodings can decode blob.
  """
  # Use a fresh dict per call; a mutable default value would be shared by
  # every call that omits 'info'.
  if info is None:
    info = {}

  # The report may be encoded in gzipped/base64 formats.
  def Base64Decoder(data):
    info[KEY_INFO_DECODERS] += ['b64']
    return base64.decodestring(data)

  def GunzipDecoder(data):
    info[KEY_INFO_DECODERS] += ['gz']
    zbuf = gzip.GzipFile(fileobj=StringIO.StringIO(data), mode='rb')
    return zbuf.read()

  def ReportDecoder(data):
    info[KEY_INFO_DECODERS] += ['rpt']
    # SECURITY: eval() runs any python expression found in the report, so a
    # crafted report file can execute code. Only decode trusted reports.
    # NOTE(review): ast.literal_eval may be a safe replacement if reports only
    # ever contain literals -- TODO confirm against real report contents.
    return eval(data)

  decoders = [
      lambda x: ReportDecoder(GunzipDecoder(Base64Decoder(x))),
      lambda x: ReportDecoder(GunzipDecoder(x)),
      lambda x: ReportDecoder(x),
  ]
  for decoder in decoders:
    # Reset per attempt so tags of a failed decoder chain do not leak into
    # the result of the next one.
    info[KEY_INFO_DECODERS] = []
    try:
      return decoder(blob)
    except Exception:
      # This encoding does not apply; try the next candidate. Unlike a bare
      # 'except:', KeyboardInterrupt and SystemExit are not swallowed.
      continue
  raise ValueError("Invalid report.")
def FormatReport(native_report):
  """Renders a native report as pretty-printed, human readable text."""
  printer = pprint.PrettyPrinter()
  return printer.pformat(native_report)
def ValidateReport(native_report):
  """Type check the details data.

  A valid report is a dict whose key set is exactly the mandatory keys listed
  below (nothing missing, nothing extra), where string fields hold simple
  strings, dict fields hold dicts, and list fields hold lists of simple
  strings. Checking stops at the first problem found.

  Args:
    native_report: A dict object containing the detail report.

  Returns:
    None if validation passed, otherwise a string of reason for failure.
  """
  mandatory_string_keys = ['version',
                           'create_params',
                           'device_timestamp',
                           'platform_name',
                           'hwid',
                           'tag',
                          ]
  mandatory_dict_keys = ['crossystem',
                         'hwid_properties',
                         'cooked_probe_results',
                         'ro_vpd',
                         'rw_vpd',
                        ]
  mandatory_list_keys = ['modem_status',
                         'verbose_log',
                         'wp_status',
                        ]
  if not isinstance(native_report, dict):
    return 'native_report must be a dict'

  # populate key difference
  mandatory_key_set = set(mandatory_string_keys +
                          mandatory_dict_keys +
                          mandatory_list_keys)
  detail_key_set = set(native_report)
  if detail_key_set != mandatory_key_set:
    err_msg = 'detail key sets differ'
    # Keys are listed in sorted order so the same invalid report always
    # yields the same message (set iteration order is arbitrary).
    extra = detail_key_set - mandatory_key_set
    if extra:
      err_msg += ', extra keys [%s]' % ', '.join(
          sorted(repr(x) for x in extra))
    missing = mandatory_key_set - detail_key_set
    if missing:
      err_msg += ', missing keys [%s]' % ', '.join(
          sorted(repr(x) for x in missing))
    return err_msg

  # validate value attributes
  for key in mandatory_string_keys:
    if not isinstance(native_report[key], str):
      return 'property %s in report should be a simple string.' % key

  for key in mandatory_dict_keys:
    if not isinstance(native_report[key], dict):
      return 'property %s in report should be a dict.' % key
    # TODO(tammo): The values inside these dicts are not checked. A check
    # that forces each dict value to be a simple string is not compatible
    # with the deeper nesting used by the hwid_properties and cooked_*
    # fields; either write a nesting-aware check or switch to a YAML/etc
    # report format w/o checks.

  for key in mandatory_list_keys:
    value = native_report[key]
    if not isinstance(value, list):
      return 'property %s in report should be a list.' % key
    # each elements in value should be simple string
    for subvalue in value:
      if not isinstance(subvalue, str):
        return ('property %s in report should be a list of simple strings.' %
                key)
  return None
def CreateReport(create_params,
                 system_details,
                 verbose_log_path=gft_common.DEFAULT_CONSOLE_LOG_PATH,
                 vpd_source=None,
                 tag='',
                 verbose=False):
  """Builds the native detail report for the current device.

  Gathers the hwid, platform name, crossystem values, RO/RW VPD, HWID
  properties, probe results, firmware write protection status, modem status
  and the verbose log, then stamps the report with the current UTC time.

  Args:
    create_params: the original command line (a sequence of strings) that
      creates the report; stored joined by spaces.
    system_details: An object with hwid_properties and cooked_results
      attributes; the __dict__ of each is stored in the report.
    verbose_log_path: Path of the log file to embed as 'verbose_log'. When
      empty or not existing, a '(Not available)' placeholder is stored.
    vpd_source: Optional input image for VPD values (None for system)
    tag: String stored as the 'tag' field of the report.
    verbose: Passed to gft_common.SystemOutput as show_progress while the
      VPD sections are being read.

  Returns:
    A dict mapping keys to details. Example:
      {'hwid': 'ABC',
       'device_timestamp': '20111031-175023',  # UTC
       ...}
  """
  # TODO(tammo): Consider adding /var/log/messages* and /etc/lsb-release.
  # TODO(hungte) we may also add in future:
  #   rootfs hash, dump_kernel_config, lsb-release from release image,
  #   gooftool version, result of dev_vboot_debug,
  #   /var/log/factory.log and any other customized data
  run = gft_common.SystemOutput

  # General information
  report = {
      'version': '%s' % REPORT_VERSION,
      'create_params': ' '.join(create_params),
      'tag': tag,
  }

  # System Hardware ID
  report['hwid'] = run("crossystem hwid").strip()
  report['platform_name'] = run("mosys platform name").strip()

  # crossystem reports many system configuration data
  report['crossystem'] = ParseCrossystemOutput(run("crossystem").strip())

  # Vital Product Data, read from vpd_source when one is given.
  if vpd_source:
    vpd_cmd = '-f %s' % vpd_source
  else:
    vpd_cmd = ''
  for key, command, message in (
      ('ro_vpd', "vpd -i RO_VPD -l %s", "Reading RO VPD"),
      ('rw_vpd', "vpd -i RW_VPD -l %s", "Reading RW VPD")):
    vpd_output = run(command % vpd_cmd,
                     progress_message=message,
                     show_progress=verbose)
    report[key] = ParseVPDOutput(vpd_output.strip())

  # HWID-related Device Data
  report['hwid_properties'] = system_details.hwid_properties.__dict__
  report['cooked_probe_results'] = system_details.cooked_results.__dict__

  # Firmware write protection status
  # TODO(hungte) Replace by crosfw.Flashrom.
  ec_wp_status = run(
      'flashrom -p internal:bus=lpc --get-size 2>/dev/null && '
      'flashrom -p internal:bus=lpc --wp-status || '
      'echo "EC is not available."',
      ignore_status=True)
  bios_wp_status = run('flashrom -p internal:bus=spi --wp-status')
  wp_status_text = 'main: %s\nec: %s' % (bios_wp_status, ec_wp_status)
  report['wp_status'] = wp_status_text.splitlines()

  # Cellular status
  report['modem_status'] = run('modem status',
                               ignore_status=True).splitlines()

  # Verbose log. Should be prepared before the last step.
  if verbose_log_path and os.path.exists(verbose_log_path):
    report['verbose_log'] = gft_common.ReadFile(verbose_log_path).splitlines()
  else:
    report['verbose_log'] = ['(Not available)']

  # Finally, attach a timestamp. This must be the last entry.
  report['device_timestamp'] = time.strftime("%Y%m%d-%H%M%S", time.gmtime())
  return report
def GetReportName(report_blob):
  """Derives the proper report file name from the content of a report.

  The name looks like <platform>_<serial>_<timestamp>.<extensions>, where
  the extensions are the tags of the decoders needed to read the blob, in
  reversed order (for example 'rpt.gz' for a gzipped report).

  Args:
    report_blob: Encoded report data, in any form DecodeReport() accepts.

  Returns:
    File name according to provided report.
  """
  def sanitize(text, separator=''):
    # Squash every run of non-alphanumeric characters into the separator,
    # then drop separators hanging at either end.
    squashed = re.sub(r'[^a-zA-Z0-9]+', separator, text)
    return squashed.strip(separator)

  decode_info = {}
  report = DecodeReport(report_blob, decode_info)
  problem = ValidateReport(report)
  if problem:
    ErrorDie("Failed decoding report: %s" % problem)

  name_fields = (
      sanitize(report['platform_name'].lower()),
      sanitize(report['ro_vpd'].get('serial_number', 'unknown')),
      sanitize(report['device_timestamp'], '-'),
      '.'.join(reversed(decode_info[KEY_INFO_DECODERS])),
  )
  return '%s_%s_%s.%s' % name_fields
#############################################################################
# Console main entry
@gft_common.GFTConsole
def main():
  """Main entry as a utility.

  With --decode, reads the report stored at --report_path, prints it in
  human readable form and prints the file name derived from its content.

  Otherwise matches the current system against the database files selected
  by --db_path, creates a native report, writes its text form to stderr and
  stores the encoded report in --report_path (or prints it when no path is
  given), then prints the file name derived from its content.
  """
  # Only load optparse when running as a stand-alone command.
  import glob
  import optparse

  parser = optparse.OptionParser()
  parser.add_option('--debug', action='store_true',
                    help='provide debug messages.')
  parser.add_option('--decode', action='store_true',
                    help='decode a encoded report to human readable format.')
  parser.add_option('--log_path', metavar='PATH',
                    default=gft_common.DEFAULT_CONSOLE_LOG_PATH,
                    help='use the given path for getting verbose logs.')
  parser.add_option('--report_path', metavar='PATH', default='report.gz',
                    help='use this path to read / write reports.')
  parser.add_option('--report_format', metavar='FORMAT', default='gz',
                    help='format of the generated report; '
                         'currently supported values: gz, base64')
  parser.add_option('--db_path', metavar='DB_PATH',
                    help='path pattern for hardware components databases')
  (options, args) = parser.parse_args()
  if args:
    parser.error('Un-expected parameter(s): %s\n' % ' '.join(args))
  if options.debug:
    gft_common.SetDebugLevel(options.debug)

  # Reject an unknown format before doing any work, so a typo is not only
  # reported after the hardware has been probed and the report created.
  # NOTE(review): presumably ErrorDie() never returns -- confirm in gft_common.
  if options.report_format not in ('gz', 'base64'):
    ErrorDie('gft_report: invalid report format: %s' % options.report_format)

  # Decode
  if options.decode:
    if not options.report_path:
      parser.error('Need --report_path to assign target for decoding.')
    data = gft_common.ReadFile(options.report_path)
    # DecodeReport() tries the known encodings by itself, so the requested
    # report format is not needed to read the data back.
    print(FormatReport(DecodeReport(data)))
    print("Report name: %s" % GetReportName(data))
    return

  # Encode
  if not options.db_path:
    parser.error('Need --db_path.')
  # populate db_path
  db_files = glob.glob(options.db_path)
  if not db_files:
    parser.error('No valid files in --db_path (%s)' % options.db_path)

  # NOTE(review): 'hwcomp' is not imported or defined in the visible part of
  # this file; unless it is provided elsewhere the lookups below raise
  # NameError -- TODO confirm the intended module / object.
  best_match = None
  for db_file in db_files:
    VerboseMsg("gft_report: Matching for %s..." % db_file)
    (probed, failure) = hwcomp.match_current_system(db_file)
    if not failure:
      best_match = probed
      break
  if not best_match:
    # No database matched without failure; fall back to the result of the
    # last database tried.
    best_match = probed
  VerboseMsg('gft_report: Found hardware components: %s' %
             hwcomp.pformat(best_match))

  # create the native report
  main_fw_file = crosfw.LoadMainFirmware().path
  native_report = CreateReport(sys.argv,
                               best_match,
                               verbose_log_path=options.log_path,
                               vpd_source=main_fw_file,
                               verbose=True)
  # Sanity check of the report that was just generated by this program.
  invalid_message = ValidateReport(native_report)
  assert not invalid_message, "Invalid report: %s" % invalid_message

  # write the text report into stderr, without being logged.
  sys.stdout.flush()
  sys.stderr.write(FormatReport(native_report) + '\n')

  # The format was validated above: it is either 'gz' or 'base64'.
  data = EncodeReport(native_report,
                      text_armed=(options.report_format == 'base64'))
  if options.report_path:
    with open(options.report_path, "wb") as report_handle:
      report_handle.write(data)
    # Announce the file only after it has really been written.
    print("Saved report to: %s" % options.report_path)
  else:
    print(data)
  print("Report name: %s" % GetReportName(data))
# Run the console entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
  main()