Skip to content
Merged
Show file tree
Hide file tree
Changes from 65 commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
cdfcd64
Add typeahead endpoints
farshidz Aug 13, 2025
f857323
Add API tests
farshidz Aug 13, 2025
356eed5
Add stats endpoint
farshidz Aug 13, 2025
f668ca9
Implement typeahead
farshidz Aug 13, 2025
d45eb57
Working mvp
farshidz Aug 15, 2025
b05939f
Rename endpoints
farshidz Aug 15, 2025
fddb170
Update tests
farshidz Aug 15, 2025
ccc1ded
Improve error handling
farshidz Aug 15, 2025
4ba127e
Fix delete endpoint
farshidz Aug 15, 2025
138058f
Update query profiles
farshidz Aug 15, 2025
1e54727
Improve fuzzy search
farshidz Aug 15, 2025
6df5be6
Fix yql and improve the schema
farshidz Aug 18, 2025
8e4a812
Use query hash as ID to avoid duplicate queries
farshidz Aug 19, 2025
5be71da
Implement delete endpoint
farshidz Aug 19, 2025
da8fdd5
Minor improvements
farshidz Aug 19, 2025
94bb181
Store query as a string for bm25
farshidz Aug 19, 2025
cafdddb
Tokenize the query so we match different parts of the query
farshidz Aug 19, 2025
cde876b
Add rank: filter for suffixes
farshidz Aug 19, 2025
d6e5260
Rename API parameters
farshidz Aug 21, 2025
60c0ab6
Index words not suffixes
farshidz Aug 21, 2025
d552b4b
Rename rank to popularity
farshidz Aug 21, 2025
ab579fd
Add score modifiers
farshidz Aug 21, 2025
1cbcd57
Change relevance to _score in response
farshidz Aug 21, 2025
348674f
Rename query to suggestion in suggestions response
farshidz Aug 21, 2025
46a5961
Fix non-fuzzy query to be prefix
farshidz Aug 21, 2025
d12e0c2
Index all prefixes
farshidz Aug 21, 2025
188dc5c
Rename TypeaheadHandler to Typeahead
farshidz Aug 21, 2025
47304af
Fix bug in delete endpoint
farshidz Aug 21, 2025
ca371cc
Add pydantic v2 base classes
farshidz Aug 21, 2025
155d834
Refactor to use pydantic
farshidz Aug 21, 2025
deee5c0
More refactoring
farshidz Aug 21, 2025
75c4877
Add file missed before for models
farshidz Aug 21, 2025
9290e6f
Store typeahead schema name in MarqoIndex
farshidz Aug 21, 2025
7532bb4
Minor refactoring
farshidz Aug 21, 2025
f76496e
Fix issues related to typeahead schema name
farshidz Aug 21, 2025
31d7a26
Simplify
farshidz Aug 21, 2025
16c569c
Refactor create index
farshidz Aug 21, 2025
ebdce4b
Remove inline imports
farshidz Aug 21, 2025
25fb86d
Improve validation
farshidz Aug 21, 2025
f7d3db4
Add TODOs
farshidz Aug 21, 2025
948489d
Fix circular import
farshidz Aug 21, 2025
ec04d23
Use blake3 for hashing
farshidz Aug 21, 2025
834e11e
Remove duplicate models
farshidz Aug 21, 2025
2224d47
Add todo, update throttling config
farshidz Aug 21, 2025
a06bc24
Remove uninteded changes to marqo test base class
farshidz Aug 21, 2025
bd8d60d
add metadata fields for display and filtering
papa99do Aug 25, 2025
43c99a0
moved test_services_xml.py
papa99do Aug 26, 2025
0804351
Add test for text normalisation
papa99do Aug 26, 2025
62586da
Use Pydantic models for type validation
papa99do Aug 26, 2025
1972fe9
Use pydantic model for typeahead stats
papa99do Aug 26, 2025
27f4eac
Unit test index and query of typeahead
papa99do Aug 26, 2025
fc08159
add unit test for vespa_application_package change
papa99do Aug 27, 2025
36f830e
unit test typeahead_vespa_schema.py
papa99do Aug 27, 2025
bbf22b6
unit test index_management
papa99do Aug 27, 2025
cc76fdd
test other methods in typeahead.py
papa99do Aug 27, 2025
1ed2bbb
add unit test for api method and pydantic models
papa99do Aug 27, 2025
59a7d2c
Support getting a list of queries from typeahead schema
papa99do Aug 27, 2025
5e0d77b
batch size limit of 128
papa99do Aug 28, 2025
ab5f71c
Add integration test
papa99do Aug 28, 2025
4992de1
add integration test
papa99do Aug 28, 2025
91f6d91
fix the integration test
papa99do Aug 28, 2025
27ca83e
api tests
papa99do Aug 28, 2025
619d3c1
Merge branch 'mainline' into farshid/type-ahead
papa99do Aug 28, 2025
a6ab39a
fix unit test
papa99do Aug 28, 2025
a470de6
fix the tests, there's still one failing.
papa99do Aug 28, 2025
4ee4474
fix an integration test failing locally
papa99do Aug 29, 2025
cbc9076
bump the version
papa99do Aug 29, 2025
8ed078d
Create typeahead schema during Marqo bootstrapping.
papa99do Sep 1, 2025
fea2cee
Merge branch 'mainline' into farshid/type-ahead
papa99do Sep 1, 2025
d12c323
fix the unit test
papa99do Sep 2, 2025
7207677
fix the unit test
papa99do Sep 2, 2025
aef3751
address PR comments
papa99do Sep 2, 2025
e8f6fdf
Add version check
papa99do Sep 2, 2025
76e5e98
address review comments
papa99do Sep 2, 2025
6634e15
address review comments
papa99do Sep 2, 2025
1cd5abd
extract feature support check logic to an annotation
papa99do Sep 2, 2025
fc25342
address final comments
papa99do Sep 2, 2025
77598e0
fix unit tests
papa99do Sep 3, 2025
092c50e
support wildcard query
papa99do Sep 3, 2025
08e75fd
support empty query for typeahead
papa99do Sep 3, 2025
1210474
Merge branch 'mainline' into farshid/type-ahead
wanliAlex Sep 3, 2025
a673bed
Merge branch 'mainline' into farshid/type-ahead
papa99do Sep 3, 2025
011dc8c
Merge branch 'mainline' into farshid/type-ahead
papa99do Sep 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
opentelemetry-api==1.33.1
opentelemetry-sdk==1.33.1

