from feast import FeatureView, Field
from feast.types import Float32, Int32
+ from tests.integration.feature_repos.repo_configuration import (
+     construct_universal_feature_views,
+ )
from tests.integration.feature_repos.universal.entities import driver

- # TODO(felixwang9817): Add a unit test that checks that write_to_offline_store can reorder columns.
- # This should only happen after https://github.com/feast-dev/feast/issues/2797 is fixed.
-

@pytest.mark.integration
@pytest.mark.universal_offline_stores
- @pytest.mark.universal_online_stores(only=["sqlite"])
- def test_writing_incorrect_schema_fails(environment, universal_data_sources):
-     """Tests that writing a dataframe with an incorrect schema fails."""
+ def test_reorder_columns(environment, universal_data_sources):
+     """Tests that a dataframe with columns in the wrong order is reordered."""
    store = environment.feature_store
    _, _, data_sources = universal_data_sources
-     driver_entity = driver()
-     driver_stats = FeatureView(
-         name="driver_stats",
-         entities=[driver_entity],
-         schema=[
-             Field(name="avg_daily_trips", dtype=Int32),
-             Field(name="conv_rate", dtype=Float32),
-             Field(name="acc_rate", dtype=Float32),
-         ],
-         source=data_sources.driver,
-     )
+     feature_views = construct_universal_feature_views(data_sources)
+     driver_fv = feature_views.driver
+     store.apply([driver(), driver_fv])

    now = datetime.utcnow()
    ts = pd.Timestamp(now).round("ms")

-     entity_df = pd.DataFrame.from_dict(
-         {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]}
+     # This dataframe has columns in the wrong order.
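+     # Assumption: write_to_offline_store reorders these to match the feature view schema.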
+     df_to_write = pd.DataFrame.from_dict(
+         {
+             "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
+             "created": [ts, ts],
+             "conv_rate": [random.random(), random.random()],
+             "event_timestamp": [ts, ts],
+             "acc_rate": [random.random(), random.random()],
+             "driver_id": [1001, 1001],
+         },
    )

-     store.apply([driver_entity, driver_stats])
-     df = store.get_historical_features(
-         entity_df=entity_df,
-         features=[
-             "driver_stats:conv_rate",
-             "driver_stats:acc_rate",
-             "driver_stats:avg_daily_trips",
-         ],
-         full_feature_names=False,
-     ).to_df()
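+     # The write should succeed even though the columns are out of order.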
+     store.write_to_offline_store(
+         driver_fv.name, df_to_write, allow_registry_cache=False
+     )

-     assert df["conv_rate"].isnull().all()
-     assert df["acc_rate"].isnull().all()
-     assert df["avg_daily_trips"].isnull().all()
+
+ @pytest.mark.integration
+ @pytest.mark.universal_offline_stores
+ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
+     """Tests that writing a dataframe with an incorrect schema fails."""
+     store = environment.feature_store
+     _, _, data_sources = universal_data_sources
+     feature_views = construct_universal_feature_views(data_sources)
+     driver_fv = feature_views.driver
+     store.apply([driver(), driver_fv])
+
+     now = datetime.utcnow()
+     ts = pd.Timestamp(now).round("ms")

    expected_df = pd.DataFrame.from_dict(
        {
@@ -65,13 +67,12 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
    )
    with pytest.raises(ValueError):
        store.write_to_offline_store(
-             driver_stats.name, expected_df, allow_registry_cache=False
+             driver_fv.name, expected_df, allow_registry_cache=False
        )


@pytest.mark.integration
@pytest.mark.universal_offline_stores
- @pytest.mark.universal_online_stores(only=["sqlite"])
def test_writing_consecutively_to_offline_store(environment, universal_data_sources):
    store = environment.feature_store
    _, _, data_sources = universal_data_sources
@@ -96,7 +97,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sources):
    entity_df = pd.DataFrame.from_dict(
        {
            "driver_id": [1001, 1001],
-             "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)],
+             "event_timestamp": [ts + timedelta(hours=3), ts + timedelta(hours=4)],
        }
    )

@@ -117,7 +118,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sources):

    first_df = pd.DataFrame.from_dict(
        {
-             "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)],
+             "event_timestamp": [ts + timedelta(hours=3), ts + timedelta(hours=4)],
            "driver_id": [1001, 1001],
            "conv_rate": [random.random(), random.random()],
            "acc_rate": [random.random(), random.random()],
@@ -155,7 +156,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sources):

    second_df = pd.DataFrame.from_dict(
        {
-             "event_timestamp": [ts - timedelta(hours=1), ts],
+             "event_timestamp": [ts + timedelta(hours=5), ts + timedelta(hours=6)],
            "driver_id": [1001, 1001],
            "conv_rate": [random.random(), random.random()],
            "acc_rate": [random.random(), random.random()],
@@ -172,10 +173,10 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sources):
        {
            "driver_id": [1001, 1001, 1001, 1001],
            "event_timestamp": [
-                 ts - timedelta(hours=4),
-                 ts - timedelta(hours=3),
-                 ts - timedelta(hours=1),
-                 ts,
+                 ts + timedelta(hours=3),
+                 ts + timedelta(hours=4),
+                 ts + timedelta(hours=5),
+                 ts + timedelta(hours=6),
            ],
        }
    )