Refresh ReadableStream.pipeTo implementation up to spec
https://bugs.webkit.org/show_bug.cgi?id=215415

Reviewed by Alex Christensen.

LayoutTests/imported/w3c:

* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any-expected.txt:
* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any.worker-expected.txt:
* web-platform-tests/streams/idlharness.any-expected.txt:
* web-platform-tests/streams/idlharness.any.worker-expected.txt:
* web-platform-tests/streams/piping/abort.any-expected.txt:
* web-platform-tests/streams/piping/abort.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/flow-control-expected.txt: Removed.
* web-platform-tests/streams/piping/flow-control.any-expected.txt:
* web-platform-tests/streams/piping/flow-control.any.worker-expected.txt:
* web-platform-tests/streams/piping/general-expected.txt: Removed.
* web-platform-tests/streams/piping/general.any-expected.txt:
* web-platform-tests/streams/piping/general.any.worker-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation-expected.txt: Removed.
* web-platform-tests/streams/piping/multiple-propagation.any-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation.any.worker-expected.txt:
* web-platform-tests/streams/piping/pipe-through-expected.txt: Removed.
* web-platform-tests/streams/piping/pipe-through.any-expected.txt:
* web-platform-tests/streams/piping/pipe-through.any.worker-expected.txt:
* web-platform-tests/streams/piping/then-interception.any-expected.txt:
* web-platform-tests/streams/piping/then-interception.any.worker-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any.worker-expected.txt:
* web-platform-tests/streams/piping/transform-streams-expected.txt: Removed.
* web-platform-tests/streams/readable-streams/patched-global.any-expected.txt:
* web-platform-tests/streams/readable-streams/patched-global.any.worker-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any.worker-expected.txt:

Source/WebCore:

Update pipeTo implementation as per specification.
Keep the old implementation if WritableStream is not enabled.

Covered by existing tests.

* Modules/streams/ReadableStream.js:
(pipeThrough):
* Modules/streams/ReadableStreamInternals.js:
(acquireReadableStreamDefaultReader):
(readableStreamPipeToWritableStream):
(pipeToLoop):
(pipeToErrorsMustBePropagatedForward):
(pipeToErrorsMustBePropagatedBackward):
(pipeToClosingMustBePropagatedForward):
(pipeToClosingMustBePropagatedBackward):
(pipeToShutdownWithAction):
(pipeToShutdown):
* Modules/streams/WritableStream.js:
(initializeWritableStream):
* bindings/js/JSDOMGlobalObject.cpp:
(WebCore::isWritableStreamAPIEnabled):
(WebCore::JSDOMGlobalObject::addBuiltinGlobals):
* bindings/js/WebCoreBuiltinNames.h:
* dom/AbortSignal.idl:

Tools:

Put console log in stderr as this makes some tests flaky otherwise.

* DumpRenderTree/TestOptions.cpp:
(shouldDumpJSConsoleLogInStdErr):
* WebKitTestRunner/TestOptions.cpp:
(WTR::shouldDumpJSConsoleLogInStdErr):

LayoutTests:

Re-enable piping tests.
Remove obsolete tests.

* TestExpectations:
* streams/readable-stream-pipeThrough-expected.txt:
* streams/readable-stream-pipeThrough.html:
Update according new API.
* streams/reference-implementation/pipe-to-expected.txt: Removed.
* streams/reference-implementation/pipe-to.html: Removed.
* streams/reference-implementation/pipe-through-expected.txt: Removed.
* streams/reference-implementation/pipe-through.html: Removed.
* streams/reference-implementation/pipe-to-options-expected.txt: Removed.
* streams/reference-implementation/pipe-to-options.html: Removed.
* streams/reference-implementation/readable-stream-templated-expected:


git-svn-id: http://svn.webkit.org/repository/webkit/trunk@266129 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebCore/Modules/streams/ReadableStreamInternals.js b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
index 07ef2ef..e20a519 100644
--- a/Source/WebCore/Modules/streams/ReadableStreamInternals.js
+++ b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
@@ -131,6 +131,226 @@
     doPipe();
 }
 
