Skip to content

Commit a817ca7

Browse files
committed
Merge pull request #1726 from xiang90/fix_sender
etcdserver: add buffer to the sender queue
2 parents fd512ff + 7c4b84a commit a817ca7

File tree

2 files changed

+15
-17
lines changed

2 files changed

+15
-17
lines changed

etcdserver/sender.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
const (
3535
raftPrefix = "/raft"
3636
connPerSender = 4
37+
senderBufSize = connPerSender * 4
3738
)
3839

3940
type sendHub struct {
@@ -150,7 +151,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS
150151
u: u,
151152
cid: cid,
152153
fs: fs,
153-
q: make(chan []byte),
154+
q: make(chan []byte, senderBufSize),
154155
shouldstop: shouldstop,
155156
}
156157
s.wg.Add(connPerSender)

etcdserver/sender_test.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ func TestSendHubShouldStop(t *testing.T) {
9797
cl := newTestCluster(membs)
9898
ls := stats.NewLeaderStats("")
9999
h := newSendHub(tr, cl, nil, ls)
100-
// wait for handle goroutines start
101-
// TODO: wait for goroutines ready before return newSender
102-
time.Sleep(10 * time.Millisecond)
103100

104101
shouldstop := h.ShouldStopNotify()
105102
select {
@@ -123,9 +120,7 @@ func TestSenderSend(t *testing.T) {
123120
tr := &roundTripperRecorder{}
124121
fs := &stats.FollowerStats{}
125122
s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
126-
// wait for handle goroutines start
127-
// TODO: wait for goroutines ready before return newSender
128-
time.Sleep(10 * time.Millisecond)
123+
129124
if err := s.send([]byte("some data")); err != nil {
130125
t.Fatalf("unexpect send error: %v", err)
131126
}
@@ -145,22 +140,26 @@ func TestSenderExceedMaximalServing(t *testing.T) {
145140
tr := newRoundTripperBlocker()
146141
fs := &stats.FollowerStats{}
147142
s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
148-
// wait for handle goroutines start
149-
// TODO: wait for goroutines ready before return newSender
150-
time.Sleep(10 * time.Millisecond)
151-
// It could handle that many requests at the same time.
152-
for i := 0; i < connPerSender; i++ {
143+
144+
// keep the sender busy and make the buffer full
145+
// nothing can go out as we block the sender
146+
for i := 0; i < connPerSender+senderBufSize; i++ {
153147
if err := s.send([]byte("some data")); err != nil {
154148
t.Errorf("send err = %v, want nil", err)
155149
}
150+
// force the sender to grab data
151+
testutil.ForceGosched()
156152
}
157-
// This one exceeds its maximal serving ability
153+
154+
// try to send a data when we are sure the buffer is full
158155
if err := s.send([]byte("some data")); err == nil {
159156
t.Errorf("unexpect send success")
160157
}
158+
159+
// unblock the senders and force them to send out the data
161160
tr.unblock()
162-
// Make handles finish their post
163161
testutil.ForceGosched()
162+
164163
// It could send new data after previous ones succeed
165164
if err := s.send([]byte("some data")); err != nil {
166165
t.Errorf("send err = %v, want nil", err)
@@ -173,9 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
173172
func TestSenderSendFailed(t *testing.T) {
174173
fs := &stats.FollowerStats{}
175174
s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
176-
// wait for handle goroutines start
177-
// TODO: wait for goroutines ready before return newSender
178-
time.Sleep(10 * time.Millisecond)
175+
179176
if err := s.send([]byte("some data")); err != nil {
180177
t.Fatalf("unexpect send error: %v", err)
181178
}

0 commit comments

Comments
 (0)