Skip to content

Commit 4d42391

Browse files
JoeTurkiboks1971
andcommitted
Fix a deadlock with Abort
Fix a deadlock with write and read on abort. more likely to happen with tcp. Co-authored-by: boks1971 <[email protected]>
1 parent 57f9351 commit 4d42391

File tree

2 files changed

+160
-0
lines changed

2 files changed

+160
-0
lines changed

association.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,14 @@ func (a *Association) Abort(reason string) {
578578

579579
a.lock.Unlock()
580580

581+
// short bound for abort flush.
582+
_ = a.netConn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
581583
a.awakeWriteLoop()
582584

585+
// unblock readLoop even if the underlying TCP connection is half-open.
586+
// We want Abort to return promptly during shutdown.
587+
_ = a.netConn.SetReadDeadline(time.Now())
588+
583589
// Wait for readLoop to end
584590
<-a.readLoopCloseCh
585591
}

association_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2691,6 +2691,10 @@ func (c *dumbConn2) RemoteAddr() net.Addr {
26912691
return c.remoteAddr
26922692
}
26932693

2694+
func (c *dumbConn2) SetDeadline(time.Time) error { return nil }
2695+
func (c *dumbConn2) SetReadDeadline(time.Time) error { return nil }
2696+
func (c *dumbConn2) SetWriteDeadline(time.Time) error { return nil }
2697+
26942698
func (c *dumbConn2) inboundHandler(packet []byte) {
26952699
c.mutex.Lock()
26962700
defer c.mutex.Unlock()
@@ -2880,6 +2884,156 @@ func noErrorClose(t *testing.T, closeF func() error) {
28802884
require.NoError(t, closeF())
28812885
}
28822886

2887+
// blockingCloseConn simulates a TCP/TLS connection where Close blocks, and Read
2888+
// only unblocks when a past read deadline is set.
2889+
type blockingCloseConn struct {
2890+
readBlocked chan struct{}
2891+
closeBlocked chan struct{}
2892+
once sync.Once
2893+
}
2894+
2895+
func newBlockingCloseConn() *blockingCloseConn {
2896+
return &blockingCloseConn{
2897+
readBlocked: make(chan struct{}),
2898+
closeBlocked: make(chan struct{}),
2899+
}
2900+
}
2901+
2902+
func (c *blockingCloseConn) unblockRead() {
2903+
c.once.Do(func() { close(c.readBlocked) })
2904+
}
2905+
2906+
func (c *blockingCloseConn) Read(_ []byte) (int, error) {
2907+
<-c.readBlocked
2908+
2909+
return 0, os.ErrDeadlineExceeded
2910+
}
2911+
2912+
func (c *blockingCloseConn) Write(p []byte) (int, error) { return len(p), nil }
2913+
2914+
func (c *blockingCloseConn) Close() error {
2915+
<-c.closeBlocked
2916+
c.unblockRead()
2917+
2918+
return nil
2919+
}
2920+
2921+
func (c *blockingCloseConn) LocalAddr() net.Addr { return &net.IPAddr{} }
2922+
func (c *blockingCloseConn) RemoteAddr() net.Addr { return &net.IPAddr{} }
2923+
func (c *blockingCloseConn) SetDeadline(_ time.Time) error { return nil }
2924+
func (c *blockingCloseConn) SetWriteDeadline(_ time.Time) error { return nil }
2925+
2926+
func (c *blockingCloseConn) SetReadDeadline(t time.Time) error {
2927+
if !t.IsZero() && !t.After(time.Now()) {
2928+
c.unblockRead()
2929+
}
2930+
2931+
return nil
2932+
}
2933+
2934+
func TestAssociationAbortUnblocksStuckRead(t *testing.T) {
2935+
conn := newBlockingCloseConn()
2936+
assoc := createAssociation(Config{
2937+
NetConn: conn,
2938+
LoggerFactory: logging.NewDefaultLoggerFactory(),
2939+
})
2940+
assoc.init(false)
2941+
2942+
done := make(chan struct{})
2943+
go func() {
2944+
assoc.Abort("abort read")
2945+
close(done)
2946+
}()
2947+
2948+
select {
2949+
case <-done:
2950+
case <-time.After(200 * time.Millisecond):
2951+
require.FailNow(t, "Abort did not return while read loop was blocked")
2952+
}
2953+
2954+
close(conn.closeBlocked)
2955+
}
2956+
2957+
// blockingWriteConn simulates a connection whose Write blocks until a write
2958+
// deadline is set, SetWriteDeadline unblocks the pending Write immediately.
2959+
type blockingWriteConn struct {
2960+
readBlocked chan struct{}
2961+
writeBlocked chan struct{}
2962+
writeDeadlineCalled chan struct{}
2963+
unblockReadOnce sync.Once
2964+
unblockWriteOnce sync.Once
2965+
}
2966+
2967+
func newBlockingWriteConn() *blockingWriteConn {
2968+
return &blockingWriteConn{
2969+
readBlocked: make(chan struct{}),
2970+
writeBlocked: make(chan struct{}),
2971+
writeDeadlineCalled: make(chan struct{}, 1),
2972+
}
2973+
}
2974+
2975+
func (c *blockingWriteConn) Read(_ []byte) (int, error) {
2976+
<-c.readBlocked
2977+
2978+
return 0, os.ErrDeadlineExceeded
2979+
}
2980+
2981+
func (c *blockingWriteConn) Write(p []byte) (int, error) {
2982+
<-c.writeBlocked
2983+
2984+
return len(p), nil
2985+
}
2986+
2987+
func (c *blockingWriteConn) Close() error { return nil }
2988+
func (c *blockingWriteConn) LocalAddr() net.Addr { return &net.IPAddr{} }
2989+
func (c *blockingWriteConn) RemoteAddr() net.Addr { return &net.IPAddr{} }
2990+
func (c *blockingWriteConn) SetDeadline(_ time.Time) error { return nil }
2991+
2992+
func (c *blockingWriteConn) SetWriteDeadline(_ time.Time) error {
2993+
c.unblockWriteOnce.Do(func() { close(c.writeBlocked) })
2994+
select {
2995+
case c.writeDeadlineCalled <- struct{}{}:
2996+
default:
2997+
}
2998+
2999+
return nil
3000+
}
3001+
3002+
func (c *blockingWriteConn) SetReadDeadline(t time.Time) error {
3003+
if !t.IsZero() && !t.After(time.Now()) {
3004+
c.unblockReadOnce.Do(func() { close(c.readBlocked) })
3005+
}
3006+
3007+
return nil
3008+
}
3009+
3010+
func TestAssociationAbortSetsWriteDeadline(t *testing.T) {
3011+
conn := newBlockingWriteConn()
3012+
assoc := createAssociation(Config{
3013+
NetConn: conn,
3014+
LoggerFactory: logging.NewDefaultLoggerFactory(),
3015+
})
3016+
assoc.init(false)
3017+
3018+
done := make(chan struct{})
3019+
go func() {
3020+
assoc.Abort("abort write deadline")
3021+
close(done)
3022+
}()
3023+
3024+
select {
3025+
case <-conn.writeDeadlineCalled:
3026+
case <-time.After(200 * time.Millisecond):
3027+
require.FailNow(t, "Abort did not call SetWriteDeadline")
3028+
}
3029+
3030+
select {
3031+
case <-done:
3032+
case <-time.After(300 * time.Millisecond):
3033+
require.FailNow(t, "Abort did not return promptly")
3034+
}
3035+
}
3036+
28833037
// readMyNextTSN uses a lock to read the myNextTSN field of the association.
28843038
// Avoids a data race.
28853039
func readMyNextTSN(a *Association) uint32 {

0 commit comments

Comments
 (0)