Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.alibaba.otter.canal.client.adapter.support;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by @author zhuchao on @date 2021/11/11.
*/
public class FileName2KeyMapping {

private static Map<String, String> MAP = new ConcurrentHashMap<>();

public static void register(String type, String fileName, String key) {
MAP.putIfAbsent(join(type, fileName), key);
}

public static void unregister(String type, String fileName) {
MAP.remove(join(type, fileName));
}

public static String getKey(String type, String fileName) {
return MAP.get(join(type, fileName));
}

private static String join(String type, String fileName) {
return type + "|" + fileName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class Util {

private static final Logger logger = LoggerFactory.getLogger(Util.class);

public static final String AUTO_GENERATED_PREFIX = "AUTO_GENERATED_";

/**
* 通过DS执行sql
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
package com.alibaba.otter.canal.client.adapter.es.core;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
Expand All @@ -21,7 +12,17 @@
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
import com.alibaba.otter.canal.client.adapter.support.SPI;
import com.alibaba.otter.canal.client.adapter.support.Util;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;

/**
* ES外部适配器
Expand All @@ -42,6 +43,8 @@ public abstract class ESAdapter implements OuterAdapter {

protected Properties envProperties;

protected OuterAdapterConfig configuration;

public ESSyncService getEsSyncService() {
return esSyncService;
}
Expand All @@ -58,23 +61,13 @@ public Map<String, Map<String, ESSyncConfig>> getDbTableEsSyncConfig() {
public void init(OuterAdapterConfig configuration, Properties envProperties) {
try {
this.envProperties = envProperties;
this.configuration = configuration;
Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
// 过滤不匹配的key的配置
esSyncConfigTmp.forEach((key, config) -> {
if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
|| (config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
.equalsIgnoreCase(configuration.getKey()))) {
esSyncConfig.put(key, config);
}
addConfig(key, config);
});

for (Map.Entry<String, ESSyncConfig> entry : esSyncConfig.entrySet()) {
String configName = entry.getKey();
ESSyncConfig config = entry.getValue();

addSyncConfigToCache(configName, config);
}

esSyncService = new ESSyncService(esTemplate);

esConfigMonitor = new ESConfigMonitor();
Expand Down Expand Up @@ -138,7 +131,7 @@ public String getDestination(String task) {
return null;
}

public void addSyncConfigToCache(String configName, ESSyncConfig config) {
private void addSyncConfigToCache(String configName, ESSyncConfig config) {
Properties envProperties = this.envProperties;
SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
config.getEsMapping().setSchemaItem(schemaItem);
Expand All @@ -155,28 +148,69 @@ public void addSyncConfigToCache(String configName, ESSyncConfig config) {
String schema = matcher.group(2);

schemaItem.getAliasTableItems()
.values()
.forEach(tableItem -> {
Map<String, ESSyncConfig> esSyncConfigMap;
if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+ "-"
+ StringUtils.trimToEmpty(config.getGroupId())
+ "_"
+ tableItem.getSchema() == null ? schema : tableItem.getSchema()
+ "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
} else {
esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+ "_"
+ tableItem.getSchema() == null ? schema : tableItem.getSchema()
+ "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
}

esSyncConfigMap.put(configName, config);
});
.values()
.forEach(tableItem -> {
Map<String, ESSyncConfig> esSyncConfigMap;
if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+ "-"
+ StringUtils.trimToEmpty(config.getGroupId())
+ "_"
+ tableItem.getSchema() == null ? schema : tableItem.getSchema()
+ "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
} else {
esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+ "_"
+ tableItem.getSchema() == null ? schema : tableItem.getSchema()
+ "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
}

esSyncConfigMap.put(configName, config);
});
}

public boolean addConfig(String fileName, ESSyncConfig config) {
if (match(config)) {
esSyncConfig.put(fileName, config);
addSyncConfigToCache(fileName, config);
FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
configuration.getKey());
return true;
}
return false;
}

public void updateConfig(String fileName, ESSyncConfig config) {
if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
.equals(configuration.getKey())) {
// 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
throw new RuntimeException("not allow to change outAdapterKey");
}
esSyncConfig.put(fileName, config);
addSyncConfigToCache(fileName, config);
}

public void deleteConfig(String fileName) {
esSyncConfig.remove(fileName);
for (Map<String, ESSyncConfig> configMap : dbTableEsSyncConfig.values()) {
if (configMap != null) {
configMap.remove(fileName);
}
}
FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
}

private boolean match(ESSyncConfig config) {
boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
.equalsIgnoreCase(configuration.getKey());
boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
.startsWith(StringUtils
.join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
config.getGroupId()}, '-'));
return sameMatch || prefixMatch;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package com.alibaba.otter.canal.client.adapter.es.core.monitor;

import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.Util;
import java.io.File;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.Util;

public class ESConfigMonitor {

private static final Logger logger = LoggerFactory.getLogger(ESConfigMonitor.class);
Expand All @@ -36,7 +33,7 @@ public void init(ESAdapter esAdapter, Properties envProperties) {
File confDir = Util.getConfDirPath(adapterName);
try {
FileAlterationObserver observer = new FileAlterationObserver(confDir,
FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
FileListener listener = new FileListener();
observer.addListener(listener);
fileMonitor = new FileAlterationMonitor(3000, observer);
Expand Down Expand Up @@ -69,9 +66,14 @@ public void onFileCreate(File file) {
null,
envProperties);
if (config != null) {
// 这里要记得设置esVersion bugfix
config.setEsVersion(adapterName);
config.validate();
addConfigToCache(file, config);
logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
boolean result = esAdapter.addConfig(file.getName(), config);
if (result) {
logger.info("Add a new es mapping config: {} to canal adapter",
file.getName());
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down Expand Up @@ -99,12 +101,10 @@ public void onFileChange(File file) {
if (config == null) {
return;
}
// 这里要记得设置esVersion bugfix
config.setEsVersion(adapterName);
config.validate();
if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
deleteConfigFromCache(file);
}
addConfigToCache(file, config);

esAdapter.updateConfig(file.getName(), config);
logger.info("Change a es mapping config: {} of canal adapter", file.getName());
}
} catch (Exception e) {
Expand All @@ -118,29 +118,12 @@ public void onFileDelete(File file) {

try {
if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
deleteConfigFromCache(file);

esAdapter.deleteConfig(file.getName());
logger.info("Delete a es mapping config: {} of canal adapter", file.getName());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

private void addConfigToCache(File file, ESSyncConfig config) {
esAdapter.getEsSyncConfig().put(file.getName(), config);

esAdapter.addSyncConfigToCache(file.getName(), config);
}

private void deleteConfigFromCache(File file) {
esAdapter.getEsSyncConfig().remove(file.getName());
for (Map<String, ESSyncConfig> configMap : esAdapter.getDbTableEsSyncConfig().values()) {
if (configMap != null) {
configMap.remove(file.getName());
}
}

}
}
}
Loading