blob: 0be568e3f1b3235ff221100cdce27a23dbba606c [file] [log] [blame] [edit]
#!/usr/bin/env {{ python }}
VERSION = '2.0'
import io
import os
import re
import subprocess
import sys
if sys.stdout.isatty() and os.path.exists('/dev/tty'):
TTY = io.TextIOWrapper(io.FileIO(os.open('/dev/tty', os.O_NOCTTY | os.O_RDWR), 'r+'))
else:
TTY = None
TOPLEVEL = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], encoding='utf-8').strip()
LOCATION = os.path.join(TOPLEVEL, r'{{ location }}')
SCRIPTS = os.path.dirname(os.path.dirname(LOCATION))
sys.path.append(SCRIPTS)
REMOTE_RE = re.compile(r'{{ remote_re }}')
NULL_HASH = '0' * 40
TRUNCATED_COMMIT_LEN = 12
CLASS_1 = 1 # Commit exists on another remote
CLASS_2 = 2 # Commit is a cherry-pick of one that exists on another remote
CLASS_3 = 3 # Commit references a bug which requires redaction
COMMIT_REF_BASE = r'r?R?[a-f0-9A-F]+(\.\d+)?@?([0-9a-zA-z\-\/\.]+[0-9a-zA-z\-\/])?'
COMPOUND_COMMIT_REF = r'(?P<primary>{})(?P<secondary> \({}\))?'.format(COMMIT_REF_BASE, COMMIT_REF_BASE)
CHERRY_PICK_RE = [
re.compile(r'\S* ?[Cc]herry[- ][Pp]ick of {}'.format(COMPOUND_COMMIT_REF)),
re.compile(r'\S* ?[Cc]herry[- ][Pp]ick {}'.format(COMPOUND_COMMIT_REF)),
re.compile(r'\S* ?[Cc]herry[- ][Pp]icked {}'.format(COMPOUND_COMMIT_REF)),
]
UNPACK_SECONDARY_RE = re.compile(r' \(({})\)'.format(COMMIT_REF_BASE))
DEV_BRANCHES = re.compile(r'.*[(eng)(dev)(bug)]/.+')
PROD_BRANCHES = re.compile(r'\S+-[\d+\.]+-branch')
DEFAULT_BRANCHES = ('master', 'main')
QUIET = -1
VERBOSE = 1
VERBOSITY = int(os.environ.get('VERBOSITY', '0'))
SECURITY_LEVELS = {{ security_levels }}
MAX_LEVEL = max(SECURITY_LEVELS.values(), default=0)
def print_error(why):
if sys.stdout.isatty():
RED_TEXT_FORMAT = '\033[31m{}\033[0m\n'
(TTY or sys.stderr).write(RED_TEXT_FORMAT.format(why))
(TTY or sys.stderr).flush()
else:
sys.stderr.write('{}\n'.format(why))
def print_tty(message):
(TTY or sys.stdout).write('{}\n'.format(message))
(TTY or sys.stdout).flush()
DEFAULT_MODE = 'default' # Forbid class 1, prompt for class 2 and 3
PUBLISH_MODE = 'publish' # Prompt for class 1, forbid class 2 and 3
NO_RADAR_MODE = 'no-radar' # Forbid class 1 and 2, don't compute class 3
MODE = os.environ.get('PUSH_HOOK_MODE', {{ default_pre_push_mode }})
if MODE not in (DEFAULT_MODE, PUBLISH_MODE, NO_RADAR_MODE):
print_error("'{}' is not a recognized push mode".format(MODE))
sys.exit(1)
REPOSITORY = None
try:
import webkitpy
from webkitcorepy import Version
from webkitscmpy import local, Commit
from webkitbugspy import version, Tracker
if MODE != NO_RADAR_MODE and version >= Version(0, 9, 8):
REPOSITORY = local.Git(os.getcwd())
except:
pass
if not REPOSITORY:
print_error('Checkout too out-of-date to check bug status of novel commits')
def security_level_for_remote(remote):
match = REMOTE_RE.match(remote)
if not match:
return None
return SECURITY_LEVELS.get('{}:{}'.format(match.group('host'), match.group('path')).lower(), None)
def security_level_for_remotes(remotes, name_to_remote=None):
if not name_to_remote:
_, name_to_remote = remote_name_mappings()
security_levels = [security_level_for_remote(name_to_remote.get(name, name)) for name in remotes]
if 0 in security_levels:
return 0
if None in security_levels:
return None
return min(security_levels)
def commits_in(from_ref, to_ref):
command = ['git', 'rev-list']
if from_ref:
command.append('{}..{}'.format(from_ref, to_ref))
else:
command.append(to_ref)
proc = None
try:
proc = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
)
line = proc.stdout.readline()
while line:
yield line.rstrip()[:TRUNCATED_COMMIT_LEN]
line = proc.stdout.readline()
finally:
if proc and proc.poll() is None:
proc.kill()
def remote_name_mappings():
a, b = {}, {}
for line in subprocess.check_output(
['git', 'remote', '-v'],
encoding='utf-8',
).splitlines():
remote = line.split()
if len(remote) < 2 or remote[1] == 'INVALID':
continue
match = REMOTE_RE.match(remote[1])
if match:
b[remote[0]] = remote[1]
a['{}:{}'.format(match.group('host'), match.group('path'))] = remote[0]
return a, b
def remotes_for(commit):
if not commit:
return None
try:
result = set()
for line in subprocess.check_output(
['git', 'branch', '-r', '--contains', commit],
stderr=subprocess.STDOUT,
encoding='utf-8',
).splitlines():
result.add(line.lstrip().split('/')[0])
return list(result)
except subprocess.CalledProcessError:
return None
def branch_for(commit):
if not commit:
return None
try:
candidate = None
for line in subprocess.check_output(
['git', 'branch', '-a', '--contains', commit],
stderr=subprocess.STDOUT,
encoding='utf-8',
).splitlines():
if line[0] == '*':
line = line[1:]
line = line.lstrip().rstrip()
is_remote = line.startswith('remotes')
if is_remote:
line = line.split('/', 2)[-1]
if not candidate:
candidate = line
continue
if candidate in DEFAULT_BRANCHES or PROD_BRANCHES.match(candidate):
continue
if PROD_BRANCHES.match(line) or DEV_BRANCHES.match(candidate):
candidate = line
return candidate
except subprocess.CalledProcessError:
return None
def message_for(commit):
if not commit:
return None
return subprocess.check_output(
['git', 'log', '--format=%B', '-n', '1', commit],
encoding='utf-8',
)
def security_level_for_commit(commit, remotes, name_to_remote=None, allow_class_1=False):
if not name_to_remote:
_, name_to_remote = remote_name_mappings()
# Class 1: the commit we're pushing exists on a remote already
class_1_level = None
if remotes:
class_1_level = security_level_for_remotes(remotes, name_to_remote)
# The commit is public already, we should preform no further checks
if class_1_level == 0:
return class_1_level, CLASS_1
# The commit exists on another remote, and we are not allowing class 1 violations
if not allow_class_1 and remotes:
return class_1_level, CLASS_1
message = message_for(commit)
title = message.splitlines()[0]
# Class 2: The commit we're pushing is a cherry-pick of something that exists on a remote already
class_2_level = None
for r in CHERRY_PICK_RE:
match = r.match(title)
if not match:
continue
primary = match.group('primary')
secondary = match.group('secondary')
if secondary:
secondary = UNPACK_SECONDARY_RE.match(secondary).groups()[0]
for candidate in filter(bool, (primary, secondary)):
original_remotes = remotes_for(candidate)
if not original_remotes:
continue
class_2_level = security_level_for_remotes(original_remotes, name_to_remote)
# If the commit is a cherry-pick of something already public, the state of the bug it's referencing doesn't matter
if class_2_level == 0:
return class_2_level, CLASS_3
# Check for a class 3 violation before returning our class 2 violation
break
break
# Class 3: We must inspect the commit for bug references
if REPOSITORY:
did_redact = False
obj = Commit(hash=commit, message=message)
for issue in obj.issues:
redaction = issue.redacted
if redaction:
did_redact = True
if getattr(redaction, 'exemption', False):
return 0, CLASS_3
if did_redact:
return 1, CLASS_3
if class_1_level is not None:
return class_1_level, CLASS_1
if class_2_level is not None:
return class_2_level, CLASS_2
return None, None
def main(name, remote):
target_security_level = security_level_for_remote(remote)
if VERBOSITY > QUIET:
if target_security_level:
print_tty('Pushing to {}, which is classified as a secure remote'.format(name or remote))
elif target_security_level == 0:
print_tty('Pushing to {}, which is classified as a public remote'.format(name or remote))
else:
print_tty('Pushing to {}, which has no classification. Treating it as a public remote'.format(name or remote))
to_push = []
for push in sys.stdin.readlines():
_, local_ref, _, remote_ref = push.split()
# Deletes are always safe
if local_ref == NULL_HASH:
continue
if remote_ref == NULL_HASH:
remote_ref = None
to_push.append((remote_ref, local_ref))
if not to_push:
if VERBOSITY > QUIET:
print_tty('No novel content being pushed')
return 0
remote_to_name, name_to_remote = remote_name_mappings()
if name == remote:
match = REMOTE_RE.match(remote)
if not match:
print_error("'{}' does not conform to a recognized target remote".format(remote))
return 255
name = remote_to_name.get('{}:{}'.format(match.group('host'), match.group('path')), remote)
if name not in name_to_remote:
print_error("Failed to map '{}' to an existing remote".format(remote))
remotes_with = {}
for from_ref, to_ref in to_push:
for commit in commits_in(from_ref, to_ref):
if commit not in remotes_with:
remotes_with[commit] = remotes_for(commit) or []
# Any future commits (being ancestors of this one) will nessesarily be on the same remote this commit is
rs = remotes_with.get(commit, [])
if not rs:
continue
if name in rs or MODE != PUBLISH_MODE and rs:
break
commit_level = security_level_for_remotes(rs, name_to_remote)
if commit_level == 0:
break
if (commit_level or MAX_LEVEL) <= (target_security_level or 0):
break
remotes_with = dict(filter(lambda pair: name not in pair[1], remotes_with.items()))
print_tty("Verifying commits in '{}' mode".format(MODE))
if VERBOSITY > QUIET:
print_tty('Attempting to push {} batch{} of commits to {}...'.format(len(to_push), '' if len(to_push) == 1 else 'es', name))
prompt_for = []
for commit, remotes in remotes_with.items():
security_level, categorization_class = security_level_for_commit(commit, remotes, name_to_remote=name_to_remote, allow_class_1=MODE == PUBLISH_MODE)
# Unkown security level of source
if categorization_class == 1 and security_level is None and not target_security_level:
print_error("'{}' comes from an uncategorized remote, it's not safe to push it publically".format(commit))
return 1
# Security level of source exceeds security level of target
if categorization_class == 1 and (MAX_LEVEL if security_level is None else security_level) > (target_security_level or 0):
print_error("'{}' comes from a more secure remote than '{}'".format(commit, name))
if MODE == PUBLISH_MODE:
prompt_for.append(commit)
else:
return 1
# Unknown security level from original of cherry-pick
if categorization_class == 2 and security_level is None and not target_security_level:
print_error("'{}' cherry-picks a change from an uncategorized remote, it's not safe to push it publically".format(commit))
return 1
# Security level of original of a cherry-pick exceeds security level of target
if categorization_class == 2 and (MAX_LEVEL if security_level is None else security_level) > (target_security_level or 0):
print_error("'{}' cherry-picks a change from a more secure remote than '{}'".format(commit, name))
if MODE in (DEFAULT_MODE, PUBLISH_MODE):
prompt_for.append(commit)
else:
return 1
# Commit references a redacted issue, but we're pushing it publically
if categorization_class == 3 and (MAX_LEVEL if security_level is None else security_level) > (target_security_level or 0):
print_error("'{}' fixes a redacted issue, but '{}' is a public remote".format(commit, name))
return 1
if VERBOSITY >= VERBOSE:
print_tty(' {} is security level {} by class {}'.format(commit, security_level, categorization_class))
if prompt_for and not TTY:
print_tty('No TTY available for prompting user, rejecting push')
return 1
if prompt_for:
branches = sorted({branch_for(commit) for commit in prompt_for})
expected_response = 'publicize {}'.format(
branches[0] if len(branches) == 1
else '{}, and {}'.format(', '.join(branches[:-1]), branches[-1]),
)
print_tty('')
print_tty('We have detected you are about to publish security content to a public remote')
print_tty('Unless you are engaged in merge-back or fixing a security bug which has not shipped, you are likely making a mistake')
print_tty('')
sys.stdout.write('Type "{}" to continue with publication: '.format(expected_response))
sys.stdout.flush()
response = TTY.readline().rstrip()
if response != expected_response:
if response:
print_error('Response does not match prompt')
print_error('User failed to explicitly approve potentially dangerous push')
return 1
print_tty("User has explicitly approved potentially dangerous push, continuing...")
elif VERBOSITY > QUIET:
print_tty('Verified {} novel commit{} to {} are safe to push'.format(len(remotes_with), '' if len(remotes_with) == 1 else 's', name))
return 0
if __name__ == '__main__':
try:
sys.exit(main(*sys.argv[1:]))
except KeyboardInterrupt:
print_tty('')
print_error('User has canceled push')
sys.exit(1)