#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""QueueInfo tools.

A library for working with QueueInfo records, describing task queue entries
for an application. Supports loading the records from queue.yaml.

A queue has two required parameters and various optional ones. The required
parameters are 'name' (must be unique for an appid) and 'rate' (the rate
at which jobs in the queue are run). There is an optional parameter
'bucket_size' that will allow tokens to be 'saved up' (for more on the
algorithm, see http://en.wikipedia.org/wiki/Token_Bucket). The rate is
expressed as number/unit, with number being an int or a float, and unit being
one of 's' (seconds), 'm' (minutes), 'h' (hours) or 'd' (days); the literal 0
is also accepted. bucket_size is an integer.
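
The following minimal sketch shows that conversion. It mirrors ParseRate further down in this module and turns a rate string into tasks per second. The helper name rate_per_second is hypothetical and is not part of this module.

```python
# Seconds per unit for the suffixes accepted in a queue rate.
_SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}


def rate_per_second(rate):
  # Hypothetical helper: convert 'number/unit' (or the literal '0') into a
  # float number of tasks per second.
  if rate == '0':
    return 0.0
  number, sep, unit = rate.partition('/')
  if not sep or unit not in _SECONDS_PER_UNIT:
    raise ValueError('invalid rate: %r' % rate)
  return float(number) / _SECONDS_PER_UNIT[unit]


print(rate_per_second('50/s'))    # 50.0
print(rate_per_second('2000/d'))  # about 0.0231 tasks per second
```

The lookup table keeps all the unit handling in one place. An unknown or empty unit raises ValueError.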

An example of the use of bucket_size: the free email quota is 2000/d,
and the maximum you can send in a single minute is 11. So we can define a
queue for sending email like this:

  queue:
  - name: mail-queue
    rate: 2000/d
    bucket_size: 10

If this queue had been idle for a while before some jobs were submitted to it,
the first 10 jobs submitted would be run immediately, then subsequent ones
would be run roughly once every 43 seconds (86400 s / 2000). The limit of 2000
per day would still apply.
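
A small token bucket simulation reproduces this burst-then-steady behaviour. It is a sketch of the generic token bucket algorithm, not App Engine's actual scheduler. The function token_bucket_start_times is hypothetical. It assumes every job is submitted at time 0 to a bucket that starts full.

```python
def token_bucket_start_times(num_jobs, rate_per_sec, bucket_size):
  # Simulate a token bucket that starts full (the queue was idle) and refills
  # at rate_per_sec tokens per second. All jobs are assumed to be submitted at
  # time 0; each job consumes one token and starts as soon as a token is
  # available. Returns the start time of each job in seconds.
  tokens = float(bucket_size)
  now = 0.0
  start_times = []
  for _ in range(num_jobs):
    if tokens < 1.0:
      # Wait just long enough for the bucket to accumulate one whole token.
      now += (1.0 - tokens) / rate_per_sec
      tokens = 1.0
    tokens -= 1.0
    start_times.append(now)
  return start_times


times = token_bucket_start_times(12, 2000.0 / 86400, 10)
print(times[:10])  # ten jobs at time 0.0
print(times[10:])  # then roughly 43.2 s apart
```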

Another optional parameter is 'max_concurrent_requests', which pertains to the
requests being made by the queue. It specifies the maximum number of requests
that may be in flight at any one time. An example:

  queue:
  - name: server-queue
    rate: 50/s
    max_concurrent_requests: 5
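
The following client-side sketch illustrates what the limit means by capping in-flight requests with a semaphore. It is an assumption about the semantics, not how App Engine enforces max_concurrent_requests. The name run_with_limit is hypothetical.

```python
import threading
import time


def run_with_limit(requests, max_concurrent_requests):
  # Run each callable in its own thread, but let at most
  # max_concurrent_requests of them be in flight at once. Returns the highest
  # concurrency actually observed.
  slots = threading.BoundedSemaphore(max_concurrent_requests)
  lock = threading.Lock()
  state = {'in_flight': 0, 'peak': 0}

  def worker(request):
    with slots:
      with lock:
        state['in_flight'] += 1
        state['peak'] = max(state['peak'], state['in_flight'])
      try:
        request()
      finally:
        with lock:
          state['in_flight'] -= 1

  threads = [threading.Thread(target=worker, args=(r,)) for r in requests]
  for t in threads:
    t.start()
  for t in threads:
    t.join()
  return state['peak']


# Twenty simulated 10 ms requests with the limit from the example above.
peak = run_with_limit([lambda: time.sleep(0.01)] * 20, 5)
print(peak)  # never more than 5
```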

Each queue has an optional 'mode' parameter with legal values 'push' and 'pull'.
If mode is not specified, it defaults to 'push'. Tasks in queues with mode
'push' are invoked (pushed) at the specified rate. Tasks in queues with mode
'pull' are not directly invoked by App Engine. These tasks are leased for a
period by client code, and deleted by client code when the task's work is
finished. If not deleted before the expiry of the lease, the tasks are available
for lease again.
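
A small in-memory class can model the lease/delete cycle of a pull queue. PullQueueSketch and its methods are hypothetical and are not the App Engine task queue API. An injectable clock makes lease expiry easy to demonstrate.

```python
import time


class PullQueueSketch(object):
  # Hypothetical in-memory model of pull-queue leasing; not an App Engine API.

  def __init__(self, clock=time.time):
    self._clock = clock
    # Task name -> time at which its current lease ends (0.0 = never leased).
    self._lease_expiry = {}

  def add(self, name):
    self._lease_expiry[name] = 0.0

  def lease(self, lease_seconds, max_tasks):
    # Lease up to max_tasks tasks whose previous lease (if any) has expired.
    now = self._clock()
    leased = []
    for name in sorted(self._lease_expiry):
      if len(leased) == max_tasks:
        break
      if self._lease_expiry[name] <= now:
        self._lease_expiry[name] = now + lease_seconds
        leased.append(name)
    return leased

  def delete(self, name):
    # Client code deletes a task once its work is finished.
    self._lease_expiry.pop(name, None)


fake_now = [100.0]
q = PullQueueSketch(clock=lambda: fake_now[0])
q.add('task-a')
q.add('task-b')
print(q.lease(60, 10))  # ['task-a', 'task-b']
q.delete('task-a')      # work on task-a finished
print(q.lease(60, 10))  # [] because task-b is still leased
fake_now[0] += 61       # task-b's lease expires without a delete
print(q.lease(60, 10))  # ['task-b']
```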

Each queue has an optional 'target' parameter. If specified, all tasks inserted
into the queue will be executed on the specified alternate version/server
instance. The value has the form 'version' or 'module:version'.
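
The target value follows _VERSION_REGEX below: an optional 'module:' prefix followed by a version. The sketch below splits a target into its two parts and assumes that same pattern. The helper split_target is hypothetical. The character class is written as [a-z0-9-] to avoid backslash escapes; for ASCII input it accepts the same characters as the module's class.

```python
import re

# Same shape as _VERSION_REGEX in this module: an optional 'module:' prefix
# followed by a version, neither part starting with '-'.
_MODULE = r'(?!-)[a-z0-9-]{1,63}'
_VERSION = r'(?!-)[a-z0-9-]{1,100}'
_TARGET_RE = re.compile(r'^(?:(%s):)?(%s)$' % (_MODULE, _VERSION))


def split_target(target):
  # Hypothetical helper: return (module, version); module is None when the
  # target names only a version.
  match = _TARGET_RE.match(target)
  if not match:
    raise ValueError('invalid target: %r' % target)
  return match.group(1), match.group(2)


print(split_target('v2'))          # (None, 'v2')
print(split_target('backend:v2'))  # ('backend', 'v2')
```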

A queue may also optionally specify retry_parameters.

  retry_parameters:
    task_retry_limit: 100
    task_age_limit: 1d
    min_backoff_seconds: 0.1
    max_backoff_seconds: 3600
    max_doublings: 10

Each task in the queue that fails during execution will be retried using these
parameters. All these fields are optional.

task_retry_limit: A non-negative integer. Tasks will be retried a maximum of
  task_retry_limit times before failing permanently. If task_age_limit is also
  specified, both task_retry_limit and task_age_limit must be exceeded before a
  task fails permanently.

task_age_limit: A positive integer or floating point number followed by a
  suffix s (seconds), m (minutes), h (hours) or d (days). If the time since a
  task was first tried exceeds task_age_limit, it will fail permanently. If
  task_retry_limit is also specified, both task_retry_limit and task_age_limit
  must be exceeded before a task fails permanently.
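
The sketch below shows one reading of the combined rule. fails_permanently is a hypothetical helper. Two interpretations in it are assumptions: the retry limit counts as exceeded when no retries are left (retry_count >= task_retry_limit), and the age limit counts as exceeded when the task is strictly older than the limit.

```python
def fails_permanently(retry_count, age_seconds, task_retry_limit=None,
                      task_age_limit_seconds=None):
  # Hypothetical helper. retry_count is how many times the task has already
  # been retried; age_seconds is the time since it was first tried. A limit
  # of None means that field was not specified.
  retries_exceeded = (task_retry_limit is not None and
                      retry_count >= task_retry_limit)
  age_exceeded = (task_age_limit_seconds is not None and
                  age_seconds > task_age_limit_seconds)
  if task_retry_limit is not None and task_age_limit_seconds is not None:
    # With both limits specified, both must be exceeded.
    return retries_exceeded and age_exceeded
  # With one limit (or none), that limit alone decides.
  return retries_exceeded or age_exceeded


print(fails_permanently(100, 3600, 100, 86400))      # False: only 1 hour old
print(fails_permanently(100, 90000, 100, 86400))     # True
print(fails_permanently(3, 10, task_retry_limit=3))  # True
```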

min_backoff_seconds: A non-negative floating point number. This is the minimum
  interval between the first failure and the first retry of a task. If
  max_backoff_seconds is also specified, min_backoff_seconds must not be
  greater than max_backoff_seconds.

max_backoff_seconds: A non-negative floating point number. This is the maximum
  allowed interval between successive retries of a failed task. If
  min_backoff_seconds is also specified, min_backoff_seconds must not be
  greater than max_backoff_seconds.

max_doublings: A non-negative integer. On successive failures, the retry
  backoff interval will be successively doubled up to max_doublings times,
  starting at min_backoff_seconds and not exceeding max_backoff_seconds. For
  retries after max_doublings, the retry backoff will increase by the value of
  the backoff when doubling ceased. For example, with min_backoff_seconds of 1
  and max_doublings of 5, successive retry backoffs are 1, 2, 4, 8, 16, 32, 64,
  96, 128, ..., not exceeding max_backoff_seconds.
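
The backoff schedule described above can be sketched as follows. retry_backoffs is a hypothetical helper that follows the description literally. It is not taken from the task queue service itself.

```python
def retry_backoffs(min_backoff_seconds, max_backoff_seconds, max_doublings,
                   count):
  # Backoff before each of the first `count` retries: double up to
  # max_doublings times, then grow linearly by the backoff reached when
  # doubling stopped, never exceeding max_backoff_seconds.
  increment = min_backoff_seconds * 2 ** max_doublings
  backoff = min_backoff_seconds
  backoffs = []
  for retry in range(count):
    backoffs.append(min(backoff, max_backoff_seconds))
    if retry < max_doublings:
      backoff *= 2
    else:
      backoff += increment
  return backoffs


print(retry_backoffs(1, 3600, 5, 9))  # [1, 2, 4, 8, 16, 32, 64, 96, 128]
print(retry_backoffs(1, 50, 5, 9))    # [1, 2, 4, 8, 16, 32, 50, 50, 50]
```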

A queue may optionally specify an acl (Access Control List).

  acl:
  - user_email: a@foo.com
  - writer_email: b@gmail.com

Each email must correspond to an account hosted by Google. The acl is
enforced for queue access from outside App Engine.

An app's queues are also subject to storage quota limits for their stored
tasks, i.e., those tasks that have been added to queues but not yet executed.
This quota is part of their total storage quota (including datastore and
blobstore quota). We allow an app to override the default portion of this quota
available for taskqueue storage (100M) with a top-level field
"total_storage_limit".

  total_storage_limit: 1.2G

If no suffix is specified, the number is interpreted as bytes. Supported
suffixes are B (bytes), K (kilobytes), M (megabytes), G (gigabytes) and
T (terabytes), where each step is a factor of 1024. If total_storage_limit
exceeds the total disk storage available to an app, it is clamped.
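
The following sketch shows the suffix handling and mirrors ParseTotalStorageLimit further down. storage_limit_bytes is hypothetical. Its available_bytes argument is a stand-in for whatever storage the app actually has when clamping.

```python
_SUFFIXES = 'BKMGT'  # each step is a factor of 1024, as in BYTE_SUFFIXES


def storage_limit_bytes(limit, available_bytes=None):
  # Hypothetical helper: parse a total_storage_limit value such as '1.2G' into
  # bytes and optionally clamp it to the storage available to the app.
  limit = limit.strip()
  if limit and limit[-1] in _SUFFIXES:
    result = int(float(limit[:-1]) * 1024 ** _SUFFIXES.index(limit[-1]))
  else:
    result = int(limit)  # no suffix: a plain number of bytes
  if available_bytes is not None:
    result = min(result, available_bytes)
  return result


print(storage_limit_bytes('1.2G'))  # 1288490188
print(storage_limit_bytes('100M'))  # 104857600
```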
"""
from google.appengine.api import appinfo
from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.api.taskqueue import taskqueue_service_pb

_NAME_REGEX = r'^[A-Za-z0-9-]{0,499}$'
_RATE_REGEX = r'^(0|[0-9]+(\.[0-9]*)?/[smhd])'
_TOTAL_STORAGE_LIMIT_REGEX = r'^([0-9]+(\.[0-9]*)?[BKMGT]?)'
_MODE_REGEX = r'(pull)|(push)'

MODULE_ID_RE_STRING = r'(?!-)[a-z\d\-]{1,63}'
MODULE_VERSION_RE_STRING = r'(?!-)[a-z\d\-]{1,100}'
_VERSION_REGEX = r'^(?:(?:(%s):)?)(%s)$' % (MODULE_ID_RE_STRING,
                                            MODULE_VERSION_RE_STRING)
QUEUE = 'queue'
NAME = 'name'
RATE = 'rate'
BUCKET_SIZE = 'bucket_size'
MODE = 'mode'
TARGET = 'target'
MAX_CONCURRENT_REQUESTS = 'max_concurrent_requests'
TOTAL_STORAGE_LIMIT = 'total_storage_limit'
BYTE_SUFFIXES = 'BKMGT'
RETRY_PARAMETERS = 'retry_parameters'
TASK_RETRY_LIMIT = 'task_retry_limit'
TASK_AGE_LIMIT = 'task_age_limit'
MIN_BACKOFF_SECONDS = 'min_backoff_seconds'
MAX_BACKOFF_SECONDS = 'max_backoff_seconds'
MAX_DOUBLINGS = 'max_doublings'
ACL = 'acl'
USER_EMAIL = 'user_email'
WRITER_EMAIL = 'writer_email'


class MalformedQueueConfiguration(Exception):
  """Configuration file for Task Queue is malformed."""


class RetryParameters(validation.Validated):
  """Retry parameters for a single task queue."""
  ATTRIBUTES = {
      TASK_RETRY_LIMIT: validation.Optional(validation.TYPE_INT),
      TASK_AGE_LIMIT: validation.Optional(validation.TimeValue()),
      MIN_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_DOUBLINGS: validation.Optional(validation.TYPE_INT),
  }


class Acl(validation.Validated):
  """Access control list for a single task queue."""
  ATTRIBUTES = {
      USER_EMAIL: validation.Optional(validation.TYPE_STR),
      WRITER_EMAIL: validation.Optional(validation.TYPE_STR),
  }


class QueueEntry(validation.Validated):
  """A queue entry describes a single task queue."""
  ATTRIBUTES = {
      NAME: _NAME_REGEX,
      RATE: validation.Optional(_RATE_REGEX),
      MODE: validation.Optional(_MODE_REGEX),
      BUCKET_SIZE: validation.Optional(validation.TYPE_INT),
      MAX_CONCURRENT_REQUESTS: validation.Optional(validation.TYPE_INT),
      RETRY_PARAMETERS: validation.Optional(RetryParameters),
      ACL: validation.Optional(validation.Repeated(Acl)),
      TARGET: validation.Optional(_VERSION_REGEX),
  }


class QueueInfoExternal(validation.Validated):
  """QueueInfoExternal describes all queue entries for an application."""
  ATTRIBUTES = {
      appinfo.APPLICATION: validation.Optional(appinfo.APPLICATION_RE_STRING),
      TOTAL_STORAGE_LIMIT: validation.Optional(_TOTAL_STORAGE_LIMIT_REGEX),
      QUEUE: validation.Optional(validation.Repeated(QueueEntry)),
  }


def LoadSingleQueue(queue_info, open_fn=None):
  """Load a queue.yaml file or string and return a QueueInfoExternal object.

  Args:
    queue_info: The contents of a queue.yaml file, as a string.
    open_fn: Function for opening files. Unused.

  Returns:
    A QueueInfoExternal object.

  Raises:
    MalformedQueueConfiguration: if the configuration is empty or has
      multiple queue: sections.
  """
  # Build QueueInfoExternal objects from the YAML event stream.
  builder = yaml_object.ObjectBuilder(QueueInfoExternal)
  handler = yaml_builder.BuilderHandler(builder)
  listener = yaml_listener.EventListener(handler)
  listener.Parse(queue_info)

  # Exactly one configuration must have been parsed.
  queue_info = handler.GetResults()
  if len(queue_info) < 1:
    raise MalformedQueueConfiguration('Empty queue configuration.')
  if len(queue_info) > 1:
    raise MalformedQueueConfiguration('Multiple queue: sections '
                                      'in configuration.')
  return queue_info[0]


def ParseRate(rate):
  """Parses a rate string in the form number/unit, or the literal 0.

  The unit is one of s (seconds), m (minutes), h (hours) or d (days).

  Args:
    rate: The rate string.

  Returns:
    A floating point number representing the rate per second.

  Raises:
    MalformedQueueConfiguration: if the rate is invalid.
  """
  if rate == "0":
    return 0.0
  elements = rate.split('/')
  if len(elements) != 2:
    raise MalformedQueueConfiguration('Rate "%s" is invalid.' % rate)
  number, unit = elements
  try:
    number = float(number)
  except ValueError:
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not a number.' %
                                      (rate, number))
  # Require an exact unit match so that an empty or multi-character unit
  # (e.g. "5/" or "5/sm") is rejected rather than matching none of the
  # conversions below.
  if unit not in ('s', 'm', 'h', 'd'):
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not one of s, m, h, d.' %
                                      (rate, unit))
  if unit == 's':
    return number
  if unit == 'm':
    return number / 60
  if unit == 'h':
    return number / (60 * 60)
  if unit == 'd':
    return number / (24 * 60 * 60)


def ParseTotalStorageLimit(limit):
  """Parses a string representing the storage bytes limit.

  Optional limit suffixes are:
      B (bytes), K (kilobytes), M (megabytes), G (gigabytes), T (terabytes)

  Args:
    limit: The storage bytes limit string.

  Returns:
    An int representing the storage limit in bytes.

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
      double followed by a suffix, or a valid integer when no suffix is given.
  """
  limit = limit.strip()
  if not limit:
    raise MalformedQueueConfiguration('Total Storage Limit must not be empty.')
  try:
    if limit[-1] in BYTE_SUFFIXES:
      number = float(limit[0:-1])
      # Multiply by 1024 once for each suffix that precedes the given one in
      # BYTE_SUFFIXES ('BKMGT'), so K = 1024, M = 1024 ** 2, and so on.
      for c in BYTE_SUFFIXES:
        if limit[-1] != c:
          number = number * 1024
        else:
          return int(number)
    else:
      # No suffix: the value is a plain number of bytes.
      return int(limit)
  except ValueError:
    raise MalformedQueueConfiguration('Total Storage Limit "%s" is invalid.' %
                                      limit)


def ParseTaskAgeLimit(age_limit):
  """Parses a string representing the task's age limit (maximum allowed age).

  The string must be a non-negative integer or floating point number followed
  by one of s, m, h, or d (seconds, minutes, hours or days respectively).

  Args:
    age_limit: The task age limit string.

  Returns:
    An int representing the age limit in seconds (fractional seconds are
    truncated).

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
      double followed by a required suffix.
  """
  age_limit = age_limit.strip()
  if not age_limit:
    raise MalformedQueueConfiguration('Task Age Limit must not be empty.')
  unit = age_limit[-1]
  if unit not in "smhd":
    raise MalformedQueueConfiguration('Task Age Limit must be in s (seconds), '
                                      'm (minutes), h (hours) or d (days).')
  try:
    number = float(age_limit[0:-1])
    if unit == 's':
      return int(number)
    if unit == 'm':
      return int(number * 60)
    if unit == 'h':
      return int(number * 3600)
    if unit == 'd':
      return int(number * 86400)
  except ValueError:
    raise MalformedQueueConfiguration('Task Age Limit "%s" is invalid.' %
                                      age_limit)


def TranslateRetryParameters(retry):
  """Populates a TaskQueueRetryParameters from a queueinfo.RetryParameters.

  Args:
    retry: A queueinfo.RetryParameters read from queue.yaml that describes the
      queue's retry parameters.

  Returns:
    A taskqueue_service_pb.TaskQueueRetryParameters proto populated with the
    data from "retry".

  Raises:
    MalformedQueueConfiguration: if the retry parameters are invalid.
  """
  params = taskqueue_service_pb.TaskQueueRetryParameters()
  if retry.task_retry_limit is not None:
    params.set_retry_limit(int(retry.task_retry_limit))
  if retry.task_age_limit is not None:
    params.set_age_limit_sec(ParseTaskAgeLimit(retry.task_age_limit))
  if retry.min_backoff_seconds is not None:
    params.set_min_backoff_sec(float(retry.min_backoff_seconds))
  if retry.max_backoff_seconds is not None:
    params.set_max_backoff_sec(float(retry.max_backoff_seconds))
  if retry.max_doublings is not None:
    params.set_max_doublings(int(retry.max_doublings))

  # When only one backoff bound is given, the other keeps the proto's default
  # value. If that default conflicts with the explicit bound, move the default
  # so that min_backoff_sec <= max_backoff_sec still holds.
  if params.has_min_backoff_sec() and not params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_max_backoff_sec(params.min_backoff_sec())
  if not params.has_min_backoff_sec() and params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_min_backoff_sec(params.max_backoff_sec())

  # Validate the individual values.
  if params.has_retry_limit() and params.retry_limit() < 0:
    raise MalformedQueueConfiguration(
        'Task retry limit must not be less than zero.')
  if params.has_age_limit_sec() and not params.age_limit_sec() > 0:
    raise MalformedQueueConfiguration(
        'Task age limit must be greater than zero.')
  if params.has_min_backoff_sec() and params.min_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Min backoff seconds must not be less than zero.')
  if params.has_max_backoff_sec() and params.max_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Max backoff seconds must not be less than zero.')
  if params.has_max_doublings() and params.max_doublings() < 0:
    raise MalformedQueueConfiguration(
        'Max doublings must not be less than zero.')

  # Both bounds were given explicitly and are inconsistent.
  if (params.has_min_backoff_sec() and params.has_max_backoff_sec() and
      params.min_backoff_sec() > params.max_backoff_sec()):
    raise MalformedQueueConfiguration(
        'Min backoff sec must not be greater than max backoff sec.')
  return params