| // Copyright 2013 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| import 'dart:async'; |
| |
| /// Underflow errors happen when the socket feeding a buffer is finished while |
| /// there are still blocked readers. Each reader will complete with this error. |
| class UnderflowError extends Error { |
| /// The [message] describes the underflow. |
| UnderflowError([this.message]); |
| |
| final String? message; |
| |
| @override |
| String toString() { |
| if (message != null) { |
| return 'StreamBuffer Underflow: $message'; |
| } |
| return 'StreamBuffer Underflow'; |
| } |
| } |
| |
| /// Allow orderly reading of elements from a datastream, such as Socket, which |
| /// might not receive `List<int>` bytes regular chunks. |
| /// |
| /// Example usage: |
| /// |
| /// StreamBuffer<int> buffer = StreamBuffer(); |
| /// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
| /// buffer.read(100).then((bytes) { |
| /// // do something with 100 bytes; |
| /// }); |
| /// |
| /// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
| /// [Socket] disconnects. |
| class StreamBuffer<T> implements StreamConsumer<List<T>> { |
| /// Create a stream buffer with optional, soft [limit] to the amount of data |
| /// the buffer will hold before pausing the underlying stream. A limit of 0 |
| /// means no buffer limits. |
| StreamBuffer({bool throwOnError = false, int limit = 0}) |
| : _throwOnError = throwOnError, |
| _limit = limit; |
| |
| int _offset = 0; |
| int _counter = 0; // sum(_chunks[*].length) - _offset |
| final List<T> _chunks = []; |
| final List<_ReaderInWaiting<List<T>>> _readers = []; |
| StreamSubscription<List<T>>? _sub; |
| |
| final bool _throwOnError; |
| |
| Stream<List<T>>? _currentStream; |
| |
| int _limit = 0; |
| |
| set limit(int limit) { |
| _limit = limit; |
| if (_sub != null) { |
| if (!limited || _counter < limit) { |
| _sub!.resume(); |
| } else { |
| _sub!.pause(); |
| } |
| } |
| } |
| |
| int get limit => _limit; |
| |
| bool get limited => _limit > 0; |
| |
| /// The amount of unread data buffered. |
| int get buffered => _counter; |
| |
| List<T> _consume(int size) { |
| var follower = 0; |
| var ret = List<T?>.filled(size, null); |
| var leftToRead = size; |
| while (leftToRead > 0) { |
| var chunk = _chunks.first; |
| var listCap = (chunk is List) ? chunk.length - _offset : 1; |
| var subsize = leftToRead > listCap ? listCap : leftToRead; |
| if (chunk is List) { |
| ret.setRange(follower, follower + subsize, |
| chunk.getRange(_offset, _offset + subsize).cast<T>()); |
| } else { |
| ret[follower] = chunk; |
| } |
| follower += subsize; |
| _offset += subsize; |
| _counter -= subsize; |
| leftToRead -= subsize; |
| if (!(chunk is List && _offset < chunk.length)) { |
| _offset = 0; |
| _chunks.removeAt(0); |
| } |
| } |
| if (limited && _sub!.isPaused && _counter < limit) { |
| _sub!.resume(); |
| } |
| return ret.cast<T>(); |
| } |
| |
| /// Read fully [size] bytes from the stream and return in the future. |
| /// |
| /// Throws [ArgumentError] if size is larger than optional buffer [limit]. |
| Future<List<T>> read(int size) { |
| if (limited && size > limit) { |
| throw ArgumentError('Cannot read $size with limit $limit'); |
| } |
| |
| // If we have enough data to consume and there are no other readers, then |
| // we can return immediately. |
| if (size <= buffered && _readers.isEmpty) { |
| return Future<List<T>>.value(_consume(size)); |
| } |
| final completer = Completer<List<T>>(); |
| _readers.add(_ReaderInWaiting<List<T>>(size, completer)); |
| return completer.future; |
| } |
| |
| @override |
| Future addStream(Stream<List<T>> stream) { |
| var lastStream = _currentStream ?? stream; |
| _sub?.cancel(); |
| _currentStream = stream; |
| |
| final streamDone = Completer<void>(); |
| _sub = stream.listen((items) { |
| _chunks.addAll(items); |
| _counter += items.length; |
| if (limited && _counter >= limit) { |
| _sub!.pause(); |
| } |
| |
| while (_readers.isNotEmpty && _readers.first.size <= _counter) { |
| var waiting = _readers.removeAt(0); |
| waiting.completer.complete(_consume(waiting.size)); |
| } |
| }, onDone: () { |
| // User is piping in a new stream |
| if (stream == lastStream && _throwOnError) { |
| _closed(UnderflowError()); |
| } |
| streamDone.complete(); |
| }, onError: (e, stack) { |
| _closed(e, stack); |
| }); |
| return streamDone.future; |
| } |
| |
| void _closed(e, [StackTrace? stack]) { |
| for (final reader in _readers) { |
| if (!reader.completer.isCompleted) { |
| reader.completer.completeError(e, stack); |
| } |
| } |
| _readers.clear(); |
| } |
| |
| @override |
| Future close() { |
| final Future? ret = _sub?.cancel(); |
| _sub = null; |
| return ret ?? Future.value(null); |
| } |
| } |
| |
| class _ReaderInWaiting<T> { |
| _ReaderInWaiting(this.size, this.completer); |
| |
| int size; |
| Completer<T> completer; |
| } |