blob: 406b9ea482c6fd3730a5bdc2f9c3dd2cce5b30c4 [file] [log] [blame]
/*
* Copyright (C) 2015 Canon Inc.
* Copyright (C) 2015 Igalia
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// @internal
function isWritableStream(stream)
{
"use strict";
return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink");
}
function isWritableStreamDefaultWriter(writer)
{
"use strict";
return @isObject(writer) && !!@getByIdDirectPrivate(writer, "closedPromise");
}
function acquireWritableStreamDefaultWriter(stream)
{
return new @WritableStreamDefaultWriter(stream);
}
// https://streams.spec.whatwg.org/#create-writable-stream
function createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)
{
@assert(typeof highWaterMark === "number" && !@isNaN(highWaterMark) && highWaterMark >= 0);
const internalStream = { };
@initializeWritableStreamSlots(internalStream, { });
const controller = new @WritableStreamDefaultController();
@setUpWritableStreamDefaultController(internalStream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm);
return @createWritableStreamFromInternal(internalStream);
}
function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy)
{
"use strict";
const stream = { };
if (underlyingSink === @undefined)
underlyingSink = { };
if (strategy === @undefined)
strategy = { };
if (!@isObject(underlyingSink))
@throwTypeError("WritableStream constructor takes an object as first argument");
if ("type" in underlyingSink)
@throwRangeError("Invalid type is specified");
const sizeAlgorithm = @extractSizeAlgorithm(strategy);
const highWaterMark = @extractHighWaterMark(strategy, 1);
const underlyingSinkDict = { };
if ("start" in underlyingSink) {
underlyingSinkDict["start"] = underlyingSink["start"];
if (typeof underlyingSinkDict["start"] !== "function")
@throwTypeError("underlyingSink.start should be a function");
}
if ("write" in underlyingSink) {
underlyingSinkDict["write"] = underlyingSink["write"];
if (typeof underlyingSinkDict["write"] !== "function")
@throwTypeError("underlyingSink.write should be a function");
}
if ("close" in underlyingSink) {
underlyingSinkDict["close"] = underlyingSink["close"];
if (typeof underlyingSinkDict["close"] !== "function")
@throwTypeError("underlyingSink.close should be a function");
}
if ("abort" in underlyingSink) {
underlyingSinkDict["abort"] = underlyingSink["abort"];
if (typeof underlyingSinkDict["abort"] !== "function")
@throwTypeError("underlyingSink.abort should be a function");
}
@initializeWritableStreamSlots(stream, underlyingSink);
@setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm);
return stream;
}
function initializeWritableStreamSlots(stream, underlyingSink)
{
@putByIdDirectPrivate(stream, "state", "writable");
@putByIdDirectPrivate(stream, "storedError", @undefined);
@putByIdDirectPrivate(stream, "writer", @undefined);
@putByIdDirectPrivate(stream, "controller", @undefined);
@putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
@putByIdDirectPrivate(stream, "closeRequest", @undefined);
@putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
@putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
@putByIdDirectPrivate(stream, "writeRequests", []);
@putByIdDirectPrivate(stream, "backpressure", false);
@putByIdDirectPrivate(stream, "underlyingSink", underlyingSink);
}
function writableStreamCloseForBindings(stream)
{
if (@isWritableStreamLocked(stream))
return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on non locked WritableStream"));
if (@writableStreamCloseQueuedOrInFlight(stream))
return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on a being close WritableStream"));
return @writableStreamClose(stream);
}
function writableStreamAbortForBindings(stream, reason)
{
if (@isWritableStreamLocked(stream))
return @Promise.@reject(@makeTypeError("WritableStream.abort method can only be used on non locked WritableStream"));
return @writableStreamAbort(stream, reason);
}
function isWritableStreamLocked(stream)
{
return @getByIdDirectPrivate(stream, "writer") !== @undefined;
}
function setUpWritableStreamDefaultWriter(writer, stream)
{
if (@isWritableStreamLocked(stream))
@throwTypeError("WritableStream is locked");
@putByIdDirectPrivate(writer, "stream", stream);
@putByIdDirectPrivate(stream, "writer", writer);
const readyPromiseCapability = @newPromiseCapability(@Promise);
const closedPromiseCapability = @newPromiseCapability(@Promise);
@putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
@putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
const state = @getByIdDirectPrivate(stream, "state");
if (state === "writable") {
if (@writableStreamCloseQueuedOrInFlight(stream) || !@getByIdDirectPrivate(stream, "backpressure"))
readyPromiseCapability.@resolve.@call();
} else if (state === "erroring") {
readyPromiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(stream, "storedError"));
@markPromiseAsHandled(readyPromiseCapability.@promise);
} else if (state === "closed") {
readyPromiseCapability.@resolve.@call();
closedPromiseCapability.@resolve.@call();
} else {
@assert(state === "errored");
const storedError = @getByIdDirectPrivate(stream, "storedError");
readyPromiseCapability.@reject.@call(@undefined, storedError);
@markPromiseAsHandled(readyPromiseCapability.@promise);
closedPromiseCapability.@reject.@call(@undefined, storedError);
@markPromiseAsHandled(closedPromiseCapability.@promise);
}
}
function writableStreamAbort(stream, reason)
{
const state = @getByIdDirectPrivate(stream, "state");
if (state === "closed" || state === "errored")
return @Promise.@resolve();
const pendingAbortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
if (pendingAbortRequest !== @undefined)
return pendingAbortRequest.promise.@promise;
@assert(state === "writable" || state === "erroring");
let wasAlreadyErroring = false;
if (state === "erroring") {
wasAlreadyErroring = true;
reason = @undefined;
}
const abortPromiseCapability = @newPromiseCapability(@Promise);
@putByIdDirectPrivate(stream, "pendingAbortRequest", { promise : abortPromiseCapability, reason : reason, wasAlreadyErroring : wasAlreadyErroring });
if (!wasAlreadyErroring)
@writableStreamStartErroring(stream, reason);
return abortPromiseCapability.@promise;
}
function writableStreamClose(stream)
{
const state = @getByIdDirectPrivate(stream, "state");
if (state === "closed" || state === "errored")
return @Promise.@reject(@makeTypeError("Cannot close a writable stream that is closed or errored"));
@assert(state === "writable" || state === "erroring");
@assert(!@writableStreamCloseQueuedOrInFlight(stream));
const closePromiseCapability = @newPromiseCapability(@Promise);
@putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability);
const writer = @getByIdDirectPrivate(stream, "writer");
if (writer !== @undefined && @getByIdDirectPrivate(stream, "backpressure") && state === "writable")
@getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
@writableStreamDefaultControllerClose(@getByIdDirectPrivate(stream, "controller"));
return closePromiseCapability.@promise;
}
function writableStreamAddWriteRequest(stream)
{
@assert(@isWritableStreamLocked(stream))
@assert(@getByIdDirectPrivate(stream, "state") === "writable");
const writePromiseCapability = @newPromiseCapability(@Promise);
const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
@arrayPush(writeRequests, writePromiseCapability);
return writePromiseCapability.@promise;
}
function writableStreamCloseQueuedOrInFlight(stream)
{
return @getByIdDirectPrivate(stream, "closeRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
}
function writableStreamDealWithRejection(stream, error)
{
const state = @getByIdDirectPrivate(stream, "state");
if (state === "writable") {
@writableStreamStartErroring(stream, error);
return;
}
@assert(state === "erroring");
@writableStreamFinishErroring(stream);
}
function writableStreamFinishErroring(stream)
{
@assert(@getByIdDirectPrivate(stream, "state") === "erroring");
@assert(!@writableStreamHasOperationMarkedInFlight(stream));
@putByIdDirectPrivate(stream, "state", "errored");
const controller = @getByIdDirectPrivate(stream, "controller");
@getByIdDirectPrivate(controller, "errorSteps").@call();
const storedError = @getByIdDirectPrivate(stream, "storedError");
const requests = @getByIdDirectPrivate(stream, "writeRequests");
for (let index = 0, length = requests.length; index < length; ++index)
requests[index].@reject.@call(@undefined, storedError);
@putByIdDirectPrivate(stream, "writeRequests", []);
const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
if (abortRequest === @undefined) {
@writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
@putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
if (abortRequest.wasAlreadyErroring) {
abortRequest.promise.@reject.@call(@undefined, storedError);
@writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
@getByIdDirectPrivate(controller, "abortSteps").@call(@undefined, abortRequest.reason).@then(() => {
abortRequest.promise.@resolve.@call();
@writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
}, (reason) => {
abortRequest.promise.@reject.@call(@undefined, reason);
@writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
});
}
function writableStreamFinishInFlightClose(stream)
{
const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
inFlightCloseRequest.@resolve.@call();
@putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
if (state === "erroring") {
@putByIdDirectPrivate(stream, "storedError", @undefined);
const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
if (abortRequest !== @undefined) {
abortRequest.promise.@resolve.@call();
@putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
}
}
@putByIdDirectPrivate(stream, "state", "closed");
const writer = @getByIdDirectPrivate(stream, "writer");
if (writer !== @undefined)
@getByIdDirectPrivate(writer, "closedPromise").@resolve.@call();
@assert(@getByIdDirectPrivate(stream, "pendingAbortRequest") === @undefined);
@assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
}
function writableStreamFinishInFlightCloseWithError(stream, error)
{
const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
@assert(inFlightCloseRequest !== @undefined);
inFlightCloseRequest.@reject.@call(@undefined, error);
@putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
if (abortRequest !== @undefined) {
abortRequest.promise.@reject.@call(@undefined, error);
@putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
}
@writableStreamDealWithRejection(stream, error);
}
function writableStreamFinishInFlightWrite(stream)
{
const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
@assert(inFlightWriteRequest !== @undefined);
inFlightWriteRequest.@resolve.@call();
@putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
}
function writableStreamFinishInFlightWriteWithError(stream, error)
{
const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
@assert(inFlightWriteRequest !== @undefined);
inFlightWriteRequest.@reject.@call(@undefined, error);
@putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
@writableStreamDealWithRejection(stream, error);
}
function writableStreamHasOperationMarkedInFlight(stream)
{
return @getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
}
function writableStreamMarkCloseRequestInFlight(stream)
{
const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
@assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
@assert(closeRequest !== @undefined);
@putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest);
@putByIdDirectPrivate(stream, "closeRequest", @undefined);
}
function writableStreamMarkFirstWriteRequestInFlight(stream)
{
const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
@assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined);
@assert(writeRequests.length > 0);
const writeRequest = writeRequests.@shift();
@putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
}
function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream)
{
@assert(@getByIdDirectPrivate(stream, "state") === "errored");
const storedError = @getByIdDirectPrivate(stream, "storedError");
const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
if (closeRequest !== @undefined) {
@assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
closeRequest.@reject.@call(@undefined, storedError);
@putByIdDirectPrivate(stream, "closeRequest", @undefined);
}
const writer = @getByIdDirectPrivate(stream, "writer");
if (writer !== @undefined) {
const closedPromise = @getByIdDirectPrivate(writer, "closedPromise");
closedPromise.@reject.@call(@undefined, storedError);
@markPromiseAsHandled(closedPromise.@promise);
}
}
function writableStreamStartErroring(stream, reason)
{
@assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
@assert(@getByIdDirectPrivate(stream, "state") === "writable");
const controller = @getByIdDirectPrivate(stream, "controller");
@assert(controller !== @undefined);
@putByIdDirectPrivate(stream, "state", "erroring");
@putByIdDirectPrivate(stream, "storedError", reason);
const writer = @getByIdDirectPrivate(stream, "writer");
if (writer !== @undefined)
@writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
if (!@writableStreamHasOperationMarkedInFlight(stream) && @getByIdDirectPrivate(controller, "started"))
@writableStreamFinishErroring(stream);
}
function writableStreamUpdateBackpressure(stream, backpressure)
{
@assert(@getByIdDirectPrivate(stream, "state") === "writable");
@assert(!@writableStreamCloseQueuedOrInFlight(stream));
const writer = @getByIdDirectPrivate(stream, "writer");
if (writer !== @undefined && backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
if (backpressure)
@putByIdDirectPrivate(writer, "readyPromise", @newPromiseCapability(@Promise));
else
@getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
}
@putByIdDirectPrivate(stream, "backpressure", backpressure);
}
function writableStreamDefaultWriterAbort(writer, reason)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
return @writableStreamAbort(stream, reason);
}
function writableStreamDefaultWriterClose(writer)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
return @writableStreamClose(stream);
}
function writableStreamDefaultWriterCloseWithErrorPropagation(writer)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
const state = @getByIdDirectPrivate(stream, "state");
if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
return @Promise.@resolve();
if (state === "errored")
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
@assert(state === "writable" || state === "erroring");
return @writableStreamDefaultWriterClose(writer);
}
function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error)
{
let closedPromiseCapability = @getByIdDirectPrivate(writer, "closedPromise");
let closedPromise = closedPromiseCapability.@promise;
if ((@getPromiseInternalField(closedPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
closedPromiseCapability = @newPromiseCapability(@Promise);
closedPromise = closedPromiseCapability.@promise;
@putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
}
closedPromiseCapability.@reject.@call(@undefined, error);
@markPromiseAsHandled(closedPromise);
}
function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error)
{
let readyPromiseCapability = @getByIdDirectPrivate(writer, "readyPromise");
let readyPromise = readyPromiseCapability.@promise;
if ((@getPromiseInternalField(readyPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
readyPromiseCapability = @newPromiseCapability(@Promise);
readyPromise = readyPromiseCapability.@promise;
@putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
}
readyPromiseCapability.@reject.@call(@undefined, error);
@markPromiseAsHandled(readyPromise);
}
function writableStreamDefaultWriterGetDesiredSize(writer)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
const state = @getByIdDirectPrivate(stream, "state");
if (state === "errored" || state === "erroring")
return null;
if (state === "closed")
return 0;
return @writableStreamDefaultControllerGetDesiredSize(@getByIdDirectPrivate(stream, "controller"));
}
function writableStreamDefaultWriterRelease(writer)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
@assert(@getByIdDirectPrivate(stream, "writer") === writer);
const releasedError = @makeTypeError("writableStreamDefaultWriterRelease");
@writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
@writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
@putByIdDirectPrivate(stream, "writer", @undefined);
@putByIdDirectPrivate(writer, "stream", @undefined);
}
function writableStreamDefaultWriterWrite(writer, chunk)
{
const stream = @getByIdDirectPrivate(writer, "stream");
@assert(stream !== @undefined);
const controller = @getByIdDirectPrivate(stream, "controller");
@assert(controller !== @undefined);
const chunkSize = @writableStreamDefaultControllerGetChunkSize(controller, chunk);
if (stream !== @getByIdDirectPrivate(writer, "stream"))
return @Promise.@reject(@makeTypeError("writer is not stream's writer"));
const state = @getByIdDirectPrivate(stream, "state");
if (state === "errored")
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
return @Promise.@reject(@makeTypeError("stream is closing or closed"));
if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
return @Promise.@reject(@makeTypeError("stream is closing or closed"));
if (state === "erroring")
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
@assert(state === "writable");
const promise = @writableStreamAddWriteRequest(stream);
@writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
return promise;
}
function setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)
{
@assert(@isWritableStream(stream));
@assert(@getByIdDirectPrivate(stream, "controller") === @undefined);
@putByIdDirectPrivate(controller, "stream", stream);
@putByIdDirectPrivate(stream, "controller", controller);
@resetQueue(@getByIdDirectPrivate(controller, "queue"));
@putByIdDirectPrivate(controller, "started", false);
@putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm);
@putByIdDirectPrivate(controller, "strategyHWM", highWaterMark);
@putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm);
@putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm);
@putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm);
const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
@writableStreamUpdateBackpressure(stream, backpressure);
@Promise.@resolve(startAlgorithm.@call()).@then(() => {
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
@putByIdDirectPrivate(controller, "started", true);
@writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}, (error) => {
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
@putByIdDirectPrivate(controller, "started", true);
@writableStreamDealWithRejection(stream, error);
});
}
function setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)
{
const controller = new @WritableStreamDefaultController();
let startAlgorithm = () => { };
let writeAlgorithm = () => { return @Promise.@resolve(); };
let closeAlgorithm = () => { return @Promise.@resolve(); };
let abortAlgorithm = () => { return @Promise.@resolve(); };
if ("start" in underlyingSinkDict) {
const startMethod = underlyingSinkDict["start"];
startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]);
}
if ("write" in underlyingSinkDict) {
const writeMethod = underlyingSinkDict["write"];
writeAlgorithm = (chunk) => @promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]);
}
if ("close" in underlyingSinkDict) {
const closeMethod = underlyingSinkDict["close"];
closeAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []);
}
if ("abort" in underlyingSinkDict) {
const abortMethod = underlyingSinkDict["abort"];
abortAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]);
}
@setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm);
}
function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller)
{
const stream = @getByIdDirectPrivate(controller, "stream");
if (!@getByIdDirectPrivate(controller, "started"))
return;
@assert(stream !== @undefined);
if (@getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined)
return;
const state = @getByIdDirectPrivate(stream, "state");
@assert(state !== "closed" || state !== "errored");
if (state === "erroring") {
@writableStreamFinishErroring(stream);
return;
}
if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
return;
const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue"));
if (value === @isCloseSentinel)
@writableStreamDefaultControllerProcessClose(controller);
else
@writableStreamDefaultControllerProcessWrite(controller, value);
}
function isCloseSentinel()
{
}
function writableStreamDefaultControllerClearAlgorithms(controller)
{
@putByIdDirectPrivate(controller, "writeAlgorithm", @undefined);
@putByIdDirectPrivate(controller, "closeAlgorithm", @undefined);
@putByIdDirectPrivate(controller, "abortAlgorithm", @undefined);
@putByIdDirectPrivate(controller, "strategySizeAlgorithm", @undefined);
}
function writableStreamDefaultControllerClose(controller)
{
@enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), @isCloseSentinel, 0);
@writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
function writableStreamDefaultControllerError(controller, error)
{
const stream = @getByIdDirectPrivate(controller, "stream");
@assert(stream !== @undefined);
@assert(@getByIdDirectPrivate(stream, "state") === "writable");
@writableStreamDefaultControllerClearAlgorithms(controller);
@writableStreamStartErroring(stream, error);
}
function writableStreamDefaultControllerErrorIfNeeded(controller, error)
{
const stream = @getByIdDirectPrivate(controller, "stream");
if (@getByIdDirectPrivate(stream, "state") === "writable")
@writableStreamDefaultControllerError(controller, error);
}
function writableStreamDefaultControllerGetBackpressure(controller)
{
const desiredSize = @writableStreamDefaultControllerGetDesiredSize(controller);
return desiredSize <= 0;
}
function writableStreamDefaultControllerGetChunkSize(controller, chunk)
{
try {
return @getByIdDirectPrivate(controller, "strategySizeAlgorithm").@call(@undefined, chunk);
} catch (e) {
@writableStreamDefaultControllerErrorIfNeeded(controller, e);
return 1;
}
}
function writableStreamDefaultControllerGetDesiredSize(controller)
{
return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size;
}
function writableStreamDefaultControllerProcessClose(controller)
{
const stream = @getByIdDirectPrivate(controller, "stream");
@writableStreamMarkCloseRequestInFlight(stream);
@dequeueValue(@getByIdDirectPrivate(controller, "queue"));
@assert(@getByIdDirectPrivate(controller, "queue").content.length === 0);
const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call();
@writableStreamDefaultControllerClearAlgorithms(controller);
sinkClosePromise.@then(() => {
@writableStreamFinishInFlightClose(stream);
}, (reason) => {
@writableStreamFinishInFlightCloseWithError(stream, reason);
});
}
function writableStreamDefaultControllerProcessWrite(controller, chunk)
{
const stream = @getByIdDirectPrivate(controller, "stream");
@writableStreamMarkFirstWriteRequestInFlight(stream);
const sinkWritePromise = @getByIdDirectPrivate(controller, "writeAlgorithm").@call(@undefined, chunk);
sinkWritePromise.@then(() => {
@writableStreamFinishInFlightWrite(stream);
const state = @getByIdDirectPrivate(stream, "state");
@assert(state === "writable" || state === "erroring");
@dequeueValue(@getByIdDirectPrivate(controller, "queue"));
if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
@writableStreamUpdateBackpressure(stream, backpressure);
}
@writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}, (reason) => {
const state = @getByIdDirectPrivate(stream, "state");
if (state === "writable")
@writableStreamDefaultControllerClearAlgorithms(controller);
@writableStreamFinishInFlightWriteWithError(stream, reason);
});
}
function writableStreamDefaultControllerWrite(controller, chunk, chunkSize)
{
try {
@enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
const stream = @getByIdDirectPrivate(controller, "stream");
const state = @getByIdDirectPrivate(stream, "state");
if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
@writableStreamUpdateBackpressure(stream, backpressure);
}
@writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
} catch (e) {
@writableStreamDefaultControllerErrorIfNeeded(controller, e);
}
}