blob: ba6acc3e052422626d6882920616c378982dc19b [file] [log] [blame]
'use strict';
class MeasurementSet {
_platformId;
_metricId;
_lastModified;
_sortedClusters = [];
_primaryClusterEndTime = null;
_clusterCount = null;
_clusterStart = null;
_clusterSize = null;
_allFetches = new Map;
_callbackMap = new Map;
_primaryClusterPromise = null;
_segmentationCache = new Map;
constructor(platformId, metricId, lastModified)
{
this._platformId = platformId;
this._metricId = metricId;
this._lastModified = +lastModified;
}
platformId() { return this._platformId; }
metricId() { return this._metricId; }
static findSet(platformId, metricId, lastModified)
{
if (!this._setMap)
this._setMap = new Map;
const key = `${platformId}-${metricId}`;
let measurementSet = this._setMap.get(key);
if (!measurementSet) {
measurementSet = new MeasurementSet(platformId, metricId, lastModified);
this._setMap.set(key, measurementSet);
}
return measurementSet;
}
findClusters(startTime, endTime)
{
const clusterStart = this._clusterStart;
const clusterSize = this._clusterSize;
const clusters = [];
let clusterEnd = clusterStart + Math.floor(Math.max(0, startTime - clusterStart) / clusterSize) * clusterSize;
const lastClusterEndTime = this._primaryClusterEndTime;
const firstClusterEndTime = lastClusterEndTime - clusterSize * (this._clusterCount - 1);
do {
clusterEnd += clusterSize;
if (firstClusterEndTime <= clusterEnd && clusterEnd <= this._primaryClusterEndTime)
clusters.push(clusterEnd);
} while (clusterEnd < endTime);
return clusters;
}
async fetchBetween(startTime, endTime, callback, noCache)
{
if (noCache) {
this._primaryClusterPromise = null;
this._allFetches = new Map;
}
if (!this._primaryClusterPromise)
this._primaryClusterPromise = this._fetchPrimaryCluster(noCache);
if (callback)
this._primaryClusterPromise.catch(callback);
await this._primaryClusterPromise;
this._allFetches.set(this._primaryClusterEndTime, this._primaryClusterPromise);
return Promise.all(this.findClusters(startTime, endTime).map((clusterEndTime) => this._ensureClusterPromise(clusterEndTime, callback)));
}
_ensureClusterPromise(clusterEndTime, callback)
{
let callbackSet = this._callbackMap.get(clusterEndTime);
if (!callbackSet) {
callbackSet = new Set;
this._callbackMap.set(clusterEndTime, callbackSet);
}
if (callback)
callbackSet.add(callback);
let promise = this._allFetches.get(clusterEndTime);
if (promise) {
if (callback)
promise.then(callback, callback);
return promise;
}
promise = this._fetchSecondaryCluster(clusterEndTime);
for (const existingCallback of callbackSet)
promise.then(existingCallback, existingCallback);
this._allFetches.set(clusterEndTime, promise);
return promise;
}
_urlForCache(clusterEndTime = null /* primary */)
{
const suffix = clusterEndTime ? '-' + +clusterEndTime : '';
return `/data/measurement-set-${this._platformId}-${this._metricId}${suffix}.json`;
}
async _fetchPrimaryCluster(noCache = false)
{
let data = null;
if (!noCache) {
try {
data = await RemoteAPI.getJSONWithStatus(this._urlForCache());
if (+data['lastModified'] < this._lastModified)
data = null;
} catch (error) {
if (error != 404)
throw error;
}
}
if (!data) // Request the latest JSON if noCache was true or the cached JSON was outdated.
data = await RemoteAPI.getJSONWithStatus(`/api/measurement-set?platform=${this._platformId}&metric=${this._metricId}`);
this._primaryClusterEndTime = data['endTime'];
this._clusterCount = data['clusterCount'];
this._clusterStart = data['clusterStart'];
this._clusterSize = data['clusterSize'];
this._addFetchedCluster(new MeasurementCluster(data));
}
async _fetchSecondaryCluster(endTime)
{
console.assert(endTime);
console.assert(this._primaryClusterEndTime);
const data = await RemoteAPI.getJSONWithStatus(this._urlForCache(endTime));
this._addFetchedCluster(new MeasurementCluster(data));
}
_addFetchedCluster(cluster)
{
for (let clusterIndex = 0; clusterIndex < this._sortedClusters.length; clusterIndex++) {
const startTime = this._sortedClusters[clusterIndex].startTime();
if (cluster.startTime() <= startTime) {
this._sortedClusters.splice(clusterIndex, startTime == cluster.startTime() ? 1 : 0, cluster);
return;
}
}
this._sortedClusters.push(cluster);
}
hasFetchedRange(startTime, endTime)
{
console.assert(startTime < endTime);
let foundStart = false;
let previousEndTime = null;
endTime = Math.min(endTime, this._primaryClusterEndTime);
for (const cluster of this._sortedClusters) {
const containsStart = cluster.startTime() <= startTime && startTime <= cluster.endTime();
const containsEnd = cluster.startTime() <= endTime && endTime <= cluster.endTime();
const preceedingClusterIsMissing = previousEndTime !== null && previousEndTime != cluster.startTime();
if (containsStart && containsEnd)
return true;
if (containsStart)
foundStart = true;
if (foundStart && preceedingClusterIsMissing)
return false;
if (containsEnd)
return foundStart; // Return true iff there were not missing clusters from the one that contains startTime
previousEndTime = cluster.endTime();
}
return false; // Didn't find a cluster that contains startTime or endTime
}
fetchedTimeSeries(configType, includeOutliers, extendToFuture)
{
Instrumentation.startMeasuringTime('MeasurementSet', 'fetchedTimeSeries');
// FIXME: Properly construct TimeSeries.
const series = new TimeSeries();
const idMap = {};
for (const cluster of this._sortedClusters)
cluster.addToSeries(series, configType, includeOutliers, idMap);
if (extendToFuture)
series.extendToFuture();
Instrumentation.endMeasuringTime('MeasurementSet', 'fetchedTimeSeries');
return series;
}
async fetchSegmentation(segmentationName, parameters, configType, includeOutliers, extendToFuture)
{
let cacheMap = this._segmentationCache.get(configType);
if (!cacheMap) {
cacheMap = new WeakMap;
this._segmentationCache.set(configType, cacheMap);
}
const timeSeries = new TimeSeries;
const idMap = {};
const promises = [];
for (const cluster of this._sortedClusters) {
const clusterStart = timeSeries.length();
cluster.addToSeries(timeSeries, configType, includeOutliers, idMap);
const clusterEnd = timeSeries.length();
promises.push(this._cachedClusterSegmentation(segmentationName, parameters, cacheMap,
cluster, timeSeries, clusterStart, clusterEnd, idMap));
}
if (!timeSeries.length())
return Promise.resolve(null);
const clusterSegmentations = await Promise.all(promises);
const segmentationSeries = [];
const addSegmentMergingIdenticalSegments = (startingPoint, endingPoint) => {
const value = Statistics.mean(timeSeries.valuesBetweenRange(startingPoint.seriesIndex, endingPoint.seriesIndex));
if (!segmentationSeries.length || value !== segmentationSeries[segmentationSeries.length - 1].value) {
segmentationSeries.push({value: value, time: startingPoint.time, seriesIndex: startingPoint.seriesIndex, interval: function () { return null; }});
segmentationSeries.push({value: value, time: endingPoint.time, seriesIndex: endingPoint.seriesIndex, interval: function () { return null; }});
} else
segmentationSeries[segmentationSeries.length - 1].seriesIndex = endingPoint.seriesIndex;
};
let startingIndex = 0;
for (const segmentation of clusterSegmentations) {
for (const endingIndex of segmentation) {
addSegmentMergingIdenticalSegments(timeSeries.findPointByIndex(startingIndex), timeSeries.findPointByIndex(endingIndex));
startingIndex = endingIndex;
}
}
if (extendToFuture)
timeSeries.extendToFuture();
addSegmentMergingIdenticalSegments(timeSeries.findPointByIndex(startingIndex), timeSeries.lastPoint());
return segmentationSeries;
}
async _cachedClusterSegmentation(segmentationName, parameters, cacheMap, cluster, timeSeries, clusterStart, clusterEnd, idMap)
{
const cache = cacheMap.get(cluster);
if (cache && this._validateSegmentationCache(cache, segmentationName, parameters)) {
const segmentationByIndex = new Array(cache.segmentation.length);
for (let i = 0; i < cache.segmentation.length; i++) {
const id = cache.segmentation[i];
if (!(id in idMap))
return null;
segmentationByIndex[i] = idMap[id];
}
return Promise.resolve(segmentationByIndex);
}
const clusterValues = timeSeries.valuesBetweenRange(clusterStart, clusterEnd);
const segmentationInClusterIndex = await this._invokeSegmentationAlgorithm(segmentationName, parameters, clusterValues);
// Remove cluster start/end as segmentation points. Otherwise each cluster will be placed into its own segment.
const segmentation = segmentationInClusterIndex.slice(1, -1).map((index) => clusterStart + index);
cacheMap.set(cluster, {segmentationName,
segmentationParameters: parameters.slice(),
segmentation: segmentation.map((index) => timeSeries.findPointByIndex(index).id)});
return segmentation;
}
_validateSegmentationCache(cache, segmentationName, parameters)
{
if (cache.segmentationName != segmentationName)
return false;
if (!!cache.segmentationParameters != !!parameters)
return false;
if (parameters) {
if (parameters.length != cache.segmentationParameters.length)
return false;
for (var i = 0; i < parameters.length; i++) {
if (parameters[i] != cache.segmentationParameters[i])
return false;
}
}
return true;
}
async _invokeSegmentationAlgorithm(segmentationName, parameters, timeSeriesValues)
{
const args = [timeSeriesValues].concat(parameters || []);
const timeSeriesIsShortEnoughForSyncComputation = timeSeriesValues.length < 100;
if (timeSeriesIsShortEnoughForSyncComputation || !AsyncTask.isAvailable()) {
Instrumentation.startMeasuringTime('_invokeSegmentationAlgorithm', 'syncSegmentation');
const segmentation = Statistics[segmentationName].apply(timeSeriesValues, args);
Instrumentation.endMeasuringTime('_invokeSegmentationAlgorithm', 'syncSegmentation');
return Promise.resolve(segmentation);
}
const task = new AsyncTask(segmentationName, args);
const response = await task.execute();
Instrumentation.reportMeasurement('_invokeSegmentationAlgorithm', 'workerStartLatency', 'ms', response.startLatency);
Instrumentation.reportMeasurement('_invokeSegmentationAlgorithm', 'workerTime', 'ms', response.workerTime);
Instrumentation.reportMeasurement('_invokeSegmentationAlgorithm', 'totalTime', 'ms', response.totalTime);
return response.result;
}
}
if (typeof module != 'undefined')
module.exports.MeasurementSet = MeasurementSet;