| # Copyright 2017 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Git utility.""" |
| |
| from __future__ import annotations |
| |
| import bisect |
| import dataclasses |
| import logging |
| import os |
| import re |
| import shutil |
| import stat |
| import subprocess |
| import tempfile |
| import time |
| import typing |
| |
| from bisect_kit import cache_util |
| from bisect_kit import errors |
| from bisect_kit import util |
| |
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Length of a full (non-abbreviated) git commit hash, in hex digits.
GIT_FULL_COMMIT_ID_LENGTH = 40

# Minimal acceptable length of git commit id.
#
# For chromium, hash collision rate over number of digits:
# - 6 digits: 4.85%
# - 7 digits: 0.32%
# - 8 digits: 0.01%
# As foolproof check, 7 digits should be enough.
GIT_MIN_COMMIT_ID_LENGTH = 7
| |
| |
@dataclasses.dataclass(order=True, eq=True, frozen=True)
class Commit:
    """A data class represents a git commit.

    Fields are declared in comparison order, so instances sort primarily by
    `timestamp`; this enables binary search over a commit list by time.
    """

    # Commit timestamp, in seconds since epoch.
    timestamp: int = 0
    # Git commit hash.
    rev: str = ''
    # Commit subject line; defaults to a placeholder for padding entries.
    subject: str = 'dummy subject'

    @staticmethod
    def from_rev_line(rev_line: str) -> Commit:
        """Parses one "<timestamp> <rev> [<subject>]" line into a Commit.

        The input `rev_line` may not include the subject part, thus here we
        unpack `subject` as a list with an asterisk. The length of the
        `subject` list should be at most one.
        """
        ts_str, rev, *subject = rev_line.split(' ', 2)
        return Commit(int(ts_str), rev, *subject)

    @staticmethod
    def make_commit_list(commit_tuples) -> list[Commit]:
        """Converts an iterable of argument tuples into Commit objects."""
        # Loop variable renamed from `tuple`, which shadowed the builtin.
        return [Commit(*args) for args in commit_tuples]
| |
| |
@dataclasses.dataclass(eq=True)
class CommitMeta:
    """A data class represents the metadata of a git commit."""

    message: str | None = None
    object: str | None = None
    tree: str | None = None
    type: str | None = None
    parent: list[str] | None = None
    author: str | None = None
    committer: str | None = None
    author_time: int | None = None
    committer_time: int | None = None

    UNKNOWN_SUMMARY: typing.ClassVar[str] = '(unknown)'

    @staticmethod
    def from_git_commit_object(git_cat_file_output: str) -> CommitMeta:
        """Parses raw commit (or tag) object content into a CommitMeta.

        Args:
            git_cat_file_output: object content as printed by
                `git cat-file`; header and message are separated by a blank
                line.

        Returns:
            CommitMeta with whichever header fields were present.
        """
        meta_dict: dict[str, typing.Any] = {}
        header, meta_dict['message'] = git_cat_file_output.split('\n\n', 1)
        for line in header.splitlines():
            if m := re.match(r'^(object|tree|type) (\w+)', line):
                meta_dict[m.group(1)] = m.group(2)
                continue

            if re.match(r'^parent (\w+)', line):
                # A merge commit has one "parent" header line per parent;
                # accumulate them all instead of overwriting the previous
                # value (the old code kept only the last parent).
                meta_dict.setdefault('parent', []).extend(line.split()[1:])
                continue

            if m := re.match(r'^(author|committer) (.*) (\d+) (\S+)$', line):
                meta_dict[m.group(1)] = m.group(2)
                meta_dict['%s_time' % m.group(1)] = int(m.group(3))
                continue
        return CommitMeta(**meta_dict)

    @staticmethod
    def get_summary(meta) -> str:
        """Returns the first line of the commit message, or '(unknown)'."""
        if meta is None or meta.message is None:
            return CommitMeta.UNKNOWN_SUMMARY
        return meta.message.splitlines()[0]
| |
| |
@dataclasses.dataclass(eq=True, frozen=True)
class Reference:
    """A data class represents a git reference."""

    commit_hash: str
    reference_name: str

    @staticmethod
    def from_ls_remote_line(line: str) -> Reference:
        """Builds a Reference from one line of `git ls-remote` output."""
        # The hash and the ref name are separated by whitespace; the ref
        # name is kept verbatim (split at most once).
        hash_part, name_part = line.split(maxsplit=1)
        return Reference(hash_part, name_part)
| |
| |
@dataclasses.dataclass(order=True, eq=True, frozen=True)
class Period:
    """A data class represents an inclusive period of time."""

    begin: int = 0
    end: int = 0

    def __contains__(self, other):
        """Supports `x in period` for int timestamps and nested Periods."""
        if isinstance(other, Period):
            # A period is contained if both its ends lie inside self.
            return self.begin <= other.begin and other.end <= self.end
        if isinstance(other, int):
            return self.begin <= other <= self.end
        raise TypeError(
            'The membership test only accepts a `Period` or an `int`'
        )
