blob: 2bf26f195dc21cfb09578c9fdefd02891ec9109b [file] [log] [blame]
# Copyright 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Performs consecutive read/write operations on a single file.
Description
-----------
This pytest performs consecutive read/write operations on a single file under a
specific directory or mount point. The primary purpose is to keep storage busy
for some reliability test. It can also be used as a simple approach to stress a
storage device as well.
Since it operates on a single file under user space, controls over which block
is exercising are limited (e.g., it might always write on the same block.)
In this sense, this test should be considered as a test with limited coverage.
Also noted that, an unexpected abortion will leave the temporary file uncleaned
in the specified directory or mount point.
Please also refer to the pytest `removable_storage`, which might also be useful
to test a storage device.
Test Procedure
--------------
This is an automated test without user interaction.
Dependency
----------
Use `toybox` and `dd` to perform read/write operations.
Use `/dev/urandom` to generate random data for write.
Examples
--------
To test read/write a 10MB file under `/home/root`, add this in test list::
{
"pytest_name": "storage_simple_stress",
"args": {
"operations": 1,
"dir": "/home/root",
"file_size": 10485760
}
}
To test read/write of a 100MB file 3 times for block device 'mmcblk1p1'::
{
"pytest_name": "storage_simple_stress",
"args": {
"operations": 3,
"mount_device": "/dev/block/mmcblk1p1",
"dir": ".",
"file_size": 104857600
}
}
"""
import logging
import time
import unittest
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import sys_utils
BLOCK_SIZE = 4096
class SimpleStorageStressTest(unittest.TestCase):
ARGS = [
Arg('dir', str, 'Directory for creating files for random access.'),
Arg('file_size', int,
'The file size of generated file.'),
Arg('operations', int,
'The number of operations to perform.'),
Arg('mount_device', str, 'If not None, we mount the given device first '
'to a temp directory, and perform reading / writing under the '
"directory. The arugment 'dir' will be used as the relative path "
'under the mount point.', default=None),
]
def setUp(self):
self._dut = device_utils.CreateDUTInterface()
def ReadWriteFile(self, test_file, file_size):
"""Performs a read/write to a specific file."""
with self._dut.temp.TempFile() as data_file:
# Prepare a random content.
logging.info('Preparing data.')
self._dut.CheckCall(
['toybox', 'dd', 'if=/dev/urandom',
'of=%s' % data_file, 'bs=%d' % file_size,
'count=1', 'conv=sync'])
# perform write operation.
logging.info('Performing write test.')
start_time = time.time()
self._dut.CheckCall(
['toybox', 'dd', 'if=%s' % data_file, 'of=%s' % test_file,
'bs=%d' % BLOCK_SIZE, 'conv=fsync'])
write_time = time.time() - start_time
# Drop cache to ensure the system do a real read.
logging.debug('Memory usage before drop_caches = %s',
self._dut.CheckOutput(['free', '-m']))
# For the constant, please refer to 'man drop_caches'
self._dut.WriteFile('/proc/sys/vm/drop_caches', '3')
logging.debug('Memory usage after drop_caches = %s',
self._dut.CheckOutput(['free', '-m']))
# perform read operation.
logging.info('Performing read test.')
start_time = time.time()
self._dut.CheckCall(
'toybox dd if=%s | toybox cmp %s -' % (test_file, data_file))
read_time = time.time() - start_time
logging.info('Write time=%.3f secs', write_time)
logging.info('Read time=%.3f secs', read_time)
return True
def TestReadWriteIn(self, dirpath):
file_size = self.args.file_size
for iteration in xrange(self.args.operations):
with self._dut.temp.TempFile(dir=dirpath) as temp_file:
logging.info(
'[%d/%d]: Tempfile[%s] created for %d bytes write/read test',
iteration, self.args.operations, temp_file, file_size)
self.ReadWriteFile(temp_file, file_size)
def runTest(self):
if self.args.mount_device:
with sys_utils.MountPartition(
self.args.mount_device, rw=True, dut=self._dut) as mount_path:
self.TestReadWriteIn(self._dut.path.join(mount_path, self.args.dir))
else:
self.TestReadWriteIn(self.args.dir)