# blob: 3870606a798582546758dc5440ffbcc319b3898d [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Library to generate, maintain, and read static slave pool maps."""
import collections
import itertools
import json
import os
# Identifies a unique slave class: a name plus an optional subtype.
SlaveClass = collections.namedtuple(
    'SlaveClass', ['name', 'subtype'])

# The full configuration of one slave class. Each slave class maps to exactly
# one configuration.
SlaveClassConfig = collections.namedtuple(
    'SlaveClassConfig', ['cls', 'exclusive', 'pools', 'count'])

# Associates slave classes with their slaves and tracks the slaves that were
# left unallocated. Also used as the persistent (saved/loaded) state.
SlaveState = collections.namedtuple(
    'SlaveState', ['class_map', 'unallocated'])

# The complete slave map (see SlaveAllocator.GetSlaveMap()): the per-slave
# entries plus the unallocated slaves.
SlaveMap = collections.namedtuple(
    'SlaveMap', ['entries', 'unallocated'])

# A single slave's entry in a SlaveMap (see SlaveAllocator.GetSlaveMap()):
# the classes it was allocated to and the keys that belong to those classes.
SlaveMapEntry = collections.namedtuple(
    'SlaveMapEntry', ['classes', 'keys'])
class SlaveAllocator(object):
  """A slave pool management object.

  Individual slave machines are added to named Pools. A slave cannot be a member
  of more than one pool.

  A Class is a named allocation specification. When allocation is performed,
  the allocator maps Classes to Slaves. Therefore, a Class is the unit at which
  allocation is performed.

  Classes may optionally include a subtype string to help disambiguate them;
  for all practical purposes the subtype is just part of the name.

  The primary allocation function, '_GetSlaveClassMap', deterministically
  associates Classes with sets of slaves from the registered Pools according
  to their registered specification.

  Keys are a generalization of a builder name, representing a specific entity
  that requires slave allocation. While key names need not correspond to
  builder names, it is expected that they largely will.

  In order to be assigned, Keys gain Class membership via 'Join()'. A key may
  join multiple classes, and will be assigned the superset of slaves that those
  classes were assigned.

  The Allocator may optionally save and load its class allocation state to an
  external JSON file. This can be used to enforce class-to-slave mapping
  consistency (i.e., builder affinity).

  When a new allocation is performed, the SlaveAllocator's State is updated, and
  subsequent operations will prefer the previous layout.
  """

  # The default path to load/save state to, if none is specified.
  DEFAULT_STATE_PATH = 'slave_pool.json'

  def __init__(self, state_path=None, list_unallocated=False):
    """Initializes a new slave pool instance.

    Args:
      state_path (str): The path (relative or absolute) of the allocation
          save-state JSON file. If None, DEFAULT_STATE_PATH will be used.
      list_unallocated (bool): Include an entry listing unallocated slaves.
          This entry will be ignored for operations, but can be useful when
          generating expectations.
    """
    self._state_path = state_path
    self._list_unallocated = list_unallocated

    # SlaveState from the last load or allocation, or None.
    self._state = None
    # Pool name -> set of slave names.
    self._pools = {}
    # SlaveClass -> SlaveClassConfig.
    self._classes = {}
    # SlaveClass -> set of keys that joined it.
    self._membership = {}
    # Slave name -> name of the pool that owns it.
    self._all_slaves = {}

  @property
  def state_path(self):
    """Returns (str): The configured state path, or DEFAULT_STATE_PATH."""
    return self._state_path or self.DEFAULT_STATE_PATH

  @staticmethod
  def _ClassSortKey(slave_class):
    """Returns a sort key ordering classes by name, then by subtype.

    A None subtype sorts before any string subtype. This matches the ordering
    Python 2 gives the raw tuples and avoids comparing None with str, which
    Python 3 refuses to do.
    """
    return (slave_class.name, slave_class.subtype is not None,
            slave_class.subtype or '')

  def LoadStateDict(self, state_class_map=None):
    """Loads previous allocation state from a state dictionary.

    The state dictionary is structured:
      <class-name>: {
        <class-subtype>: [
          <slave-name>,
          ...
        ],
        ...
      }

    Args:
      state_class_map (dict): A state class map dictionary. If None or empty,
          the current state will be cleared.
    """
    if not state_class_map:
      self._state = None
      return

    class_map = {}
    for class_name, class_name_entry in state_class_map.items():
      for subtype, slave_list in class_name_entry.items():
        cls = SlaveClass(name=class_name, subtype=subtype)
        class_map.setdefault(cls, []).extend(str(s) for s in slave_list)
    # A loaded state carries no 'unallocated' information; readers treat a
    # None value as empty.
    self._state = SlaveState(class_map=class_map, unallocated=None)

  def LoadState(self, enforce=True):
    """Loads slave pools from the store, replacing the current in-memory set.

    Args:
      enforce (bool): If True, raise an IOError if the state file does not
          exist or a ValueError if it could not be loaded. If False, any such
          failure results in the current state being cleared.

    Raises:
      IOError: If 'enforce' is True and the state file is missing or
          unreadable.
      ValueError: If 'enforce' is True and the state file is not a valid JSON
          object.
    """
    state = {}
    if not os.path.exists(self.state_path):
      if enforce:
        raise IOError("State path does not exist: %s" % (self.state_path,))
    else:
      try:
        with open(self.state_path, 'r') as fd:
          state = json.load(fd)
        if not isinstance(state, dict):
          raise ValueError(
              "State file is not a JSON object: %s" % (self.state_path,))
      except (IOError, ValueError):
        if enforce:
          raise
        state = {}
    # Only the 'class_map' entry written by SaveState() is consumed here.
    self.LoadStateDict(state.get('class_map'))

  def SaveState(self):
    """Saves the current slave pool set to the store path."""
    state_dict = {}
    if self._state and self._state.class_map:
      class_map = state_dict['class_map'] = {}
      for sc, slave_list in self._state.class_map.items():
        class_dict = class_map.setdefault(sc.name, {})
        # NOTE(review): json writes a None subtype as the key "null", which
        # LoadStateDict() reads back as a string -- confirm whether classes
        # without a subtype are expected to keep affinity across save/load.
        subtype_dict = class_dict.setdefault(sc.subtype, [])
        subtype_dict.extend(slave_list)

    if self._list_unallocated:
      # There may be no state yet (nothing loaded or allocated).
      unallocated = self._state.unallocated if self._state else None
      state_dict['unallocated'] = sorted(unallocated or ())

    with open(self.state_path, 'w') as fd:
      json.dump(state_dict, fd, sort_keys=True, indent=2)

  def AddPool(self, name, *slaves):
    """Returns (str): The slave pool that was allocated (for chaining).

    Args:
      name (str): The slave pool name.
      slaves: Slave name strings that belong to this pool.

    Raises:
      ValueError: If a slave is already registered with a different pool. In
          that case nothing is registered.
    """
    # Validate everything first so a failure leaves the allocator untouched.
    for slave in slaves:
      current_pool = self._all_slaves.get(slave)
      if current_pool is not None and current_pool != name:
        raise ValueError("Cannot register slave '%s' with multiple pools "
                         "(%s, %s)" % (slave, current_pool, name))

    pool = self._pools.get(name)
    if pool is None:
      pool = self._pools[name] = set()
    pool.update(slaves)
    self._all_slaves.update((slave, name) for slave in slaves)
    return name

  def GetPool(self, name):
    """Returns (frozenset): The contents of the named slave pool.

    Args:
      name (str): The name of the pool to query.

    Raises:
      KeyError: If the named pool doesn't exist.
    """
    pool = self._pools.get(name)
    # Compare against None: a registered pool may legitimately be empty.
    if pool is None:
      raise KeyError("No pool named '%s'" % (name,))
    return frozenset(pool)

  def Alloc(self, name, subtype=None, exclusive=True, pools=None, count=1):
    """Returns (SlaveClass): The SlaveClass that was allocated.

    Args:
      name (str): The base name of the class to allocate.
      subtype (str): If not None, the class subtype. This, along with the name,
          forms the class ID.
      exclusive (bool): If True, slaves allocated in this class may not be
          reused in other allocations.
      pools (iterable): If not None, constrain allocation to the named slave
          pools. Otherwise every registered pool is a candidate.
      count (int): The number of slaves to allocate for this class. If None,
          the class is not limited to a fixed number of slaves.

    Raises:
      AssertionError: If 'pools' names an unregistered pool, or if the class
          was already allocated with a different configuration.
    """
    # Expand our pools.
    pools = set(pools or ())
    invalid_pools = pools.difference(self._pools)
    assert not invalid_pools, (
        "Class references undefined pools: %s" % (sorted(invalid_pools),))

    cls = SlaveClass(name=name, subtype=subtype)
    config = SlaveClassConfig(cls=cls, exclusive=exclusive, pools=pools,
                              count=count)

    # Register this configuration.
    current_config = self._classes.get(cls)
    if current_config is not None:
      # Duplicate allocations must match configurations.
      assert current_config == config, (
          "Class allocation doesn't match current for %s: %s != %s" % (
              cls, config, current_config))
    else:
      self._classes[cls] = config
    return cls

  def Join(self, key, slave_class):
    """Returns (SlaveClass): The 'slave_class' passed in (for chaining).

    Args:
      key (str): The key to join to the slave class.
      slave_class (SlaveClass): The slave class to join.
    """
    self._membership.setdefault(slave_class, set()).add(key)
    return slave_class

  def _GetSlaveClassMap(self):
    """Returns (SlaveState): The newly-computed allocation state.

    Applies the current slave configuration to the allocator's slave classes.
    The result's 'class_map' maps each SlaveClass that has members to the list
    of slaves allocated to it. The result also replaces the allocator's stored
    state, so later allocations prefer this layout.

    Raises:
      AssertionError: If any slave class with members could not be allocated
          enough slaves.
    """
    # NOTE(review): several statements in this method were truncated in the
    # reviewed copy and have been reconstructed from the surrounding logic --
    # confirm against the canonical source.
    all_slaves = set(self._all_slaves)
    n_state = SlaveState(class_map={}, unallocated=all_slaves.copy())
    # Slaves that have not been handed out since the last LRU reload.
    lru = all_slaves.copy()
    # Slaves owned by an exclusive class; never offered to another class.
    exclusive = set()

    # The remaining classes to allocate. We keep this sorted for determinism.
    remaining_classes = sorted(self._classes, key=self._ClassSortKey)

    def allocate_slaves(config, slaves):
      class_slaves = n_state.class_map.setdefault(config.cls, [])
      # Never hand a class the same slave twice across allocation passes.
      slaves = [s for s in slaves if s not in class_slaves]
      if config.count:
        slaves = slaves[:max(0, config.count - len(class_slaves))]
      class_slaves.extend(slaves)
      if config.exclusive:
        exclusive.update(slaves)
      lru.difference_update(slaves)
      n_state.unallocated.difference_update(slaves)
      if not lru:
        # Reload LRU.
        lru.update(all_slaves)
      return class_slaves

    def candidate_slaves(config, state):
      # Get slaves from the candidate pools.
      slaves = set()
      for pool in (config.pools or self._pools):
        slaves.update(self._pools[pool])
      if state:
        # Only consider slaves this class held in the supplied state.
        slaves &= set(state.class_map.get(config.cls, ()))

      # Remove any slaves that have been exclusively allocated.
      slaves.difference_update(exclusive)

      # Deterministically prefer slaves that haven't been used over those that
      # have.
      return sorted(slaves & lru) + sorted(slaves.difference(lru))

    def apply_config(state=None, finite=False):
      incomplete_classes = []
      for slave_class in remaining_classes:
        if not self._membership.get(slave_class):
          # This slave class has no members; ignore it.
          continue

        config = self._classes[slave_class]
        if not (finite and config.count is None):
          slaves = candidate_slaves(config, state)
        else:
          # We're only applying finite configurations in this pass.
          slaves = ()

        allocated_slaves = allocate_slaves(config, slaves)
        # A class with no fixed count still needs at least one slave.
        if len(allocated_slaves) < max(config.count or 0, 1):
          incomplete_classes.append(slave_class)

      # Return the set of classes that still need allocations.
      return incomplete_classes

    # If we have a state, apply as much as possible to the current
    # configuration. Note that anything can change between the saved state and
    # the current configuration, including:
    # - Slaves added / removed from pools.
    # - Slaves moved from one pool to another.
    # - Slave classes added/removed.
    if self._state:
      remaining_classes = apply_config(self._state)
    # Satisfy classes with a fixed count before those without one.
    remaining_classes = apply_config(finite=True)
    remaining_classes = apply_config()

    # Are there any slave classes remaining?
    assert not remaining_classes, (
        "Failed to apply config for slave classes: %s" % (remaining_classes,))
    self._state = n_state
    return n_state

  def GetSlaveMap(self):
    """Returns (SlaveMap): The slave map for the current configuration.

    The map's 'entries' dictionary maps each allocated slave name to a
    SlaveMapEntry holding the classes the slave was allocated to and the keys
    that joined those classes.
    """
    slave_map_entries = {}
    n_state = self._GetSlaveClassMap()
    for slave_class, slaves in n_state.class_map.items():
      for slave in slaves:
        entry = slave_map_entries.get(slave)
        if entry is None:
          entry = slave_map_entries[slave] = SlaveMapEntry(classes=set(),
                                                           keys=[])
        entry.classes.add(slave_class)
        entry.keys.extend(self._membership.get(slave_class, ()))

    # Convert SlaveMapEntry fields to immutable form.
    # NOTE(review): the exact immutable types were truncated in the reviewed
    # copy; frozenset / sorted tuple are assumed here -- confirm.
    result = SlaveMap(
        entries={},
        unallocated=frozenset(n_state.unallocated))
    for k, v in slave_map_entries.items():
      result.entries[k] = SlaveMapEntry(
          classes=frozenset(v.classes),
          keys=tuple(sorted(v.keys)))
    return result
def BuildClassMap(sm):
  """Returns (dict): A class map built from a slave map.

  The result is structured:
    <class-name>: {
      <class-subtype>: set(<slave-name>, ...),
      ...
    }

  Args:
    sm (SlaveMap): The slave map to invert. Each value in 'sm.entries' must
        expose a 'classes' iterable whose items have 'name' and 'subtype'
        attributes.
  """
  class_map = {}
  for s, e in sm.entries.items():
    for cls in e.classes:
      subtype_map = class_map.setdefault(cls.name, {})
      subtype_map.setdefault(cls.subtype, set()).add(s)
  return class_map