flimflam-test: Refactor and clean flimflam.py
Fix a lot of spacing issues, and make some methods static so that pylint
is happy. Change two instances where we accessed dbus exception name
strings directly to use getters.
BUG=chromium-os:38166
TEST=Connected and disconnect from WiFi network via script.
Change-Id: I9d1698a187c3caa5dc9441cf3b9471a8a43f4086
Reviewed-on: https://gerrit.chromium.org/gerrit/41874
Tested-by: Christopher Wiley <wiley@chromium.org>
Reviewed-by: mukesh agrawal <quiche@chromium.org>
Commit-Queue: Christopher Wiley <wiley@chromium.org>
diff --git a/test/flimflam.py b/test/flimflam.py
index b2dec7d..c14717f 100755
--- a/test/flimflam.py
+++ b/test/flimflam.py
@@ -2,18 +2,18 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging, os, re, string, threading, time
+import logging, time
import dbus
def make_dbus_boolean(value):
- value = value.upper()
- if value in ["ON", "TRUE"]:
- return dbus.Boolean(1)
- elif value in ["OFF", "FALSE"]:
- return dbus.Boolean(0)
- else:
- return dbus.Boolean(int(value))
+ value = value.upper()
+ if value in ["ON", "TRUE"]:
+ return dbus.Boolean(1)
+ elif value in ["OFF", "FALSE"]:
+ return dbus.Boolean(0)
+ else:
+ return dbus.Boolean(int(value))
#
# Convert a DBus value to a printable value; used
@@ -44,18 +44,71 @@
class FlimFlam(object):
- CONNMAN="org.chromium.flimflam"
+ SHILL_DBUS_INTERFACE = "org.chromium.flimflam"
UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod'
DEVICE_WIMAX = 'wimax'
DEVICE_CELLULAR = 'cellular'
+ @staticmethod
+ def _GetContainerName(kind):
+ """Map shill element names to the names of their collections."""
+ # For example, Device - > Devices.
+ # Just pulling this out so we can use a map if we start
+ # caring about "AvailableTechnologies"
+ return kind + "s"
+
+ @staticmethod
+ def WaitForServiceState(service, expected_states, timeout,
+ ignore_failure=False, property_name="State"):
+ """Wait until service enters a state in expected_states or times out.
+ Args:
+ service: service to watch
+ expected_states: list of exit states
+ timeout: in seconds
+ ignore_failure: should the failure state be ignored?
+ property_name: name of service property
+
+ Returns: (state, seconds waited)
+
+ If the state is "failure" and ignore_failure is False we return
+ immediately without waiting for the timeout.
+ """
+
+ state = None
+ start_time = time.time()
+ timeout = start_time + timeout
+ while time.time() < timeout:
+ properties = service.GetProperties(utf8_strings = True)
+ state = properties.get(property_name, None)
+ if ((state == "failure" and not ignore_failure) or
+ state in expected_states):
+ break
+ time.sleep(.5)
+
+ config_time = time.time() - start_time
+ # str() to remove DBus boxing
+ return (str(state), config_time)
+
+ @staticmethod
+ def DisconnectService(service, wait_timeout=15):
+ try:
+ service.Disconnect()
+ except dbus.exceptions.DBusException, error:
+ if error.get_dbus_name() not in [
+ FlimFlam.SHILL_DBUS_INTERFACE + ".Error.InProgress",
+ FlimFlam.SHILL_DBUS_INTERFACE + ".Error.NotConnected", ]:
+ raise error
+ return FlimFlam.WaitForServiceState(service, ['idle'], wait_timeout)
+
def __init__(self, bus=None):
if not bus:
bus = dbus.SystemBus()
self.bus = bus
- self.manager = dbus.Interface(bus.get_object(self.CONNMAN, "/"),
- self.CONNMAN+".Manager")
+ shill = bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, "/")
+ self.manager = dbus.Interface(
+ shill,
+ FlimFlam.SHILL_DBUS_INTERFACE + ".Manager")
def _FindDevice(self, device_type, timeout):
""" Return the first device object that matches a given device type.
@@ -186,10 +239,11 @@
# it is actually still trying to connect. As such, while we're
# waiting for retry, keep checking the service state to see if
# it actually succeeded in connecting.
- state = self.WaitForServiceState(service=service,
- expected_states=connected_states,
- timeout=retry_sleep,
- ignore_failure=True)[0]
+ state = FlimFlam.WaitForServiceState(
+ service=service,
+ expected_states=connected_states,
+ timeout=retry_sleep,
+ ignore_failure=True)[0]
if state in connected_states:
return (True, output)
@@ -206,9 +260,9 @@
logging.info("Associating...")
(state, assoc_time) = (
- self.WaitForServiceState(service,
- ["configuration"] + connected_states,
- assoc_timeout))
+ FlimFlam.WaitForServiceState(service,
+ ["configuration"] + connected_states,
+ assoc_timeout))
output["state"] = state
if state == "failure":
output["reason"] = "FAIL(assoc)"
@@ -220,8 +274,8 @@
(state, config_time) = (
- self.WaitForServiceState(service,
- connected_states, config_timeout))
+ FlimFlam.WaitForServiceState(service,
+ connected_states, config_timeout))
output["state"] = state
if state == "failure":
output["reason"] = "FAIL(config)"
@@ -234,42 +288,26 @@
return (True, output)
- def DisconnectService(self, service, wait_timeout=15):
- try:
- service.Disconnect()
- except dbus.exceptions.DBusException, error:
- if error._dbus_error_name not in [
- self.CONNMAN + ".Error.InProgress",
- self.CONNMAN + ".Error.NotConnected", ]:
- raise error
- return self.WaitForServiceState(service, ['idle'], wait_timeout)
-
- def _GetContainerName(self, kind):
- """Map connman element names to the names of their collections."""
- # For example, Device - > Devices.
- # Just pulling this out so we can use a map if we start
- # caring about "AvailableTechnologies"
- return kind + "s"
-
def GetObjectInterface(self, kind, path):
- return dbus.Interface(self.bus.get_object(self.CONNMAN, path),
- self.CONNMAN + "." + kind)
+ return dbus.Interface(
+ self.bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, path),
+ FlimFlam.SHILL_DBUS_INTERFACE + "." + kind)
def FindElementByNameSubstring(self, kind, substring):
properties = self.manager.GetProperties(utf8_strings = True)
- for path in properties[self._GetContainerName(kind)]:
+ for path in properties[FlimFlam._GetContainerName(kind)]:
if path.find(substring) >= 0:
return self.GetObjectInterface(kind, path)
return None
def FindElementByPropertySubstring(self, kind, prop, substring):
properties = self.manager.GetProperties(utf8_strings = True)
- for path in properties[self._GetContainerName(kind)]:
+ for path in properties[FlimFlam._GetContainerName(kind)]:
obj = self.GetObjectInterface(kind, path)
try:
obj_properties = obj.GetProperties(utf8_strings = True)
except dbus.exceptions.DBusException, error:
- if error._dbus_error_name == self.UNKNOWN_METHOD:
+ if error.get_dbus_name() == self.UNKNOWN_METHOD:
# object disappeared; ignore and keep looking
continue
else:
@@ -283,7 +321,7 @@
if properties is None:
properties = self.manager.GetProperties(utf8_strings = True)
return [self.GetObjectInterface(kind, path)
- for path in properties[self._GetContainerName(kind)]]
+ for path in properties[FlimFlam._GetContainerName(kind)]]
def GetActiveProfile(self):
properties = self.manager.GetProperties(utf8_strings = True)
@@ -310,37 +348,6 @@
properties = self.manager.GetProperties(utf8_strings = True)
return properties["State"]
- def WaitForServiceState(self, service, expected_states, timeout,
- ignore_failure=False, property_name="State"):
- """Wait until service enters a state in expected_states or times out.
- Args:
- service: service to watch
- expected_states: list of exit states
- timeout: in seconds
- ignore_failure: should the failure state be ignored?
- property_name: name of service property
-
- Returns: (state, seconds waited)
-
- If the state is "failure" and ignore_failure is False we return
- immediately without waiting for the timeout.
- """
-
- state = None
- start_time = time.time()
- timeout = start_time + timeout
- while time.time() < timeout:
- properties = service.GetProperties(utf8_strings = True)
- state = properties.get(property_name, None)
- if ((state == "failure" and not ignore_failure) or
- state in expected_states):
- break
- time.sleep(.5)
-
- config_time = time.time() - start_time
- # str() to remove DBus boxing
- return (str(state), config_time)
-
def GetDebugTags(self):
return self.manager.GetDebugTags()
@@ -357,16 +364,7 @@
raise error
def SetDebugLevel(self, level):
- self.manager.SetDebugLevel(level)
-
- def ConvertSSID(self, ssid_list):
- ssid = ""
- for byte in ssid_list:
- if (str(byte) in string.printable):
- ssid = ssid + str(byte)
- else:
- ssid = ssid + "."
- return ssid
+ self.manager.SetDebugLevel(level)
def GetServiceOrder(self):
return self.manager.GetServiceOrder()
@@ -381,7 +379,7 @@
self.manager.EnableTechnology(tech)
except dbus.exceptions.DBusException, error:
if error.get_dbus_name() not in [
- self.CONNMAN + ".Error.AlreadyEnabled" ]:
+ FlimFlam.SHILL_DBUS_INTERFACE + ".Error.AlreadyEnabled" ]:
raise error
def DisableTechnology(self, tech):
@@ -426,68 +424,63 @@
class DeviceManager(object):
- """Use flimflam to isolate a given interface for testing.
+ """Use flimflam to isolate a given interface for testing.
- DeviceManager can be used to turn off network devices that are not
- under test so that they will not interfere with testing.
+ DeviceManager can be used to turn off network devices that are not
+ under test so that they will not interfere with testing.
- NB: Ethernet devices are special inside Flimflam. You will need to
- take care of them via other means (like, for example, the
- backchannel ethernet code in client autotests)
+ NB: Ethernet devices are special inside Flimflam. You will need to
+ take care of them via other means (like, for example, the
+ backchannel ethernet code in client autotests)
- Sample usage:
+ Sample usage:
- device_manager = flimflam.DeviceManager()
- try:
- device_manager.ShutdownAllExcept('cellular')
- use routing.getRouteFor()
- to verify that only the expected device is used
- do stuff to test cellular connections
- finally:
- device_manager.RestoreDevices()
- """
- def __init__(self, flim=None):
- self.flim_ = flim or FlimFlam()
- self.devices_to_restore_ = []
-
- def _EnableDevice(self, device, enable):
- """Enables/Disables a device in shill/flimflam."""
+ device_manager = flimflam.DeviceManager()
try:
- # Assume we're talking to shill.
- if enable:
- device.Enable()
- else:
- device.Disable()
- except dbus.exceptions.DBusException, error:
- if error.get_dbus_name() in [
- "org.freedesktop.DBus.Error.UnknownMethod"]:
- # Fallback to flimflam.
- device.SetProperty("Powered", enable)
- else:
- raise error
+ device_manager.ShutdownAllExcept('cellular')
+ use routing.getRouteFor()
+ to verify that only the expected device is used
+ do stuff to test cellular connections
+ finally:
+ device_manager.RestoreDevices()
+ """
- def ShutdownAllExcept(self, device_type):
- """Shutdown all devices except device_type ones."""
- for device in self.flim_.GetObjectList('Device'):
- device_properties = device.GetProperties(utf8_strings = True);
- if (device_properties["Type"] != device_type):
- logging.info("Powering off %s device %s",
- device_properties["Type"],
- device.object_path)
- self.devices_to_restore_.append(device.object_path)
- self._EnableDevice(device, False)
+ @staticmethod
+ def _EnableDevice(device, enable):
+ """Enables/Disables a device in shill."""
+ if enable:
+ device.Enable()
+ else:
+ device.Disable()
- def RestoreDevices(self):
- """Restore devices powered down in ShutdownAllExcept."""
- to_raise = None
- for device_path in self.devices_to_restore_:
- try:
- logging.info("Attempting to power on device %s", device_path)
- device = self.flim_.GetObjectInterface("Device", device_path)
- self._EnableDevice(device, True)
- except Exception, e:
- # We want to keep on trying to power things on, so save an
- # exception and continue
- to_raise = e
- if to_raise:
- raise to_raise
+ def __init__(self, flim=None):
+ self.flim_ = flim or FlimFlam()
+ self.devices_to_restore_ = []
+
+ def ShutdownAllExcept(self, device_type):
+ """Shutdown all devices except device_type ones."""
+ for device in self.flim_.GetObjectList('Device'):
+ device_properties = device.GetProperties(utf8_strings = True)
+ if (device_properties["Type"] != device_type):
+ logging.info("Powering off %s device %s",
+ device_properties["Type"],
+ device.object_path)
+ self.devices_to_restore_.append(device.object_path)
+ DeviceManager._EnableDevice(device, False)
+
+ def RestoreDevices(self):
+ """Restore devices powered down in ShutdownAllExcept."""
+ should_raise = False
+ to_raise = Exception("Nothing to raise")
+ for device_path in self.devices_to_restore_:
+ try:
+ logging.info("Attempting to power on device %s", device_path)
+ device = self.flim_.GetObjectInterface("Device", device_path)
+ DeviceManager._EnableDevice(device, True)
+ except Exception, e:
+ # We want to keep on trying to power things on, so save an
+ # exception and continue
+ should_raise = True
+ to_raise = e
+ if should_raise:
+ raise to_raise