Expected Behavior

With feast version 0.19.3, `feast materialize` should not throw an unhandled exception. In feast version 0.18.1, everything works as expected:

```
→ python feast_materialize.py
Materializing 1 feature views from 2022-03-10 05:41:44-08:00 to 2022-03-11 05:41:44-08:00 into the dynamodb online store.
ryoung_division_by_zero_reproducer:
100%|█████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 19.34it/s]
```
Current Behavior

```
→ python feast_materialize.py
/Users/ryoung/.pyenv/versions/3.8.10/lib/python3.8/importlib/__init__.py:127: DeprecationWarning: The toolz.compatibility module is no longer needed in Python 3 and has been deprecated. Please import these utilities directly from the standard library. This module will be removed in a future release.
  return _bootstrap._gcd_import(name[level:], package, level)
Materializing 1 feature views from 2022-03-10 05:42:56-08:00 to 2022-03-11 05:42:56-08:00 into the dynamodb online store.
ryoung_division_by_zero_reproducer:
Traceback (most recent call last):
  File "feast_materialize.py", line 32, in <module>
    fs.materialize(
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/usage.py", line 269, in wrapper
    return func(*args, **kwargs)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/feature_store.py", line 1130, in materialize
    provider.materialize_single_feature_view(
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/infra/passthrough_provider.py", line 154, in materialize_single_feature_view
    table = offline_job.to_arrow()
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/infra/offline_stores/offline_store.py", line 121, in to_arrow
    return self._to_arrow_internal()
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/usage.py", line 280, in wrapper
    raise exc.with_traceback(traceback)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/usage.py", line 269, in wrapper
    return func(*args, **kwargs)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/infra/offline_stores/file.py", line 75, in _to_arrow_internal
    df = self.evaluation_function().compute()
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/feast/infra/offline_stores/file.py", line 309, in evaluate_offline_job
    source_df = source_df.sort_values(by=event_timestamp_column)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/core.py", line 4388, in sort_values
    return sort_values(
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/shuffle.py", line 146, in sort_values
    df = rearrange_by_divisions(
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/shuffle.py", line 446, in rearrange_by_divisions
    df3 = rearrange_by_column(
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/shuffle.py", line 473, in rearrange_by_column
    df = df.repartition(npartitions=npartitions)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/core.py", line 1319, in repartition
    return repartition_npartitions(self, npartitions)
  File "/Users/ryoung/.pyenv/versions/3.8.10/envs/python-monorepo-3.8.10/lib/python3.8/site-packages/dask/dataframe/core.py", line 6859, in repartition_npartitions
    npartitions_ratio = df.npartitions / npartitions
ZeroDivisionError: division by zero
```
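The last frame divides the DataFrame's current partition count by the requested one, so a requested partition count of zero anywhere upstream produces exactly this error. A minimal sketch of just that expression (plain Python standing in for the dask internals; `npartitions_ratio` is a hypothetical helper, not dask's API):

```python
def npartitions_ratio(current: int, requested: int) -> float:
    # Stand-in for the failing line in the traceback
    # (dask/dataframe/core.py:6859): npartitions_ratio = df.npartitions / npartitions
    return current / requested


try:
    npartitions_ratio(1, 0)  # a requested partition count of zero
except ZeroDivisionError as exc:
    print(f"ZeroDivisionError: {exc}")  # prints "ZeroDivisionError: division by zero"
```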
Steps to reproduce

Create a list of feature records in PySpark and write them out as a Parquet file.

```python
from datetime import datetime, timedelta

from pyspark.sql import types as T

# Assumes an active SparkSession bound to `spark` (e.g. the pyspark shell).
INPUT_SCHEMA = T.StructType(
    [
        T.StructField("id", T.StringType(), False),
        T.StructField("feature1", T.FloatType(), False),
        T.StructField("feature2", T.FloatType(), False),
        T.StructField("event_timestamp", T.TimestampType(), False),
    ]
)

now = datetime.now()
one_hour_ago = now - timedelta(hours=1)

feature_records = [
    {
        "id": "foo",
        "event_timestamp": one_hour_ago,
        "feature1": 5.50,
        "feature2": 7.50,
    },
    {
        "id": "bar",
        "event_timestamp": one_hour_ago,
        "feature1": -1.10,
        "feature2": 2.20,
    },
]

df = spark.createDataFrame(data=feature_records, schema=INPUT_SCHEMA)
df.show(truncate=False)
df.write.parquet(mode="overwrite", path="s3://XXX/reproducer/2022-03-11T05:34:51.599215/")
```
The output should look something like:

```
+---+--------+--------+--------------------------+
|id |feature1|feature2|event_timestamp           |
+---+--------+--------+--------------------------+
|foo|5.5     |7.5     |2022-03-11 04:35:39.318222|
|bar|-1.1    |2.2     |2022-03-11 04:35:39.318222|
+---+--------+--------+--------------------------+
```
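For readers without a Spark session handy, the same two records can be sketched in pandas to inspect the shape of the dump (a sketch only; the issue writes the real data from PySpark to S3, and pandas defaults to float64 where the Spark schema uses FloatType):

```python
from datetime import datetime, timedelta

import pandas as pd

one_hour_ago = datetime.now() - timedelta(hours=1)

# Mirrors the two feature records written to Parquet above.
df = pd.DataFrame(
    [
        {"id": "foo", "event_timestamp": one_hour_ago, "feature1": 5.5, "feature2": 7.5},
        {"id": "bar", "event_timestamp": one_hour_ago, "feature1": -1.1, "feature2": 2.2},
    ]
)
print(df.shape)  # (2, 4)
```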
Create a `feast_materialize.py` script.

```python
from datetime import datetime, timedelta

from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType

now = datetime.utcnow()
one_day_ago = now - timedelta(days=1)

s3_url = "s3://XXX/reproducer/2022-03-11T05:34:51.599215/"
offline_features_dump = FileSource(
    path=s3_url,
    event_timestamp_column="event_timestamp",
)

entity = Entity(name="id", value_type=ValueType.STRING)

feature_names = ["feature1", "feature2"]
feature_view = FeatureView(
    name="ryoung_division_by_zero_reproducer",
    entities=["id"],
    ttl=timedelta(days=30),
    features=[Feature(name=f, dtype=ValueType.FLOAT) for f in feature_names],
    online=True,
    batch_source=offline_features_dump,
)

fs = FeatureStore(".")
fs.apply(entity)
fs.apply(feature_view)
fs.materialize(
    start_date=one_day_ago,
    end_date=now,
    feature_views=["ryoung_division_by_zero_reproducer"],
)
```
Note that you need to supply your own S3 bucket.
Specifications

- Version: 0.19.3
- Platform: Darwin Kernel Version 21.3.0
- Subsystem:
Possible Solution

I downgraded back to feast version 0.18.1.
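The downgrade can be pinned explicitly so other environments don't pick up the affected release (a workaround, not a fix):

```shell
# Pin the last known-good release as a workaround for the materialize crash.
pip install "feast==0.18.1"
```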