Skip to content

Commit f4a1f60

Browse files
authored
Merge pull request #316 from JiazhenBao/main
Httpclient和async-func优化
2 parents 77fac26 + 03cbcda commit f4a1f60

File tree

12 files changed

+975
-544
lines changed

12 files changed

+975
-544
lines changed

async_function_pool/README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,13 @@
3535
出参
3636
字段名称描述类型说明taskId任务idString任务唯一标识taskStatus单个任务执行状态Integer1成功,2进行中,3不存在taskResult单个任务执行返回结果StringString类型,须用户根据实际类型自行序列化。执行成功与否的标志需要逻辑在返回的序列化String中标记,获取结果后反序列化解析返回信息。
3737
## 异步获取结果asyncGetLogicResult
38-
与同步获取相同,区别点在于同步是流程卡住的,等全部任务执行完成后才会返回。异步则是仅返回当前执行完成的任务。
38+
与同步获取相同,区别点在于同步是流程卡住的,等全部任务执行完成后才会返回。异步则是仅返回当前执行完成的任务。
39+
## 异步执行任务,无返回结果asyncRunLogicNoResult
40+
```
41+
/**
42+
* 异步执行任务,无返回结果
43+
*/
44+
@NaslLogic
45+
public Boolean asyncRunLogicNoResult(Function<String, String> asyncfunction, String requestStr) {
46+
47+
```

async_function_pool/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
<groupId>com.netease</groupId>
1414
<artifactId>async_function_pool</artifactId>
15-
<version>1.0.2</version>
15+
<version>1.0.4</version>
1616

1717
<properties>
1818
<maven.compiler.source>8</maven.compiler.source>

async_function_pool/src/main/java/com/netease/lib/tasks/api/FunctionManagerApi.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import org.slf4j.LoggerFactory;
77
import org.springframework.stereotype.Component;
88

9+
import javax.annotation.Resource;
910
import java.util.ArrayList;
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.UUID;
1314
import java.util.concurrent.CompletableFuture;
1415
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.RejectedExecutionException;
1518
import java.util.function.Function;
1619

1720
/**
@@ -34,6 +37,10 @@ public class FunctionManagerApi {
3437
*/
3538
private final Map<String, CompletableFuture<String>> runningTaskRegister = new ConcurrentHashMap<>();
3639

