From d5ec659068d26fdee76667c222501607fde81e65 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 23 Oct 2024 14:46:54 +0800 Subject: [PATCH] Add the `bydbctl analyze series` command to analyze the series data. (#553) * Add the `bydbctl analyze series` command to analyze the series data. Signed-off-by: Gao Hongtao --------- Signed-off-by: Gao Hongtao --- CHANGES.md | 4 + bydbctl/internal/cmd/analyze.go | 118 ++++++++++++++++++++++++++ bydbctl/internal/cmd/analyze_test.go | 77 +++++++++++++++++ bydbctl/internal/cmd/root.go | 3 +- docs/interacting/bydbctl/analyze.md | 65 ++++++++++++++ docs/menu.yml | 2 + go.mod | 2 +- pkg/index/index.go | 6 ++ pkg/index/inverted/inverted_series.go | 64 ++++++++++++++ pkg/pb/v1/value.go | 15 ++++ 10 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 bydbctl/internal/cmd/analyze.go create mode 100644 bydbctl/internal/cmd/analyze_test.go create mode 100644 docs/interacting/bydbctl/analyze.md diff --git a/CHANGES.md b/CHANGES.md index dcfd8e6b9..e52ab56f2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,10 @@ Release Notes. ## 0.8.0 +### Features + +- Add the `bydbctl analyze series` command to analyze the series data. + ### Bug Fixes - Fix the bug that TopN processing item leak. The item can not be updated but as a new item. diff --git a/bydbctl/internal/cmd/analyze.go b/bydbctl/internal/cmd/analyze.go new file mode 100644 index 000000000..04f85207d --- /dev/null +++ b/bydbctl/internal/cmd/analyze.go @@ -0,0 +1,118 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/spf13/cobra" + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +func newAnalyzeCmd() *cobra.Command { + analyzeCmd := &cobra.Command{ + Use: "analyze", + Version: version.Build(), + Short: "Analyze operation", + } + + var subjectName string + seriesCmd := &cobra.Command{ + Use: "series", + Version: version.Build(), + Short: "Analyze series cardinality and distribution", + RunE: func(_ *cobra.Command, args []string) (err error) { + if len(args) == 0 { + return errors.New("series index directory is required, its name should be 'sidx' in a segment 'seg-xxxxxx'") + } + store, err := inverted.NewStore(inverted.StoreOpts{ + Path: args[0], + Logger: logger.GetLogger("series-analyzer"), + }) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + iter, err := store.SeriesIterator(ctx) + if err != nil { + return err + } + defer func() { + err = multierr.Append(err, iter.Close()) + }() + if subjectName != "" { + var found bool + for iter.Next() { + var s pbv1.Series + if err = s.Unmarshal(iter.Val().EntityValues); err != nil { + return err + } + if s.Subject == subjectName { + found = true + for i := range s.EntityValues { + fmt.Printf("%s,", pbv1.MustTagValueToStr(s.EntityValues[i])) + } + 
fmt.Println() + continue + } + if found { + break + } + } + return + } + var subject string + var count, total int + for iter.Next() { + total++ + var s pbv1.Series + if err = s.Unmarshal(iter.Val().EntityValues); err != nil { + return err + } + if s.Subject != subject { + if subject != "" { + fmt.Printf("%s, %d\n", subject, count) + } + subject = s.Subject + count = 1 + } else { + count++ + } + } + if subject != "" { + fmt.Printf("%s, %d\n", subject, count) + } + fmt.Printf("total, %d\n", total) + return nil + }, + } + + seriesCmd.Flags().StringVarP(&subjectName, "subject", "s", "", "subject name") + + analyzeCmd.AddCommand(seriesCmd) + return analyzeCmd +} diff --git a/bydbctl/internal/cmd/analyze_test.go b/bydbctl/internal/cmd/analyze_test.go new file mode 100644 index 000000000..ae68cdab7 --- /dev/null +++ b/bydbctl/internal/cmd/analyze_test.go @@ -0,0 +1,77 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd_test + +import ( + "path" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/spf13/cobra" + "github.com/zenizh/go-capturer" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data" +) + +var _ = Describe("Measure Data Query", func() { + var addr, grpcAddr, directory string + var serverDeferFunc func() + var rootCmd *cobra.Command + var now time.Time + BeforeEach(func() { + var err error + now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local) + Expect(err).NotTo(HaveOccurred()) + directory, _, err = test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + var ports []int + ports, err = test.AllocateFreePorts(4) + Expect(err).NotTo(HaveOccurred()) + grpcAddr, addr, serverDeferFunc = setup.ClosableStandalone(directory, ports) + addr = httpSchema + addr + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + }) + + It("analyzes all series", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, time.Minute) + serverDeferFunc() + + rootCmd.SetArgs([]string{"analyze", "series", path.Join(directory, "measure/sw_metric/seg-20210901/sidx")}) + out := capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + GinkgoWriter.Println(out) + Expect(out).To(ContainSubstring("total")) + }) + + AfterEach(func() { + }) +}) diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go index 4efff65c6..de4b2ddd1 100644 --- a/bydbctl/internal/cmd/root.go +++ b/bydbctl/internal/cmd/root.go @@ -69,7 +69,8 @@ func RootCmdFlags(command 
*cobra.Command) { _ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr")) viper.SetDefault("addr", "http://localhost:17913") - command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), newMeasureCmd(), newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newHealthCheckCmd()) + command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), newMeasureCmd(), + newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newHealthCheckCmd(), newAnalyzeCmd()) } func init() { diff --git a/docs/interacting/bydbctl/analyze.md b/docs/interacting/bydbctl/analyze.md new file mode 100644 index 000000000..9a92f002b --- /dev/null +++ b/docs/interacting/bydbctl/analyze.md @@ -0,0 +1,65 @@ +# Analyze the data + +`bydbctl` provides a powerful subcommand `analyze` to analyze the data stored in BanyanDB. The `analyze` subcommand supports the following operations: + +## Analyze the series + +`bydbctl analyze series` analyzes the series cardinality and distribution, which is useful for understanding the performance impact of the series. + +Flags: + +* `-s` or `--subject`: The name of the stream or measure in the series index. If it's absent, the command analyzes all series in the series index. + +Arguments: + +* `path`: The path of the series index directory, which is named `sidx` inside a segment directory `seg-xxxxxx`. It's mandatory. + +### All series distribution in the series index + +```shell +bydbctl analyze series /tmp/measure/measure-default/seg-xxxxxx/sidx +``` + +The command prints CSV-formatted rows to the standard output, for example: + +```csv +browser_app_page_ajax_error_sum_day, 1 +browser_app_page_ajax_error_sum_hour, 1 +browser_app_page_dns_avg_day_topn, 1644 +.... +service_sidecar_internal_resp_latency_nanos_hour, 10 +service_sla_day, 7 +service_sla_hour, 10 +service_traffic_minute, 48 +tag_autocomplete_minute, 8 +zipkin_service_span_traffic_minute, 31 +zipkin_service_traffic_minute, 4 +total, 1266040 +``` + +The first column is the subject (the stream or measure name), and the second column is the number of series that belong to that subject. 
+ +The `total` line shows the total number of series in the series index. + +### A specific series detail in the series index + +```shell +bydbctl analyze series -s endpoint_sla_day /tmp/measure/measure-default/seg-xxxxxx/sidx +``` + +The command prints CSV-formatted rows to the standard output, for example: + +```csv +... +"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zNi9yZXZpZXdzLzM3", +"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zNy9yZXZpZXdzLzM4", +"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zOC9yZXZpZXdzLzM5", +"bWVzaC1zdnI6OnJhdGluZy5zYW1wbGUtc2VydmljZXM=.1_R0VUOi9zb25ncy8zOS9yZXZpZXdzLzQw", +"bWVzaC1zdnI6OnJlY29tbWVuZGF0aW9uLnNhbXBsZS1zZXJ2aWNlcw==.1_R0VUOi9yY21k", +"bWVzaC1zdnI6OnNvbmdzLnNhbXBsZS1zZXJ2aWNlcw==.1_R0VUOi9zb25ncy90b3A=", +... +``` + +Each row lists the entity tag values that compose one series. These tags are defined in the `entity` field of the `Stream` or `Measure`. + +In this example, the `entity` of `endpoint_sla_day` is the `entity_id` tag, which comes from OAP's internal `endpoint_id`. 
diff --git a/docs/menu.yml b/docs/menu.yml index 19ec17330..992edb812 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -65,6 +65,8 @@ catalog: path: "/interacting/bydbctl/query/filter-operation" - name: "CRUD Property" path: "/interacting/bydbctl/property" + - name: "Analyzing Data" + path: "/interacting/bydbctl/analyze" - name: "Web UI" catalog: - name: "Dashboard" diff --git a/go.mod b/go.mod index dde09b8f6..c5c5a97fb 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,7 @@ require ( github.com/blevesearch/segment v0.9.1 // indirect github.com/blevesearch/snowballstem v0.9.0 // indirect github.com/blevesearch/vellum v1.0.10 // indirect - github.com/blugelabs/bluge_segment_api v0.2.0 // indirect + github.com/blugelabs/bluge_segment_api v0.2.0 github.com/blugelabs/ice v1.0.0 // indirect github.com/caio/go-tdigest v3.1.0+incompatible // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect diff --git a/pkg/index/index.go b/pkg/index/index.go index 644909916..1f1565fad 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -212,6 +212,11 @@ func (s Series) String() string { return fmt.Sprintf("%s:%d", s.EntityValues, s.ID) } +// SortedField returns the value of the sorted field. +func (s Series) SortedField() []byte { + return s.EntityValues +} + // SeriesDocument represents a series document in an index. type SeriesDocument struct { Fields map[string][]byte @@ -223,6 +228,7 @@ type SeriesStore interface { Store // Search returns a list of series that match the given matchers. Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error) + SeriesIterator(context.Context) (FieldIterator[Series], error) } // SeriesMatcherType represents the type of series matcher. 
diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 7288dd315..25143e592 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -24,7 +24,9 @@ import ( "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/search" + segment "github.com/blugelabs/bluge_segment_api" "github.com/pkg/errors" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -161,3 +163,65 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey } return result, nil } + +func (s *store) SeriesIterator(ctx context.Context) (index.FieldIterator[index.Series], error) { + reader, err := s.writer.Reader() + if err != nil { + return nil, err + } + defer func() { + _ = reader.Close() + }() + dict, err := reader.DictionaryIterator(entityField, nil, nil, nil) + if err != nil { + return nil, err + } + return &dictIterator{dict: dict, ctx: ctx}, nil +} + +type dictIterator struct { + dict segment.DictionaryIterator + ctx context.Context + err error + series index.Series + i int +} + +func (d *dictIterator) Next() bool { + if d.err != nil { + return false + } + if d.i%1000 == 0 { + select { + case <-d.ctx.Done(): + d.err = d.ctx.Err() + return false + default: + } + } + de, err := d.dict.Next() + if err != nil { + d.err = err + return false + } + if de == nil { + return false + } + d.series = index.Series{ + EntityValues: convert.StringToBytes(de.Term()), + } + d.i++ + return true +} + +func (d *dictIterator) Query() index.Query { + return nil +} + +func (d *dictIterator) Val() index.Series { + return d.series +} + +func (d *dictIterator) Close() error { + return multierr.Combine(d.err, d.dict.Close()) +} diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go index ec6e757ee..1e73e6c78 100644 --- a/pkg/pb/v1/value.go +++ b/pkg/pb/v1/value.go @@ -20,6 +20,7 @@ package v1 import ( "bytes" "fmt" + "strconv" 
"github.com/pkg/errors" @@ -82,6 +83,20 @@ func MustTagValueSpecToValueType(tag databasev1.TagType) ValueType { } } +// MustTagValueToStr converts modelv1.TagValue to string. +func MustTagValueToStr(tag *modelv1.TagValue) string { + switch tag.Value.(type) { + case *modelv1.TagValue_Str: + return `"` + tag.GetStr().Value + `"` + case *modelv1.TagValue_Int: + return strconv.FormatInt(tag.GetInt().Value, 10) + case *modelv1.TagValue_BinaryData: + return fmt.Sprintf("%x", tag.GetBinaryData()) + default: + panic("unknown tag value type") + } +} + func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { dest = append(dest, byte(MustTagValueToValueType(tv))) switch tv.Value.(type) {