blob: cfa8481e3f97e2ae1a94328e047edc52f6303668 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/quic_stream_sequencer.h"
#include <algorithm>
#include <limits>
#include <utility>
#include "base/logging.h"
#include "net/quic/quic_clock.h"
#include "net/quic/quic_flags.h"
#include "net/quic/quic_frame_list.h"
#include "net/quic/quic_protocol.h"
#include "net/quic/reliable_quic_stream.h"
#include "net/quic/stream_sequencer_buffer.h"
using std::min;
using std::numeric_limits;
using std::string;
namespace net {
QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream,
const QuicClock* clock)
: stream_(quic_stream),
close_offset_(numeric_limits<QuicStreamOffset>::max()),
blocked_(false),
num_frames_received_(0),
num_duplicate_frames_received_(0),
num_early_frames_received_(0),
clock_(clock),
ignore_read_data_(false) {
if (FLAGS_quic_use_stream_sequencer_buffer) {
DVLOG(1) << "Use StreamSequencerBuffer for stream: " << stream_->id();
buffered_frames_.reset(
new StreamSequencerBuffer(kStreamReceiveWindowLimit));
} else {
buffered_frames_.reset(new QuicFrameList());
}
}
QuicStreamSequencer::~QuicStreamSequencer() {}
void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
++num_frames_received_;
const QuicStreamOffset byte_offset = frame.offset;
const size_t data_len = frame.frame_length;
if (data_len == 0 && !frame.fin) {
// Stream frames must have data or a fin flag.
stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
"Empty stream frame without FIN set.");
return;
}
if (frame.fin) {
CloseStreamAtOffset(frame.offset + data_len);
if (data_len == 0) {
return;
}
}
size_t bytes_written;
QuicErrorCode result = buffered_frames_->OnStreamData(
byte_offset, StringPiece(frame.frame_buffer, frame.frame_length),
clock_->ApproximateNow(), &bytes_written);
if (result == QUIC_INVALID_STREAM_DATA) {
stream_->CloseConnectionWithDetails(
QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
return;
}
if (result == QUIC_NO_ERROR && bytes_written == 0) {
++num_duplicate_frames_received_;
// Silently ignore duplicates.
return;
}
if (byte_offset > buffered_frames_->BytesConsumed()) {
++num_early_frames_received_;
}
if (blocked_) {
return;
}
if (byte_offset == buffered_frames_->BytesConsumed()) {
if (ignore_read_data_) {
FlushBufferedFrames();
} else {
stream_->OnDataAvailable();
}
}
}
void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
// If there is a scheduled close, the new offset should match it.
if (close_offset_ != kMaxOffset && offset != close_offset_) {
stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
return;
}
close_offset_ = offset;
MaybeCloseStream();
}
bool QuicStreamSequencer::MaybeCloseStream() {
if (blocked_ || !IsClosed()) {
return false;
}
DVLOG(1) << "Passing up termination, as we've processed "
<< buffered_frames_->BytesConsumed() << " of " << close_offset_
<< " bytes.";
// This will cause the stream to consume the FIN.
// Technically it's an error if |num_bytes_consumed| isn't exactly
// equal to |close_offset|, but error handling seems silly at this point.
if (ignore_read_data_) {
// The sequencer is discarding stream data and must notify the stream on
// receipt of a FIN because the consumer won't.
stream_->OnFinRead();
} else {
stream_->OnDataAvailable();
}
buffered_frames_->Clear();
return true;
}
int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
DCHECK(!blocked_);
return buffered_frames_->GetReadableRegions(iov, iov_len);
}
bool QuicStreamSequencer::GetReadableRegion(iovec* iov,
QuicTime* timestamp) const {
DCHECK(!blocked_);
return buffered_frames_->GetReadableRegion(iov, timestamp);
}
int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
DCHECK(!blocked_);
size_t bytes_read = buffered_frames_->Readv(iov, iov_len);
stream_->AddBytesConsumed(bytes_read);
return static_cast<int>(bytes_read);
}
bool QuicStreamSequencer::HasBytesToRead() const {
return buffered_frames_->HasBytesToRead();
}
bool QuicStreamSequencer::IsClosed() const {
return buffered_frames_->BytesConsumed() >= close_offset_;
}
void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
DCHECK(!blocked_);
bool result = buffered_frames_->MarkConsumed(num_bytes_consumed);
if (!result) {
LOG(DFATAL) << "Invalid argument to MarkConsumed."
<< " expect to consume: " << num_bytes_consumed
<< ", but not enough bytes available.";
stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
return;
}
stream_->AddBytesConsumed(num_bytes_consumed);
}
void QuicStreamSequencer::SetBlockedUntilFlush() {
blocked_ = true;
}
void QuicStreamSequencer::SetUnblocked() {
blocked_ = false;
if (IsClosed() || HasBytesToRead()) {
stream_->OnDataAvailable();
}
}
void QuicStreamSequencer::StopReading() {
if (ignore_read_data_) {
return;
}
ignore_read_data_ = true;
FlushBufferedFrames();
}
void QuicStreamSequencer::FlushBufferedFrames() {
DCHECK(ignore_read_data_);
size_t bytes_flushed = buffered_frames_->FlushBufferedFrames();
DVLOG(1) << "Flushing buffered data at offset "
<< buffered_frames_->BytesConsumed() << " length " << bytes_flushed
<< " for stream " << stream_->id();
stream_->AddBytesConsumed(bytes_flushed);
MaybeCloseStream();
}
size_t QuicStreamSequencer::NumBytesBuffered() const {
return buffered_frames_->BytesBuffered();
}
QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const {
return buffered_frames_->BytesConsumed();
}
} // namespace net