Fix the bug that TopN processing items leak #552

Merged 1 commit on Oct 22, 2024
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -4,6 +4,10 @@ Release Notes.

## 0.8.0

### Bug Fixes

- Fix the bug that TopN processing items leak: an existing item could not be updated in place and was instead inserted as a new item.

### Documentation

- Improve the description of the memory in observability doc.
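To make the entry above concrete: before this fix, the TopN buffer tracked items only by their sort key, so a new data point for an already-buffered series was appended as a fresh entry instead of replacing the stale one. Below is a minimal, self-contained sketch of the difference, using stand-in types rather than the BanyanDB implementation:

```go
package main

import "fmt"

type entry struct {
	key     uint64 // series identity
	sortKey int64  // measured value
}

func main() {
	// Leaky behavior: tracked only by sort key, a second data point for
	// the same series occupies a second slot.
	leaky := []entry{}
	leaky = append(leaky, entry{key: 1, sortKey: 100})
	leaky = append(leaky, entry{key: 1, sortKey: 120}) // same series, new slot
	fmt.Println(len(leaky)) // 2 — the stale item leaked

	// Fixed behavior: keying by series ID lets the update replace the
	// stale item, mirroring the dict map[uint64]int64 added in this PR.
	fixed := map[uint64]entry{}
	fixed[1] = entry{key: 1, sortKey: 100}
	fixed[1] = entry{key: 1, sortKey: 120} // update in place
	fmt.Println(len(fixed)) // 1
}
```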
9 changes: 8 additions & 1 deletion banyand/measure/topn.go
@@ -64,6 +64,7 @@ var (
type dataPointWithEntityValues struct {
*measurev1.DataPointValue
entityValues []*modelv1.TagValue
seriesID uint64
}

type topNStreamingProcessor struct {
@@ -272,6 +273,9 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
t.errCh = t.streamingFlow.Window(streaming.NewTumblingTimeWindows(t.interval, flushInterval)).
AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
TopN(int(t.topNSchema.GetCountersNumber()),
streaming.WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return record.Data().(flow.Data)[4].(uint64)
}),
streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[2].(int64)
}),
@@ -322,7 +326,7 @@ func (manager *topNProcessorManager) Close() error {
return err
}

func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) {
func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request *measurev1.InternalWriteRequest) {
go func() {
manager.RLock()
defer manager.RUnlock()
@@ -331,6 +335,7 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalW
processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
request.GetRequest().GetDataPoint(),
request.GetEntityValues(),
seriesID,
}, request.GetRequest().GetDataPoint().GetTimestamp())
}
}
@@ -436,6 +441,7 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
nil,
dpWithEvs.seriesID,
}
}, nil
}
@@ -458,6 +464,7 @@
transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue {
return extractTagValue(dpWithEvs.DataPointValue, locator)
}),
dpWithEvs.seriesID,
}
}, nil
}
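The key extractor wired into start() reads slot 4 of the flow.Data slice, which buildMapper now populates with dpWithEvs.seriesID; the sort key still comes from slot 2. A self-contained sketch of that indexing follows — the stand-in type and the unpopulated slots are assumptions, since only slots 2 and 4 are visible in this diff:

```go
package main

import "fmt"

// data stands in for the flow.Data ([]interface{}) carried by a StreamRecord.
type data []interface{}

func main() {
	// Slots 0, 1, and 3 are left nil here; the diff above only shows that
	// slot 2 holds the int64 sort key and slot 4 holds the uint64 series ID.
	rec := data{nil, nil, int64(9700), nil, uint64(42)}

	keyExtractor := func(d data) uint64 { return d[4].(uint64) }
	sortKeyExtractor := func(d data) int64 { return d[2].(int64) }

	fmt.Println(keyExtractor(rec), sortKeyExtractor(rec)) // 42 9700
}
```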
2 changes: 1 addition & 1 deletion banyand/measure/write.go
@@ -193,7 +193,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies)

if stm.processorManager != nil {
stm.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: stm.GetSchema().Metadata,
DataPoint: req.DataPoint,
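On the write path, the callback now forwards uint64(series.ID) to onMeasureWrite, so every record entering the TopN flow carries a stable identity. A trivial sketch of the new signature shape (stand-in function, abbreviated arguments; the real signature is in the diff above):

```go
package main

import "fmt"

// onMeasureWrite stands in for the manager method above: the resolved
// series ID is now threaded through as an explicit first argument.
func onMeasureWrite(seriesID uint64, dataPoint string) {
	fmt.Printf("series=%d point=%s\n", seriesID, dataPoint)
}

func main() {
	// uint64(series.ID) at the real call site in write.go.
	onMeasureWrite(42, "latency=9700")
}
```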
46 changes: 26 additions & 20 deletions pkg/flow/streaming/streaming_test.go
@@ -25,6 +25,7 @@ import (
g "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
flowTest "github.com/apache/skywalking-banyandb/pkg/test/flow"
@@ -146,14 +147,17 @@ var _ = g.Describe("Streaming", func() {
f = New("test", flowTest.NewSlice(input)).
Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} {
// groupBy
return flow.Data{item.(*record).service, int64(item.(*record).value)}
return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance}
})).
Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
})).
TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return convert.HashStr(record.Data().(flow.Data)[2].(string))
}),
WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
})).
To(snk)

errCh = f.Open()
@@ -179,15 +183,15 @@ var _ = g.Describe("Streaming", func() {
g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1))
// e2e-service-consumer Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500)}, 7000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)},
}))
// e2e-service-provider Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)},
}))
}).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
})
@@ -209,10 +213,12 @@ var _ = g.Describe("Streaming", func() {
f = New("test", flowTest.NewSlice(input)).
Map(flow.UnaryFunc[any](func(_ context.Context, item interface{}) interface{} {
// groupBy
return flow.Data{item.(*record).service, int64(item.(*record).value)}
return flow.Data{item.(*record).service, int64(item.(*record).value), item.(*record).service + item.(*record).instance}
})).
Window(NewTumblingTimeWindows(15*time.Second, 15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
TopN(3, WithKeyExtractor(func(record flow.StreamRecord) uint64 {
return convert.HashStr(record.Data().(flow.Data)[2].(string))
}), WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
}), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
return record.Data().(flow.Data)[0].(string)
@@ -242,15 +248,15 @@ var _ = g.Describe("Streaming", func() {
g.Expect(len(snk.Value())).Should(gomega.BeNumerically(">=", 1))
// e2e-service-consumer Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(9900), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9900)}, 2000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700), "e2e-service-consumerinstance-002"}, 4000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600), "e2e-service-consumerinstance-004"}, 6000)},
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500), "e2e-service-consumerinstance-001"}, 7000)},
}))
// e2e-service-provider Group
g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(gomega.BeEquivalentTo([]*Tuple2{
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000), "e2e-service-providerinstance-001"}, 1000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800), "e2e-service-providerinstance-002"}, 3000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700), "e2e-service-providerinstance-003"}, 5000)},
}))
}).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
})
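The updated tests add an instance dimension and derive the uint64 item key by hashing the concatenated service and instance names with convert.HashStr. A minimal usage sketch, assuming only that convert.HashStr is deterministic (which the assertions above rely on):

```go
package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/convert"
)

func main() {
	// The same service+instance string always hashes to the same key, so
	// a later data point for that instance replaces the buffered one.
	a := convert.HashStr("e2e-service-consumer" + "instance-001")
	b := convert.HashStr("e2e-service-consumer" + "instance-001")
	fmt.Println(a == b) // true
}
```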
83 changes: 68 additions & 15 deletions pkg/flow/streaming/topn.go
@@ -76,6 +76,7 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {

type topNAggregatorGroup struct {
aggregatorGroup map[string]*topNAggregator
keyExtractor func(flow.StreamRecord) uint64
sortKeyExtractor func(flow.StreamRecord) int64
groupKeyExtractor func(flow.StreamRecord) string
comparator utils.Comparator
@@ -86,9 +87,9 @@ type topNAggregatorGroup struct {

type topNAggregator struct {
*topNAggregatorGroup
treeMap *treemap.Map
currentTopNum int
dirty bool
treeMap *treemap.Map
dict map[uint64]int64
dirty bool
}

// TopNOption is the option to set up a top-n aggregator group.
@@ -101,6 +102,13 @@ func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOp
}
}

// WithKeyExtractor sets a closure to extract the unique item key from the StreamRecord.
func WithKeyExtractor(keyExtractor func(flow.StreamRecord) uint64) TopNOption {
return func(aggregator *topNAggregatorGroup) {
aggregator.keyExtractor = keyExtractor
}
}

// WithGroupKeyExtractor sets a closure to extract the group key from the StreamRecord.
func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption {
return func(aggregator *topNAggregatorGroup) {
@@ -117,14 +125,16 @@ func OrderBy(sort TopNSort) TopNOption {

func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) {
for _, item := range input {
key := t.keyExtractor(item)
sortKey := t.sortKeyExtractor(item)
groupKey := t.groupKeyExtractor(item)
aggregator := t.getOrCreateGroup(groupKey)
aggregator.removeExistedItem(key)
if aggregator.checkSortKeyInBufferRange(sortKey) {
if e := t.l.Debug(); e.Enabled() {
e.Str("group", groupKey).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer")
e.Str("group", groupKey).Uint64("key", key).Time("elem_ts", time.Unix(0, item.TimestampMillis()*int64(time.Millisecond))).Msg("put into topN buffer")
}
aggregator.put(sortKey, item)
aggregator.put(key, sortKey, item)
aggregator.doCleanUp()
}
}
Expand All @@ -138,7 +148,7 @@ func (t *topNAggregatorGroup) Snapshot() interface{} {
}
aggregator.dirty = false
iter := aggregator.treeMap.Iterator()
items := make([]*Tuple2, 0, aggregator.currentTopNum)
items := make([]*Tuple2, 0, aggregator.size())
for iter.Next() {
list := iter.Value().([]interface{})
for _, item := range list {
@@ -180,35 +190,35 @@ func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator {
t.aggregatorGroup[group] = &topNAggregator{
topNAggregatorGroup: t,
treeMap: treemap.NewWith(t.comparator),
dict: make(map[uint64]int64),
}
return t.aggregatorGroup[group]
}

func (t *topNAggregator) doCleanUp() {
// do cleanup: keep the treeMap within cacheSize
if t.currentTopNum > t.cacheSize {
if t.size() > t.cacheSize {
lastKey, lastValues := t.treeMap.Max()
size := len(lastValues.([]interface{}))
l := lastValues.([]interface{})
delete(t.dict, t.keyExtractor(l[len(l)-1].(flow.StreamRecord)))
// remove last one
if size <= 1 {
t.currentTopNum -= size
if len(l) <= 1 {
t.treeMap.Remove(lastKey)
} else {
t.currentTopNum--
t.treeMap.Put(lastKey, lastValues.([]interface{})[0:size-1])
t.treeMap.Put(lastKey, l[:len(l)-1])
}
}
}

func (t *topNAggregator) put(sortKey int64, data flow.StreamRecord) {
t.currentTopNum++
func (t *topNAggregator) put(key uint64, sortKey int64, data flow.StreamRecord) {
t.dirty = true
if existingList, ok := t.treeMap.Get(sortKey); ok {
existingList = append(existingList.([]interface{}), data)
t.treeMap.Put(sortKey, existingList)
} else {
t.treeMap.Put(sortKey, []interface{}{data})
}
t.dict[key] = sortKey
}

func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
@@ -223,7 +233,50 @@ func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
if t.comparator(sortKey, worstKey.(int64)) < 0 {
return true
}
return t.currentTopNum < t.cacheSize
return t.size() < t.cacheSize
}

func (t *topNAggregator) removeExistedItem(key uint64) {
existed, ok := t.dict[key]
if !ok {
return
}
delete(t.dict, key)
list, ok := t.treeMap.Get(existed)
if !ok {
return
}
l := list.([]interface{})
for i := 0; i < len(l); i++ {
if t.keyExtractor(l[i].(flow.StreamRecord)) == key {
l = append(l[:i], l[i+1:]...)
}
}
if len(l) == 0 {
t.treeMap.Remove(existed)
return
}
t.treeMap.Put(existed, l)
}

func (t *topNAggregator) size() int {
return len(t.dict)
}

func (t *topNAggregatorGroup) leakCheck() {
for g, agg := range t.aggregatorGroup {
if agg.size() > t.cacheSize {
panic(g + "leak detected: topN buffer size exceed the cache size")
}
iter := agg.treeMap.Iterator()
count := 0
for iter.Next() {
count += len(iter.Value().([]interface{}))
}
if count != agg.size() {
panic(g + "leak detected: treeMap size not match dictionary size")
}
}
}

// Tuple2 is a tuple with 2 fields. Each field may be a separate type.
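Taken together, put, removeExistedItem, and leakCheck maintain a single invariant: dict holds exactly one entry per buffered item, so len(dict) equals the total number of records across all treeMap buckets. The sketch below reproduces that bookkeeping with plain maps in place of the ordered treeMap; it illustrates the invariant, not the implementation above:

```go
package main

import "fmt"

func main() {
	dict := map[uint64]int64{}   // key -> sort key, one entry per buffered item
	tree := map[int64][]uint64{} // sort key -> keys sharing it (stands in for the treeMap)

	put := func(key uint64, sortKey int64) {
		// Drop any stale entry for this key first, as removeExistedItem does.
		if old, ok := dict[key]; ok {
			delete(dict, key)
			l := tree[old]
			for i, k := range l {
				if k == key {
					l = append(l[:i], l[i+1:]...)
					break
				}
			}
			if len(l) == 0 {
				delete(tree, old)
			} else {
				tree[old] = l
			}
		}
		dict[key] = sortKey
		tree[sortKey] = append(tree[sortKey], key)
	}

	put(1, 100)
	put(1, 120) // an update replaces the old item instead of leaking it

	count := 0
	for _, bucket := range tree {
		count += len(bucket)
	}
	fmt.Println(len(dict) == count, count) // true 1 — leakCheck's invariant holds
}
```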