Skip to content

Commit fd9ca3a

Browse files
authored
RATIS-2329. NettyRpcProxy should support handling netty channel exception to prevent replication stuck (#1285)
1 parent 0c2b3fa commit fd9ca3a

File tree

3 files changed

+39
-11
lines changed

3 files changed

+39
-11
lines changed

ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,14 @@ static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name, T
9191
}
9292

9393
static boolean shouldReconnect(Throwable e) {
94-
return ReflectionUtils.isInstance(e,
95-
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
96-
AlreadyClosedException.class);
94+
for (; e != null; e = e.getCause()) {
95+
if (ReflectionUtils.isInstance(e,
96+
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
97+
AlreadyClosedException.class, TimeoutIOException.class)) {
98+
return true;
99+
}
100+
}
101+
return false;
97102
}
98103

99104
static void readFully(InputStream in, int buffSize) throws IOException {

ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
3131
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto;
3232
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
33+
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
3334
import org.apache.ratis.util.IOUtils;
3435
import org.apache.ratis.util.PeerProxyMap;
3536
import org.apache.ratis.util.ProtoUtils;
3637
import org.apache.ratis.util.TimeDuration;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3740

3841
import java.io.Closeable;
3942
import java.io.IOException;
@@ -47,6 +50,7 @@
4750
import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
4851

4952
public class NettyRpcProxy implements Closeable {
53+
public static final Logger LOG = LoggerFactory.getLogger(NettyRpcProxy.class);
5054
public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
5155
private final EventLoopGroup group;
5256

@@ -121,6 +125,18 @@ protected void channelRead0(ChannelHandlerContext ctx,
121125
future.complete(proto);
122126
}
123127
}
128+
129+
@Override
130+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
131+
client.close();
132+
failOutstandingRequests(new IOException("Caught an exception for the connection to " + peer, cause));
133+
}
134+
135+
@Override
136+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
137+
failOutstandingRequests(new AlreadyClosedException("Channel to " + peer + " is inactive."));
138+
super.channelInactive(ctx);
139+
}
124140
};
125141
final ChannelInitializer<SocketChannel> initializer
126142
= new ChannelInitializer<SocketChannel>() {
@@ -153,9 +169,14 @@ synchronized CompletableFuture<RaftNettyServerReplyProto> pollReply() {
153169
@Override
154170
public synchronized void close() {
155171
client.close();
172+
failOutstandingRequests(new AlreadyClosedException("Closing connection to " + peer));
173+
}
174+
175+
private synchronized void failOutstandingRequests(Throwable cause) {
156176
if (!replies.isEmpty()) {
157-
final IOException e = new IOException("Connection to " + peer + " is closed.");
158-
replies.stream().forEach(f -> f.completeExceptionally(e));
177+
LOG.warn("Still have {} requests outstanding from {} connection: {}",
178+
replies.size(), peer, cause.toString());
179+
replies.forEach(f -> f.completeExceptionally(cause));
159180
replies.clear();
160181
}
161182
}

ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,8 @@ void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluste
181181
// the second half still have retry time remaining.
182182
sleepTime.apply(t -> t*2).sleep();
183183

184-
if (leader != null) {
185-
cluster.restartServer(leader, false);
186-
} else {
187-
cluster.start();
188-
}
189-
184+
// The client will try to reconnect, but the server is
185+
// not started at this time and the retry will fail anyway.
190186
// all the calls should fail for ordering guarantee
191187
for(int i = 0; i < replies.size(); i++) {
192188
final CheckedRunnable<Exception> getReply = replies.get(i)::get;
@@ -203,6 +199,12 @@ void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluste
203199

204200
testFailureCaseAsync("last-request", () -> client.async().send(new SimpleMessage("last")),
205201
AlreadyClosedException.class, RaftRetryFailureException.class);
202+
203+
if (leader != null) {
204+
cluster.restartServer(leader, false);
205+
} else {
206+
cluster.start();
207+
}
206208
}
207209
}
208210

0 commit comments

Comments
 (0)