#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Stub version of the Task Queue API.
This stub stores tasks and runs them via dev_appserver's AddEvent capability.
It also validates the tasks by checking their queue name against the queue.yaml.
As well as implementing Task Queue API functions, the stub exposes various other
functions that are used by the dev_appserver's admin console to display the
application's queues and tasks.
"""
from __future__ import with_statement
__all__ = []
import base64
import bisect
import calendar
import datetime
import logging
import os
import random
import string
import threading
import time
import taskqueue_service_pb
import taskqueue
from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import queueinfo
from google.appengine.api import request_info
from google.appengine.api.taskqueue import taskqueue
from google.appengine.runtime import apiproxy_errors
# Default processing rate of a queue, as a display string and as a float.
DEFAULT_RATE = '5.00/s'
DEFAULT_RATE_FLOAT = 5.0

# Default bucket capacity of a queue.
DEFAULT_BUCKET_SIZE = 5

# A task's ETA may be at most this far in the future
# (enforced by _Group._VerifyTaskQueueAddRequest).
MAX_ETA = datetime.timedelta(days=30)

# Size limits, in bytes.
MAX_PULL_TASK_SIZE_BYTES = 1024 * 1024
MAX_PUSH_TASK_SIZE_BYTES = 100 * 1024
MAX_TASK_SIZE = MAX_PUSH_TASK_SIZE_BYTES
MAX_REQUEST_SIZE = 32 * 1024 * 1024

# Lower-cased names of headers that QueryTasksResponseToDict filters out of the
# user supplied headers before adding its own values.
BUILT_IN_HEADERS = set([
    'x-appengine-queuename',
    'x-appengine-taskname',
    'x-appengine-taskexecutioncount',
    'x-appengine-taskpreviousresponse',
    'x-appengine-taskretrycount',
    'x-appengine-tasketa',
    'x-appengine-development-payload',
    'content-length',
])

DEFAULT_QUEUE_NAME = 'default'

# The literal is too large for a float, so it evaluates to positive infinity.
INF = 1e500

QUEUE_MODE = taskqueue_service_pb.TaskQueueMode

# Queues that are created on first access. Each value is passed positionally to
# _Queue after the queue name: (bucket_refill_per_second, bucket_capacity,
# user_specified_rate).
AUTOMATIC_QUEUES = {
    DEFAULT_QUEUE_NAME: (0.2, DEFAULT_BUCKET_SIZE, DEFAULT_RATE),
    '__cron': (1, 1, '1/s'),
}
def _GetAppId(request):
"""Returns the app id to use for the given request.
Args:
request: A protocol buffer that has an app_id field.
Returns:
A string containing the app id or None if no app id was specified.
"""
if request.has_app_id():
return request.app_id()
else:
return None
def _SecToUsec(t):
"""Converts a time in seconds since the epoch to usec since the epoch.
Args:
t: Time in seconds since the unix epoch
Returns:
An integer containing the number of usec since the unix epoch.
"""
return int(t * 1e6)
def _UsecToSec(t):
"""Converts a time in usec since the epoch to seconds since the epoch.
Args:
t: Time in usec since the unix epoch
Returns:
A float containing the number of seconds since the unix epoch.
"""
return t / 1e6
def _FormatEta(eta_usec):
  """Renders a task ETA as a 'YYYY/MM/DD HH:MM:SS' string in UTC.

  Args:
    eta_usec: The ETA in usec since the unix epoch.

  Returns:
    The formatted date string.
  """
  eta_seconds = _UsecToSec(eta_usec)
  return datetime.datetime.utcfromtimestamp(eta_seconds).strftime(
      '%Y/%m/%d %H:%M:%S')
def _TruncDelta(timedelta):
"""Strips the microseconds field from a timedelta.
Args:
timedelta: a datetime.timedelta.
Returns:
A datetime.timedelta with the microseconds field not filled.
"""
return datetime.timedelta(days=timedelta.days, seconds=timedelta.seconds)
def _EtaDelta(eta_usec, now):
  """Describes a task ETA relative to the current time.

  Args:
    eta_usec: The ETA in usec since the unix epoch.
    now: A datetime.datetime holding the current time in UTC.

  Returns:
    '<delta> from now' when the ETA is in the future, otherwise '<delta> ago'.
    The delta is shown without microseconds.
  """
  eta = datetime.datetime.utcfromtimestamp(_UsecToSec(eta_usec))
  if eta <= now:
    return '%s ago' % _TruncDelta(now - eta)
  return '%s from now' % _TruncDelta(eta - now)
def QueryTasksResponseToDict(queue_name, task_response, now):
  """Builds the admin-console view of a single queried task.

  Args:
    queue_name: The name of the queue this task came from.
    task_response: An instance of TaskQueueQueryTasksResponse_Task.
    now: A datetime.datetime object containing the current time in UTC.

  Returns:
    A dict with the keys 'name', 'queue_name', 'url', 'method', 'eta',
    'eta_usec', 'eta_delta', 'body' (base64 encoded) and 'headers' (a list of
    (name, value) tuples).

  Raises:
    ValueError: A task response contains an unknown HTTP method type.
  """
  task_pb = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task
  method_names = {
      task_pb.GET: 'GET',
      task_pb.POST: 'POST',
      task_pb.HEAD: 'HEAD',
      task_pb.PUT: 'PUT',
      task_pb.DELETE: 'DELETE',
  }
  method = task_response.method()
  if method not in method_names:
    raise ValueError('Unexpected method: %d' % method)

  task_name = task_response.task_name()
  eta_usec = task_response.eta_usec()
  eta_text = _FormatEta(eta_usec)
  eta_delta_text = _EtaDelta(eta_usec, now)
  encoded_body = base64.b64encode(task_response.body())

  # User supplied headers come first, minus those the stub sets itself.
  headers = [(header.key(), header.value())
             for header in task_response.header_list()
             if header.key().lower() not in BUILT_IN_HEADERS]
  headers.extend([
      ('X-AppEngine-QueueName', queue_name),
      ('X-AppEngine-TaskName', task_name),
      ('X-AppEngine-TaskRetryCount', str(task_response.retry_count())),
      ('X-AppEngine-TaskETA', str(_UsecToSec(eta_usec))),
      ('X-AppEngine-Development-Payload', '1'),
      # This is the length of the base64 encoded body, not of the raw payload.
      ('Content-Length', str(len(encoded_body))),
  ])

  header_names = frozenset(key.lower() for key, _ in headers)
  if 'content-type' not in header_names:
    headers.append(('Content-Type', 'application/octet-stream'))

  headers.append(('X-AppEngine-TaskExecutionCount',
                  str(task_response.execution_count())))
  if task_response.has_runlog() and task_response.runlog().has_response_code():
    headers.append(('X-AppEngine-TaskPreviousResponse',
                    str(task_response.runlog().response_code())))

  return {
      'name': task_name,
      'queue_name': queue_name,
      'url': task_response.url(),
      'method': method_names[method],
      'eta': eta_text,
      'eta_usec': eta_usec,
      'eta_delta': eta_delta_text,
      'body': encoded_body,
      'headers': headers,
  }
class _Group(object):
"""A taskqueue group.
This class contains all of the queues for an application.
"""
def __init__(self, queue_yaml_parser=None, app_id=None,
_all_queues_valid=False, _update_newest_eta=None,
_testing_validate_state=False):
"""Constructor.
Args:
queue_yaml_parser: A function that takes no parameters and returns the
parsed results of the queue.yaml file. If this queue is not based on a
queue.yaml file use None.
app_id: The app id this Group is representing or None if it is the
currently running application.
_all_queues_valid: Automatically generate queues on first access.
_update_newest_eta: Callable for automatically executing tasks.
Takes the ETA of the task in seconds since the epoch, the queue_name
and a task name. May be None if automatic task running is disabled.
_testing_validate_state: Should this _Group and all of its _Queues
validate their state after each operation? This should only be used
during testing of the taskqueue_stub.
"""
self._queues = {}
self._queue_yaml_parser = queue_yaml_parser
self._all_queues_valid = _all_queues_valid
self._next_task_id = 1
self._app_id = app_id
if _update_newest_eta is None:
self._update_newest_eta = lambda x: None
else:
self._update_newest_eta = _update_newest_eta
self._testing_validate_state = _testing_validate_state
def GetQueuesAsDicts(self):
"""Gets all the applications's queues.
Returns:
A list of dictionaries, where each dictionary contains one queue's
attributes. E.g.:
[{'name': 'some-queue',
'max_rate': '1/s',
'bucket_size': 5,
'oldest_task': '2009/02/02 05:37:42',
'eta_delta': '0:00:06.342511 ago',
'tasks_in_queue': 12,
'acl': ['user1@gmail.com']}, ...]
The list of queues always includes the default queue.
"""
self._ReloadQueuesFromYaml()
now = datetime.datetime.utcnow()
queues = []
for queue_name, queue in sorted(self._queues.items()):
queue_dict = {}
queues.append(queue_dict)
queue_dict['name'] = queue_name
queue_dict['bucket_size'] = queue.bucket_capacity
if queue.user_specified_rate is not None:
queue_dict['max_rate'] = queue.user_specified_rate
else:
queue_dict['max_rate'] = ''
if queue.queue_mode == QUEUE_MODE.PULL:
queue_dict['mode'] = 'pull'
else:
queue_dict['mode'] = 'push'
queue_dict['acl'] = queue.acl
oldest_eta = queue.Oldest()
if oldest_eta:
queue_dict['oldest_task'] = _FormatEta(oldest_eta)
queue_dict['eta_delta'] = _EtaDelta(oldest_eta, now)
else:
queue_dict['oldest_task'] = ''
queue_dict['eta_delta'] = ''
queue_dict['tasks_in_queue'] = queue.Count()
if queue.retry_parameters:
retry_proto = queue.retry_parameters
retry_dict = {}
if retry_proto.has_retry_limit():
retry_dict['retry_limit'] = retry_proto.retry_limit()
if retry_proto.has_age_limit_sec():
retry_dict['age_limit_sec'] = retry_proto.age_limit_sec()
if retry_proto.has_min_backoff_sec():
retry_dict['min_backoff_sec'] = retry_proto.min_backoff_sec()
if retry_proto.has_max_backoff_sec():
retry_dict['max_backoff_sec'] = retry_proto.max_backoff_sec()
if retry_proto.has_max_doublings():
retry_dict['max_doublings'] = retry_proto.max_doublings()
queue_dict['retry_parameters'] = retry_dict
return queues
def HasQueue(self, queue_name):
"""Check if the specified queue_name references a valid queue.
Args:
queue_name: The name of the queue to check.
Returns:
True if the queue exists, False otherwise.
"""
self._ReloadQueuesFromYaml()
return queue_name in self._queues and (
self._queues[queue_name] is not None)
def GetQueue(self, queue_name):
"""Gets the _Queue instance for the specified queue.
Args:
queue_name: The name of the queue to fetch.
Returns:
The _Queue instance for the specified queue.
Raises:
KeyError if the queue does not exist.
"""
self._ReloadQueuesFromYaml()
return self._queues[queue_name]
def GetNextPushTask(self):
"""Finds the task with the lowest eta.
Returns:
A tuple containing the queue and task instance for the task with the
lowest eta, or (None, None) if there are no tasks.
"""
min_eta = INF
result = None, None
for queue in self._queues.itervalues():
if queue.queue_mode == QUEUE_MODE.PULL:
continue
task = queue.OldestTask()
if not task:
continue
if task.eta_usec() < min_eta:
result = queue, task
min_eta = task.eta_usec()
return result
def _ConstructQueue(self, queue_name, *args, **kwargs):
if '_testing_validate_state' in kwargs:
raise TypeError(
'_testing_validate_state should not be passed to _ConstructQueue')
kwargs['_testing_validate_state'] = self._testing_validate_state
self._queues[queue_name] = _Queue(queue_name, *args, **kwargs)
def _ConstructAutomaticQueue(self, queue_name):
  """Creates a queue that was not declared in queue.yaml.

  Queues listed in AUTOMATIC_QUEUES are built with their predefined settings.
  Any other name gets the _Queue defaults and is only expected when this
  group was created with _all_queues_valid.

  Args:
    queue_name: The name of the queue to create.
  """
  if queue_name not in AUTOMATIC_QUEUES:
    assert self._all_queues_valid
    self._ConstructQueue(queue_name)
    return
  settings = AUTOMATIC_QUEUES[queue_name]
  self._ConstructQueue(queue_name, *settings)
def _ReloadQueuesFromYaml(self):
  """Update the queue map with the contents of the queue.yaml file.

  Queues listed in queue.yaml are created if they are missing or tombstoned
  and otherwise have their settings refreshed in place. The default queue is
  created if it is missing. Unless this group was created with
  _all_queues_valid, queues that no longer exist in the queue.yaml file are
  removed.

  If no queue yaml parser has been defined, this function is a no-op.
  """
  if not self._queue_yaml_parser:
    return

  queue_info = self._queue_yaml_parser()
  if queue_info and queue_info.queue:
    queues = queue_info.queue
  else:
    queues = []

  old_queues = set(self._queues)
  new_queues = set()

  for entry in queues:
    queue_name = entry.name
    new_queues.add(queue_name)

    retry_parameters = None
    if entry.bucket_size:
      bucket_size = entry.bucket_size
    else:
      bucket_size = DEFAULT_BUCKET_SIZE
    if entry.retry_parameters:
      retry_parameters = queueinfo.TranslateRetryParameters(
          entry.retry_parameters)

    if entry.mode == 'pull':
      mode = QUEUE_MODE.PULL
      if entry.rate is not None:
        logging.warning(
            'Refill rate must not be specified for pull-based queue. '
            'Please check queue.yaml file.')
    else:
      mode = QUEUE_MODE.PUSH
      if entry.rate is None:
        logging.warning(
            'Refill rate must be specified for push-based queue. '
            'Please check queue.yaml file.')
    max_rate = entry.rate

    if entry.acl is not None:
      acl = taskqueue_service_pb.TaskQueueAcl()
      for acl_entry in entry.acl:
        acl.add_user_email(acl_entry.user_email)
    else:
      acl = None

    if self._queues.get(queue_name) is None:
      # Missing or tombstoned: build a fresh queue from the yaml entry.
      self._ConstructQueue(queue_name, bucket_capacity=bucket_size,
                           user_specified_rate=max_rate, queue_mode=mode,
                           acl=acl, retry_parameters=retry_parameters,
                           target=entry.target)
    else:
      # Existing queue: refresh its settings in place so its tasks are kept.
      queue = self._queues[queue_name]
      # _Queue stores the bucket size as bucket_capacity; assigning to any
      # other attribute name would leave the old capacity in effect.
      queue.bucket_capacity = bucket_size
      queue.user_specified_rate = max_rate
      queue.acl = acl
      queue.queue_mode = mode
      queue.retry_parameters = retry_parameters
      # NOTE(review): entry.target is applied when a queue is constructed but
      # not refreshed here - confirm whether that is intended.
      if mode == QUEUE_MODE.PUSH:
        eta = queue.Oldest()
        if eta:
          self._update_newest_eta(_UsecToSec(eta))

  if DEFAULT_QUEUE_NAME not in self._queues:
    self._ConstructAutomaticQueue(DEFAULT_QUEUE_NAME)
  # The default queue is kept even when queue.yaml does not list it.
  new_queues.add(DEFAULT_QUEUE_NAME)

  if not self._all_queues_valid:
    for queue_name in old_queues - new_queues:
      del self._queues[queue_name]
def _ValidateQueueName(self, queue_name):
  """Checks that a queue exists, creating automatic queues on first access.

  An unknown queue is created when its name is listed in AUTOMATIC_QUEUES or
  when this group was created with _all_queues_valid.

  Args:
    queue_name: The name of the queue to check.

  Returns:
    TaskQueueServiceError.OK if the queue is usable. Otherwise
    INVALID_QUEUE_NAME for an empty name, UNKNOWN_QUEUE for a queue that
    does not exist and cannot be created automatically, or TOMBSTONED_QUEUE
    for a queue that has been deleted.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  if not queue_name:
    return errors.INVALID_QUEUE_NAME

  if queue_name in self._queues:
    if self._queues[queue_name] is None:
      return errors.TOMBSTONED_QUEUE
    return errors.OK

  if not (queue_name in AUTOMATIC_QUEUES or self._all_queues_valid):
    return errors.UNKNOWN_QUEUE
  self._ConstructAutomaticQueue(queue_name)
  return errors.OK
