blob: 7b77a97bdf48c30d8ef8f90d37e5f820b06ffb0d [file] [log] [blame]
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import datetime
import json
import logging
import os
import pickle
import sys
import uuid

import webapp2
from google.appengine.api import memcache
from google.appengine.api import users
from google.appengine.ext import db
from google.appengine.ext.webapp import template
# Value sent as 'duration' in the 'sleep' reply that StartHandler returns when
# no task is due.  NOTE(review): presumably seconds -- confirm with the worker.
TIMEOUT_DURATION = 10
class WorkerMetrics(db.Model):
  """Datastore record of worker liveness.

  The code in this file only ever reads or writes the first instance
  returned by WorkerMetrics.all().get().
  """

  # UTC time at which UpdateWorkerMetrics() was last called.
  last_worker_connect = db.DateTimeProperty()
class WorkerPassword(db.Model):
  """Datastore record holding the shared secret workers must present.

  Written by AdminHandler and read through GetWorkerPassword().
  """

  # The password, stored as given by the admin.
  password = db.StringProperty()
class MirrorTask(db.Model):
  """A mirroring job definition together with its scheduling state."""

  # Descriptive settings, edited through SetHandler.
  name = db.StringProperty()
  contacts = db.StringListProperty()
  source = db.StringProperty()
  destination = db.StringProperty()
  # Desired number of syncs per day; StartHandler derives the minimum
  # spacing between two runs from it.
  syncs_per_day = db.FloatProperty()

  # Time the task was last handed to a worker (initially its creation time).
  last_sync = db.DateTimeProperty(auto_now_add=True)
  # This file stores 'running' and 'idle' here, plus whatever status string
  # a worker reports through StatusHandler.
  status = db.StringProperty()
  # Progress of the current or most recent run; ConsoleHandler multiplies it
  # by 100 to show a percentage.
  fraction_complete = db.FloatProperty()
class MirrorWorkLog(db.Model):
  """Log entry for one run of a MirrorTask handed to a worker.

  StartHandler creates an entry when it assigns a job and copies the task's
  settings into the basis_* fields.  StatusHandler finds the entry again by
  work_id whenever the worker reports back.
  """

  # Identifier handed to the worker for this run (a uuid1 string).
  work_id = db.StringProperty()
  start_timestamp = db.DateTimeProperty(auto_now_add=True)
  # Set by StatusHandler once the run's status becomes 'idle'.
  finish_timestamp = db.DateTimeProperty()
  status = db.StringProperty()
  # Not written by any handler in this file.
  result = db.StringProperty()

  # Snapshot of the task's settings at the moment the run was assigned.
  basis_name = db.StringProperty()
  basis_contacts = db.StringListProperty()
  basis_source = db.StringProperty()
  basis_destination = db.StringProperty()
  basis_syncs_per_day = db.FloatProperty()

  # Not written by any handler in this file; left in place unchanged.
  percent_complete = db.FloatProperty()
  # Progress of the run.  StartHandler and StatusHandler assign
  # `fraction_complete` on work-log entries; it has to be a declared property
  # for put() to persist it (an undeclared attribute on a db.Model is a plain
  # Python attribute and is silently dropped).
  fraction_complete = db.FloatProperty()

  # The task this run belongs to.
  task = db.ReferenceProperty(MirrorTask)
class MirrorTaskChange(db.Model):
  """Audit record of one create, edit or delete made through SetHandler."""

  timestamp = db.DateTimeProperty(auto_now_add=True)
  # The signed-in user who made the change.
  who = db.UserProperty()
  # The task that was changed.
  task = db.ReferenceProperty(MirrorTask)

  # Settings before the change; SetHandler leaves these unset when the task
  # is newly created.
  old_name = db.StringProperty()
  old_contacts = db.StringListProperty()
  old_source = db.StringProperty()
  old_destination = db.StringProperty()
  old_syncs_per_day = db.FloatProperty()

  # Settings after the change; SetHandler leaves these unset for a deletion,
  # which is how DescribeChange() recognizes one.
  new_name = db.StringProperty()
  new_contacts = db.StringListProperty()
  new_source = db.StringProperty()
  new_destination = db.StringProperty()
  new_syncs_per_day = db.FloatProperty()
def GetWorkerPassword():
  """Return the worker password, trying memcache before the datastore.

  Returns:
    The stored password, or None when no WorkerPassword entity exists.
  """
  cached = memcache.get('worker_password')
  if cached is not None:
    return cached

  # Cache miss: fall back to the datastore and remember the answer for 60s.
  record = WorkerPassword.all().get()
  passwd = record.password if record else None
  memcache.add('worker_password', passwd, 60)
  return passwd
