Skip to content

Commit 3c35ea3

Browse files
committed
Support major compaction in ManualCompaction
Signed-off-by: wayblink <[email protected]>
1 parent 1cb5a19 commit 3c35ea3

File tree

4 files changed

+49
-10
lines changed

4 files changed

+49
-10
lines changed

pymilvus/client/grpc_handler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,13 +1525,19 @@ def load_balance(
15251525
check_status(status)
15261526

15271527
@retry_on_rpc_failure()
1528-
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
1528+
def compact(
1529+
self,
1530+
collection_name: str,
1531+
timeout: Optional[float] = None,
1532+
is_major: Optional[bool] = False,
1533+
**kwargs,
1534+
) -> int:
15291535
request = Prepare.describe_collection_request(collection_name)
15301536
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
15311537
response = rf.result()
15321538
check_status(response.status)
15331539

1534-
req = Prepare.manual_compaction(response.collectionID)
1540+
req = Prepare.manual_compaction(response.collectionID, is_major)
15351541
future = self._stub.ManualCompaction.future(req, timeout=timeout)
15361542
response = future.result()
15371543
check_status(response.status)

pymilvus/client/prepare.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,12 +961,13 @@ def load_balance_request(
961961
)
962962

963963
@classmethod
964-
def manual_compaction(cls, collection_id: int):
964+
def manual_compaction(cls, collection_id: int, is_major: bool):
965965
if collection_id is None or not isinstance(collection_id, int):
966966
raise ParamError(message=f"collection_id value {collection_id} is illegal")
967967

968968
request = milvus_types.ManualCompactionRequest()
969969
request.collectionID = collection_id
970+
request.majorCompaction = is_major
970971

971972
return request
972973

pymilvus/client/stub.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,7 @@ def load_balance(
10441044
**kwargs,
10451045
)
10461046

1047-
def compact(self, collection_name, timeout=None, **kwargs) -> int:
1047+
def compact(self, collection_name, timeout=None, is_major=False, **kwargs) -> int:
10481048
"""
10491049
Do compaction for the collection.
10501050
@@ -1054,15 +1054,20 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
10541054
:param timeout: The timeout for this method, unit: second
10551055
:type timeout: int
10561056
1057+
:param is_major: trigger major compaction
1058+
:type is_major: bool
1059+
10571060
:return: the compaction ID
10581061
:rtype: int
10591062
10601063
:raises MilvusException: If collection name not exist.
10611064
"""
10621065
with self._connection() as handler:
1063-
return handler.compact(collection_name, timeout=timeout, **kwargs)
1066+
return handler.compact(collection_name, timeout=timeout, is_major=is_major, **kwargs)
10641067

1065-
def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
1068+
def get_compaction_state(
1069+
self, compaction_id: int, timeout=None, is_major=False, **kwargs
1070+
) -> CompactionState:
10661071
"""
10671072
Get compaction states of a targeted compaction id
10681073
@@ -1072,14 +1077,19 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
10721077
:param timeout: The timeout for this method, unit: second
10731078
:type timeout: int
10741079
1080+
:param is_major: get major compaction
1081+
:type is_major: bool
1082+
10751083
:return: the state of the compaction
10761084
:rtype: CompactionState
10771085
10781086
:raises MilvusException: If compaction_id doesn't exist.
10791087
"""
10801088

10811089
with self._connection() as handler:
1082-
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
1090+
return handler.get_compaction_state(
1091+
compaction_id, timeout=timeout, is_major=is_major, **kwargs
1092+
)
10831093

10841094
def wait_for_compaction_completed(
10851095
self, compaction_id: int, timeout=None, **kwargs

pymilvus/orm/collection.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,37 +1489,53 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
14891489
)
14901490
index.drop(timeout=timeout, **kwargs)
14911491

1492-
def compact(self, timeout: Optional[float] = None, **kwargs):
1492+
def compact(self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs):
14931493
"""Compact merge the small segments in a collection
14941494
14951495
Args:
14961496
timeout (``float``, optional): An optional duration of time in seconds to allow
14971497
for the RPC. When timeout is set to None, client waits until server response
14981498
or error occur.
14991499
1500+
is_major (``bool``, optional): An optional setting to trigger major compaction.
1501+
15001502
Raises:
15011503
MilvusException: If anything goes wrong.
15021504
"""
15031505
conn = self._get_connection()
1504-
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
1506+
if is_major:
1507+
self.major_compaction_id = conn.compact(
1508+
self._name, timeout=timeout, is_major=is_major, **kwargs
1509+
)
1510+
else:
1511+
self.compaction_id = conn.compact(
1512+
self._name, timeout=timeout, is_major=is_major, **kwargs
1513+
)
15051514

1506-
def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
1515+
def get_compaction_state(
1516+
self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs
1517+
) -> CompactionState:
15071518
"""Get the current compaction state
15081519
15091520
Args:
15101521
timeout (``float``, optional): An optional duration of time in seconds to allow
15111522
for the RPC. When timeout is set to None, client waits until server response
15121523
or error occur.
15131524
1525+
is_major (``bool``, optional): An optional setting to get major compaction state.
1526+
15141527
Raises:
15151528
MilvusException: If anything goes wrong.
15161529
"""
15171530
conn = self._get_connection()
1531+
if is_major:
1532+
return conn.get_compaction_state(self.major_compaction_id, timeout=timeout, **kwargs)
15181533
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)
15191534

15201535
def wait_for_compaction_completed(
15211536
self,
15221537
timeout: Optional[float] = None,
1538+
is_major: Optional[bool] = False,
15231539
**kwargs,
15241540
) -> CompactionState:
15251541
"""Block until the current collection's compaction completed
@@ -1529,10 +1545,16 @@ def wait_for_compaction_completed(
15291545
for the RPC. When timeout is set to None, client waits until server response
15301546
or error occur.
15311547
1548+
is_major (``bool``, optional): An optional setting to get major compaction state.
1549+
15321550
Raises:
15331551
MilvusException: If anything goes wrong.
15341552
"""
15351553
conn = self._get_connection()
1554+
if is_major:
1555+
return conn.wait_for_compaction_completed(
1556+
self.major_compaction_id, timeout=timeout, **kwargs
1557+
)
15361558
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)
15371559

15381560
def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:

0 commit comments

Comments
 (0)