// persistentWorkers.js - authored by Amanda Ghassaei
//global workers so they do not have to be reinstantiated
/**
 * Creates a pool of long-lived web workers and returns a small API for
 * running "map" jobs on them, so workers do not have to be re-created
 * for every job.
 *
 * Messages sent to each worker by this pool:
 *   {url: string}                         once, right after creation
 *   {isWorking: true}                     "are you busy?" query, sent by map()
 *   {arg, localEnv, executable}           one unit of work
 * Messages expected back from a worker (read in onMessage):
 *   {isWorking: false}                    worker is idle and can take work
 *   {result: any, isWorking: false}       result of one unit of work
 * The worker body comes from `myWorker`, which is defined outside this
 * function and is not visible here.
 * NOTE(review): result routing below assumes each worker answers its jobs
 * in the order they were posted - confirm against myWorker.
 *
 * @param {number} numWorkers - number of workers to create.
 * @returns {{map: Function, allWorkers: Worker[]}|null} the public API, or
 *     null (after logging) when web workers are not available.
 */
function persistentWorkers(numWorkers){

    //check compatibility (typeof guard avoids a ReferenceError outside browsers)
    if (typeof window === "undefined" || !(typeof window.Worker === "function")){
        console.log("workers not supported");
        return null;
    }

    //local variables
    var allWorkers = [];
    var workerSlots = [];//one {worker, pending} record per worker; pending is a FIFO of tasks with jobs in flight
    var mapQueue = [];

    //create array of workers
    var URL = window.URL || window.webkitURL;
    var workerURL = makeBlobURL(URL, myWorker);
    for (var i=0;i<numWorkers;i++){
        var worker = new Worker(workerURL);
        worker.onmessage = onMessage;
        worker.postMessage({url: document.location.toString()});
        allWorkers.push(worker);
        workerSlots.push({worker:worker, pending:[]});
    }
    URL.revokeObjectURL(workerURL);

    /**
     * Queues a job that applies `executable` to every element of `data`.
     *
     * @param {Array} data - items to process; one worker message per item.
     * @param {Function} executable - function shipped to the workers as source text.
     * @param {*} env - extra data, JSON-serialized and sent along with every item.
     * @param {Function} [incrCallback] - called with each individual result.
     * @param {Function} [finalCallback] - called once, with no arguments, when
     *     every item of this job has been dispatched and answered.
     */
    function map(data, executable, env, incrCallback, finalCallback){

        //save new task in map queue
        mapQueue.push({data:data,
            executable:prepareExeFunc(executable.toString()),
            env:env,
            index:0,
            incrCallback:incrCallback,
            finalCallback:finalCallback,
            finished:false,
            activeThreads:0});

        for (var i=0;i<allWorkers.length;i++){
            allWorkers[i].postMessage({isWorking:true});//ask workers if they are busy
        }
    }

    /**
     * Rewrites a function's source so it is declared under the fixed name
     * "executable": everything from the first "(" onwards is kept.
     * NOTE: arrow-function source does not survive this rewrite (the result
     * would read "function executable(x) => ..."), so pass function expressions.
     *
     * @param {string} string - source text of the function.
     * @returns {string|null} rewritten source, or null (after logging) if no "(" is found.
     */
    function prepareExeFunc(string){
        var index = string.indexOf("(");//get index of first ( in function declaration
        if (index === -1) {
            console.log("exe function not formed properly for web workers " + string);
            return null;
        }
        return "function executable" + string.substring(index);
    }

    /**
     * Handles every message coming back from a worker: records a result (if
     * any), completes tasks that are done, and hands the worker its next item
     * when it reports being idle.
     *
     * @param {MessageEvent} e - e.data is the worker's message, e.target the worker.
     */
    function onMessage(e){
        var message = e.data;
        var pending = pendingTasksFor(e.target);

        //handle result first; compare against null/undefined so falsy results (0, "", false) still count
        //NOTE(review): assumes a pure status reply never carries a non-null result - confirm against myWorker
        if (message.result !== undefined && message.result !== null){
            //credit the task this worker was actually running, not blindly the head of the queue
            var owner = (pending && pending.length > 0) ? pending.shift() : mapQueue[0];
            if (!owner) return;
            owner.activeThreads--;//decrement active threads
            if (owner.incrCallback) owner.incrCallback(message.result);//incremental callback
        }

        //mark tasks whose data is used up and find the first task that still has items
        var nextTask = getNextTask();
        while (nextTask && nextTask.data.length <= nextTask.index){
            nextTask.finished = true;
            nextTask = getNextTask();
        }

        //must run after the marking above, otherwise the very last result (or an empty job) never completes
        completeFinishedTasks();

        if (!nextTask) return;

        //only consume an item when it is really dispatched, so a busy reply cannot skip data
        if (message.isWorking === false){
            var payload = {
                arg:nextTask.data[nextTask.index],
                localEnv:JSON.stringify(nextTask.env),
                executable:nextTask.executable};
            nextTask.index++;
            nextTask.activeThreads++;
            if (pending) pending.push(nextTask);
            e.target.postMessage(payload);
        }
    }

    /**
     * Returns the in-flight task FIFO belonging to `worker`, or null when the
     * worker was not created by this pool.
     */
    function pendingTasksFor(worker){
        for (var i=0;i<workerSlots.length;i++){
            if (workerSlots[i].worker === worker) return workerSlots[i].pending;
        }
        return null;
    }

    /**
     * Removes completed tasks from the front of the queue and fires their
     * finalCallback. Only the head is examined, so final callbacks fire in the
     * order the jobs were queued.
     */
    function completeFinishedTasks(){
        while (mapQueue.length > 0 && mapQueue[0].finished && mapQueue[0].activeThreads <= 0){
            var done = mapQueue.shift();//remove first element before the callback, which may call map() again
            if (done.finalCallback) done.finalCallback();
        }
    }

    /**
     * Returns the first queued task that is not marked finished, or null.
     */
    function getNextTask(){
        for (var i=0;i<mapQueue.length;i++){
            if (!mapQueue[i].finished) return mapQueue[i];
        }
        return null;
    }

    /**
     * Wraps a function's source in an immediately-invoked expression, stores
     * it in a Blob and returns an object URL for it.
     */
    function makeBlobURL(URL, someFunction) {
        var blob = new Blob(["(" + someFunction.toString() + ")()"], { type: "text/javascript" });
        return URL.createObjectURL(blob);
    }

    return {map:map, allWorkers:allWorkers};//return all public methods and vars
}