Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 126 additions & 22 deletions sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
metric_type: Optional[str] = "COSINE"
embedding_dim: Optional[int] = 128
vector_enabled: Optional[bool] = True
text_search_enabled: Optional[bool] = False
nlist: Optional[int] = 128
username: Optional[StrictStr] = ""
password: Optional[StrictStr] = ""
Expand Down Expand Up @@ -492,7 +493,19 @@ def retrieve_online_documents_v2(
Optional[Dict[str, ValueProto]],
]
]:
assert embedding is not None, "Key Word Search not yet implemented for Milvus"
"""
Retrieve documents using vector similarity search or keyword search in Milvus.
Args:
config: Feast configuration object
table: FeatureView object as the table to search
requested_features: List of requested features to retrieve
embedding: Query embedding to search for (optional)
top_k: Number of items to return
distance_metric: Distance metric to use (optional)
query_string: The query string to search for using keyword search (optional)
Returns:
List of tuples containing the event timestamp, entity key, and feature values
"""
entity_name_feast_primitive_type_map = {
k.name: k.dtype for k in table.entity_columns
}
Expand All @@ -502,10 +515,8 @@ def retrieve_online_documents_v2(
if not config.online_store.vector_enabled:
raise ValueError("Vector search is not enabled in the online store config")

search_params = {
"metric_type": distance_metric or config.online_store.metric_type,
"params": {"nprobe": 10},
}
if embedding is None and query_string is None:
raise ValueError("Either embedding or query_string must be provided")

composite_key_name = _get_composite_key_name(table)

Expand All @@ -520,25 +531,118 @@ def retrieve_online_documents_v2(
), (
f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema"
)
# Note we choose the first vector field as the field to search on. Not ideal but it's something.

# Find the vector search field if we need it
ann_search_field = None
for field in collection["fields"]:
if (
field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]
and field["name"] in output_fields
):
ann_search_field = field["name"]
break
if embedding is not None:
for field in collection["fields"]:
if (
field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]
and field["name"] in output_fields
):
ann_search_field = field["name"]
break

self.client.load_collection(collection_name)
results = self.client.search(
collection_name=collection_name,
data=[embedding],
anns_field=ann_search_field,
search_params=search_params,
limit=top_k,
output_fields=output_fields,
)

if (
embedding is not None
and query_string is not None
and config.online_store.vector_enabled
):
string_field_list = [
f.name
for f in table.features
if isinstance(f.dtype, PrimitiveFeastType)
and f.dtype.to_value_type() == ValueType.STRING
]

if not string_field_list:
raise ValueError(
"No string fields found in the feature view for text search in hybrid mode"
)

# Create a filter expression for text search
filter_expressions = []
for field in string_field_list:
if field in output_fields:
filter_expressions.append(f"{field} LIKE '%{query_string}%'")

# Combine filter expressions with OR
filter_expr = " OR ".join(filter_expressions) if filter_expressions else ""

# Vector search with text filter
search_params = {
"metric_type": distance_metric or config.online_store.metric_type,
"params": {"nprobe": 10},
}

# For hybrid search, use filter parameter instead of expr
results = self.client.search(
collection_name=collection_name,
data=[embedding],
anns_field=ann_search_field,
search_params=search_params,
limit=top_k,
output_fields=output_fields,
filter=filter_expr if filter_expr else None,
)

elif embedding is not None and config.online_store.vector_enabled:
# Vector search only
search_params = {
"metric_type": distance_metric or config.online_store.metric_type,
"params": {"nprobe": 10},
}

results = self.client.search(
collection_name=collection_name,
data=[embedding],
anns_field=ann_search_field,
search_params=search_params,
limit=top_k,
output_fields=output_fields,
)

elif query_string is not None:
string_field_list = [
f.name
for f in table.features
if isinstance(f.dtype, PrimitiveFeastType)
and f.dtype.to_value_type() == ValueType.STRING
]

if not string_field_list:
raise ValueError(
"No string fields found in the feature view for text search"
)

filter_expressions = []
for field in string_field_list:
if field in output_fields:
filter_expressions.append(f"{field} LIKE '%{query_string}%'")

filter_expr = " OR ".join(filter_expressions)

if not filter_expr:
raise ValueError(
"No text fields found in requested features for search"
)

query_results = self.client.query(
collection_name=collection_name,
filter=filter_expr,
output_fields=output_fields,
limit=top_k,
)

results = [
[{"entity": entity, "distance": -1.0}] for entity in query_results
]
else:
raise ValueError(
"Either vector_enabled must be True for embedding search or query_string must be provided for keyword search"
)

