blob: f7c64f737b246c1e17c587d63f0aea0ddb1f350c [file] [log] [blame] [edit]
#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Datastore models used by the Google App Engine Pipeline API."""
from google.appengine.ext import db
from google.appengine.ext import blobstore
try:
import json
except ImportError:
import simplejson as json
# Relative imports
import util
class _PipelineRecord(db.Model):
  """Represents a Pipeline.

  Key name is a randomly assigned UUID. No parent entity.

  Properties:
    class_path: Path of the Python class to use for this pipeline.
    root_pipeline: The root of the whole workflow; set to itself if this
      pipeline is its own root.
    fanned_out: List of child _PipelineRecords that were started when this
      generator pipeline moved from WAITING to RUN.
    start_time: For pipelines with no start _BarrierRecord, when this pipeline
      was enqueued to run immediately.
    finalized_time: When this pipeline moved from WAITING or RUN to DONE.
    params: Serialized parameter dictionary.
    status: The current status of the pipeline.
    current_attempt: The current attempt (starting at 0) to run.
    max_attempts: Maximum number of attempts (starting at 0) to run.
    next_retry_time: ETA of the next retry attempt.
    retry_message: Why the last attempt failed; None or empty if no message.

  Root pipeline properties:
    is_root_pipeline: This is a root pipeline.
    abort_message: Why the whole pipeline was aborted; only saved on
      root pipelines.
    abort_requested: If an abort signal has been requested for this root
      pipeline; only saved on root pipelines.
  """

  WAITING = 'waiting'
  RUN = 'run'
  DONE = 'done'
  ABORTED = 'aborted'

  class_path = db.StringProperty()
  root_pipeline = db.SelfReferenceProperty(
      collection_name='child_pipelines_set')
  fanned_out = db.ListProperty(db.Key, indexed=False)
  start_time = db.DateTimeProperty(indexed=True)
  finalized_time = db.DateTimeProperty(indexed=False)

  # One of these two will be set, depending on the size of the params.
  params_text = db.TextProperty(name='params')
  params_blob = blobstore.BlobReferenceProperty(
      name='params_blob', indexed=False)

  status = db.StringProperty(choices=(WAITING, RUN, DONE, ABORTED),
                             default=WAITING)

  # Retry behavior
  current_attempt = db.IntegerProperty(default=0, indexed=False)
  max_attempts = db.IntegerProperty(default=1, indexed=False)
  next_retry_time = db.DateTimeProperty(indexed=False)
  retry_message = db.TextProperty()

  # Root pipeline properties
  is_root_pipeline = db.BooleanProperty()
  abort_message = db.TextProperty()
  abort_requested = db.BooleanProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for this model."""
    return '_AE_Pipeline_Record'

  @property
  def params(self):
    """Returns the dictionary of parameters for this Pipeline.

    The serialized parameters are read from params_blob when it is set and
    from params_text otherwise, decoded as JSON using util.JsonDecoder, and
    then cached on the instance so later accesses skip the read and decode.

    Returns:
      The decoded parameters. When the decoded value is a dictionary with a
      non-empty 'kwargs' entry, the keys of that entry are converted with
      str() so they can be used as keyword arguments.
    """
    if hasattr(self, '_params_decoded'):
      return self._params_decoded

    if self.params_blob is not None:
      # Close the reader explicitly instead of leaving it to be reclaimed by
      # garbage collection.
      blob_reader = self.params_blob.open()
      try:
        value_encoded = blob_reader.read()
      finally:
        blob_reader.close()
    else:
      value_encoded = self.params_text

    value = json.loads(value_encoded, cls=util.JsonDecoder)
    if isinstance(value, dict):
      kwargs = value.get('kwargs')
      if kwargs:
        # Python only allows non-unicode strings as keyword arguments.
        # items() is used rather than iteritems() so this does not depend on
        # a Python 2-only dict method.
        value['kwargs'] = dict(
            (str(arg_key), arg_value)
            for arg_key, arg_value in kwargs.items())

    self._params_decoded = value
    return self._params_decoded
class _SlotRecord(db.Model):
  """Represents an output slot.

  Key name is a randomly assigned UUID. No parent for slots of child pipelines.
  For the outputs of root pipelines, the parent entity is the root
  _PipelineRecord (see Pipeline.start()).

  Properties:
    root_pipeline: The root of the workflow.
    filler: The pipeline that filled this slot.
    value: Serialized value for this slot.
    status: The current status of the slot.
    fill_time: When the slot was filled by the filler.
  """

  FILLED = 'filled'
  WAITING = 'waiting'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  filler = db.ReferenceProperty(_PipelineRecord,
                                collection_name='filled_slots_set')

  # One of these two will be set, depending on the size of the value.
  value_text = db.TextProperty(name='value')
  value_blob = blobstore.BlobReferenceProperty(
      name='value_blob', indexed=False)

  status = db.StringProperty(choices=(FILLED, WAITING), default=WAITING,
                             indexed=False)
  fill_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for this model."""
    return '_AE_Pipeline_Slot'

  @property
  def value(self):
    """Returns the value of this Slot.

    The serialized value is read from value_blob when it is set and from
    value_text otherwise, decoded as JSON using util.JsonDecoder, and then
    cached on the instance so later accesses skip the read and decode.
    """
    if hasattr(self, '_value_decoded'):
      return self._value_decoded

    if self.value_blob is not None:
      # Close the reader explicitly instead of leaving it to be reclaimed by
      # garbage collection.
      blob_reader = self.value_blob.open()
      try:
        encoded_value = blob_reader.read()
      finally:
        blob_reader.close()
    else:
      encoded_value = self.value_text

    self._value_decoded = json.loads(encoded_value, cls=util.JsonDecoder)
    return self._value_decoded
