blob: c5fd4cfffc87316e8efcc85d93a8ae44577cdc77 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import collections
import logging
import pathlib
import re
from typing import (Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type,
TypeVar, Union)
from ordered_set import OrderedSet
class FrozenFlagsError(RuntimeError):
  """Raised when a frozen object is asked to change."""
FreezableT = TypeVar("FreezableT", bound="Freezable")


class Freezable:
  """Mixin that tracks whether an object has been frozen (made read-only).

  The class itself only stores the state. Code that mutates an instance is
  responsible for calling assert_not_frozen() beforehand.
  Hashing an instance freezes it, because the hash is derived from str(self).
  """

  def __init__(self, *args, **kwargs) -> None:
    # The state is set up before delegating, so that it already exists if
    # the next initializer in the MRO calls back into methods that check it.
    self._frozen = False
    super().__init__(*args, **kwargs)

  def __hash__(self):
    # Freeze first: the hash is computed from the string form.
    self.freeze()
    return hash(str(self))

  @property
  def is_frozen(self) -> bool:
    """True once freeze() (or hash()) has been called on this object."""
    return self._frozen

  def freeze(self: FreezableT) -> FreezableT:
    """Mark the object as frozen and return it to allow call chaining."""
    self._frozen = True
    return self

  def assert_not_frozen(self, msg: Optional[str] = None) -> None:
    """Raise FrozenFlagsError if the object is frozen.

    Args:
      msg: Optional custom error message; a generic message naming the
        concrete class is used when it is missing or empty.
    """
    if self._frozen:
      raise FrozenFlagsError(
          msg or f"Cannot modify frozen {type(self).__name__}")
BasicFlagsT = TypeVar("BasicFlagsT", bound="BasicFlags")


