blob: a77027a1e3b5e524ecf86296b2e5c3d378ff5263 [file] [log] [blame]
/*
* Copyright (C) 2015 Canon Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// @optional=STREAMS_API
// @internals
function privateInitializeReadableStreamReader(stream)
{
"use strict";
if (!@isReadableStream(stream))
throw new @TypeError("ReadableStreamReader needs a ReadableStream");
if (@isReadableStreamLocked(stream))
throw new @TypeError("ReadableStream is locked");
this.@state = stream.@state;
this.@readRequests = [];
if (stream.@state === @readableStreamReadable) {
this.@ownerReadableStream = stream;
this.@storedError = undefined;
stream.@reader = this;
this.@closedPromise = new Promise(function(resolve, reject) {
stream.@reader.@closedPromiseResolve = resolve;
stream.@reader.@closedPromiseReject = reject;
});
return this;
}
if (stream.@state === @readableStreamClosed) {
this.@ownerReadableStream = null;
this.@storedError = undefined;
this.@closedPromise = Promise.resolve();
return this;
}
// TODO: ASSERT(stream.@state === @readableStreamErrored);
this.@ownerReadableStream = null;
this.@storedError = stream.@storedError;
this.@closedPromise = Promise.reject(stream.@storedError);
return this;
}
function privateInitializeReadableStreamController(stream)
{
"use strict";
if (!@isReadableStream(stream))
throw new @TypeError("ReadableStreamController needs a ReadableStream");
if (typeof stream.@controller !== "undefined")
throw new @TypeError("ReadableStream already has a controller");
this.@controlledReadableStream = stream;
return this;
}
function teeReadableStream(stream, shouldClone)
{
"use strict";
// TODO: Assert: IsReadableStream(stream) is true.
// TODO: Assert: Type(shouldClone) is Boolean.
let reader = stream.getReader();
let teeState = {
closedOrErrored: false,
canceled1: false,
canceled2: false,
reason1: undefined,
reason: undefined,
};
teeState.cancelPromise = new @InternalPromise(function(resolve, reject) {
teeState.resolvePromise = resolve;
teeState.rejectPromise = reject;
});
let pullFunction = @teeReadableStreamPullFunction(teeState, reader, shouldClone);
let underlyingSource1 = {
"pull": pullFunction,
"cancel": @teeReadableStreamBranch1CancelFunction(teeState, stream)
};
let underlyingSource2 = {
"pull": pullFunction,
"cancel": @teeReadableStreamBranch2CancelFunction(teeState, stream)
};
let branch1 = new ReadableStream(underlyingSource1);
let branch2 = new ReadableStream(underlyingSource2);
reader.closed.catch(function(e) {
if (teeState.closedOrErrored)
return;
@errorReadableStream(branch1, e);
@errorReadableStream(branch2, e);
teeState.closedOrErrored = true;
});
// Additional fields compared to the spec, as they are needed within pull/cancel functions.
teeState.branch1 = branch1;
teeState.branch2 = branch2;
return [branch1, branch2];
}
function teeReadableStreamPullFunction(teeState, reader, shouldClone)
{
return function() {
reader.read().then(function(result) {
if (result.done && !teeState.closedOrErrored) {
@closeReadableStream(teeState.branch1);
@closeReadableStream(teeState.branch2);
teeState.closedOrErrored = true;
}
if (teeState.closedOrErrored)
return;
if (!teeState.canceled1) {
// TODO: Implement cloning if shouldClone is true
@enqueueInReadableStream(teeState.branch1, result.value);
}
if (!teeState.canceled2) {
// TODO: Implement cloning if shouldClone is true
@enqueueInReadableStream(teeState.branch2, result.value);
}
});
}
}
function teeReadableStreamBranch1CancelFunction(teeState, stream)
{
return function(r) {
teeState.canceled1 = true;
teeState.reason1 = r;
if (teeState.canceled2) {
@cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise);
}
return teeState.cancelPromise;
}
}
function teeReadableStreamBranch2CancelFunction(teeState, stream)
{
return function(r) {
teeState.canceled2 = true;
teeState.reason2 = r;
if (teeState.canceled1) {
@cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.resolvePromise, teeState.rejectPromise);
}
return teeState.cancelPromise;
}
}
function isReadableStream(stream)
{
"use strict";
return @isObject(stream) && !!stream.@underlyingSource;
}
function isReadableStreamReader(reader)
{
"use strict";
return @isObject(reader) && typeof reader.@ownerReadableStream !== "undefined";
}
function isReadableStreamController(controller)
{
"use strict";
return @isObject(controller) && !!controller.@controlledReadableStream;
}
function errorReadableStream(stream, error)
{
"use strict";
// TODO: ASSERT(stream.@state === @readableStreamReadable);
stream.@queue = [];
stream.@storedError = error;
stream.@state = @readableStreamErrored;
if (!stream.@reader)
return;
var reader = stream.@reader;
var requests = reader.@readRequests;
for (var index = 0, length = requests.length; index < length; ++index)
requests[index].reject(error);
reader.@readRequests = [];
@releaseReadableStreamReader(reader);
reader.@storedError = error;
reader.@state = @readableStreamErrored;
reader.@closedPromiseReject(error);
}
function requestReadableStreamPull(stream)
{
"use strict";
if (stream.@state !== @readableStreamReadable)
return;
if (stream.@closeRequested)
return;
if (!stream.@started)
return;
if ((!@isReadableStreamLocked(stream) || !stream.@reader.@readRequests.length) && @getReadableStreamDesiredSize(stream) <= 0)
return;
if (stream.@pulling) {
stream.@pullAgain = true;
return;
}
stream.@pulling = true;
var promise = @promiseInvokeOrNoop(stream.@underlyingSource, "pull", [stream.@controller]);
promise.then(function() {
stream.@pulling = false;
if (stream.@pullAgain) {
stream.@pullAgain = false;
@requestReadableStreamPull(stream);
}
}, function(error) {
@errorReadableStream(stream, error);
});
}
function isReadableStreamLocked(stream)
{
"use strict";
return !!stream.@reader;
}
function getReadableStreamDesiredSize(stream)
{
"use strict";
return stream.@highWaterMark - stream.@queueSize;
}
function releaseReadableStreamReader(reader)
{
"use strict";
reader.@ownerReadableStream.@reader = undefined;
reader.@ownerReadableStream = null;
}
function cancelReadableStream(stream, reason)
{
"use strict";
if (stream.@state === @readableStreamClosed)
return Promise.resolve();
if (stream.@state === @readableStreamErrored)
return Promise.reject(stream.@storedError);
stream.@queue = [];
@finishClosingReadableStream(stream);
return @promiseInvokeOrNoop(stream.@underlyingSource, "cancel", [reason]).then(function() { });
}
function finishClosingReadableStream(stream)
{
"use strict";
// TODO: ASSERT(stream.@state === @readableStreamReadable);
stream.@state = @readableStreamClosed;
var reader = stream.@reader;
if (reader)
@closeReadableStreamReader(reader);
}
function closeReadableStream(stream)
{
"use strict";
// TODO. ASSERT(!stream.@closeRequested);
// TODO: ASSERT(stream.@state !== @readableStreamErrored);
if (stream.@state === @readableStreamClosed)
return;
stream.@closeRequested = true;
if (!stream.@queue.length)
@finishClosingReadableStream(stream);
}
function closeReadableStreamReader(reader)
{
"use strict";
var requests = reader.@readRequests;
for (var index = 0, length = requests.length; index < length; ++index)
requests[index].resolve({value:undefined, done: true});
reader.@readRequests = [];
@releaseReadableStreamReader(reader);
reader.@state = @readableStreamClosed;
reader.@closedPromiseResolve();
}
function enqueueInReadableStream(stream, chunk)
{
"use strict";
// TODO: ASSERT(!stream.@closeRequested);
// TODO: ASSERT(stream.@state !== @readableStreamErrored);
if (stream.@state === @readableStreamClosed)
return undefined;
if (@isReadableStreamLocked(stream) && stream.@reader.@readRequests.length) {
stream.@reader.@readRequests.shift().resolve({value: chunk, done: false});
@requestReadableStreamPull(stream);
return;
}
try {
var size = 1;
if (stream.@strategySize) {
size = Number(stream.@strategySize(chunk));
if (Number.isNaN(size) || size === +Infinity || size < 0)
throw new RangeError("Chunk size is not valid");
}
stream.@queue.push({ value: chunk, size: size });
stream.@queueSize += size;
}
catch(error) {
@errorReadableStream(stream, error);
throw error;
}
@requestReadableStreamPull(stream);
}
function readFromReadableStreamReader(reader)
{
"use strict";
if (reader.@state === @readableStreamClosed)
return Promise.resolve({value: undefined, done: true});
if (reader.@state === @readableStreamErrored)
return Promise.reject(reader.@storedError);
// TODO: ASSERT(!!reader.@ownerReadableStream);
// TODO: ASSERT(reader.@ownerReadableStream.@state === @readableStreamReadable);
var stream = reader.@ownerReadableStream;
if (stream.@queue.length) {
var chunk = stream.@queue.shift();
stream.@queueSize -= chunk.size;
if (!stream.@closeRequested)
@requestReadableStreamPull(stream);
else if (!stream.@queue.length)
@finishClosingReadableStream(stream);
return Promise.resolve({value: chunk.value, done: false});
}
var readRequest = {};
var readPromise = new Promise(function(resolve, reject) {
readRequest.resolve = resolve;
readRequest.reject = reject;
});
reader.@readRequests.push(readRequest);
@requestReadableStreamPull(stream);
return readPromise;
}
function invokeOrNoop(object, key, args)
{
"use strict";
var method = object[key];
if (typeof method === "undefined")
return;
return method.@apply(object, args);
}
function promiseInvokeOrNoop(object, key, args)
{
"use strict";
try {
var method = object[key];
if (typeof method === "undefined")
return Promise.resolve();
var result = method.@apply(object, args);
return Promise.resolve(result);
}
catch(error) {
return Promise.reject(error);
}
}