Skip to content

Commit 9493ff7

Browse files
authored
fix: 同个instance配置多个group避免共享CanalMsgConsumer (#5175)
1 parent 31530dc commit 9493ff7

File tree

1 file changed

+12
-12
lines changed
  • client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader

1 file changed

+12
-12
lines changed

client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,5 @@
11
package com.alibaba.otter.canal.adapter.launcher.loader;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
5-
import java.util.Properties;
6-
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.Future;
8-
import java.util.concurrent.TimeUnit;
9-
import java.util.concurrent.TimeoutException;
10-
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
13-
143
import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
154
import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
165
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -23,6 +12,16 @@
2312
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
2413
import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader;
2514
import com.alibaba.otter.canal.connector.core.spi.ProxyCanalMsgConsumer;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Properties;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.TimeoutException;
2625

2726
/**
2827
* 适配处理器
@@ -63,8 +62,9 @@ public AdapterProcessor(CanalClientConfig canalClientConfig, String destination,
6362

6463
// load connector consumer
6564
ExtensionLoader<CanalMsgConsumer> loader = new ExtensionLoader<>(CanalMsgConsumer.class);
65+
String key = destination + "_" + groupId;
6666
canalMsgConsumer = new ProxyCanalMsgConsumer(loader
67-
.getExtension(canalClientConfig.getMode().toLowerCase(), destination, CONNECTOR_SPI_DIR,
67+
.getExtension(canalClientConfig.getMode().toLowerCase(), key, CONNECTOR_SPI_DIR,
6868
CONNECTOR_STANDBY_SPI_DIR));
6969

7070
Properties properties = canalClientConfig.getConsumerProperties();

0 commit comments

Comments
 (0)