diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ec791537854..cf5aa3785bc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -126,7 +126,7 @@ use std::time::Duration; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; use store::{ BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary, - KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, + KeyValueStore, KeyValueStoreOp, PayloadStatusFilter, StoreItem, StoreOp, }; use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor}; use tokio_stream::Stream; @@ -2019,7 +2019,14 @@ impl BeaconChain { } else { let (advanced_state_root, mut state) = self .store - .get_advanced_hot_state(beacon_block_root, request_slot, beacon_state_root)? + .get_advanced_hot_state( + beacon_block_root, + request_slot, + beacon_state_root, + // TODO(gloas): The payload status does not change the shuffling nor the + // justification checkpoint + PayloadStatusFilter::Any, + )? .ok_or(Error::MissingBeaconState(beacon_state_root))?; if state.current_epoch() < request_epoch { partial_state_advance( @@ -4613,12 +4620,17 @@ impl BeaconChain { // Atomically read some values from the head whilst avoiding holding cached head `Arc` any // longer than necessary. - let (head_slot, head_block_root, head_state_root) = { + let (head_slot, head_block_root, head_state_root, head_payload_status) = { let head = self.canonical_head.cached_head(); ( head.head_slot(), head.head_block_root(), head.head_state_root(), + if head.snapshot.beacon_state.is_parent_block_full() { + PayloadStatusFilter::Full + } else { + PayloadStatusFilter::Empty + }, ) }; let (state, state_root_opt) = if head_slot < slot { @@ -4637,7 +4649,12 @@ impl BeaconChain { // state cache thanks to the state advance timer. let (state_root, state) = self .store - .get_advanced_hot_state(head_block_root, slot, head_state_root) + .get_advanced_hot_state( + head_block_root, + slot, + head_state_root, + head_payload_status, + ) .map_err(BlockProductionError::FailedToLoadState)? .ok_or(BlockProductionError::UnableToProduceAtSlot(slot))?; (state, Some(state_root)) @@ -4741,7 +4758,12 @@ impl BeaconChain { let (state_root, state) = self .store - .get_advanced_hot_state_from_cache(re_org_parent_block, slot) + .get_advanced_hot_state_from_cache( + re_org_parent_block, + slot, + // TODO(gloas): Use the correct payload status for the re-org parent. + PayloadStatusFilter::Any, + ) .or_else(|| { warn!(reason = "no state in cache", "Not attempting re-org"); None @@ -4878,7 +4900,13 @@ impl BeaconChain { .ok_or(Error::MissingBeaconBlock(parent_block_root))?; let (state_root, state) = self .store - .get_advanced_hot_state(parent_block_root, proposal_slot, block.state_root())? + .get_advanced_hot_state( + parent_block_root, + proposal_slot, + block.state_root(), + // TODO(gloas): The post-state of the block and payload have the same proposers? + PayloadStatusFilter::Any, + )? .ok_or(Error::MissingBeaconState(block.state_root()))?; (Cow::Owned(state), state_root) }; @@ -6907,7 +6935,13 @@ impl BeaconChain { } else { let (state_root, state) = self .store - .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? + .get_advanced_hot_state( + head_block_root, + target_slot, + head_block.state_root, + // TODO(gloas): The post-state of the block and payload have the same shuffling? + PayloadStatusFilter::Any, + )? .ok_or(Error::MissingBeaconState(head_block.state_root))?; (state, state_root) }; diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index fe111628dbf..36dcdd75fb6 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -15,6 +15,7 @@ use crate::{BeaconChainError, metrics}; use kzg::{Error as KzgError, Kzg, KzgCommitment}; use ssz_derive::{Decode, Encode}; use std::time::Duration; +use store::PayloadStatusFilter; use tracing::{debug, instrument}; use tree_hash::TreeHash; use types::data::BlobIdentifier; @@ -510,7 +511,13 @@ pub fn validate_blob_sidecar_for_gossip>( // prior to the finalized slot (which is invalid and inaccessible in our DB schema). let (parent_state_root, state) = chain .store - .get_advanced_hot_state(root, block.slot(), parent_block.state_root())? + .get_advanced_hot_state( + root, + block.slot(), + parent_block.state_root(), + PayloadStatusFilter::Any, + )? .ok_or_else(|| { BeaconChainError::DBInconsistent( format!("Missing state for parent block {root:?}",), diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index e5b656adf8d..47624097899 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -39,7 +39,7 @@ use state_processing::{AllCaches, per_slot_processing}; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; +use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp, PayloadStatusFilter}; use task_executor::{ShutdownReason, TaskExecutor}; use tracing::{debug, error, info}; use tree_hash::TreeHash; @@ -805,7 +805,12 @@ where }; let (_head_state_root, head_state) = store - .get_advanced_hot_state(head_block_root, current_slot, head_block.state_root()) + .get_advanced_hot_state( + head_block_root, + current_slot, + head_block.state_root(), + PayloadStatusFilter::Any, + ) .map_err(|e| descriptive_db_error("head state", &e))? .ok_or("Head state not found in store")?; diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index db071db166d..3efbd381eff 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -55,7 +55,8 @@ use state_processing::AllCaches; use std::sync::Arc; use std::time::Duration; use store::{ - Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator, + Error as StoreError, KeyValueStore, KeyValueStoreOp, PayloadStatusFilter, StoreConfig, + iter::StateRootsIterator, }; use task_executor::{JoinHandle, ShutdownReason}; use tracing::info_span; @@ -306,7 +307,12 @@ impl CanonicalHead { .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; let current_slot = fork_choice.fc_store().get_current_slot(); let (_, beacon_state) = store - .get_advanced_hot_state(beacon_block_root, current_slot, beacon_block.state_root())? + .get_advanced_hot_state( + beacon_block_root, + current_slot, + beacon_block.state_root(), + PayloadStatusFilter::Any, + )? .ok_or(Error::MissingBeaconState(beacon_block.state_root()))?; let snapshot = BeaconSnapshot { @@ -679,6 +685,7 @@ impl BeaconChain { new_view.head_block_root, current_slot, beacon_block.state_root(), + PayloadStatusFilter::Any, )? .ok_or(Error::MissingBeaconState(beacon_block.state_root()))?; diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index cf3385ec5b0..2c3c5860f47 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -16,6 +16,7 @@ use ssz_types::VariableList; use std::iter; use std::marker::PhantomData; use std::sync::Arc; +use store::PayloadStatusFilter; use tracing::{debug, instrument}; use types::data::ColumnIndex; use types::{ @@ -709,7 +710,13 @@ fn verify_proposer_and_signature( ); chain .store - .get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root) + .get_advanced_hot_state( + block_parent_root, + column_slot, + parent_block.state_root, + // TODO(gloas): The post-state of the block and payload have the same proposers? + PayloadStatusFilter::Any, + ) .map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))? .ok_or_else(|| { GossipDataColumnError::BeaconChainError(Box::new( diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v24.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v24.rs index 1e1823a8364..af6085c405f 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v24.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v24.rs @@ -329,7 +329,7 @@ pub fn upgrade_to_v24( ); } else { // 1. Store snapshot or diff at this slot (if required). - let storage_strategy = db.hot_storage_strategy(slot)?; + let storage_strategy = db.hot_storage_strategy(slot, Slot::new(0), false)?; debug!( %slot, ?state_root, @@ -394,6 +394,8 @@ pub fn upgrade_to_v24( slot, latest_block_root: old_summary.latest_block_root, latest_block_slot: old_summary.latest_block_slot, + // TODO(gloas): this migration is using the previous type + latest_block_is_full: false, previous_state_root, diff_base_state, }; diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cb916cb5142..13deadfa362 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -23,6 +23,7 @@ use std::sync::{ Arc, atomic::{AtomicBool, Ordering}, }; +use store::PayloadStatusFilter; use task_executor::TaskExecutor; use tokio::time::{Instant, sleep, sleep_until}; use tracing::{Instrument, debug, debug_span, error, instrument, warn}; @@ -272,14 +273,27 @@ fn advance_head(beacon_chain: &Arc>) -> Resu } } - let (head_block_root, head_block_state_root) = { + let (head_block_root, head_block_state_root, head_payload_status) = { let snapshot = beacon_chain.head_snapshot(); - (snapshot.beacon_block_root, snapshot.beacon_state_root()) + ( + snapshot.beacon_block_root, + snapshot.beacon_state_root(), + if snapshot.beacon_state.is_parent_block_full() { + PayloadStatusFilter::Full + } else { + PayloadStatusFilter::Empty + }, + ) }; let (head_state_root, mut state) = beacon_chain .store - .get_advanced_hot_state(head_block_root, current_slot, head_block_state_root)? + .get_advanced_hot_state( + head_block_root, + current_slot, + head_block_state_root, + head_payload_status, + )? .ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?; let initial_slot = state.slot(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ea5f735bded..87dee446c4e 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3797,7 +3797,12 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { let (split_state_root, mut advanced_split_state) = harness .chain .store - .get_advanced_hot_state(split.block_root, split.slot, split.state_root) + .get_advanced_hot_state( + split.block_root, + split.slot, + split.state_root, + store::PayloadStatusFilter::Full, + ) .unwrap() .unwrap(); complete_state_advance( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6e165702a27..ab7db050be8 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -13,8 +13,8 @@ use crate::metadata::{ }; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ - BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, - StoreOp, get_data_column_key, + BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, + PayloadStatusFilter, StoreItem, StoreOp, get_data_column_key, metrics::{self, COLD_METRIC, HOT_METRIC}, parse_data_column_key, }; @@ -451,15 +451,50 @@ impl HotColdDB, BeaconNodeBackend> { } impl, Cold: ItemStore> HotColdDB { + /// Post Gloas cold storage stores a mix of post-block and post-payload states, whichever is + /// canonical, such that there's a single state per slot. fn cold_storage_strategy(&self, slot: Slot) -> Result { // The start slot for the freezer HDiff is always 0 Ok(self.hierarchy.storage_strategy(slot, Slot::new(0))?) } - pub fn hot_storage_strategy(&self, slot: Slot) -> Result { - Ok(self - .hierarchy - .storage_strategy(slot, self.hot_hdiff_start_slot()?)?) + /// Post Gloas hot storage contains both post-block and post-payload states. + pub fn hot_storage_strategy( + &self, + slot: Slot, + latest_block_slot: Slot, + latest_block_is_full: bool, + ) -> Result { + if slot == latest_block_slot && latest_block_is_full { + // Replay from the post-block state + Ok(StorageStrategy::ReplayFrom(slot)) + } else { + Ok(self + .hierarchy + .storage_strategy(slot, self.hot_hdiff_start_slot()?)?) + } + } + + pub fn hot_storage_strategy_of_state( + &self, + state: &BeaconState, + ) -> Result { + self.hot_storage_strategy( + state.slot(), + state.latest_block_header().slot, + state.is_parent_block_full(), + ) + } + + pub fn hot_storage_strategy_of_summary( + &self, + summary: &HotStateSummary, + ) -> Result { + self.hot_storage_strategy( + summary.slot, + summary.latest_block_slot, + summary.latest_block_is_full, + ) } pub fn hot_hdiff_start_slot(&self) -> Result { @@ -1142,8 +1177,11 @@ impl, Cold: ItemStore> HotColdDB block_root: Hash256, max_slot: Slot, state_root: Hash256, + payload_status: PayloadStatusFilter, ) -> Result)>, Error> { - if let Some(cached) = self.get_advanced_hot_state_from_cache(block_root, max_slot) { + if let Some(cached) = + self.get_advanced_hot_state_from_cache(block_root, max_slot, payload_status) + { return Ok(Some(cached)); } @@ -1213,10 +1251,11 @@ impl, Cold: ItemStore> HotColdDB &self, block_root: Hash256, max_slot: Slot, + payload_status: PayloadStatusFilter, ) -> Option<(Hash256, BeaconState)> { self.state_cache .lock() - .get_by_block_root(block_root, max_slot) + .get_by_block_root(block_root, max_slot, payload_status) } /// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk. @@ -1387,8 +1426,10 @@ impl, Cold: ItemStore> HotColdDB // NOTE: `hot_storage_strategy` can error if there are states in the database // prior to the `anchor_slot`. This can happen if checkpoint sync has been // botched and left some states in the database prior to completing. + // NOTE: Use placeholder values for `latest_block_slot = 0` and + // `latest_block_is_full = false` to attempt to delete the diffs if they exist if let Some(slot) = slot - && let Ok(strategy) = self.hot_storage_strategy(slot) + && let Ok(strategy) = self.hot_storage_strategy(slot, Slot::new(0), false) { match strategy { StorageStrategy::Snapshot => { @@ -1608,7 +1649,7 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeletePayloadEnvelope(_) => (), - StoreOp::DeleteState(_, _) => (), + StoreOp::DeleteState { .. } => (), StoreOp::DeleteBlobs(_) => (), @@ -1695,7 +1736,7 @@ impl, Cold: ItemStore> HotColdDB debug!( ?state_root, slot = %state.slot(), - storage_strategy = ?self.hot_storage_strategy(state.slot())?, + storage_strategy = ?self.hot_storage_strategy_of_state(state)?, diff_base_state = %summary.diff_base_state, previous_state_root = ?summary.previous_state_root, "Storing hot state summary and diffs" @@ -1718,7 +1759,7 @@ impl, Cold: ItemStore> HotColdDB self, *state_root, state, - self.hot_storage_strategy(state.slot())?, + self.hot_storage_strategy_of_state(state)?, )?; ops.push(hot_state_summary.as_kv_store_op(*state_root)); Ok(hot_state_summary) @@ -1731,7 +1772,7 @@ impl, Cold: ItemStore> HotColdDB ops: &mut Vec, ) -> Result<(), Error> { let slot = state.slot(); - let storage_strategy = self.hot_storage_strategy(slot)?; + let storage_strategy = self.hot_storage_strategy_of_state(state)?; match storage_strategy { StorageStrategy::ReplayFrom(_) => { // Already have persisted the state summary, don't persist anything else @@ -1853,17 +1894,18 @@ impl, Cold: ItemStore> HotColdDB return Ok(buffer); } - let Some(HotStateSummary { - slot, - diff_base_state, - .. - }) = self.load_hot_state_summary(&state_root)? - else { + let Some(summary) = self.load_hot_state_summary(&state_root)? else { return Err(Error::MissingHotStateSummary(state_root)); }; - let buffer = match self.hot_storage_strategy(slot)? { + let buffer = match self.hot_storage_strategy_of_summary(&summary)? { + // TODO(gloas): The slot is not enough to know if this state root if for the snapshot. + // Consider two API calls: + // 1. requests the post-block state of block at slot 2^21 = Load snapshot + // 2. requests the post-payload state of block at slot 2^21 = Load snapshot of + // post-block state at slot 2^21 + replay the payload StorageStrategy::Snapshot => { + // Load existing snapshots to help debug the status of the DB. There's 1 let Some(state) = self.load_hot_state_as_snapshot(state_root)? else { let existing_snapshots = self.load_hot_state_snapshot_roots()?; debug!( @@ -1871,12 +1913,12 @@ impl, Cold: ItemStore> HotColdDB existing_snapshots = ?existing_snapshots, "Missing hot state snapshot" ); - return Err(Error::MissingHotStateSnapshot(state_root, slot)); + return Err(Error::MissingHotStateSnapshot(state_root, summary.slot)); }; HDiffBuffer::from_state(state) } StorageStrategy::DiffFrom(from_slot) => { - let from_state_root = diff_base_state.get_root(from_slot)?; + let from_state_root = summary.diff_base_state.get_root(from_slot)?; let mut buffer = self.load_hot_hdiff_buffer(from_state_root).map_err(|e| { Error::LoadingHotHdiffBufferError( format!("load hdiff DiffFrom {from_slot} {state_root}"), @@ -1893,7 +1935,7 @@ impl, Cold: ItemStore> HotColdDB buffer } StorageStrategy::ReplayFrom(from_slot) => { - let from_state_root = diff_base_state.get_root(from_slot)?; + let from_state_root = summary.diff_base_state.get_root(from_slot)?; self.load_hot_hdiff_buffer(from_state_root).map_err(|e| { Error::LoadingHotHdiffBufferError( format!("load hdiff ReplayFrom {from_slot} {state_root}"), @@ -1907,7 +1949,7 @@ impl, Cold: ItemStore> HotColdDB // Add buffer to cache for future calls. self.state_cache .lock() - .put_hdiff_buffer(state_root, slot, &buffer); + .put_hdiff_buffer(state_root, summary.slot, &buffer); Ok(buffer) } @@ -1939,14 +1981,8 @@ impl, Cold: ItemStore> HotColdDB ) -> Result, Hash256)>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); - if let Some(HotStateSummary { - slot, - latest_block_root, - diff_base_state, - .. - }) = self.load_hot_state_summary(state_root)? - { - let mut state = match self.hot_storage_strategy(slot)? { + if let Some(summary) = self.load_hot_state_summary(state_root)? { + let mut state = match self.hot_storage_strategy_of_summary(&summary)? { strat @ StorageStrategy::Snapshot | strat @ StorageStrategy::DiffFrom(_) => { let buffer_timer = metrics::start_timer_vec( &metrics::BEACON_HDIFF_BUFFER_LOAD_TIME, @@ -1954,7 +1990,7 @@ impl, Cold: ItemStore> HotColdDB ); let buffer = self.load_hot_hdiff_buffer(*state_root).map_err(|e| { Error::LoadingHotHdiffBufferError( - format!("load state {strat:?} {slot}"), + format!("load state {strat:?} {}", summary.slot), *state_root, e.into(), ) @@ -1971,7 +2007,7 @@ impl, Cold: ItemStore> HotColdDB state } StorageStrategy::ReplayFrom(from_slot) => { - let from_state_root = diff_base_state.get_root(from_slot)?; + let from_state_root = summary.diff_base_state.get_root(from_slot)?; let (mut base_state, _) = self .load_hot_state(&from_state_root, update_cache) @@ -1984,7 +2020,7 @@ impl, Cold: ItemStore> HotColdDB })? .ok_or(HotColdDBError::MissingHotState { state_root: from_state_root, - requested_by_state_summary: (*state_root, slot), + requested_by_state_summary: (*state_root, summary.slot), })?; // Immediately rebase the state from disk on the finalized state so that we can @@ -1995,15 +2031,15 @@ impl, Cold: ItemStore> HotColdDB self.load_hot_state_using_replay( base_state, - slot, - latest_block_root, + summary.slot, + summary.latest_block_root, update_cache, )? } }; state.apply_pending_mutations()?; - Ok(Some((state, latest_block_root))) + Ok(Some((state, summary.latest_block_root))) } else { Ok(None) } @@ -2300,7 +2336,8 @@ impl, Cold: ItemStore> HotColdDB } metrics::inc_counter(&metrics::STORE_BEACON_HISTORIC_STATE_CACHE_MISS); - return self.load_cold_state_by_slot_using_replay(cached_state, slot); + // TODO + return self.load_cold_state_by_slot_using_replay(cached_state, slot, true); } metrics::inc_counter(&metrics::STORE_BEACON_HISTORIC_STATE_CACHE_MISS); @@ -2325,7 +2362,8 @@ impl, Cold: ItemStore> HotColdDB // No prior state found in cache (above), need to load by diffing and then // replaying. let base_state = self.load_cold_state_by_slot(from)?; - self.load_cold_state_by_slot_using_replay(base_state, slot) + // TODO + self.load_cold_state_by_slot_using_replay(base_state, slot, true) } } } @@ -2334,6 +2372,7 @@ impl, Cold: ItemStore> HotColdDB &self, mut base_state: BeaconState, slot: Slot, + _end_slot_payload_present: bool, ) -> Result, Error> { if !base_state.all_caches_built() { // Build all caches and update the historic state cache so that these caches may be used @@ -2357,6 +2396,7 @@ impl, Cold: ItemStore> HotColdDB } let blocks = self.load_cold_blocks(base_state.slot() + 1, slot)?; + // TODO(gloas): Need to load blocks and payloads, using end_slot_payload_present // Include state root for base state as it is required by block processing to not // have to hash the state. @@ -2480,6 +2520,15 @@ impl, Cold: ItemStore> HotColdDB })? } + pub fn load_cold_payloads( + &self, + _start_slot: Slot, + _end_slot: Slot, + _end_slot_payload_present: bool, + ) -> Result<(), Error> { + Ok(()) + } + /// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`. /// /// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot @@ -3611,6 +3660,8 @@ pub fn migrate_database, Cold: ItemStore>( let mut cold_db_block_ops = vec![]; // Iterate in descending order until the current split slot + // Post Gloas we allow the freezer DB to contain both post-block and post-payload states. The + // state_roots vector defines which are canonical. let state_roots: Vec<_> = process_results(RootsIterator::new(&store, finalized_state), |iter| { iter.take_while(|(_, _, slot)| *slot >= current_split.slot) @@ -3915,6 +3966,7 @@ pub struct HotStateSummary { pub slot: Slot, pub latest_block_root: Hash256, pub latest_block_slot: Slot, + pub latest_block_is_full: bool, pub diff_base_state: OptionalDiffBaseState, pub previous_state_root: Hash256, } @@ -4036,6 +4088,7 @@ impl HotStateSummary { slot: state.slot(), latest_block_root, latest_block_slot: state.latest_block_header().slot, + latest_block_is_full: state.is_parent_block_full(), diff_base_state, previous_state_root, }) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ee9cfce0ecc..cb207f44e66 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -34,6 +34,7 @@ pub use crate::metadata::BlobInfo; pub use errors::Error; pub use metadata::AnchorInfo; pub use metrics::scrape_for_metrics; +pub use state_cache::PayloadStatusFilter; use std::collections::HashSet; use std::sync::Arc; use strum::{EnumIter, EnumString, IntoStaticStr}; diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 4b0d1ee0160..ae280a8aa30 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -17,13 +17,33 @@ const CULL_EXEMPT_DENOMINATOR: usize = 10; /// be culled from the cache. const EPOCH_FINALIZATION_LIMIT: u64 = 4; +/// Whether the execution payload for a block has been included (full) or not (empty). +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +enum PayloadStatus { + Empty, + Full, +} + +/// Filter for querying states by payload status. +#[derive(Debug, Clone, Copy)] +pub enum PayloadStatusFilter { + /// Only return the state where the execution payload was included. + Full, + /// Only return the state where the execution payload was not included. + Empty, + /// Return any state, preferring full to empty. + Any, +} + #[derive(Debug)] pub struct FinalizedState { state_root: Hash256, state: BeaconState, } -/// Map from block_root -> slot -> state_root. +/// Map from block_root -> slot -> payload_status -> state_root. +/// TODO(gloas): Before gloas there's a state for each (block_root, slot). After Gloas there's a +/// state for each (block_root, payload_status, slot), in that order. #[derive(Debug, Default)] pub struct BlockMap { blocks: HashMap, @@ -32,7 +52,7 @@ pub struct BlockMap { /// Map from slot -> state_root. #[derive(Debug, Default)] pub struct SlotMap { - slots: BTreeMap, + slots: BTreeMap>, } #[derive(Debug)] @@ -42,6 +62,8 @@ pub struct StateCache { // the state_root states: LruCache)>, block_map: BlockMap, + // This cache stores finalized HDiff buffers. The persisted states are always for the post-block + // empty state without applying the payload. So we don't index by PayloadStatus. hdiff_buffers: HotHDiffBufferCache, max_epoch: Epoch, head_block_root: Hash256, @@ -131,7 +153,12 @@ impl StateCache { } // Add to block map. - self.block_map.insert(block_root, state.slot(), state_root); + self.block_map.insert( + block_root, + state.slot(), + PayloadStatus::from_state(&state), + state_root, + ); // Prune block map. let state_roots_to_prune = self.block_map.prune(state.slot()); @@ -157,8 +184,11 @@ impl StateCache { // useful buffers. let slot = state.slot(); if pre_finalized_slots_to_retain.contains(&slot) { - let hdiff_buffer = HDiffBuffer::from_state(state); - self.hdiff_buffers.put(state_root, slot, hdiff_buffer); + // Only keep the HDiff buffers of post-block states before applying the payload + if !state.is_parent_block_full() { + let hdiff_buffer = HDiffBuffer::from_state(state); + self.hdiff_buffers.put(state_root, slot, hdiff_buffer); + } } } } @@ -213,9 +243,11 @@ impl StateCache { // caller's responsibility to not feed us garbage) as we don't want to thread the // hierarchy config through here. So any state received is converted to an // HDiffBuffer and saved. - let hdiff_buffer = HDiffBuffer::from_state(state.clone()); - self.hdiff_buffers - .put(state_root, state.slot(), hdiff_buffer); + if !state.is_parent_block_full() { + let hdiff_buffer = HDiffBuffer::from_state(state.clone()); + self.hdiff_buffers + .put(state_root, state.slot(), hdiff_buffer); + } return Ok(PutStateOutcome::PreFinalizedHDiffBuffer); } } @@ -253,8 +285,12 @@ impl StateCache { } // Record the connection from block root and slot to this state. - let slot = state.slot(); - self.block_map.insert(block_root, slot, state_root); + self.block_map.insert( + block_root, + state.slot(), + PayloadStatus::from_state(state), + state_root, + ); Ok(PutStateOutcome::New(deleted_states)) } @@ -304,6 +340,7 @@ impl StateCache { &mut self, block_root: Hash256, slot: Slot, + payload_status: PayloadStatusFilter, ) -> Option<(Hash256, BeaconState)> { let slot_map = self.block_map.blocks.get(&block_root)?; @@ -312,8 +349,21 @@ impl StateCache { .slots .iter() .rev() - .find_map(|(ancestor_slot, state_root)| { - (*ancestor_slot <= slot).then_some(*state_root) + .find_map(|(ancestor_slot, status_map)| { + if *ancestor_slot <= slot { + match payload_status { + PayloadStatusFilter::Full => status_map.get(&PayloadStatus::Full).copied(), + PayloadStatusFilter::Empty => { + status_map.get(&PayloadStatus::Empty).copied() + } + PayloadStatusFilter::Any => status_map + .get(&PayloadStatus::Full) + .or_else(|| status_map.get(&PayloadStatus::Empty)) + .copied(), + } + } else { + None + } })?; let state = self.get_by_state_root(state_root)?; @@ -327,8 +377,10 @@ impl StateCache { pub fn delete_block_states(&mut self, block_root: &Hash256) { if let Some(slot_map) = self.block_map.delete_block_states(block_root) { - for state_root in slot_map.slots.values() { - self.states.pop(state_root); + for status_map in slot_map.slots.values() { + for state_root in status_map.values() { + self.states.pop(state_root); + } } } } @@ -399,19 +451,30 @@ impl StateCache { } impl BlockMap { - fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) { - let slot_map = self.blocks.entry(block_root).or_default(); - slot_map.slots.insert(slot, state_root); + fn insert( + &mut self, + block_root: Hash256, + slot: Slot, + payload_status: PayloadStatus, + state_root: Hash256, + ) { + self.blocks + .entry(block_root) + .or_default() + .slots + .entry(slot) + .or_default() + .insert(payload_status, state_root); } fn prune(&mut self, finalized_slot: Slot) -> HashSet { let mut pruned_states = HashSet::new(); self.blocks.retain(|_, slot_map| { - slot_map.slots.retain(|slot, state_root| { + slot_map.slots.retain(|slot, status_map| { let keep = *slot >= finalized_slot; if !keep { - pruned_states.insert(*state_root); + pruned_states.extend(status_map.values()); } keep }); @@ -424,9 +487,10 @@ impl BlockMap { fn delete(&mut self, state_root_to_delete: &Hash256) { self.blocks.retain(|_, slot_map| { - slot_map - .slots - .retain(|_, state_root| state_root != state_root_to_delete); + slot_map.slots.retain(|_, status_map| { + status_map.retain(|_, state_root| state_root != state_root_to_delete); + !status_map.is_empty() + }); !slot_map.slots.is_empty() }); } @@ -512,3 +576,13 @@ impl HotHDiffBufferCache { .sum() } } + +impl PayloadStatus { + fn from_state(state: &BeaconState) -> Self { + if state.is_parent_block_full() { + Self::Full + } else { + Self::Empty + } + } +} diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 3edf1e0644d..621fbf47562 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -255,6 +255,13 @@ pub struct ProposerHeadInfo { pub current_slot: Slot, } +impl ProposerHeadInfo { + pub fn is_parent_block_full(&self) -> bool { + // TODO(gloas): Set to something once fork-choice is implemented + false + } +} + /// Error type to enable short-circuiting checks in `get_proposer_head`. /// /// This type intentionally does not implement `Debug` so that callers are forced to handle the