Skip to content
This repository was archived by the owner on Jun 23, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,14 @@ type DiceDBConfig struct {

Engine string `mapstructure:"engine" default:"ironhawk" description:"the engine to use, values: ironhawk"`

EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"`
WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"`
WALMode string `mapstructure:"wal-mode" default:"buffered" description:"wal mode to use, values: buffered, unbuffered"`
WALWriteMode string `mapstructure:"wal-write-mode" default:"default" description:"wal file write mode to use, values: default, fsync"`
WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"`
WALRotationMode string `mapstructure:"wal-rotation-mode" default:"segment-size" description:"wal rotation mode to use, values: segment-size, time"`
WALMaxSegmentSizeMB int `mapstructure:"wal-max-segment-size-mb" default:"16" description:"the maximum size of a wal segment file in megabytes before rotation"`
WALMaxSegmentRotationTimeSec int `mapstructure:"wal-max-segment-rotation-time-sec" default:"60" description:"the time interval (in seconds) after which wal a segment is rotated"`
WALBufferSyncIntervalMillis int `mapstructure:"wal-buffer-sync-interval-ms" default:"200" description:"the interval (in milliseconds) at which the wal write buffer is synced to disk"`
WALRetentionMode string `mapstructure:"wal-retention-mode" default:"num-segments" description:"the new horizon for wal segment post cleanup. values: num-segments, time, checkpoint"`
WALMaxSegmentCount int `mapstructure:"wal-max-segment-count" default:"10" description:"the maximum number of segments to retain, if the retention mode is 'num-segments'"`
WALMaxSegmentRetentionDurationSec int `mapstructure:"wal-max-segment-retention-duration-sec" default:"600" description:"the maximum duration (in seconds) for wal segments retention"`
WALRecoveryMode string `mapstructure:"wal-recovery-mode" default:"strict" description:"wal recovery mode in case of a corruption, values: strict, truncate, ignore"`
EnableWAL bool `mapstructure:"enable-wal" default:"false" description:"enable write-ahead logging"`
WALVariant string `mapstructure:"wal-variant" default:"forge" description:"wal variant to use, values: forge"`
WALDir string `mapstructure:"wal-dir" default:"logs" description:"the directory to store WAL segments"`
WALBufferSizeMB int `mapstructure:"wal-buffer-size-mb" default:"1" description:"the size of the wal write buffer in megabytes"`
WALRotationMode string `mapstructure:"wal-rotation-mode" default:"time" description:"wal rotation mode to use, values: segment-size, time"`
WALMaxSegmentSizeMB int `mapstructure:"wal-max-segment-size-mb" default:"16" description:"the maximum size of a wal segment file in megabytes before rotation"`
WALSegmentRotationTimeSec int `mapstructure:"wal-max-segment-rotation-time-sec" default:"60" description:"the time interval (in seconds) after which wal a segment is rotated"`
WALBufferSyncIntervalMillis int `mapstructure:"wal-buffer-sync-interval-ms" default:"200" description:"the interval (in milliseconds) at which the wal write buffer is synced to disk"`
}

func Load(flags *pflag.FlagSet) {
Expand Down
7 changes: 3 additions & 4 deletions internal/server/ironhawk/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa

select {
case <-ctx.Done():
slog.Debug("io-thread context cancelled, shutting down receive loop")
slog.Debug("io-thread context canceled, shutting down receive loop")
return ctx.Err()
case err := <-errCh:
return err
Expand Down Expand Up @@ -95,9 +95,8 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa
}

// Log command to WAL if enabled and not a replay
if err == nil && wal.GetWAL() != nil && !_c.IsReplay {
// Create WAL entry using protobuf message
if err := wal.GetWAL().LogCommand(_c.C); err != nil {
if wal.DefaultWAL != nil && !_c.IsReplay {
if err := wal.DefaultWAL.LogCommand(_c.C); err != nil {
slog.Error("failed to log command to WAL", slog.Any("error", err))
}
}
Expand Down
3 changes: 1 addition & 2 deletions internal/server/ironhawk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/shardmanager"
"github.com/dicedb/dice/internal/wal"
)

type Server struct {
Expand All @@ -28,7 +27,7 @@ type Server struct {
ioThreadManager *IOThreadManager
}

func NewServer(shardManager *shardmanager.ShardManager, ioThreadManager *IOThreadManager, watchManager *WatchManager, wl wal.WAL) *Server {
func NewServer(shardManager *shardmanager.ShardManager, ioThreadManager *IOThreadManager, watchManager *WatchManager) *Server {
return &Server{
Host: config.Config.Host,
Port: config.Config.Port,
Expand Down
79 changes: 27 additions & 52 deletions internal/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,51 @@ package wal

import (
"log/slog"
"sync"
"time"

w "github.com/dicedb/dicedb-go/wal"
"github.com/dicedb/dice/config"
"github.com/dicedb/dicedb-go/wire"
)

type WAL interface {
Init(t time.Time) error
// Init initializes the WAL.
// The WAL implementation should start all the background jobs and initialize the WAL.
Init() error
// Stop stops the WAL.
// The WAL implementation should stop all the background jobs and close the WAL.
Stop()
// LogCommand logs a command to the WAL.
LogCommand(c *wire.Command) error
Close() error
Replay(c func(*w.Element) error) error
Iterate(e *w.Element, c func(*w.Element) error) error
// Replay replays the command from the WAL.
ReplayCommand(cb func(c *wire.Command) error) error
}

var DefaultWAL WAL
var (
ticker *time.Ticker
stopCh chan struct{}
mu sync.Mutex
wl WAL
)

// GetWAL returns the global WAL instance
func GetWAL() WAL {
mu.Lock()
defer mu.Unlock()
return wl
}

// SetGlobalWAL sets the global WAL instance
func SetWAL(_wl WAL) {
mu.Lock()
defer mu.Unlock()
wl = _wl
}

func init() {
ticker = time.NewTicker(10 * time.Second)
stopCh = make(chan struct{})
}

func rotateWAL(wl WAL) {
mu.Lock()
defer mu.Unlock()

if err := wl.Close(); err != nil {
slog.Warn("error closing the WAL", slog.Any("error", err))
}

if err := wl.Init(time.Now()); err != nil {
slog.Warn("error creating a new WAL", slog.Any("error", err))
}
// TeardownWAL stops the WAL and closes the WAL instance.
func TeardownWAL() {
close(stopCh)
}

func periodicRotate(wl WAL) {
for {
select {
case <-ticker.C:
rotateWAL(wl)
case <-stopCh:
return
}
// SetupWAL initializes the WAL based on the configuration.
// It creates a new WAL instance based on the WAL variant and initializes it.
// If the initialization fails, it panics.
func SetupWAL() {
switch config.Config.WALVariant {
case "forge":
DefaultWAL = newWalForge()
default:
return
}
}

func InitBG(wl WAL) {
go periodicRotate(wl)
}

func ShutdownBG() {
close(stopCh)
ticker.Stop()
if err := DefaultWAL.Init(); err != nil {
slog.Error("could not initialize WAL", slog.Any("error", err))
panic(err)
}
}
Loading