def CheckWorkerPassword(request_handler):
  """Tell whether the request carries the correct worker password.

  Args:
    request_handler: the webapp2 handler whose request is being checked.

  Returns:
    True if the request's 'password' parameter equals the value returned by
    GetWorkerPassword(), False otherwise.
  """
  expected = GetWorkerPassword()
  supplied = request_handler.request.get('password')
  return supplied == expected
def CheckUserPassword(request_handler):
  """Tell whether the signed-in user may use the console.

  When nobody is signed in, a redirect to the login page (returning to the
  current URI afterwards) is issued on |request_handler|.

  Args:
    request_handler: the webapp2 handler serving the request.

  Returns:
    True only for a signed-in user whose email ends with '@google.com'.
  """
  user = users.get_current_user()
  if user:
    # Only allow @google.com folks to look.
    return user.email().endswith('@google.com')

  login_url = users.create_login_url(request_handler.request.uri)
  request_handler.redirect(login_url)
  return False
def UpdateWorkerMetrics():
  """Stamp the WorkerMetrics record with the current UTC time.

  Reuses the existing record when there is one, otherwise creates it.
  """
  metrics = WorkerMetrics.all().get() or WorkerMetrics()
  metrics.last_worker_connect = datetime.datetime.utcnow()
  metrics.put()
class StartHandler(webapp2.RequestHandler):
  """Handle requests to start work.

  A worker POSTs here with the shared 'password' parameter.  The reply body
  is a pickled dict: either a 'mirror' job (name, source, destination,
  work_id) or a 'sleep' instruction carrying TIMEOUT_DURATION.  A wrong
  password gets the plain text 'boo'.
  """

  # Fixed-point factor that turns the float syncs_per_day into an integer
  # divisor for timedelta arithmetic.
  _RATE_SCALE = 1000

  @classmethod
  def _IsDue(cls, task, now):
    """Return True if |task|'s sync interval has elapsed at |now|.

    A task whose rate is unset, non-finite, or too small to scale to a
    positive integer is never due.  Without this guard the scaled divisor is
    zero and the division raises, which would fail the whole request.
    """
    try:
      ticks = int((task.syncs_per_day or 0) * cls._RATE_SCALE)
    except (ValueError, OverflowError):
      # int() of NaN or infinity.
      return False
    if ticks <= 0:
      return False
    interval = datetime.timedelta(days=1) * cls._RATE_SCALE / ticks
    return now > task.last_sync + interval

  def post(self):
    self.response.headers['Content-Type'] = 'text/plain'
    if not CheckWorkerPassword(self):
      self.response.out.write('boo\n')
      logging.debug('Bad password!')
      return
    UpdateWorkerMetrics()

    # Find work: the first task whose interval has elapsed.
    now = datetime.datetime.utcnow()
    tasks = MirrorTask.all().fetch(1000) or []
    picked_task = next((t for t in tasks if self._IsDue(t, now)), None)

    # NOTE(review): replies are pickled; presumably the worker unpickles
    # them, so workers must only talk to a trusted instance of this app.
    if picked_task is None:
      # Sleep if there's nothing to do.
      self.response.out.write(pickle.dumps({
          'type': 'sleep',
          'duration': TIMEOUT_DURATION,
      }))
      return

    # Create a job entry.
    work_id = str(uuid.uuid1())
    mwl = MirrorWorkLog()
    mwl.basis_name = picked_task.name
    mwl.basis_contacts = picked_task.contacts
    mwl.basis_source = picked_task.source
    mwl.basis_destination = picked_task.destination
    mwl.basis_syncs_per_day = picked_task.syncs_per_day
    mwl.work_id = work_id
    mwl.fraction_complete = 0.0
    mwl.task = picked_task
    mwl.status = 'running'
    mwl.put()

    # Update task.
    picked_task.last_sync = datetime.datetime.utcnow()
    picked_task.fraction_complete = 0.0
    picked_task.status = 'running'
    picked_task.put()

    # Give client info about this job.
    self.response.out.write(pickle.dumps({
        'type': 'mirror',
        'name': picked_task.name,
        'source': picked_task.source,
        'destination': picked_task.destination,
        'work_id': work_id,
    }))
