# blob: 2965b935d6d33c4ea2b3382e45e71656bccf3472 [file]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test gslock library."""
import multiprocessing
import pytest
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import gs
from chromite.lib.paygen import gslock
# We access a lot of protected members during testing.
# pylint: disable=protected-access
def _InProcessAcquire(lock_uri):
    """Make one attempt to take a lock from a worker process; never release.

    Lives at module level because it must be pickleable, which rules out
    defining it as a member of the test class.

    Args:
        lock_uri: URI of the lock to acquire.

    Returns:
        True when this call acquired the lock, False when the attempt raised
        gslock.LockNotAcquired.
    """
    held = gslock.Lock(lock_uri)
    try:
        held.Acquire()
    except gslock.LockNotAcquired:
        return False
    # Intentionally no Release(): this helper only ever acquires.
    return True
def _InProcessDoubleAcquire(lock_uri):
    """Acquire a lock twice in a row from a worker process; never release.

    Lives at module level because it must be pickleable, which rules out
    defining it as a member of the test class.

    Args:
        lock_uri: URI of the lock to acquire.

    Returns:
        Number of Acquire() calls (0, 1 or 2) that completed before
        gslock.LockNotAcquired was raised, if it was raised at all.
    """
    acquired = 0
    lock = gslock.Lock(lock_uri)
    try:
        # Two back-to-back attempts on the same Lock object; the first
        # failure ends the loop with the tally reached so far.
        for _ in range(2):
            lock.Acquire()
            acquired += 1
    except gslock.LockNotAcquired:
        pass
    return acquired
def _InProcessDataUpdate(lock_uri_data_uri):
    """Bump a lock-protected integer stored in a GS file by one.

    Retries until the lock is obtained, so the call is effectively blocking.
    Once the lock is held, the integer at data_uri is either created with the
    value 1 or replaced by its current value plus one; this happens exactly
    once per call.

    Lives at module level because it must be pickleable, which rules out
    defining it as a member of the test class.

    Args:
        lock_uri_data_uri: Tuple of (lock_uri, data_uri). A single tuple is
            used because multiprocessing.Pool.map passes only one argument.
            lock_uri: URI of the lock to acquire.
            data_uri: URI of the data file to create/increment.

    Returns:
        True once the update has been written under the lock.
    """
    lock_uri, data_uri = lock_uri_data_uri
    gs_ctx = gs.GSContext()

    # Spin for as long as the lock cannot be acquired.
    while True:
        try:
            with gslock.Lock(lock_uri):
                # A missing data file means this is the first update.
                value = (
                    int(gs_ctx.Cat(data_uri)) + 1
                    if gs_ctx.Exists(data_uri)
                    else 1
                )
                gs_ctx.CreateWithContents(data_uri, str(value))
                return True
        except gslock.LockNotAcquired:
            continue
