blob: c8bcf1cf6d6dc6d73a140f9c215f92c7161c2f84 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
# Disable 'Access to a protected member ...'. NDB uses '_' for other purposes.
# pylint: disable=W0212
import datetime
import sys
import threading
import unittest
from test_support import test_env
test_env.setup_test_env()
from google.appengine.ext import ndb
from components import utils
from test_support import test_case
class Rambling(ndb.Model):
"""Fake statistics."""
a = ndb.IntegerProperty()
b = ndb.FloatProperty()
c = ndb.DateTimeProperty()
d = ndb.DateProperty()
def to_dict(self):
out = super(Rambling, self).to_dict()
out['e'] = datetime.timedelta(seconds=1.1)
out['f'] = '\xc4\xa9'
return out
class UtilsTest(test_case.TestCase):
def test_json(self):
r = Rambling(
a=2,
b=0.2,
c=datetime.datetime(2012, 1, 2, 3, 4, 5, 6),
d=datetime.date(2012, 1, 2))
actual = utils.to_json_encodable([r])
# Confirm that default is tight encoding and sorted keys.
expected = [
{
'a': 2,
'b': 0.2,
'c': u'2012-01-02 03:04:05',
'd': u'2012-01-02',
'e': 1,
'f': u'\u0129',
},
]
self.assertEqual(expected, actual)
self.assertEqual([0, 1], utils.to_json_encodable(range(2)))
self.assertEqual([0, 1], utils.to_json_encodable(i for i in (0, 1)))
self.assertEqual([0, 1], utils.to_json_encodable(xrange(2)))
def test_validate_root_service_url_dev_server(self):
self.mock(utils, 'is_local_dev_server', lambda: True)
utils.validate_root_service_url('https://blah')
utils.validate_root_service_url('http://localhost:8080')
def test_validate_root_service_url_gae(self):
self.mock(utils, 'is_local_dev_server', lambda: False)
utils.validate_root_service_url('https://blah')
with self.assertRaises(ValueError):
utils.validate_root_service_url('http://localhost:8080')
def test_validate_root_service_bad(self):
with self.assertRaises(ValueError):
utils.validate_root_service_url('')
with self.assertRaises(ValueError):
utils.validate_root_service_url('blah://blah')
with self.assertRaises(ValueError):
utils.validate_root_service_url('https://')
with self.assertRaises(ValueError):
utils.validate_root_service_url('https://blah/')
with self.assertRaises(ValueError):
utils.validate_root_service_url('https://blah?asdad')
def test_datetime_to_rfc2822(self):
self.assertEqual(
'Mon, 02 Jan 2012 03:04:05 -0000',
utils.datetime_to_rfc2822(datetime.datetime(2012, 1, 2, 3, 4, 5)))
def test_milliseconds_since_epoch(self):
self.mock_now(datetime.datetime(1970, 1, 2, 3, 4, 5, 6789))
delta = utils.milliseconds_since_epoch(None)
self.assertEqual(97445007, delta)
def test_cache(self):
calls = []
@utils.cache
def get_me():
calls.append(1)
return len(calls)
self.assertEqual(1, get_me())
self.assertEqual(1, get_me())
self.assertEqual(1, len(calls))
def test_cache_with_tasklets(self):
@utils.cache
def f():
ndb.sleep(0).wait() # Yield thread.
return 1
@ndb.tasklet
def g():
yield () # Make g a generator.
raise ndb.Return(f())
def test():
ndb.Future.wait_all([(g()), (g())])
t = threading.Thread(target=test)
t.daemon = True
t.start()
t.join(1)
if t.is_alive():
self.fail('deadlock')
def test_clear_cache(self):
calls = []
@utils.cache
def get_me():
calls.append(1)
return len(calls)
self.assertEqual(1, get_me())
utils.clear_cache(get_me)
self.assertEqual(2, get_me())
self.assertEqual(2, len(calls))
class FakeNdbContext(object):
def __init__(self):
self.get_calls = []
self.set_calls = []
self.cached_value = None
@ndb.tasklet
def memcache_get(self, key):
self.get_calls.append(key)
raise ndb.Return(self.cached_value)
# pylint: disable=redefined-outer-name
@ndb.tasklet
def memcache_set(self, key, value, time=None):
self.cached_value = value
self.set_calls.append((key, value, time))
class MemcacheTest(test_case.TestCase):
def setUp(self):
super(MemcacheTest, self).setUp()
self.f_calls = []
self.f_value = 'value'
self.ctx = FakeNdbContext()
self.mock(ndb, 'get_context', lambda: self.ctx)
@utils.memcache('f', ['a', 'b', 'c', 'd'], time=54)
def f(self, a, b, c=3, d=4, e=5):
self.f_calls.append((a, b, c, d, e))
return self.f_value
@utils.memcache_async('f', ['a', 'b', 'c', 'd'], time=54)
@ndb.tasklet
def f_async(self, a, b, c=3, d=4, e=5):
self.f_calls.append((a, b, c, d, e))
raise ndb.Return(self.f_value)
def test_async(self):
self.f_async(1, 2, 3, 4, 5).get_result()
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [(1, 2, 3, 4, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 3, 4]', ('value',), 54)])
def test_call(self):
self.f(1, 2, 3, 4, 5)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [(1, 2, 3, 4, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 3, 4]', ('value',), 54)])
self.ctx.get_calls = []
self.f_calls = []
self.ctx.set_calls = []
self.f(1, 2, 3, 4)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [])
self.assertEqual(self.ctx.set_calls, [])
def test_none(self):
self.f_value = None
self.assertEqual(self.f(1, 2, 3, 4), None)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [(1, 2, 3, 4, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 3, 4]', (None,), 54)])
self.ctx.get_calls = []
self.f_calls = []
self.ctx.set_calls = []
self.assertEqual(self.f(1, 2, 3, 4), None)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [])
self.assertEqual(self.ctx.set_calls, [])
def test_call_without_optional_arg(self):
self.f(1, 2)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [(1, 2, 3, 4, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 3, 4]', ('value',), 54)])
def test_call_kwargs(self):
self.f(1, 2, c=30, d=40)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 30, 40]'])
self.assertEqual(self.f_calls, [(1, 2, 30, 40, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 30, 40]', ('value',), 54)])
def test_call_all_kwargs(self):
self.f(a=1, b=2, c=30, d=40)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 30, 40]'])
self.assertEqual(self.f_calls, [(1, 2, 30, 40, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 30, 40]', ('value',), 54)])
def test_call_packed_args(self):
self.f(*[1, 2])
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 3, 4]'])
self.assertEqual(self.f_calls, [(1, 2, 3, 4, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 3, 4]', ('value',), 54)])
def test_call_packed_kwargs(self):
self.f(1, 2, **{'c':30, 'd': 40})
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 30, 40]'])
self.assertEqual(self.f_calls, [(1, 2, 30, 40, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 30, 40]', ('value',), 54)])
def test_call_packed_both(self):
self.f(*[1, 2], **{'c':30, 'd': 40})
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[1, 2, 30, 40]'])
self.assertEqual(self.f_calls, [(1, 2, 30, 40, 5)])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[1, 2, 30, 40]', ('value',), 54)])
def test_empty_key_arg(self):
@utils.memcache('f')
def f(a):
# pylint: disable=unused-argument
return 1
f(1)
self.assertEqual(self.ctx.get_calls, ['utils.memcache/v1a/f[]'])
self.assertEqual(
self.ctx.set_calls,
[('utils.memcache/v1a/f[]', (1,), None)])
def test_nonexisting_arg(self):
with self.assertRaises(KeyError):
# pylint: disable=unused-variable
@utils.memcache('f', ['b'])
def f(a):
# pylint: disable=unused-argument
pass
def test_invalid_args(self):
with self.assertRaises(TypeError):
# pylint: disable=no-value-for-parameter
self.f()
with self.assertRaises(TypeError):
# pylint: disable=no-value-for-parameter
self.f(b=3)
with self.assertRaises(TypeError):
# pylint: disable=unexpected-keyword-arg
self.f(1, 2, x=3)
def test_args_prohibited(self):
with self.assertRaises(NotImplementedError):
# pylint: disable=unused-variable
@utils.memcache('f', [])
def f(a, *args):
# pylint: disable=unused-argument
pass
def test_kwargs_prohibited(self):
with self.assertRaises(NotImplementedError):
# pylint: disable=unused-variable
@utils.memcache('f', [])
def f(**kwargs):
# pylint: disable=unused-argument
pass
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
unittest.main()