blob: 1d747259b5107addc5d9837fbbc01e87f52b9cd3 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import logging
import math
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import unittest
import mock
import six
import test_env_api
test_env_api.setup_test_env()
from api import platforms
from depot_tools import auto_stub
from utils import file_path
# Disable caching before importing os_utilities.
from utils import tools
tools.cached = lambda func: func
import os_utilities
class TestOsUtilities(auto_stub.TestCase):
def setUp(self):
super(TestOsUtilities, self).setUp()
tools.clear_cache_all()
def tearDown(self):
super(TestOsUtilities, self).tearDown()
tools.clear_cache_all()
def test_get_os_name(self):
expected = (u'Debian', u'Linux', u'Mac', u'Raspbian', u'Ubuntu', u'Windows')
self.assertIn(os_utilities.get_os_name(), expected)
def test_get_cpu_type(self):
actual = os_utilities.get_cpu_type()
if actual == u'x86':
return
self.assertTrue(actual.startswith(u'arm'), actual)
@unittest.skipUnless(
sys.platform.startswith('linux'), 'this is only for linux')
def test_get_os_values_linux(self):
with mock.patch(
'platforms.linux.get_os_version_number', lambda: '16.04.6'), mock.patch(
'os_utilities.get_os_name', lambda: 'Ubuntu'):
self.assertEqual(
os_utilities.get_os_values(),
['Linux', 'Ubuntu', 'Ubuntu-16', 'Ubuntu-16.04', 'Ubuntu-16.04.6'])
@unittest.skipUnless(sys.platform == 'darwin', 'this is only for Mac')
def test_get_os_values_mac(self):
with mock.patch(
'platforms.osx.get_os_version_number',
return_value='10.15.5'), mock.patch(
'platforms.osx.get_os_build_version', return_value='19F101'):
self.assertEqual(
os_utilities.get_os_values(),
['Mac', 'Mac-10', 'Mac-10.15', 'Mac-10.15.5', 'Mac-10.15.5-19F101'])
def test_get_cpu_type_mips(self):
self.mock(platform, 'machine', lambda: 'mips64')
self.assertEqual(os_utilities.get_cpu_type(), u'mips')
def test_get_cpu_bitness(self):
expected = (u'32', u'64')
self.assertIn(os_utilities.get_cpu_bitness(), expected)
def test_get_cpu_dimensions(self):
values = os_utilities.get_cpu_dimensions()
self.assertGreater(len(values), 1)
def test_get_cpu_dimensions_mips(self):
if six.PY2:
self.mock(sys, 'platform', 'linux2')
else:
self.mock(sys, 'platform', 'linux')
self.mock(platform, 'machine', lambda: 'mips64')
self.mock(os_utilities, 'get_cpuinfo',
lambda: {u'name': 'Cavium Octeon II V0.1'})
self.mock(sys, 'maxsize', 2**31 - 1)
self.assertEqual(
os_utilities.get_cpu_dimensions(),
[u'mips', u'mips-32', u'mips-32-Cavium_Octeon_II_V0.1'])
def test_parse_intel_model(self):
examples = [
('Intel(R) Core(TM) i5-5200U CPU @ 2.20GHz', 'i5-5200U'),
('Intel(R) Core(TM) i7-2635QM CPU @ 2.00GHz', 'i7-2635QM'),
('Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz', 'i7-4578U'),
('Intel(R) Core(TM)2 Duo CPU P8600 @ 2.40GHz', 'P8600'),
('Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz', 'i7-4870HQ'),
('Intel(R) Core(TM) i7-6700T CPU @ 2.80GHz', 'i7-6700T'),
('Intel(R) Pentium(R) CPU N3710 @ 1.60GHz', 'N3710'),
('Intel(R) Xeon(R) CPU E3-1220 V2 @ 3.10GHz', 'E3-1220 V2'),
('Intel(R) Xeon(R) CPU E3-1230 v3 @ 3.30GHz', 'E3-1230 v3'),
('Intel(R) Xeon(R) CPU E5-2670 0 @ 2.60GHz', 'E5-2670'),
('Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz', 'E5-2697 v2'),
# As generated by platforms.gce.get_cpuinfo():
('Intel(R) Xeon(R) CPU Sandy Bridge GCE', 'Sandy Bridge GCE'),
('Intel(R) Xeon(R) CPU @ 2.30GHz', None),
]
for i, expected in examples:
actual = os_utilities._parse_intel_model(i)
self.assertEqual(expected, actual)
@unittest.skipUnless(six.PY2, 'Python2 only')
def test_get_python_versions_py2(self):
versions = os_utilities.get_python_versions()
self.assertEqual(len(versions), 3)
self.assertEqual(versions[0], u'2')
self.assertEqual(versions[1], u'2.7')
# we don't know which micro version we use in test.
self.assertRegexpMatches(versions[2], u'2.7.[0-9]+')
@unittest.skipUnless(six.PY3, 'Python3 only')
def test_get_python_versions_py3(self):
versions = os_utilities.get_python_versions()
self.assertEqual(len(versions), 3)
self.assertEqual(versions[0], u'3')
self.assertEqual(versions[1], u'3.8')
# we don't know which micro version we use in test.
self.assertRegex(versions[2], u'3.8.[0-9]+')
def test_get_ip(self):
ip = os_utilities.get_ip()
self.assertNotEqual('127.0.0.1', ip)
ipv4 = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
ipv6 = r'^%s$' % ':'.join([r'[0-9a-f]{1,4}'] * 8)
self.assertTrue(re.match(ipv4, ip) or re.match(ipv6, ip), ip)
def test_get_num_processors(self):
self.assertGreater(os_utilities.get_num_processors(), 0)
def test_get_physical_ram(self):
self.assertGreater(os_utilities.get_physical_ram(), 0)
def test_get_disks_info(self):
info = os_utilities.get_disks_info()
self.assertGreater(len(info), 0)
root_path = u'c:\\' if sys.platform == 'win32' else u'/'
root = info[root_path]
# Round the same way.
free_disk = round(
float(file_path.get_free_space(root_path)) / 1024. / 1024., 1)
delta = math.fabs(free_disk - root['free_mb'])
# Check that they are mostly equal. There can be some gitter as there is
# disk I/O during the two calls.
self.assertLess(delta, 2., (delta, free_disk, root['free_mb']))
def test_get_gpu(self):
actual = os_utilities.get_gpu()
self.assertTrue(actual is None or actual)
def test_get_dimensions(self):
dimensions = os_utilities.get_dimensions()
for key, values in dimensions.items():
self.assertIsInstance(key, six.text_type)
self.assertIsInstance(values, list)
for value in values:
self.assertIsInstance(value, six.text_type)
actual = set(dimensions)
# Only set when the process is running in a properly configured GUI context.
actual.discard(u'locale')
# Only set on machines with SSD.
actual.discard(u'ssd')
# There are cases where this dimension is not set.
actual.discard(u'machine_type')
# Only set on ARM Linux machines.
actual.discard(u'device_tree_compatible')
# Only set on bare metal Linux machines.
actual.discard(u'cpu_governor')
# Only set on Windows machines.
actual.discard(u'visual_studio_version')
# Only set on Windows machines.
actual.discard(u'windows_client_version')
expected = {
u'cores', u'cpu', u'gce', u'gpu', u'id', u'os', u'pool', u'python'}
if platforms.is_gce():
expected.add(u'image')
expected.add(u'zone')
expected.add(u'gcp')
if sys.platform == 'darwin':
expected.add(u'mac_model')
# Bot may not have HiDPI and Xcode preinstalled
actual.discard(u'hidpi')
actual.discard(u'xcode_version')
actual.discard(u'device') # iOS devices
if sys.platform.startswith('linux'):
expected.add(u'inside_docker')
expected.add(u'kvm')
if sys.platform == 'win32':
expected.add(u'integrity')
self.assertEqual(expected, actual)
def test_override_id_via_env(self):
mock_env = os.environ.copy()
mock_env['SWARMING_BOT_ID'] = 'customid'
self.mock(os, 'environ', mock_env)
dimensions = os_utilities.get_dimensions()
self.assertIsInstance(dimensions[u'id'], list)
self.assertEqual(len(dimensions[u'id']), 1)
self.assertIsInstance(dimensions[u'id'][0], six.text_type)
self.assertEqual(dimensions[u'id'][0], u'customid')
@unittest.skipIf(sys.platform == 'win32', 'TODO(crbug.com/1017545)')
def test_get_state(self):
actual = os_utilities.get_state()
actual.pop('reboot_required', None)
actual.pop('temp', None)
expected = {
u'audio',
u'cost_usd_hour',
u'cpu_name',
u'cwd',
u'disks',
u'env',
u'gpu',
u'ip',
u'hostname',
u'nb_files_in_temp',
u'pid',
u'python',
u'ram',
u'running_time',
u'ssd',
u'started_ts',
u'uptime',
u'user',
}
if sys.platform in ('cygwin', 'win32'):
expected.add(u'cygwin')
if sys.platform == 'darwin':
expected.add(u'xcode')
if sys.platform == 'win32':
expected.add(u'integrity')
if u'quarantined' in actual:
self.fail(actual[u'quarantined'])
self.assertEqual(expected, set(actual))
def test_get_hostname_gce_docker(self):
self.mock(platforms, 'is_gce', lambda: True)
self.mock(os.path, 'isfile', lambda _: True)
self.mock(socket, 'getfqdn', lambda: 'dockerhost')
self.assertEqual(os_utilities.get_hostname(), 'dockerhost')
def test_get_hostname_gce_nodocker(self):
self.mock(platforms, 'is_gce', lambda: True)
self.mock(os.path, 'isfile', lambda _: False)
manual_mock = not hasattr(platforms, 'gce')
if manual_mock:
# On macOS.
class Mock(object):
def get_metadata(self):
return None
platforms.gce = Mock()
try:
self.mock(platforms.gce, 'get_metadata',
lambda: {'instance': {'hostname': 'gcehost'}})
self.assertEqual(os_utilities.get_hostname(), 'gcehost')
finally:
if manual_mock:
del platforms.gce
def test_get_hostname_nogce(self):
self.mock(platforms, 'is_gce', lambda: False)
self.mock(os.path, 'isfile', lambda _: False)
self.mock(socket, 'getfqdn', lambda: 'somehost')
self.assertEqual(os_utilities.get_hostname(), 'somehost')
def test_get_hostname_macos(self):
self.mock(platforms, 'is_gce', lambda: False)
self.mock(os.path, 'isfile', lambda _: False)
self.mock(socket, 'getfqdn', lambda: 'somehost.in-addr.arpa')
self.mock(socket, 'gethostname', lambda: 'somehost')
self.assertEqual(os_utilities.get_hostname(), 'somehost')
def test_setup_auto_startup_win(self):
# TODO(maruel): Figure out a way to test properly.
pass
def test_setup_auto_startup_osx(self):
# TODO(maruel): Figure out a way to test properly.
pass
def test_host_reboot(self):
class Foo(Exception):
pass
def raise_exception(x):
raise x
self.mock(subprocess, 'check_call', lambda _: None)
self.mock(time, 'sleep', lambda _: raise_exception(Foo()))
self.mock(logging, 'error', lambda *_: None)
with self.assertRaises(Foo):
os_utilities.host_reboot()
def test_host_reboot_and_return(self):
self.mock(subprocess, 'check_call', lambda _: None)
self.assertIs(True, os_utilities.host_reboot_and_return())
def test_host_reboot_and_return_with_message(self):
self.mock(subprocess, 'check_call', lambda _: None)
self.assertIs(True, os_utilities.host_reboot_and_return(message='Boo'))
def test_host_reboot_with_timeout(self):
self.mock(subprocess, 'check_call', lambda _: None)
self.mock(logging, 'error', lambda *_: None)
now = [0]
def mock_sleep(dt):
now[0] += dt
self.mock(time, 'sleep', mock_sleep)
self.mock(time, 'time', lambda: now[0])
self.assertFalse(os_utilities.host_reboot(timeout=60))
self.assertEqual(time.time(), 60)
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
unittest.main()