# blob: 98af085644a29bbcdd40c82a5275b7138f060784 [file] [log] [blame]
# Copyright 2015 Google Inc. All Rights Reserved.
import unittest
import webtest
import apprtc
import compute_page
from test_util import CapturingFunction
from test_util import ReplaceFunction
from google.appengine.ext import testbed
class FakeComputeService(object):
    """Stands in for the Compute Engine service of the Google API client.

    The API is used through call chains such as
    ``service.instances().start(...).execute()``. Every link of such a
    chain is a CapturingFunction here, so tests can look at how each
    level was called (e.g. ``num_calls`` and ``last_kwargs``).
    """

    def __init__(self):
        # Tests assign the value that instances().get(...).execute()
        # should hand back; this is how the status of an instance is
        # changed.
        self.get_return_value = None

        # This is a bit hacky: each level is given a callable yielding
        # that same level, so that the next link of the chain can be
        # hung off it as an attribute. It is still easier to reason
        # about than a separate variable for every level.
        self.instances = CapturingFunction(lambda: self.instances)
        instances = self.instances

        # instances().get(...).execute()
        instances.get = CapturingFunction(lambda: self.instances.get)
        instances.get.execute = CapturingFunction(
            lambda: self.get_return_value)

        # instances().start(...).execute()
        instances.start = CapturingFunction(lambda: self.instances.start)
        instances.start.execute = CapturingFunction()

        # instances().stop(...).execute()
        instances.stop = CapturingFunction(lambda: self.instances.stop)
        instances.stop.execute = CapturingFunction()
