blob: 73c1e7125e2a29a22aedfa6f291e128ec39b8e17 [file] [log] [blame]
/*
* Copyright (C) 2011, 2012 Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "modules/websockets/WorkerWebSocketChannel.h"
#include <memory>
#include "core/dom/DOMArrayBuffer.h"
#include "core/dom/ExecutionContext.h"
#include "core/fileapi/Blob.h"
#include "core/loader/ThreadableLoadingContext.h"
#include "core/workers/WorkerGlobalScope.h"
#include "core/workers/WorkerLoaderProxy.h"
#include "core/workers/WorkerThread.h"
#include "modules/websockets/DocumentWebSocketChannel.h"
#include "platform/CrossThreadFunctional.h"
#include "platform/WaitableEvent.h"
#include "platform/heap/SafePoint.h"
#include "platform/wtf/Assertions.h"
#include "platform/wtf/Functional.h"
#include "platform/wtf/PtrUtil.h"
#include "platform/wtf/text/CString.h"
#include "platform/wtf/text/WTFString.h"
#include "public/platform/Platform.h"
namespace blink {
typedef WorkerWebSocketChannel::Bridge Bridge;
typedef WorkerWebSocketChannel::Peer Peer;
// Created and destroyed on the worker thread. All setters of this class are
// called on the main thread, while all getters are called on the worker
// thread. signalWorkerThread() must be called before any getters are called.
class WebSocketChannelSyncHelper {
public:
WebSocketChannelSyncHelper() {}
~WebSocketChannelSyncHelper() {}
// All setters are called on the main thread.
void SetConnectRequestResult(bool connect_request_result) {
DCHECK(IsMainThread());
connect_request_result_ = connect_request_result;
}
// All getters are called on the worker thread.
bool ConnectRequestResult() const {
DCHECK(!IsMainThread());
return connect_request_result_;
}
// This should be called after all setters are called and before any
// getters are called.
void SignalWorkerThread() {
DCHECK(IsMainThread());
event_.Signal();
}
void Wait() {
DCHECK(!IsMainThread());
event_.Wait();
}
private:
WaitableEvent event_;
bool connect_request_result_ = false;
};
WorkerWebSocketChannel::WorkerWebSocketChannel(
WorkerGlobalScope& worker_global_scope,
WebSocketChannelClient* client,
std::unique_ptr<SourceLocation> location)
: bridge_(new Bridge(client, worker_global_scope)),
location_at_connection_(std::move(location)) {}
WorkerWebSocketChannel::~WorkerWebSocketChannel() {
DCHECK(!bridge_);
}
bool WorkerWebSocketChannel::Connect(const KURL& url, const String& protocol) {
DCHECK(bridge_);
return bridge_->Connect(location_at_connection_->Clone(), url, protocol);
}
void WorkerWebSocketChannel::Send(const CString& message) {
DCHECK(bridge_);
bridge_->Send(message);
}
void WorkerWebSocketChannel::Send(const DOMArrayBuffer& binary_data,
unsigned byte_offset,
unsigned byte_length) {
DCHECK(bridge_);
bridge_->Send(binary_data, byte_offset, byte_length);
}
void WorkerWebSocketChannel::Send(PassRefPtr<BlobDataHandle> blob_data) {
DCHECK(bridge_);
bridge_->Send(std::move(blob_data));
}
void WorkerWebSocketChannel::Close(int code, const String& reason) {
DCHECK(bridge_);
bridge_->Close(code, reason);
}
void WorkerWebSocketChannel::Fail(const String& reason,
MessageLevel level,
std::unique_ptr<SourceLocation> location) {
if (!bridge_)
return;
std::unique_ptr<SourceLocation> captured_location = SourceLocation::Capture();
if (!captured_location->IsUnknown()) {
// If we are in JavaScript context, use the current location instead
// of passed one - it's more precise.
bridge_->Fail(reason, level, std::move(captured_location));
} else if (location->IsUnknown()) {
// No information is specified by the caller - use the url
// and the line number at the connection.
bridge_->Fail(reason, level, location_at_connection_->Clone());
} else {
// Use the specified information.
bridge_->Fail(reason, level, std::move(location));
}
}
void WorkerWebSocketChannel::Disconnect() {
bridge_->Disconnect();
bridge_.Clear();
}
DEFINE_TRACE(WorkerWebSocketChannel) {
visitor->Trace(bridge_);
WebSocketChannel::Trace(visitor);
}
Peer::Peer(Bridge* bridge,
PassRefPtr<WorkerLoaderProxy> loader_proxy,
RefPtr<WebTaskRunner> worker_networking_task_runner,
WorkerThreadLifecycleContext* worker_thread_lifecycle_context)
: WorkerThreadLifecycleObserver(worker_thread_lifecycle_context),
bridge_(bridge),
loader_proxy_(std::move(loader_proxy)),
worker_networking_task_runner_(std::move(worker_networking_task_runner)),
main_web_socket_channel_(nullptr) {
DCHECK(IsMainThread());
}
Peer::~Peer() {
DCHECK(IsMainThread());
}
bool Peer::Initialize(std::unique_ptr<SourceLocation> location,
ThreadableLoadingContext* loading_context) {
DCHECK(IsMainThread());
if (WasContextDestroyedBeforeObserverCreation())
return false;
main_web_socket_channel_ = DocumentWebSocketChannel::Create(
loading_context, this, std::move(location));
return true;
}
bool Peer::Connect(const KURL& url, const String& protocol) {
DCHECK(IsMainThread());
if (!main_web_socket_channel_)
return false;
return main_web_socket_channel_->Connect(url, protocol);
}
void Peer::SendTextAsCharVector(std::unique_ptr<Vector<char>> data) {
DCHECK(IsMainThread());
if (main_web_socket_channel_)
main_web_socket_channel_->SendTextAsCharVector(std::move(data));
}
void Peer::SendBinaryAsCharVector(std::unique_ptr<Vector<char>> data) {
DCHECK(IsMainThread());
if (main_web_socket_channel_)
main_web_socket_channel_->SendBinaryAsCharVector(std::move(data));
}
void Peer::SendBlob(PassRefPtr<BlobDataHandle> blob_data) {
DCHECK(IsMainThread());
if (main_web_socket_channel_)
main_web_socket_channel_->Send(std::move(blob_data));
}
void Peer::Close(int code, const String& reason) {
DCHECK(IsMainThread());
if (!main_web_socket_channel_)
return;
main_web_socket_channel_->Close(code, reason);
}
void Peer::Fail(const String& reason,
MessageLevel level,
std::unique_ptr<SourceLocation> location) {
DCHECK(IsMainThread());
if (!main_web_socket_channel_)
return;
main_web_socket_channel_->Fail(reason, level, std::move(location));
}
void Peer::Disconnect() {
DCHECK(IsMainThread());
if (!main_web_socket_channel_)
return;
main_web_socket_channel_->Disconnect();
main_web_socket_channel_ = nullptr;
}
static void WorkerGlobalScopeDidConnect(Bridge* bridge,
const String& subprotocol,
const String& extensions) {
if (bridge && bridge->Client())
bridge->Client()->DidConnect(subprotocol, extensions);
}
void Peer::DidConnect(const String& subprotocol, const String& extensions) {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE, CrossThreadBind(&WorkerGlobalScopeDidConnect, bridge_,
subprotocol, extensions));
}
static void WorkerGlobalScopeDidReceiveTextMessage(Bridge* bridge,
const String& payload) {
if (bridge && bridge->Client())
bridge->Client()->DidReceiveTextMessage(payload);
}
void Peer::DidReceiveTextMessage(const String& payload) {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE, CrossThreadBind(&WorkerGlobalScopeDidReceiveTextMessage,
bridge_, payload));
}
static void WorkerGlobalScopeDidReceiveBinaryMessage(
Bridge* bridge,
std::unique_ptr<Vector<char>> payload) {
if (bridge && bridge->Client())
bridge->Client()->DidReceiveBinaryMessage(std::move(payload));
}
void Peer::DidReceiveBinaryMessage(std::unique_ptr<Vector<char>> payload) {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE,
CrossThreadBind(&WorkerGlobalScopeDidReceiveBinaryMessage, bridge_,
WTF::Passed(std::move(payload))));
}
static void WorkerGlobalScopeDidConsumeBufferedAmount(Bridge* bridge,
uint64_t consumed) {
if (bridge && bridge->Client())
bridge->Client()->DidConsumeBufferedAmount(consumed);
}
void Peer::DidConsumeBufferedAmount(uint64_t consumed) {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE,
CrossThreadBind(&WorkerGlobalScopeDidConsumeBufferedAmount, bridge_,
consumed));
}
static void WorkerGlobalScopeDidStartClosingHandshake(Bridge* bridge) {
if (bridge && bridge->Client())
bridge->Client()->DidStartClosingHandshake();
}
void Peer::DidStartClosingHandshake() {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE,
CrossThreadBind(&WorkerGlobalScopeDidStartClosingHandshake, bridge_));
}
static void WorkerGlobalScopeDidClose(
Bridge* bridge,
WebSocketChannelClient::ClosingHandshakeCompletionStatus
closing_handshake_completion,
unsigned short code,
const String& reason) {
if (bridge && bridge->Client())
bridge->Client()->DidClose(closing_handshake_completion, code, reason);
}
void Peer::DidClose(
ClosingHandshakeCompletionStatus closing_handshake_completion,
unsigned short code,
const String& reason) {
DCHECK(IsMainThread());
if (main_web_socket_channel_) {
main_web_socket_channel_->Disconnect();
main_web_socket_channel_ = nullptr;
}
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE,
CrossThreadBind(&WorkerGlobalScopeDidClose, bridge_,
closing_handshake_completion, code, reason));
}
static void WorkerGlobalScopeDidError(Bridge* bridge) {
if (bridge && bridge->Client())
bridge->Client()->DidError();
}
void Peer::DidError() {
DCHECK(IsMainThread());
worker_networking_task_runner_->PostTask(
BLINK_FROM_HERE, CrossThreadBind(&WorkerGlobalScopeDidError, bridge_));
}
void Peer::ContextDestroyed(WorkerThreadLifecycleContext*) {
DCHECK(IsMainThread());
if (main_web_socket_channel_) {
main_web_socket_channel_->Disconnect();
main_web_socket_channel_ = nullptr;
}
bridge_ = nullptr;
}
DEFINE_TRACE(Peer) {
visitor->Trace(main_web_socket_channel_);
WebSocketChannelClient::Trace(visitor);
WorkerThreadLifecycleObserver::Trace(visitor);
}
Bridge::Bridge(WebSocketChannelClient* client,
WorkerGlobalScope& worker_global_scope)
: client_(client),
worker_global_scope_(worker_global_scope),
loader_proxy_(worker_global_scope_->GetThread()->GetWorkerLoaderProxy()),
parent_frame_task_runners_(
worker_global_scope_->GetThread()->GetParentFrameTaskRunners()) {}
Bridge::~Bridge() {
DCHECK(!peer_);
}
void Bridge::ConnectOnMainThread(
std::unique_ptr<SourceLocation> location,
RefPtr<WorkerLoaderProxy> loader_proxy,
RefPtr<WebTaskRunner> worker_networking_task_runner,
WorkerThreadLifecycleContext* worker_thread_lifecycle_context,
const KURL& url,
const String& protocol,
WebSocketChannelSyncHelper* sync_helper) {
DCHECK(IsMainThread());
DCHECK(!peer_);
ThreadableLoadingContext* loading_context =
loader_proxy->GetThreadableLoadingContext();
if (!loading_context)
return;
Peer* peer =
new Peer(this, loader_proxy_, std::move(worker_networking_task_runner),
worker_thread_lifecycle_context);
if (peer->Initialize(std::move(location), loading_context)) {
peer_ = peer;
sync_helper->SetConnectRequestResult(peer_->Connect(url, protocol));
}
sync_helper->SignalWorkerThread();
}
bool Bridge::Connect(std::unique_ptr<SourceLocation> location,
const KURL& url,
const String& protocol) {
// Wait for completion of the task on the main thread because the mixed
// content check must synchronously be conducted.
WebSocketChannelSyncHelper sync_helper;
RefPtr<WebTaskRunner> worker_networking_task_runner =
TaskRunnerHelper::Get(TaskType::kNetworking, worker_global_scope_.Get());
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(
BLINK_FROM_HERE,
CrossThreadBind(&Bridge::ConnectOnMainThread,
WrapCrossThreadPersistent(this),
WTF::Passed(location->Clone()), loader_proxy_,
std::move(worker_networking_task_runner),
WrapCrossThreadPersistent(
worker_global_scope_->GetThread()
->GetWorkerThreadLifecycleContext()),
url, protocol, CrossThreadUnretained(&sync_helper)));
sync_helper.Wait();
return sync_helper.ConnectRequestResult();
}
void Bridge::Send(const CString& message) {
DCHECK(peer_);
std::unique_ptr<Vector<char>> data =
WTF::WrapUnique(new Vector<char>(message.length()));
if (message.length())
memcpy(data->data(), static_cast<const char*>(message.data()),
message.length());
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE,
CrossThreadBind(&Peer::SendTextAsCharVector, peer_,
WTF::Passed(std::move(data))));
}
void Bridge::Send(const DOMArrayBuffer& binary_data,
unsigned byte_offset,
unsigned byte_length) {
DCHECK(peer_);
// ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied
// into Vector<char>.
std::unique_ptr<Vector<char>> data =
WTF::MakeUnique<Vector<char>>(byte_length);
if (binary_data.ByteLength())
memcpy(data->data(),
static_cast<const char*>(binary_data.Data()) + byte_offset,
byte_length);
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE,
CrossThreadBind(&Peer::SendBinaryAsCharVector, peer_,
WTF::Passed(std::move(data))));
}
void Bridge::Send(PassRefPtr<BlobDataHandle> data) {
DCHECK(peer_);
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE,
CrossThreadBind(&Peer::SendBlob, peer_, std::move(data)));
}
void Bridge::Close(int code, const String& reason) {
DCHECK(peer_);
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE,
CrossThreadBind(&Peer::Close, peer_, code, reason));
}
void Bridge::Fail(const String& reason,
MessageLevel level,
std::unique_ptr<SourceLocation> location) {
DCHECK(peer_);
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE,
CrossThreadBind(&Peer::Fail, peer_, reason, level,
WTF::Passed(location->Clone())));
}
void Bridge::Disconnect() {
if (!peer_)
return;
parent_frame_task_runners_->Get(TaskType::kNetworking)
->PostTask(BLINK_FROM_HERE, CrossThreadBind(&Peer::Disconnect, peer_));
client_ = nullptr;
peer_ = nullptr;
worker_global_scope_.Clear();
}
DEFINE_TRACE(Bridge) {
visitor->Trace(client_);
visitor->Trace(worker_global_scope_);
}
} // namespace blink