diff --git a/statsdaemon.go b/statsdaemon.go index 4248760..dcb940b 100644 --- a/statsdaemon.go +++ b/statsdaemon.go @@ -15,6 +15,7 @@ import ( "sort" "strconv" "strings" + "sync" "syscall" "time" ) @@ -129,7 +130,6 @@ func monitor() { func submit(deadline time.Time) error { var buffer bytes.Buffer - var num int64 now := time.Now().Unix() @@ -152,9 +152,28 @@ func submit(deadline time.Time) error { return errors.New(errmsg) } - num += processCounters(&buffer, now) - num += processGauges(&buffer, now) - num += processTimers(&buffer, now, percentThreshold) + var numchan chan int64 + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + numchan <- processCounters(&buffer, now) + }() + go func() { + defer wg.Done() + numchan <- processGauges(&buffer, now) + }() + go func() { + defer wg.Done() + numchan <- processTimers(&buffer, now, percentThreshold) + }() + wg.Done() + close(numchan) + + var num int64 + for n := range numchan { + num += n + } if num == 0 { return nil } @@ -216,63 +235,72 @@ func processGauges(buffer *bytes.Buffer, now int64) int64 { return num } -func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { - var num int64 - for u, t := range timers { - if len(t) > 0 { - num++ +func processTimer(buffer *bytes.Buffer, now int64, pctls Percentiles, u string, t Uint64Slice) { + sort.Sort(t) + min := t[0] + max := t[len(t)-1] + maxAtThreshold := max + count := len(t) - sort.Sort(t) - min := t[0] - max := t[len(t)-1] - maxAtThreshold := max - count := len(t) + sum := uint64(0) + for _, value := range t { + sum += value + } + mean := float64(sum) / float64(len(t)) - sum := uint64(0) - for _, value := range t { - sum += value - } - mean := float64(sum) / float64(len(t)) - - for _, pct := range pctls { - - if len(t) > 1 { - var abs float64 - if pct.float >= 0 { - abs = pct.float - } else { - abs = 100 + pct.float - } - // poor man's math.Round(x): - // math.Floor(x + 0.5) - indexOfPerc := int(math.Floor(((abs / 100.0) * float64(count)) + 0.5)) - if pct.float >= 0 { - indexOfPerc -= 1 // index offset=0 - } - maxAtThreshold = t[indexOfPerc] - } + for _, pct := range pctls { - var tmpl string - var pctstr string - if pct.float >= 0 { - tmpl = "%s.upper_%s %d %d\n" - pctstr = pct.str - } else { - tmpl = "%s.lower_%s %d %d\n" - pctstr = pct.str[1:] - } - fmt.Fprintf(buffer, tmpl, u, pctstr, maxAtThreshold, now) + if len(t) > 1 { + var abs float64 + if pct.float >= 0 { + abs = pct.float + } else { + abs = 100 + pct.float + } + // poor man's math.Round(x): + // math.Floor(x + 0.5) + indexOfPerc := int(math.Floor(((abs / 100.0) * float64(count)) + 0.5)) + if pct.float >= 0 { + indexOfPerc -= 1 // index offset=0 } + maxAtThreshold = t[indexOfPerc] + } - var z Uint64Slice - timers[u] = z + var tmpl string + var pctstr string + if pct.float >= 0 { + tmpl = "%s.upper_%s %d %d\n" + pctstr = pct.str + } else { + tmpl = "%s.lower_%s %d %d\n" + pctstr = pct.str[1:] + } + fmt.Fprintf(buffer, tmpl, u, pctstr, maxAtThreshold, now) + } + + var z Uint64Slice + timers[u] = z - fmt.Fprintf(buffer, "%s.mean %f %d\n", u, mean, now) - fmt.Fprintf(buffer, "%s.upper %d %d\n", u, max, now) - fmt.Fprintf(buffer, "%s.lower %d %d\n", u, min, now) - fmt.Fprintf(buffer, "%s.count %d %d\n", u, count, now) + fmt.Fprintf(buffer, "%s.mean %f %d\n", u, mean, now) + fmt.Fprintf(buffer, "%s.upper %d %d\n", u, max, now) + fmt.Fprintf(buffer, "%s.lower %d %d\n", u, min, now) + fmt.Fprintf(buffer, "%s.count %d %d\n", u, count, now) +} + +func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { + var num int64 + var wg sync.WaitGroup + for u, t := range timers { + if len(t) > 0 { + num++ + wg.Add(1) + go func() { + defer wg.Done() + processTimer(buffer, now, pctls, u, t) + }() } } + wg.Wait() return num }