Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bin/ampsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ datasets-common = { path = "../../core/datasets-common" }
futures.workspace = true
monitoring = { path = "../../core/monitoring" }
pg_escape.workspace = true
serde.workspace = true
toml.workspace = true
rustls.workspace = true
sqlparser.workspace = true
sqlx = { version = "0.8.6", features = [
Expand Down
48 changes: 48 additions & 0 deletions crates/bin/ampsync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Options:
--grpc-max-decode-mb <MB> Max gRPC decode size in MiB (default: 32, range: 1-512)
--max-db-connections <N> Max connections (default: 10, range: 1-1000)
--retention-blocks <N> Retention blocks (default: 128, min: 64)
--config <PATH> TOML config file for index definitions
--auth-token <TOKEN> Authentication token for Arrow Flight
-h, --help Print help
-V, --version Print version
Expand Down Expand Up @@ -55,6 +56,7 @@ All CLI arguments can also be set via environment variables:
- **`MAX_DB_CONNECTIONS`** (default: `10`): Database connection pool size (valid range: 1-1000)
- **`RETENTION_BLOCKS`** (default: `128`): Watermark retention window (must be >= 64)
- **`AMP_AUTH_TOKEN`** (optional): Bearer token for authenticating requests to the Arrow Flight server
- **`AMPSYNC_CONFIG`** (optional): Path to TOML config file for index definitions (see [Index Configuration](#index-configuration))

## Running

Expand Down Expand Up @@ -93,6 +95,52 @@ services:
restart: unless-stopped
```

## Index Configuration

Ampsync can create PostgreSQL indexes on synced tables from a TOML config file
passed with `--config` (or the `AMPSYNC_CONFIG` environment variable). Indexes are
created atomically alongside their tables using `IF NOT EXISTS`, so adding new
index definitions to the file and restarting is safe and idempotent.

### Config file format

Each `[[index]]` entry defines one index:

```toml
[[index]]
table = "transfers" # required — must be in --tables list
name = "idx_transfers_sender" # required — valid SQL identifier
columns = ["sender", "block_number desc"] # required — key columns, each with an optional sort order (e.g. "desc")
method = "btree" # optional — btree (default), hash, gin, gist, brin
unique = false # optional — default false
include = ["amount"] # optional — INCLUDE columns (covering index)
where = "amount > 1000" # optional — partial index WHERE clause
```

### Example

```toml
# Covering index for lookups by seller with timestamp ordering
[[index]]
table = "usdc_transfers"
name = "idx_usdc_transfers_seller_timestamp_covering"
columns = ["seller_address", "timestamp desc"]
include = ["buyer_address", "value_usdc"]

# Simple index on buyer address
[[index]]
table = "usdc_transfers"
name = "idx_usdc_transfers_buyer_address"
columns = ["buyer_address"]

# Partial index for large transfers only
[[index]]
table = "usdc_transfers"
name = "idx_usdc_transfers_large"
columns = ["seller_address"]
where = "value_usdc > 10000"
```

## Database Schema

### System Columns
Expand Down
34 changes: 32 additions & 2 deletions crates/bin/ampsync/src/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::HashMap;

use amp_client::{AmpClient, PostgresStateStore};
use anyhow::{Context, Result};
use sqlx::postgres::PgPoolOptions;
use tracing::info;

use crate::{config::SyncConfig, engine::Engine, health, manager::StreamManager};
use crate::{config::SyncConfig, engine::Engine, health, index, manager::StreamManager};

pub async fn run(config: SyncConfig) -> Result<()> {
info!("Starting ampsync");
Expand Down Expand Up @@ -55,8 +57,36 @@ pub async fn run(config: SyncConfig) -> Result<()> {

info!("Amp client initialized");

// Load index configuration from config file (if provided)
let indexes_by_table = match &config.config {
Some(config_path) => {
info!(path = %config_path.display(), "Loading index configuration");
index::load_config(config_path, &config.tables)
.await
.context("Failed to load index configuration")?
}
None => HashMap::new(),
};

if !indexes_by_table.is_empty() {
let total: usize = indexes_by_table.values().map(|v| v.len()).sum();
info!(
total_indexes = total,
tables_with_indexes = indexes_by_table.len(),
"Index configuration loaded"
);
}

// Spawn streaming tasks (table creation happens in StreamTask::new)
let manager = StreamManager::new(&config.tables, dataset, &config, engine, client, pool);
let manager = StreamManager::new(
&config.tables,
dataset,
&config,
engine,
client,
pool,
&indexes_by_table,
);

// Start health server if configured
if let Some(port) = config.health_port {
Expand Down
11 changes: 11 additions & 0 deletions crates/bin/ampsync/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::PathBuf;

use clap::{Args, Parser, Subcommand};
use datasets_common::partial_reference::PartialReference;

Expand Down Expand Up @@ -99,6 +101,15 @@ pub struct SyncConfig {
#[arg(long, env = "AMP_AUTH_TOKEN")]
pub auth_token: Option<String>,

/// Path to TOML config file for index definitions and other settings
///
/// The config file can define indexes to create on synced tables.
/// See the "Index Configuration" section in the crate README for format details.
///
/// Can also be set via AMPSYNC_CONFIG environment variable
#[arg(long, env = "AMPSYNC_CONFIG")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using a different name here, such as `index-config`. Otherwise it sounds like the general configuration file for running ampsync, where I would expect to be able to set things like auth, tables, etc. (basically all the other config).

pub config: Option<PathBuf>,

/// Health check server port (optional)
///
/// When provided, starts an HTTP server on 0.0.0.0 that exposes a /healthz
Expand Down
Loading
Loading