#!/usr/bin/env {{ python }}
# NOTE(review): this file is a template; every ``{{ ... }}`` placeholder must be
# substituted before the script can run (presumably when the hook is installed -- confirm).
# Version string of this hook script.
VERSION = '2.0'
| |
| import io |
| import os |
| import re |
| import subprocess |
| import sys |
| |
# Read/write text handle on /dev/tty, used for colored output and for prompting
# the user. It stays None when standard output is not a terminal, when /dev/tty
# does not exist, or when /dev/tty cannot be opened.
TTY = None
if sys.stdout.isatty() and os.path.exists('/dev/tty'):
    try:
        TTY = io.TextIOWrapper(io.FileIO(os.open('/dev/tty', os.O_NOCTTY | os.O_RDWR), 'r+'))
    except OSError:
        # The device node can exist and still be unopenable (for instance when the
        # process has no controlling terminal). Degrade to the non-interactive
        # code paths instead of aborting the hook with a traceback.
        TTY = None
| |
# Root directory of the repository, as reported by git for the current working directory.
# Raises subprocess.CalledProcessError at import time if git exits non-zero.
TOPLEVEL = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], encoding='utf-8').strip()

# LOCATION is the ``location`` template placeholder joined onto TOPLEVEL.
LOCATION = os.path.join(TOPLEVEL, r'{{ location }}')
# The directory two levels above LOCATION is appended to the module search path
# (presumably so the webkit* packages imported further down can be found -- confirm).
SCRIPTS = os.path.dirname(os.path.dirname(LOCATION))
sys.path.append(SCRIPTS)

# Pattern applied to remote URLs. The substituted expression must define the
# named groups ``host`` and ``path``, which the functions below read from every match.
REMOTE_RE = re.compile(r'{{ remote_re }}')
NULL_HASH = '0' * 40  # All-zero hash; main() compares the hashes read from stdin against it
TRUNCATED_COMMIT_LEN = 12  # commits_in() truncates every hash it yields to this length

# Categorization classes returned by security_level_for_commit()
CLASS_1 = 1  # Commit exists on another remote
CLASS_2 = 2  # Commit is a cherry-pick of one that exists on another remote
CLASS_3 = 3  # Commit references a bug which requires redaction
| |
# A single commit reference: an optional 'r'/'R' prefix, hexadecimal digits, an
# optional '.<digits>' suffix and an optional '@<name>' suffix.
# NOTE(review): the 'A-z' ranges also span the ASCII punctuation between 'Z' and 'a'.
COMMIT_REF_BASE = r'r?R?[a-f0-9A-F]+(\.\d+)?@?([0-9a-zA-z\-\/\.]+[0-9a-zA-z\-\/])?'

# A primary reference, optionally followed by a secondary one in parentheses.
COMPOUND_COMMIT_REF = r'(?P<primary>{})(?P<secondary> \({}\))?'.format(*([COMMIT_REF_BASE] * 2))

# Commit title prefixes which mark a commit as a cherry-pick of another commit.
CHERRY_PICK_RE = [
    re.compile(template.format(COMPOUND_COMMIT_REF))
    for template in (
        r'\S* ?[Cc]herry[- ][Pp]ick of {}',
        r'\S* ?[Cc]herry[- ][Pp]ick {}',
        r'\S* ?[Cc]herry[- ][Pp]icked {}',
    )
]