class StatusHandler(webapp2.RequestHandler):
  """Handle requests to note changes in a task's status.

  A worker POSTs 'password', 'work_id', 'status' and 'fraction_complete'.
  The reply body is the status the worker should now consider itself in:
  normally the one it sent, but 'idle' when the task has been stopped or no
  longer exists.  A missing or non-numeric 'fraction_complete' yields a 400.
  A wrong password yields an empty reply.
  """

  def post(self):
    if not CheckWorkerPassword(self):
      return
    UpdateWorkerMetrics()

    status = self.request.get('status')
    try:
      fraction_complete = float(self.request.get('fraction_complete'))
    except ValueError:
      # A malformed report is the client's error, not a server fault.
      self.error(400)
      return
    work_id = str(self.request.get('work_id'))

    mwl = db.GqlQuery('SELECT * FROM MirrorWorkLog '
                      'WHERE work_id = :1', work_id).get()
    if mwl:
      # The referenced task may have been deleted while the worker was busy.
      try:
        task = mwl.task
      except db.ReferencePropertyResolveError:
        task = None

      # A task that was stopped (or is gone) overrides the worker's report,
      # which is how the worker learns it should stop.
      if task is None or task.status == 'idle':
        status = 'idle'
      if status == 'idle':
        mwl.finish_timestamp = datetime.datetime.utcnow()
      mwl.status = status
      mwl.fraction_complete = fraction_complete
      mwl.put()

      # Update root task.
      if task is not None:
        task.fraction_complete = fraction_complete
        task.status = status
        task.put()
    self.response.out.write(str(status))
class SetHandler(webapp2.RequestHandler):
  """Handle requests to change tasks.

  POST parameters:
    key: datastore key of an existing task; empty to create a new one.
    stop: if set, mark the task 'idle' and return.
    start: if set, back-date the task's last_sync so it is due, and return.
    delete: if set (with key), record the change and delete the task.
    name, contacts (comma separated), source, destination, syncs_per_day:
      the new settings for a create or edit.

  Responds 404 when |key| does not name an existing task and 400 when
  syncs_per_day is missing, non-numeric, or too small for the scheduler
  (its value times 1000 must truncate to a positive integer).
  """

  # Same fixed-point factor StartHandler uses to compute sync intervals.
  _RATE_SCALE = 1000

  @staticmethod
  def _LoadTask(key):
    """Return the MirrorTask for |key|, or None if absent or malformed."""
    if not key:
      return None
    try:
      return MirrorTask.get(key)
    except db.BadKeyError:
      return None

  @classmethod
  def _RateTicks(cls, syncs_per_day):
    """Return the rate as a positive scaled integer, or None if unusable."""
    try:
      ticks = int(syncs_per_day * cls._RATE_SCALE)
    except (TypeError, ValueError, OverflowError):
      # None, NaN or infinity.
      return None
    return ticks if ticks > 0 else None

  def post(self):
    if not CheckUserPassword(self):
      return
    key = self.request.get('key')
    stop = self.request.get('stop')
    start = self.request.get('start')

    # Every operation except "create" needs an existing task.
    task = None
    if key or stop or start:
      task = self._LoadTask(key)
      if task is None:
        self.error(404)
        return

    # Optionally stop.
    if stop:
      task.status = 'idle'
      task.put()
      return

    # Optionally start.
    if start:
      ticks = self._RateTicks(task.syncs_per_day)
      if ticks is None:
        # No usable rate means no interval to back-date by.
        self.error(400)
        return
      interval = datetime.timedelta(days=1) * self._RATE_SCALE / ticks
      stale = task.last_sync - interval
      stale -= datetime.timedelta(days=1)  # Just for good measure.
      task.last_sync = stale
      task.put()
      return

    # Prepare a change.
    change = MirrorTaskChange()
    change.who = users.get_current_user()

    if task is not None:
      # Store old state.
      change.old_name = task.name
      change.old_contacts = task.contacts
      change.old_source = task.source
      change.old_destination = task.destination
      change.old_syncs_per_day = task.syncs_per_day
      if self.request.get('delete'):
        # Store the change, then delete the task.
        change.task = task
        change.put()
        task.delete()
        return

    # Validate before anything is written, so a bad rate can neither be
    # stored nor later break scheduling.
    try:
      syncs_per_day = float(self.request.get('syncs_per_day'))
    except ValueError:
      syncs_per_day = None
    if self._RateTicks(syncs_per_day) is None:
      self.error(400)
      return

    if task is None:
      task = MirrorTask()
      task.fraction_complete = 1.0
      task.status = 'idle'

    # Update it.
    task.name = self.request.get('name')
    task.contacts = [i.strip() for i
                     in self.request.get('contacts').split(',')]
    task.source = self.request.get('source')
    task.destination = self.request.get('destination')
    task.syncs_per_day = syncs_per_day
    task.put()

    # Store the change.
    change.task = task
    change.new_name = task.name
    change.new_contacts = task.contacts
    change.new_source = task.source
    change.new_destination = task.destination
    change.new_syncs_per_day = task.syncs_per_day
    change.put()
