Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions crates/bin/ampsync/src/commands/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ use anyhow::{Context, Result};
use sqlx::postgres::PgPoolOptions;
use tracing::info;

use crate::{config::SyncConfig, engine::Engine, health, manager::StreamManager};
use crate::{
config::{self, SyncConfig},
engine::Engine,
health,
manager::StreamManager,
};

pub async fn run(config: SyncConfig) -> Result<()> {
info!("Starting ampsync");
Expand All @@ -22,18 +27,31 @@ pub async fn run(config: SyncConfig) -> Result<()> {
.context("Failed to run state store migrations")?;
info!("State store migrations complete");

info!("Tables to sync: {} tables", config.tables.len());
for table_name in &config.tables {
info!(" - {}", table_name);
}

// Create engine
let engine = Engine::new(pool.clone());

// Convert PartialReference to full Reference by filling in defaults
let dataset = config.dataset.to_full_reference();
info!("Dataset: {}", dataset);

// Build table mappings (source → destination) based on suffix/version config
let mappings = config::build_table_mappings(
&config.tables,
&dataset,
config.table_version,
config.table_suffix.as_deref(),
)
.context("Failed to build table mappings")?;

info!("Tables to sync: {} tables", mappings.len());
for mapping in &mappings {
if mapping.source == mapping.destination {
info!(" - {}", mapping.source);
} else {
info!(" - {} -> {}", mapping.source, mapping.destination);
}
}

// Create streaming client
let grpc_max_decode_bytes = config.grpc_max_decode_mb as usize * 1024 * 1024;
info!(
Expand All @@ -56,7 +74,7 @@ pub async fn run(config: SyncConfig) -> Result<()> {
info!("Amp client initialized");

// Spawn streaming tasks (table creation happens in StreamTask::new)
let manager = StreamManager::new(&config.tables, dataset, &config, engine, client, pool);
let manager = StreamManager::new(&mappings, dataset, &config, engine, client, pool);

// Start health server if configured
if let Some(port) = config.health_port {
Expand Down
Loading
Loading