Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add trigger interval config for auto balance #39154

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ queryCoord:
checkSegmentInterval: 1000
checkChannelInterval: 1000
checkBalanceInterval: 3000
autoBalanceInterval: 3000 # the interval for triggerauto balance
checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute
segmentTaskTimeout: 120000 # 2 minute
Expand Down
59 changes: 39 additions & 20 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type BalanceChecker struct {
scheduler task.Scheduler
targetMgr meta.TargetManagerInterface
getBalancerFunc GetBalancerFunc

// record auto balance ts
autoBalanceTs time.Time
}

func NewBalanceChecker(meta *meta.Meta,
Expand Down Expand Up @@ -80,22 +83,12 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
return metaExist && targetExist
}

func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 {
ids := b.meta.GetAll(ctx)

// all replicas belonging to loading collection will be skipped
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
})
sort.Slice(loadedCollections, func(i, j int) bool {
return loadedCollections[i] < loadedCollections[j]
})

if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
// balance collections influenced by stopping nodes
stoppingReplicas := make([]int64, 0)
for _, cid := range loadedCollections {
for _, cid := range ids {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(ctx, cid) {
continue
Expand All @@ -113,12 +106,27 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
}
}

return nil
}

func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 {
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
// 2. when balancer isn't active, skip auto balance
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
return nil
}

ids := b.meta.GetAll(ctx)

// all replicas belonging to loading collection will be skipped
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
})
sort.Slice(loadedCollections, func(i, j int) bool {
return loadedCollections[i] < loadedCollections[j]
})

// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
Expand Down Expand Up @@ -173,16 +181,27 @@ func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64
}

func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
ret := make([]task.Task, 0)

replicasToBalance := b.replicasToBalance(ctx)
segmentPlans, channelPlans := b.balanceReplicas(ctx, replicasToBalance)
// iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.replicasToBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
var segmentPlans []balance.SegmentAssignPlan
var channelPlans []balance.ChannelAssignPlan
stoppingReplicas := b.getReplicaForStoppingBalance(ctx)
if len(stoppingReplicas) > 0 {
// check for stopping balance first
segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas)
} else {
// then check for auto balance
if time.Since(b.autoBalanceTs) > paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond) {
b.autoBalanceTs = time.Now()
replicasToBalance := b.getReplicaForNormalBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
// iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.getReplicaForNormalBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
}
}
}

ret := make([]task.Task, 0)
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
task.SetPriority(task.TaskPriorityLow, tasks...)
task.SetReason("segment unbalanced", tasks...)
Expand Down
88 changes: 80 additions & 8 deletions internal/querycoordv2/checkers/balance_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package checkers
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
Expand Down Expand Up @@ -144,22 +146,22 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
return 0
})
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Empty(replicasToBalance)
segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance)
suite.Empty(segPlans)

// test enable auto balance
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
idsToBalance := []int64{int64(replicaID1)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
// next round
idsToBalance = []int64{int64(replicaID2)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
// final round
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
suite.Empty(replicasToBalance)
}

Expand Down Expand Up @@ -221,7 +223,7 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
return 1
})
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Len(replicasToBalance, 1)
}

Expand Down Expand Up @@ -289,7 +291,7 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {

// test stopping balance
idsToBalance := []int64{int64(replicaID1), int64(replicaID2)}
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)

// checker check
Expand Down Expand Up @@ -347,7 +349,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
// test normal balance when one collection has unready target
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
replicasToBalance := suite.checker.replicasToBalance(ctx)
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
suite.Len(replicasToBalance, 0)

// test stopping balance with target not ready
Expand All @@ -364,10 +366,80 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())

idsToBalance := []int64{int64(replicaID1)}
replicasToBalance = suite.checker.replicasToBalance(ctx)
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
}

func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() {
ctx := context.Background()
// set up nodes info
nodeID1, nodeID2 := 1, 2
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID1),
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(nodeID2),
Address: "localhost",
Hostname: "localhost",
}))
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))

segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
{
ID: 2,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)

// set collections meta
cid1, replicaID1, partitionID1 := 1, 1, 1
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
collection1.Status = querypb.LoadStatus_Loaded
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))

funcCallCounter := atomic.NewInt64(0)
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, r *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
funcCallCounter.Inc()
return nil, nil
})

// first auto balance should be triggered
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))

// second auto balance won't be triggered due to autoBalanceInterval == 3s
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))

// set autoBalanceInterval == 1, sleep 1s, auto balance should be triggered
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000")
paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key)
time.Sleep(1 * time.Second)
suite.checker.Check(ctx)
suite.Equal(funcCallCounter.Load(), int64(1))
}

func TestBalanceCheckerSuite(t *testing.T) {
suite.Run(t, new(BalanceCheckerTestSuite))
}
11 changes: 11 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,7 @@ type queryCoordConfig struct {
SegmentCheckInterval ParamItem `refreshable:"true"`
ChannelCheckInterval ParamItem `refreshable:"true"`
BalanceCheckInterval ParamItem `refreshable:"true"`
AutoBalanceInterval ParamItem `refreshable:"true"`
IndexCheckInterval ParamItem `refreshable:"true"`
ChannelTaskTimeout ParamItem `refreshable:"true"`
SegmentTaskTimeout ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -2446,6 +2447,16 @@ If this parameter is set false, Milvus simply searches the growing segments with
Export: false,
}
p.ClusterLevelLoadResourceGroups.Init(base.mgr)

p.AutoBalanceInterval = ParamItem{
Key: "queryCoord.autoBalanceInterval",
Version: "2.5.3",
DefaultValue: "3000",
Doc: "the interval for triggerauto balance",
PanicIfEmpty: true,
Export: true,
}
p.AutoBalanceInterval.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func TestComponentParam(t *testing.T) {
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)

assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt())
assert.Equal(t, 3000, Params.AutoBalanceInterval.GetAsInt())
})

t.Run("test queryNodeConfig", func(t *testing.T) {
Expand Down
Loading