From 2a578010a1b73c3560e8dc57490bef90720fabd2 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 18 Jul 2023 02:23:20 +0800 Subject: [PATCH 1/3] chore(metric): add chart support --- cmd/main/main.go | 2 +- go.mod | 21 +- go.sum | 56 ++++- pkg/datamodel/datamodel.go | 5 - pkg/external/external.go | 25 ++- pkg/handler/publichandler.go | 183 +++++++++++++++- pkg/repository/influx.go | 389 ++++++++++++++++++++++++++++++++++- pkg/service/metric.go | 30 +++ pkg/service/service.go | 7 +- 9 files changed, 691 insertions(+), 27 deletions(-) diff --git a/cmd/main/main.go b/cmd/main/main.go index f74c87d..7293d7b 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -137,7 +137,7 @@ func main() { grpcServerOpts = append(grpcServerOpts, grpc.Creds(creds)) } - influxDBClient, influxDBQueryAPI := external.InitInfluxDBServiceClient(ctx, &config.Config) + influxDBClient, influxDBQueryAPI := external.InitInfluxDBServiceClientV2(ctx, &config.Config) defer influxDBClient.Close() influxDB := repository.NewInfluxDB(influxDBQueryAPI, config.Config.InfluxDB.Bucket) diff --git a/go.mod b/go.mod index 1be774d..8ccce5d 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,14 @@ go 1.19 retract v0.3.2 // Published accidentally. require ( + github.com/InfluxCommunity/influxdb3-go v0.1.0 github.com/gofrs/uuid v4.3.1+incompatible github.com/golang-migrate/migrate/v4 v4.15.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 github.com/iancoleman/strcase v0.2.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230704155506-ba92605f8d15 + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230717175816-09fb1ab0a259 github.com/instill-ai/usage-client v0.2.4-alpha github.com/instill-ai/x v0.3.0-alpha github.com/jackc/pgx/v5 v5.2.0 @@ -39,6 +40,9 @@ require ( require ( cloud.google.com/go/longrunning v0.5.1 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/apache/arrow/go/v12 v12.0.0 // indirect + github.com/apache/thrift v0.16.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/catalinc/hashcash v0.0.0-20220723060415-5e3ec3e24f67 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -47,11 +51,15 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/goccy/go-json v0.9.11 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/influxdata/line-protocol/v2 v2.2.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.13.0 // indirect github.com/jackc/pgio v1.0.0 // indirect @@ -62,14 +70,20 @@ require ( github.com/jackc/pgx/v4 v4.17.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/lib/pq v1.10.7 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect @@ -78,8 +92,11 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.9.0 // indirect + golang.org/x/mod v0.8.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.6.0 // indirect + golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fb64359..71408b6 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,9 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/InfluxCommunity/influxdb3-go v0.1.0 h1:c+5H7qD7WZ0KSTCtCrVjBoMEspIP8KBO5LfVfLAaXn8= +github.com/InfluxCommunity/influxdb3-go v0.1.0/go.mod h1:6hVZLGqLyfEvXu14JRm4Ai938q8BzJ73TGQ7VKh8qPA= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -119,9 +122,15 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2qMFB56yOP3KzkB3PbYZ4AlUFg3a88F67TIx5lB/WwY= github.com/apache/arrow/go/arrow v0.0.0-20211013220434-5962184e7a30/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= +github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc= +github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= +github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= +github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -414,7 +423,11 @@ github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzP github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -500,6 +513,8 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gocql/gocql v0.0.0-20210515062232-b7ef815b4556/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20180201030542-885f9cc04c9c/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= @@ -567,12 +582,15 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= +github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -713,8 +731,15 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230704155506-ba92605f8d15 h1:2q1x9I4n+Eq4+RXXqCJYfcVR41OxYnUmBbL8ch/vL/M= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230704155506-ba92605f8d15/go.mod h1:qsq5ecnA1xi2rLnVQFo/9xksA7I7wQu8c7rqM5xbIrQ= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= +github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230717175816-09fb1ab0a259 h1:QGK1dDhR2drZ24HuwYbQVcL71hdKF63+mjR77WhBCWQ= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230717175816-09fb1ab0a259/go.mod h1:qsq5ecnA1xi2rLnVQFo/9xksA7I7wQu8c7rqM5xbIrQ= github.com/instill-ai/usage-client v0.2.4-alpha h1:mYXd62eZsmGKBlzwMcdEgTBgn8zlbagYUHro6+p50c8= github.com/instill-ai/usage-client v0.2.4-alpha/go.mod h1:BMxgyr02sqH6SeITXSV4M1ewwvfklzXIc5yzIqaN0c8= github.com/instill-ai/x v0.3.0-alpha h1:z9fedROOG2dVHhswBfVwU/hzHuq8/JKSUON7inF+FH8= @@ -831,12 +856,18 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knadh/koanf v1.4.4 h1:d2jY5nCCeoaiqvEKSBW9rEc93EfNy/XWgWsSB3j7JEA= github.com/knadh/koanf v1.4.4/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -913,6 +944,10 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= @@ -1036,6 +1071,8 @@ github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2 github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1092,7 +1129,6 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1166,7 +1202,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= @@ -1213,6 +1249,9 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE= go.einride.tech/aip v0.60.0 h1:h6bgabZ5BCfAptbGex8jbh3VvPBRLa6xq+pQ1CAjHYw= @@ -1360,6 +1399,7 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1393,6 +1433,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1494,6 +1536,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1731,15 +1774,20 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gonum.org/v1/plot v0.9.0/go.mod h1:3Pcqqmp6RHvJI72kgb8fThyUnav364FOsdDo2aGW5lY= diff --git a/pkg/datamodel/datamodel.go b/pkg/datamodel/datamodel.go index 05e8258..698df36 100644 --- a/pkg/datamodel/datamodel.go +++ b/pkg/datamodel/datamodel.go @@ -5,8 +5,6 @@ import ( "time" "github.com/gofrs/uuid" - - pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" ) // Base contains common columns for all tables @@ -31,6 +29,3 @@ type User struct { NewsletterSubscription bool `gorm:"default:false"` CookieToken sql.NullString } - -// PipelineMode is an alias type for Protobuf enum Pipeline.Mode -type PipelineMode pipelinePB.Pipeline_Mode diff --git a/pkg/external/external.go b/pkg/external/external.go index 11c5e05..06c6334 100644 --- a/pkg/external/external.go +++ b/pkg/external/external.go @@ -6,13 +6,15 @@ import ( "fmt" "time" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + influxdb3 "github.com/InfluxCommunity/influxdb3-go/influx" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/instill-ai/mgmt-backend/config" "github.com/instill-ai/mgmt-backend/pkg/logger" @@ -40,8 +42,8 @@ func InitUsageServiceClient(ctx context.Context, serverConfig *config.ServerConf return usagePB.NewUsageServiceClient(clientConn), clientConn } -// InitInfluxDBServiceClient initialises a InfluxDBServiceClient instance -func InitInfluxDBServiceClient(ctx context.Context, appConfig *config.AppConfig) (influxdb2.Client, api.QueryAPI) { +// InitInfluxDBServiceClientV2 initialises a InfluxDBServiceClientV2 instance +func InitInfluxDBServiceClientV2(ctx context.Context, appConfig *config.AppConfig) (influxdb2.Client, api.QueryAPI) { logger, _ := logger.GetZapLogger(ctx) @@ -77,3 +79,20 @@ func InitInfluxDBServiceClient(ctx context.Context, appConfig *config.AppConfig) return client, queryAPI } + +// InitInfluxDBServiceClientV3 initialises a InfluxDBServiceClientV3 instance +func InitInfluxDBServiceClientV3(ctx context.Context, appConfig *config.AppConfig) *influxdb3.Client { + + logger, _ := logger.GetZapLogger(ctx) + + client, err := influxdb3.New(influxdb3.Configs{ + HostURL: appConfig.InfluxDB.URL, + AuthToken: appConfig.InfluxDB.Token, + }) + + if err != nil { + logger.Error(err.Error()) + } + + return client +} diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index 45664dd..2f9490c 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -27,7 +27,6 @@ import ( custom_otel "github.com/instill-ai/mgmt-backend/pkg/logger/otel" mgmtPB "github.com/instill-ai/protogen-go/base/mgmt/v1alpha" healthcheckPB "github.com/instill-ai/protogen-go/common/healthcheck/v1alpha" - pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" checkfield "github.com/instill-ai/x/checkfield" ) @@ -635,14 +634,16 @@ func (h *PublicHandler) ListPipelineTriggerRecords(ctx context.Context, req *mgm return &mgmtPB.ListPipelineTriggerRecordsResponse{}, err } - var mode pipelinePB.Pipeline_Mode + var mode mgmtPB.Mode + var status mgmtPB.Status declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ filtering.DeclareStandardFunctions(), filtering.DeclareIdent("start", filtering.TypeTimestamp), filtering.DeclareIdent("stop", filtering.TypeTimestamp), filtering.DeclareIdent("pipeline_id", filtering.TypeString), - filtering.DeclareEnumIdent("pipeline_mode", mode.Type()), + filtering.DeclareEnumIdent("trigger_mode", mode.Type()), + filtering.DeclareEnumIdent("status", status.Type()), }...) if err != nil { span.SetStatus(1, err.Error()) @@ -677,3 +678,179 @@ func (h *PublicHandler) ListPipelineTriggerRecords(ctx context.Context, req *mgm return &resp, nil } + +func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) { + + eventName := "ListPipelineTriggerChartRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + + logger, _ := logger.GetZapLogger(ctx) + + pbUser, err := h.GetUser(ctx) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListPipelineTriggerChartRecordsResponse{}, err + } + + var mode mgmtPB.Mode + var status mgmtPB.Status + + declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ + filtering.DeclareStandardFunctions(), + filtering.DeclareIdent("start", filtering.TypeTimestamp), + filtering.DeclareIdent("stop", filtering.TypeTimestamp), + filtering.DeclareIdent("pipeline_id", filtering.TypeString), + filtering.DeclareEnumIdent("trigger_mode", mode.Type()), + filtering.DeclareEnumIdent("status", status.Type()), + }...) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListPipelineTriggerChartRecordsResponse{}, err + } + + filter, err := filtering.ParseFilter(req, declarations) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListPipelineTriggerChartRecordsResponse{}, err + } + + pipelineTriggerChartRecords, err := h.Service.ListPipelineTriggerChartRecords(ctx, pbUser, req.GetAggregationWindow(), filter) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListPipelineTriggerChartRecordsResponse{}, err + } + + resp := mgmtPB.ListPipelineTriggerChartRecordsResponse{ + PipelineTriggerChartRecords: pipelineTriggerChartRecords, + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + pbUser, + eventName, + ))) + + return &resp, nil +} + +func (h *PublicHandler) ListConnectorExecuteRecords(ctx context.Context, req *mgmtPB.ListConnectorExecuteRecordsRequest) (*mgmtPB.ListConnectorExecuteRecordsResponse, error) { + + eventName := "ListConnectorExecuteRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + + logger, _ := logger.GetZapLogger(ctx) + + pbUser, err := h.GetUser(ctx) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteRecordsResponse{}, err + } + + var status mgmtPB.Status + + declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ + filtering.DeclareStandardFunctions(), + filtering.DeclareIdent("start", filtering.TypeTimestamp), + filtering.DeclareIdent("stop", filtering.TypeTimestamp), + filtering.DeclareIdent("connector_id", filtering.TypeString), + filtering.DeclareEnumIdent("status", status.Type()), + }...) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteRecordsResponse{}, err + } + + filter, err := filtering.ParseFilter(req, declarations) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteRecordsResponse{}, err + } + + connectorExecuteRecords, totalSize, nextPageToken, err := h.Service.ListConnectorExecuteRecords(ctx, pbUser, req.GetPageSize(), req.GetPageToken(), filter) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteRecordsResponse{}, err + } + + resp := mgmtPB.ListConnectorExecuteRecordsResponse{ + ConnectorExecuteRecords: connectorExecuteRecords, + NextPageToken: nextPageToken, + TotalSize: totalSize, + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + pbUser, + eventName, + custom_otel.SetEventResult(fmt.Sprintf("Total records retrieved: %v", totalSize)), + ))) + + return &resp, nil +} + +func (h *PublicHandler) ListConnectorExecuteChartRecords(ctx context.Context, req *mgmtPB.ListConnectorExecuteChartRecordsRequest) (*mgmtPB.ListConnectorExecuteChartRecordsResponse, error) { + + eventName := "ListConnectorExecuteChartRecords" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + + logger, _ := logger.GetZapLogger(ctx) + + pbUser, err := h.GetUser(ctx) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteChartRecordsResponse{}, err + } + + var status mgmtPB.Status + + declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{ + filtering.DeclareStandardFunctions(), + filtering.DeclareIdent("start", filtering.TypeTimestamp), + filtering.DeclareIdent("stop", filtering.TypeTimestamp), + filtering.DeclareIdent("connector_id", filtering.TypeString), + filtering.DeclareEnumIdent("status", status.Type()), + }...) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteChartRecordsResponse{}, err + } + + filter, err := filtering.ParseFilter(req, declarations) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteChartRecordsResponse{}, err + } + + connectorExecuteChartRecords, err := h.Service.ListConnectorExecuteChartRecords(ctx, pbUser, req.GetAggregationWindow(), filter) + if err != nil { + span.SetStatus(1, err.Error()) + return &mgmtPB.ListConnectorExecuteChartRecordsResponse{}, err + } + + resp := mgmtPB.ListConnectorExecuteChartRecordsResponse{ + ConnectorExecuteChartRecords: connectorExecuteChartRecords, + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + pbUser, + eventName, + ))) + + return &resp, nil +} diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 63d130d..866b6f4 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -18,7 +18,6 @@ import ( "github.com/instill-ai/x/paginate" mgmtPB "github.com/instill-ai/protogen-go/base/mgmt/v1alpha" - pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha" ) // DefaultPageSize is the default pagination page size when page size is not assigned @@ -27,9 +26,15 @@ const DefaultPageSize = 100 // MaxPageSize is the maximum pagination page size if the assigned value is over this number const MaxPageSize = 1000 +// Default aggregate window +const AggregationWindow = 3600000000000 + // InfluxDB interface type InfluxDB interface { QueryPipelineTriggerRecords(ctx context.Context, owner string, pageSize int64, pageToken string, filter filtering.Filter) (pipelines []*mgmtPB.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) + QueryPipelineTriggerChartRecords(ctx context.Context, owner string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtPB.PipelineTriggerChartRecord, err error) + QueryConnectorExecuteRecords(ctx context.Context, owner string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtPB.ConnectorExecuteRecord, totalSize int64, nextPageToken string, err error) + QueryConnectorExecuteChartRecords(ctx context.Context, owner string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtPB.ConnectorExecuteChartRecord, err error) } type influxDB struct { @@ -45,6 +50,8 @@ func NewInfluxDB(queryAPI api.QueryAPI, bucket string) InfluxDB { } } +//TODO: reuse duplicate codes + func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtPB.PipelineTriggerRecord, totalSize int64, nextPageToken string, err error) { logger, _ := logger.GetZapLogger(ctx) @@ -96,12 +103,14 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string |> range(start: %v, stop: %v) |> filter(fn: (r) => r["_measurement"] == "pipeline.trigger") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["owner_uid"] == "%v") %v |> group() |> limit(n: %v)`, i.bucket, start, stop, + owner, expr, pageSize, ) @@ -111,12 +120,14 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string |> range(start: %v, stop: %v) |> filter(fn: (r) => r["_measurement"] == "pipeline.trigger") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["owner_uid"] == "%v") %v |> group() |> count(column: "pipeline_trigger_id")`, i.bucket, start, stop, + owner, expr, ) @@ -132,7 +143,6 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string result, err := i.queryAPI.Query(ctx, query) var lastTimestamp time.Time - var ownerUUID string if err != nil { return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) } else { @@ -154,9 +164,9 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string PipelineTriggerId: result.Record().ValueByKey("pipeline_trigger_id").(string), PipelineId: result.Record().ValueByKey("pipeline_id").(string), PipelineUid: result.Record().ValueByKey("pipeline_uid").(string), - PipelineMode: pipelinePB.Pipeline_Mode(pipelinePB.Pipeline_Mode_value[result.Record().ValueByKey("pipeline_mode").(string)]), + TriggerMode: mgmtPB.Mode(mgmtPB.Mode_value[result.Record().ValueByKey("trigger_mode").(string)]), ComputeTimeDuration: float32(result.Record().ValueByKey("compute_time_duration").(float64)), - Status: result.Record().ValueByKey("status").(string), + Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]), }, ) } @@ -169,11 +179,10 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string } lastTimestamp = result.Record().Time() - ownerUUID = result.Record().ValueByKey("owner_uid").(string) } if int64(len(records)) < total { - pageToken = paginate.EncodeToken(lastTimestamp, ownerUUID) + pageToken = paginate.EncodeToken(lastTimestamp, owner) } else { pageToken = "" } @@ -181,7 +190,373 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string return records, int64(len(records)), pageToken, nil } -// TranspileFilter transpiles a parsed AIP filter expression to GORM DB clauses +func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtPB.PipelineTriggerChartRecord, err error) { + + logger, _ := logger.GetZapLogger(ctx) + + start := time.Time{}.Format(time.RFC3339Nano) + stop := time.Now().Format(time.RFC3339Nano) + + if aggregationWindow == 0 { + aggregationWindow = AggregationWindow + } + + // TODO: design better filter expression to flux transpiler + expr, err := i.transpileFilter(filter) + if err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + + if expr != "" { + exprs := strings.Split(expr, "&&") + + iTime := 0 + for _, expr := range exprs { + if strings.HasPrefix(expr, "start") { + start = strings.Split(expr, "@")[1] + iTime += 1 + } + if strings.HasPrefix(expr, "stop") { + stop = strings.Split(expr, "@")[1] + iTime += 1 + } + } + + expr = strings.Join(exprs[iTime:], "") + } + + query := fmt.Sprintf( + `t1 = from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "pipeline.trigger") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["owner_uid"] == "%v") + %v + |> group(columns: ["pipeline_id", "pipeline_uid", "trigger_mode", "status"]) + |> aggregateWindow(every: duration(v: %v), column: "trigger_time", fn: count, createEmpty: false) + t2 = from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "pipeline.trigger") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["owner_uid"] == "%v") + %v + |> group(columns: ["pipeline_id", "pipeline_uid", "trigger_mode", "status"]) + |> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false) + join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "pipeline_id", "pipeline_uid", "trigger_mode", "status"])`, + i.bucket, + start, + stop, + owner, + expr, + aggregationWindow, + i.bucket, + start, + stop, + owner, + expr, + aggregationWindow, + ) + + result, err := i.queryAPI.Query(ctx, query) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } else { + var chartRecord *mgmtPB.PipelineTriggerChartRecord + var currentTablePosition = -1 + // Iterate over query response + for result.Next() { + // Notice when group key has changed + if result.TableChanged() { + logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) + } + + if result.Record().Table() != currentTablePosition { + chartRecord = &mgmtPB.PipelineTriggerChartRecord{ + PipelineId: result.Record().ValueByKey("pipeline_id").(string), + PipelineUid: result.Record().ValueByKey("pipeline_uid").(string), + TriggerMode: mgmtPB.Mode(mgmtPB.Mode_value[result.Record().ValueByKey("trigger_mode").(string)]), + Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]), + TimeBuckets: []*timestamppb.Timestamp{}, + TriggerCounts: []int64{}, + ComputeTimeDuration: []float32{}, + } + records = append(records, chartRecord) + currentTablePosition = result.Record().Table() + } + + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error()) + } + chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(result.Record().ValueByKey("_time").(time.Time))) + chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, result.Record().ValueByKey("trigger_time").(int64)) + chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(result.Record().ValueByKey("compute_time_duration").(float64))) + } + // Check for an error + if result.Err() != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } + if result.Record() == nil { + return nil, nil + } + } + + return records, nil +} + +func (i *influxDB) QueryConnectorExecuteRecords(ctx context.Context, owner string, pageSize int64, pageToken string, filter filtering.Filter) (records []*mgmtPB.ConnectorExecuteRecord, totalSize int64, nextPageToken string, err error) { + + logger, _ := logger.GetZapLogger(ctx) + + if pageSize == 0 { + pageSize = DefaultPageSize + } else if pageSize > MaxPageSize { + pageSize = MaxPageSize + } + + start := time.Time{}.Format(time.RFC3339Nano) + stop := time.Now().Format(time.RFC3339Nano) + + // TODO: design better filter expression to flux transpiler + expr, err := i.transpileFilter(filter) + if err != nil { + return nil, 0, "", status.Errorf(codes.Internal, err.Error()) + } + + if expr != "" { + exprs := strings.Split(expr, "&&") + + iTime := 0 + for _, expr := range exprs { + if strings.HasPrefix(expr, "start") { + start = strings.Split(expr, "@")[1] + iTime += 1 + } + if strings.HasPrefix(expr, "stop") { + stop = strings.Split(expr, "@")[1] + iTime += 1 + } + } + + expr = strings.Join(exprs[iTime:], "") + } + + if pageToken != "" { + startTime, _, err := paginate.DecodeToken(pageToken) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid page token: %s", err.Error()) + } + startTime = startTime.Add(time.Duration(1)) + start = startTime.Format(time.RFC3339Nano) + } + + query := fmt.Sprintf( + `from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "connector.execute") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["connector_owner_uid"] == "%v") + %v + |> group() + |> limit(n: %v)`, + i.bucket, + start, + stop, + owner, + expr, + pageSize, + ) + + totalQuery := fmt.Sprintf( + `from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "connector.execute") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["connector_owner_uid"] == "%v") + %v + |> group() + |> count(column: "connector_execute_id")`, + i.bucket, + start, + stop, + owner, + expr, + ) + + totalQueryResult, err := i.queryAPI.Query(ctx, totalQuery) + total := int64(0) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } else { + if totalQueryResult.Next() { + total = totalQueryResult.Record().ValueByKey("connector_execute_id").(int64) + } + } + + result, err := i.queryAPI.Query(ctx, query) + var lastTimestamp time.Time + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } else { + // Iterate over query response + for result.Next() { + // Notice when group key has changed + if result.TableChanged() { + logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) + } + + executeTime, err := time.Parse(time.RFC3339Nano, result.Record().ValueByKey("execute_time").(string)) + if err != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error()) + } + records = append( + records, + &mgmtPB.ConnectorExecuteRecord{ + ExecuteTime: timestamppb.New(executeTime), + ConnectorExecuteId: result.Record().ValueByKey("connector_execute_id").(string), + ConnectorId: result.Record().ValueByKey("connector_id").(string), + ConnectorUid: result.Record().ValueByKey("connector_uid").(string), + ConnectorDefinitionUid: result.Record().ValueByKey("connector_definition_uid").(string), + PipelineTriggerId: result.Record().ValueByKey("pipeline_trigger_id").(string), + PipelineId: result.Record().ValueByKey("pipeline_id").(string), + PipelineUid: result.Record().ValueByKey("pipeline_uid").(string), + ComputeTimeDuration: float32(result.Record().ValueByKey("compute_time_duration").(float64)), + Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]), + }, + ) + } + // Check for an error + if result.Err() != nil { + return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } + if result.Record() == nil { + return nil, 0, "", nil + } + + lastTimestamp = result.Record().Time() + } + + if int64(len(records)) < total { + pageToken = paginate.EncodeToken(lastTimestamp, owner) + } else { + pageToken = "" + } + + return records, int64(len(records)), pageToken, nil +} + +func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner string, aggregationWindow int64, filter filtering.Filter) (records []*mgmtPB.ConnectorExecuteChartRecord, err error) { + + logger, _ := logger.GetZapLogger(ctx) + + start := time.Time{}.Format(time.RFC3339Nano) + stop := time.Now().Format(time.RFC3339Nano) + + if aggregationWindow == 0 { + aggregationWindow = AggregationWindow + } + + // TODO: design better filter expression to flux transpiler + expr, err := i.transpileFilter(filter) + if err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } + + if expr != "" { + exprs := strings.Split(expr, "&&") + + iTime := 0 + for _, expr := range exprs { + if strings.HasPrefix(expr, "start") { + start = strings.Split(expr, "@")[1] + iTime += 1 + } + if strings.HasPrefix(expr, "stop") { + stop = strings.Split(expr, "@")[1] + iTime += 1 + } + } + + expr = strings.Join(exprs[iTime:], "") + } + + query := fmt.Sprintf( + `t1 = from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "connector.execute") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["connector_owner_uid"] == "%v") + %v + |> group(columns: ["connector_id", "connector_uid", "status"]) + |> aggregateWindow(every: duration(v: %v), column: "execute_time", fn: count, createEmpty: false) + t2 = from(bucket: "%v") + |> range(start: %v, stop: %v) + |> filter(fn: (r) => r["_measurement"] == "connector.execute") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> filter(fn: (r) => r["connector_owner_uid"] == "%v") + %v + |> group(columns: ["connector_id", "connector_uid", "status"]) + |> aggregateWindow(every: duration(v: %v), fn: sum, column: "compute_time_duration", createEmpty: false) + join(tables: {t1: t1, t2:t2}, on: ["_start", "_stop", "_time", "connector_id", "connector_uid", "status"])`, + i.bucket, + start, + stop, + owner, + expr, + aggregationWindow, + i.bucket, + start, + stop, + owner, + expr, + aggregationWindow, + ) + + result, err := i.queryAPI.Query(ctx, query) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } else { + var chartRecord *mgmtPB.ConnectorExecuteChartRecord + var currentTablePosition = -1 + // Iterate over query response + for result.Next() { + // Notice when group key has changed + if result.TableChanged() { + logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String())) + } + + if result.Record().Table() != currentTablePosition { + chartRecord = &mgmtPB.ConnectorExecuteChartRecord{ + ConnectorId: result.Record().ValueByKey("connector_id").(string), + ConnectorUid: result.Record().ValueByKey("connector_uid").(string), + Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]), + TimeBuckets: []*timestamppb.Timestamp{}, + ExecuteCounts: []int64{}, + ComputeTimeDuration: []float32{}, + } + records = append(records, chartRecord) + currentTablePosition = result.Record().Table() + } + + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error()) + } + chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(result.Record().ValueByKey("_time").(time.Time))) + chartRecord.ExecuteCounts = append(chartRecord.ExecuteCounts, result.Record().ValueByKey("execute_time").(int64)) + chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(result.Record().ValueByKey("compute_time_duration").(float64))) + } + // Check for an error + if result.Err() != nil { + return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error()) + } + if result.Record() == nil { + return nil, nil + } + } + + return records, nil +} + +// TranspileFilter transpiles a parsed AIP filter expression to Flux query expression func (i *influxDB) transpileFilter(filter filtering.Filter) (string, error) { return (&Transpiler{ filter: filter, diff --git a/pkg/service/metric.go b/pkg/service/metric.go index edf3ded..0bed452 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -17,3 +17,33 @@ func (s *service) ListPipelineTriggerRecords(ctx context.Context, owner *mgmtPB. return pipelineTriggerRecords, ps, pt, nil } + +func (s *service) ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerChartRecord, error) { + + pipelineTriggerChartRecords, err := s.influxDB.QueryPipelineTriggerChartRecords(ctx, *owner.Uid, aggregationWindow, filter) + if err != nil { + return nil, err + } + + return pipelineTriggerChartRecords, nil +} + +func (s *service) ListConnectorExecuteRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteRecord, int64, string, error) { + + connectorExecuteRecords, ps, pt, err := s.influxDB.QueryConnectorExecuteRecords(ctx, *owner.Uid, pageSize, pageToken, filter) + if err != nil { + return nil, 0, "", err + } + + return connectorExecuteRecords, ps, pt, nil +} + +func (s *service) ListConnectorExecuteChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteChartRecord, error) { + + connectorExecuteChartRecords, err := s.influxDB.QueryConnectorExecuteChartRecords(ctx, *owner.Uid, aggregationWindow, filter) + if err != nil { + return nil, err + } + + return connectorExecuteChartRecords, nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 65c3310..e22cca0 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -27,18 +27,21 @@ type Service interface { DeleteUserByID(ctx context.Context, id string) error ListPipelineTriggerRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerRecord, int64, string, error) + ListPipelineTriggerChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.PipelineTriggerChartRecord, error) + ListConnectorExecuteRecords(ctx context.Context, owner *mgmtPB.User, pageSize int64, pageToken string, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteRecord, int64, string, error) + ListConnectorExecuteChartRecords(ctx context.Context, owner *mgmtPB.User, aggregationWindow int64, filter filtering.Filter) ([]*mgmtPB.ConnectorExecuteChartRecord, error) } type service struct { repository repository.Repository - influxDB repository.InfluxDB + influxDB repository.InfluxDB } // NewService initiates a service instance func NewService(r repository.Repository, i repository.InfluxDB) Service { return &service{ repository: r, - influxDB: i, + influxDB: i, } } From 0689b177ee662d9b1c07e2749d8c4b19a76fa1ba Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 18 Jul 2023 05:43:42 +0800 Subject: [PATCH 2/3] chore: test disk clean --- .github/workflows/integration-test.yml | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index cd0b653..c446c06 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -52,9 +52,23 @@ jobs: run: | df --human-readable sudo apt clean - docker rmi $(docker image ls --all --quiet) rm --recursive --force "$AGENT_TOOLSDIRECTORY" - df --human-readable + + - name: Free Disk Space (Ubuntu) + uses: jlumbroso/free-disk-space@main + with: + # this might remove tools that are actually needed, + # if set to "true" but frees about 6 GB + tool-cache: false + + # all of these default to true, but feel free to set to + # "false" if necessary for your workflow + android: true + dotnet: true + haskell: true + large-packages: false + docker-images: true + swap-storage: true - name: Checkout uses: actions/checkout@v3 From 0eb12900199660cbfca46e58db97627475822b85 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Tue, 18 Jul 2023 12:20:52 +0800 Subject: [PATCH 3/3] chore: update default aggregation window --- pkg/repository/influx.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index 866b6f4..6dcf284 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -27,7 +27,7 @@ const DefaultPageSize = 100 const MaxPageSize = 1000 // Default aggregate window -const AggregationWindow = 3600000000000 +var defaultAggregationWindow = time.Hour.Nanoseconds() // InfluxDB interface type InfluxDB interface { @@ -197,8 +197,8 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s start := time.Time{}.Format(time.RFC3339Nano) stop := time.Now().Format(time.RFC3339Nano) - if aggregationWindow == 0 { - aggregationWindow = AggregationWindow + if aggregationWindow < time.Minute.Nanoseconds() { + aggregationWindow = defaultAggregationWindow } // TODO: design better filter expression to flux transpiler @@ -451,8 +451,8 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner start := time.Time{}.Format(time.RFC3339Nano) stop := time.Now().Format(time.RFC3339Nano) - if aggregationWindow == 0 { - aggregationWindow = AggregationWindow + if aggregationWindow < time.Minute.Nanoseconds() { + aggregationWindow = defaultAggregationWindow } // TODO: design better filter expression to flux transpiler