Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

// Consensus
consensusController, err := consensus.NewConsensusController(
ctx, p2pNode, sender, peers, p2pKey,
ctx, eth2Cl, p2pNode, sender, peers, p2pKey,
deadlineFunc, gaterFunc, consensusDebugger, featureset.Enabled(featureset.ChainSplitHalt))
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions core/consensus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/consensus/qbft"
"github.com/obolnetwork/charon/p2p"
Expand All @@ -37,13 +38,13 @@ type consensusController struct {
}

// NewConsensusController creates a new consensus controller with the default consensus protocol.
func NewConsensusController(ctx context.Context, p2pNode host.Host, sender *p2p.Sender,
func NewConsensusController(ctx context.Context, eth2Cl eth2wrap.Client, p2pNode host.Host, sender *p2p.Sender,
peers []p2p.Peer, p2pKey *k1.PrivateKey, deadlineFunc core.DeadlineFunc,
gaterFunc core.DutyGaterFunc, debugger Debugger, compareAttestations bool,
) (core.ConsensusController, error) {
qbftDeadliner := core.NewDeadliner(ctx, "consensus.qbft", deadlineFunc)

defaultConsensus, err := qbft.NewConsensus(p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance, compareAttestations)
defaultConsensus, err := qbft.NewConsensus(ctx, eth2Cl, p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance, compareAttestations)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion core/consensus/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/obolnetwork/charon/eth2util/enr"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestConsensusController(t *testing.T) {
Expand Down Expand Up @@ -61,7 +62,11 @@ func TestConsensusController(t *testing.T) {
debugger := csmocks.NewDebugger(t)
ctx := context.Background()

controller, err := consensus.NewConsensusController(ctx, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger, false)
// Create a mock beacon client for test
bmock, err := beaconmock.New(ctx)
require.NoError(t, err)

controller, err := consensus.NewConsensusController(ctx, bmock, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger, false)
require.NoError(t, err)
require.NotNil(t, controller)

Expand Down
15 changes: 13 additions & 2 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
Expand Down Expand Up @@ -245,7 +246,7 @@ func attestationChecker(ctx context.Context, attLeaderSet *pbv1.UnsignedDataSet,
}

// NewConsensus returns a new consensus QBFT component.
func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
func NewConsensus(ctx context.Context, eth2Cl eth2wrap.Client, p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, gaterFunc core.DutyGaterFunc, snifferFunc func(*pbv1.SniffedConsensusInstance), compareAttestations bool,
) (*Consensus, error) {
// Extract peer pubkeys.
Expand All @@ -263,6 +264,16 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe
keys[int64(i)] = pk
}

genesisTime, err := eth2wrap.FetchGenesisTime(ctx, eth2Cl)
if err != nil {
return nil, errors.Wrap(err, "fetch genesis time")
}

slotDuration, _, err := eth2wrap.FetchSlotsConfig(ctx, eth2Cl)
if err != nil {
return nil, errors.Wrap(err, "fetch slot duration")
}

c := &Consensus{
p2pNode: p2pNode,
sender: sender,
Expand All @@ -274,7 +285,7 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe
snifferFunc: snifferFunc,
gaterFunc: gaterFunc,
dropFilter: log.Filter(),
timerFunc: timer.GetRoundTimerFunc(),
timerFunc: timer.GetRoundTimerFunc(genesisTime, slotDuration),
metrics: metrics.NewConsensusMetrics(protocols.QBFTv2ProtocolID),
compareAttestations: compareAttestations,
}
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"testing"
"time"

k1 "github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -519,7 +520,8 @@ func TestInstanceIO_MaybeStart(t *testing.T) {
c.deadliner = deadliner
c.gaterFunc = func(core.Duty) bool { return true }
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
c.timerFunc = timer.GetRoundTimerFunc()
// Use zero values for tests to use default clock.Now() behavior
c.timerFunc = timer.GetRoundTimerFunc(time.Time{}, 0)

// Generate a p2p private key pair.
p2pKey := testutil.GenerateInsecureK1Key(t, 0)
Expand Down
10 changes: 9 additions & 1 deletion core/consensus/qbft/qbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/libp2p/go-libp2p"
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/obolnetwork/charon/eth2util/enr"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestQBFTConsensus(t *testing.T) {
Expand Down Expand Up @@ -126,7 +128,13 @@ func testQBFTConsensus(t *testing.T, threshold, nodes int) {
deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Return(true)
deadliner.On("C").Return(nil)
c, err := qbft.NewConsensus(hosts[i], new(p2p.Sender), peers, p2pkeys[i], deadliner, gaterFunc, sniffer, false)

// Create a mock beacon client for test
// Use zero genesis time so timer uses relative timing instead of absolute slot-based timing
bmock, err := beaconmock.New(t.Context(), beaconmock.WithGenesisTime(time.Time{}))
require.NoError(t, err)

c, err := qbft.NewConsensus(t.Context(), bmock, hosts[i], new(p2p.Sender), peers, p2pkeys[i], deadliner, gaterFunc, sniffer, false)
require.NoError(t, err)
c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
results <- set
Expand Down
71 changes: 64 additions & 7 deletions core/consensus/timer/roundtimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,25 @@ const (
type RoundTimerFunc func(core.Duty) RoundTimer

// GetRoundTimerFunc returns a timer function based on the enabled features.
func GetRoundTimerFunc() RoundTimerFunc {
// Genesis time and slot duration are required to calculate deterministic slot start times.
func GetRoundTimerFunc(genesisTime time.Time, slotDuration time.Duration) RoundTimerFunc {
if featureset.Enabled(featureset.Linear) {
return func(duty core.Duty) RoundTimer {
// Linear timer only affects Proposer duty
if duty.Type == core.DutyProposer {
return NewLinearRoundTimerWithDuty(duty)
} else if featureset.Enabled(featureset.EagerDoubleLinear) {
return NewDoubleEagerLinearRoundTimerWithDuty(duty)
return NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty, genesisTime, slotDuration)
}

return NewIncreasingRoundTimerWithDuty(duty)
}
}

if featureset.Enabled(featureset.EagerDoubleLinear) {
return NewDoubleEagerLinearRoundTimerWithDuty
return func(duty core.Duty) RoundTimer {
return NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty, genesisTime, slotDuration)
}
}

// Default to increasing round timer.
Expand Down Expand Up @@ -84,6 +87,20 @@ func proposalTimeoutOptimization(duty core.Duty, round int64) bool {
return featureset.Enabled(featureset.ProposalTimeout) && duty.Type == core.DutyProposer && round == 1
}

// getDutyStartDelayWithDuration returns the delay from slot start to when a duty is scheduled to begin,
// given the slot duration. This matches the scheduler's slot offsets to ensure timers align with when
// consensus actually starts.
func getDutyStartDelayWithDuration(dutyType core.DutyType, slotDuration time.Duration) time.Duration {
switch dutyType {
case core.DutyAttester:
return slotDuration / 3
case core.DutyAggregator, core.DutySyncContribution:
return (2 * slotDuration) / 3
default:
return 0
}
}

// NewIncreasingRoundTimer returns a new increasing round timer type.
func NewIncreasingRoundTimer() RoundTimer {
return NewIncreasingRoundTimerWithClock(clockwork.NewRealClock())
Expand Down Expand Up @@ -164,6 +181,30 @@ func NewDoubleEagerLinearRoundTimerWithDutyAndClock(duty core.Duty, clock clockw
}
}

// NewDoubleEagerLinearRoundTimerWithDutyAndTiming returns a new eager double linear round timer type for a specific duty with genesis time and slot duration.
// This ensures deterministic behavior across all nodes by using slot start time as the reference.
func NewDoubleEagerLinearRoundTimerWithDutyAndTiming(duty core.Duty, genesisTime time.Time, slotDuration time.Duration) RoundTimer {
return &doubleEagerLinearRoundTimer{
clock: clockwork.NewRealClock(),
duty: duty,
genesisTime: genesisTime,
slotDuration: slotDuration,
firstDeadlines: make(map[int64]time.Time),
}
}

// NewDoubleEagerLinearRoundTimerWithDutyTimingAndClock returns a new eager double linear round timer type for a specific duty, genesis time, slot duration, and custom clock.
// This is primarily used for testing with fake clocks.
func NewDoubleEagerLinearRoundTimerWithDutyTimingAndClock(duty core.Duty, genesisTime time.Time, slotDuration time.Duration, clock clockwork.Clock) RoundTimer {
return &doubleEagerLinearRoundTimer{
clock: clock,
duty: duty,
genesisTime: genesisTime,
slotDuration: slotDuration,
firstDeadlines: make(map[int64]time.Time),
}
}

// doubleEagerLinearRoundTimer implements a round timerType with the following properties:
//
// It doubles the round duration when a leader is active.
Expand All @@ -179,8 +220,10 @@ func NewDoubleEagerLinearRoundTimerWithDutyAndClock(duty core.Duty, clock clockw
//
// It is linear, meaning the round duration increases linearly with the round number: 1s, 2s, 3s, etc.
type doubleEagerLinearRoundTimer struct {
clock clockwork.Clock
duty core.Duty
clock clockwork.Clock
duty core.Duty
genesisTime time.Time
slotDuration time.Duration

mu sync.Mutex
firstDeadlines map[int64]time.Time
Expand All @@ -206,8 +249,22 @@ func (t *doubleEagerLinearRoundTimer) Timer(round int64) (<-chan time.Time, func
// Deadline is either double the first timeout
deadline = first.Add(timeout)
} else {
// Or the first timeout
deadline = t.clock.Now().Add(timeout)
// Calculate the first deadline.
// If genesisTime and slotDuration are set, use slot start time for determinism.
// Otherwise, fall back to clock.Now().
if !t.genesisTime.IsZero() && t.slotDuration > 0 {
// Calculate slot start time deterministically from duty slot number.
slotStart := t.genesisTime.Add(t.slotDuration * time.Duration(t.duty.Slot))

// Add duty-specific delay to account for when the duty is scheduled to start.
dutyDelay := getDutyStartDelayWithDuration(t.duty.Type, t.slotDuration)
dutyStart := slotStart.Add(dutyDelay)

// Deadline is duty start time plus the round timeout
deadline = dutyStart.Add(timeout)
} else {
deadline = t.clock.Now().Add(timeout)
}
t.firstDeadlines[round] = deadline
}

Expand Down
10 changes: 7 additions & 3 deletions core/consensus/timer/roundtimer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,25 @@ func TestLinearRoundTimer(t *testing.T) {
}

func TestGetTimerFunc(t *testing.T) {
timerFunc := timer.GetRoundTimerFunc()
// Use zero values for tests to use default clock.Now() behavior
genesisTime := time.Time{}
slotDuration := time.Duration(0)

timerFunc := timer.GetRoundTimerFunc(genesisTime, slotDuration)
require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(0)).Type())
require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(1)).Type())
require.Equal(t, timer.TimerEagerDoubleLinear, timerFunc(core.NewAttesterDuty(2)).Type())

featureset.DisableForT(t, featureset.EagerDoubleLinear)

timerFunc = timer.GetRoundTimerFunc()
timerFunc = timer.GetRoundTimerFunc(genesisTime, slotDuration)
require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(0)).Type())
require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(1)).Type())
require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(2)).Type())

featureset.EnableForT(t, featureset.Linear)

timerFunc = timer.GetRoundTimerFunc()
timerFunc = timer.GetRoundTimerFunc(genesisTime, slotDuration)
// non proposer duty, defaults to increasing
require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(0)).Type())
require.Equal(t, timer.TimerIncreasing, timerFunc(core.NewAttesterDuty(1)).Type())
Expand Down
Loading