blob: fe5b76e01ad38275b51804b5f548606b838e64c8 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''A module to provide interface to ChromeOS services.'''
import datetime
import os
import re
import shutil
import struct
import subprocess
import tempfile
import time
# Source of ACPI information on ChromeOS machines. ChromeOSInterface uses this
# directory by default (as its acpi_dir attribute) when building ACPI file
# paths such as the BINF.* boot state files.
ACPI_DIR = '/sys/devices/platform/chromeos_acpi'
class ChromeOSInterfaceError(Exception):
    '''Error raised by ChromeOSInterface when an OS level operation fails.'''
class ChromeOSInterface(object):
    '''An object to encapsulate OS services functions.

    It keeps track of a persistent state directory and a log file inside it,
    and provides helpers to run shell commands and to query ACPI, storage
    device and mount information.
    '''

    def __init__(self, silent):
        '''Object construction time initialization.

        The only parameter is the Boolean 'silent', when True the instance
        does not duplicate log messages on the console.
        '''
        self.silent = silent
        self.state_dir = None
        self.log_file = None
        self.acpi_dir = ACPI_DIR

    def _make_state_dir(self):
        '''Create self.state_dir unless it already exists.

        Raises ChromeOSInterfaceError if the directory can not be created.
        '''
        if os.path.exists(self.state_dir):
            return
        try:
            os.mkdir(self.state_dir)
        except OSError as err:
            raise ChromeOSInterfaceError(err)

    def init(self, state_dir=None, log_file=None):
        '''Initialize the ChromeOS interface object.

        Args:
          state_dir - a string, the name of the directory (as defined by the
                      caller). The contents of this directory persist over
                      system restarts and power cycles.
          log_file - a string, the name of the log file kept in the state
                     directory.

        Default argument values support unit testing.

        Raises ChromeOSInterfaceError if the state directory can not be
        created.
        '''
        self.state_dir = state_dir
        if self.state_dir:
            self._make_state_dir()
            if log_file:
                self.log_file = os.path.join(state_dir, log_file)

    def target_hosted(self):
        '''Return True if running on a ChromeOS target.

        The decision is based on the contents of /proc/version_signature; a
        system where this file can not be read is not considered a target.
        '''
        try:
            with open('/proc/version_signature', 'r') as sig_f:
                signature = sig_f.read()
        except IOError:
            return False
        return re.search(r'chrom(ium|e)os', signature,
                         re.IGNORECASE) is not None

    def state_dir_file(self, file_name):
        '''Get a full path of a file in the state directory.'''
        return os.path.join(self.state_dir, file_name)

    def acpi_file(self, file_name):
        '''Get a full path of a file in the ACPI directory.'''
        return os.path.join(self.acpi_dir, file_name)

    def init_environment(self):
        '''Initialize Chrome OS interface environment.

        If state dir was not set up by init(), create a temp directory,
        otherwise wipe out and recreate the directory defined earlier.

        Return the state directory name.

        Raises ChromeOSInterfaceError when running on a target which has no
        ACPI directory, or when the state directory can not be recreated.
        '''
        if self.target_hosted() and not os.path.exists(self.acpi_dir):
            raise ChromeOSInterfaceError(
                'ACPI directory %s not found' % self.acpi_dir)

        if not self.state_dir:
            self.state_dir = tempfile.mkdtemp(suffix='_saft')
        else:
            # Wipe out state directory, to start the state machine clean.
            shutil.rmtree(self.state_dir)
            # And recreate it. self.log_file already holds the full path of
            # the log, it must not be joined with the directory name again.
            self._make_state_dir()
        return self.state_dir

    def shut_down(self, new_log='/var/saft_log.txt'):
        '''Destroy temporary environment so that the test can be restarted.

        The log file (if any) is copied to new_log before the state
        directory is removed.
        '''
        if self.log_file and os.path.exists(self.log_file):
            shutil.copyfile(self.log_file, new_log)
        if self.state_dir and os.path.exists(self.state_dir):
            shutil.rmtree(self.state_dir)

    def log(self, text):
        '''Write text to the log file and print it on the screen, if enabled.

        The entire log (maintained across reboots) can be found in
        self.log_file.
        '''
        # Don't print on the screen unless enabled.
        if not self.silent:
            print(text)

        if not self.log_file or not os.path.exists(self.state_dir):
            # Called before environment was initialized, ignore.
            return

        timestamp = datetime.datetime.now().strftime('%I:%M:%S %p:')
        with open(self.log_file, 'a') as log_f:
            log_f.write('%s %s\n' % (timestamp, text))

    def exec_exists(self, program):
        '''Check if the passed in string is a valid executable found in PATH.

        Returns False when PATH is not set in the environment.
        '''
        for path in os.environ.get('PATH', '').split(os.pathsep):
            exe_file = os.path.join(path, program)
            if (os.path.isfile(exe_file) or os.path.islink(exe_file)
                ) and os.access(exe_file, os.X_OK):
                return True
        return False

    def run_shell_command(self, cmd):
        '''Run a shell command.

        In case of the command returning an error print its stdout and stderr
        outputs on the console and dump them into the log. Otherwise suppress
        all output.

        In case of command error raise a ChromeOSInterfaceError exception.

        Return the subprocess.Popen() instance to provide access to console
        output in case command succeeded. Its stdout and stderr attributes
        are file objects holding the complete captured output, positioned at
        the beginning.
        '''
        self.log('Executing %s' % cmd)
        # The output is captured in anonymous temporary files and not in
        # pipes: waiting for a child while nobody drains its pipes deadlocks
        # as soon as the child fills up a pipe buffer.
        out_f = tempfile.TemporaryFile()
        err_f = tempfile.TemporaryFile()
        process = subprocess.Popen(cmd, shell=True, stdout=out_f,
                                   stderr=err_f)
        process.wait()
        out_f.seek(0)
        err_f.seek(0)
        # Make the captured output available through the usual attributes.
        process.stdout = out_f
        process.stderr = err_f
        if process.returncode:
            err = ['Failed running: %s' % cmd]
            err.append('stdout:')
            err.append(process.stdout.read())
            err.append('stderr:')
            err.append(process.stderr.read())
            text = '\n'.join(err)
            print(text)
            self.log(text)
            raise ChromeOSInterfaceError('command %s failed' % cmd)
        return process

    def is_removable_device(self, device):
        '''Check if a certain storage device is removable.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9]).

        Returns True if the device is removable, False if not.
        '''
        # Drop everything starting with the first digit in the device name.
        # NOTE(review): names with embedded digits (say mmcblk0p1) are
        # reduced to 'mmcblk' here - confirm if such devices need support.
        dev_name_stripper = re.compile(r'[0-9].*$')
        base_dev = dev_name_stripper.sub('', device.split('/')[2])
        with open('/sys/block/%s/removable' % base_dev, 'r') as rem_f:
            removable = int(rem_f.read())
        return removable == 1

    def get_root_dev(self):
        '''Return a string, the name of the root device'''
        return self.run_shell_command_get_output('rootdev -s')[0]

    def run_shell_command_get_output(self, cmd):
        '''Run shell command and return its console output to the caller.

        The output is returned as a list of strings stripped of the newline
        characters.'''
        process = self.run_shell_command(cmd)
        return [x.rstrip() for x in process.stdout.readlines()]

    def boot_state_vector(self):
        '''Read and return to caller a string describing the system state.

        The string has a form of x:x:x:<removable>:<partition_number>, where
        the x's represent contents of the BINF.0, BINF.1 and BINF.2 files in
        the ACPI directory, <removable> is set to 1 or 0 depending if the
        root device is removable or not, and <partition number> is the last
        character of the root device name, designating the partition where
        the root fs is mounted.

        This vector fully describes the way the system came up.

        Raises AssertionError if a BINF file does not show up in time.
        '''
        binf_fname_template = 'BINF.%d'
        max_wait = 30
        state = []
        for index in range(3):
            fname = os.path.join(self.acpi_dir, binf_fname_template % index)
            cycles = 0
            # In some cases (for instance when running off the flash file
            # system) the ACPI files do not get created right away. Let's
            # give it some time to settle.
            while not os.path.exists(fname):
                if cycles == max_wait:
                    message = '%s is not present' % fname
                    self.log(message)
                    raise AssertionError(message)
                time.sleep(1)
                cycles += 1
            if cycles:
                self.log('ACPI took %d cycles' % cycles)
            with open(fname, 'r') as binf_f:
                state.append(binf_f.read())
        root_dev = self.get_root_dev()
        state.append('%d' % int(self.is_removable_device(root_dev)))
        state.append('%s' % root_dev[-1])
        return ':'.join(state)

    def get_writeable_mount_point(self, dev, tmp_dir):
        '''Get mountpoint of the passed in device mounted in read/write mode.

        If the device is already mounted and is writeable - return its mount
        point. If the device is mounted but read-only - remount it read/write
        and return its mount point. If the device is not mounted - mount it
        read write on the passed in path and return this path.
        '''
        # Mount options are the comma separated list in parentheses.
        options_filter = re.compile(r'.*\((.+)\).*')
        # The device the root file system is mounted on is looked up in the
        # mount output as /dev/root and not under its own name.
        root_dev = self.get_root_dev()
        if dev == root_dev:
            dev = '/dev/root'

        for line in self.run_shell_command_get_output('mount'):
            if not line.startswith('%s ' % dev):
                continue
            # found mounted
            match = options_filter.match(line)
            mount_options = match.group(1) if match else ''
            if 'ro' in mount_options.split(','):
                # mounted read only
                self.run_shell_command('mount -o remount,rw %s' % dev)
            return line.split()[2]  # Mountpoint is the third element.
        # Not found, needs to be mounted
        self.run_shell_command('mount %s %s' % (dev, tmp_dir))
        return tmp_dir

    def retrieve_body_version(self, blob):
        '''Given a blob, retrieve body version.

        Currently works for both, firmware and kernel blobs. Returns -1 in
        case the version can not be retrieved reliably, i.e. when the blob
        does not start with the CHROMEOS magic or is too short to hold the
        header and the preamble.
        '''
        header_format = '<8s8sQ'
        preamble_format = '<40sQ'
        try:
            magic, _, kb_size = struct.unpack_from(header_format, blob)
            if magic != b'CHROMEOS':
                return -1  # This could be a corrupted version case.
            # The third header field is used as the offset of the preamble.
            _, version = struct.unpack_from(preamble_format, blob, kb_size)
        except struct.error:
            return -1  # Truncated blob or offset outside of it.
        return version

    def read_partition(self, partition, size):
        '''Read the requested partition, up to size bytes.

        The data is copied by dd into a temporary file in the state directory
        and returned as a binary string; the temporary file is removed even
        if reading fails.
        '''
        tmp_file = self.state_dir_file('part.tmp')
        try:
            self.run_shell_command('dd if=%s of=%s bs=1 count=%d' % (
                    partition, tmp_file, size))
            with open(tmp_file, 'rb') as fileh:
                return fileh.read()
        finally:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)