#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the Pipeline API."""
from __future__ import with_statement
import base64
import datetime
import functools
import logging
import os
import pickle
import sys
import unittest
import urllib
import urlparse
# Fix up paths for running tests.
sys.path.insert(0, '../src/')
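# Prefer the standard library's json module; fall back to the API-compatible
# simplejson package on older runtimes that lack it.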
try:
import json
except ImportError:
import simplejson as json
from pipeline import common
from pipeline import pipeline
import test_shared
import testutil
from google.appengine.api import mail
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import testbed
# For convenience.
_BarrierIndex = pipeline.models._BarrierIndex
_BarrierRecord = pipeline.models._BarrierRecord
_PipelineRecord = pipeline.models._PipelineRecord
_SlotRecord = pipeline.models._SlotRecord
_StatusRecord = pipeline.models._StatusRecord
class TestBase(testutil.TestSetupMixin, unittest.TestCase):
"""Base class for all tests in this module."""
def setUp(self):
super(TestBase, self).setUp()
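# Effectively disable truncation of assertion-failure diffs.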
self.maxDiff = 10**10
# First, create an instance of the Testbed class.
self.testbed = testbed.Testbed()
# Then activate the testbed, which prepares the service stubs for use.
self.testbed.activate()
# Next, declare which service stubs you want to use.
self.testbed.init_blobstore_stub()
self.testbed.init_urlfetch_stub()
self.testbed.init_app_identity_stub()
def tearDown(self):
self.testbed.deactivate()
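# unittest releases before Python 2.7 have no assertIn, so a minimal
# equivalent is provided here.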
def assertIn(self, the_thing, what_thing_should_be_in):
"""Asserts that something is contained in something else."""
if the_thing not in what_thing_should_be_in:
raise AssertionError('Could not find %r in %r' % (
the_thing, what_thing_should_be_in))
class SlotTest(TestBase):
"""Tests for the Slot class."""
def testCreate(self):
"""Tests creating Slots with names and keys."""
slot = pipeline.Slot(name='stuff')
self.assertEquals('stuff', slot.name)
self.assertTrue(slot.key)
self.assertFalse(slot.filled)
self.assertFalse(slot._exists)
self.assertRaises(pipeline.SlotNotFilledError, lambda: slot.value)
self.assertRaises(pipeline.SlotNotFilledError, lambda: slot.filler)
self.assertRaises(pipeline.SlotNotFilledError, lambda: slot.fill_datetime)
slot_key = db.Key.from_path('mykind', 'mykey')
slot = pipeline.Slot(name='stuff', slot_key=slot_key)
self.assertEquals('stuff', slot.name)
self.assertEquals(slot_key, slot.key)
self.assertFalse(slot.filled)
self.assertTrue(slot._exists)
self.assertRaises(pipeline.UnexpectedPipelineError, pipeline.Slot)
def testSlotRecord(self):
"""Tests filling Slot attributes with a _SlotRecord."""
slot_key = db.Key.from_path('myslot', 'mykey')
filler_key = db.Key.from_path('myfiller', 'mykey')
now = datetime.datetime.utcnow()
slot_record = _SlotRecord(
filler=filler_key,
value=json.dumps('my value'),
status=_SlotRecord.FILLED,
fill_time=now)
slot = pipeline.Slot(name='stuff', slot_key=slot_key)
slot._set_value(slot_record)
self.assertTrue(slot._exists)
self.assertTrue(slot.filled)
self.assertEquals('my value', slot.value)
self.assertEquals(filler_key.name(), slot.filler)
self.assertEquals(now, slot.fill_datetime)
def testValueTestMode(self):
"""Tests filling Slot attributes for test mode."""
slot_key = db.Key.from_path('myslot', 'mykey')
filler_key = db.Key.from_path('myfiller', 'mykey')
now = datetime.datetime.utcnow()
value = 'my value'
slot = pipeline.Slot(name='stuff', slot_key=slot_key)
slot._set_value_test(filler_key, value)
self.assertTrue(slot._exists)
self.assertTrue(slot.filled)
self.assertEquals('my value', slot.value)
self.assertEquals(filler_key.name(), slot.filler)
self.assertTrue(isinstance(slot.fill_datetime, datetime.datetime))
class PipelineFutureTest(TestBase):
"""Tests for the PipelineFuture class."""
def testNormal(self):
"""Tests using a PipelineFuture in normal mode."""
future = pipeline.PipelineFuture([])
self.assertTrue('default' in future._output_dict)
default = future.default
self.assertTrue(isinstance(default, pipeline.Slot))
self.assertFalse(default.filled)
self.assertFalse('stuff' in future._output_dict)
stuff = future.stuff
self.assertTrue('stuff' in future._output_dict)
self.assertNotEquals(stuff.key, default.key)
self.assertTrue(isinstance(stuff, pipeline.Slot))
self.assertFalse(stuff.filled)
def testStrictMode(self):
"""Tests using a PipelineFuture that's in strict mode."""
future = pipeline.PipelineFuture(['one', 'two'])
self.assertTrue(future._strict)
self.assertTrue('default' in future._output_dict)
self.assertTrue('one' in future._output_dict)
self.assertTrue('two' in future._output_dict)
default = future.default
self.assertTrue(isinstance(default, pipeline.Slot))
self.assertFalse(default.filled)
one = future.one
self.assertTrue(isinstance(one, pipeline.Slot))
self.assertFalse(one.filled)
self.assertNotEquals(one.key, default.key)
two = future.two
self.assertTrue(isinstance(two, pipeline.Slot))
self.assertFalse(two.filled)
self.assertNotEquals(two.key, default.key)
self.assertNotEquals(two.key, one.key)
self.assertRaises(pipeline.SlotNotDeclaredError, lambda: future.three)
def testReservedOutputs(self):
"""Tests reserved output slot names."""
self.assertRaises(pipeline.UnexpectedPipelineError,
pipeline.PipelineFuture, ['default'])
def testInheritOutputs(self):
"""Tests _inherit_outputs without resolving their values."""
already_defined = {
'one': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist1')),
'two': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist2')),
'three': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist3')),
'default': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist4')),
}
future = pipeline.PipelineFuture([])
self.assertFalse(future.default._exists)
future._inherit_outputs('mypipeline', already_defined)
self.assertEquals(already_defined['one'], str(future.one.key))
self.assertEquals(already_defined['two'], str(future.two.key))
self.assertEquals(already_defined['three'], str(future.three.key))
self.assertEquals(already_defined['default'], str(future.default.key))
self.assertTrue(future.one._exists)
self.assertTrue(future.two._exists)
self.assertTrue(future.three._exists)
self.assertTrue(future.default._exists)
def testInheritOutputsStrictMode(self):
"""Tests _inherit_outputs without resolving their values in strict mode."""
already_defined = {
'one': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist1')),
'two': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist2')),
'three': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist3')),
'default': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist4')),
}
future = pipeline.PipelineFuture(['one', 'two', 'three'])
self.assertFalse(future.one._exists)
self.assertFalse(future.two._exists)
self.assertFalse(future.three._exists)
self.assertFalse(future.default._exists)
future._inherit_outputs('mypipeline', already_defined)
self.assertEquals(already_defined['one'], str(future.one.key))
self.assertEquals(already_defined['two'], str(future.two.key))
self.assertEquals(already_defined['three'], str(future.three.key))
self.assertEquals(already_defined['default'], str(future.default.key))
self.assertTrue(future.one._exists)
self.assertTrue(future.two._exists)
self.assertTrue(future.three._exists)
self.assertTrue(future.default._exists)
def testInheritOutputsStrictModeUndeclared(self):
"""Tests _inherit_outputs when an inherited output has not been declared."""
already_defined = {
'one': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist1')),
'two': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist2')),
'three': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist3')),
'default': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist4')),
'five': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist5')),
}
future = pipeline.PipelineFuture(['one', 'two', 'three'])
self.assertRaises(pipeline.UnexpectedPipelineError, future._inherit_outputs,
'mypipeline', already_defined)
def testInheritOutputsResolveValues(self):
"""Tests _inherit_outputs with resolving their current values."""
one = _SlotRecord(
value=json.dumps('hi one'),
status=_SlotRecord.FILLED,
fill_time=datetime.datetime.utcnow(),
filler=db.Key.from_path('mykind', 'mykey1'))
one.put()
two = _SlotRecord(
value=json.dumps('hi two'),
status=_SlotRecord.FILLED,
fill_time=datetime.datetime.utcnow(),
filler=db.Key.from_path('mykind', 'mykey2'))
two.put()
three = _SlotRecord()
three.put()
default = _SlotRecord()
default.put()
already_defined = {
'one': str(one.key()),
'two': str(two.key()),
'three': str(three.key()),
'default': str(default.key()),
}
future = pipeline.PipelineFuture([])
future._inherit_outputs('mypipeline', already_defined, resolve_outputs=True)
self.assertEquals('hi one', future.one.value)
self.assertEquals('hi two', future.two.value)
self.assertFalse(future.three.filled)
def testInheritOutputsResolveValuesMissing(self):
"""Tests when output _SlotRecords are missing for inherited outputs."""
already_defined = {
'four': str(db.Key.from_path(_SlotRecord.kind(), 'does not exist')),
}
future = pipeline.PipelineFuture([])
self.assertRaises(pipeline.UnexpectedPipelineError, future._inherit_outputs,
'mypipeline', already_defined, resolve_outputs=True)
class NothingPipeline(pipeline.Pipeline):
"""Pipeline that does nothing."""
output_names = ['one', 'two']
def run(self):
self.fill('one', 1)
self.fill('two', 1)
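# Typical use outside of tests (a sketch mirroring testStart below):
#   stage = NothingPipeline('one', 'two', three='red', four=1234)
#   stage.start(queue_name='default')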
class OutputlessPipeline(pipeline.Pipeline):
"""Pipeline that outputs nothing."""
def run(self):
pass
class AsyncOutputlessPipeline(pipeline.Pipeline):
"""Pipeline that outputs nothing."""
async = True
def run(self):
self.complete()
class AsyncCancellable(pipeline.Pipeline):
"""Pipeline that can be cancelled."""
async = True
def run(self):
self.complete()
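# Returning True from try_cancel tells the framework this pipeline may be
# cancelled, which is what lets abort() and retry() succeed for async stages.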
def try_cancel(self):
return True
class PipelineTest(TestBase):
"""Tests for the Pipeline class."""
def testClassPath(self):
"""Tests the class path resolution class method."""
module_dict = {}
self.assertEquals(None, pipeline.Pipeline._class_path)
pipeline.Pipeline._set_class_path(module_dict)
self.assertEquals(None, pipeline.Pipeline._class_path)
class MyModule(object):
pass
mymodule = MyModule()
setattr(mymodule, 'NothingPipeline', NothingPipeline)
# Does not require __main__.
module_dict['other'] = mymodule
NothingPipeline._set_class_path(module_dict=module_dict)
self.assertEquals('other.NothingPipeline', NothingPipeline._class_path)
# Will ignore __main__.
NothingPipeline._class_path = None
module_dict['__main__'] = mymodule
NothingPipeline._set_class_path(module_dict=module_dict)
self.assertEquals('other.NothingPipeline', NothingPipeline._class_path)
# Will use __main__ as a last resort.
NothingPipeline._class_path = None
del module_dict['other']
NothingPipeline._set_class_path(module_dict=module_dict)
self.assertEquals('__main__.NothingPipeline', NothingPipeline._class_path)
def testStart(self):
"""Tests starting a Pipeline."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
self.assertEquals(('one', 'two'), stage.args)
self.assertEquals({'three': 'red', 'four': 1234}, stage.kwargs)
self.assertTrue(stage.start() is None)
self.assertEquals('default', stage.queue_name)
self.assertEquals('/_ah/pipeline', stage.base_path)
self.assertEquals(stage.pipeline_id, stage.root_pipeline_id)
self.assertTrue(stage.is_root)
pipeline_record = _PipelineRecord.get_by_key_name(stage.pipeline_id)
self.assertTrue(pipeline_record is not None)
self.assertEquals('__main__.NothingPipeline', pipeline_record.class_path)
self.assertEquals(_PipelineRecord.WAITING, pipeline_record.status)
params = pipeline_record.params
self.assertEquals(params['args'],
[{'type': 'value', 'value': 'one'}, {'type': 'value', 'value': 'two'}])
self.assertEquals(params['kwargs'],
{'four': {'type': 'value', 'value': 1234},
'three': {'type': 'value', 'value': 'red'}})
self.assertEquals([], params['after_all'])
self.assertEquals('default', params['queue_name'])
self.assertEquals('/_ah/pipeline', params['base_path'])
self.assertEquals(set(NothingPipeline.output_names + ['default']),
set(params['output_slots'].keys()))
self.assertTrue(pipeline_record.is_root_pipeline)
self.assertTrue(isinstance(pipeline_record.start_time, datetime.datetime))
# Verify that all output slots are present.
slot_records = list(_SlotRecord.all().filter(
'root_pipeline =',
db.Key.from_path(_PipelineRecord.kind(), stage.pipeline_id)))
slot_dict = dict((s.key(), s) for s in slot_records)
self.assertEquals(3, len(slot_dict))
for outputs in params['output_slots'].itervalues():
slot_record = slot_dict[db.Key(outputs)]
self.assertEquals(_SlotRecord.WAITING, slot_record.status)
# Verify that accessing an undeclared output slot will fail.
self.assertRaises(pipeline.SlotNotDeclaredError,
lambda: stage.outputs.does_not_exist)
# Verify that the slot existence has been set to true.
for slot in stage.outputs._output_dict.itervalues():
self.assertTrue(slot._exists)
# Verify the enqueued task.
task_list = test_shared.get_tasks()
self.assertEquals(1, len(task_list))
task = task_list[0]
self.assertEquals(
{'pipeline_key': [str(db.Key.from_path(
_PipelineRecord.kind(), stage.pipeline_id))]},
task['params'])
self.assertEquals('/_ah/pipeline/run', task['url'])
def testStartIdempotenceKey(self):
"""Tests starting a pipeline with an idempotence key."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
self.assertTrue(stage.start(idempotence_key='banana') is None)
self.assertEquals('banana', stage.pipeline_id)
def testStartReturnTask(self):
"""Tests starting a pipeline and returning the kick-off task."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
task = stage.start(return_task=True, idempotence_key='banana')
self.assertEquals(0, len(test_shared.get_tasks()))
self.assertEquals('/_ah/pipeline/run', task.url)
self.assertEquals(
'pipeline_key=%s' % db.Key.from_path(_PipelineRecord.kind(), 'banana'),
task.payload)
self.assertTrue(task.name is None)
def testStartQueueName(self):
"""Tests that the start queue name will be preserved."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
self.assertTrue(stage.start(queue_name='other') is None)
self.assertEquals(0, len(test_shared.get_tasks('default')))
self.assertEquals(1, len(test_shared.get_tasks('other')))
def testStartCountdown(self):
"""Tests starting a pipeline with a countdown."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
task = stage.start(return_task=True, countdown=30)
self.assertEquals(0, len(test_shared.get_tasks()))
self.assertTrue(eta <= task.eta.replace(tzinfo=None))
def testStartEta(self):
"""Tests starting a pipeline with an eta."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
eta = datetime.datetime.now() + datetime.timedelta(seconds=30)
task = stage.start(return_task=True, eta=eta)
self.assertEquals(0, len(test_shared.get_tasks()))
self.assertEquals(eta, test_shared.utc_to_local(task.eta))
def testStartCountdownAndEta(self):
"""Tests starting a pipeline with both a countdown and eta."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=30)
self.assertRaises(pipeline.PipelineSetupError,
stage.start, countdown=30, eta=eta)
def testStartUndeclaredOutputs(self):
"""Tests that accessing undeclared outputs on a root pipeline will err.
Only applies to root pipelines that have no named outputs and only have
the default output slot.
"""
stage = OutputlessPipeline()
stage.start()
self.assertFalse(stage.outputs.default.filled)
self.assertRaises(pipeline.SlotNotDeclaredError, lambda: stage.outputs.blah)
def testStartIdempotenceKeyExists(self):
"""Tests when the idempotence key is a dupe."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
other_stage = OutputlessPipeline()
self.assertRaises(pipeline.PipelineExistsError,
other_stage.start, idempotence_key='banana')
def testStartIdempotenceKeyIsRandomGarbage(self):
"""Tests when the idempotence key binary garbage."""
idempotence_key = '\xfb\xcaOu\t72\xa2\x08\xc9\xb9\x82\xa1\xf4>\xba>SwL'
self.assertRaises(UnicodeDecodeError, idempotence_key.encode, 'utf-8')
stage = OutputlessPipeline()
stage.start(idempotence_key=idempotence_key)
other_stage = OutputlessPipeline()
self.assertRaises(pipeline.PipelineExistsError,
other_stage.start, idempotence_key=idempotence_key)
result = OutputlessPipeline.from_id(idempotence_key)
self.assertTrue(result is not None)
def testStartRetryParameters(self):
"""Tests setting retry backoff parameters before calling start()."""
stage = OutputlessPipeline()
stage.max_attempts = 15
stage.backoff_seconds = 1234.56
stage.backoff_factor = 2.718
stage.start(idempotence_key='banana')
pipeline_record = _PipelineRecord.get_by_key_name(stage.pipeline_id)
self.assertTrue(pipeline_record is not None)
self.assertEquals(15, pipeline_record.params['max_attempts'])
self.assertEquals(1234.56, pipeline_record.params['backoff_seconds'])
self.assertEquals(2.718, pipeline_record.params['backoff_factor'])
def testStartException(self):
"""Tests when a dependent method from start raises an exception."""
def mock_raise(*args, **kwargs):
raise Exception('Doh! Fake error')
stage = OutputlessPipeline()
stage._set_values_internal = mock_raise
try:
stage.start(idempotence_key='banana')
self.fail('Did not raise')
except pipeline.PipelineSetupError, e:
self.assertEquals(
'Error starting __main__.OutputlessPipeline(*(), **{})#banana: '
'Doh! Fake error',
str(e))
def testFromId(self):
"""Tests retrieving a Pipeline instance by ID."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.max_attempts = 15
stage.backoff_seconds = 1234.56
stage.backoff_factor = 2.718
stage.target = 'my-other-target'
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
other = NothingPipeline.from_id(stage.pipeline_id)
self.assertEquals(('one', 'two'), other.args)
self.assertEquals({'three': 'red', 'four': 1234}, other.kwargs)
self.assertEquals('other', other.queue_name)
self.assertEquals('/other', other.base_path)
self.assertEquals('meep', other.pipeline_id)
self.assertEquals('meep', other.root_pipeline_id)
self.assertTrue(other.is_root)
self.assertEquals(15, other.max_attempts)
self.assertEquals(1234.56, other.backoff_seconds)
self.assertEquals(2.718, other.backoff_factor)
self.assertEquals('my-other-target', other.target)
self.assertEquals(1, other.current_attempt)
self.assertFalse(other.outputs.one.filled)
self.assertEquals(stage.outputs.one.key, other.outputs.one.key)
self.assertFalse(other.outputs.two.filled)
self.assertEquals(stage.outputs.two.key, other.outputs.two.key)
def testFromIdResolveOutputs(self):
"""Tests retrieving a Pipeline instance by ID and resolving its outputs."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
stage.fill('one', 'red')
stage.fill('two', 'blue')
other = NothingPipeline.from_id(stage.pipeline_id)
self.assertTrue(other.outputs.one.filled)
self.assertEquals(stage.outputs.one.key, other.outputs.one.key)
self.assertEquals('red', other.outputs.one.value)
self.assertTrue(other.outputs.two.filled)
self.assertEquals(stage.outputs.two.key, other.outputs.two.key)
self.assertEquals('blue', other.outputs.two.value)
def testFromIdReturnsOriginalClass(self):
"""Tests that from_id() will always return the original class."""
stage = AsyncOutputlessPipeline()
stage.start()
other = pipeline.Pipeline.from_id(stage.pipeline_id)
self.assertTrue(isinstance(other, AsyncOutputlessPipeline))
self.assertTrue(type(other) is not pipeline.Pipeline)
self.assertTrue(other.async) # Class variables preserved
def testFromIdCannotFindOriginalClass(self):
"""Tests when from_id() cannot find the original class."""
stage = NothingPipeline()
stage.start()
pipeline_record = _PipelineRecord.get_by_key_name(stage.pipeline_id)
pipeline_record.class_path = 'does_not_exist.or_something'
pipeline_record.put()
other = pipeline.Pipeline.from_id(stage.pipeline_id)
self.assertTrue(type(other) is pipeline.Pipeline)
def testFillString(self):
"""Tests filling a slot by name."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
stage.fill('one', 'red')
stage.fill('two', 'blue')
other = NothingPipeline.from_id(stage.pipeline_id)
self.assertEquals('red', other.outputs.one.value)
self.assertEquals('blue', other.outputs.two.value)
def testFillSlot(self):
"""Tests filling a slot with a Slot instance."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
stage.fill(stage.outputs.one, 'red')
stage.fill(stage.outputs.two, 'blue')
other = NothingPipeline.from_id(stage.pipeline_id)
self.assertEquals('red', other.outputs.one.value)
self.assertEquals('blue', other.outputs.two.value)
def testFillSlot_Huge(self):
"""Tests filling a slot with over 1MB of data."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
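# A single datastore entity is capped at roughly 1MB, so the big value should
# force the slot to store its payload externally while the small one stays
# inline.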
big_data = 'red' * 1000000
self.assertTrue(len(big_data) > 1000000)
small_data = 'blue' * 500
self.assertTrue(len(small_data) < 1000000)
stage.fill(stage.outputs.one, big_data)
stage.fill(stage.outputs.two, small_data)
other = NothingPipeline.from_id(stage.pipeline_id)
self.assertEquals(big_data, other.outputs.one.value)
self.assertEquals(small_data, other.outputs.two.value)
def testFillSlotErrors(self):
"""Tests errors that happen when filling slots."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(queue_name='other', base_path='/other', idempotence_key='meep')
self.assertRaises(pipeline.UnexpectedPipelineError,
stage.fill, object(), 'red')
slot = pipeline.Slot(name='one')
self.assertRaises(pipeline.SlotNotDeclaredError,
stage.fill, slot, 'red')
db.delete(stage.outputs.one.key)
self.assertRaises(pipeline.UnexpectedPipelineError,
stage.fill, stage.outputs.one, 'red')
def testComplete(self):
"""Tests asynchronous completion of the pipeline."""
stage = AsyncOutputlessPipeline()
stage.start(idempotence_key='banana')
stage.complete(1234)
other = AsyncOutputlessPipeline.from_id(stage.pipeline_id)
self.assertEquals(1234, other.outputs.default.value)
def testCompleteDisallowed(self):
"""Tests completion of the pipeline when it's not asynchronous."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start()
self.assertRaises(pipeline.UnexpectedPipelineError, stage.complete)
def testGetCallbackUrl(self):
"""Tests the get_callback_url method."""
stage = AsyncOutputlessPipeline()
stage.start(idempotence_key='banana')
result = stage.get_callback_url(one='red', two='blue', three=12345)
self.assertEquals(
'/_ah/pipeline/callback'
'?one=red&pipeline_id=banana&three=12345&two=blue',
result)
def testGetCallbackTask(self):
"""Tests the get_callback_task method."""
stage = AsyncOutputlessPipeline()
stage.start(idempotence_key='banana')
now = datetime.datetime.now()
task = stage.get_callback_task(
params=dict(one='red', two='blue', three=12345),
method='overridden',
name='my-name',
eta=now)
self.assertEquals('/_ah/pipeline/callback', task.url)
self.assertEquals(
{'two': ['blue'],
'one': ['red'],
'pipeline_id': ['banana'],
'three': ['12345']},
urlparse.parse_qs(task.payload))
self.assertEquals('POST', task.method)
self.assertEquals('my-name', task.name)
self.assertEquals(now, test_shared.utc_to_local(task.eta))
def testAccessorsUnknown(self):
"""Tests using accessors when they have unknown values."""
stage = OutputlessPipeline()
self.assertTrue(stage.pipeline_id is None)
self.assertTrue(stage.root_pipeline_id is None)
self.assertTrue(stage.queue_name is None)
self.assertTrue(stage.base_path is None)
self.assertFalse(stage.has_finalized)
self.assertFalse(stage.was_aborted)
self.assertFalse(stage.has_finalized)
def testHasFinalized(self):
"""Tests the has_finalized method."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertFalse(stage.has_finalized)
other = OutputlessPipeline.from_id(stage.pipeline_id)
self.assertFalse(other.has_finalized)
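# Drive the internal state machine straight to the finalized state, the same
# transition the framework performs after a successful run.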
other._context.transition_complete(other._pipeline_key)
another = OutputlessPipeline.from_id(stage.pipeline_id)
self.assertTrue(another.has_finalized)
def testWasAborted(self):
"""Tests the was_aborted method."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertFalse(stage.was_aborted)
other = OutputlessPipeline.from_id(stage.pipeline_id)
self.assertFalse(other.was_aborted)
other.abort()
# Even after sending the abort signal, it won't show up as aborted.
another = OutputlessPipeline.from_id(stage.pipeline_id)
self.assertFalse(another.was_aborted)
# Now transition to the aborted state.
another._context.transition_aborted(stage._pipeline_key)
yet_another = OutputlessPipeline.from_id(stage.pipeline_id)
self.assertTrue(yet_another.was_aborted)
def testRetryPossible(self):
"""Tests calling retry when it is possible."""
stage = AsyncCancellable()
stage.start(idempotence_key='banana')
self.assertEquals(1, stage.current_attempt)
self.assertTrue(stage.retry('My message 1'))
other = AsyncCancellable.from_id(stage.pipeline_id)
self.assertEquals(2, other.current_attempt)
self.assertTrue(stage.retry())
other = AsyncCancellable.from_id(stage.pipeline_id)
self.assertEquals(3, other.current_attempt)
def testRetryNotPossible(self):
"""Tests calling retry when the pipeline says it's not possible."""
stage = AsyncOutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertEquals(1, stage.current_attempt)
self.assertFalse(stage.retry())
other = AsyncCancellable.from_id(stage.pipeline_id)
self.assertEquals(1, other.current_attempt)
def testRetryDisallowed(self):
"""Tests retry of the pipeline when it's not asynchronous."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertEquals(1, stage.current_attempt)
self.assertRaises(pipeline.UnexpectedPipelineError, stage.retry)
def testAbortRootSync(self):
"""Tests aborting a non-async, root pipeline."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertTrue(stage.abort('gotta bail!'))
# Does not affect the current instance; it's just a signal.
self.assertFalse(stage.was_aborted)
def testAbortRootAsync(self):
"""Tests when the root pipeline is async and try_cancel is True."""
stage = AsyncCancellable()
stage.start(idempotence_key='banana')
self.assertTrue(stage.abort('gotta bail!'))
# Does not affect the current instance; it's just a signal.
self.assertFalse(stage.was_aborted)
def testAbortRootAsyncNotPossible(self):
"""Tests when the root pipeline is async and cannot be canceled."""
stage = AsyncOutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertFalse(stage.abort('gotta bail!'))
# Does not affect the current instance; it's just a signal.
self.assertFalse(stage.was_aborted)
def testAbortRootSyncAlreadyAborted(self):
"""Tests aborting when the sync pipeline has already been aborted."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
self.assertTrue(stage.abort('gotta bail!'))
self.assertFalse(stage.abort('gotta bail 2!'))
def testAbortRootAsyncAlreadyAborted(self):
"""Tests aborting when the async pipeline has already been aborted."""
stage = AsyncCancellable()
stage.start(idempotence_key='banana')
self.assertTrue(stage.abort('gotta bail!'))
self.assertFalse(stage.abort('gotta bail 2!'))
def testFinalizeEmailDone(self):
"""Tests completion emails for completed root pipelines."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage._context.transition_complete(stage._pipeline_key)
other = OutputlessPipeline.from_id(stage.pipeline_id)
result = []
def fake_mail(self, sender, subject, body, html=None):
result.append((sender, subject, body, html))
old_sendmail = pipeline.Pipeline._send_mail
pipeline.Pipeline._send_mail = fake_mail
try:
other.send_result_email()
finally:
pipeline.Pipeline._send_mail = old_sendmail
self.assertEquals(1, len(result))
sender, subject, body, html = result[0]
self.assertEquals('my-app-id@my-app-id.appspotmail.com', sender)
self.assertEquals(
'Pipeline successful: App "my-app-id", '
'__main__.OutputlessPipeline#banana',
subject)
self.assertEquals(
'View the pipeline results here:\n\n'
'http://my-app-id.appspot.com/_ah/pipeline/status?root=banana\n\n'
'Thanks,\n\nThe Pipeline API\n',
body)
self.assertEquals(
'<html><body>\n<p>View the pipeline results here:</p>\n\n<p><a href="'
'http://my-app-id.appspot.com/_ah/pipeline/status?root=banana"\n'
'>http://my-app-id.appspot.com/_ah/pipeline/status?root=banana'
'</a></p>\n\n<p>\nThanks,\n<br>\nThe Pipeline API\n</p>\n'
'</body></html>\n',
html)
def testFinalizeEmailAborted(self):
"""Tests completion emails for aborted root pipelines."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage._context.transition_aborted(stage._pipeline_key)
other = OutputlessPipeline.from_id(stage.pipeline_id)
result = []
def fake_mail(self, sender, subject, body, html=None):
result.append((sender, subject, body, html))
old_sendmail = pipeline.Pipeline._send_mail
pipeline.Pipeline._send_mail = fake_mail
try:
other.send_result_email()
finally:
pipeline.Pipeline._send_mail = old_sendmail
self.assertEquals(1, len(result))
sender, subject, body, html = result[0]
self.assertEquals('my-app-id@my-app-id.appspotmail.com', sender)
self.assertEquals(
'Pipeline aborted: App "my-app-id", '
'__main__.OutputlessPipeline#banana',
subject)
self.assertEquals(
'View the pipeline results here:\n\n'
'http://my-app-id.appspot.com/_ah/pipeline/status?root=banana\n\n'
'Thanks,\n\nThe Pipeline API\n',
body)
self.assertEquals(
'<html><body>\n<p>View the pipeline results here:</p>\n\n<p><a href="'
'http://my-app-id.appspot.com/_ah/pipeline/status?root=banana"\n'
'>http://my-app-id.appspot.com/_ah/pipeline/status?root=banana'
'</a></p>\n\n<p>\nThanks,\n<br>\nThe Pipeline API\n</p>\n'
'</body></html>\n',
html)
def testFinalizeEmailError(self):
"""Tests when send_result_email raises an error."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage._context.transition_complete(stage._pipeline_key)
other = OutputlessPipeline.from_id(stage.pipeline_id)
def fake_mail(*args, **kwargs):
raise mail.InvalidEmailError('Doh!')
old_sendmail = pipeline.Pipeline._send_mail
pipeline.Pipeline._send_mail = fake_mail
try:
other.send_result_email()
finally:
pipeline.Pipeline._send_mail = old_sendmail
def testSetStatus(self):
"""Tests for the set_status method."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage.set_status(
message='This is my message',
console_url='/path/to/the/console',
status_links=dict(first='/one', second='/two', third='/three'))
record_list = list(_StatusRecord.all())
self.assertEquals(1, len(record_list))
status_record = record_list[0]
self.assertEquals('This is my message', status_record.message)
self.assertEquals('/path/to/the/console', status_record.console_url)
self.assertEquals(['first', 'second', 'third'], status_record.link_names)
self.assertEquals(['/one', '/two', '/three'], status_record.link_urls)
self.assertTrue(isinstance(status_record.status_time, datetime.datetime))
# Now resetting it will overwrite all fields.
stage.set_status(console_url='/another_console')
after_status_record = db.get(status_record.key())
self.assertEquals(None, after_status_record.message)
self.assertEquals('/another_console', after_status_record.console_url)
self.assertEquals([], after_status_record.link_names)
self.assertEquals([], after_status_record.link_urls)
self.assertNotEquals(after_status_record.status_time,
status_record.status_time)
def testSetStatusError(self):
"""Tests when set_status hits a Datastore error."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
try:
stage.set_status(message=object())
self.fail('Did not raise')
except pipeline.PipelineRuntimeError, e:
self.assertEquals(
'Could not set status for __main__.OutputlessPipeline(*(), **{})'
'#banana: Property message must be convertible to a Text instance '
'(Text() argument should be str or unicode, not object)',
str(e))
def testTestMode(self):
"""Tests the test_mode property of Pipelines."""
from pipeline import pipeline as local_pipeline
stage = OutputlessPipeline()
self.assertFalse(stage.test_mode)
local_pipeline._TEST_MODE = True
try:
self.assertTrue(stage.test_mode)
finally:
local_pipeline._TEST_MODE = False
def testCleanup(self):
"""Tests the cleanup method of Pipelines."""
stage = OutputlessPipeline()
self.assertRaises(pipeline.UnexpectedPipelineError, stage.cleanup)
stage.start(idempotence_key='banana')
self.assertTrue(stage.is_root)
stage.cleanup()
task_list = test_shared.get_tasks('default')
self.assertEquals(2, len(task_list))
start_task, cleanup_task = task_list
self.assertEquals('/_ah/pipeline/run', start_task['url'])
self.assertEquals('/_ah/pipeline/cleanup', cleanup_task['url'])
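# The opaque value below is the websafe-encoded datastore key of the 'banana'
# _AE_Pipeline_Record entity in app 'my-app-id'.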
self.assertEquals(
'aglteS1hcHAtaWRyHwsSE19BRV9QaXBlbGluZV9SZWNvcmQiBmJhbmFuYQw',
dict(cleanup_task['headers'])['X-Ae-Pipeline-Key'])
self.assertEquals(
['aglteS1hcHAtaWRyHwsSE19BRV9QaXBlbGluZV9SZWNvcmQiBmJhbmFuYQw'],
cleanup_task['params']['root_pipeline_key'])
# If the stage is actually a child stage, then cleanup does nothing.
stage._root_pipeline_key = db.Key.from_path(
_PipelineRecord.kind(), 'other')
self.assertFalse(stage.is_root)
stage.cleanup()
task_list = test_shared.get_tasks('default')
self.assertEquals(2, len(task_list))
def testInheritTarget(self):
"""Tests pipeline inherits task target if none is specified."""
stage = OutputlessPipeline()
self.assertEqual('my-version.foo-module', stage.target)
stage.start(idempotence_key='banana')
task_list = test_shared.get_tasks('default')
self.assertEquals(1, len(task_list))
start_task = task_list[0]
self.assertEquals('/_ah/pipeline/run', start_task['url'])
self.assertEquals(
'my-version.foo-module.my-app-id.appspot.com',
dict(start_task['headers'])['Host'])
def testWithParams(self):
"""Tests the with_params helper method."""
stage = OutputlessPipeline().with_params(target='my-cool-target')
self.assertEquals('my-cool-target', stage.target)
stage.start(idempotence_key='banana')
task_list = test_shared.get_tasks('default')
self.assertEquals(1, len(task_list))
start_task = task_list[0]
self.assertEquals('/_ah/pipeline/run', start_task['url'])
self.assertEquals(
'my-cool-target.my-app-id.appspot.com',
dict(start_task['headers'])['Host'])
def testWithParams_Errors(self):
"""Tests misuse of the with_params helper method."""
stage = OutputlessPipeline()
# Bad argument
self.assertRaises(
TypeError, stage.with_params, unknown_arg='blah')
# If it's already active then you can't change the parameters.
stage.start(idempotence_key='banana')
self.assertRaises(
pipeline.UnexpectedPipelineError, stage.with_params)
class OrderingTest(TestBase):
"""Tests for the Ordering classes."""
def testAfterEmpty(self):
"""Tests when no futures are passed to the After() constructor."""
pipeline.After._local._after_all_futures = []
futures = []
after = pipeline.After(*futures)
self.assertEquals([], pipeline.After._local._after_all_futures)
after.__enter__()
self.assertEquals([], pipeline.After._local._after_all_futures)
self.assertFalse(after.__exit__(None, None, None))
self.assertEquals([], pipeline.After._local._after_all_futures)
def testAfterParameterNotFuture(self):
"""Tests when some other object is passed to the After() constructor."""
futures = [object(), object()]
self.assertRaises(TypeError, pipeline.After, *futures)
def testAfter(self):
"""Tests the After class."""
pipeline.After._local._after_all_futures = []
futures = [pipeline.PipelineFuture([]), pipeline.PipelineFuture([])]
after = pipeline.After(*futures)
self.assertEquals([], pipeline.After._local._after_all_futures)
after.__enter__()
self.assertEquals(sorted(futures),
sorted(pipeline.After._local._after_all_futures))
self.assertFalse(after.__exit__(None, None, None))
self.assertEquals([], pipeline.After._local._after_all_futures)
def testAfterNested(self):
"""Tests nested behavior of the After class."""
pipeline.After._local._after_all_futures = []
futures = [pipeline.PipelineFuture([]), pipeline.PipelineFuture([])]
after = pipeline.After(*futures)
self.assertEquals([], pipeline.After._local._after_all_futures)
after.__enter__()
self.assertEquals(sorted(futures),
sorted(pipeline.After._local._after_all_futures))
after2 = pipeline.After(*futures)
self.assertEquals(sorted(futures),
sorted(pipeline.After._local._after_all_futures))
after2.__enter__()
self.assertEquals(sorted(futures + futures),
sorted(pipeline.After._local._after_all_futures))
self.assertFalse(after.__exit__(None, None, None))
self.assertEquals(sorted(futures),
sorted(pipeline.After._local._after_all_futures))
self.assertFalse(after.__exit__(None, None, None))
self.assertEquals([], pipeline.After._local._after_all_futures)
def testInOrder(self):
"""Tests the InOrder class."""
pipeline.InOrder._local._in_order_futures = set()
pipeline.InOrder._local._activated = False
inorder = pipeline.InOrder()
self.assertFalse(pipeline.InOrder._local._activated)
self.assertEquals(set(), pipeline.InOrder._local._in_order_futures)
pipeline.InOrder._add_future(object())
self.assertEquals(set(), pipeline.InOrder._local._in_order_futures)
inorder.__enter__()
self.assertTrue(pipeline.InOrder._local._activated)
one, two, three = object(), object(), object()
pipeline.InOrder._add_future(one)
pipeline.InOrder._add_future(two)
pipeline.InOrder._add_future(three)
pipeline.InOrder._add_future(three)
self.assertEquals(set([one, two, three]),
pipeline.InOrder._local._in_order_futures)
inorder.__exit__(None, None, None)
self.assertFalse(pipeline.InOrder._local._activated)
self.assertEquals(set(), pipeline.InOrder._local._in_order_futures)
def testInOrderNested(self):
"""Tests nested behavior of the InOrder class."""
pipeline.InOrder._local._in_order_futures = set()
pipeline.InOrder._local._activated = False
inorder = pipeline.InOrder()
self.assertFalse(pipeline.InOrder._local._activated)
inorder.__enter__()
self.assertTrue(pipeline.InOrder._local._activated)
inorder2 = pipeline.InOrder()
self.assertRaises(pipeline.UnexpectedPipelineError, inorder2.__enter__)
inorder.__exit__(None, None, None)
class EmailOnHighReplicationTest(TestBase):
TEST_APP_ID = 's~my-hrd-app'
def testFinalizeEmailDone_HighReplication(self):
"""Tests completion emails for completed root pipelines on HRD."""
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage._context.transition_complete(stage._pipeline_key)
other = OutputlessPipeline.from_id(stage.pipeline_id)
result = []
def fake_mail(self, sender, subject, body, html=None):
result.append((sender, subject, body, html))
old_sendmail = pipeline.Pipeline._send_mail
pipeline.Pipeline._send_mail = fake_mail
try:
other.send_result_email()
finally:
pipeline.Pipeline._send_mail = old_sendmail
self.assertEquals(1, len(result))
sender, subject, body, html = result[0]
self.assertEquals('my-hrd-app@my-hrd-app.appspotmail.com', sender)
self.assertEquals(
'Pipeline successful: App "my-hrd-app", '
'__main__.OutputlessPipeline#banana',
subject)
self.assertEquals(
'View the pipeline results here:\n\n'
'http://my-hrd-app.appspot.com/_ah/pipeline/status?root=banana\n\n'
'Thanks,\n\nThe Pipeline API\n',
body)
self.assertEquals(
'<html><body>\n<p>View the pipeline results here:</p>\n\n<p><a href="'
'http://my-hrd-app.appspot.com/_ah/pipeline/status?root=banana"\n'
'>http://my-hrd-app.appspot.com/_ah/pipeline/status?root=banana'
'</a></p>\n\n<p>\nThanks,\n<br>\nThe Pipeline API\n</p>\n'
'</body></html>\n',
html)
class GenerateArgs(pipeline.Pipeline):
"""Pipeline to test the _generate_args helper function."""
output_names = ['three', 'four']
def run(self, *args, **kwargs):
pass
class UtilitiesTest(TestBase):
"""Tests for module-level utilities."""
def testDereferenceArgsNotFilled(self):
"""Tests when an argument was not filled."""
slot_key = db.Key.from_path(_SlotRecord.kind(), 'myslot')
args = [{'type': 'slot', 'slot_key': str(slot_key)}]
self.assertRaises(pipeline.SlotNotFilledError,
pipeline._dereference_args, 'foo', args, {})
def testDereferenceArgsBadType(self):
"""Tests when a positional argument has a bad type."""
self.assertRaises(pipeline.UnexpectedPipelineError,
pipeline._dereference_args, 'foo', [{'type': 'bad'}], {})
def testDereferenceKwargsBadType(self):
"""Tests when a keyword argument has a bad type."""
self.assertRaises(pipeline.UnexpectedPipelineError,
pipeline._dereference_args, 'foo', [], {'one': {'type': 'bad'}})
def testGenerateArgs(self):
"""Tests generating a parameter dictionary from arguments."""
future = pipeline.PipelineFuture(['one', 'two', 'unused'])
other_future = pipeline.PipelineFuture(['three', 'four'])
future.one.key = db.Key.from_path('First', 'one')
future.two.key = db.Key.from_path('First', 'two')
future.default.key = db.Key.from_path('First', 'three')
future.unused.key = db.Key.from_path('First', 'unused')
other_future.three.key = db.Key.from_path('Second', 'three')
other_future.four.key = db.Key.from_path('Second', 'four')
other_future.default.key = db.Key.from_path('Second', 'four')
other_future._after_all_pipelines.add(future)
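# Declaring future as an after-all dependency should surface its default
# slot key in the generated 'after_all' parameter below.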
# When the parameters are small.
stage = GenerateArgs(future.one, 'some value', future,
red=1234, blue=future.two)
(dependent_slots, output_slot_keys,
params_text, params_blob) = pipeline._generate_args(
stage,
other_future,
'my-queue',
'/base-path')
self.assertEquals(
set([future.one.key, future.default.key, future.two.key]),
dependent_slots)
self.assertEquals(
set([other_future.three.key, other_future.four.key,
other_future.default.key]),
output_slot_keys)
self.assertEquals(None, params_blob)
params = json.loads(params_text)
self.assertEquals(
{
'queue_name': 'my-queue',
'after_all': [str(future.default.key)],
'class_path': '__main__.GenerateArgs',
'args': [
{'slot_key': str(future.one.key),
'type': 'slot'},
{'type': 'value', 'value': 'some value'},
{'slot_key': str(future.default.key),
'type': 'slot'}
],
'base_path': '/base-path',
'kwargs': {
'blue': {'slot_key': str(future.two.key),
'type': 'slot'},
'red': {'type': 'value', 'value': 1234}
},
'output_slots': {
'default': str(other_future.default.key),
'four': str(other_future.four.key),
'three': str(other_future.three.key)
},
'max_attempts': 3,
'backoff_factor': 2,
'backoff_seconds': 15,
'task_retry': False,
'target': 'my-version.foo-module',
}, params)
# When the parameters are big enough we need an external blob.
stage = GenerateArgs(future.one, 'some value' * 1000000, future,
red=1234, blue=future.two)
(dependent_slots, output_slot_keys,
params_text, params_blob) = pipeline._generate_args(
stage,
other_future,
'my-queue',
'/base-path')
self.assertEquals(
set([future.one.key, future.default.key, future.two.key]),
dependent_slots)
self.assertEquals(
set([other_future.three.key, other_future.four.key,
other_future.default.key]),
output_slot_keys)
self.assertEquals(None, params_text)
params = json.loads(blobstore.BlobInfo(params_blob).open().read())
self.assertEquals('some value' * 1000000, params['args'][1]['value'])
def testShortRepr(self):
"""Tests for the _short_repr function."""
my_dict = {
'red': 1,
'two': ['hi'] * 100
}
self.assertEquals(
"{'red': 1, 'two': ['hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi',"
" 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi',"
" 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi', 'hi',"
" '... (619 bytes)",
pipeline._short_repr(my_dict))
class PipelineContextTest(TestBase):
"""Tests for the internal _PipelineContext class."""
def setUp(self):
"""Sets up the test harness."""
TestBase.setUp(self)
self.pipeline1_key = db.Key.from_path(_PipelineRecord.kind(), '1')
self.pipeline2_key = db.Key.from_path(_PipelineRecord.kind(), '2')
self.pipeline3_key = db.Key.from_path(_PipelineRecord.kind(), '3')
self.pipeline4_key = db.Key.from_path(_PipelineRecord.kind(), '4')
self.pipeline5_key = db.Key.from_path(_PipelineRecord.kind(), '5')
self.slot1_key = db.Key.from_path(_SlotRecord.kind(), 'one')
self.slot2_key = db.Key.from_path(_SlotRecord.kind(), 'missing')
self.slot3_key = db.Key.from_path(_SlotRecord.kind(), 'three')
self.slot4_key = db.Key.from_path(_SlotRecord.kind(), 'four')
self.slot1 = _SlotRecord(
key=self.slot1_key,
status=_SlotRecord.FILLED)
self.slot3 = _SlotRecord(
key=self.slot3_key,
status=_SlotRecord.WAITING)
self.slot4 = _SlotRecord(
key=self.slot4_key,
status=_SlotRecord.FILLED)
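# Barrier fixtures: barrier1 finalizes pipeline1 and blocks only on slot1
# (FILLED); barrier2 starts pipeline2 and also needs slot3 (still WAITING);
# barrier3 starts pipeline3 and needs slot1 and slot4; barrier4 starts
# pipeline4 and blocks on slot2, which is deliberately never created;
# barrier5 starts pipeline5 and blocks only on slot1.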
self.barrier1, self.barrier1_index1 = (
pipeline._PipelineContext._create_barrier_entities(
self.pipeline1_key,
self.pipeline1_key,
_BarrierRecord.FINALIZE,
[self.slot1_key]))
self.barrier2, self.barrier2_index1, self.barrier2_index3 = (
pipeline._PipelineContext._create_barrier_entities(
self.pipeline2_key,
self.pipeline2_key,
_BarrierRecord.START,
[self.slot1_key, self.slot3_key]))
self.barrier3, self.barrier3_index1, self.barrier3_index4 = (
pipeline._PipelineContext._create_barrier_entities(
self.pipeline3_key,
self.pipeline3_key,
_BarrierRecord.START,
[self.slot1_key, self.slot4_key]))
self.barrier3.status = _BarrierRecord.FIRED
self.barrier4, self.barrier4_index1, self.barrier4_index2 = (
pipeline._PipelineContext._create_barrier_entities(
self.pipeline4_key,
self.pipeline4_key,
_BarrierRecord.START,
[self.slot1_key, self.slot2_key]))
self.barrier4.status = _BarrierRecord.FIRED
self.barrier5, self.barrier5_index1 = (
pipeline._PipelineContext._create_barrier_entities(
self.pipeline5_key,
self.pipeline5_key,
_BarrierRecord.START,
[self.slot1_key]))
self.context = pipeline._PipelineContext(
'my-task1', 'default', '/base-path')
def testNotifyBarrierFire_WithBarrierIndexes(self):
"""Tests barrier firing behavior."""
self.assertEquals(_BarrierRecord.WAITING, self.barrier1.status)
self.assertEquals(_BarrierRecord.WAITING, self.barrier2.status)
self.assertEquals(_BarrierRecord.FIRED, self.barrier3.status)
self.assertTrue(self.barrier3.trigger_time is None)
self.assertEquals(_BarrierRecord.FIRED, self.barrier4.status)
self.assertEquals(_BarrierRecord.WAITING, self.barrier5.status)
db.put([self.barrier1, self.barrier2, self.barrier3, self.barrier4,
self.barrier5, self.slot1, self.slot3, self.slot4,
self.barrier1_index1, self.barrier2_index1, self.barrier2_index3,
self.barrier3_index1, self.barrier3_index4, self.barrier4_index1,
self.barrier4_index2, self.barrier5_index1])
self.context.notify_barriers(
self.slot1_key,
None,
use_barrier_indexes=True,
max_to_notify=3)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(3, len(task_list))
task_list.sort(key=lambda x: x['name']) # For deterministic tests.
first_task, second_task, continuation_task = task_list
self.assertEquals(
{'pipeline_key': [str(self.pipeline1_key)],
'purpose': [_BarrierRecord.FINALIZE]},
first_task['params'])
self.assertEquals('/base-path/finalized', first_task['url'])
self.assertEquals(
{'pipeline_key': [str(self.pipeline3_key)],
'purpose': [_BarrierRecord.START]},
second_task['params'])
self.assertEquals('/base-path/run', second_task['url'])
self.assertEquals('/base-path/output', continuation_task['url'])
self.assertEquals(
[str(self.slot1_key)], continuation_task['params']['slot_key'])
self.assertEquals(
'my-task1-ae-barrier-notify-0',
continuation_task['name'])
barrier1, barrier2, barrier3 = db.get(
[self.barrier1.key(), self.barrier2.key(), self.barrier3.key()])
self.assertEquals(_BarrierRecord.FIRED, barrier1.status)
self.assertTrue(barrier1.trigger_time is not None)
self.assertEquals(_BarrierRecord.WAITING, barrier2.status)
self.assertTrue(barrier2.trigger_time is None)
# NOTE: This barrier relies on slots 1 and 4, to force the "blocking slots"
# inner loop to be exercised. By putting slot4 last on the last barrier
# tested in the loop, we ensure that any inner-loop variables do not pollute
# the outer function context.
self.assertEquals(_BarrierRecord.FIRED, barrier3.status)
# Show that a _BarrierRecord that was already in the FIRED state is not
# overwritten again and does not have its trigger_time changed.
self.assertTrue(barrier3.trigger_time is None)
# Run the first continuation task. It should raise an error because slot2
# does not exist.
self.context.task_name = 'my-task1-ae-barrier-notify-0'
self.assertRaises(
pipeline.UnexpectedPipelineError,
functools.partial(
self.context.notify_barriers,
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=True,
max_to_notify=2))
# No tasks should be added because the exception was raised.
task_list = test_shared.get_tasks()
self.assertEquals([], task_list)
# Adding slot2 should allow forward progress.
slot2 = _SlotRecord(
key=self.slot2_key,
status=_SlotRecord.WAITING)
db.put(slot2)
self.context.notify_barriers(
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=True,
max_to_notify=2)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(2, len(task_list))
third_task, continuation2_task = task_list
self.assertEquals(
{'pipeline_key': [str(self.pipeline5_key)],
'purpose': [_BarrierRecord.START]},
third_task['params'])
self.assertEquals('/base-path/run', third_task['url'])
self.assertEquals('/base-path/output', continuation2_task['url'])
self.assertEquals(
[str(self.slot1_key)], continuation2_task['params']['slot_key'])
self.assertEquals(
'my-task1-ae-barrier-notify-1',
continuation2_task['name'])
barrier4, barrier5 = db.get([self.barrier4.key(), self.barrier5.key()])
self.assertEquals(_BarrierRecord.FIRED, barrier4.status)
# Shows that the _BarrierRecord entity was not overwritten.
self.assertTrue(barrier4.trigger_time is None)
self.assertEquals(_BarrierRecord.FIRED, barrier5.status)
self.assertTrue(barrier5.trigger_time is not None)
# Running the continuation task again will re-trigger the barriers,
# but no tasks will be inserted because they're already tombstoned.
self.context.task_name = 'my-task1-ae-barrier-notify-0'
self.context.notify_barriers(
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=True,
max_to_notify=2)
self.assertEquals(0, len(test_shared.get_tasks()))
# Running the last continuation task will do nothing.
self.context.task_name = 'my-task1-ae-barrier-notify-1'
self.context.notify_barriers(
self.slot1_key,
continuation2_task['params']['cursor'][0],
use_barrier_indexes=True,
max_to_notify=2)
self.assertEquals(0, len(test_shared.get_tasks()))
def testNotifyBarrierFire_WithBarrierIndexes_BarrierMissing(self):
"""Tests _BarrierIndex firing when a _BarrierRecord is missing."""
self.assertEquals(_BarrierRecord.WAITING, self.barrier1.status)
db.put([self.slot1, self.barrier1_index1])
# The _BarrierRecord corresponding to barrier1_index1 is never put, so
# notify_barriers encounters a missing barrier; it should log and skip it.
self.assertNotEquals(None, db.get(self.barrier1_index1.key()))
self.assertEquals(None, db.get(self.barrier1.key()))
# This doesn't raise an exception.
self.context.notify_barriers(
self.slot1_key,
None,
use_barrier_indexes=True,
max_to_notify=3)
def testNotifyBarrierFire_NoBarrierIndexes(self):
"""Tests barrier firing behavior without using _BarrierIndexes."""
self.assertEquals(_BarrierRecord.WAITING, self.barrier1.status)
self.assertEquals(_BarrierRecord.WAITING, self.barrier2.status)
self.assertEquals(_BarrierRecord.FIRED, self.barrier3.status)
self.assertTrue(self.barrier3.trigger_time is None)
self.assertEquals(_BarrierRecord.FIRED, self.barrier4.status)
self.assertEquals(_BarrierRecord.WAITING, self.barrier5.status)
db.put([self.barrier1, self.barrier2, self.barrier3, self.barrier4,
self.barrier5, self.slot1, self.slot3, self.slot4])
self.context.notify_barriers(
self.slot1_key,
None,
use_barrier_indexes=False,
max_to_notify=3)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(3, len(task_list))
task_list.sort(key=lambda x: x['name']) # For deterministic tests.
first_task, second_task, continuation_task = task_list
self.assertEquals(
{'pipeline_key': [str(self.pipeline1_key)],
'purpose': [_BarrierRecord.FINALIZE]},
first_task['params'])
self.assertEquals('/base-path/finalized', first_task['url'])
self.assertEquals(
{'pipeline_key': [str(self.pipeline3_key)],
'purpose': [_BarrierRecord.START]},
second_task['params'])
self.assertEquals('/base-path/run', second_task['url'])
self.assertEquals('/base-path/output', continuation_task['url'])
self.assertEquals(
[str(self.slot1_key)], continuation_task['params']['slot_key'])
self.assertEquals(
'my-task1-ae-barrier-notify-0',
continuation_task['name'])
barrier1, barrier2, barrier3 = db.get(
[self.barrier1.key(), self.barrier2.key(), self.barrier3.key()])
self.assertEquals(_BarrierRecord.FIRED, barrier1.status)
self.assertTrue(barrier1.trigger_time is not None)
self.assertEquals(_BarrierRecord.WAITING, barrier2.status)
self.assertTrue(barrier2.trigger_time is None)
# NOTE: This barrier relies on slots 1 and 4, to force the "blocking slots"
# inner loop to be exercised. By putting slot4 last on the last barrier
# tested in the loop, we ensure that any inner-loop variables do not pollute
# the outer function context.
self.assertEquals(_BarrierRecord.FIRED, barrier3.status)
# Show that a _BarrierRecord that was already in the FIRED state is not
# overwritten again and does not have its trigger_time changed.
self.assertTrue(barrier3.trigger_time is None)
# Run the first continuation task. It should raise an error because slot2
# does not exist.
self.context.task_name = 'my-task1-ae-barrier-notify-0'
self.assertRaises(
pipeline.UnexpectedPipelineError,
functools.partial(
self.context.notify_barriers,
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=False,
max_to_notify=2))
# No tasks should be added because the exception was raised.
task_list = test_shared.get_tasks()
self.assertEquals([], task_list)
# Adding slot2 should allow forward progress.
slot2 = _SlotRecord(
key=self.slot2_key,
status=_SlotRecord.WAITING)
db.put(slot2)
self.context.notify_barriers(
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=False,
max_to_notify=2)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(2, len(task_list))
third_task, continuation2_task = task_list
self.assertEquals(
{'pipeline_key': [str(self.pipeline5_key)],
'purpose': [_BarrierRecord.START]},
third_task['params'])
self.assertEquals('/base-path/run', third_task['url'])
self.assertEquals('/base-path/output', continuation2_task['url'])
self.assertEquals(
[str(self.slot1_key)], continuation2_task['params']['slot_key'])
self.assertEquals(
'my-task1-ae-barrier-notify-1',
continuation2_task['name'])
barrier4, barrier5 = db.get([self.barrier4.key(), self.barrier5.key()])
self.assertEquals(_BarrierRecord.FIRED, barrier4.status)
# Shows that the _BarrierRecord entity was not overwritten.
self.assertTrue(barrier4.trigger_time is None)
self.assertEquals(_BarrierRecord.FIRED, barrier5.status)
self.assertTrue(barrier5.trigger_time is not None)
# Running the continuation task again will re-trigger the barriers,
# but no tasks will be inserted because they're already tombstoned.
self.context.task_name = 'my-task1-ae-barrier-notify-0'
self.context.notify_barriers(
self.slot1_key,
continuation_task['params']['cursor'][0],
use_barrier_indexes=False,
max_to_notify=2)
self.assertEquals(0, len(test_shared.get_tasks()))
# Running the last continuation task will do nothing.
self.context.task_name = 'my-task1-ae-barrier-notify-1'
self.context.notify_barriers(
self.slot1_key,
continuation2_task['params']['cursor'][0],
use_barrier_indexes=False,
max_to_notify=2)
self.assertEquals(0, len(test_shared.get_tasks()))
def testTransitionRunMissing(self):
"""Tests transition_run when the _PipelineRecord is missing."""
self.assertTrue(db.get(self.pipeline1_key) is None)
self.context.transition_run(self.pipeline1_key)
# That's it. No exception raised.
def testTransitionRunBadStatus(self):
"""Tests transition_run when the _PipelineRecord.status is bad."""
pipeline_record = _PipelineRecord(
status=_PipelineRecord.DONE,
key=self.pipeline1_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.context.transition_run(self.pipeline1_key)
# That's it. No exception raised.
def testTransitionRunMissingBarrier(self):
"""Tests transition_run when the finalization _BarrierRecord is missing."""
pipeline_record = _PipelineRecord(
status=_PipelineRecord.WAITING,
key=self.pipeline1_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertRaises(pipeline.UnexpectedPipelineError,
self.context.transition_run,
self.pipeline1_key,
blocking_slot_keys=[self.slot1_key])
def testTransitionCompleteMissing(self):
"""Tests transition_complete when the _PipelineRecord is missing."""
self.assertTrue(db.get(self.pipeline1_key) is None)
self.context.transition_complete(self.pipeline1_key)
# That's it. No exception raised.
def testTransitionCompleteBadStatus(self):
"""Tests transition_complete when the _PipelineRecord.status is bad."""
pipeline_record = _PipelineRecord(
status=_PipelineRecord.DONE,
key=self.pipeline1_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.context.transition_complete(self.pipeline1_key)
# That's it. No exception raised.
def testTransitionRetryMissing(self):
"""Tests transition_retry when the _PipelineRecord is missing."""
self.assertTrue(db.get(self.pipeline1_key) is None)
self.assertFalse(
self.context.transition_retry(self.pipeline1_key, 'my message'))
# No exception raised.
self.assertEquals(0, len(test_shared.get_tasks()))
def testTransitionRetryBadStatus(self):
"""Tests transition_retry when the _PipelineRecord.status is bad."""
pipeline_record = _PipelineRecord(
status=_PipelineRecord.DONE,
key=self.pipeline1_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertFalse(
self.context.transition_retry(self.pipeline1_key, 'my message'))
# No exception raised.
self.assertEquals(0, len(test_shared.get_tasks()))
def testTransitionRetryMaxFailures(self):
"""Tests transition_retry when _PipelineRecord.max_attempts is exceeded."""
params = {
'backoff_seconds': 10,
'backoff_factor': 1.5,
'max_attempts': 15,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.WAITING,
key=self.pipeline1_key,
max_attempts=5,
current_attempt=4,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline5_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertFalse(
self.context.transition_retry(self.pipeline1_key, 'my message'))
# An abort fan-out task for the root pipeline should be enqueued.
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(1, len(task_list))
self.assertEquals('/base-path/fanout_abort', task_list[0]['url'])
self.assertEquals(
{'root_pipeline_key': [str(self.pipeline5_key)]},
task_list[0]['params'])
def testTransitionRetryTaskParams(self):
"""Tests that transition_retry will enqueue retry tasks properly.
Attempts multiple retries and verifies ETAs and task parameters.
"""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.WAITING,
key=self.pipeline1_key,
max_attempts=5,
current_attempt=0,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline5_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
start_time = datetime.datetime.now()
when_list = [
start_time + datetime.timedelta(seconds=(30 * i))
for i in xrange(5)
]
closure_when_list = list(when_list)
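# Patch the context's clock so each retry call sees the next canned
# timestamp, making the computed retry ETAs below deterministic.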
def fake_gettime():
return closure_when_list.pop(0)
self.context._gettime = fake_gettime
for attempt, delay_seconds in enumerate([12, 18, 27, 40.5]):
self.context.transition_retry(
self.pipeline1_key, 'my message %d' % attempt)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(1, len(task_list))
task = task_list[0]
self.assertEquals('/base-path/run', task['url'])
self.assertEquals(
{
'pipeline_key': [str(self.pipeline1_key)],
'attempt': [str(attempt + 1)],
'purpose': ['start']
}, task['params'])
next_eta = when_list[attempt] + datetime.timedelta(seconds=delay_seconds)
self.assertEquals(next_eta, test_shared.utc_to_local(task['eta']))
pipeline_record = db.get(self.pipeline1_key)
self.assertEquals(attempt + 1, pipeline_record.current_attempt)
self.assertEquals(next_eta, pipeline_record.next_retry_time)
self.assertEquals('my message %d' % attempt,
pipeline_record.retry_message)
# Simulate last attempt.
self.context.transition_retry(self.pipeline1_key, 'my message 5')
# An abort fan-out task for the root pipeline should be enqueued.
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(1, len(task_list))
self.assertEquals('/base-path/fanout_abort', task_list[0]['url'])
self.assertEquals(
{'root_pipeline_key': [str(self.pipeline5_key)]},
task_list[0]['params'])
def testBeginAbortMissing(self):
"""Tests begin_abort when the pipeline is missing."""
self.assertTrue(db.get(self.pipeline1_key) is None)
self.assertFalse(
self.context.begin_abort(self.pipeline1_key, 'error message'))
def testBeginAbortAlreadyAborted(self):
"""Tests begin_abort when the pipeline was already aborted."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.ABORTED,
abort_requested=False,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params))
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertFalse(
self.context.begin_abort(self.pipeline1_key, 'error message'))
def testBeginAbortAlreadySignalled(self):
"""Tests begin_abort when the pipeline has already been signalled."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.WAITING,
abort_requested=True,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params))
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertFalse(
self.context.begin_abort(self.pipeline1_key, 'error message'))
def testBeginAbortTaskEnqueued(self):
"""Tests that a successful begin_abort will enqueue an abort task."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.RUN,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params))
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.assertTrue(
self.context.begin_abort(self.pipeline1_key, 'error message'))
# An abort fan-out task should be enqueued.
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(1, len(task_list))
self.assertEquals('/base-path/fanout_abort', task_list[0]['url'])
self.assertEquals(
{'root_pipeline_key': [str(self.pipeline1_key)]},
task_list[0]['params'])
def testContinueAbort(self):
"""Tests the whole life cycle of continue_abort."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record1 = _PipelineRecord(
status=_PipelineRecord.RUN,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline1_key)
pipeline_record2 = _PipelineRecord(
status=_PipelineRecord.RUN,
key=self.pipeline2_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline1_key)
pipeline_record3 = _PipelineRecord(
status=_PipelineRecord.RUN,
key=self.pipeline3_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline1_key)
pipeline_record4 = _PipelineRecord(
status=_PipelineRecord.ABORTED,
key=self.pipeline4_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline1_key)
pipeline_record5 = _PipelineRecord(
status=_PipelineRecord.DONE,
key=self.pipeline5_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
root_pipeline=self.pipeline1_key)
db.put([pipeline_record1, pipeline_record2, pipeline_record3,
pipeline_record4, pipeline_record5])
self.context.continue_abort(self.pipeline1_key, max_to_notify=2)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(3, len(task_list))
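# With max_to_notify=2 this batch issues two abort tasks plus one
# continuation task to cover the remaining pipelines.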
# For deterministic tests.
task_list.sort(key=lambda x: x['params'].get('pipeline_key'))
continuation_task, first_task, second_task = task_list
# Abort for the first pipeline
self.assertEquals('/base-path/abort', first_task['url'])
self.assertEquals(
{'pipeline_key': [str(self.pipeline1_key)],
'purpose': ['abort']},
first_task['params'])
# Abort for the second pipeline
self.assertEquals('/base-path/abort', second_task['url'])
self.assertEquals(
{'pipeline_key': [str(self.pipeline2_key)],
'purpose': ['abort']},
second_task['params'])
# Continuation
self.assertEquals('/base-path/fanout_abort', continuation_task['url'])
self.assertEquals(set(['cursor', 'root_pipeline_key']),
set(continuation_task['params'].keys()))
self.assertEquals(str(self.pipeline1_key),
continuation_task['params']['root_pipeline_key'][0])
self.assertTrue(continuation_task['name'].endswith('-0'))
cursor = continuation_task['params']['cursor'][0]
# Now run the continuation task
self.context.task_name = continuation_task['name']
self.context.continue_abort(
self.pipeline1_key, cursor=cursor, max_to_notify=1)
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(2, len(task_list))
# For deterministic tests.
task_list.sort(key=lambda x: x['params'].get('pipeline_key'))
second_continuation_task, fifth_task = task_list
# Abort for the third pipeline
self.assertEquals('/base-path/abort', fifth_task['url'])
self.assertEquals(
{'pipeline_key': [str(self.pipeline3_key)],
'purpose': ['abort']},
fifth_task['params'])
# Another continuation
self.assertEquals('/base-path/fanout_abort',
second_continuation_task['url'])
self.assertEquals(set(['cursor', 'root_pipeline_key']),
set(second_continuation_task['params'].keys()))
self.assertEquals(
str(self.pipeline1_key),
second_continuation_task['params']['root_pipeline_key'][0])
self.assertTrue(second_continuation_task['name'].endswith('-1'))
cursor2 = second_continuation_task['params']['cursor'][0]
# Now run another continuation task.
self.context.task_name = second_continuation_task['name']
self.context.continue_abort(
self.pipeline1_key, cursor=cursor2, max_to_notify=2)
# This task will find two pipelines that are already in terminal states,
# and skip them.
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(1, len(task_list))
third_continuation_task = task_list[0]
self.assertEquals('/base-path/fanout_abort',
third_continuation_task['url'])
self.assertEquals(set(['cursor', 'root_pipeline_key']),
set(third_continuation_task['params'].keys()))
self.assertEquals(
str(self.pipeline1_key),
third_continuation_task['params']['root_pipeline_key'][0])
self.assertTrue(third_continuation_task['name'].endswith('-2'))
cursor3 = third_continuation_task['params']['cursor'][0]
# Run the third continuation task, which will do nothing.
self.context.task_name = third_continuation_task['name']
self.context.continue_abort(
self.pipeline1_key, cursor=cursor3, max_to_notify=2)
# Nothing left to do.
task_list = test_shared.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(0, len(task_list))
def testTransitionAbortedMissing(self):
"""Tests transition_aborted when the pipeline is missing."""
self.assertTrue(db.get(self.pipeline1_key) is None)
self.context.transition_aborted(self.pipeline1_key)
# That's it. No exception raised.
def testTransitionAbortedBadStatus(self):
"""Tests transition_aborted when the pipeline is in a bad state."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
finalized_time = datetime.datetime.now()
pipeline_record = _PipelineRecord(
status=_PipelineRecord.ABORTED,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params),
finalized_time=finalized_time)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.context.transition_aborted(self.pipeline1_key)
# Finalized time will stay the same.
after_record = db.get(self.pipeline1_key)
self.assertEquals(pipeline_record.finalized_time,
after_record.finalized_time)
def testTransitionAbortedSuccess(self):
"""Tests when transition_aborted is successful."""
params = {
'backoff_seconds': 12,
'backoff_factor': 1.5,
'max_attempts': 5,
'task_retry': False,
}
pipeline_record = _PipelineRecord(
status=_PipelineRecord.WAITING,
key=self.pipeline1_key,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(params))
pipeline_record.put()
self.assertTrue(db.get(self.pipeline1_key) is not None)
self.context.transition_aborted(self.pipeline1_key)
after_record = db.get(self.pipeline1_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertTrue(pipeline_record.finalized_time is None)
self.assertTrue(isinstance(after_record.finalized_time, datetime.datetime))
class EvaluateErrorTest(test_shared.TaskRunningMixin, TestBase):
"""Task execution tests for error situations."""
def setUp(self):
"""Sets up the test harness."""
super(EvaluateErrorTest, self).setUp()
self.pipeline_key = db.Key.from_path(_PipelineRecord.kind(), '1')
self.slot_key = db.Key.from_path(_SlotRecord.kind(), 'red')
self.context = pipeline._PipelineContext(
'my-task1', 'default', '/base-path')
def testPipelineMissing(self):
"""Tests running a pipeline key that's disappeared."""
self.assertTrue(db.get(self.pipeline_key) is None)
self.context.evaluate(self.pipeline_key)
# That's it. No exception raised.
def testPipelineBadStatus(self):
"""Tests running a pipeline that has an invalid status."""
pipeline_record = _PipelineRecord(
status=_PipelineRecord.DONE,
key=self.pipeline_key)
pipeline_record.put()
self.assertTrue(db.get(self.pipeline_key) is not None)
self.context.evaluate(self.pipeline_key)
def testDefaultSlotMissing(self):
"""Tests when the default slot is missing."""
pipeline_record = _PipelineRecord(
root_pipeline=self.pipeline_key,
status=_PipelineRecord.WAITING,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps({
'output_slots': {'default': str(self.slot_key)}}),
key=self.pipeline_key)
pipeline_record.put()
self.assertTrue(db.get(self.slot_key) is None)
self.assertTrue(db.get(self.pipeline_key) is not None)
self.context.evaluate(self.pipeline_key)
# That's it. No exception raised.
def testRootPipelineMissing(self):
"""Tests when the root pipeline record is missing."""
missing_key = db.Key.from_path(_PipelineRecord.kind(), 'unknown')
slot_record = _SlotRecord(key=self.slot_key)
slot_record.put()
pipeline_record = _PipelineRecord(
root_pipeline=missing_key,
status=_PipelineRecord.WAITING,
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps({
'output_slots': {'default': str(self.slot_key)}}),
key=self.pipeline_key)
pipeline_record.put()
self.assertTrue(db.get(missing_key) is None)
self.assertTrue(db.get(self.slot_key) is not None)
self.assertTrue(db.get(self.pipeline_key) is not None)
self.context.evaluate(self.pipeline_key)
# That's it. No exception raised.
def testResolutionError(self):
"""Tests when the pipeline class couldn't be found."""
slot_record = _SlotRecord(key=self.slot_key)
slot_record.put()
pipeline_record = _PipelineRecord(
root_pipeline=self.pipeline_key,
status=_PipelineRecord.WAITING,
class_path='does.not.exist',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps({
'output_slots': {'default': str(self.slot_key)}}),
key=self.pipeline_key)
pipeline_record.put()
self.assertTrue(db.get(self.slot_key) is not None)
self.assertTrue(db.get(self.pipeline_key) is not None)
self.assertRaises(ImportError, self.context.evaluate, self.pipeline_key)
class DumbSync(pipeline.Pipeline):
"""A dumb pipeline that's synchronous."""
def run(self, *args):
pass
class DumbAsync(pipeline.Pipeline):
"""A dumb pipeline that's asynchronous."""
async = True
def run(self):
self.complete()
class DumbGenerator(pipeline.Pipeline):
"""A dumb pipeline that's a generator that yeilds nothing."""
def run(self):
if False:
yield 1
class DumbGeneratorYields(pipeline.Pipeline):
"""A dumb pipeline that's a generator that yields something."""
def run(self, block=False):
yield DumbSync(1)
result = yield DumbSync(2)
if block:
yield DumbSync(3, result)
class DiesOnCreation(pipeline.Pipeline):
"""A pipeline that raises an exception on insantiation."""
def __init__(self, *args, **kwargs):
raise Exception('This will not work!')
class DiesOnRun(pipeline.Pipeline):
"""A pipeline that raises an exception when it's executed."""
def run(self):
raise Exception('Cannot run this one!')
class RetryAfterYield(pipeline.Pipeline):
"""A generator pipeline that raises a Retry exception after yielding once."""
def run(self):
yield DumbSync()
raise pipeline.Retry('I want to retry now!')
class DiesAfterYield(pipeline.Pipeline):
"""A generator pipeline that dies after yielding once."""
def run(self):
yield DumbSync()
raise Exception('Whoops I will die now!')
class RetriesOnRun(pipeline.Pipeline):
"""A pipeline that raises a Retry exception on run."""
def run(self):
raise pipeline.Retry('Gotta go and retry now!')
class AbortsOnRun(pipeline.Pipeline):
"""A pipeline that raises an Abort exception on run."""
def run(self):
raise pipeline.Abort('Gotta go and abort now!')
class AsyncCannotAbort(pipeline.Pipeline):
"""An async pipeline that cannot be aborted once active."""
async = True
def run(self):
pass
class AbortAfterYield(pipeline.Pipeline):
"""A generator pipeline that raises an Abort exception after yielding once."""
def run(self):
yield DumbSync()
raise pipeline.Abort('I want to abort now!')
class AsyncCanAbort(pipeline.Pipeline):
"""An async pipeline that cannot be aborted once active."""
async = True
def run(self):
pass
def try_cancel(self):
return True
class SyncMissedOutput(pipeline.Pipeline):
"""A sync pipeline that forgets to fill in a named output slot."""
output_names = ['another']
def run(self):
return 5
class GeneratorMissedOutput(pipeline.Pipeline):
"""A generator pipeline that forgets to fill in a named output slot."""
output_names = ['another']
def run(self):
if False:
yield 1
class TaskRunningTest(test_shared.TaskRunningMixin, TestBase):
"""End-to-end tests for task-running and race-condition situations.
Many of these are cases where an executor task runs a second time when it
shouldn't have, or where some kind of transient error occurred.
"""
def setUp(self):
"""Sets up the test harness."""
super(TaskRunningTest, self).setUp()
self.pipeline_key = db.Key.from_path(_PipelineRecord.kind(), 'one')
self.pipeline2_key = db.Key.from_path(_PipelineRecord.kind(), 'two')
self.slot_key = db.Key.from_path(_SlotRecord.kind(), 'red')
self.slot_record = _SlotRecord(key=self.slot_key)
self.pipeline_record = _PipelineRecord(
root_pipeline=self.pipeline_key,
status=_PipelineRecord.WAITING,
class_path='does.not.exist',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps({
'output_slots': {'default': str(self.slot_key)},
'args': [],
'kwargs': {},
'task_retry': False,
'backoff_seconds': 1,
'backoff_factor': 2,
'max_attempts': 4,
'queue_name': 'default',
'base_path': '',
}),
key=self.pipeline_key,
max_attempts=4)
self.barrier_record = _BarrierRecord(
parent=self.pipeline_key,
key_name=_BarrierRecord.FINALIZE,
target=self.pipeline_key,
root_pipeline=self.pipeline_key,
blocking_slots=[self.slot_key])
self.context = pipeline._PipelineContext(
'my-task1', 'default', '/base-path')
def testSubstagesRunImmediately(self):
"""Tests that sub-stages with no blocking slots are run immediately."""
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
db.put([self.pipeline_record, self.slot_record, self.barrier_record])
before_record = db.get(self.pipeline_key)
self.assertEquals([], before_record.fanned_out)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.pipeline_key)
self.assertEquals(2, len(after_record.fanned_out))
child1_key, child2_key = after_record.fanned_out
task_list = test_shared.get_tasks()
self.assertEquals(1, len(task_list))
fanout_task = task_list[0]
# Verify that the start time is set for non-blocked child pipelines.
child_record_list = db.get(after_record.fanned_out)
for child_record in child_record_list:
self.assertTrue(child_record.start_time is not None)
# One fan-out task with both children.
self.assertEquals(
[str(self.pipeline_key)],
fanout_task['params']['parent_key'])
self.assertEquals(
['0', '1'],
fanout_task['params']['child_indexes'])
self.assertEquals('/base-path/fanout', fanout_task['url'])
# Only finalization barriers present.
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.START,
parent=child1_key)) is None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.START,
parent=child2_key)) is None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=child1_key)) is not None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=child2_key)) is not None)
def testSubstagesBlock(self):
"""Tests that sub-stages with pending inputs will have a barrier added."""
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
params = self.pipeline_record.params.copy()
params.update({
'output_slots': {'default': str(self.slot_key)},
'args': [{'type': 'value', 'value': True}],
'kwargs': {},
})
self.pipeline_record.params_text = json.dumps(params)
db.put([self.pipeline_record, self.slot_record, self.barrier_record])
before_record = db.get(self.pipeline_key)
self.assertEquals([], before_record.fanned_out)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.pipeline_key)
self.assertEquals(3, len(after_record.fanned_out))
task_list = test_shared.get_tasks()
self.assertEquals(1, len(task_list))
fanout_task = task_list[0]
# Only two children should start.
self.assertEquals('/base-path/fanout', fanout_task['url'])
self.assertEquals(
[str(self.pipeline_key)],
fanout_task['params']['parent_key'])
self.assertEquals(
['0', '1'],
fanout_task['params']['child_indexes'])
run_children = set(after_record.fanned_out[int(i)]
for i in fanout_task['params']['child_indexes'])
self.assertEquals(2, len(run_children))
child1_key, child2_key = run_children
other_child_key = list(set(after_record.fanned_out) - run_children)[0]
# Only a start barrier inserted for the one pending child.
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.START,
parent=child1_key)) is None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.START,
parent=child2_key)) is None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.START,
parent=other_child_key)) is not None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=child1_key)) is not None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=child2_key)) is not None)
self.assertTrue(db.get(
db.Key.from_path(_BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=other_child_key)) is not None)
def testFannedOutOrdering(self):
"""Tests that the fanned_out property lists children in code order."""
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
params = self.pipeline_record.params.copy()
params.update({
'output_slots': {'default': str(self.slot_key)},
'args': [{'type': 'value', 'value': True}],
'kwargs': {},
})
self.pipeline_record.params_text = json.dumps(params)
db.put([self.pipeline_record, self.slot_record, self.barrier_record])
before_record = db.get(self.pipeline_key)
self.assertEquals([], before_record.fanned_out)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.pipeline_key)
self.assertEquals(3, len(after_record.fanned_out))
children = db.get(after_record.fanned_out)
self.assertEquals(1, children[0].params['args'][0]['value'])
self.assertEquals(2, children[1].params['args'][0]['value'])
self.assertEquals(3, children[2].params['args'][0]['value'])
def testSyncWaitingStartRerun(self):
"""Tests a waiting, sync pipeline being re-run after it already output."""
self.pipeline_record.class_path = '__main__.DumbSync'
db.put([self.pipeline_record, self.slot_record])
before_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.WAITING, before_record.status)
self.assertTrue(before_record.fill_time is None)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, after_record.status)
self.assertTrue(after_record.fill_time is not None)
after_pipeline = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_pipeline.status)
self.context.evaluate(self.pipeline_key)
second_after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, second_after_record.status)
self.assertTrue(second_after_record.fill_time is not None)
# The output slot fill times are different, which means the pipeline re-ran.
self.assertNotEquals(second_after_record.fill_time, after_record.fill_time)
def testSyncFinalizingRerun(self):
"""Tests a finalizing, sync pipeline task being re-run."""
self.pipeline_record.class_path = '__main__.DumbSync'
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
second_after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, second_after_record.status)
# Finalized time will stay the same.
self.assertEquals(after_record.finalized_time,
second_after_record.finalized_time)
def testSyncDoneFinalizeRerun(self):
"""Tests a done, sync pipeline task being re-refinalized."""
now = datetime.datetime.utcnow()
self.pipeline_record.class_path = '__main__.DumbSync'
self.pipeline_record.status = _PipelineRecord.DONE
self.pipeline_record.finalized_time = now
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
# Finalize time stays the same.
self.assertEquals(now, after_record.finalized_time)
def testAsyncWaitingRerun(self):
"""Tests a waiting, async pipeline task being re-run."""
self.pipeline_record.class_path = '__main__.DumbAsync'
db.put([self.pipeline_record, self.slot_record])
before_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.WAITING, before_record.status)
self.assertTrue(before_record.fill_time is None)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, after_record.status)
self.assertTrue(after_record.fill_time is not None)
after_pipeline = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.RUN, after_pipeline.status)
self.context.evaluate(self.pipeline_key)
second_after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, second_after_record.status)
self.assertTrue(second_after_record.fill_time is not None)
# The output slot fill times are different, which means the pipeline re-ran.
self.assertNotEquals(second_after_record.fill_time, after_record.fill_time)
def testAsyncRunRerun(self):
"""Tests a run, async pipeline task being re-run."""
self.pipeline_record.class_path = '__main__.DumbAsync'
self.pipeline_record.status = _PipelineRecord.RUN
db.put([self.pipeline_record, self.slot_record])
before_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.WAITING, before_record.status)
self.assertTrue(before_record.fill_time is None)
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, after_record.status)
self.assertTrue(after_record.fill_time is not None)
after_pipeline = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.RUN, after_pipeline.status)
self.context.evaluate(self.pipeline_key)
second_after_record = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, second_after_record.status)
self.assertTrue(second_after_record.fill_time is not None)
# The output slot fill times are different, which means the pipeline re-ran.
self.assertNotEquals(second_after_record.fill_time, after_record.fill_time)
def testAsyncFinalizingRerun(self):
"""Tests a finalizing, async pipeline task being re-run."""
self.pipeline_record.class_path = '__main__.DumbAsync'
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
after_pipeline = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_pipeline.status)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
second_after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, second_after_record.status)
# Finalized time will stay the same.
self.assertEquals(after_record.finalized_time,
second_after_record.finalized_time)
def testAsyncDoneFinalizeRerun(self):
"""Tests a done, async pipeline task being re-finalized."""
now = datetime.datetime.utcnow()
self.pipeline_record.class_path = '__main__.DumbAsync'
self.pipeline_record.status = _PipelineRecord.DONE
self.pipeline_record.finalized_time = now
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
# Finalize time stays the same.
self.assertEquals(now, after_record.finalized_time)
def testNonYieldingGeneratorWaitingFilled(self):
"""Tests a waiting, non-yielding generator will fill its output slot."""
self.pipeline_record.class_path = '__main__.DumbGenerator'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(_SlotRecord.WAITING, db.get(self.slot_key).status)
self.context.evaluate(self.pipeline_key)
# Output slot is filled.
after_slot = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, after_slot.status)
# Pipeline is now in the run state.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.RUN, after_record.status)
def testNonYieldingGeneratorRunNotFilledRerun(self):
"""Tests a run, non-yielding generator with a not filled output slot.
This happens when the generator yields no children and is moved to the
RUN state but then fails before it could output to the default slot.
"""
self.pipeline_record.class_path = '__main__.DumbGenerator'
self.pipeline_record.status = _PipelineRecord.RUN
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(_SlotRecord.WAITING, db.get(self.slot_key).status)
self.context.evaluate(self.pipeline_key)
# Output slot is filled.
after_slot = db.get(self.slot_key)
self.assertEquals(_SlotRecord.FILLED, after_slot.status)
def testGeneratorRunReRun(self):
"""Tests a run, yielding generator that is re-run."""
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
self.pipeline_record.status = _PipelineRecord.RUN
self.pipeline_record.fanned_out = [self.pipeline2_key]
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key)
# Output slot wasn't filled.
after_slot = db.get(self.slot_key)
self.assertEquals(_SlotRecord.WAITING, after_slot.status)
# Status hasn't changed.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.RUN, after_record.status)
def testGeneratorFinalizingRerun(self):
"""Tests a finalizing, generator pipeline task being re-run."""
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
self.pipeline_record.status = _PipelineRecord.RUN
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
second_after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, second_after_record.status)
# Finalized time will stay the same.
self.assertEquals(after_record.finalized_time,
second_after_record.finalized_time)
def testGeneratorDoneFinalizeRerun(self):
"""Tests a done, generator pipeline task being re-run."""
now = datetime.datetime.utcnow()
self.pipeline_record.class_path = '__main__.DumbGeneratorYields'
self.pipeline_record.status = _PipelineRecord.DONE
self.pipeline_record.finalized_time = now
self.slot_record.status = _SlotRecord.FILLED
self.slot_record.value_text = json.dumps(None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.FINALIZE)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.DONE, after_record.status)
# Finalize time stays the same.
self.assertEquals(now, after_record.finalized_time)
def testFromIdFails(self):
"""Tests when evaluate's call to from_id fails a retry attempt is made."""
self.pipeline_record.class_path = '__main__.DiesOnCreation'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(0, self.pipeline_record.current_attempt)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals('Exception: This will not work!',
after_record.retry_message)
def testMismatchedAttempt(self):
"""Tests when the task's current attempt does not match the datastore."""
self.pipeline_record.class_path = '__main__.DiesOnRun'
self.pipeline_record.current_attempt = 3
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key,
purpose=_BarrierRecord.START,
attempt=1)
# Didn't run because no state change occurred; the retry count is the same.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(3, after_record.current_attempt)
self.assertEquals(None, after_record.retry_message)
def testPastMaxAttempts(self):
"""Tests when the current attempt number is beyond the max attempts.
This could happen if the user edits 'max_attempts' during execution.
"""
self.pipeline_record.class_path = '__main__.DiesOnRun'
self.pipeline_record.current_attempt = 5
self.pipeline_record.max_attempts = 3
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key,
purpose=_BarrierRecord.START,
attempt=5)
# Didn't run because no state change occurred; the retry count is the same.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(5, after_record.current_attempt)
self.assertEquals(None, after_record.retry_message)
def testPrematureRetry(self):
"""Tests when the current retry request came prematurely."""
now = datetime.datetime.utcnow()
self.pipeline_record.class_path = '__main__.DiesOnRun'
self.pipeline_record.current_attempt = 1
self.pipeline_record.max_attempts = 3
self.pipeline_record.next_retry_time = now + datetime.timedelta(seconds=30)
db.put([self.pipeline_record, self.slot_record])
self.assertRaises(
pipeline.UnexpectedPipelineError,
self.context.evaluate,
self.pipeline_key,
purpose=_BarrierRecord.START,
attempt=1)
# Didn't run because no state change occurred; the retry count is the same.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals(None, after_record.retry_message)
def testRunExceptionRetry(self):
"""Tests that exceptions in Sync/Async pipelines cause a retry."""
self.pipeline_record.class_path = '__main__.DiesOnRun'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(0, self.pipeline_record.current_attempt)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals('Exception: Cannot run this one!',
after_record.retry_message)
def testRunForceRetry(self):
"""Tests that explicit Retry on a synchronous pipeline."""
self.pipeline_record.class_path = '__main__.RetriesOnRun'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(0, self.pipeline_record.current_attempt)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals('Gotta go and retry now!',
after_record.retry_message)
def testGeneratorExceptionRetry(self):
"""Tests that exceptions in a generator pipeline cause a retry."""
self.pipeline_record.class_path = '__main__.DiesAfterYield'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(0, self.pipeline_record.current_attempt)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals('Exception: Whoops I will die now!',
after_record.retry_message)
def testGeneratorForceRetry(self):
"""Tests when a generator raises a user-initiated retry exception."""
self.pipeline_record.class_path = '__main__.RetryAfterYield'
db.put([self.pipeline_record, self.slot_record])
self.assertEquals(0, self.pipeline_record.current_attempt)
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals('I want to retry now!', after_record.retry_message)
def testNonAsyncAbortSignal(self):
"""Tests when a non-async pipeline receives the abort signal."""
self.pipeline_record.class_path = '__main__.DumbSync'
self.pipeline_record.status = _PipelineRecord.WAITING
self.assertTrue(self.pipeline_record.finalized_time is None)
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
def testAbortRootPipelineFastPath(self):
"""Tests root pipeline status also functions as the abort signal."""
root_pipeline = _PipelineRecord(
root_pipeline=self.pipeline2_key,
status=_PipelineRecord.RUN,
class_path='does.not.exist',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps({
'output_slots': {'default': str(self.slot_key)},
'args': [],
'kwargs': {},
'task_retry': False,
'backoff_seconds': 1,
'backoff_factor': 2,
'max_attempts': 4,
'queue_name': 'default',
'base_path': '',
}),
key=self.pipeline2_key,
is_root_pipeline=True,
max_attempts=4,
abort_requested=True)
# Use DiesOnRun to ensure that we don't actually run the pipeline.
self.pipeline_record.class_path = '__main__.DiesOnRun'
self.pipeline_record.root_pipeline = self.pipeline2_key
db.put([self.pipeline_record, self.slot_record, root_pipeline])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
def testNonAsyncAbortSignalRepeated(self):
"""Tests when a non-async pipeline has the abort request repeated.
Verifies that handling the abort signal succeeds, that the pipeline
finalizes before being aborted, and that a repeated signal is a no-op.
"""
self.pipeline_record.class_path = '__main__.DumbSync'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
# Run a second time; this should be ignored.
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record2 = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record2.status)
self.assertEquals(0, after_record2.current_attempt)
self.assertTrue(after_record2.retry_message is None)
self.assertTrue(after_record2.abort_message is None)
self.assertEquals(after_record.finalized_time, after_record2.finalized_time)
def testAsyncAbortSignalBeforeStart(self):
"""Tests when an async pipeline has an abort request and has not run yet.
Verifies that the pipeline will be finalized and transitioned to ABORTED.
"""
self.pipeline_record.class_path = '__main__.DumbAsync'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
def testAsyncAbortSignalDisallowed(self):
"""Tests when an async pipeline receives abort but try_cancel is False."""
self.pipeline_record.class_path = '__main__.AsyncCannotAbort'
self.pipeline_record.status = _PipelineRecord.RUN
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.RUN, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is None)
def testAsyncAbortSignalAllowed(self):
"""Tests when an async pipeline receives abort but try_cancel is True."""
self.pipeline_record.class_path = '__main__.AsyncCanAbort'
self.pipeline_record.status = _PipelineRecord.RUN
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
def testGeneratorAbortException(self):
"""Tests when a generator raises an abort after it's begun yielding."""
self.pipeline_record.class_path = '__main__.AbortAfterYield'
self.pipeline_record.status = _PipelineRecord.RUN
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
self.assertEquals(0, after_record.current_attempt)
self.assertTrue(after_record.retry_message is None)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is not None)
def testRetryWhenSyncDoesNotFillSlot(self):
"""Tests when a sync pipeline does not fill a slot that it will retry."""
self.pipeline_record.class_path = '__main__.SyncMissedOutput'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals(
'SlotNotFilledError: Outputs set([\'another\']) for pipeline ID "one" '
'were never filled by "__main__.SyncMissedOutput".',
after_record.retry_message)
def testNonYieldingGeneratorDoesNotFillSlot(self):
"""Tests non-yielding pipelines that do not fill a slot will retry."""
self.pipeline_record.class_path = '__main__.GeneratorMissedOutput'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertEquals(
'SlotNotFilledError: Outputs set([\'another\']) for pipeline ID "one" '
'were never filled by "__main__.GeneratorMissedOutput".',
after_record.retry_message)
def testAbortWithBadInputs(self):
"""Tests aborting a pipeline with unresolvable input slots."""
self.pipeline_record.class_path = '__main__.DumbSync'
self.pipeline_record.params['args'] = [
{'type': 'slot',
'slot_key': 'aglteS1hcHAtaWRyGQsSEF9BRV9DYXNjYWRlX1Nsb3QiA3JlZAw'}
]
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.ABORT)
# Forced into the abort state.
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.ABORTED, after_record.status)
def testPassBadValue(self):
"""Tests when a pipeline passes a non-serializable value to a child."""
self.pipeline_record.class_path = '__main__.PassBadValue'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertIn('Bad child arguments. TypeError', after_record.retry_message)
self.assertIn('is not JSON serializable', after_record.retry_message)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is None)
def testReturnBadValue(self):
"""Tests when a pipeline returns a non-serializable value."""
self.pipeline_record.class_path = '__main__.ReturnBadValue'
self.pipeline_record.status = _PipelineRecord.WAITING
db.put([self.pipeline_record, self.slot_record])
self.context.evaluate(self.pipeline_key, purpose=_BarrierRecord.START)
after_record = db.get(self.pipeline_key)
self.assertEquals(_PipelineRecord.WAITING, after_record.status)
self.assertEquals(1, after_record.current_attempt)
self.assertIn('Bad return value. TypeError', after_record.retry_message)
self.assertIn('is not JSON serializable', after_record.retry_message)
self.assertTrue(after_record.abort_message is None)
self.assertTrue(after_record.finalized_time is None)
class HandlersPrivateTest(TestBase):
"""Tests that the pipeline request handlers are all private."""
def testBarrierHandler(self):
"""Tests the _BarrierHandler."""
handler = test_shared.create_handler(pipeline._BarrierHandler, 'POST', '/')
handler.post()
self.assertEquals((403, 'Forbidden'), handler.response._Response__status)
def testPipelineHandler(self):
"""Tests the _PipelineHandler."""
handler = test_shared.create_handler(pipeline._PipelineHandler, 'POST', '/')
handler.post()
self.assertEquals((403, 'Forbidden'), handler.response._Response__status)
def testFanoutAbortHandler(self):
"""Tests the _FanoutAbortHandler."""
handler = test_shared.create_handler(
pipeline._FanoutAbortHandler, 'POST', '/')
handler.post()
self.assertEquals((403, 'Forbidden'), handler.response._Response__status)
def testFanoutHandler(self):
"""Tests the _FanoutHandler."""
handler = test_shared.create_handler(pipeline._FanoutHandler, 'POST', '/')
handler.post()
self.assertEquals((403, 'Forbidden'), handler.response._Response__status)
def testCleanupHandler(self):
"""Tests the _CleanupHandler."""
handler = test_shared.create_handler(pipeline._CleanupHandler, 'POST', '/')
handler.post()
self.assertEquals((403, 'Forbidden'), handler.response._Response__status)
class InternalOnlyPipeline(pipeline.Pipeline):
"""Pipeline with internal-only callbacks."""
async = True
def run(self):
pass
class AdminOnlyPipeline(pipeline.Pipeline):
"""Pipeline with internal-only callbacks."""
async = True
admin_callbacks = True
def run(self):
pass
def callback(self, **kwargs):
pass
class PublicPipeline(pipeline.Pipeline):
"""Pipeline with public callbacks."""
async = True
public_callbacks = True
def run(self):
pass
def callback(self, **kwargs):
return (200, 'text/plain', repr(kwargs))
class DummyKind(db.Expando):
pass
class NoTransactionPipeline(PublicPipeline):
"""Pipeline that verifies the callback is executed outside a transaction."""
def callback(self, **kwargs):
if db.is_in_transaction():
try:
# If we are in a non-XG transaction, we should be unable to write to 24
# new entity groups (1 is used to read pipeline state).
# Assumes the entity group limit is 25 (was previously 5).
for _ in xrange(24):
DummyKind().put()
try:
# Verify the testbed is behaving and the limit has not changed: one
# more new entity group should exceed the 25-group limit and fail.
DummyKind().put()
return (500, 'text/plain', 'More than 25 entity groups used.')
except db.BadRequestError:
return (203, 'text/plain', 'In a XG transaction')
except db.BadRequestError:
return (202, 'text/plain', 'In a non-XG transaction')
else:
return (201, 'text/plain', 'Outside a transaction.')
class NoXgTransactionPipeline(NoTransactionPipeline):
"""Pipeline that verifies the callback is in non-XG transaction."""
_callback_xg_transaction = False
class XgTransactionPipeline(NoTransactionPipeline):
"""Pipeline that verifies the callback is in a XG transaction."""
_callback_xg_transaction = True
class CallbackHandlerTest(TestBase):
"""Tests for the _CallbackHandler class."""
def testErrors(self):
"""Tests for error conditions."""
# No pipeline_id param.
handler = test_shared.create_handler(
pipeline._CallbackHandler, 'GET', '/?red=one&blue=two')
handler.get()
self.assertEquals((400, 'Bad Request'), handler.response._Response__status)
# Non-existent pipeline.
handler = test_shared.create_handler(
pipeline._CallbackHandler, 'GET', '/?pipeline_id=blah&red=one&blue=two')
handler.get()
self.assertEquals((400, 'Bad Request'), handler.response._Response__status)
# Pipeline exists but class path is bogus.
stage = InternalOnlyPipeline()
stage.start()
pipeline_record = pipeline.models._PipelineRecord.get_by_key_name(
stage.pipeline_id)
params = pipeline_record.params
params['class_path'] = 'does.not.exist'
pipeline_record.params_text = json.dumps(params)
pipeline_record.put()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
self.assertEquals((400, 'Bad Request'), handler.response._Response__status)
# Internal-only callbacks.
stage = InternalOnlyPipeline()
stage.start()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
self.assertEquals((400, 'Bad Request'), handler.response._Response__status)
# Admin-only callbacks but not admin.
stage = AdminOnlyPipeline()
stage.start()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
self.assertEquals((400, 'Bad Request'), handler.response._Response__status)
def testAdminOnly(self):
"""Tests accessing a callback that is admin-only."""
stage = AdminOnlyPipeline()
stage.start()
os.environ['USER_IS_ADMIN'] = '1'
try:
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
finally:
del os.environ['USER_IS_ADMIN']
self.assertEquals((200, 'OK'), handler.response._Response__status)
def testPublic(self):
"""Tests accessing a callback that is public."""
stage = PublicPipeline()
stage.start()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
self.assertEquals((200, 'OK'), handler.response._Response__status)
def testReturnValue(self):
"""Tests when the callback has a return value to render as output."""
stage = PublicPipeline()
stage.start()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s&red=one&blue=two' % stage.pipeline_id)
handler.get()
self.assertEquals((200, 'OK'), handler.response._Response__status)
self.assertEquals(
{'red': u'one', 'blue': u'two'},
eval(handler.response.out.getvalue()))
self.assertEquals('text/plain', handler.response.headers['Content-Type'])
def RunTransactionTest(self, stage, expected_code):
stage.start()
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=%s' % stage.pipeline_id)
handler.get()
self.assertEquals(expected_code, handler.response._Response__status[0])
def testNoTransaction(self):
"""Tests that the callback is not called from within a trans. by default."""
self.RunTransactionTest(NoTransactionPipeline(), 201)
def testNonXgTransaction(self):
"""Tests that the callback is called within a single EG transaction."""
self.RunTransactionTest(NoXgTransactionPipeline(), 202)
def testXgTransaction(self):
"""Tests that the callback is called within a cross EG transaction."""
self.RunTransactionTest(XgTransactionPipeline(), 203)
def testGiveUpOnTask(self):
"""Tests that after N retries the task is abandoned."""
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=does_not_exist')
handler.request.environ['HTTP_X_APPENGINE_TASKRETRYCOUNT'] = '1'
handler.get()
self.assertEquals(400, handler.response._Response__status[0])
handler = test_shared.create_handler(
pipeline._CallbackHandler,
'GET', '/?pipeline_id=does_not_exist')
handler.request.environ['HTTP_X_APPENGINE_TASKRETRYCOUNT'] = '10'
handler.get()
self.assertEquals(200, handler.response._Response__status[0])
class CleanupHandlerTest(test_shared.TaskRunningMixin, TestBase):
"""Tests for the _CleanupHandler class."""
def testSuccess(self):
"""Tests successfully deleting all child pipeline elements."""
self.assertEquals(0, len(list(_PipelineRecord.all())))
self.assertEquals(0, len(list(_SlotRecord.all())))
self.assertEquals(0, len(list(_BarrierRecord.all())))
self.assertEquals(0, len(list(_StatusRecord.all())))
stage = OutputlessPipeline()
stage.start(idempotence_key='banana')
stage.set_status('My status here!')
self.assertEquals(1, len(list(_PipelineRecord.all())))
self.assertEquals(1, len(list(_SlotRecord.all())))
self.assertEquals(1, len(list(_BarrierRecord.all())))
self.assertEquals(1, len(list(_StatusRecord.all())))
self.assertEquals(1, len(list(_BarrierIndex.all())))
stage.cleanup()
task_list = self.get_tasks()
self.assertEquals(2, len(task_list))
# The relative order of the start and cleanup tasks is not deterministic,
# so fish out the cleanup task and run it directly.
for task in task_list:
if task['url'] == '/_ah/pipeline/cleanup':
self.run_task(task)
self.assertEquals(0, len(list(_PipelineRecord.all())))
self.assertEquals(0, len(list(_SlotRecord.all())))
self.assertEquals(0, len(list(_BarrierRecord.all())))
self.assertEquals(0, len(list(_StatusRecord.all())))
self.assertEquals(0, len(list(_BarrierIndex.all())))
class FanoutHandlerTest(test_shared.TaskRunningMixin, TestBase):
"""Tests for the _FanoutHandler class."""
def testOldStyle(self):
"""Tests the old fanout parameter style for backwards compatibility."""
stage = DumbGeneratorYields()
stage.start(idempotence_key='banana')
task_list = self.get_tasks()
test_shared.delete_tasks(task_list)
self.run_task(task_list[0])
task_list = self.get_tasks()
self.assertEquals(1, len(task_list))
fanout_task = task_list[0]
self.assertEquals('/_ah/pipeline/fanout', fanout_task['url'])
after_record = db.get(stage._pipeline_key)
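# Rewrite the fanout task body as repeated 'pipeline_key' parameters (the
# old wire format) so the handler's backwards-compatibility path is exercised.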
fanout_task['body'] = base64.b64encode(urllib.urlencode(
[('pipeline_key', str(after_record.fanned_out[0])),
('pipeline_key', str(after_record.fanned_out[1]))]))
test_shared.delete_tasks(task_list)
self.run_task(fanout_task)
task_list = self.get_tasks()
test_shared.delete_tasks(task_list)
self.assertEquals(2, len(task_list))
for task in task_list:
self.assertEquals('/_ah/pipeline/run', task['url'])
children_keys = [
db.Key(t['params']['pipeline_key'][0]) for t in task_list]
self.assertEquals(set(children_keys), set(after_record.fanned_out))
################################################################################
# Begin functional test section!
class RunOrder(db.Model):
"""Saves the order of method calls."""
order = db.ListProperty(db.Text)
@classmethod
def add(cls, message):
def txn():
runorder = RunOrder.get_by_key_name('singleton')
if runorder is None:
runorder = RunOrder(key_name='singleton')
runorder.order.append(db.Text(message))
runorder.put()
db.run_in_transaction(txn)
@classmethod
def get(cls):
runorder = RunOrder.get_by_key_name('singleton')
if runorder is None:
return []
else:
return [str(s) for s in runorder.order]
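# Illustrative usage: RunOrder.add('step') appends the message inside a
# datastore transaction, and RunOrder.get() later returns the recorded
# messages in insertion order.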
class SaveRunOrder(pipeline.Pipeline):
"""Pipeline that saves the run order message supplied."""
def run(self, message):
RunOrder.add(message)
class EchoSync(pipeline.Pipeline):
"""Pipeline that echos input."""
def run(self, *args):
if not args:
return None
if len(args) == 1:
return args[0]
return args
class EchoAsync(pipeline.Pipeline):
"""Asynchronous pipeline that echos input."""
async = True
def run(self, *args):
self.get_callback_task(
params=dict(return_value=pickle.dumps(args))).add()
def callback(self, return_value):
args = pickle.loads(str(return_value))
if not args:
self.complete(None)
elif len(args) == 1:
self.complete(args[0])
else:
self.complete(args)
def run_test(self, *args):
self.callback(pickle.dumps(args))
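# In production the enqueued callback task eventually invokes callback(),
# which calls complete(); run_test() short-circuits that by calling
# callback() directly so test mode stays synchronous.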
class EchoNamedSync(pipeline.Pipeline):
"""Pipeline that echos named inputs to named outputs."""
def run(self, **kwargs):
prefix = kwargs.get('prefix', '')
if prefix:
del kwargs['prefix']
for name, value in kwargs.iteritems():
self.fill(name, prefix + value)
class EchoParticularNamedSync(EchoNamedSync):
"""Has preexisting output names so it can be used as a root pipeline."""
output_names = ['one', 'two', 'three', 'four']
class EchoNamedAsync(pipeline.Pipeline):
"""Asynchronous pipeline that echos named inputs to named outputs."""
async = True
def run(self, **kwargs):
self.get_callback_task(params=kwargs).add()
def callback(self, **kwargs):
prefix = kwargs.get('prefix', '')
if prefix:
del kwargs['prefix']
for name, value in kwargs.iteritems():
self.fill(name, prefix + value)
self.complete()
def run_test(self, **kwargs):
self.callback(**kwargs)
class EchoNamedHalfAsync(pipeline.Pipeline):
"""Pipeline that echos to named outputs and completes async.
This is different than the other EchoNamedAsync because it fills all the
slots except the default slot immediately, and then uses a callback to
finally complete.
"""
async = True
output_names = ['one', 'two', 'three', 'four']
def run(self, **kwargs):
prefix = kwargs.get('prefix', '')
if prefix:
del kwargs['prefix']
for name, value in kwargs.iteritems():
self.fill(name, prefix + value)
self.get_callback_task(params=kwargs).add()
def callback(self, **kwargs):
self.complete()
def run_test(self, **kwargs):
prefix = kwargs.get('prefix', '')
if prefix:
del kwargs['prefix']
for name, value in kwargs.iteritems():
self.fill(name, prefix + value)
self.callback(**kwargs)
class EchoParticularNamedAsync(EchoNamedAsync):
"""Has preexisting output names so it can be used as a root pipeline."""
output_names = ['one', 'two', 'three', 'four']
class FillAndPass(pipeline.Pipeline):
"""Test pipeline that fills some outputs and passes the rest to a child."""
def run(self, to_fill, **kwargs):
for name in to_fill:
self.fill(name, kwargs.pop(name))
adjusted_kwargs = {}
for name, value in kwargs.iteritems():
adjusted_kwargs[name] = value
if adjusted_kwargs:
yield EchoNamedSync(**adjusted_kwargs)
class FillAndPassParticular(FillAndPass):
"""Has preexisting output names so it can be used as a root pipeline."""
output_names = ['one', 'two', 'three', 'four']
class StrictChildInheritsAll(pipeline.Pipeline):
"""Test pipeline whose strict child inherits all outputs."""
output_names = ['one', 'two', 'three', 'four']
def run(self, **kwargs):
yield EchoParticularNamedSync(**kwargs)
class StrictChildGeneratorInheritsAll(pipeline.Pipeline):
"""Test pipeline whose strict child generator inherits all outputs."""
output_names = ['one', 'two', 'three', 'four']
def run(self, **kwargs):
yield FillAndPassParticular(kwargs.keys(), **kwargs)
class ConsumePartialChildrenStrict(pipeline.Pipeline):
"""Test pipeline that consumes a subset of a strict child's outputs."""
def run(self, **kwargs):
result = yield EchoParticularNamedSync(**kwargs)
yield EchoSync(result.one, result.two)
class ConsumePartialChildren(pipeline.Pipeline):
"""Test pipeline that consumes a subset of a dynamic child's outputs."""
def run(self, **kwargs):
result = yield EchoNamedSync(**kwargs)
yield EchoSync(result.one, result.two)
class DoNotConsumeDefault(pipeline.Pipeline):
"""Test pipeline that does not consume a child's default output."""
def run(self, value):
yield EchoSync('not used')
yield EchoSync(value)
class TwoLevelFillAndPass(pipeline.Pipeline):
"""Two-level deep version of fill and pass."""
output_names = ['one', 'two', 'three', 'four']
def run(self, **kwargs):
# This stage will prefix any keyword args with 'first-'.
stage = yield FillAndPass(
[],
prefix='first-',
one=kwargs.get('one'),
two=kwargs.get('two'))
adjusted_kwargs = kwargs.copy()
adjusted_kwargs['one'] = stage.one
adjusted_kwargs['two'] = stage.two
adjusted_kwargs['prefix'] = 'second-'
# This stage will prefix any keyword args with 'second-'. That means
# any args that were passed in from the output of the first stage will
# be prefixed twice: 'second-first-<kwarg>'.
yield FillAndPass([], **adjusted_kwargs)
class DivideWithRemainder(pipeline.Pipeline):
"""Divides a number, returning the divisor and the quotient."""
output_names = ['remainder']
def run(self, dividend, divisor):
self.fill(self.outputs.remainder, dividend % divisor)
return dividend // divisor
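# For example, DivideWithRemainder(7, 3) fills outputs.remainder with 1 and
# returns 2 (7 // 3) as its default output.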
class EuclidGCD(pipeline.Pipeline):
"""Does the Euclidean Greatest Common Factor recursive algorithm."""
output_names = ['gcd']
def run(self, a, b):
a, b = max(a, b), min(a, b)
if b == 0:
self.fill(self.outputs.gcd, a)
return
result = yield DivideWithRemainder(a, b)
recurse = yield EuclidGCD(b, result.remainder)
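# E.g. EuclidGCD(1071, 462) ends with outputs.gcd filled with 21 (see
# testGeneratorRecursive); the recursive child presumably inherits the
# parent's unfilled 'gcd' slot, so the base case fills the root output.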
class UnusedOutputReference(pipeline.Pipeline):
"""Test pipeline that touches a child output but doesn't consume it."""
def run(self):
result = yield EchoParticularNamedSync(
one='red', two='blue', three='green', four='yellow')
print result.one
print result.two
print result.three
yield EchoSync(result.four)
class AccessUndeclaredDefaultOnly(pipeline.Pipeline):
"""Test pipeline accesses undeclared output of a default-only pipeline."""
def run(self):
result = yield EchoSync('hi')
yield EchoSync(result.does_not_exist)
class RunMethod(pipeline.Pipeline):
"""Test pipeline that outputs what method was used for running it."""
def run(self):
return 'run()'
def run_test(self):
return 'run_test()'
class DoAfter(pipeline.Pipeline):
"""Test the After clause."""
def run(self):
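# Note: the duplicate messages are intentional; testAfter only verifies that
# both 'first' entries are recorded before both 'third' entries.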
first = yield SaveRunOrder('first')
second = yield SaveRunOrder('first')
with pipeline.After(first, second):
third = yield SaveRunOrder('third')
fourth = yield SaveRunOrder('third')
class DoAfterNested(pipeline.Pipeline):
"""Test the After clause in multiple nestings."""
def run(self):
first = yield SaveRunOrder('first')
second = yield SaveRunOrder('first')
with pipeline.After(first, second):
third = yield SaveRunOrder('third')
fourth = yield SaveRunOrder('third')
with pipeline.After(third, fourth):
with pipeline.After(third):
yield SaveRunOrder('fifth')
with pipeline.After(fourth):
yield SaveRunOrder('fifth')
class DoAfterList(pipeline.Pipeline):
"""Test the After clause with a list of jobs."""
def run(self):
job_list = []
for i in xrange(10):
job = yield EchoNamedHalfAsync(
one='red', two='blue', three='green', four='yellow')
job_list.append(job)
with pipeline.After(*job_list):
combined = yield common.Concat(*[job.one for job in job_list])
result = yield SaveRunOrder(combined)
with pipeline.After(result):
yield SaveRunOrder('twelfth')
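# The ten jobs above each fill 'one' with 'red', so the Concat result saved
# by SaveRunOrder is 'red' repeated ten times (see testAfterWithList).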
class DoInOrder(pipeline.Pipeline):
"""Test the InOrder clause."""
def run(self):
with pipeline.InOrder():
yield SaveRunOrder('first')
yield SaveRunOrder('second')
yield SaveRunOrder('third')
yield SaveRunOrder('fourth')
class DoInOrderNested(pipeline.Pipeline):
"""Test the InOrder clause when nested."""
def run(self):
with pipeline.InOrder():
yield SaveRunOrder('first')
yield SaveRunOrder('second')
with pipeline.InOrder():
# Should break.
yield SaveRunOrder('third')
yield SaveRunOrder('fourth')
class MixAfterInOrder(pipeline.Pipeline):
"""Test mixing After and InOrder clauses."""
def run(self):
first = yield SaveRunOrder('first')
with pipeline.After(first):
with pipeline.InOrder():
yield SaveRunOrder('second')
yield SaveRunOrder('third')
fourth = yield SaveRunOrder('fourth')
with pipeline.InOrder():
with pipeline.After(fourth):
yield SaveRunOrder('fifth')
yield SaveRunOrder('sixth')
class RecordFinalized(pipeline.Pipeline):
"""Records calls to finalized."""
def run(self, depth):
yield SaveRunOrder('run#%d' % depth)
def finalized(self):
RunOrder.add('finalized#%d' % self.args[0])
def finalized_test(self):
RunOrder.add('finalized_test#%d' % self.args[0])
class NestedFinalize(pipeline.Pipeline):
"""Test nested pipelines are finalized in a reasonable order."""
def run(self, depth):
if depth == 0:
return
yield RecordFinalized(depth)
yield NestedFinalize(depth - 1)
class YieldBadValue(pipeline.Pipeline):
"""Test pipeline that yields something that's not a pipeline."""
def run(self):
yield 5
class YieldChildTwice(pipeline.Pipeline):
"""Test pipeline that yields the same child pipeline twice."""
def run(self):
child = EchoSync('bad')
yield child
yield child
class FinalizeFailure(pipeline.Pipeline):
"""Test when finalized raises an error."""
def run(self):
pass
def finalized(self):
raise Exception('Doh something broke!')
class SyncForcesRetry(pipeline.Pipeline):
"""Test when a synchronous pipeline raises the Retry exception."""
def run(self):
raise pipeline.Retry('We need to try this again')
class AsyncForcesRetry(pipeline.Pipeline):
"""Test when a synchronous pipeline raises the Retry exception."""
async = True
def run(self):
raise pipeline.Retry('We need to try this again')
def run_test(self):
raise pipeline.Retry('We need to try this again')
class GeneratorForcesRetry(pipeline.Pipeline):
"""Test when a generator pipeline raises the Retry exception."""
def run(self):
if False:
yield 1
raise pipeline.Retry('We need to try this again')
class SyncRaiseAbort(pipeline.Pipeline):
"""Raises an abort signal."""
def run(self):
RunOrder.add('run SyncRaiseAbort')
raise pipeline.Abort('Gotta bail!')
def finalized(self):
RunOrder.add('finalized SyncRaiseAbort: %s' % self.was_aborted)
class AsyncRaiseAbort(pipeline.Pipeline):
"""Raises an abort signal in an asynchronous pipeline."""
async = True
def run(self):
raise pipeline.Abort('Gotta bail!')
def run_test(self):
raise pipeline.Abort('Gotta bail!')
class GeneratorRaiseAbort(pipeline.Pipeline):
"""Raises an abort signal in a generator pipeline."""
def run(self):
if False:
yield 1
raise pipeline.Abort('Gotta bail!')
class AbortAndRecordFinalized(pipeline.Pipeline):
"""Records calls to finalized."""
def run(self):
RunOrder.add('run AbortAndRecordFinalized')
yield SyncRaiseAbort()
def finalized(self):
RunOrder.add('finalized AbortAndRecordFinalized: %s' %
self.was_aborted)
class SetStatusPipeline(pipeline.Pipeline):
"""Simple pipeline that just sets its status a few times."""
def run(self):
self.set_status(message='My message')
self.set_status(console_url='/path/to/my/console')
self.set_status(status_links=dict(one='/red', two='/blue'))
self.set_status(message='My message',
console_url='/path/to/my/console',
status_links=dict(one='/red', two='/blue'))
class PassBadValue(pipeline.Pipeline):
"""Simple pipeline that passes along a non-JSON serializable value."""
def run(self):
yield EchoSync(object())
class ReturnBadValue(pipeline.Pipeline):
"""Simple pipeline that returns a non-JSON serializable value."""
def run(self):
return object()
class EchoParams(pipeline.Pipeline):
"""Echos the parameters this pipeline has."""
def run(self):
ALLOWED = ('backoff_seconds', 'backoff_factor', 'max_attempts', 'target')
return dict((key, getattr(self, key)) for key in ALLOWED)
class WithParams(pipeline.Pipeline):
"""Simple pipeline that uses the with_params helper method."""
def run(self):
foo = yield EchoParams().with_params(
max_attempts=8,
backoff_seconds=99,
target='other-backend')
yield EchoSync(foo, 'stuff')
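# with_params() sets per-instance retry/backoff/target parameters on the
# child before it is yielded; testWithParams below expects the overrides to
# take effect in production mode but to be ignored in test mode.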
class FunctionalTest(test_shared.TaskRunningMixin, TestBase):
"""End-to-end tests for various Pipeline constructs."""
def setUp(self):
"""Sets up the test harness."""
super(FunctionalTest, self).setUp()
def testStartSync(self):
"""Tests starting and executing just a synchronous pipeline."""
stage = EchoSync(1, 2, 3)
self.assertFalse(stage.async)
self.assertEquals((1, 2, 3), EchoSync(1, 2, 3).run(1, 2, 3))
outputs = self.run_pipeline(stage)
self.assertEquals([1, 2, 3], outputs.default.value)
def testStartAsync(self):
"""Tests starting and executing an asynchronous pipeline."""
stage = EchoAsync(1, 2, 3)
self.assertTrue(stage.async)
outputs = self.run_pipeline(stage)
self.assertEquals([1, 2, 3], outputs.default.value)
def testSyncNamedOutputs(self):
"""Tests a synchronous pipeline with named outputs."""
stage = EchoParticularNamedSync(
one='red', two='blue', three='green', four='yellow')
self.assertFalse(stage.async)
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('red', outputs.one.value)
self.assertEquals('blue', outputs.two.value)
self.assertEquals('green', outputs.three.value)
self.assertEquals('yellow', outputs.four.value)
def testAsyncNamedOutputs(self):
"""Tests an asynchronous pipeline with named outputs."""
stage = EchoParticularNamedAsync(
one='red', two='blue', three='green', four='yellow')
self.assertTrue(stage.async)
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('red', outputs.one.value)
self.assertEquals('blue', outputs.two.value)
self.assertEquals('green', outputs.three.value)
self.assertEquals('yellow', outputs.four.value)
def testInheritOutputs(self):
"""Tests when a pipeline generator child inherits all parent outputs."""
stage = FillAndPassParticular(
[],
one='red', two='blue', three='green', four='yellow',
prefix='passed-')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('passed-red', outputs.one.value)
self.assertEquals('passed-blue', outputs.two.value)
self.assertEquals('passed-green', outputs.three.value)
self.assertEquals('passed-yellow', outputs.four.value)
def testInheritOutputsPartial(self):
"""Tests when a pipeline generator child inherits some parent outputs."""
stage = FillAndPassParticular(
['one', 'three'],
one='red', two='blue', three='green', four='yellow',
prefix='passed-')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('red', outputs.one.value)
self.assertEquals('passed-blue', outputs.two.value)
self.assertEquals('green', outputs.three.value)
self.assertEquals('passed-yellow', outputs.four.value)
def testInheritOutputsStrict(self):
"""Tests strict child of a pipeline generator inherits all outputs."""
stage = StrictChildInheritsAll(
one='red', two='blue', three='green', four='yellow')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('red', outputs.one.value)
self.assertEquals('blue', outputs.two.value)
self.assertEquals('green', outputs.three.value)
self.assertEquals('yellow', outputs.four.value)
def testInheritChildSyncStrictMissing(self):
"""Tests when a strict child pipeline does not output to a required slot."""
stage = StrictChildInheritsAll(
one='red', two='blue', three='green')
self.assertRaises(pipeline.SlotNotFilledError, self.run_pipeline, stage)
def testInheritChildSyncStrictNotDeclared(self):
"""Tests when a strict child pipeline outputs to an undeclared name."""
stage = StrictChildInheritsAll(
one='red', two='blue', three='green', four='yellow', five='undeclared')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testInheritGeneratorStrict(self):
"""Tests when a strict child pipeline inherits all outputs."""
stage = StrictChildGeneratorInheritsAll(
one='red', two='blue', three='green', four='yellow')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('red', outputs.one.value)
self.assertEquals('blue', outputs.two.value)
self.assertEquals('green', outputs.three.value)
self.assertEquals('yellow', outputs.four.value)
def testInheritGeneratorStrictMissing(self):
"""Tests when a strict child generator does not output to a slot."""
stage = StrictChildGeneratorInheritsAll(
one='red', two='blue', three='green')
self.assertRaises(pipeline.SlotNotFilledError, self.run_pipeline, stage)
def testInheritGeneratorStrictNotDeclared(self):
"""Tests when a strict child generator outputs to an undeclared name."""
stage = StrictChildGeneratorInheritsAll(
one='red', two='blue', three='green', four='yellow', five='undeclared')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testPartialConsumptionStrict(self):
"""Tests when a parent pipeline consumes a subset of strict child outputs.
When the child is strict, then partial consumption is fine since all
outputs must be declared ahead of time.
"""
stage = ConsumePartialChildrenStrict(
one='red', two='blue', three='green', four='yellow')
outputs = self.run_pipeline(stage)
self.assertEquals(['red', 'blue'], outputs.default.value)
def testPartialConsumptionDynamic(self):
"""Tests when a parent pipeline consumes a subset of dynamic child outputs.
When the child is dynamic, then all outputs must be consumed by the caller.
"""
stage = ConsumePartialChildren(
one='red', two='blue', three='green', four='yellow')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testNoDefaultConsumption(self):
"""Tests when a parent pipeline does not consume default output."""
stage = DoNotConsumeDefault('hi there')
outputs = self.run_pipeline(stage)
self.assertEquals('hi there', outputs.default.value)
def testGeneratorNoChildren(self):
"""Tests when a generator pipeline yields no children."""
self.assertRaises(StopIteration, FillAndPass([]).run([]).next)
stage = FillAndPass([])
outputs = self.run_pipeline(stage)
self.assertTrue(outputs.default.value is None)
def testSyncMissingNamedOutput(self):
"""Tests when a sync pipeline does not output to a named output."""
stage = EchoParticularNamedSync(one='red', two='blue', three='green')
self.assertFalse(stage.async)
self.assertRaises(pipeline.SlotNotFilledError, self.run_pipeline, stage)
def testGeneratorNoChildrenMissingNamedOutput(self):
"""Tests a missing output from a generator with no child pipelines."""
stage = FillAndPassParticular(
['one', 'two', 'three'],
one='red', two='blue', three='green')
self.assertRaises(pipeline.SlotNotFilledError, self.run_pipeline, stage)
def testSyncUndeclaredOutput(self):
"""Tests when a strict sync pipeline outputs to an undeclared output."""
stage = EchoParticularNamedSync(
one='red', two='blue', three='green', four='yellow', other='stuff')
self.assertFalse(stage.async)
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testGeneratorChildlessUndeclaredOutput(self):
"""Tests when a childless generator outputs to an undeclared output."""
stage = FillAndPassParticular(
['one', 'two', 'three', 'four', 'other'],
one='red', two='blue', three='green', four='yellow', other='stuff')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testRootGeneratorChildInheritOutputUndeclared(self):
"""Tests when root's child inherits all and outputs to a bad name."""
stage = FillAndPassParticular(
['one', 'two'],
one='red', two='blue', three='green', four='yellow', other='stuff')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testDeepGeneratorChildInheritOutputUndeclared(self):
"""Tests when a pipeline that is not the root outputs to a bad name."""
stage = TwoLevelFillAndPass(
one='red', two='blue', three='green', four='yellow', other='stuff')
self.assertRaises(pipeline.SlotNotDeclaredError, self.run_pipeline, stage)
def testDeepGenerator(self):
"""Tests a multi-level generator."""
stage = TwoLevelFillAndPass(
one='red', two='blue', three='green', four='yellow')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('second-first-red', outputs.one.value)
self.assertEquals('second-first-blue', outputs.two.value)
self.assertEquals('second-green', outputs.three.value)
self.assertEquals('second-yellow', outputs.four.value)
def testDeepGenerator_Huge(self):
"""Tests a multi-level generator with huge inputs and outputs."""
big_data = 'blue' * 1000000
stage = TwoLevelFillAndPass(
one='red', two=big_data, three='green', four='yellow')
outputs = self.run_pipeline(stage)
self.assertEquals(None, outputs.default.value)
self.assertEquals('second-first-red', outputs.one.value)
self.assertEquals('second-first-' + big_data, outputs.two.value)
self.assertEquals('second-green', outputs.three.value)
self.assertEquals('second-yellow', outputs.four.value)
def testOnlyConsumePassedOnOutputs(self):
"""Tests that just accessing a Slot on a PipelineFuture won't consume it."""
stage = UnusedOutputReference()
outputs = self.run_pipeline(stage)
self.assertEquals('yellow', outputs.default.value)
def testAccessUndeclaredOutputsBreaks(self):
"""Tests errors accessing undeclared outputs on a default-only pipeline."""
stage = AccessUndeclaredDefaultOnly()
self.assertRaises(pipeline.SlotNotFilledError, self.run_pipeline, stage)
def testGeneratorRecursive(self):
"""Tests a recursive nesting of generators."""
stage = EuclidGCD(1071, 462)
outputs = self.run_pipeline(stage)
self.assertEquals(21, outputs.gcd.value)
stage = EuclidGCD(1071, 463)
outputs = self.run_pipeline(stage)
self.assertEquals(1, outputs.gcd.value)
def testAfter(self):
"""Tests the After() class."""
stage = DoAfter()
self.run_pipeline(stage)
self.assertEquals(['first', 'first', 'third', 'third'],
RunOrder.get())
def testAfterWithNesting(self):
"""Tests that After() nesting of the same dependencies doesn't break."""
stage = DoAfterNested()
self.run_pipeline(stage)
self.assertEquals(['first', 'first', 'third', 'third', 'fifth', 'fifth'],
RunOrder.get())
def testAfterWithList(self):
"""Tests that After() with a list of dependencies works."""
stage = DoAfterList()
self.run_pipeline(stage)
self.assertEquals(['redredredredredredredredredred', 'twelfth'],
RunOrder.get())
def testInOrder(self):
"""Tests the InOrder() class."""
stage = DoInOrder()
self.run_pipeline(stage)
self.assertEquals(['first', 'second', 'third', 'fourth'],
RunOrder.get())
def testInOrderNesting(self):
"""Tests that InOrder nesting is not allowed."""
stage = DoInOrderNested()
self.assertRaises(
pipeline.UnexpectedPipelineError, self.run_pipeline, stage)
def testMixAfterInOrder(self):
"""Tests nesting Afters in InOrder blocks and vice versa."""
stage = MixAfterInOrder()
self.run_pipeline(stage)
self.assertEquals(['first', 'second', 'third', 'fourth', 'fifth', 'sixth'],
RunOrder.get())
def testFinalized(self):
"""Tests the order of finalization."""
stage = NestedFinalize(5)
self.run_pipeline(stage)
run_order = RunOrder.get()
# Ensure each entry is unique.
self.assertEquals(10, len(set(run_order)))
# Ensure there are 5 run entries, in a reasonable order.
run_entries = [
int(r[len('run#'):]) for r in run_order
if r.startswith('run#')]
self.assertEquals(5, len(run_entries))
self.assertEquals([5, 4, 3, 2, 1], run_entries)
# Ensure there are 5 finalized entries, in a reasonable order.
if self.test_mode:
finalized_name = 'finalized_test#'
else:
finalized_name = 'finalized#'
finalized_entries = [
int(r[len(finalized_name):]) for r in run_order
if r.startswith(finalized_name)]
self.assertEquals(5, len(finalized_entries))
self.assertEquals([5, 4, 3, 2, 1], finalized_entries)
def testRunTest(self):
"""Tests that run_test is preferred over run for test mode."""
stage = RunMethod()
outputs = self.run_pipeline(stage)
if self.test_mode:
self.assertEquals('run_test()', outputs.default.value)
else:
self.assertEquals('run()', outputs.default.value)
def testYieldBadValue(self):
"""Tests yielding something that is invalid."""
stage = YieldBadValue()
self.assertRaises(
pipeline.UnexpectedPipelineError, self.run_pipeline, stage)
def testYieldPipelineInstanceTwice(self):
"""Tests when a Pipeline instance is yielded multiple times."""
stage = YieldChildTwice()
self.assertRaises(
pipeline.UnexpectedPipelineError, self.run_pipeline, stage)
def testFinalizeException(self):
"""Tests that finalized exceptions just raise up without being caught."""
stage = FinalizeFailure()
try:
self.run_pipeline(stage)
self.fail('Should have raised')
except Exception, e:
self.assertEquals('Doh something broke!', str(e))
def testSyncRetryException(self):
"""Tests when a sync generator raises a Retry exception."""
stage = SyncForcesRetry()
self.assertRaises(pipeline.Retry, self.run_pipeline, stage)
def testAsyncRetryException(self):
"""Tests when an async generator raises a Retry exception."""
stage = AsyncForcesRetry()
self.assertRaises(pipeline.Retry, self.run_pipeline, stage)
def testGeneratorRetryException(self):
"""Tests when a generator raises a Retry exception."""
stage = GeneratorForcesRetry()
self.assertRaises(pipeline.Retry, self.run_pipeline, stage)
def testSyncAbortException(self):
"""Tests when a sync pipeline raises an abort exception."""
stage = SyncRaiseAbort()
self.assertRaises(pipeline.Abort, self.run_pipeline, stage)
def testAsyncAbortException(self):
"""Tests when an async pipeline raises an abort exception."""
stage = AsyncRaiseAbort()
self.assertRaises(pipeline.Abort, self.run_pipeline, stage)
def testGeneratorAbortException(self):
"""Tests when a generator pipeline raises an abort exception."""
stage = GeneratorRaiseAbort()
self.assertRaises(pipeline.Abort, self.run_pipeline, stage)
def testAbortThenFinalize(self):
"""Tests that pipelines are finalized after abort is raised.
This test requires special handling for different modes to confirm that
finalization happens after abort in production mode.
"""
stage = AbortAndRecordFinalized()
if self.test_mode:
# Finalize after abort doesn't happen in test mode.
try:
self.run_pipeline(stage)
self.fail('Should have raised')
except Exception, e:
self.assertEquals('Gotta bail!', str(e))
run_order = RunOrder.get()
self.assertEquals(['run AbortAndRecordFinalized', 'run SyncRaiseAbort'],
run_order)
else:
self.run_pipeline(stage, _task_retry=False, _require_slots_filled=False)
# Non-deterministic results for finalize. Must equal one of these two.
expected_order1 = [
'run AbortAndRecordFinalized',
'run SyncRaiseAbort',
'finalized SyncRaiseAbort: True',
'finalized AbortAndRecordFinalized: True',
]
expected_order2 = [
'run AbortAndRecordFinalized',
'run SyncRaiseAbort',
'finalized AbortAndRecordFinalized: True',
'finalized SyncRaiseAbort: True',
]
run_order = RunOrder.get()
self.assertTrue(run_order == expected_order1 or
run_order == expected_order2,
'Found order: %r' % run_order)
def testSetStatus_Working(self):
"""Tests that setting the status does not raise errors."""
stage = SetStatusPipeline()
self.run_pipeline(stage)
# That's it. No exceptions raised.
def testPassBadValue(self):
"""Tests when a pipeline passes a non-serializable value to a child."""
stage = PassBadValue()
self.assertRaises(TypeError, self.run_pipeline, stage)
def testReturnBadValue(self):
"""Tests when a pipeline returns a non-serializable value."""
stage = ReturnBadValue()
self.assertRaises(TypeError, self.run_pipeline, stage)
def testWithParams(self):
"""Tests when a pipeline uses the with_params helper."""
stage = WithParams()
outputs = self.run_pipeline(stage)
if self.test_mode:
# In test mode you cannot modify the runtime parameters.
self.assertEquals(
[
{
'backoff_seconds': 15,
'backoff_factor': 2,
'target': None,
'max_attempts': 3
},
'stuff'
],
outputs.default.value)
else:
self.assertEquals(
[
{
'backoff_seconds': 99,
'backoff_factor': 2,
'target': 'other-backend',
'max_attempts': 8
},
'stuff',
],
outputs.default.value)
class FunctionalTestModeTest(test_shared.TestModeMixin, FunctionalTest):
"""Runs all functional tests in test mode."""
DO_NOT_DELETE = 'Seriously... We only need the class declaration.'
class StatusTest(TestBase):
"""Tests for the status handlers."""
def setUp(self):
"""Sets up the test harness."""
TestBase.setUp(self)
self.fill_time = datetime.datetime(2010, 12, 10, 13, 55, 16, 416567)
self.pipeline1_key = db.Key.from_path(_PipelineRecord.kind(), 'one')
self.pipeline2_key = db.Key.from_path(_PipelineRecord.kind(), 'two')
self.pipeline3_key = db.Key.from_path(_PipelineRecord.kind(), 'three')
self.slot1_key = db.Key.from_path(_SlotRecord.kind(), 'red')
self.slot2_key = db.Key.from_path(_SlotRecord.kind(), 'blue')
self.slot3_key = db.Key.from_path(_SlotRecord.kind(), 'green')
self.slot1_record = _SlotRecord(
key=self.slot1_key,
root_pipeline=self.pipeline1_key)
self.slot2_record = _SlotRecord(
key=self.slot2_key,
root_pipeline=self.pipeline1_key)
self.slot3_record = _SlotRecord(
key=self.slot3_key,
root_pipeline=self.pipeline1_key)
self.base_params = {
'args': [],
'kwargs': {},
'task_retry': False,
'backoff_seconds': 1,
'backoff_factor': 2,
'max_attempts': 4,
'queue_name': 'default',
'base_path': '',
'after_all': [],
}
self.params1 = self.base_params.copy()
self.params1.update({
'output_slots': {'default': str(self.slot1_key)},
})
self.params2 = self.base_params.copy()
self.params2.update({
'output_slots': {'default': str(self.slot2_key)},
})
self.params3 = self.base_params.copy()
self.params3.update({
'output_slots': {'default': str(self.slot3_key)},
})
self.pipeline1_record = _PipelineRecord(
root_pipeline=self.pipeline1_key,
status=_PipelineRecord.RUN,
class_path='does.not.exist1',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(self.params1),
key=self.pipeline1_key,
max_attempts=4)
self.pipeline2_record = _PipelineRecord(
root_pipeline=self.pipeline1_key,
status=_PipelineRecord.WAITING,
class_path='does.not.exist2',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(self.params2),
key=self.pipeline2_key,
max_attempts=3)
self.pipeline3_record = _PipelineRecord(
root_pipeline=self.pipeline1_key,
status=_PipelineRecord.DONE,
class_path='does.not.exist3',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(self.params3),
key=self.pipeline3_key,
max_attempts=2)
self.barrier1_record = _BarrierRecord(
parent=self.pipeline1_key,
key_name=_BarrierRecord.FINALIZE,
target=self.pipeline1_key,
root_pipeline=self.pipeline1_key,
blocking_slots=[self.slot1_key])
self.barrier2_record = _BarrierRecord(
parent=self.pipeline2_key,
key_name=_BarrierRecord.FINALIZE,
target=self.pipeline2_key,
root_pipeline=self.pipeline1_key,
blocking_slots=[self.slot2_key])
self.barrier2_record_start = _BarrierRecord(
parent=self.pipeline2_key,
key_name=_BarrierRecord.START,
target=self.pipeline2_key,
root_pipeline=self.pipeline1_key,
blocking_slots=[])
self.barrier3_record = _BarrierRecord(
parent=self.pipeline3_key,
key_name=_BarrierRecord.FINALIZE,
target=self.pipeline3_key,
root_pipeline=self.pipeline1_key,
blocking_slots=[self.slot3_key])
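# The records above describe a root pipeline 'one' plus two more pipelines
# ('two' and 'three') that share that root; individual tests persist the
# subset they need and wire them together (for example via fanned_out).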
def testGetTimestampMs(self):
"""Tests for the _get_timestamp_ms function."""
when = datetime.datetime(2010, 12, 10, 13, 55, 16, 416567)
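# 2010-12-10 13:55:16.416567 UTC is 1291989316.416567 seconds after the Unix
# epoch, so the expected result is that timestamp truncated to milliseconds.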
self.assertEquals(1291989316416L, pipeline._get_timestamp_ms(when))
def testGetInternalStatus_Missing(self):
"""Tests for _get_internal_status when the pipeline is missing."""
try:
pipeline._get_internal_status(pipeline_key=self.pipeline1_key)
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals('Could not find pipeline ID "one"', str(e))
def testGetInternalStatus_OutputSlotMissing(self):
"""Tests for _get_internal_status when the output slot is missing."""
try:
pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record})
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals(
'Default output slot with '
'key=aglteS1hcHAtaWRyGgsSEV9BRV9QaXBlbGluZV9TbG90IgNyZWQM '
'missing for pipeline ID "one"', str(e))
def testGetInternalStatus_FinalizeBarrierMissing(self):
"""Tests for _get_internal_status when the finalize barrier is missing."""
try:
pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record})
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals(
'Finalization barrier missing for pipeline ID "one"', str(e))
def testGetInternalStatus_Finalizing(self):
"""Tests for _get_internal_status when the status is finalizing."""
self.slot1_record.status = _SlotRecord.FILLED
self.slot1_record.fill_time = self.fill_time
expected = {
'status': 'finalizing',
'currentAttempt': 1,
'afterSlotKeys': [],
'outputs': {
'default': str(self.slot1_key),
},
'args': [],
'classPath': 'does.not.exist1',
'children': [],
'endTimeMs': 1291989316416L,
'maxAttempts': 4,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record}))
def testGetInternalStatus_Retry(self):
"""Tests for _get_internal_status when the status is retry."""
self.pipeline2_record.next_retry_time = self.fill_time
self.pipeline2_record.retry_message = 'My retry message'
expected = {
'status': 'retry',
'lastRetryMessage': 'My retry message',
'currentAttempt': 1,
'afterSlotKeys': [],
'startTimeMs': 1291989316416L,
'outputs': {
'default': str(self.slot2_key),
},
'args': [],
'classPath': 'does.not.exist2',
'children': [],
'maxAttempts': 3,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline2_key,
pipeline_dict={self.pipeline2_key: self.pipeline2_record},
slot_dict={self.slot2_key: self.slot1_record},
barrier_dict={self.barrier2_record.key(): self.barrier2_record}))
def testGetInternalStatus_Waiting(self):
"""Tests for _get_internal_status when the status is waiting."""
expected = {
'status': 'waiting',
'currentAttempt': 1,
'afterSlotKeys': [],
'outputs': {
'default': str(self.slot2_key)
},
'args': [],
'classPath': 'does.not.exist2',
'children': [],
'maxAttempts': 3,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline2_key,
pipeline_dict={self.pipeline2_key: self.pipeline2_record},
slot_dict={self.slot2_key: self.slot1_record},
barrier_dict={
self.barrier2_record.key(): self.barrier2_record,
self.barrier2_record_start.key(): self.barrier2_record_start}))
def testGetInternalStatus_Run(self):
"""Tests for _get_internal_status when the status is run."""
self.pipeline1_record.start_time = self.fill_time
expected = {
'status': 'run',
'currentAttempt': 1,
'afterSlotKeys': [],
'startTimeMs': 1291989316416L,
'outputs': {
'default': str(self.slot1_key)
},
'args': [],
'classPath': 'does.not.exist1',
'children': [],
'maxAttempts': 4,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record}))
def testGetInternalStatus_RunAfterRetry(self):
"""Tests _get_internal_status when a stage is re-run on retrying."""
self.pipeline1_record.start_time = self.fill_time
self.pipeline1_record.next_retry_time = self.fill_time
self.pipeline1_record.retry_message = 'My retry message'
self.pipeline1_record.current_attempt = 1
expected = {
'status': 'run',
'currentAttempt': 2,
'lastRetryMessage': 'My retry message',
'afterSlotKeys': [],
'startTimeMs': 1291989316416L,
'outputs': {
'default': str(self.slot1_key)
},
'args': [],
'classPath': 'does.not.exist1',
'children': [],
'maxAttempts': 4,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record}))
def testGetInternalStatus_Aborted(self):
"""Tests for _get_internal_status when the status is aborted."""
self.pipeline1_record.status = _PipelineRecord.ABORTED
self.pipeline1_record.abort_message = 'I had to bail'
expected = {
'status': 'aborted',
'currentAttempt': 1,
'afterSlotKeys': [],
'abortMessage': 'I had to bail',
'outputs': {
'default': str(self.slot1_key),
},
'args': [],
'classPath': 'does.not.exist1',
'children': [],
'maxAttempts': 4,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record}))
def testGetInternalStatus_MoreParams(self):
"""Tests for _get_internal_status with children, slots, and outputs."""
self.pipeline1_record.start_time = self.fill_time
self.pipeline1_record.fanned_out = [
self.pipeline2_key, self.pipeline3_key]
self.pipeline1_record.params['args'] = [
{'type': 'slot', 'slot_key': 'foobar'},
{'type': 'slot', 'slot_key': 'meepa'},
]
self.pipeline1_record.params['kwargs'] = {
'my_arg': {'type': 'slot', 'slot_key': 'other'},
'second_arg': {'type': 'value', 'value': 1234},
}
self.pipeline1_record.params['output_slots'] = {
'default': str(self.slot1_key),
'another_one': str(self.slot2_key),
}
self.pipeline1_record.params['after_all'] = [
str(self.slot2_key),
]
expected = {
'status': 'run',
'currentAttempt': 1,
'afterSlotKeys': [
'aglteS1hcHAtaWRyGwsSEV9BRV9QaXBlbGluZV9TbG90IgRibHVlDA'
],
'startTimeMs': 1291989316416L,
'outputs': {
'default': 'aglteS1hcHAtaWRyGgsSEV9BRV9QaXBlbGluZV9TbG90IgNyZWQM',
'another_one':
'aglteS1hcHAtaWRyGwsSEV9BRV9QaXBlbGluZV9TbG90IgRibHVlDA',
},
'args': [
{'type': 'slot', 'slotKey': 'foobar'},
{'type': 'slot', 'slotKey': 'meepa'}
],
'classPath': 'does.not.exist1',
'children': [u'two', u'three'],
'maxAttempts': 4,
'kwargs': {
'my_arg': {'type': 'slot', 'slotKey': 'other'},
'second_arg': {'type': 'value', 'value': 1234},
},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record}))
def testGetInternalStatus_StatusRecord(self):
"""Tests for _get_internal_status when the status record is present."""
status_record = _StatusRecord(
key=db.Key.from_path(_StatusRecord.kind(), self.pipeline1_key.name()),
message='My status message',
status_time=self.fill_time,
console_url='/path/to/console',
link_names=[db.Text(x) for x in ('one', 'two', 'three')],
link_urls=[db.Text(x) for x in ('/first', '/second', '/third')],
root_pipeline=self.pipeline1_key)
expected = {
'status': 'run',
'currentAttempt': 1,
'afterSlotKeys': [],
'statusTimeMs': 1291989316416L,
'outputs': {
'default': str(self.slot1_key)
},
'args': [],
'classPath': 'does.not.exist1',
'children': [],
'maxAttempts': 4,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default',
'statusLinks': {
'three': '/third',
'two': '/second',
'one': '/first'
},
'statusConsoleUrl': '/path/to/console',
'statusMessage': 'My status message',
}
self.assertEquals(expected, pipeline._get_internal_status(
pipeline_key=self.pipeline1_key,
pipeline_dict={self.pipeline1_key: self.pipeline1_record},
slot_dict={self.slot1_key: self.slot1_record},
barrier_dict={self.barrier1_record.key(): self.barrier1_record},
status_dict={status_record.key(): status_record}))
def testGetInternalSlot_Missing(self):
"""Tests _get_internal_slot when the slot is missing."""
try:
pipeline._get_internal_slot(slot_key=self.slot1_key)
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals(
'Could not find data for output slot key '
'"aglteS1hcHAtaWRyGgsSEV9BRV9QaXBlbGluZV9TbG90IgNyZWQM".',
str(e))
def testGetInternalSlot_Filled(self):
"""Tests _get_internal_slot when the slot is filled."""
self.slot1_record.status = _SlotRecord.FILLED
self.slot1_record.filler = self.pipeline2_key
self.slot1_record.fill_time = self.fill_time
self.slot1_record.root_pipeline = self.pipeline1_key
self.slot1_record.value_text = json.dumps({
'one': 1234, 'two': 'hello'})
expected = {
'status': 'filled',
'fillerPipelineId': 'two',
'value': {'two': 'hello', 'one': 1234},
'fillTimeMs': 1291989316416L
}
self.assertEquals(
expected,
pipeline._get_internal_slot(
slot_key=self.slot1_key,
slot_dict={self.slot1_key: self.slot1_record}))
def testGetInternalSlot_Waiting(self):
"""Tests _get_internal_slot when the slot is waiting."""
self.slot1_record.status = _SlotRecord.WAITING
self.slot1_record.root_pipeline = self.pipeline1_key
expected = {
'status': 'waiting',
'fillerPipelineId': 'two',
}
self.assertEquals(
expected,
pipeline._get_internal_slot(
slot_key=self.slot1_key,
slot_dict={self.slot1_key: self.slot1_record},
filler_pipeline_key=self.pipeline2_key))
def testGetStatusTree_RootMissing(self):
"""Tests get_status_tree when the root pipeline is missing."""
try:
pipeline.get_status_tree(self.pipeline1_key.name())
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals('Could not find pipeline ID "one"', str(e))
def testGetStatusTree_NotRoot(self):
"""Tests get_status_tree when the pipeline query is not the root."""
found1_root = _PipelineRecord.root_pipeline.get_value_for_datastore(
self.pipeline1_record)
found2_root = _PipelineRecord.root_pipeline.get_value_for_datastore(
self.pipeline2_record)
self.assertEquals(found1_root, self.pipeline1_key)
self.assertEquals(found2_root, self.pipeline1_key)
db.put([self.pipeline1_record, self.pipeline2_record,
self.slot1_record, self.slot2_record,
self.barrier1_record, self.barrier2_record])
pipeline.get_status_tree(self.pipeline2_key.name())
expected = {
'pipelines': {
'one': {
'afterSlotKeys': [],
'args': [],
'backoffFactor': 2,
'backoffSeconds': 1,
'children': [],
'classPath': 'does.not.exist1',
'currentAttempt': 1,
'kwargs': {},
'maxAttempts': 4,
'outputs': {
'default': str(self.slot1_key),
},
'queueName': 'default',
'status': 'run',
},
},
'rootPipelineId': 'one',
'slots': {},
}
self.assertEquals(
expected,
pipeline.get_status_tree(self.pipeline2_key.name()))
def testGetStatusTree_NotRoot_MissingParent(self):
"""Tests get_status_tree with a non-root pipeline and missing parent."""
found1_root = _PipelineRecord.root_pipeline.get_value_for_datastore(
self.pipeline1_record)
found2_root = _PipelineRecord.root_pipeline.get_value_for_datastore(
self.pipeline2_record)
self.assertEquals(found1_root, self.pipeline1_key)
self.assertEquals(found2_root, self.pipeline1_key)
# Don't put pipeline1_record
db.put([self.pipeline2_record, self.slot1_record, self.slot2_record,
self.barrier1_record, self.barrier2_record])
try:
pipeline.get_status_tree(self.pipeline1_key.name())
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals('Could not find pipeline ID "one"', str(e))
def testGetStatusTree_ChildMissing(self):
"""Tests get_status_tree when a fanned out child pipeline is missing."""
self.pipeline1_record.fanned_out = [self.pipeline2_key]
db.put([self.pipeline1_record, self.barrier1_record, self.slot1_record])
try:
pipeline.get_status_tree(self.pipeline1_key.name())
self.fail('Did not raise')
except pipeline.PipelineStatusError, e:
self.assertEquals(
'Pipeline ID "one" points to child ID "two" which does not exist.',
str(e))
def testGetStatusTree_Example(self):
"""Tests a full example of a good get_status_tree response."""
self.pipeline1_record.fanned_out = [self.pipeline2_key, self.pipeline3_key]
self.slot1_record.root_pipeline = self.pipeline1_key
self.pipeline3_record.finalized_time = self.fill_time
# This one looks like a child, but it will be ignored since it is not
# reachable from the root via the fanned_out property.
bad_pipeline_key = db.Key.from_path(_PipelineRecord.kind(), 'ignored')
bad_pipeline_record = _PipelineRecord(
root_pipeline=self.pipeline1_key,
status=_PipelineRecord.RUN,
class_path='does.not.exist4',
# Bug in DB means we need to use the storage name here,
# not the local property name.
params=json.dumps(self.params1),
key=bad_pipeline_key,
max_attempts=4)
db.put([
self.pipeline1_record, self.pipeline2_record, self.pipeline3_record,
self.barrier1_record, self.barrier2_record, self.barrier3_record,
self.slot1_record, self.slot2_record, self.slot3_record,
bad_pipeline_record])
expected = {
'rootPipelineId': 'one',
'pipelines': {
'three': {
'status': 'done',
'currentAttempt': 1L,
'afterSlotKeys': [],
'outputs': {
'default': str(self.slot3_key)
},
'args': [],
'classPath': 'does.not.exist3',
'children': [],
'endTimeMs': 1291989316416L,
'maxAttempts': 2L,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
},
'two': {
'status': 'run',
'currentAttempt': 1L,
'afterSlotKeys': [],
'outputs': {
'default': str(self.slot2_key)
},
'args': [],
'classPath': 'does.not.exist2',
'children': [],
'maxAttempts': 3L,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
},
'one': {
'status': 'run',
'currentAttempt': 1L,
'afterSlotKeys': [],
'outputs': {
'default': str(self.slot1_key)
},
'args': [],
'classPath': 'does.not.exist1',
'children': ['two', 'three'],
'maxAttempts': 4L,
'kwargs': {},
'backoffFactor': 2,
'backoffSeconds': 1,
'queueName': 'default'
}
},
'slots': {
str(self.slot2_key): {
'status': 'waiting',
'fillerPipelineId': 'two'
},
str(self.slot3_key): {
'status': 'waiting',
'fillerPipelineId': 'three'
}
}
}
self.assertEquals(
expected,
pipeline.get_status_tree(self.pipeline1_key.name()))
def testGetPipelineNames(self):
"""Tests the get_pipeline_names function."""
names = pipeline.get_pipeline_names()
self.assertTrue(None not in names) # No base-class Pipeline
self.assertIn('__main__.EchoSync', names)
found = False
for name in names:
# Name may be relative to another module, like 'foo.pipeline.common...'
found = 'pipeline.common.Delay' in name
if found:
break
self.assertTrue(found)
def testGetRootList(self):
"""Tests the get_root_list function."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(idempotence_key='banana')
stage.set_status('This one has a message')
stage2 = EchoSync('one')
stage2.start(idempotence_key='lemon')
found = pipeline.get_root_list()
self.assertFalse('cursor' in found) # No next page available
found_names = [
(p['pipelineId'], p['classPath']) for p in found['pipelines']]
expected = [
('lemon', '__main__.EchoSync'),
('banana', '__main__.NothingPipeline')
]
self.assertEquals(expected, found_names)
self.assertEquals('This one has a message',
found['pipelines'][1]['statusMessage'])
def testGetRootList_FinalizeBarrierMissing(self):
"""Tests get_status_tree when a finalization barrier is missing."""
stage = NothingPipeline('one', 'two', three='red', four=1234)
stage.start(idempotence_key='banana')
stage.set_status('This one has a message')
stage_key = db.Key.from_path(
pipeline._PipelineRecord.kind(), stage.pipeline_id)
finalization_key = db.Key.from_path(
pipeline._BarrierRecord.kind(), _BarrierRecord.FINALIZE,
parent=stage_key)
db.delete(finalization_key)
found = pipeline.get_root_list()
self.assertFalse('cursor' in found) # No next page available
found_names = [
(p['pipelineId'], p['classPath']) for p in found['pipelines']]
expected = [
('banana', '')
]
self.assertEquals(expected, found_names)
self.assertEquals(
'Finalization barrier missing for pipeline ID "%s"' % stage.pipeline_id,
found['pipelines'][0]['status'])
def testGetRootListCursor(self):
"""Tests the count and cursor behavior of get_root_list."""
NothingPipeline().start(idempotence_key='banana')
NothingPipeline().start(idempotence_key='lemon')
# Find newest
found = pipeline.get_root_list(count=1)
self.assertIn('cursor', found)
self.assertEquals(1, len(found['pipelines']))
self.assertEquals('lemon', found['pipelines'][0]['pipelineId'])
# Find next newest, and no cursor should be returned.
found = pipeline.get_root_list(count=1, cursor=found['cursor'])
self.assertFalse('cursor' in found)
self.assertEquals(1, len(found['pipelines']))
self.assertEquals('banana', found['pipelines'][0]['pipelineId'])
def testGetRootListClassPath(self):
"""Tests filtering a root list to a single class_path."""
NothingPipeline().start(idempotence_key='banana')
NothingPipeline().start(idempotence_key='lemon')
EchoSync('one').start(idempotence_key='tomato')
found = pipeline.get_root_list(class_path=NothingPipeline.class_path)
self.assertEquals(['__main__.NothingPipeline', '__main__.NothingPipeline'],
[p['classPath'] for p in found['pipelines']])
found = pipeline.get_root_list(class_path=EchoSync.class_path)
self.assertEquals(['__main__.EchoSync'],
[p['classPath'] for p in found['pipelines']])
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()