From 930d65a01cdf268d356f6c63a679b6bf7a9c97a5 Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 13:27:39 +0000 Subject: [PATCH 1/8] Creating global instance of WAL --- internal/server/ironhawk/iothread.go | 7 +++-- internal/server/ironhawk/main.go | 3 +-- internal/wal/wal.go | 29 ++++++--------------- internal/wal/wal_null.go | 38 ---------------------------- server/main.go | 12 ++++----- tests/commands/ironhawk/setup.go | 4 +-- 6 files changed, 19 insertions(+), 74 deletions(-) delete mode 100644 internal/wal/wal_null.go diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index d3cd98222..f130821e2 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -60,7 +60,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa select { case <-ctx.Done(): - slog.Debug("io-thread context cancelled, shutting down receive loop") + slog.Debug("io-thread context canceled, shutting down receive loop") return ctx.Err() case err := <-errCh: return err @@ -95,9 +95,8 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa } // Log command to WAL if enabled and not a replay - if err == nil && wal.GetWAL() != nil && !_c.IsReplay { - // Create WAL entry using protobuf message - if err := wal.GetWAL().LogCommand(_c.C); err != nil { + if wal.DefaultWAL != nil && !_c.IsReplay { + if err := wal.DefaultWAL.LogCommand(_c.C); err != nil { slog.Error("failed to log command to WAL", slog.Any("error", err)) } } diff --git a/internal/server/ironhawk/main.go b/internal/server/ironhawk/main.go index 4f2ae9b7c..a2df3ef6f 100644 --- a/internal/server/ironhawk/main.go +++ b/internal/server/ironhawk/main.go @@ -15,7 +15,6 @@ import ( "github.com/dicedb/dice/config" "github.com/dicedb/dice/internal/shardmanager" - "github.com/dicedb/dice/internal/wal" ) type Server struct { @@ -28,7 +27,7 @@ type Server struct { ioThreadManager *IOThreadManager } -func NewServer(shardManager *shardmanager.ShardManager, ioThreadManager *IOThreadManager, watchManager *WatchManager, wl wal.WAL) *Server { +func NewServer(shardManager *shardmanager.ShardManager, ioThreadManager *IOThreadManager, watchManager *WatchManager) *Server { return &Server{ Host: config.Config.Host, Port: config.Config.Port, diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 16252e771..34f24095d 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -24,54 +24,41 @@ var ( ticker *time.Ticker stopCh chan struct{} mu sync.Mutex - wl WAL ) -// GetWAL returns the global WAL instance -func GetWAL() WAL { - mu.Lock() - defer mu.Unlock() - return wl -} - -// SetGlobalWAL sets the global WAL instance -func SetWAL(_wl WAL) { - mu.Lock() - defer mu.Unlock() - wl = _wl -} +var DefaultWAL WAL func init() { ticker = time.NewTicker(10 * time.Second) stopCh = make(chan struct{}) } -func rotateWAL(wl WAL) { +func rotateWAL() { mu.Lock() defer mu.Unlock() - if err := wl.Close(); err != nil { + if err := DefaultWAL.Close(); err != nil { slog.Warn("error closing the WAL", slog.Any("error", err)) } - if err := wl.Init(time.Now()); err != nil { + if err := DefaultWAL.Init(time.Now()); err != nil { slog.Warn("error creating a new WAL", slog.Any("error", err)) } } -func periodicRotate(wl WAL) { +func periodicRotate() { for { select { case <-ticker.C: - rotateWAL(wl) + rotateWAL() case <-stopCh: return } } } -func InitBG(wl WAL) { - go periodicRotate(wl) +func RunAsyncJobs() { + go periodicRotate() } func ShutdownBG() { diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go deleted file mode 100644 index 46e030810..000000000 --- a/internal/wal/wal_null.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2022-present, DiceDB contributors -// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information. - -package wal - -import ( - "time" - - w "github.com/dicedb/dicedb-go/wal" - "github.com/dicedb/dicedb-go/wire" -) - -type WALNull struct { -} - -func NewNullWAL() (*WALNull, error) { - return &WALNull{}, nil -} - -func (w *WALNull) Init(t time.Time) error { - return nil -} - -func (w *WALNull) LogCommand(c *wire.Command) error { - return nil -} - -func (w *WALNull) Close() error { - return nil -} - -func (w *WALNull) Replay(callback func(*w.Element) error) error { - return nil -} - -func (w *WALNull) Iterate(entry *w.Element, callback func(*w.Element) error) error { - return nil -} diff --git a/server/main.go b/server/main.go index a537b6276..7e827b7ed 100644 --- a/server/main.go +++ b/server/main.go @@ -83,7 +83,6 @@ func Start() { var ( serverErrCh = make(chan error, 2) - wl wal.WAL walInitSuccessful = false ) @@ -95,17 +94,16 @@ func Start() { cancel() return } - wl = _wl - wal.SetWAL(wl) // Set the global WAL instance + wal.DefaultWAL = _wl - if err := wl.Init(time.Now()); err != nil { + if err := wal.DefaultWAL.Init(time.Now()); err != nil { slog.Warn("could not initialize WAL", slog.Any("error", err)) slog.Warn("disabling WAL and continuing") // TODO: Make sure that the WAL is disabled // We should not incurring any additional cost of making LogCommand // invocations. } else { - go wal.InitBG(wl) + go wal.RunAsyncJobs() slog.Debug("WAL initialization complete") walInitSuccessful = true } @@ -150,7 +148,7 @@ func Start() { } ioThreadManager := ironhawk.NewIOThreadManager() - ironhawkServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager, wl) + ironhawkServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager) serverWg.Add(1) go runServer(ctx, &serverWg, ironhawkServer, serverErrCh) @@ -173,7 +171,7 @@ func Start() { } return nil } - if err := wl.Replay(callback); err != nil { + if err := wal.DefaultWAL.Replay(callback); err != nil { slog.Error("error restoring from WAL", slog.Any("error", err)) } slog.Info("database restored from WAL") diff --git a/tests/commands/ironhawk/setup.go b/tests/commands/ironhawk/setup.go index f0b9786b5..156f9753b 100644 --- a/tests/commands/ironhawk/setup.go +++ b/tests/commands/ironhawk/setup.go @@ -112,9 +112,9 @@ func RunTestServer(wg *sync.WaitGroup) { shardManager := shardmanager.NewShardManager(1, gec) ioThreadManager := ironhawk.NewIOThreadManager() watchManager := &ironhawk.WatchManager{} - wl, _ := wal.NewAOFWAL(config.Config.WALDir) + wal.DefaultWAL, _ = wal.NewAOFWAL(config.Config.WALDir) - testServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager, wl) + testServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager) ctx, cancel := context.WithCancel(context.Background()) fmt.Println("Starting the test server on port", config.Config.Port) From 4a33e8af04631e1829f6b7cced098834ce664559 Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 13:42:44 +0000 Subject: [PATCH 2/8] AOF WAL renamed as Forge and flow is simplified --- config/config.go | 1 + internal/server/ironhawk/iothread.go | 2 ++ internal/wal/wal.go | 17 ++++++++- internal/wal/{wal_aof.go => wal_forge.go} | 42 ++++++++++++----------- server/main.go | 36 +++++-------------- tests/commands/ironhawk/setup.go | 2 +- 6 files changed, 50 insertions(+), 50 deletions(-) rename internal/wal/{wal_aof.go => wal_forge.go} (91%) diff --git a/config/config.go b/config/config.go index 8a8af2f6e..d8065626c 100644 --- a/config/config.go +++ b/config/config.go @@ -62,6 +62,7 @@ type DiceDBConfig struct { Engine string `mapstructure:"engine" default:"ironhawk" description:"the engine to use, values: ironhawk"` EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"` + WALVariant string `mapstructure:"wal-variant" default:"forge" description:"wal variant to use, values: forge"` WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"` WALMode string `mapstructure:"wal-mode" default:"buffered" description:"wal mode to use, values: buffered, unbuffered"` WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"` diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index f130821e2..181e65b77 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -5,6 +5,7 @@ package ironhawk import ( "context" + "fmt" "log/slog" "strings" @@ -95,6 +96,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa } // Log command to WAL if enabled and not a replay + fmt.Println("wal.DefaultWAL", wal.DefaultWAL) if wal.DefaultWAL != nil && !_c.IsReplay { if err := wal.DefaultWAL.LogCommand(_c.C); err != nil { slog.Error("failed to log command to WAL", slog.Any("error", err)) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 34f24095d..c177550ba 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/dicedb/dice/config" w "github.com/dicedb/dicedb-go/wal" "github.com/dicedb/dicedb-go/wire" ) @@ -57,7 +58,7 @@ func periodicRotate() { } } -func RunAsyncJobs() { +func StartAsyncJobs() { go periodicRotate() } @@ -65,3 +66,17 @@ func ShutdownBG() { close(stopCh) ticker.Stop() } + +func SetupWAL() { + switch config.Config.WALVariant { + case "forge": + DefaultWAL = NewWALForge() + default: + return + } + + if err := DefaultWAL.Init(time.Now()); err != nil { + slog.Error("could not initialize WAL", slog.Any("error", err)) + panic(err) + } +} diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_forge.go similarity index 91% rename from internal/wal/wal_aof.go rename to internal/wal/wal_forge.go index 62203436a..9469d849f 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_forge.go @@ -39,17 +39,17 @@ var bb []byte func init() { // TODO: Pre-allocate a buffer to avoid re-allocating it - // This will hold one WAL AOF Entry Before it is written to the buffer + // This will hold one WAL Forge Entry Before it is written to the buffer bb = make([]byte, 10*1024) } -type WALAOFEntry struct { +type WALForgeEntry struct { Len uint32 Crc32 uint32 Payload []byte } -type WALAOF struct { +type WALForge struct { logDir string currentSegmentFile *os.File walMode string @@ -73,10 +73,10 @@ type WALAOF struct { cancel context.CancelFunc } -func NewAOFWAL(directory string) (*WALAOF, error) { +func NewWALForge() *WALForge { ctx, cancel := context.WithCancel(context.Background()) - return &WALAOF{ - logDir: directory, + return &WALForge{ + logDir: config.Config.WALDir, walMode: config.Config.WALMode, bufferSyncTicker: time.NewTicker(time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond), segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second), @@ -90,10 +90,10 @@ func NewAOFWAL(directory string) (*WALAOF, error) { rotationMode: config.Config.WALRotationMode, ctx: ctx, cancel: cancel, - }, nil + } } -func (wl *WALAOF) Init(t time.Time) error { +func (wl *WALForge) Init(t time.Time) error { // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist @@ -123,6 +123,8 @@ func (wl *WALAOF) Init(t time.Time) error { wl.bufWriter = bufio.NewWriterSize(wl.currentSegmentFile, wl.bufferSize) go wl.keepSyncingBuffer() + go StartAsyncJobs() + switch wl.rotationMode { case RotationModeTime: go wl.rotateSegmentPeriodically() @@ -135,7 +137,7 @@ func (wl *WALAOF) Init(t time.Time) error { // Log writes a command to the WAL with a monotonically increasing sequence number. // The sequence number is assigned atomically and the command is written to the wl. -func (wl *WALAOF) LogCommand(c *wire.Command) error { +func (wl *WALForge) LogCommand(c *wire.Command) error { // Lock once for the entire sequence number operation wl.mu.Lock() defer wl.mu.Unlock() @@ -196,7 +198,7 @@ func (wl *WALAOF) LogCommand(c *wire.Command) error { } // rotateLogIfNeeded is not thread safe -func (wl *WALAOF) rotateLogIfNeeded(entrySize uint32) error { +func (wl *WALForge) rotateLogIfNeeded(entrySize uint32) error { if wl.currentSegmentSize+entrySize > wl.maxSegmentSize { if err := wl.rotateLog(); err != nil { return err @@ -206,7 +208,7 @@ func (wl *WALAOF) rotateLogIfNeeded(entrySize uint32) error { } // rotateLog is not thread safe -func (wl *WALAOF) rotateLog() error { +func (wl *WALForge) rotateLog() error { if err := wl.Sync(); err != nil { return err } @@ -235,7 +237,7 @@ func (wl *WALAOF) rotateLog() error { return nil } -func (wl *WALAOF) deleteOldestSegment() error { +func (wl *WALForge) deleteOldestSegment() error { oldestSegmentFilePath := filepath.Join(wl.logDir, segmentPrefix+fmt.Sprintf("%d", wl.oldestSegmentIndex)+segmentSuffix) // TODO: checkpoint before deleting the file @@ -247,7 +249,7 @@ func (wl *WALAOF) deleteOldestSegment() error { } // Close the WAL file. It also calls Sync() on the wl. -func (wl *WALAOF) Close() error { +func (wl *WALForge) Close() error { wl.cancel() if err := wl.Sync(); err != nil { return err @@ -257,7 +259,7 @@ func (wl *WALAOF) Close() error { // Writes out any data in the WAL's in-memory buffer to the segment file. If // fsync is enabled, it also calls fsync on the segment file. -func (wl *WALAOF) Sync() error { +func (wl *WALForge) Sync() error { if err := wl.bufWriter.Flush(); err != nil { return err } @@ -269,7 +271,7 @@ func (wl *WALAOF) Sync() error { return nil } -func (wl *WALAOF) keepSyncingBuffer() { +func (wl *WALForge) keepSyncingBuffer() { for { select { case <-wl.bufferSyncTicker.C: @@ -287,7 +289,7 @@ func (wl *WALAOF) keepSyncingBuffer() { } } -func (wl *WALAOF) rotateSegmentPeriodically() { +func (wl *WALForge) rotateSegmentPeriodically() { for { select { case <-wl.segmentRotationTicker.C: @@ -304,7 +306,7 @@ func (wl *WALAOF) rotateSegmentPeriodically() { } } -func (wl *WALAOF) deleteSegmentPeriodically() { +func (wl *WALForge) deleteSegmentPeriodically() { for { select { case <-wl.segmentRetentionTicker.C: @@ -320,7 +322,7 @@ func (wl *WALAOF) deleteSegmentPeriodically() { } } -func (wl *WALAOF) segmentFiles() ([]string, error) { +func (wl *WALForge) segmentFiles() ([]string, error) { // Get all segment files matching the pattern files, err := filepath.Glob(filepath.Join(wl.logDir, segmentPrefix+"*"+segmentSuffix)) if err != nil { @@ -340,7 +342,7 @@ func (wl *WALAOF) segmentFiles() ([]string, error) { return files, nil } -func (wl *WALAOF) Replay(callback func(*w.Element) error) error { +func (wl *WALForge) Replay(callback func(*w.Element) error) error { var crc uint32 var entrySize uint32 var el w.Element @@ -404,6 +406,6 @@ func (wl *WALAOF) Replay(callback func(*w.Element) error) error { return nil } -func (wl *WALAOF) Iterate(e *w.Element, c func(*w.Element) error) error { +func (wl *WALForge) Iterate(e *w.Element, c func(*w.Element) error) error { return c(e) } diff --git a/server/main.go b/server/main.go index 7e827b7ed..a19755d61 100644 --- a/server/main.go +++ b/server/main.go @@ -16,7 +16,6 @@ import ( "runtime/trace" "sync" "syscall" - "time" "github.com/dicedb/dice/internal/auth" "github.com/dicedb/dice/internal/cmd" @@ -82,31 +81,11 @@ func Start() { signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) var ( - serverErrCh = make(chan error, 2) - walInitSuccessful = false + serverErrCh = make(chan error, 2) ) if config.Config.EnableWAL { - _wl, err := wal.NewAOFWAL(config.Config.WALDir) - if err != nil { - slog.Warn("could not create WAL at", slog.String("wal-dir", config.Config.WALDir), slog.Any("error", err)) - sigs <- syscall.SIGKILL - cancel() - return - } - wal.DefaultWAL = _wl - - if err := wal.DefaultWAL.Init(time.Now()); err != nil { - slog.Warn("could not initialize WAL", slog.Any("error", err)) - slog.Warn("disabling WAL and continuing") - // TODO: Make sure that the WAL is disabled - // We should not incurring any additional cost of making LogCommand - // invocations. - } else { - go wal.RunAsyncJobs() - slog.Debug("WAL initialization complete") - walInitSuccessful = true - } + wal.SetupWAL() } // Get the number of available CPU cores on the machine using runtime.NumCPU(). @@ -150,11 +129,8 @@ func Start() { ioThreadManager := ironhawk.NewIOThreadManager() ironhawkServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager) - serverWg.Add(1) - go runServer(ctx, &serverWg, ironhawkServer, serverErrCh) - - // Recovery from WAL logs - if config.Config.EnableWAL && walInitSuccessful { + // Restore the database from WAL logs + if config.Config.EnableWAL { slog.Info("restoring database from WAL") callback := func(el *w.Element) error { var cd wire.Command @@ -177,6 +153,10 @@ func Start() { slog.Info("database restored from WAL") } + slog.Info("ready to accept connections") + serverWg.Add(1) + go runServer(ctx, &serverWg, ironhawkServer, serverErrCh) + wg.Add(1) go func() { defer wg.Done() diff --git a/tests/commands/ironhawk/setup.go b/tests/commands/ironhawk/setup.go index 156f9753b..e1ee0a29b 100644 --- a/tests/commands/ironhawk/setup.go +++ b/tests/commands/ironhawk/setup.go @@ -112,7 +112,7 @@ func RunTestServer(wg *sync.WaitGroup) { shardManager := shardmanager.NewShardManager(1, gec) ioThreadManager := ironhawk.NewIOThreadManager() watchManager := &ironhawk.WatchManager{} - wal.DefaultWAL, _ = wal.NewAOFWAL(config.Config.WALDir) + wal.SetupWAL() testServer := ironhawk.NewServer(shardManager, ioThreadManager, watchManager) From 60ab7dde1479a4dad6e6a1151ecd0d9844c4048d Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 13:55:41 +0000 Subject: [PATCH 3/8] Making the most essential public in `wal` module --- config/config.go | 1 - internal/server/ironhawk/iothread.go | 2 - internal/wal/wal.go | 6 +-- internal/wal/wal_forge.go | 58 ++++++++++------------------ server/main.go | 3 +- 5 files changed, 24 insertions(+), 46 deletions(-) diff --git a/config/config.go b/config/config.go index d8065626c..f45f8ddb8 100644 --- a/config/config.go +++ b/config/config.go @@ -64,7 +64,6 @@ type DiceDBConfig struct { EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"` WALVariant string `mapstructure:"wal-variant" default:"forge" description:"wal variant to use, values: forge"` WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"` - WALMode string `mapstructure:"wal-mode" default:"buffered" description:"wal mode to use, values: buffered, unbuffered"` WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"` WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"` WALRotationMode string `mapstructure:"wal-rotation-mode" default:"segment-size" description:"wal rotation mode to use, values: segment-size, time"` diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index 181e65b77..f130821e2 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -5,7 +5,6 @@ package ironhawk import ( "context" - "fmt" "log/slog" "strings" @@ -96,7 +95,6 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa } // Log command to WAL if enabled and not a replay - fmt.Println("wal.DefaultWAL", wal.DefaultWAL) if wal.DefaultWAL != nil && !_c.IsReplay { if err := wal.DefaultWAL.LogCommand(_c.C); err != nil { slog.Error("failed to log command to WAL", slog.Any("error", err)) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index c177550ba..9ffab5d34 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -58,11 +58,11 @@ func periodicRotate() { } } -func StartAsyncJobs() { +func startAsyncJobs() { go periodicRotate() } -func ShutdownBG() { +func TeardownWAL() { close(stopCh) ticker.Stop() } @@ -70,7 +70,7 @@ func ShutdownBG() { func SetupWAL() { switch config.Config.WALVariant { case "forge": - DefaultWAL = NewWALForge() + DefaultWAL = newWalForge() default: return } diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index 9469d849f..df7305711 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -28,11 +28,8 @@ import ( ) const ( - segmentPrefix = "seg-" - segmentSuffix = ".wal" - RotationModeTime = "time" - RetentionModeTime = "time" - WALModeUnbuffered = "unbuffered" + segmentPrefix = "seg-" + segmentSuffix = ".wal" ) var bb []byte @@ -43,16 +40,9 @@ func init() { bb = make([]byte, 10*1024) } -type WALForgeEntry struct { - Len uint32 - Crc32 uint32 - Payload []byte -} - -type WALForge struct { +type walForge struct { logDir string currentSegmentFile *os.File - walMode string writeMode string maxSegmentSize uint32 maxSegmentCount int @@ -73,11 +63,10 @@ type WALForge struct { cancel context.CancelFunc } -func NewWALForge() *WALForge { +func newWalForge() *walForge { ctx, cancel := context.WithCancel(context.Background()) - return &WALForge{ + return &walForge{ logDir: config.Config.WALDir, - walMode: config.Config.WALMode, bufferSyncTicker: time.NewTicker(time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond), segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second), segmentRetentionTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRetentionDurationSec) * time.Second), @@ -93,7 +82,7 @@ func NewWALForge() *WALForge { } } -func (wl *WALForge) Init(t time.Time) error { +func (wl *walForge) Init(t time.Time) error { // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist @@ -123,10 +112,10 @@ func (wl *WALForge) Init(t time.Time) error { wl.bufWriter = bufio.NewWriterSize(wl.currentSegmentFile, wl.bufferSize) go wl.keepSyncingBuffer() - go StartAsyncJobs() + go startAsyncJobs() switch wl.rotationMode { - case RotationModeTime: + case "time": go wl.rotateSegmentPeriodically() go wl.deleteSegmentPeriodically() default: @@ -137,7 +126,7 @@ func (wl *WALForge) Init(t time.Time) error { // Log writes a command to the WAL with a monotonically increasing sequence number. // The sequence number is assigned atomically and the command is written to the wl. -func (wl *WALForge) LogCommand(c *wire.Command) error { +func (wl *walForge) LogCommand(c *wire.Command) error { // Lock once for the entire sequence number operation wl.mu.Lock() defer wl.mu.Unlock() @@ -187,18 +176,11 @@ func (wl *WALForge) LogCommand(c *wire.Command) error { wl.currentSegmentSize += entrySize - // if wal-mode unbuffered immediately sync to disk - if wl.walMode == WALModeUnbuffered { - if err := wl.Sync(); err != nil { - return err - } - } - return nil } // rotateLogIfNeeded is not thread safe -func (wl *WALForge) rotateLogIfNeeded(entrySize uint32) error { +func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error { if wl.currentSegmentSize+entrySize > wl.maxSegmentSize { if err := wl.rotateLog(); err != nil { return err @@ -208,7 +190,7 @@ func (wl *WALForge) rotateLogIfNeeded(entrySize uint32) error { } // rotateLog is not thread safe -func (wl *WALForge) rotateLog() error { +func (wl *walForge) rotateLog() error { if err := wl.Sync(); err != nil { return err } @@ -237,7 +219,7 @@ func (wl *WALForge) rotateLog() error { return nil } -func (wl *WALForge) deleteOldestSegment() error { +func (wl *walForge) deleteOldestSegment() error { oldestSegmentFilePath := filepath.Join(wl.logDir, segmentPrefix+fmt.Sprintf("%d", wl.oldestSegmentIndex)+segmentSuffix) // TODO: checkpoint before deleting the file @@ -249,7 +231,7 @@ func (wl *WALForge) deleteOldestSegment() error { } // Close the WAL file. It also calls Sync() on the wl. -func (wl *WALForge) Close() error { +func (wl *walForge) Close() error { wl.cancel() if err := wl.Sync(); err != nil { return err @@ -259,7 +241,7 @@ func (wl *WALForge) Close() error { // Writes out any data in the WAL's in-memory buffer to the segment file. If // fsync is enabled, it also calls fsync on the segment file. -func (wl *WALForge) Sync() error { +func (wl *walForge) Sync() error { if err := wl.bufWriter.Flush(); err != nil { return err } @@ -271,7 +253,7 @@ func (wl *WALForge) Sync() error { return nil } -func (wl *WALForge) keepSyncingBuffer() { +func (wl *walForge) keepSyncingBuffer() { for { select { case <-wl.bufferSyncTicker.C: @@ -289,7 +271,7 @@ func (wl *WALForge) keepSyncingBuffer() { } } -func (wl *WALForge) rotateSegmentPeriodically() { +func (wl *walForge) rotateSegmentPeriodically() { for { select { case <-wl.segmentRotationTicker.C: @@ -306,7 +288,7 @@ func (wl *WALForge) rotateSegmentPeriodically() { } } -func (wl *WALForge) deleteSegmentPeriodically() { +func (wl *walForge) deleteSegmentPeriodically() { for { select { case <-wl.segmentRetentionTicker.C: @@ -322,7 +304,7 @@ func (wl *WALForge) deleteSegmentPeriodically() { } } -func (wl *WALForge) segmentFiles() ([]string, error) { +func (wl *walForge) segmentFiles() ([]string, error) { // Get all segment files matching the pattern files, err := filepath.Glob(filepath.Join(wl.logDir, segmentPrefix+"*"+segmentSuffix)) if err != nil { @@ -342,7 +324,7 @@ func (wl *WALForge) segmentFiles() ([]string, error) { return files, nil } -func (wl *WALForge) Replay(callback func(*w.Element) error) error { +func (wl *walForge) Replay(callback func(*w.Element) error) error { var crc uint32 var entrySize uint32 var el w.Element @@ -406,6 +388,6 @@ func (wl *WALForge) Replay(callback func(*w.Element) error) error { return nil } -func (wl *WALForge) Iterate(e *w.Element, c func(*w.Element) error) error { +func (wl *walForge) Iterate(e *w.Element, c func(*w.Element) error) error { return c(e) } diff --git a/server/main.go b/server/main.go index a19755d61..beb582bf4 100644 --- a/server/main.go +++ b/server/main.go @@ -180,11 +180,10 @@ func Start() { close(sigs) if config.Config.EnableWAL { - wal.ShutdownBG() + wal.TeardownWAL() } cancel() - wg.Wait() } From 7fee4c6c32eed450e0724d88cdccb2419ac3ce3c Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 18:09:01 +0000 Subject: [PATCH 4/8] WAL Rotation Time as a configuration --- config/config.go | 1 + internal/wal/wal.go | 29 +++++++++++++++++++---------- internal/wal/wal_forge.go | 2 +- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index f45f8ddb8..19e6d6c62 100644 --- a/config/config.go +++ b/config/config.go @@ -67,6 +67,7 @@ type DiceDBConfig struct { WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"` WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"` WALRotationMode string `mapstructure:"wal-rotation-mode" default:"segment-size" description:"wal rotation mode to use, values: segment-size, time"` + WALRotationTimeSec int `mapstructure:"wal-rotation-time-sec" default:"10" description:"the time interval (in seconds) after which wal a segment is rotated"` WALMaxSegmentSizeMB int `mapstructure:"wal-max-segment-size-mb" default:"16" description:"the maximum size of a wal segment file in megabytes before rotation"` WALMaxSegmentRotationTimeSec int `mapstructure:"wal-max-segment-rotation-time-sec" default:"60" description:"the time interval (in seconds) after which wal a segment is rotated"` WALBufferSyncIntervalMillis int `mapstructure:"wal-buffer-sync-interval-ms" default:"200" description:"the interval (in milliseconds) at which the wal write buffer is synced to disk"` diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 9ffab5d34..2e5914d92 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -14,23 +14,27 @@ import ( ) type WAL interface { - Init(t time.Time) error - LogCommand(c *wire.Command) error + // Init initializes the WAL. + Init() error + // Close closes the WAL. Close() error + // LogCommand logs a command to the WAL. + LogCommand(c *wire.Command) error + // Replay replays the WAL. Replay(c func(*w.Element) error) error + // Iterate iterates over the WAL. Iterate(e *w.Element, c func(*w.Element) error) error } var ( - ticker *time.Ticker - stopCh chan struct{} - mu sync.Mutex + rotTicker *time.Ticker + stopCh chan struct{} + mu sync.Mutex ) var DefaultWAL WAL func init() { - ticker = time.NewTicker(10 * time.Second) stopCh = make(chan struct{}) } @@ -42,7 +46,7 @@ func rotateWAL() { slog.Warn("error closing the WAL", slog.Any("error", err)) } - if err := DefaultWAL.Init(time.Now()); err != nil { + if err := DefaultWAL.Init(); err != nil { slog.Warn("error creating a new WAL", slog.Any("error", err)) } } @@ -50,7 +54,7 @@ func rotateWAL() { func periodicRotate() { for { select { - case <-ticker.C: + case <-rotTicker.C: rotateWAL() case <-stopCh: return @@ -62,11 +66,15 @@ func startAsyncJobs() { go periodicRotate() } +// TeardownWAL stops the WAL and closes the WAL instance. func TeardownWAL() { close(stopCh) - ticker.Stop() + rotTicker.Stop() } +// SetupWAL initializes the WAL based on the configuration. +// It creates a new WAL instance based on the WAL variant and initializes it. +// If the initialization fails, it panics. func SetupWAL() { switch config.Config.WALVariant { case "forge": @@ -75,7 +83,8 @@ func SetupWAL() { return } - if err := DefaultWAL.Init(time.Now()); err != nil { + rotTicker = time.NewTicker(time.Duration(config.Config.WALRotationTimeSec) * time.Second) + if err := DefaultWAL.Init(); err != nil { slog.Error("could not initialize WAL", slog.Any("error", err)) panic(err) } diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index df7305711..040c31ebd 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -82,7 +82,7 @@ func newWalForge() *walForge { } } -func (wl *walForge) Init(t time.Time) error { +func (wl *walForge) Init() error { // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist From 5a9867e593224f1e04f68a4192005d1d74fbeff3 Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 18:14:23 +0000 Subject: [PATCH 5/8] Removing dependency of wal protos from wal interface --- internal/wal/wal.go | 5 +---- internal/wal/wal_forge.go | 14 ++++++++------ server/main.go | 12 +++--------- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 2e5914d92..837646dfa 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -9,7 +9,6 @@ import ( "time" "github.com/dicedb/dice/config" - w "github.com/dicedb/dicedb-go/wal" "github.com/dicedb/dicedb-go/wire" ) @@ -21,9 +20,7 @@ type WAL interface { // LogCommand logs a command to the WAL. LogCommand(c *wire.Command) error // Replay replays the WAL. - Replay(c func(*w.Element) error) error - // Iterate iterates over the WAL. - Iterate(e *w.Element, c func(*w.Element) error) error + ReplayCommand(cb func(c *wire.Command) error) error } var ( diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index 040c31ebd..f26094bbf 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -324,7 +324,7 @@ func (wl *walForge) segmentFiles() ([]string, error) { return files, nil } -func (wl *walForge) Replay(callback func(*w.Element) error) error { +func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { var crc uint32 var entrySize uint32 var el w.Element @@ -376,8 +376,14 @@ func (wl *walForge) Replay(callback func(*w.Element) error) error { return fmt.Errorf("error unmarshaling WAL entry: %w", err) } + var c wire.Command + if err := proto.Unmarshal(el.Payload, &c); err != nil { + file.Close() + return fmt.Errorf("error unmarshaling command: %w", err) + } + // Call provided replay function with parsed command - if err := callback(&el); err != nil { + if err := cb(&c); err != nil { file.Close() return fmt.Errorf("error replaying command: %w", err) } @@ -387,7 +393,3 @@ func (wl *walForge) Replay(callback func(*w.Element) error) error { return nil } - -func (wl *walForge) Iterate(e *w.Element, c func(*w.Element) error) error { - return c(e) -} diff --git a/server/main.go b/server/main.go index beb582bf4..35b798aae 100644 --- a/server/main.go +++ b/server/main.go @@ -21,9 +21,7 @@ import ( "github.com/dicedb/dice/internal/cmd" "github.com/dicedb/dice/internal/server/ironhawk" "github.com/dicedb/dice/internal/shardmanager" - w "github.com/dicedb/dicedb-go/wal" "github.com/dicedb/dicedb-go/wire" - "google.golang.org/protobuf/proto" "github.com/dicedb/dice/internal/wal" @@ -132,13 +130,9 @@ func Start() { // Restore the database from WAL logs if config.Config.EnableWAL { slog.Info("restoring database from WAL") - callback := func(el *w.Element) error { - var cd wire.Command - if err := proto.Unmarshal(el.Payload, &cd); err != nil { - return fmt.Errorf("failed to unmarshal command: %w", err) - } + callback := func(cd *wire.Command) error { cmdTemp := cmd.Cmd{ - C: &cd, + C: cd, IsReplay: true, } _, err := cmdTemp.Execute(shardManager) @@ -147,7 +141,7 @@ func Start() { } return nil } - if err := wal.DefaultWAL.Replay(callback); err != nil { + if err := wal.DefaultWAL.ReplayCommand(callback); err != nil { slog.Error("error restoring from WAL", slog.Any("error", err)) } slog.Info("database restored from WAL") From ad13305091e0c362bd93d26c58b571efc3ddf445 Mon Sep 17 00:00:00 2001 From: Arpit Date: Sun, 25 May 2025 18:22:05 +0000 Subject: [PATCH 6/8] Make WAL interface bare minimum with func moving to forge --- internal/wal/wal.go | 47 ++++++--------------------------------- internal/wal/wal_forge.go | 40 ++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 837646dfa..6b26ab702 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -5,8 +5,6 @@ package wal import ( "log/slog" - "sync" - "time" "github.com/dicedb/dice/config" "github.com/dicedb/dicedb-go/wire" @@ -14,59 +12,29 @@ import ( type WAL interface { // Init initializes the WAL. + // The WAL implementation should start all the background jobs and initialize the WAL. Init() error - // Close closes the WAL. - Close() error + // Stop stops the WAL. + // The WAL implementation should stop all the background jobs and close the WAL. + Stop() // LogCommand logs a command to the WAL. LogCommand(c *wire.Command) error - // Replay replays the WAL. + // Replay replays the command from the WAL. ReplayCommand(cb func(c *wire.Command) error) error } +var DefaultWAL WAL var ( - rotTicker *time.Ticker - stopCh chan struct{} - mu sync.Mutex + stopCh chan struct{} ) -var DefaultWAL WAL - func init() { stopCh = make(chan struct{}) } -func rotateWAL() { - mu.Lock() - defer mu.Unlock() - - if err := DefaultWAL.Close(); err != nil { - slog.Warn("error closing the WAL", slog.Any("error", err)) - } - - if err := DefaultWAL.Init(); err != nil { - slog.Warn("error creating a new WAL", slog.Any("error", err)) - } -} - -func periodicRotate() { - for { - select { - case <-rotTicker.C: - rotateWAL() - case <-stopCh: - return - } - } -} - -func startAsyncJobs() { - go periodicRotate() -} - // TeardownWAL stops the WAL and closes the WAL instance. func TeardownWAL() { close(stopCh) - rotTicker.Stop() } // SetupWAL initializes the WAL based on the configuration. @@ -80,7 +48,6 @@ func SetupWAL() { return } - rotTicker = time.NewTicker(time.Duration(config.Config.WALRotationTimeSec) * time.Second) if err := DefaultWAL.Init(); err != nil { slog.Error("could not initialize WAL", slog.Any("error", err)) panic(err) diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index f26094bbf..ec4ec2d5a 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -40,6 +40,10 @@ func init() { bb = make([]byte, 10*1024) } +var ( + rotTicker *time.Ticker +) + type walForge struct { logDir string currentSegmentFile *os.File @@ -83,6 +87,8 @@ func newWalForge() *walForge { } func (wl *walForge) Init() error { + rotTicker = time.NewTicker(time.Duration(config.Config.WALRotationTimeSec) * time.Second) + // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist @@ -112,7 +118,7 @@ func (wl *walForge) Init() error { wl.bufWriter = bufio.NewWriterSize(wl.currentSegmentFile, wl.bufferSize) go wl.keepSyncingBuffer() - go startAsyncJobs() + go wl.startAsyncJobs() switch wl.rotationMode { case "time": @@ -393,3 +399,35 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { return nil } + +func (wl *walForge) Stop() { + rotTicker.Stop() +} + +func (wl *walForge) rotateWAL() { + wl.mu.Lock() + defer wl.mu.Unlock() + + if err := wl.Close(); err != nil { + slog.Warn("error closing the WAL", slog.Any("error", err)) + } + + if err := wl.Init(); err != nil { + slog.Warn("error creating a new WAL", slog.Any("error", err)) + } +} + +func (wl *walForge) periodicRotate() { + for { + select { + case <-rotTicker.C: + wl.rotateWAL() + case <-stopCh: + return + } + } +} + +func (wl *walForge) startAsyncJobs() { + go wl.periodicRotate() +} From 6b79226785668476b337706f1470c8a5a662083f Mon Sep 17 00:00:00 2001 From: Arpit Date: Mon, 26 May 2025 04:36:21 +0000 Subject: [PATCH 7/8] Refactored and identified gaps in WAL implementations --- internal/wal/wal_forge.go | 342 ++++++++++++++++++-------------------- 1 file changed, 164 insertions(+), 178 deletions(-) diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index ec4ec2d5a..c1b8a656d 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -10,7 +10,6 @@ import ( "fmt" "hash/crc32" "io" - "log" "log/slog" "os" "path/filepath" @@ -29,132 +28,123 @@ import ( const ( segmentPrefix = "seg-" - segmentSuffix = ".wal" ) var bb []byte func init() { - // TODO: Pre-allocate a buffer to avoid re-allocating it + // Pre-allocate a buffer to avoid re-allocating it // This will hold one WAL Forge Entry Before it is written to the buffer bb = make([]byte, 10*1024) } -var ( - rotTicker *time.Ticker -) - type walForge struct { - logDir string - currentSegmentFile *os.File - writeMode string - maxSegmentSize uint32 - maxSegmentCount int - currentSegmentIndex int - currentSegmentSize uint32 - oldestSegmentIndex int - bufferSize int - retentionMode string - recoveryMode string - rotationMode string - lastSequenceNo uint64 - bufWriter *bufio.Writer - bufferSyncTicker *time.Ticker - segmentRotationTicker *time.Ticker - segmentRetentionTicker *time.Ticker - mu sync.Mutex - ctx context.Context - cancel context.CancelFunc + // Current Segment File and its writer + csf *os.File + csWriter *bufio.Writer + csIdx int + csSize uint32 + + // TODO: Come up with a way to generate a LSN that is + // monotonically increasing and even after restart it + // resumes from the last LSN and not start from 0. + lsn uint64 + + maxSegmentSizeBytes uint32 + + bufferSyncTicker *time.Ticker + segmentRotationTicker *time.Ticker + + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc } func newWalForge() *walForge { ctx, cancel := context.WithCancel(context.Background()) return &walForge{ - logDir: config.Config.WALDir, - bufferSyncTicker: time.NewTicker(time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond), - segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second), - segmentRetentionTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRetentionDurationSec) * time.Second), - writeMode: config.Config.WALWriteMode, - maxSegmentSize: uint32(config.Config.WALMaxSegmentSizeMB) * 1024 * 1024, - maxSegmentCount: config.Config.WALMaxSegmentCount, - bufferSize: config.Config.WALBufferSizeMB * 1024 * 1024, - retentionMode: config.Config.WALRetentionMode, - recoveryMode: config.Config.WALRecoveryMode, - rotationMode: config.Config.WALRotationMode, - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + + bufferSyncTicker: time.NewTicker(time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond), + segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second), + + maxSegmentSizeBytes: uint32(config.Config.WALMaxSegmentSizeMB) * 1024 * 1024, } } func (wl *walForge) Init() error { - rotTicker = time.NewTicker(time.Duration(config.Config.WALRotationTimeSec) * time.Second) + // TODO: Once the checkpoint is implemented + // Load the initial state of the database from this checkpoint + // and then reply the WAL files that happened after this checkpoint. - // TODO - Restore existing checkpoints to memory - - // Create the directory if it doesn't exist - if err := os.MkdirAll(wl.logDir, 0755); err != nil { + // Make sure the WAL directory exists + if err := os.MkdirAll(config.Config.WALDir, 0755); err != nil { return err } - // Get the list of log segment files in the directory - files, err := filepath.Glob(filepath.Join(wl.logDir, segmentPrefix+"*"+segmentSuffix)) + // Get the list of log segment files in the WAL directory + sfs, err := wl.segments() if err != nil { return err } + slog.Debug("Loading WAL segments", slog.Any("total_segments", len(sfs))) - if len(files) > 0 { - slog.Debug("Found existing log segments", slog.Any("total_files", len(files))) - // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them - } - + // TODO: Do not assume that the first segment is always 0 + // Pick the one with the least value of the segment index + // Maintain a metadatafile that holds the latest segment index used + // and during rotation, it increments the segment index and uses it sf, err := os.OpenFile( - filepath.Join(wl.logDir, segmentPrefix+"0"+segmentSuffix), + filepath.Join(config.Config.WALDir, segmentPrefix+"0"+".wal"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - wl.currentSegmentFile = sf - wl.bufWriter = bufio.NewWriterSize(wl.currentSegmentFile, wl.bufferSize) + wl.csf = sf + wl.csWriter = bufio.NewWriterSize(wl.csf, config.Config.WALBufferSizeMB*1024*1024) - go wl.keepSyncingBuffer() - go wl.startAsyncJobs() + go wl.periodicSyncBuffer() - switch wl.rotationMode { + switch config.Config.WALRotationMode { case "time": - go wl.rotateSegmentPeriodically() - go wl.deleteSegmentPeriodically() + go wl.periodicRotateSegment() default: return nil } return nil } -// Log writes a command to the WAL with a monotonically increasing sequence number. -// The sequence number is assigned atomically and the command is written to the wl. +// LogCommand writes a command to the WAL with a monotonically increasing LSN. func (wl *walForge) LogCommand(c *wire.Command) error { - // Lock once for the entire sequence number operation + // Lock once for the entire LSN operation wl.mu.Lock() defer wl.mu.Unlock() + // marshal the command to bytes b, err := proto.Marshal(c) if err != nil { return err } - wl.lastSequenceNo += 1 + // TODO: This logic changes as we change the LSN format + wl.lsn += 1 el := &w.Element{ - Lsn: wl.lastSequenceNo, + Lsn: wl.lsn, Timestamp: time.Now().UnixNano(), ElementType: w.ElementType_ELEMENT_TYPE_COMMAND, Payload: b, } + // marshal the WAL Element to bytes b, err = proto.Marshal(el) if err != nil { return err } + // Wrap the element with Checksum and Size + // and keep it ready to be written to the segment file through the buffer + // We call this WAL Entry. entrySize := uint32(4 + 4 + len(b)) if err := wl.rotateLogIfNeeded(entrySize); err != nil { return err @@ -163,14 +153,12 @@ func (wl *walForge) LogCommand(c *wire.Command) error { // If the entry size is greater than the buffer size, we need to // create a new buffer. if entrySize > uint32(cap(bb)) { - // TODO: In this case, we can do a one time creation - // of a new buffer and proceed rather than using the - // existing buffer. + // TODO: In this case, we can do a one time creation of a new buffer + // and proceed rather than using the existing buffer. panic(fmt.Errorf("buffer too small, %d > %d", entrySize, len(bb))) } bb = bb[:8+len(b)] - // Calculate CRC32 only on the payload chk := crc32.ChecksumIEEE(b) // Write header and payload @@ -178,16 +166,23 @@ func (wl *walForge) LogCommand(c *wire.Command) error { binary.LittleEndian.PutUint32(bb[4:8], uint32(len(b))) copy(bb[8:], b) - _, _ = wl.bufWriter.Write(bb) - - wl.currentSegmentSize += entrySize + // TODO: Check if we need to handle the error here, + // from my initial understanding, we should not be + // handling the error here because it would never happen. + // Have not tested this yet. + _, _ = wl.csWriter.Write(bb) + wl.csSize += entrySize return nil } -// rotateLogIfNeeded is not thread safe +// rotateLogIfNeeded checks if the current segment size + the entry size is +// greater than the max segment size, and if so, it rotates the log. +// This method is not thread safe and hence should be called with the lock held. func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error { - if wl.currentSegmentSize+entrySize > wl.maxSegmentSize { + // If the current segment size + the entry size is greater than the max segment size, + // we need to rotate the log. + if wl.csSize+entrySize > wl.maxSegmentSizeBytes { if err := wl.rotateLog(); err != nil { return err } @@ -195,150 +190,134 @@ func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error { return nil } -// rotateLog is not thread safe +// rotateLog rotates the log by closing the current segment file, +// incrementing the current segment index, and opening a new segment file. +// This method is thread safe. func (wl *walForge) rotateLog() error { - if err := wl.Sync(); err != nil { + wl.mu.Lock() + defer wl.mu.Unlock() + + // TODO: Ideally this function should not return any error + // Check for the conditions where it can return an error + // and handle them gracefully. + // I fear that we will need to do some cleanup operations in case of errors. + + // Sync the current segment file to disk + if err := wl.sync(); err != nil { return err } - if err := wl.currentSegmentFile.Close(); err != nil { + // Close the current segment file + if err := wl.csf.Close(); err != nil { return err } - wl.currentSegmentIndex++ - if wl.currentSegmentIndex-wl.oldestSegmentIndex+1 > wl.maxSegmentCount { - if err := wl.deleteOldestSegment(); err != nil { - return err - } - wl.oldestSegmentIndex++ - } + // Increment the current segment index + wl.csIdx++ - sf, err := os.OpenFile(filepath.Join(wl.logDir, segmentPrefix+fmt.Sprintf("%d", wl.currentSegmentIndex)+segmentSuffix), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + // Open a new segment file + sf, err := os.OpenFile( + filepath.Join(config.Config.WALDir, fmt.Sprintf("%s%d.wal", segmentPrefix, wl.csIdx)), + os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - log.Fatalf("failed opening file: %s", err) + // TODO: We are panicking here because we are not handling the error + // and we want to make sure that the WAL is not corrupted. + // We need to handle this error gracefully. + panic(fmt.Errorf("failed opening file: %w", err)) } - wl.currentSegmentSize = 0 - wl.currentSegmentFile = sf - wl.bufWriter = bufio.NewWriter(sf) + // Reset the trackers + wl.csf = sf + wl.csSize = 0 + wl.csWriter = bufio.NewWriter(sf) return nil } -func (wl *walForge) deleteOldestSegment() error { - oldestSegmentFilePath := filepath.Join(wl.logDir, segmentPrefix+fmt.Sprintf("%d", wl.oldestSegmentIndex)+segmentSuffix) +// Writes out any data in the WAL's in-memory buffer to the segment file. +// and syncs the segment file to disk. +// This method is thread safe. +func (wl *walForge) sync() error { + wl.mu.Lock() + defer wl.mu.Unlock() - // TODO: checkpoint before deleting the file - if err := os.Remove(oldestSegmentFilePath); err != nil { + // Flush the buffer to the segment file + if err := wl.csWriter.Flush(); err != nil { return err } - wl.oldestSegmentIndex++ - return nil -} -// Close the WAL file. It also calls Sync() on the wl. -func (wl *walForge) Close() error { - wl.cancel() - if err := wl.Sync(); err != nil { + // Sync the segment file to disk to make sure + // it is written to disk. + if err := wl.csf.Sync(); err != nil { return err } - return wl.currentSegmentFile.Close() -} -// Writes out any data in the WAL's in-memory buffer to the segment file. If -// fsync is enabled, it also calls fsync on the segment file. -func (wl *walForge) Sync() error { - if err := wl.bufWriter.Flush(); err != nil { - return err - } - if wl.writeMode == "fsync" { - if err := wl.currentSegmentFile.Sync(); err != nil { - return err - } - } + // TODO: Evaluate if DIRECT_IO is needed here. + // If we are using a file system that supports direct IO, + // we can use it to improve the performance. + // If we are using a file system that does not support direct IO, + // we can use the buffer to improve the performance. return nil } -func (wl *walForge) keepSyncingBuffer() { +func (wl *walForge) periodicSyncBuffer() { for { select { case <-wl.bufferSyncTicker.C: - wl.mu.Lock() - err := wl.Sync() - wl.mu.Unlock() - + err := wl.sync() if err != nil { slog.Error("failed to sync buffer", slog.String("error", err.Error())) } - case <-wl.ctx.Done(): return } } } -func (wl *walForge) rotateSegmentPeriodically() { +func (wl *walForge) periodicRotateSegment() { for { select { case <-wl.segmentRotationTicker.C: - wl.mu.Lock() - err := wl.rotateLog() - wl.mu.Unlock() - if err != nil { + // TODO: Remove this error handling once we clean up the error handling in the rotateLog function. + if err := wl.rotateLog(); err != nil { slog.Error("failed to rotate segment", slog.String("error", err.Error())) } - - case <-wl.ctx.Done(): - return - } - } -} - -func (wl *walForge) deleteSegmentPeriodically() { - for { - select { - case <-wl.segmentRetentionTicker.C: - wl.mu.Lock() - err := wl.deleteOldestSegment() - wl.mu.Unlock() - if err != nil { - slog.Error("failed to delete segment", slog.String("error", err.Error())) - } case <-wl.ctx.Done(): return } } } -func (wl *walForge) segmentFiles() ([]string, error) { +func (wl *walForge) segments() ([]string, error) { // Get all segment files matching the pattern - files, err := filepath.Glob(filepath.Join(wl.logDir, segmentPrefix+"*"+segmentSuffix)) + files, err := filepath.Glob(filepath.Join(config.Config.WALDir, segmentPrefix+"*"+".wal")) if err != nil { return nil, err } - // Sort files by numeric suffix sort.Slice(files, func(i, j int) bool { - parseSuffix := func(name string) int64 { - num, _ := strconv.ParseInt( - strings.TrimPrefix(strings.TrimSuffix(filepath.Base(name), segmentSuffix), segmentPrefix), 10, 64) - return num - } - return parseSuffix(files[i]) < parseSuffix(files[j]) + s1, _ := strconv.Atoi(strings.Split(strings.TrimPrefix(files[i], segmentPrefix), ".")[0]) + s2, _ := strconv.Atoi(strings.Split(strings.TrimPrefix(files[i], segmentPrefix), ".")[0]) + return s1 < s2 }) + // TODO: Check that the segment files are returned in the correct order + // The order has to be in ascending order of the segment index. return files, nil } +// ReplayCommand replays the commands from the WAL files. +// This method is thread safe. func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { - var crc uint32 - var entrySize uint32 + var crc, entrySize uint32 var el w.Element + + // Buffers to hold the header and the element bytes bb1h := make([]byte, 8) bb1ElementBytes := make([]byte, 10*1024) - // Get list of segment files sorted by timestamp - segments, err := wl.segmentFiles() + // Get list of segment files ordered by timestamp in ascending order + segments, err := wl.segments() if err != nil { return fmt.Errorf("error getting wal-segment files: %w", err) } @@ -352,14 +331,20 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { reader := bufio.NewReader(file) // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data + + // TODO: Replace this infinite loop with a more elegant solution for { // Read CRC32 (4 bytes) + entrySize (4 bytes) if _, err := io.ReadFull(reader, bb1h); err != nil { + // TODO: this terminating connection should be handled in a better way + // and the loop should not be infinite. + // Edge case: this EOF error can happen even in the next step when + // we are reading the WAL element from the file. if err == io.EOF { break } file.Close() - return fmt.Errorf("error reading CRC32: %w", err) + return fmt.Errorf("error reading WAL: %w", err) } crc = binary.LittleEndian.Uint32(bb1h[0:4]) entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) @@ -372,7 +357,18 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { // Calculate CRC32 only on the payload expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) if crc != expectedCRC { + // TODO: We are reprtitively closing the file here + // A better solution would be to move this logic to a function + // and use defer to close the file. + // The function. thus, in a way processes (replays) one segment at a time. file.Close() + + // TODO: THis is where we should trigger the WAL recovery + // Recovery process is all about truncating the segment file + // till this point and ignoring the rest. + // Log appropriate messages when this happens. + // Evaluate if this recovery mode should be a command line flag + // that would suggest if we should truncate, ignore, or stop the boot process. return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC) } @@ -394,40 +390,30 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { return fmt.Errorf("error replaying command: %w", err) } } - file.Close() } return nil } +// Stop stops the WAL Forge. +// This method is thread safe. func (wl *walForge) Stop() { - rotTicker.Stop() -} - -func (wl *walForge) rotateWAL() { wl.mu.Lock() defer wl.mu.Unlock() - if err := wl.Close(); err != nil { - slog.Warn("error closing the WAL", slog.Any("error", err)) - } + // Stop the tickers + wl.bufferSyncTicker.Stop() + wl.segmentRotationTicker.Stop() - if err := wl.Init(); err != nil { - slog.Warn("error creating a new WAL", slog.Any("error", err)) - } -} + // Cancel the context + wl.cancel() -func (wl *walForge) periodicRotate() { - for { - select { - case <-rotTicker.C: - wl.rotateWAL() - case <-stopCh: - return - } + // Sync the current segment file to disk + if err := wl.sync(); err != nil { + slog.Error("failed to sync current segment file", slog.String("error", err.Error())) } -} -func (wl *walForge) startAsyncJobs() { - go wl.periodicRotate() + wl.csf.Close() + + // TODO: See if we are missing any other cleanup operations. } From ecd35a87791e5bd0881d122ea44995755f953b08 Mon Sep 17 00:00:00 2001 From: Arpit Date: Mon, 26 May 2025 05:43:10 +0000 Subject: [PATCH 8/8] Removing redundant WAL configurations --- config/config.go | 22 ++++++++-------------- internal/wal/wal_forge.go | 4 +++- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/config/config.go b/config/config.go index 19e6d6c62..ea9d51f57 100644 --- a/config/config.go +++ b/config/config.go @@ -61,20 +61,14 @@ type DiceDBConfig struct { Engine string `mapstructure:"engine" default:"ironhawk" description:"the engine to use, values: ironhawk"` - EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"` - WALVariant string `mapstructure:"wal-variant" default:"forge" description:"wal variant to use, values: forge"` - WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"` - WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"` - WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"` - WALRotationMode string `mapstructure:"wal-rotation-mode" default:"segment-size" description:"wal rotation mode to use, values: segment-size, time"` - WALRotationTimeSec int `mapstructure:"wal-rotation-time-sec" default:"10" description:"the time interval (in seconds) after which wal a segment is rotated"` - WALMaxSegmentSizeMB int `mapstructure:"wal-max-segment-size-mb" default:"16" description:"the maximum size of a wal segment file in megabytes before rotation"` - WALMaxSegmentRotationTimeSec int `mapstructure:"wal-max-segment-rotation-time-sec" default:"60" description:"the time interval (in seconds) after which wal a segment is rotated"` - WALBufferSyncIntervalMillis int `mapstructure:"wal-buffer-sync-interval-ms" default:"200" description:"the interval (in milliseconds) at which the wal write buffer is synced to disk"` - WALRetentionMode string `mapstructure:"wal-retention-mode" default:"num-segments" description:"the new horizon for wal segment post cleanup. values: num-segments, time, checkpoint"` - WALMaxSegmentCount int `mapstructure:"wal-max-segment-count" default:"10" description:"the maximum number of segments to retain, if the retention mode is 'num-segments'"` - WALMaxSegmentRetentionDurationSec int `mapstructure:"wal-max-segment-retention-duration-sec" default:"600" description:"the maximum duration (in seconds) for wal segments retention"` - WALRecoveryMode string `mapstructure:"wal-recovery-mode" default:"strict" description:"wal recovery mode in case of a corruption, values: strict, truncate, ignore"` + EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"` + WALVariant string `mapstructure:"wal-variant" default:"forge" description:"wal variant to use, values: forge"` + WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"` + WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"` + WALRotationMode string `mapstructure:"wal-rotation-mode" default:"time" description:"wal rotation mode to use, values: segment-size, time"` + WALMaxSegmentSizeMB int `mapstructure:"wal-max-segment-size-mb" default:"16" description:"the maximum size of a wal segment file in megabytes before rotation"` + WALSegmentRotationTimeSec int `mapstructure:"wal-max-segment-rotation-time-sec" default:"60" description:"the time interval (in seconds) after which wal a segment is rotated"` + WALBufferSyncIntervalMillis int `mapstructure:"wal-buffer-sync-interval-ms" default:"200" description:"the interval (in milliseconds) at which the wal write buffer is synced to disk"` } func Load(flags *pflag.FlagSet) { diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index c1b8a656d..d9a96ceb1 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -67,7 +67,7 @@ func newWalForge() *walForge { cancel: cancel, bufferSyncTicker: time.NewTicker(time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond), - segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second), + segmentRotationTicker: time.NewTicker(time.Duration(config.Config.WALSegmentRotationTimeSec) * time.Second), maxSegmentSizeBytes: uint32(config.Config.WALMaxSegmentSizeMB) * 1024 * 1024, } @@ -194,6 +194,7 @@ func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error { // incrementing the current segment index, and opening a new segment file. // This method is thread safe. func (wl *walForge) rotateLog() error { + fmt.Println("rotating log") wl.mu.Lock() defer wl.mu.Unlock() @@ -275,6 +276,7 @@ func (wl *walForge) periodicSyncBuffer() { } func (wl *walForge) periodicRotateSegment() { + fmt.Println("rotating segment") for { select { case <-wl.segmentRotationTicker.C: