blob: 6e43c121c965895031ea87ab52ce73b2a9396057 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import datetime
import hashlib
import logging
import os
import sys
import unittest
import zlib
import test_env
test_env.setup_test_env()
import cloudstorage
from google.appengine.ext import ndb
import webtest
from components import auth
from components import auth_testing
from components import template
from components import utils
from test_support import test_case
import acl
import config
import gcs
import handlers_backend
import handlers_frontend
import model
import stats
# Access to a protected member _XXX of a client class
# pylint: disable=W0212
class MainTest(test_case.TestCase):
"""Tests the handlers."""
APP_DIR = test_env.APP_DIR
def setUp(self):
"""Creates a new app instance for every test case."""
super(MainTest, self).setUp()
self.testbed.init_user_stub()
# When called during a taskqueue, the call to get_app_version() may fail so
# pre-fetch it.
version = utils.get_app_version()
self.mock(utils, 'get_task_queue_host', lambda: version)
self.source_ip = '192.168.0.1'
self.app_frontend = webtest.TestApp(
handlers_frontend.create_application(debug=True),
extra_environ={'REMOTE_ADDR': self.source_ip})
# This is awkward but both the frontend and backend applications uses the
# same template variables.
template.reset()
self.app_backend = webtest.TestApp(
handlers_backend.create_application(debug=True),
extra_environ={'REMOTE_ADDR': self.source_ip})
# Tasks are enqueued on the backend.
self.app = self.app_backend
self.auth_app = webtest.TestApp(
auth.create_wsgi_application(debug=True),
extra_environ={
'REMOTE_ADDR': self.source_ip,
'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
})
full_access_group = config.settings().auth.full_access_group
readonly_access_group = config.settings().auth.readonly_access_group
auth.bootstrap_group(
auth.ADMIN_GROUP,
[auth.Identity(auth.IDENTITY_USER, 'admin@example.com')])
auth.bootstrap_group(
readonly_access_group,
[auth.Identity(auth.IDENTITY_USER, 'reader@example.com')])
auth.bootstrap_group(
full_access_group,
[auth.Identity(auth.IDENTITY_USER, 'writer@example.com')])
# TODO(maruel): Create a BOTS_GROUP.
self.set_as_anonymous()
def tearDown(self):
template.reset()
super(MainTest, self).tearDown()
def set_as_anonymous(self):
self.testbed.setup_env(USER_EMAIL='', overwrite=True)
auth.ip_whitelist_key(auth.bots_ip_whitelist()).delete()
auth_testing.reset_local_state()
def set_as_admin(self):
self.set_as_anonymous()
self.testbed.setup_env(USER_EMAIL='admin@example.com', overwrite=True)
def set_as_reader(self):
self.set_as_anonymous()
self.testbed.setup_env(USER_EMAIL='reader@example.com', overwrite=True)
def _gen_stats(self):
# Generates data for the last 10 days, last 10 hours and last 10 minutes.
# TODO(maruel): Stop accessing the DB directly. Use stats_framework_mock to
# generate it.
now = datetime.datetime(2010, 1, 2, 3, 4, 5, 6)
self.mock_now(now, 0)
handler = stats.STATS_HANDLER
for i in xrange(10):
s = stats._Snapshot(requests=100 + i)
day = (now - datetime.timedelta(days=i)).date()
handler.stats_day_cls(key=handler.day_key(day), values_compressed=s).put()
for i in xrange(10):
s = stats._Snapshot(requests=10 + i)
timestamp = (now - datetime.timedelta(hours=i))
handler.stats_hour_cls(
key=handler.hour_key(timestamp), values_compressed=s).put()
for i in xrange(10):
s = stats._Snapshot(requests=1 + i)
timestamp = (now - datetime.timedelta(minutes=i))
handler.stats_minute_cls(
key=handler.minute_key(timestamp), values_compressed=s).put()
@staticmethod
def gen_content_inline(namespace='default', content='Foo'):
hashhex = hashlib.sha1(content).hexdigest()
key = model.get_entry_key(namespace, hashhex)
model.new_content_entry(
key,
is_isolated=False,
content=content,
compressed_size=len(content),
expanded_size=len(content),
is_verified=True).put()
return hashhex
def get_xsrf_token(self):
"""Gets the generic XSRF token for web clients."""
resp = self.auth_app.post(
'/auth/api/v1/accounts/self/xsrf_token',
headers={'X-XSRF-Token-Request': '1'}).json
return resp['xsrf_token'].encode('ascii')
def test_root(self):
# Just asserts it doesn't crash.
self.app_frontend.get('/')
def test_browse(self):
self.set_as_reader()
hashhex = self.gen_content_inline()
self.app_frontend.get('/browse?namespace=default&digest=%s' % hashhex)
self.app_frontend.get(
'/browse?namespace=default&digest=%s&as=file1.txt' % hashhex)
def test_browse_missing(self):
self.set_as_reader()
hashhex = '0123456780123456780123456789990123456789'
self.app_frontend.get(
'/browse?namespace=default&digest=%s' % hashhex, status=404)
def test_content(self):
self.set_as_reader()
hashhex = self.gen_content_inline(content='Foo')
resp = self.app_frontend.get(
'/content?namespace=default&digest=%s' % hashhex)
self.assertEqual('Foo', resp.body)
resp = self.app_frontend.get(
'/content?namespace=default&digest=%s&as=file1.txt' % hashhex)
self.assertEqual('Foo', resp.body)
def test_content_gcs(self):
content = 'Foo'
compressed = zlib.compress(content)
namespace = 'default-gzip'
hashhex = hashlib.sha1(content).hexdigest()
def read_file(bucket, key):
self.assertEqual(u'sample-app', bucket)
self.assertEqual(namespace + '/' + hashhex, key)
return [compressed]
self.mock(gcs, 'read_file', read_file)
key = model.get_entry_key(namespace, hashhex)
model.new_content_entry(
key,
is_isolated=False,
compressed_size=len(compressed),
expanded_size=len(content),
is_verified=True).put()
self.set_as_reader()
resp = self.app_frontend.get(
'/content?namespace=default-gzip&digest=%s' % hashhex)
self.assertEqual(content, resp.body)
resp = self.app_frontend.get(
'/content?namespace=default-gzip&digest=%s&as=file1.txt' % hashhex)
self.assertEqual(content, resp.body)
self.assertNotEqual(None, key.get())
def test_content_gcs_missing(self):
content = 'Foo'
compressed = zlib.compress(content)
namespace = 'default-gzip'
hashhex = hashlib.sha1(content).hexdigest()
def read_file(bucket, key):
self.assertEqual(u'sample-app', bucket)
self.assertEqual(namespace + '/' + hashhex, key)
raise cloudstorage.NotFoundError('Someone deleted the file from GCS')
self.mock(gcs, 'read_file', read_file)
key = model.get_entry_key(namespace, hashhex)
model.new_content_entry(
key,
is_isolated=False,
compressed_size=len(compressed),
expanded_size=len(content),
is_verified=True).put()
self.set_as_reader()
self.app_frontend.get(
'/content?namespace=default-gzip&digest=%s' % hashhex, status=404)
self.assertEqual(None, key.get())
def test_config(self):
self.set_as_admin()
resp = self.app_frontend.get('/restricted/config')
# TODO(maruel): Use beautifulsoup?
priv_key = 'test private key'
params = {
'gs_private_key': priv_key,
'keyid': str(config.settings_info()['cfg'].key.integer_id()),
'xsrf_token': self.get_xsrf_token(),
}
self.assertEqual('', config.settings().gs_private_key)
resp = self.app_frontend.post('/restricted/config', params)
self.assertNotIn('Update conflict', resp)
self.assertEqual(priv_key, config.settings().gs_private_key)
def test_config_conflict(self):
self.set_as_admin()
resp = self.app_frontend.get('/restricted/config')
# TODO(maruel): Use beautifulsoup?
params = {
'google_analytics': 'foobar',
'keyid': str(config.settings().key.integer_id() - 1),
'reusable_task_age_secs': 30,
'xsrf_token': self.get_xsrf_token(),
}
self.assertEqual('', config.settings().google_analytics)
resp = self.app_frontend.post('/restricted/config', params)
self.assertIn('Update conflict', resp)
self.assertEqual('', config.settings().google_analytics)
def disabled_test_stats(self):
self._gen_stats()
response = self.app_frontend.get('/stats')
# Just ensure something is returned.
self.assertGreater(response.content_length, 4000)
def disabled_test_api_stats_days(self):
self._gen_stats()
# It's cheezy but at least it asserts that the data makes sense.
expected = (
'google.visualization.Query.setResponse({"status":"ok","table":{"rows":'
'[{"c":[{"v":"Date(2010,0,2)"},{"v":100},{"v":100},{"v":0},{"v":0},{"v"'
':0},{"v":0},{"v":0},{"v":0},{"v":0}]}],"cols":[{"type":"date","id":"ke'
'y","label":"Day"},{"type":"number","id":"requests","label":"Total"},{"'
'type":"number","id":"other_requests","label":"Other"},{"type":"number"'
',"id":"failures","label":"Failures"},{"type":"number","id":"uploads","'
'label":"Uploads"},{"type":"number","id":"downloads","label":"Downloads'
'"},{"type":"number","id":"contains_requests","label":"Lookups"},{"type'
'":"number","id":"uploads_bytes","label":"Uploaded"},{"type":"number","'
'id":"downloads_bytes","label":"Downloaded"},{"type":"number","id":"con'
'tains_lookups","label":"Items looked up"}]},"reqId":"0","version":"0.6'
'"});')
response = self.app_frontend.get('/isolate/api/v1/stats/days?duration=1')
self.assertEqual(expected, response.body)
def disabled_test_api_stats_hours(self):
self._gen_stats()
# It's cheezy but at least it asserts that the data makes sense.
expected = (
'google.visualization.Query.setResponse({"status":"ok","table":{"rows":'
'[{"c":[{"v":"Date(2010,0,2,3,0,0)"},{"v":10},{"v":10},{"v":0},{"v":0},'
'{"v":0},{"v":0},{"v":0},{"v":0},{"v":0}]}],"cols":[{"type":"datetime",'
'"id":"key","label":"Time"},{"type":"number","id":"requests","label":"T'
'otal"},{"type":"number","id":"other_requests","label":"Other"},{"type"'
':"number","id":"failures","label":"Failures"},{"type":"number","id":"u'
'ploads","label":"Uploads"},{"type":"number","id":"downloads","label":"'
'Downloads"},{"type":"number","id":"contains_requests","label":"Lookups'
'"},{"type":"number","id":"uploads_bytes","label":"Uploaded"},{"type":"'
'number","id":"downloads_bytes","label":"Downloaded"},{"type":"number",'
'"id":"contains_lookups","label":"Items looked up"}]},"reqId":"0","vers'
'ion":"0.6"});')
response = self.app_frontend.get(
'/isolate/api/v1/stats/hours?duration=1&now=')
self.assertEqual(expected, response.body)
def disabled_test_api_stats_minutes(self):
self._gen_stats()
# It's cheezy but at least it asserts that the data makes sense.
expected = (
'google.visualization.Query.setResponse({"status":"ok","table":{"rows":'
'[{"c":[{"v":"Date(2010,0,2,3,4,0)"},{"v":1},{"v":1},{"v":0},{"v":0},{"'
'v":0},{"v":0},{"v":0},{"v":0},{"v":0}]}],"cols":[{"type":"datetime","i'
'd":"key","label":"Time"},{"type":"number","id":"requests","label":"Tot'
'al"},{"type":"number","id":"other_requests","label":"Other"},{"type":"'
'number","id":"failures","label":"Failures"},{"type":"number","id":"upl'
'oads","label":"Uploads"},{"type":"number","id":"downloads","label":"Do'
'wnloads"},{"type":"number","id":"contains_requests","label":"Lookups"}'
',{"type":"number","id":"uploads_bytes","label":"Uploaded"},{"type":"nu'
'mber","id":"downloads_bytes","label":"Downloaded"},{"type":"number","i'
'd":"contains_lookups","label":"Items looked up"}]},"reqId":"0","versio'
'n":"0.6"});')
response = self.app_frontend.get('/isolate/api/v1/stats/minutes?duration=1')
self.assertEqual(expected, response.body)
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.FATAL)
unittest.main()