| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| """This module adds support for multiple processes in the dev_appserver. |
| |
| Each instance of the application is started as a separate process on a unique |
| port, for state isolation and parallel execution. A load-balancing process is |
| also created for each Backend. An API Server process is launched to handle all |
| memcache and datastore API requests, so that persistent state is shared across |
| all processes. Application and Backend instances forward their memcache and |
| datastore requests to the API Server using the remote_api interface. |
| |
| The base process is considered the Master. It manages all subprocesses, assigns |
| them ports, and issues /_ah/start requests to Backend instances. Ports are |
| either fixed (using the base value of --multiprocess_min_port) or randomly |
| chosen. The Master listens on the --port specified by the user, and forwards |
| all requests to an App Instance process. |
| |
| Each balancer forwards incoming requests to the next free instance, |
| or returns an HTTP 503 error if no free instance is available. |
| """ |
| |
| |
| import BaseHTTPServer |
| import copy |
| import cStringIO |
| import errno |
| import httplib |
| import logging |
| import os |
| import Queue |
| import signal |
| import socket |
| import subprocess |
| import sys |
| import threading |
| import time |
| import weakref |
| |
| from google.appengine.api import backendinfo |
| from google.appengine.api.backends import backends as backends_api |
| from google.appengine.ext.remote_api import remote_api_stub |
| from google.appengine.tools import api_server |
| |
# Keys of the parsed command-line options dictionary that give the address
# and port a process serves on.
ARG_ADDRESS = 'address'
ARG_PORT = 'port'

# Option keys related to backends and multiprocess mode. DevProcess.Init()
# inspects several of these to decide which role the current process plays
# and which ports it uses.
ARG_BACKENDS = 'backends'
ARG_MULTIPROCESS = 'multiprocess'
ARG_MULTIPROCESS_MIN_PORT = 'multiprocess_min_port'
ARG_MULTIPROCESS_API_PORT = 'multiprocess_api_port'
ARG_MULTIPROCESS_API_SERVER = 'multiprocess_api_server'
ARG_MULTIPROCESS_APP_INSTANCE_ID = 'multiprocess_app_instance'
ARG_MULTIPROCESS_BACKEND_ID = 'multiprocess_backend_id'
ARG_MULTIPROCESS_BACKEND_INSTANCE_ID = 'multiprocess_backend_instance_id'
ARG_MULTIPROCESS_FRONTEND_PORT = 'multiprocess_frontend_port'


# Host name used to reach the API Server, and the URL path handed to
# remote_api_stub.ConfigureRemoteApi() for it.
API_SERVER_HOST = 'localhost'
PATH_DEV_API_SERVER = '/_ah/dev_api_server'

# Upper bound on a backend's `instances` setting; Init() raises Error above it.
BACKEND_MAX_INSTANCES = 20
| |
| |
class Error(Exception):
  """Exception raised by this module for multiprocess configuration errors."""
| |
| |
def SetThreadName(thread, name):
  """Sets the name of a thread, prefixed with the GlobalProcess type.

  Args:
    thread: The thread object to rename (its setName() method is called).
    name: Descriptive name to place after the process type.
  """
  process_type = GlobalProcess().Type()
  thread.setName('[%s: %s]' % (process_type, name))
| |
| |
class StartInstance(threading.Thread):
  """Daemon thread that keeps asking a child to handle its start request.

  Once per second the thread calls SendStartRequest() on the child it was
  given. ChildProcess.SendStartRequest() returns immediately once the child is
  marked as started, so the loop is cheap after a successful start.
  """

  def __init__(self, child):
    """Creates the thread.

    Args:
      child: The object whose SendStartRequest() method is polled.
    """
    threading.Thread.__init__(self)
    self.setDaemon(True)
    self.child = child
    SetThreadName(self, 'Start %s' % child)

  def run(self):
    """Sends a start request to the child every second, forever."""
    while True:
      self.child.SendStartRequest()
      time.sleep(1)
