blob: 676e3eee3049f2cf9ed540c35c848b27149f4378 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
from test_env import future
import test_env
test_env.setup_test_env()
from test_support import test_case
import mock
from components import auth
from components.config.proto import project_config_pb2
from components.config.proto import service_config_pb2
import acl
import projects
import services
import storage
TEST_USER = auth.Identity.from_bytes('user:someone@example.com')
TEST_PROJECT = 'some-project'
TEST_SERVICE = 'some-service'
PROJECT_SET = 'projects/' + TEST_PROJECT
SERVICE_SET = 'services/' + TEST_SERVICE
def can_read(config_set):
return acl.can_read_config_sets([config_set])[config_set]
can_validate = acl.can_validate
can_reimport = acl.can_reimport
class AclTestCase(test_case.TestCase):
def setUp(self):
super(AclTestCase, self).setUp()
self.mock(auth, 'is_group_member', mock.Mock(return_value=False))
self.mock(auth, 'get_current_identity', mock.Mock())
auth.get_current_identity.return_value = TEST_USER
self.mock(services, 'get_services_async', mock.Mock(return_value=future([
service_config_pb2.Service(
id=TEST_SERVICE,
access=['group:some-service-access'],
),
service_config_pb2.Service(
id='hidden',
access=['group:hidden'],
),
])))
perms_map = {
auth.root_realm(TEST_PROJECT): {
acl._PERMISSION_READ: 'some-project-access',
acl._PERMISSION_VALIDATE: 'some-project-validate',
acl._PERMISSION_REIMPORT: 'some-project-reimport',
},
auth.root_realm('hidden'): {
acl._PERMISSION_READ: 'hidden',
},
}
def has_permission_mock(perm, realms):
assert len(realms) == 1, realms
group = perms_map.get(realms[0], {}).get(perm)
return bool(group and auth.is_group_member(group))
self.mock(auth, 'has_permission', has_permission_mock)
self.mock(projects, 'get_metadata_async', mock.Mock(return_value=future({
TEST_PROJECT: project_config_pb2.ProjectCfg(),
'hidden': project_config_pb2.ProjectCfg(),
})))
self.mock(storage, 'get_self_config_async', lambda *_: future(
service_config_pb2.AclCfg(
project_access_group='project-access',
project_reimport_group='project-reimport',
project_validation_group='project-validation',
service_access_group='service-access',
service_reimport_group='service-reimport',
service_validation_group='service-validation',
)))
self.mock(projects, '_filter_existing', lambda pids: pids)
def mock_membership(self, *groups):
auth.is_group_member.side_effect = lambda name, *_: name in groups
def test_malformed_config_set(self):
with self.assertRaises(ValueError):
can_read('something/odd')
def test_access_project_forbidden(self):
self.assertFalse(can_read(PROJECT_SET))
def test_access_project_via_project_acl(self):
self.mock_membership('some-project-access')
self.assertTrue(can_read(PROJECT_SET))
def test_access_project_via_global_acl(self):
self.mock_membership('project-access')
self.assertTrue(can_read(PROJECT_SET))
def test_access_project_unknown(self):
self.mock_membership('project-access')
self.assertTrue(can_read('projects/unknown'))
def test_validate_project_invisible(self):
self.assertFalse(can_validate(PROJECT_SET))
def test_validate_project_forbidden(self):
self.mock_membership('project-access')
self.assertFalse(can_validate(PROJECT_SET))
def test_validate_project_via_project_acl(self):
self.mock_membership('some-project-access', 'some-project-validate')
self.assertTrue(can_validate(PROJECT_SET))
def test_validate_project_via_global_acl(self):
self.mock_membership('project-access', 'project-validation')
self.assertTrue(can_validate(PROJECT_SET))
def test_reimport_project_invisible(self):
self.assertFalse(can_reimport(PROJECT_SET))
def test_reimport_project_forbidden(self):
self.mock_membership('project-access')
self.assertFalse(can_reimport(PROJECT_SET))
def test_reimport_project_via_project_acl(self):
self.mock_membership('some-project-access', 'some-project-reimport')
self.assertTrue(can_reimport(PROJECT_SET))
def test_reimport_project_via_global_acl(self):
self.mock_membership('project-access', 'project-reimport')
self.assertTrue(can_reimport(PROJECT_SET))
def test_access_service_frobidden(self):
self.assertFalse(can_read(SERVICE_SET))
def test_access_service_via_service_acl(self):
self.mock_membership('some-service-access')
self.assertTrue(can_read(SERVICE_SET))
def test_access_service_via_global_acl(self):
self.mock_membership('service-access')
self.assertTrue(can_read(SERVICE_SET))
def test_access_service_unknown(self):
self.mock_membership('service-access')
self.assertTrue(can_read('services/unknown'))
def test_validate_service_invisible(self):
self.assertFalse(can_validate(SERVICE_SET))
def test_validate_service_forbidden(self):
self.mock_membership('service-access')
self.assertFalse(can_validate(SERVICE_SET))
def test_validate_service_via_global_acl(self):
self.mock_membership('service-access', 'service-validation')
self.assertTrue(can_validate(SERVICE_SET))
def test_reimport_service_invisible(self):
self.assertFalse(can_reimport(SERVICE_SET))
def test_reimport_service_forbidden(self):
self.mock_membership('service-access')
self.assertFalse(can_reimport(SERVICE_SET))
def test_reimport_service_via_global_acl(self):
self.mock_membership('service-access', 'service-reimport')
self.assertTrue(can_reimport(SERVICE_SET))
### Tests for can_read_config_sets batch mode.
def test_can_read_config_sets_many(self):
self.mock_membership('some-service-access', 'some-project-access')
res = acl.can_read_config_sets([
PROJECT_SET, 'projects/hidden', 'projects/unknown',
SERVICE_SET, 'services/hidden', 'services/unknown',
PROJECT_SET, SERVICE_SET, # intentional dups
])
self.assertEqual(res, {
PROJECT_SET: True,
SERVICE_SET: True,
'projects/hidden': False,
'projects/unknown': False,
'services/hidden': False,
'services/unknown': False,
})
### Tests for _check_service_config_acl helper.
def test_check_service_config_acl_as_identity(self):
res = acl._check_service_config_acl({
'missing': None,
'empty': service_config_pb2.Service(),
'yes': service_config_pb2.Service(
access=['nope@example.com', 'group:nope', 'someone@example.com']),
'no': service_config_pb2.Service(
access=['nope@example.com', 'group:nope']),
'also-yes': service_config_pb2.Service(
access=['user:someone@example.com']),
})
self.assertEqual(res, {
'missing': False,
'empty': False,
'yes': True,
'no': False,
'also-yes': True,
})
def test_check_service_config_acl_via_group(self):
self.mock_membership('some-group')
res = acl._check_service_config_acl({
'missing': None,
'empty': service_config_pb2.Service(),
'yes': service_config_pb2.Service(
access=['nope@example.com', 'group:nope', 'group:some-group']),
'no': service_config_pb2.Service(
access=['nope@example.com', 'group:nope']),
'also-yes': service_config_pb2.Service(
access=['group:some-group']),
})
self.assertEqual(res, {
'missing': False,
'empty': False,
'yes': True,
'no': False,
'also-yes': True,
})
if __name__ == '__main__':
test_env.main()