Skip to content

Commit d94e8d0

Browse files
committed
fix the computations
1 parent e5187b1 commit d94e8d0

File tree

1 file changed

+83
-74
lines changed

1 file changed

+83
-74
lines changed

src/csv.ts

Lines changed: 83 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export type CSVDataFrame = DataFrame<Metadata>;
2323

2424
const defaultChunkSize = 50 * 1024; // 50 KB, same as Papaparse default
2525
const defaultMaxCachedBytes = 20 * 1024 * 1024; // 20 MB
26-
const paddingRows = 10; // fetch a bit before and after the requested range, to avoid cutting rows
26+
const paddingRows = 20; // fetch a bit before and after the requested range, to avoid cutting rows
2727

2828
interface Params {
2929
url: string;
@@ -110,7 +110,7 @@ export async function csvDataFrame({
110110
step: ({ data, meta }, parser) => {
111111
const parsedRow = {
112112
start: cursor,
113-
end: meta.cursor,
113+
end: meta.cursor, // Not exact: a multi-byte character such as "é" occupies 2 bytes, but Papaparse's cursor counts it as 1 character, so byte offsets drift on non-ASCII input.
114114
data,
115115
};
116116
cursor = parsedRow.end;
@@ -167,6 +167,7 @@ export async function csvDataFrame({
167167
},
168168
});
169169
});
170+
console.log(firstParsedRange);
170171
if (header === undefined) {
171172
throw new Error("No header row found in the CSV file");
172173
}
@@ -287,81 +288,88 @@ export async function csvDataFrame({
287288
},
288289
});
289290

