Skip to content

Commit 81f8e7e

Browse files
authored
[AINode] Fix the bug that AINode would not stop during remove process (#17088)
1 parent d90397d commit 81f8e7e

File tree

5 files changed

+360
-13
lines changed

5 files changed

+360
-13
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.client.sync;
21+
22+
public enum CnToAnSyncRequestType {
23+
// Node Maintenance
24+
STOP_AI_NODE,
25+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.client.sync;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.commons.client.ClientPoolFactory;
25+
import org.apache.iotdb.commons.client.IClientManager;
26+
import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
27+
import org.apache.iotdb.commons.exception.UncheckedStartupException;
28+
import org.apache.iotdb.rpc.TSStatusCode;
29+
30+
import com.google.common.collect.ImmutableMap;
31+
import org.apache.ratis.util.function.CheckedBiFunction;
32+
import org.apache.thrift.TException;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.util.Arrays;
37+
import java.util.List;
38+
import java.util.Objects;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.stream.Collectors;
41+
42+
public class SyncAINodeClientPool {
43+
44+
private static final Logger LOGGER = LoggerFactory.getLogger(SyncAINodeClientPool.class);
45+
46+
private static final int DEFAULT_RETRY_NUM = 10;
47+
48+
private final IClientManager<TEndPoint, SyncAINodeClient> clientManager;
49+
50+
protected ImmutableMap<
51+
CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>>
52+
actionMap;
53+
54+
private SyncAINodeClientPool() {
55+
clientManager =
56+
new IClientManager.Factory<TEndPoint, SyncAINodeClient>()
57+
.createClientManager(new ClientPoolFactory.SyncAINodeClientPoolFactory());
58+
buildActionMap();
59+
checkActionMapCompleteness();
60+
}
61+
62+
private void buildActionMap() {
63+
ImmutableMap.Builder<
64+
CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>>
65+
actionMapBuilder = ImmutableMap.builder();
66+
actionMapBuilder.put(CnToAnSyncRequestType.STOP_AI_NODE, (req, client) -> client.stopAINode());
67+
actionMap = actionMapBuilder.build();
68+
}
69+
70+
private void checkActionMapCompleteness() {
71+
List<CnToAnSyncRequestType> lackList =
72+
Arrays.stream(CnToAnSyncRequestType.values())
73+
.filter(type -> !actionMap.containsKey(type))
74+
.collect(Collectors.toList());
75+
if (!lackList.isEmpty()) {
76+
throw new UncheckedStartupException(
77+
String.format("These request types should be added to actionMap: %s", lackList));
78+
}
79+
}
80+
81+
public Object sendSyncRequestToAINodeWithRetry(
82+
TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType) {
83+
Throwable lastException = new TException();
84+
for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
85+
try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) {
86+
return executeSyncRequest(requestType, client, req);
87+
} catch (Exception e) {
88+
lastException = e;
89+
if (retry != DEFAULT_RETRY_NUM - 1) {
90+
LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1);
91+
doRetryWait(retry);
92+
}
93+
}
94+
}
95+
LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException);
96+
return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode())
97+
.setMessage("All retry failed due to: " + lastException.getMessage());
98+
}
99+
100+
public Object sendSyncRequestToAINodeWithGivenRetry(
101+
TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType, int retryNum) {
102+
Throwable lastException = new TException();
103+
for (int retry = 0; retry < retryNum; retry++) {
104+
try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) {
105+
return executeSyncRequest(requestType, client, req);
106+
} catch (Exception e) {
107+
lastException = e;
108+
if (retry != retryNum - 1) {
109+
LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1);
110+
doRetryWait(retry);
111+
}
112+
}
113+
}
114+
LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException);
115+
return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode())
116+
.setMessage("All retry failed due to: " + lastException.getMessage());
117+
}
118+
119+
private Object executeSyncRequest(
120+
CnToAnSyncRequestType requestType, SyncAINodeClient client, Object req) throws Exception {
121+
return Objects.requireNonNull(actionMap.get(requestType)).apply(req, client);
122+
}
123+
124+
private void doRetryWait(int retryNum) {
125+
try {
126+
if (retryNum < 3) {
127+
TimeUnit.MILLISECONDS.sleep(800L);
128+
} else if (retryNum < 5) {
129+
TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
130+
} else {
131+
TimeUnit.MILLISECONDS.sleep(3200L);
132+
}
133+
} catch (InterruptedException e) {
134+
LOGGER.warn("Retry wait failed.", e);
135+
Thread.currentThread().interrupt();
136+
}
137+
}
138+
139+
private static class ClientPoolHolder {
140+
141+
private static final SyncAINodeClientPool INSTANCE = new SyncAINodeClientPool();
142+
143+
private ClientPoolHolder() {
144+
// Empty constructor
145+
}
146+
}
147+
148+
public static SyncAINodeClientPool getInstance() {
149+
return SyncAINodeClientPool.ClientPoolHolder.INSTANCE;
150+
}
151+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
2323
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2424
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
25+
import org.apache.iotdb.confignode.client.sync.CnToAnSyncRequestType;
26+
import org.apache.iotdb.confignode.client.sync.SyncAINodeClientPool;
2527
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
2628
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
2729
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
2830
import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState;
2931
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
30-
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
31-
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
3232
import org.apache.iotdb.rpc.TSStatusCode;
3333

