598 changes: 301 additions & 297 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -48,8 +48,9 @@ default-features = false
[workspace.dependencies.snarkvm]
#path = "../snarkVM"
git = "https://github.com/ProvableHQ/snarkVM.git"
rev = "4e3855d69"
#rev = "4e3855d69"
#version = "=4.4.0"
branch = "feat/track-error"
default-features = false

[workspace.dependencies.anyhow]
1 change: 1 addition & 0 deletions cli/src/commands/start.rs
@@ -816,6 +816,7 @@ impl Start {

// Set up the tokio Runtime.
// TODO(kaimast): set up a panic handler here for each worker thread once [`tokio::runtime::Builder::unhandled_panic`](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.unhandled_panic) is stabilized.
// As of now, detached tasks may panic and the error may not be handled by the top-level `catch_unwind`.
runtime::Builder::new_multi_thread()
.enable_all()
.thread_stack_size(8 * 1024 * 1024)
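For context on the TODO above: `unhandled_panic` is still gated behind `tokio_unstable` and is currently only honored by the current-thread runtime, which is presumably part of why wiring it into the multi-thread builder here is deferred. A minimal sketch of the eventual shape, for illustration only (not part of this diff):

```rust
// Illustration only: requires building with RUSTFLAGS="--cfg tokio_unstable",
// and `unhandled_panic` is currently only honored by the current-thread runtime.
use tokio::runtime::{Builder, UnhandledPanic};

fn build_runtime_sketch() -> std::io::Result<tokio::runtime::Runtime> {
    Builder::new_current_thread()
        .enable_all()
        // Shut the runtime down when a detached task panics, so the failure
        // is not silently swallowed outside the top-level `catch_unwind`.
        .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        .build()
}
```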
14 changes: 12 additions & 2 deletions display/src/lib.rs
@@ -43,6 +43,7 @@ use ratatui::{
};
use std::{
io,
io::Write,
sync::Arc,
thread,
time::{Duration, Instant},
@@ -94,11 +94,20 @@ impl<N: Network> Display<N> {
execute!(terminal.backend_mut(), LeaveAlternateScreen, DisableMouseCapture)?;
terminal.show_cursor()?;

// Exit.
// Print any error that may have occurred.
if let Err(err) = res {
println!("{err:?}")
eprintln!("{err:?}");
}

// Write any remaining log output to stdout while the node is shutting down.
let mut log_receiver = display.logs.into_log_receiver();
tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(log) = log_receiver.recv().await {
let _ = write!(stdout, "{}", String::from_utf8(log).unwrap_or_default());
}
});

Ok(())
}
}
4 changes: 4 additions & 0 deletions display/src/pages/logs.rs
@@ -72,4 +72,8 @@ impl Logs {
.block(Block::default().borders(Borders::ALL).style(header_style()).title("Logs"));
f.render_widget(combined_logs, chunks[0]);
}

pub fn into_log_receiver(self) -> mpsc::Receiver<Vec<u8>> {
self.log_receiver
}
}
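Note on `into_log_receiver`: taking `self` by value moves the buffered-log receiver out of the `Logs` page, which is what lets the shutdown path in `display/src/lib.rs` above keep draining queued log lines after the TUI is torn down. A standalone sketch of the same drain pattern (assuming a `tokio::sync::mpsc::Receiver<Vec<u8>>` log channel, matching the return type here):

```rust
use std::io::{self, Write};
use tokio::sync::mpsc;

// Drain any remaining log entries (raw bytes) from the channel to stdout.
async fn drain_logs(mut log_receiver: mpsc::Receiver<Vec<u8>>) {
    let mut stdout = io::stdout();
    while let Some(log) = log_receiver.recv().await {
        // Entries that are not valid UTF-8 are skipped rather than aborting the drain.
        let _ = write!(stdout, "{}", String::from_utf8(log).unwrap_or_default());
    }
}
```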
2 changes: 1 addition & 1 deletion node/bft/Cargo.toml
@@ -129,7 +129,7 @@ workspace = true

[dependencies.snarkvm]
workspace = true
features = [ "utilities" ]
features = [ "utilities", "async" ]

[dependencies.time]
workspace = true
88 changes: 48 additions & 40 deletions node/bft/src/primary.rs
@@ -57,7 +57,10 @@ use snarkvm::{
puzzle::{Solution, SolutionID},
},
prelude::{ConsensusVersion, committee::Committee},
utilities::flatten_error,
utilities::{
flatten_error,
task::{self, JoinHandle},
},
};

use aleo_std::StorageMode;
@@ -83,7 +86,7 @@ use std::{
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};
use tokio::sync::OnceCell;

/// A helper type for an optional proposed batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
@@ -424,7 +427,7 @@ impl<N: Network> Primary<N> {
// Resend the batch proposal to the validator for signing.
Some(peer_ip) => {
let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
tokio::spawn(async move {
task::spawn(async move {
debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
// Resend the batch proposal to the peer.
if gateway.send(peer_ip, event_).await.is_none() {
@@ -594,14 +597,13 @@ impl<N: Network> Primary<N> {
}

// Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
let transaction = spawn_blocking!({
match transaction {
Data::Object(transaction) => Ok(transaction),
Data::Buffer(bytes) => {
Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
}
let transaction = task::spawn_blocking(|| match transaction {
Data::Object(transaction) => Ok(transaction),
Data::Buffer(bytes) => {
Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))
}
})?;
})
.await?;

// TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
// ConsensusVersion V8 Migration logic -
@@ -696,15 +698,18 @@ impl<N: Network> Primary<N> {
// Prepare the previous batch certificate IDs.
let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
// Sign the batch header and construct the proposal.
let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
&private_key,
round,
current_timestamp,
committee_id,
transmission_ids,
previous_certificate_ids,
&mut rand::thread_rng()
))
let (batch_header, proposal) = task::spawn_blocking(move || {
BatchHeader::new(
&private_key,
round,
current_timestamp,
committee_id,
transmission_ids,
previous_certificate_ids,
&mut rand::thread_rng(),
)
})
.await
.and_then(|batch_header| {
Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
.map(|proposal| (batch_header, proposal))
@@ -737,7 +742,7 @@ impl<N: Network> Primary<N> {
let BatchPropose { round: batch_round, batch_header } = batch_propose;

// Deserialize the batch header.
let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
let batch_header = task::spawn_blocking(|| batch_header.deserialize_blocking()).await?;
// Ensure the round matches in the batch header.
if batch_round != batch_header.round() {
// Proceed to disconnect the validator.
@@ -803,7 +808,7 @@ impl<N: Network> Primary<N> {
// Instead, rebroadcast the cached signature to the peer.
if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
let gateway = self.gateway.clone();
tokio::spawn(async move {
task::spawn(async move {
debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
// Resend the batch signature to the peer.
@@ -870,10 +875,10 @@ impl<N: Network> Primary<N> {

// Ensure the batch header from the peer is valid.
let (storage, header) = (self.storage.clone(), batch_header.clone());

// Check the batch header, and return early if it already exists in storage.
let Some(missing_transmissions) =
spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
let Some(missing_transmissions) = task::spawn_blocking(move || {
storage.check_batch_header(&header, missing_transmissions, Default::default())
})
.await?
else {
return Ok(());
};
@@ -902,14 +907,13 @@ impl<N: Network> Primary<N> {
(transmission_id, transmission)
{
// Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
let transaction = spawn_blocking!({
match transaction {
Data::Object(transaction) => Ok(transaction),
Data::Buffer(bytes) => {
Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
}
let transaction = task::spawn_blocking(|| match transaction {
Data::Object(transaction) => Ok(transaction),
Data::Buffer(bytes) => {
Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))
}
})?;
})
.await?;

// TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
// ConsensusVersion V8 Migration logic -
@@ -967,7 +971,7 @@ impl<N: Network> Primary<N> {
let batch_id = batch_header.batch_id();
// Sign the batch ID.
let account = self.gateway.account().clone();
let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
let signature = task::spawn_blocking(move || account.sign(&[batch_id], &mut rand::thread_rng())).await?;

// Ensure the proposal has not already been signed.
//
@@ -995,7 +999,7 @@ impl<N: Network> Primary<N> {

// Broadcast the signature back to the validator.
let self_ = self.clone();
tokio::spawn(async move {
task::spawn(async move {
let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
// Send the batch signature to the peer.
if self_.gateway.send(peer_ip, event).await.is_some() {
@@ -1040,7 +1044,7 @@ impl<N: Network> Primary<N> {
}

let self_ = self.clone();
let Some(proposal) = spawn_blocking!({
let Some(proposal) = task::spawn_blocking(move || {
// Acquire the write lock.
let mut proposed_batch = self_.proposed_batch.write();
// Add the signature to the batch, and determine if the batch is ready to be certified.
@@ -1097,7 +1101,7 @@ impl<N: Network> Primary<N> {
Some(proposal) => Ok(Some(proposal)),
None => Ok(None),
}
})?
}).await?
else {
return Ok(());
};
@@ -1243,7 +1247,7 @@ impl<N: Network> Primary<N> {

// Retrieve the block locators.
let self__ = self_.clone();
let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
let block_locators = match task::spawn_blocking(move || self__.sync.get_block_locators()).await {
Ok(block_locators) => block_locators,
Err(e) => {
warn!("Failed to retrieve block locators - {e}");
@@ -1672,7 +1676,8 @@ impl<N: Network> Primary<N> {
let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
// Store the certified batch.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
task::spawn_blocking(move || storage.insert_certificate(certificate_, transmissions, Default::default()))
.await?;
debug!("Stored a batch certificate for round {}", certificate.round());
// If a BFT sender was provided, send the certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
@@ -1760,7 +1765,10 @@ impl<N: Network> Primary<N> {
if !self.storage.contains_certificate(certificate.id()) {
// Store the batch certificate.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
task::spawn_blocking(move || {
storage.insert_certificate(certificate_, missing_transmissions, Default::default())
})
.await?;
debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
// If a BFT sender was provided, send the round and certificate to the BFT.
if let Some(bft_sender) = self.bft_sender.get() {
@@ -1972,7 +1980,7 @@ impl<N: Network> Primary<N> {
impl<N: Network> Primary<N> {
/// Spawns a task with the given future; it should only be used for long-running tasks.
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
self.handles.lock().push(task::spawn(future));
}

/// Shuts down the primary.
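Throughout this file, the `spawn_blocking!` macro is replaced with `task::spawn_blocking(..)` from `snarkvm::utilities::task`, awaited and collapsed with a single `?`. The wrapper's exact signature lives on the `feat/track-error` branch and is not shown in this diff; the sketch below is a hypothetical illustration of the nested-`Result` flattening these call sites appear to rely on (with `flatten_error` assumed to merge the join error and the closure's own error into one layer):

```rust
// Hypothetical sketch, not the snarkVM API: names here are illustrative only.
use anyhow::{Result, anyhow};

/// Collapse `Result<Result<T>, JoinError>` (a join outcome wrapping a fallible
/// closure result) into a single `Result<T>`.
fn flatten_error_sketch<T>(
    joined: std::result::Result<Result<T>, tokio::task::JoinError>,
) -> Result<T> {
    match joined {
        Ok(inner) => inner,                                        // the closure's own Ok/Err
        Err(join_err) => Err(anyhow!("task failed: {join_err}")),  // panic or cancellation
    }
}

async fn example() -> Result<u64> {
    // Run CPU-heavy work off the async worker threads, then flatten both error layers.
    let joined = tokio::task::spawn_blocking(|| Ok::<u64, anyhow::Error>(1 + 1)).await;
    flatten_error_sketch(joined)
}
```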
15 changes: 9 additions & 6 deletions node/bft/src/sync/mod.rs
@@ -32,7 +32,13 @@ use snarkvm::{
types::Field,
},
ledger::{PendingBlock, authority::Authority, block::Block, narwhal::BatchCertificate},
utilities::{cfg_into_iter, cfg_iter, ensure_equals, flatten_error},
utilities::{
cfg_into_iter,
cfg_iter,
ensure_equals,
flatten_error,
task::{self, JoinHandle},
},
};

use anyhow::{Context, Result, anyhow, bail, ensure};
@@ -52,10 +58,7 @@ use std::{
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{
sync::{OnceCell, oneshot},
task::JoinHandle,
};
use tokio::sync::{OnceCell, oneshot};

/// Block synchronization logic for validators.
///
@@ -1042,7 +1045,7 @@ impl<N: Network> Sync<N> {
impl<N: Network> Sync<N> {
/// Spawns a task with the given future; it should only be used for long-running tasks.
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
self.handles.lock().push(task::spawn(future));
}

/// Shuts down the primary.
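Both here and in `primary.rs`, long-lived tasks now go through `task::spawn` and the `JoinHandle` type from `snarkvm::utilities::task` rather than `tokio`'s directly. What that wrapper does is defined on the `feat/track-error` branch and is not visible in this diff; purely as a hypothetical illustration of a spawn wrapper that surfaces panics from detached tasks (in the spirit of the TODO in `cli/src/commands/start.rs`):

```rust
// Hypothetical illustration only — not the snarkvm::utilities::task API.
use std::future::Future;
use tokio::task::JoinHandle;

/// Spawn a detached task and log if it ends in a panic, so the failure is at
/// least visible instead of being dropped with the handle.
fn spawn_tracked<F>(future: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    tokio::spawn(async move {
        // Run the future as its own task so its join result can be inspected.
        let inner = tokio::spawn(future);
        if let Err(err) = inner.await {
            if err.is_panic() {
                tracing::error!("detached task panicked: {err}");
            }
        }
    })
}
```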