diff --git a/docs/node-elasticsearch.md b/docs/node-elasticsearch.md
index f8304cd..a81cc10 100644
--- a/docs/node-elasticsearch.md
+++ b/docs/node-elasticsearch.md
@@ -11,26 +11,29 @@ The `elasticsearch` node indexes documents to an Elasticsearch cluster. Its pa
 The IndexRequest type contains four fields:
 
-Field | Required | Default | Description
-------------------------|:--------:|---------|--------------
-Index | * | | Destination index name in Elasticsearch.
-MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type.
-DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID.
-Doc | * | | The document to index. This should be a struct that is marshallable to JSON.
+| Field | Required | Default | Description |
+|-------------|:--------:|---------|-----------------------------------------------------------------------------------------------------|
+| Index | * | | Destination index name in Elasticsearch. |
+| MappingType | | | In ES 7.x+, omit MappingType and ES will default to `_doc`. In ES 6.x-, specify your mapping type. |
+| DocID | | | Provide a document ID to index this document under, or leave nil and ES will assign an ID. |
+| Doc | * | | The document to index. This should be a struct that is marshallable to JSON. |
 
 Internally, `elasticsearch` uses the `BulkService` in the `olivere/elastic` client.
 
 ## Configuration
 
-Param | Required | Default | Description
---------------------------|:--------:|---------|--------------
-elastic-addr | * | | Comma-separated list of Elasticsearch client nodes.
-elastic-username | | | Username used in http/basic authentication.
-elastic-password | | | Password used in http/basic authentication.
-batch-size | | 100 | Wait until this many documents are collected and send them as a single batch.
-batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch.
-bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request.
-reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail.
-bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable.
-bulk-index-timeout-seconds| | 20 | Forcibly cancel individual bulk indexing operations after this time.
-index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch.
+| Param | Required | Default | Description |
+|----------------------------|:--------:|----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| elastic-addr | * | | Comma-separated list of Elasticsearch client nodes. |
+| elastic-username | | | Username used in http/basic authentication. |
+| elastic-password | | | Password used in http/basic authentication. |
+| batch-size | | 100 | Wait until this many documents are collected and send them as a single batch. |
+| batch-max-wait-ms | | 1000 | Max time, in ms, to wait for `batch-size` documents to be ready before sending a smaller batch. |
+| bulk-index-timeout-ms | | 5000 | Timeout passed to Elasticsearch along with the bulk index request. |
+| reconnect-batch-count | | 10000 | Reestablish connections to ES after this many batches. Useful to ensure that load remains distributed among ES client nodes as they are created or fail. |
+| bulk-index-max-retries | | 3 | Number of times to retry indexing errors. Mapping errors / field type conflicts are not retryable. |
+| bulk-index-timeout-seconds | | 20 | Forcibly cancel individual bulk indexing operations after this time. |
+| index-workers | | 1 | Number of goroutine workers to use to process batch indexing operations against Elasticsearch. |
+| histogram-min-bucket-sec | | 0.01 | Smallest bucket describing observed Elasticsearch client indexing time. |
+| histogram-max-bucket-sec | | 2 * (bulk-index-timeout-ms/1000) | Maximum bucket describing observed Elasticsearch client indexing time. |
+| histogram-bucket-count | | 8 | Number of buckets to generate between min and max histogram bucket seconds. |
 
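For illustration, a rough sketch of a payload for this node. The field types and the struct literal below are inferred from the table above, not copied from the actual `IndexRequest` definition, so treat them as assumptions:

```go
// Hypothetical sketch: field names come from the docs table; the concrete
// field types of IndexRequest are assumed, not taken from the source.
doc := struct {
	Level   string `json:"level"`
	Message string `json:"message"`
}{Level: "info", Message: "user logged in"}

req := IndexRequest{
	Index: "app-logs", // required: destination index name
	// MappingType omitted: on ES 7.x+ the server defaults to _doc.
	// DocID omitted: Elasticsearch assigns an ID.
	Doc: doc, // marshalled to JSON when the batch is flushed
}
_ = req // handed to the elasticsearch node for bulk indexing
```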
diff --git a/helpers.go b/helpers.go
index e4ad383..51b1d67 100644
--- a/helpers.go
+++ b/helpers.go
@@ -60,3 +60,35 @@ func (c Nodeconfig) StringConfigRequired(name string) (string, error) {
 	}
 	return userValue, nil
 }
+
+// Float64Config validates and fetches the float64-typed optional config value specified by 'name', using the 'defaultValue' if
+// no value was provided in the configuration. The default float64 (if used) is formatted with Go's default
+// precision and width for `%f` formatting.
+func (c Nodeconfig) Float64Config(name string, defaultValue float64, minValue float64, maxValue float64) (float64, error) {
+	// set the default value, if not provided
+	_, ok := c[name]
+	if !ok {
+		c[name] = fmt.Sprintf("%f", defaultValue)
+	}
+
+	return c.Float64ConfigRequired(name, minValue, maxValue)
+}
+
+// Float64ConfigRequired validates and fetches the float64-typed required config value specified by 'name', returning an error
+// if no value was provided in the configuration.
+func (c Nodeconfig) Float64ConfigRequired(name string, minValue, maxValue float64) (float64, error) {
+	userValue, ok := c[name]
+	if !ok {
+		return 0, fmt.Errorf("missing config value [%s]", name)
+	}
+
+	f64Value, err := strconv.ParseFloat(userValue, 64)
+	if err != nil {
+		return 0, fmt.Errorf("expected float64 value for config [%s]", name)
+	}
+
+	if f64Value > maxValue || f64Value < minValue {
+		return 0, fmt.Errorf("config value [%s] requires value between [%f] and [%f]", name, minValue, maxValue)
+	}
+	return f64Value, nil
+}
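A usage sketch for the new helpers, assuming `Nodeconfig` is the `map[string]string`-style type that the indexing in `helpers.go` implies, and that `fmt` and `math` are imported; the wrapper function is purely illustrative:

```go
func exampleFloat64Config() (float64, float64, error) { // hypothetical, not part of the patch
	cfg := Nodeconfig{"histogram-min-bucket-sec": "0.05"}

	// An explicit value is parsed and range-checked against minValue/maxValue.
	minBucket, err := cfg.Float64Config("histogram-min-bucket-sec", 0.01, 0.01, math.MaxFloat64)
	if err != nil {
		return 0, 0, err
	}

	// A missing key falls back to the default, which Float64Config also writes
	// back into cfg using %f formatting, e.g. "10.000000".
	maxBucket, err := cfg.Float64Config("histogram-max-bucket-sec", 10.0, 0.01, math.MaxFloat64)
	if err != nil {
		return 0, 0, err
	}

	fmt.Printf("histogram spans %.2fs to %.2fs\n", minBucket, maxBucket)
	return minBucket, maxBucket, nil
}
```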
diff --git a/node/elasticsearch/connectionfactory_int_test.go b/node/elasticsearch/connectionfactory_int_test.go
index 6425f44..f38ac61 100644
--- a/node/elasticsearch/connectionfactory_int_test.go
+++ b/node/elasticsearch/connectionfactory_int_test.go
@@ -17,7 +17,7 @@ func TestReconnect(t *testing.T) {
 	metrics.Init("elasticsearch")
 
 	metrics := &Metrics{}
-	metrics.RegisterElasticIndexMetrics()
+	metrics.RegisterElasticIndexMetrics(0.1, 20.0, 8)
 
 	cf := newEsBulkServiceFactory(context.TODO(), "http://localhost:9200", "", "", 3, 10000, metrics)
 
diff --git a/node/elasticsearch/elasticsearch.go b/node/elasticsearch/elasticsearch.go
index 2fd424c..90562d1 100644
--- a/node/elasticsearch/elasticsearch.go
+++ b/node/elasticsearch/elasticsearch.go
@@ -82,9 +82,24 @@ func (i *Elasticsearch) Setup(cfgMap map[string]string) error {
 		return err
 	}
 
+	bulkProcessHistogramMin, err := config.Float64Config("histogram-min-bucket-sec", 0.01, 0.01, math.MaxFloat64)
+	if err != nil {
+		return err
+	}
+
+	bulkProcessHistogramMax, err := config.Float64Config("histogram-max-bucket-sec", float64(2*(bulkIndexTimeoutMs/1000)), 0.01, math.MaxFloat64)
+	if err != nil {
+		return err
+	}
+
+	bulkProcessingHistogramBuckets, err := config.IntConfig("histogram-bucket-count", 8, 1, math.MaxInt)
+	if err != nil {
+		return err
+	}
+
 	// initialize metrics
 	metrics := &Metrics{}
-	metrics.RegisterElasticIndexMetrics()
+	metrics.RegisterElasticIndexMetrics(bulkProcessHistogramMin, bulkProcessHistogramMax, bulkProcessingHistogramBuckets)
 
 	// service factory; in tests it must be prepopulated with a mock
 	if i.serviceFactory == nil {
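To show how the new keys reach `Setup`, a minimal configuration sketch; the key names come from the docs table, `elastic-addr` is the only required key there, and `e` stands in for an already-constructed `*Elasticsearch` node value:

```go
cfg := map[string]string{
	"elastic-addr":             "http://localhost:9200",
	"histogram-min-bucket-sec": "0.05", // default 0.01
	"histogram-max-bucket-sec": "30",   // default 2 * (bulk-index-timeout-ms / 1000)
	"histogram-bucket-count":   "12",   // default 8
}

// Setup validates these values and registers the bulk_process_time histogram
// with buckets generated from them.
if err := e.Setup(cfg); err != nil {
	panic(err) // handle the configuration error as appropriate
}
```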
diff --git a/node/elasticsearch/elasticsearch_test.go b/node/elasticsearch/elasticsearch_test.go
index 209b78a..20d2eb9 100644
--- a/node/elasticsearch/elasticsearch_test.go
+++ b/node/elasticsearch/elasticsearch_test.go
@@ -1,6 +1,7 @@
 package elasticsearch
 
 import (
+	"fmt"
 	"math"
 	"strconv"
 	"testing"
@@ -33,8 +34,21 @@ func TestSetup(t *testing.T) {
 	assert.Error(t, err)
 	assert.Equal(t, "expected integer value for config [batch-size]", err.Error())
 
-	config["batch-size"] = "10"               // clean up the prev err
-	config["batch-max-wait-ms"] = "-99999999" // less than minvalue
+	config["batch-size"] = "10"                 // clean up the prev err
+	config["histogram-min-bucket-sec"] = "aaaa" // not a float64
+	err = e.Setup(config)
+	assert.Error(t, err)
+	assert.Equal(t, "expected float64 value for config [histogram-min-bucket-sec]", err.Error())
+
+	config["histogram-min-bucket-sec"] = "0.01"  // clean up the prev err
+	config["bulk-index-timeout-seconds"] = "20"  // needed for calculating max bucket
+	config["histogram-max-bucket-sec"] = "-0.01" // value out of bounds
+	err = e.Setup(config)
+	assert.Error(t, err)
+	assert.Equal(t, fmt.Sprintf("config value [%s] requires value between [%f] and [%f]", "histogram-max-bucket-sec", 0.01, math.MaxFloat64), err.Error())
+
+	config["histogram-max-bucket-sec"] = "40.0" // clean up the prev err
+	config["batch-max-wait-ms"] = "-99999999"   // less than minvalue
 	err = e.Setup(config)
 	assert.Error(t, err)
 	assert.Equal(t, "config value [batch-max-wait-ms] requires value between [1] and [2147483647]", err.Error())
diff --git a/node/elasticsearch/metrics.go b/node/elasticsearch/metrics.go
index 01614a5..47767db 100644
--- a/node/elasticsearch/metrics.go
+++ b/node/elasticsearch/metrics.go
@@ -18,8 +18,16 @@ type Metrics struct {
 	AvailableBatchRoutines prometheus.Gauge
 }
 
+// generateElasticsearchProcessTimeBuckets generates a bucket slice usable for bucketing the BulkProcessTime (or future
+// prometheus.Histogram metrics).
+func generateElasticsearchProcessTimeBuckets(min, max float64, count int) []float64 {
+	return prometheus.ExponentialBucketsRange(min, max, count)
+}
+
 // RegisterElasticIndexMetrics initializes metrics and registers them with the prometheus client.
-func (m *Metrics) RegisterElasticIndexMetrics() {
+// To support user-configurable bucketing of Histogram metrics, a min, max, and count value must be supplied for generating
+// exponential buckets.
+func (m *Metrics) RegisterElasticIndexMetrics(min, max float64, count int) {
 	m.BulkErrors = *prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: metrics.Get().AppMetricsPrefix,
@@ -43,7 +51,7 @@ func (m *Metrics) RegisterElasticIndexMetrics() {
 			Namespace: metrics.Get().AppMetricsPrefix,
 			Name:      "bulk_process_time",
 			Help:      "Time to write bulk logs to elasticsearch",
-			Buckets:   prometheus.ExponentialBuckets(0.01, 3, 8),
+			Buckets:   generateElasticsearchProcessTimeBuckets(min, max, count),
 		},
 	)
 
diff --git a/node/elasticsearch/metrics_test.go b/node/elasticsearch/metrics_test.go
new file mode 100644
index 0000000..fa82f57
--- /dev/null
+++ b/node/elasticsearch/metrics_test.go
@@ -0,0 +1,35 @@
+package elasticsearch
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_generateElasticsearchProcessTimeBuckets(t *testing.T) {
+	type args struct {
+		min   float64
+		max   float64
+		count int
+	}
+	tests := []struct {
+		name      string
+		args      args
+		wantCount int
+	}{
+		{
+			name: "creates expected number of default buckets",
+			args: args{
+				min:   0.01,
+				max:   10,
+				count: 8,
+			},
+			wantCount: 8,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t, tt.wantCount, len(generateElasticsearchProcessTimeBuckets(tt.args.min, tt.args.max, tt.args.count)), "generateElasticsearchProcessTimeBuckets(%v, %v, %v)", tt.args.min, tt.args.max, tt.args.count)
+		})
+	}
+}
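For reference, a small standalone program (not part of the patch) that prints the bucket upper bounds produced for the defaults above, i.e. min 0.01s, max 2 * (5000 / 1000) = 10s, and 8 buckets:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Defaults from the docs table: histogram-min-bucket-sec = 0.01,
	// histogram-max-bucket-sec = 2 * (bulk-index-timeout-ms / 1000) = 10,
	// histogram-bucket-count = 8.
	buckets := prometheus.ExponentialBucketsRange(0.01, 10, 8)

	// Prints eight upper bounds, exponentially spaced from 0.01 up to 10.
	fmt.Println(buckets)
}
```

Successive bounds grow by a constant factor of (10 / 0.01)^(1/7), roughly 2.7, so the spacing stays proportional across the whole range; the unit test above asserts only on the bucket count.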