Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/config/src/worker_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,17 @@ impl From<&CompactorConfig> for amp_worker_core::CompactorConfig {

/// Compaction algorithm tuning parameters.
///
/// Controls cooldown between compaction runs and eager compaction size limits.
/// Controls cooldown, segment exclusion, and eager compaction size limits.
#[derive(Debug, Clone, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(default)]
pub struct CompactionAlgorithmConfig {
/// Base cooldown duration in seconds between compaction runs (default: 1024).
/// Minimum time in seconds before a file can initiate a new compaction group.
/// Prevents excessive I/O from rewriting large files too frequently (default: 1024).
pub cooldown_duration: ConfigDuration<1024>,
/// Number of latest segments to skip during compaction. Avoids compacting
/// recently-written data that may contain reorganized blocks (default: 3).
pub skip_latest_segments: u64,
/// Eager compaction limits (flattened fields: `overflow`, `bytes`, `rows`).
#[serde(
flatten,
Expand All @@ -309,6 +313,7 @@ impl Default for CompactionAlgorithmConfig {
fn default() -> Self {
Self {
cooldown_duration: ConfigDuration::default(),
skip_latest_segments: 3,
eager_compaction_limit: SizeLimitConfig::default_eager_limit(),
}
}
Expand All @@ -318,6 +323,7 @@ impl From<&CompactionAlgorithmConfig> for amp_worker_core::CompactionAlgorithmCo
fn from(config: &CompactionAlgorithmConfig) -> Self {
Self {
cooldown_duration: (&config.cooldown_duration).into(),
skip_latest_segments: config.skip_latest_segments,
eager_compaction_limit: (&config.eager_compaction_limit).into(),
}
}
Expand Down
36 changes: 27 additions & 9 deletions crates/core/worker-core/src/compaction/algorithm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,31 @@ use crate::compaction::{compactor::CompactionGroup, plan::CompactionFile};
/// based on their size and age.
///
/// ## Fields
/// - `cooldown_duration`: The base duration used to calculate
/// the cooldown period for files based on their generation.
/// - `cooldown_duration`: Minimum time before a file can initiate a new
/// compaction group. Prevents excessive I/O from rewriting large files
/// too frequently.
/// - `target_partition_size`: The upper bound for segment size limits.
/// Files exceeding this limit will not be compacted together. This
/// value must be non-unbounded.
/// - `max_eager_generation`: Segments up to this generation will not be subject to cooldowns.
/// - `skip_latest_segments`: Number of latest segments to skip during
/// compaction. Avoids compacting recently-written data that may contain
/// reorganized blocks.
#[derive(Clone, Copy, Debug)]
pub struct CompactionAlgorithm {
/// The amount of time a file must wait before it can be
/// compacted with files of different generations.
/// Minimum time before a file can initiate a new compaction group.
/// Prevents excessive I/O from rewriting large files too frequently.
pub cooldown_duration: Duration,
/// The upper bound for segment size limits. Files exceeding this limit
/// will not be compacted together. This value must be non-unbounded.
pub target_partition_size: SegmentSizeLimit,

/// Segments up to this generation will not be subject to cooldowns
pub max_eager_generation: Option<Generation>,

/// Number of latest segments to skip during compaction. Avoids compacting
/// recently-written data that may contain reorganized blocks.
pub skip_latest_segments: u64,
}

impl CompactionAlgorithm {
Expand Down Expand Up @@ -207,9 +215,17 @@ impl CompactionAlgorithm {
/// - When a group is started, if the candidate can be added to it.
///
/// The current algorithm is:
/// - If the file is `Hot`, it cannot start a new group.
/// - If a group has been started, it will accept files up to the target size, regardless of file state.
/// - `Hot` files (within `cooldown_duration`) cannot start a new group,
/// but can join an existing one. This prevents excessive I/O from
/// rewriting large files too frequently.
/// - Files are accepted into a group up to the target size.
///
/// Note: The latest segments are excluded before they reach the predicate
/// (see `CompactionPlan::from_snapshot`), controlled by
/// `skip_latest_segments`.
pub fn predicate(&self, group: &CompactionGroup, candidate: &CompactionFile) -> bool {
// Hot files cannot start a new group (prevents excessive I/O from
// rewriting large files too frequently).
if group.is_empty() && self.file_state(&candidate.size) == FileState::Hot {
return false;
}
Expand Down Expand Up @@ -241,13 +257,15 @@ impl<'a> From<&'a ParquetConfig> for CompactionAlgorithm {
Some(Generation::from(generation))
}
},
skip_latest_segments: config.compactor.algorithm.skip_latest_segments,
}
}
}

/// Cooldown period for file compaction. Before the period elapses,
/// the file will only be compacted if the candidate group shares the
/// same generation.
/// Cooldown period for file compaction. A file within its cooldown
/// period is considered "hot" and cannot initiate a new compaction
/// group. This prevents excessive I/O from rewriting large files
/// too frequently. Hot files can still be added to existing groups.
#[derive(Clone, Copy)]
pub struct Cooldown(Duration);

Expand Down
8 changes: 7 additions & 1 deletion crates/core/worker-core/src/compaction/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,15 @@ impl Default for CompactorConfig {

#[derive(Debug, Clone)]
pub struct CompactionAlgorithmConfig {
/// Base cooldown duration in seconds (default: 1024.0)
/// Minimum time before a file can initiate a new compaction group.
/// Prevents excessive I/O from rewriting large files too frequently
/// (default: 1024 seconds).
pub cooldown_duration: ConfigDuration<1024>,
/// Eager compaction limits
pub eager_compaction_limit: SizeLimitConfig,
/// Number of latest segments to skip during compaction. Avoids compacting
/// recently-written data that may contain reorganized blocks (default: 3).
pub skip_latest_segments: u64,
}

impl Default for CompactionAlgorithmConfig {
Expand All @@ -89,6 +94,7 @@ impl Default for CompactionAlgorithmConfig {
bytes: 0,
..Default::default()
},
skip_latest_segments: 3,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/core/worker-core/src/compaction/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ impl<'a> CompactionPlan<'a> {
) -> CompactionResult<Option<Self>> {
let chain = table.canonical_segments();

// Drop the latest N segments from the end of the canonical chain to
// avoid compacting data that may contain reorganized blocks.
let skip = opts.compactor.algorithm.skip_latest_segments as usize;
let chain = &chain[..chain.len().saturating_sub(skip)];

let size = chain.len();
if size == 0 {
return Ok(None);
Expand Down
9 changes: 8 additions & 1 deletion docs/schemas/config/ampd.spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
"minimum": 0
},
"cooldown_duration": {
"description": "Base cooldown duration in seconds between compaction runs (default: 1024).",
"description": "Minimum time in seconds before a file can initiate a new compaction group.\nPrevents excessive I/O from rewriting large files too frequently (default: 1024).",
"$ref": "#/$defs/ConfigDuration"
},
"metadata_concurrency": {
Expand All @@ -164,6 +164,13 @@
"format": "uint64",
"minimum": 0
},
"skip_latest_segments": {
"description": "Number of latest segments to skip during compaction. Avoids compacting\nrecently-written data that may contain reorganized blocks (default: 3).",
"type": "integer",
"format": "uint64",
"default": 3,
"minimum": 0
},
"write_concurrency": {
"description": "Maximum concurrent compaction write operations (default: 2).",
"type": "integer",
Expand Down
1 change: 1 addition & 0 deletions tests/src/tests/it_sql_dataset_batch_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl TestCtx {
opts_mut.collector.interval = Duration::ZERO;
opts_mut.compactor.interval = Duration::ZERO;
opts_mut.compactor.algorithm.cooldown_duration = Duration::ZERO;
opts_mut.compactor.algorithm.skip_latest_segments = 0;
opts_mut.partition = SegmentSizeLimit::new(100, 0, 0, 0, Generation::default(), 10);
let metadata_db = self.ctx.daemon_worker().metadata_db().clone();
let data_store = self.ctx.daemon_server().data_store().clone();
Expand Down
Loading