blob: 198372d3f979773fc7b7104817cc3f3475fdcd83 [file] [log] [blame] [edit]
#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Demo Pipeline API application."""
from __future__ import with_statement
import logging
import os
import time
from google.appengine.api import mail
from google.appengine.api import users
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp import util
import pipeline
from pipeline import common
# Turn off the Pipeline library's auth enforcement so that anyone can hit the
# Pipeline handlers. NOTE(review): presumably acceptable only because this is
# a demo app -- confirm before reusing this in a real deployment.
pipeline.set_enforce_auth(False)
################################################################################
# An example pipeline for generating reports.
class LongCount(pipeline.Pipeline):
    """Counts the entities of a kind whose property equals a given value.

    The datastore is read in keys-only batches, resuming each batch from the
    cursor of the previous one, so result sets larger than a single fetch
    are counted completely.
    """

    def run(self, entity_kind, property_name, value):
        """Counts matching entities.

        Args:
          entity_kind: Name of the datastore kind to query.
          property_name: Name of the property to filter on.
          value: String to match; it is lower-cased before being bound to
            the query.

        Returns:
          Tuple of (entity_kind, property_name, value, count), where value is
          the string exactly as it was passed in and count is the number of
          matching entities.
        """
        batch_size = 1000

        # Kind and property names cannot be bound as GQL parameters, so they
        # are interpolated into the query text; only the value is bound.
        # 'SELECT __key__' is what makes the query keys-only.
        gql = 'SELECT __key__ FROM %s WHERE %s = :1' % (
            entity_kind, property_name)

        cursor = None
        count = 0
        while True:
            query = db.GqlQuery(gql, value.lower())
            if cursor is not None:
                # Keyword arguments to GqlQuery() are bound as named query
                # parameters rather than treated as options, so the cursor
                # has to be applied explicitly. Without it every iteration
                # would re-read the first batch and never terminate.
                query.with_cursor(cursor)
            result = query.fetch(batch_size)
            count += len(result)
            if len(result) < batch_size:
                # A short batch means the result set is exhausted.
                return (entity_kind, property_name, value, count)
            cursor = query.cursor()
class SplitCount(pipeline.Pipeline):
    """Starts one LongCount per value and combines their results."""

    def run(self, entity_kind, property_name, *value_list):
        """Yields a LongCount child per value, then a common.Append.

        Args:
          entity_kind: Kind name passed through to every LongCount.
          property_name: Property name passed through to every LongCount.
          *value_list: Values to count, one LongCount child each.
        """
        child_results = []
        for wanted in value_list:
            # Whatever is sent back for the yielded child is kept so it can
            # be handed to common.Append below.
            child_result = yield LongCount(entity_kind, property_name, wanted)
            child_results.append(child_result)

        yield common.Append(*child_results)
class UselessPipeline(pipeline.Pipeline):
    """A deliberately pointless pipeline used only for demonstration.

    Outside of test mode, the first attempt publishes a status message and
    raises pipeline.Retry. Every other run fills the named output
    'coolness' with 1234.
    """

    def run(self):
        """Pretends to fail on the first real attempt, then fills 'coolness'."""
        succeed_now = self.test_mode or self.current_attempt != 1
        if not succeed_now:
            self.set_status(
                message='Pretending to fail, will retry shortly.',
                console_url='/static/console.html',
                status_links={'Home': '/'})
            raise pipeline.Retry('Whoops, I need to retry')

        # This yield is never reached; its presence alone makes run() a
        # generator function even though no child pipeline is ever yielded.
        if False:
            yield common.Log.info('Okay!')

        self.fill('coolness', 1234)
