blob: 28afba55b24ae9caeee8d46198e07a432fc2cfb8 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests a storage device by running the badblocks command.
By default the unused portion of the stateful partition is used. (For
instance, on a device with a 32GB hard drive, cgpt reports that partition 1 is
about 25 GiB, but the size of the filesystem is only about 1 GiB. We
run the test on the unused 24 GiB.)
Alternatively one can specify the use of a file in the filesystem allocated by
the test, or raw mode where a specific file/partition must be provided.
from collections import namedtuple
import logging
import re
from select import select
import subprocess
import threading
import time
import unittest
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test import event_log
from cros.factory.test import factory
from cros.factory.test.i18n import test_ui as i18n_test_ui
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import debug_utils
from cros.factory.utils import file_utils
from cros.factory.utils import sys_utils
HTML = """
<div id="bb-phase" style="font-size: 200%"></div>
<div id="bb-status" style="font-size: 150%"></div>
<div id="bb-progress"></div>
class BadBlocksTest(unittest.TestCase):
# SATA link speed, or None if unknown.
sata_link_speed_mbps = None
ARGS = [
Arg('mode', str, 'String to specify which operating mode to use, '
'currently this supports file, raw or stateful_partition_free_space.',
Arg('device_path', str, 'Override the device path on which to test. '
'Also functions as a file path for file and raw modes.',
Arg('max_bytes', (int, long), 'Maximum size to test, in bytes.',
Arg('max_errors', int, 'Stops testing after the given number of errors.',
default=20, optional=True),
Arg('timeout_secs', (int, float), 'Timeout in seconds for progress lines',
Arg('extra_log_cmd', str,
'Extra command to run at start/finish to collect logs.',
Arg('log_threshold_secs', (int, float),
'If no badblocks output is detected for this long, log an error '
'but do not fail',
Arg('log_interval_secs', int,
'The interval between progress logs in seconds.',
Arg('drop_caches_interval_secs', int,
'The interval between dropping caches in seconds.',
Arg('destructive', bool,
'Do desctructive read / write test. If set to False, '
'the data will be kept after testing, but longer testing time is '
def setUp(self):
self.dut = device_utils.CreateDUTInterface()
self.ui = test_ui.UI()
self.template = ui_templates.TwoSections(self.ui)
# A process that monitors /var/log/messages file.
self.message_monitor = None
def runTest(self):
thread = threading.Thread(target=self._CheckBadBlocks)
def tearDown(self):
# Sync, so that any problems (like writing outside of our partition)
# will show up sooner rather than later.
if self.args.mode == 'file':
self.dut.Call(['rm', '-f', self.args.device_path])
if self.message_monitor:
self.message_monitor = None
def _CheckBadBlocks(self):
def CheckArgs(self):
if self.args.max_bytes:
# We don't want to try running bad blocks on <1kB
self.assertTrue(self.args.max_bytes >= 1024, 'max_bytes too small.')
if self.args.device_path is None:
if self.args.mode == 'raw':
raise ValueError('In raw mode the device_path must be specified.')
self.args.device_path =
if self.args.mode == 'file':
if self.args.device_path.startswith('/dev/'):
# In file mode we want to use the filesystem, not a device node,
# so we default to the stateful partition.
self.args.device_path = '/mnt/stateful_partition/temp_badblocks_file'
if self.args.max_bytes is None:
# Default to 100MB file size for testing.
self.args.max_bytes = 100 * 1024 * 1024
# Add in a file extension, to ensure we are only using our own file.
self.args.device_path = self.args.device_path + '.for_bad_blocks_test'
self.args.max_bytes = self._GenerateTestFile(self.args.device_path,
def DetermineParameters(self):
# TODO(bhthompson): refactor this for a better device type detection.
# pylint: disable=W0201
if self.args.mode == 'file':
unused_mount_on, self._filesystem =
self._filesystem = self.args.device_path
# If 'mmcblk' in self._filesystem assume we are eMMC.
self._is_mmc = 'mmcblk' in self._filesystem
first_block = 0
sector_size = 1024
if self.args.mode == 'file':
last_block = self.args.max_bytes / sector_size'Using a generated file at %s, size %dB, sector size %dB, '
'last block %d.', self.args.device_path, self.args.max_bytes,
sector_size, last_block)
elif self.args.mode == 'raw':
# For some files like dev nodes we cannot trust the stats provided by
# the os, so we manually seek to the end of the file to determine size.
raw_file_bytes = file_utils.GetFileSizeInBytes(self.args.device_path,
if self.args.max_bytes is None or self.args.max_bytes > raw_file_bytes:'Setting max_bytes to the available size of %dB.',
self.args.max_bytes = raw_file_bytes
if self.args.device_path.startswith('/dev/'):
partitions = sys_utils.PartitionManager(self.args.device_path, self.dut)
sector_size = partitions.GetSectorSize()
last_block = self.args.max_bytes / sector_size'Using an existing file at %s, size %dB, sector size %dB, '
'last block %d.', self.args.device_path, self.args.max_bytes,
sector_size, last_block)
elif self.args.mode == 'stateful_partition_free_space':
part_prefix = 'p' if self.args.device_path[-1].isdigit() else ''
# Always partition 1
partition_path = '%s%s1' % (self.args.device_path, part_prefix)
# Determine total length of the FS
dumpe2fs = self.dut.CheckOutput(['dumpe2fs', '-h', partition_path],
log=True)'Filesystem info for header:\n%s', dumpe2fs)
fields = dict(re.findall(r'^(.+):\s+(.+)$', dumpe2fs, re.MULTILINE))
fs_first_block = int(fields['First block'])
fs_block_count = int(fields['Block count'])
fs_block_size = int(fields['Block size'])
partitions = sys_utils.PartitionManager(self.args.device_path, self.dut)
partition_index = 1
start_sector = partitions.GetPartitionOffsetInSector(partition_index)
sector_count = partitions.GetPartitionSizeInSector(partition_index)
sector_size = partitions.GetSectorSize()
# Could get this to work, but for now we assume that fs_block_size is a
# multiple of sector_size.
self.assertEquals(0, fs_block_size % sector_size,
'fs_block_size %d is not a multiple of sector_size %d' %
(fs_block_size, sector_size))
first_unused_sector = (fs_first_block + fs_block_count) * (
fs_block_size / sector_size)
first_block = first_unused_sector + start_sector
sectors_to_test = sector_count - first_unused_sector
if self.args.max_bytes:
sectors_to_test = min(sectors_to_test,
self.args.max_bytes / sector_size)
last_block = first_block + sectors_to_test - 1', '.join(
['%s=%s' % (x, locals()[x])
for x in ['fs_first_block', 'fs_block_count', 'fs_block_size',
'start_sector', 'sector_count',
last_block >= first_block,
'This test requires miniOmaha installed factory test image')
raise ValueError('Invalid mode selected, check test_list mode setting.')
Parameters = namedtuple('Parameters', ('first_block last_block sector_size '
'device_path max_errors'))
return Parameters(first_block, last_block, sector_size,
self.args.device_path, self.args.max_errors)
def _CheckBadBlocksImpl(self):
'badblocks test may not be run within the chroot')
params = self.DetermineParameters()
test_size_mb = '%.1f MiB' % (
(params.last_block - params.first_block + 1) *
params.sector_size / 1024. ** 2)
'Testing {test_size_mb} region of storage',
# Kill any badblocks processes currently running
self.dut.Call(['killall', 'badblocks'])
# -f = force (since the device may be in use)
# -s = show progress
# -v = verbose (print error count)
# -w = destructive write+read test
# -n = non-destructive write+read test
args = '-fsv'
args += 'w' if self.args.destructive else 'n'
process = self.dut.Popen(
['badblocks', args, '-b', str(params.sector_size)] +
(['-e', str(params.max_errors)] if params.max_errors else []) +
[params.device_path, str(params.last_block), str(params.first_block)],
log=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# The total number of phases there will be (8: read and write for each
# of 4 different patterns).
total_phases = 8
# The phase we're currently in (0-relative).
current_phase = 0
# How far we are through the current phase.
fraction_within_phase = 0
buf = []
lines = []
def UpdatePhase():
event_log.Log('start_phase', current_phase=current_phase)
'Phase {current_phase}/{total_phases}: ',
current_phase=min(current_phase + 1, total_phases),
last_drop_caches_time = last_log_time = time.time()
while True:
# Assume no output in timeout_secs means hung on disk op.
start_time = time.time()
rlist, unused_wlist, unused_xlist = select(
[process.stdout], [], [], self.args.timeout_secs)
end_time = time.time()
if end_time - start_time > self.args.log_threshold_secs:
factory.console.warn('Delay of %.2f s between badblocks progress lines',
end_time - start_time)
event_log.Log('delay', duration_secs=end_time - start_time)
'Timeout: No badblocks output for %.2f s' % self.args.timeout_secs)
ch =
if ch in ['', '\x08', '\r', '\n']:
line = ''.join(buf).strip()
if line:
# Log if this is not a progress line or log_interval_secs has passed
# since last log line.
match = re.match(r'([.0-9]+)% done, ', line)
log_elapsed_time = time.time() - last_log_time
if not match or log_elapsed_time > self.args.log_interval_secs:
last_log_time = time.time()'badblocks> %s', line)
match ='([.0-9]+)% done, ', line)
if match:
# The percentage reported is actually the percentage until the last
# block; convert it to an offset within the current phase.
block_offset = (
float( / 100) * (params.last_block + 1)
fraction_within_phase = (block_offset - params.first_block) / float(
params.last_block + 1 - params.first_block)
self.ui.SetHTML(line[match.end():], id='bb-progress')
line = line[:match.start()].strip() # Remove percentage from status
line = line.rstrip(':')
if line and line != 'done':
self.ui.SetHTML(test_ui.Escape(line), id='bb-status')
# Calculate overall percentage done.
fraction_done = (current_phase / float(total_phases) +
max(0, fraction_within_phase) / float(total_phases))
self.template.SetProgressBarValue(round(fraction_done * 100))
if line.startswith('done'):
current_phase += 1
fraction_within_phase = 0
if ch == '':
buf = []
# See if we shuold drop caches.
if (self.args.drop_caches_interval_secs and
(time.time() - last_drop_caches_time >
self.args.drop_caches_interval_secs)):'Dropping caches')
self.dut.WriteFile('/proc/sys/vm/drop_caches', '1')
last_drop_caches_time = time.time()
0, process.wait(),
'badblocks returned with error code %d' % process.returncode)
last_line = lines[-1]
self.assertEquals('Pass completed, 0 bad blocks found. (0/0/0 errors)',
def _GenerateTestFile(self, file_path, file_bytes):
"""Generate a sparse file for testing of a given size.
file_path: String of the path to the file to generate.
file_bytes: Int/Long of the number of bytes to generate.
Int/Long of the number of bytes actually allocated.
Assertion if the containing folder does not exist.
Assertion if the filesystem does not have adequate space.
folder = self.dut.path.dirname(file_path)
self.assertTrue(self.dut.path.isdir(folder), 'Folder does not exist.')
stat = self.dut.CheckOutput(['stat', '-f', '-c', '%a|%s', folder])
(available_blocks, block_size) = map(int, stat.split('|'))
free_bytes = available_blocks * block_size'Detected %dB free space at %s', free_bytes, folder)
# Assume we want at least 10MB free on the file system, so we make a pad.
pad = 10 * 1024 * 1024
if file_bytes > free_bytes + pad:
logging.warn('The file size is too large for the file system, '
'clipping file to %dB.', free_bytes - pad)
file_bytes = free_bytes - pad
self.dut.Call(['truncate', '-s', str(file_bytes), file_path], log=True)
return file_bytes
def _LogSmartctl(self):
# No smartctl on mmc.
# TODO (cychiang) We need to find a replacement
# of smartctl for mmc.
if self._is_mmc:
if self.args.extra_log_cmd:
process = self.dut.Popen(
self.args.extra_log_cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, log=True)
except NotImplementedError:
# ADBLink can't separate stderr and stdout to different PIPE
process = self.dut.Popen(
self.args.extra_log_cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, log=True)
stdout_data, stderr_data = process.communicate()
if stdout_data:'stdout:\n%s', stdout_data)
if stderr_data:'stderr:\n%s', stderr_data)
event_log.Log('log_command', command=self.args.extra_log_cmd,
stdout=stdout_data, stderr=stderr_data)
smartctl_output = self.dut.CheckOutput(
['smartctl', '-a', self._filesystem])
event_log.Log('smartctl', stdout=smartctl_output)'smartctl output: %s', smartctl_output)
'SMART overall-health self-assessment test result: PASSED'
in smartctl_output,
'SMART says drive is not healthy')
def _UpdateSATALinkSpeed(self):
"""Updates the current SATA link speed based on /var/log/messages."""
# No SATA on mmc.
if self._is_mmc:
first_time = self.message_monitor is None
if first_time:
self.message_monitor = self.dut.Popen(['tail', '-f', '/var/log/messages'],
# List of dicts to log.
link_info_events = []
if first_time:
rlist, unused_wlist, unused_xlist = select(
[self.message_monitor.stdout], [], [], self.args.timeout_secs)
if not rlist:
logging.warn('UpdateSATALinkSpeed: Cannot get any line from '
'/var/log/messages after %d seconds',
while True:
rlist, unused_wlist, unused_xlist = select(
[self.message_monitor.stdout], [], [], 0)
if not rlist:
log_line = self.message_monitor.stdout.readline()
if not log_line: # this shouldn't happen
log_line = log_line.strip()
match = re.match(r'(\S)+.+SATA link up ([0-9.]+) (G|M)bps', log_line)
if match:
self.sata_link_speed_mbps = (
int(float( *
(1000 if == 'G' else 1)))
# Copy any ATA-related messages to the test log, and put in event logs.
if not first_time and'\bata[0-9.]+:', log_line):'System log message: %s', log_line)
event_log.Log('system_log_message', log_line=log_line)
if first_time and link_info_events:
# First time, ignore all but the last
link_info_events = link_info_events[-1:]
for event in link_info_events:'SATA link info: %r', event)
event_log.Log('sata_link_info', **event)