# Extracts the bare reference from the text captured by the 'secondary' group.
UNPACK_SECONDARY_RE = re.compile(r' \(({})\)'.format(COMMIT_REF_BASE))
| |
# Branch names containing an 'eng/', 'dev/' or 'bug/' component followed by at
# least one more character. This must be an alternation: written as the character
# class '[(eng)(dev)(bug)]' it matched any name with one of the characters
# '(', ')', 'e', 'n', 'g', 'd', 'v', 'b' or 'u' directly before a '/'.
DEV_BRANCHES = re.compile(r'.*(?:eng|dev|bug)/.+')
# Branch names of the form '<name>-<digits and dots>-branch'. A '+' inside a
# character class is a literal plus sign, so it is deliberately not listed.
PROD_BRANCHES = re.compile(r'\S+-[\d.]+-branch')
# Branch names which branch_for() keeps once selected, alongside PROD_BRANCHES matches.
DEFAULT_BRANCHES = ('master', 'main')
| |
# Thresholds which VERBOSITY is compared against to decide how much to print
QUIET = -1
VERBOSE = 1
# Taken from the VERBOSITY environment variable, defaulting to 0.
# A value which is not an integer raises ValueError when the script is loaded.
VERBOSITY = int(os.environ.get('VERBOSITY', '0'))

# Filled in from the ``security_levels`` template placeholder. It is looked up with
# lower-cased 'host:path' keys, and level 0 is treated as public throughout this file.
SECURITY_LEVELS = {{ security_levels }}
# Highest configured level (0 when nothing is configured); main() assumes this
# level for a source whose own level is unknown.
MAX_LEVEL = max(SECURITY_LEVELS.values(), default=0)
| |
| |
def print_error(why):
    """Report ``why`` as an error message.

    When standard output is a terminal the message is written in red to TTY
    (or to standard error if TTY is unset) and flushed. Otherwise it is written
    to standard error without any coloring.
    """
    if not sys.stdout.isatty():
        sys.stderr.write('{}\n'.format(why))
        return

    stream = TTY or sys.stderr
    stream.write('\033[31m{}\033[0m\n'.format(why))
    stream.flush()
| |
| |
def print_tty(message):
    """Write ``message`` plus a newline to TTY, or to standard output if TTY is unset, and flush."""
    stream = TTY if TTY else sys.stdout
    stream.write('{}\n'.format(message))
    stream.flush()
| |
| |
# Push modes. They differ in which violation classes are rejected outright and
# which ones the user may explicitly confirm.
DEFAULT_MODE = 'default'  # Forbid class 1, prompt for class 2 and 3
PUBLISH_MODE = 'publish'  # Prompt for class 1, forbid class 2 and 3
NO_RADAR_MODE = 'no-radar'  # Forbid class 1 and 2, don't compute class 3
# The PUSH_HOOK_MODE environment variable takes precedence over the templated default.
MODE = os.environ.get('PUSH_HOOK_MODE', {{ default_pre_push_mode }})


# An unrecognized mode aborts the hook with exit status 1.
if MODE not in (DEFAULT_MODE, PUBLISH_MODE, NO_RADAR_MODE):
    print_error("'{}' is not a recognized push mode".format(MODE))
    sys.exit(1)
| |
| |
# Repository object used to inspect the issues referenced by commits (class 3 checks).
# It stays None in 'no-radar' mode, when the supporting libraries cannot be
# imported, or when the installed webkitbugspy predates version 0.9.8.
REPOSITORY = None
try:
    import webkitpy
    from webkitcorepy import Version
    from webkitscmpy import local, Commit
    from webkitbugspy import version, Tracker

    if MODE != NO_RADAR_MODE and version >= Version(0, 9, 8):
        REPOSITORY = local.Git(os.getcwd())

except Exception:
    # Best effort: any failure to set up the libraries only disables the class 3
    # checks. 'Exception' (rather than a bare 'except') lets KeyboardInterrupt and
    # SystemExit propagate.
    REPOSITORY = None

# In 'no-radar' mode the bug check is skipped on purpose, so the missing
# repository is only worth reporting in the other modes.
if not REPOSITORY and MODE != NO_RADAR_MODE:
    print_error('Checkout too out-of-date to check bug status of novel commits')
