1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
// A mixed paradigm scheduler
const bs = require('binary-search');
// The core managed data type: a queue will be pushed a number of these and asked to manage it.
exports.Job = function(data, prio=0, wt=1){
// Data is an array;
// Ideally, Both prio and wt are functions which take one argument: how many write cycles they've been waiting. This helps manage time-sensitive jobs.
// But this is too computationally difficult, so they are integer constants.
// Immutable Properties
this.data = data; //A set of work to be done (like character writes)
this.prio = prio; // If a.prio > b.prio, all of `a.data` will be sent before any of `b.data`
this.wt = wt; // After numerous calls, (amount called of a/amount called of b) = a.wt/b.wt if a.prio = b.prio
this.maxr = () => this.data.length/this.wt; // A utility calculation: If a job has a lower maxr, it will run out of data earlier.
// Mutable Properties
this.wacc = 0; // mutable property: Queue will change this to keep track between dequeues of how much "left over" push real estate this should have.
}
exports.Queue = function(delayms, maxExport, call){
// Every delay ms, Queue executes `call(maxExport # of objs in queue)`
let jobs = {}; // Links priorities to unordered arrays of jobs
let prios = []; // Array of priorities (keys of jobs), sorted.
let open = true; // Is dequeue() allowed to be called (i.e., has the timeout expired?)
let disab = true; // Is the queue disabled?
let pauseCall = null; // Either null or a function which will replace the next dequeue() call.
this.size = 0;
this.enqueue = function(job){
if (! (job instanceof exports.Job)) job = new exports.Job(job);
if (! job.data[0]) return;
let prio = job.prio;
if (!jobs[prio]){
jobs[prio] = [];
prios.splice(Math.abs(bs(prios, prio, (el, ne) => el-ne)), 0, prio); // prios is meant to be sorted least to most, and each job layer is too (by "maximum number of rounds").
}
jobs[prio].splice(Math.abs(bs(jobs[prio], job, (el, ne) => el.maxr() - ne.maxr())), 0, job);
// These were sorted like this so that getNumOrAll could use [0] or [.length-1] or .pop instead of having to re-sort lists repetitively.
this.size += job.data.length;
if (open) this.dequeue();
}
this.enable = function(){
disab = false;
if (open) this.dequeue();
}
this.disable = function(){
disab = true;
}
this.pause = function(call){ // run `call()` instead of `dequeue()` when timeout expires. Just do it once (overwrites other calls)
pauseCall = call;
}
this.dequeue = function(){
// Wraps getNumOrAll, managing open/disab, and concatenating possibly multiple layers
if (disab){
open = true;
return;
}
if (prios.length == 0){
open = true;
return;
}
if (pauseCall){
call();
pauseCall = null;
return;
}
open = false;
let data = [];
while (prios.length > 0 && data.length < maxExport){
data.push(...getNumOrAll(prios[prios.length-1], maxExport-data.length));
}
this.size -= data.length;
call(data);
if (delayms) setTimeout(()=>this.dequeue(), delayms);
}
function getNumOrAll(prio, num){
// Step 1: (Pre-)sort by job.data.length/job.weight.
let jobq = jobs[prio];
let dequeued = [];
// Step 2: Start at lowest, and pop all until job.data.length>job.normweight*num (decreasing num as popping and recalc job.normweight). Delete the job.
let weightsum = jobq.map(job => job.wt).reduce((acc, cur)=>acc+cur);
while (jobq[0] && jobq[0].data.length<(jobq[0].wt*num/weightsum)){
// The second req is SO weird. Think about it this way: for len objs
// to be pushed, the "odds" of the job getting a push must >= len (pushed
// len times). But it gets num tries, so the odds*num >= len.
// The odds are wt/wtsum for each try.
dequeued.push(...jobq.shift().data);
}
// Step 3: Then, pop job.normweight*num//1 elems from remaining, without num decrease or normweight recalc. But keep job.wacc = job.normweight*num%1
let efflen = num - dequeued.length;
for (let job of jobq){
let topop = job.wt*efflen/weightsum;
job.wacc += topop%1;
topop = topop-topop%1;
let data = job.data;
dequeued.push(...data.splice(data.length-topop));
}
// Step 4: Shallow copy job array, and sort by job.wacc.
let sort = jobq.slice().sort((el, ne) => el.wacc-ne.wacc);
// not inline because when inline, runs over and over again (loop)
for (let job of sort){
// Step 5: Iterate through array (high->low), and subtract 1 until the length of output is num.
if (dequeued.length == num || job.data.length == 0) break;
job.wacc--;
dequeued.push(job.data.pop());
}
// Step 6: If empty, remove prio && jobs[prio]; return.
if (jobq.length == 0){
delete jobs[prio];
prios.splice(bs(prios, prio, (el, ne)=>el-ne), 1);
}
return dequeued;
}
}
|