Skip to content

Commit a1b4ab1

Browse files
authored
Merge pull request #7 from Trendyol/fix/not-updated-xlog-pos
fix: update xlog pos when no change in a while
2 parents 245567c + 52a8ec4 commit a1b4ab1

File tree

3 files changed

+41
-14
lines changed

3 files changed

+41
-14
lines changed

connector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ func NewConnector(ctx context.Context, cfg config.Config, listenerFunc replicati
9797

9898
m := metric.NewMetric(cfg.Slot.Name)
9999

100-
sl, err := slot.NewSlot(ctx, cfg.DSN(), cfg.Slot, m)
100+
stream := replication.NewStream(conn, cfg, m, &system, listenerFunc)
101+
102+
sl, err := slot.NewSlot(ctx, cfg.DSN(), cfg.Slot, m, stream.(slot.XLogUpdater))
101103
if err != nil {
102104
return nil, err
103105
}
@@ -108,7 +110,6 @@ func NewConnector(ctx context.Context, cfg config.Config, listenerFunc replicati
108110
}
109111
logger.Info("slot info", "info", slotInfo)
110112

111-
stream := replication.NewStream(conn, cfg, m, &system, listenerFunc)
112113
prometheusRegistry := metric.NewRegistry(m)
113114

114115
tdb, err := timescaledb.NewTimescaleDB(ctx, cfg.DSN())

pq/replication/stream.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
goerrors "errors"
77
"fmt"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -55,6 +56,7 @@ type stream struct {
5556
messageCH chan *Message
5657
listenerFunc ListenerFunc
5758
sinkEnd chan struct{}
59+
mu *sync.RWMutex
5860
config config.Config
5961
lastXLogPos pq.LSN
6062
closed atomic.Bool
@@ -71,6 +73,7 @@ func NewStream(conn pq.Connection, cfg config.Config, m metric.Metric, system *p
7173
listenerFunc: listenerFunc,
7274
lastXLogPos: 10,
7375
sinkEnd: make(chan struct{}, 1),
76+
mu: &sync.RWMutex{},
7477
}
7578
}
7679

@@ -123,7 +126,7 @@ func (s *stream) sink(ctx context.Context) {
123126
}
124127

125128
if pgconn.Timeout(err) {
126-
err = SendStandbyStatusUpdate(ctx, s.conn, uint64(0))
129+
err = SendStandbyStatusUpdate(ctx, s.conn, uint64(s.LoadXLogPos()))
127130
if err != nil {
128131
logger.Error("send stand by status update", "error", err)
129132
break
@@ -235,6 +238,21 @@ func (s *stream) GetMetric() metric.Metric {
235238
return s.metric
236239
}
237240

241+
func (s *stream) UpdateXLogPos(l pq.LSN) {
242+
s.mu.Lock()
243+
defer s.mu.Unlock()
244+
245+
if s.lastXLogPos < l {
246+
s.lastXLogPos = l
247+
}
248+
}
249+
250+
func (s *stream) LoadXLogPos() pq.LSN {
251+
s.mu.RLock()
252+
defer s.mu.RUnlock()
253+
return s.lastXLogPos
254+
}
255+
238256
func SendStandbyStatusUpdate(_ context.Context, conn pq.Connection, walWritePosition uint64) error {
239257
data := make([]byte, 0, 34)
240258
data = append(data, StandbyStatusUpdateByteID)

pq/slot/slot.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@ var (
2020

2121
var typeMap = pgtype.NewMap()
2222

23+
type XLogUpdater interface {
24+
UpdateXLogPos(l pq.LSN)
25+
}
26+
2327
type Slot struct {
24-
conn pq.Connection
25-
metric metric.Metric
26-
ticker *time.Ticker
27-
statusSQL string
28-
cfg Config
28+
conn pq.Connection
29+
metric metric.Metric
30+
logUpdater XLogUpdater
31+
ticker *time.Ticker
32+
statusSQL string
33+
cfg Config
2934
}
3035

31-
func NewSlot(ctx context.Context, dsn string, cfg Config, m metric.Metric) (*Slot, error) {
36+
func NewSlot(ctx context.Context, dsn string, cfg Config, m metric.Metric, updater XLogUpdater) (*Slot, error) {
3237
query := fmt.Sprintf("SELECT slot_name, slot_type, active, active_pid, restart_lsn, confirmed_flush_lsn, wal_status, PG_CURRENT_WAL_LSN() AS current_lsn FROM pg_replication_slots WHERE slot_name = '%s';", cfg.Name)
3338

3439
conn, err := pq.NewConnection(ctx, dsn)
@@ -37,11 +42,12 @@ func NewSlot(ctx context.Context, dsn string, cfg Config, m metric.Metric) (*Slo
3742
}
3843

3944
return &Slot{
40-
cfg: cfg,
41-
conn: conn,
42-
statusSQL: query,
43-
metric: m,
44-
ticker: time.NewTicker(time.Millisecond * cfg.SlotActivityCheckerInterval),
45+
cfg: cfg,
46+
conn: conn,
47+
statusSQL: query,
48+
metric: m,
49+
ticker: time.NewTicker(time.Millisecond * cfg.SlotActivityCheckerInterval),
50+
logUpdater: updater,
4551
}, nil
4652
}
4753

@@ -109,6 +115,8 @@ func (s *Slot) Metrics(ctx context.Context) {
109115
s.metric.SetSlotRetainedWALSize(float64(slotInfo.RetainedWALSize))
110116
s.metric.SetSlotLag(float64(slotInfo.Lag))
111117

118+
s.logUpdater.UpdateXLogPos(slotInfo.CurrentLSN)
119+
112120
logger.Debug("slot metrics", "info", slotInfo)
113121
}
114122
}

0 commit comments

Comments
 (0)