| # Copyright 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import platform |
| import sys |
| import util |
| |
| import command_executor |
| from command_executor import Command |
| from webelement import WebElement |
| |
| ELEMENT_KEY_W3C = "element-6066-11e4-a52e-4f735466cecf" |
| ELEMENT_KEY = "ELEMENT" |
| MAX_RETRY_COUNT = 3 |
| |
| class ChromeDriverException(Exception): |
| pass |
| class NoSuchElement(ChromeDriverException): |
| pass |
| class NoSuchFrame(ChromeDriverException): |
| pass |
| class UnknownCommand(ChromeDriverException): |
| pass |
| class StaleElementReference(ChromeDriverException): |
| pass |
| class ElementNotVisible(ChromeDriverException): |
| pass |
| class InvalidElementState(ChromeDriverException): |
| pass |
| class UnknownError(ChromeDriverException): |
| pass |
| class JavaScriptError(ChromeDriverException): |
| pass |
| class XPathLookupError(ChromeDriverException): |
| pass |
| class Timeout(ChromeDriverException): |
| pass |
| class NoSuchWindow(ChromeDriverException): |
| pass |
| class InvalidCookieDomain(ChromeDriverException): |
| pass |
| class ScriptTimeout(ChromeDriverException): |
| pass |
| class InvalidSelector(ChromeDriverException): |
| pass |
| class SessionNotCreated(ChromeDriverException): |
| pass |
| class InvalidSessionId(ChromeDriverException): |
| pass |
| class UnexpectedAlertOpen(ChromeDriverException): |
| pass |
| class NoSuchAlert(ChromeDriverException): |
| pass |
| class NoSuchCookie(ChromeDriverException): |
| pass |
| class InvalidArgument(ChromeDriverException): |
| pass |
| class ElementNotInteractable(ChromeDriverException): |
| pass |
| class UnsupportedOperation(ChromeDriverException): |
| pass |
| |
| def _ExceptionForLegacyResponse(response): |
| exception_class_map = { |
| 6: InvalidSessionId, |
| 7: NoSuchElement, |
| 8: NoSuchFrame, |
| 9: UnknownCommand, |
| 10: StaleElementReference, |
| 11: ElementNotVisible, |
| 12: InvalidElementState, |
| 13: UnknownError, |
| 17: JavaScriptError, |
| 19: XPathLookupError, |
| 21: Timeout, |
| 23: NoSuchWindow, |
| 24: InvalidCookieDomain, |
| 26: UnexpectedAlertOpen, |
| 27: NoSuchAlert, |
| 28: ScriptTimeout, |
| 32: InvalidSelector, |
| 33: SessionNotCreated, |
| 60: ElementNotInteractable, |
| 61: InvalidArgument, |
| 62: NoSuchCookie, |
| 405: UnsupportedOperation |
| } |
| status = response['status'] |
| msg = response['value']['message'] |
| return exception_class_map.get(status, ChromeDriverException)(msg) |
| |
| def _ExceptionForStandardResponse(response): |
| exception_map = { |
| 'invalid session id' : InvalidSessionId, |
| 'no such element': NoSuchElement, |
| 'no such frame': NoSuchFrame, |
| 'unknown command': UnknownCommand, |
| 'stale element reference': StaleElementReference, |
| 'element not interactable': ElementNotVisible, |
| 'invalid element state': InvalidElementState, |
| 'unknown error': UnknownError, |
| 'javascript error': JavaScriptError, |
| 'invalid selector': XPathLookupError, |
| 'timeout': Timeout, |
| 'no such window': NoSuchWindow, |
| 'invalid cookie domain': InvalidCookieDomain, |
| 'unexpected alert open': UnexpectedAlertOpen, |
| 'no such alert': NoSuchAlert, |
| 'script timeout': ScriptTimeout, |
| 'invalid selector': InvalidSelector, |
| 'session not created': SessionNotCreated, |
| 'no such cookie': NoSuchCookie, |
| 'invalid argument': InvalidArgument, |
| 'element not interactable': ElementNotInteractable, |
| 'unsupported operation': UnsupportedOperation, |
| } |
| |
| error = response['value']['error'] |
| msg = response['value']['message'] |
| return exception_map.get(error, ChromeDriverException)(msg) |
| |
| class ChromeDriver(object): |
| """Starts and controls a single Chrome instance on this machine.""" |
| |
| retry_count = 0 |
| retried_tests = [] |
| |
| def __init__(self, *args, **kwargs): |
| try: |
| self._InternalInit(*args, **kwargs) |
| except Exception as e: |
| if not e.message.startswith('timed out'): |
| raise |
| else: |
| if ChromeDriver.retry_count < MAX_RETRY_COUNT: |
| ChromeDriver.retry_count = ChromeDriver.retry_count + 1 |
| ChromeDriver.retried_tests.append(kwargs.get('test_name')) |
| self._InternalInit(*args, **kwargs) |
| else: |
| raise |
| |
| def _InternalInit(self, server_url, chrome_binary=None, android_package=None, |
| android_activity=None, android_process=None, |
| android_use_running_app=None, chrome_switches=None, |
| chrome_extensions=None, chrome_log_path=None, |
| debugger_address=None, logging_prefs=None, |
| mobile_emulation=None, experimental_options=None, |
| download_dir=None, network_connection=None, |
| send_w3c_capability=True, send_w3c_request=True, |
| page_load_strategy=None, unexpected_alert_behaviour=None, |
| devtools_events_to_log=None, accept_insecure_certs=None, |
| timeouts=None, test_name=None): |
| self._executor = command_executor.CommandExecutor(server_url) |
| self.w3c_compliant = False |
| |
| options = {} |
| |
| if experimental_options: |
| assert isinstance(experimental_options, dict) |
| options = experimental_options.copy() |
| |
| if android_package: |
| options['androidPackage'] = android_package |
| if android_activity: |
| options['androidActivity'] = android_activity |
| if android_process: |
| options['androidProcess'] = android_process |
| if android_use_running_app: |
| options['androidUseRunningApp'] = android_use_running_app |
| elif chrome_binary: |
| options['binary'] = chrome_binary |
| |
| if sys.platform.startswith('linux') and android_package is None: |
| if chrome_switches is None: |
| chrome_switches = [] |
| # Workaround for crbug.com/611886. |
| chrome_switches.append('no-sandbox') |
| # https://bugs.chromium.org/p/chromedriver/issues/detail?id=1695 |
| chrome_switches.append('disable-gpu') |
| |
| if chrome_switches is None: |
| chrome_switches = [] |
| chrome_switches.append('force-color-profile=srgb') |
| |
| if chrome_switches: |
| assert type(chrome_switches) is list |
| options['args'] = chrome_switches |
| |
| if mobile_emulation: |
| assert type(mobile_emulation) is dict |
| options['mobileEmulation'] = mobile_emulation |
| |
| if chrome_extensions: |
| assert type(chrome_extensions) is list |
| options['extensions'] = chrome_extensions |
| |
| if chrome_log_path: |
| assert type(chrome_log_path) is str |
| options['logPath'] = chrome_log_path |
| |
| if debugger_address: |
| assert type(debugger_address) is str |
| options['debuggerAddress'] = debugger_address |
| |
| if logging_prefs: |
| assert type(logging_prefs) is dict |
| log_types = ['client', 'driver', 'browser', 'server', 'performance', |
| 'devtools'] |
| log_levels = ['ALL', 'DEBUG', 'INFO', 'WARNING', 'SEVERE', 'OFF'] |
| for log_type, log_level in logging_prefs.iteritems(): |
| assert log_type in log_types |
| assert log_level in log_levels |
| else: |
| logging_prefs = {} |
| |
| if devtools_events_to_log: |
| assert type(devtools_events_to_log) is list |
| options['devToolsEventsToLog'] = devtools_events_to_log |
| |
| download_prefs = {} |
| if download_dir: |
| if 'prefs' not in options: |
| options['prefs'] = {} |
| if 'download' not in options['prefs']: |
| options['prefs']['download'] = {} |
| options['prefs']['download']['default_directory'] = download_dir |
| |
| if send_w3c_capability is not None: |
| options['w3c'] = send_w3c_capability |
| |
| params = { |
| 'goog:chromeOptions': options, |
| 'goog:loggingPrefs': logging_prefs |
| } |
| |
| if page_load_strategy: |
| assert type(page_load_strategy) is str |
| params['pageLoadStrategy'] = page_load_strategy |
| |
| if unexpected_alert_behaviour: |
| assert type(unexpected_alert_behaviour) is str |
| if send_w3c_request: |
| params['unhandledPromptBehavior'] = unexpected_alert_behaviour |
| else: |
| params['unexpectedAlertBehaviour'] = unexpected_alert_behaviour |
| |
| if network_connection: |
| params['networkConnectionEnabled'] = network_connection |
| |
| if accept_insecure_certs is not None: |
| params['acceptInsecureCerts'] = accept_insecure_certs |
| |
| if timeouts is not None: |
| params['timeouts'] = timeouts |
| |
| if test_name is not None: |
| params['goog:testName'] = test_name |
| |
| if send_w3c_request: |
| params = {'capabilities': {'alwaysMatch': params}} |
| else: |
| params = {'desiredCapabilities': params} |
| |
| response = self._ExecuteCommand(Command.NEW_SESSION, params) |
| if len(response.keys()) == 1 and 'value' in response.keys(): |
| self.w3c_compliant = True |
| self._session_id = response['value']['sessionId'] |
| self.capabilities = self._UnwrapValue(response['value']['capabilities']) |
| elif isinstance(response['status'], int): |
| self.w3c_compliant = False |
| self._session_id = response['sessionId'] |
| self.capabilities = self._UnwrapValue(response['value']) |
| else: |
| raise UnknownError("unexpected response") |
| |
| def _WrapValue(self, value): |
| """Wrap value from client side for chromedriver side.""" |
| if isinstance(value, dict): |
| converted = {} |
| for key, val in value.items(): |
| converted[key] = self._WrapValue(val) |
| return converted |
| elif isinstance(value, WebElement): |
| if (self.w3c_compliant): |
| return {ELEMENT_KEY_W3C: value._id} |
| else: |
| return {ELEMENT_KEY: value._id} |
| elif isinstance(value, list): |
| return list(self._WrapValue(item) for item in value) |
| else: |
| return value |
| |
| def _UnwrapValue(self, value): |
| if isinstance(value, dict): |
| if (self.w3c_compliant and len(value) == 1 |
| and ELEMENT_KEY_W3C in value |
| and isinstance( |
| value[ELEMENT_KEY_W3C], basestring)): |
| return WebElement(self, value[ELEMENT_KEY_W3C]) |
| elif (len(value) == 1 and ELEMENT_KEY in value |
| and isinstance(value[ELEMENT_KEY], basestring)): |
| return WebElement(self, value[ELEMENT_KEY]) |
| else: |
| unwraped = {} |
| for key, val in value.items(): |
| unwraped[key] = self._UnwrapValue(val) |
| return unwraped |
| elif isinstance(value, list): |
| return list(self._UnwrapValue(item) for item in value) |
| else: |
| return value |
| |
| def _ExecuteCommand(self, command, params={}): |
| params = self._WrapValue(params) |
| response = self._executor.Execute(command, params) |
| if ('status' in response |
| and response['status'] != 0): |
| raise _ExceptionForLegacyResponse(response) |
| elif (type(response['value']) is dict |
| and 'error' in response['value']): |
| raise _ExceptionForStandardResponse(response) |
| return response |
| |
| def ExecuteCommand(self, command, params={}): |
| params['sessionId'] = self._session_id |
| response = self._ExecuteCommand(command, params) |
| return self._UnwrapValue(response['value']) |
| |
| def GetWindowHandles(self): |
| return self.ExecuteCommand(Command.GET_WINDOW_HANDLES) |
| |
| def SwitchToWindow(self, handle_or_name): |
| if self.w3c_compliant: |
| self.ExecuteCommand(Command.SWITCH_TO_WINDOW, {'handle': handle_or_name}) |
| else: |
| self.ExecuteCommand(Command.SWITCH_TO_WINDOW, {'name': handle_or_name}) |
| |
| def GetCurrentWindowHandle(self): |
| return self.ExecuteCommand(Command.GET_CURRENT_WINDOW_HANDLE) |
| |
| def CloseWindow(self): |
| return self.ExecuteCommand(Command.CLOSE) |
| |
| def Load(self, url): |
| self.ExecuteCommand(Command.GET, {'url': url}) |
| |
| def LaunchApp(self, app_id): |
| self.ExecuteCommand(Command.LAUNCH_APP, {'id': app_id}) |
| |
| def ExecuteScript(self, script, *args): |
| converted_args = list(args) |
| return self.ExecuteCommand( |
| Command.EXECUTE_SCRIPT, {'script': script, 'args': converted_args}) |
| |
| def ExecuteAsyncScript(self, script, *args): |
| converted_args = list(args) |
| return self.ExecuteCommand( |
| Command.EXECUTE_ASYNC_SCRIPT, |
| {'script': script, 'args': converted_args}) |
| |
| def SwitchToFrame(self, id_or_name): |
| if isinstance(id_or_name, basestring) and self.w3c_compliant: |
| try: |
| id_or_name = self.FindElement('css selector', |
| '[id="%s"]' % id_or_name) |
| except NoSuchElement: |
| try: |
| id_or_name = self.FindElement('css selector', |
| '[name="%s"]' % id_or_name) |
| except NoSuchElement: |
| raise NoSuchFrame(id_or_name) |
| self.ExecuteCommand(Command.SWITCH_TO_FRAME, {'id': id_or_name}) |
| |
| def SwitchToFrameByIndex(self, index): |
| self.SwitchToFrame(index) |
| |
| def SwitchToMainFrame(self): |
| self.SwitchToFrame(None) |
| |
| def SwitchToParentFrame(self): |
| self.ExecuteCommand(Command.SWITCH_TO_PARENT_FRAME) |
| |
| def GetSessions(self): |
| return self.ExecuteCommand(Command.GET_SESSIONS) |
| |
| def GetTitle(self): |
| return self.ExecuteCommand(Command.GET_TITLE) |
| |
| def GetPageSource(self): |
| return self.ExecuteCommand(Command.GET_PAGE_SOURCE) |
| |
| def FindElement(self, strategy, target): |
| return self.ExecuteCommand( |
| Command.FIND_ELEMENT, {'using': strategy, 'value': target}) |
| |
| def FindElements(self, strategy, target): |
| return self.ExecuteCommand( |
| Command.FIND_ELEMENTS, {'using': strategy, 'value': target}) |
| |
| def GetTimeouts(self): |
| return self.ExecuteCommand(Command.GET_TIMEOUTS) |
| |
| def SetTimeouts(self, params): |
| return self.ExecuteCommand(Command.SET_TIMEOUTS, params) |
| |
| def GetCurrentUrl(self): |
| return self.ExecuteCommand(Command.GET_CURRENT_URL) |
| |
| def GoBack(self): |
| return self.ExecuteCommand(Command.GO_BACK) |
| |
| def GoForward(self): |
| return self.ExecuteCommand(Command.GO_FORWARD) |
| |
| def Refresh(self): |
| return self.ExecuteCommand(Command.REFRESH) |
| |
| def MouseMoveTo(self, element=None, x_offset=None, y_offset=None): |
| params = {} |
| if element is not None: |
| params['element'] = element._id |
| if x_offset is not None: |
| params['xoffset'] = x_offset |
| if y_offset is not None: |
| params['yoffset'] = y_offset |
| self.ExecuteCommand(Command.MOUSE_MOVE_TO, params) |
| |
| def MouseClick(self, button=0): |
| self.ExecuteCommand(Command.MOUSE_CLICK, {'button': button}) |
| |
| def MouseButtonDown(self, button=0): |
| self.ExecuteCommand(Command.MOUSE_BUTTON_DOWN, {'button': button}) |
| |
| def MouseButtonUp(self, button=0): |
| self.ExecuteCommand(Command.MOUSE_BUTTON_UP, {'button': button}) |
| |
| def MouseDoubleClick(self, button=0): |
| self.ExecuteCommand(Command.MOUSE_DOUBLE_CLICK, {'button': button}) |
| |
| def TouchDown(self, x, y): |
| self.ExecuteCommand(Command.TOUCH_DOWN, {'x': x, 'y': y}) |
| |
| def TouchUp(self, x, y): |
| self.ExecuteCommand(Command.TOUCH_UP, {'x': x, 'y': y}) |
| |
| def TouchMove(self, x, y): |
| self.ExecuteCommand(Command.TOUCH_MOVE, {'x': x, 'y': y}) |
| |
| def TouchScroll(self, element, xoffset, yoffset): |
| params = {'element': element._id, 'xoffset': xoffset, 'yoffset': yoffset} |
| self.ExecuteCommand(Command.TOUCH_SCROLL, params) |
| |
| def TouchFlick(self, element, xoffset, yoffset, speed): |
| params = { |
| 'element': element._id, |
| 'xoffset': xoffset, |
| 'yoffset': yoffset, |
| 'speed': speed |
| } |
| self.ExecuteCommand(Command.TOUCH_FLICK, params) |
| |
| def TouchPinch(self, x, y, scale): |
| params = {'x': x, 'y': y, 'scale': scale} |
| self.ExecuteCommand(Command.TOUCH_PINCH, params) |
| |
| def PerformActions(self, actions): |
| """ |
| actions: a dictionary containing the specified actions users wish to perform |
| """ |
| self.ExecuteCommand(Command.PERFORM_ACTIONS, actions) |
| |
| def ReleaseActions(self): |
| self.ExecuteCommand(Command.RELEASE_ACTIONS) |
| |
| def GetCookies(self): |
| return self.ExecuteCommand(Command.GET_COOKIES) |
| |
| def GetNamedCookie(self, name): |
| return self.ExecuteCommand(Command.GET_NAMED_COOKIE, {'name': name}) |
| |
| def AddCookie(self, cookie): |
| self.ExecuteCommand(Command.ADD_COOKIE, {'cookie': cookie}) |
| |
| def DeleteCookie(self, name): |
| self.ExecuteCommand(Command.DELETE_COOKIE, {'name': name}) |
| |
| def DeleteAllCookies(self): |
| self.ExecuteCommand(Command.DELETE_ALL_COOKIES) |
| |
| def IsAlertOpen(self): |
| return self.ExecuteCommand(Command.GET_ALERT) |
| |
| def GetAlertMessage(self): |
| return self.ExecuteCommand(Command.GET_ALERT_TEXT) |
| |
| def HandleAlert(self, accept, prompt_text=''): |
| if prompt_text: |
| self.ExecuteCommand(Command.SET_ALERT_VALUE, {'text': prompt_text}) |
| if accept: |
| cmd = Command.ACCEPT_ALERT |
| else: |
| cmd = Command.DISMISS_ALERT |
| self.ExecuteCommand(cmd) |
| |
| def IsLoading(self): |
| return self.ExecuteCommand(Command.IS_LOADING) |
| |
| def GetWindowPosition(self): |
| position = self.ExecuteCommand(Command.GET_WINDOW_POSITION, |
| {'windowHandle': 'current'}) |
| return [position['x'], position['y']] |
| |
| def SetWindowPosition(self, x, y): |
| self.ExecuteCommand(Command.SET_WINDOW_POSITION, |
| {'windowHandle': 'current', 'x': x, 'y': y}) |
| |
| def GetWindowSize(self): |
| size = self.ExecuteCommand(Command.GET_WINDOW_SIZE, |
| {'windowHandle': 'current'}) |
| return [size['width'], size['height']] |
| |
| def GetWindowRect(self): |
| rect = self.ExecuteCommand(Command.GET_WINDOW_RECT) |
| return [rect['width'], rect['height'], rect['x'], rect['y']] |
| |
| def SetWindowSize(self, width, height): |
| return self.ExecuteCommand( |
| Command.SET_WINDOW_SIZE, |
| {'windowHandle': 'current', 'width': width, 'height': height}) |
| |
| def SetWindowRect(self, width, height, x, y): |
| return self.ExecuteCommand( |
| Command.SET_WINDOW_RECT, |
| {'width': width, 'height': height, 'x': x, 'y': y}) |
| |
| def MaximizeWindow(self): |
| return self.ExecuteCommand(Command.MAXIMIZE_WINDOW, |
| {'windowHandle': 'current'}) |
| |
| def MinimizeWindow(self): |
| return self.ExecuteCommand(Command.MINIMIZE_WINDOW, |
| {'windowHandle': 'current'}) |
| |
| def FullScreenWindow(self): |
| return self.ExecuteCommand(Command.FULLSCREEN_WINDOW) |
| |
| def TakeScreenshot(self): |
| return self.ExecuteCommand(Command.SCREENSHOT) |
| |
| def Quit(self): |
| """Quits the browser and ends the session.""" |
| self.ExecuteCommand(Command.QUIT) |
| |
| def GetLog(self, type): |
| return self.ExecuteCommand(Command.GET_LOG, {'type': type}) |
| |
| def GetAvailableLogTypes(self): |
| return self.ExecuteCommand(Command.GET_AVAILABLE_LOG_TYPES) |
| |
| def IsAutoReporting(self): |
| return self.ExecuteCommand(Command.IS_AUTO_REPORTING) |
| |
| def SetAutoReporting(self, enabled): |
| self.ExecuteCommand(Command.SET_AUTO_REPORTING, {'enabled': enabled}) |
| |
| def SetNetworkConditions(self, latency, download_throughput, |
| upload_throughput, offline=False): |
| # Until http://crbug.com/456324 is resolved, we'll always set 'offline' to |
| # False, as going "offline" will sever Chromedriver's connection to Chrome. |
| params = { |
| 'network_conditions': { |
| 'offline': offline, |
| 'latency': latency, |
| 'download_throughput': download_throughput, |
| 'upload_throughput': upload_throughput |
| } |
| } |
| self.ExecuteCommand(Command.SET_NETWORK_CONDITIONS, params) |
| |
| def SetNetworkConditionsName(self, network_name): |
| self.ExecuteCommand( |
| Command.SET_NETWORK_CONDITIONS, {'network_name': network_name}) |
| |
| def GetNetworkConditions(self): |
| conditions = self.ExecuteCommand(Command.GET_NETWORK_CONDITIONS) |
| return { |
| 'latency': conditions['latency'], |
| 'download_throughput': conditions['download_throughput'], |
| 'upload_throughput': conditions['upload_throughput'], |
| 'offline': conditions['offline'] |
| } |
| |
| def GetNetworkConnection(self): |
| return self.ExecuteCommand(Command.GET_NETWORK_CONNECTION) |
| |
| def DeleteNetworkConditions(self): |
| self.ExecuteCommand(Command.DELETE_NETWORK_CONDITIONS) |
| |
| def SetNetworkConnection(self, connection_type): |
| params = {'parameters': {'type': connection_type}} |
| return self.ExecuteCommand(Command.SET_NETWORK_CONNECTION, params) |
| |
| def SendCommand(self, cmd, cmd_params): |
| params = {'parameters': {'cmd': cmd, 'params': cmd_params}}; |
| return self.ExecuteCommand(Command.SEND_COMMAND, params) |
| |
| def SendCommandAndGetResult(self, cmd, cmd_params): |
| params = {'cmd': cmd, 'params': cmd_params}; |
| return self.ExecuteCommand(Command.SEND_COMMAND_AND_GET_RESULT, params) |
| |
| def GetScreenOrientation(self): |
| screen_orientation = self.ExecuteCommand(Command.GET_SCREEN_ORIENTATION) |
| return { |
| 'orientation': screen_orientation['orientation'] |
| } |
| |
| def SetScreenOrientation(self, orientation_type): |
| params = {'parameters': {'orientation': orientation_type}} |
| self.ExecuteCommand(Command.SET_SCREEN_ORIENTATION, params) |
| |
| def DeleteScreenOrientationLock(self): |
| self.ExecuteCommand(Command.DELETE_SCREEN_ORIENTATION) |
| |
| def SendKeys(self, *values): |
| typing = [] |
| for value in values: |
| if isinstance(value, int): |
| value = str(value) |
| for i in range(len(value)): |
| typing.append(value[i]) |
| self.ExecuteCommand(Command.SEND_KEYS_TO_ACTIVE_ELEMENT, {'value': typing}) |
| |
| def GenerateTestReport(self, message): |
| self.ExecuteCommand(Command.GENERATE_TEST_REPORT, {'message': message}) |