blob: b565c1c70de529ec6e5f9ebec4edf726fd457d46 [file] [log] [blame]
# Copyright (C) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the Google name nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Package that implements the ServerProcess wrapper class"""
import errno
import logging
import re
import signal
import sys
import time
# Note that although win32 python does provide an implementation of
# the win32 select API, it only works on sockets, and not on the named pipes
# used by subprocess, so we have to use the native APIs directly.
_quote_cmd = None
if sys.platform == 'win32':
import msvcrt
import win32pipe
import win32file
import subprocess
_quote_cmd = subprocess.list2cmdline
else:
import fcntl
import os
import pipes
import select
_quote_cmd = lambda cmdline: ' '.join(pipes.quote(arg) for arg in cmdline)
_log = logging.getLogger(__name__)
_trailing_spaces_re = re.compile('(.*[^ ])?( +)$')
def quote_data(data):
txt = repr(data).replace('\\n', '\\n\n')[1:-1]
lines = []
for l in txt.splitlines():
m = _trailing_spaces_re.match(l)
if m:
l = m.group(1) + m.group(2).replace(' ', '\x20')
lines.append(l)
return lines
class ServerProcess(object):
"""This class provides a wrapper around a subprocess that
implements a simple request/response usage model. The primary benefit
is that reading responses takes a deadline, so that we don't ever block
indefinitely. The class also handles transparently restarting processes
as necessary to keep issuing commands.
"""
def __init__(self, port_obj, name, cmd, env=None, treat_no_data_as_crash=False,
more_logging=False):
self._port = port_obj
self._name = name # Should be the command name (e.g. content_shell, image_diff)
self._cmd = cmd
self._env = env
self._treat_no_data_as_crash = treat_no_data_as_crash
self._logging = more_logging
self._host = self._port.host
self._proc = None
self._pid = None
self._reset()
# See comment in imports for why we need the win32 APIs and can't just use select.
self._use_win32_apis = sys.platform == 'win32'
def name(self):
return self._name
def pid(self):
return self._pid
def _reset(self):
if getattr(self, '_proc', None):
if self._proc.stdin:
self._proc.stdin.close()
self._proc.stdin = None
if self._proc.stdout:
self._proc.stdout.close()
self._proc.stdout = None
if self._proc.stderr:
self._proc.stderr.close()
self._proc.stderr = None
self._proc = None
self._output = str() # bytesarray() once we require Python 2.6
self._error = str() # bytesarray() once we require Python 2.6
self._crashed = False
self.timed_out = False
def process_name(self):
return self._name
def _start(self):
if self._proc:
raise ValueError('%s already running' % self._name)
self._reset()
# close_fds is a workaround for http://bugs.python.org/issue2320
close_fds = not self._host.platform.is_win()
if self._logging:
env_str = ''
if self._env:
env_str += '\n'.join('%s=%s' % (k, v) for k, v in self._env.items()) + '\n'
_log.info('CMD: \n%s%s\n', env_str, _quote_cmd(self._cmd))
proc = self._host.executive.popen(self._cmd, stdin=self._host.executive.PIPE,
stdout=self._host.executive.PIPE,
stderr=self._host.executive.PIPE,
close_fds=close_fds,
env=self._env)
self._set_proc(proc)
def _set_proc(self, proc):
assert not self._proc
self._proc = proc
self._pid = self._proc.pid
if not self._use_win32_apis:
fd = self._proc.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
fd = self._proc.stderr.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def _handle_possible_interrupt(self):
"""This routine checks to see if the process crashed or exited
because of a keyboard interrupt and raises KeyboardInterrupt
accordingly.
"""
# FIXME: Linux and Mac set the returncode to -signal.SIGINT if a
# subprocess is killed with a ctrl^C. Previous comments in this
# routine said that supposedly Windows returns 0xc000001d, but that's
# not what -1073741510 evaluates to. Figure out what the right value
# is for win32 here ...
if self._proc.returncode in (-1073741510, -signal.SIGINT):
raise KeyboardInterrupt
def poll(self):
"""Check to see if the underlying process is running; returns None
if it still is (wrapper around subprocess.poll).
"""
if self._proc:
return self._proc.poll()
return None
def write(self, bytes):
"""Write a request to the subprocess. The subprocess is (re-)start()'ed
if is not already running.
"""
if not self._proc:
self._start()
try:
self._log_data(' IN', bytes)
self._proc.stdin.write(bytes)
except IOError:
self.stop(0.0)
# stop() calls _reset(), so we have to set crashed to True after calling stop().
self._crashed = True
def _pop_stdout_line_if_ready(self):
index_after_newline = self._output.find('\n') + 1
if index_after_newline > 0:
return self._pop_output_bytes(index_after_newline)
return None
def _pop_stderr_line_if_ready(self):
index_after_newline = self._error.find('\n') + 1
if index_after_newline > 0:
return self._pop_error_bytes(index_after_newline)
return None
def pop_all_buffered_stderr(self):
return self._pop_error_bytes(len(self._error))
def read_stdout_line(self, deadline):
return self._read(deadline, self._pop_stdout_line_if_ready)
def read_stderr_line(self, deadline):
return self._read(deadline, self._pop_stderr_line_if_ready)
def read_either_stdout_or_stderr_line(self, deadline):
def retrieve_bytes_from_buffers():
stdout_line = self._pop_stdout_line_if_ready()
if stdout_line:
return stdout_line, None
stderr_line = self._pop_stderr_line_if_ready()
if stderr_line:
return None, stderr_line
return None # Instructs the caller to keep waiting.
return_value = self._read(deadline, retrieve_bytes_from_buffers)
# FIXME: This is a bit of a hack around the fact that _read normally only
# returns one value, but this caller wants it to return two.
if return_value is None:
return None, None
return return_value
def read_stdout(self, deadline, size):
if size <= 0:
raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)
def retrieve_bytes_from_stdout_buffer():
if len(self._output) >= size:
return self._pop_output_bytes(size)
return None
return self._read(deadline, retrieve_bytes_from_stdout_buffer)
def _log(self, message):
# This is a bit of a hack, but we first log a blank line to avoid
# messing up the master process's output.
_log.info('')
_log.info(message)
def _log_data(self, prefix, data):
if self._logging and data and len(data):
for line in quote_data(data):
_log.info('%s: %s', prefix, line)
def _handle_timeout(self):
self.timed_out = True
self._port.sample_process(self._name, self._proc.pid)
def _split_string_after_index(self, string, index):
return string[:index], string[index:]
def _pop_output_bytes(self, bytes_count):
output, self._output = self._split_string_after_index(self._output, bytes_count)
return output
def _pop_error_bytes(self, bytes_count):
output, self._error = self._split_string_after_index(self._error, bytes_count)
return output
def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False):
if self._proc.stdout.closed or self._proc.stderr.closed:
# If the process crashed and is using FIFOs, like Chromium Android, the
# stdout and stderr pipes will be closed.
return
out_fd = self._proc.stdout.fileno()
err_fd = self._proc.stderr.fileno()
select_fds = (out_fd, err_fd)
try:
read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0))
except select.error as error:
# We can ignore EINVAL since it's likely the process just crashed and we'll
# figure that out the next time through the loop in _read().
if error.args[0] == errno.EINVAL:
return
raise
try:
# Note that we may get no data during read() even though
# select says we got something; see the select() man page
# on linux. I don't know if this happens on Mac OS and
# other Unixen as well, but we don't bother special-casing
# Linux because it's relatively harmless either way.
if out_fd in read_fds:
data = self._proc.stdout.read()
if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
self._crashed = True
self._log_data('OUT', data)
self._output += data
if err_fd in read_fds:
data = self._proc.stderr.read()
if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
self._crashed = True
self._log_data('ERR', data)
self._error += data
except IOError:
# We can ignore the IOErrors because we will detect if the
# subprocess crashed the next time through the loop in _read().
pass
def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline):
# See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
# and http://docs.activestate.com/activepython/2.6/pywin32/modules.html
# for documentation on all of these win32-specific modules.
now = time.time()
out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno())
err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno())
while (self._proc.poll() is None) and (now < deadline):
output = self._non_blocking_read_win32(out_fh)
self._log_data('OUT', output)
error = self._non_blocking_read_win32(err_fh)
self._log_data('ERR', error)
if output or error:
if output:
self._output += output
if error:
self._error += error
return
time.sleep(0.01)
now = time.time()
return
def _non_blocking_read_win32(self, handle):
try:
_, avail, _ = win32pipe.PeekNamedPipe(handle, 0)
if avail > 0:
_, buf = win32file.ReadFile(handle, avail, None)
return buf
except Exception as error: # pylint: disable=broad-except
if error[0] not in (109, errno.ESHUTDOWN): # 109 == win32 ERROR_BROKEN_PIPE
raise
return None
def has_crashed(self):
if not self._crashed and self.poll():
self._crashed = True
self._handle_possible_interrupt()
return self._crashed
# This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
# only reads/returns from one of them (buffering both in local self._output/self._error).
# It might be cleaner to pass in the file descriptor to poll instead.
def _read(self, deadline, fetch_bytes_from_buffers_callback):
while True:
if self.has_crashed():
return None
if time.time() > deadline:
self._handle_timeout()
return None
bytes = fetch_bytes_from_buffers_callback()
if bytes is not None:
return bytes
if self._use_win32_apis:
self._wait_for_data_and_update_buffers_using_win32_apis(deadline)
else:
self._wait_for_data_and_update_buffers_using_select(deadline)
def start(self):
if not self._proc:
self._start()
def stop(self, timeout_secs=0.0):
if not self._proc:
return (None, None)
now = time.time()
if self._proc.stdin:
if self._logging:
_log.info(' IN: ^D')
self._proc.stdin.close()
self._proc.stdin = None
killed = False
if timeout_secs:
deadline = now + timeout_secs
while self._proc.poll() is None and time.time() < deadline:
time.sleep(0.01)
if self._proc.poll() is None:
_log.warning('stopping %s(pid %d) timed out, killing it', self._name, self._proc.pid)
if self._proc.poll() is None:
self._kill()
killed = True
_log.debug('killed pid %d', self._proc.pid)
# read any remaining data on the pipes and return it.
if not killed:
if self._use_win32_apis:
self._wait_for_data_and_update_buffers_using_win32_apis(now)
else:
self._wait_for_data_and_update_buffers_using_select(now, stopping=True)
out, err = self._output, self._error
self._reset()
return (out, err)
def kill(self):
self.stop(0.0)
def _kill(self):
self._host.executive.kill_process(self._proc.pid)
if self._proc.poll() is not None:
self._proc.wait()
def replace_input(self, stdin):
assert self._proc
if stdin:
self._proc.stdin.close()
self._proc.stdin = stdin
def replace_outputs(self, stdout, stderr):
assert self._proc
if stdout:
self._proc.stdout.close()
self._proc.stdout = stdout
if stderr:
self._proc.stderr.close()
self._proc.stderr = stderr