diff --git a/atlas-eval/src/main/resources/reference.conf b/atlas-eval/src/main/resources/reference.conf index 19c4785ef..833a65b34 100644 --- a/atlas-eval/src/main/resources/reference.conf +++ b/atlas-eval/src/main/resources/reference.conf @@ -43,6 +43,10 @@ atlas.eval { enabled = false uri = "" } + + // Should no-data messages be enabled. For some use-cases they may not be desirable, only + // emit them if set to true. + enable-no-data-messages = true } graph { diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index d506c1127..64543623c 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -99,6 +99,9 @@ private[stream] abstract class EvaluatorImpl( // Version of the LWC Server API to use private val lwcapiVersion: Int = config.getInt("atlas.eval.stream.lwcapi-version") + // Should no data messages be emitted? + private val enableNoDataMsgs = config.getBoolean("atlas.eval.stream.enable-no-data-messages") + // Counter for message that cannot be parsed private val badMessages = registry.counter("atlas.eval.badMessages") @@ -297,7 +300,7 @@ private[stream] abstract class EvaluatorImpl( .via(g) .flatMapConcat(s => Source(splitByStep(s))) .groupBy(Int.MaxValue, stepSize, allowClosedSubstreamRecreation = true) - .via(new FinalExprEval(context.interpreter)) + .via(new FinalExprEval(context.interpreter, enableNoDataMsgs)) .mergeSubstreams .via(context.monitorFlow("12_OutputSources")) .flatMapConcat(s => s) @@ -329,7 +332,7 @@ private[stream] abstract class EvaluatorImpl( Flow[DatapointGroup] .map(g => toTimeGroup(stepSize, exprs, g, context)) .merge(Source.single(sources), eagerComplete = false) - .via(new FinalExprEval(interpreter)) + .via(new FinalExprEval(interpreter, enableNoDataMsgs)) .flatMapConcat(s => s) .via(new OnUpstreamFinish[MessageEnvelope](context.dsLogger.close())) .merge(logSrc, eagerComplete = false) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index 8a242ff21..a6bd029f9 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -32,6 +32,8 @@ import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.model.TimeSeries import com.netflix.atlas.core.util.IdentityMap +import com.netflix.atlas.eval.model.ArrayData +import com.netflix.atlas.eval.model.ChunkData import com.netflix.atlas.eval.model.ExprType import com.netflix.atlas.eval.model.TimeGroup import com.netflix.atlas.eval.model.TimeGroupsTuple @@ -49,8 +51,11 @@ import scala.collection.mutable * * @param exprInterpreter * Used for evaluating the expressions. + * @param enableNoDataMsgs + * If set to true, then a no data message will be emitted for each expression if there + * is no data to generate an actual data point. */ -private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) +private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDataMsgs: Boolean) extends GraphStage[FlowShape[AnyRef, Source[MessageEnvelope, NotUsed]]] with StrictLogging { @@ -199,43 +204,47 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) } // Generate the time series and diagnostic output - val output = recipients.flatMap { - case (styleExpr, infos) => - val exprStr = styleExpr.toString - val ids = infos.map(_.id) - // Use an identity map for the state to ensure that multiple equivalent stateful - // expressions, e.g. derivative(a) + derivative(a), will have isolated state. - val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any]) - val context = EvalContext(timestamp, timestamp + step, step, state) - try { - val result = styleExpr.expr.eval(context, dataExprToDatapoints) - states(styleExpr) = result.state - val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data - - // Collect final data size per DataSource - ids.foreach(rateCollector.incrementOutput(_, data.size)) - - // Create time series messages - infos.flatMap { info => - data.map { t => - val ts = TimeSeriesMessage( - styleExpr, - context, - t.withLabel(styleExpr.legend(t)), - info.palette, - Some(exprStr) - ) - new MessageEnvelope(info.id, ts) + val output = recipients + .flatMap { + case (styleExpr, infos) => + val exprStr = styleExpr.toString + val ids = infos.map(_.id) + // Use an identity map for the state to ensure that multiple equivalent stateful + // expressions, e.g. derivative(a) + derivative(a), will have isolated state. + val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any]) + val context = EvalContext(timestamp, timestamp + step, step, state) + try { + val result = styleExpr.expr.eval(context, dataExprToDatapoints) + states(styleExpr) = result.state + val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data + + // Collect final data size per DataSource + ids.foreach(rateCollector.incrementOutput(_, data.size)) + + // Create time series messages + infos.flatMap { info => + data.map { t => + val ts = TimeSeriesMessage( + styleExpr, + context, + t.withLabel(styleExpr.legend(t)), + info.palette, + Some(exprStr) + ) + new MessageEnvelope(info.id, ts) + } } + } catch { + case e: Exception => + val msg = error(styleExpr.toString, "final eval failed", e) + ids.map { id => + new MessageEnvelope(id, msg) + } } - } catch { - case e: Exception => - val msg = error(styleExpr.toString, "final eval failed", e) - ids.map { id => - new MessageEnvelope(id, msg) - } - } - } + } + .filter { env => + enableNoDataMsgs || hasFiniteValue(env.message()) + } val rateMessages = rateCollector.getAll.map { case (id, rate) => new MessageEnvelope(id, rate) @@ -244,6 +253,20 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) output ++ rateMessages } + private def hasFiniteValue(value: AnyRef): Boolean = { + value match { + case ts: TimeSeriesMessage => valueNotNaN(ts.data) + case _ => true + } + } + + private def valueNotNaN(value: ChunkData): Boolean = { + value match { + case ArrayData(vs) => vs.exists(!_.isNaN) + case null => true + } + } + private def handleSingleGroup(g: TimeGroup): Unit = { push(out, Source(handleData(g))) } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 359bdcfbc..18a449219 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -51,9 +51,9 @@ class FinalExprEvalSuite extends FunSuite { private val registry = new DefaultRegistry() - private def run(input: List[AnyRef]): List[MessageEnvelope] = { + private def run(input: List[AnyRef], noDataMsgs: Boolean = true): List[MessageEnvelope] = { val future = Source(input) - .via(new FinalExprEval(interpreter)) + .via(new FinalExprEval(interpreter, noDataMsgs)) .flatMapConcat(s => s) .runWith(Sink.seq) Await.result(future, Duration.Inf).toList @@ -120,7 +120,15 @@ class FinalExprEvalSuite extends FunSuite { val (tsId, tsMsg) = tsMsgs.head.id -> tsMsgs.head.message.asInstanceOf[TimeSeriesMessage] assert(tsId == "a") assertEquals(tsMsg.label, "(NO DATA / NO DATA)") + } + test("no data line suppressed") { + val input = List( + sources(ds("a", "http://atlas/graph?q=name,latency,:eq,:dist-avg")), + TimeGroup(0L, step, Map.empty) + ) + val output = run(input, noDataMsgs = false) + assertEquals(output.size, 0) } private def isTimeSeries(messageEnvelope: MessageEnvelope): Boolean = {