blob: bef40ece1ffbc2b3f0c85a66871177632e4563cb [file]
#!/usr/bin/env python
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import logging
import os
import sys
import tempfile
import unittest
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(
__file__.decode(sys.getfilesystemencoding()))))
sys.path.insert(0, ROOT_DIR)
sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party'))
from depot_tools import fix_encoding
from utils import file_path
from utils import fs
import named_cache
def write_file(path, contents):
with open(path, 'wb') as f:
f.write(contents)
def read_file(path):
with open(path, 'rb') as f:
return f.read()
class CacheManagerTest(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp(prefix=u'named_cache_test')
self.manager = named_cache.CacheManager(self.tempdir)
def tearDown(self):
try:
file_path.rmtree(self.tempdir)
finally:
super(CacheManagerTest, self).tearDown()
def make_caches(self, names):
dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
try:
names = map(unicode, names)
for n in names:
self.manager.install(os.path.join(dest_dir, n), n)
self.assertEqual(set(names), set(os.listdir(dest_dir)))
for n in names:
self.manager.uninstall(os.path.join(dest_dir, n), n)
self.assertEqual([], os.listdir(dest_dir))
self.assertTrue(self.manager.available.issuperset(names))
finally:
file_path.rmtree(dest_dir)
def test_get_oldest(self):
with self.manager.open():
self.assertIsNone(self.manager.get_oldest())
self.make_caches(range(10))
self.assertEqual(self.manager.get_oldest(), u'0')
def test_get_timestamp(self):
now = 0
time_fn = lambda: now
with self.manager.open(time_fn=time_fn):
for i in xrange(10):
self.make_caches([i])
now += 1
for i in xrange(10):
self.assertEqual(i, self.manager.get_timestamp(str(i)))
def test_clean_cache(self):
dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
with self.manager.open():
self.assertEqual([], os.listdir(self.manager.root_dir))
a_path = os.path.join(dest_dir, u'a')
b_path = os.path.join(dest_dir, u'b')
self.manager.install(a_path, u'1')
self.manager.install(b_path, u'2')
self.assertEqual({u'a', u'b'}, set(os.listdir(dest_dir)))
self.assertFalse(self.manager.available)
self.assertEqual([], os.listdir(self.manager.root_dir))
write_file(os.path.join(a_path, u'x'), u'x')
write_file(os.path.join(b_path, u'y'), u'y')
self.manager.uninstall(a_path, u'1')
self.manager.uninstall(b_path, u'2')
self.assertEqual(3, len(os.listdir(self.manager.root_dir)))
path1 = os.path.join(self.manager.root_dir, self.manager._lru['1'])
path2 = os.path.join(self.manager.root_dir, self.manager._lru['2'])
self.assertEqual('x', read_file(os.path.join(path1, u'x')))
self.assertEqual('y', read_file(os.path.join(path2, u'y')))
self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1)
self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2)
def test_existing_cache(self):
dest_dir = tempfile.mkdtemp(prefix=u'named_cache_test')
with self.manager.open():
# Assume test_clean passes.
a_path = os.path.join(dest_dir, u'a')
b_path = os.path.join(dest_dir, u'b')
self.manager.install(a_path, u'1')
write_file(os.path.join(dest_dir, u'a', u'x'), u'x')
self.manager.uninstall(a_path, u'1')
# Test starts here.
self.manager.install(a_path, u'1')
self.manager.install(b_path, u'2')
self.assertEqual({'a', 'b'}, set(os.listdir(dest_dir)))
self.assertFalse(self.manager.available)
self.assertEqual(['named'], os.listdir(self.manager.root_dir))
self.assertEqual(
'x', read_file(os.path.join(os.path.join(dest_dir, u'a', u'x'))))
write_file(os.path.join(a_path, 'x'), 'x2')
write_file(os.path.join(b_path, 'y'), 'y')
self.manager.uninstall(a_path, '1')
self.manager.uninstall(b_path, '2')
self.assertEqual(3, len(os.listdir(self.manager.root_dir)))
path1 = os.path.join(self.manager.root_dir, self.manager._lru['1'])
path2 = os.path.join(self.manager.root_dir, self.manager._lru['2'])
self.assertEqual('x2', read_file(os.path.join(path1, 'x')))
self.assertEqual('y', read_file(os.path.join(path2, 'y')))
self.assertEqual(os.readlink(self.manager._get_named_path('1')), path1)
self.assertEqual(os.readlink(self.manager._get_named_path('2')), path2)
def test_trim(self):
with self.manager.open():
item_count = named_cache.MAX_CACHE_SIZE + 10
self.make_caches(range(item_count))
self.assertEqual(len(self.manager), item_count)
self.manager.trim(None)
self.assertEqual(len(self.manager), named_cache.MAX_CACHE_SIZE)
self.assertEqual(
set(map(str, xrange(10, 10 + named_cache.MAX_CACHE_SIZE))),
set(os.listdir(os.path.join(self.tempdir, 'named'))))
def test_corrupted(self):
with open(os.path.join(self.tempdir, u'state.json'), 'w') as f:
f.write('}}}}')
fs.makedirs(os.path.join(self.tempdir, 'a'), 0777)
with self.manager.open():
self.assertFalse(os.path.isdir(self.tempdir))
self.make_caches(['a'])
self.assertTrue(fs.islink(os.path.join(self.tempdir, 'named', 'a')))
if __name__ == '__main__':
fix_encoding.fix_encoding()
VERBOSE = '-v' in sys.argv
logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
unittest.main()