Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
profile: minimal
- name: Run tests
run: |
cargo test --verbose --color always -- --show-output
RUST_BACKTRACE=1 cargo test --verbose --color always -- --show-output
env:
RAPID_GOSSIP_TEST_DB_HOST: localhost
RAPID_GOSSIP_TEST_DB_NAME: postgres
Expand Down
61 changes: 35 additions & 26 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio_postgres::Config;

use tokio::sync::Semaphore;

pub(crate) const SCHEMA_VERSION: i32 = 15;
pub(crate) const SCHEMA_VERSION: i32 = 17;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
Expand Down Expand Up @@ -167,11 +167,8 @@ pub(crate) fn db_index_creation_query() -> &'static str {
"
CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen);
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC);
CREATE INDEX IF NOT EXISTS node_announcements_seen_pubkey ON node_announcements(seen, public_key);
CREATE INDEX IF NOT EXISTS node_announcements_pubkey_seen_desc ON node_announcements(public_key, seen DESC);
"
}

Expand Down Expand Up @@ -328,27 +325,39 @@ pub(crate) async fn upgrade_db<L: Deref + Clone + Send + Sync + 'static>(
// Note that we don't bother doing this one in a transaction, and as such need to support
// resuming on a crash.
let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await;
tokio::spawn(async move {
let client = crate::connect_to_db().await;
let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
let sem = Arc::new(Semaphore::new(16));
while let Some(scid_res) = scids.next().await {
let scid: i64 = scid_res.unwrap().get(0);
let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
let logger = logger.clone();
tokio::spawn(async move {
let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()));
let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
.expect("We shouldn't have accepted a channel announce with a bad TXO");
let client = crate::connect_to_db().await;
client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
std::mem::drop(permit);
});
}
let _all_updates_complete = sem.acquire_many(16).await.unwrap();
client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
});
let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
let sem = Arc::new(Semaphore::new(16));
while let Some(scid_res) = scids.next().await {
let scid: i64 = scid_res.unwrap().get(0);
let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
let logger = logger.clone();
tokio::spawn(async move {
let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()));
let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
.expect("We shouldn't have accepted a channel announce with a bad TXO");
let client = crate::connect_to_db().await;
client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
std::mem::drop(permit);
});
}
let _all_updates_complete = sem.acquire_many(16).await.unwrap();
client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
}
if schema >= 1 && schema <= 15 {
let tx = client.transaction().await.unwrap();
tx.execute("DROP INDEX IF EXISTS node_announcements_seen_pubkey", &[]).await.unwrap();
tx.execute("UPDATE config SET db_schema = 16 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema >= 1 && schema <= 16 {
let tx = client.transaction().await.unwrap();
tx.execute("DROP INDEX IF EXISTS channel_updates_scid_dir_seen_desc_with_id", &[]).await.unwrap();
tx.execute("DROP INDEX IF EXISTS channel_updates_scid_asc_timestamp_desc", &[]).await.unwrap();
tx.execute("DROP INDEX IF EXISTS channel_updates_seen", &[]).await.unwrap();
tx.execute("DROP INDEX IF EXISTS node_announcements_pubkey_timestamp_desc", &[]).await.unwrap();
tx.execute("UPDATE config SET db_schema = 17 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
Expand Down
89 changes: 55 additions & 34 deletions src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger);

log_info!(logger, "Obtaining corresponding database entries");
let start = Instant::now();
// get all the channel announcements that are currently in the network graph
let announcement_rows = client.query_raw("SELECT announcement_signed, funding_amount_sats, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
let mut pinned_rows = Box::pin(announcement_rows);
Expand All @@ -171,7 +172,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS

announcement_count += 1;
}
log_info!(logger, "Fetched {} announcement rows", announcement_count);
log_info!(logger, "Fetched {} announcement rows in {:?}", announcement_count, start.elapsed());

{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
Expand All @@ -184,20 +185,27 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS

// here is where the channels whose first update in either direction occurred after
// `last_seen_timestamp` are added to the selection
let start = Instant::now();
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
[&channel_ids, &last_sync_timestamp_float];
let newer_oldest_directional_updates = client.query_raw("
SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
SELECT DISTINCT ON (short_channel_id) *
FROM (
SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
FROM channel_updates
WHERE short_channel_id = any($1)
ORDER BY short_channel_id ASC, direction ASC, seen ASC
) AS directional_last_seens
ORDER BY short_channel_id ASC, seen DESC
) AS distinct_chans
WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
SELECT scids.short_channel_id, CAST(EXTRACT('epoch' from GREATEST(dir0.seen, dir1.seen)) AS BIGINT) AS seen
FROM unnest($1::bigint[]) AS scids(short_channel_id)
CROSS JOIN LATERAL (
SELECT seen
FROM channel_updates
WHERE short_channel_id = scids.short_channel_id AND direction = false
ORDER BY seen ASC
LIMIT 1
) dir0
CROSS JOIN LATERAL (
SELECT seen
FROM channel_updates
WHERE short_channel_id = scids.short_channel_id AND direction = true
ORDER BY seen ASC
LIMIT 1
) dir1
WHERE GREATEST(dir0.seen, dir1.seen) >= TO_TIMESTAMP($2)
", params).await.unwrap();
let mut pinned_updates = Box::pin(newer_oldest_directional_updates);

Expand All @@ -215,7 +223,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS

newer_oldest_directional_update_count += 1;
}
log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
log_info!(logger, "Fetched {} update rows of the first update in a new direction in {:?}", newer_oldest_directional_update_count, start.elapsed());
}

if include_reminders {
Expand Down Expand Up @@ -252,6 +260,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
3x the timeframe that we consider necessitates reminders.
*/

let start = Instant::now();
let mutated_updates = client.query_raw("
SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
Expand Down Expand Up @@ -321,7 +330,8 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
}
}
log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction in {:?}",
older_latest_directional_update_count, start.elapsed());
}
}