| |
| |
class PatchGitConfig:
    """Generates temporary git config to avoid bare repository issue.

    Used as a context manager: `__enter__` returns a copy of `os.environ`,
    with GIT_CONFIG_SYSTEM overridden if patching was needed, suitable for
    passing to git subprocesses.
    """

    # TODO(zjchang): remove the workaround when gclient supports
    # safe.bareRepository = explicit.
    def __init__(self):
        # Path of the patched system gitconfig copy; stays None unless
        # __enter__ decides a patch is needed (removed in __exit__).
        self.patched_system_config = None

    def __enter__(self) -> dict:
        """Generates temporary git config.

        Returns:
            A dict with GIT_CONFIG_SYSTEM if should apply new config.
        """
        new_env = os.environ.copy()

        home_directory = os.path.expanduser('~')
        origin_system_config = '/etc/gitconfig'
        # This config is auto synced by gLinux.
        origin_core_config = '/usr/share/git-core/config'
        try:
            # disable_patch_config=True prevents config() from recursing
            # back into this context manager.
            bare_repository = config(
                home_directory,
                '-f',
                origin_core_config,
                'safe.bareRepository',
                disable_patch_config=True,
            )
        except subprocess.CalledProcessError:
            bare_repository = ''
        if bare_repository.strip() != 'explicit':
            # Nothing to work around; run with the unmodified environment.
            return new_env

        # delete=False because git reads this file after the handle is
        # closed; the file is removed explicitly in __exit__.
        self.patched_system_config = tempfile.NamedTemporaryFile(
            delete=False
        ).name
        shutil.copy(origin_system_config, self.patched_system_config)
        # patch system config
        config(
            home_directory,
            '-f',
            self.patched_system_config,
            'safe.bareRepository',
            'all',
            disable_patch_config=True,
        )
        new_env['GIT_CONFIG_SYSTEM'] = self.patched_system_config
        return new_env

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Clean up the temporary config file, if one was created.
        if self.patched_system_config:
            os.unlink(self.patched_system_config)
| |
| |
def is_git_rev(rev: str) -> bool:
    """Is a git hash-like version string.

    It accepts shortened hash with at least 7 digits.
    """
    hash_like = re.match(r'^[0-9a-f]+$', rev) is not None
    return (
        hash_like
        and GIT_MIN_COMMIT_ID_LENGTH <= len(rev) <= GIT_FULL_COMMIT_ID_LENGTH
    )
| |
| |
def argtype_git_rev(rev: str) -> str:
    """Validates git hash.

    Raises:
        errors.ArgTypeError: if `rev` does not look like a git hash.
    """
    if is_git_rev(rev):
        return rev
    msg = 'should be git hash, at least %d digits' % GIT_MIN_COMMIT_ID_LENGTH
    raise errors.ArgTypeError(msg, '1a2b3c4d5e')
| |
| |
def is_git_root(path: str) -> bool:
    """Is given path root of git repo."""
    # A checkout root contains a `.git` entry (directory, or file for
    # worktrees/submodules); os.path.exists covers both.
    dot_git = os.path.join(path, '.git')
    return os.path.exists(dot_git)
| |
| |
def is_git_bare_dir(path: str) -> bool:
    """Is inside .git folder or bare git checkout."""
    if not os.path.isdir(path):
        return False
    try:
        output = git_output(['rev-parse', '--is-bare-repository'], cwd=path)
    except subprocess.CalledProcessError:
        # Not a git directory at all.
        return False
    return output == 'true\n'
| |
| |
def git(cmd: list[str], **kwargs):
    """Runs a git command through util.check_call with patched config.

    Args:
        cmd: git sub-command and arguments (without the leading 'git').
        kwargs: forwarded to util.check_call; an 'env' entry is merged with
            (and overridden by) the patched environment.
    """
    with PatchGitConfig() as new_env:
        if 'env' in kwargs:
            new_env = kwargs.pop('env') | new_env
        util.check_call('git', *cmd, env=new_env, **kwargs)
| |
| |
def git_output(cmd: list[str], **kwargs):
    """Runs a git command and returns its output via util.check_output.

    Args:
        cmd: git sub-command and arguments (without the leading 'git').
        kwargs: forwarded to util.check_output; recognizes the extra key
            'disable_patch_config' to skip the PatchGitConfig workaround.
    """
    full_cmd = ['git'] + cmd
    if kwargs.pop('disable_patch_config', False):
        return util.check_output(*full_cmd, **kwargs)
    with PatchGitConfig() as new_env:
        if 'env' in kwargs:
            new_env = kwargs.pop('env') | new_env
        return util.check_output(*full_cmd, env=new_env, **kwargs)
| |
| |
def clone(git_repo: str, repo_url: str, reference: str | None = None) -> None:
    """Clone a git repo.

    Args:
        git_repo: path of git repo.
        repo_url: url of git repo.
        reference: optional git reference.
    """
    os.makedirs(git_repo, exist_ok=True)
    cmd = ['clone', repo_url, '.']
    if reference:
        cmd.extend(['--reference', reference])
    git(cmd, cwd=git_repo)
| |
| |
def checkout_version(git_repo: str, rev: str) -> None:
    """git checkout.

    Args:
        git_repo: path of git repo.
        rev: git commit revision to checkout.
    """
    # -q: quiet; -f: discard local changes that would block the checkout.
    cmd = ['checkout', '-q', '-f', rev]
    git(cmd, cwd=git_repo)
| |
| |
def init(git_repo: str, initial_branch: str = 'main') -> None:
    """git init.

    git_repo and its parent directories will be created if they don't exist.

    Args:
        git_repo: path of git repo.
        initial_branch: the default branch after git init
    """
    os.makedirs(git_repo, exist_ok=True)
    git(['init', '-q', '--initial-branch', initial_branch], cwd=git_repo)
