Skip to content

Commit b5059eb

Browse files
authored
fix: make unhealthy signal send non-blocking (#32)
1 parent 98fb5f2 commit b5059eb

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

internal/relay/relay.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ func (re *Relay) Start(globalCtx context.Context) error {
103103
// Start the consumer group worker by triggering a signal to the relay loop to fetch
104104
// a consumer worker to fetch the initial healthy node.
105105
re.log.Info("starting consumer worker")
106-
re.signalCh <- struct{}{}
106+
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
107+
// if the threshold checker goroutine has already sent on the channel concurrently.
108+
select {
109+
case re.signalCh <- struct{}{}:
110+
default:
111+
}
107112

108113
wg.Add(1)
109114
// Relay teardown.
@@ -179,7 +184,12 @@ loop:
179184
of, err := re.source.GetHighWatermark(ctx, server.Client)
180185
if err != nil {
181186
re.log.Error("could not get end offsets (first poll); sending unhealthy signal", "id", server.ID, "server", server.Config.BootstrapBrokers, "error", err)
182-
re.signalCh <- struct{}{}
187+
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
188+
// if the threshold checker goroutine has already sent on the channel concurrently.
189+
select {
190+
case re.signalCh <- struct{}{}:
191+
default:
192+
}
183193

184194
continue loop
185195
}
@@ -197,7 +207,12 @@ loop:
197207
fetches, err := re.source.GetFetches(server)
198208
if err != nil {
199209
re.log.Error("marking server as unhealthy", "server", server.ID)
200-
re.signalCh <- struct{}{}
210+
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
211+
// if the threshold checker goroutine has already sent on the channel concurrently.
212+
select {
213+
case re.signalCh <- struct{}{}:
214+
default:
215+
}
201216

202217
continue loop
203218
}

internal/relay/source_pool.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,12 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
368368
sp.fetchCancel()
369369

370370
// Signal the relay poll loop to start asking for a healthy client.
371-
signal <- struct{}{}
371+
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
372+
// if the poll loop's subsection (checking for errors) has already sent a signal.
373+
select {
374+
case signal <- struct{}{}:
375+
default:
376+
}
372377
}
373378
}
374379
}

0 commit comments

Comments
 (0)