def optimize(self, timeout: Optional[float] = None, **kwargs):
    """Optimize the collection on the server to gain the best performance.

    The method flushes the collection, waits until the index reports no
    pending rows, compacts, and repeats until the index is still fully built
    after a compaction. If the collection is loaded, it is then reloaded
    with ``_refresh=True``.

    Be careful: by default this method may block for a very long time.
    The collection must be INDEXED before calling optimize.

    Args:
        timeout: Overall time budget in seconds for the whole operation.
            ``None`` or ``0`` selects the default of 12 hours. Every server
            call made here is bounded by the time left in this budget.
        **kwargs: Accepted for interface compatibility; currently unused.

    Raises:
        MilvusException: If the collection has no index, or if the overall
            time budget is exhausted before optimization finishes.
    """
    # A falsy timeout (None or 0) falls back to the 12 hour default.
    timeout = timeout or 12 * 60 * 60

    # Use a monotonic clock so wall-clock adjustments cannot shorten or
    # extend the budget.
    deadline = time.monotonic() + timeout

    def remaining() -> float:
        """Return the seconds left in the budget, raising once it is spent."""
        left = deadline - time.monotonic()
        if left <= 0:
            raise MilvusException(message=f"Wait for optimize timeout in {timeout}s")
        return left

    conn = self._get_connection()

    # Optimizing is only meaningful for an indexed collection.
    if not self.has_index(timeout=remaining()):
        raise MilvusException(message="Please index before calling optimize")

    self.flush(timeout=remaining())
    index = self.index()

    def has_pending_rows() -> bool:
        """Return True while the index build progress reports pending rows."""
        info = conn.get_index_build_progress(
            self.name, index.index_name, timeout=remaining()
        )
        # A missing key defaults to -1, which counts as "nothing pending".
        return info.get("pending_index_rows", -1) > 0

    poll_interval = 5  # seconds between progress checks

    while True:
        if not has_pending_rows():
            # Bound compaction by the remaining budget so it cannot hang
            # past the caller's timeout.
            self.compact(timeout=remaining())
            self.wait_for_compaction_completed(timeout=remaining())
            # Compaction may leave rows to index again; finish only when
            # the index is still fully built afterwards.
            if not has_pending_rows():
                break

        # Never sleep past the deadline; remaining() raises once it is hit.
        time.sleep(min(poll_interval, remaining()))

    # Refresh an already-loaded collection so it serves the optimized data.
    load_state = conn.get_load_state(self.name, timeout=remaining())
    if load_state not in (LoadState.NotExist, LoadState.NotLoad):
        self.load(timeout=remaining(), _refresh=True)