| # Copyright 2013 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import psutil |
| import sys |
| import time |
| import urllib.parse |
| import util |
| |
| import command_executor |
| from command_executor import Command |
| from webelement import WebElement |
| from webshadowroot import WebShadowRoot |
| from windowreference import WindowReference |
| from framereference import FrameReference |
| from websocket_connection import WebSocketConnection |
| from exceptions import * |
| |
# Dictionary keys that mark a serialized reference in command payloads. A
# single-entry dict whose only key is one of these (with a string value) is
# converted to the matching wrapper object by ChromeDriver._UnwrapValue, and
# the wrapper objects are converted back to such dicts by _WrapValue.
# Element key used when the session is W3C compliant.
ELEMENT_KEY_W3C = 'element-6066-11e4-a52e-4f735466cecf'
# Element key used when the session is not W3C compliant (legacy protocol).
ELEMENT_KEY = 'ELEMENT'
# Key for WebShadowRoot references.
SHADOW_KEY = 'shadow-6066-11e4-a52e-4f735466cecf'
# Key for FrameReference references.
FRAME_KEY = 'frame-075b-4da1-b6ba-e579c2d3230a'
# Key for WindowReference references.
WINDOW_KEY = 'window-fcc6-11e5-b4f8-330a88ab9d7f'
# Upper bound on ChromeDriver.retry_count: once this many retried session
# creations have failed, ChromeDriver.__init__ stops retrying after a timeout.
MAX_RETRY_COUNT = 5
| |
def _ExceptionForLegacyResponse(response):
  """Builds the exception that corresponds to a legacy error response.

  Args:
    response: A response dict carrying a numeric 'status' and a 'value' dict
      that has a 'message' entry.

  Returns:
    An instance of the exception class registered for the status code, or a
    ChromeDriverException when the code is not in the table. The exception is
    constructed with the response message and is returned, not raised.
  """
  status_to_exception = {
      6: InvalidSessionId,
      7: NoSuchElement,
      8: NoSuchFrame,
      9: UnknownCommand,
      10: StaleElementReference,
      11: ElementNotVisible,
      12: InvalidElementState,
      13: UnknownError,
      17: JavaScriptError,
      19: XPathLookupError,
      21: Timeout,
      23: NoSuchWindow,
      24: InvalidCookieDomain,
      26: UnexpectedAlertOpen,
      27: NoSuchAlert,
      28: ScriptTimeout,
      32: InvalidSelector,
      33: SessionNotCreated,
      60: ElementNotInteractable,
      61: InvalidArgument,
      62: NoSuchCookie,
      405: UnsupportedOperation,
  }
  # 'status' is read before 'message', exactly as before, so a malformed
  # response fails on the same key.
  exception_type = status_to_exception.get(response['status'],
                                           ChromeDriverException)
  return exception_type(response['value']['message'])
| |
def _ExceptionForStandardResponse(response):
  """Builds the exception that corresponds to a standard error response.

  Args:
    response: A response dict whose 'value' entry is a dict with 'error' and
      'message' entries and, optionally, a 'stacktrace' entry.

  Returns:
    An instance of the class registered in EXCEPTION_MAP for the error code,
    or a ChromeDriverException when the code is not registered. The exception
    is returned, not raised. A non-empty stack trace is appended to the
    message.
  """
  error_info = response['value']
  error = error_info['error']
  msg = error_info['message']

  # Read the stack trace leniently: a payload without one must still surface
  # the driver's error instead of failing here with a KeyError.
  stacktrace = error_info.get('stacktrace')
  if stacktrace:
    msg += '\n\nStackTrace:\n\n' + stacktrace

  return EXCEPTION_MAP.get(error, ChromeDriverException)(msg)
