blob: 90541446c407905e785ab3655756ff0322a3f7c7 [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import functools
import logging
import re
import shlex
import subprocess
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from crossbench import cli_helper
from crossbench.plt.arch import MachineArch
from crossbench.plt.posix import RemotePosixPlatform
if TYPE_CHECKING:
from crossbench.path import (LocalPath, LocalPathLike, RemotePath,
RemotePathLike)
from crossbench.plt.base import CmdArg, ListCmdArgs, Platform
from crossbench.types import JsonDict
def _find_adb_bin(platform: Platform) -> RemotePath:
adb_bin = platform.search_platform_binary(
name="adb",
macos=["adb", "~/Library/Android/sdk/platform-tools/adb"],
linux=["adb"],
win=["adb.exe", "Android/sdk/platform-tools/adb.exe"])
if adb_bin:
return adb_bin
raise ValueError(
"Could not find adb binary."
"See https://developer.android.com/tools/adb fore more details.")
def adb_devices(
platform: Platform,
adb_bin: Optional[RemotePath] = None) -> Dict[str, Dict[str, str]]:
adb_bin = adb_bin or _find_adb_bin(platform)
output = platform.sh_stdout(adb_bin, "devices", "-l")
raw_lines = output.strip().splitlines()[1:]
result: Dict[str, Dict[str, str]] = {}
for line in raw_lines:
serial_id, details = line.split(" ", maxsplit=1)
result[serial_id.strip()] = _parse_adb_device_info(details.strip())
return result
def _parse_adb_device_info(value: str) -> Dict[str, str]:
parts = value.split(" ")
assert parts[0], "device"
return dict(part.split(":") for part in parts[1:])
class Adb:
_serial_id: str
_device_info: Dict[str, str]
_adb_bin: RemotePath
def __init__(self,
host_platform: Platform,
device_identifier: Optional[str] = None,
adb_bin: Optional[RemotePath] = None) -> None:
self._host_platform = host_platform
if adb_bin:
self._adb_bin = cli_helper.parse_binary_path(
adb_bin, platform=host_platform)
else:
self._adb_bin = _find_adb_bin(host_platform)
self.start_server()
self._serial_id, self._device_info = self._find_serial_id(device_identifier)
logging.debug("ADB Selected device: %s %s", self._serial_id,
self._device_info)
assert self._serial_id
def _find_serial_id(
self,
device_identifier: Optional[str] = None) -> Tuple[str, Dict[str, str]]:
devices = self.devices()
if not devices:
raise ValueError("adb could not find any attached devices."
"Connect your device and use 'adb devices' to list all.")
if device_identifier is None:
if len(devices) != 1:
raise ValueError(
f"Too many adb devices attached, please specify one of: {devices}")
device_identifier = list(devices.keys())[0]
if not device_identifier:
raise ValueError(f"Invalid device identifier: {repr(device_identifier)}")
if device_identifier in devices:
return device_identifier, devices[device_identifier]
matches: List[str] = []
under_name = device_identifier.replace(" ", "_")
for key, device_info in devices.items():
for _, info_value in device_info.items():
if device_identifier in info_value or (under_name in info_value):
matches.append(key)
if not matches:
raise ValueError(
f"Could not find adb device matching: '{device_identifier}'")
if len(matches) > 1:
raise ValueError(
f"Found {len(matches)} adb devices matching: '{device_identifier}'.\n"
f"Choices: {matches}")
return matches[0], devices[matches[0]]
def __str__(self) -> str:
info = f"info='{self._device_info}'"
if model := self._device_info.get("model"):
info = f"model={repr(model)}"
return f"adb(device_id={repr(self._serial_id)}, {info})"
def has_root(self) -> bool:
return self.shell_stdout("id").startswith("uid=0(root)")
def path(self, path: RemotePathLike) -> RemotePath:
return self._host_platform.path(path)
@property
def serial_id(self) -> str:
return self._serial_id
@functools.cached_property
def build_version(self) -> int:
return int(self.getprop("ro.build.version.release"))
@property
def device_info(self) -> Dict[str, str]:
return self._device_info
def popen(self,
*args: CmdArg,
shell: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False) -> subprocess.Popen:
del shell
assert not env, "ADB does not support setting env vars."
if not quiet:
logging.debug("SHELL: %s", shlex.join(map(str, args)))
adb_cmd: ListCmdArgs = [self._adb_bin, "-s", self._serial_id, "shell"]
adb_cmd.extend(args)
return self._host_platform.popen(
*adb_cmd, stdout=stdout, stderr=stderr, stdin=stdin)
def _adb(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True,
use_serial_id: bool = True) -> subprocess.CompletedProcess:
del shell
adb_cmd: ListCmdArgs = []
if use_serial_id:
adb_cmd = [self._adb_bin, "-s", self._serial_id]
else:
adb_cmd = [self._adb_bin]
adb_cmd.extend(args)
return self._host_platform.sh(
*adb_cmd,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
def _adb_stdout(self,
*args: CmdArg,
quiet: bool = False,
encoding: str = "utf-8",
use_serial_id: bool = True,
check: bool = True) -> str:
adb_cmd: ListCmdArgs = []
if use_serial_id:
adb_cmd = [self._adb_bin, "-s", self._serial_id]
else:
adb_cmd = [self._adb_bin]
adb_cmd.extend(args)
return self._host_platform.sh_stdout(
*adb_cmd, quiet=quiet, encoding=encoding, check=check)
def shell_stdout(self,
*args: CmdArg,
quiet: bool = False,
encoding: str = "utf-8",
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> str:
# -e: choose escape character, or "none"; default '~'
# -n: don't read from stdin
# -T: disable pty allocation
# -t: allocate a pty if on a tty (-tt: force pty allocation)
# -x: disable remote exit codes and stdout/stderr separation
if env:
raise ValueError("ADB shell only supports an empty env for now.")
# Need to escape spaces in args for adb shell
args = map(lambda x: str(x).replace(" ", "\\ "), args)
return self._adb_stdout(
"shell", *args, quiet=quiet, encoding=encoding, check=check)
def shell(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = True) -> subprocess.CompletedProcess:
# See shell_stdout for more `adb shell` options.
adb_cmd: ListCmdArgs = ["shell", *args]
return self._adb(
*adb_cmd,
shell=shell,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
def start_server(self) -> None:
self._adb_stdout("start-server", use_serial_id=False)
def stop_server(self) -> None:
self.kill_server()
def kill_server(self) -> None:
self._adb_stdout("kill-server", use_serial_id=False)
def devices(self) -> Dict[str, Dict[str, str]]:
return adb_devices(self._host_platform, self._adb_bin)
def reverse(self, remote: int, local: int, protocol: str = "tcp") -> None:
self._adb("reverse", f"{protocol}:{remote}", f"{protocol}:{local}")
def reverse_remove(self, remote: int, protocol: str = "tcp") -> None:
self._adb("reverse", "--remove", f"{protocol}:{remote}")
def pull(self, device_src_path: RemotePath,
local_dest_path: LocalPath) -> None:
self._adb("pull", self.path(device_src_path), local_dest_path)
def push(self, local_src_path: LocalPath,
device_dest_path: RemotePath) -> None:
self._adb("push", local_src_path, self.path(device_dest_path))
def cmd(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["cmd", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding)
def dumpsys(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["dumpsys", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding)
def getprop(self,
*args: str,
quiet: bool = False,
encoding: str = "utf-8") -> str:
cmd: ListCmdArgs = ["getprop", *args]
return self.shell_stdout(*cmd, quiet=quiet, encoding=encoding).strip()
def services(self, quiet: bool = False, encoding: str = "utf-8") -> List[str]:
lines = list(
self.cmd("-l", quiet=quiet, encoding=encoding).strip().splitlines())
lines = lines[1:]
lines.sort()
return [line.strip() for line in lines]
def packages(self, quiet: bool = False, encoding: str = "utf-8") -> List[str]:
# adb shell cmd package list packages
raw_list = self.cmd(
"package", "list", "packages", quiet=quiet,
encoding=encoding).strip().splitlines()
packages = [package.split(":", maxsplit=2)[1] for package in raw_list]
packages.sort()
return packages
def force_stop(self, package_name: str) -> None:
if not package_name:
raise ValueError("Got empty package name")
self.shell("am", "force-stop", package_name)
def install(self,
bundle: LocalPath,
allow_downgrade: bool = False,
modules: Optional[str] = None) -> None:
if bundle.suffix == ".apks":
self.install_apks(bundle, allow_downgrade, modules)
if bundle.suffix == ".apk":
self.install_apk(bundle, allow_downgrade)
def install_apk(self, apk: LocalPath, allow_downgrade: bool = False) -> None:
if not apk.exists():
raise ValueError(f"APK {apk} does not exist.")
args = ["install"]
if allow_downgrade:
args.append("-d")
args.append(str(apk))
self._adb(*args)
def install_apks(self,
apks: LocalPath,
allow_downgrade: bool = False,
modules: Optional[str] = None) -> None:
if not apks.exists():
raise ValueError(f"APK {apks} does not exist.")
cmd = [
"bundletool",
"install-apks",
f"--apks={apks}",
f"--device-id={self._serial_id}",
]
if allow_downgrade:
cmd.append("--allow-downgrade")
if modules:
cmd.append(f"--modules={modules}")
self._host_platform.sh(*cmd)
def uninstall(self, package_name: str, missing_ok: bool = False) -> None:
if not package_name:
raise ValueError("Got empty package name")
try:
self._adb("uninstall", package_name)
except Exception as e: # pylint: disable=broad-except
if missing_ok:
logging.debug("Could not uninstall %s: %s", package_name, e)
else:
raise
def grant_notification_permissions(self, package_name: str) -> None:
if self.build_version < 13:
# Notification permission setting is needed for Android 13 and above.
# https://developer.android.com/develop/ui/views/notifications/notification-permission # pylint: disable=line-too-long
return
if not package_name:
raise ValueError("Got empty package name")
cmd: ListCmdArgs = ["pm", "grant"]
if self.build_version >= 14:
user = self.cmd("user", "get-main-user").strip()
cmd.extend(["--user", user])
cmd.extend([package_name, "android.permission.POST_NOTIFICATIONS"])
self.shell(*cmd)
class AndroidAdbPlatform(RemotePosixPlatform):
def __init__(self,
host_platform: Platform,
device_identifier: Optional[str] = None,
adb: Optional[Adb] = None) -> None:
super().__init__(host_platform)
self._system_details: Optional[Dict[str, Any]] = None
self._cpu_details: Optional[Dict[str, Any]] = None
assert not host_platform.is_remote, (
"adb on remote platform is not supported yet")
self._adb = adb or Adb(host_platform, device_identifier)
@property
def is_android(self) -> bool:
return True
@property
def name(self) -> str:
return "android"
@functools.cached_property
def version(self) -> str: #pylint: disable=invalid-overridden-method
return str(self.adb.build_version)
@functools.cached_property
def device(self) -> str: #pylint: disable=invalid-overridden-method
return self.adb.getprop("ro.product.model")
@functools.cached_property
def cpu(self) -> str: #pylint: disable=invalid-overridden-method
variant = self.adb.getprop("dalvik.vm.isa.arm.variant")
platform = self.adb.getprop("ro.board.platform")
cpu_str = f"{variant} {platform}"
if cores_info := self._get_cpu_cores_info():
cpu_str = f"{cpu_str} {cores_info}"
return cpu_str
@property
def adb(self) -> Adb:
return self._adb
_MACHINE_ARCH_LOOKUP = {
"arm64-v8a": MachineArch.ARM_64,
"armeabi-v7a": MachineArch.ARM_32,
"x86": MachineArch.IA32,
"x86_64": MachineArch.X64,
}
@functools.cached_property
def machine(self) -> MachineArch: #pylint: disable=invalid-overridden-method
cpu_abi = self.adb.getprop("ro.product.cpu.abi")
arch = self._MACHINE_ARCH_LOOKUP.get(cpu_abi, None)
if not arch:
raise ValueError(f"Unknown android CPU ABI: {cpu_abi}")
return arch
def app_path_to_package(self, app_path: RemotePathLike) -> str:
path = self.path(app_path)
if len(path.parts) > 1:
raise ValueError(f"Invalid android package name: '{path}'")
package: str = path.parts[0]
packages = self.adb.packages()
if package not in packages:
raise ValueError(f"Package '{package}' is not installed on {self._adb}")
return package
def search_binary(self, app_or_bin: RemotePathLike) -> Optional[RemotePath]:
app_or_bin_path = self.path(app_or_bin)
if not app_or_bin_path.parts:
raise ValueError("Got empty path")
if result_path := self.which(app_or_bin_path):
return result_path
if str(app_or_bin) in self.adb.packages():
return app_or_bin_path
return None
def home(self) -> RemotePath:
raise RuntimeError("Cannot access home dir on (non-rooted) android device")
_VERSION_NAME_RE = re.compile(r"versionName=(?P<version>.+)")
def app_version(self, app_or_bin: RemotePathLike) -> str:
# adb shell dumpsys package com.chrome.canary | grep versionName -C2
package = self.app_path_to_package(app_or_bin)
package_info = self.adb.dumpsys("package", str(package))
match_result = self._VERSION_NAME_RE.search(package_info)
if match_result is None:
raise ValueError(
f"Could not find version for '{package}': {package_info}")
return match_result.group("version")
def process_children(self,
parent_pid: int,
recursive: bool = False) -> List[Dict[str, Any]]:
# TODO: implement
return []
def foreground_process(self) -> Optional[Dict[str, Any]]:
# adb shell dumpsys activity activities
# TODO: implement
return None
def get_relative_cpu_speed(self) -> float:
# TODO figure out
return 1.0
def python_details(self) -> JsonDict:
# Python is not available on android.
return {}
def os_details(self) -> JsonDict:
# TODO: add more info
return {"version": self.version}
def check_autobrightness(self) -> bool:
# adb shell dumpsys display
# TODO: implement.
return True
_BRIGHTNESS_RE = re.compile(
r"mLatestFloatBrightness=(?P<brightness>[0-9]+\.[0-9]+)")
def get_main_display_brightness(self) -> int:
display_info: str = self.adb.shell_stdout("dumpsys", "display")
match_result = self._BRIGHTNESS_RE.search(display_info)
if match_result is None:
raise ValueError("Could not parse adb display brightness.")
return int(float(match_result.group("brightness")) * 100)
@property
def default_tmp_dir(self) -> RemotePath:
return self.path("/data/local/tmp/")
def sh(self,
*args: CmdArg,
shell: bool = False,
capture_output: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False,
check: bool = False) -> subprocess.CompletedProcess:
return self.adb.shell(
*args,
shell=shell,
capture_output=capture_output,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet,
check=check)
def sh_stdout(self,
*args: CmdArg,
shell: bool = False,
quiet: bool = False,
encoding: str = "utf-8",
env: Optional[Mapping[str, str]] = None,
check: bool = True) -> str:
# The shell option is not supported on adb.
del shell
return self.adb.shell_stdout(
*args, env=env, quiet=quiet, encoding=encoding, check=check)
def popen(self,
*args: CmdArg,
shell: bool = False,
stdout=None,
stderr=None,
stdin=None,
env: Optional[Mapping[str, str]] = None,
quiet: bool = False) -> subprocess.Popen:
return self.adb.popen(
*args,
shell=shell,
stdout=stdout,
stderr=stderr,
stdin=stdin,
env=env,
quiet=quiet)
def reverse_port_forward(self, remote_port: int, local_port: int) -> None:
self.adb.reverse(remote_port, local_port, protocol="tcp")
def stop_reverse_port_forward(self, remote_port: int) -> None:
self.adb.reverse_remove(remote_port, protocol="tcp")
def rsync(self, from_path: RemotePath, to_path: LocalPath) -> LocalPath:
return self.pull(from_path, to_path)
def pull(self, from_path: RemotePath, to_path: LocalPath) -> LocalPath:
device_path = self.path(from_path)
assert self.exists(device_path), (
f"Source file '{from_path}' does not exist on {self}")
local_host_path = self.host_path(to_path)
local_host_path.parent.mkdir(parents=True, exist_ok=True)
self.adb.pull(device_path, local_host_path)
return to_path
def push(self, from_path: LocalPath, to_path: RemotePath) -> RemotePath:
self.adb.push(from_path, self.path(to_path))
return to_path
def set_file_contents(self,
file: RemotePathLike,
data: str,
encoding: str = "utf-8") -> None:
# self.push a tmp file with the given contents
tmp_dir: LocalPath = self.host_path(self.host_platform.mkdtemp())
try:
tmp_file = tmp_dir / "push.data"
with tmp_file.open("w", encoding=encoding) as f:
f.write(data)
self.push(tmp_file, self.path(file))
finally:
self.host_platform.rm(tmp_dir, dir=True, missing_ok=True)
def processes(self,
attrs: Optional[List[str]] = None) -> List[Dict[str, Any]]:
lines = self.sh_stdout("ps", "-A", "-o", "PID,NAME").splitlines()
if len(lines) == 1:
return []
res: List[Dict[str, Any]] = []
for line in lines[1:]:
tokens = line.strip().split(maxsplit=1)
assert len(tokens) == 2, f"Got invalid process tokens: {tokens}"
res.append({"pid": int(tokens[0]), "name": tokens[1]})
return res
def cpu_details(self) -> Dict[str, Any]:
if self._cpu_details:
return self._cpu_details
# TODO: Implement properly (i.e. remove all n/a values)
self._cpu_details = {
"info": self.cpu,
"physical cores": "n/a",
"logical cores": "n/a",
"usage": "n/a",
"total usage": "n/a",
"system load": "n/a",
"max frequency": "n/a",
"min frequency": "n/a",
"current frequency": "n/a",
}
return self._cpu_details
_GETPROP_RE = re.compile(r"^\[(?P<key>[^\]]+)\]: \[(?P<value>[^\]]+)\]$")
def _getprop_system_details(self) -> Dict[str, Any]:
details = super().system_details()
properties: Dict[str, str] = {}
for line in self.adb.shell_stdout("getprop").strip().splitlines():
result = self._GETPROP_RE.fullmatch(line)
if result:
properties[result.group("key")] = result.group("value")
details["android"] = properties
return details
def system_details(self) -> Dict[str, Any]:
if self._system_details:
return self._system_details
# TODO: Implement properly (i.e. remove all n/a values)
self._system_details = {
"machine": self.sh_stdout("uname", "-m").split()[0],
"os": {
"system": self.sh_stdout("uname", "-s").split()[0],
"release": self.sh_stdout("uname", "-r").split()[0],
"version": self.sh_stdout("uname", "-v").split()[0],
"platform": "n/a",
},
"python": {
"version": "n/a",
"bits": "n/a",
},
"CPU": self.cpu_details(),
"Android": self._getprop_system_details(),
}
return self._system_details
def screenshot(self, result_path: RemotePath) -> None:
self.sh("screencap", "-p", result_path)