| # Copyright 2018 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Define local cache policies.""" |
| |
| from __future__ import print_function |
| |
| import errno |
| import hashlib |
| import io |
| import logging |
| import os |
| import random |
| import string |
| import subprocess |
| import sys |
| import time |
| |
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import threading_utils
from utils import tools
| |
| # The file size to be used when we don't know the correct file size, |
| # generally used for .isolated files. |
| UNKNOWN_FILE_SIZE = None |
| |
| |
| def file_write(path, content_generator): |
| """Writes file content as generated by content_generator. |
| |
  Creates the intermediate directories as needed.
| |
| Returns the number of bytes written. |
| |
| Meant to be mocked out in unit tests. |
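
  Example (a sketch; the path is illustrative):
    n = file_write('/tmp/out.bin', iter([b'hello ', b'world']))
    assert n == 11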
| """ |
| file_path.ensure_tree(os.path.dirname(path)) |
| total = 0 |
| with fs.open(path, 'wb') as f: |
| for d in content_generator: |
| total += len(d) |
| f.write(d) |
| return total |
| |
| |
def is_valid_file(path, size):
  """Returns True if the given file appears valid.

  Currently it just checks that the file exists and that its size matches the
  expectation.
| """ |
| if size == UNKNOWN_FILE_SIZE: |
| return fs.isfile(path) |
| try: |
| actual_size = fs.stat(path).st_size |
| except OSError as e: |
| logging.warning('Can\'t read item %s, assuming it\'s invalid: %s', |
| os.path.basename(path), e) |
| return False |
| if size != actual_size: |
| logging.warning( |
| 'Found invalid item %s; %d != %d', |
| os.path.basename(path), actual_size, size) |
| return False |
| return True |
| |
| |
| def trim_caches(caches, path, min_free_space, max_age_secs): |
| """Trims multiple caches. |
| |
  The goal is to trim all caches in a coherent LRU fashion, deleting older
  items independently of which container they belong to.

  Two policies are enforced first:
  - max_age_secs
  - min_free_space

  Once that's done, each cache's own policies are enforced.

  Returns:
    List of the sizes of all evicted items.
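
  Example (a sketch; the caches, path and policy values are illustrative):
    evicted = trim_caches(
        [named_cache, isolated_cache], '/b/cache',
        min_free_space=1024**3, max_age_secs=7 * 24 * 60 * 60)
    logging.info('Evicted %d bytes', sum(evicted))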
| """ |
| min_ts = time.time() - max_age_secs if max_age_secs else 0 |
| free_disk = file_path.get_free_space(path) if min_free_space else 0 |
| logging_utils.user_logs( |
| "Trimming caches. min_ts: %d, free_disk: %d, min_free_space: %d", min_ts, |
| free_disk, min_free_space) |
| total = [] |
| if min_ts or free_disk: |
| while True: |
| oldest = [(c, c.oldest_evictable_ts()) for c in caches if len(c) > 0] |
      # Remove entries whose oldest_evictable_ts() returned None.
| oldest = [(c, age) for c, age in oldest if age] |
| if not oldest: |
| break |
| oldest.sort(key=lambda k: k[1]) |
| c, ts = oldest[0] |
| if ts >= min_ts and free_disk >= min_free_space: |
| break |
| total.append(c.remove_oldest_evictable_item()) |
| if min_free_space: |
| free_disk = file_path.get_free_space(path) |
| logging.info("free_disk after removing oldest entries: %d", free_disk) |
| # Evaluate each cache's own policies. |
| for c in caches: |
| logging_utils.user_logs("trimming cache with dir %s", c.cache_dir) |
| total.extend(c.trim()) |
| return total |
| |
| |
| class NamedCacheError(Exception): |
| """Named cache specific error.""" |
| |
| |
| class NoMoreSpace(Exception): |
| """Not enough space to map the whole directory.""" |
| |
| |
| class CachePolicies: |
| def __init__(self, max_cache_size, min_free_space, max_items, max_age_secs): |
| """Common caching policies for the multiple caches (isolated, named, cipd). |
| |
| Arguments: |
| - max_cache_size: Trim if the cache gets larger than this value. If 0, the |
| cache is effectively a leak. |
| - min_free_space: Trim if disk free space becomes lower than this value. If |
| 0, it will unconditionally fill the disk. |
| - max_items: Maximum number of items to keep in the cache. If 0, do not |
| enforce a limit. |
    - max_age_secs: Maximum age an item is kept in the cache until it is
          automatically evicted. Keeping a lot of stale entries slows
          everything down.
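
    Example (a sketch; the values are illustrative only):
      CachePolicies(
          max_cache_size=10 * 1024**3,     # 10 GiB
          min_free_space=1024**3,          # 1 GiB
          max_items=1000,
          max_age_secs=30 * 24 * 60 * 60)  # 30 days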
| """ |
| self.max_cache_size = max_cache_size |
| self.min_free_space = min_free_space |
| self.max_items = max_items |
| self.max_age_secs = max_age_secs |
| |
| def __str__(self): |
| return ('CachePolicies(max_cache_size=%s (%.3f GiB); max_items=%s; ' |
| 'min_free_space=%s (%.3f GiB); max_age_secs=%s)') % ( |
| self.max_cache_size, float(self.max_cache_size) / 1024**3, |
| self.max_items, self.min_free_space, |
| float(self.min_free_space) / 1024**3, self.max_age_secs) |
| |
| |
| class CacheMiss(Exception): |
| """Raised when an item is not in cache.""" |
| def __init__(self, digest): |
| self.digest = digest |
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)
| |
| |
| class Cache: |
| |
| def __init__(self, cache_dir): |
| if cache_dir is not None: |
| assert isinstance(cache_dir, str), cache_dir |
| assert file_path.isabs(cache_dir), cache_dir |
| self.cache_dir = cache_dir |
| self._lock = threading_utils.LockWithAssert() |
| # Profiling values. |
| self._added = [] |
| self._used = [] |
| |
| def __nonzero__(self): |
| """A cache is always True. |
| |
| Otherwise it falls back to __len__, which is surprising. |
| """ |
| return True |
| |
| def __bool__(self): |
| """A cache is always True. |
| |
| Otherwise it falls back to __len__, which is surprising. |
| """ |
| return True |
| |
| def __len__(self): |
| """Returns the number of entries in the cache.""" |
| raise NotImplementedError() |
| |
| def __iter__(self): |
| """Iterates over all the entries names.""" |
| raise NotImplementedError() |
| |
| def __contains__(self, name): |
| """Returns if an entry is in the cache.""" |
| raise NotImplementedError() |
| |
| @property |
| def total_size(self): |
| """Returns the total size of the cache in bytes.""" |
| raise NotImplementedError() |
| |
| @property |
| def added(self): |
| """Returns a list of the size for each entry added.""" |
| with self._lock: |
| return self._added[:] |
| |
| @property |
| def used(self): |
| """Returns a list of the size for each entry used.""" |
| with self._lock: |
| return self._used[:] |
| |
| def oldest_evictable_ts(self): |
| """Returns timestamp of oldest cache entry or None. If there are items in |
| the cache which are not evictable then return None. |
| |
| Returns: |
| Timestamp of the oldest item. |
| |
| Used for manual trimming. |
| """ |
| raise NotImplementedError() |
| |
| def remove_oldest_evictable_item(self): |
| """Removes the oldest item from the cache. If there are items in the cache |
| but none of them are evictable then no item will be removed. |
| Subclasses are free to decide the criteria for whether an item can be |
| evicted. |
| |
| Returns: |
| Size of the oldest item or None. If None is returned then no item was |
| removed from the cache. |
| |
| Used for manual trimming. |
| """ |
| raise NotImplementedError() |
| |
| def save(self): |
| """Saves the current cache to disk.""" |
| raise NotImplementedError() |
| |
| def trim(self): |
| """Enforces cache policies, then calls save(). |
| |
| Returns: |
      List with the size of each evicted item.
| """ |
| raise NotImplementedError() |
| |
| def cleanup(self): |
| """Deletes any corrupted item from the cache, then calls trim(), then |
| save(). |
| |
| It is assumed to take significantly more time than trim(). |
| """ |
| raise NotImplementedError() |
| |
| |
| class ContentAddressedCache(Cache): |
| """Content addressed cache that stores objects temporarily. |
| |
| It can be accessed concurrently from multiple threads, so it should protect |
| its internal state with some lock. |
| """ |
| |
| def __enter__(self): |
| """Context manager interface.""" |
| # TODO(maruel): Remove. |
| return self |
| |
  def __exit__(self, _exc_type, _exc_value, _traceback):
| """Context manager interface.""" |
| # TODO(maruel): Remove. |
| return False |
| |
| def touch(self, digest, size): |
| """Ensures item is not corrupted and updates its LRU position. |
| |
| Arguments: |
| digest: hash digest of item to check. |
| size: expected size of this item. |
| |
| Returns: |
| True if item is in cache and not corrupted. |
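
    Example (a sketch; fetch() is a hypothetical helper):
      if not cache.touch(digest, size):
        cache.write(digest, fetch(digest))  # re-fetch missing/corrupt item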
| """ |
| raise NotImplementedError() |
| |
| def getfileobj(self, digest): |
| """Returns a readable file like object. |
| |
| If file exists on the file system it will have a .name attribute with an |
| absolute path to the file. |
| """ |
| raise NotImplementedError() |
| |
| def write(self, digest, content): |
| """Reads data from |content| generator and stores it in cache. |
| |
| It is possible to write to an object that already exists. It may be |
| ignored (sent to /dev/null) but the timestamp is still updated. |
| |
| Returns digest to simplify chaining. |
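
    Example (a sketch; |chunks| is any iterable of bytes objects):
      digest = cache.write(digest, chunks)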
| """ |
| raise NotImplementedError() |
| |
| |
| class MemoryContentAddressedCache(ContentAddressedCache): |
| """ContentAddressedCache implementation that stores everything in memory.""" |
| |
| def __init__(self, file_mode_mask=0o500): |
| """Args: |
      file_mode_mask: bit mask to AND the file mode with. The default value
          makes all mapped files read-only.
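
    Example (a minimal sketch):
      cache = MemoryContentAddressedCache()
      digest = hashlib.sha256(b'data').hexdigest()
      cache.write(digest, [b'data'])
      assert cache.getfileobj(digest).read() == b'data'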
| """ |
| super(MemoryContentAddressedCache, self).__init__(None) |
| self._file_mode_mask = file_mode_mask |
    # Items in an LRU lookup dict {digest: data}.
| self._lru = lru.LRUDict() |
| |
| # Cache interface implementation. |
| |
| def __len__(self): |
| with self._lock: |
| return len(self._lru) |
| |
| def __iter__(self): |
| # This is not thread-safe. |
| return self._lru.__iter__() |
| |
| def __contains__(self, digest): |
| with self._lock: |
| return digest in self._lru |
| |
| @property |
| def total_size(self): |
| with self._lock: |
| return sum(len(i) for i in self._lru.values()) |
| |
| def oldest_evictable_ts(self): |
| with self._lock: |
| try: |
| # (key, (value, ts)) |
| return self._lru.get_oldest()[1][1] |
| except KeyError: |
| return None |
| |
| def remove_oldest_evictable_item(self): |
| with self._lock: |
| # TODO(maruel): Update self._added. |
| # (key, (value, ts)) |
| return len(self._lru.pop_oldest()[1][0]) |
| |
| def save(self): |
| pass |
| |
| def trim(self): |
| """Trimming is not implemented for MemoryContentAddressedCache.""" |
| return [] |
| |
| def cleanup(self): |
| """Cleaning is irrelevant, as there's no stateful serialization.""" |
| |
| # ContentAddressedCache interface implementation. |
| |
| def touch(self, digest, size): |
| with self._lock: |
| try: |
| self._lru.touch(digest) |
| except KeyError: |
| return False |
| return True |
| |
| def getfileobj(self, digest): |
| with self._lock: |
| try: |
| d = self._lru[digest] |
| except KeyError: |
| raise CacheMiss(digest) |
| self._used.append(len(d)) |
| self._lru.touch(digest) |
| return io.BytesIO(d) |
| |
| def write(self, digest, content): |
| # Assemble whole stream before taking the lock. |
| data = b''.join(content) |
| with self._lock: |
| self._lru.add(digest, data) |
| self._added.append(len(data)) |
| return digest |
| |
| |
| class DiskContentAddressedCache(ContentAddressedCache): |
| """Stateful LRU cache in a flat hash table in a directory. |
| |
  Saves its state as a JSON file.
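
  Example (a sketch; the cache directory and policy values are illustrative):
    policies = CachePolicies(
        max_cache_size=0, min_free_space=0, max_items=0, max_age_secs=0)
    cache = DiskContentAddressedCache('/b/isolated_cache', policies, trim=True)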
| """ |
| STATE_FILE = 'state.json' |
| |
| def __init__(self, cache_dir, policies, trim, time_fn=None): |
| """ |
| Arguments: |
| cache_dir: directory where to place the cache. |
| policies: CachePolicies instance, cache retention policies. |
      trim: if True, enforce |policies| right away. Trimming can also be done
          later by calling trim() explicitly.
| """ |
| # All protected methods (starting with '_') except _path should be called |
| # with self._lock held. |
| super(DiskContentAddressedCache, self).__init__(cache_dir) |
| self.policies = policies |
| self.state_file = os.path.join(cache_dir, self.STATE_FILE) |
    # Items in an LRU lookup dict {digest: size}.
| self._lru = lru.LRUDict() |
| # Current cached free disk space. It is updated by self._trim(). |
| file_path.ensure_tree(self.cache_dir) |
| self._free_disk = file_path.get_free_space(self.cache_dir) |
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the
    # LRU cache are also inherently protected. It could be a set() of all
    # items referenced, but this increases memory usage without a use case.
| self._protected = None |
| # Cleanup operations done by self._load(), if any. |
| self._operations = [] |
| with tools.Profiler('Setup'): |
| with self._lock: |
| self._load(trim, time_fn) |
| |
| # Cache interface implementation. |
| |
| def __len__(self): |
| with self._lock: |
| return len(self._lru) |
| |
| def __iter__(self): |
| # This is not thread-safe. |
| return self._lru.__iter__() |
| |
| def __contains__(self, digest): |
| with self._lock: |
| return digest in self._lru |
| |
| @property |
| def total_size(self): |
| with self._lock: |
| return sum(self._lru.values()) |
| |
| def oldest_evictable_ts(self): |
| with self._lock: |
| try: |
| # (key, (value, ts)) |
| return self._lru.get_oldest()[1][1] |
| except KeyError: |
| return None |
| |
| def remove_oldest_evictable_item(self): |
| with self._lock: |
| # TODO(maruel): Update self._added. |
| return self._remove_lru_file(True) |
| |
| def save(self): |
| with self._lock: |
| return self._save() |
| |
| def trim(self): |
| """Forces retention policies.""" |
| with self._lock: |
| return self._trim() |
| |
| def cleanup(self): |
| """Cleans up the cache directory. |
| |
    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At this point, the cache has already been loaded and trimmed to respect
    cache policies.
| """ |
| logging.info('DiskContentAddressedCache.cleanup(): Cleaning %s', |
| self.cache_dir) |
| with self._lock: |
| fs.chmod(self.cache_dir, 0o700) |
| # Ensure that all files listed in the state still exist and add new ones. |
| previous = set(self._lru) |
| # It'd be faster if there were a readdir() function. |
| for filename in fs.listdir(self.cache_dir): |
| if filename == self.STATE_FILE: |
| fs.chmod(os.path.join(self.cache_dir, filename), 0o600) |
| continue |
| if filename in previous: |
| fs.chmod(os.path.join(self.cache_dir, filename), 0o400) |
| previous.remove(filename) |
| continue |
| |
| # An untracked file. Delete it. |
| logging.warning( |
| 'DiskContentAddressedCache.cleanup(): Removing unknown file %s', |
| filename) |
| p = self._path(filename) |
| if fs.isdir(p): |
| try: |
| file_path.rmtree(p) |
| except OSError: |
| pass |
| else: |
| file_path.try_remove(p) |
| continue |
| |
| if previous: |
| # Filter out entries that were not found. |
| logging.warning( |
| 'DiskContentAddressedCache.cleanup(): Removed %d lost files', |
| len(previous)) |
| for filename in previous: |
| self._lru.pop(filename) |
| self._save() |
| |
    # Verify the hash of every single item to detect corruption. Corrupted
    # files will be evicted.
| total = 0 |
| verified = 0 |
| deleted = 0 |
| logging.info( |
| 'DiskContentAddressedCache.cleanup(): Verifying modified files') |
| with self._lock: |
| for digest, (_, timestamp) in list(self._lru._items.items()): |
| total += 1 |
        # Verify only if the mtime is greater than the timestamp in
        # state.json, to avoid taking too long.
| if self._get_mtime(digest) <= timestamp: |
| continue |
| logging.warning( |
| 'DiskContentAddressedCache.cleanup(): Item has been modified.' |
| ' verifying item: %s', digest) |
| is_valid = self._is_valid_hash(digest) |
| verified += 1 |
| logging.warning( |
| 'DiskContentAddressedCache.cleanup(): verified. is_valid: %s, ' |
| 'item: %s', is_valid, digest) |
| if is_valid: |
| # Update timestamp in state.json |
| self._lru.touch(digest) |
| continue |
| # remove corrupted file from LRU and file system |
| self._lru.pop(digest) |
| self._delete_file(digest, UNKNOWN_FILE_SIZE) |
| deleted += 1 |
| logging.error( |
| 'DiskContentAddressedCache.cleanup(): Deleted corrupted item: %s', |
| digest) |
| self._save() |
| logging.info( |
| 'DiskContentAddressedCache.cleanup(): Verified modified files.' |
| ' total: %d, verified: %d, deleted: %d', total, verified, deleted) |
| |
| # ContentAddressedCache interface implementation. |
| |
| def touch(self, digest, size): |
| """Verifies an actual file is valid and bumps its LRU position. |
| |
| Returns False if the file is missing or invalid. |
| |
    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.
| """ |
| # Do the check outside the lock. |
| looks_valid = is_valid_file(self._path(digest), size) |
| |
| # Update its LRU position. |
| with self._lock: |
| if digest not in self._lru: |
| if looks_valid: |
| # Exists but not in the LRU anymore. |
| self._delete_file(digest, size) |
| return False |
| if not looks_valid: |
| self._lru.pop(digest) |
        # In the LRU but the file on disk is invalid; delete it.
| self._delete_file(digest, size) |
| return False |
| self._lru.touch(digest) |
| self._protected = self._protected or digest |
| return True |
| |
| def getfileobj(self, digest): |
| try: |
| f = fs.open(self._path(digest), 'rb') |
| except IOError: |
| raise CacheMiss(digest) |
| with self._lock: |
| try: |
| self._used.append(self._lru[digest]) |
| except KeyError: |
| # If the digest is not actually in _lru, assume it is a cache miss. |
| # Existing file will be overwritten by whoever uses the cache and added |
| # to _lru. |
| f.close() |
| raise CacheMiss(digest) |
| return f |
| |
| def write(self, digest, content): |
| assert content is not None |
| with self._lock: |
| self._protected = self._protected or digest |
| path = self._path(digest) |
    # A stale broken file may remain. It is possible for the file to have its
    # write bit removed, which would cause the file_write() call to fail to
    # open it in write mode. Take no chances here.
| file_path.try_remove(path) |
| try: |
| size = file_write(path, content) |
| except: |
      # There are two possible places where an exception can occur:
| # 1) Inside |content| generator in case of network or unzipping errors. |
| # 2) Inside file_write itself in case of disk IO errors. |
| # In any case delete an incomplete file and propagate the exception to |
| # caller, it will be logged there. |
| file_path.try_remove(path) |
| raise |
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry pointing to this
    # file becomes read-only. It's fine here because it is a new file.
| file_path.set_read_only(path, True) |
| with self._lock: |
| self._add(digest, size) |
| return digest |
| |
| # Internal functions. |
| |
| def _load(self, trim, time_fn): |
| """Loads state of the cache from json file. |
| |
| If cache_dir does not exist on disk, it is created. |
| """ |
| self._lock.assert_locked() |
| |
| if not fs.isfile(self.state_file): |
| if not fs.isdir(self.cache_dir): |
| fs.makedirs(self.cache_dir) |
| else: |
| # Load state of the cache. |
| try: |
| self._lru = lru.LRUDict.load(self.state_file) |
| except ValueError as err: |
        logging.error('Failed to load cache state: %s', err)
| # Don't want to keep broken cache dir. |
| file_path.rmtree(self.cache_dir) |
| fs.makedirs(self.cache_dir) |
| self._free_disk = file_path.get_free_space(self.cache_dir) |
| if time_fn: |
| self._lru.time_fn = time_fn |
| if trim: |
| self._trim() |
| |
| def _save(self): |
| """Saves the LRU ordering.""" |
| self._lock.assert_locked() |
| if sys.platform != 'win32': |
| d = os.path.dirname(self.state_file) |
| if fs.isdir(d): |
| # Necessary otherwise the file can't be created. |
| file_path.set_read_only(d, False) |
| if fs.isfile(self.state_file): |
| file_path.set_read_only(self.state_file, False) |
| self._lru.save(self.state_file) |
| |
| def _trim(self): |
| """Trims anything we don't know, make sure enough free space exists.""" |
| self._lock.assert_locked() |
| evicted = [] |
| |
| # Trim old items. |
| if self.policies.max_age_secs: |
| cutoff = self._lru.time_fn() - self.policies.max_age_secs |
| while self._lru: |
| oldest = self._lru.get_oldest() |
        # (key, (data, ts))
| if oldest[1][1] >= cutoff: |
| break |
| evicted.append(self._remove_lru_file(True)) |
| |
| # Ensure maximum cache size. |
| if self.policies.max_cache_size: |
| total_size = sum(self._lru.values()) |
| while total_size > self.policies.max_cache_size: |
| e = self._remove_lru_file(True) |
| evicted.append(e) |
| total_size -= e |
| |
| # Ensure maximum number of items in the cache. |
| if self.policies.max_items and len(self._lru) > self.policies.max_items: |
| for _ in range(len(self._lru) - self.policies.max_items): |
| evicted.append(self._remove_lru_file(True)) |
| |
| # Ensure enough free space. |
| self._free_disk = file_path.get_free_space(self.cache_dir) |
| while ( |
| self.policies.min_free_space and |
| self._lru and |
| self._free_disk < self.policies.min_free_space): |
| # self._free_disk is updated by this call. |
| evicted.append(self._remove_lru_file(True)) |
| |
| if evicted: |
| total_usage = sum(self._lru.values()) |
      usage_percent = 0.
      if self.policies.max_cache_size:
        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)
| |
      logging.warning(
          'Trimmed %d file(s) (%.1fkb) to enforce cache policies:'
          ' %.1fkb free, %.1fkb cache (%.1f%% of its maximum capacity of '
          '%.1fkb)', len(evicted),
          sum(evicted) / 1024., self._free_disk / 1024., total_usage / 1024.,
          usage_percent, self.policies.max_cache_size / 1024.)
| self._save() |
| return evicted |
| |
| def _path(self, digest): |
| """Returns the path to one item.""" |
| return os.path.join(self.cache_dir, digest) |
| |
| def _remove_lru_file(self, allow_protected): |
| """Removes the latest recently used file and returns its size. |
| |
| Updates self._free_disk. |
| """ |
| self._lock.assert_locked() |
| try: |
| digest, _ = self._lru.get_oldest() |
| if not allow_protected and digest == self._protected: |
| total_size = sum(self._lru.values()) |
| msg = ('Not enough space to fetch the whole isolated tree.\n' |
| ' %s\n cache=%d bytes (%.3f GiB), %d items; ' |
| '%s bytes (%.3f GiB) free_space') % ( |
| self.policies, total_size, float(total_size) / 1024**3, |
| len(self._lru), self._free_disk, |
| float(self._free_disk) / 1024**3) |
| raise NoMoreSpace(msg) |
| except KeyError: |
| # That means an internal error. |
      raise NoMoreSpace('Nothing to remove; this should not happen')
| digest, (size, _) = self._lru.pop_oldest() |
| logging.debug('Removing LRU file %s with size %s bytes', digest, size) |
| self._delete_file(digest, size) |
| return size |
| |
| def _add(self, digest, size=UNKNOWN_FILE_SIZE): |
| """Adds an item into LRU cache marking it as a newest one.""" |
| self._lock.assert_locked() |
| if size == UNKNOWN_FILE_SIZE: |
| size = fs.stat(self._path(digest)).st_size |
| self._added.append(size) |
| self._lru.add(digest, size) |
| self._free_disk -= size |
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached value. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to
    # map an isolated tree that is larger than the current amount of free
    # disk space when the cache size is already large.
| while (self.policies.min_free_space and self._lru and |
| self._free_disk < self.policies.min_free_space): |
| # self._free_disk is updated by this call. |
| if self._remove_lru_file(False) == -1: |
| break |
| |
| def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE): |
| """Deletes cache file from the file system. |
| |
| Updates self._free_disk. |
| """ |
| self._lock.assert_locked() |
| try: |
| if size == UNKNOWN_FILE_SIZE: |
| try: |
| size = fs.stat(self._path(digest)).st_size |
| except OSError: |
| size = 0 |
| if file_path.try_remove(self._path(digest)): |
| self._free_disk += size |
| except OSError as e: |
| if e.errno != errno.ENOENT: |
        logging.error('Error attempting to delete a file %s:\n%s', digest, e)
| |
| def _get_mtime(self, digest): |
| """Get mtime of cache file.""" |
| return os.path.getmtime(self._path(digest)) |
| |
| def _is_valid_hash(self, digest): |
| """Verify digest with supported hash algos.""" |
| d = hashlib.sha256() |
| with fs.open(self._path(digest), 'rb') as f: |
| while True: |
| chunk = f.read(1024 * 1024) |
| if not chunk: |
| break |
| d.update(chunk) |
| return digest == d.hexdigest() |
| |
| |
| class NamedCache(Cache): |
| """Manages cache directories. |
| |
| A cache entry is a tuple (name, path), where |
| name is a short identifier that describes the contents of the cache, e.g. |
| "git_v8" could be all git repositories required by v8 builds, or |
| "build_chromium" could be build artefacts of the Chromium. |
| path is a directory path relative to the task run dir. Cache installation |
| puts the requested cache directory at the path. |
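
  Example (a sketch; paths and policy values are illustrative):
    policies = CachePolicies(
        max_cache_size=0, min_free_space=0, max_items=50, max_age_secs=0)
    caches = NamedCache('/b/named_caches', policies)
    caches.install('/b/w/task/cache/git_v8', 'git_v8')
    # ... the task runs ...
    caches.uninstall('/b/w/task/cache/git_v8', 'git_v8')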
| """ |
| _DIR_ALPHABET = string.ascii_letters + string.digits |
| STATE_FILE = 'state.json' |
| NAMED_DIR = 'named' |
| |
| def __init__(self, cache_dir, policies, time_fn=None, keep=None): |
| """Initializes NamedCaches. |
| |
| Arguments: |
| - cache_dir is a directory for persistent cache storage. |
| - policies is a CachePolicies instance. |
    - time_fn is a function that returns a timestamp (float); it is used to
      take timestamps when new caches are requested. Used in unit tests.
    - keep: list of str representing cache items which must not be evicted
      from the cache under any circumstances. No cache eviction policy will
      be applied to any of these items.
| """ |
| super(NamedCache, self).__init__(cache_dir) |
| self._policies = policies |
| # LRU {cache_name -> tuple(cache_location, size)} |
| self.state_file = os.path.join(cache_dir, self.STATE_FILE) |
| self._lru = lru.LRUDict() |
| self._keep = set(keep or []) |
| logging.info('NamedCache: keep = %s', keep) |
| if not fs.isdir(self.cache_dir): |
| fs.makedirs(self.cache_dir) |
| elif fs.isfile(self.state_file): |
| try: |
| self._lru = lru.LRUDict.load(self.state_file) |
| for _, size in self._lru.values(): |
| if not isinstance(size, int): |
| with open(self.state_file, 'r') as f: |
| logging.info('named cache state file: %s\n%s', self.state_file, |
| f.read()) |
| raise ValueError("size is not integer: %s" % size) |
| |
| except ValueError: |
| logging.exception( |
| 'NamedCache: failed to load named cache state file; obliterating') |
| file_path.rmtree(self.cache_dir) |
| fs.makedirs(self.cache_dir) |
| self._lru = lru.LRUDict() |
| with self._lock: |
| self._try_upgrade() |
| if time_fn: |
| self._lru.time_fn = time_fn |
| |
| @property |
| def available(self): |
| """Returns a set of names of available caches.""" |
| with self._lock: |
| return set(self._lru) |
| |
| def _sudo_chown(self, path): |
| if sys.platform == 'win32': |
| return |
| uid = os.getuid() |
| if os.stat(path).st_uid == uid: |
| return |
| # Maybe owner of |path| is different from runner of this script. This is to |
| # make fs.rename work in that case. |
| # https://crbug.com/986676 |
| subprocess.check_call(['sudo', '-n', 'chown', str(uid), path]) |
| |
| def install(self, dst, name): |
| """Creates the directory |dst| and moves a previous named cache |name| if it |
| was in the local named caches cache. |
| |
| dst must be absolute, unicode and must not exist. |
| |
| Returns the reused named cache size in bytes, or 0 if none was present. |
| |
    Raises NamedCacheError if the cache cannot be installed.
| """ |
| logging.info('NamedCache.install(%r, %r)', dst, name) |
| with self._lock: |
| try: |
| if fs.isdir(dst): |
| raise NamedCacheError( |
| 'installation directory %r already exists' % dst) |
| |
| # Remove the named symlink if it exists. |
| link_name = self._get_named_path(name) |
| if fs.exists(link_name): |
| # Remove the symlink itself, not its destination. |
| fs.remove(link_name) |
| |
| if name in self._lru: |
| rel_cache, size = self._lru.get(name) |
| abs_cache = os.path.join(self.cache_dir, rel_cache) |
| if fs.isdir(abs_cache): |
| logging.info('- reusing %r; size was %d', rel_cache, size) |
| file_path.ensure_tree(os.path.dirname(dst)) |
| self._sudo_chown(abs_cache) |
| fs.rename(abs_cache, dst) |
| self._remove(name) |
| return size |
| |
| logging.warning('- expected directory %r, does not exist', rel_cache) |
| self._remove(name) |
| |
        # The named cache does not exist, create an empty directory. When
        # uninstalling, we will move it back to the cache and create an
        # entry.
| logging.info('- creating new directory') |
| file_path.ensure_tree(dst) |
| return 0 |
| except (IOError, OSError, PermissionError) as ex: |
| if sys.platform == 'win32': |
| print("There may be running process in cache" |
| " e.g. https://crbug.com/1239809#c14", |
| file=sys.stderr) |
| subprocess.check_call( |
| ["powershell", "get-process | select path,starttime"]) |
| |
| # Raise using the original traceback. |
| exc = NamedCacheError( |
| 'cannot install cache named %r at %r: %s' % (name, dst, ex)) |
| raise exc.with_traceback(sys.exc_info()[2]) |
| finally: |
| self._save() |
| |
| def uninstall(self, src, name): |
| """Moves the cache directory back into the named cache hive for an eventual |
| reuse. |
| |
| The opposite of install(). |
| |
| src must be absolute and unicode. Its content is moved back into the local |
| named caches cache. |
| |
| Returns the named cache size in bytes. |
| |
    Raises NamedCacheError if the cache cannot be uninstalled.
| """ |
| logging.info('NamedCache.uninstall(%r, %r)', src, name) |
| start = time.time() |
| with self._lock: |
| try: |
| if not fs.isdir(src): |
| logging.warning( |
| 'NamedCache: Directory %r does not exist anymore. Cache lost.', |
| src) |
| return |
| |
| if name in self._lru: |
| # This shouldn't happen but just remove the preexisting one and move |
| # on. |
| logging.error('- overwriting existing cache!') |
| self._remove(name) |
| |
| # Calculate the size of the named cache to keep. It's important because |
| # if size is zero (it's empty), we do not want to add it back to the |
| # named caches cache. |
| size = file_path.get_recursive_size(src) |
| logging.info('- Size is %d', size) |
| if not size: |
| # Do not save empty named cache. |
| return size |
| |
| # Move the dir and create an entry for the named cache. |
| rel_cache = self._allocate_dir() |
| abs_cache = os.path.join(self.cache_dir, rel_cache) |
| logging.info('- Moving to %r', rel_cache) |
| file_path.ensure_tree(os.path.dirname(abs_cache)) |
| self._sudo_chown(src) |
| fs.rename(src, abs_cache) |
| |
| self._lru.add(name, (rel_cache, size)) |
| self._added.append(size) |
| |
| # Create symlink <cache_dir>/<named>/<name> -> <cache_dir>/<short name> |
| # for user convenience. |
| named_path = self._get_named_path(name) |
| if fs.exists(named_path): |
| file_path.remove(named_path) |
| else: |
| file_path.ensure_tree(os.path.dirname(named_path)) |
| |
| try: |
| fs.symlink(os.path.join('..', rel_cache), named_path) |
| logging.info( |
| 'NamedCache: Created symlink %r to %r', named_path, abs_cache) |
| except OSError: |
| # Ignore on Windows. It happens when running as a normal user or when |
| # UAC is enabled and the user is a filtered administrator account. |
| if sys.platform != 'win32': |
| raise |
| return size |
| except (IOError, OSError, PermissionError) as ex: |
| # Raise using the original traceback. |
| exc = NamedCacheError( |
| 'cannot uninstall cache named %r at %r: %s' % (name, src, ex)) |
| raise exc.with_traceback(sys.exc_info()[2]) |
| finally: |
| # Call save() at every uninstall. The assumptions are: |
        # - The total number of named caches is low, so the state.json file
| # is small, so the time it takes to write it to disk is short. |
| # - The number of mapped named caches per task is low, so the number of |
| # times save() is called on tear-down isn't high enough to be |
| # significant. |
| # - uninstall() sometimes throws due to file locking on Windows or |
| # access rights on Linux. We want to keep as many as possible. |
| self._save() |
| logging.info('NamedCache.uninstall(%r, %r) took %d seconds', src, name, |
| time.time() - start) |
| |
| # Cache interface implementation. |
| |
| def __len__(self): |
| with self._lock: |
| return len(self._lru) |
| |
| def __iter__(self): |
| # This is not thread-safe. |
| return self._lru.__iter__() |
| |
| def __contains__(self, name): |
| with self._lock: |
| return name in self._lru |
| |
| @property |
| def total_size(self): |
| with self._lock: |
| return sum(size for _rel_path, size in self._lru.values()) |
| |
| def _oldest_evictable_item(self): |
| self._lock.assert_locked() |
| for name, ts in self._lru.items_with_ts(): |
| if name in self._keep: |
| logging.info("NamedCache: '%s' is kept", name) |
| continue |
| return name, ts |
| return None, None |
| |
| def oldest_evictable_ts(self): |
| with self._lock: |
| return self._oldest_evictable_item()[1] |
| |
| def remove_oldest_evictable_item(self): |
| with self._lock: |
| # TODO(maruel): Update self._added. |
| _, size = self._remove_lru_item() |
| if size: |
| return size |
| return None |
| |
| def save(self): |
| with self._lock: |
| return self._save() |
| |
| def touch(self, *names): |
| with self._lock: |
| for name in names: |
| if name in self._lru: |
| self._lru.touch(name) |
| self._save() |
| |
| def trim(self): |
| evicted = [] |
| with self._lock: |
| if not fs.isdir(self.cache_dir): |
| return evicted |
| |
| # Trim according to maximum number of items. |
| if self._policies.max_items: |
| while len(self._lru) > self._policies.max_items: |
| name, size = self._remove_lru_item() |
| if not name: |
| break |
| evicted.append(size) |
| logging.info( |
| 'NamedCache.trim(): Removed %r(%d) due to max_items(%d)', |
| name, size, self._policies.max_items) |
| |
| # Trim according to maximum age. |
| if self._policies.max_age_secs: |
| cutoff = self._lru.time_fn() - self._policies.max_age_secs |
| while self._lru: |
| ts = self._oldest_evictable_item()[1] |
| if not ts: |
| break |
| if ts >= cutoff: |
| break |
| # If we get to this point, self._remove_lru_item will not return None. |
| name, size = self._remove_lru_item() |
| evicted.append(size) |
| logging.info( |
| 'NamedCache.trim(): Removed %r(%d) due to max_age_secs(%d)', |
| name, size, self._policies.max_age_secs) |
| |
| # Trim according to minimum free space. |
| if self._policies.min_free_space: |
| while self._lru: |
| free_space = file_path.get_free_space(self.cache_dir) |
| if free_space >= self._policies.min_free_space: |
| break |
| name, size = self._remove_lru_item() |
| if not name: |
| break |
| evicted.append(size) |
| logging.info( |
| 'NamedCache.trim(): Removed %r(%d) due to min_free_space(%d)', |
| name, size, self._policies.min_free_space) |
| |
| # Trim according to maximum total size. |
| if self._policies.max_cache_size: |
| while self._lru: |
| total = sum(size for _rel_cache, size in self._lru.values()) |
| if total <= self._policies.max_cache_size: |
| break |
| name, size = self._remove_lru_item() |
| if not name: |
| break |
| evicted.append(size) |
| logging.info( |
| 'NamedCache.trim(): Removed %r(%d) due to max_cache_size(%d)', |
| name, size, self._policies.max_cache_size) |
| |
| self._save() |
| return evicted |
| |
| def cleanup(self): |
| """Removes unknown directories. |
| |
| Does not recalculate the cache size since it's surprisingly slow on some |
| OSes. |
| """ |
| logging.info('NamedCache.cleanup(): Cleaning %s', self.cache_dir) |
| success = True |
| with self._lock: |
| try: |
| actual = set(fs.listdir(self.cache_dir)) |
| actual.discard(self.NAMED_DIR) |
| actual.discard(self.STATE_FILE) |
| expected = {v[0]: k for k, v in self._lru.items()} |
| # First, handle the actual cache content. |
| # Remove missing entries. |
| for missing in (set(expected) - actual): |
| name, size = self._lru.pop(expected[missing]) |
| logging.warning( |
| 'NamedCache.cleanup(): Missing on disk %r(%d)', name, size) |
| # Remove unexpected items. |
| for unexpected in (actual - set(expected)): |
| try: |
| p = os.path.join(self.cache_dir, unexpected) |
| logging.warning( |
| 'NamedCache.cleanup(): Unexpected %r', unexpected) |
| if fs.isdir(p) and not fs.islink(p): |
| file_path.rmtree(p) |
| else: |
| fs.remove(p) |
| except (IOError, OSError) as e: |
| logging.error('Failed to remove %s: %s', unexpected, e) |
| success = False |
| |
| # Second, fix named cache links. |
| named = os.path.join(self.cache_dir, self.NAMED_DIR) |
| if fs.isdir(named): |
| actual = set(fs.listdir(named)) |
| expected = set(self._lru) |
| # Confirm entries. Do not add missing ones for now. |
| for name in expected.intersection(actual): |
| p = os.path.join(self.cache_dir, self.NAMED_DIR, name) |
| expected_link = os.path.join('..', self._lru[name][0]) |
| if fs.islink(p): |
| link = fs.readlink(p) |
| if expected_link == link: |
| continue |
| logging.warning( |
| 'Unexpected symlink for cache %s: %s, expected %s', |
| name, link, expected_link) |
| else: |
| logging.warning('Unexpected non symlink for cache %s', name) |
| if fs.isdir(p) and not fs.islink(p): |
| file_path.rmtree(p) |
| else: |
| fs.remove(p) |
| # Remove unexpected items. |
| for unexpected in (actual - expected): |
| try: |
| p = os.path.join(self.cache_dir, self.NAMED_DIR, unexpected) |
| if fs.isdir(p): |
| file_path.rmtree(p) |
| else: |
| fs.remove(p) |
| except (IOError, OSError) as e: |
| logging.error('Failed to remove %s: %s', unexpected, e) |
| success = False |
| finally: |
| self._save() |
| return success |
| |
| # Internal functions. |
| |
| def _try_upgrade(self): |
| """Upgrades from the old format to the new one if necessary. |
| |
    This code can be removed once all bots are known to have the new format.
| """ |
| if not self._lru: |
| return |
| _name, (data, _ts) = self._lru.get_oldest() |
| if isinstance(data, (list, tuple)): |
| return |
| # Update to v2. |
| def upgrade(_name, rel_cache): |
| abs_cache = os.path.join(self.cache_dir, rel_cache) |
| return rel_cache, file_path.get_recursive_size(abs_cache) |
| |
| self._lru.transform(upgrade) |
| self._save() |
| |
| def _remove_lru_item(self): |
| """Removes the oldest LRU entry. LRU must not be empty.""" |
| self._lock.assert_locked() |
| name, _ = self._oldest_evictable_item() |
| if name: |
| _, size = self._lru.get(name) |
| logging.info('Removing named cache %r, %d', name, size) |
| self._remove(name) |
| return name, size |
| return None, None |
| |
| def _allocate_dir(self): |
| """Creates and returns relative path of a new cache directory. |
| |
    In practice, it is a two-character string (letters or digits).
| """ |
| # We randomly generate directory names that have two lower/upper case |
| # letters or digits. Total number of possibilities is (26*2 + 10)^2 = 3844. |
| abc_len = len(self._DIR_ALPHABET) |
| tried = set() |
| while len(tried) < 1000: |
| i = random.randint(0, abc_len * abc_len - 1) |
| rel_path = ( |
| self._DIR_ALPHABET[i // abc_len] + self._DIR_ALPHABET[i % abc_len]) |
| if rel_path in tried: |
| continue |
| abs_path = os.path.join(self.cache_dir, rel_path) |
| if not fs.exists(abs_path): |
| return rel_path |
| tried.add(rel_path) |
| raise NamedCacheError( |
| 'could not allocate a new cache dir, too many cache dirs') |
| |
| def _remove(self, name): |
| """Removes a cache directory and entry. |
| |
| Returns: |
| Number of caches deleted. |
| """ |
| self._lock.assert_locked() |
| # First try to remove the alias if it exists. |
| named_dir = self._get_named_path(name) |
| if fs.islink(named_dir): |
| fs.unlink(named_dir) |
| |
| # Then remove the actual data. |
| if name not in self._lru: |
| return |
| rel_path, _size = self._lru.get(name) |
| abs_path = os.path.join(self.cache_dir, rel_path) |
| if fs.isdir(abs_path): |
| file_path.rmtree(abs_path) |
| self._lru.pop(name) |
| |
| def _save(self): |
| self._lock.assert_locked() |
| file_path.ensure_tree(self.cache_dir) |
| self._lru.save(self.state_file) |
| |
| def _get_named_path(self, name): |
| return os.path.join(self.cache_dir, self.NAMED_DIR, name) |