Skip to content

Commit 2c38946

Browse files
authored
feat: Add SingleStore as an OnlineStore (feast-dev#4285)
Add SingleStore as an OnlineStore Signed-off-by: Olha Kramarenko <[email protected]>
1 parent 372fd75 commit 2c38946

File tree

15 files changed

+433
-6
lines changed

15 files changed

+433
-6
lines changed

Makefile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,17 @@ test-python-universal-cassandra-no-cloud-providers:
331331
not test_snowflake" \
332332
sdk/python/tests
333333

334+
test-python-universal-singlestore-online:
335+
PYTHONPATH='.' \
336+
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.singlestore_repo_configuration \
337+
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.singlestore \
338+
python -m pytest -n 8 --integration \
339+
-k "not test_universal_cli and \
340+
not gcs_registry and \
341+
not s3_registry and \
342+
not test_snowflake" \
343+
sdk/python/tests
344+
334345
test-python-universal:
335346
python -m pytest -n 8 --integration sdk/python/tests
336347

docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* [Rockset (contrib)](reference/online-stores/rockset.md)
104104
* [Hazelcast (contrib)](reference/online-stores/hazelcast.md)
105105
* [ScyllaDB (contrib)](reference/online-stores/scylladb.md)
106+
* [SingleStore (contrib)](reference/online-stores/singlestore.md)
106107
* [Providers](reference/providers/README.md)
107108
* [Local](reference/providers/local.md)
108109
* [Google Cloud Platform](reference/providers/google-cloud-platform.md)

docs/reference/online-stores/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli
6464

6565
{% content-ref url="remote.md" %}
6666
[remote.md](remote.md)
67+
68+
{% content-ref url="singlestore.md" %}
69+
[singlestore.md](singlestore.md)
6770
{% endcontent-ref %}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# SingleStore online store (contrib)
2+
3+
## Description
4+
5+
The SingleStore online store provides support for materializing feature values into a SingleStore database for serving online features.
6+
7+
## Getting started
8+
In order to use this online store, you'll need to run `pip install 'feast[singlestore]'`. You can get started by then running `feast init` and then setting the `feature_store.yaml` as described below.
9+
10+
## Example
11+
12+
{% code title="feature_store.yaml" %}
13+
```yaml
14+
project: my_feature_repo
15+
registry: data/registry.db
16+
provider: local
17+
online_store:
18+
type: singlestore
19+
host: DB_HOST
20+
port: DB_PORT
21+
database: DB_NAME
22+
user: DB_USERNAME
23+
password: DB_PASSWORD
24+
```
25+
{% endcode %}
26+
27+
## Functionality Matrix
28+
29+
The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
30+
Below is a matrix indicating which functionality is supported by the SingleStore online store.
31+
32+
| | SingleStore |
33+
| :-------------------------------------------------------- | :----------- |
34+
| write feature values to the online store | yes |
35+
| read feature values from the online store | yes |
36+
| update infrastructure (e.g. tables) in the online store | yes |
37+
| teardown infrastructure (e.g. tables) in the online store | yes |
38+
| generate a plan of infrastructure changes | no |
39+
| support for on-demand transforms | yes |
40+
| readable by Python SDK | yes |
41+
| readable by Java | no |
42+
| readable by Go | no |
43+
| support for entityless feature views | yes |
44+
| support for concurrent writing to the same key | no |
45+
| support for ttl (time to live) at retrieval | no |
46+
| support for deleting expired data | no |
47+
| collocated by feature view | yes |
48+
| collocated by feature service | no |
49+
| collocated by entity key | no |
50+
51+
To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).

sdk/python/docs/source/feast.infra.online_stores.contrib.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ feast.infra.online\_stores.contrib.postgres\_repo\_configuration module
8989
:undoc-members:
9090
:show-inheritance:
9191

92+
feast.infra.online\_stores.contrib.singlestore\_repo\_configuration module
93+
--------------------------------------------------------------------------
94+
95+
.. automodule:: feast.infra.online_stores.contrib.singlestore_repo_configuration
96+
:members:
97+
:undoc-members:
98+
:show-inheritance:
99+
92100
Module contents
93101
---------------
94102

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
feast.infra.registry.contrib.postgres package
2+
=============================================
3+
4+
Submodules
5+
----------
6+
7+
feast.infra.registry.contrib.postgres.postgres\_registry\_store module
8+
----------------------------------------------------------------------
9+
10+
.. automodule:: feast.infra.registry.contrib.postgres.postgres_registry_store
11+
:members:
12+
:undoc-members:
13+
:show-inheritance:
14+
15+
Module contents
16+
---------------
17+
18+
.. automodule:: feast.infra.registry.contrib.postgres
19+
:members:
20+
:undoc-members:
21+
:show-inheritance:

