blob: 4c95c11f731fd7cc7190648f31ef56551f8b56af [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Task Queue API.
Enables an application to queue background work for itself. Work is done
through webhooks that process tasks pushed from a queue, or workers that
manually pull tasks from a queue. In push queues, Tasks will execute in
best-effort order of ETA. Webhooks that fail will cause tasks to be retried
at a later time. In pull queues, workers are responsible of leasing tasks for
processing and deleting the tasks when completed. Multiple queues may exist
with independent throttling controls.
Webhook URLs may be specified directly for push Tasks, or the default URL scheme
may be used, which will translate Task names into URLs relative to a Queue's
base path. A default queue is also provided for simple usage.
"""
__all__ = [
'BadTaskStateError', 'BadTransactionState', 'BadTransactionStateError',
'DatastoreError', 'DuplicateTaskNameError', 'Error', 'InternalError',
'InvalidQueueError', 'InvalidQueueNameError', 'InvalidTaskError',
'InvalidTaskNameError', 'InvalidUrlError', 'PermissionDeniedError',
'TaskAlreadyExistsError', 'TaskTooLargeError', 'TombstonedTaskError',
'TooManyTasksError', 'TransientError', 'UnknownQueueError',
'InvalidLeaseTimeError', 'InvalidMaxTasksError', 'InvalidDeadlineError',
'InvalidQueueModeError', 'TransactionalRequestTooLargeError',
'TaskLeaseExpiredError', 'QueuePausedError', 'InvalidEtaError',
'InvalidTagError',
'MAX_QUEUE_NAME_LENGTH', 'MAX_TASK_NAME_LENGTH', 'MAX_TASK_SIZE_BYTES',
'MAX_PULL_TASK_SIZE_BYTES', 'MAX_PUSH_TASK_SIZE_BYTES',
'MAX_LEASE_SECONDS', 'MAX_TASKS_PER_ADD',
'MAX_TASKS_PER_LEASE',
'MAX_URL_LENGTH',
'DEFAULT_APP_VERSION',
'Queue', 'QueueStatistics', 'Task', 'TaskRetryOptions', 'add', 'create_rpc']
import calendar
import cgi
import datetime
import logging
import math
import os
import re
import time
import urllib
import urlparse
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import app_identity
from google.appengine.api import namespace_manager
from google.appengine.api import urlfetch
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.runtime import apiproxy_errors
class Error(Exception):
"""Base-class for exceptions in this module."""
class UnknownQueueError(Error):
"""The queue specified is unknown."""
class TransientError(Error):
"""There was a transient error while accessing the queue.
Please Try again later.
"""
class InternalError(Error):
"""There was an internal error while accessing this queue.
If this problem continues, please contact the App Engine team through
our support forum with a description of your problem.
"""
class InvalidTaskError(Error):
"""The task's parameters, headers, or method is invalid."""
class InvalidTaskNameError(InvalidTaskError):
"""The task's name is invalid."""
class TaskTooLargeError(InvalidTaskError):
"""The task is too large with its headers and payload."""
class TaskAlreadyExistsError(InvalidTaskError):
"""Task already exists. It has not yet run."""
class TombstonedTaskError(InvalidTaskError):
"""Task has been tombstoned."""
class InvalidUrlError(InvalidTaskError):
"""The task's relative URL is invalid."""
class InvalidEtaError(InvalidTaskError):
"""The task's ETA is invalid."""
class BadTaskStateError(Error):
"""The task is in the wrong state for the requested operation."""
class InvalidQueueError(Error):
"""The Queue's configuration is invalid."""
class InvalidQueueNameError(InvalidQueueError):
"""The Queue's name is invalid."""
class _RelativeUrlError(Error):
"""The relative URL supplied is invalid."""
class PermissionDeniedError(Error):
"""The requested operation is not allowed for this app."""
class DuplicateTaskNameError(Error):
"""The add arguments contain tasks with identical names."""
class TooManyTasksError(Error):
"""Too many tasks were present in a single function call."""
class DatastoreError(Error):
"""There was a datastore error while accessing the queue."""
class BadTransactionStateError(Error):
"""The state of the current transaction does not permit this operation."""
class InvalidTaskRetryOptionsError(Error):
"""The task retry configuration is invalid."""
class InvalidLeaseTimeError(Error):
"""The lease time period is invalid."""
class InvalidMaxTasksError(Error):
"""The requested max tasks in lease_tasks is invalid."""
class InvalidDeadlineError(Error):
"""The requested deadline in lease_tasks is invalid."""
class InvalidQueueModeError(Error):
"""Invoking PULL queue operation on a PUSH queue or vice versa."""
class TransactionalRequestTooLargeError(TaskTooLargeError):
"""The total size of this transaction (including tasks) was too large."""
class TaskLeaseExpiredError(Error):
"""The task lease could not be renewed because it had already expired."""
class QueuePausedError(Error):
"""The queue is paused and cannot process modify task lease requests."""
class InvalidTagError(Error):
"""The specified tag is invalid."""
class _DefaultAppVersionSingleton(object):
def __repr__(self):
return '<DefaultApplicationVersion>'
class _UnknownAppVersionSingleton(object):
def __repr__(self):
return '<UnknownApplicationVersion>'
BadTransactionState = BadTransactionStateError
MAX_QUEUE_NAME_LENGTH = 100
MAX_PULL_TASK_SIZE_BYTES = 2 ** 20
MAX_PUSH_TASK_SIZE_BYTES = 100 * (2 ** 10)
MAX_TASK_NAME_LENGTH = 500
MAX_TASK_SIZE_BYTES = MAX_PUSH_TASK_SIZE_BYTES
MAX_TASKS_PER_ADD = 100
MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES = 2 ** 20
MAX_URL_LENGTH = 2083
MAX_TASKS_PER_LEASE = 1000
MAX_TAG_LENGTH = 500
MAX_LEASE_SECONDS = 3600 * 24 * 7
DEFAULT_APP_VERSION = _DefaultAppVersionSingleton()
_UNKNOWN_APP_VERSION = _UnknownAppVersionSingleton()
_DEFAULT_QUEUE = 'default'
_DEFAULT_QUEUE_PATH = '/_ah/queue'
_MAX_COUNTDOWN_SECONDS = 3600 * 24 * 30
_METHOD_MAP = {
'GET': taskqueue_service_pb.TaskQueueAddRequest.GET,
'POST': taskqueue_service_pb.TaskQueueAddRequest.POST,
'HEAD': taskqueue_service_pb.TaskQueueAddRequest.HEAD,
'PUT': taskqueue_service_pb.TaskQueueAddRequest.PUT,
'DELETE': taskqueue_service_pb.TaskQueueAddRequest.DELETE,
}
_NON_POST_HTTP_METHODS = frozenset(['GET', 'HEAD', 'PUT', 'DELETE'])
_BODY_METHODS = frozenset(['POST', 'PUT', 'PULL'])
_TASK_NAME_PATTERN = r'^[a-zA-Z0-9_-]{1,%s}$' % MAX_TASK_NAME_LENGTH
_TASK_NAME_RE = re.compile(_TASK_NAME_PATTERN)
_QUEUE_NAME_PATTERN = r'^[a-zA-Z0-9-]{1,%s}$' % MAX_QUEUE_NAME_LENGTH
_QUEUE_NAME_RE = re.compile(_QUEUE_NAME_PATTERN)
_ERROR_MAPPING = {
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE: UnknownQueueError,
taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR:
TransientError,
taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR: InternalError,
taskqueue_service_pb.TaskQueueServiceError.TASK_TOO_LARGE:
TaskTooLargeError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_TASK_NAME:
InvalidTaskNameError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME:
InvalidQueueNameError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_URL: InvalidUrlError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_RATE:
InvalidQueueError,
taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED:
PermissionDeniedError,
taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS:
TaskAlreadyExistsError,
taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK:
TombstonedTaskError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA: InvalidEtaError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST: Error,
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK: Error,
taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE: Error,
taskqueue_service_pb.TaskQueueServiceError.DUPLICATE_TASK_NAME:
DuplicateTaskNameError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_MODE:
InvalidQueueModeError,
taskqueue_service_pb.TaskQueueServiceError.TOO_MANY_TASKS:
TooManyTasksError,
taskqueue_service_pb.TaskQueueServiceError.TRANSACTIONAL_REQUEST_TOO_LARGE:
TransactionalRequestTooLargeError,
taskqueue_service_pb.TaskQueueServiceError.TASK_LEASE_EXPIRED:
TaskLeaseExpiredError,
taskqueue_service_pb.TaskQueueServiceError.QUEUE_PAUSED:
QueuePausedError,
taskqueue_service_pb.TaskQueueServiceError.INVALID_TAG:
InvalidTagError,
}
_PRESERVE_ENVIRONMENT_HEADERS = (
('X-AppEngine-Default-Namespace', 'HTTP_X_APPENGINE_DEFAULT_NAMESPACE'),)
class _UTCTimeZone(datetime.tzinfo):
"""UTC timezone."""
ZERO = datetime.timedelta(0)
def utcoffset(self, dt):
return self.ZERO
def dst(self, dt):
return self.ZERO
def tzname(self, dt):
return 'UTC'
def __repr__(self):
return '_UTCTimeZone()'
_UTC = _UTCTimeZone()
def _parse_relative_url(relative_url):
"""Parses a relative URL and splits it into its path and query string.
Args:
relative_url: The relative URL, starting with a '/'.
Returns:
Tuple (path, query) where:
path: The path in the relative URL.
query: The query string in the URL without the '?' character.
Raises:
_RelativeUrlError if the relative_url is invalid for whatever reason.
"""
if not relative_url:
raise _RelativeUrlError('Relative URL is empty')
(scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
if scheme or netloc:
raise _RelativeUrlError('Relative URL may not have a scheme or location')
if fragment:
raise _RelativeUrlError('Relative URL may not specify a fragment')
if not path or path[0] != '/':
raise _RelativeUrlError('Relative URL path must start with "/"')
return path, query
def _flatten_params(params):
"""Converts a dictionary of parameters to a list of parameters.
Any unicode strings in keys or values will be encoded as UTF-8.
Args:
params: Dictionary mapping parameter keys to values. Values will be
converted to a string and added to the list as tuple (key, value). If
a values is iterable and not a string, each contained value will be
added as a separate (key, value) tuple.
Returns:
List of (key, value) tuples.
"""
def get_string(value):
if isinstance(value, unicode):
return unicode(value).encode('utf-8')
else:
return str(value)
param_list = []
for key, value in params.iteritems():
key = get_string(key)
if isinstance(value, basestring):
param_list.append((key, get_string(value)))
else:
try:
iterator = iter(value)
except TypeError:
param_list.append((key, str(value)))
else:
param_list.extend((key, get_string(v)) for v in iterator)
return param_list
def _MakeAsyncCall(method, request, response, get_result_hook=None, rpc=None):
"""Internal helper to schedule an asynchronous RPC.
Args:
method: The name of the taskqueue_service method that should be called,
e.g. 'BulkAdd'.
request: The protocol buffer containing the request argument.
response: The protocol buffer to be populated with the response.
get_result_hook: An optional hook function used to process results
(See UserRPC.make_call() for more info).
rpc: An optional UserRPC object that will be used to make the call.
Returns:
A UserRPC object; either the one passed in as the rpc argument,
or a new one if no rpc was passed in.
"""
if rpc is None:
rpc = create_rpc()
assert rpc.service == 'taskqueue', repr(rpc.service)
rpc.make_call(method, request, response, get_result_hook, None)
return rpc
def _TranslateError(error, detail=''):
"""Translates a TaskQueueServiceError into an exception.
Args:
error: Value from TaskQueueServiceError enum.
detail: A human-readable description of the error.
Returns:
The corresponding Exception sub-class for that error code.
"""
if (error >= taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR
and isinstance(error, int)):
from google.appengine.api import datastore
datastore_exception = datastore._DatastoreExceptionFromErrorCodeAndDetail(
error - taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR,
detail)
class JointException(datastore_exception.__class__, DatastoreError):
"""There was a datastore error while accessing the queue."""
__msg = (u'taskqueue.DatastoreError caused by: %s %s' %
(datastore_exception.__class__, detail))
def __str__(self):
return JointException.__msg
return JointException()
else:
exception_class = _ERROR_MAPPING.get(error, None)
if exception_class:
return exception_class(detail)
else:
return Error('Application error %s: %s' % (error, detail))
def _ValidateDeadline(deadline):
if not isinstance(deadline, (int, long, float)):
raise TypeError(
'deadline must be numeric')
if deadline <= 0:
raise InvalidDeadlineError(
'Negative or zero deadline requested')
def create_rpc(deadline=None, callback=None):
"""Creates an RPC object for use with the Task Queue API.
Args:
deadline: Optional deadline in seconds for the operation; the default
is a system-specific deadline (typically 5 seconds).
callback: Optional callable to invoke on completion.
Returns:
An apiproxy_stub_map.UserRPC object specialized for this service.
"""
if deadline is not None:
_ValidateDeadline(deadline)
return apiproxy_stub_map.UserRPC('taskqueue', deadline, callback)
class TaskRetryOptions(object):
"""The options used to decide when a failed Task will be retried."""
__CONSTRUCTOR_KWARGS = frozenset(
['min_backoff_seconds', 'max_backoff_seconds',
'task_age_limit', 'max_doublings', 'task_retry_limit'])
def __init__(self, **kwargs):
"""Initializer.
Args:
min_backoff_seconds: The minimum number of seconds to wait before retrying
a task after failure. (optional)
max_backoff_seconds: The maximum number of seconds to wait before retrying
a task after failure. (optional)
task_age_limit: The number of seconds after creation afterwhich a failed
task will no longer be retried. The given value will be rounded up to
the nearest integer. If task_retry_limit is also specified then the task
will be retried until both limits are reached. (optional)
max_doublings: The maximum number of times that the interval between
failed task retries will be doubled before the increase becomes
constant. The constant will be:
2**(max_doublings - 1) * min_backoff_seconds. (optional)
task_retry_limit: The maximum number of times to retry a failed task
before giving up. If task_age_limit is specified then the task will be
retried until both limits are reached. (optional)
Raises:
InvalidTaskRetryOptionsError if any of the parameters are invalid.
"""
args_diff = set(kwargs.iterkeys()) - self.__CONSTRUCTOR_KWARGS
if args_diff:
raise TypeError('Invalid arguments: %s' % ', '.join(args_diff))
self.__min_backoff_seconds = kwargs.get('min_backoff_seconds')
if (self.__min_backoff_seconds is not None and
self.__min_backoff_seconds < 0):
raise InvalidTaskRetryOptionsError(
'The minimum retry interval cannot be negative')
self.__max_backoff_seconds = kwargs.get('max_backoff_seconds')
if (self.__max_backoff_seconds is not None and
self.__max_backoff_seconds < 0):
raise InvalidTaskRetryOptionsError(
'The maximum retry interval cannot be negative')
if (self.__min_backoff_seconds is not None and
self.__max_backoff_seconds is not None and
self.__max_backoff_seconds < self.__min_backoff_seconds):
raise InvalidTaskRetryOptionsError(
'The maximum retry interval cannot be less than the '
'minimum retry interval')
self.__max_doublings = kwargs.get('max_doublings')
if self.__max_doublings is not None and self.__max_doublings < 0:
raise InvalidTaskRetryOptionsError(
'The maximum number of retry interval doublings cannot be negative')
self.__task_retry_limit = kwargs.get('task_retry_limit')
if self.__task_retry_limit is not None and self.__task_retry_limit < 0:
raise InvalidTaskRetryOptionsError(
'The maximum number of retries cannot be negative')
self.__task_age_limit = kwargs.get('task_age_limit')
if self.__task_age_limit is not None:
if self.__task_age_limit < 0:
raise InvalidTaskRetryOptionsError(
'The expiry countdown cannot be negative')
self.__task_age_limit = int(math.ceil(self.__task_age_limit))
@property
def min_backoff_seconds(self):
"""The minimum number of seconds to wait before retrying a task."""
return self.__min_backoff_seconds
@property
def max_backoff_seconds(self):
"""The maximum number of seconds to wait before retrying a task."""
return self.__max_backoff_seconds
@property
def task_age_limit(self):
"""The number of seconds afterwhich a failed task will not be retried."""
return self.__task_age_limit
@property
def max_doublings(self):
"""The number of times that the retry interval will be doubled."""
return self.__max_doublings
@property
def task_retry_limit(self):
"""The number of times that a failed task will be retried."""
return self.__task_retry_limit
def __repr__(self):
properties = ['%s=%r' % (attr, getattr(self, attr)) for attr in
self.__CONSTRUCTOR_KWARGS]
return 'TaskRetryOptions(%s)' % ', '.join(properties)
class Task(object):
"""Represents a single Task on a queue."""
__CONSTRUCTOR_KWARGS = frozenset([
'countdown', 'eta', 'headers', 'method', 'name', 'params',
'retry_options', 'tag', 'target', 'url', '_size_check'])
__eta_posix = None
__target = None
def __init__(self, payload=None, **kwargs):
"""Initializer.
All parameters are optional.
Args:
payload: The payload data for this Task that will either be delivered
to the webhook as the HTTP request body or fetched by workers for pull
queues. This is only allowed for POST, PUT and PULL methods.
name: Name to give the Task; if not specified, a name will be
auto-generated when added to a queue and assigned to this object. Must
match the _TASK_NAME_PATTERN regular expression.
method: Method to use when accessing the webhook. Defaults to 'POST'. If
set to 'PULL', task will not be automatically delivered to the webhook,
instead it stays in the queue until leased.
url: Relative URL where the webhook that should handle this task is
located for this application. May have a query string unless this is
a POST method. Must not be specified if method is PULL.
headers: Dictionary of headers to pass to the webhook. Values in the
dictionary may be iterable to indicate repeated header fields. Must not
be specified if method is PULL.
params: Dictionary of parameters to use for Task. For POST and PULL
requests these params will be encoded as
'application/x-www-form-urlencoded' and set to the payload. For all
other methods, the parameters will be converted to a query string. Must
not be specified if the URL already contains a query string, or the
task already has payload.
countdown: Time in seconds into the future that this Task should execute.
Defaults to zero.
eta: A datetime.datetime specifying the absolute time at which the task
should be executed. Must not be specified if 'countdown' is specified.
This may be timezone-aware or timezone-naive. If None, defaults to now.
For pull tasks, no worker will be able to lease this task before the
time indicated by eta.
retry_options: TaskRetryOptions used to control when the task will be
retried if it fails.
target: The alternate version/backend on which to execute this task, or
DEFAULT_APP_VERSION to execute on the application's default version.
tag: The tag to be used when grouping by tag (PULL tasks only).
Raises:
InvalidEtaError: if the ETA is too far into the future;
InvalidTagError: if the tag is too long;
InvalidTaskError: if any of the parameters are invalid;
InvalidTaskNameError: if the task name is invalid;
InvalidUrlError: if the task URL is invalid or too long;
TaskTooLargeError: if the task with its payload is too large.
"""
args_diff = set(kwargs.iterkeys()) - self.__CONSTRUCTOR_KWARGS
if args_diff:
raise TypeError('Invalid arguments: %s' % ', '.join(args_diff))
self.__name = kwargs.get('name')
if self.__name and not _TASK_NAME_RE.match(self.__name):
raise InvalidTaskNameError(
'Task name does not match expression "%s"; found %s' %
(_TASK_NAME_PATTERN, self.__name))
self.__default_url, self.__relative_url, query = Task.__determine_url(
kwargs.get('url', ''))
self.__headers = urlfetch._CaselessDict()
self.__headers.update(kwargs.get('headers', {}))
self.__method = kwargs.get('method', 'POST').upper()
self.__tag = kwargs.get('tag')
self.__payload = None
self.__retry_count = 0
self.__queue_name = None
size_check = kwargs.get('_size_check', True)
params = kwargs.get('params', {})
for header_name, environ_name in _PRESERVE_ENVIRONMENT_HEADERS:
value = os.environ.get(environ_name)
if value is not None:
self.__headers.setdefault(header_name, value)
self.__headers.setdefault('X-AppEngine-Current-Namespace',
namespace_manager.get_namespace())
if query and params:
raise InvalidTaskError('Query string and parameters both present; '
'only one of these may be supplied')
if self.__method != 'PULL' and self.__tag is not None:
raise InvalidTaskError('tag may only be specified for PULL tasks')
if self.__method == 'PULL':
if not self.__default_url:
raise InvalidTaskError('url must not be specified for PULL tasks')
if kwargs.get('headers'):
raise InvalidTaskError('headers must not be specified for PULL tasks')
if kwargs.get('target'):
raise InvalidTaskError('target must not be specified for PULL tasks')
if params:
if payload:
raise InvalidTaskError(
'Message body and parameters both present for '
'PULL method; only one of these may be supplied')
payload = Task.__encode_params(params)
if payload is None:
raise InvalidTaskError('payload must be specified for PULL task')
self.__payload = Task.__convert_payload(payload, self.__headers)
elif self.__method == 'POST':
if payload and params:
raise InvalidTaskError('Message body and parameters both present for '
'POST method; only one of these may be '
'supplied')
elif query:
raise InvalidTaskError('POST method may not have a query string; '
'use the "params" keyword argument instead')
elif params:
self.__payload = Task.__encode_params(params)
self.__headers.setdefault(
'content-type', 'application/x-www-form-urlencoded')
elif payload is not None:
self.__payload = Task.__convert_payload(payload, self.__headers)
elif self.__method in _NON_POST_HTTP_METHODS:
if payload and self.__method not in _BODY_METHODS:
raise InvalidTaskError(
'Payload may only be specified for methods %s' %
', '.join(_BODY_METHODS))
if payload:
self.__payload = Task.__convert_payload(payload, self.__headers)
if params:
query = Task.__encode_params(params)
if query:
self.__relative_url = '%s?%s' % (self.__relative_url, query)
else:
raise InvalidTaskError('Invalid method: %s' % self.__method)
self.__target = kwargs.get('target')
self.__resolve_hostname_and_target()
self.__headers_list = _flatten_params(self.__headers)
self.__eta_posix = Task.__determine_eta_posix(
kwargs.get('eta'), kwargs.get('countdown'))
self.__eta = None
self.__retry_options = kwargs.get('retry_options')
self.__enqueued = False
self.__deleted = False
if self.__eta_posix - time.time() > _MAX_COUNTDOWN_SECONDS:
raise InvalidEtaError('ETA too far in the future')
if size_check:
if self.__method == 'PULL':
max_task_size_bytes = MAX_PULL_TASK_SIZE_BYTES
else:
max_task_size_bytes = MAX_PUSH_TASK_SIZE_BYTES
if self.size > max_task_size_bytes:
raise TaskTooLargeError('Task size must be less than %d; found %d' %
(max_task_size_bytes, self.size))
if self.__tag and len(self.__tag) > MAX_TAG_LENGTH:
raise InvalidTagError(
'Tag must be <= %d bytes. Got a %d byte tag.' % (
MAX_TAG_LENGTH, len(self.__tag)))
def __resolve_hostname_and_target(self):
"""Resolve the values of the target parameter and the `Host' header.
Requires that the attributes __target and __headers exist before this method
is called.
This function should only be called once from the __init__ function of the
Task class.
Raises:
InvalidTaskError: If the task is invalid.
"""
if 'HTTP_HOST' not in os.environ:
logging.warning(
'The HTTP_HOST environment variable was not set, but is required '
'to determine the correct value for the `Task.target\' property. '
'Please update your unit tests to specify a correct value for this '
'environment variable.')
if self.__target is not None and 'Host' in self.__headers:
raise InvalidTaskError(
'A host header may not be set when a target is specified.')
elif self.__target is not None:
host = self.__host_from_target(self.__target)
if host:
self.__headers['Host'] = host
elif 'Host' in self.__headers:
self.__target = self.__target_from_host(self.__headers['Host'])
else:
if 'HTTP_HOST' in os.environ:
self.__headers['Host'] = os.environ['HTTP_HOST']
self.__target = self.__target_from_host(self.__headers['Host'])
else:
self.__target = _UNKNOWN_APP_VERSION
@staticmethod
def __target_from_host(host):
"""Calculate the value of the target parameter from a host header.
Args:
host: A string representing the hostname for this task.
Returns:
A string containing the target of this task, or the constant
DEFAULT_APP_VERSION if it is the default version.
If this code is running in a unit-test where the environment variable
`DEFAULT_VERSION_HOSTNAME' is not set then the constant
_UNKNOWN_APP_VERSION is returned.
"""
default_hostname = app_identity.get_default_version_hostname()
if default_hostname is None:
return _UNKNOWN_APP_VERSION
if host.endswith(default_hostname):
version_name = host[:-(len(default_hostname) + 1)]
if version_name:
return version_name
return DEFAULT_APP_VERSION
@staticmethod
def __host_from_target(target):
"""Calculate the value of the host header from a target.
Args:
target: A string representing the target hostname or the constant
DEFAULT_APP_VERSION.
Returns:
The string to be used as the host header, or None if it can not be
determined.
"""
default_hostname = app_identity.get_default_version_hostname()
if default_hostname is None:
return None
if target is DEFAULT_APP_VERSION:
return default_hostname
else:
return '%s.%s' % (target, default_hostname)
@staticmethod
def __determine_url(relative_url):
"""Determines the URL of a task given a relative URL and a name.
Args:
relative_url: The relative URL for the Task.
Returns:
Tuple (default_url, relative_url, query) where:
default_url: True if this Task is using the default URL scheme;
False otherwise.
relative_url: String containing the relative URL for this Task.
query: The query string for this task.
Raises:
InvalidUrlError if the relative_url is invalid.
"""
if not relative_url:
default_url, query = True, ''
else:
default_url = False
try:
relative_url, query = _parse_relative_url(relative_url)
except _RelativeUrlError, e:
raise InvalidUrlError(e)
if len(relative_url) > MAX_URL_LENGTH:
raise InvalidUrlError(
'Task URL must be less than %d characters; found %d' %
(MAX_URL_LENGTH, len(relative_url)))
return (default_url, relative_url, query)
@staticmethod
def __determine_eta_posix(eta=None, countdown=None, current_time=None):
"""Determines the ETA for a task.
If 'eta' and 'countdown' are both None, the current time will be used.
Otherwise, only one of them may be specified.
Args:
eta: A datetime.datetime specifying the absolute ETA or None;
this may be timezone-aware or timezone-naive.
countdown: Count in seconds into the future from the present time that
the ETA should be assigned to.
current_time: Function that returns the current datetime. (Defaults to
time.time if None is provided.)
Returns:
A float giving a POSIX timestamp containing the ETA.
Raises:
InvalidTaskError if the parameters are invalid.
"""
if not current_time:
current_time = time.time
if eta is not None and countdown is not None:
raise InvalidTaskError('May not use a countdown and ETA together')
elif eta is not None:
if not isinstance(eta, datetime.datetime):
raise InvalidTaskError('ETA must be a datetime.datetime instance')
elif eta.tzinfo is None:
return time.mktime(eta.timetuple()) + eta.microsecond*1e-6
else:
return calendar.timegm(eta.utctimetuple()) + eta.microsecond*1e-6
elif countdown is not None:
try:
countdown = float(countdown)
except ValueError:
raise InvalidTaskError('Countdown must be a number')
except OverflowError:
raise InvalidTaskError('Countdown out of range')
else:
return current_time() + countdown
else:
return current_time()
@staticmethod
def __encode_params(params):
"""URL-encodes a list of parameters.
Args:
params: Dictionary of parameters, possibly with iterable values.
Returns:
URL-encoded version of the params, ready to be added to a query string or
POST body.
"""
return urllib.urlencode(_flatten_params(params))
@staticmethod
def __convert_payload(payload, headers):
"""Converts a Task payload into UTF-8 and sets headers if necessary.
Args:
payload: The payload data to convert.
headers: Dictionary of headers.
Returns:
The payload as a non-unicode string.
Raises:
InvalidTaskError if the payload is not a string or unicode instance.
"""
if isinstance(payload, unicode):
headers.setdefault('content-type', 'text/plain; charset=utf-8')
payload = payload.encode('utf-8')
elif not isinstance(payload, str):
raise InvalidTaskError(
'Task payloads must be strings; invalid payload: %r' % payload)
return payload
@classmethod
def _FromQueryAndOwnResponseTask(cls, queue_name, response_task):
kwargs = {
'_size_check': False,
'payload': response_task.body(),
'name': response_task.task_name(),
'method': 'PULL'}
if response_task.has_tag():
kwargs['tag'] = response_task.tag()
self = cls(**kwargs)
self.__eta_posix = response_task.eta_usec() * 1e-6
self.__retry_count = response_task.retry_count()
self.__queue_name = queue_name
self.__enqueued = True
return self
@property
def eta_posix(self):
"""Returns a POSIX timestamp giving when this Task will execute."""
if self.__eta_posix is None and self.__eta is not None:
self.__eta_posix = Task.__determine_eta_posix(self.__eta)
return self.__eta_posix
@property
def eta(self):
"""Returns a datetime when this Task will execute."""
if self.__eta is None and self.__eta_posix is not None:
self.__eta = datetime.datetime.fromtimestamp(self.__eta_posix, _UTC)
return self.__eta
@property
def _eta_usec(self):
"""Returns a int microseconds timestamp when this Task will execute."""
return int(round(self.eta_posix * 1e6))
@property
def headers(self):
"""Returns a copy of the headers for this Task."""
return self.__headers.copy()
@property
def method(self):
"""Returns the method to use for this Task."""
return self.__method
@property
def name(self):
"""Returns the name of this Task.
Will be None if using auto-assigned Task names and this Task has not yet
been added to a Queue.
"""
return self.__name
@property
def on_queue_url(self):
"""Returns True if this Task will run on the queue's URL."""
return self.__default_url
@property
def payload(self):
"""Returns the payload for this task, which may be None."""
return self.__payload
@property
def queue_name(self):
"""Returns the name of the queue this Task is associated with.
Will be None if this Task has not yet been added to a queue.
"""
return self.__queue_name
@property
def retry_count(self):
"""Returns the number of retries have been done on the task."""
return self.__retry_count
@property
def retry_options(self):
"""Returns the TaskRetryOptions for this task, which may be None."""
return self.__retry_options
@property
def size(self):
"""Returns the size of this task in bytes."""
HEADER_SEPERATOR = len(': \r\n')
header_size = sum((len(key) + len(value) + HEADER_SEPERATOR)
for key, value in self.__headers_list)
return (len(self.__method) + len(self.__payload or '') +
len(self.__relative_url) + header_size)
@property
def tag(self):
"""Returns the tag for this Task."""
return self.__tag
@property
def target(self):
"""Returns the target for this Task."""
return self.__target
@property
def url(self):
"""Returns the relative URL for this Task."""
return self.__relative_url
@property
def was_enqueued(self):
"""Returns True if this Task has been enqueued.
Note: This will not check if this task already exists in the queue.
"""
return self.__enqueued
@property
def was_deleted(self):
"""Returns True if this Task has been successfully deleted."""
return self.__deleted
def add_async(self, queue_name=_DEFAULT_QUEUE, transactional=False, rpc=None):
"""Asynchronously adds this Task to a queue. See Queue.add_async."""
return Queue(queue_name).add_async(self, transactional, rpc)
def add(self, queue_name=_DEFAULT_QUEUE, transactional=False):
"""Adds this Task to a queue. See Queue.add."""
return self.add_async(queue_name, transactional).get_result()
def extract_params(self):
"""Returns the parameters for this task.
Returns:
A dictionary of strings mapping parameter names to their values as
strings. If the same name parameter has several values then the value will
be a list of strings. For POST and PULL requests then the parameters are
extracted from the task payload. For all other methods, the parameters are
extracted from the url query string. An empty dictionary is returned if
the task contains an empty payload or query string.
Raises:
ValueError: if the payload does not contain valid
'application/x-www-form-urlencoded' data (for POST and PULL) or the url
does not contain a valid query (all other methods).
"""
if self.__method in ('PULL', 'POST'):
query = self.__payload
else:
query = urlparse.urlparse(self.__relative_url).query
p = {}
if not query:
return p
for key, value in cgi.parse_qsl(
query, keep_blank_values=True, strict_parsing=True):
p.setdefault(key, []).append(value)
for key, value in p.items():
if len(value) == 1:
p[key] = value[0]
return p
def __repr__(self):
COMMON_ATTRS = ['eta', 'method', 'name', 'queue_name', 'payload', 'size',
'retry_options', 'was_enqueued', 'was_deleted']
PULL_QUEUE_ATTRS = ['tag']
PUSH_QUEUE_ATTRS = ['headers', 'url', 'target']
if self.method == 'PULL':
attrs = COMMON_ATTRS + PULL_QUEUE_ATTRS
else:
attrs = COMMON_ATTRS + PUSH_QUEUE_ATTRS
properties = ['%s=%r' % (attr, getattr(self, attr))
for attr in sorted(attrs)]
return 'Task<%s>' % ', '.join(properties)
class QueueStatistics(object):
"""Represents the current state of a Queue."""
_ATTRS = ['queue', 'tasks', 'oldest_eta_usec', 'executed_last_minute',
'in_flight', 'enforced_rate']
def __init__(self,
queue,
tasks,
oldest_eta_usec=None,
executed_last_minute=None,
in_flight=None,
enforced_rate=None):
"""Constructor.
Args:
queue: The Queue instance this QueueStatistics is for.
tasks: The number of tasks left.
oldest_eta_usec: The eta of the oldest non-completed task for the queue;
None if unknown.
executed_last_minute: The number of tasks executed in the last minute.
in_flight: The number of tasks that are currently executing.
enforced_rate: The current enforced rate. In tasks/second.
"""
self.queue = queue
self.tasks = tasks
self.oldest_eta_usec = oldest_eta_usec
self.executed_last_minute = executed_last_minute
self.in_flight = in_flight
self.enforced_rate = enforced_rate
def __eq__(self, o):
if not isinstance(o, QueueStatistics):
return NotImplemented
result = True
for attr in self._ATTRS:
result = result and (getattr(self, attr) == getattr(o, attr))
return result
def __ne__(self, o):
if not isinstance(o, QueueStatistics):
return NotImplemented
result = False
for attr in self._ATTRS:
result = result or (getattr(self, attr) != getattr(o, attr))
return result
def __repr__(self):
properties = ['%s=%r' % (attr, getattr(self, attr)) for attr in self._ATTRS]
return 'QueueStatistics(%s)' % ', '.join(properties)
@classmethod
def _ConstructFromFetchQueueStatsResponse(cls, queue, response):
"""Helper for converting from a FetchQeueueStatsResponse_QueueStats proto.
Args:
queue: A Queue instance.
response: a FetchQeueueStatsResponse_QueueStats instance.
Returns:
A new QueueStatistics instance.
"""
args = {'queue': queue, 'tasks': response.num_tasks()}
if response.oldest_eta_usec() >= 0:
args['oldest_eta_usec'] = response.oldest_eta_usec()
else:
args['oldest_eta_usec'] = None
if response.has_scanner_info():
scanner_info = response.scanner_info()
args['executed_last_minute'] = scanner_info.executed_last_minute()
if scanner_info.has_requests_in_flight():
args['in_flight'] = scanner_info.requests_in_flight()
if scanner_info.has_enforced_rate():
args['enforced_rate'] = scanner_info.enforced_rate()
return cls(**args)
@classmethod
def fetch_async(cls, queue_or_queues, rpc=None):
"""Asynchronously get the queue details for multiple queues.
Args:
queue_or_queues: An iterable of Queue instances, or an iterable of
strings corresponding to queue names, or a Queue instance or a string
corresponding to a queue name.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the
result.
If an iterable (other than string) is provided as input, the result will
be a list of of QueueStatistics objects, one for each queue in the order
requested.
Otherwise, if a single item was provided as input, then the result will be
a single QueueStatistics object.
Raises:
TypeError: If queue_or_queues is not one of: Queue instance, string, an
iterable containing only Queue instances or an iterable containing
only strings.
"""
wants_list = True
if isinstance(queue_or_queues, basestring):
queue_or_queues = [queue_or_queues]
wants_list = False
try:
queues_list = [queue for queue in queue_or_queues]
except TypeError:
queues_list = [queue_or_queues]
wants_list = False
contains_strs = any(isinstance(queue, basestring) for queue in queues_list)
contains_queues = any(isinstance(queue, Queue) for queue in queues_list)
if contains_strs and contains_queues:
raise TypeError('queue_or_queues must contain either strings or Queue '
'instances, not both.')
if contains_strs:
queues_list = [Queue(queue_name) for queue_name in queues_list]
return cls._FetchMultipleQueues(queues_list, wants_list, rpc)
@classmethod
def fetch(cls, queue_or_queues, deadline=10):
"""Get the queue details for multiple queues.
Args:
queue_or_queues: An iterable of Queue instances, or an iterable of
strings corresponding to queue names, or a Queue instance or a string
corresponding to a queue name.
deadline: The maximum number of seconds to wait before aborting the
method call.
Returns:
If an iterable (other than string) is provided as input, a list of of
QueueStatistics objects will be returned, one for each queue in the order
requested.
Otherwise, if a single item was provided as input, then a single
QueueStatistics object will be returned.
Raises:
TypeError: If queue_or_queues is not one of: Queue instance, string, an
iterable containing only Queue instances or an iterable containing
only strings.
Error-subclass on application errors.
"""
_ValidateDeadline(deadline)
if not queue_or_queues:
return []
rpc = create_rpc(deadline)
cls.fetch_async(queue_or_queues, rpc)
return rpc.get_result()
@classmethod
def _FetchMultipleQueues(cls, queues, multiple, rpc=None):
"""Internal implementation of fetch stats where queues must be a list."""
def ResultHook(rpc):
"""Process the TaskQueueFetchQueueStatsResponse."""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
assert len(queues) == rpc.response.queuestats_size(), (
'Expected %d results, got %d' % (
len(queues), rpc.response.queuestats_size()))
queue_stats = [cls._ConstructFromFetchQueueStatsResponse(queue, response)
for queue, response in zip(queues,
rpc.response.queuestats_list())]
if multiple:
return queue_stats
else:
assert len(queue_stats) == 1
return queue_stats[0]
request = taskqueue_service_pb.TaskQueueFetchQueueStatsRequest()
response = taskqueue_service_pb.TaskQueueFetchQueueStatsResponse()
requested_app_id = queues[0]._app
for queue in queues:
request.add_queue_name(queue.name)
if queue._app != requested_app_id:
raise InvalidQueueError('Inconsistent app ids requested.')
if requested_app_id:
request.set_app_id(requested_app_id)
return _MakeAsyncCall('FetchQueueStats',
request,
response,
ResultHook,
rpc)
class Queue(object):
"""Represents a Queue."""
def __init__(self, name=_DEFAULT_QUEUE):
"""Initializer.
Args:
name: Name of this queue. If not supplied, defaults to the default queue.
Raises:
InvalidQueueNameError if the queue name is invalid.
"""
if not _QUEUE_NAME_RE.match(name):
raise InvalidQueueNameError(
'Queue name does not match pattern "%s"; found %s' %
(_QUEUE_NAME_PATTERN, name))
self.__name = name
self.__url = '%s/%s' % (_DEFAULT_QUEUE_PATH, self.__name)
self._app = None
def purge(self):
"""Removes all the tasks in this Queue.
This function takes constant time to purge a Queue and some delay may apply
before the call is effective.
Raises:
Error-subclass on application errors.
"""
request = taskqueue_service_pb.TaskQueuePurgeQueueRequest()
response = taskqueue_service_pb.TaskQueuePurgeQueueResponse()
request.set_queue_name(self.__name)
if self._app:
request.set_app_id(self._app)
try:
apiproxy_stub_map.MakeSyncCall('taskqueue',
'PurgeQueue',
request,
response)
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
def delete_tasks_by_name_async(self, task_name, rpc=None):
"""Asynchronously deletes a Task or list of Tasks in this Queue, by name.
This function is identical to delete_tasks_by_name() except that it returns
an asynchronous object. You can call get_result() on the return value to
block on the call.
Args:
task_name: A string corresponding to a task name, or an iterable of
strings corresponding to task names.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the
result.
If an iterable (other than string) is provided as input, the result will
be a list of of Task objects, one for each task name in the order
requested. The Task.was_deleted property will be True for each task
deleted by this call, and will be False for unknown and tombstoned tasks.
Otherwise, if a single string was provided as input, then the result will
be a single Task object.
Raises:
DuplicateTaskNameError: if a Task name is repeated in the request.
"""
if isinstance(task_name, str):
return self.delete_tasks_async(Task(name=task_name), rpc)
else:
tasks = [Task(name=name) for name in task_name]
return self.delete_tasks_async(tasks, rpc)
def delete_tasks_by_name(self, task_name):
"""Deletes a Task or list of Tasks in this Queue, by name.
When multiple tasks are specified, an exception will be raised if any
individual task fails to be deleted.
Args:
task_name: A string corresponding to a task name, or an iterable of
strings corresponding to task names.
Returns:
If an iterable (other than string) is provided as input, a list of of
Task objects, one for each task name in the order requested. The
Task.was_deleted property will be True for each task deleted by this call,
and will be False for unknown and tombstoned tasks.
Otherwise, if a single string was provided as input, a single Task object.
Raises:
DuplicateTaskNameError: if a Task name is repeated in the request.
Error-subclass on application errors.
"""
return self.delete_tasks_by_name_async(task_name).get_result()
def delete_tasks_async(self, task, rpc=None):
"""Asynchronously deletes a Task or list of Tasks in this Queue.
This function is identical to delete_tasks() except that it returns an
asynchronous object. You can call get_result() on the return value to block
on the call.
Args:
task: A Task instance or a list of Task instances that will be deleted
from the Queue.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the Task
or list of tasks passed into this call.
Raises:
BadTaskStateError: if the Task(s) to be deleted do not have task names or
have already been deleted.
DuplicateTaskNameError: if a Task is repeated in the request.
"""
try:
tasks = list(iter(task))
except TypeError:
tasks = [task]
multiple = False
else:
multiple = True
return self.__DeleteTasks(tasks, multiple, rpc)
def delete_tasks(self, task):
"""Deletes a Task or list of Tasks in this Queue.
When multiple tasks are specified, an exception will be raised if any
individual task fails to be deleted. Check the task.was_deleted property.
Task name is the only task attribute used to select tasks for deletion. If
there is any task with was_deleted property set to True, or without a task
name, a BadTaskStateError will be raised immediately.
Args:
task: A Task instance or a list of Task instances that will be deleted
from the Queue.
Returns:
The Task or list of tasks passed into this call.
Raises:
BadTaskStateError: if the Task(s) to be deleted do not have task names or
have already been deleted.
DuplicateTaskNameError: if a Task is repeated in the request.
Error-subclass on application errors.
"""
return self.delete_tasks_async(task).get_result()
def __DeleteTasks(self, tasks, multiple, rpc=None):
"""Internal implementation of delete_tasks_async(), tasks must be a list."""
def ResultHook(rpc):
"""Process the TaskQueueDeleteResponse."""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
assert rpc.response.result_size() == len(tasks), (
'expected %d results from delete(), got %d' % (
len(tasks), rpc.response.result_size()))
IGNORED_STATES = [
taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK,
taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK]
exception = None
for task, result in zip(tasks, rpc.response.result_list()):
if result == taskqueue_service_pb.TaskQueueServiceError.OK:
task._Task__deleted = True
elif result in IGNORED_STATES:
task._Task__deleted = False
elif exception is None:
exception = _TranslateError(result)
if exception is not None:
raise exception
if multiple:
return tasks
else:
assert len(tasks) == 1
return tasks[0]
request = taskqueue_service_pb.TaskQueueDeleteRequest()
response = taskqueue_service_pb.TaskQueueDeleteResponse()
request.set_queue_name(self.__name)
task_names = set()
for task in tasks:
if not task.name:
raise BadTaskStateError('Task name must be specified for a task')
if task.was_deleted:
raise BadTaskStateError('Task %s has already been deleted' % task.name)
if task.name in task_names:
raise DuplicateTaskNameError(
'The task name %r is used more than once in the request' %
task.name)
task_names.add(task.name)
request.add_task_name(task.name)
return _MakeAsyncCall('Delete',
request,
response,
ResultHook,
rpc)
@staticmethod
def _ValidateLeaseSeconds(lease_seconds):
if not isinstance(lease_seconds, (float, int, long)):
raise TypeError(
'lease_seconds must be a float or an integer')
lease_seconds = float(lease_seconds)
if lease_seconds < 0.0:
raise InvalidLeaseTimeError(
'lease_seconds must not be negative')
if lease_seconds > MAX_LEASE_SECONDS:
raise InvalidLeaseTimeError(
'Lease time must not be greater than %d seconds' %
MAX_LEASE_SECONDS)
return lease_seconds
@staticmethod
def _ValidateMaxTasks(max_tasks):
if not isinstance(max_tasks, (int, long)):
raise TypeError(
'max_tasks must be an integer')
if max_tasks <= 0:
raise InvalidMaxTasksError(
'Negative or zero tasks requested')
if max_tasks > MAX_TASKS_PER_LEASE:
raise InvalidMaxTasksError(
'Only %d tasks can be leased at once' %
MAX_TASKS_PER_LEASE)
def _QueryAndOwnTasks(self, request, response, queue_name, rpc=None):
def ResultHook(rpc):
"""Process the TaskQueueQueryAndOwnTasksResponse."""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
tasks = []
for response_task in rpc.response.task_list():
tasks.append(
Task._FromQueryAndOwnResponseTask(queue_name, response_task))
return tasks
return _MakeAsyncCall('QueryAndOwnTasks',
request,
response,
ResultHook,
rpc)
def lease_tasks_async(self, lease_seconds, max_tasks, rpc=None):
"""Asynchronously leases a number of tasks from the Queue.
This function is identical to lease_tasks() except that it returns an
asynchronous object. You can call get_result() on the return value to block
on the call.
Args:
lease_seconds: Number of seconds to lease the tasks.
max_tasks: Max number of tasks to lease from the pull Queue.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the list
of tasks leased from the Queue.
Raises:
InvalidLeaseTimeError: if lease_seconds is not a valid float or integer
number or is outside the valid range.
InvalidMaxTasksError: if max_tasks is not a valid integer or is outside
the valid range.
"""
lease_seconds = self._ValidateLeaseSeconds(lease_seconds)
self._ValidateMaxTasks(max_tasks)
request = taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest()
response = taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse()
request.set_queue_name(self.__name)
request.set_lease_seconds(lease_seconds)
request.set_max_tasks(max_tasks)
return self._QueryAndOwnTasks(request, response, self.__name, rpc)
def lease_tasks(self, lease_seconds, max_tasks, deadline=10):
"""Leases a number of tasks from the Queue for a period of time.
This method can only be performed on a pull Queue. Any non-pull tasks in
the pull Queue will be converted into pull tasks when being leased. If
fewer than max_tasks are available, all available tasks will be returned.
The lease_tasks method supports leasing at most 1000 tasks for no longer
than a week in a single call.
Args:
lease_seconds: Number of seconds to lease the tasks.
max_tasks: Max number of tasks to lease from the pull Queue.
deadline: The maximum number of seconds to wait before aborting the
method call.
Returns:
A list of tasks leased from the Queue.
Raises:
InvalidLeaseTimeError: if lease_seconds is not a valid float or integer
number or is outside the valid range.
InvalidMaxTasksError: if max_tasks is not a valid integer or is outside
the valid range.
InvalidQueueModeError: if invoked on a queue that is not in pull mode.
Error-subclass on application errors.
"""
_ValidateDeadline(deadline)
rpc = create_rpc(deadline)
self.lease_tasks_async(lease_seconds, max_tasks, rpc)
return rpc.get_result()
def lease_tasks_by_tag_async(self,
lease_seconds,
max_tasks,
tag=None,
rpc=None):
"""Asynchronously leases a number of tasks from the Queue.
This function is identical to lease_tasks_by_tag() except that it returns an
asynchronous object. You can call get_result() on the return value to block
on the call.
Args:
lease_seconds: Number of seconds to lease the tasks.
max_tasks: Max number of tasks to lease from the pull Queue.
tag: The to query for, or None to group by the first available tag.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the list
of tasks leased from the Queue.
Raises:
InvalidLeaseTimeError: if lease_seconds is not a valid float or integer
number or is outside the valid range.
InvalidMaxTasksError: if max_tasks is not a valid integer or is outside
the valid range.
"""
lease_seconds = self._ValidateLeaseSeconds(lease_seconds)
self._ValidateMaxTasks(max_tasks)
request = taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest()
response = taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse()
request.set_queue_name(self.__name)
request.set_lease_seconds(lease_seconds)
request.set_max_tasks(max_tasks)
request.set_group_by_tag(True)
if tag is not None:
request.set_tag(tag)
return self._QueryAndOwnTasks(request, response, self.__name, rpc)
def lease_tasks_by_tag(self, lease_seconds, max_tasks, tag=None, deadline=10):
"""Leases a number of tasks from the Queue for a period of time.
This method can only be performed on a pull Queue. Any non-pull tasks in
the pull Queue will be converted into pull tasks when being leased. If
fewer than max_tasks are available, all available tasks will be returned.
The lease_tasks method supports leasing at most 1000 tasks for no longer
than a week in a single call.
Args:
lease_seconds: Number of seconds to lease the tasks.
max_tasks: Max number of tasks to lease from the pull Queue.
tag: The to query for, or None to group by the first available tag.
deadline: The maximum number of seconds to wait before aborting the
method call.
Returns:
A list of tasks leased from the Queue.
Raises:
InvalidLeaseTimeError: if lease_seconds is not a valid float or integer
number or is outside the valid range.
InvalidMaxTasksError: if max_tasks is not a valid integer or is outside
the valid range.
InvalidQueueModeError: if invoked on a queue that is not in pull mode.
Error-subclass on application errors.
"""
_ValidateDeadline(deadline)
rpc = create_rpc(deadline)
self.lease_tasks_by_tag_async(lease_seconds, max_tasks, tag, rpc)
return rpc.get_result()
def add_async(self, task, transactional=False, rpc=None):
"""Asynchronously adds a Task or list of Tasks into this Queue.
This function is identical to add() except that it returns an asynchronous
object. You can call get_result() on the return value to block on the call.
Args:
task: A Task instance or a list of Task instances that will be added to
the queue.
transactional: If True, adds the task(s) if and only if the enclosing
transaction is successfully committed. It is an error for transactional
to be True in the absence of an enclosing transaction. If False, adds
the task(s) immediately, ignoring any enclosing transaction's success or
failure.
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain the Task
or list of tasks that was supplied to this method.
Raises:
BadTaskStateError: if the Task(s) has already been added to a queue.
BadTransactionStateError: if the transactional argument is true but this
call is being made outside of the context of a transaction.
DuplicateTaskNameError: if a Task name is repeated in the request.
InvalidTaskError: if both push and pull tasks exist in the task list.
InvalidTaskNameError: if a Task name is provided but is not legal.
TooManyTasksError: if task contains more than MAX_TASKS_PER_ADD tasks.
TransactionalRequestTooLargeError: if transactional is True and the total
size of the tasks and supporting request data exceeds
MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES.
"""
try:
tasks = list(iter(task))
except TypeError:
tasks = [task]
multiple = False
else:
multiple = True
has_push_task = False
has_pull_task = False
for task in tasks:
if task.method == 'PULL':
has_pull_task = True
else:
has_push_task = True
if has_push_task and has_pull_task:
raise InvalidTaskError(
'Can not add both push and pull tasks in a single call.')
if has_push_task:
fill_function = self.__FillAddPushTasksRequest
else:
fill_function = self.__FillAddPullTasksRequest
return self.__AddTasks(tasks,
transactional,
fill_function,
multiple,
rpc)
def add(self, task, transactional=False):
"""Adds a Task or list of Tasks into this Queue.
If a list of more than one Tasks is given, a raised exception does not
guarantee that no tasks were added to the queue (unless transactional is set
to True). To determine which tasks were successfully added when an exception
is raised, check the Task.was_enqueued property.
Push tasks, i.e. those with method not equal to PULL, may not be added to
queues in pull mode. Similarly, pull tasks may not be added to queues in
push mode.
If a TaskAlreadyExistsError or TombstonedTaskError is raised, the caller can
be guaranteed that for each one of the provided tasks, either the
corresponding task was successfully added, or a task with the given name was
successfully added in the past.
Args:
task: A Task instance or a list of Task instances that will be added to
the queue.
transactional: If True, adds the task(s) if and only if the enclosing
transaction is successfully committed. It is an error for transactional
to be True in the absence of an enclosing transaction. If False, adds
the task(s) immediately, ignoring any enclosing transaction's success or
failure.
Returns:
The Task or list of tasks that was supplied to this method.
Raises:
BadTaskStateError: if the Task(s) has already been added to a queue.
BadTransactionStateError: if the transactional argument is true but this
call is being made outside of the context of a transaction.
DuplicateTaskNameError: if a Task name is repeated in the request.
InvalidTaskNameError: if a Task name is provided but is not legal.
InvalidTaskError: if both push and pull tasks exist in the task list.
InvalidQueueModeError: if a task with method PULL is added to a queue in
push mode, or a task with method not equal to PULL is added to a queue
in pull mode.
TaskAlreadyExistsError: if a task with the same name as a given name has
previously been added to the queue.
TombstonedTaskError: if a task with the same name as a given name has
previously been added to the queue and deleted.
TooManyTasksError: if task contains more than MAX_TASKS_PER_ADD tasks.
TransactionalRequestTooLargeError: if transactional is True and the total
size of the tasks and supporting request data exceeds
MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES.
Error-subclass on application errors.
"""
if task:
return self.add_async(task, transactional).get_result()
else:
return []
def __AddTasks(self, tasks, transactional, fill_request, multiple, rpc=None):
"""Internal implementation of adding tasks where tasks must be a list."""
def ResultHook(rpc):
"""Process the TaskQueueBulkAddResponse."""
try:
rpc.check_success()
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
assert rpc.response.taskresult_size() == len(tasks), (
'expected %d results from BulkAdd(), got %d' % (
len(tasks), rpc.response.taskresult_size()))
exception = None
for task, task_result in zip(tasks, rpc.response.taskresult_list()):
if (task_result.result() ==
taskqueue_service_pb.TaskQueueServiceError.OK):
if task_result.has_chosen_task_name():
task._Task__name = task_result.chosen_task_name()
task._Task__queue_name = self.__name
task._Task__enqueued = True
elif (task_result.result() ==
taskqueue_service_pb.TaskQueueServiceError.SKIPPED):
pass
elif (exception is None or isinstance(exception, TaskAlreadyExistsError)
or isinstance(exception, TombstonedTaskError)):
exception = _TranslateError(task_result.result())
if exception is not None:
raise exception
if multiple:
return tasks
else:
assert len(tasks) == 1
return tasks[0]
if len(tasks) > MAX_TASKS_PER_ADD:
raise TooManyTasksError(
'No more than %d tasks can be added in a single call' %
MAX_TASKS_PER_ADD)
request = taskqueue_service_pb.TaskQueueBulkAddRequest()
response = taskqueue_service_pb.TaskQueueBulkAddResponse()
task_names = set()
for task in tasks:
if task.name:
if task.name in task_names:
raise DuplicateTaskNameError(
'The task name %r is used more than once in the request' %
task.name)
task_names.add(task.name)
if task.was_enqueued:
raise BadTaskStateError('Task has already been enqueued.')
fill_request(task, request.add_add_request(), transactional)
if transactional and (request.ByteSize() >
MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES):
raise TransactionalRequestTooLargeError(
'Transactional request size must be less than %d; found %d' %
(MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES, request.ByteSize()))
return _MakeAsyncCall('BulkAdd',
request,
response,
ResultHook,
rpc)
def __FillTaskQueueRetryParameters(self,
retry_options,
retry_retry_parameters):
"""Populates a TaskQueueRetryParameters with data from a TaskRetryOptions.
Args:
retry_options: The TaskRetryOptions instance to use as a source for the
data to be added to retry_retry_parameters.
retry_retry_parameters: A taskqueue_service_pb.TaskQueueRetryParameters
to populate.
"""
if retry_options.min_backoff_seconds is not None:
retry_retry_parameters.set_min_backoff_sec(
retry_options.min_backoff_seconds)
if retry_options.max_backoff_seconds is not None:
retry_retry_parameters.set_max_backoff_sec(
retry_options.max_backoff_seconds)
if retry_options.task_retry_limit is not None:
retry_retry_parameters.set_retry_limit(retry_options.task_retry_limit)
if retry_options.task_age_limit is not None:
retry_retry_parameters.set_age_limit_sec(retry_options.task_age_limit)
if retry_options.max_doublings is not None:
retry_retry_parameters.set_max_doublings(retry_options.max_doublings)
def __FillAddPushTasksRequest(self, task, task_request, transactional):
"""Populates a TaskQueueAddRequest with the data from a push Task instance.
Args:
task: The Task instance to use as a source for the data to be added to
task_request.
task_request: The taskqueue_service_pb.TaskQueueAddRequest to populate.
transactional: If true then populates the task_request.transaction message
with information from the enclosing transaction (if any).
Raises:
BadTaskStateError: If the task was already added to a Queue.
BadTransactionStateError: If the transactional argument is True and there
is no enclosing transaction.
InvalidTaskNameError: If the transactional argument is True and the task
is named.
"""
task_request.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
self.__FillTaskCommon(task, task_request, transactional)
adjusted_url = task.url
if task.on_queue_url:
adjusted_url = self.__url + task.url
task_request.set_method(_METHOD_MAP.get(task.method))
task_request.set_url(adjusted_url)
if task.payload:
task_request.set_body(task.payload)
for key, value in _flatten_params(task.headers):
header = task_request.add_header()
header.set_key(key)
header.set_value(value)
if task.retry_options:
self.__FillTaskQueueRetryParameters(
task.retry_options, task_request.mutable_retry_parameters())
def __FillAddPullTasksRequest(self, task, task_request, transactional):
"""Populates a TaskQueueAddRequest with the data from a pull Task instance.
Args:
task: The Task instance to use as a source for the data to be added to
task_request.
task_request: The taskqueue_service_pb.TaskQueueAddRequest to populate.
transactional: If true then populates the task_request.transaction message
with information from the enclosing transaction (if any).
Raises:
BadTaskStateError: if the task doesn't have a payload, or has already been
added to a Queue.
BadTransactionStateError: if the transactional argument is True and there
is no enclosing transaction.
InvalidTaskNameError: if the transactional argument is True and the task
is named.
"""
task_request.set_mode(taskqueue_service_pb.TaskQueueMode.PULL)
self.__FillTaskCommon(task, task_request, transactional)
if task.payload is not None:
task_request.set_body(task.payload)
else:
raise BadTaskStateError('Pull task must have a payload')
def __FillTaskCommon(self, task, task_request, transactional):
"""Fills common fields for both push tasks and pull tasks."""
if self._app:
task_request.set_app_id(self._app)
task_request.set_queue_name(self.__name)
task_request.set_eta_usec(task._eta_usec)
if task.name:
task_request.set_task_name(task.name)
else:
task_request.set_task_name('')
if task.tag:
task_request.set_tag(task.tag)
if transactional:
from google.appengine.api import datastore
if not datastore._MaybeSetupTransaction(task_request, []):
raise BadTransactionStateError(
'Transactional adds are not allowed outside of transactions')
if task_request.has_transaction() and task.name:
raise InvalidTaskNameError(
'Task bound to a transaction cannot be named.')
@property
def name(self):
"""Returns the name of this queue."""
return self.__name
def modify_task_lease(self, task, lease_seconds):
"""Modifies the lease of a task in this queue.
Args:
task: A task instance that will have its lease modified.
lease_seconds: Number of seconds, from the current time, that the task
lease will be modified to. If lease_seconds is 0, then the task lease
is removed and the task will be available for leasing again using
the lease_tasks method.
Raises:
TypeError: if lease_seconds is not a valid float or integer.
InvalidLeaseTimeError: if lease_seconds is outside the valid range.
Error-subclass on application errors.
"""
lease_seconds = self._ValidateLeaseSeconds(lease_seconds)
request = taskqueue_service_pb.TaskQueueModifyTaskLeaseRequest()
response = taskqueue_service_pb.TaskQueueModifyTaskLeaseResponse()
request.set_queue_name(self.__name)
request.set_task_name(task.name)
request.set_eta_usec(task._eta_usec)
request.set_lease_seconds(lease_seconds)
try:
apiproxy_stub_map.MakeSyncCall('taskqueue',
'ModifyTaskLease',
request,
response)
except apiproxy_errors.ApplicationError, e:
raise _TranslateError(e.application_error, e.error_detail)
task._Task__eta_posix = response.updated_eta_usec() * 1e-6
task._Task__eta = None
def fetch_statistics_async(self, rpc=None):
"""Asynchronously get the current details about this queue.
Args:
rpc: An optional UserRPC object.
Returns:
A UserRPC object, call get_result to complete the RPC and obtain a
QueueStatistics instance containing information about this queue.
"""
return QueueStatistics.fetch_async(self, rpc)
def fetch_statistics(self, deadline=10):
"""Get the current details about this queue.
Args:
deadline: The maximum number of seconds to wait before aborting the
method call.
Returns:
A QueueStatistics instance containing information about this queue.
Error-subclass on application errors.
"""
_ValidateDeadline(deadline)
rpc = create_rpc(deadline)
self.fetch_statistics_async(rpc)
return rpc.get_result()
def __repr__(self):
ATTRS = ['name']
if self._app:
ATTRS.append('_app')
properties = ['%s=%r' % (attr, getattr(self, attr)) for attr in ATTRS]
return 'Queue<%s>' % ', '.join(properties)
def __eq__(self, o):
if not isinstance(o, Queue):
return NotImplemented
return self.name == o.name and self._app == o._app
def __ne__(self, o):
if not isinstance(o, Queue):
return NotImplemented
return self.name != o.name or self._app != o._app
def add(*args, **kwargs):
"""Convenience method will create a Task and add it to a queue.
All parameters are optional.
Push tasks, i.e. those with method not equal to PULL, may not be added to
queues in pull mode. Similarly, pull tasks may not be added to queues in push
mode.
Args:
payload: The payload data for this Task that will be delivered to the
webhook as the HTTP request body or fetched by workers for pull queues.
This is only allowed for POST, PUT, and PULL methods.
queue_name: Name of queue to insert task into. If not supplied, defaults to
the default queue.
name: Name to give the Task; if not specified, a name will be
auto-generated when added to a queue and assigned to this object. Must
match the _TASK_NAME_PATTERN regular expression.
method: Method to use when accessing the webhook. Defaults to 'POST'. If
set to 'PULL', task will not be automatiacally delivered to the webhook,
instead it stays in the queue until leased.
url: Relative URL where the webhook that should handle this task is
located for this application. May have a query string unless this is
a POST method. Must not be specified if method is PULL.
headers: Dictionary of headers to pass to the webhook. Values in the
dictionary may be iterable to indicate repeated header fields. Must not be
specified if method is PULL.
params: Dictionary of parameters to use for this Task. For POST and PULL
requests these params will be encoded as
'application/x-www-form-urlencoded' and set to the payload. For all other
methods, the parameters will be converted to a query string. Must not be
specified if the URL already contains a query string, or the task already
has a payload.
transactional: If True, adds the task(s) if and only if the enclosing
transaction is successfully committed. It is an error for transactional
to be True in the absence of an enclosing transaction. If False, adds
the task(s) immediately, ignoring any enclosing transaction's success or
failure.
countdown: Time in seconds into the future that this Task should execute.
Defaults to zero.
eta: A datetime.datetime specifying the absolute time at which the task
should be executed. Must not be specified if 'countdown' is specified.
This may be timezone-aware or timezone-naive. If None, defaults to now.
For pull tasks, no worker will be able to lease this task before the time
indicated by eta.
retry_options: TaskRetryOptions used to control when the task will be
retried if it fails.
tag: The tag to be used when grouping by tag (PULL tasks only).
target: The alternate version/backend on which to execute this task, or
DEFAULT_APP_VERSION to execute on the application's default version.
Returns:
The Task that was added to the queue.
Raises:
BadTransactionStateError: if the transactional argument is true but this
call is being made outside of the context of a transaction.
InvalidEtaError: if the ETA is too far into the future.
InvalidQueueModeError if a task with method PULL is added to a queue in push
mode, or a task with method not equal to PULL is added to a queue in pull
mode.
InvalidTagError: if the tag is too long.
InvalidTaskError if any of the parameters are invalid.
InvalidTaskNameError if the task name is invalid.
InvalidUrlError if the task URL is invalid or too long.
TaskTooLargeError if the task with its payload is too large.
TransactionalRequestTooLargeError: if transactional is True and the total
size of the tasks and supporting request data exceeds
MAX_TRANSACTIONAL_REQUEST_SIZE_BYTES.
"""
transactional = kwargs.pop('transactional', False)
queue_name = kwargs.pop('queue_name', _DEFAULT_QUEUE)
return Task(*args, **kwargs).add(
queue_name=queue_name, transactional=transactional)