| |
| |
def pull(git_repo: str) -> None:
    """Runs `git pull` in the given repo.

    Args:
        git_repo: path of git repo.
    """
    git(['pull'], cwd=git_repo)
| |
| |
def commit_file(
    git_repo: str,
    path: str,
    message: str,
    content: str,
    commit_time: str | None = None,
    author_time: str | None = None,
) -> None:
    """Commit a file.

    Args:
        git_repo: path of git repo
        path: file path, relative to git_repo
        message: commit message
        content: file content
        commit_time: commit timestamp
        author_time: author timestamp
    """
    # The author time defaults to the commit time.
    if author_time is None:
        author_time = commit_time

    env = {}
    if author_time:
        env['GIT_AUTHOR_DATE'] = str(author_time)
    if commit_time:
        env['GIT_COMMITTER_DATE'] = str(commit_time)

    full_path = os.path.join(git_repo, path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, 'w') as f:
        f.write(content)

    git(['add', path], cwd=git_repo)
    git(['commit', '-q', '-m', message, path], cwd=git_repo, env=env)
| |
| |
def config(git_repo: str, *args, **kwargs) -> str:
    """Wrapper of 'git config'.

    Args:
        git_repo: path of git repo.
        args: parameters pass to 'git config'
        kwargs: extra keyword arguments forwarded to git_output(), e.g.
            disable_patch_config.

    Returns:
        stdout of 'git config'.
    """
    return git_output(['config', *args], cwd=git_repo, **kwargs)
| |
| |
def fetch(git_repo: str, *args, retry_prune_if_conflict: bool = False) -> None:
    """Wrapper of 'git fetch' with retry support.

    Args:
        git_repo: path of git repo.
        args: parameters pass to 'git fetch'
        retry_prune_if_conflict: retry with --prune if git references conflict
    """
    tries = 0
    while True:
        tries += 1
        # Capture stderr so the failure can be classified for retry below.
        stderr_lines: list[str] = []
        try:
            git(
                ['fetch', *args],
                cwd=git_repo,
                stderr_callback=stderr_lines.append,
            )
            return
        except subprocess.CalledProcessError:
            # Give up after 5 attempts in total.
            if tries >= 5:
                logger.error('git fetch failed too much times')
                raise
            stderr = ''.join(stderr_lines)
            # retry 5xx internal server error
            if 'The requested URL returned error: 5' in stderr:
                # Exponential backoff, capped at 60 seconds.
                delay = min(60, 10 * 2**tries)
                logger.warning(
                    'git fetch failed, will retry %s seconds later', delay
                )
                time.sleep(delay)
                continue

            # We have `retry_prune_if_conflict` instead of always pruning because as
            # a bisector, we want to keep historical references as long as possible,
            # even if they have been deleted on remote server. We prune references
            # only if conflict is detected.
            if (
                '(unable to update local ref)' in stderr
                and retry_prune_if_conflict
                and '--prune' not in args
            ):
                logger.warning(
                    'git fetch failed due to conflicting references; try again with --prune'
                )
                args = '--prune', *args
                continue
            # Unrecognized failure; do not retry.
            raise
| |
| |
def _adjust_timestamp_increasingly(
    commits: list[Commit], show_warning: bool = False
) -> list[Commit]:
    """Adjust commit timestamps.

    After adjust, the timestamps are increasing (non-decreasing).

    Args:
        commits: A list of `Commit` objects.
        show_warning: If True, log warnings when any timestamp was adjusted.

    Returns:
        The adjusted list of `Commit` objects.
    """
    adjusted = 0
    running_max = -1
    result: list[Commit] = []
    for commit in commits:
        if commit.timestamp < running_max:
            # Out-of-order commit: clamp its timestamp up to the running max.
            adjusted += 1
        else:
            running_max = commit.timestamp
        result.append(Commit(running_max, commit.rev, commit.subject))

    if show_warning and adjusted:
        logger.warning('Commit timestamps are not increasing')
        logger.warning('%d timestamps adjusted', adjusted)

    return result
| |
| |
class FastLookupFailed(Exception):
    """No data is cached for this query.

    The caller should fallback to the original operation.
    This is internal control flow for FastLookup/FastLookupEntry, caught by
    the callers in this module rather than surfaced to users.
    """