class BasicFlags(Freezable, collections.UserDict):
  """Basic implementation for command line flags (similar to Dict[str, str]).

  Maps flag names (including their leading dashes) to an optional string
  value; a value of None stands for a flag without value ("--foo").
  This class is mostly used to make sure command-line flags for browsers
  don't end up having contradicting values.
  """

  InitialDataType = Optional[Union[Dict[str, str], "Flags",
                                   Iterable[Union[Tuple[str, Optional[str]],
                                                  str]]]]

  _WHITE_SPACE_RE = re.compile(r"\s+")
  _BASIC_FLAG_NAME_RE = re.compile(r"(--?)[^\s=-][^\s=]*")
  # Handles space-separated flags: --foo="1" --bar --baz='2' --boo=3
  _VALUE_PATTERN = (r"('(?P<value_single_quotes>[^']*)')|"
                    r"(\"(?P<value_double_quotes>[^\"]*)\")|"
                    r"(?P<value_no_quotes>[^'\" ]+)")
  _END_OR_SEPARATOR_PATTERN = r"(\s*\s\s*|$)"
  _PARSE_RE = re.compile(fr"(?P<name>{_BASIC_FLAG_NAME_RE.pattern})"
                         fr"((?P<equal>=)({_VALUE_PATTERN})?)?"
                         fr"{_END_OR_SEPARATOR_PATTERN}")

  @classmethod
  def split(cls, flag_str: str) -> Tuple[str, Optional[str]]:
    """Split "--name=value" at the first "=" into (name, value).

    Returns (flag_str, None) if there is no "=" in the input.
    """
    if "=" in flag_str:
      flag_name, flag_value = flag_str.split("=", maxsplit=1)
      return (flag_name, flag_value)
    return (flag_str, None)

  @classmethod
  def parse(cls: Type[BasicFlagsT], data: Any) -> BasicFlagsT:
    """Convert data to an instance of this class.

    Instances of cls are returned as-is (no copy), strings are parsed with
    loads(), everything else is passed to the constructor.
    """
    if isinstance(data, cls):
      return data
    if isinstance(data, str):
      return cls.loads(data)
    return cls(data)

  @classmethod
  def loads(cls: Type[BasicFlagsT], raw_flags: str) -> BasicFlagsT:
    """Parse a string of flags, see _loads() for details."""
    return cls._loads(raw_flags)

  @classmethod
  def _loads(cls: Type[BasicFlagsT],
             raw_flags: str,
             msg: str = "flag") -> BasicFlagsT:
    """Parse raw_flags with cls._PARSE_RE into a new instance.

    Args:
      raw_flags: The flags string; surrounding whitespace is ignored.
      msg: Noun used in error messages to describe what is being parsed.

    Raises:
      ValueError: If parts of the input cannot be parsed as flags or if a
        flag has an "=" without a (non-empty) value.
    """
    raw_flags = raw_flags.strip()
    if not raw_flags:
      return cls()
    flag_parts: List[Tuple[str, Optional[str]]] = []
    # End position of the previous match. Matches have to be contiguous and
    # start at position 0, otherwise some input would be silently skipped.
    current_end: Optional[int] = None
    for match in cls._PARSE_RE.finditer(raw_flags):
      if current_end is None:
        if match.start() != 0:
          part = raw_flags[:match.start()]
          raise ValueError(f"Invalid {msg} part at pos=0: {repr(part)}")
      else:
        if current_end != match.start():
          raise ValueError(f"Invalid {msg}: could not consume all data")
      current_end = match.end()
      groups = match.groupdict()
      maybe_flag_name: Optional[str] = groups.get("name")
      if not maybe_flag_name:
        raise ValueError(f"Invalid {msg}: {repr(raw_flags)}")
      # Re-assign since pytype doesn't remove the Optional.
      flag_name: str = maybe_flag_name
      flag_value: Optional[str] = (
          groups.get("value_single_quotes") or
          groups.get("value_double_quotes") or groups.get("value_no_quotes"))
      if groups.get("equal") and not flag_value:
        raise ValueError(f"Invalid {msg}: missing value for {repr(flag_name)}")
      assert flag_name
      flag_parts.append((flag_name, flag_value))
    # Anything after the last match (or the whole input if nothing matched)
    # could not be parsed.
    if current_end != len(raw_flags):
      part = raw_flags[current_end:]
      raise ValueError(
          f"Invalid {msg} part at pos={current_end or 0}: {repr(part)}")
    return cls(flag_parts)

  def __init__(self, initial_data: Flags.InitialDataType = None) -> None:
    # UserDict.__init__ feeds initial_data through self.update(), thus all
    # initial flags are validated by set() as well.
    super().__init__(initial_data)

  def __setitem__(self, flag_name: str, flag_value: Optional[str]) -> None:
    return self.set(flag_name, flag_value)

  def __delitem__(self, flag_name: str) -> None:
    # Removing a flag is a modification as well. Without this guard frozen
    # (and possibly already hashed) flags could still be changed via
    # del / pop() / popitem() / clear().
    self.assert_not_frozen()
    super().__delitem__(flag_name)

  def set(self,
          flag_name: str,
          flag_value: Optional[str] = None,
          override: bool = False) -> None:
    """Set a single flag.

    Args:
      flag_name: The flag name including the leading dash(es).
      flag_value: The string value, or None for a flag without value.
      override: Allow replacing a different, previously set value.

    Raises:
      FrozenFlagsError: If the flags are frozen.
      ValueError: For invalid names or conflicting values.
      TypeError: For non-string values.
    """
    self._set(flag_name, flag_value, override)

  def _set(self,
           flag_name: str,
           flag_value: Optional[str] = None,
           override: bool = False) -> None:
    # Single entry point for all modifications, overridden by subclasses.
    self.assert_not_frozen()
    self._validate_flag_name(flag_name)
    if flag_value:
      self._validate_flag_value(flag_name, flag_value)
    self._validate_override(flag_name, flag_value, override)
    self.data[flag_name] = flag_value

  def _validate_flag_name(self, flag_name: str) -> None:
    """Raise a ValueError if flag_name is not an acceptable flag name."""
    if not flag_name:
      raise ValueError("Cannot set empty flag")
    if self._WHITE_SPACE_RE.search(flag_name):
      raise ValueError(
          f"Flag name cannot contain whitespaces: {repr(flag_name)}")
    if "=" in flag_name:
      raise ValueError(
          f"Flag name contains '=': {repr(flag_name)}, please split")
    if flag_name[0] != "-":
      raise ValueError(
          f"Flag name must begin with a '-', but got {repr(flag_name)}")
    if not self._BASIC_FLAG_NAME_RE.fullmatch(flag_name):
      raise ValueError(
          f"Flag name contains invalid characters: {repr(flag_name)}")

  def _validate_flag_value(self, flag_name: str, flag_value: str) -> None:
    """Raise a TypeError if the (non-empty) flag_value is not a string."""
    assert flag_value, "Got invalid empty flag_value."
    if not isinstance(flag_value, str):
      raise TypeError(
          f"Expected None or string flag-value for flag {flag_name}, "
          f"but got: {repr(flag_value)}")

  def _validate_override(self, flag_name: str, flag_value: Optional[str],
                         override: bool) -> None:
    """Raise a ValueError if an existing flag would get a different value.

    Setting the same value again is always allowed.
    """
    if override or flag_name not in self:
      return
    old_value = self[flag_name]
    if flag_value != old_value:
      raise ValueError(f"Flag {flag_name}={repr(flag_value)} was already set "
                       f"with a different previous value: {repr(old_value)}")

  # pylint: disable=arguments-differ
  def update(self,
             initial_data: Flags.InitialDataType = None,
             override: bool = False) -> None:
    """Set all flags from initial_data.

    Args:
      initial_data: Either a mapping (dict or any BasicFlags) of flag names
        to values, or an iterable whose items are either plain flag names
        or (name, value) tuples. None is ignored.
      override: Passed on to set() for every flag.
    """
    # pylint: disable=arguments-differ
    if initial_data is None:
      return
    # Check against BasicFlags and not only the Flags subclass: iterating
    # over a plain BasicFlags instance yields just the flag names, which
    # would silently drop all values in the else-branch below.
    if isinstance(initial_data, (BasicFlags, dict)):
      for flag_name, flag_value in initial_data.items():
        self.set(flag_name, flag_value, override)
    else:
      for flag_name_or_items in initial_data:
        if isinstance(flag_name_or_items, str):
          self.set(flag_name_or_items, None, override)
        else:
          flag_name, flag_value = flag_name_or_items
          self.set(flag_name, flag_value, override)

  def merge(self, other: Flags.InitialDataType) -> None:
    """Add all flags from other, conflicting values raise a ValueError."""
    self.update(other)

  def copy(self: BasicFlagsT) -> BasicFlagsT:
    """Return a new, unfrozen instance with the same flags."""
    return self.__class__(self)

  def merge_copy(self: BasicFlagsT,
                 other: Flags.InitialDataType) -> BasicFlagsT:
    """Return a copy with other merged in; self is left untouched."""
    ret = self.copy()
    ret.merge(other)
    return ret

  def _describe(self, flag_name: str) -> str:
    # Formats a single flag the same way as get_list(); unknown flags are
    # treated like flags without value.
    value = self.get(flag_name)
    if value is None:
      return flag_name
    return f"{flag_name}={value}"

  def get_list(self) -> Iterator[str]:
    """Yield each flag as "name" or "name=value" string."""
    return (k if v is None else f"{k}={v}" for k, v in self.items())

  def __str__(self) -> str:
    return " ".join(self.get_list())