290-
// await Promise.resolve(); // ensure async
291+
let previousAverageRowBytes = undefined as number | undefined;
292+
let i = 0;
293+
while (previousAverageRowBytes !== cache.averageRowBytes && i < 10) {
294+
i++; // to avoid infinite loops in case of instability
291295

292-
if (rowEnd < cache.serial.validRows.length) {
293-
// all rows are in the serial range
294-
return;
295-
}
296-
if (rowStart < cache.serial.validRows.length) {
297-
// ignore the rows already cached
298-
rowStart = cache.serial.validRows.length;
299-
}
300-
301-
const estimatedStart = Math.floor(
302-
cache.header.bytes + rowStart * cache.averageRowBytes
303-
);
304-
const estimatedEnd = Math.min(
305-
byteLength,
306-
Math.ceil(cache.header.bytes + rowEnd * cache.averageRowBytes)
307-
);
308-
// find the ranges of rows we don't have yet
309-
// start with the full range, and then remove the parts we have
310-
const missingRange = {
311-
start: estimatedStart,
312-
end: estimatedEnd,
313-
};
314-
const missingRanges: { start: number; end: number }[] = [];
315-
// Loop on the random ranges, which are sorted and non-overlapping
316-
for (const range of cache.random) {
317-
if (missingRange.end <= range.start) {
318-
// no overlap, and no more overlap possible
319-
missingRanges.push(missingRange);
320-
break;
296+
if (rowEnd < cache.serial.validRows.length) {
297+
// all rows are in the serial range
298+
return;
299+
}
300+
if (rowStart < cache.serial.validRows.length) {
301+
// ignore the rows already cached
302+
rowStart = cache.serial.validRows.length;
321303
}
322-
if (missingRange.start >= range.end) {
323-
// no overlap, check the next range
324-
continue;
304+
305+
const estimatedStart = Math.floor(
306+
cache.serial.end +
307+
(rowStart - cache.serial.validRows.length) * cache.averageRowBytes
308+
);
309+
const estimatedEnd = Math.min(
310+
byteLength,
311+
Math.ceil(
312+
cache.serial.end +
313+
(rowEnd - cache.serial.validRows.length) * cache.averageRowBytes
314+
)
315+
);
316+
// find the ranges of rows we don't have yet
317+
// start with the full range, and then remove the parts we have
318+
const missingRange = {
319+
start: estimatedStart,
320+
end: estimatedEnd,
321+
};
322+
const missingRanges: { start: number; end: number }[] = [];
323+
// Loop on the random ranges, which are sorted and non-overlapping
324+
for (const range of cache.random) {
325+
if (missingRange.end <= range.start) {
326+
// no overlap, and no more overlap possible
327+
missingRanges.push(missingRange);
328+
break;
329+
}
330+
if (missingRange.start >= range.end) {
331+
// no overlap, check the next range
332+
continue;
333+
}
334+
// overlap
335+
if (missingRange.start < range.start) {
336+
// add the part before the overlap
337+
missingRanges.push({
338+
start: missingRange.start,
339+
end: range.start,
340+
});
341+
}
342+
// move the start to the end of the range
343+
missingRange.start = range.end;
344+
if (missingRange.start >= missingRange.end) {
345+
// no more missing range
346+
break;
347+
}
325348
}
326-
// overlap
327-
if (missingRange.start < range.start) {
328-
// add the part before the overlap
329-
missingRanges.push({
330-
start: missingRange.start,
331-
end: range.start,
332-
});
349+
if (missingRange.start < missingRange.end) {
350+
// add the remaining part
351+
missingRanges.push(missingRange);
333352
}
334-
// move the start to the end of the range
335-
missingRange.start = range.end;
336-
if (missingRange.start >= missingRange.end) {
337-
// no more missing range
338-
break;
353+
354+
if (missingRanges.length === 0) {
355+
// all rows are already cached
356+
return;
339357
}
340-
}
341-
if (missingRange.start < missingRange.end) {
342-
// add the remaining part
343-
missingRanges.push(missingRange);
344-
}
345358

346-
if (missingRanges.length === 0) {
347-
// all rows are already cached
348-
return;
359+
// fetch each missing range and fill the cache
360+
await Promise.all(
361+
missingRanges.map(({ start, end }) =>
362+
fetchRange({ start, end, signal, cache, eventTarget })
363+
)
364+
).finally(() => {
365+
// Update the average row size after each fetch; the enclosing while loop
367+
// re-runs until averageRowBytes stabilizes (bounded by the i < 10 guard above).
367+
previousAverageRowBytes = cache.averageRowBytes;
368+
cache.averageRowBytes = getAverageRowBytes(cache);
369+
//eventTarget.dispatchEvent(new CustomEvent("resolve")); // not needed: the surrounding while loop already re-fetches until averageRowBytes stabilizes
370+
});
349371
}
350372

351-
// fetch each missing range and fill the cache
352-
353-
await Promise.all(
354-
missingRanges.map(({ start, end }) =>
355-
fetchRange({ start, end, signal, cache, eventTarget })
356-
)
357-
// TODO(SL): check the signal?
358-
).finally(() => {
359-
// TODO(SL): Update the average size of a row?
360-
// For now, we keep it constant, to provide stability - otherwise empty rows appear after the update
361-
// cache.averageRowBytes = getAverageRowBytes(cache);
362-
// eventTarget.dispatchEvent(new CustomEvent("resolve")); // to refresh the table
363-
});
364-
365373
// TODO(SL): evict old rows (or only cell contents?) if needed
366374
// TODO(SL): handle fetching (and most importantly storing) only part of the columns?
367375
// Note that source.coop does not support negative ranges for now https://github.com/source-cooperative/data.source.coop/issues/57 (for https://github.com/hyparam/hightable/issues/298#issuecomment-3381567614)
@@ -392,7 +400,6 @@ function findParsedRow({ cache, row }: { cache: Cache; row: number }):
392400
};
393401
}
394402
const estimatedStart =
395-
cache.header.bytes +
396403
cache.serial.end +
397404
(row - cache.serial.validRows.length) * cache.averageRowBytes;
398405
// find the range containing this row
@@ -403,10 +410,12 @@ function findParsedRow({ cache, row }: { cache: Cache; row: number }):
403410
return; // not found
404411
}
405412
// estimate the row index of the first row in the range
406-
const firstRowIndex = Math.round(
407-
// is .round() better than .floor() or .ceil()?
408-
(range.start - cache.header.bytes) / cache.averageRowBytes
409-
);
413+
const firstRowIndex =
414+
cache.serial.validRows.length +
415+
Math.round(
416+
// Math.round() is only an estimate; exactness is not required (see the ordering comment below).
417+
(range.start - cache.serial.end) / cache.averageRowBytes
418+
);
410419
// get the row in the range. This way, we ensure that calls to findParsedRow() with increasing row numbers
411420
// will return rows in the same order, without holes or duplicates, even if the averageRowBytes is not accurate.
412421
const parsedRow = range.validRows[row - firstRowIndex];

0 commit comments

Comments
 (0)