| /* |
| * Copyright (C) 2012 Google Inc. All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions |
| * are met: |
| * 1. Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimer in the |
| * documentation and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' AND |
| * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
| * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
| * DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR |
| * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
| * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
| * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
| * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
| * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "third_party/blink/renderer/modules/peerconnection/rtc_data_channel.h" |
| |
| #include <limits> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include "base/containers/span.h" |
| #include "base/metrics/histogram_functions.h" |
| #include "base/metrics/histogram_macros.h" |
| #include "base/numerics/safe_conversions.h" |
| #include "base/task/single_thread_task_runner.h" |
| #include "components/webrtc/thread_wrapper.h" |
| #include "third_party/blink/public/platform/task_type.h" |
| #include "third_party/blink/renderer/bindings/modules/v8/v8_rtc_data_channel_state.h" |
| #include "third_party/blink/renderer/bindings/modules/v8/v8_rtc_priority_type.h" |
| #include "third_party/blink/renderer/core/events/message_event.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context.h" |
| #include "third_party/blink/renderer/core/execution_context/execution_context_lifecycle_observer.h" |
| #include "third_party/blink/renderer/core/fileapi/blob.h" |
| #include "third_party/blink/renderer/core/fileapi/file_error.h" |
| #include "third_party/blink/renderer/core/fileapi/file_reader_client.h" |
| #include "third_party/blink/renderer/core/fileapi/file_reader_loader.h" |
| #include "third_party/blink/renderer/core/frame/local_dom_window.h" |
| #include "third_party/blink/renderer/core/frame/local_frame.h" |
| #include "third_party/blink/renderer/core/inspector/console_message.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer.h" |
| #include "third_party/blink/renderer/core/typed_arrays/dom_array_buffer_view.h" |
| #include "third_party/blink/renderer/modules/peerconnection/adapters/web_rtc_cross_thread_copier.h" |
| #include "third_party/blink/renderer/modules/peerconnection/rtc_error_event.h" |
| #include "third_party/blink/renderer/modules/peerconnection/rtc_peer_connection.h" |
| #include "third_party/blink/renderer/modules/peerconnection/rtc_peer_connection_handler.h" |
| #include "third_party/blink/renderer/platform/bindings/exception_state.h" |
| #include "third_party/blink/renderer/platform/heap/garbage_collected.h" |
| #include "third_party/blink/renderer/platform/runtime_enabled_features.h" |
| #include "third_party/blink/renderer/platform/scheduler/public/post_cross_thread_task.h" |
| #include "third_party/blink/renderer/platform/scheduler/public/scheduling_policy.h" |
| #include "third_party/blink/renderer/platform/wtf/cross_thread_copier_base.h" |
| #include "third_party/blink/renderer/platform/wtf/cross_thread_copier_std.h" |
| #include "third_party/blink/renderer/platform/wtf/cross_thread_functional.h" |
| #include "third_party/blink/renderer/platform/wtf/thread_safe_ref_counted.h" |
| #include "third_party/webrtc/api/priority.h" |
| |
| namespace blink { |
| |
| template <> |
| struct CrossThreadCopier<webrtc::scoped_refptr<webrtc::DataChannelInterface>> |
| : public CrossThreadCopierPassThrough< |
| webrtc::scoped_refptr<webrtc::DataChannelInterface>> { |
| STATIC_ONLY(CrossThreadCopier); |
| }; |
| |
| namespace { |
| |
| // These values are persisted to logs. Entries should not be renumbered and |
| // numeric values should never be reused. |
| enum class DataChannelCounters { |
| kCreated = 0, |
| kOpened = 1, |
| kReliable = 2, |
| kOrdered = 3, |
| kNegotiated = 4, |
| kMaxValue = kNegotiated, |
| }; |
| |
| // These values are persisted to logs. Entries should not be renumbered and |
| // numeric values should never be reused. |
| enum class DataChannelAggregateType { |
| kUnReliableUnordered = 0, |
| kUnReliableOrdered = 1, |
| kReliableUnordered = 2, |
| kReliableOrdered = 3, |
| kMaxValue = kReliableOrdered, |
| }; |
| |
| void IncrementCounter(DataChannelCounters counter) { |
| base::UmaHistogramEnumeration("WebRTC.DataChannelCounters", counter); |
| } |
| |
| void IncrementCounters(const webrtc::DataChannelInterface& channel) { |
| int aggregate_type = 0; |
| |
| IncrementCounter(DataChannelCounters::kCreated); |
| if (channel.reliable()) { |
| IncrementCounter(DataChannelCounters::kReliable); |
| aggregate_type += 2; |
| } |
| if (channel.ordered()) { |
| IncrementCounter(DataChannelCounters::kOrdered); |
| aggregate_type += 1; |
| } |
| if (channel.negotiated()) |
| IncrementCounter(DataChannelCounters::kNegotiated); |
| |
| base::UmaHistogramEnumeration( |
| "WebRTC.DataChannelAggregateType", |
| static_cast<DataChannelAggregateType>(aggregate_type)); |
| |
| // Only record max retransmits and max packet life time if set. |
| if (channel.maxRetransmitsOpt()) { |
| base::UmaHistogramCustomCounts("WebRTC.DataChannelMaxRetransmits", |
| *(channel.maxRetransmitsOpt()), 1, |
| std::numeric_limits<uint16_t>::max(), 50); |
| } |
| if (channel.maxPacketLifeTime()) { |
| base::UmaHistogramCustomCounts("WebRTC.DataChannelMaxPacketLifeTime", |
| *channel.maxPacketLifeTime(), 1, |
| std::numeric_limits<uint16_t>::max(), 50); |
| } |
| } |
| |
| void RecordMessageSent(const webrtc::DataChannelInterface& channel, |
| size_t num_bytes) { |
| // Currently, messages are capped at some fairly low limit (16 Kb?) |
| // but we may allow unlimited-size messages at some point, so making |
| // the histogram maximum quite large (100 Mb) to have some |
| // granularity at the higher end in that eventuality. The histogram |
| // buckets are exponentially growing in size, so we'll still have |
| // good granularity at the low end. |
| |
| // This makes the last bucket in the histogram count messages from |
| // 100 Mb to infinity. |
| const int kMaxBucketSize = 100 * 1024 * 1024; |
| const int kNumBuckets = 50; |
| |
| if (channel.reliable()) { |
| UMA_HISTOGRAM_CUSTOM_COUNTS("WebRTC.ReliableDataChannelMessageSize", |
| base::checked_cast<int>(num_bytes), 1, |
| kMaxBucketSize, kNumBuckets); |
| } else { |
| UMA_HISTOGRAM_CUSTOM_COUNTS("WebRTC.UnreliableDataChannelMessageSize", |
| base::checked_cast<int>(num_bytes), 1, |
| kMaxBucketSize, kNumBuckets); |
| } |
| } |
| |
| // These values are persisted to logs. Entries should not be renumbered and |
| // numeric values should never be reused. |
| enum class DataChannelSctpErrorCode { |
| kUnspecified = 0, |
| kInvalidStreamIdentifier = 1, |
| kMissingMandatoryParameter = 2, |
| kStaleCookieError = 3, |
| kOutOfResource = 4, |
| kUnresolvableAddress = 5, |
| kUnrecognizedChunkType = 6, |
| kInvalidMandatoryParameter = 7, |
| kUnrecognizedParameters = 8, |
| kNoUserData = 9, |
| kCookieReceivedWhileShuttingDown = 10, |
| kRestartWithNewAddresses = 11, |
| kUserInitiatedAbort = 12, |
| kProtocolViolation = 13, |
| kOther = 14, |
| kMaxValue = kOther, |
| }; |
| |
| void IncrementErrorCounter(const webrtc::RTCError& error) { |
| DataChannelSctpErrorCode uma_code; |
| auto code = error.sctp_cause_code(); |
| if (!code.has_value()) { |
| uma_code = DataChannelSctpErrorCode::kUnspecified; |
| } else if (*code >= static_cast<int>(DataChannelSctpErrorCode::kOther)) { |
| uma_code = DataChannelSctpErrorCode::kOther; |
| } else { |
| uma_code = static_cast<DataChannelSctpErrorCode>(*code); |
| } |
| base::UmaHistogramEnumeration("WebRTC.DataChannelSctpErrorCode", uma_code); |
| } |
| |
| } // namespace |
| |
| static void ThrowNotOpenException(ExceptionState* exception_state) { |
| exception_state->ThrowDOMException(DOMExceptionCode::kInvalidStateError, |
| "RTCDataChannel.readyState is not 'open'"); |
| } |
| |
| static void ThrowSendBufferFullException(ExceptionState* exception_state) { |
| exception_state->ThrowDOMException(DOMExceptionCode::kOperationError, |
| "RTCDataChannel send queue is full"); |
| } |
| |
| RTCDataChannel::Observer::Observer( |
| scoped_refptr<base::SingleThreadTaskRunner> main_thread, |
| RTCDataChannel* blink_channel, |
| webrtc::scoped_refptr<webrtc::DataChannelInterface> channel) |
| : main_thread_(main_thread), |
| blink_channel_(blink_channel), |
| webrtc_channel_(std::move(channel)) { |
| CHECK(webrtc_channel_.get()); |
| } |
| |
| RTCDataChannel::Observer::~Observer() { |
| CHECK(!is_registered()) << "Reference to blink channel hasn't been released."; |
| } |
| |
| const webrtc::scoped_refptr<webrtc::DataChannelInterface>& |
| RTCDataChannel::Observer::channel() const { |
| return webrtc_channel_; |
| } |
| |
| bool RTCDataChannel::Observer::is_registered() const { |
| DCHECK(main_thread_->BelongsToCurrentThread()); |
| return blink_channel_ != nullptr; |
| } |
| |
| void RTCDataChannel::Observer::Unregister() { |
| DCHECK(main_thread_->BelongsToCurrentThread()); |
| webrtc_channel_->UnregisterObserver(); |
| blink_channel_ = nullptr; |
| } |
| |
| void RTCDataChannel::Observer::OnStateChange() { |
| PostCrossThreadTask( |
| *main_thread_, FROM_HERE, |
| CrossThreadBindOnce(&RTCDataChannel::Observer::OnStateChangeImpl, |
| scoped_refptr<Observer>(this), |
| webrtc_channel_->state())); |
| } |
| |
| void RTCDataChannel::Observer::OnBufferedAmountChange(uint64_t sent_data_size) { |
| PostCrossThreadTask( |
| *main_thread_, FROM_HERE, |
| CrossThreadBindOnce(&RTCDataChannel::Observer::OnBufferedAmountChangeImpl, |
| scoped_refptr<Observer>(this), |
| base::checked_cast<unsigned>(sent_data_size))); |
| } |
| |
| void RTCDataChannel::Observer::OnMessage(const webrtc::DataBuffer& buffer) { |
| PostCrossThreadTask( |
| *main_thread_, FROM_HERE, |
| CrossThreadBindOnce(&RTCDataChannel::Observer::OnMessageImpl, |
| scoped_refptr<Observer>(this), buffer)); |
| } |
| |
| bool RTCDataChannel::Observer::IsOkToCallOnTheNetworkThread() { |
| return true; |
| } |
| |
| void RTCDataChannel::Observer::OnStateChangeImpl( |
| webrtc::DataChannelInterface::DataState state) { |
| DCHECK(main_thread_->BelongsToCurrentThread()); |
| if (blink_channel_) |
| blink_channel_->OnStateChange(state); |
| } |
| |
| void RTCDataChannel::Observer::OnBufferedAmountChangeImpl( |
| unsigned sent_data_size) { |
| DCHECK(main_thread_->BelongsToCurrentThread()); |
| if (blink_channel_) |
| blink_channel_->OnBufferedAmountChange(sent_data_size); |
| } |
| |
| void RTCDataChannel::Observer::OnMessageImpl(webrtc::DataBuffer buffer) { |
| DCHECK(main_thread_->BelongsToCurrentThread()); |
| if (blink_channel_) |
| blink_channel_->OnMessage(std::move(buffer)); |
| } |
| |
| // static |
| void RTCDataChannel::EnsureThreadWrappersForWorkerThread() { |
| webrtc::ThreadWrapper::EnsureForCurrentMessageLoop(); |
| webrtc::ThreadWrapper::current()->set_send_allowed(true); |
| } |
| |
| RTCDataChannel::RTCDataChannel( |
| ExecutionContext* context, |
| webrtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) |
| : ActiveScriptWrappable<RTCDataChannel>({}), |
| ExecutionContextLifecycleObserver(context), |
| observer_(base::MakeRefCounted<Observer>( |
| context->GetTaskRunner(TaskType::kNetworking), |
| this, |
| std::move(data_channel))) { |
| if (RuntimeEnabledFeatures::TransferableRTCDataChannelEnabled()) { |
| // Delay connecting to the observer to give a chance for this RTCDataChannel |
| // to be transferred. See: |
| // https://w3c.github.io/webrtc-extensions/#rtcdatachannel-transferable |
| context->GetTaskRunner(TaskType::kNetworking) |
| ->PostTask(FROM_HERE, BindOnce(&RTCDataChannel::RegisterObserver, |
| WrapWeakPersistent(this))); |
| } else { |
| RegisterObserver(); |
| } |
| |
| IncrementCounters(*channel().get()); |
| } |
| |
| RTCDataChannel::~RTCDataChannel() { |
| // `Dispose()` must have been called to clear up webrtc references. |
| CHECK(!observer_->is_registered()); |
| } |
| |
| void RTCDataChannel::RegisterObserver() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| is_transferable_ = false; |
| |
| // Do not connect if `this` was transferred, as it should be going away soon. |
| if (was_transferred_) { |
| return; |
| } |
| |
| // The context might have been destroyed already if registration was delayed. |
| if (stopped_) { |
| return; |
| } |
| |
| channel()->RegisterObserver(observer_.get()); |
| if (channel()->state() != state_) { |
| observer_->OnStateChange(); |
| } |
| } |
| |
| String RTCDataChannel::label() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return String::FromUTF8(channel()->label()); |
| } |
| |
| bool RTCDataChannel::reliable() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return channel()->reliable(); |
| } |
| |
| bool RTCDataChannel::ordered() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return channel()->ordered(); |
| } |
| |
| std::optional<uint16_t> RTCDataChannel::maxPacketLifeTime() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (channel()->maxPacketLifeTime()) |
| return *channel()->maxPacketLifeTime(); |
| return std::nullopt; |
| } |
| |
| std::optional<uint16_t> RTCDataChannel::maxRetransmits() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (channel()->maxRetransmitsOpt()) |
| return *channel()->maxRetransmitsOpt(); |
| return std::nullopt; |
| } |
| |
| String RTCDataChannel::protocol() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return String::FromUTF8(channel()->protocol()); |
| } |
| |
| bool RTCDataChannel::negotiated() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return channel()->negotiated(); |
| } |
| |
| std::optional<uint16_t> RTCDataChannel::id() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (id_.has_value()) { |
| return id_; |
| } |
| |
| int id = channel()->id(); |
| if (id == -1) { |
| return std::nullopt; |
| } |
| |
| DCHECK(id >= 0 && id <= std::numeric_limits<uint16_t>::max()); |
| id_ = static_cast<uint16_t>(id); |
| |
| return id; |
| } |
| |
| V8RTCPriorityType RTCDataChannel::priority() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| webrtc::PriorityValue priority = channel()->priority(); |
| if (priority <= webrtc::PriorityValue(webrtc::Priority::kVeryLow)) { |
| return V8RTCPriorityType(V8RTCPriorityType::Enum::kVeryLow); |
| } |
| if (priority <= webrtc::PriorityValue(webrtc::Priority::kLow)) { |
| return V8RTCPriorityType(V8RTCPriorityType::Enum::kLow); |
| } |
| if (priority <= webrtc::PriorityValue(webrtc::Priority::kMedium)) { |
| return V8RTCPriorityType(V8RTCPriorityType::Enum::kMedium); |
| } |
| return V8RTCPriorityType(V8RTCPriorityType::Enum::kHigh); |
| } |
| |
| V8RTCDataChannelState RTCDataChannel::readyState() const { |
| switch (state_) { |
| case webrtc::DataChannelInterface::kConnecting: |
| return V8RTCDataChannelState(V8RTCDataChannelState::Enum::kConnecting); |
| case webrtc::DataChannelInterface::kOpen: |
| return V8RTCDataChannelState(V8RTCDataChannelState::Enum::kOpen); |
| case webrtc::DataChannelInterface::kClosing: |
| return V8RTCDataChannelState(V8RTCDataChannelState::Enum::kClosing); |
| case webrtc::DataChannelInterface::kClosed: |
| return V8RTCDataChannelState(V8RTCDataChannelState::Enum::kClosed); |
| } |
| |
| NOTREACHED(); |
| } |
| |
| unsigned RTCDataChannel::bufferedAmount() const { |
| return buffered_amount_; |
| } |
| |
| unsigned RTCDataChannel::bufferedAmountLowThreshold() const { |
| return buffered_amount_low_threshold_; |
| } |
| |
| void RTCDataChannel::setBufferedAmountLowThreshold(unsigned threshold) { |
| buffered_amount_low_threshold_ = threshold; |
| } |
| |
| V8BinaryType RTCDataChannel::binaryType() const { |
| return V8BinaryType(binary_type_); |
| } |
| |
| void RTCDataChannel::setBinaryType(const V8BinaryType& binary_type) { |
| binary_type_ = binary_type.AsEnum(); |
| } |
| |
| bool RTCDataChannel::ValidateSendLength(uint64_t length, |
| ExceptionState& exception_state) { |
| // Send algorithm: https://w3c.github.io/webrtc-pc/#datachannel-send |
| |
| // TODO(orphis): Throw TypeError if length > transport.maxMessageSize |
| |
| auto updated_buffered_amount = |
| base::CheckedNumeric<unsigned>(buffered_amount_) + length; |
| if (!updated_buffered_amount.IsValid() || |
| updated_buffered_amount.ValueOrDie() > |
| webrtc::DataChannelInterface::MaxSendQueueSize()) { |
| ThrowSendBufferFullException(&exception_state); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void RTCDataChannel::send(const String& data, ExceptionState& exception_state) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| is_transferable_ = false; |
| |
| if (state_ != webrtc::DataChannelInterface::kOpen) { |
| ThrowNotOpenException(&exception_state); |
| return; |
| } |
| |
| webrtc::DataBuffer data_buffer(data.Utf8()); |
| |
| if (!ValidateSendLength(data_buffer.size(), exception_state)) |
| return; |
| |
| buffered_amount_ += data_buffer.size(); |
| RecordMessageSent(*channel().get(), data_buffer.size()); |
| SendDataBuffer(std::move(data_buffer)); |
| } |
| |
| void RTCDataChannel::send(DOMArrayBuffer* data, |
| ExceptionState& exception_state) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| is_transferable_ = false; |
| |
| if (state_ != webrtc::DataChannelInterface::kOpen) { |
| ThrowNotOpenException(&exception_state); |
| return; |
| } |
| |
| size_t data_length = data->ByteLength(); |
| |
| if (!ValidateSendLength(data_length, exception_state)) |
| return; |
| |
| buffered_amount_ += data_length; |
| SendRawData(static_cast<const char*>((data->Data())), data_length); |
| } |
| |
| void RTCDataChannel::send(NotShared<DOMArrayBufferView> data, |
| ExceptionState& exception_state) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| is_transferable_ = false; |
| |
| if (state_ != webrtc::DataChannelInterface::kOpen) { |
| ThrowNotOpenException(&exception_state); |
| return; |
| } |
| |
| if (!ValidateSendLength(data->byteLength(), exception_state)) |
| return; |
| |
| buffered_amount_ += data->byteLength(); |
| SendRawData(static_cast<const char*>(data->BaseAddress()), |
| data->byteLength()); |
| } |
| |
| void RTCDataChannel::send(Blob* data, ExceptionState& exception_state) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| is_transferable_ = false; |
| |
| if (state_ != webrtc::DataChannelInterface::kOpen) { |
| ThrowNotOpenException(&exception_state); |
| return; |
| } |
| |
| if (!ValidateSendLength(data->size(), exception_state)) { |
| return; |
| } |
| |
| buffered_amount_ += data->size(); |
| |
| PendingMessage* message = MakeGarbageCollected<PendingMessage>(); |
| message->type_ = PendingMessage::Type::kBufferPending; |
| message->blob_reader_ = |
| BlobReader::Create(GetExecutionContext(), this, message); |
| message->blob_reader_->Start(data); |
| pending_messages_.push_back(message); |
| } |
| |
| void RTCDataChannel::close() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (state_ == webrtc::DataChannelInterface::kClosing || |
| state_ == webrtc::DataChannelInterface::kClosed) { |
| return; |
| } |
| closed_from_owner_ = true; |
| OnStateChange(webrtc::DataChannelInterface::kClosing); |
| |
| if (pending_messages_.empty()) { |
| channel()->Close(); |
| } else { |
| PendingMessage* message = MakeGarbageCollected<PendingMessage>(); |
| message->type_ = PendingMessage::Type::kCloseEvent; |
| pending_messages_.push_back(message); |
| } |
| } |
| |
| bool RTCDataChannel::IsTransferable() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return RuntimeEnabledFeatures::TransferableRTCDataChannelEnabled() && |
| is_transferable_; |
| } |
| |
| webrtc::scoped_refptr<webrtc::DataChannelInterface> |
| RTCDataChannel::TransferUnderlyingChannel() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| if (!IsTransferable()) { |
| return nullptr; |
| } |
| |
| // Only allow a single transfer. |
| was_transferred_ = true; |
| is_transferable_ = false; |
| |
| // Bypass OnStateChange() to avoid emitting an event. |
| state_ = webrtc::DataChannelInterface::kClosed; |
| feature_handle_for_scheduler_.reset(); |
| |
| return channel(); |
| } |
| |
| const AtomicString& RTCDataChannel::InterfaceName() const { |
| return event_target_names::kRTCDataChannel; |
| } |
| |
| ExecutionContext* RTCDataChannel::GetExecutionContext() const { |
| return ExecutionContextLifecycleObserver::GetExecutionContext(); |
| } |
| |
| void RTCDataChannel::ContextDestroyed() { |
| Dispose(); |
| stopped_ = true; |
| state_ = webrtc::DataChannelInterface::kClosed; |
| feature_handle_for_scheduler_.reset(); |
| } |
| |
| // ActiveScriptWrappable |
| bool RTCDataChannel::HasPendingActivity() const { |
| if (stopped_) |
| return false; |
| |
| // A RTCDataChannel object must not be garbage collected if its |
| // * readyState is connecting and at least one event listener is registered |
| // for open events, message events, error events, closing events |
| // or close events. |
| // * readyState is open and at least one event listener is registered for |
| // message events, error events, closing events, or close events. |
| // * readyState is closing and at least one event listener is registered for |
| // error events, or close events. |
| // * underlying data transport is established and data is queued to be |
| // transmitted. |
| bool has_valid_listeners = false; |
| switch (state_) { |
| case webrtc::DataChannelInterface::kConnecting: |
| has_valid_listeners |= HasEventListeners(event_type_names::kOpen); |
| [[fallthrough]]; |
| case webrtc::DataChannelInterface::kOpen: |
| has_valid_listeners |= HasEventListeners(event_type_names::kMessage) || |
| HasEventListeners(event_type_names::kClosing); |
| [[fallthrough]]; |
| case webrtc::DataChannelInterface::kClosing: |
| has_valid_listeners |= HasEventListeners(event_type_names::kError) || |
| HasEventListeners(event_type_names::kClose); |
| break; |
| default: |
| break; |
| } |
| |
| if (has_valid_listeners) |
| return true; |
| |
| return state_ != webrtc::DataChannelInterface::kClosed && |
| bufferedAmount() > 0; |
| } |
| |
| void RTCDataChannel::Trace(Visitor* visitor) const { |
| visitor->Trace(pending_messages_); |
| EventTarget::Trace(visitor); |
| ExecutionContextLifecycleObserver::Trace(visitor); |
| } |
| |
| void RTCDataChannel::SetStateToOpenWithoutEvent() { |
| DCHECK_NE(state_, webrtc::DataChannelInterface::kOpen); |
| IncrementCounter(DataChannelCounters::kOpened); |
| state_ = webrtc::DataChannelInterface::kOpen; |
| CreateFeatureHandleForScheduler(); |
| } |
| |
| void RTCDataChannel::DispatchOpenEvent() { |
| DispatchEvent(*Event::Create(event_type_names::kOpen)); |
| } |
| |
| void RTCDataChannel::OnStateChange( |
| webrtc::DataChannelInterface::DataState state) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| if (state_ == webrtc::DataChannelInterface::kClosed) |
| return; |
| |
| if (state_ == webrtc::DataChannelInterface::kClosing && |
| state != webrtc::DataChannelInterface::kClosed) { |
| return; |
| } |
| |
| if (state == state_) { |
| return; |
| } |
| |
| state_ = state; |
| |
| switch (state_) { |
| case webrtc::DataChannelInterface::kOpen: |
| IncrementCounter(DataChannelCounters::kOpened); |
| CreateFeatureHandleForScheduler(); |
| DispatchEvent(*Event::Create(event_type_names::kOpen)); |
| break; |
| case webrtc::DataChannelInterface::kClosing: |
| if (!closed_from_owner_) { |
| DispatchEvent(*Event::Create(event_type_names::kClosing)); |
| } |
| break; |
| case webrtc::DataChannelInterface::kClosed: { |
| feature_handle_for_scheduler_.reset(); |
| auto error = channel()->error(); |
| if (!error.ok()) { |
| LOG(ERROR) << "DataChannel error: \"" << error.message() << "\"" |
| << ", code: " << error.sctp_cause_code().value_or(-1); |
| |
| if (error.error_detail() == webrtc::RTCErrorDetailType::NONE) { |
| error.set_error_detail( |
| webrtc::RTCErrorDetailType::DATA_CHANNEL_FAILURE); |
| } |
| |
| IncrementErrorCounter(error); |
| DispatchEvent(*MakeGarbageCollected<RTCErrorEvent>( |
| event_type_names::kError, error)); |
| } |
| DispatchEvent(*Event::Create(event_type_names::kClose)); |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| |
| void RTCDataChannel::OnBufferedAmountChange(unsigned sent_data_size) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| unsigned previous_amount = buffered_amount_; |
| DVLOG(1) << "OnBufferedAmountChange " << previous_amount; |
| DCHECK_GE(buffered_amount_, sent_data_size); |
| buffered_amount_ -= sent_data_size; |
| |
| if (previous_amount > buffered_amount_low_threshold_ && |
| buffered_amount_ <= buffered_amount_low_threshold_) { |
| DispatchEvent(*Event::Create(event_type_names::kBufferedamountlow)); |
| } |
| } |
| |
| void RTCDataChannel::OnMessage(webrtc::DataBuffer buffer) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| if (buffer.binary) { |
| switch (binary_type_) { |
| case V8BinaryType::Enum::kBlob: { |
| auto blob_data = std::make_unique<BlobData>(); |
| blob_data->AppendBytes(base::span(buffer.data)); |
| uint64_t blob_size = blob_data->length(); |
| auto* blob = MakeGarbageCollected<Blob>( |
| BlobDataHandle::Create(std::move(blob_data), blob_size)); |
| DispatchEvent(*MessageEvent::Create(blob)); |
| return; |
| } |
| case V8BinaryType::Enum::kArraybuffer: { |
| DOMArrayBuffer* dom_buffer = DOMArrayBuffer::Create(buffer.data); |
| DispatchEvent(*MessageEvent::Create(dom_buffer)); |
| return; |
| } |
| } |
| NOTREACHED(); |
| } else { |
| String text = |
| buffer.data.size() > 0 ? String::FromUTF8(buffer.data) : g_empty_string; |
| if (!text) { |
| LOG(ERROR) << "Failed convert received data to UTF16"; |
| return; |
| } |
| DispatchEvent(*MessageEvent::Create(text)); |
| } |
| } |
| |
| void RTCDataChannel::Dispose() { |
| if (stopped_) |
| return; |
| |
| // If `this` was transferred, DelayObserverRegistration() should have never |
| // registered `observer_`. |
| if (!was_transferred_) { |
| // Clear the weak persistent reference to this on-heap object. |
| observer_->Unregister(); |
| } |
| } |
| |
| const webrtc::scoped_refptr<webrtc::DataChannelInterface>& |
| RTCDataChannel::channel() const { |
| return observer_->channel(); |
| } |
| |
| void RTCDataChannel::SendRawData(const char* data, size_t length) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| CHECK(!was_transferred_); |
| webrtc::CopyOnWriteBuffer buffer(data, length); |
| webrtc::DataBuffer data_buffer(buffer, true); |
| RecordMessageSent(*channel().get(), data_buffer.size()); |
| |
| if (pending_messages_.empty()) { |
| SendDataBuffer(std::move(data_buffer)); |
| } else { |
| PendingMessage* message = MakeGarbageCollected<PendingMessage>(); |
| message->type_ = PendingMessage::Type::kBufferReady; |
| message->buffer_ = std::move(data_buffer); |
| pending_messages_.push_back(message); |
| } |
| } |
| |
| void RTCDataChannel::SendDataBuffer(webrtc::DataBuffer data_buffer) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| CHECK(!was_transferred_); |
| |
| // SCTP data channels queue the packet on failure and always return true, so |
| // Send can be called asynchronously for them. |
| channel()->SendAsync(std::move(data_buffer), [](webrtc::RTCError error) { |
| // TODO(orphis): Use this callback in combination with SendAsync to report |
| // completion of the send API to the JS layer. |
| // The possible failures per the spec are: |
| // - Channel not in open state. Although we check the state in each Send() |
| // implementation, it's possible to have a short race between the WebRTC |
| // state and the Chrome state, i.e. sending while a remote close event is |
| // pending. In this case, it's safe to ignore send failures. |
| // - Data longer than the transport maxMessageSize (not yet implemented in |
| // WebRTC or Blink). |
| // - Send Buffers full (buffered amount accounting in Blink layer to check |
| // for it). |
| if (!error.ok()) { |
| // TODO(orphis): Add collect UMA stats about failure. |
| // Note that when we get this callback, we're on WebRTC's network thread |
| // So the callback needs to be propagated to the main (JS) thread. |
| LOG(ERROR) << "Send failed" << webrtc::ToString(error.type()); |
| } |
| }); |
| } |
| |
| void RTCDataChannel::CreateFeatureHandleForScheduler() { |
| DCHECK(!feature_handle_for_scheduler_); |
| LocalDOMWindow* window = DynamicTo<LocalDOMWindow>(GetExecutionContext()); |
| // Ideally we'd use To<LocalDOMWindow>, but in unittests the ExecutionContext |
| // may not be a LocalDOMWindow. |
| if (!window) |
| return; |
| // This can happen for detached frames. |
| if (!window->GetFrame()) |
| return; |
| feature_handle_for_scheduler_ = |
| window->GetFrame()->GetFrameScheduler()->RegisterFeature( |
| SchedulingPolicy::Feature::kWebRTC, |
| {SchedulingPolicy::DisableAggressiveThrottling(), |
| SchedulingPolicy::DisableAlignWakeUps()}); |
| } |
| |
| void RTCDataChannel::ProcessSendQueue() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| bool stop_processing = false; |
| while (!pending_messages_.empty() && !stop_processing) { |
| auto& message = pending_messages_.front(); |
| switch (message->type_) { |
| case PendingMessage::Type::kBufferReady: |
| SendDataBuffer(std::move(*message->buffer_)); |
| pending_messages_.pop_front(); |
| break; |
| case PendingMessage::Type::kBufferPending: |
| if (message->blob_reader_->HasFinishedLoading()) { |
| SendDataBuffer(std::move(*message->buffer_)); |
| pending_messages_.pop_front(); |
| } else { |
| stop_processing = true; |
| } |
| break; |
| case PendingMessage::Type::kCloseEvent: |
| channel()->Close(); |
| pending_messages_.pop_front(); |
| break; |
| case PendingMessage::Type::kBlobFailure: |
| pending_messages_.pop_front(); |
| break; |
| } |
| } |
| } |
| |
| void RTCDataChannel::PendingMessage::Trace(Visitor* visitor) const { |
| visitor->Trace(blob_reader_); |
| } |
| |
| void RTCDataChannel::BlobReader::DidFinishLoading(FileReaderData data) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DOMArrayBuffer* array_buffer = std::move(data).AsDOMArrayBuffer(); |
| webrtc::CopyOnWriteBuffer buffer( |
| static_cast<const char*>((array_buffer->Data())), |
| array_buffer->ByteLength()); |
| message_->buffer_ = webrtc::DataBuffer(buffer, true); |
| message_->type_ = RTCDataChannel::PendingMessage::Type::kBufferReady; |
| data_channel_->ProcessSendQueue(); |
| Dispose(); |
| } |
| |
| void RTCDataChannel::BlobReader::DidFail(FileErrorCode error) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| GetExecutionContext()->AddConsoleMessage(MakeGarbageCollected<ConsoleMessage>( |
| mojom::ConsoleMessageSource::kJavaScript, |
| mojom::ConsoleMessageLevel::kError, |
| "Couldn't read Blob content, skipping message.")); |
| message_->type_ = RTCDataChannel::PendingMessage::Type::kBlobFailure; |
| data_channel_->ProcessSendQueue(); |
| Dispose(); |
| } |
| |
| RTCDataChannel::BlobReader::BlobReader(ExecutionContext* context, |
| RTCDataChannel* data_channel, |
| PendingMessage* message) |
| : ExecutionContextLifecycleObserver(context), |
| loader_(MakeGarbageCollected<FileReaderLoader>( |
| this, |
| GetExecutionContext()->GetTaskRunner(TaskType::kFileReading))), |
| data_channel_(data_channel), |
| message_(message), |
| keep_alive_(this) {} |
| |
| RTCDataChannel::BlobReader::~BlobReader() = default; |
| |
| void RTCDataChannel::BlobReader::Start(Blob* blob) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| loader_->Start(blob->GetBlobDataHandle()); |
| } |
| |
| void RTCDataChannel::BlobReader::Trace(Visitor* visitor) const { |
| ExecutionContextLifecycleObserver::Trace(visitor); |
| FileReaderAccumulator::Trace(visitor); |
| visitor->Trace(loader_); |
| visitor->Trace(data_channel_); |
| visitor->Trace(message_); |
| } |
| |
| bool RTCDataChannel::BlobReader::HasFinishedLoading() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return loader_->HasFinishedLoading(); |
| } |
| |
| void RTCDataChannel::BlobReader::ContextDestroyed() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| loader_->Cancel(); |
| Dispose(); |
| } |
| |
| void RTCDataChannel::BlobReader::Dispose() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| keep_alive_.Clear(); |
| } |
| |
| } // namespace blink |