blob: 85bf699ecd2adaa857878cfe1c7a69bf09a9c5f6 [file] [log] [blame]
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
import json
from command_executor import CommandExecutor
_THIS_DIR = os.path.abspath(os.path.dirname(__file__))
_PARENT_DIR = os.path.join(_THIS_DIR, os.pardir)
sys.path.insert(1, _PARENT_DIR)
import chrome_paths
sys.path.remove(_PARENT_DIR)
sys.path.insert(0,os.path.join(chrome_paths.GetSrc(), 'third_party',
'catapult', 'telemetry', 'third_party',
'websocket-client'))
import websocket
class WebSocketCommands:
CREATE_WEBSOCKET = \
'/session/:sessionId/chromium/create_websocket'
SEND_OVER_WEBSOCKET = \
'/session/:sessionId/chromium/send_command_from_websocket'
class WebSocketConnection(object):
def __init__(self, server_url, session_id):
self._server_url = server_url.replace('http', 'ws')
self._session_id = session_id
self._command_id = -1
cmd_params = {'sessionId': session_id}
path = CommandExecutor.CreatePath(
WebSocketCommands.CREATE_WEBSOCKET, cmd_params)
self._websocket = websocket.create_connection(self._server_url + path)
def SendCommand(self, cmd_params):
cmd_params['id'] = self._command_id
self._command_id -= 1
self._websocket.send(json.dumps(cmd_params))