class EmailCountReport(pipeline.Pipeline):
    """Emails a plain-text summary of entity counts and returns their sum."""

    def run(self, receiver_email_address, kind_count_list):
        """Builds the report, logs it, and tries to send it by email.

        Args:
          receiver_email_address: Address the report is sent to.
          kind_count_list: Iterable of 4-item sequences of the form
            (entity_kind, property_name, value, count).

        Returns:
          The sum of every count in kind_count_list.
        """
        lines = ['At %s the counts are:' % time.ctime()]
        total = 0
        for kind, prop, matched_value, matches in kind_count_list:
            lines.append(
                '%s.%s = "%s" -> %d' % (kind, prop, matched_value, matches))
            total += matches

        message_text = '\n'.join(lines)
        logging.info('Email body is:\n%s', message_text)

        # The sender address is derived from the application id; according
        # to the original author it is only accepted in production.
        app_id = os.environ['APPLICATION_ID']
        from_address = '%s@%s.appspotmail.com' % (app_id, app_id)

        try:
            mail.send_mail(
                sender=from_address,
                to=receiver_email_address,
                subject='Entity count report',
                body=message_text)
        except (mail.InvalidSenderError, mail.InvalidEmailError):
            # Best effort: an invalid sender or address is logged, not raised,
            # so the sum is still returned.
            logging.exception('This should work in production.')

        return total
class CountReport(pipeline.Pipeline):
    """Root demo pipeline: counts entities per value and emails a report."""

    def run(self, email_address, entity_kind, property_name, *value_list):
        """Yields the child pipelines that make up the report job.

        Args:
          email_address: Recipient passed on to EmailCountReport.
          entity_kind: Kind name passed on to SplitCount.
          property_name: Property name passed on to SplitCount.
          *value_list: Values passed on to SplitCount.
        """
        useless = yield UselessPipeline()
        yield common.Log.info('UselessPipeline.coolness = %s', useless.coolness)

        counts = yield SplitCount(entity_kind, property_name, *value_list)
        yield common.Log.info('SplitCount result = %s', counts)

        # NOTE(review): all three children below are assumed to sit inside
        # the InOrder block (delay, then log, then email) -- confirm against
        # the intended nesting.
        with pipeline.After(counts):
            with pipeline.InOrder():
                yield common.Delay(seconds=1)
                yield common.Log.info('Done waiting')
                yield EmailCountReport(email_address, counts)

    def finalized(self):
        """Logs the default output value unless the pipeline was aborted."""
        if self.was_aborted:
            return
        logging.info('All done! Found %s results', self.outputs.default.value)
################################################################################
# Silly guestbook application to run the pipelines on.
class GuestbookPost(db.Model):
    """Datastore model for a single guestbook entry."""

    # Color string submitted with the post.
    color = db.StringProperty()

    # Timestamp property declared with auto_now_add=True.
    write_time = db.DateTimeProperty(auto_now_add=True)
class StartPipelineHandler(webapp.RequestHandler):
    """Serves the start form and launches CountReport pipelines."""

    def get(self):
        """Writes the rendered start.html template to the response."""
        page = template.render('start.html', {})
        self.response.out.write(page)

    def post(self):
        """Starts a CountReport for the submitted colors and redirects.

        Empty 'color' form values are dropped. The browser is redirected to
        the pipeline status page for the new job.
        """
        chosen_colors = []
        for submitted in self.request.get_all('color'):
            if submitted:
                chosen_colors.append(submitted)

        # NOTE(review): this assumes a signed-in user; get_current_user()
        # returns None otherwise -- confirm login is enforced for this URL.
        recipient = users.get_current_user().email()

        report = CountReport(
            recipient, GuestbookPost.kind(), 'color', *chosen_colors)
        report.start()

        status_url = '/_ah/pipeline/status?root=%s' % report.pipeline_id
        self.redirect(status_url)
class MainHandler(webapp.RequestHandler):
    """Guestbook front page: lists posts and accepts new ones."""

    def get(self):
        """Renders guestbook.html with up to 100 posts, newest first."""
        newest_first = GuestbookPost.all().order('-write_time')
        page = template.render(
            'guestbook.html', {'posts': newest_first.fetch(100)})
        self.response.out.write(page)

    def post(self):
        """Stores the submitted color in lower case, then redirects to '/'.

        Nothing is stored when the 'color' value is empty; the redirect
        happens either way.
        """
        submitted = self.request.get('color')
        if submitted:
            entry = GuestbookPost(color=submitted.lower())
            entry.put()
        self.redirect('/')
def main():
    """Builds the WSGI application and runs it."""
    routes = [
        (r'/', MainHandler),
        (r'/pipeline', StartPipelineHandler),
    ]
    util.run_wsgi_app(webapp.WSGIApplication(routes, debug=True))


if __name__ == '__main__':
    main()