cachetools==6.1.0
cachetools==6.1.0
blake3==1.0.5
14 changes: 13 additions & 1 deletion src/marqo/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ class Config(StrictBaseModel.Config, ImmutableBaseModel.Config):


class MarqoBaseModelV2(pydantic.BaseModel):
model_config = ConfigDict(validate_by_name=True, validate_assignment=True)
model_config = ConfigDict(validate_by_name=True, validate_assignment=True)


class StrictBaseModelV2(MarqoBaseModelV2):
model_config = ConfigDict(extra='forbid')


class ImmutableBaseModelV2(MarqoBaseModelV2):
model_config = ConfigDict(frozen=True)


class ImmutableStrictBaseModelV2(StrictBaseModelV2, ImmutableBaseModelV2):
pass
2 changes: 2 additions & 0 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from marqo.core.inference.api import Inference, ModelManager
from marqo.core.monitoring.monitoring import Monitoring
from marqo.core.search.recommender import Recommender
from marqo.core.typeahead.typeahead import Typeahead
from marqo.logging import get_logger
from marqo.tensor_search import enums
from marqo.tensor_search import utils
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(
self.document = Document(vespa_client, self.index_management, self.inference)
self.recommender = Recommender(vespa_client, self.index_management, self.inference)
self.embed = Embed(vespa_client, self.index_management, self.inference)
self.typeahead = Typeahead(vespa_client, self.index_management)

self.model_manager = model_manager

Expand Down
24 changes: 18 additions & 6 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from marqo.core.models.marqo_index import SemiStructuredMarqoIndex
from marqo.core.models.marqo_index_request import MarqoIndexRequest
from marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema import SemiStructuredVespaSchema
from marqo.core.typeahead.typeahead_vespa_schema import TypeaheadVespaSchema
from marqo.core.vespa_index.vespa_schema import for_marqo_index_request as vespa_schema_factory
from marqo.tensor_search.models.index_settings import IndexSettings
from marqo.vespa.vespa_client import VespaClient
Expand Down Expand Up @@ -162,7 +163,8 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
index_to_create: List[Tuple[str, MarqoIndex]] = []
index_to_create: List[Tuple[str, str, MarqoIndex]] = []

for request in marqo_index_requests:
# set the default prefixes if not provided
if request.model.text_query_prefix is None:
Expand All @@ -171,13 +173,23 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
request.model.text_chunk_prefix = request.model.get_default_text_chunk_prefix()

schema, marqo_index = vespa_schema_factory(request).generate_schema()
index_to_create.append((schema, marqo_index))
logger.debug(f'Creating index {str(request.name)} with schema:\n{schema}')
logger.debug(f'Creating index {request.name} with schema:\n{schema}')

typeahead_schema, updated_marqo_index = TypeaheadVespaSchema(marqo_index).generate_schema()
logger.debug(
f'Creating typeahead schema for index {request.name} with schema: '
f'{updated_marqo_index.typeahead_schema_name}'
)

index_to_create.append((schema, typeahead_schema, updated_marqo_index))

with self._vespa_deployment_lock():
self._get_vespa_application().batch_add_index_setting_and_schema(index_to_create)
vespa_app = self._get_vespa_application()

# Deploy schemas and index settings (this will deploy everything together)
vespa_app.batch_add_index_setting_and_schema(index_to_create)

return [index for _, index in index_to_create]
return [index for _, _, index in index_to_create]

def delete_index_by_name(self, index_name: str) -> None:
"""
Expand Down Expand Up @@ -231,7 +243,7 @@ def is_subset(dict_a, dict_b):

if (is_subset(marqo_index.tensor_field_map, existing_index.tensor_field_map) and
is_subset(marqo_index.field_map, existing_index.field_map) and
is_subset(marqo_index.name_to_string_array_field_map, existing_index.name_to_string_array_field_map)):
is_subset(marqo_index.name_to_string_array_field_map, existing_index.name_to_string_array_field_map)):
logger.debug(f'Another thread has updated the index {marqo_index.name} already.')
return

Expand Down
12 changes: 10 additions & 2 deletions src/marqo/core/index_management/vespa_application_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,15 +693,20 @@ def rollback(self, marqo_version: str) -> None:
self._store.save_file(new_backup.to_zip_stream().read(), self._BACKUP_FILE)
self._deploy()

def batch_add_index_setting_and_schema(self, indexes: List[Tuple[str, MarqoIndex]]) -> None:
for schema, index in indexes:
def batch_add_index_setting_and_schema(self, indexes: List[Tuple[str, str, MarqoIndex]]) -> None:
for schema, typeahead_schema, index in indexes:
if self.has_index(index.name):
raise IndexExistsError(f"Index {index.name} already exists")

# Add index settings and schema
self._index_setting_store.save_index_setting(index)
self._store.save_file(schema, 'schemas', f'{index.schema_name}.sd')
self._service_xml.add_schema(index.schema_name)

# Add typeahead schema if provided
self._store.save_file(typeahead_schema, 'schemas', f'{index.typeahead_schema_name}.sd')
self._service_xml.add_schema(index.typeahead_schema_name)

self._persist_index_settings()
self._store.save_file(self._service_xml.to_xml(), self._SERVICES_XML_FILE)
self._deploy()
Expand All @@ -714,6 +719,9 @@ def batch_delete_index_setting_and_schema(self, index_names: List[str]) -> None:
self._index_setting_store.delete_index_setting(index.name)
self._store.remove_file('schemas', f'{index.schema_name}.sd')
self._service_xml.remove_schema(index.schema_name)
if index.typeahead_schema_name is not None:
self._store.remove_file('schemas', f'{index.typeahead_schema_name}.sd')
self._service_xml.remove_schema(index.typeahead_schema_name)

self._add_schema_removal_override()
self._persist_index_settings()
Expand Down
1 change: 1 addition & 0 deletions src/marqo/core/models/marqo_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class MarqoIndex(ImmutableBaseModel, ABC):
"""
name: str
schema_name: str
typeahead_schema_name: Optional[str] = None
type: IndexType # We need this so that we can deserialize the correct subclass
model: Model
normalize_embeddings: bool
Expand Down
129 changes: 129 additions & 0 deletions src/marqo/core/models/typeahead.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Pydantic models for typeahead API requests and responses."""

from typing import List, Optional, Dict

from pydantic import Field, field_validator

from marqo.base_model import ImmutableStrictBaseModelV2
from marqo.core.exceptions import InvalidArgumentError
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.utils import read_env_vars_and_defaults_ints


class TypeaheadRequest(ImmutableStrictBaseModelV2):
"""Request model for typeahead suggestions."""

q: str = Field(..., description="Partial user search input")
limit: int = Field(default=10, ge=0, description="Maximum number of suggestions to return")
fuzzy_edit_distance: int = Field(
default=2,
ge=0,
alias="fuzzyEditDistance",
description="Maximum edit distance for fuzzy matching"
)
min_fuzzy_match_length: int = Field(
default=3,
ge=0,
alias="minFuzzyMatchLength",
description="Minimum length to switch to fuzzy matching"
)
popularity_weight: Optional[float] = Field(
default=None,
alias="popularityWeight",
description="Weight for popularity score in ranking"
)
bm25_weight: Optional[float] = Field(
default=None,
alias="bm25Weight",
description="Weight for BM25 score in ranking"
)

@field_validator('q')
def validate_q(cls, v: str) -> str:
if not v or not v.strip():
raise ValueError("q is required")
return v.strip()


class TypeaheadSuggestion(ImmutableStrictBaseModelV2):
"""Individual suggestion in typeahead response."""

suggestion: str = Field(..., description="The suggested query text")
score: float = Field(..., alias="_score", description="Relevance score for the suggestion")
metadata: Optional[dict] = Field(default=None, description="Additional metadata")


class TypeaheadResponse(ImmutableStrictBaseModelV2):
"""Response model for typeahead suggestions."""

suggestions: List[TypeaheadSuggestion] = Field(..., description="List of suggestions")
processing_time_ms: Optional[float] = Field(
default=None,
alias="processingTimeMs",
description="Processing time in milliseconds"
)


class TypeaheadAddQueryRequest(ImmutableStrictBaseModelV2):
query: str = Field(..., description="User search query")
# Please note that popularity is not mandatory. This is to support multiple popularity values in metadata for future
popularity: float = Field(default=0.0, description="Popularity score")
metadata: Dict[str, float] = Field(default_factory=dict, description="Additional metadata")

@field_validator('query')
def validate_q(cls, v: str) -> str:
if not v or not v.strip():
raise ValueError("query is required")
return v.strip()


class TypeaheadIndexRequest(ImmutableStrictBaseModelV2):
queries: List[TypeaheadAddQueryRequest]

@field_validator('queries')
def validate_queries_batch_size(cls, queries):
query_count = len(queries)
max_queries = read_env_vars_and_defaults_ints(EnvVars.MARQO_MAX_DOCUMENTS_BATCH_SIZE)
if query_count == 0:
raise InvalidArgumentError("Received empty index queries request")
elif query_count > max_queries:
raise InvalidArgumentError(
f"Number of queries in index request ({query_count}) exceeds limit of {max_queries}. "
f"Please break up your request into smaller batches."
)
return queries


class TypeaheadIndexError(ImmutableStrictBaseModelV2):
query: Optional[str] = None
message: str
code: int = 400


class TypeaheadIndexResponse(ImmutableStrictBaseModelV2):
indexed: int = Field(..., description="Indexed queries")
errors: List[TypeaheadIndexError] = Field(default_factory=list, description="Index Errors")
processing_time_ms: float = Field(
alias="processingTimeMs",
description="Processing time in milliseconds"
)


class TypeaheadStatsResponse(ImmutableStrictBaseModelV2):
indexed_queries: int = Field(
alias="indexedQueries",
description="Number of indexed queries"
)


class TypeaheadQuery(ImmutableStrictBaseModelV2):
"""Represents a query from the typeahead schema."""
query: str = Field(..., description="The query string")
popularity: float = Field(..., description="Popularity score")
metadata: Dict[str, float] = Field(..., description="Additional metadata")
last_updated_at: Optional[int] = Field(None, alias="lastUpdatedAt", description="Last updated timestamp")


class TypeaheadGetQueriesResponse(ImmutableStrictBaseModelV2):
"""Response model for getting typeahead queries."""
queries: List[TypeaheadQuery] = Field(..., description="List of retrieved queries")
2 changes: 0 additions & 2 deletions src/marqo/core/search/hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from marqo.core.vespa_index.vespa_index import for_marqo_index as vespa_index_factory
from marqo.core.structured_vespa_index.common import RANK_PROFILE_HYBRID_CUSTOM_SEARCHER
from marqo.core.models.interpolation_method import InterpolationMethod
from marqo.tensor_search import index_meta_cache
from marqo.tensor_search import utils
from marqo.tensor_search.enums import (
SearchMethod
Expand All @@ -28,7 +27,6 @@
from marqo.tensor_search.telemetry import RequestMetricsStore
from marqo.tensor_search.tensor_search import run_vectorise_pipeline, gather_documents_from_response, logger
from marqo.vespa.exceptions import VespaStatusError
import semver
from marqo.tensor_search.models.sort_by_model import SortByModel
from marqo.tensor_search.models.relevance_cutoff_model import RelevanceCutoffModel

Expand Down
35 changes: 35 additions & 0 deletions src/marqo/core/typeahead/text_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import unicodedata
from typing import List


def normalize_text(text: str) -> str:
"""
Normalize text by removing accents and converting to lowercase.

Args:
text: Input text to normalize

Returns:
Normalized text with accents removed and lowercased
"""
if not text:
return ""

# Normalize to NFKD form and remove accents
normalized = unicodedata.normalize('NFKD', text)
# Filter out combining characters (accents)
without_accents = ''.join(c for c in normalized if not unicodedata.combining(c))
# Convert to lowercase
return without_accents.lower()


def generate_prefixes(text: str) -> List[str]:
result = []
prefix = ""
for ch in text:
if ch.isspace():
prefix = "" # reset when hitting whitespace
else:
prefix += ch
result.append(prefix)
return result
Loading