
Commit e8e643e

fix: Clean up snowflake to_spark_df() (feast-dev#3607)
Signed-off-by: Miles Adkins <[email protected]>
1 parent: 902f23f

4 files changed, +15 −33 lines changed

docs/reference/offline-stores/overview.md

Lines changed: 3 additions & 3 deletions

@@ -46,11 +46,11 @@ Below is a matrix indicating which `RetrievalJob`s support what functionality.
 | --------------------------------- | --- | --- | --- | --- | --- | --- | --- |
 | export to dataframe | yes | yes | yes | yes | yes | yes | yes |
 | export to arrow table | yes | yes | yes | yes | yes | yes | yes |
-| export to arrow batches | no | no | yes | yes | no | no | no |
-| export to SQL | no | yes | yes | yes | yes | no | yes |
+| export to arrow batches | no | no | no | yes | no | no | no |
+| export to SQL | no | yes | yes | yes | yes | no | yes |
 | export to data lake (S3, GCS, etc.) | no | no | yes | no | yes | no | no |
 | export to data warehouse | no | yes | yes | yes | yes | no | no |
-| export as Spark dataframe | no | no | yes | no | no | yes | no |
+| export as Spark dataframe | no | no | yes | no | no | yes | no |
 | local execution of Python-based on-demand transforms | yes | yes | yes | yes | yes | no | yes |
 | remote execution of Python-based on-demand transforms | no | no | no | no | no | no | no |
 | persist results in the offline store | yes | yes | yes | yes | yes | yes | no |

docs/reference/offline-stores/snowflake.md

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ Below is a matrix indicating which functionality is supported by `SnowflakeRetrievalJob`.
 | ----------------------------------------------------- | --------- |
 | export to dataframe | yes |
 | export to arrow table | yes |
-| export to arrow batches | yes |
+| export to arrow batches | yes |
 | export to SQL | yes |
 | export to data lake (S3, GCS, etc.) | yes |
 | export to data warehouse | yes |

sdk/python/feast/errors.py

Lines changed: 0 additions & 8 deletions

@@ -56,14 +56,6 @@ def __init__(self, name, project=None):
         super().__init__(f"Feature view {name} does not exist")


-class InvalidSparkSessionException(Exception):
-    def __init__(self, spark_arg):
-        super().__init__(
-            f" Need Spark Session to convert results to spark data frame\
-            recieved {type(spark_arg)} instead. "
-        )
-
-
 class OnDemandFeatureViewNotFoundException(FeastObjectNotFoundException):
     def __init__(self, name, project=None):
         if project:
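
With InvalidSparkSessionException deleted here and the isinstance check removed from to_spark_df() (see the next file), passing something other than a SparkSession is assumed to surface as a plain AttributeError on the first attribute access rather than a Feast-specific error. A tiny illustration of that assumed behavior:

# Assumed post-change behavior (illustrative, not from the commit):
job.to_spark_df("not a session")
# AttributeError: 'str' object has no attribute 'conf'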

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 11 additions & 21 deletions

@@ -28,11 +28,7 @@

 from feast import OnDemandFeatureView
 from feast.data_source import DataSource
-from feast.errors import (
-    EntitySQLEmptyResults,
-    InvalidEntityType,
-    InvalidSparkSessionException,
-)
+from feast.errors import EntitySQLEmptyResults, InvalidEntityType
 from feast.feature_logging import LoggingConfig, LoggingSource
 from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
 from feast.infra.offline_stores import offline_utils

@@ -528,28 +524,22 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame":
         """

         try:
-            from pyspark.sql import DataFrame, SparkSession
+            from pyspark.sql import DataFrame
         except ImportError as e:
             from feast.errors import FeastExtrasDependencyImportError

             raise FeastExtrasDependencyImportError("spark", str(e))

-        if isinstance(spark_session, SparkSession):
-            arrow_batches = self.to_arrow_batches()
+        spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

-            if arrow_batches:
-                spark_df = reduce(
-                    DataFrame.unionAll,
-                    [
-                        spark_session.createDataFrame(batch.to_pandas())
-                        for batch in arrow_batches
-                    ],
-                )
-                return spark_df
-            else:
-                raise EntitySQLEmptyResults(self.to_sql())
-        else:
-            raise InvalidSparkSessionException(spark_session)
+        # This can be improved by parallelizing the read of chunks
+        pandas_batches = self.to_pandas_batches()
+
+        spark_df = reduce(
+            DataFrame.unionAll,
+            [spark_session.createDataFrame(batch) for batch in pandas_batches],
+        )
+        return spark_df

     def persist(
         self,
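
For context, here is a minimal usage sketch of the cleaned-up method. It is not part of the commit; the repo path, entity dataframe, and feature reference are hypothetical:

import pandas as pd
from pyspark.sql import SparkSession
from feast import FeatureStore

spark = SparkSession.builder.appName("feast-to-spark-example").getOrCreate()
store = FeatureStore(repo_path=".")  # assumes a Snowflake offline store is configured

# Hypothetical entity dataframe
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]),
    }
)

# With a Snowflake offline store, get_historical_features() returns a
# SnowflakeRetrievalJob; to_spark_df() now unions its pandas batches
# into a single Spark DataFrame.
job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature reference
)
spark_df = job.to_spark_df(spark)
spark_df.show()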

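The new "# This can be improved by parallelizing the read of chunks" comment points at an obvious follow-up. A rough sketch of one approach, assuming direct access to a snowflake-connector-python cursor whose get_result_batches() returns independently downloadable ResultBatch objects; the function and its cursor argument are hypothetical, not Feast API:

from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def to_spark_df_parallel(cursor, spark_session):
    """Hypothetical variant: fetch Snowflake result chunks concurrently,
    then union them into one Spark DataFrame."""
    from pyspark.sql import DataFrame

    spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # get_result_batches() is part of snowflake-connector-python; each
    # ResultBatch downloads its own chunk when to_pandas() is called.
    batches = cursor.get_result_batches()

    # Convert chunks to pandas in parallel threads, then union on the driver.
    with ThreadPoolExecutor(max_workers=8) as pool:
        pandas_chunks = list(pool.map(lambda b: b.to_pandas(), batches))

    return reduce(
        DataFrame.unionAll,
        [spark_session.createDataFrame(chunk) for chunk in pandas_chunks],
    )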