| |
| |
class FastLookupEntry:
    """Cached commits from one branch of given time period.

    With this class, we can look up commit via commit hash and timestamp fast.
    """

    def __init__(self, git_repo: str, branch: str):
        self.git_repo: str = git_repo
        self.branch: str = branch
        # Time period covered by the cache; None until optimize() succeeds.
        self.optimized_period: Period | None = None
        # Commits covering `optimized_period`, timestamps adjusted to be
        # non-decreasing so binary search by time is valid.
        self.cached: list[Commit] = []
        # commit hash -> index into `cached`.
        self.commit_to_index: dict[str, int] = {}

    def optimize(self, period: Period):
        """Loads and indexes commits for `period` (no-op if already covered)."""
        assert period.begin <= period.end
        if self.optimized_period and period in self.optimized_period:
            # already done
            return

        self.cached = get_revlist_by_period(self.git_repo, self.branch, period)
        self.optimized_period = period

        # Adjust timestamps, so we can do binary search by timestamp
        self.cached = _adjust_timestamp_increasingly(self.cached)

        self.commit_to_index = {
            commit.rev: i for i, commit in enumerate(self.cached)
        }

    def get_rev_by_time(self, timestamp: int) -> str | None:
        """Returns the commit hash in effect at `timestamp`.

        Returns:
            Commit hash, or None if the branch has no commit at or before
            `timestamp`.

        Raises:
            FastLookupFailed: no cache, or `timestamp` outside the cached
                period; caller should fall back to a direct git query.
        """
        if not self.optimized_period:
            raise FastLookupFailed
        if timestamp not in self.optimized_period:
            raise FastLookupFailed
        # Guard against an empty cache (branch had no commits in or before
        # the optimized period); indexing below would raise IndexError.
        if not self.cached:
            return None

        # Note that, the return value might be different as "git rev-list" if the
        # actual commit timestamps are not fully increasing.
        idx = bisect.bisect_right(self.cached, Commit(timestamp))
        if idx == 0 and timestamp < self.cached[0].timestamp:
            return None
        if idx == len(self.cached) or self.cached[idx].timestamp != timestamp:
            idx -= 1
        return self.cached[idx].rev

    def is_containing_commit(self, rev: str) -> bool:
        """Returns True if `rev` is in the cache.

        Raises:
            FastLookupFailed: `rev` not cached; caller should fall back.
        """
        if rev in self.commit_to_index:
            return True
        raise FastLookupFailed
| |
| |
class FastLookup:
    """Collection of FastLookupEntry"""

    def __init__(self):
        # git_repo -> branch -> FastLookupEntry
        self.entries: dict[str, dict[str, FastLookupEntry]] = {}
        self.target_period: Period | None = None

    def optimize(self, period: Period):
        """Enables fast lookup for the given time period."""
        self.target_period = period

    def disable(self):
        """Disables fast lookup and drops all cached data."""
        self.target_period = None
        self.entries = {}

    def get_rev_by_time(
        self, git_repo: str, timestamp: int, branch: str
    ) -> str | None:
        """Looks up the commit at `timestamp`, building the cache lazily.

        Raises:
            FastLookupFailed: lookup disabled or timestamp not covered.
        """
        if not self.target_period:
            raise FastLookupFailed
        if timestamp not in self.target_period:
            raise FastLookupFailed

        branches = self.entries.setdefault(git_repo, {})
        if branch not in branches:
            branches[branch] = FastLookupEntry(git_repo, branch)
        entry = branches[branch]
        entry.optimize(self.target_period)
        return entry.get_rev_by_time(timestamp)

    def is_containing_commit(self, git_repo: str, rev: str) -> bool:
        # This function is optimized only after get_rev_by_time() is invoked.
        if git_repo not in self.entries:
            raise FastLookupFailed

        for entry in self.entries[git_repo].values():
            try:
                return entry.is_containing_commit(rev)
            except FastLookupFailed:
                pass
        raise FastLookupFailed
| |
| |
# Module-level singleton consulted by the query functions below.
fast_lookup = FastLookup()
| |
| |
@cache_util.Cache.default_disabled
def is_containing_commit(git_repo: str, rev: str) -> bool:
    """Determines given commit exists.

    Args:
        git_repo: path of git repo.
        rev: git commit revision in query.

    Returns:
        True if rev is inside given git repo. If git_repo is not a git folder,
        returns False as well.
    """
    # Try the in-memory cache first; fall back to asking git directly.
    try:
        return fast_lookup.is_containing_commit(git_repo, rev)
    except FastLookupFailed:
        pass

    try:
        object_type = git_output(['cat-file', '-t', rev], cwd=git_repo)
    except (subprocess.CalledProcessError, OSError):
        return False
    return object_type in ('commit\n', 'tag\n')
| |
| |
@cache_util.Cache.default_disabled
def is_ancestor_commit(git_repo: str, old: str, new: str) -> bool:
    """Determines `old` commit is ancestor of `new` commit.

    Args:
        git_repo: path of git repo.
        old: the ancestor commit.
        new: the descendant commit.

    Returns:
        True only if `old` is the ancestor of `new`. One commit is not
        considered as ancestor of itself.
    """
    cmd = ['rev-list', '--ancestry-path', '-1', '%s..%s' % (old, new)]
    try:
        # Non-empty output means at least one commit lies on an ancestry
        # path between the two revisions.
        return git_output(cmd, cwd=git_repo) != ''
    except subprocess.CalledProcessError:
        return False
| |
| |
def ls_remote(
    git_repo: str,
    repository: str | None = None,
    refs: list[str] | None = None,
) -> list[Reference]:
    """List references in a remote repository.

    Args:
        git_repo: path of git repo.
        repository: remote repository name to query.
        refs: reference matching patterns.

    Raises:
        errors.InternalError: `refs` given without `repository`.
    """
    if refs and not repository:
        raise errors.InternalError(
            'ls-remote: repository is not assigned while refs has value'
        )
    cmd = ['ls-remote']
    if repository:
        cmd.append(repository)
    cmd.extend(refs or [])
    output = git_output(cmd, cwd=git_repo)
    return [
        Reference.from_ls_remote_line(line) for line in output.splitlines()
    ]
