blob: 400d8b58d770bffe78042fc00a946ed3de475e46 [file] [log] [blame]
// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
/// Signals that the source feeding a buffer finished while readers were still
/// blocked waiting for data.
///
/// Every blocked reader is completed with an instance of this error.
class UnderflowError extends Error {
  /// Creates an underflow error, optionally described by [message].
  UnderflowError([this.message]);

  /// Human-readable description of the underflow, or `null` if none was given.
  final String? message;

  /// Returns `'StreamBuffer Underflow'`, followed by `': '` and [message] when
  /// a message was supplied.
  @override
  String toString() {
    final detail = message;
    return detail == null
        ? 'StreamBuffer Underflow'
        : 'StreamBuffer Underflow: $detail';
  }
}
/// Buffers the elements of a stream of `List<T>` chunks so that they can be
/// read back in caller-chosen sizes.
///
/// This allows orderly reading from a data stream, such as a `Socket`, whose
/// chunk boundaries do not line up with the sizes a reader wants.
///
/// Example usage:
///
/// ```dart
/// StreamBuffer<int> buffer = StreamBuffer();
/// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
/// buffer.read(100).then((bytes) {
///   // do something with 100 bytes;
/// });
/// ```
///
/// When constructed with `throwOnError: true`, readers that are still waiting
/// when the stream finishes complete with an [UnderflowError] (see
/// [addStream] for the exact condition). Useful for unexpected `Socket`
/// disconnects.
class StreamBuffer<T> implements StreamConsumer<List<T>> {
  /// Create a stream buffer with optional, soft [limit] to the amount of data
  /// the buffer will hold before pausing the underlying stream. A limit of 0
  /// means no buffer limits.
  StreamBuffer({bool throwOnError = false, int limit = 0})
      : _throwOnError = throwOnError,
        _limit = limit;

  /// Elements received but not yet handed to a reader, oldest first.
  ///
  /// Incoming chunks are flattened into this list, so one entry is exactly one
  /// element of type [T], even when [T] is itself a list type.
  final List<T> _buffer = [];

  /// Reads that could not be satisfied immediately, in arrival order.
  final List<_ReaderInWaiting<List<T>>> _readers = [];

  /// Subscription to the stream currently feeding the buffer, if any.
  StreamSubscription<List<T>>? _sub;

  /// Whether end-of-stream fails waiting readers with [UnderflowError].
  final bool _throwOnError;

  /// The stream most recently passed to [addStream].
  Stream<List<T>>? _currentStream;

  int _limit;

  /// The soft cap on buffered elements; values of 0 or less mean "no limit".
  int get limit => _limit;

  /// Updates the limit and pauses or resumes the active subscription to match.
  set limit(int limit) {
    _limit = limit;
    final sub = _sub;
    if (sub == null) return;
    if (!limited || buffered < limit) {
      if (sub.isPaused) sub.resume();
    } else if (!sub.isPaused) {
      // Pauses stack: pausing an already-paused subscription would require a
      // second resume that [_consume] never issues.
      sub.pause();
    }
  }

  /// Whether a positive [limit] is in effect.
  bool get limited => _limit > 0;

  /// The amount of unread data buffered.
  int get buffered => _buffer.length;

  /// Removes and returns the oldest [size] buffered elements.
  ///
  /// The caller must ensure that `size <= buffered`. If the subscription was
  /// paused because of [limit] and the buffer has dropped below it, the
  /// subscription is resumed.
  List<T> _consume(int size) {
    final taken = _buffer.sublist(0, size);
    _buffer.removeRange(0, size);
    // There may be no subscription at all: nothing was added yet, or the
    // buffer was closed while data was still pending.
    final sub = _sub;
    if (limited && sub != null && sub.isPaused && buffered < limit) {
      sub.resume();
    }
    return taken;
  }

  /// Read fully [size] elements from the stream and return in the future.
  ///
  /// If enough data is already buffered and no earlier reader is waiting, the
  /// returned future is already completed; otherwise the read is queued
  /// behind earlier readers and completed as data arrives.
  ///
  /// Throws [RangeError] if [size] is negative, and [ArgumentError] if [size]
  /// is larger than the optional buffer [limit]. Both are thrown
  /// synchronously rather than through the returned future.
  Future<List<T>> read(int size) {
    RangeError.checkNotNegative(size, 'size');
    if (limited && size > limit) {
      throw ArgumentError('Cannot read $size with limit $limit');
    }
    // If we have enough data to consume and there are no other readers, then
    // we can return immediately.
    if (size <= buffered && _readers.isEmpty) {
      return Future<List<T>>.value(_consume(size));
    }
    final completer = Completer<List<T>>();
    _readers.add(_ReaderInWaiting<List<T>>(size, completer));
    return completer.future;
  }

  /// Starts buffering [stream], replacing any stream added earlier.
  ///
  /// The subscription to the previous stream, if any, is cancelled. The
  /// returned future completes when [stream] reports done; it does not
  /// complete for a stream whose subscription was cancelled.
  ///
  /// Stream errors are forwarded to all waiting readers. When the buffer was
  /// created with `throwOnError: true`, waiting readers are failed with
  /// [UnderflowError] when [stream] finishes, provided [stream] equals the
  /// stream that was current when this call was made (or no stream had been
  /// added before).
  @override
  Future addStream(Stream<List<T>> stream) {
    final lastStream = _currentStream ?? stream;
    _sub?.cancel();
    _currentStream = stream;
    final streamDone = Completer<void>();
    _sub = stream.listen((items) {
      _buffer.addAll(items);
      final sub = _sub;
      if (limited && buffered >= limit && sub != null && !sub.isPaused) {
        sub.pause();
      }
      // Serve waiting readers strictly in order, for as long as the head of
      // the queue can be satisfied.
      while (_readers.isNotEmpty && _readers.first.size <= buffered) {
        final waiting = _readers.removeAt(0);
        waiting.completer.complete(_consume(waiting.size));
      }
    }, onDone: () {
      // User is piping in a new stream
      // NOTE(review): for every stream after the first, `lastStream` is the
      // previous stream, so this condition is false and no underflow is
      // reported - confirm that this is intended.
      if (stream == lastStream && _throwOnError) {
        _closed(UnderflowError());
      }
      streamDone.complete();
    }, onError: (Object e, StackTrace stack) {
      _closed(e, stack);
    });
    return streamDone.future;
  }

  /// Completes every waiting reader with error [e] and empties the queue.
  void _closed(Object e, [StackTrace? stack]) {
    for (final reader in _readers) {
      if (!reader.completer.isCompleted) {
        reader.completer.completeError(e, stack);
      }
    }
    _readers.clear();
  }

  /// Cancels the active subscription, if any.
  ///
  /// Data that is already buffered stays available to [read]. Waiting readers
  /// are left untouched.
  @override
  Future close() {
    final sub = _sub;
    _sub = null;
    return sub?.cancel() ?? Future<void>.value();
  }
}
/// A pending read: the number of elements requested and the completer that
/// delivers them once they are available.
class _ReaderInWaiting<T> {
  _ReaderInWaiting(this.size, this.completer);

  /// How many elements the reader asked for.
  final int size;

  /// Completed with the requested data, or with an error if the read fails.
  final Completer<T> completer;
}