class Flags(BasicFlags):
  """BasicFlags variant with slightly stricter flag name checking.

  On top of the base class checks, a name has to consist of one or two
  leading dashes followed by ASCII letters, digits, "_" or "-" (and must
  not continue with "_" or "-" right after the dashes).
  Most command-line programs adhere to this.
  """

  _FLAG_NAME_RE = re.compile(r"(--?)[a-zA-Z0-9][a-zA-Z0-9_-]*")
  # Same structure as BasicFlags._PARSE_RE, using the stricter name pattern.
  _PARSE_RE = re.compile("".join((
      fr"(?P<name>{_FLAG_NAME_RE.pattern})",
      fr"((?P<equal>=)({BasicFlags._VALUE_PATTERN})?)?",
      BasicFlags._END_OR_SEPARATOR_PATTERN,
  )))

  def _validate_flag_name(self, flag_name: str) -> None:
    # Run the generic checks first, they produce more specific messages.
    super()._validate_flag_name(flag_name)
    if self._FLAG_NAME_RE.fullmatch(flag_name):
      return
    raise ValueError(
        f"Flag name contains invalid characters: {repr(flag_name)}")


FlagsT = TypeVar("FlagsT", bound=Flags)
class JSFlags(Flags):
  """Custom flags implementation for V8 flags (--js-flags in chrome)

  Additionally to the base Flag implementation it asserts that bool flags
  with the --no-.../--no... prefix are not contradicting each other.
  The string form joins the flags with "," instead of " ".
  """
  _NO_PREFIX = "--no"
  _NAME_RE = re.compile(r"--[a-zA-Z_][a-zA-Z0-9_-]+")
  # We allow two forms:
  #  - space separated: --foo="1" --bar --baz='2' --boo=3
  #  - comma separated: --foo="1",--bar ,--baz='2', --boo=3
  _VALUE_PATTERN = (r"('(?P<value_single_quotes>[^',]+)')|"
                    r"(\"(?P<value_double_quotes>[^\",]+)\")|"
                    r"(?P<value_no_quotes>[^'\", =]+)")
  _END_OR_SEPARATOR_PATTERN = r"(\s*[,\s]\s*|$)"
  _PARSE_RE = re.compile(fr"(?P<name>{_NAME_RE.pattern})"
                         fr"((?P<equal>=)({_VALUE_PATTERN})?)?"
                         fr"{_END_OR_SEPARATOR_PATTERN}")

  @classmethod
  def loads(cls, raw_flags: str) -> JSFlags:
    """Parse a space- or comma-separated string of V8 flags.

    Raises:
      ValueError: If the input is not a valid list of V8 flags.
    """
    return cls._loads(raw_flags, "--js-flags")

  def copy(self) -> JSFlags:
    """Return a new, unfrozen instance with the same flags."""
    return self.__class__(self)

  def _set(self,
           flag_name: str,
           flag_value: Optional[str] = None,
           override: bool = False) -> None:
    # Check the frozen state up-front: _check_negated_flag() removes
    # conflicting flags when overriding, which happens before the base
    # class gets a chance to do this check.
    self.assert_not_frozen()
    self._validate_js_flag_name(flag_name)
    if flag_value is not None:
      self._validate_js_flag_value(flag_name, flag_value)
    self._check_negated_flag(flag_name, override)
    super()._set(flag_name, flag_value, override)

  def _validate_js_flag_value(self, flag_name: str, flag_value: str) -> None:
    """Reject values that cannot be passed safely inside --js-flags.

    Raises:
      TypeError: If flag_value is not a string.
      ValueError: If flag_value contains a comma or whitespace.
    """
    if not isinstance(flag_value, str):
      raise TypeError("JSFlag value must be str, "
                      f"but got {type(flag_value)}: {repr(flag_value)}")
    if "," in flag_value:
      raise ValueError(
          "--js-flags: Comma in V8 flag value, flag escaping for chrome's "
          f"--js-flags might not work: {flag_name}={repr(flag_value)}")
    if self._WHITE_SPACE_RE.search(flag_value):
      raise ValueError("--js-flags: V8 flag-values cannot contain whitespaces:"
                       f"{flag_name}={repr(flag_value)}")

  def _validate_js_flag_name(self, flag_name: str) -> None:
    """Raise a ValueError unless flag_name is a long-form ("--") V8 flag."""
    if not flag_name.startswith("--"):
      raise ValueError("--js-flags: Only long-form flag names allowed, "
                       f"but got {repr(flag_name)}")
    if not self._NAME_RE.fullmatch(flag_name):
      raise ValueError(f"--js-flags: Invalid flag name {repr(flag_name)}. \n"
                       "Check invalid characters in the V8 flag name?")

  def _check_negated_flag(self, flag_name: str, override: bool) -> None:
    """Make sure flag_name doesn't contradict an already set flag.

    --foo contradicts --no-foo / --nofoo and vice versa.

    Args:
      flag_name: The flag that is about to be set.
      override: If True the contradicting flags are removed, otherwise a
        contradiction is an error.

    Raises:
      ValueError: If override is False and a contradicting flag is set.
    """
    if flag_name.startswith(self._NO_PREFIX):
      enabled = flag_name[len(self._NO_PREFIX):]
      # Check for --no-foo form
      if enabled.startswith("-"):
        enabled = enabled[1:]
      enabled = "--" + enabled
      if override:
        # The positive flag is usually not set at all, hence don't use a
        # plain del which raises a KeyError for missing flags.
        self.pop(enabled, None)
      elif enabled in self:
        raise ValueError(
            f"Conflicting flag {flag_name}, "
            f"it has already been enabled by {repr(self._describe(enabled))}")
      return
    # --foo => --no-foo, and the compact version: --foo => --nofoo
    # Both spellings are checked so an override removes all of them.
    base_name = flag_name[2:]
    for disabled in (f"--no-{base_name}", f"--no{base_name}"):
      if disabled not in self:
        continue
      if not override:
        raise ValueError(f"Conflicting flag {flag_name}, "
                         "it has previously been disabled by "
                         f"{repr(self._describe(disabled))}")
      del self[disabled]

  def __str__(self) -> str:
    return ",".join(self.get_list())
