Skip to content

Commit f8d3890

Browse files
authored
fix: Snowflake remote storage (#3574)
* fix: Snowflake remote storage Signed-off-by: adamschmidt <[email protected]> * fix: lint Signed-off-by: adamschmidt <[email protected]> * fix: field string build Signed-off-by: adamschmidt <[email protected]> * fix: join typo Signed-off-by: adamschmidt <[email protected]> * fix: formatting Signed-off-by: adamschmidt <[email protected]> --------- Signed-off-by: adamschmidt <[email protected]>
1 parent 8b90e2f commit f8d3890

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel):
109109
blob_export_location: Optional[str] = None
110110
""" Location (in S3, Google storage or Azure storage) where data is offloaded """
111111

112+
convert_timestamp_columns: Optional[bool] = None
113+
""" Convert timestamp columns on export to a Parquet-supported format """
114+
112115
class Config:
113116
allow_population_by_field_name = True
114117

@@ -152,6 +155,29 @@ def pull_latest_from_table_or_query(
152155
+ '"'
153156
)
154157

158+
if config.offline_store.convert_timestamp_columns:
159+
select_fields = list(
160+
map(
161+
lambda field_name: f'"{field_name}"',
162+
join_key_columns + feature_name_columns,
163+
)
164+
)
165+
select_timestamps = list(
166+
map(
167+
lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}",
168+
timestamp_columns,
169+
)
170+
)
171+
inner_field_string = ", ".join(select_fields + select_timestamps)
172+
else:
173+
select_fields = list(
174+
map(
175+
lambda field_name: f'"{field_name}"',
176+
join_key_columns + feature_name_columns + timestamp_columns,
177+
)
178+
)
179+
inner_field_string = ", ".join(select_fields)
180+
155181
if data_source.snowflake_options.warehouse:
156182
config.offline_store.warehouse = data_source.snowflake_options.warehouse
157183

@@ -166,7 +192,7 @@ def pull_latest_from_table_or_query(
166192
{field_string}
167193
{f''', TRIM({repr(DUMMY_ENTITY_VAL)}::VARIANT,'"') AS "{DUMMY_ENTITY_ID}"''' if not join_key_columns else ""}
168194
FROM (
169-
SELECT {field_string},
195+
SELECT {inner_field_string},
170196
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row"
171197
FROM {from_expression}
172198
WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
@@ -533,7 +559,7 @@ def to_remote_storage(self) -> List[str]:
533559
self.to_snowflake(table)
534560

535561
query = f"""
536-
COPY INTO '{self.config.offline_store.blob_export_location}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
562+
COPY INTO '{self.export_path}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
537563
STORAGE_INTEGRATION = {self.config.offline_store.storage_integration_name}\n
538564
FILE_FORMAT = (TYPE = PARQUET)
539565
DETAILED_OUTPUT = TRUE

0 commit comments

Comments
 (0)