+function acquireReadableStreamDefaultReader(stream)
+{
+    return new @ReadableStreamDefaultReader(stream);
+}
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
+{
+    @assert(@isReadableStream(source));
+    @assert(@isWritableStream(destination));
+    @assert(!@isReadableStreamLocked(source));
+    @assert(!@isWritableStreamLocked(destination));
+    @assert(signal === @undefined || signal instanceof @AbortSignal);
+
+    if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined)
+        return @Promise.@reject("Piping of readable by strean is not supported");
+
+    let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal };
+
+    pipeState.reader = @acquireReadableStreamDefaultReader(source);
+    pipeState.writer = @acquireWritableStreamDefaultWriter(destination);
+
+    @putByIdDirectPrivate(source, "disturbed", true);
+
+    pipeState.shuttingDown = false;
+    pipeState.promiseCapability = @newPromiseCapability(@Promise);
+    pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+    pipeState.pendingReadPromiseCapability.@resolve.@call();
+    pipeState.pendingWritePromise = @Promise.@resolve();
+
+    // FIXME: Support signal.
+
+    @pipeToErrorsMustBePropagatedForward(pipeState);
+    @pipeToErrorsMustBePropagatedBackward(pipeState);
+    @pipeToClosingMustBePropagatedForward(pipeState);
+    @pipeToClosingMustBePropagatedBackward(pipeState);
+
+    @pipeToLoop(pipeState);
+
+    return pipeState.promiseCapability.@promise;
+}
+
+function pipeToLoop(pipeState)
+{
+    if (pipeState.shuttingDown)
+        return;
+
+    @pipeToDoReadWrite(pipeState).@then((result) => {
+        if (result)
+            @pipeToLoop(pipeState);
+    });
+}
+
+function pipeToDoReadWrite(pipeState)
+{
+    @assert(!pipeState.shuttingDown);
+
+    pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+    @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => {
+        if (pipeState.shuttingDown) {
+            pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+            return;
+        }
+
+        @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => {
+            const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined;
+            pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite);
+            if (!canWrite)
+                return;
+
+            pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+        }, (e) => {
+            pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+        });
+    }, (e) => {
+        pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+    });
+    return pipeState.pendingReadPromiseCapability.@promise;
+}
+
+function pipeToErrorsMustBePropagatedForward(pipeState)
+{
+    const action = () => {
+        pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+        const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+        if (!pipeState.preventAbort) {
+            @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error);
+            return;
+        }
+        @pipeToShutdown(pipeState, error);
+    };
+
+    if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) {
+        action();
+        return;
+    }
+
+    @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action);
+}
+
+function pipeToErrorsMustBePropagatedBackward(pipeState)
+{
+    const action = () => {
+        const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
+        if (!pipeState.preventCancel) {
+            @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+            return;
+        }
+        @pipeToShutdown(pipeState, error);
+    };
+    if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+        action();
+        return;
+    }
+    @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action);
+}
+
+function pipeToClosingMustBePropagatedForward(pipeState)
+{
+    const action = () => {
+        pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+        const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+        if (!pipeState.preventClose) {
+            @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer));
+            return;
+        }
+        @pipeToShutdown(pipeState);
+    };
+    if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) {
+        action();
+        return;
+    }
+    @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined);
+}
+
+function pipeToClosingMustBePropagatedBackward(pipeState)
+{
+    if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
+        return;
+
+    // @assert no chunks have been read/written
+
+    const error = @makeTypeError("closing is propagated backward");
+    if (!pipeState.preventCancel) {
+        @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+        return;
+    }
+    @pipeToShutdown(pipeState, error);
+}
+
+function pipeToShutdownWithAction(pipeState, action)
+{
+    if (pipeState.shuttingDown)
+        return;
+
+    pipeState.shuttingDown = true;
+
+    const hasError = arguments.length > 2;
+    const error = arguments[2];
+    const finalize = () => {
+        const promise = action();
+        promise.@then(() => {
+            if (hasError)
+                @pipeToFinalize(pipeState, error);
+            else
+                @pipeToFinalize(pipeState);
+        }, (e)  => {
+            @pipeToFinalize(pipeState, e);
+        });
+    };
+
+    if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+        pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+            pipeState.pendingWritePromise.@then(finalize, finalize);
+        }, (e) => @pipeToFinalize(pipeState, e));
+        return;
+    }
+
+    finalize();
+}
+
+function pipeToShutdown(pipeState)
+{
+    if (pipeState.shuttingDown)
+        return;
+
+    pipeState.shuttingDown = true;
+
+    const hasError = arguments.length > 1;
+    const error = arguments[1];
+    const finalize = () => {
+        if (hasError)
+            @pipeToFinalize(pipeState, error);
+        else
+            @pipeToFinalize(pipeState);
+    };
+
+    if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+        pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+            pipeState.pendingWritePromise.@then(finalize, finalize);
+        }, (e) => @pipeToFinalize(pipeState, e));
+        return;
+    }
+    finalize();
+}
+
+function pipeToFinalize(pipeState)
+{
+    @writableStreamDefaultWriterRelease(pipeState.writer);
+    @readableStreamReaderGenericRelease(pipeState.reader);
+
+    // FIXME: Implement signal support.
+
+    if (arguments.length > 1)
+        pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]);
+    else
+        pipeState.promiseCapability.@resolve.@call();
+}
+
 function readableStreamTee(stream, shouldClone)
 {
     "use strict";