class ComputePageHandlerTest(unittest.TestCase):
    """Exercises the /compute/<action>/<instance>/<zone> handlers.

    Requests go through a webtest wrapper around apprtc.app while
    ComputePage._build_compute_service is replaced, so the handlers talk
    to a FakeComputeService. Tests then check the calls captured by the
    fake and the tasks left on the 'default' task queue.
    """

    # Append explicit assertion messages to the default ones, so that a
    # failure inside a per-status loop reports both the compared values
    # and the status being tested.
    longMessage = True

    def setUp(self):
        """Activates the testbed and swaps in the fake compute service."""
        # First, create an instance of the Testbed class.
        self.testbed = testbed.Testbed()

        # Then activate the testbed, which prepares the service stubs
        # for use.
        self.testbed.activate()

        # Next, declare which service stubs you want to use.
        self.testbed.init_taskqueue_stub()

        self.test_app = webtest.TestApp(apprtc.app)

        # Fake out the compute service.
        self.build_compute_service_replacement = ReplaceFunction(
            compute_page.ComputePage,
            '_build_compute_service',
            self.fake_build_compute_service)

        # Note that '_build_compute_service' is called at request time.
        # Tests need to be able to set up test data in the fake before
        # ComputePage is initialized. Thus compute_service is created
        # here and it is assumed that _build_compute_service is only
        # called once.
        self.compute_service = FakeComputeService()

        # Default testbed app name.
        self.app_id = 'testbed-test'

    def fake_build_compute_service(self, *args):
        """Replacement for ComputePage._build_compute_service.

        Args:
          *args: Ignored; accepted so that any call signature works.

        Returns:
          The FakeComputeService created in setUp.
        """
        return self.compute_service

    def tearDown(self):
        """Deactivates the testbed and drops the function replacement."""
        self.testbed.deactivate()
        # NOTE(review): presumably ReplaceFunction restores the original
        # function when it is deleted -- confirm in test_util.
        del self.build_compute_service_replacement

    # TODO(decurtis): Extract these functions to our own test base class.
    def makePostRequest(self, path, body=''):
        """Posts body to path on the test app and returns the response."""
        return self.test_app.post(
            path, body, headers={'User-Agent': 'Safari'})

    def makeGetRequest(self, path):
        """Gets path from the test app and returns the response."""
        # PhantomJS uses WebKit, so Safari is closest to the truth.
        return self.test_app.get(path, headers={'User-Agent': 'Safari'})

    def _compute_url(self, action, instance, zone):
        """Returns the handler path for action on instance in zone."""
        return '/compute/%s/%s/%s' % (action, instance, zone)

    def _set_instance_status(self, status):
        """Makes the fake's instances().get().execute() report status."""
        self.compute_service.get_return_value = {
            compute_page.COMPUTE_STATUS: status
        }

    def _expected_api_kwargs(self, instance, zone):
        """Returns the kwargs an instances API call should be made with."""
        return {'project': self.app_id,
                'instance': instance,
                'zone': zone}

    def testGetStatus(self):
        """Test status action returns the instance status as the body."""
        instance = 'test-instance'
        zone = 'test-zone'

        # Fake instance has status of RUNNING.
        instance_status = compute_page.COMPUTE_STATUS_RUNNING
        self._set_instance_status(instance_status)

        response = self.makeGetRequest(
            self._compute_url(compute_page.ACTION_STATUS, instance, zone))

        # get() API must have been addressed to the requested instance.
        self.assertEqual(self._expected_api_kwargs(instance, zone),
                         self.compute_service.instances.get.last_kwargs)
        self.assertEqual(instance_status, response.body)

    def testPostStartWhenTerminated(self):
        """Test start action with instance in the TERMINATED state."""
        compute_instances = self.compute_service.instances

        instance = 'test-instance'
        zone = 'test-zone'
        start_url = self._compute_url(compute_page.ACTION_START,
                                      instance, zone)

        # Suppose that instance is TERMINATED.
        self._set_instance_status(compute_page.COMPUTE_STATUS_TERMINATED)

        # makePostRequest will check for 200 success.
        self.makePostRequest(start_url)

        # If the state is TERMINATED then we should have a new start
        # task.
        tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
        self.assertEqual(1, len(tasks))

        # Verify start() API called only once.
        self.assertEqual(1, compute_instances.start.num_calls)

        # Check start() API called for the requested instance.
        self.assertEqual(self._expected_api_kwargs(instance, zone),
                         compute_instances.start.last_kwargs)

    def testPostStartWhenRunning(self):
        """Test start action when instance in the RUNNING state."""
        instance = 'test-instance'
        zone = 'test-zone'
        start_url = self._compute_url(compute_page.ACTION_START,
                                      instance, zone)

        self._set_instance_status(compute_page.COMPUTE_STATUS_RUNNING)

        self.makePostRequest(start_url)

        # No further tasks needed.
        tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
        self.assertEqual(0, len(tasks))

        # Verify start() API not called.
        self.assertEqual(0, self.compute_service.instances.start.num_calls)

    def testPostStartNotRunningOrTerminated(self):
        """Test start action in an intermediate state."""
        compute_instances = self.compute_service.instances
        taskqueue = self.testbed.get_stub('taskqueue')

        instance = 'test-instance'
        zone = 'test-zone'
        start_url = self._compute_url(compute_page.ACTION_START,
                                      instance, zone)

        # Check all intermediate states.
        for instance_status in ['PROVISIONING', 'STAGING', 'STOPPING']:
            failure_note = 'instance status: %s' % instance_status
            self._set_instance_status(instance_status)

            self.makePostRequest(start_url)

            # The instance is neither RUNNING nor TERMINATED, so a new
            # task for the same start URL should be queued.
            tasks = taskqueue.GetTasks('default')
            self.assertEqual(1, len(tasks), failure_note)
            self.assertEqual(start_url, tasks[-1]['url'], failure_note)

            # Verify start() API not called.
            self.assertEqual(0, compute_instances.start.num_calls,
                             failure_note)

            # Empty the queue so that the task count checked for the
            # next status only reflects that status.
            taskqueue.FlushQueue('default')

    def testPostRestartWhenRunning(self):
        """Test restart action in a running state."""
        compute_instances = self.compute_service.instances
        taskqueue = self.testbed.get_stub('taskqueue')

        instance = 'test-instance'
        zone = 'test-zone'
        restart_url = self._compute_url(compute_page.ACTION_RESTART,
                                        instance, zone)

        self._set_instance_status(compute_page.COMPUTE_STATUS_RUNNING)

        self.makePostRequest(restart_url)

        # A new task should be queued, and it should be a start task.
        tasks = taskqueue.GetTasks('default')
        self.assertEqual(1, len(tasks))
        start_url = self._compute_url(compute_page.ACTION_START,
                                      instance, zone)
        self.assertEqual(start_url, tasks[-1]['url'])

        # Verify stop() API called.
        self.assertEqual(1, compute_instances.stop.num_calls)

    def testPostRestartWhenNotRunning(self):
        """Test restart action in a non-running state."""
        compute_instances = self.compute_service.instances
        taskqueue = self.testbed.get_stub('taskqueue')

        instance = 'test-instance'
        zone = 'test-zone'
        restart_url = self._compute_url(compute_page.ACTION_RESTART,
                                        instance, zone)

        for status in ['PROVISIONING', 'STAGING', 'STOPPING', 'TERMINATED']:
            failure_note = 'instance status: %s' % status
            self._set_instance_status(status)

            self.makePostRequest(restart_url)

            # No task should be queued.
            tasks = taskqueue.GetTasks('default')
            self.assertEqual(0, len(tasks), failure_note)

            # Verify stop() API not called.
            self.assertEqual(0, compute_instances.stop.num_calls,
                             failure_note)