@@ -1559,75 +1559,16 @@ def get_online_features(
15591559            ... ) 
15601560            >>> online_response_dict = online_response.to_dict() 
15611561        """ 
1562-         if  isinstance (entity_rows , list ):
1563-             columnar : Dict [str , List [Any ]] =  {k : [] for  k  in  entity_rows [0 ].keys ()}
1564-             for  entity_row  in  entity_rows :
1565-                 for  key , value  in  entity_row .items ():
1566-                     try :
1567-                         columnar [key ].append (value )
1568-                     except  KeyError  as  e :
1569-                         raise  ValueError (
1570-                             "All entity_rows must have the same keys." 
1571-                         ) from  e 
1572- 
1573-             entity_rows  =  columnar 
1562+         provider  =  self ._get_provider ()
15741563
1575-         (
1576-             join_key_values ,
1577-             grouped_refs ,
1578-             entity_name_to_join_key_map ,
1579-             requested_on_demand_feature_views ,
1580-             feature_refs ,
1581-             requested_result_row_names ,
1582-             online_features_response ,
1583-         ) =  utils ._prepare_entities_to_read_from_online_store (
1564+         return  provider .get_online_features (
1565+             config = self .config ,
1566+             features = features ,
1567+             entity_rows = entity_rows ,
15841568            registry = self ._registry ,
15851569            project = self .project ,
1586-             features = features ,
1587-             entity_values = entity_rows ,
15881570            full_feature_names = full_feature_names ,
1589-             native_entity_values = True ,
1590-         )
1591- 
1592-         provider  =  self ._get_provider ()
1593-         for  table , requested_features  in  grouped_refs :
1594-             # Get the correct set of entity values with the correct join keys. 
1595-             table_entity_values , idxs  =  utils ._get_unique_entities (
1596-                 table ,
1597-                 join_key_values ,
1598-                 entity_name_to_join_key_map ,
1599-             )
1600- 
1601-             # Fetch feature data for the minimum set of Entities. 
1602-             feature_data  =  self ._read_from_online_store (
1603-                 table_entity_values ,
1604-                 provider ,
1605-                 requested_features ,
1606-                 table ,
1607-             )
1608- 
1609-             # Populate the result_rows with the Features from the OnlineStore inplace. 
1610-             utils ._populate_response_from_feature_data (
1611-                 feature_data ,
1612-                 idxs ,
1613-                 online_features_response ,
1614-                 full_feature_names ,
1615-                 requested_features ,
1616-                 table ,
1617-             )
1618- 
1619-         if  requested_on_demand_feature_views :
1620-             utils ._augment_response_with_on_demand_transforms (
1621-                 online_features_response ,
1622-                 feature_refs ,
1623-                 requested_on_demand_feature_views ,
1624-                 full_feature_names ,
1625-             )
1626- 
1627-         utils ._drop_unneeded_columns (
1628-             online_features_response , requested_result_row_names 
16291571        )
1630-         return  OnlineResponse (online_features_response )
16311572
16321573    async  def  get_online_features_async (
16331574        self ,
@@ -1664,75 +1605,16 @@ async def get_online_features_async(
16641605        Raises: 
16651606            Exception: No entity with the specified name exists. 
16661607        """ 
1667-         if  isinstance (entity_rows , list ):
1668-             columnar : Dict [str , List [Any ]] =  {k : [] for  k  in  entity_rows [0 ].keys ()}
1669-             for  entity_row  in  entity_rows :
1670-                 for  key , value  in  entity_row .items ():
1671-                     try :
1672-                         columnar [key ].append (value )
1673-                     except  KeyError  as  e :
1674-                         raise  ValueError (
1675-                             "All entity_rows must have the same keys." 
1676-                         ) from  e 
1677- 
1678-             entity_rows  =  columnar 
1608+         provider  =  self ._get_provider ()
16791609
1680-         (
1681-             join_key_values ,
1682-             grouped_refs ,
1683-             entity_name_to_join_key_map ,
1684-             requested_on_demand_feature_views ,
1685-             feature_refs ,
1686-             requested_result_row_names ,
1687-             online_features_response ,
1688-         ) =  utils ._prepare_entities_to_read_from_online_store (
1610+         return  await  provider .get_online_features_async (
1611+             config = self .config ,
1612+             features = features ,
1613+             entity_rows = entity_rows ,
16891614            registry = self ._registry ,
16901615            project = self .project ,
1691-             features = features ,
1692-             entity_values = entity_rows ,
16931616            full_feature_names = full_feature_names ,
1694-             native_entity_values = True ,
1695-         )
1696- 
1697-         provider  =  self ._get_provider ()
1698-         for  table , requested_features  in  grouped_refs :
1699-             # Get the correct set of entity values with the correct join keys. 
1700-             table_entity_values , idxs  =  utils ._get_unique_entities (
1701-                 table ,
1702-                 join_key_values ,
1703-                 entity_name_to_join_key_map ,
1704-             )
1705- 
1706-             # Fetch feature data for the minimum set of Entities. 
1707-             feature_data  =  await  self ._read_from_online_store_async (
1708-                 table_entity_values ,
1709-                 provider ,
1710-                 requested_features ,
1711-                 table ,
1712-             )
1713- 
1714-             # Populate the result_rows with the Features from the OnlineStore inplace. 
1715-             utils ._populate_response_from_feature_data (
1716-                 feature_data ,
1717-                 idxs ,
1718-                 online_features_response ,
1719-                 full_feature_names ,
1720-                 requested_features ,
1721-                 table ,
1722-             )
1723- 
1724-         if  requested_on_demand_feature_views :
1725-             utils ._augment_response_with_on_demand_transforms (
1726-                 online_features_response ,
1727-                 feature_refs ,
1728-                 requested_on_demand_feature_views ,
1729-                 full_feature_names ,
1730-             )
1731- 
1732-         utils ._drop_unneeded_columns (
1733-             online_features_response , requested_result_row_names 
17341617        )
1735-         return  OnlineResponse (online_features_response )
17361618
17371619    def  retrieve_online_documents (
17381620        self ,
@@ -1806,53 +1688,6 @@ def retrieve_online_documents(
18061688        )
18071689        return  OnlineResponse (online_features_response )
18081690
1809-     def  _read_from_online_store (
1810-         self ,
1811-         entity_rows : Iterable [Mapping [str , Value ]],
1812-         provider : Provider ,
1813-         requested_features : List [str ],
1814-         table : FeatureView ,
1815-     ) ->  List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1816-         """Read and process data from the OnlineStore for a given FeatureView. 
1817- 
1818-         This method guarantees that the order of the data in each element of the 
1819-         List returned is the same as the order of `requested_features`. 
1820- 
1821-         This method assumes that `provider.online_read` returns data for each 
1822-         combination of Entities in `entity_rows` in the same order as they 
1823-         are provided. 
1824-         """ 
1825-         entity_key_protos  =  utils ._get_entity_key_protos (entity_rows )
1826- 
1827-         # Fetch data for Entities. 
1828-         read_rows  =  provider .online_read (
1829-             config = self .config ,
1830-             table = table ,
1831-             entity_keys = entity_key_protos ,
1832-             requested_features = requested_features ,
1833-         )
1834- 
1835-         return  utils ._convert_rows_to_protobuf (requested_features , read_rows )
1836- 
1837-     async  def  _read_from_online_store_async (
1838-         self ,
1839-         entity_rows : Iterable [Mapping [str , Value ]],
1840-         provider : Provider ,
1841-         requested_features : List [str ],
1842-         table : FeatureView ,
1843-     ) ->  List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1844-         entity_key_protos  =  utils ._get_entity_key_protos (entity_rows )
1845- 
1846-         # Fetch data for Entities. 
1847-         read_rows  =  await  provider .online_read_async (
1848-             config = self .config ,
1849-             table = table ,
1850-             entity_keys = entity_key_protos ,
1851-             requested_features = requested_features ,
1852-         )
1853- 
1854-         return  utils ._convert_rows_to_protobuf (requested_features , read_rows )
1855- 
18561691    def  _retrieve_from_online_store (
18571692        self ,
18581693        provider : Provider ,
0 commit comments