// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

const { EventEmitter } = require('node:events')
const WebSocket = require('ws')

// Maximum time, in milliseconds, that send() waits for a matching response
// before rejecting the request with a timeout error.
const RESPONSE_TIMEOUT = 1000 * 30

/**
 * WebDriver BiDi connection over a single WebSocket.
 *
 * Each send() is tagged with an increasing numeric id and its promise is
 * settled by the shared 'message' handler when a payload carrying the same id
 * arrives, when the socket closes or errors, or when RESPONSE_TIMEOUT elapses.
 * Payloads without a numeric id are not handled by this class.
 */
class Index extends EventEmitter {
  // Last command id handed out by send(); incremented before each use.
  id = 0
  // True between the socket's 'open' event and the first close/failure.
  connected = false
  // Event names passed to subscribe() and not yet removed by unsubscribe().
  events = []
  // Context ids consulted by unsubscribe(); no method of this class adds to it.
  browsingContexts = []

  /**
   * Create a new websocket connection
   * @param _webSocketUrl address handed to the `ws` WebSocket constructor
   */
  constructor(_webSocketUrl) {
    super()
    this.connected = false
    this._closed = false
    // id -> { resolve, reject, timeoutId } for every in-flight send().
    this._pending = new Map()
    // { resolve, reject } pairs parked by waitForConnection().
    this._connectWaiters = new Set()
    this._ws = new WebSocket(_webSocketUrl)
    this._ws.on('open', () => {
      // The handshake can complete after close()/_failPending() has already
      // marked the connection closed. Don't flip connected back to true and
      // proactively close the now-orphan socket so it does not leak.
      if (this._closed) {
        try {
          this._ws.close()
        } catch {
          /* socket already closing */
        }
        return
      }
      this.connected = true
      for (const { resolve } of this._connectWaiters) {
        resolve()
      }
      this._connectWaiters.clear()
    })
    // Single shared response dispatcher. Avoids attaching a new 'message'
    // listener for every in-flight send(), which previously caused
    // MaxListenersExceededWarning under concurrent BiDi traffic
    // (e.g. network interception during a page navigation).
    this._ws.on('message', (data) => {
      // Frames can arrive after close() has cleared _pending; ignore them
      // rather than re-emitting parse errors or dispatching to nothing.
      if (this._closed) {
        return
      }
      let payload
      try {
        payload = JSON.parse(data.toString())
      } catch (err) {
        // Surface protocol parse failures rather than silently dropping —
        // otherwise callers see misleading send() timeouts.
        const wrapped = new Error(`Failed to parse BiDi message: ${err.message}`)
        // Emitting 'error' with no listener would throw, so fall back to a
        // process warning in that case.
        if (this.listenerCount('error') > 0) {
          this.emit('error', wrapped)
        } else {
          process.emitWarning(wrapped.message, 'BiDiProtocolWarning')
        }
        return
      }
      // Messages without a numeric id are BiDi events, not command
      // responses; they are intentionally ignored by this dispatcher.
      if (payload == null || typeof payload.id !== 'number') {
        return
      }
      const entry = this._pending.get(payload.id)
      if (entry === undefined) {
        // Unknown id, e.g. a response that arrived after its request timed out.
        return
      }
      clearTimeout(entry.timeoutId)
      this._pending.delete(payload.id)
      entry.resolve(payload)
    })
    // Fail any in-flight send() calls promptly when the peer disconnects
    // or the socket errors, instead of waiting for RESPONSE_TIMEOUT.
    this._ws.on('close', () => {
      this._failPending(new Error('BiDi connection closed unexpectedly'))
    })
    this._ws.on('error', (err) => {
      this._failPending(new Error(`BiDi connection error: ${err.message}`))
    })
  }

  /**
   * Normalise an optional "value or array of values" argument.
   * @param {*} arg
   * @returns {Array} `[]` for undefined, a shallow copy for an array,
   *     otherwise a one-element array holding `arg`
   * @private
   */
  static _toArray(arg) {
    if (arg === undefined) {
      return []
    }

    return Array.isArray(arg) ? [...arg] : [arg]
  }

  /**
   * Reject any in-flight sends and mark the connection failed. Idempotent so
   * that close() and the underlying 'close'/'error' events do not double-reject.
   * @param {Error} error reason handed to every rejected promise
   * @private
   */
  _failPending(error) {
    if (this._closed) {
      return
    }
    this._closed = true
    this.connected = false
    for (const { reject, timeoutId } of this._pending.values()) {
      clearTimeout(timeoutId)
      reject(error)
    }
    this._pending.clear()
    // Reject any callers parked in waitForConnection() so close() (or an
    // unexpected disconnect) cannot leave them hanging forever.
    for (const { reject } of this._connectWaiters) {
      reject(error)
    }
    this._connectWaiters.clear()
  }

  /**
   * The underlying socket created in the constructor.
   * @returns {WebSocket}
   */
  get socket() {
    return this._ws
  }

  /**
   * Whether the socket has opened and the connection has not been closed.
   * @returns {boolean}
   */
  get isConnected() {
    return this.connected
  }

  /**
   * Get Bidi Status. Every read of this getter sends a new `session.status`
   * command.
   * @returns {Promise<*>} the response payload for the command
   */
  get status() {
    return this.send({
      method: 'session.status',
      params: {},
    })
  }

