Skip to content

Commit

Permalink
[fix] [to #79] RetryQueue is not thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
shabicheng committed Jun 6, 2020
1 parent a84225c commit a9ae3dd
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions producer/retry_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,37 @@ package producer

import (
"container/heap"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"time"
)

type RetryQueue []*ProducerBatch
// RetryQueue cache ProducerBatch and retry latter
type RetryQueue struct {
batch []*ProducerBatch
mutex sync.Mutex
}

func initRetryQueue() *RetryQueue {
retryQueue := RetryQueue{}
heap.Init(&retryQueue)
return &retryQueue
}

func (retryQueue *RetryQueue) sendToRetryQueue(producerBatch *ProducerBatch, logger log.Logger) {
level.Debug(logger).Log("msg", "Retry queue to get data")
level.Debug(logger).Log("msg", "Send to retry queue")
retryQueue.mutex.Lock()
defer retryQueue.mutex.Unlock()
if producerBatch != nil {
heap.Push(retryQueue, producerBatch)
}
}

func (retryQueue *RetryQueue) getRetryBatch(moverShutDownFlag bool) (producerBatchList []*ProducerBatch) {
retryQueue.mutex.Lock()
defer retryQueue.mutex.Unlock()
if !moverShutDownFlag {
for retryQueue.Len() > 0 {
producerBatch := heap.Pop(retryQueue)
Expand All @@ -36,30 +52,24 @@ func (retryQueue *RetryQueue) getRetryBatch(moverShutDownFlag bool) (producerBat
return producerBatchList
}

func initRetryQueue() *RetryQueue {
retryQueue := RetryQueue{}
heap.Init(&retryQueue)
return &retryQueue
}

func (retryQueue RetryQueue) Len() int {
return len(retryQueue)
func (retryQueue *RetryQueue) Len() int {
return len(retryQueue.batch)
}

func (retryQueue RetryQueue) Less(i, j int) bool {
return retryQueue[i].nextRetryMs < retryQueue[j].nextRetryMs
func (retryQueue *RetryQueue) Less(i, j int) bool {
return retryQueue.batch[i].nextRetryMs < retryQueue.batch[j].nextRetryMs
}
func (retryQueue RetryQueue) Swap(i, j int) {
retryQueue[i], retryQueue[j] = retryQueue[j], retryQueue[i]
func (retryQueue *RetryQueue) Swap(i, j int) {
retryQueue.batch[i], retryQueue.batch[j] = retryQueue.batch[j], retryQueue.batch[i]
}
func (retryQueue *RetryQueue) Push(x interface{}) {
item := x.(*ProducerBatch)
*retryQueue = append(*retryQueue, item)
retryQueue.batch = append(retryQueue.batch, item)
}
func (retryQueue *RetryQueue) Pop() interface{} {
old := *retryQueue
old := retryQueue.batch
n := len(old)
item := old[n-1]
*retryQueue = old[0 : n-1]
retryQueue.batch = old[0 : n-1]
return item
}

0 comments on commit a9ae3dd

Please sign in to comment.