# blob: 6a5ed106e6a020527af8bf6a7c30603c5cb7034f [file] [log] [blame]
# Copyright (c) 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Library handling DevTools websocket interaction.
"""
import datetime
import httplib
import json
import logging
import os
import sys
import time
file_dir = os.path.dirname(__file__)
sys.path.append(os.path.join(file_dir, '..', '..', 'perf'))
from core import path_util
sys.path.append(path_util.GetTelemetryDir())
from telemetry.internal.backends.chrome_inspector import inspector_websocket
from telemetry.internal.backends.chrome_inspector import websocket
import common_util
# Default timeout, in seconds, of the monitoring loop started by
# DevToolsConnection.MonitorUrl().
DEFAULT_TIMEOUT_SECONDS = 10
# Timeout, in seconds, used when connecting the DevTools websocket and for the
# synchronous requests sent over it.
_WEBSOCKET_TIMEOUT_SECONDS = 10
class DevToolsConnectionException(Exception):
  """Raised when the communication with the DevTools server fails.

  A warning carrying the message is logged every time an instance is created.
  """

  def __init__(self, message):
    """Initializes the exception and logs it.

    Args:
      message: description of the failure. Usually a string, but any object
          that can be formatted with %s is accepted.
    """
    super(DevToolsConnectionException, self).__init__(message)
    # Let logging do the formatting: concatenating with "+" raised a TypeError
    # for non-string messages, hiding the failure being reported.
    logging.warning('DevToolsConnectionException: %s', message)
class DevToolsConnectionTargetCrashed(DevToolsConnectionException):
  """Raised when DevTools reports that the inspected target has crashed."""
# Taken from telemetry.internal.backends.chrome_inspector.tracing_backend.
# TODO(mattcary): combine this with the above and export?
class _StreamReader(object):
def __init__(self, inspector, stream_handle):
self._inspector_websocket = inspector
self._handle = stream_handle
self._callback = None
self._data = None
def Read(self, callback):
# Do not allow the instance of this class to be reused, as
# we only read data sequentially at the moment, so a stream
# can only be read once.
assert not self._callback
self._data = []
self._callback = callback
self._ReadChunkFromStream()
# Queue one extra read ahead to avoid latency.
self._ReadChunkFromStream()
def _ReadChunkFromStream(self):
# Limit max block size to avoid fragmenting memory in sock.recv(),
# (see https://github.com/liris/websocket-client/issues/163 for details)
req = {'method': 'IO.read', 'params': {
'handle': self._handle, 'size': 32768}}
self._inspector_websocket.AsyncRequest(req, self._GotChunkFromStream)
def _GotChunkFromStream(self, response):
# Quietly discard responses from reads queued ahead after EOF.
if self._data is None:
return
if 'error' in response:
raise DevToolsConnectionException(
'Reading trace failed: %s' % response['error']['message'])
result = response['result']
self._data.append(result['data'])
if not result.get('eof', False):
self._ReadChunkFromStream()
return
req = {'method': 'IO.close', 'params': {'handle': self._handle}}
self._inspector_websocket.SendAndIgnoreResponse(req)
trace_string = ''.join(self._data)
self._data = None
self._callback(trace_string)
class DevToolsConnection(object):
  """Handles the communication with a DevTools server."""

  TRACING_DOMAIN = 'Tracing'
  TRACING_END_METHOD = 'Tracing.end'
  TRACING_DATA_METHOD = 'Tracing.dataCollected'
  TRACING_DONE_EVENT = 'Tracing.tracingComplete'
  TRACING_STREAM_EVENT = 'Tracing.tracingComplete'  # Same as TRACING_DONE.
  # Timeout, in seconds, of the dispatch loop that collects the trace.
  TRACING_TIMEOUT = 300
  # Number of HTTP attempts, and the pause between them, in _HttpRequest().
  HTTP_ATTEMPTS = 10
  HTTP_ATTEMPT_INTERVAL_SECONDS = 0.1

  def __init__(self, hostname, port):
    """Initializes the connection with a DevTools server.

    Args:
      hostname: server hostname.
      port: port number.
    """
    self._http_hostname = hostname
    self._http_port = port
    self._event_listeners = {}
    self._domain_listeners = {}
    self._scoped_states = {}
    self._domains_to_enable = set()
    self._tearing_down_tracing = False
    self._ws = None
    self._target_descriptor = None
    self._stop_delay_multiplier = 0
    self._monitoring_start_timestamp = None
    self._monitoring_stop_timestamp = None
    self._Connect()
    # The connection listens to crashes itself, see Handle().
    self.RegisterListener('Inspector.targetCrashed', self)

  def RegisterListener(self, name, listener):
    """Registers a listener for an event.

    Also takes care of enabling the relevant domain before starting monitoring.

    Args:
      name: (str) Domain or event the listener wants to listen to, e.g.
        "Network.requestWillBeSent" or "Tracing".
      listener: (Listener) listener instance.
    """
    if '.' in name:
      domain = name[:name.index('.')]
      self._event_listeners[name] = listener
    else:
      domain = name
      self._domain_listeners[domain] = listener
    self._domains_to_enable.add(domain)

  def UnregisterListener(self, listener):
    """Unregisters a listener.

    Args:
      listener: (Listener) listener to unregister.
    """
    # items() is required here: iterating a dict yields its keys only, so
    # unpacking them as (key, listener) pairs never matched the listener.
    event_keys = [key for key, registered in self._event_listeners.items()
                  if registered is listener]
    domain_keys = [key for key, registered in self._domain_listeners.items()
                   if registered is listener]
    assert event_keys or domain_keys, "Removing non-existent listener"
    for key in event_keys:
      del self._event_listeners[key]
    for key in domain_keys:
      del self._domain_listeners[key]

  def SetScopedState(self, method, params, default_params, enable_domain):
    """Changes state at the beginning the monitoring and resets it at the end.

    |method| is called with |params| at the beginning of the monitoring. After
    the monitoring completes, the state is reset by calling |method| with
    |default_params|.

    Args:
      method: (str) Method.
      params: (dict) Parameters to set when the monitoring starts.
      default_params: (dict) Parameters to reset the state at the end.
      enable_domain: (bool) True if enabling the domain is required.
    """
    if enable_domain:
      if '.' in method:
        domain = method[:method.index('.')]
        assert domain, 'No valid domain'
        self._domains_to_enable.add(domain)
    scoped_state_value = (params, default_params)
    if method in self._scoped_states:
      # Registering a method twice is only accepted with identical values.
      assert self._scoped_states[method] == scoped_state_value
    else:
      self._scoped_states[method] = scoped_state_value

  def SyncRequest(self, method, params=None):
    """Issues a synchronous request to the DevTools server.

    Args:
      method: (str) Method.
      params: (dict) Optional parameters to the request.

    Returns:
      The answer.
    """
    request = {'method': method}
    if params:
      request['params'] = params
    return self._ws.SyncRequest(request, timeout=_WEBSOCKET_TIMEOUT_SECONDS)

  def SendAndIgnoreResponse(self, method, params=None):
    """Issues a request to the DevTools server, do not wait for the response.

    Args:
      method: (str) Method.
      params: (dict) Optional parameters to the request.
    """
    request = {'method': method}
    if params:
      request['params'] = params
    self._ws.SendAndIgnoreResponse(request)

  def SyncRequestNoResponse(self, method, params=None):
    """As SyncRequest, but asserts that no meaningful response was received.

    Args:
      method: (str) Method.
      params: (dict) Optional parameters to the request.

    Raises:
      DevToolsConnectionException: if the answer holds an error or a non-empty
        result.
    """
    result = self.SyncRequest(method, params)
    if 'error' in result or ('result' in result and
                             result['result']):
      raise DevToolsConnectionException(
          'Unexpected response for %s: %s' % (method, result))

  def ClearCache(self):
    """Clears buffer cache.

    Will assert that the browser supports cache clearing.
    """
    res = self.SyncRequest('Network.canClearBrowserCache')
    assert res['result'], 'Cache clearing is not supported by this browser.'
    self.SyncRequest('Network.clearBrowserCache')

  def MonitorUrl(self, url, timeout_seconds=DEFAULT_TIMEOUT_SECONDS,
                 stop_delay_multiplier=0):
    """Navigate to url and dispatch monitoring loop.

    Unless you have registered a listener that will call StopMonitoring, this
    will run until timeout from chrome.

    Args:
      url: (str) a URL to navigate to before starting monitoring loop.
      timeout_seconds: timeout in seconds for monitoring loop.
      stop_delay_multiplier: (float) How long to wait after page load completed
        before tearing down, relative to the time it took to reach the page load
        to complete.
    """
    for domain in self._domains_to_enable:
      self._ws.RegisterDomain(domain, self._OnDataReceived)
      if domain != self.TRACING_DOMAIN:
        self.SyncRequestNoResponse('%s.enable' % domain)
        # Tracing setup must be done by the tracing track to control filtering
        # and output.
    for scoped_state in self._scoped_states:
      self.SyncRequestNoResponse(scoped_state,
                                 self._scoped_states[scoped_state][0])
    self._tearing_down_tracing = False

    logging.info('Navigate to %s', url)
    self.SendAndIgnoreResponse('Page.navigate', {'url': url})
    self._monitoring_start_timestamp = datetime.datetime.now()
    self._Dispatch(timeout=timeout_seconds,
                   stop_delay_multiplier=stop_delay_multiplier)
    self._monitoring_start_timestamp = None
    logging.info('Tearing down monitoring.')
    self._TearDownMonitoring()

  def StopMonitoring(self):
    """Requests the monitoring loop to stop.

    Without a stop delay multiplier the stop time is set to now. Otherwise the
    first call schedules the stop after the multiplier times the duration
    elapsed since the monitoring started, and later calls have no effect.
    """
    if self._stop_delay_multiplier == 0:
      self._StopMonitoringImmediately()
    elif self._monitoring_stop_timestamp is None:
      assert self._monitoring_start_timestamp is not None
      current_time = datetime.datetime.now()
      elapsed = current_time - self._monitoring_start_timestamp
      # total_seconds() keeps the sub-second part (and the days), whereas the
      # .seconds attribute truncated sub-second page loads to a zero delay.
      stop_delay_duration_seconds = (
          self._stop_delay_multiplier * elapsed.total_seconds())
      logging.info('Delaying monitoring stop for %.1fs',
                   stop_delay_duration_seconds)
      self._monitoring_stop_timestamp = (
          current_time + datetime.timedelta(
              seconds=stop_delay_duration_seconds))

  def ExecuteJavaScript(self, expression):
    """Run JavaScript expression.

    Args:
      expression: JavaScript expression to run.

    Returns:
      The return value from the JavaScript expression.

    Raises:
      Exception: if the request failed or the expression threw.
    """
    # Note: Clients may be tempted to do naive string interpolation to inject
    # Python values into the JavaScript expression, which could lead to syntax
    # errors during evaluation (e.g. injecting strings with special characters).
    # If this becomes an issue, consider extending the interface of this method
    # as in: https://github.com/catapult-project/catapult/issues/3028
    response = self.SyncRequest('Runtime.evaluate', {
        'expression': expression,
        'returnByValue': True})
    if 'error' in response:
      raise Exception(response['error']['message'])
    result = response['result']
    if result.get('wasThrown'):
      # There is no 'error' entry on this path: the description of what was
      # thrown is carried by the evaluation result itself.
      raise Exception(result['result']['description'])
    value = result['result']
    if value['type'] == 'undefined':
      return None
    return value['value']

  def PollForJavaScriptExpression(self, expression, interval):
    """Wait until JavaScript expression is true.

    Args:
      expression: JavaScript expression to run.
      interval: Period between expression evaluation in seconds.
    """
    common_util.PollFor(lambda: bool(self.ExecuteJavaScript(expression)),
                        'JavaScript: {}'.format(expression),
                        interval)

  def Close(self):
    """Cleanly close chrome by closing the only tab."""
    assert self._ws
    response = self._HttpRequest('/close/' + self._target_descriptor['id'])
    assert response == 'Target is closing'
    self._ws = None

  def _StopMonitoringImmediately(self):
    """Sets the stop time to now, ending the dispatch loop at its next check."""
    self._monitoring_stop_timestamp = datetime.datetime.now()

  def _Dispatch(self, timeout, kind='Monitoring', stop_delay_multiplier=0):
    """Dispatches notifications until a stop is due or a timeout occurs.

    Args:
      timeout: timeout in seconds of each wait for notifications.
      kind: (str) name of the loop, only used in the timeout log message.
      stop_delay_multiplier: (float) see MonitorUrl().
    """
    self._monitoring_stop_timestamp = None
    self._stop_delay_multiplier = stop_delay_multiplier
    while True:
      try:
        self._ws.DispatchNotifications(timeout=timeout)
      except websocket.WebSocketTimeoutException:
        # A timeout only ends the loop when no stop has been scheduled.
        if self._monitoring_stop_timestamp is None:
          logging.warning('%s stopped on a timeout.', kind)
          break
      if self._monitoring_stop_timestamp:
        # After the first timeout reduce the timeout to check when to stop
        # monitoring more often, because the page at this moment can already be
        # loaded and not many events would be arriving from it.
        timeout = 1
        if datetime.datetime.now() >= self._monitoring_stop_timestamp:
          break

  def Handle(self, method, event):
    """Listener entry point of the connection, used to detect crashes.

    Args:
      method: (str) name of the received event.
      event: (dict) complete event, unused.

    Raises:
      DevToolsConnectionTargetCrashed: on 'Inspector.targetCrashed'.
    """
    del event  # unused
    if method == 'Inspector.targetCrashed':
      raise DevToolsConnectionTargetCrashed('Renderer crashed.')

  def _TearDownMonitoring(self):
    """Fetches the trace if needed, resets the states and drops listeners."""
    if self.TRACING_DOMAIN in self._domains_to_enable:
      logging.info('Fetching tracing')
      self.SyncRequestNoResponse(self.TRACING_END_METHOD)
      self._tearing_down_tracing = True
      self._Dispatch(timeout=self.TRACING_TIMEOUT, kind='Tracing')
    for scoped_state in self._scoped_states:
      self.SyncRequestNoResponse(scoped_state,
                                 self._scoped_states[scoped_state][1])
    for domain in self._domains_to_enable:
      if domain != self.TRACING_DOMAIN:
        self.SyncRequest('%s.disable' % domain)
      self._ws.UnregisterDomain(domain)
    self._domains_to_enable.clear()
    self._domain_listeners.clear()
    self._event_listeners.clear()
    self._scoped_states.clear()

  def _OnDataReceived(self, msg):
    """Forwards a received notification to the registered listeners.

    Args:
      msg: (dict) notification received from the websocket.

    Raises:
      DevToolsConnectionException: if the message has no 'method' entry.
    """
    if 'method' not in msg:
      raise DevToolsConnectionException('Malformed message: %s' % msg)
    method = msg['method']
    domain = method[:method.index('.')]

    if self._tearing_down_tracing and method == self.TRACING_STREAM_EVENT:
      stream_handle = msg.get('params', {}).get('stream')
      if not stream_handle:
        self._tearing_down_tracing = False
        self._StopMonitoringImmediately()
        # Fall through to regular dispatching.
      else:
        _StreamReader(self._ws, stream_handle).Read(self._TracingStreamDone)
        # Skip regular dispatching.
        return

    if (method not in self._event_listeners and
        domain not in self._domain_listeners):
      return
    if method in self._event_listeners:
      self._event_listeners[method].Handle(method, msg)
    if domain in self._domain_listeners:
      self._domain_listeners[domain].Handle(method, msg)
    if self._tearing_down_tracing and method == self.TRACING_DONE_EVENT:
      self._tearing_down_tracing = False
      self._StopMonitoringImmediately()

  def _TracingStreamDone(self, data):
    """Dispatches the events of a trace stream, then stops the dispatch loop.

    Args:
      data: (str) JSON list of tracing events, as read from the stream.
    """
    tracing_events = json.loads(data)
    for evt in tracing_events:
      # Each event is dispatched as if it came from a data collected message.
      self._OnDataReceived({'method': self.TRACING_DATA_METHOD,
                            'params': {'value': [evt]}})
      # A stop requested while dispatching sets the stop timestamp. The
      # _please_stop attribute read here before is never defined in this
      # class, so the loop failed with an AttributeError on the first event.
      if self._monitoring_stop_timestamp is not None:
        break
    self._tearing_down_tracing = False
    self._StopMonitoringImmediately()

  def _HttpRequest(self, path):
    """Issues a GET request on the DevTools HTTP endpoint, with retries.

    Args:
      path: (str) path below '/json', starting with '/'.

    Returns:
      The body of the response.

    Raises:
      DevToolsConnectionException: if the response status is not 200.
      httplib.BadStatusLine: if the last attempt failed this way.
    """
    assert path[0] == '/'
    last_attempt = self.HTTP_ATTEMPTS - 1
    for attempt in range(self.HTTP_ATTEMPTS):
      connection = httplib.HTTPConnection(
          self._http_hostname, self._http_port)
      try:
        connection.request('GET', '/json' + path)
        response = connection.getresponse()
        if response.status != 200:
          raise DevToolsConnectionException(
              'Cannot connect to DevTools, reponse code %d' % response.status)
        return response.read()
      except httplib.BadStatusLine as exception:
        logging.warning('Devtools HTTP connection failed: %s', repr(exception))
        if attempt == last_attempt:
          # Re-raise from within the handler instead of relying on a bare
          # raise after the loop, and skip the useless final sleep.
          raise
        time.sleep(self.HTTP_ATTEMPT_INTERVAL_SECONDS)
      finally:
        connection.close()
    # Only reachable when HTTP_ATTEMPTS is not positive.
    raise DevToolsConnectionException(
        'No HTTP attempt was made for %s' % path)

  def _Connect(self):
    """Opens the websocket of the first page target, expected to be blank.

    Raises:
      DevToolsConnectionException: if there is no page target, or if its URL is
        not about:blank.
    """
    assert not self._ws
    assert not self._target_descriptor
    for target_descriptor in json.loads(self._HttpRequest('/list')):
      if target_descriptor['type'] == 'page':
        self._target_descriptor = target_descriptor
        break
    if not self._target_descriptor:
      raise DevToolsConnectionException(
          'No pages are open, connected to a wrong instance?')
    if self._target_descriptor['url'] != 'about:blank':
      raise DevToolsConnectionException(
          'Looks like devtools connection was made to a different instance.')
    self._ws = inspector_websocket.InspectorWebsocket()
    self._ws.Connect(self._target_descriptor['webSocketDebuggerUrl'],
                     timeout=_WEBSOCKET_TIMEOUT_SECONDS)
class Listener(object):
  """Receives the events forwarded by a DevToolsConnection instance."""

  def __init__(self, connection):
    """Initializes a Listener instance.

    This base implementation does nothing with its argument.

    Args:
      connection: (DevToolsConnection).
    """

  def Handle(self, method, msg):
    """Handles an event this instance listens for.

    Must be overridden, this base implementation always raises.

    Args:
      method: (str) Name of the event.
      msg: (dict) Complete event.
    """
    raise NotImplementedError()
class Track(Listener):
  """Collects data from a DevTools server."""

  def GetEvents(self):
    """Returns a list of collected events, finalizing the state if necessary."""
    raise NotImplementedError

  def ToJsonDict(self):
    """Serializes to a dictionary, to be dumped as JSON.

    Returns:
      A dict that can be dumped by the json module, and loaded by
      FromJsonDict().
    """
    raise NotImplementedError

  @classmethod
  def FromJsonDict(cls, _json_dict):
    """Returns a Track instance constructed from data dumped by
       Track.ToJsonDict().

    Args:
      _json_dict: (dict) Parsed from a JSON file using the json module.

    Returns:
      a Track instance.

    Raises:
      AssertionError: always, in this base implementation.
    """
    # There is no sensible way to deserialize this abstract class, but
    # subclasses are not required to define a deserialization method. For
    # example, for testing we have a FakeRequestTrack which is never
    # deserialized; instead fake instances are deserialized as RequestTracks.
    #
    # Raised explicitly rather than with "assert False", which is compiled out
    # under "python -O" and would then silently return None.
    raise AssertionError(
        'FromJsonDict() is not implemented for this Track class.')