# blob: 054cb48e5c1dd9b559290f6198d22e9de98cfb88 [file] [log] [blame]
# mypy: allow-untyped-defs
import os
from multiprocessing import Queue, Event
from subprocess import PIPE
from threading import Thread
from mozprocess import ProcessHandlerMixin
from . import chrome_spki_certs
from .base import Browser, ExecutorBrowser
from .base import get_timeout_multiplier # noqa: F401
from ..executors import executor_kwargs as base_executor_kwargs
from ..executors.executorcontentshell import ( # noqa: F401
ContentShellCrashtestExecutor,
ContentShellPrintRefTestExecutor,
ContentShellRefTestExecutor,
ContentShellTestharnessExecutor,
)
# Product description for content_shell. Apart from "product" and the executor
# class names, each value is the name of an object defined or imported in this
# module.
__wptrunner__ = {
    "product": "content_shell",
    "check_args": "check_args",
    "browser": "ContentShellBrowser",
    "executor": {
        "crashtest": "ContentShellCrashtestExecutor",
        "print-reftest": "ContentShellPrintRefTestExecutor",
        "reftest": "ContentShellRefTestExecutor",
        "testharness": "ContentShellTestharnessExecutor",
    },
    "browser_kwargs": "browser_kwargs",
    "executor_kwargs": "executor_kwargs",
    "env_extras": "env_extras",
    "env_options": "env_options",
    "update_properties": "update_properties",
    "timeout_multiplier": "get_timeout_multiplier",
}
def check_args(**kwargs):
    """Accept any keyword arguments and do nothing.

    No argument validation is performed for content_shell; the function
    always returns ``None``.
    """
    return None
def browser_kwargs(logger, test_type, run_info_data, config, **kwargs):
    """Build the keyword arguments used to construct the content_shell browser.

    :param logger: Unused here.
    :param test_type: Unused here.
    :param run_info_data: Unused here.
    :param config: Configuration object; ``config.ports`` is queried for a
                   ``'webtransport-h3'`` entry.
    :param kwargs: Must contain ``"binary"`` and ``"binary_args"``.
    :returns: A dict with the ``"binary"`` path and the full ``"binary_args"``
              list (the caller-supplied arguments followed by the flags added
              here).
    :raises KeyError: If ``"binary"`` or ``"binary_args"`` is missing.
    """
    # Work on a copy: appending to the caller's list would leak these flags
    # back into the shared kwargs and duplicate them on every subsequent call.
    args = list(kwargs["binary_args"] or [])

    args.append("--ignore-certificate-errors-spki-list=%s" %
                ','.join(chrome_spki_certs.IGNORE_CERTIFICATE_ERRORS_SPKI_LIST))

    webtransport_h3_port = config.ports.get('webtransport-h3')
    if webtransport_h3_port is not None:
        # Only the first configured WebTransport-over-HTTP/3 port is used.
        args.append(
            f"--origin-to-force-quic-on=web-platform.test:{webtransport_h3_port[0]}")

    # These flags are specific to content_shell - they activate web test protocol mode.
    args.append("--run-web-tests")
    args.append("-")

    return {"binary": kwargs["binary"],
            "binary_args": args}
def executor_kwargs(logger, test_type, test_environment, run_info_data,
                    **kwargs):
    """Return the executor keyword arguments for content_shell.

    Nothing product-specific is added: the result is exactly what
    ``base_executor_kwargs`` returns for the given test type, test
    environment, run info and extra keyword arguments. ``logger`` is
    accepted but not used.
    """
    return base_executor_kwargs(test_type, test_environment, run_info_data,
                                **kwargs)
def env_extras(**kwargs):
    """Return a new, empty list; all keyword arguments are ignored."""
    extras = []
    return extras
def env_options():
    """Return the environment options for content_shell.

    The mapping holds the server host address and the name of the
    testharnessreport script to use.
    """
    return dict(
        server_host="127.0.0.1",
        testharnessreport="testharnessreport-content-shell.js",
    )
def update_properties():
    """Return a two-tuple of a property-name list and a per-property mapping.

    The first element lists ``debug``, ``os`` and ``processor``; the second
    maps ``os`` to ``["version"]`` and ``processor`` to ``["bits"]``.
    NOTE(review): presumably consumed by the metadata update machinery as
    (primary properties, dependent properties) - confirm against the caller.
    """
    primary = ["debug", "os", "processor"]
    dependents = {"os": ["version"], "processor": ["bits"]}
    return primary, dependents
