| # Copyright 2021 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Some utilities for concurrency.""" |
| |
| from collections import deque |
| import contextlib |
| import logging |
| import multiprocessing |
| import sys |
| import threading |
| import time |
| import traceback |
| |
| LOGGER = logging.getLogger('dockerbuild') |
| |
| |
| class Pool(): |
| """Multithreaded work pool. Dispatch tasks via `apply' to be run concurrently. |
| |
  This class is similar to the multiprocessing.Pool class, but for threads
  instead of processes. It queues up all work requests and then runs them,
  keeping a target number of threads active in parallel. There is no pooling
  of the threads themselves: one thread is created per task. Hence this isn't
  suitable for applications with a large or unbounded number of small tasks,
  but it works fine for our application of O(100) large IO-bound tasks.
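
  Example (an illustrative sketch; `fetch_package' and `urls' are
  hypothetical):

    pool = Pool(cpus=4)
    for url in urls:
      pool.apply(fetch_package, (url,))
    pool.run_all()  # Raises Pool.TaskException if any task failed.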
| """ |
| |
  # Thread-local storage for per-thread attributes such as `thread_index'.
| local = threading.local() |
| |
| class Thread(threading.Thread): |
| """Pool-specific Thread subclass that communicates status to the pool.""" |
| |
| def __init__(self, target, args, event, errors): |
| super(Pool.Thread, self).__init__() |
| |
| self.target = target |
| self.args = args |
| self.event = event |
| self.errors = errors |
| self.active = True |
| self.thread_index = None |
| |
| def run(self): |
      if self.thread_index is None:
        raise Exception(
            'Pool must set thread_index before starting a Thread.')
| Pool.local.thread_index = self.thread_index |
| try: |
| self.target(*self.args) |
| except: # pylint: disable=bare-except |
| # We disable the pylint check as we really do want to catch absolutely |
| # everything here, and propagate back to the main thread. |
| |
        # Record an error string encapsulating the traceback and message of
        # the exception we received, so that the pool can include it in its
        # own error message.
| self.errors.append(''.join(traceback.format_exception(*sys.exc_info()))) |
| finally: |
        # We set this member variable rather than relying on Thread.is_alive()
        # to avoid races where the main thread wakes up in between a worker
        # thread signalling the event and actually exiting.
| self.active = False |
| # Signal the pool controller thread that a task has finished. |
| self.event.set() |
| |
  class TaskException(Exception):
    """Raised by `run_all' when any queued task fails."""
| |
| def __init__(self, cpus=None): |
| self.queued_threads = deque() |
| self.active_threads = [] |
| self.target_active_thread_count = cpus or multiprocessing.cpu_count() |
| self.errors = [] |
| self.event = threading.Event() |
| |
| def apply(self, f, args): |
| """Add a task which runs f(*args) to the pool. |
| |
| This mirrors the multiprocessing.Pool API. |
| """ |
| self.queued_threads.appendleft( |
| Pool.Thread(f, args, self.event, self.errors)) |
| |
| def run_all(self): |
| """Run all enqueued tasks, blocking until all have finished. |
| |
    If a task raises an exception, a `TaskException' carrying its traceback is
    raised in the thread that called `run_all', and no new tasks will be
    launched.
| """ |
| |
| while True: |
| if self.errors: |
| error = self.errors[0] |
| LOGGER.error('Build task failed, exiting') |
| |
| # We could consider waiting for all currently active threads to finish |
| # before re-raising the error from the child, for a cleaner shutdown. In |
| # practice this seems to be fine though. |
| # |
| # We could also consider retrying failed tasks to mitigate the effects |
| # of rare race conditions. |
| raise Pool.TaskException(error) |
| |
| # There's a possible race here where a thread finishes and sets the event |
| # in-between us clearing it and executing the below code, but it's benign |
| # as this loop body simply does nothing if all threads are still active. |
| |
| new_active_threads = [] |
| active_thread_indexes = set() |
| for t in self.active_threads: |
| if t.active: |
| new_active_threads.append(t) |
| active_thread_indexes.add(t.thread_index) |
| self.active_threads = new_active_threads |
| |
      # Each Thread is assigned an index in the range
      # [0, target_active_thread_count), available as Pool.local.thread_index.
      # This can be used to manage resources which persist across tasks but
      # should not be shared between threads.
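      #
      # An illustrative sketch (`CACHES' is a hypothetical list of per-thread
      # caches, sized to `target_active_thread_count'): a task body might do
      #
      #   cache = CACHES[Pool.local.thread_index]
      #
      # to reuse a per-thread cache without any cross-thread locking.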
| available_thread_indexes = [ |
| i for i in reversed(range(self.target_active_thread_count)) |
| if i not in active_thread_indexes |
| ] |
| |
| # Start as many threads as necessary to bring the active thread count back |
| # up to `target_active_thread_count'. |
      new_threads_to_start = min(
          len(self.queued_threads),
          self.target_active_thread_count - len(self.active_threads))
      for _ in range(new_threads_to_start):
        t = self.queued_threads.pop()
        self.active_threads.append(t)
        t.thread_index = available_thread_indexes.pop()
        t.start()
| |
| if not self.active_threads: |
| # We've run out of tasks to run. |
| break |
| |
| self.event.wait() |
| |
| # There's a possible race here where another thread also sets the event in |
| # between the above wait finishing and us clearing it below. But this is |
| # fine - we'll find all finished threads regardless of whether they were |
| # the one to signal us. |
| self.event.clear() |
| |
| |
| class KeyedLock(): |
| """A lock which blocks based on a dynamic set of keys. |
| |
| For any given key, this class acts the same as `Lock'. Useful for guarding |
| access to resources tied to a particular identifier, for example filesystem |
| paths. |
| |
  Note that a unique lock is created for each key and is never deleted until
  the whole KeyedLock is deleted. As such, this class should only be used when
  the set of keys is small and bounded.
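
  Example (an illustrative sketch; `download' and `path' are hypothetical):

    locks = KeyedLock()
    with locks.get(path):
      download(path)  # Only one thread may operate on `path' at a time.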
| """ |
| |
| def __init__(self): |
| self.lock = threading.Lock() |
| self.lock_dict = {} |
| |
  def get(self, key):
    """Returns the lock for `key', creating it on first use."""
    # Lock around this "get-or-insert" operation, to ensure we create a unique
    # lock for every key.
| with self.lock: |
| lock = self.lock_dict.get(key) |
| if not lock: |
| self.lock_dict[key] = lock = threading.Lock() |
| |
| return lock |
| |
| |
| class RWLock(): |
| """A simple single-writer, multi-reader lock, using a mutex and semaphore. |
| |
| This is used to synchronise access to resources where the users fall into two |
| classes: reader and writer. Any number of readers can access the resource at |
| once, but none can access it while there are any writers. |
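
  Example (an illustrative sketch; `read_data' and `update_data' are
  hypothetical):

    lock = RWLock()
    with lock.read():   # Or lock.shared(); many readers may hold this at once.
      read_data()
    with lock.write():  # Or lock.exclusive(); excludes all other users.
      update_data()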
| """ |
| |
| def __init__(self): |
| self.read_lock = threading.Lock() |
| self.rw_semaphore = threading.Semaphore() |
| self.count = 0 |
| |
| @contextlib.contextmanager |
| def read(self): |
| # Increment the count, while holding the read lock. |
| with self.read_lock: |
| self.count += 1 |
| # If we're the first reader, acquire the read-write semaphore. This has |
| # the effect of blocking any writers while allowing other readers. |
| if self.count == 1: |
| self.rw_semaphore.acquire() |
| |
    # Critical section. If we're the first reader we hold the read-write
    # semaphore; if we're a later reader we hold nothing. Either way, during
    # this block we exclude any writers, but not any other readers. The
    # try/finally ensures we decrement the count even if the body raises, so
    # that writers aren't blocked forever.
    try:
      yield
    finally:
      # Decrement the count, while holding the read lock.
      with self.read_lock:
        self.count -= 1
        # If we're the last reader, release the read-write semaphore, allowing
        # writers to access the resource. Here we see why this must be a
        # semaphore and not a mutex: a mutex may only be released by the thread
        # that acquired it, but if readers finish in a different order than
        # they started, the last reader to finish may not be the one that
        # acquired the semaphore.
        if self.count == 0:
          self.rw_semaphore.release()
| |
  def write(self):
    # Writes are simple: return the read-write semaphore, to be used as a
    # context manager. Acquiring it blocks while there are any concurrent
    # readers or writers.
    return self.rw_semaphore
| |
| def shared(self): |
| return self.read() |
| |
| def exclusive(self): |
| return self.write() |
| |
| |
| # This lock is used to make sure that no processes are spawned while a file is |
| # in the process of being opened for writing. It's acquired in 'shared' mode by |
| # anything opening files, and in 'exclusive' mode by System.run around spawning |
| # processes. |
| # |
| # On systems using `fork' to spawn new processes, the subprocess will |
| # inherit any inheritable open file handles from the parent. While handles are |
| # non-inheritable by default in Python 3, there may be race conditions where |
| # a file has not yet been set as non-inheritable during open(). If a process |
| # is spawned while a file handle is open and inheritable, that process will |
| # keep the file handle open much longer than it should, which can cause issues |
| # with: |
| # |
| # * Multiple processes trying to write to the same file, which Windows seems not |
| # to like. |
| # * Keeping temp files open when we're trying to delete them. |
| # * Having binaries open in write mode when we try to execute them, which isn't |
| # allowed. |
| PROCESS_SPAWN_LOCK = RWLock() |
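
# An illustrative usage sketch (`path', `data' and `cmd' are hypothetical, and
# `subprocess' would need to be imported):
#
#   with PROCESS_SPAWN_LOCK.shared():
#     with open(path, 'w') as f:
#       f.write(data)
#
#   with PROCESS_SPAWN_LOCK.exclusive():
#     subprocess.check_call(cmd)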