# blob: bd392399248196dd27e40c97021f0fab15967879 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for devappserver2.admin.taskqueue_utils."""
import unittest
import google
import mox
from google.appengine.api import apiproxy_stub_map
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.tools.devappserver2.admin import taskqueue_utils
class TestQueueInfo(unittest.TestCase):
  """Tests for taskqueue_utils.QueueInfo."""

  def setUp(self):
    """Replaces apiproxy_stub_map.MakeSyncCall with a mox mock."""
    self.mox = mox.Mox()
    self.mox.StubOutWithMock(apiproxy_stub_map, 'MakeSyncCall')

  def tearDown(self):
    """Restores the real apiproxy_stub_map.MakeSyncCall."""
    self.mox.UnsetStubs()

  def _expect_queue_fetches(self):
    """Records the two taskqueue RPCs that QueueInfo.get() is expected to make.

    The first expectation is a 'FetchQueues' call (max_rows=1000) whose
    response is filled with a PUSH queue named 'queue1' and a PULL queue named
    'queue2'. The second is a 'FetchQueueStats' call naming both queues, whose
    response is filled with one stats entry per queue, in the same order.

    The mock is left in record mode; callers must call self.mox.ReplayAll()
    before exercising the code under test.
    """
    fetch_queue_request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
    fetch_queue_request.set_max_rows(1000)
    fetch_queue_response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
    for name, mode in (('queue1', taskqueue_service_pb.TaskQueueMode.PUSH),
                       ('queue2', taskqueue_service_pb.TaskQueueMode.PULL)):
      queue = fetch_queue_response.add_queue()
      queue.set_queue_name(name)
      queue.set_mode(mode)
      queue.set_user_specified_rate('20/s')
      queue.set_bucket_capacity(10)
    # The response argument is an out-parameter, so the canned response is
    # copied into whatever object the caller passes.
    apiproxy_stub_map.MakeSyncCall(
        'taskqueue',
        'FetchQueues',
        fetch_queue_request,
        mox.IgnoreArg()).WithSideEffects(
            lambda _, _1, _2, response: response.CopyFrom(fetch_queue_response))

    queue_stats_request = taskqueue_service_pb.TaskQueueFetchQueueStatsRequest()
    queue_stats_request.add_queue_name('queue1')
    queue_stats_request.add_queue_name('queue2')
    queue_stats_response = (
        taskqueue_service_pb.TaskQueueFetchQueueStatsResponse())
    for num_tasks, oldest_eta_usec in ((20, -1), (50, 1234567890)):
      queue_stats = queue_stats_response.add_queuestats()
      queue_stats.set_num_tasks(num_tasks)
      queue_stats.set_oldest_eta_usec(oldest_eta_usec)
    apiproxy_stub_map.MakeSyncCall(
        'taskqueue',
        'FetchQueueStats',
        queue_stats_request,
        mox.IgnoreArg()).WithSideEffects(
            lambda _, _1, _2, response: response.CopyFrom(queue_stats_response))

  def test_construction(self):
    """QueueInfo built from a queue and its stats exposes their fields."""
    queue = taskqueue_service_pb.TaskQueueFetchQueuesResponse_Queue()
    queue.set_queue_name('queue1')
    queue.set_mode(taskqueue_service_pb.TaskQueueMode.PUSH)
    queue.set_user_specified_rate('20/s')
    queue.set_bucket_capacity(10)
    queue_stats = (
        taskqueue_service_pb.TaskQueueFetchQueueStatsResponse_QueueStats())
    queue_stats.set_num_tasks(25)
    # 123 seconds after the epoch, expressed in microseconds.
    queue_stats.set_oldest_eta_usec(123*10**6)

    info = taskqueue_utils.QueueInfo._from_queue_and_stats(
        queue, queue_stats)

    self.assertEqual('queue1', info.name)
    self.assertEqual(taskqueue_service_pb.TaskQueueMode.PUSH, info.mode)
    self.assertEqual('20/s', info.rate)
    self.assertEqual(10, info.bucket_size)
    self.assertEqual(25, info.tasks_in_queue)
    self.assertEqual(123*10**6, info.oldest_eta_usec)
    self.assertEqual('1970/01/01 00:02:03', info.human_readable_oldest_task_eta)
    # The delta depends on the current time, so only its shape is checked. The
    # mox.Regex comparator must stay on the left so that its __eq__ is the one
    # used for the comparison.
    self.assertEqual(mox.Regex(r'\d+ days, \d{1,2}:\d{1,2}:\d{1,2} ago'),
                     info.human_readable_oldest_task_eta_delta)

  def test_get(self):
    """QueueInfo.get() with no arguments yields every fetched queue."""
    self._expect_queue_fetches()

    self.mox.ReplayAll()
    queues = list(taskqueue_utils.QueueInfo.get())
    self.mox.VerifyAll()

    # Comparing the full list of names also checks that no extra queues are
    # returned.
    self.assertEqual(['queue1', 'queue2'], [queue.name for queue in queues])

  def test_get_with_queue_name(self):
    """QueueInfo.get() with a set of names yields only the named queues.

    The recorded expectations are the same as for test_get: the stats request
    still names both queues even though only 'queue1' is asked for.
    """
    self._expect_queue_fetches()

    self.mox.ReplayAll()
    queues = list(taskqueue_utils.QueueInfo.get(frozenset(['queue1'])))
    self.mox.VerifyAll()

    # Check the length first so an empty result fails with a clear assertion
    # message rather than an IndexError.
    self.assertEqual(1, len(queues))
    self.assertEqual('queue1', queues[0].name)
if __name__ == '__main__':
  # Run every test in this module when the file is executed as a script.
  unittest.main()