@@ -243,99 +243,104 @@ func (arc *activeReplicatorCommon) reset() error {
// reconnect asynchronously calls replicatorConnectFn until successful, or times out trying. Retry loop can be stopped by cancelling ctx.
func (arc *activeReplicatorCommon) reconnect() {
	// Mark the reconnector active before spawning the goroutine, so the
	// in-progress state is observable as soon as reconnect returns;
	// synchronousReconnect clears the flag (via defer) when the retry loop exits.
	arc.reconnectActive.Set(true)
	go arc.synchronousReconnect()
}

// synchronousReconnect runs the reconnect retry loop on the calling goroutine:
// it repeatedly disconnects and re-invokes replicatorConnectFn with doubling
// backoff until a connect attempt succeeds, arc.ctx is cancelled, or
// TotalReconnectTimeout (if configured) elapses. On a non-cancellation failure
// it records the aborted reconnect, sets the error state, and stops the
// replicator. Clears reconnectActive on exit.
//
// TODO: CBG-4882 - reconnect() and Start() race on reading/writing ctx
//
//go:norace
func (arc *activeReplicatorCommon) synchronousReconnect() {
	base.DebugfCtx(arc.ctx, base.KeyReplicate, "starting reconnector")
	defer func() {
		// Signal that the reconnector has finished, whatever the outcome.
		arc.reconnectActive.Set(false)
	}()

	// Fall back to package defaults for any backoff intervals not configured.
	initialReconnectInterval := defaultInitialReconnectInterval
	if arc.config.InitialReconnectInterval != 0 {
		initialReconnectInterval = arc.config.InitialReconnectInterval
	}
	maxReconnectInterval := defaultMaxReconnectInterval
	if arc.config.MaxReconnectInterval != 0 {
		maxReconnectInterval = arc.config.MaxReconnectInterval
	}

	// ctx causes the retry loop to stop if cancelled
	ctx := arc.ctx

	// if a reconnect timeout is set, wrap the existing context with a deadline
	// so that either cancellation or the timeout can stop the retry loop
	var deadlineCancel context.CancelFunc
	if arc.config.TotalReconnectTimeout != 0 {
		ctx, deadlineCancel = context.WithDeadline(ctx, time.Now().Add(arc.config.TotalReconnectTimeout))
	}

	// Doubling backoff between attempts, capped at maxReconnectInterval, with
	// no limit on attempt count — ctx is what bounds the loop.
	sleeperFunc := base.SleeperFuncCtx(
		base.CreateIndefiniteMaxDoublingSleeperFunc(
			int(initialReconnectInterval.Milliseconds()),
			int(maxReconnectInterval.Milliseconds())),
		ctx)

	retryFunc := func() (shouldRetry bool, err error, _ any) {
		// check before and after acquiring lock to make sure to exit early if ActiveReplicatorCommon.Stop() was called.
		if ctx.Err() != nil {
			return false, ctx.Err(), nil
		}

		arc.lock.Lock()
		defer arc.lock.Unlock()

		// Re-check after acquiring the lock: ctx may have been cancelled while
		// this goroutine was blocked waiting for it.
		if ctx.Err() != nil {
			return false, ctx.Err(), nil
		}

		base.DebugfCtx(arc.ctx, base.KeyReplicate, "Attempting to reconnect replicator %s", arc.config.ID)

		// preserve lastError from the previous connect attempt
		arc.setState(ReplicationStateReconnecting)

		// disconnect no-ops if nothing is active, but will close any checkpointer processes, blip contexts, etc, if active.
		base.TracefCtx(arc.ctx, base.KeyReplicate, "calling disconnect from reconnect")
		err = arc._disconnect()
		if err != nil {
			base.InfofCtx(arc.ctx, base.KeyReplicate, "error stopping replicator on reconnect: %v", err)
		}

		// set lastError, but don't set an error state inside the reconnect loop
		err = arc.replicatorConnectFn()
		arc.setLastError(err)
		arc._publishStatus()

		if err != nil {
			base.InfofCtx(arc.ctx, base.KeyReplicate, "error starting replicator %s on reconnect: %v", arc.config.ID, err)
		} else {
			base.DebugfCtx(arc.ctx, base.KeyReplicate, "replicator %s successfully reconnected", arc.config.ID)
		}
		// Keep retrying for as long as the connect attempt returns an error.
		return err != nil, err, nil
	}

	retryErr, _ := base.RetryLoop(ctx, "replicator reconnect", retryFunc, sleeperFunc)
	// release timer associated with context deadline
	if deadlineCancel != nil {
		deadlineCancel()
	}
	// Exit early if no error
	if retryErr == nil {
		return
	}

	// replicator was stopped - appropriate state has already been set
	if errors.Is(ctx.Err(), context.Canceled) {
		base.DebugfCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", retryErr)
		return
	}

	// Non-cancellation failure (e.g. reconnect deadline exceeded): record the
	// aborted reconnect and transition the replicator into the error state.
	base.WarnfCtx(ctx, "aborting reconnect loop: %v", retryErr)
	arc.replicationStats.NumReconnectsAborted.Add(1)
	arc.lock.Lock()
	defer arc.lock.Unlock()
	// use setState to preserve last error from retry loop set by setLastError
	arc.setState(ReplicationStateError)
	arc._publishStatus()
	arc._stop()
}
340345
341346// disconnect will disconnect and stop the replicator, but not set the state - such that it will be reassigned and started again.
0 commit comments