  /**
   * Resolve once the socket is open.
   * @returns {Promise<void>} resolves immediately when already connected;
   *     rejects when the connection is closed, either now or while waiting
   */
  async waitForConnection() {
    return new Promise((resolve, reject) => {
      if (this._closed) {
        reject(new Error('BiDi connection is closed'))
        return
      }
      if (this.connected) {
        resolve()
        return
      }
      // Park the waiter in a Set so the constructor's 'open' handler can
      // resolve it and _failPending() can reject it. Avoids attaching socket
      // listeners that close()'s removeAllListeners('close') would strip.
      this._connectWaiters.add({ resolve, reject })
    })
  }

  /**
   * Sends a bidi request
   * @param params command fields; serialised together with a generated `id`
   * @returns {Promise<unknown>} resolves with the parsed payload whose `id`
   *     matches; rejects when the connection is closed or not open, when it
   *     fails while the request is in flight, or after RESPONSE_TIMEOUT
   */
  async send(params) {
    if (this._closed) {
      throw new Error('BiDi connection is closed')
    }
    if (!this.connected) {
      await this.waitForConnection()
    }
    // Defense in depth: even after waitForConnection() resolves, the socket
    // may have transitioned to CLOSING/CLOSED (e.g. caller closed the raw
    // socket). Refuse rather than throwing from inside ws.send().
    if (this._ws.readyState !== WebSocket.OPEN) {
      throw new Error('BiDi connection is not open')
    }

    const id = ++this.id

    this._ws.send(JSON.stringify({ id, ...params }))

    return new Promise((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        // Drop the entry so a late response for this id is ignored.
        this._pending.delete(id)
        reject(new Error(`Request with id ${id} timed out`))
      }, RESPONSE_TIMEOUT)

      this._pending.set(id, { resolve, reject, timeoutId })
    })
  }

  /**
   * Subscribe to events
   * @param events event name or array of event names
   * @param browsingContexts context id or array of context ids (optional)
   * @returns {Promise<void>}
   * @throws {TypeError} when any event or context entry is not a string
   */
  async subscribe(events, browsingContexts) {
    const eventsArray = Index._toArray(events)
    const contextsArray = Index._toArray(browsingContexts)

    const params = {
      method: 'session.subscribe',
      params: {},
    }

    if (eventsArray.length && eventsArray.some((event) => typeof event !== 'string')) {
      throw new TypeError('events should be string or string array')
    }

    if (contextsArray.length && contextsArray.some((context) => typeof context !== 'string')) {
      throw new TypeError('browsingContexts should be string or string array')
    }

    if (eventsArray.length) {
      params.params.events = eventsArray
    }

    if (contextsArray.length) {
      params.params.contexts = contextsArray
    }

    // Remember the names so unsubscribe() can tell which ones are active.
    this.events.push(...eventsArray)

    await this.send(params)
  }

  /**
   * Unsubscribe to events
   *
   * Only event names previously recorded by subscribe() are sent; when none
   * of the given names is recorded, no command is sent.
   * @param events event name or array of event names; undefined is treated
   *     as an empty list
   * @param browsingContexts context id or array of context ids to drop from
   *     `this.browsingContexts` (optional)
   * @returns {Promise<void>}
   */
  async unsubscribe(events, browsingContexts) {
    // Same normalisation as subscribe(), so undefined or a single value
    // cannot blow up on `.filter`.
    const eventsToRemove = Index._toArray(events)

    // Check if the eventsToRemove are in the subscribed events array
    // Filter out events that are not in this.events before filtering
    const existingEvents = eventsToRemove.filter((event) => this.events.includes(event))

    // Remove the events from the subscribed events array
    this.events = this.events.filter((event) => !existingEvents.includes(event))

    if (typeof browsingContexts === 'string') {
      // Remove the id that was named, not whichever happens to be last.
      this.browsingContexts = this.browsingContexts.filter((id) => id !== browsingContexts)
    } else if (Array.isArray(browsingContexts)) {
      this.browsingContexts = this.browsingContexts.filter((id) => !browsingContexts.includes(id))
    }

    if (existingEvents.length === 0) {
      return
    }
    const params = {
      method: 'session.unsubscribe',
      params: {
        events: existingEvents,
      },
    }

    if (this.browsingContexts.length > 0) {
      // Send a copy so later changes to the tracked list cannot alter a
      // request that is still waiting for the connection.
      params.params.contexts = [...this.browsingContexts]
    }

    await this.send(params)
  }

  /**
   * Close ws connection.
   *
   * Rejects in-flight sends and parked connection waiters first, then closes
   * the socket.
   * @returns {Promise<unknown>} resolves once the socket emits 'close', or
   *     immediately when its readyState is already 3
   */
  close() {
    this._failPending(new Error('BiDi connection closed before response was received'))

    return new Promise((fulfill) => {
      // don't close if it's already closed (readyState 3)
      if (this._ws.readyState === 3) {
        fulfill()
        return
      }
      // don't notify on user-initiated shutdown ('disconnect' event)
      this._ws.removeAllListeners('close')
      this._ws.once('close', () => {
        this._ws.removeAllListeners()
        fulfill()
      })
      this._ws.close()
    })
  }
}

/**
 * Module export: the BiDi connection class itself. Instances are created
 * with `new Index(webSocketUrl)`.
 * @type {typeof Index}
 */
module.exports = Index