diff --git a/crates/astria-core/src/generated/celestia.core.v1.tx.rs b/crates/astria-core/src/generated/celestia.core.v1.tx.rs new file mode 100644 index 0000000000..7c20152972 --- /dev/null +++ b/crates/astria-core/src/generated/celestia.core.v1.tx.rs @@ -0,0 +1,347 @@ +/// TxStatusRequest is the request type for the TxStatus gRPC method. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TxStatusRequest { + /// this is the hex encoded transaction hash (should be 64 bytes long) + #[prost(string, tag = "1")] + pub tx_id: ::prost::alloc::string::String, +} +impl ::prost::Name for TxStatusRequest { + const NAME: &'static str = "TxStatusRequest"; + const PACKAGE: &'static str = "celestia.core.v1.tx"; + fn full_name() -> ::prost::alloc::string::String { + ::prost::alloc::format!("celestia.core.v1.tx.{}", Self::NAME) + } +} +/// TxStatusResponse is the response type for the TxStatus gRPC method. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TxStatusResponse { + #[prost(int64, tag = "1")] + pub height: i64, + #[prost(uint32, tag = "2")] + pub index: u32, + /// execution_code is returned when the transaction has been committed + /// and returns whether it was successful or errored. A non zero + /// execution code indicated an error. + #[prost(uint32, tag = "3")] + pub execution_code: u32, + /// error log for failed transactions. + #[prost(string, tag = "4")] + pub error: ::prost::alloc::string::String, + /// status is the status of the transaction. + #[prost(string, tag = "5")] + pub status: ::prost::alloc::string::String, +} +impl ::prost::Name for TxStatusResponse { + const NAME: &'static str = "TxStatusResponse"; + const PACKAGE: &'static str = "celestia.core.v1.tx"; + fn full_name() -> ::prost::alloc::string::String { + ::prost::alloc::format!("celestia.core.v1.tx.{}", Self::NAME) + } +} +/// Generated client implementations. +#[cfg(feature = "client")] +pub mod tx_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// Service defines a gRPC service for interacting with transactions. + #[derive(Debug, Clone)] + pub struct TxClient { + inner: tonic::client::Grpc, + } + impl TxClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl TxClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> TxClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + TxClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// TxStatus returns the status of a transaction. There are four possible states: + /// - Committed + /// - Pending + /// - Evicted + /// - Unknown + pub async fn tx_status( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/celestia.core.v1.tx.Tx/TxStatus", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("celestia.core.v1.tx.Tx", "TxStatus")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +#[cfg(feature = "server")] +pub mod tx_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with TxServer. + #[async_trait] + pub trait Tx: Send + Sync + 'static { + /// TxStatus returns the status of a transaction. There are four possible states: + /// - Committed + /// - Pending + /// - Evicted + /// - Unknown + async fn tx_status( + self: std::sync::Arc, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + /// Service defines a gRPC service for interacting with transactions. + #[derive(Debug)] + pub struct TxServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl TxServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for TxServer + where + T: Tx, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/celestia.core.v1.tx.Tx/TxStatus" => { + #[allow(non_camel_case_types)] + struct TxStatusSvc(pub Arc); + impl tonic::server::UnaryService + for TxStatusSvc { + type Response = super::TxStatusResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::tx_status(inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TxStatusSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for TxServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for TxServer { + const NAME: &'static str = "celestia.core.v1.tx.Tx"; + } +} diff --git a/crates/astria-core/src/generated/celestia.core.v1.tx.serde.rs b/crates/astria-core/src/generated/celestia.core.v1.tx.serde.rs new file mode 100644 index 0000000000..beb209057d --- /dev/null +++ b/crates/astria-core/src/generated/celestia.core.v1.tx.serde.rs @@ -0,0 +1,259 @@ +impl serde::Serialize for TxStatusRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.tx_id.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("celestia.core.v1.tx.TxStatusRequest", len)?; + if !self.tx_id.is_empty() { + struct_ser.serialize_field("txId", &self.tx_id)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for TxStatusRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "tx_id", + "txId", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + TxId, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "txId" | "tx_id" => Ok(GeneratedField::TxId), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = TxStatusRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct celestia.core.v1.tx.TxStatusRequest") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut tx_id__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::TxId => { + if tx_id__.is_some() { + return Err(serde::de::Error::duplicate_field("txId")); + } + tx_id__ = Some(map_.next_value()?); + } + } + } + Ok(TxStatusRequest { + tx_id: tx_id__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("celestia.core.v1.tx.TxStatusRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for TxStatusResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.height != 0 { + len += 1; + } + if self.index != 0 { + len += 1; + } + if self.execution_code != 0 { + len += 1; + } + if !self.error.is_empty() { + len += 1; + } + if !self.status.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("celestia.core.v1.tx.TxStatusResponse", len)?; + if self.height != 0 { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("height", ToString::to_string(&self.height).as_str())?; + } + if self.index != 0 { + struct_ser.serialize_field("index", &self.index)?; + } + if self.execution_code != 0 { + struct_ser.serialize_field("executionCode", &self.execution_code)?; + } + if !self.error.is_empty() { + struct_ser.serialize_field("error", &self.error)?; + } + if !self.status.is_empty() { + struct_ser.serialize_field("status", &self.status)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for TxStatusResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "height", + "index", + "execution_code", + "executionCode", + "error", + "status", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Height, + Index, + ExecutionCode, + Error, + Status, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "height" => Ok(GeneratedField::Height), + "index" => Ok(GeneratedField::Index), + "executionCode" | "execution_code" => Ok(GeneratedField::ExecutionCode), + "error" => Ok(GeneratedField::Error), + "status" => Ok(GeneratedField::Status), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = TxStatusResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct celestia.core.v1.tx.TxStatusResponse") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut height__ = None; + let mut index__ = None; + let mut execution_code__ = None; + let mut error__ = None; + let mut status__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Height => { + if height__.is_some() { + return Err(serde::de::Error::duplicate_field("height")); + } + height__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Index => { + if index__.is_some() { + return Err(serde::de::Error::duplicate_field("index")); + } + index__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::ExecutionCode => { + if execution_code__.is_some() { + return Err(serde::de::Error::duplicate_field("executionCode")); + } + execution_code__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Error => { + if error__.is_some() { + return Err(serde::de::Error::duplicate_field("error")); + } + error__ = Some(map_.next_value()?); + } + GeneratedField::Status => { + if status__.is_some() { + return Err(serde::de::Error::duplicate_field("status")); + } + status__ = Some(map_.next_value()?); + } + } + } + Ok(TxStatusResponse { + height: height__.unwrap_or_default(), + index: index__.unwrap_or_default(), + execution_code: execution_code__.unwrap_or_default(), + error: error__.unwrap_or_default(), + status: status__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("celestia.core.v1.tx.TxStatusResponse", FIELDS, GeneratedVisitor) + } +} diff --git a/crates/astria-core/src/generated/mod.rs b/crates/astria-core/src/generated/mod.rs index 1fef28c5e7..b9eae5191b 100644 --- a/crates/astria-core/src/generated/mod.rs +++ b/crates/astria-core/src/generated/mod.rs @@ -192,14 +192,29 @@ pub mod astria { #[path = ""] pub mod celestia { - #[path = "celestia.blob.v1.rs"] - pub mod v1 { - include!("celestia.blob.v1.rs"); + pub mod blob { + #[path = "celestia.blob.v1.rs"] + pub mod v1 { + include!("celestia.blob.v1.rs"); - #[cfg(feature = "serde")] - mod _serde_impl { - use super::*; - include!("celestia.blob.v1.serde.rs"); + #[cfg(feature = "serde")] + mod _serde_impl { + use super::*; + include!("celestia.blob.v1.serde.rs"); + } + } + } + pub mod core { + pub mod v1 { + pub mod tx { + include!("celestia.core.v1.tx.rs"); + + #[cfg(feature = "serde")] + mod _serde_impl { + use super::*; + include!("celestia.core.v1.tx.serde.rs"); + } + } } } } diff --git a/crates/astria-sequencer-relayer/CHANGELOG.md b/crates/astria-sequencer-relayer/CHANGELOG.md index 0bd6e4fa60..ef7f6dd1bf 100644 --- a/crates/astria-sequencer-relayer/CHANGELOG.md +++ b/crates/astria-sequencer-relayer/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Update `idna` dependency to resolve cargo audit warning [#1869](https://github.com/astriaorg/astria/pull/1869). +- Use `TxStatus` gRPC to confirm transaction commitment instead of `GetTx` [#1940](https://github.com/astriaorg/astria/pull/1940). ## [1.0.0] - 2024-10-25 diff --git a/crates/astria-sequencer-relayer/src/metrics.rs b/crates/astria-sequencer-relayer/src/metrics.rs index 85491bd751..a41a5012e9 100644 --- a/crates/astria-sequencer-relayer/src/metrics.rs +++ b/crates/astria-sequencer-relayer/src/metrics.rs @@ -26,6 +26,8 @@ pub struct Metrics { celestia_fees_total_utia: Gauge, celestia_fees_utia_per_uncompressed_blob_byte: Gauge, celestia_fees_utia_per_compressed_blob_byte: Gauge, + celestia_evicted_transaction_count: Counter, + celestia_unknown_status_transaction_count: Counter, } impl Metrics { @@ -88,6 +90,14 @@ impl Metrics { pub(crate) fn set_celestia_fees_utia_per_compressed_blob_byte(&self, utia: f64) { self.celestia_fees_utia_per_compressed_blob_byte.set(utia); } + + pub(crate) fn increment_celestia_evicted_transaction_count(&self) { + self.celestia_evicted_transaction_count.increment(1); + } + + pub(crate) fn increment_celestia_unknown_status_transaction_count(&self) { + self.celestia_unknown_status_transaction_count.increment(1); + } } impl telemetry::Metrics for Metrics { @@ -213,6 +223,19 @@ impl telemetry::Metrics for Metrics { submission", )? .register()?; + let celestia_evicted_transaction_count = builder + .new_counter_factory( + CELESTIA_EVICTED_TRANSACTION_COUNT, + "The number of transactions evicted from the Celestia mempool", + )? + .register()?; + let celestia_unknown_status_transaction_count = builder + .new_counter_factory( + CELESTIA_UNKNOWN_STATUS_TRANSACTION_COUNT, + "The number of transactions whose status in the Celestia mempool remains unknown \ + after 10s", + )? + .register()?; Ok(Self { celestia_submission_height, @@ -230,6 +253,8 @@ impl telemetry::Metrics for Metrics { celestia_fees_total_utia, celestia_fees_utia_per_uncompressed_blob_byte, celestia_fees_utia_per_compressed_blob_byte, + celestia_evicted_transaction_count, + celestia_unknown_status_transaction_count, }) } } @@ -249,7 +274,9 @@ metric_names!(const METRICS_NAMES: COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, CELESTIA_FEES_TOTAL_UTIA, CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE, - CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE + CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE, + CELESTIA_EVICTED_TRANSACTION_COUNT, + CELESTIA_UNKNOWN_STATUS_TRANSACTION_COUNT, ); #[cfg(test)] @@ -302,5 +329,13 @@ mod tests { CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE, "celestia_fees_utia_per_compressed_blob_byte", ); + assert_const( + CELESTIA_EVICTED_TRANSACTION_COUNT, + "celestia_evicted_transaction_count", + ); + assert_const( + CELESTIA_UNKNOWN_STATUS_TRANSACTION_COUNT, + "celestia_unknown_status_transaction_count", + ); } } diff --git a/crates/astria-sequencer-relayer/src/relayer/builder.rs b/crates/astria-sequencer-relayer/src/relayer/builder.rs index fd4d3fbf17..f938cb3cf1 100644 --- a/crates/astria-sequencer-relayer/src/relayer/builder.rs +++ b/crates/astria-sequencer-relayer/src/relayer/builder.rs @@ -77,8 +77,14 @@ impl Builder { .wrap_err("failed parsing provided celestia app grpc endpoint as Uri")?; let celestia_keys = CelestiaKeys::from_path(celestia_app_key_file) .wrap_err("failed to get celestia keys from file")?; - CelestiaClientBuilder::new(celestia_chain_id, uri, celestia_keys, state.clone()) - .wrap_err("failed to create celestia client builder")? + CelestiaClientBuilder::new( + celestia_chain_id, + uri, + celestia_keys, + state.clone(), + metrics, + ) + .wrap_err("failed to create celestia client builder")? }; Ok(super::Relayer { diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/builder.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/builder.rs index 1f78e81b51..7c799cb5b7 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/builder.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/builder.rs @@ -3,12 +3,15 @@ use std::{ time::Duration, }; -use astria_core::generated::cosmos::{ - base::tendermint::v1beta1::{ - service_client::ServiceClient as NodeInfoClient, - GetNodeInfoRequest, +use astria_core::generated::{ + celestia::core::v1::tx::tx_client::TxClient as TxStatusClient, + cosmos::{ + base::tendermint::v1beta1::{ + service_client::ServiceClient as NodeInfoClient, + GetNodeInfoRequest, + }, + tx::v1beta1::service_client::ServiceClient as TxClient, }, - tx::v1beta1::service_client::ServiceClient as TxClient, }; use http::Uri; use tendermint::account::Id as AccountId; @@ -30,6 +33,7 @@ use super::{ CelestiaKeys, GrpcResponseError, }; +use crate::Metrics; /// All gRPCs will time out with the given duration. const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -77,6 +81,7 @@ pub(in crate::relayer) struct Builder { address: Bech32Address, /// A handle to the mutable state of the relayer. state: Arc, + metrics: &'static Metrics, } impl Builder { @@ -86,6 +91,7 @@ impl Builder { uri: Uri, signing_keys: CelestiaKeys, state: Arc, + metrics: &'static Metrics, ) -> Result { let grpc_channel = Endpoint::from(uri).timeout(REQUEST_TIMEOUT).connect_lazy(); let address = bech32_encode(&signing_keys.address)?; @@ -95,6 +101,7 @@ impl Builder { signing_keys, address, state, + metrics, }) } @@ -109,6 +116,7 @@ impl Builder { signing_keys, address, state, + metrics, } = self; if received_celestia_chain_id != configured_celestia_chain_id { @@ -122,12 +130,15 @@ impl Builder { state.set_celestia_connected(true); let tx_client = TxClient::new(grpc_channel.clone()); + let tx_status_client = TxStatusClient::new(grpc_channel.clone()); Ok(CelestiaClient { grpc_channel, tx_client, + tx_status_client, signing_keys, address, chain_id: received_celestia_chain_id, + metrics, }) } diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs index 9309694d31..50e98fdfc0 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/error.rs @@ -77,26 +77,9 @@ pub(in crate::relayer) enum TrySubmitError { namespace: String, log: String, }, - /// The celestia app responded with the given error status to a `GetTxRequest`. - #[error("failed to get transaction")] - FailedToGetTx(#[source] GrpcResponseError), - /// The get transaction response was empty. - #[error("the get transaction response was empty")] - EmptyGetTxResponse, - /// The get transaction response contains an error code. - #[error( - "get transaction response contains error code `{code}`, tx `{tx_hash}`, namespace \ - `{namespace}`, log: {log}" - )] - GetTxResponseErrorCode { - tx_hash: String, - code: u32, - namespace: String, - log: String, - }, - /// The get transaction response specified a negative block height. - #[error("get transaction response specifies a negative block height ({0})")] - GetTxResponseNegativeBlockHeight(i64), + /// The transaction was either evicted from the mempool or the call to `TxStatus` failed. + #[error("failed to confirm transaction submission")] + FailedToConfirmSubmission(#[source] ConfirmSubmissionError), } /// A gRPC status representing an error response from an RPC call. @@ -137,3 +120,25 @@ impl std::error::Error for GrpcResponseError { #[derive(Error, Clone, Debug)] #[error(transparent)] pub(in crate::relayer) struct ProtobufDecodeError(#[from] DecodeError); + +/// An error in getting the status of a transaction via RPC `TxStatus`. +#[derive(Debug, Clone, Error)] +pub(in crate::relayer) enum TxStatusError { + #[error("received unfamiliar response for tx `{tx_hash}` from `TxStatus`: {status}")] + UnfamiliarStatus { status: String, tx_hash: String }, + #[error("failed to get transaction status")] + FailedToGetTxStatus(#[source] GrpcResponseError), +} + +/// An error in confirming the submission of a transaction. +#[derive(Debug, Clone, Error)] +pub(in crate::relayer) enum ConfirmSubmissionError { + #[error("tx `{tx_hash}` evicted from mempool")] + Evicted { tx_hash: String }, + #[error("received `UNKNOWN` status from `TxStatus` for tx: {tx_hash}")] + StatusUnknown { tx_hash: String }, + #[error(transparent)] + TxStatus(TxStatusError), + #[error("received negative block height from Celestia: {height}")] + NegativeHeight { height: i64 }, +} diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs index 83d15478ee..e9ad5b1fb6 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs @@ -22,11 +22,17 @@ use std::{ }; use astria_core::generated::{ - celestia::v1::{ - query_client::QueryClient as BlobQueryClient, - MsgPayForBlobs, - Params as BlobParams, - QueryParamsRequest as QueryBlobParamsRequest, + celestia::{ + blob::v1::{ + query_client::QueryClient as BlobQueryClient, + MsgPayForBlobs, + Params as BlobParams, + QueryParamsRequest as QueryBlobParamsRequest, + }, + core::v1::tx::{ + tx_client::TxClient as TxStatusClient, + TxStatusRequest, + }, }, cosmos::{ auth::v1beta1::{ @@ -57,8 +63,6 @@ use astria_core::generated::{ BroadcastTxRequest, BroadcastTxResponse, Fee, - GetTxRequest, - GetTxResponse, ModeInfo, SignDoc, SignerInfo, @@ -71,7 +75,6 @@ use astria_core::generated::{ BlobTx, }, }; -use astria_eyre::eyre::Report; pub(super) use builder::{ Builder as CelestiaClientBuilder, BuilderError, @@ -79,6 +82,10 @@ pub(super) use builder::{ use celestia_cost_params::CelestiaCostParams; pub(crate) use celestia_keys::CelestiaKeys; use celestia_types::Blob; +use error::{ + ConfirmSubmissionError, + TxStatusError, +}; pub(super) use error::{ GrpcResponseError, ProtobufDecodeError, @@ -120,25 +127,43 @@ use tracing::{ Level, }; +use crate::Metrics; + // From https://github.com/celestiaorg/cosmos-sdk/blob/v1.18.3-sdk-v0.46.14/types/errors/errors.go#L75 const INSUFFICIENT_FEE_CODE: u32 = 13; +// From https://github.com/celestiaorg/celestia-core/blob/d2ca0a2870973e17eadb62a763788bba1f04a1fb/rpc/core/tx.go#L20-L25 +const TX_STATUS_UNKNOWN: &str = "UNKNOWN"; +const TX_STATUS_PENDING: &str = "PENDING"; +const TX_STATUS_EVICTED: &str = "EVICTED"; +const TX_STATUS_COMMITTED: &str = "COMMITTED"; + +enum TxStatus { + Unknown, + Pending, + Evicted, + Committed(i64), +} + /// A client using the gRPC interface of a remote Celestia app to submit blob data to the Celestia /// chain. /// /// It is constructed using a [`CelestiaClientBuilder`]. -#[derive(Debug, Clone)] +#[derive(Clone)] pub(super) struct CelestiaClient { /// The inner `tonic` gRPC channel shared by the various generated gRPC clients. grpc_channel: Channel, - /// A gRPC client to broadcast and get transactions. + /// A gRPC client to broadcast transactions. tx_client: TxClient, + /// A gRPC client for querying transaction status. + tx_status_client: TxStatusClient, /// The crypto keys associated with our Celestia account. signing_keys: CelestiaKeys, /// The Bech32-encoded address of our Celestia account. address: Bech32Address, /// The Celestia network ID. chain_id: String, + metrics: &'static Metrics, } impl CelestiaClient { @@ -209,8 +234,8 @@ impl CelestiaClient { let hex_encoded_tx_hash = self.broadcast_tx(blob_tx).await?; if hex_encoded_tx_hash != blob_tx_hash.to_hex() { // This is not a critical error. Worst case, we restart the process now and try for a - // short while to `GetTx` for this tx using the wrong hash, resulting in a likely - // duplicate submission of this set of blobs. + // short while to get `TxStatus` for this tx using the wrong hash, resulting in a + // likely duplicate submission of this set of blobs. warn!( "tx hash `{hex_encoded_tx_hash}` returned from celestia app is not the same as \ the locally calculated one `{blob_tx_hash}`; submission file has invalid data" @@ -218,15 +243,16 @@ impl CelestiaClient { } info!(tx_hash = %hex_encoded_tx_hash, "broadcast blob transaction succeeded"); - let height = self.confirm_submission(hex_encoded_tx_hash).await; - Ok(height) + self.confirm_submission(hex_encoded_tx_hash) + .await + .map_err(TrySubmitError::FailedToConfirmSubmission) } - /// Repeatedly sends `GetTx` until a successful response is received or `timeout` duration has - /// elapsed. + /// Repeatedly sends `TxStatus` until a successful response is received or `timeout` duration + /// has elapsed. /// /// Returns the height of the Celestia block in which the blobs were submitted, or `None` if - /// timed out. + /// timed out or an error was returned. #[instrument(skip_all)] pub(super) async fn confirm_submission_with_timeout( &mut self, @@ -236,6 +262,7 @@ impl CelestiaClient { tokio::time::timeout(timeout, self.confirm_submission(blob_tx_hash.to_hex())) .await .ok() + .and_then(Result::ok) } #[instrument(skip_all, err)] @@ -322,67 +349,128 @@ impl CelestiaClient { lowercase_hex_encoded_tx_hash_from_response(response) } - /// Returns `Some(height)` if the tx submission has completed, or `None` if it is still - /// pending. - #[instrument(skip_all, err)] - async fn get_tx(&mut self, hex_encoded_tx_hash: String) -> Result, TrySubmitError> { - let request = GetTxRequest { - hash: hex_encoded_tx_hash, - }; - let response = self.tx_client.get_tx(request).await; - // trace-level logging, so using Debug format is ok. - #[cfg_attr(dylint_lib = "tracing_debug_field", allow(tracing_debug_field))] - { - trace!(?response); + /// Returns the response of `TxStatus` RPC call given a transaction's hash. If the transaction + /// is committed, the height of the block in which it was committed will be returned with + /// `TxStatusResponse::Committed`. + /// + /// # Errors + /// Returns an error in the following cases: + /// - The call to `TxStatus` failed. + /// - The status of the transaction is not recognized. + #[instrument(skip_all, err(level = Level::WARN))] + async fn tx_status(&mut self, hex_encoded_tx_hash: String) -> Result { + let response = self + .tx_status_client + .tx_status(TxStatusRequest { + tx_id: hex_encoded_tx_hash.clone(), + }) + .await + .map_err(|e| TxStatusError::FailedToGetTxStatus(e.into()))?; + match response.get_ref().status.as_str() { + TX_STATUS_UNKNOWN => Ok(TxStatus::Unknown), + TX_STATUS_PENDING => Ok(TxStatus::Pending), + TX_STATUS_EVICTED => Ok(TxStatus::Evicted), + TX_STATUS_COMMITTED => Ok(TxStatus::Committed(response.get_ref().height)), + _ => Err(TxStatusError::UnfamiliarStatus { + status: response.get_ref().status.to_string(), + tx_hash: hex_encoded_tx_hash, + }), } - block_height_from_response(response) } - /// Repeatedly sends `GetTx` until a successful response is received. Returns the height of the - /// Celestia block in which the blobs were submitted. - #[instrument(skip_all, fields(hex_encoded_tx_hash))] - async fn confirm_submission(&mut self, hex_encoded_tx_hash: String) -> u64 { - // The min seconds to sleep after receiving a GetTx response and sending the next request. - const MIN_POLL_INTERVAL_SECS: u64 = 1; - // The max seconds to sleep after receiving a GetTx response and sending the next request. - const MAX_POLL_INTERVAL_SECS: u64 = 12; - // How long to wait after starting `confirm_submission` before starting to log errors. - const START_LOGGING_DELAY: Duration = Duration::from_secs(12); - // The minimum duration between logging errors. - const LOG_ERROR_INTERVAL: Duration = Duration::from_secs(5); + /// Repeatedly calls `TxStatus` until the transaction is committed, returning the height of the + /// block in which the transaction was included. + /// + /// # Errors + /// Returns an error in the following cases: + /// - The transaction was evicted from the mempool. + /// - The status of the transaction is unknown. + /// - An error occurred while retrieving the transaction's status. + #[instrument(skip_all, fields(hex_encoded_tx_hash), err(level = Level::DEBUG))] + async fn confirm_submission( + &mut self, + hex_encoded_tx_hash: String, + ) -> Result { + // The minimum amount of time to sleep after receiving a TxStatus response and sending the + // next request. + const MIN_POLL_INTERVAL: Duration = Duration::from_secs(1); + // The maximum amount of time to sleep after receiving a TxStatus response and sending the + // next request. + const MAX_POLL_INTERVAL: Duration = Duration::from_secs(6); + // The amount of time to wait before switching to warn level logging instead of debug. + // Corresponds with the Celestia block time. + const START_WARN_DELAY: Duration = Duration::from_secs(6); + // The duration between logs. + const LOG_INTERVAL: Duration = Duration::from_secs(3); + // The maximum amount of time to wait for a transaction to be committed if its status is + // `UNKNOWN`. Corresponds with Celestia block time + 1 second down time. + const MAX_WAIT_FOR_UNKNOWN: Duration = Duration::from_secs(7); let start = Instant::now(); let mut logged_at = start; - let mut log_if_due = |maybe_error: Option| { - if start.elapsed() <= START_LOGGING_DELAY || logged_at.elapsed() <= LOG_ERROR_INTERVAL { + let mut log_if_due = |status: &str| { + if logged_at.elapsed() <= LOG_INTERVAL { return; } - let reason = maybe_error.map_or(Report::msg("transaction still pending"), Report::new); - warn!( - %reason, - tx_hash = %hex_encoded_tx_hash, - elapsed_seconds = start.elapsed().as_secs_f32(), - "waiting to confirm blob submission" - ); + + // If elapsed time since start is under `START_WARN_DELAY`, log at debug level at a + // constant interval. If elapsed time since start is over `START_WARN_DELAY`, this means + // at least one Celestia block has passed and the transaction should have been + // submitted. We then start logging at the warn level. + if start.elapsed() > START_WARN_DELAY { + warn!( + reason = format!("transaction status: {status}"), + tx_hash = %hex_encoded_tx_hash, + elapsed_seconds = start.elapsed().as_secs_f32(), + "waiting to confirm blob submission" + ); + } else { + debug!( + reason = format!("transaction status: {status}"), + tx_hash = %hex_encoded_tx_hash, + elapsed_seconds = start.elapsed().as_secs_f32(), + "waiting to confirm blob submission" + ); + } logged_at = Instant::now(); }; - let mut sleep_secs = MIN_POLL_INTERVAL_SECS; + let mut poll_interval = MIN_POLL_INTERVAL; loop { - tokio::time::sleep(Duration::from_secs(sleep_secs)).await; - match self.get_tx(hex_encoded_tx_hash.clone()).await { - Ok(Some(height)) => return height, - Ok(None) => { - sleep_secs = MIN_POLL_INTERVAL_SECS; - log_if_due(None); + tokio::time::sleep(poll_interval).await; + match self.tx_status(hex_encoded_tx_hash.clone()).await { + Ok(TxStatus::Unknown) => { + if start.elapsed() > MAX_WAIT_FOR_UNKNOWN { + self.metrics + .increment_celestia_unknown_status_transaction_count(); + return Err(ConfirmSubmissionError::StatusUnknown { + tx_hash: hex_encoded_tx_hash, + }); + } + log_if_due("UNKNOWN"); + } + Ok(TxStatus::Pending) => { + log_if_due("PENDING"); + } + Ok(TxStatus::Evicted) => { + self.metrics.increment_celestia_evicted_transaction_count(); + return Err(ConfirmSubmissionError::Evicted { + tx_hash: hex_encoded_tx_hash, + }); + } + Ok(TxStatus::Committed(height)) => { + return height + .try_into() + .map_err(|_| ConfirmSubmissionError::NegativeHeight { + height, + }) } Err(error) => { - sleep_secs = - std::cmp::min(sleep_secs.saturating_mul(2), MAX_POLL_INTERVAL_SECS); - log_if_due(Some(error)); + return Err(ConfirmSubmissionError::TxStatus(error)); } } + poll_interval = std::cmp::min(poll_interval.saturating_mul(2), MAX_POLL_INTERVAL); } } } @@ -494,53 +582,6 @@ fn lowercase_hex_encoded_tx_hash_from_response( Ok(tx_response.txhash) } -/// Extracts the block height from the given response if available, or `None` if the transaction is -/// not available yet. -fn block_height_from_response( - response: Result, Status>, -) -> Result, TrySubmitError> { - let ok_response = match response { - Ok(resp) => resp, - Err(status) => { - // trace-level logging, so using Debug format is ok. - #[cfg_attr(dylint_lib = "tracing_debug_field", allow(tracing_debug_field))] - { - trace!(?status); - } - if status.code() == tonic::Code::NotFound { - trace!(msg = status.message(), "transaction still pending"); - return Ok(None); - } - return Err(TrySubmitError::FailedToGetTx(GrpcResponseError::from( - status, - ))); - } - }; - let tx_response = ok_response - .into_inner() - .tx_response - .ok_or_else(|| TrySubmitError::EmptyGetTxResponse)?; - if tx_response.code != 0 { - let error = TrySubmitError::GetTxResponseErrorCode { - tx_hash: tx_response.txhash, - code: tx_response.code, - namespace: tx_response.codespace, - log: tx_response.raw_log, - }; - return Err(error); - } - if tx_response.height == 0 { - trace!(tx_hash = %tx_response.txhash, "transaction still pending"); - return Ok(None); - } - - let height = u64::try_from(tx_response.height) - .map_err(|_| TrySubmitError::GetTxResponseNegativeBlockHeight(tx_response.height))?; - - debug!(tx_hash = %tx_response.txhash, height, "transaction succeeded"); - Ok(Some(height)) -} - // Copied from https://github.com/celestiaorg/celestia-app/blob/v1.4.0/x/blob/types/payforblob.go#L174 // // `blob_sizes` is the collection of sizes in bytes of all the blobs' `data` fields. diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs index f6c4106230..64c7e1a039 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/tests.rs @@ -70,7 +70,7 @@ fn new_msg_pay_for_blobs_should_fail_for_large_blob() { } #[test] -fn account_from_good_response_should_succeed() { +fn account_from_good_query_account_response_should_succeed() { let base_account = BaseAccount { address: "address".to_string(), pub_key: None, @@ -90,7 +90,7 @@ fn account_from_good_response_should_succeed() { } #[test] -fn account_from_bad_response_should_fail() { +fn account_from_bad_query_account_response_should_fail() { // Should return `FailedToGetAccountInfo` if outer response is an error. let error = account_from_response(Err(Status::internal(""))).unwrap_err(); #[expect( @@ -155,7 +155,7 @@ fn account_from_bad_response_should_fail() { } #[test] -fn min_gas_price_from_good_response_should_succeed() { +fn min_gas_price_from_good_min_gas_price_response_should_succeed() { let min_gas_price = 1234.56_f64; let response = Response::new(MinGasPriceResponse { minimum_gas_price: format!("{min_gas_price}utia"), @@ -171,7 +171,7 @@ fn min_gas_price_from_good_response_should_succeed() { } #[test] -fn min_gas_price_from_bad_response_should_fail() { +fn min_gas_price_from_bad_min_gas_price_response_should_fail() { // Should return `FailedToGetMinGasPrice` if outer response is an error. let error = min_gas_price_from_response(Err(Status::internal(""))).unwrap_err(); #[expect( @@ -231,11 +231,6 @@ impl TxResponseBuilder { Self::default() } - fn with_height(mut self, height: i64) -> Self { - self.height = height; - self - } - fn with_tx_hash>(mut self, tx_hash: T) -> Self { self.tx_hash = tx_hash.as_ref().to_string(); self @@ -276,7 +271,7 @@ impl TxResponseBuilder { } #[test] -fn tx_hash_from_good_response_should_succeed() { +fn tx_hash_from_good_broadcast_tx_response_should_succeed() { let tx_hash = "abc"; let tx_response = TxResponseBuilder::new().with_tx_hash(tx_hash).build(); let response = Response::new(BroadcastTxResponse { @@ -288,7 +283,7 @@ fn tx_hash_from_good_response_should_succeed() { } #[test] -fn tx_hash_from_bad_response_should_fail() { +fn tx_hash_from_bad_broadcast_tx_response_should_fail() { // Should return `FailedToBroadcastTx` if outer response is an error. let error = lowercase_hex_encoded_tx_hash_from_response(Err(Status::internal(""))).unwrap_err(); #[expect( @@ -344,112 +339,6 @@ fn tx_hash_from_bad_response_should_fail() { } } -#[test] -fn block_height_from_good_response_should_succeed() { - let height = 9; - let tx_response = TxResponseBuilder::new().with_height(height).build(); - let response = Response::new(GetTxResponse { - tx: None, - tx_response: Some(tx_response), - }); - - let extracted_height = block_height_from_response(Ok(response)).unwrap(); - assert_eq!(Some(u64::try_from(height).unwrap()), extracted_height); -} - -#[test] -fn block_height_from_bad_response_should_fail() { - // Should return `FailedToGetTx` if outer response is an error other than `NotFound`. - let error = block_height_from_response(Err(Status::internal(""))).unwrap_err(); - #[expect( - clippy::manual_assert, - reason = "`assert!(matches!(..))` provides poor feedback on failure" - )] - if !matches!(error, TrySubmitError::FailedToGetTx(_)) { - panic!("expected `Error::FailedToGetTx`, got {error:?}"); - } - - // Should return `EmptyGetTxResponse` if the inner response's `tx_response` is `None`. - let response = Ok(Response::new(GetTxResponse { - tx: None, - tx_response: None, - })); - let error = block_height_from_response(response).unwrap_err(); - #[expect( - clippy::manual_assert, - reason = "`assert!(matches!(..))` provides poor feedback on failure" - )] - if !matches!(error, TrySubmitError::EmptyGetTxResponse) { - panic!("expected `Error::EmptyGetTxResponse`, got {error:?}"); - } - - // Should return `GetTxResponseErrorCode` if the inner response's `tx_response.code` is not 0. - let tx_hash = "abc"; - let code = 9; - let namespace = "def"; - let log = "ghi"; - let tx_response = TxResponseBuilder::new() - .with_tx_hash(tx_hash) - .with_code(code) - .with_codespace(namespace) - .with_raw_log(log) - .build(); - let response = Ok(Response::new(GetTxResponse { - tx: None, - tx_response: Some(tx_response), - })); - let error = block_height_from_response(response).unwrap_err(); - match error { - TrySubmitError::GetTxResponseErrorCode { - tx_hash: received_tx_hash, - code: received_code, - namespace: received_namespace, - log: received_log, - } => { - assert_eq!(tx_hash, received_tx_hash,); - assert_eq!(code, received_code,); - assert_eq!(namespace, received_namespace,); - assert_eq!(log, received_log,); - } - _ => panic!("expected `GetTxResponseErrorCode` error, but got {error:?}"), - } -} - -#[test] -fn block_height_from_response_with_negative_height_should_fail() { - let height = -9; - let tx_response = TxResponseBuilder::new().with_height(height).build(); - let response = Response::new(GetTxResponse { - tx: None, - tx_response: Some(tx_response), - }); - - let error = block_height_from_response(Ok(response)).unwrap_err(); - match error { - TrySubmitError::GetTxResponseNegativeBlockHeight(received_height) => { - assert_eq!(height, received_height); - } - _ => panic!("expected `GetTxResponseErrorCode` error, but got {error:?}"), - } -} - -#[test] -fn block_height_from_pending_response_should_return_none() { - // Should return `None` if outer response is a `NotFound` error. - let maybe_height = block_height_from_response(Err(Status::not_found(""))).unwrap(); - assert!(maybe_height.is_none()); - - // Should return `None` if the height is 0. - let tx_response = TxResponseBuilder::new().with_height(0).build(); - let response = Response::new(GetTxResponse { - tx: None, - tx_response: Some(tx_response), - }); - - let maybe_height = block_height_from_response(Ok(response)).unwrap(); - assert!(maybe_height.is_none()); -} - #[test] fn should_use_calculated_fee() { // If no last error provided, should use calculated fee. diff --git a/crates/astria-sequencer-relayer/src/relayer/submission.rs b/crates/astria-sequencer-relayer/src/relayer/submission.rs index bc9daa7493..82c8f2a36f 100644 --- a/crates/astria-sequencer-relayer/src/relayer/submission.rs +++ b/crates/astria-sequencer-relayer/src/relayer/submission.rs @@ -34,7 +34,7 @@ use tracing::{ use super::BlobTxHash; /// Represents a submission made to Celestia which has been confirmed as stored via a successful -/// `GetTx` call. +/// `TxStatus` call. #[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)] pub(super) struct CompletedSubmission { /// The height of the Celestia block in which the submission was stored. @@ -322,7 +322,7 @@ impl PreparedSubmission { &self.blob_tx_hash } - /// Returns the maximum duration for which the Celestia app should be polled with `GetTx` + /// Returns the maximum duration for which the Celestia app should be polled with `TxStatus` /// requests to confirm successful storage of the associated `BlobTx`. /// /// This is at least 15 seconds, but up to a maximum of a minute from when the submission was diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index aebb0debd4..ec9d32c5f4 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -303,7 +303,7 @@ impl BlobSubmitter { /// This should only be called where submission state on startup is `Prepared`, meaning we don't yet /// know whether that final submission attempt succeeded or not. /// -/// Internally, this polls `GetTx` for up to one minute. The returned `SubmissionState` is +/// Internally, this polls `TxStatus` for up to one minute. The returned `SubmissionState` is /// guaranteed to be in `Started` state, either holding the heights of the previously prepared /// submission if confirmed by Celestia, or holding the heights of the last known confirmed /// submission in the case of timing out. diff --git a/crates/astria-sequencer-relayer/tests/blackbox/helpers/mock_celestia_app_server.rs b/crates/astria-sequencer-relayer/tests/blackbox/helpers/mock_celestia_app_server.rs index 940592d907..243b5b996d 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/helpers/mock_celestia_app_server.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/helpers/mock_celestia_app_server.rs @@ -7,14 +7,24 @@ use std::{ }; use astria_core::generated::{ - celestia::v1::{ - query_server::{ - Query as BlobQueryService, - QueryServer as BlobQueryServer, + celestia::{ + blob::v1::{ + query_server::{ + Query as BlobQueryService, + QueryServer as BlobQueryServer, + }, + Params as BlobParams, + QueryParamsRequest as QueryBlobParamsRequest, + QueryParamsResponse as QueryBlobParamsResponse, + }, + core::v1::tx::{ + tx_server::{ + Tx as TxStatusService, + TxServer as TxStatusServer, + }, + TxStatusRequest, + TxStatusResponse, }, - Params as BlobParams, - QueryParamsRequest as QueryBlobParamsRequest, - QueryParamsResponse as QueryBlobParamsResponse, }, cosmos::{ auth::v1beta1::{ @@ -96,7 +106,7 @@ const QUERY_ACCOUNT_GRPC_NAME: &str = "query_account"; const QUERY_AUTH_PARAMS_GRPC_NAME: &str = "query_auth_params"; const QUERY_BLOB_PARAMS_GRPC_NAME: &str = "query_blob_params"; const MIN_GAS_PRICE_GRPC_NAME: &str = "min_gas_price"; -const GET_TX_GRPC_NAME: &str = "get_tx"; +const TX_STATUS_GRPC_NAME: &str = "tx_status"; const BROADCAST_TX_GRPC_NAME: &str = "broadcast_tx"; pub struct MockCelestiaAppServer { @@ -110,10 +120,10 @@ impl MockCelestiaAppServer { pub async fn spawn(celestia_chain_id: String) -> Self { use tokio_stream::wrappers::TcpListenerStream; - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let local_addr = listener.local_addr().unwrap(); - + let grpc_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = grpc_listener.local_addr().unwrap(); let mock_server = MockServer::new(); + register_get_node_info(&mock_server, celestia_chain_id).await; register_query_account(&mock_server).await; register_query_auth_params(&mock_server).await; @@ -128,8 +138,9 @@ impl MockCelestiaAppServer { .add_service(AuthQueryServer::new(service_impl.clone())) .add_service(BlobQueryServer::new(service_impl.clone())) .add_service(MinGasPriceServer::new(service_impl.clone())) - .add_service(TxServer::new(service_impl)) - .serve_with_incoming(TcpListenerStream::new(listener)) + .add_service(TxServer::new(service_impl.clone())) + .add_service(TxStatusServer::new(service_impl)) + .serve_with_incoming(TcpListenerStream::new(grpc_listener)) .await .wrap_err("gRPC sequencer server failed") }) @@ -153,22 +164,8 @@ impl MockCelestiaAppServer { debug_name: impl Into, ) -> MockGuard { self.prepare_broadcast_tx_response(debug_name) - .mount_as_scoped(&self.mock_server) - .await - } - - pub async fn mount_get_tx_response(&self, height: i64, debug_name: impl Into) { - Self::prepare_get_tx_response(height, debug_name) - .mount(&self.mock_server) - .await; - } - - pub async fn mount_get_tx_response_as_scoped( - &self, - height: i64, - debug_name: impl Into, - ) -> MockGuard { - Self::prepare_get_tx_response(height, debug_name) + .expect(1) + .up_to_n_times(1) .mount_as_scoped(&self.mock_server) .await } @@ -202,29 +199,34 @@ impl MockCelestiaAppServer { .with_name(debug_name) } - fn prepare_get_tx_response(height: i64, debug_name: impl Into) -> Mock { - let debug_name = debug_name.into(); - // We only use the `tx_response.code` and `tx_response.height` fields in the success case. - // The `txhash` would be an actual hex-encoded SHA256 in prod, but here we can just use the - // debug name for ease of debugging. - let tx_response = TxResponse { - height, - txhash: debug_name.clone(), - code: 0, - ..TxResponse::default() - }; - let response = GetTxResponse { - tx: None, - tx_response: Some(tx_response), - }; - Mock::for_rpc_given(GET_TX_GRPC_NAME, message_type::()) - .respond_with(constant_response(response)) - .up_to_n_times(1) - .expect(1) + pub async fn mount_tx_status_response_as_scoped( + &self, + debug_name: impl Into, + status: String, + height: i64, + number_of_times: u64, + ) -> MockGuard { + prepare_tx_status_response(status, height) + .expect(number_of_times) + .up_to_n_times(number_of_times) .with_name(debug_name) + .mount_as_scoped(&self.mock_server) + .await } } +fn prepare_tx_status_response(status: String, height: i64) -> Mock { + Mock::for_rpc_given(TX_STATUS_GRPC_NAME, message_type::()).respond_with( + constant_response(TxStatusResponse { + height, + index: 0, + execution_code: 0, + error: String::new(), + status, + }), + ) +} + /// Registers a handler for all incoming `GetNodeInfoRequest`s which responds with the same /// `GetNodeInfoResponse` every time. async fn register_get_node_info(mock_server: &MockServer, celestia_chain_id: String) { @@ -406,7 +408,7 @@ impl TxService for CelestiaAppServiceImpl { self: Arc, request: Request, ) -> Result, Status> { - self.0.handle_request(GET_TX_GRPC_NAME, request).await + self.0.handle_request("get_tx", request).await } async fn broadcast_tx( @@ -425,3 +427,13 @@ fn extract_blob_namespaces(request: &BroadcastTxRequest) -> Vec { .map(|blob| Namespace::new_v0(blob.namespace_id.as_ref()).unwrap()) .collect() } + +#[async_trait::async_trait] +impl TxStatusService for CelestiaAppServiceImpl { + async fn tx_status( + self: Arc, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.0.handle_request(TX_STATUS_GRPC_NAME, request).await + } +} diff --git a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs index 130d0e28a5..55366d6109 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/helpers/test_sequencer_relayer.rs @@ -275,32 +275,22 @@ impl TestSequencerRelayer { .await } - /// Mounts a Celestia `GetTx` response. - /// - /// The `debug_name` is assigned to the mock and is output on error to assist with debugging. - /// It is also assigned as the `TxHash` in the request and response. - pub async fn mount_celestia_app_get_tx_response( + /// Mounts a Celestia `TxStatus` response and returns a `wiremock::MockGuard` to allow for + /// awaiting its satisfaction. + pub async fn mount_celestia_app_tx_status_response_as_scoped( &self, - celestia_height: i64, debug_name: impl Into, - ) { - self.celestia_app - .mount_get_tx_response(celestia_height, debug_name) - .await; - } - - /// Mounts a Celestia `GetTx` response and returns a `GrpcMockGuard` to allow for waiting for - /// the mock to be satisfied. - /// - /// The `debug_name` is assigned to the mock and is output on error to assist with debugging. - /// It is also assigned as the `TxHash` in the request and response. - pub async fn mount_celestia_app_get_tx_response_as_scoped( - &self, celestia_height: i64, - debug_name: impl Into, + status: &str, + number_of_times: u64, ) -> GrpcMockGuard { self.celestia_app - .mount_get_tx_response_as_scoped(celestia_height, debug_name) + .mount_tx_status_response_as_scoped( + debug_name, + status.to_string(), + celestia_height, + number_of_times, + ) .await } diff --git a/crates/astria-sequencer-relayer/tests/blackbox/main.rs b/crates/astria-sequencer-relayer/tests/blackbox/main.rs index 93770c5dce..d4ffdf96bb 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/main.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/main.rs @@ -11,6 +11,7 @@ use astria_core::{ primitive::v1::RollupId, protocol::test_utils::ConfigureSequencerBlock, }; +use futures::future::join; use helpers::{ SequencerBlockToMount, TestSequencerRelayerConfig, @@ -30,16 +31,16 @@ async fn one_block_is_relayed_to_celestia() { sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 1") .await; - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 1") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 1) .await; // The `MIN_POLL_INTERVAL_SECS` is 1, meaning the relayer waits for 1 second before attempting - // the first `GetTx`, so we wait for 2 seconds. + // the first `TxStatus`, so we wait for 2 seconds. sequencer_relayer .timeout_ms( 2_000, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -61,7 +62,7 @@ async fn one_block_is_relayed_to_celestia() { ); } -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn report_degraded_if_block_fetch_fails() { let sequencer_relayer = TestSequencerRelayerConfig::default().spawn_relayer().await; @@ -80,8 +81,8 @@ async fn report_degraded_if_block_fetch_fails() { sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 1") .await; - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 1") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 1) .await; let healthz_status = sequencer_relayer .wait_for_healthz(StatusCode::OK, 2_000, "waiting for first healthz") @@ -90,8 +91,8 @@ async fn report_degraded_if_block_fetch_fails() { sequencer_relayer .timeout_ms( 2_000, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -139,14 +140,14 @@ async fn later_height_in_state_leads_to_expected_relay() { sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 1") .await; - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 1") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 1) .await; sequencer_relayer .timeout_ms( 2_000, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -193,28 +194,22 @@ async fn three_blocks_are_relayed() { sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 1") .await; - sequencer_relayer - .mount_celestia_app_get_tx_response(53, "get tx 1") - .await; sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 2") .await; - sequencer_relayer - .mount_celestia_app_get_tx_response(53, "get tx 2") - .await; sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 3") .await; - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 3") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 3) .await; - // Each block will have taken ~1 second due to the delay before each `GetTx`, so use 4.5 + // Each block will have taken ~1 second due to the delay before each `tx_status`, so use 4.5 // seconds. sequencer_relayer .timeout_ms( 4_500, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -272,14 +267,14 @@ async fn should_filter_rollup() { sequencer_relayer .mount_celestia_app_broadcast_tx_response("broadcast tx 1") .await; - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 1") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 1) .await; sequencer_relayer .timeout_ms( 10_000, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -325,14 +320,14 @@ async fn should_shut_down() { // process. sequencer_relayer.relayer_shutdown_handle.take(); - let get_tx_guard = sequencer_relayer - .mount_celestia_app_get_tx_response_as_scoped(53, "get tx 1") + let tx_status_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "COMMITTED", 1) .await; sequencer_relayer .timeout_ms( 2_000, - "waiting for get tx guard", - get_tx_guard.wait_until_satisfied(), + "waiting for tx status guard", + tx_status_guard.wait_until_satisfied(), ) .await; @@ -362,3 +357,260 @@ async fn should_exit_if_celestia_chain_id_mismatch() { sequencer_relayer.wait_for_relayer_shutdown(100).await; } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn confirm_submission_loops_on_pending_status() { + let sequencer_relayer = TestSequencerRelayerConfig::default().spawn_relayer().await; + + sequencer_relayer.mount_abci_response(1).await; + let block_to_mount = SequencerBlockToMount::GoodAtHeight(1); + sequencer_relayer + .mount_sequencer_block_response(block_to_mount, "good block 1") + .await; + sequencer_relayer + .mount_celestia_app_broadcast_tx_response("broadcast tx 1") + .await; + + // Expect relayer to loop when it receives a PENDING status. Only respond up to the number of + // expected times, since a committed response will be mounted after. + let tx_pending_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "PENDING", 2) + .await; + // Allow 4 seconds for two `TxStatus` calls. MIN_POLL_INTERVAL_SECS is 1 with exponential + // backoff, so with two calls we're allowing 1 extra second for this mount to be satisfied. + sequencer_relayer + .timeout_ms( + 4_000, + "waiting for tx status pending guard", + tx_pending_guard.wait_until_satisfied(), + ) + .await; + + // Mount committed tx status response after sending two pending responses. Relayer should + // continue normal execution after this. + let tx_confirmed_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 2", 53, "COMMITTED", 1) + .await; + sequencer_relayer + .timeout_ms( + 6_000, + "waiting for tx status confirmed guard", + tx_confirmed_guard.wait_until_satisfied(), + ) + .await; + + // Assert the relayer reports the correct Celestia and sequencer heights. + sequencer_relayer + .wait_for_latest_confirmed_celestia_height(53, 1_000) + .await; + sequencer_relayer + .wait_for_latest_fetched_sequencer_height(1, 1_000) + .await; + sequencer_relayer + .wait_for_latest_observed_sequencer_height(1, 1_000) + .await; + + assert_eq!( + sequencer_relayer.celestia_app_received_blob_count(), + 2, + "expected 2 blobs in total, 1 header blob and 1 rollup blob" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn confirm_submission_loops_on_unknown_status_up_to_time_limit() { + let sequencer_relayer = TestSequencerRelayerConfig::default().spawn_relayer().await; + + sequencer_relayer.mount_abci_response(1).await; + let block_to_mount = SequencerBlockToMount::GoodAtHeight(1); + sequencer_relayer + .mount_sequencer_block_response(block_to_mount, "good block 1") + .await; + sequencer_relayer + .mount_celestia_app_broadcast_tx_response("broadcast tx 1") + .await; + + // Expect relayer to loop when it receives a UNKNOWN status. Only respond up to the number of + // expected times, since a committed response will be mounted after. + let tx_unknown_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "UNKNOWN", 2) + .await; + // Allow 4 seconds for two `TxStatus` calls. MIN_POLL_INTERVAL_SECS is 1 with exponential + // backoff, so with two calls we're allowing 1 extra second for this mount to be satisfied. + sequencer_relayer + .timeout_ms( + 4_000, + "waiting for tx status unknown guard", + tx_unknown_guard.wait_until_satisfied(), + ) + .await; + + // Mount committed tx status response after sending two unknown responses. Relayer should + // continue normal execution after this. + let tx_confirmed_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 2", 53, "COMMITTED", 1) + .await; + sequencer_relayer + .timeout_ms( + 6_000, + "waiting for tx status confirmed guard", + tx_confirmed_guard.wait_until_satisfied(), + ) + .await; + + // Assert the relayer reports the correct Celestia and sequencer heights. + sequencer_relayer + .wait_for_latest_confirmed_celestia_height(53, 1_000) + .await; + sequencer_relayer + .wait_for_latest_fetched_sequencer_height(1, 1_000) + .await; + sequencer_relayer + .wait_for_latest_observed_sequencer_height(1, 1_000) + .await; + + assert_eq!( + sequencer_relayer.celestia_app_received_blob_count(), + 2, + "expected 2 blobs in total, 1 header blob and 1 rollup blob" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn retries_submission_after_receiving_evicted_tx_status() { + let sequencer_relayer = TestSequencerRelayerConfig::default().spawn_relayer().await; + + sequencer_relayer.mount_abci_response(1).await; + let block_to_mount = SequencerBlockToMount::GoodAtHeight(1); + sequencer_relayer + .mount_sequencer_block_response(block_to_mount, "good block 1") + .await; + let broadcast_tx_guard_1 = sequencer_relayer + .mount_celestia_app_broadcast_tx_response_as_scoped("broadcast tx 1") + .await; + let tx_evicted_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "EVICTED", 1) + .await; + + sequencer_relayer + .timeout_ms( + 2_000, + "waiting for first broadcast tx guard and tx status evicted guard", + join( + broadcast_tx_guard_1.wait_until_satisfied(), + tx_evicted_guard.wait_until_satisfied(), + ), + ) + .await; + + // Relayer should retry submission after receiving an EVICTED status. + + let broadcast_tx_guard_2 = sequencer_relayer + .mount_celestia_app_broadcast_tx_response_as_scoped("broadcast tx 2") + .await; + let tx_confirmed_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 2", 53, "COMMITTED", 1) + .await; + sequencer_relayer + .timeout_ms( + 2_000, + "waiting for second broadcast tx guard and tx status confirmed guard", + join( + tx_confirmed_guard.wait_until_satisfied(), + broadcast_tx_guard_2.wait_until_satisfied(), + ), + ) + .await; + + // Assert the relayer reports the correct Celestia and sequencer heights. + sequencer_relayer + .wait_for_latest_confirmed_celestia_height(53, 1_000) + .await; + sequencer_relayer + .wait_for_latest_fetched_sequencer_height(1, 1_000) + .await; + sequencer_relayer + .wait_for_latest_observed_sequencer_height(1, 1_000) + .await; + + assert_eq!( + sequencer_relayer.celestia_app_received_blob_count(), + 4, + "expected 4 blobs in total, 2 header blobs and 2 rollup blobs" + ); + assert!(sequencer_relayer + .metrics_handle + .render() + .contains("astria_sequencer_relayer_celestia_evicted_transaction_count 1")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn confirm_submission_exits_for_unknown_status_after_time_limit() { + let sequencer_relayer = TestSequencerRelayerConfig::default().spawn_relayer().await; + + sequencer_relayer.mount_abci_response(1).await; + let block_to_mount = SequencerBlockToMount::GoodAtHeight(1); + sequencer_relayer + .mount_sequencer_block_response(block_to_mount, "good block 1") + .await; + + let broadcast_tx_guard_1 = sequencer_relayer + .mount_celestia_app_broadcast_tx_response_as_scoped("broadcast tx 1") + .await; + + let tx_unknown_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 1", 53, "UNKNOWN", 3) + .await; + + sequencer_relayer + .timeout_ms( + 8_000, + "waiting for first broadcast tx guard and tx status evicted guard", + join( + broadcast_tx_guard_1.wait_until_satisfied(), + tx_unknown_guard.wait_until_satisfied(), + ), + ) + .await; + + // Relayer should retry submission after receiving an UNKNOWN status more than 10s after + // beginning to poll. + + let broadcast_tx_guard_2 = sequencer_relayer + .mount_celestia_app_broadcast_tx_response_as_scoped("broadcast tx 2") + .await; + let tx_confirmed_guard = sequencer_relayer + .mount_celestia_app_tx_status_response_as_scoped("tx status 2", 53, "COMMITTED", 1) + .await; + sequencer_relayer + .timeout_ms( + 6_000, + "waiting for second broadcast tx guard and tx status confirmed guard", + join( + tx_confirmed_guard.wait_until_satisfied(), + broadcast_tx_guard_2.wait_until_satisfied(), + ), + ) + .await; + + // Assert the relayer reports the correct Celestia and sequencer heights. + sequencer_relayer + .wait_for_latest_confirmed_celestia_height(53, 1_000) + .await; + sequencer_relayer + .wait_for_latest_fetched_sequencer_height(1, 1_000) + .await; + sequencer_relayer + .wait_for_latest_observed_sequencer_height(1, 1_000) + .await; + + assert_eq!( + sequencer_relayer.celestia_app_received_blob_count(), + 4, + "expected 4 blobs in total, 2 header blobs and 2 rollup blobs" + ); + assert!(sequencer_relayer + .metrics_handle + .render() + .contains("astria_sequencer_relayer_celestia_unknown_status_transaction_count 1")); +} diff --git a/proto/vendored/celestia_app/celestia/core/tx/tx.proto b/proto/vendored/celestia_app/celestia/core/tx/tx.proto new file mode 100644 index 0000000000..06c819929f --- /dev/null +++ b/proto/vendored/celestia_app/celestia/core/tx/tx.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; +package celestia.core.v1.tx; + +// This file contains types which are copied from +// https://github.com/celestiaorg/celestia-app/blob/8cbda26c536f0ecd5028fb6e050d18badb774e01/proto/celestia/core/v1/tx/tx.proto +// (v3.3.0 tag). + +// Service defines a gRPC service for interacting with transactions. +service Tx { + // TxStatus returns the status of a transaction. There are four possible states: + // - Committed + // - Pending + // - Evicted + // - Unknown + rpc TxStatus(TxStatusRequest) returns (TxStatusResponse); +} + +// TxStatusRequest is the request type for the TxStatus gRPC method. +message TxStatusRequest { + // this is the hex encoded transaction hash (should be 64 bytes long) + string tx_id = 1; +} + +// TxStatusResponse is the response type for the TxStatus gRPC method. +message TxStatusResponse { + int64 height = 1; + uint32 index = 2; + // execution_code is returned when the transaction has been committed + // and returns whether it was successful or errored. A non zero + // execution code indicated an error. + uint32 execution_code = 3; + // error log for failed transactions. + string error = 4; + // status is the status of the transaction. + string status = 5; +}