Skip to content

Commit

Permalink
Use clowder and kafka broker configuration from ioutils v1.25.2
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 16, 2024
1 parent d67cb62 commit 2e0b7b2
Show file tree
Hide file tree
Showing 20 changed files with 270 additions and 418 deletions.
37 changes: 19 additions & 18 deletions cmd/ccx-notification-service/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ func showAuthors() {

// showConfiguration function displays actual configuration.
func showConfiguration(config *conf.ConfigStruct) {
brokerConfig := conf.GetKafkaBrokerConfiguration(config)
brokerCfg := conf.GetKafkaBrokerConfiguration(config)
notifEventsCfg := conf.GetNotifEventsConfiguration(config)
log.Info().
Bool("Enabled", brokerConfig.Enabled).
Str("Address", brokerConfig.Address).
Str("SecurityProtocol", brokerConfig.SecurityProtocol).
Str("SaslMechanism", brokerConfig.SaslMechanism).
Str("Topic", brokerConfig.Topic).
Str("Timeout", brokerConfig.Timeout.String()).
Int("Likelihood threshold", brokerConfig.LikelihoodThreshold).
Int("Impact threshold", brokerConfig.ImpactThreshold).
Int("Severity threshold", brokerConfig.SeverityThreshold).
Int("Total risk threshold", brokerConfig.TotalRiskThreshold).
Str("CoolDown", brokerConfig.Cooldown).
Str("Event filter", brokerConfig.EventFilter).
Bool("Filter by tags", brokerConfig.TagFilterEnabled).
Strs("List of tags", brokerConfig.Tags).
Bool("Enabled", brokerCfg.Enabled).
Strs("Addresses", brokerCfg.Addresses).
Str("SecurityProtocol", brokerCfg.SecurityProtocol).
Str("SaslMechanism", brokerCfg.SaslMechanism).
Str("Topic", brokerCfg.Topic).
Str("Timeout", brokerCfg.Timeout.String()).
Int("Likelihood threshold", notifEventsCfg.LikelihoodThreshold).
Int("Impact threshold", notifEventsCfg.ImpactThreshold).
Int("Severity threshold", notifEventsCfg.SeverityThreshold).
Int("Total risk threshold", notifEventsCfg.TotalRiskThreshold).
Str("CoolDown", notifEventsCfg.Cooldown).
Str("Event filter", notifEventsCfg.EventFilter).
Bool("Filter by tags", notifEventsCfg.TagFilterEnabled).
Strs("List of tags", notifEventsCfg.Tags).
Msg("Broker configuration")

serviceLogConfig := conf.GetServiceLogConfiguration(config)
Expand All @@ -78,9 +79,9 @@ func showConfiguration(config *conf.ConfigStruct) {
Str("ClientID", serviceLogConfig.ClientID).
Str("Created by", serviceLogConfig.CreatedBy).
Str("Username", serviceLogConfig.Username).
Int("Likelihood threshold", brokerConfig.LikelihoodThreshold).
Int("Impact threshold", brokerConfig.ImpactThreshold).
Int("Severity threshold", brokerConfig.SeverityThreshold).
Int("Likelihood threshold", serviceLogConfig.LikelihoodThreshold).
Int("Impact threshold", serviceLogConfig.ImpactThreshold).
Int("Severity threshold", serviceLogConfig.SeverityThreshold).
Int("Total risk threshold", serviceLogConfig.TotalRiskThreshold).
Str("CoolDown", serviceLogConfig.Cooldown).
Str("Event filter", serviceLogConfig.EventFilter).
Expand Down
111 changes: 36 additions & 75 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ limitations under the License.
// configuration from provided configuration file and/or from environment
// variables. Additionally several specific functions named
// GetStorageConfiguration, GetLoggingConfiguration,
// GetKafkaBrokerConfiguration, GetNotificationsConfiguration and
// GetKafkaConfiguration, GetNotificationsConfiguration and
// GetMetricsConfiguration are to be used to return specific configuration
// options.
package conf
Expand Down Expand Up @@ -56,6 +56,8 @@ package conf
import (
"bytes"
"fmt"
clowderutils "github.com/RedHatInsights/insights-operator-utils/clowder"

Check failure on line 59 in conf/config.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"github.com/RedHatInsights/insights-operator-utils/kafka"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -93,19 +95,20 @@ const (
// ConfigStruct is a structure holding the whole notification service
// configuration
type ConfigStruct struct {
LoggingConf logger.LoggingConfiguration `mapstructure:"logging" toml:"logging"`
CloudWatchConf logger.CloudWatchConfiguration `mapstructure:"cloudwatch" toml:"cloudwatch"`
SentryLoggingConf logger.SentryLoggingConfiguration `mapstructure:"sentry" toml:"sentry"`
KafkaZerologConf logger.KafkaZerologConfiguration `mapstructure:"kafka_zerolog" toml:"kafka_zerolog"`
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
Kafka KafkaConfiguration `mapstructure:"kafka_broker" toml:"kafka_broker"`
ServiceLog ServiceLogConfiguration `mapstructure:"service_log" toml:"service_log"`
Dependencies DependenciesConfiguration `mapstructure:"dependencies" toml:"dependencies"`
Notifications NotificationsConfiguration `mapstructure:"notifications" toml:"notifications"`
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
Cleaner CleanerConfiguration `mapstructure:"cleaner" toml:"cleaner"`
Processing ProcessingConfiguration `mapstructure:"processing" toml:"processing"`
DeleteOperation bool
LoggingConf logger.LoggingConfiguration `mapstructure:"logging" toml:"logging"`
CloudWatchConf logger.CloudWatchConfiguration `mapstructure:"cloudwatch" toml:"cloudwatch"`
SentryLoggingConf logger.SentryLoggingConfiguration `mapstructure:"sentry" toml:"sentry"`
KafkaZerologConf logger.KafkaZerologConfiguration `mapstructure:"kafka_zerolog" toml:"kafka_zerolog"`
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
Kafka kafka.BrokerConfiguration `mapstructure:"kafka_broker" toml:"kafka_broker"`
NotificationEvents NotificationEventsConfiguration `mapstructure:"notification_events" toml:"notification_events"`
ServiceLog ServiceLogConfiguration `mapstructure:"service_log" toml:"service_log"`
Dependencies DependenciesConfiguration `mapstructure:"dependencies" toml:"dependencies"`
Notifications NotificationsConfiguration `mapstructure:"notifications" toml:"notifications"`
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
Cleaner CleanerConfiguration `mapstructure:"cleaner" toml:"cleaner"`
Processing ProcessingConfiguration `mapstructure:"processing" toml:"processing"`
DeleteOperation bool
}

// LoggingConfiguration represents configuration for logging in general
Expand Down Expand Up @@ -151,25 +154,16 @@ type CleanerConfiguration struct {
MaxAge string `mapstructure:"max_age" toml:"max_age"`
}

// KafkaConfiguration represents configuration of Kafka brokers and topics
type KafkaConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
Address string `mapstructure:"address" toml:"address"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
LikelihoodThreshold int `mapstructure:"likelihood_threshold" toml:"likelihood_threshold"`
ImpactThreshold int `mapstructure:"impact_threshold" toml:"impact_threshold"`
SeverityThreshold int `mapstructure:"severity_threshold" toml:"severity_threshold"`
TotalRiskThreshold int `mapstructure:"total_risk_threshold" toml:"total_risk_threshold"`
Cooldown string `mapstructure:"cooldown" toml:"cooldown"`
EventFilter string `mapstructure:"event_filter" toml:"event_filter"`
TagFilterEnabled bool `mapstructure:"tag_filter_enabled" toml:"tag_filter_enabled"`
Tags []string `mapstructure:"tags" toml:"tags"`
// NotificationEventsConfiguration represents configuration related with notification events
type NotificationEventsConfiguration struct {
LikelihoodThreshold int `mapstructure:"likelihood_threshold" toml:"likelihood_threshold"`
ImpactThreshold int `mapstructure:"impact_threshold" toml:"impact_threshold"`
SeverityThreshold int `mapstructure:"severity_threshold" toml:"severity_threshold"`
TotalRiskThreshold int `mapstructure:"total_risk_threshold" toml:"total_risk_threshold"`
Cooldown string `mapstructure:"cooldown" toml:"cooldown"`
EventFilter string `mapstructure:"event_filter" toml:"event_filter"`
TagFilterEnabled bool `mapstructure:"tag_filter_enabled" toml:"tag_filter_enabled"`
Tags []string `mapstructure:"tags" toml:"tags"`
TagsSet types.TagsSet
}

Expand Down Expand Up @@ -297,7 +291,7 @@ func LoadConfiguration(configFileEnvVariableName, defaultConfigFile string) (Con
}

// convert list/slice into regular set
configuration.Kafka.TagsSet = types.MakeSetOfTags(configuration.Kafka.Tags)
configuration.NotificationEvents.TagsSet = types.MakeSetOfTags(configuration.NotificationEvents.Tags)
configuration.ServiceLog.TagsSet = types.MakeSetOfTags(configuration.ServiceLog.Tags)

// everything's should be ok
Expand Down Expand Up @@ -339,10 +333,15 @@ func GetKafkaZerologConfiguration(configuration *ConfigStruct) logger.KafkaZerol
}

// GetKafkaBrokerConfiguration returns kafka broker configuration
func GetKafkaBrokerConfiguration(configuration *ConfigStruct) KafkaConfiguration {
func GetKafkaBrokerConfiguration(configuration *ConfigStruct) kafka.BrokerConfiguration {
return configuration.Kafka
}

// GetNotifEventsConfiguration returns configuration of notification events sent through Kafka
func GetNotifEventsConfiguration(configuration *ConfigStruct) NotificationEventsConfiguration {
return configuration.NotificationEvents
}

// GetServiceLogConfiguration returns ServiceLog configuration
func GetServiceLogConfiguration(configuration *ConfigStruct) ServiceLogConfiguration {
return configuration.ServiceLog
Expand Down Expand Up @@ -384,37 +383,8 @@ func updateConfigFromClowder(configuration *ConfigStruct) {
if clowder.LoadedConfig.Kafka == nil {
fmt.Println(noKafkaConfig)
} else {
// make sure broker(s) are configured in Clowder
if len(clowder.LoadedConfig.Kafka.Brokers) > 0 {
broker := clowder.LoadedConfig.Kafka.Brokers[0]
// port can be empty in clowder, so taking it into account
if broker.Port != nil {
configuration.Kafka.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
configuration.Kafka.Address = broker.Hostname
}

// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
configuration.Kafka.SaslUsername = *broker.Sasl.Username
configuration.Kafka.SaslPassword = *broker.Sasl.Password
configuration.Kafka.SaslMechanism = *broker.Sasl.SaslMechanism
configuration.Kafka.SecurityProtocol = *broker.Sasl.SecurityProtocol
if caPath, err := clowder.LoadedConfig.KafkaCa(broker); err == nil {
configuration.Kafka.CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
}

} else {
fmt.Println(noBrokerConfig)
}

updateTopicsMapping(configuration)
clowderutils.UseBrokerConfig(&configuration.Kafka, clowder.LoadedConfig)
clowderutils.UseClowderTopics(&configuration.Kafka, clowder.KafkaTopics)
}

if clowder.LoadedConfig.Database != nil {
Expand All @@ -428,12 +398,3 @@ func updateConfigFromClowder(configuration *ConfigStruct) {
fmt.Println(noStorage)
}
}

func updateTopicsMapping(configuration *ConfigStruct) {
// Updating topics from clowder mapping if available
if topicCfg, ok := clowder.KafkaTopics[configuration.Kafka.Topic]; ok {
configuration.Kafka.Topic = topicCfg.Name
} else {
fmt.Printf(noTopicMapping, configuration.Kafka.Topic)
}
}
12 changes: 6 additions & 6 deletions conf/config_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,20 @@ func BenchmarkGetLoggingConfiguration(b *testing.B) {
}

// BenchmarkGetKafkaBrokerConfiguration measures the speed of
// GetKafkaBrokerConfiguration function from the conf module.
// GetKafkaConfiguration function from the conf module.
func BenchmarkGetKafkaBrokerConfiguration(b *testing.B) {
configuration := mustLoadBenchmarkConfiguration(b)

for i := 0; i < b.N; i++ {
// call benchmarked function
m := conf.GetKafkaBrokerConfiguration(&configuration)

n := conf.GetNotifEventsConfiguration(&configuration)
b.StopTimer()
if m.Address != "localhost:9092" {
b.Fatal("Wrong configuration: address = '" + m.Address + "'")
if len(m.Addresses) != 1 && m.Addresses[0] != "localhost:9092" {
b.Fatal("Wrong configuration: address = '" + m.Addresses[0] + "'")
}
if m.Cooldown != "24 hours" {
b.Fatal("Wrong configuration: cooldown = '" + m.Cooldown + "'")
if n.Cooldown != "24 hours" {
b.Fatal("Wrong configuration: cooldown = '" + n.Cooldown + "'")
}
b.StartTimer()
}
Expand Down
Loading

0 comments on commit 2e0b7b2

Please sign in to comment.