Skip to content

Commit 0cc8181

Browse files
committed
🐳 #350 修复请求消息在 Broker 环节乱序的问题
1 parent 5f6ba93 commit 0cc8181

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

net-bolt/bolt-core/src/main/java/com/iohao/game/bolt/broker/core/common/DefaultUserProcessorExecutorStrategy.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,30 @@ public Executor getExecutor(UserProcessorExecutorAware userProcessorExecutorAwar
5858
}
5959

6060
String userProcessorName = userProcessorExecutorAware.getClass().getSimpleName();
61-
if ("RequestMessageClientProcessor".equals(userProcessorName)) {
62-
// 单独一个池
63-
return ofExecutor("RequestMessage");
64-
}
6561

66-
return ofExecutor("common");
62+
return switch (userProcessorName) {
63+
// RequestMessage 相关的单独一个池,使用单线程传递请求消息。
64+
case "RequestMessageClientProcessor" -> ofExecutorRequestMessage();
65+
case "RequestMessageBrokerProcessor" -> ofExecutorRequestMessage();
66+
// 其他 UserProcessor 使用 common 多线程消费任务
67+
default -> ofExecutorCommon();
68+
};
69+
}
70+
71+
private Executor ofExecutorRequestMessage() {
72+
// 使用单线程传递请求消息
73+
return ofExecutorCommon("RequestMessage", 1);
6774
}
6875

69-
Executor ofExecutor(String name) {
76+
private Executor ofExecutorCommon() {
77+
int corePoolSize = RuntimeKit.availableProcessors;
78+
return ofExecutorCommon("common", corePoolSize);
79+
}
7080

81+
private Executor ofExecutorCommon(String name, int corePoolSize) {
7182
Executor executor = this.executorMap.get(name);
7283

7384
if (Objects.isNull(executor)) {
74-
int corePoolSize = RuntimeKit.availableProcessors;
7585
var tempExecutor = createExecutor(name, corePoolSize, corePoolSize);
7686
executor = MoreKit.firstNonNull(this.executorMap.putIfAbsent(name, tempExecutor), tempExecutor);
7787

@@ -84,7 +94,7 @@ Executor ofExecutor(String name) {
8494
return executor;
8595
}
8696

87-
Executor createExecutor(String userProcessorName, int corePoolSize, int maximumPoolSize) {
97+
private Executor createExecutor(String userProcessorName, int corePoolSize, int maximumPoolSize) {
8898

8999
/*
90100
* 下面对于 UserProcessor 提供了一些默认的 Executor 配置,

0 commit comments

Comments
 (0)