From d326b25206579a8ba5516f990081d57625532a6b Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Fri, 30 May 2025 18:23:56 +0530 Subject: [PATCH] addressing deadlocks in wal --- internal/wal/wal_forge.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index d9a96ceb1..edd5bff4e 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -192,12 +192,8 @@ func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error { // rotateLog rotates the log by closing the current segment file, // incrementing the current segment index, and opening a new segment file. -// This method is thread safe. func (wl *walForge) rotateLog() error { fmt.Println("rotating log") - wl.mu.Lock() - defer wl.mu.Unlock() - // TODO: Ideally this function should not return any error // Check for the conditions where it can return an error // and handle them gracefully. @@ -237,11 +233,7 @@ func (wl *walForge) rotateLog() error { // Writes out any data in the WAL's in-memory buffer to the segment file. // and syncs the segment file to disk. -// This method is thread safe. func (wl *walForge) sync() error { - wl.mu.Lock() - defer wl.mu.Unlock() - // Flush the buffer to the segment file if err := wl.csWriter.Flush(); err != nil { return err @@ -265,10 +257,12 @@ func (wl *walForge) periodicSyncBuffer() { for { select { case <-wl.bufferSyncTicker.C: + wl.mu.Lock() err := wl.sync() if err != nil { slog.Error("failed to sync buffer", slog.String("error", err.Error())) } + wl.mu.Unlock() case <-wl.ctx.Done(): return } @@ -281,9 +275,11 @@ func (wl *walForge) periodicRotateSegment() { select { case <-wl.segmentRotationTicker.C: // TODO: Remove this error handling once we clean up the error handling in the rotateLog function. + wl.mu.Lock() if err := wl.rotateLog(); err != nil { slog.Error("failed to rotate segment", slog.String("error", err.Error())) } + wl.mu.Unlock() case <-wl.ctx.Done(): return }