Commit
Added support for printing document data in notification messages for document level monitors.

Signed-off-by: AWSHurneyt <[email protected]>
AWSHurneyt committed Mar 11, 2024
1 parent 60e3091 commit 2d46922
Showing 7 changed files with 465 additions and 44 deletions.
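At a high level, this change collects the source documents behind each finding and exposes them to trigger action message templates, so notification messages can print data from the matching documents. The sketch below is a hypothetical action message template, not code from this commit; it assumes AlertContext.SAMPLE_DOCS_FIELD in common-utils resolves to a key like "sample_documents" and that the example _source field names exist in the monitored index.

// Hypothetical mustache message template for a document-level trigger action.
// The "sample_documents" key and the _source field names are assumptions.
val exampleMessageTemplateSource = """
    Monitor {{ctx.monitor.name}} generated alert(s):
    {{#ctx.alerts}}
    Matched documents:
    {{#sample_documents}}
    - {{_source.timestamp}} {{_source.message}}
    {{/sample_documents}}
    {{/ctx.alerts}}
""".trimIndent()

Because the template only references _source.timestamp and _source.message, the runner can restrict the fetched _source to just those fields (see parseSampleDocTags and getDocSources in the diff below).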
@@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.get.MultiGetItemResponse
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
@@ -23,12 +25,15 @@ import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
@@ -65,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
@@ -84,6 +90,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

// Maps a finding ID to the concrete index name.
val findingIdToConcreteIndex = mutableMapOf<String, String>()

// Maps each doc ID to the mGet item responses retrieved for it
val docIdToDocMap = mutableMapOf<String, MutableList<MultiGetItemResponse>>()

override suspend fun runMonitor(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
@@ -457,6 +469,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
error = monitorResult.error ?: triggerResult.error
)

if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty())
getDocSources(
findingToDocPairs = findingToDocPairs,
monitorCtx = monitorCtx,
monitor = monitor
)

val alerts = mutableListOf<Alert>()
val alertContexts = mutableListOf<AlertContext>()
triggerFindingDocPairs.forEach {
@@ -469,12 +488,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
workflorwRunContext = workflowRunContext
)
alerts.add(alert)

val docId = alert.relatedDocIds.first().split("|").first()
val docSource = docIdToDocMap[docId]?.find { item ->
findingIdToConcreteIndex[alert.findingIds.first()] == item.index
}?.response?.convertToMap()

alertContexts.add(
AlertContext(
alert = alert,
associatedQueries = alert.findingIds.flatMap { findingId ->
monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList()
}
},
sampleDocs = listOfNotNull(docSource)
)
)
}
@@ -565,6 +591,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
findingsToTriggeredQueries[finding.id] = triggeredQueries
findingIdToConcreteIndex[finding.id] = finding.index

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
@@ -1064,6 +1091,36 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return numDocs >= maxNumDocsThreshold
}

/**
* Performs an mGet request to retrieve the documents associated with findings.
*
* When possible, this will only retrieve the document fields that are specifically
* referenced for printing in the mustache template.
*/
private suspend fun getDocSources(
findingToDocPairs: List<Pair<String, String>>,
monitorCtx: MonitorRunnerExecutionContext,
monitor: Monitor
) {
val docFieldTags = parseSampleDocTags(monitor.triggers)
val request = MultiGetRequest()
findingToDocPairs.forEach { (_, docIdAndIndex) ->
val docIdAndIndexSplit = docIdAndIndex.split("|")
val docId = docIdAndIndexSplit[0]
val concreteIndex = docIdAndIndexSplit[1]
if (docId.isNotEmpty() && concreteIndex.isNotEmpty()) {
val docItem = MultiGetRequest.Item(concreteIndex, docId)
if (docFieldTags.isNotEmpty())
docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
request.add(docItem)
}
}
val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
response.responses.forEach { item ->
docIdToDocMap.getOrPut(item.id) { mutableListOf() }.add(item)
}
}

/**
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
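For context on the hunk above: each findingToDocPairs entry pairs a finding ID with a "docId|concreteIndex" string, and getDocSources turns those pairs into a single mGet request whose items only fetch the _source fields referenced by the message templates. The snippet below is an illustrative sketch of that item construction under assumed inputs, not additional code from this commit.

import org.opensearch.action.get.MultiGetRequest
import org.opensearch.search.fetch.subphase.FetchSourceContext

// Sketch: build one field-filtered mGet item from a "docId|concreteIndex" pair.
// Returns null when either part is missing, mirroring the guard in getDocSources.
fun buildSampleDocItem(docIdAndIndex: String, docFieldTags: Set<String>): MultiGetRequest.Item? {
    val split = docIdAndIndex.split("|")
    val docId = split.getOrElse(0) { "" }
    val concreteIndex = split.getOrElse(1) { "" }
    if (docId.isEmpty() || concreteIndex.isEmpty()) return null
    val item = MultiGetRequest.Item(concreteIndex, docId)
    // Only narrow _source when the templates name specific fields; otherwise fetch it whole.
    if (docFieldTags.isNotEmpty())
        item.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
    return item
}

// e.g., buildSampleDocItem("doc-1|logs-000001", setOf("timestamp", "message"))
// "doc-1" and "logs-000001" are assumed example values.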
@@ -45,11 +45,19 @@ data class BucketLevelTriggerExecutionContext(
*/
override fun asTemplateArg(): Map<String, Any?> {
val tempArg = super.asTemplateArg().toMutableMap()
tempArg["trigger"] = trigger.asTemplateArg()
tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() }
tempArg["newAlerts"] = newAlerts.map { it.asTemplateArg() }
tempArg["completedAlerts"] = completedAlerts.map { it.asTemplateArg() }
tempArg["results"] = results
tempArg[TRIGGER_FIELD] = trigger.asTemplateArg()
tempArg[DEDUPED_ALERTS_FIELD] = dedupedAlerts.map { it.asTemplateArg() }
tempArg[NEW_ALERTS_FIELD] = newAlerts.map { it.asTemplateArg() }
tempArg[COMPLETED_ALERTS_FIELD] = completedAlerts.map { it.asTemplateArg() }
tempArg[RESULTS_FIELD] = results
return tempArg
}

companion object {
const val TRIGGER_FIELD = "trigger"
const val DEDUPED_ALERTS_FIELD = "dedupedAlerts"
const val NEW_ALERTS_FIELD = "newAlerts"
const val COMPLETED_ALERTS_FIELD = "completedAlerts"
const val RESULTS_FIELD = "results"
}
}
@@ -37,8 +37,13 @@ data class DocumentLevelTriggerExecutionContext(
*/
override fun asTemplateArg(): Map<String, Any?> {
val tempArg = super.asTemplateArg().toMutableMap()
tempArg["trigger"] = trigger.asTemplateArg()
tempArg["alerts"] = alerts.map { it.asTemplateArg() }
tempArg[TRIGGER_FIELD] = trigger.asTemplateArg()
tempArg[ALERTS_FIELD] = alerts.map { it.asTemplateArg() }
return tempArg
}

companion object {
const val TRIGGER_FIELD = "trigger"
const val ALERTS_FIELD = "alerts"
}
}
@@ -18,6 +18,7 @@ import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.support.AggregationPath
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder

class AggregationQueryRewriter {
@@ -62,6 +63,8 @@ class AggregationQueryRewriter {
if (factory is CompositeAggregationBuilder) {
if (returnSampleDocs) {
// TODO: Returning sample documents should ideally be a toggleable option at the action level.
// For now, identify which fields to return from the doc _source for the trigger's actions.
val docFieldTags = parseSampleDocTags(listOf(trigger))
val sampleDocsAgg = listOf(
AggregationBuilders.topHits("low_hits")
.size(5)
@@ -71,6 +74,7 @@ .sort("_score", SortOrder.DESC)
.sort("_score", SortOrder.DESC)
)
sampleDocsAgg.forEach { agg ->
if (docFieldTags.isNotEmpty()) agg.fetchSource(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg)
}
} else {
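In the bucket-level path shown above, AggregationQueryRewriter injects top_hits sub-aggregations (such as the "low_hits" one visible in the hunk) to collect sample documents, and this change narrows their fetched _source to the fields the trigger's templates reference. A minimal sketch of that builder, assuming an example field list:

import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder

// Sketch: sample-docs sub-aggregation with source filtering applied.
// docFieldTags stands in for whatever parseSampleDocTags(listOf(trigger)) returns.
fun sampleDocsAgg(docFieldTags: Set<String>): TopHitsAggregationBuilder {
    val lowHits = AggregationBuilders.topHits("low_hits")
        .size(5)
        .sort("_score", SortOrder.DESC)
    if (docFieldTags.isNotEmpty())
        lowHits.fetchSource(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
    return lowHits
}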
@@ -8,12 +8,16 @@ package org.opensearch.alerting.util
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.AlertContext
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.Action
@@ -183,74 +187,84 @@ fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
}
}

/**
* Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for
* any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags.
* This indicates the message is expected to print data from the sample docs, so we need to collect the samples.
*/
fun printsSampleDocData(trigger: Trigger): Boolean {
return trigger.actions.any { action ->
// The {{ctx}} mustache tag indicates the entire ctx object should be printed in the message string.
// TODO: Consider excluding `{{ctx}}` from criteria for bucket-level triggers as printing all of
// their sample documents could make the notification message too large to send.
action.messageTemplate.idOrCode.contains("{{ctx}}") ||
action.messageTemplate.idOrCode.contains(AlertContext.SAMPLE_DOCS_FIELD)
}
}

fun printsSampleDocData(triggers: List<Trigger>): Boolean {
return triggers.any { trigger -> printsSampleDocData(trigger) }
}

/**
* Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block.
* https://mustache.github.io/mustache.5.html
*
* This function looks `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks,
* This function looks for `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks,
* and parses the contents for tags, which we interpret as fields within the sample document.
*
* @return a [Set] of [String]s indicating fields within a document.
*/
fun parseSampleDocTags(messageTemplate: Script): Set<String> {
val sampleBlockPrefix = "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}"
val sampleBlockSuffix = "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}"
val sourcePrefix = "_source."
val tagRegex = Regex("\\{\\{([^{}]+)}}")
val tags = mutableSetOf<String>()
try {
// Identify the start and end points of the sample block
var sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix)
var sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart)
var blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix)
var blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart)

// A start/end index of -1 indicates there are no more complete sample blocks
while (sampleBlockStart != -1 && sampleBlockEnd != -1) {
while (blockStart != -1 && blockEnd != -1) {
// Isolate the sample block
val sampleBlock = messageTemplate.idOrCode.substring(sampleBlockStart, sampleBlockEnd)
val sampleBlock = messageTemplate.idOrCode.substring(blockStart, blockEnd)
// Remove the iteration wrapper tags
.removeSurrounding(sampleBlockPrefix, sampleBlockSuffix)
.removePrefix(sampleBlockPrefix)
.removeSuffix(sampleBlockSuffix)

// Search for each tag
tagRegex.findAll(sampleBlock).forEach { match ->
// Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `_source.timestamp`)
val docField = match.groupValues[1].trim()
if (docField.isNotEmpty()) tags.add(docField)
// Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `timestamp`)
var docField = match.groupValues[1].trim()
if (docField.startsWith(sourcePrefix)) {
docField = docField.removePrefix(sourcePrefix)
if (docField.isNotEmpty()) tags.add(docField)
}
}

// Identify any subsequent sample blocks
sampleBlockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, sampleBlockEnd)
sampleBlockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, sampleBlockStart)
blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, blockEnd)
blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart)
}
} catch (e: Exception) {
logger.warn("Failed to parse sample document fields.", e)
}
return tags
}

fun parseSampleDocTags(actions: List<Action>): Set<String> {
return actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) }
.toSet()
fun parseSampleDocTags(triggers: List<Trigger>): Set<String> {
return triggers.flatMap { trigger ->
trigger.actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) }
}.toSet()
}

fun parseSampleDocTags(triggers: List<Trigger>): Set<String> {
return triggers.flatMap { trigger -> parseSampleDocTags(trigger.actions) }
.toSet()
/**
* Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for
* any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags.
* This indicates the message is expected to print data from the sample docs, so we need to collect the samples.
*/
fun printsSampleDocData(trigger: Trigger): Boolean {
return trigger.actions.any { action ->
val alertsField = when (trigger) {
is BucketLevelTrigger -> "{{ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}}"
is DocumentLevelTrigger -> "{{ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}}"
// Only bucket-level and document-level monitors are currently supported.
else -> return false
}

// TODO: Consider excluding the following tags from TRUE criteria (especially for bucket-level triggers) as
// printing all of the sample documents could make the notification message too large to send.
// 1. {{ctx}} - prints entire ctx object in the message string
// 2. {{ctx.<alertsField>}} - prints entire alerts array in the message string, which includes the sample docs
// 3. {{AlertContext.SAMPLE_DOCS_FIELD}} - prints entire sample docs array in the message string
val validTags = listOfNotNull(
"{{ctx}}",
alertsField,
AlertContext.SAMPLE_DOCS_FIELD
)
validTags.any { tag -> action.messageTemplate.idOrCode.contains(tag) }
}
}
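Tying the utilities above together: parseSampleDocTags scans each action's message template for a sample-docs iteration block and collects the "_source."-prefixed tags inside it (with the prefix stripped), while printsSampleDocData decides whether sample documents need to be collected at all. The walkthrough below uses an assumed template and assumes AlertContext.SAMPLE_DOCS_FIELD resolves to "sample_documents"; it is illustrative, not part of the commit.

// Assumed message template source for a document-level trigger action; the
// "sample_documents" key and the _source field names are example values.
val templateSource = """
    {{#ctx.alerts}}
    {{#sample_documents}}
    {{_source.timestamp}} | {{_source.log.level}} | {{some_other_tag}}
    {{/sample_documents}}
    {{/ctx.alerts}}
""".trimIndent()

// Expected behavior under those assumptions:
// - printsSampleDocData(trigger) returns true, because the template contains the
//   sample-docs field name.
// - parseSampleDocTags on this source returns setOf("timestamp", "log.level");
//   {{some_other_tag}} is dropped because it lacks the "_source." prefix.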