Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 71 additions & 7 deletions rs/consensus/dkg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ use ic_interfaces::{
p2p::consensus::{Bouncer, BouncerFactory, BouncerValue, PoolMutationsProducer},
validation::ValidationResult,
};
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{ReplicaLogger, error, info};
use ic_metrics::{
MetricsRegistry,
buckets::{decimal_buckets, linear_buckets},
};
use ic_replicated_state::ReplicatedState;
use ic_types::{
Height, NodeId, ReplicaVersion,
consensus::dkg::{DealingContent, DkgMessageId, InvalidDkgPayloadReason, Message},
Height, NodeId, ReplicaVersion, SubnetId,
consensus::dkg::{DealingContent, DkgMessageId, DkgSummary, InvalidDkgPayloadReason, Message},
crypto::{
Signed,
threshold_sig::ni_dkg::{NiDkgId, NiDkgTargetSubnet, config::NiDkgConfig},
Expand All @@ -25,14 +28,15 @@ use ic_types::{
use prometheus::Histogram;
use rayon::prelude::*;
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
sync::{Arc, Mutex},
};

pub mod dkg_key_manager;
pub mod payload_builder;
pub mod payload_validator;

use crate::payload_builder::build_target_id_config_map;
pub use crate::utils::get_vetkey_public_keys;

#[cfg(test)]
Expand Down Expand Up @@ -65,6 +69,9 @@ struct Metrics {
/// changes in the consensus and DKG pool.
pub struct DkgImpl {
node_id: NodeId,
subnet_id: SubnetId,
registry_client: Arc<dyn RegistryClient>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
crypto: Arc<dyn ConsensusCrypto>,
consensus_cache: Arc<dyn ConsensusPoolCache>,
dkg_key_manager: Arc<Mutex<DkgKeyManager>>,
Expand All @@ -76,16 +83,22 @@ impl DkgImpl {
/// Build a new DKG component
pub fn new(
node_id: NodeId,
subnet_id: SubnetId,
registry_client: Arc<dyn RegistryClient>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
crypto: Arc<dyn ConsensusCrypto>,
consensus_cache: Arc<dyn ConsensusPoolCache>,
dkg_key_manager: Arc<Mutex<DkgKeyManager>>,
metrics_registry: ic_metrics::MetricsRegistry,
logger: ReplicaLogger,
) -> Self {
Self {
node_id,
subnet_id,
registry_client,
state_reader,
crypto,
consensus_cache,
node_id,
dkg_key_manager,
logger,
metrics: Metrics {
Expand Down Expand Up @@ -291,6 +304,50 @@ fn get_handle_invalid_change_action<T: AsRef<str>>(message: &Message, reason: T)
ChangeAction::HandleInvalid(DkgMessageId::from(message), reason.as_ref().to_string())
}

pub(crate) fn get_configs_for_start_height(
subnet_id: SubnetId,
registry_client: &dyn RegistryClient,
state_manager: &dyn StateManager<State = ReplicatedState>,
registry_version: RegistryVersion,
start_height: Height,
dkg_summary: &DkgSummary,
) -> BTreeMap<NiDkgId, NiDkgConfig> {
let mut summary_configs = dkg_summary.configs.clone();
let Some(state) = state_manager.get_latest_certified_state() else {
return summary_configs;
};
let map = build_target_id_config_map(
subnet_id,
start_height,
registry_client,
state.get_ref(),
registry_version,
dkg_summary.next_transcripts(),
&BTreeSet::new(),
);
for (_, config_results) in map {
let (mut configs, mut errs) = (vec![], vec![]);
for config_result in config_results {
match config_result {
Ok(config) => configs.push(config),
Err((dkg_id, err)) => errs.push((dkg_id, err)),
}
}
if !errs.is_empty() {
continue;
}
for config in &configs {
if summary_configs.contains_key(&config.dkg_id()) {
continue;
}
}
for config in configs {
summary_configs.insert(config.dkg_id().clone(), config);
}
}
summary_configs
}

impl<T: DkgPool> PoolMutationsProducer<T> for DkgImpl {
type Mutations = Mutations;

Expand All @@ -306,8 +363,15 @@ impl<T: DkgPool> PoolMutationsProducer<T> for DkgImpl {
return ChangeAction::Purge(start_height).into();
}

let change_set: Mutations = dkg_summary
.configs
let configs = get_configs_for_start_height(
self.subnet_id,
self.registry_client.as_ref(),
self.state_reader.as_ref(),
self.registry_client.get_latest_version(),
start_height,
dkg_summary,
);
let change_set: Mutations = configs
.par_iter()
.filter_map(|(_id, config)| self.create_dealing(dkg_pool, config))
.collect();
Expand Down Expand Up @@ -336,7 +400,7 @@ impl<T: DkgPool> PoolMutationsProducer<T> for DkgImpl {
.map(|dealings| {
self.validate_dealings_for_dealer(
dkg_pool,
&dkg_summary.configs,
&configs,
start_height,
dealings.to_vec(),
)
Expand Down
Loading
Loading