Skip to content

Commit 915896a

Browse files
Add sources and schema parameters to tiled_transformation following on_demand_feature_view pattern
Co-authored-by: franciscojavierarceo <[email protected]>
1 parent fefd284 commit 915896a

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

docs/getting-started/architecture/feature-transformation.md

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,20 @@ Examples include:
130130

131131
```python
132132
from feast.transformation import tiled_transformation
133+
from feast import Field
134+
from feast.types import Float64, Int64
133135
from datetime import timedelta
134136

135137
@tiled_transformation(
136-
tile_size=timedelta(hours=1), # Process data in 1-hour tiles
137-
mode="pandas", # Use pandas processing mode
138-
overlap=timedelta(minutes=5), # 5-minute overlap between tiles
138+
sources=["transaction_source_fv"], # Source feature views or data sources
139+
schema=[ # Output feature schema
140+
Field(name="rolling_avg", dtype=Float64),
141+
Field(name="cumulative_amount", dtype=Float64),
142+
Field(name="transaction_count", dtype=Int64),
143+
],
144+
tile_size=timedelta(hours=1), # Process data in 1-hour tiles
145+
mode="pandas", # Use pandas processing mode
146+
overlap=timedelta(minutes=5), # 5-minute overlap between tiles
139147
aggregation_functions=[
140148
lambda df: df.groupby('entity_id').agg({
141149
'transaction_amount': ['sum', 'mean', 'count']
@@ -146,7 +154,8 @@ def hourly_transaction_features(df: pd.DataFrame) -> pd.DataFrame:
146154
"""Transform transaction data within each hour tile."""
147155
return df.assign(
148156
rolling_avg=df['transaction_amount'].rolling(window=10).mean(),
149-
cumulative_amount=df['transaction_amount'].cumsum()
157+
cumulative_amount=df['transaction_amount'].cumsum(),
158+
transaction_count=df.groupby('entity_id').cumcount() + 1
150159
)
151160

152161
# Usage in StreamFeatureView
@@ -161,6 +170,11 @@ stream_fv = StreamFeatureView(
161170
**Chaining Features Example:**
162171
```python
163172
@tiled_transformation(
173+
sources=["transaction_hourly_fv"], # Source feature views
174+
schema=[ # Output feature schema
175+
Field(name="local_cumsum", dtype=Float64),
176+
Field(name="global_cumsum", dtype=Float64),
177+
],
164178
tile_size=timedelta(hours=1),
165179
mode="pandas",
166180
chaining_functions=[
@@ -177,6 +191,8 @@ def chained_cumulative_features(df: pd.DataFrame) -> pd.DataFrame:
177191
```
178192

179193
**Configuration Options:**
194+
- `sources`: List of source feature views or data sources that this transformation depends on, given either as objects or by name (e.g. `"transaction_source_fv"`)
195+
- `schema`: List of Field definitions specifying output feature names and data types
180196
- `tile_size`: Duration of each time tile (e.g., `timedelta(hours=1)`)
181197
- `mode`: Processing mode - currently supports `"pandas"`
182198
- `overlap`: Optional overlap between tiles for continuity

sdk/python/feast/transformation/pandas_tiled_transformation.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ def __init__(
1919
udf: Callable[[pd.DataFrame], pd.DataFrame],
2020
udf_string: str,
2121
tile_config: TileConfiguration,
22+
sources: Optional[List[Any]] = None,
23+
schema: Optional[List[Any]] = None,
2224
name: Optional[str] = None,
2325
tags: Optional[Dict[str, str]] = None,
2426
description: str = "",
@@ -30,6 +32,8 @@ def __init__(
3032
udf=udf,
3133
udf_string=udf_string,
3234
tile_config=tile_config,
35+
sources=sources,
36+
schema=schema,
3337
name=name,
3438
tags=tags,
3539
description=description,

sdk/python/feast/transformation/tiled_transformation.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
from datetime import timedelta
2-
from typing import Any, Callable, Dict, List, Optional, Union
2+
from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING
33

44
from feast.transformation.base import Transformation
55
from feast.transformation.mode import TransformationMode
66

7+
if TYPE_CHECKING:
8+
from feast.feature_view import FeatureView
9+
from feast.data_source import DataSource
10+
from feast.field import Field
11+
712

813
class TileConfiguration:
914
"""
@@ -54,6 +59,8 @@ def __init__(
5459
udf: Callable[[Any], Any],
5560
udf_string: str,
5661
tile_config: TileConfiguration,
62+
sources: Optional[List[Union[str, "FeatureView", "DataSource"]]] = None,
63+
schema: Optional[List["Field"]] = None,
5764
name: Optional[str] = None,
5865
tags: Optional[Dict[str, str]] = None,
5966
description: str = "",
@@ -71,6 +78,8 @@ def __init__(
7178
owner=owner,
7279
)
7380
self.tile_config = tile_config
81+
self.sources = sources or []
82+
self.schema = schema or []
7483
self.aggregation_functions = aggregation_functions or []
7584
self.chaining_functions = chaining_functions or []
7685

@@ -177,6 +186,8 @@ def infer_features(self, *args, **kwargs) -> Any:
177186
def tiled_transformation(
178187
tile_size: timedelta,
179188
mode: str = "pandas",
189+
sources: Optional[List[Union[str, "FeatureView", "DataSource"]]] = None,
190+
schema: Optional[List["Field"]] = None,
180191
overlap: Optional[timedelta] = None,
181192
max_tiles_in_memory: int = 10,
182193
enable_late_data_handling: bool = True,
@@ -193,6 +204,8 @@ def tiled_transformation(
193204
Args:
194205
tile_size: The size of each time tile (e.g., timedelta(hours=1))
195206
mode: The transformation mode - currently supports "pandas"
207+
sources: List of source feature views or data sources that this transformation depends on
208+
schema: List of Field definitions specifying output feature names and data types
196209
overlap: Optional overlap between tiles for continuity
197210
max_tiles_in_memory: Maximum number of tiles to keep in memory
198211
enable_late_data_handling: Whether to handle late-arriving data
@@ -207,11 +220,19 @@ def tiled_transformation(
207220
@tiled_transformation(
208221
tile_size=timedelta(hours=1),
209222
mode="pandas",
223+
sources=["transaction_source_fv"],
224+
schema=[
225+
Field(name="rolling_avg", dtype=Float64),
226+
Field(name="cumulative_amount", dtype=Float64),
227+
],
210228
overlap=timedelta(minutes=5),
211229
aggregation_functions=[lambda df: df.groupby('entity_id').sum()]
212230
)
213231
def my_tiled_feature(df):
214-
return df.assign(derived_feature=df['value'] * 2)
232+
return df.assign(
233+
rolling_avg=df['value'].rolling(window=10).mean(),
234+
cumulative_amount=df['value'].cumsum()
235+
)
215236
"""
216237
def decorator(user_function):
217238
import dill
@@ -238,6 +259,8 @@ def mainify(obj):
238259
udf=user_function,
239260
udf_string=udf_string,
240261
tile_config=tile_config,
262+
sources=sources,
263+
schema=schema,
241264
name=name or user_function.__name__,
242265
tags=tags,
243266
description=description,
@@ -251,6 +274,8 @@ def mainify(obj):
251274
udf=user_function,
252275
udf_string=udf_string,
253276
tile_config=tile_config,
277+
sources=sources,
278+
schema=schema,
254279
name=name or user_function.__name__,
255280
tags=tags,
256281
description=description,

0 commit comments

Comments
 (0)