blob: 4263e3991a93bb51f8c6e01f8de892dc26ef9f1b [file] [log] [blame]
/*
* Copyright (C) 2020 Apple Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
// @internal
function isTransformStream(stream)
{
"use strict";
return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable");
}
function isTransformStreamDefaultController(controller)
{
"use strict";
return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm");
}
function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
{
if (writableHighWaterMark === @undefined)
writableHighWaterMark = 1;
if (writableSizeAlgorithm === @undefined)
writableSizeAlgorithm = () => 1;
if (readableHighWaterMark === @undefined)
readableHighWaterMark = 0;
if (readableSizeAlgorithm === @undefined)
readableSizeAlgorithm = () => 1;
@assert(writableHighWaterMark >= 0);
@assert(readableHighWaterMark >= 0);
const transform = {};
@putByIdDirectPrivate(transform, "TransformStream", true);
const stream = new @TransformStream(transform);
const startPromiseCapability = @newPromiseCapability(@Promise);
@initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm);
const controller = new @TransformStreamDefaultController();
@setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
startAlgorithm().@then(() => {
startPromiseCapability.@resolve.@call();
}, (error) => {
startPromiseCapability.@reject.@call(@undefined, error);
});
return stream;
}
function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
{
"use strict";
const startAlgorithm = () => { return startPromise; };
const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); }
const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); }
const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); }
const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm);
const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); };
const cancelAlgorithm = (reason) => {
@transformStreamErrorWritableAndUnblockWrite(stream, reason);
return @Promise.@resolve();
};
const underlyingSource = { };
@putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
@putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
@putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
const options = { };
@putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
@putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
const readable = new @ReadableStream(underlyingSource, options);
// The writable to expose to JS through writable getter.
@putByIdDirectPrivate(stream, "writable", writable);
// The writable to use for the actual transform algorithms.
@putByIdDirectPrivate(stream, "internalWritable", @getInternalWritableStream(writable));
@putByIdDirectPrivate(stream, "readable", readable);
@putByIdDirectPrivate(stream, "backpressure", @undefined);
@putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined);
@transformStreamSetBackpressure(stream, true);
@putByIdDirectPrivate(stream, "controller", @undefined);
}
function transformStreamError(stream, e)
{
"use strict";
const readable = @getByIdDirectPrivate(stream, "readable");
const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
@readableStreamDefaultControllerError(readableController, e);
@transformStreamErrorWritableAndUnblockWrite(stream, e);
}
function transformStreamErrorWritableAndUnblockWrite(stream, e)
{
"use strict";
@transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller"));
const writable = @getByIdDirectPrivate(stream, "internalWritable");
@writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e);
if (@getByIdDirectPrivate(stream, "backpressure"))
@transformStreamSetBackpressure(stream, false);
}
function transformStreamSetBackpressure(stream, backpressure)
{
"use strict";
@assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure);
const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
if (backpressureChangePromise !== @undefined)
backpressureChangePromise.@resolve.@call();
@putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise));
@putByIdDirectPrivate(stream, "backpressure", backpressure);
}
function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm)
{
"use strict";
@assert(@isTransformStream(stream));
@assert(@getByIdDirectPrivate(stream, "controller") === @undefined);
@putByIdDirectPrivate(controller, "stream", stream);
@putByIdDirectPrivate(stream, "controller", controller);
@putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
@putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
}
function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict)
{
"use strict";
const controller = new @TransformStreamDefaultController();
let transformAlgorithm = (chunk) => {
try {
@transformStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
return @Promise.@reject(e);
}
return @Promise.@resolve();
};
let flushAlgorithm = () => { return @Promise.@resolve(); };
if ("transform" in transformerDict)
transformAlgorithm = (chunk) => {
return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
};
if ("flush" in transformerDict) {
flushAlgorithm = () => {
return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
};
}
@setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
}
function transformStreamDefaultControllerClearAlgorithms(controller)
{
"use strict";
// We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
@putByIdDirectPrivate(controller, "transformAlgorithm", true);
@putByIdDirectPrivate(controller, "flushAlgorithm", @undefined);
}
function transformStreamDefaultControllerEnqueue(controller, chunk)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "stream");
const readable = @getByIdDirectPrivate(stream, "readable");
const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
@assert(readableController !== @undefined);
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
@throwTypeError("TransformStream.readable cannot close or enqueue");
try {
@readableStreamDefaultControllerEnqueue(readableController, chunk);
} catch (e) {
@transformStreamErrorWritableAndUnblockWrite(stream, e);
throw @getByIdDirectPrivate(readable, "storedError");
}
const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController);
if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
@assert(backpressure);
@transformStreamSetBackpressure(stream, true);
}
}
function transformStreamDefaultControllerError(controller, e)
{
"use strict";
@transformStreamError(@getByIdDirectPrivate(controller, "stream"), e);
}
function transformStreamDefaultControllerPerformTransform(controller, chunk)
{
"use strict";
const promiseCapability = @newPromiseCapability(@Promise);
const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk);
transformPromise.@then(() => {
promiseCapability.@resolve();
}, (r) => {
@transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
promiseCapability.@reject.@call(@undefined, r);
});
return promiseCapability.@promise;
}
function transformStreamDefaultControllerTerminate(controller)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "stream");
const readable = @getByIdDirectPrivate(stream, "readable");
const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
// FIXME: Update readableStreamDefaultControllerClose to make this check.
if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
@readableStreamDefaultControllerClose(readableController);
const error = @makeTypeError("the stream has been terminated");
@transformStreamErrorWritableAndUnblockWrite(stream, error);
}
function transformStreamDefaultSinkWriteAlgorithm(stream, chunk)
{
"use strict";
const writable = @getByIdDirectPrivate(stream, "internalWritable");
@assert(@getByIdDirectPrivate(writable, "state") === "writable");
const controller = @getByIdDirectPrivate(stream, "controller");
if (@getByIdDirectPrivate(stream, "backpressure")) {
const promiseCapability = @newPromiseCapability(@Promise);
const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
@assert(backpressureChangePromise !== @undefined);
backpressureChangePromise.@promise.@then(() => {
const state = @getByIdDirectPrivate(writable, "state");
if (state === "erroring") {
promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError"));
return;
}
@assert(state === "writable");
@transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => {
promiseCapability.@resolve();
}, (e) => {
promiseCapability.@reject.@call(@undefined, e);
});
}, (e) => {
promiseCapability.@reject.@call(@undefined, e);
});
return promiseCapability.@promise;
}
return @transformStreamDefaultControllerPerformTransform(controller, chunk);
}
function transformStreamDefaultSinkAbortAlgorithm(stream, reason)
{
"use strict";
@transformStreamError(stream, reason);
return @Promise.@resolve();
}
function transformStreamDefaultSinkCloseAlgorithm(stream)
{
"use strict";
const readable = @getByIdDirectPrivate(stream, "readable");
const controller = @getByIdDirectPrivate(stream, "controller");
const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm");
@assert(flushAlgorithm !== @undefined);
const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call();
@transformStreamDefaultControllerClearAlgorithms(controller);
const promiseCapability = @newPromiseCapability(@Promise);
flushPromise.@then(() => {
if (@getByIdDirectPrivate(readable, "state") === @streamErrored) {
promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
return;
}
// FIXME: Update readableStreamDefaultControllerClose to make this check.
if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
@readableStreamDefaultControllerClose(readableController);
promiseCapability.@resolve();
}, (r) => {
@transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
});
return promiseCapability.@promise;
}
function transformStreamDefaultSourcePullAlgorithm(stream)
{
"use strict";
@assert(@getByIdDirectPrivate(stream, "backpressure"));
@assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined);
@transformStreamSetBackpressure(stream, false);
return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise;
}