[Streams API] Implement ReadableStream tee
https://bugs.webkit.org/show_bug.cgi?id=146315

Reviewed by Darin Adler.

Source/WebCore:

Covered by rebased test.

* Modules/streams/ReadableStream.js:
(tee): Removing not implemented exception throwing.
* Modules/streams/ReadableStreamInternals.js:
(teeReadableStream): Implementing as per spec.
(teeReadableStreamPullFunction): Ditto.
(teeReadableStreamBranch1CancelFunction): Ditto.
(teeReadableStreamBranch2CancelFunction): Ditto.

LayoutTests:

* streams/reference-implementation/readable-stream-tee-expected.txt:


git-svn-id: http://svn.webkit.org/repository/webkit/trunk@191285 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebCore/Modules/streams/ReadableStreamInternals.js b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
index 339918c..a77027a 100644
--- a/Source/WebCore/Modules/streams/ReadableStreamInternals.js
+++ b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
@@ -78,7 +78,99 @@
 {
     "use strict";
 
-    throw new @TypeError("tee is not implemented");
+    // TODO: Assert: IsReadableStream(stream) is true.
+    // TODO: Assert: Type(shouldClone) is Boolean.
+
+    let reader = stream.getReader();
+
+    let teeState = {
+        closedOrErrored: false,
+        canceled1: false,
+        canceled2: false,
+        reason1: undefined,
+        reason: undefined,
+    };
+
+    teeState.cancelPromise = new @InternalPromise(function(resolve, reject) {
+       teeState.resolvePromise = resolve;
+       teeState.rejectPromise = reject;
+    });
+
+    let pullFunction = @teeReadableStreamPullFunction(teeState, reader, shouldClone);
+
+    let underlyingSource1 = {
+        "pull": pullFunction,
+        "cancel": @teeReadableStreamBranch1CancelFunction(teeState, stream)
+    };
+
+    let underlyingSource2 = {
+        "pull": pullFunction,
+        "cancel": @teeReadableStreamBranch2CancelFunction(teeState, stream)
+    };
+
+    let branch1 = new ReadableStream(underlyingSource1);
+    let branch2 = new ReadableStream(underlyingSource2);
+
+    reader.closed.catch(function(e) {
+        if (teeState.closedOrErrored)
+            return;
+        @errorReadableStream(branch1, e);
+        @errorReadableStream(branch2, e);
+        teeState.closedOrErrored = true;
+    });
+
+    // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+    teeState.branch1 = branch1;
+    teeState.branch2 = branch2;
+
+    return [branch1, branch2];
+}
+
+function teeReadableStreamPullFunction(teeState, reader, shouldClone)
+{
+    return function() {
+        reader.read().then(function(result) {
+            if (result.done && !teeState.closedOrErrored) {
+                @closeReadableStream(teeState.branch1);
+                @closeReadableStream(teeState.branch2);
+                teeState.closedOrErrored = true;
+            }
+            if (teeState.closedOrErrored)
+                return;
+            if (!teeState.canceled1) {
+                // TODO: Implement cloning if shouldClone is true
+                @enqueueInReadableStream(teeState.branch1, result.value);
+            }
+            if (!teeState.canceled2) {
+                // TODO: Implement cloning if shouldClone is true
+                @enqueueInReadableStream(teeState.branch2, result.value);
+            }
+        });
+    }
+}
+
+function teeReadableStreamBranch1CancelFunction(teeState, stream)
+{
+    return function(r) {
+        teeState.canceled1 = true;
+        teeState.reason1 = r;
+        if (teeState.canceled2) {
+            @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise);
+        }
+        return teeState.cancelPromise;
+    }
+}
+
+function teeReadableStreamBranch2CancelFunction(teeState, stream)
+{
+    return function(r) {
+        teeState.canceled2 = true;
+        teeState.reason2 = r;
+        if (teeState.canceled1) {
+            @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise);
+        }
+        return teeState.cancelPromise;
+    }
 }
 
 function isReadableStream(stream)