Skip to content

Commit

Permalink
chore(metrics): add filter for org (#162)
Browse files Browse the repository at this point in the history
Because

- support organization dashboard

This commit

- add target `owner_id` filter for query
  • Loading branch information
heiruwu committed Dec 15, 2023
1 parent 898807d commit eb7161b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 26 deletions.
4 changes: 3 additions & 1 deletion pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const MaxPayloadSize = 1024 * 1024 * 32
const (
Start string = "start"
Stop string = "stop"
OwnerName string = "owner_name"
PipelineID string = "pipeline_id"
PipelineUID string = "pipeline_uid"
PipelineReleaseID string = "pipeline_release_id"
Expand All @@ -45,7 +46,8 @@ const (

// Metric data enum
const (
OwnerUID string = "owner_uid"
PipelineOwnerUID string = "owner_uid"
ConnectorOwnerUID string = "connector_owner_uid"
PipelineTriggerMeasurement string = "pipeline.trigger"
ConnectorExecuteMeasurement string = "connector.execute"
PipelineMode string = "pipeline_mode"
Expand Down
3 changes: 3 additions & 0 deletions pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ func (h *PublicHandler) ListPipelineTriggerRecords(ctx context.Context, req *mgm
filtering.DeclareStandardFunctions(),
filtering.DeclareIdent(constant.Start, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.Stop, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.OwnerName, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineUID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineReleaseID, filtering.TypeString),
Expand Down Expand Up @@ -945,6 +946,7 @@ func (h *PublicHandler) ListPipelineTriggerTableRecords(ctx context.Context, req
filtering.DeclareStandardFunctions(),
filtering.DeclareIdent(constant.Start, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.Stop, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.OwnerName, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineUID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineReleaseID, filtering.TypeString),
Expand Down Expand Up @@ -1013,6 +1015,7 @@ func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req
filtering.DeclareStandardFunctions(),
filtering.DeclareIdent(constant.Start, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.Stop, filtering.TypeTimestamp),
filtering.DeclareIdent(constant.OwnerName, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineUID, filtering.TypeString),
filtering.DeclareIdent(constant.PipelineReleaseID, filtering.TypeString),
Expand Down
15 changes: 2 additions & 13 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func NewInfluxDB(queryAPI api.QueryAPI, bucket string) InfluxDB {

func (i *influxDB) constructRecordQuery(
ctx context.Context,
owner string,
pageSize int64,
pageToken string,
filter filtering.Filter,
Expand Down Expand Up @@ -107,15 +106,13 @@ func (i *influxDB) constructRecordQuery(
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "%v")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => %v)
%v
|> group()
|> sort(columns: ["%v"])`,
i.bucket,
start,
stop,
measurement,
owner,
expr,
sortKey,
)
Expand Down Expand Up @@ -151,7 +148,7 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string

logger, _ := logger.GetZapLogger(ctx)

query, total, err := i.constructRecordQuery(ctx, fmt.Sprintf("r[\"owner_uid\"] == \"%v\"", owner), pageSize, pageToken, filter, constant.PipelineTriggerMeasurement, constant.TriggerTime)
query, total, err := i.constructRecordQuery(ctx, pageSize, pageToken, filter, constant.PipelineTriggerMeasurement, constant.TriggerTime)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
}
Expand Down Expand Up @@ -285,7 +282,6 @@ func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner s
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "pipeline.trigger")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
triggerRank =
base
Expand Down Expand Up @@ -333,7 +329,6 @@ func (i *influxDB) QueryPipelineTriggerTableRecords(ctx context.Context, owner s
i.bucket,
start,
stop,
owner,
expr,
mostRecetTimeFilter,
)
Expand Down Expand Up @@ -461,7 +456,6 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "pipeline.trigger")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["owner_uid"] == "%v")
%v
bucketBase =
base
Expand Down Expand Up @@ -498,7 +492,6 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
aggregationWindow,
Expand Down Expand Up @@ -568,7 +561,7 @@ func (i *influxDB) QueryConnectorExecuteRecords(ctx context.Context, owner strin

logger, _ := logger.GetZapLogger(ctx)

query, total, err := i.constructRecordQuery(ctx, fmt.Sprintf("r[\"connector_owner_uid\"] == \"%v\"", owner), pageSize, pageToken, filter, constant.ConnectorExecuteMeasurement, "execute_time")
query, total, err := i.constructRecordQuery(ctx, pageSize, pageToken, filter, constant.ConnectorExecuteMeasurement, "execute_time")
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
}
Expand Down Expand Up @@ -695,7 +688,6 @@ func (i *influxDB) QueryConnectorExecuteTableRecords(ctx context.Context, owner
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "connector.execute")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
executeRank =
base
Expand Down Expand Up @@ -743,7 +735,6 @@ func (i *influxDB) QueryConnectorExecuteTableRecords(ctx context.Context, owner
i.bucket,
start,
stop,
owner,
expr,
mostRecetTimeFilter,
)
Expand Down Expand Up @@ -865,7 +856,6 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
|> range(start: %v, stop: %v)
|> filter(fn: (r) => r["_measurement"] == "connector.execute")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => r["connector_owner_uid"] == "%v")
%v
bucketBase =
base
Expand Down Expand Up @@ -902,7 +892,6 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
i.bucket,
start,
stop,
owner,
expr,
aggregationWindow,
aggregationWindow,
Expand Down
125 changes: 113 additions & 12 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"strings"

"go.einride.tech/aip/filtering"
"google.golang.org/grpc/metadata"

"github.com/gofrs/uuid"
"github.com/instill-ai/mgmt-backend/internal/resource"
"github.com/instill-ai/mgmt-backend/pkg/constant"
"github.com/instill-ai/mgmt-backend/pkg/repository"

Expand All @@ -19,6 +22,80 @@ func InjectOwnerToContext(ctx context.Context, owner *mgmtPB.User) context.Conte
return ctx
}

func (s *service) checkPipelineOwnership(ctx context.Context, filter filtering.Filter, owner *mgmtPB.User) (*string, filtering.Filter, error) {
ownerUID := owner.Uid

if len(filter.CheckedExpr.GetExpr().GetCallExpr().GetArgs()) > 0 {

ownerName, _ := repository.ExtractConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, false)

if strings.HasPrefix(ownerName, "users") {
if ownerName != fmt.Sprintf("users/%s", owner.Id) {
return nil, filter, ErrNoPermission
}
repository.HijackConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, constant.PipelineOwnerUID, *owner.Uid, false)
} else if strings.HasPrefix(ownerName, "organizations") {
id, err := resource.GetRscNameID(ownerName)
if err != nil {
return nil, filter, err
}
org, err := s.GetOrganizationAdmin(ctx, id)
if err != nil {
return nil, filter, err
}
granted, err := s.GetACLClient().CheckPermission("organization", uuid.FromStringOrNil(org.Uid), "user", uuid.FromStringOrNil(owner.GetUid()), "", "member")
if err != nil {
return nil, filter, err
}
if !granted {
return nil, filter, ErrNoPermission
}
repository.HijackConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, constant.PipelineOwnerUID, org.Uid, false)
ownerUID = &org.Uid
} else {
return nil, filter, fmt.Errorf("owner_name namepsace format error")
}
}
return ownerUID, filter, nil
}

func (s *service) checkConnectorOwnership(ctx context.Context, filter filtering.Filter, owner *mgmtPB.User) (*string, filtering.Filter, error) {
ownerUID := owner.Uid

if len(filter.CheckedExpr.GetExpr().GetCallExpr().GetArgs()) > 0 {

ownerName, _ := repository.ExtractConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, false)

if strings.HasPrefix(ownerName, "users") {
if ownerName != fmt.Sprintf("users/%s", owner.Id) {
return nil, filter, ErrNoPermission
}
repository.HijackConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, constant.ConnectorOwnerUID, *owner.Uid, false)
} else if strings.HasPrefix(ownerName, "organizations") {
id, err := resource.GetRscNameID(ownerName)
if err != nil {
return nil, filter, err
}
org, err := s.GetOrganizationAdmin(ctx, id)
if err != nil {
return nil, filter, err
}
granted, err := s.GetACLClient().CheckPermission("organization", uuid.FromStringOrNil(org.Uid), "user", uuid.FromStringOrNil(owner.GetUid()), "", "member")
if err != nil {
return nil, filter, err
}
if !granted {
return nil, filter, ErrNoPermission
}
repository.HijackConstExpr(filter.CheckedExpr.GetExpr(), constant.OwnerName, constant.ConnectorOwnerUID, org.Uid, false)
ownerUID = &org.Uid
} else {
return nil, filter, fmt.Errorf("owner_name namepsace format error")
}
}
return ownerUID, filter, nil
}

func (s *service) pipelineUIDLookup(ctx context.Context, filter filtering.Filter, owner *mgmtPB.User) (filtering.Filter, error) {

ctx = InjectOwnerToContext(ctx, owner)
Expand Down Expand Up @@ -75,13 +152,17 @@ func (s *service) connectorUIDLookup(ctx context.Context, filter filtering.Filte

func (s *service) ListPipelineTriggerRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerRecord, int64, string, error) {

var err error
ownerUID, filter, err := s.checkPipelineOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerRecord{}, 0, "", err
}

filter, err = s.pipelineUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerRecord{}, 0, "", nil
}

pipelineTriggerRecords, ps, pt, err := s.influxDB.QueryPipelineTriggerRecords(ctx, *owner.Uid, pageSize, pageToken, filter)
pipelineTriggerRecords, ps, pt, err := s.influxDB.QueryPipelineTriggerRecords(ctx, *ownerUID, pageSize, pageToken, filter)
if err != nil {
return nil, 0, "", err
}
Expand All @@ -91,13 +172,17 @@ func (s *service) ListPipelineTriggerRecords(ctx context.Context, owner *mgmtPB.

func (s *service) ListPipelineTriggerTableRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerTableRecord, int64, string, error) {

var err error
ownerUID, filter, err := s.checkPipelineOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerTableRecord{}, 0, "", err
}

filter, err = s.pipelineUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerTableRecord{}, 0, "", nil
}

pipelineTriggerTableRecords, ps, pt, err := s.influxDB.QueryPipelineTriggerTableRecords(ctx, *owner.Uid, pageSize, pageToken, filter)
pipelineTriggerTableRecords, ps, pt, err := s.influxDB.QueryPipelineTriggerTableRecords(ctx, *ownerUID, pageSize, pageToken, filter)
if err != nil {
return nil, 0, "", err
}
Expand All @@ -107,13 +192,17 @@ func (s *service) ListPipelineTriggerTableRecords(ctx context.Context, owner *mg

func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerChartRecord, error) {

var err error
ownerUID, filter, err := s.checkPipelineOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerChartRecord{}, err
}

filter, err = s.pipelineUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.PipelineTriggerChartRecord{}, nil
}

pipelineTriggerChartRecords, err := s.influxDB.QueryPipelineTriggerChartRecords(ctx, *owner.Uid, aggregationWindow, filter)
pipelineTriggerChartRecords, err := s.influxDB.QueryPipelineTriggerChartRecords(ctx, *ownerUID, aggregationWindow, filter)
if err != nil {
return nil, err
}
Expand All @@ -123,7 +212,11 @@ func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mg

func (s *service) ListConnectorExecuteRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteRecord, int64, string, error) {

var err error
ownerUID, filter, err := s.checkConnectorOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteRecord{}, 0, "", err
}

filter, err = s.pipelineUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteRecord{}, 0, "", nil
Expand All @@ -134,7 +227,7 @@ func (s *service) ListConnectorExecuteRecords(ctx context.Context, owner *mgmtPB
return []*mgmtPB.ConnectorExecuteRecord{}, 0, "", nil
}

connectorExecuteRecords, ps, pt, err := s.influxDB.QueryConnectorExecuteRecords(ctx, *owner.Uid, pageSize, pageToken, filter)
connectorExecuteRecords, ps, pt, err := s.influxDB.QueryConnectorExecuteRecords(ctx, *ownerUID, pageSize, pageToken, filter)
if err != nil {
return nil, 0, "", err
}
Expand All @@ -144,13 +237,17 @@ func (s *service) ListConnectorExecuteRecords(ctx context.Context, owner *mgmtPB

func (s *service) ListConnectorExecuteTableRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteTableRecord, int64, string, error) {

var err error
ownerUID, filter, err := s.checkConnectorOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteTableRecord{}, 0, "", err
}

filter, err = s.connectorUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteTableRecord{}, 0, "", nil
}

connectorExecuteTableRecords, ps, pt, err := s.influxDB.QueryConnectorExecuteTableRecords(ctx, *owner.Uid, pageSize, pageToken, filter)
connectorExecuteTableRecords, ps, pt, err := s.influxDB.QueryConnectorExecuteTableRecords(ctx, *ownerUID, pageSize, pageToken, filter)
if err != nil {
return nil, 0, "", err
}
Expand All @@ -160,7 +257,11 @@ func (s *service) ListConnectorExecuteTableRecords(ctx context.Context, owner *m

func (s *service) ListConnectorExecuteChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteChartRecord, error) {

var err error
ownerUID, filter, err := s.checkConnectorOwnership(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteChartRecord{}, err
}

filter, err = s.pipelineUIDLookup(ctx, filter, owner)
if err != nil {
return []*mgmtPB.ConnectorExecuteChartRecord{}, nil
Expand All @@ -171,7 +272,7 @@ func (s *service) ListConnectorExecuteChartRecords(ctx context.Context, owner *m
return []*mgmtPB.ConnectorExecuteChartRecord{}, nil
}

connectorExecuteChartRecords, err := s.influxDB.QueryConnectorExecuteChartRecords(ctx, *owner.Uid, aggregationWindow, filter)
connectorExecuteChartRecords, err := s.influxDB.QueryConnectorExecuteChartRecords(ctx, *ownerUID, aggregationWindow, filter)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit eb7161b

Please sign in to comment.