|
var _util = require("zrender/lib/core/util");
var each = _util.each;
var isFunction = _util.isFunction;
var createHashMap = _util.createHashMap;
var noop = _util.noop;
var _task = require("./task");
var createTask = _task.createTask;
var _component = require("../util/component");
var getUID = _component.getUID;
var GlobalModel = require("../model/Global");
var ExtensionAPI = require("../ExtensionAPI");
var _model = require("../util/model");
var normalizeToArray = _model.normalizeToArray;
/**
* @module echarts/stream/Scheduler
*/
/**
* @constructor
*/
function Scheduler(ecInstance, api, dataProcessorHandlers, visualHandlers) {
// this._pipelineMap = createHashMap();
this.ecInstance = ecInstance;
this.api = api;
this.unfinished; // Fix current processors in case that in some rear cases that
// processors might be registered after echarts instance created.
// Register processors incrementally for a echarts instance is
// not supported by this stream architecture.
this._dataProcessorHandlers = dataProcessorHandlers.slice();
this._visualHandlers = visualHandlers.slice();
/**
* @private
* @type {
* [handlerUID: string]: {
* seriesTaskMap?: {
* [seriesUID: string]: Task
* },
* overallTask?: Task
* }
* }
*/
this._stageTaskMap = createHashMap();
}
var proto = Scheduler.prototype; // If seriesModel provided, incremental threshold is check by series data.
proto.getPerformArgs = function (task, isBlock) {
// For overall task
if (!task.__pipeline) {
return;
}
var pipeline = this._pipelineMap.get(task.__pipeline.id);
var pCtx = pipeline.context;
var incremental = !isBlock && pipeline.progressiveEnabled && (!pCtx || pCtx.canProgressiveRender) && task.__idxInPipeline > pipeline.bockIndex;
return {
step: incremental ? pipeline.step : null
};
};
proto.getPipeline = function (pipelineId) {
return this._pipelineMap.get(pipelineId);
};
/**
* Current, progressive rendering starts from visual and layout.
* Always detect render mode in the same stage, avoiding that incorrect
* detection caused by data filtering.
* Caution:
* `updateStreamModes` use `seriesModel.getData()`.
*/
proto.updateStreamModes = function (seriesModel, view) {
var pipeline = this._pipelineMap.get(seriesModel.uid);
var data = seriesModel.getData();
var dataLen = data.count(); // `canProgressiveRender` means that can render progressively in each
// animation frame. Note that some types of series do not provide
// `view.incrementalPrepareRender` but support `chart.appendData`. We
// use the term `incremental` but not `progressive` to describe the
// case that `chart.appendData`.
var canProgressiveRender = pipeline.progressiveEnabled && view.incrementalPrepareRender && dataLen >= pipeline.threshold;
var large = seriesModel.get('large') && dataLen >= seriesModel.get('largeThreshold');
seriesModel.pipelineContext = pipeline.context = {
canProgressiveRender: canProgressiveRender,
large: large
};
};
proto.restorePipelines = function (ecModel) {
var scheduler = this;
var pipelineMap = scheduler._pipelineMap = createHashMap();
ecModel.eachSeries(function (seriesModel) {
var progressive = seriesModel.getProgressive();
var pipelineId = seriesModel.uid;
pipelineMap.set(pipelineId, {
id: pipelineId,
head: null,
tail: null,
threshold: seriesModel.getProgressiveThreshold(),
progressiveEnabled: progressive && !(seriesModel.preventIncremental && seriesModel.preventIncremental()),
bockIndex: -1,
step: progressive || 700,
// ??? Temporarily number
count: 0
});
pipe(scheduler, seriesModel, seriesModel.dataTask);
});
};
proto.prepareStageTasks = function () {
var stageTaskMap = this._stageTaskMap;
var ecModel = this.ecInstance.getModel();
var api = this.api;
each([this._dataProcessorHandlers, this._visualHandlers], function (stageHandlers) {
each(stageHandlers, function (handler) {
var record = stageTaskMap.get(handler.uid) || stageTaskMap.set(handler.uid, []);
handler.reset && createSeriesStageTask(this, handler, record, ecModel, api);
handler.overallReset && createOverallStageTask(this, handler, record, ecModel, api);
}, this);
}, this);
};
proto.prepareView = function (view, model, ecModel, api) {
var renderTask = view.renderTask;
var context = renderTask.context;
context.model = model;
context.ecModel = ecModel;
context.api = api;
renderTask.__block = !view.incrementalPrepareRender;
pipe(this, model, renderTask);
};
proto.performDataProcessorTasks = function (ecModel, payload) {
// If we do not use `block` here, it should be considered when to update modes.
performStageTasks(this, this._dataProcessorHandlers, ecModel, payload, {
block: true
});
}; // opt
// opt.visualType: 'visual' or 'layout'
// opt.setDirty
proto.performVisualTasks = function (ecModel, payload, opt) {
performStageTasks(this, this._visualHandlers, ecModel, payload, opt);
};
function performStageTasks(scheduler, stageHandlers, ecModel, payload, opt) {
opt = opt || {};
var unfinished;
each(stageHandlers, function (stageHandler, idx) {
if (opt.visualType && opt.visualType !== stageHandler.visualType) {
return;
}
var stageHandlerRecord = scheduler._stageTaskMap.get(stageHandler.uid);
var seriesTaskMap = stageHandlerRecord.seriesTaskMap;
var overallTask = stageHandlerRecord.overallTask;
if (overallTask) {
var overallNeedDirty;
var agentStubMap = overallTask.agentStubMap;
agentStubMap.each(function (stub) {
if (needSetDirty(opt, stub)) {
stub.dirty();
overallNeedDirty = true;
}
});
overallNeedDirty && overallTask.dirty();
updatePayload(overallTask, payload);
var performArgs = scheduler.getPerformArgs(overallTask, opt.block); // Execute stubs firstly, which may set the overall task dirty,
// then execute the overall task. And stub will call seriesModel.setData,
// which ensures that in the overallTask seriesModel.getData() will not
// return incorrect data.
agentStubMap.each(function (stub) {
stub.perform(performArgs);
});
unfinished |= overallTask.perform(performArgs);
} else if (seriesTaskMap) {
seriesTaskMap.each(function (task, pipelineId) {
if (needSetDirty(opt, task)) {
task.dirty();
}
var performArgs = scheduler.getPerformArgs(task, opt.block);
performArgs.skip = !stageHandler.performRawSeries && ecModel.isSeriesFiltered(task.context.model);
updatePayload(task, payload);
unfinished |= task.perform(performArgs);
});
}
});
function needSetDirty(opt, task) {
return opt.setDirty && (!opt.dirtyMap || opt.dirtyMap.get(task.__pipeline.id));
}
scheduler.unfinished |= unfinished;
}
proto.performSeriesTasks = function (ecModel) {
var unfinished;
ecModel.eachSeries(function (seriesModel) {
// Progress to the end for dataInit and dataRestore.
unfinished |= seriesModel.dataTask.perform();
});
this.unfinished |= unfinished;
};
proto.plan = function () {
// Travel pipelines, check block.
this._pipelineMap.each(function (pipeline) {
var task = pipeline.tail;
do {
if (task.__block) {
pipeline.bockIndex = task.__idxInPipeline;
break;
}
task = task.getUpstream();
} while (task);
});
};
var updatePayload = proto.updatePayload = function (task, payload) {
payload !== 'remain' && (task.context.payload = payload);
};
function createSeriesStageTask(scheduler, stageHandler, stageHandlerRecord, ecModel, api) {
var seriesTaskMap = stageHandlerRecord.seriesTaskMap || (stageHandlerRecord.seriesTaskMap = createHashMap());
var seriesType = stageHandler.seriesType;
var getTargetSeries = stageHandler.getTargetSeries; // If a stageHandler should cover all series, `createOnAllSeries` should be declared mandatorily,
// to avoid some typo or abuse. Otherwise if an extension do not specify a `seriesType`,
// it works but it may cause other irrelevant charts blocked.
if (stageHandler.createOnAllSeries) {
ecModel.eachRawSeries(create);
} else if (seriesType) {
ecModel.eachRawSeriesByType(seriesType, create);
} else if (getTargetSeries) {
getTargetSeries(ecModel, api).each(create);
}
function create(seriesModel) {
var pipelineId = seriesModel.uid; // Init tasks for each seriesModel only once.
// Reuse original task instance.
var task = seriesTaskMap.get(pipelineId) || seriesTaskMap.set(pipelineId, createTask({
plan: seriesTaskPlan,
reset: seriesTaskReset,
count: seriesTaskCount
}));
task.context = {
model: seriesModel,
ecModel: ecModel,
api: api,
useClearVisual: stageHandler.isVisual && !stageHandler.isLayout,
plan: stageHandler.plan,
reset: stageHandler.reset,
scheduler: scheduler
};
pipe(scheduler, seriesModel, task);
} // Clear unused series tasks.
var pipelineMap = scheduler._pipelineMap;
seriesTaskMap.each(function (task, pipelineId) {
if (!pipelineMap.get(pipelineId)) {
task.dispose();
seriesTaskMap.removeKey(pipelineId);
}
});
}
function createOverallStageTask(scheduler, stageHandler, stageHandlerRecord, ecModel, api) {
var overallTask = stageHandlerRecord.overallTask = stageHandlerRecord.overallTask // For overall task, the function only be called on reset stage.
|| createTask({
reset: overallTaskReset
});
overallTask.context = {
ecModel: ecModel,
api: api,
overallReset: stageHandler.overallReset,
scheduler: scheduler
}; // Reuse orignal stubs.
var agentStubMap = overallTask.agentStubMap = overallTask.agentStubMap || createHashMap();
var seriesType = stageHandler.seriesType;
var getTargetSeries = stageHandler.getTargetSeries;
var overallProgress = true;
var isOverallFilter = stageHandler.isOverallFilter; // An overall task with seriesType detected or has `getTargetSeries`, we add
// stub in each pipelines, it will set the overall task dirty when the pipeline
// progress. Moreover, to avoid call the overall task each frame (too frequent),
// we set the pipeline block.
if (seriesType) {
ecModel.eachRawSeriesByType(seriesType, createStub);
} else if (getTargetSeries) {
getTargetSeries(ecModel, api).each(createStub);
} // Otherwise, (usually it is legancy case), the overall task will only be
// executed when upstream dirty. Otherwise the progressive rendering of all
// pipelines will be disabled unexpectedly. But it still needs stubs to receive
// dirty info from upsteam.
else {
overallProgress = false;
each(ecModel.getSeries(), createStub);
}
function createStub(seriesModel) {
var pipelineId = seriesModel.uid;
var stub = agentStubMap.get(pipelineId) || agentStubMap.set(pipelineId, createTask({
reset: stubReset,
onDirty: stubOnDirty
}));
stub.context = {
model: seriesModel,
overallProgress: overallProgress,
isOverallFilter: isOverallFilter
};
stub.agent = overallTask;
stub.__block = overallProgress;
pipe(scheduler, seriesModel, stub);
} // Clear unused stubs.
var pipelineMap = scheduler._pipelineMap;
agentStubMap.each(function (stub, pipelineId) {
if (!pipelineMap.get(pipelineId)) {
stub.dispose();
agentStubMap.removeKey(pipelineId);
}
});
}
function overallTaskReset(context) {
context.overallReset(context.ecModel, context.api, context.payload);
}
function stubReset(context, upstreamContext) {
return context.overallProgress && stubProgress;
}
function stubProgress() {
this.agent.dirty();
this.getDownstream().dirty();
}
function stubOnDirty() {
this.agent && this.agent.dirty();
}
function seriesTaskPlan(context) {
return context.plan && context.plan(context.model, context.ecModel, context.api, context.payload);
}
function seriesTaskReset(context) {
if (context.useClearVisual) {
context.data.clearAllVisual();
}
var resetDefines = context.resetDefines = normalizeToArray(context.reset(context.model, context.ecModel, context.api, context.payload));
if (resetDefines.length) {
return seriesTaskProgress;
}
}
function seriesTaskProgress(params, context) {
var data = context.data;
var resetDefines = context.resetDefines;
for (var k = 0; k < resetDefines.length; k++) {
var resetDefine = resetDefines[k];
if (resetDefine && resetDefine.dataEach) {
for (var i = params.start; i < params.end; i++) {
resetDefine.dataEach(data, i);
}
} else if (resetDefine && resetDefine.progress) {
resetDefine.progress(params, data);
}
}
}
function seriesTaskCount(context) {
return context.data.count();
}
function pipe(scheduler, seriesModel, task) {
var pipelineId = seriesModel.uid;
var pipeline = scheduler._pipelineMap.get(pipelineId);
!pipeline.head && (pipeline.head = task);
pipeline.tail && pipeline.tail.pipe(task);
pipeline.tail = task;
task.__idxInPipeline = pipeline.count++;
task.__pipeline = pipeline;
}
Scheduler.wrapStageHandler = function (stageHandler, visualType) {
if (isFunction(stageHandler)) {
stageHandler = {
overallReset: stageHandler,
seriesType: detectSeriseType(stageHandler)
};
}
stageHandler.uid = getUID('stageHandler');
visualType && (stageHandler.visualType = visualType);
return stageHandler;
};
/**
* Only some legacy stage handlers (usually in echarts extensions) are pure function.
* To ensure that they can work normally, they should work in block mode, that is,
* they should not be started util the previous tasks finished. So they cause the
* progressive rendering disabled. We try to detect the series type, to narrow down
* the block range to only the series type they concern, but not all series.
*/
function detectSeriseType(legacyFunc) {
seriesType = null;
try {
// Assume there is no async when calling `eachSeriesByType`.
legacyFunc(ecModelMock, apiMock);
} catch (e) {}
return seriesType;
}
var ecModelMock = {};
var apiMock = {};
var seriesType;
mockMethods(ecModelMock, GlobalModel);
mockMethods(apiMock, ExtensionAPI);
ecModelMock.eachSeriesByType = ecModelMock.eachRawSeriesByType = function (type) {
seriesType = type;
};
ecModelMock.eachComponent = function (cond) {
if (cond.mainType === 'series' && cond.subType) {
seriesType = cond.subType;
}
};
function mockMethods(target, Clz) {
for (var name in Clz.prototype) {
// Do not use hasOwnProperty
target[name] = noop;
}
}
var _default = Scheduler;
module.exports = _default;
|