blob: 2738e369b72553e182c1a666aad9fa939f64ea30 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"
#
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing methods interfacing with gerrit.
i.e Create new bugfix change tickets, and reading metadata about a specific change.
Example CURL command that creates CL:
curl -b /home/chromeos_patches/.git-credential-cache/cookie \
--header "Content-Type: application/json" \
--data \
'{"project":"chromiumos/third_party/kernel",\
"subject":"test",\
"branch":"chromeos-4.19",\
"topic":"test_topic"}' https://chromium-review.googlesource.com/a/changes/
"""
from enum import Enum
import http
import json
import logging
import os
import re
import subprocess
import sys
import common
import git_interface
import requests # pylint: disable=import-error
class GerritStatus(str, Enum):
"""String representations for Gerrit CL status."""
NEW = "NEW"
ABANDONED = "ABANDONED"
MERGED = "MERGED"
def get_auth_cookie():
"""Load cookies in order to authenticate requests with gerrit/googlesource."""
# This cookie should exist in order to perform GAIA authenticated requests
try:
gerrit_credentials_cookies = http.cookiejar.MozillaCookieJar(
common.GCE_GIT_COOKIE_PATH, None, None
)
gerrit_credentials_cookies.load()
return gerrit_credentials_cookies
except FileNotFoundError:
try:
gerrit_credentials_cookies = http.cookiejar.MozillaCookieJar(
common.LOCAL_GIT_COOKIE_PATH, None, None
)
gerrit_credentials_cookies.load()
return gerrit_credentials_cookies
except FileNotFoundError:
logging.error(
"Could not locate gitcookies file. Generate cookie file and try again"
)
logging.error(
"If running locally, ensure gitcookies file is located at ~/.gitcookies"
)
logging.error(
"Learn more by visiting go/gob-dev#testing-user-authentication"
)
raise
def retrieve_and_parse_endpoint(endpoint_url):
"""Retrieves Gerrit endpoint response and removes XSSI prefix )]}'"""
try:
resp = requests.get(endpoint_url, cookies=get_auth_cookie())
resp.raise_for_status()
resp_json = json.loads(resp.text[5:])
except requests.exceptions.HTTPError as e:
raise type(e)(
"Endpoint %s should have HTTP response 200" % endpoint_url
) from e
except json.decoder.JSONDecodeError as e:
raise ValueError(
"Response should contain json )]} prefix to prevent XSSI attacks"
) from e
return resp_json
def set_and_parse_endpoint(endpoint_url, payload=None):
"""POST request to gerrit endpoint with specified payload."""
try:
resp = requests.post(
endpoint_url, json=payload, cookies=get_auth_cookie()
)
resp.raise_for_status()
resp_json = json.loads(resp.text[5:])
except json.decoder.JSONDecodeError as e:
raise ValueError(
"Response should contain json )]} prefix to prevent XSSI attacks"
) from e
return resp_json
def get_full_changeid(changeid, branch):
"""Returns the changeid with url-encoding in project~branch~changeid format."""
project = "chromiumos%2Fthird_party%2Fkernel"
chromeos_branch = common.chromeos_branch(branch)
return "%s~%s~%s" % (project, chromeos_branch, changeid)
def get_reviewers(changeid, branch):
"""Retrieves list of reviewer emails from gerrit given a chromeos changeid."""
unique_changeid = get_full_changeid(changeid, branch)
list_reviewers_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid, "reviewers"
)
resp = retrieve_and_parse_endpoint(list_reviewers_endpoint)
return [
reviewer_resp["email"]
for reviewer_resp in resp
if "email" in reviewer_resp
]
def abandon_change(changeid, branch, reason=None):
"""Abandons a change."""
unique_changeid = get_full_changeid(changeid, branch)
abandon_change_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid, "abandon"
)
abandon_payload = {"message": reason} if reason else None
try:
set_and_parse_endpoint(abandon_change_endpoint, abandon_payload)
logging.info("Abandoned changeid %s on Gerrit", changeid)
except requests.exceptions.HTTPError as e:
if e.response.status_code == http.HTTPStatus.CONFLICT:
logging.info(
"Change %s for branch %s has already been abandoned",
changeid,
branch,
)
else:
raise
def restore_change(changeid, branch, reason=None):
"""Restores an abandoned change."""
unique_changeid = get_full_changeid(changeid, branch)
restore_change_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid, "restore"
)
restore_payload = {"message": reason} if reason else None
try:
set_and_parse_endpoint(restore_change_endpoint, restore_payload)
logging.info("Restored changeid %s on Gerrit", changeid)
except requests.exceptions.HTTPError as e:
if e.response.status_code == http.HTTPStatus.CONFLICT:
logging.info(
"Change %s for branch %s has already been restored",
changeid,
branch,
)
else:
raise
def get_change(changeid, branch):
"""Retrieves ChangeInfo from gerrit using its changeid"""
unique_changeid = get_full_changeid(changeid, branch)
get_change_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid
)
return retrieve_and_parse_endpoint(get_change_endpoint)
def get_messages(changeid, branch):
"""Retrieves ChangeInfo from gerrit using its changeid"""
unique_changeid = get_full_changeid(changeid, branch)
get_messages_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid, "messages"
)
return retrieve_and_parse_endpoint(get_messages_endpoint)
def set_hashtag(changeid, branch):
"""Set hashtag to be autogenerated indicating a robot generated CL."""
unique_changeid = get_full_changeid(changeid, branch)
set_hashtag_endpoint = os.path.join(
common.CHROMIUM_REVIEW_BASEURL, "changes", unique_changeid, "hashtags"
)
hashtag_input_payload = {"add": ["autogenerated"]}
set_and_parse_endpoint(set_hashtag_endpoint, hashtag_input_payload)
def get_status(changeid, branch):
"""Retrieves the latest status of a changeid by checking gerrit."""
change_info = get_change(changeid, branch)
return change_info["status"]
def get_bug_test_line(chrome_sha):
"""Retrieve BUG and TEST lines from the chrome sha."""
# stable fixes don't have a fixee changeid
bug_test_line = "BUG=%s\nTEST=%s"
bug = test = None
if not chrome_sha:
return bug_test_line % (bug, test)
chrome_commit_msg = git_interface.get_chrome_commit_message(chrome_sha)
bug_matches = re.findall("^BUG=(.*)$", chrome_commit_msg, re.M)
test_matches = re.findall("^TEST=(.*)$", chrome_commit_msg, re.M)
if bug_matches:
bug = bug_matches[-1]
if bug is None or bug == "None":
bug = "None (see commit %s)" % chrome_sha
if test_matches:
test = test_matches[-1]
return bug_test_line % (bug, test)
def generate_fix_message(fixer_upstream_sha, bug_test_line):
"""Generates new commit message for a fix change.
Use script ./contrib/from_upstream.py to generate new commit msg
Commit message should include essential information:
i.e:
FROMGIT, FROMLIST, ANDROID, CHROMIUM, etc.
commit message indiciating what is happening
BUG=...
TEST=...
tag for Fixes: <upstream-sha>
"""
fix_upstream_commit_msg = git_interface.get_upstream_commit_message(
fixer_upstream_sha
)
upstream_full_sha = git_interface.get_upstream_fullsha(fixer_upstream_sha)
cherry_picked = "(cherry picked from commit %s)\n\n" % upstream_full_sha
commit_message = "UPSTREAM: %s%s%s" % (
fix_upstream_commit_msg,
cherry_picked,
bug_test_line,
)
return commit_message
def create_change(
fixee_kernel_sha,
fixer_upstream_sha,
branch,
is_chromeos,
fixer_changeid=None,
):
"""Creates a Patch in gerrit given a ChangeInput object.
Determines whether a change for a fix has already been created,
and avoids duplicate creations.
"""
cwd = os.getcwd()
chromeos_branch = common.chromeos_branch(branch)
if is_chromeos:
fixee_changeid = git_interface.get_commit_changeid_linux_chrome(
fixee_kernel_sha
)
chrome_kernel_sha = fixee_kernel_sha
if not fixee_changeid:
# This may be a merge. Try to find its merge commit.
merge_sha = git_interface.get_merge_sha(
chromeos_branch, fixee_kernel_sha
)
if merge_sha:
chrome_kernel_sha = merge_sha
fixee_changeid = git_interface.get_commit_changeid_linux_chrome(
merge_sha
)
else:
fixee_changeid = None
chrome_kernel_sha = None
bug_test_line = get_bug_test_line(chrome_kernel_sha)
fix_commit_message = generate_fix_message(fixer_upstream_sha, bug_test_line)
# Copy all requests to well defined mailing list if there is no reviewer
# and the patch is ChromeOS-specific. Note that the mailing list must be
# accessible from outside Google because Gerrit will send e-mail to it,
# and the Gerrit e-mail address is a chromium.org address.
#
# This is for stable bug fix patches that don't have a direct fixee changeid
# since groups of stable commits get merged as one changeid
cc = ["cros-kernel-codereviews@googlegroups.com"] if is_chromeos else None
reviewers = None
try:
if fixee_changeid:
cl_reviewers = get_reviewers(fixee_changeid, branch)
def excluding_rule(email):
if not email.endswith("google.com") and not email.endswith(
"chromium.org"
):
return False
return email != common.GWSQ_EMAIL
cl_reviewers = list(filter(excluding_rule, cl_reviewers))
if cl_reviewers:
reviewers = cl_reviewers
cc = None
except requests.exceptions.HTTPError:
# There is a Change-Id in the commit log, but Gerrit does not have a
# matching entry.
logging.warning(
"Failed to get reviewer(s) from gerrit for Change-Id %s",
fixee_changeid,
)
if not reviewers:
# Fall back to list of e-mails found in tags after the last
# "cherry picked" message.
emails = git_interface.get_tag_emails_linux_chrome(fixee_kernel_sha)
if emails:
reviewers = emails
cc = None
try:
# Cherry pick changes and generate commit message indicating fix from upstream
handler = git_interface.commitHandler(
common.Kernel.linux_chrome, branch
)
fixer_changeid = handler.cherry_pick_and_push(
fixer_upstream_sha,
fixer_changeid,
fix_commit_message,
reviewers,
cc,
)
except ValueError:
# Error cherry-picking and pushing fix patch
logging.error(
"Failed to cherry-pick and push SHA %s", fixer_upstream_sha
)
return None
os.chdir(cwd)
return fixer_changeid
def gerrit_cmd(args):
"""Return True on success."""
cmd = [common.GERRIT_PATH] + args
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return True
except subprocess.CalledProcessError:
return False
def label_cq(cl_num, value):
return gerrit_cmd(["label-cq", cl_num, value])
def label_as(cl_num, value):
return gerrit_cmd(["label-as", cl_num, value])
def label_v(cl_num, value):
return gerrit_cmd(["label-v", cl_num, value])
def gerrit_output(args):
"""Return None on fail."""
cmd = [common.GERRIT_PATH] + args
try:
return subprocess.check_output(
cmd, encoding="utf-8", errors="ignore", stderr=subprocess.DEVNULL
)
except subprocess.CalledProcessError:
return None
def get_cl_nums(changeid):
"""Get all CL numbers for the given changeid.
Return a dict that maps from branch to changeid.
"""
q = f"project:chromiumos/third_party/kernel change:{changeid}"
logging.info('Query CL numbers via query string "%s"', q)
data = gerrit_output(["--json", "search", q])
if data is None:
return {}
ret = {}
try:
for obj in json.loads(data):
# Example to parse: chromeos-6.6
branch = obj["branch"].split("-")[1]
ret[branch] = obj["number"]
except (json.decoder.JSONDecodeError, IndexError, KeyError):
pass
return ret
def label_cq_plus1(branch, changeid):
"""Label CQ+1 for the given branch and changeid.
Use Gerrit CLI to query the CL number and label it CQ+1.
Returns True if CQ+1 success; otherwise False.
"""
try:
cl_num = get_cl_nums(changeid)[branch]
logging.info("CL number: %s", cl_num)
except KeyError:
return False
return label_cq(cl_num, "1")
def inspect(cl_num=None):
"""Inspect CL.
If `cl_num` is None, `gerrit --json mine`.
Otherwise, `gerrit --json inspect [cl_num]`.
Return the JSON as a dictionary.
"""
args = ["--json"]
if cl_num:
args += ["inspect", cl_num]
else:
args += ["mine"]
data = gerrit_output(args)
if not data:
return None
try:
return json.loads(data)
except json.decoder.JSONDecodeError:
return None
def send_message(cl_num, msg):
"""Send message.
`gerrit message [cl_num] [msg]`.
Return True on success.
"""
return gerrit_cmd(["message", cl_num, msg])
def add_reviewer(cl_num, reviewer):
"""Add reviewer.
`gerrit reviewers [cl_num] [reviewer]`.
Return True on success.
"""
return gerrit_cmd(["reviewers", cl_num, reviewer])
if __name__ == "__main__":
if len(sys.argv) == 3:
print(json.dumps(get_messages(sys.argv[1], sys.argv[2])))
else:
print("Usage: %s [CHANGE-ID] [BRANCH]" % sys.argv[0])