blob: fe32376cac84352edc6eb8e267b0bb94c44221f6 [file] [log] [blame]
# Copyright 2025 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import dataclasses
import select
import time
from typing import Any, Callable, List, Optional
import dbus
from snegg.c.libei import libei
import snegg.ei as ei
class DBusService:
def __init__(self, bus: dbus.Bus, bus_name: str, interface: str) -> None:
self.bus = bus
self.bus_name = bus_name
self.interface = interface
def call(self,
path: str,
method: str,
signature: str = "",
params: List[Any] = []) -> Any:
return self.bus.call_blocking(self.bus_name, path, self.interface,
method, signature, params)
def get(self, path: str, namespace: str, property_name: str) -> Any:
return self.bus.call_blocking(self.bus_name, path,
"org.freedesktop.DBus.Properties", "Get",
"ss", [namespace, property_name],
)
class RemoteDesktop(DBusService):
def __init__(self, bus: dbus.Bus) -> None:
super().__init__(bus, "org.gnome.Mutter.RemoteDesktop",
"org.gnome.Mutter.RemoteDesktop")
def create_session(self) -> "RemoteDesktopSession":
path = self.call("/org/gnome/Mutter/RemoteDesktop", "CreateSession",
"b", [True])
return RemoteDesktopSession(self.bus, path)
class Startable(DBusService):
def __init__(self, bus: dbus.Bus, bus_name: str, path: str,
interface: str) -> None:
super().__init__(bus, bus_name, interface)
self.path = path
self.started = False
def __del__(self) -> None:
if self.started:
try:
self.call(self.path, "Stop")
except:
pass
def start(self) -> None:
self.call(self.path, "Start")
self.started = True
class RemoteDesktopSession(Startable):
def __init__(self, bus: dbus.Bus, path: str) -> None:
super().__init__(bus, "org.gnome.Mutter.RemoteDesktop", path,
"org.gnome.Mutter.RemoteDesktop.Session",
)
def id(self) -> str:
return self.get(self.path, "org.gnome.Mutter.RemoteDesktop.Session",
"SessionId")
def connect_to_eis(self) -> Any:
return self.call(self.path, "ConnectToEIS", "a{sv}", [{}])
class ScreenCast(DBusService):
def __init__(self, bus: dbus.Bus) -> None:
super().__init__(bus, "org.gnome.Mutter.ScreenCast",
"org.gnome.Mutter.ScreenCast")
def create_session(self, session_id: str) -> "ScreenCastSession":
path = self.call("/org/gnome/Mutter/ScreenCast", "CreateSession",
"a{sv}", [{"remote-desktop-session-id": session_id}],
)
return ScreenCastSession(self.bus, path)
class ScreenCastSession(DBusService):
def __init__(self, bus: dbus.Bus, path: str) -> None:
super().__init__(bus, "org.gnome.Mutter.ScreenCast",
"org.gnome.Mutter.ScreenCast.Session")
self.path = path
def record_monitor(self) -> "ScreenCastStream":
path = self.call(self.path, "RecordMonitor", "sa{sv}", ["", {}])
return ScreenCastStream(self.bus, path)
class ScreenCastStream(Startable):
def __init__(self, bus: dbus.Bus, path: str) -> None:
super().__init__(bus, "org.gnome.Mutter.ScreenCast", path,
"org.gnome.Mutter.ScreenCast.Stream",
)
@dataclasses.dataclass
class Devices:
pointer_relative: Optional[ei.Device] = None
pointer_absolute: Optional[ei.Device] = None
keyboard: Optional[ei.Device] = None
scroll: Optional[ei.Device] = None
button: Optional[ei.Device] = None
def __del__(self) -> None:
if self.pointer_relative:
self.pointer_relative.stop_emulating()
if self.pointer_absolute:
self.pointer_absolute.stop_emulating()
if self.keyboard:
self.keyboard.stop_emulating()
if self.scroll:
self.scroll.stop_emulating()
if self.button:
self.button.stop_emulating()
def ready(self) -> bool:
# Devices are considered ready when all have been received. Although
# invocations might not need them all, attempting to send input before
# all have been received doesn't work reliably. Note that the script
# might wait forever if some of these devices don't exist.
return (self.pointer_relative and self.pointer_absolute
and self.keyboard and self.scroll and self.button)
@dataclasses.dataclass
class IOLike:
fd: int
def fileno(self) -> int:
return self.fd
@dataclasses.dataclass
class Options:
executors: list[Callable[[Devices],
None]] = dataclasses.field(default_factory=list)
def connect_to_eis(fd: int, options: Options) -> None:
ctx = ei.Sender.create_for_fd(fd=IOLike(fd), name="ei-debug-events")
poll = select.poll()
poll.register(ctx.fd)
devices = Devices()
while poll.poll():
ctx.dispatch()
for e in ctx.events:
# Protect access to event_type because the getter throws for unknown
# events.
try:
event_type = e.event_type
except Exception as err:
print(err)
continue
if options.verbose:
print(e)
if event_type == ei.EventType.SEAT_ADDED:
if options.verbose:
print(e.seat)
e.seat.bind(ei.DeviceCapability.all())
elif event_type == ei.EventType.DEVICE_RESUMED:
if options.verbose:
print(e.device)
e.device.start_emulating()
if ei.DeviceCapability.POINTER in e.device.capabilities:
devices.pointer_relative = e.device
if ei.DeviceCapability.POINTER_ABSOLUTE in e.device.capabilities:
devices.pointer_absolute = e.device
if ei.DeviceCapability.SCROLL in e.device.capabilities:
devices.scroll = e.device
if ei.DeviceCapability.BUTTON in e.device.capabilities:
devices.button = e.device
if ei.DeviceCapability.KEYBOARD in e.device.capabilities:
devices.keyboard = e.device
if devices.ready():
for executor in options.executors:
executor(devices)
return
# snegg doesn't expose the scroll functions on devices
def scroll_discrete(device: ei.Device, y: int) -> None:
libei.device_scroll_discrete(device._cobject, 0, y) # pylint: disable=protected-access
device.frame()
def scroll_delta(device: ei.Device, y: int) -> None:
# For a virtual device (which we're certainly dealing with in this script),
# y is in dips. For a physical device, it would be mm.
libei.device_scroll_delta(device._cobject, 0, y) # pylint: disable=protected-access
device.frame()
class SleepAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(lambda _: time.sleep(values[0]))
class ScrollDeltaAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(lambda d: scroll_delta(d.scroll, values[0]))
class ScrollDiscreteAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(
lambda d: scroll_discrete(d.scroll, 120 * values[0]))
class MoveToAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(
lambda d: d.pointer_absolute.pointer_motion_absolute(
values[0], values[1]).frame())
class MoveByAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(lambda d: d.pointer_relative.pointer_motion(
values[0], values[1]).frame())
class ClickAction(argparse.Action):
BTN_LEFT = 0x110 # Per linux/input-event-codes.h
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
value: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(
lambda d: d.button.button_button(value + self.BTN_LEFT, True).frame(
).button_button(value + self.BTN_LEFT, False).frame())
class TypeAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(lambda d: TypeAction.exec(d.keyboard, values))
@staticmethod
def exec(keyboard: ei.Device, values: List[int]) -> None:
for value in values:
keyboard.keyboard_key(value, True).frame().keyboard_key(
value, False).frame()
class KeyDownAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(
lambda d: d.keyboard.keyboard_key(values[0], True).frame())
class KeyUpAction(argparse.Action):
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: Any,
option: Optional[str] = None) -> None:
namespace.executors.append(
lambda d: d.keyboard.keyboard_key(values[0], False).frame())
if __name__ == "__main__":
options = Options()
parser = argparse.ArgumentParser()
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="enable debug output",
)
parser.add_argument(
"--sleep",
action=SleepAction,
nargs=1,
type=float,
metavar="seconds",
help="sleep before executing the next command.",
)
parser.add_argument(
"--scroll_delta",
action=ScrollDeltaAction,
nargs=1,
type=int,
metavar="px",
help="scroll in pixels down (+ve) or down (-ve)",
)
parser.add_argument(
"--scroll_discrete",
action=ScrollDiscreteAction,
nargs=1,
type=int,
metavar="ticks",
help="scroll in ticks down (+ve) or down (-ve)",
)
parser.add_argument(
"--click",
action=ClickAction,
nargs="?",
const=0, # Left button
type=int,
metavar="button",
help="click the mouse (left button by default)",
)
parser.add_argument(
"--move_to",
action=MoveToAction,
nargs=2,
type=float,
metavar=("x", "y"),
help="move the mouse to a location",
)
parser.add_argument(
"--move_by",
action=MoveByAction,
nargs=2,
type=float,
metavar=("dx", "dy"),
help="move the mouse by an amount",
)
parser.add_argument(
"--type",
action=TypeAction,
nargs="+",
type=int,
metavar="keycode",
help="type (press and release) a sequence of keys",
)
parser.add_argument(
"--key_down",
action=KeyDownAction,
nargs=1,
type=int,
metavar="keycode",
help="press a key",
)
parser.add_argument(
"--key_up",
action=KeyUpAction,
nargs=1,
type=int,
metavar="keycode",
help="release a key",
)
args = parser.parse_args(namespace=options)
if not options.executors:
parser.error("No commands specified")
try:
session_bus = dbus.SessionBus()
remote_desktop_session = RemoteDesktop(session_bus).create_session()
session_id = remote_desktop_session.id()
eis_fd = remote_desktop_session.connect_to_eis()
screencast_session = ScreenCast(session_bus).create_session(session_id)
remote_desktop_session.start()
stream = screencast_session.record_monitor()
stream.start()
connect_to_eis(eis_fd.take(), options)
except KeyboardInterrupt:
pass