blob: 20376457e1d8791f61f648e42248ce11ba0ad366 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Stub version of the Task Queue API.
This stub stores tasks and runs them via dev_appserver's AddEvent capability.
It also validates the tasks by checking their queue name against the queue.yaml.
As well as implementing Task Queue API functions, the stub exposes various other
functions that are used by the dev_appserver's admin console to display the
application's queues and tasks.
"""
import StringIO
import base64
import bisect
import datetime
import logging
import os
import random
import string
import time
from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import queueinfo
from google.appengine.api.labs.taskqueue import taskqueue_service_pb
from google.appengine.runtime import apiproxy_errors
DEFAULT_RATE = '5.00/s'
DEFAULT_BUCKET_SIZE = 5
MAX_ETA_DELTA_DAYS = 30
admin_console_dummy_tasks = {}
BUILT_IN_HEADERS = set(['x-appengine-queuename',
'x-appengine-taskname',
'x-appengine-taskretrycount',
'x-appengine-development-payload',
'content-length'])
DEFAULT_QUEUE_NAME = 'default'
CRON_QUEUE_NAME = '__cron'
class _DummyTaskStore(object):
"""A class that encapsulates a sorted store of tasks.
Used for testing the admin console.
"""
def __init__(self):
"""Constructor."""
self._sorted_by_name = []
self._sorted_by_eta = []
def _InsertTask(self, task):
"""Insert a task into the dummy store, keeps lists sorted.
Args:
task: the new task.
"""
eta = task.eta_usec()
name = task.task_name()
bisect.insort_left(self._sorted_by_eta, (eta, name, task))
bisect.insort_left(self._sorted_by_name, (name, task))
def Lookup(self, maximum, name=None, eta=None):
"""Lookup a number of sorted tasks from the store.
If 'eta' is specified, the tasks are looked up in a list sorted by 'eta',
then 'name'. Otherwise they are sorted by 'name'. We need to be able to
sort by 'eta' and 'name' because tasks can have identical eta. If you had
20 tasks with the same ETA, you wouldn't be able to page past them, since
the 'next eta' would give the first one again. Names are unique, though.
Args:
maximum: the maximum number of tasks to return.
name: a task name to start with.
eta: an eta to start with.
Returns:
A list of up to 'maximum' tasks.
Raises:
ValueError: if the task store gets corrupted.
"""
if eta is None:
pos = bisect.bisect_left(self._sorted_by_name, (name,))
tasks = (x[1] for x in self._sorted_by_name[pos:pos + maximum])
return list(tasks)
if name is None:
raise ValueError('must supply name or eta')
pos = bisect.bisect_left(self._sorted_by_eta, (eta, name))
tasks = (x[2] for x in self._sorted_by_eta[pos:pos + maximum])
return list(tasks)
def Count(self):
"""Returns the number of tasks in the store."""
return len(self._sorted_by_name)
def Oldest(self):
"""Returns the oldest eta in the store, or None if no tasks."""
if self._sorted_by_eta:
return self._sorted_by_eta[0][0]
return None
def Add(self, request):
"""Inserts a new task into the store.
Args:
request: A taskqueue_service_pb.TaskQueueAddRequest.
Raises:
apiproxy_errors.ApplicationError: If a task with the same name is already
in the store.
"""
pos = bisect.bisect_left(self._sorted_by_name, (request.task_name(),))
if (pos < len(self._sorted_by_name) and
self._sorted_by_name[pos][0] == request.task_name()):
raise apiproxy_errors.ApplicationError(
taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
now = datetime.datetime.utcnow()
now_sec = time.mktime(now.timetuple())
task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
task.set_task_name(request.task_name())
task.set_eta_usec(request.eta_usec())
task.set_creation_time_usec(now_sec * 1e6)
task.set_url(request.url())
task.set_method(request.method())
for keyvalue in task.header_list():
header = task.add_header()
header.set_key(keyvalue.key())
header.set_value(keyvalue.value())
if request.has_description():
task.set_description(request.description())
if request.has_body():
task.set_body(request.body())
if request.has_crontimetable():
task.mutable_crontimetable().set_schedule(
request.crontimetable().schedule())
task.mutable_crontimetable().set_timezone(
request.crontimetable().timezone())
self._InsertTask(task)
def Delete(self, name):
"""Deletes a task from the store by name.
Args:
name: the name of the task to delete.
Returns:
TaskQueueServiceError.UNKNOWN_TASK: if the task is unknown.
TaskQueueServiceError.INTERNAL_ERROR: if the store is corrupted.
TaskQueueServiceError.OK: otherwise.
"""
pos = bisect.bisect_left(self._sorted_by_name, (name,))
if pos >= len(self._sorted_by_name):
return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK
if self._sorted_by_name[pos][1].task_name() != name:
logging.info('looking for task name %s, got task name %s', name,
self._sorted_by_name[pos][1].task_name())
return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK
old_task = self._sorted_by_name.pop(pos)[1]
eta = old_task.eta_usec()
pos = bisect.bisect_left(self._sorted_by_eta, (eta, name, None))
if self._sorted_by_eta[pos][2] is not old_task:
logging.error('task store corrupted')
return taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR
self._sorted_by_eta.pop(pos)
return taskqueue_service_pb.TaskQueueServiceError.OK
def Populate(self, num_tasks):
"""Populates the store with a number of tasks.
Args:
num_tasks: the number of tasks to insert.
"""
now = datetime.datetime.utcnow()
now_sec = time.mktime(now.timetuple())
def RandomTask():
"""Creates a new task and randomly populates values."""
task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
task.set_task_name(''.join(random.choice(string.ascii_lowercase)
for x in range(20)))
task.set_eta_usec(int(now_sec * 1e6) + random.randint(-10e6, 600e6))
task.set_creation_time_usec(min(now_sec * 1e6, task.eta_usec()) -
random.randint(0, 2e7))
task.set_url(random.choice(['/a', '/b', '/c', '/d']))
if random.random() < 0.2:
task.set_method(
taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.POST)
task.set_body('A' * 2000)
else:
task.set_method(
taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.GET)
task.set_retry_count(max(0, random.randint(-10, 5)))
if random.random() < 0.3:
random_headers = [('nexus', 'one'),
('foo', 'bar'),
('content-type', 'text/plain'),
('from', 'user@email.com')]
for _ in xrange(random.randint(1, 4)):
elem = random.randint(0, len(random_headers)-1)
key, value = random_headers.pop(elem)
header_proto = task.add_header()
header_proto.set_key(key)
header_proto.set_value(value)
return task
for _ in range(num_tasks):
self._InsertTask(RandomTask())
def _ParseQueueYaml(unused_self, root_path):
"""Loads the queue.yaml file and parses it.
Args:
unused_self: Allows this function to be bound to a class member. Not used.
root_path: Directory containing queue.yaml. Not used.
Returns:
None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
populated from the queue.yaml.
"""
if root_path is None:
return None
for queueyaml in ('queue.yaml', 'queue.yml'):
try:
fh = open(os.path.join(root_path, queueyaml), 'r')
except IOError:
continue
try:
queue_info = queueinfo.LoadSingleQueue(fh)
return queue_info
finally:
fh.close()
return None
def _CompareTasksByEta(a, b):
"""Python sort comparator for tasks by estimated time of arrival (ETA).
Args:
a: A taskqueue_service_pb.TaskQueueAddRequest.
b: A taskqueue_service_pb.TaskQueueAddRequest.
Returns:
Standard 1/0/-1 comparison result.
"""
if a.eta_usec() > b.eta_usec():
return 1
if a.eta_usec() < b.eta_usec():
return -1
return 0
def _FormatEta(eta_usec):
"""Formats a task ETA as a date string in UTC."""
eta = datetime.datetime.fromtimestamp(eta_usec/1000000)
return eta.strftime('%Y/%m/%d %H:%M:%S')
def _EtaDelta(eta_usec):
"""Formats a task ETA as a relative time string."""
eta = datetime.datetime.fromtimestamp(eta_usec/1000000)
now = datetime.datetime.utcnow()
if eta > now:
return str(eta - now) + ' from now'
else:
return str(now - eta) + ' ago'
class TaskQueueServiceStub(apiproxy_stub.APIProxyStub):
"""Python only task queue service stub.
This stub executes tasks when enabled by using the dev_appserver's AddEvent
capability. When task running is disabled this stub will store tasks for
display on a console, where the user may manually execute the tasks.
"""
queue_yaml_parser = _ParseQueueYaml
def __init__(self,
             service_name='taskqueue',
             root_path=None,
             auto_task_running=False,
             task_retry_seconds=30,
             _all_queues_valid=False):
  """Constructor.

  Args:
    service_name: Service name expected for all calls.
    root_path: Root path to the directory of the application which may contain
      a queue.yaml file. If None, then it's assumed no queue.yaml file is
      available.
    auto_task_running: When True, the dev_appserver should automatically
      run tasks after they are enqueued.
    task_retry_seconds: How long to wait between task executions after a
      task fails.
    _all_queues_valid: When True, queue-name validation is skipped so tasks
      may be added to any queue (see _IsValidQueue).
  """
  super(TaskQueueServiceStub, self).__init__(service_name)
  # Maps queue name -> list of TaskQueueAddRequest, kept sorted by eta
  # (see _NonTransactionalBulkAdd).
  self._taskqueues = {}
  # Counter backing the names generated by _ChooseTaskName.
  self._next_task_id = 1
  self._root_path = root_path
  self._all_queues_valid = _all_queues_valid
  # Scheduling callback installed by the dev_appserver; used to run tasks
  # automatically when auto_task_running is enabled.
  self._add_event = None
  self._auto_task_running = auto_task_running
  self._task_retry_seconds = task_retry_seconds
  # Maps app_id -> {queue_name: _QueueDetails, or None once tombstoned}.
  self._app_queues = {}
class _QueueDetails(taskqueue_service_pb.TaskQueueUpdateQueueRequest):
  """An UpdateQueueRequest augmented with a mutable 'paused' flag.

  NOTE(review): this __init__ deliberately does not invoke the protobuf
  base-class initializer — presumably safe for this pb implementation, but
  confirm before relying on inherited state being set up.
  """

  def __init__(self, paused=False):
    # Whether the queue is currently paused (toggled by _Dynamic_PauseQueue,
    # reported by _Dynamic_FetchQueues).
    self.paused = paused
def _ChooseTaskName(self):
  """Returns a string containing a unique task name."""
  # Grab the current id, then advance the counter for the next caller.
  chosen_id = self._next_task_id
  self._next_task_id = chosen_id + 1
  return 'task%d' % chosen_id
def _VerifyTaskQueueAddRequest(self, request):
  """Checks that a TaskQueueAddRequest is valid.

  Checks that a TaskQueueAddRequest specifies a valid eta and a valid queue.

  Args:
    request: The taskqueue_service_pb.TaskQueueAddRequest to validate.

  Returns:
    A taskqueue_service_pb.TaskQueueServiceError indicating any problems with
    the request or taskqueue_service_pb.TaskQueueServiceError.OK if it is
    valid.
  """
  eta_usec = request.eta_usec()
  if eta_usec < 0:
    return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA
  # Reject ETAs too far in the future (more than MAX_ETA_DELTA_DAYS away).
  eta = datetime.datetime.utcfromtimestamp(eta_usec / 1e6)
  latest_allowed = (datetime.datetime.utcnow() +
                    datetime.timedelta(days=MAX_ETA_DELTA_DAYS))
  if eta > latest_allowed:
    return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA
  return taskqueue_service_pb.TaskQueueServiceError.OK
def _Dynamic_Add(self, request, response):
  """Local implementation of TaskQueueService.Add — delegates to BulkAdd.

  Args:
    request: A taskqueue_service_pb.TaskQueueAddRequest.
    response: A taskqueue_service_pb.TaskQueueAddResponse.

  Raises:
    apiproxy_errors.ApplicationError: if the underlying bulk add reports
      anything other than OK for this task.
  """
  # Wrap the single request in a one-element bulk request.
  bulk_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
  bulk_response = taskqueue_service_pb.TaskQueueBulkAddResponse()
  bulk_request.add_add_request().CopyFrom(request)
  self._Dynamic_BulkAdd(bulk_request, bulk_response)
  assert bulk_response.taskresult_size() == 1
  task_result = bulk_response.taskresult(0)
  if task_result.result() != taskqueue_service_pb.TaskQueueServiceError.OK:
    raise apiproxy_errors.ApplicationError(task_result.result())
  if task_result.has_chosen_task_name():
    response.set_chosen_task_name(task_result.chosen_task_name())
def _Dynamic_BulkAdd(self, request, response):
  """Add many tasks to a queue using a single request.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See
        taskqueue_service.proto.
    response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See
        taskqueue_service.proto.

  Raises:
    apiproxy_errors.ApplicationError: UNKNOWN_QUEUE if the batch's target
      queue is not a valid queue.
  """
  assert request.add_request_size(), 'taskqueue should prevent empty requests'
  # The queue, app_id and transaction are taken from the first add request;
  # the whole batch is assumed to target the same queue.
  app_id = None
  if request.add_request(0).has_app_id():
    app_id = request.add_request(0).app_id()
  if not self._IsValidQueue(request.add_request(0).queue_name(), app_id):
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
  error_found = False
  # Task results whose tasks were given a generated name; chosen_task_name
  # is filled in for these at the end.
  task_results_with_chosen_names = []
  for add_request in request.add_request_list():
    task_result = response.add_taskresult()
    error = self._VerifyTaskQueueAddRequest(add_request)
    if error == taskqueue_service_pb.TaskQueueServiceError.OK:
      if not add_request.task_name():
        chosen_name = self._ChooseTaskName()
        add_request.set_task_name(chosen_name)
        task_results_with_chosen_names.append(task_result)
      # SKIPPED marks "validated but not yet stored"; flipped to OK below
      # once the batch has actually been added.
      task_result.set_result(
          taskqueue_service_pb.TaskQueueServiceError.SKIPPED)
    else:
      error_found = True
      task_result.set_result(error)
  if error_found:
    # All-or-nothing: if any task failed validation, nothing is added.
    return
  if request.add_request(0).has_transaction():
    self._TransactionalBulkAdd(request)
  elif request.add_request(0).has_app_id():
    self._DummyTaskStoreBulkAdd(request, response)
  else:
    self._NonTransactionalBulkAdd(request, response)
  # Promote surviving SKIPPED markers to OK and report generated names.
  for add_request, task_result in zip(request.add_request_list(),
                                      response.taskresult_list()):
    if (task_result.result() ==
        taskqueue_service_pb.TaskQueueServiceError.SKIPPED):
      task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK)
    if task_result in task_results_with_chosen_names:
      task_result.set_chosen_task_name(add_request.task_name())
def _TransactionalBulkAdd(self, request):
  """Uses datastore.AddActions to associate tasks with a transaction.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
      tasks to add. N.B. all tasks in the request have been validated and
      assigned unique names.

  Raises:
    apiproxy_errors.ApplicationError: if the datastore call fails; the
      datastore error code is offset into the task queue error space.
  """
  try:
    apiproxy_stub_map.MakeSyncCall(
        'datastore_v3', 'AddActions', request, api_base_pb.VoidProto())
  except apiproxy_errors.ApplicationError, e:
    # Shift the datastore error by DATASTORE_ERROR so callers can tell the
    # failure originated in the datastore.
    raise apiproxy_errors.ApplicationError(
        e.application_error +
        taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR,
        e.error_detail)
def _DummyTaskStoreBulkAdd(self, request, response):
  """Adds tasks to the appropriate DummyTaskStore.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
      tasks to add. N.B. all tasks in the request have been validated and
      those with empty names have been assigned unique names.
    response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
      with the results. N.B. the chosen_task_name field in the response will
      not be filled-in.
  """
  # All requests in a batch target the same app/queue (see _Dynamic_BulkAdd),
  # so one store lookup suffices.
  store = self.GetDummyTaskStore(request.add_request(0).app_id(),
                                 request.add_request(0).queue_name())
  for add_request, task_result in zip(request.add_request_list(),
                                      response.taskresult_list()):
    try:
      store.Add(add_request)
    except apiproxy_errors.ApplicationError, e:
      # Per-task failures (e.g. TASK_ALREADY_EXISTS) are recorded in the
      # matching task_result rather than failing the whole batch.
      task_result.set_result(e.application_error)
    else:
      task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK)
def _NonTransactionalBulkAdd(self, request, response):
  """Adds tasks to the appropriate list in self._taskqueues.

  Args:
    request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
      tasks to add. N.B. all tasks in the request have been validated and
      those with empty names have been assigned unique names.
    response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
      with the results. N.B. the chosen_task_name field in the response will
      not be filled-in.
  """
  existing_tasks = self._taskqueues.setdefault(
      request.add_request(0).queue_name(), [])
  # Snapshot of names already in the queue, for duplicate detection.
  existing_task_names = set(task.task_name() for task in existing_tasks)

  def DefineCallback(queue_name, task_name):
    # Binds the loop variables by value: a bare lambda written directly in
    # the loop would late-bind and see only the final iteration's values.
    return lambda: self._RunTask(queue_name, task_name)

  for add_request, task_result in zip(request.add_request_list(),
                                      response.taskresult_list()):
    if add_request.task_name() in existing_task_names:
      task_result.set_result(
          taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
    else:
      existing_tasks.append(add_request)
      # Schedule automatic execution if the dev_appserver enabled it.
      if self._add_event and self._auto_task_running:
        self._add_event(
            add_request.eta_usec() / 1000000.0,
            DefineCallback(add_request.queue_name(), add_request.task_name()))
  # Python 2 cmp-style sort keeps the queue ordered by eta for display.
  existing_tasks.sort(_CompareTasksByEta)
def _IsValidQueue(self, queue_name, app_id):
  """Determines whether a queue is valid, i.e. tasks can be added to it.

  Valid queues are the 'default' queue, plus any queues in the queue.yaml
  file.

  Args:
    queue_name: the name of the queue to validate.
    app_id: the app_id. Can be None.

  Returns:
    True iff queue is valid.
  """
  if self._all_queues_valid:
    return True
  # The default and cron queues always exist.
  if queue_name in (DEFAULT_QUEUE_NAME, CRON_QUEUE_NAME):
    return True
  queue_info = self.queue_yaml_parser(self._root_path)
  if queue_info and queue_info.queue:
    if any(entry.name == queue_name for entry in queue_info.queue):
      return True
  if app_id is not None:
    # Registered (non-tombstoned) per-app queues also count.
    return self._app_queues.get(app_id, {}).get(queue_name, None) is not None
  return False
def _RunTask(self, queue_name, task_name):
  """Returns a fake request for running a task in the dev_appserver.

  Args:
    queue_name: The queue the task is in.
    task_name: The name of the task to run.

  Returns:
    None if this task no longer exists or tuple (connection, addrinfo) of
    a fake connection and address information used to run this task. The
    task will be deleted after it runs or re-enqueued in the future on
    failure.
  """
  task_list = self.GetTasks(queue_name)
  for task in task_list:
    if task['name'] == task_name:
      break
  else:
    # Task was deleted (or already ran) since it was scheduled.
    return None

  class FakeConnection(object):
    """A minimal socket-like object the dev_appserver can serve on.

    The task's HTTP request is pre-loaded into rfile; the response is
    captured in wfile and inspected when the server closes it.
    """

    def __init__(self, input_buffer):
      self.rfile = StringIO.StringIO(input_buffer)
      self.wfile = StringIO.StringIO()
      self.wfile_close = self.wfile.close
      # Intercept close() so the response status can be examined before
      # the buffer goes away.
      self.wfile.close = self.connection_done

    def connection_done(myself):
      # 'myself' is the connection; 'self' (closed over) is the stub.
      result = myself.wfile.getvalue()
      myself.wfile_close()
      # Parse the HTTP status line defensively; anything malformed is
      # treated as a 500.
      first_line, rest = (result.split('\n', 1) + ['', ''])[:2]
      version, code, rest = (first_line.split(' ', 2) + ['', '500', ''])[:3]
      try:
        code = int(code)
      except ValueError:
        code = 500
      if 200 <= int(code) <= 299:
        # Success: the task is done, remove it from the queue.
        self.DeleteTask(queue_name, task_name)
        return
      # Failure: leave the task queued and schedule a retry.
      logging.warning('Task named "%s" on queue "%s" failed with code %s; '
                      'will retry in %d seconds',
                      task_name, queue_name, code, self._task_retry_seconds)
      self._add_event(
          time.time() + self._task_retry_seconds,
          lambda: self._RunTask(queue_name, task_name))

    def close(self):
      pass

    def makefile(self, mode, buffsize):
      if mode.startswith('w'):
        return self.wfile
      else:
        return self.rfile

  # Serialize the task as a raw HTTP request for the dev_appserver to parse.
  payload = StringIO.StringIO()
  payload.write('%s %s HTTP/1.1\r\n' % (task['method'], task['url']))
  for key, value in task['headers']:
    payload.write('%s: %s\r\n' % (key, value))
  payload.write('\r\n')
  payload.write(task['body'])
  return FakeConnection(payload.getvalue()), ('0.1.0.2', 80)
def GetQueues(self):
  """Gets all the applications's queues.

  Returns:
    A list of dictionaries, where each dictionary contains one queue's
    attributes. E.g.:
      [{'name': 'some-queue',
        'max_rate': '1/s',
        'bucket_size': 5,
        'oldest_task': '2009/02/02 05:37:42',
        'eta_delta': '0:00:06.342511 ago',
        'tasks_in_queue': 12}, ...]
    The list of queues always includes the default queue.
  """

  def FillTaskStats(queue, tasks):
    # Shared tail: oldest-task display fields plus the task count. The
    # 'eta_delta' key is only present when the queue has tasks.
    if tasks:
      queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
      queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
    else:
      queue['oldest_task'] = ''
    queue['tasks_in_queue'] = len(tasks)

  queues = []
  queue_info = self.queue_yaml_parser(self._root_path)
  has_default = False
  if queue_info and queue_info.queue:
    for entry in queue_info.queue:
      if entry.name == DEFAULT_QUEUE_NAME:
        has_default = True
      queue = {
          'name': entry.name,
          'max_rate': entry.rate,
          'bucket_size': entry.bucket_size or DEFAULT_BUCKET_SIZE,
      }
      queues.append(queue)
      # setdefault (not get): yaml-declared queues are materialized in the
      # task map even when empty.
      FillTaskStats(queue, self._taskqueues.setdefault(entry.name, []))
  if not has_default:
    queue = {
        'name': DEFAULT_QUEUE_NAME,
        'max_rate': DEFAULT_RATE,
        'bucket_size': DEFAULT_BUCKET_SIZE,
    }
    queues.append(queue)
    FillTaskStats(queue, self._taskqueues.get(DEFAULT_QUEUE_NAME, []))
  return queues
def GetTasks(self, queue_name):
  """Gets a queue's tasks.

  Args:
    queue_name: Queue's name to return tasks for.

  Returns:
    A list of dictionaries, where each dictionary contains one task's
    attributes. E.g.
      [{'name': 'task-123',
        'queue_name': 'default',
        'url': '/update',
        'method': 'GET',
        'eta': '2009/02/02 05:37:42',
        'eta_delta': '0:00:06.342511 ago',
        'body': '',
        'headers': [('user-header', 'some-value'),
                    ('X-AppEngine-QueueName', 'update-queue'),
                    ('X-AppEngine-TaskName', 'task-123'),
                    ('X-AppEngine-TaskRetryCount', '0'),
                    ('X-AppEngine-Development-Payload', '1'),
                    ('Content-Length', 0),
                    ('Content-Type', 'application/octet-stream')]}, ...]

  Raises:
    ValueError: A task request contains an unknown HTTP method type.
  """
  # Dispatch table replaces the old if/elif chain over the method enum.
  method_names = {
      taskqueue_service_pb.TaskQueueAddRequest.GET: 'GET',
      taskqueue_service_pb.TaskQueueAddRequest.POST: 'POST',
      taskqueue_service_pb.TaskQueueAddRequest.HEAD: 'HEAD',
      taskqueue_service_pb.TaskQueueAddRequest.PUT: 'PUT',
      taskqueue_service_pb.TaskQueueAddRequest.DELETE: 'DELETE',
  }
  result_tasks = []
  for task_request in self._taskqueues.get(queue_name, []):
    method = task_request.method()
    if method not in method_names:
      raise ValueError('Unexpected method: %d' % method)
    task = {
        'name': task_request.task_name(),
        'queue_name': queue_name,
        'url': task_request.url(),
        'method': method_names[method],
        'eta': _FormatEta(task_request.eta_usec()),
        'eta_delta': _EtaDelta(task_request.eta_usec()),
        'body': base64.b64encode(task_request.body()),
    }
    # User headers first, with the reserved ones stripped...
    headers = [(header.key(), header.value())
               for header in task_request.header_list()
               if header.key().lower() not in BUILT_IN_HEADERS]
    # ...then the stub-supplied headers. Content-Length reflects the
    # base64-encoded body, which is also what _RunTask sends as payload.
    headers.append(('X-AppEngine-QueueName', queue_name))
    headers.append(('X-AppEngine-TaskName', task['name']))
    headers.append(('X-AppEngine-TaskRetryCount', '0'))
    headers.append(('X-AppEngine-Development-Payload', '1'))
    headers.append(('Content-Length', len(task['body'])))
    if 'content-type' not in frozenset(key.lower() for key, _ in headers):
      headers.append(('Content-Type', 'application/octet-stream'))
    task['headers'] = headers
    result_tasks.append(task)
  return result_tasks
def DeleteTask(self, queue_name, task_name):
  """Deletes a task from a queue.

  Args:
    queue_name: the name of the queue to delete the task from.
    task_name: the name of the task to delete.
  """
  tasks = self._taskqueues.get(queue_name, [])
  # Remove only the first match, in place, so other holders of this list
  # observe the deletion. Unknown names are silently ignored.
  for index, task in enumerate(tasks):
    if task.task_name() == task_name:
      del tasks[index]
      return
def FlushQueue(self, queue_name):
  """Removes all tasks from a queue.

  Args:
    queue_name: the name of the queue to remove tasks from.
  """
  # Rebinds the queue to a fresh list rather than clearing in place, so any
  # code still holding the old list object will not observe the flush.
  self._taskqueues[queue_name] = []
def _Dynamic_UpdateQueue(self, request, unused_response):
  """Local implementation of the UpdateQueue RPC in TaskQueueService.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
    unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
                     Not used.

  Raises:
    apiproxy_errors.ApplicationError: TOMBSTONED_QUEUE if the queue was
      previously deleted.
  """
  queues = self._app_queues.setdefault(request.app_id(), {})
  queue_name = request.queue_name()
  # A None entry marks a deleted (tombstoned) queue; it cannot be recreated.
  if queue_name in queues and queues[queue_name] is None:
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
  # Store a private copy so later mutations of the request don't leak in.
  stored_queue = self._QueueDetails()
  stored_queue.CopyFrom(request)
  queues[queue_name] = stored_queue
def _Dynamic_FetchQueues(self, request, response):
  """Local implementation of the FetchQueues RPC in TaskQueueService.

  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
    response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
  """
  queues = self._app_queues.get(request.app_id(), {})
  # Queues are returned in name order, capped at max_rows. Tombstoned
  # entries (None) are skipped and do not count toward the cap.
  for unused_key, queue in sorted(queues.items()):
    if request.max_rows() == response.queue_size():
      break
    if queue is None:
      continue
    response_queue = response.add_queue()
    response_queue.set_queue_name(queue.queue_name())
    response_queue.set_bucket_refill_per_second(
        queue.bucket_refill_per_second())
    response_queue.set_bucket_capacity(queue.bucket_capacity())
    response_queue.set_user_specified_rate(queue.user_specified_rate())
    if queue.has_max_concurrent_requests():
      response_queue.set_max_concurrent_requests(
          queue.max_concurrent_requests())
    # 'paused' comes from the _QueueDetails wrapper, not the pb itself.
    response_queue.set_paused(queue.paused)
def _Dynamic_FetchQueueStats(self, request, response):
  """Local 'random' implementation of the TaskQueueService.FetchQueueStats.

  This implementation loads some stats from the dummy store,
  the rest with random numbers.
  Must adhere to the '_Dynamic_' naming convention for stubbing to work.
  See taskqueue_service.proto for a full description of the RPC.

  Args:
    request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
    response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
  """
  for queue in request.queue_name_list():
    store = self.GetDummyTaskStore(request.app_id(), queue)
    stats = response.add_queuestats()
    stats.set_num_tasks(store.Count())
    # -1 signals "no tasks"; 0 would be a valid oldest eta.
    if stats.num_tasks() == 0:
      stats.set_oldest_eta_usec(-1)
    else:
      stats.set_oldest_eta_usec(store.Oldest())
    # 90% of the time, fabricate scanner stats so the console has data.
    if random.randint(0, 9) > 0:
      scanner_info = stats.mutable_scanner_info()
      scanner_info.set_executed_last_minute(random.randint(0, 10))
      scanner_info.set_executed_last_hour(scanner_info.executed_last_minute()
                                          + random.randint(0, 100))
      scanner_info.set_sampling_duration_seconds(random.random() * 10000.0)
      scanner_info.set_requests_in_flight(random.randint(0, 10))
def GetDummyTaskStore(self, app_id, queue_name):
  """Get the dummy task store for this app_id/queue_name pair.

  Creates an entry and populates it, if there's not already an entry.

  Args:
    app_id: the app_id.
    queue_name: the queue_name.

  Returns:
    the existing or the new dummy store.
  """
  task_store_key = (app_id, queue_name)
  store = admin_console_dummy_tasks.get(task_store_key)
  if store is None:
    store = _DummyTaskStore()
    # Seed new stores with random tasks for the console, except in
    # all-queues-valid (test) mode and for the cron pseudo-queue.
    if not self._all_queues_valid and queue_name != CRON_QUEUE_NAME:
      store.Populate(random.randint(10, 100))
    admin_console_dummy_tasks[task_store_key] = store
  return store
def _Dynamic_QueryTasks(self, request, response):
  """Local implementation of the TaskQueueService.QueryTasks RPC.

  Uses the dummy store, creating tasks if this is the first time the
  queue has been seen.

  Args:
    request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
    response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.
  """
  store = self.GetDummyTaskStore(request.app_id(), request.queue_name())
  # Page by (eta, name) when a start eta is given, else by name alone.
  lookup_kwargs = {'name': request.start_task_name()}
  if request.has_start_eta_usec():
    lookup_kwargs['eta'] = request.start_eta_usec()
  for task in store.Lookup(request.max_rows(), **lookup_kwargs):
    response.add_task().MergeFrom(task)
def _Dynamic_Delete(self, request, response):
  """Local delete implementation of TaskQueueService.Delete.

  Deletes tasks from the dummy store. A 1/20 chance of a transient error.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteResponse.
  """
  task_store_key = (request.app_id(), request.queue_name())
  store = admin_console_dummy_tasks.get(task_store_key)
  if store is None:
    # Unknown queue: report UNKNOWN_QUEUE for every requested task.
    for _ in request.task_name_list():
      response.add_result(
          taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
    return
  for taskname in request.task_name_list():
    # One random draw per task simulates sporadic transient failures.
    if random.random() <= 0.05:
      response.add_result(
          taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR)
    else:
      response.add_result(store.Delete(taskname))
def _Dynamic_ForceRun(self, request, response):
  """Local force run implementation of TaskQueueService.ForceRun.

  Forces running of a task in a queue. This is a no-op here.
  This will fail randomly for testing.

  Args:
    request: A taskqueue_service_pb.TaskQueueForceRunRequest.
    response: A taskqueue_service_pb.TaskQueueForceRunResponse.
  """
  # Two sequential draws: TRANSIENT_ERROR with probability 0.05, then
  # INTERNAL_ERROR with probability 0.052 of the remainder (~0.049
  # overall); otherwise OK.
  if random.random() <= 0.05:
    response.set_result(
        taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR)
  elif random.random() <= 0.052:
    response.set_result(
        taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR)
  else:
    response.set_result(
        taskqueue_service_pb.TaskQueueServiceError.OK)
def _Dynamic_DeleteQueue(self, request, response):
  """Local delete implementation of TaskQueueService.DeleteQueue.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME, UNKNOWN_QUEUE or
      TOMBSTONED_QUEUE on validation failure.
  """
  queue_name = request.queue_name()
  if not queue_name:
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
  queues = self._app_queues.get(request.app_id(), {})
  if queue_name not in queues:
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
  if queues[queue_name] is None:
    # Already deleted; deleting twice is an error.
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
  # Tombstone rather than remove, so the name cannot be reused.
  queues[queue_name] = None
def _Dynamic_PauseQueue(self, request, response):
  """Local pause implementation of TaskQueueService.PauseQueue.

  Args:
    request: A taskqueue_service_pb.TaskQueuePauseQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePauseQueueResponse.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME, UNKNOWN_QUEUE or
      TOMBSTONED_QUEUE on validation failure.
  """
  if not request.queue_name():
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
  queues = self._app_queues.get(request.app_id(), {})
  # The default queue bypasses existence/tombstone validation.
  if request.queue_name() != DEFAULT_QUEUE_NAME:
    if request.queue_name() not in queues:
      raise apiproxy_errors.ApplicationError(
          taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
    elif queues[request.queue_name()] is None:
      raise apiproxy_errors.ApplicationError(
          taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
  # NOTE(review): if queue_name is the default queue but it was never
  # registered via UpdateQueue, this lookup raises a bare KeyError rather
  # than an ApplicationError — confirm whether the default queue is
  # guaranteed to be present here.
  queues[request.queue_name()].paused = request.pause()
def _Dynamic_PurgeQueue(self, request, response):
  """Local purge implementation of TaskQueueService.PurgeQueue.

  Args:
    request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest.
    response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME, UNKNOWN_QUEUE or
      TOMBSTONED_QUEUE on validation failure.
  """
  if not request.queue_name():
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
  if request.has_app_id():
    # Admin-console path: validate against the registered app queues, then
    # empty the corresponding dummy task store.
    queues = self._app_queues.get(request.app_id(), {})
    # The default queue bypasses existence/tombstone validation.
    if request.queue_name() != DEFAULT_QUEUE_NAME:
      if request.queue_name() not in queues:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
      elif queues[request.queue_name()] is None:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
    store = self.GetDummyTaskStore(request.app_id(), request.queue_name())
    for task in store.Lookup(store.Count()):
      store.Delete(task.task_name())
  elif (not self._IsValidQueue(request.queue_name(), None)
        and not request.queue_name() in self._taskqueues):
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
  # In all success paths, also clear the in-memory task list.
  self.FlushQueue(request.queue_name())
def _Dynamic_DeleteGroup(self, request, response):
  """Local delete implementation of TaskQueueService.DeleteGroup.

  Args:
    request: A taskqueue_service_pb.TaskQueueDeleteGroupRequest.
    response: A taskqueue_service_pb.TaskQueueDeleteGroupResponse.
  """
  app_id = request.app_id()
  queues = self._app_queues.get(app_id, {})
  # Empty every queue's dummy store and in-memory task list...
  for queue_name in queues.keys():
    store = self.GetDummyTaskStore(app_id, queue_name)
    for task in store.Lookup(store.Count()):
      store.Delete(task.task_name())
    self.FlushQueue(queue_name)
  # ...then drop the whole queue registry for this app.
  self._app_queues[app_id] = {}
def _Dynamic_UpdateStorageLimit(self, request, response):
  """Local implementation of TaskQueueService.UpdateStorageLimit.

  Args:
    request: A taskqueue_service_pb.TaskQueueUpdateStorageLimitRequest.
    response: A taskqueue_service_pb.TaskQueueUpdateStorageLimitResponse.

  Raises:
    apiproxy_errors.ApplicationError: INVALID_REQUEST if the limit is
      negative or above 1000 TiB.
  """
  new_limit = request.limit()
  if not 0 <= new_limit <= 1000 * (1024 ** 4):
    raise apiproxy_errors.ApplicationError(
        taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST)
  # The stub simply echoes the accepted limit back.
  response.set_new_limit(new_limit)