From dce182b39d1886052471e50a129db5856f316cd4 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 13 Jul 2023 22:13:00 -0300 Subject: [PATCH 01/35] fix(sinks): fixes on custom headers validation. --- maestro/config/exporter_builder.go | 26 ++++++++++++++----- maestro/config/types.go | 3 ++- .../backend/otlphttpexporter/configuration.go | 15 +++++++++++ sinks/backend/prometheus/configuration.go | 2 +- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/maestro/config/exporter_builder.go b/maestro/config/exporter_builder.go index 56231ba42..a6f27af97 100644 --- a/maestro/config/exporter_builder.go +++ b/maestro/config/exporter_builder.go @@ -51,11 +51,23 @@ type OTLPHTTPExporterBuilder struct { } func (O *OTLPHTTPExporterBuilder) GetExportersFromMetadata(config types.Metadata, authenticationExtensionName string) (Exporters, string) { - endpointCfg := config.GetSubMetadata("exporter")["endpoint"].(string) - return Exporters{ - OTLPExporter: &OTLPExporterConfig{ - Endpoint: endpointCfg, - Auth: Auth{Authenticator: authenticationExtensionName}, - }, - }, "otlphttp" + exporterSubMeta := config.GetSubMetadata("exporter") + endpointCfg := exporterSubMeta["endpoint"].(string) + customHeaders, ok := exporterSubMeta["headers"] + if !ok || customHeaders == nil { + return Exporters{ + OTLPExporter: &OTLPExporterConfig{ + Endpoint: endpointCfg, + Auth: Auth{Authenticator: authenticationExtensionName}, + }, + }, "otlphttp" + } else { + return Exporters{ + OTLPExporter: &OTLPExporterConfig{ + Endpoint: endpointCfg, + Auth: Auth{Authenticator: authenticationExtensionName}, + Headers: customHeaders.(map[string]interface{}), + }, + }, "otlphttp" + } } diff --git a/maestro/config/types.go b/maestro/config/types.go index da3321da4..637abb2c2 100644 --- a/maestro/config/types.go +++ b/maestro/config/types.go @@ -134,7 +134,8 @@ type LoggingExporterConfig struct { } type OTLPExporterConfig struct { - Endpoint string `json:"endpoint" yaml:"endpoint"` + Endpoint string `json:"endpoint" yaml:"endpoint"` + Headers map[string]interface{} `json:"headers,omitempty" yaml:"headers,omitempty"` Auth struct { Authenticator string `json:"authenticator" yaml:"authenticator"` } diff --git a/sinks/backend/otlphttpexporter/configuration.go b/sinks/backend/otlphttpexporter/configuration.go index 87143dd80..48eec15f3 100644 --- a/sinks/backend/otlphttpexporter/configuration.go +++ b/sinks/backend/otlphttpexporter/configuration.go @@ -31,6 +31,11 @@ import ( // insecure_skip_verify: true const EndpointFieldName = "endpoint" +const CustomHeadersConfigFeature = "headers" + +var invalidCustomHeaders = []string{ + "Content-Encoding", "Content-Type", "User-Agent", "Authorization", +} type OTLPHTTPBackend struct { Endpoint string `yaml:"endpoint"` @@ -89,6 +94,16 @@ func (b *OTLPHTTPBackend) ValidateConfiguration(config types.Metadata) error { if _, err := url.ParseRequestURI(endpointUrl.(string)); err != nil { return errors.Wrap(errors.ErrInvalidEndpoint, err) } + // check for custom http headers + customHeaders, customHeadersOk := config[CustomHeadersConfigFeature] + if customHeadersOk { + headersAsMap := customHeaders.(map[string]interface{}) + for _, header := range invalidCustomHeaders { + if _, ok := headersAsMap[header]; ok { + return errors.New("invalid custom headers") + } + } + } return nil } diff --git a/sinks/backend/prometheus/configuration.go b/sinks/backend/prometheus/configuration.go index f4aac0529..81ccaf499 100644 --- a/sinks/backend/prometheus/configuration.go +++ 
b/sinks/backend/prometheus/configuration.go @@ -61,7 +61,7 @@ func (p *Backend) ValidateConfiguration(config types.Metadata) error { if customHeadersOk { headersAsMap := customHeaders.(map[string]interface{}) for _, header := range invalidCustomHeaders { - if _, ok := headersAsMap[header]; !ok { + if _, ok := headersAsMap[header]; ok { return errors.New("invalid custom headers") } } From 48a03cb5cc3155275df76293fd0f02c4547a8c14 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 13 Jul 2023 23:01:40 -0300 Subject: [PATCH 02/35] fix(sinks): general fixes on working. --- sinker/config_state_check.go | 2 +- sinker/redis/consumer/streams.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index 4eb7f2af8..c52357231 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -15,7 +15,7 @@ const ( streamID = "orb.sinker" streamLen = 1000 CheckerFreq = 5 * time.Minute - DefaultTimeout = 30 * time.Minute + DefaultTimeout = 5 * time.Minute ) func (svc *SinkerService) checkState(_ time.Time) { diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index 95be472f5..2488fbb5b 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -150,6 +150,7 @@ func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) err if ok := es.configRepo.Exists(e.owner, e.sinkID); ok { err := es.configRepo.Remove(e.owner, e.sinkID) if err != nil { + es.logger.Error("error during remove sinker cache entry", zap.Error(err)) return err } } From 066add3aa0c9a5a7d43732ceb4a3109ec8ba3ed9 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 13 Jul 2023 23:34:48 -0300 Subject: [PATCH 03/35] fix(sinks): fix conflict error code. --- sinks/sinks_service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index 66d595e11..086cca26c 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -91,7 +91,7 @@ func validateAuthType(s *Sink) (authentication_type.AuthenticationType, error) { } authTypeStr, ok := authMetadata["type"] if !ok { - return nil, errors.Wrap(errors.ErrAuthTypeNotFound, errors.New("authentication type not found")) + return nil, errors.Wrap(errors.ErrAuthTypeNotFound, errors.New("authentication type not found")) } if _, ok := authTypeStr.(string); !ok { @@ -341,11 +341,11 @@ func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) } err = svc.sinkRepo.Update(ctx, sink) if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) + return Sink{}, err } sinkEdited, err := svc.sinkRepo.RetrieveById(ctx, sink.ID) if err != nil { - return Sink{}, errors.Wrap(ErrUpdateEntity, err) + return Sink{}, err } sinkEdited, err = svc.decryptMetadata(cfg, sinkEdited) if err != nil { From 44edc77a84e76348eabdebe0aa7bdb7b5de9b56b Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 13 Jul 2023 23:53:09 -0300 Subject: [PATCH 04/35] fix(sinks): fix header not reaching redis cache. 
--- sinker/config/types.go | 4 +++- sinker/message_handler.go | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sinker/config/types.go b/sinker/config/types.go index 53dfe5b93..220e0fa7f 100644 --- a/sinker/config/types.go +++ b/sinker/config/types.go @@ -19,7 +19,9 @@ type SinkConfig struct { Username string `json:"username"` } `json:"authentication"` Exporter struct { - RemoteHost string `json:"remote_host"` + RemoteHost *string `json:"remote_host",omitempty` + Endpoint *string `json:"endpoint",omitempty` + Headers map[string]string `json:"headers",omitempty` } `json:"exporter"` OpenTelemetry string `json:"opentelemetry"` State PrometheusState `json:"state,omitempty"` diff --git a/sinker/message_handler.go b/sinker/message_handler.go index 1ede15553..5a1f5c4be 100644 --- a/sinker/message_handler.go +++ b/sinker/message_handler.go @@ -34,7 +34,7 @@ func (svc SinkerService) remoteWriteToPrometheus(tsList prometheus.TSList, owner ctx = context.WithValue(ctx, "deprecation", "opentelemetry") } cfg := prometheus.NewConfig( - prometheus.WriteURLOption(cfgRepo.Exporter.RemoteHost), + prometheus.WriteURLOption(*cfgRepo.Exporter.RemoteHost), ) promClient, err := prometheus.NewClient(cfg) @@ -62,7 +62,8 @@ func (svc SinkerService) remoteWriteToPrometheus(tsList prometheus.TSList, owner return err } - svc.logger.Debug("successful sink", zap.Int("payload_size_b", result.PayloadSize), zap.String("sink_id", sinkID), zap.String("url", cfgRepo.Exporter.RemoteHost), zap.String("user", cfgRepo.Authentication.Username)) + svc.logger.Debug("successful sink", zap.Int("payload_size_b", result.PayloadSize), + zap.String("sink_id", sinkID)) if cfgRepo.State != config.Active { cfgRepo.State = config.Active From 6b03ee4f6ef743eaba04fb481e95a6d864d88ace Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 00:33:20 -0300 Subject: [PATCH 05/35] fix(sinks): simplify config on pass-through. 
--- sinker/config/types.go | 17 ++----- sinker/config_state_check.go | 2 +- sinker/message_handler.go | 18 ++++++-- sinker/redis/consumer/streams.go | 8 ++-- sinker/redis/sinker_test.go | 78 ++++++++++++++++++++++++-------- 5 files changed, 80 insertions(+), 43 deletions(-) diff --git a/sinker/config/types.go b/sinker/config/types.go index 220e0fa7f..59a39c8d1 100644 --- a/sinker/config/types.go +++ b/sinker/config/types.go @@ -6,24 +6,15 @@ package config import ( "database/sql/driver" + "github.com/orb-community/orb/pkg/types" "time" ) // SinkConfigParser to be compatible with new sinks config is coming from eventbus type SinkConfig struct { - SinkID string `json:"sink_id"` - OwnerID string `json:"owner_id"` - Authentication struct { - Type string `json:"type"` - Password string `json:"password"` - Username string `json:"username"` - } `json:"authentication"` - Exporter struct { - RemoteHost *string `json:"remote_host",omitempty` - Endpoint *string `json:"endpoint",omitempty` - Headers map[string]string `json:"headers",omitempty` - } `json:"exporter"` - OpenTelemetry string `json:"opentelemetry"` + SinkID string `json:"sink_id"` + OwnerID string `json:"owner_id"` + Config types.Metadata `json:"config"` State PrometheusState `json:"state,omitempty"` Msg string `json:"msg,omitempty"` LastRemoteWrite time.Time `json:"last_remote_write,omitempty"` diff --git a/sinker/config_state_check.go b/sinker/config_state_check.go index c52357231..7a6301805 100644 --- a/sinker/config_state_check.go +++ b/sinker/config_state_check.go @@ -35,7 +35,7 @@ func (svc *SinkerService) checkState(_ time.Time) { // Set idle if the sinker is more than 30 minutes not sending metrics (Remove from Redis) if cfg.LastRemoteWrite.Add(DefaultTimeout).Before(time.Now()) { if cfg.State == config.Active { - if cfg.OpenTelemetry != "enabled" { + if v, ok := cfg.Config["opentelemetry"]; !ok || v != "enabled" { if err := svc.sinkerCache.Remove(cfg.OwnerID, cfg.SinkID); err != nil { svc.logger.Error("error updating sink config cache", zap.Error(err)) return diff --git a/sinker/message_handler.go b/sinker/message_handler.go index 5a1f5c4be..7db500b25 100644 --- a/sinker/message_handler.go +++ b/sinker/message_handler.go @@ -29,12 +29,18 @@ func (svc SinkerService) remoteWriteToPrometheus(tsList prometheus.TSList, owner return err } ctx := context.Background() - if cfgRepo.OpenTelemetry == "enabled" { + otelMetadata, ok := cfgRepo.Config["opentelemetry"] + if ok && otelMetadata == "enabled" { svc.logger.Info("deprecate warning opentelemetry sink scraping legacy agent", zap.String("sink-ID", cfgRepo.SinkID)) ctx = context.WithValue(ctx, "deprecation", "opentelemetry") } + configMetadata := cfgRepo.Config.GetSubMetadata("exporter") + if configMetadata == nil { + svc.logger.Error("unable to find prometheus remote host", zap.Error(err)) + return err + } cfg := prometheus.NewConfig( - prometheus.WriteURLOption(*cfgRepo.Exporter.RemoteHost), + prometheus.WriteURLOption(configMetadata["remote_host"].(string)), ) promClient, err := prometheus.NewClient(cfg) @@ -42,9 +48,13 @@ func (svc SinkerService) remoteWriteToPrometheus(tsList prometheus.TSList, owner svc.logger.Error("unable to construct client", zap.Error(err)) return err } - + authMetadata := cfgRepo.Config.GetSubMetadata("authentication") + if authMetadata == nil { + svc.logger.Error("unable to find prometheus remote host", zap.Error(err)) + return err + } var headers = make(map[string]string) - headers["Authorization"] = svc.encodeBase64(cfgRepo.Authentication.Username, 
cfgRepo.Authentication.Password) + headers["Authorization"] = svc.encodeBase64(authMetadata["username"].(string), authMetadata["password"].(string)) result, writeErr := promClient.WriteTimeSeries(ctx, tsList, prometheus.WriteOptions{Headers: headers}) if err := error(writeErr); err != nil { if cfgRepo.Msg != fmt.Sprint(err) { diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index 2488fbb5b..a79b4760a 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -172,11 +172,9 @@ func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) err if err != nil { return err } - sinkConfig.Authentication.Type = cfg.Authentication.Type - sinkConfig.Authentication.Username = cfg.Authentication.Username - sinkConfig.Authentication.Password = cfg.Authentication.Password - sinkConfig.Exporter.RemoteHost = cfg.Exporter.RemoteHost - sinkConfig.OpenTelemetry = cfg.OpenTelemetry + if sinkConfig.Config == nil { + sinkConfig.Config = cfg.Config + } if sinkConfig.OwnerID == "" { sinkConfig.OwnerID = e.owner } diff --git a/sinker/redis/sinker_test.go b/sinker/redis/sinker_test.go index 0ee54eb6c..2088cc9e7 100644 --- a/sinker/redis/sinker_test.go +++ b/sinker/redis/sinker_test.go @@ -2,7 +2,7 @@ package redis_test import ( "fmt" - "reflect" + "github.com/orb-community/orb/pkg/types" "testing" "time" @@ -21,10 +21,21 @@ func TestSinkerConfigSave(t *testing.T) { var config config2.SinkConfig config.SinkID = "123" config.OwnerID = "test" - config.Authentication.Type = "basic_auth" - config.Authentication.Username = "user" - config.Authentication.Password = "password" - config.Exporter.RemoteHost = "localhost" + config.Config = types.Metadata{ + "authentication": types.Metadata{ + "password": "password", + "type": "basicauth", + "username": "user", + }, + "exporter": types.Metadata{ + "headers": map[string]string{ + "X-Tenant": "MY_TENANT_1", + }, + "remote_host": "localhost", + }, + "opentelemetry": "enabled", + } + config.State = 0 config.Msg = "" config.LastRemoteWrite = time.Time{} @@ -40,8 +51,7 @@ func TestSinkerConfigSave(t *testing.T) { config: config2.SinkConfig{ SinkID: "124", OwnerID: "test", - Exporter: config.Exporter, - Authentication: config.Authentication, + Config: config.Config, State: 0, Msg: "", LastRemoteWrite: time.Time{}, @@ -67,10 +77,20 @@ func TestGetSinkerConfig(t *testing.T) { var config config2.SinkConfig config.SinkID = "123" config.OwnerID = "test" - config.Authentication.Type = "basic_auth" - config.Authentication.Username = "user" - config.Authentication.Password = "password" - config.Exporter.RemoteHost = "localhost" + config.Config = types.Metadata{ + "authentication": types.Metadata{ + "password": "password", + "type": "basicauth", + "username": "user", + }, + "exporter": types.Metadata{ + "headers": map[string]string{ + "X-Tenant": "MY_TENANT_1", + }, + "remote_host": "localhost", + }, + "opentelemetry": "enabled", + } config.State = 0 config.Msg = "" config.LastRemoteWrite = time.Time{} @@ -98,7 +118,17 @@ func TestGetSinkerConfig(t *testing.T) { for desc, tc := range cases { t.Run(desc, func(t *testing.T) { sinkConfig, err := sinkerCache.Get(tc.config.OwnerID, tc.sinkID) - assert.True(t, reflect.DeepEqual(tc.config, sinkConfig), fmt.Sprintf("%s: expected %v got %v", desc, tc.config, sinkConfig)) + assert.Equal(t, tc.config.SinkID, sinkConfig.SinkID, fmt.Sprintf("%s: expected %s got %s", desc, tc.config.SinkID, sinkConfig.SinkID)) + assert.Equal(t, tc.config.State, sinkConfig.State, fmt.Sprintf("%s: 
expected %s got %s", desc, tc.config.State, sinkConfig.State)) + assert.Equal(t, tc.config.OwnerID, sinkConfig.OwnerID, fmt.Sprintf("%s: expected %s got %s", desc, tc.config.OwnerID, sinkConfig.OwnerID)) + assert.Equal(t, tc.config.Msg, sinkConfig.Msg, fmt.Sprintf("%s: expected %s got %s", desc, tc.config.Msg, sinkConfig.Msg)) + assert.Equal(t, tc.config.LastRemoteWrite, sinkConfig.LastRemoteWrite, fmt.Sprintf("%s: expected %s got %s", desc, tc.config.LastRemoteWrite, sinkConfig.LastRemoteWrite)) + if tc.config.Config != nil { + _, ok := sinkConfig.Config["authentication"] + assert.True(t, ok, fmt.Sprintf("%s: should contain authentication metadata", desc)) + _, ok = sinkConfig.Config["exporter"] + assert.True(t, ok, fmt.Sprintf("%s: should contain exporter metadata", desc)) + } assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s", desc, tc.err, err)) }) } @@ -109,12 +139,22 @@ func TestGetAllSinkerConfig(t *testing.T) { var config config2.SinkConfig config.SinkID = "123" config.OwnerID = "test" - config.Authentication.Type = "basic_auth" - config.Authentication.Username = "user" - config.Authentication.Password = "password" - config.Exporter.RemoteHost = "localhost" config.State = 0 config.Msg = "" + config.Config = types.Metadata{ + "authentication": types.Metadata{ + "password": "password", + "type": "basicauth", + "username": "user", + }, + "exporter": types.Metadata{ + "headers": map[string]string{ + "X-Tenant": "MY_TENANT_1", + }, + "remote_host": "localhost", + }, + "opentelemetry": "enabled", + } config.LastRemoteWrite = time.Time{} sinksConfig := map[string]struct { config config2.SinkConfig @@ -123,8 +163,7 @@ func TestGetAllSinkerConfig(t *testing.T) { config: config2.SinkConfig{ SinkID: "123", OwnerID: "test", - Exporter: config.Exporter, - Authentication: config.Authentication, + Config: config.Config, State: 0, Msg: "", LastRemoteWrite: time.Time{}, @@ -134,8 +173,7 @@ func TestGetAllSinkerConfig(t *testing.T) { config: config2.SinkConfig{ SinkID: "134", OwnerID: "test", - Exporter: config.Exporter, - Authentication: config.Authentication, + Config: config.Config, State: 0, Msg: "", LastRemoteWrite: time.Time{}, From 601a182ab51af6a6cd15a707d4e3916f4cba495d Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 00:50:30 -0300 Subject: [PATCH 06/35] fix(sinks): add better error messages. 
--- sinks/sinks_service.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index 086cca26c..3792230c4 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -295,11 +295,11 @@ func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) sink.Backend = currentSink.Backend be, err := validateBackend(&sink) if err != nil { - return Sink{}, errors.Wrap(ErrMalformedEntity, err) + return Sink{}, errors.Wrap(errors.New("incorrect backend and exporter configuration"), err) } at, err := validateAuthType(&sink) if err != nil { - return Sink{}, errors.Wrap(ErrMalformedEntity, err) + return Sink{}, errors.Wrap(errors.New("incorrect authentication configuration"), err) } cfg = Configuration{ Authentication: at, @@ -313,7 +313,8 @@ func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { - return Sink{}, errors.Wrap(ErrMalformedEntity, err) + svc.logger.Error("failed to marshal config data", zap.Error(err)) + return Sink{}, errors.Wrap(errors.New("configuration is invalid for yaml format"), err) } sink.ConfigData = string(configDataByte) } From 1e4dfd0dd38bc9d3604b47083d2238241d16b08a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 09:06:12 -0300 Subject: [PATCH 07/35] fix(sinker): fix handle config. --- sinker/redis/consumer/streams.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index a79b4760a..bf20331a3 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -113,6 +113,7 @@ func decodeSinksCreate(event map[string]interface{}) (updateSinkEvent, error) { val := updateSinkEvent{ sinkID: read(event, "sink_id", ""), owner: read(event, "owner", ""), + config: readMetadata(event, "config"), timestamp: time.Time{}, } var metadata types.Metadata @@ -127,6 +128,7 @@ func decodeSinksUpdate(event map[string]interface{}) (updateSinkEvent, error) { val := updateSinkEvent{ sinkID: read(event, "sink_id", ""), owner: read(event, "owner", ""), + config: readMetadata(event, "config"), timestamp: time.Time{}, } var metadata types.Metadata @@ -141,6 +143,7 @@ func decodeSinksRemove(event map[string]interface{}) (updateSinkEvent, error) { val := updateSinkEvent{ sinkID: read(event, "sink_id", ""), owner: read(event, "owner", ""), + config: readMetadata(event, "config"), timestamp: time.Time{}, } return val, nil @@ -198,18 +201,12 @@ func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) err } func (es eventStore) handleSinksCreate(_ context.Context, e updateSinkEvent) error { - data, err := json.Marshal(e.config) - if err != nil { - return err - } var cfg config.SinkConfig - if err := json.Unmarshal(data, &cfg); err != nil { - return err - } + cfg.Config = types.FromMap(e.config) cfg.SinkID = e.sinkID cfg.OwnerID = e.owner cfg.State = config.Unknown - err = es.configRepo.Add(cfg) + err := es.configRepo.Add(cfg) if err != nil { return err } @@ -224,3 +221,12 @@ func read(event map[string]interface{}, key, def string) string { } return val } + +func readMetadata(event map[string]interface{}, key string) types.Metadata { + val, ok := event[key].(types.Metadata) + if !ok { + return types.Metadata{} + } + + return val +} From b1dfc8f957c663735a0b560bb3ffab579954d770 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 
Jul 2023 09:35:57 -0300 Subject: [PATCH 08/35] fix(sinks): fix delete event. --- sinker/redis/consumer/streams.go | 15 --------------- sinks/redis/producer/events.go | 2 +- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index bf20331a3..e7befbce7 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -69,11 +69,6 @@ func (es eventStore) Subscribe(context context.Context) error { break } err = es.handleSinksCreate(context, rte) - if err != nil { - es.logger.Error("Failed to handle event", zap.String("operation", event["operation"].(string)), zap.Error(err)) - break - } - es.client.XAck(context, stream, subGroup, msg.ID) case sinksUpdate: rte, derr := decodeSinksUpdate(event) if derr != nil { @@ -116,11 +111,6 @@ func decodeSinksCreate(event map[string]interface{}) (updateSinkEvent, error) { config: readMetadata(event, "config"), timestamp: time.Time{}, } - var metadata types.Metadata - if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil { - return updateSinkEvent{}, err - } - val.config = metadata return val, nil } @@ -131,11 +121,6 @@ func decodeSinksUpdate(event map[string]interface{}) (updateSinkEvent, error) { config: readMetadata(event, "config"), timestamp: time.Time{}, } - var metadata types.Metadata - if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil { - return updateSinkEvent{}, err - } - val.config = metadata return val, nil } diff --git a/sinks/redis/producer/events.go b/sinks/redis/producer/events.go index a578ba648..acec59a7c 100644 --- a/sinks/redis/producer/events.go +++ b/sinks/redis/producer/events.go @@ -59,7 +59,7 @@ type deleteSinkEvent struct { func (dse deleteSinkEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ "sink_id": dse.sinkID, - "owner_id": dse.ownerID, + "owner": dse.ownerID, "operation": SinkDelete, }, nil } From a590e36ff71d1f8e16154d43561bfda58ad29b40 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 09:39:57 -0300 Subject: [PATCH 09/35] fix(sinker): fix not sending any feedback when delete key is not found. --- sinker/redis/consumer/streams.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index e7befbce7..d01c2c7f5 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -3,6 +3,8 @@ package consumer import ( "context" "encoding/json" + "fmt" + "github.com/orb-community/orb/pkg/errors" "time" "github.com/go-redis/redis/v8" @@ -141,6 +143,10 @@ func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) err es.logger.Error("error during remove sinker cache entry", zap.Error(err)) return err } + } else { + es.logger.Error("did not found any sinker cache entry for removal", + zap.String("key", fmt.Sprintf("sinker_key-%s-%s", e.owner, e.sinkID))) + return errors.New("did not found any sinker cache entry for removal") } return nil } From 47abecd9bbffdb5e774acda582060e6f1e220707 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 10:08:36 -0300 Subject: [PATCH 10/35] fix(maestro): fix not removing the sink deployment after the sink is removed. 
--- maestro/redis/consumer/hashset.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index 26b31a995..506d6818e 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -44,6 +44,10 @@ func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis if err != nil { return err } + err = es.sinkerKeyRedisClient.HDel(ctx, deploymentKey, event.SinkID).Err() + if err != nil { + return err + } return nil } From 9555f65866cf98e634190c40d96663ced861af8a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 14:34:34 -0300 Subject: [PATCH 11/35] fix(agent): fix context cancellation. --- agent/backend/pktvisor/scrape.go | 12 ++++++------ agent/otel/bridgeservice.go | 6 ++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/agent/backend/pktvisor/scrape.go b/agent/backend/pktvisor/scrape.go index 120e42473..60089085d 100644 --- a/agent/backend/pktvisor/scrape.go +++ b/agent/backend/pktvisor/scrape.go @@ -37,26 +37,26 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) { - bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags) + bridgeService := otel.NewBridgeService(ctx, cancelFunc, &p.policyRepo, p.agentTags) if p.mqttClient != nil { cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(p.logger) - // Create the OTLP metrics exporter that'll receive and verify the metrics produced. - exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) + // Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced. + metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) if err != nil { return nil, err } - return exporter, nil + return metricsExporter, nil } else { cfg := otlpmqttexporter.CreateConfig(p.mqttConfig.Address, p.mqttConfig.Id, p.mqttConfig.Key, p.mqttConfig.ChannelID, p.pktvisorVersion, p.otlpMetricsTopic, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(p.logger) // Create the OTLP metrics exporter that'll receive and verify the metrics produced. 
- exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) + metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) if err != nil { return nil, err } - return exporter, nil + return metricsExporter, nil } } diff --git a/agent/otel/bridgeservice.go b/agent/otel/bridgeservice.go index 9e1ea7d2a..ba93ee857 100644 --- a/agent/otel/bridgeservice.go +++ b/agent/otel/bridgeservice.go @@ -21,13 +21,15 @@ var _ AgentBridgeService = (*BridgeService)(nil) type BridgeService struct { bridgeContext context.Context + cancelFunc context.CancelFunc policyRepo policies.PolicyRepo AgentTags map[string]string } -func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService { +func NewBridgeService(ctx context.Context, cancelFunc context.CancelFunc, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService { return &BridgeService{ bridgeContext: ctx, + cancelFunc: cancelFunc, policyRepo: *policyRepo, AgentTags: agentTags, } @@ -47,5 +49,5 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) { ctx.Done() - b.bridgeContext.Done() + b.cancelFunc() } From bd7234cba4bce16e49848928e4a406efd67562c8 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 15:06:26 -0300 Subject: [PATCH 12/35] fix(sinks): update sink will set state to unknown. --- sinks/sinks_service.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sinks/sinks_service.go b/sinks/sinks_service.go index 3792230c4..e87941b66 100644 --- a/sinks/sinks_service.go +++ b/sinks/sinks_service.go @@ -218,7 +218,8 @@ func (svc sinkService) UpdateSinkInternal(ctx context.Context, sink Sink) (Sink, defaultMetadata := make(types.Metadata, 1) defaultMetadata["opentelemetry"] = "enabled" sink.Config.Merge(defaultMetadata) - currentSink.Error = "" + sink.State = Unknown + sink.Error = "" if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { @@ -309,7 +310,8 @@ func (svc sinkService) UpdateSink(ctx context.Context, token string, sink Sink) defaultMetadata := make(types.Metadata, 1) defaultMetadata["opentelemetry"] = "enabled" sink.Config.Merge(defaultMetadata) - currentSink.Error = "" + sink.State = Unknown + sink.Error = "" if sink.Format == "yaml" { configDataByte, err := yaml.Marshal(sink.Config) if err != nil { From 12aceccfd728e6eb0806974dde47a57f8188ad3a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 15:16:39 -0300 Subject: [PATCH 13/35] fix(agent): fix scrape for diode with context cancel func. 
--- agent/backend/diode/scrape.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/backend/diode/scrape.go b/agent/backend/diode/scrape.go index b1fdfba9d..c4a2d86e2 100644 --- a/agent/backend/diode/scrape.go +++ b/agent/backend/diode/scrape.go @@ -26,7 +26,7 @@ const ( func (d *diodeBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) { - bridgeService := otel.NewBridgeService(ctx, &d.policyRepo, d.agentTags) + bridgeService := otel.NewBridgeService(ctx, cancelFunc, &d.policyRepo, d.agentTags) if d.mqttClient != nil { cfg := otlpmqttexporter.CreateConfigClient(d.mqttClient, d.logTopic, d.version, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(d.logger) From 7787122a7a781cc2b38958fd40e59e7ff0a17e3e Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 16:16:24 -0300 Subject: [PATCH 14/35] fix(sinker): fix update setting config to null. --- sinker/redis/consumer/streams.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index d01c2c7f5..d8b4be35d 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -2,7 +2,6 @@ package consumer import ( "context" - "encoding/json" "fmt" "github.com/orb-community/orb/pkg/errors" "time" @@ -152,15 +151,11 @@ func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) err } func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) error { - data, err := json.Marshal(e.config) - if err != nil { - return err - } var cfg config.SinkConfig - if err := json.Unmarshal(data, &cfg); err != nil { - return err - } - + cfg.Config = types.FromMap(e.config) + cfg.SinkID = e.sinkID + cfg.OwnerID = e.owner + cfg.State = config.Unknown if ok := es.configRepo.Exists(e.owner, e.sinkID); ok { sinkConfig, err := es.configRepo.Get(e.owner, e.sinkID) if err != nil { @@ -180,10 +175,7 @@ func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) err return err } } else { - cfg.State = config.Unknown - cfg.SinkID = e.sinkID - cfg.OwnerID = e.owner - err = es.configRepo.Add(cfg) + err := es.configRepo.Add(cfg) if err != nil { return err } From d3cb6160ddfbd20d0bc6a9788f1e18046591e995 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:09:15 -0300 Subject: [PATCH 15/35] fix(sinker): fix update setting config to null. 
--- maestro/monitor/monitor.go | 2 +- maestro/redis/consumer/hashset.go | 26 ++++------ maestro/service.go | 8 +++- sinker/redis/consumer/events.go | 10 ++-- sinker/redis/consumer/streams.go | 79 +++++++++++++++---------------- 5 files changed, 59 insertions(+), 66 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 5809c7d3b..bc56b7ecc 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -283,7 +283,7 @@ func (svc *monitorService) analyzeLogs(logEntry []string) (status string, err er if strings.Contains(logLine, "400 Bad Request") { errorMessage := "error: remote write returned HTTP status 400 Bad Request" return "warning", errors.New(errorMessage) - } + } // other generic errors if strings.Contains(logLine, "error") { errStringLog := strings.TrimRight(logLine, "error") diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index 506d6818e..e5162edff 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -101,48 +101,38 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis if err != nil { es.logger.Error("could not fetch info for sink", zap.String("sink-id", event.SinkID), zap.Error(err)) } - var cfg types.Metadata - if err := json.Unmarshal(sinkData.Config, &cfg); err != nil { + var metadata types.Metadata + if err := json.Unmarshal(sinkData.Config, &metadata); err != nil { return err } + es.logger.Info("metadata", zap.Any("metadata", metadata)) data := config.SinkData{ SinkID: sinkData.Id, OwnerID: sinkData.OwnerID, Backend: sinkData.Backend, - Config: cfg, + Config: metadata, } _ = data.State.SetFromString(sinkData.State) deploy, err := config.GetDeploymentJson(es.kafkaUrl, data) + es.logger.Info("deploy", zap.Any("deploy", deploy)) if err != nil { es.logger.Error("error trying to get deployment json for sink ID", zap.String("sinkId", event.SinkID), zap.Error(err)) return err } - es.sinkerKeyRedisClient.HSet(ctx, deploymentKey, event.SinkID, deploy) - err = es.kubecontrol.UpdateOtelCollector(ctx, event.Owner, event.SinkID, deploy) - if err != nil { - return err - } - // changing state on updated sink to unknown - sinkData.OwnerID = event.Owner - es.PublishSinkStateChange(sinkData, "unknown", err, err) - data.SinkID = sinkData.Id - data.OwnerID = sinkData.OwnerID - err = data.State.SetFromString("unknown") + err = es.sinkerKeyRedisClient.HSet(ctx, deploymentKey, event.SinkID, deploy).Err() if err != nil { - es.logger.Error("error setting state as unknown", zap.Error(err)) + es.logger.Error("error trying to update deployment json for sink ID", zap.String("sinkId", event.SinkID), zap.Error(err)) return err } - err = es.UpdateSinkStateCache(ctx, data) + err = es.kubecontrol.UpdateOtelCollector(ctx, event.Owner, event.SinkID, deploy) if err != nil { - es.logger.Error("error update sink cache state as unknown", zap.Error(err)) return err } return nil } func (es eventStore) UpdateSinkCache(ctx context.Context, data config.SinkData) (err error) { - data.State = config.Unknown keyPrefix := "sinker_key" skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID) bytes, err := json.Marshal(data) diff --git a/maestro/service.go b/maestro/service.go index bfa09ea08..cb473aa86 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -12,6 +12,7 @@ import ( "context" "encoding/json" "github.com/orb-community/orb/maestro/monitor" + "github.com/orb-community/orb/pkg/types" "strings" "github.com/go-redis/redis/v8" @@ -81,20 +82,23 @@ func 
(svc *maestroService) Start(ctx context.Context, cancelFunction context.Can for _, sinkRes := range sinksRes.Sinks { sinkContext := context.WithValue(loadCtx, "sink-id", sinkRes.Id) - var data maestroconfig.SinkData - if err := json.Unmarshal(sinkRes.Config, &data); err != nil { + var metadata types.Metadata + if err := json.Unmarshal(sinkRes.Config, &metadata); err != nil { svc.logger.Warn("failed to unmarshal sink, skipping", zap.String("sink-id", sinkRes.Id)) continue } if val, _ := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkRes.Id); val != "" { svc.logger.Info("Skipping deploymentEntry because it is already created") } else { + var data maestroconfig.SinkData data.SinkID = sinkRes.Id + data.Config = metadata err := svc.eventStore.CreateDeploymentEntry(sinkContext, data) if err != nil { svc.logger.Warn("failed to create deploymentEntry for sink, skipping", zap.String("sink-id", sinkRes.Id)) continue } + svc.logger.Info("metadata", zap.Any("metadata", metadata)) err = svc.eventStore.UpdateSinkCache(ctx, data) if err != nil { svc.logger.Warn("failed to update cache for sink", zap.String("sink-id", sinkRes.Id)) diff --git a/sinker/redis/consumer/events.go b/sinker/redis/consumer/events.go index ea09af950..9d1639e90 100644 --- a/sinker/redis/consumer/events.go +++ b/sinker/redis/consumer/events.go @@ -14,9 +14,9 @@ import ( "github.com/orb-community/orb/pkg/types" ) -type updateSinkEvent struct { - sinkID string - owner string - config types.Metadata - timestamp time.Time +type UpdateSinkEvent struct { + SinkID string + Owner string + Config types.Metadata + Timestamp time.Time } diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index d8b4be35d..e54ebb4f4 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -105,70 +105,69 @@ func NewEventStore(sinkerService sinker.Service, configRepo config.ConfigRepo, c } } -func decodeSinksCreate(event map[string]interface{}) (updateSinkEvent, error) { - val := updateSinkEvent{ - sinkID: read(event, "sink_id", ""), - owner: read(event, "owner", ""), - config: readMetadata(event, "config"), - timestamp: time.Time{}, +func decodeSinksCreate(event map[string]interface{}) (UpdateSinkEvent, error) { + val := UpdateSinkEvent{ + SinkID: read(event, "sink_id", ""), + Owner: read(event, "owner", ""), + Config: readMetadata(event, "config"), + Timestamp: time.Time{}, } return val, nil } -func decodeSinksUpdate(event map[string]interface{}) (updateSinkEvent, error) { - val := updateSinkEvent{ - sinkID: read(event, "sink_id", ""), - owner: read(event, "owner", ""), - config: readMetadata(event, "config"), - timestamp: time.Time{}, +func decodeSinksUpdate(event map[string]interface{}) (UpdateSinkEvent, error) { + val := UpdateSinkEvent{ + SinkID: read(event, "sink_id", ""), + Owner: read(event, "owner", ""), + Config: readMetadata(event, "config"), + Timestamp: time.Time{}, } return val, nil } -func decodeSinksRemove(event map[string]interface{}) (updateSinkEvent, error) { - val := updateSinkEvent{ - sinkID: read(event, "sink_id", ""), - owner: read(event, "owner", ""), - config: readMetadata(event, "config"), - timestamp: time.Time{}, +func decodeSinksRemove(event map[string]interface{}) (UpdateSinkEvent, error) { + val := UpdateSinkEvent{ + SinkID: read(event, "sink_id", ""), + Owner: read(event, "owner", ""), + Timestamp: time.Time{}, } return val, nil } -func (es eventStore) handleSinksRemove(_ context.Context, e updateSinkEvent) error { - if ok := es.configRepo.Exists(e.owner, e.sinkID); 
ok { - err := es.configRepo.Remove(e.owner, e.sinkID) +func (es eventStore) handleSinksRemove(_ context.Context, e UpdateSinkEvent) error { + if ok := es.configRepo.Exists(e.Owner, e.SinkID); ok { + err := es.configRepo.Remove(e.Owner, e.SinkID) if err != nil { es.logger.Error("error during remove sinker cache entry", zap.Error(err)) return err } } else { es.logger.Error("did not found any sinker cache entry for removal", - zap.String("key", fmt.Sprintf("sinker_key-%s-%s", e.owner, e.sinkID))) + zap.String("key", fmt.Sprintf("sinker_key-%s-%s", e.Owner, e.SinkID))) return errors.New("did not found any sinker cache entry for removal") } return nil } -func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) error { +func (es eventStore) handleSinksUpdate(_ context.Context, e UpdateSinkEvent) error { var cfg config.SinkConfig - cfg.Config = types.FromMap(e.config) - cfg.SinkID = e.sinkID - cfg.OwnerID = e.owner + cfg.Config = types.FromMap(e.Config) + es.logger.Info("metadata", zap.Any("metadata", cfg.Config)) + es.logger.Info("metadata", zap.Any("incoming", e.Config)) + cfg.SinkID = e.SinkID + cfg.OwnerID = e.Owner cfg.State = config.Unknown - if ok := es.configRepo.Exists(e.owner, e.sinkID); ok { - sinkConfig, err := es.configRepo.Get(e.owner, e.sinkID) + if ok := es.configRepo.Exists(e.Owner, e.SinkID); ok { + sinkConfig, err := es.configRepo.Get(e.Owner, e.SinkID) if err != nil { return err } - if sinkConfig.Config == nil { - sinkConfig.Config = cfg.Config - } + sinkConfig.Config = cfg.Config if sinkConfig.OwnerID == "" { - sinkConfig.OwnerID = e.owner + sinkConfig.OwnerID = e.Owner } if sinkConfig.SinkID == "" { - sinkConfig.SinkID = e.sinkID + sinkConfig.SinkID = e.SinkID } err = es.configRepo.Edit(sinkConfig) if err != nil { @@ -183,11 +182,11 @@ func (es eventStore) handleSinksUpdate(_ context.Context, e updateSinkEvent) err return nil } -func (es eventStore) handleSinksCreate(_ context.Context, e updateSinkEvent) error { +func (es eventStore) handleSinksCreate(_ context.Context, e UpdateSinkEvent) error { var cfg config.SinkConfig - cfg.Config = types.FromMap(e.config) - cfg.SinkID = e.sinkID - cfg.OwnerID = e.owner + cfg.Config = types.FromMap(e.Config) + cfg.SinkID = e.SinkID + cfg.OwnerID = e.Owner cfg.State = config.Unknown err := es.configRepo.Add(cfg) if err != nil { @@ -205,10 +204,10 @@ func read(event map[string]interface{}, key, def string) string { return val } -func readMetadata(event map[string]interface{}, key string) types.Metadata { - val, ok := event[key].(types.Metadata) +func readMetadata(event map[string]interface{}, key string) map[string]interface{} { + val, ok := event[key].(map[string]interface{}) if !ok { - return types.Metadata{} + return map[string]interface{}{} } return val From 1e48f69c2606d0df200cc22c01cad849a4693a42 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:12:58 -0300 Subject: [PATCH 16/35] fix(sinker): fix timestamps. 
--- sinker/redis/consumer/streams.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index e54ebb4f4..511b3bbc9 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -110,7 +110,7 @@ func decodeSinksCreate(event map[string]interface{}) (UpdateSinkEvent, error) { SinkID: read(event, "sink_id", ""), Owner: read(event, "owner", ""), Config: readMetadata(event, "config"), - Timestamp: time.Time{}, + Timestamp: time.Now(), } return val, nil } @@ -120,7 +120,7 @@ func decodeSinksUpdate(event map[string]interface{}) (UpdateSinkEvent, error) { SinkID: read(event, "sink_id", ""), Owner: read(event, "owner", ""), Config: readMetadata(event, "config"), - Timestamp: time.Time{}, + Timestamp: time.Now(), } return val, nil } @@ -129,7 +129,7 @@ func decodeSinksRemove(event map[string]interface{}) (UpdateSinkEvent, error) { val := UpdateSinkEvent{ SinkID: read(event, "sink_id", ""), Owner: read(event, "owner", ""), - Timestamp: time.Time{}, + Timestamp: time.Now(), } return val, nil } From 698a5174e390df9911e188f10d8ae17132a27c01 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:28:51 -0300 Subject: [PATCH 17/35] fix(sinker): fix parsing. --- sinker/redis/consumer/streams.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index 511b3bbc9..571058867 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "encoding/json" "fmt" "github.com/orb-community/orb/pkg/errors" "time" @@ -112,6 +113,11 @@ func decodeSinksCreate(event map[string]interface{}) (UpdateSinkEvent, error) { Config: readMetadata(event, "config"), Timestamp: time.Now(), } + var metadata types.Metadata + if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil { + return UpdateSinkEvent{}, err + } + val.Config = metadata return val, nil } @@ -122,6 +128,11 @@ func decodeSinksUpdate(event map[string]interface{}) (UpdateSinkEvent, error) { Config: readMetadata(event, "config"), Timestamp: time.Now(), } + var metadata types.Metadata + if err := json.Unmarshal([]byte(read(event, "config", "")), &metadata); err != nil { + return UpdateSinkEvent{}, err + } + val.Config = metadata return val, nil } @@ -204,10 +215,10 @@ func read(event map[string]interface{}, key, def string) string { return val } -func readMetadata(event map[string]interface{}, key string) map[string]interface{} { - val, ok := event[key].(map[string]interface{}) +func readMetadata(event map[string]interface{}, key string) types.Metadata { + val, ok := event[key].(types.Metadata) if !ok { - return map[string]interface{}{} + return types.Metadata{} } return val From f00ad6d7fb46b914e000f3d15bc11297fe0aa8dd Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:29:54 -0300 Subject: [PATCH 18/35] fix: remove debug logs. 
--- maestro/redis/consumer/hashset.go | 1 - maestro/service.go | 1 - sinker/redis/consumer/streams.go | 2 -- 3 files changed, 4 deletions(-) diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index e5162edff..2770f3f32 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -105,7 +105,6 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis if err := json.Unmarshal(sinkData.Config, &metadata); err != nil { return err } - es.logger.Info("metadata", zap.Any("metadata", metadata)) data := config.SinkData{ SinkID: sinkData.Id, OwnerID: sinkData.OwnerID, diff --git a/maestro/service.go b/maestro/service.go index cb473aa86..6a0b94a64 100644 --- a/maestro/service.go +++ b/maestro/service.go @@ -98,7 +98,6 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can svc.logger.Warn("failed to create deploymentEntry for sink, skipping", zap.String("sink-id", sinkRes.Id)) continue } - svc.logger.Info("metadata", zap.Any("metadata", metadata)) err = svc.eventStore.UpdateSinkCache(ctx, data) if err != nil { svc.logger.Warn("failed to update cache for sink", zap.String("sink-id", sinkRes.Id)) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index 571058867..f5338ae46 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -163,8 +163,6 @@ func (es eventStore) handleSinksRemove(_ context.Context, e UpdateSinkEvent) err func (es eventStore) handleSinksUpdate(_ context.Context, e UpdateSinkEvent) error { var cfg config.SinkConfig cfg.Config = types.FromMap(e.Config) - es.logger.Info("metadata", zap.Any("metadata", cfg.Config)) - es.logger.Info("metadata", zap.Any("incoming", e.Config)) cfg.SinkID = e.SinkID cfg.OwnerID = e.Owner cfg.State = config.Unknown From b8290aec26ce297f7551399c29f5dfb0bf474259 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:53:07 -0300 Subject: [PATCH 19/35] fix: remove debug logs. --- sinker/redis/consumer/streams.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index f5338ae46..df0139953 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -125,7 +125,6 @@ func decodeSinksUpdate(event map[string]interface{}) (UpdateSinkEvent, error) { val := UpdateSinkEvent{ SinkID: read(event, "sink_id", ""), Owner: read(event, "owner", ""), - Config: readMetadata(event, "config"), Timestamp: time.Now(), } var metadata types.Metadata From 3920b9597a04250bc10e5e35d07165f540aac401 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Fri, 14 Jul 2023 19:58:42 -0300 Subject: [PATCH 20/35] fix(maestro): remove debug logs. 
--- maestro/redis/consumer/hashset.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index 2770f3f32..d0f250b3c 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -114,7 +114,7 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis _ = data.State.SetFromString(sinkData.State) deploy, err := config.GetDeploymentJson(es.kafkaUrl, data) - es.logger.Info("deploy", zap.Any("deploy", deploy)) + if err != nil { es.logger.Error("error trying to get deployment json for sink ID", zap.String("sinkId", event.SinkID), zap.Error(err)) return err From c3b5506e6113fdeecc1bfc6a0e8035a5f30c2ebb Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 17:21:17 -0300 Subject: [PATCH 21/35] fix: add timeout and proper handling of when sink has no activity. --- maestro/config/config_builder.go | 3 +++ maestro/monitor/monitor.go | 3 +-- maestro/redis/consumer/hashset.go | 4 ++++ sinker/redis/sinker.go | 3 ++- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/maestro/config/config_builder.go b/maestro/config/config_builder.go index 5f105b8db..7ec609f19 100644 --- a/maestro/config/config_builder.go +++ b/maestro/config/config_builder.go @@ -376,6 +376,9 @@ func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig string, sink Sin return "", errors.New("invalid authentication type") } exporterBuilder := FromStrategy(sink.Backend) + if exporterBuilder == nil { + return "", errors.New("invalid backend") + } extensions, extensionName := authBuilder.GetExtensionsFromMetadata(sink.Config) exporters, exporterName := exporterBuilder.GetExportersFromMetadata(sink.Config, extensionName) if exporterName == "" { diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index bc56b7ecc..38d70d38b 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -210,8 +210,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { // here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle var idleLimit int64 = 0 if activityErr != nil || lastActivity == 0 { - svc.logger.Error("error on getting last collector activity", zap.Error(activityErr)) - continue + idleLimit = time.Now().Unix() - 900 // forces to idle if no activity in 15 minutes } else { idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes } diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index d0f250b3c..d7d33d0ee 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -48,6 +48,10 @@ func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis if err != nil { return err } + err = es.RemoveSinkActivity(ctx, event.SinkID) + if err != nil { + return err + } return nil } diff --git a/sinker/redis/sinker.go b/sinker/redis/sinker.go index 97c3a4d39..d180f61e4 100644 --- a/sinker/redis/sinker.go +++ b/sinker/redis/sinker.go @@ -111,9 +111,10 @@ func (s *sinkerCache) AddActivity(ownerID string, sinkID string) error { if ownerID == "" || sinkID == "" { return errors.New("invalid parameters") } + defaultExpiration := time.Duration(10) * time.Minute skey := fmt.Sprintf("%s:%s", activityPrefix, sinkID) lastActivity := strconv.FormatInt(time.Now().Unix(), 10) - if err := s.client.Set(context.Background(), skey, lastActivity, 0).Err(); err != nil { + if err := s.client.Set(context.Background(), skey, lastActivity, 
defaultExpiration).Err(); err != nil { return err } s.logger.Info("added activity for owner and sink ids", zap.String("owner", ownerID), zap.String("sinkID", sinkID)) From 96bb4aea5dccdf68be5d480e1a5bd4b38dbbd8c0 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 17:41:35 -0300 Subject: [PATCH 22/35] fix: remove redis entries before removing collector. --- maestro/redis/consumer/hashset.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/maestro/redis/consumer/hashset.go b/maestro/redis/consumer/hashset.go index d7d33d0ee..ba09caa0b 100644 --- a/maestro/redis/consumer/hashset.go +++ b/maestro/redis/consumer/hashset.go @@ -35,20 +35,20 @@ func (es eventStore) GetDeploymentEntryFromSinkId(ctx context.Context, sinkId st // handleSinksDeleteCollector will delete Deployment Entry and force delete otel collector func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis.SinksUpdateEvent) error { es.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner)) - deploymentEntry, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) + err := es.sinkerKeyRedisClient.HDel(ctx, deploymentKey, event.SinkID).Err() if err != nil { - es.logger.Error("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) return err } - err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deploymentEntry) + err = es.RemoveSinkActivity(ctx, event.SinkID) if err != nil { return err } - err = es.sinkerKeyRedisClient.HDel(ctx, deploymentKey, event.SinkID).Err() + deploymentEntry, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID) if err != nil { + es.logger.Error("did not find collector entry for sink", zap.String("sink-id", event.SinkID)) return err } - err = es.RemoveSinkActivity(ctx, event.SinkID) + err = es.kubecontrol.DeleteOtelCollector(ctx, event.Owner, event.SinkID, deploymentEntry) if err != nil { return err } From 8462cbb6d36d7964d27df62e1110dda66dc98b49 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 18:15:52 -0300 Subject: [PATCH 23/35] fix(maestro): add kill method for stale collectors without sink-id, happening when collector stays up even without a sink, wasting resources. 
--- maestro/kubecontrol/kubecontrol.go | 23 ++++++++++++++++++++ maestro/monitor/monitor.go | 35 +++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go index ffea58af8..4fdf4f7c2 100644 --- a/maestro/kubecontrol/kubecontrol.go +++ b/maestro/kubecontrol/kubecontrol.go @@ -49,6 +49,9 @@ type Service interface { // UpdateOtelCollector - update an existing collector by id UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error + + // KillOtelCollector - kill an existing collector by id, terminating by the ownerID, sinkID without the file + KillOtelCollector(ctx context.Context, ownerID, sinkID string) error } func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error { @@ -157,3 +160,23 @@ func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sink } return nil } + +func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName string, sinkId string) error { + stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) { + for out.Scan() { + svc.logger.Info("Deploy Info: " + out.Text()) + } + for err.Scan() { + svc.logger.Info("Deploy Error: " + err.Text()) + } + } + + // execute action + cmd := exec.Command("kubectl", "delete", deploymentName, "-n", namespace) + _, _, err := execCmd(ctx, cmd, svc.logger, stdOutListenFunction) + if err == nil { + svc.logger.Info(fmt.Sprintf("successfully killed the otel-collector for sink-id: %s", sinkId)) + } + + return nil +} diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 38d70d38b..9de2a49f9 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -140,7 +140,32 @@ func (svc *monitorService) getRunningPods(ctx context.Context) ([]k8scorev1.Pod, return pods.Items, err } +func (svc *monitorService) getDeploymentName(ctx context.Context, sinkID string) (string, error) { + config, err := rest.InClusterConfig() + if err != nil { + svc.logger.Error("error on get cluster config", zap.Error(err)) + return "", err + } + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + svc.logger.Error("error on get client", zap.Error(err)) + return "", err + } + deployments, err := clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{}) + if err != nil { + svc.logger.Error("error on get deployments", zap.Error(err)) + return "", err + } + for _, deployment := range deployments.Items { + if strings.Contains(deployment.Name, sinkID) { + return deployment.Name, nil + } + } + return "", errors.New("not found deployment for this sink-id") +} + func (svc *monitorService) monitorSinks(ctx context.Context) { + runningCollectors, err := svc.getRunningPods(ctx) if err != nil { svc.logger.Error("error getting running pods on namespace", zap.Error(err)) @@ -160,7 +185,6 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { var sink *sinkspb.SinkRes for _, sinkRes := range sinksRes.Sinks { if strings.Contains(collector.Name, sinkRes.Id) { - svc.logger.Warn("collector found for sink", zap.String("collector name", collector.Name), zap.String("sink", sinkRes.Id)) sink = sinkRes break } @@ -171,6 +195,15 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) + 
deploymentName, err := svc.getDeploymentName(ctx, sinkId) + if err != nil { + svc.logger.Error("error getting deployment name", zap.Error(err)) + continue + } + err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId) + if err != nil { + svc.logger.Error("error removing otel collector, manual intervention required", zap.Error(err)) + } continue } err = svc.kubecontrol.DeleteOtelCollector(ctx, "", sinkId, deploymentEntry) From 0f266ac81cca621d5cf5e2496453a9dc88999d56 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 18:33:17 -0300 Subject: [PATCH 24/35] fix(maestro): remove unused method. --- maestro/monitor/monitor.go | 26 +------------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 9de2a49f9..f464ca949 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -140,30 +140,6 @@ func (svc *monitorService) getRunningPods(ctx context.Context) ([]k8scorev1.Pod, return pods.Items, err } -func (svc *monitorService) getDeploymentName(ctx context.Context, sinkID string) (string, error) { - config, err := rest.InClusterConfig() - if err != nil { - svc.logger.Error("error on get cluster config", zap.Error(err)) - return "", err - } - clientSet, err := kubernetes.NewForConfig(config) - if err != nil { - svc.logger.Error("error on get client", zap.Error(err)) - return "", err - } - deployments, err := clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{}) - if err != nil { - svc.logger.Error("error on get deployments", zap.Error(err)) - return "", err - } - for _, deployment := range deployments.Items { - if strings.Contains(deployment.Name, sinkID) { - return deployment.Name, nil - } - } - return "", errors.New("not found deployment for this sink-id") -} - func (svc *monitorService) monitorSinks(ctx context.Context) { runningCollectors, err := svc.getRunningPods(ctx) @@ -195,7 +171,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) - deploymentName, err := svc.getDeploymentName(ctx, sinkId) + deploymentName := "otel-" + sinkId if err != nil { svc.logger.Error("error getting deployment name", zap.Error(err)) continue From c715a90f4f4d11d275570ec2c6e55c9f63319979 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 18:50:11 -0300 Subject: [PATCH 25/35] fix(maestro): fix monitor not getting the correct sink-id. 
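Collector pods inherit the deployment's otel-<sink-uuid> prefix, and a UUID is 36 characters long, so the sink id sits at name[5:41] of the collector name, not name[5:51]. A minimal sketch of the extraction (sinkIDFromCollectorName is illustrative only and not part of this change):

    package main

    import (
        "fmt"
        "strings"
    )

    // sinkIDFromCollectorName returns the 36-character sink UUID that follows
    // the "otel-" prefix in a collector pod or deployment name.
    func sinkIDFromCollectorName(name string) (string, error) {
        const prefix = "otel-" // deployments/pods are named otel-<sink-id>...
        const uuidLen = 36
        if !strings.HasPrefix(name, prefix) || len(name) < len(prefix)+uuidLen {
            return "", fmt.Errorf("unexpected collector name %q", name)
        }
        return name[len(prefix) : len(prefix)+uuidLen], nil // i.e. name[5:41]
    }

    func main() {
        id, _ := sinkIDFromCollectorName("otel-123e4567-e89b-12d3-a456-426614174000-55b4f")
        fmt.Println(id) // 123e4567-e89b-12d3-a456-426614174000
    }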
--- maestro/monitor/monitor.go | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index f464ca949..bfa6e2581 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -140,6 +140,32 @@ func (svc *monitorService) getRunningPods(ctx context.Context) ([]k8scorev1.Pod, return pods.Items, err } +func (svc *monitorService) getDeploymentName(ctx context.Context, sinkID string) (string, error) { + config, err := rest.InClusterConfig() + if err != nil { + svc.logger.Error("error on get cluster config", zap.Error(err)) + return "", err + } + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + svc.logger.Error("error on get client", zap.Error(err)) + return "", err + } + deployments, err := clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{}) + if err != nil { + svc.logger.Error("error on get deployments", zap.Error(err)) + return "", err + } + svc.logger.Info("sinkID", zap.String("sinkID", sinkID)) + for _, deployment := range deployments.Items { + svc.logger.Info("deployment name", zap.String("name", deployment.Name)) + if deployment.Name == "otel-"+sinkID { + return deployment.Name, nil + } + } + return "", errors.New("not found deployment for this sink-id") +} + func (svc *monitorService) monitorSinks(ctx context.Context) { runningCollectors, err := svc.getRunningPods(ctx) @@ -167,11 +193,11 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { } if sink == nil { svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name)) - sinkId := collector.Name[5:51] + sinkId := collector.Name[5:41] deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) - deploymentName := "otel-" + sinkId + deploymentName, err := svc.getDeploymentName(ctx, sinkId) if err != nil { svc.logger.Error("error getting deployment name", zap.Error(err)) continue From 4251a7ebceb98700de891c82596e55e171794e0f Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 17 Jul 2023 18:51:46 -0300 Subject: [PATCH 26/35] fix(maestro): remove unused method and use simpler form to get deployment name. 
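Because deployments are always created as otel-<sink-id>, the name can be derived by plain concatenation; listing deployments through the API server just to rediscover a name the service can compute locally was an unnecessary round trip.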
--- maestro/monitor/monitor.go | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index bfa6e2581..07ee17af3 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -140,32 +140,6 @@ func (svc *monitorService) getRunningPods(ctx context.Context) ([]k8scorev1.Pod, return pods.Items, err } -func (svc *monitorService) getDeploymentName(ctx context.Context, sinkID string) (string, error) { - config, err := rest.InClusterConfig() - if err != nil { - svc.logger.Error("error on get cluster config", zap.Error(err)) - return "", err - } - clientSet, err := kubernetes.NewForConfig(config) - if err != nil { - svc.logger.Error("error on get client", zap.Error(err)) - return "", err - } - deployments, err := clientSet.AppsV1().Deployments(namespace).List(ctx, k8smetav1.ListOptions{}) - if err != nil { - svc.logger.Error("error on get deployments", zap.Error(err)) - return "", err - } - svc.logger.Info("sinkID", zap.String("sinkID", sinkID)) - for _, deployment := range deployments.Items { - svc.logger.Info("deployment name", zap.String("name", deployment.Name)) - if deployment.Name == "otel-"+sinkID { - return deployment.Name, nil - } - } - return "", errors.New("not found deployment for this sink-id") -} - func (svc *monitorService) monitorSinks(ctx context.Context) { runningCollectors, err := svc.getRunningPods(ctx) @@ -197,7 +171,7 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId) if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) - deploymentName, err := svc.getDeploymentName(ctx, sinkId) + deploymentName := "otel-" + sinkId if err != nil { svc.logger.Error("error getting deployment name", zap.Error(err)) continue From 8d28c2be56cb8797a8f3925faface9a53c329212 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 18 Jul 2023 08:54:39 -0300 Subject: [PATCH 27/35] fix(sinker): fixed typos. --- sinker/redis/consumer/streams.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sinker/redis/consumer/streams.go b/sinker/redis/consumer/streams.go index df0139953..2faae6d84 100644 --- a/sinker/redis/consumer/streams.go +++ b/sinker/redis/consumer/streams.go @@ -152,9 +152,9 @@ func (es eventStore) handleSinksRemove(_ context.Context, e UpdateSinkEvent) err return err } } else { - es.logger.Error("did not found any sinker cache entry for removal", + es.logger.Error("did not find any sinker cache entry for removal", zap.String("key", fmt.Sprintf("sinker_key-%s-%s", e.Owner, e.SinkID))) - return errors.New("did not found any sinker cache entry for removal") + return errors.New("did not find any sinker cache entry for removal") } return nil } From 962924e2d50df102626a255898acdf6303810622 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 18 Jul 2023 09:39:10 -0300 Subject: [PATCH 28/35] fix(maestro): fix deployment name. 
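After switching to the concatenated deployment name, the err checked here is still the non-nil error returned by GetDeploymentEntryFromSinkId, so the leftover check logged and hit continue on every pass and KillOtelCollector was never reached; removing it lets the kill fallback actually run.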
--- maestro/monitor/monitor.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 07ee17af3..a55c87297 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -172,10 +172,6 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { if err != nil { svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId)) deploymentName := "otel-" + sinkId - if err != nil { - svc.logger.Error("error getting deployment name", zap.Error(err)) - continue - } err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId) if err != nil { svc.logger.Error("error removing otel collector, manual intervention required", zap.Error(err)) From 53b90c90b11b78e288d73b5384ebb406bc419d41 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 18 Jul 2023 09:53:24 -0300 Subject: [PATCH 29/35] fix(maestro): fix deployment name. --- maestro/kubecontrol/kubecontrol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go index 4fdf4f7c2..3fd6ce06e 100644 --- a/maestro/kubecontrol/kubecontrol.go +++ b/maestro/kubecontrol/kubecontrol.go @@ -172,7 +172,7 @@ func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName } // execute action - cmd := exec.Command("kubectl", "delete", deploymentName, "-n", namespace) + cmd := exec.Command("kubectl", "delete deployment", deploymentName, "-n", namespace) _, _, err := execCmd(ctx, cmd, svc.logger, stdOutListenFunction) if err == nil { svc.logger.Info(fmt.Sprintf("successfully killed the otel-collector for sink-id: %s", sinkId)) From f8ed6e421036f689692049b3e617f2e6c4237b42 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 18 Jul 2023 10:46:15 -0300 Subject: [PATCH 30/35] fix(maestro): fix delete deployment command. --- maestro/kubecontrol/kubecontrol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/kubecontrol/kubecontrol.go b/maestro/kubecontrol/kubecontrol.go index 3fd6ce06e..edf83ab7c 100644 --- a/maestro/kubecontrol/kubecontrol.go +++ b/maestro/kubecontrol/kubecontrol.go @@ -172,7 +172,7 @@ func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName } // execute action - cmd := exec.Command("kubectl", "delete deployment", deploymentName, "-n", namespace) + cmd := exec.Command("kubectl", "delete", "deploy", deploymentName, "-n", namespace) _, _, err := execCmd(ctx, cmd, svc.logger, stdOutListenFunction) if err == nil { svc.logger.Info(fmt.Sprintf("successfully killed the otel-collector for sink-id: %s", sinkId)) From 08bb49b5b4354f15baffd7491d0b5d2b0cf6781a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 18 Jul 2023 14:35:37 -0300 Subject: [PATCH 31/35] fix(maestro): make monitor just look the last log line, instead of last 10. 
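Only the most recent log line is used by the status analysis, so TailLines drops from 10 to 1 while SinceSeconds stays at 300. A minimal sketch of how these options feed client-go, assuming the clientSet, namespace, and pod wiring already present in getPodLogs:

    maxTailLines := int64(1)   // only the newest line is analyzed
    sinceSeconds := int64(300) // ignore anything older than five minutes
    podLogOpts := k8scorev1.PodLogOptions{TailLines: &maxTailLines, SinceSeconds: &sinceSeconds}
    stream, err := clientSet.CoreV1().Pods(namespace).GetLogs(pod.Name, &podLogOpts).Stream(ctx)
    if err != nil {
        return nil, err
    }
    defer stream.Close() // the stream is then buffered and split on "\n" as before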
--- maestro/monitor/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index a55c87297..237d6bfed 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -71,7 +71,7 @@ func (svc *monitorService) Start(ctx context.Context, cancelFunc context.CancelF } func (svc *monitorService) getPodLogs(ctx context.Context, pod k8scorev1.Pod) ([]string, error) { - maxTailLines := int64(10) + maxTailLines := int64(1) sinceSeconds := int64(300) podLogOpts := k8scorev1.PodLogOptions{TailLines: &maxTailLines, SinceSeconds: &sinceSeconds} config, err := rest.InClusterConfig() From 50141d636f447770a066204164b38c4d6869d225 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 19 Jul 2023 15:36:27 -0300 Subject: [PATCH 32/35] fix(maestro): remove restriction on monitor to check collectors with error status. --- maestro/monitor/monitor.go | 70 +++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 237d6bfed..717d114fc 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -191,51 +191,45 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { } data.SinkID = sink.Id data.OwnerID = sink.OwnerID - // only analyze logs if current status is "active" or "warning" var logsErr error var status string - if sink.GetState() == "active" || sink.GetState() == "warning" { - logs, err := svc.getPodLogs(ctx, collector) - if err != nil { - svc.logger.Error("error on getting logs, skipping", zap.Error(err)) - continue - } - status, logsErr = svc.analyzeLogs(logs) - if status == "fail" { - svc.logger.Error("error during analyze logs", zap.Error(logsErr)) - continue - } + logs, err := svc.getPodLogs(ctx, collector) + if err != nil { + svc.logger.Error("error on getting logs, skipping", zap.Error(err)) + continue + } + status, logsErr = svc.analyzeLogs(logs) + if status == "fail" { + svc.logger.Error("error during analyze logs", zap.Error(logsErr)) + continue } var lastActivity int64 var activityErr error - // if log analysis return active or warning we should check if have activity on collector - if status == "active" || status == "warning" { - lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id) - // if logs reported 'active' status - // here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle - var idleLimit int64 = 0 - if activityErr != nil || lastActivity == 0 { - idleLimit = time.Now().Unix() - 900 // forces to idle if no activity in 15 minutes - } else { - idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes - } - if idleLimit >= lastActivity { - //changing state on sinks - svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err) - //changing state on redis sinker - data.State.SetFromString("idle") - svc.eventStore.UpdateSinkStateCache(ctx, data) - deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id) - if errDeploy != nil { - svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr)) - continue - } - err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deploymentEntry) - if err != nil { - svc.logger.Error("error removing otel collector", zap.Error(err)) - } + lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id) + // if logs reported 'active' status + // here we should check if LastActivity is up-to-date, 
otherwise we need to set sink as idle + var idleLimit int64 = 0 + if activityErr != nil || lastActivity == 0 { + idleLimit = time.Now().Unix() - 900 // forces to idle if no activity in 15 minutes + } else { + idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes + } + if idleLimit >= lastActivity { + //changing state on sinks + svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err) + //changing state on redis sinker + data.State.SetFromString("idle") + svc.eventStore.UpdateSinkStateCache(ctx, data) + deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id) + if errDeploy != nil { + svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr)) continue } + err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deploymentEntry) + if err != nil { + svc.logger.Error("error removing otel collector", zap.Error(err)) + } + continue } //set the new sink status if changed during checks if sink.GetState() != status && status != "" { From fab038ef0276dbbdbdd280f22439849ca47ec513 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 19 Jul 2023 16:36:17 -0300 Subject: [PATCH 33/35] fix(maestro): simplify check for activity to move sink to idle. --- maestro/monitor/monitor.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index 717d114fc..e0092e882 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -203,17 +203,10 @@ func (svc *monitorService) monitorSinks(ctx context.Context) { svc.logger.Error("error during analyze logs", zap.Error(logsErr)) continue } - var lastActivity int64 - var activityErr error - lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id) + lastActivity, activityErr := svc.eventStore.GetActivity(sink.Id) // if logs reported 'active' status // here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle - var idleLimit int64 = 0 - if activityErr != nil || lastActivity == 0 { - idleLimit = time.Now().Unix() - 900 // forces to idle if no activity in 15 minutes - } else { - idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes - } + idleLimit := time.Now().Unix() - idleTimeSeconds // within 10 minutes if idleLimit >= lastActivity { //changing state on sinks svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err) From db2c678b803726abdc29e3bb2f65633246d84501 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 19 Jul 2023 16:49:38 -0300 Subject: [PATCH 34/35] fix(sinker): simplify update activity. 
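The sink's activity timestamp is now refreshed on every notification that does not change the cached state, instead of only when the state is Active or Warning, and the Error-only message assignment is dropped. This is the same timestamp maestro's monitor compares against idleTimeSeconds, so a sink that keeps receiving writes stays out of the idle path regardless of the state it was cached with.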
--- sinker/otel/bridgeservice/bridge.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sinker/otel/bridgeservice/bridge.go b/sinker/otel/bridgeservice/bridge.go index 4bb14c6f7..45753d7a3 100644 --- a/sinker/otel/bridgeservice/bridge.go +++ b/sinker/otel/bridgeservice/bridge.go @@ -97,15 +97,13 @@ func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwner } bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State)) } - } else if cfgRepo.State == config.Active || cfgRepo.State == config.Warning { + } else { err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId) if err != nil { bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err)) return err } bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State)) - } else if cfgRepo.State == config.Error { - cfgRepo.Msg = message } return nil From cfda910b0fc7b9c603c7b999a5bc453a1ab6df22 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 19 Jul 2023 17:04:28 -0300 Subject: [PATCH 35/35] fix(maestro): remove log that is not useful. --- maestro/monitor/monitor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/maestro/monitor/monitor.go b/maestro/monitor/monitor.go index e0092e882..c9fa10ccc 100644 --- a/maestro/monitor/monitor.go +++ b/maestro/monitor/monitor.go @@ -105,7 +105,6 @@ func (svc *monitorService) getPodLogs(ctx context.Context, pod k8scorev1.Pod) ([ } str := buf.String() splitLogs := strings.Split(str, "\n") - svc.logger.Info("logs length", zap.Int("amount line logs", len(splitLogs))) return splitLogs, nil }