blob: 3c14d9b5eb8bbd31f0e99d4ee90cc28b1503148b [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import contextlib
import dataclasses
import datetime as dt
import functools
import logging
import math
import re
import shlex
from typing import TYPE_CHECKING, Any, Final, Generator, Mapping, Optional
from mobly.controllers import android_device
from snippet_uiautomator import uiautomator
from typing_extensions import override
from crossbench import path as pth
from crossbench.flags.base import Flags, FlagsData
from crossbench.helper.path_finder import BundletoolFinder
from crossbench.parse import NumberParser
from crossbench.plt.arch import MachineArch
from crossbench.plt.base import SubprocessError
from crossbench.plt.bin import Binaries
from crossbench.plt.device_info import DeviceInfo
from crossbench.plt.port_manager import PortManager
from crossbench.plt.posix import PosixVersion, RemotePosixPlatform
from crossbench.plt.process_meminfo import ProcessMeminfo
from protoc import activitymanagerservice_pb2, battery_pb2, enums_pb2, \
windowmanagerservice_pb2
if TYPE_CHECKING:
import subprocess
from crossbench.plt.base import Platform
from crossbench.plt.display_info import DisplayInfo
from crossbench.plt.types import CmdArg, ListCmdArgs, ProcessIo
from crossbench.types import JsonDict
# Defines the Android permissions to be granted.
# TODO(381985595): make this configurable.
ANDROID_PERMISSIONS: Final = ("POST_NOTIFICATIONS", "CAMERA", "RECORD_AUDIO")
@dataclasses.dataclass(frozen=True)
class AndroidDeviceInfo(DeviceInfo):
model: str = ""
product: str = ""
transport_id: str = ""
@property
def serial_id(self) -> str:
return self.device_id
def _find_adb_bin(platform: Platform,
adb_bin: Optional[pth.AnyPath] = None) -> pth.AnyPath:
if adb_bin:
return platform.parse_binary_path(adb_bin)
if adb_bin := Binaries.ADB.search(platform):
return adb_bin
raise ValueError(
"Could not find adb binary."
"See https://developer.android.com/tools/adb fore more details.")
def adb_devices(
platform: Platform,
adb_bin: Optional[pth.AnyPath] = None) -> dict[str, AndroidDeviceInfo]:
adb_bin = adb_bin or _find_adb_bin(platform)
output = platform.sh_stdout(adb_bin, "devices", "-l")
raw_lines = output.strip().splitlines()[1:]
result: dict[str, AndroidDeviceInfo] = {}
for line in raw_lines:
serial_id, details_str = line.split(" ", maxsplit=1)
details: dict[str, str] = _parse_adb_device_info(details_str)
device = AndroidDeviceInfo(
device_id=serial_id.strip(),
name=details.get("device", ""),
model=details.get("model", ""),
product=details.get("product", ""),
transport_id=details.get("transport_id", ""))
result[device.serial_id] = device
return result
def _parse_adb_device_info(value: str) -> dict[str, str]:
"""
Convert a line from adb devices -l into a descriptive dictionary.
`value` is a line of output, typically:
ABCDEF01234567 device 2-1 product:shiba model:AOSP device:shiba transport_id:3
Some older versions of adb would not contain the `2-1` part.
"""
parts = value.strip().split(" ")
assert parts[0], "device"
return dict(part.split(":") for part in parts[1:] if ":" in part)
class Adb:
def __init__(self,
host_platform: Platform,
device_identifier: Optional[str] = None,
adb_bin: Optional[pth.AnyPath] = None,
bundletool: Optional[pth.AnyPath] = None) -> None:
self._host_platform: Final[Platform] = host_platform
self._adb_bin: Final[pth.AnyPath] = _find_adb_bin(host_platform, adb_bin)
self._bundletool: Final[pth.AnyPath
| None] = BundletoolFinder.find_binary(
host_platform, bundletool)
serial_id, device_info = self._start(device_identifier)
self._serial_id: Final[str] = serial_id
self._device_info: Final[AndroidDeviceInfo] = device_info
def _start(
self,
device_identifier: Optional[str] = None) -> tuple[str, AndroidDeviceInfo]:
self.start_server()
serial_id, device_info = self._find_serial_id(device_identifier)
logging.debug("ADB Selected device: %s %s", serial_id, device_info)
assert serial_id
return serial_id, device_info
def _find_serial_id(
self,
device_identifier: Optional[str] = None) -> tuple[str, AndroidDeviceInfo]:
devices = self.devices()
if not devices:
raise ValueError("adb could not find any attached devices."
"Connect your device and use 'adb devices' to list all.")
if device_identifier is None:
if len(devices) != 1:
raise ValueError(
f"Too many adb devices attached, please specify one of: {devices}")
device_identifier = list(devices.keys())[0]
if not device_identifier:
raise ValueError(f"Invalid device identifier: {repr(device_identifier)}")
if device_identifier in devices:
return device_identifier, devices[device_identifier]
matches: list[str] = []
under_name = device_identifier.replace(" ", "_")
for key, device in devices.items():
if device_identifier in device.model or under_name in device.model:
matches.append(key)
if not matches:
raise ValueError(
f"Could not find adb device matching: '{device_identifier}'")
if len(matches) > 1:
raise ValueError(
f"Found {len(matches)} adb devices matching: '{device_identifier}'.\n"
f"Choices: {matches}")
return matches[0], devices[matches[0]]
def __str__(self) -> str:
info = f"info='{self._device_info}'"
if model := self._device_info.model:
info = f"model={repr(model)}"
return f"adb(device_id={repr(self._serial_id)}, {info})"
def has_root(self) -> bool:
return self.shell_stdout("id").startswith("uid=0(root)")
def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
return pth.AnyPosixPath(path)
@property
def serial_id(self) -> str:
return self._serial_id
@functools.cached_property
def build_version(self) -> int:
return int(self.getprop("ro.build.version.release"))
@functools.cached_property
def build_description(self) -> str:
return self.getprop("ro.build.description")
@property
def device_info(self) -> AndroidDeviceInfo:
return self._device_info
def _build_adb_cmd(self,
*args: CmdArg,
use_serial_id: bool = True) -> ListCmdArgs:
adb_cmd: ListCmdArgs = [self._adb_bin]
if use_serial_id:
adb_cmd.extend(("-s", self._serial_id))
adb_cmd.extend(args)
return adb_cmd
def _adb(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout: ProcessIo = None,
stderr: ProcessIo = None,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True,
use_serial_id: bool = True) -> subprocess.CompletedProcess:
del shell
adb_cmd = self._build_adb_cmd(*args, use_serial_id=use_serial_id)
return self._host_platform.sh(
*adb_cmd,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
def _adb_stdout(self,
*args: CmdArg,
quiet: bool = False,
stdin: ProcessIo = None,
encoding: str = "utf-8",
use_serial_id: bool = True,
check: bool = True) -> str:
result = self._adb_stdout_bytes(
*args,
quiet=quiet,
stdin=stdin,
use_serial_id=use_serial_id,
check=check)
return result.decode(encoding)
def _adb_stdout_bytes(self,
*args: CmdArg,
quiet: bool = False,
stdin: ProcessIo = None,
use_serial_id: bool = True,
check: bool = True) -> bytes:
adb_cmd = self._build_adb_cmd(*args, use_serial_id=use_serial_id)
return self._host_platform.sh_stdout_bytes(
*adb_cmd, quiet=quiet, check=check, stdin=stdin)
def _get_current_user(self) -> str | None:
try:
return self.shell_stdout("am", "get-current-user").strip()
except SubprocessError as e:
logging.info(
"get-current-user failed, return code %d, stderr %s, stdout %s",
e.returncode, e.stderr, e.stdout)
return None
def build_shell_cmd(self, *args: CmdArg, shell: bool = False) -> ListCmdArgs:
self._host_platform.validate_shell_args(args, shell)
shell_cmd: ListCmdArgs = ["shell"]
if not shell:
shell_cmd.append(shlex.join(map(str, args)))
elif len(args) == 1:
shell_cmd.append(args[0])
else:
raise ValueError("Expected single sh arg with shell=True, "
f"but got: {args}")
adb_shell_cmd = self._build_adb_cmd(*shell_cmd)
return adb_shell_cmd
def shell_stdout(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
encoding: str = "utf-8",
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> str:
result = self.shell_stdout_bytes(
*args, shell=shell, quiet=quiet, stdin=stdin, env=env, check=check)
return result.decode(encoding)
def shell_stdout_bytes(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> bytes:
# -e: choose escape character, or "none"; default '~'
# -n: don't read from stdin
# -T: disable pty allocation
# -t: allocate a pty if on a tty (-tt: force pty allocation)
# -x: disable remote exit codes and stdout/stderr separation
if env:
raise ValueError("ADB shell only supports an empty env for now.")
shell_cmd = self.build_shell_cmd(*args, shell=shell)
return self._host_platform.sh_stdout_bytes(
*shell_cmd, stdin=stdin, quiet=quiet, check=check)
def shell(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout: ProcessIo = None,
stderr: ProcessIo = None,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True) -> subprocess.CompletedProcess:
if env:
raise ValueError("ADB shell only supports an empty env for now.")
# See shell_stdout for more `adb shell` options.
shell_cmd = self.build_shell_cmd(*args, shell=shell)
return self._host_platform.sh(
*shell_cmd,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
def start_server(self) -> None:
self._adb_stdout("start-server", use_serial_id=False)
def stop_server(self) -> None:
self.kill_server()
def kill_server(self) -> None:
self._adb_stdout("kill-server", use_serial_id=False)
def root(self) -> None:
self._adb("root", use_serial_id=False)
def unroot(self) -> None:
self._adb("unroot", use_serial_id=False)
def devices(self) -> dict[str, AndroidDeviceInfo]:
return adb_devices(self._host_platform, self._adb_bin)
def forward(self,
local: int,
remote: int | str,
local_protocol: str = "tcp",
remote_protocol: str = "tcp",
flags_data: FlagsData = None) -> int:
cmd_args: list[Any] = ["forward"]
if flags_data:
parsed_flags = Flags(flags_data)
cmd_args.extend(list(parsed_flags))
cmd_args.append(f"{local_protocol}:{local}")
cmd_args.append(f"{remote_protocol}:{remote}")
stdout = self._adb_stdout(*cmd_args).strip()
if not stdout:
used_ports = self._adb_stdout("forward", "--list")
raise ValueError(
f"Could not setup port-forwarding, ports in use:\n{used_ports}")
local_port = NumberParser.port_number(stdout, "local_port")
return local_port
def forward_remove(self, local: int, protocol: str = "tcp") -> None:
self._adb("forward", "--remove", f"{protocol}:{local}")
def reverse(self, remote: int, local: int, protocol: str = "tcp") -> int:
stdout = self._adb_stdout("reverse", f"{protocol}:{remote}",
f"{protocol}:{local}").strip()
if not stdout:
used_ports = self._adb_stdout("reverse", "--list")
raise ValueError("Could not setup reverse port-forwarding, "
f"ports in use:\n{used_ports}")
remote_port = NumberParser.port_number(stdout, "remote_port")
return remote_port
def reverse_remove(self, remote: int, protocol: str = "tcp") -> None:
self._adb("reverse", "--remove", f"{protocol}:{remote}")
def pull(self, device_src_path: pth.AnyPath,
local_dest_path: pth.LocalPath) -> None:
self._adb("pull", self.path(device_src_path), local_dest_path)
def push(self, local_src_path: pth.LocalPath,
device_dest_path: pth.AnyPath) -> None:
self._adb("push", local_src_path, self.path(device_dest_path))
def cmd(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["cmd", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding)
def dumpsys(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["dumpsys", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding)
def dumpsys_bytes(self, *args: str, quiet: bool = False) -> bytes:
cmd: ListCmdArgs = ["dumpsys", *args]
return self.shell_stdout_bytes(*cmd, quiet=quiet)
def getprop(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["getprop", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding).strip()
def services(self, quiet: bool = False, encoding: str = "utf-8") -> list[str]:
lines = list(
self.cmd("-l", quiet=quiet, encoding=encoding).strip().splitlines())
lines = lines[1:]
lines.sort()
return [line.strip() for line in lines]
def packages(self, quiet: bool = False, encoding: str = "utf-8") -> list[str]:
# adb shell cmd package list packages
raw_list = self.cmd(
"package", "list", "packages", quiet=quiet,
encoding=encoding).strip().splitlines()
packages = [package.split(":", maxsplit=2)[1] for package in raw_list]
packages.sort()
return packages
def force_stop(self, package_name: str) -> None:
if not package_name:
raise ValueError("Got empty package name")
self.shell("am", "force-stop", package_name)
def force_clear(self, package_name: str) -> None:
if not package_name:
raise ValueError("Got empty package name")
cmd: ListCmdArgs = ["pm", "clear"]
if user := self._get_current_user():
cmd.extend(["--user", user])
cmd.extend([package_name])
self.shell(*cmd)
def install(self,
bundle: pth.AnyPath,
allow_downgrade: bool = False,
modules: Optional[str] = None) -> None:
if bundle.suffix == ".apks":
self.install_apks(bundle, allow_downgrade, modules)
if bundle.suffix == ".apk":
self.install_apk(bundle, allow_downgrade)
def install_apk(self,
apk: pth.AnyPath,
allow_downgrade: bool = False) -> None:
if not self._host_platform.exists(apk):
raise ValueError(f"APK {apk} does not exist {self._host_platform}.")
args = ["install"]
if allow_downgrade:
args.append("-d")
args.append(str(apk))
self._adb(*args)
def install_apks(self,
apks: pth.AnyPath,
allow_downgrade: bool = False,
modules: Optional[str] = None) -> None:
if not self._host_platform.exists(apks):
raise ValueError(f"APK {apks} does not exist on {self._host_platform}.")
if self._bundletool is None:
raise RuntimeError(
"Could not find bundletool binary required for install_apks")
if self._bundletool.suffix == ".jar":
binary = ["java", "-jar", str(self._bundletool)]
else:
binary = [str(self._bundletool)]
cmd = binary + [
"install-apks",
f"--apks={apks}",
f"--adb={self._adb_bin}",
f"--device-id={self._serial_id}",
]
if allow_downgrade:
cmd.append("--allow-downgrade")
if modules:
cmd.append(f"--modules={modules}")
self._host_platform.sh(*cmd)
def uninstall(self, package_name: str, missing_ok: bool = False) -> None:
if not package_name:
raise ValueError("Got empty package name")
try:
self._adb("uninstall", package_name)
except Exception as e: # noqa: BLE001
if missing_ok:
logging.debug("Could not uninstall %s: %s", package_name, e)
else:
raise
def grant_permissions(self, package_name: str) -> None:
if self.build_version < 13:
# Notification permission setting is needed for Android 13 and above.
# https://developer.android.com/develop/ui/views/notifications/notification-permission
return
if not package_name:
raise ValueError("Got empty package name")
user: str | None = self._get_current_user()
for perm in ANDROID_PERMISSIONS:
cmd: ListCmdArgs = ["pm", "grant"]
if user:
cmd.extend(["--user", user])
cmd.extend([package_name, f"android.permission.{perm}"])
self.shell(*cmd)
class AndroidAdbPortManager(PortManager):
def __init__(self, platform: AndroidAdbPlatform, adb: Adb) -> None:
super().__init__(platform)
self._adb: Adb = adb
@property
def host_platform(self) -> Platform:
return self._platform.host_platform
@override
def forward(self, local_port: int, remote_port: int) -> int:
local_port = NumberParser.positive_zero_int(local_port, "local_port")
remote_port = NumberParser.port_number(remote_port, "remote_port")
local_port = self._adb.forward(
local_port, remote_port, local_protocol="tcp", remote_protocol="tcp")
logging.debug("Forwarded Remote Port: %s:%s <= %s:%s",
self.host_platform.name, local_port, self, remote_port)
return local_port
@override
def forward_devtools(self, local_port: int, remote_identifier: str) -> int:
local_port = NumberParser.positive_zero_int(local_port, "local_port")
local_port = self._adb.forward(
local=local_port,
remote=remote_identifier,
local_protocol="tcp",
remote_protocol="localabstract")
logging.debug("Forwarded DevTools Port: %s:%s <= %s:%s",
self.host_platform.name, local_port, self, remote_identifier)
return local_port
@override
def stop_forward(self, local_port: int) -> None:
self._adb.forward_remove(local_port, protocol="tcp")
@override
def reverse_forward(self, remote_port: int, local_port: int) -> int:
remote_port = NumberParser.positive_zero_int(remote_port, "remote_port")
local_port = NumberParser.port_number(local_port, "local_port")
remote_port = self._adb.reverse(remote_port, local_port, protocol="tcp")
logging.debug("Forwarded Local Port: %s:%s => %s:%s", self.host_platform,
local_port, self, remote_port)
return remote_port
@override
def stop_reverse_forward(self, remote_port: int) -> None:
self._adb.reverse_remove(remote_port, protocol="tcp")
class AndroidVersion(PosixVersion):
pass
class AndroidAdbPlatform(RemotePosixPlatform):
def __init__(self,
host_platform: Platform,
device_identifier: Optional[str] = None,
adb: Optional[Adb] = None) -> None:
assert not host_platform.is_remote, (
"adb on remote platform is not supported yet")
self._adb: Final[Adb] = adb or Adb(host_platform, device_identifier)
super().__init__(host_platform)
def _create_port_manager(self) -> PortManager:
return AndroidAdbPortManager(self, self._adb)
@override
def _create_default_tmp_dir(self) -> pth.AnyPath:
return self.path("/data/local/tmp/")
@property
@override
def is_android(self) -> bool:
return True
@property
@override
def name(self) -> str:
return "android"
@functools.cached_property
@override
def version_str(self) -> str:
return str(self.adb.build_version)
@functools.cached_property
@override
def version(self) -> AndroidVersion:
return AndroidVersion.parse(self.adb.build_description)
@functools.cached_property
@override
def model(self) -> str:
return self.adb.getprop("ro.product.model")
@property
def serial_id(self) -> str:
return self._adb.serial_id
@functools.cached_property
def uiautomator_device(self) -> android_device.AndroidDevice:
ad = android_device.AndroidDevice(self.serial_id)
ad.services.register(uiautomator.ANDROID_SERVICE_NAME,
uiautomator.UiAutomatorService)
return ad
@functools.cached_property
@override
def cpu(self) -> str:
variant = self.adb.getprop("dalvik.vm.isa.arm.variant")
platform = self.adb.getprop("ro.board.platform")
cpu_str = f"{variant} {platform}"
# Some android devices do not populate props for CPU info.
# In that case, fallback to attempting to parse /proc/cpuinfo
if not variant or not platform:
return super().cpu
if num_cores := self.cpu_cores(logical=False):
cpu_str = f"{cpu_str} {num_cores} cores"
return cpu_str
def cpu_usage(self) -> float:
return math.nan
@property
def adb(self) -> Adb:
return self._adb
_MACHINE_ARCH_LOOKUP: Final = {
"arm64-v8a": MachineArch.ARM_64,
"armeabi-v7a": MachineArch.ARM_32,
"x86": MachineArch.IA32,
"x86_64": MachineArch.X64,
}
@functools.cached_property
@override
def machine(self) -> MachineArch:
cpu_abi = self.adb.getprop("ro.product.cpu.abi")
arch = self._MACHINE_ARCH_LOOKUP.get(cpu_abi, None)
if not arch:
raise ValueError(f"Unknown android CPU ABI: {cpu_abi}")
return arch
@override
def get_relative_cpu_speed(self) -> float:
# TODO figure out
return 1.0
def app_path_to_package(self, app_path: pth.AnyPathLike) -> str:
path = self.path(app_path)
parts = path.parts
if len(parts) > 1:
raise ValueError(f"Invalid android package name: '{path}'")
package: str = parts[0]
packages = self.adb.packages()
if package not in packages:
raise ValueError(f"Package '{package}' is not installed on {self._adb}")
return package
@override
def search_binary(self, app_or_bin: pth.AnyPathLike) -> Optional[pth.AnyPath]:
app_or_bin_path = self.path(app_or_bin)
if not app_or_bin_path.parts:
raise ValueError("Got empty path")
if result_path := self.which(app_or_bin_path):
return result_path
if str(app_or_bin) in self.adb.packages():
return app_or_bin_path
return None
@override
def home(self) -> pth.AnyPath:
raise RuntimeError("Cannot access home dir on (non-rooted) android device")
_VERSION_NAME_RE: Final[re.Pattern] = re.compile(
r"versionName=(?P<version>.+)")
@override
def app_version(self, app_or_bin: pth.AnyPathLike) -> str:
# adb shell dumpsys package com.chrome.canary | grep versionName -C2
package = self.app_path_to_package(app_or_bin)
package_info = self.adb.dumpsys("package", str(package))
match_result = self._VERSION_NAME_RE.search(package_info)
if match_result is None:
raise ValueError(
f"Could not find version for '{package}': {package_info}")
return match_result.group("version")
@override
def process_children(self,
parent_pid: int,
recursive: bool = False) -> list[dict[str, Any]]:
# TODO: implement
return []
@override
def foreground_process(self) -> Optional[dict[str, Any]]:
# adb shell dumpsys activity activities
# TODO: implement
return None
@override
def check_autobrightness(self) -> bool:
# adb shell dumpsys display
# TODO: implement.
return True
_BRIGHTNESS_RE: Final[re.Pattern] = re.compile(
r"mLatestFloatBrightness=(?P<brightness>[0-9]+\.[0-9]+)")
@override
def get_main_display_brightness(self) -> int:
display_info: str = self.adb.dumpsys("display")
match_result = self._BRIGHTNESS_RE.search(display_info)
if match_result is None:
raise ValueError("Could not parse adb display brightness.")
return int(float(match_result.group("brightness")) * 100)
@override
def build_shell_cmd(self, *args: CmdArg, shell: bool = False) -> ListCmdArgs:
return self.adb.build_shell_cmd(*args, shell=shell)
@override
def sh(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout: ProcessIo = None,
stderr: ProcessIo = None,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True) -> subprocess.CompletedProcess:
return self.adb.shell(
*args,
shell=shell,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
@override
def sh_stdout_bytes(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
stdin: ProcessIo = None,
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> bytes:
return self.adb.shell_stdout_bytes(
*args, shell=shell, stdin=stdin, env=env, quiet=quiet, check=check)
@override
def pull(self, from_path: pth.AnyPath,
to_path: pth.LocalPath) -> pth.LocalPath:
device_path = self.path(from_path)
if not self.exists(device_path):
raise ValueError(f"Source file '{from_path}' does not exist on {self}")
local_host_path = self.host_path(to_path)
local_host_path.parent.mkdir(parents=True, exist_ok=True)
self.adb.pull(device_path, local_host_path)
return to_path
@override
def push(self, from_path: pth.LocalPath, to_path: pth.AnyPath) -> pth.AnyPath:
to_path = self.path(to_path)
self.adb.push(self.host_path(from_path), to_path)
return to_path
def _mktemp_sh(self,
is_dir: bool,
suffix: Optional[str] = None,
prefix: Optional[str] = None,
dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
temp_path = super()._mktemp_sh(is_dir, prefix=prefix, dir=dir)
if not suffix:
return temp_path
# android's mktemp does not support suffix on some platforms.
temp_path_with_suffix = temp_path.with_name(f"{temp_path.name}{suffix}")
self.rename(temp_path, temp_path_with_suffix)
return temp_path_with_suffix
@override
def processes(self,
attrs: Optional[list[str]] = None) -> list[dict[str, Any]]:
lines = self.sh_stdout("ps", "-A", "-o", "PID,NAME").splitlines()
if len(lines) == 1:
return []
res: list[dict[str, Any]] = []
for line in lines[1:]:
tokens = line.strip().split(maxsplit=1)
assert len(tokens) == 2, f"Got invalid process tokens: {tokens}"
res.append({"pid": int(tokens[0]), "name": tokens[1]})
return res
_DUMPSYS_TIMEOUT_RE: Final[re.Pattern] = re.compile(
rb"\*\*\* SERVICE '[^']+' DUMP TIMEOUT \(\d+ms\) EXPIRED \*\*\*")
@override
def process_meminfo(
self, process_name: str, timeout: dt.timedelta = dt.timedelta(seconds=10)
) -> list[ProcessMeminfo]:
timeout_ms = int(timeout / dt.timedelta(milliseconds=1))
meminfo_output: bytes = self.adb.dumpsys_bytes("-T", str(timeout_ms),
"meminfo", "--proto",
"--package", process_name)
if self._DUMPSYS_TIMEOUT_RE.search(meminfo_output):
raise TimeoutError("dumpsys meminfo timed out")
proto_dump = activitymanagerservice_pb2.MemInfoDumpProto()
proto_dump.ParseFromString(meminfo_output)
meminfos: list[ProcessMeminfo] = []
for app_process in proto_dump.app_processes:
mem_info = app_process.process_memory.total_heap.mem_info
meminfos.append(
ProcessMeminfo(
pid=app_process.process_memory.pid,
name=app_process.process_memory.process_name,
pss_total=mem_info.total_pss_kb,
rss_total=mem_info.total_rss_kb,
swap_total=mem_info.dirty_swap_pss_kb or mem_info.dirty_swap_kb))
return meminfos
_DUMPSYS_SYSTEM_TOTAL_FREE_RE: Final[re.Pattern] = re.compile(
br"Total RAM: (?P<total_ram_kb>[0-9][,0-9]*)K.*"
br"\n Free RAM: [0-9][,0-9]*K \( *"
br"(?P<cached_pss_kb>[0-9][,0-9]*)K cached pss \+ +"
br"(?P<cached_kernel_kb>[0-9][,0-9]*)K cached kernel \+ +"
br"(?P<free_kb>[0-9][,0-9]*)K free\)"
# Include other footer lines so we don't have to parse the whole output
# for optional fields.
br".*$",
re.DOTALL)
_DUMPSYS_SYSTEM_DMA_BUF_RE: Final[re.Pattern] = re.compile(
br"DMA-BUF: +(?P<dma_buf_kb>[0-9][,0-9]*)K \("
br" +(?P<dma_buf_mapped_kb>[0-9][,0-9]*)K mapped \+"
br" +(?P<dma_buf_unmapped_kb>[0-9][,0-9]*)K unmapped\)", re.DOTALL)
def _groupdict_to_system_meminfo(
self, groupdict: dict[str, bytes]) -> dict[str, float]:
return {
key: float(value.replace(b",", b""))
for [key, value] in groupdict.items()
}
def system_meminfo(
self,
timeout: dt.timedelta = dt.timedelta(seconds=10)) -> dict[str, float]:
timeout_ms = int(timeout / dt.timedelta(milliseconds=1))
# TODO: switch to proto parsing if/when DMA-BUF counters are in proto
# output.
meminfo_output: bytes = self.adb.dumpsys_bytes("-T", str(timeout_ms),
"meminfo")
if self._DUMPSYS_TIMEOUT_RE.search(meminfo_output):
raise TimeoutError("dumpsys meminfo timed out")
meminfo: dict[str, float] = {}
footer_match = self._DUMPSYS_SYSTEM_TOTAL_FREE_RE.search(meminfo_output)
if not footer_match:
raise RuntimeError("No 'Total RAM' line found in dumpsys meminfo output")
meminfo.update(self._groupdict_to_system_meminfo(footer_match.groupdict()))
dma_buf_match = self._DUMPSYS_SYSTEM_DMA_BUF_RE.search(footer_match[0])
if dma_buf_match:
meminfo.update(
self._groupdict_to_system_meminfo(dma_buf_match.groupdict()))
return meminfo
def dump_java_heap(self, identifier: str, path: pth.AnyPath) -> None:
self.sh("am", "dumpheap", identifier, self.path(path))
@functools.lru_cache(maxsize=1)
@override
def cpu_details(self) -> dict[str, Any]:
# TODO: Implement properly (i.e. remove all n/a values)
return {
"info": self.cpu,
"physical cores": self.cpu_cores(logical=False),
"logical cores": self.cpu_cores(logical=True),
"usage": "n/a",
"total usage": "n/a",
"system load": "n/a",
"min frequency": "n/a",
"max frequency": "n/a",
"current frequency": "n/a",
}
_GETPROP_RE: Final[re.Pattern] = re.compile(
r"^\[(?P<key>[^\]]+)\]: \[(?P<value>[^\]]+)\]$")
@functools.lru_cache(maxsize=1)
@override
def system_details(self) -> dict[str, Any]:
system_details = super().system_details()
system_details.update({
"Android": self._getprop_system_details(),
})
return system_details
def _getprop_system_details(self) -> dict[str, Any]:
properties: dict[str, str] = {}
for line in self.adb.shell_stdout("getprop").strip().splitlines():
result = self._GETPROP_RE.fullmatch(line)
if result:
properties[result.group("key")] = result.group("value")
return properties
@functools.lru_cache(maxsize=1)
@override
def python_details(self) -> JsonDict:
# TODO: Implement properly (i.e. remove all n/a values)
return {
"version": "n/a",
"bits": "n/a",
}
@property
@override
def is_battery_powered(self) -> bool:
battery_info_bytes = self.adb.dumpsys_bytes("battery", "--proto")
battery_info = battery_pb2.BatteryServiceDumpProto()
battery_info.ParseFromString(battery_info_bytes)
return (battery_info.plugged ==
enums_pb2.BatteryPluggedStateEnum.BATTERY_PLUGGED_NONE)
@override
def screenshot(self, result_path: pth.AnyPath) -> None:
self.sh("screencap", "-p", result_path)
_DUMPSYS_WINDOW_DISPLAYS_RE: Final[re.Pattern] = re.compile(
r" cur=(?P<x>\d+)x(?P<y>\d+) ")
@functools.lru_cache(maxsize=1)
def display_details(self) -> tuple[DisplayInfo, ...]:
return ({"resolution": self.display_resolution(), "refresh_rate": -1},)
@override
def display_resolution(self) -> tuple[int, int]:
displays_bytes = self.adb.dumpsys_bytes("window", "displays", "--proto")
displays = windowmanagerservice_pb2.WindowManagerServiceDumpProto()
displays.ParseFromString(displays_bytes)
width = (
displays.root_window_container.window_container.configuration_container
.full_configuration.window_configuration.max_bounds.right)
height = (
displays.root_window_container.window_container.configuration_container
.full_configuration.window_configuration.max_bounds.bottom)
return (width, height)
def user_id(self) -> int:
return NumberParser.any_int(self.sh_stdout("am", "get-current-user"))
@override
@contextlib.contextmanager
def low_power_mode(self) -> Generator[None, Any, None]:
try:
self.lock_screen()
self.doze()
yield
finally:
self.exit_doze()
self.unlock_screen()
def doze(self) -> None:
self.adb.dumpsys("deviceidle", "force-idle")
def exit_doze(self) -> None:
self.adb.dumpsys("deviceidle", "unforce")
self.adb.dumpsys("battery", "reset")
def lock_screen(self) -> None:
self.adb.shell("input", "keyevent", "KEYCODE_POWER")
def unlock_screen(self) -> None:
# Wake up the device
self.adb.shell("input", "keyevent", "KEYCODE_WAKEUP")
# Unlock the device
self.adb.shell("input", "keyevent", "KEYCODE_MENU")