Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 152 additions & 95 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,134 +1518,191 @@ def _offline_write():

await run_in_threadpool(_offline_write)

def _validate_and_convert_input_data(
    self,
    df: Optional[pd.DataFrame],
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]],
) -> Optional[pd.DataFrame]:
    """
    Validates input parameters and converts them to a pandas DataFrame.

    At most one of ``df`` and ``inputs`` may be supplied. Whichever one is
    given is returned as a DataFrame: dictionaries and lists are passed to
    ``pd.DataFrame(...)``, an existing DataFrame is returned unchanged.

    Args:
        df: Optional DataFrame input. A dict or list passed here is also
            converted to a DataFrame; any other value is returned as-is.
        inputs: Optional dictionary, list or DataFrame input

    Returns:
        The resulting pandas DataFrame, or None when neither ``df`` nor
        ``inputs`` was provided

    Raises:
        ValueError: If both df and inputs are provided, or if inputs is not a
            dictionary, list or DataFrame
        DataFrameSerializationError: If input data cannot be converted to DataFrame
    """
    if df is not None and inputs is not None:
        raise ValueError("Both df and inputs cannot be provided at the same time.")

    # From here on at most one of the two arguments is set.
    if inputs is not None:
        if isinstance(inputs, pd.DataFrame):
            return inputs
        if not isinstance(inputs, (dict, list)):
            raise ValueError("inputs must be a dictionary or a pandas DataFrame.")
        try:
            return pd.DataFrame(inputs)
        except Exception as err:
            # Keep the pandas error as the cause so the failure stays debuggable.
            raise DataFrameSerializationError(inputs) from err

    # Callers may hand a dict/list through the ``df`` parameter as well.
    if isinstance(df, (dict, list)):
        try:
            return pd.DataFrame(df)
        except Exception as err:
            raise DataFrameSerializationError(df) from err

    return df

def _transform_on_demand_feature_view_df(
    self, feature_view: OnDemandFeatureView, df: pd.DataFrame
) -> pd.DataFrame:
    """
    Apply transformations for an OnDemandFeatureView to the input dataframe.

    In "python" mode the UDF is called either once per row (singleton views)
    or once with the whole frame as a dict of column lists. Input columns the
    UDF did not produce are copied into the result. In "pandas" mode the UDF
    receives the frame and every input column is then written onto its output.

    Args:
        feature_view: The OnDemandFeatureView containing the transformation
        df: The input dataframe to transform

    Returns:
        Transformed dataframe

    Raises:
        IndexError: In singleton python mode when ``df`` has no rows
        Exception: For unsupported OnDemandFeatureView modes
    """
    if feature_view.mode == "python" and isinstance(
        feature_view.feature_transformation, PythonTransformation
    ):
        # Singleton views see a single record (the first row); otherwise the
        # UDF receives a dict mapping column name -> list of values.
        input_dict = (
            df.to_dict(orient="records")[0]
            if feature_view.singleton
            else df.to_dict(orient="list")
        )

        if feature_view.singleton:
            transformed_rows = []

            # Track rows by position rather than by index label: iterrows()
            # yields the frame's index labels, which need not start at 0.
            for position, (_, row) in enumerate(df.iterrows()):
                output = feature_view.feature_transformation.udf(row.to_dict())
                if position == 0:
                    # The first output seeds the accumulator.
                    transformed_rows = output
                else:
                    for k in output:
                        if isinstance(output[k], list):
                            transformed_rows[k].extend(output[k])
                        else:
                            transformed_rows[k].append(output[k])

            transformed_data = pd.DataFrame(transformed_rows)
        else:
            transformed_data = feature_view.feature_transformation.udf(input_dict)

        if feature_view.write_to_online_store:
            entities = [
                self.get_entity(entity) for entity in (feature_view.entities or [])
            ]
            join_keys = [entity.join_key for entity in entities if entity]
            join_keys = [k for k in join_keys if k in input_dict.keys()]
            transformed_df = (
                pd.DataFrame(transformed_data)
                if not isinstance(transformed_data, pd.DataFrame)
                else transformed_data
            )
            input_df = pd.DataFrame(
                [input_dict] if feature_view.singleton else input_dict
            )
            if input_df.shape[0] == transformed_df.shape[0]:
                # Same row count: carry over input columns the UDF did not emit.
                for k in input_dict:
                    if k not in transformed_data:
                        transformed_data[k] = input_dict[k]
                transformed_df = pd.DataFrame(transformed_data)
            else:
                # NOTE(review): the merged frame is assigned to transformed_df,
                # but the function returns a frame built from transformed_data
                # below, so this result is currently unused. Confirm whether
                # the merged frame was meant to be returned.
                transformed_df = pd.merge(
                    transformed_df,
                    input_df,
                    how="left",
                    on=join_keys,
                )
        else:
            # Keep the transformed features and add any input columns that are
            # missing from the UDF output.
            # NOTE(review): for singleton views input_dict holds only the first
            # row, so its scalar values are broadcast to every output row.
            for k in input_dict:
                if k not in transformed_data:
                    transformed_data[k] = input_dict[k]

        return pd.DataFrame(transformed_data)

    elif feature_view.mode == "pandas" and isinstance(
        feature_view.feature_transformation, PandasTransformation
    ):
        transformed_df = feature_view.feature_transformation.udf(df)
        # Input columns are written onto the UDF output, replacing any output
        # column that has the same name.
        for col in df.columns:
            transformed_df[col] = df[col]
        return transformed_df
    else:
        raise Exception("Unsupported OnDemandFeatureView mode")

