
Commit 5a83691

Add ability to throttle disk reads
Useful for limiting disk load, but could also be a crude way to limit CPU usage
1 parent ee72171 commit 5a83691

7 files changed: +394 −50 lines


bin/parpar.js

Lines changed: 35 additions & 0 deletions
@@ -20,6 +20,31 @@ var print_json = function(type, obj) {
 };
 var arg_parser = require('../lib/arg_parser.js');
 
+var throttleParseFunc = function(prop, v) {
+	if(!v || v == '0') return null;
+	var m = (''+v).match(/^(([0-9.]*)([bkmgtper])\/)?([0-9.]*)(m?s|[mhdw])$/i);
+	if(!m) error('Invalid format for `--'+prop+'-read-throttle`: ' + v);
+	if(m[4] === '') m[4] = '1';
+	if(m[1]) {
+		if(m[2] === '') m[2] = '1';
+		if(m[3] == 'r' || m[3] == 'R') return {
+			mode: 'reqrate',
+			count: m[2],
+			time: arg_parser.parseTime(m[4] + m[5])
+		};
+		else return {
+			mode: 'rate',
+			size: arg_parser.parseSize(m[2] + m[3]),
+			time: arg_parser.parseTime(m[4] + m[5])
+		};
+	} else {
+		return {
+			mode: 'delay',
+			time: arg_parser.parseTime(m[4] + m[5])
+		};
+	}
+};
+
 var opts = {
 	'input-slices': {
 		alias: 's',
@@ -173,6 +198,16 @@ var opts = {
 		type: 'int',
 		map: 'chunkReadThreads'
 	},
+	'seq-read-throttle': {
+		type: 'string',
+		map: 'seqReadThrottle',
+		fn: throttleParseFunc.bind(null, 'seq')
+	},
+	'chunk-read-throttle': {
+		type: 'string',
+		map: 'chunkReadThrottle',
+		fn: throttleParseFunc.bind(null, 'chunk')
+	},
 	'read-buffers': {
 		type: 'int',
 		map: 'readBuffers'
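Note: arg_parser.parseTime() and arg_parser.parseSize() are not shown in this diff; per the option comment added in lib/par2gen.js below, times are milliseconds and sizes are bytes. As a standalone sketch (not part of the commit), the regex groups map the documented spec strings like so:

// Illustration of the spec grammar: optional size-or-count prefix plus '/',
// then an interval understood by arg_parser.parseTime().
var re = /^(([0-9.]*)([bkmgtper])\/)?([0-9.]*)(m?s|[mhdw])$/i;
['2.5s', '2.5M/s', '5r/2s'].forEach(function(v) {
	var m = v.match(re);
	console.log(v, {
		prefix: m[1] ? (m[2] || '1') : null, // count (unit 'r') or size (any other suffix)
		unit: m[3],                          // 'r' => reqrate mode; size suffix => rate; absent => delay
		interval: (m[4] || '1') + m[5]       // e.g. '1s' when the time digits are omitted
	});
});
// Expected results from throttleParseFunc (assuming parseSize treats 'M' as 1048576):
//   '2.5s'   -> {mode: 'delay',   time: 2500}
//   '2.5M/s' -> {mode: 'rate',    size: 2621440, time: 1000}
//   '5r/2s'  -> {mode: 'reqrate', count: '5',    time: 2000}  (count stays a string; createThrottle coerces it)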

help-full.txt

Lines changed: 7 additions & 0 deletions
@@ -223,6 +223,13 @@ I/O Tuning Options:
                           Default `2`
   --read-buffers          Maximum number of read buffers to read into and
                           send to processing backend. Default `8`
+  --seq-read-throttle     Throttle sequential read speed. The following
+                          examples demonstrate permitted values:
+                            `2.5s`: delay each read request by 2.5 seconds
+                            `2.5M/s`: limit to 2.5MB per second
+                            `5r/2s`: limit of 5 requests per 2 seconds
+  --chunk-read-throttle   This is the same as `--seq-read-throttle` but
+                          applies to chunked reading.
   --read-hash-queue       Number of read buffers to queue up for hashing
                           before reading from a different file. Lower
                           values may be more optimal on disks with faster
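As a usage illustration (the file names and other flags here are placeholders, not taken from this commit), the new options slot into an ordinary invocation, e.g. `parpar -s 1M -r 64 -o out.par2 --seq-read-throttle 10M/s --chunk-read-throttle 5r/2s *.dat`.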

lib/filechunkreader.js

Lines changed: 23 additions & 14 deletions
@@ -3,11 +3,13 @@
 var fs = require('fs');
 var async = require('async');
 var ProcQueue = require('./procqueue');
+var ThrottleQueue = require('./throttlequeue');
 var bufferSlice = Buffer.prototype.readBigInt64BE ? Buffer.prototype.subarray : Buffer.prototype.slice;
 
-function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, concurrency, cbChunk, cb) {
+function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, concurrency, throttle, cbChunk, cb) {
 	var readQ = new ProcQueue(concurrency);
 	var readErr = null;
+	if(!throttle) throttle = new ThrottleQueue.NoThrottle();
 	async.eachSeries(files, function(file, cb) {
 		if(file.size == 0 || file.size <= chunkOffset) return cb();
 		fs.open(file.name, 'r', function(err, fd) {
@@ -20,26 +22,33 @@ function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, conc
 				bufPool.get(function(buffer) {
 					readQ.run(function(readDone) {
 						if(readErr) return cb(readErr);
-						fs.read(fd, buffer, 0, chunkSize, filePos, function(err, bytesRead) {
-							if(err) readErr = err;
-							else cbChunk(file, bufferSlice.call(buffer, 0, bytesRead), sliceNum, bufPool.put.bind(bufPool, buffer));
-
-							if(--chunksLeft == 0) {
-								// all chunks read from this file, so close it
-								fs.close(fd, function(err) {
-									if(err) readErr = err;
+						throttle.pass(chunkSize, function(cancelled, throttleReadDone) {
+							if(cancelled) return cb();
+							fs.read(fd, buffer, 0, chunkSize, filePos, function(err, bytesRead) {
+								throttleReadDone();
+								if(err) readErr = err;
+								else cbChunk(file, bufferSlice.call(buffer, 0, bytesRead), sliceNum, bufPool.put.bind(bufPool, buffer));
+
+								if(--chunksLeft == 0) {
+									// all chunks read from this file, so close it
+									fs.close(fd, function(err) {
+										if(err) readErr = err;
+										readDone();
+									});
+								} else
 									readDone();
-								});
-							} else
-								readDone();
+							});
+							cb();
 						});
-						cb();
 					});
 				});
 			}, cb);
 		});
 	}, function(err) {
-		if(err) return cb(err);
+		if(err) {
+			throttle.cancel();
+			return cb(err);
+		}
 		readQ.end(function() {
 			cb(readErr);
 		});
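lib/throttlequeue.js itself is not shown in this excerpt, but the call sites above pin down its contract: pass(size, cb) invokes cb(cancelled, ioDone) once the request may proceed, the caller invokes ioDone() when its read completes, and cancel() flushes anything still queued with cancelled=true. A minimal sketch of the no-op variant under those assumptions (the real NoThrottle may differ):

function NoThrottle() {}
// Let every request through immediately; a real throttle would queue here.
NoThrottle.prototype.pass = function(size, cb) {
	cb(false, function(){}); // cancelled=false; ioDone is a no-op for us
};
// Nothing is ever queued, so there is nothing to flush on cancellation.
NoThrottle.prototype.cancel = function() {};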

lib/fileseqreader.js

Lines changed: 41 additions & 34 deletions
@@ -1,6 +1,7 @@
 "use strict";
 
 var fs = require('fs');
+var ThrottleQueue = require('./throttlequeue');
 var allocBuffer = (Buffer.allocUnsafe || Buffer);
 
 function FileReaderData(file, buffer, len, pos, parent) {
@@ -25,14 +26,15 @@ FileReaderData.prototype = {
 	}
 };
 
-function FileSeqReader(files, readSize, readBuffers) {
+function FileSeqReader(files, readSize, readBuffers, throttleQ) {
 	this.fileQueue = files.filter(function(file) {
 		return file.size > 0;
 	});
 	this.buf = [];
 	this.readSize = readSize;
 	this.maxBufs = readBuffers;
 	this.openFiles = [];
+	this.throttleQ = throttleQ || (new ThrottleQueue.NoThrottle());
 }
 
 FileSeqReader.prototype = {
@@ -46,6 +48,7 @@ FileSeqReader.prototype = {
 	cb: null,
 	finishCb: null,
 	_isReading: false,
+	throttleQ: null,
 
 	// when doing sequential read with chunker, caller requires the first chunkLen bytes of every slice, so ensure that this always arrives as one piece
 	reqSliceLen: 0,
@@ -110,40 +113,44 @@ FileSeqReader.prototype = {
 	_doRead: function(file, buffer) {
 		var self = this;
 		var readSize = this._readSize(file.pos, file.info.size);
-		fs.read(file.fd, buffer, 0, readSize[0], null, function(err, bytesRead) {
-			if(err) return self.cb(err);
-
-			// file position/EOF tracking
-			var newPos = file.pos + bytesRead;
-			if(newPos > file.info.size)
-				return self.cb(new Error('Read past expected end of file - latest position (' + newPos + ') exceeds size (' + file.info.size + ')'));
-
-			var eof = (newPos == file.info.size);
-			if(!eof && bytesRead != readSize[0])
-				return self.cb(new Error("Read failure - expected " + readSize[0] + " bytes, got " + bytesRead + " bytes instead."));
-
-			// increase hashing count and wait for other end to signal when done
-			var ret = new FileReaderData(file, buffer, bytesRead, file.pos, self);
-			if(readSize[1])
-				ret.chunks = readSize[1];
-			file.hashQueue++;
-			file.pos += bytesRead;
-			self.cb(null, ret);
-
-			if(eof) {
-				// remove from openFiles
-				for(var i=0; i<self.openFiles.length; i++)
-					if(self.openFiles[i].fd == file.fd) {
-						self.openFiles.splice(i, 1);
-						break;
-					}
+		this.throttleQ.pass(readSize[0], function(cancelled, readDone) {
+			if(cancelled) return; // this should never happen because we only read once at a time
+			fs.read(file.fd, buffer, 0, readSize[0], null, function(err, bytesRead) {
+				readDone();
+				if(err) return self.cb(err);
 
-				fs.close(file.fd, function(err) {
-					if(err) self.cb(err);
-					else self.readNext();
-				});
-			} else
-				self.readNext();
+				// file position/EOF tracking
+				var newPos = file.pos + bytesRead;
+				if(newPos > file.info.size)
+					return self.cb(new Error('Read past expected end of file - latest position (' + newPos + ') exceeds size (' + file.info.size + ')'));
+
+				var eof = (newPos == file.info.size);
+				if(!eof && bytesRead != readSize[0])
+					return self.cb(new Error("Read failure - expected " + readSize[0] + " bytes, got " + bytesRead + " bytes instead."));
+
+				// increase hashing count and wait for other end to signal when done
+				var ret = new FileReaderData(file, buffer, bytesRead, file.pos, self);
+				if(readSize[1])
+					ret.chunks = readSize[1];
+				file.hashQueue++;
+				file.pos += bytesRead;
+				self.cb(null, ret);
+
+				if(eof) {
+					// remove from openFiles
+					for(var i=0; i<self.openFiles.length; i++)
+						if(self.openFiles[i].fd == file.fd) {
+							self.openFiles.splice(i, 1);
+							break;
+						}
+
+					fs.close(file.fd, function(err) {
+						if(err) self.cb(err);
+						else self.readNext();
+					});
+				} else
+					self.readNext();
+			});
 		});
 	},
 
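To make the 'rate' mode concrete: a byte-rate throttle with this interface can be modelled as a quota of `size` bytes per `time` milliseconds. The following is a hedged sketch under that assumption (polling for simplicity; the commit's actual RateThrottle in lib/throttlequeue.js may well work differently):

// Sketch: allow `size` bytes per `time` ms; excess requests retry until quota frees.
function RateThrottleSketch(size, time) {
	this.size = size;       // bytes allowed per window
	this.time = time;       // window length in ms
	this.used = 0;          // bytes currently counted against the quota
	this.cancelled = false;
}
RateThrottleSketch.prototype.pass = function(size, cb) {
	var self = this;
	(function tryRun() {
		if(self.cancelled) return cb(true, function(){});
		if(self.used + size > self.size) return setTimeout(tryRun, 10); // quota exhausted, retry shortly
		self.used += size;
		setTimeout(function() { self.used -= size; }, self.time); // return the quota after the window
		cb(false, function(){}); // proceed; ioDone unused in this sketch
	})();
	// note: a single request larger than the whole quota would stall in this naive sketch
};
RateThrottleSketch.prototype.cancel = function() {
	this.cancelled = true; // pending pollers will complete with cancelled=true
};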

lib/par2gen.js

Lines changed: 40 additions & 2 deletions
@@ -7,6 +7,7 @@ var fs = require('fs');
 var path = require('path');
 var FileSeqReader = require('./fileseqreader');
 var FileChunkReader = require('./filechunkreader');
+var ThrottleQueue = require('./throttlequeue');
 var BufferPool = require('./bufferpool');
 var PAR2OutFile = require('./par2outfile');
 var bufferSlice = Buffer.prototype.readBigInt64BE ? Buffer.prototype.subarray : Buffer.prototype.slice;
@@ -187,6 +188,34 @@ function calcNumRecoverySlices(spec, sliceSize, inSlices, files) {
 	}, 0);
 }
 
+function createThrottle(opts) {
+	var throttleTime = opts.time|0;
+	switch(opts.mode) {
+		case 'delay':
+			if(throttleTime <= 0)
+				throw new Error('Invalid throttle delay (' + throttleTime + ')');
+			return new ThrottleQueue.DelayThrottle(throttleTime);
+		case 'rate':
+			if(!throttleTime) throttleTime = 1000;
+			if(throttleTime <= 0)
+				throw new Error('Invalid throttle time (' + throttleTime + ')');
+			var throttleSize = opts.size|0;
+			if(throttleSize <= 0)
+				throw new Error('Invalid throttle size (' + throttleSize + ')');
+			return new ThrottleQueue.RateThrottle(throttleSize, throttleTime);
+		case 'reqrate':
+			if(!throttleTime) throttleTime = 1000;
+			if(throttleTime <= 0)
+				throw new Error('Invalid throttle time (' + throttleTime + ')');
+			var throttleCount = opts.count|0;
+			if(throttleCount <= 0 || throttleCount != +opts.count)
+				throw new Error('Invalid throttle count (' + throttleCount + ')');
+			return new ThrottleQueue.ReqRateThrottle(throttleCount, throttleTime);
+		default:
+			throw new Error('Unknown throttle mode (' + opts.mode + ')');
+	}
+}
+
 // use negative value for sliceSize to indicate exact number of input blocks
 function PAR2Gen(fileInfo, sliceSize, opts) {
 	if(!(this instanceof PAR2Gen))
@@ -240,6 +269,8 @@ function PAR2Gen(fileInfo, sliceSize, opts) {
 		displayNameBase: '.', // base path, only used if displayNameFormat is 'path'
 		seqReadSize: 4*1048576, // 4MB
 		chunkReadThreads: 2,
+		seqReadThrottle: null, // no throttle, otherwise format is {mode: ('delay'|'rate'|'reqrate') [,time: <ms>] [,size: <bytes>] [,count: <num>]}
+		chunkReadThrottle: null, // same as above
 		readBuffers: 8,
 		readHashQueue: 5,
 		numThreads: null, // null => number of processors
@@ -715,6 +746,11 @@ function PAR2Gen(fileInfo, sliceSize, opts) {
 			throw new Error('Minimum chunk size (' + friendlySize(o.minChunkSize) + ') cannot exceed read buffer size (' + friendlySize(o.seqReadSize) + ')');
 	}
 
+	if(o.seqReadThrottle)
+		this.seqReadThrottle = createThrottle(o.seqReadThrottle);
+	if(o.chunkReadThrottle)
+		this.chunkReadThrottle = createThrottle(o.chunkReadThrottle);
+
 	if(o.memoryLimit) {
 		var cpuMinChunk = Math.ceil(cpuRatio * o.minChunkSize /2) *2;
 		if(o.minChunkSize && (o.recDataSize+1) * o.minChunkSize > o.memoryLimit)
@@ -1014,6 +1050,8 @@ PAR2Gen.prototype = {
 	chunkOffset: 0,
 	readSize: 0,
 	_buf: null,
+	seqReadThrottle: null,
+	chunkReadThrottle: null,
 
 	_rfPush: function(numSlices, sliceOffsetOrExponents, critPackets, creator) {
 		var packets, recvSize = 0, critTotalSize = sumSize(critPackets);
@@ -1306,15 +1344,15 @@ PAR2Gen.prototype = {
 		if(seeking) {
 			if(!self._chunker) return cb(new Error('Trying to perform chunked reads without a chunker'));
 			var bufPool = new BufferPool(this._buf, chunkSize, this.opts.readBuffers);
-			FileChunkReader(this.files, this.opts.sliceSize, chunkSize, this.chunkOffset, bufPool, this.opts.chunkReadThreads, function(file, buffer, sliceNum, cb) {
+			FileChunkReader(this.files, this.opts.sliceSize, chunkSize, this.chunkOffset, bufPool, this.opts.chunkReadThreads, this.chunkReadThrottle, function(file, buffer, sliceNum, cb) {
 				if(cbProgress) cbProgress('processing_slice', file, sliceNum);
 				self._chunker.processData(file.sliceOffset+sliceNum, buffer, cb);
 			}, function(err) {
 				if(err) cb(err);
 				else bufPool.end(cb);
 			});
 		} else {
-			var reader = new FileSeqReader(this.files, this.readSize, this.opts.readBuffers);
+			var reader = new FileSeqReader(this.files, this.readSize, this.opts.readBuffers, this.seqReadThrottle);
 			reader.setBuffers(this._buf);
 			reader.maxQueuePerFile = this.opts.readHashQueue;
 
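For programmatic use, the shapes accepted by createThrottle() above (and thus by opts.seqReadThrottle / opts.chunkReadThrottle) mirror the CLI examples; times are milliseconds and sizes bytes, per the option comment in the defaults hunk:

// Equivalents of '2.5s', '2.5M/s' and '5r/2s' (binary 'M' assumed):
var delayOpt = {mode: 'delay',   time: 2500};
var rateOpt  = {mode: 'rate',    size: 2.5*1048576, time: 1000};
var reqOpt   = {mode: 'reqrate', count: 5, time: 2000};
// e.g. new PAR2Gen(fileInfo, sliceSize, {seqReadThrottle: rateOpt, chunkReadThrottle: null /*, ...*/});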
