Skip to content

Commit 81f928c

Browse files
authored
Merge branch 'main' into tchow/align-confs
2 parents 84d7004 + e3d313e commit 81f928c

File tree

5 files changed

+72
-59
lines changed

5 files changed

+72
-59
lines changed

api/src/main/scala/ai/chronon/api/Row.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,14 @@ object Row {
195195
debinarizer(value.asInstanceOf[BinaryType])
196196
}
197197

198+
case TimestampType =>
199+
guard {
200+
// Spark timestamps (which is used to map to the TimestampType are meant to be in micros
201+
// thus we multiply by 1000 to work with them
202+
case millis: Long => millis * 1000L
203+
case value => value
204+
}
205+
198206
case StringType => guard { value: Any => deStringer(value.asInstanceOf[StringType]) }
199207
case _ => passThroughFunc
200208
}

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ class DelegatingBigQueryMetastoreCatalog extends TableCatalog with SupportsNames
9797
.getOrElse(throw new NoSuchTableException(s"BigQuery table $identNoCatalog not found."))
9898
table.getDefinition.asInstanceOf[TableDefinition] match {
9999
case view: ViewDefinition => {
100-
connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
100+
throw new UnsupportedOperationException(
101+
s"Cannot load view $identNoCatalog as a table. Views are not supported in this catalog. ")
101102
}
102103
case externalTable: ExternalTableDefinition => {
103104
val uris = externalTable.getSourceUris.asScala

online/src/test/scala/ai/chronon/online/test/CatalystUtilComplexAvroTest.scala renamed to flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala

Lines changed: 59 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
1-
package ai.chronon.online.test
1+
package ai.chronon.flink.test.deser
22

3-
import ai.chronon.api.{StructType => ChrononStructType}
4-
import ai.chronon.online.CatalystUtil
5-
import ai.chronon.online.serde.AvroConversions
3+
import ai.chronon.api.{Accuracy, Builders, GroupBy}
4+
import ai.chronon.flink.deser.SourceProjectionDeserializationSchema
5+
import ai.chronon.online.serde.{AvroCodec, AvroSerDe}
66
import org.apache.avro.Schema
77
import org.apache.avro.generic.{GenericData, GenericRecord}
88
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
99
import org.apache.avro.specific.SpecificDatumWriter
10-
import org.apache.spark.sql.Row
11-
import org.apache.spark.sql.avro.AvroCatalystUtils
12-
import org.apache.spark.sql.catalyst.InternalRow
13-
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
10+
import org.apache.flink.api.common.functions.util.ListCollector
1411
import org.scalatest.flatspec.AnyFlatSpec
1512

1613
import java.io.ByteArrayOutputStream
1714
import java.time.Instant
15+
import java.util
1816
import java.util.{Base64, UUID, HashMap => JHashMap}
1917
import scala.collection.JavaConverters._
2018

@@ -23,38 +21,58 @@ import scala.collection.JavaConverters._
2321
class CatalystUtilComplexAvroTest extends AnyFlatSpec {
2422
import LargeEventPayloadGenerator._
2523

26-
val selects: Seq[(String, String)] = Map(
27-
"favorite" -> "IF(event = 'favorite', 1, 0)",
28-
"item_id" -> "EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes.sold_item_ids, attributes.item_id), ','), e -> CAST(e AS LONG)))",
29-
"ts" -> "timestamp",
30-
"add_cart" -> "IF(event = 'add_cart', 1, 0)",
31-
"purchase" -> "IF(event = 'payment', 1, 0)",
32-
"view" -> "IF(event = 'view', 1, 0)"
33-
).toSeq
34-
35-
val wheres: Seq[String] = Seq(
36-
"event in ('add_cart', 'view', 'payment', 'favorite')",
37-
"( (attributes.code in ('1', '3') AND attributes.test_code in ('1', '3')) OR ((NOT attributes.code IS NOT NULL) AND " +
38-
"(NOT attributes.test_code IS NOT NULL) AND attributes.region in ('US', 'CA', 'AU', 'MX', 'JP', 'NZ', 'BR', 'CN') AND logger = 'native' AND src in ('ios', 'android')) )",
39-
"attributes.canary IS NULL OR attributes.canary != 'true'",
40-
"attributes.support IS NULL OR attributes.support != 'true'"
41-
)
24+
val testGroupBy: GroupBy = {
25+
val selects = Map(
26+
"favorite" -> "IF(event = 'favorite', 1, 0)",
27+
"item_id" -> "EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes.sold_item_ids, attributes.item_id), ','), e -> CAST(e AS LONG)))",
28+
"add_cart" -> "IF(event = 'add_cart', 1, 0)",
29+
"purchase" -> "IF(event = 'payment', 1, 0)",
30+
"view" -> "IF(event = 'view', 1, 0)"
31+
)
32+
33+
val wheres: Seq[String] = Seq(
34+
"event in ('add_cart', 'view', 'payment', 'favorite')",
35+
"( (attributes.code in ('1', '3') AND attributes.test_code in ('1', '3')) OR ((NOT attributes.code IS NOT NULL) AND " +
36+
"(NOT attributes.test_code IS NOT NULL) AND attributes.region in ('US', 'CA', 'AU', 'MX', 'JP', 'NZ', 'BR', 'CN') AND logger = 'native' AND src in ('ios', 'android')) )",
37+
"attributes.canary IS NULL OR attributes.canary != 'true'",
38+
"attributes.support IS NULL OR attributes.support != 'true'"
39+
)
40+
41+
Builders.GroupBy(
42+
sources = Seq(
43+
Builders.Source.events(
44+
table = "events.my_item_data",
45+
topic = "my_item_data",
46+
query = Builders.Query(
47+
selects = selects,
48+
wheres = wheres,
49+
timeColumn = "unix_millis(TIMESTAMP(timestamp))",
50+
startPartition = "20231106"
51+
)
52+
)
53+
),
54+
keyColumns = Seq("item_id"),
55+
aggregations = Seq.empty,
56+
metaData = Builders.MetaData(
57+
name = "item_event_group_by"
58+
),
59+
accuracy = Accuracy.TEMPORAL
60+
)
61+
}
4262

4363
def processEvent(base64Payload: String): Seq[Map[String, Any]] = {
4464
val payloadBytes = java.util.Base64.getDecoder.decode(base64Payload)
65+
val avroSchema = AvroCodec.of(testSchema.toString).schema
66+
val avroSerDe = new AvroSerDe(avroSchema)
67+
68+
val deserSchema = new SourceProjectionDeserializationSchema(avroSerDe, testGroupBy)
69+
deserSchema.open(new DummyInitializationContext)
70+
val resultList = new util.ArrayList[Map[String, Any]]()
71+
val listCollector = new ListCollector(resultList)
4572

46-
val encoder = AvroCatalystUtils.buildEncoder(testSchema.toString)
47-
val sparkRowDeser = encoder.asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
48-
val avroDeserializer = AvroCatalystUtils.buildAvroDataToCatalyst(testSchema.toString)
49-
val internalRow = avroDeserializer.nullSafeEval(payloadBytes).asInstanceOf[InternalRow]
50-
val sparkRow = sparkRowDeser(internalRow)
51-
val chrononSchema =
52-
AvroConversions.toChrononSchema(testSchema).asInstanceOf[ChrononStructType]
53-
val eventExprEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
54-
val rowSerializer = eventExprEncoder.createSerializer()
55-
val cu = new CatalystUtil(chrononSchema, selects, wheres)
56-
val catalystInternalRow = rowSerializer(sparkRow)
57-
cu.performSql(catalystInternalRow).toSeq
73+
deserSchema.deserialize(payloadBytes, listCollector)
74+
75+
resultList.asScala
5876
}
5977

6078
private def validateQueryResults(result: Seq[Map[String, Any]],
@@ -68,6 +86,7 @@ class CatalystUtilComplexAvroTest extends AnyFlatSpec {
6886
assert(result.map(r => r("add_cart")).toSet == Set(if (isAddCart) 1 else 0))
6987
assert(result.map(r => r("purchase")).toSet == Set(if (isPurchase) 1 else 0))
7088
assert(result.map(r => r("view")).toSet == Set(if (isView) 1 else 0))
89+
assert(result.map(r => r("ts")).toSet == Set(fixedTimestamp, fixedTimestamp))
7190
}
7291

7392
it should "match event condition (add_cart)" in {
@@ -185,19 +204,21 @@ class CatalystUtilComplexAvroTest extends AnyFlatSpec {
185204

186205
object LargeEventPayloadGenerator {
187206
val testSchema: Schema = new Schema.Parser().parse(
188-
"""{"type":"record","name":"LargeEvent","namespace":"com.customer","fields":[{"name":"event","type":"string"},{"name":"timestamp","type":"long"},{"name":"browser","type":["null","string"],"default":null},{"name":"primary","type":"boolean"},{"name":"id","type":"string"},{"name":"page_id","type":"string"},{"name":"logger","type":"string"},{"name":"src","type":"string"},{"name":"ip","type":"string"},{"name":"user_agent","type":"string"},{"name":"loc","type":"string"},{"name":"ref","type":"string"},{"name":"cookie_map","type":["null",{"type":"map","values":"string"}],"default":null},{"name":"ab","type":["null",{"type":"map","values":{"type":"array","items":"string"}}],"default":null},{"name":"user","type":["null","long"],"default":null},{"name":"mobile_request","type":["null","boolean"],"default":null},{"name":"mobile_device","type":["null","boolean"],"default":null},{"name":"mobile_template","type":["null","boolean"],"default":null},{"name":"currency","type":["null","string"],"default":null},{"name":"language","type":["null","string"],"default":null},{"name":"region","type":["null","string"],"default":null},{"name":"item_ids","type":["null",{"type":"array","items":"long"}],"default":null},{"name":"event_timestamp","type":["null","long"],"default":null},{"name":"attrs","type":["null",{"type":"map","values":"string"}],"default":null},{"name":"attributes","type":{"type":"record","name":"Attributes","fields":[{"name":"code","type":["null","string"],"default":null},{"name":"test_code","type":["null","string"],"default":null},{"name":"region","type":["null","string"],"default":null},{"name":"canary","type":["null","string"],"default":null},{"name":"support","type":["null","string"],"default":null},{"name":"item_id","type":["null","string"],"default":null},{"name":"sold_item_ids","type":["null","string"],"default":null}]}}]}"""
207+
"""{"type":"record","name":"LargeEvent","namespace":"com.customer","fields":[{"name":"event","type":"string"},{ "name": "timestamp", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }], "default": null },{"name":"browser","type":["null","string"],"default":null},{"name":"primary","type":"boolean"},{"name":"id","type":"string"},{"name":"page_id","type":"string"},{"name":"logger","type":"string"},{"name":"src","type":"string"},{"name":"ip","type":"string"},{"name":"user_agent","type":"string"},{"name":"loc","type":"string"},{"name":"ref","type":"string"},{"name":"cookie_map","type":["null",{"type":"map","values":"string"}],"default":null},{"name":"ab","type":["null",{"type":"map","values":{"type":"array","items":"string"}}],"default":null},{"name":"user","type":["null","long"],"default":null},{"name":"mobile_request","type":["null","boolean"],"default":null},{"name":"mobile_device","type":["null","boolean"],"default":null},{"name":"mobile_template","type":["null","boolean"],"default":null},{"name":"currency","type":["null","string"],"default":null},{"name":"language","type":["null","string"],"default":null},{"name":"region","type":["null","string"],"default":null},{"name":"item_ids","type":["null",{"type":"array","items":"long"}],"default":null},{"name":"event_timestamp","type":["null","long"],"default":null},{"name":"attrs","type":["null",{"type":"map","values":"string"}],"default":null},{"name":"attributes","type":{"type":"record","name":"Attributes","fields":[{"name":"code","type":["null","string"],"default":null},{"name":"test_code","type":["null","string"],"default":null},{"name":"region","type":["null","string"],"default":null},{"name":"canary","type":["null","string"],"default":null},{"name":"support","type":["null","string"],"default":null},{"name":"item_id","type":["null","string"],"default":null},{"name":"sold_item_ids","type":["null","string"],"default":null}]}}]}"""
189208
)
190209

191210
// Create writer for serializing records
192211
val writer = new SpecificDatumWriter[GenericRecord](testSchema)
193212

213+
val fixedTimestamp: Long = Instant.parse("2025-06-06T00:00:00Z").toEpochMilli
214+
194215
// Function to create a base event record
195216
def createBaseEvent(): GenericRecord = {
196217
val record = new GenericData.Record(testSchema)
197218

198219
// Set default values
199220
record.put("event", "view")
200-
record.put("timestamp", Instant.now().toEpochMilli)
221+
record.put("timestamp", fixedTimestamp)
201222
record.put("browser", "test-browser-id")
202223
record.put("primary", true)
203224
record.put("id", s"test-id-${UUID.randomUUID().toString.take(8)}")

online/src/main/scala/ai/chronon/online/CatalystUtil.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import ai.chronon.online.Extensions.StructTypeOps
2222
import ai.chronon.online.serde._
2323
import org.apache.spark.sql.catalyst.InternalRow
2424
import org.apache.spark.sql.catalyst.analysis.FunctionAlreadyExistsException
25+
import org.apache.spark.sql.internal.SQLConf
2526
import org.apache.spark.sql.{SparkSession, types}
2627
import org.slf4j.LoggerFactory
2728

@@ -47,6 +48,8 @@ object CatalystUtil {
4748
// The default doesn't seem to be set properly in the scala 2.13 version of spark
4849
// running into this issue https://github.com/dotnet/spark/issues/435
4950
.config("spark.driver.bindAddress", "127.0.0.1")
51+
.config(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
52+
.config(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, false)
5053
.enableHiveSupport() // needed to support registering Hive UDFs via CREATE FUNCTION.. calls
5154
.getOrCreate()
5255
assert(spark.sessionState.conf.wholeStageEnabled)

online/src/main/scala/org/apache/spark/sql/avro/AvroCatalystUtils.scala

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)