Skip to content

Commit 4ffbdc5

Browse files
author
zhangqingsong
committed
Add SensorsData Connector
1 parent 6c2bf02 commit 4ffbdc5

File tree

47 files changed

+677
-657
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+677
-657
lines changed

.github/workflows/backend.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,3 +1435,28 @@ jobs:
14351435
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-redis-e2e -am -Pci
14361436
env:
14371437
MAVEN_OPTS: -Xmx4096m
1438+
1439+
connector-sensorsdata-it:
1440+
needs: [ changes, sanity-check ]
1441+
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-sensorsdata-e2e')
1442+
runs-on: ${{ matrix.os }}
1443+
strategy:
1444+
matrix:
1445+
java: [ '8', '11' ]
1446+
os: [ 'ubuntu-latest' ]
1447+
timeout-minutes: 120
1448+
steps:
1449+
- uses: actions/checkout@v2
1450+
- name: Set up JDK ${{ matrix.java }}
1451+
uses: actions/setup-java@v3
1452+
with:
1453+
java-version: ${{ matrix.java }}
1454+
distribution: 'temurin'
1455+
cache: 'maven'
1456+
- name: free disk space
1457+
run: tools/github/free_disk_space.sh
1458+
- name: run sensorsdata connector integration test
1459+
run: |
1460+
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-sensorsdata-e2e -am -Pci
1461+
env:
1462+
MAVEN_OPTS: -Xmx4096m