Expand All @@ -333,17 +343,22 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
// there was an update in either direction that happened after the last sync (to avoid
// collecting too many reference updates)
let reference_rows = client.query_raw("
SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
WHERE id IN (
SELECT DISTINCT ON (short_channel_id, direction) id
SELECT cu.id, d.direction, CAST(EXTRACT('epoch' from cu.seen) AS BIGINT) AS seen, cu.blob_signed
FROM (
SELECT DISTINCT short_channel_id
FROM channel_updates
WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
SELECT DISTINCT ON (short_channel_id) short_channel_id
FROM channel_updates
WHERE seen >= TO_TIMESTAMP($1)
)
ORDER BY short_channel_id ASC, direction ASC, seen DESC
)
WHERE seen >= TO_TIMESTAMP($1)
) AS recent_scids
CROSS JOIN (VALUES (false), (true)) AS d(direction)
JOIN LATERAL (
SELECT id, seen, blob_signed
FROM channel_updates
WHERE short_channel_id = recent_scids.short_channel_id
AND direction = d.direction
AND seen < TO_TIMESTAMP($1)
ORDER BY seen DESC
LIMIT 1
) cu ON true
", [last_sync_timestamp_float]).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);

Expand Down Expand Up @@ -381,13 +396,14 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
reference_row_count += 1;
}

log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
log_info!(logger, "Fetched + processed {} reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());

// get all the intermediate channel updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)

let start = Instant::now();
let intermediate_updates = client.query_raw("
SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM channel_updates
Expand Down Expand Up @@ -468,7 +484,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
}
}
}
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
log_info!(logger, "Fetched + processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: &NetworkGraph<L>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> NodeDeltaSet where L::Target: Logger {
Expand Down Expand Up @@ -502,12 +518,16 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: &Network
// get the latest node updates prior to last_sync_timestamp
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
let reference_rows = client.query_raw("
SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
FROM node_announcements
WHERE
public_key = ANY($1) AND
seen < TO_TIMESTAMP($2)
ORDER BY public_key ASC, seen DESC
SELECT pk.public_key, CAST(EXTRACT('epoch' from na.seen) AS BIGINT) AS seen, na.announcement_signed
FROM unnest($1::varchar[]) AS pk(public_key)
CROSS JOIN LATERAL (
SELECT seen, announcement_signed
FROM node_announcements
WHERE public_key = pk.public_key
AND seen < TO_TIMESTAMP($2)
ORDER BY seen DESC
LIMIT 1
) na
", params).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);

Expand Down Expand Up @@ -539,7 +559,7 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: &Network
}


log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}",
log_info!(logger, "Fetched + processed {} node announcement reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());

let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
Expand All @@ -564,6 +584,7 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: &Network
// get all the intermediate node updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)
let start = Instant::now();
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &effective_threshold_timestamp];
let intermediate_updates = client.query_raw("
SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
Expand Down Expand Up @@ -646,7 +667,7 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: &Network

previous_node_id = Some(node_id);
}
log_info!(logger, "Processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
log_info!(logger, "Fetched + processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());

delta_set
}
Expand Down
12 changes: 6 additions & 6 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ async fn test_unidirectional_intermediate_update_consideration() {
let serialization = serialize_delta(&delta, 1, logger.clone());

logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 1 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1);

assert_eq!(serialization.message_count, 3);
assert_eq!(serialization.channel_announcement_count, 1);
Expand Down Expand Up @@ -578,8 +578,8 @@ async fn test_bidirectional_intermediate_update_consideration() {
let serialization = serialize_delta(&delta, 1, logger.clone());

logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 2 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1);

assert_eq!(serialization.message_count, 1);
assert_eq!(serialization.channel_announcement_count, 0);
Expand Down Expand Up @@ -662,8 +662,8 @@ async fn test_channel_reminders() {

logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed 2 reference rows", 1);
logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched + processed intermediate rows (2)", 1);

assert_eq!(serialization.message_count, 4);
assert_eq!(serialization.channel_announcement_count, 0);
Expand Down
Loading