# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import copy
import json
import operator
from functools import reduce
from builtins import str as text
from future.utils import iteritems
import attr
from gevent.local import local
from google.protobuf import json_format as json_pb
from google.protobuf import message
from .internal.attr_util import attr_type
def freeze(obj):
"""Takes a generic object ``obj``, and returns an immutable version of it.
Supported types:
* dict / OrderedDict -> FrozenDict
* list -> tuple
* set -> frozenset
* any object with a working __hash__ implementation (assumes that hashable
means immutable)
Will raise TypeError if you pass an object which is not hashable.
"""
if isinstance(obj, dict):
return FrozenDict((freeze(k), freeze(v)) for k, v in iteritems(obj))
elif isinstance(obj, (list, tuple)):
return tuple(freeze(i) for i in obj)
elif isinstance(obj, set):
return frozenset(freeze(i) for i in obj)
else:
hash(obj)
return obj
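# A minimal usage sketch of freeze() (illustrative comments only; nothing here
# runs at import time):
#
#   frozen = freeze({'a': [1, 2], 'b': {'c': 3}})
#   # 'a' now maps to the tuple (1, 2); 'b' maps to FrozenDict([('c', 3)]).
#   hash(frozen)             # works: every nested value is now hashable
#   freeze(bytearray(b'x'))  # raises TypeError: bytearray has no working __hash__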
def thaw(obj):
"""Takes a a frozen object, and returns a mutable version of it.
Conversions:
    * dict / OrderedDict / FrozenDict -> dict
    * list / tuple -> list
    * set / frozenset -> set
Close to the opposite of freeze().
Does not convert dict keys.
"""
if isinstance(obj, (dict, collections.OrderedDict, FrozenDict)):
return {k: thaw(v) for k, v in iteritems(obj)}
elif isinstance(obj, (list, tuple)):
return [thaw(i) for i in obj]
elif isinstance(obj, (set, frozenset)):
return {thaw(i) for i in obj}
else:
return obj
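# thaw() round-trips the containers produced by freeze() (illustrative):
#
#   orig = {'a': [1, 2], 'b': {'c': 3}}
#   thaw(freeze(orig)) == orig   # => True; tuples/FrozenDicts become lists/dicts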
class FrozenDict(collections.Mapping):
"""An immutable OrderedDict.
Modified From: http://stackoverflow.com/a/2704866
"""
def __init__(self, *args, **kwargs):
self._d = collections.OrderedDict(*args, **kwargs)
    # If __getitem__ would raise a KeyError, it first calls this function with
    # the missing key. The function is expected to raise a (more descriptive)
    # exception; if it returns instead, the original KeyError is raised.
self.on_missing = lambda key: None
# Calculate the hash immediately so that we know all the items are
# hashable too.
self._hash = reduce(operator.xor,
(hash(i) for i in enumerate(iteritems(self._d))), 0)
def __eq__(self, other):
if not isinstance(other, collections.Mapping):
return NotImplemented
if self is other:
return True
if len(self) != len(other):
return False
for k, v in iteritems(self):
if k not in other or other[k] != v:
return False
return True
def __iter__(self):
return iter(self._d)
def __len__(self):
return len(self._d)
def __getitem__(self, key):
try:
return self._d[key]
except KeyError:
self.on_missing(key)
raise
def __hash__(self):
return self._hash
def __repr__(self):
return 'FrozenDict(%r)' % (list(iteritems(self._d)),)
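# Usage sketch (illustrative): a FrozenDict behaves like a read-only
# OrderedDict and is itself hashable, so it can be used as a dict key or set
# member. The on_missing hook lets a caller swap in a more descriptive error.
#
#   d = FrozenDict([('x', 1), ('y', 2)])
#   d['x']                        # => 1
#   {d: 'keyed by a FrozenDict'}  # OK: d is hashable
#   def _complain(key):
#     raise KeyError('%r is not a known field' % (key,))
#   d.on_missing = _complain
#   d['z']                        # raises the KeyError from _complain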
class StepPresentation(object):
_RAW_STATUSES = (
None, 'SUCCESS', 'WARNING', 'FAILURE', 'EXCEPTION', 'CANCELED')
STATUSES = frozenset(status for status in _RAW_STATUSES if status)
# TODO(iannucci): use attr for this
@classmethod
def status_worst(cls, status_a, status_b):
"""Given two STATUS strings, return the worse of the two."""
if not hasattr(cls, 'STATUS_TO_BADNESS'):
cls.STATUS_TO_BADNESS = freeze({
status: i for i, status in enumerate(StepPresentation._RAW_STATUSES)})
if cls.STATUS_TO_BADNESS[status_a] > cls.STATUS_TO_BADNESS[status_b]:
return status_a
return status_b
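  # status_worst() examples (badness increases left to right in _RAW_STATUSES,
  # so 'CANCELED' outranks 'EXCEPTION'):
  #   status_worst('SUCCESS', 'FAILURE')  # => 'FAILURE'
  #   status_worst('WARNING', None)       # => 'WARNING'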
def __init__(self, step_name):
self._name = step_name
self._finalized = False
self._logs = collections.OrderedDict()
self._links = collections.OrderedDict()
self._status = None
self._had_timeout = False
self._was_cancelled = False
self._step_summary_text = ''
self._step_text = ''
self._properties = {}
@property
def status(self):
return self._status
@status.setter
def status(self, val):
assert not self._finalized, 'Changing finalized step %r' % self._name
assert val in self.STATUSES
self._status = val
def set_worse_status(self, status):
"""Sets .status to this value if it's worse than the current status."""
self.status = self.status_worst(self.status, status)
@property
def had_timeout(self):
return self._had_timeout
@had_timeout.setter
def had_timeout(self, val):
assert not self._finalized, 'Changing finalized step %r' % self._name
assert isinstance(val, bool)
self._had_timeout = val
@property
def was_cancelled(self):
return self._was_cancelled
@was_cancelled.setter
def was_cancelled(self, val):
assert not self._finalized, 'Changing finalized step %r' % self._name
assert isinstance(val, bool)
self._was_cancelled = val
@property
def step_text(self):
return self._step_text
@step_text.setter
def step_text(self, val):
assert not self._finalized, 'Changing finalized step %r' % self._name
self._step_text = val
@property
def step_summary_text(self):
return self._step_summary_text
@step_summary_text.setter
def step_summary_text(self, val):
assert not self._finalized, 'Changing finalized step %r' % self._name
self._step_summary_text = val
@property
def logs(self):
assert not self._finalized, 'Reading logs after finalized %r' % self._name
return self._logs
@property
def links(self):
if not self._finalized:
return self._links
else:
return copy.deepcopy(self._links)
@property
def properties(self): # pylint: disable=E0202
if not self._finalized:
return self._properties
else:
return copy.deepcopy(self._properties)
@properties.setter
def properties(self, val): # pylint: disable=E0202
assert not self._finalized, 'Changing finalized step %r' % self._name
assert isinstance(val, dict)
self._properties = val
def finalize(self, step_stream):
self._finalized = True
# crbug.com/833539: prune all logs from memory when finalizing.
logs = self._logs
self._logs = None
if self.step_text:
step_stream.add_step_text(self.step_text.replace('\n', '<br/>'))
if self.step_summary_text:
step_stream.add_step_summary_text(self.step_summary_text)
# late proto import
from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2
for name, log in iteritems(logs):
if isinstance(log, common_pb2.Log):
step_stream.append_log(log)
else:
with step_stream.new_log_stream(name) as log_stream:
if isinstance(log, (text, str, bytes)):
self.write_data(log_stream, log)
elif isinstance(log, collections.Iterable):
for line in log:
self.write_data(log_stream, line)
else:
raise ValueError('unknown log type %s: %r' % (type(log), log))
for label, url in iteritems(self.links):
      # We fix spaces in urls; it's an extremely common mistake to make, and
      # easy to remedy here.
step_stream.add_step_link(text(label), text(url).replace(" ", "%20"))
for key, value in sorted(iteritems(self._properties)):
if isinstance(value, message.Message):
value = json_pb.MessageToDict(value)
step_stream.set_build_property(key, json.dumps(value, sort_keys=True))
step_stream.set_step_status(self.status, self.had_timeout)
@staticmethod
def write_data(log_stream, data):
"""Write the supplied data into the logstream.
Args:
log_stream (stream.StreamEngine.Stream) - The target log stream. In
production, this is backed by a text log stream created by logdog
butler.
data (Union[str, bytes]) - Data to write. If the supplied data is bytes,
it should be valid utf-8 encoded bytes. Note that, in py2 mode, the
        supported types are unicode and str respectively. However, for
        historical reasons, str data in py2 that is not valid utf-8 encoded
        is accepted, though this is discouraged.
"""
if isinstance(data, text):
# unicode in py2 and str in py3
log_stream.write_split(data)
elif isinstance(data, str): # str in py2
# TODO(yiwzhang): try decode and warn user if non-valid utf-8
# encoded data is supplied because log_stream only supports valid utf-8
# text in python3.
log_stream.write_split(data)
elif isinstance(data, bytes): # bytes in py3
# We assume data is valid utf-8 encoded bytes here after transition to
# python3. If there's a need to write raw bytes, we should support binary
# log stream here.
log_stream.write_split(data.decode('utf-8'))
else:
raise ValueError('unknown data type %s: %r' % (type(data), data))
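# Typical lifecycle sketch (hypothetical; `step_stream` below stands in for a
# stream.StreamEngine step stream supplied by the engine, not something defined
# in this module):
#
#   pres = StepPresentation('compile')
#   pres.step_text = 'compiled 42 targets'
#   pres.logs['warnings'] = ['dep foo is deprecated', 'dep bar is deprecated']
#   pres.links['build page'] = 'https://example.com/build 123'  # space -> %20
#   pres.properties['did_compile'] = True
#   pres.set_worse_status('WARNING')
#   pres.finalize(step_stream)  # flushes text, logs, links, properties, status
#   # After finalize(), the setters above assert, and .logs is pruned.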
@attr.s(frozen=True)
class ResourceCost(object):
"""A structure defining the resources that a given step may need.
For use with `api.step`; attaching a ResourceCost to a step will allow the
recipe engine to prevent too many costly steps from running concurrently.
See `api.step.ResourceCost` for full documentation.
"""
cpu = attr.ib(validator=attr_type(int), default=500)
memory = attr.ib(validator=attr_type(int), default=50)
disk = attr.ib(validator=attr_type(int), default=0)
net = attr.ib(validator=attr_type(int), default=0)
@classmethod
def zero(cls):
"""Returns a ResourceCost with zero for all resources."""
return cls(0, 0, 0, 0)
def __attrs_post_init__(self):
if self.cpu < 0:
raise ValueError('negative cpu amount')
if self.memory < 0:
raise ValueError('negative memory amount')
if self.disk < 0 or self.disk > 100:
raise ValueError('disk not in [0,100]')
if self.net < 0 or self.net > 100:
raise ValueError('net not in [0,100]')
  def __nonzero__(self):
    return not self.fits(0, 0, 0, 0)
  __bool__ = __nonzero__  # __nonzero__ is py2-only; alias it for py3.
def __str__(self):
bits = []
if self.cpu > 0:
cores = ('%0.2f' % (self.cpu / 1000.)).rstrip('0').rstrip('.')
bits.append('cpu=[%s cores]' % (cores,))
if self.memory > 0:
bits.append('memory=[%d MiB]' % (self.memory,))
if self.disk > 0:
bits.append('disk=[%d%%]' % (self.disk,))
if self.net > 0:
bits.append('net=[%d%%]' % (self.net,))
return ', '.join(bits)
def fits(self, cpu, memory, disk, net):
"""Returns True if this Resources fits within the given constraints."""
return (
self.cpu <= cpu and
self.memory <= memory and
self.disk <= disk and
self.net <= net
)
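# Illustrative sketch (the threshold numbers passed to fits() are made up):
# cpu is expressed in millicores (the default of 500 is half a core), memory
# in MiB, and disk/net as a percentage of the machine's capacity.
#
#   cost = ResourceCost(cpu=2000, memory=1024)        # 2 cores, 1 GiB
#   str(cost)                        # => 'cpu=[2 cores], memory=[1024 MiB]'
#   cost.fits(4000, 2048, 100, 100)  # => True: within the offered budget
#   cost.fits(1000, 2048, 100, 100)  # => False: wants more cpu than offered
#   bool(ResourceCost.zero())        # => False: a zero cost never blocks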
# A (global) registry of all PerGreenletState objects.
#
# This is used by the recipe engine to call back each
# PerGreenletState._get_setter_on_spawn when the recipe spawns a new greenlet
# (via the "recipe_engine/futures" module).
#
# Reset in between test runs by the simulator.
class _PerGreentletStateRegistry(list):
def clear(self):
"""Clears the Registry."""
self[:] = []
PerGreentletStateRegistry = _PerGreentletStateRegistry()
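# Sketch of how the engine might consume the registry around a spawn (names
# like `spawn_greenlet` and `target` are hypothetical; the real hook lives in
# the "recipe_engine/futures" module):
#
#   setters = [pgs._get_setter_on_spawn() for pgs in PerGreentletStateRegistry]
#   def _bootstrap():
#     for setter in setters:
#       if setter:            # None means "reset state in the new greenlet"
#         setter()
#     return target()
#   spawn_greenlet(_bootstrap)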
class PerGreenletState(local):
"""Subclass from PerGreenletState to get an object whose state is tied to the
current greenlet.
from recipe_engine.engine_types import PerGreenletState
class MyState(PerGreenletState):
cool_stuff = True
neat_thing = ""
def _get_setter_on_spawn(self):
# called on greenlet spawn; return a closure to propagate values from
# the previous greenlet to the new greenlet.
old_cool_stuff = self.cool_stuff
def _inner():
self.cool_stuff = old_cool_stuff
return _inner
class MyApi(RecipeApi):
def __init__(self):
self._state = MyState()
@property
def cool(self):
return self._state.cool_stuff
@property
def neat(self):
return self._state.neat_thing
def calculate(self):
self._state.cool_stuff = False
self._state.neat_thing = "yes"
"""
def __new__(cls, *args, **kwargs):
ret = super(PerGreenletState, cls).__new__(cls, *args, **kwargs)
PerGreentletStateRegistry.append(ret)
return ret
def _get_setter_on_spawn(self):
"""This method should be overridden by your subclass. It will be invoked by
the engine immediately BEFORE spawning a new greenlet, and it should return
a 0-argument function which should repopulate `self` immediately AFTER
spawning the new greenlet.
    Example: a PerGreenletState which simply copies its old state to the new
state:
def _get_setter_on_spawn(self):
pre_spawn_state = self.state
def _inner():
self.state = pre_spawn_state
return _inner
This will allow reads and sets of the PerGreenletState's fields to be
per-greenlet, but carry across from greenlet to greenlet.
If this function is not implemented, or returns None, the PerGreenletState
contents will be reset in the new greenlet.
"""
pass