Skip to content

Commit

Permalink
chore(metric): add jwt injection and update chart queries (#117)
Browse files Browse the repository at this point in the history
Because

- support jwt-sub verification for api calls
- support always return current pipeline name for metric charts

This commit

- add header injection for jwt-sub
- update /charts endpoints

Resolves INS-1773
  • Loading branch information
heiruwu committed Sep 7, 2023
1 parent c981342 commit a55ea85
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 62 deletions.
7 changes: 7 additions & 0 deletions pkg/middleware/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"github.com/instill-ai/mgmt-backend/pkg/constant"
"github.com/instill-ai/mgmt-backend/pkg/logger"

mgmtPB "github.com/instill-ai/protogen-go/base/mgmt/v1alpha"
)

// GetRequestSingleHeader get a request header, the header has to be single-value HTTP header
Expand Down Expand Up @@ -158,3 +160,8 @@ func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.
}
}
}

func InjectOwnerToContext(ctx context.Context, owner *mgmtPB.User) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, "Jwt-Sub", owner.GetUid())
return ctx
}
145 changes: 83 additions & 62 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner s
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> drop(columns: ["owner_uid", "trigger_mode", "compute_time_duration", "pipeline_trigger_id", "status"])
|> group(columns: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid"])
|> group(columns: ["pipeline_id", "pipeline_uid"])
|> map(fn: (r) => ({r with trigger_time: time(v: r.trigger_time)}))
|> max(column: "trigger_time")
|> rename(columns: {trigger_time: "most_recent_trigger_time"})
Expand All @@ -298,13 +298,13 @@ func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner s
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> drop(columns: ["owner_uid", "trigger_mode", "compute_time_duration", "pipeline_trigger_id"])
|> group(columns: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid", "status"])
|> group(columns: ["pipeline_id", "pipeline_uid", "status"])
|> count(column: "trigger_time")
|> rename(columns: {trigger_time: "trigger_count"})
|> group(columns: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid"])
join(tables: {t1: t1, t2: t2}, on: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid"])
|> group(columns: ["pipeline_id", "pipeline_uid"])
join(tables: {t1: t1, t2: t2}, on: ["pipeline_id", "pipeline_uid"])
|> group()
|> pivot(rowKey: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid", "most_recent_trigger_time"], columnKey: ["status"], valueColumn: "trigger_count")
|> pivot(rowKey: ["pipeline_id", "pipeline_uid", "most_recent_trigger_time"], columnKey: ["status"], valueColumn: "trigger_count")
|> sort(columns: ["most_recent_trigger_time"], desc: true)
|> filter(fn: (r) => r["most_recent_trigger_time"] < time(v: %v))`,
i.bucket,
Expand Down Expand Up @@ -435,36 +435,51 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
}

query := fmt.Sprintf(
`t1 = from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "pipeline.trigger")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> group(columns: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid", "trigger_mode", "status"])
|> sort(columns: ["trigger_time"])
|> aggregateWindow(every: duration(v: %v), column: "trigger_time", fn: count, createEmpty: false)
t2 = from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "pipeline.trigger")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
|> group(columns: ["pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid", "trigger_mode", "status"])
|> sort(columns: ["trigger_time"])
|> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false)
join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "pipeline_id", "pipeline_uid", "pipeline_release_id", "pipeline_release_uid", "trigger_mode", "status"])`,
`base =
from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "pipeline.trigger")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
bucketBase =
base
|> group(columns: ["pipeline_uid"])
|> sort(columns: ["trigger_time"])
bucketTrigger =
bucketBase
|> aggregateWindow(
every: duration(v: %v),
column: "trigger_time",
fn: count,
createEmpty: false,
)
bucketDuration =
bucketBase
|> aggregateWindow(
every: duration(v: %v),
fn: sum,
column: "compute_time_duration",
createEmpty: false,
)
bucket =
join(
tables: {t1: bucketTrigger, t2: bucketDuration},
on: ["_start", "_stop", "_time", "pipeline_uid"],
)
nameMap =
base
|> keep(columns: ["trigger_time", "pipeline_id", "pipeline_uid"])
|> group(columns: ["pipeline_uid"])
|> top(columns: ["trigger_time"], n: 1)
|> drop(columns: ["trigger_time"])
join(tables: {t1: bucket, t2: nameMap}, on: ["pipeline_uid"])`,
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
)

Expand Down Expand Up @@ -499,12 +514,6 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
if v, match := result.Record().ValueByKey(constant.PipelineReleaseUID).(string); match {
chartRecord.PipelineReleaseUid = v
}
if v, match := result.Record().ValueByKey(constant.TriggerMode).(string); match {
chartRecord.TriggerMode = mgmtPB.Mode(mgmtPB.Mode_value[v])
}
if v, match := result.Record().ValueByKey(constant.Status).(string); match {
chartRecord.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
chartRecord.TriggerCounts = []int64{}
chartRecord.ComputeTimeDuration = []float32{}
Expand Down Expand Up @@ -809,36 +818,51 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
}

query := fmt.Sprintf(
`t1 = from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "connector.execute")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
|> group(columns: ["connector_id", "connector_uid", "status"])
|> sort(columns: ["execute_time"])
|> aggregateWindow(every: duration(v: %v), column: "execute_time", fn: count, createEmpty: false)
t2 = from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "connector.execute")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
|> group(columns: ["connector_id", "connector_uid", "status"])
|> sort(columns: ["execute_time"])
|> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false)
join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "connector_id", "connector_uid", "status"])`,
`base =
from(bucket: "%v")
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "connector.execute")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
bucketBase =
base
|> group(columns: ["connector_uid"])
|> sort(columns: ["execute_time"])
bucketTrigger =
bucketBase
|> aggregateWindow(
every: duration(v: %v),
column: "execute_time",
fn: count,
createEmpty: false,
)
bucketDuration =
bucketBase
|> aggregateWindow(
every: duration(v: %v),
fn: sum,
column: "compute_time_duration",
createEmpty: false,
)
bucket =
join(
tables: {t1: bucketTrigger, t2: bucketDuration},
on: ["_start", "_stop", "_time", "connector_uid"],
)
nameMap =
base
|> keep(columns: ["execute_time", "connector_id", "connector_uid"])
|> group(columns: ["connector_uid"])
|> top(columns: ["execute_time"], n: 1)
|> drop(columns: ["execute_time"])
join(tables: {t1: bucket, t2: nameMap}, on: ["connector_uid"])`,
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
)

Expand Down Expand Up @@ -867,9 +891,6 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
if v, match := result.Record().ValueByKey(constant.ConnectorUID).(string); match {
chartRecord.ConnectorUid = v
}
if v, match := result.Record().ValueByKey(constant.Status).(string); match {
chartRecord.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
chartRecord.ExecuteCounts = []int64{}
chartRecord.ComputeTimeDuration = []float32{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.einride.tech/aip/filtering"

"github.com/instill-ai/mgmt-backend/pkg/constant"
"github.com/instill-ai/mgmt-backend/pkg/middleware"
"github.com/instill-ai/mgmt-backend/pkg/repository"

mgmtPB "github.com/instill-ai/protogen-go/base/mgmt/v1alpha"
Expand All @@ -16,6 +17,8 @@ import (

func (s *service) pipelineUIDLookup(ctx context.Context, filter filtering.Filter, owner *mgmtPB.User) filtering.Filter {

ctx = middleware.InjectOwnerToContext(ctx, owner)

// lookup pipeline uid
if len(filter.CheckedExpr.GetExpr().GetCallExpr().GetArgs()) > 0 {
pipelineID, _ := repository.ExtractConstExpr(filter.CheckedExpr.GetExpr(), constant.PipelineID, false)
Expand Down Expand Up @@ -43,6 +46,8 @@ func (s *service) pipelineUIDLookup(ctx context.Context, filter filtering.Filter

func (s *service) connectorUIDLookup(ctx context.Context, filter filtering.Filter, owner *mgmtPB.User) filtering.Filter {

ctx = middleware.InjectOwnerToContext(ctx, owner)

// lookup connector uid
if len(filter.CheckedExpr.GetExpr().GetCallExpr().GetArgs()) > 0 {
connectorID, _ := repository.ExtractConstExpr(filter.CheckedExpr.GetExpr(), constant.ConnectorID, false)
Expand Down

0 comments on commit a55ea85

Please sign in to comment.