diff --git a/app/app.go b/app/app.go index 584373efe..b1314d0aa 100644 --- a/app/app.go +++ b/app/app.go @@ -630,7 +630,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, // Consensus consensusController, err := consensus.NewConsensusController( - ctx, p2pNode, sender, peers, p2pKey, + ctx, eth2Cl, p2pNode, sender, peers, p2pKey, deadlineFunc, gaterFunc, consensusDebugger, featureset.Enabled(featureset.ChainSplitHalt)) if err != nil { return err diff --git a/core/consensus/controller.go b/core/consensus/controller.go index fa8f93300..76201b4bc 100644 --- a/core/consensus/controller.go +++ b/core/consensus/controller.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/consensus/qbft" "github.com/obolnetwork/charon/p2p" @@ -37,13 +38,13 @@ type consensusController struct { } // NewConsensusController creates a new consensus controller with the default consensus protocol. -func NewConsensusController(ctx context.Context, p2pNode host.Host, sender *p2p.Sender, +func NewConsensusController(ctx context.Context, eth2Cl eth2wrap.Client, p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey, deadlineFunc core.DeadlineFunc, gaterFunc core.DutyGaterFunc, debugger Debugger, compareAttestations bool, ) (core.ConsensusController, error) { qbftDeadliner := core.NewDeadliner(ctx, "consensus.qbft", deadlineFunc) - defaultConsensus, err := qbft.NewConsensus(p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance, compareAttestations) + defaultConsensus, err := qbft.NewConsensus(ctx, eth2Cl, p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance, compareAttestations) if err != nil { return nil, err } diff --git a/core/consensus/controller_test.go b/core/consensus/controller_test.go index 8e7f0184c..8572465e0 100644 --- a/core/consensus/controller_test.go +++ b/core/consensus/controller_test.go @@ -23,6 +23,7 @@ import ( "github.com/obolnetwork/charon/eth2util/enr" "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/testutil" + "github.com/obolnetwork/charon/testutil/beaconmock" ) func TestConsensusController(t *testing.T) { @@ -61,7 +62,11 @@ func TestConsensusController(t *testing.T) { debugger := csmocks.NewDebugger(t) ctx := context.Background() - controller, err := consensus.NewConsensusController(ctx, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger, false) + // Create a mock beacon client for test + bmock, err := beaconmock.New(ctx) + require.NoError(t, err) + + controller, err := consensus.NewConsensusController(ctx, bmock, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger, false) require.NoError(t, err) require.NotNil(t, controller) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index 31f399ec2..77caa588e 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -21,6 +21,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/app/featureset" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" @@ -245,7 +246,7 @@ func attestationChecker(ctx context.Context, attLeaderSet *pbv1.UnsignedDataSet, } // NewConsensus returns a new consensus QBFT component. -func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey, +func NewConsensus(ctx context.Context, eth2Cl eth2wrap.Client, p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey, deadliner core.Deadliner, gaterFunc core.DutyGaterFunc, snifferFunc func(*pbv1.SniffedConsensusInstance), compareAttestations bool, ) (*Consensus, error) { // Extract peer pubkeys. @@ -263,6 +264,16 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe keys[int64(i)] = pk } + genesisTime, err := eth2wrap.FetchGenesisTime(ctx, eth2Cl) + if err != nil { + return nil, errors.Wrap(err, "fetch genesis time") + } + + slotDuration, _, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl) + if err != nil { + return nil, errors.Wrap(err, "fetch slot duration") + } + c := &Consensus{ p2pNode: p2pNode, sender: sender, @@ -274,7 +285,7 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe snifferFunc: snifferFunc, gaterFunc: gaterFunc, dropFilter: log.Filter(), - timerFunc: timer.GetRoundTimerFunc(), + timerFunc: timer.GetRoundTimerFunc(genesisTime, slotDuration), metrics: metrics.NewConsensusMetrics(protocols.QBFTv2ProtocolID), compareAttestations: compareAttestations, } diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 577b1baaa..419e6a73e 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "testing" + "time" k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/stretchr/testify/mock" @@ -519,7 +520,8 @@ func TestInstanceIO_MaybeStart(t *testing.T) { c.deadliner = deadliner c.gaterFunc = func(core.Duty) bool { return true } c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) - c.timerFunc = timer.GetRoundTimerFunc() + // Use zero values for tests to use default clock.Now() behavior + c.timerFunc = timer.GetRoundTimerFunc(time.Time{}, 0) // Generate a p2p private key pair. p2pKey := testutil.GenerateInsecureK1Key(t, 0) diff --git a/core/consensus/qbft/qbft_test.go b/core/consensus/qbft/qbft_test.go index 714e9861a..3d3b79116 100644 --- a/core/consensus/qbft/qbft_test.go +++ b/core/consensus/qbft/qbft_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math/rand" "testing" + "time" "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" @@ -27,6 +28,7 @@ import ( "github.com/obolnetwork/charon/eth2util/enr" "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/testutil" + "github.com/obolnetwork/charon/testutil/beaconmock" ) func TestQBFTConsensus(t *testing.T) { @@ -126,7 +128,13 @@ func testQBFTConsensus(t *testing.T, threshold, nodes int) { deadliner := coremocks.NewDeadliner(t) deadliner.On("Add", mock.Anything).Return(true) deadliner.On("C").Return(nil) - c, err := qbft.NewConsensus(hosts[i], new(p2p.Sender), peers, p2pkeys[i], deadliner, gaterFunc, sniffer, false) + + // Create a mock beacon client for test + // Use zero genesis time so timer uses relative timing instead of absolute slot-based timing + bmock, err := beaconmock.New(t.Context(), beaconmock.WithGenesisTime(time.Time{})) + require.NoError(t, err) + + c, err := qbft.NewConsensus(t.Context(), bmock, hosts[i], new(p2p.Sender), peers, p2pkeys[i], deadliner, gaterFunc, sniffer, false) require.NoError(t, err) c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error { results <- set diff --git a/core/consensus/timer/roundtimer.go b/core/consensus/timer/roundtimer.go index cf4f5f5f4..4ea0729c0 100644 --- a/core/consensus/timer/roundtimer.go +++ b/core/consensus/timer/roundtimer.go @@ -23,14 +23,15 @@ const ( type RoundTimerFunc func(core.Duty) RoundTimer // GetRoundTimerFunc returns a timer function based on the enabled features. -func GetRoundTimerFunc() RoundTimerFunc { +// Genesis time and slot duration are required to calculate deterministic slot start times. +func GetRoundTimerFunc(genesisTime time.Time, slotDuration time.Duration) RoundTimerFunc { if featureset.Enabled(featureset.Linear) { return func(duty core.Duty) RoundTimer { // Linear timer only affects Proposer duty if duty.Type == core.DutyProposer { return NewLinearRoundTimerWithDuty(duty) } else if featureset.Enabled(featureset.EagerDoubleLinear) { - return NewDoubleEagerLinearRoundTimerWithDuty(duty) + return NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty, genesisTime, slotDuration) } return NewIncreasingRoundTimerWithDuty(duty) @@ -38,7 +39,9 @@ func GetRoundTimerFunc() RoundTimerFunc { } if featureset.Enabled(featureset.EagerDoubleLinear) { - return NewDoubleEagerLinearRoundTimerWithDuty + return func(duty core.Duty) RoundTimer { + return NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty, genesisTime, slotDuration) + } } // Default to increasing round timer. @@ -84,6 +87,20 @@ func proposalTimeoutOptimization(duty core.Duty, round int64) bool { return featureset.Enabled(featureset.ProposalTimeout) && duty.Type == core.DutyProposer && round == 1 } +// getDutyStartDelayWithDuration returns the delay from slot start to when a duty is scheduled to begin, +// given the slot duration. This matches the scheduler's slot offsets to ensure timers align with when +// consensus actually starts. +func getDutyStartDelayWithDuration(dutyType core.DutyType, slotDuration time.Duration) time.Duration { + switch dutyType { + case core.DutyAttester: + return slotDuration / 3 + case core.DutyAggregator, core.DutySyncContribution: + return (2 * slotDuration) / 3 + default: + return 0 + } +} + // NewIncreasingRoundTimer returns a new increasing round timer type. func NewIncreasingRoundTimer() RoundTimer { return NewIncreasingRoundTimerWithClock(clockwork.NewRealClock()) @@ -164,6 +181,30 @@ func NewDoubleEagerLinearRoundTimerWithDutyAndClock(duty core.Duty, clock clockw } } +// NewDoubleEagerLinearRoundTimerWithDutyAndTiming returns a new eager double linear round timer type for a specific duty with genesis time and slot duration. +// This ensures deterministic behavior across all nodes by using slot start time as the reference. +func NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty core.Duty, genesisTime time.Time, slotDuration time.Duration) RoundTimer { + return &doubleEagerLinearRoundTimer{ + clock: clockwork.NewRealClock(), + duty: duty, + genesisTime: genesisTime, + slotDuration: slotDuration, + firstDeadlines: make(map[int64]time.Time), + } +} + +// NewDoubleEagerLinearRoundTimerWithDutyTimingAndClock returns a new eager double linear round timer type for a specific duty, genesis time, slot duration, and custom clock. +// This is primarily used for testing with fake clocks. +func NewDoubleEagerLinearRoundTimerWithDutyTimingAndClock(duty core.Duty, genesisTime time.Time, slotDuration time.Duration, clock clockwork.Clock) RoundTimer { + return &doubleEagerLinearRoundTimer{ + clock: clock, + duty: duty, + genesisTime: genesisTime, + slotDuration: slotDuration, + firstDeadlines: make(map[int64]time.Time), + } +} + // doubleEagerLinearRoundTimer implements a round timerType with the following properties: // // It doubles the round duration when a leader is active. @@ -179,8 +220,10 @@ func NewDoubleEagerLinearRoundTimerWithDutyAndClock(duty core.Duty, clock clockw // // It is linear, meaning the round duration increases linearly with the round number: 1s, 2s, 3s, etc. type doubleEagerLinearRoundTimer struct { - clock clockwork.Clock - duty core.Duty + clock clockwork.Clock + duty core.Duty + genesisTime time.Time + slotDuration time.Duration mu sync.Mutex firstDeadlines map[int64]time.Time @@ -206,8 +249,22 @@ func (t *doubleEagerLinearRoundTimer) Timer(round int64) (<-chan time.Time, func // Deadline is either double the first timeout deadline = first.Add(timeout) } else { - // Or the first timeout - deadline = t.clock.Now().Add(timeout) + // Calculate the first deadline. + // If genesisTime and slotDuration are set, use slot start time for determinism. + // Otherwise, fall back to clock.Now(). + if !t.genesisTime.IsZero() && t.slotDuration > 0 { + // Calculate slot start time deterministically from duty slot number. + slotStart := t.genesisTime.Add(t.slotDuration * time.Duration(t.duty.Slot)) + + // Add duty-specific delay to account for when the duty is scheduled to start. + dutyDelay := getDutyStartDelayWithDuration(t.duty.Type, t.slotDuration) + dutyStart := slotStart.Add(dutyDelay) + + // Deadline is duty start time plus the round timeout + deadline = dutyStart.Add(timeout) + } else { + deadline = t.clock.Now().Add(timeout) + } t.firstDeadlines[round] = deadline } diff --git a/core/consensus/timer/roundtimer_test.go b/core/consensus/timer/roundtimer_test.go index 8214a97f4..5e1d5ff5c 100644 --- a/core/consensus/timer/roundtimer_test.go +++ b/core/consensus/timer/roundtimer_test.go @@ -164,21 +164,25 @@ func TestLinearRoundTimer(t *testing.T) { } func TestGetTimerFunc(t *testing.T) { - timerFunc := timer.GetRoundTimerFunc() + // Use zero values for tests to use default clock.Now() behavior + genesisTime := time.Time{} + slotDuration := time.Duration(0) + + timerFunc := timer.GetRoundTimerFunc(genesisTime, slotDuration) require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(0)).Type()) require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(1)).Type()) require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(2)).Type()) featureset.DisableForT(t, featureset.EagerDoubleLinear) - timerFunc = timer.GetRoundTimerFunc() + timerFunc = timer.GetRoundTimerFunc(genesisTime, slotDuration) require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(0)).Type()) require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(1)).Type()) require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(2)).Type()) featureset.EnableForT(t, featureset.Linear) - timerFunc = timer.GetRoundTimerFunc() + timerFunc = timer.GetRoundTimerFunc(genesisTime, slotDuration) // non proposer duty, defaults to increasing require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(0)).Type()) require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(1)).Type())