From b12ddc5fc75f4c262595407328c668df24993ebc Mon Sep 17 00:00:00 2001 From: Matias Date: Tue, 17 Mar 2026 13:58:23 -0300 Subject: [PATCH 1/2] feat(ampsync): configurable destination table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two new config options for controlling PostgreSQL destination table names: - TABLE_SUFFIX: appends a static suffix (e.g., "green" → usdc_transfers_green) - TABLE_VERSION: appends sanitized dataset revision (e.g., 1.0.0 → usdc_transfers_v1_0_0) Composition order: {table}_v{revision}_{suffix}. Both options are independent and backward compatible — omitting both preserves existing behavior. Introduces TableMapping (source → destination) to separate the dataset table name used in streaming queries from the PostgreSQL table name used for writes. --- crates/bin/ampsync/src/commands/sync.rs | 32 ++- crates/bin/ampsync/src/config.rs | 320 +++++++++++++++++++++++- crates/bin/ampsync/src/manager.rs | 72 +++--- crates/bin/ampsync/src/task.rs | 105 ++++---- 4 files changed, 436 insertions(+), 93 deletions(-) diff --git a/crates/bin/ampsync/src/commands/sync.rs b/crates/bin/ampsync/src/commands/sync.rs index 428b62881..79dbe1784 100644 --- a/crates/bin/ampsync/src/commands/sync.rs +++ b/crates/bin/ampsync/src/commands/sync.rs @@ -3,7 +3,12 @@ use anyhow::{Context, Result}; use sqlx::postgres::PgPoolOptions; use tracing::info; -use crate::{config::SyncConfig, engine::Engine, health, manager::StreamManager}; +use crate::{ + config::{self, SyncConfig}, + engine::Engine, + health, + manager::StreamManager, +}; pub async fn run(config: SyncConfig) -> Result<()> { info!("Starting ampsync"); @@ -22,11 +27,6 @@ pub async fn run(config: SyncConfig) -> Result<()> { .context("Failed to run state store migrations")?; info!("State store migrations complete"); - info!("Tables to sync: {} tables", config.tables.len()); - for table_name in &config.tables { - info!(" - {}", table_name); - } - // Create engine let 
engine = Engine::new(pool.clone()); @@ -34,6 +34,24 @@ pub async fn run(config: SyncConfig) -> Result<()> { let dataset = config.dataset.to_full_reference(); info!("Dataset: {}", dataset); + // Build table mappings (source → destination) based on suffix/version config + let mappings = config::build_table_mappings( + &config.tables, + &dataset, + config.table_version, + config.table_suffix.as_deref(), + ) + .context("Failed to build table mappings")?; + + info!("Tables to sync: {} tables", mappings.len()); + for mapping in &mappings { + if mapping.source == mapping.destination { + info!(" - {}", mapping.source); + } else { + info!(" - {} -> {}", mapping.source, mapping.destination); + } + } + // Create streaming client let grpc_max_decode_bytes = config.grpc_max_decode_mb as usize * 1024 * 1024; info!( @@ -56,7 +74,7 @@ pub async fn run(config: SyncConfig) -> Result<()> { info!("Amp client initialized"); // Spawn streaming tasks (table creation happens in StreamTask::new) - let manager = StreamManager::new(&config.tables, dataset, &config, engine, client, pool); + let manager = StreamManager::new(&mappings, dataset, &config, engine, client, pool); // Start health server if configured if let Some(port) = config.health_port { diff --git a/crates/bin/ampsync/src/config.rs b/crates/bin/ampsync/src/config.rs index 1d0cd5033..c1685ae2d 100644 --- a/crates/bin/ampsync/src/config.rs +++ b/crates/bin/ampsync/src/config.rs @@ -1,5 +1,7 @@ use clap::{Args, Parser, Subcommand}; -use datasets_common::partial_reference::PartialReference; +use datasets_common::{partial_reference::PartialReference, reference::Reference}; + +use crate::sql; #[derive(Parser, Debug)] #[command(name = "ampsync")] @@ -52,6 +54,28 @@ pub struct SyncConfig { )] pub tables: Vec, + /// Suffix to append to all destination table names + /// + /// When set, all PostgreSQL table names will have this suffix appended. 
+ /// Example: --table-suffix green + /// With TABLES=usdc_transfers → PostgreSQL table: usdc_transfers_green + /// + /// Can also be set via TABLE_SUFFIX environment variable + #[arg(long, env = "TABLE_SUFFIX")] + pub table_suffix: Option, + + /// Append the dataset revision to destination table names + /// + /// When enabled, the sanitized dataset revision is appended to table names. + /// Example: dataset @1.0.0, TABLES=usdc_transfers → usdc_transfers_v1_0_0 + /// + /// When combined with --table-suffix, version comes first: + /// usdc_transfers_v1_0_0_green + /// + /// Can also be set via TABLE_VERSION environment variable + #[arg(long, env = "TABLE_VERSION", default_value_t = false)] + pub table_version: bool, + /// PostgreSQL connection URL (required) /// /// Format: postgresql://[user]:[password]@[host]:[port]/[database] @@ -110,3 +134,297 @@ pub struct SyncConfig { #[arg(long, env = "HEALTH_PORT")] pub health_port: Option, } + +/// Maps a source table name (in the dataset) to a destination table name (in PostgreSQL). +#[derive(Debug, Clone)] +pub struct TableMapping { + /// Table name in the dataset (used for streaming queries). + pub source: String, + /// Table name in PostgreSQL (used for create/insert/delete). + pub destination: String, +} + +/// Errors from building table mappings. 
+#[derive(Debug, thiserror::Error)] +pub enum TableMappingError { + #[error("Source table name '{source}' is not a valid identifier: {source_err}")] + InvalidSourceTable { + source: String, + #[source] + source_err: sql::ValidateIdentifierError, + }, + + #[error( + "Destination table name '{destination}' (derived from source '{source}') is not a valid identifier: {source_err}" + )] + InvalidDestinationTable { + source: String, + destination: String, + #[source] + source_err: sql::ValidateIdentifierError, + }, + + #[error("Table suffix '{suffix}' is not a valid identifier fragment: {source}")] + InvalidSuffix { + suffix: String, + #[source] + source: sql::ValidateIdentifierError, + }, + + #[error( + "Duplicate destination table '{destination}' (from source tables '{first}' and '{second}')" + )] + DuplicateDestination { + destination: String, + first: String, + second: String, + }, +} + +/// Sanitize a dataset revision string for use in a table name. +/// +/// Replaces non-alphanumeric characters with underscores and prepends `v`. +/// Example: `1.0.0` → `v1_0_0`, `abc123...` → `vabc123...` +fn sanitize_revision(revision: &str) -> String { + let sanitized: String = revision + .chars() + .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }) + .collect(); + format!("v{}", sanitized) +} + +/// Build destination table name from source name, optional version, and optional suffix. +fn build_destination_name( + source: &str, + version_segment: Option<&str>, + suffix: Option<&str>, +) -> String { + let mut name = source.to_string(); + if let Some(ver) = version_segment { + name = format!("{}_{}", name, ver); + } + if let Some(sfx) = suffix { + name = format!("{}_{}", name, sfx); + } + name +} + +/// Build table mappings from config and resolved dataset reference. +/// +/// Validates all source and destination table names. When `table_version` is enabled, +/// the sanitized dataset revision is appended. 
When `table_suffix` is set, it is +/// appended after the version segment. +/// +/// # Composition order +/// +/// `{source}_v{revision}_{suffix}` +pub fn build_table_mappings( + tables: &[String], + dataset: &Reference, + table_version: bool, + table_suffix: Option<&str>, +) -> Result<Vec<TableMapping>, TableMappingError> { + // Validate suffix as an identifier fragment (must be a valid identifier on its own) + if let Some(suffix) = table_suffix { + sql::validate_identifier(suffix).map_err(|err| TableMappingError::InvalidSuffix { + suffix: suffix.to_string(), + source: err, + })?; + } + + let version_segment = if table_version { + Some(sanitize_revision(&dataset.revision().to_string())) + } else { + None + }; + + let mappings: Vec<TableMapping> = tables + .iter() + .map(|source| { + // Validate source table name + sql::validate_identifier(source).map_err(|err| { + TableMappingError::InvalidSourceTable { + source: source.clone(), + source_err: err, + } + })?; + + let destination = + build_destination_name(source, version_segment.as_deref(), table_suffix); + + // Validate destination table name (it could exceed length limits) + sql::validate_identifier(&destination).map_err(|err| { + TableMappingError::InvalidDestinationTable { + source: source.clone(), + destination: destination.clone(), + source_err: err, + } + })?; + + Ok(TableMapping { + source: source.clone(), + destination, + }) + }) + .collect::<Result<Vec<_>, _>>()?; + + // Check for duplicate destination table names + let mut seen = std::collections::HashMap::with_capacity(mappings.len()); + for mapping in &mappings { + if let Some(first_source) = seen.insert(&mapping.destination, &mapping.source) { + return Err(TableMappingError::DuplicateDestination { + destination: mapping.destination.clone(), + first: first_source.clone(), + second: mapping.source.clone(), + }); + } + } + + Ok(mappings) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Helper to create a test Reference. 
+ fn test_reference(revision: &str) -> Reference { + format!("_/test_dataset@{}", revision).parse().unwrap() + } + + #[test] + fn test_build_table_mappings_no_options() { + let tables = vec!["transfers".to_string(), "blocks".to_string()]; + let dataset = test_reference("1.0.0"); + let mappings = build_table_mappings(&tables, &dataset, false, None).unwrap(); + + assert_eq!(mappings.len(), 2); + assert_eq!(mappings[0].source, "transfers"); + assert_eq!(mappings[0].destination, "transfers"); + assert_eq!(mappings[1].source, "blocks"); + assert_eq!(mappings[1].destination, "blocks"); + } + + #[test] + fn test_build_table_mappings_with_version() { + let tables = vec!["transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let mappings = build_table_mappings(&tables, &dataset, true, None).unwrap(); + + assert_eq!(mappings[0].source, "transfers"); + assert_eq!(mappings[0].destination, "transfers_v1_0_0"); + } + + #[test] + fn test_build_table_mappings_with_suffix() { + let tables = vec!["transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let mappings = build_table_mappings(&tables, &dataset, false, Some("green")).unwrap(); + + assert_eq!(mappings[0].source, "transfers"); + assert_eq!(mappings[0].destination, "transfers_green"); + } + + #[test] + fn test_build_table_mappings_with_version_and_suffix() { + let tables = vec!["transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let mappings = build_table_mappings(&tables, &dataset, true, Some("green")).unwrap(); + + assert_eq!(mappings[0].source, "transfers"); + assert_eq!(mappings[0].destination, "transfers_v1_0_0_green"); + } + + #[test] + fn test_build_table_mappings_invalid_source() { + let tables = vec!["bad-table".to_string()]; + let dataset = test_reference("1.0.0"); + let err = build_table_mappings(&tables, &dataset, false, None).unwrap_err(); + + assert!(matches!(err, TableMappingError::InvalidSourceTable { .. 
})); + } + + #[test] + fn test_build_table_mappings_invalid_suffix() { + let tables = vec!["transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let err = build_table_mappings(&tables, &dataset, false, Some("bad-suffix")).unwrap_err(); + + assert!(matches!(err, TableMappingError::InvalidSuffix { .. })); + } + + #[test] + fn test_build_table_mappings_destination_too_long() { + // Source is valid but destination with version+suffix exceeds 63 bytes + let tables = vec!["a".repeat(50)]; + let tables: Vec<String> = tables.into_iter().collect(); + let dataset = test_reference("1.0.0"); + let err = + build_table_mappings(&tables, &dataset, true, Some("long_suffix_here")).unwrap_err(); + + assert!(matches!( + err, + TableMappingError::InvalidDestinationTable { .. } + )); + } + + #[test] + fn test_build_table_mappings_duplicate_destination() { + // The same source table listed twice maps to the same destination + let tables = vec!["transfers".to_string(), "transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let err = build_table_mappings(&tables, &dataset, false, None).unwrap_err(); + + assert!(matches!( + err, + TableMappingError::DuplicateDestination { .. 
} + )); + } + + #[test] + fn test_sanitize_revision_semver() { + assert_eq!(sanitize_revision("1.0.0"), "v1_0_0"); + assert_eq!(sanitize_revision("12.3.45"), "v12_3_45"); + } + + #[test] + fn test_sanitize_revision_hash() { + let hash = "abc123def456"; + assert_eq!(sanitize_revision(hash), format!("v{}", hash)); + } + + #[test] + fn test_sanitize_revision_special_chars() { + assert_eq!(sanitize_revision("1.0.0-beta"), "v1_0_0_beta"); + assert_eq!(sanitize_revision("1.0.0+build"), "v1_0_0_build"); + } + + #[test] + fn test_build_destination_name_no_options() { + assert_eq!(build_destination_name("transfers", None, None), "transfers"); + } + + #[test] + fn test_build_destination_name_version_only() { + assert_eq!( + build_destination_name("transfers", Some("v1_0_0"), None), + "transfers_v1_0_0" + ); + } + + #[test] + fn test_build_destination_name_suffix_only() { + assert_eq!( + build_destination_name("transfers", None, Some("green")), + "transfers_green" + ); + } + + #[test] + fn test_build_destination_name_both() { + assert_eq!( + build_destination_name("transfers", Some("v1_0_0"), Some("green")), + "transfers_v1_0_0_green" + ); + } +} diff --git a/crates/bin/ampsync/src/manager.rs b/crates/bin/ampsync/src/manager.rs index 80e18a7be..1ce581d04 100644 --- a/crates/bin/ampsync/src/manager.rs +++ b/crates/bin/ampsync/src/manager.rs @@ -7,7 +7,6 @@ use std::time::Duration; use amp_client::AmpClient; -use common::BlockNum; use datasets_common::reference::Reference; use monitoring::logging; use sqlx::PgPool; @@ -15,7 +14,11 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; -use crate::{config::SyncConfig, engine::Engine, task::StreamTask}; +use crate::{ + config::{SyncConfig, TableMapping}, + engine::Engine, + task::{StreamTask, StreamTaskConfig}, +}; /// Maximum number of restart attempts per table const MAX_RESTART_ATTEMPTS: u32 = 10; @@ -26,18 +29,6 @@ const INITIAL_BACKOFF_SECS: u64 = 1; /// Maximum backoff 
duration in seconds (5 minutes) const MAX_BACKOFF_SECS: u64 = 300; -/// Configuration for a single table's streaming task. -#[derive(Clone)] -struct TableTaskConfig { - table_name: String, - dataset: Reference, - engine: Engine, - client: AmpClient, - pool: PgPool, - retention: BlockNum, - shutdown: CancellationToken, -} - /// Manages streaming tasks for multiple tables with restart logic and graceful shutdown. pub struct StreamManager { tasks: Vec>, @@ -54,14 +45,14 @@ impl StreamManager { /// /// # Arguments /// - /// * `tables` - List of table names to sync + /// * `mappings` - Table mappings (source → destination) /// * `dataset` - Fully resolved dataset reference /// * `config` - Ampsync configuration /// * `engine` - Database engine for table operations /// * `client` - Amp client for streaming queries /// * `pool` - PostgreSQL connection pool pub fn new( - tables: &[String], + mappings: &[TableMapping], dataset: Reference, config: &SyncConfig, engine: Engine, @@ -70,13 +61,18 @@ impl StreamManager { ) -> Self { let shutdown = CancellationToken::new(); - let tasks = tables + let tasks = mappings .iter() - .map(|table_name| { - info!("Creating stream for table: {}", table_name); - - let task_config = TableTaskConfig { - table_name: table_name.clone(), + .map(|mapping| { + info!( + source = %mapping.source, + destination = %mapping.destination, + "Creating stream for table" + ); + + let task_config = StreamTaskConfig { + source_table: mapping.source.clone(), + destination_table: mapping.destination.clone(), dataset: dataset.clone(), engine: engine.clone(), client: client.clone(), @@ -86,7 +82,11 @@ impl StreamManager { }; let handle = tokio::spawn(Self::run_task_with_restart(task_config)); - info!("Spawned task for table: {}", table_name); + info!( + source = %mapping.source, + destination = %mapping.destination, + "Spawned task for table" + ); handle }) .collect(); @@ -95,20 +95,20 @@ impl StreamManager { } /// Runs a single table's streaming task with 
automatic restart on failure. - async fn run_task_with_restart(config: TableTaskConfig) { + async fn run_task_with_restart(config: StreamTaskConfig) { let mut restart_count = 0; loop { // Check shutdown before (re)starting if config.shutdown.is_cancelled() { - info!(table = %config.table_name, "shutdown_before_restart"); + info!(table = %config.destination_table, "shutdown_before_restart"); break; } // Build and run task match Self::build_and_run_task(&config).await { Ok(()) => { - info!(table = %config.table_name, "task_stopped_cleanly"); + info!(table = %config.destination_table, "task_stopped_cleanly"); break; } Err(err) => { @@ -116,7 +116,7 @@ impl StreamManager { if restart_count >= MAX_RESTART_ATTEMPTS { error!( - table = %config.table_name, + table = %config.destination_table, error = %err, error_source = logging::error_source(err.as_ref()), restart_count = restart_count, @@ -127,7 +127,7 @@ impl StreamManager { let backoff_duration = calculate_backoff(restart_count); warn!( - table = %config.table_name, + table = %config.destination_table, error = %err, error_source = logging::error_source(err.as_ref()), restart_count = restart_count, @@ -143,19 +143,11 @@ impl StreamManager { /// Builds and runs a single streaming task. 
async fn build_and_run_task( - config: &TableTaskConfig, + config: &StreamTaskConfig, ) -> Result<(), Box> { - let task = StreamTask::new( - config.table_name.clone(), - config.dataset.clone(), - config.engine.clone(), - config.client.clone(), - config.pool.clone(), - config.retention, - config.shutdown.clone(), - ) - .await - .map_err(|e| Box::new(e) as Box)?; + let task = StreamTask::new(config.clone()) + .await + .map_err(|e| Box::new(e) as Box)?; task.run() .await diff --git a/crates/bin/ampsync/src/task.rs b/crates/bin/ampsync/src/task.rs index c08026978..a0ccebd5c 100644 --- a/crates/bin/ampsync/src/task.rs +++ b/crates/bin/ampsync/src/task.rs @@ -11,6 +11,21 @@ use tracing::{debug, error, info, instrument, warn}; use crate::engine::Engine; +/// Configuration for creating a [`StreamTask`]. +#[derive(Clone)] +pub struct StreamTaskConfig { + /// Source table name in the dataset (used for streaming queries). + pub source_table: String, + /// Destination table name in PostgreSQL (used for create/insert/delete). + pub destination_table: String, + pub dataset: Reference, + pub engine: Engine, + pub client: AmpClient, + pub pool: PgPool, + pub retention: BlockNum, + pub shutdown: CancellationToken, +} + /// Errors that occur during stream task execution #[derive(Debug, thiserror::Error)] pub enum StreamTaskError { @@ -101,7 +116,7 @@ pub enum StreamTaskError { /// The task respects the `CancellationToken` for clean shutdown. When cancelled, /// it stops processing new events and allows the current event to complete. pub struct StreamTask { - table_name: String, + destination_table: String, dataset: Reference, stream: TransactionalStream, engine: Engine, @@ -120,13 +135,7 @@ impl StreamTask { /// 6. 
Returns a configured StreamTask ready to run /// /// # Arguments - /// - `table_name`: Target table name - /// - `dataset`: Fully resolved dataset reference (for stream ID) - /// - `engine`: Database engine for table creation - /// - `client`: AmpClient instance - /// - `pool`: Database connection pool (for creating state store) - /// - `retention`: Retention window in blocks - /// - `shutdown`: Cancellation token for graceful shutdown + /// - `config`: Stream task configuration (source/destination tables, dataset, engine, etc.) /// /// # Errors /// @@ -134,22 +143,27 @@ impl StreamTask { /// - State store creation fails /// - Stream creation fails /// - Table creation fails - pub async fn new( - table_name: String, - dataset: Reference, - engine: Engine, - client: AmpClient, - pool: PgPool, - retention: BlockNum, - shutdown: CancellationToken, - ) -> Result { - // Build streaming query - let query = crate::sql::streaming_query(&dataset, &table_name); - - // Create state store for this table using fully qualified dataset reference - let stream_id = format!("{}:{}", dataset, table_name); + pub async fn new(config: StreamTaskConfig) -> Result { + let StreamTaskConfig { + source_table, + destination_table, + dataset, + engine, + client, + pool, + retention, + shutdown, + } = config; + + // Build streaming query using the source table name (dataset table) + let query = crate::sql::streaming_query(&dataset, &source_table); + + // State store key uses the destination table name so different destinations + // get independent stream positions. 
+ let stream_id = format!("{}:{}", dataset, destination_table); info!( - table = %table_name, + source_table = %source_table, + destination_table = %destination_table, stream_id = %stream_id, "creating_state_store" ); @@ -162,7 +176,8 @@ impl StreamTask { // Create transactional stream info!( - table = %table_name, + source_table = %source_table, + destination_table = %destination_table, query = %query, retention_blocks = retention, "creating_transactional_stream" @@ -179,25 +194,25 @@ impl StreamTask { // Get schema from stream let schema = stream.schema(); info!( - table = %table_name, + destination_table = %destination_table, num_fields = schema.fields().len(), "retrieved_schema_from_stream" ); - // Create table using stream schema - info!(table = %table_name, "creating_table"); + // Create destination table using stream schema (idempotent via IF NOT EXISTS) + info!(destination_table = %destination_table, "creating_table"); engine - .create_table(&table_name, schema) + .create_table(&destination_table, schema) .await .map_err(|err| StreamTaskError::CreateTable { - table_name: table_name.clone(), + table_name: destination_table.clone(), source: Box::new(err), })?; - info!(table = %table_name, "table_created"); + info!(destination_table = %destination_table, "table_created"); Ok(Self { - table_name, + destination_table, dataset, stream, engine, @@ -225,11 +240,11 @@ impl StreamTask { /// Returns an error if: /// - Event processing fails (database operation error) /// - State commit fails (PostgreSQL connection error) - #[instrument(skip(self), fields(table = %self.table_name, dataset = %self.dataset))] + #[instrument(skip(self), fields(table = %self.destination_table, dataset = %self.dataset))] pub async fn run(self) -> Result<(), StreamTaskError> { // Destructure self to avoid borrowing issues with non-Sync types let Self { - table_name, + destination_table, dataset: _, mut stream, engine, @@ -250,7 +265,7 @@ impl StreamTask { event_result = stream.next() => { 
match event_result { Some(Ok((event, commit))) => { - if let Err(e) = Self::handle_event(&table_name, &engine, event, commit).await { + if let Err(e) = Self::handle_event(&destination_table, &engine, event, commit).await { error!(error = %e, error_source = logging::error_source(&e), "event_handling_failed"); return Err(e); } @@ -262,7 +277,7 @@ impl StreamTask { None => { error!("stream_ended_unexpectedly"); return Err(StreamTaskError::UnexpectedStreamEnd { - table_name: table_name.clone(), + table_name: destination_table.clone(), }); } } @@ -290,9 +305,9 @@ impl StreamTask { /// Returns an error if database operations or state commits fail. Errors /// cause the entire task to fail, requiring a restart (with automatic Undo /// cleanup via gap detection). - #[instrument(skip(engine, event, commit), fields(table = %table_name))] + #[instrument(skip(engine, event, commit), fields(table = %destination_table))] async fn handle_event( - table_name: &str, + destination_table: &str, engine: &Engine, event: TransactionEvent, commit: amp_client::CommitHandle, @@ -315,22 +330,22 @@ impl StreamTask { // Add _tx_id and _row_index columns let batch_with_metadata = crate::arrow::add_transaction_metadata(batch, id) .map_err(|err| StreamTaskError::AddMetadata { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; // Insert to database engine - .insert_batch(table_name, batch_with_metadata) + .insert_batch(destination_table, batch_with_metadata) .await .map_err(|err| StreamTaskError::InsertBatch { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; // Commit state (marks transaction as durable) commit.await.map_err(|err| StreamTaskError::CommitState { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; @@ -357,16 +372,16 @@ impl StreamTask { // Delete rows by transaction ID range engine - .delete_by_tx_range(table_name, invalidate_start, 
invalidate_end) + .delete_by_tx_range(destination_table, invalidate_start, invalidate_end) .await .map_err(|err| StreamTaskError::DeleteByTxRange { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; // Commit state (acknowledges cleanup) commit.await.map_err(|err| StreamTaskError::CommitState { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; @@ -394,7 +409,7 @@ impl StreamTask { // Just commit (state managed by amp-client) // No database operations needed for watermarks commit.await.map_err(|err| StreamTaskError::CommitState { - table_name: table_name.to_string(), + table_name: destination_table.to_string(), source: err, })?; From ea7feec231e64d369b406d163c8c8e49db97ac16 Mon Sep 17 00:00:00 2001 From: Matias Date: Tue, 17 Mar 2026 18:39:50 -0300 Subject: [PATCH 2/2] f: PR comments --- crates/bin/ampsync/src/config.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/bin/ampsync/src/config.rs b/crates/bin/ampsync/src/config.rs index c1685ae2d..4ba345a60 100644 --- a/crates/bin/ampsync/src/config.rs +++ b/crates/bin/ampsync/src/config.rs @@ -185,6 +185,10 @@ pub enum TableMappingError { /// /// Replaces non-alphanumeric characters with underscores and prepends `v`. /// Example: `1.0.0` → `v1_0_0`, `abc123...` → `vabc123...` +/// +/// Note: different revisions may produce the same sanitized output +/// (e.g., `1.0.0` and `1-0-0` both become `v1_0_0`). This is acceptable +/// because a single ampsync instance targets one dataset revision. 
fn sanitize_revision(revision: &str) -> String { let sanitized: String = revision .chars() @@ -224,6 +228,9 @@ pub fn build_table_mappings( table_version: bool, table_suffix: Option<&str>, ) -> Result<Vec<TableMapping>, TableMappingError> { + // Normalize empty suffix to None (e.g., TABLE_SUFFIX="" in env) + let table_suffix = table_suffix.filter(|s| !s.is_empty()); + // Validate suffix as an identifier fragment (must be a valid identifier on its own) if let Some(suffix) = table_suffix { sql::validate_identifier(suffix).map_err(|err| TableMappingError::InvalidSuffix { @@ -335,6 +342,16 @@ mod tests { assert_eq!(mappings[0].destination, "transfers_v1_0_0_green"); } + #[test] + fn test_build_table_mappings_empty_suffix_treated_as_none() { + let tables = vec!["transfers".to_string()]; + let dataset = test_reference("1.0.0"); + let mappings = build_table_mappings(&tables, &dataset, false, Some("")).unwrap(); + + assert_eq!(mappings[0].source, "transfers"); + assert_eq!(mappings[0].destination, "transfers"); + } + #[test] fn test_build_table_mappings_invalid_source() { let tables = vec!["bad-table".to_string()];