Skip to content

Commit

Permalink
Add the bydbctl analyze series command to analyze the series data. (#…
Browse files Browse the repository at this point in the history
…553)

* Add the `bydbctl analyze series` command to analyze the series data.

Signed-off-by: Gao Hongtao <[email protected]>

---------

Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily authored Oct 23, 2024
1 parent 4cd3c09 commit d5ec659
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Release Notes.

## 0.8.0

### Features

- Add the `bydbctl analyze series` command to analyze the series data.

### Bug Fixes

- Fix the bug that TopN processing item leak. The item can not be updated but as a new item.
Expand Down
118 changes: 118 additions & 0 deletions bydbctl/internal/cmd/analyze.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cmd

import (
"context"
"errors"
"fmt"
"time"

"github.com/spf13/cobra"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/version"
)

// newAnalyzeCmd builds the "analyze" command and registers its "series"
// subcommand.
//
// "analyze series <sidx-dir>" opens the series index found at the given
// directory and prints to the standard output either:
//   - one "<subject>, <count>" line per subject followed by a "total, <n>"
//     line, when no subject is selected, or
//   - one comma-terminated row of entity values per series of the selected
//     subject, when --subject/-s is set.
func newAnalyzeCmd() *cobra.Command {
	analyzeCmd := &cobra.Command{
		Use:     "analyze",
		Version: version.Build(),
		Short:   "Analyze operation",
	}

	var subjectName string
	seriesCmd := &cobra.Command{
		Use:     "series",
		Version: version.Build(),
		Short:   "Analyze series cardinality and distribution",
		// err is a named result so the deferred Close calls below can attach
		// their errors to whatever the body returns.
		RunE: func(_ *cobra.Command, args []string) (err error) {
			if len(args) == 0 {
				return errors.New("series index directory is required, its name should be 'sidx' in a segment 'seg-xxxxxx'")
			}
			store, err := inverted.NewStore(inverted.StoreOpts{
				Path:   args[0],
				Logger: logger.GetLogger("series-analyzer"),
			})
			if err != nil {
				return fmt.Errorf("opening series index %q: %w", args[0], err)
			}
			// Registered before the iterator's Close so that, with LIFO defer
			// order, the iterator is released first and the store last.
			defer func() {
				err = multierr.Append(err, store.Close())
			}()

			// The whole scan is bounded to one minute.
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
			defer cancel()
			iter, err := store.SeriesIterator(ctx)
			if err != nil {
				return fmt.Errorf("creating series iterator: %w", err)
			}
			// Close is appended to err here so that errors reported by the
			// iterator on Close are not lost after the loops below finish.
			defer func() {
				err = multierr.Append(err, iter.Close())
			}()

			if subjectName != "" {
				var found bool
				for iter.Next() {
					var s pbv1.Series
					if err = s.Unmarshal(iter.Val().EntityValues); err != nil {
						return fmt.Errorf("decoding series: %w", err)
					}
					if s.Subject == subjectName {
						found = true
						for i := range s.EntityValues {
							fmt.Printf("%s,", pbv1.MustTagValueToStr(s.EntityValues[i]))
						}
						fmt.Println()
						continue
					}
					// Stop at the first non-matching series after a match.
					// NOTE(review): this assumes the iterator yields all series
					// of one subject contiguously - confirm against the index
					// ordering.
					if found {
						break
					}
				}
				return nil
			}

			// No subject selected: count consecutive series per subject and
			// print a line each time the subject changes.
			var subject string
			var count, total int
			for iter.Next() {
				total++
				var s pbv1.Series
				if err = s.Unmarshal(iter.Val().EntityValues); err != nil {
					return fmt.Errorf("decoding series: %w", err)
				}
				if s.Subject != subject {
					if subject != "" {
						fmt.Printf("%s, %d\n", subject, count)
					}
					subject = s.Subject
					count = 1
				} else {
					count++
				}
			}
			// Flush the last subject, which has no following subject change.
			if subject != "" {
				fmt.Printf("%s, %d\n", subject, count)
			}
			fmt.Printf("total, %d\n", total)
			return nil
		},
	}

	seriesCmd.Flags().StringVarP(&subjectName, "subject", "s", "", "subject name")

	analyzeCmd.AddCommand(seriesCmd)
	return analyzeCmd
}
77 changes: 77 additions & 0 deletions bydbctl/internal/cmd/analyze_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cmd_test