| |
| |
@cache_util.Cache.default_disabled
def get_commit_metadata(git_repo: str, rev: str) -> CommitMeta:
    """Get metadata of given commit.

    Args:
        git_repo: path of git repo.
        rev: git commit revision in query.

    Returns:
        CommitMeta with fields (if available):
            tree: hash of git tree object
            parent: list of parent commits; this field is unavailable for the
                very first commit of git repo.
            author: name and email of author
            author_time: author timestamp (without timezone information)
            committer: name and email of committer
            committer_time: commit timestamp (without timezone information)
            message: commit message text
    """
    data = git_output(['cat-file', '-p', rev], cwd=git_repo, log_stdout=False)
    return CommitMeta.from_git_commit_object(data)
| |
| |
def get_batch_commit_metadata(
    git_repo: str, revs: typing.Iterable[str]
) -> dict[str, CommitMeta | None]:
    """Gets metadata of many objects via one `git cat-file --batch` call.

    Args:
        git_repo: path of git repo.
        revs: git revisions in query.

    Returns:
        dict mapping each queried object name to its CommitMeta, or None
        when the object was not found.
    """
    query = '\n'.join(revs)
    logger.debug('get_batch_commit_metadata %r', query)
    with tempfile.NamedTemporaryFile('w+t') as f:
        f.write(query)
        f.flush()
        # The `util.check_output_in_bytes()` function doesn't support stdin, so use
        # shell redirect instead.
        # Call binary version because we need to count size in bytes later.
        with PatchGitConfig() as new_env:
            data = util.check_output_in_bytes(
                'sh',
                '-c',
                'git cat-file --batch < ' + f.name,
                cwd=git_repo,
                env=new_env,
            )

    metas: dict[str, CommitMeta | None] = {}
    # Each batch record is "<name> <type> <size>\n<object bytes>\n"; records
    # without the size field indicate a missing object.
    while data:
        first_line, data = data.split(b'\n', 1)
        m = re.match(r'^(\w+) (\w+)(?: (\d+))?', first_line.decode('utf8'))
        assert m, repr(first_line)
        object_name, object_type = m.group(1, 2)
        if not m.group(3):
            # No size: object doesn't exist in the repo.
            metas[object_name] = None
            continue
        assert object_type in ['commit', 'tag'], (
            'unsupported object type: %s' % object_type
        )
        object_size = int(m.group(3))
        # Each object's content is terminated by a single newline.
        assert data[object_size] == ord(b'\n'), repr(data[object_size])
        obj, data = data[:object_size], data[object_size + 1 :]
        metas[object_name] = CommitMeta.from_git_commit_object(
            obj.decode('utf8')
        )
    return metas
| |
| |
def get_revlist(git_repo: str, old: str, new: str) -> list[str]:
    """Enumerates git commit between two revisions (inclusive).

    Args:
        git_repo: path of git repo.
        old: git commit revision.
        new: git commit revision.

    Returns:
        list of git revisions. The list contains the input revisions, old and
        new.
    """
    assert old
    assert new
    # `old^..new` makes the range inclusive of `old` itself.
    cmd = ['rev-list', '--first-parent', '--reverse', '%s^..%s' % (old, new)]
    return git_output(cmd, cwd=git_repo).splitlines()
| |
| |
def get_commit_log(git_repo: str, rev: str) -> str:
    """Get git commit log.

    Args:
        git_repo: path of git repo.
        rev: git commit revision.

    Returns:
        commit log message
    """
    # %B is the raw commit body (subject + message).
    return git_output(['log', '-1', '--format=%B', rev], cwd=git_repo)
| |
| |
def get_commit_hash(git_repo: str, rev: str) -> str:
    """Get git commit hash.

    Args:
        git_repo: path of git repo.
        rev: could be git tag, branch, or (shortened) commit hash

    Returns:
        full git commit hash

    Raises:
        ValueError: `rev` is not unique or doesn't exist
    """
    # Use '^{commit}' to restrict search only commits.
    # Use '--' to avoid ambiguity, like matching rev against path name.
    cmd = ['rev-parse', '%s^{commit}' % rev, '--']
    try:
        git_rev = git_output(cmd, cwd=git_repo).rstrip('-\n')
    except subprocess.CalledProcessError as e:
        # Do not use 'git rev-parse --disambiguate' to determine uniqueness
        # because it searches objects other than commits as well.
        raise ValueError('%s is not unique or does not exist' % rev) from e
    assert is_git_rev(git_rev)
    return git_rev
| |
| |
def get_commit_time(git_repo: str, rev: str, path: str | None = None) -> int:
    """Get git commit timestamp.

    Args:
        git_repo: path of git repo
        rev: git commit id, branch name, tag name, or other git object
        path: path, relative to git_repo

    Returns:
        timestamp (int)
    """
    # %ct is the committer timestamp in seconds since epoch.
    cmd = ['log', '-1', '--format=%ct', rev]
    if path:
        cmd.extend(['--', path])
    return int(git_output(cmd, cwd=git_repo))
| |
| |
def is_symbolic_link(git_repo: str, rev: str, path: str) -> bool:
    """Check if a file is symbolic link.

    Args:
        git_repo: path of git repo
        rev: git commit id
        path: file path

    Returns:
        True if the specified file is a symbolic link in repo.

    Raises:
        ValueError if not found
    """
    # format: 120000 blob 8735a8c1dd96ede39a21d983d5c96792fd15c1a5 default.xml
    # TODO(kcwu): handle escaped path with special characters
    output = git_output(['ls-tree', rev, '--full-name', path], cwd=git_repo)
    fields = output.split()
    if len(fields) >= 4 and fields[3] == path:
        # The first field is the file mode in octal; test the symlink bit.
        return stat.S_ISLNK(int(fields[0], 8))

    raise ValueError(
        'file %s is not found in repo:%s rev:%s' % (path, git_repo, rev)
    )
