Skip to content

Commit c1af897

Browse files
authored
Email alert when blob missing from CL (#481)
* alert when blob missing from cl * update content * update content * update content * update content * check email config * check email config * check email config * check email config * fix config * fix config * fix config * fix config * refactor * check shard exist * retry download blob * check blob exist before put cache * Fix comments * complete err msg * better err msg * handle send email error
1 parent 6999af8 commit c1af897

File tree

9 files changed

+121
-50
lines changed

9 files changed

+121
-50
lines changed

cmd/es-node/config.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,24 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {
5454
return nil, fmt.Errorf("failed to load storage config: %w", err)
5555
}
5656

57+
emailConfig, err := email.GetEmailConfig(ctx)
58+
if err != nil {
59+
lg.Warn("Failed to load email config, email notifications will be disabled.", "error", err)
60+
}
5761
dlConfig := NewDownloaderConfig(ctx)
62+
if emailConfig != nil {
63+
dlConfig.EmailConfig = emailConfig
64+
}
5865
minerConfig, err := NewMinerConfig(ctx, client, storageConfig.L1Contract, storageConfig.Miner, lg)
5966
if err != nil {
6067
return nil, fmt.Errorf("failed to load miner config: %w", err)
6168
}
69+
if minerConfig != nil && minerConfig.EmailEnabled {
70+
if emailConfig == nil {
71+
return nil, fmt.Errorf("email config is required by miner but not loaded")
72+
}
73+
minerConfig.EmailConfig = *emailConfig
74+
}
6275
chainId := new(big.Int).SetUint64(ctx.GlobalUint64(flags.ChainId.Name))
6376
lg.Info("Read chain ID of EthStorage network", "chainID", chainId)
6477
if minerConfig != nil {
@@ -134,13 +147,6 @@ func NewMinerConfig(ctx *cli.Context, client *ethclient.Client, l1Contract, mine
134147
if err != nil {
135148
return nil, err
136149
}
137-
if minerConfig.EmailEnabled {
138-
emailConfig, err := email.GetEmailConfig(ctx)
139-
if err != nil {
140-
return nil, fmt.Errorf("failed to get email config: %w", err)
141-
}
142-
minerConfig.EmailConfig = *emailConfig
143-
}
144150

145151
cctx := context.Background()
146152
cr := newContractReader(cctx, client, l1Contract, lg)

ethstorage/archiver/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (a *APIService) Start(ctx context.Context) error {
119119
return err
120120
}
121121
r := mux.NewRouter()
122-
// Deprecated
122+
// Deprecated by Fusaka but still used by OP Stack
123123
r.HandleFunc("/eth/v1/beacon/blob_sidecars/{id}", a.blobSidecarHandler)
124124
// Fusaka
125125
r.HandleFunc("/eth/v1/beacon/blobs/{id}", a.blobsHandler)

ethstorage/blobs/blob.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
)
1313

1414
type BeaconBlobs struct {
15-
Data []string `json:"data"`
15+
Data []string `json:"data"`
16+
Message string `json:"message"`
17+
Code int `json:"code"`
1618
}
1719

1820
func BlobToVersionedHash(blobBytes []byte) (common.Hash, error) {

ethstorage/downloader/blob_disk_cache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func (c *BlobDiskCache) SetBlockBlobs(block *blockBlobs) error {
6868
}
6969
var blbs []*blob
7070
for _, b := range block.blobs {
71+
if b.data == nil {
72+
continue
73+
}
7174
kvi := b.kvIndex.Uint64()
7275
id, err := c.store.Put(b.data)
7376
if err != nil {

ethstorage/downloader/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33

44
package downloader
55

6+
import "github.com/ethstorage/go-ethstorage/ethstorage/email"
7+
68
type Config struct {
79
DownloadStart int64 // which block should we download the blobs from
810
DownloadDump string // where to dump the download blobs
911
DownloadThreadNum int // how many threads that will be used to download the blobs into storage file
12+
EmailConfig *email.EmailConfig
1013
}

ethstorage/downloader/downloader.go

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"math/big"
1313
"os"
1414
"path/filepath"
15+
"slices"
1516
"sync"
1617
"time"
1718

@@ -22,6 +23,7 @@ import (
2223
"github.com/ethereum/go-ethereum/rpc"
2324

2425
"github.com/ethstorage/go-ethstorage/ethstorage"
26+
"github.com/ethstorage/go-ethstorage/ethstorage/email"
2527
"github.com/ethstorage/go-ethstorage/ethstorage/eth"
2628
)
2729

@@ -70,10 +72,11 @@ type Downloader struct {
7072
dlLatestReq chan struct{}
7173
dlFinalizedReq chan struct{}
7274

73-
lg log.Logger
74-
done chan struct{}
75-
wg sync.WaitGroup
76-
mu sync.Mutex
75+
emailConfig *email.EmailConfig
76+
lg log.Logger
77+
done chan struct{}
78+
wg sync.WaitGroup
79+
mu sync.Mutex
7780
}
7881

7982
type blob struct {
@@ -105,28 +108,27 @@ func NewDownloader(
105108
db ethdb.Database,
106109
sm *ethstorage.StorageManager,
107110
cache BlobCache,
108-
downloadStart int64,
109-
downloadDump string,
110111
minDurationForBlobsRequest uint64,
111-
downloadThreadNum int,
112+
downloadConfig Config,
112113
lg log.Logger,
113114
) *Downloader {
114-
sm.DownloadThreadNum = downloadThreadNum
115+
sm.DownloadThreadNum = downloadConfig.DownloadThreadNum
115116
return &Downloader{
116117
Cache: cache,
117118
l1Source: l1Source,
118119
l1Beacon: l1Beacon,
119120
daClient: daClient,
120121
db: db,
121122
sm: sm,
122-
dumpDir: downloadDump,
123+
dumpDir: downloadConfig.DownloadDump,
123124
minDurationForBlobsRequest: minDurationForBlobsRequest,
124125
dlLatestReq: make(chan struct{}, 1),
125126
dlFinalizedReq: make(chan struct{}, 1),
126127
lg: lg,
127128
done: make(chan struct{}),
128-
lastDownloadBlock: downloadStart,
129+
lastDownloadBlock: downloadConfig.DownloadStart,
129130
downloadedBlobs: 0,
131+
emailConfig: downloadConfig.EmailConfig,
130132
}
131133
}
132134

@@ -386,33 +388,22 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob
386388
)
387389
}
388390

389-
var clBlobs map[common.Hash]eth.Blob
390-
if s.l1Beacon != nil {
391-
clBlobs, err = s.l1Beacon.DownloadBlobs(s.l1Beacon.Timestamp2Slot(elBlock.timestamp))
392-
if err != nil {
393-
s.lg.Error("L1 beacon download blob error", "err", err)
394-
return nil, err
395-
}
396-
} else if s.daClient != nil {
397-
var hashes []common.Hash
398-
for _, blob := range elBlock.blobs {
399-
hashes = append(hashes, blob.hash)
400-
}
401-
402-
clBlobs, err = s.daClient.DownloadBlobs(hashes)
403-
if err != nil {
404-
s.lg.Error("DA client download blob error", "err", err)
405-
return nil, err
406-
}
407-
} else {
408-
return nil, fmt.Errorf("no beacon client or DA client is available")
391+
clBlobs, err := s.downloadBlobsWithRetry(elBlock, 3)
392+
if err != nil {
393+
s.lg.Error("Failed to download blobs for the block after 3 attempts", "block", elBlock.number, "err", err)
394+
// Empty CL blob will be handled later in the EL blob loop
409395
}
410396

411397
for _, elBlob := range elBlock.blobs {
398+
shard := elBlob.kvIndex.Uint64() >> s.sm.KvEntriesBits()
399+
if !slices.Contains(s.sm.Shards(), shard) {
400+
s.lg.Warn("Shard not initialized locally for the kvIndex, skip this blob", "kvIndex", elBlob.kvIndex.Uint64(), "shard", shard)
401+
continue
402+
}
412403
clBlob, exists := clBlobs[elBlob.hash]
413404
if !exists {
414-
s.lg.Error("Did not find the event specified blob in the CL")
415-
405+
s.notifyBlobMissing(elBlock.number, elBlob.kvIndex.Uint64(), elBlob.hash)
406+
continue
416407
}
417408
// encode blobs so that miner can do sampling directly from cache
418409
elBlob.data = s.sm.EncodeBlob(clBlob.Data, elBlob.hash, elBlob.kvIndex.Uint64(), s.sm.MaxKvSize())
@@ -434,6 +425,49 @@ func (s *Downloader) downloadRange(start int64, end int64, toCache bool) ([]blob
434425
return blobs, nil
435426
}
436427

428+
func (s *Downloader) downloadBlobsWithRetry(elBlock *blockBlobs, maxAttempts int) (map[common.Hash]eth.Blob, error) {
429+
var lastErr error
430+
for attempt := 1; attempt <= maxAttempts; attempt++ {
431+
clBlobs, err := s.downloadBlobs(elBlock)
432+
if err == nil {
433+
return clBlobs, nil
434+
}
435+
lastErr = err
436+
if attempt < maxAttempts {
437+
time.Sleep(3 * time.Second)
438+
}
439+
}
440+
return nil, lastErr
441+
}
442+
443+
func (s *Downloader) downloadBlobs(elBlock *blockBlobs) (map[common.Hash]eth.Blob, error) {
444+
if s.l1Beacon != nil {
445+
slot := s.l1Beacon.Timestamp2Slot(elBlock.timestamp)
446+
clBlobs, err := s.l1Beacon.DownloadBlobs(slot)
447+
if err != nil {
448+
s.lg.Error("L1 beacon download blob error", "block", elBlock.number, "slot", slot, "err", err)
449+
return nil, err
450+
}
451+
return clBlobs, nil
452+
}
453+
454+
if s.daClient != nil {
455+
hashes := make([]common.Hash, 0, len(elBlock.blobs))
456+
for _, b := range elBlock.blobs {
457+
hashes = append(hashes, b.hash)
458+
}
459+
460+
clBlobs, err := s.daClient.DownloadBlobs(hashes)
461+
if err != nil {
462+
s.lg.Error("DA client download blob error", "err", err)
463+
return nil, err
464+
}
465+
return clBlobs, nil
466+
}
467+
468+
return nil, fmt.Errorf("no beacon client or DA client is available")
469+
}
470+
437471
func (s *Downloader) dumpBlobsIfNeeded(blobs []blob) {
438472
if s.dumpDir != "" {
439473
for _, blob := range blobs {
@@ -484,3 +518,21 @@ func (s *Downloader) eventsToBlocks(events []types.Log) ([]*blockBlobs, error) {
484518

485519
return blocks, nil
486520
}
521+
522+
func (s *Downloader) notifyBlobMissing(blockNumber uint64, kvIndex uint64, hash common.Hash) {
523+
title := "🛑 Fatal Error from es-node: Downloader Failed to Locate Blob in CL"
524+
msg := "The downloader couldn't locate the specified blob in the consensus layer. The node is stopped pending resolution. "
525+
msg += "Details from the EL event: \n"
526+
msg += fmt.Sprintf(" - blockNumber: %d\n", blockNumber)
527+
msg += fmt.Sprintf(" - kvIndex: %d\n", kvIndex)
528+
msg += fmt.Sprintf(" - hash: %s\n", hash.Hex())
529+
msg += "This may indicate a potential issue with blob availability on the consensus layer. \n"
530+
531+
if s.emailConfig != nil {
532+
if err := email.SendEmail(title, msg, *s.emailConfig, s.lg); err == nil {
533+
return
534+
}
535+
}
536+
s.lg.Error(title)
537+
s.lg.Crit(msg)
538+
}

ethstorage/email/email.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (c EmailConfig) String() string {
5454
)
5555
}
5656

57-
func SendEmail(emailSubject, msg string, config EmailConfig, lg log.Logger) {
57+
func SendEmail(emailSubject, msg string, config EmailConfig, lg log.Logger) error {
5858
lg.Info("Sending email notification", "subject", emailSubject)
5959

6060
emailBody := fmt.Sprintf("Subject: %s\r\n", emailSubject)
@@ -80,4 +80,5 @@ func SendEmail(emailSubject, msg string, config EmailConfig, lg log.Logger) {
8080
} else {
8181
lg.Info("Email notification sent successfully!")
8282
}
83+
return err
8384
}

ethstorage/eth/beacon_client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,31 @@ func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error)
7777
}
7878
resp, err := http.Get(beaconUrl)
7979
if err != nil {
80-
return nil, err
80+
return nil, fmt.Errorf("failed to query beacon blobs with url %s: %w", beaconUrl, err)
8181
}
8282
defer resp.Body.Close()
8383

8484
var blobsResp blobs.BeaconBlobs
8585
if err := json.NewDecoder(resp.Body).Decode(&blobsResp); err != nil {
86-
return nil, err
86+
return nil, fmt.Errorf("failed to decode beacon blobs response from url %s: %w", beaconUrl, err)
87+
}
88+
if len(blobsResp.Data) == 0 {
89+
err := fmt.Sprintf("no blobs found for slot %d", slot)
90+
if blobsResp.Code != 0 || blobsResp.Message != "" {
91+
err = fmt.Sprintf("%s: %d %s", err, blobsResp.Code, blobsResp.Message)
92+
}
93+
return nil, fmt.Errorf("%s", err)
8794
}
88-
8995
res := map[common.Hash]Blob{}
9096
for _, beaconBlob := range blobsResp.Data {
9197
// decode hex string to bytes
9298
asciiBytes, err := hex.DecodeString(beaconBlob[2:])
9399
if err != nil {
94-
return nil, err
100+
return nil, fmt.Errorf("failed to decode beacon blob hex string %s: %w", beaconBlob, err)
95101
}
96102
hash, err := blobs.BlobToVersionedHash(asciiBytes)
97103
if err != nil {
98-
return nil, err
104+
return nil, fmt.Errorf("failed to compute versioned hash for blob: %w", err)
99105
}
100106
res[hash] = Blob{VersionedHash: hash, Data: asciiBytes}
101107
}

ethstorage/node/node.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,8 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error {
139139
n.db,
140140
n.storageManager,
141141
n.blobCache,
142-
cfg.Downloader.DownloadStart,
143-
cfg.Downloader.DownloadDump,
144142
cfg.L1.L1MinDurationForBlobsRequest,
145-
cfg.Downloader.DownloadThreadNum,
143+
cfg.Downloader,
146144
n.lg,
147145
)
148146
return nil

0 commit comments

Comments
 (0)