// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "remoting/protocol/webrtc_data_stream_adapter.h"

#include <stdint.h>
#include <utility>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/callback_helpers.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "net/base/net_errors.h"
#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_serialization.h"

namespace remoting {
namespace protocol {

WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
    rtc::scoped_refptr<webrtc::DataChannelInterface> channel)
    : channel_(channel.get()), weak_ptr_factory_(this) {
  channel_->RegisterObserver(this);
  DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
}

WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
  if (channel_) {
    channel_->UnregisterObserver();
    channel_->Close();

    // Destroy |channel_| asynchronously as it may be on stack.
    base::SequencedTaskRunnerHandle::Get()->PostTask(
        FROM_HERE,
        base::BindOnce(base::DoNothing::Once<
                           rtc::scoped_refptr<webrtc::DataChannelInterface>>(),
                       std::move(channel_)));
  }
}

void WebrtcDataStreamAdapter::Start(EventHandler* event_handler) {
  DCHECK(!event_handler_);
  DCHECK(event_handler);

  event_handler_ = event_handler;
}

void WebrtcDataStreamAdapter::Send(google::protobuf::MessageLite* message,
                                   base::OnceClosure done) {
  // This shouldn't DCHECK in the CLOSED case, because the connection may be
  // abruptly closed at any time and the caller may not have been notified, yet.
  // The message will still be enqueued so that the outstanding done callbacks
  // are dropped at the expected time in the expected order.
  DCHECK(state_ != State::CONNECTING);

  rtc::CopyOnWriteBuffer buffer;
  buffer.SetSize(message->ByteSize());
  message->SerializeWithCachedSizesToArray(
      reinterpret_cast<uint8_t*>(buffer.data()));
  pending_messages_.emplace(
      webrtc::DataBuffer(std::move(buffer), true /* binary */),
      std::move(done));

  SendMessagesIfReady();
}

void WebrtcDataStreamAdapter::SendMessagesIfReady() {
  // We use our own send queue instead of queuing multiple messages in the
  // data-channel queue so we can invoke the done callback as close to the
  // message actually being sent as possible and avoid overrunning the data-
  // channel queue. There is also lower-level buffering beneath the data-channel
  // queue, which we do want to keep full to ensure the link is fully utilized.

  // Send messages to the data channel until it has to add one to its own queue.
  // This ensures that the lower-level buffers remain full.
  while (state_ == State::OPEN && channel_->buffered_amount() == 0 &&
         !pending_messages_.empty()) {
    PendingMessage message = std::move(pending_messages_.front());
    pending_messages_.pop();
    if (!channel_->Send(std::move(message.buffer))) {
      LOG(ERROR) << "Send failed on data channel " << channel_->label();
      channel_->Close();
      return;
    }

    if (message.done_callback) {
      // Invoke callback asynchronously to avoid nested calls to Send.
      base::SequencedTaskRunnerHandle::Get()->PostTask(
          FROM_HERE, std::move(message.done_callback));
    }
  }
}

void WebrtcDataStreamAdapter::OnStateChange() {
  switch (channel_->state()) {
    case webrtc::DataChannelInterface::kOpen:
      DCHECK(state_ == State::CONNECTING);
      state_ = State::OPEN;
      base::SequencedTaskRunnerHandle::Get()->PostTask(
          FROM_HERE, base::BindOnce(&WebrtcDataStreamAdapter::InvokeOpenEvent,
                                    weak_ptr_factory_.GetWeakPtr()));
      break;

    case webrtc::DataChannelInterface::kClosing:
      if (state_ != State::CLOSED) {
        state_ = State::CLOSED;
        base::SequencedTaskRunnerHandle::Get()->PostTask(
            FROM_HERE,
            base::BindOnce(&WebrtcDataStreamAdapter::InvokeClosedEvent,
                           weak_ptr_factory_.GetWeakPtr()));
      }
      break;

    case webrtc::DataChannelInterface::kConnecting:
    case webrtc::DataChannelInterface::kClosed:
      break;
  }
}

void WebrtcDataStreamAdapter::OnMessage(const webrtc::DataBuffer& rtc_buffer) {
  if (state_ != State::OPEN) {
    LOG(ERROR) << "Dropping a message received when the channel is not open.";
    return;
  }

  std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer());
  buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()),
                       rtc_buffer.data.size());
  buffer->Lock();
  base::SequencedTaskRunnerHandle::Get()->PostTask(
      FROM_HERE,
      base::BindOnce(&WebrtcDataStreamAdapter::InvokeMessageEvent,
                     weak_ptr_factory_.GetWeakPtr(), std::move(buffer)));
}

void WebrtcDataStreamAdapter::OnBufferedAmountChange(uint64_t previous_amount) {
  // WebRTC explicitly doesn't support sending from observer callbacks, so post
  // a task to let the stack unwind.
  base::SequencedTaskRunnerHandle::Get()->PostTask(
      FROM_HERE, base::BindOnce(&WebrtcDataStreamAdapter::SendMessagesIfReady,
                                weak_ptr_factory_.GetWeakPtr()));
}

void WebrtcDataStreamAdapter::InvokeOpenEvent() {
  event_handler_->OnMessagePipeOpen();
}

void WebrtcDataStreamAdapter::InvokeClosedEvent() {
  event_handler_->OnMessagePipeClosed();
}

void WebrtcDataStreamAdapter::InvokeMessageEvent(
    std::unique_ptr<CompoundBuffer> buffer) {
  event_handler_->OnMessageReceived(std::move(buffer));
}

WebrtcDataStreamAdapter::PendingMessage::PendingMessage(
    webrtc::DataBuffer buffer,
    base::OnceClosure done_callback)
    : buffer(std::move(buffer)), done_callback(std::move(done_callback)) {}

WebrtcDataStreamAdapter::PendingMessage&
WebrtcDataStreamAdapter::PendingMessage::operator=(PendingMessage&&) = default;

WebrtcDataStreamAdapter::PendingMessage::PendingMessage(PendingMessage&&) =
    default;

WebrtcDataStreamAdapter::PendingMessage::~PendingMessage() = default;

}  // namespace protocol
}  // namespace remoting
