Skip to content

Commit be7d249

Browse files
0xFFFFFFF0Hzoemak
authored andcommitted
es8x adapter (alibaba#4640)
1 parent 1eee23e commit be7d249

File tree

15 files changed

+1463
-0
lines changed

15 files changed

+1463
-0
lines changed

client-adapter/es8x/pom.xml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<artifactId>canal.client-adapter</artifactId>
6+
<groupId>com.alibaba.otter</groupId>
7+
<version>1.1.7-SNAPSHOT</version>
8+
<relativePath>../pom.xml</relativePath>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
<groupId>com.alibaba.otter</groupId>
12+
<artifactId>client-adapter.es8x</artifactId>
13+
<packaging>jar</packaging>
14+
<name>canal client adapter es v8x module for otter ${project.version}</name>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>com.alibaba.otter</groupId>
19+
<artifactId>client-adapter.common</artifactId>
20+
<version>${project.version}</version>
21+
<scope>provided</scope>
22+
</dependency>
23+
<dependency>
24+
<groupId>com.alibaba.otter</groupId>
25+
<artifactId>client-adapter.escore</artifactId>
26+
<version>${project.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>co.elastic.clients</groupId>
30+
<artifactId>elasticsearch-java</artifactId>
31+
<version>8.6.2</version>
32+
</dependency>
33+
<!--<dependency>
34+
<groupId>org.elasticsearch.client</groupId>
35+
<artifactId>elasticsearch-rest-client</artifactId>
36+
<version>8.6.2</version>
37+
</dependency>-->
38+
<dependency>
39+
<groupId>org.elasticsearch.client</groupId>
40+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
41+
<version>7.17.9</version>
42+
</dependency>
43+
44+
<dependency>
45+
<groupId>junit</groupId>
46+
<artifactId>junit</artifactId>
47+
<scope>test</scope>
48+
</dependency>
49+
</dependencies>
50+
51+
<build>
52+
<plugins>
53+
<plugin>
54+
<groupId>org.apache.maven.plugins</groupId>
55+
<artifactId>maven-assembly-plugin</artifactId>
56+
<version>2.4</version>
57+
<configuration>
58+
<descriptorRefs>
59+
<descriptorRef>jar-with-dependencies</descriptorRef>
60+
</descriptorRefs>
61+
</configuration>
62+
<executions>
63+
<execution>
64+
<id>make-assembly</id>
65+
<phase>package</phase>
66+
<goals>
67+
<goal>single</goal>
68+
</goals>
69+
</execution>
70+
</executions>
71+
</plugin>
72+
<plugin>
73+
<artifactId>maven-antrun-plugin</artifactId>
74+
<executions>
75+
<execution>
76+
<phase>package</phase>
77+
<goals>
78+
<goal>run</goal>
79+
</goals>
80+
<configuration>
81+
<tasks>
82+
<copy todir="${project.basedir}/../launcher/target/classes/es8" overwrite="true">
83+
<fileset dir="${project.basedir}/target/classes/es8" erroronmissingdir="true">
84+
<include name="*.yml"/>
85+
</fileset>
86+
</copy>
87+
</tasks>
88+
</configuration>
89+
</execution>
90+
</executions>
91+
</plugin>
92+
</plugins>
93+
</build>
94+
</project>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.alibaba.otter.canal.client.adapter.es8x;
2+
3+
import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
4+
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
5+
import com.alibaba.otter.canal.client.adapter.es8x.etl.ESEtlService;
6+
import com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate;
7+
import com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection;
8+
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
9+
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
10+
import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
11+
import com.alibaba.otter.canal.client.adapter.support.SPI;
12+
import org.elasticsearch.action.search.SearchResponse;
13+
14+
import javax.sql.DataSource;
15+
import java.util.LinkedHashMap;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.Properties;
19+
20+
/**
21+
* ES 8.x 外部适配器
22+
*
23+
* @author ymz 2013-02-23
24+
* @version 1.0.0
25+
*/
26+
@SPI("es8")
27+
public class ES8xAdapter extends ESAdapter {
28+
29+
private ESConnection esConnection;
30+
31+
public ESConnection getEsConnection() {
32+
return esConnection;
33+
}
34+
35+
@Override
36+
public void init(OuterAdapterConfig configuration, Properties envProperties) {
37+
try {
38+
Map<String, String> properties = configuration.getProperties();
39+
40+
String[] hostArray = configuration.getHosts().split(",");
41+
esConnection = new ESConnection(hostArray, properties);
42+
43+
this.esTemplate = new ES8xTemplate(esConnection);
44+
45+
envProperties.put("es.version", "es8");
46+
super.init(configuration, envProperties);
47+
} catch (Throwable e) {
48+
throw new RuntimeException(e);
49+
}
50+
}
51+
52+
@Override
53+
public Map<String, Object> count(String task) {
54+
ESSyncConfig config = esSyncConfig.get(task);
55+
ESSyncConfig.ESMapping mapping = config.getEsMapping();
56+
SearchResponse response = this.esConnection.new ESSearchRequest(mapping.get_index()).size(0).getResponse();
57+
58+
long rowCount = response.getHits().getTotalHits().value;
59+
Map<String, Object> res = new LinkedHashMap<>();
60+
res.put("esIndex", mapping.get_index());
61+
res.put("count", rowCount);
62+
return res;
63+
}
64+
65+
@Override
66+
public EtlResult etl(String task, List<String> params) {
67+
EtlResult etlResult = new EtlResult();
68+
ESSyncConfig config = esSyncConfig.get(task);
69+
if (config != null) {
70+
DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
71+
ESEtlService esEtlService = new ESEtlService(esConnection, config);
72+
if (dataSource != null) {
73+
return esEtlService.importData(params);
74+
} else {
75+
etlResult.setSucceeded(false);
76+
etlResult.setErrorMessage("DataSource not found");
77+
return etlResult;
78+
}
79+
} else {
80+
StringBuilder resultMsg = new StringBuilder();
81+
boolean resSuccess = true;
82+
for (ESSyncConfig configTmp : esSyncConfig.values()) {
83+
// 取所有的destination为task的配置
84+
if (configTmp.getDestination().equals(task)) {
85+
ESEtlService esEtlService = new ESEtlService(esConnection, configTmp);
86+
EtlResult etlRes = esEtlService.importData(params);
87+
if (!etlRes.getSucceeded()) {
88+
resSuccess = false;
89+
resultMsg.append(etlRes.getErrorMessage()).append("\n");
90+
} else {
91+
resultMsg.append(etlRes.getResultMessage()).append("\n");
92+
}
93+
}
94+
}
95+
if (resultMsg.length() > 0) {
96+
etlResult.setSucceeded(resSuccess);
97+
if (resSuccess) {
98+
etlResult.setResultMessage(resultMsg.toString());
99+
} else {
100+
etlResult.setErrorMessage(resultMsg.toString());
101+
}
102+
return etlResult;
103+
}
104+
}
105+
etlResult.setSucceeded(false);
106+
etlResult.setErrorMessage("Task not found");
107+
return etlResult;
108+
}
109+
110+
@Override
111+
public void destroy() {
112+
super.destroy();
113+
if (esConnection != null) {
114+
esConnection.close();
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)