blob: 87286a3928710e5ef16011b8b3f8d687f5a53ab6 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2014 The Swarming Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.
import collections
import datetime
import logging
import os
import StringIO
import sys
import tarfile
import tempfile
import unittest
APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
import test_env
test_env.setup_test_env()
from google.appengine.ext import ndb
from components import auth
from components import auth_testing
from components.auth import model
from test_support import test_case
import importer
def build_tar_gz(content):
"""Returns bytes of tar.gz archive build from {filename -> body} dict."""
out = StringIO.StringIO()
with tarfile.open(mode='w|gz', fileobj=out) as tar:
for name, value in content.iteritems():
# tarfile module doesn't support in-memory files (it tries to os.stat
# them), so dump to disk first to keep the code simple.
path = None
try:
fd, path = tempfile.mkstemp(prefix='importer_test')
with os.fdopen(fd, 'w') as f:
f.write(value)
tar.add(path, arcname=name)
finally:
if path:
os.remove(path)
return out.getvalue()
def ident(name):
if '@' not in name:
return auth.Identity(auth.IDENTITY_USER, '%s@example.com' % name)
else:
return auth.Identity(auth.IDENTITY_USER, name)
def group(name, members, nested=None):
return model.AuthGroup(
key=model.group_key(name),
created_by=ident('admin'),
created_ts=datetime.datetime(1999, 1, 2, 3, 4, 5, 6),
modified_by=ident('admin'),
modified_ts=datetime.datetime(1999, 1, 2, 3, 4, 5, 6),
members=[ident(x) for x in members],
nested=nested or [])
def fetch_groups():
return {x.key.id(): x.to_dict() for x in model.AuthGroup.query()}
class ImporterTest(test_case.TestCase):
def setUp(self):
super(ImporterTest, self).setUp()
auth_testing.mock_is_admin(self, True)
auth_testing.mock_get_current_identity(self)
def mock_urlfetch(self, urls):
def mock_get_access_token(_scope):
return 'token', 0
self.mock(importer.app_identity, 'get_access_token', mock_get_access_token)
@ndb.tasklet
def mock_fetch(**kwargs):
self.assertIn(kwargs['url'], urls)
self.assertEqual({'Authorization': 'OAuth token'}, kwargs['headers'])
class ReturnValue(object):
status_code = 200
content = urls[kwargs['url']]
raise ndb.Return(ReturnValue())
self.mock(ndb.get_context(), 'urlfetch', mock_fetch)
def test_extract_tar_archive(self):
expected = {
'0': '0',
'a/1': '1',
'a/2': '2',
'b/1': '3',
'b/c/d': '4',
}
out = {
name: fileobj.read()
for name, fileobj in importer.extract_tar_archive(build_tar_gz(expected))
}
self.assertEqual(expected, out)
def test_load_group_file_ok(self):
body = '\n'.join(['', 'b', 'a', ''])
expected = [
auth.Identity.from_bytes('user:a@example.com'),
auth.Identity.from_bytes('user:b@example.com'),
]
self.assertEqual(expected, importer.load_group_file(body, 'example.com'))
def test_load_group_file_bad_id(self):
body = 'bad id'
with self.assertRaises(importer.BundleBadFormatError):
importer.load_group_file(body, 'example.com')
def test_prepare_import(self):
service_id = auth.Identity.from_bytes('service:some-service')
self.mock(auth, 'get_service_self_identity', lambda: service_id)
existing_groups = [
group('normal-group', [], ['ldap/cleared']),
group('not-ldap/some', []),
group('ldap/updated', ['a']),
group('ldap/unchanged', ['a']),
group('ldap/deleted', ['a']),
group('ldap/cleared', ['a']),
]
imported_groups = {
'ldap/new': [ident('a')],
'ldap/updated': [ident('a'), ident('b')],
'ldap/unchanged': [ident('a')],
}
to_put, to_delete = importer.prepare_import(
'ldap',
existing_groups,
imported_groups,
datetime.datetime(2010, 1, 2, 3, 4, 5, 6))
expected_to_put = {
'ldap/cleared': {
'created_by': ident('admin'),
'created_ts': datetime.datetime(1999, 1, 2, 3, 4, 5, 6),
'description': '',
'globs': [],
'members': [],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
'ldap/new': {
'created_by': service_id,
'created_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'description': '',
'globs': [],
'members': [ident('a')],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
'ldap/updated': {
'created_by': ident('admin'),
'created_ts': datetime.datetime(1999, 1, 2, 3, 4, 5, 6),
'description': '',
'globs': [],
'members': [ident('a'), ident('b')],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
}
self.assertEqual(expected_to_put, {x.key.id(): x.to_dict() for x in to_put})
self.assertEqual([model.group_key('ldap/deleted')], to_delete)
def test_load_tarball(self):
bundle = build_tar_gz({
'at_root': 'a\nb',
'ldap/ bad name': 'a\nb',
'ldap/group-a': 'a\nb',
'ldap/group-b': 'a\nb',
'ldap/group-c': 'a\nb',
'ldap/deeper/group-a': 'a\nb',
'not-ldap/group-a': 'a\nb',
})
result = importer.load_tarball(
content=bundle,
systems=['ldap'],
groups=['ldap/group-a', 'ldap/group-b'],
domain='example.com')
expected = {
'ldap': {
'ldap/group-a': [
auth.Identity.from_bytes('user:a@example.com'),
auth.Identity.from_bytes('user:b@example.com')
],
'ldap/group-b': [
auth.Identity.from_bytes('user:a@example.com'),
auth.Identity.from_bytes('user:b@example.com')
],
}
}
self.assertEqual(expected, result)
def test_load_tarball_bad_group(self):
bundle = build_tar_gz({
'at_root': 'a\nb',
'ldap/group-a': 'a\n!!!!!',
})
with self.assertRaises(importer.BundleBadFormatError):
importer.load_tarball(
content=bundle,
systems=['ldap'],
groups=['ldap/group-a', 'ldap/group-b'],
domain='example.com')
def test_import_external_groups(self):
self.mock_now(datetime.datetime(2010, 1, 2, 3, 4, 5, 6))
service_id = auth.Identity.from_bytes('service:some-service')
self.mock(auth, 'get_service_self_identity', lambda: service_id)
importer.write_config([
{
'domain': 'example.com',
'format': 'tarball',
'groups': ['ldap/new'],
'oauth_scopes': ['scope'],
'systems': ['ldap'],
'url': 'https://fake_tarball',
},
{
'format': 'plainlist',
'group': 'external_1',
'oauth_scopes': ['scope'],
'url': 'https://fake_external_1',
},
{
'description': 'Some external group',
'domain': 'example.com',
'format': 'plainlist',
'group': 'external_2',
'oauth_scopes': ['scope'],
'url': 'https://fake_external_2',
},
])
self.mock_urlfetch({
'https://fake_tarball': build_tar_gz({
'ldap/new': 'a\nb',
}),
'https://fake_external_1': 'abc@test.com\ndef@test.com\n',
'https://fake_external_2': '123\n456',
})
# Should be deleted during import, since not in a imported bundle.
group('ldap/deleted', []).put()
# Should be updated.
group('external/external_1', ['x', 'y']).put()
# Should be removed, since not in list of external groups.
group('external/deleted', []).put()
# Run the import.
initial_auth_db_rev = model.get_auth_db_revision()
importer.import_external_groups()
self.assertEqual(initial_auth_db_rev + 1, model.get_auth_db_revision())
# Verify final state.
expected_groups = {
'ldap/new': {
'created_by': service_id,
'created_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'description': u'',
'globs': [],
'members': [ident('a'), ident('b')],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
'external/external_1': {
'created_by': ident('admin'),
'created_ts': datetime.datetime(1999, 1, 2, 3, 4, 5, 6),
'description': u'',
'globs': [],
'members': [ident('abc@test.com'), ident('def@test.com')],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
'external/external_2': {
'created_by': service_id,
'created_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'description': u'',
'globs': [],
'members': [ident('123'), ident('456')],
'modified_by': service_id,
'modified_ts': datetime.datetime(2010, 1, 2, 3, 4, 5, 6),
'nested': [],
},
}
self.assertEqual(expected_groups, fetch_groups())
if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.FATAL)
unittest.main()