From 21a4e2bc552289c24591097f7c92bec95746b285 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 3 Jan 2025 15:16:18 +0800 Subject: [PATCH] enhance: enable describe_replica api in milvus client Signed-off-by: Wei Liu --- examples/partition.py | 3 ++ pymilvus/client/grpc_handler.py | 31 +++++++++++++++ pymilvus/client/types.py | 52 +++++++++++++++++++++++++ pymilvus/milvus_client/milvus_client.py | 17 ++++++++ 4 files changed, 103 insertions(+) diff --git a/examples/partition.py b/examples/partition.py index 7466c034a..a21ac0c42 100644 --- a/examples/partition.py +++ b/examples/partition.py @@ -44,6 +44,9 @@ milvus_client.release_collection(collection_name) milvus_client.load_partitions(collection_name, partition_names =["p1", "p2"]) +replicas=milvus_client.describe_replica(collection_name) +print("replicas:", replicas) + print(fmt.format("Start search in partiton p1")) vectors_to_search = rng.random((1, dim)) result = milvus_client.search(collection_name, vectors_to_search, limit=3, output_fields=["pk", "a", "b"], partition_names = ["p1"]) diff --git a/pymilvus/client/grpc_handler.py b/pymilvus/client/grpc_handler.py index 258711125..15691f562 100644 --- a/pymilvus/client/grpc_handler.py +++ b/pymilvus/client/grpc_handler.py @@ -53,6 +53,7 @@ Plan, PrivilegeGroupInfo, Replica, + ReplicaInfo, ResourceGroupConfig, ResourceGroupInfo, RoleInfo, @@ -1747,6 +1748,36 @@ def get_replicas( return Replica(groups) + @retry_on_rpc_failure() + def describe_replica( + self, collection_name: str, timeout: Optional[float] = None, **kwargs + ) -> List[ReplicaInfo]: + collection_id = self.describe_collection(collection_name, timeout, **kwargs)[ + "collection_id" + ] + + req = Prepare.get_replicas(collection_id) + future = self._stub.GetReplicas.future(req, timeout=timeout) + response = future.result() + check_status(response.status) + + groups = [] + for replica in response.replicas: + shards = [ + Shard(s.dm_channel_name, s.node_ids, s.leaderID) for s in replica.shard_replicas + ] + groups.append( + ReplicaInfo( + replica.replicaID, + shards, + replica.node_ids, + replica.resource_group_name, + replica.num_outbound_node, + ) + ) + + return groups + @retry_on_rpc_failure() def do_bulk_insert( self, diff --git a/pymilvus/client/types.py b/pymilvus/client/types.py index 2916d41f8..16b2b417f 100644 --- a/pymilvus/client/types.py +++ b/pymilvus/client/types.py @@ -356,6 +356,11 @@ def shard_leader(self) -> int: class Group: + """ + This class represents replica info in orm format api, which is deprecated in milvus client api. + use `ReplicaInfo` instead. + """ + def __init__( self, group_id: int, @@ -400,6 +405,10 @@ def num_outbound_node(self): class Replica: """ + This class represents replica info list in orm format api, + which is deprecated in milvus client api. + use `List[ReplicaInfo]` instead. + Replica groups: - Group: , , , @@ -428,6 +437,49 @@ def groups(self): return self._groups +class ReplicaInfo: + def __init__( + self, + replica_id: int, + shards: List[str], + nodes: List[tuple], + resource_group: str, + num_outbound_node: dict, + ) -> None: + self._id = replica_id + self._shards = shards + self._nodes = tuple(nodes) + self._resource_group = resource_group + self._num_outbound_node = num_outbound_node + + def __repr__(self) -> str: + return ( + f"ReplicaInfo: , , " + f", , " + f"" + ) + + @property + def id(self): + return self._id + + @property + def group_nodes(self): + return self._nodes + + @property + def shards(self): + return self._shards + + @property + def resource_group(self): + return self._resource_group + + @property + def num_outbound_node(self): + return self._num_outbound_node + + class BulkInsertState: """enum states of bulk insert task""" diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py index 8535036e6..0884ccedb 100644 --- a/pymilvus/milvus_client/milvus_client.py +++ b/pymilvus/milvus_client/milvus_client.py @@ -12,6 +12,7 @@ ExtraList, LoadState, OmitZeroDict, + ReplicaInfo, ResourceGroupConfig, construct_cost_extra, ) @@ -1696,3 +1697,19 @@ def transfer_replica( return conn.transfer_replica( source_group, target_group, collection_name, num_replicas, timeout ) + + def describe_replica( + self, collection_name: str, timeout: Optional[float] = None, **kwargs + ) -> List[ReplicaInfo]: + """Get the current loaded replica information + + Args: + collection_name (``str``): The name of the given collection. + timeout (``float``, optional): An optional duration of time in seconds to allow + for the RPC. When timeout is set to None, client waits until server response + or error occur. + Returns: + List[ReplicaInfo]: All the replica information. + """ + conn = self._get_connection() + return conn.describe_replica(collection_name, timeout=timeout, **kwargs)