Skip to content

Commit bab6644

Browse files
gbmarc1Marc-Antoine Belanger
andauthored
feat: Adding query timeout to to_df and to_arrow retrieval methods (#3505)
* feat:Adding query timeout to `to_df` and `to_arrow` retrieval methods Signed-off-by: gbmarc1 <[email protected]> * feat: add regex requirement for trino Signed-off-by: gbmarc1 <[email protected]> * build: lock deps with new regex requirement Signed-off-by: Marc-Antoine Belanger <[email protected]> --------- Signed-off-by: gbmarc1 <[email protected]> Signed-off-by: Marc-Antoine Belanger <[email protected]> Co-authored-by: Marc-Antoine Belanger <[email protected]>
1 parent 0c81431 commit bab6644

File tree

19 files changed

+437
-121
lines changed

19 files changed

+437
-121
lines changed

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,11 @@ def full_feature_names(self) -> bool:
441441
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
442442
return self._on_demand_feature_views
443443

444-
def _to_df_internal(self) -> pd.DataFrame:
444+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
445445
with self._query_generator() as query:
446-
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
446+
df = self._execute_query(query=query, timeout=timeout).to_dataframe(
447+
create_bqstorage_client=True
448+
)
447449
return df
448450

449451
def to_sql(self) -> str:
@@ -507,15 +509,15 @@ def to_bigquery(
507509
print(f"Done writing to '{dest}'.")
508510
return str(dest)
509511

510-
def _to_arrow_internal(self) -> pyarrow.Table:
512+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
511513
with self._query_generator() as query:
512-
q = self._execute_query(query=query)
514+
q = self._execute_query(query=query, timeout=timeout)
513515
assert q
514516
return q.to_arrow()
515517

516518
@log_exceptions_and_usage
517519
def _execute_query(
518-
self, query, job_config=None, timeout: int = 1800
520+
self, query, job_config=None, timeout: Optional[int] = None
519521
) -> Optional[bigquery.job.query.QueryJob]:
520522
bq_job = self.client.query(query, job_config=job_config)
521523

@@ -525,7 +527,7 @@ def _execute_query(
525527
)
526528
return None
527529

528-
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
530+
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800)
529531
return bq_job
530532

531533
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ def get_temp_table_dml_header(
375375
return temp_table_dml_header
376376

377377
@log_exceptions_and_usage
378-
def _to_df_internal(self) -> pd.DataFrame:
378+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
379379
with self._query_generator() as query:
380380
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
381381
temp_external_location = self.get_temp_s3_path()
@@ -392,7 +392,7 @@ def _to_df_internal(self) -> pd.DataFrame:
392392
)
393393

394394
@log_exceptions_and_usage
395-
def _to_arrow_internal(self) -> pa.Table:
395+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
396396
with self._query_generator() as query:
397397
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
398398
temp_external_location = self.get_temp_s3_path()

sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ def __init__(
327327
engine: Engine,
328328
config: MsSqlServerOfflineStoreConfig,
329329
full_feature_names: bool,
330-
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
330+
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
331331
metadata: Optional[RetrievalMetadata] = None,
332332
drop_columns: Optional[List[str]] = None,
333333
):
@@ -347,10 +347,10 @@ def full_feature_names(self) -> bool:
347347
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
348348
return self._on_demand_feature_views
349349

350-
def _to_df_internal(self) -> pandas.DataFrame:
350+
def _to_df_internal(self, timeout: Optional[int] = None) -> pandas.DataFrame:
351351
return pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
352352

353-
def _to_arrow_internal(self) -> pyarrow.Table:
353+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
354354
result = pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
355355
return pyarrow.Table.from_pandas(result)
356356

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def __init__(
241241
query: Union[str, Callable[[], ContextManager[str]]],
242242
config: RepoConfig,
243243
full_feature_names: bool,
244-
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
244+
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
245245
metadata: Optional[RetrievalMetadata] = None,
246246
):
247247
if not isinstance(query, str):
@@ -267,15 +267,15 @@ def full_feature_names(self) -> bool:
267267
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
268268
return self._on_demand_feature_views
269269

270-
def _to_df_internal(self) -> pd.DataFrame:
270+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
271271
# We use arrow format because it gives better control of the table schema
272272
return self._to_arrow_internal().to_pandas()
273273

274274
def to_sql(self) -> str:
275275
with self._query_generator() as query:
276276
return query
277277

278-
def _to_arrow_internal(self) -> pa.Table:
278+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
279279
with self._query_generator() as query:
280280
with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
281281
conn.set_session(readonly=True)

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,13 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
336336
*_, last = map(self.spark_session.sql, statements)
337337
return last
338338

339-
def _to_df_internal(self) -> pd.DataFrame:
339+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
340340
"""Return dataset as Pandas DataFrame synchronously"""
341341
return self.to_spark_df().toPandas()
342342

343-
def _to_arrow_internal(self) -> pyarrow.Table:
343+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
344344
"""Return dataset as pyarrow Table synchronously"""
345-
return pyarrow.Table.from_pandas(self._to_df_internal())
345+
return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))
346346

