Skip to content

Commit

Permalink
fix: Clustering compaction ignoring deltalogs (#39132)
Browse files Browse the repository at this point in the history
See also: #39131

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Jan 10, 2025
1 parent 4355b48 commit b8fca4f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
for _, segment := range inputSegments {
segmentClone := &datapb.CompactionSegmentBinlogs{
SegmentID: segment.SegmentID,
// only FieldBinlogs needed
// only FieldBinlogs and deltalogs needed
Deltalogs: segment.Deltalogs,
FieldBinlogs: segment.FieldBinlogs,
}
future := t.mappingPool.Submit(func() (any, error) {
Expand Down
18 changes: 18 additions & 0 deletions internal/datanode/compaction/clustering_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
}

func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
dblobs, err := getInt64DeltaBlobs(
1,
[]int64{100},
[]uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)},
)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}).
Return([][]byte{dblobs.GetValue()}, nil).Once()

schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
Expand All @@ -193,6 +202,9 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
Deltalogs: []*datapb.FieldBinlog{
{Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "1"}}},
},
},
}

Expand Down Expand Up @@ -236,6 +248,12 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.Equal(2, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)

s.EqualValues(10239,
lo.SumBy(compactionResult.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
return seg.GetNumOfRows()
}),
)
}

func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
Expand Down

0 comments on commit b8fca4f

Please sign in to comment.