
Commit 69691ad

prune: Allow reading (nearly) contiguous blobs at once during rewrite
1 parent 2d5b24c commit 69691ad

2 files changed (+119, -51 lines)

crates/core/src/blob/packer.rs

Lines changed: 77 additions & 36 deletions
@@ -846,6 +846,25 @@ impl Actor {
     }
 }
 
+#[derive(Debug)]
+pub struct RepackPackBlobs {
+    pub pack_id: PackId,
+    pub offset: u32,
+    pub length: u32,
+    pub blobs: Vec<IndexBlob>,
+}
+
+impl RepackPackBlobs {
+    pub fn from_index_blob(pack_id: PackId, blob: IndexBlob) -> Self {
+        Self {
+            pack_id,
+            offset: blob.offset,
+            length: blob.length,
+            blobs: vec![blob],
+        }
+    }
+}
+
 /// The `Repacker` is responsible for repacking blobs into pack files.
 ///
 /// # Type Parameters
@@ -862,6 +881,8 @@ where
     packer: Packer<BE>,
     /// The size limit of the pack file.
     size_limit: u32,
+    /// the blob type
+    blob_type: BlobType,
 }
 
 impl<BE: DecryptFullBackend> Repacker<BE> {
@@ -895,6 +916,7 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
             be,
             packer,
             size_limit,
+            blob_type,
         })
     }
 
@@ -909,30 +931,38 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
     ///
     /// * If the blob could not be added
     /// * If reading the blob from the backend fails
-    pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
+    pub fn add_fast(&self, pack_blobs: RepackPackBlobs) -> RusticResult<()> {
+        let offset = pack_blobs.offset;
         let data = self.be.read_partial(
             FileType::Pack,
-            pack_id,
-            blob.tpe.is_cacheable(),
-            blob.offset,
-            blob.length,
+            &pack_blobs.pack_id,
+            self.blob_type.is_cacheable(),
+            offset,
+            pack_blobs.length,
         )?;
 
-        self.packer
-            .add_raw(
-                &data,
-                &blob.id,
-                0,
-                blob.uncompressed_length,
-                Some(self.size_limit),
-            )
-            .map_err(|err| {
-                err.overwrite_kind(ErrorKind::Internal)
-                    .prepend_guidance_line(
-                        "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
-                    )
-                    .attach_context("blob_id", blob.id.to_string())
-            })?;
+        // TODO: write in parallel
+        for blob in pack_blobs.blobs {
+            let start = usize::try_from(blob.offset - offset)
+                .expect("convert from u32 to usize should not fail!");
+            let end = usize::try_from(blob.offset + blob.length - offset)
+                .expect("convert from u32 to usize should not fail!");
+            self.packer
+                .add_raw(
+                    &data[start..end],
+                    &blob.id,
+                    0,
+                    blob.uncompressed_length,
+                    Some(self.size_limit),
+                )
+                .map_err(|err| {
+                    err.overwrite_kind(ErrorKind::Internal)
+                        .prepend_guidance_line(
+                            "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.",
+                        )
+                        .attach_context("blob_id", blob.id.to_string())
+                })?;
+        }
 
         Ok(())
     }
@@ -948,25 +978,36 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
     ///
     /// * If the blob could not be added
     /// * If reading the blob from the backend fails
-    pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> {
-        let data = self.be.read_encrypted_partial(
+    pub fn add(&self, pack_blobs: RepackPackBlobs) -> RusticResult<()> {
+        let offset = pack_blobs.offset;
+        let read_data = self.be.read_partial(
             FileType::Pack,
-            pack_id,
-            blob.tpe.is_cacheable(),
-            blob.offset,
-            blob.length,
-            blob.uncompressed_length,
+            &pack_blobs.pack_id,
+            self.blob_type.is_cacheable(),
+            offset,
+            pack_blobs.length,
         )?;
 
-        self.packer
-            .add_with_sizelimit(data, blob.id, Some(self.size_limit))
-            .map_err(|err| {
-                RusticError::with_source(
-                    ErrorKind::Internal,
-                    "Failed to add blob to packfile.",
-                    err,
-                )
-            })?;
+        // TODO: write in parallel
+        for blob in pack_blobs.blobs {
+            let start = usize::try_from(blob.offset - offset)
+                .expect("convert from u32 to usize should not fail!");
+            let end = usize::try_from(blob.offset + blob.length - offset)
+                .expect("convert from u32 to usize should not fail!");
+            let data = self
+                .be
+                .read_encrypted_from_partial(&read_data[start..end], blob.uncompressed_length)?;
+
+            self.packer
+                .add_with_sizelimit(data, blob.id, Some(self.size_limit))
+                .map_err(|err| {
+                    RusticError::with_source(
+                        ErrorKind::Internal,
+                        "Failed to add blob to packfile.",
+                        err,
+                    )
+                })?;
+        }
 
         Ok(())
     }
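
The reworked `add` and `add_fast` fetch a single buffer covering the whole coalesced range and then slice each blob back out of it. Below is a minimal, self-contained sketch of that slice arithmetic; the `Blob` struct and the sample offsets are illustrative stand-ins for the real `IndexBlob` and the data returned by `read_partial`:

// Each blob's bytes sit at `blob.offset - range_offset` within the combined
// buffer, using the same `usize::try_from` conversions as the diff above.

struct Blob {
    offset: u32,
    length: u32,
}

fn main() {
    // One combined read covering pack bytes 100..132 (stands in for read_partial).
    let range_offset: u32 = 100;
    let data: Vec<u8> = (0u8..32).collect();

    let blobs = [
        Blob { offset: 100, length: 10 },
        Blob { offset: 112, length: 20 }, // 2-byte hole before this blob: read, but skipped here
    ];

    for blob in &blobs {
        let start = usize::try_from(blob.offset - range_offset).expect("fits in usize");
        let end = usize::try_from(blob.offset + blob.length - range_offset).expect("fits in usize");
        let slice = &data[start..end];
        assert_eq!(slice.len(), blob.length as usize);
    }
}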

crates/core/src/commands/prune.rs

Lines changed: 42 additions & 15 deletions
@@ -26,7 +26,7 @@ use crate::{
     },
     blob::{
        BlobId, BlobType, BlobTypeMap, Initialize,
-        packer::{PackSizer, Repacker},
+        packer::{PackSizer, RepackPackBlobs, Repacker},
         tree::TreeStreamerOnce,
     },
     error::{ErrorKind, RusticError, RusticResult},
@@ -46,6 +46,11 @@ use crate::{
 pub(super) mod constants {
     /// Minimum size of an index file to be considered for pruning
     pub(super) const MIN_INDEX_LEN: usize = 10_000;
+    /// The maximum size of a pack part which is read at once from the backend
+    /// (needed to limit the memory used when reading large packs)
+    pub(crate) const LIMIT_PACK_READ: u32 = 40 * 1024 * 1024; // 40 MiB
+    /// The maximum size of holes which are still read over when repacking
+    pub(crate) const MAX_HOLESIZE: u32 = 256 * 1024; // 256 kiB
 }
 
 #[allow(clippy::struct_excessive_bools)]
@@ -1266,6 +1271,11 @@ pub(crate) fn prune_repository<P: ProgressBars, S: Open>(
         }
     }
 
+    if prune_plan.index_files.is_empty() {
+        info!("nothing to do!");
+        return Ok(());
+    }
+
     let mut indexes_remove = Vec::new();
     let mut tree_packs_remove = Vec::new();
     let mut data_packs_remove = Vec::new();
@@ -1282,12 +1292,8 @@ pub(crate) fn prune_repository<P: ProgressBars, S: Open>(
     let mut repack_packs = Vec::new();
 
     // process packs by index_file
-    let p = if prune_plan.index_files.is_empty() {
-        info!("nothing to do!");
-        pb.progress_hidden()
-    } else {
-        pb.progress_spinner("rebuilding index...")
-    };
+    let p = pb.progress_counter("rebuilding index...");
+    p.set_length(u64::try_from(prune_plan.index_files.len()).unwrap_or_default());
     for index in prune_plan.index_files {
         indexes_remove.push(index.id);
         for mut pack in index.packs {
@@ -1397,18 +1403,39 @@ pub(crate) fn prune_repository<P: ProgressBars, S: Open>(
         repack_packs
             .into_par_iter()
             .try_for_each(|pack| -> RusticResult<_> {
+                let repacker = match pack.blob_type {
+                    BlobType::Data => &data_repacker,
+                    BlobType::Tree => &tree_repacker,
+                };
+                let blobs: Vec<_> = pack
+                    .blobs
+                    .into_iter()
+                    .map(|blob| RepackPackBlobs::from_index_blob(pack.id, blob))
+                    .coalesce(|mut x, mut y| {
+                        // if the blobs are (almost) contiguous and we don't exceed the limit, read the blobs in one partial read
+                        if y.offset <= x.offset + x.length + constants::MAX_HOLESIZE
+                            && y.offset > x.offset
+                            && y.offset + y.length - x.offset <= constants::LIMIT_PACK_READ
+                        {
+                            x.length = y.offset + y.length - x.offset; // read till the end of y
+                            x.blobs.append(&mut y.blobs);
+                            Ok(x)
+                        } else {
+                            Err((x, y))
+                        }
+                    })
+                    .collect();
+
                 // TODO: repack in parallel
-                for blob in &pack.blobs {
-                    let repacker = match blob.tpe {
-                        BlobType::Data => &data_repacker,
-                        BlobType::Tree => &tree_repacker,
-                    };
+                for blobs in blobs {
+                    dbg!(&blobs);
+                    let length = u64::from(blobs.length);
                     if opts.fast_repack {
-                        repacker.add_fast(&pack.id, blob)?;
+                        repacker.add_fast(blobs)?;
                     } else {
-                        repacker.add(&pack.id, blob)?;
+                        repacker.add(blobs)?;
                     }
-                    p.inc(u64::from(blob.length));
+                    p.inc(length);
                 }
                 Ok(())
             })?;
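
The grouping relies on itertools' `coalesce`, which merges adjacent items as long as the closure returns `Ok`. Below is a self-contained sketch of the merging rule from the hunk above; the `Span` struct and the sample offsets are simplified stand-ins for `RepackPackBlobs`, the constants mirror `LIMIT_PACK_READ`/`MAX_HOLESIZE`, and the `itertools` crate is required:

use itertools::Itertools;

const LIMIT_PACK_READ: u32 = 40 * 1024 * 1024; // 40 MiB
const MAX_HOLESIZE: u32 = 256 * 1024; // 256 kiB

#[derive(Debug, PartialEq)]
struct Span {
    offset: u32,
    length: u32,
}

fn main() {
    // Three blobs: the first two are 1 kiB apart (within MAX_HOLESIZE) and get
    // merged into one read; the third sits ~1 MiB further on and starts a new span.
    let spans = vec![
        Span { offset: 0, length: 4096 },
        Span { offset: 5120, length: 4096 },
        Span { offset: 1024 * 1024, length: 4096 },
    ];

    let merged: Vec<Span> = spans
        .into_iter()
        .coalesce(|x, y| {
            if y.offset <= x.offset + x.length + MAX_HOLESIZE
                && y.offset > x.offset
                && y.offset + y.length - x.offset <= LIMIT_PACK_READ
            {
                Ok(Span {
                    offset: x.offset,
                    length: y.offset + y.length - x.offset, // read till the end of y
                })
            } else {
                Err((x, y))
            }
        })
        .collect();

    assert_eq!(merged.len(), 2);
    assert_eq!(merged[0].length, 5120 + 4096); // both blobs plus the 1 kiB hole
}

A larger `MAX_HOLESIZE` trades wasted bytes (holes read but discarded) for fewer backend requests, while `LIMIT_PACK_READ` caps how much of a pack is buffered in memory per read.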
