Refresh ReadableStream.pipeTo implementation up to spec
https://bugs.webkit.org/show_bug.cgi?id=215415
Reviewed by Alex Christensen.
LayoutTests/imported/w3c:
* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any-expected.txt:
* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any.worker-expected.txt:
* web-platform-tests/streams/idlharness.any-expected.txt:
* web-platform-tests/streams/idlharness.any.worker-expected.txt:
* web-platform-tests/streams/piping/abort.any-expected.txt:
* web-platform-tests/streams/piping/abort.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/flow-control-expected.txt: Removed.
* web-platform-tests/streams/piping/flow-control.any-expected.txt:
* web-platform-tests/streams/piping/flow-control.any.worker-expected.txt:
* web-platform-tests/streams/piping/general-expected.txt: Removed.
* web-platform-tests/streams/piping/general.any-expected.txt:
* web-platform-tests/streams/piping/general.any.worker-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation-expected.txt: Removed.
* web-platform-tests/streams/piping/multiple-propagation.any-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation.any.worker-expected.txt:
* web-platform-tests/streams/piping/pipe-through-expected.txt: Removed.
* web-platform-tests/streams/piping/pipe-through.any-expected.txt:
* web-platform-tests/streams/piping/pipe-through.any.worker-expected.txt:
* web-platform-tests/streams/piping/then-interception.any-expected.txt:
* web-platform-tests/streams/piping/then-interception.any.worker-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any.worker-expected.txt:
* web-platform-tests/streams/piping/transform-streams-expected.txt: Removed.
* web-platform-tests/streams/readable-streams/patched-global.any-expected.txt:
* web-platform-tests/streams/readable-streams/patched-global.any.worker-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any.worker-expected.txt:
Source/WebCore:
Update pipeTo implementation as per specification.
Keep the old implementation if WritableStream is not enabled.
Covered by existing tests.
* Modules/streams/ReadableStream.js:
(pipeThrough):
* Modules/streams/ReadableStreamInternals.js:
(acquireReadableStreamDefaultReader):
(readableStreamPipeToWritableStream):
(pipeToLoop):
(pipeToErrorsMustBePropagatedForward):
(pipeToErrorsMustBePropagatedBackward):
(pipeToClosingMustBePropagatedForward):
(pipeToClosingMustBePropagatedBackward):
(pipeToShutdownWithAction):
(pipeToShutdown):
* Modules/streams/WritableStream.js:
(initializeWritableStream):
* bindings/js/JSDOMGlobalObject.cpp:
(WebCore::isWritableStreamAPIEnabled):
(WebCore::JSDOMGlobalObject::addBuiltinGlobals):
* bindings/js/WebCoreBuiltinNames.h:
* dom/AbortSignal.idl:
Tools:
Put console log in stderr as this makes some tests flaky otherwise.
* DumpRenderTree/TestOptions.cpp:
(shouldDumpJSConsoleLogInStdErr):
* WebKitTestRunner/TestOptions.cpp:
(WTR::shouldDumpJSConsoleLogInStdErr):
LayoutTests:
Re-enable piping tests.
Remove obsolete tests.
* TestExpectations:
* streams/readable-stream-pipeThrough-expected.txt:
* streams/readable-stream-pipeThrough.html:
Update according new API.
* streams/reference-implementation/pipe-to-expected.txt: Removed.
* streams/reference-implementation/pipe-to.html: Removed.
* streams/reference-implementation/pipe-through-expected.txt: Removed.
* streams/reference-implementation/pipe-through.html: Removed.
* streams/reference-implementation/pipe-to-options-expected.txt: Removed.
* streams/reference-implementation/pipe-to-options.html: Removed.
* streams/reference-implementation/readable-stream-templated-expected:
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@266129 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebCore/Modules/streams/ReadableStreamInternals.js b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
index 07ef2ef..e20a519 100644
--- a/Source/WebCore/Modules/streams/ReadableStreamInternals.js
+++ b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
@@ -131,6 +131,226 @@
doPipe();
}
+function acquireReadableStreamDefaultReader(stream)
+{
+ return new @ReadableStreamDefaultReader(stream);
+}
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
+{
+ @assert(@isReadableStream(source));
+ @assert(@isWritableStream(destination));
+ @assert(!@isReadableStreamLocked(source));
+ @assert(!@isWritableStreamLocked(destination));
+ @assert(signal === @undefined || signal instanceof @AbortSignal);
+
+ if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined)
+ return @Promise.@reject("Piping of readable by strean is not supported");
+
+ let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal };
+
+ pipeState.reader = @acquireReadableStreamDefaultReader(source);
+ pipeState.writer = @acquireWritableStreamDefaultWriter(destination);
+
+ @putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability.@resolve.@call();
+ pipeState.pendingWritePromise = @Promise.@resolve();
+
+ // FIXME: Support signal.
+
+ @pipeToErrorsMustBePropagatedForward(pipeState);
+ @pipeToErrorsMustBePropagatedBackward(pipeState);
+ @pipeToClosingMustBePropagatedForward(pipeState);
+ @pipeToClosingMustBePropagatedBackward(pipeState);
+
+ @pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.@promise;
+}
+
+function pipeToLoop(pipeState)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ @pipeToDoReadWrite(pipeState).@then((result) => {
+ if (result)
+ @pipeToLoop(pipeState);
+ });
+}
+
+function pipeToDoReadWrite(pipeState)
+{
+ @assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ return;
+ }
+
+ @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => {
+ const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined;
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite);
+ if (!canWrite)
+ return;
+
+ pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ return pipeState.pendingReadPromiseCapability.@promise;
+}
+
+function pipeToErrorsMustBePropagatedForward(pipeState)
+{
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) {
+ action();
+ return;
+ }
+
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action);
+}
+
+function pipeToErrorsMustBePropagatedBackward(pipeState)
+{
+ const action = () => {
+ const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action);
+}
+
+function pipeToClosingMustBePropagatedForward(pipeState)
+{
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer));
+ return;
+ }
+ @pipeToShutdown(pipeState);
+ };
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined);
+}
+
+function pipeToClosingMustBePropagatedBackward(pipeState)
+{
+ if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
+ return;
+
+ // @assert no chunks have been read/written
+
+ const error = @makeTypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+}
+
+function pipeToShutdownWithAction(pipeState, action)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.@then(() => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ }, (e) => {
+ @pipeToFinalize(pipeState, e);
+ });
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+
+ finalize();
+}
+
+function pipeToShutdown(pipeState)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+ finalize();
+}
+
+function pipeToFinalize(pipeState)
+{
+ @writableStreamDefaultWriterRelease(pipeState.writer);
+ @readableStreamReaderGenericRelease(pipeState.reader);
+
+ // FIXME: Implement signal support.
+
+ if (arguments.length > 1)
+ pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]);
+ else
+ pipeState.promiseCapability.@resolve.@call();
+}
+
function readableStreamTee(stream, shouldClone)
{
"use strict";