Skip to content

Commit

Permalink
Merge branch 'main' into dev_measure_aggregate_function
Browse files Browse the repository at this point in the history
  • Loading branch information
StLeoX authored Oct 10, 2024
2 parents d38a113 + bcaf9a8 commit 312f4d1
Show file tree
Hide file tree
Showing 81 changed files with 5,072 additions and 566 deletions.
11 changes: 11 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Release Notes.

## 0.8.0

### Chores

- Fix metrics system typo.

## 0.7.0

### File System Changes
Expand All @@ -27,6 +33,8 @@ Release Notes.
- Add HTTP health check endpoint for the data node.
- Add slow query log for the distributed query and local query.
- Support applying the index rule to the tag belonging to the entity.
- Add search analyzer "url" which breaks test into tokens at any non-letter and non-digit character.
- Introduce "match_option" to the "match" query.

### Bugs

Expand All @@ -44,6 +52,9 @@ Release Notes.
- Fix panic when removing a expired segment.
- Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase.
- Fix the bug that the long running query doesn't stop when the context is canceled.
- Fix the bug that merge block with different tags or fields.
- Fix the bug that the pending measure block is not released when a full block is merged.

### Documentation

Expand Down
21 changes: 9 additions & 12 deletions api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,16 @@ message IndexRule {
Type type = 3 [(validate.rules).enum.defined_only = true];
// updated_at indicates when the IndexRule is updated
google.protobuf.Timestamp updated_at = 4;
enum Analyzer {
ANALYZER_UNSPECIFIED = 0;
// Keyword analyzer is a “noop” analyzer which returns the entire input string as a single token.
ANALYZER_KEYWORD = 1;
// Standard analyzer provides grammar based tokenization
ANALYZER_STANDARD = 2;
// Simple analyzer breaks text into tokens at any non-letter character,
// such as numbers, spaces, hyphens and apostrophes, discards non-letter characters,
// and changes uppercase to lowercase.
ANALYZER_SIMPLE = 3;
}

// analyzer analyzes tag value to support the full-text searching for TYPE_INVERTED indices.
Analyzer analyzer = 5;
// available analyzers are:
// - "standard" provides grammar based tokenization
// - "simple" breaks text into tokens at any non-letter character,
// such as numbers, spaces, hyphens and apostrophes, discards non-letter characters,
// and changes uppercase to lowercase.
// - "keyword" is a “noop” analyzer which returns the entire input string as a single token.
// - "url" breaks test into tokens at any non-letter and non-digit character.
string analyzer = 5;
// no_sort indicates whether the index is not for sorting.
bool no_sort = 6;
}
Expand Down
10 changes: 10 additions & 0 deletions api/proto/banyandb/model/v1/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ message Condition {
string name = 1;
BinaryOp op = 2;
TagValue value = 3;
message MatchOption {
string analyzer = 1;
enum Operator {
OPERATOR_UNSPECIFIED = 0;
OPERATOR_AND = 1;
OPERATOR_OR = 2;
}
Operator operator = 2;
}
MatchOption match_option = 4;
}

// tag_families are indexed.
Expand Down
3 changes: 3 additions & 0 deletions banyand/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand"

FROM build-${TARGETOS} AS final

ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
ENV GRPC_GO_LOG_FORMATTER=json

EXPOSE 17912
EXPOSE 17913
EXPOSE 6060
Expand Down
3 changes: 1 addition & 2 deletions banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type measureQueryProcessor struct {
*queryService
}

func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
n := time.Now()
now := n.UnixNano()
Expand Down Expand Up @@ -82,7 +82,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
Expand Down
3 changes: 1 addition & 2 deletions banyand/dquery/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type streamQueryProcessor struct {
*queryService
}

func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
n := time.Now()
now := n.UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
Expand Down Expand Up @@ -78,7 +78,6 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
Expand Down
5 changes: 3 additions & 2 deletions banyand/dquery/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type topNQueryProcessor struct {
*queryService
}

func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
request, ok := message.Data().(*measurev1.TopNRequest)
if !ok {
t.log.Warn().Msg("invalid event data type")
Expand All @@ -64,7 +64,8 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
}
if request.Trace {
tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano))
var tracer *pkgquery.Tracer
tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request", convert.BytesToString(logger.Proto(request)))
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
if err != nil {
return nil, nil, err
}
iter, err := s.store.Iterator(fieldKey, rangeOpts,
iter, err := s.store.Iterator(ctx, fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
Expand Down
14 changes: 6 additions & 8 deletions banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
if status != modelv1.Status_STATUS_SUCCEED {
ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "measure", "write")
}
ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write")
ms.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "measure", "write")
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send measure write response")
ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write")
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
continue
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicMeasureWrite, message)
_, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, message)
if errWritePub != nil {
ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message")
reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
Expand All @@ -161,7 +161,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er

var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)}

func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
for _, g := range req.Groups {
ms.metrics.totalStarted.Inc(1, g, "measure", "query")
}
Expand All @@ -180,7 +180,6 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
}
now := time.Now()
if req.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "measure-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
Expand All @@ -194,7 +193,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
span.Stop()
}()
}
feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
if err != nil {
return nil, err
}
Expand All @@ -215,13 +214,12 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
return nil, nil
}

func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err)
}
now := time.Now()
if topNRequest.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "topn-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(topNRequest)))
Expand All @@ -236,7 +234,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
}()
}
message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
feat, errQuery := ms.broadcaster.Publish(ctx, data.TopicTopNQuery, message)
if errQuery != nil {
return nil, errQuery
}
Expand Down
7 changes: 3 additions & 4 deletions banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
continue
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(data.TopicStreamWrite, message)
_, errWritePub := publisher.Publish(ctx, data.TopicStreamWrite, message)
if errWritePub != nil {
s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
Expand All @@ -155,7 +155,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {

var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: make([]*streamv1.Element, 0)}

func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
for _, g := range req.Groups {
s.metrics.totalStarted.Inc(1, g, "stream", "query")
}
Expand All @@ -178,7 +178,6 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re
}
now := time.Now()
if req.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "stream-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
Expand All @@ -193,7 +192,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (re
}()
}
message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message)
if errQuery != nil {
if errors.Is(errQuery, io.EOF) {
return emptyStreamQueryResponse, nil
Expand Down
Loading

0 comments on commit 312f4d1

Please sign in to comment.