| |
| |
class ChildProcess(object):
  """Handle on a single child process of the multiprocess dev_appserver.

  Records the role the child plays (app instance, backend instance or backend
  balancer), the address it listens on and the argv used to launch it, and
  offers helpers to start the process and to send HTTP requests to it.
  """

  def __init__(self,
               host,
               port,
               app_instance=None,
               backend_id=None,
               instance_id=None,
               frontend_port=None):
    """Creates an object representing a child process.

    Only one of app_instance / backend_id should be provided (instance_id is
    given in addition to backend_id for a backend instance).

    Args:
      host: (string) The host the process listens on.
      port: (int) The port the process listens on.
      app_instance: (int) The process represents the indicated app instance.
      backend_id: (string) The process represents a backend.
      instance_id: (int) The process represents the given backend instance.
      frontend_port: (int) for backends, the frontend port.
    """
    self.app_instance = app_instance
    self.backend_id = backend_id
    self.instance_id = instance_id
    # subprocess.Popen object, set by Start().
    self.process = None
    self.argv = []
    # Port of the API Server, set by Start().
    self.api_port = None
    # StartInstance thread, set by EnableStartRequests() for backend instances.
    self.start_thread = None
    # True once a request to /_ah/start has succeeded (see SendStartRequest).
    self.started = False
    # Factory for HTTP connections; replaceable for dependency injection.
    self.connection_handler = httplib.HTTPConnection
    self.SetHostPort(host, port)
    self.frontend_port = frontend_port

  def __str__(self):
    if self.app_instance is not None:
      string = 'App Instance'
    elif self.instance_id is not None:
      string = 'Backend Instance: %s.%d' % (self.backend_id, self.instance_id)
    elif self.backend_id:
      string = 'Backend Balancer: %s' % self.backend_id
    else:
      string = 'Unknown'
    return '%s [%s]' % (string, self.Address())

  def SetHostPort(self, host, port):
    """Sets the host and port that this process listens on.

    For backend processes the port is also registered with the Backends API.

    Args:
      host: (string) The host the process listens on.
      port: (int) The port the process listens on.
    """
    self.host = host
    self.port = port
    if self.backend_id:
      backends_api._set_dev_port(self.port,
                                 self.backend_id,
                                 self.instance_id)

  def Address(self):
    """Returns the URL for this process."""
    return 'http://%s:%d' % self.HostPort()

  def HostPort(self):
    """Returns the address of this process as a (host, port) pair."""
    return (self.host, self.port)

  def Start(self, argv, api_port):
    """Starts the child process.

    Args:
      argv: The argv of the parent process. When starting the subprocess,
        we make a copy of the parent's argv, then modify it in accordance with
        how the ChildProcess is configured, to represent different processes in
        the multiprocess dev_appserver.
      api_port: The port on which the API Server listens.
    """
    self.argv = copy.deepcopy(argv)
    self.api_port = api_port

    self.SetFlag('--multiprocess')
    self.SetFlag('--address', short_flag='-a', value=self.host)
    self.SetFlag('--port', short_flag='-p', value=self.port)
    self.SetFlag('--multiprocess_api_port', value=self.api_port)
    if self.frontend_port is not None:
      self.SetFlag('--multiprocess_frontend_port', value=self.frontend_port)

    if self.app_instance is not None:
      # Forward the configured instance id rather than a hard-coded 0.
      self.SetFlag('--multiprocess_app_instance_id', value=self.app_instance)
    if self.backend_id is not None:
      self.SetFlag('--multiprocess_backend_id', value=self.backend_id)
    if self.instance_id is not None:
      self.SetFlag('--multiprocess_backend_instance_id', value=self.instance_id)
    if self.argv[0].endswith('.py'):
      # Launch .py scripts through the current interpreter (presumably because
      # the script itself may not be directly executable on every platform).
      self.argv.insert(0, sys.executable)

    logging.debug('Starting %s with args: %s', self, self.argv)
    self.process = subprocess.Popen(self.argv)

  def EnableStartRequests(self):
    """Starts a thread to periodically send /_ah/start to this instance.

    We need a thread to do this because we want to restart any resident Backends
    that have been shutdown, and because a backend instance is not considered
    to be ready for serving until it has successfully responded to /_ah/start.

    Does nothing unless this process is a backend instance.
    """
    if self.backend_id and self.instance_id is not None:
      self.start_thread = StartInstance(self)
      self.start_thread.start()

  def Connect(self):
    """Attempts to connect to the child process.

    Only socket-level failures are treated as "not reachable"; in particular
    KeyboardInterrupt and SystemExit propagate so that the user can abort
    while the master is waiting for its children.

    Returns:
      bool: Whether a connection was made.
    """
    logging.debug('Attempting connection to %s', self)
    sock = None
    try:
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      sock.connect(self.HostPort())
      return True
    except socket.error:
      return False
    finally:
      if sock is not None:
        sock.close()

  def WaitForConnection(self, timeout_s=30.0, poll_period_s=0.5):
    """Blocks until the child process has started.

    This method repeatedly attempts to connect to the process on its HTTP server
    port. Returns when a connection has been successfully established or the
    timeout has been reached.

    Args:
      timeout_s: Amount of time to wait, in seconds.
      poll_period_s: Time to wait between connection attempts.

    Returns:
      bool: True if a connection was made before the timeout, False otherwise.
    """
    finish_time = time.time() + timeout_s
    while time.time() < finish_time:
      if self.Connect():
        return True
      time.sleep(poll_period_s)
    logging.info('%s took more than %d seconds to start.', self, timeout_s)
    return False

  def SendStartRequest(self):
    """If the process has not been started, sends a request to /_ah/start.

    The process is marked as started when the response status is 2xx or 404.
    Failures are logged and otherwise ignored so the caller can retry later.
    """
    if self.started:
      return

    try:
      response = self.SendRequest('GET', '/_ah/start')
      rc = response.status
      if 200 <= rc < 300 or rc == 404:
        self.started = True
    except KeyboardInterrupt:
      pass
    except Exception as e:
      logging.error('Failed start request to %s: %s', self, e)

  def SendRequest(self, command, path, payload=None, headers=None):
    """Sends an HTTP request to this process.

    Args:
      command: The HTTP command (e.g., GET, POST)
      path: The URL path for the request.
      payload: The request body, if any.
      headers: A dictionary containing headers as key-value pairs.

    Returns:
      The response object returned by the connection's getresponse().
    """
    logging.debug('send request: %s %s to %s', command, path, self)
    connection = self.connection_handler('%s:%d' % self.HostPort())
    connection.request(command, path, payload, headers or {})
    # The connection is left open on purpose: the caller still has to read
    # from the returned response.
    return connection.getresponse()

  def SetFlag(self, flag, short_flag=None, value=None):
    """Add a flag to self.argv, replacing the existing value if set.

    Args:
      flag: flag to set, in its long form (e.g. '--port').
      short_flag: one letter short version of the flag (optional)
      value: Value of the flag (optional). When given, the flag is appended as
        'flag=value'; otherwise the bare flag is appended.
    """
    self.RemoveFlag(flag, short_flag=short_flag, has_value=(value is not None))
    if value is None:
      self.argv.append(flag)
    else:
      self.argv.append(flag + '=' + str(value))

  def RemoveFlag(self, flag, short_flag=None, has_value=False):
    """Removes an argument from self.argv.

    Handles the forms 'flag', 'flag value', 'flag=value', 'short_flag' and
    'short_flag value'.

    Args:
      flag: flag to remove.
      short_flag: one letter short version of the flag
      has_value: True if the next argument after the flag is its value.
    """
    new_argv = []
    skip_next = False
    for arg in self.argv:
      if skip_next:
        # This argument is the value of the flag that was just dropped.
        skip_next = False
        continue
      if arg == flag or arg == short_flag:
        skip_next = has_value
        continue
      if has_value and arg.startswith(flag + '='):
        continue
      new_argv.append(arg)
    self.argv = new_argv
