Skip to content
This repository has been archived by the owner on Jul 14, 2024. It is now read-only.

Monitoring Component Assessment

Victor Voisin edited this page Mar 20, 2014 · 1 revision

## Riemann

RiemannEventTest sends three events to Riemann. With the configuration below, Riemann reports a count of 2 for the query ?name=Lyon and a count of 1 for ?name=Paris.

# Launch Riemann with the given configuration file
bin/riemann etc/riemann.config
# Alternatively, run it from a source checkout (Leiningen)
lein run

## Logstash

To check how Logstash reads MongoDB log files, use simple-mongo-cluster to quickly deploy a MongoDB cluster. You can then run mongostat as follows to check the status of the cluster: mongostat --host localhost:24000 --discover --noheaders -n 2 30 > mongostat.log

The --noheaders and -n options do not work together with --discover in version 2.4.9. This is fixed in master.

# Launch the Logstash agent with the configuration in logstash.conf
# (sudo is presumably needed to read /var/log/syslog; confirm for your system)
sudo java -jar logstash-1.3.3-flatjar.jar agent -f logstash.conf

## Configuration

Basic configuration examples.

#### Logstash

# Event sources. Each source sets a distinct "type" so that the filter
# section can select events by origin.
# NOTE(review): the Logstash file input documentation says paths must be
# absolute; confirm the relative paths below resolve as intended.
input {
  # Local system log.
  file {
    type => "linux-syslog"
    path => ["/var/log/syslog"]
  }

  # TCP listener on port 9999 for aggregate events sent by Riemann.
  tcp {
    type => "Riemann"
    port => 9999
  }

  # Log files of the cluster deployed with simple-mongo-cluster:
  # config servers, replica-set members and the mongos router.
  file {
    type => "mongoconf"
    path => ["./simple-mongo-cluster/mongodb/conf/conf*.log"]
  }
  file {
    type => "mongod"
    path => ["./simple-mongo-cluster/mongodb/rs*/rs*.log"]
  }
  file {
    type => "mongos"
    path => ["./simple-mongo-cluster/mongodb/mongos.log"]
  }

  # Output of the mongostat command, redirected to a file.
  file {
    type => "mongostat"
    path => ["./mongostat.log"]
  }
}
filter {
    # Parse mongostat output lines into named fields. The custom patterns
    # (METRIC, COMMAND, SIZE, MONGOTYPE1, MONGOTYPE2) are loaded from ./patterns.
    if [type] == "mongostat" {
        grok {
            patterns_dir => "./patterns"
            match => ["message", "%{HOSTNAME:host}:%{INT:port}%{SPACE}%{METRIC:insert}%{SPACE}%{METRIC:query}%{SPACE}%{METRIC:update}%{SPACE}%{METRIC:delete}%{SPACE}%{METRIC:getmore}%{SPACE}%{COMMAND:command}%{MONGOTYPE1}%{SIZE:vsize}%{SPACE}%{SIZE:res}%{SPACE}%{NUMBER:fault}%{MONGOTYPE2}%{SIZE:netIn}%{SPACE}%{SIZE:netOut}%{SPACE}%{NUMBER:connections}%{SPACE}%{USERNAME:replicaset}%{SPACE}%{WORD:replicaMember}%{SPACE}%{TIME:time}"]
        }
    }

    # Drop events that grok could not parse. [tags] is an array, so membership
    # must be tested with `in`; comparing it to a string with == never matches.
    if "_grokparsefailure" in [tags] {
        drop { }
    }

    # Drop events whose message is empty.
    if [message] == "" {
        drop { }
    }
}
# Destinations: every event is sent to both outputs.
output {
  # Print events on the console.
  stdout { }

  # Index events into Elasticsearch over HTTP on the local machine.
  elasticsearch_http {
    host => "127.0.0.1"
  }
}

Custom grok patterns, stored in the patterns directory (the patterns_dir of the grok filter above). They are used to parse mongostat output.

# Custom grok patterns used to parse mongostat output lines.

# A number, optionally prefixed with a literal asterisk.
METRIC (\*%{NUMBER})|(%{NUMBER})
# Either two numbers separated by a pipe (NUMBER|NUMBER) or a single number.
COMMAND (%{NUMBER}\|%{NUMBER})|(%{NUMBER})
# A number, optionally followed by one lowercase letter.
SIZE (%{NUMBER}[a-z])|(%{NUMBER})
# A word, a colon, a number and a percent sign.
LOCKEDDB (%{WORD}\:%{NUMBER}%)
# Optional column group: either lockedDb, indexMissedPercent, QrQw and ArAw
# separated by whitespace, or whitespace only when the columns are absent.
MONGOTYPE2 (%{SPACE}%{LOCKEDDB:lockedDb}%{SPACE}%{NUMBER:indexMissedPercent}%{SPACE}%{COMMAND:QrQw}%{SPACE}%{COMMAND:ArAw}%{SPACE})|%{SPACE}
# Optional column group: either flushes and mapped separated by whitespace,
# or whitespace only when the columns are absent.
MONGOTYPE1 (%{SPACE}%{NUMBER:flushes}%{SPACE}%{SIZE:mapped}%{SPACE})|%{SPACE}