def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None:
    """
    Validates vector features in the DataFrame against the feature view specifications.

    Only the first feature of the view is inspected. When it is flagged as a
    vector index and declares a non-zero ``vector_length``, every value in the
    matching DataFrame column must have at most that many elements.

    Args:
        feature_view: The feature view containing vector feature specifications
        df: The DataFrame to validate

    Raises:
        KeyError: If the vector feature's column is missing from ``df``
        ValueError: If vector dimension constraints are violated
    """
    if not (feature_view.features and feature_view.features[0].vector_index):
        return

    vector_feature = feature_view.features[0]
    fv_vector_feature_name = vector_feature.name
    # Look the column up before the length check so a missing column is still
    # reported even when no length is declared.
    vector_column = df[fv_vector_feature_name]

    expected_length = vector_feature.vector_length
    if expected_length == 0:
        return

    def _vector_size(value) -> int:
        # Missing values and scalars have no length; str/bytes lengths are
        # character/byte counts, not vector dimensions, so they are ignored.
        if isinstance(value, (str, bytes)) or not hasattr(value, "__len__"):
            return 0
        return len(value)

    # Measure the vectors themselves. df.shape is (rows, columns), so it says
    # nothing about how long each vector is.
    longest = max((_vector_size(value) for value in vector_column), default=0)
    if longest > expected_length:
        raise ValueError(
            f"The dataframe for {fv_vector_feature_name} column has a vector of length {longest} "
            f"which is greater than expected (i.e {expected_length}) "
            f"by feature view {feature_view.name}."
        )

def _get_feature_view_and_df_for_online_write(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
    transform_on_write: bool = True,
):
    """
    Resolve a feature view by name and prepare the DataFrame to write for it.

    Args:
        feature_view_name: Name of the feature view to look up
        df: Optional DataFrame holding the data to write
        inputs: Optional dictionary or DataFrame holding the data to write;
            mutually exclusive with ``df``
        allow_registry_cache: Passed to ``list_all_feature_views`` when
            listing the available feature views
        transform_on_write: When True, an OnDemandFeatureView that has
            ``write_to_online_store`` set gets its transformation applied

    Returns:
        A ``(feature_view, df)`` tuple; ``df`` is None when neither ``df`` nor
        ``inputs`` was supplied and no transformation was applied

    Raises:
        FeatureViewNotFoundException: If no feature view has the given name
        ValueError: If both df and inputs are provided, or vector validation fails
    """
    feature_view_dict = {
        fv_proto.name: fv_proto
        for fv_proto in self.list_all_feature_views(allow_registry_cache)
    }
    try:
        feature_view = feature_view_dict[feature_view_name]
    except KeyError:
        # A missing dict key raises KeyError; translate it into the
        # domain-specific error callers expect.
        raise FeatureViewNotFoundException(feature_view_name, self.project) from None

    # Convert inputs/df to a consistent DataFrame format
    df = self._validate_and_convert_input_data(df, inputs)

    if df is not None:
        self._validate_vector_features(feature_view, df)

    # Apply transformations if this is an OnDemandFeatureView with write_to_online_store=True
    # NOTE(review): df can still be None here when neither df nor inputs was
    # given; the transformation helper is then called with None.
    if (
        isinstance(feature_view, OnDemandFeatureView)
        and feature_view.write_to_online_store
        and transform_on_write
    ):
        df = self._transform_on_demand_feature_view_df(feature_view, df)

    return feature_view, df

Expand Down
Loading