| |
| |
class DevProcess(object):
  """Represents a process in the multiprocess dev_appserver.

  A DevProcess records which role the current process plays (master, app
  instance, backend balancer or backend instance) and holds the state needed
  for that role: its address, its children (for the master) and the set of
  ports it balances over (for balancers).
  """

  TYPE_MASTER = 'Master'
  TYPE_APP_INSTANCE = 'App Instance'
  TYPE_BACKEND_BALANCER = 'Backend Balancer'
  TYPE_BACKEND_INSTANCE = 'Backend Instance'
  TYPES = frozenset([TYPE_MASTER,
                     TYPE_APP_INSTANCE,
                     TYPE_BACKEND_BALANCER,
                     TYPE_BACKEND_INSTANCE])

  def __init__(self):
    """Creates a DevProcess with a default configuration."""
    # One of TYPES, set once through SetType(); None until then.
    self.process_type = None

    # Description appended by __str__(), filled in by Init().
    self.desc = None

    # HTTPServer registered through SetHttpServer(), and the
    # HandleRequestThread created along with it.
    self.http_server = None
    self.handle_requests = None

    # Application id, copied from appinfo in Init().
    self.app_id = None

    # List of BackendEntry objects and options dictionary passed to Init().
    self.backends = None
    self.options = None

    # App instance id, when this process is an app instance.
    self.app_instance = None

    # Backend name, when this process is a backend balancer or instance.
    self.backend_id = None

    # Backend instance number, when this process is a backend instance.
    self.instance_id = None

    # BackendEntry matching backend_id, resolved by InitBackendEntry().
    self.backend_entry = None

    # Address and port this process serves on (taken from the options).
    self.host = None
    self.port = None

    # Port of the API Server.
    self.api_port = None

    # Base port from which child ports are derived; 0 requests random ports.
    self.multiprocess_min_port = 9000

    # Port given to the child app instance in StartChildren().
    self.frontend_port = None

    # ChildProcess objects created by the master in StartChildren().
    self.children = []

    # The ChildProcess of the app instance the master forwards to.
    self.child_app_instance = None

    # The api_server.APIServerProcess started by the master.
    self.child_api_server = None

    # Ports this balancer forwards to, built by InitBalanceSet().
    self.balance_set = None

    # For backend instances: set once /_ah/start completed with 2xx or 404.
    self.started = False

  def Init(self, appinfo, backends, options):
    """Configures this process from the command-line options.

    Args:
      appinfo: An AppInfoExternal object.
      backends: List of BackendEntry objects.
      options: Dictionary of command-line options.

    Raises:
      Error: if the options describe more than one process type, or name a
        backend that is not in backends.
    """
    self.backends = backends
    self.options = options

    self.app_id = appinfo.application
    self.host = options[ARG_ADDRESS]
    self.port = options[ARG_PORT]

    # Work out which role this process plays from the multiprocess options.
    if ARG_MULTIPROCESS_APP_INSTANCE_ID in options:
      self.SetType(DevProcess.TYPE_APP_INSTANCE)
      self.app_instance = options[ARG_MULTIPROCESS_APP_INSTANCE_ID]
      self.desc = str(self.app_instance)
    if ARG_MULTIPROCESS_BACKEND_ID in options:
      self.backend_id = options[ARG_MULTIPROCESS_BACKEND_ID]
      self.desc = self.backend_id
      if ARG_MULTIPROCESS_BACKEND_INSTANCE_ID in options:
        self.SetType(DevProcess.TYPE_BACKEND_INSTANCE)
        self.instance_id = int(options[ARG_MULTIPROCESS_BACKEND_INSTANCE_ID])
        self.desc += '.%d' % self.instance_id
      else:
        self.SetType(DevProcess.TYPE_BACKEND_BALANCER)

    if ARG_MULTIPROCESS_API_PORT in options:
      self.api_port = int(options[ARG_MULTIPROCESS_API_PORT])
    if ARG_MULTIPROCESS_MIN_PORT in options:
      self.multiprocess_min_port = int(options[ARG_MULTIPROCESS_MIN_PORT])

    if self.IsBackend():
      self.InitBackendEntry()

    # A process with no explicit role is the master.
    if not self.Type():
      self.SetType(DevProcess.TYPE_MASTER)

  def InitBackendEntry(self):
    """Finds the entry for the backend this process represents.

    Raises:
      Error: if no backend in self.backends is named self.backend_id.
    """
    for backend in self.backends:
      if backend.name == self.backend_id:
        self.backend_entry = backend

    if not self.backend_entry:
      raise Error('No backend entry found for: %s' % self)

  def HttpServer(self):
    """Returns the HTTPServer used by this process."""
    return self.http_server

  def Address(self):
    """Returns the address of this process."""
    return 'http://%s:%d' % (self.host, self.port)

  def SetHttpServer(self, http_server):
    """Sets the http_server to be used when handling requests.

    Also creates and starts the HandleRequestThread that serves requests
    queued by ProcessRequest().

    Args:
      http_server: An HTTPServer that receives requests.
    """
    self.http_server = http_server
    self.handle_requests = HandleRequestThread()
    self.handle_requests.start()

  def StartChildren(self, argv, options):
    """Starts the set of child processes.

    Args:
      argv: The argv of this process; each child derives its own argv from it.
      options: Dictionary of command-line options, used to configure the API
        Server.
    """
    self.children = []

    # The app instance takes the first port of the range.
    base_port = self.multiprocess_min_port
    self.frontend_port = base_port
    next_port = base_port
    self.child_app_instance = ChildProcess(self.host, next_port,
                                           app_instance=0)
    self.children.append(self.child_app_instance)
    next_port += 1

    # Every backend gets its own block of 100 ports.
    for backend in self.backends:
      base_port += 100
      next_port = base_port

      # Instances take consecutive ports from the start of the block.
      for i in xrange(backend.instances):
        self.children.append(ChildProcess(self.host, next_port,
                                          backend_id=backend.name,
                                          instance_id=i,
                                          frontend_port=self.frontend_port))
        next_port += 1

      # The balancer of the backend takes the last port of the block.
      self.children.append(ChildProcess(self.host, base_port + 99,
                                        backend_id=backend.name))

    # The API Server takes the first port of the following block.
    base_port += 100
    next_port = base_port

    self.child_api_server = api_server.APIServerProcess(
        executable=sys.executable,
        script=os.path.join(os.path.dirname(argv[0]), 'api_server.py'),
        host=self.host,
        port=next_port,
        app_id=self.app_id,
        application_host=options['address'],
        application_port=options['port'],
        application_root=options['root_path'],
        auto_id_policy=options['auto_id_policy'],
        blobstore_path=options['blobstore_path'],
        clear_datastore=options['clear_datastore'],
        clear_prospective_search=options['clear_prospective_search'],
        datastore_path=options['datastore_path'],
        enable_sendmail=options['enable_sendmail'],
        enable_task_running=not options['disable_task_running'],
        high_replication=options['high_replication'],
        logs_path=options['logs_path'],
        prospective_search_path=options['prospective_search_path'],
        require_indexes=options['require_indexes'],
        show_mail_body=options['show_mail_body'],
        smtp_host=options['smtp_host'],
        smtp_password=options['smtp_password'],
        smtp_port=options['smtp_port'],
        smtp_user=options['smtp_user'],
        task_retry_seconds=options['task_retry_seconds'],
        trusted=options['trusted'],
        use_sqlite=options['use_sqlite'],
        )
    self.child_api_server.Start()

    # NOTE(review): in random-port mode only the children are reassigned; the
    # API Server port and the frontend_port handed to backend children keep
    # the values computed above -- confirm this is intended.
    if self.multiprocess_min_port == 0:
      self.AssignPortsRandomly()

    self.api_port = next_port

    for child in self.children:
      child.Start(argv, self.api_port)

    # Wait until the API Server and every child accept connections.
    self.child_api_server.WaitUntilServing()

    for child in self.children:
      child.WaitForConnection()

    lines = ['\n\nMultiprocess Setup Complete:',
             '\n Remote API Server [%s]' % self.child_api_server.url]
    lines.extend('\n %s' % child for child in self.children)
    lines.append('\n')
    logging.info(''.join(lines))

    for child in self.children:
      child.EnableStartRequests()

  def AssignPortsRandomly(self):
    """Acquires a random port for each child process.

    All sockets stay bound until every child has a port so that no port is
    handed out twice; they are closed afterwards, also when binding fails.
    """
    bound = []
    try:
      for child in self.children:
        sock = socket.socket()
        bound.append(sock)
        sock.bind(('localhost', 0))
        child.SetHostPort(self.host, sock.getsockname()[1])
    finally:
      for sock in bound:
        sock.close()

  def __str__(self):
    result = '[%s]' % self.Type()
    if self.desc:
      result += ' [%s]' % self.desc
    return result

  def SetType(self, process_type):
    """Sets the type of this process; may only be done once.

    Args:
      process_type: One of DevProcess.TYPES.

    Raises:
      Error: if process_type is unknown or a type has already been set.
    """
    if process_type not in DevProcess.TYPES:
      raise Error('Unknown process type: %s' % process_type)
    if self.process_type is not None:
      raise Error('Process type cannot be set more than once.')
    self.process_type = process_type

  def Type(self):
    """Returns the type of this process, or None if it has not been set."""
    return self.process_type

  def IsDefault(self):
    """Indicates whether this is the default dev_appserver process."""
    return self.Type() is None

  def IsMaster(self):
    """Indicates whether this is the master process."""
    return self.Type() == DevProcess.TYPE_MASTER

  def IsSubprocess(self):
    """Indicates that this is a subprocess of the dev_appserver."""
    return not (self.IsDefault() or self.IsMaster())

  def IsAppInstance(self):
    """Indicates whether this process represents an application instance."""
    return self.Type() == DevProcess.TYPE_APP_INSTANCE

  def IsBackend(self):
    """Indicates whether this process represents a backend."""
    return self.IsBackendBalancer() or self.IsBackendInstance()

  def IsBackendBalancer(self):
    """Indicates whether this process represents a backend load balancer."""
    return self.Type() == DevProcess.TYPE_BACKEND_BALANCER

  def IsBackendInstance(self):
    """Indicates whether this process represents a backend instance."""
    return self.Type() == DevProcess.TYPE_BACKEND_INSTANCE

  def IsBalancer(self):
    """Indicates whether this process represents a load balancer."""
    return self.IsMaster() or self.IsBackendBalancer()

  def IsInstance(self):
    """Indicates whether this process represents an instance."""
    return self.IsAppInstance() or self.IsBackendInstance()

  def InitBalanceSet(self):
    """Construct a list of instances to balance traffic over.

    The master balances over its single child app instance; a backend
    balancer balances over the ports of all instances of its backend.
    """
    if self.IsMaster():
      self.balance_set = [self.child_app_instance.port]

    if self.IsBackendBalancer():
      self.balance_set = []
      for instance in xrange(self.backend_entry.instances):
        port = backends_api._get_dev_port(self.backend_id, instance)
        self.balance_set.append(port)

  def GetBalanceSet(self):
    """Return the set of ports over which this process balances requests."""
    return self.balance_set

  def FailFast(self):
    """Indicates whether this process has fail-fast behavior."""
    return bool(self.backend_entry and self.backend_entry.failfast)

  def PrintStartMessage(self, app_id, host, port):
    """Print the start message for processes that are started automatically.

    Nothing is logged when this process is a subprocess.

    Args:
      app_id: The application id to mention in the message.
      host: The host the application is served on.
      port: The port the application is served on.
    """
    url = 'http://%s:%d' % (host, port)
    admin_url = '%s/_ah/admin' % url
    if not self.IsSubprocess():
      logging.info('Running application %s on port %d: %s',
                   app_id, port, url)
      logging.info('Admin console is available at: %s',
                   admin_url)

  def Children(self):
    """Returns the children of this process."""
    return self.children

  def MaybeConfigureRemoteDataApis(self):
    """Set up stubs using remote_api as appropriate.

    If this is not a multiprocess configuration, return False. Otherwise, set
    up the stubs for the data based APIs as remote stubs pointing at the API
    Server and return True.

    Returns:
      bool: Whether remote stubs were configured.
    """
    if self.IsDefault():
      return False

    services = (
        'app_identity_service',
        'capability_service',
        'datastore_v3',
        'mail',
        'memcache',
        'taskqueue',
        'urlfetch',
        'xmpp',
    )
    remote_api_stub.ConfigureRemoteApi(
        self.app_id, PATH_DEV_API_SERVER, lambda: ('', ''),
        servername='%s:%d' % (API_SERVER_HOST, self.api_port),
        services=services, use_remote_datastore=False)
    return True

  def NewAppInfo(self, appinfo):
    """Called when a new appinfo is read from disk on each request.

    The only action we take is to apply backend settings, such as the 'start'
    directive, which adds a handler for /_ah/start.

    Args:
      appinfo: An AppInfoExternal to be used on the next request.
    """
    if self.backends:
      appinfo.backends = self.backends
      if self.IsBackend():
        appinfo.ApplyBackendSettings(self.backend_id)

  def UpdateEnv(self, env_dict):
    """Copies backend port information to the supplied environment dictionary.

    This information is used by the Backends API to resolve backend and instance
    addresses in the dev_appserver.

    User-supplied code has no access to the default environment. This method
    will copy the environment variables needed for the backends api from the
    default environment to the environment where user supplied code runs.

    Args:
      env_dict: Dictionary with the new environment.
    """
    if self.backend_id:
      env_dict['BACKEND_ID'] = self.backend_id
      if self.instance_id is not None:
        env_dict['INSTANCE_ID'] = str(self.instance_id)

    for key in os.environ:
      if key.startswith('BACKEND_PORT'):
        env_dict[key] = os.environ[key]

  def ProcessRequest(self, request, client_address):
    """Handles the SocketServer process_request call.

    If this is a balancer, each incoming request is forwarded from its own
    thread.

    Otherwise the request is queued for the serving thread. If that thread is
    busy and the backend is configured as fail-fast, a 503 response is sent
    instead.

    Args:
      request: the request to process
      client_address: the client address
    """
    assert not self.IsDefault()

    if self.IsBalancer():
      ForwardRequestThread(request, client_address).start()
      return

    assert self.IsAppInstance() or self.IsBackendInstance()

    if self.handle_requests.Active():
      if self.FailFast():
        logging.info('respond busy')
        RespondBusyHandler(request, client_address)
        return

    self.handle_requests.Enqueue(request, client_address)

  def HandleRequest(self, request):
    """Hook that allows the DevProcess a chance to respond to requests.

    This hook is invoked just before normal request dispatch occurs in
    dev_appserver.py. A backend instance that has not completed its start
    request rejects everything except /_ah/start.

    Args:
      request: The request to be handled.

    Returns:
      bool: Indicates whether the request was handled here. If False, normal
      request handling should proceed.
    """
    if self.IsBackendInstance() and not self.started:
      if request.path != '/_ah/start':
        request.send_response(httplib.FORBIDDEN,
                              'Waiting for start request to finish.')
        return True

    return False

  def RequestComplete(self, request, response):
    """Invoked when the process has finished handling a request.

    Marks the process as started when a request to /_ah/start finished with a
    2xx or 404 status.

    Args:
      request: The request that was handled.
      response: The response that was produced.
    """
    rc = response.status_code

    if request.path == '/_ah/start':
      if 200 <= rc < 300 or rc == 404:
        self.started = True

  def UpdateSystemStub(self, system_service_stub):
    """Copies info about the backends into the system stub."""
    if self.IsDefault():
      return
    system_service_stub.set_backend_info(self.backends)