import (
"path"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/cobra"
"github.com/zenizh/go-capturer"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
)

// This spec writes measure data into a standalone server, stops the server,
// and then runs "analyze series" against the series index directory that the
// server left on disk.
var _ = Describe("Analyze Series", func() {
	var grpcAddr, directory string
	var serverDeferFunc, spaceDeferFunc func()
	var rootCmd *cobra.Command
	var now time.Time
	BeforeEach(func() {
		var err error
		now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local)
		Expect(err).NotTo(HaveOccurred())
		// Keep the cleanup function so the temporary space is removed in AfterEach.
		directory, spaceDeferFunc, err = test.NewSpace()
		Expect(err).NotTo(HaveOccurred())
		var ports []int
		ports, err = test.AllocateFreePorts(4)
		Expect(err).NotTo(HaveOccurred())
		// The HTTP address is not needed: the command reads the index from disk.
		grpcAddr, _, serverDeferFunc = setup.ClosableStandalone(directory, ports)
		rootCmd = &cobra.Command{Use: "root"}
		cmd.RootCmdFlags(rootCmd)
	})

	It("analyzes all series", func() {
		conn, err := grpclib.NewClient(
			grpcAddr,
			grpclib.WithTransportCredentials(insecure.NewCredentials()),
		)
		Expect(err).NotTo(HaveOccurred())
		cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, time.Minute)
		Expect(conn.Close()).To(Succeed())
		// Stop the server before analyzing.
		// NOTE(review): presumably this flushes the data and releases the
		// index directory - confirm against setup.ClosableStandalone.
		serverDeferFunc()
		// Mark the server as stopped so AfterEach does not stop it twice.
		serverDeferFunc = nil

		rootCmd.SetArgs([]string{"analyze", "series", path.Join(directory, "measure/sw_metric/seg-20210901/sidx")})
		out := capturer.CaptureStdout(func() {
			execErr := rootCmd.Execute()
			Expect(execErr).NotTo(HaveOccurred())
		})
		GinkgoWriter.Println(out)
		Expect(out).To(ContainSubstring("total"))
	})

	AfterEach(func() {
		// Stop the server if the spec failed before reaching its explicit stop.
		if serverDeferFunc != nil {
			serverDeferFunc()
			serverDeferFunc = nil
		}
		if spaceDeferFunc != nil {
			spaceDeferFunc()
			spaceDeferFunc = nil
		}
	})
})
3 changes: 2 additions & 1 deletion bydbctl/internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func RootCmdFlags(command *cobra.Command) {
_ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
viper.SetDefault("addr", "http://localhost:17913")

command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), newMeasureCmd(), newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newHealthCheckCmd())
command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), newMeasureCmd(),
newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newHealthCheckCmd(), newAnalyzeCmd())
}

