blob: 080da1efe230d800cbb0c5b54264adafb8fbc8c5 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import shutil
import sys
import tempfile
import traceback
import yaml
from cros.factory.test.env import paths
from cros.factory.utils import file_utils
from cros.factory.utils.process_utils import Spawn
DESCRIPTION = '''
Audits source hashes logged during system finalization
to verify that no unauthorized changes have been made to
a factory bundle. Hashes are compared against a "golden"
(known to be correct) set of hashes.
Each value for GOLDEN or SAMPLE may be any of the following:
- the 'py' directory of a factory source tree
- a factory log bundle (with name ending in '.tar.xz')
- a factory toolkit (with name ending in '.run')
- an event log file (in which case the last 'source_hashes' event
logged is examined)
For example, to compare multiple factory logs against a "golden"
toolkit:
audit_source_hashes --golden install_factory_toolkit.run logs/*.tar.xz
'''
class AuditException(Exception):
pass
def GetHashes(path):
"""Gets a dictionary of hashes for the given path.
Args:
path: A path to any of the accepted file types (see DESCRIPTION).
Returns:
A dictionary of paths to their hashes.
"""
if os.path.isdir(path):
if os.path.basename(path) != 'py':
raise AuditException(
'%s is a directory, but not a path to a "py" source directory' % path)
return file_utils.HashSourceTree(path)['hashes']
if path.endswith('.run'):
tmpdir = tempfile.mkdtemp(prefix='toolkit.')
try:
# Extract and hash toolkit
Spawn([path, '--tar', '-axf', '-C', tmpdir, './usr/local/factory/py'],
check_call=True, log=True)
return file_utils.HashSourceTree(
os.path.join(tmpdir, 'usr', 'local', 'factory', 'py'))['hashes']
finally:
shutil.rmtree(tmpdir)
# None of the above: it's either an event log file, or a report
# containing an event log file.
if path.endswith('.tar.xz'):
# Looks like a report. Extract the event log data.
proc = Spawn(['tar', '-Oaxf', path, 'events'], read_stdout=True)
if proc.returncode:
raise AuditException(
'Unable to read events from report %s (tar returned %d)' % (
path, proc.returncode))
data = proc.stdout_data
else:
# Assume it's an event log. Look for the event specifically
# to avoid a bunch of unnecessary YAML parsing.
data = file_utils.ReadFile(path)
events = data.split('\n---\n')
for e in reversed(events):
if not e.startswith('EVENT: source_hashes'):
continue
data = yaml.load(e)
hash_function = data.get('hash_function')
if hash_function != file_utils.SOURCE_HASH_FUNCTION_NAME:
raise ValueError(
'Expected hash function %r but got %r' % (
file_utils.SOURCE_HASH_FUNCTION_NAME,
hash_function))
return data['hashes']
raise AuditException(
'No source_hashes event in event log %s' % path)
def FindMismatches(golden_hashes, sample_hashes, sample, out):
error_count = [0]
def ReportLine(path, msg):
if error_count[0] == 0:
out.write('In sample %s:\n' % sample)
out.write('- %s: %s\n' % (path, msg))
error_count[0] += 1
all_keys = sorted(set(sample_hashes.keys()) |
set(golden_hashes.keys()))
for k in all_keys:
sample_value = sample_hashes.get(k)
golden_value = golden_hashes.get(k)
if golden_value is None and sample_value is not None:
ReportLine(k, 'unexpected file encountered in sample')
elif sample_value is None and golden_value is not None:
ReportLine(k, 'missing from sample')
elif sample_value != golden_value:
ReportLine(k, 'hash mismatch (expected %s, found %s)' % (
golden_value, sample_value))
return error_count[0]
def AuditHashes(golden, samples, out):
"""Audits source hashes (see DESCRIPTION).
Args:
golden: Path to golden used for analysis.
samples: A list of samples to compare against.
Returns:
True if all values are correct, False otherwise.
"""
golden_hashes = GetHashes(golden)
total_bad_samples = 0
total_mismatched_hashes = 0
total_exceptions = 0
for s in samples:
try:
sample_hashes = GetHashes(s)
mismatched_hashes = FindMismatches(golden_hashes,
sample_hashes, s, out)
except Exception:
out.write('Error processing sample %s\n' % s)
traceback.print_exc(file=out)
total_exceptions += 1
total_bad_samples += 1
else:
total_mismatched_hashes += mismatched_hashes
if mismatched_hashes:
total_bad_samples += 1
if total_bad_samples:
out.write('\n'
'Found %d mismatched hashes and %d exceptions.\n'
'FAILED (%d/%d samples passed).\n' % (
total_mismatched_hashes,
total_exceptions,
len(samples) - total_bad_samples,
len(samples)))
return False
out.write('PASSED (%d/%d samples passed).\n' % (
len(samples), len(samples)))
return True
def main(argv=None, out=sys.stderr):
parser = argparse.ArgumentParser(
description=DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'--golden', '-g', metavar='GOLDEN',
help='Source of "golden" (correct) hashes; defaults to this source tree')
parser.add_argument(
'samples', metavar='SAMPLE', nargs='+',
help='Samples to check against the golden')
args = parser.parse_args(sys.argv[1:] if argv is None else argv)
logging.basicConfig(level=logging.WARNING)
if args.golden is None:
args.golden = os.path.join(paths.FACTORY_DIR, 'py')
sys.exit(0 if AuditHashes(args.golden, args.samples, out) else 1)
if __name__ == '__main__':
main()