sdk/python/docs/source/feast.infra.registry.contrib.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Subpackages
88
:maxdepth: 4
99

1010
feast.infra.registry.contrib.azure
11+
feast.infra.registry.contrib.postgres
1112

1213
Module contents
1314
---------------
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
from __future__ import absolute_import
2+
3+
from collections import defaultdict
4+
from datetime import datetime
5+
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
6+
7+
import pytz
8+
import singlestoredb
9+
from pydantic import StrictStr
10+
from singlestoredb.connection import Connection, Cursor
11+
from singlestoredb.exceptions import InterfaceError
12+
13+
from feast import Entity, FeatureView, RepoConfig
14+
from feast.infra.key_encoding_utils import serialize_entity_key
15+
from feast.infra.online_stores.online_store import OnlineStore
16+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
17+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
18+
from feast.repo_config import FeastConfigBaseModel
19+
20+
21+
class SingleStoreOnlineStoreConfig(FeastConfigBaseModel):
22+
"""
23+
Configuration for the SingleStore online store.
24+
NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
25+
"""
26+
27+
type: Literal["singlestore"] = "singlestore"
28+
29+
host: Optional[StrictStr] = None
30+
user: Optional[StrictStr] = None
31+
password: Optional[StrictStr] = None
32+
database: Optional[StrictStr] = None
33+
port: Optional[int] = None
34+
35+
36+
class SingleStoreOnlineStore(OnlineStore):
37+
"""
38+
An online store implementation that uses SingleStore.
39+
NOTE: The class *must* end with the `OnlineStore` suffix.
40+
"""
41+
42+
_conn: Optional[Connection] = None
43+
44+
def _init_conn(self, config: RepoConfig) -> Connection:
45+
online_store_config = config.online_store
46+
assert isinstance(online_store_config, SingleStoreOnlineStoreConfig)
47+
return singlestoredb.connect(
48+
host=online_store_config.host or "127.0.0.1",
49+
user=online_store_config.user or "test",
50+
password=online_store_config.password or "test",
51+
database=online_store_config.database or "feast",
52+
port=online_store_config.port or 3306,
53+
autocommit=True,
54+
)
55+
56+
def _get_cursor(self, config: RepoConfig) -> Any:
57+
# This will try to reconnect also.
58+
# In case it fails, we will have to create a new connection.
59+
if not self._conn:
60+
self._conn = self._init_conn(config)
61+
try:
62+
self._conn.ping(reconnect=True)
63+
except InterfaceError:
64+
self._conn = self._init_conn(config)
65+
return self._conn.cursor()
66+
67+
def online_write_batch(
68+
self,
69+
config: RepoConfig,
70+
table: FeatureView,
71+
data: List[
72+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
73+
],
74+
progress: Optional[Callable[[int], Any]],
75+
) -> None:
76+
project = config.project
77+
with self._get_cursor(config) as cur:
78+
insert_values = []
79+
for entity_key, values, timestamp, created_ts in data:
80+
entity_key_bin = serialize_entity_key(
81+
entity_key,
82+
entity_key_serialization_version=2,
83+
).hex()
84+
timestamp = _to_naive_utc(timestamp)
85+
if created_ts is not None:
86+
created_ts = _to_naive_utc(created_ts)
87+
88+
for feature_name, val in values.items():
89+
insert_values.append(
90+
(
91+
entity_key_bin,
92+
feature_name,
93+
val.SerializeToString(),
94+
timestamp,
95+
created_ts,
96+
)
97+
)
98+
# Control the batch so that we can update the progress
99+
batch_size = 50000
100+
for i in range(0, len(insert_values), batch_size):
101+
current_batch = insert_values[i : i + batch_size]
102+
cur.executemany(
103+
f"""
104+
INSERT INTO {_table_id(project, table)}
105+
(entity_key, feature_name, value, event_ts, created_ts)
106+
values (%s, %s, %s, %s, %s)
107+
ON DUPLICATE KEY UPDATE
108+
value = VALUES(value),
109+
event_ts = VALUES(event_ts),
110+
created_ts = VALUES(created_ts);
111+
""",
112+
current_batch,
113+
)
114+
if progress:
115+
progress(len(current_batch))
116+
117+
def online_read(
118+
self,
119+
config: RepoConfig,
120+
table: FeatureView,
121+
entity_keys: List[EntityKeyProto],
122+
requested_features: Optional[List[str]] = None,
123+
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
124+
project = config.project
125+
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
126+
with self._get_cursor(config) as cur:
127+
keys = []
128+
for entity_key in entity_keys:
129+
keys.append(
130+
serialize_entity_key(
131+
entity_key,
132+
entity_key_serialization_version=2,
133+
).hex()
134+
)
135+
136+
if not requested_features:
137+
entity_key_placeholders = ",".join(["%s" for _ in keys])
138+
cur.execute(
139+
f"""
140+
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
141+
WHERE entity_key IN ({entity_key_placeholders})
142+
ORDER BY event_ts;
143+
""",
144+
tuple(keys),
145+
)
146+
else:
147+
entity_key_placeholders = ",".join(["%s" for _ in keys])
148+
requested_features_placeholders = ",".join(
149+
["%s" for _ in requested_features]
150+
)
151+
cur.execute(
152+
f"""
153+
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
154+
WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders})
155+
ORDER BY event_ts;
156+
""",
157+
tuple(keys + requested_features),
158+
)
159+
rows = cur.fetchall() or []
160+
161+
# Since we don't know the order returned from MySQL we'll need
162+
# to construct a dict to be able to quickly look up the correct row
163+
# when we iterate through the keys since they are in the correct order
164+
values_dict = defaultdict(list)
165+
for row in rows:
166+
values_dict[row[0]].append(row[1:])
167+
168+
for key in keys:
169+
if key in values_dict:
170+
key_values = values_dict[key]
171+
res = {}
172+
res_ts: Optional[datetime] = None
173+
for feature_name, value_bin, event_ts in key_values:
174+
val = ValueProto()
175+
val.ParseFromString(bytes(value_bin))
176+
res[feature_name] = val
177+
res_ts = event_ts
178+
result.append((res_ts, res))
179+
else:
180+
result.append((None, None))
181+
return result
182+
183+
def update(
184+
self,
185+
config: RepoConfig,
186+
tables_to_delete: Sequence[FeatureView],
187+
tables_to_keep: Sequence[FeatureView],
188+
entities_to_delete: Sequence[Entity],
189+
entities_to_keep: Sequence[Entity],
190+
partial: bool,
191+
) -> None:
192+
project = config.project
193+
with self._get_cursor(config) as cur:
194+
# We don't create any special state for the entities in this implementation.
195+
for table in tables_to_keep:
196+
cur.execute(
197+
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
198+
feature_name VARCHAR(256),
199+
value BLOB,
200+
event_ts timestamp NULL DEFAULT NULL,
201+
created_ts timestamp NULL DEFAULT NULL,
202+
PRIMARY KEY(entity_key, feature_name),
203+
INDEX {_table_id(project, table)}_ek (entity_key))"""
204+
)
205+
206+
for table in tables_to_delete:
207+
_drop_table_and_index(cur, project, table)
208+
209+
def teardown(
210+
self,
211+
config: RepoConfig,
212+
tables: Sequence[FeatureView],
213+
entities: Sequence[Entity],
214+
) -> None:
215+
project = config.project
216+
with self._get_cursor(config) as cur:
217+
for table in tables:
218+
_drop_table_and_index(cur, project, table)
219+
220+
221+
def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
222+
table_name = _table_id(project, table)
223+
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
224+
cur.execute(f"DROP TABLE IF EXISTS {table_name}")
225+
226+
227+
def _table_id(project: str, table: FeatureView) -> str:
228+
return f"{project}_{table.name}"
229+
230+
231+
def _to_naive_utc(ts: datetime) -> datetime:
232+
if ts.tzinfo is None:
233+
return ts
234+
else:
235+
return ts.astimezone(pytz.utc).replace(tzinfo=None)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from tests.integration.feature_repos.integration_test_repo_config import (
2+
IntegrationTestRepoConfig,
3+
)
4+
from tests.integration.feature_repos.universal.online_store.singlestore import (
5+
SingleStoreOnlineStoreCreator,
6+
)
7+
8+
FULL_REPO_CONFIGS = [
9+
IntegrationTestRepoConfig(online_store_creator=SingleStoreOnlineStoreCreator),
10+
]

sdk/python/feast/repo_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
6464
"elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore",
6565
"remote": "feast.infra.online_stores.remote.RemoteOnlineStore",
66+
"singlestore": "feast.infra.online_stores.contrib.singlestore_online_store.singlestore.SingleStoreOnlineStore",
6667
}
6768

6869
OFFLINE_STORE_CLASS_FOR_TYPE = {

0 commit comments

Comments
 (0)