| |
| |
def security_level_for_remote(remote):
    """Return the configured security level for the remote URL ``remote``.

    Returns None when the URL does not match REMOTE_RE or when its lower-cased
    'host:path' key is absent from SECURITY_LEVELS.
    """
    parsed = REMOTE_RE.match(remote)
    if parsed is None:
        return None
    key = '{}:{}'.format(parsed.group('host'), parsed.group('path')).lower()
    return SECURITY_LEVELS.get(key)
| |
| |
def security_level_for_remotes(remotes, name_to_remote=None):
    """Return the least restrictive security level among ``remotes``.

    Each entry of ``remotes`` is translated to a URL through ``name_to_remote``
    (entries without a mapping are used as given). A falsy ``name_to_remote``
    is replaced by the mapping from remote_name_mappings().

    Returns 0 as soon as any remote is level 0, otherwise None if any remote
    has no level, otherwise the minimum level.
    """
    if not name_to_remote:
        name_to_remote = remote_name_mappings()[1]

    levels = []
    for remote_name in remotes:
        url = name_to_remote.get(remote_name, remote_name)
        levels.append(security_level_for_remote(url))

    # Level 0 takes precedence over an unknown level
    if 0 in levels:
        return 0
    if None in levels:
        return None
    return min(levels)
| |
| |
def commits_in(from_ref, to_ref):
    """Lazily yield the hashes of the commits listed by ``git rev-list``.

    Args:
        from_ref: exclusive lower bound of the range. When falsy, everything
            reachable from ``to_ref`` is listed.
        to_ref: upper bound of the range.

    Yields:
        Commit hashes truncated to TRUNCATED_COMMIT_LEN characters, in the
        order git prints them.

    The git process is terminated and reaped when the generator is exhausted,
    closed early, or garbage collected.
    """
    command = ['git', 'rev-list']
    command.append('{}..{}'.format(from_ref, to_ref) if from_ref else to_ref)

    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        # Nothing reads git's diagnostics; an undrained pipe could block git once it fills up
        stderr=subprocess.DEVNULL,
        encoding='utf-8',
    )
    try:
        for line in proc.stdout:
            yield line.rstrip()[:TRUNCATED_COMMIT_LEN]
    finally:
        # The consumer may stop early, so make sure git does not outlive us
        if proc.poll() is None:
            proc.kill()
        proc.stdout.close()
        proc.wait()
| |
| |
def remote_name_mappings():
    """Build lookup tables for the remotes listed by ``git remote -v``.

    Returns:
        A ``(path_to_name, name_to_url)`` tuple: the first maps 'host:path'
        (as captured by REMOTE_RE) to the remote's name, the second maps the
        remote's name to its URL. Remotes whose URL is 'INVALID' or does not
        match REMOTE_RE are left out of both.
    """
    path_to_name, name_to_url = {}, {}
    listing = subprocess.check_output(['git', 'remote', '-v'], encoding='utf-8')
    for entry in listing.splitlines():
        fields = entry.split()
        if len(fields) < 2:
            continue
        remote_name, url = fields[0], fields[1]
        if url == 'INVALID':
            continue
        parsed = REMOTE_RE.match(url)
        if not parsed:
            continue
        name_to_url[remote_name] = url
        path_to_name['{}:{}'.format(parsed.group('host'), parsed.group('path'))] = remote_name
    return path_to_name, name_to_url
| |
| |
def remotes_for(commit):
    """Return the names of the remotes with a branch containing ``commit``.

    Returns None for a falsy ``commit`` or when git exits with an error;
    otherwise a list of unique names in no particular order.
    """
    if not commit:
        return None
    try:
        listing = subprocess.check_output(
            ['git', 'branch', '-r', '--contains', commit],
            stderr=subprocess.STDOUT,
            encoding='utf-8',
        )
    except subprocess.CalledProcessError:
        return None
    # Each line is '<remote>/<branch>'; only the part before the first '/' is kept
    return list({entry.lstrip().split('/')[0] for entry in listing.splitlines()})
