diff --git a/Cargo.lock b/Cargo.lock index d4c9188..ea325ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,6 +328,7 @@ dependencies = [ "bitcoin", "futures", "indexer", + "jsonrpc", "rayon", "redb", "reqwest", @@ -366,6 +367,8 @@ dependencies = [ "miniscript", "rand 0.8.5", "rand_core 0.6.4", + "rand 0.8.5", + "rand_core 0.6.4", ] [[package]] diff --git a/cli/v2/src/main.rs b/cli/v2/src/main.rs index fae2575..8bd5af6 100644 --- a/cli/v2/src/main.rs +++ b/cli/v2/src/main.rs @@ -10,7 +10,8 @@ use bdk_sp::{ self, address::NetworkUnchecked, bip32, - consensus::Decodable, + consensus::{deserialize, Decodable}, + hashes::Hash, hex::{DisplayHex, FromHex}, key::Secp256k1, script::PushBytesBuf, @@ -34,6 +35,7 @@ use bdk_sp_oracles::{ TrustedPeer, UnboundedReceiver, Warning, }, filters::kyoto::{FilterEvent, FilterSubscriber}, + frigate::{FrigateClient, History, SubscribeRequest, UnsubscribeRequest}, tweaks::blindbit::{BlindbitSubscriber, TweakEvent}, }; use bdk_sp_wallet::{ @@ -161,6 +163,15 @@ pub enum Commands { #[clap(long)] hash: Option, }, + + ScanFrigate { + #[clap(flatten)] + rpc_args: RpcArgs, + /// The scan private key for which outputs will be scanned for. + #[clap(long)] + start: Option, + }, + Create { /// Network #[clap(long, short, default_value = "signet")] @@ -567,6 +578,146 @@ async fn main() -> anyhow::Result<()> { ); } } + Commands::ScanFrigate { rpc_args, start } => { + // The implementation done here differ from what mentionned in the section https://github.com/sparrowwallet/frigate/tree/master?tab=readme-ov-file#blockchainsilentpaymentssubscribe + // We are doing a one time scanning only. So instead of calling `blockchain.scripthash.subscribe` on each script from the wallet, + // we just subscribe and read the scanning result from the stream. On each result received we update the wallet state and once scanning progress reaches 1.0 (100%) we stop. + + let mut client = FrigateClient::connect(&rpc_args.url) + .await + .unwrap() + .with_timeout(tokio::time::Duration::from_secs(600)); + + let labels = wallet + .indexer() + .index() + .num_to_label + .clone() + .into_keys() + .collect::>(); + let labels = if !labels.is_empty() { + Some(labels) + } else { + None + }; + + let subscribe_params = SubscribeRequest { + scan_priv_key: *wallet.indexer().scan_sk(), + spend_pub_key: *wallet.indexer().spend_pk(), + start_height: start, + labels, + }; + + // Attempt to subscribe; any timeout will trigger unsubscribe automatically. + match client.subscribe_with_timeout(&subscribe_params).await { + Ok(Some((histories, progress))) => { + tracing::info!( + "Initial subscription result: {} histories, progress {}", + histories.len(), + progress + ); + } + Ok(None) => { + tracing::info!("Subscription acknowledged, awaiting notifications"); + } + Err(e) => { + tracing::error!("Subscribe failed: {}", e); + return Err(e.into()); + } + } + + tracing::info!("Starting frigate scanning loop..."); + loop { + match client.read_from_stream(4096).await { + Ok(subscribe_result) => { + if subscribe_result["params"].is_object() { + let histories: Vec = serde_json::from_value( + subscribe_result["params"]["history"].clone(), + )?; + let progress = subscribe_result["params"]["progress"] + .as_f64() + .unwrap_or(0.0) as f32; + + let mut secrets_by_height: HashMap> = + HashMap::new(); + + tracing::debug!("Received history {:#?}", histories); + + histories.iter().for_each(|h| { + secrets_by_height + .entry(h.height) + .and_modify(|v| { + v.insert(h.tx_hash, h.tweak_key); + }) + .or_insert(HashMap::from([(h.tx_hash, h.tweak_key)])); + }); + + // Filter when the height is 0, because that would mean mempool transaction + for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) { + // Since frigate doesn't provide a blockchain.getblock we will mimick that here + // By constructing a block from the block header and the list of transactions + // received from the scan request + let mut raw_blk = client.get_block_header(secret.0).await.unwrap(); + raw_blk.push_str("00"); + + // Push dummy coinbase + let dummy_coinbase = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000"; + let coinbase: Transaction = + deserialize(&Vec::::from_hex(dummy_coinbase).unwrap()) + .unwrap(); + let mut block: Block = + deserialize(&Vec::::from_hex(&raw_blk).unwrap()).unwrap(); + + let mut blockhash = BlockHash::all_zeros(); + + let mut txs: Vec = vec![coinbase]; + for key in secret.1.keys() { + let tx_result = + client.get_transaction(key.to_string()).await.unwrap(); + let tx: Transaction = + deserialize(&Vec::::from_hex(&tx_result.1).unwrap()) + .unwrap(); + txs.push(tx); + + blockhash = BlockHash::from_str(&tx_result.0).unwrap(); + } + + block.txdata = txs; + tracing::debug!("Final block {:?}", block); + wallet.apply_block_relevant(&block, secret.1, secret.0); + + tracing::debug!("Checkpoint hash {blockhash:?}"); + let checkpoint = wallet.chain().tip().insert(BlockId { + height: secret.0, + hash: blockhash, + }); + wallet.update_chain(checkpoint); + } + + tracing::info!("Progress {progress}"); + // Check the progress + if progress >= 1.0 { + tracing::info!("Scanning completed"); + break; + } + } + } + Err(e) if e.to_string().contains("timed out") => { + tracing::warn!("read_from_stream timeout, exiting scan"); + let unsubscribe_request = UnsubscribeRequest { + scan_privkey: *wallet.indexer().scan_sk(), + spend_pubkey: *wallet.indexer().spend_pk(), + }; + let _ = client.unsubscribe(&unsubscribe_request).await; + break; + } + Err(e) => { + tracing::error!("read_from_stream error: {}", e); + return Err(e.into()); + } + } + } + } Commands::Balance => { fn print_balances<'a>( title_str: &'a str, diff --git a/oracles/Cargo.toml b/oracles/Cargo.toml index 2313138..f3ba162 100644 --- a/oracles/Cargo.toml +++ b/oracles/Cargo.toml @@ -13,9 +13,10 @@ redb = "2.4.0" rayon = "1.11.0" reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false } serde = { version = "1.0.219", features = ["serde_derive"] } -serde_json = "1.0.142" +serde_json = { version = "1.0.142", features = ["raw_value"]} url = "2.5.4" tracing = "0.1.41" +jsonrpc = "=0.18.0" [lints] workspace = true diff --git a/oracles/src/frigate/mod.rs b/oracles/src/frigate/mod.rs new file mode 100644 index 0000000..0266116 --- /dev/null +++ b/oracles/src/frigate/mod.rs @@ -0,0 +1,296 @@ +use bip157::tokio::io::{AsyncReadExt, AsyncWriteExt}; +use bip157::tokio::net::TcpStream; +use bip157::tokio::time::{timeout, Duration}; +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use bitcoin::Txid; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug)] +pub enum FrigateError { + JsonRpc(jsonrpc::Error), + ParseUrl(url::ParseError), + Serde(serde_json::Error), + Generic(String), +} + +impl From for FrigateError { + fn from(value: serde_json::Error) -> Self { + FrigateError::Serde(value) + } +} + +impl From for FrigateError { + fn from(value: url::ParseError) -> Self { + Self::ParseUrl(value) + } +} + +impl From for FrigateError { + fn from(value: jsonrpc::Error) -> Self { + Self::JsonRpc(value) + } +} + +impl From for FrigateError { + fn from(value: std::io::Error) -> Self { + Self::Generic(format!("Generic error {:?}", value)) + } +} + +impl std::fmt::Display for FrigateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FrigateError::Generic(str) => write!(f, "{str}"), + _ => write!(f, "Something wrong happened"), + } + } +} +impl std::error::Error for FrigateError {} + +pub struct FrigateClient { + pub host_url: String, + client: Box, + pub request_timeout: Duration, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct History { + pub height: u32, + pub tx_hash: Txid, + pub tweak_key: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct NotifPayload { + scan_private_key: SecretKey, + spend_public_key: PublicKey, + address: String, + labels: Option>, + start_height: u32, + progress: f32, + history: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct SubscribeRequest { + pub scan_priv_key: SecretKey, + pub spend_pub_key: PublicKey, + pub start_height: Option, + pub labels: Option>, +} + +#[derive(Serialize, Deserialize)] +pub struct UnsubscribeRequest { + pub scan_privkey: SecretKey, + pub spend_pubkey: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct GetRequest { + pub tx_hash: Txid, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RequestPayload { + pub method: String, + pub params: Value, + pub id: serde_json::Value, + pub jsonrpc: String, +} + +const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe"; +const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe"; +const GET_RPC_METHOD: &str = "blockchain.transaction.get"; +const VERSION_RPC_METHOD: &str = "server.version"; +const _SUBSCRIBE_OWNED_OUTPUTS: &str = "blockchain.scripthash.subscribe"; + +impl FrigateClient { + pub async fn connect(host_url: &str) -> Result { + let stream = TcpStream::connect(host_url) + .await + .map_err(|_| FrigateError::Generic("Can't connect to socket".to_string()))?; + + Ok(Self { + host_url: host_url.to_string(), + client: Box::new(stream), + request_timeout: Duration::from_secs(10), + }) + } + + /// Sets a custom request timeout for this client. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + pub async fn read_from_stream(&mut self, size: usize) -> Result { + let mut buffer = vec![0; size]; + let n = self.client.read(&mut buffer).await?; + + tracing::debug!("Read bytes from stream {n}"); + match n { + 0 => Err(FrigateError::Generic("Nothing read".to_string())), + _ => { + let response_str = String::from_utf8_lossy(&buffer[..n]); + let result: Value = + serde_json::from_str(&response_str).map_err(FrigateError::Serde)?; + + Ok(result) + } + } + } + + async fn send_request(&mut self, req_bytes: &[u8]) -> Result { + match timeout(self.request_timeout, async { + self.client.write_all(req_bytes).await?; + self.client.write_all(b"\n").await?; + self.client.flush().await?; + self.read_from_stream(4096).await + }) + .await + { + Ok(res) => res, + Err(_) => Err(FrigateError::Generic(format!( + "request timed out after {:?}", + self.request_timeout + ))), + } + } + + pub async fn get_block_header(&mut self, height: u32) -> Result { + let params = vec![height]; + let req = RequestPayload { + method: "blockchain.block.header".to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(1000), + jsonrpc: "2.0".to_string(), + }; + let req_bytes = serde_json::to_vec(&req)?; + let res = self.send_request(&req_bytes).await?; + + tracing::debug!("[Block Header Request] Result {:?}", res); + Ok(String::from(res["result"].as_str().unwrap())) + } + + pub async fn get_transaction( + &mut self, + txid: String, + ) -> Result<(String, String), FrigateError> { + let params = vec![txid, "true".to_string()]; + let req = RequestPayload { + method: GET_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(51), + jsonrpc: "2.0".to_string(), + }; + let req_bytes = serde_json::to_vec(&req)?; + let res = self.send_request(&req_bytes).await?; + + tracing::debug!("[Get tx Request] Result {:#?}", res); + let blockhash = String::from(res["result"]["blockhash"].as_str().unwrap()); + let hex = String::from(res["result"]["hex"].as_str().unwrap()); + Ok((blockhash, hex)) + } + + pub async fn version(&mut self) -> Result<(), FrigateError> { + let params = vec!["frigate-cli", "1.4"]; + + let req = RequestPayload { + method: VERSION_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(71), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + self.send_request(&req_bytes).await?; + + Ok(()) + } + + pub async fn subscribe( + &mut self, + req: &SubscribeRequest, + ) -> Result, f32)>, FrigateError> { + self.version().await?; + let mut params: Vec = vec![ + serde_json::json!(req.scan_priv_key), + serde_json::json!(req.spend_pub_key), + ]; + + if let Some(start_height) = req.start_height { + params.push(serde_json::json!(start_height)); + } + + if let Some(labels) = &req.labels { + params.push(serde_json::json!(labels)); + } + + let req = RequestPayload { + method: SUBSCRIBE_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(91), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + let result = self.send_request(&req_bytes).await?; + + if result["result"].is_string() { + tracing::info!( + "Subscribed to silent payment address: {:?}", + result["result"] + ); + return Ok(None); + } else if result["params"].is_object() { + let histories: Vec = + serde_json::from_value(result["params"]["history"].clone()) + .map_err(FrigateError::Serde)?; + let progress = result["params"]["progress"].as_f64().unwrap_or(0.0) as f32; + return Ok(Some((histories, progress))); + } + + Ok(None) + } + + pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result { + let params: Vec = vec![ + serde_json::json!(req.scan_privkey), + serde_json::json!(req.spend_pubkey), + ]; + + self.version().await?; + let req = RequestPayload { + method: UNSUBSCRIBE_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(189), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + let result = self.send_request(&req_bytes).await?; + + Ok(result["result"].to_string()) + } + + pub async fn subscribe_with_timeout( + &mut self, + req: &SubscribeRequest, + ) -> Result, f32)>, FrigateError> { + match self.subscribe(req).await { + Ok(res) => Ok(res), + Err(e) => { + if e.to_string().contains("timed out") { + tracing::warn!("subscribe request timed out, attempting unsubscribe"); + let unsub = UnsubscribeRequest { + scan_privkey: req.scan_priv_key, + spend_pubkey: req.spend_pub_key, + }; + let _ = self.unsubscribe(&unsub).await; + } + Err(e) + } + } + } +} diff --git a/oracles/src/lib.rs b/oracles/src/lib.rs index 1d5a6a9..93a6146 100644 --- a/oracles/src/lib.rs +++ b/oracles/src/lib.rs @@ -1,3 +1,4 @@ pub mod filters; +pub mod frigate; pub mod tweaks; pub use bip157;