Skip to content

Commit

Permalink
Added support for printing query/rule info in notification messages.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Mar 5, 2024
1 parent 51267e0 commit 063ed65
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.AlertContext
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
Expand Down Expand Up @@ -95,6 +96,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
monitorCtx.findingsToTriggeredQueries = mutableMapOf()

try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
Expand Down Expand Up @@ -456,6 +458,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)

val alerts = mutableListOf<Alert>()
val alertContexts = mutableListOf<AlertContext>()
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
Expand All @@ -466,6 +469,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
workflorwRunContext = workflowRunContext
)
alerts.add(alert)
alertContexts.add(
AlertContext(
alert = alert,
associatedQueries = alert.findingIds.flatMap { findingId ->
monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList()
}
)
)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
Expand All @@ -479,13 +490,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
for (action in trigger.actions) {
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alert in alerts) {
for (alert in alertContexts) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
}
} else if (alerts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun)
} else if (alertContexts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun)
for (alert in alerts) {
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
Expand Down Expand Up @@ -532,6 +543,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
val findingsToTriggeredQueries = mutableMapOf<String, List<DocLevelQuery>>()

docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
Expand All @@ -552,6 +564,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
findingsToTriggeredQueries[finding.id] = triggeredQueries

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
Expand All @@ -578,6 +591,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// suppress exception
logger.error("Optional finding callback failed", e)
}

if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries
else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries

return findingDocPairs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.ScriptService
Expand All @@ -38,6 +39,7 @@ data class MonitorRunnerExecutionContext(
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,
var findingsToTriggeredQueries: Map<String, List<DocLevelQuery>>? = null,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals(1, output.objectMap("trigger_results").values.size)
}

fun `test document-level monitor notification message includes queries`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "test-query", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val alertCategories = AlertCategory.values()
val actionExecutionScope = PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()
)
val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope)
val actions = (0..randomInt(10)).map {
randomActionWithPolicy(
template = randomTemplateScript("{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}"),
destinationId = createDestination().id,
actionExecutionPolicy = actionExecutionPolicy
)
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex")))

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(2, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
assertEquals(actions.size, alertActionResult.values.size)
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"] as Map<String, String>
assertTrue(
"The notification message is missing the query name.",
actionOutput["message"]!!.contains("(name=${docQuery.name})")
)
}
}
}
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
Expand Down

0 comments on commit 063ed65

Please sign in to comment.