blob: 5ba67af7e55f9604257557ec826180a5819dfcaf [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Shared utilities for dealing with taskqueue."""
import datetime
from google.appengine.api import apiproxy_stub_map
from google.appengine.api.taskqueue import taskqueue_service_pb
def _human_readable_eta(eta_usec):
eta = datetime.datetime.utcfromtimestamp(eta_usec / 1e6)
return eta.strftime('%Y/%m/%d %H:%M:%S')
def _timedelta_without_microseconds(delta):
return datetime.timedelta(days=delta.days, seconds=delta.seconds)
def _human_readable_eta_delta(eta_usec):
now = datetime.datetime.utcnow()
eta = datetime.datetime.utcfromtimestamp(eta_usec / 1e6)
if eta > now:
return '%s from now' % _timedelta_without_microseconds(eta - now)
else:
return '%s ago' % _timedelta_without_microseconds(now - eta)
class QueueInfo(object):
"""Represents information about a Queue."""
def __init__(self,
name,
mode,
rate,
bucket_size,
tasks_in_queue,
oldest_eta_usec):
"""Initializer for _QueueInfo.
Args:
name: The name of the queue e.g. "default".
mode: A taskqueue_service_pb.TaskQueueMode constant representing the queue
type e.g. PULL.
rate: The execution rate of the queue as a string e.g. "10/s". May be None
for pull queues.
bucket_size: An int representing the size of the queues token bucket.
tasks_in_queue: The number of tasks in the queue.
oldest_eta_usec: The eta of the oldest task in the queue in microseconds
from the epoch.
"""
self.name = name
self.mode = mode
self.rate = rate
self.bucket_size = bucket_size
self.tasks_in_queue = tasks_in_queue
if oldest_eta_usec == -1: # No non-completed task in the queue.
self.oldest_eta_usec = None
else:
self.oldest_eta_usec = oldest_eta_usec
@property
def human_readable_oldest_task_eta(self):
if self.oldest_eta_usec:
return _human_readable_eta(self.oldest_eta_usec)
else:
return None
@property
def human_readable_oldest_task_eta_delta(self):
if self.oldest_eta_usec:
return _human_readable_eta_delta(self.oldest_eta_usec)
else:
return None
@classmethod
def _from_queue_and_stats(cls, queue, queue_stats):
"""Return a new _QueueInfo given information from the taskqueue service.
Args:
queue: A taskqueue_service_pb.TaskQueueFetchQueuesResponse_Queue instance
containing information about the queue.
queue_stats: A
taskqueue_service_pb.TaskQueueFetchQueueStatsResponse_QueueStats
instance containing information about the queue.
Returns:
A _QueueInfo constructed using the provided queue information.
"""
return cls(queue.queue_name(),
queue.mode(),
queue.user_specified_rate(),
queue.bucket_capacity(),
queue_stats.num_tasks(),
queue_stats.oldest_eta_usec())
@classmethod
def get(cls, queue_names=frozenset()):
"""Returns a 2-tuple: (list of push _QueueInfo, list of pull _QueueInfo)."""
fetch_queue_request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
fetch_queue_request.set_max_rows(1000)
fetch_queue_response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
apiproxy_stub_map.MakeSyncCall('taskqueue',
'FetchQueues',
fetch_queue_request,
fetch_queue_response)
queue_stats_request = taskqueue_service_pb.TaskQueueFetchQueueStatsRequest()
for queue in fetch_queue_response.queue_list():
queue_stats_request.add_queue_name(queue.queue_name())
queue_stats_response = (
taskqueue_service_pb.TaskQueueFetchQueueStatsResponse())
apiproxy_stub_map.MakeSyncCall('taskqueue',
'FetchQueueStats',
queue_stats_request,
queue_stats_response)
for queue, queue_stats in zip(
fetch_queue_response.queue_list(),
queue_stats_response.queuestats_list()):
if queue_names and queue.queue_name() not in queue_names:
continue
yield cls._from_queue_and_stats(queue, queue_stats)