class ChromeFlags(Flags):
  """Specialized Flags for Chrome/Chromium-based browser.

  This has special treatment for --js-flags and the feature flags:
    --enable-features/--disable-features
    --enable-blink-features/--disable-blink-features
  These flags are not stored in the dict itself but in dedicated
  sub-containers (see js_flags, features and blink_features). items()
  combines everything again.
  """
  _JS_FLAG = "--js-flags"

  def __init__(self, initial_data: Flags.InitialDataType = None) -> None:
    # The sub-containers have to exist before calling the base initializer,
    # which already routes all initial_data through _set().
    self._features = ChromeFeatures()
    self._blink_features = ChromeBlinkFeatures()
    self._js_flags = JSFlags()
    super().__init__(initial_data)

  def freeze(self) -> ChromeFlags:
    """Freeze these flags including all sub-containers and return self."""
    super().freeze()
    self._js_flags.freeze()
    self._features.freeze()
    self._blink_features.freeze()
    return self

  def _set(self,
           flag_name: str,
           flag_value: Optional[str] = None,
           override: bool = False) -> None:
    self.assert_not_frozen()
    # pylint: disable=signature-differs
    # Feature flags hold comma-separated feature lists which are handed
    # one-by-one to the matching sub-container. override has no effect here.
    feature_setters = (
        (ChromeFeatures.ENABLE_FLAG, self._features.enable),
        (ChromeFeatures.DISABLE_FLAG, self._features.disable),
        (ChromeBlinkFeatures.ENABLE_FLAG, self._blink_features.enable),
        (ChromeBlinkFeatures.DISABLE_FLAG, self._blink_features.disable),
    )
    for feature_flag, apply_feature in feature_setters:
      if flag_name != feature_flag:
        continue
      if flag_value is None:
        raise ValueError(f"{feature_flag} cannot be None")
      for feature in flag_value.split(","):
        apply_feature(feature)
      return
    if flag_name == self._JS_FLAG:
      if flag_value is None:
        raise ValueError(f"{self._JS_FLAG} cannot be None")
      self._set_js_flag(flag_value, override)
      return
    flag_value = self._verify_flag(flag_name, flag_value)
    super()._set(flag_name, flag_value, override)

  def _set_js_flag(self, raw_js_flags: str, override: bool) -> None:
    """Parse raw_js_flags and add the contained flags to js_flags.

    Args:
      raw_js_flags: The value of a --js-flags flag.
      override: Whether new flags may replace conflicting existing ones.

    Raises:
      ValueError: For invalid or (without override) conflicting flags. The
        existing js flags are not modified in this case.
    """
    # Apply everything to a copy first, so a rejected flag doesn't leave
    # the current js flags half-updated.
    new_js_flags = JSFlags(self._js_flags)
    for js_flag_name, js_flag_value in JSFlags.parse(raw_js_flags).items():
      new_js_flags.set(js_flag_name, js_flag_value, override=override)
    if override:
      # With override the copy might have replaced values or dropped
      # contradicting flags. Merging it back into the old state without
      # override would raise (or keep the dropped flags), so start over
      # from the already validated copy instead.
      self._js_flags.assert_not_frozen()
      self._js_flags.clear()
    self._js_flags.update(new_js_flags)

  def _verify_flag(self, name: str, value: Optional[str]) -> Optional[str]:
    """Check a regular (non-special) flag and return the value to store.

    Logs an error for likely misspelled feature flags and expands "~" in
    --user-data-dir.

    Raises:
      ValueError: If --user-data-dir has no or only a blank value.
    """
    if candidate := self._find_misspelled_flag(name):
      logging.error(
          "Potentially misspelled flag: '%s'. "
          "Did you mean to use %s ?", name, candidate)
    if name == "--user-data-dir":
      if not value or not value.strip():
        raise ValueError("--user-data-dir cannot be the empty string.")
      expanded_dir = str(pathlib.Path(value).expanduser())
      if expanded_dir != value:
        logging.warning(
            "Chrome Flags: auto-expanding --user-data-dir from '%s' to '%s'",
            value, expanded_dir)
      return expanded_dir
    return value

  def _find_misspelled_flag(self, name: str) -> Optional[str]:
    """Return the correct flag for common feature-flag typos, else None."""
    if name in ("--enable-feature", "--enabled-feature", "--enabled-features"):
      return "--enable-features"
    if name in ("--disable-feature", "--disabled-feature",
                "--disabled-features"):
      return "--disable-features"
    if name in ("--enable-blink-feature", "--enabled-blink-feature",
                "--enabled-blink-features"):
      return "--enable-blink-features"
    if name in ("--disable-blink-feature", "--disabled-blink-feature",
                "--disabled-blink-features"):
      return "--disable-blink-features"
    return None

  @property
  def features(self) -> ChromeFeatures:
    """Container for --enable-features / --disable-features."""
    return self._features

  @property
  def blink_features(self) -> ChromeBlinkFeatures:
    """Container for --enable-blink-features / --disable-blink-features."""
    return self._blink_features

  @property
  def js_flags(self) -> JSFlags:
    """Container for the flags passed via --js-flags."""
    return self._js_flags

  def merge(self, other: Flags.InitialDataType) -> None:
    """Add all flags, features and js flags from other.

    Anything that is not a ChromeFlags instance is converted first.
    """
    if not isinstance(other, ChromeFlags):
      other = ChromeFlags(other)
    self.features.merge(other.features)
    self.blink_features.merge(other.blink_features)
    self.js_flags.merge(other.js_flags)
    for name, value in other.base_items():
      self.set(name, value)

  def base_items(self) -> Iterable[Tuple[str, Optional[str]]]:
    """Yield only the regular flags stored in the dict itself."""
    yield from super().items()

  def items(self) -> Iterable[Tuple[str, Optional[str]]]:
    """Yield the regular flags, followed by --js-flags and feature flags.

    The special flags are only included if they are not empty.
    """
    yield from self.base_items()
    if self._js_flags:
      yield (self._JS_FLAG, str(self.js_flags))
    yield from self.features.items()
    yield from self.blink_features.items()