class ContentShellBrowser(Browser):
    """Class that represents an instance of content_shell.

    Upon startup, the stdout, stderr, and stdin pipes of the underlying content_shell
    process are connected to multiprocessing Queues so that the runner process can
    interact with content_shell through its protocol mode.
    """

    def __init__(self, logger, binary="content_shell", binary_args=None, **kwargs):
        """Record the command line; the process itself is only created by `start`.

        :param logger: Logger passed on to the `Browser` base class.
        :param binary: Path or name of the content_shell executable.
        :param binary_args: Optional iterable of extra command line arguments.
        :param kwargs: Accepted and ignored.
        """
        super().__init__(logger)
        # `None` is used as the default instead of a shared mutable list; the
        # arguments are copied so later changes by the caller cannot affect us.
        self._args = [binary] + list(binary_args or [])
        self._proc = None

    def start(self, group_metadata, **kwargs):
        """Spawn content_shell and start the IO threads bridging its pipes to queues.

        `group_metadata` and `kwargs` are not used by this implementation.
        """
        self.logger.debug("Starting content shell: %s..." % self._args[0])

        # Unfortunately we need to use the Process class directly because we do not
        # want mozprocess to do any output handling at all.
        self._proc = ProcessHandlerMixin.Process(self._args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        if os.name == "posix":
            self._proc.pgid = ProcessHandlerMixin._getpgid(self._proc.pid)
            self._proc.detached_pid = None

        self._stdout_queue = Queue()
        self._stderr_queue = Queue()
        self._stdin_queue = Queue()
        # Set by whichever reader thread (stdout or stderr) hits EOF first.
        self._io_stopped = Event()

        self._stdout_reader = self._create_reader_thread(self._proc.stdout, self._stdout_queue)
        self._stderr_reader = self._create_reader_thread(self._proc.stderr, self._stderr_queue)
        self._stdin_writer = self._create_writer_thread(self._proc.stdin, self._stdin_queue)

        # Content shell is likely still in the process of initializing. The actual waiting
        # for the startup to finish is done in the ContentShellProtocol.
        self.logger.debug("Content shell has been started.")

    def stop(self, force=False):
        """Kill content_shell and shut down the IO threads.

        :param force: If true and the first kill attempt reports a non-zero
                      result, a second kill with signal 9 is issued.
        :returns: True if the process is no longer running and all IO threads
                  have exited (or if nothing was ever started); False otherwise.
        """
        self.logger.debug("Stopping content shell...")

        if self._proc is None:
            # `start` never ran (or failed before the process was spawned), so there
            # is neither a process nor any IO thread or queue to shut down.
            self.logger.debug("Content shell has been stopped.")
            return True

        if self.is_alive():
            kill_result = self._proc.kill(timeout=5)
            # This makes sure any left-over child processes get killed.
            # See http://bugzilla.mozilla.org/show_bug.cgi?id=1760080
            if force and kill_result != 0:
                self._proc.kill(9, timeout=5)

        # We need to shut down these queues cleanly to avoid broken pipe error spam in the logs.
        # The readers end on their own once the pipes report EOF; the writer needs the
        # None sentinel to leave its loop.
        self._stdout_reader.join(2)
        self._stderr_reader.join(2)

        self._stdin_queue.put(None)
        self._stdin_writer.join(2)

        io_threads = (self._stdout_reader, self._stderr_reader, self._stdin_writer)
        if any(thread.is_alive() for thread in io_threads):
            self.logger.warning("Content shell IO threads did not shut down gracefully.")
            return False

        stopped = not self.is_alive()
        if stopped:
            self.logger.debug("Content shell has been stopped.")
        else:
            self.logger.warning("Content shell failed to stop.")

        return stopped

    def is_alive(self):
        """Return True if a process has been created and has not exited yet."""
        return self._proc is not None and self._proc.poll() is None

    def pid(self):
        """Return the pid of the content_shell process, or None if there is none."""
        return self._proc.pid if self._proc else None

    def executor_browser(self):
        """This function returns the `ExecutorBrowser` object that is used by other
        processes to interact with content_shell. In our case, this consists of the three
        multiprocessing Queues as well as an `io_stopped` event to signal when the
        underlying pipes have reached EOF.
        """
        return ExecutorBrowser, {"stdout_queue": self._stdout_queue,
                                 "stderr_queue": self._stderr_queue,
                                 "stdin_queue": self._stdin_queue,
                                 "io_stopped": self._io_stopped}

    def check_crash(self, process, test):
        """Report a crash whenever the content_shell process is not running.

        `process` and `test` are not used.
        """
        return not self.is_alive()

    def _create_reader_thread(self, stream, queue):
        """This creates (and starts) a background thread which reads lines from `stream` and
        puts them into `queue` until `stream` reports EOF.
        """
        def reader_thread(stream, queue, stop_event):
            while True:
                line = stream.readline()
                # An empty result from readline() means EOF.
                if not line:
                    break

                queue.put(line)

            stop_event.set()
            # Flush everything that is still buffered in the queue before the thread exits.
            queue.close()
            queue.join_thread()

        # Daemon thread: it must not keep the interpreter alive on its own.
        result = Thread(target=reader_thread, args=(stream, queue, self._io_stopped), daemon=True)
        result.start()
        return result

    def _create_writer_thread(self, stream, queue):
        """This creates (and starts) a background thread which gets items from `queue` and
        writes them into `stream` until it encounters a None item in the queue.
        """
        def writer_thread(stream, queue):
            while True:
                line = queue.get()
                # Any falsy item ends the loop: the None sentinel, but also an empty line.
                if not line:
                    break

                stream.write(line)
                # Flush after every item so content_shell sees each command immediately.
                stream.flush()

        # Daemon thread: it must not keep the interpreter alive on its own.
        result = Thread(target=writer_thread, args=(stream, queue), daemon=True)
        result.start()
        return result