Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrades for ES 2 #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 60 additions & 39 deletions elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
#! /usr/bin/python
#Copyright 2014 Jeremy Carroll
# Copyright 2014 Jeremy Carroll
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import collectd
import json
import urllib2
import socket
import collections
import copy

PREFIX = "elasticsearch"
ES_CLUSTER = "elasticsearch"
ES_HOST = "localhost"
ES_PORT = 9200
ES_VERSION = "1.0"
ES_VERSION = "1.0.0"
ES_URL = ""
VERBOSE_LOGGING = False

Expand All @@ -34,29 +34,29 @@

# DICT: ElasticSearch 1.0.0
STATS_ES1 = {
## STORE
# STORE
'indices.store.throttle-time': Stat("counter", "nodes.%s.indices.store.throttle_time_in_millis"),

##SEARCH
# SEARCH
'indices.search.open-contexts': Stat("gauge", "nodes.%s.indices.search.open_contexts"),

##CACHE
# CACHE
'indices.cache.field.eviction': Stat("counter", "nodes.%s.indices.fielddata.evictions"),
'indices.cache.field.size': Stat("bytes", "nodes.%s.indices.fielddata.memory_size_in_bytes"),
'indices.cache.filter.evictions': Stat("counter", "nodes.%s.indices.filter_cache.evictions"),
'indices.cache.filter.size': Stat("bytes", "nodes.%s.indices.filter_cache.memory_size_in_bytes"),

##GC
# GC
'jvm.gc.time': Stat("counter", "nodes.%s.jvm.gc.collectors.young.collection_time_in_millis"),
'jvm.gc.count': Stat("counter", "nodes.%s.jvm.gc.collectors.young.collection_count"),
'jvm.gc.old-time': Stat("counter", "nodes.%s.jvm.gc.collectors.old.collection_time_in_millis"),
'jvm.gc.old-count': Stat("counter", "nodes.%s.jvm.gc.collectors.old.collection_count"),

## FLUSH
# FLUSH
'indices.flush.total': Stat("counter", "nodes.%s.indices.flush.total"),
'indices.flush.time': Stat("counter", "nodes.%s.indices.flush.total_time_in_millis"),

## MERGES
# MERGES
'indices.merges.current': Stat("gauge", "nodes.%s.indices.merges.current"),
'indices.merges.current-docs': Stat("gauge", "nodes.%s.indices.merges.current_docs"),
'indices.merges.current-size': Stat("bytes", "nodes.%s.indices.merges.current_size_in_bytes"),
Expand All @@ -65,41 +65,46 @@
'indices.merges.total-size': Stat("bytes", "nodes.%s.indices.merges.total_size_in_bytes"),
'indices.merges.time': Stat("counter", "nodes.%s.indices.merges.total_time_in_millis"),

## REFRESH
# REFRESH
'indices.refresh.total': Stat("counter", "nodes.%s.indices.refresh.total"),
'indices.refresh.time': Stat("counter", "nodes.%s.indices.refresh.total_time_in_millis"),
}

# DICT: ElasticSearch 2.0.0
STATS_ES2 = copy.deepcopy(STATS_ES1)
del STATS_ES2['indices.cache.filter.evictions']
del STATS_ES2['indices.cache.filter.size']

# DICT: ElasticSearch 0.9.x
STATS_ES09 = {

##GC
# GC
'jvm.gc.time': Stat("counter", "nodes.%s.jvm.gc.collection_time_in_millis"),
'jvm.gc.count': Stat("counter", "nodes.%s.jvm.gc.collection_count"),

##CPU
# CPU
'process.cpu.percent': Stat("gauge", "nodes.%s.process.cpu.percent"),
}

