| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*-" |
| # |
| # Copyright 2020 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Module containing methods interfacing with git |
| |
| i.e Parsing git logs for change-id, full commit sha's, etc. |
| """ |
| |
| import datetime |
| import logging |
| import re |
| import subprocess |
| |
| import common |
| |
| |
| def _git_check_output(path, command): |
| git_cmd = ["git", "-C", path] + command |
| return subprocess.check_output( |
| git_cmd, encoding="utf-8", errors="ignore", stderr=subprocess.DEVNULL |
| ) |
| |
| |
| def get_upstream_fullsha(abbrev_sha): |
| """Returns the full upstream sha for an abbreviated 12 digit sha using git cli""" |
| upstream_absolute_path = common.get_kernel_absolute_path( |
| common.UPSTREAM_PATH |
| ) |
| try: |
| cmd = ["rev-parse", abbrev_sha] |
| full_sha = _git_check_output(upstream_absolute_path, cmd) |
| return full_sha.rstrip() |
| except subprocess.CalledProcessError as e: |
| raise type(e)( |
| "Could not find full upstream sha for %s" % abbrev_sha, e.cmd |
| ) from e |
| |
| |
| def _get_commit_message(kernel_path, sha): |
| """Returns the commit message for a sha in a given local path to kernel.""" |
| try: |
| cmd = ["log", "--format=%B", "-n", "1", sha] |
| commit_message = _git_check_output(kernel_path, cmd) |
| |
| # Single newline following commit message |
| return commit_message.rstrip() + "\n" |
| except subprocess.CalledProcessError as e: |
| raise type(e)( |
| "Couldnt retrieve commit in kernel path %s for sha %s" |
| % (kernel_path, sha), |
| e.cmd, |
| ) from e |
| |
| |
| def get_upstream_commit_message(upstream_sha): |
| """Returns the commit message for a given upstream sha using git cli.""" |
| upstream_absolute_path = common.get_kernel_absolute_path( |
| common.UPSTREAM_PATH |
| ) |
| return _get_commit_message(upstream_absolute_path, upstream_sha) |
| |
| |
| def get_chrome_commit_message(chrome_sha): |
| """Returns the commit message for a given chrome sha using git cli.""" |
| chrome_absolute_path = common.get_kernel_absolute_path(common.CHROMEOS_PATH) |
| return _get_commit_message(chrome_absolute_path, chrome_sha) |
| |
| |
| def get_merge_sha(branch, sha): |
| """Returns SHA of merge commit for provided SHA if available""" |
| |
| chrome_absolute_path = common.get_kernel_absolute_path(common.CHROMEOS_PATH) |
| |
| try: |
| # Get list of merges in <branch> since <sha> |
| cmd = [ |
| "log", |
| "--format=%h", |
| "--abbrev=12", |
| "--ancestry-path", |
| "--merges", |
| "%s..%s" % (sha, branch), |
| ] |
| sha_list = _git_check_output(chrome_absolute_path, cmd) |
| if not sha_list: |
| logging.info("No merge commit for sha %s in branch %s", sha, branch) |
| return None |
| # merge_sha is our presumed merge commit |
| merge_sha = sha_list.splitlines()[-1] |
| # Verify if <sha> is indeed part of the merge |
| cmd = [ |
| "log", |
| "--format=%h", |
| "--abbrev=12", |
| "%s~1..%s" % (merge_sha, merge_sha), |
| ] |
| sha_list = _git_check_output(chrome_absolute_path, cmd) |
| if sha_list and sha in sha_list.splitlines(): |
| return merge_sha |
| logging.info( |
| "Merge commit for sha %s found as %s, but sha is missing in merge", |
| sha, |
| merge_sha, |
| ) |
| |
| except subprocess.CalledProcessError as e: |
| logging.info( |
| 'Error "%s" while trying to find merge commit for sha %s in branch %s', |
| e, |
| sha, |
| branch, |
| ) |
| |
| return None |
| |
| |
| def get_committer_date(kernel_sha, branch=None): |
| """Gets committer_date in git repository.""" |
| if branch is None: |
| absolute_path = common.get_kernel_absolute_path(common.UPSTREAM_PATH) |
| else: |
| absolute_path = common.get_kernel_absolute_path(common.CHROMEOS_PATH) |
| cmd = ["checkout", common.chromeos_branch(branch)] |
| _git_check_output(absolute_path, cmd) |
| |
| try: |
| cmd = ["log", "--format=%ct", "-n", "1", kernel_sha] |
| ts = int(_git_check_output(absolute_path, cmd)) |
| return datetime.datetime.fromtimestamp(ts, datetime.timezone.utc) |
| except subprocess.CalledProcessError: |
| return None |
| |
| |
| def get_commit_changeid_linux_chrome(kernel_sha): |
| """Returns the changeid of the kernel_sha commit by parsing linux_chrome git log. |
| |
| kernel_sha will be one of linux_stable or linux_chrome commits. |
| """ |
| chrome_absolute_path = common.get_kernel_absolute_path(common.CHROMEOS_PATH) |
| try: |
| cmd = ["log", "--format=%B", "-n", "1", kernel_sha] |
| commit_message = _git_check_output(chrome_absolute_path, cmd) |
| |
| m = re.findall("^Change-Id: (I[a-z0-9]{40})$", commit_message, re.M) |
| |
| # Get last change-id in case chrome sha cherry-picked/reverted into new commit |
| return m[-1] |
| except subprocess.CalledProcessError as e: |
| raise type(e)( |
| "Couldnt retrieve changeid for commit %s" % kernel_sha, e.cmd |
| ) from e |
| except IndexError as e: |
| # linux_stable kernel_sha's do not have an associated ChangeID |
| return None |
| |
| |
| def get_tag_emails_linux_chrome(sha): |
| """Returns unique list of chromium.org or google.com e-mails. |
| |
| The returned lust of e-mails is associated with tags found after |
| the last 'cherry picked from commit' message in the commit identified |
| by sha. Tags and e-mails are found by parsing the commit log. |
| |
| sha is expected to be be a commit in linux_stable or in linux_chrome. |
| """ |
| absolute_path = common.get_kernel_absolute_path(common.CHROMEOS_PATH) |
| try: |
| cmd = ["log", "--format=%B", "-n", "1", sha] |
| commit_message = _git_check_output(absolute_path, cmd) |
| # If the commit has been cherry-picked, use subsequent tags to create |
| # list of reviewers. Otherwise, use all tags. Either case, only return |
| # e-mail addresses from Google domains. |
| s = commit_message.split("cherry picked from commit") |
| tags = "Signed-off-by|Reviewed-by|Tested-by|Commit-Queue" |
| domains = "chromium.org|google.com" |
| m = "^(?:%s): .* <(.*@(?:%s))>$" % (tags, domains) |
| emails = re.findall(m, s[-1], re.M) |
| if not emails: |
| # Final fallback: In some situations, "cherry picked from" |
| # is at the very end of the commit description, with no |
| # subsequent tags. If that happens, look for tags in the |
| # entire description. |
| emails = re.findall(m, commit_message, re.M) |
| return list(set(emails)) |
| except subprocess.CalledProcessError as e: |
| raise type(e)( |
| "Could not retrieve tag e-mails for commit %s" % sha, e.cmd |
| ) from e |
| except IndexError: |
| # sha does do not have a recognized tag |
| return None |
| |
| |
| # match "vX.Y[.Z][.rcN]" |
| version = re.compile(r"(v[0-9]+(?:\.[0-9]+)+(?:-rc[0-9]+)?)\s*") |
| |
| |
| def get_integrated_tag(sha): |
| """For a given SHA, find the first upstream tag that includes it.""" |
| |
| try: |
| path = common.get_kernel_absolute_path(common.UPSTREAM_PATH) |
| cmd = ["describe", "--match", "v*", "--contains", sha] |
| tag = _git_check_output(path, cmd) |
| return version.match(tag).group() |
| except AttributeError: |
| return None |
| except subprocess.CalledProcessError: |
| return None |
| |
| |
| class commitHandler: |
| """Class to control active accesses on a git repository""" |
| |
| commit_list = {} |
| |
| def __init__(self, kernel, branch=None, full_reset=True): |
| self.kernel = kernel |
| self.metadata = common.get_kernel_metadata(kernel) |
| if not branch: |
| branch = self.metadata.branches[0] |
| self.branch = branch |
| self.branchname = self.metadata.get_kernel_branch(branch) |
| self.path = common.get_kernel_absolute_path(self.metadata.path) |
| self.status = "unknown" |
| self.full_reset = full_reset |
| |
| if kernel not in self.commit_list: |
| self.commit_list[kernel] = {} |
| |
| current_branch_cmd = ["symbolic-ref", "-q", "--short", "HEAD"] |
| try: |
| self.current_branch = self.__git_check_output( |
| current_branch_cmd |
| ).rstrip() |
| except subprocess.CalledProcessError: |
| self.__reset_head() |
| |
| def __base_tag(self): |
| """Return base tag for selected branch |
| |
| The base tag is derived from the tag template in metadata or, |
| if the tag template is empty, from the most recent tag in the |
| selected branch. |
| """ |
| if self.metadata.tag_template: |
| return self.metadata.tag_template % self.branch |
| # Pick base tag from most recent tag in branch (self.branchname) |
| cmd = ["describe", "--abbrev=0", self.branchname] |
| return self.__git_check_output(cmd).rstrip() |
| |
| def __git_command(self, command): |
| return ["git", "-C", self.path] + command |
| |
| def __git_check_output(self, command): |
| cmd = self.__git_command(command) |
| return subprocess.check_output(cmd, encoding="utf-8", errors="ignore") |
| |
| def __git_run(self, command): |
| cmd = self.__git_command(command) |
| subprocess.run(cmd, check=True) |
| |
| def __reset_hard_ref(self, reference): |
| """Force reset to provided reference""" |
| reset_cmd = ["reset", "-q", "--hard", reference] |
| self.__git_run(reset_cmd) |
| |
| def __reset_hard_head(self): |
| """Force hard reset to git head in checked out branch""" |
| try: |
| self.__reset_hard_ref("HEAD") |
| except subprocess.CalledProcessError: |
| self.__reset_head() |
| |
| def __reset_hard_origin(self): |
| """Force hard reset to head of remote branch""" |
| self.__reset_hard_ref("origin/%s" % self.branchname) |
| |
| def __reset_head(self): |
| """Reset HEAD to one of the known good states.""" |
| branch = ["branch", "-q", "-f", self.branchname, "origin/%s" % self.branchname] |
| checkout = ["checkout", "-q", "-f", self.branchname] |
| clean_untracked = ["clean", "-d", "-x", "-f", "-q"] |
| |
| self.__git_run(branch) |
| self.__git_run(checkout) |
| self.__git_run(clean_untracked) |
| |
| self.current_branch = self.branchname |
| |
| def __checkout_and_clean(self): |
| """Clean up uncommitted files in branch and checkout to be up to date with origin.""" |
| clean_untracked = ["clean", "-d", "-x", "-f", "-q"] |
| checkout = ["checkout", "-q", self.branchname] |
| |
| self.__reset_hard_head() |
| self.__git_run(clean_untracked) |
| |
| if self.current_branch != self.branchname: |
| self.__git_run(checkout) |
| self.current_branch = self.branchname |
| |
| if self.full_reset: |
| self.__reset_hard_origin() |
| |
| def __setup(self, branch=None): |
| """Local setup function, to be called for each access. |
| |
| Also sets the active branch if provided. |
| """ |
| if branch and branch != self.branch: |
| self.branch = branch |
| self.branchname = self.metadata.get_kernel_branch(branch) |
| self.status = "unknown" |
| |
| if self.status == "unknown": |
| self.__checkout_and_clean() |
| elif self.status == "changed": |
| self.__reset_hard_origin() |
| |
| self.status = "clean" |
| |
| def __search_subject(self, sha): |
| """Check if subject associated with 'sha' exists in the current branch""" |
| |
| try: |
| # Retrieve subject line of provided SHA |
| cmd = ["log", "--pretty=format:%s", "-n", "1", sha] |
| subject = self.__git_check_output(cmd) |
| except subprocess.CalledProcessError: |
| logging.error("Failed to get subject for sha %s", sha) |
| return False |
| |
| commit_list = self.commit_list[self.kernel] |
| if self.branch not in commit_list: |
| cmd = [ |
| "log", |
| "--no-merges", |
| "--format=%s", |
| "%s..%s" % (self.__base_tag(), self.branchname), |
| ] |
| subjects = self.__git_check_output(cmd) |
| commit_list[self.branch] = subjects.splitlines() |
| |
| # The following is a raw search which will match, for example, a revert of a commit. |
| # A better method to check if commits have been applied would be desirable. |
| subjects = commit_list[self.branch] |
| return any(subject in s for s in subjects) |
| |
| def __get_git_push_cmd(self, reviewers, cc): |
| """Generates git push command with added reviewers and autogenerated tag. |
| |
| Read more about gerrit tags here: |
| https://gerrit-review.googlesource.com/Documentation/cmd-receive-pack.html |
| """ |
| reviewers_tag = ["r=%s" % r for r in reviewers] if reviewers else [] |
| cc_tag = ["cc=%s" % c for c in cc] if cc else [] |
| autogenerated_tag = ["t=autogenerated", "t=cop"] |
| tags = ",".join(reviewers_tag + cc_tag + autogenerated_tag) |
| head = "HEAD:refs/for/%s%%%s" % (self.branchname, tags) |
| return ["push", "origin", head] |
| |
| def fetch(self, remote=None): |
| """Fetch changes from provided remote or from origin""" |
| if not remote: |
| remote = "origin" |
| self.__setup() |
| fetch_cmd = ["fetch", "-q", remote] |
| self.__git_run(fetch_cmd) |
| self.status = "changed" |
| |
| def pull(self, branch=None): |
| """Pull changes from remote repository into provided or default branch""" |
| self.__setup(branch) |
| pull_cmd = ["pull", "-q"] |
| self.__git_run(pull_cmd) |
| |
| def cherry_pick_and_push( |
| self, |
| fixer_upstream_sha, |
| fixer_changeid, |
| fix_commit_message, |
| reviewers, |
| cc, |
| ): |
| """Cherry picks upstream commit into chrome repo. |
| |
| Adds reviewers and autogenerated tag with the pushed commit. |
| """ |
| |
| logging.info( |
| "Cherry-picking SHA %s into %s", fixer_upstream_sha, self.branchname |
| ) |
| |
| self.__setup() |
| try: |
| self.status = "changed" |
| self.__git_run(["cherry-pick", "-n", fixer_upstream_sha]) |
| self.__git_run(["commit", "-s", "-m", fix_commit_message]) |
| |
| # commit has been cherry-picked and committed locally, precommit hook |
| # in git repository adds changeid to the commit message. Pick it unless |
| # we already have one passed as parameter. |
| if not fixer_changeid: |
| fixer_changeid = get_commit_changeid_linux_chrome("HEAD") |
| |
| # Sometimes the commit hook doesn't attach the Change-Id to the last |
| # paragraph in the commit message. This seems to happen if the commit |
| # message includes '---' which would normally identify the start of |
| # comments. If the Change-Id is not in the last paragraph, uploading |
| # the patch is rejected by Gerrit. Force-move the Change-Id to the end |
| # of the commit message to solve the problem. This conveniently also |
| # replaces the auto-generated Change-Id with the optional Change-Id |
| # passed as parameter. |
| commit_message = get_chrome_commit_message("HEAD") |
| commit_message = re.sub(r"Change-Id:.*\n?", "", commit_message) |
| commit_message = commit_message.rstrip() |
| commit_message += "\nChange-Id: %s" % fixer_changeid |
| self.__git_run(["commit", "--amend", "-m", commit_message]) |
| |
| git_push_cmd = self.__get_git_push_cmd(reviewers, cc) |
| self.__git_run(git_push_cmd) |
| |
| return fixer_changeid |
| except subprocess.CalledProcessError as e: |
| raise ValueError( |
| "Failed to cherrypick and push upstream fix %s on branch %s" |
| % (fixer_upstream_sha, self.branchname) |
| ) from e |
| finally: |
| self.__reset_hard_head() |
| self.status = "changed" |
| |
| def cherrypick_status(self, sha, branch=None, apply=True): |
| """cherry-pick provided sha into repository and branch identified by this class instance |
| |
| Return Status Enum: |
| MERGED if the patch has already been applied, |
| OPEN if the patch is missing and applies cleanly, |
| CONFLICT if the patch is missing and fails to apply. |
| """ |
| |
| self.__setup(branch) |
| |
| ret = None |
| try: |
| applied = self.__search_subject(sha) |
| if applied: |
| ret = common.Status.MERGED |
| raise ValueError |
| |
| if not apply: |
| raise ValueError |
| |
| result = subprocess.call( |
| self.__git_command(["cherry-pick", "-n", sha]), |
| stdout=subprocess.DEVNULL, |
| stderr=subprocess.DEVNULL, |
| ) |
| if result: |
| ret = common.Status.CONFLICT |
| raise ValueError |
| |
| diff = self.__git_check_output(["diff", "HEAD"]) |
| if diff: |
| ret = common.Status.OPEN |
| raise ValueError |
| |
| ret = common.Status.MERGED |
| |
| except ValueError: |
| pass |
| |
| except subprocess.CalledProcessError: |
| ret = common.Status.CONFLICT |
| |
| finally: |
| self.__reset_hard_head() |
| self.status = "clean" |
| |
| return ret |