blob: 9974131fd593ddf56d463a15c6bc795b6c358e71 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import collections
import dataclasses
import functools
import pathlib
import shlex
import subprocess
from typing import (TYPE_CHECKING, Any, Iterable, Mapping, MutableMapping,
Optional, Sequence)
import psutil
from typing_extensions import override
from crossbench import path as pth
from crossbench import plt
from crossbench.benchmarks.base import SubStoryBenchmark
from crossbench.cli.cli import CrossBenchCLI
from crossbench.plt.android_adb import Adb, AndroidAdbPlatform
from crossbench.plt.base import MachineArch, Platform, SubprocessError
from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform
from crossbench.plt.ios import IOSPlatform
from crossbench.plt.linux import LinuxPlatform, RemoteLinuxPlatform
from crossbench.plt.linux_ssh import LinuxSshPlatform
from crossbench.plt.macos import MacOSPlatform
from crossbench.plt.port_manager import LocalPortManager, PortManager
from crossbench.plt.process_meminfo import ProcessMeminfo
from crossbench.plt.win import WinPlatform
from crossbench.stories.story import Story
if TYPE_CHECKING:
import datetime as dt
from crossbench.plt.types import CmdArg, ListCmdArgs, TupleCmdArgs
from crossbench.runner.run import Run
from crossbench.runner.runner import Runner
GIB = 1014**3
@dataclasses.dataclass(frozen=True)
class DownloadMockData:
url: str
path: pth.AnyPath
data: bytes | None = None
class ShResult:
def __init__(self, result: str | bytes = "", success: bool = True) -> None:
if isinstance(result, str):
result = result.encode("utf-8")
assert isinstance(result, bytes)
self._result = result
self._success = success
@property
def result(self) -> bytes:
return self._result
@property
def stdout(self) -> bytes:
return self.result
@property
def success(self) -> bool:
return self._success
class TrackingPortManagerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forwarded_ports: dict[int, int] = {}
self.reverse_forwarded_ports: dict[int, int] = {}
def forward(self, local_port: int, remote_port: int) -> int:
local_port = super().forward(local_port, remote_port)
self.forwarded_ports[local_port] = remote_port
return local_port
def stop_forward(self, local_port: int) -> None:
if local_port in self.forwarded_ports:
del self.forwarded_ports[local_port]
def reverse_forward(self, remote_port: int, local_port: int) -> int:
remote_port = super().reverse_forward(remote_port, local_port)
self.reverse_forwarded_ports[remote_port] = local_port
return remote_port
def stop_reverse_forward(self, remote_port: int) -> None:
if remote_port in self.reverse_forwarded_ports:
del self.reverse_forwarded_ports[remote_port]
class MockRemoterPortManagerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.current_port = 60000
def _next_port(self) -> int:
self.current_port += 1
return self.current_port
def reverse_forward(self, remote_port: int, local_port: int) -> int:
del local_port
if remote_port == 0:
return self._next_port()
return remote_port
def forward(self, local_port: int, remote_port: int) -> int:
del remote_port
if local_port == 0:
return self._next_port()
return local_port
class MockLocalPortManager(TrackingPortManagerMixin, LocalPortManager):
pass
class MockRemotePortManager(TrackingPortManagerMixin,
MockRemoterPortManagerMixin, PortManager):
pass
class MockPlatformMixin:
def __init__(self, *args, is_battery_powered=False, **kwargs):
self._is_battery_powered = is_battery_powered
# Cache some helper properties that might fail under pyfakefs.
self._sh_cmds: list[TupleCmdArgs] = []
self._expected_sh_cmds: list[TupleCmdArgs] | None = None
self._sh_results: list[ShResult] = []
self._download_results: list[DownloadMockData] = []
self.file_contents: MutableMapping[pth.AnyPath, list[str]] = (
collections.defaultdict(list))
self.sleeps: list[dt.timedelta] = []
self.use_mock_machine = True
self.use_mock_name = True
self.use_fs = False
self._machine_arch: [MachineArch] = None # type: ignore
self.popens: list[MockPopen] = []
self.mkdir_calls: int = 0
self.screenshots: list[pth.AnyPath] = []
super().__init__(*args, **kwargs)
@property
def has_display(self) -> bool:
return True
def _create_port_manager(self) -> PortManager:
if self.is_local:
return MockLocalPortManager(self)
return MockRemotePortManager(self)
@property
def port_manager(self) -> PortManager:
return self._default_port_manager
def os_details(self):
return {
"system": "mock os system",
"release": "mock os release",
"version": "mock os version",
"platform": "mock os platform",
}
def expect_download(self,
url: str,
path: pth.AnyPath,
data: Optional[bytes] = None):
self._download_results.append(DownloadMockData(url, path, data))
def download_to(self, url: str, path: pth.AnyPath) -> pth.AnyPath:
assert self._download_results, (
f"No more download test data, but requested: {url}")
provided_data = self._download_results.pop()
assert url == provided_data.url, (f"Expected download url {url}, "
f"but got: {provided_data.url}")
assert path == provided_data.path, (
f"Expected download result path {path}, but got: {provided_data.path}")
if provided_data.data:
pathlib.Path(path).write_bytes(provided_data.data)
else:
self.touch(path)
return path
def expect_sh(
self, *args: CmdArg | int,
result: bytes | str | ShResult = ShResult()) -> None:
if args:
if self._expected_sh_cmds is None:
self._expected_sh_cmds = []
self._expected_sh_cmds.append(self._convert_sh_args(*args))
if isinstance(result, str):
result = ShResult(result)
if isinstance(result, bytes):
result = ShResult(result)
assert isinstance(result, ShResult)
self._sh_results.append(result)
def _convert_sh_args(self, *args: CmdArg | int) -> TupleCmdArgs:
converted_args : ListCmdArgs = []
for arg in args:
if not isinstance(arg, (str, pathlib.PurePath)):
arg = str(arg)
converted_args.append(arg)
return tuple(converted_args)
@property
def sh_results(self) -> list[ShResult]:
return list(self._sh_results)
@sh_results.setter
def sh_results(self, results: Iterable[ShResult]) -> None:
assert not self._sh_results, "Trying to override non-consumed results"
assert not self._expected_sh_cmds, (
"expect_sh() cannot be used together with sh_results")
for result in results:
self.expect_sh(result=result)
@property
def sh_cmds(self) -> list[TupleCmdArgs]:
return list(self._sh_cmds)
@property
def expected_sh_cmds(self) -> Optional[list[TupleCmdArgs]]:
if self._expected_sh_cmds is None:
return None
return list(self._expected_sh_cmds)
@property
def name(self) -> str:
if self.use_mock_name:
return f"mock.{super().name}"
return super().name
@property
def machine(self) -> MachineArch:
if not self.use_mock_machine:
return super().machine
if self._machine_arch:
return self._machine_arch
return MachineArch.ARM_64
@machine.setter
def machine(self, value: MachineArch) -> None:
self._machine_arch = value
@property
def version_str(self) -> str:
return "1.2.3.4.5"
@property
def device(self) -> str:
return "TestBook Pro"
@property
def cpu(self) -> str:
return "Mega CPU @ 3.00GHz"
@property
def is_battery_powered(self) -> bool:
return self._is_battery_powered
def is_thermal_throttled(self) -> bool:
return False
def disk_usage(self, path: pth.AnyPathLike) -> psutil._common.sdiskusage:
del path
# pylint: disable=protected-access
return psutil._common.sdiskusage(
total=GIB * 100, used=20 * GIB, free=80 * GIB, percent=20)
def cpu_usage(self) -> float:
return 0.1
@functools.lru_cache(maxsize=1)
def cpu_details(self) -> dict[str, Any]:
return {"physical cores": 2, "logical cores": 4, "info": self.cpu}
def write_text(self,
file: pth.AnyPathLike,
data: str,
encoding: str = "utf-8") -> None:
file_path = self.path(file)
self.file_contents[file_path].append(data)
if self.use_fs:
super().write_text(file_path, data, encoding)
@functools.lru_cache(maxsize=1)
def system_details(self):
return {"CPU": "20-core 3.1 GHz"}
def sleep(self, duration):
self.sleeps.append(duration)
def processes(self, attrs=()):
del attrs
return []
def process_children(self, parent_pid: int, recursive=False):
del parent_pid, recursive
return []
def foreground_process(self):
return None
def search_platform_binary(
self,
name: str,
macos: Sequence[str] = (),
win: Sequence[str] = (),
linux: Sequence[str] = ()
) -> pth.AnyPath:
del macos, win, linux
return self.path(f"/usr/bin/{name}")
def sh_stdout_bytes(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
stdin=None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> bytes:
del shell, quiet, stdin, env, check
if self._expected_sh_cmds is not None:
assert self._expected_sh_cmds, (
f"Missing expected sh_cmds, but got: {args}")
# Convert all args to str first, sh accepts both str and Paths.
expected = tuple(map(str, self._expected_sh_cmds[0]))
str_args = tuple(map(str, args))
assert expected == str_args, (f"After {len(self._sh_cmds)} cmds: \n"
f" expected: {expected}\n"
f" got: {str_args}")
self._expected_sh_cmds.pop(0)
self._sh_cmds.append(args)
if not self._sh_results:
cmd = shlex.join(map(str, args))
raise ValueError(f"After {len(self._sh_cmds)} cmds: "
f"MockPlatform has no more sh outputs for cmd: {cmd}")
sh_result = self._sh_results.pop(0)
if not sh_result.success:
raise SubprocessError(self, subprocess.CompletedProcess(args, -1))
return sh_result.result
def sh(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True):
del capture_output, stderr, stdin, stdout
result = self.sh_stdout(
*args, shell=shell, quiet=quiet, env=env, check=check)
# TODO: Generalize this in the future, to mimic failing `sh` calls.
return subprocess.CompletedProcess(args, 0, stdout=result.encode("utf-8"))
def popen(self,
*args: CmdArg,
bufsize=-1,
shell: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False) -> MockPopen:
del bufsize, stdout, stderr, stdin
self.sh_stdout(*args, shell=shell, quiet=quiet, env=env)
if not self.popens:
raise ValueError("No valid mock popen.")
return self.popens.pop(0)
def mkdir(self,
path: pth.AnyPathLike,
parents: bool = True,
exist_ok: bool = True) -> None:
super().mkdir(path, parents, exist_ok)
self.mkdir_calls += 1
def process_meminfo(self, process_name: str,
timeout: dt.timedelta) -> list[ProcessMeminfo]:
del timeout
return [
ProcessMeminfo(1, process_name, 2, 3, 4),
ProcessMeminfo(2, process_name, 3, 4, 5),
]
def system_meminfo(self, timeout: dt.timedelta) -> dict[str, float]:
del timeout
return {
"total_ram_kb": 5,
"cached_pss_kb": 4,
"cached_kernel_kb": 3,
"free_kb": 2,
}
@override
def screenshot(self, result_path: pth.AnyPath) -> None:
self.screenshots.append(result_path)
class MockFd:
def __init__(self):
self.expected_writes: list[bytes] = []
self.read_returns: list[bytes] = []
def __del__(self):
assert not self.expected_writes
assert not self.read_returns
def write(self, data: bytes):
if not self.expected_writes:
raise ValueError("No expected writes.")
expected = self.expected_writes.pop(0)
assert data == expected, (
f"Expected write does not match. Expected: {expected} Got: {data!r}")
def readline(self):
if not self.read_returns:
raise ValueError("No read returns.")
return self.read_returns.pop(0)
def flush(self):
return
class MockPopen:
def __init__(self, stdout: MockFd, stdin: MockFd):
self._stdout: MockFd = stdout
self._stdin: MockFd = stdin
def poll(self):
return
def kill(self):
return
def wait(self):
return
@property
def stdin(self):
return self._stdin
@property
def stdout(self):
return self._stdout
class PosixMockPlatformMixin(MockPlatformMixin):
pass
class WinMockPlatformMixin(MockPlatformMixin):
# TODO: use wrapper fake path to get windows-path formatting by default
# when running on posix.
def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
return pathlib.PureWindowsPath(path)
class LinuxMockPlatform(PosixMockPlatformMixin, LinuxPlatform):
pass
class RemoteLinuxMockPlatform(PosixMockPlatformMixin, RemoteLinuxPlatform):
pass
class LinuxSshMockPlatform(PosixMockPlatformMixin, LinuxSshPlatform):
pass
class ChromeOsSshMockPlatform(PosixMockPlatformMixin, ChromeOsSshPlatform):
pass
class MacOsMockPlatform(PosixMockPlatformMixin, MacOSPlatform):
pass
class MacIOSMockPlatform(PosixMockPlatformMixin, IOSPlatform):
pass
class WinMockPlatform(WinMockPlatformMixin, WinPlatform):
pass
class MockAdb(Adb):
@override
def start_server(self) -> None:
pass
@override
def stop_server(self) -> None:
pass
@override
def kill_server(self) -> None:
pass
class AndroidAdbMockPlatform(MockPlatformMixin, AndroidAdbPlatform):
pass
class GenericMockPlatform(MockPlatformMixin, Platform):
pass
if plt.PLATFORM.is_linux:
MockPlatform = LinuxMockPlatform
elif plt.PLATFORM.is_macos:
MockPlatform = MacOsMockPlatform
elif plt.PLATFORM.is_win:
MockPlatform = WinMockPlatform
else:
raise RuntimeError(f"Unsupported platform: {plt.PLATFORM}")
class MockStory(Story):
@classmethod
@override
def all_story_names(cls):
return ["story_1", "story_2"]
def run(self, run: Run) -> None:
pass
class MockBenchmark(SubStoryBenchmark):
NAME = "mock-benchmark"
DEFAULT_STORY_CLS = MockStory
class MockCLI(CrossBenchCLI):
runner: Runner
platform: Platform
def __init__(self, platform: Platform, enable_logging: bool = True) -> None:
self.platform = platform
super().__init__(enable_logging=enable_logging)