blob: a0424e631dc554e13df7d4fc2423a6f2f2907177 [file] [log] [blame]
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""subprocess42 is the answer to life the universe and everything.
It has the particularity of having a Popen implementation that can yield output
as it is produced while implementing a timeout and NOT requiring the use of
worker threads.
Example:
Wait for a child process with a timeout, send SIGTERM, wait a grace period
then send SIGKILL:
def wait_terminate_then_kill(proc, timeout, grace):
try:
return proc.wait(timeout)
except subprocess42.TimeoutExpired:
proc.terminate()
try:
return proc.wait(grace)
except subprocess42.TimeoutExpired:
proc.kill()
return proc.wait()
TODO(maruel): Add VOID support like subprocess2.
"""
import collections
import contextlib
import errno
import os
import signal
import sys
import threading
import time
import subprocess
from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
from subprocess import list2cmdline
# Default maxsize argument.
MAX_SIZE = 16384
# Set to True when inhibit_crash_dump() has been called.
_OS_ERROR_REPORTING_INHIBITED = False
if subprocess.mswindows:
import ctypes
import msvcrt # pylint: disable=F0401
from ctypes import wintypes
from ctypes import windll
# Which to be received depends on how this process was called and outside the
# control of this script. See Popen docstring for more details.
STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)
def ReadFile(handle, desired_bytes):
"""Calls kernel32.ReadFile()."""
c_read = wintypes.DWORD()
buff = wintypes.create_string_buffer(desired_bytes+1)
windll.kernel32.ReadFile(
handle, buff, desired_bytes, wintypes.byref(c_read), None)
# NULL terminate it.
buff[c_read.value] = '\x00'
return wintypes.GetLastError(), buff.value
def PeekNamedPipe(handle):
"""Calls kernel32.PeekNamedPipe(). Simplified version."""
c_avail = wintypes.DWORD()
c_message = wintypes.DWORD()
success = windll.kernel32.PeekNamedPipe(
handle, None, 0, None, wintypes.byref(c_avail),
wintypes.byref(c_message))
if not success:
raise OSError(wintypes.GetLastError())
return c_avail.value
def recv_multi_impl(conns, maxsize, timeout):
"""Reads from the first available pipe.
It will immediately return on a closed connection, independent of timeout.
Arguments:
- maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- timeout: If None, it is blocking. If 0 or above, will return None if no
data is available within |timeout| seconds.
Returns:
tuple(int(index), str(data), bool(closed)).
"""
assert conns
assert timeout is None or isinstance(timeout, (int, float)), timeout
maxsize = max(maxsize or MAX_SIZE, 1)
# TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
# for proc.stdout and proc.stderr but they are implemented as named pipes on
# Windows. Since named pipes are not waitable object, they can't be passed
# as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
# and finally WFMO(). This requires caching the events handles in the Popen
# object and remembering the pending ReadFile() calls. This will require
# some re-architecture to store the relevant event handle and OVERLAPPEDIO
# object in Popen or the file object.
start = time.time()
handles = [
(i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
]
while True:
for index, handle in handles:
try:
avail = min(PeekNamedPipe(handle), maxsize)
if avail:
return index, ReadFile(handle, avail)[1], False
except OSError:
# The pipe closed.
return index, None, True
if timeout is not None and (time.time() - start) >= timeout:
return None, None, False
# Polling rocks.
time.sleep(0.001)
else:
import fcntl # pylint: disable=F0401
import select
# Signals that mean this process should exit quickly.
STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)
def recv_multi_impl(conns, maxsize, timeout):
"""Reads from the first available pipe.
It will immediately return on a closed connection, independent of timeout.
Arguments:
- maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- timeout: If None, it is blocking. If 0 or above, will return None if no
data is available within |timeout| seconds.
Returns:
tuple(int(index), str(data), bool(closed)).
"""
assert conns
assert timeout is None or isinstance(timeout, (int, float)), timeout
maxsize = max(maxsize or MAX_SIZE, 1)
# select(timeout=0) will block, it has to be a value > 0.
if timeout == 0:
timeout = 0.001
try:
r, _, _ = select.select(conns, [], [], timeout)
except select.error:
r = None
if not r:
return None, None, False
conn = r[0]
# Temporarily make it non-blocking.
# TODO(maruel): This is not very efficient when the caller is doing this in
# a loop. Add a mechanism to have the caller handle this.
flags = fcntl.fcntl(conn, fcntl.F_GETFL)
if not conn.closed:
# pylint: disable=E1101
fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
try:
try:
data = conn.read(maxsize)
except IOError as e:
# On posix, this means the read would block.
if e.errno == errno.EAGAIN:
return conns.index(conn), None, False
raise e
if not data:
# On posix, this means the channel closed.
return conns.index(conn), None, True
return conns.index(conn), data, False
finally:
if not conn.closed:
fcntl.fcntl(conn, fcntl.F_SETFL, flags)
class TimeoutExpired(Exception):
"""Compatible with python3 subprocess."""
def __init__(self, cmd, timeout, output=None, stderr=None):
self.cmd = cmd
self.timeout = timeout
self.output = output
# Non-standard:
self.stderr = stderr
super(TimeoutExpired, self).__init__(str(self))
def __str__(self):
return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout)
class Popen(subprocess.Popen):
"""Adds timeout support on stdout and stderr.
Inspired by
http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
Unlike subprocess, yield_any(), recv_*(), communicate() will close stdout and
stderr once the child process closes them, after all the data is read.
Mutated behavior:
- args: transparently encode('utf-8') any unicode items.
- cwd: transparently encode('utf-8') if unicode.
- env: transparently encode('utf-8') any unicode keys or values.
Additional arguments:
- detached: If True, the process is created in a new process group. On
Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).
Additional members:
- start: timestamp when this process started.
- end: timestamp when this process exited, as seen by this process.
- detached: If True, the child process was started as a detached process.
- gid: process group id, if any.
- duration: time in seconds the process lasted.
Additional methods:
- yield_any(): yields output until the process terminates.
- recv_any(): reads from stdout and/or stderr with optional timeout.
- recv_out() & recv_err(): specialized version of recv_any().
"""
# subprocess.Popen.__init__() is not threadsafe; there is a race between
# creating the exec-error pipe for the child and setting it to CLOEXEC during
# which another thread can fork and cause the pipe to be inherited by its
# descendents, which will cause the current Popen to hang until all those
# descendents exit. Protect this with a lock so that only one fork/exec can
# happen at a time.
popen_lock = threading.Lock()
def __init__(self, args, **kwargs):
assert 'creationflags' not in kwargs
assert 'preexec_fn' not in kwargs, 'Use detached=True instead'
# Windows version of subprocess.Popen() really doens't like unicode. In
# practice we should use the current ANSI code page, but settle for utf-8
# across all OSes for consistency.
to_str = lambda i: i if isinstance(i, str) else i.encode('utf-8')
args = [to_str(i) for i in args]
if kwargs.get('cwd') is not None:
kwargs['cwd'] = to_str(kwargs['cwd'])
if kwargs.get('env'):
kwargs['env'] = {
to_str(k): to_str(v) for k, v in kwargs['env'].iteritems()
}
self.start = time.time()
self.end = None
self.gid = None
self.detached = kwargs.pop('detached', False)
if self.detached:
if subprocess.mswindows:
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
else:
kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
with self.popen_lock:
super(Popen, self).__init__(args, **kwargs)
self.args = args
if self.detached and not subprocess.mswindows:
try:
self.gid = os.getpgid(self.pid)
except OSError:
# sometimes the process can run+finish before we collect its pgid. fun.
pass
def duration(self):
"""Duration of the child process.
It is greater or equal to the actual time the child process ran. It can be
significantly higher than the real value if neither .wait() nor .poll() was
used.
"""
return (self.end or time.time()) - self.start
# pylint: disable=arguments-differ,redefined-builtin
def communicate(self, input=None, timeout=None):
"""Implements python3's timeout support.
Unlike wait(), timeout=0 is considered the same as None.
Raises:
- TimeoutExpired when more than timeout seconds were spent waiting for the
process.
"""
if not timeout:
return super(Popen, self).communicate(input=input)
assert isinstance(timeout, (int, float)), timeout
if self.stdin or self.stdout or self.stderr:
stdout = '' if self.stdout else None
stderr = '' if self.stderr else None
t = None
if input is not None:
assert self.stdin, (
'Can\'t use communicate(input) if not using '
'Popen(stdin=subprocess42.PIPE')
# TODO(maruel): Switch back to non-threading.
def write():
try:
self.stdin.write(input)
except IOError:
pass
t = threading.Thread(name='Popen.communicate', target=write)
t.daemon = True
t.start()
try:
if self.stdout or self.stderr:
start = time.time()
end = start + timeout
def remaining():
return max(end - time.time(), 0)
for pipe, data in self.yield_any(timeout=remaining):
if pipe is None:
raise TimeoutExpired(self.args, timeout, stdout, stderr)
assert pipe in ('stdout', 'stderr'), pipe
if pipe == 'stdout':
stdout += data
else:
stderr += data
else:
# Only stdin is piped.
self.wait(timeout=timeout)
finally:
if t:
try:
self.stdin.close()
except IOError:
pass
t.join()
else:
# No pipe. The user wanted to use wait().
self.wait(timeout=timeout)
return None, None
# Indirectly initialize self.end.
self.wait()
return stdout, stderr
def wait(self, timeout=None): # pylint: disable=arguments-differ
"""Implements python3's timeout support.
Raises:
- TimeoutExpired when more than timeout seconds were spent waiting for the
process.
"""
assert timeout is None or isinstance(timeout, (int, float)), timeout
if timeout is None:
super(Popen, self).wait()
elif self.returncode is None:
if subprocess.mswindows:
WAIT_TIMEOUT = 258
result = subprocess._subprocess.WaitForSingleObject(
self._handle, int(timeout * 1000))
if result == WAIT_TIMEOUT:
raise TimeoutExpired(self.args, timeout)
self.returncode = subprocess._subprocess.GetExitCodeProcess(
self._handle)
else:
# If you think the following code is horrible, it's because it is
# inspired by python3's stdlib.
end = time.time() + timeout
delay = 0.001
while True:
try:
pid, sts = subprocess._eintr_retry_call(
os.waitpid, self.pid, os.WNOHANG)
except OSError as e:
if e.errno != errno.ECHILD:
raise
pid = self.pid
sts = 0
if pid == self.pid:
# This sets self.returncode.
self._handle_exitstatus(sts)
break
remaining = end - time.time()
if remaining <= 0:
raise TimeoutExpired(self.args, timeout)
delay = min(delay * 2, remaining, .05)
time.sleep(delay)
if not self.end:
# communicate() uses wait() internally.
self.end = time.time()
return self.returncode
def poll(self):
ret = super(Popen, self).poll()
if ret is not None and not self.end:
self.end = time.time()
return ret
def yield_any_line(self, **kwargs):
"""Yields lines until the process terminates.
Like yield_any, but yields lines.
"""
return split(self.yield_any(**kwargs))
def yield_any(self, maxsize=None, timeout=None):
"""Yields output until the process terminates.
Unlike wait(), does not raise TimeoutExpired.
Yields:
(pipename, data) where pipename is either 'stdout', 'stderr' or None in
case of timeout or when the child process closed one of the pipe(s) and
all pending data on the pipe was read.
Arguments:
- maxsize: See recv_any(). Can be a callable function.
- timeout: If None, the call is blocking. If set, yields None, None if no
data is available within |timeout| seconds. It resets itself after
each yield. Can be a callable function.
"""
assert self.stdout or self.stderr
if timeout is not None:
# timeout=0 effectively means that the pipe is continuously polled.
if isinstance(timeout, (int, float)):
assert timeout >= 0, timeout
old_timeout = timeout
timeout = lambda: old_timeout
else:
assert callable(timeout), timeout
if maxsize is not None and not callable(maxsize):
assert isinstance(maxsize, (int, float)), maxsize
last_yield = time.time()
while self.poll() is None:
to = timeout() if timeout else None
if to is not None:
to = max(to - (time.time() - last_yield), 0)
t, data = self.recv_any(
maxsize=maxsize() if callable(maxsize) else maxsize, timeout=to)
if data or to is 0:
yield t, data
last_yield = time.time()
# Read all remaining output in the pipes.
# There is 3 cases:
# - pipes get closed automatically by the calling process before it exits
# - pipes are closed automated by the OS
# - pipes are kept open due to grand-children processes outliving the
# children process.
while True:
ms = maxsize
if callable(maxsize):
ms = maxsize()
# timeout=0 is mainly to handle the case where a grand-children process
# outlives the process started.
t, data = self.recv_any(maxsize=ms, timeout=0)
if not data:
break
yield t, data
def recv_any(self, maxsize=None, timeout=None):
"""Reads from the first pipe available from stdout and stderr.
Unlike wait(), does not throw TimeoutExpired.
Arguments:
- maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
- timeout: If None, it is blocking. If 0 or above, will return None if no
data is available within |timeout| seconds.
Returns:
tuple(pipename or None, str(data)). pipename is one of 'stdout' or
'stderr'.
"""
# recv_multi_impl will early exit on a closed connection. Loop accordingly
# to simplify call sites.
while True:
pipes = [
x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
]
# If both stdout and stderr have the exact file handle, they are
# effectively the same pipe. Deduplicate it since otherwise it confuses
# recv_multi_impl().
if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
pipes.pop(0)
if not pipes:
return None, None
start = time.time()
conns, names = zip(*pipes)
index, data, closed = recv_multi_impl(conns, maxsize, timeout)
if index is None:
return index, data
if closed:
self._close(names[index])
if not data:
# Loop again. The other pipe may still be open.
if timeout:
timeout -= (time.time() - start)
continue
if self.universal_newlines and data:
data = self._translate_newlines(data)
return names[index], data
def recv_out(self, maxsize=None, timeout=None):
"""Reads from stdout synchronously with timeout."""
return self._recv('stdout', maxsize, timeout)
def recv_err(self, maxsize=None, timeout=None):
"""Reads from stderr synchronously with timeout."""
return self._recv('stderr', maxsize, timeout)
def terminate(self):
"""Tries to do something saner on Windows that the stdlib.
Windows:
self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
- If set, only SIGBREAK can be sent and it is sent to a single process.
- If not set, in theory only SIGINT can be used and *all processes* in
the processgroup receive it. In practice, we just kill the process.
See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
The default on Windows is to call TerminateProcess() always, which is not
useful.
On Posix, always send SIGTERM.
"""
try:
if subprocess.mswindows and self.detached:
return self.send_signal(signal.CTRL_BREAK_EVENT)
super(Popen, self).terminate()
except OSError:
# The function will throw if the process terminated in-between. Swallow
# this.
pass
def kill(self):
"""Kills the process and its children if possible.
Swallows exceptions and return True on success.
"""
if self.gid:
try:
os.killpg(self.gid, signal.SIGKILL)
except OSError:
return False
else:
try:
super(Popen, self).kill()
except OSError:
return False
return True
def _close(self, which):
"""Closes either stdout or stderr."""
getattr(self, which).close()
setattr(self, which, None)
def _recv(self, which, maxsize, timeout):
"""Reads from one of stdout or stderr synchronously with timeout."""
conn = getattr(self, which)
if conn is None:
return None
_, data, closed = recv_multi_impl([conn], maxsize, timeout)
if closed:
self._close(which)
if self.universal_newlines and data:
data = self._translate_newlines(data)
return data
@contextlib.contextmanager
def set_signal_handler(signals, handler):
"""Temporarilly override signals handler.
Useful when waiting for a child process to handle signals like SIGTERM, so the
signal can be propagated to the child process.
"""
previous = {s: signal.signal(s, handler) for s in signals}
try:
yield
finally:
for sig, h in previous.iteritems():
signal.signal(sig, h)
def call(*args, **kwargs):
"""Adds support for timeout."""
timeout = kwargs.pop('timeout', None)
return Popen(*args, **kwargs).wait(timeout)
def check_call(*args, **kwargs):
"""Adds support for timeout."""
retcode = call(*args, **kwargs)
if retcode:
raise CalledProcessError(retcode, kwargs.get('args') or args[0])
return 0
def check_output(*args, **kwargs):
"""Adds support for timeout."""
timeout = kwargs.pop('timeout', None)
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
process = Popen(stdout=PIPE, *args, **kwargs)
output, _ = process.communicate(timeout=timeout)
retcode = process.poll()
if retcode:
raise CalledProcessError(retcode, kwargs.get('args') or args[0], output)
return output
def call_with_timeout(args, timeout, **kwargs):
"""Runs an executable; kill it in case of timeout."""
proc = Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
try:
out, err = proc.communicate(timeout=timeout)
except TimeoutExpired as e:
out = e.output
err = e.stderr
proc.kill()
proc.wait()
return out, err, proc.returncode, proc.duration()
def inhibit_os_error_reporting():
"""Inhibits error reporting UI and core files.
This function should be called as early as possible in the process lifetime.
"""
global _OS_ERROR_REPORTING_INHIBITED
if not _OS_ERROR_REPORTING_INHIBITED:
_OS_ERROR_REPORTING_INHIBITED = True
if sys.platform == 'win32':
# Windows has a bad habit of opening a dialog when a console program
# crashes, rather than just letting it crash. Therefore, when a program
# crashes on Windows, we don't find out until the build step times out.
# This code prevents the dialog from appearing, so that we find out
# immediately and don't waste time waiting for a user to close the dialog.
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
SEM_FAILCRITICALERRORS = 1
SEM_NOGPFAULTERRORBOX = 2
SEM_NOALIGNMENTFAULTEXCEPT = 0x8000
ctypes.windll.kernel32.SetErrorMode(
SEM_FAILCRITICALERRORS|SEM_NOGPFAULTERRORBOX|
SEM_NOALIGNMENTFAULTEXCEPT)
# TODO(maruel): Other OSes.
# - OSX, need to figure out a way to make the following process tree local:
# defaults write com.apple.CrashReporter UseUNC 1
# defaults write com.apple.CrashReporter DialogType none
# - Ubuntu, disable apport if needed.
def split(data, sep='\n'):
"""Splits pipe data by |sep|. Does some buffering.
For example, [('stdout', 'a\nb'), ('stdout', '\n'), ('stderr', 'c\n')] ->
[('stdout', 'a'), ('stdout', 'b'), ('stderr', 'c')].
Args:
data: iterable of tuples (pipe_name, bytes).
Returns:
An iterator of tuples (pipe_name, bytes) where bytes is the input data
but split by sep into separate tuples.
"""
# A dict {pipe_name -> list of pending chunks without separators}
pending_chunks = collections.defaultdict(list)
for pipe_name, chunk in data:
if chunk is None:
# Happens if a pipe is closed.
continue
pending = pending_chunks[pipe_name]
start = 0 # offset in chunk to start |sep| search from
while start < len(chunk):
j = chunk.find(sep, start)
if j == -1:
pending_chunks[pipe_name].append(chunk[start:])
break
to_emit = chunk[start:j]
start = j + 1
if pending:
# prepend and forget
to_emit = ''.join(pending) + to_emit
pending = []
pending_chunks[pipe_name] = pending
yield pipe_name, to_emit
# Emit remaining chunks that don't end with separators as is.
for pipe_name, chunks in sorted(pending_chunks.iteritems()):
if chunks:
yield pipe_name, ''.join(chunks)