Skip to content

Commit 01ab462

Browse files
franciscojavierarceokysersozeleekm-zlatan-el
authored
fix: Updating the batch field so that you can query create and event date. (#3411)
* fix: Assertion condition when value is 0 (#3401) * fix: Add assertion condition when value is 0 Signed-off-by: zlatan.el <[email protected]> * chore: Add comment about zero value validation Signed-off-by: zlatan.el <[email protected]> * chore: Modifiy the comment Signed-off-by: zlatan.el <[email protected]> * chore: Add the comment Signed-off-by: zlatan.el <[email protected]> Signed-off-by: zlatan.el <[email protected]> Co-authored-by: zlatan.el <[email protected]> Signed-off-by: franciscojavierarceo <[email protected]> * updating the batch field so that if you want return the created date of a model you can just add it in the get_online_features feature argument Signed-off-by: franciscojavierarceo <[email protected]> * linted Signed-off-by: franciscojavierarceo <[email protected]> * adding change to also support querying the event_timestamp Signed-off-by: franciscojavierarceo <[email protected]> Signed-off-by: zlatan.el <[email protected]> Signed-off-by: franciscojavierarceo <[email protected]> Co-authored-by: kysersozelee <[email protected]> Co-authored-by: zlatan.el <[email protected]>
1 parent 81c3483 commit 01ab462

File tree

1 file changed

+39
-7
lines changed
  • sdk/python/feast/infra/offline_stores

1 file changed

+39
-7
lines changed

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ def evaluate_historical_retrieval():
267267
)
268268

269269
entity_df_with_features = _drop_columns(
270-
df_to_join, timestamp_field, created_timestamp_column
270+
df_to_join, features, timestamp_field, created_timestamp_column
271271
)
272272

273273
# Ensure that we delete dataframes to free up memory
@@ -599,6 +599,11 @@ def _normalize_timestamp(
599599
created_timestamp_column_type = df_to_join_types[created_timestamp_column]
600600

601601
if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC:
602+
# if you are querying for the event timestamp field, we have to deduplicate
603+
if len(df_to_join[timestamp_field].shape) > 1:
604+
df_to_join, dups = _df_column_uniquify(df_to_join)
605+
df_to_join = df_to_join.drop(columns=dups)
606+
602607
# Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
603608
df_to_join[timestamp_field] = df_to_join[timestamp_field].apply(
604609
lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc),
@@ -609,6 +614,11 @@ def _normalize_timestamp(
609614
not hasattr(created_timestamp_column_type, "tz")
610615
or created_timestamp_column_type.tz != pytz.UTC
611616
):
617+
if len(df_to_join[created_timestamp_column].shape) > 1:
618+
# if you are querying for the created timestamp field, we have to deduplicate
619+
df_to_join, dups = _df_column_uniquify(df_to_join)
620+
df_to_join = df_to_join.drop(columns=dups)
621+
612622
df_to_join[created_timestamp_column] = df_to_join[
613623
created_timestamp_column
614624
].apply(
@@ -701,14 +711,36 @@ def _drop_duplicates(
701711

702712
def _drop_columns(
703713
df_to_join: dd.DataFrame,
714+
features: List[str],
704715
timestamp_field: str,
705716
created_timestamp_column: str,
706717
) -> dd.DataFrame:
707-
entity_df_with_features = df_to_join.drop([timestamp_field], axis=1).persist()
708-
709-
if created_timestamp_column:
710-
entity_df_with_features = entity_df_with_features.drop(
711-
[created_timestamp_column], axis=1
712-
).persist()
718+
entity_df_with_features = df_to_join
719+
timestamp_columns = [
720+
timestamp_field,
721+
created_timestamp_column,
722+
]
723+
for column in timestamp_columns:
724+
if column and column not in features:
725+
entity_df_with_features = entity_df_with_features.drop(
726+
[column], axis=1
727+
).persist()
713728

714729
return entity_df_with_features
730+
731+
732+
def _df_column_uniquify(df: dd.DataFrame) -> Tuple[dd.DataFrame, List[str]]:
733+
df_columns = df.columns
734+
new_columns = []
735+
duplicate_cols = []
736+
for item in df_columns:
737+
counter = 0
738+
newitem = item
739+
while newitem in new_columns:
740+
counter += 1
741+
newitem = "{}_{}".format(item, counter)
742+
if counter > 0:
743+
duplicate_cols.append(newitem)
744+
new_columns.append(newitem)
745+
df.columns = new_columns
746+
return df, duplicate_cols

0 commit comments

Comments
 (0)