Skip to content

Commit 9b3e5c2

Browse files
authored
Merge releases/2.22 into mainline (#1275)
2 parents 439e097 + 721289d commit 9b3e5c2

File tree

17 files changed

+291
-185
lines changed

17 files changed

+291
-185
lines changed

src/marqo/core/index_management/vespa_application_package.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,52 @@ def _cleanup_container_config(self):
8181
"""
8282
Components config needs to be in sync with the components in the jar files. This method cleans up the
8383
custom components config, so we can always start fresh. This assumes that the container section of the
84-
services.xml file only has `node` config and empty `document-api` and `search` elements initially. Please
85-
note that any manual config change in container section will be overwritten.
84+
services.xml file only has empty `document-api`, `document-processing`, `search` elements, and the preserved
85+
elements initially.
86+
87+
Please note that:
88+
- Any manual config change to non-preserved elements in the container section will be overwritten during Vespa
89+
bootstrapping (when a new version of Marqo is deployed).
90+
- Manual rollback will replace the entire services.xml file with the previous version. This means the changes
91+
to the preserved elements will also be reverted.
92+
93+
Preserved elements:
94+
- <nodes>...</nodes>: container nodes configuration
95+
- <config name="com.yahoo.document.restapi.document-operation-executor">...</config>: doc operation executors
8696
"""
8797
container_element = self._ensure_only_one('container')
8898
for child in container_element.findall('*'):
99+
if self._should_preserve_container_element(child):
100+
continue
101+
89102
if child.tag in ['document-api', 'document-processing', 'search']:
103+
# clear the children of these elements to add config
90104
child.clear()
91-
elif child.tag != 'nodes':
105+
else:
106+
# clean up other components
92107
container_element.remove(child)
93108

109+
def _should_preserve_container_element(self, element):
110+
"""
111+
Determines if a container element should be preserved during cleanup.
112+
113+
Args:
114+
element: XML element to check
115+
116+
Returns:
117+
bool: True if element should be preserved, False if it should be removed
118+
"""
119+
# Always preserve nodes element
120+
if element.tag == 'nodes':
121+
return True
122+
123+
# Preserve document-operation-executor config
124+
if (element.tag == 'config' and
125+
element.get('name') == 'com.yahoo.document.restapi.document-operation-executor'):
126+
return True
127+
128+
return False
129+
94130
def _config_search(self):
95131
search_elements = self._ensure_only_one('container/search')
96132
chain = ET.SubElement(search_elements, 'chain')

src/marqo/core/monitoring/statsd_middleware.py

Lines changed: 29 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,18 @@
88

99
from marqo.core.monitoring.statsd_client import StatsDClient
1010

11-
_SEARCH_RE = re.compile(r"/indexes/[^/]+/search$")
1211
_DOCS_RE = re.compile(r"/indexes/[^/]+/documents$")
1312
_DOCUMENT_ID_RE = re.compile(r"(/documents/)[^/]+")
1413

1514

1615
class StatsDMiddleware(BaseHTTPMiddleware):
1716
"""
18-
Emits the former Reverse-Proxy (RP) CloudWatch metrics via StatsD.
17+
Emits high-cardinality generic metrics.
1918
20-
Metrics implemented (parity with RP):
21-
• requests.completed counter • status_code
22-
also • path, method, status_code
23-
• marqo_processing_time timing (no tags)
24-
• search_processing_time timing (no tags)
25-
• index_processing_time timing (no tags)
26-
• x-count-success / -failure / -error counter • method
19+
• request.duration_ms |ms path,method,status_code
20+
• batch.success |c path,method,status_code
21+
• batch.failure |c path,method,status_code
22+
• batch.error |c path,method,status_code
2723
"""
2824

2925
def __init__(self, app, statsd_client: StatsDClient):
@@ -38,44 +34,30 @@ async def dispatch(self, request: Request, call_next):
3834
response: Response = await call_next(request)
3935
duration_ms = int((time.perf_counter() - t_start) * 1000)
4036

41-
status = response.status_code
42-
status_tag = f"{status // 100}XX" # 2XX / 3XX / 4XX / 5XX
43-
44-
# --- requests.completed (status-only) ------------------------
45-
self.statsd.increment("requests.completed", tags={"status_code": status_tag})
46-
47-
# --- requests.completed (path/method/status variant) ---------
48-
sanitized_path = self._sanitize_path(request.url.path)
49-
self.statsd.increment(
50-
"requests.completed",
51-
tags={
52-
"path": sanitized_path,
53-
"method": request.method,
54-
"status_code": status_tag,
55-
},
56-
)
57-
58-
# --- marqo_processing_time -----------------------------
59-
self.statsd.timing("marqo_processing_time", duration_ms)
60-
61-
# --- search_processing_time ----------------------------
62-
if _SEARCH_RE.fullmatch(request.url.path):
63-
self.statsd.timing("search_processing_time", duration_ms)
64-
65-
# --- index_processing_time and x-count-* counters -------
66-
if _DOCS_RE.fullmatch(request.url.path):
67-
if request.method in {"POST", "PATCH"}:
68-
self.statsd.timing("index_processing_time", duration_ms)
69-
70-
if request.method in {"POST", "PATCH", "GET"}:
71-
lowered: Dict[str, str] = {k.lower(): v for k, v in response.headers.items()}
72-
for hdr in ("x-count-success", "x-count-failure", "x-count-error"):
73-
if hdr in lowered:
74-
try:
75-
self.statsd.increment(hdr, int(lowered[hdr]), tags={"method": request.method})
76-
except ValueError:
77-
# Header value wasn’t an int – ignore
78-
pass
37+
path_tag = self._sanitize_path(request.url.path)
38+
tags = {
39+
"path": path_tag,
40+
"method": request.method,
41+
"status_code": str(response.status_code),
42+
}
43+
44+
# latency
45+
self.statsd.timing("request.duration_ms", duration_ms, tags=tags)
46+
47+
# batch outcome counters
48+
if _DOCS_RE.fullmatch(request.url.path) and request.method in {"POST", "PATCH", "GET"}:
49+
lowered: Dict[str, str] = {k.lower(): v for k, v in response.headers.items()}
50+
for hdr, metric in (
51+
("x-count-success", "batch.success"),
52+
("x-count-failure", "batch.failure"),
53+
("x-count-error", "batch.error"),
54+
):
55+
if hdr in lowered:
56+
try:
57+
self.statsd.increment(metric, int(lowered[hdr]), tags=tags)
58+
except ValueError:
59+
# Header value wasn’t an int – ignore
60+
pass
7961

8062
return response
8163

src/marqo/core/search/recommender.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ def __init__(self, vespa_client: VespaClient, index_management: IndexManagement,
2626
def get_doc_vectors_from_ids(self,
2727
index_name: str,
2828
documents: Union[List[str], Dict[str, float]],
29-
tensor_fields: Optional[List[str]] = None,
30-
concurrency: Optional[int] = None) -> Dict[str, List[List[float]]]:
29+
tensor_fields: Optional[List[str]] = None) -> Dict[str, List[List[float]]]:
3130
"""
3231
This method gets documents from Vespa using their IDs, removes any unnecessary data, checks for
3332
lack of vectors, then returns a list of document vectors. Can be used internally (in recommend)
@@ -37,7 +36,6 @@ def get_doc_vectors_from_ids(self,
3736
index_name: Name of the index to search
3837
documents: A list of document IDs or a dictionary where the keys are document IDs and the values are weights
3938
tensor_fields: List of tensor fields to use for recommendation (can include text, image, audio, and video fields)
40-
concurrency: Max number of concurrent requests to use when fetching documents by batch
4139
4240
Returns:
4341
A dictionary mapping document IDs to lists of vector embeddings. This is flattened to 1 list per document
@@ -90,8 +88,7 @@ def get_doc_vectors_from_ids(self,
9088
config.Config(self.vespa_client, inference=self.inference),
9189
index_name,
9290
document_ids,
93-
tensor_fields=tensor_fields,
94-
concurrency=concurrency
91+
tensor_fields=tensor_fields
9592
)
9693

9794
# Check that all documents were found

src/marqo/tensor_search/models/search.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ class SearchContextTensor(BaseModel):
6969
class SearchContextDocumentsParameters(BaseModel):
7070
tensor_fields: Optional[List[str]] = Field(None, alias='tensorFields')
7171
exclude_input_documents: bool = Field(True, alias='excludeInputDocuments')
72-
concurrency: Optional[int] = None
7372

7473
@validator('tensor_fields', pre=True, always=True)
7574
def check_tensor_fields_not_empty(cls, v):

src/marqo/tensor_search/tensor_search.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -943,8 +943,7 @@ def get_query_vectors_from_jobs(
943943
context_doc_vectors = config.recommender.get_doc_vectors_from_ids(
944944
index_name=q.index.name,
945945
documents=context_documents.ids,
946-
tensor_fields=context_documents.parameters.tensor_fields,
947-
concurrency=context_documents.parameters.concurrency
946+
tensor_fields=context_documents.parameters.tensor_fields
948947
)
949948

950949
# Update weights and vectors list
@@ -1354,7 +1353,6 @@ def get_doc_vectors_per_tensor_field_by_ids(
13541353
index_name: str,
13551354
document_ids: List[str],
13561355
tensor_fields: Optional[List[str]] = None,
1357-
concurrency: Optional[int] = None
13581356
) -> Dict[str, Dict[str, List[List[float]]]]:
13591357
"""
13601358
Get only the embeddings for documents by their IDs.
@@ -1383,8 +1381,7 @@ def get_doc_vectors_per_tensor_field_by_ids(
13831381
batch_get = config.vespa_client.get_batch(
13841382
document_ids,
13851383
marqo_index.schema_name,
1386-
fields=fields_to_retrieve,
1387-
concurrency=concurrency
1384+
fields=fields_to_retrieve
13881385
)
13891386

13901387
vespa_index = vespa_index_factory(marqo_index)

src/marqo/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "2.22.0"
1+
__version__ = "2.22.1"
22

33
def get_version() -> str:
44
return f"{__version__}"

tests/integ_tests/core/index_management/test_index_management.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,52 @@ def test_bootstrap_vespa_should_override_and_backup_configs(self):
219219
os.path.join(self._test_dir, 'existing_vespa_app', *file)
220220
)
221221

222+
def test_bootstrap_vespa_should_preserve_document_operation_executor_config(self):
223+
"""
224+
Integration test to verify that document-operation-executor config is preserved
225+
during the bootstrap process and not overridden by Marqo OS bootstrapping logic.
226+
"""
227+
# Deploy initial app package
228+
self._deploy_initial_app_package()
229+
230+
# First bootstrap to get Marqo configured
231+
self.index_management.bootstrap_vespa()
232+
233+
# Now manually add document-operation-executor config to simulate Cloud team configuration
234+
app = self.index_management._get_vespa_application()
235+
services_xml_content = app._store.read_text_file('services.xml')
236+
237+
# Insert document-operation-executor config into the container section
238+
# Find a position after search config but before nodes
239+
services_xml_with_config = services_xml_content.replace(
240+
'</search>',
241+
'''</search>
242+
<config name="com.yahoo.document.restapi.document-operation-executor">
243+
<maxThrottled>0</maxThrottled>
244+
</config>'''
245+
)
246+
247+
app._store.save_file(services_xml_with_config, 'services.xml')
248+
app._deploy()
249+
250+
# Bootstrap Marqo again - this should preserve the document-operation-executor config
251+
self.index_management.bootstrap_vespa()
252+
253+
# Verify the config is still present after bootstrap
254+
bootstrapped_app = str(self.vespa_client.download_application())
255+
services_xml_path = os.path.join(bootstrapped_app, 'services.xml')
256+
257+
with open(services_xml_path, 'r') as f:
258+
final_services_xml = f.read()
259+
260+
# Assert that the document-operation-executor config is preserved
261+
self.assertIn('config name="com.yahoo.document.restapi.document-operation-executor"', final_services_xml)
262+
self.assertIn('<maxThrottled>0</maxThrottled>', final_services_xml)
263+
264+
# Also verify that Marqo components were properly added
265+
self.assertIn('ai.marqo.search.HybridSearcher', final_services_xml)
266+
self.assertIn('ai.marqo.index.IndexSettingRequestHandler', final_services_xml)
267+
222268
def test_rollback_should_succeed(self):
223269
self._deploy_existing_app_package()
224270
self.index_management.bootstrap_vespa()

tests/integ_tests/core/monitoring/test_metrics_udp.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,12 @@ def test_metrics_roundtrip(self):
144144
self.client.get("/indexes/foo/documents/abc123") # redaction
145145

146146
patterns = [
147-
r"marqo_processing_time:\d+\|ms",
148-
r"requests\.completed:1\|c\|#status_code:\dXX",
149-
r"search_processing_time:\d+\|ms",
150-
r"index_processing_time:\d+\|ms",
151-
r"x-count-success:\d+\|c",
152-
r"x-count-failure:\d+\|c",
153-
r"x-count-error:\d+\|c",
154-
r"requests\.completed:1\|c\|#path:/indexes/foo/documents(?:/<document_id>)?,method:(?:GET|POST),status_code:\dXX",
147+
r"request\.duration_ms:\d+\|ms\|#path:/indexes/foo/search,method:GET,status_code:200",
148+
r"request\.duration_ms:\d+\|ms\|#path:/indexes/foo/documents,method:POST,status_code:200",
149+
r"request\.duration_ms:\d+\|ms\|#path:/indexes/foo/documents/<document_id>,method:GET,status_code:200",
150+
r"batch\.success:1\|c\|#path:/indexes/foo/documents,method:POST,status_code:200",
151+
r"batch\.failure:0\|c\|#path:/indexes/foo/documents,method:POST,status_code:200",
152+
r"batch\.error:0\|c\|#path:/indexes/foo/documents,method:POST,status_code:200",
155153
]
156154

157155
# Wait until the six packets we assert on have arrived

tests/integ_tests/tensor_search/search/test_search_with_context.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -933,59 +933,6 @@ def mock_get_batch(*args, **kwargs):
933933
# But we should still have embeddings
934934
self.assertGreater(len(doc_embeddings), 0)
935935

936-
def test_search_with_context_documents_concurrency_parameter_controls_vespa_concurrency(self):
937-
"""Test that context.documents.parameters.concurrency is passed to vespa_client.get_batch."""
938-
index = self.structured_default_text_index
939-
940-
# Add documents to the index
941-
docs = [
942-
{"_id": "doc1", "text_field_1": "Test document 1"},
943-
{"_id": "doc2", "text_field_1": "Test document 2"}
944-
]
945-
946-
self.add_documents(
947-
config=self.config,
948-
add_docs_params=AddDocsParams(
949-
index_name=index.name,
950-
docs=docs,
951-
tensor_fields=None
952-
)
953-
)
954-
955-
# Mock vespa_client.get_batch to capture the concurrency parameter
956-
original_get_batch = self.config.vespa_client.get_batch
957-
captured_concurrency = []
958-
959-
def mock_get_batch(*args, **kwargs):
960-
captured_concurrency.append(kwargs.get('concurrency'))
961-
return original_get_batch(*args, **kwargs)
962-
963-
with mock.patch.object(self.config.vespa_client, 'get_batch', side_effect=mock_get_batch):
964-
# Create search context with specific concurrency
965-
search_context = SearchContext(
966-
documents=SearchContextDocuments(
967-
ids={"doc1": 1.0, "doc2": 1.0},
968-
parameters=SearchContextDocumentsParameters(
969-
tensorFields=["text_field_1"],
970-
excludeInputDocuments=False,
971-
concurrency=5 # Test with concurrency=5
972-
)
973-
)
974-
)
975-
976-
# Perform search with context documents
977-
tensor_search.search(
978-
config=self.config,
979-
index_name=index.name,
980-
text=None,
981-
context=search_context,
982-
result_count=5
983-
)
984-
985-
# Verify that get_batch was called with the correct concurrency parameter
986-
self.assertEqual(len(captured_concurrency), 1, "get_batch should have been called")
987-
self.assertEqual(captured_concurrency[0], 5, "get_batch should be called with concurrency=5")
988-
989936
def test_search_with_context_documents_max_search_context_docs_env_var(self):
990937
"""Test that MARQO_MAX_SEARCH_CONTEXT_DOCS environment variable controls the limit for context documents."""
991938

tests/integ_tests/tensor_search/test_api_query_logging_integration.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,7 @@ def test_slow_query_logging_all_fields_sanitised_excluding_secret_fields(self):
170170
},
171171
"parameters": {
172172
"tensorFields": ["text_field_1"],
173-
"excludeInputDocuments": False,
174-
"concurrency": 5,
173+
"excludeInputDocuments": False
175174
}
176175
}
177176
},
@@ -251,8 +250,7 @@ def test_slow_query_logging_all_fields_sanitised_excluding_secret_fields(self):
251250
},
252251
"parameters": {
253252
"tensorFields": ["text_field_1"],
254-
"excludeInputDocuments": False,
255-
"concurrency": 5,
253+
"excludeInputDocuments": False
256254
}
257255
}
258256
},

0 commit comments

Comments
 (0)