| |
| |
class HandleRequestThread(threading.Thread):
  """Daemon thread that serves queued HTTP requests one at a time.

  Instances need to be able to respond with 503 when busy with other requests,
  therefore requests are accepted in the main thread and handed to this
  serving thread through a queue. Callers can use Active() to find out whether
  a request is currently being served.
  """

  def __init__(self):
    threading.Thread.__init__(self)
    self.setDaemon(True)

    # True while a request is being handled by run().
    self.active = False

    # Queue of (request, client_address) pairs waiting to be handled.
    self.pending = Queue.Queue()

    SetThreadName(self, 'HandleRequestThread')

  def Active(self):
    """Indicates whether this thread is busy handling a request."""
    return self.active

  def Enqueue(self, request, client_address):
    """Adds the indicated request to the pending request queue."""
    entry = (request, client_address)
    self.pending.put_nowait(entry)

  def run(self):
    """Takes requests from the queue and handles them."""
    while True:
      request, client_address = self.pending.get()
      self.active = True
      try:
        HandleRequestDirectly(request, client_address)
      except Exception:
        # Keep serving: one failing request must not end the thread.
        logging.info('Exception in HandleRequestThread', exc_info=1)
      finally:
        self.active = False
| |
| |
class RespondBusyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """Request handler that answers every request with a 503 error."""

  def __init__(self, request, client_address):
    """Handles the request using the HTTPServer of the current process."""
    http_server = HttpServer()
    BaseHTTPServer.BaseHTTPRequestHandler.__init__(
        self, request, client_address, http_server)

  def handle_one_request(self):
    """Override. Reads and parses the request, then replies with 503."""
    self.raw_requestline = self.rfile.readline()
    if self.raw_requestline:
      # parse_request() reports malformed requests to the client itself.
      if self.parse_request():
        self.send_error(httplib.SERVICE_UNAVAILABLE, 'Busy.')
    else:
      # The client closed the connection without sending anything.
      self.close_connection = 1
