# Copyright 2019 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import bdb
import copy
import datetime
import inspect
import io
import json
import logging
import os
import re
import sys
import traceback
from contextlib import contextmanager
from future.utils import raise_
import attr
import gevent
import gevent.local
from google.protobuf import json_format as jsonpb
from pympler import summary, tracker
from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2
from PB.go.chromium.org.luci.lucictx import sections as sections_pb2
from PB.recipe_engine import engine_properties as engine_properties_pb2
from PB.recipe_engine import result as result_pb2
from .. import recipe_api
from .. import util
from ..step_data import StepData, ExecutionResult
from ..engine_types import StepPresentation, thaw
from ..engine_types import PerGreenletState, PerGreentletStateRegistry
from ..third_party import luci_context
from . import debugger
from .engine_env import merge_envs
from .exceptions import RecipeUsageError, CrashEngine
from .global_shutdown import GLOBAL_SHUTDOWN
from .resource_semaphore import ResourceWaiter
from .step_runner import Step
LOG = logging.getLogger(__name__)
MAX_SUMMARY_MARKDOWN_SIZE = 4000
@attr.s(frozen=True, slots=True, repr=False)
class _ActiveStep:
"""The object type that we keep in RecipeEngine._step_stack."""
step_data = attr.ib() # type: StepData
step_stream = attr.ib() # type: StepStream
is_parent = attr.ib() # type: bool
children_presentations = attr.ib(factory=list) # type: List[StepPresentation]
greenlets = attr.ib(factory=list) # type: List[gevent.Greenlet]
def close(self):
"""If step_data is set, finalizes its StepPresentation with
self.step_stream, then closes self.step_stream.
"""
gevent.wait(self.greenlets)
if self.step_data:
self.step_data.presentation.finalize(self.step_stream)
self.step_stream.close()
class _MemoryProfiler:
"""The memory profiler used in recipe engine that is backed by Pympler.
  Note: This class is currently not thread safe; the snapshot operation is not
  atomic. The profiler is called before each step executes, so this is okay for
  now while steps execute serially. However, once we start executing steps in
  parallel, the implementation needs to be re-evaluated to ensure the atomicity
  of the snapshot operation.
  """
def __init__(self, initial_snapshot_name='Bootstrap'):
self._current_snapshot_name = initial_snapshot_name
self._diff_snapshot = False
self._tracker = tracker.SummaryTracker()
def snapshot(self, snapshot_name):
"""Snapshot the memory.
    Returns [generator of str] - a formatted memory snapshot or diff,
    surrounded by dividing lines. The first call returns the full snapshot;
    subsequent calls return only the diff against the previous snapshot.
    """
memsum = self._tracker.create_summary()
last_snapshot_name = self._current_snapshot_name
self._current_snapshot_name = snapshot_name
if self._diff_snapshot:
yield ((
'-------- Diff between current snapshot (%s) and last snapshot (%s) '
'Starts --------') % (snapshot_name, last_snapshot_name))
diff = self._tracker.diff(summary1=memsum)
yield from summary.format_(diff)
yield ((
'-------- Diff between current snapshot (%s) and last snapshot (%s) '
'Ends --------') % (snapshot_name, last_snapshot_name))
else:
      # create_summary() doesn't record the returned summary as the latest
      # summary in the underlying tracker, so manually move it forward.
self._tracker.s0 = memsum
      # Only dump the full snapshot the first time this method is called.
      # From then onwards, dump the diff only.
self._diff_snapshot = True
      yield '-------- Memory Snapshot (%s) Starts --------' % snapshot_name
yield from summary.format_(memsum)
yield '-------- Memory Snapshot (%s) Ends --------' % snapshot_name
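# A minimal usage sketch for _MemoryProfiler (the snapshot names and
# `debug_log` stream are illustrative): the first call yields a full snapshot;
# every later call yields a diff against the previous snapshot.
#
#   profiler = _MemoryProfiler()
#   for line in profiler.snapshot('Step: first'):   # full snapshot
#     debug_log.write_line(line)
#   for line in profiler.snapshot('Step: second'):  # diff vs. 'Step: first'
#     debug_log.write_line(line)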
def _get_reasons(exception: Exception) -> list[str]:
if isinstance(exception, ExceptionGroup):
reasons = []
for exc in exception.exceptions:
reasons.extend(_get_reasons(exc))
return reasons
elif isinstance(exception, recipe_api.StepFailure):
return [exception.reason]
else:
return [str(exception)]
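# For example (a sketch; the messages are illustrative), a nested group such as
#
#   ExceptionGroup('eg', [
#       recipe_api.StepFailure('compile failed'),
#       ExceptionGroup('inner', [ValueError('oops')]),
#   ])
#
# flattens to ['compile failed', 'oops'].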
class RecipeEngine:
"""
Knows how to execute steps emitted by a recipe, holds global state such as
step history and build properties. Each recipe module API has a reference to
this object.
Recipe modules that are aware of the engine:
* properties - uses engine.properties.
    * step - uses engine.run_step(...) and engine.active_step.
"""
def __init__(self, recipe_deps, step_runner, stream_engine,
properties, environ, start_dir, initial_luci_context,
num_logical_cores, memory_mb):
"""See run_steps() for parameter meanings."""
self._recipe_deps = recipe_deps
self._step_runner = step_runner
self._stream_engine = stream_engine # type: StreamEngine
self._properties = properties
self._engine_properties = _get_engine_properties(properties)
self._environ = environ.copy()
self._start_dir = start_dir
self._clients = {client.IDENT: client for client in (
recipe_api.ConcurrencyClient(
stream_engine.supports_concurrency,
self.spawn_greenlet),
recipe_api.LUCIContextClient(initial_luci_context),
recipe_api.PathsClient(start_dir),
recipe_api.PropertiesClient(properties),
recipe_api.StepClient(self),
)}
self._resource = ResourceWaiter(num_logical_cores * 1000, memory_mb)
self._memory_profiler = _MemoryProfiler() if (
self._engine_properties.memory_profiler.enable_snapshot) else None
# A greenlet-local store which holds a stack of _ActiveStep objects, holding
# the most recently executed step at each nest level (objects deeper in the
# stack have lower nest levels). When we pop from this stack, we close the
# corresponding step stream.
#
# NOTE: Due to the way that steps are run in the recipe engine, only the tip
# of this stack may be a 'real' step; i.e. anything other than the tip of
# the stack is a parent nesting step.
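    # For example (a sketch; the names are illustrative), while leaf step
    # 'a|b|c' runs, the stack looks like:
    #
    #   [<root parent>, 'a' (parent), 'b' (parent), 'c' (leaf)]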
class StepStack(PerGreenletState):
steps = [_ActiveStep(None, None, True)] # "root" parent
def _get_setter_on_spawn(self):
tip_step = self.steps[-1]
def _inner():
self.steps = [tip_step]
return _inner
self._step_stack_storage = StepStack()
# Map of namespace_tuple -> {step_name: int} to deduplicate `step_name`s
# within a namespace.
self._step_names = {}
@property
def _step_stack(self):
return self._step_stack_storage.steps
@property
def properties(self):
"""Used by recipe_deps._instantiate_api and recipe_deps.Recipe._run_steps"""
return self._properties
@property
def environ(self):
"""Used by recipe_deps._instantiate_api and recipe_deps.Recipe._run_steps"""
return self._environ
def resolve_requirement(self, req: recipe_api.UnresolvedRequirement):
"""Resolves a requirement or raises ValueError if it cannot be resolved.
Args:
* req (UnresolvedRequirement): The requirement to resolve.
Returns the resolved requirement.
Raises ValueError if the requirement cannot be satisfied.
"""
assert isinstance(req, recipe_api.UnresolvedRequirement)
if req._typ == 'client':
return self._clients.get(req._name)
raise ValueError('Unknown requirement type [%s]' % (req._typ,))
def initialize_path_client_HACK(self, root_api):
"""This is a hack; the "PathsClient" currently works to provide a reverse
string->Path lookup by walking down the recipe's `api` object and calling
the various 'root' path methods (like .resource(), etc.).
However, we would like to eventually simplify the 'paths' system, whose
whole complexity exists to facilitate 'pure-data' config.py processing,
which is also going to be deprecated in favor of protos and removal of the
config subsystem.
Args:
* root_api (RecipeScriptApi): The root `api` object which would be passed
to the recipe's RunSteps function.
"""
self._clients['paths']._initialize_with_recipe_api(root_api)
def close_non_parent_step(self):
"""Closes the tip of the _step_stack if it's not a parent nesting step."""
try:
tip_step = self._step_stack[-1]
if tip_step.is_parent:
return
self._step_stack.pop().close()
except:
_log_crash(self._stream_engine, "close_non_parent_step()")
raise CrashEngine("Closing non-parent step failed.")
@property
def active_step(self):
"""Returns the current _ActiveStep.step_data.
May be None if the _ActiveStep is the root _ActiveStep.
"""
return self._step_stack[-1].step_data
def spawn_greenlet(self, func, args, kwargs, greenlet_name):
"""Returns a gevent.Greenlet which has been initialized with the correct
greenlet-local-storage state.
    Args:
      * func (callable) - The function to run in the new greenlet.
      * args (tuple) - Positional arguments to pass to `func`.
      * kwargs (dict) - Keyword arguments to pass to `func`.
      * greenlet_name (str|None) - If non-None, assign this to the greenlet's
        name.
    """
self.close_non_parent_step()
to_run = [pgs._get_setter_on_spawn() for pgs in PerGreentletStateRegistry]
current_step = self._step_stack[-1]
def _runner():
for fn in to_run:
fn()
try:
return func(*args, **kwargs)
finally:
self.close_non_parent_step()
ret = gevent.spawn(_runner)
if greenlet_name is not None:
ret.name = greenlet_name
# need stack frames here, rather than greenlet 'lightweight' stack
ret.spawning_frames = [frame_tup[0] for frame_tup in inspect.stack(2)]
current_step.greenlets.append(ret)
return ret
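  # A usage sketch for spawn_greenlet (`do_work` and `arg` are illustrative);
  # the step module's ConcurrencyClient routes through here:
  #
  #   glet = engine.spawn_greenlet(do_work, (arg,), {}, 'worker-1')
  #   gevent.wait([glet])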
def _record_step_name(self, name):
"""Records a step name in the current namespace.
Args:
* name (str) - The name of the step we want to run in the current context.
Side effect:
* calls close_non_parent_step.
* Updates global tracking state for this step name.
Returns Tuple[str] of the step name_tokens that should ACTUALLY run.
"""
self.close_non_parent_step()
try:
namespace = ()
if self.active_step:
namespace = self.active_step.name_tokens
cur_state = self._step_names.setdefault(namespace, {})
cur_count = cur_state.setdefault(name, 0)
dedup_name = name
if cur_count:
dedup_name = name + ' (%d)' % (cur_count + 1)
cur_state[name] += 1
return namespace + (dedup_name,)
except:
_log_crash(self._stream_engine, "_record_step_name(%r)" % (name,))
raise CrashEngine("Getting name tokens for %r failed." % (name,))
def _write_memory_snapshot(self, log_stream, snapshot_name):
"""Snapshot the memory and write the result to the supplied log stream if
the memory snapshot is enabled.
Args:
      * log_stream (Stream) - The stream that the snapshot or diff is written
        to. A None stream makes this method a no-op.
      * snapshot_name (str) - Name of the snapshot. The name is preserved
        along with the snapshot.
TODO(crbug.com/1057844): After luciexe rolls out, instead of writing the
log to arbitrary log stream, it should constantly write to memory_profile
log stream created in setup_build step to consolidate all memory snapshots
in one UI page.
"""
if self._memory_profiler and log_stream:
for line in self._memory_profiler.snapshot(snapshot_name):
log_stream.write_line(line)
@contextmanager
def parent_step(self, name):
"""Opens a parent step with the given name in the current namespace.
Args:
* name (str) - The name of the parent step to open.
    Yields a tuple of (StepPresentation, List[StepPresentation]):
      * The StepPresentation for this parent step.
      * The list which accumulates the StepPresentations of this parent step's
        children as they run.
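
    Example (a minimal sketch; the step name is illustrative):

      with engine.parent_step('deploy') as (pres, children):
        pres.step_text = 'deploying'
        # Steps run here become children of 'deploy'; their presentations
        # accumulate in `children`.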
"""
name_tokens = self._record_step_name(name)
try:
step_data = StepData(name_tokens, ExecutionResult(retcode=0))
# TODO(iannucci): Use '|' instead of '.'
presentation = StepPresentation('.'.join(name_tokens))
self._step_stack[-1].children_presentations.append(presentation)
step_data.presentation = presentation
step_data.finalize()
active_step = _ActiveStep(
step_data,
self._stream_engine.new_step_stream(name_tokens, False),
True)
active_step.step_stream.mark_running()
self._step_stack.append(active_step)
except:
_log_crash(self._stream_engine, "parent_step(%r)" % (name_tokens))
raise CrashEngine("Prepping parent step %r failed." % (name_tokens))
try:
yield presentation, active_step.children_presentations
finally:
try:
self.close_non_parent_step()
self._step_stack.pop().close()
except:
_log_crash(
self._stream_engine, "parent_step.close(%r)" % (name_tokens,))
raise CrashEngine("Closing parent step %r failed." % (name_tokens,))
def run_step(self, step_config):
"""Runs a step.
Args:
step_config (StepConfig): The step configuration to run.
Returns:
A StepData object containing the result of the finished step.
"""
# TODO(iannucci): When subannotations are handled with annotee, move
# `allow_subannotations` into recipe_module/step.
name_tokens = self._record_step_name(step_config.name)
# TODO(iannucci): Start with had_exception=True and overwrite when we know
# we DIDN'T have an exception.
ret = StepData(name_tokens, ExecutionResult())
try:
self._step_runner.register_step_config(name_tokens, step_config)
except:
# Test data functions are not allowed to raise exceptions. Instead of
# letting user code catch these, we crash the test immediately.
_log_crash(self._stream_engine, "register_step_config(%r)" % (ret.name,))
raise CrashEngine("Registering step_config failed for %r." % (
ret.name
))
step_stream = self._stream_engine.new_step_stream(name_tokens,
step_config.allow_subannotations, merge_step=step_config.merge_step)
caught = None
try:
# initialize presentation to show an exception.
ret.presentation = StepPresentation(step_config.name)
ret.presentation.status = 'EXCEPTION'
# Add `presentation` to the parents of the active step.
self._step_stack[-1].children_presentations.append(ret.presentation)
self._step_stack.append(_ActiveStep(ret, step_stream, False))
# NOTE: It's important to not open debug_log until:
# 1) We know we're going to have to quit due to GLOBAL_SHUTDOWN; OR
# 2) We know we have the resources to run this step.
#
# Otherwise if we open it here, the recipe can run out of file descriptors
# in the event that it has many, many blocked steps.
debug_log = None
try: # _run_step should never raise an exception, except for GreenletExit
if GLOBAL_SHUTDOWN.ready():
debug_log = step_stream.new_log_stream('$debug')
debug_log.write_line('GLOBAL_SHUTDOWN already active, skipping step.')
step_stream.mark_running() # to set start time, etc.
raise gevent.GreenletExit()
def _if_blocking():
step_stream.set_summary_markdown(
'Waiting for resources: `%s`' % (step_config.cost,))
with self._resource.wait_for(step_config.cost, _if_blocking):
debug_log = step_stream.new_log_stream('$debug')
step_stream.mark_running()
try:
self._write_memory_snapshot(
debug_log, 'Step: %s' % '.'.join(name_tokens))
caught = _run_step(
debug_log, ret, step_stream, self._step_runner, step_config,
self._environ, self._start_dir)
finally:
# NOTE: See the accompanying note in stream.py.
step_stream.reset_subannotation_state()
except gevent.GreenletExit:
ret.exc_result = attr.evolve(ret.exc_result, was_cancelled=True)
finally:
if debug_log:
debug_log.close()
ret.finalize()
if step_config.merge_step:
_update_merge_step_presentation(ret.presentation,
ret.step.sub_build,
step_stream.user_namespace,
step_config.infra_step)
# If there's a buffered exception, we raise it now.
if caught:
raise_(caught[0], caught[1], caught[2])
return ret
finally:
# per sys.exc_info this is recommended in python 2.x to avoid creating
# garbage cycles.
del caught
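  # A usage sketch for run_step (construction of the StepConfig lives in the
  # step module; the field access shown is illustrative):
  #
  #   data = engine.run_step(step_config)   # returns StepData
  #   if data.exc_result.retcode != 0:
  #     ...  # react to the failed step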
def _setup_build_step(self, recipe, emit_initial_properties):
with self._stream_engine.new_step_stream(('setup_build',), False) as step:
step.mark_running()
if emit_initial_properties:
for key in sorted(self.properties):
step.set_build_property(
key, json.dumps(self.properties[key], sort_keys=True))
run_recipe_help_lines = [
'To repro this locally, run the following line from the root of a %r'
' checkout:' % (self._recipe_deps.main_repo.name),
'',
'%s run --properties-file - %s <<EOF' % (
os.path.join(
'.', self._recipe_deps.main_repo.simple_cfg.recipes_path,
'recipes.py'),
recipe),
]
run_recipe_help_lines.extend(
json.dumps(self.properties, indent=2).splitlines())
run_recipe_help_lines += [
'EOF',
'',
        'To run on Windows, you can put the JSON in a file and redirect the',
        'contents of the file into recipes.py, with the < operator.',
]
with step.new_log_stream('run_recipe') as log:
for line in run_recipe_help_lines:
log.write_line(line)
with step.new_log_stream('memory_profile') as memory_log:
self._write_memory_snapshot(memory_log, 'Step: setup_build')
step.write_line('Running recipe with %s' % (self.properties,))
py_ver = '%d.%d.%d' % (
sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro,
)
step.add_step_text('running recipe: "%s" with Python %s' % (
recipe, py_ver))
@classmethod
def run_steps(cls, recipe_deps, properties, stream_engine, step_runner,
environ, cwd, initial_luci_context,
num_logical_cores, memory_mb,
emit_initial_properties=False, test_data=None,
skip_setup_build=False):
"""Runs a recipe (given by the 'recipe' property). Used by all
implementations including the simulator.
Args:
* recipe_deps (RecipeDeps) - The loaded recipe repo dependencies.
* properties: a dictionary of properties to pass to the recipe. The
'recipe' property defines which recipe to actually run.
* stream_engine: the StreamEngine to use to create individual step
streams.
* step_runner: The StepRunner to use to 'actually run' the steps.
* environ: The mapping object representing the environment in which
recipe runs. Generally obtained via `os.environ`.
* cwd (str): The current working directory to run the recipe.
* initial_luci_context (Dict[str, Dict]): The content of LUCI_CONTEXT to
pass to the recipe.
* num_logical_cores (int): The number of logical CPU cores to assume the
machine has.
* memory_mb (int): The amount of memory to assume the machine has, in MiB.
* emit_initial_properties (bool): If True, write the initial recipe engine
properties in the "setup_build" step.
Returns a 2-tuple of:
* result_pb2.RawResult
      * a sys.exc_info() tuple for any uncaught exception triggered by recipe
        code, or None.
Does NOT raise exceptions.
"""
result = result_pb2.RawResult()
uncaught_exception = None
assert 'recipe' in properties
recipe = properties['recipe']
try:
# This does all loading and importing of the recipe script.
recipe_obj = recipe_deps.main_repo.recipes[recipe]
# Make sure `global_symbols` (which is a cached execfile of the recipe
# python file) executes here so that we can correctly catch any
# RecipeUsageError exceptions which exec'ing it may cause.
# TODO(iannucci): Make this @cached_property more explicit (e.g.
# 'load_global_symbols' and 'cached_global_symbols' or something).
_ = recipe_obj.global_symbols
engine = cls(
recipe_deps, step_runner, stream_engine,
properties, environ, cwd, initial_luci_context, num_logical_cores,
memory_mb)
api = recipe_obj.mk_api(engine, test_data)
engine.initialize_path_client_HACK(api)
except (RecipeUsageError, ImportError, AssertionError) as ex:
_log_crash(stream_engine, 'loading recipe')
# TODO(iannucci): differentiate infra failure and user failure; will
# result in expectation changes, but that should be safe in its own CL.
result.status = common_pb2.INFRA_FAILURE
result.summary_markdown = 'Uncaught exception: ' + util.format_ex(ex)
return result, uncaught_exception
# TODO(iannucci): Don't skip this during tests (but maybe filter it out from
# expectations).
if not skip_setup_build:
try:
engine._setup_build_step(recipe, emit_initial_properties)
except Exception as ex:
_log_crash(stream_engine, 'setup_build')
result.status = common_pb2.INFRA_FAILURE
result.summary_markdown = 'Uncaught Exception: ' + util.format_ex(ex)
return result, uncaught_exception
try:
try:
try:
raw_result = recipe_obj.run_steps(api, engine)
if raw_result is None:
result.status = common_pb2.SUCCESS
# Notify user that they used the wrong recipe return type.
elif not isinstance(raw_result, result_pb2.RawResult):
result.status = common_pb2.FAILURE
result.summary_markdown = ('"%r" is not a valid return type for '
'recipes. Did you mean to use "RawResult"?' % (type(raw_result), ))
else:
result.CopyFrom(raw_result)
finally:
# TODO(iannucci): give this more symmetry with parent_step
engine.close_non_parent_step()
engine._step_stack[-1].close() # pylint: disable=protected-access
except* recipe_api.StepFailure as ex:
if debugger.should_set_implicit_breakpoints():
# =========================================================
# DEBUG: RunSteps just exited with a StepFailure exception.
# =========================================================
breakpoint() # pylint: disable=forgotten-debug-statement
reasons = _get_reasons(ex)
is_infra_failure = False
was_cancelled = recipe_api.was_cancelled(ex)
for sub_ex in ex.exceptions:
if isinstance(sub_ex, recipe_api.InfraFailure):
is_infra_failure = True
if was_cancelled and GLOBAL_SHUTDOWN.ready():
# We presume if we caught a cancelation exception and GLOBAL_SHUTDOWN
# is on that the original exception was due to GLOBAL_SHUTDOWN... this
# isn't 100% guaranteed, but for now it's close enough.
result.status = common_pb2.CANCELED
elif is_infra_failure:
result.status = common_pb2.INFRA_FAILURE
else:
result.status = common_pb2.FAILURE
# The encoded summary markdown length has a hard limit. We include as
# many of the triggering exceptions as possible without exceeding the
# limit.
parts = []
max_size = MAX_SUMMARY_MARKDOWN_SIZE - 25
for i, reason in enumerate(reasons):
# Always include the first exception.
if i == 0 or len('\n'.join([*parts, reason])) < max_size:
parts.append(reason)
          else:
            num_hidden = len(reasons) - len(parts)
            parts.append(
                f'\n({num_hidden}/{len(reasons)} errors truncated)'
            )
            break
result.summary_markdown = '\n'.join(parts)
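        # For example (a sketch): with three ~3000-character reasons and a
        # limit of 3975, only the first fits, so the summary becomes that
        # reason followed by '\n(2/3 errors truncated)'.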
except bdb.BdbQuit: # let debugger quit flow through
raise
# All other exceptions are reported to the user and are fatal.
except Exception as ex: # pylint: disable=broad-except
if debugger.should_set_implicit_breakpoints():
# =============================================================
# DEBUG: RunSteps just exited with a non-StepFailure exception.
# =============================================================
breakpoint() # pylint: disable=forgotten-debug-statement
_log_crash(stream_engine, 'Uncaught exception')
result.status = common_pb2.INFRA_FAILURE
result.summary_markdown = 'Uncaught Exception: ' + util.format_ex(ex)
uncaught_exception = sys.exc_info()
except CrashEngine as ex:
_log_crash(stream_engine, 'Engine Crash')
result.status = common_pb2.INFRA_FAILURE
result.summary_markdown = util.format_ex(ex)
return result, uncaught_exception
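# run_steps returns (result_pb2.RawResult, exc_info-or-None) and never raises;
# a sketch of a caller (the argument values are illustrative):
#
#   result, crash = RecipeEngine.run_steps(
#       recipe_deps, properties, stream_engine, step_runner, os.environ,
#       cwd, initial_luci_context, num_logical_cores=8, memory_mb=16384)
#   if crash is not None:
#     raise_(crash[0], crash[1], crash[2])  # surface the uncaught exception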
def _set_initial_status(presentation, step_config, exc_result):
"""Calculates and returns a StepPresentation.status value from a StepConfig
and an ExecutionResult.
"""
# TODO(iannucci): make StepPresentation.status enumey instead of stringy.
presentation.had_timeout = exc_result.had_timeout
presentation.was_cancelled = exc_result.was_cancelled
# TODO(iannucci): there should really be a TIMEOUT status, I think:
# CANCELED: externally imposed interrupt
# TIMEOUT: self-imposed timeout
if exc_result.was_cancelled:
presentation.status = 'CANCELED'
return
if exc_result.had_exception:
presentation.status = 'EXCEPTION'
return
if (step_config.ok_ret is step_config.ALL_OK or
exc_result.retcode in step_config.ok_ret):
presentation.status = 'SUCCESS'
return
presentation.status = 'EXCEPTION' if step_config.infra_step else 'FAILURE'
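# A worked example of the precedence above (a sketch): was_cancelled wins,
# then had_exception, then an OK retcode, e.g.
#
#   _set_initial_status(pres, cfg, ExecutionResult(retcode=1))
#   # pres.status == 'EXCEPTION' if cfg.infra_step else 'FAILURE'
#   # (assuming 1 is not in cfg.ok_ret)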
def _update_merge_step_presentation(presentation, sub_build,
user_namespace, infra_step):
"""Update the step presentation for merge step based on the result sub build.
Overrides the presentation status with the status of the sub-build. If the
status of the sub-build is CANCELED, the presentation status will be set to
INFRA_FAILURE because there's no CANCELED status support in StepPresentation.
TODO(crbug.com/1096713): Support setting CANCELED Status through
StepPresentation after getting rid of annotator mode.
The summary_markdown of the sub-build will be appended to step text and all
logs in the output of the sub-build will be merged into to the step logs.
If any of the following conditions is matched, the status will be explicitly
set to INFRA_FAILURE and the summary_markdown and output logs won't be merged
* The luciexe that the current merge step invokes does NOT write its final
build proto to the provided output location.
* The final build proto reports a non-terminal status.
"""
def append_step_text(text):
if presentation.step_text:
presentation.step_text += '\n'
presentation.step_text += text
if presentation.status != 'SUCCESS':
# Something went wrong already before we check the sub build proto.
# Return immediately so that error is not masked.
return
elif sub_build is None:
presentation.status = 'EXCEPTION'
append_step_text(
"Merge Step Error: Can't find the final build output for luciexe.")
elif not (sub_build.status & common_pb2.ENDED_MASK):
presentation.status = 'EXCEPTION'
append_step_text(
'Merge Step Error: expected terminal build status of sub build; '
'got status: %s.' % common_pb2.Status.Name(sub_build.status))
else:
presentation.status = {
common_pb2.SUCCESS: 'SUCCESS',
common_pb2.FAILURE: 'EXCEPTION' if infra_step else 'FAILURE',
common_pb2.CANCELED: 'EXCEPTION',
common_pb2.INFRA_FAILURE: 'EXCEPTION',
}[sub_build.status]
if sub_build.summary_markdown:
append_step_text(sub_build.summary_markdown)
if sub_build.output.logs:
for log in sub_build.output.logs:
merged_log = common_pb2.Log()
merged_log.MergeFrom(log)
if user_namespace:
merged_log.url = '/'.join((user_namespace, log.url))
presentation.logs[log.name] = merged_log
def _get_engine_properties(properties):
"""Retrieve and resurrect JSON serialized engine properties from all
properties passed to recipe.
The serialized value is associated with key '$recipe_engine'.
Args:
* properties (dict): All input properties for passed to recipe
Returns a engine_properties_pb2.EngineProperties object
"""
return jsonpb.ParseDict(
properties.get('$recipe_engine', {}),
engine_properties_pb2.EngineProperties(),
ignore_unknown_fields=True)
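# For example (a sketch), given input properties
#
#   {'recipe': 'main', '$recipe_engine': {
#       'memory_profiler': {'enable_snapshot': True}}}
#
# this returns an EngineProperties message whose
# memory_profiler.enable_snapshot field is True (unknown fields are ignored).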
def _prepopulate_placeholders(step_config, step_data):
"""Pre-fills the StepData with None for every placeholder available in
`step_config`. This is so that users don't have to worry about various
placeholders not existing on StepData."""
for itm in step_config.cmd:
if isinstance(itm, util.OutputPlaceholder):
step_data.assign_placeholder(itm, None)
def _resolve_output_placeholders(
debug, name_tokens, step_config, step_data, step_runner):
"""Takes the original (unmodified by _render_placeholders) step_config and
invokes the '.result()' method on every placeholder. This will update
'step_data' with the results.
`step_runner` is used for `placeholder` which should return test data
input for the InputPlaceholder.cleanup and OutputPlaceholder.result methods.
"""
for itm in step_config.cmd:
if isinstance(itm, util.Placeholder):
test_data = step_runner.placeholder(name_tokens, itm)
if isinstance(itm, util.InputPlaceholder):
debug.write_line(' cleaning %r' % (itm,))
itm.cleanup(test_data.enabled)
else:
debug.write_line(' finding result of %r' % (itm,))
step_data.assign_placeholder(itm, itm.result(
step_data.presentation, test_data))
if step_config.stdin:
debug.write_line(' cleaning stdin: %r' % (step_config.stdin,))
step_config.stdin.cleanup(
step_runner.handle_placeholder(name_tokens, 'stdin').enabled)
for handle in ('stdout', 'stderr'):
placeholder = getattr(step_config, handle)
if placeholder:
debug.write_line(' finding result of %s: %r' % (handle, placeholder))
test_data = step_runner.handle_placeholder(name_tokens, handle)
setattr(step_data, handle, placeholder.result(
step_data.presentation, test_data))
def _render_config(debug, name_tokens, step_config, step_runner, step_stream,
environ, start_dir):
"""Returns
* a step_runner.Step which is ready for consumption by
StepRunner.run. None if render fails.
* reason for render failure. None if render succeeds.
`step_runner` is used for `placeholder` and `handle_placeholder`
which should return test data input for the Placeholder.render method
(or an empty test data object).
"""
cmd = []
debug.write_line('rendering placeholders')
for itm in step_config.cmd:
if isinstance(itm, util.Placeholder):
debug.write_line(' %r' % (itm,))
cmd.extend(itm.render(step_runner.placeholder(name_tokens, itm)))
else:
cmd.append(itm)
handles = {}
debug.write_line('rendering std handles')
std_handle_reqs = {}
for handle in ('stdout', 'stderr', 'stdin'):
placeholder = getattr(step_config, handle)
if placeholder:
debug.write_line(' %s: %r' % (handle, placeholder))
placeholder.render(step_runner.handle_placeholder(name_tokens, handle))
# TODO(iannucci): maybe verify read/write permissions for backing_file
# here?
handles[handle] = placeholder.backing_file
elif handle == 'stdin':
handles[handle] = None
else:
std_handle_reqs[handle] = True
if std_handle_reqs:
handles.update(step_stream.open_std_handles(**std_handle_reqs))
debug.write_line('merging envs')
pathsep = step_config.env_suffixes.pathsep
# TODO(iannucci): remove second return value from merge_envs, it's not needed
# any more.
env, _ = merge_envs(environ,
step_config.env,
step_config.env_prefixes.mapping,
step_config.env_suffixes.mapping,
pathsep)
env.update(step_stream.env_vars)
step_luci_context = step_config.luci_context
if step_luci_context or step_config.timeout:
debug.write_line('writing LUCI_CONTEXT file')
if step_config.timeout:
ideal_soft_deadline = step_runner.now() + step_config.timeout
step_luci_context = dict(step_luci_context) if step_luci_context else {}
d = step_luci_context.get('deadline', None)
if d:
d = copy.deepcopy(d)
if d.soft_deadline: # finite
d.soft_deadline = min(ideal_soft_deadline, d.soft_deadline)
else: # infinite
d.soft_deadline = ideal_soft_deadline
else:
d = sections_pb2.Deadline(
soft_deadline=ideal_soft_deadline, grace_period=30)
step_luci_context['deadline'] = d
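      # For example (a sketch): with now() == 1000 and timeout == 60 the ideal
      # soft_deadline is 1060; an existing finite soft_deadline of 1030 is
      # kept (min), 1200 is clamped down to 1060, and a missing deadline
      # section becomes Deadline(soft_deadline=1060, grace_period=30).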
section_values = {
key: jsonpb.MessageToDict(pb_val) if pb_val is not None else None
for key, pb_val in step_luci_context.items()
}
if step_config.timeout:
# the json serialization is nicer to print here
debug.write_line(' adjusted deadline: %r' % (
section_values['deadline'],))
lctx_file = step_runner.write_luci_context(section_values)
debug.write_line(' done: %r' % (lctx_file,))
env[luci_context.ENV_KEY] = lctx_file
debug.write_line('checking cwd: %r' % (step_config.cwd,))
cwd = step_config.cwd or start_dir
if not step_runner.isabs(name_tokens, cwd):
debug.write_line(' not absolute: %r' % (cwd,))
return None, 'cwd %r is not absolute' % (cwd,)
if not step_runner.isdir(name_tokens, cwd):
debug.write_line(' not a directory: %r' % (cwd,))
return None, 'cwd %r is not a directory' % (cwd,)
if not step_runner.access(name_tokens, cwd, os.R_OK):
debug.write_line(' no read perms: %r' % (cwd,))
return None, 'no read perms on cwd %r' % (cwd,)
path = env.get('PATH', '').split(pathsep)
debug.write_line('resolving cmd0 %r' % (cmd[0],))
debug.write_line(' in PATH: %s' % (path,))
debug.write_line(' with cwd: %s' % (cwd,))
cmd0 = step_runner.resolve_cmd0(name_tokens, debug, cmd[0], cwd, path)
if cmd0 is None:
debug.write_line('failed to resolve cmd0')
return Step(
cmd=tuple(cmd),
cwd=cwd,
env=env,
luci_context=step_luci_context,
**handles), 'cmd0 %r not found' % (cmd[0],)
debug.write_line('resolved cmd0: %r' % (cmd0,))
return Step(
cmd=(cmd0,) + tuple(cmd[1:]),
cwd=cwd,
env=env,
luci_context=step_luci_context,
**handles), None
def _run_step(debug_log, step_data, step_stream, step_runner,
step_config, base_environ, start_dir):
"""Does all the logic to actually execute the step.
This will:
* resolve all placeholders
* execute the step
* calculate initial presentation status
* parse all placeholders
Args:
* debug_log (Stream) - The stream we should write debugging information to.
* step_data (StepData) - The StepData object we're going to ultimately
return to userspace.
* step_stream (StepStream) - The StepStream for the step we're about to
execute.
* step_runner (StepRunner)
* step_config (StepConfig) - The step to run.
    * base_environ (dict|FakeEnviron) - The 'base' environment to merge the
      step_config's environmental parameters into.
    * start_dir (str) - The fallback cwd for the step, used when
      step_config.cwd is None.
Returns (exc_info|None). Any exception which was raised while running the step
(or None, if everything was OK).
Side effects: populates the step_data.presentation object.
"""
if not step_config.cmd:
debug_log.write_line('Noop step.')
step_data.exc_result = step_runner.run_noop(
step_data.name_tokens, debug_log)
_set_initial_status(step_data.presentation, step_config,
step_data.exc_result)
return None
caught = None
exc_details = step_stream.new_log_stream('execution details')
try:
# Write the stack trace of the function call to the debug log
debug_log.write_line('Stack trace for this step:')
buffer = io.StringIO()
traceback.print_stack(file=buffer)
for line in buffer.getvalue().split("\n"):
debug_log.write_line(line)
debug_log.write_line('Prepopulating placeholder data')
_prepopulate_placeholders(step_config, step_data)
debug_log.write_line('Rendering input placeholders')
rendered_step, render_err = _render_config(
debug_log, step_data.name_tokens, step_config, step_runner, step_stream,
base_environ, start_dir)
if rendered_step is not None:
_print_step(exc_details, rendered_step)
if render_err:
step_data.exc_result = ExecutionResult(had_exception=True)
step_data.presentation.step_text = render_err
else:
debug_log.write_line('Executing step')
try:
step_data.exc_result = step_runner.run(
step_data.name_tokens, debug_log, rendered_step)
except gevent.GreenletExit:
# Greenlet was killed while running the step
step_data.exc_result = ExecutionResult(was_cancelled=True)
if step_data.exc_result.retcode is not None:
# Windows error codes such as 0xC0000005 and 0xC0000409 are much
# easier to recognize and differentiate in hex.
exc_details.write_line(
'Step had exit code: %s (a.k.a. 0x%08X)' % (
step_data.exc_result.retcode,
step_data.exc_result.retcode & 0xffffffff))
# Have to render presentation.status once here for the placeholders to
# observe.
_set_initial_status(step_data.presentation, step_config,
step_data.exc_result)
if not render_err:
debug_log.write_line('Resolving output placeholders')
_resolve_output_placeholders(
debug_log, step_data.name_tokens, step_config, step_data, step_runner)
except: # pylint: disable=bare-except
caught = sys.exc_info()
debug_log.write_line('Unhandled exception:')
for line in traceback.format_exc().splitlines():
debug_log.write_line(line)
step_data.exc_result = attr.evolve(step_data.exc_result, had_exception=True)
if step_data.exc_result.had_timeout:
exc_details.write_line('Step timed out.')
if step_data.exc_result.had_exception:
exc_details.write_line('Step had exception.')
for line in traceback.format_exc().splitlines():
debug_log.write_line(line)
if step_data.exc_result.was_cancelled:
exc_details.write_line('Step was canceled.')
exc_details.close()
  # Re-render the presentation status; if one of the output placeholders blew
  # up, or something else bad happened, we should adjust accordingly.
_set_initial_status(step_data.presentation, step_config, step_data.exc_result)
return caught
try:
from shlex import quote as _single_arg_quote
except ImportError:
from pipes import quote as _single_arg_quote
def _shell_quote(arg):
"""Shell-quotes a string with minimal noise such that it is still reproduced
exactly in a bash/zsh shell.
"""
if arg == '':
return "''"
# Normal shell-printable string without quotes
if re.match(r'[-+,./0-9:@A-Z_a-z]+$', arg):
return arg
# Printable within regular single quotes.
if re.match('[\040-\176]+$', arg):
return "'%s'" % arg.replace("'", "'\\''")
# Something complicated, printable within special escaping quotes.
# The $'stuff' syntax makes shells interpret escape characters.
# We promote real newlines to be their escaped counterparts, since
# copy+pasting things with newlines is meh.
return "$" + _single_arg_quote(arg).replace('\\', '\\\\').replace('\n', '\\n')
def _print_step(execution_log, step):
"""Prints the step command and relevant metadata.
Intended to be similar to the information that Buildbot prints at the
beginning of each non-annotator step.
"""
assert isinstance(step, Step), 'expected Step, got {}'.format(step)
execution_log.write_line('Executing command [')
for arg in step.cmd:
execution_log.write_line(' %r,' % arg)
execution_log.write_line(']')
# Apparently some recipes (I think mostly test recipes) pass commands whose
# arguments contain literal newlines (hence the newline replacement bit).
#
# TODO(iannucci): Make this illegal?
execution_log.write_line(
'escaped for shell: %s'
% ' '.join(_shell_quote(arg) for arg in step.cmd).replace('\n', '\\n'))
execution_log.write_line('in dir ' + step.cwd)
# Technically very soon _before_ the step runs, but should be insignificant.
# Displayed in two formats because ISO format is easy to read but the deadline
# that may show inside the LUCI_CONTEXT dict in a few lines will be a
# timestamp.
now = datetime.datetime.now()
execution_log.write_line(f'at time {now.isoformat()} ({now.timestamp()})')
# Some LUCI_CONTEXT sections may contain secrets; explicitly allow the
# sections we know are safe.
luci_context_allowed = ['realm', 'luciexe', 'deadline']
luci_context_header_printed = False
for section in luci_context_allowed:
data = step.luci_context.get(section, None)
if data is not None:
if not luci_context_header_printed:
luci_context_header_printed = True
execution_log.write_line('LUCI_CONTEXT:')
execution_log.write_line(
' %r: %r' % (section, jsonpb.MessageToDict(data)))
# TODO(iannucci): print the DIFF against the original environment
execution_log.write_line('full environment:')
for key, value in sorted(step.env.items()):
execution_log.write_line(' %s: %s' % (key, value.replace('\n', '\\n')))
execution_log.write_line('')
def _log_crash(stream_engine, crash_location):
# Pipe is reserved for step names, but can show up when crashing in internal
# recipe engine functions which take the step names. Replace it with "<PIPE>"
# so we don't double-crash when trying to report an actual problem.
name = 'RECIPE CRASH (%s)' % (crash_location.replace("|", "<PIPE>"),)
with stream_engine.new_step_stream((name,), False) as stream:
stream.mark_running()
stream.set_step_status('EXCEPTION', had_timeout=False)
stream.write_line('The recipe has crashed at point %r!' % crash_location)
stream.write_line('')
_, exc, _ = sys.exc_info()
for line in traceback.format_exception(exc):
for part in line.splitlines():
stream.write_line(part)