Refresh ReadableStream.pipeTo implementation up to spec
https://bugs.webkit.org/show_bug.cgi?id=215415
Reviewed by Alex Christensen.
LayoutTests/imported/w3c:
* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any-expected.txt:
* web-platform-tests/fetch/api/response/response-stream-disturbed-by-pipe.any.worker-expected.txt:
* web-platform-tests/streams/idlharness.any-expected.txt:
* web-platform-tests/streams/idlharness.any.worker-expected.txt:
* web-platform-tests/streams/piping/abort.any-expected.txt:
* web-platform-tests/streams/piping/abort.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/close-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/close-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-backward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-backward.any.worker-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward-expected.txt: Removed.
* web-platform-tests/streams/piping/error-propagation-forward.any-expected.txt:
* web-platform-tests/streams/piping/error-propagation-forward.any.worker-expected.txt:
* web-platform-tests/streams/piping/flow-control-expected.txt: Removed.
* web-platform-tests/streams/piping/flow-control.any-expected.txt:
* web-platform-tests/streams/piping/flow-control.any.worker-expected.txt:
* web-platform-tests/streams/piping/general-expected.txt: Removed.
* web-platform-tests/streams/piping/general.any-expected.txt:
* web-platform-tests/streams/piping/general.any.worker-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation-expected.txt: Removed.
* web-platform-tests/streams/piping/multiple-propagation.any-expected.txt:
* web-platform-tests/streams/piping/multiple-propagation.any.worker-expected.txt:
* web-platform-tests/streams/piping/pipe-through-expected.txt: Removed.
* web-platform-tests/streams/piping/pipe-through.any-expected.txt:
* web-platform-tests/streams/piping/pipe-through.any.worker-expected.txt:
* web-platform-tests/streams/piping/then-interception.any-expected.txt:
* web-platform-tests/streams/piping/then-interception.any.worker-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any-expected.txt:
* web-platform-tests/streams/piping/throwing-options.any.worker-expected.txt:
* web-platform-tests/streams/piping/transform-streams-expected.txt: Removed.
* web-platform-tests/streams/readable-streams/patched-global.any-expected.txt:
* web-platform-tests/streams/readable-streams/patched-global.any.worker-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any-expected.txt:
* web-platform-tests/streams/readable-streams/reentrant-strategies.any.worker-expected.txt:
Source/WebCore:
Update pipeTo implementation as per specification.
Keep the old implementation if WritableStream is not enabled.
Covered by existing tests.
* Modules/streams/ReadableStream.js:
(pipeThrough):
* Modules/streams/ReadableStreamInternals.js:
(acquireReadableStreamDefaultReader):
(readableStreamPipeToWritableStream):
(pipeToLoop):
(pipeToErrorsMustBePropagatedForward):
(pipeToErrorsMustBePropagatedBackward):
(pipeToClosingMustBePropagatedForward):
(pipeToClosingMustBePropagatedBackward):
(pipeToShutdownWithAction):
(pipeToShutdown):
* Modules/streams/WritableStream.js:
(initializeWritableStream):
* bindings/js/JSDOMGlobalObject.cpp:
(WebCore::isWritableStreamAPIEnabled):
(WebCore::JSDOMGlobalObject::addBuiltinGlobals):
* bindings/js/WebCoreBuiltinNames.h:
* dom/AbortSignal.idl:
Tools:
Put console log in stderr as this makes some tests flaky otherwise.
* DumpRenderTree/TestOptions.cpp:
(shouldDumpJSConsoleLogInStdErr):
* WebKitTestRunner/TestOptions.cpp:
(WTR::shouldDumpJSConsoleLogInStdErr):
LayoutTests:
Re-enable piping tests.
Remove obsolete tests.
* TestExpectations:
* streams/readable-stream-pipeThrough-expected.txt:
* streams/readable-stream-pipeThrough.html:
Update according new API.
* streams/reference-implementation/pipe-to-expected.txt: Removed.
* streams/reference-implementation/pipe-to.html: Removed.
* streams/reference-implementation/pipe-through-expected.txt: Removed.
* streams/reference-implementation/pipe-through.html: Removed.
* streams/reference-implementation/pipe-to-options-expected.txt: Removed.
* streams/reference-implementation/pipe-to-options.html: Removed.
* streams/reference-implementation/readable-stream-templated-expected:
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@266129 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebCore/Modules/streams/ReadableStream.js b/Source/WebCore/Modules/streams/ReadableStream.js
index 9108ebc..161fce6 100644
--- a/Source/WebCore/Modules/streams/ReadableStream.js
+++ b/Source/WebCore/Modules/streams/ReadableStream.js
@@ -115,6 +115,41 @@
{
"use strict";
+ if (@writableStreamAPIEnabled()) {
+ if (!@isReadableStream(this))
+ throw @makeThisTypeError("ReadableStream", "getReader");
+
+ if (@isReadableStreamLocked(this))
+ throw @makeTypeError("ReadableStream is locked");
+
+ const transforms = streams;
+ const writable = transforms["writable"];
+
+ if (!@isWritableStream(writable))
+ throw @makeTypeError("writable should be WritableStream");
+
+ if (options === @undefined)
+ options = { };
+
+ let signal;
+ if ("signal" in options) {
+ signal = options["signal"];
+ if (!(signal instanceof @AbortSignal))
+ throw @makeTypeError("options.signal must be AbortSignal");
+ }
+
+ const preventClose = !!options["preventClose"];
+ const preventAbort = !!options["preventAbort"];
+ const preventCancel = !!options["preventCancel"];
+
+ if (@isWritableStreamLocked(writable))
+ throw @makeTypeError("WritableStream is locked");
+
+ @readableStreamPipeToWritableStream(this, writable, preventClose, preventAbort, preventCancel, signal);
+
+ return transforms["readable"];
+ }
+
const writable = streams.writable;
const readable = streams.readable;
const promise = this.pipeTo(writable, options);
@@ -129,7 +164,38 @@
// FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869.
// Built-in generator should be able to parse function signature to compute the function length correctly.
- const options = arguments[1];
+ let options = arguments[1];
+
+ if (@writableStreamAPIEnabled()) {
+ if (!@isReadableStream(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStream", "pipeTo"));
+
+ if (!@isWritableStream(destination))
+ return @Promise.@reject(@makeTypeError("ReadableStream pipeTo requires a WritableStream"));
+
+ if (options === @undefined)
+ options = { };
+
+ // FIXME. We should catch exceptions and reject.
+ let signal;
+ if ("signal" in options) {
+ signal = options["signal"];
+ if (!(signal instanceof @AbortSignal))
+ return @Promise.@reject(@makeTypeError("options.signal must be AbortSignal"));
+ }
+
+ const preventClose = !!options["preventClose"];
+ const preventAbort = !!options["preventAbort"];
+ const preventCancel = !!options["preventCancel"];
+
+ if (@isReadableStreamLocked(this))
+ return @Promise.@reject(@makeTypeError("ReadableStream is locked"));
+
+ if (@isWritableStreamLocked(destination))
+ return @Promise.@reject(@makeTypeError("WritableStream is locked"));
+
+ return @readableStreamPipeToWritableStream(this, destination, preventClose, preventAbort, preventCancel, signal);
+ }
// FIXME: rewrite pipeTo so as to require to have 'this' as a ReadableStream and destination be a WritableStream.
// See https://github.com/whatwg/streams/issues/407.
diff --git a/Source/WebCore/Modules/streams/ReadableStreamInternals.js b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
index 07ef2ef..e20a519 100644
--- a/Source/WebCore/Modules/streams/ReadableStreamInternals.js
+++ b/Source/WebCore/Modules/streams/ReadableStreamInternals.js
@@ -131,6 +131,226 @@
doPipe();
}
+function acquireReadableStreamDefaultReader(stream)
+{
+ return new @ReadableStreamDefaultReader(stream);
+}
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
+{
+ @assert(@isReadableStream(source));
+ @assert(@isWritableStream(destination));
+ @assert(!@isReadableStreamLocked(source));
+ @assert(!@isWritableStreamLocked(destination));
+ @assert(signal === @undefined || signal instanceof @AbortSignal);
+
+ if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined)
+ return @Promise.@reject("Piping of readable by strean is not supported");
+
+ let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal };
+
+ pipeState.reader = @acquireReadableStreamDefaultReader(source);
+ pipeState.writer = @acquireWritableStreamDefaultWriter(destination);
+
+ @putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability.@resolve.@call();
+ pipeState.pendingWritePromise = @Promise.@resolve();
+
+ // FIXME: Support signal.
+
+ @pipeToErrorsMustBePropagatedForward(pipeState);
+ @pipeToErrorsMustBePropagatedBackward(pipeState);
+ @pipeToClosingMustBePropagatedForward(pipeState);
+ @pipeToClosingMustBePropagatedBackward(pipeState);
+
+ @pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.@promise;
+}
+
+function pipeToLoop(pipeState)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ @pipeToDoReadWrite(pipeState).@then((result) => {
+ if (result)
+ @pipeToLoop(pipeState);
+ });
+}
+
+function pipeToDoReadWrite(pipeState)
+{
+ @assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ return;
+ }
+
+ @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => {
+ const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined;
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite);
+ if (!canWrite)
+ return;
+
+ pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ return pipeState.pendingReadPromiseCapability.@promise;
+}
+
+function pipeToErrorsMustBePropagatedForward(pipeState)
+{
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) {
+ action();
+ return;
+ }
+
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action);
+}
+
+function pipeToErrorsMustBePropagatedBackward(pipeState)
+{
+ const action = () => {
+ const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action);
+}
+
+function pipeToClosingMustBePropagatedForward(pipeState)
+{
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer));
+ return;
+ }
+ @pipeToShutdown(pipeState);
+ };
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined);
+}
+
+function pipeToClosingMustBePropagatedBackward(pipeState)
+{
+ if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
+ return;
+
+ // @assert no chunks have been read/written
+
+ const error = @makeTypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+}
+
+function pipeToShutdownWithAction(pipeState, action)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.@then(() => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ }, (e) => {
+ @pipeToFinalize(pipeState, e);
+ });
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+
+ finalize();
+}
+
+function pipeToShutdown(pipeState)
+{
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+ finalize();
+}
+
+function pipeToFinalize(pipeState)
+{
+ @writableStreamDefaultWriterRelease(pipeState.writer);
+ @readableStreamReaderGenericRelease(pipeState.reader);
+
+ // FIXME: Implement signal support.
+
+ if (arguments.length > 1)
+ pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]);
+ else
+ pipeState.promiseCapability.@resolve.@call();
+}
+
function readableStreamTee(stream, shouldClone)
{
"use strict";