| |
| |
class ForwardRequestThread(threading.Thread):
  """Daemon thread that forwards a single incoming request."""

  def __init__(self, request, client_address):
    """Creates the thread.

    Args:
      request: The incoming request.
      client_address: The address of the client that sent the request.
    """
    threading.Thread.__init__(self)
    self.setDaemon(True)
    self.request = request
    self.client_address = client_address
    SetThreadName(self, 'ForwardRequestThread')

  def run(self):
    """Forwards the request by constructing a ForwardRequestHandler for it."""
    ForwardRequestHandler(self.request, self.client_address)
| |
| |
class ForwardRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """Forwards the incoming request to next free backend instance."""

  def __init__(self,
               request,
               client_address,
               connection_handler=httplib.HTTPConnection):
    """Constructor extending BaseHTTPRequestHandler.

    Args:
      request: The incoming request.
      client_address: A (ip, port) tuple with the address of the client.
      connection_handler: http library to use when forwarding the request to
        the next available instance. Used for dependency injection.
    """
    # Must be set first: the base constructor already handles the request.
    self.connection_handler = connection_handler
    BaseHTTPServer.BaseHTTPRequestHandler.__init__(self,
                                                   request,
                                                   client_address,
                                                   HttpServer())

  def handle_one_request(self):
    """Override. Invoked from BaseHTTPRequestHandler constructor.

    Tries each port of the balance set in order and relays the first response
    that is not a 503. A 503 is returned when every instance is busy, and a
    500 when talking to an instance fails.
    """
    self.raw_requestline = self.rfile.readline()
    if not self.raw_requestline:
      self.close_connection = 1
      return
    if not self.parse_request():
      return

    process = GlobalProcess()
    balance_set = process.GetBalanceSet()
    request_size = int(self.headers.get('content-length', 0))
    payload = self.rfile.read(request_size)

    for port in balance_set:
      logging.debug('balancer to port %d', port)
      connection = self.connection_handler(process.host, port=port)

      # ForwardResponse keeps the raw bytes of the reply in its data
      # attribute so that they can be relayed to the client unchanged.
      connection.response_class = ForwardResponse
      try:
        connection.request(self.command, self.path, payload, dict(self.headers))
        response = connection.getresponse()
      except (httplib.HTTPException, socket.error) as e:
        # Covers both malformed replies and instances that cannot be
        # reached; the client gets an error instead of a dropped connection.
        self.send_error(httplib.INTERNAL_SERVER_ERROR, str(e))
        return
      finally:
        # The whole reply is buffered by now, so the socket can be released.
        connection.close()

      if response.status != httplib.SERVICE_UNAVAILABLE:
        self.wfile.write(response.data)
        return

    # Every instance answered 503 (or there were none to try).
    self.send_error(httplib.SERVICE_UNAVAILABLE, 'Busy')
