// A mixed paradigm scheduler const bs = require('binary-search'); // The core managed data type: a queue will be pushed a number of these and asked to manage it. exports.Job = function(data, prio=0, wt=1){ // Data is an array; // Ideally, Both prio and wt are functions which take one argument: how many write cycles they've been waiting. This helps manage time-sensitive jobs. // But this is too computationally difficult, so they are integer constants. // Immutable Properties this.data = data; //A set of work to be done (like character writes) this.prio = prio; // If a.prio > b.prio, all of `a.data` will be sent before any of `b.data` this.wt = wt; // After numerous calls, (amount called of a/amount called of b) = a.wt/b.wt if a.prio = b.prio this.maxr = () => this.data.length/this.wt; // A utility calculation: If a job has a lower maxr, it will run out of data earlier. // Mutable Properties this.wacc = 0; // mutable property: Queue will change this to keep track between dequeues of how much "left over" push real estate this should have. } exports.Queue = function(delayms, maxExport, call){ // Every delay ms, Queue executes `call(maxExport # of objs in queue)` let jobs = {}; // Links priorities to unordered arrays of jobs let prios = []; // Array of priorities (keys of jobs), sorted. let open = true; // Is dequeue() allowed to be called (i.e., has the timeout expired?) let disab = true; // Is the queue disabled? let pauseCall = null; // Either null or a function which will replace the next dequeue() call. this.size = 0; this.enqueue = function(job){ if (! (job instanceof exports.Job)) job = new exports.Job(job); if (! job.data[0]) return; let prio = job.prio; if (!jobs[prio]){ jobs[prio] = []; prios.splice(Math.abs(bs(prios, prio, (el, ne) => el-ne)), 0, prio); // prios is meant to be sorted least to most, and each job layer is too (by "maximum number of rounds"). } jobs[prio].splice(Math.abs(bs(jobs[prio], job, (el, ne) => el.maxr() - ne.maxr())), 0, job); // These were sorted like this so that getNumOrAll could use [0] or [.length-1] or .pop instead of having to re-sort lists repetitively. this.size += job.data.length; if (open) this.dequeue(); } this.enable = function(){ disab = false; if (open) this.dequeue(); } this.disable = function(){ disab = true; } this.pause = function(call){ // run `call()` instead of `dequeue()` when timeout expires. Just do it once (overwrites other calls) pauseCall = call; } this.dequeue = function(){ // Wraps getNumOrAll, managing open/disab, and concatenating possibly multiple layers if (disab){ open = true; return; } if (prios.length == 0){ open = true; return; } if (pauseCall){ call(); pauseCall = null; return; } open = false; let data = []; while (prios.length > 0 && data.length < maxExport){ data.push(...getNumOrAll(prios[prios.length-1], maxExport-data.length)); } this.size -= data.length; call(data); if (delayms) setTimeout(()=>this.dequeue(), delayms); } function getNumOrAll(prio, num){ // Step 1: (Pre-)sort by job.data.length/job.weight. let jobq = jobs[prio]; let dequeued = []; // Step 2: Start at lowest, and pop all until job.data.length>job.normweight*num (decreasing num as popping and recalc job.normweight). Delete the job. let weightsum = jobq.map(job => job.wt).reduce((acc, cur)=>acc+cur); while (jobq[0] && jobq[0].data.length<(jobq[0].wt*num/weightsum)){ // The second req is SO weird. Think about it this way: for len objs // to be pushed, the "odds" of the job getting a push must >= len (pushed // len times). But it gets num tries, so the odds*num >= len. // The odds are wt/wtsum for each try. dequeued.push(...jobq.shift().data); } // Step 3: Then, pop job.normweight*num//1 elems from remaining, without num decrease or normweight recalc. But keep job.wacc = job.normweight*num%1 let efflen = num - dequeued.length; for (let job of jobq){ let topop = job.wt*efflen/weightsum; job.wacc += topop%1; topop = topop-topop%1; let data = job.data; dequeued.push(...data.splice(data.length-topop)); } // Step 4: Shallow copy job array, and sort by job.wacc. for (let job of jobq.slice().sort((el, ne) => el.wacc-ne.wacc)){ // Step 5: Iterate through array (high->low), and subtract 1 until the length of output is num. if (dequeued.length == num || job.data.length == 0) break; job.wacc--; dequeued.push(job.data.pop()); } // Step 6: If empty, remove prio && jobs[prio]; return. if (jobq.length == 0){ delete jobs[prio]; prios.splice(bs(prios, prio, (el, ne)=>el-ne), 1); } return dequeued; } }