Implement RTCQuicStream.waitForReadable() (#14103)

[Compared to the original Blink CL, this commit has been rebased on top of
 #14028]

Bug: 874296
Tbr: hbos@chromium.org
Change-Id: I1d76f979d9ff4cfcec0b5d5a34663dc48bb94056
Reviewed-on: https://chromium-review.googlesource.com/c/1289698
Commit-Queue: Steve Anton <steveanton@chromium.org>
Reviewed-by: Steve Anton <steveanton@chromium.org>
Cr-Commit-Position: refs/heads/master@{#608921}
diff --git a/webrtc-quic/RTCQuicStream.https.html b/webrtc-quic/RTCQuicStream.https.html
index ff78f3d..08c3a54 100644
--- a/webrtc-quic/RTCQuicStream.https.html
+++ b/webrtc-quic/RTCQuicStream.https.html
@@ -121,6 +121,15 @@
   const [ localQuicTransport, remoteQuicTransport ] =
     await makeTwoConnectedQuicTransports(t);
   const localStream = localQuicTransport.createStream();
+  const promise = localStream.waitForReadable(1);
+  localStream.reset();
+  await promise_rejects(t, 'InvalidStateError', promise);
+}, 'reset() rejects pending waitForReadable() promises.');
+
+promise_test(async t => {  // Checks that an empty write() buffers nothing.
+  const [ localQuicTransport, remoteQuicTransport ] =
+    await makeTwoConnectedQuicTransports(t);
+  const localStream = localQuicTransport.createStream();
   localStream.write(new Uint8Array(0));
   assert_equals(localStream.writeBufferedAmount, 0);
 }, 'write() with an empty array does nothing.');
@@ -290,4 +299,116 @@
   assert_throws('InvalidStateError', () => stream.readInto(new Uint8Array(1)));
 }, 'readInto() throws InvalidStateError.');
 
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const sender = localQuicTransport.createStream();
+  sender.write(new Uint8Array([ 65 ]));
+  const streamWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
+  const { stream: receiver } = await streamWatcher.wait_for('quicstream');
+  await receiver.waitForReadable(1);
+  assert_equals(receiver.readBufferedAmount, 1);
+  // The destination is larger than the payload; the unused tail stays zero.
+  const destination = new Uint8Array(3);
+  const outcome = receiver.readInto(destination);
+  assert_object_equals(outcome, { amount: 1, finished: false });
+  assert_array_equals(destination, [ 65, 0, 0 ]);
+  assert_equals(receiver.readBufferedAmount, 0);
+}, 'Read 1 byte.');
+
+// Builds a Uint8Array holding |amount| bytes in which each byte equals its
+// index modulo 256.
+function generateData(amount) {
+  const blank = new Uint8Array(amount);
+  // TypedArray map() yields a new Uint8Array of the same length.
+  const patterned = blank.map((unused, index) => index % 256);
+  return patterned;
+}
+
+// Writes |amount| bytes to |stream| in chunks of at most
+// maxWriteBufferedAmount; the bytes written equal generateData(amount).
+async function writeGeneratedData(stream, amount) {
+  let offset = 0;
+  while (offset < amount) {
+    const chunkSize = Math.min(stream.maxWriteBufferedAmount, amount - offset);
+    await stream.waitForWriteBufferedAmountBelow(0);
+    stream.write(generateData(chunkSize).map(b => (b + offset) % 256));
+    offset += chunkSize;
+  }
+}
+
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const localStream = localQuicTransport.createStream();
+  const amount = localStream.maxReadBufferedAmount;
+  const remoteWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
+  const writeDone = writeGeneratedData(localStream, amount);  // Runs concurrently.
+  const { stream: remoteStream } = await remoteWatcher.wait_for('quicstream');
+  await remoteStream.waitForReadable(amount);
+  const readBuffer = new Uint8Array(amount);
+  assert_object_equals(
+      remoteStream.readInto(readBuffer), { amount, finished: false });
+  assert_array_equals(readBuffer, generateData(amount));
+  await writeDone;  // Surfaces any write failure as a failure of this test.
+}, 'Read maxReadBufferedAmount bytes all at once.');
+
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const sender = localQuicTransport.createStream();
+  const payload = generateData(10);
+  sender.write(payload);
+  sender.finish();
+  const streamWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
+  const { stream: receiver } = await streamWatcher.wait_for('quicstream');
+  await receiver.waitForReadable(11);  // One byte more than was written.
+  assert_equals(receiver.readBufferedAmount, 10);
+  const received = new Uint8Array(10);
+  const outcome = receiver.readInto(received);
+  assert_object_equals(outcome, { amount: 10, finished: true });
+  assert_array_equals(received, payload);
+}, 'waitForReadable() resolves early if remote fin is received.');
+
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const stream = localQuicTransport.createStream();
+  const tooMany = stream.maxReadBufferedAmount + 1;  // Just past the limit.
+  await promise_rejects(t, new TypeError(), stream.waitForReadable(tooMany));
+}, 'waitForReadable() rejects with TypeError if amount is more than ' +
+  'maxReadBufferedAmount.');
+
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const stream = localQuicTransport.createStream();
+  // Two waits are left pending before the stream is reset locally.
+  const pendingWaits =
+      [ stream.waitForReadable(10), stream.waitForReadable(10) ];
+  stream.reset();
+  await Promise.all(pendingWaits.map(
+      pending => promise_rejects(t, 'InvalidStateError', pending)));
+}, 'Pending waitForReadable() promises rejected after reset().');
+
+promise_test(async t => {
+  const [ localQuicTransport, remoteQuicTransport ] =
+      await makeTwoConnectedQuicTransports(t);
+  const sender = localQuicTransport.createStream();
+  sender.write(new Uint8Array(1));
+  const streamWatcher = new EventWatcher(t, remoteQuicTransport, 'quicstream');
+  const { stream: receiver } = await streamWatcher.wait_for('quicstream');
+  // Each wait asks for 10 bytes, but the sender wrote only 1 before resetting.
+  const pendingWaits =
+      [ receiver.waitForReadable(10), receiver.waitForReadable(10) ];
+  sender.reset();
+  await Promise.all(pendingWaits.map(
+      pending => promise_rejects(t, 'InvalidStateError', pending)));
+}, 'Pending waitForReadable() promises rejected after remote reset().');
+
+closed_stream_test(async (t, stream) => {
+  const readable = stream.waitForReadable(1);  // Issued on a closed stream.
+  await promise_rejects(t, 'InvalidStateError', readable);
+}, 'waitForReadable() rejects with InvalidStateError.');
+
 </script>