| |
| |
class ForwardResponse(httplib.HTTPResponse):
  """HTTPResponse that keeps a copy of the raw response data.

  Used by balancers when forwarding a request to an instance: everything read
  from the socket is stored in the data attribute so that it can be written
  back to the client as is.
  """

  def __init__(self, sock, debuglevel=0, strict=0, method=None):
    httplib.HTTPResponse.__init__(self, sock, debuglevel, strict, method)
    # Read everything up front, then let the base class parse from a copy.
    raw = self.fp.read()
    self.data = raw
    self.fp = cStringIO.StringIO(raw)
| |
| |
| |
| |
| |
# The DevProcess describing the current process; configured by Init().
_dev_process = DevProcess()


def GlobalProcess():
  """Returns the module-wide DevProcess object for the current process."""
  process = _dev_process
  return process
| |
| |
def Enabled():
  """Tells whether the dev_appserver is running in multiprocess mode.

  Returns:
    bool: True unless the global process is the default process.
  """
  process = GlobalProcess()
  return not process.IsDefault()
| |
| |
def HttpServer():
  """Returns the HTTPServer registered with the global process."""
  process = GlobalProcess()
  return process.HttpServer()
| |
| |
def HandleRequestDirectly(request, client_address):
  """Handles the indicated request directly, without additional processing.

  Calls the process_request() implementation of BaseHTTPServer.HTTPServer on
  the HTTPServer of the current process.

  Args:
    request: The request to handle.
    client_address: The address of the client that sent the request.
  """
  http_server = HttpServer()
  BaseHTTPServer.HTTPServer.process_request(
      http_server, request, client_address)
