Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1546,13 +1546,19 @@ def load_balance(
check_status(status)

@retry_on_rpc_failure()
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
def compact(
self,
collection_name: str,
is_clustering: Optional[bool] = False,
timeout: Optional[float] = None,
**kwargs,
) -> int:
request = Prepare.describe_collection_request(collection_name)
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
response = rf.result()
check_status(response.status)

req = Prepare.manual_compaction(response.collectionID)
req = Prepare.manual_compaction(response.collectionID, is_clustering)
future = self._stub.ManualCompaction.future(req, timeout=timeout)
response = future.result()
check_status(response.status)
Expand Down
6 changes: 5 additions & 1 deletion pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,16 @@ def load_balance_request(
)

@classmethod
def manual_compaction(cls, collection_id: int):
def manual_compaction(cls, collection_id: int, is_clustering: bool):
if collection_id is None or not isinstance(collection_id, int):
raise ParamError(message=f"collection_id value {collection_id} is illegal")

if is_clustering is None or not isinstance(is_clustering, bool):
raise ParamError(message=f"is_clustering value {is_clustering} is illegal")

request = milvus_types.ManualCompactionRequest()
request.collectionID = collection_id
request.majorCompaction = is_clustering

return request

Expand Down
20 changes: 16 additions & 4 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,13 +1044,16 @@ def load_balance(
**kwargs,
)

def compact(self, collection_name, timeout=None, **kwargs) -> int:
def compact(self, collection_name, is_clustering=False, timeout=None, **kwargs) -> int:
"""
Do compaction for the collection.

:param collection_name: The collection name to compact
:type collection_name: str

:param is_clustering: trigger clustering compaction
:type is_clustering: bool

:param timeout: The timeout for this method, unit: second
:type timeout: int

Expand All @@ -1060,15 +1063,22 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
:raises MilvusException: If collection name not exist.
"""
with self._connection() as handler:
return handler.compact(collection_name, timeout=timeout, **kwargs)
return handler.compact(
collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
def get_compaction_state(
self, compaction_id: int, is_clustering=False, timeout=None, **kwargs
) -> CompactionState:
"""
Get compaction states of a targeted compaction id

:param compaction_id: the id returned by compact
:type compaction_id: int

:param is_clustering: get clustering compaction
:type is_clustering: bool

:param timeout: The timeout for this method, unit: second
:type timeout: int

Expand All @@ -1079,7 +1089,9 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
"""

with self._connection() as handler:
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
return handler.get_compaction_state(
compaction_id, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def wait_for_compaction_completed(
self, compaction_id: int, timeout=None, **kwargs
Expand Down
32 changes: 29 additions & 3 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1491,37 +1491,57 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
**copy_kwargs,
)

def compact(self, timeout: Optional[float] = None, **kwargs):
def compact(
self, is_clustering: Optional[bool] = False, timeout: Optional[float] = None, **kwargs
):
"""Compact merge the small segments in a collection

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to trigger clustering compaction.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
if is_clustering:
self.clustering_compaction_id = conn.compact(
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
)
else:
self.compaction_id = conn.compact(
self._name, is_clustering=is_clustering, timeout=timeout, **kwargs
)

def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
def get_compaction_state(
self, timeout: Optional[float] = None, is_clustering: Optional[bool] = False, **kwargs
) -> CompactionState:
"""Get the current compaction state

Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to get clustering compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.get_compaction_state(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)

def wait_for_compaction_completed(
self,
timeout: Optional[float] = None,
is_clustering: Optional[bool] = False,
**kwargs,
) -> CompactionState:
"""Block until the current collection's compaction completed
Expand All @@ -1531,10 +1551,16 @@ def wait_for_compaction_completed(
for the RPC. When timeout is set to None, client waits until server response
or error occur.

is_clustering (``bool``, optional): Option to get clustering compaction state.

Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_clustering:
return conn.wait_for_compaction_completed(
self.clustering_compaction_id, timeout=timeout, **kwargs
)
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)

def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:
Expand Down