| |
| |
@cache_util.Cache.default_disabled
def get_file_from_revision(git_repo: str, rev: str, path: str) -> str:
    """Get file content of given revision.

    Args:
        git_repo: path of git repo
        rev: git commit id
        path: file path

    Returns:
        file content (str)
    """
    content = git_output(
        ['show', '%s:%s' % (rev, path)], cwd=git_repo, log_stdout=False
    )

    # It might be a symbolic link.
    # In extreme case, it's possible that filenames contain special characters,
    # like newlines. In practice, it should be safe to assume no such cases and
    # reduce disk i/o.
    if '\n' in content or not is_symbolic_link(git_repo, rev, path):
        return content

    # Follow the link: its blob content is the target path.
    return get_file_from_revision(git_repo, rev, content)
| |
| |
def list_dir_from_revision(git_repo: str, rev: str, path: str) -> list[str]:
    """Lists entries of directory of given revision.

    Args:
        git_repo: path of git repo
        rev: git commit id
        path: directory path, relative to git root

    Returns:
        list of names

    Raises:
        subprocess.CalledProcessError: if `path` doesn't exists in `rev`
    """
    cmd = ['ls-tree', '--name-only', '%s:%s' % (rev, path)]
    output = git_output(cmd, cwd=git_repo, log_stdout=False)
    return output.splitlines()
| |
| |
def get_rev_by_time(
    git_repo: str, timestamp: int, branch: str | None, path: str | None = None
) -> str | None:
    """Query commit of given time.

    Args:
        git_repo: path of git repo.
        timestamp: timestamp
        branch: only query parent of the `branch`. If branch=None, it means
            'HEAD' (current branch, usually).
        path: only query history of path, relative to git_repo

    Returns:
        git commit hash. None if path didn't exist at the given time.
    """
    branch = branch or 'HEAD'

    if not path:
        # Fast lookup only caches whole-branch history, not per-path.
        try:
            return fast_lookup.get_rev_by_time(git_repo, timestamp, branch)
        except FastLookupFailed:
            pass

    cmd = [
        'rev-list',
        '--first-parent',
        '-1',
        '--before',
        str(timestamp),
        branch,
        '--',
    ]
    if path:
        cmd.append(path)

    rev = git_output(cmd, cwd=git_repo).strip()
    return rev if rev else None
| |
| |
def get_revlist_by_period(
    git_repo: str, branch: str, period: Period
) -> list[Commit]:
    """Lists commits of `branch` in `period`, plus the one just before it.

    Args:
        git_repo: path of git repo.
        branch: branch name or ref name.
        period: inclusive time period.

    Returns:
        list of Commit in increasing time order; the commit immediately
        preceding the period, if any, comes first.
    """
    common = ['rev-list', '--first-parent', '--timestamp']

    # Find the last commit before the begin of given period.
    text = git_output(
        common + ['-1', '--before', str(period.begin - 1), branch, '--'],
        cwd=git_repo,
    )

    # Find commits in the period.
    text += git_output(
        common
        + [
            '--reverse',
            '--after',
            str(period.begin),
            '--before',
            str(period.end),
            branch,
            '--',
        ],
        cwd=git_repo,
    )

    return [Commit.from_rev_line(line) for line in text.splitlines()]
| |
| |
def reset_hard(git_repo: str) -> None:
    """Restore modified and deleted files.

    This is simply wrapper of "git reset --hard".

    Args:
        git_repo: path of git repo.
    """
    cmd = ['reset', '--hard']
    git_output(cmd, cwd=git_repo)
| |
| |
def clean(
    git_repo: str,
    remove_ignored: bool = False,
    remove_folder: bool = True,
    is_dry_run: bool = False,
    exclude_list: list[str] | None = None,
) -> None:
    """Clean up git repo directory.

    Args:
        git_repo: path of git repo.
        remove_ignored: remove files ignore by `.gitignore`.
        remove_folder: remove folders.
        is_dry_run: dry run.
        exclude_list: files and/or directories to ignore, relative to git_repo
    """
    flags = []
    if remove_ignored:
        flags.append('-x')
    if remove_folder:
        flags.append('-d')
    # -n only reports what would be removed; otherwise -f actually deletes.
    flags.append('-n' if is_dry_run else '-f')
    for exclude_pattern in exclude_list or []:
        flags.extend(['--exclude', exclude_pattern])
    git_output(['clean', *flags], cwd=git_repo)
| |
| |
def list_untracked(
    git_repo: str, exclude_list: list[str] | None = None
) -> list[str]:
    """List untracked files and directories.

    Args:
        git_repo: path of git repo.
        exclude_list: files and/or directories to ignore, relative to git_repo

    Returns:
        list of paths, relative to git_repo
    """
    exclude_flags = []
    for exclude in exclude_list or []:
        assert not os.path.isabs(exclude), 'should be relative'
        # Anchor the pattern at the repo root and escape regex-ish chars.
        exclude_flags.extend(['--exclude', '/' + re.escape(exclude)])

    output = git_output(
        ['ls-files', '--others', '--exclude-standard', *exclude_flags],
        cwd=git_repo,
    )
    # Remove the trailing slash, which means directory.
    return [line.rstrip('/') for line in output.splitlines()]
