diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 795d71e..f1e74a8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -63,7 +63,7 @@ jobs: profile: minimal - name: Run tests run: | - cargo test --verbose --color always -- --show-output + RUST_BACKTRACE=1 cargo test --verbose --color always -- --show-output env: RAPID_GOSSIP_TEST_DB_HOST: localhost RAPID_GOSSIP_TEST_DB_NAME: postgres diff --git a/src/config.rs b/src/config.rs index 2352780..8195a31 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,7 +21,7 @@ use tokio_postgres::Config; use tokio::sync::Semaphore; -pub(crate) const SCHEMA_VERSION: i32 = 15; +pub(crate) const SCHEMA_VERSION: i32 = 17; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks // generate symlinks based on a 3-hour-granularity @@ -167,11 +167,8 @@ pub(crate) fn db_index_creation_query() -> &'static str { " CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id); CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen); - CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id); CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp); - CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen); - CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC); - CREATE INDEX IF NOT EXISTS node_announcements_seen_pubkey ON node_announcements(seen, public_key); + CREATE INDEX IF NOT EXISTS node_announcements_pubkey_seen_desc ON node_announcements(public_key, seen DESC); " } @@ -328,27 +325,39 @@ pub(crate) async fn upgrade_db( // Note that we don't bother doing this one in a transaction, and as such need to support // resuming on a crash. let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await; - tokio::spawn(async move { - let client = crate::connect_to_db().await; - let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap()); - let sem = Arc::new(Semaphore::new(16)); - while let Some(scid_res) = scids.next().await { - let scid: i64 = scid_res.unwrap().get(0); - let permit = Arc::clone(&sem).acquire_owned().await.unwrap(); - let logger = logger.clone(); - tokio::spawn(async move { - let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint())); - let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await - .expect("We shouldn't have accepted a channel announce with a bad TXO"); - let client = crate::connect_to_db().await; - client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap(); - std::mem::drop(permit); - }); - } - let _all_updates_complete = sem.acquire_many(16).await.unwrap(); - client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap(); - client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap(); - }); + let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap()); + let sem = Arc::new(Semaphore::new(16)); + while let Some(scid_res) = scids.next().await { + let scid: i64 = scid_res.unwrap().get(0); + let permit = Arc::clone(&sem).acquire_owned().await.unwrap(); + let logger = logger.clone(); + tokio::spawn(async move { + let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint())); + let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await + .expect("We shouldn't have accepted a channel announce with a bad TXO"); + let client = crate::connect_to_db().await; + client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap(); + std::mem::drop(permit); + }); + } + let _all_updates_complete = sem.acquire_many(16).await.unwrap(); + client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap(); + client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap(); + } + if schema >= 1 && schema <= 15 { + let tx = client.transaction().await.unwrap(); + tx.execute("DROP INDEX IF EXISTS node_announcements_seen_pubkey", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 16 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } + if schema >= 1 && schema <= 16 { + let tx = client.transaction().await.unwrap(); + tx.execute("DROP INDEX IF EXISTS channel_updates_scid_dir_seen_desc_with_id", &[]).await.unwrap(); + tx.execute("DROP INDEX IF EXISTS channel_updates_scid_asc_timestamp_desc", &[]).await.unwrap(); + tx.execute("DROP INDEX IF EXISTS channel_updates_seen", &[]).await.unwrap(); + tx.execute("DROP INDEX IF EXISTS node_announcements_pubkey_timestamp_desc", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 17 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); diff --git a/src/lookup.rs b/src/lookup.rs index 071995f..93671ce 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -147,6 +147,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger); log_info!(logger, "Obtaining corresponding database entries"); + let start = Instant::now(); // get all the channel announcements that are currently in the network graph let announcement_rows = client.query_raw("SELECT announcement_signed, funding_amount_sats, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap(); let mut pinned_rows = Box::pin(announcement_rows); @@ -171,7 +172,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS announcement_count += 1; } - log_info!(logger, "Fetched {} announcement rows", announcement_count); + log_info!(logger, "Fetched {} announcement rows in {:?}", announcement_count, start.elapsed()); { // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA @@ -184,20 +185,27 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS // here is where the channels whose first update in either direction occurred after // `last_seen_timestamp` are added to the selection + let start = Instant::now(); let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &last_sync_timestamp_float]; let newer_oldest_directional_updates = client.query_raw(" - SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM ( - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen ASC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen DESC - ) AS distinct_chans - WHERE distinct_chans.seen >= TO_TIMESTAMP($2) + SELECT scids.short_channel_id, CAST(EXTRACT('epoch' from GREATEST(dir0.seen, dir1.seen)) AS BIGINT) AS seen + FROM unnest($1::bigint[]) AS scids(short_channel_id) + CROSS JOIN LATERAL ( + SELECT seen + FROM channel_updates + WHERE short_channel_id = scids.short_channel_id AND direction = false + ORDER BY seen ASC + LIMIT 1 + ) dir0 + CROSS JOIN LATERAL ( + SELECT seen + FROM channel_updates + WHERE short_channel_id = scids.short_channel_id AND direction = true + ORDER BY seen ASC + LIMIT 1 + ) dir1 + WHERE GREATEST(dir0.seen, dir1.seen) >= TO_TIMESTAMP($2) ", params).await.unwrap(); let mut pinned_updates = Box::pin(newer_oldest_directional_updates); @@ -215,7 +223,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS newer_oldest_directional_update_count += 1; } - log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count); + log_info!(logger, "Fetched {} update rows of the first update in a new direction in {:?}", newer_oldest_directional_update_count, start.elapsed()); } if include_reminders { @@ -252,6 +260,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS 3x the timeframe that we consider necessitates reminders. */ + let start = Instant::now(); let mutated_updates = client.query_raw(" SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM ( SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE ( @@ -321,7 +330,8 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction); } } - log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count); + log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction in {:?}", + older_latest_directional_update_count, start.elapsed()); } } @@ -333,17 +343,22 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl // there was an update in either direction that happened after the last sync (to avoid // collecting too many reference updates) let reference_rows = client.query_raw(" - SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates - WHERE id IN ( - SELECT DISTINCT ON (short_channel_id, direction) id + SELECT cu.id, d.direction, CAST(EXTRACT('epoch' from cu.seen) AS BIGINT) AS seen, cu.blob_signed + FROM ( + SELECT DISTINCT short_channel_id FROM channel_updates - WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN ( - SELECT DISTINCT ON (short_channel_id) short_channel_id - FROM channel_updates - WHERE seen >= TO_TIMESTAMP($1) - ) - ORDER BY short_channel_id ASC, direction ASC, seen DESC - ) + WHERE seen >= TO_TIMESTAMP($1) + ) AS recent_scids + CROSS JOIN (VALUES (false), (true)) AS d(direction) + JOIN LATERAL ( + SELECT id, seen, blob_signed + FROM channel_updates + WHERE short_channel_id = recent_scids.short_channel_id + AND direction = d.direction + AND seen < TO_TIMESTAMP($1) + ORDER BY seen DESC + LIMIT 1 + ) cu ON true ", [last_sync_timestamp_float]).await.unwrap(); let mut pinned_rows = Box::pin(reference_rows); @@ -381,13 +396,14 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl reference_row_count += 1; } - log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}", + log_info!(logger, "Fetched + processed {} reference rows (delta size: {}) in {:?}", reference_row_count, delta_set.len(), start.elapsed()); // get all the intermediate channel updates // (to calculate the set of mutated fields for snapshotting, where intermediate updates may // have been omitted) + let start = Instant::now(); let intermediate_updates = client.query_raw(" SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_updates @@ -468,7 +484,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl } } } - log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); + log_info!(logger, "Fetched + processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } pub(super) async fn fetch_node_updates(network_graph: &NetworkGraph, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option, logger: L) -> NodeDeltaSet where L::Target: Logger { @@ -502,12 +518,16 @@ pub(super) async fn fetch_node_updates(network_graph: &Network // get the latest node updates prior to last_sync_timestamp let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float]; let reference_rows = client.query_raw(" - SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed - FROM node_announcements - WHERE - public_key = ANY($1) AND - seen < TO_TIMESTAMP($2) - ORDER BY public_key ASC, seen DESC + SELECT pk.public_key, CAST(EXTRACT('epoch' from na.seen) AS BIGINT) AS seen, na.announcement_signed + FROM unnest($1::varchar[]) AS pk(public_key) + CROSS JOIN LATERAL ( + SELECT seen, announcement_signed + FROM node_announcements + WHERE public_key = pk.public_key + AND seen < TO_TIMESTAMP($2) + ORDER BY seen DESC + LIMIT 1 + ) na ", params).await.unwrap(); let mut pinned_rows = Box::pin(reference_rows); @@ -539,7 +559,7 @@ pub(super) async fn fetch_node_updates(network_graph: &Network } - log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}", + log_info!(logger, "Fetched + processed {} node announcement reference rows (delta size: {}) in {:?}", reference_row_count, delta_set.len(), start.elapsed()); let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()); @@ -564,6 +584,7 @@ pub(super) async fn fetch_node_updates(network_graph: &Network // get all the intermediate node updates // (to calculate the set of mutated fields for snapshotting, where intermediate updates may // have been omitted) + let start = Instant::now(); let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &effective_threshold_timestamp]; let intermediate_updates = client.query_raw(" SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen @@ -646,7 +667,7 @@ pub(super) async fn fetch_node_updates(network_graph: &Network previous_node_id = Some(node_id); } - log_info!(logger, "Processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); + log_info!(logger, "Fetched + processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); delta_set } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 7d97f22..0379aaa 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -511,8 +511,8 @@ async fn test_unidirectional_intermediate_update_consideration() { let serialization = serialize_delta(&delta, 1, logger.clone()); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 1 reference rows", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1); assert_eq!(serialization.message_count, 3); assert_eq!(serialization.channel_announcement_count, 1); @@ -578,8 +578,8 @@ async fn test_bidirectional_intermediate_update_consideration() { let serialization = serialize_delta(&delta, 1, logger.clone()); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 2 reference rows", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1); assert_eq!(serialization.message_count, 1); assert_eq!(serialization.channel_announcement_count, 0); @@ -662,8 +662,8 @@ async fn test_channel_reminders() { logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1); - logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 2 reference rows", 1); + logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1); assert_eq!(serialization.message_count, 4); assert_eq!(serialization.channel_announcement_count, 0);