Commit 1b3cd90

cshuo authored and rahil-c committed
[HUDI-9672] Disable skipping clustering for spark incremental query to avoid data duplication (#13659)
1 parent 07a89c4 commit 1b3cd90

2 files changed: 40 additions, 48 deletions


hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala

Lines changed: 5 additions & 9 deletions
@@ -60,13 +60,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
     if (fullTableScan) {
       metaClient.getCommitsAndCompactionTimeline
     } else {
-      val completeTimeline =
-        metaClient.getCommitsTimeline.filterCompletedInstants()
-          .findInstantsInRangeByCompletionTime(startCompletionTime, endCompletionTime)
-
-      // Need to add pending compaction instants to avoid data missing, see HUDI-5990 for details.
-      val pendingCompactionTimeline = metaClient.getCommitsAndCompactionTimeline.filterPendingMajorOrMinorCompactionTimeline()
-      concatTimeline(completeTimeline, pendingCompactionTimeline, metaClient)
+      queryContext.getActiveTimeline
     }
   }

@@ -200,8 +194,10 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation {
       .metaClient(metaClient)
       .startCompletionTime(optParams(DataSourceReadOptions.START_COMMIT.key))
       .endCompletionTime(optParams.getOrElse(DataSourceReadOptions.END_COMMIT.key, null))
-      .skipClustering(optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key(),
-        String.valueOf(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.defaultValue)).toBoolean)
+      // do not support skip cluster for spark incremental query yet to avoid data duplication problem,
+      // see details in HUDI-9672.
+      // .skipClustering(optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key(),
+      //   String.valueOf(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.defaultValue)).toBoolean)
       .skipCompaction(optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.key(),
         String.valueOf(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.defaultValue)).toBoolean)
       .rangeType(rangeType)

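For context, here is a minimal sketch (not part of this commit) of how a Spark incremental read on a Hudi table might be configured, using the DataSourceReadOptions constants that appear in the diff above; the spark session, tablePath, and the commit-time values are placeholders. After this change, INCREMENTAL_READ_SKIP_CLUSTER no longer takes effect for Spark incremental queries, while INCREMENTAL_READ_SKIP_COMPACT is still honored.

import org.apache.hudi.DataSourceReadOptions

// Sketch of an incremental read; paths and times below are placeholders.
val incrementalDf = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  // Completion-time range to read from (placeholder values).
  .option(DataSourceReadOptions.START_COMMIT.key, "20240101000000000")
  .option(DataSourceReadOptions.END_COMMIT.key, "20240102000000000")
  // Still honored: whether to skip compaction instants.
  .option(DataSourceReadOptions.INCREMENTAL_READ_SKIP_COMPACT.key, "false")
  // Ignored after this commit: clustering (replace) commits are always included,
  // because skipping them could duplicate rows in the result (HUDI-9672).
  .option(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key, "true")
  .load(tablePath)

incrementalDf.show()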
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala

Lines changed: 35 additions & 39 deletions
@@ -213,54 +213,50 @@ class TestStreamingSource extends StreamTest {
   }
 
   test("Test mor streaming source with clustering") {
-    Array("true", "false").foreach(skipCluster => {
-      withTempDir { inputDir =>
-        val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream_cluster"
-        val metaClient = HoodieTableMetaClient.newTableBuilder()
-          .setTableType(MERGE_ON_READ)
-          .setTableName(getTableName(tablePath))
-          .setRecordKeyFields("id")
-          .setPreCombineField("ts")
-          .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream_cluster"
+      val metaClient = HoodieTableMetaClient.newTableBuilder()
+        .setTableType(MERGE_ON_READ)
+        .setTableName(getTableName(tablePath))
+        .setRecordKeyFields("id")
+        .setPreCombineField("ts")
+        .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)
 
-        addData(tablePath, Seq(("1", "a1", "10", "000")))
-        addData(tablePath, Seq(("2", "a1", "11", "001")))
-        addData(tablePath, Seq(("3", "a1", "12", "002")))
-        addData(tablePath, Seq(("4", "a1", "13", "003")), enableInlineCluster = true)
-        addData(tablePath, Seq(("5", "a1", "14", "004")))
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      addData(tablePath, Seq(("2", "a1", "11", "001")))
+      addData(tablePath, Seq(("3", "a1", "12", "002")))
+      addData(tablePath, Seq(("4", "a1", "13", "003")), enableInlineCluster = true)
+      addData(tablePath, Seq(("5", "a1", "14", "004")))
 
-        val timestamp =
-          metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
-            .firstInstant().get().getCompletionTime
+      val timestamp =
+        metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+          .firstInstant().get().getCompletionTime
 
-        val df = spark.readStream
-          .format("org.apache.hudi")
-          .option(START_OFFSET.key(), timestamp)
-          .option(DataSourceReadOptions.INCREMENTAL_READ_SKIP_CLUSTER.key(), skipCluster)
-          .load(tablePath)
-          .select("id", "name", "price", "ts")
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .option(START_OFFSET.key(), timestamp)
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
 
-        testStream(df)(
-          AssertOnQuery { q => q.processAllAvailable(); true },
-          // Start after the first commit
-          CheckAnswerRows(Seq(
-            Row("2", "a1", "11", "001"),
-            Row("3", "a1", "12", "002"),
-            Row("4", "a1", "13", "003"),
-            Row("5", "a1", "14", "004")), lastOnly = true, isSorted = false)
-        )
-        assertTrue(metaClient.reloadActiveTimeline
-          .filter(JavaConversions.getPredicate(
-            e => e.isCompleted && HoodieTimeline.REPLACE_COMMIT_ACTION.equals(e.getAction)))
-          .countInstants() > 0)
-      }
-    })
+      testStream(df)(
+        AssertOnQuery { q => q.processAllAvailable(); true },
+        // Start after the first commit
+        CheckAnswerRows(Seq(
+          Row("2", "a1", "11", "001"),
+          Row("3", "a1", "12", "002"),
+          Row("4", "a1", "13", "003"),
+          Row("5", "a1", "14", "004")), lastOnly = true, isSorted = false))
+      assertTrue(metaClient.reloadActiveTimeline
+        .filter(JavaConversions.getPredicate(
+          e => e.isCompleted && HoodieTimeline.REPLACE_COMMIT_ACTION.equals(e.getAction)))
+        .countInstants() > 0)
+    }
   }
 
   test("test mor stream source with compaction") {
     Array("true", "false").foreach(skipCompact => {
       withTempDir { inputDir =>
-        val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
+        val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream_$skipCompact"
         val metaClient = HoodieTableMetaClient.newTableBuilder()
           .setTableType(MERGE_ON_READ)
           .setTableName(getTableName(tablePath))
