blob: f7e17c3eee41f95c3d7566fa96b7f88a9e7bfea5 [file] [log] [blame]
# Copyright (c) 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
import sys
import webapp2
from webapp2_extras import jinja2
from google.appengine.ext import ndb
from model import Badge, UserData, Settings
class BadgeView(object):
def __init__(self, user_data, badge_def_dict):
self.user_data = user_data
self.badge_def = badge_def_dict.get(user_data.badge_name)
if not self.badge_def:
self.show = False
return
if self.badge_def.show_number:
self.show = user_data.value > 0
self.level = user_data.value
else:
self.show = user_data.value >= self.badge_def.level_1
if user_data.value >= self.badge_def.level_3:
self.level = 3
elif user_data.value >= self.badge_def.level_2:
self.level = 2
else:
self.level = None # Don't show level for 1
class BaseHandler(webapp2.RequestHandler):
"""Provide a cached Jinja environment to each request."""
def __init__(self, *args, **kwargs):
webapp2.RequestHandler.__init__(self, *args, **kwargs)
@staticmethod
def jinja2_factory(app):
template_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), 'templates'))
config = {'template_path': template_dir}
jinja = jinja2.Jinja2(app, config=config)
return jinja
@webapp2.cached_property
def jinja2(self):
# Returns a Jinja2 renderer cached in the app registry.
return jinja2.get_jinja2(app=self.app, factory=BaseHandler.jinja2_factory)
def render_response(self, _template, **context):
# Renders a template and writes the result to the response.
context.update({
'app_version': os.environ.get('CURRENT_VERSION_ID'),
})
rv = self.jinja2.render_template(_template, **context)
self.response.write(rv)
class UserPage(BaseHandler):
"""Show all (non-hidden) chromium badges for the viewed user."""
def get(self, viewed_user_email, *args):
if '@' not in viewed_user_email:
viewed_user_email += '@chromium.org'
user_data_list = UserData.query(UserData.email == viewed_user_email).fetch()
badge_def_list = Badge.query().fetch()
badge_def_dict = {b.badge_name: b for b in badge_def_list}
badge_views = [
BadgeView(ud, badge_def_dict)
for ud in user_data_list]
context = {
'viewed_user_email': viewed_user_email,
'badges': badge_views,
}
self.render_response('user.html', **context)
class MainPage(BaseHandler):
"""The default page shows the signed in user their own badges."""
def get(self, *args):
# TODO: redirect to signed in user... what if not signed in?
self.redirect('/hinoka@chromium.org')
class BadgePage(BaseHandler):
"""Display page description, level thresholds, and times awarded."""
def get(self, badge_name, *args):
logging.info('badge_name = %r', badge_name)
badge_def = Badge.get_by_id(badge_name)
if not badge_def:
logging.info('badge def was %r', badge_def)
self.abort(404)
awarded_count = UserData.query(
UserData.badge_name == badge_name,
UserData.value >= badge_def.level_1 or 0).count()
l1_count = UserData.query(
UserData.badge_name == badge_name,
UserData.value >= badge_def.level_1 or 0,
UserData.value < badge_def.level_2 or sys.maxint).count()
l2_count = UserData.query(
UserData.badge_name == badge_name,
UserData.value >= badge_def.level_2 or 0,
UserData.value < badge_def.level_3 or sys.maxint).count()
l3_count = UserData.query(
UserData.badge_name == badge_name,
UserData.value >= badge_def.level_3 or 0).count()
context = {
'badge_def': badge_def,
'awarded_count': awarded_count,
'l1_count': l1_count,
'l2_count': l2_count,
'l3_count': l3_count
}
self.render_response('badge.html', **context)
class Update(BaseHandler):
"""Update badge data.
The expected format is:
[
{
"badge_name": <str>,
"level_1": <int>, # Optional
"level_2": <int>, # Optional
"level_3": <int>, # Optional
"show_number": <bool>, # Optional
"title": <str>, # Optional
"description": <str>, # Optional
"icon": <str>, # URI, Optional
"data": {
{
"email": <str>,
"value": <int>,
}
}
}
]
"""
def post(self):
password = self.request.POST.getone('password')
settings = Settings.get_by_id('1')
if password != settings.password:
self.response.set_status(403)
self.response.write('invalid password')
data = self.request.POST.getone('data')
if not data:
self.response.set_status(400)
self.response.write('no data given')
return
o = json.loads(data)
for badge in o:
logging.info('Updating %s' % badge)
b = self.update_badge_entity(badge)
self.update_user_data(badge, b)
@staticmethod
def update_badge_entity(badge):
name = badge['badge_name']
level_1 = badge.get('level_1', None)
level_2 = badge.get('level_2', None)
level_3 = badge.get('level_3', None)
show_number = badge.get('show_number', None)
title = badge.get('title', None)
description = badge.get('description', None)
icon = badge.get('icon', None)
b = Badge.get_by_id(id=name)
if not b:
b = Badge(id=name, badge_name=name)
if level_1 is not None:
b.level_1 = level_1
if level_2 is not None:
b.level_2 = level_2
if level_3 is not None:
b.level_3 = level_3
if show_number is not None:
b.show_number = show_number
if title is not None:
b.title = title
if description is not None:
b.description = description
if icon is not None:
b.icon = icon
b.put()
return b
@staticmethod
def update_user_data(badge, b):
data = badge.get('data', [])
# There might be a max batch size? We'll find out.
to_put = []
for item in data:
email = item['email']
value = int(item['value']) # JSON might turn it into a float.
uid = '%s:%s' % (b.badge_name, email)
d = UserData.get_by_id(id=uid)
if d and not d.visible:
continue
d = UserData(
badge_name=b.badge_name, email=email, value=value,
visible=True, id=uid)
to_put.append(d)
ndb.put_multi(to_put)
app = webapp2.WSGIApplication([
(r'/system/update', Update),
(r'/b/([-_.A-Za-z0-9]+)', BadgePage),
(r'/([-_+A-Za-z0-9]+(@[.A-Za-z]+)?)', UserPage),
('/', MainPage),
])