Skip to content

Commit

Permalink
Support for multiple broker addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Feb 19, 2024
1 parent 3744d6a commit cde01e5
Show file tree
Hide file tree
Showing 18 changed files with 250 additions and 2,110 deletions.
59 changes: 31 additions & 28 deletions ccx_notification_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ import (
"strconv"
"strings"

kafkautils "github.com/RedHatInsights/insights-operator-utils/kafka"

utils "github.com/RedHatInsights/insights-operator-utils/migrations"
"github.com/prometheus/client_golang/prometheus/promhttp"

Expand All @@ -68,6 +66,7 @@ const (
versionMessage = "CCX Notification Writer version 1.0"
authorsMessage = "Pavel Tisnovsky, Red Hat Inc."
connectionToBrokerMessage = "Connection to broker"
allBrokerConnectionAttemptsMessage = "Couldn't connect to any of the provided brokers"
operationFailedMessage = "Operation failed"
notConnectedToBrokerMessage = "Not connected to broker"
brokerConnectionSuccessMessage = "Broker connection OK"
Expand Down Expand Up @@ -128,7 +127,7 @@ func showConfiguration(configuration *ConfigStruct) {
// retrieve and then display broker configuration
brokerConfig := GetBrokerConfiguration(configuration)
log.Info().
Str(brokerAddresses, strings.Join(brokerConfig.Addresses, ",")).
Str(brokerAddresses, brokerConfig.Addresses).
Str("Security protocol", brokerConfig.SecurityProtocol).
Str("Cert path", brokerConfig.CertPath).
Str("Sasl mechanism", brokerConfig.SaslMechanism).
Expand Down Expand Up @@ -174,35 +173,39 @@ func tryToConnectToKafka(configuration *ConfigStruct) (int, error) {

// display basic info about broker that will be used
log.Info().
Str(brokerAddresses, strings.Join(brokerConfiguration.Addresses, ",")).
Msgf("Establishing connection to first Kafka broker from list")
Str(brokerAddresses, brokerConfiguration.Addresses)

// create new broker instance (w/o any checks)
broker := sarama.NewBroker(brokerConfiguration.Addresses[0])
var err error
for _, addr := range strings.Split(brokerConfiguration.Addresses, ",") {
// create new broker instance (w/o any checks)
broker := sarama.NewBroker(addr)

// check broker connection
err := broker.Open(nil)
if err != nil {
log.Error().Err(err).Msg(connectionToBrokerMessage)
return ExitStatusKafkaError, err
}
// check broker connection
err = broker.Open(nil)
if err != nil {
log.Error().Err(err).Msg(connectionToBrokerMessage)
continue
}

// check if connection remain
connected, err := broker.Connected()
if err != nil {
log.Error().Err(err).Msg(connectionToBrokerMessage)
return ExitStatusKafkaError, err
}
if !connected {
log.Error().Err(err).Msg(notConnectedToBrokerMessage)
return ExitStatusConsumerError, err
}
// check if connection remain
connected, err := broker.Connected()
if err != nil {
log.Error().Err(err).Msg(connectionToBrokerMessage)
continue
}
if !connected {
log.Error().Err(err).Msg(notConnectedToBrokerMessage)
continue
}

// connection was established
log.Info().Msg(brokerConnectionSuccessMessage)
// connection was established
log.Info().Msg(brokerConnectionSuccessMessage)

// everything seems to be ok
return ExitStatusOK, nil
// everything seems to be ok
return ExitStatusOK, nil
}
log.Error().Msg(allBrokerConnectionAttemptsMessage)
return ExitStatusKafkaError, err
}

// performDatabaseInitialization function performs database initialization -
Expand Down Expand Up @@ -467,7 +470,7 @@ func startService(configuration *ConfigStruct) (int, error) {
}

// startConsumer function starts the Kafka consumer.
func startConsumer(brokerConfiguration *kafkautils.BrokerConfiguration, storage Storage) error {
func startConsumer(brokerConfiguration *BrokerConfiguration, storage Storage) error {
consumer, err := NewConsumer(brokerConfiguration, storage)
if err != nil {
log.Error().Err(err).Msg("Construct broker failed")
Expand Down
10 changes: 4 additions & 6 deletions ccx_notification_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"os"
"testing"

kafkautils "github.com/RedHatInsights/insights-operator-utils/kafka"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -75,8 +73,8 @@ func TestShowAuthors(t *testing.T) {
func TestShowConfiguration(t *testing.T) {
// fill in configuration structure
configuration := main.ConfigStruct{}
configuration.Broker = kafkautils.BrokerConfiguration{
Addresses: []string{"broker_address"},
configuration.Broker = main.BrokerConfiguration{
Addresses: "broker_address",
Topic: "broker_topic",
}
configuration.Metrics = main.MetricsConfiguration{
Expand Down Expand Up @@ -155,8 +153,8 @@ func TestDoSelectedOperationShowAuthors(t *testing.T) {
func TestDoSelectedOperationShowConfiguration(t *testing.T) {
// fill in configuration structure
configuration := main.ConfigStruct{}
configuration.Broker = kafkautils.BrokerConfiguration{
Addresses: []string{"broker_address"},
configuration.Broker = main.BrokerConfiguration{
Addresses: "broker_address",
Topic: "broker_topic",
}
configuration.Metrics = main.MetricsConfiguration{
Expand Down
2 changes: 1 addition & 1 deletion config-devel.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[broker]
addresses = ["kafka:29092"]
addresses = "kafka:29092"
topic = "ccx.ocp.results"
group = "test-consumer-group"
enabled = true
Expand Down
83 changes: 74 additions & 9 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ import (

"path/filepath"

clowderutils "github.com/RedHatInsights/insights-operator-utils/clowder"
kafkautils "github.com/RedHatInsights/insights-operator-utils/kafka"
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"

"github.com/rs/zerolog/log"
Expand All @@ -110,16 +108,17 @@ const (
noKafkaConfig = "no Kafka configuration available in Clowder, using default one"
noBrokerConfig = "warning: no broker configurations found in clowder config"
noSaslConfig = "warning: SASL configuration is missing"
noTopicMapping = "warning: no kafka mapping found for topic %s"
noStorage = "warning: no storage section in Clowder config"
)

// ConfigStruct is a structure holding the whole notification service
// configuration
type ConfigStruct struct {
Broker kafkautils.BrokerConfiguration `mapstructure:"broker" toml:"broker"`
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
Logging LoggingConfiguration `mapstructure:"logging" toml:"logging"`
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
Broker BrokerConfiguration `mapstructure:"broker" toml:"broker"`
Storage StorageConfiguration `mapstructure:"storage" toml:"storage"`
Logging LoggingConfiguration `mapstructure:"logging" toml:"logging"`
Metrics MetricsConfiguration `mapstructure:"metrics" toml:"metrics"`
}

// MetricsConfiguration holds metrics related configuration
Expand All @@ -144,6 +143,28 @@ type LoggingConfiguration struct {
LogLevel string `mapstructure:"log_level" toml:"log_level"`
}

// BrokerConfiguration represents configuration for the broker
type BrokerConfiguration struct {
// Addresses represents Kafka broker addresses
Addresses string `mapstructure:"addresses" toml:"addresses"`
// SecurityProtocol represents the security protocol used by the broker
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
// CertPath is the path to a file containing the certificate to be used with the broker
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
// SaslMechanism is the SASL mechanism used for authentication
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
// SaslUsername is the username used in case of PLAIN mechanism
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
// SaslPassword is the password used in case of PLAIN mechanism
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
// Topic is name of Kafka topic
Topic string `mapstructure:"topic" toml:"topic"`
// Group is name of Kafka group
Group string `mapstructure:"group" toml:"group"`
// Enabled is set to true if Kafka consumer is to be enabled
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// StorageConfiguration represents configuration of data storage
type StorageConfiguration struct {
Driver string `mapstructure:"db_driver" toml:"db_driver"`
Expand Down Expand Up @@ -237,7 +258,7 @@ func GetLoggingConfiguration(configuration *ConfigStruct) LoggingConfiguration {
}

// GetBrokerConfiguration returns broker configuration
func GetBrokerConfiguration(configuration *ConfigStruct) kafkautils.BrokerConfiguration {
func GetBrokerConfiguration(configuration *ConfigStruct) BrokerConfiguration {
return configuration.Broker
}

Expand All @@ -246,6 +267,42 @@ func GetMetricsConfiguration(configuration *ConfigStruct) MetricsConfiguration {
return configuration.Metrics
}

func updateBrokerCfgFromClowder(configuration *ConfigStruct) {
// make sure broker(s) are configured in Clowder
if len(clowder.LoadedConfig.Kafka.Brokers) > 0 {
configuration.Broker.Addresses = ""
for _, broker := range clowder.LoadedConfig.Kafka.Brokers {
if broker.Port != nil {
configuration.Broker.Addresses += fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + ","
} else {
configuration.Broker.Addresses += broker.Hostname + ","
}
}
// remove the extra comma
configuration.Broker.Addresses = configuration.Broker.Addresses[:len(configuration.Broker.Addresses)-1]

// SSL config
clowderBrokerCfg := clowder.LoadedConfig.Kafka.Brokers[0]
if clowderBrokerCfg.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if clowderBrokerCfg.Sasl != nil {
configuration.Broker.SaslUsername = *clowderBrokerCfg.Sasl.Username
configuration.Broker.SaslPassword = *clowderBrokerCfg.Sasl.Password
configuration.Broker.SaslMechanism = *clowderBrokerCfg.Sasl.SaslMechanism
configuration.Broker.SecurityProtocol = *clowderBrokerCfg.SecurityProtocol
if caPath, err := clowder.LoadedConfig.KafkaCa(clowderBrokerCfg); err == nil {
configuration.Broker.CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
}
} else {
fmt.Println(noBrokerConfig)
}
updateTopicsMapping(&configuration.Broker)
}

// updateConfigFromClowder updates the current config with the values defined in clowder
func updateConfigFromClowder(configuration *ConfigStruct) {
// check if Clowder is enabled. If not, simply skip the logic.
Expand All @@ -258,8 +315,7 @@ func updateConfigFromClowder(configuration *ConfigStruct) {
if clowder.LoadedConfig.Kafka == nil {
fmt.Println(noKafkaConfig)
} else {
clowderutils.UseBrokerConfig(&configuration.Broker, clowder.LoadedConfig)
clowderutils.UseClowderTopics(&configuration.Broker, clowder.KafkaTopics)
updateBrokerCfgFromClowder(configuration)
}

if clowder.LoadedConfig.Database != nil {
Expand All @@ -273,3 +329,12 @@ func updateConfigFromClowder(configuration *ConfigStruct) {
fmt.Println(noStorage)
}
}

func updateTopicsMapping(configuration *BrokerConfiguration) {
// Get the correct topic name from clowder mapping if available
if clowderTopic, ok := clowder.KafkaTopics[configuration.Topic]; ok {
configuration.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, configuration.Topic)
}
}
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

[broker]
addresses = ["localhost:9092","localhost:9093"]
addresses = "localhost:9093,localhost:9092"
security_protocol = "PLAINTEXT"
sasl_mechanism = "not-used"
sasl_username = "not-used"
Expand Down
5 changes: 2 additions & 3 deletions config_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package main_test
// https://redhatinsights.github.io/ccx-notification-writer/packages/config_benchmark_test.html

import (
"strings"
"testing"

main "github.com/RedHatInsights/ccx-notification-writer"
Expand Down Expand Up @@ -72,8 +71,8 @@ func BenchmarkGetBrokerConfiguration(b *testing.B) {
m := main.GetBrokerConfiguration(&configuration)

b.StopTimer()
if m.Addresses[0] != "kafka:29092" {
b.Fatal("Wrong configuration: addresses = '" + strings.Join(m.Addresses, ",") + "'")
if m.Addresses != "kafka:29092" {
b.Fatal("Wrong configuration: addresses = '" + m.Addresses + "'")
}
b.StartTimer()
}
Expand Down
22 changes: 11 additions & 11 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ func TestLoadBrokerConfiguration(t *testing.T) {
assert.Nil(t, err, "Failed loading configuration file from env var!")

brokerCfg := main.GetBrokerConfiguration(&config)

assert.Equal(t, []string{"localhost:29092"}, brokerCfg.Addresses)
assert.Equal(t, "localhost:29092", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
}

Expand Down Expand Up @@ -248,7 +247,7 @@ func TestLoadConfigurationNoKafkaBroker(t *testing.T) {
brokerCfg := main.GetBrokerConfiguration(&config)

// check broker configuration
assert.Equal(t, []string{"localhost:29092"}, brokerCfg.Addresses)
assert.Equal(t, "localhost:29092", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -293,7 +292,7 @@ func TestLoadConfigurationKafkaBrokerEmptyConfig(t *testing.T) {
print(clowder.KafkaServers)
print(clowder.LoadedConfig.Kafka)
// check broker configuration
assert.Equal(t, []string{""}, brokerCfg.Addresses)
assert.Equal(t, "", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -336,7 +335,7 @@ func TestLoadConfigurationKafkaBrokerNoPort(t *testing.T) {

// check broker configuration
// no port should be set
assert.Equal(t, []string{"test"}, brokerCfg.Addresses)
assert.Equal(t, "test", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -380,7 +379,7 @@ func TestLoadConfigurationKafkaBrokerPort(t *testing.T) {
brokerCfg := main.GetBrokerConfiguration(&config)

// check broker configuration
assert.Equal(t, []string{"test:1234"}, brokerCfg.Addresses)
assert.Equal(t, "test:1234", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -428,7 +427,7 @@ func TestLoadConfigurationKafkaBrokerAuthConfigMissingSASL(t *testing.T) {
brokerCfg := main.GetBrokerConfiguration(&config)

// check broker configuration
assert.Equal(t, []string{"test:1234"}, brokerCfg.Addresses)
assert.Equal(t, "test:1234", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -464,7 +463,8 @@ func TestLoadConfigurationKafkaBrokerAuthConfig(t *testing.T) {
Sasl: &clowder.KafkaSASLConfig{
Username: &username,
Password: &password,
SaslMechanism: &saslMechanism},
SaslMechanism: &saslMechanism,
},
},
}

Expand Down Expand Up @@ -493,7 +493,7 @@ func TestLoadConfigurationKafkaBrokerAuthConfig(t *testing.T) {
brokerCfg := main.GetBrokerConfiguration(&config)

// check broker configuration
assert.Equal(t, []string{"test:1234"}, brokerCfg.Addresses)
assert.Equal(t, "test:1234", brokerCfg.Addresses)
assert.Equal(t, "ccx_test_notifications", brokerCfg.Topic)
assert.Equal(t, "test-consumer-group", brokerCfg.Group)
assert.True(t, brokerCfg.Enabled)
Expand Down Expand Up @@ -540,7 +540,7 @@ func TestLoadConfigurationKafkaTopicUpdatedFromClowder(t *testing.T) {
assert.NoError(t, err, "Failed loading configuration file")

brokerCfg := main.GetBrokerConfiguration(&config)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", hostname, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d", hostname, port), brokerCfg.Addresses)
assert.Equal(t, newTopicName, brokerCfg.Topic)

// config with different broker configuration, broker's hostname taken from clowder, but no topic to map to
Expand All @@ -550,7 +550,7 @@ func TestLoadConfigurationKafkaTopicUpdatedFromClowder(t *testing.T) {
assert.NoError(t, err, "Failed loading configuration file")

brokerCfg = main.GetBrokerConfiguration(&config)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", hostname, port)}, brokerCfg.Addresses)
assert.Equal(t, fmt.Sprintf("%s:%d", hostname, port), brokerCfg.Addresses)
assert.Equal(t, topicName, brokerCfg.Topic)
}

Expand Down
Loading

0 comments on commit cde01e5

Please sign in to comment.