[Streams API] Implement ReadableStream tee
https://bugs.webkit.org/show_bug.cgi?id=146315
Reviewed by Darin Adler.
Source/WebCore:
Covered by rebased test.
* Modules/streams/ReadableStream.js:
(tee): Removing the "not implemented" exception.
* Modules/streams/ReadableStreamInternals.js:
(teeReadableStream): Implementing as per spec.
(teeReadableStreamPullFunction): Ditto.
(teeReadableStreamBranch2CancelFunction): Ditto.
LayoutTests:
* streams/reference-implementation/readable-stream-tee-expected.txt:
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@191285 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebCore/Modules/streams/ReadableStreamInternals.js b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
index 339918c..a77027a 100644
--- a/Source/WebCore/Modules/streams/ReadableStreamInternals.js
+++ b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
@@ -78,7 +78,99 @@
{
"use strict";
- throw new @TypeError("tee is not implemented");
+ // TODO: Assert: IsReadableStream(stream) is true.
+ // TODO: Assert: Type(shouldClone) is Boolean.
+
+ let reader = stream.getReader();
+
+ let teeState = {
+ closedOrErrored: false,
+ canceled1: false,
+ canceled2: false,
+ reason1: undefined,
+ reason: undefined,
+ };
+
+ teeState.cancelPromise = new @InternalPromise(function(resolve, reject) {
+ teeState.resolvePromise = resolve;
+ teeState.rejectPromise = reject;
+ });
+
+ let pullFunction = @teeReadableStreamPullFunction(teeState, reader, shouldClone);
+
+ let underlyingSource1 = {
+ "pull": pullFunction,
+ "cancel": @teeReadableStreamBranch1CancelFunction(teeState, stream)
+ };
+
+ let underlyingSource2 = {
+ "pull": pullFunction,
+ "cancel": @teeReadableStreamBranch2CancelFunction(teeState, stream)
+ };
+
+ let branch1 = new ReadableStream(underlyingSource1);
+ let branch2 = new ReadableStream(underlyingSource2);
+
+ reader.closed.catch(function(e) {
+ if (teeState.closedOrErrored)
+ return;
+ @errorReadableStream(branch1, e);
+ @errorReadableStream(branch2, e);
+ teeState.closedOrErrored = true;
+ });
+
+ // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+ teeState.branch1 = branch1;
+ teeState.branch2 = branch2;
+
+ return [branch1, branch2];
+}
+
+function teeReadableStreamPullFunction(teeState, reader, shouldClone) // Builds the "pull" callback that reads one chunk from reader and forwards it to teeState.branch1 / teeState.branch2.
+{
+ return function() {
+ reader.read().then(function(result) { // The promise from then() is not returned, and no rejection handler is attached in this function.
+ if (result.done && !teeState.closedOrErrored) { // Source is exhausted: close both branches, guarded so it happens only once.
+ @closeReadableStream(teeState.branch1);
+ @closeReadableStream(teeState.branch2);
+ teeState.closedOrErrored = true;
+ }
+ if (teeState.closedOrErrored) // Nothing is enqueued once the branches have been closed or errored (also covers the branch just closed above).
+ return;
+ if (!teeState.canceled1) { // Skip branch 1 once its cancel callback has set canceled1.
+ // TODO: Implement cloning if shouldClone is true
+ @enqueueInReadableStream(teeState.branch1, result.value); // shouldClone is not consulted yet: the very same value is handed to both branches.
+ }
+ if (!teeState.canceled2) { // Skip branch 2 once its cancel callback has set canceled2.
+ // TODO: Implement cloning if shouldClone is true
+ @enqueueInReadableStream(teeState.branch2, result.value);
+ }
+ });
+ }
+}
+
+function teeReadableStreamBranch1CancelFunction(teeState, stream) // Builds the "cancel" callback for branch 1; the callback receives the cancel reason r and returns teeState.cancelPromise.
+{
+ return function(r) {
+ teeState.canceled1 = true; // Recorded first so the check below and the pull callback both see branch 1 as canceled.
+ teeState.reason1 = r; // Kept so it can be passed along with branch 2's reason when stream is canceled.
+ if (teeState.canceled2) { // stream itself is canceled only once the other branch has been canceled as well.
+ @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise); // Both reasons are passed as a two-element array; the outcome is forwarded to teeState.resolvePromise / teeState.rejectPromise.
+ }
+ return teeState.cancelPromise; // Returned whether or not stream was canceled by this call.
+ }
+}
+
+function teeReadableStreamBranch2CancelFunction(teeState, stream) // Builds the "cancel" callback for branch 2; the callback receives the cancel reason r and returns teeState.cancelPromise.
+{
+ return function(r) {
+ teeState.canceled2 = true; // Recorded first so the check below and the pull callback both see branch 2 as canceled.
+ teeState.reason2 = r; // Kept so it can be passed along with branch 1's reason when stream is canceled.
+ if (teeState.canceled1) { // stream itself is canceled only once the other branch has been canceled as well.
+ @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise); // Both reasons are passed as a two-element array; the outcome is forwarded to teeState.resolvePromise / teeState.rejectPromise.
+ }
+ return teeState.cancelPromise; // Returned whether or not stream was canceled by this call.
+ }
}
function isReadableStream(stream)