blob: 3a32fe437d6e641f0a70d59a669ec598258f65fa [file] [log] [blame]
/*
* Copyright (C) 2015 Canon Inc. All rights reserved.
* Copyright (C) 2015 Igalia.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// @conditional=ENABLE(STREAMS_API)
// @internal
function readableStreamReaderGenericInitialize(reader, stream)
{
"use strict";
@putByIdDirectPrivate(reader, "ownerReadableStream", stream);
@putByIdDirectPrivate(stream, "reader", reader);
if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
@putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise));
else if (@getByIdDirectPrivate(stream, "state") === @streamClosed)
@putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() });
else {
@assert(@getByIdDirectPrivate(stream, "state") === @streamErrored);
@putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) });
}
}
function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
{
"use strict";
if (!@isReadableStream(stream))
@throwTypeError("ReadableStreamDefaultController needs a ReadableStream");
// readableStreamController is initialized with null value.
if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
@throwTypeError("ReadableStream already has a controller");
@putByIdDirectPrivate(this, "controlledReadableStream", stream);
@putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
@putByIdDirectPrivate(this, "queue", @newQueue());
@putByIdDirectPrivate(this, "started", false);
@putByIdDirectPrivate(this, "closeRequested", false);
@putByIdDirectPrivate(this, "pullAgain", false);
@putByIdDirectPrivate(this, "pulling", false);
@putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
const controller = this;
@promiseInvokeOrNoopNoCatch(underlyingSource, "start", [this]).@then(() => {
@putByIdDirectPrivate(controller, "started", true);
@assert(!@getByIdDirectPrivate(controller, "pulling"));
@assert(!@getByIdDirectPrivate(controller, "pullAgain"));
@readableStreamDefaultControllerCallPullIfNeeded(controller);
}, (error) => {
@readableStreamDefaultControllerError(controller, error);
});
@putByIdDirectPrivate(this, "cancel", @readableStreamDefaultControllerCancel);
@putByIdDirectPrivate(this, "pull", @readableStreamDefaultControllerPull);
return this;
}
function readableStreamDefaultControllerError(controller, error)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
return;
@putByIdDirectPrivate(controller, "queue", @newQueue());
@readableStreamError(stream, error);
}
function readableStreamPipeTo(stream, sink)
{
"use strict";
@assert(@isReadableStream(stream));
const reader = new @ReadableStreamDefaultReader(stream);
@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); });
function doPipe() {
@readableStreamDefaultReaderRead(reader).@then(function(result) {
if (result.done) {
sink.close();
return;
}
try {
sink.enqueue(result.value);
} catch (e) {
sink.error("ReadableStream chunk enqueueing in the sink failed");
return;
}
doPipe();
});
}
doPipe();
}
function readableStreamTee(stream, shouldClone)
{
"use strict";
@assert(@isReadableStream(stream));
@assert(typeof(shouldClone) === "boolean");
const reader = new @ReadableStreamDefaultReader(stream);
const teeState = {
closedOrErrored: false,
canceled1: false,
canceled2: false,
reason1: @undefined,
reason2: @undefined,
};
teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise);
const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone);
const branch1 = new @ReadableStream({
"pull": pullFunction,
"cancel": @readableStreamTeeBranch1CancelFunction(teeState, stream)
});
const branch2 = new @ReadableStream({
"pull": pullFunction,
"cancel": @readableStreamTeeBranch2CancelFunction(teeState, stream)
});
@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) {
if (teeState.closedOrErrored)
return;
@readableStreamDefaultControllerError(branch1.@readableStreamController, e);
@readableStreamDefaultControllerError(branch2.@readableStreamController, e);
teeState.closedOrErrored = true;
});
// Additional fields compared to the spec, as they are needed within pull/cancel functions.
teeState.branch1 = branch1;
teeState.branch2 = branch2;
return [branch1, branch2];
}
function doStructuredClone(object)
{
"use strict";
// FIXME: We should implement http://w3c.github.io/html/infrastructure.html#ref-for-structured-clone-4
// Implementation is currently limited to ArrayBuffer/ArrayBufferView to meet Fetch API needs.
if (object instanceof @ArrayBuffer)
return @structuredCloneArrayBuffer(object);
if (@ArrayBuffer.@isView(object))
return @structuredCloneArrayBufferView(object);
@throwTypeError("structuredClone not implemented for: " + object);
}
function readableStreamTeePullFunction(teeState, reader, shouldClone)
{
"use strict";
return function() {
@Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) {
@assert(@isObject(result));
@assert(typeof result.done === "boolean");
if (result.done && !teeState.closedOrErrored) {
if (!teeState.canceled1)
@readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController);
if (!teeState.canceled2)
@readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController);
teeState.closedOrErrored = true;
}
if (teeState.closedOrErrored)
return;
if (!teeState.canceled1)
@readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value);
if (!teeState.canceled2)
@readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @doStructuredClone(result.value) : result.value);
});
}
}
function readableStreamTeeBranch1CancelFunction(teeState, stream)
{
"use strict";
return function(r) {
teeState.canceled1 = true;
teeState.reason1 = r;
if (teeState.canceled2) {
@readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
teeState.cancelPromiseCapability.@resolve,
teeState.cancelPromiseCapability.@reject);
}
return teeState.cancelPromiseCapability.@promise;
}
}
function readableStreamTeeBranch2CancelFunction(teeState, stream)
{
"use strict";
return function(r) {
teeState.canceled2 = true;
teeState.reason2 = r;
if (teeState.canceled1) {
@readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
teeState.cancelPromiseCapability.@resolve,
teeState.cancelPromiseCapability.@reject);
}
return teeState.cancelPromiseCapability.@promise;
}
}
function isReadableStream(stream)
{
"use strict";
// Spec tells to return true only if stream has a readableStreamController internal slot.
// However, since it is a private slot, it cannot be checked using hasOwnProperty().
// Therefore, readableStreamController is initialized with null value.
return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined;
}
function isReadableStreamDefaultReader(reader)
{
"use strict";
// Spec tells to return true only if reader has a readRequests internal slot.
// However, since it is a private slot, it cannot be checked using hasOwnProperty().
// Since readRequests is initialized with an empty array, the following test is ok.
return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests");
}
function isReadableStreamDefaultController(controller)
{
"use strict";
// Spec tells to return true only if controller has an underlyingSource internal slot.
// However, since it is a private slot, it cannot be checked using hasOwnProperty().
// underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
// to an empty object. Therefore, following test is ok.
return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
}
function readableStreamError(stream, error)
{
"use strict";
@assert(@isReadableStream(stream));
@assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
@putByIdDirectPrivate(stream, "state", @streamErrored);
@putByIdDirectPrivate(stream, "storedError", error);
if (!@getByIdDirectPrivate(stream, "reader"))
return;
const reader = @getByIdDirectPrivate(stream, "reader");
if (@isReadableStreamDefaultReader(reader)) {
const requests = @getByIdDirectPrivate(reader, "readRequests");
for (let index = 0, length = requests.length; index < length; ++index)
requests[index].@reject.@call(@undefined, error);
@putByIdDirectPrivate(reader, "readRequests", []);
} else {
@assert(@isReadableStreamBYOBReader(reader));
const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
for (let index = 0, length = requests.length; index < length; ++index)
requests[index].@reject.@call(@undefined, error);
@putByIdDirectPrivate(reader, "readIntoRequests", []);
}
@getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
@putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
}
function readableStreamDefaultControllerCallPullIfNeeded(controller)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
return;
if (!@getByIdDirectPrivate(controller, "started"))
return;
if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return;
if (@getByIdDirectPrivate(controller, "pulling")) {
@putByIdDirectPrivate(controller, "pullAgain", true);
return;
}
@assert(!@getByIdDirectPrivate(controller, "pullAgain"));
@putByIdDirectPrivate(controller, "pulling", true);
@promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "pull", [controller]).@then(function() {
@putByIdDirectPrivate(controller, "pulling", false);
if (@getByIdDirectPrivate(controller, "pullAgain")) {
@putByIdDirectPrivate(controller, "pullAgain", false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
}
}, function(error) {
@readableStreamDefaultControllerError(controller, error);
});
}
function isReadableStreamLocked(stream)
{
"use strict";
@assert(@isReadableStream(stream));
return !!@getByIdDirectPrivate(stream, "reader");
}
function readableStreamDefaultControllerGetDesiredSize(controller)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamErrored)
return null;
if (state === @streamClosed)
return 0;
return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size;
}
function readableStreamReaderGenericCancel(reader, reason)
{
"use strict";
const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
@assert(!!stream);
return @readableStreamCancel(stream, reason);
}
function readableStreamCancel(stream, reason)
{
"use strict";
@putByIdDirectPrivate(stream, "disturbed", true);
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamClosed)
return @Promise.@resolve();
if (state === @streamErrored)
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
@readableStreamClose(stream);
return @getByIdDirectPrivate(stream, "readableStreamController").@cancel(@getByIdDirectPrivate(stream, "readableStreamController"), reason).@then(function() { });
}
function readableStreamDefaultControllerCancel(controller, reason)
{
"use strict";
@putByIdDirectPrivate(controller, "queue", @newQueue());
return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "cancel", [reason]);
}
function readableStreamDefaultControllerPull(controller)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
if (@getByIdDirectPrivate(controller, "queue").content.length) {
const chunk = @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
if (@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(controller, "queue").content.length === 0)
@readableStreamClose(stream);
else
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return @Promise.@resolve({value: chunk, done: false});
}
const pendingPromise = @readableStreamAddReadRequest(stream);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return pendingPromise;
}
function readableStreamDefaultControllerClose(controller)
{
"use strict";
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
@putByIdDirectPrivate(controller, "closeRequested", true);
if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
@readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
}
function readableStreamClose(stream)
{
"use strict";
@assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
@putByIdDirectPrivate(stream, "state", @streamClosed);
const reader = @getByIdDirectPrivate(stream, "reader");
if (!reader)
return;
if (@isReadableStreamDefaultReader(reader)) {
const requests = @getByIdDirectPrivate(reader, "readRequests");
for (let index = 0, length = requests.length; index < length; ++index)
requests[index].@resolve.@call(@undefined, {value:@undefined, done: true});
@putByIdDirectPrivate(reader, "readRequests", []);
}
@getByIdDirectPrivate(reader, "closedPromiseCapability").@resolve.@call();
}
function readableStreamFulfillReadRequest(stream, chunk, done)
{
"use strict";
@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@shift().@resolve.@call(@undefined, {value: chunk, done: done});
}
function readableStreamDefaultControllerEnqueue(controller, chunk)
{
"use strict";
const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) {
@readableStreamFulfillReadRequest(stream, chunk, false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return;
}
try {
let chunkSize = 1;
if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined)
chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk);
@enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
}
catch(error) {
@readableStreamDefaultControllerError(controller, error);
throw error;
}
@readableStreamDefaultControllerCallPullIfNeeded(controller);
}
function readableStreamDefaultReaderRead(reader)
{
"use strict";
const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
@assert(!!stream);
const state = @getByIdDirectPrivate(stream, "state");
@putByIdDirectPrivate(stream, "disturbed", true);
if (state === @streamClosed)
return @Promise.@resolve({value: @undefined, done: true});
if (state === @streamErrored)
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
@assert(state === @streamReadable);
return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController"));
}
function readableStreamAddReadRequest(stream)
{
"use strict";
@assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")));
@assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
const readRequest = @newPromiseCapability(@Promise);
@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@push(readRequest);
return readRequest.@promise;
}
function isReadableStreamDisturbed(stream)
{
"use strict";
@assert(@isReadableStream(stream));
return @getByIdDirectPrivate(stream, "disturbed");
}
function readableStreamReaderGenericRelease(reader)
{
"use strict";
@assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream"));
@assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable)
@getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state"));
else
@putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) });
@putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
@putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined);
@putByIdDirectPrivate(reader, "ownerReadableStream", @undefined);
}
function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
{
"use strict";
return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
}