Verify that ReadableStreamTee doesn't pull more chunks than the branch HWM (#16167)

ReadableStreamTee should only pull enough chunks to fill the HWM of the
branches (which is 1), and no more.

When pulling from the branches, verify that pull() on the original
stream still is only called enough to fill the emptiest branch.
diff --git a/streams/readable-streams/tee.any.js b/streams/readable-streams/tee.any.js
index ba78136..4a07f0f 100644
--- a/streams/readable-streams/tee.any.js
+++ b/streams/readable-streams/tee.any.js
@@ -1,6 +1,7 @@
 // META: global=worker,jsshell
 // META: script=../resources/rs-utils.js
 // META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
 'use strict';
 
 test(() => {
@@ -158,7 +159,7 @@
     })
   ]);
 
-}, 'ReadableStream teeing: canceling branch2 should not impact branch2');
+}, 'ReadableStream teeing: canceling branch2 should not impact branch1');
 
 promise_test(() => {
 
@@ -340,3 +341,111 @@
   assert_not_equals(getReader.call(rs2), undefined, 'getReader should work on rs2');
 
 }, 'ReadableStreamTee should not use a modified ReadableStream constructor from the global object');
+
+promise_test(t => {
+
+  const rs = recordingReadableStream({}, { highWaterMark: 0 });
+
+  // Create two branches, each with a HWM of 1. This should result in one
+  // chunk being pulled, not two.
+  rs.tee();
+  return flushAsyncEvents().then(() => {
+    assert_array_equals(rs.events, ['pull'], 'pull should only be called once');
+  });
+
+}, 'ReadableStreamTee should not pull more chunks than can fit in the branch queue');
+
+promise_test(t => {
+
+  const rs = recordingReadableStream({
+    pull(controller) {
+      controller.enqueue('a');
+    }
+  }, { highWaterMark: 0 });
+
+  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
+  return Promise.all([reader1.read(), reader2.read()])
+      .then(() => {
+    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
+  });
+
+}, 'ReadableStreamTee should only pull enough to fill the emptiest queue');
+
+promise_test(t => {
+
+  const rs = recordingReadableStream({}, { highWaterMark: 0 });
+  const theError = { name: 'boo!' };
+
+  rs.controller.error(theError);
+
+  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
+
+  return flushAsyncEvents().then(() => {
+    assert_array_equals(rs.events, [], 'pull should not be called');
+
+    return Promise.all([
+      promise_rejects(t, theError, reader1.closed),
+      promise_rejects(t, theError, reader2.closed)
+    ]);
+  });
+
+}, 'ReadableStreamTee should not pull when original is already errored');
+
+for (const branch of [1, 2]) {
+  promise_test(t => {
+
+    const rs = recordingReadableStream({}, { highWaterMark: 0 });
+    const theError = { name: 'boo!' };
+
+    const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
+
+    return flushAsyncEvents().then(() => {
+      assert_array_equals(rs.events, ['pull'], 'pull should be called once');
+
+      rs.controller.enqueue('a');
+
+      const reader = (branch === 1) ? reader1 : reader2;
+      return reader.read();
+    }).then(() => flushAsyncEvents()).then(() => {
+      assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
+
+      rs.controller.error(theError);
+
+      return Promise.all([
+        promise_rejects(t, theError, reader1.closed),
+        promise_rejects(t, theError, reader2.closed)
+      ]);
+    }).then(() => flushAsyncEvents()).then(() => {
+      assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
+    });
+
+  }, `ReadableStreamTee stops pulling when original stream errors while branch ${branch} is reading`);
+}
+
+promise_test(t => {
+
+  const rs = recordingReadableStream({}, { highWaterMark: 0 });
+  const theError = { name: 'boo!' };
+
+  const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
+
+  return flushAsyncEvents().then(() => {
+    assert_array_equals(rs.events, ['pull'], 'pull should be called once');
+
+    rs.controller.enqueue('a');
+
+    return Promise.all([reader1.read(), reader2.read()]);
+  }).then(() => flushAsyncEvents()).then(() => {
+    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
+
+    rs.controller.error(theError);
+
+    return Promise.all([
+      promise_rejects(t, theError, reader1.closed),
+      promise_rejects(t, theError, reader2.closed)
+    ]);
+  }).then(() => flushAsyncEvents()).then(() => {
+    assert_array_equals(rs.events, ['pull', 'pull'], 'pull should be called twice');
+  });
+
+}, 'ReadableStreamTee stops pulling when original stream errors while both branches are reading');