| |
class ChromeDriver(object):
  """Starts and controls a single Chrome instance on this machine.

  Public methods are thin wrappers that send one command (see Command) to the
  ChromeDriver server through ExecuteCommand and return the unwrapped 'value'
  of the response. Instances can be used as context managers; leaving the
  `with` block calls Quit().
  """

  # Number of retried session creations that failed; shared by all instances
  # and compared against MAX_RETRY_COUNT in __init__.
  retry_count = 0
  # 'test_name' keyword argument of every construction that was retried.
  retried_tests = []

  def __init__(self, server_url, server_pid, **kwargs):
    """Creates a session, retrying once when the first attempt times out.

    Args:
      server_url: URL of the ChromeDriver server.
      server_pid: PID of the ChromeDriver server process, or a falsy value.
        When set, the server's child processes are killed after a timeout
        and before the retry.
      **kwargs: Forwarded unchanged to _InternalInit.

    Raises:
      Whatever _InternalInit raises. Only exceptions whose text starts with
      'timed out' trigger the cleanup and retry path.
    """
    try:
      self._InternalInit(server_url, **kwargs)
    except Exception as e:
      if not str(e).startswith('timed out'):
        raise

      # Kill ChromeDriver child processes recursively
      # (i.e. browsers and their child processes etc)
      # when there is a timeout for launching browser
      if server_pid:
        ChromeDriver._KillChildProcesses(server_pid)

      if ChromeDriver.retry_count >= MAX_RETRY_COUNT:
        raise

      ChromeDriver.retried_tests.append(kwargs.get('test_name'))
      try:
        self._InternalInit(server_url, **kwargs)
      except BaseException:
        # Only count it as retry if failed
        print('Retry', ChromeDriver.retry_count, 'failed')
        ChromeDriver.retry_count = ChromeDriver.retry_count + 1
        raise

  @staticmethod
  def _KillChildProcesses(server_pid):
    """Terminates, then kills, every descendant of process |server_pid|."""
    processes = psutil.Process(server_pid).children(recursive=True)
    if not processes:
      return
    print('Terminating', len(processes), 'processes')
    for process in processes:
      process.terminate()

    # Give the processes a short grace period before force-killing them.
    _, alive = psutil.wait_procs(processes, timeout=3)
    if alive:
      print('Killing', len(alive), 'processes')
      for process in alive:
        process.kill()

  @staticmethod
  def _DropNone(candidates):
    """Returns a copy of dict |candidates| without the None-valued entries."""
    return {key: val for key, val in candidates.items() if val is not None}

  def _InternalInit(self, server_url,
      chrome_binary=None, android_package=None,
      android_activity=None, android_process=None,
      android_use_running_app=None, chrome_switches=None,
      chrome_extensions=None, chrome_log_path=None,
      debugger_address=None, logging_prefs=None,
      mobile_emulation=None, experimental_options=None,
      download_dir=None, network_connection=None,
      send_w3c_capability=True, send_w3c_request=True,
      page_load_strategy=None, unexpected_alert_behaviour=None,
      devtools_events_to_log=None, accept_insecure_certs=None,
      timeouts=None, test_name=None, web_socket_url=None, browser_name=None,
      http_timeout=None):
    """Builds the capabilities and issues the NEW_SESSION command.

    The keyword arguments are translated into 'goog:chromeOptions' entries
    and top-level capabilities. |send_w3c_request| selects between the
    'capabilities'/'alwaysMatch' envelope and 'desiredCapabilities'. The
    caller's |chrome_switches| list and |experimental_options| dict are not
    modified.

    On success sets _session_id, capabilities, w3c_compliant and, when the
    returned capabilities include one, debuggerAddress.

    Raises:
      UnknownError: If the NEW_SESSION response has neither the W3C nor the
        legacy shape.
    """
    self._executor = command_executor.CommandExecutor(
        server_url, http_timeout=http_timeout)
    self._server_url = server_url
    self.w3c_compliant = False
    self.debuggerAddress = None

    options = {}

    if experimental_options:
      assert isinstance(experimental_options, dict)
      options = experimental_options.copy()

    if android_package:
      options['androidPackage'] = android_package
      if android_activity:
        options['androidActivity'] = android_activity
      if android_process:
        options['androidProcess'] = android_process
      if android_use_running_app:
        options['androidUseRunningApp'] = android_use_running_app
    elif chrome_binary:
      options['binary'] = chrome_binary

    if chrome_switches is None:
      chrome_switches = []
    else:
      assert type(chrome_switches) is list
      # Work on a copy so the switches appended below do not pile up in a
      # list that the caller reuses for several sessions.
      chrome_switches = list(chrome_switches)

    if sys.platform.startswith('linux') and android_package is None:
      # Workaround for crbug.com/611886.
      chrome_switches.append('no-sandbox')
      # https://bugs.chromium.org/p/chromedriver/issues/detail?id=1695
      chrome_switches.append('disable-gpu')

    chrome_switches.append('force-color-profile=srgb')

    # Resampling can change the distance of a synthetic scroll.
    chrome_switches.append('disable-features=ResamplingScrollEvents')

    options['args'] = chrome_switches

    # TODO(crbug.com/40101714): Work around a bug with headless on Mac.
    if (util.GetPlatformName() == 'mac' and
        browser_name == 'chrome-headless-shell' and
        debugger_address is None):
      options['excludeSwitches'] = ['--enable-logging']

    if mobile_emulation:
      assert type(mobile_emulation) is dict
      options['mobileEmulation'] = mobile_emulation

    if chrome_extensions:
      assert type(chrome_extensions) is list
      options['extensions'] = chrome_extensions

    if chrome_log_path:
      assert type(chrome_log_path) is str
      options['logPath'] = chrome_log_path

    if debugger_address:
      assert type(debugger_address) is str
      options['debuggerAddress'] = debugger_address

    if logging_prefs:
      assert type(logging_prefs) is dict
      log_types = ['client', 'driver', 'browser', 'server', 'performance',
                   'devtools']
      log_levels = ['ALL', 'DEBUG', 'INFO', 'WARNING', 'SEVERE', 'OFF']
      for log_type, log_level in logging_prefs.items():
        assert log_type in log_types
        assert log_level in log_levels
    else:
      logging_prefs = {}

    if devtools_events_to_log:
      assert type(devtools_events_to_log) is list
      options['devToolsEventsToLog'] = devtools_events_to_log

    if download_dir:
      # |options| is only a shallow copy of |experimental_options|, so copy
      # the nested dicts before writing into them.
      prefs = dict(options.get('prefs', {}))
      download_prefs = dict(prefs.get('download', {}))
      download_prefs['default_directory'] = download_dir
      prefs['download'] = download_prefs
      options['prefs'] = prefs

    if send_w3c_capability is not None:
      options['w3c'] = send_w3c_capability

    params = {
        'goog:chromeOptions': options,
        'se:options': {
            'loggingPrefs': logging_prefs
        }
    }

    if page_load_strategy:
      assert type(page_load_strategy) is str
      params['pageLoadStrategy'] = page_load_strategy

    if unexpected_alert_behaviour:
      assert type(unexpected_alert_behaviour) is str
      # The capability name differs between the W3C and legacy requests.
      if send_w3c_request:
        params['unhandledPromptBehavior'] = unexpected_alert_behaviour
      else:
        params['unexpectedAlertBehaviour'] = unexpected_alert_behaviour

    if network_connection:
      params['networkConnectionEnabled'] = network_connection

    if accept_insecure_certs is not None:
      params['acceptInsecureCerts'] = accept_insecure_certs

    if timeouts is not None:
      params['timeouts'] = timeouts

    if test_name is not None:
      params['goog:testName'] = test_name

    if web_socket_url is not None:
      params['webSocketUrl'] = web_socket_url

    if browser_name is not None:
      params['browserName'] = browser_name

    if send_w3c_request:
      params = {'capabilities': {'alwaysMatch': params}}
    else:
      params = {'desiredCapabilities': params}

    response = self._ExecuteCommand(Command.NEW_SESSION, params)
    if len(response) == 1 and 'value' in response:
      # W3C shape: everything is nested under the single 'value' key.
      self.w3c_compliant = True
      self._session_id = response['value']['sessionId']
      self.capabilities = self._UnwrapValue(response['value']['capabilities'])
      if ('goog:chromeOptions' in self.capabilities
          and 'debuggerAddress' in self.capabilities['goog:chromeOptions']):
        self.debuggerAddress = str(
            self.capabilities['goog:chromeOptions']['debuggerAddress'])
    elif isinstance(response['status'], int):
      # Legacy shape: top-level 'status', 'sessionId' and 'value'.
      self.w3c_compliant = False
      self._session_id = response['sessionId']
      self.capabilities = self._UnwrapValue(response['value'])
    else:
      raise UnknownError("unexpected response")

  def _KeyToTypeMap(self):
    """Returns (payload key, wrapper class) pairs for the current protocol."""
    return [
        (WINDOW_KEY, WindowReference),
        (FRAME_KEY, FrameReference),
        (SHADOW_KEY, WebShadowRoot),
        (ELEMENT_KEY_W3C if self.w3c_compliant else ELEMENT_KEY, WebElement),
    ]

  def _WrapValue(self, value):
    """Wrap value from client side for chromedriver side.

    Dicts and lists are rebuilt recursively (the input is never modified);
    wrapper objects become single-entry {key: id} dicts; anything else is
    returned as is.
    """
    if isinstance(value, dict):
      return {key: self._WrapValue(val) for key, val in value.items()}
    for key, wrapper_type in self._KeyToTypeMap():
      if isinstance(value, wrapper_type):
        return {key: value._id}
    if isinstance(value, list):
      return [self._WrapValue(item) for item in value]
    return value

  def _UnwrapValue(self, value):
    """Converts a value from chromedriver side into client-side objects.

    A dict with exactly one entry whose key is a known reference key and
    whose value is a string becomes the matching wrapper object. Other dicts
    and lists are rebuilt recursively; anything else is returned as is.
    """
    if isinstance(value, dict):
      # Only single-entry dicts can be references, so skip the key lookup
      # entirely for every other dict.
      if len(value) == 1:
        for key, wrapper_type in self._KeyToTypeMap():
          if key in value and isinstance(value[key], str):
            return wrapper_type(self, value[key])
      return {key: self._UnwrapValue(val) for key, val in value.items()}
    if isinstance(value, list):
      return [self._UnwrapValue(item) for item in value]
    return value

  def _ExecuteCommand(self, command, params=None):
    """Sends |command| with wrapped |params| and returns the raw response.

    Args:
      command: The command to pass to the executor.
      params: Optional dict of parameters; None means no parameters.

    Returns:
      The response dict from the executor.

    Raises:
      The exception built from the response when it reports a non-zero
      legacy 'status' or carries an 'error' entry in its 'value' dict, or
      whatever the executor raises. If the executor's error text starts with
      'timed out', a browser crash is requested before re-raising.
    """
    params = self._WrapValue({} if params is None else params)
    try:
      response = self._executor.Execute(command, params)
    except Exception as e:
      if str(e).startswith('timed out'):
        self._RequestCrash()
      raise

    if 'status' in response and response['status'] != 0:
      raise _ExceptionForLegacyResponse(response)
    if isinstance(response['value'], dict) and 'error' in response['value']:
      raise _ExceptionForStandardResponse(response)
    return response

  def _RequestCrash(self):
    """Asks Chrome to crash via a temporary session, then waits 5 seconds.

    Does nothing when this driver has no session id yet.
    """
    # Can't issue a new command without session_id
    if getattr(self, '_session_id', None) is None:
      return
    temp_driver = ChromeDriver(
        self._server_url, None,
        debugger_address=self.debuggerAddress, test_name='_forceCrash')
    try:
      temp_driver.SendCommandAndGetResult("Page.crash", {})
      # allow time to complete writing the minidump
      time.sleep(5)
    except Exception as e:
      # In some cases, Chrome will not honor the request
      # Print the exception as it may give information on the Chrome state
      # but Page.crash will also generate exception, so filter that out
      message = str(e)
      if 'session deleted because of page crash' not in message:
        print('\n Exception from Page.crash: ' + message + '\n')
    temp_driver.Quit()

  def ExecuteCommand(self, command, params=None):
    """Sends |command| for this session and returns the unwrapped value.

    Args:
      command: The command to execute.
      params: Optional dict of parameters. It is copied before the session
        id is added, so the caller's dict is left untouched.

    Returns:
      The 'value' entry of the response, passed through _UnwrapValue.
    """
    request = {} if params is None else dict(params)
    request['sessionId'] = self._session_id
    response = self._ExecuteCommand(command, request)
    return self._UnwrapValue(response['value'])

  def CreateWebSocketConnection(self):
    """Returns a WebSocketConnection for this session and server URL."""
    return WebSocketConnection(self._server_url, self._session_id)

  def CreateWebSocketConnectionIPv6(self):
    """Like CreateWebSocketConnection, with the host replaced by [::1]."""
    url_components = urllib.parse.urlparse(self._server_url)
    ipv6_netloc = '%s:%d' % ('[::1]', url_components.port)
    new_url = urllib.parse.urlunparse(
        url_components._replace(netloc=ipv6_netloc))
    return WebSocketConnection(new_url, self._session_id)

  def GetWindowHandles(self):
    return self.ExecuteCommand(Command.GET_WINDOW_HANDLES)

  def SwitchToWindow(self, handle_or_name):
    """Switches window; the parameter key depends on the protocol in use."""
    key = 'handle' if self.w3c_compliant else 'name'
    self.ExecuteCommand(Command.SWITCH_TO_WINDOW, {key: handle_or_name})

  def GetCurrentWindowHandle(self):
    return self.ExecuteCommand(Command.GET_CURRENT_WINDOW_HANDLE)

  def CloseWindow(self):
    return self.ExecuteCommand(Command.CLOSE)

  def Load(self, url):
    self.ExecuteCommand(Command.GET, {'url': url})

  def LaunchApp(self, app_id):
    self.ExecuteCommand(Command.LAUNCH_APP, {'id': app_id})

  def ExecuteScript(self, script, *args):
    """Executes |script| with |args| and returns the unwrapped result."""
    return self.ExecuteCommand(
        Command.EXECUTE_SCRIPT, {'script': script, 'args': list(args)})

  def CreateVirtualSensor(self, sensor_type, sensor_params=None):
    """Creates a virtual sensor; |sensor_params| entries are merged in."""
    params = {'type': sensor_type}
    if sensor_params is not None:
      params.update(sensor_params)
    return self.ExecuteCommand(Command.CREATE_VIRTUAL_SENSOR, params)

  def UpdateVirtualSensor(self, sensor_type, reading):
    params = {'type': sensor_type, 'reading': reading}
    return self.ExecuteCommand(Command.UPDATE_VIRTUAL_SENSOR, params)

  def RemoveVirtualSensor(self, sensor_type):
    return self.ExecuteCommand(Command.REMOVE_VIRTUAL_SENSOR,
                               {'type': sensor_type})

  def GetVirtualSensorInformation(self, sensor_type):
    return self.ExecuteCommand(Command.GET_VIRTUAL_SENSOR_INFORMATION,
                               {'type': sensor_type})

  def SetPermission(self, parameters):
    return self.ExecuteCommand(Command.SET_PERMISSION, parameters)

  def ExecuteAsyncScript(self, script, *args):
    """Executes async |script| with |args|; returns the unwrapped result."""
    return self.ExecuteCommand(
        Command.EXECUTE_ASYNC_SCRIPT,
        {'script': script, 'args': list(args)})

  def SwitchToFrame(self, id_or_name):
    """Switches to the given frame.

    In a W3C-compliant session a string argument is first resolved to an
    element, looked up by `id` attribute and then by `name` attribute.

    Raises:
      NoSuchFrame: If neither lookup finds an element.
    """
    if isinstance(id_or_name, str) and self.w3c_compliant:
      try:
        id_or_name = self.FindElement('css selector',
                                      '[id="%s"]' % id_or_name)
      except NoSuchElement:
        try:
          id_or_name = self.FindElement('css selector',
                                        '[name="%s"]' % id_or_name)
        except NoSuchElement:
          raise NoSuchFrame(id_or_name)
    self.ExecuteCommand(Command.SWITCH_TO_FRAME, {'id': id_or_name})

  def SwitchToFrameByIndex(self, index):
    self.SwitchToFrame(index)

  def SwitchToMainFrame(self):
    self.SwitchToFrame(None)

  def SwitchToParentFrame(self):
    self.ExecuteCommand(Command.SWITCH_TO_PARENT_FRAME)

  def GetSessions(self):
    return self.ExecuteCommand(Command.GET_SESSIONS)

  def GetTitle(self):
    return self.ExecuteCommand(Command.GET_TITLE)

  def GetPageSource(self):
    return self.ExecuteCommand(Command.GET_PAGE_SOURCE)

  def FindElement(self, strategy, target):
    return self.ExecuteCommand(
        Command.FIND_ELEMENT, {'using': strategy, 'value': target})

  def FindElements(self, strategy, target):
    return self.ExecuteCommand(
        Command.FIND_ELEMENTS, {'using': strategy, 'value': target})

  def GetTimeouts(self):
    return self.ExecuteCommand(Command.GET_TIMEOUTS)

  def SetTimeouts(self, params):
    """Sets session timeouts after checking them against the HTTP timeout.

    Args:
      params: Dict mapping timeout names to values. An empty dict is a
        no-op.

    Raises:
      ChromeDriverException: If the largest timeout exceeds 50% of the HTTP
        connection timeout.
    """
    if not params:
      return
    largest_name, largest_value = sorted(
        params.items(), key=lambda item: item[1])[-1]
    # make sure that we have ms on the both sides of inequality
    if self._executor.HttpTimeout() * 500 < largest_value:
      raise ChromeDriverException(
          'Timeout "%s" for ChromeDriver exceeds 50%% of the '
          'HTTP connection timeout'
          % largest_name)
    return self.ExecuteCommand(Command.SET_TIMEOUTS, params)

  def GetCurrentUrl(self):
    return self.ExecuteCommand(Command.GET_CURRENT_URL)

  def GoBack(self):
    return self.ExecuteCommand(Command.GO_BACK)

  def GoForward(self):
    return self.ExecuteCommand(Command.GO_FORWARD)

  def Refresh(self):
    return self.ExecuteCommand(Command.REFRESH)

  def MouseMoveTo(self, element=None, x_offset=None, y_offset=None):
    """Moves the mouse; only the arguments that are not None are sent."""
    params = self._DropNone({
        'element': None if element is None else element._id,
        'xoffset': x_offset,
        'yoffset': y_offset,
    })
    self.ExecuteCommand(Command.MOUSE_MOVE_TO, params)

  def MouseClick(self, button=0):
    self.ExecuteCommand(Command.MOUSE_CLICK, {'button': button})

  def MouseButtonDown(self, button=0):
    self.ExecuteCommand(Command.MOUSE_BUTTON_DOWN, {'button': button})

  def MouseButtonUp(self, button=0):
    self.ExecuteCommand(Command.MOUSE_BUTTON_UP, {'button': button})

  def MouseDoubleClick(self, button=0):
    self.ExecuteCommand(Command.MOUSE_DOUBLE_CLICK, {'button': button})

  def TouchDown(self, x, y):
    self.ExecuteCommand(Command.TOUCH_DOWN, {'x': x, 'y': y})

  def TouchUp(self, x, y):
    self.ExecuteCommand(Command.TOUCH_UP, {'x': x, 'y': y})

  def TouchMove(self, x, y):
    self.ExecuteCommand(Command.TOUCH_MOVE, {'x': x, 'y': y})

  def TouchScroll(self, element, xoffset, yoffset):
    params = {'element': element._id, 'xoffset': xoffset, 'yoffset': yoffset}
    self.ExecuteCommand(Command.TOUCH_SCROLL, params)

  def TouchFlick(self, element, xoffset, yoffset, speed):
    params = {
        'element': element._id,
        'xoffset': xoffset,
        'yoffset': yoffset,
        'speed': speed
    }
    self.ExecuteCommand(Command.TOUCH_FLICK, params)

  def PerformActions(self, actions):
    """
    actions: a dictionary containing the specified actions users wish to perform
    """
    self.ExecuteCommand(Command.PERFORM_ACTIONS, actions)

  def ReleaseActions(self):
    self.ExecuteCommand(Command.RELEASE_ACTIONS)

  def GetCookies(self):
    return self.ExecuteCommand(Command.GET_COOKIES)

  def GetNamedCookie(self, name):
    return self.ExecuteCommand(Command.GET_NAMED_COOKIE, {'name': name})

  def AddCookie(self, cookie):
    self.ExecuteCommand(Command.ADD_COOKIE, {'cookie': cookie})

  def DeleteCookie(self, name):
    self.ExecuteCommand(Command.DELETE_COOKIE, {'name': name})

  def DeleteAllCookies(self):
    self.ExecuteCommand(Command.DELETE_ALL_COOKIES)

  def IsAlertOpen(self):
    return self.ExecuteCommand(Command.GET_ALERT)

  def GetAlertMessage(self):
    return self.ExecuteCommand(Command.GET_ALERT_TEXT)

  def HandleAlert(self, accept, prompt_text=''):
    """Optionally types |prompt_text|, then accepts or dismisses the alert."""
    if prompt_text:
      self.ExecuteCommand(Command.SET_ALERT_VALUE, {'text': prompt_text})
    self.ExecuteCommand(
        Command.ACCEPT_ALERT if accept else Command.DISMISS_ALERT)

  def IsLoading(self):
    return self.ExecuteCommand(Command.IS_LOADING)

  def GetWindowPosition(self):
    """Returns the current window position as [x, y]."""
    position = self.ExecuteCommand(Command.GET_WINDOW_POSITION,
                                   {'windowHandle': 'current'})
    return [position['x'], position['y']]

  def SetWindowPosition(self, x, y):
    self.ExecuteCommand(Command.SET_WINDOW_POSITION,
                        {'windowHandle': 'current', 'x': x, 'y': y})

  def GetWindowSize(self):
    """Returns the current window size as [width, height]."""
    size = self.ExecuteCommand(Command.GET_WINDOW_SIZE,
                               {'windowHandle': 'current'})
    return [size['width'], size['height']]

  def NewWindow(self, window_type="window"):
    return self.ExecuteCommand(Command.NEW_WINDOW,
                               {'type': window_type})

  def GetWindowRect(self):
    """Returns the window rect as [width, height, x, y]."""
    rect = self.ExecuteCommand(Command.GET_WINDOW_RECT)
    return [rect['width'], rect['height'], rect['x'], rect['y']]

  def SetWindowSize(self, width, height):
    return self.ExecuteCommand(
        Command.SET_WINDOW_SIZE,
        {'windowHandle': 'current', 'width': width, 'height': height})

  def SetWindowRect(self, width, height, x, y):
    return self.ExecuteCommand(
        Command.SET_WINDOW_RECT,
        {'width': width, 'height': height, 'x': x, 'y': y})

  def MaximizeWindow(self):
    return self.ExecuteCommand(Command.MAXIMIZE_WINDOW,
                               {'windowHandle': 'current'})

  def MinimizeWindow(self):
    return self.ExecuteCommand(Command.MINIMIZE_WINDOW,
                               {'windowHandle': 'current'})

  def FullScreenWindow(self):
    return self.ExecuteCommand(Command.FULLSCREEN_WINDOW)

  def SetDevicePosture(self, posture):
    return self.ExecuteCommand(Command.SET_DEVICE_POSTURE, {'posture': posture})

  def ClearDevicePosture(self):
    return self.ExecuteCommand(Command.CLEAR_DEVICE_POSTURE)

  def SetDisplayFeatures(self, features):
    return self.ExecuteCommand(Command.SET_DISPLAY_FEATURES,
                               {'features': features})

  def ClearDisplayFeatures(self):
    return self.ExecuteCommand(Command.CLEAR_DISPLAY_FEATURES)

  def TakeScreenshot(self):
    return self.ExecuteCommand(Command.SCREENSHOT)

  def TakeFullPageScreenshot(self):
    return self.ExecuteCommand(Command.FULL_PAGE_SCREENSHOT)

  def PrintPDF(self, params=None):
    """Sends the PRINT command; |params| of None means no parameters."""
    return self.ExecuteCommand(Command.PRINT, params)

  def Quit(self):
    """Quits the browser and ends the session."""
    self.ExecuteCommand(Command.QUIT)

  def GetLog(self, type):
    return self.ExecuteCommand(Command.GET_LOG, {'type': type})

  def GetAvailableLogTypes(self):
    return self.ExecuteCommand(Command.GET_AVAILABLE_LOG_TYPES)

  def SetNetworkConditions(self, latency, download_throughput,
                           upload_throughput, offline=False):
    """Sets network emulation; |offline| is forwarded exactly as given."""
    # Until http://crbug.com/456324 is resolved, callers should leave
    # 'offline' at its default of False, as going "offline" will sever
    # Chromedriver's connection to Chrome.
    params = {
        'network_conditions': {
            'offline': offline,
            'latency': latency,
            'download_throughput': download_throughput,
            'upload_throughput': upload_throughput
        }
    }
    self.ExecuteCommand(Command.SET_NETWORK_CONDITIONS, params)

  def SetNetworkConditionsName(self, network_name):
    self.ExecuteCommand(
        Command.SET_NETWORK_CONDITIONS, {'network_name': network_name})

  def GetNetworkConditions(self):
    """Returns the latency, throughput and offline entries of the result."""
    conditions = self.ExecuteCommand(Command.GET_NETWORK_CONDITIONS)
    wanted = ('latency', 'download_throughput', 'upload_throughput',
              'offline')
    return {name: conditions[name] for name in wanted}

  def GetNetworkConnection(self):
    return self.ExecuteCommand(Command.GET_NETWORK_CONNECTION)

  def DeleteNetworkConditions(self):
    self.ExecuteCommand(Command.DELETE_NETWORK_CONDITIONS)

  def SetNetworkConnection(self, connection_type):
    params = {'parameters': {'type': connection_type}}
    return self.ExecuteCommand(Command.SET_NETWORK_CONNECTION, params)

  def SendCommandAndGetResult(self, cmd, cmd_params):
    params = {'cmd': cmd, 'params': cmd_params}
    return self.ExecuteCommand(Command.SEND_COMMAND_AND_GET_RESULT, params)

  def SendKeys(self, *values):
    """Sends |values| to the active element, one item per keystroke.

    Each value is split into its items (characters, for strings); ints are
    converted to their decimal string first.
    """
    typing = []
    for value in values:
      if isinstance(value, int):
        value = str(value)
      typing.extend(value)
    self.ExecuteCommand(Command.SEND_KEYS_TO_ACTIVE_ELEMENT, {'value': typing})

  def GenerateTestReport(self, message):
    self.ExecuteCommand(Command.GENERATE_TEST_REPORT, {'message': message})

  def SetTimeZone(self, timeZone):
    return self.ExecuteCommand(Command.SET_TIME_ZONE, {'time_zone': timeZone})

  def AddVirtualAuthenticator(self, protocol=None, transport=None,
                              hasResidentKey=None, hasUserVerification=None,
                              isUserConsenting=None, isUserVerified=None,
                              extensions=None, defaultBackupState=None,
                              defaultBackupEligibility=None):
    """Adds a virtual authenticator; arguments left as None are not sent."""
    options = self._DropNone({
        'protocol': protocol,
        'transport': transport,
        'hasResidentKey': hasResidentKey,
        'hasUserVerification': hasUserVerification,
        'isUserConsenting': isUserConsenting,
        'isUserVerified': isUserVerified,
        'extensions': extensions,
        'defaultBackupState': defaultBackupState,
        'defaultBackupEligibility': defaultBackupEligibility,
    })
    return self.ExecuteCommand(Command.ADD_VIRTUAL_AUTHENTICATOR, options)

  def RemoveVirtualAuthenticator(self, authenticatorId):
    params = {'authenticatorId': authenticatorId}
    return self.ExecuteCommand(Command.REMOVE_VIRTUAL_AUTHENTICATOR, params)

  def AddCredential(self, authenticatorId=None, credentialId=None,
                    isResidentCredential=None, rpId=None, privateKey=None,
                    userHandle=None, signCount=None, largeBlob=None,
                    backupState=None, backupEligibility=None, userName=None,
                    userDisplayName=None):
    """Adds a credential; arguments left as None are not sent."""
    options = self._DropNone({
        'authenticatorId': authenticatorId,
        'credentialId': credentialId,
        'isResidentCredential': isResidentCredential,
        'rpId': rpId,
        'privateKey': privateKey,
        'userHandle': userHandle,
        'signCount': signCount,
        'largeBlob': largeBlob,
        'backupState': backupState,
        'backupEligibility': backupEligibility,
        'userName': userName,
        'userDisplayName': userDisplayName,
    })
    return self.ExecuteCommand(Command.ADD_CREDENTIAL, options)

  def GetCredentials(self, authenticatorId):
    params = {'authenticatorId': authenticatorId}
    return self.ExecuteCommand(Command.GET_CREDENTIALS, params)

  def RemoveCredential(self, authenticatorId, credentialId):
    params = {'authenticatorId': authenticatorId,
              'credentialId': credentialId}
    return self.ExecuteCommand(Command.REMOVE_CREDENTIAL, params)

  def RemoveAllCredentials(self, authenticatorId):
    params = {'authenticatorId': authenticatorId}
    return self.ExecuteCommand(Command.REMOVE_ALL_CREDENTIALS, params)

  def SetUserVerified(self, authenticatorId, isUserVerified):
    params = {'authenticatorId': authenticatorId,
              'isUserVerified': isUserVerified}
    return self.ExecuteCommand(Command.SET_USER_VERIFIED, params)

  def SetCredentialProperties(self, authenticatorId, credentialId,
                              backupState=None, backupEligibility=None):
    """Updates a credential; backup arguments left as None are not sent."""
    params = {'authenticatorId': authenticatorId, 'credentialId': credentialId}
    if backupState is not None:
      params['backupState'] = backupState
    if backupEligibility is not None:
      params['backupEligibility'] = backupEligibility
    return self.ExecuteCommand(Command.SET_CREDENTIAL_PROPERTIES, params)

  def SetSPCTransactionMode(self, mode):
    return self.ExecuteCommand(Command.SET_SPC_TRANSACTION_MODE,
                               {'mode': mode})

  def SetRPHRegistrationMode(self, mode):
    return self.ExecuteCommand(Command.SET_RPH_REGISTRATION_MODE,
                               {'mode': mode})

  def CancelFedCmDialog(self):
    return self.ExecuteCommand(Command.CANCEL_FEDCM_DIALOG, {})

  def SelectAccount(self, index):
    return self.ExecuteCommand(Command.SELECT_ACCOUNT,
                               {'accountIndex': index})

  def ClickFedCmDialogButton(self, dialogButton, index=None):
    params = {'dialogButton': dialogButton}
    if index is not None:
      params['index'] = index
    return self.ExecuteCommand(Command.CLICK_FEDCM_DIALOG_BUTTON, params)

  def GetAccounts(self):
    return self.ExecuteCommand(Command.GET_ACCOUNTS, {})

  def GetFedCmTitle(self):
    return self.ExecuteCommand(Command.GET_FEDCM_TITLE, {})

  def GetDialogType(self):
    return self.ExecuteCommand(Command.GET_DIALOG_TYPE, {})

  def SetDelayEnabled(self, enabled):
    return self.ExecuteCommand(Command.SET_DELAY_ENABLED,
                               {'enabled': enabled})

  def ResetCooldown(self):
    return self.ExecuteCommand(Command.RESET_COOLDOWN, {})

  def RunBounceTrackingMitigations(self):
    return self.ExecuteCommand(Command.RUN_BOUNCE_TRACKING_MITIGATIONS, {})

  def GetSessionId(self):
    """Returns the session id, or None if no session was created."""
    return getattr(self, '_session_id', None)

  def GetCastSinks(self, vendorId):
    return self.ExecuteCommand(Command.GET_CAST_SINKS,
                               {'vendorId': vendorId})

  def CreateVirtualPressureSource(self, type, metadata=None):
    """Creates a pressure source; |metadata| entries are merged in."""
    params = {'type': type}
    if metadata is not None:
      params.update(metadata)
    return self.ExecuteCommand(Command.CREATE_VIRTUAL_PRESSURE_SOURCE, params)

  def UpdateVirtualPressureSource(self, type, sample,
                                  own_contribution_estimate):
    params = {'type': type,
              'sample': sample,
              'own_contribution_estimate': own_contribution_estimate}
    return self.ExecuteCommand(Command.UPDATE_VIRTUAL_PRESSURE_SOURCE,
                               params)

  def RemoveVirtualPressureSource(self, type):
    return self.ExecuteCommand(Command.REMOVE_VIRTUAL_PRESSURE_SOURCE,
                               {'type': type})

  def SetProtectedAudienceKAnonymity(self, owner, name, hashes):
    params = {'owner': owner, 'name': name, 'hashes': hashes}
    return self.ExecuteCommand(Command.SET_PROTECTED_AUDIENCE_KANONYMITY,
                               params)

  def __enter__(self):
    return self

  def __exit__(self, *args):
    # Always end the session when leaving the `with` block.
    self.Quit()