| # -*- coding: utf-8 -*- |
| # Copyright 2019 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import json |
| import logging |
| import traceback |
| import zlib |
| |
| from google.protobuf import json_format as jsonpb |
| from google.protobuf.internal.containers import RepeatedCompositeFieldContainer |
| from google.protobuf.struct_pb2 import Struct |
| |
| import attr |
| import gevent |
| |
| from PB.go.chromium.org.luci.buildbucket.proto.build import Build |
| from PB.go.chromium.org.luci.buildbucket.proto.step import Step |
| from PB.go.chromium.org.luci.buildbucket.proto import common |
| |
| from ...recipe_api import InfraFailure, StepFailure |
| from ...third_party import logdog |
| |
| from ..attr_util import attr_type |
| |
| from . import StreamEngine |
| |
| |
| LOG = logging.getLogger(__name__) |
| |
| |
| @attr.s |
| class LUCIStepMarkdownWriter: |
| _step_text = attr.ib(default='') |
| def add_step_text(self, text): |
| self._step_text += text |
| |
| _step_summary_text = attr.ib(default='') |
| def add_step_summary_text(self, text): |
| self._step_summary_text += text |
| |
| _step_links = attr.ib(factory=list) |
| def add_step_link(self, linkname, link): |
| self._step_links.append((linkname, link)) |
| |
| def render(self): |
| escape_parens = lambda link: link.replace('(', r'\(').replace(')', r'\)') |
| |
| paragraphs = [] |
| |
| if self._step_summary_text: |
| paragraphs.append(self._step_summary_text) |
| |
| if self._step_text: |
| paragraphs.append(self._step_text) |
| |
| if self._step_links: |
| paragraphs.append( |
| '\n'.join( |
| ' * [%s](%s)' % (name, escape_parens(link)) |
| for name, link in self._step_links)) |
| |
| return '\n\n'.join(paragraphs) |
| |
| |
| @attr.s |
| class LUCILogStream(StreamEngine.Stream): |
| """Implementation of StreamEngine.Stream for luciexe mode. |
| |
| It's a very thin wrapper around a LogDog text stream.""" |
| |
| # pylint: disable=protected-access |
| _stream = attr.ib(validator=attr_type( |
| (type(None), logdog.stream.StreamClient._BasicStream))) |
| |
| def fileno(self): |
| """Returns underlying logdog file descriptor. |
| |
| Used by subprocess.Popen when redirecting a subprocess output to this |
| stream. |
| """ |
| return self._stream.fileno() |
| |
| def write_line(self, line): |
| """Writes a single line to the underlying stream.""" |
| self._stream.write(line + '\n') |
| |
| def close(self): |
| """Closes the stream. No more writes allowed by the current process.""" |
| if self.closed: |
| return |
| self._stream.close() |
| self._stream = None |
| |
| @property |
| def closed(self): |
| """Returns True if the stream has been closed.""" |
| return self._stream is None |
| |
| |
| @attr.s |
| class LUCIStepStream(StreamEngine.StepStream): |
| """Implementation of StreamEngine.StepStream for luciexe mode. |
| |
| Holds a stdout and stderr file (opened lazily), as well as a Step protobuf |
| message which is part of this execution's Build message. |
| |
| Has a ref to the global Build.output.properties field. |
| |
| Presentation changes alters either the Step or the properties and calls |
| a global callback to write these changes to the logdog Build proto stream |
| (owned by the LUCIStreamEngine). |
| |
| Handles uniqification of all logdog stream names in this process. |
| """ |
| _step = attr.ib(validator=attr_type(Step)) |
| _properties = attr.ib(validator=attr_type(Struct)) |
| _build_tags = attr.ib(validator=attr_type(RepeatedCompositeFieldContainer)) |
| _tags = attr.ib(validator=attr_type(RepeatedCompositeFieldContainer)) |
| _output_gitiles_commit = attr.ib(validator=attr_type(common.GitilesCommit)) |
| # change_cb is a void function which causes the LUCIStreamEngine to emit the |
| # current Build proto message. This must be called after any changes to: |
| # * self._step |
| # * self._properties |
| # |
| # TODO(iannucci): change _change_cb to a context-manager for step, i.e. |
| # with self._step as pb: |
| # # tweak pb |
| _change_cb = attr.ib() |
| |
| # The Butler StreamClient. Used to generate logs for individual steps. |
| _bsc = attr.ib(validator=attr_type(logdog.stream.StreamClient)) |
| |
| # File-like objects for stdout/stderr (logdog streams). |
| # |
| # stdhandle is a single handle which will either point to stdout or stderr. In |
| # the case where the step is using logdog for stdout and stderr, they're |
| # currently merged together because people are used to seeing the output |
| # interleaved. |
| # |
| # TODO(iannucci) Once logdog/resultdb supports viewing muxed streams again, |
| # separate stdout and stderr into separate streams. |
| _std_handle = attr.ib(default=None) |
| _logging = attr.ib(default=None) |
| |
| # If True, after initialization, allocate a log stream '$build.proto' that |
| # points to the 'build.proto' stream of the luciexe this step launches and |
| # set it as step.merge_build.from_logdog_stream. Host application will treat |
| # this step as merge step according to luciexe protocol. |
| # |
| # If this is set to 'legacy', then legacy_global_namespace in the merge_step |
| # will be set to True. |
| # |
| # See: [luciexe recursive invocation](https://pkg.go.dev/go.chromium.org/luci/luciexe?tab=doc#hdr-Recursive_Invocation) |
| _merge_step = attr.ib(default=False, |
| validator=attr.validators.in_((False, True, 'legacy'))) |
| |
| _back_compat_markdown = attr.ib(factory=LUCIStepMarkdownWriter) |
| |
| # A global set of created logdog stream names for all steps. Used to |
| # deduplicate log stream names, since the logdog stream name alphabet is |
| # a subset of the alphabet allowed for log names in build.proto. |
| # |
| # Contains the entire logstream name as seen by logdog (e.g. |
| # parent/child/logname). This is because we need to deduplicate log streams as |
| # logdog sees them (i.e. after we've 'normalized' whatever real name the user |
| # has given us for the stream). As long as the user gives us non-exotic names, |
| # these will be generally readable. If the user gives us junk with e.g. emoji, |
| # we may see conflicts (since multiple parents and steps may normalize to the |
| # same logdog stream name). |
| # |
| # TODO(iannucci): deduplicate hierarchically so that "💩 step" and "🎉 step" |
| # will do their deduplication at the step level, rather than at the leaf log |
| # level. e.g. right now we'll end up with: |
| # |
| # l___step/stdout "💩 step/stdout" |
| # l___step/stderr "💩 step/stderr" |
| # l___step/stdout_0 "🎉 step/stdout" |
| # l___step/stderr_0 "🎉 step/stderr" |
| # l___step/stdout_1 "🍔 step/stdout" |
| # l___step/stderr_1 "🍔 step/stderr" |
| # |
| # But we should really have: |
| # |
| # l___step/stdout "💩 step/stdout" |
| # l___step/stderr "💩 step/stderr" |
| # l___step_0/stdout "🎉 step/stdout" |
| # l___step_0/stderr "🎉 step/stderr" |
| # l___step_1/stdout "🍔 step/stdout" |
| # l___step_1/stderr "🍔 step/stderr" |
| _CREATED_LOGS = set() |
| |
| def __attrs_post_init__(self): |
| self._stream_namespace = '/'.join( |
| logdog.streamname.normalize_segment(seg, 'l') |
| for seg in self._step.name.split('|') |
| ) |
| |
| if self._merge_step: |
| stream_name = '/'.join((self._stream_namespace, 'u', 'build.proto')) |
| if stream_name in self._CREATED_LOGS: |
| raise ValueError("Duplicated build.proto stream %s" % (stream_name,)) |
| logdog.streamname.validate_stream_name(stream_name) |
| self._CREATED_LOGS.add(stream_name) |
| |
| self._step.merge_build.from_logdog_stream = stream_name |
| if self._merge_step == 'legacy': |
| self._step.merge_build.legacy_global_namespace = True |
| self._change_cb() |
| |
| def new_log_stream(self, log_name): |
| """Add a new log with name `log_name` to this step. |
| |
| Will mangle `log_name` to produce a valid and non-conflicting logdog stream |
| name. |
| |
| Returns file-like text stream object. This file-like object includes |
| fileno() and so is suitable for use with subprocess IO redirection.""" |
| try: |
| if log_name == 'logging': |
| logstream = self.logging |
| if logstream.closed: |
| raise ValueError('Attempting to open closed logstream %r' % log_name) |
| return logstream |
| |
| return self._new_log_stream(log_name) |
| except: |
| LOG.exception('new_log_stream %r: %r', self._step.name, log_name) |
| raise |
| |
| def _new_log_stream(self, log_name): |
| dedup_idx = 0 |
| base_flattened_name = '/'.join(( |
| self._stream_namespace, |
| logdog.streamname.normalize_segment(log_name, 'l') |
| )) |
| flat_name = base_flattened_name |
| while flat_name in self._CREATED_LOGS: |
| flat_name = logdog.streamname.normalize( |
| base_flattened_name + ('_%d' % dedup_idx), 'l') |
| dedup_idx += 1 |
| |
| logdog.streamname.validate_stream_name(flat_name) |
| log_stream = self._bsc.open_text(flat_name) |
| self._CREATED_LOGS.add(flat_name) |
| |
| log = self._step.logs.add() |
| log.name = log_name |
| log.url = flat_name |
| self._change_cb() |
| return LUCILogStream(log_stream) |
| |
| def append_log(self, log): |
| self._step.logs.add().CopyFrom(log) |
| self._change_cb() |
| |
| def mark_running(self): |
| if self._step.status == common.SCHEDULED: |
| self._step.summary_markdown = "" |
| self._step.status = common.STARTED |
| self._step.start_time.GetCurrentTime() |
| self._change_cb() |
| |
| def set_summary_markdown(self, text): |
| self._step.summary_markdown = text |
| self._change_cb() |
| |
| def add_step_text(self, text): |
| self._back_compat_markdown.add_step_text(text) |
| |
| def add_step_summary_text(self, text): |
| self._back_compat_markdown.add_step_summary_text(text) |
| |
| def add_step_link(self, name, url): |
| self._back_compat_markdown.add_step_link(name, url) |
| |
| def set_step_status(self, status, had_timeout): |
| _ = had_timeout |
| self._step.status = { |
| 'SUCCESS': common.SUCCESS, |
| 'FAILURE': common.FAILURE, |
| 'WARNING': common.SUCCESS, # TODO(crbug.com/854099): support WARNING |
| 'EXCEPTION': common.INFRA_FAILURE, |
| 'CANCELED': common.CANCELED, |
| }[status] |
| # TODO(iannucci): set timeout bit here |
| |
| def set_build_property(self, key, value): |
| # Intercept legacy properties; These were used late-stage in the |
| # @@@annotator@@@ era in lieu of adding additional annotator commands. |
| # |
| # Once annotations are gone, these should be replaced with proper API |
| # (rather than this tunnel via set_build_property). |
| if key == '$recipe_engine/buildbucket/runtime-tags': |
| for k, vals in json.loads(value).items(): |
| self._build_tags.extend( |
| [common.StringPair(key=k, value=v) for v in set(vals) |
| if common.StringPair(key=k, value=v) not in self._build_tags]) |
| elif key == '$recipe_engine/buildbucket/output_gitiles_commit': |
| self._output_gitiles_commit.CopyFrom( |
| jsonpb.Parse(value, common.GitilesCommit())) |
| else: |
| self._properties[key] = json.loads(value) |
| |
| self._change_cb() |
| |
| def set_step_tag(self, key, value): |
| self._tags.add(key=key, value=value) |
| self._change_cb() |
| |
| @property |
| def logging(self): |
| """Returns an open text stream for this step's logging stream.""" |
| if not self._logging: |
| self._logging = self._new_log_stream('logging') |
| return self._logging |
| |
| def open_std_handles(self, stdout=False, stderr=False): |
| if self._std_handle is not None: |
| LOG.exception('open_std_handles called twice: %r', self._step.name) |
| raise ValueError( |
| 'open_std_handles may only be called once: %r', self._step.name) |
| |
| ret = {} |
| if not stdout and not stderr: |
| return ret |
| |
| if not stdout: |
| self._std_handle = self.new_log_stream('stderr') |
| ret['stderr'] = self._std_handle |
| return ret |
| |
| self._std_handle = self.new_log_stream('stdout') |
| ret['stdout'] = self._std_handle |
| if stderr: |
| ret['stderr'] = self._std_handle |
| return ret |
| |
| @property |
| def env_vars(self): |
| logdog_namespace = self.user_namespace |
| if self._bsc.namespace: |
| logdog_namespace = '/'.join((self._bsc.namespace, logdog_namespace)) |
| return {'LOGDOG_NAMESPACE': logdog_namespace} |
| |
| @property |
| def user_namespace(self): |
| return '/'.join((self._stream_namespace, 'u')) |
| |
| def write_line(self, line): |
| """Differs from our @@@annotator@@@ brethren and puts logging data to |
| an independent stream.""" |
| # TODO(iannucci): have step_runner log the step metadata as a protobuf |
| # and/or put it in the Step proto message. |
| return self.logging.write_line(line) |
| |
| def close(self): |
| # TODO(iannucci): close ALL log streams, not just stdout/stderr/logging |
| # TODO(iannucci): this can actually double-close with subprocess runner... |
| # clean all of this up once annotations are gone. |
| if self._std_handle: |
| self._std_handle.close() |
| # TODO(iannucci): improve UI modification interface to immediately send UI |
| # changes when they happen. |
| self._step.end_time.GetCurrentTime() |
| # Python2 doesn't guarantee monotonic clock. So for quick step, it is |
| # possible that end_time < start_time. Force to set end_time = start_time |
| # in that case. |
| if self._step.end_time.ToDatetime() < self._step.start_time.ToDatetime(): |
| self._step.end_time.CopyFrom(self._step.start_time) |
| self._step.summary_markdown = self._back_compat_markdown.render() |
| if self._step.status == common.STARTED: |
| self._step.status = common.SUCCESS |
| self._change_cb() |
| |
| |
| @attr.s |
| class LUCIStreamEngine(StreamEngine): |
| """Implementation of StreamEngine for luciexe mode. |
| |
| Holds a LogDog datagram stream for Build messages and manages writes to this |
| stream. |
| """ |
| |
| # This causes the 'build.proto' datagram stream to export as JSONPB instead of |
| # Binary PB. Only used for debugging. `luciexe` protocol does not support |
| # JSONPB. |
| _export_build_as_json = attr.ib(validator=attr_type(bool)) |
| |
| # The current Build message. This is mutated and then sent with the _send |
| # function (seen as _change_cb in other classes in this file). |
| _build_proto = attr.ib(factory=lambda: Build( |
| status=common.STARTED, |
| output=dict(status=common.STARTED), |
| )) |
| |
| # The Butler StreamClient. Used to generate logs for individual steps. |
| _bsc = attr.ib( |
| validator=attr_type(logdog.stream.StreamClient), |
| factory=lambda: logdog.bootstrap.ButlerBootstrap.probe().stream_client(), |
| ) |
| |
| # The Build message datagram stream. |
| _build_stream = attr.ib() |
| @_build_stream.default |
| def _build_stream_default(self): |
| content_enc = 'jsonpb' if self._export_build_as_json else 'proto' |
| content_type = 'application/luci+%s; message=buildbucket.v2.Build' % ( |
| content_enc,) |
| if content_enc == 'proto': |
| content_type += '; encoding=zlib' |
| return self._bsc.open_datagram('build.proto', content_type=content_type) |
| |
| _send_event = attr.ib(default=gevent.event.Event()) |
| _sender_die = attr.ib(default=False) |
| |
| _sender = attr.ib() |
| @_sender.default |
| def _sender_default(self): |
| def _do_send(): |
| self._build_stream.send( |
| jsonpb.MessageToJson(self._build_proto, |
| preserving_proto_field_name=True).encode('utf-8') |
| if self._export_build_as_json else |
| zlib.compress(self._build_proto.SerializeToString()) |
| ) |
| |
| def _send_fn(): |
| while not self._sender_die: |
| # wait until SOMEONE wants to send something. |
| self._send_event.wait() |
| if self._sender_die: |
| break |
| |
| # Then wait for a second, in case other updates come in. |
| gevent.sleep(1) |
| |
| # atomically: |
| # clear the event |
| # serialize the current build proto state (part of _do_send) |
| # then send the serialized data asynchronously. |
| self._send_event.clear() |
| _do_send() |
| |
| # One last send before exiting to make sure all build updates are |
| # sent to logdog |
| _do_send() |
| |
| return gevent.spawn(_send_fn) |
| |
| def _send(self): |
| self._send_event.set() |
| |
| def new_step_stream(self, name_tokens, allow_subannotations, |
| merge_step=False): |
| assert not allow_subannotations, ( |
| 'Subannotations not currently supported in build.proto mode' |
| ) |
| step_pb = self._build_proto.steps.add( |
| name='|'.join(name_tokens), |
| status=common.SCHEDULED) |
| |
| ret = LUCIStepStream( |
| step_pb, self._build_proto.output.properties, self._build_proto.tags, |
| step_pb.tags, self._build_proto.output.gitiles_commit, |
| self._send, self._bsc, merge_step=merge_step) |
| self._send() |
| return ret |
| |
| def close(self): |
| self._sender_die = True |
| self._send() |
| self._sender.join() |
| self._build_stream.close() |
| |
| @property |
| def supports_concurrency(self): |
| return True |
| |
| def write_result(self, result): |
| self._build_proto.status = result.status |
| self._build_proto.summary_markdown = result.summary_markdown |
| self._build_proto.output.status = result.status |
| self._send() |
| |
| @property |
| def current_build_proto(self): |
| """Returns the current Build message. |
| |
| Note: Any update on the returned build before engine closes will be |
| sent to `build.proto` stream |
| """ |
| return self._build_proto |
| |
| @property |
| def was_successful(self): |
| """Used by luciexe to set the recipe engine's returncode. |
| |
| This isn't strictly necessary, but it can be helpful for debugging. |
| """ |
| return self._build_proto.status == common.SUCCESS |