| |
| |
def PosixShutdown():
  """Kills the child processes of the global process with os.kill.

  Every child that is still running is sent SIGTERM; children that are still
  running after a short grace period are sent SIGKILL. Finally every child
  that has not been reaped yet is waited for.
  """
  children = GlobalProcess().Children()
  for term_signal in (signal.SIGTERM, signal.SIGKILL):
    signalled = False
    for child in children:
      if child.process is None:
        continue
      # poll() refreshes returncode; the attribute alone would still be None
      # for a child that exited since it was last checked.
      if child.process.poll() is not None:
        continue
      pid = child.process.pid
      try:
        logging.debug('posix kill %d with signal %d', pid, term_signal)
        os.kill(pid, term_signal)
        signalled = True
      except OSError as err:
        # Keep going: a failure for one child must not spare the others.
        logging.error('Error encountered sending pid %d signal %d:%s\n',
                      pid, term_signal, err)

    # Give the signalled children a moment to exit.
    if signalled:
      time.sleep(0.2)

  for child in children:
    if child.process is None:
      continue
    if child.process.returncode is not None:
      continue
    try:
      child.process.wait()
    except OSError as e:
      # ECHILD means the child has already been reaped.
      if e.errno != errno.ECHILD:
        raise
| |
| |
def Shutdown():
  """Shut down any child processes started.

  Only the master process has children; every other process returns
  immediately. Children that were never started and a missing API server are
  skipped so that shutdown also works after a partial startup.
  """
  dev_process = GlobalProcess()
  if not dev_process.IsMaster():
    return

  if os.name == 'nt':
    # Imported here because ctypes.windll only exists on Windows.
    import ctypes
    for child in dev_process.Children():
      if child.process is None:
        continue
      logging.debug('windows kill %s', child.process.pid)
      ctypes.windll.kernel32.TerminateProcess(int(child.process._handle), -1)
  else:
    PosixShutdown()

  if dev_process.child_api_server is not None:
    dev_process.child_api_server.Quit()