class ChromeBaseFeatures(Freezable, abc.ABC):
  """Shared bookkeeping for a set of enabled and disabled features.

  Enabled features are stored as a mapping from the feature name to an
  optional parameter suffix, disabled features as an ordered set of names.
  A feature cannot be enabled and disabled at the same time.
  Subclasses provide the flag names and the parsing of a single feature.
  """
  ENABLE_FLAG: str = ""
  DISABLE_FLAG: str = ""

  def __init__(self) -> None:
    super().__init__()
    self._enabled: Dict[str, Optional[str]] = {}
    self._disabled: OrderedSet[str] = OrderedSet()

  @property
  def is_empty(self) -> bool:
    """True if no feature has been enabled or disabled."""
    return not self._enabled and not self._disabled

  @property
  def enabled(self) -> Dict[str, Optional[str]]:
    """A copy of the enabled features (name => optional param suffix)."""
    return self._enabled.copy()

  @property
  def disabled(self) -> OrderedSet[str]:
    """A copy of the names of the disabled features."""
    return OrderedSet(self._disabled)

  def _parse_feature(self, feature: str) -> Tuple[str, Optional[str]]:
    # Common sanity checks before the subclass-specific parsing.
    if not feature:
      raise ValueError("Cannot parse empty feature")
    if "," in feature:
      raise ValueError(f"{repr(feature)} contains multiple features. "
                       "Please split them first.")
    return self._parse_feature_parts(feature)

  @abc.abstractmethod
  def _parse_feature_parts(self, feature: str) -> Tuple[str, Optional[str]]:
    """Split a single feature string into (name, optional param suffix)."""

  def enable(self, feature: str) -> None:
    """Parse a single feature string and mark the feature as enabled.

    Raises:
      ValueError: If the feature string is invalid, the feature is disabled
        or it is already enabled with different params.
    """
    feature_name, feature_value = self._parse_feature(feature)
    self._enable(feature_name, feature_value)

  def _enable(self, name: str, value: Optional[str]) -> None:
    self.assert_not_frozen()
    if name in self._disabled:
      raise ValueError(
          f"Cannot enable previously disabled feature={repr(name)}")
    if name not in self._enabled:
      self._enabled[name] = value
      return
    # Enabling a feature again is fine as long as the params are the same.
    prev_value = self._enabled[name]
    if value != prev_value:
      raise ValueError("Cannot set conflicting values "
                       f"({repr(prev_value)}, vs. {repr(value)}) "
                       f"for the same feature={repr(name)}")

  def disable(self, feature: str) -> None:
    """Parse a single feature string and mark the feature as disabled.

    Only the name is used, a parsed param suffix is ignored.

    Raises:
      ValueError: If the feature string is invalid or the feature has been
        enabled before.
    """
    self.assert_not_frozen()
    feature_name = self._parse_feature(feature)[0]
    if feature_name in self._enabled:
      raise ValueError(
          f"Cannot disable previously enabled feature={repr(feature_name)}")
    self._disabled.add(feature_name)

  def update(self, other: ChromeBaseFeatures) -> None:
    """Add all enabled and disabled features from other.

    Raises:
      TypeError: If other is not an instance of this object's type.
      ValueError: If features of other conflict with the current ones.
    """
    if not isinstance(other, type(self)):
      raise TypeError(f"Cannot merge {type(self)} with {type(other)}")
    for disabled_name in other.disabled:
      self.disable(disabled_name)
    for enabled_name, enabled_value in other.enabled.items():
      self._enable(enabled_name, enabled_value)

  def merge(self, other: ChromeBaseFeatures) -> None:
    """Same as update()."""
    self.update(other)

  def items(self) -> Iterable[Tuple[str, str]]:
    """Yield (flag name, comma-joined features) for each non-empty group.

    Enabled features come first, their param suffix is appended directly
    to the feature name.
    """
    if self._enabled:
      rendered = []
      for name, params in self._enabled.items():
        rendered.append(name if params is None else f"{name}{params}")
      yield (self.ENABLE_FLAG, ",".join(rendered))
    if self._disabled:
      yield (self.DISABLE_FLAG, ",".join(self._disabled))

  def get_list(self) -> Iterable[str]:
    """Yield the items() as "flag=features" strings."""
    yield from (f"{flag_name}={features_str}"
                for flag_name, features_str in self.items())

  def __str__(self) -> str:
    return " ".join(self.get_list())