| |
| |
def remove_lock(git_repo: str) -> None:
    """Remove git lock files.

    Args:
        git_repo: path of git repo.
    """
    for lock_name in ('HEAD.lock', 'index.lock'):
        lock_path = os.path.join(git_repo, '.git', lock_name)
        if os.path.exists(lock_path):
            os.unlink(lock_path)
            logger.warning('git lock file deleted: %s', lock_path)
| |
| |
def distclean(git_repo: str, exclude_list: list[str] | None = None) -> None:
    """Clean up git repo directory.

    Restore modified and deleted files. Delete untracked files and lock files.

    Args:
        git_repo: path of git repo.
        exclude_list: files and/or directories to ignore, relative to git_repo
    """
    # Remove stale lock files first so the git commands below can run.
    remove_lock(git_repo)
    reset_hard(git_repo)
    clean(git_repo, exclude_list=exclude_list)
| |
| |
def get_history(
    git_repo: str,
    path: str | None = None,
    branch: str | None = None,
    after: int | None = None,
    before: int | None = None,
    grep: str | None = None,
    padding_begin: bool = False,
    padding_end: bool = False,
    with_subject: bool = False,
    all_branch: bool = False,
) -> list[Commit]:
    """Get commit history of given path.

    `after` and `before` could be outside of lifetime of `path`. `padding` is
    used to control what to return for such cases.

    Args:
        git_repo: path of git repo.
        path: path to query, relative to git_repo
        branch: branch name or ref name
        after: limit history after given time (inclusive)
        before: limit history before given time (inclusive)
        grep: limit history that matches the specified regular expression
        padding_begin: If True, pads returned result with dummy record at exact
            'after' time, if 'path' existed at that time.
        padding_end: If True, pads returned result with dummy record at exact
            'before' time, if 'path' existed at that time.
        with_subject: If True, return commit subject together.
        all_branch: If True, returns git log regardless the branch name.

    Returns:
        List of Commit objects carrying (timestamp, git hash, subject); the
        subject is populated only when with_subject is True. They are all
        events when `path` was added, removed, modified, and start and end
        time if `padding` is true. If `padding` and `with_subject` are both
        true, 'dummy subject' will be returned as padding history's subject.

        For each entry, at `timestamp`, the repo state is `git hash`. In
        other words, `timestamp` is not necessary the commit time of
        `git hash` for the padded entries.
    """
    assert not (all_branch and branch)
    log_format = '%ct %H' if not with_subject else '%ct %H %s'
    cmd = [
        'log',
        '--reverse',
        '--first-parent',
        '--format=' + log_format,
    ]
    if after:
        cmd += ['--after', str(after)]
    if before:
        cmd += ['--before', str(before)]
    if grep:
        cmd += ['--grep', grep]
    if branch:
        assert not is_git_rev(branch)
        cmd += [branch]
    if all_branch:
        cmd += ['--all']
    if path:
        # '--' is necessary otherwise if `path` is removed in current revision, git
        # will complain it's an ambiguous argument which may be path or something
        # else (like git branch name, tag name, etc.)
        cmd += ['--', path]

    lines = git_output(cmd, cwd=git_repo).splitlines()
    result = [Commit.from_rev_line(line) for line in lines]

    if padding_end:
        assert before, 'padding_end=True make no sense if before=None'
        # Pad only if `path` still existed at `before` time.
        if get_rev_by_time(git_repo, before, branch, path=path):
            before = int(before)
            if not result or result[-1].timestamp != before:
                git_rev = get_rev_by_time(git_repo, before, branch)
                assert git_rev
                result.append(Commit(before, git_rev))

    if padding_begin:
        assert after, 'padding_begin=True make no sense if after=None'
        # Pad only if `path` already existed at `after` time.
        if get_rev_by_time(git_repo, after, branch, path=path):
            after = int(after)
            if not result or result[0].timestamp != after:
                git_rev = get_rev_by_time(git_repo, after, branch)
                assert git_rev
                result.insert(0, Commit(after, git_rev))

    return result
