diff --git a/.travis.yml b/.travis.yml
index f2383b0..92e1965 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,8 +3,8 @@ sudo: false
 language: scala
 
 scala:
-  - 2.11.8
-  - 2.12.1
+  - 2.11.11
+  - 2.12.2
 
 jdk:
   - oraclejdk8
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 792e018..928d64c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,14 @@
 # Changelog
 
+## 1.1.0
+### Fixed
+- fixed a deadlock caused by an unseen synchronized block in ch.qos.logback.core.AppenderBase
+- the deadlock fix removes the blocking queue for logging events; as a result, the SplunkHecAppenderStrategy implementations are no longer needed and have been removed
+### Changed
+- added Monix circuit breakers around consuming the log event stream and pushing data to Splunk via HTTP
+- updated Monix to 2.3.0
+- cross-compiling to Scala 2.11.11 and 2.12.2
+
 ## 1.0.3
 - Updated dependencies to latest versions.
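A note on the circuit-breaker entries above: the pattern this release adopts is Monix's `TaskCircuitBreaker`, used in both `LogPublisher` and `SplunkHecClient` in the diffs below. A minimal sketch of the usage (the object name is illustrative; the settings mirror the client's):

```scala
object CircuitBreakerSketch {
  import monix.eval.{Task, TaskCircuitBreaker}
  import scala.concurrent.duration._

  // After maxFailures consecutive failures the breaker opens and fails fast;
  // after resetTimeout it lets one test call through, multiplying the timeout
  // by exponentialBackoffFactor on repeated failure, up to maxResetTimeout.
  val breaker = TaskCircuitBreaker(
    maxFailures = 5,
    resetTimeout = 5.seconds,
    exponentialBackoffFactor = 2,
    maxResetTimeout = 10.minutes
  )

  // Any Task can be guarded the way postTask and the queue-polling task now are.
  def guarded[A](task: Task[A]): Task[A] = breaker.protect(task)
}
```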
diff --git a/README.md b/README.md
index 1681871..dea0afc 100644
--- a/README.md
+++ b/README.md
@@ -10,9 +10,8 @@ Some of the inspiration for this appender is based on the capabilities of the [l
 It is implemented using the principles of reactive streams. This was very straightforwardly done using the [Monix library](https://monix.io/).
 
 ## Compatability
-- developed using **Scala 2.11.8**
 - due to the use of the Skinny Framework's HTTP client, the minimum **Java** version is **8**. However, I haven't tested the appender in a Java project yet. Technically, it should be compatible, but please let me know if it definitely works.
-- I have tested the appender against an Enterprise Splunk Cloud HTTP Event Collector (*version 6.4.1.2*).
+- I have tested the appender against the Enterprise Splunk Cloud HTTP Event Collector (*version 6.6.1*).
 
 ## Configuration
 ### Sample XML Configuration
@@ -56,9 +55,6 @@ It is implemented using the principles of reactive streams. This was very straig
 - `<token>`
   - The token that authorizes posting to the HEC endpoint
   - e.g. _1234-5678-91011-ABC-321_
-- `<queue>`
-  - A maximum queue size for log messages to be stored in memory. If this fills up and log messages are not posted to Splunk in a timely manner, then the queue will by default cause blocking to occur on subsequent appends (see [alternatives](#splunkhecappenderstrategy) to this below).
-  - 1000 *(default)*
 - `<buffer>`
   - Log messages are buffered in memory off of the queue. Once a buffer is filled, logs are instantly posted to the HEC endpoint. This size also signifies the maximum payload size sent to the endpoint.
   - 25 *(default)*
@@ -120,25 +116,6 @@ package object json {
 
 ####Custom Layout
 You can override the layout with a class extending either `SplunkHecJsonLayout`,`SplunkHecJsonLayoutBase`, or `LayoutBase[ILoggingEvent]`. Then `<layout>` can be specified in the `<appender>` section — see the XML example above
-
-###SplunkHecAppenderStrategy
-
-By default, `SplunkHecAppender` will use the calling thread (i.e. a thread from your application) when queueing new log messages on to the internal queue. If that queue happens to be full, possibly because the Splunk HEC API requests are taking too long, then blocking can take more time. This default behaviour is defined by the _appender strategy_ `BlockingSplunkHecAppenderStrategy`. There exists two more strategies:
-
-- `AsyncSplunkHecAppenderStrategy`
-  - This strategy uses an internal executor service (a `ForkJoinPool`) to asynchronously enqueue new log events. If the queue is full, your applications thread won't block. However, this means that background queue tasks could pile up if the queue is not drained fast enough (again, usually due to slowness with HTTP requests to the Splunk HEC). By default, the executor services parallelism factor is equal to the number of CPU cores available to the VM. This can be overridden with the `parallelism` setting.
-```xml
-<appenderStrategy class="io.policarp.logback.AsyncSplunkHecAppenderStrategy">
-    <parallelism>8</parallelism>
-</appenderStrategy>
-```
-- `SpillingSplunkHecAppenderStrategy`
-  - This strategy simply drops logging events when the internal queue has been filled up. Logging events will resume to be streamed to the Splunk HEC once the queue has the capacity again. However, due to the concurrent nature of the internal queue, the capacity check performed can't be guaranteed, so edge cases of the queues capacity can cause some blocking of your applications calling thread.
-```xml
-<appenderStrategy class="io.policarp.logback.SpillingSplunkHecAppenderStrategy"/>
-```
 
 ## HTTP Client
 The base implementation uses the [skinny-framework's HTTP client](https://github.com/skinny-framework/skinny-framework). It is a tiny library and does not bring with it many dependencies. `SplunkHecAppender` uses `SkinnyHttpHecClient` for HTTP communication.
diff --git a/build.sbt b/build.sbt
index 390759c..65c0468 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 name := """splunk-logback-hec-appender"""
-version := "1.0.3"
-scalaVersion := "2.12.1"
-crossScalaVersions := Seq("2.11.8", "2.12.1")
+version := "1.1.0"
+scalaVersion := "2.12.2"
+crossScalaVersions := Seq("2.11.11", "2.12.2")
 organization := "io.policarp"
 
 homepage := Some(url("https://github.com/kdrakon/splunk-logback-hec-appender"))
 scmInfo := Some(
@@ -19,7 +19,7 @@ libraryDependencies ++= Seq(
   "org.json4s" %% "json4s-native" % "3.5.0",
-  "io.monix" %% "monix" % "2.1.1",
+  "io.monix" %% "monix" % "2.3.0",
   "ch.qos.logback" % "logback-core" % "1.1.7",
   "ch.qos.logback" % "logback-classic" % "1.1.7",
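For readers of the `<buffer>` and `<flush>` settings documented in the README above: the appender implements them with Monix's `bufferTimedAndCounted`, visible in `SplunkHecAppender.scala` below. A minimal, self-contained sketch of that buffering behaviour; the event count and the `println` are illustrative only:

```scala
object BufferingSketch extends App {
  import monix.execution.Scheduler.Implicits.global
  import monix.reactive.Observable
  import scala.concurrent.Await
  import scala.concurrent.duration._

  // 100 fake events, batched 25 at a time or every 30 seconds, whichever
  // comes first -- the same operator the appender applies to its log stream.
  val done = Observable.range(0, 100)
    .bufferTimedAndCounted(30.seconds, 25)
    .foreach(batch => println(s"would POST ${batch.size} events to the HEC"))

  Await.result(done, 10.seconds)
}
```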
diff --git a/src/main/scala/io/policarp/logback/SplunkHecAppender.scala b/src/main/scala/io/policarp/logback/SplunkHecAppender.scala
index 1c4e837..8ad8b5f 100644
--- a/src/main/scala/io/policarp/logback/SplunkHecAppender.scala
+++ b/src/main/scala/io/policarp/logback/SplunkHecAppender.scala
@@ -1,15 +1,16 @@
 package io.policarp.logback
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.AppenderBase
 import io.policarp.logback.hec.SplunkHecClient
-import io.policarp.logback.hec.skinnyhttp.{ SkinnyHecClient, SkinnyHttpLogFilter }
-import monix.eval.Task
+import io.policarp.logback.hec.skinnyhttp.{SkinnyHecClient, SkinnyHttpLogFilter}
+import monix.eval.{Task, TaskCircuitBreaker}
 import monix.execution.Scheduler
-import monix.reactive.{ Consumer, Observable }
-import org.reactivestreams.{ Publisher, Subscriber, Subscription }
+import monix.execution.atomic.AtomicLong
+import monix.reactive.{Consumer, Observable}
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
 
 import scala.beans.BeanProperty
 import scala.concurrent.duration._
@@ -21,33 +22,38 @@ class SplunkHecAppender extends SplunkHecAppenderBase with SkinnyHecClient {
 
 trait SplunkHecAppenderBase extends AppenderBase[ILoggingEvent] {
   self: SplunkHecClient =>
 
-  @BeanProperty var queue: Int = 1000
   @BeanProperty var buffer: Int = 25
   @BeanProperty var flush: Int = 30
   @BeanProperty var parallelism: Int = Runtime.getRuntime.availableProcessors()
   @BeanProperty var layout: SplunkHecJsonLayoutBase = SplunkHecJsonLayout()
-  @BeanProperty var appenderStrategy: SplunkHecAppenderStrategy = BlockingSplunkHecAppenderStrategy()
 
   private implicit val scheduler = Scheduler.computation(parallelism)
-  private implicit lazy val logPublisher = new LogPublisher(queue)
+  private implicit lazy val logPublisher = new LogPublisher()
 
   private lazy val logStream = Observable.fromReactivePublisher(logPublisher).bufferTimedAndCounted(flush seconds, buffer)
   private lazy val logConsumer = Consumer.foreachParallelAsync[Task[Unit]](parallelism)(task => task)
 
   override def start() = {
     super.start()
     implicit val impliedLayout = layout
-    logStream.map(postTask).runWith(logConsumer).runAsync
+    logStream.map(postTask).consumeWith(logConsumer).runAsync
   }
 
-  override def append(event: ILoggingEvent) = appenderStrategy.append(event)
+  override def append(event: ILoggingEvent) = logPublisher.enqueue(event)
 }
 
-private[logback] class LogPublisher(queueSize: Int)(implicit scheduler: Scheduler) extends Publisher[ILoggingEvent] {
+private[logback] class LogPublisher()(implicit scheduler: Scheduler) extends Publisher[ILoggingEvent] {
 
-  val logQueue = new LinkedBlockingQueue[ILoggingEvent](queueSize)
+  val logQueue = new ConcurrentLinkedQueue[ILoggingEvent]()
 
-  def enqueue(event: ILoggingEvent) = logQueue.put(event)
+  private val LogPublisherCircuitBreaker = TaskCircuitBreaker (
+    maxFailures = 5,
+    resetTimeout = 10 nanos,
+    exponentialBackoffFactor = 2,
+    maxResetTimeout = 1 minute
+  )
+
+  def enqueue(event: ILoggingEvent) = logQueue.add(event)
 
   override def subscribe(subscriber: Subscriber[_ >: ILoggingEvent]) = {
     subscriber.onSubscribe(new Subscription {
@@ -57,11 +63,20 @@ private[logback] class LogPublisher(queueSize: Int)(implicit scheduler: Schedule
       override def cancel() = { cancelled = true }
 
       override def request(n: Long) = if (!cancelled) {
-        Task {
-          for (i <- 1L to n) {
-            subscriber.onNext(logQueue.take())
+        val requests = AtomicLong(n)
+        val pollingTask = Task {
+          val currentRequests = requests.get
+          for (_ <- 1L to currentRequests) {
+            Option(logQueue.poll()) match {
+              case None =>
+                throw new IllegalStateException("logQueue is empty") // so trigger circuit breaker
+              case Some(event) =>
+                subscriber.onNext(event)
+                requests.decrement()
+            }
           }
-        }.runAsync
+        }
+        LogPublisherCircuitBreaker.protect(pollingTask).onErrorRestartIf(_ => requests.get != 0).runAsync
      }
    })
  }
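Why the swap from `LinkedBlockingQueue` to `ConcurrentLinkedQueue` removes the deadlock risk: `add` is lock-free and never blocks or rejects, and `poll` returns `null` instead of blocking when empty. A small sketch of those semantics (plain JDK behaviour, not code from the patch); the trade-off is that the queue is now unbounded:

```scala
object QueueSemanticsSketch extends App {
  import java.util.concurrent.ConcurrentLinkedQueue

  val queue = new ConcurrentLinkedQueue[String]()

  // add() never blocks and never rejects, so append() returns immediately
  // no matter how slowly the Splunk HEC is responding.
  assert(queue.add("log event"))

  // poll() returns null rather than blocking when the queue is empty, which
  // is why LogPublisher wraps it in Option and leans on the circuit breaker
  // to back off instead of busy-spinning.
  assert(queue.poll() == "log event")
  assert(queue.poll() == null)
}
```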
- */
-case class BlockingSplunkHecAppenderStrategy() extends SplunkHecAppenderStrategy {
-  override def doAppend(event: ILoggingEvent, logPublisher: LogPublisher) = {
-    logPublisher.enqueue(event)
-  }
-}
-
-/**
- * Will create an asynchronous task to run in the background that will attempt to enqueue a log event.
- * If the queue buffer is full, that thread will block separately from the calling application thread.
- *
- * Warning: This could cause issues if the Spunk HEC API is slow. Background enqueue tasks will pile up
- * and possibly utilise CPU resources more.
- */
-case class AsyncSplunkHecAppenderStrategy() extends SplunkHecAppenderStrategy {
-
-  import monix.eval.Task
-  import monix.execution.Scheduler
-
-  @BeanProperty var parallelism: Int = Runtime.getRuntime.availableProcessors()
-  private lazy val scheduler: Scheduler =
-    Scheduler.apply(ExecutionContext.fromExecutorService(new ForkJoinPool(parallelism)))
-
-  override def doAppend(event: ILoggingEvent, logPublisher: LogPublisher) = {
-    Task {
-      logPublisher.enqueue(event)
-    }.runAsync(scheduler)
-  }
-}
-
-/**
- * Will attempt to enqueue a log event if there is capacity only.
- *
- * Warning: Detecting remaining capacity is not guaranteed with the concurrent nature of the
- * [[LogPublisher]], so some enqueues will block.
- */
-case class SpillingSplunkHecAppenderStrategy() extends SplunkHecAppenderStrategy {
-  override def doAppend(event: ILoggingEvent, logPublisher: LogPublisher) = {
-    if (logPublisher.logQueue.remainingCapacity > 0) {
-      logPublisher.enqueue(event)
-    }
-  }
-}
\ No newline at end of file
diff --git a/src/main/scala/io/policarp/logback/hec/SplunkHecClient.scala b/src/main/scala/io/policarp/logback/hec/SplunkHecClient.scala
index bb737b4..59959c4 100644
--- a/src/main/scala/io/policarp/logback/hec/SplunkHecClient.scala
+++ b/src/main/scala/io/policarp/logback/hec/SplunkHecClient.scala
@@ -6,12 +6,14 @@ import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.LayoutBase
 import ch.qos.logback.core.filter.Filter
 import ch.qos.logback.core.spi.FilterReply
-import monix.eval.Task
-import skinny.http.{ HTTP, Request }
+import monix.eval.{Task, TaskCircuitBreaker}
+import skinny.http.{HTTP, Request}
 
 import scala.beans.BeanProperty
+import scala.concurrent.duration._
 
 trait SplunkHecClient {
+  import SplunkHecClient._
 
   @BeanProperty var splunkUrl: String = ""
   @BeanProperty var token: String = ""
@@ -30,13 +32,24 @@ trait SplunkHecClient {
    * @param layout the layout to use when posting the events
    * @return the Monix Task will asynchronously perform the job of posting the logs and will return Unit
    */
-  def postTask(events: Seq[ILoggingEvent])(implicit layout: LayoutBase[ILoggingEvent]): Task[Unit] = Task {
-    prepareRequest(events, layout).foreach(executeRequest)
+  def postTask(events: Seq[ILoggingEvent])(implicit layout: LayoutBase[ILoggingEvent]): Task[Unit] = {
+    SplunkHecClientCircuitBreaker.protect(
+      Task {
+        prepareRequest(events, layout).foreach(executeRequest)
+      }
+    )
   }
 }
 
 object SplunkHecClient {
 
+  private[hec] val SplunkHecClientCircuitBreaker = TaskCircuitBreaker (
+    maxFailures = 5,
+    resetTimeout = 5 seconds,
+    exponentialBackoffFactor = 2,
+    maxResetTimeout = 10 minutes
+  )
+
   /**
    * Creates a newline separated list of individual Splunk JSON events
    */
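On the client breaker settings above: assuming Monix multiplies the reset timeout by `exponentialBackoffFactor` each time the breaker re-opens (its documented behaviour), the wait between recovery attempts grows roughly as sketched here. This is hypothetical illustration, not code from the patch:

```scala
object BackoffSketch extends App {
  import scala.concurrent.duration._

  // resetTimeout = 5s, exponentialBackoffFactor = 2, maxResetTimeout = 10 minutes:
  // successive waits double until the cap is hit.
  val resetTimeouts =
    Iterator.iterate[Duration](5.seconds)(d => (d * 2) min 10.minutes).take(8).toList

  println(resetTimeouts) // 5s, 10s, 20s, 40s, 80s, 160s, 320s, then capped at 600s
}
```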
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index ab8d038..5941112 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -9,12 +9,12 @@
-        <splunkUrl>htttp://somewhere.com/</splunkUrl>
+
+        <splunkUrl>http://localhost:8088/services/collector/event</splunkUrl>
+        <token>A106745E-0F59-40FE-AF3A-4BAE49759791</token>
         <buffer>10</buffer>
         <flush>10</flush>
-
-
         testing-sean
         user=${USER}
diff --git a/src/test/scala/io/policarp/logback/SplunkHecAppenderBaseTest.scala b/src/test/scala/io/policarp/logback/SplunkHecAppenderBaseTest.scala
index d3d2752..fa9caf8 100644
--- a/src/test/scala/io/policarp/logback/SplunkHecAppenderBaseTest.scala
+++ b/src/test/scala/io/policarp/logback/SplunkHecAppenderBaseTest.scala
@@ -12,26 +12,15 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.Future
 
-class SplunkHecAppenderTestWithBlockingStrategy extends SplunkHecAppenderBaseTest {
-  override val strategy = BlockingSplunkHecAppenderStrategy()
-}
-
-class SplunkHecAppenderTestWithAsyncStrategy extends SplunkHecAppenderBaseTest {
-  override val strategy = AsyncSplunkHecAppenderStrategy()
-}
-
-trait SplunkHecAppenderBaseTest extends WordSpec with Matchers with ScalaFutures {
+class SplunkHecAppenderBaseTest extends WordSpec with Matchers with ScalaFutures {
 
   implicit val scheduler = Scheduler(scala.concurrent.ExecutionContext.global)
 
-  val strategy: SplunkHecAppenderStrategy
-
   "The appender" should {
 
     "process a single event" in {
 
       val appender = new SplunkHecAppenderBase with FakeHecClient {
-        this.appenderStrategy = strategy
         this.flush = 1
         this.buffer = 1
       }
@@ -58,7 +47,6 @@ trait SplunkHecAppenderBaseTest extends WordSpec with Matchers with ScalaFutures
       val executions = Integer.MAX_VALUE / 1000
 
       val appender = new SplunkHecAppenderBase with FakeHecClient {
-        this.appenderStrategy = strategy
         this.flush = 1
         this.buffer = 100
         this.parallelism = 8
       }
@@ -85,46 +73,4 @@ trait SplunkHecAppenderBaseTest extends WordSpec with Matchers with ScalaFutures
       }
     }
   }
-}
-
-class SplunkHecAppenderTestWithSpillingStrategy extends WordSpec with Matchers with ScalaFutures {
-
-  val logger = LoggerFactory.getLogger(classOf[SplunkHecAppenderTestWithSpillingStrategy])
-  implicit val scheduler = Scheduler(scala.concurrent.ExecutionContext.global)
-
-  "The appender" should {
-    "eventually log most events that get pushed to it" in {
-
-      val minExecutions = Integer.MAX_VALUE / 1000
-
-      val appender = new SplunkHecAppenderBase with FakeHecClient {
-        this.appenderStrategy = SpillingSplunkHecAppenderStrategy()
-        this.flush = 5
-        this.buffer = 1000
-        this.parallelism = 8
-      }
-
-      appender.start()
-
-      Task {
-        while (true) {
-          val event = MockLoggingEvent("SomeClass", "Normal stuff happening", Level.INFO)
-          appender.append(event)
-        }
-      }.runAsync
-
-      val f = Future {
-        while (appender.fakeRequest.executions.get() < minExecutions) {
-          // wait
-          Thread.sleep(50)
-        }
-      }
-
-      whenReady(f, Timeout(Span(1, Minute))) { r =>
-        logger.info(s"Logged ${appender.fakeRequest.executions.get()} messages")
-        assert(appender.fakeRequest.executions.get() >= minExecutions)
-      }
-
-    }
-  }
-}
+}
\ No newline at end of file
logger.info("done sleep") + logger.info("...going for longer sleep") + Thread.sleep(60000) } \ No newline at end of file diff --git a/start-local-splunk.sh b/start-local-splunk.sh new file mode 100755 index 0000000..9a4a860 --- /dev/null +++ b/start-local-splunk.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker run --name splunk --hostname splunk -p 8000:8000 -p 8088:8088 -d -e "SPLUNK_START_ARGS=--accept-license" splunk/splunk:6.6.1 \ No newline at end of file