blob: 9dc545fba4e711df559f87267b90616756170d93 [file] [log] [blame]
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/jingle_glue/jingle_channel.h"
#include "base/lock.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/waitable_event.h"
#include "media/base/data_buffer.h"
#include "remoting/jingle_glue/jingle_thread.h"
#include "third_party/libjingle/source/talk/base/stream.h"
using media::DataBuffer;
namespace remoting {
// Size of a read buffer chunk in bytes.
const size_t kReadBufferSize = 4096;
JingleChannel::JingleChannel(Callback* callback)
: state_(INITIALIZING),
callback_(callback),
closed_(false),
event_handler_(this),
write_buffer_size_(0),
current_write_buf_pos_(0) {
DCHECK(callback_ != NULL);
}
// This constructor is only used in unit test.
JingleChannel::JingleChannel()
: state_(CLOSED),
closed_(false),
write_buffer_size_(0),
current_write_buf_pos_(0) {
}
JingleChannel::~JingleChannel() {
DCHECK(closed_ || stream_ == NULL);
}
void JingleChannel::Init(JingleThread* thread,
talk_base::StreamInterface* stream,
const std::string& jid) {
thread_ = thread;
stream_.reset(stream);
stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent);
jid_ = jid;
// Initialize |state_|.
switch (stream->GetState()) {
case talk_base::SS_CLOSED:
SetState(CLOSED);
break;
case talk_base::SS_OPENING:
SetState(CONNECTING);
break;
case talk_base::SS_OPEN:
SetState(OPEN);
// Try to read in case there is something in the stream.
thread_->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoRead));
break;
default:
NOTREACHED();
}
}
void JingleChannel::Write(scoped_refptr<DataBuffer> data) {
// Discard empty packets.
if (data->GetDataSize() != 0) {
AutoLock auto_lock(write_lock_);
write_queue_.push_back(data);
write_buffer_size_ += data->GetDataSize();
// Post event so that the data gets written in the tunnel thread.
thread_->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoWrite));
}
}
void JingleChannel::DoRead() {
while (true) {
size_t bytes_to_read;
if (stream_->GetAvailable(&bytes_to_read)) {
// Return immediately if we know there is nothing to read.
if (bytes_to_read == 0)
return;
} else {
// Try to read kReadBufferSize if the stream doesn't support
// GetAvailable().
bytes_to_read = kReadBufferSize;
}
scoped_refptr<DataBuffer> buffer(
new DataBuffer(new uint8[bytes_to_read], kReadBufferSize));
size_t bytes_read;
int error;
talk_base::StreamResult result = stream_->Read(
buffer->GetWritableData(), bytes_to_read, &bytes_read, &error);
switch (result) {
case talk_base::SR_SUCCESS: {
DCHECK_GT(bytes_read, 0U);
buffer->SetDataSize(bytes_read);
{
AutoLock auto_lock(state_lock_);
// Drop received data if the channel is already closed.
if (!closed_)
callback_->OnPacketReceived(this, buffer);
}
break;
}
case talk_base::SR_BLOCK: {
return;
}
case talk_base::SR_EOS: {
SetState(CLOSED);
return;
}
case talk_base::SR_ERROR: {
SetState(FAILED);
return;
}
}
}
}
void JingleChannel::DoWrite() {
while (true) {
if (!current_write_buf_) {
AutoLock auto_lock(write_lock_);
if (write_queue_.empty())
break;
current_write_buf_ = write_queue_.front();
current_write_buf_pos_ = 0;
write_queue_.pop_front();
}
size_t bytes_written;
int error;
talk_base::StreamResult result = stream_->Write(
current_write_buf_->GetData() + current_write_buf_pos_,
current_write_buf_->GetDataSize() - current_write_buf_pos_,
&bytes_written, &error);
switch (result) {
case talk_base::SR_SUCCESS: {
current_write_buf_pos_ += bytes_written;
if (current_write_buf_pos_ >= current_write_buf_->GetDataSize())
current_write_buf_ = NULL;
{
AutoLock auto_lock(write_lock_);
write_buffer_size_ -= bytes_written;
}
break;
}
case talk_base::SR_BLOCK: {
return;
}
case talk_base::SR_EOS: {
SetState(CLOSED);
return;
}
case talk_base::SR_ERROR: {
SetState(FAILED);
return;
}
}
}
}
void JingleChannel::OnStreamEvent(talk_base::StreamInterface* stream,
int events, int error) {
if (events & talk_base::SE_OPEN)
SetState(OPEN);
if (state_ == OPEN && (events & talk_base::SE_WRITE))
DoWrite();
if (state_ == OPEN && (events & talk_base::SE_READ))
DoRead();
if (events & talk_base::SE_CLOSE)
SetState(CLOSED);
}
void JingleChannel::SetState(State new_state) {
if (new_state != state_) {
state_ = new_state;
{
AutoLock auto_lock(state_lock_);
if (!closed_)
callback_->OnStateChange(this, new_state);
}
}
}
void JingleChannel::Close() {
Close(NULL);
}
void JingleChannel::Close(Task* closed_task) {
{
AutoLock auto_lock(state_lock_);
if (closed_) {
// We are already closed.
if (closed_task)
thread_->message_loop()->PostTask(FROM_HERE, closed_task);
return;
}
closed_ = true;
if (closed_task)
closed_task_.reset(closed_task);
}
thread_->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose));
}
void JingleChannel::DoClose() {
DCHECK(closed_);
stream_->Close();
stream_.reset();
// TODO(sergeyu): Even though we have called Close() for the stream, it
// doesn't mean that the p2p sessions has been closed. I.e. |closed_task_|
// is called too early. If the client is closed right after that the other
// side will not receive notification that the channel was closed.
if (closed_task_.get()) {
closed_task_->Run();
closed_task_.reset();
}
}
size_t JingleChannel::write_buffer_size() {
AutoLock auto_lock(write_lock_);
return write_buffer_size_;
}
} // namespace remoting