Skip to content
9 changes: 9 additions & 0 deletions apps/evm/server/force_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/evstack/ev-node/pkg/config"
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
da "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -73,6 +74,14 @@ func (m *mockDA) HasForcedInclusionNamespace() bool {
return true
}

func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
return nil, nil
}

func (m *mockDA) LocalHead(ctx context.Context) (uint64, error) {
return 0, nil
}

func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) {
testHeight := uint64(100)

Expand Down
32 changes: 32 additions & 0 deletions block/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type Metrics struct {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious

// Sync mode metrics
SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Number of subscription failures
ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions
}

// PrometheusMetrics returns Metrics built using Prometheus client library
Expand Down Expand Up @@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
}, labels).With(labelsAndValues...)

// Sync mode metrics
m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "sync_mode",
Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)",
}, labels).With(labelsAndValues...)

m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "subscribe_errors_total",
Help: "Total number of DA subscription failures",
}, labels).With(labelsAndValues...)

m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "mode_switches_total",
Help: "Total number of sync mode transitions between catchup and follow",
}, labels).With(labelsAndValues...)

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -269,6 +296,11 @@ func NopMetrics() *Metrics {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
ForcedInclusionTxsMalicious: discard.NewCounter(),

// Sync mode metrics
SyncMode: discard.NewGauge(),
SubscribeErrors: discard.NewCounter(),
ModeSwitches: discard.NewCounter(),
}

// Initialize maps with no-op metrics
Expand Down
24 changes: 24 additions & 0 deletions block/internal/common/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"

// BlobsFromSubscription returns non-empty blob data from a subscription response.
func BlobsFromSubscription(resp *blobrpc.SubscriptionResponse) [][]byte {
if resp == nil || len(resp.Blobs) == 0 {
return nil
}

blobs := make([][]byte, 0, len(resp.Blobs))
for _, blob := range resp.Blobs {
if blob == nil {
continue
}
data := blob.Data()
if len(data) == 0 {
continue
}
blobs = append(blobs, data)
}

return blobs
}
63 changes: 63 additions & 0 deletions block/internal/da/async_block_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type AsyncBlockRetriever interface {
Stop()
GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error)
UpdateCurrentHeight(height uint64)
StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time)
}

// BlockData contains data retrieved from a single DA height
Expand Down Expand Up @@ -125,6 +126,68 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
}
}

// StoreBlock caches a block's blobs, favoring existing data to avoid churn.
func (f *asyncBlockRetriever) StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) {
if len(f.namespace) == 0 {
return
}
if height < f.daStartHeight {
return
}
if len(blobs) == 0 {
return
}

filtered := make([][]byte, 0, len(blobs))
for _, blob := range blobs {
if len(blob) > 0 {
filtered = append(filtered, blob)
}
}
if len(filtered) == 0 {
return
}

if timestamp.IsZero() {
timestamp = time.Now().UTC()
}

key := newBlockDataKey(height)
if existing, err := f.cache.Get(ctx, key); err == nil {
var pbBlock pb.BlockData
if err := proto.Unmarshal(existing, &pbBlock); err == nil && len(pbBlock.Blobs) > 0 {
return
}
}

pbBlock := &pb.BlockData{
Height: height,
Timestamp: timestamp.Unix(),
Blobs: filtered,
}
data, err := proto.Marshal(pbBlock)
if err != nil {
f.logger.Error().
Err(err).
Uint64("height", height).
Msg("failed to marshal block for caching")
return
}

if err := f.cache.Put(ctx, key, data); err != nil {
f.logger.Error().
Err(err).
Uint64("height", height).
Msg("failed to cache block")
return
}

f.logger.Debug().
Uint64("height", height).
Int("blob_count", len(filtered)).
Msg("cached block from subscription")
}

func newBlockDataKey(height uint64) ds.Key {
return ds.NewKey(fmt.Sprintf("/block/%d", height))
}
Expand Down
24 changes: 24 additions & 0 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype

return results, nil
}

// Subscribe subscribes to blobs in the specified namespace.
// Returns a channel that receives subscription responses as new blobs are included.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

return c.blobAPI.Subscribe(ctx, ns)
}

// LocalHead returns the height of the locally synced DA head.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for me while reading, appears a little confusing localHead in this context.

Is it querying the local node? is the last header that the DA has of my chain? is the last header that the DA layer has synced?

maybe this thought changes once all the review is done

headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()

header, err := c.headerAPI.LocalHead(headCtx)
if err != nil {
return 0, fmt.Errorf("failed to get local head: %w", err)
}

return header.Height, nil
}
Loading