34
34
import feast .serving .registry .RegistryRepository ;
35
35
import feast .serving .util .Metrics ;
36
36
import feast .storage .api .retriever .OnlineRetrieverV2 ;
37
- import io .grpc .Status ;
38
37
import io .opentracing .Span ;
39
38
import io .opentracing .Tracer ;
40
39
import java .util .*;
@@ -51,6 +50,11 @@ public class OnlineServingServiceV2 implements ServingServiceV2 {
51
50
private final OnlineTransformationService onlineTransformationService ;
52
51
private final String project ;
53
52
53
+ public static final String DUMMY_ENTITY_ID = "__dummy_id" ;
54
+ public static final String DUMMY_ENTITY_VAL = "" ;
55
+ public static final ValueProto .Value DUMMY_ENTITY_VALUE =
56
+ ValueProto .Value .newBuilder ().setStringVal (DUMMY_ENTITY_VAL ).build ();
57
+
54
58
public OnlineServingServiceV2 (
55
59
OnlineRetrieverV2 retriever ,
56
60
Tracer tracer ,
@@ -103,31 +107,18 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
103
107
104
108
List <Map <String , ValueProto .Value >> entityRows = getEntityRows (request );
105
109
106
- List <String > entityNames ;
107
- if (retrievedFeatureReferences .size () > 0 ) {
108
- entityNames = this .registryRepository .getEntitiesList (retrievedFeatureReferences .get (0 ));
109
- } else {
110
- throw new RuntimeException ("Requested features list must not be empty" );
111
- }
112
-
113
110
Span storageRetrievalSpan = tracer .buildSpan ("storageRetrieval" ).start ();
114
111
if (storageRetrievalSpan != null ) {
115
112
storageRetrievalSpan .setTag ("entities" , entityRows .size ());
116
113
storageRetrievalSpan .setTag ("features" , retrievedFeatureReferences .size ());
117
114
}
115
+
118
116
List <List <feast .storage .api .retriever .Feature >> features =
119
- retriever . getOnlineFeatures ( entityRows , retrievedFeatureReferences , entityNames );
117
+ retrieveFeatures ( retrievedFeatureReferences , entityRows );
120
118
121
119
if (storageRetrievalSpan != null ) {
122
120
storageRetrievalSpan .finish ();
123
121
}
124
- if (features .size () != entityRows .size ()) {
125
- throw Status .INTERNAL
126
- .withDescription (
127
- "The no. of FeatureRow obtained from OnlineRetriever"
128
- + "does not match no. of entityRow passed." )
129
- .asRuntimeException ();
130
- }
131
122
132
123
Span postProcessingSpan = tracer .buildSpan ("postProcessing" ).start ();
133
124
@@ -255,6 +246,84 @@ private List<Map<String, ValueProto.Value>> getEntityRows(
255
246
return entityRows ;
256
247
}
257
248
249
+ private List <List <feast .storage .api .retriever .Feature >> retrieveFeatures (
250
+ List <FeatureReferenceV2 > featureReferences , List <Map <String , ValueProto .Value >> entityRows ) {
251
+ // Prepare feature reference to index mapping. This mapping will be used to arrange the
252
+ // retrieved features to the same order as in the input.
253
+ if (featureReferences .isEmpty ()) {
254
+ throw new RuntimeException ("Requested features list must not be empty." );
255
+ }
256
+ Map <FeatureReferenceV2 , Integer > featureReferenceToIndexMap =
257
+ new HashMap <>(featureReferences .size ());
258
+ for (int i = 0 ; i < featureReferences .size (); i ++) {
259
+ FeatureReferenceV2 featureReference = featureReferences .get (i );
260
+ if (featureReferenceToIndexMap .containsKey (featureReference )) {
261
+ throw new RuntimeException (
262
+ String .format (
263
+ "Found duplicate features %s:%s." ,
264
+ featureReference .getFeatureViewName (), featureReference .getFeatureName ()));
265
+ }
266
+ featureReferenceToIndexMap .put (featureReference , i );
267
+ }
268
+
269
+ // Create placeholders for retrieved features.
270
+ List <List <feast .storage .api .retriever .Feature >> features = new ArrayList <>(entityRows .size ());
271
+ for (int i = 0 ; i < entityRows .size (); i ++) {
272
+ List <feast .storage .api .retriever .Feature > featuresPerEntity =
273
+ new ArrayList <>(featureReferences .size ());
274
+ for (int j = 0 ; j < featureReferences .size (); j ++) {
275
+ featuresPerEntity .add (null );
276
+ }
277
+ features .add (featuresPerEntity );
278
+ }
279
+
280
+ // Group feature references by join keys.
281
+ Map <String , List <FeatureReferenceV2 >> groupNameToFeatureReferencesMap =
282
+ featureReferences .stream ()
283
+ .collect (
284
+ Collectors .groupingBy (
285
+ featureReference ->
286
+ this .registryRepository .getEntitiesList (featureReference ).stream ()
287
+ .map (this .registryRepository ::getEntityJoinKey )
288
+ .sorted ()
289
+ .collect (Collectors .joining ("," ))));
290
+
291
+ // Retrieve features one group at a time.
292
+ for (List <FeatureReferenceV2 > featureReferencesPerGroup :
293
+ groupNameToFeatureReferencesMap .values ()) {
294
+ List <String > entityNames =
295
+ this .registryRepository .getEntitiesList (featureReferencesPerGroup .get (0 ));
296
+ List <Map <String , ValueProto .Value >> entityRowsPerGroup = new ArrayList <>(entityRows .size ());
297
+ for (Map <String , ValueProto .Value > entityRow : entityRows ) {
298
+ Map <String , ValueProto .Value > entityRowPerGroup = new HashMap <>();
299
+ entityNames .stream ()
300
+ .map (this .registryRepository ::getEntityJoinKey )
301
+ .forEach (
302
+ joinKey -> {
303
+ if (joinKey .equals (DUMMY_ENTITY_ID )) {
304
+ entityRowPerGroup .put (joinKey , DUMMY_ENTITY_VALUE );
305
+ } else {
306
+ ValueProto .Value value = entityRow .get (joinKey );
307
+ if (value != null ) {
308
+ entityRowPerGroup .put (joinKey , value );
309
+ }
310
+ }
311
+ });
312
+ entityRowsPerGroup .add (entityRowPerGroup );
313
+ }
314
+ List <List <feast .storage .api .retriever .Feature >> featuresPerGroup =
315
+ retriever .getOnlineFeatures (entityRowsPerGroup , featureReferencesPerGroup , entityNames );
316
+ for (int i = 0 ; i < featuresPerGroup .size (); i ++) {
317
+ for (int j = 0 ; j < featureReferencesPerGroup .size (); j ++) {
318
+ int k = featureReferenceToIndexMap .get (featureReferencesPerGroup .get (j ));
319
+ features .get (i ).set (k , featuresPerGroup .get (i ).get (j ));
320
+ }
321
+ }
322
+ }
323
+
324
+ return features ;
325
+ }
326
+
258
327
private void populateOnDemandFeatures (
259
328
List <FeatureReferenceV2 > onDemandFeatureReferences ,
260
329
List <FeatureReferenceV2 > onDemandFeatureSources ,
0 commit comments