Skip to content

Commit

Permalink
Upgrade Kafka Client, Address Dependency Vulnerability (#32)
Browse files Browse the repository at this point in the history
* Remove -u arg for compatibility with go 1.16

* Integration test improvements for known flaky tests

* Update tidwall/gjson to avoid version with security vuln

* Disable sticky partitions for all test producers to ensure near-even distribution of messages

* Upgrade confluent-kafka-go to 1.7.0
  • Loading branch information
Jeff Nadler authored Jul 27, 2021
1 parent 8a1bddd commit 1cb5465
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cover: inttest .makecache/cover
@mkdir $@

.makecache/lint: $(FIREBOLT_SRCS)
@go get -u github.com/golangci/golangci-lint/cmd/[email protected]
@go get github.com/golangci/golangci-lint/cmd/[email protected]
@golangci-lint run --no-config --disable-all -E gosec -E interfacer -E vet -E deadcode -E gocyclo -E golint -E varcheck -E dupl -E ineffassign -E unconvert -E nakedret -E gofmt -E unparam -E prealloc ./...

# Unit Tests only
Expand Down
Binary file modified coverage_badge.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.16
require (
github.com/Comcast/go-leaderelection v0.0.0-20181102191523-272fd9e2bddc
github.com/Shopify/sarama v1.27.2
github.com/confluentinc/confluent-kafka-go v1.5.2
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/digitalocean/captainslog v0.0.0-20190610170928-cd175de8a6e2
github.com/golang/protobuf v1.5.2 // indirect
github.com/olivere/elastic/v7 v7.0.15
Expand All @@ -16,9 +16,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/tidwall/gjson v1.2.1 // indirect
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
github.com/tidwall/gjson v1.8.1 // indirect
golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/confluentinc/confluent-kafka-go v1.5.2 h1:l+qt+a0Okmq0Bdr1P55IX4fiwFJyg0lZQmfHkAFkv7E=
github.com/confluentinc/confluent-kafka-go v1.5.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -129,12 +129,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/gjson v1.8.1 h1:8j5EE9Hrh3l9Od1OIEDAb7IpezNA20UdRngNAj5N0WU=
github.com/tidwall/gjson v1.8.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.1.0 h1:K3hMW5epkdAVwibsQEfR/7Zj0Qgt4DxtNumTq/VloO8=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
3 changes: 2 additions & 1 deletion inttest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestRecovery(t *testing.T) {
go errorConsumer.Shutdown()
close(errCh)

fmt.Printf("inttest: recovery founc %d successful, %d errors, %d filtered", len(successCh), len(errCh), len(internal.FilteredEvents))
fmt.Printf("inttest: recovery found %d successful, %d errors, %d filtered", len(successCh), len(errCh), len(internal.FilteredEvents))
totalEvents := len(successCh) + len(errCh) + len(internal.FilteredEvents)
assert.Equal(t, 2400, totalEvents)
}
Expand All @@ -216,6 +216,7 @@ func produceTestData(count int) {
config := make(map[string]string)
config["brokers"] = "localhost:9092"
config["topic"] = testTopic
config["librdkafka.sticky.partitioning.linger.ms"] = "0" // sticky partitioning can cause most/all events to go to partition 0, but our tests rely on near-even distribution
err := kp.Setup(config)
if err != nil {
log.WithError(err).Info("failed to setup producer")
Expand Down
6 changes: 4 additions & 2 deletions node/kafkaconsumer/kafkaconsumer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ func TestKafkaConsumer(t *testing.T) {
// using a unique (by time) topic name makes it easier to run this test repeatedly in local testing without interference
// from the data left over by previous runs
topicName := fmt.Sprintf("kafkaconsumer-%d", time.Now().UnixNano())
testutil.EnsureTestTopicExists(topicName, 4)

// create a producer for test data
kp := &kafkaproducer.KafkaProducer{}
producerConfig := make(map[string]string)
producerConfig["brokers"] = "localhost:9092"
producerConfig["topic"] = topicName
producerConfig["librdkafka.sticky.partitioning.linger.ms"] = "0" // sticky partitioning can cause most/all events to go to partition 0, but our tests rely on near-even distribution
err := kp.Setup(producerConfig)
assert.Nil(t, err)

Expand All @@ -44,8 +46,8 @@ func TestKafkaConsumer(t *testing.T) {
})
}

// sleep to give kafka time to autocreate the topic and have a leader for it's partition
time.Sleep(10 * time.Second)
// sleep so that when the kafka consumer queries partition offsets, the 1000 messages are all accounted for
time.Sleep(5 * time.Second)

// start the kafka consumer with 'maxpartitionlag' 60
kc := &KafkaConsumer{}
Expand Down
13 changes: 9 additions & 4 deletions node/kafkaconsumer/recoveryconsumer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestRecoveryConsumerEndToEnd(t *testing.T) {
metrics.Init("recoveryconsumer")

topicName := fmt.Sprintf("recoveryconsumer-endtoend-%d", time.Now().UnixNano())
println("recoveryconsumer endtoend assignment integration test using topic " + topicName)
testutil.EnsureTestTopicExists(topicName, 4)

// first we will produce 2000 records to create a recovery state where the kafkaconsumer is 'behind'
// the kafkaconsumer is set up with 'maxpartitionlag' 60 * 4 partitions, so 240 records will be consumed immediately in "real time" by the main consumer
Expand Down Expand Up @@ -141,6 +141,7 @@ func TestRecoveryConsumerRateLimited(t *testing.T) {
metrics.Init("recoveryconsumer")

topicName := fmt.Sprintf("recoveryconsumer-ratelimit-%d", time.Now().UnixNano())
testutil.EnsureTestTopicExists(topicName, 4)

produceTestRecords(topicName, 1000, t)

Expand Down Expand Up @@ -174,7 +175,9 @@ func TestRecoveryConsumerRestart(t *testing.T) {
metrics.Init("recoveryconsumer")

eventTopicName := fmt.Sprintf("recoveryconsumer-restart-%d", time.Now().UnixNano())
testutil.EnsureTestTopicExists(eventTopicName, 4)
messageTopicName := fmt.Sprintf("recoveryconsumer-messages-%d", time.Now().UnixNano())
testutil.EnsureTestTopicExists(messageTopicName, 1)
println("recoveryconsumer endtoend assignment integration test using event topic " + eventTopicName)

// first we will produce 2000 records to create a recovery state where the kafkaconsumer is 'behind'
Expand Down Expand Up @@ -220,7 +223,7 @@ func TestRecoveryConsumerRestart(t *testing.T) {
// give the recoveryconsumer some time to do its thing
_ = testutil.AwaitCondition(func() bool {
return (previouslyRecovered + len(kc2.sendCh)) >= 8240
}, 1*time.Second, 60*time.Second)
}, 1*time.Second, 90*time.Second)
assert.True(t, 8240 <= previouslyRecovered+len(kc2.sendCh), "expected the number of records recovered to be > 8240 but it was %d", previouslyRecovered+len(kc2.sendCh)) // (60 * 4) from kafkaconsumer + (2000 * 4) from recoveryconsumer

err = kc2.Shutdown()
Expand All @@ -232,8 +235,9 @@ func TestRecoveryCancellation(t *testing.T) {
metrics.Init("recoveryconsumer")

eventTopicName := fmt.Sprintf("recoveryconsumer-restart-%d", time.Now().UnixNano())
testutil.EnsureTestTopicExists(eventTopicName, 4)
messageTopicName := fmt.Sprintf("recoveryconsumer-messages-%d", time.Now().UnixNano())
println("recoveryconsumer recovery cancellation integration test using event topic " + eventTopicName)
testutil.EnsureTestTopicExists(messageTopicName, 1)

// first we will produce 10000 records to create a recovery state where the kafkaconsumer is 'behind'
// the kafkaconsumer is set up with 'maxpartitionlag' 60 * 4 partitions, so 240 records will be consumed immediately in "real time" by the main consumer
Expand All @@ -243,7 +247,7 @@ func TestRecoveryCancellation(t *testing.T) {

// sleep to give kafka time to autocreate the topic and have a leader for its partition; also, without this sometimes
// it's too fast and kafka returns 0 for highwatermark, which results in a false-failed test
time.Sleep(5 * time.Second)
time.Sleep(15 * time.Second)

kc, err := startKafkaConsumerWithRealContext(eventTopicName, messageTopicName, 500, 2000, t)
assert.Nil(t, err)
Expand Down Expand Up @@ -309,6 +313,7 @@ func produceTestRecords(topicName string, numRecords int, t *testing.T) {
producerConfig := make(map[string]string)
producerConfig["brokers"] = "localhost:9092"
producerConfig["topic"] = topicName
producerConfig["librdkafka.sticky.partitioning.linger.ms"] = "0" // sticky partitioning can cause most/all events to go to partition 0, but our tests rely on near-even distribution
err := kp.Setup(producerConfig)
assert.Nil(t, err)
for i := 0; i < numRecords; i++ {
Expand Down
2 changes: 2 additions & 0 deletions testutil/kafkautil.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ func EnsureTestTopicExists(topicName string, partitions int32) error {
_, ok := topicMap[topicName]
if ok {
// topic exists!
log.Info("topic created successfully")
return nil
}

time.Sleep(200 * time.Millisecond)
}

}

0 comments on commit 1cb5465

Please sign in to comment.