diff --git a/rs/consensus/dkg/src/lib.rs b/rs/consensus/dkg/src/lib.rs index 19958d91edbe..3a9821878279 100644 --- a/rs/consensus/dkg/src/lib.rs +++ b/rs/consensus/dkg/src/lib.rs @@ -9,14 +9,17 @@ use ic_interfaces::{ p2p::consensus::{Bouncer, BouncerFactory, BouncerValue, PoolMutationsProducer}, validation::ValidationResult, }; +use ic_interfaces_registry::RegistryClient; +use ic_interfaces_state_manager::StateReader; use ic_logger::{ReplicaLogger, error, info}; use ic_metrics::{ MetricsRegistry, buckets::{decimal_buckets, linear_buckets}, }; +use ic_replicated_state::ReplicatedState; use ic_types::{ - Height, NodeId, ReplicaVersion, - consensus::dkg::{DealingContent, DkgMessageId, InvalidDkgPayloadReason, Message}, + Height, NodeId, ReplicaVersion, SubnetId, + consensus::dkg::{DealingContent, DkgMessageId, DkgSummary, InvalidDkgPayloadReason, Message}, crypto::{ Signed, threshold_sig::ni_dkg::{NiDkgId, NiDkgTargetSubnet, config::NiDkgConfig}, @@ -25,7 +28,7 @@ use ic_types::{ use prometheus::Histogram; use rayon::prelude::*; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, sync::{Arc, Mutex}, }; @@ -33,6 +36,7 @@ pub mod dkg_key_manager; pub mod payload_builder; pub mod payload_validator; +use crate::payload_builder::build_target_id_config_map; pub use crate::utils::get_vetkey_public_keys; #[cfg(test)] @@ -65,6 +69,9 @@ struct Metrics { /// changes in the consensus and DKG pool. pub struct DkgImpl { node_id: NodeId, + subnet_id: SubnetId, + registry_client: Arc, + state_reader: Arc>, crypto: Arc, consensus_cache: Arc, dkg_key_manager: Arc>, @@ -76,6 +83,9 @@ impl DkgImpl { /// Build a new DKG component pub fn new( node_id: NodeId, + subnet_id: SubnetId, + registry_client: Arc, + state_reader: Arc>, crypto: Arc, consensus_cache: Arc, dkg_key_manager: Arc>, @@ -83,9 +93,12 @@ impl DkgImpl { logger: ReplicaLogger, ) -> Self { Self { + node_id, + subnet_id, + registry_client, + state_reader, crypto, consensus_cache, - node_id, dkg_key_manager, logger, metrics: Metrics { @@ -291,6 +304,50 @@ fn get_handle_invalid_change_action>(message: &Message, reason: T) ChangeAction::HandleInvalid(DkgMessageId::from(message), reason.as_ref().to_string()) } +pub(crate) fn get_configs_for_start_height( + subnet_id: SubnetId, + registry_client: &dyn RegistryClient, + state_manager: &dyn StateManager, + registry_version: RegistryVersion, + start_height: Height, + dkg_summary: &DkgSummary, +) -> BTreeMap { + let mut summary_configs = dkg_summary.configs.clone(); + let Some(state) = state_manager.get_latest_certified_state() else { + return summary_configs; + }; + let map = build_target_id_config_map( + subnet_id, + start_height, + registry_client, + state.get_ref(), + registry_version, + dkg_summary.next_transcripts(), + &BTreeSet::new(), + ); + for (_, config_results) in map { + let (mut configs, mut errs) = (vec![], vec![]); + for config_result in config_results { + match config_result { + Ok(config) => configs.push(config), + Err((dkg_id, err)) => errs.push((dkg_id, err)), + } + } + if !errs.is_empty() { + continue; + } + for config in &configs { + if summary_configs.contains_key(&config.dkg_id()) { + continue; + } + } + for config in configs { + summary_configs.insert(config.dkg_id().clone(), config); + } + } + summary_configs +} + impl PoolMutationsProducer for DkgImpl { type Mutations = Mutations; @@ -306,8 +363,15 @@ impl PoolMutationsProducer for DkgImpl { return ChangeAction::Purge(start_height).into(); } - let change_set: Mutations = dkg_summary - .configs + let configs = get_configs_for_start_height( + self.subnet_id, + self.registry_client.as_ref(), + self.state_reader.as_ref(), + self.registry_client.get_latest_version(), + start_height, + dkg_summary, + ); + let change_set: Mutations = configs .par_iter() .filter_map(|(_id, config)| self.create_dealing(dkg_pool, config)) .collect(); @@ -336,7 +400,7 @@ impl PoolMutationsProducer for DkgImpl { .map(|dealings| { self.validate_dealings_for_dealer( dkg_pool, - &dkg_summary.configs, + &configs, start_height, dealings.to_vec(), ) diff --git a/rs/consensus/dkg/src/payload_builder.rs b/rs/consensus/dkg/src/payload_builder.rs index 9d34269373b8..2a2ab147ab0f 100644 --- a/rs/consensus/dkg/src/payload_builder.rs +++ b/rs/consensus/dkg/src/payload_builder.rs @@ -1,6 +1,6 @@ use crate::{ MAX_EARLY_REMOTE_TRANSCRIPTS, MAX_REMOTE_DKG_ATTEMPTS, MAX_REMOTE_DKGS_PER_INTERVAL, - REMOTE_DKG_REPEATED_FAILURE_ERROR, + REMOTE_DKG_REPEATED_FAILURE_ERROR, get_configs_for_start_height, utils::{self, tags_iter, vetkd_key_ids_for_subnet}, }; use ic_consensus_utils::{crypto::ConsensusCrypto, pool_reader::PoolReader}; @@ -80,6 +80,8 @@ pub fn create_payload( // If the height is not a start height, create a payload with new dealings, // and possibly early remote transcripts. create_data_payload( + subnet_id, + registry_client, pool_reader, dkg_pool, parent, @@ -96,6 +98,8 @@ pub fn create_payload( } fn create_data_payload( + this_subnet_id: SubnetId, + registry_client: &dyn RegistryClient, pool_reader: &PoolReader<'_>, dkg_pool: Arc>, parent: &Block, @@ -109,19 +113,40 @@ fn create_data_payload( ) -> Result { // Get all existing dealer ids from the chain. let dealers_from_chain = utils::get_dealers_from_chain(pool_reader, parent); + let configs = get_configs_for_start_height( + this_subnet_id, + registry_client, + state_manager, + validation_context.registry_version, + last_summary_block.height, + last_dkg_summary, + ); // Select new dealings for the payload. let pool_lock = dkg_pool .read() .expect("Couldn't lock DKG pool for reading."); let new_validated_dealings = select_dealings_for_payload( - &last_dkg_summary.configs, + &configs, &dealers_from_chain, &*pool_lock, max_dealings_per_block, ); drop(pool_lock); + for d in &new_validated_dealings { + if matches!(d.content.dkg_id.target_subnet, NiDkgTargetSubnet::Remote(_)) { + info!( + logger, + "Including remote dealing for dkg id {:?} in data block payload at height {}", + d.content.dkg_id, + parent.height.increment(), + ); + } + } let remote_dkg_transcripts = create_early_remote_transcripts( + this_subnet_id, + last_summary_block.height, + registry_client, pool_reader, crypto, parent, @@ -207,6 +232,9 @@ fn select_dealings_for_payload( #[allow(clippy::type_complexity)] pub(crate) fn create_early_remote_transcripts( + this_subnet_id: SubnetId, + start_block_height: Height, + registry_client: &dyn RegistryClient, pool_reader: &PoolReader<'_>, crypto: &dyn ConsensusCrypto, parent: &Block, @@ -220,58 +248,60 @@ pub(crate) fn create_early_remote_transcripts( .get_state_at(validation_context.certified_height) .map_err(DkgPayloadCreationError::StateManagerError)?; - // Since this function is relatively expensive, we simply return if there are no outstanding DKG contexts - let callback_id_map = build_target_id_callback_map(state.get_ref()); - if callback_id_map.is_empty() { - return Ok(vec![]); - } - // Get all dealings for DKGs that have not been completed yet let (all_dealings, completed) = utils::get_dkg_dealings(pool_reader, parent); + let completed_target_ids = + get_completed_target_ids(last_dkg_summary.configs.keys(), &completed); - // Collect map of remote target_ids to DKG configs - let mut remote_configs: BTreeMap> = BTreeMap::new(); - for config in last_dkg_summary.configs.values() { - let dkg_id = config.dkg_id(); - if completed.contains(dkg_id) { - // Skip DKGs that have already been completed - continue; - } - if let NiDkgTargetSubnet::Remote(target_id) = dkg_id.target_subnet { - remote_configs.entry(target_id).or_default().push(config); - } + // Since this function is relatively expensive, we simply return if there are no outstanding DKG contexts + let callback_id_map = build_target_id_config_map( + this_subnet_id, + start_block_height, + registry_client, + state.get_ref(), + validation_context.registry_version, + last_dkg_summary.next_transcripts(), + &completed_target_ids, + ); + if callback_id_map.is_empty() { + return Ok(vec![]); } // Try to create transcripts for all configs of each target_id. Note that we either include // all transcript results for a target_id or none of them. let mut selected_transcripts = vec![]; - for (target_id, configs) in remote_configs { - // Lookup the callback id and the expected number of configs for this target_id - let Some((expected_config_num, callback_id)) = callback_id_map.get(&target_id) else { - warn!( - logger, - "Unable to find callback id associated with remote target id {target_id:?} at block height {}", - parent.height.increment() - ); + for (callback_id, config_results) in callback_id_map.into_iter() { + // Ensure that creating these transcripts would not exceed the maximum number of early + // remote transcripts. We continue with the next target_id in case it requires less + // transcripts. + if selected_transcripts.len() + config_results.len() > MAX_EARLY_REMOTE_TRANSCRIPTS { continue; - }; + } - // Check that we have the expected number of configs for this target_id - if configs.len() != *expected_config_num { - // This may happen if we did not manage to create all required transcripts as part of - // the last summary block. We will handle this in the next summary block instead. - continue; + let (mut configs, mut errs) = (vec![], vec![]); + for config_result in config_results { + match config_result { + Ok(config) => configs.push(config), + Err((dkg_id, err)) => errs.push((dkg_id, err)), + } } - // Ensure that creating these transcripts would not exceed the maximum number of early - // remote transcripts. We continue with the next target_id in case it requires less - // transcripts. - if selected_transcripts.len() + configs.len() > MAX_EARLY_REMOTE_TRANSCRIPTS { + if !errs.is_empty() { + for (dkg_id, err) in errs { + error!( + logger, + "Failed to create early remote transcript for dkg id {:?} at height {}: {}", + dkg_id, + parent.height.increment(), + err + ); + selected_transcripts.push((dkg_id, callback_id, Err(err))); + } continue; } // If any of the configs has less dealings than the threshold, we skip this target_id - if configs.iter().any(|config| { + if configs.iter().any(|config: &NiDkgConfig| { let dealings_count = all_dealings .get(config.dkg_id()) .map_or(0, |dealings| dealings.len()); @@ -306,7 +336,7 @@ pub(crate) fn create_early_remote_transcripts( return Err(DkgPayloadCreationError::DkgCreateTranscriptError(err)); } }; - selected_transcripts.push((config.dkg_id().clone(), *callback_id, transcript_result)); + selected_transcripts.push((config.dkg_id().clone(), callback_id, transcript_result)); } } @@ -1081,6 +1111,94 @@ fn build_target_id_callback_map( .collect() } +pub fn build_target_id_config_map( + dealer_subnet: SubnetId, + start_block_height: Height, + registry_client: &dyn RegistryClient, + state: &ReplicatedState, + registry_version: RegistryVersion, + reshared_transcripts: &BTreeMap, + completed_target_ids: &BTreeSet, +) -> BTreeMap>> { + let call_contexts = &state.metadata.subnet_call_context_manager; + let setup_initial_dkg_configs = call_contexts + .setup_initial_dkg_contexts + .iter() + .filter(|(_, context)| { + context.registry_version <= registry_version + && !completed_target_ids.contains(&context.target_id) + }) + .filter_map(|(&callback_id, context)| { + let dealers = + get_node_list(dealer_subnet, registry_client, context.registry_version).ok()?; + let low_thr_dkg_id = NiDkgId { + start_block_height, + dealer_subnet, + dkg_tag: NiDkgTag::LowThreshold, + target_subnet: NiDkgTargetSubnet::Remote(context.target_id), + }; + let high_thr_dkg_id = NiDkgId { + start_block_height, + dealer_subnet, + dkg_tag: NiDkgTag::HighThreshold, + target_subnet: NiDkgTargetSubnet::Remote(context.target_id), + }; + let low_thr_config = create_remote_dkg_config( + low_thr_dkg_id.clone(), + &dealers, + &context.nodes_in_target_subnet, + &context.registry_version, + None, + ) + .map_err(|err| (low_thr_dkg_id, err.to_string())); + let high_thr_config = create_remote_dkg_config( + high_thr_dkg_id.clone(), + &dealers, + &context.nodes_in_target_subnet, + &context.registry_version, + None, + ) + .map_err(|err| (high_thr_dkg_id, err.to_string())); + + Some((callback_id, vec![low_thr_config, high_thr_config])) + }); + + // rehsare chain key contexts + let reshare_chain_key_configs = call_contexts + .reshare_chain_key_contexts + .iter() + .filter(|(_, context)| { + context.registry_version <= registry_version + && !completed_target_ids.contains(&context.target_id) + }) + .filter_map(|(&callback_id, context)| { + let key_id = NiDkgMasterPublicKeyId::try_from(context.key_id.clone()).ok()?; + let tag = NiDkgTag::HighThresholdForKey(key_id); + let reshared_transcript = reshared_transcripts.get(&tag)?; + let dealers = + get_node_list(dealer_subnet, registry_client, context.registry_version).ok()?; + let dkg_id = NiDkgId { + start_block_height, + dealer_subnet, + dkg_tag: tag, + target_subnet: NiDkgTargetSubnet::Remote(context.target_id), + }; + let config = create_remote_dkg_config( + dkg_id.clone(), + &dealers, + &context.nodes, + &context.registry_version, + Some(reshared_transcript.clone()), + ) + .map_err(|err| (dkg_id, err.to_string())); + Some((callback_id, vec![config])) + }); + + setup_initial_dkg_configs + .chain(reshare_chain_key_configs) + .collect() +} + fn add_callback_ids_to_transcript_results( new_transcripts: BTreeMap>, state: &ReplicatedState, diff --git a/rs/consensus/dkg/src/payload_validator.rs b/rs/consensus/dkg/src/payload_validator.rs index 5b0807c5d387..ee451c911530 100644 --- a/rs/consensus/dkg/src/payload_validator.rs +++ b/rs/consensus/dkg/src/payload_validator.rs @@ -1,3 +1,5 @@ +use crate::get_configs_for_start_height; + use self::payload_builder::create_early_remote_transcripts; use super::{crypto_validate_dealing, payload_builder, utils}; use ic_consensus_utils::{crypto::ConsensusCrypto, pool_reader::PoolReader}; @@ -11,7 +13,7 @@ use ic_logger::{ReplicaLogger, info, warn}; use ic_registry_client_helpers::subnet::SubnetRegistry; use ic_replicated_state::ReplicatedState; use ic_types::{ - SubnetId, + Height, SubnetId, batch::ValidationContext, consensus::{ Block, BlockPayload, @@ -117,6 +119,9 @@ pub fn validate_payload( }); validate_dealings_payload( + subnet_id, + last_summary_block.height, + registry_client, crypto, pool_reader, dkg_pool, @@ -136,6 +141,9 @@ pub fn validate_payload( // Validates the payload containing dealings. #[allow(clippy::result_large_err)] fn validate_dealings_payload( + subnet_id: SubnetId, + start_block_height: Height, + registry_client: &dyn RegistryClient, crypto: &dyn ConsensusCrypto, pool_reader: &PoolReader<'_>, dkg_pool: &dyn DkgPool, @@ -181,6 +189,15 @@ fn validate_dealings_payload( return Err(InvalidDkgPayloadReason::DealerAlreadyDealt(dealer_id).into()); } + let configs = get_configs_for_start_height( + subnet_id, + registry_client, + state_manager, + validation_context.registry_version, + start_block_height, + last_summary, + ); + // Check that all messages have a valid DKG config from the summary and the // dealer is valid, then verify each dealing. for message in &dealings.messages { @@ -192,7 +209,7 @@ fn validate_dealings_payload( continue; } - let Some(config) = last_summary.configs.get(&message.content.dkg_id) else { + let Some(config) = configs.get(&message.content.dkg_id) else { return Err(InvalidDkgPayloadReason::MissingDkgConfigForDealing.into()); }; @@ -203,6 +220,9 @@ fn validate_dealings_payload( // If we have early transcripts, we compare them if !dealings.transcripts_for_remote_subnets.is_empty() { let expected_transcripts = create_early_remote_transcripts( + subnet_id, + start_block_height, + registry_client, pool_reader, crypto, parent, diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index 7533bafd7cf9..a77d1c29a308 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -614,6 +614,9 @@ fn start_consensus( abortable_broadcast_channels.dkg, ic_consensus_dkg::DkgImpl::new( node_id, + subnet_id, + Arc::clone(®istry_client), + Arc::clone(&state_manager) as Arc<_>, Arc::clone(&consensus_crypto), Arc::clone(&consensus_pool_cache), dkg_key_manager, diff --git a/rs/tests/consensus/subnet_recovery/common.rs b/rs/tests/consensus/subnet_recovery/common.rs index 6c2e1b23d984..6fce97630145 100644 --- a/rs/tests/consensus/subnet_recovery/common.rs +++ b/rs/tests/consensus/subnet_recovery/common.rs @@ -38,7 +38,10 @@ use anyhow::bail; use canister_test::Canister; use ic_base_types::NodeId; use ic_consensus_system_test_utils::{ - node::{assert_node_is_assigned_with_ssh_session, assert_node_is_unassigned_with_ssh_session}, + node::{ + assert_node_is_assigned_with_ssh_session, assert_node_is_unassigned_with_ssh_session, + await_node_certified_height, + }, rw_message::{install_nns_and_check_progress, store_message}, ssh_access::{disable_ssh_access_to_node, wait_until_authentication_is_granted}, subnet::{ @@ -85,7 +88,7 @@ const NNS_NODES_LARGE: usize = 40; const APP_NODES_LARGE: usize = 37; /// 40 dealings * 4 transcripts being reshared (high/local, low/local, high/remote, low/remote) /// plus 14 as a safety margin -const DKG_INTERVAL_LARGE: u64 = 4 * NNS_NODES_LARGE as u64 + 14; +const DKG_INTERVAL_LARGE: u64 = 499; /// A very large DKG interval to test recovery when the subnet stalls during its first DKG /// interval. @@ -417,12 +420,21 @@ fn app_subnet_recovery_test(env: TestEnv, cfg: TestConfig) { )); // The first application subnet encountered during iteration is the source subnet because it was inserted first. - let source_subnet_id = env + let source_subnet = env .topology_snapshot() .subnets() .find(|subnet| subnet.subnet_type() == SubnetType::Application) - .expect("there is no source subnet") - .subnet_id; + .expect("there is no source subnet"); + + let source_subnet_id = source_subnet.subnet_id; + await_node_certified_height( + &source_subnet + .nodes() + .next() + .expect("there is no node in the source subnet"), + Height::from(1000), + logger.clone(), + ); let create_new_subnet = !topology_snapshot .subnets()