| # Copyright 2016 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """An interface to git-cl. |
| |
| The git-cl tool is responsible for communicating with Gerrit and Buildbucket to |
| manage changelists and try jobs associated with them. |
| """ |
| |
| import collections |
| import enum |
| import logging |
| import re |
| from typing import Mapping, NamedTuple, Optional, Set |
| |
| from blinkpy.common.checkout.git import Git |
| from blinkpy.common.net.results_fetcher import filter_latest_builds |
| from blinkpy.common.net.rpc import Build, BuildStatus, BuildbucketClient |
| from blinkpy.common.system.executive import ScriptError |
| |
| _log = logging.getLogger(__name__) |
| |
| |
| BuildStatuses = Mapping[Build, BuildStatus] |
| |
| |
| # TODO(crbug.com/41483974): Replace `issue_number` and `patchset` paired |
| # arguments in `GitCL.*` with this more meaningful type. |
| class CLRevisionID(NamedTuple): |
| """An identifier for a Gerrit CL patchset.""" |
| issue: int |
| patchset: Optional[int] = None |
| |
| def __str__(self) -> str: |
| base_url = f'https://crrev.com/c/{self.issue}' |
| return f'{base_url}/{self.patchset}' if self.patchset else base_url |
| |
| |
| class CLStatus(enum.Enum): |
| """A "best effort status" of a CL [0]. |
| |
| [0]: https://chromium.googlesource.com/chromium/tools/depot_tools/+/85e409e/git_cl.py#2397 |
| """ |
| ERROR = 'error' |
| UNSENT = 'unsent' |
| WAITING = 'waiting' |
| REPLY = 'reply' |
| LGTM = 'lgtm' |
| DRY_RUN = 'dry-run' |
| COMMIT = 'commit' |
| CLOSED = 'closed' |
| |
| |
| class CLSummary(NamedTuple): |
| """The current status of a particular CL and its associated builds. |
| |
| It contains both the CL's status as reported by `git-cl status' as well as |
| a mapping of Build objects to BuildStatus objects. |
| """ |
| status: CLStatus |
| try_job_results: BuildStatuses |
| |
| |
| class GitCL: |
| |
| def __init__(self, host, cwd=None, bb_client=None): |
| self._host = host |
| self.bb_client = bb_client or BuildbucketClient.from_host(host) |
| self._cwd = cwd |
| self._git_executable_name = Git.find_executable_name( |
| host.executive, host.platform) |
| |
| def run(self, args): |
| """Runs git-cl with the given arguments and returns the output. |
| |
| Args: |
| args: A list of arguments passed to `git cl`. |
| |
| Returns: |
| A string (the output from git-cl). |
| """ |
| command = [self._git_executable_name, 'cl'] + args |
| # Suppress the stderr of git-cl because git-cl will show a warning when |
| # running on Swarming bots with local git cache. |
| return self._host.executive.run_command( |
| command, cwd=self._cwd, stderr=self._host.executive.PIPE) |
| |
| def close(self, issue: Optional[int] = None): |
| command = ['set-close'] |
| if issue: |
| command.append(f'--issue={issue}') |
| self.run(command) |
| |
| def trigger_try_jobs(self, builders, bucket=None): |
| """Triggers try jobs on the given builders. |
| |
| Args: |
| builder: A list of builder names. |
| bucket: When specified, all jobs are triggered to be in this bucket |
| (instead of the configured or default buckets). |
| """ |
| if bucket: |
| builders_by_bucket = {bucket: builders} |
| else: |
| builders_by_bucket = self._group_builders_by_bucket(builders) |
| # Sort both buckets and builders to ensure stable unit tests. |
| for bucket in sorted(builders_by_bucket): |
| command = ['try'] |
| # Buckets are required by `git cl try`. When no bucket is specified, |
| # use the default bucket. |
| command.extend(['-B', bucket or 'luci.chromium.try']) |
| for builder in sorted(builders_by_bucket[bucket]): |
| command.extend(['-b', builder]) |
| self.run(command) |
| |
| def _group_builders_by_bucket(self, builders): |
| builders_by_bucket = collections.defaultdict(list) |
| for builder in builders: |
| bucket = self._host.builders.bucket_for_builder(builder) |
| builders_by_bucket[bucket].append(builder) |
| return dict(builders_by_bucket) |
| |
| def get_issue_number(self) -> int | None: |
| """Parse the issue number, if available.""" |
| # Expected output of git cl issue looks like: |
| # "<Optional message> Issue number: 1234 (<url>)". |
| # Note: git cl gets the number from local git config, e.g. |
| # by running `git config branch.<branchname>.gerritissue`. |
| try: |
| output = self.run(['issue']).split() |
| if 'number:' in output: |
| return int(output[output.index('number:') + 1]) |
| except (ScriptError, ValueError): |
| pass |
| # `git cl issue` is internally implemented with `git config`, so it |
| # won't work in non-Git environments like cogfs. |
| return None |
| |
| def get_cl_status(self, issue: Optional[int] = None) -> Optional[CLStatus]: |
| """Get the status of a CL. |
| |
| Arguments: |
| issue: The issue number, or `None` for the current issue. |
| |
| Returns: |
| The status of the CL, or `None` if no current issue is set. |
| """ |
| command = ['status', '--field=status'] |
| if issue: |
| command.append(f'--issue={issue}') |
| raw_status = self.run(command).strip().lower() |
| return None if raw_status == 'none' else CLStatus(raw_status) |
| |
| def _get_latest_patchset(self): |
| return self.run(['status', '--field=patch']).strip() |
| |
| def wait_for_try_jobs(self, |
| poll_delay_seconds=10 * 60, |
| timeout_seconds=120 * 60, |
| cq_only=False): |
| """Waits until all try jobs are finished and returns results, or None. |
| |
| This function can also be interrupted if the corresponding CL is |
| closed while the try jobs are still running. |
| |
| Returns: |
| None if a timeout occurs, a CLSummary tuple otherwise. |
| """ |
| |
| def finished_try_job_results_or_none(): |
| cl_status = self.get_cl_status() |
| _log.debug(f'Fetched CL status: {cl_status.value}') |
| issue_number = self.get_issue_number() |
| try_job_results = self.latest_try_jobs( |
| issue_number, cq_only=cq_only) |
| if (cl_status is CLStatus.CLOSED or |
| (try_job_results and self.all_finished(try_job_results))): |
| return CLSummary(status=cl_status, |
| try_job_results=try_job_results) |
| return None |
| |
| return self._wait_for( |
| finished_try_job_results_or_none, |
| poll_delay_seconds, |
| timeout_seconds, |
| message=' for try jobs') |
| |
| def wait_for_closed_status( |
| self, |
| poll_delay_seconds: float = 2 * 60, |
| timeout_seconds: float = 30 * 60, |
| issue: Optional[int] = None, |
| start: Optional[float] = None) -> Optional[CLStatus]: |
| """Waits until git cl reports that the current CL is closed.""" |
| |
| def closed_status_or_none(): |
| status = self.get_cl_status(issue) |
| _log.debug('CL status is: %s', status) |
| if status is CLStatus.CLOSED: |
| self._host.print_('CL is closed.') |
| return status |
| return None |
| |
| return self._wait_for(closed_status_or_none, |
| poll_delay_seconds, |
| timeout_seconds, |
| message=' for closed status', |
| start=start) |
| |
| def _wait_for(self, |
| poll_function, |
| poll_delay_seconds, |
| timeout_seconds, |
| message='', |
| start: Optional[float] = None): |
| """Waits for the given poll_function to return something other than None. |
| |
| Args: |
| poll_function: A function with no args that returns something |
| when ready, or None when not ready. |
| poll_delay_seconds: Time to wait between fetching results. |
| timeout_seconds: Time to wait before aborting. |
| message: Message to print indicate what is being waited for. |
| start: A UNIX-epoch timestamp that each polled duration should be |
| calculated against. Defaults to the time of the call. This |
| method will poll at least once, so passing an already timed-out |
| start is safe. |
| |
| Returns: |
| The value returned by poll_function, or None on timeout. |
| """ |
| if start is None: |
| start = self._host.time() |
| self._host.print_('Waiting%s, timeout: %d seconds.' % |
| (message, timeout_seconds)) |
| while (self._host.time() - start) < timeout_seconds: |
| # TODO(crbug.com/40631540): The poll delay is actually twice what is |
| # documented because we `sleep()` twice per loop. Get rid of one and |
| # fix the tests that broke. |
| self._host.sleep(poll_delay_seconds) |
| value = poll_function() |
| if value is not None: |
| return value |
| self._host.print_('Waiting%s. %d seconds passed.' % |
| (message, self._host.time() - start)) |
| self._host.sleep(poll_delay_seconds) |
| self._host.print_('Timed out waiting%s.' % message) |
| # Poll one more time in case the result recently changed. |
| return poll_function() |
| |
| def latest_try_jobs(self, |
| issue_number=None, |
| builder_names=None, |
| cq_only=False, |
| patchset=None): |
| """Fetches a dict of Build to BuildStatus for the latest try jobs. |
| |
| This variant fetches try job data from buildbucket directly. |
| |
| This includes jobs that are not yet finished and builds with infra |
| failures, so if a build is in this list, that doesn't guarantee that |
| there are results. |
| |
| Args: |
| issue_number: The git cl/issue number we're working with. |
| builder_names: Optional list of builders used to filter results. |
| cq_only: If True, only include CQ jobs. |
| patchset: If given, use this patchset instead of the latest. |
| |
| Returns: |
| A dict mapping Build objects to BuildStatus objects, with |
| only the latest jobs included. |
| """ |
| if not issue_number: |
| issue_number = self.get_issue_number() |
| return self.filter_latest( |
| self.try_job_results( |
| issue_number, |
| builder_names, |
| cq_only=cq_only, |
| patchset=patchset)) |
| |
| @staticmethod |
| def filter_latest(try_results): |
| """Returns the latest entries from from a Build to BuildStatus dict.""" |
| if try_results is None: |
| return None |
| latest_builds = filter_latest_builds(try_results.keys()) |
| return {b: s for b, s in try_results.items() if b in latest_builds} |
| |
| @staticmethod |
| def filter_incomplete(build_statuses: BuildStatuses) -> Set[Build]: |
| incomplete_statuses = {BuildStatus.INFRA_FAILURE, BuildStatus.CANCELED} |
| return { |
| build |
| for build, status in build_statuses.items() |
| if status in incomplete_statuses |
| } |
| |
| def try_job_results(self, |
| issue_number=None, |
| builder_names=None, |
| cq_only=False, |
| patchset=None): |
| """Returns a dict mapping Build objects to BuildStatus objects.""" |
| if not issue_number: |
| issue_number = self.get_issue_number() |
| builds = self.fetch_raw_try_job_results(issue_number, patchset) |
| build_to_status = {} |
| for build in builds: |
| builder_name = build['builder']['builder'] |
| if builder_names and builder_name not in builder_names: |
| continue |
| is_cq = 'tags' in build and { |
| 'key': 'user_agent', |
| 'value': 'cq' |
| } in build['tags'] |
| is_experimental = 'tags' in build and { |
| 'key': 'cq_experimental', |
| 'value': 'true' |
| } in build['tags'] |
| if cq_only and not (is_cq and not is_experimental): |
| continue |
| build_number = build.get('number') |
| status = build.get('status') |
| build_id = build.get('id') |
| build_to_status[Build(builder_name, build_number, |
| build_id)] = BuildStatus[status] |
| return build_to_status |
| |
| def fetch_raw_try_job_results(self, issue_number, patchset=None): |
| """Gets try job results for the specified CL from buildbucket. |
| |
| This uses the SearchBuilds rpc format specified in |
| https://cs.chromium.org/chromium/infra/go/src/go.chromium.org/luci/buildbucket/proto/rpc.proto |
| |
| The response is a list of dicts of the following form: |
| [ |
| { |
| "status": <status> |
| "builder": { |
| "builder": <builder_name> |
| }, |
| "number": <build_number>, |
| "tags": [ |
| { |
| "key": <tag key> |
| "value": <tag value> |
| }, |
| ... more tags |
| ] |
| }, |
| ... more builds, |
| ] |
| |
| This method returns the JSON representation of the above response. |
| """ |
| if not patchset: |
| patchset = self._get_latest_patchset() |
| predicate = { |
| 'gerritChanges': [{ |
| 'host': 'chromium-review.googlesource.com', |
| 'project': 'chromium/src', |
| 'change': issue_number, |
| 'patchset': patchset, |
| }], |
| } |
| return self.bb_client.search_builds( |
| predicate, ['builder.builder', 'status', 'tags', 'number', 'id']) |
| |
| @staticmethod |
| def _build(result_dict): |
| """Converts a parsed try result dict to a Build object.""" |
| builder_name = result_dict['builder_name'] |
| url = result_dict['url'] |
| if url is None: |
| return Build(builder_name, None) |
| |
| # LUCI jobs |
| # TODO(martiniss): Switch to using build number once `git cl |
| # try-results` uses buildbucket v2 API. |
| tags = result_dict.get('tags', []) |
| for tag in tags: |
| if tag.startswith("build_address:"): |
| build_number = tag.split('/')[-1] |
| return Build(builder_name, int(build_number)) |
| |
| # BuildBot jobs |
| match = re.match(r'.*/builds/(\d+)/?$', url) |
| if match: |
| build_number = match.group(1) |
| return Build(builder_name, int(build_number)) |
| |
| # Swarming tasks |
| match = re.match(r'.*/task/([0-9a-f]+)(/?|\?.*)$', url) |
| assert match, '%s did not match expected format' % url |
| task_id = match.group(1) |
| return Build(builder_name, task_id) |
| |
| @staticmethod |
| def all_finished(try_results): |
| return all(s in BuildStatus.COMPLETED for s in try_results.values()) |
| |
| @staticmethod |
| def all_success(try_results): |
| return all(s is BuildStatus.SUCCESS for s in try_results.values()) |
| |
| @staticmethod |
| def some_failed(try_results): |
| return any(s & BuildStatus.FAILURE for s in try_results.values()) |