blob: bd0daee03f9a05928fe28a4ac85d2d20a204e6a5 [file] [log] [blame]
import json
import os
import urlparse
import re
import webdriver
from support.asserts import assert_error
from support.http_request import HTTPRequest
from support import merge_dictionaries
default_host = "http://127.0.0.1"
default_port = "4444"
def _ensure_valid_window(session):
"""If current window is not open anymore, ensure to have a valid one selected."""
try:
session.window_handle
except webdriver.NoSuchWindowException:
session.window_handle = session.handles[0]
def _dismiss_user_prompts(session):
"""Dismisses any open user prompts in windows."""
current_window = session.window_handle
for window in _windows(session):
session.window_handle = window
try:
session.alert.dismiss()
except webdriver.NoSuchAlertException:
pass
session.window_handle = current_window
def _restore_windows(session):
"""Closes superfluous windows opened by the test without ending
the session implicitly by closing the last window.
"""
current_window = session.window_handle
for window in _windows(session, exclude=[current_window]):
session.window_handle = window
if len(session.handles) > 1:
session.close()
session.window_handle = current_window
def _switch_to_top_level_browsing_context(session):
"""If the current browsing context selected by WebDriver is a
`<frame>` or an `<iframe>`, switch it back to the top-level
browsing context.
"""
session.switch_frame(None)
def _windows(session, exclude=None):
"""Set of window handles, filtered by an `exclude` list if
provided.
"""
if exclude is None:
exclude = []
wins = [w for w in session.handles if w not in exclude]
return set(wins)
def create_frame(session):
"""Create an `iframe` element in the current browsing context and insert it
into the document. Return a reference to the newly-created element."""
def create_frame():
append = """
var frame = document.createElement('iframe');
document.body.appendChild(frame);
return frame;
"""
return session.execute_script(append)
return create_frame
def create_window(session):
"""Open new window and return the window handle."""
def create_window():
windows_before = session.handles
name = session.execute_script("window.open()")
assert len(session.handles) == len(windows_before) + 1
new_windows = list(set(session.handles) - set(windows_before))
return new_windows.pop()
return create_window
def http(session):
return HTTPRequest(session.transport.host, session.transport.port)
def server_config():
return json.loads(os.environ.get("WD_SERVER_CONFIG"))
def configuration():
host = os.environ.get("WD_HOST", default_host)
port = int(os.environ.get("WD_PORT", default_port))
capabilities = json.loads(os.environ.get("WD_CAPABILITIES", "{}"))
return {
"host": host,
"port": port,
"capabilities": capabilities
}
_current_session = None
def session(configuration, request):
"""Create and start a session for a test that does not itself test session creation.
By default the session will stay open after each test, but we always try to start a
new one and assume that if that fails there is already a valid session. This makes it
possible to recover from some errors that might leave the session in a bad state, but
does not demand that we start a new session per test."""
global _current_session
if _current_session is None:
_current_session = webdriver.Session(configuration["host"],
configuration["port"],
capabilities={"alwaysMatch": configuration["capabilities"]})
try:
_current_session.start()
except webdriver.errors.SessionNotCreatedException:
if not _current_session.session_id:
raise
# finalisers are popped off a stack,
# making their ordering reverse
request.addfinalizer(lambda: _switch_to_top_level_browsing_context(_current_session))
request.addfinalizer(lambda: _restore_windows(_current_session))
request.addfinalizer(lambda: _dismiss_user_prompts(_current_session))
request.addfinalizer(lambda: _ensure_valid_window(_current_session))
return _current_session
def new_session(configuration, request):
"""Return a factory function that will attempt to start a session with a given body.
This is intended for tests that are themselves testing new session creation, and the
session created is closed at the end of the test."""
def end():
global _current_session
if _current_session is not None and _current_session.session_id:
_current_session.end()
_current_session = None
def create_session(body):
global _current_session
_session = webdriver.Session(configuration["host"],
configuration["port"],
capabilities=None)
# TODO: merge in some capabilities from the confguration capabilities
# since these might be needed to start the browser
value = _session.send_command("POST", "session", body=body)
# Don't set the global session until we are sure this succeeded
_current_session = _session
_session.session_id = value["sessionId"]
return value, _current_session
end()
request.addfinalizer(end)
return create_session
def url(server_config):
def inner(path, query="", fragment=""):
rv = urlparse.urlunsplit(("http",
"%s:%s" % (server_config["host"],
server_config["ports"]["http"][0]),
path,
query,
fragment))
return rv
return inner
def create_dialog(session):
"""Create a dialog (one of "alert", "prompt", or "confirm") and provide a
function to validate that the dialog has been "handled" (either accepted or
dismissed) by returning some value."""
def create_dialog(dialog_type, text=None, result_var=None):
assert dialog_type in ("alert", "confirm", "prompt"), (
"Invalid dialog type: '%s'" % dialog_type)
if text is None:
text = ""
assert isinstance(text, basestring), "`text` parameter must be a string"
if result_var is None:
result_var = "__WEBDRIVER"
assert re.search(r"^[_$a-z$][_$a-z0-9]*$", result_var, re.IGNORECASE), (
'The `result_var` must be a valid JavaScript identifier')
# Script completion and modal summoning are scheduled on two separate
# turns of the event loop to ensure that both occur regardless of how
# the user agent manages script execution.
spawn = """
var done = arguments[0];
setTimeout(done, 0);
setTimeout(function() {{
window.{0} = window.{1}("{2}");
}}, 0);
""".format(result_var, dialog_type, text)
session.send_session_command("POST",
"execute/async",
{"script": spawn, "args": []})
return create_dialog