class ChromeFeatures(ChromeBaseFeatures):
  """
  Chrome Features set, throws if features are enabled and disabled at the same
  time.
  A feature may carry a suffix that starts with "<" or with ":"; the suffix
  is kept verbatim, including its separator.
  Examples:
    --disable-features="MyFeature1"
    --enable-features="MyFeature1,MyFeature2"
    --enable-features="MyFeature1:k1/v1/k2/v2,MyFeature2"
    --enable-features="MyFeature3<Trial2:k1/v1/k2/v2"
  """
  ENABLE_FLAG: str = "--enable-features"
  DISABLE_FLAG: str = "--disable-features"

  def _parse_feature_parts(self, feature: str) -> Tuple[str, Optional[str]]:
    # "<" is tried before ":", so for "Name<Trial:params" everything after
    # the name ends up in the suffix.
    for separator in ("<", ":"):
      parts = feature.split(separator)
      if len(parts) == 2:
        return (parts[0], separator + parts[1])
      if len(parts) > 2:
        raise ValueError(f"Invalid number of feature parts: {repr(parts)}")
    return (feature, None)
class ChromeBlinkFeatures(ChromeBaseFeatures):
  """
  Chrome Blink Features set, throws if features are enabled and disabled at
  the same time. Blink features are plain names without params.
  Examples:
    --disable-blink-features="MyFeature1"
    --enable-blink-features="MyFeature1,MyFeature2"
  """
  ENABLE_FLAG: str = "--enable-blink-features"
  DISABLE_FLAG: str = "--disable-blink-features"

  def _parse_feature_parts(self, feature: str) -> Tuple[str, Optional[str]]:
    # "<" and ":" are the param separators, both are rejected here.
    if any(separator in feature for separator in "<:"):
      raise ValueError("blink feature do not have params, "
                       f"but found param separator in {repr(feature)}")
    return (feature, None)