| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| """Stub version of the Task Queue API. |
| |
| This stub stores tasks and runs them via dev_appserver's AddEvent capability. |
| It also validates the tasks by checking their queue name against the queue.yaml. |
| |
| As well as implementing Task Queue API functions, the stub exposes various other |
| functions that are used by the dev_appserver's admin console to display the |
| application's queues and tasks. |
| """ |
| |
| from __future__ import with_statement |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| __all__ = [] |
| |
| import base64 |
| import bisect |
| import calendar |
| import datetime |
| import logging |
| import os |
| import random |
| import string |
| import threading |
| import time |
| |
| import taskqueue_service_pb |
| import taskqueue |
| |
| from google.appengine.api import api_base_pb |
| from google.appengine.api import apiproxy_stub |
| from google.appengine.api import apiproxy_stub_map |
| from google.appengine.api import queueinfo |
| from google.appengine.api import request_info |
| from google.appengine.api.taskqueue import taskqueue |
| from google.appengine.runtime import apiproxy_errors |
| |
| |
| |
| |
| DEFAULT_RATE = '5.00/s' |
| DEFAULT_RATE_FLOAT = 5.0 |
| |
| |
| |
| |
| |
| DEFAULT_BUCKET_SIZE = 5 |
| |
| |
| MAX_ETA = datetime.timedelta(days=30) |
| |
| |
| |
| |
| MAX_PULL_TASK_SIZE_BYTES = 2 ** 20 |
| |
| MAX_PUSH_TASK_SIZE_BYTES = 100 * (2 ** 10) |
| |
| MAX_TASK_SIZE = MAX_PUSH_TASK_SIZE_BYTES |
| |
| |
| MAX_REQUEST_SIZE = 32 << 20 |
| |
| |
| |
| BUILT_IN_HEADERS = set(['x-appengine-queuename', |
| 'x-appengine-taskname', |
| 'x-appengine-taskexecutioncount', |
| 'x-appengine-taskpreviousresponse', |
| 'x-appengine-taskretrycount', |
| 'x-appengine-tasketa', |
| 'x-appengine-development-payload', |
| 'content-length']) |
| |
| |
| |
| DEFAULT_QUEUE_NAME = 'default' |
| |
| |
| |
| |
| INF = 1e500 |
| |
| |
| QUEUE_MODE = taskqueue_service_pb.TaskQueueMode |
| |
| AUTOMATIC_QUEUES = { |
| DEFAULT_QUEUE_NAME: (0.2, DEFAULT_BUCKET_SIZE, DEFAULT_RATE), |
| |
| |
| '__cron': (1, 1, '1/s')} |
| |
| |
| def _GetAppId(request): |
| """Returns the app id to use for the given request. |
| |
| Args: |
| request: A protocol buffer that has an app_id field. |
| |
| Returns: |
| A string containing the app id or None if no app id was specified. |
| """ |
| if request.has_app_id(): |
| return request.app_id() |
| else: |
| return None |
| |
| |
| def _SecToUsec(t): |
| """Converts a time in seconds since the epoch to usec since the epoch. |
| |
| Args: |
| t: Time in seconds since the unix epoch |
| |
| Returns: |
| An integer containing the number of usec since the unix epoch. |
| """ |
| return int(t * 1e6) |
| |
| |
| def _UsecToSec(t): |
| """Converts a time in usec since the epoch to seconds since the epoch. |
| |
| Args: |
| t: Time in usec since the unix epoch |
| |
| Returns: |
| A float containing the number of seconds since the unix epoch. |
| """ |
| return t / 1e6 |
| |
| |
| |
| def _FormatEta(eta_usec): |
| """Formats a task ETA as a date string in UTC.""" |
| eta = datetime.datetime.utcfromtimestamp(_UsecToSec(eta_usec)) |
| return eta.strftime('%Y/%m/%d %H:%M:%S') |
| |
| |
| def _TruncDelta(timedelta): |
| """Strips the microseconds field from a timedelta. |
| |
| Args: |
| timedelta: a datetime.timedelta. |
| |
| Returns: |
| A datetime.timedelta with the microseconds field not filled. |
| """ |
| return datetime.timedelta(days=timedelta.days, seconds=timedelta.seconds) |
| |
| |
| def _EtaDelta(eta_usec, now): |
| """Formats a task ETA as a relative time string.""" |
| eta = datetime.datetime.utcfromtimestamp(_UsecToSec(eta_usec)) |
| if eta > now: |
| return '%s from now' % _TruncDelta(eta - now) |
| else: |
| return '%s ago' % _TruncDelta(now - eta) |
| |
| |
| def QueryTasksResponseToDict(queue_name, task_response, now): |
| """Converts a TaskQueueQueryTasksResponse_Task protobuf group into a dict. |
| |
| Args: |
| queue_name: The name of the queue this task came from. |
| task_response: An instance of TaskQueueQueryTasksResponse_Task. |
| now: A datetime.datetime object containing the current time in UTC. |
| |
| Returns: |
| A dict containing the fields used by the dev appserver's admin console. |
| |
| Raises: |
| ValueError: A task response contains an unknown HTTP method type. |
| """ |
| task = {} |
| |
| task['name'] = task_response.task_name() |
| task['queue_name'] = queue_name |
| task['url'] = task_response.url() |
| method = task_response.method() |
| if method == taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.GET: |
| task['method'] = 'GET' |
| elif method == taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.POST: |
| task['method'] = 'POST' |
| elif method == taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.HEAD: |
| task['method'] = 'HEAD' |
| elif method == taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.PUT: |
| task['method'] = 'PUT' |
| elif method == taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.DELETE: |
| task['method'] = 'DELETE' |
| else: |
| raise ValueError('Unexpected method: %d' % method) |
| |
| task['eta'] = _FormatEta(task_response.eta_usec()) |
| task['eta_usec'] = task_response.eta_usec() |
| task['eta_delta'] = _EtaDelta(task_response.eta_usec(), now) |
| task['body'] = base64.b64encode(task_response.body()) |
| |
| |
| |
| headers = [(header.key(), header.value()) |
| for header in task_response.header_list() |
| if header.key().lower() not in BUILT_IN_HEADERS] |
| |
| |
| headers.append(('X-AppEngine-QueueName', queue_name)) |
| headers.append(('X-AppEngine-TaskName', task_response.task_name())) |
| headers.append(('X-AppEngine-TaskRetryCount', |
| str(task_response.retry_count()))) |
| headers.append(('X-AppEngine-TaskETA', |
| str(_UsecToSec(task_response.eta_usec())))) |
| headers.append(('X-AppEngine-Development-Payload', '1')) |
| headers.append(('Content-Length', str(len(task['body'])))) |
| if 'content-type' not in frozenset(key.lower() for key, _ in headers): |
| headers.append(('Content-Type', 'application/octet-stream')) |
| headers.append(('X-AppEngine-TaskExecutionCount', |
| str(task_response.execution_count()))) |
| if task_response.has_runlog() and task_response.runlog().has_response_code(): |
| headers.append(('X-AppEngine-TaskPreviousResponse', |
| str(task_response.runlog().response_code()))) |
| task['headers'] = headers |
| |
| return task |
| |
| |
| class _Group(object): |
| """A taskqueue group. |
| |
| This class contains all of the queues for an application. |
| """ |
| |
| def __init__(self, queue_yaml_parser=None, app_id=None, |
| _all_queues_valid=False, _update_newest_eta=None, |
| _testing_validate_state=False): |
| """Constructor. |
| |
| Args: |
| queue_yaml_parser: A function that takes no parameters and returns the |
| parsed results of the queue.yaml file. If this queue is not based on a |
| queue.yaml file use None. |
| app_id: The app id this Group is representing or None if it is the |
| currently running application. |
| _all_queues_valid: Automatically generate queues on first access. |
| _update_newest_eta: Callable for automatically executing tasks. |
| Takes the ETA of the task in seconds since the epoch, the queue_name |
| and a task name. May be None if automatic task running is disabled. |
| _testing_validate_state: Should this _Group and all of its _Queues |
| validate their state after each operation? This should only be used |
| during testing of the taskqueue_stub. |
| """ |
| |
| |
| self._queues = {} |
| self._queue_yaml_parser = queue_yaml_parser |
| self._all_queues_valid = _all_queues_valid |
| self._next_task_id = 1 |
| self._app_id = app_id |
| if _update_newest_eta is None: |
| self._update_newest_eta = lambda x: None |
| else: |
| self._update_newest_eta = _update_newest_eta |
| self._testing_validate_state = _testing_validate_state |
| |
| |
| |
| |
| def GetQueuesAsDicts(self): |
| """Gets all the applications's queues. |
| |
| Returns: |
| A list of dictionaries, where each dictionary contains one queue's |
| attributes. E.g.: |
| [{'name': 'some-queue', |
| 'max_rate': '1/s', |
| 'bucket_size': 5, |
| 'oldest_task': '2009/02/02 05:37:42', |
| 'eta_delta': '0:00:06.342511 ago', |
| 'tasks_in_queue': 12, |
| 'acl': ['user1@gmail.com']}, ...] |
| The list of queues always includes the default queue. |
| """ |
| self._ReloadQueuesFromYaml() |
| now = datetime.datetime.utcnow() |
| |
| queues = [] |
| for queue_name, queue in sorted(self._queues.items()): |
| queue_dict = {} |
| queues.append(queue_dict) |
| |
| queue_dict['name'] = queue_name |
| queue_dict['bucket_size'] = queue.bucket_capacity |
| if queue.user_specified_rate is not None: |
| queue_dict['max_rate'] = queue.user_specified_rate |
| else: |
| queue_dict['max_rate'] = '' |
| if queue.queue_mode == QUEUE_MODE.PULL: |
| queue_dict['mode'] = 'pull' |
| else: |
| queue_dict['mode'] = 'push' |
| queue_dict['acl'] = queue.acl |
| |
| |
| oldest_eta = queue.Oldest() |
| if oldest_eta: |
| queue_dict['oldest_task'] = _FormatEta(oldest_eta) |
| queue_dict['eta_delta'] = _EtaDelta(oldest_eta, now) |
| else: |
| queue_dict['oldest_task'] = '' |
| queue_dict['eta_delta'] = '' |
| queue_dict['tasks_in_queue'] = queue.Count() |
| |
| if queue.retry_parameters: |
| retry_proto = queue.retry_parameters |
| retry_dict = {} |
| |
| if retry_proto.has_retry_limit(): |
| retry_dict['retry_limit'] = retry_proto.retry_limit() |
| if retry_proto.has_age_limit_sec(): |
| retry_dict['age_limit_sec'] = retry_proto.age_limit_sec() |
| if retry_proto.has_min_backoff_sec(): |
| retry_dict['min_backoff_sec'] = retry_proto.min_backoff_sec() |
| if retry_proto.has_max_backoff_sec(): |
| retry_dict['max_backoff_sec'] = retry_proto.max_backoff_sec() |
| if retry_proto.has_max_doublings(): |
| retry_dict['max_doublings'] = retry_proto.max_doublings() |
| |
| queue_dict['retry_parameters'] = retry_dict |
| return queues |
| |
| def HasQueue(self, queue_name): |
| """Check if the specified queue_name references a valid queue. |
| |
| Args: |
| queue_name: The name of the queue to check. |
| |
| Returns: |
| True if the queue exists, False otherwise. |
| """ |
| self._ReloadQueuesFromYaml() |
| return queue_name in self._queues and ( |
| self._queues[queue_name] is not None) |
| |
| def GetQueue(self, queue_name): |
| """Gets the _Queue instance for the specified queue. |
| |
| Args: |
| queue_name: The name of the queue to fetch. |
| |
| Returns: |
| The _Queue instance for the specified queue. |
| |
| Raises: |
| KeyError if the queue does not exist. |
| """ |
| self._ReloadQueuesFromYaml() |
| return self._queues[queue_name] |
| |
| def GetNextPushTask(self): |
| """Finds the task with the lowest eta. |
| |
| Returns: |
| A tuple containing the queue and task instance for the task with the |
| lowest eta, or (None, None) if there are no tasks. |
| """ |
| min_eta = INF |
| result = None, None |
| |
| |
| for queue in self._queues.itervalues(): |
| if queue.queue_mode == QUEUE_MODE.PULL: |
| continue |
| task = queue.OldestTask() |
| if not task: |
| continue |
| if task.eta_usec() < min_eta: |
| result = queue, task |
| min_eta = task.eta_usec() |
| return result |
| |
| def _ConstructQueue(self, queue_name, *args, **kwargs): |
| if '_testing_validate_state' in kwargs: |
| raise TypeError( |
| '_testing_validate_state should not be passed to _ConstructQueue') |
| kwargs['_testing_validate_state'] = self._testing_validate_state |
| self._queues[queue_name] = _Queue(queue_name, *args, **kwargs) |
| |
| def _ConstructAutomaticQueue(self, queue_name): |
| if queue_name in AUTOMATIC_QUEUES: |
| self._ConstructQueue(queue_name, *AUTOMATIC_QUEUES[queue_name]) |
| else: |
| |
| |
| assert self._all_queues_valid |
| self._ConstructQueue(queue_name) |
| |
| def _ReloadQueuesFromYaml(self): |
| """Update the queue map with the contents of the queue.yaml file. |
| |
| This function will remove queues that no longer exist in the queue.yaml |
| file. |
| |
| If no queue yaml parser has been defined, this function is a no-op. |
| """ |
| if not self._queue_yaml_parser: |
| return |
| |
| queue_info = self._queue_yaml_parser() |
| |
| if queue_info and queue_info.queue: |
| queues = queue_info.queue |
| else: |
| queues = [] |
| |
| old_queues = set(self._queues) |
| new_queues = set() |
| |
| for entry in queues: |
| queue_name = entry.name |
| new_queues.add(queue_name) |
| |
| retry_parameters = None |
| |
| |
| if entry.bucket_size: |
| bucket_size = entry.bucket_size |
| else: |
| bucket_size = DEFAULT_BUCKET_SIZE |
| if entry.retry_parameters: |
| retry_parameters = queueinfo.TranslateRetryParameters( |
| entry.retry_parameters) |
| |
| if entry.mode == 'pull': |
| mode = QUEUE_MODE.PULL |
| if entry.rate is not None: |
| logging.warning( |
| 'Refill rate must not be specified for pull-based queue. ' |
| 'Please check queue.yaml file.') |
| else: |
| mode = QUEUE_MODE.PUSH |
| if entry.rate is None: |
| logging.warning( |
| 'Refill rate must be specified for push-based queue. ' |
| 'Please check queue.yaml file.') |
| max_rate = entry.rate |
| |
| if entry.acl is not None: |
| acl = taskqueue_service_pb.TaskQueueAcl() |
| for acl_entry in entry.acl: |
| acl.add_user_email(acl_entry.user_email) |
| else: |
| acl = None |
| |
| if self._queues.get(queue_name) is None: |
| |
| self._ConstructQueue(queue_name, bucket_capacity=bucket_size, |
| user_specified_rate=max_rate, queue_mode=mode, |
| acl=acl, retry_parameters=retry_parameters, |
| target=entry.target) |
| else: |
| |
| |
| queue = self._queues[queue_name] |
| queue.bucket_size = bucket_size |
| queue.user_specified_rate = max_rate |
| queue.acl = acl |
| queue.queue_mode = mode |
| queue.retry_parameters = retry_parameters |
| if mode == QUEUE_MODE.PUSH: |
| eta = queue.Oldest() |
| if eta: |
| self._update_newest_eta(_UsecToSec(eta)) |
| |
| if DEFAULT_QUEUE_NAME not in self._queues: |
| self._ConstructAutomaticQueue(DEFAULT_QUEUE_NAME) |
| |
| |
| new_queues.add(DEFAULT_QUEUE_NAME) |
| if not self._all_queues_valid: |
| |
| for queue_name in old_queues - new_queues: |
| |
| |
| |
| del self._queues[queue_name] |
| |
| |
| |
| |
| def _ValidateQueueName(self, queue_name): |
| """Tests if the specified queue exists and creates it if needed. |
| |
| This function replicates the behaviour of the taskqueue service by |
| automatically creating the 'automatic' queues when they are first accessed. |
| |
| Args: |
| queue_name: The name queue of the queue to check. |
| |
| Returns: |
| If there are no problems, returns TaskQueueServiceError.OK. Otherwise |
| returns the correct constant from TaskQueueServiceError. |
| """ |
| if not queue_name: |
| return taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME |
| elif queue_name not in self._queues: |
| if queue_name in AUTOMATIC_QUEUES or self._all_queues_valid: |
| |
| self._ConstructAutomaticQueue(queue_name) |
| else: |
| return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE |
| elif self._queues[queue_name] is None: |
| return taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE |
| |
| return taskqueue_service_pb.TaskQueueServiceError.OK |
| |
| def _CheckQueueForRpc(self, queue_name): |
| """Ensures the specified queue exists and creates it if needed. |
| |
| This function replicates the behaviour of the taskqueue service by |
| automatically creating the 'automatic' queues when they are first accessed. |
| |
| Args: |
| queue_name: The name queue of the queue to check |
| |
| Raises: |
| ApplicationError: If the queue name is invalid, tombstoned or does not |
| exist. |
| """ |
| self._ReloadQueuesFromYaml() |
| |
| response = self._ValidateQueueName(queue_name) |
| if response != taskqueue_service_pb.TaskQueueServiceError.OK: |
| raise apiproxy_errors.ApplicationError(response) |
| |
| def _ChooseTaskName(self): |
| """Returns a string containing a unique task name.""" |
| |
| |
| |
| |
| self._next_task_id += 1 |
| return 'task%d' % (self._next_task_id - 1) |
| |
| def _VerifyTaskQueueAddRequest(self, request, now): |
| """Checks that a TaskQueueAddRequest is valid. |
| |
| Checks that a TaskQueueAddRequest specifies a valid eta and a valid queue. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueAddRequest to validate. |
| now: A datetime.datetime object containing the current time in UTC. |
| |
| Returns: |
| A taskqueue_service_pb.TaskQueueServiceError indicating any problems with |
| the request or taskqueue_service_pb.TaskQueueServiceError.OK if it is |
| valid. |
| """ |
| if request.eta_usec() < 0: |
| return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA |
| |
| eta = datetime.datetime.utcfromtimestamp(_UsecToSec(request.eta_usec())) |
| max_eta = now + MAX_ETA |
| if eta > max_eta: |
| return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA |
| |
| |
| queue_name_response = self._ValidateQueueName(request.queue_name()) |
| if queue_name_response != taskqueue_service_pb.TaskQueueServiceError.OK: |
| return queue_name_response |
| |
| |
| if request.has_crontimetable() and self._app_id is None: |
| return taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED |
| |
| if request.mode() == QUEUE_MODE.PULL: |
| max_task_size_bytes = MAX_PULL_TASK_SIZE_BYTES |
| else: |
| max_task_size_bytes = MAX_PUSH_TASK_SIZE_BYTES |
| |
| if request.ByteSize() > max_task_size_bytes: |
| return taskqueue_service_pb.TaskQueueServiceError.TASK_TOO_LARGE |
| |
| return taskqueue_service_pb.TaskQueueServiceError.OK |
| |
| |
| |
| |
| def BulkAdd_Rpc(self, request, response): |
| """Add many tasks to a queue using a single request. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See |
| taskqueue_service.proto. |
| response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See |
| taskqueue_service.proto. |
| """ |
| self._ReloadQueuesFromYaml() |
| |
| |
| if not request.add_request(0).queue_name(): |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
| |
| error_found = False |
| task_results_with_chosen_names = set() |
| now = datetime.datetime.utcfromtimestamp(time.time()) |
| |
| |
| for add_request in request.add_request_list(): |
| task_result = response.add_taskresult() |
| result = self._VerifyTaskQueueAddRequest(add_request, now) |
| if result == taskqueue_service_pb.TaskQueueServiceError.OK: |
| if not add_request.task_name(): |
| chosen_name = self._ChooseTaskName() |
| add_request.set_task_name(chosen_name) |
| task_results_with_chosen_names.add(id(task_result)) |
| |
| |
| |
| task_result.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.SKIPPED) |
| else: |
| error_found = True |
| task_result.set_result(result) |
| |
| if error_found: |
| return |
| |
| |
| if request.add_request(0).has_transaction(): |
| self._TransactionalBulkAdd(request) |
| else: |
| self._NonTransactionalBulkAdd(request, response, now) |
| |
| |
| for add_request, task_result in zip(request.add_request_list(), |
| response.taskresult_list()): |
| if (task_result.result() == |
| taskqueue_service_pb.TaskQueueServiceError.SKIPPED): |
| task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK) |
| if id(task_result) in task_results_with_chosen_names: |
| task_result.set_chosen_task_name(add_request.task_name()) |
| |
| def _TransactionalBulkAdd(self, request): |
| """Uses datastore.AddActions to associate tasks with a transaction. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the |
| tasks to add. N.B. all tasks in the request have been validated and |
| assigned unique names. |
| """ |
| try: |
| apiproxy_stub_map.MakeSyncCall( |
| 'datastore_v3', 'AddActions', request, api_base_pb.VoidProto()) |
| except apiproxy_errors.ApplicationError, e: |
| raise apiproxy_errors.ApplicationError( |
| e.application_error + |
| taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR, |
| e.error_detail) |
| |
| def _NonTransactionalBulkAdd(self, request, response, now): |
| """Adds tasks to the appropriate _Queue instance. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the |
| tasks to add. N.B. all tasks in the request have been validated and |
| those with empty names have been assigned unique names. |
| response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate |
| with the results. N.B. the chosen_task_name field in the response will |
| not be filled-in. |
| now: A datetime.datetime object containing the current time in UTC. |
| """ |
| queue_mode = request.add_request(0).mode() |
| |
| |
| queue_name = request.add_request(0).queue_name() |
| store = self._queues[queue_name] |
| if store.queue_mode != queue_mode: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_MODE) |
| |
| for add_request, task_result in zip(request.add_request_list(), |
| response.taskresult_list()): |
| try: |
| store.Add(add_request, now) |
| except apiproxy_errors.ApplicationError, e: |
| task_result.set_result(e.application_error) |
| else: |
| task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK) |
| if (store.queue_mode == QUEUE_MODE.PUSH and |
| store.Oldest() == add_request.eta_usec()): |
| self._update_newest_eta(_UsecToSec(add_request.eta_usec())) |
| |
| def UpdateQueue_Rpc(self, request, response): |
| """Implementation of the UpdateQueue RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest. |
| response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse. |
| """ |
| queue_name = request.queue_name() |
| |
| response = self._ValidateQueueName(queue_name) |
| |
| is_unknown_queue = ( |
| response == taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
| if response != taskqueue_service_pb.TaskQueueServiceError.OK and ( |
| not is_unknown_queue): |
| raise apiproxy_errors.ApplicationError(response) |
| |
| if is_unknown_queue: |
| self._queues[queue_name] = _Queue(request.queue_name()) |
| |
| |
| |
| if self._app_id is not None: |
| self._queues[queue_name].Populate(random.randint(10, 100)) |
| self._queues[queue_name].UpdateQueue_Rpc(request, response) |
| |
| def FetchQueues_Rpc(self, request, response): |
| """Implementation of the FetchQueues RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse. |
| """ |
| self._ReloadQueuesFromYaml() |
| for queue_name in sorted(self._queues): |
| if response.queue_size() > request.max_rows(): |
| break |
| |
| |
| if self._queues[queue_name] is None: |
| continue |
| |
| |
| self._queues[queue_name].FetchQueues_Rpc(request, response) |
| |
| def FetchQueueStats_Rpc(self, request, response): |
| """Implementation of the FetchQueueStats rpc which returns 'random' data. |
| |
| This implementation loads some stats from the task store, the rest are |
| random numbers. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse. |
| """ |
| for queue_name in request.queue_name_list(): |
| stats = response.add_queuestats() |
| if queue_name not in self._queues: |
| |
| stats.set_num_tasks(0) |
| stats.set_oldest_eta_usec(-1) |
| continue |
| store = self._queues[queue_name] |
| |
| stats.set_num_tasks(store.Count()) |
| if stats.num_tasks() == 0: |
| stats.set_oldest_eta_usec(-1) |
| else: |
| stats.set_oldest_eta_usec(store.Oldest()) |
| |
| |
| if random.randint(0, 9) > 0: |
| scanner_info = stats.mutable_scanner_info() |
| scanner_info.set_executed_last_minute(random.randint(0, 10)) |
| scanner_info.set_executed_last_hour(scanner_info.executed_last_minute() |
| + random.randint(0, 100)) |
| scanner_info.set_sampling_duration_seconds(random.random() * 10000.0) |
| scanner_info.set_requests_in_flight(random.randint(0, 10)) |
| |
| def QueryTasks_Rpc(self, request, response): |
| """Implementation of the QueryTasks RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryTasksResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| self._queues[request.queue_name()].QueryTasks_Rpc(request, response) |
| |
| def FetchTask_Rpc(self, request, response): |
| """Implementation of the FetchTask RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchTaskRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchTaskResponse. |
| """ |
| self._ReloadQueuesFromYaml() |
| |
| self._CheckQueueForRpc(request.queue_name()) |
| self._queues[request.queue_name()].FetchTask_Rpc(request, response) |
| |
| def Delete_Rpc(self, request, response): |
| """Implementation of the Delete RPC. |
| |
| Deletes tasks from the task store. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteResponse. |
| """ |
| self._ReloadQueuesFromYaml() |
| |
| def _AddResultForAll(result): |
| for _ in request.task_name_list(): |
| response.add_result(result) |
| if request.queue_name() not in self._queues: |
| _AddResultForAll(taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
| elif self._queues[request.queue_name()] is None: |
| _AddResultForAll( |
| taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE) |
| else: |
| self._queues[request.queue_name()].Delete_Rpc(request, response) |
| |
| def DeleteQueue_Rpc(self, request, response): |
| """Implementation of the DeleteQueue RPC. |
| |
| Tombstones the queue. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| |
| |
| self._queues[request.queue_name()] = None |
| |
| def PauseQueue_Rpc(self, request, response): |
| """Implementation of the PauseQueue RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueuePauseQueueRequest. |
| response: A taskqueue_service_pb.TaskQueuePauseQueueResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| self._queues[request.queue_name()].paused = request.pause() |
| |
| def PurgeQueue_Rpc(self, request, response): |
| """Implementation of the PurgeQueue RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest. |
| response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| self._queues[request.queue_name()].PurgeQueue() |
| |
| def QueryAndOwnTasks_Rpc(self, request, response): |
| """Implementation of the QueryAndOwnTasks RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| |
| |
| |
| self._queues[request.queue_name()].QueryAndOwnTasks_Rpc(request, response) |
| |
| def ModifyTaskLease_Rpc(self, request, response): |
| """Implementation of the ModifyTaskLease RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueModifyTaskLeaseRequest. |
| response: A taskqueue_service_pb.TaskQueueModifyTaskLeaseResponse. |
| """ |
| self._CheckQueueForRpc(request.queue_name()) |
| self._queues[request.queue_name()].ModifyTaskLease_Rpc(request, response) |
| |
| |
| class Retry(object): |
| """Task retry caclulator class. |
| |
| Determines if and when a task should next be run |
| """ |
| |
| |
| |
| _default_params = taskqueue_service_pb.TaskQueueRetryParameters() |
| |
| def __init__(self, task, queue): |
| """Constructor. |
| |
| Args: |
| task: A taskqueue_service_pb.TaskQueueQueryTasksResponse_Task instance. |
| May be None. |
| queue: A _Queue instance. May be None. |
| """ |
| if task is not None and task.has_retry_parameters(): |
| self._params = task.retry_parameters() |
| elif queue is not None and queue.retry_parameters is not None: |
| self._params = queue.retry_parameters |
| else: |
| self._params = self._default_params |
| |
| def CanRetry(self, retry_count, age_usec): |
| """Computes whether a task can be retried. |
| |
| Args: |
| retry_count: An integer specifying which retry this is. |
| age_usec: An integer specifying the microseconds since the first try. |
| |
| Returns: |
| True if a task is eligible for retrying. |
| """ |
| if self._params.has_retry_limit() and self._params.has_age_limit_sec(): |
| return (self._params.retry_limit() >= retry_count or |
| self._params.age_limit_sec() >= _UsecToSec(age_usec)) |
| |
| if self._params.has_retry_limit(): |
| return self._params.retry_limit() >= retry_count |
| |
| if self._params.has_age_limit_sec(): |
| return self._params.age_limit_sec() >= _UsecToSec(age_usec) |
| |
| return True |
| |
| def CalculateBackoffUsec(self, retry_count): |
| """Calculates time before the specified retry. |
| |
| Args: |
| retry_count: An integer specifying which retry this is. |
| |
| Returns: |
| The number of microseconds before a task should be retried. |
| """ |
| exponent = min(retry_count - 1, self._params.max_doublings()) |
| linear_steps = retry_count - exponent |
| min_backoff_usec = _SecToUsec(self._params.min_backoff_sec()) |
| max_backoff_usec = _SecToUsec(self._params.max_backoff_sec()) |
| backoff_usec = min_backoff_usec |
| if exponent > 0: |
| backoff_usec *= (2 ** (min(1023, exponent))) |
| if linear_steps > 1: |
| backoff_usec *= linear_steps |
| |
| return int(min(max_backoff_usec, backoff_usec)) |
| |
| |
| class _Queue(object): |
| """A Taskqueue Queue. |
| |
| This class contains all of the properties of a queue and a sorted list of |
| tasks. |
| """ |
| |
| def __init__(self, queue_name, bucket_refill_per_second=DEFAULT_RATE_FLOAT, |
| bucket_capacity=DEFAULT_BUCKET_SIZE, |
| user_specified_rate=DEFAULT_RATE, retry_parameters=None, |
| max_concurrent_requests=None, paused=False, |
| queue_mode=QUEUE_MODE.PUSH, acl=None, |
| _testing_validate_state=None, target=None): |
| |
| self.queue_name = queue_name |
| self.bucket_refill_per_second = bucket_refill_per_second |
| self.bucket_capacity = bucket_capacity |
| self.user_specified_rate = user_specified_rate |
| self.retry_parameters = retry_parameters |
| self.max_concurrent_requests = max_concurrent_requests |
| self.paused = paused |
| self.queue_mode = queue_mode |
| self.acl = acl |
| self.target = target |
| self._testing_validate_state = _testing_validate_state |
| |
| |
| self.task_name_archive = set() |
| |
| self._sorted_by_name = [] |
| |
| self._sorted_by_eta = [] |
| |
| self._sorted_by_tag = [] |
| |
| |
| self._lock = threading.Lock() |
| |
| def VerifyIndexes(self): |
| """Ensures that all three indexes are in a valid state. |
| |
| This method is used by internal tests and should not need to be called in |
| any other circumstances. |
| |
| Raises: |
| AssertionError: if the indexes are not in a valid state. |
| """ |
| assert self._IsInOrder(self._sorted_by_name) |
| assert self._IsInOrder(self._sorted_by_eta) |
| assert self._IsInOrder(self._sorted_by_tag) |
| |
| tasks_by_name = set() |
| tasks_with_tags = set() |
| for name, task in self._sorted_by_name: |
| assert name == task.task_name() |
| assert name not in tasks_by_name |
| tasks_by_name.add(name) |
| if task.has_tag(): |
| tasks_with_tags.add(name) |
| |
| tasks_by_eta = set() |
| for eta, name, task in self._sorted_by_eta: |
| assert name == task.task_name() |
| assert eta == task.eta_usec() |
| assert name not in tasks_by_eta |
| tasks_by_eta.add(name) |
| |
| assert tasks_by_eta == tasks_by_name |
| |
| tasks_by_tag = set() |
| for tag, eta, name, task in self._sorted_by_tag: |
| assert name == task.task_name() |
| assert eta == task.eta_usec() |
| assert task.has_tag() and task.tag() |
| assert tag == task.tag() |
| assert name not in tasks_by_tag |
| tasks_by_tag.add(name) |
| assert tasks_by_tag == tasks_with_tags |
| |
| @staticmethod |
| def _IsInOrder(l): |
| """Determine if the specified list is in ascending order. |
| |
| Args: |
| l: The list to check |
| |
| Returns: |
| True if the list is in order, False otherwise |
| """ |
| sorted_list = sorted(l) |
| return l == sorted_list |
| |
| def _WithLock(f): |
| """Runs the decorated function within self._lock. |
| |
| Args: |
| f: The function to be delegated to. Must be a member function (take self |
| as the first parameter). |
| |
| Returns: |
| The result of f. |
| """ |
| |
| def _Inner(self, *args, **kwargs): |
| with self._lock: |
| ret = f(self, *args, **kwargs) |
| if self._testing_validate_state: |
| self.VerifyIndexes() |
| return ret |
| _Inner.__doc__ = f.__doc__ |
| return _Inner |
| |
| |
| |
| |
| @_WithLock |
| def UpdateQueue_Rpc(self, request, response): |
| """Implementation of the UpdateQueue RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest. |
| response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse. |
| """ |
| assert request.queue_name() == self.queue_name |
| |
| |
| |
| self.bucket_refill_per_second = request.bucket_refill_per_second() |
| self.bucket_capacity = request.bucket_capacity() |
| if request.has_user_specified_rate(): |
| self.user_specified_rate = request.user_specified_rate() |
| else: |
| self.user_specified_rate = None |
| if request.has_retry_parameters(): |
| self.retry_parameters = request.retry_parameters() |
| else: |
| self.retry_parameters = None |
| if request.has_max_concurrent_requests(): |
| self.max_concurrent_requests = request.max_concurrent_requests() |
| else: |
| self.max_concurrent_requests = None |
| self.queue_mode = request.mode() |
| if request.has_acl(): |
| self.acl = request.acl() |
| else: |
| self.acl = None |
| |
| @_WithLock |
| def FetchQueues_Rpc(self, request, response): |
| """Fills out a queue message on the provided TaskQueueFetchQueuesResponse. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse. |
| """ |
| response_queue = response.add_queue() |
| |
| response_queue.set_queue_name(self.queue_name) |
| response_queue.set_bucket_refill_per_second( |
| self.bucket_refill_per_second) |
| response_queue.set_bucket_capacity(self.bucket_capacity) |
| if self.user_specified_rate is not None: |
| response_queue.set_user_specified_rate(self.user_specified_rate) |
| if self.max_concurrent_requests is not None: |
| response_queue.set_max_concurrent_requests( |
| self.max_concurrent_requests) |
| if self.retry_parameters is not None: |
| response_queue.retry_parameters().CopyFrom(self.retry_parameters) |
| response_queue.set_paused(self.paused) |
| if self.queue_mode is not None: |
| response_queue.set_mode(self.queue_mode) |
| if self.acl is not None: |
| response_queue.mutable_acl().CopyFrom(self.acl) |
| |
| @_WithLock |
| def QueryTasks_Rpc(self, request, response): |
| """Implementation of the QueryTasks RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryTasksResponse. |
| """ |
| |
| assert not request.has_start_tag() |
| |
| if request.has_start_eta_usec(): |
| tasks = self._LookupNoAcquireLock(request.max_rows(), |
| name=request.start_task_name(), |
| eta=request.start_eta_usec()) |
| else: |
| tasks = self._LookupNoAcquireLock(request.max_rows(), |
| name=request.start_task_name()) |
| for task in tasks: |
| response.add_task().MergeFrom(task) |
| |
| @_WithLock |
| def FetchTask_Rpc(self, request, response): |
| """Implementation of the FetchTask RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchTaskRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchTaskResponse. |
| """ |
| task_name = request.task_name() |
| pos = self._LocateTaskByName(task_name) |
| if pos is None: |
| if task_name in self.task_name_archive: |
| error = taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK |
| else: |
| error = taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK |
| raise apiproxy_errors.ApplicationError(error) |
| |
| _, task = self._sorted_by_name[pos] |
| response.mutable_task().add_task().CopyFrom(task) |
| |
| @_WithLock |
| def Delete_Rpc(self, request, response): |
| """Implementation of the Delete RPC. |
| |
| Deletes tasks from the task store. We mimic a 1/20 chance of a |
| TRANSIENT_ERROR when the request has an app_id. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteResponse. |
| """ |
| for taskname in request.task_name_list(): |
| if request.has_app_id() and random.random() <= 0.05: |
| response.add_result( |
| taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR) |
| else: |
| response.add_result(self._DeleteNoAcquireLock(taskname)) |
| |
| def _QueryAndOwnTasksGetTaskList(self, max_rows, group_by_tag, now_eta_usec, |
| tag=None): |
| assert self._lock.locked() |
| if group_by_tag and tag: |
| |
| return self._IndexScan(self._sorted_by_tag, |
| start_key=(tag, None, None,), |
| end_key=(tag, now_eta_usec, None,), |
| max_rows=max_rows) |
| elif group_by_tag: |
| |
| tasks = self._IndexScan(self._sorted_by_eta, |
| start_key=(None, None,), |
| end_key=(now_eta_usec, None,), |
| max_rows=max_rows) |
| if not tasks: |
| return [] |
| |
| if tasks[0].has_tag(): |
| tag = tasks[0].tag() |
| return self._QueryAndOwnTasksGetTaskList( |
| max_rows, True, now_eta_usec, tag) |
| else: |
| |
| return [task for task in tasks if not task.has_tag()] |
| else: |
| return self._IndexScan(self._sorted_by_eta, |
| start_key=(None, None,), |
| end_key=(now_eta_usec, None,), |
| max_rows=max_rows) |
| |
| @_WithLock |
| def QueryAndOwnTasks_Rpc(self, request, response): |
| """Implementation of the QueryAndOwnTasks RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse. |
| """ |
| if self.queue_mode != QUEUE_MODE.PULL: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_MODE) |
| |
| |
| lease_seconds = request.lease_seconds() |
| if lease_seconds < 0: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST) |
| max_tasks = request.max_tasks() |
| if max_tasks <= 0: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST) |
| |
| |
| if request.has_tag() and not request.group_by_tag(): |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST, |
| 'Tag specified, but group_by_tag was not.') |
| |
| now_eta_usec = _SecToUsec(time.time()) |
| tasks = self._QueryAndOwnTasksGetTaskList( |
| max_tasks, request.group_by_tag(), now_eta_usec, request.tag()) |
| |
| tasks_to_delete = [] |
| for task in tasks: |
| retry = Retry(task, self) |
| if not retry.CanRetry(task.retry_count() + 1, 0): |
| logging.warning( |
| 'Task %s in queue %s cannot be leased again after %d leases.', |
| task.task_name(), self.queue_name, task.retry_count()) |
| tasks_to_delete.append(task) |
| continue |
| |
| self._PostponeTaskNoAcquireLock( |
| task, now_eta_usec + _SecToUsec(lease_seconds)) |
| |
| |
| task_response = response.add_task() |
| task_response.set_task_name(task.task_name()) |
| task_response.set_eta_usec(task.eta_usec()) |
| task_response.set_retry_count(task.retry_count()) |
| if task.has_tag(): |
| task_response.set_tag(task.tag()) |
| |
| |
| |
| task_response.set_body(task.body()) |
| |
| |
| for task in tasks_to_delete: |
| self._DeleteNoAcquireLock(task.task_name()) |
| |
| @_WithLock |
| def ModifyTaskLease_Rpc(self, request, response): |
| """Implementation of the ModifyTaskLease RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse. |
| """ |
| if self.queue_mode != QUEUE_MODE.PULL: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_MODE) |
| |
| if self.paused: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.QUEUE_PAUSED) |
| |
| |
| lease_seconds = request.lease_seconds() |
| if lease_seconds < 0: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST) |
| |
| pos = self._LocateTaskByName(request.task_name()) |
| if pos is None: |
| if request.task_name() in self.task_name_archive: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK) |
| else: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK) |
| |
| |
| _, task = self._sorted_by_name[pos] |
| if task.eta_usec() != request.eta_usec(): |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.TASK_LEASE_EXPIRED) |
| |
| now_usec = _SecToUsec(time.time()) |
| |
| if task.eta_usec() < now_usec: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.TASK_LEASE_EXPIRED) |
| |
| |
| future_eta_usec = now_usec + _SecToUsec(lease_seconds) |
| self._PostponeTaskNoAcquireLock( |
| task, future_eta_usec, increase_retries=False) |
| response.set_updated_eta_usec(future_eta_usec) |
| |
| @_WithLock |
| def IncRetryCount(self, task_name): |
| """Increment the retry count of a task by 1. |
| |
| Args: |
| task_name: The name of the task to update. |
| """ |
| pos = self._LocateTaskByName(task_name) |
| assert pos is not None, ( |
| 'Task does not exist when trying to increase retry count.') |
| |
| task = self._sorted_by_name[pos][1] |
| self._IncRetryCount(task) |
| |
| def _IncRetryCount(self, task): |
| assert self._lock.locked() |
| retry_count = task.retry_count() |
| task.set_retry_count(retry_count + 1) |
| |
| task.set_execution_count(task.execution_count() + 1) |
| |
| |
| |
| |
| @_WithLock |
| def GetTasksAsDicts(self): |
| """Gets all of the tasks in this queue. |
| |
| Returns: |
| A list of dictionaries, where each dictionary contains one task's |
| attributes. E.g. |
| [{'name': 'task-123', |
| 'queue_name': 'default', |
| 'url': '/update', |
| 'method': 'GET', |
| 'eta': '2009/02/02 05:37:42', |
| 'eta_delta': '0:00:06.342511 ago', |
| 'body': '', |
| 'headers': [('user-header', 'some-value') |
| ('X-AppEngine-QueueName': 'update-queue'), |
| ('X-AppEngine-TaskName': 'task-123'), |
| ('X-AppEngine-TaskExecutionCount': '1'), |
| ('X-AppEngine-TaskRetryCount': '1'), |
| ('X-AppEngine-TaskETA': '1234567890.123456'), |
| ('X-AppEngine-Development-Payload': '1'), |
| ('X-AppEngine-TaskPreviousResponse': '300'), |
| ('Content-Length': 0), |
| ('Content-Type': 'application/octet-stream')] |
| |
| Raises: |
| ValueError: A task request contains an unknown HTTP method type. |
| """ |
| tasks = [] |
| now = datetime.datetime.utcnow() |
| |
| for _, _, task_response in self._sorted_by_eta: |
| tasks.append(QueryTasksResponseToDict( |
| self.queue_name, task_response, now)) |
| return tasks |
| |
| @_WithLock |
| def GetTaskAsDict(self, task_name): |
| """Gets a specific task from this queue. |
| |
| Returns: |
| A dictionary containing one task's attributes. E.g. |
| [{'name': 'task-123', |
| 'queue_name': 'default', |
| 'url': '/update', |
| 'method': 'GET', |
| 'eta': '2009/02/02 05:37:42', |
| 'eta_delta': '0:00:06.342511 ago', |
| 'body': '', |
| 'headers': [('user-header', 'some-value') |
| ('X-AppEngine-QueueName': 'update-queue'), |
| ('X-AppEngine-TaskName': 'task-123'), |
| ('X-AppEngine-TaskExecutionCount': '1'), |
| ('X-AppEngine-TaskRetryCount': '1'), |
| ('X-AppEngine-TaskETA': '1234567890.123456'), |
| ('X-AppEngine-Development-Payload': '1'), |
| ('X-AppEngine-TaskPreviousResponse': '300'), |
| ('Content-Length': 0), |
| ('Content-Type': 'application/octet-stream')] |
| |
| Raises: |
| ValueError: A task request contains an unknown HTTP method type. |
| """ |
| task_responses = self._LookupNoAcquireLock(maximum=1, name=task_name) |
| if not task_responses: |
| return |
| task_response, = task_responses |
| if task_response.task_name() != task_name: |
| return |
| |
| now = datetime.datetime.utcnow() |
| return QueryTasksResponseToDict(self.queue_name, task_response, now) |
| |
| @_WithLock |
| def PurgeQueue(self): |
| """Removes all content from the queue.""" |
| self._sorted_by_name = [] |
| self._sorted_by_eta = [] |
| self._sorted_by_tag = [] |
| |
| @_WithLock |
| def _GetTasks(self): |
| """Helper method for tests returning all tasks sorted by eta. |
| |
| Returns: |
| A list of taskqueue_service_pb.TaskQueueQueryTasksResponse_Task objects |
| sorted by eta. |
| """ |
| return self._GetTasksNoAcquireLock() |
| |
| def _GetTasksNoAcquireLock(self): |
| """Helper method for tests returning all tasks sorted by eta. |
| |
| Returns: |
| A list of taskqueue_service_pb.TaskQueueQueryTasksResponse_Task objects |
| sorted by eta. |
| """ |
| assert self._lock.locked() |
| tasks = [] |
| for eta, task_name, task in self._sorted_by_eta: |
| tasks.append(task) |
| return tasks |
| |
| def _InsertTask(self, task): |
| """Insert a task into the store, keeps lists sorted. |
| |
| Args: |
| task: the new task. |
| """ |
| assert self._lock.locked() |
| eta = task.eta_usec() |
| name = task.task_name() |
| bisect.insort_left(self._sorted_by_eta, (eta, name, task)) |
| if task.has_tag(): |
| bisect.insort_left(self._sorted_by_tag, (task.tag(), eta, name, task)) |
| bisect.insort_left(self._sorted_by_name, (name, task)) |
| self.task_name_archive.add(name) |
| |
| @_WithLock |
| def RunTaskNow(self, task): |
| """Change the eta of a task to now. |
| |
| Args: |
| task: The TaskQueueQueryTasksResponse_Task run now. This must be |
| stored in this queue (otherwise an AssertionError is raised). |
| """ |
| self._PostponeTaskNoAcquireLock(task, 0, increase_retries=False) |
| |
| @_WithLock |
| def PostponeTask(self, task, new_eta_usec): |
| """Postpone the task to a future time and increment the retry count. |
| |
| Args: |
| task: The TaskQueueQueryTasksResponse_Task to postpone. This must be |
| stored in this queue (otherwise an AssertionError is raised). |
| new_eta_usec: The new eta to set on the task. This must be greater then |
| the current eta on the task. |
| """ |
| assert new_eta_usec > task.eta_usec() |
| self._PostponeTaskNoAcquireLock(task, new_eta_usec) |
| |
| def _PostponeTaskNoAcquireLock(self, task, new_eta_usec, |
| increase_retries=True): |
| assert self._lock.locked() |
| if increase_retries: |
| self._IncRetryCount(task) |
| name = task.task_name() |
| eta = task.eta_usec() |
| assert self._RemoveTaskFromIndex( |
| self._sorted_by_eta, (eta, name, None), task) |
| if task.has_tag(): |
| assert self._RemoveTaskFromIndex( |
| self._sorted_by_tag, (task.tag(), eta, name, None), task) |
| self._PostponeTaskInsertOnly(task, new_eta_usec) |
| |
| def _PostponeTaskInsertOnly(self, task, new_eta_usec): |
| assert self._lock.locked() |
| task.set_eta_usec(new_eta_usec) |
| name = task.task_name() |
| bisect.insort_left(self._sorted_by_eta, (new_eta_usec, name, task)) |
| if task.has_tag(): |
| tag = task.tag() |
| bisect.insort_left(self._sorted_by_tag, (tag, new_eta_usec, name, task)) |
| |
| @_WithLock |
| def Lookup(self, maximum, name=None, eta=None): |
| """Lookup a number of sorted tasks from the store. |
| |
| If 'eta' is specified, the tasks are looked up in a list sorted by 'eta', |
| then 'name'. Otherwise they are sorted by 'name'. We need to be able to |
| sort by 'eta' and 'name' because tasks can have identical eta. If you had |
| 20 tasks with the same ETA, you wouldn't be able to page past them, since |
| the 'next eta' would give the first one again. Names are unique, though. |
| |
| Args: |
| maximum: the maximum number of tasks to return. |
| name: a task name to start with. |
| eta: an eta to start with. |
| |
| Returns: |
| A list of up to 'maximum' tasks. |
| |
| Raises: |
| ValueError: if the task store gets corrupted. |
| """ |
| return self._LookupNoAcquireLock(maximum, name, eta) |
| |
| def _IndexScan(self, index, start_key, end_key=None, max_rows=None): |
| """Return the result of a 'scan' over the given index. |
| |
| The scan is inclusive of start_key and exclusive of end_key. It returns at |
| most max_rows from the index. |
| |
| Args: |
| index: One of the index lists, eg self._sorted_by_tag. |
| start_key: The key to start at. |
| end_key: Optional end key. |
| max_rows: The maximum number of rows to yield. |
| |
| Returns: |
| a list of up to 'max_rows' TaskQueueQueryTasksResponse_Task instances from |
| the given index, in sorted order. |
| """ |
| assert self._lock.locked() |
| |
| start_pos = bisect.bisect_left(index, start_key) |
| end_pos = INF |
| if end_key is not None: |
| end_pos = bisect.bisect_left(index, end_key) |
| if max_rows is not None: |
| end_pos = min(end_pos, start_pos + max_rows) |
| end_pos = min(end_pos, len(index)) |
| |
| tasks = [] |
| for pos in xrange(start_pos, end_pos): |
| tasks.append(index[pos][-1]) |
| return tasks |
| |
| def _LookupNoAcquireLock(self, maximum, name=None, eta=None, tag=None): |
| assert self._lock.locked() |
| if tag is not None: |
| |
| return self._IndexScan(self._sorted_by_tag, |
| start_key=(tag, eta, name,), |
| end_key=('%s\x00' % tag, None, None,), |
| max_rows=maximum) |
| elif eta is not None: |
| |
| return self._IndexScan(self._sorted_by_eta, |
| start_key=(eta, name,), |
| max_rows=maximum) |
| else: |
| |
| return self._IndexScan(self._sorted_by_name, |
| start_key=(name,), |
| max_rows=maximum) |
| |
| @_WithLock |
| def Count(self): |
| """Returns the number of tasks in the store.""" |
| return len(self._sorted_by_name) |
| |
| @_WithLock |
| def OldestTask(self): |
| """Returns the task with the oldest eta in the store.""" |
| if self._sorted_by_eta: |
| return self._sorted_by_eta[0][2] |
| return None |
| |
| @_WithLock |
| def Oldest(self): |
| """Returns the oldest eta in the store, or None if no tasks.""" |
| if self._sorted_by_eta: |
| return self._sorted_by_eta[0][0] |
| return None |
| |
| def _LocateTaskByName(self, task_name): |
| """Locate the index of a task in _sorted_by_name list. |
| |
| If the task does not exist in the list, return None. |
| |
| Args: |
| task_name: Name of task to be located. |
| |
| Returns: |
| Index of the task in _sorted_by_name list if task exists, |
| None otherwise. |
| """ |
| assert self._lock.locked() |
| pos = bisect.bisect_left(self._sorted_by_name, (task_name,)) |
| if (pos >= len(self._sorted_by_name) or |
| self._sorted_by_name[pos][0] != task_name): |
| return None |
| return pos |
| |
| @_WithLock |
| def Add(self, request, now): |
| """Inserts a new task into the store. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueAddRequest. |
| now: A datetime.datetime object containing the current time in UTC. |
| |
| Raises: |
| apiproxy_errors.ApplicationError: If a task with the same name is already |
| in the store, or the task is tombstoned. |
| """ |
| |
| if self._LocateTaskByName(request.task_name()) is not None: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS) |
| if request.task_name() in self.task_name_archive: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK) |
| |
| now_sec = calendar.timegm(now.utctimetuple()) |
| task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task() |
| task.set_task_name(request.task_name()) |
| task.set_eta_usec(request.eta_usec()) |
| task.set_creation_time_usec(_SecToUsec(now_sec)) |
| task.set_retry_count(0) |
| task.set_method(request.method()) |
| |
| if request.has_url(): |
| task.set_url(request.url()) |
| for keyvalue in request.header_list(): |
| header = task.add_header() |
| header.set_key(keyvalue.key()) |
| header.set_value(keyvalue.value()) |
| if request.has_description(): |
| task.set_description(request.description()) |
| if request.has_body(): |
| task.set_body(request.body()) |
| if request.has_crontimetable(): |
| task.mutable_crontimetable().set_schedule( |
| request.crontimetable().schedule()) |
| task.mutable_crontimetable().set_timezone( |
| request.crontimetable().timezone()) |
| if request.has_retry_parameters(): |
| task.mutable_retry_parameters().CopyFrom(request.retry_parameters()) |
| if request.has_tag(): |
| task.set_tag(request.tag()) |
| self._InsertTask(task) |
| |
| @_WithLock |
| def Delete(self, name): |
| """Deletes a task from the store by name. |
| |
| Args: |
| name: the name of the task to delete. |
| |
| Returns: |
| TaskQueueServiceError.UNKNOWN_TASK: if the task is unknown. |
| TaskQueueServiceError.INTERNAL_ERROR: if the store is corrupted. |
| TaskQueueServiceError.TOMBSTONED: if the task was deleted. |
| TaskQueueServiceError.OK: otherwise. |
| """ |
| return self._DeleteNoAcquireLock(name) |
| |
| def _RemoveTaskFromIndex(self, index, index_tuple, task): |
| """Remove a task from the specified index. |
| |
| Args: |
| index: The index list that needs to be mutated. |
| index_tuple: The tuple to search for in the index. |
| task: The task instance that is expected to be stored at this location. |
| |
| Returns: |
| True if the task was successfully removed from the index, False otherwise. |
| """ |
| assert self._lock.locked() |
| pos = bisect.bisect_left(index, index_tuple) |
| if index[pos][-1] is not task: |
| logging.debug('Expected %s, found %s', task, index[pos][-1]) |
| return False |
| index.pop(pos) |
| return True |
| |
| def _DeleteNoAcquireLock(self, name): |
| assert self._lock.locked() |
| pos = self._LocateTaskByName(name) |
| if pos is None: |
| if name in self.task_name_archive: |
| return taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK |
| else: |
| return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK |
| |
| old_task = self._sorted_by_name.pop(pos)[-1] |
| |
| |
| eta = old_task.eta_usec() |
| if not self._RemoveTaskFromIndex( |
| self._sorted_by_eta, (eta, name, None), old_task): |
| return taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR |
| |
| |
| if old_task.has_tag(): |
| tag = old_task.tag() |
| if not self._RemoveTaskFromIndex( |
| self._sorted_by_tag, (tag, eta, name, None), old_task): |
| return taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR |
| |
| return taskqueue_service_pb.TaskQueueServiceError.OK |
| |
| @_WithLock |
| def Populate(self, num_tasks): |
| """Populates the store with a number of tasks. |
| |
| Args: |
| num_tasks: the number of tasks to insert. |
| """ |
| |
| def RandomTask(): |
| """Creates a new task and randomly populates values.""" |
| assert self._lock.locked() |
| task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task() |
| task.set_task_name(''.join(random.choice(string.ascii_lowercase) |
| for x in range(20))) |
| |
| task.set_eta_usec(now_usec + random.randint(_SecToUsec(-10), |
| _SecToUsec(600))) |
| |
| |
| |
| task.set_creation_time_usec(min(now_usec, task.eta_usec()) - |
| random.randint(0, _SecToUsec(20))) |
| |
| task.set_url(random.choice(['/a', '/b', '/c', '/d'])) |
| if random.random() < 0.2: |
| task.set_method( |
| taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.POST) |
| task.set_body('A' * 2000) |
| else: |
| task.set_method( |
| taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.GET) |
| retry_count = max(0, random.randint(-10, 5)) |
| task.set_retry_count(retry_count) |
| task.set_execution_count(retry_count) |
| if random.random() < 0.3: |
| random_headers = [('nexus', 'one'), |
| ('foo', 'bar'), |
| ('content-type', 'text/plain'), |
| ('from', 'user@email.com')] |
| for _ in xrange(random.randint(1, 4)): |
| elem = random.randint(0, len(random_headers) - 1) |
| key, value = random_headers.pop(elem) |
| header_proto = task.add_header() |
| header_proto.set_key(key) |
| header_proto.set_value(value) |
| return task |
| |
| now_usec = _SecToUsec(time.time()) |
| for _ in range(num_tasks): |
| self._InsertTask(RandomTask()) |
| |
| |
| class _TaskExecutor(object): |
| """Executor for a task object. |
| |
| Converts a TaskQueueQueryTasksResponse_Task into a http request, then uses the |
| httplib library to send it to the http server. |
| """ |
| |
| def __init__(self, default_host, request_data): |
| """Constructor. |
| |
| Args: |
| default_host: a string to use as the host/port to connect to if the host |
| header is not specified in the task. |
| request_data: A request_info.RequestInfo instance used to look up state |
| associated with the request that generated an API call. |
| """ |
| self._default_host = default_host |
| self._request_data = request_data |
| |
| def _HeadersFromTask(self, task, queue): |
| """Constructs the http headers for the given task. |
| |
| This function will remove special headers (values in BUILT_IN_HEADERS) and |
| add the taskqueue headers. |
| |
| Args: |
| task: The task, a TaskQueueQueryTasksResponse_Task instance. |
| queue: The queue that this task belongs to, an _Queue instance. |
| |
| Returns: |
| A list of tuples containing the http header and value. There |
| may be be mutiple entries with the same key. |
| """ |
| headers = [] |
| for header in task.header_list(): |
| header_key_lower = header.key().lower() |
| |
| if header_key_lower == 'host' and queue.target is not None: |
| headers.append( |
| (header.key(), '.'.join([queue.target, self._default_host]))) |
| elif header_key_lower not in BUILT_IN_HEADERS: |
| headers.append((header.key(), header.value())) |
| |
| |
| headers.append(('X-AppEngine-QueueName', queue.queue_name)) |
| headers.append(('X-AppEngine-TaskName', task.task_name())) |
| headers.append(('X-AppEngine-TaskRetryCount', str(task.retry_count()))) |
| headers.append(('X-AppEngine-TaskETA', |
| str(_UsecToSec(task.eta_usec())))) |
| headers.append(('X-AppEngine-Fake-Is-Admin', '1')) |
| headers.append(('Content-Length', str(len(task.body())))) |
| if (task.has_body() and 'content-type' not in |
| [key.lower() for key, _ in headers]): |
| headers.append(('Content-Type', 'application/octet-stream')) |
| headers.append(('X-AppEngine-TaskExecutionCount', |
| str(task.execution_count()))) |
| if task.has_runlog() and task.runlog().has_response_code(): |
| headers.append(('X-AppEngine-TaskPreviousResponse', |
| str(task.runlog().response_code()))) |
| |
| return headers |
| |
| def ExecuteTask(self, task, queue): |
| """Construct a http request from the task and dispatch it. |
| |
| Args: |
| task: The task to convert to a http request and then send. An instance of |
| taskqueue_service_pb.TaskQueueQueryTasksResponse_Task |
| queue: The queue that this task belongs to. An instance of _Queue. |
| |
| Returns: |
| Http Response code from the task's execution, 0 if an exception occurred. |
| """ |
| method = task.RequestMethod_Name(task.method()) |
| headers = self._HeadersFromTask(task, queue) |
| dispatcher = self._request_data.get_dispatcher() |
| try: |
| response = dispatcher.add_request(method, task.url(), headers, |
| task.body() if task.has_body() else '', |
| '0.1.0.2') |
| except request_info.ModuleDoesNotExistError: |
| logging.exception('Failed to dispatch task') |
| return 0 |
| return int(response.status.split(' ', 1)[0]) |
| |
| |
| class _BackgroundTaskScheduler(object): |
| """The task scheduler class. |
| |
| This class is designed to be run in a background thread. |
| |
| Note: There must not be more than one instance of _BackgroundTaskScheduler per |
| group. |
| """ |
| |
| def __init__(self, group, task_executor, retry_seconds, **kwargs): |
| """Constructor. |
| |
| Args: |
| group: The group that we will automatically execute tasks from. Must be an |
| instance of _Group. |
| task_executor: The class used to convert a task into a http request. Must |
| be an instance of _TaskExecutor. |
| retry_seconds: The number of seconds to delay a task by if its execution |
| fails. |
| _get_time: a callable that returns the current time in seconds since the |
| epoch. This argument may only be passed in by keyword. If unset, use |
| time.time. |
| """ |
| self._group = group |
| self._should_exit = False |
| self._next_wakeup = INF |
| self._event = threading.Event() |
| self._wakeup_lock = threading.Lock() |
| self.task_executor = task_executor |
| self.default_retry_seconds = retry_seconds |
| |
| self._get_time = kwargs.pop('_get_time', time.time) |
| if kwargs: |
| raise TypeError('Unknown parameters: %s' % ', '.join(kwargs)) |
| |
| def UpdateNextEventTime(self, next_event_time): |
| """Notify the TaskExecutor of the closest event it needs to process. |
| |
| Args: |
| next_event_time: The time of the event in seconds since the epoch. |
| """ |
| with self._wakeup_lock: |
| if next_event_time < self._next_wakeup: |
| self._next_wakeup = next_event_time |
| self._event.set() |
| |
| def Shutdown(self): |
| """Request this TaskExecutor to exit.""" |
| self._should_exit = True |
| self._event.set() |
| |
| def _ProcessQueues(self): |
| with self._wakeup_lock: |
| self._next_wakeup = INF |
| |
| now = self._get_time() |
| queue, task = self._group.GetNextPushTask() |
| while task and _UsecToSec(task.eta_usec()) <= now: |
| if task.retry_count() == 0: |
| task.set_first_try_usec(_SecToUsec(now)) |
| |
| response_code = self.task_executor.ExecuteTask(task, queue) |
| if response_code: |
| task.mutable_runlog().set_response_code(response_code) |
| else: |
| logging.error( |
| 'An error occured while sending the task "%s" ' |
| '(Url: "%s") in queue "%s". Treating as a task error.', |
| task.task_name(), task.url(), queue.queue_name) |
| |
| |
| |
| |
| now = self._get_time() |
| if 200 <= response_code < 300: |
| queue.Delete(task.task_name()) |
| else: |
| retry = Retry(task, queue) |
| age_usec = _SecToUsec(now) - task.first_try_usec() |
| if retry.CanRetry(task.retry_count() + 1, age_usec): |
| retry_usec = retry.CalculateBackoffUsec(task.retry_count() + 1) |
| logging.warning( |
| 'Task %s failed to execute. This task will retry in %.3f seconds', |
| task.task_name(), _UsecToSec(retry_usec)) |
| |
| |
| |
| queue.PostponeTask(task, _SecToUsec(now) + retry_usec) |
| else: |
| logging.warning( |
| 'Task %s failed to execute. The task has no remaining retries. ' |
| 'Failing permanently after %d retries and %d seconds', |
| task.task_name(), task.retry_count(), _UsecToSec(age_usec)) |
| queue.Delete(task.task_name()) |
| queue, task = self._group.GetNextPushTask() |
| |
| if task: |
| with self._wakeup_lock: |
| eta = _UsecToSec(task.eta_usec()) |
| if eta < self._next_wakeup: |
| self._next_wakeup = eta |
| |
| def _Wait(self): |
| """Block until we need to process a task or we need to exit.""" |
| |
| |
| now = self._get_time() |
| while not self._should_exit and self._next_wakeup > now: |
| timeout = self._next_wakeup - now |
| self._event.wait(timeout) |
| self._event.clear() |
| now = self._get_time() |
| |
| def MainLoop(self): |
| """The main loop of the scheduler.""" |
| while not self._should_exit: |
| self._ProcessQueues() |
| self._Wait() |
| |
| |
| class TaskQueueServiceStub(apiproxy_stub.APIProxyStub): |
| """Python only task queue service stub. |
| |
| This stub executes tasks when enabled by using the dev_appserver's AddEvent |
| capability. When task running is disabled this stub will store tasks for |
| display on a console, where the user may manually execute the tasks. |
| """ |
| |
| def __init__(self, |
| service_name='taskqueue', |
| root_path=None, |
| auto_task_running=False, |
| task_retry_seconds=30, |
| _all_queues_valid=False, |
| default_http_server='localhost', |
| _testing_validate_state=False, |
| request_data=None): |
| """Constructor. |
| |
| Args: |
| service_name: Service name expected for all calls. |
| root_path: Root path to the directory of the application which may contain |
| a queue.yaml file. If None, then it's assumed no queue.yaml file is |
| available. |
| auto_task_running: When True, the dev_appserver should automatically |
| run tasks after they are enqueued. |
| task_retry_seconds: How long to wait between task executions after a |
| task fails. |
| _testing_validate_state: Should this stub and all of its _Groups (and |
| thus and all of its _Queues) validate their state after each |
| operation? This should only be used during testing of the |
| taskqueue_stub. |
| request_data: A request_info.RequestInfo instance used to look up state |
| associated with the request that generated an API call. |
| """ |
| super(TaskQueueServiceStub, self).__init__( |
| service_name, max_request_size=MAX_REQUEST_SIZE, |
| request_data=request_data) |
| |
| |
| self._queues = {} |
| |
| |
| |
| |
| |
| self._all_queues_valid = _all_queues_valid |
| |
| self._root_path = root_path |
| self._testing_validate_state = _testing_validate_state |
| |
| |
| self._queues[None] = _Group( |
| self._ParseQueueYaml, app_id=None, |
| _all_queues_valid=_all_queues_valid, |
| _update_newest_eta=self._UpdateNextEventTime, |
| _testing_validate_state=self._testing_validate_state) |
| |
| self._auto_task_running = auto_task_running |
| self._started = False |
| |
| self._task_scheduler = _BackgroundTaskScheduler( |
| self._queues[None], _TaskExecutor(default_http_server, |
| self.request_data), |
| retry_seconds=task_retry_seconds) |
| self._yaml_last_modified = None |
| |
| def StartBackgroundExecution(self): |
| """Start automatic task execution.""" |
| if not self._started and self._auto_task_running: |
| task_scheduler_thread = threading.Thread( |
| target=self._task_scheduler.MainLoop) |
| task_scheduler_thread.setDaemon(True) |
| task_scheduler_thread.start() |
| self._started = True |
| |
| def Shutdown(self): |
| """Requests the task scheduler to shutdown.""" |
| self._task_scheduler.Shutdown() |
| |
| def _ParseQueueYaml(self): |
| """Loads the queue.yaml file and parses it. |
| |
| Returns: |
| None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object |
| populated from the queue.yaml. |
| """ |
| if hasattr(self, 'queue_yaml_parser'): |
| |
| return self.queue_yaml_parser(self._root_path) |
| |
| |
| |
| if self._root_path is None: |
| return None |
| for queueyaml in ('queue.yaml', 'queue.yml'): |
| try: |
| path = os.path.join(self._root_path, queueyaml) |
| modified = os.stat(path).st_mtime |
| if self._yaml_last_modified and self._yaml_last_modified == modified: |
| return self._last_queue_info |
| fh = open(path, 'r') |
| except (IOError, OSError): |
| continue |
| try: |
| queue_info = queueinfo.LoadSingleQueue(fh) |
| self._last_queue_info = queue_info |
| self._yaml_last_modified = modified |
| return queue_info |
| finally: |
| fh.close() |
| return None |
| |
| def _UpdateNextEventTime(self, callback_time): |
| """Enqueue a task to be automatically scheduled. |
| |
| Note: If auto task running is disabled, this function is a no-op. |
| |
| Args: |
| callback_time: The earliest time this task may be run, in seconds since |
| the epoch. |
| """ |
| self._task_scheduler.UpdateNextEventTime(callback_time) |
| |
| def _GetGroup(self, app_id=None): |
| """Get the _Group instance for app_id, creating a new one if needed. |
| |
| Args: |
| app_id: The app id in question. Note: This field is not validated. |
| """ |
| if app_id not in self._queues: |
| self._queues[app_id] = _Group( |
| app_id=app_id, _all_queues_valid=self._all_queues_valid, |
| _testing_validate_state=self._testing_validate_state) |
| return self._queues[app_id] |
| |
| def _Dynamic_Add(self, request, response): |
| """Add a single task to a queue. |
| |
| This method is a wrapper around the BulkAdd RPC request. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueAddRequest. See |
| taskqueue_service.proto. |
| response: The taskqueue_service_pb.TaskQueueAddResponse. See |
| taskqueue_service.proto. |
| """ |
| bulk_request = taskqueue_service_pb.TaskQueueBulkAddRequest() |
| bulk_response = taskqueue_service_pb.TaskQueueBulkAddResponse() |
| |
| bulk_request.add_add_request().CopyFrom(request) |
| self._Dynamic_BulkAdd(bulk_request, bulk_response) |
| |
| assert bulk_response.taskresult_size() == 1 |
| result = bulk_response.taskresult(0).result() |
| |
| if result != taskqueue_service_pb.TaskQueueServiceError.OK: |
| raise apiproxy_errors.ApplicationError(result) |
| elif bulk_response.taskresult(0).has_chosen_task_name(): |
| response.set_chosen_task_name( |
| bulk_response.taskresult(0).chosen_task_name()) |
| |
| def _Dynamic_BulkAdd(self, request, response): |
| """Add many tasks to a queue using a single request. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See |
| taskqueue_service.proto. |
| response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See |
| taskqueue_service.proto. |
| """ |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| assert request.add_request_size(), 'taskqueue should prevent empty requests' |
| self._GetGroup(_GetAppId(request.add_request(0))).BulkAdd_Rpc( |
| request, response) |
| |
| def GetQueues(self): |
| """Gets all the application's queues. |
| |
| Returns: |
| A list of dictionaries, where each dictionary contains one queue's |
| attributes. E.g.: |
| [{'name': 'some-queue', |
| 'max_rate': '1/s', |
| 'bucket_size': 5, |
| 'oldest_task': '2009/02/02 05:37:42', |
| 'eta_delta': '0:00:06.342511 ago', |
| 'tasks_in_queue': 12}, ...] |
| The list of queues always includes the default queue. |
| """ |
| return self._GetGroup().GetQueuesAsDicts() |
| |
| def GetTasks(self, queue_name): |
| """Gets a queue's tasks. |
| |
| Args: |
| queue_name: Queue's name to return tasks for. |
| |
| Returns: |
| A list of dictionaries, where each dictionary contains one task's |
| attributes. E.g. |
| [{'name': 'task-123', |
| 'queue_name': 'default', |
| 'url': '/update', |
| 'method': 'GET', |
| 'eta': '2009/02/02 05:37:42', |
| 'eta_delta': '0:00:06.342511 ago', |
| 'body': '', |
| 'headers': [('user-header', 'some-value') |
| ('X-AppEngine-QueueName': 'update-queue'), |
| ('X-AppEngine-TaskName': 'task-123'), |
| ('X-AppEngine-TaskRetryCount': '0'), |
| ('X-AppEngine-TaskETA': '1234567890.123456'), |
| ('X-AppEngine-Development-Payload': '1'), |
| ('Content-Length': 0), |
| ('Content-Type': 'application/octet-stream')] |
| |
| Raises: |
| ValueError: A task request contains an unknown HTTP method type. |
| KeyError: An invalid queue name was specified. |
| """ |
| return self._GetGroup().GetQueue(queue_name).GetTasksAsDicts() |
| |
| def DeleteTask(self, queue_name, task_name): |
| """Deletes a task from a queue, without leaving a tombstone. |
| |
| Args: |
| queue_name: the name of the queue to delete the task from. |
| task_name: the name of the task to delete. |
| """ |
| if self._GetGroup().HasQueue(queue_name): |
| queue = self._GetGroup().GetQueue(queue_name) |
| queue.Delete(task_name) |
| queue.task_name_archive.discard(task_name) |
| |
| def FlushQueue(self, queue_name): |
| """Removes all tasks from a queue, without leaving tombstones. |
| |
| Args: |
| queue_name: the name of the queue to remove tasks from. |
| """ |
| if self._GetGroup().HasQueue(queue_name): |
| self._GetGroup().GetQueue(queue_name).PurgeQueue() |
| self._GetGroup().GetQueue(queue_name).task_name_archive.clear() |
| |
| def _Dynamic_UpdateQueue(self, request, unused_response): |
| """Local implementation of the UpdateQueue RPC in TaskQueueService. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest. |
| unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse. |
| Not used. |
| """ |
| self._GetGroup(_GetAppId(request)).UpdateQueue_Rpc(request, unused_response) |
| |
| def _Dynamic_FetchQueues(self, request, response): |
| """Local implementation of the FetchQueues RPC in TaskQueueService. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse. |
| """ |
| self._GetGroup(_GetAppId(request)).FetchQueues_Rpc(request, response) |
| |
| def _Dynamic_FetchQueueStats(self, request, response): |
| """Local 'random' implementation of the TaskQueueService.FetchQueueStats. |
| |
| This implementation loads some stats from the task store, the rest with |
| random numbers. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse. |
| """ |
| self._GetGroup(_GetAppId(request)).FetchQueueStats_Rpc(request, response) |
| |
| def _Dynamic_QueryTasks(self, request, response): |
| """Local implementation of the TaskQueueService.QueryTasks RPC. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryTasksResponse. |
| """ |
| self._GetGroup(_GetAppId(request)).QueryTasks_Rpc(request, response) |
| |
| def _Dynamic_FetchTask(self, request, response): |
| """Local implementation of the TaskQueueService.FetchTask RPC. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueFetchTaskRequest. |
| response: A taskqueue_service_pb.TaskQueueFetchTaskResponse. |
| """ |
| self._GetGroup(_GetAppId(request)).FetchTask_Rpc(request, response) |
| |
| def _Dynamic_Delete(self, request, response): |
| """Local delete implementation of TaskQueueService.Delete. |
| |
| Deletes tasks from the task store. A 1/20 chance of a transient error. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteResponse. |
| """ |
| self._GetGroup(_GetAppId(request)).Delete_Rpc(request, response) |
| |
| def _Dynamic_ForceRun(self, request, response): |
| """Local force run implementation of TaskQueueService.ForceRun. |
| |
| Forces running of a task in a queue. This will fail randomly for testing if |
| the app id is non-empty. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueForceRunRequest. |
| response: A taskqueue_service_pb.TaskQueueForceRunResponse. |
| """ |
| if _GetAppId(request) is not None: |
| |
| if random.random() <= 0.05: |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR) |
| elif random.random() <= 0.052: |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR) |
| else: |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.OK) |
| else: |
| group = self._GetGroup(None) |
| if not group.HasQueue(request.queue_name()): |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
| return |
| queue = group.GetQueue(request.queue_name()) |
| task = queue.Lookup(1, name=request.task_name()) |
| if not task: |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK) |
| return |
| queue.RunTaskNow(task[0]) |
| self._UpdateNextEventTime(0) |
| response.set_result( |
| taskqueue_service_pb.TaskQueueServiceError.OK) |
| |
| def _Dynamic_DeleteQueue(self, request, response): |
| """Local delete implementation of TaskQueueService.DeleteQueue. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse. |
| """ |
| app_id = _GetAppId(request) |
| if app_id is None: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED) |
| self._GetGroup(app_id).DeleteQueue_Rpc(request, response) |
| |
| def _Dynamic_PauseQueue(self, request, response): |
| """Local pause implementation of TaskQueueService.PauseQueue. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueuePauseQueueRequest. |
| response: A taskqueue_service_pb.TaskQueuePauseQueueResponse. |
| """ |
| app_id = _GetAppId(request) |
| if app_id is None: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED) |
| self._GetGroup(app_id).PauseQueue_Rpc(request, response) |
| |
| def _Dynamic_PurgeQueue(self, request, response): |
| """Local purge implementation of TaskQueueService.PurgeQueue. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest. |
| response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse. |
| """ |
| |
| self._GetGroup(_GetAppId(request)).PurgeQueue_Rpc(request, response) |
| |
| def _Dynamic_DeleteGroup(self, request, response): |
| """Local delete implementation of TaskQueueService.DeleteGroup. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueDeleteGroupRequest. |
| response: A taskqueue_service_pb.TaskQueueDeleteGroupResponse. |
| """ |
| app_id = _GetAppId(request) |
| if app_id is None: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED) |
| |
| if app_id in self._queues: |
| del self._queues[app_id] |
| else: |
| |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE) |
| |
| def _Dynamic_UpdateStorageLimit(self, request, response): |
| """Local implementation of TaskQueueService.UpdateStorageLimit. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueUpdateStorageLimitRequest. |
| response: A taskqueue_service_pb.TaskQueueUpdateStorageLimitResponse. |
| """ |
| if _GetAppId(request) is None: |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED) |
| |
| if request.limit() < 0 or request.limit() > 1000 * (1024 ** 4): |
| raise apiproxy_errors.ApplicationError( |
| taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST) |
| |
| response.set_new_limit(request.limit()) |
| |
| def _Dynamic_QueryAndOwnTasks(self, request, response): |
| """Local implementation of TaskQueueService.QueryAndOwnTasks. |
| |
| Must adhere to the '_Dynamic_' naming convention for stubbing to work. |
| See taskqueue_service.proto for a full description of the RPC. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksRequest. |
| response: A taskqueue_service_pb.TaskQueueQueryAndOwnTasksResponse. |
| |
| Raises: |
| InvalidQueueModeError: If target queue is not a pull queue. |
| """ |
| |
| |
| |
| |
| |
| self._GetGroup().QueryAndOwnTasks_Rpc(request, response) |
| |
| def _Dynamic_ModifyTaskLease(self, request, response): |
| """Local implementation of TaskQueueService.ModifyTaskLease. |
| |
| Args: |
| request: A taskqueue_service_pb.TaskQueueModifyTaskLeaseRequest. |
| response: A taskqueue_service_pb.TaskQueueModifyTaskLeaseResponse. |
| |
| Raises: |
| InvalidQueueModeError: If target queue is not a pull queue. |
| """ |
| |
| self._GetGroup().ModifyTaskLease_Rpc(request, response) |
| |
| |
| |
| |
| |
| def get_filtered_tasks(self, url=None, name=None, queue_names=None): |
| """Get the tasks in the task queue with filters. |
| |
| Args: |
| url: A URL that all returned tasks should point at. |
| name: The name of all returned tasks. |
| queue_names: A list of queue names to retrieve tasks from. If left blank |
| this will get default to all queues available. |
| |
| Returns: |
| A list of taskqueue.Task objects. |
| """ |
| all_queue_names = [queue['name'] for queue in self.GetQueues()] |
| |
| |
| if isinstance(queue_names, basestring): |
| queue_names = [queue_names] |
| |
| |
| if queue_names is None: |
| queue_names = all_queue_names |
| |
| |
| task_dicts = [] |
| for queue_name in queue_names: |
| if queue_name in all_queue_names: |
| for task in self.GetTasks(queue_name): |
| if url is not None and task['url'] != url: |
| continue |
| if name is not None and task['name'] != name: |
| continue |
| task_dicts.append(task) |
| |
| tasks = [] |
| for task in task_dicts: |
| |
| payload = base64.b64decode(task['body']) |
| |
| headers = dict(task['headers']) |
| headers['Content-Length'] = str(len(payload)) |
| |
| |
| eta = datetime.datetime.strptime(task['eta'], '%Y/%m/%d %H:%M:%S') |
| eta = eta.replace(tzinfo=taskqueue._UTC) |
| |
| task_object = taskqueue.Task(name=task['name'], method=task['method'], |
| url=task['url'], headers=headers, |
| payload=payload, eta=eta) |
| tasks.append(task_object) |
| return tasks |