Skip to content

Commit afbd420

Browse files
authored
feat: [cherry-pick] Support clustering compaction (#2220)
#2015 #2219 --------- Signed-off-by: wayblink <[email protected]>
1 parent a441516 commit afbd420

File tree

4 files changed

+58
-10
lines changed

4 files changed

+58
-10
lines changed

pymilvus/client/grpc_handler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,13 +1546,19 @@ def load_balance(
15461546
check_status(status)
15471547

15481548
@retry_on_rpc_failure()
1549-
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
1549+
def compact(
1550+
self,
1551+
collection_name: str,
1552+
is_clustering: Optional[bool] = False,
1553+
timeout: Optional[float] = None,
1554+
**kwargs,
1555+
) -> int:
15501556
request = Prepare.describe_collection_request(collection_name)
15511557
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
15521558
response = rf.result()
15531559
check_status(response.status)
15541560

1555-
req = Prepare.manual_compaction(response.collectionID)
1561+
req = Prepare.manual_compaction(response.collectionID, is_clustering)
15561562
future = self._stub.ManualCompaction.future(req, timeout=timeout)
15571563
response = future.result()
15581564
check_status(response.status)

pymilvus/client/prepare.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -962,12 +962,16 @@ def load_balance_request(
962962
)
963963

964964
@classmethod
965-
def manual_compaction(cls, collection_id: int):
965+
def manual_compaction(cls, collection_id: int, is_clustering: bool):
966966
if collection_id is None or not isinstance(collection_id, int):
967967
raise ParamError(message=f"collection_id value {collection_id} is illegal")
968968

969+
if is_clustering is None or not isinstance(is_clustering, bool):
970+
raise ParamError(message=f"is_clustering value {is_clustering} is illegal")
971+
969972
request = milvus_types.ManualCompactionRequest()
970973
request.collectionID = collection_id
974+
request.majorCompaction = is_clustering
971975

972976
return request
973977

pymilvus/client/stub.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,13 +1044,16 @@ def load_balance(
10441044
**kwargs,
10451045
)
10461046

1047-
def compact(self, collection_name, timeout=None, **kwargs) -> int:
1047+
def compact(self, collection_name, is_clustering=False, timeout=None, **kwargs) -> int:
10481048
"""
10491049
Do compaction for the collection.
10501050
10511051
:param collection_name: The collection name to compact
10521052
:type collection_name: str
10531053
1054+
:param is_clustering: trigger clustering compaction
1055+
:type is_clustering: bool
1056+
10541057
:param timeout: The timeout for this method, unit: second
10551058
:type timeout: int
10561059
@@ -1060,15 +1063,22 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
10601063
:raises MilvusException: If collection name not exist.
10611064
"""
10621065
with self._connection() as handler:
1063-
return handler.compact(collection_name, timeout=timeout, **kwargs)
1066+
return handler.compact(
1067+
collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs
1068+
)
10641069

1065-
def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
1070+
def get_compaction_state(
1071+
self, compaction_id: int, is_clustering=False, timeout=None, **kwargs
1072+
) -> CompactionState:
10661073
"""
10671074
Get compaction states of a targeted compaction id
10681075
10691076
:param compaction_id: the id returned by compact
10701077
:type compaction_id: int
10711078
1079+
:param is_clustering: get clustering compaction
1080+
:type is_clustering: bool
1081+
10721082
:param timeout: The timeout for this method, unit: second
10731083
:type timeout: int
10741084
@@ -1079,7 +1089,9 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
10791089
"""
10801090

10811091
with self._connection() as handler:
1082-
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
1092+
return handler.get_compaction_state(
1093+
compaction_id, is_clustering=is_clustering, timeout=timeout, **kwargs
1094+
)
10831095

10841096
def wait_for_compaction_completed(
10851097
self, compaction_id: int, timeout=None, **kwargs

pymilvus/orm/collection.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,37 +1491,57 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
14911491
**copy_kwargs,
14921492
)
14931493

1494-
def compact(self, timeout: Optional[float] = None, **kwargs):
1494+
def compact(
1495+
self, is_clustering: Optional[bool] = False, timeout: Optional[float] = None, **kwargs
1496+
):
14951497
"""Compact merge the small segments in a collection
14961498
14971499
Args:
14981500
timeout (``float``, optional): An optional duration of time in seconds to allow
14991501
for the RPC. When timeout is set to None, client waits until server response
15001502
or error occur.
15011503
1504+
is_clustering (``bool``, optional): Option to trigger clustering compaction.
1505+
15021506
Raises:
15031507
MilvusException: If anything goes wrong.
15041508
"""
15051509
conn = self._get_connection()
1506-
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
1510+
if is_clustering:
1511+
self.clustering_compaction_id = conn.compact(
1512+
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
1513+
)
1514+
else:
1515+
self.compaction_id = conn.compact(
1516+
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
1517+
)
15071518

1508-
def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
1519+
def get_compaction_state(
1520+
self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs
1521+
) -> CompactionState:
15091522
"""Get the current compaction state
15101523
15111524
Args:
15121525
timeout (``float``, optional): An optional duration of time in seconds to allow
15131526
for the RPC. When timeout is set to None, client waits until server response
15141527
or error occur.
15151528
1529+
is_clustering (``bool``, optional): Option to get clustering compaction state.
1530+
15161531
Raises:
15171532
MilvusException: If anything goes wrong.
15181533
"""
15191534
conn = self._get_connection()
1535+
if is_clustering:
1536+
return conn.get_compaction_state(
1537+
self.clustering_compaction_id, timeout=timeout, **kwargs
1538+
)
15201539
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)
15211540

15221541
def wait_for_compaction_completed(
15231542
self,
15241543
timeout: Optional[float] = None,
1544+
is_clustering: Optional[bool] = False,
15251545
**kwargs,
15261546
) -> CompactionState:
15271547
"""Block until the current collection's compaction completed
@@ -1531,10 +1551,16 @@ def wait_for_compaction_completed(
15311551
for the RPC. When timeout is set to None, client waits until server response
15321552
or error occur.
15331553
1554+
is_clustering (``bool``, optional): Option to get clustering compaction state.
1555+
15341556
Raises:
15351557
MilvusException: If anything goes wrong.
15361558
"""
15371559
conn = self._get_connection()
1560+
if is_clustering:
1561+
return conn.wait_for_compaction_completed(
1562+
self.clustering_compaction_id, timeout=timeout, **kwargs
1563+
)
15381564
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)
15391565

15401566
def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:

0 commit comments

Comments
 (0)