feat: Compute Engine Initial Implementation #5223
Signed-off-by: HaoXuAI <[email protected]>
Nice! Left some small nits on naming.
```python
    raise NotImplementedError

@abstractmethod
def build_aggregation_node(self, input_node):
```
I think we should remove the 'node' suffix from all of these, as I think it's implicit.
yeah sounds good
On second thought, I think it's better to keep the 'node' suffix: it makes the output more descriptive and clearly indicates that it's a node, which can then be chained in the build method.
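To make the trade-off concrete, here is a minimal hypothetical sketch (class and method names assumed, not the PR's exact API) of node-suffixed builder methods being chained inside `build()`:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List


@dataclass
class DAGNode:
    name: str
    inputs: List["DAGNode"] = field(default_factory=list)


class DAGBuilder(ABC):
    @abstractmethod
    def build_source_node(self) -> DAGNode: ...

    @abstractmethod
    def build_aggregation_node(self, input_node: DAGNode) -> DAGNode: ...

    def build(self) -> DAGNode:
        # Keeping the node suffix makes this chain read as explicit DAG wiring.
        source = self.build_source_node()
        return self.build_aggregation_node(source)


class ToyBuilder(DAGBuilder):
    def build_source_node(self) -> DAGNode:
        return DAGNode("source")

    def build_aggregation_node(self, input_node: DAGNode) -> DAGNode:
        return DAGNode("aggregation", inputs=[input_node])
```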
```python
def build_validation_node(self, input_node):
    raise NotImplementedError

def build(self) -> ExecutionPlan:
```
nit: build_dag(self)
I would suggest keeping build(), which is more consistent with the builder.build() pattern.
```python
class DAGFormat(str, Enum):
    SPARK = "spark"
    PANDAS = "pandas"
    ARROW = "arrow"
```
Native python?
Right, it's only a placeholder for now. Native Python will be interesting, as it works with on demand feature views. Will do it in the next PR.
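As a hedged sketch of that placeholder idea (the `PYTHON` member is an assumption, not part of this PR), a native Python format could slot into the same enum; because `DAGFormat` subclasses `str`, members compare equal to plain strings, which keeps serialized configs simple:

```python
from enum import Enum


class DAGFormat(str, Enum):
    SPARK = "spark"
    PANDAS = "pandas"
    ARROW = "arrow"
    # Hypothetical: native Python execution, e.g. for on demand feature views.
    PYTHON = "python"
```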
```python
spark_df = result.data  # should be a Spark DataFrame

# ✅ 4. Return as Arrow
# Note: pandas DataFrames have no .to_arrow(); convert via pyarrow instead.
return pyarrow.Table.from_pandas(spark_df.toPandas())
```
Serializing as pandas here could be expensive, no?
Right, it's only to keep it consistent with the current get_historical_features API. But it definitely needs some rework.
```python
class SparkDAGBuilder(DAGBuilder):
    def build_source_node(self):
```
Yeah, just like above: I don't think we need the node suffix for these methods.
sdk/python/feast/infra/compute_engines/spark/spark_dag_builder.py
@HaoXuAI is this ready to review? Or still a draft?
Not yet. Still needs some work, but should be ready soon.
```python
udf_string: Optional[str]
feature_transformation: Transformation
batch_engine: Optional[Field]
aggregations: Optional[List[Aggregation]]
```
If I'm to be honest, I don't love putting the transformation in the FeatureView.
I think it'd be more intuitive to put Aggregation under Transformation and make the FeatureViews purely represent schemas available for online or offline.
I agree. This is only to stay consistent with the current `StreamFeatureView` and `Aggregation` classes. It makes sense to make `Aggregation` (as well as `Filter`) a type of `Transformation`, but that will take some work to refactor.

For Tecton, they put `Aggregate` in `features` (what Feast calls `schema`), as:

```python
features=[
    Aggregate(
        input_column=Field("transaction", Int64),
        function="count",
        time_window=TimeWindow(window_size=timedelta(days=1)),
    )
],
```

In some ways that also makes sense, but for Feast it would need much more work to refactor the `schema` API.
For Chronon, they put it into a `GroupBy` API, which is similar to our FeatureView:

```python
v1 = GroupBy(
    sources=[source],
    keys=["user_id"],  # We are aggregating by user
    online=True,
    aggregations=[
        Aggregation(
            input_column="refund_amt",
            operation=Operation.SUM,
            windows=window_sizes,
        ),  # The sum of purchase prices in various windows
    ],
)
```

I can look into how to merge `Aggregation` with `Transformation` in the next PR; I've added it to a TODO for now.
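As a hedged sketch of that refactor (all names hypothetical, not Feast's actual API), folding `Aggregation` and `Filter` under a common `Transformation` base could look like:

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional


@dataclass
class Transformation:
    """Base class: any derivation applied to source columns."""
    name: str


@dataclass
class Aggregation(Transformation):
    column: str = ""
    function: str = "sum"
    time_window: Optional[timedelta] = None


@dataclass
class Filter(Transformation):
    expr: str = ""


# A feature view could then hold a single list of transformations instead of
# separate udf / aggregations / filter fields.
transformations: List[Transformation] = [
    Filter(name="recent", expr="event_ts > now() - interval 1 day"),
    Aggregation(name="total_conv", column="conv_rate", function="sum"),
]
```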
```python
from feast.infra.materialization.batch_materialization_engine import MaterializationTask


class FeatureBuilder(ABC):
```
I suppose my concern with calling it FeatureBuilder rather than DAGBuilder is that users will say it isn't a fit for RAG.
I think RAG or embeddings are still a type of feature, so that should still work?
Tecton defines embeddings as features directly: https://docs.tecton.ai/docs/beta/defining-features/feature-views/embeddings which is also interesting.
For Feast, RAG or embeddings can be either a custom udf in `Transformation`, or, like `Aggregation`, a separate API (or merged in as a special type of `Transformation`).
Yeah, candidly I don't think Tecton has captured that audience. But maybe that's fine for now and we can revisit later, so no worries.
in this:

```python
def transform_feature(df: DataFrame) -> DataFrame:
    df = df.withColumn("sum_conv_rate", df["sum_conv_rate"] * 2)
    df = df.withColumn("avg_acc_rate", df["avg_acc_rate"] * 2)
    return df


driver_stats_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    mode="python",
    batch_engine=Field(...),  # Not supported yet.
    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],
    join=[...],  # Not supported yet
    udf=transform_feature,
    udf_string="transform_feature",
    ttl=timedelta(days=3),  # Not supported for materialization yet
    filter="filter_expr",  # Not fully supported
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    offline=False,  # not supported yet
    source=data_source,
)
```
What is the point of the Aggregations in the FeatureView?

```python
aggregations=[
    Aggregation(column="conv_rate", function="sum"),
    Aggregation(column="acc_rate", function="avg"),
],
```
Aggregation, from the design perspective, is there to simplify defining aggregation features with an out-of-the-box API. There are some benefits listed in Tecton's docs: https://docs.tecton.ai/docs/beta/defining-features/feature-views/aggregation-engine. What I think is important is the lifetime window support. For the Aggregation you can define …
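A toy illustration in plain Python (all names hypothetical) of why a declarative window on an aggregation is convenient: one definition fans out into several windowed features, including an unbounded "lifetime" window.

```python
from datetime import datetime, timedelta

events = [
    (datetime(2025, 1, 1), 10.0),
    (datetime(2025, 1, 5), 20.0),
    (datetime(2025, 1, 9), 30.0),
]

def windowed_sum(events, as_of, window=None):
    """Sum event values within `window` of `as_of`; window=None means lifetime."""
    start = as_of - window if window else datetime.min
    return sum(v for ts, v in events if start <= ts <= as_of)

as_of = datetime(2025, 1, 10)
features = {
    "amt_sum_7d": windowed_sum(events, as_of, timedelta(days=7)),
    "amt_sum_lifetime": windowed_sum(events, as_of),  # lifetime window
}
```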
Yeah, that makes a lot of sense. I'm good with this.
This is helpful and a really good discussion. And I agree the interface is still immature and definitely subject to change.
Let's go! 🚀🚀🚀
# [0.49.0](v0.48.0...v0.49.0) (2025-04-29)

### Bug Fixes

* Adding brackets to unit tests ([c46fea3](c46fea3))
* Adding logic back for a step ([2bb240b](2bb240b))
* Adjustment for unit test action ([a6f78ae](a6f78ae))
* Allow get_historical_features with only On Demand Feature View ([#5256](#5256)) ([0752795](0752795))
* CI adjustment ([3850643](3850643))
* Embed Query configuration breaks when switching between DataFrame and SQL ([#5257](#5257)) ([32375a5](32375a5))
* Fix for proto issue in utils ([1b291b2](1b291b2))
* Fix milvus online_read ([#5233](#5233)) ([4b91f26](4b91f26))
* Fix tests ([431d9b8](431d9b8))
* Fixed Permissions object parameter in example ([#5259](#5259)) ([045c100](045c100))
* Java CI [#12](#12) ([d7e44ac](d7e44ac))
* Java PR [#15](#15) ([a5da3bb](a5da3bb))
* Java PR [#16](#16) ([e0320fe](e0320fe))
* Java PR [#17](#17) ([49da810](49da810))
* Materialization logs ([#5243](#5243)) ([4aa2f49](4aa2f49))
* Moving to custom github action for checking skip tests ([caf312e](caf312e))
* Operator - remove default replicas setting from Feast Deployment ([#5294](#5294)) ([e416d01](e416d01))
* Patch java pr [#14](#14) ([592526c](592526c))
* Patch update for test ([a3e8967](a3e8967))
* Remove conditional from steps ([995307f](995307f))
* Remove misleading HTTP prefix from gRPC endpoints in logs and doc ([#5280](#5280)) ([0ee3a1e](0ee3a1e))
* removing id ([268ade2](268ade2))
* Renaming workflow file ([5f46279](5f46279))
* Resolve `no pq wrapper` import issue ([#5240](#5240)) ([d5906f1](d5906f1))
* Update actions to remove check skip tests ([#5275](#5275)) ([b976f27](b976f27))
* Update docling demo ([446efea](446efea))
* Update java pr [#13](#13) ([fda7db7](fda7db7))
* Update java_pr ([fa138f4](fa138f4))
* Update repo_config.py ([6a59815](6a59815))
* Update unit tests workflow ([06486a0](06486a0))
* Updated docs for docling demo ([768e6cc](768e6cc))
* Updating action for unit tests ([0996c28](0996c28))
* Updating github actions to filter at job level ([0a09622](0a09622))
* Updating Java CI ([c7c3a3c](c7c3a3c))
* Updating java pr to skip tests ([e997dd9](e997dd9))
* Updating workflows ([c66bcd2](c66bcd2))

### Features

* Add date_partition_column_format for spark source ([#5273](#5273)) ([7a61d6f](7a61d6f))
* Add Milvus tutorial with Feast integration ([#5292](#5292)) ([a1388a5](a1388a5))
* Add pgvector tutorial with PostgreSQL integration ([#5290](#5290)) ([bb1cbea](bb1cbea))
* Add ReactFlow visualization for Feast registry metadata ([#5297](#5297)) ([9768970](9768970))
* Add retrieve online documents v2 method into pgvector ([#5253](#5253)) ([6770ee6](6770ee6))
* Compute Engine Initial Implementation ([#5223](#5223)) ([64bdafd](64bdafd))
* Enable write node for compute engine ([#5287](#5287)) ([f9baf97](f9baf97))
* Local compute engine ([#5278](#5278)) ([8e06dfe](8e06dfe))
* Make transform on writes configurable for ingestion ([#5283](#5283)) ([ecad170](ecad170))
* Offline store update pull_all_from_table_or_query to make timestampfield optional ([#5281](#5281)) ([4b94608](4b94608))
* Serialization version 2 deprecation notice ([#5248](#5248)) ([327d99d](327d99d))
* Vector length definition moved to Feature View from Config ([#5289](#5289)) ([d8f1c97](d8f1c97))
* add compute engine
* fix linting
* fix linting
* fix linting
* fix linting
* add doc
* add test
* add integration test
* update API
* update API
* update API
* update API
* update API
* fix linting
* update doc
* update doc
* update test
* update doc

---------

Signed-off-by: HaoXuAI <[email protected]>
Signed-off-by: Jacob Weinhold <[email protected]>
What this PR does / why we need it:

Introduce the `ComputeEngine` component that runs the current `materialization` and `get_historical_features` on a compute engine such as Spark, Flink, etc. This hands the heavy computing work over from the online/offline stores to the engines, and lets the online/offline stores deal with IO only.
The compute engine builds the execution plan in a DAG format via a `FeatureBuilder`. It derives feature generation from the Feature View definition, including:

* `ttl` and `filter`
* `aggregations`
* `udf` or `feature_transformations`

User facing API, e.g.:

Feature build flow:
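To make the flow concrete, here is a toy sketch in plain Python (all names hypothetical, not the PR's actual classes) of how a builder might derive the node chain from a view definition: filter by `ttl`, then apply `aggregations`, then the `udf`.

```python
# Hypothetical sketch of the build flow; class and field names are assumed.
rows = [
    {"driver_id": 1, "conv_rate": 0.5, "age_days": 1},
    {"driver_id": 1, "conv_rate": 0.3, "age_days": 10},  # older than ttl
]

view = {
    "ttl_days": 3,
    "aggregations": [("conv_rate", "sum")],
    "udf": lambda feats: {k: v * 2 for k, v in feats.items()},
}

# 1. Filter node: drop rows outside the ttl window.
fresh = [r for r in rows if r["age_days"] <= view["ttl_days"]]

# 2. Aggregation node: apply each declared aggregation.
feats = {f"{col}_{fn}": sum(r[col] for r in fresh) for col, fn in view["aggregations"]}

# 3. Transformation node: apply the udf last.
result = view["udf"](feats)
```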
Current use case:

TODOs:

* `batch_engine` and `stream_engine` configurations.

Which issue(s) this PR fixes:

Misc