blob: d8246e48bc66fdc6323ab1173f0d064411b125be [file] [log] [blame]
#!/usr/bin/env python2
#
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for sys_utils module."""
from __future__ import print_function
import os
import subprocess
import tempfile
import unittest
import mox
from six import assertRaisesRegex
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.utils import file_utils
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils import sys_utils
# Sample /proc/interrupts content served to GetInterrupts through the stubbed
# file_utils.ReadLines: a CPU header row, two numeric IRQs ('0', '1') and two
# named ones ('NMI', 'SPU'), each with one counter per CPU.
SAMPLE_INTERRUPTS = """ CPU0 CPU1 CPU2 CPU3
0: 124 0 0 0 IO-APIC-edge timer
1: 19 683 7 14 IO-APIC-edge i8042
NMI: 612 631 661 401 Non-maskable interrupts
SPU: 0 0 0 0 Spurious interrupts"""
class GetInterruptsTest(unittest.TestCase):
  """Unit tests for GetInterrupts."""

  def setUp(self):
    self.mox = mox.Mox()

  def tearDown(self):
    # Restore the stubbed functions even when verification fails; otherwise a
    # single unmet expectation would leave file_utils.ReadLines mocked for
    # every test that runs afterwards.
    try:
      self.mox.VerifyAll()
    finally:
      self.mox.UnsetStubs()

  def testGetCount(self):
    """Per-CPU counters are summed for numeric and named interrupts."""
    self.mox.StubOutWithMock(file_utils, 'ReadLines')
    file_utils.ReadLines('/proc/interrupts').AndReturn(
        SAMPLE_INTERRUPTS.split('\n'))
    self.mox.ReplayAll()

    ints = sys_utils.GetInterrupts()

    # Digit-type interrupts:
    self.assertEqual(ints['0'], 124)
    self.assertEqual(ints['1'], 19 + 683 + 7 + 14)
    # String-type interrupts:
    self.assertEqual(ints['NMI'], 612 + 631 + 661 + 401)
    self.assertEqual(ints['SPU'], 0)

  def testFailToReadProcInterrupts(self):
    """OSError is raised when ReadLines returns None for /proc/interrupts."""
    self.mox.StubOutWithMock(file_utils, 'ReadLines')
    file_utils.ReadLines('/proc/interrupts').AndReturn(None)
    self.mox.ReplayAll()

    assertRaisesRegex(
        self, OSError, r'Unable to read /proc/interrupts',
        sys_utils.GetInterrupts)
# Sample /proc/bus/input/devices content served to GetI2CBus through the
# stubbed file_utils.ReadFile.  Only the two "Atmel maXTouch" entries have a
# Sysfs path containing an i2c-N component (i2c-3 for the touchscreen, i2c-0
# for the touchpad); none of the other devices do.
SAMPLE_INPUT_DEVICES = """I: Bus=0019 Vendor=0000 Product=0005 Version=0000
N: Name="Lid Switch"
P: Phys=PNP0C0D/button/input0
S: Sysfs=/devices/LNXSYSTM:00/device:00/PNP0C0D:00/input/input0
U: Uniq=
H: Handlers=lid_event_handler lid_event_handler event0
B: PROP=0
B: EV=21
B: SW=1
I: Bus=0019 Vendor=0000 Product=0001 Version=0000
N: Name="Power Button"
P: Phys=PNP0C0C/button/input0
S: Sysfs=/devices/LNXSYSTM:00/device:00/PNP0C0C:00/input/input1
U: Uniq=
H: Handlers=kbd event1
B: PROP=0
B: EV=3
B: KEY=10000000000000 0
I: Bus=0019 Vendor=0000 Product=0001 Version=0000
N: Name="Power Button"
P: Phys=LNXPWRBN/button/input0
S: Sysfs=/devices/LNXSYSTM:00/LNXPWRBN:00/input/input2
U: Uniq=
H: Handlers=kbd event2
B: PROP=0
B: EV=3
B: KEY=10000000000000 0
I: Bus=0011 Vendor=0001 Product=0001 Version=ab83
N: Name="AT Translated Set 2 keyboard"
P: Phys=isa0060/serio0/input0
S: Sysfs=/devices/platform/i8042/serio0/input/input3
U: Uniq=
H: Handlers=sysrq kbd event3
B: PROP=0
B: EV=120013
B: KEY=400402000000 3803078f800d001 feffffdfffefffff fffffffffffffffe
B: MSC=10
B: LED=7
I: Bus=0018 Vendor=0000 Product=0000 Version=0000
N: Name="Atmel maXTouch Touchscreen"
P: Phys=i2c-3-004a/input0
S: Sysfs=/devices/platform/80860F41:03/i2c-3/3-004a/input/input4
U: Uniq=
H: Handlers=event4
B: PROP=0
B: EV=b
B: KEY=400 0 0 0 0 0
B: ABS=661800001000003
I: Bus=0018 Vendor=0000 Product=0000 Version=0000
N: Name="Atmel maXTouch Touchpad"
P: Phys=i2c-0-004b/input0
S: Sysfs=/devices/platform/80860F41:00/i2c-0/0-004b/input/input5
U: Uniq=
H: Handlers=event5
B: PROP=5
B: EV=b
B: KEY=e520 10000 0 0 0 0
B: ABS=661800001000003
I: Bus=0000 Vendor=0000 Product=0000 Version=0000
N: Name="HDA Intel HDMI"
P: Phys=ALSA
S: Sysfs=/devices/pci0000:00/0000:00:1b.0/sound/card1/input6
U: Uniq=
H: Handlers=event6
B: PROP=0
B: EV=21
B: SW=140
I: Bus=0000 Vendor=0000 Product=0000 Version=0000
N: Name="HDA Intel HDMI"
P: Phys=ALSA
S: Sysfs=/devices/pci0000:00/0000:00:1b.0/sound/card1/input7
U: Uniq=
H: Handlers=event7
B: PROP=0
B: EV=21
B: SW=140
"""
class GetI2CBusTest(unittest.TestCase):
  """Unit tests for GetI2CBus."""

  def setUp(self):
    self.mox = mox.Mox()

  def tearDown(self):
    # Restore the stubbed functions even when verification fails; otherwise a
    # single unmet expectation would leave file_utils.ReadFile mocked for
    # every test that runs afterwards.
    try:
      self.mox.VerifyAll()
    finally:
      self.mox.UnsetStubs()

  def _ExpectReadInputDevices(self):
    """Stubs file_utils.ReadFile to serve SAMPLE_INPUT_DEVICES exactly once.

    Records a single expected read of /proc/bus/input/devices and switches
    the mock into replay mode, so the caller can invoke GetI2CBus right away.
    """
    self.mox.StubOutWithMock(file_utils, 'ReadFile')
    file_utils.ReadFile('/proc/bus/input/devices').AndReturn(
        SAMPLE_INPUT_DEVICES)
    self.mox.ReplayAll()

  def testTouchpad(self):
    """A list containing a non-matching name still finds the touchpad bus."""
    self._ExpectReadInputDevices()
    self.assertEqual(
        sys_utils.GetI2CBus(['Dummy device name', 'Atmel maXTouch Touchpad']),
        0)

  def testTouchscreen(self):
    """The touchscreen sample entry sits on I2C bus 3."""
    self._ExpectReadInputDevices()
    self.assertEqual(sys_utils.GetI2CBus(['Atmel maXTouch Touchscreen']), 3)

  def testNoMatch(self):
    """A name that is absent from the device list yields None."""
    self._ExpectReadInputDevices()
    self.assertIsNone(sys_utils.GetI2CBus(['Unknown Device']))

  def testNonI2CDevice(self):
    """A device that exists but is not on an I2C bus yields None."""
    self._ExpectReadInputDevices()
    self.assertIsNone(sys_utils.GetI2CBus(['Lid Switch']))
# Sample /proc/partitions content served to GetPartitions through the stubbed
# file_utils.ReadLines: a header row followed by one
# "major minor #blocks name" row per block device.
SAMPLE_PARTITIONS = """major minor #blocks name
7 0 313564 loop0
179 0 15388672 mmcblk0
179 1 11036672 mmcblk0p1
179 2 16384 mmcblk0p2
179 3 2097152 mmcblk0p3
179 4 16384 mmcblk0p4
179 5 2097152 mmcblk0p5
179 6 0 mmcblk0p6
179 7 0 mmcblk0p7
179 8 16384 mmcblk0p8
179 9 0 mmcblk0p9
179 10 0 mmcblk0p10
179 11 8192 mmcblk0p11
179 12 16384 mmcblk0p12
179 32 4096 mmcblk0boot1
179 16 4096 mmcblk0boot0
254 0 1048576 dm-0
254 1 313564 dm-1
253 0 1953128 zram0"""
class GetPartitionsTest(unittest.TestCase):
  """Unit tests for GetPartitions."""

  def setUp(self):
    self.mox = mox.Mox()

  def tearDown(self):
    # Restore the stubbed functions even when verification fails; otherwise a
    # single unmet expectation would leave file_utils.ReadLines mocked for
    # every test that runs afterwards.
    try:
      self.mox.VerifyAll()
    finally:
      self.mox.UnsetStubs()

  def testGetPartitions(self):
    """Parses SAMPLE_PARTITIONS and prints every partition returned."""
    self.mox.StubOutWithMock(file_utils, 'ReadLines')
    file_utils.ReadLines('/proc/partitions').AndReturn(
        SAMPLE_PARTITIONS.split('\n'))
    self.mox.ReplayAll()

    partitions = sys_utils.GetPartitions()

    # Smoke test only: nothing is asserted about the parsed values, the test
    # just checks that parsing and text conversion complete without raising.
    for p in partitions:
      print(unicode(p))
class MountDeviceAndReadFileTest(unittest.TestCase):
  """Unit tests for MountDeviceAndReadFile."""

  def setUp(self):
    # Creates a temp file and builds a file system on it to act as a mock
    # device.
    self.device = tempfile.NamedTemporaryFile(prefix='MountDeviceAndReadFile')
    # Registered right away because tearDown is skipped when setUp fails,
    # whereas cleanups still run; closing the file also deletes it.
    self.addCleanup(self.device.close)
    Spawn(['truncate', '-s', '1M', self.device.name], log=True,
          check_call=True)

    # In CrOS chroot, mkfs and mkfs.extX may live in different locations that
    # a normal user can't run without adding /sbin and /usr/sbin.
    env = os.environ.copy()
    env['PATH'] = '/sbin:/usr/sbin:' + env['PATH']
    Spawn(['/sbin/mkfs', '-E', 'root_owner=%d:%d' % (os.getuid(), os.getgid()),
           '-F', '-t', 'ext3', self.device.name], log=True, check_call=True,
          env=env)

    # Creates a file with some content on the device.
    self.content = 'file content'
    self.file_name = 'file'
    mount_point = tempfile.mkdtemp(prefix='MountDeviceAndReadFileSetup')
    try:
      Spawn(['mount', self.device.name, mount_point], sudo=True,
            check_call=True, log=True)
      try:
        with open(os.path.join(mount_point, self.file_name), 'w') as f:
          f.write(self.content)
      finally:
        # Unmount even if writing the file failed, so that a broken setUp
        # does not leave the image mounted.
        Spawn(['umount', '-l', mount_point], sudo=True, check_call=True,
              log=True)
    finally:
      os.rmdir(mount_point)

    self.dut = device_utils.CreateDUTInterface()

    # Since 'mount' and 'umount' require root privilege, make the link.Shell
    # function execute commands as root.
    # The reason why we don't use 'sudo' in MountPartition() is to keep our
    # code more general, because some Android devices don't have 'sudo'.
    def _SudoShell(command, stdin=None, stdout=None, stderr=None, cwd=None):
      """Runs command through sudo and returns the subprocess.Popen object."""
      if isinstance(command, basestring):
        command = ['sudo', 'sh', '-c', command]
      else:
        command = ['sudo'] + command
      return subprocess.Popen(command, cwd=cwd,
                              close_fds=True, stdin=stdin,
                              stdout=stdout, stderr=stderr)
    self.dut.link.Shell = _SudoShell

  def testMountDeviceAndReadFile(self):
    """The file written in setUp is read back from the image."""
    self.assertEqual(
        self.content,
        sys_utils.MountDeviceAndReadFile(
            self.device.name, self.file_name))

  def testMountDeviceAndReadFileWrongFile(self):
    """Reading a file that is not on the image raises IOError."""
    with self.assertRaises(IOError):
      sys_utils.MountDeviceAndReadFile(self.device.name, 'no_file')

  def testMountDeviceAndReadFileWrongDevice(self):
    """Mounting a device that does not exist raises an exception."""
    with self.assertRaises(Exception):
      sys_utils.MountDeviceAndReadFile('no_device', self.file_name)

  def testMountDeviceAndReadFileWithDUT(self):
    """Same as testMountDeviceAndReadFile, going through the DUT interface."""
    self.assertEqual(
        self.content,
        sys_utils.MountDeviceAndReadFile(
            self.device.name, self.file_name, self.dut))

  def testMountDeviceAndReadFileWrongFileWithDUT(self):
    """Same as the WrongFile case, going through the DUT interface."""
    with self.assertRaises(IOError):
      sys_utils.MountDeviceAndReadFile(self.device.name, 'no_file', self.dut)

  def testMountDeviceAndReadFileWrongDeviceWithDUT(self):
    """Same as the WrongDevice case, going through the DUT interface."""
    with self.assertRaises(Exception):
      sys_utils.MountDeviceAndReadFile('no_device', self.file_name, self.dut)
class TestLogMessagesTest(unittest.TestCase):
  """Unit tests for GetVarLogMessages and GetVarLogMessagesBeforeReboot."""

  def setUp(self):
    self.mox = mox.Mox()

  def tearDown(self):
    # Restore any stubs even when verification fails, so that a failing test
    # cannot leak mocked functions into the tests that run afterwards.
    try:
      self.mox.VerifyAll()
    finally:
      self.mox.UnsetStubs()

  def testGetVarLogMessages(self):
    """The log tail is returned behind a '<truncated N bytes>' marker."""
    data = ("Captain's log.\xFF\n"  # \xFF = invalid UTF-8
            'We are in pursuit of a starship of Ferengi design.\n')
    expected = '<truncated 101 bytes>\n' + data
    # Use max_length=len(data) + 5 so that we'll end up reading
    # (and discarding) the last 5 bytes of garbage X's.
    max_length = len(data) + 5

    with tempfile.NamedTemporaryFile(bufsize=0) as f:
      f.write(('X' * 100) + '\n' + data)

      self.assertEqual(
          expected,
          sys_utils.GetVarLogMessages(max_length=max_length, path=f.name))

      # The result must be the same when reading through a DUT interface.
      dut = device_utils.CreateDUTInterface(board_class='LinuxBoard')
      self.assertEqual(
          expected,
          sys_utils.GetVarLogMessages(
              max_length=max_length, path=f.name, dut=dut))

  def testGetVarLogMessagesBeforeReboot(self):
    """Only the lines logged just before the last reboot are returned."""
    EARLIER_VAR_LOG_MESSAGES = (
        "19:26:17 kernel: That's all, folks.\n"
        "19:26:56 kernel: [ 0.000000] Initializing cgroup subsys cpuset\n"
        "19:26:56 kernel: [ 0.000000] Initializing cgroup subsys cpu\n"
        "19:26:56 kernel: [ 0.000000] Linux version blahblahblah\n")
    VAR_LOG_MESSAGES = (
        "19:00:00 kernel: 7 p.m. and all's well.\n"
        "19:27:17 kernel: That's all, folks.\n"
        "19:27:17 kernel: Kernel logging (proc) stopped.\n"
        "19:27:56 kernel: imklog 4.6.2, log source = /proc/kmsg started.\n"
        "19:27:56 rsyslogd: "
        '[origin software="rsyslogd" blahblahblah] (re)start\n'
        "19:27:56 kernel: [ 0.000000] Initializing cgroup subsys cpuset\n"
        "19:27:56 kernel: [ 0.000000] Initializing cgroup subsys cpu\n"
        "19:27:56 kernel: [ 0.000000] Linux version blahblahblah\n"
        "19:27:56 kernel: [ 0.000000] Command line: blahblahblah\n")
    # Expected output when the last two pre-reboot lines are requested.
    last_two_lines = (
        "19:27:17 kernel: That's all, folks.\n"
        "19:27:17 kernel: Kernel logging (proc) stopped.\n"
        "<after reboot, kernel came up at 19:27:56>\n")

    dut = device_utils.CreateDUTInterface(board_class='LinuxBoard')

    with tempfile.NamedTemporaryFile(bufsize=0) as f:
      f.write(VAR_LOG_MESSAGES)

      # Same request through the DUT interface and through the local path.
      self.assertEqual(
          last_two_lines,
          sys_utils.GetVarLogMessagesBeforeReboot(
              path=f.name, lines=2, dut=dut))
      self.assertEqual(
          last_two_lines,
          sys_utils.GetVarLogMessagesBeforeReboot(path=f.name, lines=2))

      self.assertEqual(
          ("19:27:17 kernel: Kernel logging (proc) stopped.\n"
           "<after reboot, kernel came up at 19:27:56>\n"),
          sys_utils.GetVarLogMessagesBeforeReboot(path=f.name, lines=1))

      # Asking for more lines than exist returns everything before the reboot.
      self.assertEqual(
          ("19:00:00 kernel: 7 p.m. and all's well.\n"
           "19:27:17 kernel: That's all, folks.\n"
           "19:27:17 kernel: Kernel logging (proc) stopped.\n"
           "<after reboot, kernel came up at 19:27:56>\n"),
          sys_utils.GetVarLogMessagesBeforeReboot(path=f.name, lines=100))

    with tempfile.NamedTemporaryFile(bufsize=0) as f:
      f.write(EARLIER_VAR_LOG_MESSAGES)

      self.assertEqual(
          ("19:26:17 kernel: That's all, folks.\n"
           "<after reboot, kernel came up at 19:26:56>\n"),
          sys_utils.GetVarLogMessagesBeforeReboot(path=f.name, lines=1))
class TestGetRunningFactoryPythonArchivePath(unittest.TestCase):
  """Unit tests for GetRunningFactoryPythonArchivePath."""

  def setUp(self):
    self.mox = mox.Mox()
    # Every test overwrites sys_utils.__file__; remember the real value so
    # that tearDown can put it back and later tests see an unmodified module.
    self._saved_module_file = sys_utils.__file__

  def tearDown(self):
    try:
      self.mox.UnsetStubs()
    finally:
      sys_utils.__file__ = self._saved_module_file

  def testNotInPythonArchive(self):
    """A module file that exists on disk is not inside an archive."""
    sys_utils.__file__ = '/path/to/factory/utils/sys_utils.py'
    self.mox.StubOutWithMock(sys_utils.os.path, 'exists')
    sys_utils.os.path.exists(sys_utils.__file__).AndReturn(True)
    self.mox.ReplayAll()

    self.assertIsNone(sys_utils.GetRunningFactoryPythonArchivePath())

    self.mox.VerifyAll()

  def testInPythonFactoryArchive(self):
    """The archive path is returned when the module file lives inside it."""
    factory_par = '/path/to/factory.par'
    sys_utils.__file__ = '/path/to/factory.par/cros/factory/utils/sys_utils.py'
    self.mox.StubOutWithMock(sys_utils.os.path, 'exists')
    sys_utils.os.path.exists(sys_utils.__file__).AndReturn(False)
    sys_utils.os.path.exists(factory_par).AndReturn(True)
    self.mox.ReplayAll()

    self.assertEqual(sys_utils.GetRunningFactoryPythonArchivePath(),
                     factory_par)

    self.mox.VerifyAll()

  def testNonExistingFileWithoutCrosFactoryPrefix(self):
    """A missing module file outside cros/factory yields None."""
    sys_utils.__file__ = '/path/to/nowhere/utils/sys_utils.py'
    self.mox.StubOutWithMock(sys_utils.os.path, 'exists')
    sys_utils.os.path.exists(sys_utils.__file__).AndReturn(False)
    self.mox.ReplayAll()

    self.assertIsNone(sys_utils.GetRunningFactoryPythonArchivePath())

    self.mox.VerifyAll()

  def testNonExistingFactoryPythonArchive(self):
    """None is returned when the candidate archive path does not exist."""
    factory_par = '/path/to/factory.par'
    sys_utils.__file__ = '/path/to/factory.par/cros/factory/utils/sys_utils.py'
    self.mox.StubOutWithMock(sys_utils.os.path, 'exists')
    sys_utils.os.path.exists(sys_utils.__file__).AndReturn(False)
    sys_utils.os.path.exists(factory_par).AndReturn(False)
    self.mox.ReplayAll()

    self.assertIsNone(sys_utils.GetRunningFactoryPythonArchivePath())

    self.mox.VerifyAll()
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()