def _CheckQueueForRpc(self, queue_name):
  """Reloads queue.yaml and requires that queue_name is a usable queue.

  Automatic queues are created on first access, as in _ValidateQueueName.

  Args:
    queue_name: The name of the queue to check.

  Raises:
    ApplicationError: If the queue name is invalid, tombstoned or does not
      exist.
  """
  self._ReloadQueuesFromYaml()
  status = self._ValidateQueueName(queue_name)
  if status == taskqueue_service_pb.TaskQueueServiceError.OK:
    return
  raise apiproxy_errors.ApplicationError(status)
def _ChooseTaskName(self):
"""Returns a string containing a unique task name."""
self._next_task_id += 1
return 'task%d' % (self._next_task_id - 1)
def _VerifyTaskQueueAddRequest(self, request, now):
  """Checks that a TaskQueueAddRequest is valid.

  The checks run in this order: the eta must not be negative or more than
  MAX_ETA in the future, the queue must be usable, cron timetables are only
  accepted when this group has an app id, and the request must fit the size
  limit for its mode.

  Args:
    request: The taskqueue_service_pb.TaskQueueAddRequest to validate.
    now: A datetime.datetime object containing the current time in UTC.

  Returns:
    A taskqueue_service_pb.TaskQueueServiceError indicating the first problem
    found with the request, or TaskQueueServiceError.OK if it is valid.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError

  eta_usec = request.eta_usec()
  if eta_usec < 0:
    return errors.INVALID_ETA
  eta = datetime.datetime.utcfromtimestamp(_UsecToSec(eta_usec))
  if eta > now + MAX_ETA:
    return errors.INVALID_ETA

  queue_status = self._ValidateQueueName(request.queue_name())
  if queue_status != errors.OK:
    return queue_status

  if request.has_crontimetable() and self._app_id is None:
    return errors.PERMISSION_DENIED

  size_limit = (MAX_PULL_TASK_SIZE_BYTES
                if request.mode() == QUEUE_MODE.PULL
                else MAX_PUSH_TASK_SIZE_BYTES)
  if request.ByteSize() > size_limit:
    return errors.TASK_TOO_LARGE

  return errors.OK
def BulkAdd_Rpc(self, request, response):
  """Add many tasks to a queue using a single request.

  Every task is validated first. If any task is invalid, nothing is added:
  the invalid tasks carry their error and the valid ones are marked SKIPPED.
  Otherwise the tasks are added, transactionally if the first task has a
  transaction, and unnamed tasks report the name that was chosen for them.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See
      taskqueue_service.proto.
    response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See
      taskqueue_service.proto.

  Raises:
    ApplicationError: The first task does not name a queue (UNKNOWN_QUEUE).
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  self._ReloadQueuesFromYaml()

  first_request = request.add_request(0)
  if not first_request.queue_name():
    raise apiproxy_errors.ApplicationError(errors.UNKNOWN_QUEUE)

  now = datetime.datetime.utcfromtimestamp(time.time())
  # id()s of the results whose task was given a generated name.
  auto_named_results = set()
  all_valid = True

  for add_request in request.add_request_list():
    task_result = response.add_taskresult()
    status = self._VerifyTaskQueueAddRequest(add_request, now)
    if status != errors.OK:
      all_valid = False
      task_result.set_result(status)
      continue
    if not add_request.task_name():
      add_request.set_task_name(self._ChooseTaskName())
      auto_named_results.add(id(task_result))
    # Provisional value; replaced below once the task has been added.
    task_result.set_result(errors.SKIPPED)

  if not all_valid:
    return

  if first_request.has_transaction():
    self._TransactionalBulkAdd(request)
  else:
    self._NonTransactionalBulkAdd(request, response, now)

  result_pairs = zip(request.add_request_list(), response.taskresult_list())
  for add_request, task_result in result_pairs:
    if task_result.result() == errors.SKIPPED:
      task_result.set_result(errors.OK)
    if id(task_result) in auto_named_results:
      task_result.set_chosen_task_name(add_request.task_name())
def _TransactionalBulkAdd(self, request):
"""Uses datastore.AddActions to associate tasks with a transaction.
Args:
request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
tasks to add. N.B. all tasks in the request have been validated and
assigned unique names.
"""
try:
apiproxy_stub_map.MakeSyncCall(
'datastore_v3', 'AddActions', request, api_base_pb.VoidProto())
except apiproxy_errors.ApplicationError, e:
raise apiproxy_errors.ApplicationError(
e.application_error +
taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR,
e.error_detail)
def _NonTransactionalBulkAdd(self, request, response, now):
"""Adds tasks to the appropriate _Queue instance.
Args:
request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
tasks to add. N.B. all tasks in the request have been validated and
those with empty names have been assigned unique names.
response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
with the results. N.B. the chosen_task_name field in the response will
not be filled-in.
now: A datetime.datetime object containing the current time in UTC.
"""
queue_mode = request.add_request(0).mode()
queue_name = request.add_request(0).queue_name()
store = self._queues[queue_name]
if store.queue_mode != queue_mode:
raise apiproxy_errors.ApplicationError(
taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_MODE)
for add_request, task_result in zip(request.add_request_list(),
response.taskresult_list()):
try:
store.Add(add_request, now)
except apiproxy_errors.ApplicationError, e:
task_result.set_result(e.application_error)
else:
task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK)
if (store.queue_mode == QUEUE_MODE.PUSH and
store.Oldest() == add_request.eta_usec()):
self._update_newest_eta(_UsecToSec(add_request.eta_usec()))
def UpdateQueue_Rpc(self, request, response):
  """Implementation of the UpdateQueue RPC.

  A queue that does not exist yet is created before the update is applied.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
    response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.

  Raises:
    ApplicationError: The queue name is invalid or the queue is tombstoned.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  queue_name = request.queue_name()

  # Kept in its own variable so the response proto is not overwritten.
  status = self._ValidateQueueName(queue_name)
  is_unknown_queue = status == errors.UNKNOWN_QUEUE
  if status != errors.OK and not is_unknown_queue:
    raise apiproxy_errors.ApplicationError(status)

  if is_unknown_queue:
    # Built through _ConstructQueue so that the new queue gets this group's
    # _testing_validate_state setting, like every other queue.
    self._ConstructQueue(queue_name)
    if self._app_id is not None:
      # This group represents another application. Populate is defined
      # outside this view; presumably it seeds the queue with sample tasks.
      self._queues[queue_name].Populate(random.randint(10, 100))

  self._queues[queue_name].UpdateQueue_Rpc(request, response)
def FetchQueues_Rpc(self, request, response):
"""Implementation of the FetchQueues RPC.
Args:
request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
"""
self._ReloadQueuesFromYaml()
for queue_name in sorted(self._queues):
if response.queue_size() > request.max_rows():
break
if self._queues[queue_name] is None:
continue
self._queues[queue_name].FetchQueues_Rpc(request, response)
def FetchQueueStats_Rpc(self, request, response):
"""Implementation of the FetchQueueStats rpc which returns 'random' data.
This implementation loads some stats from the task store, the rest are
random numbers.
Args:
request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
"""
for queue_name in request.queue_name_list():
stats = response.add_queuestats()
if queue_name not in self._queues:
stats.set_num_tasks(0)
stats.set_oldest_eta_usec(-1)
continue
store = self._queues[queue_name]
stats.set_num_tasks(store.Count())
if stats.num_tasks() == 0:
stats.set_oldest_eta_usec(-1)
else:
stats.set_oldest_eta_usec(store.Oldest())
if random.randint(0, 9) > 0:
scanner_info = stats.mutable_scanner_info()
scanner_info.set_executed_last_minute(random.randint(0, 10))
scanner_info.set_executed_last_hour(scanner_info.executed_last_minute()
+ random.randint(0, 100))
scanner_info.set_sampling_duration_seconds(random.random() * 10000.0)
scanner_info.set_requests_in_flight(random.randint(0, 10))
def QueryTasks_Rpc(self, request, response):
  """Implementation of the QueryTasks RPC.

  Validates the queue and forwards the call to it.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name].QueryTasks_Rpc(request, response)