class GSLockTest(cros_test_lib.MockTestCase):
    """Tests for gslock.Lock."""

    # Number of parallel workers used by the contention tests. 20 gives good
    # coverage without eating too many resources; raise it (e.g. to 200) for
    # a real stress test.
    NUM_THREADS = 20

    @pytest.mark.network_test
    def setUp(self) -> None:
        self.ctx = gs.GSContext()

    @pytest.mark.network_test
    def testLock(self) -> None:
        """Acquire and release a lock, checking the lock file each step."""
        # Pin the host name so the lock file contents are predictable.
        self.PatchObject(
            cros_build_lib, "MachineDetails", return_value="TestHost"
        )

        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            lock = gslock.Lock(lock_uri)
            self.assertFalse(self.ctx.Exists(lock_uri))

            lock.Acquire()
            self.assertTrue(self.ctx.Exists(lock_uri))
            self.assertEqual(self.ctx.Cat(lock_uri), "TestHost")

            lock.Release()
            self.assertFalse(self.ctx.Exists(lock_uri))

    @pytest.mark.network_test
    def testLockRepetition(self) -> None:
        """Acquire the same Lock object repeatedly."""
        # Pin the host name to a known value.
        self.PatchObject(
            cros_build_lib, "MachineDetails", return_value="TestHost"
        )

        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            lock = gslock.Lock(lock_uri)
            self.assertFalse(self.ctx.Exists(lock_uri))

            # Two acquires in a row, then a single release.
            lock.Acquire()
            self.assertTrue(self.ctx.Exists(lock_uri))
            lock.Acquire()
            self.assertTrue(self.ctx.Exists(lock_uri))
            lock.Release()
            self.assertFalse(self.ctx.Exists(lock_uri))

            # The lock can be taken and dropped again afterwards.
            lock.Acquire()
            self.assertTrue(self.ctx.Exists(lock_uri))
            lock.Release()
            self.assertFalse(self.ctx.Exists(lock_uri))

    @pytest.mark.network_test
    def testLockConflict(self) -> None:
        """Two Lock objects on the same URI must exclude each other."""
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            first = gslock.Lock(lock_uri)
            second = gslock.Lock(lock_uri)

            # Hold `first` explicitly; `second` must fail to acquire.
            first.Acquire()
            with self.assertRaises(gslock.LockNotAcquired):
                second.Acquire()
            first.Release()

            # Hold `second` through a with clause; `first` must fail.
            with second:
                with self.assertRaises(gslock.LockNotAcquired):
                    first.Acquire()

            # A lock that is held can be renewed.
            first.Acquire()
            first.Renew()
            first.Release()

            # Renewing a lock that is not held is an error.
            with self.assertRaises(gslock.LockNotAcquired):
                first.Renew()

    @pytest.mark.network_test
    def testLockTimeout(self) -> None:
        """Acquire a lock while an old, timed-out one is present."""
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            # A negative timeout means both locks always count as timed out.
            stale = gslock.Lock(lock_uri, lock_timeout_mins=-1)
            fresh = gslock.Lock(lock_uri, lock_timeout_mins=-1)

            stale.Acquire()
            fresh.Acquire()

    @pytest.mark.network_test
    def testRaceToAcquire(self) -> None:
        """Race many processes for one lock; exactly one may win."""
        workers = self.NUM_THREADS
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            with multiprocessing.Pool(processes=workers) as pool:
                outcomes = pool.map(_InProcessAcquire, [lock_uri] * workers)

                # The workers only acquire, so remove the lock here.
                self.ctx.Remove(lock_uri)

                # Exactly one worker may report success.
                self.assertEqual(outcomes.count(True), 1)

    @pytest.mark.network_test
    def testRaceToDoubleAcquire(self) -> None:
        """Race many processes to double acquire one lock."""
        workers = self.NUM_THREADS
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            with multiprocessing.Pool(processes=workers) as pool:
                outcomes = pool.map(
                    _InProcessDoubleAcquire, [lock_uri] * workers
                )

                # The workers only acquire, so remove the lock here.
                self.ctx.Remove(lock_uri)

                # One winner with both acquires; everyone else with none.
                self.assertEqual(outcomes.count(0), workers - 1)
                self.assertEqual(outcomes.count(2), 1)

    @pytest.mark.network_test
    def testMultiProcessDataUpdate(self) -> None:
        """Let many processes update one GS file guarded by a lock."""
        workers = self.NUM_THREADS
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            with multiprocessing.Pool(processes=workers) as pool:
                data_uri = lock_uri + ".data"
                outcomes = pool.map(
                    _InProcessDataUpdate, [(lock_uri, data_uri)] * workers
                )

                # Every worker adds one, so the total equals the pool size.
                self.assertEqual(self.ctx.Cat(data_uri), str(workers))

                # Every worker must report success.
                self.assertEqual(outcomes.count(True), workers)

    @pytest.mark.network_test
    def testDryrunLock(self) -> None:
        """A dry-run lock can be acquired and released."""
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            lock = gslock.Lock(lock_uri, dry_run=True)

            self.assertIsNone(lock.Acquire())
            # The lock file is still absent after a dry-run acquire.
            self.assertFalse(self.ctx.Exists(lock_uri))
            self.assertIsNone(lock.Release())

    @pytest.mark.network_test
    def testDryrunLockRepetition(self) -> None:
        """A dry-run lock can be acquired several times over."""
        with gs.TemporaryURL(self.ctx, "gslock") as lock_uri:
            lock = gslock.Lock(lock_uri, dry_run=True)

            self.assertIsNone(lock.Acquire())
            self.assertIsNone(lock.Acquire())
            self.assertIsNone(lock.Release())

            self.assertIsNone(lock.Acquire())
            self.assertIsNone(lock.Release())