blob: f9c4322491a035a8c1a7b7ce186a0b786dcc85ed [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, re, string, threading, time
import dbus
def make_dbus_boolean(value):
value = value.upper()
if value in ["ON", "TRUE"]:
return dbus.Boolean(1)
elif value in ["OFF", "FALSE"]:
return dbus.Boolean(0)
else:
return dbus.Boolean(int(value))
#
# Convert a DBus value to a printable value; used
# to print properties returned via DBus
#
def convert_dbus_value(value, indent=0):
spacer = ' ' * indent
if value.__class__ == dbus.Byte:
return int(value)
elif value.__class__ == dbus.Boolean:
return bool(value)
elif value.__class__ == dbus.Dictionary:
valstr = "{"
for key in value:
valstr += "\n" + spacer + " " + \
key + ": " + str(convert_dbus_value(value[key], indent + 4))
valstr += "\n" + spacer + "}"
return valstr
elif value.__class__ == dbus.Array:
valstr = "["
for val in value:
valstr += "\n" + spacer + " " + \
str(convert_dbus_value(val, indent + 4))
valstr += "\n" + spacer + "]"
return valstr
else:
return str(value)
class FlimFlam(object):
CONNMAN="org.chromium.flimflam"
UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod'
DEVICE_WIMAX = 'wimax'
DEVICE_CELLULAR = 'cellular'
def __init__(self, bus=None):
if not bus:
bus = dbus.SystemBus()
self.bus = bus
self.manager = dbus.Interface(bus.get_object(self.CONNMAN, "/"),
self.CONNMAN+".Manager")
def _FindDevice(self, device_type, timeout):
""" Return the first device object that matches a given device type.
Wait until the device type is avilable or until timeout
Args:
device_type: string format of the type of device.
timeout: in seconds
Returns: Device or None
"""
timeout = time.time() + timeout
device_obj = None
while time.time() < timeout:
device_obj = self.FindElementByPropertySubstring('Device',
'Type',
device_type)
if device_obj:
break
time.sleep(1)
return device_obj
def FindCellularDevice(self, timeout=30):
return self._FindDevice(self.DEVICE_CELLULAR, timeout)
def FindWimaxDevice(self, timeout=30):
return self._FindDevice(self.DEVICE_WIMAX, timeout)
def _FindService(self, device_type, timeout):
"""Return the first service object that matches the device type.
Wait until a service is available or until the timeout.
Args:
device_type: string format of the type of device.
timeout: in seconds
Returns: service or None
"""
start_time = time.time()
timeout = start_time + timeout
service = None
while time.time() < timeout:
service = self.FindElementByPropertySubstring('Service',
'Type', device_type)
if service:
break
time.sleep(.5)
return service
def FindCellularService(self, timeout=30):
return self._FindService(self.DEVICE_CELLULAR, timeout)
def FindWimaxService(self, timeout=30):
return self._FindService(self.DEVICE_WIMAX, timeout)
def GetService(self, params):
path = self.manager.GetService(params)
return self.GetObjectInterface("Service", path)
def ConnectService(self, assoc_timeout=15, config_timeout=15,
async=False, service=None, service_type='',
retry=False, retries=1, retry_sleep=15,
save_creds=False,
**kwargs):
"""Connect to a service and wait until connection is up
Args:
assoc_timeout, config_timeout: Timeouts in seconds.
async: return immediately. do not wait for connection.
service: DBus service
service_type: If supplied, invoke type-specific code to find service.
retry: Retry connection after Connect failure.
retries: Number of retries to allow.
retry_sleep: Number of seconds to wait before retrying.
kwargs: Additional args for type-specific code
Returns:
(success, dictionary), where dictionary contains stats and diagnostics.
"""
output = {}
connected_states = ["ready", "portal", "online"]
# Retry connections on failure. Need to call GetService again as some
# Connect failure states are unrecoverable.
connect_success = False
while not connect_success:
if service_type == "wifi":
try:
# Sanity check to make sure the caller hasn't provided
# both a service and a service type. At which point its
# unclear what they actually want to do, so err on the
# side of caution and except out.
if service:
raise Exception('supplied service and service type')
params = {
"Type": service_type,
"Mode": kwargs["mode"],
"SSID": kwargs["ssid"],
"Security": kwargs.get("security", "none"),
"SaveCredentials": save_creds }
# Supply a passphrase only if it is non-empty.
passphrase = kwargs.get("passphrase", "")
if passphrase:
params["Passphrase"] = passphrase
path = self.manager.GetService(params)
service = self.GetObjectInterface("Service", path)
except Exception, e:
output["reason"] = "FAIL(GetService): exception %s" % e
return (False, output)
output["service"] = service
try:
service.Connect()
connect_success = True
except Exception, e:
if not retry or retries == 0:
output["reason"] = "FAIL(Connect): exception %s" % e
return (False, output)
else:
logging.info("INFO(Connect): connect failed. Retrying...")
retries -= 1
if not connect_success:
# FlimFlam can be a little funny sometimes. At least for In
# Progress errors, even though the service state may be failed,
# it is actually still trying to connect. As such, while we're
# waiting for retry, keep checking the service state to see if
# it actually succeeded in connecting.
state = self.WaitForServiceState(service=service,
expected_states=connected_states,
timeout=retry_sleep,
ignore_failure=True)[0]
if state in connected_states:
return (True, output)
# While service can be caller provided, it is also set by the
# GetService call above. If service was not caller provided we
# need to reset it to None so we don't fail the sanity check
# above.
if service_type != '':
service = None
if async:
return (True, output)
logging.info("Associating...")
(state, assoc_time) = (
self.WaitForServiceState(service,
["configuration"] + connected_states,
assoc_timeout))
output["state"] = state
if state == "failure":
output["reason"] = "FAIL(assoc)"
if assoc_time > assoc_timeout:
output["reason"] = "TIMEOUT(assoc)"
output["assoc_time"] = assoc_time
if "reason" in output:
return (False, output)
(state, config_time) = (
self.WaitForServiceState(service,
connected_states, config_timeout))
output["state"] = state
if state == "failure":
output["reason"] = "FAIL(config)"
if config_time > config_timeout:
output["reason"] = "TIMEOUT(config)"
output["config_time"] = config_time
if "reason" in output:
return (False, output)
return (True, output)
def DisconnectService(self, service, wait_timeout=15):
try:
service.Disconnect()
except dbus.exceptions.DBusException, error:
if error._dbus_error_name not in [
self.CONNMAN + ".Error.InProgress",
self.CONNMAN + ".Error.NotConnected", ]:
raise error
return self.WaitForServiceState(service, ['idle'], wait_timeout)
def _GetContainerName(self, kind):
"""Map connman element names to the names of their collections."""
# For example, Device - > Devices.
# Just pulling this out so we can use a map if we start
# caring about "AvailableTechnologies"
return kind + "s"
def GetObjectInterface(self, kind, path):
return dbus.Interface(self.bus.get_object(self.CONNMAN, path),
self.CONNMAN + "." + kind)
def FindElementByNameSubstring(self, kind, substring):
properties = self.manager.GetProperties(utf8_strings = True)
for path in properties[self._GetContainerName(kind)]:
if path.find(substring) >= 0:
return self.GetObjectInterface(kind, path)
return None
def FindElementByPropertySubstring(self, kind, prop, substring):
properties = self.manager.GetProperties(utf8_strings = True)
for path in properties[self._GetContainerName(kind)]:
obj = self.GetObjectInterface(kind, path)
try:
obj_properties = obj.GetProperties(utf8_strings = True)
except dbus.exceptions.DBusException, error:
if error._dbus_error_name == self.UNKNOWN_METHOD:
# object disappeared; ignore and keep looking
continue
else:
raise error
if (prop in obj_properties and
obj_properties[prop].find(substring) >= 0):
return obj
return None
def GetObjectList(self, kind, properties=None):
if properties is None:
properties = self.manager.GetProperties(utf8_strings = True)
return [self.GetObjectInterface(kind, path)
for path in properties[self._GetContainerName(kind)]]
def GetActiveProfile(self):
properties = self.manager.GetProperties(utf8_strings = True)
return self.GetObjectInterface("Profile", properties["ActiveProfile"])
def CreateProfile(self, ident):
path = self.manager.CreateProfile(ident)
return self.GetObjectInterface("Profile", path)
def RemoveProfile(self, ident):
self.manager.RemoveProfile(ident)
def PushProfile(self, ident):
path = self.manager.PushProfile(ident)
return self.GetObjectInterface("Profile", path)
def PopProfile(self, ident):
self.manager.PopProfile(ident)
def PopAnyProfile(self):
self.manager.PopAnyProfile()
def GetSystemState(self):
properties = self.manager.GetProperties(utf8_strings = True)
return properties["State"]
def WaitForServiceState(self, service, expected_states, timeout,
ignore_failure=False, property_name="State"):
"""Wait until service enters a state in expected_states or times out.
Args:
service: service to watch
expected_states: list of exit states
timeout: in seconds
ignore_failure: should the failure state be ignored?
property_name: name of service property
Returns: (state, seconds waited)
If the state is "failure" and ignore_failure is False we return
immediately without waiting for the timeout.
"""
state = None
start_time = time.time()
timeout = start_time + timeout
while time.time() < timeout:
properties = service.GetProperties(utf8_strings = True)
state = properties.get(property_name, None)
if ((state == "failure" and not ignore_failure) or
state in expected_states):
break
time.sleep(.5)
config_time = time.time() - start_time
# str() to remove DBus boxing
return (str(state), config_time)
def GetDebugTags(self):
return self.manager.GetDebugTags()
def ListDebugTags(self):
return self.manager.ListDebugTags()
def SetDebugTags(self, taglist):
try:
self.manager.SetDebugTags(taglist)
self.SetDebugLevel(-4)
except dbus.exceptions.DBusException, error:
if error.get_dbus_name() not in [
"org.freedesktop.DBus.Error.UnknownMethod" ]:
raise error
def SetDebugLevel(self, level):
self.manager.SetDebugLevel(level)
def ConvertSSID(self, ssid_list):
ssid = ""
for byte in ssid_list:
if (str(byte) in string.printable):
ssid = ssid + str(byte)
else:
ssid = ssid + "."
return ssid
def GetServiceOrder(self):
return self.manager.GetServiceOrder()
def SetServiceOrder(self, new_order):
old_order = self.GetServiceOrder()
self.manager.SetServiceOrder(new_order)
return (old_order, new_order)
def EnableTechnology(self, tech):
try:
self.manager.EnableTechnology(tech)
except dbus.exceptions.DBusException, error:
if error.get_dbus_name() not in [
self.CONNMAN + ".Error.AlreadyEnabled" ]:
raise error
def DisableTechnology(self, tech):
self.manager.DisableTechnology(tech)
def RequestScan(self, technology):
self.manager.RequestScan(technology)
def GetCountry(self):
properties = self.manager.GetProperties(utf8_strings = True)
return properties["Country"]
def SetCountry(self, country):
self.manager.SetProperty("Country", country)
def GetCheckPortalList(self):
properties = self.manager.GetProperties(utf8_strings = True)
return properties["CheckPortalList"]
def SetCheckPortalList(self, tech_list):
self.manager.SetProperty("CheckPortalList", tech_list)
def GetPortalURL(self):
properties = self.manager.GetProperties(utf8_strings = True)
return properties["PortalURL"]
def SetPortalURL(self, url):
self.manager.SetProperty("PortalURL", url)
def GetArpGateway(self):
properties = self.manager.GetProperties()
return properties["ArpGateway"]
def SetArpGateway(self, do_arp_gateway):
self.manager.SetProperty("ArpGateway", do_arp_gateway)
def RegisterAgent(self, path):
return self.manager.RegisterAgent(path)
def UnregisterAgent(self, path):
return self.manager.UnregisterAgent(path)
class DeviceManager(object):
"""Use flimflam to isolate a given interface for testing.
DeviceManager can be used to turn off network devices that are not
under test so that they will not interfere with testing.
NB: Ethernet devices are special inside Flimflam. You will need to
take care of them via other means (like, for example, the
backchannel ethernet code in client autotests)
Sample usage:
device_manager = flimflam.DeviceManager()
try:
device_manager.ShutdownAllExcept('cellular')
use routing.getRouteFor()
to verify that only the expected device is used
do stuff to test cellular connections
finally:
device_manager.RestoreDevices()
"""
def __init__(self, flim=None):
self.flim_ = flim or FlimFlam()
self.devices_to_restore_ = []
def _EnableDevice(self, device, enable):
"""Enables/Disables a device in shill/flimflam."""
try:
# Assume we're talking to shill.
if enable:
device.Enable()
else:
device.Disable()
except dbus.exceptions.DBusException, error:
if error.get_dbus_name() in [
"org.freedesktop.DBus.Error.UnknownMethod"]:
# Fallback to flimflam.
device.SetProperty("Powered", enable)
else:
raise error
def ShutdownAllExcept(self, device_type):
"""Shutdown all devices except device_type ones."""
for device in self.flim_.GetObjectList('Device'):
device_properties = device.GetProperties(utf8_strings = True);
if (device_properties["Type"] != device_type):
logging.info("Powering off %s device %s",
device_properties["Type"],
device.object_path)
self.devices_to_restore_.append(device.object_path)
self._EnableDevice(device, False)
def RestoreDevices(self):
"""Restore devices powered down in ShutdownAllExcept."""
to_raise = None
for device_path in self.devices_to_restore_:
try:
logging.info("Attempting to power on device %s", device_path)
device = self.flim_.GetObjectInterface("Device", device_path)
self._EnableDevice(device, True)
except Exception, e:
# We want to keep on trying to power things on, so save an
# exception and continue
to_raise = e
if to_raise:
raise to_raise