def FetchTask_Rpc(self, request, response):
  """Implementation of the FetchTask RPC.

  Validates the queue and forwards the call to it.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchTaskRequest.
    response: A taskqueue_service_pb.TaskQueueFetchTaskResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  # _CheckQueueForRpc reloads queue.yaml itself, so no separate reload is
  # needed before it.
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name].FetchTask_Rpc(request, response)
def Delete_Rpc(self, request, response):
  """Implementation of the Delete RPC.

  Deletes tasks from the task store. When the queue is unknown or tombstoned,
  every requested task name gets the corresponding error as its result.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteResponse.
  """
  self._ReloadQueuesFromYaml()

  errors = taskqueue_service_pb.TaskQueueServiceError
  queue_name = request.queue_name()
  if queue_name not in self._queues:
    failure = errors.UNKNOWN_QUEUE
  elif self._queues[queue_name] is None:
    failure = errors.TOMBSTONED_QUEUE
  else:
    self._queues[queue_name].Delete_Rpc(request, response)
    return

  # One result per requested task name, all carrying the same error.
  for _ in request.task_name_list():
    response.add_result(failure)
def DeleteQueue_Rpc(self, request, response):
  """Implementation of the DeleteQueue RPC.

  Tombstones the queue by replacing its entry in the queue map with None.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name] = None
def PauseQueue_Rpc(self, request, response):
  """Implementation of the PauseQueue RPC.

  Sets the queue's paused flag to the value given in the request.

  Args:
    request: A taskqueue_service_pb.TaskQueuePauseQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePauseQueueResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  queue = self._queues[queue_name]
  queue.paused = request.pause()
def PurgeQueue_Rpc(self, request, response):
  """Implementation of the PurgeQueue RPC.

  Validates the queue and calls its PurgeQueue method.

  Args:
    request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name].PurgeQueue()
