blob: cf57ea6ea046430d3d5cf8ca93d282d119094f1e [file] [log] [blame]
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities for requesting information for a gerrit server via https.
https://gerrit-review.googlesource.com/Documentation/rest-api.html
"""
import base64
import datetime
import functools
import html.parser
import http.client
import http.cookiejar
import json
import logging
import os
import re
import socket
import sys
from typing import Any, Dict, Optional, Tuple, Union
import urllib.parse
import urllib.request
import warnings
from chromite.third_party import httplib2
from chromite.third_party.oauth2client import gce
from chromite.lib import auth
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import git
from chromite.lib import retry_util
from chromite.lib import timeout_util
_GAE_VERSION = "GAE_VERSION"
class ErrorParser(html.parser.HTMLParser):
"""Class to parse GOB error message reported as HTML.
Only data inside <div id='af-error-container'> section is retrieved from the
GOB error message. Retrieved data is processed as follows:
- newlines are removed
- each <br> tag is replaced with '\n'
- each <p> tag is replaced with '\n\n'
"""
def __init__(self) -> None:
html.parser.HTMLParser.__init__(self)
self.in_div = False
self.err_data = ""
def handle_starttag(self, tag, attrs) -> None:
tag_id = [x[1] for x in attrs if x[0] == "id"]
if tag == "div" and tag_id and tag_id[0] == "af-error-container":
self.in_div = True
return
if self.in_div:
if tag == "p":
self.err_data += "\n\n"
return
if tag == "br":
self.err_data += "\n"
return
def handle_endtag(self, tag) -> None:
if tag == "div":
self.in_div = False
def handle_data(self, data) -> None:
if self.in_div:
self.err_data += data.replace("\n", "")
def ParsedDiv(self):
return self.err_data.strip()
def error(self, message) -> None:
# Pylint correctly flags a missing abstract method, but the error is in
# Python itself. We can delete this method once we move to Python
# 3.10+. https://bugs.python.org/issue31844
pass
@functools.lru_cache
def _GetAppCredentials():
"""Returns the singleton Appengine credentials for gerrit code review."""
return gce.AppAssertionCredentials(
scope="https://www.googleapis.com/auth/gerritcodereview"
)
TRY_LIMIT = 11
SLEEP = 0.5
REQUEST_TIMEOUT_SECONDS = 120 # 2 minutes.
# Controls the transport protocol used to communicate with Gerrit servers using
# git. This is parameterized primarily to enable cros_test_lib.GerritTestCase.
GIT_PROTOCOL = "https"
# The GOB conflict errors which could be ignorable.
GOB_CONFLICT_ERRORS = (
rb"change is closed",
rb"Cannot reduce vote on labels for closed change",
)
GOB_CONFLICT_ERRORS_RE = re.compile(
rb"|".join(GOB_CONFLICT_ERRORS), re.IGNORECASE
)
GOB_ERROR_REASON_CLOSED_CHANGE = "CLOSED CHANGE"
class GOBError(Exception):
"""Exception class for errors communicating with the GOB service."""
def __init__(self, http_status=None, reason=None) -> None:
self.http_status = http_status
self.reason = reason
message = ""
if http_status is not None:
message += "(http_status): %d" % (http_status,)
if reason is not None:
message += "(reason): %s" % (reason,)
if not message:
message = "Unknown error"
super().__init__(message)
class InternalGOBError(GOBError):
"""Exception class for GOB errors with status >= 500"""
def _QueryString(param_dict, first_param=None):
"""Encodes query parameters in the key:val[+key:val...] format.
Format specified here:
https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes
"""
q = [urllib.parse.quote(first_param)] if first_param else []
q.extend(["%s:%s" % (key, val) for key, val in param_dict.items()])
return "+".join(q)
def GetCookies(host, path, cookie_paths=None):
"""Returns cookies that should be set on a request.
Used by CreateHttpReq for any requests that do not already specify a Cookie
header. All requests made by this library are HTTPS.
Args:
host: The hostname of the Gerrit service.
path: The path on the Gerrit service, already including /a/ if
applicable.
cookie_paths: Files to look in for cookies. Defaults to looking in the
standard places where GoB places cookies.
Returns:
A dict of cookie name to value, with no URL encoding applied.
"""
cookies = {}
if cookie_paths is None:
cookie_paths = (constants.GOB_COOKIE_PATH, constants.GITCOOKIES_PATH)
for cookie_path in cookie_paths:
if os.path.isfile(cookie_path):
with open(cookie_path, encoding="utf-8") as f:
for line in f:
fields = line.strip().split("\t")
if line.strip().startswith("#") or len(fields) != 7:
continue
domain, xpath, key, value = (
fields[0],
fields[2],
fields[5],
fields[6],
)
if http.cookiejar.domain_match(
host, domain
) and path.startswith(xpath):
cookies[key] = value
return cookies
def CreateHttpReq(
host: str,
path: str,
reqtype: Optional[str] = "GET",
headers: Optional[Dict[str, str]] = None,
body: Optional[Union[bytes, str]] = None,
) -> urllib.request.Request:
"""Returns a https connection request object to a gerrit service."""
path = "/a/" + path.lstrip("/")
headers = headers or {}
if _InAppengine():
# TODO(phobbs) how can we choose to only run this on GCE / AppEngine?
credentials = _GetAppCredentials()
try:
headers.setdefault(
"Authorization",
"Bearer %s" % credentials.get_access_token().access_token,
)
except gce.HttpAccessTokenRefreshError as e:
logging.debug("Failed to retrieve gce access token: %s", e)
# Not in an Appengine or GCE environment.
except httplib2.ServerNotFoundError:
pass
cookies = GetCookies(host, path)
if "Cookie" not in headers and cookies:
headers["Cookie"] = "; ".join(
"%s=%s" % (n, v) for n, v in cookies.items()
)
elif "Authorization" not in headers:
try:
git_creds = auth.GitCreds()
except auth.AccessTokenError:
git_creds = None
if git_creds:
headers.setdefault("Authorization", "Bearer %s" % git_creds)
else:
logging.debug(
"No gitcookies file, Appengine credentials, or LUCI git creds "
"found."
)
if "User-Agent" not in headers:
# We may not be in a git repository.
try:
version = git.GetGitRepoRevision(
os.path.dirname(os.path.realpath(__file__))
)
except cros_build_lib.RunCommandError:
version = "unknown"
headers["User-Agent"] = " ".join(
(
"chromite.lib.gob_util",
os.path.basename(sys.argv[0]),
version,
)
)
if body:
body = json.JSONEncoder().encode(body).encode("utf-8")
headers.setdefault("Content-Type", "application/json")
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.debug("%s https://%s%s", reqtype, host, path)
for key, val in headers.items():
if key.lower() in ("authorization", "cookie"):
val = "HIDDEN"
logging.debug("%s: %s", key, val)
if body:
logging.debug(body)
return urllib.request.Request(
f"https://{host}{path}", data=body, headers=headers, method=reqtype
)
def _InAppengine():
"""Returns whether we're in the Appengine environment."""
return _GAE_VERSION in os.environ
def FetchUrl(
host,
path,
reqtype="GET",
headers=None,
body=None,
expect: Union[int, Tuple[int]] = 200,
ignore_404=True,
):
"""Fetches the http response from the specified URL.
Args:
host: The hostname of the Gerrit service.
path: The path on the Gerrit service. This will be prefixed with '/a'
automatically.
reqtype: The request type. Can be GET or POST.
headers: A mapping of extra HTTP headers to pass in with the request.
body: A string of data to send after the headers are finished.
expect: The status code(s) to expect as "success". For some requests,
Gerrit will return 204 to confirm proper processing of the request.
ignore_404: For many requests, gerrit-on-borg will return 404 if the
request doesn't match the database contents. In most such cases, we
want the API to return None rather than raise an Exception.
Returns:
The connection's reply, as bytes.
"""
if isinstance(expect, int):
expect = (expect,)
@timeout_util.TimeoutDecorator(REQUEST_TIMEOUT_SECONDS)
def _FetchUrlHelper():
err_prefix = (
f"A transient error occurred while querying {host}/{path}\n"
)
try:
_request = CreateHttpReq(
host, path, reqtype=reqtype, headers=headers, body=body
)
with urllib.request.urlopen(_request) as response:
return _ProcessResponse(response, err_prefix)
except urllib.error.HTTPError as e:
# Any non-HTTP/2xx status is thrown as an exception even though it's
# the response. We handle the actual HTTP codes below.
return _ProcessResponse(e, err_prefix)
except socket.error as ex:
logging.warning("%s%s", err_prefix, str(ex))
raise
def _ProcessResponse(
response: http.client.HTTPResponse, err_prefix: str
) -> bytes:
"""Process the Response object.
Args:
response: the url response object to parse.
err_prefix: the prefix to use.
Returns:
The server's reply, as bytes.
Raises:
GOBError with the failure status code and reason.
"""
# Normal/good responses.
response_body = response.read()
if response.status == 404 and ignore_404:
return b""
elif response.status in expect:
return response_body
# Bad responses.
logging.debug("response msg:\n%s", response.msg)
http_version = "HTTP/%s" % ("1.1" if response.version == 11 else "1.0")
msg = "%s %s %s\n%s %d %s\nResponse body: %r" % (
reqtype,
f"https://{host}/{path}",
http_version,
http_version,
response.status,
response.reason,
response_body,
)
# Ones we can retry.
if response.status >= 500:
# A status >=500 is assumed to be a possible transient error; retry.
logging.warning("%s%s", err_prefix, msg)
raise InternalGOBError(
http_status=response.status, reason=response.reason
)
# Ones we cannot retry.
home = os.environ.get("HOME", "~")
url = "https://%s/new-password" % host
if response.status in (302, 303, 307):
err_prefix = (
"Redirect found; missing/bad %s/.gitcookies credentials or "
"permissions (0600)?\n See %s" % (home, url)
)
elif response.status in (400,):
err_prefix = (
"Permission error; talk to the admins of the GoB instance"
)
elif response.status in (401,):
err_prefix = (
"Authorization error; missing/bad %s/.gitcookies "
"credentials or permissions (0600)?\n See %s" % (home, url)
)
elif response.status in (422,):
err_prefix = "Bad request body?"
logging.warning(err_prefix)
# If GOB output contained expected error message, reduce log visibility
# of raw GOB output reported below.
ep = ErrorParser()
ep.feed(response_body.decode("utf-8"))
ep.close()
parsed_div = ep.ParsedDiv()
if parsed_div:
logging.warning("GOB Error:\n%s", parsed_div)
logging_function = logging.debug
else:
logging_function = logging.warning
logging_function(msg)
if response.status >= 400:
# The 'X-ErrorId' header is set only on >= 400 response code.
logging_function("X-ErrorId: %s", response.getheader("X-ErrorId"))
if response.status == http.client.CONFLICT:
# 409 conflict
if GOB_CONFLICT_ERRORS_RE.search(response_body):
raise GOBError(
http_status=response.status,
reason=GOB_ERROR_REASON_CLOSED_CHANGE,
)
else:
raise GOBError(
http_status=response.status, reason=response.reason
)
else:
raise GOBError(http_status=response.status, reason=response.reason)
return retry_util.RetryException(
(socket.error, InternalGOBError, timeout_util.TimeoutError),
TRY_LIMIT,
_FetchUrlHelper,
sleep=SLEEP,
backoff_factor=2,
)
def FetchUrlJson(*args, **kwargs):
"""Fetch the specified URL and parse it as JSON.
See FetchUrl for arguments.
"""
fh = FetchUrl(*args, **kwargs)
# In case ignore_404 is True, we want to return None instead of
# raising an exception.
if not fh:
return None
# The first line of the response should always be: )]}'
if not fh.startswith(b")]}'"):
raise GOBError(
http_status=200, reason="Unexpected json output: %r" % fh
)
_, _, json_data = fh.partition(b"\n")
return json.loads(json_data)
def QueryChanges(
host, param_dict, first_param=None, limit=None, o_params=None, start=None
):
"""Queries a gerrit-on-borg server for changes matching query terms.
Args:
host: The Gerrit server hostname.
param_dict: A dictionary of search parameters, as documented here:
https://gerrit-review.googlesource.com/Documentation/user-search.html
first_param: A change identifier
limit: Maximum number of results to return.
o_params: A list of additional output specifiers, as documented here:
https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#list-changes
start: Offset in the result set to start at.
Returns:
A list of json-decoded query results.
"""
# Note that no attempt is made to escape special characters; YMMV.
if not param_dict and not first_param:
raise RuntimeError("QueryChanges requires search parameters")
path = "changes/?q=%s" % _QueryString(param_dict, first_param)
if start:
path = "%s&S=%d" % (path, start)
if limit:
path = "%s&n=%d" % (path, limit)
if o_params:
path = "%s&%s" % (path, "&".join(["o=%s" % p for p in o_params]))
# Don't ignore 404; a query should always return a list, even if it's empty.
return FetchUrlJson(host, path, ignore_404=False)
def MultiQueryChanges(
host, param_dict, change_list, limit=None, o_params=None, start=None
):
"""Initiate a query composed of multiple sets of query parameters."""
if not change_list:
raise RuntimeError(
"MultiQueryChanges requires a list of change numbers/id's"
)
q = ["q=%s" % "+OR+".join(urllib.parse.quote(str(x)) for x in change_list)]
if param_dict:
q.append(_QueryString(param_dict))
if limit:
q.append("n=%d" % limit)
if start:
q.append("S=%s" % start)
if o_params:
q.extend(["o=%s" % p for p in o_params])
path = "changes/?%s" % "&".join(q)
try:
result = FetchUrlJson(host, path, ignore_404=False)
except GOBError as e:
msg = "%s:\n%s" % (e, path)
raise GOBError(http_status=e.http_status, reason=msg)
return result
def GetGerritFetchUrl(host):
"""Returns URL of a gerrit instance to fetch from for gerrit |host| name."""
return "https://%s/" % host
def GetChangePageUrl(host, change_number):
"""Given a gerrit host name and change number, return change page url."""
return "https://%s/#/c/%d/" % (host, change_number)
def _GetChangePath(change):
"""Given a change id, return a path prefix for the change."""
return "changes/%s" % str(change).replace("/", "%2F")
def GetChangeUrl(host, change):
"""Given a gerrit host name and change id, return an url for the change."""
return "https://%s/a/%s" % (host, _GetChangePath(change))
def GetChange(host, change):
"""Query a gerrit server for information about a single change."""
return FetchUrlJson(host, _GetChangePath(change))
def GetChangeReview(host, change, revision=None):
"""Get the current review information for a change."""
if revision is None:
revision = "current"
path = "%s/revisions/%s/review" % (_GetChangePath(change), revision)
return FetchUrlJson(host, path)
def GetChangeCommit(host, change, revision=None):
"""Get the current review information for a change."""
if revision is None:
revision = "current"
path = "%s/revisions/%s/commit" % (_GetChangePath(change), revision)
return FetchUrlJson(host, path)
def GetChangeCurrentRevision(host, change):
"""Get information about the latest revision for a given change."""
jmsg = GetChangeReview(host, change)
if jmsg:
return jmsg.get("current_revision")
def GetChangeDetail(host, change, o_params=None):
"""Query a gerrit server for extended information about a single change."""
path = "%s/detail" % _GetChangePath(change)
if o_params:
path = "%s?%s" % (path, "&".join(["o=%s" % p for p in o_params]))
return FetchUrlJson(host, path)
def GetChangeMergeable(host, change, revision=None) -> Optional[Dict[str, Any]]:
"""Query a gerrit server for "mergeable" state."""
if revision is None:
revision = "current"
path = "%s/revisions/%s/mergeable" % (_GetChangePath(change), revision)
return FetchUrlJson(host, path)
def GetRelatedChanges(host, change, revision=None) -> Optional[Dict[str, Any]]:
"""Get changes that depend on or are dependencies for a given change.
Args:
host: The Gerrit host to interact with.
change: The Gerrit change ID.
revision: The Gerrit change revision, default is "current".
Returns:
A JSON response dict repesenting a RelatedChangesInfo entity.
"""
if revision is None:
revision = "current"
path = "%s/revisions/%s/related" % (_GetChangePath(change), revision)
return FetchUrlJson(host, path)
def GetChangeReviewers(host, change) -> None:
"""Get information about all reviewers attached to a change.
Args:
host: The Gerrit host to interact with.
change: The Gerrit change ID.
"""
warnings.warn("GetChangeReviewers is deprecated; use GetReviewers instead.")
GetReviewers(host, change)
def CreateChange(
host: str, project: str, branch: str, subject: str, publish: bool
) -> Dict[str, Any]:
"""Creates an empty change.
Args:
host: The Gerrit host to interact with.
project: The name of the Gerrit project for the change.
branch: Branch for the change.
subject: Initial commit message for the change.
publish: If True, will publish the CL after uploading. Stays in WIP mode
otherwise.
Returns:
A JSON response dict.
"""
path = "changes/"
body = {"project": project, "branch": branch, "subject": subject}
if not publish:
body["work_in_progress"] = "true"
body["notify"] = "NONE"
return FetchUrlJson(
host,
path,
body=body,
reqtype="POST",
expect=(200, 201),
ignore_404=False,
)
def ChangeEdit(
host: str, change: str, filepath: str, contents: str
) -> Dict[str, Any]:
"""Attaches file modifications to an open change.
Args:
host: The Gerrit host to interact with.
change: A Gerrit change number.
filepath: Path of the file in the repo to modify.
contents: New contents of the file.
Returns:
A JSON response dict.
"""
path = "%s/edit/%s" % (
_GetChangePath(change),
urllib.parse.quote(filepath, ""),
)
contents = contents.encode("utf-8") # string -> bytes
contents = base64.b64encode(contents) # bytes -> bytes
contents = contents.decode("utf-8") # bytes -> string
body = {"binary_content": "data:text/plain;base64,%s" % contents}
return FetchUrlJson(host, path, body=body, reqtype="PUT", expect=204)
def PublishChangeEdit(host: str, change: str) -> Dict[str, Any]:
"""Publishes any open edits in a change.
Args:
host: The Gerrit host to interact with.
change: A Gerrit change number.
Returns:
A JSON response dict.
"""
path = "%s/edit:publish" % _GetChangePath(change)
body = {"notify": "NONE"}
return FetchUrlJson(host, path, body=body, reqtype="POST", expect=204)
def AbandonChange(host, change, msg="", notify=None):
"""Abandon a gerrit change."""
path = "%s/abandon" % _GetChangePath(change)
body = {"message": msg}
if notify is not None:
body["notify"] = notify
return FetchUrlJson(host, path, reqtype="POST", body=body, ignore_404=False)
def RestoreChange(host, change, msg=""):
"""Restore a previously abandoned change."""
path = "%s/restore" % _GetChangePath(change)
body = {"message": msg}
return FetchUrlJson(host, path, reqtype="POST", body=body, ignore_404=False)
def Delete(host, change) -> None:
"""Delete a gerrit change."""
path = _GetChangePath(change)
FetchUrl(host, path, reqtype="DELETE", expect=204, ignore_404=False)
def CherryPick(
host,
change,
branch,
rev="current",
msg="",
allow_conflicts: bool = False,
notify=None,
):
"""Cherry pick a change to a branch."""
path = "%s/revisions/%s/cherrypick" % (_GetChangePath(change), rev)
body = {
"destination": branch,
"message": msg,
"allow_conflicts": allow_conflicts,
}
if notify is not None:
body["notify"] = notify
return FetchUrlJson(host, path, reqtype="POST", body=body)
def SubmitChange(host, change, revision=None, wait_for_merge=True, notify=None):
"""Submits a gerrit change via Gerrit."""
if revision is None:
revision = "current"
path = "%s/revisions/%s/submit" % (_GetChangePath(change), revision)
body = {"wait_for_merge": wait_for_merge}
if notify is not None:
body["notify"] = notify
return FetchUrlJson(host, path, reqtype="POST", body=body, ignore_404=False)
def CheckChange(host, change, sha1=None):
"""Performs consistency checks on the change, and fixes inconsistencies.
This is useful for forcing Gerrit to check whether a change has already been
merged into the git repo. Namely, if |sha1| is provided and the change is in
'NEW' status, Gerrit will check if a change with that |sha1| is in the repo
and mark the change as 'MERGED' if it exists.
Args:
host: The Gerrit host to interact with.
change: The Gerrit change ID.
sha1: An optional hint of the commit's SHA1 in Git.
"""
path = "%s/check" % (_GetChangePath(change),)
if sha1:
body, headers = {"expect_merged_as": sha1}, {}
else:
body, headers = {}, {"Content-Length": "0"}
return FetchUrlJson(
host, path, reqtype="POST", body=body, ignore_404=False, headers=headers
)
def MarkPrivate(host, change) -> None:
"""Marks the given CL as private.
Args:
host: The gob host to interact with.
change: CL number on the given host.
"""
path = "%s/private" % _GetChangePath(change)
try:
FetchUrlJson(host, path, reqtype="POST", ignore_404=False)
except GOBError as e:
# 201: created -- change was successfully marked private.
if e.http_status != 201:
raise
else:
raise GOBError(
http_status=200,
reason="Change was already marked private",
)
def MarkNotPrivate(host, change) -> None:
"""Sets the private bit on given CL to False.
Args:
host: The gob host to interact with.
change: CL number on the given host.
"""
path = "%s/private.delete" % _GetChangePath(change)
try:
FetchUrlJson(host, path, reqtype="POST", expect=204, ignore_404=False)
except GOBError as e:
if e.http_status == 409:
raise GOBError(
http_status=e.http_status,
reason="Change was already marked not private",
)
else:
raise
def MarkWorkInProgress(host, change, msg=""):
"""Marks the given CL as Work-In-Progress.
Args:
host: The gob host to interact with.
change: CL number on the given host.
msg: Message to post together with the action.
"""
path = "%s/wip" % _GetChangePath(change)
body = {"message": msg}
return FetchUrlJson(host, path, reqtype="POST", body=body, ignore_404=False)
def MarkReadyForReview(host, change, msg=""):
"""Marks the given CL as Ready-For-Review.
Args:
host: The gob host to interact with.
change: CL number on the given host.
msg: Message to post together with the action.
"""
path = "%s/ready" % _GetChangePath(change)
body = {"message": msg}
return FetchUrlJson(host, path, reqtype="POST", body=body, ignore_404=False)
def GetAttentionSet(host: str, change: str) -> Optional[Dict[str, Any]]:
"""Get information about the attention set of a change.
Args:
host: The Gerrit host to interact with.
change: The Gerrit change ID.
Returns:
A JSON response dict.
"""
path = "%s/attention" % _GetChangePath(change)
return FetchUrlJson(host, path)
def AddAttentionSet(
host: str, change: str, add: Tuple[str, ...], reason: str, notify: str = ""
) -> Optional[Dict[str, Any]]:
"""Add users to the attention set of a change."""
if not add:
return
body = {}
body["reason"] = reason
if notify:
body["notify"] = notify
path = "%s/attention" % _GetChangePath(change)
for r in add:
body["user"] = r
jmsg = FetchUrlJson(
host, path, reqtype="POST", body=body, ignore_404=False
)
# Return the last response. We've run through at least one request if we got
# here.
return jmsg
def RemoveAttentionSet(
host: str,
change: str,
remove: Tuple[str, ...],
reason: str,
notify: str = "",
) -> None:
"""Remove users from the attention set of a change."""
if not remove:
return
body = {}
body["reason"] = reason
if notify:
body["notify"] = notify
for r in remove:
path = "%s/attention/%s/delete" % (_GetChangePath(change), r)
FetchUrl(host, path, reqtype="POST", body=body, expect=204)
def GetReviewers(host, change):
"""Get information about all reviewers attached to a change.
Args:
host: The Gerrit host to interact with.
change: The Gerrit change ID.
"""
path = "%s/reviewers" % _GetChangePath(change)
return FetchUrlJson(host, path)
def AddReviewers(host, change, add=None, notify=None):
"""Add reviewers to a change."""
if not add:
return
if isinstance(add, str):
add = (add,)
body = {}
if notify:
body["notify"] = notify
path = "%s/reviewers" % _GetChangePath(change)
for r in add:
body["reviewer"] = r
jmsg = FetchUrlJson(
host, path, reqtype="POST", body=body, ignore_404=False
)
return jmsg
def RemoveReviewers(host, change, remove=None, notify=None) -> None:
"""Remove reviewers from a change."""
if not remove:
return
if isinstance(remove, str):
remove = (remove,)
body = {}
if notify:
body["notify"] = notify
for r in remove:
path = "%s/reviewers/%s/delete" % (_GetChangePath(change), r)
FetchUrl(host, path, reqtype="POST", body=body, expect=204)
def SetReview(
host,
change,
revision=None,
msg=None,
labels=None,
notify=None,
reviewers=None,
cc=None,
remove_reviewers=None,
ready=None,
wip=None,
) -> None:
"""Set labels and/or add a message to a code review."""
if revision is None:
revision = "current"
# Ignore 'notify' on purpose - it's not empty by default in the caller.
if not any((msg, labels, reviewers, cc, ready, wip)):
return
path = "%s/revisions/%s/review" % (_GetChangePath(change), revision)
body = {"reviewers": []}
if msg:
body["message"] = msg
if labels:
body["labels"] = labels
if notify:
body["notify"] = notify
if reviewers:
body["reviewers"].extend({"reviewer": x} for x in reviewers)
if cc:
body["reviewers"].extend({"reviewer": x, "state": "CC"} for x in cc)
if remove_reviewers:
body["reviewers"].extend(
{"reviewer": x, "state": "REMOVED"} for x in remove_reviewers
)
if ready is not None:
body["ready"] = ready
if wip is not None:
body["work_in_progress"] = wip
response = FetchUrlJson(host, path, reqtype="POST", body=body)
if response is None:
raise GOBError(
http_status=404, reason="CL %s not found in %s" % (change, host)
)
if labels:
for key, val in labels.items():
if (
"labels" not in response
or key not in response["labels"]
or int(response["labels"][key] != int(val))
):
raise GOBError(
http_status=200,
reason='Unable to set "%s" label on change %s.'
% (key, change),
)
def SetTopic(host, change, topic):
"""Set |topic| for a change. If |topic| is empty, it will be deleted"""
path = "%s/topic" % _GetChangePath(change)
body = {"topic": topic}
return FetchUrlJson(host, path, reqtype="PUT", body=body, ignore_404=False)
def SetHashtags(host, change, add, remove):
"""Adds and / or removes hashtags from a change.
Args:
host: Hostname (without protocol prefix) of the gerrit server.
change: A gerrit change number.
add: a list of hashtags to be added.
remove: a list of hashtags to be removed.
"""
path = "%s/hashtags" % _GetChangePath(change)
return FetchUrlJson(
host,
path,
reqtype="POST",
body={"add": add, "remove": remove},
ignore_404=False,
)
def ResetReviewLabels(
host, change, label, value="0", revision=None, message=None, notify=None
) -> None:
"""Reset the value of a given label for all reviewers on a change."""
if revision is None:
revision = "current"
# This is tricky when working on the "current" revision, because there's
# always the risk that the "current" revision will change in between API
# calls. So, the code dereferences the "current" revision down to a literal
# sha1 at the beginning and uses it for all subsequent calls. As a quick
# check, the "current" revision is dereferenced again at the end, and if it
# differs from the previous "current" revision, an exception is raised.
current = revision == "current"
jmsg = GetChangeDetail(
host, change, o_params=["CURRENT_REVISION", "CURRENT_COMMIT"]
)
if current:
revision = jmsg["current_revision"]
value = str(value)
path = "%s/revisions/%s/review" % (_GetChangePath(change), revision)
message = message or (
"%s label set to %s programmatically by chromite." % (label, value)
)
for review in jmsg.get("labels", {}).get(label, {}).get("all", []):
if str(review.get("value", value)) != value:
body = {
"message": message,
"labels": {label: value},
"on_behalf_of": review["_account_id"],
}
if notify:
body["notify"] = notify
response = FetchUrlJson(host, path, reqtype="POST", body=body)
if str(response["labels"][label]) != value:
username = review.get("email", jmsg.get("name", ""))
raise GOBError(
http_status=200,
reason='Unable to set %s label for user "%s" on change %s.'
% (label, username, change),
)
if current:
new_revision = GetChangeCurrentRevision(host, change)
if not new_revision:
raise GOBError(
http_status=200,
reason='Could not get review information for change "%s"'
% change,
)
elif new_revision != revision:
raise GOBError(
http_status=200,
reason=f'While resetting labels on change "{change}", a new '
"patchset was uploaded.",
)
def GetTipOfTrunkRevision(git_url):
"""Returns the current git revision on the default branch."""
parsed_url = urllib.parse.urlparse(git_url)
path = parsed_url[2].rstrip("/") + "/+log/HEAD?n=1&format=JSON"
j = FetchUrlJson(parsed_url[1], path, ignore_404=False)
if not j:
raise GOBError(
reason="Could not find revision information from %s" % git_url
)
try:
return j["log"][0]["commit"]
except (IndexError, KeyError, TypeError):
msg = (
"The json returned by https://%s%s has an unfamiliar structure:\n"
"%s\n" % (parsed_url[1], path, j)
)
raise GOBError(reason=msg)
def GetFileContentsOnHead(git_url: str, filepath: str) -> str:
"""Returns the current contents of a file on the default branch.
Retrieves the contents from Gitiles via its API, not Gerrit's.
Args:
git_url: URL for the repository to get the file contents from.
filepath: Path of the file in the repository.
Returns:
The contents of the file as a string.
"""
return GetFileContents(git_url, filepath, ref="HEAD")
def GetFileContents(git_url: str, filepath: str, ref="HEAD") -> str:
"""Returns the current contents of a file on the given ref.
Retrieves the contents from Gitiles via its API, not Gerrit's.
Args:
git_url: URL for the repository to get the file contents from.
filepath: Path of the file in the repository.
ref: The ref to use, e.g. HEAD or refs/heads/main
Returns:
The contents of the file as a string.
"""
parsed_url = urllib.parse.urlparse(git_url)
path = parsed_url[2].rstrip("/") + f"/+/{ref}/{filepath}?format=TEXT"
contents = FetchUrl(parsed_url[1], path, ignore_404=False)
contents = base64.b64decode(contents)
return contents.decode("utf-8")
def GetFileContentsFromGerrit(
host: str, change: str, filepath: str, revision: Optional[str] = None
) -> Optional[str]:
"""Returns the current contents of a file from the Gerrit.
Args:
host: The Gerrit host to interact with.
change: A Gerrit change number.
filepath: Path of the file in the repo to retrieve.
revision: The specific revision in the change. Defaults or None to the
latest revision.
Returns:
Contents of the file.
"""
if revision is None:
revision = "current"
path = "%s/revisions/%s/files/%s/content" % (
_GetChangePath(change),
revision,
urllib.parse.quote(filepath, ""),
)
contents = FetchUrl(host, path)
if contents is None:
return None
contents = base64.b64decode(contents) # bytes -> bytes
contents = contents.decode("utf-8") # bytes -> string
return contents
def Rebase(
host: str, change: str, allow_conflicts: bool = False
) -> Optional[Dict[str, Any]]:
"""Rebase the CL to the main branch.
Args:
host: The Gerrit host to interact with.
change: A Gerrit change number.
allow_conflicts: True if allowing the merge-conflict after rebasing.
Returns:
ChangeInfo of the change after the reading.
"""
path = "%s/rebase" % (_GetChangePath(change),)
body = {"allow_conflicts": "true" if allow_conflicts else "false"}
change_info = FetchUrlJson(host, path, body=body, reqtype="POST")
if change_info is None:
return None
return change_info
def GetCommitDate(git_url, commit):
"""Returns the date of a particular git commit.
The returned object is naive in the sense that it doesn't carry any timezone
information - you should assume UTC.
Args:
git_url: URL for the repository to get the commit date from.
commit: A git commit identifier (e.g. a sha1).
Returns:
A datetime object.
"""
parsed_url = urllib.parse.urlparse(git_url)
path = "%s/+log/%s?n=1&format=JSON" % (parsed_url.path.rstrip("/"), commit)
j = FetchUrlJson(parsed_url.netloc, path, ignore_404=False)
if not j:
raise GOBError(
reason="Could not find revision information from %s" % git_url
)
try:
commit_timestr = j["log"][0]["committer"]["time"]
except (IndexError, KeyError, TypeError):
msg = (
"The json returned by https://%s%s has an unfamiliar structure:\n"
"%s\n" % (parsed_url.netloc, path, j)
)
raise GOBError(reason=msg)
try:
# We're parsing a string of the form 'Tue Dec 02 17:48:06 2014'.
return datetime.datetime.strptime(
commit_timestr, constants.GOB_COMMIT_TIME_FORMAT
)
except ValueError:
raise GOBError(
reason='Failed parsing commit time "%s"' % commit_timestr
)
def GetAccount(host, account="self"):
"""Get information about the user account."""
return FetchUrlJson(host, "accounts/%s" % (account,))