blob: 023dfca0a624d35d49b9c1fb5e112c39e4b11d52 [file] [log] [blame]
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Base class for tests that need to update the policies enforced by Chrome.
Subclasses can call SetUserPolicy (ChromeOS, Linux, Windows) and
SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.
The current implementation depends on the platform. The implementations might
change in the future, but tests relying on the above calls will keep working.
"""
# On ChromeOS, a mock DMServer is started and enterprise enrollment faked
# against it. The mock DMServer then serves user and device policy to Chrome.
#
# For this setup to work, the DNS, GAIA and TPM (if present) are mocked as well:
#
# * The mock DNS resolves all addresses to 127.0.0.1. This allows the mock GAIA
# to handle all login attempts. It also eliminates the impact of flaky network
# connections on tests. Beware though that no cloud services can be accessed
# due to this DNS redirect.
#
# * The mock GAIA permits login with arbitrary credentials and accepts any OAuth
# tokens sent to it for verification as valid.
#
# * When running on a real device, its TPM is disabled. If the TPM were enabled,
# enrollment could not be undone without a reboot. Disabling the TPM makes
# cryptohomed behave as if no TPM was present, allowing enrollment to be
# undone by removing the install attributes.
#
# To disable the TPM, 0 must be written to /sys/class/misc/tpm0/device/enabled.
# Since this file is not writeable, a tpmfs is mounted that shadows the file
# with a writeable copy.
import json
import logging
import os
import subprocess
import pyauto
if pyauto.PyUITest.IsChromeOS():
import sys
import warnings
import pyauto_paths
# Ignore deprecation warnings, they make our output more cluttered.
warnings.filterwarnings('ignore', category=DeprecationWarning)
# Find the path to the pyproto and add it to sys.path.
# Prepend it so that google.protobuf is loaded from here.
for path in pyauto_paths.GetBuildDirs():
p = os.path.join(path, 'pyproto')
if os.path.isdir(p):
sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
'proto')] + sys.path
break
sys.path.append('/usr/local') # to import autotest libs.
import dbus
import device_management_backend_pb2 as dm
import pyauto_utils
import string
import tempfile
import urllib
import urllib2
import uuid
from autotest.cros import auth_server
from autotest.cros import constants
from autotest.cros import cros_ui
from autotest.cros import dns_server
elif pyauto.PyUITest.IsWin():
import _winreg as winreg
elif pyauto.PyUITest.IsMac():
import getpass
import plistlib
# ASN.1 object identifier for PKCS#1/RSA.
PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
TPM_SYSFS_PATH = '/sys/class/misc/tpm0'
TPM_SYSFS_ENABLED_FILE = os.path.join(TPM_SYSFS_PATH, 'device/enabled')
class PolicyTestBase(pyauto.PyUITest):
"""A base class for tests that need to set up and modify policies.
Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Windows) and
SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
"""
if pyauto.PyUITest.IsChromeOS():
# TODO(bartfab): Extend the C++ wrapper that starts the mock DMServer so
# that an owner can be passed in. Without this, the server will assume that
# the owner is user@example.com and for consistency, so must we.
owner = 'user@example.com'
# Subclasses may override these credentials to fake enrollment into another
# mode or use different device and machine IDs.
mode = 'enterprise'
device_id = string.upper(str(uuid.uuid4()))
machine_id = 'CROSTEST'
_auth_server = None
_dns_server = None
def ShouldAutoLogin(self):
return False
@staticmethod
def _Call(command, check=False):
"""Invokes a subprocess and optionally asserts the return value is zero."""
with open(os.devnull, 'w') as devnull:
if check:
return subprocess.check_call(command.split(' '), stdout=devnull)
else:
return subprocess.call(command.split(' '), stdout=devnull)
def _WriteFile(self, path, content):
"""Writes content to path, creating any intermediary directories."""
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
f = open(path, 'w')
f.write(content)
f.close()
def _GetTestServerPoliciesFilePath(self):
"""Returns the path of the cloud policy configuration file."""
assert self.IsChromeOS()
return os.path.join(self._temp_data_dir, 'device_management')
def _GetHttpURLForDeviceManagement(self):
"""Returns the URL at which the TestServer is serving user policy."""
assert self.IsChromeOS()
return self._http_server.GetURL('device_management').spec()
def _RemoveIfExists(self, filename):
"""Removes a file if it exists."""
if os.path.exists(filename):
os.remove(filename)
def _StartSessionManagerAndChrome(self):
"""Starts the session manager and Chrome.
Requires that the session manager be stopped already.
"""
# Ugly hack: session manager will not spawn Chrome if this file exists. That
# is usually a good thing (to keep the automation channel open), but in this
# case we really want to restart chrome. PyUITest.setUp() will be called
# after session manager and chrome have restarted, and will setup the
# automation channel.
restore_magic_file = False
if os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE):
logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
'Removing temporarily for the next restart.')
restore_magic_file = True
os.remove(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
assert not os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
logging.debug('Starting session manager again')
cros_ui.start()
# cros_ui.start() waits for the login prompt to be visible, so Chrome has
# already started once it returns.
if restore_magic_file:
open(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE, 'w').close()
assert os.path.exists(constants.DISABLE_BROWSER_RESTART_MAGIC_FILE)
def _WritePolicyOnChromeOS(self):
"""Updates the mock DMServer's input file with current policy."""
assert self.IsChromeOS()
policy_dict = {
'google/chromeos/device': self._device_policy,
'google/chromeos/user': {
'mandatory': self._user_policy,
'recommended': {},
},
'managed_users': ['*'],
}
self._WriteFile(self._GetTestServerPoliciesFilePath(),
json.dumps(policy_dict, sort_keys=True, indent=2) + '\n')
@staticmethod
def _IsCryptohomedReadyOnChromeOS():
"""Checks whether cryptohomed is running and ready to accept DBus calls."""
assert pyauto.PyUITest.IsChromeOS()
try:
bus = dbus.SystemBus()
proxy = bus.get_object('org.chromium.Cryptohome',
'/org/chromium/Cryptohome')
dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
except dbus.DBusException:
return False
return True
def _ClearInstallAttributesOnChromeOS(self):
"""Resets the install attributes."""
assert self.IsChromeOS()
self._RemoveIfExists('/home/.shadow/install_attributes.pb')
self._Call('restart cryptohomed', check=True)
assert self.WaitUntil(self._IsCryptohomedReadyOnChromeOS)
def _DMPostRequest(self, request_type, request, headers):
"""Posts a request to the mock DMServer."""
assert self.IsChromeOS()
url = self._GetHttpURLForDeviceManagement()
url += '?' + urllib.urlencode({
'deviceid': self.device_id,
'oauth_token': 'dummy_oauth_token_that_is_not_checked_anyway',
'request': request_type,
'devicetype': 2,
'apptype': 'Chrome',
'agent': 'Chrome',
})
response = dm.DeviceManagementResponse()
response.ParseFromString(urllib2.urlopen(urllib2.Request(
url, request.SerializeToString(), headers)).read())
return response
def _DMRegisterDevice(self):
"""Registers with the mock DMServer and returns the DMToken."""
assert self.IsChromeOS()
dm_request = dm.DeviceManagementRequest()
request = dm_request.register_request
request.type = dm.DeviceRegisterRequest.DEVICE
request.machine_id = self.machine_id
dm_response = self._DMPostRequest('register', dm_request, {})
return dm_response.register_response.device_management_token
def _DMFetchPolicy(self, dm_token):
"""Fetches device policy from the mock DMServer."""
assert self.IsChromeOS()
dm_request = dm.DeviceManagementRequest()
policy_request = dm_request.policy_request
request = policy_request.request.add()
request.policy_type = 'google/chromeos/device'
request.signature_type = dm.PolicyFetchRequest.SHA1_RSA
headers = {'Authorization': 'GoogleDMToken token=' + dm_token}
dm_response = self._DMPostRequest('policy', dm_request, headers)
response = dm_response.policy_response.response[0]
assert response.policy_data
assert response.policy_data_signature
assert response.new_public_key
return response
def ExtraChromeFlags(self):
"""Sets up Chrome to use cloud policies on ChromeOS."""
flags = pyauto.PyUITest.ExtraChromeFlags(self)
if self.IsChromeOS():
while '--skip-oauth-login' in flags:
flags.remove('--skip-oauth-login')
url = self._GetHttpURLForDeviceManagement()
flags.append('--device-management-url=' + url)
flags.append('--disable-sync')
return flags
def _SetUpWithSessionManagerStopped(self):
"""Sets up the test environment after stopping the session manager."""
assert self.IsChromeOS()
logging.debug('Stopping session manager')
cros_ui.stop(allow_fail=True)
# Start mock GAIA server.
self._auth_server = auth_server.GoogleAuthServer()
self._auth_server.run()
# Disable TPM if present.
if os.path.exists(TPM_SYSFS_PATH):
self._Call('mount -t tmpfs -o size=1k tmpfs %s'
% os.path.realpath(TPM_SYSFS_PATH), check=True)
self._WriteFile(TPM_SYSFS_ENABLED_FILE, '0')
# Clear install attributes and restart cryptohomed to pick up the change.
self._ClearInstallAttributesOnChromeOS()
# Set install attributes to mock enterprise enrollment.
bus = dbus.SystemBus()
proxy = bus.get_object('org.chromium.Cryptohome',
'/org/chromium/Cryptohome')
install_attributes = {
'enterprise.device_id': self.device_id,
'enterprise.domain': string.split(self.owner, '@')[-1],
'enterprise.mode': self.mode,
'enterprise.owned': 'true',
'enterprise.user': self.owner
}
interface = dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
for name, value in install_attributes.iteritems():
interface.InstallAttributesSet(name, '%s\0' % value)
interface.InstallAttributesFinalize()
# Start mock DNS server that redirects all traffic to 127.0.0.1.
self._dns_server = dns_server.LocalDns()
self._dns_server.run()
# Start mock DMServer.
source_dir = os.path.normpath(pyauto_paths.GetSourceDir())
self._temp_data_dir = tempfile.mkdtemp(dir=source_dir)
logging.debug('TestServer input path: %s' % self._temp_data_dir)
relative_temp_data_dir = os.path.basename(self._temp_data_dir)
self._http_server = self.StartHTTPServer(relative_temp_data_dir)
# Initialize the policy served.
self._device_policy = {}
self._user_policy = {}
self._WritePolicyOnChromeOS()
# Register with mock DMServer and retrieve initial device policy blob.
dm_token = self._DMRegisterDevice()
policy = self._DMFetchPolicy(dm_token)
# Write the initial device policy blob.
self._WriteFile(constants.OWNER_KEY_FILE, policy.new_public_key)
self._WriteFile(constants.SIGNED_POLICY_FILE, policy.SerializeToString())
# Remove any existing vaults.
self.RemoveAllCryptohomeVaultsOnChromeOS()
# Restart session manager and Chrome.
self._StartSessionManagerAndChrome()
def _tearDownWithSessionManagerStopped(self):
"""Resets the test environment after stopping the session manager."""
assert self.IsChromeOS()
logging.debug('Stopping session manager')
cros_ui.stop(allow_fail=True)
# Stop mock GAIA server.
if self._auth_server:
self._auth_server.stop()
# Reenable TPM if present.
if os.path.exists(TPM_SYSFS_PATH):
self._Call('umount %s' % os.path.realpath(TPM_SYSFS_PATH))
# Clear install attributes and restart cryptohomed to pick up the change.
self._ClearInstallAttributesOnChromeOS()
# Stop mock DNS server.
if self._dns_server:
self._dns_server.stop()
# Stop mock DMServer.
self.StopHTTPServer(self._http_server)
# Clear the policy served.
pyauto_utils.RemovePath(self._temp_data_dir)
# Remove the device policy blob.
self._RemoveIfExists(constants.OWNER_KEY_FILE)
self._RemoveIfExists(constants.SIGNED_POLICY_FILE)
# Remove any existing vaults.
self.RemoveAllCryptohomeVaultsOnChromeOS()
# Restart session manager and Chrome.
self._StartSessionManagerAndChrome()
def setUp(self):
"""Sets up the platform for policy testing.
On ChromeOS, part of the setup involves restarting the session manager to
inject an initial device policy blob.
"""
if self.IsChromeOS():
# Perform the remainder of the setup with the device manager stopped.
try:
self.WaitForSessionManagerRestart(
self._SetUpWithSessionManagerStopped)
except:
# Destroy the non re-entrant services.
if self._auth_server:
self._auth_server.stop()
if self._dns_server:
self._dns_server.stop()
raise
pyauto.PyUITest.setUp(self)
self._branding = self.GetBrowserInfo()['properties']['branding']
def tearDown(self):
"""Cleans up the policies and related files created in tests."""
if self.IsChromeOS():
# Perform the cleanup with the device manager stopped.
self.WaitForSessionManagerRestart(self._tearDownWithSessionManagerStopped)
else:
# On other platforms, there is only user policy to clear.
self.SetUserPolicy(refresh=False)
pyauto.PyUITest.tearDown(self)
def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
"""Convenience method for logging in with one of the test accounts."""
assert self.IsChromeOS()
credentials = self.GetPrivateInfo()[account]
self.Login(credentials['username'], credentials['password'])
assert self.GetLoginInfo()['is_logged_in']
def _GetCurrentLoginScreenId(self):
return self.ExecuteJavascriptInOOBEWebUI(
"""window.domAutomationController.send(
String(cr.ui.Oobe.getInstance().currentScreen.id));
""")
def _WaitForLoginScreenId(self, id):
self.assertTrue(
self.WaitUntil(function=self._GetCurrentLoginScreenId,
expect_retval=id),
msg='Expected login screen "%s" to be visible.' % id)
def _CheckLoginFormLoading(self):
return self.ExecuteJavascriptInOOBEWebUI(
"""window.domAutomationController.send(
cr.ui.Oobe.getInstance().currentScreen.loading);
""")
def PrepareToWaitForLoginFormReload(self):
self.assertEqual('gaia-signin',
self._GetCurrentLoginScreenId(),
msg='Expected the login form to be visible.')
self.assertTrue(
self.WaitUntil(function=self._CheckLoginFormLoading,
expect_retval=False),
msg='Expected the login form to finish loading.')
# Set up a sentinel variable that is false now and will toggle to true when
# the login form starts reloading.
self.ExecuteJavascriptInOOBEWebUI(
"""var screen = cr.ui.Oobe.getInstance().currentScreen;
if (!('reload_started' in screen)) {
screen.orig_loadAuthExtension_ = screen.loadAuthExtension_;
screen.loadAuthExtension_ = function(data) {
this.orig_loadAuthExtension_(data);
if (this.loading)
this.reload_started = true;
}
}
screen.reload_started = false;
window.domAutomationController.send(true);""")
def _CheckLoginFormReloaded(self):
return self.ExecuteJavascriptInOOBEWebUI(
"""window.domAutomationController.send(
cr.ui.Oobe.getInstance().currentScreen.reload_started &&
!cr.ui.Oobe.getInstance().currentScreen.loading);
""")
def WaitForLoginFormReload(self):
self.assertEqual('gaia-signin',
self._GetCurrentLoginScreenId(),
msg='Expected the login form to be visible.')
self.assertTrue(
self.WaitUntil(function=self._CheckLoginFormReloaded),
msg='Expected the login form to finish reloading.')
def _SetUserPolicyChromeOS(self, user_policy=None):
"""Writes the given user policy to the mock DMServer's input file."""
self._user_policy = user_policy or {}
self._WritePolicyOnChromeOS()
def _SetUserPolicyWin(self, user_policy=None):
"""Writes the given user policy to the Windows registry."""
def SetValueEx(key, sub_key, value):
if isinstance(value, int):
winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
elif isinstance(value, basestring):
winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
elif isinstance(value, list):
k = winreg.CreateKey(key, sub_key)
for index, v in list(enumerate(value)):
SetValueEx(k, str(index + 1), v)
winreg.CloseKey(k)
else:
raise TypeError('Unsupported data type: "%s"' % value)
assert self.IsWin()
if self._branding == 'Google Chrome':
reg_base = r'SOFTWARE\Policies\Google\Chrome'
else:
reg_base = r'SOFTWARE\Policies\Chromium'
if subprocess.call(
r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
logging.debug(r'Removing %s' % reg_base)
subprocess.call(r'reg delete HKLM\%s /f' % reg_base)
if user_policy is not None:
root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
for k, v in user_policy.iteritems():
SetValueEx(root_key, k, v)
winreg.CloseKey(root_key)
def _SetUserPolicyLinux(self, user_policy=None):
"""Writes the given user policy to the JSON policy file read by Chrome."""
assert self.IsLinux()
sudo_cmd_file = os.path.join(os.path.dirname(__file__),
'policy_posix_util.py')
if self._branding == 'Google Chrome':
policies_location_base = '/etc/opt/chrome'
else:
policies_location_base = '/etc/chromium'
if os.path.exists(policies_location_base):
logging.debug('Removing directory %s' % policies_location_base)
subprocess.call(['suid-python', sudo_cmd_file,
'remove_dir', policies_location_base])
if user_policy is not None:
self._WriteFile('/tmp/chrome.json',
json.dumps(user_policy, sort_keys=True, indent=2) + '\n')
policies_location = '%s/policies/managed' % policies_location_base
subprocess.call(['suid-python', sudo_cmd_file,
'setup_dir', policies_location])
subprocess.call(['suid-python', sudo_cmd_file,
'perm_dir', policies_location])
# Copy chrome.json file to the managed directory
subprocess.call(['suid-python', sudo_cmd_file,
'copy', '/tmp/chrome.json', policies_location])
os.remove('/tmp/chrome.json')
def _SetUserPolicyMac(self, user_policy=None):
"""Writes the given user policy to the plist policy file read by Chrome."""
assert self.IsMac()
sudo_cmd_file = os.path.join(os.path.dirname(__file__),
'policy_posix_util.py')
if self._branding == 'Google Chrome':
policies_file_base = 'com.google.Chrome.plist'
else:
policies_file_base = 'org.chromium.Chromium.plist'
policies_location = os.path.join('/Library', 'Managed Preferences',
getpass.getuser())
if os.path.exists(policies_location):
logging.debug('Removing directory %s' % policies_location)
subprocess.call(['suid-python', sudo_cmd_file,
'remove_dir', policies_location])
if user_policy is not None:
policies_tmp_file = os.path.join('/tmp', policies_file_base)
plistlib.writePlist(user_policy, policies_tmp_file)
subprocess.call(['suid-python', sudo_cmd_file,
'setup_dir', policies_location])
# Copy policy file to the managed directory
subprocess.call(['suid-python', sudo_cmd_file,
'copy', policies_tmp_file, policies_location])
os.remove(policies_tmp_file)
def SetUserPolicy(self, user_policy=None, refresh=True):
"""Sets the user policy provided as a dict.
Args:
user_policy: The user policy to set. None clears it.
refresh: If True, Chrome will refresh and apply the new policy.
Requires Chrome to be alive for it.
"""
if self.IsChromeOS():
self._SetUserPolicyChromeOS(user_policy=user_policy)
elif self.IsWin():
self._SetUserPolicyWin(user_policy=user_policy)
elif self.IsLinux():
self._SetUserPolicyLinux(user_policy=user_policy)
elif self.IsMac():
self._SetUserPolicyMac(user_policy=user_policy)
else:
raise NotImplementedError('Not available on this platform.')
if refresh:
self.RefreshPolicies()
def SetDevicePolicy(self, device_policy=None, refresh=True):
"""Sets the device policy provided as a dict.
Args:
device_policy: The device policy to set. None clears it.
refresh: If True, Chrome will refresh and apply the new policy.
Requires Chrome to be alive for it.
"""
assert self.IsChromeOS()
self._device_policy = device_policy or {}
self._WritePolicyOnChromeOS()
if refresh:
self.RefreshPolicies()