def QueryAndOwnTasks_Rpc(self, request, response):
  """Implementation of the QueryAndOwnTasks RPC.

  Validates the queue and forwards the call to it.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name].QueryAndOwnTasks_Rpc(request, response)
def ModifyTaskLease_Rpc(self, request, response):
  """Implementation of the ModifyTaskLease RPC.

  Validates the queue and forwards the call to it.

  Args:
    request: A taskqueue_service_pb.TaskQueueModifyTaskLeaseRequest.
    response: A taskqueue_service_pb.TaskQueueModifyTaskLeaseResponse.

  Raises:
    ApplicationError: The queue name is invalid, tombstoned or unknown.
  """
  queue_name = request.queue_name()
  self._CheckQueueForRpc(queue_name)
  self._queues[queue_name].ModifyTaskLease_Rpc(request, response)
class Retry(object):
  """Task retry calculator class.

  Determines if and when a task should next be run, based on the retry
  parameters of the task, of its queue, or on the defaults, in that order of
  preference.
  """

  # Used when neither the task nor the queue specifies retry parameters.
  _default_params = taskqueue_service_pb.TaskQueueRetryParameters()

  def __init__(self, task, queue):
    """Constructor.

    Args:
      task: A taskqueue_service_pb.TaskQueueQueryTasksResponse_Task instance.
        May be None.
      queue: A _Queue instance. May be None.
    """
    task_has_params = task is not None and task.has_retry_parameters()
    queue_has_params = (queue is not None and
                        queue.retry_parameters is not None)
    if task_has_params:
      self._params = task.retry_parameters()
    elif queue_has_params:
      self._params = queue.retry_parameters
    else:
      self._params = self._default_params

  def CanRetry(self, retry_count, age_usec):
    """Computes whether a task can be retried.

    A limit that is not set never blocks a retry. When both the retry limit
    and the age limit are set, the task stays eligible until both have been
    exceeded.

    Args:
      retry_count: An integer specifying which retry this is.
      age_usec: An integer specifying the microseconds since the first try.

    Returns:
      True if a task is eligible for retrying.
    """
    params = self._params
    has_retry_limit = params.has_retry_limit()
    has_age_limit = params.has_age_limit_sec()

    if not (has_retry_limit or has_age_limit):
      return True
    if has_retry_limit and params.retry_limit() >= retry_count:
      return True
    if has_age_limit and params.age_limit_sec() >= _UsecToSec(age_usec):
      return True
    return False

  def CalculateBackoffUsec(self, retry_count):
    """Calculates time before the specified retry.

    The minimum backoff is doubled once per retry up to max_doublings times,
    grows linearly after that, and is capped at the maximum backoff.

    Args:
      retry_count: An integer specifying which retry this is.

    Returns:
      The number of microseconds before a task should be retried.
    """
    params = self._params
    doublings = min(retry_count - 1, params.max_doublings())
    linear_steps = retry_count - doublings
    floor_usec = _SecToUsec(params.min_backoff_sec())
    ceiling_usec = _SecToUsec(params.max_backoff_sec())

    backoff_usec = floor_usec
    if doublings > 0:
      # The exponent is capped at 1023.
      backoff_usec *= 2 ** min(1023, doublings)
    if linear_steps > 1:
      backoff_usec *= linear_steps

    return int(min(ceiling_usec, backoff_usec))
class _Queue(object):
"""A Taskqueue Queue.
This class contains all of the properties of a queue and a sorted list of
tasks.
"""
def __init__(self, queue_name, bucket_refill_per_second=DEFAULT_RATE_FLOAT,
             bucket_capacity=DEFAULT_BUCKET_SIZE,
             user_specified_rate=DEFAULT_RATE, retry_parameters=None,
             max_concurrent_requests=None, paused=False,
             queue_mode=QUEUE_MODE.PUSH, acl=None,
             _testing_validate_state=None, target=None):
  """Constructor.

  Every argument is stored on the instance under the same name.

  Args:
    queue_name: The name of this queue.
    bucket_refill_per_second: The bucket refill rate as a number.
    bucket_capacity: The capacity of the bucket.
    user_specified_rate: The rate as a string such as '5.00/s', or None.
    retry_parameters: The queue's retry parameters, or None.
    max_concurrent_requests: The concurrent request limit, or None.
    paused: Whether the queue is paused.
    queue_mode: A QUEUE_MODE value, PUSH by default.
    acl: The queue's acl, or None.
    _testing_validate_state: When true, VerifyIndexes runs after each
      operation decorated with _WithLock.
    target: The queue's target, or None.
  """
  # Queue configuration.
  self.queue_name = queue_name
  self.queue_mode = queue_mode
  self.paused = paused
  self.acl = acl
  self.target = target
  self.bucket_capacity = bucket_capacity
  self.bucket_refill_per_second = bucket_refill_per_second
  self.user_specified_rate = user_specified_rate
  self.max_concurrent_requests = max_concurrent_requests
  self.retry_parameters = retry_parameters

  self._testing_validate_state = _testing_validate_state

  # Task names consulted by FetchTask_Rpc to report TOMBSTONED_TASK.
  self.task_name_archive = set()

  # Task indexes, each kept in ascending order.
  self._sorted_by_name = []
  self._sorted_by_eta = []
  self._sorted_by_tag = []

  # Held by every method decorated with _WithLock.
  self._lock = threading.Lock()
def VerifyIndexes(self):
  """Ensures that all three indexes are in a valid state.

  Checks that each index is sorted, that every entry's key fields match its
  task, that no task name appears twice in an index, that the name and eta
  indexes hold the same tasks, and that the tag index holds exactly the
  tasks that have a tag.

  This method is used by internal tests and should not need to be called in
  any other circumstances.

  Raises:
    AssertionError: if the indexes are not in a valid state.
  """
  for index in (self._sorted_by_name, self._sorted_by_eta,
                self._sorted_by_tag):
    assert self._IsInOrder(index)

  # Name index entries are (name, task).
  names_seen = set()
  tagged_names = set()
  for name, task in self._sorted_by_name:
    assert name == task.task_name()
    assert name not in names_seen
    names_seen.add(name)
    if task.has_tag():
      tagged_names.add(name)

  # Eta index entries are (eta, name, task).
  eta_names = set()
  for eta, name, task in self._sorted_by_eta:
    assert name == task.task_name()
    assert eta == task.eta_usec()
    assert name not in eta_names
    eta_names.add(name)
  assert eta_names == names_seen

  # Tag index entries are (tag, eta, name, task).
  tag_names = set()
  for tag, eta, name, task in self._sorted_by_tag:
    assert name == task.task_name()
    assert eta == task.eta_usec()
    assert task.has_tag() and task.tag()
    assert tag == task.tag()
    assert name not in tag_names
    tag_names.add(name)
  assert tag_names == tagged_names
@staticmethod
def _IsInOrder(l):
  """Tells whether a list is already sorted in ascending order.

  Args:
    l: The list to check.

  Returns:
    True if the list equals its sorted copy, False otherwise.
  """
  return l == sorted(l)
def _WithLock(f):
  """Decorator that runs the decorated method while holding self._lock.

  When self._testing_validate_state is set, self.VerifyIndexes() is also run
  after the method returns, before the lock is released.

  Args:
    f: The function to be delegated to. Must be a member function (take self
      as the first parameter).

  Returns:
    A wrapper around f that returns the result of f and keeps f's name and
    docstring.
  """
  # Imported locally because the module does not import functools itself.
  import functools

  @functools.wraps(f)
  def _Inner(self, *args, **kwargs):
    with self._lock:
      ret = f(self, *args, **kwargs)
      if self._testing_validate_state:
        self.VerifyIndexes()
    return ret
  return _Inner
@_WithLock
def UpdateQueue_Rpc(self, request, response):
  """Implementation of the UpdateQueue RPC.

  Overwrites this queue's settings with the values from the request.
  Optional settings that the request does not carry are reset to None.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
    response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
  """
  assert request.queue_name() == self.queue_name

  self.bucket_refill_per_second = request.bucket_refill_per_second()
  self.bucket_capacity = request.bucket_capacity()
  self.user_specified_rate = (
      request.user_specified_rate()
      if request.has_user_specified_rate() else None)
  self.retry_parameters = (
      request.retry_parameters()
      if request.has_retry_parameters() else None)
  self.max_concurrent_requests = (
      request.max_concurrent_requests()
      if request.has_max_concurrent_requests() else None)
  self.queue_mode = request.mode()
  self.acl = request.acl() if request.has_acl() else None
@_WithLock
def FetchQueues_Rpc(self, request, response):
  """Fills out a queue message on the provided TaskQueueFetchQueuesResponse.

  Optional settings that are None on this queue are left unset in the
  message.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
    response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
  """
  response_queue = response.add_queue()

  response_queue.set_queue_name(self.queue_name)
  response_queue.set_bucket_refill_per_second(
      self.bucket_refill_per_second)
  response_queue.set_bucket_capacity(self.bucket_capacity)
  if self.user_specified_rate is not None:
    response_queue.set_user_specified_rate(self.user_specified_rate)
  if self.max_concurrent_requests is not None:
    response_queue.set_max_concurrent_requests(
        self.max_concurrent_requests)
  if self.retry_parameters is not None:
    # Written through the mutable_* accessor, like acl below. The plain
    # getter is presumed not to mark the field as set, which would drop the
    # retry parameters from the response - verify against the generated
    # taskqueue_service_pb code.
    response_queue.mutable_retry_parameters().CopyFrom(self.retry_parameters)
  response_queue.set_paused(self.paused)
  if self.queue_mode is not None:
    response_queue.set_mode(self.queue_mode)
  if self.acl is not None:
    response_queue.mutable_acl().CopyFrom(self.acl)
@_WithLock
def QueryTasks_Rpc(self, request, response):
  """Implementation of the QueryTasks RPC.

  Looks up at most request.max_rows() tasks starting from the requested task
  name, and from the requested eta when one is given, and merges them into
  the response. Requests with a start tag are not supported.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.
  """
  assert not request.has_start_tag()

  lookup_kwargs = {'name': request.start_task_name()}
  if request.has_start_eta_usec():
    lookup_kwargs['eta'] = request.start_eta_usec()

  tasks = self._LookupNoAcquireLock(request.max_rows(), **lookup_kwargs)
  for task in tasks:
    response.add_task().MergeFrom(task)
@_WithLock
def FetchTask_Rpc(self, request, response):
  """Implementation of the FetchTask RPC.

  Copies the task with the requested name into the response.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchTaskRequest.
    response: A taskqueue_service_pb.TaskQueueFetchTaskResponse.

  Raises:
    ApplicationError: The task is not in the queue. The error is
      TOMBSTONED_TASK when the name is in task_name_archive and UNKNOWN_TASK
      otherwise.
  """
  task_name = request.task_name()
  pos = self._LocateTaskByName(task_name)

  if pos is not None:
    task = self._sorted_by_name[pos][1]
    response.mutable_task().add_task().CopyFrom(task)
    return

  errors = taskqueue_service_pb.TaskQueueServiceError
  if task_name in self.task_name_archive:
    raise apiproxy_errors.ApplicationError(errors.TOMBSTONED_TASK)
  raise apiproxy_errors.ApplicationError(errors.UNKNOWN_TASK)
@_WithLock
def Delete_Rpc(self, request, response):
  """Implementation of the Delete RPC.

  Deletes each task named in the request and appends one result code per
  task name to the response. When the request has an app_id, each deletion
  is replaced by a TRANSIENT_ERROR result with a probability of about 1/20.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteResponse.
  """
  transient_error = taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR
  for name in request.task_name_list():
    # random.random() is only consulted for requests that carry an app_id.
    simulate_failure = request.has_app_id() and random.random() <= 0.05
    if simulate_failure:
      outcome = transient_error
    else:
      outcome = self._DeleteNoAcquireLock(name)
    response.add_result(outcome)
def _QueryAndOwnTasksGetTaskList(self, max_rows, group_by_tag, now_eta_usec,
                                 tag=None):
  """Select the tasks that a QueryAndOwnTasks call should lease.

  The caller must hold self._lock.

  Args:
    max_rows: The maximum number of rows each index scan may return.
    group_by_tag: If true, only tasks sharing a single tag are returned.
    now_eta_usec: The eta, in microseconds, used as the end of the scans.
    tag: Optional tag to restrict the result to. Only honoured when
      group_by_tag is true and the tag is non-empty.

  Returns:
    A list of tasks. Without group_by_tag this is a plain scan of the eta
    index. With group_by_tag and a non-empty tag it is a scan of the tag
    index for that tag. With group_by_tag and no tag, the first task of the
    eta scan decides: if it has a tag, the tag index is scanned for that tag;
    otherwise the untagged tasks of the eta scan are returned.
  """
  assert self._lock.locked()

  def _ScanTag(wanted_tag):
    return self._IndexScan(self._sorted_by_tag,
                           start_key=(wanted_tag, None, None,),
                           end_key=(wanted_tag, now_eta_usec, None,),
                           max_rows=max_rows)

  if group_by_tag and tag:
    return _ScanTag(tag)

  due_tasks = self._IndexScan(self._sorted_by_eta,
                              start_key=(None, None,),
                              end_key=(now_eta_usec, None,),
                              max_rows=max_rows)
  if not group_by_tag:
    return due_tasks
  if not due_tasks:
    return []

  first_task = due_tasks[0]
  if first_task.has_tag():
    # Scan the tag index directly rather than re-entering this method: a task
    # whose tag is the empty string would fail the `group_by_tag and tag`
    # test above again and recurse without end.
    return _ScanTag(first_task.tag())
  return [task for task in due_tasks if not task.has_tag()]
@_WithLock
def QueryAndOwnTasks_Rpc(self, request, response):
  """Implementation of the QueryAndOwnTasks RPC.

  Selects tasks with _QueryAndOwnTasksGetTaskList, postpones the eta of each
  one by request.lease_seconds() from now and reports it in the response.
  Tasks for which Retry.CanRetry rejects another attempt are not reported;
  they are deleted after the loop.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_QUEUE_MODE if this is not a
      pull queue; INVALID_REQUEST if lease_seconds is negative, max_tasks is
      not positive, or a tag is given without group_by_tag.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  if self.queue_mode != QUEUE_MODE.PULL:
    raise apiproxy_errors.ApplicationError(errors.INVALID_QUEUE_MODE)

  lease_seconds = request.lease_seconds()
  if lease_seconds < 0:
    raise apiproxy_errors.ApplicationError(errors.INVALID_REQUEST)

  max_tasks = request.max_tasks()
  if max_tasks <= 0:
    raise apiproxy_errors.ApplicationError(errors.INVALID_REQUEST)

  if request.has_tag() and not request.group_by_tag():
    raise apiproxy_errors.ApplicationError(
        errors.INVALID_REQUEST,
        'Tag specified, but group_by_tag was not.')

  now_eta_usec = _SecToUsec(time.time())
  candidates = self._QueryAndOwnTasksGetTaskList(
      max_tasks, request.group_by_tag(), now_eta_usec, request.tag())

  exhausted_tasks = []
  for leased in candidates:
    if Retry(leased, self).CanRetry(leased.retry_count() + 1, 0):
      self._PostponeTaskNoAcquireLock(
          leased, now_eta_usec + _SecToUsec(lease_seconds))
      entry = response.add_task()
      entry.set_task_name(leased.task_name())
      entry.set_eta_usec(leased.eta_usec())
      entry.set_retry_count(leased.retry_count())
      if leased.has_tag():
        entry.set_tag(leased.tag())
      entry.set_body(leased.body())
    else:
      logging.warning(
          'Task %s in queue %s cannot be leased again after %d leases.',
          leased.task_name(), self.queue_name, leased.retry_count())
      exhausted_tasks.append(leased)

  # Deletion is deferred until every candidate has been examined.
  for dead_task in exhausted_tasks:
    self._DeleteNoAcquireLock(dead_task.task_name())