#### Riemann

; -*- mode: clojure; -*-
; vim: filetype=clojure

; Log to the file riemann.log.
(logging/init :file "riemann.log")

; Bind the TCP (5555), UDP (5555) and websocket (5556) servers to the
; local interface only.
(let [bind-addr "127.0.0.1"]
  (tcp-server :host bind-addr)
  (udp-server :host bind-addr)
  (ws-server  :host bind-addr))

; Sweep expired events out of the index every 5 seconds.
(periodically-expire 5)

  (let [index (default :ttl 3 (index))]
  
  
  ; Stream that emits, once per interval, the SUM of the :metric values
  ; received during that interval (the sum is not divided by the interval).
  ;   interval - window length; must be greater than 0.
  ;   children - child streams that receive each emitted event.
  ; Returns the stream function that is applied to every incoming event.
  (defn newrate
  "Take the sum of every event over interval seconds. 
   Emits one event every interval seconds. Starts as soon as an event is
received, stops when an expired event arrives. Uses the most recently
received event with a metric as a template. Event ttls decrease constantly if
no new events arrive."
  [interval & children]
  (assert (< 0 interval))
  (let [last-event (atom nil)
        ; Pair (current previous): the running total of the interval in
        ; progress and the total of the last completed interval.
        sum (atom '(0 0))

        ; Add addend to the running total; previous is left untouched.
        add-sum (fn add-sum [[current previous] addend]
                              (list (+ current addend) previous))
        ; Close the interval: the running total becomes previous and the
        ; running total restarts at 0.
        swap-sum (fn swap-sum [[current previous]]
                               (list 0 current))

        ; Build the outgoing event from template e: set :metric to the sum,
        ; set :time to the current time and, when e has a :ttl, reduce it by
        ; interval.
        swap-event (fn swap-event [e sum]
                     (let [e (merge e {:metric sum 
                                       :time (unix-time)})]
                       (if-let [ttl (:ttl e)]
                         (assoc e :ttl (- ttl interval))
                         e)))

        ; Called by the poller. After swap-sum, the second element of the
        ; pair is the total of the interval that just ended. The emitted
        ; event is stored back into last-event, so without new input its
        ; :ttl shrinks by interval on every tick.
        tick (bound-fn tick []
               ; Get last metric
               (let [sum (second (swap! sum swap-sum))
                     event (swap! last-event swap-event sum)]
                 ; Forward event to children.
                 (call-rescue event children)))

        ; NOTE(review): presumably runs tick every interval seconds, after an
        ; initial delay of interval, until an expired event arrives; confirm
        ; against riemann.streams/periodically-until-expired.
        poller (periodically-until-expired interval interval tick)]

    (fn rate' [event]
      ; Only events carrying a :metric update the template and the total.
      (when-let [m (:metric event)]
        ; TTLs decay by interval when emitted, so we add interval once.
        ; That way, incoming and outgoing TTLs, under constant event flow, are
        ; the same.
        (reset! last-event
                (if-let [ttl (:ttl event)]
                  (assoc event :ttl (+ ttl interval))
                  event))
        (swap! sum add-sum m))
      ; Every event, with or without a metric, is passed to the poller.
      (poller event))))
  
  ; Send aggregate events to Logstash.
  ; Requires a Riemann build that includes this PR:
  ; https://github.com/aphyr/riemann/pull/341
  ; The client connects to the Logstash tcp input on port 9999. The loopback
  ; address is used as the destination: "0.0.0.0" is a wildcard bind address
  ; and is not a portable address to connect to.
  (def logclient (logstash {:host "127.0.0.1" :port 9999}))
  
  ; Count how many times each query is made.
  ; Queries are assumed to be carried in the :description field.
  (streams
    (where (service "queryLogging")
      ; Log every matching event.
      (fn [event] (info event))
      ; Keep one independent counter per distinct description; each counter
      ; emits its sum every 20 seconds and the result is sent to Logstash.
      (by [:description]
        (newrate 20
          (fn [aggregate] (logclient aggregate))))))
  

  ; Every inbound event is handed to each of the streams below.
  (streams

    ; Put each event into the index right away.
    index

    ; Overall event throughput: rewrite each event to a metric of 1 under the
    ; "events/sec" service and index the rate computed over 5-second intervals.
    (with {:metric 1 :host nil :state "ok" :service "events/sec"}
      (rate 5 index))

    ; Write expired events to the log.
    (expired
      #(info "expired" %))
))