| |
| |
def get_history_recursively(
    git_repo: str,
    path: str,
    after: int,
    before: int,
    parser_callback: typing.Callable[[str, str], list[str] | None],
    padding_end: bool = True,
    branch: str | None = None,
) -> list[Commit]:
    """Get commit history of given path and its dependencies.

    In comparison to get_history(), get_history_recursively also takes
    dependencies into consideration. For example, if file A referenced file B,
    get_history_recursively(A) will return commits of B in addition to A. This
    applies recursively, so commits of C will be included if file B referenced
    file C, and so on.

    This function is file type neutral. `parser_callback(filename, content)` will
    be invoked to parse file content and should return list of filename of
    dependencies. If `parser_callback` returns None (usually syntax error), the
    commit is omitted.

    Args:
        git_repo: path of git repo
        path: path to query, relative to git_repo
        after: limit history after given time (inclusive)
        before: limit history before given time (inclusive)
        parser_callback: callback to parse file content. See above comment.
        padding_end: If True, pads returned result with a dummy record at the
            exact 'before' time, reusing the rev/subject of the last commit.
        branch: branch name or ref name

    Returns:
        list of Commit, sorted by timestamp, with consecutive duplicates
        removed
    """
    # History of `path` itself. padding_begin=True so the interval analysis
    # below has a commit at exactly the `after` time when `path` existed then.
    history = get_history(
        git_repo,
        path,
        after=after,
        before=before,
        padding_begin=True,
        branch=branch,
    )

    # Collect include information of each commit.
    # Maps dependency filename -> set of revs of `path` whose content
    # references that dependency.
    includes: dict[str, set[str]] = {}
    for commit in history:
        content = get_file_from_revision(git_repo, commit.rev, path)
        parse_result = parser_callback(path, content)
        if parse_result is None:
            # Unparsable content (usually syntax error); omit this commit.
            continue
        for include_name in parse_result:
            if include_name not in includes:
                includes[include_name] = set()
            includes[include_name].add(commit.rev)

    # Analyze the start time and end time of each include.
    # Each entry is (dependency name, appear time, disappear time). The same
    # dependency may yield multiple entries if it was referenced, dropped,
    # then referenced again later.
    dependencies = []
    for include_name, rev_set in includes.items():
        appeared = None
        for commit in history:
            if commit.rev in rev_set:
                if not appeared:
                    appeared = commit.timestamp
            else:
                if appeared:
                    # dependency file exists in time range [appeared, commit.timestamp)
                    dependencies.append(
                        (include_name, appeared, commit.timestamp - 1)
                    )
                    appeared = None

        if appeared is not None:
            # Dependency is still referenced at the end of the queried range.
            dependencies.append((include_name, appeared, before))

    # Recursion and merge.
    # padding_end=False for recursive calls: only the outermost invocation
    # should append the final dummy record.
    result = list(history)
    for include, appeared, disappeared in dependencies:
        result += get_history_recursively(
            git_repo,
            include,
            appeared,
            disappeared,
            parser_callback,
            padding_end=False,
            branch=branch,
        )

    # Sort and padding.
    result.sort(key=lambda x: x.timestamp)
    if padding_end:
        # NOTE(review): assumes `result` is non-empty here; raises IndexError
        # otherwise — confirm callers guarantee `path` has history in range.
        result.append(Commit(before, result[-1].rev, result[-1].subject))

    # Dedup: drop consecutive duplicate entries produced by overlapping
    # recursion results (equal Commits sort next to each other in practice).
    result2: list[Commit] = []
    for x in result:
        if result2 and result2[-1] == x:
            continue
        result2.append(x)

    return result2
| |
| |
def get_branches(
    git_repo: str,
    all_branches: bool = True,
    commit: str | None = None,
    remote: bool = False,
) -> list[str]:
    """Get branches of a repository.

    Args:
        git_repo: path of git repo
        all_branches: also list remote branches if set to True
        commit: if not None, only list branches containing this commit
        remote: restrict to remote tracking branches

    Returns:
        list of branch names
    """
    cmd = ['branch', '--format=%(refname)']
    if all_branches:
        cmd.append('-a')
    if commit:
        cmd.extend(['--contains', commit])
    if remote:
        cmd += ['--remote']

    output = git_output(cmd, cwd=git_repo)
    return [line.strip() for line in output.splitlines()]
| |
| |
def get_tags(git_repo):
    """Lists all tag names of the repository."""
    output = git_output(['tag', '--list'], cwd=git_repo, log_stdout=False)
    return [line.strip() for line in output.splitlines()]
| |
| |
def get_remotes(git_repo):
    """Gets lists of git remote."""
    output = git_output(['remote'], cwd=git_repo)
    return output.splitlines()
| |
| |
def get_remote_url(git_repo, remote_name):
    """Gets url of given remote."""
    cmd = ['remote', 'get-url', remote_name]
    return git_output(cmd, cwd=git_repo).strip()
| |
| |
def set_remote_url(git_repo, remote_name, url):
    """Sets url of given remote."""
    cmd = ['remote', 'set-url', remote_name, url]
    git(cmd, cwd=git_repo)
| |
| |
def list_commits_between_commits(git_repo, old, new):
    """Get all commits between (old, new].

    Args:
        git_repo: path of git repo.
        old: old commit hash (exclusive)
        new: new commit hash (inclusive)

    Returns:
        list of Commit, ordered from old to new
    """
    assert old and new
    if old == new:
        return []

    assert is_ancestor_commit(git_repo, old, new)
    # --first-parent is necessary for Android, see following link for more
    # discussion.
    # https://docs.google.com/document/d/1c8qiq14_ObRRjLT62sk9r5V5cyCGHX66dLYab4MVnks/edit#heading=h.n3i6mt2n6xuu
    cmd = [
        'rev-list',
        '--timestamp',
        '--reverse',
        '--first-parent',
        '%s..%s' % (old, new),
    ]
    rev_lines = git_output(cmd, cwd=git_repo).splitlines()
    commits = [Commit.from_rev_line(rev_line) for rev_line in rev_lines]

    # bisect-kit has a fundamental assumption that commit timestamps are
    # increasing because we sort and bisect the commits by timestamp across git
    # repos. If not increasing, we have to adjust the timestamp as workaround.
    # This might lead to bad bisect result, however the bad probability is low in
    # practice since most machines' clocks are good enough.
    return _adjust_timestamp_increasingly(commits, show_warning=True)