| |
| |
def branch_for(commit):
    """Pick one branch name to represent ``commit`` among the branches containing it.

    The first listed branch is the initial choice. A choice which is a default
    or production branch is never replaced. Any other choice is replaced by the
    next listed branch when that branch is a production branch, or when the
    current choice is a development branch.

    Returns None for a falsy ``commit``, when git exits with an error, or when
    no branch is listed.
    """
    if not commit:
        return None
    try:
        listing = subprocess.check_output(
            ['git', 'branch', '-a', '--contains', commit],
            stderr=subprocess.STDOUT,
            encoding='utf-8',
        )
    except subprocess.CalledProcessError:
        return None

    chosen = None
    for entry in listing.splitlines():
        # Drop the marker git puts in front of the checked-out branch
        if entry[0] == '*':
            entry = entry[1:]
        entry = entry.strip()
        # 'remotes/<remote>/<branch>' is reduced to '<branch>'
        if entry.startswith('remotes'):
            entry = entry.split('/', 2)[-1]

        if not chosen:
            chosen = entry
        elif chosen not in DEFAULT_BRANCHES and not PROD_BRANCHES.match(chosen):
            if PROD_BRANCHES.match(entry) or DEV_BRANCHES.match(chosen):
                chosen = entry

    return chosen
| |
| |
def message_for(commit):
    """Return the full commit message of ``commit`` as text, or None for a falsy ``commit``.

    Raises subprocess.CalledProcessError when git exits with an error.
    """
    if commit:
        log_command = ['git', 'log', '--format=%B', '-n', '1', commit]
        return subprocess.check_output(log_command, encoding='utf-8')
    return None
| |
| |
def security_level_for_commit(commit, remotes, name_to_remote=None, allow_class_1=False):
    """Work out the security level of ``commit`` and the reason for that level.

    Args:
        commit: hash of the commit to inspect.
        remotes: names of the remotes which already contain ``commit`` (may be empty).
        name_to_remote: mapping of remote name to remote URL. A falsy value is
            replaced by the mapping from remote_name_mappings().
        allow_class_1: when False, a commit present on any remote is reported as
            class 1 straight away, without running the class 2 and class 3 checks.

    Returns:
        A ``(security_level, categorization_class)`` tuple:
        * ``(level, CLASS_1)`` when the commit already exists on a remote.
        * ``(level, CLASS_2)`` when the commit title marks it as a cherry-pick of
          a commit which exists on a remote.
        * ``(0, CLASS_3)`` when a referenced issue carries a redaction exemption,
          ``(1, CLASS_3)`` when a referenced issue is redacted. Either of these
          takes precedence over a non-public class 1 or class 2 result.
        * ``(None, None)`` when none of the checks apply.
        ``level`` may be None when a remote involved has no configured level.
    """
    if not name_to_remote:
        _, name_to_remote = remote_name_mappings()

    # Class 1: the commit we're pushing exists on a remote already
    class_1_level = None
    if remotes:
        class_1_level = security_level_for_remotes(remotes, name_to_remote)
        # The commit is public already, we should perform no further checks
        if class_1_level == 0:
            return class_1_level, CLASS_1
        # The commit exists on another remote, and we are not allowing class 1 violations
        if not allow_class_1:
            return class_1_level, CLASS_1

    # A commit may have an empty message; treat that as an empty title instead of failing
    message = message_for(commit) or ''
    message_lines = message.splitlines()
    title = message_lines[0] if message_lines else ''

    # Class 2: The commit we're pushing is a cherry-pick of something that exists on a remote already
    class_2_level = None
    for pattern in CHERRY_PICK_RE:
        match = pattern.match(title)
        if not match:
            continue
        primary = match.group('primary')
        secondary = match.group('secondary')
        if secondary:
            secondary = UNPACK_SECONDARY_RE.match(secondary).groups()[0]
        for candidate in filter(bool, (primary, secondary)):
            original_remotes = remotes_for(candidate)
            if not original_remotes:
                continue
            class_2_level = security_level_for_remotes(original_remotes, name_to_remote)

            # If the commit is a cherry-pick of something already public, the state of the bug
            # it's referencing doesn't matter. This is a cherry-pick finding, hence class 2.
            if class_2_level == 0:
                return class_2_level, CLASS_2
            # Check for a class 3 violation before returning our class 2 violation
            break
        # Only the first pattern matching the title is considered
        break

    # Class 3: We must inspect the commit for bug references
    if REPOSITORY:
        did_redact = False
        obj = Commit(hash=commit, message=message)
        for issue in obj.issues:
            redaction = issue.redacted
            if redaction:
                did_redact = True
            # An exemption on any referenced issue overrides every other redaction
            if getattr(redaction, 'exemption', False):
                return 0, CLASS_3
        if did_redact:
            return 1, CLASS_3

    if class_1_level is not None:
        return class_1_level, CLASS_1
    if class_2_level is not None:
        return class_2_level, CLASS_2

    return None, None
