blob: 14b7a72ddc5cd0eeb229a1317685a7c86fac519d [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Task Queue API.
Enables an application to queue background work for itself. Work is done through
webhooks that process tasks pushed from a queue. Tasks will execute in
best-effort order of ETA. Webhooks that fail will cause tasks to be retried at a
later time. Multiple queues may exist with independent throttling controls.
Webhook URLs may be specified directly for Tasks, or the default URL scheme
may be used, which will translate Task names into URLs relative to a Queue's
base path. A default queue is also provided for simple usage.
"""
__all__ = [
'BadTaskStateError', 'BadTransactionState', 'BadTransactionStateError',
'DatastoreError', 'DuplicateTaskNameError', 'Error', 'InternalError',
'InvalidQueueError', 'InvalidQueueNameError', 'InvalidTaskError',
'InvalidTaskNameError', 'InvalidUrlError', 'PermissionDeniedError',
'TaskAlreadyExistsError', 'TaskTooLargeError', 'TombstonedTaskError',
'TooManyTasksError', 'TransientError', 'UnknownQueueError',
'MAX_QUEUE_NAME_LENGTH', 'MAX_TASK_NAME_LENGTH', 'MAX_TASK_SIZE_BYTES',
'MAX_URL_LENGTH',
'Queue', 'Task', 'TaskRetryOptions', 'add']
import calendar
import datetime
import math
import os
import re
import time
import urllib
import urlparse
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import namespace_manager
from google.appengine.api import urlfetch
from google.appengine.api.labs.taskqueue import taskqueue_service_pb
from google.appengine.runtime import apiproxy_errors
class Error(Exception):
"""Base-class for exceptions in this module."""
class UnknownQueueError(Error):
"""The queue specified is unknown."""
class TransientError(Error):
"""There was a transient error while accessing the queue.
Please Try again later.
"""
class InternalError(Error):
"""There was an internal error while accessing this queue.
If this problem continues, please contact the App Engine team through
our support forum with a description of your problem.
"""
class InvalidTaskError(Error):
"""The task's parameters, headers, or method is invalid."""
class InvalidTaskNameError(InvalidTaskError):
"""The task's name is invalid."""
class TaskTooLargeError(InvalidTaskError):
"""The task is too large with its headers and payload."""
class TaskAlreadyExistsError(InvalidTaskError):
"""Task already exists. It has not yet run."""
class TombstonedTaskError(InvalidTaskError):
"""Task has been tombstoned."""
class InvalidUrlError(InvalidTaskError):
"""The task's relative URL is invalid."""
class BadTaskStateError(Error):
"""The task is in the wrong state for the requested operation."""
class InvalidQueueError(Error):
"""The Queue's configuration is invalid."""
class InvalidQueueNameError(InvalidQueueError):
"""The Queue's name is invalid."""
class _RelativeUrlError(Error):
"""The relative URL supplied is invalid."""
class PermissionDeniedError(Error):
"""The requested operation is not allowed for this app."""
class DuplicateTaskNameError(Error):
"""The add arguments contain tasks with identical names."""
class TooManyTasksError(Error):
"""Too many tasks were present in a single function call."""
class DatastoreError(Error):
"""There was a datastore error while accessing the queue."""
class BadTransactionStateError(Error):
"""The state of the current transaction does not permit this operation."""
class InvalidTaskRetryOptionsError(Error):
"""The task retry configuration is invalid."""
BadTransactionState = BadTransactionStateError
MAX_QUEUE_NAME_LENGTH = 100
MAX_TASK_NAME_LENGTH = 500
MAX_TASK_SIZE_BYTES = 10 * (2 ** 10)
MAX_TASKS_PER_ADD = 100
MAX_URL_LENGTH = 2083
_DEFAULT_QUEUE = 'default'
_DEFAULT_QUEUE_PATH = '/_ah/queue'
_METHOD_MAP = {
'GET': taskqueue_service_pb.TaskQueueAddRequest.GET,
'POST': taskqueue_service_pb.TaskQueueAddRequest.POST,
'HEAD': taskqueue_service_pb.TaskQueueAddRequest.HEAD,
'PUT': taskqueue_service_pb.TaskQueueAddRequest.PUT,
'DELETE': taskqueue_service_pb.TaskQueueAddRequest.DELETE,
}
_NON_POST_METHODS = frozenset(['GET', 'HEAD', 'PUT', 'DELETE'])
_BODY_METHODS = frozenset(['POST', 'PUT'])
_TASK_NAME_PATTERN = r'^[a-zA-Z0-9-]{1,%s}$' % MAX_TASK_NAME_LENGTH
_TASK_NAME_RE = re.compile(_TASK_NAME_PATTERN)
_QUEUE_NAME_PATTERN = r'^[a-zA-Z0-9-]{1,%s}$' % MAX_QUEUE_NAME_LENGTH
_QUEUE_NAME_RE = re.compile(_QUEUE_NAME_PATTERN)
_ERROR_MAPPING = {
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE: UnknownQueueError,
taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR:
TransientError,
taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR: InternalError,
taskqueue_service_pb.TaskQueueServiceError.TASK_TOO_LARGE:
TaskTooLargeError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_TASK_NAME:
InvalidTaskNameError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME:
InvalidQueueNameError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_URL: InvalidUrlError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_RATE:
InvalidQueueError,
taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED:
PermissionDeniedError,
taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS:
TaskAlreadyExistsError,
taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK:
TombstonedTaskError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA: InvalidTaskError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST: Error,
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK: Error,
taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE: Error,
taskqueue_service_pb.TaskQueueServiceError.DUPLICATE_TASK_NAME:
DuplicateTaskNameError,
taskqueue_service_pb.TaskQueueServiceError.TOO_MANY_TASKS:
TooManyTasksError,
}
_PRESERVE_ENVIRONMENT_HEADERS = (
('X-AppEngine-Default-Namespace', 'HTTP_X_APPENGINE_DEFAULT_NAMESPACE'),)
class _UTCTimeZone(datetime.tzinfo):
"""UTC timezone."""
ZERO = datetime.timedelta(0)
def utcoffset(self, dt):
return self.ZERO
def dst(self, dt):
return self.ZERO
def tzname(self, dt):
return 'UTC'
_UTC = _UTCTimeZone()
def _parse_relative_url(relative_url):
"""Parses a relative URL and splits it into its path and query string.
Args:
relative_url: The relative URL, starting with a '/'.
Returns:
Tuple (path, query) where:
path: The path in the relative URL.
query: The query string in the URL without the '?' character.
Raises:
_RelativeUrlError if the relative_url is invalid for whatever reason
"""
if not relative_url:
raise _RelativeUrlError('Relative URL is empty')
(scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
if scheme or netloc:
raise _RelativeUrlError('Relative URL may not have a scheme or location')
if fragment:
raise _RelativeUrlError('Relative URL may not specify a fragment')
if not path or path[0] != '/':
raise _RelativeUrlError('Relative URL path must start with "/"')
return path, query
def _flatten_params(params):
"""Converts a dictionary of parameters to a list of parameters.
Any unicode strings in keys or values will be encoded as UTF-8.
Args:
params: Dictionary mapping parameter keys to values. Values will be
converted to a string and added to the list as tuple (key, value). If
a values is iterable and not a string, each contained value will be
added as a separate (key, value) tuple.
Returns:
List of (key, value) tuples.
"""
def get_string(value):
if isinstance(value, unicode):
return unicode(value).encode('utf-8')
else:
return str(value)
param_list = []
for key, value in params.iteritems():
key = get_string(key)
if isinstance(value, basestring):
param_list.append((key, get_string(value)))
else:
try:
iterator = iter(value)
except TypeError:
param_list.append((key, str(value)))
else:
param_list.extend((key, get_string(v)) for v in iterator)
return param_list
class TaskRetryOptions(object):
"""The options used to decide when a failed Task will be retried."""
__CONSTRUCTOR_KWARGS = frozenset(
['min_backoff_seconds', 'max_backoff_seconds',
'task_age_limit', 'max_doublings', 'task_retry_limit'])
def __init__(self, **kwargs):
"""Initializer.
Args:
min_backoff_seconds: The minimum number of seconds to wait before retrying
a task after failure. (optional)
max_backoff_seconds: The maximum number of seconds to wait before retrying
a task after failure. (optional)
task_age_limit: The number of seconds after creation afterwhich a failed
task will no longer be retried. The given value will be rounded up to
the nearest integer. If task_retry_limit is also specified then the task
will be retried until both limits are reached. (optional)
max_doublings: The maximum number of times that the interval between
failed task retries will be doubled before the increase becomes
constant. The constant will be:
2**(max_doublings - 1) * min_backoff_seconds. (optional)
task_retry_limit: The maximum number of times to retry a failed task
before giving up. If task_age_limit is specified then the task will be
retried until both limits are reached. (optional)
Raises:
InvalidTaskRetryOptionsError if any of the parameters are invalid.
"""
args_diff = set(kwargs.iterkeys()) - self.__CONSTRUCTOR_KWARGS
if args_diff:
raise TypeError('Invalid arguments: %s' % ', '.join(args_diff))
self.__min_backoff_seconds = kwargs.get('min_backoff_seconds')
if (self.__min_backoff_seconds is not None and
self.__min_backoff_seconds < 0):
raise InvalidTaskRetryOptionsError(
'The minimum retry interval cannot be negative')
self.__max_backoff_seconds = kwargs.get('max_backoff_seconds')
if (self.__max_backoff_seconds is not None and
self.__max_backoff_seconds < 0):
raise InvalidTaskRetryOptionsError(
'The maximum retry interval cannot be negative')
if (self.__min_backoff_seconds is not None and
self.__max_backoff_seconds is not None and
self.__max_backoff_seconds < self.__min_backoff_seconds):
raise InvalidTaskRetryOptionsError(
'The maximum retry interval cannot be less than the '
'minimum retry interval')
self.__max_doublings = kwargs.get('max_doublings')
if self.__max_doublings is not None and self.__max_doublings < 0:
raise InvalidTaskRetryOptionsError(
'The maximum number of retry interval doublings cannot be negative')
self.__task_retry_limit = kwargs.get('task_retry_limit')
if self.__task_retry_limit is not None and self.__task_retry_limit < 0:
raise InvalidTaskRetryOptionsError(
'The maximum number of retries cannot be negative')
self.__task_age_limit = kwargs.get('task_age_limit')
if self.__task_age_limit is not None:
if self.__task_age_limit < 0:
raise InvalidTaskRetryOptionsError(
'The expiry countdown cannot be negative')
self.__task_age_limit = int(math.ceil(self.__task_age_limit))
@property
def min_backoff_seconds(self):
"""The minimum number of seconds to wait before retrying a task."""
return self.__min_backoff_seconds
@property
def max_backoff_seconds(self):
"""The maximum number of seconds to wait before retrying a task."""
return self.__max_backoff_seconds
@property
def task_age_limit(self):
"""The number of seconds afterwhich a failed task will not be retried."""
return self.__task_age_limit
@property
def max_doublings(self):
"""The number of times that the retry interval will be doubled."""
return self.__max_doublings
@property
def task_retry_limit(self):
"""The number of times that a failed task will be retried."""
return self.__task_retry_limit
class Task(object):
"""Represents a single Task on a queue."""
__CONSTRUCTOR_KWARGS = frozenset([
'countdown', 'eta', 'headers', 'method', 'name', 'params',
'retry_options', 'url'])
__eta_posix = None
def __init__(self, payload=None, **kwargs):
"""Initializer.
All parameters are optional.
Args:
payload: The payload data for this Task that will be delivered to the
webhook as the HTTP request body. This is only allowed for POST and PUT
methods.
countdown: Time in seconds into the future that this Task should execute.
Defaults to zero.
eta: Absolute time when the Task should execute. May not be specified
if 'countdown' is also supplied. This may be timezone-aware or
timezone-naive.
headers: Dictionary of headers to pass to the webhook. Values in the
dictionary may be iterable to indicate repeated header fields.
method: Method to use when accessing the webhook. Defaults to 'POST'.
name: Name to give the Task; if not specified, a name will be
auto-generated when added to a queue and assigned to this object. Must
match the _TASK_NAME_PATTERN regular expression.
params: Dictionary of parameters to use for this Task. For POST requests
these params will be encoded as 'application/x-www-form-urlencoded' and
set to the payload. For all other methods, the parameters will be
converted to a query string. May not be specified if the URL already
contains a query string.
url: Relative URL where the webhook that should handle this task is
located for this application. May have a query string unless this is
a POST method.
retry_options: TaskRetryOptions used to control when the task will be
retried if it fails.
Raises:
InvalidTaskError if any of the parameters are invalid;
InvalidTaskNameError if the task name is invalid; InvalidUrlError if
the task URL is invalid or too long; TaskTooLargeError if the task with
its payload is too large.
"""
args_diff = set(kwargs.iterkeys()) - self.__CONSTRUCTOR_KWARGS
if args_diff:
raise TypeError('Invalid arguments: %s' % ', '.join(args_diff))
self.__name = kwargs.get('name')
if self.__name and not _TASK_NAME_RE.match(self.__name):
raise InvalidTaskNameError(
'Task name does not match expression "%s"; found %s' %
(_TASK_NAME_PATTERN, self.__name))
self.__default_url, self.__relative_url, query = Task.__determine_url(
kwargs.get('url', ''))
self.__headers = urlfetch._CaselessDict()
self.__headers.update(kwargs.get('headers', {}))
self.__method = kwargs.get('method', 'POST').upper()
self.__payload = None
params = kwargs.get('params', {})
for header_name, environ_name in _PRESERVE_ENVIRONMENT_HEADERS:
value = os.environ.get(environ_name)
if value is not None:
self.__headers.setdefault(header_name, value)
self.__headers.setdefault('X-AppEngine-Current-Namespace',
namespace_manager.get_namespace())
if query and params:
raise InvalidTaskError('Query string and parameters both present; '
'only one of these may be supplied')
if self.__method == 'POST':
if payload and params:
raise InvalidTaskError('Message body and parameters both present for '
'POST method; only one of these may be supplied')
elif query:
raise InvalidTaskError('POST method may not have a query string; '
'use the "params" keyword argument instead')
elif params:
self.__payload = Task.__encode_params(params)
self.__headers.setdefault(
'content-type', 'application/x-www-form-urlencoded')
elif payload is not None:
self.__payload = Task.__convert_payload(payload, self.__headers)
elif self.__method in _NON_POST_METHODS:
if payload and self.__method not in _BODY_METHODS:
raise InvalidTaskError('Payload may only be specified for methods %s' %
', '.join(_BODY_METHODS))
if payload:
self.__payload = Task.__convert_payload(payload, self.__headers)
if params:
query = Task.__encode_params(params)
if query:
self.__relative_url = '%s?%s' % (self.__relative_url, query)
else:
raise InvalidTaskError('Invalid method: %s' % self.__method)
self.__headers_list = _flatten_params(self.__headers)
self.__eta_posix = Task.__determine_eta_posix(
kwargs.get('eta'), kwargs.get('countdown'))
self.__eta = None
self.__retry_options = kwargs.get('retry_options')
self.__enqueued = False
if self.size > MAX_TASK_SIZE_BYTES:
raise TaskTooLargeError('Task size must be less than %d; found %d' %
(MAX_TASK_SIZE_BYTES, self.size))
@staticmethod
def __determine_url(relative_url):
"""Determines the URL of a task given a relative URL and a name.
Args:
relative_url: The relative URL for the Task.
Returns:
Tuple (default_url, relative_url, query) where:
default_url: True if this Task is using the default URL scheme;
False otherwise.
relative_url: String containing the relative URL for this Task.
query: The query string for this task.
Raises:
InvalidUrlError if the relative_url is invalid.
"""
if not relative_url:
default_url, query = True, ''
else:
default_url = False
try:
relative_url, query = _parse_relative_url(relative_url)
except _RelativeUrlError, e:
raise InvalidUrlError(e)
if len(relative_url) > MAX_URL_LENGTH:
raise InvalidUrlError(
'Task URL must be less than %d characters; found %d' %
(MAX_URL_LENGTH, len(relative_url)))
return (default_url, relative_url, query)
@staticmethod
def __determine_eta_posix(eta=None, countdown=None, current_time=time.time):
"""Determines the ETA for a task.
If 'eta' and 'countdown' are both None, the current time will be used.
Otherwise, only one of them may be specified.
Args:
eta: A datetime.datetime specifying the absolute ETA or None;
this may be timezone-aware or timezone-naive.
countdown: Count in seconds into the future from the present time that
the ETA should be assigned to.
Returns:
A float giving a POSIX timestamp containing the ETA.
Raises:
InvalidTaskError if the parameters are invalid.
"""
if eta is not None and countdown is not None:
raise InvalidTaskError('May not use a countdown and ETA together')
elif eta is not None:
if not isinstance(eta, datetime.datetime):
raise InvalidTaskError('ETA must be a datetime.datetime instance')
elif eta.tzinfo is None:
return time.mktime(eta.timetuple()) + eta.microsecond*1e-6
else:
return calendar.timegm(eta.utctimetuple()) + eta.microsecond*1e-6
elif countdown is not None:
try:
countdown = float(countdown)
except ValueError:
raise InvalidTaskError('Countdown must be a number')
except OverflowError:
raise InvalidTaskError('Countdown out of range')
else:
return current_time() + countdown
else:
return current_time()
@staticmethod
def __encode_params(params):
"""URL-encodes a list of parameters.
Args:
params: Dictionary of parameters, possibly with iterable values.
Returns:
URL-encoded version of the params, ready to be added to a query string or
POST body.
"""
return urllib.urlencode(_flatten_params(params))
@staticmethod
def __convert_payload(payload, headers):
"""Converts a Task payload into UTF-8 and sets headers if necessary.
Args:
payload: The payload data to convert.
headers: Dictionary of headers.
Returns:
The payload as a non-unicode string.
Raises:
InvalidTaskError if the payload is not a string or unicode instance.
"""
if isinstance(payload, unicode):
headers.setdefault('content-type', 'text/plain; charset=utf-8')
payload = payload.encode('utf-8')
elif not isinstance(payload, str):
raise InvalidTaskError(
'Task payloads must be strings; invalid payload: %r' % payload)
return payload
@property
def on_queue_url(self):
"""Returns True if this Task will run on the queue's URL."""
return self.__default_url
@property
def eta_posix(self):
"""Returns a POSIX timestamp giving when this Task will execute."""
if self.__eta_posix is None and self.__eta is not None:
self.__eta_posix = Task.__determine_eta_posix(self.__eta)
return self.__eta_posix
@property
def eta(self):
"""Returns a datetime when this Task will execute."""
if self.__eta is None and self.__eta_posix is not None:
self.__eta = datetime.datetime.fromtimestamp(self.__eta_posix, _UTC)
return self.__eta
@property
def headers(self):
"""Returns a copy of the headers for this Task."""
return self.__headers.copy()
@property
def method(self):
"""Returns the method to use for this Task."""
return self.__method
@property
def name(self):
"""Returns the name of this Task.
Will be None if using auto-assigned Task names and this Task has not yet
been added to a Queue.
"""
return self.__name
@property
def payload(self):
"""Returns the payload for this task, which may be None."""
return self.__payload
@property
def size(self):
"""Returns the size of this task in bytes."""
HEADER_SEPERATOR = len(': \r\n')
header_size = sum((len(key) + len(value) + HEADER_SEPERATOR)
for key, value in self.__headers_list)
return (len(self.__method) + len(self.__payload or '') +
len(self.__relative_url) + header_size)
@property
def url(self):
"""Returns the relative URL for this Task."""
return self.__relative_url
@property
def retry_options(self):
"""Returns the TaskRetryOptions for this task, which may be None."""
return self.__retry_options
@property
def was_enqueued(self):
"""Returns True if this Task has been enqueued.
Note: This will not check if this task already exists in the queue.
"""
return self.__enqueued
def add(self, queue_name=_DEFAULT_QUEUE, transactional=False):
"""Adds this Task to a queue. See Queue.add."""
return Queue(queue_name).add(self, transactional=transactional)
class Queue(object):
"""Represents a Queue."""
def __init__(self, name=_DEFAULT_QUEUE):
"""Initializer.
Args:
name: Name of this queue. If not supplied, defaults to the default queue.
Raises:
InvalidQueueNameError if the queue name is invalid.
"""
if not _QUEUE_NAME_RE.match(name):
raise InvalidQueueNameError(
'Queue name does not match pattern "%s"; found %s' %
(_QUEUE_NAME_PATTERN, name))
self.__name = name
self.__url = '%s/%s' % (_DEFAULT_QUEUE_PATH, self.__name)
self._app = None
def purge(self):
"""Removes all the tasks in this Queue.
This function takes constant time to purge a Queue and some delay may apply
before the call is effective.
Raises:
UnknownQueueError if the Queue does not exist on server side.
"""
request = taskqueue_service_pb.TaskQueuePurgeQueueRequest()
response = taskqueue_service_pb.TaskQueuePurgeQueueResponse()
request.set_queue_name(self.__name)
if self._app:
request.set_app_id(self._app)
try:
apiproxy_stub_map.MakeSyncCall('taskqueue',
'PurgeQueue',
request,
response)
except apiproxy_errors.ApplicationError, e:
raise self.__TranslateError(e.application_error, e.error_detail)
def add(self, task, transactional=False):
"""Adds a Task or list of Tasks to this Queue.
If a list of more than one Tasks is given, a raised exception does not
guarantee that no tasks were added to the queue (unless transactional is set
to True). To determine which tasks were successfully added when an exception
is raised, check the Task.was_enqueued property.
Args:
task: A Task instance or a list of Task instances that will added to the
queue.
transactional: If False adds the Task(s) to a queue irrespectively to the
enclosing transaction success or failure. An exception is raised if True
and called outside of a transaction. (optional)
Returns:
The Task or list of tasks that was supplied to this method.
Raises:
BadTaskStateError: if the Task(s) has already been added to a queue.
BadTransactionStateError: if the transactional argument is true but this
call is being made outside of the context of a transaction.
Error-subclass on application errors.
"""
try:
tasks = list(iter(task))
except TypeError:
tasks = [task]
multiple = False
else:
multiple = True
self.__AddTasks(tasks, transactional)
if multiple:
return tasks
else:
assert len(tasks) == 1
return tasks[0]
def __AddTasks(self, tasks, transactional):
"""Internal implementation of .add() where tasks must be a list."""
if len(tasks) > MAX_TASKS_PER_ADD:
raise TooManyTasksError(
'No more than %d tasks can be added in a single add call' %
MAX_TASKS_PER_ADD)
request = taskqueue_service_pb.TaskQueueBulkAddRequest()
response = taskqueue_service_pb.TaskQueueBulkAddResponse()
task_names = set()
for task in tasks:
if task.name:
if task.name in task_names:
raise DuplicateTaskNameError(
'The task name %r is used more than once in the request' %
task.name)
task_names.add(task.name)
self.__FillAddRequest(task, request.add_add_request(), transactional)
try:
apiproxy_stub_map.MakeSyncCall('taskqueue', 'BulkAdd', request, response)
except apiproxy_errors.ApplicationError, e:
raise self.__TranslateError(e.application_error, e.error_detail)
assert response.taskresult_size() == len(tasks), (
'expected %d results from BulkAdd(), got %d' % (
len(tasks), response.taskresult_size()))
exception = None
for task, task_result in zip(tasks, response.taskresult_list()):
if task_result.result() == taskqueue_service_pb.TaskQueueServiceError.OK:
if task_result.has_chosen_task_name():
task._Task__name = task_result.chosen_task_name()
task._Task__enqueued = True
elif (task_result.result() ==
taskqueue_service_pb.TaskQueueServiceError.SKIPPED):
pass
elif exception is None:
exception = self.__TranslateError(task_result.result())
if exception is not None:
raise exception
return tasks
def __FillTaskQueueRetryParameters(self,
retry_options,
retry_retry_parameters):
"""Populates a TaskQueueRetryParameters with data from a TaskRetryOptions.
Args:
retry_options: The TaskRetryOptions instance to use as a source for the
data to be added to retry_retry_parameters.
retry_retry_parameters: A taskqueue_service_pb.TaskQueueRetryParameters
to populate.
"""
if retry_options.min_backoff_seconds is not None:
retry_retry_parameters.set_min_backoff_sec(
retry_options.min_backoff_seconds)
if retry_options.max_backoff_seconds is not None:
retry_retry_parameters.set_max_backoff_sec(
retry_options.max_backoff_seconds)
if retry_options.task_retry_limit is not None:
retry_retry_parameters.set_retry_limit(retry_options.task_retry_limit)
if retry_options.task_age_limit is not None:
retry_retry_parameters.set_age_limit_sec(retry_options.task_age_limit)
if retry_options.max_doublings is not None:
retry_retry_parameters.set_max_doublings(retry_options.max_doublings)
def __FillAddRequest(self, task, task_request, transactional):
"""Populates a TaskQueueAddRequest with the data from a Task instance.
Args:
task: The Task instance to use as a source for the data to be added to
task_request.
task_request: The taskqueue_service_pb.TaskQueueAddRequest to populate.
transactional: If true then populates the task_request.transaction message
with information from the enclosing transaction (if any).
Raises:
BadTaskStateError: If the task was already added to a Queue.
BadTransactionStateError: If the transactional argument is True and there
is no enclosing transaction.
InvalidTaskNameError: If the transactional argument is True and the task
is named.
"""
if task.was_enqueued:
raise BadTaskStateError('Task has already been enqueued')
adjusted_url = task.url
if task.on_queue_url:
adjusted_url = self.__url + task.url
task_request.set_queue_name(self.__name)
task_request.set_eta_usec(long(task.eta_posix * 1e6))
task_request.set_method(_METHOD_MAP.get(task.method))
task_request.set_url(adjusted_url)
if task.name:
task_request.set_task_name(task.name)
else:
task_request.set_task_name('')
if task.payload:
task_request.set_body(task.payload)
for key, value in _flatten_params(task.headers):
header = task_request.add_header()
header.set_key(key)
header.set_value(value)
if task.retry_options:
self.__FillTaskQueueRetryParameters(
task.retry_options, task_request.mutable_retry_parameters())
if self._app:
task_request.set_app_id(self._app)
if transactional:
from google.appengine.api import datastore
if not datastore._MaybeSetupTransaction(task_request, []):
raise BadTransactionStateError(
'Transactional adds are not allowed outside of transactions')
if task_request.has_transaction() and task.name:
raise InvalidTaskNameError(
'Task bound to a transaction cannot be named.')
@property
def name(self):
"""Returns the name of this queue."""
return self.__name
@staticmethod
def __TranslateError(error, detail=''):
"""Translates a TaskQueueServiceError into an exception.
Args:
error: Value from TaskQueueServiceError enum.
detail: A human-readable description of the error.
Returns:
The corresponding Exception sub-class for that error code.
"""
if (error >= taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR
and isinstance(error, int)):
from google.appengine.api import datastore
datastore_exception = datastore._DatastoreExceptionFromErrorCodeAndDetail(
error - taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR,
detail)
class JointException(datastore_exception.__class__, DatastoreError):
"""There was a datastore error while accessing the queue."""
__msg = (u'taskqueue.DatastoreError caused by: %s %s' %
(datastore_exception.__class__, detail))
def __str__(self):
return JointException.__msg
return JointException()
else:
exception_class = _ERROR_MAPPING.get(error, None)
if exception_class:
return exception_class(detail)
else:
return Error('Application error %s: %s' % (error, detail))
def add(*args, **kwargs):
"""Convenience method will create a Task and add it to a queue.
All parameters are optional.
Args:
name: Name to give the Task; if not specified, a name will be
auto-generated when added to a queue and assigned to this object. Must
match the _TASK_NAME_PATTERN regular expression.
queue_name: Name of this queue. If not supplied, defaults to
the default queue.
url: Relative URL where the webhook that should handle this task is
located for this application. May have a query string unless this is
a POST method.
method: Method to use when accessing the webhook. Defaults to 'POST'.
headers: Dictionary of headers to pass to the webhook. Values in the
dictionary may be iterable to indicate repeated header fields.
payload: The payload data for this Task that will be delivered to the
webhook as the HTTP request body. This is only allowed for POST and PUT
methods.
params: Dictionary of parameters to use for this Task. For POST requests
these params will be encoded as 'application/x-www-form-urlencoded' and
set to the payload. For all other methods, the parameters will be
converted to a query string. May not be specified if the URL already
contains a query string.
transactional: If False adds the Task(s) to a queue irrespectively to the
enclosing transaction success or failure. An exception is raised if True
and called outside of a transaction. (optional)
countdown: Time in seconds into the future that this Task should execute.
Defaults to zero.
eta: Absolute time when the Task should execute. May not be specified
if 'countdown' is also supplied. This may be timezone-aware or
timezone-naive.
retry_options: TaskRetryOptions used to control when the task will be
retried if it fails.
Returns:
The Task that was added to the queue.
Raises:
InvalidTaskError if any of the parameters are invalid;
InvalidTaskNameError if the task name is invalid; InvalidUrlError if
the task URL is invalid or too long; TaskTooLargeError if the task with
its payload is too large.
"""
transactional = kwargs.pop('transactional', False)
queue_name = kwargs.pop('queue_name', _DEFAULT_QUEUE)
return Task(*args, **kwargs).add(
queue_name=queue_name, transactional=transactional)