Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ members = [
"crates/core/worker-core",
"crates/core/worker-datasets-derived",
"crates/core/worker-datasets-raw",
"crates/core/worker-gc",
"crates/parquet-ext",
"crates/extractors/evm-rpc",
"crates/extractors/evm-rpc/gen",
Expand Down
1 change: 1 addition & 0 deletions crates/bin/ampctl/src/cmd/job/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ fn show_dataset_descriptor(descriptor: &serde_json::Value) -> Option<String> {
dataset_name = desc.dataset_name,
hash = &desc.manifest_hash.as_str()[..7],
)),
JobDescriptor::Gc(desc) => Some(format!("gc location:{}", desc.location_id)),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/bin/ampd/src/controller_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub async fn run(
ethcall_udfs_cache,
meter,
at,
config.gc_scheduling,
)
.await
.map_err(Error::ServiceInit)?;
Expand Down
1 change: 1 addition & 0 deletions crates/bin/ampd/src/solo_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub async fn run(
ethcall_udfs_cache.clone(),
meter.clone(),
config.controller_addrs.admin_api_addr,
config.gc_scheduling.clone(),
)
.await
.map_err(Error::ServiceInit)?;
Expand Down
4 changes: 4 additions & 0 deletions crates/config/src/config_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ pub struct ConfigFile {
// Worker event streaming configuration
#[serde(default)]
pub worker_events: crate::WorkerEventsConfig,

// Controller GC scheduling configuration
#[serde(default)]
pub gc_scheduling: crate::controller::GcSchedulingConfig,
}

impl ConfigFile {
Expand Down
36 changes: 34 additions & 2 deletions crates/config/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::net::{AddrParseError, SocketAddr};
use std::{
net::{AddrParseError, SocketAddr},
time::Duration,
};

use crate::config_file::ConfigFile;
use crate::{config_file::ConfigFile, worker_core::ConfigDuration};

/// Default port for the Admin API server.
pub const DEFAULT_CONTROLLER_ADMIN_API_PORT: u16 = 1610;
Expand All @@ -12,6 +15,35 @@ pub struct ControllerAddrs {
pub admin_api_addr: SocketAddr,
}

/// Configuration for controller-managed garbage collection scheduling.
///
/// Controls whether the controller schedules GC jobs and at what interval.
/// GC jobs are executed by workers, not the controller.
#[derive(Debug, Clone, Default, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct GcSchedulingConfig {
    /// Enable GC job scheduling (default: `false`).
    ///
    /// When `false`, the controller will not schedule any GC jobs.
    /// Enable only after verifying the legacy worker-internal GC is disabled.
    #[serde(default)]
    pub enabled: bool,

    /// Interval in seconds between GC scheduling sweeps (default: 60).
    ///
    /// The controller scans for active physical table revisions at this
    /// interval and schedules GC jobs for each one.
    #[serde(default)]
    pub interval: ConfigDuration<60>,
}

impl GcSchedulingConfig {
    /// Returns the scheduling interval as a [`Duration`].
    pub fn interval_duration(&self) -> Duration {
        // The `Into<Duration>` conversion consumes the value, so clone from
        // `&self`. Presumably cheap — ConfigDuration wraps a duration; confirm.
        self.interval.clone().into()
    }
}

impl Default for ControllerAddrs {
fn default() -> Self {
Self {
Expand Down
4 changes: 4 additions & 0 deletions crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::{
DEFAULT_MANIFESTS_DIRNAME, DEFAULT_MICROBATCH_MAX_INTERVAL, DEFAULT_PROVIDERS_DIRNAME,
DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL, no_defaults_override,
},
controller::GcSchedulingConfig,
metadb::{DEFAULT_METADB_CONN_POOL_SIZE, DEFAULT_METADB_DIRNAME, MetadataDbConfig},
redacted::Redacted,
};
Expand Down Expand Up @@ -112,6 +113,7 @@ fn resolve_config(
poll_interval: config_file.poll_interval_secs.into(),
keep_alive_interval: config_file.keep_alive_interval,
worker_events,
gc_scheduling: config_file.gc_scheduling,
})
}

Expand Down Expand Up @@ -230,6 +232,8 @@ pub struct Config {
pub keep_alive_interval: u64,
/// Worker event streaming configuration.
pub worker_events: WorkerEventsConfig,
/// GC scheduling configuration for the controller.
pub gc_scheduling: GcSchedulingConfig,
}

/// Configuration for worker event streaming.
Expand Down
11 changes: 11 additions & 0 deletions crates/core/metadata-db/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ where
.map_err(Error::Database)
}