347347
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
348348
"""

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,16 @@ def full_feature_names(self) -> bool:
8585
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
8686
return self._on_demand_feature_views
8787

88-
def _to_df_internal(self) -> pd.DataFrame:
88+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
8989
"""Return dataset as Pandas DataFrame synchronously including on demand transforms"""
9090
results = self._client.execute_query(query_text=self._query)
9191
self.pyarrow_schema = results.pyarrow_schema
9292
return results.to_dataframe()
9393

94-
def _to_arrow_internal(self) -> pyarrow.Table:
94+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
9595
"""Return payrrow dataset as synchronously including on demand transforms"""
9696
return pyarrow.Table.from_pandas(
97-
self._to_df_internal(), schema=self.pyarrow_schema
97+
self._to_df_internal(timeout=timeout), schema=self.pyarrow_schema
9898
)
9999

100100
def to_sql(self) -> str:

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
7676
return self._on_demand_feature_views
7777

7878
@log_exceptions_and_usage
79-
def _to_df_internal(self) -> pd.DataFrame:
79+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
8080
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
8181
df = self.evaluation_function().compute()
8282
df = df.reset_index(drop=True)
8383
return df
8484

8585
@log_exceptions_and_usage
86-
def _to_arrow_internal(self):
86+
def _to_arrow_internal(self, timeout: Optional[int] = None):
8787
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
8888
df = self.evaluation_function().compute()
8989
return pyarrow.Table.from_pandas(df)

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ class RetrievalJob(ABC):
6262
"""A RetrievalJob manages the execution of a query to retrieve data from the offline store."""
6363

6464
def to_df(
65-
self, validation_reference: Optional["ValidationReference"] = None
65+
self,
66+
validation_reference: Optional["ValidationReference"] = None,
67+
timeout: Optional[int] = None,
6668
) -> pd.DataFrame:
6769
"""
6870
Synchronously executes the underlying query and returns the result as a pandas dataframe.
@@ -72,8 +74,9 @@ def to_df(
7274
7375
Args:
7476
validation_reference (optional): The validation to apply against the retrieved dataframe.
77+
timeout (optional): The query timeout if applicable.
7578
"""
76-
features_df = self._to_df_internal()
79+
features_df = self._to_df_internal(timeout=timeout)
7780

7881
if self.on_demand_feature_views:
7982
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
@@ -101,7 +104,9 @@ def to_df(
101104
return features_df
102105

103106
def to_arrow(
104-
self, validation_reference: Optional["ValidationReference"] = None
107+
self,
108+
validation_reference: Optional["ValidationReference"] = None,
109+
timeout: Optional[int] = None,
105110
) -> pyarrow.Table:
106111
"""
107112
Synchronously executes the underlying query and returns the result as an arrow table.
@@ -111,11 +116,12 @@ def to_arrow(
111116
112117
Args:
113118
validation_reference (optional): The validation to apply against the retrieved dataframe.
119+
timeout (optional): The query timeout if applicable.
114120
"""
115121
if not self.on_demand_feature_views and not validation_reference:
116-
return self._to_arrow_internal()
122+
return self._to_arrow_internal(timeout=timeout)
117123

118-
features_df = self._to_df_internal()
124+
features_df = self._to_df_internal(timeout=timeout)
119125
if self.on_demand_feature_views:
120126
for odfv in self.on_demand_feature_views:
121127
features_df = features_df.join(
@@ -147,20 +153,24 @@ def to_sql(self) -> str:
147153
pass
148154

149155
@abstractmethod
150-
def _to_df_internal(self) -> pd.DataFrame:
156+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
151157
"""
152158
Synchronously executes the underlying query and returns the result as a pandas dataframe.
153159
160+
timeout: RetreivalJob implementations may implement a timeout.
161+
154162
Does not handle on demand transformations or dataset validation. For either of those,
155163
`to_df` should be used.
156164
"""
157165
pass
158166

159167
@abstractmethod
160-
def _to_arrow_internal(self) -> pyarrow.Table:
168+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
161169
"""
162170
Synchronously executes the underlying query and returns the result as an arrow table.
163171
172+
timeout: RetreivalJob implementations may implement a timeout.
173+
164174
Does not handle on demand transformations or dataset validation. For either of those,
165175
`to_arrow` should be used.
166176
"""

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
400400
return self._on_demand_feature_views
401401

402402
@log_exceptions_and_usage
403-
def _to_df_internal(self) -> pd.DataFrame:
403+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
404404
with self._query_generator() as query:
405405
return aws_utils.unload_redshift_query_to_df(
406406
self._redshift_client,
@@ -414,7 +414,7 @@ def _to_df_internal(self) -> pd.DataFrame:
414414
)
415415

416416
@log_exceptions_and_usage
417-
def _to_arrow_internal(self) -> pa.Table:
417+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
418418
with self._query_generator() as query:
419419
return aws_utils.unload_redshift_query_to_pa(
420420
self._redshift_client,

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ def full_feature_names(self) -> bool:
410410
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
411411
return self._on_demand_feature_views
412412

413-
def _to_df_internal(self) -> pd.DataFrame:
413+
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
414414
with self._query_generator() as query:
415415

416416
df = execute_snowflake_statement(
@@ -419,7 +419,7 @@ def _to_df_internal(self) -> pd.DataFrame:
419419

420420
return df
421421

422-
def _to_arrow_internal(self) -> pyarrow.Table:
422+
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
423423
with self._query_generator() as query:
424424

425425
pa_table = execute_snowflake_statement(

0 commit comments

Comments
 (0)