Skip to content

Commit

Permalink
eval: add flag to disable no data messages (#1710)
Browse files Browse the repository at this point in the history
When running in streaming support disabling the no data
messages for the expression. Some legacy use-cases may
not account for them and fail.
  • Loading branch information
brharrington authored Oct 18, 2024
1 parent d661d81 commit 7005cc3
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 40 deletions.
4 changes: 4 additions & 0 deletions atlas-eval/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ atlas.eval {
enabled = false
uri = ""
}

// Should no-data messages be enabled. For some use-cases they may not be desirable, only
// emit them if set to true.
enable-no-data-messages = true
}

graph {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private[stream] abstract class EvaluatorImpl(
// Version of the LWC Server API to use
private val lwcapiVersion: Int = config.getInt("atlas.eval.stream.lwcapi-version")

// Should no data messages be emitted?
private val enableNoDataMsgs = config.getBoolean("atlas.eval.stream.enable-no-data-messages")

// Counter for message that cannot be parsed
private val badMessages = registry.counter("atlas.eval.badMessages")

Expand Down Expand Up @@ -297,7 +300,7 @@ private[stream] abstract class EvaluatorImpl(
.via(g)
.flatMapConcat(s => Source(splitByStep(s)))
.groupBy(Int.MaxValue, stepSize, allowClosedSubstreamRecreation = true)
.via(new FinalExprEval(context.interpreter))
.via(new FinalExprEval(context.interpreter, enableNoDataMsgs))
.mergeSubstreams
.via(context.monitorFlow("12_OutputSources"))
.flatMapConcat(s => s)
Expand Down Expand Up @@ -329,7 +332,7 @@ private[stream] abstract class EvaluatorImpl(
Flow[DatapointGroup]
.map(g => toTimeGroup(stepSize, exprs, g, context))
.merge(Source.single(sources), eagerComplete = false)
.via(new FinalExprEval(interpreter))
.via(new FinalExprEval(interpreter, enableNoDataMsgs))
.flatMapConcat(s => s)
.via(new OnUpstreamFinish[MessageEnvelope](context.dsLogger.close()))
.merge(logSrc, eagerComplete = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.IdentityMap
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.ChunkData
import com.netflix.atlas.eval.model.ExprType
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeGroupsTuple
Expand All @@ -49,8 +51,11 @@ import scala.collection.mutable
*
* @param exprInterpreter
* Used for evaluating the expressions.
* @param enableNoDataMsgs
* If set to true, then a no data message will be emitted for each expression if there
* is no data to generate an actual data point.
*/
private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter, enableNoDataMsgs: Boolean)
extends GraphStage[FlowShape[AnyRef, Source[MessageEnvelope, NotUsed]]]
with StrictLogging {

Expand Down Expand Up @@ -199,43 +204,47 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
}

// Generate the time series and diagnostic output
val output = recipients.flatMap {
case (styleExpr, infos) =>
val exprStr = styleExpr.toString
val ids = infos.map(_.id)
// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
val context = EvalContext(timestamp, timestamp + step, step, state)
try {
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data

// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))

// Create time series messages
infos.flatMap { info =>
data.map { t =>
val ts = TimeSeriesMessage(
styleExpr,
context,
t.withLabel(styleExpr.legend(t)),
info.palette,
Some(exprStr)
)
new MessageEnvelope(info.id, ts)
val output = recipients
.flatMap {
case (styleExpr, infos) =>
val exprStr = styleExpr.toString
val ids = infos.map(_.id)
// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
val context = EvalContext(timestamp, timestamp + step, step, state)
try {
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data

// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))

// Create time series messages
infos.flatMap { info =>
data.map { t =>
val ts = TimeSeriesMessage(
styleExpr,
context,
t.withLabel(styleExpr.legend(t)),
info.palette,
Some(exprStr)
)
new MessageEnvelope(info.id, ts)
}
}
} catch {
case e: Exception =>
val msg = error(styleExpr.toString, "final eval failed", e)
ids.map { id =>
new MessageEnvelope(id, msg)
}
}
} catch {
case e: Exception =>
val msg = error(styleExpr.toString, "final eval failed", e)
ids.map { id =>
new MessageEnvelope(id, msg)
}
}
}
}
.filter { env =>
enableNoDataMsgs || hasFiniteValue(env.message())
}

val rateMessages = rateCollector.getAll.map {
case (id, rate) => new MessageEnvelope(id, rate)
Expand All @@ -244,6 +253,20 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
output ++ rateMessages
}

private def hasFiniteValue(value: AnyRef): Boolean = {
value match {
case ts: TimeSeriesMessage => valueNotNaN(ts.data)
case _ => true
}
}

private def valueNotNaN(value: ChunkData): Boolean = {
value match {
case ArrayData(vs) => vs.exists(!_.isNaN)
case null => true
}
}

private def handleSingleGroup(g: TimeGroup): Unit = {
push(out, Source(handleData(g)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class FinalExprEvalSuite extends FunSuite {

private val registry = new DefaultRegistry()

private def run(input: List[AnyRef]): List[MessageEnvelope] = {
private def run(input: List[AnyRef], noDataMsgs: Boolean = true): List[MessageEnvelope] = {
val future = Source(input)
.via(new FinalExprEval(interpreter))
.via(new FinalExprEval(interpreter, noDataMsgs))
.flatMapConcat(s => s)
.runWith(Sink.seq)
Await.result(future, Duration.Inf).toList
Expand Down Expand Up @@ -120,7 +120,15 @@ class FinalExprEvalSuite extends FunSuite {
val (tsId, tsMsg) = tsMsgs.head.id -> tsMsgs.head.message.asInstanceOf[TimeSeriesMessage]
assert(tsId == "a")
assertEquals(tsMsg.label, "(NO DATA / NO DATA)")
}

test("no data line suppressed") {
val input = List(
sources(ds("a", "http://atlas/graph?q=name,latency,:eq,:dist-avg")),
TimeGroup(0L, step, Map.empty)
)
val output = run(input, noDataMsgs = false)
assertEquals(output.size, 0)
}

private def isTimeSeries(messageEnvelope: MessageEnvelope): Boolean = {
Expand Down

0 comments on commit 7005cc3

Please sign in to comment.