/// Returns distinct location IDs that have at least one expired entry in the GC manifest.
///
/// Thin wrapper over [`sql::locations_with_expired_entries`] that converts the
/// raw database error into this module's [`Error::Database`] variant.
///
/// # Errors
///
/// Returns [`Error::Database`] if the underlying query fails.
#[tracing::instrument(skip(exe), err)]
pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result<Vec<LocationId>, Error>
where
    E: Executor<'c>,
{
    sql::locations_with_expired_entries(exe)
        .await
        .map_err(Error::Database)
}

/// Streams files that have passed their expiration time and are ready for deletion.
#[tracing::instrument(skip(exe))]
pub fn stream_expired<'c, E>(
Expand Down
15 changes: 15 additions & 0 deletions crates/core/metadata-db/src/gc/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ where
Box::pin(sqlx::query_as(query).bind(location_id).fetch(exe))
}

/// Returns distinct location IDs that have at least one expired entry in the GC manifest.
///
/// Scans `gc_manifest` for rows whose `expiration` is already in the past and
/// reports each affected `location_id` once, in ascending order.
// NOTE(review): `CURRENT_TIMESTAMP AT TIME ZONE 'UTC'` produces a timestamp
// *without* time zone — this assumes `expiration` is stored the same way;
// confirm against the table schema.
pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result<Vec<LocationId>, sqlx::Error>
where
    E: Executor<'c, Database = Postgres>,
{
    sqlx::query_scalar(indoc::indoc! {r#"
        SELECT DISTINCT location_id
        FROM gc_manifest
        WHERE expiration < CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
        ORDER BY location_id;
    "#})
    .fetch_all(exe)
    .await
}

/// GC manifest row from the database
#[derive(Debug, sqlx::FromRow)]
pub struct GcManifestRow {
Expand Down
1 change: 1 addition & 0 deletions crates/core/monitoring/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const AMP_CRATES: &[&str] = &[
"amp_worker_core",
"amp_worker_datasets_derived",
"amp_worker_datasets_raw",
"amp_worker_gc",
"ampcc",
"ampctl",
"ampd",
Expand Down
18 changes: 18 additions & 0 deletions crates/core/worker-gc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Worker-side garbage collection (GC) job execution crate.
[package]
name = "amp-worker-gc"
edition.workspace = true
version.workspace = true
license-file.workspace = true

[dependencies]
# Sibling workspace crates by path; shared crates inherit workspace versions.
amp-data-store = { path = "../data-store" }
amp-worker-core = { path = "../worker-core" }
datasets-common = { path = "../datasets-common" }
futures.workspace = true
metadata-db = { path = "../metadata-db" }
monitoring = { path = "../monitoring" }
object_store.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tracing.workspace = true
77 changes: 77 additions & 0 deletions crates/core/worker-gc/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! Error types for GC job execution.

use amp_worker_core::{error_detail::ErrorDetailsProvider, retryable::RetryableErrorExt};

/// Errors that can occur during garbage collection job execution.
///
/// Variants are listed in the order the collection algorithm can hit them:
/// revision lookup, expired-file streaming (step 1), metadata deletion
/// (step 2), then object-store deletion (step 3).
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The target physical table revision does not exist.
    ///
    /// This occurs when the `location_id` in the job descriptor does not match any
    /// revision in the metadata database. The revision may have been deleted between
    /// scheduling and execution.
    ///
    /// This is a fatal (non-retryable) error since the location will not reappear.
    #[error("location not found: {0}")]
    LocationNotFound(metadata_db::physical_table_revision::LocationId),

    /// Failed to query the metadata database for the physical table revision.
    ///
    /// This occurs when looking up the revision by `location_id` at the start of
    /// the GC job. Common causes include database connectivity issues or timeouts.
    #[error("metadata database error")]
    MetadataDb(#[source] metadata_db::Error),

    /// Failed to stream expired files from the GC manifest.
    ///
    /// This occurs during step 1 of the collection algorithm when querying
    /// `gc_manifest` for files whose expiration has passed. Common causes include
    /// database connectivity issues or query timeouts.
    #[error("failed to stream expired files")]
    FileStream(#[source] metadata_db::Error),

    /// Failed to delete file metadata records from Postgres.
    ///
    /// This occurs during step 2 of the collection algorithm when removing
    /// `file_metadata` rows for expired files. Common causes include database
    /// connectivity issues or transaction conflicts.
    #[error("failed to delete file metadata")]
    FileMetadataDelete(#[source] metadata_db::Error),

    /// Failed to delete a physical file from object storage.
    ///
    /// This occurs during step 3 of the collection algorithm when removing
    /// Parquet files from S3/GCS/local storage. Common causes include network
    /// failures, permission errors, or storage service unavailability.
    ///
    /// Note: `NotFound` errors are tolerated (the file is already gone).
    /// Only other object store errors trigger this variant.
    #[error("object store error")]
    ObjectStore(#[source] object_store::Error),
}

impl RetryableErrorExt for Error {
    /// A missing location never reappears, so retrying is pointless; every
    /// other failure mode is treated as a transient database/storage fault.
    fn is_retryable(&self) -> bool {
        // Kept exhaustive (no `_` arm) so adding a variant forces a decision here.
        match self {
            Self::LocationNotFound(_) => false,
            Self::MetadataDb(_)
            | Self::FileStream(_)
            | Self::FileMetadataDelete(_)
            | Self::ObjectStore(_) => true,
        }
    }
}

impl amp_worker_core::retryable::JobErrorExt for Error {
    /// Maps each variant to a static `GC_*` identifier string.
    fn error_code(&self) -> &'static str {
        match self {
            Self::LocationNotFound(_) => "GC_LOCATION_NOT_FOUND",
            Self::MetadataDb(_) => "GC_METADATA_DB",
            Self::FileStream(_) => "GC_FILE_STREAM",
            Self::FileMetadataDelete(_) => "GC_FILE_METADATA_DELETE",
            Self::ObjectStore(_) => "GC_OBJECT_STORE",
        }
    }
}

// Empty impl: rely on the trait's default error-detail behavior.
impl ErrorDetailsProvider for Error {}
15 changes: 15 additions & 0 deletions crates/core/worker-gc/src/job_ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Context for GC job execution.

use amp_data_store::DataStore;
use metadata_db::MetadataDb;
use monitoring::telemetry::metrics::Meter;

/// Dependencies required to execute a GC job.
///
/// Bundles the external services a GC job touches so they can be injected as
/// one unit.
pub struct Context {
    /// Connection to the metadata database (Postgres).
    pub metadata_db: MetadataDb,
    /// Connection to object storage (S3/GCS/local FS).
    pub data_store: DataStore,
    /// OpenTelemetry meter for recording GC metrics.
    /// Presumably `None` when metrics are not configured — confirm with callers.
    pub meter: Option<Meter>,
}
56 changes: 56 additions & 0 deletions crates/core/worker-gc/src/job_descriptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use metadata_db::physical_table_revision::LocationId;

use crate::job_kind::GcJobKind;

/// Job descriptor for garbage collection.
///
/// Contains the fields needed to execute a GC job for a single physical table revision.
/// Round-trips through `metadata_db::job_events::EventDetail` via the `From`/`TryFrom`
/// impls below, serialized as a JSON object tagged with the GC job kind.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct JobDescriptor {
    /// The physical table revision to collect garbage for.
    pub location_id: LocationId,
}

impl From<JobDescriptor> for metadata_db::job_events::EventDetailOwned {
    /// Serializes the descriptor as a JSON object carrying a `kind` tag
    /// (from [`GcJobKind`]) alongside the flattened descriptor fields.
    fn from(desc: JobDescriptor) -> Self {
        /// Adds the `kind` discriminator next to the flattened descriptor.
        #[derive(serde::Serialize)]
        struct Tagged<'a> {
            kind: GcJobKind,
            #[serde(flatten)]
            inner: &'a JobDescriptor,
        }

        // Infallible in practice (hence the `expect`): `to_raw_value` only
        // fails on non-string map keys, which cannot occur with a flat struct
        // containing only a primitive integer field. Not a memory-safety
        // invariant — just a serde one.
        let raw = serde_json::value::to_raw_value(&Tagged {
            kind: GcJobKind,
            inner: &desc,
        })
        .expect("JobDescriptor serialization is infallible");

        metadata_db::job_events::EventDetail::from_owned_unchecked(raw)
    }
}

impl TryFrom<&metadata_db::job_events::EventDetail<'_>> for JobDescriptor {
    type Error = InvalidJobDescriptorError;

    /// Parses a kind-tagged JSON event detail back into a [`JobDescriptor`].
    fn try_from(raw: &metadata_db::job_events::EventDetail<'_>) -> Result<Self, Self::Error> {
        /// Mirror of the serialized shape: a `kind` tag plus flattened fields.
        #[derive(serde::Deserialize)]
        struct Envelope {
            /// Validated during deserialization; the value itself is unused.
            #[allow(dead_code)]
            kind: GcJobKind,
            #[serde(flatten)]
            inner: JobDescriptor,
        }

        serde_json::from_str::<Envelope>(raw.as_str())
            .map(|envelope| envelope.inner)
            .map_err(InvalidJobDescriptorError)
    }
}

/// Error returned when an [`EventDetail`] cannot be converted into a [`JobDescriptor`].
///
/// Wraps the underlying `serde_json` error as the `source` for diagnostics.
#[derive(Debug, thiserror::Error)]
#[error("invalid job descriptor")]
pub struct InvalidJobDescriptorError(#[source] pub serde_json::Error);
Loading
Loading