40+
41+
@Resource(name = "libraryCommonTaskExecutor")
42+
private Executor contextAwareExecutor;
43+
3744
/**
3845
* 初始化注册逻辑
3946
*
@@ -63,7 +70,7 @@ public String asyncRunLogic(String logicKey, String requestStr) {
6370
logger.error("asyncRunLogic not exist: {}", logicKey);
6471
return null;
6572
}
66-
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> function.apply(requestStr));
73+
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> function.apply(requestStr), contextAwareExecutor);
6774
logger.info("asyncRunLogic success: {}", logicKey);
6875
String taskId = UUID.randomUUID().toString();
6976
runningTaskRegister.put(taskId, future);
@@ -130,4 +137,21 @@ public List<ThreadResultDTO> asyncGetLogicResult(List<String> taskIdList) {
130137
return resultDTOList;
131138
}
132139

140+
/**
141+
* 异步执行任务,无返回结果
142+
*/
143+
@NaslLogic
144+
public Boolean asyncRunLogicNoResult(Function<String, String> asyncfunction, String requestStr) {
145+
try {
146+
contextAwareExecutor.execute(() -> asyncfunction.apply(requestStr));
147+
return true;
148+
} catch (RejectedExecutionException e) {
149+
logger.error("Async task rejected for request: {}", requestStr, e);
150+
return false;
151+
} catch (Exception e) {
152+
logger.error("Failed to submit async task for request: {}", requestStr, e);
153+
return false;
154+
}
155+
}
156+
133157
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.netease.lib.tasks.config;
2+
3+
import org.springframework.context.annotation.Bean;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
6+
import org.springframework.web.context.request.RequestAttributes;
7+
import org.springframework.web.context.request.RequestContextHolder;
8+
9+
import javax.annotation.Resource;
10+
import java.util.concurrent.Executor;
11+
import java.util.concurrent.ThreadPoolExecutor;
12+
13+
@Configuration
14+
public class AsyncExecutorConfig {
15+
16+
@Resource(name = "libraryThreadPoolConfig")
17+
private ThreadPoolConfig threadPoolConfig;
18+
19+
@Bean(name = "libraryCommonTaskExecutor")
20+
public Executor commonTaskExecutor() {
21+
// Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
22+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
23+
// 核心线程数
24+
executor.setCorePoolSize(Integer.parseInt(threadPoolConfig.getCorePoolSize()));
25+
// 最大线程数
26+
executor.setMaxPoolSize(Integer.parseInt(threadPoolConfig.getMaxPoolSize()));
27+
// 队列大小
28+
executor.setQueueCapacity(Integer.parseInt(threadPoolConfig.getQueueCapacity()));
29+
//核心线程等待销毁时间
30+
executor.setKeepAliveSeconds(Integer.parseInt(threadPoolConfig.getKeepAliveSeconds()));
31+
// 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
32+
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
33+
executor.setThreadNamePrefix("common-pool");
34+
// 传递request
35+
executor.setTaskDecorator(runnable -> {
36+
RequestAttributes context = RequestContextHolder.getRequestAttributes();
37+
return () -> {
38+
try {
39+
if (context != null) {
40+
RequestContextHolder.setRequestAttributes(context);
41+
}
42+
runnable.run();
43+
} finally {
44+
if (context != null) {
45+
RequestContextHolder.resetRequestAttributes();
46+
}
47+
}
48+
};
49+
});
50+
executor.initialize();
51+
return executor;
52+
}
53+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.netease.lib.tasks.config;
2+
3+
import com.netease.lowcode.core.EnvironmentType;
4+
import com.netease.lowcode.core.annotation.Environment;
5+
import com.netease.lowcode.core.annotation.NaslConfiguration;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.stereotype.Component;
8+
9+
@Component("libraryThreadPoolConfig")
10+
public class ThreadPoolConfig {
11+
12+
/**
13+
* 核心线程数
14+
*/
15+
@Value("${corePoolSize:8}")
16+
@NaslConfiguration(defaultValue = {
17+
@Environment(type = EnvironmentType.DEV, value = "8"),
18+
@Environment(type = EnvironmentType.ONLINE, value = "8")
19+
})
20+
public String corePoolSize;
21+
22+
/**
23+
* 最大线程数
24+
*/
25+
@Value("${maxPoolSize:24}")
26+
@NaslConfiguration(defaultValue = {
27+
@Environment(type = EnvironmentType.DEV, value = "24"),
28+
@Environment(type = EnvironmentType.ONLINE, value = "24")
29+
})
30+
public String maxPoolSize;
31+
32+
/**
33+
* 队列大小
34+
*/
35+
@Value("${queueCapacity:1024}")
36+
@NaslConfiguration(defaultValue = {
37+
@Environment(type = EnvironmentType.DEV, value = "1024"),
38+
@Environment(type = EnvironmentType.ONLINE, value = "1024")
39+
})
40+
public String queueCapacity;
41+
42+
/**
43+
* 核心线程等待销毁时间
44+
*/
45+
@Value("${keepAliveSeconds:60}")
46+
@NaslConfiguration(defaultValue = {
47+
@Environment(type = EnvironmentType.DEV, value = "60"),
48+
@Environment(type = EnvironmentType.ONLINE, value = "60")
49+
})
50+
public String keepAliveSeconds;
51+
52+
public String getCorePoolSize() {
53+
return corePoolSize;
54+
}
55+
56+
public void setCorePoolSize(String corePoolSize) {
57+
this.corePoolSize = corePoolSize;
58+
}
59+
60+
public String getMaxPoolSize() {
61+
return maxPoolSize;
62+
}
63+
64+
public void setMaxPoolSize(String maxPoolSize) {
65+
this.maxPoolSize = maxPoolSize;
66+
}
67+
68+
public String getQueueCapacity() {
69+
return queueCapacity;
70+
}
71+
72+
public void setQueueCapacity(String queueCapacity) {
73+
this.queueCapacity = queueCapacity;
74+
}
75+
76+
public String getKeepAliveSeconds() {
77+
return keepAliveSeconds;
78+
}
79+
80+
public void setKeepAliveSeconds(String keepAliveSeconds) {
81+
this.keepAliveSeconds = keepAliveSeconds;
82+
}
83+
}

0 commit comments

Comments
 (0)