blob: c548a83fd3b090907a200b0d28c03a67efc8d202 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Manage the lifecycle of modules and dispatch requests to them."""
import collections
import logging
import threading
import urlparse
import wsgiref.headers
from google.appengine.api import request_info
from google.appengine.tools.devappserver2 import constants
from google.appengine.tools.devappserver2 import instance
from google.appengine.tools.devappserver2 import scheduled_executor
from google.appengine.tools.devappserver2 import module
from google.appengine.tools.devappserver2 import start_response_utils
from google.appengine.tools.devappserver2 import thread_executor
from google.appengine.tools.devappserver2 import wsgi_server
# This file uses pep8 naming.
# pylint: disable=invalid-name
# Module-wide thread pool. Dispatcher submits asynchronous and background
# requests to it, and each Dispatcher's ScheduledExecutor is built on top of it.
_THREAD_POOL = thread_executor.ThreadExecutor()
# Named tuple with 'status', 'headers' and 'content' fields describing a
# response.
ResponseTuple = collections.namedtuple('ResponseTuple',
['status', 'headers', 'content'])
class PortRegistry(object):
  """Lock-protected mapping from a port number to a (module, instance) pair."""

  def __init__(self):
    # An RLock guards every read and write of the port table.
    self._ports_lock = threading.RLock()
    self._ports = {}

  def add(self, port, _module, inst):
    """Records which module and instance are reachable on a port.

    Args:
      port: The port number used as the lookup key.
      _module: The module associated with the port (may be None).
      inst: The instance associated with the port (may be None).
    """
    entry = (_module, inst)
    self._ports_lock.acquire()
    try:
      self._ports[port] = entry
    finally:
      self._ports_lock.release()

  def get(self, port):
    """Returns the (module, instance) tuple registered for a port.

    Args:
      port: The port number to look up.

    Returns:
      The (module, instance) tuple previously passed to add() for this port.

    Raises:
      KeyError: Nothing has been registered for the port.
    """
    self._ports_lock.acquire()
    try:
      return self._ports[port]
    finally:
      self._ports_lock.release()
class Dispatcher(request_info.Dispatcher):
"""A devappserver2 implementation of request_info.Dispatcher.
In addition to the request_info.Dispatcher interface, it owns modules and
manages their lifetimes.
"""
def __init__(self,
             configuration,
             host,
             port,
             auth_domain,
             runtime_stderr_loglevel,
             php_config,
             python_config,
             cloud_sql_config,
             module_to_max_instances,
             use_mtime_file_watcher,
             automatic_restart,
             allow_skipped_files,
             module_to_threadsafe_override):
  """Initializer for Dispatcher.

  Args:
    configuration: An application_configuration.ApplicationConfiguration
        instance storing the configuration data for the app.
    host: A string containing the host that any HTTP servers should bind to
        e.g. "localhost".
    port: An int specifying the first port where servers should listen.
    auth_domain: A string containing the auth domain to set in the environment
        variables.
    runtime_stderr_loglevel: An int representing the minimum logging level at
        which runtime log messages should be written to stderr. See
        devappserver2.py for possible values.
    php_config: A runtime_config_pb2.PhpConfig instance containing PHP
        runtime-specific configuration. If None then defaults are used.
    python_config: A runtime_config_pb2.PythonConfig instance containing
        Python runtime-specific configuration. If None then defaults are
        used.
    cloud_sql_config: A runtime_config_pb2.CloudSQL instance containing the
        required configuration for local Google Cloud SQL development. If None
        then Cloud SQL will not be available.
    module_to_max_instances: A mapping between a module name and the maximum
        number of instances that can be created (this overrides the settings
        found in the configuration argument) e.g.
        {'default': 10, 'backend': 15}. None is treated as an empty mapping.
    use_mtime_file_watcher: A bool containing whether to use mtime polling to
        monitor file changes even if other options are available on the
        current platform.
    automatic_restart: If True then instances will be restarted when a
        file or configuration change that affects them is detected.
    allow_skipped_files: If True then all files in the application's directory
        are readable, even if they appear in a static handler or "skip_files"
        directive.
    module_to_threadsafe_override: A mapping between the module name and what
        to override the module's YAML threadsafe configuration (so modules
        not named continue to use their YAML configuration). None is treated
        as an empty mapping.
  """
  self._configuration = configuration
  self._php_config = php_config
  self._python_config = python_config
  self._cloud_sql_config = cloud_sql_config
  # The API server details and request data are supplied later by start().
  self._request_data = None
  self._api_host = None
  self._api_port = None
  self._running_modules = []
  self._module_configurations = {}
  self._host = host
  self._port = port
  self._auth_domain = auth_domain
  self._runtime_stderr_loglevel = runtime_stderr_loglevel
  self._module_name_to_module = {}
  self._dispatch_server = None
  self._quit_event = threading.Event()  # Set when quit() has been called.
  # Created here but only started by start() when a dispatch configuration
  # exists.
  self._update_checking_thread = threading.Thread(
      target=self._loop_checking_for_updates)
  self._module_to_max_instances = module_to_max_instances or {}
  self._use_mtime_file_watcher = use_mtime_file_watcher
  self._automatic_restart = automatic_restart
  self._allow_skipped_files = allow_skipped_files
  # Fall back to an empty mapping, like module_to_max_instances above, so
  # that _create_module can always call .get() on it.
  self._module_to_threadsafe_override = module_to_threadsafe_override or {}
  self._executor = scheduled_executor.ScheduledExecutor(_THREAD_POOL)
  self._port_registry = PortRegistry()
def start(self, api_host, api_port, request_data):
  """Brings up the dispatch server (if configured) and every module.

  Args:
    api_host: The hostname that APIServer listens for RPC requests on.
    api_port: The port that APIServer listens for RPC requests on.
    request_data: A wsgi_request_info.WSGIRequestInfo that will be provided
        with request information for use by API stubs.
  """
  self._api_host, self._api_port = api_host, api_port
  self._request_data = request_data
  next_port = self._port
  self._executor.start()

  if self._configuration.dispatch:
    # A dispatch configuration exists, so this Dispatcher itself serves as the
    # WSGI application on the first port.
    self._dispatch_server = wsgi_server.WsgiServer(
        (self._host, next_port), self)
    self._dispatch_server.start()
    logging.info('Starting dispatcher running at: http://%s:%s', self._host,
                 self._dispatch_server.port)
    self._update_checking_thread.start()
    # A port of 0 is left untouched; any other port advances by one so the
    # first module does not reuse the dispatch server's port.
    if next_port:
      next_port += 1
    self._port_registry.add(self._dispatch_server.port, None, None)

  for config in self._configuration.modules:
    name = config.module_name
    self._module_configurations[name] = config
    _module, next_port = self._create_module(config, next_port)
    _module.start()
    self._module_name_to_module[name] = _module
    logging.info('Starting module "%s" running at: http://%s',
                 name, _module.balanced_address)
@property
def dispatch_port(self):
  """The port on which the dispatch HTTP server is listening.

  Asserts that the dispatch server exists and reports itself as ready.
  """
  server = self._dispatch_server
  assert server, 'dispatch server not running'
  assert server.ready, 'dispatch server not ready'
  return server.port
@property
def host(self):
  """The host name that was given to this Dispatcher for its HTTP servers."""
  bound_host = self._host
  return bound_host
@property
def dispatch_address(self):
  """The dispatch HTTP server's address, e.g. "localhost:8080".

  The port is omitted when the dispatch server listens on port 80.
  """
  port = self.dispatch_port
  if port == 80:
    return self.host
  return '%s:%s' % (self.host, port)
def _check_for_updates(self):
  """Asks the dispatch configuration to check itself for updates."""
  dispatch_config = self._configuration.dispatch
  dispatch_config.check_for_updates()
def _loop_checking_for_updates(self):
  """Re-checks the dispatch configuration until the quit event is set.

  Between checks the loop waits on the quit event with a one second timeout,
  so setting the event ends the loop without waiting for the full interval.
  """
  quit_event = self._quit_event
  while True:
    if quit_event.is_set():
      break
    self._check_for_updates()
    quit_event.wait(timeout=1)
def quit(self):
  """Shuts down the executor, the dispatch server (if any) and all modules."""
  self._executor.quit()
  # Setting the event makes the update-checking loop exit.
  self._quit_event.set()
  dispatch_server = self._dispatch_server
  if dispatch_server:
    dispatch_server.quit()
  running = self._module_name_to_module.values()
  for running_module in running:
    running_module.quit()
def _create_module(self, module_configuration, port):
  """Builds the module object that matches a module's scaling configuration.

  Args:
    module_configuration: The configuration of the module to create. Its
        manual_scaling and basic_scaling attributes select the module class;
        automatic scaling is used when neither is set.
    port: The port to hand to the new module.

  Returns:
    A tuple (_module, port) where _module is the newly created module and
    port is the port to offer to the next module: the given port plus one,
    or 0 when the given port was 0.
  """
  name = module_configuration.module_name
  max_instances = self._module_to_max_instances.get(name)
  threadsafe_override = self._module_to_threadsafe_override.get(name)

  if module_configuration.manual_scaling:
    module_class = module.ManualScalingModule
  elif module_configuration.basic_scaling:
    module_class = module.BasicScalingModule
  else:
    module_class = module.AutoScalingModule

  _module = module_class(
      module_configuration,
      self._host,
      port,
      self._api_host,
      self._api_port,
      self._auth_domain,
      self._runtime_stderr_loglevel,
      self._php_config,
      self._python_config,
      self._cloud_sql_config,
      self._port,
      self._port_registry,
      self._request_data,
      self,
      max_instances,
      self._use_mtime_file_watcher,
      self._automatic_restart,
      self._allow_skipped_files,
      threadsafe_override)

  next_port = port if port == 0 else port + 1
  return _module, next_port
@property
def modules(self):
  """The module objects currently registered with this Dispatcher."""
  name_to_module = self._module_name_to_module
  return name_to_module.values()
def get_hostname(self, module_name, version, instance_id=None):
  """Looks up the hostname serving a (module, version, instance_id) tuple.

  With an instance_id the hostname of that specific instance is returned;
  without one the module's load-balanced hostname is returned.

  Args:
    module_name: A str containing the name of the module.
    version: A str containing the version.
    instance_id: An optional str containing the instance ID.

  Returns:
    A str containing the hostname.

  Raises:
    request_info.ModuleDoesNotExistError: The module does not exist.
    request_info.VersionDoesNotExistError: The version does not exist.
    request_info.InvalidInstanceIdError: The instance ID is not valid for the
        module/version or the module/version uses automatic scaling.
  """
  _module = self._get_module(module_name, version)
  if instance_id is not None:
    return _module.get_instance_address(instance_id)
  return _module.balanced_address
def get_module_names(self):
  """Returns a new list holding the name of every registered module."""
  registered = self._module_name_to_module
  return list(registered.keys())
def get_module_by_name(self, _module):
  """Looks up a registered module by its name.

  Args:
    _module: A str containing the name of the module.

  Returns:
    The module.Module with the provided name.

  Raises:
    request_info.ModuleDoesNotExistError: The module does not exist.
  """
  registered = self._module_name_to_module
  if _module not in registered:
    raise request_info.ModuleDoesNotExistError(_module)
  return registered[_module]
def get_versions(self, _module):
  """Lists the versions available for a module.

  Only one version per module is tracked, so the result always has exactly
  one element: the major version from the module's configuration.

  Args:
    _module: A str containing the name of the module.

  Returns:
    A list of str containing the versions for the specified module.

  Raises:
    request_info.ModuleDoesNotExistError: The module does not exist.
  """
  try:
    configuration = self._module_configurations[_module]
  except KeyError:
    raise request_info.ModuleDoesNotExistError(_module)
  return [configuration.major_version]
def get_default_version(self, _module):
  """Looks up the default version of a module.

  The default version is the major version from the module's configuration.

  Args:
    _module: A str containing the name of the module.

  Returns:
    A str containing the default version for the specified module.

  Raises:
    request_info.ModuleDoesNotExistError: The module does not exist.
  """
  try:
    configuration = self._module_configurations[_module]
  except KeyError:
    raise request_info.ModuleDoesNotExistError(_module)
  return configuration.major_version
def add_event(self, runnable, eta, service=None, event_id=None):
  """Schedules a callable to run at the given time.

  Args:
    runnable: A callable object to call at the specified time.
    eta: An int containing the time to run the event, in seconds since the
        epoch.
    service: A str containing the name of the service that owns this event.
        This should be set if event_id is set.
    event_id: A str containing the id of the event. If set, this can be passed
        to update_event to change the time at which the event should run.
  """
  # The event only gets a key when both halves of it were supplied.
  if service is None or event_id is None:
    key = None
  else:
    key = (service, event_id)
  self._executor.add_event(runnable, eta, key)
def update_event(self, eta, service, event_id):
  """Changes the time at which a previously scheduled event will run.

  Args:
    eta: An int containing the time to run the event, in seconds since the
        epoch.
    service: A str containing the name of the service that owns this event.
    event_id: A str containing the id of the event to update.
  """
  key = (service, event_id)
  self._executor.update_event(eta, key)
def _get_module(self, module_name, version):
  """Attempts to find the specified module.

  An empty or None module_name refers to the 'default' module. Unlike
  _get_module_with_soft_routing, an unknown module name is always an error.

  Args:
    module_name: The name of the module.
    version: The version id. None matches whatever version is configured.

  Returns:
    Module object.

  Raises:
    request_info.ModuleDoesNotExistError: The module doesn't exist.
    request_info.VersionDoesNotExistError: The version doesn't exist.
  """
  if not module_name:
    module_name = 'default'
  if module_name not in self._module_name_to_module:
    # Name the missing module in the error, as the other lookups in this
    # class do.
    raise request_info.ModuleDoesNotExistError(module_name)
  # Only the configured major version of each module is known.
  if (version is not None and
      version != self._module_configurations[module_name].major_version):
    raise request_info.VersionDoesNotExistError()
  return self._module_name_to_module[module_name]
def _get_module_with_soft_routing(self, module_name, version):
  """Uses soft-routing to find the specified module.

  Soft-routing is an attempt to match the production resolution order, which
  is slightly more permissive than the Modules API behavior. Here are the
  rules allowed:
  1. If a module is requested that doesn't exist, use the default module.
  2. If a module is requested that doesn't exist, and there is no default
     module, use any module.

  Args:
    module_name: The name of the module.
    version: The version id. None matches whatever version is configured.

  Returns:
    Module object.

  Raises:
    request_info.ModuleDoesNotExistError: The module doesn't exist and no
        other module is available to fall back to.
    request_info.VersionDoesNotExistError: The version doesn't exist.
  """
  if not module_name or module_name not in self._module_name_to_module:
    if 'default' in self._module_name_to_module:
      module_name = 'default'
    elif self._module_name_to_module:
      # If there is no default module, but there are other modules, take any.
      # This is somewhat of a hack, and can be removed if we ever enforce the
      # existence of a default module.
      # next(iter(...)) picks the first key without building a list of all
      # keys and does not rely on keys() being indexable.
      module_name = next(iter(self._module_name_to_module))
    else:
      raise request_info.ModuleDoesNotExistError(module_name)
  # The version is checked against the module that was finally selected.
  if (version is not None and
      version != self._module_configurations[module_name].major_version):
    raise request_info.VersionDoesNotExistError()
  return self._module_name_to_module[module_name]
def set_num_instances(self, module_name, version, num_instances):
  """Changes how many instances a version of a module should run.

  Args:
    module_name: A str containing the name of the module.
    version: A str containing the version.
    num_instances: An int containing the number of instances to run.

  Raises:
    ModuleDoesNotExistError: The module does not exist.
    VersionDoesNotExistError: The version does not exist.
    NotSupportedWithAutoScalingError: The provided module/version uses
        automatic scaling.
  """
  _module = self._get_module(module_name, version)
  _module.set_num_instances(num_instances)
def get_num_instances(self, module_name, version):
  """Reports how many instances are running for a version of a module.

  Args:
    module_name: A str containing the name of the module.
    version: A str containing the version.

  Returns:
    An int containing the number of instances running for a module version.

  Raises:
    ModuleDoesNotExistError: The module does not exist.
    VersionDoesNotExistError: The version does not exist.
    NotSupportedWithAutoScalingError: The provided module/version uses
        automatic scaling.
  """
  _module = self._get_module(module_name, version)
  return _module.get_num_instances()
def start_module(self, module_name, version):
  """Starts a module by resuming it.

  Args:
    module_name: A str containing the name of the module.
    version: A str containing the version.

  Raises:
    ModuleDoesNotExistError: The module does not exist.
    VersionDoesNotExistError: The version does not exist.
    NotSupportedWithAutoScalingError: The provided module/version uses
        automatic scaling.
  """
  _module = self._get_module(module_name, version)
  _module.resume()
def stop_module(self, module_name, version):
  """Stops a module by suspending it.

  Args:
    module_name: A str containing the name of the module.
    version: A str containing the version.

  Raises:
    ModuleDoesNotExistError: The module does not exist.
    VersionDoesNotExistError: The version does not exist.
    NotSupportedWithAutoScalingError: The provided module/version uses
        automatic scaling.
  """
  _module = self._get_module(module_name, version)
  _module.suspend()
def send_background_request(self, module_name, version, inst,
                            background_request_id):
  """Queues a background thread request for a specific instance.

  A background thread is reserved on the instance, then a GET request for
  /_ah/background is submitted to the shared thread pool. Failures while
  handling that request are logged rather than raised.

  Args:
    module_name: A str containing the module name to service this
        request.
    version: A str containing the version to service this request.
    inst: The instance to service this request.
    background_request_id: A str containing the unique background thread
        request identifier.

  Raises:
    NotSupportedWithAutoScalingError: The provided module/version uses
        automatic scaling.
    BackgroundThreadLimitReachedError: The instance is at its background
        thread capacity.
  """
  _module = self._get_module_with_soft_routing(module_name, version)
  try:
    inst.reserve_background_thread()
  except instance.CannotAcceptRequests:
    raise request_info.BackgroundThreadLimitReachedError()

  instance_port = _module.get_instance_port(inst.instance_id)
  request_headers = [
      ('X-AppEngine-BackgroundRequest', background_request_id)]
  environ = _module.build_request_environ(
      'GET', '/_ah/background', request_headers, '', '0.1.0.3',
      instance_port)
  _THREAD_POOL.submit(
      self._handle_request,
      environ,
      start_response_utils.null_start_response,
      _module,
      inst,
      request_type=instance.BACKGROUND_REQUEST,
      catch_and_log_exceptions=True)
# TODO: Think of better names for add_async_request and
# add_request.
def add_async_request(self, method, relative_url, headers, body, source_ip,
                      module_name=None, version=None, instance_id=None):
  """Submits an HTTP request to the thread pool without waiting for it.

  The response is discarded and failures while handling the request are
  logged rather than raised.

  Args:
    method: A str containing the HTTP method of the request.
    relative_url: A str containing path and query string of the request.
    headers: A list of (key, value) tuples where key and value are both str.
    body: A str containing the request body.
    source_ip: The source ip address for the request.
    module_name: An optional str containing the module name to service this
        request. If unset, the module is chosen from the path of
        relative_url.
    version: An optional str containing the version to service this request.
        If unset, the request will be dispatched to the default version.
    instance_id: An optional str containing the instance_id of the instance to
        service this request. If unset, the request will be dispatched
        according to the load-balancing for the module and version.
  """
  if not module_name:
    request_path = urlparse.urlsplit(relative_url).path
    _module = self._module_for_request(request_path)
  else:
    _module = self._get_module_with_soft_routing(module_name, version)

  if instance_id:
    inst = _module.get_instance(instance_id)
    port = _module.get_instance_port(instance_id)
  else:
    inst = None
    port = _module.balanced_port

  environ = _module.build_request_environ(
      method, relative_url, headers, body, source_ip, port)
  _THREAD_POOL.submit(
      self._handle_request,
      environ,
      start_response_utils.null_start_response,
      _module,
      inst,
      catch_and_log_exceptions=True)
def add_request(self, method, relative_url, headers, body, source_ip,
                module_name=None, version=None, instance_id=None,
                fake_login=False):
  """Handles an HTTP request synchronously and returns its response.

  Args:
    method: A str containing the HTTP method of the request.
    relative_url: A str containing path and query string of the request.
    headers: A list of (key, value) tuples where key and value are both str.
    body: A str containing the request body.
    source_ip: The source ip address for the request.
    module_name: An optional str containing the module name to service this
        request. If unset, the request will be dispatched according to the
        host header and relative_url.
    version: An optional str containing the version to service this request.
        If unset, the request will be dispatched according to the host header
        and relative_url.
    instance_id: An optional str containing the instance_id of the instance to
        service this request. If unset, the request will be dispatched
        according to the host header and relative_url and, if applicable, the
        load-balancing for the module and version.
    fake_login: A bool indicating whether login checks should be bypassed,
        i.e. "login: required" should be ignored for this request.

  Returns:
    A request_info.ResponseTuple containing the response information for the
    HTTP request.
  """
  if not module_name:
    # No explicit module: route using the Host header and the request path.
    host_header = wsgiref.headers.Headers(headers)['Host']
    request_path = urlparse.urlsplit(relative_url).path
    _module, inst = self._resolve_target(host_header, request_path)
  else:
    _module = self._get_module_with_soft_routing(module_name, version)
    if instance_id:
      inst = _module.get_instance(instance_id)
    else:
      inst = None

  if not inst:
    port = _module.balanced_port
  else:
    try:
      port = _module.get_instance_port(inst.instance_id)
    except request_info.NotSupportedWithAutoScalingError:
      # Fall back to the module's load-balanced port.
      port = _module.balanced_port

  environ = _module.build_request_environ(
      method, relative_url, headers, body, source_ip, port,
      fake_login=fake_login)
  capturing_start_response = start_response_utils.CapturingStartResponse()
  response = self._handle_request(
      environ, capturing_start_response, _module, inst)
  return request_info.ResponseTuple(
      capturing_start_response.status,
      capturing_start_response.response_headers,
      capturing_start_response.merged_response(response))
def _resolve_target(self, hostname, path):
  """Returns the module and instance that should handle this request.

  Args:
    hostname: A string containing the value of the host header in the request
        or None if one was not present.
    path: A string containing the path of the request.

  Returns:
    A tuple (_module, inst) where:
      _module: The module.Module that should handle this request.
      inst: The instance.Instance that should handle this request or None if
          the module's load balancing should decide on the instance.

  Raises:
    request_info.ModuleDoesNotExistError: if hostname is not known, including
        when its port is not a number or is not in the port registry.
  """
  if self._port == 80:
    default_address = self.host
  else:
    default_address = '%s:%s' % (self.host, self._port)
  if not hostname or hostname == default_address:
    return self._module_for_request(path), None

  default_address_offset = hostname.find(default_address)
  if default_address_offset > 0:
    # Drop the character that separates the prefix from the default address.
    prefix = hostname[:default_address_offset - 1]
    # The prefix should be 'module', but might be 'instance.version.module',
    # 'version.module', or 'instance.module'. These alternatives work in
    # production, but devappserver2 doesn't support running multiple versions
    # of the same module. All we can really do is route to the default
    # version of the specified module.
    if '.' in prefix:
      logging.warning('Ignoring instance/version in %s; multiple versions '
                      'are not supported in devappserver.', prefix)
    module_name = prefix.split('.')[-1]
    return self._get_module_with_soft_routing(module_name, None), None

  # The hostname does not contain the default address, so identify the target
  # by the port it names. A hostname ending in ']' is a bracketed IPv6
  # literal without a port; its colons are part of the address.
  if ':' in hostname and not hostname.endswith(']'):
    try:
      # Split on the last colon so "[::1]:8080" yields 8080.
      port = int(hostname.rsplit(':', 1)[1])
    except ValueError:
      # A malformed port means the host is unknown, not an internal error.
      raise request_info.ModuleDoesNotExistError(hostname)
  else:
    port = 80
  try:
    _module, inst = self._port_registry.get(port)
  except KeyError:
    raise request_info.ModuleDoesNotExistError(hostname)
  if not _module:
    # Ports registered without a module (the dispatch server registers itself
    # this way) are routed by request path.
    _module = self._module_for_request(path)
  return _module, inst
def _handle_request(self, environ, start_response, _module,
                    inst=None, request_type=instance.NORMAL_REQUEST,
                    catch_and_log_exceptions=False):
  """Dispatch a WSGI request.

  Args:
    environ: An environ dict for the request as defined in PEP-333.
    start_response: A function with semantics defined in PEP-333.
    _module: The module to dispatch this request to.
    inst: The instance to service this request. If None, the module will
        be left to choose the instance to serve this request.
    request_type: The request_type of this request. See instance.*_REQUEST
        module constants.
    catch_and_log_exceptions: A bool containing whether to catch and log
        exceptions in handling the request instead of leaving it for the
        caller to handle.

  Returns:
    An iterable over the response to the request as defined in PEP-333, or
    None if an exception was caught and logged.
  """
  try:
    return _module._handle_request(environ, start_response, inst=inst,
                                   request_type=request_type)
  except Exception:
    # Catch Exception rather than everything so KeyboardInterrupt and
    # SystemExit are never swallowed by the logging branch.
    if not catch_and_log_exceptions:
      raise
    logging.exception('Internal error while handling request.')
    return None
def __call__(self, environ, start_response):
  """WSGI entry point: routes the request by its PATH_INFO.

  Args:
    environ: An environ dict for the request as defined in PEP-333.
    start_response: A function with semantics defined in PEP-333.

  Returns:
    Whatever _handle_request returns for the module selected by path.
  """
  target_module = self._module_for_request(environ['PATH_INFO'])
  return self._handle_request(environ, start_response, target_module)
def _module_for_request(self, path):
  """Chooses the module for a request path using the dispatch rules.

  Rules are tried in order and the first match wins. A rule with path_exact
  set must equal the path; otherwise the rule's path must be a prefix of it.
  When no rule matches, or there is no dispatch configuration, soft routing
  is asked for the module with no name.

  Args:
    path: A string containing the path of the request.

  Returns:
    The module selected by _get_module_with_soft_routing.
  """
  dispatch = self._configuration.dispatch
  rules = dispatch.dispatch if dispatch else ()
  for url, module_name in rules:
    if url.path_exact:
      matched = path == url.path
    else:
      matched = path.startswith(url.path)
    if matched:
      return self._get_module_with_soft_routing(module_name, None)
  return self._get_module_with_soft_routing(None, None)