Support configurable bucket ranges for bulk_process_time Histogram (#48)
* node/elasticsearch: support configurable bucket range for indexing time

* helpers.go, node/elasticsearch: allow user configuration of the bucket range, represented as float64

* docs/node-elasticsearch.md: document configurable indexing time bucket, and format tables so that my IDE stops shouting

* node/elasticsearch/metrics_test.go: test that n buckets exist, ignoring values. Also, stop testing behaviors specific to Prometheus, as that is not firebolt's concern
ful09003 authored Mar 23, 2023
1 parent 1c4b91e commit 08415cb
Showing 7 changed files with 131 additions and 24 deletions.
39 changes: 21 additions & 18 deletions docs/node-elasticsearch.md
@@ -11,26 +11,29 @@ The `elasticsearch` node indexes documents to an Elasticsearch cluster. Its pa

The IndexRequest type contains four fields:

Field | Required | Default | Description
------------------------|:--------:|---------|--------------
Index | * | | Destination index name in Elasticsearch.
MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type.
DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID.
Doc | * | | The document to index. This should be a struct that is marshallable to JSON.
| Field | Required | Default | Description |
|-------------|:--------:|---------|-----------------------------------------------------------------------------------------------------|
| Index | * | | Destination index name in Elasticsearch. |
| MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type. |
| DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID. |
| Doc | * | | The document to index. This should be a struct that is marshallable to JSON. |

Internally, `elasticsearch` uses the `BulkService` in the `olivere/elastic` client.

## Configuration

Param | Required | Default | Description
--------------------------|:--------:|---------|--------------
elastic-addr | * | | Comma-separated list of Elasticsearch client nodes.
elastic-username | | | Username used in http/basic authentication.
elastic-password | | | Password used in http/basic authentication.
batch-size | | 100 | Wait until this many documents are collected and send them as a single batch.
batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch.
bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request.
reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail.
bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable.
bulk-index-timeout-seconds| | 20 | Forcibly cancel individual bulk indexing operations after this time.
index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch.
| Param | Required | Default | Description |
|----------------------------|:--------:|----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| elastic-addr | * | | Comma-separated list of Elasticsearch client nodes. |
| elastic-username | | | Username used in http/basic authentication. |
| elastic-password | | | Password used in http/basic authentication. |
| batch-size | | 100 | Wait until this many documents are collected and send them as a single batch. |
| batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch. |
| bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request. |
| reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail. |
| bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable. |
| bulk-index-timeout-seconds | | 20 | Forcibly cancel individual bulk indexing operations after this time. |
| index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch. |
| histogram-min-bucket-sec | | 0.01 | Smallest bucket boundary, in seconds, for observed Elasticsearch client indexing time. |
| histogram-max-bucket-sec | | 2 * (bulk-index-timeout-ms/1000) | Largest bucket boundary, in seconds, for observed Elasticsearch client indexing time. |
| histogram-bucket-count | | 8 | Number of exponentially spaced buckets to generate between the min and max bucket boundaries. |
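
The three histogram settings translate into exponentially spaced bucket boundaries. As a rough illustration only, the sketch below reproduces that spacing with the standard library. `exponentialBucketsRange` is a hypothetical stand-in written for this example (the commit itself delegates to `prometheus.ExponentialBucketsRange`), and the inputs assume the documented defaults with `bulk-index-timeout-ms` left at 5000, which gives a max of 10 seconds.

```go
package main

import (
	"fmt"
	"math"
)

// exponentialBucketsRange is a hypothetical stand-in for the Prometheus helper.
// It assumes minBucket > 0 and count >= 2, and returns count boundaries where
// each one is the previous boundary multiplied by a constant growth factor.
func exponentialBucketsRange(minBucket, maxBucket float64, count int) []float64 {
	growth := math.Pow(maxBucket/minBucket, 1/float64(count-1))
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = minBucket * math.Pow(growth, float64(i))
	}
	return buckets
}

func main() {
	// Documented defaults: min 0.01s, max 2*(5000/1000) = 10s, 8 buckets.
	for _, b := range exponentialBucketsRange(0.01, 10, 8) {
		fmt.Printf("%.4f\n", b)
	}
}
```

With these inputs the first boundary is the configured minimum and the last lands on the configured maximum, so widening `histogram-max-bucket-sec` stretches the spacing rather than adding buckets.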
32 changes: 32 additions & 0 deletions helpers.go
@@ -60,3 +60,35 @@ func (c Nodeconfig) StringConfigRequired(name string) (string, error) {
}
return userValue, nil
}

// Float64Config validates and fetches the float64-typed optional config value specified by 'name', using the 'defaultValue' if
// no value was provided in the configuration. The default float64 (if used) is stored as a string using Go's default %f
// formatting (six decimal places).
func (c Nodeconfig) Float64Config(name string, defaultValue float64, minValue float64, maxValue float64) (float64, error) {
// set the default value, if not provided
_, ok := c[name]
if !ok {
c[name] = fmt.Sprintf("%f", defaultValue)
}

return c.Float64ConfigRequired(name, minValue, maxValue)
}

// Float64ConfigRequired validates and fetches the float64-typed required config value specified by 'name', returning an error
// if no value was provided in the configuration.
func (c Nodeconfig) Float64ConfigRequired(name string, minValue, maxValue float64) (float64, error) {
userValue, ok := c[name]
if !ok {
return 0, fmt.Errorf("missing config value [%s]", name)
}

f64Value, err := strconv.ParseFloat(userValue, 64)
if err != nil {
return 0, fmt.Errorf("expected float64 value for config [%s]", name)
}

if f64Value > maxValue || f64Value < minValue {
return 0, fmt.Errorf("config value [%s] requires value between [%f] and [%f]", name, minValue, maxValue)
}
return f64Value, nil
}
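
A minimal usage sketch of the two helpers follows, to show the default, out-of-range, and formatting behavior side by side. The `Nodeconfig` type is assumed here to be a `map[string]string`, since its definition is not part of this diff; the method bodies are copied from the hunk above, and the `main` function is purely illustrative.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// Nodeconfig is ASSUMED to be a string-to-string map; its definition is not in this diff.
type Nodeconfig map[string]string

// Float64Config falls back to defaultValue when the key is absent (copied from the diff).
func (c Nodeconfig) Float64Config(name string, defaultValue, minValue, maxValue float64) (float64, error) {
	if _, ok := c[name]; !ok {
		c[name] = fmt.Sprintf("%f", defaultValue)
	}
	return c.Float64ConfigRequired(name, minValue, maxValue)
}

// Float64ConfigRequired parses and range-checks the value (copied from the diff).
func (c Nodeconfig) Float64ConfigRequired(name string, minValue, maxValue float64) (float64, error) {
	userValue, ok := c[name]
	if !ok {
		return 0, fmt.Errorf("missing config value [%s]", name)
	}
	f64Value, err := strconv.ParseFloat(userValue, 64)
	if err != nil {
		return 0, fmt.Errorf("expected float64 value for config [%s]", name)
	}
	if f64Value > maxValue || f64Value < minValue {
		return 0, fmt.Errorf("config value [%s] requires value between [%f] and [%f]", name, minValue, maxValue)
	}
	return f64Value, nil
}

func main() {
	cfg := Nodeconfig{"histogram-max-bucket-sec": "-0.01"}

	// Key absent: the default is written back into the map as a string, then parsed.
	v, err := cfg.Float64Config("histogram-min-bucket-sec", 0.01, 0.01, math.MaxFloat64)
	fmt.Println(v, err, cfg["histogram-min-bucket-sec"])

	// Key present but below minValue: the range check rejects it.
	_, err = cfg.Float64Config("histogram-max-bucket-sec", 10, 0.01, math.MaxFloat64)
	fmt.Println(err != nil)

	// %f keeps six decimal places, so a very small default would be stored as zero.
	viaPercentF := fmt.Sprintf("%f", 1e-7)
	viaFormatFloat := strconv.FormatFloat(1e-7, 'f', -1, 64)
	fmt.Println(viaPercentF, viaFormatFloat)
}
```

The last two lines are a side note rather than a problem with this commit: the defaults used here (0.01 and whole seconds) survive the `%f` round trip, but a default below roughly half a microsecond would not.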
2 changes: 1 addition & 1 deletion node/elasticsearch/connectionfactory_int_test.go
@@ -17,7 +17,7 @@ func TestReconnect(t *testing.T) {
metrics.Init("elasticsearch")

metrics := &Metrics{}
metrics.RegisterElasticIndexMetrics()
metrics.RegisterElasticIndexMetrics(0.1, 20.0, 8)

cf := newEsBulkServiceFactory(context.TODO(), "http://localhost:9200", "", "", 3, 10000, metrics)

17 changes: 16 additions & 1 deletion node/elasticsearch/elasticsearch.go
@@ -82,9 +82,24 @@ func (i *Elasticsearch) Setup(cfgMap map[string]string) error {
return err
}

bulkProcessHistogramMin, err := config.Float64Config("histogram-min-bucket-sec", 0.01, 0.01, math.MaxFloat64)
if err != nil {
return err
}

bulkProcessHistogramMax, err := config.Float64Config("histogram-max-bucket-sec", float64(2*(bulkIndexTimeoutMs/1000)), 0.01, math.MaxFloat64)
if err != nil {
return err
}

bulkProcessingHistogramBuckets, err := config.IntConfig("histogram-bucket-count", 8, 1, math.MaxInt)
if err != nil {
return err
}

// initialize metrics
metrics := &Metrics{}
metrics.RegisterElasticIndexMetrics()
metrics.RegisterElasticIndexMetrics(bulkProcessHistogramMin, bulkProcessHistogramMax, bulkProcessingHistogramBuckets)

// service factory; in tests it must be prepopulated with a mock
if i.serviceFactory == nil {
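
One detail of the default for `histogram-max-bucket-sec` may be worth a second look: assuming `bulkIndexTimeoutMs` is an `int` (it is not declared in this hunk), `bulkIndexTimeoutMs/1000` is integer division, so sub-second remainders are dropped before the conversion to `float64`. The sketch below compares the expression as written with a floating-point variant; both function names are hypothetical and exist only for this comparison.

```go
package main

import "fmt"

// defaultMaxBucketAsWritten mirrors the expression used in Setup,
// assuming bulkIndexTimeoutMs is an int.
func defaultMaxBucketAsWritten(bulkIndexTimeoutMs int) float64 {
	return float64(2 * (bulkIndexTimeoutMs / 1000))
}

// defaultMaxBucketFloat is a hypothetical alternative that converts
// to float64 before dividing, so fractional seconds are preserved.
func defaultMaxBucketFloat(bulkIndexTimeoutMs int) float64 {
	return 2 * float64(bulkIndexTimeoutMs) / 1000
}

func main() {
	for _, ms := range []int{5000, 1500, 500} {
		fmt.Printf("timeout=%dms asWritten=%v float=%v\n",
			ms, defaultMaxBucketAsWritten(ms), defaultMaxBucketFloat(ms))
	}
}
```

For the default timeout of 5000 ms the two agree. For a timeout under 1000 ms the expression as written yields 0, which falls below the 0.01 minimum that `Float64Config` enforces, so `Setup` would presumably return a range error unless `histogram-max-bucket-sec` is set explicitly.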
18 changes: 16 additions & 2 deletions node/elasticsearch/elasticsearch_test.go
@@ -1,6 +1,7 @@
package elasticsearch

import (
"fmt"
"math"
"strconv"
"testing"
@@ -33,8 +34,21 @@ func TestSetup(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, "expected integer value for config [batch-size]", err.Error())

config["batch-size"] = "10" // clean up the prev err
config["batch-max-wait-ms"] = "-99999999" // less than minvalue
config["batch-size"] = "10" // clean up the prev err
config["histogram-min-bucket-sec"] = "aaaa" // not a float64
err = e.Setup(config)
assert.Error(t, err)
assert.Equal(t, "expected float64 value for config [histogram-min-bucket-sec]", err.Error())

config["histogram-min-bucket-sec"] = "0.01" // clean up the prev err
config["bulk-index-timeout-seconds"] = "20" // needed for calculating max bucket
config["histogram-max-bucket-sec"] = "-0.01" // value out of bounds
err = e.Setup(config)
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("config value [%s] requires value between [%f] and [%f]", "histogram-max-bucket-sec", 0.01, math.MaxFloat64), err.Error())

config["histogram-max-bucket-sec"] = "40.0" // clean up the prev err
config["batch-max-wait-ms"] = "-99999999" // less than minvalue
err = e.Setup(config)
assert.Error(t, err)
assert.Equal(t, "config value [batch-max-wait-ms] requires value between [1] and [2147483647]", err.Error())
12 changes: 10 additions & 2 deletions node/elasticsearch/metrics.go
@@ -18,8 +18,16 @@ type Metrics struct {
AvailableBatchRoutines prometheus.Gauge
}

// generateElasticsearchProcessTimeBuckets generates a bucket slice usable for bucketing the BulkProcessTime (or future
// prometheus.Histogram metrics)
func generateElasticsearchProcessTimeBuckets(min, max float64, count int) []float64 {
return prometheus.ExponentialBucketsRange(min, max, count)
}

// RegisterElasticIndexMetrics initializes metrics and registers them with the prometheus client.
func (m *Metrics) RegisterElasticIndexMetrics() {
// To support user-configurable bucketing of Histogram metrics, a min, max, and count value must be supplied for generating
// exponential buckets
func (m *Metrics) RegisterElasticIndexMetrics(min, max float64, count int) {
m.BulkErrors = *prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Get().AppMetricsPrefix,
@@ -43,7 +51,7 @@ func (m *Metrics) RegisterElasticIndexMetrics() {
Namespace: metrics.Get().AppMetricsPrefix,
Name: "bulk_process_time",
Help: "Time to write bulk logs to elasticsearch",
Buckets: prometheus.ExponentialBuckets(0.01, 3, 8),
Buckets: generateElasticsearchProcessTimeBuckets(min, max, count),
},
)

35 changes: 35 additions & 0 deletions node/elasticsearch/metrics_test.go
@@ -0,0 +1,35 @@
package elasticsearch

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_generateElasticsearchProcessTimeBuckets(t *testing.T) {
type args struct {
min float64
max float64
count int
}
tests := []struct {
name string
args args
wantCount int
}{
{
name: "creates expected number of default buckets",
args: args{
min: 0.01,
max: 10,
count: 8,
},
wantCount: 8,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.wantCount, len(generateElasticsearchProcessTimeBuckets(tt.args.min, tt.args.max, tt.args.count)), "generateElasticsearchProcessTimeBuckets(%v, %v, %v)", tt.args.min, tt.args.max, tt.args.count)
})
}
}
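
The table-driven test above asserts only on the number of buckets. If an ordering check were ever wanted without leaning on Prometheus behavior, one option is sketched below with the standard library only. Both `bucketsRange` (a stand-in for the wrapped Prometheus call) and `strictlyIncreasing` are hypothetical names. The second call models a user setting `histogram-max-bucket-sec` below `histogram-min-bucket-sec`, a combination that the independent range checks in `Setup` do not appear to reject.

```go
package main

import (
	"fmt"
	"math"
)

// bucketsRange is a hypothetical stand-in for the exponential range helper;
// it assumes min > 0 and count >= 1.
func bucketsRange(min, max float64, count int) []float64 {
	growth := math.Pow(max/min, 1/float64(count-1))
	out := make([]float64, count)
	for i := range out {
		out[i] = min * math.Pow(growth, float64(i))
	}
	return out
}

// strictlyIncreasing reports whether every boundary is larger than the one before it.
func strictlyIncreasing(buckets []float64) bool {
	for i := 1; i < len(buckets); i++ {
		if buckets[i] <= buckets[i-1] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(strictlyIncreasing(bucketsRange(0.01, 10, 8))) // min below max
	fmt.Println(strictlyIncreasing(bucketsRange(10, 0.01, 8))) // min above max
}
```

To my understanding, Prometheus histograms require boundaries in increasing order, so a check of this kind (or a `max > min` validation in `Setup`) would surface a misconfiguration as an error instead of at metric registration.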