.github/workflows/labeler/label-scope-conf.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,4 +315,11 @@ aerospike:
315315
- all:
316316
- changed-files:
317317
- any-glob-to-any-file: seatunnel-connectors-v2/connector-aerospike/**
318-
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(aerospike)/**'
318+
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(aerospike)/**'
319+
320+
sensorsdata:
321+
- all:
322+
- changed-files:
323+
- any-glob-to-any-file: seatunnel-connectors-v2/connector-sensorsdata/**
324+
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(sensorsdata)/**'
325+

config/plugin_config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,4 @@ connector-sls
9494
connector-qdrant
9595
connector-typesense
9696
connector-cdc-opengauss
97+
connector-sensorsdata
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<details><summary> Change Log </summary>
2+
3+
| Change | Commit | Version |
4+
| --- | --- | --- |
5+
6+
7+
</details>

docs/en/connector-v2/sink/SensorsData.md

Lines changed: 162 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import ChangeLog from '../changelog/connector-sensorsdata.md';
2+
13
# SensorsData
24

35
> SensorsData sink connector
@@ -18,35 +20,40 @@ A sink plugin which use SensorsData SDK send data records.
1820

1921
## Options
2022

21-
| name | type | required | default value |
22-
|--------------------|---------|----------|---------------|
23-
| server_url | string | yes | - |
24-
| bulk_size | int | no | 50 |
25-
| max_cache_row_size | int | no | 0 |
26-
| consumer | string | no | - |
27-
| entity_name | string | yes | users |
28-
| record_type | string | yes | users |
29-
| schema | string | yes | users |
30-
| distinct_id_column | string | yes | - |
31-
| identity_fields | array | yes | - |
32-
| property_fields | array | yes | - |
33-
| event_name | string | yes | - |
34-
| time_column | string | yes | - |
35-
| time_free | boolean | no | false |
36-
| detail_id_column | string | no | - |
37-
| item_id_column | string | no | - |
38-
| item_type_column | string | no | - |
39-
| skip_error_record | boolean | no | false |
40-
| common-options | | no | - |
41-
23+
| name | type | required | default value |
24+
|---------------------------|---------|----------|---------------|
25+
| server_url | string | yes | - |
26+
| bulk_size | int | no | 50 |
27+
| max_cache_row_size | int | no | 0 |
28+
| consumer | string | no | - |
29+
| entity_name | string | yes | users |
30+
| record_type | string | yes | users |
31+
| schema | string | yes | users |
32+
| distinct_id_column | string | yes | - |
33+
| identity_fields | array | yes | - |
34+
| property_fields | array | yes | - |
35+
| event_name | string | yes | - |
36+
| time_column | string | yes | - |
37+
| time_free | boolean | no | false |
38+
| detail_id_column | string | no | - |
39+
| item_id_column | string | no | - |
40+
| item_type_column | string | no | - |
41+
| skip_error_record | boolean | no | false |
42+
| instant_events | array | no | - |
43+
| distinct_id_by_identities | boolean | no | false |
44+
| null_as_profile_unset | boolean | no | false |
45+
| common-options | | no | - |
46+
47+
48+
## Parameter Interpretation
4249
### server_url [string]
4350

44-
SensorsData data sink address, the format is https://${host}:8106/sa?project=${project}
51+
SensorsData data sink address, the format is `https://${host}:8106/sa?project=${project}`
4552

4653
### bulk_size [int]
4754

4855
Threshold for triggering the flush operation in the SensorsData SDK. When the memory cache queue reaches this value, the
49-
data in the cache will be sended. The default value is 50.
56+
data in the cache will be sent. The default value is 50.
5057

5158
### max_cache_row_size[int]
5259

@@ -79,14 +86,27 @@ The identity fields of the user entity.
7986

8087
### property_fields[array]
8188

82-
The property fields of the data record.
89+
The property fields of the data record. Supported types:
90+
- BOOLEAN
91+
- DECIMAL
92+
- INT
93+
- BIGINT
94+
- FLOAT
95+
- DOUBLE
96+
- NUMBER
97+
- STRING
98+
- DATE
99+
- TIMESTAMP
100+
- LIST
101+
- LIST_COMMA
102+
- LIST_SEMICOLON
83103

84104
### event_name[string]
85105

86106
Currently, two formats are supported:
87107

88108
1. Fill in the name of the event record.
89-
2. Use value of a field from upstream data as the event name, the format is ${your field name}, where event name is the
109+
2. Use the value of a field from the upstream data as the event name. The format is `${your field name}`, where the event name is the
90110
value of that column in the upstream data.
91111

92112
For example, suppose the upstream data is the following:
@@ -96,8 +116,7 @@ For example, Upstream data is the following:
96116
| Purchase | 16 | data-example1 |
97117
| Order | 23 | data-example2 |
98118

99-
If ${name} is set as the event name, the event name of the first row is "Purchase", and the event name of the second row
100-
is "Order".
119+
If `${name}` is set as the event name, the event name of the first row is "Purchase", and the event name of the second row is "Order".
101120

102121
### time_column[string]
103122

@@ -119,15 +138,29 @@ The item id column of the item entity.
119138

120139
The item type column of the item entity.
121140

122-
### skip_error_record[string]
141+
### skip_error_record[boolean]
123142

124143
Whether to ignore errors that occur while translating a data record.
125144

145+
### instant_events[array]
146+
147+
A list of event names. Events whose name appears in this list are marked as instant events.
148+
149+
### distinct_id_by_identities[boolean]
150+
151+
When enabled, this option automatically fills the distinct_id using the values from identity_fields columns when the distinct_id_column value is null. This ensures that SensorsData receives a non-null distinct_id value as required.
152+
153+
### null_as_profile_unset[boolean]
154+
155+
When enabled, null values in profile properties will be converted to profile unset operations, effectively removing the existing value from the profile.
156+
126157
### common options
127158

128159
Sink plugin common parameters. Please refer to [Sink Common Options](common-options.md) for details.
129160

130-
## Example
161+
## Examples
162+
163+
### Basic Event Tracking
131164

132165
```hocon
133166
sink {
@@ -155,9 +188,106 @@ sink {
155188
}
156189
```
157190

158-
## Changelog
191+
### Dynamic Event Names
192+
193+
```hocon
194+
sink {
195+
SensorsData {
196+
server_url = "http://10.1.136.63:8106/sa?project=default"
197+
time_free = true
198+
199+
record_type = events
200+
schema = events
201+
event_name = "${event_type}" # Use dynamic event name from data
202+
time_column = event_timestamp
203+
distinct_id_column = user_id
204+
identity_fields = [
205+
{ source = user_id, target = "$identity_login_id" }
206+
{ source = user_id, target = "$identity_distinct_id" }
207+
]
208+
property_fields = [
209+
{ target = "price", source = amount, type = DECIMAL }
210+
{ target = "category", source = product_category, type = STRING }
211+
{ target = "device", source = device_type, type = STRING }
212+
]
213+
instant_events = ["$AppStart", "$AppEnd"] # Mark specific events as instant
214+
}
215+
}
216+
```
217+
218+
### Profile Property Updates
219+
220+
```hocon
221+
sink {
222+
SensorsData {
223+
server_url = "http://10.1.136.63:8106/sa?project=default"
224+
time_free = true
225+
226+
entity_name = users
227+
record_type = profile
228+
schema = users
229+
distinct_id_column = user_id
230+
identity_fields = [
231+
{ source = email, target = "$identity_email" }
232+
{ source = phone, target = "$identity_phone" }
233+
]
234+
property_fields = [
235+
{ target = "name", source = full_name, type = STRING }
236+
{ target = "age", source = user_age, type = INT }
237+
{ target = "gender", source = user_gender, type = STRING }
238+
{ target = "location", source = user_location, type = STRING }
239+
]
240+
null_as_profile_unset = true # Remove properties when null
241+
}
242+
}
243+
```
244+
245+
### Item Tracking
246+
247+
```hocon
248+
sink {
249+
SensorsData {
250+
server_url = "http://10.1.136.63:8106/sa?project=default"
251+
time_free = true
159252
160-
### 2.3.8-beta 2024-11-13
253+
record_type = items
254+
schema = items
255+
event_name = "$ItemViewed"
256+
time_column = view_time
257+
distinct_id_column = user_id
258+
identity_fields = [
259+
{ source = user_id, target = "$identity_login_id" }
260+
]
261+
property_fields = [
262+
{ target = "view_duration", source = duration, type = INT }
263+
{ target = "referrer", source = referrer_url, type = STRING }
264+
]
265+
item_id_column = product_id
266+
item_type_column = product_type
267+
}
268+
}
269+
```
270+
271+
### Console Output (for Testing)
272+
273+
```hocon
274+
sink {
275+
SensorsData {
276+
server_url = "http://10.1.136.63:8106/sa?project=default"
277+
consumer = "console" # Output to console instead of sending to server
278+
record_type = events
279+
schema = events
280+
event_name = "$TestEvent"
281+
time_column = timestamp
282+
distinct_id_column = test_id
283+
property_fields = [
284+
{ target = "test", source = test_field, type = STRING }
285+
]
286+
}
287+
}
288+
```
289+
290+
## Changelog
161291

162-
- Add SensorsData Sink Connector
292+
<ChangeLog />
163293

plugin-mapping.properties

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
144144
seatunnel.source.GraphQL = connector-graphql
145145
seatunnel.sink.GraphQL = connector-graphql
146146
seatunnel.sink.Aerospike = connector-aerospike
147-
seatunnel.sink.SensorsData-SDK = connector-sensors-sdk
147+
seatunnel.sink.SensorsData = connector-sensorsdata
148148

149149
seatunnel.transform.Sql = seatunnel-transforms-v2
150150
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
@@ -163,4 +163,3 @@ seatunnel.transform.FieldRename = seatunnel-transforms-v2
163163
seatunnel.transform.TableRename = seatunnel-transforms-v2
164164
seatunnel.transform.TableMerge = seatunnel-transforms-v2
165165
seatunnel.transform.TableFilter = seatunnel-transforms-v2
166-
seatunnel.transform.SensorsDataJson = seatunnel-transforms-v2

seatunnel-connectors-v2/connector-sensorsdata/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,10 @@
5252
<version>${project.version}</version>
5353
</dependency>
5454

55+
<dependency>
56+
<groupId>com.sensorsdata.analytics.javasdk</groupId>
57+
<artifactId>SensorsAnalyticsSDK</artifactId>
58+
<version>3.6.9</version>
59+
</dependency>
5560
</dependencies>
5661
</project>

seatunnel-connectors-v2/connector-sensorsdata/src/main/java/org/apache/seatunnel/connectors/seatunnel/sensorsdata/sdk/config/SensorsDataSDKSinkConfig.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ public class SensorsDataSDKSinkConfig extends SensorsDataConfigBase {
3939

4040
public SensorsDataSDKSinkConfig(ReadonlyConfig config) {
4141
super(config);
42-
// server 相关
43-
this.serverUrl = config.get(SensorsDataSDKOptions.SERVER_URL);
44-
this.bulkSize = config.get(SensorsDataSDKOptions.BULK_SIZE);
45-
this.maxCacheRowSize = config.get(SensorsDataSDKOptions.MAX_CACHE_ROW_SIZE);
46-
this.consumer = config.get(SensorsDataSDKOptions.CONSUMER);
42+
// sensorsdata server
43+
this.serverUrl = config.get(SensorsDataSDKSinkOptions.SERVER_URL);
44+
this.bulkSize = config.get(SensorsDataSDKSinkOptions.BULK_SIZE);
45+
this.maxCacheRowSize = config.get(SensorsDataSDKSinkOptions.MAX_CACHE_ROW_SIZE);
46+
this.consumer = config.get(SensorsDataSDKSinkOptions.CONSUMER);
4747
this.instantEvents =
48-
Optional.ofNullable(config.get(SensorsDataSDKOptions.INSTANT_EVENT_LIST))
48+
Optional.ofNullable(config.get(SensorsDataSDKSinkOptions.INSTANT_EVENT_LIST))
4949
.orElse(new ArrayList<>());
5050
}
5151
}
Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,29 @@
2525
import java.util.List;
2626

2727
@SuppressWarnings("checkstyle:MagicNumber")
28-
public interface SensorsDataSDKOptions extends SensorsDataOptions {
28+
public interface SensorsDataSDKSinkOptions extends SensorsDataOptions {
2929

3030
Option<String> SERVER_URL =
3131
Options.key("server_url")
3232
.stringType()
3333
.noDefaultValue()
34-
.withDescription("格式:https://{ip}:8106/sa?project={project}");
34+
.withDescription("Format:https://{ip}:8106/sa?project={project}");
3535

3636
Option<Integer> BULK_SIZE =
3737
Options.key("bulk_size")
3838
.intType()
3939
.defaultValue(50)
40-
.withDescription("触发 flush 操作阈值,当内存缓存队列达到该值时,将缓存中的数据批量上报,默认 50");
40+
.withDescription(
41+
"Threshold for triggering flush operation. When the memory cache queue reaches this value, "
42+
+ "the data in the cache will be batch uploaded.");
4143

4244
Option<Integer> MAX_CACHE_ROW_SIZE =
4345
Options.key("max_cache_row_size")
4446
.intType()
4547
.defaultValue(0)
46-
.withDescription("最大缓存刷新大小,若超过该值,立即触发 flush 操作,默认为 0 ,根据 bulkSize 来进行判断");
48+
.withDescription(
49+
"Maximum cache refresh size. If it exceeds this value, the flush operation will "
50+
+ "be triggered immediately. The default value is 0, which depends on bulkSize.");
4751

4852
Option<String> CONSUMER =
4953
Options.key("consumer")
@@ -55,5 +59,6 @@ public interface SensorsDataSDKOptions extends SensorsDataOptions {
5559
Options.key("instant_events")
5660
.listType()
5761
.defaultValue(new ArrayList<>())
58-
.withDescription("即时事件的事件名列表,默认为空");
62+
.withDescription(
63+
"Given a list of event names, mark the event as an instant event.");
5964
}

0 commit comments

Comments
 (0)