Description
Is there an existing issue for the same bug?
- I have checked the existing issues.
Version or Commit ID
main
Other environment information
No response
Actual behavior and How to reproduce it
import sys
import os
import concurrent.futures
import pytest
import polars as pl
from common import common_values
from infinity.common import ConflictType, InfinityException, SparseVector
import infinity
from infinity.errors import ErrorCode
from common.utils import trace_expected_exceptions
import random
import json
import base64
import time
import infinity.index as index
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)
from infinity_http import infinity_http
@pytest.fixture(scope="class")
def http(request):
    return request.config.getoption("--http")

@pytest.fixture(scope="class")
def setup_class(request, http):
    if http:
        uri = common_values.TEST_LOCAL_HOST
        request.cls.infinity_obj = infinity_http()
    else:
        uri = common_values.TEST_LOCAL_HOST
        request.cls.infinity_obj = infinity.connect(uri)
    request.cls.uri = uri
    yield
    request.cls.infinity_obj.disconnect()
@pytest.mark.usefixtures("setup_class")
@pytest.mark.usefixtures("suffix")
class TestSnapshot:
    """Comprehensive snapshot testing for Infinity database"""

    def create_comprehensive_table(self, table_name: str):
        """Create a table with all data types and indexes"""
        table_schema = {
            "id": {"type": "int", "constraints": ["primary key"]},
            "name": {"type": "varchar"},
            "age": {"type": "int8"},
            "salary": {"type": "float64"},
            "is_active": {"type": "bool"},
            "vector_col": {"type": "vector,128,float"},
            "tensor_col": {"type": "tensor,64,float"},
            "sparse_col": {"type": "sparse,30000,float,int16"},
        }
        # Create table
        db_obj = self.infinity_obj.get_database("default_db")
        table_obj = db_obj.create_table(table_name, table_schema, ConflictType.Ignore)
        # Create indexes
        self._create_indexes(table_obj)
        return table_obj
    def _create_indexes(self, table_obj):
        """Create various types of indexes"""
        # Primary key index is already created with the table
        # Secondary indexes
        table_obj.create_index("idx_name", index.IndexInfo("name", index.IndexType.Secondary), ConflictType.Ignore)
        table_obj.create_index("idx_age_salary", index.IndexInfo("age", index.IndexType.Secondary), ConflictType.Ignore)
        # Vector index (HNSW)
        table_obj.create_index("idx_vector_hnsw", index.IndexInfo("vector_col", index.IndexType.Hnsw, {"metric": "cosine", "m": "16", "ef_construction": "200"}), ConflictType.Ignore)
        # Full-text search index
        table_obj.create_index("idx_name_fts", index.IndexInfo("name", index.IndexType.FullText), ConflictType.Ignore)
        # BMP index (for sparse vectors)
        table_obj.create_index("idx_vector_bmp", index.IndexInfo("sparse_col", index.IndexType.BMP, {"block_size": "16", "compress_type": "compress"}), ConflictType.Ignore)
        # # EMVB index (for tensors)
        # table_obj.create_index("idx_tensor_emvb", index.IndexInfo("tensor_col", index.IndexType.EMVB, {"pq_subspace_num": "32", "pq_subspace_bits": "8"}), ConflictType.Ignore)
        # IVF index
        table_obj.create_index("idx_vector_ivf", index.IndexInfo("vector_col", index.IndexType.IVF, {"metric": "l2"}), ConflictType.Ignore)
    def insert_comprehensive_data(self, table_obj, num_rows: int = 1000):
        """Insert comprehensive test data"""
        data = []
        for i in range(num_rows):
            # Create sparse vector data (only 5-10 non-zero elements out of 30000)
            num_non_zero = random.randint(5, 10)
            indices = sorted(random.sample(range(30000), num_non_zero))
            values = [random.uniform(-1, 1) for _ in range(num_non_zero)]
            # Create sparse vector using the SparseVector class
            sparse_data = SparseVector(indices, values)
            row = {
                "id": i,
                "name": f"user_{i}",
                "age": random.randint(18, 80),
                "salary": random.uniform(30000, 150000),
                "is_active": random.choice([True, False]),
                "vector_col": [random.uniform(-1, 1) for _ in range(128)],
                "tensor_col": [random.uniform(-1, 1) for _ in range(64)],
                "sparse_col": sparse_data,
            }
            data.append(row)
        # Insert in batches
        batch_size = 100
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            table_obj.insert(batch)
    def test_snapshot_large_table(self, suffix):
        """Test snapshot with a large table"""
        table_name = f"test_large_dataset{suffix}"
        snapshot_name = f"large_snapshot{suffix}"
        db_obj = self.infinity_obj.get_database("default_db")
        # Drop any leftover table from a previous run
        db_obj.drop_table(table_name, ConflictType.Ignore)
        # Create table and insert a large amount of data
        table_obj = self.create_comprehensive_table(table_name)
        self.insert_comprehensive_data(table_obj, 10000)  # 10K rows
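# --- Not part of the original script: a standalone reproduction sketch. ---
# Assumption: common_values.TEST_LOCAL_HOST points at a running local Infinity
# server; the table name and row count below are arbitrary. Only calls that
# already appear above are used.
if __name__ == "__main__":
    repro = TestSnapshot()
    repro.infinity_obj = infinity.connect(common_values.TEST_LOCAL_HOST)
    try:
        table_obj = repro.create_comprehensive_table("test_large_dataset_manual")
        repro.insert_comprehensive_data(table_obj, 10000)
    finally:
        repro.infinity_obj.disconnect()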
The issue occurs when running this script through the Python SDK test suite (pytest).
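The script also depends on a --http pytest option and a suffix fixture that are defined elsewhere in the test suite and not shown here. A minimal conftest.py sketch that would satisfy both; the option default and the suffix values are assumptions, not the suite's actual definitions:

import pytest

def pytest_addoption(parser):
    # Assumption: --http switches the tests to the HTTP client instead of the native SDK.
    parser.addoption("--http", action="store_true", default=False,
                     help="run tests through the HTTP client")

@pytest.fixture(scope="class")
def suffix(request):
    # Assumption: a per-mode suffix keeps table/snapshot names from colliding.
    return "_http" if request.config.getoption("--http") else "_py"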
Expected behavior
No response
Additional information
No response