result_list = []
for hits in results:
Expand All @@ -559,7 +663,7 @@ def retrieve_online_documents_v2(
# entity_key_proto = None
if field in ["created_ts", "event_ts"]:
res_ts = datetime.fromtimestamp(field_value / 1e6)
elif field == ann_search_field:
elif field == ann_search_field and embedding is not None:
serialized_embedding = _serialize_vector_to_float_list(
embedding
)
Expand Down
175 changes: 175 additions & 0 deletions sdk/python/tests/unit/online_store/test_online_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -1484,3 +1484,178 @@ def test_milvus_native_from_feast_data() -> None:

# Clean up the collection
client.drop_collection(collection_name=COLLECTION_NAME)


def test_milvus_keyword_search() -> None:
"""
Test retrieving documents from the Milvus online store using keyword search.
"""
random.seed(42)
n = 10 # number of samples
vector_length = 10
runner = CliRunner()
with runner.local_repo(
example_repo_py=get_example_repo("example_rag_feature_repo.py"),
offline_store="file",
online_store="milvus",
apply=False,
teardown=False,
) as store:
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Array, Float32, Int64, String, UnixTimestamp

rag_documents_source = FileSource(
path="data/embedded_documents.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)

item = Entity(
name="item_id",
join_keys=["item_id"],
value_type=ValueType.INT64,
)
author = Entity(
name="author_id",
join_keys=["author_id"],
value_type=ValueType.STRING,
)

document_embeddings = FeatureView(
name="text_documents",
entities=[item, author],
schema=[
Field(
name="vector",
dtype=Array(Float32),
vector_index=True,
vector_search_metric="COSINE",
),
Field(name="item_id", dtype=Int64),
Field(name="author_id", dtype=String),
Field(name="content", dtype=String),
Field(name="title", dtype=String),
Field(name="created_timestamp", dtype=UnixTimestamp),
Field(name="event_timestamp", dtype=UnixTimestamp),
],
source=rag_documents_source,
ttl=timedelta(hours=24),
)

store.apply([rag_documents_source, item, document_embeddings])

# Write some data with specific text content for keyword search
document_embeddings_fv = store.get_feature_view(name="text_documents")
provider = store._get_provider()

contents = [
"Feast is an open source feature store for machine learning",
"Feature stores solve the problem of coordinating features for training and serving",
"Milvus is a vector database that can be used with Feast",
"Keyword search uses BM25 algorithm for relevance ranking",
"Vector search uses embeddings for semantic similarity",
"Python is a popular programming language for machine learning",
"Feast supports multiple storage backends for online and offline use cases",
"Online stores are used for low-latency feature serving",
"Offline stores are used for batch feature retrieval during training",
"Feast enables data scientists to define, manage, and share features",
]

titles = [
"Introduction to Feast",
"Feature Store Benefits",
"Using Milvus with Feast",
"Keyword Search Fundamentals",
"Vector Search Overview",
"Python for ML",
"Feast Storage Options",
"Online Serving with Feast",
"Offline Training Support",
"Feast for Data Scientists",
]

item_keys = [
EntityKeyProto(
join_keys=["item_id", "author_id"],
entity_values=[
ValueProto(int64_val=i),
ValueProto(string_val=f"author_{i}"),
],
)
for i in range(n)
]
data = []
for i, item_key in enumerate(item_keys):
data.append(
(
item_key,
{
"vector": ValueProto(
float_list_val=FloatListProto(
val=np.random.random(vector_length)
)
),
"content": ValueProto(string_val=contents[i]),
"title": ValueProto(string_val=titles[i]),
},
_utc_now(),
_utc_now(),
)
)

provider.online_write_batch(
config=store.config,
table=document_embeddings_fv,
data=data,
progress=None,
)

# Test keyword search for "Milvus"
result_milvus = store.retrieve_online_documents_v2(
features=[
"text_documents:content",
"text_documents:title",
],
query_string="Milvus",
top_k=3,
).to_dict()

# Verify that documents containing "Milvus" are returned
assert len(result_milvus["content"]) > 0
assert any("Milvus" in content for content in result_milvus["content"])

# Test keyword search for "machine learning"
result_ml = store.retrieve_online_documents_v2(
features=[
"text_documents:content",
"text_documents:title",
],
query_string="machine learning",
top_k=3,
).to_dict()

# Verify that documents containing "machine learning" are returned
assert len(result_ml["content"]) > 0
assert any(
"machine learning" in content.lower() for content in result_ml["content"]
)

# Test hybrid search (vector + keyword)
query_embedding = np.random.random(vector_length).tolist()
result_hybrid = store.retrieve_online_documents_v2(
features=[
"text_documents:content",
"text_documents:title",
"text_documents:vector",
],
query=query_embedding,
query_string="Feast",
top_k=3,
).to_dict()

# Verify hybrid search results
assert len(result_hybrid["content"]) > 0
assert any("Feast" in content for content in result_hybrid["content"])
assert len(result_hybrid["vector"]) > 0
Loading