blob: 13dbfdd1109464c5a588b2dba6b79220273adedf [file] [log] [blame]
/**
 * Push-style test source that emits random chunks from a repeating timer.
 *
 * While running, each timer tick increments `pushed` and then either hands a
 * 128-character chunk from `randomChunk` to `ondata`, or ends the source by
 * calling `onend`. `ondata` and `onend` are not defined here; the consumer is
 * expected to assign them on the instance before ticks fire.
 *
 * @param {number} toPush - Number of chunks to emit before ending. The end is
 *     only triggered when this is greater than zero.
 */
function RandomPushSource(toPush) {
  Object.assign(this, {
    pushed: 0,
    toPush: toPush,
    started: false,
    paused: false,
    closed: false,
    _intervalHandle: null,
  });
}

RandomPushSource.prototype = {
  /**
   * Starts the source, or resumes it after `readStop`. Does nothing once the
   * source is closed, or when it is already running.
   */
  readStart: function() {
    if (this.closed) {
      return;
    }

    // Delay between ticks, in milliseconds.
    const tickMs = 23;

    // One timer tick: emit a chunk, or finish once the quota is exceeded.
    const tick = () => {
      if (this.paused) {
        return;
      }

      this.pushed++;
      const quotaExceeded = this.toPush > 0 && this.pushed > this.toPush;
      if (!quotaExceeded) {
        this.ondata(randomChunk(128));
        return;
      }

      // Stop ticking before notifying the consumer that the source ended.
      if (this._intervalHandle) {
        clearInterval(this._intervalHandle);
        this._intervalHandle = undefined;
      }
      this.closed = true;
      this.onend();
    };

    if (!this.started) {
      this._intervalHandle = setInterval(tick, tickMs);
      this.started = true;
    }

    if (this.paused) {
      this._intervalHandle = setInterval(tick, tickMs);
      this.paused = false;
    }
  },

  /**
   * Pauses a started source by cancelling its timer. Does nothing when the
   * source is already paused.
   *
   * @throws {Error} If the source was never started.
   */
  readStop: function() {
    if (this.paused) {
      return;
    }

    if (!this.started) {
      throw new Error('Can\'t pause reading an unstarted source.');
    }

    this.paused = true;
    clearInterval(this._intervalHandle);
    this._intervalHandle = undefined;
  },
};
/**
 * Builds a string of random printable ASCII characters.
 *
 * @param {number} size - Number of characters to generate; values of zero or
 *     less produce an empty string.
 * @returns {string} A string of `size` characters, each drawn uniformly from
 *     the printable ASCII range, space (32) through tilde (126).
 */
function randomChunk(size) {
  const firstPrintable = 32; // ' '
  const printableCount = 95; // 32..126 inclusive
  let chunk = '';
  for (let i = 0; i < size; i++) {
    // Math.floor over [0, 95) makes every character equally likely;
    // Math.round would halve the odds of the first and last code.
    const code = Math.floor(Math.random() * printableCount) + firstPrintable;
    chunk += String.fromCharCode(code);
  }
  return chunk;
}
/**
 * Drains a stream reader and collects every chunk it yields.
 *
 * @param {Object} readable - Stream to read from; only used (via
 *     `getReader()`) when no reader is supplied.
 * @param {Object} [reader] - Reader to use instead of acquiring one from
 *     `readable`. A null or undefined value means "acquire one".
 * @returns {Promise<Array>} Promise for the chunk values, in the order they
 *     were read, settled once the reader reports `done`.
 */
function readableStreamToArray(readable, reader) {
  const collected = [];
  const activeReader = reader == undefined ? readable.getReader() : reader;

  // Read one result; either finish with the collected chunks or keep going.
  const readNext = () => activeReader.read().then((result) => {
    const { value, done } = result;
    if (!done) {
      collected.push(value);
      return readNext();
    }
    return collected;
  });

  return readNext();
}
/**
 * Pull-style test source that yields the integers 1..limit in order.
 *
 * Every operation reports back through a callback. The callback runs
 * immediately, or via `setTimeout(..., 0)` when `async` is truthy.
 *
 * @param {number} limit - Last value to yield before reporting completion.
 * @param {*} async - Truthy to defer every callback with a zero-delay timer.
 */
function SequentialPullSource(limit, async) {
  this.current = 0;
  this.limit = limit;
  this.opened = false;
  this.closed = false;

  // Choose how queued work is run: deferred through a timer, or inline.
  if (async) {
    this._exec = function(task) {
      setTimeout(task, 0);
    };
  } else {
    this._exec = function(task) {
      task();
    };
  }
}

SequentialPullSource.prototype = {
  /**
   * Marks the source as opened, then calls `cb` with no arguments.
   */
  open: function(cb) {
    this._exec(() => {
      this.opened = true;
      cb();
    });
  },

  /**
   * Advances the counter and reports the outcome as `cb(err, done, chunk)`:
   * `(null, false, value)` while values remain, `(null, true, null)` after.
   */
  read: function(cb) {
    this._exec(() => {
      const hasValue = ++this.current <= this.limit;
      if (hasValue) {
        cb(null, false, this.current);
      } else {
        cb(null, true, null);
      }
    });
  },

  /**
   * Marks the source as closed, then calls `cb` with no arguments.
   */
  close: function(cb) {
    this._exec(() => {
      this.closed = true;
      cb();
    });
  },
};
/**
 * Creates a ReadableStream backed by a SequentialPullSource.
 *
 * @param {number} limit - Passed to SequentialPullSource as its limit.
 * @param {*} options - Forwarded unchanged as the source's second (`async`)
 *     argument, so any truthy value, including an object, selects
 *     asynchronous callbacks.
 * @returns {ReadableStream} The stream, with the backing source exposed as
 *     `stream.source`.
 */
function sequentialReadableStream(limit, options) {
  const sequentialSource = new SequentialPullSource(limit, options);

  const stream = new ReadableStream({
    // Opens the source; the returned promise settles when the open callback runs.
    start: function() {
      return new Promise(function(resolve, reject) {
        sequentialSource.open(function(err) {
          if (err) {
            reject(err);
            return;
          }
          resolve();
        });
      });
    },

    // NOTE(review): this uses a (enqueue, finish, error) callback signature
    // rather than a single controller argument; confirm it matches the
    // ReadableStream implementation under test.
    pull: function(enqueue, finish, error) {
      sequentialSource.read(function(err, done, chunk) {
        if (err) {
          error(err);
          return;
        }
        if (!done) {
          enqueue(chunk);
          return;
        }
        sequentialSource.close(function(closeErr) {
          if (closeErr) {
            // A failed close must not also be reported as a normal finish.
            error(closeErr);
            return;
          }
          finish();
        });
      });
    },
  });

  stream.source = sequentialSource;
  return stream;
}