@_WithLock
def ModifyTaskLease_Rpc(self, request, response):
  """Implementation of the ModifyTaskLease RPC.

  Moves the eta of the named task to now + request.lease_seconds() without
  increasing its retry count, and reports the new eta in the response.

  Args:
    request: The ModifyTaskLease request; its task_name(), eta_usec() and
      lease_seconds() are read.
    response: The ModifyTaskLease response; its updated_eta_usec is set.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_QUEUE_MODE if this is not a
      pull queue; QUEUE_PAUSED if the queue is paused; INVALID_REQUEST if
      lease_seconds is negative; TOMBSTONED_TASK or UNKNOWN_TASK if the task
      is not stored; TASK_LEASE_EXPIRED if the task's eta differs from
      request.eta_usec() or already lies in the past.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  if self.queue_mode != QUEUE_MODE.PULL:
    raise apiproxy_errors.ApplicationError(errors.INVALID_QUEUE_MODE)
  if self.paused:
    raise apiproxy_errors.ApplicationError(errors.QUEUE_PAUSED)

  lease_seconds = request.lease_seconds()
  if lease_seconds < 0:
    raise apiproxy_errors.ApplicationError(errors.INVALID_REQUEST)

  index = self._LocateTaskByName(request.task_name())
  if index is None:
    if request.task_name() in self.task_name_archive:
      missing_reason = errors.TOMBSTONED_TASK
    else:
      missing_reason = errors.UNKNOWN_TASK
    raise apiproxy_errors.ApplicationError(missing_reason)

  task = self._sorted_by_name[index][1]
  # The caller has to present the eta it was handed when it leased the task.
  if task.eta_usec() != request.eta_usec():
    raise apiproxy_errors.ApplicationError(errors.TASK_LEASE_EXPIRED)

  now_usec = _SecToUsec(time.time())
  if task.eta_usec() < now_usec:
    raise apiproxy_errors.ApplicationError(errors.TASK_LEASE_EXPIRED)

  future_eta_usec = now_usec + _SecToUsec(lease_seconds)
  self._PostponeTaskNoAcquireLock(
      task, future_eta_usec, increase_retries=False)
  response.set_updated_eta_usec(future_eta_usec)
@_WithLock
def IncRetryCount(self, task_name):
  """Increment the retry count of a task by 1.

  The task must be stored in this queue; this is checked with an assertion.

  Args:
    task_name: The name of the task to update.
  """
  index = self._LocateTaskByName(task_name)
  assert index is not None, (
      'Task does not exist when trying to increase retry count.')
  _, stored_task = self._sorted_by_name[index]
  self._IncRetryCount(stored_task)
def _IncRetryCount(self, task):
  """Add one to both the retry count and the execution count of task.

  The caller must hold self._lock.

  Args:
    task: The task object to update in place.
  """
  assert self._lock.locked()
  task.set_retry_count(task.retry_count() + 1)
  task.set_execution_count(task.execution_count() + 1)
@_WithLock
def GetTasksAsDicts(self):
  """Gets all of the tasks in this queue.

  Returns:
    A list of dictionaries, one per task and in eta-index order, each built
    by QueryTasksResponseToDict. E.g.
    [{'name': 'task-123',
      'queue_name': 'default',
      'url': '/update',
      'method': 'GET',
      'eta': '2009/02/02 05:37:42',
      'eta_delta': '0:00:06.342511 ago',
      'body': '',
      'headers': [('user-header', 'some-value'),
                  ('X-AppEngine-QueueName', 'update-queue'),
                  ('X-AppEngine-TaskName', 'task-123'),
                  ('X-AppEngine-TaskExecutionCount', '1'),
                  ('X-AppEngine-TaskRetryCount', '1'),
                  ('X-AppEngine-TaskETA', '1234567890.123456'),
                  ('X-AppEngine-Development-Payload', '1'),
                  ('X-AppEngine-TaskPreviousResponse', '300'),
                  ('Content-Length', 0),
                  ('Content-Type', 'application/octet-stream')]}, ...]

  Raises:
    ValueError: A task request contains an unknown HTTP method type.
  """
  # A single timestamp is shared by every task of this call.
  now = datetime.datetime.utcnow()
  return [QueryTasksResponseToDict(self.queue_name, task_response, now)
          for _, _, task_response in self._sorted_by_eta]
@_WithLock
def GetTaskAsDict(self, task_name):
  """Gets a specific task from this queue.

  Args:
    task_name: The name of the task to look up.

  Returns:
    None if no task with exactly this name is stored. Otherwise the
    dictionary QueryTasksResponseToDict builds for the task, e.g.
    {'name': 'task-123',
     'queue_name': 'default',
     'url': '/update',
     'method': 'GET',
     'eta': '2009/02/02 05:37:42',
     'eta_delta': '0:00:06.342511 ago',
     'body': '',
     'headers': [('user-header', 'some-value'),
                 ('X-AppEngine-QueueName', 'update-queue'),
                 ('X-AppEngine-TaskName', 'task-123'),
                 ('X-AppEngine-TaskExecutionCount', '1'),
                 ('X-AppEngine-TaskRetryCount', '1'),
                 ('X-AppEngine-TaskETA', '1234567890.123456'),
                 ('X-AppEngine-Development-Payload', '1'),
                 ('X-AppEngine-TaskPreviousResponse', '300'),
                 ('Content-Length', 0),
                 ('Content-Type', 'application/octet-stream')]}

  Raises:
    ValueError: A task request contains an unknown HTTP method type.
  """
  matches = self._LookupNoAcquireLock(maximum=1, name=task_name)
  if matches:
    (candidate,) = matches
    # The lookup starts at task_name, so the row it returns may belong to a
    # later name; only an exact match counts.
    if candidate.task_name() == task_name:
      now = datetime.datetime.utcnow()
      return QueryTasksResponseToDict(self.queue_name, candidate, now)
  return None
@_WithLock
def PurgeQueue(self):
  """Removes all content from the queue.

  Replaces the name, eta and tag indexes with fresh empty lists.
  task_name_archive is not touched here.
  """
  self._sorted_by_name, self._sorted_by_eta, self._sorted_by_tag = [], [], []
@_WithLock
def _GetTasks(self):
  """Helper method for tests returning all tasks sorted by eta.

  Lock-acquiring wrapper around _GetTasksNoAcquireLock.

  Returns:
    A list of taskqueue_service_pb.TaskQueueQueryTasksResponse_Task objects
    sorted by eta.
  """
  tasks_by_eta = self._GetTasksNoAcquireLock()
  return tasks_by_eta
def _GetTasksNoAcquireLock(self):
  """Helper method for tests returning all tasks sorted by eta.

  The caller must hold self._lock.

  Returns:
    A new list holding the task of every row of the eta index, in index
    order.
  """
  assert self._lock.locked()
  return [task for _, _, task in self._sorted_by_eta]
def _InsertTask(self, task):
  """Insert a task into the store, keeping every index sorted.

  The task is added to the eta index, to the tag index when it has a tag,
  and to the name index; its name is also recorded in task_name_archive.
  The caller must hold self._lock.

  Args:
    task: the new task.
  """
  assert self._lock.locked()
  eta_usec = task.eta_usec()
  task_name = task.task_name()

  placements = [(self._sorted_by_eta, (eta_usec, task_name, task))]
  if task.has_tag():
    placements.append(
        (self._sorted_by_tag, (task.tag(), eta_usec, task_name, task)))
  placements.append((self._sorted_by_name, (task_name, task)))

  for index, row in placements:
    bisect.insort_left(index, row)
  self.task_name_archive.add(task_name)
@_WithLock
def RunTaskNow(self, task):
  """Change the eta of a task to now.

  The task is re-indexed with an eta of 0 and its retry count is left alone.

  Args:
    task: The TaskQueueQueryTasksResponse_Task run now. This must be
      stored in this queue (otherwise an AssertionError is raised).
  """
  self._PostponeTaskNoAcquireLock(task, new_eta_usec=0,
                                  increase_retries=False)
@_WithLock
def PostponeTask(self, task, new_eta_usec):
  """Postpone the task to a future time and increment the retry count.

  Args:
    task: The TaskQueueQueryTasksResponse_Task to postpone. This must be
      stored in this queue (otherwise an AssertionError is raised).
    new_eta_usec: The new eta to set on the task. This must be greater than
      the current eta on the task (checked with an assertion).
  """
  assert task.eta_usec() < new_eta_usec
  self._PostponeTaskNoAcquireLock(task, new_eta_usec)
def _PostponeTaskNoAcquireLock(self, task, new_eta_usec,
                               increase_retries=True):
  """Move a stored task to a new eta, re-sorting the eta and tag indexes.

  The caller must hold self._lock.

  Args:
    task: The task to move. It must be stored in this queue.
    new_eta_usec: The eta to set on the task.
    increase_retries: If true, the task's retry and execution counts are
      incremented before it is moved.

  Raises:
    AssertionError: The task could not be found in the eta index or, when it
      has a tag, in the tag index.
  """
  assert self._lock.locked()
  if increase_retries:
    self._IncRetryCount(task)
  name = task.task_name()
  eta = task.eta_usec()

  # The removals mutate the indexes, so they must not be wrapped in `assert`
  # statements: those are stripped under `python -O`, which would skip the
  # removal and leave a stale duplicate row once the task is re-inserted.
  if not self._RemoveTaskFromIndex(
      self._sorted_by_eta, (eta, name, None), task):
    raise AssertionError(
        'Task %s was not found in the eta index.' % name)
  if task.has_tag():
    if not self._RemoveTaskFromIndex(
        self._sorted_by_tag, (task.tag(), eta, name, None), task):
      raise AssertionError(
          'Task %s was not found in the tag index.' % name)
  self._PostponeTaskInsertOnly(task, new_eta_usec)
def _PostponeTaskInsertOnly(self, task, new_eta_usec):
  """Set a new eta on task and insert it into the eta and tag indexes.

  Nothing is removed here; any rows for the task's previous eta have to be
  removed by the caller. The caller must hold self._lock.

  Args:
    task: The task to update and index.
    new_eta_usec: The eta to store on the task and in the index rows.
  """
  assert self._lock.locked()
  task.set_eta_usec(new_eta_usec)
  task_name = task.task_name()
  bisect.insort_left(self._sorted_by_eta, (new_eta_usec, task_name, task))
  if not task.has_tag():
    return
  bisect.insort_left(self._sorted_by_tag,
                     (task.tag(), new_eta_usec, task_name, task))
@_WithLock
def Lookup(self, maximum, name=None, eta=None):
  """Lookup a number of sorted tasks from the store.

  If 'eta' is specified, the tasks are looked up in a list sorted by 'eta',
  then 'name'. Otherwise they are sorted by 'name'. Both orders are needed
  because tasks can share an eta: with 20 tasks on the same eta, paging by
  eta alone could never get past them, since the 'next eta' would yield the
  first one again. Names are unique, though.

  Args:
    maximum: the maximum number of tasks to return.
    name: a task name to start with.
    eta: an eta to start with.

  Returns:
    A list of up to 'maximum' tasks.

  Raises:
    ValueError: if the task store gets corrupted.
  """
  return self._LookupNoAcquireLock(maximum, name=name, eta=eta)
def _IndexScan(self, index, start_key, end_key=None, max_rows=None):
  """Return the result of a 'scan' over the given index.

  The scan is inclusive of start_key and exclusive of end_key. It returns at
  most max_rows from the index. The caller must hold self._lock.

  Args:
    index: One of the index lists, eg self._sorted_by_tag.
    start_key: The key to start at.
    end_key: Optional end key.
    max_rows: The maximum number of rows to return.

  Returns:
    a list of up to 'max_rows' TaskQueueQueryTasksResponse_Task instances from
    the given index, in sorted order.
  """
  assert self._lock.locked()
  first = bisect.bisect_left(index, start_key)
  stop = len(index)
  if end_key is not None:
    stop = min(stop, bisect.bisect_left(index, end_key))
  if max_rows is not None:
    stop = min(stop, first + max_rows)
  # Clamp so that a stop position before `first` (or a negative one) yields
  # an empty result instead of being read as a from-the-end slice bound.
  return [row[-1] for row in index[first:max(first, stop)]]
def _LookupNoAcquireLock(self, maximum, name=None, eta=None, tag=None):
  """Scan the index selected by the given arguments.

  A tag selects the tag index (bounded so that only rows of that tag are
  returned), otherwise an eta selects the eta index, otherwise the name
  index is used. The caller must hold self._lock.

  Args:
    maximum: the maximum number of tasks to return.
    name: a task name to start with.
    eta: an eta to start with.
    tag: a tag to restrict the scan to.

  Returns:
    The list of tasks produced by _IndexScan.
  """
  assert self._lock.locked()
  scan_kwargs = {'max_rows': maximum}
  if tag is not None:
    index = self._sorted_by_tag
    scan_kwargs['start_key'] = (tag, eta, name,)
    # The tag followed by a NUL byte is used as the exclusive end of the
    # scan, so rows of later tags are left out.
    scan_kwargs['end_key'] = ('%s\x00' % tag, None, None,)
  elif eta is not None:
    index = self._sorted_by_eta
    scan_kwargs['start_key'] = (eta, name,)
  else:
    index = self._sorted_by_name
    scan_kwargs['start_key'] = (name,)
  return self._IndexScan(index, **scan_kwargs)
@_WithLock
def Count(self):
  """Returns the number of tasks in the store (rows of the name index)."""
  name_index = self._sorted_by_name
  return len(name_index)
@_WithLock
def OldestTask(self):
  """Returns the task with the oldest eta in the store, or None if empty."""
  if not self._sorted_by_eta:
    return None
  _, _, oldest_task = self._sorted_by_eta[0]
  return oldest_task
@_WithLock
def Oldest(self):
  """Returns the oldest eta in the store, or None if no tasks."""
  if not self._sorted_by_eta:
    return None
  first_row = self._sorted_by_eta[0]
  return first_row[0]
def _LocateTaskByName(self, task_name):
  """Locate the index of a task in the _sorted_by_name list.

  The caller must hold self._lock.

  Args:
    task_name: Name of task to be located.

  Returns:
    Index of the task in the _sorted_by_name list if the task exists,
    None otherwise.
  """
  assert self._lock.locked()
  name_index = self._sorted_by_name
  # A one-element key sorts before every (name, task) row of the same name,
  # so bisect_left lands on that row when it exists.
  pos = bisect.bisect_left(name_index, (task_name,))
  if pos < len(name_index) and name_index[pos][0] == task_name:
    return pos
  return None
@_WithLock
def Add(self, request, now):
  """Inserts a new task into the store.

  Builds a TaskQueueQueryTasksResponse_Task from the request, with a retry
  count of 0 and a creation time derived from 'now', and inserts it.

  Args:
    request: A taskqueue_service_pb.TaskQueueAddRequest.
    now: A datetime.datetime object containing the current time in UTC.

  Raises:
    apiproxy_errors.ApplicationError: If a task with the same name is already
      in the store, or the task is tombstoned.
  """
  errors = taskqueue_service_pb.TaskQueueServiceError
  task_name = request.task_name()
  if self._LocateTaskByName(task_name) is not None:
    raise apiproxy_errors.ApplicationError(errors.TASK_ALREADY_EXISTS)
  # A name that is archived but no longer stored belongs to a removed task.
  if task_name in self.task_name_archive:
    raise apiproxy_errors.ApplicationError(errors.TOMBSTONED_TASK)

  creation_time_usec = _SecToUsec(calendar.timegm(now.utctimetuple()))

  task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
  task.set_task_name(task_name)
  task.set_eta_usec(request.eta_usec())
  task.set_creation_time_usec(creation_time_usec)
  task.set_retry_count(0)
  task.set_method(request.method())

  if request.has_url():
    task.set_url(request.url())
  for source_header in request.header_list():
    copied_header = task.add_header()
    copied_header.set_key(source_header.key())
    copied_header.set_value(source_header.value())
  if request.has_description():
    task.set_description(request.description())
  if request.has_body():
    task.set_body(request.body())
  if request.has_crontimetable():
    source_timetable = request.crontimetable()
    target_timetable = task.mutable_crontimetable()
    target_timetable.set_schedule(source_timetable.schedule())
    target_timetable.set_timezone(source_timetable.timezone())
  if request.has_retry_parameters():
    task.mutable_retry_parameters().CopyFrom(request.retry_parameters())
  if request.has_tag():
    task.set_tag(request.tag())

  self._InsertTask(task)
@_WithLock
def Delete(self, name):
  """Deletes a task from the store by name.

  Lock-acquiring wrapper around _DeleteNoAcquireLock.

  Args:
    name: the name of the task to delete.

  Returns:
    TaskQueueServiceError.UNKNOWN_TASK: if the task is unknown.
    TaskQueueServiceError.INTERNAL_ERROR: if the store is corrupted.
    TaskQueueServiceError.TOMBSTONED_TASK: if the task was deleted.
    TaskQueueServiceError.OK: otherwise.
  """
  result_code = self._DeleteNoAcquireLock(name)
  return result_code
def _RemoveTaskFromIndex(self, index, index_tuple, task):
  """Remove a task from the specified index.

  The caller must hold self._lock.

  Args:
    index: The index list that needs to be mutated.
    index_tuple: The tuple to search for in the index.
    task: The task instance that is expected to be stored at this location.

  Returns:
    True if the task was successfully removed from the index, False otherwise
    (including when the search position lies past the end of the index).
  """
  assert self._lock.locked()
  pos = bisect.bisect_left(index, index_tuple)
  # bisect_left returns len(index) when the key sorts after every row (or
  # the index is empty); indexing there would raise IndexError instead of
  # reporting the miss.
  if pos >= len(index):
    logging.debug('Expected %s, found nothing at position %d', task, pos)
    return False
  found = index[pos][-1]
  if found is not task:
    logging.debug('Expected %s, found %s', task, found)
    return False
  index.pop(pos)
  return True
def _DeleteNoAcquireLock(self, name):
  """Delete the named task from every index.

  The name stays in task_name_archive. The caller must hold self._lock.

  Args:
    name: the name of the task to delete.

  Returns:
    A TaskQueueServiceError code: TOMBSTONED_TASK if the name is archived
    but not stored, UNKNOWN_TASK if it is not known at all, INTERNAL_ERROR
    if the task could not be removed from the eta or tag index, OK otherwise.
  """
  assert self._lock.locked()
  errors = taskqueue_service_pb.TaskQueueServiceError
  pos = self._LocateTaskByName(name)
  if pos is None:
    if name in self.task_name_archive:
      return errors.TOMBSTONED_TASK
    return errors.UNKNOWN_TASK

  old_task = self._sorted_by_name.pop(pos)[-1]
  eta = old_task.eta_usec()
  pending_removals = [(self._sorted_by_eta, (eta, name, None))]
  if old_task.has_tag():
    pending_removals.append(
        (self._sorted_by_tag, (old_task.tag(), eta, name, None)))

  for index, key in pending_removals:
    if not self._RemoveTaskFromIndex(index, key, old_task):
      return errors.INTERNAL_ERROR
  return errors.OK
@_WithLock
def Populate(self, num_tasks):
  """Populates the store with a number of randomly generated tasks.

  Args:
    num_tasks: the number of tasks to insert.
  """
  task_class = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task
  now_usec = _SecToUsec(time.time())

  def RandomTask():
    """Creates a new task and randomly populates values."""
    assert self._lock.locked()
    task = task_class()

    letters = [random.choice(string.ascii_lowercase) for _ in range(20)]
    task.set_task_name(''.join(letters))

    # The eta lies between 10 seconds before and 600 seconds after now.
    eta_offset_usec = random.randint(_SecToUsec(-10), _SecToUsec(600))
    task.set_eta_usec(now_usec + eta_offset_usec)

    # The creation time is up to 20 seconds before the earlier of now and
    # the eta.
    creation_lead_usec = random.randint(0, _SecToUsec(20))
    task.set_creation_time_usec(
        min(now_usec, task.eta_usec()) - creation_lead_usec)

    task.set_url(random.choice(['/a', '/b', '/c', '/d']))

    if random.random() < 0.2:
      task.set_method(task_class.POST)
      task.set_body('A' * 2000)
    else:
      task.set_method(task_class.GET)

    retry_count = max(0, random.randint(-10, 5))
    task.set_retry_count(retry_count)
    task.set_execution_count(retry_count)

    if random.random() < 0.3:
      header_pool = [('nexus', 'one'),
                     ('foo', 'bar'),
                     ('content-type', 'text/plain'),
                     ('from', 'user@email.com')]
      for _ in xrange(random.randint(1, 4)):
        # Popping keeps the same header from being picked twice.
        key, value = header_pool.pop(random.randint(0, len(header_pool) - 1))
        header_proto = task.add_header()
        header_proto.set_key(key)
        header_proto.set_value(value)
    return task

  for _ in range(num_tasks):
    self._InsertTask(RandomTask())
class _TaskExecutor(object):
  """Executor for a task object.

  Converts a TaskQueueQueryTasksResponse_Task into a http request and hands it
  to the dispatcher obtained from the request data.
  """

  def __init__(self, default_host, request_data):
    """Constructor.

    Args:
      default_host: a string to use as the host/port; it is appended to the
        queue's target when a task's host header is rewritten.
      request_data: A request_info.RequestInfo instance used to look up state
        associated with the request that generated an API call.
    """
    self._default_host = default_host
    self._request_data = request_data

  def _HeadersFromTask(self, task, queue):
    """Constructs the http headers for the given task.

    This function will remove special headers (values in BUILT_IN_HEADERS) and
    add the taskqueue headers.

    Args:
      task: The task, a TaskQueueQueryTasksResponse_Task instance.
      queue: The queue that this task belongs to, an _Queue instance.

    Returns:
      A list of tuples containing the http header and value. There
      may be multiple entries with the same key.
    """
    headers = []
    for header in task.header_list():
      key = header.key()
      lowered_key = key.lower()
      if lowered_key == 'host' and queue.target is not None:
        # A queue with a target gets its host header rewritten to
        # '<target>.<default host>'.
        headers.append((key, '.'.join([queue.target, self._default_host])))
      elif lowered_key not in BUILT_IN_HEADERS:
        headers.append((key, header.value()))

    headers.extend([
        ('X-AppEngine-QueueName', queue.queue_name),
        ('X-AppEngine-TaskName', task.task_name()),
        ('X-AppEngine-TaskRetryCount', str(task.retry_count())),
        ('X-AppEngine-TaskETA', str(_UsecToSec(task.eta_usec()))),
        ('X-AppEngine-Fake-Is-Admin', '1'),
        ('Content-Length', str(len(task.body()))),
    ])

    # Only supply a default content type when the task has a body and did not
    # bring its own.
    if task.has_body():
      has_content_type = any(
          name.lower() == 'content-type' for name, _ in headers)
      if not has_content_type:
        headers.append(('Content-Type', 'application/octet-stream'))

    headers.append(('X-AppEngine-TaskExecutionCount',
                    str(task.execution_count())))
    if task.has_runlog() and task.runlog().has_response_code():
      headers.append(('X-AppEngine-TaskPreviousResponse',
                      str(task.runlog().response_code())))
    return headers

  def ExecuteTask(self, task, queue):
    """Construct a http request from the task and dispatch it.

    Args:
      task: The task to convert to a http request and then send. An instance of
        taskqueue_service_pb.TaskQueueQueryTasksResponse_Task
      queue: The queue that this task belongs to. An instance of _Queue.

    Returns:
      Http Response code from the task's execution, 0 if the dispatcher raised
      request_info.ModuleDoesNotExistError.
    """
    method = task.RequestMethod_Name(task.method())
    headers = self._HeadersFromTask(task, queue)
    dispatcher = self._request_data.get_dispatcher()
    try:
      response = dispatcher.add_request(
          method,
          task.url(),
          headers,
          task.body() if task.has_body() else '',
          '0.1.0.2')
    except request_info.ModuleDoesNotExistError:
      logging.exception('Failed to dispatch task')
      return 0
    else:
      # The status line starts with the numeric code, e.g. '200 OK'.
      status_code = response.status.partition(' ')[0]
      return int(status_code)
class _BackgroundTaskScheduler(object):
  """The task scheduler class.

  This class is designed to be run in a background thread.

  Note: There must not be more than one instance of _BackgroundTaskScheduler per
  group.
  """

  def __init__(self, group, task_executor, retry_seconds, **kwargs):
    """Constructor.

    Args:
      group: The group that we will automatically execute tasks from. Must be an
        instance of _Group.
      task_executor: The class used to convert a task into a http request. Must
        be an instance of _TaskExecutor.
      retry_seconds: The number of seconds to delay a task by if its execution
        fails.
      _get_time: a callable that returns the current time in seconds since the
        epoch. This argument may only be passed in by keyword. If unset, use
        time.time.

    Raises:
      TypeError: An unknown keyword argument was passed.
    """
    self._group = group
    self._should_exit = False
    self._next_wakeup = INF
    self._event = threading.Event()
    self._wakeup_lock = threading.Lock()
    self.task_executor = task_executor
    self.default_retry_seconds = retry_seconds
    self._get_time = kwargs.pop('_get_time', time.time)
    if kwargs:
      raise TypeError('Unknown parameters: %s' % ', '.join(kwargs))

  def UpdateNextEventTime(self, next_event_time):
    """Notify the scheduler of the closest event it needs to process.

    The stored wakeup time is only ever moved earlier here; when it moves,
    the wait event is set so that a waiting main loop re-evaluates.

    Args:
      next_event_time: The time of the event in seconds since the epoch.
    """
    with self._wakeup_lock:
      if next_event_time >= self._next_wakeup:
        return
      self._next_wakeup = next_event_time
      self._event.set()

  def Shutdown(self):
    """Request this scheduler to exit."""
    self._should_exit = True
    self._event.set()

  def _RetryOrDropTask(self, queue, task, now):
    """Postpone a failed task, or delete it once it may not be retried.

    Args:
      queue: The queue the task belongs to.
      task: The task whose execution failed.
      now: The current time in seconds since the epoch.
    """
    retry = Retry(task, queue)
    age_usec = _SecToUsec(now) - task.first_try_usec()
    next_attempt = task.retry_count() + 1
    if not retry.CanRetry(next_attempt, age_usec):
      logging.warning(
          'Task %s failed to execute. The task has no remaining retries. '
          'Failing permanently after %d retries and %d seconds',
          task.task_name(), task.retry_count(), _UsecToSec(age_usec))
      queue.Delete(task.task_name())
      return

    retry_usec = retry.CalculateBackoffUsec(next_attempt)
    logging.warning(
        'Task %s failed to execute. This task will retry in %.3f seconds',
        task.task_name(), _UsecToSec(retry_usec))
    queue.PostponeTask(task, _SecToUsec(now) + retry_usec)

  def _ProcessQueues(self):
    """Execute every push task that is due and record the next wakeup time."""
    with self._wakeup_lock:
      self._next_wakeup = INF

    now = self._get_time()
    queue, task = self._group.GetNextPushTask()
    while task and _UsecToSec(task.eta_usec()) <= now:
      if task.retry_count() == 0:
        task.set_first_try_usec(_SecToUsec(now))

      response_code = self.task_executor.ExecuteTask(task, queue)
      if not response_code:
        logging.error(
            'An error occured while sending the task "%s" '
            '(Url: "%s") in queue "%s". Treating as a task error.',
            task.task_name(), task.url(), queue.queue_name)
      else:
        task.mutable_runlog().set_response_code(response_code)

      # Executing the task took time; re-read the clock before deciding.
      now = self._get_time()
      if 200 <= response_code < 300:
        queue.Delete(task.task_name())
      else:
        self._RetryOrDropTask(queue, task, now)
      queue, task = self._group.GetNextPushTask()

    if not task:
      return
    # The remaining task is not due yet; make sure we wake up for it.
    with self._wakeup_lock:
      next_eta = _UsecToSec(task.eta_usec())
      if next_eta < self._next_wakeup:
        self._next_wakeup = next_eta

  def _Wait(self):
    """Block until we need to process a task or we need to exit."""
    now = self._get_time()
    while True:
      if self._should_exit or not self._next_wakeup > now:
        return
      self._event.wait(self._next_wakeup - now)
      self._event.clear()
      now = self._get_time()

  def MainLoop(self):
    """The main loop of the scheduler."""
    while not self._should_exit:
      self._ProcessQueues()
      self._Wait()
class TaskQueueServiceStub(apiproxy_stub.APIProxyStub):
"""Python only task queue service stub.
This stub executes tasks when enabled by using the dev_appserver's AddEvent
capability. When task running is disabled this stub will store tasks for
display on a console, where the user may manually execute the tasks.
"""
def __init__(self,
             service_name='taskqueue',
             root_path=None,
             auto_task_running=False,
             task_retry_seconds=30,
             _all_queues_valid=False,
             default_http_server='localhost',
             _testing_validate_state=False,
             request_data=None):
  """Constructor.

  Args:
    service_name: Service name expected for all calls.
    root_path: Root path to the directory of the application which may contain
      a queue.yaml file. If None, then it's assumed no queue.yaml file is
      available.
    auto_task_running: When True, the dev_appserver should automatically
      run tasks after they are enqueued.
    task_retry_seconds: How long to wait between task executions after a
      task fails.
    _all_queues_valid: Forwarded unchanged to every _Group this stub creates.
    default_http_server: Host passed to the _TaskExecutor as its default
      host.
    _testing_validate_state: Should this stub and all of its _Groups (and
      thus and all of its _Queues) validate their state after each
      operation? This should only be used during testing of the
      taskqueue_stub.
    request_data: A request_info.RequestInfo instance used to look up state
      associated with the request that generated an API call.
  """
  super(TaskQueueServiceStub, self).__init__(
      service_name,
      max_request_size=MAX_REQUEST_SIZE,
      request_data=request_data)

  self._queues = {}
  self._all_queues_valid = _all_queues_valid
  self._root_path = root_path
  self._testing_validate_state = _testing_validate_state

  # The group stored under the key None is the one the background scheduler
  # executes tasks from.
  default_group = _Group(
      self._ParseQueueYaml,
      app_id=None,
      _all_queues_valid=_all_queues_valid,
      _update_newest_eta=self._UpdateNextEventTime,
      _testing_validate_state=self._testing_validate_state)
  self._queues[None] = default_group

  self._auto_task_running = auto_task_running
  self._started = False

  executor = _TaskExecutor(default_http_server, self.request_data)
  self._task_scheduler = _BackgroundTaskScheduler(
      self._queues[None], executor, retry_seconds=task_retry_seconds)

  self._yaml_last_modified = None
def StartBackgroundExecution(self):
  """Start automatic task execution.

  Runs the task scheduler's MainLoop in a daemon thread. Does nothing when
  execution was already started or auto task running is disabled.
  """
  if self._started or not self._auto_task_running:
    return
  scheduler_thread = threading.Thread(target=self._task_scheduler.MainLoop)
  scheduler_thread.setDaemon(True)
  scheduler_thread.start()
  self._started = True
def Shutdown(self):
  """Requests the task scheduler to shutdown."""
  scheduler = self._task_scheduler
  scheduler.Shutdown()
def _ParseQueueYaml(self):
  """Loads the queue.yaml file and parses it.

  If this object has a 'queue_yaml_parser' attribute, that callable is used
  instead and is passed the root path. Otherwise 'queue.yaml' and then
  'queue.yml' are tried under the root path; a file that cannot be stat'ed or
  opened is skipped. The parsed result is cached together with the file's
  modification time and reused while that time is unchanged.

  Returns:
    None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
    populated from the queue.yaml.
  """
  if hasattr(self, 'queue_yaml_parser'):
    return self.queue_yaml_parser(self._root_path)

  if self._root_path is None:
    return None

  for candidate in ('queue.yaml', 'queue.yml'):
    try:
      path = os.path.join(self._root_path, candidate)
      modified = os.stat(path).st_mtime
      if self._yaml_last_modified and self._yaml_last_modified == modified:
        return self._last_queue_info
      fh = open(path, 'r')
    except (IOError, OSError):
      continue
    # The file is closed when the block is left, also when parsing raises.
    with fh:
      queue_info = queueinfo.LoadSingleQueue(fh)
      self._last_queue_info = queue_info
      self._yaml_last_modified = modified
      return queue_info
  return None
def _UpdateNextEventTime(self, callback_time):
  """Tell the task scheduler about the time of an upcoming event.

  Args:
    callback_time: The earliest time this task may be run, in seconds since
      the epoch.
  """
  scheduler = self._task_scheduler
  scheduler.UpdateNextEventTime(callback_time)
def _GetGroup(self, app_id=None):
  """Get the _Group instance for app_id, creating a new one if needed.

  Args:
    app_id: The app id in question. Note: This field is not validated.

  Returns:
    The _Group stored for app_id. A group created here is stored before it
    is returned.
  """
  groups = self._queues
  if app_id in groups:
    return groups[app_id]
  new_group = _Group(
      app_id=app_id,
      _all_queues_valid=self._all_queues_valid,
      _testing_validate_state=self._testing_validate_state)
  groups[app_id] = new_group
  return new_group
def _Dynamic_Add(self, request, response):
  """Add a single task to a queue.

  This method is a wrapper around the BulkAdd RPC request: the request is
  copied into a one-element bulk request and the single result is unpacked.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: The taskqueue_service_pb.TaskQueueAddRequest. See
      taskqueue_service.proto.
    response: The taskqueue_service_pb.TaskQueueAddResponse. See
      taskqueue_service.proto.

  Raises:
    apiproxy_errors.ApplicationError: The bulk add reported a result other
      than OK for the task; the result code is the error code.
  """
  bulk_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
  bulk_request.add_add_request().CopyFrom(request)
  bulk_response = taskqueue_service_pb.TaskQueueBulkAddResponse()

  self._Dynamic_BulkAdd(bulk_request, bulk_response)

  assert bulk_response.taskresult_size() == 1
  task_result = bulk_response.taskresult(0)
  status = task_result.result()
  if status != taskqueue_service_pb.TaskQueueServiceError.OK:
    raise apiproxy_errors.ApplicationError(status)
  if task_result.has_chosen_task_name():
    response.set_chosen_task_name(task_result.chosen_task_name())
def _Dynamic_BulkAdd(self, request, response):
  """Add many tasks to a queue using a single request.

  The group is chosen from the app id of the first add request only.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See
      taskqueue_service.proto.
    response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See
      taskqueue_service.proto.
  """
  assert request.add_request_size(), 'taskqueue should prevent empty requests'
  app_id = _GetAppId(request.add_request(0))
  group = self._GetGroup(app_id)
  group.BulkAdd_Rpc(request, response)
def GetQueues(self):
  """Gets all the application's queues.

  Returns:
    A list of dictionaries, where each dictionary contains one queue's
    attributes. E.g.:
      [{'name': 'some-queue',
        'max_rate': '1/s',
        'bucket_size': 5,
        'oldest_task': '2009/02/02 05:37:42',
        'eta_delta': '0:00:06.342511 ago',
        'tasks_in_queue': 12}, ...]
    The list of queues always includes the default queue.
  """
  default_group = self._GetGroup()
  return default_group.GetQueuesAsDicts()
def GetTasks(self, queue_name):
  """Gets a queue's tasks.

  Args:
    queue_name: Queue's name to return tasks for.

  Returns:
    A list of dictionaries, where each dictionary contains one task's
    attributes. E.g.
      [{'name': 'task-123',
        'queue_name': 'default',
        'url': '/update',
        'method': 'GET',
        'eta': '2009/02/02 05:37:42',
        'eta_delta': '0:00:06.342511 ago',
        'body': '',
        'headers': [('user-header', 'some-value'),
                    ('X-AppEngine-QueueName', 'update-queue'),
                    ('X-AppEngine-TaskName', 'task-123'),
                    ('X-AppEngine-TaskRetryCount', '0'),
                    ('X-AppEngine-TaskETA', '1234567890.123456'),
                    ('X-AppEngine-Development-Payload', '1'),
                    ('Content-Length', 0),
                    ('Content-Type', 'application/octet-stream')]}, ...]

  Raises:
    ValueError: A task request contains an unknown HTTP method type.
    KeyError: An invalid queue name was specified.
  """
  queue = self._GetGroup().GetQueue(queue_name)
  return queue.GetTasksAsDicts()
def DeleteTask(self, queue_name, task_name):
  """Deletes a task from a queue, without leaving a tombstone.

  Does nothing when the default group has no queue of that name.

  Args:
    queue_name: the name of the queue to delete the task from.
    task_name: the name of the task to delete.
  """
  group = self._GetGroup()
  if not group.HasQueue(queue_name):
    return
  queue = group.GetQueue(queue_name)
  queue.Delete(task_name)
  # Dropping the name from the archive is what removes the tombstone.
  queue.task_name_archive.discard(task_name)
def FlushQueue(self, queue_name):
  """Removes all tasks from a queue, without leaving tombstones.

  Does nothing when the default group has no queue of that name.

  Args:
    queue_name: the name of the queue to remove tasks from.
  """
  group = self._GetGroup()
  if not group.HasQueue(queue_name):
    return
  queue = group.GetQueue(queue_name)
  queue.PurgeQueue()
  # Clearing the archive is what removes the tombstones.
  queue.task_name_archive.clear()
def _Dynamic_UpdateQueue(self, request, unused_response):
  """Local implementation of the UpdateQueue RPC in TaskQueueService.

  Forwards the call to the group selected by the request's app id.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
    unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
      Passed through to the group unchanged.
  """
  group = self._GetGroup(_GetAppId(request))
  group.UpdateQueue_Rpc(request, unused_response)
def _Dynamic_FetchQueues(self, request, response):
  """Local implementation of the FetchQueues RPC in TaskQueueService.

  Forwards the call to the group selected by the request's app id.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
    response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
  """
  group = self._GetGroup(_GetAppId(request))
  group.FetchQueues_Rpc(request, response)
def _Dynamic_FetchQueueStats(self, request, response):
  """Serves the TaskQueueService FetchQueueStats RPC for the local stub.

  This method only forwards the call to the FetchQueueStats_Rpc handler of
  the group selected by the request's app id. Per the original notes for this
  RPC, some stats are loaded from the task store and the rest are random
  numbers; that logic is not visible here and presumably lives in the
  handler.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
    response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
  """
  app_id = _GetAppId(request)
  group = self._GetGroup(app_id)
  group.FetchQueueStats_Rpc(request, response)
def _Dynamic_QueryTasks(self, request, response):
  """Serves the TaskQueueService QueryTasks RPC for the local stub.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.
  """
  app_id = _GetAppId(request)
  group = self._GetGroup(app_id)
  group.QueryTasks_Rpc(request, response)
def _Dynamic_FetchTask(self, request, response):
  """Serves the TaskQueueService FetchTask RPC for the local stub.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchTaskRequest.
    response: A taskqueue_service_pb.TaskQueueFetchTaskResponse.
  """
  app_id = _GetAppId(request)
  group = self._GetGroup(app_id)
  group.FetchTask_Rpc(request, response)
def _Dynamic_Delete(self, request, response):
  """Serves the TaskQueueService Delete RPC for the local stub.

  The deletion is carried out by the Delete_Rpc handler of the group selected
  by the request's app id. Earlier notes for this RPC mention a 1/20 chance
  of a transient error; nothing in this method produces one, so any such
  behavior would have to come from the handler.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteResponse.
  """
  app_id = _GetAppId(request)
  group = self._GetGroup(app_id)
  group.Delete_Rpc(request, response)
def _Dynamic_ForceRun(self, request, response):
  """Serves the TaskQueueService ForceRun RPC for the local stub.

  When the request carries an app id (_GetAppId returns something other than
  None) no task is run; instead a randomly chosen result is reported, which
  is meant for testing. Without an app id the named task is looked up in the
  default group and run immediately.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueForceRunRequest.
    response: A taskqueue_service_pb.TaskQueueForceRunResponse. Its result is
      set to TRANSIENT_ERROR, INTERNAL_ERROR, UNKNOWN_QUEUE, UNKNOWN_TASK or
      OK.
  """
  service_error = taskqueue_service_pb.TaskQueueServiceError

  if _GetAppId(request) is not None:
    # Simulated outcome only: each test below draws its own random number.
    if random.random() <= 0.05:
      outcome = service_error.TRANSIENT_ERROR
    elif random.random() <= 0.052:
      outcome = service_error.INTERNAL_ERROR
    else:
      outcome = service_error.OK
    response.set_result(outcome)
    return

  group = self._GetGroup(None)
  queue_name = request.queue_name()
  if not group.HasQueue(queue_name):
    response.set_result(service_error.UNKNOWN_QUEUE)
    return

  queue = group.GetQueue(queue_name)
  # Ask for at most one task with the requested name.
  matches = queue.Lookup(1, name=request.task_name())
  if not matches:
    response.set_result(service_error.UNKNOWN_TASK)
    return

  queue.RunTaskNow(matches[0])
  self._UpdateNextEventTime(0)
  response.set_result(service_error.OK)
def _Dynamic_DeleteQueue(self, request, response):
  """Serves the TaskQueueService DeleteQueue RPC for the local stub.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse.

  Raises:
    apiproxy_errors.ApplicationError: With PERMISSION_DENIED when the request
      carries no app id.
  """
  app_id = _GetAppId(request)
  if app_id is None:
    permission_denied = (
        taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED)
    raise apiproxy_errors.ApplicationError(permission_denied)
  group = self._GetGroup(app_id)
  group.DeleteQueue_Rpc(request, response)
def _Dynamic_PauseQueue(self, request, response):
  """Serves the TaskQueueService PauseQueue RPC for the local stub.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueuePauseQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePauseQueueResponse.

  Raises:
    apiproxy_errors.ApplicationError: With PERMISSION_DENIED when the request
      carries no app id.
  """
  app_id = _GetAppId(request)
  if app_id is None:
    permission_denied = (
        taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED)
    raise apiproxy_errors.ApplicationError(permission_denied)
  group = self._GetGroup(app_id)
  group.PauseQueue_Rpc(request, response)
def _Dynamic_PurgeQueue(self, request, response):
  """Serves the TaskQueueService PurgeQueue RPC for the local stub.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse.
  """
  app_id = _GetAppId(request)
  group = self._GetGroup(app_id)
  group.PurgeQueue_Rpc(request, response)
def _Dynamic_DeleteGroup(self, request, response):
  """Serves the TaskQueueService DeleteGroup RPC for the local stub.

  Removes the entry for the request's app id from self._queues. The response
  object is not modified.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteGroupRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteGroupResponse.

  Raises:
    apiproxy_errors.ApplicationError: With PERMISSION_DENIED when the request
      carries no app id, or with UNKNOWN_QUEUE when no group is stored for
      that app id.
  """
  service_error = taskqueue_service_pb.TaskQueueServiceError
  app_id = _GetAppId(request)
  if app_id is None:
    raise apiproxy_errors.ApplicationError(service_error.PERMISSION_DENIED)
  if app_id not in self._queues:
    raise apiproxy_errors.ApplicationError(service_error.UNKNOWN_QUEUE)
  del self._queues[app_id]
def _Dynamic_UpdateStorageLimit(self, request, response):
  """Serves the TaskQueueService UpdateStorageLimit RPC for the local stub.

  No limit is stored anywhere by this method; a valid requested limit is
  simply echoed back as the response's new limit.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateStorageLimitRequest.
    response: A taskqueue_service_pb.TaskQueueUpdateStorageLimitResponse.

  Raises:
    apiproxy_errors.ApplicationError: With PERMISSION_DENIED when the request
      carries no app id, or with INVALID_REQUEST when the requested limit is
      negative or larger than 1000 * (1024 ** 4).
  """
  service_error = taskqueue_service_pb.TaskQueueServiceError
  if _GetAppId(request) is None:
    raise apiproxy_errors.ApplicationError(service_error.PERMISSION_DENIED)

  requested_limit = request.limit()
  largest_allowed = 1000 * (1024 ** 4)
  if requested_limit < 0 or requested_limit > largest_allowed:
    raise apiproxy_errors.ApplicationError(service_error.INVALID_REQUEST)
  response.set_new_limit(requested_limit)
def _Dynamic_QueryAndOwnTasks(self, request, response):
  """Serves the TaskQueueService QueryAndOwnTasks RPC for the local stub.

  The call is forwarded to the group returned by self._GetGroup() with no
  arguments; the request's app id is not consulted here.

  The '_Dynamic_' prefix is required for stubbing to work. The RPC itself is
  described in taskqueue_service.proto.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse.

  Raises:
    InvalidQueueModeError: If target queue is not a pull queue.
  """
  group = self._GetGroup()
  group.QueryAndOwnTasks_Rpc(request, response)
def _Dynamic_ModifyTaskLease(self, request, response):
  """Serves the TaskQueueService ModifyTaskLease RPC for the local stub.

  The call is forwarded to the group returned by self._GetGroup() with no
  arguments; the request's app id is not consulted here.

  Args:
    request: A taskqueue_service_pb.TaskQueueModifyTaskLeaseRequest.
    response: A taskqueue_service_pb.TaskQueueModifyTaskLeaseResponse.

  Raises:
    InvalidQueueModeError: If target queue is not a pull queue.
  """
  group = self._GetGroup()
  group.ModifyTaskLease_Rpc(request, response)
def get_filtered_tasks(self, url=None, name=None, queue_names=None):
  """Returns the queued tasks that match the given filters.

  Args:
    url: If not None, only tasks whose URL equals this value are returned.
    name: If not None, only tasks with this name are returned.
    queue_names: A list of queue names to look in, or a single queue name as
      a string. When None, every queue reported by GetQueues() is searched.
      Names that are not among those queues are skipped.

  Returns:
    A list of taskqueue.Task objects built from the matching task dicts. Each
    task's Content-Length header is set to the length of its decoded payload
    and its ETA carries the taskqueue._UTC tzinfo.
  """
  known_queue_names = [queue['name'] for queue in self.GetQueues()]
  if isinstance(queue_names, basestring):
    queue_names = [queue_names]
  if queue_names is None:
    queue_names = known_queue_names

  def _Rejected(task_dict):
    """Tells whether a task dict fails the url or name filter."""
    return ((url is not None and task_dict['url'] != url) or
            (name is not None and task_dict['name'] != name))

  def _ToTask(task_dict):
    """Builds a taskqueue.Task from one task dict."""
    # The dict carries the body base64-encoded.
    payload = base64.b64decode(task_dict['body'])
    headers = dict(task_dict['headers'])
    headers['Content-Length'] = str(len(payload))
    parsed_eta = datetime.datetime.strptime(task_dict['eta'],
                                            '%Y/%m/%d %H:%M:%S')
    return taskqueue.Task(name=task_dict['name'],
                          method=task_dict['method'],
                          url=task_dict['url'],
                          headers=headers,
                          payload=payload,
                          eta=parsed_eta.replace(tzinfo=taskqueue._UTC))

  # Gather every matching dict first, then convert them all.
  selected = []
  for queue_name in queue_names:
    if queue_name not in known_queue_names:
      continue
    for task_dict in self.GetTasks(queue_name):
      if not _Rejected(task_dict):
        selected.append(task_dict)

  return [_ToTask(task_dict) for task_dict in selected]