Skip to content

Commit

Permalink
Merge pull request #35 from meitu/fix/rebalance_bug
Browse files Browse the repository at this point in the history
Fix rebalance bug:
1. A rebalance was triggered when the consumers changed, and it closed the triggerCh.
2. The partition consumer would give up claiming the partition when it received the stop signal; the subsequent call to the `stop` function would then close the triggerCh a second time, and the claim would be left pending.
3. The topic consumer, which waits for its partition consumers, would be left pending as well, because the partition consumer was pending.
  • Loading branch information
git-hulk authored Nov 6, 2018
2 parents f15eac3 + 16b1839 commit 4149a4b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
30 changes: 17 additions & 13 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ type ConsumerGroup struct {
topicConsumers map[string]*topicConsumer
saramaConsumer sarama.Consumer

id string
state int
wg sync.WaitGroup
stopCh chan struct{}
triggerCh chan int
stopOnce *sync.Once
owners map[string]map[int32]string
id string
state int
wg sync.WaitGroup
stopCh chan struct{}
triggerCh chan int
triggerOnce *sync.Once
owners map[string]map[int32]string

config *Config
logger *logrus.Logger
Expand All @@ -62,7 +62,6 @@ func NewConsumerGroup(config *Config) (*ConsumerGroup, error) {
cg.id = genConsumerID()
}
cg.name = config.GroupID
cg.stopOnce = new(sync.Once)
cg.triggerCh = make(chan int)
cg.topicConsumers = make(map[string]*topicConsumer)
cg.onLoad = make([]func(), 0)
Expand Down Expand Up @@ -129,10 +128,6 @@ func (cg *ConsumerGroup) IsStopped() bool {
return cg.state == cgStopped
}

func (cg *ConsumerGroup) triggerRebalance() {
cg.triggerCh <- restartEvent
}

func (cg *ConsumerGroup) callRecover() {
if err := recover(); err != nil {
cg.logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -167,6 +162,7 @@ func (cg *ConsumerGroup) start() {
CONSUME_TOPIC_LOOP:
for {
cg.logger.WithField("group", cg.name).Info("Consumer group started")
cg.triggerOnce = new(sync.Once)
cg.stopCh = make(chan struct{})

err := cg.watchRebalance()
Expand All @@ -191,6 +187,10 @@ CONSUME_TOPIC_LOOP:
defer cg.callRecover()
defer wg.Done()
tc.wg.Wait()
cg.logger.WithFields(logrus.Fields{
"group": tc.group,
"topic": tc.name,
}).Info("Stop the topic consumer")
}(consumer)
}
cg.state = cgStart
Expand Down Expand Up @@ -219,7 +219,11 @@ CONSUME_TOPIC_LOOP:
}

func (cg *ConsumerGroup) stop() {
cg.stopOnce.Do(func() { cg.triggerCh <- quitEvent })
cg.triggerOnce.Do(func() { cg.triggerCh <- quitEvent })
}

func (cg *ConsumerGroup) triggerRebalance() {
cg.triggerOnce.Do(func() { cg.triggerCh <- restartEvent })
}

func (cg *ConsumerGroup) getPartitionConsumer(topic string, partition int32, nextOffset int64) (sarama.PartitionConsumer, error) {
Expand Down
4 changes: 2 additions & 2 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (pc *partitionConsumer) start() {
"topic": pc.topic,
"partition": pc.partition,
"err": err,
}).Error("Failed to clamin the partition and gave up")
}).Error("Failed to claim the partition and gave up")
goto ERROR
}
defer func() {
Expand Down Expand Up @@ -166,7 +166,7 @@ func (pc *partitionConsumer) claim() error {
"partition": pc.partition,
"retries": i,
"err": err,
}).Warn("Failed to claim the partiton with retries")
}).Warn("Failed to claim the partition with retries")
}
select {
case <-timer.C:
Expand Down
4 changes: 0 additions & 4 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ func (tc *topicConsumer) start() {
"group": tc.group,
"topic": topic,
}).Info("Start the topic consumer")
defer cg.logger.WithFields(logrus.Fields{
"group": tc.group,
"topic": topic,
}).Info("Stop the topic consumer")

partitions, err := tc.assignPartitions()
if err != nil {
Expand Down

0 comments on commit 4149a4b

Please sign in to comment.