blob: 6d39aeaa0f93c3c326aa6fc232de9181a706a6d6 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Background thread handler.
Provides a way to create new threads which are not bound to the creator's
request context and do not need to end before the creator request completes.
"""
import logging
import sys
import thread
import threading
import traceback
from google.appengine.api.logservice import logservice
from google.appengine.runtime import request_environment
BACKGROUND_REQUEST_ID = 'HTTP_X_APPENGINE_BACKGROUNDREQUEST'
class _BackgroundRequest(object):
"""A class for coordinating between a background thread and its creator.
This facilitates swapping target, args and kwargs for thread_id between the
background thread creator and the background thread.
"""
def __init__(self):
self._ready_condition = threading.Condition()
self._callable_ready = False
self._thread_id_ready = False
def ProvideCallable(self, target, args, kwargs):
"""Sets the target and the args to be provided to this background request.
Args:
target: A callable for the background thread to execute.
args: A tuple of positional args to be passed to target.
kwargs: A dict of keyword args to be passed to target.
Returns:
The thread ID of the thread servicing this background request.
"""
with self._ready_condition:
self._target = target
self._args = args
self._kwargs = kwargs
self._callable_ready = True
self._ready_condition.notify()
while not self._thread_id_ready:
self._ready_condition.wait()
return self._thread_id
def WaitForCallable(self):
"""Sets the thread ID and returns the callable and args for this request.
This will block until the request details have been set.
Returns:
A tuple (target, args, kwargs) where
target: A callable for the background thread to execute.
args: A tuple of positional args to be passed to target.
kwargs: A dict of keyword args to be passed to target.
"""
with self._ready_condition:
self._thread_id = thread.get_ident()
self._thread_id_ready = True
self._ready_condition.notify()
while not self._callable_ready:
self._ready_condition.wait()
return self._target, self._args, self._kwargs
class _BackgroundRequestsContainer(object):
"""A container for storing pending background thread requests."""
def __init__(self):
self._requests = {}
self._lock = threading.Lock()
def _GetOrAddRequest(self, request_id):
with self._lock:
if request_id in self._requests:
return self._requests[request_id]
else:
request = _BackgroundRequest()
self._requests[request_id] = request
return request
def _RemoveRequest(self, request_id):
with self._lock:
del self._requests[request_id]
def EnqueueBackgroundThread(self, request_id, target, args, kwargs):
"""Enqueues a new background thread request for a certain request ID.
Args:
request_id: A str containing the request ID for this background thread.
target: A callable for the background thread to execute.
args: A tuple of positional args to be passed to target.
kwargs: A dict of keyword args to be passed to target.
Returns:
The thread_id of the background thread.
"""
request = self._GetOrAddRequest(request_id)
return request.ProvideCallable(target, args, kwargs)
def RunBackgroundThread(self, request_id):
"""Runs the callable enqueued for the specified request ID."""
request = self._GetOrAddRequest(request_id)
target, args, kwargs = request.WaitForCallable()
self._RemoveRequest(request_id)
target(*args, **kwargs)
_pending_background_threads = _BackgroundRequestsContainer()
def EnqueueBackgroundThread(request_id, target, args, kwargs):
"""Enqueues a new background thread request for a certain request ID.
Args:
request_id: A str containing the request ID for this background thread.
target: A callable for the background thread to execute.
args: A tuple of positional args to be passed to target.
kwargs: A dict of keyword args to be passed to target.
Returns:
The thread_id of the background thread.
"""
return _pending_background_threads.EnqueueBackgroundThread(
request_id, target, args, kwargs)
def Handle(environ):
"""Handles a background request.
This function is called by the runtime in response to an /_ah/background
request.
Args:
environ: A dict containing the environ for this request (i.e. for
os.environ).
Returns:
A dict containing:
error: App Engine error code. 0 if completed successfully, 1 otherwise.
response_code: The HTTP response code. 200 if completed successfully, 500
otherwise.
logs: A list of tuples (timestamp_usec, severity, message) of log
entries. timestamp_usec is a long, severity is int and message is
str. Severity levels are 0..4 for Debug, Info, Warning, Error,
Critical.
"""
error = logservice.LogsBuffer()
request_environment.current_request.Init(error, environ)
response = {'error': 0, 'response_code': 200}
try:
request_id = environ[BACKGROUND_REQUEST_ID]
_pending_background_threads.RunBackgroundThread(request_id)
return response
except:
exception = sys.exc_info()
tb = exception[2].tb_next
if tb:
tb = tb.tb_next
message = ''.join(traceback.format_exception(exception[0], exception[1],
tb))
logging.error(message)
response['response_code'] = 500
response['error'] = 1
return response
finally:
request_environment.current_request.Clear()
response['logs'] = error.parse_logs()