blob: 46bf8092ed9bc3f4fe464eb9d73e6f634e680934 [file] [log] [blame]
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility class, extending subprocess.Popen"""
import errno
import io
import logging
import select
import signal
import subprocess
import sys
import time
import build_common
from util import nonblocking_io
class _BlackHoleOutputHandler(object):
"""Does nothing with any input/timeout callbacks made."""
def handle_stderr(self, text):
pass
def handle_stdout(self, text):
pass
def handle_timeout(self):
pass
def is_done(self):
return True
def _handle_stream_output(reader, handler):
if reader.closed:
return False
read = False
try:
for line in reader:
handler(line)
read = True
reader.close() # EOF is found.
except io.BlockingIOError:
# All available lines are read. No more line is available for now.
pass
return read
class Popen(subprocess.Popen):
"""Extends subprocess.Popen to run a process and filter its output. """
_SHUTDOWN_WAIT_SECONDS = 5
_MIN_TIMEOUT_SECONDS = 1
_STATE_RUNNING = 0 # Running normally
_STATE_TIMED_OUT = 1 # Timeout detected
_STATE_SENDING_SIGTERM = 2 # Sending SIGTERM
_STATE_SENDING_SIGKILL = 3 # Sending SIGKILL
_STATE_ABANDON = 4 # Tried killing, but did not seem to shut down.
_STATE_FINISHED = 5 # Shutdown, though might have been SIGTERM'd
_SHUTDOWN_STATES = (_STATE_SENDING_SIGKILL, _STATE_SENDING_SIGTERM)
def __init__(self, args, stdin=None, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, **kwargs):
assert not kwargs.get('shell', False), (
'We do not expect to run process with shell.')
assert kwargs.get('bufsize', 0) == 0, (
'buffering should be disabled.')
try:
super(Popen, self).__init__(
args, stdout=stdout, stderr=stderr, stdin=stdin, **kwargs)
except:
logging.error('Popen for args %s failed', args)
raise
logging.info('Created pid %d; the command follows:', self.pid)
build_common.log_subprocess_popen(args, **kwargs)
self._initialize_state()
def _initialize_state(self):
if self.stdout:
self.stdout = nonblocking_io.LineReader(self.stdout)
if self.stderr:
self.stderr = nonblocking_io.LineReader(self.stderr)
# This is the wall clock time the child needs to emit output by or be
# considered dead (timed-out). If None, there is no timeout.
self._child_output_deadline = None
# This is the rate (in seconds) at which the child needs to generate output.
# It is used to recompute _child_output_deadline.
self._child_output_timeout = None
# This is the wallclock time at which we expect the child to have completed.
# It can be None if we do not care when it finishes.
# It is set later based on the actual current time as needed if we set up a
# deadline on the call to run_process_filtering_output()
self._child_finish_deadline = None
# This is the wallclock time at which we will try the next shutdown step.
# This is initialized here intentionally to the beginning of time, and is
# updated based on the current time and a reasonable delay as each shutdown
# step is tried.
self._shutdown_deadline = None
# Sorted list of all the deadlines (wallclock times), for quickly choosing
# the next one.
self._deadlines = []
self._state = self._STATE_RUNNING # As far as we know yet.
# These are set later by run_process_filtering_output
self._output_handler = None
self._stop_on_done = False
def _are_all_pipes_closed(self):
return self.stdout.closed and (not self.stderr or self.stderr.closed)
def _close_all_pipes(self):
if not self.stdout.closed:
self.stdout.close()
if self.stderr and not self.stderr.closed:
self.stderr.close()
def _handle_output(self):
# Consume output from any streams.
if not self.stderr:
stderr_read = False
else:
stderr_read = _handle_stream_output(
self.stderr, self._output_handler.handle_stderr)
stdout_read = _handle_stream_output(
self.stdout, self._output_handler.handle_stdout)
if stderr_read or stdout_read:
self._update_child_output_deadline()
return True
if self._state == self._STATE_RUNNING:
now = time.time()
if (self._child_output_deadline and now >= self._child_output_deadline or
self._child_finish_deadline and now >= self._child_finish_deadline):
# Report a timeout
logging.debug("Process %d has timed out", self.pid)
self._output_handler.handle_timeout()
self._state = self._STATE_TIMED_OUT
return False
def update_timeout(self, timeout):
self._child_finish_deadline = time.time() + timeout
self._regenerated_deadline_list()
def _update_child_output_deadline(self):
if self._child_output_timeout is None:
self._child_output_deadline = None
else:
self._child_output_deadline = time.time() + self._child_output_timeout
self._regenerated_deadline_list()
def _update_shutdown_deadline(self):
self._shutdown_deadline = time.time() + self._SHUTDOWN_WAIT_SECONDS
self._regenerated_deadline_list()
def _regenerated_deadline_list(self):
self._deadlines = []
def add_deadline(deadline):
if deadline is not None:
self._deadlines.append(deadline)
add_deadline(self._child_output_deadline)
add_deadline(self._child_finish_deadline)
add_deadline(self._shutdown_deadline)
self._deadlines.sort()
def _find_next_deadline(self, now):
"""Returns the next deadline after 'now'.
If there are no deadlines, returns None."""
for deadline in self._deadlines:
if now < deadline:
return deadline
return None
def _compute_timeout(self, max_timeout=5):
"""Calculate the time (up to |max_timeout| seconds) before |deadline|"""
now = time.time()
deadline = self._find_next_deadline(now)
if deadline is None:
return max_timeout
else:
now = time.time()
# Clamp the timeout to a positive value less than |max_timeout|, since
# some versions of Python may throw an exception on negative values.
return max(0, min(deadline - now, max_timeout))
def _wait_for_child_output(self):
"""Waits for the child process to generate output."""
streams_to_block_reading_on = []
# Generate a list of handles to wait on for being able to read them.
# Filter out any that have been closed.
if not self.stdout.closed:
streams_to_block_reading_on.append(self.stdout)
if self.stderr and not self.stderr.closed:
streams_to_block_reading_on.append(self.stderr)
# If we have nothing to wait on, we're on our way out.
if not streams_to_block_reading_on:
assert self._are_all_pipes_closed()
return
try:
# Note: we hit some timeout case this function does not handle.
# To improve the debuggability in such a case, we assign the
# timeout value to a variable, so that util.debug module outputs
# the value to the log on failure.
timeout = self._compute_timeout()
select.select(streams_to_block_reading_on, [], [], timeout)
except select.error as e:
if e[0] == errno.EINTR:
logging.info("select has been interrupted, exit normally.")
sys.exit(0)
logging.error("select error: " + e[1])
sys.exit(-1)
def _signal_children_of_xvfb(self, signum):
# On platforms other than Linux, psutil may not exist. As such
# environment does not have xvfb-run, we can ignore the error.
# We should also ignore NoSuchProcess. This means the program has
# finished after creating psutil.Process object.
try:
import psutil
try:
proc = psutil.Process(self.pid)
if proc.name != 'xvfb-run':
return False
for child in proc.get_children():
if child.name != 'Xvfb':
child.send_signal(signum)
return True
except psutil.NoSuchProcess:
return False
except ImportError:
return False
def terminate(self):
if not self._signal_children_of_xvfb(signal.SIGTERM):
super(Popen, self).terminate()
self._terminate()
def _terminate(self):
self._update_shutdown_deadline()
self._state = self._STATE_SENDING_SIGTERM
def kill(self):
if not self._signal_children_of_xvfb(signal.SIGKILL):
super(Popen, self).kill()
self._kill()
def _kill(self):
self._update_shutdown_deadline()
self._state = self._STATE_SENDING_SIGKILL
def _is_done(self):
return (self._output_handler.is_done() or
self._state in self._SHUTDOWN_STATES)
def _advance_shutdown_state(self):
if self._stop_on_done:
# Switch to a black hole output handler, as the caller does not want any
# more output. But we still need to process it.
self._output_handler = _BlackHoleOutputHandler()
self._stop_on_done = False
if self._shutdown_deadline > time.time():
# Still waiting on the previous termination step
return True
if self._state < self._STATE_SENDING_SIGTERM:
logging.debug("Terminating process %d", self.pid)
self.terminate()
return True
elif self._state < self._STATE_SENDING_SIGKILL:
logging.error("Killing process %d", self.pid)
self.kill()
return True
return False
def run_process_filtering_output(self, output_handler, timeout=None,
output_timeout=None, stop_on_done=False):
"""Runs the process, invoking methods on output_handler as appropriate.
output_handler is expected to have the following interface:
output_handler.is_done()
Should returns true if process should be terminated. Note however
it is called only immediately after output is processed, so if
the process is not generating any output when this call would
return True, then it will not be terminated.
output_handler.handle_stdout(line)
Called whenever the process writes a line of text to stdout.
output_handler.handle_stderr(line)
Called whenever the process writes a line of text to stderr.
output_handler.handle_timeout()
Called whenever the process timeout is over.
If timeout is not None, it is a count in seconds to wait for process
termination.
if output_timeout is not None, it is the maximum count in seconds to wait
for any output activity from the child process.
If stop_on_done is True, the run loop stops trying to filter output as soon
as the output_handler signals it is done, and just waits for process
termination.
"""
for _ in self.run_process_filtering_output_generator(
output_handler, timeout=timeout, output_timeout=output_timeout,
stop_on_done=stop_on_done):
pass
def run_process_filtering_output_generator(
self, output_handler, timeout=None, output_timeout=None,
stop_on_done=False):
"""Generator version of run_process_filtering_output().
This generator yields after processing process output chunk.
"""
assert self._state == self._STATE_RUNNING
if timeout:
self.update_timeout(timeout)
if output_timeout:
self._child_output_timeout = output_timeout
self._update_child_output_deadline()
self._output_handler = output_handler
self._stop_on_done = stop_on_done
# Yield before processing any output.
yield
while not self._are_all_pipes_closed():
self._wait_for_child_output()
if not self._handle_output():
# We had no output. Check if the child process has already shut down.
# By design we ensure all output is read before doing this.
if self.poll() is not None:
self._close_all_pipes()
break
# Yield after processing the output.
yield
if self._is_done():
# Step towards shutting down the child process
if not self._advance_shutdown_state():
# If we made no progress, abandon the process in whatever state it is
# in.
self._state = self._STATE_ABANDON
logging.error("Abandoning process %d", self.pid)
return
# Wait for the normal process exit to complete, but this requires all output
# to be over, otherwise we could deadlock waiting for the child process to
# terminate, while the child process waits us to make room in the output
# pipes.
assert self._are_all_pipes_closed()
logging.debug("Waiting on process %d", self.pid)
self.wait()
self._state = self._STATE_FINISHED