# DICT: Common stuff
STATS = {

## DOCS
# DOCS
'indices.docs.count': Stat("gauge", "nodes.%s.indices.docs.count"),
'indices.docs.deleted': Stat("counter", "nodes.%s.indices.docs.deleted"),

## STORE
# STORE
'indices.store.size': Stat("bytes", "nodes.%s.indices.store.size_in_bytes"),

## INDEXING
# INDEXING
'indices.indexing.index-total': Stat("counter", "nodes.%s.indices.indexing.index_total"),
'indices.indexing.index-time': Stat("counter", "nodes.%s.indices.indexing.index_time_in_millis"),
'indices.indexing.delete-total': Stat("counter", "nodes.%s.indices.indexing.delete_total"),
'indices.indexing.delete-time': Stat("counter", "nodes.%s.indices.indexing.delete_time_in_millis"),
'indices.indexing.index-current': Stat("gauge", "nodes.%s.indices.indexing.index_current"),
'indices.indexing.delete-current': Stat("gauge", "nodes.%s.indices.indexing.delete_current"),

## GET
# GET
'indices.get.total': Stat("counter", "nodes.%s.indices.get.total"),
'indices.get.time': Stat("counter", "nodes.%s.indices.get.time_in_millis"),
'indices.get.exists-total': Stat("counter", "nodes.%s.indices.get.exists_total"),
Expand All @@ -108,7 +113,7 @@
'indices.get.missing-time': Stat("counter", "nodes.%s.indices.get.missing_time_in_millis"),
'indices.get.current': Stat("gauge", "nodes.%s.indices.get.current"),

## SEARCH
# SEARCH
'indices.search.query-current': Stat("gauge", "nodes.%s.indices.search.query_current"),
'indices.search.query-total': Stat("counter", "nodes.%s.indices.search.query_total"),
'indices.search.query-time': Stat("counter", "nodes.%s.indices.search.query_time_in_millis"),
Expand All @@ -117,14 +122,14 @@
'indices.search.fetch-time': Stat("counter", "nodes.%s.indices.search.fetch_time_in_millis"),

# JVM METRICS #
## MEM
# MEM
'jvm.mem.heap-committed': Stat("bytes", "nodes.%s.jvm.mem.heap_committed_in_bytes"),
'jvm.mem.heap-used': Stat("bytes", "nodes.%s.jvm.mem.heap_used_in_bytes"),
'jvm.mem.heap-used-percent': Stat("percent", "nodes.%s.jvm.mem.heap_used_percent"),
'jvm.mem.non-heap-committed': Stat("bytes", "nodes.%s.jvm.mem.non_heap_committed_in_bytes"),
'jvm.mem.non-heap-used': Stat("bytes", "nodes.%s.jvm.mem.non_heap_used_in_bytes"),

## THREADS
# THREADS
'jvm.threads.count': Stat("gauge", "nodes.%s.jvm.threads.count"),
'jvm.threads.peak': Stat("gauge", "nodes.%s.jvm.threads.peak_count"),

Expand Down Expand Up @@ -160,7 +165,8 @@ def lookup_stat(stat, json):

def configure_callback(conf):
"""Received configuration information"""
global ES_HOST, ES_PORT, ES_URL, ES_VERSION, VERBOSE_LOGGING, STATS_CUR
global ES_HOST, ES_PORT, ES_CLUSTER, ES_URL, ES_VERSION, VERBOSE_LOGGING, STATS_CUR

for node in conf.children:
if node.key == 'Host':
ES_HOST = node.values[0]
Expand All @@ -175,21 +181,33 @@ def configure_callback(conf):
else:
collectd.warning('elasticsearch plugin: Unknown config key: %s.'
% node.key)
if ES_VERSION == "1.0":

major, _, _ = ES_VERSION.split('.')
major = int(major)

if major >= 1 and major < 2:
ES_URL = "http://" + ES_HOST + ":" + str(ES_PORT) + "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool"
STATS_CUR = dict(STATS.items() + STATS_ES1.items())
elif major >= 2 and major < 3:
ES_URL = "http://" + ES_HOST + ":" + str(ES_PORT) + "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool"
STATS_CUR = dict(STATS.items() + STATS_ES2.items())
else:
ES_URL = "http://" + ES_HOST + ":" + str(ES_PORT) + "/_cluster/nodes/_local/stats?http=true&process=true&jvm=true&transport=true&thread_pool=true"
STATS_CUR = dict(STATS.items() + STATS_ES09.items())

# add info on thread pools
for pool in ['generic', 'index', 'get', 'snapshot', 'merge', 'optimize', 'bulk', 'warmer', 'flush', 'search', 'refresh']:
for attr in ['threads', 'queue', 'active', 'largest']:
path = 'thread_pool.{0}.{1}'.format(pool, attr)
STATS_CUR[path] = Stat("gauge", 'nodes.%s.{0}'.format(path))
for attr in ['completed', 'rejected']:
path = 'thread_pool.{0}.{1}'.format(pool, attr)
STATS_CUR[path] = Stat("counter", 'nodes.%s.{0}'.format(path))
if major == 2:
pools = ['generic', 'index', 'get', 'snapshot', 'force_merge', 'bulk', 'warmer', 'flush', 'search', 'refresh', 'fetch_shard_started', 'fetch_shard_store', 'listener', 'management', 'percolate', 'suggest']
else:
pools = ['generic', 'index', 'get', 'snapshot', 'merge', 'optimize', 'bulk', 'warmer', 'flush', 'search', 'refresh']

for pool in pools:
for attr in ['threads', 'queue', 'active', 'largest']:
path = 'thread_pool.{0}.{1}'.format(pool, attr)
STATS_CUR[path] = Stat("gauge", 'nodes.%s.{0}'.format(path))
for attr in ['completed', 'rejected']:
path = 'thread_pool.{0}.{1}'.format(pool, attr)
STATS_CUR[path] = Stat("counter", 'nodes.%s.{0}'.format(path))

log_verbose('Configured with version=%s, host=%s, port=%s, url=%s' % (ES_VERSION, ES_HOST, ES_PORT, ES_URL))

Expand All @@ -202,9 +220,10 @@ def fetch_stats():
except urllib2.URLError, e:
collectd.error('elasticsearch plugin: Error connecting to %s - %r' % (ES_URL, e))
return None
print result['cluster_name']

ES_CLUSTER = result['cluster_name']
log_verbose('elasticsearch cluster: %s' % ES_CLUSTER)

return parse_stats(result)


Expand All @@ -220,6 +239,7 @@ def dispatch_stat(result, name, key):
if result is None:
collectd.warning('elasticsearch plugin: Value not found for %s' % name)
return

estype = key.type
value = int(result)
log_verbose('Sending value[%s]: %s=%s' % (estype, name, value))
Expand All @@ -234,7 +254,7 @@ def dispatch_stat(result, name, key):

def read_callback():
log_verbose('Read callback called')
stats = fetch_stats()
fetch_stats()


def dig_it_up(obj, path):
Expand All @@ -251,5 +271,6 @@ def log_verbose(msg):
return
collectd.info('elasticsearch plugin [verbose]: %s' % msg)


collectd.register_config(configure_callback)
collectd.register_read(read_callback)