A Python SDK for OceanBase Multimodal Store (Vector Store / Full Text Search / JSON Table), based on SQLAlchemy and compatible with the Milvus API.
- git clone this repo, then install with:

```shell
poetry install
```

- install with pip:

```shell
pip install pyobvector==0.2.18
```

You can build the documentation locally with Sphinx:

```shell
mkdir build
make html
```

For detailed release notes and the changelog, see RELEASE_NOTES.md.
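After installation, you can quickly verify that the package is importable (a minimal check; it does not require a running OceanBase instance):

```python
# Import the main entry points used in the examples below.
from pyobvector import ObVecClient, MilvusLikeClient
print("pyobvector is installed")
```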
pyobvector supports two modes:

- Milvus compatible mode: use the `MilvusLikeClient` class to work with vector storage through an API similar to Milvus.
- SQLAlchemy hybrid mode: use the vector storage features provided by the `ObVecClient` class and execute relational database statements with the SQLAlchemy library. In this mode, you can regard `pyobvector` as an extension of SQLAlchemy.
Refer to `tests/test_milvus_like_client.py` for more examples.
A simple workflow to perform ANN search with OceanBase Vector Store:
- setup a client:
```python
from pyobvector import *

client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
```

- create a collection with vector index:

```python
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = client.prepare_index_params()
idx_params.add_index(
field_name='embedding',
index_type=VecIndexType.HNSW,
index_name='vidx',
metric_type="L2",
params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
collection_name=test_collection_name,
schema=schema,
index_params=idx_params,
)
```

- insert data to your collection:

```python
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
```

- do ann search:

```python
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]
```
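To clean up after the example, you can remove the test data and drop the collection. This is a hedged sketch that assumes `MilvusLikeClient` exposes Milvus-style `delete` and `drop_collection` methods; check the client class (or `tests/test_milvus_like_client.py`) for the exact signatures:

```python
# Assumed Milvus-style API: delete a few rows by primary key,
# then drop the whole collection.
client.delete(collection_name=test_collection_name, ids=[111, 112])
client.drop_collection(collection_name=test_collection_name)
```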
A simple workflow to perform ANN search in SQLAlchemy hybrid mode:

- setup a client:

```python
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func

client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
test_collection_name = "ann_test"  # table name reused throughout this example
```

- create a partitioned table with vector index:

```python
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
cols = [
Column('id', Integer, primary_key=True, autoincrement=False),
Column('embedding', VECTOR(3)),
Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)
# create vector index
client.create_index(
test_collection_name,
is_vec_index=True,
index_name='vidx',
column_names=['embedding'],
vidx_params='distance=l2, type=hnsw, lib=vsag',
)
```

- insert data to your collection:

```python
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
```

- do ann search:

```python
# perform ann search with basic column selection
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_column_names=['id'] # Legacy parameter
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]
# perform ann search with SQLAlchemy expressions (recommended)
from sqlalchemy import Table, text, func
table = Table(test_collection_name, client.metadata_obj, autoload_with=client.engine)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_columns=[
table.c.id,
table.c.meta,
(table.c.id + 1000).label('id_plus_1000'),
text("JSON_EXTRACT(meta, '$.key') as extracted_key")
]
)
# For example, the result will be:
# [(112, '{"key": "value"}', 1112, 'value'), ...]
# perform ann search with distance threshold (filter results by distance)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
with_dist=True,
topk=10,
output_column_names=['id'],
distance_threshold=0.5 # Only return results where distance <= 0.5
)
# Only returns results with distance <= 0.5
# For example, the result will be:
# [(10, 0.0), (11, 0.0), ...]
```

The `ann_search` method supports flexible output column selection through the `output_columns` parameter:
- `output_columns` (recommended): accepts SQLAlchemy Column objects, expressions, or a mix of both
  - Column objects: `table.c.id`, `table.c.name`
  - Expressions: `(table.c.age + 10).label('age_plus_10')`
  - JSON queries: `text("JSON_EXTRACT(meta, '$.key') as extracted_key")`
  - String functions: `func.concat(table.c.name, ' (', table.c.age, ')').label('name_age')`
- `output_column_names` (legacy): accepts a list of column name strings
  - Example: `['id', 'name', 'meta']`
- Parameter priority: `output_columns` takes precedence over `output_column_names` when both are provided (see the sketch after this list)
- `distance_threshold` (optional): filter results by distance
  - Type: `Optional[float]`
  - Only returns results where `distance <= threshold`
  - Example: `distance_threshold=0.5` returns only results with distance <= 0.5
  - Use case: quality control for similarity search, returning only highly similar results
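For example, if both parameters are passed in the same call, only the `output_columns` selection takes effect. A small sketch reusing the `client` and `table` objects from the example above:

```python
# output_columns wins: only table.c.id and table.c.meta are returned;
# output_column_names is ignored when both are provided.
res = client.ann_search(
    test_collection_name,
    vec_data=[0, 0, 0],
    vec_column_name='embedding',
    distance_func=l2_distance,
    topk=5,
    output_columns=[table.c.id, table.c.meta],
    output_column_names=['id'],  # ignored in favor of output_columns
)
```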
If you want to use the pure `SQLAlchemy` API with the `OceanBase` dialect, you can simply get an `SQLAlchemy` engine via `client.engine`. The engine can also be created as follows:

```python
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str)  # pass any extra create_engine kwargs here as needed
```
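Once created, the engine behaves like any other SQLAlchemy engine (a minimal sketch; the statement is only illustrative):

```python
from sqlalchemy import text

# Open a connection and run a plain SQL statement against OceanBase.
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.scalar())
```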
- Async engine is also supported:

```python
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
```
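The async engine is used through SQLAlchemy's asyncio API in the usual way (a minimal sketch; the statement is only illustrative):

```python
import asyncio
from sqlalchemy import text

async def main():
    # Run a plain SQL statement through the async engine.
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT 1"))
        print(result.scalar())
    await engine.dispose()

asyncio.run(main())
```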
- For further usage in pure `SQLAlchemy` mode, please refer to the SQLAlchemy documentation.