blob: 7a158e535b5837fd0ea1b53671045f8c3da3be96 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
from test_env import future
import test_env
test_env.setup_test_env()
from test_support import test_case
import mock
from components import auth
from components.config.proto import project_config_pb2
from components.config.proto import service_config_pb2
import acl
import projects
import services
import storage
def can_read_config_set(config_set):
return acl.can_read_config_sets([config_set])[config_set]
def has_project_access(project_id):
return acl.has_projects_access([project_id])[project_id]
class AclTestCase(test_case.TestCase):
def setUp(self):
super(AclTestCase, self).setUp()
self.mock(auth, 'get_current_identity', mock.Mock())
auth.get_current_identity.return_value = auth.Anonymous
self.mock(auth, 'is_group_member', mock.Mock(return_value=False))
self.mock(
services, 'get_services_async', mock.Mock(return_value=future([])))
acl_cfg = service_config_pb2.AclCfg(
project_access_group='project-admins',
service_access_group='service-admins',
)
self.mock(projects, '_filter_existing', lambda pids: pids)
self.mock(storage, 'get_self_config_async', lambda *_: future(acl_cfg))
def test_admin_can_read_all(self):
self.mock(acl, 'is_admin', mock.Mock(return_value=True))
self.assertTrue(can_read_config_set('services/swarming'))
self.assertTrue(can_read_config_set('projects/chromium'))
self.assertTrue(has_project_access('chromium'))
def test_has_service_access(self):
self.assertFalse(can_read_config_set('services/swarming'))
services.get_services_async.return_value = future([
service_config_pb2.Service(
id='swarming', access=['group:swarming-app']),
])
auth.is_group_member.side_effect = lambda g, *_: g == 'swarming-app'
self.assertTrue(can_read_config_set('services/swarming'))
def test_service_access_group(self):
self.assertFalse(can_read_config_set('services/swarming'))
auth.is_group_member.side_effect = lambda name, *_: name == 'service-admins'
self.assertTrue(can_read_config_set('services/swarming'))
def test_has_service_access_no_access(self):
self.assertFalse(can_read_config_set('services/swarming'))
def test_has_project_access_group(self):
self.mock(projects, 'get_metadata_async', mock.Mock(return_value=future({
'secret': project_config_pb2.ProjectCfg(
access=['group:googlers', 'a@a.com']),
})))
self.assertFalse(can_read_config_set('projects/secret'))
auth.is_group_member.side_effect = lambda name, *_: name == 'googlers'
self.assertTrue(can_read_config_set('projects/secret'))
auth.is_group_member.side_effect = lambda name, *_: name == 'project-admins'
self.assertTrue(can_read_config_set('projects/secret'))
def test_has_project_access_identity(self):
self.mock(projects, 'get_metadata_async', mock.Mock(return_value=future({
'secret': project_config_pb2.ProjectCfg(
access=['group:googlers', 'a@a.com']),
})))
self.assertFalse(can_read_config_set('projects/secret'))
auth.get_current_identity.return_value = auth.Identity('user', 'a@a.com')
self.assertTrue(can_read_config_set('projects/secret'))
def test_can_read_project_config_no_access(self):
self.assertFalse(has_project_access('projects/swarming'))
self.assertFalse(can_read_config_set('projects/swarming/refs/heads/x'))
def test_malformed_config_set(self):
with self.assertRaises(ValueError):
can_read_config_set('invalid config set')
if __name__ == '__main__':
test_env.main()