Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,12 @@ def with_join_key_map(self, join_key_map: Dict[str, str]):

return cp

def update_materialization_intervals(self, existing_materialization_intervals):
if existing_materialization_intervals:
def update_materialization_intervals(
self, existing_materialization_intervals: List[Tuple[datetime, datetime]]
):
if len(existing_materialization_intervals) > 0:
for interval in existing_materialization_intervals:
self.materialization_intervals.append(
(interval.start_time.ToDatetime(), interval.end_time.ToDatetime())
)
self.materialization_intervals.append((interval[0], interval[1]))

def to_proto(self) -> FeatureViewProto:
"""
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,11 @@ def apply_feature_view(
existing_feature_view_proto
)
feature_view.created_timestamp = (
existing_feature_view.created_timestamp.replace(tzinfo=None)
existing_feature_view.created_timestamp
)
if isinstance(feature_view, (FeatureView, StreamFeatureView)):
feature_view.update_materialization_intervals(
existing_feature_view_proto.meta.materialization_intervals
existing_feature_view.materialization_intervals
)
feature_view_proto = feature_view.to_proto()
feature_view_proto.spec.project = project
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/registry/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ def apply_materialization(
start_date_timestamp.FromDatetime(start_date)
end_date_timestamp.FromDatetime(end_date)

# TODO: for this to work for stream feature views, ApplyMaterializationRequest needs to be updated
request = RegistryServer_pb2.ApplyMaterializationRequest(
feature_view=feature_view.to_proto(),
project=project,
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,9 @@ def _apply_object(
)
if isinstance(obj, (FeatureView, StreamFeatureView)):
obj.update_materialization_intervals(
deserialized_proto.meta.materialization_intervals
type(obj)
.from_proto(deserialized_proto)
.materialization_intervals
)
values = {
proto_field_name: obj.to_proto().SerializeToString(),
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/registry_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import grpc
from google.protobuf.empty_pb2 import Empty
from pytz import utc

from feast import FeatureStore
from feast.data_source import DataSource
Expand Down Expand Up @@ -293,10 +294,10 @@ def ApplyMaterialization(
feature_view=FeatureView.from_proto(request.feature_view),
project=request.project,
start_date=datetime.fromtimestamp(
request.start_date.seconds + request.start_date.nanos / 1e9
request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc
),
end_date=datetime.fromtimestamp(
request.end_date.seconds + request.end_date.nanos / 1e9
request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc
),
commit=request.commit,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import logging
import os
import time
from datetime import timedelta
from datetime import datetime, timedelta
from tempfile import mkstemp
from unittest import mock

import grpc_testing
import pandas as pd
import pytest
from pytest_lazyfixture import lazy_fixture
from pytz import utc
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.minio import MinioContainer
Expand Down Expand Up @@ -783,6 +784,28 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame:
assert (
updated_feature_view.created_timestamp is not None
and updated_feature_view.created_timestamp == feature_view.created_timestamp
and len(updated_feature_view.materialization_intervals) == 0
)

# Simulate materialization
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should put this before test_registry.apply_feature_view(updated_fv1, project) to make sure that an apply operation doesn't mess with materialization intervals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

current_date = datetime.utcnow()
end_date = current_date.replace(tzinfo=utc)
start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc)
test_registry.apply_materialization(
updated_feature_view, project, start_date, end_date
)
materialized_feature_view = test_registry.get_feature_view(
"my_feature_view_1", project
)

# Check if materialization_intervals was updated by the registry
assert (
materialized_feature_view.created_timestamp is not None
and materialized_feature_view.created_timestamp
== feature_view.created_timestamp
and len(materialized_feature_view.materialization_intervals) > 0
and materialized_feature_view.materialization_intervals[0][0] == start_date
and materialized_feature_view.materialization_intervals[0][1] == end_date
)

# Modify sfv by changing the dtype
Expand Down Expand Up @@ -844,13 +867,13 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame:

# The created_timestamp for the stream feature view should be set to the created_timestamp value stored from the
# previous apply
# Materialization_intervals is not set
assert (
updated_sfv.created_timestamp is not None
and updated_sfv.created_timestamp == existing_sfv.created_timestamp
and len(updated_sfv.materialization_intervals) == 0
)

test_registry.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
Expand Down
1 change: 0 additions & 1 deletion sdk/python/tests/unit/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import assertpy
import pytest

Expand Down
15 changes: 8 additions & 7 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from typeguard import TypeCheckError

from feast import utils
from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource
Expand Down Expand Up @@ -138,13 +139,13 @@ def test_update_materialization_intervals():
source=batch_source,
)
updated_feature_view.update_materialization_intervals(
stored_feature_view.to_proto().meta.materialization_intervals
stored_feature_view.materialization_intervals
)
assert len(updated_feature_view.materialization_intervals) == 0

current_time = datetime.now()
start_date = current_time - timedelta(days=1)
end_date = current_time
current_time = datetime.utcnow()
start_date = utils.make_tzaware(current_time - timedelta(days=1))
end_date = utils.make_tzaware(current_time)
updated_feature_view.materialization_intervals.append((start_date, end_date))

# Update the Feature View, i.e. simply update the name
Expand All @@ -156,14 +157,14 @@ def test_update_materialization_intervals():
)

second_updated_feature_view.update_materialization_intervals(
updated_feature_view.to_proto().meta.materialization_intervals
updated_feature_view.materialization_intervals
)
assert len(second_updated_feature_view.materialization_intervals) == 1
assert (
second_updated_feature_view.materialization_intervals[0][0]
== second_updated_feature_view.materialization_intervals[0][0]
== updated_feature_view.materialization_intervals[0][0]
)
assert (
second_updated_feature_view.materialization_intervals[0][1]
== second_updated_feature_view.materialization_intervals[0][1]
== updated_feature_view.materialization_intervals[0][1]
)
1 change: 1 addition & 0 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List

import pandas as pd
Expand Down
11 changes: 6 additions & 5 deletions sdk/python/tests/unit/test_stream_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

from feast import utils
from feast.aggregation import Aggregation
from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
Expand Down Expand Up @@ -285,12 +286,12 @@ def test_update_materialization_intervals():
udf=simple_udf,
tags={},
)
current_time = datetime.now()
start_date = current_time - timedelta(days=1)
end_date = current_time
current_time = datetime.utcnow()
start_date = utils.make_tzaware(current_time - timedelta(days=1))
end_date = utils.make_tzaware(current_time)
stored_stream_feature_view.materialization_intervals.append((start_date, end_date))

# # Update the stream feature view i.e. here it's simply the name
# Update the stream feature view i.e. here it's simply the name
updated_stream_feature_view = StreamFeatureView(
name="test kafka stream feature view updated",
entities=[entity],
Expand All @@ -314,7 +315,7 @@ def test_update_materialization_intervals():
)

updated_stream_feature_view.update_materialization_intervals(
stored_stream_feature_view.to_proto().meta.materialization_intervals
stored_stream_feature_view.materialization_intervals
)

assert (
Expand Down
28 changes: 26 additions & 2 deletions sdk/python/tests/utils/e2e_test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,34 @@ def validate_offline_online_store_consistency(
fs.materialize_incremental(feature_views=[fv.name], end_date=now)
updated_fv = fs.registry.get_feature_view(fv.name, fs.project)

print(
"Materialized first interval equal",
updated_fv.materialization_intervals[0][0],
start_date,
)
print(
"Materialized first interval equal",
updated_fv.materialization_intervals[0][1],
end_date,
)
print(
"Materialized second interval equal",
updated_fv.materialization_intervals[1][0],
end_date,
)
print(
"Materialized second interval equal",
updated_fv.materialization_intervals[1][1],
now,
)

# Check if materialization_intervals was updated by the registry
assert (
updated_fv.materialization_intervals is not None
and len(updated_fv.materialization_intervals) > 0
len(updated_fv.materialization_intervals) > 0
and updated_fv.materialization_intervals[0][0] == start_date
and updated_fv.materialization_intervals[0][1] == end_date
and updated_fv.materialization_intervals[1][0] == end_date
and updated_fv.materialization_intervals[1][1] == now.replace(tzinfo=utc)
)

# check result of materialize_incremental()
Expand Down