Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/reference/online-stores/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,28 @@ online_store:

The full set of configuration options is available in [MySQLOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.mysql_online_store.MySQLOnlineStoreConfig).

## Batch write mode
By default, the MySQL online store performs row-by-row insert and commit for each feature record. While this ensures per-record atomicity, it can lead to significant overhead on write operations — especially on distributed SQL databases (for example, TiDB, which is MySQL-compatible and uses a consensus protocol).

To improve write performance, you can enable batch write mode by setting `batch_write` to `true` and `batch_size` to the number of rows to write per batch (it must be at least 2). In this mode, insert queries are executed in batches and committed together once per batch, instead of committing each record individually.

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: mysql
host: DB_HOST
port: DB_PORT
database: DB_NAME
user: DB_USERNAME
password: DB_PASSWORD
batch_write: true
batch_size: 100
```
{% endcode %}

## Functionality Matrix

The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ online_store:
user: test # mysql user, default to test
password: test # mysql password, default to test
database: feast # mysql database, default to feast
batch_write: false # set to true to write in batches and commit once per batch, default to false
batch_size: 100 # number of rows per batch; must be set (at least 2) when batch_write is true

```

#### Apply the feature definitions in `example.py`
Expand Down
118 changes: 94 additions & 24 deletions sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class MySQLOnlineStoreConfig(FeastConfigBaseModel):
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
port: Optional[int] = None
batch_write: Optional[bool] = False
batch_size: Optional[int] = None


class MySQLOnlineStore(OnlineStore):
Expand All @@ -51,7 +53,7 @@ def _get_conn(self, config: RepoConfig) -> Connection:
password=online_store_config.password or "test",
database=online_store_config.database or "feast",
port=online_store_config.port or 3306,
autocommit=True,
autocommit=(not online_store_config.batch_write),
)
return self._conn

Expand All @@ -69,29 +71,97 @@ def online_write_batch(

project = config.project

for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
self.write_to_table(
created_ts,
cur,
entity_key_bin,
feature_name,
project,
table,
timestamp,
val,
)
conn.commit()
if progress:
progress(1)
batch_write = config.online_store.batch_write
if not batch_write:
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
self.write_to_table(
created_ts,
cur,
entity_key_bin,
feature_name,
project,
table,
timestamp,
val,
)
conn.commit()
if progress:
progress(1)
else:
batch_size = config.online_store.bacth_size
if not batch_size or batch_size < 2:
raise ValueError("Batch size must be at least 2")
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
).hex()
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
serialized_val = val.SerializeToString()
insert_values.append(
(
entity_key_bin,
feature_name,
serialized_val,
timestamp,
created_ts,
)
)

if len(insert_values) >= batch_size:
try:
self._execute_batch(cur, project, table, insert_values)
conn.commit()
if progress:
progress(len(insert_values))
except Exception as e:
conn.rollback()
raise e
insert_values.clear()

if insert_values:
try:
self._execute_batch(cur, project, table, insert_values)
conn.commit()
if progress:
progress(len(insert_values))
except Exception as e:
conn.rollback()
raise e

def _execute_batch(self, cur, project, table, insert_values):
    """Upsert one batch of feature rows with a single ``executemany`` call.

    This method only executes the statement; it does not commit or roll
    back, so transaction handling is left to the caller.

    Args:
        cur: Database cursor used to execute the statement.
        project: Project name, forwarded to ``_table_id``.
        table: Feature table identifier, forwarded to ``_table_id``.
        insert_values: Sequence of row tuples in the column order
            ``(entity_key, feature_name, value, event_ts, created_ts)``.

    Raises:
        RuntimeError: If executing the batch fails. The driver's original
            exception is chained as ``__cause__``.
    """
    if not insert_values:
        # Nothing to write; skip the round trip to the database.
        return

    table_id = _table_id(project, table)
    sql = f"""
        INSERT INTO {table_id}
        (entity_key, feature_name, value, event_ts, created_ts)
        values (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        value = VALUES(value),
        event_ts = VALUES(event_ts),
        created_ts = VALUES(created_ts);
        """
    try:
        cur.executemany(sql, insert_values)
    except Exception as e:
        # Report only the identifying columns (entity_key, feature_name) of
        # the first row: the serialized feature value may be sensitive and
        # arbitrarily large, so it is kept out of the error message.
        first_sample = tuple(insert_values[0][:2])
        raise RuntimeError(
            f"Failed to execute batch insert into table '{table_id}' "
            f"(rows={len(insert_values)}, sample={first_sample}): {e}"
        ) from e

@staticmethod
def write_to_table(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.universal.online_store.mysql import (
BatchWriteMySQLOnlineStoreCreator,
MySQLOnlineStoreCreator,
)

# One integration-test repo config per MySQL online store creator:
# the default creator first, then the batch-write variant.
FULL_REPO_CONFIGS = [
    IntegrationTestRepoConfig(online_store_creator=creator)
    for creator in (
        MySQLOnlineStoreCreator,
        BatchWriteMySQLOnlineStoreCreator,
    )
]
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,31 @@ def create_online_store(self) -> Dict[str, str]:

def teardown(self):
    """Stop the container held in ``self.container``."""
    self.container.stop()


class BatchWriteMySQLOnlineStoreCreator(OnlineStoreCreator):
    """Online store creator backed by a MySQL container with batch writes on.

    The returned online store configuration sets ``batch_write`` and
    ``batch_size`` so that tests run against the batch write path.
    """

    def __init__(self, project_name: str, **kwargs):
        """Prepare (but do not start) the MySQL test container.

        Args:
            project_name: Project name forwarded to the base creator.
            **kwargs: Accepted and ignored.
        """
        super().__init__(project_name)
        self.container = (
            MySqlContainer("mysql:latest", platform="linux/amd64")
            .with_exposed_ports(3306)
            .with_env("MYSQL_USER", "root")
            .with_env("MYSQL_PASSWORD", "test")
            .with_env("MYSQL_DATABASE", "test")
        )

    def create_online_store(self) -> Dict[str, str]:
        """Start the container and return the online store configuration.

        Returns:
            Mapping of online store settings pointing at the started
            container, with batch write mode enabled.
        """
        self.container.start()
        exposed_port = self.container.get_exposed_port(3306)
        return {
            "type": "mysql",
            "user": "root",
            "password": "test",
            "database": "test",
            "port": exposed_port,
            "batch_write": "True",
            # The key must match the `batch_size` config field; the previous
            # misspelling ("bacth_size") never set the batch size.
            "batch_size": "1000",
        }

    def teardown(self):
        """Stop the MySQL container."""
        self.container.stop()
Loading