class AdminHandler(webapp2.RequestHandler):
  """Handle admin only tasks.

  POST with a non-empty 'password' parameter replaces the worker password.
  Requests from non-admins are ignored.
  """

  def post(self):
    if not CheckUserPassword(self):
      return
    if not users.is_current_user_admin():
      return

    # Handle setting password.
    password = self.request.get('password')
    if not password:
      return
    wp = WorkerPassword.all().get() or WorkerPassword()
    wp.password = password
    wp.put()
    # set(), not add(): add() leaves an already-cached value in place, so the
    # old password would keep being served from the cache until it expired.
    memcache.set('worker_password', password, 60)
class HealthHandler(webapp2.RequestHandler):
  """Handle requests for health check.

  Replies with a JSON object whose 'last_worker_connect' is the whole number
  of seconds since UpdateWorkerMetrics() last ran.  The body is empty when no
  worker connection has been recorded.
  """

  def get(self):
    wm = WorkerMetrics.all().get()
    if not wm or wm.last_worker_connect is None:
      return

    # Convert last_worker_connect to integer seconds (microseconds dropped).
    age = datetime.datetime.utcnow() - wm.last_worker_connect
    last_worker_connect = age.seconds + age.days * 24 * 3600

    # Emit as json.
    self.response.headers['Content-Type'] = 'text/plain'
    self.response.out.write(json.dumps({
        'last_worker_connect': last_worker_connect,
    }))
def DescribeChange(change):
  """Summarize a MirrorTaskChange for display in the console.

  Args:
    change: an object exposing timestamp, who (with an email() method) and
        the old_*/new_* fields of MirrorTaskChange.

  Returns:
    An object with the attributes:
      timestamp: copied from |change|.
      who: the email address of the user who made the change.
      name: the task's name before the change.
      changes: 'DELETED' when new_name is None, otherwise the concatenation
          of one '<field>: <new value>. ' fragment per field that differs.
  """
  class Description:
    pass

  desc = Description()
  desc.timestamp = change.timestamp
  desc.who = change.who.email()
  desc.name = change.old_name

  if change.new_name is None:
    desc.changes = 'DELETED'
    return desc

  # Every fragment ends with '. ' so consecutive entries stay separated.
  fragments = []
  if change.old_name != change.new_name:
    fragments.append('name: %s. ' % change.new_name)
  if change.old_contacts != change.new_contacts:
    fragments.append('contacts: %s. ' % ', '.join(change.new_contacts))
  if change.old_source != change.new_source:
    fragments.append('source: %s. ' % change.new_source)
  if change.old_destination != change.new_destination:
    fragments.append('destination: %s. ' % change.new_destination)
  if change.old_syncs_per_day != change.new_syncs_per_day:
    fragments.append('daily syncs: %s. ' % str(change.new_syncs_per_day))
  desc.changes = ''.join(fragments)
  return desc
class ConsoleHandler(webapp2.RequestHandler):
  """View log info.

  Renders templates/console.html with the task list, the 100 most recent
  changes and, for admins, the current worker password.
  """

  def get(self):
    if not CheckUserPassword(self):
      return

    # Get tasks and embellish them with display fields for the template.
    tasks = db.GqlQuery('SELECT * FROM MirrorTask').fetch(1000) or []
    for entry in tasks:
      entry.contacts_str = ', '.join(entry.contacts)
      entry.key_str = str(entry.key())
      entry.is_idle = entry.status == 'idle'
      if entry.status != 'running':
        entry.status_str = entry.status
      else:
        percent = entry.fraction_complete * 100
        entry.status_str = 'running %0.2f%%' % percent

    # Get changes, newest first.
    recent = db.GqlQuery('SELECT * FROM MirrorTaskChange '
                         'ORDER BY timestamp DESC').fetch(100) or []
    described = [DescribeChange(item) for item in recent]

    # Decide if we're an admin.
    is_admin = users.is_current_user_admin()

    # Values for the main console.
    template_values = {
        'email': users.get_current_user().email(),
        'logout_url': users.create_logout_url('/'),
        'tasks': tasks,
        'changes': described,
        'is_admin': is_admin,
    }
    # Add things for admins.
    if is_admin:
      template_values['password'] = GetWorkerPassword()

    # Render the template.
    here = os.path.dirname(__file__)
    path = os.path.join(here, 'templates', 'console.html')
    self.response.out.write(template.render(path, template_values))
# URL path -> handler class for every endpoint this app serves.
_ROUTES = [
    ('/', ConsoleHandler),
    ('/start', StartHandler),
    ('/status', StatusHandler),
    ('/set', SetHandler),
    ('/admin', AdminHandler),
    ('/health', HealthHandler),
]

# The WSGI application object.
APP = webapp2.WSGIApplication(_ROUTES, debug=False)