diff --git a/storage/clickhousespanstore/pool.go b/storage/clickhousespanstore/pool.go index 710a6bc..21d5c95 100644 --- a/storage/clickhousespanstore/pool.go +++ b/storage/clickhousespanstore/pool.go @@ -60,11 +60,14 @@ func (pool *WriteWorkerPool) Work() { finish := false nextWorkerID := int32(1) pendingSpanCount := 0 + + pool.done.Add(1) + defer pool.done.Done() + for { // Initialize to zero, or update value from previous loop numPendingSpans.Set(float64(pendingSpanCount)) - pool.done.Add(1) select { case batch := <-pool.batches: batchSize := len(batch) @@ -103,7 +106,6 @@ func (pool *WriteWorkerPool) Work() { pool.workers.CloseWorkers() finish = true } - pool.done.Done() if finish { break diff --git a/storage/clickhousespanstore/writer.go b/storage/clickhousespanstore/writer.go index d68a36e..931242c 100644 --- a/storage/clickhousespanstore/writer.go +++ b/storage/clickhousespanstore/writer.go @@ -88,13 +88,32 @@ func (w *SpanWriter) registerMetrics() { func (w *SpanWriter) backgroundWriter(maxSpanCount int) { pool := NewWorkerPool(&w.workerParams, maxSpanCount) go pool.Work() - batch := make([]*model.Span, 0, w.size) + batch := make([]*model.Span, 0, w.size) timer := time.After(w.workerParams.delay) last := time.Now() + finishSpan := func() { + for len(w.spans) > 0 { + select { + case span := <-w.spans: + batch = append(batch, span) + if len(batch) >= cap(batch) { + pool.WriteBatch(batch) + } + default: + } + } + if len(batch) > 0 { + pool.WriteBatch(batch) + } + pool.Close() + } + + w.done.Add(1) + defer w.done.Done() + for { - w.done.Add(1) flush := false finish := false @@ -128,11 +147,7 @@ func (w *SpanWriter) backgroundWriter(maxSpanCount int) { } if finish { - pool.Close() - } - w.done.Done() - - if finish { + finishSpan() break } }