# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.
"""subprocess42 is the answer to life the universe and everything.
It has the particularity of having a Popen implementation that can yield output
as it is produced while implementing a timeout and not requiring the use of
worker threads.
TODO(maruel): Add VOID and TIMED_OUT support like subprocess2.
"""
import contextlib
import logging
import os
import signal
import subprocess
import time
from subprocess import CalledProcessError, PIPE, STDOUT # pylint: disable=W0611
from subprocess import call, check_output # pylint: disable=W0611
# Default maxsize argument.
MAX_SIZE = 16384
if subprocess.mswindows:
  import msvcrt  # pylint: disable=F0401
  from ctypes import wintypes
  from ctypes import windll

  # Which signal is received depends on how this process was called and is
  # outside the control of this script. See Popen docstring for more details.
  STOP_SIGNALS = (signal.SIGBREAK, signal.SIGTERM)

  def ReadFile(handle, desired_bytes):
    """Calls kernel32.ReadFile()."""
    c_read = wintypes.DWORD()
    buff = wintypes.create_string_buffer(desired_bytes+1)
    windll.kernel32.ReadFile(
        handle, buff, desired_bytes, wintypes.byref(c_read), None)
    # NULL terminate it.
    buff[c_read.value] = '\x00'
    return wintypes.GetLastError(), buff.value

  def PeekNamedPipe(handle):
    """Calls kernel32.PeekNamedPipe(). Simplified version."""
    c_avail = wintypes.DWORD()
    c_message = wintypes.DWORD()
    success = windll.kernel32.PeekNamedPipe(
        handle, None, 0, None, wintypes.byref(c_avail),
        wintypes.byref(c_message))
    if not success:
      raise OSError(wintypes.GetLastError())
    return c_avail.value

  def recv_multi_impl(conns, maxsize, timeout):
    """Reads from the first available pipe.

    It will immediately return on a closed connection, independent of timeout.

    Arguments:
    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
    - timeout: If None, it is blocking. If 0 or above, will return None if no
          data is available within |timeout| seconds.

    Returns:
      tuple(int(index), str(data), bool(closed)).
    """
    assert conns
    assert timeout is None or isinstance(timeout, (int, float)), timeout
    maxsize = max(maxsize or MAX_SIZE, 1)

    # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous
    # pipes for proc.stdout and proc.stderr but they are implemented as named
    # pipes on Windows. Since named pipes are not waitable objects, they can't
    # be passed as-is to WFMO(). So this means N times CreateEvent(), N times
    # ReadFile() and finally WFMO(). This requires caching the event handles in
    # the Popen object and remembering the pending ReadFile() calls. This will
    # require some re-architecture to store the relevant event handle and
    # OVERLAPPEDIO object in Popen or the file object.
    start = time.time()
    handles = [
        (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
    ]
    while True:
      for index, handle in handles:
        try:
          avail = min(PeekNamedPipe(handle), maxsize)
          if avail:
            return index, ReadFile(handle, avail)[1], False
        except OSError:
          # The pipe closed.
          return index, None, True

      if timeout is not None and (time.time() - start) >= timeout:
        return None, None, False

      # Polling rocks.
      time.sleep(0.001)
else:
  import fcntl  # pylint: disable=F0401
  import select

  # Signals that mean this process should exit quickly.
  STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM)

  def recv_multi_impl(conns, maxsize, timeout):
    """Reads from the first available pipe.

    It will immediately return on a closed connection, independent of timeout.

    Arguments:
    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
    - timeout: If None, it is blocking. If 0 or above, will return None if no
          data is available within |timeout| seconds.

    Returns:
      tuple(int(index), str(data), bool(closed)).
    """
    assert conns
    assert timeout is None or isinstance(timeout, (int, float)), timeout
    maxsize = max(maxsize or MAX_SIZE, 1)

    # select(timeout=0) returns immediately, which would make callers polling
    # in a loop peg a core; use a small value > 0 instead.
    if timeout == 0:
      timeout = 0.001
    try:
      r, _, _ = select.select(conns, [], [], timeout)
    except select.error:
      r = None
    if not r:
      return None, None, False

    conn = r[0]
    # Temporarily make it non-blocking.
    # TODO(maruel): This is not very efficient when the caller is doing this in
    # a loop. Add a mechanism to have the caller handle this.
    flags = fcntl.fcntl(conn, fcntl.F_GETFL)
    if not conn.closed:
      # pylint: disable=E1101
      fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
      data = conn.read(maxsize)
      if not data:
        # On posix, this means the channel closed.
        return conns.index(conn), None, True
      return conns.index(conn), data, False
    finally:
      if not conn.closed:
        fcntl.fcntl(conn, fcntl.F_SETFL, flags)
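

# A minimal usage sketch (illustrative only; |proc| is a hypothetical process
# with piped stdout and stderr): recv_multi_impl() is the primitive used by
# Popen.recv_any() below. It reports which pipe had data, the data read, and
# whether that pipe closed:
#
#   index, data, closed = recv_multi_impl(
#       [proc.stdout, proc.stderr], maxsize=None, timeout=1.)
#   # index: position in the list, or None if the timeout expired.
#   # data: str read from that pipe, or None.
#   # closed: True when the pipe should not be polled anymore.

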
class Popen(subprocess.Popen):
  """Adds timeout support on stdout and stderr.

  Inspired by
  http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/

  Arguments:
  - detached: If True, the process is created in a new process group. On
    Windows, use CREATE_NEW_PROCESS_GROUP. On posix, use os.setpgid(0, 0).

  Additional members:
  - start: timestamp when this process started.
  - end: timestamp when this process exited, as seen by this process.
  - detached: If True, the child process was started as a detached process.
  - gid: process group id, if any.
  """

  def __init__(self, args, **kwargs):
    assert 'creationflags' not in kwargs
    assert 'preexec_fn' not in kwargs
    self.start = time.time()
    self.end = None
    self.gid = None
    self.detached = kwargs.pop('detached', False)
    if self.detached:
      if subprocess.mswindows:
        kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
      else:
        kwargs['preexec_fn'] = lambda: os.setpgid(0, 0)
    super(Popen, self).__init__(args, **kwargs)
    if self.detached and not subprocess.mswindows:
      self.gid = os.getpgid(self.pid)

  def duration(self):
    """Duration of the child process.

    It is greater than or equal to the actual time the child process ran. It
    can be significantly higher than the real value if neither .wait() nor
    .poll() was used.
    """
    return (self.end or time.time()) - self.start

  def wait(self):
    ret = super(Popen, self).wait()
    if not self.end:
      # communicate() uses wait() internally.
      self.end = time.time()
    return ret

  def poll(self):
    ret = super(Popen, self).poll()
    if ret is not None and not self.end:
      self.end = time.time()
    return ret

  def yield_any(self, maxsize=None, hard_timeout=None, soft_timeout=None):
    """Yields output until the process terminates or is killed by a timeout.

    Yielded values are in the form (pipename, data).

    Arguments:
    - maxsize: See recv_any(). Can be a callable function.
    - hard_timeout: If None, the process is never killed. If set, the process
          is killed after |hard_timeout| seconds. Can be a callable function.
    - soft_timeout: If None, the call is blocking. If set, yields None, None
          if no data is available within |soft_timeout| seconds. It resets
          itself after each yield. Can be a callable function.
    """
    if hard_timeout is not None:
      # hard_timeout=0 means the process is not even given a little chance to
      # execute and will be immediately killed.
      if isinstance(hard_timeout, (int, float)):
        assert hard_timeout > 0., hard_timeout
        old_hard_timeout = hard_timeout
        hard_timeout = lambda: old_hard_timeout
    if soft_timeout is not None:
      # soft_timeout=0 effectively means that the pipe is continuously polled.
      if isinstance(soft_timeout, (int, float)):
        assert soft_timeout >= 0, soft_timeout
        old_soft_timeout = soft_timeout
        soft_timeout = lambda: old_soft_timeout
      else:
        assert callable(soft_timeout), soft_timeout

    last_yield = time.time()
    while self.poll() is None:
      ms = maxsize
      if callable(maxsize):
        ms = maxsize()
      t, data = self.recv_any(
          maxsize=ms,
          timeout=self._calc_timeout(hard_timeout, soft_timeout, last_yield))
      if data or soft_timeout is not None:
        yield t, data
        last_yield = time.time()
      if hard_timeout and self.duration() >= hard_timeout():
        break

    if self.poll() is None and hard_timeout:
      logging.debug('Kill %s %s', self.duration(), hard_timeout())
      self.kill()
    self.wait()

    # Read all remaining output in the pipes. There are three cases:
    # - pipes get closed automatically by the calling process before it exits
    # - pipes are closed automatically by the OS
    # - pipes are kept open due to grand-children processes outliving the
    #   child process.
    while True:
      ms = maxsize
      if callable(maxsize):
        ms = maxsize()
      # timeout=0 is mainly to handle the case where a grand-child process
      # outlives the process that was started.
      t, data = self.recv_any(maxsize=ms, timeout=0)
      if not data:
        break
      yield t, data
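
  # A usage sketch for yield_any() (illustrative only; |cmd|, handle_stdout()
  # and handle_stderr() are placeholders). Output is consumed as it is
  # produced, with a 30s hard timeout and no helper thread:
  #
  #   proc = Popen(cmd, stdout=PIPE, stderr=PIPE, detached=True)
  #   for pipename, data in proc.yield_any(hard_timeout=30):
  #     if pipename == 'stdout':
  #       handle_stdout(data)
  #     elif pipename == 'stderr':
  #       handle_stderr(data)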

  def _calc_timeout(self, hard_timeout, soft_timeout, last_yield):
    """Returns the timeout to be used on the next recv_any() in yield_any().

    It depends on both timeouts. It returns None if no timeout is used.
    Otherwise it returns a value >= 0.001. It's not 0 because that would
    effectively be polling; on linux it can peg a single core, so adding a 1ms
    sleep makes a tremendous difference.
    """
    hard_remaining = (
        None if hard_timeout is None
        else max(hard_timeout() - self.duration(), 0))
    soft_remaining = (
        None if soft_timeout is None
        else max(soft_timeout() - (time.time() - last_yield), 0))
    if hard_remaining is None:
      return soft_remaining
    if soft_remaining is None:
      return hard_remaining
    return min(hard_remaining, soft_remaining)

  def recv_any(self, maxsize=None, timeout=None):
    """Reads from the first pipe available from stdout and stderr.

    Arguments:
    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
    - timeout: If None, it is blocking. If 0 or above, will return None if no
          data is available within |timeout| seconds.

    Returns:
      tuple(str(pipename) or None, str(data) or None), where pipename is
      'stdout' or 'stderr'.
    """
    # recv_multi_impl will early exit on a closed connection. Loop accordingly
    # to simplify call sites.
    while True:
      pipes = [
        x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
      ]
      # If both stdout and stderr use the exact same file handle, they are
      # effectively the same pipe. Deduplicate it since otherwise it confuses
      # recv_multi_impl().
      if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
        pipes.pop(0)

      if not pipes:
        return None, None
      start = time.time()
      conns, names = zip(*pipes)
      index, data, closed = recv_multi_impl(conns, maxsize, timeout)
      if index is None:
        return index, data
      if closed:
        self._close(names[index])
        if not data:
          # Loop again. The other pipe may still be open.
          if timeout:
            timeout -= (time.time() - start)
          continue

      if self.universal_newlines:
        data = self._translate_newlines(data)
      return names[index], data

  def recv_out(self, maxsize=None, timeout=None):
    """Reads from stdout synchronously with timeout."""
    return self._recv('stdout', maxsize, timeout)

  def recv_err(self, maxsize=None, timeout=None):
    """Reads from stderr synchronously with timeout."""
    return self._recv('stderr', maxsize, timeout)

  def terminate(self):
    """Tries to do something saner on Windows than the stdlib.

    Windows:
      self.detached/CREATE_NEW_PROCESS_GROUP determines what can be used:
      - If set, only SIGBREAK can be sent and it is sent to a single process.
      - If not set, in theory only SIGINT can be used and *all processes* in
        the process group receive it. In practice, we just kill the process.
      See http://msdn.microsoft.com/library/windows/desktop/ms683155.aspx
      The stdlib default on Windows is to always call TerminateProcess(), which
      is not useful.

    On Posix, always send SIGTERM.
    """
    try:
      if subprocess.mswindows and self.detached:
        return self.send_signal(signal.CTRL_BREAK_EVENT)
      super(Popen, self).terminate()
    except OSError:
      # The function will throw if the process terminated in-between. Swallow
      # this.
      pass

  def kill(self):
    """Kills the process and its children if possible.

    Swallows exceptions and returns True on success.
    """
    if self.gid:
      try:
        os.killpg(self.gid, signal.SIGKILL)
      except OSError:
        return False
    else:
      try:
        super(Popen, self).kill()
      except OSError:
        return False
    return True

  def _close(self, which):
    """Closes either stdout or stderr."""
    getattr(self, which).close()
    setattr(self, which, None)

  def _recv(self, which, maxsize, timeout):
    """Reads from one of stdout or stderr synchronously with timeout."""
    conn = getattr(self, which)
    if conn is None:
      return None
    _, data, closed = recv_multi_impl([conn], maxsize, timeout)
    if closed:
      self._close(which)
    if self.universal_newlines and data:
      data = self._translate_newlines(data)
    return data


def call_with_timeout(args, timeout, **kwargs):
  """Runs an executable with an optional timeout.

  timeout 0 or None disables the timeout.

  Returns:
    tuple(stdout, stderr, returncode, duration).
  """
  proc = Popen(
      args,
      stdin=subprocess.PIPE,
      stdout=subprocess.PIPE,
      **kwargs)
  if timeout:
    out = ''
    err = ''
    for t, data in proc.yield_any(hard_timeout=timeout):
      if t == 'stdout':
        out += data
      else:
        err += data
  else:
    # This code path is much faster.
    out, err = proc.communicate()
  return out, err, proc.returncode, proc.duration()
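

# A usage sketch for call_with_timeout() (illustrative only; the command is a
# placeholder). The child is killed if it runs for more than 5 seconds:
#
#   out, err, returncode, duration = call_with_timeout(['sleep', '10'], 5)

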
@contextlib.contextmanager
def set_signal_handler(signals, handler):
  """Temporarily overrides the handlers for |signals|."""
  previous = dict((s, signal.signal(s, handler)) for s in signals)
  yield None
  for s in signals:
    signal.signal(s, previous[s])


@contextlib.contextmanager
def Popen_with_handler(args, **kwargs):
  """Runs |args| detached, terminating it on a stop signal and killing it when
  the context exits.
  """
  proc = None

  def handler(_signum, _frame):
    if proc:
      proc.terminate()

  with set_signal_handler(STOP_SIGNALS, handler):
    proc = Popen(args, detached=True, **kwargs)
    try:
      yield proc
    finally:
      proc.kill()
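

# A usage sketch for Popen_with_handler() (illustrative only; 'my_tool' is a
# placeholder). A stop signal received while the block is active triggers
# proc.terminate(), and the child is killed when the block exits:
#
#   with Popen_with_handler(['my_tool', '--flag'], stdout=PIPE) as proc:
#     for pipename, data in proc.yield_any():
#       logging.info('%s: %r', pipename, data)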