blob: b90a227e27a28a781de2ed5917e5b8b56c1ceef2 [file] [log] [blame]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import os
import re
import sys
from builtins import zip
from gevent import subprocess
def _pattern2re(pattern):
"""Transforms a GA pattern to a regular expression."""
i = 0
escaped = False
regex = ''
# Keep track of the index where the character class started, None if we're not
# currently parsing a character class.
charclass_start = None
while i < len(pattern):
to_skip = 1
to_add = pattern[i]
if escaped:
escaped = False
elif pattern[i] == '\\':
escaped = True
elif pattern[i] == '[':
charclass_start = i
elif pattern[i] == ']':
# When ']' is the first character after a character class starts, it
# doesn't end it, it just means ']' is part of the character class.
if charclass_start is None or charclass_start < i - 1:
charclass_start = None
elif pattern[i] == '?' and charclass_start is None:
# '?' shouldn't be replaced inside character classes.
to_add = '[^/]'
elif pattern[i] == '*' and charclass_start is None:
# '*' shouldn't be replaced inside character classes.
if pattern[i:i+3] == '**/':
to_add = '((.+/)?)'
to_skip = 3
elif pattern[i:i+2] == '**':
to_add = '.+'
to_skip = 2
else:
to_add = '[^/]*'
elif charclass_start is None:
to_add = re.escape(pattern[i])
regex += to_add
i += to_skip
if regex.startswith('/'):
regex = '^' + regex
else:
regex = '/' + regex
return regex + '$'
def _parse_gitattr_line(line):
"""Parses a line in a GA files.
Args:
line (str) - A line in a GA file.
Returns:
If the line is empty, a comment, or doesn't modify the 'recipes' attribute,
this function returns None.
Otherwise, it returns a pair with |pattern| and |has_recipes|, where
|pattern| is a regex encoding the pattern, and |has_recipes| is True if the
'recipes' attribute was set and False if it was unset (-) or unspecified (!)
"""
line = line.strip()
if not line or line.startswith('#'):
return None
if line.startswith((r'\#', r'\!')):
line = line[1:]
if not line.startswith('"'):
line = line.split()
pattern = line[0]
attributes = line[1:]
else:
is_escaped = False
pattern = ''
for i, c in enumerate(line[1:], 1):
if is_escaped:
pattern += c
is_escaped = False
elif c == '\\':
is_escaped = True
elif c == '"':
attributes = line[i+1:].strip().split()
break
else:
pattern += c
has_recipes = None
for attribute in reversed(attributes):
action = True
if attribute.startswith(('-', '!')):
action = False
attribute = attribute[1:]
if attribute == 'recipes':
has_recipes = action
break
if has_recipes is None:
return None
return _pattern2re(pattern), has_recipes
class AttrChecker:
def __init__(self, repo, shortcircuit=True):
self._repo = repo
# Shortcircuit means we only care about whether any of the files we check
# has the 'recipes' attribute set (which is useful when checking if the
# revision is interesting), and not about the results for each individual
# file (which is useful for testing).
self._shortcircuit = shortcircuit
# A map from the git blob hash of a .gitattributes file to a list of the
# rules specified in that file that affect the 'recipes' attribute.
# Each rule is a pair of (pattern, action) where |pattern| is a compiled
# regex that matches the affected files, and action is True if the 'recipes'
# attributes is to be set or False otherwise.
# Rules are stored in the order they appear in the .gitattributes file.
self._gitattr_files_cache = {}
# Stores the gitattributes files for the current revision.
self._gitattr_files = None
def _git(self, cmd, stdin=None):
"""Executes a git command and returns the standard output."""
p = subprocess.Popen(
['git'] + cmd,
cwd=self._repo,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True)
stdout, _ = p.communicate(stdin if stdin else None)
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, ['git'] + cmd, None)
return stdout.strip().splitlines()
def _get_directories(self, files):
"""Lists all the directories touched by any of the |files|."""
dirs = set([''])
for f in files:
f = os.path.dirname(f)
while f and f not in dirs:
dirs.add(f)
f = os.path.dirname(f)
return dirs
def _ensure_gitattributes_files_loaded(self, revision, files):
"""Loads and parses all the .gitattributes files in the given revision."""
self._gitattr_files = []
# We list all the directories that were touched by any of the files, and
# search for .gitattributes files in them.
touched_dirs = self._get_directories(files)
possible_gitattr_paths = '\n'.join(
'%s:%s' % (revision, os.path.join(d, '.gitattributes'))
for d in touched_dirs)
# We ask git to list the hashes for all the .gitattributes files we listed
# above. If the file doesn't exist, git returns '<object> missing', where
# object is the revision and .gitattribute file we asked for.
possible_gitattr_blobs = self._git(
['cat-file', '--batch-check=%(objectname)'],
possible_gitattr_paths)
for line, d in zip(possible_gitattr_blobs, touched_dirs):
if line.endswith(' missing'):
continue
if d != '':
d += '/'
self._gitattr_files.append(('/' + d, self._parse_gitattr_file(line)))
# Store the paths in desc. order of length.
self._gitattr_files.sort()
self._gitattr_files.reverse()
def _parse_gitattr_file(self, blob_hash):
"""Returns a list of patterns and actions parsed from the GA file.
Parses the .gitattributes file pointed at by |blob_hash|, and returns the
patterns that set, unset or unspecify the 'recipes' attribute.
Args:
blob_hash (sha1) - A hash that points to a .gitattributes file in the git
repository.
Returns:
A list of |(pattern, action)| where |pattern| is a compiled regular
expression encoding a pattern in the GA file, and |action| is True if
'recipes' was set, and False if it was unset (-) or unspecified (!).
"""
if blob_hash in self._gitattr_files_cache:
return self._gitattr_files_cache[blob_hash]
rules = []
for line in self._git(['cat-file', 'blob', blob_hash]):
parsed_line = _parse_gitattr_line(line)
if parsed_line is None:
continue
pattern, attr_value = parsed_line
if rules and rules[-1][1] == attr_value:
rules[-1][0] = '((%s)|(%s))' % (rules[-1][0], pattern)
else:
if rules:
rules[-1][0] = re.compile(rules[-1][0])
rules.append([pattern, attr_value])
if rules:
rules[-1][0] = re.compile(rules[-1][0])
self._gitattr_files_cache[blob_hash] = rules
return rules
def _check_file(self, f):
"""Check whether |f| has the 'recipes' attribute set.
Returns True if the file |f| has the 'recipes' attribute set, and False
otherwise.
"""
# If the file path starts with the GA path, then the path is a parent of
# the file. Note that since the GA paths are sorted desc. according to
# length, the first we find will be the most specific one.
for path, rules in self._gitattr_files:
if not f.startswith(path):
continue
# Iterate over the rules in reverse, so the last rule comes first and we
# can return early.
result = None
for pattern, action in reversed(rules):
if pattern.search(f):
result = action
break
# If the result is not None, then the GA told us how to handle the file
# and we can stop looking.
if result is not None:
return result
# No GA specified a rule for the file, so the attribute is unspecified and
# not set.
return False
def check_files(self, revision, files):
"""Checks the 'recipes' attribute for the |files| at the given |revision|.
If |shortcircuit| was specified when creating this object, returns True if
any of the |files| has the 'recipes' attribute set.
Otherwise, returns a list with an entry for each file |f| specifying
whether it has the 'recipes' attribute set or not.
"""
# Make sure the gitattribute files are loaded at the right revision.
self._ensure_gitattributes_files_loaded(revision, files)
results = (self._check_file('/' + f) for f in files)
if self._shortcircuit:
return any(results)
return list(results)