func init() {
Expand Down
65 changes: 65 additions & 0 deletions docs/interacting/bydbctl/analyze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Analyze the data

`bydbctl` provides a powerful subcommand `analyze` to analyze the data stored in BanyanDB. The `analyze` subcommand supports the following operations:

## Analyze the series

`bydbctl analyze series` analyzes the series cardinality and distribution, which is useful for understanding the performance impact of the series.

Flags:

* `-s` or `--subject`: The name of stream or measure in the series index. If it's absent, the command will analyze all series in the series index.

Arguments:

* `path`: The path of the series index. It's mandatory.

### All series distribution in the series index

```shell
bydbctl analyze series /tmp/measure/measure-default/seg-xxxxxx/sidx
```

The command prints the result to the standard output in CSV format, for example:

```csv
browser_app_page_ajax_error_sum_day, 1
browser_app_page_ajax_error_sum_hour, 1
browser_app_page_dns_avg_day_topn, 1644
....
service_sidecar_internal_resp_latency_nanos_hour, 10
service_sla_day, 7
service_sla_hour, 10
service_traffic_minute, 48
tag_autocomplete_minute, 8
zipkin_service_span_traffic_minute, 31
zipkin_service_traffic_minute, 4
total, 1266040
```

The first column is the subject, i.e. the name of the stream or measure, and the second column is the number of series that belong to that subject.

The `total` line shows the total number of series in the series index.

### A specific series detail in the series index

```shell
bydbctl analyze series -s endpoint_sla_day /tmp/measure/measure-default/seg-xxxxxx/sidx
```

The command prints the result to the standard output in CSV format, for example:

```csv
...
"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zNi9yZXZpZXdzLzM3",
"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zNy9yZXZpZXdzLzM4",
"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zOC9yZXZpZXdzLzM5",
"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zOS9yZXZpZXdzLzQw",
"bWVzaC1zdnI6OnJlY29tbWVuZGF0aW9uLnNhbXBsZS1zZXJ2aWNlcw==.1_R0VUOi9yY21k",
"bWVzaC1zdnI6OnNvbmdzLnNhbXBsZS1zZXJ2aWNlcw==.1_R0VUOi9zb25ncy90b3A=",
...
```

Each row represents a group of tags which compose a series. They are defined in the `entity` field on `Stream` or `Measure`.

In this example, `endpoint_sla_day`'s `entity` is `entity_id` tag which comes from OAP's internal `endpoint_id`.
2 changes: 2 additions & 0 deletions docs/menu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ catalog:
path: "/interacting/bydbctl/query/filter-operation"
- name: "CRUD Property"
path: "/interacting/bydbctl/property"
- name: "Analyzing Data"
path: "/interacting/bydbctl/analyze"
- name: "Web UI"
catalog:
- name: "Dashboard"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/blevesearch/segment v0.9.1 // indirect
github.com/blevesearch/snowballstem v0.9.0 // indirect
github.com/blevesearch/vellum v1.0.10 // indirect
github.com/blugelabs/bluge_segment_api v0.2.0 // indirect
github.com/blugelabs/bluge_segment_api v0.2.0
github.com/blugelabs/ice v1.0.0 // indirect
github.com/caio/go-tdigest v3.1.0+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (s Series) String() string {
return fmt.Sprintf("%s:%d", s.EntityValues, s.ID)
}

// SortedField returns the value of the sorted field, which for a Series is
// its EntityValues. The returned slice is the series' own field, not a copy.
func (s Series) SortedField() []byte {
return s.EntityValues
}

// SeriesDocument represents a series document in an index.
type SeriesDocument struct {
Fields map[string][]byte
Expand All @@ -223,6 +228,7 @@ type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
// SeriesIterator returns an iterator over the series held by the store.
SeriesIterator(context.Context) (FieldIterator[Series], error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
64 changes: 64 additions & 0 deletions pkg/index/inverted/inverted_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/search"
segment "github.com/blugelabs/bluge_segment_api"
"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
Expand Down Expand Up @@ -161,3 +163,65 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey
}
return result, nil
}

func (s *store) SeriesIterator(ctx context.Context) (index.FieldIterator[index.Series], error) {
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
defer func() {
_ = reader.Close()
}()
dict, err := reader.DictionaryIterator(entityField, nil, nil, nil)
if err != nil {
return nil, err
}
return &dictIterator{dict: dict, ctx: ctx}, nil
}

type dictIterator struct {
dict segment.DictionaryIterator
ctx context.Context
err error
series index.Series
i int
}

func (d *dictIterator) Next() bool {
if d.err != nil {
return false
}
if d.i%1000 == 0 {
select {
case <-d.ctx.Done():
d.err = d.ctx.Err()
return false
default:
}
}
de, err := d.dict.Next()
if err != nil {
d.err = err
return false
}
if de == nil {
return false
}
d.series = index.Series{
EntityValues: convert.StringToBytes(de.Term()),
}
d.i++
return true
}

func (d *dictIterator) Query() index.Query {
return nil
}

func (d *dictIterator) Val() index.Series {
return d.series
}

func (d *dictIterator) Close() error {
return multierr.Combine(d.err, d.dict.Close())
}
Loading

0 comments on commit d5ec659

Please sign in to comment.