Skip to content

Commit 3cfe4eb

Browse files
feat: Add batch commit mode for MySQL OnlineStore (#5699)
* feat: Support batch_write for MySQL OnlineStore Signed-off-by: Chimey Rock <[email protected]> docs: update document for MySQL Online Store to support batch write Signed-off-by: Chimey Rock <[email protected]> * docs: Add batch_write for MySQL Online Store Signed-off-by: Chimey Rock <[email protected]> --------- Signed-off-by: Chimey Rock <[email protected]>
1 parent 398fdcf commit 3cfe4eb

File tree

5 files changed

+149
-24
lines changed

5 files changed

+149
-24
lines changed

docs/reference/online-stores/mysql.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,28 @@ online_store:
2828
2929
The full set of configuration options is available in [MySQLOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.mysql_online_store.MySQLOnlineStoreConfig).
3030
31+
## Batch write mode
32+
By default, the MySQL online store performs row-by-row insert and commit for each feature record. While this ensures per-record atomicity, it can lead to significant overhead on write operations — especially on distributed SQL databases (for example, TiDB, which is MySQL-compatible and uses a consensus protocol).
33+
34+
To improve write performance, you can enable batch write mode by setting `batch_write` to `true` and `batch_size` to the number of records per batch (it must be at least 2). In this mode, insert queries are executed in batches and each batch is committed once, instead of committing every record individually.
35+
36+
{% code title="feature_store.yaml" %}
37+
```yaml
38+
project: my_feature_repo
39+
registry: data/registry.db
40+
provider: local
41+
online_store:
42+
type: mysql
43+
host: DB_HOST
44+
port: DB_PORT
45+
database: DB_NAME
46+
user: DB_USERNAME
47+
password: DB_PASSWORD
48+
batch_write: true
49+
batch_size: 100
50+
```
51+
{% endcode %}
52+
3153
## Functionality Matrix
3254

3355
The set of functionality supported by online stores is described in detail [here](overview.md#functionality).

sdk/python/feast/infra/online_stores/mysql_online_store/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ online_store:
2525
user: test # mysql user, default to test
2626
password: test # mysql password, default to test
2727
database: feast # mysql database, default to feast
28+
batch_write: false # enable batched writes with one commit per batch, default to false
29+
batch_size: 100 # records per batch; no default, must be set to at least 2 when batch_write is true
30+
2831
```
2932

3033
#### Apply the feature definitions in `example.py`

sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class MySQLOnlineStoreConfig(FeastConfigBaseModel):
3030
password: Optional[StrictStr] = None
3131
database: Optional[StrictStr] = None
3232
port: Optional[int] = None
33+
batch_write: Optional[bool] = False
34+
batch_size: Optional[int] = None
3335

3436

3537
class MySQLOnlineStore(OnlineStore):
@@ -51,7 +53,7 @@ def _get_conn(self, config: RepoConfig) -> Connection:
5153
password=online_store_config.password or "test",
5254
database=online_store_config.database or "feast",
5355
port=online_store_config.port or 3306,
54-
autocommit=True,
56+
autocommit=(not online_store_config.batch_write),
5557
)
5658
return self._conn
5759

@@ -69,29 +71,97 @@ def online_write_batch(
6971

7072
project = config.project
7173

72-
for entity_key, values, timestamp, created_ts in data:
73-
entity_key_bin = serialize_entity_key(
74-
entity_key,
75-
entity_key_serialization_version=3,
76-
).hex()
77-
timestamp = to_naive_utc(timestamp)
78-
if created_ts is not None:
79-
created_ts = to_naive_utc(created_ts)
80-
81-
for feature_name, val in values.items():
82-
self.write_to_table(
83-
created_ts,
84-
cur,
85-
entity_key_bin,
86-
feature_name,
87-
project,
88-
table,
89-
timestamp,
90-
val,
91-
)
92-
conn.commit()
93-
if progress:
94-
progress(1)
74+
batch_write = config.online_store.batch_write
75+
if not batch_write:
76+
for entity_key, values, timestamp, created_ts in data:
77+
entity_key_bin = serialize_entity_key(
78+
entity_key,
79+
entity_key_serialization_version=3,
80+
).hex()
81+
timestamp = to_naive_utc(timestamp)
82+
if created_ts is not None:
83+
created_ts = to_naive_utc(created_ts)
84+
85+
for feature_name, val in values.items():
86+
self.write_to_table(
87+
created_ts,
88+
cur,
89+
entity_key_bin,
90+
feature_name,
91+
project,
92+
table,
93+
timestamp,
94+
val,
95+
)
96+
conn.commit()
97+
if progress:
98+
progress(1)
99+
else:
100+
batch_size = config.online_store.batch_size
101+
if not batch_size or batch_size < 2:
102+
raise ValueError("Batch size must be at least 2")
103+
insert_values = []
104+
for entity_key, values, timestamp, created_ts in data:
105+
entity_key_bin = serialize_entity_key(
106+
entity_key,
107+
entity_key_serialization_version=3,
108+
).hex()
109+
timestamp = to_naive_utc(timestamp)
110+
if created_ts is not None:
111+
created_ts = to_naive_utc(created_ts)
112+
113+
for feature_name, val in values.items():
114+
serialized_val = val.SerializeToString()
115+
insert_values.append(
116+
(
117+
entity_key_bin,
118+
feature_name,
119+
serialized_val,
120+
timestamp,
121+
created_ts,
122+
)
123+
)
124+
125+
if len(insert_values) >= batch_size:
126+
try:
127+
self._execute_batch(cur, project, table, insert_values)
128+
conn.commit()
129+
if progress:
130+
progress(len(insert_values))
131+
except Exception as e:
132+
conn.rollback()
133+
raise e
134+
insert_values.clear()
135+
136+
if insert_values:
137+
try:
138+
self._execute_batch(cur, project, table, insert_values)
139+
conn.commit()
140+
if progress:
141+
progress(len(insert_values))
142+
except Exception as e:
143+
conn.rollback()
144+
raise e
145+
146+
def _execute_batch(self, cur, project, table, insert_values):
    """Upsert a batch of feature rows with a single ``executemany`` call.

    The caller is responsible for committing or rolling back the
    transaction; this method only executes the statement.

    Args:
        cur: Open database cursor used to run the statement.
        project: Project name, passed to ``_table_id`` to build the
            target table identifier.
        table: Feature table object, passed to ``_table_id``.
        insert_values: Sequence of
            ``(entity_key, feature_name, value, event_ts, created_ts)``
            tuples, one per feature value to upsert.

    Raises:
        RuntimeError: If the batch statement fails. The original
            exception is chained as ``__cause__``.
    """
    # Nothing to write; avoid issuing a statement for an empty batch.
    if not insert_values:
        return

    table_id = _table_id(project, table)
    sql = f"""
        INSERT INTO {table_id}
        (entity_key, feature_name, value, event_ts, created_ts)
        values (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        value = VALUES(value),
        event_ts = VALUES(event_ts),
        created_ts = VALUES(created_ts);
        """
    try:
        cur.executemany(sql, insert_values)
    except Exception as e:
        # Report only the table and row count. Row contents (entity keys
        # and serialized feature values) are deliberately left out of the
        # message so they do not end up in logs or tracebacks.
        raise RuntimeError(
            f"Failed to execute batch insert into table '{table_id}' "
            f"(rows={len(insert_values)}): {e}"
        ) from e
95165

96166
@staticmethod
97167
def write_to_table(

sdk/python/feast/infra/online_stores/mysql_online_store/mysql_repo_configuration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
IntegrationTestRepoConfig,
33
)
44
from tests.integration.feature_repos.universal.online_store.mysql import (
5+
BatchWriteMySQLOnlineStoreCreator,
56
MySQLOnlineStoreCreator,
67
)
78

89
FULL_REPO_CONFIGS = [
910
IntegrationTestRepoConfig(online_store_creator=MySQLOnlineStoreCreator),
11+
IntegrationTestRepoConfig(online_store_creator=BatchWriteMySQLOnlineStoreCreator),
1012
]

sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,31 @@ def create_online_store(self) -> Dict[str, str]:
3131

3232
def teardown(self):
3333
self.container.stop()
34+
35+
36+
class BatchWriteMySQLOnlineStoreCreator(OnlineStoreCreator):
    """Online store creator backed by a MySQL container with batch write enabled.

    The returned online store configuration sets ``batch_write`` and
    ``batch_size`` so that integration tests exercise the batched write path.
    """

    def __init__(self, project_name: str, **kwargs):
        """Prepare (but do not start) the MySQL test container."""
        super().__init__(project_name)
        self.container = (
            MySqlContainer("mysql:latest", platform="linux/amd64")
            .with_exposed_ports(3306)
            .with_env("MYSQL_USER", "root")
            .with_env("MYSQL_PASSWORD", "test")
            .with_env("MYSQL_DATABASE", "test")
        )

    def create_online_store(self) -> Dict[str, str]:
        """Start the container and return the online store configuration."""
        self.container.start()
        exposed_port = self.container.get_exposed_port(3306)
        return {
            "type": "mysql",
            "user": "root",
            "password": "test",
            "database": "test",
            "port": exposed_port,
            "batch_write": "True",
            # Key must match the ``batch_size`` config field exactly; a
            # misspelled key would leave the batch size unset.
            "batch_size": "1000",
        }

    def teardown(self):
        """Stop the MySQL test container."""
        self.container.stop()

0 commit comments

Comments
 (0)