Skip to content

Commit a93d300

Browse files
authored
feat: Update Spark Compute read source node to be able to use other data sources (#5445)
* feat: Make Spark Compute able to use other source Signed-off-by: HaoXuAI <[email protected]> * feat: Make Spark Compute able to use other source Signed-off-by: HaoXuAI <[email protected]> --------- Signed-off-by: HaoXuAI <[email protected]>
1 parent 6c94dbf commit a93d300

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

sdk/python/feast/infra/compute_engines/spark/feature_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def build_source_node(self):
2929
source = self.feature_view.batch_source
3030
start_time = self.task.start_time
3131
end_time = self.task.end_time
32-
node = SparkReadNode("source", source, start_time, end_time)
32+
node = SparkReadNode("source", source, self.spark_session, start_time, end_time)
3333
self.nodes.append(node)
3434
return node
3535

sdk/python/feast/infra/compute_engines/spark/nodes.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@ def __init__(
5656
self,
5757
name: str,
5858
source: DataSource,
59+
spark_session: SparkSession,
5960
start_time: Optional[datetime] = None,
6061
end_time: Optional[datetime] = None,
6162
):
6263
super().__init__(name)
6364
self.source = source
65+
self.spark_session = spark_session
6466
self.start_time = start_time
6567
self.end_time = end_time
6668

@@ -72,7 +74,10 @@ def execute(self, context: ExecutionContext) -> DAGValue:
7274
start_time=self.start_time,
7375
end_time=self.end_time,
7476
)
75-
spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df()
77+
if isinstance(retrieval_job, SparkRetrievalJob):
78+
spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df()
79+
else:
80+
spark_df = self.spark_session.createDataFrame(retrieval_job.to_arrow())
7681

7782
return DAGValue(
7883
data=spark_df,

0 commit comments

Comments
 (0)