Skip to content

Commit f287421

Browse files
varant-zlaiezvz
andauthored
Moving to drop table instead of archive (#1074)
## Summary Since BQMS does not support table rename, dropping table instead of archiving it. ## Checklist - [ ] Added Unit Tests - [x] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - Behavior Changes - On schema changes detected during join output, the system now drops the existing table before saving new data, rather than archiving it. - Ensures the latest schema is applied immediately on reload, without retaining archived copies. - No changes to user-facing flows; compute, save, and reload continue as before. - Log messages updated to reflect the drop-on-schema-change action. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Co-authored-by: ezvz <[email protected]>
1 parent acb8333 commit f287421

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

spark/src/main/scala/ai/chronon/spark/JoinBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ abstract class JoinBase(val joinConfCloned: api.Join,
241241
dfOpt.map { df =>
242242
val table = joinMetaData.outputTable
243243

244-
tableUtils.archiveTableOnSchemaChange(table, df)
244+
tableUtils.dropTableOnSchemaChange(table, df)
245245
df.save(table)
246246

247247
tableUtils.loadTable(table, range.whereClauses)

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
476476
}
477477
}
478478

479-
def archiveTableOnSchemaChange(tableName: String, incomingDf: DataFrame): Unit = {
479+
def dropTableOnSchemaChange(tableName: String, incomingDf: DataFrame): Unit = {
480480
if (!tableReachable(tableName)) return
481481

482482
val existingSchema = loadTable(tableName).schema
@@ -527,7 +527,7 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
527527
|""".stripMargin)
528528

529529
if (addedCols.nonEmpty || removedCols.nonEmpty || updatedCols.nonEmpty) {
530-
archiveTableIfExists(tableName, None)
530+
dropTableIfExists(tableName)
531531
}
532532

533533
}

0 commit comments

Comments
 (0)