blob: 330a0ed52369073632d9116c6b1e87aca33d325c [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module adds support for multiple processes in the dev_appserver.
Each instance of the application is started as a separate process on a unique
port, for state isolation and parallel execution. A load-balancing process is
also created for each Backend. An API Server process is launched to handle all
memcache and datastore API requests, so that persistent state is shared across
all processes. Application and Backend instances forward their memcache and
datastore requests to the API Server using the remote_api interface.
The base process is considered the Master. It manages all subprocesses, assigns
them ports, and issues /_ah/start requests to Backend instances. Ports are
either fixed (using the base value of --multiprocess_min_port) or randomly
chosen. The Master listens on the --port specified by the user, and forwards
all requests to an App Instance process.
Each balancer forwards incoming requests to the next free instance,
or return with a HTTP 503 error if no free instance is available.
"""
import BaseHTTPServer
import copy
import cStringIO
import errno
import httplib
import logging
import os
import Queue
import signal
import socket
import subprocess
import sys
import threading
import time
import weakref
from google.appengine.api import backendinfo
from google.appengine.api.backends import backends as backends_api
from google.appengine.ext.remote_api import remote_api_stub
from google.appengine.tools import api_server
ARG_ADDRESS = 'address'
ARG_PORT = 'port'
ARG_BACKENDS = 'backends'
ARG_MULTIPROCESS = 'multiprocess'
ARG_MULTIPROCESS_MIN_PORT = 'multiprocess_min_port'
ARG_MULTIPROCESS_API_PORT = 'multiprocess_api_port'
ARG_MULTIPROCESS_API_SERVER = 'multiprocess_api_server'
ARG_MULTIPROCESS_APP_INSTANCE_ID = 'multiprocess_app_instance'
ARG_MULTIPROCESS_BACKEND_ID = 'multiprocess_backend_id'
ARG_MULTIPROCESS_BACKEND_INSTANCE_ID = 'multiprocess_backend_instance_id'
ARG_MULTIPROCESS_FRONTEND_PORT = 'multiprocess_frontend_port'
API_SERVER_HOST = 'localhost'
PATH_DEV_API_SERVER = '/_ah/dev_api_server'
BACKEND_MAX_INSTANCES = 20
class Error(Exception): pass
def SetThreadName(thread, name):
"""Sets the name a of thread, including the GlobalProcess name."""
thread.setName('[%s: %s]' % (GlobalProcess().Type(), name))
class StartInstance(threading.Thread):
"""Thread that periodically attempts to start a backend instance."""
def __init__(self, child):
threading.Thread.__init__(self)
self.child = child
self.setDaemon(True)
SetThreadName(self, 'Start %s' % child)
def run(self):
while True:
self.child.SendStartRequest()
time.sleep(1)
class ChildProcess(object):
def __init__(self,
host,
port,
app_instance=None,
backend_id=None,
instance_id=None,
frontend_port=None):
"""Creates an object representing a child process.
Only one of the given args should be provided (except for instance_id when
backend_id is specified).
Args:
app_instance: (int) The process represents the indicated app instance.
backend_id: (string) The process represents a backend.
instance_id: (int) The process represents the given backend instance.
frontend_port: (int) for backends, the frontend port.
"""
self.app_instance = app_instance
self.backend_id = backend_id
self.instance_id = instance_id
self.process = None
self.argv = []
self.started = False
self.connection_handler = httplib.HTTPConnection
self.SetHostPort(host, port)
self.frontend_port = frontend_port
def __str__(self):
if self.app_instance is not None:
string = 'App Instance'
elif self.instance_id is not None:
string = 'Backend Instance: %s.%d' % (self.backend_id, self.instance_id)
elif self.backend_id:
string = 'Backend Balancer: %s' % self.backend_id
else:
string = 'Unknown'
return '%s [%s]' % (string, self.Address())
def SetHostPort(self, host, port):
"""Sets the host and port that this process listens on."""
self.host = host
self.port = port
if self.backend_id:
backends_api._set_dev_port(self.port,
self.backend_id,
self.instance_id)
def Address(self):
"""Returns the URL for this process."""
return 'http://%s:%d' % self.HostPort()
def HostPort(self):
"""Returns the address of this process as a (host, port) pair."""
return (self.host, self.port)
def Start(self, argv, api_port):
"""Starts the child process.
Args:
argv: The argv of the parent process. When starting the subprocess,
we make a copy of the parent's argv, then modify it in accordance with
how the ChildProcess is configured, to represent different processes in
the multiprocess dev_appserver.
api_port: The port on which the API Server listens.
"""
self.argv = copy.deepcopy(argv)
self.api_port = api_port
self.SetFlag('--multiprocess')
self.SetFlag('--address', short_flag='-a', value=self.host)
self.SetFlag('--port', short_flag='-p', value=self.port)
self.SetFlag('--multiprocess_api_port', value=self.api_port)
if self.frontend_port is not None:
self.SetFlag('--multiprocess_frontend_port', value=self.frontend_port)
if self.app_instance is not None:
self.SetFlag('--multiprocess_app_instance_id', value=0)
if self.backend_id is not None:
self.SetFlag('--multiprocess_backend_id', value=self.backend_id)
if self.instance_id is not None:
self.SetFlag('--multiprocess_backend_instance_id', value=self.instance_id)
if self.argv[0].endswith('.py'):
self.argv.insert(0, sys.executable)
logging.debug('Starting %s with args: %s', self, self.argv)
self.process = subprocess.Popen(self.argv)
def EnableStartRequests(self):
"""Starts a thread to periodically send /_ah/start to this instance.
We need a thread to do this because we want to restart any resident Backends
that have been shutdown, and because a backend instance is not considered
to be ready for serving until it has successfully responded to /_ah/start.
"""
if self.backend_id and self.instance_id is not None:
self.start_thread = StartInstance(self)
self.start_thread.start()
def Connect(self):
"""Attempts to connect to the child process.
Returns:
bool: Whether a connection was made.
"""
logging.debug('Attempting connection to %s', self)
sock = None
result = True
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self.HostPort())
except:
result = False
if sock:
sock.close()
return result
def WaitForConnection(self, timeout_s=30.0, poll_period_s=0.5):
"""Blocks until the child process has started.
This method repeatedly attempts to connect to the process on its HTTP server
port. Returns when a connection has been successfully established or the
timeout has been reached.
Args:
timeout_s: Amount of time to wait, in seconds.
poll_period_s: Time to wait between connection attempts.
"""
finish_time = time.time() + timeout_s
while time.time() < finish_time:
if self.Connect():
return True
time.sleep(poll_period_s)
logging.info('%s took more than %d seconds to start.', self, timeout_s)
return False
def SendStartRequest(self):
"""If the process has not been started, sends a request to /_ah/start."""
if self.started:
return
try:
response = self.SendRequest('GET', '/_ah/start')
rc = response.status
if (rc >= 200 and rc < 300) or rc == 404:
self.started = True
except KeyboardInterrupt:
pass
except Exception, e:
logging.error('Failed start request to %s: %s', self, e)
def SendRequest(self, command, path, payload=None, headers=None):
"""Sends an HTTP request to this process.
Args:
command: The HTTP command (e.g., GET, POST)
path: The URL path for the request.
headers: A dictionary containing headers as key-value pairs.
"""
logging.debug('send request: %s %s to %s' % (command, path, self))
connection = self.connection_handler('%s:%d' % self.HostPort())
connection.request(command, path, payload, headers or {})
response = connection.getresponse()
return response
def SetFlag(self, flag, short_flag=None, value=None):
"""Add a flag to self.argv, replacing the existing value if set.
Args:
flag: flag to remove.
short_flag: one letter short version of the flag (optional)
value: Value of the flag (optional)
"""
self.RemoveFlag(flag, short_flag=short_flag, has_value=(value is not None))
if value is None:
self.argv.append(flag)
else:
self.argv.append(flag + '=' + str(value))
def RemoveFlag(self, flag, short_flag=None, has_value=False):
"""Removes an argument from self.argv.
Args:
flag: flag to remove.
short_flag: one letter short version of the flag
has_value: True if the next argument after the short flag is the value.
"""
new_argv = []
index = 0
while index < len(self.argv):
value = self.argv[index]
index += 1
if flag == value:
if has_value:
index += 1
continue
if has_value and value.startswith(flag + '='):
continue
if short_flag == value:
if has_value:
index += 1
continue
new_argv.append(value)
self.argv = new_argv
class DevProcess(object):
"""Represents a process in the multiprocess dev_appserver."""
TYPE_MASTER = 'Master'
TYPE_APP_INSTANCE = 'App Instance'
TYPE_BACKEND_BALANCER = 'Backend Balancer'
TYPE_BACKEND_INSTANCE = 'Backend Instance'
TYPES = frozenset([TYPE_MASTER,
TYPE_APP_INSTANCE,
TYPE_BACKEND_BALANCER,
TYPE_BACKEND_INSTANCE])
def __init__(self):
"""Creates a DevProcess with a default configuration."""
self.process_type = None
self.desc = None
self.http_server = None
self.app_id = None
self.backends = None
self.app_instance = None
self.backend_id = None
self.instance_id = None
self.backend_entry = None
self.host = None
self.port = None
self.api_port = None
self.multiprocess_min_port = 9000
self.children = []
self.child_app_instance = None
self.child_api_server = None
self.balance_set = None
self.started = False
def Init(self, appinfo, backends, options):
"""Supplies a list of backends for future use.
Args:
appinfo: An AppInfoExternal object.
backends: List of BackendEntry objects.
options: Dictionary of command-line options.
"""
self.backends = backends
self.options = options
self.app_id = appinfo.application
self.host = options[ARG_ADDRESS]
self.port = options[ARG_PORT]
if ARG_MULTIPROCESS_APP_INSTANCE_ID in options:
self.SetType(DevProcess.TYPE_APP_INSTANCE)
self.app_instance = options[ARG_MULTIPROCESS_APP_INSTANCE_ID]
self.desc = str(self.app_instance)
if ARG_MULTIPROCESS_BACKEND_ID in options:
self.backend_id = options[ARG_MULTIPROCESS_BACKEND_ID]
self.desc = self.backend_id
if ARG_MULTIPROCESS_BACKEND_INSTANCE_ID in options:
self.SetType(DevProcess.TYPE_BACKEND_INSTANCE)
self.instance_id = int(options[ARG_MULTIPROCESS_BACKEND_INSTANCE_ID])
self.desc += '.%d' % self.instance_id
else:
self.SetType(DevProcess.TYPE_BACKEND_BALANCER)
if ARG_MULTIPROCESS_API_PORT in options:
self.api_port = int(options[ARG_MULTIPROCESS_API_PORT])
if ARG_MULTIPROCESS_MIN_PORT in options:
self.multiprocess_min_port = int(options[ARG_MULTIPROCESS_MIN_PORT])
if self.IsBackend():
self.InitBackendEntry()
if not self.Type():
self.SetType(DevProcess.TYPE_MASTER)
def InitBackendEntry(self):
"""Finds the entry for the backend this process represents, if any."""
for backend in self.backends:
if backend.name == self.backend_id:
self.backend_entry = backend
if not self.backend_entry:
raise Error('No backend entry found for: ' % self)
def HttpServer(self):
"""Returns the HTTPServer used by this process."""
return self.http_server
def Address(self):
"""Returns the address of this process."""
return 'http://%s:%d' % (self.host, self.port)
def SetHttpServer(self, http_server):
"""Sets the http_server to be used when handling requests.
Args:
http_server: An HTTPServer that receives requests.
"""
self.http_server = http_server
self.handle_requests = HandleRequestThread()
self.handle_requests.start()
def StartChildren(self, argv, options):
"""Starts the set of child processes."""
self.children = []
base_port = self.multiprocess_min_port
self.frontend_port = base_port
next_port = base_port
self.child_app_instance = ChildProcess(self.host, next_port,
app_instance=0)
self.children.append(self.child_app_instance)
next_port += 1
for backend in self.backends:
base_port += 100
next_port = base_port
for i in xrange(backend.instances):
self.children.append(ChildProcess(self.host, next_port,
backend_id=backend.name,
instance_id=i,
frontend_port=self.frontend_port))
next_port += 1
self.children.append(ChildProcess(self.host, base_port + 99,
backend_id=backend.name))
base_port += 100
next_port = base_port
self.child_api_server = api_server.APIServerProcess(
executable=sys.executable,
script=os.path.join(os.path.dirname(argv[0]), 'api_server.py'),
host=self.host,
port=next_port,
app_id=self.app_id,
application_host=options['address'],
application_port=options['port'],
application_root=options['root_path'],
auto_id_policy=options['auto_id_policy'],
blobstore_path=options['blobstore_path'],
clear_datastore=options['clear_datastore'],
clear_prospective_search=options['clear_prospective_search'],
datastore_path=options['datastore_path'],
enable_sendmail=options['enable_sendmail'],
enable_task_running=not options['disable_task_running'],
high_replication=options['high_replication'],
logs_path=options['logs_path'],
prospective_search_path=options['prospective_search_path'],
require_indexes=options['require_indexes'],
show_mail_body=options['show_mail_body'],
smtp_host=options['smtp_host'],
smtp_password=options['smtp_password'],
smtp_port=options['smtp_port'],
smtp_user=options['smtp_user'],
task_retry_seconds=options['task_retry_seconds'],
trusted=options['trusted'],
use_sqlite=options['use_sqlite'],
)
self.child_api_server.Start()
if self.multiprocess_min_port == 0:
self.AssignPortsRandomly()
self.api_port = next_port
for child in self.children:
child.Start(argv, self.api_port)
self.child_api_server.WaitUntilServing()
for child in self.children:
child.WaitForConnection()
message = '\n\nMultiprocess Setup Complete:'
message += '\n Remote API Server [%s]' % self.child_api_server.url
for child in self.children:
message += '\n %s' % child
message += '\n'
logging.info(message)
for child in self.children:
child.EnableStartRequests()
def AssignPortsRandomly(self):
"""Acquires a random port for each child process."""
bound = []
for child in self.children:
sock = socket.socket()
sock.bind(('localhost', 0))
bound.append(sock)
child.SetHostPort(self.host, sock.getsockname()[1])
for sock in bound:
sock.close()
def __str__(self):
result = '[%s]' % self.Type()
if self.desc:
result += ' [%s]' % self.desc
return result
def SetType(self, process_type):
if process_type not in DevProcess.TYPES:
raise Error('Unknown process type: %s' % process_type)
if self.process_type is not None:
raise Error('Process type cannot be set more than once.')
self.process_type = process_type
def Type(self):
return self.process_type
def IsDefault(self):
"""Indicates whether this is the default dev_appserver process."""
return self.Type() is None
def IsMaster(self):
"""Indicates whether this is the master process."""
return self.Type() == DevProcess.TYPE_MASTER
def IsSubprocess(self):
"""Indicates that this is a subprocessess of the dev_appserver."""
return not (self.IsDefault() or self.IsMaster())
def IsAppInstance(self):
"""Indicates whether this process represents an application instance."""
return self.Type() == DevProcess.TYPE_APP_INSTANCE
def IsBackend(self):
"""Indicates whether this process represents a backend."""
return self.IsBackendBalancer() or self.IsBackendInstance()
def IsBackendBalancer(self):
"""Indicates whether this process represents a backend load balancer."""
return self.Type() == DevProcess.TYPE_BACKEND_BALANCER
def IsBackendInstance(self):
"""Indicates whether this process represents a backend instance."""
return self.Type() == DevProcess.TYPE_BACKEND_INSTANCE
def IsBalancer(self):
"""Indicates whether this process represents a load balancer."""
return self.IsMaster() or self.IsBackendBalancer()
def IsInstance(self):
"""Indicates whether this process represents an instance."""
return self.IsAppInstance() or self.IsBackendInstance()
def InitBalanceSet(self):
"""Construct a list of instances to balance traffic over."""
if self.IsMaster():
self.balance_set = [ self.child_app_instance.port ]
if self.IsBackendBalancer():
self.balance_set = []
for instance in xrange(self.backend_entry.instances):
port = backends_api._get_dev_port(self.backend_id, instance)
self.balance_set.append(port)
def GetBalanceSet(self):
"""Return the set of ports over which this process balances requests."""
return self.balance_set
def FailFast(self):
"""Indicates whether this process has fail-fast behavior."""
if not self.backend_entry:
return False
if self.backend_entry.failfast:
return True
return False
def PrintStartMessage(self, app_id, host, port):
"""Print the start message for processes that are started automatically."""
url = 'http://%s:%d' % (host, port)
admin_url = '%s/_ah/admin' % url
if not self.IsSubprocess():
logging.info('Running application %s on port %d: %s',
app_id, port, url)
logging.info('Admin console is available at: %s',
admin_url)
def Children(self):
"""Returns the children of this process."""
return self.children
def MaybeConfigureRemoteDataApis(self):
"""Set up stubs using remote_api as appropriate.
If this is the API server (or is not multiprocess), return False.
Otherwise, set up the stubs for data based APIs as remote stubs pointing at
the to the API server and return True.
"""
if self.IsDefault():
return False
services = (
'app_identity_service',
'capability_service',
'datastore_v3',
'mail',
'memcache',
'taskqueue',
'urlfetch',
'xmpp',
)
remote_api_stub.ConfigureRemoteApi(
self.app_id, PATH_DEV_API_SERVER, lambda: ('', ''),
servername='%s:%d' % (API_SERVER_HOST, self.api_port),
services=services, use_remote_datastore=False)
return True
def NewAppInfo(self, appinfo):
"""Called when a new appinfo is read from disk on each request.
The only action we take is to apply backend settings, such as the 'start'
directive, which adds a handler for /_ah/start.
Args:
appinfo: An AppInfoExternal to be used on the next request.
"""
if self.backends:
appinfo.backends = self.backends
if self.IsBackend():
appinfo.ApplyBackendSettings(self.backend_id)
def UpdateEnv(self, env_dict):
"""Copies backend port information to the supplied environment dictionary.
This information is used by the Backends API to resolve backend and instance
addresses in the dev_appserver.
User-supplied code has no access to the default environment. This method
will copy the environment variables needed for the backends api from the
default environment to the environment where user supplied code runs.
Args:
env_dict: Dictionary with the new environment.
"""
if self.backend_id:
env_dict['BACKEND_ID'] = self.backend_id
if self.instance_id is not None:
env_dict['INSTANCE_ID'] = str(self.instance_id)
for key in os.environ:
if key.startswith('BACKEND_PORT'):
env_dict[key] = os.environ[key]
def ProcessRequest(self, request, client_address):
"""Handles the SocketServer process_request call.
If the request is to a backend the request will be handled by a separate
thread. If the backend is busy a 503 response will be sent.
If this is a balancer instance each incoming request will be forwarded to
its own thread and handled there.
If no backends are configured this override has no effect.
Args:
http_server: The http server handling the request
request: the request to process
client_address: the client address
"""
assert not self.IsDefault()
if self.IsBalancer():
ForwardRequestThread(request, client_address).start()
return
assert self.IsAppInstance() or self.IsBackendInstance()
if self.handle_requests.Active():
if self.FailFast():
logging.info('respond busy')
RespondBusyHandler(request, client_address)
return
self.handle_requests.Enqueue(request, client_address)
def HandleRequest(self, request):
"""Hook that allows the DevProcess a chance to respond to requests.
This hook is invoked just before normal request dispatch occurs in
dev_appserver.py.
Args:
request: The request to be handled.
Returns:
bool: Indicates whether the request was handled here. If False, normal
request handling should proceed.
"""
if self.IsBackendInstance() and not self.started:
if request.path != '/_ah/start':
request.send_response(httplib.FORBIDDEN,
'Waiting for start request to finish.')
return True
return False
def RequestComplete(self, request, response):
"""Invoked when the process has finished handling a request."""
rc = response.status_code
if request.path == '/_ah/start':
if (rc >= 200 and rc < 300) or rc == 404:
self.started = True
def UpdateSystemStub(self, system_service_stub):
"""Copies info about the backends into the system stub."""
if self.IsDefault():
return
system_service_stub.set_backend_info(self.backends)
class HandleRequestThread(threading.Thread):
"""Thread for handling HTTP requests.
Instances needs to be able to respond with 503 when busy with other requests,
therefore requests are accepted in the main thread and forwarded to the
serving thread for processing. If the serving thread is busy with other
requests and the max pending queue length is reached a 503 error is sent back.
Args:
http_server: Http server class handling the request.
max_pending_requests: The maximum number of pending requests in the queue.
"""
def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
SetThreadName(self, 'HandleRequestThread')
self.active = False
self.pending = Queue.Queue()
def Active(self):
"""Indicates whether this thread is busy handling a request."""
return self.active
def Enqueue(self, request, client_address):
"""Adds the indicated request to the pending request queue."""
self.pending.put_nowait((request, client_address))
def run(self):
"""Takes requests from the queue and handles them."""
while True:
request, client_address = self.pending.get()
self.active = True
try:
HandleRequestDirectly(request, client_address)
except Exception, e:
logging.info('Exception in HandleRequestThread', exc_info=1)
finally:
self.active = False
class RespondBusyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""Handler that always will send back a 503 error."""
def __init__(self, request, client_address):
BaseHTTPServer.BaseHTTPRequestHandler.__init__(
self, request, client_address, HttpServer())
def handle_one_request(self):
"""Override."""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
return
if not self.parse_request():
return
self.send_error(httplib.SERVICE_UNAVAILABLE, 'Busy.')
class ForwardRequestThread(threading.Thread):
"""Forwards an incoming request in a separate thread."""
def __init__(self, request, client_address):
threading.Thread.__init__(self)
self.request = request
self.client_address = client_address
self.setDaemon(True)
SetThreadName(self, 'ForwardRequestThread')
def run(self):
ForwardRequestHandler(self.request, self.client_address)
class ForwardRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""Forwards the incoming request to next free backend instance."""
def __init__(self,
request,
client_address,
connection_handler=httplib.HTTPConnection):
"""Constructor extending BaseHTTPRequestHandler.
Args:
request: The incoming request.
client_address: A (ip, port) tuple with the address of the client.
backend: The HTTPServer that received the request.
connection_handler: http library to use when balancer the connection to
the next available backend instance. Used for dependency injection.
"""
self.connection_handler = connection_handler
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self,
request,
client_address,
HttpServer())
def handle_one_request(self):
"""Override. Invoked from BaseHTTPRequestHandler constructor."""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
return
if not self.parse_request():
return
process = GlobalProcess()
balance_set = process.GetBalanceSet()
request_size = int(self.headers.get('content-length', 0))
payload = self.rfile.read(request_size)
for port in balance_set:
logging.debug('balancer to port %d', port)
connection = self.connection_handler(process.host, port=port)
connection.response_class = ForwardResponse
connection.request(self.command, self.path, payload, dict(self.headers))
try:
response = connection.getresponse()
except httplib.HTTPException, e:
self.send_error(httplib.INTERNAL_SERVER_ERROR, str(e))
return
if response.status != httplib.SERVICE_UNAVAILABLE:
self.wfile.write(response.data)
return
self.send_error(httplib.SERVICE_UNAVAILABLE, 'Busy')
class ForwardResponse(httplib.HTTPResponse):
"""Modifies the HTTPResponse class so the raw request data is saved.
This class is used by balancer instances when balancer requests to a
backend instance.
"""
def __init__(self, sock, debuglevel=0, strict=0, method=None):
httplib.HTTPResponse.__init__(self, sock, debuglevel, strict, method)
self.data = self.fp.read()
self.fp = cStringIO.StringIO(self.data)
_dev_process = DevProcess()
def GlobalProcess():
"""Returns a global DevProcess object representing the current process."""
return _dev_process
def Enabled():
"""Indicates whether the dev_appserver is running in multiprocess mode."""
return not GlobalProcess().IsDefault()
def HttpServer():
"""Returns the HTTPServer used by this process."""
return GlobalProcess().HttpServer()
def HandleRequestDirectly(request, client_address):
"""Handles the indicated request directly, without additional processing."""
BaseHTTPServer.HTTPServer.process_request(
HttpServer(), request, client_address)
def PosixShutdown():
"""Kills a posix process with os.kill."""
dev_process = GlobalProcess()
children = dev_process.Children()
for term_signal in (signal.SIGTERM, signal.SIGKILL):
for child in children:
if child.process is None:
continue
if child.process.returncode is not None:
continue
pid = child.process.pid
try:
logging.debug('posix kill %d with signal %d', pid, term_signal)
os.kill(pid, term_signal)
except OSError, err:
logging.error('Error encountered sending pid %d signal %d:%s\n',
pid, term_signal, err)
break
time.sleep(0.2)
for child in children:
if child.process is None:
continue
if child.process.returncode is not None:
continue
try:
child.process.wait()
except OSError, e:
if e.errno != errno.ECHILD:
raise e
def Shutdown():
"""Shut down any child processes started."""
dev_process = GlobalProcess()
if not dev_process.IsMaster():
return
if os.name == 'nt':
import ctypes
for child in dev_process.Children():
logging.debug('windows kill ' + str(child.process.pid))
ctypes.windll.kernel32.TerminateProcess(int(child.process._handle), -1)
else:
PosixShutdown()
dev_process.child_api_server.Quit()
def SetLogPrefix(prefix):
"""Adds a prefix to the log handler to identify the process.
Args:
prefix: The prefix string to append at the beginning of each line.
"""
formatter = logging.Formatter(
str(prefix) + ' [%(filename)s:%(lineno)d] %(levelname)s %(message)s')
logging._acquireLock()
try:
for handler in logging._handlerList:
if isinstance(handler, weakref.ref):
handler = handler()
if handler:
handler.setFormatter(formatter)
finally:
logging._releaseLock()
def Init(argv, options, root_path, appinfo):
"""Enter multiprocess mode, if required.
The dev_appserver runs in multiprocess mode if any Backends are configured.
The initial process becomes a "master" which acts as a router for the app, and
centralized memcache/datastore API server for sharing persistent state.
This method works by configuring the global DevProcess object, which is
referenced by other files in the dev_appserver when necessary. The DevProcess
contains state indicating which role the current process plays in the
multiprocess architecture.
The master process creates and shuts down subprocesses. A separate process is
created to represent an instance of the application, and a separate process is
created for each backend (to act as a load balancer) and for each backend
instance.
On shutdown, the master process kills all subprocesses before exiting.
Args:
argv: The command line arguments used when starting the main application.
options: Parsed dictionary of the command line arguments.
root_path: Root directory of the application.
appinfo: An AppInfoExternal object representing a parsed app.yaml file.
"""
if ARG_BACKENDS not in options:
return
backends_path = os.path.join(root_path, 'backends.yaml')
if not os.path.exists(backends_path):
backends = []
else:
backends_fh = open(backends_path)
try:
backend_info = backendinfo.LoadBackendInfo(backends_fh.read())
finally:
backends_fh.close()
backends = backend_info.backends
backend_set = set()
for backend in backends:
if backend.name in backend_set:
raise Error('Duplicate backend: %s' % backend.name)
if backend.instances is None:
backend.instances = 1
elif backend.instances > BACKEND_MAX_INSTANCES:
raise Error('Maximum number of instances is %d', BACKEND_MAX_INSTANCES)
backend_set.add(backend.name)
process = _dev_process
process.Init(appinfo, backends, options)
if process.IsDefault():
logging.info('Default process')
return
SetLogPrefix(process)
if process.IsMaster():
process.StartChildren(argv, options)
process.InitBalanceSet()
if process.IsMaster():
options['require_indexes'] = False
else:
options['require_indexes'] = True
options['clear_datastore'] = False
options['clear_prospective_search'] = False