
Commit

Merge pull request #10 from kdrakon/1.1.0/fixing-deadlock
1.1.0/fixing deadlock
Sean Policarpio authored Jun 30, 2017
2 parents 3d0ae9d + bf98fa2 commit 52488fc
Showing 11 changed files with 82 additions and 182 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -3,8 +3,8 @@ sudo: false
language: scala

scala:
- 2.11.8
- 2.12.1
- 2.11.11
- 2.12.2

jdk:
- oraclejdk8
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,14 @@
# Changelog

## 1.1.0
### Fixed
- fixed a deadlock caused by a hidden synchronized block in ch.qos.logback.core.AppenderBase
- the deadlock is resolved by removing the blocking queue for logging events; as a result, the SplunkHecAppenderStrategy implementations are no longer needed and have been removed
### Changed
- added Monix circuit breakers around consuming the log event stream and pushing data to Splunk via HTTP
- updated Monix to 2.3.0
- cross compiling to Scala 2.11.11 and 2.12.2

## 1.0.3
- Updated dependencies to latest versions.

25 changes: 1 addition & 24 deletions README.md
@@ -10,9 +10,8 @@ Some of the inspiration for this appender is based on the capabilities of the [l
It is implemented using the principles of reactive streams. This was very straightforwardly done using the [Monix library](https://monix.io/).

## Compatibility
- developed using **Scala 2.11.8**
- due to the use of the Skinny Framework's HTTP client, the minimum **Java** version is **8**. However, I haven't tested the appender in a Java project yet. Technically, it should be compatible, but please let me know if it definitely works.
- I have tested the appender against an Enterprise Splunk Cloud HTTP Event Collector (*version 6.4.1.2*).
- I have tested the appender against the Enterprise Splunk Cloud HTTP Event Collector (*version 6.6.1*).

## Configuration
### Sample XML Configuration
@@ -56,9 +55,6 @@ It is implemented using the principles of reactive streams. This was very straig
- `<token>`
- The token that authorizes posting to the HEC endpoint
- e.g. _1234-5678-91011-ABC-321_
- `<queue>`
- A maximum queue size for log messages to be stored in memory. If this fills up and log messages are not posted to Splunk in a timely manner, then the queue will by default cause blocking to occur on subsequent appends (see [alternatives](#splunkhecappenderstrategy) to this below).
- 1000 *(default)*
- `<buffer>`
- Log messages are buffered in memory off of the queue. Once a buffer is filled, logs are instantly posted to the HEC endpoint. This size also signifies the maximum payload size sent to the endpoint.
- 25 *(default)*
@@ -120,25 +116,6 @@ package object json {

#### Custom Layout
You can override the layout with a class extending either `SplunkHecJsonLayout`, `SplunkHecJsonLayoutBase`, or `LayoutBase[ILoggingEvent]`. Then `<layout>` can be specified in the `<appender>` section — see the XML example above.

### SplunkHecAppenderStrategy

By default, `SplunkHecAppender` will use the calling thread (i.e. a thread from your application) when queueing new log messages onto the internal queue. If that queue happens to be full, possibly because the Splunk HEC API requests are taking too long, the calling thread will block until space becomes available. This default behaviour is defined by the _appender strategy_ `BlockingSplunkHecAppenderStrategy`. There are two more strategies:

- `AsyncSplunkHecAppenderStrategy`
- This strategy uses an internal executor service (a `ForkJoinPool`) to asynchronously enqueue new log events. If the queue is full, your application's thread won't block. However, this means that background queue tasks could pile up if the queue is not drained fast enough (again, usually due to slowness with HTTP requests to the Splunk HEC). By default, the executor service's parallelism factor is equal to the number of CPU cores available to the VM. This can be overridden with the `parallelism` setting.
```xml
<!-- add this to your appender section -->
<appenderStrategy class="io.policarp.logback.AsyncSplunkHecAppenderStrategy">
<parallelism>8</parallelism>
</appenderStrategy>
```
- `SpillingSplunkHecAppenderStrategy`
- This strategy simply drops logging events when the internal queue is full. Logging events will resume streaming to the Splunk HEC once the queue has capacity again. However, due to the concurrent nature of the internal queue, the capacity check can't be guaranteed, so edge cases around the queue's capacity can cause some blocking of your application's calling thread.
```xml
<!-- add this to your appender section -->
<appenderStrategy class="io.policarp.logback.SpillingSplunkHecAppenderStrategy"/>
```

## HTTP Client
The base implementation uses the [skinny-framework's HTTP client](https://github.com/skinny-framework/skinny-framework). It is a tiny library and does not bring with it many dependencies. `SplunkHecAppender` uses `SkinnyHttpHecClient` for HTTP communication.
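
For reference, the appender and its HTTP transport are composed by mixing a `SplunkHecClient` implementation into `SplunkHecAppenderBase`; the default appender is a one-line composition (see SplunkHecAppender.scala further down in this commit). The sketch below restates that wiring using the names from this commit's imports; the custom class name is illustrative only.

```scala
// Default composition, as in SplunkHecAppender.scala in this commit:
// appender behaviour comes from SplunkHecAppenderBase, HTTP transport from SkinnyHecClient.
import io.policarp.logback.SplunkHecAppenderBase
import io.policarp.logback.hec.skinnyhttp.SkinnyHecClient

// Equivalent to the library's own SplunkHecAppender (the name here is illustrative).
class MySplunkAppender extends SplunkHecAppenderBase with SkinnyHecClient

// A hypothetical custom transport would implement io.policarp.logback.hec.SplunkHecClient
// and be mixed in here in place of SkinnyHecClient.
```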
8 changes: 4 additions & 4 deletions build.sbt
@@ -1,7 +1,7 @@
name := """splunk-logback-hec-appender"""
version := "1.0.3"
scalaVersion := "2.12.1"
crossScalaVersions := Seq("2.11.8", "2.12.1")
version := "1.1.0"
scalaVersion := "2.12.2"
crossScalaVersions := Seq("2.11.11", "2.12.2")
organization := "io.policarp"
homepage := Some(url("https://github.com/kdrakon/splunk-logback-hec-appender"))
scmInfo := Some(
@@ -19,7 +19,7 @@ libraryDependencies ++= Seq(

"org.json4s" %% "json4s-native" % "3.5.0",

"io.monix" %% "monix" % "2.1.1",
"io.monix" %% "monix" % "2.3.0",

"ch.qos.logback" % "logback-core" % "1.1.7",
"ch.qos.logback" % "logback-classic" % "1.1.7",
49 changes: 32 additions & 17 deletions src/main/scala/io/policarp/logback/SplunkHecAppender.scala
@@ -1,15 +1,16 @@
package io.policarp.logback

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ConcurrentLinkedQueue

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase
import io.policarp.logback.hec.SplunkHecClient
import io.policarp.logback.hec.skinnyhttp.{ SkinnyHecClient, SkinnyHttpLogFilter }
import monix.eval.Task
import io.policarp.logback.hec.skinnyhttp.{SkinnyHecClient, SkinnyHttpLogFilter}
import monix.eval.{Task, TaskCircuitBreaker}
import monix.execution.Scheduler
import monix.reactive.{ Consumer, Observable }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import monix.execution.atomic.AtomicLong
import monix.reactive.{Consumer, Observable}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

import scala.beans.BeanProperty
import scala.concurrent.duration._
@@ -21,33 +22,38 @@ class SplunkHecAppender extends SplunkHecAppenderBase with SkinnyHecClient {
trait SplunkHecAppenderBase extends AppenderBase[ILoggingEvent] {
self: SplunkHecClient =>

@BeanProperty var queue: Int = 1000
@BeanProperty var buffer: Int = 25
@BeanProperty var flush: Int = 30
@BeanProperty var parallelism: Int = Runtime.getRuntime.availableProcessors()
@BeanProperty var layout: SplunkHecJsonLayoutBase = SplunkHecJsonLayout()
@BeanProperty var appenderStrategy: SplunkHecAppenderStrategy = BlockingSplunkHecAppenderStrategy()

private implicit val scheduler = Scheduler.computation(parallelism)

private implicit lazy val logPublisher = new LogPublisher(queue)
private implicit lazy val logPublisher = new LogPublisher()
private lazy val logStream = Observable.fromReactivePublisher(logPublisher).bufferTimedAndCounted(flush seconds, buffer)
private lazy val logConsumer = Consumer.foreachParallelAsync[Task[Unit]](parallelism)(task => task)

override def start() = {
super.start()
implicit val impliedLayout = layout
logStream.map(postTask).runWith(logConsumer).runAsync
logStream.map(postTask).consumeWith(logConsumer).runAsync
}

override def append(event: ILoggingEvent) = appenderStrategy.append(event)
override def append(event: ILoggingEvent) = logPublisher.enqueue(event)
}

private[logback] class LogPublisher(queueSize: Int)(implicit scheduler: Scheduler) extends Publisher[ILoggingEvent] {
private[logback] class LogPublisher()(implicit scheduler: Scheduler) extends Publisher[ILoggingEvent] {

val logQueue = new LinkedBlockingQueue[ILoggingEvent](queueSize)
val logQueue = new ConcurrentLinkedQueue[ILoggingEvent]()

def enqueue(event: ILoggingEvent) = logQueue.put(event)
private val LogPublisherCircuitBreaker = TaskCircuitBreaker (
maxFailures = 5,
resetTimeout = 10 nanos,
exponentialBackoffFactor = 2,
maxResetTimeout = 1 minute
)

def enqueue(event: ILoggingEvent) = logQueue.add(event)

override def subscribe(subscriber: Subscriber[_ >: ILoggingEvent]) = {
subscriber.onSubscribe(new Subscription {
@@ -57,11 +63,20 @@ private[logback] class LogPublisher(queueSize: Int)(implicit scheduler: Schedule
override def cancel() = { cancelled = true }

override def request(n: Long) = if (!cancelled) {
Task {
for (i <- 1L to n) {
subscriber.onNext(logQueue.take())
val requests = AtomicLong(n)
val pollingTask = Task {
val currentRequests = requests.get
for (_ <- 1L to currentRequests) {
Option(logQueue.poll()) match {
case None =>
throw new IllegalStateException("logQueue is empty") // so trigger circuit breaker
case Some(event) =>
subscriber.onNext(event)
requests.decrement()
}
}
}.runAsync
}
LogPublisherCircuitBreaker.protect(pollingTask).onErrorRestartIf(_ => requests.get != 0).runAsync
}
})
}
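
For context, here is a condensed, standalone sketch of the new non-blocking publish path introduced above: appends only add to a `ConcurrentLinkedQueue`, and a circuit-breaker-protected task drains the queue, restarting on failure while demand remains. Names and timing values below are illustrative; the authoritative code is the diff above.

```scala
// Condensed sketch of the new publish path (illustrative; see the diff above for the real code).
import java.util.concurrent.ConcurrentLinkedQueue

import scala.concurrent.Await
import scala.concurrent.duration._

import monix.eval.{ Task, TaskCircuitBreaker }
import monix.execution.Scheduler
import monix.execution.atomic.AtomicLong

object NonBlockingDrainSketch extends App {

  implicit val scheduler: Scheduler = Scheduler.computation()

  // Appends go straight into an unbounded, non-blocking queue.
  val logQueue = new ConcurrentLinkedQueue[String]()
  logQueue.add("log event 1")
  logQueue.add("log event 2")

  // Illustrative settings; the commit uses maxFailures = 5, resetTimeout = 10 nanos,
  // exponentialBackoffFactor = 2, maxResetTimeout = 1 minute.
  val breaker = TaskCircuitBreaker(
    maxFailures = 5,
    resetTimeout = 10.millis,
    exponentialBackoffFactor = 2,
    maxResetTimeout = 1.minute
  )

  // Outstanding demand from the subscriber (request(n) in the Publisher).
  val requests = AtomicLong(2L)

  val pollingTask = Task {
    for (_ <- 1L to requests.get) {
      Option(logQueue.poll()) match {
        case None =>
          throw new IllegalStateException("logQueue is empty") // trips the breaker
        case Some(event) =>
          println(s"onNext($event)")
          requests.decrement()
      }
    }
  }

  // Retry the protected task until all requested events have been delivered.
  val drained = breaker.protect(pollingTask).onErrorRestartIf(_ => requests.get != 0).runAsync
  Await.result(drained, 5.seconds)
}
```

Compared to the old `LinkedBlockingQueue.put`, the enqueue side can no longer block the application's logging thread; when the queue is empty, the draining side backs off via the circuit breaker's reset timeouts instead.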
70 changes: 0 additions & 70 deletions src/main/scala/io/policarp/logback/SplunkHecAppenderStrategy.scala

This file was deleted.

21 changes: 17 additions & 4 deletions src/main/scala/io/policarp/logback/hec/SplunkHecClient.scala
@@ -6,12 +6,14 @@ import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.LayoutBase
import ch.qos.logback.core.filter.Filter
import ch.qos.logback.core.spi.FilterReply
import monix.eval.Task
import skinny.http.{ HTTP, Request }
import monix.eval.{Task, TaskCircuitBreaker}
import skinny.http.{HTTP, Request}

import scala.beans.BeanProperty
import scala.concurrent.duration._

trait SplunkHecClient {
import SplunkHecClient._

@BeanProperty var splunkUrl: String = ""
@BeanProperty var token: String = ""
@@ -30,13 +32,24 @@ trait SplunkHecClient {
* @param layout the layout to use when posting the events
* @return the Monix Task will asynchronously perform the job of posting the logs and will return Unit
*/
def postTask(events: Seq[ILoggingEvent])(implicit layout: LayoutBase[ILoggingEvent]): Task[Unit] = Task {
prepareRequest(events, layout).foreach(executeRequest)
def postTask(events: Seq[ILoggingEvent])(implicit layout: LayoutBase[ILoggingEvent]): Task[Unit] = {
SplunkHecClientCircuitBreaker.protect(
Task {
prepareRequest(events, layout).foreach(executeRequest)
}
)
}
}

object SplunkHecClient {

private[hec] val SplunkHecClientCircuitBreaker = TaskCircuitBreaker (
maxFailures = 5,
resetTimeout = 5 seconds,
exponentialBackoffFactor = 2,
maxResetTimeout = 10 minute
)

/**
* Creates a newline separated list of individual Splunk JSON events
*/
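
To show how these protected tasks are actually executed, here is a small standalone sketch of the consuming pipeline, mirroring `logStream.map(postTask).consumeWith(logConsumer).runAsync` from SplunkHecAppender.scala above. The `postTask` stand-in below is a simplification; the real one posts to the HEC endpoint inside the `SplunkHecClientCircuitBreaker`.

```scala
// Sketch of the consuming pipeline (illustrative names; the parallelism value is arbitrary).
import scala.concurrent.Await
import scala.concurrent.duration._

import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.{ Consumer, Observable }

object PostPipelineSketch extends App {

  implicit val scheduler: Scheduler = Scheduler.computation()

  // Stand-in for SplunkHecClient.postTask: each buffered batch becomes a Task[Unit].
  def postTask(batch: Seq[String]): Task[Unit] =
    Task(println(s"posting ${batch.size} events"))

  val batches  = Observable.fromIterable(Seq(Seq("a", "b"), Seq("c")))
  val consumer = Consumer.foreachParallelAsync[Task[Unit]](2)(task => task)

  // Same shape as logStream.map(postTask).consumeWith(logConsumer).runAsync in the appender.
  Await.result(batches.map(postTask).consumeWith(consumer).runAsync, 5.seconds)
}
```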
6 changes: 3 additions & 3 deletions src/test/resources/logback-test.xml
@@ -9,12 +9,12 @@

<appender name="splunk" class="io.policarp.logback.SplunkHecAppender">

<splunkUrl>htttp://somewhere.com/</splunkUrl>
<!--use start-local-splunk.sh-->
<splunkUrl>http://localhost:8088/services/collector/event</splunkUrl>
<token>A106745E-0F59-40FE-AF3A-4BAE49759791</token>
<buffer>10</buffer>
<flush>10</flush>

<appenderStrategy class="io.policarp.logback.AsyncSplunkHecAppenderStrategy"/>

<layout class="io.policarp.logback.SplunkHecJsonLayout">
<source>testing-sean</source>
<custom>user=${USER}</custom>
