blob: 1ca4718c6d9a825fb58adf82243087fd81f8faab [file] [log] [blame]
/*
* Copyright (C) 2015 Canon Inc.
* Copyright (C) 2015 Igalia
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// @conditional=ENABLE(STREAMS_API)
// @internal
function isWritableStream(stream)
{
"use strict";
return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink");
}
function syncWritableStreamStateWithQueue(stream)
{
"use strict";
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamClosing)
return;
@assert(state === @streamWritable || state === @streamWaiting);
const shouldApplyBackpressure = @getByIdDirectPrivate(stream, "queue").size > @getByIdDirectPrivate(stream, "strategy").highWaterMark;
if (shouldApplyBackpressure && state === @streamWritable) {
@putByIdDirectPrivate(stream, "state", @streamWaiting);
@putByIdDirectPrivate(stream, "readyPromiseCapability", @newPromiseCapability(@Promise));
}
if (!shouldApplyBackpressure && state === @streamWaiting) {
@putByIdDirectPrivate(stream, "state", @streamWritable);
@getByIdDirectPrivate(stream, "readyPromiseCapability").@resolve.@call();
}
}
function errorWritableStream(stream, e)
{
"use strict";
const state = @getByIdDirectPrivate(stream, "state");
if (state === @streamClosed || state === @streamErrored)
return;
while (@getByIdDirectPrivate(stream, "queue").content.length > 0) {
const writeRecord = @dequeueValue(@getByIdDirectPrivate(stream, "queue"));
if (writeRecord !== "close")
writeRecord.promiseCapability.@reject.@call(@undefined, e);
}
@putByIdDirectPrivate(stream, "storedError", e);
if (state === @streamWaiting)
@getByIdDirectPrivate(stream, "readyPromiseCapability").@resolve.@call();
@getByIdDirectPrivate(stream, "closedPromiseCapability").@reject.@call(@undefined, e);
@putByIdDirectPrivate(stream, "state", @streamErrored);
}
function callOrScheduleWritableStreamAdvanceQueue(stream)
{
"use strict";
if (!@getByIdDirectPrivate(stream, "started"))
@getByIdDirectPrivate(stream, "startedPromise").@then(function() { @writableStreamAdvanceQueue(stream); });
else
@writableStreamAdvanceQueue(stream);
}
function writableStreamAdvanceQueue(stream)
{
"use strict";
if (@getByIdDirectPrivate(stream, "queue").content.length === 0 || @getByIdDirectPrivate(stream, "writing"))
return;
const writeRecord = @peekQueueValue(@getByIdDirectPrivate(stream, "queue"));
if (writeRecord === "close") {
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@dequeueValue(@getByIdDirectPrivate(stream, "queue"));
@assert(@getByIdDirectPrivate(stream, "queue").content.length === 0);
@closeWritableStream(stream);
return;
}
@putByIdDirectPrivate(stream, "writing", true);
@promiseInvokeOrNoop(@getByIdDirectPrivate(stream, "underlyingSink"), "write", [writeRecord.chunk]).@then(
function() {
if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
return;
@putByIdDirectPrivate(stream, "writing", false);
writeRecord.promiseCapability.@resolve.@call();
@dequeueValue(@getByIdDirectPrivate(stream, "queue"));
@syncWritableStreamStateWithQueue(stream);
@writableStreamAdvanceQueue(stream);
},
function(r) {
@errorWritableStream(stream, r);
}
);
}
function closeWritableStream(stream)
{
"use strict";
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@promiseInvokeOrNoop(@getByIdDirectPrivate(stream, "underlyingSink"), "close").@then(
function() {
if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
return;
@assert(@getByIdDirectPrivate(stream, "state") === @streamClosing);
@getByIdDirectPrivate(stream, "closedPromiseCapability").@resolve.@call();
@putByIdDirectPrivate(stream, "state", @streamClosed);
},
function(r) {
@errorWritableStream(stream, r);
}
);
}