blob: 041ec6443f7db7ed0da2d327cb7f64837dd62f8a [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import ctypes
import datetime as dt
import json
import logging
from math import floor, log10
import os
import pathlib
import platform as py_platform
import plistlib
import shlex
import shutil
import subprocess
import sys
import textwrap
import threading
import time
import traceback as tb
import urllib
import urllib.error
import urllib.request
from typing import (Any, Callable, Dict, Final, Iterable, Iterator, List,
Optional, Sequence, Tuple, TypeVar, Union)
import psutil
if not hasattr(shlex, "join"):
raise Exception("Please update to python v3.8 that has shlex.join")
class TTYColor:
CYAN = "\033[1;36;6m"
PURPLE = "\033[1;35;5m"
BLUE = "\033[38;5;4m"
YELLOW = "\033[38;5;3m"
GREEN = "\033[38;5;2m"
RED = "\033[38;5;1m"
BLACK = "\033[38;5;0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
REVERSED = "\033[7m"
RESET = "\033[0m"
class ColoredLogFormatter(logging.Formatter):
FORMAT = "%(message)s"
FORMATS = {
logging.DEBUG: FORMAT + " (%(filename)s:%(lineno)d)",
logging.INFO: TTYColor.GREEN + FORMAT + TTYColor.RESET,
logging.WARNING: TTYColor.YELLOW + FORMAT + TTYColor.RESET,
logging.ERROR: TTYColor.RED + FORMAT + TTYColor.RESET,
logging.CRITICAL: TTYColor.BOLD + TTYColor.RED + FORMAT + TTYColor.RESET,
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
InputT = TypeVar("InputT")
KeyT = TypeVar("KeyT")
GroupT = TypeVar("GroupT")
def group_by(collection: Iterable[InputT],
key: Callable[[InputT], KeyT],
value: Optional[Callable[[InputT], Any]] = None,
group: Optional[Callable[[KeyT], GroupT]] = None,
sort_key: Optional[Callable[[Tuple[KeyT, GroupT]], Any]] = str
) -> Dict[KeyT, GroupT]:
"""
Works similar to itertools.groupby but does a global, SQL-style grouping
instead of a line-by-line basis like uniq.
key: a function that returns the grouping key for a group
group: a function that accepts a group_key and returns a group object that
has an append() method.
"""
assert key, "No key function provided"
key_fn = key
value_fn = value or (lambda item: item)
group_fn: Callable[[KeyT], GroupT] = group or (lambda key: [])
groups: Dict[KeyT, GroupT] = {}
for input_item in collection:
group_key: KeyT = key_fn(input_item)
group_item = value_fn(input_item)
if group_key not in groups:
new_group: GroupT = group_fn(group_key)
groups[group_key] = new_group
new_group.append(group_item)
else:
groups[group_key].append(group_item)
if sort_key:
# sort keys as well for more predictable behavior
items = sorted(groups.items(), key=sort_key)
else:
items = groups.items()
return dict(items)
def format_metric(value: Union[float, int],
stddev: Optional[float] = None) -> str:
"""Format value and stdev to only expose significant + 1 digits.
Example outputs:
100 ± 10%
100.1 ± 1.2%
100.12 ± 0.12%
100.123 ± 0.012%
100.1235 ± 0.0012%
"""
if not stddev:
return str(value)
stddev = float(stddev)
stddev_significant_digit = int(floor(log10(abs(stddev))))
value_width = max(0, 1 - stddev_significant_digit)
percent = stddev / value * 100
percent_significant_digit = int(floor(log10(abs(percent))))
percent_width = max(0, 1 - percent_significant_digit)
return f"{value:.{value_width}f} ± {percent:.{percent_width}f}%"
def sort_by_file_size(files: Iterable[pathlib.Path]) -> List[pathlib.Path]:
return sorted(files, key=lambda f: (-f.stat().st_size, f.name))
SIZE_UNITS: Final[Tuple[str, ...]] = ("B", "KiB", "MiB", "GiB", "TiB")
def get_file_size(file: pathlib.Path, digits: int = 2) -> str:
size = file.stat().st_size
unit_index = 0
divisor = 1024
while (unit_index < len(SIZE_UNITS)) and size >= divisor:
unit_index += 1
size /= divisor
return f"{size:.{digits}f} {SIZE_UNITS[unit_index]}"
class Platform(abc.ABC):
@property
@abc.abstractmethod
def name(self) -> str:
pass
@property
def is_remote(self) -> bool:
return False
@property
def machine(self) -> str:
return py_platform.machine()
@property
def is_ia32(self) -> bool:
return self.machine in ("i386", "i686", "x86")
@property
def is_x64(self) -> bool:
return self.machine in ("x86_64", "AMD64")
@property
def is_arm64(self) -> bool:
return self.machine in ("arm64", "aarch64")
@property
def is_macos(self) -> bool:
return False
@property
def is_linux(self) -> bool:
return False
@property
def is_posix(self) -> bool:
return self.is_macos or self.is_linux
@property
def is_win(self) -> bool:
return False
@property
def is_battery_powered(self) -> bool:
if not psutil.sensors_battery:
return False
status = psutil.sensors_battery()
if not status:
return False
return status.power_plugged
def search_app(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
return self.search_binary(app_path)
@abc.abstractmethod
def search_binary(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
pass
@abc.abstractmethod
def app_version(self, app_path: pathlib.Path) -> str:
pass
@property
def has_display(self) -> bool:
return True
def sleep(self, seconds: Union[float, dt.timedelta]) -> None:
if isinstance(seconds, dt.timedelta):
seconds = seconds.total_seconds()
if seconds == 0:
return
logging.debug("WAIT %ss", seconds)
time.sleep(seconds)
def which(self, binary):
# TODO(cbruni): support remote platforms
return shutil.which(binary)
def processes(self,
attrs: Optional[List[str]] = None) -> List[Dict[str, Any]]:
assert not self.is_remote, "Only local platform supported"
return [
p.info # pytype: disable=attribute-error
for p in psutil.process_iter(attrs=attrs)
]
def process_running(self, process_name_list: List[str]) -> Optional[str]:
for proc in psutil.process_iter():
try:
if proc.name().lower() in process_name_list:
return proc.name()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return None
def process_children(self, parent_pid: int,
recursive: bool = False) -> List[Dict[str, Any]]:
return [
p.as_dict()
for p in psutil.Process(parent_pid).children(recursive=recursive)
]
def foreground_process(self) -> Optional[Dict[str, Any]]:
return None
def sh_stdout(self,
*args,
shell: bool = False,
quiet: bool = False,
encoding: str = "utf-8") -> str:
completed_process = self.sh(
*args, shell=shell, capture_output=True, quiet=quiet)
return completed_process.stdout.decode(encoding)
def popen(self,
*args,
shell: bool = False,
stdout=None,
stderr=None,
stdin=None,
env=None,
quiet: bool = False) -> subprocess.Popen:
if not quiet:
logging.debug("SHELL: %s", shlex.join(map(str, args)))
logging.debug("CWD: %s", os.getcwd())
return subprocess.Popen(
args=args,
shell=shell,
stdin=stdin,
stderr=stderr,
stdout=stdout,
env=env)
def sh(self,
*args,
shell: bool = False,
capture_output: bool = False,
stdout=None,
stderr=None,
stdin=None,
env=None,
quiet=False) -> subprocess.CompletedProcess:
if not quiet:
logging.debug("SHELL: %s", shlex.join(map(str, args)))
logging.debug("CWD: %s", os.getcwd())
process = subprocess.run(
args=args,
shell=shell,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=env,
capture_output=capture_output,
check=False)
if process.returncode != 0:
raise SubprocessError(process)
return process
def exec_apple_script(self, script: str, quiet: bool = False):
raise NotImplementedError("AppleScript is only available on MacOS")
def terminate(self, proc_pid: int) -> None:
process = psutil.Process(proc_pid)
for proc in process.children(recursive=True):
proc.terminate()
process.terminate()
def log(self, *messages: Any, level: int = 2) -> None:
message_str = " ".join(map(str, messages))
if level == 3:
level = logging.DEBUG
if level == 2:
level = logging.INFO
if level == 1:
level = logging.WARNING
if level == 0:
level = logging.ERROR
logging.log(level, message_str)
# TODO(cbruni): split into separate list_system_monitoring and
# disable_system_monitoring methods
def check_system_monitoring(self, disable: bool = False) -> bool:
# pylint: disable=unused-argument
return True
def get_relative_cpu_speed(self) -> float:
return 1
def is_thermal_throttled(self) -> bool:
return self.get_relative_cpu_speed() < 1
def disk_usage(self, path: pathlib.Path) -> psutil._common.sdiskusage:
return psutil.disk_usage(str(path))
def cpu_usage(self) -> float:
return 1 - psutil.cpu_times_percent().idle / 100
def cpu_details(self) -> Dict[str, Any]:
details = {
"physical cores":
psutil.cpu_count(logical=False),
"logical cores":
psutil.cpu_count(logical=True),
"usage":
psutil.cpu_percent( # pytype: disable=attribute-error
percpu=True, interval=0.1),
"total usage":
psutil.cpu_percent(),
"system load":
psutil.getloadavg(),
}
try:
cpu_freq = psutil.cpu_freq()
except FileNotFoundError:
# MacOS M1 fail for this some times
return details
details.update({
"max frequency": f"{cpu_freq.max:.2f}Mhz",
"min frequency": f"{cpu_freq.min:.2f}Mhz",
"current frequency": f"{cpu_freq.current:.2f}Mhz",
})
return details
def system_details(self) -> Dict[str, Any]:
return {
"machine": py_platform.machine(),
"os": {
"system": py_platform.system(),
"release": py_platform.release(),
"version": py_platform.version(),
"platform": py_platform.platform(),
},
"python": {
"version": py_platform.python_version(),
"bits": "64" if sys.maxsize > 2**32 else "32",
},
"CPU": self.cpu_details(),
}
def download_to(self, url: str, path: pathlib.Path) -> pathlib.Path:
logging.debug("DOWNLOAD: %s\n TO: %s", url, path)
assert not path.exists(), f"Download destination {path} exists already."
try:
urllib.request.urlretrieve(url, path)
except (urllib.error.HTTPError, urllib.error.URLError) as e:
raise OSError(f"Could not load {url}") from e
assert path.exists(), (
f"Downloading {url} failed. Downloaded file {path} doesn't exist.")
return path
def concat_files(self, inputs: Iterable[pathlib.Path],
output: pathlib.Path) -> pathlib.Path:
with output.open("w", encoding="utf-8") as output_f:
for input_file in inputs:
assert input_file.is_file()
with input_file.open(encoding="utf-8") as input_f:
shutil.copyfileobj(input_f, output_f)
return output
def set_main_display_brightness(self, brightness_level: int) -> None:
raise NotImplementedError(
"Implementation is only available on MacOS for now")
def get_main_display_brightness(self) -> int:
raise NotImplementedError(
"Implementation is only available on MacOS for now")
def check_autobrightness(self) -> bool:
raise NotImplementedError(
"Implementation is only available on MacOS for now")
class SubprocessError(subprocess.CalledProcessError):
""" Custom version that also prints stderr for debugging"""
def __init__(self, process):
super().__init__(process.returncode, shlex.join(map(str, process.args)),
process.stdout, process.stderr)
def __str__(self) -> str:
super_str = super().__str__()
if not self.stderr:
return super_str
return f"{super_str}\nstderr:{self.stderr.decode()}"
class WinPlatform(Platform):
SEARCH_PATHS = (
pathlib.Path("."),
pathlib.Path(os.path.expandvars("%ProgramFiles%")),
pathlib.Path(os.path.expandvars("%ProgramFiles(x86)%")),
pathlib.Path(os.path.expandvars("%APPDATA%")),
pathlib.Path(os.path.expandvars("%LOCALAPPDATA%")),
)
@property
def is_win(self) -> bool:
return True
@property
def name(self) -> str:
return "win"
def search_binary(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
if app_path.suffix != ".exe":
raise ValueError("Expected executable path with '.exe' suffix, "
f"but got: '{app_path.name}'")
for path in self.SEARCH_PATHS:
# Recreate Path object for easier pyfakefs testing
result_path = pathlib.Path(path) / app_path
if result_path.exists():
return result_path
return None
def app_version(self, app_path: pathlib.Path) -> str:
assert app_path.exists(), f"Binary {app_path} does not exist."
return self.sh_stdout(
"powershell", "-command",
f"(Get-Item '{app_path}').VersionInfo.ProductVersion")
class PosixPlatform(Platform, metaclass=abc.ABCMeta):
def app_version(self, app_path: pathlib.Path) -> str:
assert app_path.exists(), f"Binary {app_path} does not exist."
return self.sh_stdout(app_path, "--version")
class MacOSPlatform(PosixPlatform):
SEARCH_PATHS = (
pathlib.Path("."),
pathlib.Path("/Applications"),
pathlib.Path.home() / "Applications",
)
@property
def is_macos(self) -> bool:
return True
@property
def name(self) -> str:
return "macos"
def _find_app_binary_path(self, app_path: pathlib.Path) -> pathlib.Path:
assert app_path.suffix == ".app"
bin_path = app_path / "Contents" / "MacOS" / app_path.stem
if bin_path.exists():
return bin_path
binaries = [path for path in bin_path.parent.iterdir() if path.is_file()]
if len(binaries) == 1:
return binaries[0]
# Fallback to read plist
plist_path = app_path / "Contents" / "Info.plist"
assert plist_path.is_file, (
f"Could not find Info.plist in app bundle: {app_path}")
with plist_path.open("rb") as f:
plist = plistlib.load(f)
bin_path = (
app_path / "Contents" / "MacOS" /
plist.get("CFBundleExecutable", app_path.stem))
if bin_path.is_file():
return bin_path
raise Exception(f"Invalid number of binaries candidates found: {binaries}")
def search_binary(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
if app_path.suffix != ".app":
raise ValueError("Expected app name with '.app' suffix, "
f"but got: '{app_path.name}'")
for search_path in self.SEARCH_PATHS:
# Recreate Path object for easier pyfakefs testing
result_path = pathlib.Path(search_path) / app_path
if not result_path.is_dir():
continue
result_path = self._find_app_binary_path(result_path)
if result_path.exists():
return result_path
return None
def search_app(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
binary = self.search_binary(app_path)
if not binary:
return None
# input: /Applications/Safari.app/Contents/MacOS/Safari
# output: /Applications/Safari.app
app_path = binary.parents[2]
assert app_path.suffix == ".app"
assert app_path.is_dir()
return app_path
def app_version(self, app_path: pathlib.Path) -> str:
assert app_path.exists(), f"Binary {bin} does not exist."
dot_app_path = None
current = app_path
while current != app_path.root:
if current.suffix == ".app":
dot_app_path = current
break
current = current.parent
if not dot_app_path:
# Most likely just a cli tool"
return self.sh_stdout(app_path, "--version").strip()
version_string = self.sh_stdout("mdls", "-name", "kMDItemVersion",
dot_app_path).strip()
# Filter output: 'kMDItemVersion = "14.1"' => '"14.1"'
_, version_string = version_string.split(" = ", maxsplit=1)
if version_string != "(null)":
# Strip quotes: '"14.1"' => '14.1'
return version_string[1:-1]
# Backup solution with --version
maybe_bin_path: Optional[pathlib.Path] = app_path
if app_path.suffix == ".app":
maybe_bin_path = self.search_binary(app_path)
if maybe_bin_path:
try:
return self.sh_stdout(maybe_bin_path, "--version").strip()
except SubprocessError as e:
logging.debug("Could not use --version: %s", e)
raise ValueError(f"Could not extract app version: {app_path}")
def exec_apple_script(self, script: str, quiet: bool = False):
if not quiet:
logging.debug("AppleScript: %s", script)
return self.sh("/usr/bin/osascript", "-e", script)
def foreground_process(self) -> Optional[Dict[str, Any]]:
foreground_process_info = self.sh_stdout("lsappinfo", "front").strip()
if not foreground_process_info:
return None
foreground_info = self.sh_stdout("lsappinfo", "info", "-only", "pid",
foreground_process_info).strip()
_, pid = foreground_info.split("=")
if pid and pid.isdigit():
return psutil.Process(int(pid)).as_dict()
return None
def get_relative_cpu_speed(self) -> float:
try:
lines = self.sh_stdout("pmset", "-g", "therm").split()
for index, line in enumerate(lines):
if line == "CPU_Speed_Limit":
return int(lines[index + 2]) / 100.0
except SubprocessError:
logging.debug("Could not get relative PCU speed: %s", tb.format_exc())
return 1
def system_details(self) -> Dict[str, Any]:
details = super().system_details()
details.update({
"system_profiler":
self.sh_stdout("system_profiler", "SPHardwareDataType"),
"sysctl_machdep_cpu":
self.sh_stdout("sysctl", "machdep.cpu"),
"sysctl_hw":
self.sh_stdout("sysctl", "hw"),
})
return details
def check_system_monitoring(self, disable: bool = False) -> bool:
return self.check_crowdstrike(disable)
def check_autobrightness(self) -> bool:
output = self.sh_stdout("system_profiler", "SPDisplaysDataType", "-json").strip()
data = json.loads(output)
if spdisplays_data := data.get("SPDisplaysDataType"):
for data in spdisplays_data:
if spdisplays_ndrvs := data.get("spdisplays_ndrvs"):
for display in spdisplays_ndrvs:
if auto_brightness := display.get("spdisplays_ambient_brightness"):
return auto_brightness == "spdisplays_yes"
raise Exception("Could not find 'spdisplays_ndrvs' from SPDisplaysDataType")
raise Exception("Could not get 'SPDisplaysDataType' form system profiler")
def check_crowdstrike(self, disable: bool = False) -> bool:
falconctl = pathlib.Path(
"/Applications/Falcon.app/Contents/Resources/falconctl")
if not falconctl.exists():
logging.debug("You're fine, falconctl or %s are not installed.",
falconctl)
return True
if not disable:
for process in self.processes(attrs=["exe"]):
exe = process["exe"]
if exe and exe.endswith("/com.crowdstrike.falcon.Agent"):
return False
return True
try:
logging.warning("Checking falcon sensor status:")
status = self.sh_stdout("sudo", falconctl, "stats", "agent_info")
except SubprocessError:
return True
if "operational: true" not in status:
# Early return if not running, no need to disable the sensor.
return True
# Try disabling the process
logging.warning("Disabling crowdstrike monitoring:")
self.sh("sudo", falconctl, "unload")
return True
def _get_display_service(self):
core_graphics = ctypes.CDLL(
"/System/Library/Frameworks/CoreGraphics.framework/CoreGraphics")
main_display = core_graphics.CGMainDisplayID()
display_services = ctypes.CDLL(
"/System/Library/PrivateFrameworks/DisplayServices.framework"
"/DisplayServices")
display_services.DisplayServicesSetBrightness.argtypes = [
ctypes.c_int, ctypes.c_float
]
display_services.DisplayServicesGetBrightness.argtypes = [
ctypes.c_int, ctypes.POINTER(ctypes.c_float)
]
return display_services, main_display
def set_main_display_brightness(self, brightness_level: int) -> None:
"""Sets the main display brightness at the specified percentage by
brightness_level.
This function imitates the open-source "brightness" tool at
https://github.com/nriley/brightness.
Since the benchmark doesn't care about older MacOSen, multiple displays
or other complications that tool has to consider, setting the brightness
level boils down to calling this function for the main display.
Args:
brightness_level: Percentage at which we want to set screen brightness.
Raises:
AssertionError: An error occurred when we tried to set the brightness
"""
display_services, main_display = self._get_display_service()
ret = display_services.DisplayServicesSetBrightness(main_display,
brightness_level / 100)
assert ret == 0
def get_main_display_brightness(self) -> int:
"""Gets the current brightness level of the main display .
This function imitates the open-source "brightness" tool at
https://github.com/nriley/brightness.
Since the benchmark doesn't care about older MacOSen, multiple displays
or other complications that tool has to consider, setting the brightness
level boils down to calling this function for the main display.
Returns:
An int of the current percentage value of the main screen brightness
Raises:
AssertionError: An error occurred when we tried to set the brightness
"""
display_services, main_display = self._get_display_service()
display_brightness = ctypes.c_float()
ret = display_services.DisplayServicesGetBrightness(
main_display, ctypes.byref(display_brightness))
assert ret == 0
return round(display_brightness.value * 100)
class LinuxPlatform(PosixPlatform):
SEARCH_PATHS = (
pathlib.Path("."),
pathlib.Path("/usr/local/sbin"),
pathlib.Path("/usr/local/bin"),
pathlib.Path("/usr/sbin"),
pathlib.Path("/usr/bin"),
pathlib.Path("/sbin"),
pathlib.Path("/bin"),
pathlib.Path("/opt/google"),
)
@property
def is_linux(self) -> bool:
return True
@property
def name(self) -> str:
return "linux"
def check_system_monitoring(self, disable: bool = False) -> bool:
return True
@property
def has_display(self) -> bool:
return "DISPLAY" in os.environ
def system_details(self) -> Dict[str, Any]:
details = super().system_details()
for info_bin in ("lscpu", "inxi"):
if self.which(info_bin):
details[info_bin] = self.sh_stdout(info_bin)
return details
def search_binary(self, app_path: pathlib.Path) -> Optional[pathlib.Path]:
for path in self.SEARCH_PATHS:
# Recreate Path object for easier pyfakefs testing
result_path = pathlib.Path(path) / app_path
if result_path.exists():
return result_path
return None
if sys.platform == "linux":
platform = LinuxPlatform()
elif sys.platform == "darwin":
platform = MacOSPlatform()
elif sys.platform == "win32":
platform = WinPlatform()
else:
raise Exception("Unsupported Platform")
log = platform.log
def search_app_or_executable(name: str,
macos: Sequence[str] = (),
win: Sequence[str] = (),
linux: Sequence[str] = ()) -> pathlib.Path:
executables: Sequence[str] = []
if platform.is_macos:
executables = macos
elif platform.is_win:
executables = win
elif platform.is_linux:
executables = linux
if not executables:
raise ValueError(
f"Executable {name} not supported on platform {platform.name}")
for name_or_path in executables:
binary = platform.search_app(pathlib.Path(name_or_path))
if binary and binary.exists():
return binary
raise Exception(f"Executable {name} not found on {platform.name}")
# =============================================================================
def urlopen(url: str):
try:
return urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
log(f"Could not load url={url}")
raise e
# =============================================================================
class ChangeCWD:
def __init__(self, destination: pathlib.Path):
self.new_dir = destination
self.prev_dir: Optional[str] = None
def __enter__(self):
self.prev_dir = os.getcwd()
os.chdir(self.new_dir)
def __exit__(self, exc_type, exc_value, exc_traceback):
os.chdir(self.prev_dir)
class SystemSleepPreventer:
"""
Prevent the system from going to sleep while running the benchmark.
"""
def __init__(self):
self._process = None
def __enter__(self):
if platform.is_macos:
self._process = platform.popen("caffeinate", "-imdsu")
# TODO: Add linux support
def __exit__(self, exc_type, exc_value, exc_traceback):
if self._process is not None:
self._process.kill()
class TimeScope:
"""
Measures and logs the time spend during the lifetime of the TimeScope.
"""
def __init__(self, message: str, level: int = 3):
self._message = message
self._level = level
self._start: Optional[dt.datetime] = None
@property
def message(self) -> str:
return self._message
def __enter__(self):
self._start = dt.datetime.now()
def __exit__(self, exc_type, exc_value, exc_traceback):
assert self._start
diff = dt.datetime.now() - self._start
log(f"{self._message} duration={diff}", level=self._level)
class WaitRange:
min: dt.timedelta
max: dt.timedelta
current: dt.timedelta
def __init__(
self,
min: Union[float, dt.timedelta] = 0.1, # pylint: disable=redefined-builtin
timeout: Union[float, dt.timedelta] = 10,
factor: float = 1.01,
max: Optional[Union[float, dt.timedelta]] = None, # pylint: disable=redefined-builtin
max_iterations: Optional[int] = None):
if isinstance(min, dt.timedelta):
self.min = min
else:
self.min = dt.timedelta(seconds=min)
assert self.min.total_seconds() > 0
if not max:
self.max = self.min * 10
elif isinstance(max, dt.timedelta):
self.max = max
else:
self.max = dt.timedelta(seconds=max)
assert self.min <= self.max
assert 1.0 < factor
self.factor = factor
if isinstance(timeout, dt.timedelta):
self.timeout = timeout
else:
self.timeout = dt.timedelta(seconds=timeout)
assert 0 < self.timeout.total_seconds()
self.current = self.min
assert max_iterations is None or max_iterations > 0
self.max_iterations = max_iterations
def __iter__(self) -> Iterator[dt.timedelta]:
assert self.current
i = 0
while self.max_iterations is None or i < self.max_iterations:
yield self.current
self.current = min(self.current * self.factor, self.max)
i += 1
def wait_with_backoff(wait_range: WaitRange) -> Iterator[Tuple[float, float]]:
assert isinstance(wait_range, WaitRange)
start = dt.datetime.now()
timeout = wait_range.timeout
duration = 0
for sleep_for in wait_range:
duration = dt.datetime.now() - start
if duration > wait_range.timeout:
raise TimeoutError(f"Waited for {duration}")
time_left = timeout - duration
yield duration.total_seconds(), time_left.total_seconds()
platform.sleep(sleep_for.total_seconds())
class Durations:
"""
Helper object to track durations.
"""
class _DurationMeasureContext:
def __init__(self, durations: Durations, name: str):
self._start_time = None
self._durations = durations
self._name = name
def __enter__(self):
self._start_time = dt.datetime.now()
def __exit__(self, exc_type, exc_value, traceback):
assert self._start_time
delta = dt.datetime.now() - self._start_time
self._durations[self._name] = delta
def __init__(self):
self._durations: Dict[str, dt.timedelta] = {}
def __getitem__(self, name: str) -> dt.timedelta:
return self._durations[name]
def __setitem__(self, name: str, duration: dt.timedelta) -> None:
assert name not in self._durations, (f"Cannot set '{name}' duration twice!")
self._durations[name] = duration
def __len__(self) -> int:
return len(self._durations)
def measure(self, name: str) -> _DurationMeasureContext:
assert name not in self._durations, (
f"Cannot measure '{name}' duration twice!")
return self._DurationMeasureContext(self, name)
def to_json(self) -> Dict[str, float]:
return {
name: self._durations[name].total_seconds()
for name in sorted(self._durations.keys())
}
def wrap_lines(body: str, width: int = 80, indent: str = "") -> Iterable[str]:
for line in body.splitlines():
for split in textwrap.wrap(line, width):
yield f"{indent}{split}"
class Spinner:
CURSORS = "◐◓◑◒"
def __init__(self, sleep: float = 0.5):
self._is_running = False
self._sleep_time = sleep
def __enter__(self):
# Only enable the spinner if the output is an interactive terminal.
is_atty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
if is_atty:
self._is_running = True
threading.Thread(target=self._spin).start()
def __exit__(self, exc_type, exc_value, traceback):
if self._is_running:
self._is_running = False
self._sleep()
def _cursors(self) -> Iterable[str]:
while True:
yield from Spinner.CURSORS
def _spin(self) -> None:
stdout = sys.stdout
for cursor in self._cursors():
if not self._is_running:
return
# Print the current wait-cursor and send a carriage return to move to the
# start of the line.
stdout.write(f" {cursor}\r")
stdout.flush()
self._sleep()
def _sleep(self) -> None:
time.sleep(self._sleep_time)