blob: 5fe77a66df697d44bb98236fea53088c8598448c [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import tempfile
import time
from contextlib import contextmanager
import factory_common # pylint: disable=W0611
from cros.factory.utils.process_utils import Spawn
def MountPartition(image_file, index=None, mount_point=None, rw=False):
'''Mounts a partition in an image file.
Args:
image_file: The image file.
index: The index of the partition, or None to mount as a single
partition.
mount_point: The mount point for the loopback mount. If None,
a temporary directory is used.
rw: Whether to mount as read/write.
Raises:
OSError: if image file or mount point doesn't exist.
subprocess.CalledProcessError: if mount fails.
'''
if not mount_point:
mount_point = tempfile.mkdtemp(prefix='mount_partition.')
remove_mount_point = True
else:
remove_mount_point = False
if not os.path.exists(image_file):
raise OSError('Image file %s does not exist' % image_file)
if not os.path.isdir(mount_point):
raise OSError('Mount point %s does not exist', mount_point)
for line in open('/etc/mtab').readlines():
if line.split()[1] == mount_point:
raise OSError('Mount point %s is already mounted' % mount_point)
options = '%s,loop' % ('rw' if rw else 'ro')
if index:
def RunCGPT(option):
'''Runs cgpt and returns the integer result.'''
return int(
Spawn(['cgpt', 'show', '-i', str(index),
option, image_file],
read_stdout=True, check_call=True).stdout_data)
offset = RunCGPT('-b') * 512
size = RunCGPT('-s') * 512
options += ',offset=%d,sizelimit=%d' % (offset, size)
Spawn(['mount', '-o', options, image_file, mount_point],
log=True, check_call=True, sudo=True)
@contextmanager
def Unmounter():
try:
yield mount_point
finally:
logging.info('Unmounting %s', mount_point)
for _ in range(5):
if Spawn(['umount', mount_point], call=True, sudo=True,
ignore_stderr=True).returncode == 0:
break
time.sleep(1) # And retry
else:
logging.warn('Unable to umount %s', mount_point)
if remove_mount_point:
try:
os.rmdir(mount_point)
except OSError:
pass
return Unmounter()
def main():
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
description="Mount a partition in an image file.")
parser.add_argument('-rw', '--rw', action='store_true',
help='mount partition read/write')
parser.add_argument('image_file', help='image file')
parser.add_argument('index', type=int, help='partition index')
parser.add_argument('mount_point', help='mount point')
args = parser.parse_args()
MountPartition(args.image_file, args.index, args.mount_point, args.rw)
if __name__ == '__main__':
main()