blob: 011e81d160c4cdbff327545c93ebb9b8c633cf72 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import datetime
import logging
import sys
import unittest
import test_env
test_env.setup_test_env()
from google.appengine.ext import ndb
from components import config as config_component
from components import utils
from components.auth import model
from components.auth.proto import security_config_pb2
from components.config import validation
from test_support import test_case
from proto import config_pb2
import config
class ConfigTest(test_case.TestCase):
def setUp(self):
super(ConfigTest, self).setUp()
self.mock_now(datetime.datetime(2014, 1, 2, 3, 4, 5))
model.AuthGlobalConfig(
key=model.root_key(),
auth_db_rev=0,
).put()
def test_validate_settings_cfg(self):
def is_valid(**fields):
ctx = validation.Context()
config.validate_settings_cfg(config_pb2.SettingsCfg(**fields), ctx)
return not ctx.messages
self.assertTrue(is_valid())
self.assertTrue(is_valid(auth_db_gs_path='a/b'))
self.assertTrue(is_valid(auth_db_gs_path='a/b/c'))
self.assertFalse(is_valid(auth_db_gs_path='a/'))
self.assertFalse(is_valid(auth_db_gs_path='a'))
self.assertFalse(is_valid(auth_db_gs_path='a//b'))
self.assertFalse(is_valid(auth_db_gs_path='a/b/'))
def test_refetch_config(self):
initial_revs = {
'a.cfg': config.Revision('old_a_rev', 'urla'),
'b.cfg': config.Revision('old_b_rev', 'urlb'),
'c.cfg': config.Revision('old_c_rev', 'urlc'),
}
revs = initial_revs.copy()
bumps = []
def bump_rev(pkg, rev, conf):
revs[pkg] = rev
bumps.append((pkg, rev, conf, ndb.in_transaction()))
return True
@ndb.tasklet
def get_rev_async(pkg):
raise ndb.Return(revs[pkg])
self.mock(config, 'is_remote_configured', lambda: True)
self.mock(config, '_CONFIG_SCHEMAS', {
# Will be updated outside of auth db transaction.
'a.cfg': {
'proto_class': None,
'revision_getter': lambda: get_rev_async('a.cfg'),
'validator': lambda body: self.assertEqual(body, 'new a body'),
'updater': lambda root, rev, conf: bump_rev('a.cfg', rev, conf),
'use_authdb_transaction': False,
},
# Will not be changed.
'b.cfg': {
'proto_class': None,
'revision_getter': lambda: get_rev_async('b.cfg'),
'validator': lambda _body: True,
'updater': lambda root, rev, conf: bump_rev('b.cfg', rev, conf),
'use_authdb_transaction': False,
},
# Will be updated inside auth db transaction.
'c.cfg': {
'proto_class': None,
'revision_getter': lambda: get_rev_async('c.cfg'),
'validator': lambda body: self.assertEqual(body, 'new c body'),
'updater': lambda root, rev, conf: bump_rev('c.cfg', rev, conf),
'use_authdb_transaction': True,
},
})
# _fetch_configs is called by config.refetch_config().
configs_to_fetch = {
'a.cfg': (config.Revision('new_a_rev', 'urla'), 'new a body'),
'b.cfg': (config.Revision('old_b_rev', 'urlb'), 'old b body'),
'c.cfg': (config.Revision('new_c_rev', 'urlc'), 'new c body'),
}
self.mock(config, '_fetch_configs', lambda _: configs_to_fetch)
# Old revisions initially.
self.assertEqual(initial_revs, config.get_revisions())
# Initial update.
config.refetch_config()
self.assertEqual([
('a.cfg', config.Revision('new_a_rev', 'urla'), 'new a body', False),
('c.cfg', config.Revision('new_c_rev', 'urlc'), 'new c body', True),
], bumps)
del bumps[:]
# Updated revisions now.
self.assertEqual(
{k: v[0] for k, v in configs_to_fetch.items()},
config.get_revisions())
# Refetch, nothing new.
config.refetch_config()
self.assertFalse(bumps)
def test_update_imports_config(self):
new_rev = config.Revision('rev', 'url')
body = 'tarball{url:"a" systems:"b"}'
self.assertTrue(config._update_imports_config(None, new_rev, body))
self.assertEqual(
new_rev, config._get_imports_config_revision_async().get_result())
def test_validate_ip_allowlist_config_empty_assignments(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(name='abc'),
],
assignments=[
config_pb2.IPAllowlistConfig.Assignment(
identity='should not exist',
ip_allowlist_name='abc'),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_config_empty(self):
config._validate_ip_allowlist_config(config_pb2.IPAllowlistConfig())
def test_validate_ip_allowlist_config_bad_name(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(name='<bad name>'),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_config_duplicated_wl(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(name='abc'),
config_pb2.IPAllowlistConfig.IPAllowlist(name='abc'),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_config_bad_subnet(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=['not a subnet']),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_unknown_include(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=[],
includes=['unknown']),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_include_cycle_1(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=[],
includes=['abc']),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_include_cycle_2(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=[],
includes=['def']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='def',
subnets=[],
includes=['abc']),
])
with self.assertRaises(ValueError):
config._validate_ip_allowlist_config(conf)
def test_validate_ip_allowlist_include_diamond(self):
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=[],
includes=['middle1', 'middle2']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='middle1',
subnets=[],
includes=['inner']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='middle2',
subnets=[],
includes=['inner']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='inner',
subnets=[]),
])
config._validate_ip_allowlist_config(conf)
def test_update_ip_allowlist_config(self):
def run(conf):
return config._update_authdb_configs({
'ip_allowlist.cfg': (
config.Revision('ip_whitelist_cfg_rev', 'http://url'), conf
),
})
# Pushing empty config to empty DB -> no changes.
self.assertFalse(run(config_pb2.IPAllowlistConfig()))
# Added a bunch of IP allowlists.
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=['0.0.0.1/32']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='bots',
subnets=['0.0.0.2/32']),
config_pb2.IPAllowlistConfig.IPAllowlist(name='empty'),
])
self.assertTrue(run(conf))
# Verify everything is there.
self.assertEqual(
{
'abc': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.1/32'],
},
'bots': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.2/32'],
},
'empty': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [],
},
},
{
x.key.id(): x.to_serializable_dict()
for x in model.AuthIPWhitelist.query(ancestor=model.root_key())
})
# Exact same config a bit later -> no changes applied.
self.mock_now(datetime.datetime(2014, 2, 2, 3, 4, 5))
self.assertFalse(run(conf))
# Modify allowlist, add new one, remove some.
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='abc',
subnets=['0.0.0.3/32']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='bots',
subnets=['0.0.0.2/32']),
config_pb2.IPAllowlistConfig.IPAllowlist(name='another'),
])
self.mock_now(datetime.datetime(2014, 3, 2, 3, 4, 5))
self.assertTrue(run(conf))
# Verify everything is there.
self.assertEqual(
{
'abc': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1393729445000000,
'subnets': [u'0.0.0.3/32'],
},
'bots': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.2/32'],
},
'another': {
'created_by': 'service:sample-app',
'created_ts': 1393729445000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1393729445000000,
'subnets': [],
},
},
{
x.key.id(): x.to_serializable_dict()
for x in model.AuthIPWhitelist.query(ancestor=model.root_key())
})
def test_update_ip_allowlist_config_with_includes(self):
def run(conf):
return config._update_authdb_configs({
'ip_allowlist.cfg': (
config.Revision('ip_whitelist_cfg_rev', 'http://url'), conf
),
})
conf = config_pb2.IPAllowlistConfig(
ip_allowlists=[
config_pb2.IPAllowlistConfig.IPAllowlist(
name='a',
subnets=['0.0.0.1/32']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='b',
subnets=['0.0.0.1/32', '0.0.0.2/32'],
includes=['a']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='c',
subnets=['0.0.0.3/32'],
includes=['a', 'b']),
config_pb2.IPAllowlistConfig.IPAllowlist(
name='d',
includes=['c']),
])
self.assertTrue(run(conf))
# Verify everything is there.
self.assertEqual(
{
'a': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.1/32'],
},
'b': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.1/32', u'0.0.0.2/32'],
},
'c': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.1/32', u'0.0.0.2/32', u'0.0.0.3/32'],
},
'd': {
'created_by': 'service:sample-app',
'created_ts': 1388631845000000,
'description':
u'Imported from ip_allowlist.cfg',
'modified_by': 'service:sample-app',
'modified_ts': 1388631845000000,
'subnets': [u'0.0.0.1/32', u'0.0.0.2/32', u'0.0.0.3/32'],
},
},
{
x.key.id(): x.to_serializable_dict()
for x in model.AuthIPWhitelist.query(ancestor=model.root_key())
})
def test_update_oauth_config(self):
def run(conf):
return config._update_authdb_configs({
'oauth.cfg': (config.Revision('oauth_cfg_rev', 'http://url'), conf),
})
# Pushing empty config to empty state -> no changes.
self.assertFalse(run(config_pb2.OAuthConfig()))
# Updating config.
self.assertTrue(run(config_pb2.OAuthConfig(
primary_client_id='a',
primary_client_secret='b',
client_ids=['c', 'd'],
token_server_url='https://token-server')))
self.assertEqual({
'auth_db_rev': 1,
'auth_db_prev_rev': 0,
'modified_by': model.get_service_self_identity(),
'modified_ts': datetime.datetime(2014, 1, 2, 3, 4, 5),
'oauth_additional_client_ids': [u'c', u'd'],
'oauth_client_id': u'a',
'oauth_client_secret': u'b',
'security_config': None,
'token_server_url': u'https://token-server',
}, model.root_key().get().to_dict())
# Same config again -> no changes.
self.assertFalse(run(config_pb2.OAuthConfig(
primary_client_id='a',
primary_client_secret='b',
client_ids=['c', 'd'],
token_server_url='https://token-server')))
def test_validate_oauth_config(self):
with self.assertRaises(ValueError):
config._validate_oauth_config(
config_pb2.OAuthConfig(
primary_client_id='a',
primary_client_secret='b',
client_ids=['c', 'd'],
token_server_url='https://not-root-url/abc/def'))
def test_fetch_configs_ok(self):
fetches = {
'imports.cfg': ('imports_cfg_rev', 'tarball{url:"a" systems:"b"}'),
'ip_allowlist.cfg': (
'ip_whitelist_cfg_rev', config_pb2.IPAllowlistConfig()),
'oauth.cfg': ('oauth_cfg_rev',
config_pb2.OAuthConfig(primary_client_id='a')),
'settings.cfg': (None, None), # emulate missing config
}
@ndb.tasklet
def get_self_config_mock(path, *_args, **_kwargs):
self.assertIn(path, fetches)
raise ndb.Return(fetches.pop(path))
self.mock(config_component, 'get_self_config_async', get_self_config_mock)
self.mock(config, '_get_configs_url', lambda: 'http://url')
result = config._fetch_configs(fetches.keys())
self.assertFalse(fetches)
self.assertEqual(
{
'imports.cfg': (config.Revision('imports_cfg_rev', 'http://url'),
'tarball{url:"a" systems:"b"}'),
'ip_allowlist.cfg': (
config.Revision('ip_whitelist_cfg_rev', 'http://url'),
config_pb2.IPAllowlistConfig()),
'oauth.cfg': (config.Revision('oauth_cfg_rev', 'http://url'),
config_pb2.OAuthConfig(primary_client_id='a')),
'settings.cfg': (config.Revision('0' * 40, 'http://url'), ''),
},
result)
def test_fetch_configs_not_valid(self):
@ndb.tasklet
def get_self_config_mock(*_args, **_kwargs):
raise ndb.Return(('imports_cfg_rev', 'bad config'))
self.mock(config_component, 'get_self_config_async', get_self_config_mock)
self.mock(config, '_get_configs_url', lambda: 'http://url')
with self.assertRaises(config.CannotLoadConfigError):
config._fetch_configs(['imports.cfg'])
def test_gitiles_url(self):
self.assertEqual(
'https://host/repo/+/aaa/path/b/c.cfg',
config._gitiles_url('https://host/repo/+/HEAD/path', 'aaa', 'b/c.cfg'))
self.assertEqual(
'https://not-gitiles',
config._gitiles_url('https://not-gitiles', 'aaa', 'b/c.cfg'))
def test_update_service_config(self):
# Missing.
self.assertIsNone(config._get_service_config('abc.cfg'))
self.assertIsNone(
config._get_service_config_rev_async('abc.cfg').get_result())
# Updated.
rev = config.Revision('rev', 'url')
self.assertTrue(config._update_service_config('abc.cfg', rev, 'body'))
self.assertEqual('body', config._get_service_config('abc.cfg'))
self.assertEqual(
rev, config._get_service_config_rev_async('abc.cfg').get_result())
# Same body, returns False, though updates rev.
rev2 = config.Revision('rev2', 'url')
self.assertFalse(config._update_service_config('abc.cfg', rev2, 'body'))
self.assertEqual(
rev2, config._get_service_config_rev_async('abc.cfg').get_result())
def test_settings_updates(self):
# Fetch only settings.cfg in this test case.
self.mock(config, 'is_remote_configured', lambda: True)
self.mock(config, '_CONFIG_SCHEMAS', {
'settings.cfg': config._CONFIG_SCHEMAS['settings.cfg'],
})
# Default settings.
self.assertEqual(config_pb2.SettingsCfg(), config.get_settings())
# Mock new settings value in luci-config.
settings_cfg_text = 'enable_ts_monitoring: true'
self.mock(config, '_fetch_configs', lambda _: {
'settings.cfg': (config.Revision('rev', 'url'), settings_cfg_text),
})
# Fetch them.
config.refetch_config()
# Verify they are used now.
utils.clear_cache(config.get_settings)
self.assertEqual(
config_pb2.SettingsCfg(enable_ts_monitoring=True),
config.get_settings())
# "Delete" them from luci-config.
self.mock(config, '_fetch_configs', lambda _: {
'settings.cfg': (config.Revision('0'*40, 'url'), ''),
})
# Fetch them.
config.refetch_config()
# Verify defaults are restored.
utils.clear_cache(config.get_settings)
self.assertEqual(config_pb2.SettingsCfg(), config.get_settings())
def test_validate_security_config_ok(self):
ctx = validation.Context()
config.validate_security_config(security_config_pb2.SecurityConfig(), ctx)
self.assertEqual(ctx.result().messages, [])
def test_validate_security_config_bad_regexp(self):
ctx = validation.Context()
config.validate_security_config(security_config_pb2.SecurityConfig(
internal_service_regexp=['???'],
), ctx)
self.assertEqual(ctx.result().messages, [
validation.Message(
"internal_service_regexp: bad regexp '???' - nothing to repeat", 40),
])
def test_update_security_config(self):
def cfg(internal_service_regexp):
return security_config_pb2.SecurityConfig(
internal_service_regexp=internal_service_regexp)
def run(conf):
return config._update_authdb_configs({
'security.cfg': (config.Revision('cfg_rev', 'http://url'), conf),
})
def extract():
d = model.root_key().get()
return {
'auth_db_rev': d.auth_db_rev,
'security_config': security_config_pb2.SecurityConfig.FromString(
d.security_config),
}
# Pushing empty config -> no changes.
self.assertFalse(run(cfg([])))
# Updating the config.
self.assertTrue(run(cfg([r'example\.com'])))
self.assertEqual({
'auth_db_rev': 1,
'security_config': cfg([r'example\.com']),
}, extract())
# Pushing same config again. No changes.
self.assertFalse(run(cfg([r'example\.com'])))
def test_update_two_authdb_cfgs(self):
"""It is OK to update oauth.cfg and security.cfg at once."""
def oauth_cfg(client_id):
return config_pb2.OAuthConfig(primary_client_id=client_id)
def sec_cfg(regexps):
return security_config_pb2.SecurityConfig(internal_service_regexp=regexps)
def run(oauth, sec):
return config._update_authdb_configs({
'oauth.cfg': (config.Revision('cfg_rev', 'http://url'), oauth),
'security.cfg': (config.Revision('cfg_rev', 'http://url'), sec),
})
def extract():
d = model.root_key().get()
sec = security_config_pb2.SecurityConfig()
if d.security_config:
sec.MergeFromString(d.security_config)
return {
'auth_db_rev': d.auth_db_rev,
'oauth_client_id': d.oauth_client_id,
'security_config': sec,
}
# Both are empty when applied to empty state. No changes.
self.assertFalse(run(oauth_cfg(''), sec_cfg([])))
self.assertEqual({
'auth_db_rev': 0,
'oauth_client_id': u'',
'security_config': sec_cfg([]),
}, extract())
# Both have changes. AuthDB revision is bumped only once. Both changes are
# preserved.
self.assertTrue(run(oauth_cfg('z'), sec_cfg(['z'])))
self.assertEqual({
'auth_db_rev': 1,
'oauth_client_id': u'z',
'security_config': sec_cfg(['z']),
}, extract())
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.FATAL)
unittest.main()