Skip to content

Commit 23cc9d6

Browse files
Fix deadlock with ccevents
Signed-off-by: Marcus Brandenburger <[email protected]>
1 parent 6c41b0e commit 23cc9d6

File tree

1 file changed

+63
-17
lines changed

1 file changed

+63
-17
lines changed

platform/fabric/events.go

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,88 @@ import (
1515

1616
// EventListener models the parameters to use for chaincode listening.
1717
type EventListener struct {
18-
sync.RWMutex
1918
chaincodeListener chan *committer.ChaincodeEvent
2019
subscriber events.Subscriber
2120
chaincodeName string
22-
closing bool
21+
22+
subscribeOnce sync.Once
23+
24+
middleCh chan *committer.ChaincodeEvent
25+
closing chan struct{}
26+
closed chan struct{}
2327
}
2428

2529
func newEventListener(subscriber events.Subscriber, chaincodeName string) *EventListener {
2630
return &EventListener{
27-
chaincodeName: chaincodeName,
28-
subscriber: subscriber,
31+
chaincodeName: chaincodeName,
32+
subscriber: subscriber,
33+
chaincodeListener: make(chan *committer.ChaincodeEvent),
34+
middleCh: make(chan *committer.ChaincodeEvent),
35+
closing: make(chan struct{}),
36+
closed: make(chan struct{}),
2937
}
3038
}
3139

3240
// ChaincodeEvents returns a channel from which chaincode events emitted by transaction functions in the specified chaincode can be read.
33-
func (e *EventListener) ChaincodeEvents() chan *committer.ChaincodeEvent {
34-
e.chaincodeListener = make(chan *committer.ChaincodeEvent, 1)
35-
e.subscriber.Subscribe(e.chaincodeName, e)
41+
func (e *EventListener) ChaincodeEvents() <-chan *committer.ChaincodeEvent {
42+
e.subscribeOnce.Do(func() {
43+
44+
go func() {
45+
exit := func(v *committer.ChaincodeEvent, needSend bool) {
46+
close(e.closed)
47+
if needSend {
48+
e.chaincodeListener <- v
49+
}
50+
close(e.chaincodeListener)
51+
}
52+
53+
for {
54+
select {
55+
case <-e.closing:
56+
exit(nil, false)
57+
return
58+
case v := <-e.middleCh:
59+
select {
60+
case <-e.closing:
61+
exit(v, true)
62+
return
63+
case e.chaincodeListener <- v:
64+
}
65+
}
66+
}
67+
}()
68+
69+
e.subscriber.Subscribe(e.chaincodeName, e)
70+
})
71+
3672
return e.chaincodeListener
3773
}
3874

3975
// CloseChaincodeEvents closes the channel from which chaincode events are read.
4076
func (e *EventListener) CloseChaincodeEvents() {
41-
e.Lock()
42-
e.closing = true
43-
e.Unlock()
44-
45-
e.subscriber.Unsubscribe(e.chaincodeName, e)
46-
close(e.chaincodeListener)
77+
select {
78+
case e.closing <- struct{}{}:
79+
e.subscriber.Unsubscribe(e.chaincodeName, e)
80+
<-e.closed
81+
case <-e.closed:
82+
}
4783
}
4884

4985
// OnReceive pushes events to the listener
5086
func (e *EventListener) OnReceive(event events.Event) {
51-
e.RLock()
52-
defer e.RUnlock()
53-
if !e.closing {
54-
e.chaincodeListener <- event.Message().(*committer.ChaincodeEvent)
87+
if event == nil {
88+
return
89+
}
90+
91+
select {
92+
case <-e.closed:
93+
return
94+
default:
95+
}
96+
97+
select {
98+
case <-e.closed:
99+
return
100+
case e.middleCh <- event.Message().(*committer.ChaincodeEvent):
55101
}
56102
}

0 commit comments

Comments
 (0)