blob: ca7ac95ca6536f88ec2d4fc0a6e77b30bfc69223 [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test Utils library."""
import multiprocessing
import os
import threading
import time
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib.paygen import utils
# We access a lot of protected members during testing.
# pylint: disable=protected-access
# Tests involving the memory semaphore should block this long.
ACQUIRE_TIMEOUT = 120
ACQUIRE_SHOULD_BLOCK_TIMEOUT = 20
class TestUtils(cros_test_lib.TempDirTestCase):
"""Test utils methods."""
@classmethod
def setUpClass(cls) -> None:
"""Class setup to run system polling quickly in semaphore tests."""
utils.MemoryConsumptionSemaphore.SYSTEM_POLLING_INTERVAL_SECONDS = 0
class MockClock:
"""Mock clock that is manually incremented."""
def __call__(self):
"""Return the current mock time."""
return self._now
def __init__(self) -> None:
"""Init the clock."""
self._now = 0.0
def add_time(self, n) -> None:
"""Add some amount of time."""
self._now += n
def mock_get_system_available(self, how_much):
"""Mock the system's available memory, used to override /proc."""
return lambda: how_much
def testRestrictedAttrDictHashing(self) -> None:
"""Tests that RestrictedAttrDict hashing works in various cases."""
# Temporary variables for sanity.
self.assertEqual(
hash(utils.RestrictedAttrDict()), hash(utils.RestrictedAttrDict())
)
self.assertEqual(utils.RestrictedAttrDict(), utils.RestrictedAttrDict())
# Local variables with differing addresses.
a = utils.RestrictedAttrDict()
b = utils.RestrictedAttrDict()
self.assertEqual(hash(a), hash(b))
self.assertEqual(a, b)
def testRestrictedAttrDictHashingDerivedClasses(self) -> None:
"""Tests that RestrictedAttrDict hashing works in derived classes."""
class A(utils.RestrictedAttrDict):
"""Derived class A."""
_slots = ("foo",)
a = A()
class B(utils.RestrictedAttrDict):
"""Derived class B."""
_slots = ("foo",)
b = B()
self.assertNotEqual(hash(a), hash(b))
self.assertNotEqual(a, b)
def testRestrictedAttrDictHashingDerivedClassesUnorderedSlots(self) -> None:
"""Tests that RestrictedAttrDict hashing works
... in derived classes with unordered slots.
"""
class A(utils.RestrictedAttrDict):
"""Derived class A."""
_slots = ("foo", "bar")
a = A()
class B(utils.RestrictedAttrDict):
"""Derived class B."""
_slots = ("bar", "foo")
b = B()
self.assertNotEqual(hash(a), hash(b))
self.assertNotEqual(a, b)
def testRestrictedAttrDictHashingDerivedClassesUnorderedInitialization(
self,
) -> None:
"""Tests that RestrictedAttrDict hashing works
... in derived class with unordered slots initialization.
"""
class A(utils.RestrictedAttrDict):
"""Derived class A."""
_slots = ("foo", "bar")
a1 = A(foo=1, bar=2)
a2 = A(bar=2, foo=1)
self.assertEqual(hash(a1), hash(a2))
self.assertEqual(a1, a2)
def testRestrictedAttrDictHashingDerivedClassesDataStructures(self) -> None:
"""Tests that RestrictedAttrDict hashing works
... in derived class with data structures.
"""
class A(utils.RestrictedAttrDict):
"""Derived class A."""
_slots = ("foo", "bar", "car", "far")
a1 = A(foo={}, bar=set(), car=[], far=tuple())
a2 = A(car=[], bar=set(), foo={}, far=tuple())
self.assertEqual(hash(a1), hash(a2))
self.assertEqual(a1, a2)
a3 = A(foo={}, bar=set(), car=[], far=tuple())
a4 = A(car=1, bar=set(), foo={}, far=tuple())
self.assertNotEqual(hash(a3), hash(a4))
self.assertNotEqual(a3, a4)
def testListdirFullpath(self) -> None:
file_a = os.path.join(self.tempdir, "a")
file_b = os.path.join(self.tempdir, "b")
osutils.Touch(file_a)
osutils.Touch(file_b)
self.assertEqual(
sorted(utils.ListdirFullpath(self.tempdir)), [file_a, file_b]
)
def testReadLsbRelease(self) -> None:
"""Tests that we correctly read the lsb release file."""
path = os.path.join(self.tempdir, "etc", "lsb-release")
osutils.WriteFile(path, "key=value\nfoo=bar\n", makedirs=True)
self.assertEqual(
utils.ReadLsbRelease(self.tempdir), {"key": "value", "foo": "bar"}
)
def testReadMinorVersion(self) -> None:
"""Tests that we correctly read the update_engine.conf file."""
path = os.path.join(self.tempdir, "etc", "update_engine.conf")
osutils.WriteFile(
path, "PAYLOAD_VERSION=2\nPAYLOAD_MINOR_VERSION=6\n", makedirs=True
)
self.assertEqual(utils.ReadMinorVersion(self.tempdir), "6")
def testMassiveMemoryConsumptionSemaphore(self) -> None:
"""Tests that we block on not having enough memory."""
# You should never get 2**64 bytes.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=2**64,
single_proc_max_bytes=2**64,
quiescence_time_seconds=0.0,
)
# You can't get that much.
self.assertEqual(
_semaphore.acquire(ACQUIRE_SHOULD_BLOCK_TIMEOUT).result, False
)
def testNoMemoryConsumptionSemaphore(self) -> None:
"""Tests that you can acquire a very little amount of memory."""
# You should always get one byte.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.0,
)
# Sure you can have two bytes.
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testTotalMaxMemoryConsumptionSemaphore(self) -> None:
"""Tests that the total_max is respected."""
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=0,
single_proc_max_bytes=1,
quiescence_time_seconds=0.0,
total_max=3,
)
# Look at all this memory.
_semaphore._get_system_available = self.mock_get_system_available(
2**64
)
# Sure you can have three.
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
# Nope, you're now over max.
self.assertEqual(_semaphore.acquire(1).result, False)
def testQuiesceMemoryConsumptionSemaphore(self) -> None:
"""Tests that you wait for memory utilization to settle (quiesce)."""
# All you want is two bytes.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=2.0,
)
# Should want two bytes, have a whole lot.
_semaphore._get_system_available = self.mock_get_system_available(
2**64
)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
# Should want two bytes, have a whole lot (but you'll block for 2
# seconds).
_semaphore._get_system_available = self.mock_get_system_available(
2**64 - 2
)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testUncheckedMemoryConsumptionSemaphore(self) -> None:
"""Tests that some acquires work unchecked."""
# You should never get 2**64 bytes (i wish...).
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=2**64,
single_proc_max_bytes=2**64,
quiescence_time_seconds=2.0,
unchecked_acquires=2,
)
# Nothing available, but we expect unchecked_acquires to allow it.
_semaphore._get_system_available = self.mock_get_system_available(0)
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
def testQuiescenceUnblocksMemoryConsumptionSemaphore(self) -> None:
"""Test that after a period of time you unblock (due to quiescence)."""
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=2.0,
unchecked_acquires=0,
)
# Make large amount of memory available, but we expect quiescence
# to block the second task.
_semaphore._get_system_available = self.mock_get_system_available(
2**64
)
start_time = time.time()
self.assertEqual(_semaphore.acquire(ACQUIRE_TIMEOUT).result, True)
_semaphore.release()
# Get the lock or die trying. We spin fast here instead of
# ACQUIRE_TIMEOUT.
while not _semaphore.acquire(1).result:
continue
_semaphore.release()
# Check that the lock was acquired after quiescence_time_seconds.
end_time = time.time()
# Why 1.8? Because the clock isn't monotonic and we don't want to flake.
self.assertGreaterEqual(end_time - start_time, 1.8)
def testThreadedMemoryConsumptionSemaphore(self) -> None:
"""Test many threads simultaneously using the Semaphore."""
initial_memory = 6
# These are lists so we can write nonlocal.
mem_avail = [initial_memory]
good_thread_exits = [0]
mock_clock = TestUtils.MockClock()
lock, exit_lock = threading.Lock(), threading.Lock()
test_threads = 8
# Currently executes in 1.6 seconds a 2 x Xeon Gold 6154 CPUs
get_and_releases = 50
def sub_mem() -> None:
with lock:
mem_avail[0] = mem_avail[0] - 1
self.assertGreaterEqual(mem_avail[0], 0)
def add_mem() -> None:
with lock:
mem_avail[0] = mem_avail[0] + 1
self.assertGreaterEqual(mem_avail[0], 0)
def get_mem():
with lock:
return mem_avail[0]
# Ask for two bytes available each time.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.1,
unchecked_acquires=1,
clock=mock_clock,
)
_semaphore._get_system_available = get_mem
def hammer_semaphore() -> None:
for _ in range(get_and_releases):
while not _semaphore.acquire(0.1).result:
continue
# Simulate 'using the memory'.
sub_mem()
time.sleep(0.1)
add_mem()
_semaphore.release()
with exit_lock:
good_thread_exits[0] = good_thread_exits[0] + 1
threads = [
threading.Thread(target=hammer_semaphore)
for _ in range(test_threads)
]
for x in threads:
x.daemon = True
x.start()
# ~Maximum 600 seconds realtime, keeps clock ticking for overall
# timeout.
for _ in range(60000):
time.sleep(0.01)
mock_clock.add_time(0.1)
# Maybe we can break early? (and waste some time for other threads).
threads_dead = [not x.is_alive() for x in threads]
if all(threads_dead):
break
# If we didn't get here a thread did not exit. This is fatal and may
# indicate a deadlock has been introduced.
self.assertEqual(initial_memory, get_mem())
self.assertEqual(good_thread_exits[0], test_threads)
def testMultiProcessedMemoryConsumptionSemaphore(self) -> None:
"""Test many processes simultaneously using the Semaphore."""
initial_memory = 6
mem_avail = multiprocessing.Value("I", initial_memory, lock=True)
good_process_exits = multiprocessing.Value("I", 0, lock=True)
n_processes = 4
# Currently executes in 10 seconds a 2 x Xeon Gold 6154 CPUs.
get_and_releases = 25
def sub_mem() -> None:
with mem_avail.get_lock():
mem_avail.value -= 1
self.assertGreaterEqual(mem_avail.value, 0)
def add_mem() -> None:
with mem_avail.get_lock():
mem_avail.value += 1
self.assertLessEqual(mem_avail.value, 6)
def get_mem():
with mem_avail.get_lock():
return mem_avail.value
# Ask for two bytes available each time.
_semaphore = utils.MemoryConsumptionSemaphore(
system_available_buffer_bytes=1,
single_proc_max_bytes=1,
quiescence_time_seconds=0.1,
unchecked_acquires=1,
)
_semaphore._get_system_available = get_mem
def hammer_semaphore() -> None:
for _ in range(get_and_releases):
while not _semaphore.acquire(0.1).result:
continue
# Simulate 'using the memory'.
sub_mem()
time.sleep(0.1)
add_mem()
_semaphore.release()
with good_process_exits.get_lock():
good_process_exits.value = good_process_exits.value + 1
processes = [
multiprocessing.Process(target=hammer_semaphore)
for _ in range(n_processes)
]
for p in processes:
p.daemon = True
p.start()
for p in processes:
p.join()
# If we didn't get here a proc did not exit. This is fatal and may
# indicate a deadlock has been introduced.
self.assertEqual(initial_memory, get_mem())
with good_process_exits.get_lock():
self.assertEqual(good_process_exits.value, n_processes)