| |
| |
def main(name, remote):
    """Decide whether the commits about to be pushed are safe for the target remote.

    The refs being pushed are read from standard input, one per line, as four
    whitespace-separated fields of which the second is the local hash and the
    fourth is the remote hash.

    Args:
        name: name of the remote being pushed to. When it equals ``remote`` it is
            replaced by the name of a configured remote with the same 'host:path',
            if there is one.
        remote: URL of the remote being pushed to.

    Returns:
        0 when the push may proceed, 1 when it is rejected, 255 when ``name``
        equals ``remote`` and the URL does not match REMOTE_RE.

    Raises:
        ValueError: when a non-blank line of standard input does not have
            exactly four fields. Failing here rejects the push.
    """
    target_security_level = security_level_for_remote(remote)
    if VERBOSITY > QUIET:
        if target_security_level:
            print_tty('Pushing to {}, which is classified as a secure remote'.format(name or remote))
        elif target_security_level == 0:
            print_tty('Pushing to {}, which is classified as a public remote'.format(name or remote))
        else:
            print_tty('Pushing to {}, which has no classification. Treating it as a public remote'.format(name or remote))

    to_push = []
    for push in sys.stdin.readlines():
        # Tolerate blank lines; anything else malformed still fails loudly below
        if not push.strip():
            continue
        _, local_hash, _, remote_hash = push.split()
        # Deletes are always safe
        if local_hash == NULL_HASH:
            continue
        # A null remote hash means there is no lower bound for the range
        if remote_hash == NULL_HASH:
            remote_hash = None
        to_push.append((remote_hash, local_hash))
    if not to_push:
        if VERBOSITY > QUIET:
            print_tty('No novel content being pushed')
        return 0

    remote_to_name, name_to_remote = remote_name_mappings()
    if name == remote:
        match = REMOTE_RE.match(remote)
        if not match:
            print_error("'{}' does not conform to a recognized target remote".format(remote))
            return 255
        name = remote_to_name.get('{}:{}'.format(match.group('host'), match.group('path')), remote)
    if name not in name_to_remote:
        # Reported, but deliberately not fatal
        print_error("Failed to map '{}' to an existing remote".format(remote))

    # Map every commit which needs inspection to the remotes already containing it
    remotes_with = {}
    for from_ref, to_ref in to_push:
        for commit in commits_in(from_ref, to_ref):
            if commit not in remotes_with:
                remotes_with[commit] = remotes_for(commit) or []

            # Any future commits (being ancestors of this one) will necessarily be on the same remote this commit is
            containing = remotes_with[commit]
            if not containing:
                continue
            if name in containing or MODE != PUBLISH_MODE:
                break
            commit_level = security_level_for_remotes(containing, name_to_remote)
            if commit_level == 0:
                break
            if (commit_level or MAX_LEVEL) <= (target_security_level or 0):
                break
    # Commits which the target remote already has need no verification
    remotes_with = {commit: remotes for commit, remotes in remotes_with.items() if name not in remotes}

    print_tty("Verifying commits in '{}' mode".format(MODE))
    if VERBOSITY > QUIET:
        print_tty('Attempting to push {} batch{} of commits to {}...'.format(len(to_push), '' if len(to_push) == 1 else 'es', name))

    target_level = target_security_level or 0
    prompt_for = []
    for commit, remotes in remotes_with.items():
        security_level, categorization_class = security_level_for_commit(
            commit, remotes,
            name_to_remote=name_to_remote,
            allow_class_1=MODE == PUBLISH_MODE,
        )
        # An unknown source level is assumed to be the most secure one
        exceeds_target = (MAX_LEVEL if security_level is None else security_level) > target_level
        unknown_to_public = security_level is None and not target_security_level

        if categorization_class == CLASS_1:
            # Unknown security level of source
            if unknown_to_public:
                print_error("'{}' comes from an uncategorized remote, it's not safe to push it publically".format(commit))
                return 1
            # Security level of source exceeds security level of target
            if exceeds_target:
                print_error("'{}' comes from a more secure remote than '{}'".format(commit, name))
                if MODE == PUBLISH_MODE:
                    prompt_for.append(commit)
                else:
                    return 1

        elif categorization_class == CLASS_2:
            # Unknown security level from original of cherry-pick
            if unknown_to_public:
                print_error("'{}' cherry-picks a change from an uncategorized remote, it's not safe to push it publically".format(commit))
                return 1
            # Security level of original of a cherry-pick exceeds security level of target
            if exceeds_target:
                print_error("'{}' cherry-picks a change from a more secure remote than '{}'".format(commit, name))
                if MODE in (DEFAULT_MODE, PUBLISH_MODE):
                    prompt_for.append(commit)
                else:
                    return 1

        # Commit references a redacted issue, but we're pushing it publicly
        elif categorization_class == CLASS_3 and exceeds_target:
            print_error("'{}' fixes a redacted issue, but '{}' is a public remote".format(commit, name))
            return 1

        if VERBOSITY >= VERBOSE:
            print_tty('    {} is security level {} by class {}'.format(commit, security_level, categorization_class))

    if prompt_for and not TTY:
        print_tty('No TTY available for prompting user, rejecting push')
        return 1

    if prompt_for:
        # branch_for() may return None; fall back to the commit hash so the
        # names can still be sorted and joined
        branches = sorted({branch_for(commit) or commit for commit in prompt_for})
        expected_response = 'publicize {}'.format(
            branches[0] if len(branches) == 1
            else '{}, and {}'.format(', '.join(branches[:-1]), branches[-1]),
        )
        print_tty('')
        print_tty('We have detected you are about to publish security content to a public remote')
        print_tty('Unless you are engaged in merge-back or fixing a security bug which has not shipped, you are likely making a mistake')
        print_tty('')
        sys.stdout.write('Type "{}" to continue with publication: '.format(expected_response))
        sys.stdout.flush()
        response = TTY.readline().rstrip()
        if response != expected_response:
            if response:
                print_error('Response does not match prompt')
            print_error('User failed to explicitly approve potentially dangerous push')
            return 1
        print_tty("User has explicitly approved potentially dangerous push, continuing...")

    elif VERBOSITY > QUIET:
        print_tty('Verified {} novel commit{} to {} are safe to push'.format(len(remotes_with), '' if len(remotes_with) == 1 else 's', name))

    return 0
| |
if __name__ == '__main__':
    # The command-line arguments are handed to main() as-is; interrupting the
    # hook rejects the push.
    try:
        status = main(*sys.argv[1:])
    except KeyboardInterrupt:
        print_tty('')
        print_error('User has canceled push')
        status = 1
    sys.exit(status)