Skip to content

Commit

Permalink
fix(metric): fix pagination with multiple key sorting (#106)
Browse files Browse the repository at this point in the history
Because

- have duplicate records when sorting with multiple key with pagination

This commit

- use `trigger_time`/`execute_time` as sorting key

Resolves INS-1388
  • Loading branch information
heiruwu authored Jul 24, 2023
1 parent 32fcef1 commit 020fc9e
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> group()
|> sort(columns: ["trigger_time"])
|> limit(n: %v)`,
i.bucket,
start,
Expand Down Expand Up @@ -177,8 +178,18 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string
if v, match := result.Record().ValueByKey("compute_time_duration").(float64); match {
record.ComputeTimeDuration = float32(v)
}
// TODO: temporary solution for legacy data format, currently there is no way to update the tags in influxdb
if v, match := result.Record().ValueByKey("status").(string); match {
record.Status = mgmtPB.Status(mgmtPB.Status_value[v])
if v == "completed" {
record.Status = mgmtPB.Status_STATUS_COMPLETED
} else if v == "errored" {
record.Status = mgmtPB.Status_STATUS_ERRORED
} else {
record.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}
}
if v, match := result.Record().ValueByKey("pipeline_mode").(string); match {
record.TriggerMode = mgmtPB.Mode(mgmtPB.Mode_value[v])
}

records = append(records, record)
Expand Down Expand Up @@ -246,6 +257,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> group(columns: ["pipeline_id", "pipeline_uid", "trigger_mode", "status"])
|> sort(columns: ["trigger_time"])
|> aggregateWindow(every: duration(v: %v), column: "trigger_time", fn: count, createEmpty: false)
t2 = from(bucket: "%v")
|> range(start: %v, stop: %v)
Expand All @@ -254,6 +266,7 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> group(columns: ["pipeline_id", "pipeline_uid", "trigger_mode", "status"])
|> sort(columns: ["trigger_time"])
|> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false)
join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "pipeline_id", "pipeline_uid", "trigger_mode", "status"])`,
i.bucket,
Expand Down Expand Up @@ -384,6 +397,7 @@ func (i *influxDB) QueryConnectorExecuteRecords(ctx context.Context, owner strin
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
|> group()
|> sort(columns: ["execute_time"])
|> limit(n: %v)`,
i.bucket,
start,
Expand Down Expand Up @@ -533,6 +547,7 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
|> group(columns: ["connector_id", "connector_uid", "status"])
|> sort(columns: ["execute_time"])
|> aggregateWindow(every: duration(v: %v), column: "execute_time", fn: count, createEmpty: false)
t2 = from(bucket: "%v")
|> range(start: %v, stop: %v)
Expand All @@ -541,6 +556,7 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
|> group(columns: ["connector_id", "connector_uid", "status"])
|> sort(columns: ["execute_time"])
|> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false)
join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "connector_id", "connector_uid", "status"])`,
i.bucket,
Expand Down

0 comments on commit 020fc9e

Please sign in to comment.