Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix live mode shows the first query result instead of separately requested two different results #230

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## tip

* BUGFIX: fix image links in public readme.
* BUGFIX: fix live mode shows the first query result instead of separately requested two different results. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/229).

## v0.14.2

Expand Down
48 changes: 37 additions & 11 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSetti
}

return &Datasource{
settings: settings,
httpClient: cl,
streamCh: make(chan *data.Frame),
grafanaSettings: grafanaSettings,
settings: settings,
httpClient: cl,
liveModeResponses: sync.Map{},
grafanaSettings: grafanaSettings,
}, nil
}

Expand Down Expand Up @@ -98,16 +98,19 @@ func NewGrafanaSettings(settings backend.DataSourceInstanceSettings) (*GrafanaSe
type Datasource struct {
settings backend.DataSourceInstanceSettings

httpClient *http.Client
streamCh chan *data.Frame
grafanaSettings *GrafanaSettings
httpClient *http.Client
liveModeResponses sync.Map
grafanaSettings *GrafanaSettings
}

// SubscribeStream called when a user tries to subscribe to a plugin/datasource
// managed channel path – thus plugin can check subscribe permissions and communicate
// options with Grafana Core. As soon as first subscriber joins channel RunStream
// will be called.
func (d *Datasource) SubscribeStream(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
func (d *Datasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
ch := make(chan *data.Frame, 1)
d.liveModeResponses.Store(req.Path, ch)

return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, nil
Expand All @@ -127,10 +130,20 @@ func (d *Datasource) PublishStream(_ context.Context, _ *backend.PublishStreamRe
// the call will be terminated until next active subscriber appears. Call termination
// can happen with a delay.
func (d *Datasource) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
// request.Path is created in the frontend. Please check this function in the frontend
// runLiveQueryThroughBackend where the path is created.
// path: `${request.requestId}/${query.refId}`
ch, ok := d.liveModeResponses.Load(request.Path)
if !ok {
return fmt.Errorf("failed to find the channel for the query: %s", request.Path)
}

livestream := ch.(chan *data.Frame)

go func() {
prev := data.FrameJSONCache{}
var err error
for frame := range d.streamCh {
for frame := range livestream {
next, _ := data.FrameToJSONCache(frame)
if next.SameSchema(&prev) {
err = sender.SendBytes(next.Bytes(data.IncludeAll))
Expand Down Expand Up @@ -164,7 +177,14 @@ func (d *Datasource) RunStream(ctx context.Context, request *backend.RunStreamRe
func (d *Datasource) Dispose() {
// Clean up datasource instance resources.
d.httpClient.CloseIdleConnections()
close(d.streamCh)
// close all channels before clear the map
d.liveModeResponses.Range(func(key, value interface{}) bool {
ch := value.(chan *data.Frame)
close(ch)
return true
})
// clear the map
d.liveModeResponses.Clear()
}

// QueryData handles multiple queries and returns multiple responses.
Expand Down Expand Up @@ -211,7 +231,13 @@ func (d *Datasource) streamQuery(ctx context.Context, request *backend.RunStream
return err
}

return parseStreamResponse(r, d.streamCh)
ch, ok := d.liveModeResponses.Load(request.Path)
if !ok {
return fmt.Errorf("failed to find the channel for the query: %s", request.Path)
}

livestream := ch.(chan *data.Frame)
return parseStreamResponse(r, livestream)
}

// getQueryFromRaw parses the query json from the raw message.
Expand Down
17 changes: 10 additions & 7 deletions pkg/plugin/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -516,10 +515,12 @@ func TestDatasourceStreamQueryRequest(t *testing.T) {
datasource := instance.(*Datasource)
packetSender := &mockStreamSender{packets: []json.RawMessage{}}
sender := backend.NewStreamSender(packetSender)
datasource.streamCh = make(chan *data.Frame)
ch := make(chan *data.Frame)
datasource.liveModeResponses.Store("request_id/ref_id", ch)
expErr := func(ctx context.Context, e string) {
_ = packetSender.Reset()
err := datasource.RunStream(ctx, &backend.RunStreamRequest{
Path: "request_id/ref_id",
Data: json.RawMessage(`
{
"datasource": {
Expand Down Expand Up @@ -705,10 +706,12 @@ func TestDatasourceStreamRequestWithRetry(t *testing.T) {
datasource := instance.(*Datasource)
packetSender := &mockStreamSender{packets: []json.RawMessage{}}
sender := backend.NewStreamSender(packetSender)
datasource.streamCh = make(chan *data.Frame)
ch := make(chan *data.Frame)
datasource.liveModeResponses.Store("request_id/ref_id", ch)

expErr := func(e string) {
err := datasource.RunStream(ctx, &backend.RunStreamRequest{
Path: "request_id/ref_id",
Data: json.RawMessage(`
{
"datasource": {
Expand All @@ -734,6 +737,7 @@ func TestDatasourceStreamRequestWithRetry(t *testing.T) {
expValue := func() {
_ = packetSender.Reset()
err := datasource.RunStream(ctx, &backend.RunStreamRequest{
Path: "request_id/ref_id",
Data: json.RawMessage(`
{
"datasource": {
Expand Down Expand Up @@ -798,9 +802,6 @@ func TestDatasourceStreamRequestWithRetry(t *testing.T) {
if err != nil {
t.Fatalf("error marshal expected frames %s", err)
}
log.Printf("got: %s", got)
log.Printf("exp: %s", exp)
log.Printf("FIRST ===========")
if !bytes.Equal(got, exp) {
t.Fatalf("unexpected metric %s want %s", got, exp)
}
Expand Down Expand Up @@ -895,8 +896,10 @@ func TestDatasourceStreamTailProcess(t *testing.T) {
datasource := instance.(*Datasource)
packetSender := &mockStreamSender{packets: []json.RawMessage{}}
sender := backend.NewStreamSender(packetSender)
datasource.streamCh = make(chan *data.Frame)
ch := make(chan *data.Frame)
datasource.liveModeResponses.Store("request_id/ref_id", ch)
if err := datasource.RunStream(ctx, &backend.RunStreamRequest{
Path: "request_id/ref_id",
Data: json.RawMessage(`
{
"datasource": {
Expand Down
4 changes: 2 additions & 2 deletions src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,15 @@ export class VictoriaLogsDatasource
addr: {
scope: LiveChannelScope.DataSource,
namespace: this.uid,
path: `vl/${query.refId}`, // this will allow each new query to create a new connection
path: `${request.requestId}/${query.refId}`, // this will allow each new query to create a new connection
data: {
...query,
},
},
}).pipe(map((response) => {
return {
data: response.data || [],
key: `victorialogs-datasource-${query.refId}`,
key: `victorialogs-datasource-${request.requestId}-${query.refId}`,
state: LoadingState.Streaming,
};
}));
Expand Down
Loading