class _BarrierRecord(db.Model):
  """Datastore entity describing a barrier in front of a pipeline.

  The key name is the barrier's purpose: one of the trigger-reason constants
  defined on this class (START, FINALIZE or ABORT). The parent entity is the
  _PipelineRecord that the barrier should trigger once all of its
  blocking_slots are filled.

  Properties:
    root_pipeline: The root of the workflow.
    target: The pipeline to run when the barrier fires.
    blocking_slots: The slots that must be filled before this barrier fires.
    trigger_time: When this barrier fired.
    status: The current status of the barrier.
  """

  # Trigger reasons; these double as the entity key names.
  START = 'start'
  FINALIZE = 'finalize'
  ABORT = 'abort'

  # Possible values of the status property.
  FIRED = 'fired'
  WAITING = 'waiting'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  target = db.ReferenceProperty(
      _PipelineRecord, collection_name='called_barrier_set')
  blocking_slots = db.ListProperty(db.Key)
  trigger_time = db.DateTimeProperty(indexed=False)
  status = db.StringProperty(
      choices=(FIRED, WAITING), default=WAITING, indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for this model."""
    return '_AE_Pipeline_Barrier'
class _BarrierIndex(db.Model):
  """Marks a _BarrierRecord as being dependent on a particular slot.

  Background: when a _SlotRecord was filled, notify_barriers() used to query
  for every _BarrierRecord whose 'blocking_slots' property contained the
  slot's key. That index is eventually consistent, so _BarrierRecords that
  had only just been written would not match the query. When pipelines were
  created and barriers notified in rapid succession, those inconsistent
  queries could leave some barriers unfired forever: a pipeline stayed
  WAITING and never reached RUN even though every slot it depended on had
  been filled.

  This entity makes barrier fan-out fully consistent with the High
  Replication Datastore. notify_barriers() uses it to run fully consistent
  ancestor queries each time a slot is filled, so every _BarrierRecord that
  depends on a _SlotRecord is found regardless of eventual consistency.

  Key path of a _BarrierIndex for root entities:

    _PipelineRecord<owns_slot_id>/_SlotRecord<slot_id>/
        _PipelineRecord<dependent_pipeline_id>/_BarrierIndex<purpose>

  Key path for child pipelines:

    _SlotRecord<slot_id>/_PipelineRecord<dependent_pipeline_id>/
        _BarrierIndex<purpose>

  Either path translates to the _BarrierRecord that should fire:

    _PipelineRecord<dependent_pipeline_id>/_BarrierRecord<purpose>

  All queries for _BarrierIndexes are key-only, so the model needs no
  properties or helper methods beyond what is defined here.
  """

  # Enable this entity to be cleaned up.
  root_pipeline = db.ReferenceProperty(_PipelineRecord)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for this model."""
    return '_AE_Barrier_Index'

  @classmethod
  def to_barrier_key(cls, barrier_index_key):
    """Converts a _BarrierIndex key to a _BarrierRecord key.

    Args:
      barrier_index_key: db.Key for a _BarrierIndex entity.

    Returns:
      db.Key for the corresponding _BarrierRecord entity.
    """
    key_path = barrier_index_key.to_path()
    # Only the last four path elements are needed: the dependent pipeline's
    # (kind, id) pair followed by the index's own (kind, purpose) pair. The
    # index kind is discarded and replaced with the _BarrierRecord kind.
    pipeline_kind, pipeline_id, unused_index_kind, purpose = key_path[-4:]
    return db.Key.from_path(
        pipeline_kind, pipeline_id, _BarrierRecord.kind(), purpose)
class _StatusRecord(db.Model):
  """Holds the current status information of a pipeline.

  Properties:
    root_pipeline: Reference to a _PipelineRecord.
    message: The textual message to show.
    console_url: URL to iframe as the primary console for this pipeline.
    link_names: Human display names for status links.
    link_urls: URLs corresponding to human names for status links.
    status_time: When the status was written.
  """

  root_pipeline = db.ReferenceProperty(_PipelineRecord)

  # Free-form status text and the console to display.
  message = db.TextProperty()
  console_url = db.TextProperty()

  # Two lists describing the status links: display names and their URLs.
  link_names = db.ListProperty(db.Text, indexed=False)
  link_urls = db.ListProperty(db.Text, indexed=False)

  status_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for this model."""
    return '_AE_Pipeline_Status'