diff --git a/crates/core/src/blob/packer.rs b/crates/core/src/blob/packer.rs index d2a97d9b..06b5cf75 100644 --- a/crates/core/src/blob/packer.rs +++ b/crates/core/src/blob/packer.rs @@ -846,6 +846,25 @@ impl Actor { } } +#[derive(Debug)] +pub struct RepackPackBlobs { + pub pack_id: PackId, + pub offset: u32, + pub length: u32, + pub blobs: Vec<IndexBlob>, +} + +impl RepackPackBlobs { + pub fn from_index_blob(pack_id: PackId, blob: IndexBlob) -> Self { + Self { + pack_id, + offset: blob.offset, + length: blob.length, + blobs: vec![blob], + } + } +} + /// The `Repacker` is responsible for repacking blobs into pack files. /// /// # Type Parameters @@ -862,6 +881,8 @@ where packer: Packer, /// The size limit of the pack file. size_limit: u32, + /// the blob type + blob_type: BlobType, } impl Repacker { @@ -895,6 +916,7 @@ impl Repacker { be, packer, size_limit, + blob_type, }) } @@ -909,30 +931,38 @@ impl Repacker { /// /// * If the blob could not be added /// * If reading the blob from the backend fails - pub fn add_fast(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { + pub fn add_fast(&self, pack_blobs: RepackPackBlobs) -> RusticResult<()> { + let offset = pack_blobs.offset; let data = self.be.read_partial( FileType::Pack, - pack_id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, + &pack_blobs.pack_id, + self.blob_type.is_cacheable(), + offset, + pack_blobs.length, )?; - self.packer - .add_raw( - &data, - &blob.id, - 0, - blob.uncompressed_length, - Some(self.size_limit), - ) - .map_err(|err| { - err.overwrite_kind(ErrorKind::Internal) - .prepend_guidance_line( - "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.", - ) - .attach_context("blob_id", blob.id.to_string()) - })?; + // TODO: write in parallel + for blob in pack_blobs.blobs { + let start = usize::try_from(blob.offset - offset) + .expect("convert from u32 to usize should not fail!"); + let end = usize::try_from(blob.offset + blob.length - offset) + .expect("convert from u32 to 
usize should not fail!"); + self.packer + .add_raw( + &data[start..end], + &blob.id, + 0, + blob.uncompressed_length, + Some(self.size_limit), + ) + .map_err(|err| { + err.overwrite_kind(ErrorKind::Internal) + .prepend_guidance_line( + "Failed to fast-add (unchecked) blob `{blob_id}` to packfile.", + ) + .attach_context("blob_id", blob.id.to_string()) + })?; + } Ok(()) } @@ -948,25 +978,36 @@ impl Repacker { /// /// * If the blob could not be added /// * If reading the blob from the backend fails - pub fn add(&self, pack_id: &PackId, blob: &IndexBlob) -> RusticResult<()> { - let data = self.be.read_encrypted_partial( + pub fn add(&self, pack_blobs: RepackPackBlobs) -> RusticResult<()> { + let offset = pack_blobs.offset; + let read_data = self.be.read_partial( FileType::Pack, - pack_id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, - blob.uncompressed_length, + &pack_blobs.pack_id, + self.blob_type.is_cacheable(), + offset, + pack_blobs.length, )?; - self.packer - .add_with_sizelimit(data, blob.id, Some(self.size_limit)) - .map_err(|err| { - RusticError::with_source( - ErrorKind::Internal, - "Failed to add blob to packfile.", - err, - ) - })?; + // TODO: write in parallel + for blob in pack_blobs.blobs { + let start = usize::try_from(blob.offset - offset) + .expect("convert from u32 to usize should not fail!"); + let end = usize::try_from(blob.offset + blob.length - offset) + .expect("convert from u32 to usize should not fail!"); + let data = self + .be + .read_encrypted_from_partial(&read_data[start..end], blob.uncompressed_length)?; + + self.packer + .add_with_sizelimit(data, blob.id, Some(self.size_limit)) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Failed to add blob to packfile.", + err, + ) + })?; + } Ok(()) } diff --git a/crates/core/src/commands/prune.rs b/crates/core/src/commands/prune.rs index 82ddd408..6582e504 100644 --- a/crates/core/src/commands/prune.rs +++ b/crates/core/src/commands/prune.rs @@ -6,7 +6,6 @@ use 
std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet}, str::FromStr, - sync::{Arc, Mutex}, }; use bytesize::ByteSize; @@ -27,7 +26,7 @@ use crate::{ }, blob::{ BlobId, BlobType, BlobTypeMap, Initialize, - packer::{PackSizer, Repacker}, + packer::{PackSizer, RepackPackBlobs, Repacker}, tree::TreeStreamerOnce, }, error::{ErrorKind, RusticError, RusticResult}, @@ -47,6 +46,11 @@ use crate::{ pub(super) mod constants { /// Minimum size of an index file to be considered for pruning pub(super) const MIN_INDEX_LEN: usize = 10_000; + /// The maximum size of pack-part which is read at once from the backend. + /// (needed to limit the memory size used for large backends) + pub(crate) const LIMIT_PACK_READ: u32 = 40 * 1024 * 1024; // 40 MiB + /// The maximum size of holes which are still read when repacking + pub(crate) const MAX_HOLESIZE: u32 = 256 * 1024; // 256 kiB } #[allow(clippy::struct_excessive_bools)] @@ -431,7 +435,7 @@ pub enum PackToDo { } /// A pack which is to be pruned -#[derive(Debug)] +#[derive(Debug, Clone)] struct PrunePack { /// The id of the pack id: PackId, @@ -1228,40 +1232,7 @@ pub(crate) fn prune_repository( let be = repo.dbe(); let pb = &repo.pb; - let indexer = Indexer::new_unindexed(be.clone()).into_shared(); - - // Calculate an approximation of sizes after pruning. - // The size actually is: - // total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead - // This is hard/impossible to compute because: - // - the size of blobs can change during repacking if compression is changed - // - the size of pack headers depends on whether blobs are compressed or not - // - we don't know the number of packs generated by repacking - // So, we simply use the current size of the blobs and an estimation of the pack - // header size. 
- - let size_after_prune = BlobTypeMap::init(|blob_type| { - prune_plan.stats.size[blob_type].total_after_prune() - + prune_plan.stats.blobs[blob_type].total_after_prune() - * u64::from(HeaderEntry::ENTRY_LEN_COMPRESSED) - }); - - let tree_repacker = Repacker::new( - be.clone(), - BlobType::Tree, - indexer.clone(), - repo.config(), - size_after_prune[BlobType::Tree], - )?; - - let data_repacker = Repacker::new( - be.clone(), - BlobType::Data, - indexer.clone(), - repo.config(), - size_after_prune[BlobType::Data], - )?; - + let mut indexer = Indexer::new_unindexed(be.clone()); // mark unreferenced packs for deletion if !prune_plan.existing_packs.is_empty() { if opts.instant_delete { @@ -1270,58 +1241,36 @@ pub(crate) fn prune_repository( be.delete_list(true, existing_packs.iter(), p)?; } else { let p = pb.progress_counter("marking unneeded unindexed pack files for deletion..."); - p.set_length(prune_plan.existing_packs.len().try_into().unwrap()); + p.set_length( + prune_plan + .existing_packs + .len() + .try_into() + .unwrap_or_default(), + ); for (id, size) in prune_plan.existing_packs { let pack = IndexPack { id, size: Some(size), - time: Some(Zoned::now()), + time: Some(prune_plan.time.clone()), blobs: Vec::new(), }; - indexer.write().unwrap().add_remove(pack)?; + indexer.add_remove(pack)?; p.inc(1); } p.finish(); } } - // process packs by index_file - let p = match ( - prune_plan.index_files.is_empty(), - prune_plan.stats.packs.repack > 0, - ) { - (true, _) => { - info!("nothing to do!"); - pb.progress_hidden() - } - // TODO: Use a MultiProgressBar here - (false, true) => pb.progress_bytes("repacking // rebuilding index..."), - (false, false) => pb.progress_spinner("rebuilding index..."), - }; - - p.set_length(prune_plan.stats.size_sum().repack - prune_plan.stats.size_sum().repackrm); - - let mut indexes_remove = Vec::new(); - let tree_packs_remove = Arc::new(Mutex::new(Vec::new())); - let data_packs_remove = Arc::new(Mutex::new(Vec::new())); - - let delete_pack 
= |pack: PrunePack| { - // delete pack - match pack.blob_type { - BlobType::Data => data_packs_remove.lock().unwrap().push(pack.id), - BlobType::Tree => tree_packs_remove.lock().unwrap().push(pack.id), - } - }; - - let used_ids = Arc::new(Mutex::new(prune_plan.used_ids)); + if prune_plan.index_files.is_empty() { + info!("nothing to do!"); + return Ok(()); + } - let packs: Vec<_> = prune_plan + let indexes_remove: Vec<_> = prune_plan .index_files - .into_iter() - .inspect(|index| { - indexes_remove.push(index.id); - }) - .flat_map(|index| index.packs) + .iter() + .map(|index| index.id) .collect(); // remove old index files early if requested @@ -1330,10 +1279,25 @@ pub(crate) fn prune_repository( be.delete_list(true, indexes_remove.iter(), p)?; } - // write new pack files and index files - packs - .into_par_iter() - .try_for_each(|pack| -> RusticResult<_> { + let mut tree_packs_remove = Vec::new(); + let mut data_packs_remove = Vec::new(); + + let mut delete_pack = |pack: &PrunePack| { + // delete pack + match pack.blob_type { + BlobType::Data => data_packs_remove.push(pack.id), + BlobType::Tree => tree_packs_remove.push(pack.id), + } + }; + + let mut used_ids = prune_plan.used_ids; + let mut repack_packs = Vec::new(); + + // process packs by index_file + let p = pb.progress_counter("rebuilding index..."); + p.set_length(u64::try_from(prune_plan.index_files.len()).unwrap_or_default()); + for index in prune_plan.index_files { + for mut pack in index.packs { match pack.to_do { PackToDo::Undecided => { return Err(RusticError::new( @@ -1344,85 +1308,148 @@ pub(crate) fn prune_repository( .ask_report()); } PackToDo::Keep => { - // keep pack: add to new index; correct time if not set + // keep pack: add to new index let pack = pack.into_index_pack(&prune_plan.time); - indexer.write().unwrap().add(pack)?; + indexer.add(pack)?; } PackToDo::Repack => { - // TODO: repack in parallel - for blob in &pack.blobs { - if used_ids.lock().unwrap().remove(&blob.id).is_none() { - // 
don't save duplicate blobs - continue; - } - - let repacker = match blob.tpe { - BlobType::Data => &data_repacker, - BlobType::Tree => &tree_repacker, - }; - if opts.fast_repack { - repacker.add_fast(&pack.id, blob)?; - } else { - repacker.add(&pack.id, blob)?; - } - p.inc(u64::from(blob.length)); - } if opts.instant_delete { - delete_pack(pack); + delete_pack(&pack); } else { // mark pack for removal - let pack = pack.into_index_pack_with_time(&prune_plan.time); - indexer.write().unwrap().add_remove(pack)?; + let pack = pack.clone().into_index_pack_with_time(&prune_plan.time); + indexer.add_remove(pack)?; } + pack.blobs + .retain(|blob| used_ids.remove(&blob.id).is_some()); // don't save duplicate blobs + repack_packs.push(pack); } PackToDo::MarkDelete => { if opts.instant_delete { - delete_pack(pack); + delete_pack(&pack); } else { // mark pack for removal let pack = pack.into_index_pack_with_time(&prune_plan.time); - indexer.write().unwrap().add_remove(pack)?; + indexer.add_remove(pack)?; } } PackToDo::KeepMarked | PackToDo::KeepMarkedAndCorrect => { if opts.instant_delete { - delete_pack(pack); + delete_pack(&pack); } else { // keep pack: add to new index; keep the timestamp. - // Note the timestamp shouldn be set here, however if it is not, we use the current time to heal the entry! - let pack = pack.into_index_pack(&prune_plan.time); - indexer.write().unwrap().add_remove(pack)?; + // Note the timestamp shouldn't be None here, however if it is not set, use the current time to heal the entry! 
+ let time = pack.time.clone().unwrap_or_else(|| prune_plan.time.clone()); + let pack = pack.into_index_pack_with_time(&time); + indexer.add_remove(pack)?; } } PackToDo::Recover => { // recover pack: add to new index in section packs let pack = pack.into_index_pack_with_time(&prune_plan.time); - indexer.write().unwrap().add(pack)?; + indexer.add(pack)?; } - PackToDo::Delete => delete_pack(pack), + PackToDo::Delete => delete_pack(&pack), } - Ok(()) - })?; - _ = tree_repacker.finalize()?; - _ = data_repacker.finalize()?; - indexer.write().unwrap().finalize()?; + } + } p.finish(); + if repack_packs.is_empty() { + indexer.finalize()?; + } else { + let p = pb.progress_bytes("repacking..."); + p.set_length(prune_plan.stats.size_sum().repack - prune_plan.stats.size_sum().repackrm); + let indexer = indexer.into_shared(); + + // Calculate an approximation of sizes after pruning. + // The size actually is: + // total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead + // This is hard/impossible to compute because: + // - the size of blobs can change during repacking if compression is changed + // - the size of pack headers depends on whether blobs are compressed or not + // - we don't know the number of packs generated by repacking + // So, we simply use the current size of the blobs and an estimation of the pack + // header size. 
+ + let size_after_prune = BlobTypeMap::init(|blob_type| { + prune_plan.stats.size[blob_type].total_after_prune() + + prune_plan.stats.blobs[blob_type].total_after_prune() + * u64::from(HeaderEntry::ENTRY_LEN_COMPRESSED) + }); + + let tree_repacker = Repacker::new( + be.clone(), + BlobType::Tree, + indexer.clone(), + repo.config(), + size_after_prune[BlobType::Tree], + )?; + + let data_repacker = Repacker::new( + be.clone(), + BlobType::Data, + indexer.clone(), + repo.config(), + size_after_prune[BlobType::Data], + )?; + + // write new pack files and index files + repack_packs + .into_par_iter() + .try_for_each(|pack| -> RusticResult<_> { + let repacker = match pack.blob_type { + BlobType::Data => &data_repacker, + BlobType::Tree => &tree_repacker, + }; + let blobs: Vec<_> = pack + .blobs + .into_iter() + .map(|blob| RepackPackBlobs::from_index_blob(pack.id, blob)) + .coalesce(|mut x, mut y| { + // if the blobs are (almost) contiguous and we don't trespass the limit, read blobs one partial read + if y.offset <= x.offset + x.length + constants::MAX_HOLESIZE + && y.offset > x.offset + && y.offset + y.length - x.offset <= constants::LIMIT_PACK_READ + { + x.length = y.offset + y.length - x.offset; // read till the end of y + x.blobs.append(&mut y.blobs); + Ok(x) + } else { + Err((x, y)) + } + }) + .collect(); + + // TODO: repack in parallel + for blobs in blobs { + let length = u64::from(blobs.length); + if opts.fast_repack { + repacker.add_fast(blobs)?; + } else { + repacker.add(blobs)?; + } + p.inc(length); + } + Ok(()) + })?; + _ = tree_repacker.finalize()?; + _ = data_repacker.finalize()?; + indexer.write().unwrap().finalize()?; + p.finish(); + } + // remove old index files first as they may reference pack files which are removed soon. 
if !indexes_remove.is_empty() && !opts.early_delete_index { let p = pb.progress_counter("removing old index files..."); be.delete_list(true, indexes_remove.iter(), p)?; } - // get variable out of Arc<Mutex<_>> - let data_packs_remove = data_packs_remove.lock().unwrap(); if !data_packs_remove.is_empty() { let p = pb.progress_counter("removing old data packs..."); be.delete_list(false, data_packs_remove.iter(), p)?; } - // get variable out of Arc<Mutex<_>> - let tree_packs_remove = tree_packs_remove.lock().unwrap(); if !tree_packs_remove.is_empty() { let p = pb.progress_counter("removing old tree packs..."); be.delete_list(true, tree_packs_remove.iter(), p)?; diff --git a/crates/core/tests/integration/prune.rs b/crates/core/tests/integration/prune.rs index ff21b6dd..70c27f60 100644 --- a/crates/core/tests/integration/prune.rs +++ b/crates/core/tests/integration/prune.rs @@ -1,9 +1,11 @@ use anyhow::Result; +use bytesize::ByteSize; use jiff::Span; use rstest::rstest; use rustic_core::{ - BackupOptions, CheckOptions, LimitOption, PathList, PruneOptions, repofile::SnapshotFile, + BackupOptions, CheckOptions, ConfigOptions, LimitOption, PathList, PruneOptions, + repofile::{Chunker, SnapshotFile}, }; use super::{RepoOpen, TestSource, set_up_repo, tar_gz_testdata}; @@ -12,6 +14,13 @@ use super::{RepoOpen, TestSource, set_up_repo, tar_gz_testdata}; fn test_prune( tar_gz_testdata: Result<TestSource>, set_up_repo: Result<RepoOpen>, + #[values( + ConfigOptions::default(), + ConfigOptions::default() + .set_chunker(Chunker::FixedSize) + .set_chunk_size(ByteSize::b(2)) + )] + opts: ConfigOptions, #[values(true, false)] instant_delete: bool, #[values( LimitOption::Percentage(0), LimitOption::Percentage(50), LimitOption::Unlimited )] max_unused: LimitOption, + #[values(true, false)] fast_repack: bool, ) -> Result<()> { // Fixtures - let (source, repo) = (tar_gz_testdata?, set_up_repo?.to_indexed_ids()?); + let (source, mut repo) = (tar_gz_testdata?, set_up_repo?.to_indexed_ids()?); + _ = repo.apply_config(&opts)?; 
let opts = BackupOptions::default(); @@ -49,7 +60,8 @@ fn test_prune( let prune_opts = PruneOptions::default() .instant_delete(instant_delete) .max_unused(max_unused) - .keep_delete(Span::default()); + .keep_delete(Span::default()) + .fast_repack(fast_repack); let plan = repo.prune_plan(&prune_opts)?; // TODO: Snapshot-test the plan (currently doesn't impl Serialize) // assert_ron_snapshot!("prune", plan);