blob: d7e9e521e2d7e097aca4d0edfc30e79fea4de862 [file] [log] [blame]
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "remoting/protocol/rtp_video_reader.h"
#include "base/task.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/session.h"
namespace remoting {
namespace protocol {
namespace {
const int kMaxPacketsInQueue = 1024;
const int kReceiverReportsIntervalMs = 1000;
} // namespace
RtpVideoReader::PacketsQueueEntry::PacketsQueueEntry()
: received(false),
packet(NULL) {
}
RtpVideoReader::RtpVideoReader()
: last_sequence_number_(0) {
}
RtpVideoReader::~RtpVideoReader() {
ResetQueue();
}
void RtpVideoReader::Init(protocol::Session* session, VideoStub* video_stub) {
rtp_reader_.Init(session->video_rtp_channel(),
NewCallback(this, &RtpVideoReader::OnRtpPacket));
rtcp_writer_.Init(session->video_rtcp_channel());
video_stub_ = video_stub;
}
void RtpVideoReader::Close() {
rtp_reader_.Close();
}
void RtpVideoReader::ResetQueue() {
for (PacketsQueue::iterator it = packets_queue_.begin();
it != packets_queue_.end(); ++it) {
delete it->packet;
}
packets_queue_.assign(kMaxPacketsInQueue, PacketsQueueEntry());
}
void RtpVideoReader::OnRtpPacket(const RtpPacket* rtp_packet) {
uint32 sequence_number = rtp_packet->extended_sequence_number();
int32 relative_number = sequence_number - last_sequence_number_;
int packet_index;
if (packets_queue_.empty()) {
// This is the first packet we've received. Setup the queue.
ResetQueue();
last_sequence_number_ = sequence_number;
packet_index = packets_queue_.size() - 1;
} else if (relative_number > 0) {
if (relative_number > kMaxPacketsInQueue) {
// Sequence number jumped too much for some reason. Reset the queue.
ResetQueue();
} else {
packets_queue_.resize(packets_queue_.size() + relative_number);
// Cleanup old packets, so that we don't have more than
// |kMaxPacketsInQueue| packets.
while (static_cast<int>(packets_queue_.size()) > kMaxPacketsInQueue) {
delete packets_queue_.front().packet;
packets_queue_.pop_front();
}
}
last_sequence_number_ = sequence_number;
packet_index = packets_queue_.size() - 1;
} else {
packet_index = packets_queue_.size() - 1 + relative_number;
if (packet_index < 0) {
// The packet is too old. Just drop it.
delete rtp_packet;
return;
}
}
CHECK_LT(packet_index, static_cast<int>(packets_queue_.size()));
if (packets_queue_[packet_index].received) {
VLOG(1) << "Received duplicate packet with sequence number "
<< sequence_number;
delete rtp_packet;
return;
}
packets_queue_[packet_index].packet = rtp_packet;
packets_queue_[packet_index].received = true;
CheckFullPacket(packets_queue_.begin() + packet_index);
}
void RtpVideoReader::CheckFullPacket(PacketsQueue::iterator pos) {
if (pos->packet->vp8_descriptor().fragmentation_info ==
Vp8Descriptor::NOT_FRAGMENTED) {
// The packet is not fragmented.
RebuildVideoPacket(pos, pos);
return;
}
PacketsQueue::iterator first = pos;
while (first > packets_queue_.begin() && first->packet &&
first->packet->vp8_descriptor().fragmentation_info !=
Vp8Descriptor::FIRST_FRAGMENT) {
first--;
}
if (!first->packet || first->packet->vp8_descriptor().fragmentation_info !=
Vp8Descriptor::FIRST_FRAGMENT) {
// We don't have first fragment.
return;
}
PacketsQueue::iterator last = pos;
while (last < (packets_queue_.end() - 1) && last->packet &&
last->packet->vp8_descriptor().fragmentation_info !=
Vp8Descriptor::LAST_FRAGMENT) {
last++;
}
if (!last->packet || last->packet->vp8_descriptor().fragmentation_info !=
Vp8Descriptor::LAST_FRAGMENT) {
// We don't have last fragment.
return;
}
// We've found first and last fragments, and we have all fragments in the
// middle, so we can rebuild fill packet.
RebuildVideoPacket(first, last);
}
void RtpVideoReader::RebuildVideoPacket(PacketsQueue::iterator first,
PacketsQueue::iterator last) {
VideoPacket* packet = new VideoPacket();
// Set flags.
if (first->packet->vp8_descriptor().frame_beginning)
packet->set_flags(packet->flags() | VideoPacket::FIRST_PACKET);
if (last->packet->header().marker)
packet->set_flags(packet->flags() | VideoPacket::LAST_PACKET);
packet->set_timestamp(first->packet->header().timestamp);
// Rebuild packet content from the fragments.
// TODO(sergeyu): Use CompoundBuffer inside of VideoPacket, so that we don't
// need to memcopy any data.
CompoundBuffer content;
for (PacketsQueue::iterator it = first; it <= last; ++it) {
content.Append(it->packet->payload());
// Delete packet because we don't need it anymore.
delete it->packet;
it->packet = NULL;
// Here we keep |received| flag set to true, so that duplicate RTP
// packets will be ignored.
}
packet->mutable_data()->resize(content.total_bytes());
content.CopyTo(const_cast<char*>(packet->mutable_data()->data()),
packet->data().size());
// Set format.
packet->mutable_format()->set_encoding(VideoPacketFormat::ENCODING_VP8);
video_stub_->ProcessVideoPacket(packet, new DeleteTask<VideoPacket>(packet));
SendReceiverReportIf();
}
void RtpVideoReader::SendReceiverReportIf() {
base::Time now = base::Time::Now();
// Send receiver report only if we haven't sent any bofore, or
// enough time has passed since the last report.
if (last_receiver_report_.is_null() ||
(now - last_receiver_report_).InMilliseconds() >
kReceiverReportsIntervalMs) {
RtcpReceiverReport report;
rtp_reader_.GetReceiverReport(&report);
rtcp_writer_.SendReport(report);
last_receiver_report_ = now;
}
}
} // namespace protocol
} // namespace remoting