# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tools to mount partition in an image or a block device."""
import argparse
import logging
import os
import stat
import struct
import tempfile
import time
from contextlib import contextmanager
import factory_common # pylint: disable=W0611
from cros.factory.utils.process_utils import Spawn
class MountPartitionException(Exception):
"""Exception for MountPartition."""
def MountPartition(source_path, index=None, mount_point=None, rw=False,
is_omaha_channel=False, options=None):
'''Mounts a partition in an image or a block device.
source_path: The image file or a block device.
index: The index of the partition, or None to mount as a single
partition. If source_path is a block device, index must be None.
Note that if is_omaha_channel is set, it is ignored.
mount_point: The mount point. If None, a temporary directory is used.
rw: Whether to mount as read/write.
is_omaha_channel: if it is True and source_path is a file, treats
source_path as a mini-Omaha channel file (kernel+rootfs) and mounts the
rootfs. rootfs offset bytes: 8 + BigEndian(first-8-bytes).
options: A list of options to add to the -o argument when mounting, e.g.,
['offset=8192', 'sizelimit=1048576'].
OSError: if image file or mount point doesn't exist.
subprocess.CalledProcessError: if mount fails.
MountPartitionException: if index is given while source_path is a block
if not mount_point:
mount_point = tempfile.mkdtemp(prefix='mount_partition.')
remove_mount_point = True
remove_mount_point = False
if not os.path.exists(source_path):
raise OSError('Image file %s does not exist' % source_path)
if not os.path.isdir(mount_point):
raise OSError('Mount point %s does not exist', mount_point)
for line in open('/etc/mtab').readlines():
if line.split()[1] == mount_point:
raise OSError('Mount point %s is already mounted' % mount_point)
all_options = ['rw' if rw else 'ro']
# source_path is a block device.
if stat.S_ISBLK(os.stat(source_path).st_mode):
if index:
raise MountPartitionException('index must be None for a block device.')
if is_omaha_channel:
raise MountPartitionException(
'is_omaha_channel must be False for a block device.')
# Use loop option on image file.
if is_omaha_channel:
with open(source_path, 'rb') as f:
first_8_bytes =
offset = struct.unpack('>Q', first_8_bytes)[0] + 8
all_options.append('offset=%d' % offset)
elif index:
def RunCGPT(option):
'''Runs cgpt and returns the integer result.'''
return int(
Spawn(['cgpt', 'show', '-i', str(index),
option, source_path],
read_stdout=True, check_call=True).stdout_data)
offset = RunCGPT('-b') * 512
all_options.append('offset=%d' % offset)
sizelimit = RunCGPT('-s') * 512
all_options.append('sizelimit=%d' % sizelimit)
if options:
Spawn(['mount', '-o', ','.join(all_options), source_path, mount_point],
log=True, check_call=True, sudo=True)
def Unmounter():
yield mount_point
finally:'Unmounting %s', mount_point)
for _ in range(5):
if Spawn(['umount', mount_point], call=True, sudo=True,
ignore_stderr=True).returncode == 0:
time.sleep(1) # And retry
logging.warn('Unable to umount %s', mount_point)
if remove_mount_point:
except OSError:
return Unmounter()
def main():
parser = argparse.ArgumentParser(
description="Mount a partition in an image file.")
parser.add_argument('-rw', '--rw', action='store_true',
help='mount partition read/write')
parser.add_argument('source_path', help='an image file or a block device')
parser.add_argument('index', type=int, help='partition index')
parser.add_argument('mount_point', help='mount point')
args = parser.parse_args()
MountPartition(args.source_path, args.index, args.mount_point,
if __name__ == '__main__':