blob: e30fc24052c6c48762e164cd84582b3064a8db64 [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import abc
import atexit
import collections.abc
import contextlib
import dataclasses
import functools
import inspect
import logging
import os
import pathlib
import platform as py_platform
import shlex
import shutil
import socket
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from typing import TYPE_CHECKING, Any, Callable, Final, Generator, Iterable, \
Iterator, Mapping, Optional, Sequence, Type
import google.cloud.storage as gcloud_storage
import psutil
from crossbench import parse
from crossbench import path as pth
from crossbench.helper import wait
from crossbench.parse import ObjectParser
from crossbench.plt import proc_helper
from crossbench.plt.arch import MachineArch
from crossbench.plt.bin import Binary
from crossbench.plt.port_manager import LocalPortManager, PortManager, \
PortScope
from crossbench.plt.remote import RemotePopen
if TYPE_CHECKING:
import datetime as dt
import google.cloud.storage.blob as gcloud_blob
from crossbench.plt.display_info import DisplayInfo
from crossbench.plt.process_meminfo import ProcessMeminfo
from crossbench.plt.signals import AnySignals, Signals
from crossbench.plt.types import CmdArg, ProcessIo, ProcessLike, TupleCmdArgs
from crossbench.plt.version import PlatformVersion
from crossbench.types import JsonDict
class Environ(collections.abc.MutableMapping, metaclass=abc.ABCMeta):
pass
class LocalEnviron(Environ):
def __init__(self) -> None:
self._environ = os.environ
def __getitem__(self, key: str) -> str:
return self._environ.__getitem__(key)
def __setitem__(self, key: str, item: str) -> None:
self._environ.__setitem__(key, item)
def __delitem__(self, key: str) -> None:
self._environ.__delitem__(key)
def __iter__(self) -> Iterator[str]:
return self._environ.__iter__()
def __len__(self) -> int:
return self._environ.__len__()
class SubprocessError(subprocess.CalledProcessError):
""" Custom version that also prints stderr for debugging"""
def __init__(self, platform: Platform,
process: subprocess.CompletedProcess) -> None:
self.platform = platform
super().__init__(process.returncode, shlex.join(map(str, process.args)),
process.stdout, process.stderr)
def __str__(self) -> str:
super_str = super().__str__()
if not self.stderr:
return f"{self.platform}: {super_str}"
return f"{self.platform}: {super_str}\nstderr:{self.stderr.decode()}"
@dataclasses.dataclass
class CPUFreqInfo:
min: float
max: float
current: float
_NEXT_PLATFORM_ID = 0
def _next_id() -> int:
global _NEXT_PLATFORM_ID # noqa: PLW0603
new_id = _NEXT_PLATFORM_ID
_NEXT_PLATFORM_ID += 1
return new_id
DEFAULT_CACHE_DIR: Final = pth.LocalPath(__file__).parents[2] / "cache"
class Platform(abc.ABC):
def __init__(self) -> None:
self._id: Final[int] = _next_id()
self._binary_lookup_override: dict[str, pth.AnyPath] = {}
self._cache_dir_root: pth.AnyPath | None = None
self._default_port_manager: Final[PortManager] = self._create_port_manager()
self._default_tmp_dir: Final[pth.AnyPath] = self._create_default_tmp_dir()
def _create_port_manager(self) -> PortManager:
return LocalPortManager(self)
def _create_default_tmp_dir(self) -> pth.AnyPath:
self.assert_is_local()
return self.path(tempfile.gettempdir())
def assert_is_local(self) -> None:
if self.is_local:
return
caller = "assert_is_local"
caller = inspect.stack()[1].function
raise RuntimeError(f"{type(self).__name__}.{caller}(...) is not supported "
"on remote platform")
@property
@abc.abstractmethod
def signals(self) -> Type[AnySignals]:
pass
@property
def id(self) -> int:
return self._id
@property
def unique_name(self) -> str:
"""Unique id per platform."""
key_str = ".".join(self.key)
remote_str = "remote" if self.is_remote else "local"
return f"{key_str}.{remote_str}.{self.id}"
@property
@abc.abstractmethod
def name(self) -> str:
"""Descriptive name e.g. macos of the platform. non-unique."""
@property
@abc.abstractmethod
def version_str(self) -> str:
pass
@property
@abc.abstractmethod
def version(self) -> PlatformVersion:
pass
@property
def version_parts(self) -> tuple[int, ...]:
return self.version.parts
@property
@abc.abstractmethod
def model(self) -> str:
pass
@property
@abc.abstractmethod
def cpu(self) -> str:
pass
@functools.lru_cache(maxsize=2)
def cpu_cores(self, logical: bool) -> int:
self.assert_is_local()
if cores := psutil.cpu_count(logical=logical):
return cores
return 0
@property
def full_version(self) -> str:
return f"{self.name} {self.version_str} {self.machine}"
def __str__(self) -> str:
return self.unique_name
@property
def is_remote(self) -> bool:
return False
@property
def is_local(self) -> bool:
return not self.is_remote
@property
def host_platform(self) -> Platform:
return self
@functools.cached_property
def machine(self) -> MachineArch:
raw = self._raw_machine_arch()
if raw in ("i386", "i686", "x86", "ia32"):
return MachineArch.IA32
if raw in ("x86_64", "AMD64"):
return MachineArch.X64
if raw in ("arm64", "aarch64"):
return MachineArch.ARM_64
if raw in ("arm"):
return MachineArch.ARM_32
raise NotImplementedError(f"Unsupported machine type: {raw}")
@functools.lru_cache(maxsize=1)
def _raw_machine_arch(self) -> str:
self.assert_is_local()
return py_platform.machine()
@property
def is_ia32(self) -> bool:
return self.machine == MachineArch.IA32
@property
def is_x64(self) -> bool:
return self.machine == MachineArch.X64
@property
def is_arm64(self) -> bool:
return self.machine == MachineArch.ARM_64
@property
def key(self) -> tuple[str, str]:
"""Key used for looking up platform specific objects."""
return (self.name, str(self.machine))
@property
def is_macos(self) -> bool:
return False
@property
def is_ios(self) -> bool:
return False
@property
def is_apple(self) -> bool:
return self.is_macos or self.is_ios
@property
def is_linux(self) -> bool:
return False
@property
def is_android(self) -> bool:
return False
@property
def is_chromeos(self) -> bool:
return False
@property
def is_posix(self) -> bool:
return self.is_macos or self.is_linux or self.is_android
@property
def is_win(self) -> bool:
return False
@property
def is_remote_ssh(self) -> bool:
return False
@property
def environ(self) -> Environ:
self.assert_is_local()
return LocalEnviron()
@property
def is_battery_powered(self) -> bool:
self.assert_is_local()
if not psutil.sensors_battery: # type: ignore
return False
status = psutil.sensors_battery()
if not status:
return False
return not status.power_plugged
@functools.lru_cache(maxsize=1)
def cpu_details(self) -> dict[str, Any]:
self.assert_is_local()
details = {
"physical cores": self.cpu_cores(logical=False),
"logical cores": self.cpu_cores(logical=True),
"usage": psutil.cpu_percent(percpu=True, interval=0.1),
"total usage": psutil.cpu_percent(),
"system load": psutil.getloadavg(),
"info": self.cpu,
"min frequency": "N/A",
"max frequency": "N/A",
"current frequency": "N/A",
}
if cpu_freq := self._cpu_freq():
details.update({
"min frequency": f"{cpu_freq.min:.2f}Mhz",
"max frequency": f"{cpu_freq.max:.2f}Mhz",
"current frequency": f"{cpu_freq.current:.2f}Mhz",
})
return details
def _cpu_freq(self) -> Optional[CPUFreqInfo]:
self.assert_is_local()
cpu_freq = psutil.cpu_freq()
return CPUFreqInfo(cpu_freq.min, cpu_freq.max, cpu_freq.current)
@functools.lru_cache(maxsize=1)
def system_details(self) -> dict[str, Any]:
return {
"machine": str(self.machine),
"os": self.os_details(),
"python": self.python_details(),
"CPU": self.cpu_details(),
"display": self.display_details()
}
@functools.lru_cache(maxsize=1)
def os_details(self) -> JsonDict:
self.assert_is_local()
return {
"system": py_platform.system(),
"release": py_platform.release(),
"version": py_platform.version(),
"platform": py_platform.platform(),
}
@functools.lru_cache(maxsize=1)
def python_details(self) -> JsonDict:
self.assert_is_local()
return {
"version": py_platform.python_version(),
"bits": 64 if sys.maxsize > 2**32 else 32,
}
def display_details(self) -> tuple[DisplayInfo, ...]:
# TODO: implement on more platforms
return ()
def get_relative_cpu_speed(self) -> float:
return 1
def is_thermal_throttled(self) -> bool:
return self.get_relative_cpu_speed() < 1
@abc.abstractmethod
def uptime(self) -> dt.timedelta:
pass
def disk_usage(self, path: pth.AnyPathLike) -> psutil._common.sdiskusage:
return psutil.disk_usage(str(self.local_path(path)))
def cpu_usage(self) -> float:
self.assert_is_local()
return 1 - psutil.cpu_times_percent().idle / 100
def _search_executable(
self,
name: str,
macos: Sequence[str],
win: Sequence[str],
linux: Sequence[str],
lookup_callable: Callable[[pth.AnyPath], Optional[pth.AnyPath]],
) -> pth.AnyPath:
executables: Sequence[str] = []
if self.is_macos:
executables = macos
elif self.is_win:
executables = win
elif self.is_linux:
executables = linux
if not executables:
raise ValueError(f"Executable {name} not supported on {self}")
for name_or_path in executables:
path = self.local_path(name_or_path).expanduser()
binary = lookup_callable(path)
if binary and self.exists(binary):
return binary
raise ValueError(f"Executable {name} not found on {self}")
def search_app_or_executable(
self,
name: str,
macos: Sequence[str] = (),
win: Sequence[str] = (),
linux: Sequence[str] = ()
) -> pth.AnyPath:
return self._search_executable(name, macos, win, linux, self.search_app)
def search_platform_binary(
self,
name: str,
macos: Sequence[str] = (),
win: Sequence[str] = (),
linux: Sequence[str] = ()
) -> pth.AnyPath:
return self._search_executable(name, macos, win, linux, self.search_binary)
def search_app(self, app_or_bin: pth.AnyPath) -> Optional[pth.AnyPath]:
"""Look up a application bundle (macos) or binary (all other platforms) in
the common search paths.
"""
return self.search_binary(app_or_bin)
@abc.abstractmethod
def search_binary(self, app_or_bin: pth.AnyPathLike) -> Optional[pth.AnyPath]:
"""Look up a binary in the common search paths based of a path or a single
segment path with just the binary name.
Returns the location of the binary (and not the .app bundle on macOS).
"""
@abc.abstractmethod
def app_version(self, app_or_bin: pth.AnyPathLike) -> str:
pass
@property
def is_headless(self) -> bool:
return not self.has_display
@property
def has_display(self) -> bool:
"""Return a bool whether the platform has an active display.
This can be false on linux without $DISPLAY, true an all other platforms."""
return True
def sleep(self, seconds: float | dt.timedelta) -> None:
wait.sleep(seconds)
def parse_binary_path(self,
value: pth.AnyPathLike,
name: str = "value") -> pth.AnyPath:
# Helper to avoid circular imports.
return parse.PathParser.binary_path(value, self, name)
def parse_local_binary_path(self,
value: pth.AnyPathLike,
name: str = "value") -> pth.LocalPath:
self.assert_is_local()
# Helper to avoid circular imports.
return parse.PathParser.local_binary_path(value, self, name)
def which(self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
if not binary_name:
raise ValueError("Got empty path")
self.assert_is_local()
if binary_override := self.lookup_binary_override(binary_name):
return binary_override
if result := shutil.which(os.fspath(binary_name)):
return self.path(result)
return None
def lookup_binary_override(
self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
return self._binary_lookup_override.get(os.fspath(binary_name))
def set_binary_lookup_override(self, binary_name: pth.AnyPathLike,
new_path: Optional[pth.AnyPath]) -> None:
name = os.fspath(binary_name)
if new_path is None:
prev_result = self._binary_lookup_override.pop(name, None)
if prev_result is None:
logging.debug(
"Could not remove binary override for %s as it was never set",
binary_name)
return
if self.search_binary(new_path) is None:
raise ValueError(f"Suggested binary override for {repr(name)} "
f"does not exist: {new_path}")
self._binary_lookup_override[name] = new_path
@contextlib.contextmanager
def override_binary(self, binary: pth.AnyPathLike | Binary,
result: Optional[pth.AnyPath]) -> Iterator[None]:
binary_name: pth.AnyPathLike = ""
if isinstance(binary, Binary):
if override := binary.platform_path(self):
binary_name = override[0]
else:
raise RuntimeError("Cannot override binary:"
f" {binary} is not supported supported on {self}")
else:
binary_name = binary
prev_override = self.lookup_binary_override(binary_name)
self.set_binary_lookup_override(binary_name, result)
try:
yield
finally:
self.set_binary_lookup_override(binary_name, prev_override)
def send_signal(self, process: ProcessLike, signal: Signals) -> None:
self.assert_is_local()
if isinstance(process, int):
os.kill(process, signal.value)
else:
process.send_signal(signal.value)
def terminate(self, process: ProcessLike) -> None:
self._handle_process_tree(process, lambda process: process.terminate())
def kill(self, process: ProcessLike) -> None:
self._handle_process_tree(process, lambda process: process.kill())
def terminate_gracefully(self,
process: ProcessLike,
timeout: int = 1,
signal: Optional[Signals] = None) -> None:
proc_helper.terminate_gracefully(self, process, timeout, signal)
def process_pid(self, process: ProcessLike) -> int:
if isinstance(process, int):
return process
if isinstance(process, RemotePopen):
assert self.is_remote, (
f"Cannot access remote process {process} on local platform {self}")
return process.remote_pid
return process.pid
def _handle_process_tree(self, process: ProcessLike,
callback: Callable[[psutil.Process], None]) -> None:
self.assert_is_local()
try:
pid: int = self.process_pid(process)
ps_process = psutil.Process(pid)
for child_process in ps_process.children(recursive=True):
with contextlib.suppress(*proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS):
callback(child_process)
callback(ps_process)
except proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS:
pass
def processes(self,
attrs: Optional[list[str]] = None) -> list[dict[str, Any]]:
# TODO(cbruni): support remote platforms
assert self.is_local, "Only local platform supported"
return self._collect_process_dict(psutil.process_iter(attrs=attrs))
def process_running(self, process_name_list: list[str]) -> Optional[str]:
self.assert_is_local()
# TODO(cbruni): support remote platforms
for proc in psutil.process_iter(attrs=["name"]):
try:
if proc.name().lower() in process_name_list:
return proc.name()
except proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS:
pass
return None
def process_children(self,
parent_pid: int,
recursive: bool = False) -> list[dict[str, Any]]:
self.assert_is_local()
# TODO(cbruni): support remote platforms
try:
process = psutil.Process(parent_pid)
except proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS:
return []
return self._collect_process_dict(process.children(recursive=recursive))
def _collect_process_dict(
self, process_iterator: Iterable[psutil.Process]) -> list[dict[str, Any]]:
process_info_list: list[dict[str, Any]] = []
for process in process_iterator:
with contextlib.suppress(*proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS):
process_info_list.append(process.as_dict())
return process_info_list
def process_info(self, process: ProcessLike) -> Optional[dict[str, Any]]:
self.assert_is_local()
# TODO(cbruni): support remote platforms
try:
pid = self.process_pid(process)
return psutil.Process(pid).as_dict()
except proc_helper.PROCESS_NOT_FOUND_EXCEPTIONS:
return None
def process_meminfo(self, process_name: str,
timeout: dt.timedelta) -> list[ProcessMeminfo]:
del process_name, timeout
raise NotImplementedError(f"process_meminfo not implemented for {self}.")
def system_meminfo(self, timeout: dt.timedelta) -> dict[str, float]:
del timeout
raise NotImplementedError(f"system_meminfo not implemented for {self}.")
def foreground_process(self) -> Optional[dict[str, Any]]:
return None
def dump_java_heap(self, identifier: str, path: pth.AnyPath) -> None:
del identifier, path
raise NotImplementedError(f"dump_java_heap not implemented for {self}.")
@property
def default_tmp_dir(self) -> pth.AnyPath:
return self._default_tmp_dir
@property
def ports(self) -> PortScope:
return self._default_port_manager.scope
def is_port_used(self, port: int) -> bool:
self.assert_is_local()
for conn in psutil.net_connections(kind="inet"):
if conn.status == psutil.CONN_LISTEN and conn.laddr:
if conn.laddr.port == port:
return True
return False
def wait_for_port(self, port: int, timeout: dt.timedelta) -> None:
for _ in wait.wait_with_backoff(timeout):
if self.is_port_used(port):
break
def get_free_port(self) -> int:
self.assert_is_local()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("localhost", 0))
return s.getsockname()[1]
def local_cache_dir(self, name: Optional[str] = None) -> pth.LocalPath:
return self.local_path(self.cache_dir(name))
def cache_dir(self, name: Optional[str] = None) -> pth.AnyPath:
if self._cache_dir_root is None:
self._cache_dir_root = self._lazy_setup_cache_dir()
assert self._cache_dir_root, "missing cache dir"
if not name:
dir = self._cache_dir_root
else:
dir = self._cache_dir_root / pth.safe_filename(name)
self.mkdir(dir, parents=True, exist_ok=True)
return dir
def _lazy_setup_cache_dir(self) -> pth.AnyPath:
if self.is_local and DEFAULT_CACHE_DIR:
return self.local_path(DEFAULT_CACHE_DIR)
tmp_cache_dir = self.default_tmp_dir / "crossbench_cache"
atexit.register(self.rm, tmp_cache_dir, dir=True, missing_ok=True)
return tmp_cache_dir
def set_cache_dir(self, path: pth.AnyPath) -> None:
self._cache_dir_root = self.path(path)
self.mkdir(path, parents=True)
def cat(self, file: pth.AnyPathLike, encoding: str = "utf-8") -> str:
"""Meow! I return the file contents as a str."""
return self.local_path(file).read_text(encoding=encoding)
def cat_bytes(self, file: pth.AnyPathLike) -> bytes:
"""Hiss! I return the file contents as bytes."""
return self.local_path(file).read_bytes()
def read_text(self, file: pth.AnyPathLike, encoding: str = "utf-8") -> str:
return self.cat(file, encoding)
def write_text(self,
file: pth.AnyPathLike,
data: str,
encoding: str = "utf-8") -> None:
self.local_path(file).write_text(data, encoding)
def read_bytes(self, file: pth.AnyPathLike) -> bytes:
return self.cat_bytes(file)
def write_bytes(self, file: pth.AnyPathLike, data: bytes) -> None:
self.local_path(file).write_bytes(data)
def pull(self, from_path: pth.AnyPath,
to_path: pth.LocalPath) -> pth.LocalPath:
""" Download / Copy a (remote) file to the local filesystem.
By default this is just a copy operation on the local filesystem.
"""
self.assert_is_local()
return self.local_path(self.copy_file(from_path, to_path))
def push(self, from_path: pth.LocalPath, to_path: pth.AnyPath) -> pth.AnyPath:
""" Copy a local file to this (remote) platform.
By default this is just a copy operation on the local filesystem.
"""
self.assert_is_local()
return self.copy_file(from_path, to_path)
def copy(self, from_path: pth.AnyPath, to_path: pth.AnyPath) -> pth.AnyPath:
""" Convenience implementation for copying local files and dirs """
if not self.exists(from_path):
raise ValueError(f"Cannot copy non-existing source path: {from_path}")
if self.is_dir(from_path):
return self.copy_dir(from_path, to_path)
return self.copy_file(from_path, to_path)
def copy_dir(self, from_path: pth.AnyPathLike,
to_path: pth.AnyPathLike) -> pth.AnyPath:
from_path = self.local_path(from_path)
to_path = self.local_path(to_path)
if from_path != to_path:
self.mkdir(to_path.parent, parents=True, exist_ok=True)
shutil.copytree(os.fspath(from_path), os.fspath(to_path))
return to_path
def copy_file(self, from_path: pth.AnyPathLike,
to_path: pth.AnyPathLike) -> pth.AnyPath:
from_path = self.local_path(from_path)
to_path = self.local_path(to_path)
if from_path != to_path:
self.mkdir(to_path.parent, parents=True, exist_ok=True)
shutil.copy2(os.fspath(from_path), os.fspath(to_path))
return to_path
def rm(self,
path: pth.AnyPathLike,
dir: bool = False,
missing_ok: bool = False) -> None:
"""Remove a single file on this platform."""
path = self.local_path(path)
if dir:
if missing_ok and not self.exists(path):
return
shutil.rmtree(os.fspath(path))
else:
path.unlink(missing_ok)
def rename(self, src_path: pth.AnyPathLike,
dst_path: pth.AnyPathLike) -> pth.AnyPath:
"""Remove a single file on this platform."""
return self.local_path(src_path).rename(dst_path)
def symlink_or_copy(self, src: pth.AnyPathLike,
dst: pth.AnyPathLike) -> pth.AnyPath:
"""Windows does not support symlinking without admin support.
Copy files on windows (see WinPlatform) but symlink everywhere else."""
assert not self.is_win, "Unsupported operation 'symlink_or_copy' on windows"
dst_path = self.local_path(dst)
dst_path.symlink_to(self.path(src))
return dst_path
def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
""""Used to convert any paths and strings to a platform specific
remote path.
For instance a remote ADB platform on windows returns posix paths:
posix_path = adb_remote_platform.patch(windows_path)
This is used when passing out platform specific paths to remote shell
commands.
"""
return self.local_path(path)
def local_path(self, path: pth.AnyPathLike) -> pth.LocalPath:
self.assert_is_local()
return pth.LocalPath(path)
def absolute(self, path: pth.AnyPathLike) -> pth.AnyPath:
"""Convert an arbitrary path to a platform-specific absolute path"""
platform_path: pth.AnyPath = self.path(path)
if self.is_local:
return self.local_path(platform_path).absolute()
if platform_path.is_absolute():
return platform_path
raise RuntimeError(
f"Converting relative to absolute paths is not supported on {self}")
def is_absolute(self, path: pth.AnyPathLike) -> bool:
path = self.path(path)
return path.is_absolute()
def home(self) -> pth.AnyPath:
return pathlib.Path.home()
def touch(self, path: pth.AnyPathLike) -> None:
self.local_path(path).touch(exist_ok=True)
def mkdir(self,
path: pth.AnyPathLike,
parents: bool = True,
exist_ok: bool = True) -> None:
self.local_path(path).mkdir(parents=parents, exist_ok=exist_ok)
def mkdtemp(self,
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
self.assert_is_local()
return self.path(tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir))
def mktemp(self,
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
self.assert_is_local()
fd, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
os.close(fd)
return self.path(name)
@contextlib.contextmanager
def NamedTemporaryFile( # noqa: N802
self,
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[pth.AnyPathLike] = None) -> Iterator[pth.AnyPath]:
tmp_file: pth.AnyPath = self.mktemp(suffix, prefix, dir)
try:
yield tmp_file
finally:
self.rm(tmp_file, missing_ok=True)
@contextlib.contextmanager
def TemporaryDirectory( # noqa: N802
self,
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[pth.AnyPathLike] = None) -> Iterator[pth.AnyPath]:
tmp_dir = self.mkdtemp(suffix, prefix, dir)
try:
yield tmp_dir
finally:
self.rm(tmp_dir, dir=True, missing_ok=True)
def exists(self, path: pth.AnyPathLike) -> bool:
return self.local_path(path).exists()
def is_file(self, path: pth.AnyPathLike) -> bool:
return self.local_path(path).is_file()
def is_dir(self, path: pth.AnyPathLike) -> bool:
return self.local_path(path).is_dir()
def iterdir(self,
path: pth.AnyPathLike) -> Generator[pth.AnyPath, None, None]:
return self.local_path(path).iterdir()
def glob(self, path: pth.AnyPathLike,
pattern: str) -> Generator[pth.AnyPath, None, None]:
# TODO: support remotely
return self.local_path(path).glob(pattern)
def chmod(self, path: pth.AnyPathLike, mode: int) -> None:
self.local_path(path).chmod(mode)
def file_size(self, path: pth.AnyPathLike) -> int:
# TODO: support remotely
return self.local_path(path).stat().st_size
def last_modified(self, path: pth.AnyPathLike) -> float:
return self.local_path(path).stat().st_mtime
def sh_stdout(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
encoding: str = "utf-8",
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> str:
result = self.sh_stdout_bytes(
*args, shell=shell, quiet=quiet, stdin=stdin, env=env, check=check)
return result.decode(encoding)
def sh_stdout_bytes(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> bytes:
completed_process = self.sh(
*args,
shell=shell,
capture_output=True,
quiet=quiet,
stdin=stdin,
env=env,
check=check)
return completed_process.stdout
def validate_shell_args(self, args: TupleCmdArgs, shell: bool) -> None:
if shell and len(args) != 1:
raise ValueError("Expected single sh arg with shell=True, "
f"but got: {args}")
def popen(self,
*args: CmdArg,
bufsize: int = -1,
shell: bool = False,
stdout: ProcessIo = None,
stderr: ProcessIo = None,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False) -> subprocess.Popen:
self.assert_is_local()
self.validate_shell_args(args, shell)
if not quiet:
logging.debug("SHELL: %s", shlex.join(map(str, args)))
logging.debug("CWD: %s", pth.LocalPath.cwd())
return subprocess.Popen(
args=args,
bufsize=bufsize,
shell=shell,
stdin=stdin,
stderr=stderr,
stdout=stdout,
env=env)
def sh(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout: ProcessIo = None,
stderr: ProcessIo = None,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True) -> subprocess.CompletedProcess:
self.assert_is_local()
self.validate_shell_args(args, shell)
if not quiet:
logging.debug("SHELL: %s", shlex.join(map(str, args)))
logging.debug("CWD: %s", pth.LocalPath.cwd())
process = subprocess.run(
args=args,
shell=shell,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=env,
capture_output=capture_output,
check=False)
if check and process.returncode != 0:
raise SubprocessError(self, process)
return process
def exec_apple_script(self, script: str, *args: str) -> str:
del script, args
raise NotImplementedError("AppleScript is only available on MacOS")
def log(self, *messages: Any, level: int = 2) -> None:
message_str = " ".join(map(str, messages))
if level == 3:
level = logging.DEBUG
if level == 2:
level = logging.INFO
if level == 1:
level = logging.WARNING
if level == 0:
level = logging.ERROR
logging.log(level, message_str)
# TODO(cbruni): split into separate list_system_monitoring and
# disable_system_monitoring methods
def check_system_monitoring(self, disable: bool = False) -> bool:
del disable
return True
def download_to(self, url: str, path: pth.AnyPath) -> pth.AnyPath:
self.assert_is_local()
logging.debug("DOWNLOAD: %s\n TO: %s", url, path)
assert not self.exists(path), f"Download destination {path} exists already."
url = ObjectParser.url_str(url, schemes=("http", "https"))
try:
urllib.request.urlretrieve(url, path) # noqa: S310
except (urllib.error.HTTPError, urllib.error.URLError) as e:
raise OSError(f"Could not load {url}") from e
assert self.exists(path), (
f"Downloading {url} failed. Downloaded file {path} doesn't exist.")
return path
def download_gcs_file(self, gcs_url: str, local_path: pth.LocalPath) -> None:
blob: gcloud_blob.Blob = self.prepare_gcs_request(gcs_url)
local_path.parent.mkdir(parents=True, exist_ok=True)
blob.download_to_filename(str(local_path))
def prepare_gcs_request(self, gcs_url: str) -> gcloud_blob.Blob:
parsed = ObjectParser.url(gcs_url, schemes=("gs",))
bucket_name = parsed.netloc
object_name = parsed.path.lstrip("/")
if not bucket_name:
raise ValueError(f"Missing bucket name in URL: {gcs_url}")
client = gcloud_storage.Client(project="")
bucket = client.bucket(bucket_name)
blob: gcloud_blob.Blob = bucket.blob(object_name)
blob.reload()
return blob
def concat_files(self,
inputs: Iterable[pth.LocalPath],
output: pth.LocalPath,
prefix: str = "") -> pth.LocalPath:
self.assert_is_local()
with output.open("w", encoding="utf-8") as output_f:
if prefix:
output_f.write(prefix)
for input_file in inputs:
assert input_file.is_file()
with input_file.open(encoding="utf-8") as input_f:
shutil.copyfileobj(input_f, output_f)
return output
@contextlib.contextmanager
def wakelock(self) -> Iterator[None]:
"""
Prevent the system from going to sleep while running active.
"""
logging.debug("Missing wakelock support on %s", self)
yield
def set_main_display_brightness(self, brightness_level: int) -> None:
raise NotImplementedError(
"'set_main_display_brightness' is only available on MacOS for now")
def get_main_display_brightness(self) -> int:
raise NotImplementedError(
"'get_main_display_brightness' is only available on MacOS for now")
def set_display_refresh_rate(self,
refresh_rate: int,
retry: int = 3) -> tuple[bool, str]:
raise NotImplementedError(
"'set_display_refresh_rate' is only available on MacOS for now")
def check_autobrightness(self) -> bool:
raise NotImplementedError(
"'check_autobrightness' is only available on MacOS for now")
def screenshot(self, result_path: pth.AnyPath) -> None:
# TODO: support screen coordinates
raise NotImplementedError("'screenshot' is only available on MacOS for now")
def display_resolution(self) -> tuple[int, int]:
raise NotImplementedError(
"'display_resolution' is only available on Android and ChromeOS for "
"now")
@contextlib.contextmanager
def low_power_mode(self) -> Generator[None, Any, None]:
raise NotImplementedError("'low_power_mode' is only supported on Android")
def user_id(self) -> int:
self.assert_is_local()
return os.getuid()