3434
import org.slf4j.Logger;
@@ -65,16 +65,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveAINodeState st
6565
try {
6666
switch (state) {
6767
case NODE_STOP:
68-
TSStatus resp = null;
69-
try (AINodeClient client =
70-
AINodeClientManager.getInstance()
71-
.borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) {
72-
resp = client.stopAINode();
73-
} catch (Exception e) {
74-
LOGGER.warn(
75-
"Failed to stop AINode {}, but the remove process will continue.",
76-
removedAINode.getInternalEndPoint());
77-
}
68+
TSStatus resp =
69+
(TSStatus)
70+
SyncAINodeClientPool.getInstance()
71+
.sendSyncRequestToAINodeWithRetry(
72+
removedAINode.getInternalEndPoint(),
73+
null,
74+
CnToAnSyncRequestType.STOP_AI_NODE);
7875
if (resp != null && resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
7976
LOGGER.info("Successfully stopped AINode {}", removedAINode.getInternalEndPoint());
8077
} else {
@@ -92,7 +89,6 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveAINodeState st
9289
env.getConfigManager()
9390
.getConsensusManager()
9491
.write(new RemoveAINodePlan(removedAINode));
95-
9692
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
9793
throw new ProcedureException(
9894
String.format(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
3232
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
3333
import org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProperty;
34+
import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
3435
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
3536
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
3637
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
@@ -393,6 +394,27 @@ public GenericKeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient> create
393394
}
394395
}
395396

397+
public static class SyncAINodeClientPoolFactory
398+
implements IClientPoolFactory<TEndPoint, SyncAINodeClient> {
399+
400+
@Override
401+
public GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> createClientPool(
402+
ClientManager<TEndPoint, SyncAINodeClient> manager) {
403+
GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> clientPool =
404+
new GenericKeyedObjectPool<>(
405+
new SyncAINodeClient.Factory(
406+
manager,
407+
new ThriftClientProperty.Builder()
408+
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
409+
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
410+
.build()),
411+
new ClientPoolProperty.Builder<SyncAINodeClient>().build().getConfig());
412+
ClientManagerMetrics.getInstance()
413+
.registerClientManager(this.getClass().getSimpleName(), clientPool);
414+
return clientPool;
415+
}
416+
}
417+
396418
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
397419
implements IClientPoolFactory<TEndPoint, AsyncAINodeInternalServiceClient> {
398420

0 commit comments

Comments
 (0)