| |
| |
def SetLogPrefix(prefix):
  """Installs a log format starting with the given prefix on all handlers.

  Every handler known to the logging module gets a new formatter, so that
  each log line identifies the process that wrote it.

  Args:
    prefix: The prefix to put at the beginning of each line; converted with
      str().
  """
  layout = ' [%(filename)s:%(lineno)d] %(levelname)s %(message)s'
  formatter = logging.Formatter(str(prefix) + layout)
  logging._acquireLock()
  try:
    for entry in logging._handlerList:
      # The list may hold weak references, which are called to get the handler.
      handler = entry() if isinstance(entry, weakref.ref) else entry
      if handler:
        handler.setFormatter(formatter)
  finally:
    logging._releaseLock()
| |
| |
def Init(argv, options, root_path, appinfo):
  """Enter multiprocess mode, if required.

  The dev_appserver runs in multiprocess mode if any Backends are configured.
  The initial process becomes a "master" which acts as a router for the app, and
  centralized memcache/datastore API server for sharing persistent state.

  This method works by configuring the global DevProcess object, which is
  referenced by other files in the dev_appserver when necessary. The DevProcess
  contains state indicating which role the current process plays in the
  multiprocess architecture.

  The master process creates and shuts down subprocesses. A separate process is
  created to represent an instance of the application, and a separate process is
  created for each backend (to act as a load balancer) and for each backend
  instance.

  On shutdown, the master process kills all subprocesses before exiting.

  Args:
    argv: The command line arguments used when starting the main application.
    options: Parsed dictionary of the command line arguments. Updated in place
      with the settings this process should use.
    root_path: Root directory of the application.
    appinfo: An AppInfoExternal object representing a parsed app.yaml file.

  Raises:
    Error: if backends.yaml names a backend twice or asks for more than
      BACKEND_MAX_INSTANCES instances.
  """
  if ARG_BACKENDS not in options:
    return

  backends_path = os.path.join(root_path, 'backends.yaml')
  if not os.path.exists(backends_path):
    backends = []
  else:
    backends_fh = open(backends_path)
    try:
      backend_info = backendinfo.LoadBackendInfo(backends_fh.read())
    finally:
      backends_fh.close()
    backends = backend_info.backends

  # Validate the backends and default the number of instances to 1.
  backend_set = set()
  for backend in backends:
    if backend.name in backend_set:
      raise Error('Duplicate backend: %s' % backend.name)
    if backend.instances is None:
      backend.instances = 1
    elif backend.instances > BACKEND_MAX_INSTANCES:
      raise Error('Maximum number of instances is %d' % BACKEND_MAX_INSTANCES)
    backend_set.add(backend.name)

  process = _dev_process
  process.Init(appinfo, backends, options)

  if process.IsDefault():
    logging.info('Default process')
    return

  SetLogPrefix(process)
  if process.IsMaster():
    process.StartChildren(argv, options)

  process.InitBalanceSet()

  # Only the master leaves require_indexes off.
  options['require_indexes'] = not process.IsMaster()

  # The API Server was started with the original clear_* values in
  # StartChildren(); they are switched off for this process.
  options['clear_datastore'] = False
  options['clear_prospective_search'] = False