# Copyright 2019 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

from contextlib import contextmanager

import attr
from gevent.queue import Channel

from ..engine_types import ResourceCost


@attr.s
class ResourceWaiter:
  """Represents the machine's CPU, memory, disk and network as limited
  resources.

  Each CPU (according to multiprocessing.cpu_count()) is worth 1000 millicores.
  Every subprocess that attempts to execute will first acquire its estimated
  amount of millicores before launching the subprocess. As soon as the
  subprocess completes, the held millicores are put back into the pool.

  Similarly, memory is measured in megabytes of physical system memory (i.e.
  not including swap). The assumption is that the recipe (and its subprocesses)
  is essentially the only thing running on the machine anyway.

  Disk and net are different, however. They are unitless measures of
  'percentage of resource', where the absolute quantities of disk (IOPS,
  read/write/seek speed) and network (bandwidth, latency) are not considered.
  Consider them more a declaration that a given step contends on disk or
  network availability. If you had many steps that each took `10%` disk, only
  10 of them would run at a time. Similarly, if you had steps that each
  declared 50% disk usage, only 2 of them would run at a time.
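
  For instance, a cost like the following (illustrative values only; this
  assumes ResourceCost takes `cpu`, `memory`, `disk` and `net` keyword
  arguments) declares "half a core, 128 MB of memory, about half the machine's
  disk throughput, and no network":

      ResourceCost(cpu=500, memory=128, disk=50, net=0)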

  Because recipes are finite both in runtime and number of distinct steps, this
  resource class unblocks other processes greedily. Whenever a subprocess
  completes, this analyzes all the outstanding subprocesses and will unblock
  whichever ones 'fit' in the now-freed resources. This is done in roughly
  largest-to-smallest order (i.e. if two blocked tasks could each fit but not
  both, the more expensive of the two will be unblocked first).

  This is different than what's deemed 'fair' in a typical scheduling scenario:
  in a mixed workload, a heavy task could be forced to wait while smaller tasks
  that do fit keep using the CPU. However, because recipes typically run with
  a hard finite timeout, it's better to put more of the machine to work earlier
  than to potentially waste time idling while small tasks finish just so
  a heavy task can be scheduled earlier.
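
  Example (a minimal sketch, not taken from real callers; it assumes
  ResourceCost takes `cpu`, `memory`, `disk` and `net` keyword arguments, and
  `get_memory_mb` / `run_step` are hypothetical helpers):

      import multiprocessing

      waiter = ResourceWaiter(
          millicores_available=multiprocessing.cpu_count() * 1000,
          memory_available=get_memory_mb())

      # Reserves 2 cores, 512 MB and 10% of disk for the duration of the block.
      with waiter.wait_for(
          ResourceCost(cpu=2000, memory=512, disk=10, net=0), None):
        run_step()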
"""
# Required for __init__
_millicores_available = attr.ib()
_memory_available = attr.ib()
# Attrs with defaults.
_millicores_max = attr.ib()
@_millicores_max.default
def _millicores_max_default(self):
return self._millicores_available
_memory_max = attr.ib()
@_memory_max.default
def _memory_max_default(self):
return self._memory_available
_disk_available = attr.ib(default=100)
_disk_max = attr.ib(default=100)
_net_available = attr.ib(default=100)
_net_max = attr.ib(default=100)

  # Each _waiters entry has a unique ID.
  _waiter_uid = attr.ib(default=0)

  # List[Tuple[ResourceCost, waiter_uid, Channel]]
  #
  # The `uid` is used to ensure that the Channel is never compared when
  # sorting this list.
  _waiters = attr.ib(factory=list)

  def _fits(self, resources):
    """Returns True if `resources` fits in what's currently available."""
    assert isinstance(resources, ResourceCost)
    return resources.fits(self._millicores_available, self._memory_available,
                          self._disk_available, self._net_available)

  def _decr(self, resources):
    """Subtracts `resources` from the available pool."""
    assert isinstance(resources, ResourceCost)
    self._millicores_available -= resources.cpu
    self._memory_available -= resources.memory
    self._disk_available -= resources.disk
    self._net_available -= resources.net

  def _incr(self, resources):
    """Returns `resources` to the available pool."""
    assert isinstance(resources, ResourceCost)
    self._millicores_available += resources.cpu
    self._memory_available += resources.memory
    self._disk_available += resources.disk
    self._net_available += resources.net

  @contextmanager
  def wait_for(self, resources, call_if_blocking):
    """Block until `resources` are available.

    Args:
      * resources (ResourceCost|None) - The amount of various resources to
        acquire before yielding. If any aspect of this exceeds the maximum
        amount of resource available on the system, this will instead acquire
        the system maximum. If resources is all 0's, or is None, this does not
        block.
      * call_if_blocking (None|func(ResourceCost)) - `wait_for` will invoke
        this callback if we would end up blocking before yielding. This
        callback should only be used for reporting/diagnostics (i.e. it
        shouldn't raise an exception).

    Yields control once the requisite amount of resources is available. Exiting
    the context frees up the resources.
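
    Example (a sketch for illustration only; `waiter`, `step_cost` and
    `launch_step` are hypothetical names):

        def report_blocked(cost):
          print('blocked waiting for %r' % (cost,))

        with waiter.wait_for(step_cost, report_blocked):
          launch_step()  # `step_cost` stays reserved until this block exits.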
"""
if resources is None:
yield
return
assert isinstance(resources, ResourceCost)
if resources.cpu > self._millicores_max:
resources = attr.evolve(resources, cpu=self._millicores_max)
if resources.memory > self._memory_max:
resources = attr.evolve(resources, memory=self._memory_max)
if resources and (self._waiters or not self._fits(resources)):
# we need some amount of resource AND
# someone else is already waiting, or there isn't enough resource.
if call_if_blocking:
call_if_blocking()
wake_me = Channel()
self._waiter_uid += 1
self._waiters.append((resources, self._waiter_uid, wake_me))
self._waiters.sort(reverse=True) # stable sort
wake_me.get()
# At this point the greenlet that woke us already reserved our resources
# for us, and we're free to go.
else:
# Just directly take our cores.
assert self._fits(resources)
self._decr(resources)

    try:
      yield
    finally:
      self._incr(resources)
      # We just added some resource back to the pot. Try to wake as many others
      # as we can before proceeding.
      to_wake, to_keep = [], []
      for waiting_resources, uid, chan in self._waiters:
        if self._fits(waiting_resources):
          to_wake.append(chan)
          self._decr(waiting_resources)
        else:
          to_keep.append((waiting_resources, uid, chan))
      self._waiters = to_keep  # _waiters was sorted before, so to_keep is also.
      for chan in to_wake:
        chan.put(None)