From c3a11bd06f3748bf518e072f9fb496ceeb28b24c Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 15 Mar 2024 12:44:22 -0700 Subject: [PATCH] [Backport 2.11] Enhance per bucket, and per document monitor notification message ctx. (#1450) (#1480) * [Backport 2.x] Enhance per bucket, and per document monitor notification message ctx. (#1450) (#1477) * Enhance per bucket, and per document monitor notification message ctx. (#1450) * Adding dev logs. Signed-off-by: AWSHurneyt * Added support for returning sample documents for bucket level monitors. Signed-off-by: AWSHurneyt * Added support for printing query/rule info in notification messages. Signed-off-by: AWSHurneyt * Extracted out helper function. Signed-off-by: AWSHurneyt * Extracted out helper function. Signed-off-by: AWSHurneyt * Added support for printing document data in notification messages for document level monitors. Signed-off-by: AWSHurneyt * Refactored logic after making AlertContext a separate class from Alert instead of inheriting/extending it in common utils. Signed-off-by: AWSHurneyt * Moved AlertContext data model from common utils to alerting plugin. Signed-off-by: AWSHurneyt * Fixed ktlint errors. Signed-off-by: AWSHurneyt * Added additional unit tests. Signed-off-by: AWSHurneyt * Extracted sample doc aggs logic into helper function. Added support for sorting sample docs based on metric aggregations. Signed-off-by: AWSHurneyt * Extracted get sample doc logic into helper function. Added sorting for sample docs. Signed-off-by: AWSHurneyt * Removed dev code. Signed-off-by: AWSHurneyt * Fixed ktlint errors. Signed-off-by: AWSHurneyt * Added comments based on PR feedback. Signed-off-by: AWSHurneyt * Added logic to make mGet calls in batches. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt (cherry picked from commit 5dc690caf1d4b9935f9aeb946c104be8d4861a77) Signed-off-by: AWSHurneyt * Fixed imports. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt * [Backport 2.11] Backport #1427 and #1464 to 2.11 (#1479) * Feature findings enhancemnt (#1427) (#1457) * added support for param in Finding API * added detectionType as param for Findings API enhancements * added searchString param in FIndingsAPI * adding addiional params findingIds, startTime and endTime --------- (cherry picked from commit 2420c2ccfd2c4fa6405527d062f916269bbf4e57) Signed-off-by: Riya Saxena Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] * Findings API Enhancements changes and integ tests fix (#1464) (#1474) * solution to fix integ tests Signed-off-by: Riya Saxena * fix flaky DocumentMonitor Runner tests Signed-off-by: Riya Saxena * fix findings API enhancemnts Signed-off-by: Riya Saxena --------- Signed-off-by: Riya Saxena (cherry picked from commit ba84d04d56fcbd8eb1ae0c9c35cb9fb6b432afbc) * fix integ tests Signed-off-by: Joanne Wang --------- Signed-off-by: Riya Saxena Signed-off-by: github-actions[bot] Signed-off-by: Joanne Wang Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] Co-authored-by: Riya <69919272+riysaxen-amzn@users.noreply.github.com> * [Backport 2.x] Enhance per bucket, and per document monitor notification message ctx. (#1450) (#1477) * Enhance per bucket, and per document monitor notification message ctx. (#1450) * Adding dev logs. Signed-off-by: AWSHurneyt * Added support for returning sample documents for bucket level monitors. Signed-off-by: AWSHurneyt * Added support for printing query/rule info in notification messages. Signed-off-by: AWSHurneyt * Extracted out helper function. Signed-off-by: AWSHurneyt * Extracted out helper function. Signed-off-by: AWSHurneyt * Added support for printing document data in notification messages for document level monitors. Signed-off-by: AWSHurneyt * Refactored logic after making AlertContext a separate class from Alert instead of inheriting/extending it in common utils. Signed-off-by: AWSHurneyt * Moved AlertContext data model from common utils to alerting plugin. Signed-off-by: AWSHurneyt * Fixed ktlint errors. Signed-off-by: AWSHurneyt * Added additional unit tests. Signed-off-by: AWSHurneyt * Extracted sample doc aggs logic into helper function. Added support for sorting sample docs based on metric aggregations. Signed-off-by: AWSHurneyt * Extracted get sample doc logic into helper function. Added sorting for sample docs. Signed-off-by: AWSHurneyt * Removed dev code. Signed-off-by: AWSHurneyt * Fixed ktlint errors. Signed-off-by: AWSHurneyt * Added comments based on PR feedback. Signed-off-by: AWSHurneyt * Added logic to make mGet calls in batches. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt (cherry picked from commit 5dc690caf1d4b9935f9aeb946c104be8d4861a77) Signed-off-by: AWSHurneyt * Fixed imports. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt * Fixed test. Signed-off-by: AWSHurneyt * Fixed ktlint error. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt Signed-off-by: Riya Saxena Signed-off-by: github-actions[bot] Signed-off-by: Joanne Wang Co-authored-by: Joanne Wang Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] Co-authored-by: Riya <69919272+riysaxen-amzn@users.noreply.github.com> --- .../alerting/BucketLevelMonitorRunner.kt | 125 +++++- .../alerting/DocumentLevelMonitorRunner.kt | 83 +++- .../org/opensearch/alerting/InputService.kt | 90 ++-- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../opensearch/alerting/model/AlertContext.kt | 49 +++ .../BucketLevelTriggerExecutionContext.kt | 25 +- .../DocumentLevelTriggerExecutionContext.kt | 15 +- .../alerting/util/AggregationQueryRewriter.kt | 64 ++- .../opensearch/alerting/util/AlertingUtils.kt | 93 +++- .../alerting/DocumentMonitorRunnerIT.kt | 216 ++++++++++ .../alerting/MonitorRunnerServiceIT.kt | 85 ++++ .../org/opensearch/alerting/TestHelpers.kt | 20 + .../alerting/model/AlertContextTests.kt | 396 ++++++++++++++++++ .../alerting/util/AlertingUtilsTests.kt | 179 ++++++++ 14 files changed, 1383 insertions(+), 59 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index c8adc021c..aa3422c57 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -13,10 +13,12 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.opensearchapi.withClosableContext @@ -25,7 +27,9 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult +import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.client.Client import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert @@ -221,6 +225,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { } } + // The alertSampleDocs map structure is Map>> + val alertSampleDocs = mutableMapOf>>>() for (trigger in monitor.triggers) { val alertsToUpdate = mutableSetOf() val completedAlertsToUpdate = mutableSetOf() @@ -231,6 +237,32 @@ object BucketLevelMonitorRunner : MonitorRunner() { ?: mutableListOf() // Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts) + + // Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data. + val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty() + if (isTriggered && printsSampleDocData(trigger)) { + try { + val searchRequest = monitorCtx.inputService!!.getSearchRequest( + monitor = monitor.copy(triggers = listOf(trigger)), + searchInput = monitor.inputs[0] as SearchInput, + periodStart = periodStart, + periodEnd = periodEnd, + prevResult = monitorResult.inputResults, + matchingDocIdsPerIndex = null, + returnSampleDocs = true + ) + val sampleDocumentsByBucket = getSampleDocs( + client = monitorCtx.client!!, + monitorId = monitor.id, + triggerId = trigger.id, + searchRequest = searchRequest + ) + alertSampleDocs[trigger.id] = sampleDocumentsByBucket + } catch (e: Exception) { + logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e) + } + } + val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf() val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf() @@ -256,9 +288,12 @@ object BucketLevelMonitorRunner : MonitorRunner() { for (alertCategory in actionExecutionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() for (alert in alertsToExecuteActionsFor) { + val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert) + else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs) + val actionCtx = getActionContextForAlertCategory( alertCategory, - alert, + alertContext, triggerCtx, monitorOrTriggerError ) @@ -292,7 +327,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { val actionCtx = triggerCtx.copy( dedupedAlerts = dedupedAlerts, - newAlerts = newAlerts, + newAlerts = newAlerts.map { + getAlertContext(alert = it, alertSampleDocs = alertSampleDocs) + }, completedAlerts = completedAlerts, error = monitorResult.error ?: triggerResult.error ) @@ -487,17 +524,93 @@ object BucketLevelMonitorRunner : MonitorRunner() { private fun getActionContextForAlertCategory( alertCategory: AlertCategory, - alert: Alert, + alertContext: AlertContext, ctx: BucketLevelTriggerExecutionContext, error: Exception? ): BucketLevelTriggerExecutionContext { return when (alertCategory) { AlertCategory.DEDUPED -> - ctx.copy(dedupedAlerts = listOf(alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) + ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) AlertCategory.NEW -> - ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alert), completedAlerts = emptyList(), error = error) + ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error) AlertCategory.COMPLETED -> - ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error) + ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error) + } + } + + private fun getAlertContext( + alert: Alert, + alertSampleDocs: Map>>> + ): AlertContext { + val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash() + val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey) + return if (!bucketKey.isNullOrEmpty() && !sampleDocs.isNullOrEmpty()) { + AlertContext(alert = alert, sampleDocs = sampleDocs) + } else { + logger.error( + "Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.", + alert.id, + alert.triggerId, + alert.monitorId, + alert.executionId + ) + AlertContext(alert = alert, sampleDocs = listOf()) } } + + /** + * Executes the monitor's query with the addition of 2 top_hits aggregations that are used to return the top 5, + * and bottom 5 documents for each bucket. + * + * @return Map> + */ + @Suppress("UNCHECKED_CAST") + private suspend fun getSampleDocs( + client: Client, + monitorId: String, + triggerId: String, + searchRequest: SearchRequest + ): Map>> { + val sampleDocumentsByBucket = mutableMapOf>>() + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf()) as Map + val compositeAgg = aggs.getOrDefault("composite_agg", mapOf()) as Map + val buckets = compositeAgg.getOrDefault("buckets", emptyList>()) as List> + + buckets.forEach { bucket -> + val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf()) as Map).values.toList()) + if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.") + + val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val topHits = unwrappedTopHits.getOrDefault("hits", listOf>()) as List> + + val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf()) as Map) + .getOrDefault("hits", mapOf()) as Map + val lowHits = unwrappedLowHits.getOrDefault("hits", listOf>()) as List> + + // Reversing the order of lowHits so allHits will be in descending order. + val allHits = topHits + lowHits.reversed() + + if (allHits.isEmpty()) { + // We expect sample documents to be available for each bucket. + logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId) + } + + // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each. + // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data. + val uniqueHitIds = mutableSetOf() + val dedupedHits = mutableListOf>() + allHits.forEach { hit -> + val hitId = hit["_id"] as String + if (!uniqueHitIds.contains(hitId)) { + uniqueHitIds.add(hitId) + dedupedHits.add(hit) + } + } + sampleDocumentsByBucket[bucketKey] = dedupedHits + } + + return sampleDocumentsByBucket + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ff939418a..7262b9260 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -13,22 +13,28 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse +import org.opensearch.action.get.MultiGetItemResponse +import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.parseSampleDocTags +import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata @@ -64,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant @@ -83,6 +90,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * Docs are fetched from the source index per shard and transformed.*/ val transformedDocs = mutableListOf>() + // Maps a finding ID to the related document. + private val findingIdToDocSource = mutableMapOf() + override suspend fun runMonitor( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, @@ -95,6 +105,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + monitorCtx.findingsToTriggeredQueries = mutableMapOf() try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) @@ -455,7 +466,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() { error = monitorResult.error ?: triggerResult.error ) + if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty()) + getDocSources( + findingToDocPairs = findingToDocPairs, + monitorCtx = monitorCtx, + monitor = monitor + ) + val alerts = mutableListOf() + val alertContexts = mutableListOf() triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), @@ -466,6 +485,18 @@ class DocumentLevelMonitorRunner : MonitorRunner() { workflorwRunContext = workflowRunContext ) alerts.add(alert) + + val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap() + + alertContexts.add( + AlertContext( + alert = alert, + associatedQueries = alert.findingIds.flatMap { findingId -> + monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList() + }, + sampleDocs = listOfNotNull(docSource) + ) + ) } val shouldDefaultToPerExecution = defaultToPerExecutionAction( @@ -479,13 +510,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { for (action in trigger.actions) { val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { - for (alert in alerts) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) - triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } - triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) + for (alertContext in alertContexts) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alertContext)), monitorCtx, monitor, dryrun) + triggerResult.actionResultsMap.getOrPut(alertContext.alert.id) { mutableMapOf() } + triggerResult.actionResultsMap[alertContext.alert.id]?.set(action.id, actionResults) } - } else if (alerts.isNotEmpty()) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun) + } else if (alertContexts.isNotEmpty()) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun) for (alert in alerts) { triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) @@ -532,6 +563,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() + val findingsToTriggeredQueries = mutableMapOf>() docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -552,6 +584,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) + findingsToTriggeredQueries[finding.id] = triggeredQueries val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -578,6 +611,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // suppress exception logger.error("Optional finding callback failed", e) } + + if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries + else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries + return findingDocPairs } @@ -1047,6 +1084,40 @@ class DocumentLevelMonitorRunner : MonitorRunner() { return numDocs >= maxNumDocsThreshold } + /** + * Performs an mGet request to retrieve the documents associated with findings. + * + * When possible, this will only retrieve the document fields that are specifically + * referenced for printing in the mustache template. + */ + private suspend fun getDocSources( + findingToDocPairs: List>, + monitorCtx: MonitorRunnerExecutionContext, + monitor: Monitor + ) { + val docFieldTags = parseSampleDocTags(monitor.triggers) + val request = MultiGetRequest() + + // Perform mGet request in batches. + findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + batch.forEach { (findingId, docIdAndIndex) -> + val docIdAndIndexSplit = docIdAndIndex.split("|") + val docId = docIdAndIndexSplit[0] + val concreteIndex = docIdAndIndexSplit[1] + if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) { + val docItem = MultiGetRequest.Item(concreteIndex, docId) + if (docFieldTags.isNotEmpty()) + docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) + request.add(docItem) + } + val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } + response.responses.forEach { item -> + findingIdToDocSource[findingId] = item + } + } + } + } + /** * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index b31e21d5f..24c92b2b1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -74,38 +74,15 @@ class InputService( monitor.inputs.forEach { input -> when (input) { is SearchInput -> { - // TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient - val searchParams = mapOf( - "period_start" to periodStart.toEpochMilli(), - "period_end" to periodEnd.toEpochMilli() + val searchRequest = getSearchRequest( + monitor = monitor, + searchInput = input, + periodStart = periodStart, + periodEnd = periodEnd, + prevResult = prevResult, + matchingDocIdsPerIndex = matchingDocIdsPerIndex, + returnSampleDocs = false ) - - // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly - // which causes a strange bug where the rewritten query persists on the Monitor across executions - val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers) - - // Rewrite query to consider the doc ids per given index - if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { - val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) - rewrittenQuery.query(updatedSourceQuery) - } - - val searchSource = scriptService.compile( - Script( - ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, - rewrittenQuery.toString(), searchParams - ), - TemplateScript.CONTEXT - ) - .newInstance(searchParams) - .execute() - - val searchRequest = SearchRequest() - .indices(*input.indices.toTypedArray()) - .preference(Preference.PRIMARY_FIRST.type()) - XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { - searchRequest.source(SearchSourceBuilder.fromXContent(it)) - } val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } aggTriggerAfterKey += AggregationQueryRewriter.getAfterKeysFromSearchResponse( searchResponse, @@ -223,4 +200,55 @@ class InputService( InputRunResults(emptyList(), e) } } + + fun getSearchRequest( + monitor: Monitor, + searchInput: SearchInput, + periodStart: Instant, + periodEnd: Instant, + prevResult: InputRunResults?, + matchingDocIdsPerIndex: Map>?, + returnSampleDocs: Boolean = false + ): SearchRequest { + // TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient + val searchParams = mapOf( + "period_start" to periodStart.toEpochMilli(), + "period_end" to periodEnd.toEpochMilli() + ) + + // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly + // which causes a strange bug where the rewritten query persists on the Monitor across executions + val rewrittenQuery = AggregationQueryRewriter.rewriteQuery( + deepCopyQuery(searchInput.query), + prevResult, + monitor.triggers, + returnSampleDocs + ) + + // Rewrite query to consider the doc ids per given index + if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { + val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) + rewrittenQuery.query(updatedSourceQuery) + } + + val searchSource = scriptService.compile( + Script( + ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, + rewrittenQuery.toString(), searchParams + ), + TemplateScript.CONTEXT + ) + .newInstance(searchParams) + .execute() + + val searchRequest = SearchRequest() + .indices(*searchInput.indices.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) + + XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { + searchRequest.source(SearchSourceBuilder.fromXContent(it)) + } + + return searchRequest + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 424656c6b..307c88b3b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -17,6 +17,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService @@ -38,6 +39,7 @@ data class MonitorRunnerExecutionContext( var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, + var findingsToTriggeredQueries: Map>? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt new file mode 100644 index 000000000..f981691c8 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelQuery + +/** + * This model is a wrapper for [Alert] that should only be used to create a more + * informative alert object to enrich mustache template notification messages. + */ +data class AlertContext( + val alert: Alert, + val associatedQueries: List? = null, + val sampleDocs: List>? = null +) { + fun asTemplateArg(): Map { + val queriesContext = associatedQueries?.map { + mapOf( + DocLevelQuery.QUERY_ID_FIELD to it.id, + DocLevelQuery.NAME_FIELD to it.name, + DocLevelQuery.TAGS_FIELD to it.tags + ) + } + + // Compile the custom context fields. + val customContextFields = mapOf( + ASSOCIATED_QUERIES_FIELD to queriesContext, + SAMPLE_DOCS_FIELD to sampleDocs + ) + + // Get the alert template args + val templateArgs = alert.asTemplateArg().toMutableMap() + + // Add the non-null custom context fields to the alert templateArgs. + customContextFields.forEach { (key, value) -> + value?.let { templateArgs[key] = it } + } + return templateArgs + } + + companion object { + const val ASSOCIATED_QUERIES_FIELD = "associated_queries" + const val SAMPLE_DOCS_FIELD = "sample_documents" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index 72518ed48..597ff5b3e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -5,6 +5,8 @@ package org.opensearch.alerting.script +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.Alert @@ -12,6 +14,8 @@ import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant +private val logger = LogManager.getLogger(BucketLevelTriggerExecutionContext::class.java) + data class BucketLevelTriggerExecutionContext( override val monitor: Monitor, val trigger: BucketLevelTrigger, @@ -19,7 +23,7 @@ data class BucketLevelTriggerExecutionContext( override val periodStart: Instant, override val periodEnd: Instant, val dedupedAlerts: List = listOf(), - val newAlerts: List = listOf(), + val newAlerts: List = listOf(), val completedAlerts: List = listOf(), override val error: Exception? = null ) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { @@ -29,7 +33,7 @@ data class BucketLevelTriggerExecutionContext( trigger: BucketLevelTrigger, monitorRunResult: MonitorRunResult, dedupedAlerts: List = listOf(), - newAlerts: List = listOf(), + newAlerts: List = listOf(), completedAlerts: List = listOf() ) : this( monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, @@ -42,10 +46,19 @@ data class BucketLevelTriggerExecutionContext( */ override fun asTemplateArg(): Map { val tempArg = super.asTemplateArg().toMutableMap() - tempArg["trigger"] = trigger.asTemplateArg() - tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() } - tempArg["newAlerts"] = newAlerts.map { it.asTemplateArg() } - tempArg["completedAlerts"] = completedAlerts.map { it.asTemplateArg() } + tempArg[TRIGGER_FIELD] = trigger.asTemplateArg() + tempArg[DEDUPED_ALERTS_FIELD] = dedupedAlerts.map { it.asTemplateArg() } + tempArg[NEW_ALERTS_FIELD] = newAlerts.map { it.asTemplateArg() } + tempArg[COMPLETED_ALERTS_FIELD] = completedAlerts.map { it.asTemplateArg() } + tempArg[RESULTS_FIELD] = results return tempArg } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val DEDUPED_ALERTS_FIELD = "dedupedAlerts" + const val NEW_ALERTS_FIELD = "newAlerts" + const val COMPLETED_ALERTS_FIELD = "completedAlerts" + const val RESULTS_FIELD = "results" + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt index 66de731f6..543e6bdf7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.script -import org.opensearch.commons.alerting.model.Alert +import org.opensearch.alerting.model.AlertContext import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant @@ -16,7 +16,7 @@ data class DocumentLevelTriggerExecutionContext( override val results: List>, override val periodStart: Instant, override val periodEnd: Instant, - val alerts: List = listOf(), + val alerts: List = listOf(), val triggeredDocs: List, val relatedFindings: List, override val error: Exception? = null @@ -25,7 +25,7 @@ data class DocumentLevelTriggerExecutionContext( constructor( monitor: Monitor, trigger: DocumentLevelTrigger, - alerts: List = listOf() + alerts: List = listOf() ) : this( monitor, trigger, emptyList(), Instant.now(), Instant.now(), alerts, emptyList(), emptyList(), null @@ -37,8 +37,13 @@ data class DocumentLevelTriggerExecutionContext( */ override fun asTemplateArg(): Map { val tempArg = super.asTemplateArg().toMutableMap() - tempArg["trigger"] = trigger.asTemplateArg() - tempArg["alerts"] = alerts.map { it.asTemplateArg() } + tempArg[TRIGGER_FIELD] = trigger.asTemplateArg() + tempArg[ALERTS_FIELD] = alerts.map { it.asTemplateArg() } return tempArg } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val ALERTS_FIELD = "alerts" + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt index e1b6675b2..3989bd384 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt @@ -8,15 +8,20 @@ package org.opensearch.alerting.util import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.TriggerAfterKey +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Trigger import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.SingleBucketAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder import org.opensearch.search.aggregations.support.AggregationPath import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext +import org.opensearch.search.sort.SortOrder class AggregationQueryRewriter { @@ -26,10 +31,23 @@ class AggregationQueryRewriter { * for each trigger. */ fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List): SearchSourceBuilder { + return rewriteQuery(query, prevResult, triggers, false) + } + + /** + * Optionally adds support for returning sample documents for each bucket of data returned for a bucket level monitor. + */ + fun rewriteQuery( + query: SearchSourceBuilder, + prevResult: InputRunResults?, + triggers: List, + returnSampleDocs: Boolean = false + ): SearchSourceBuilder { triggers.forEach { trigger -> if (trigger is BucketLevelTrigger) { // add bucket selector pipeline aggregation for each trigger in query query.aggregation(trigger.bucketSelector) + // if this request is processing the subsequent pages of input query result, then add after key if (prevResult?.aggTriggersAfterKey?.get(trigger.id) != null) { val parentBucketPath = AggregationPath.parse(trigger.bucketSelector.parentBucketPath) @@ -48,11 +66,29 @@ class AggregationQueryRewriter { throw IllegalArgumentException("ParentBucketPath: $parentBucketPath not found in input query results") } } + if (factory is CompositeAggregationBuilder) { - // if the afterKey from previous result is null, what does it signify? - // A) result set exhausted OR B) first page ? - val afterKey = prevResult.aggTriggersAfterKey[trigger.id]!!.afterKey - factory.aggregateAfter(afterKey) + if (returnSampleDocs) { + // TODO: Returning sample documents should ideally be a toggleable option at the action level. + // For now, identify which fields to return from the doc _source for the trigger's actions. + val docFieldTags = parseSampleDocTags(listOf(trigger)) + val sampleDocsAgg = getSampleDocAggs(factory) + sampleDocsAgg.forEach { agg -> + if (docFieldTags.isNotEmpty()) agg.fetchSource( + FetchSourceContext( + true, + docFieldTags.toTypedArray(), + emptyArray() + ) + ) + if (!factory.subAggregations.contains(agg)) factory.subAggregation(agg) + } + } else { + // if the afterKey from previous result is null, what does it signify? + // A) result set exhausted OR B) first page ? + val afterKey = prevResult.aggTriggersAfterKey[trigger.id]!!.afterKey + factory.aggregateAfter(afterKey) + } } else { throw IllegalStateException("AfterKeys are not expected to be present in non CompositeAggregationBuilder") } @@ -110,5 +146,25 @@ class AggregationQueryRewriter { } return bucketLevelTriggerAfterKeys } + + @Suppress("UNCHECKED_CAST") + private fun getSampleDocAggs(factory: CompositeAggregationBuilder): List { + var defaultSortFields = listOf("_score") + val aggregations = factory.subAggregations.flatMap { + (it.convertToMap()[it.name] as Map).values.flatMap { field -> + field as Map + field.values + } + } + if (aggregations.isNotEmpty()) defaultSortFields = aggregations + + val lowHitsAgg = AggregationBuilders.topHits("low_hits").size(5) + val topHitsAgg = AggregationBuilders.topHits("top_hits").size(5) + defaultSortFields.forEach { + lowHitsAgg.sort(it, SortOrder.ASC) + topHitsAgg.sort(it, SortOrder.DESC) + } + return listOf(lowHitsAgg, topHitsAgg) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 33911b216..a1c33c7b9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -6,17 +6,24 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.AggregationResultBucket +import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Trigger import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.script.Script private val logger = LogManager.getLogger("AlertingUtils") @@ -75,7 +82,9 @@ fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.Monitor * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. */ -fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#") +fun AggregationResultBucket.getBucketKeysHash(): String = getBucketKeysHash(this.bucketKeys) + +fun getBucketKeysHash(bucketKeys: List): String = bucketKeys.joinToString(separator = "#") fun Action.getActionExecutionPolicy(monitor: Monitor): ActionExecutionPolicy? { // When the ActionExecutionPolicy is null for an Action, the default is resolved at runtime @@ -139,3 +148,85 @@ fun defaultToPerExecutionAction( return false } + +/** + * Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block. + * https://mustache.github.io/mustache.5.html + * + * This function looks for `{{#${[AlertContext.SAMPLE_DOCS_FIELD]}}}{{/${[AlertContext.SAMPLE_DOCS_FIELD]}}}` blocks, + * and parses the contents for tags, which we interpret as fields within the sample document. + * + * @return a [Set] of [String]s indicating fields within a document. + */ +fun parseSampleDocTags(messageTemplate: Script): Set { + val sampleBlockPrefix = "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}" + val sampleBlockSuffix = "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}" + val sourcePrefix = "_source." + val tagRegex = Regex("\\{\\{([^{}]+)}}") + val tags = mutableSetOf() + try { + // Identify the start and end points of the sample block + var blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix) + var blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart) + + // Sample start/end of -1 indicates there are no more complete sample blocks + while (blockStart != -1 && blockEnd != -1) { + // Isolate the sample block + val sampleBlock = messageTemplate.idOrCode.substring(blockStart, blockEnd) + // Remove the iteration wrapper tags + .removePrefix(sampleBlockPrefix) + .removeSuffix(sampleBlockSuffix) + + // Search for each tag + tagRegex.findAll(sampleBlock).forEach { match -> + // Parse the field name from the tag (e.g., `{{_source.timestamp}}` becomes `timestamp`) + var docField = match.groupValues[1].trim() + if (docField.startsWith(sourcePrefix)) { + docField = docField.removePrefix(sourcePrefix) + if (docField.isNotEmpty()) tags.add(docField) + } + } + + // Identify any subsequent sample blocks + blockStart = messageTemplate.idOrCode.indexOf(sampleBlockPrefix, blockEnd) + blockEnd = messageTemplate.idOrCode.indexOf(sampleBlockSuffix, blockStart) + } + } catch (e: Exception) { + logger.warn("Failed to parse sample document fields.", e) + } + return tags +} + +fun parseSampleDocTags(triggers: List): Set { + return triggers.flatMap { trigger -> + trigger.actions.flatMap { action -> parseSampleDocTags(action.messageTemplate) } + }.toSet() +} + +/** + * Checks the `message_template.source` in the [Script] for each [Action] in the [Trigger] for + * any instances of [AlertContext.SAMPLE_DOCS_FIELD] tags. + * This indicates the message is expected to print data from the sample docs, so we need to collect the samples. + */ +fun printsSampleDocData(trigger: Trigger): Boolean { + return trigger.actions.any { action -> + val alertsField = when (trigger) { + is BucketLevelTrigger -> "{{ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}}" + is DocumentLevelTrigger -> "{{ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}}" + // Only bucket, and document level monitors are supported currently. + else -> return false + } + + // TODO: Consider excluding the following tags from TRUE criteria (especially for bucket-level triggers) as + // printing all of the sample documents could make the notification message too large to send. + // 1. {{ctx}} - prints entire ctx object in the message string + // 2. {{ctx.}} - prints entire alerts array in the message string, which includes the sample docs + // 3. {{AlertContext.SAMPLE_DOCS_FIELD}} - prints entire sample docs array in the message string + val validTags = listOfNotNull( + "{{ctx}}", + alertsField, + AlertContext.SAMPLE_DOCS_FIELD + ) + validTags.any { tag -> action.messageTemplate.idOrCode.contains(tag) } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index d0a724775..c50371884 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -1989,6 +1989,222 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals(1, output.objectMap("trigger_results").values.size) } + fun `test document-level monitor notification message includes queries`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "test-query") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val alertCategories = AlertCategory.values() + val actionExecutionScope = PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet() + ) + val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) + val actions = (0..randomInt(10)).map { + randomActionWithPolicy( + template = randomTemplateScript( + "{{#ctx.alerts}}\n{{#associated_queries}}\n(name={{name}})\n{{/associated_queries}}\n{{/ctx.alerts}}" + ), + destinationId = createDestination().id, + actionExecutionPolicy = actionExecutionPolicy + ) + } + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(2, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + assertEquals(actions.size, alertActionResult.values.size) + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") + val actionOutput = (actionResult as Map>)["output"] as Map + assertTrue( + "The notification message is missing the query name.", + actionOutput["message"]!!.contains("(name=${docQuery.name})") + ) + } + } + } + } + + fun `test expected document and rules print in notification message`() { + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "Test message", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = DocLevelQuery(query = "\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + // Prints all fields in doc source + val scriptSource1 = """ + Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Document values + {{#sample_documents}}\n + Test field: {{_source.test_field}}\n + Message: {{_source.message}}\n + Timestamp: {{_source.test_strict_date_time}}\n + {{/sample_documents}}\n + \n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Only prints a few fields from the doc source + val scriptSource2 = """ + Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Document values + {{#sample_documents}}\n + Test field: {{_source.test_field}}\n + Message: {{_source.message}}\n + {{/sample_documents}}\n + \n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Doesn't print any document data + val scriptSource3 = """ + Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.\n + - Trigger: {{ctx.trigger.name}}\n + - Severity: {{ctx.trigger.severity}}\n + - Period start: {{ctx.periodStart}}\n + - Period end: {{ctx.periodEnd}}\n\n + - New Alerts:\n + {{#ctx.alerts}}\n + Matching queries\n + {{#associated_queries}}\n + Query ID: {{id}}\n + Query name: {{name}}\n + {{/associated_queries}}\n + {{/ctx.alerts}} + """.trimIndent() + + // Using 'alert.copy()' here because 'randomAction()' applies the 'template' for the message subject, and message body + val actions = listOf( + randomAction(name = "action1", template = randomTemplateScript("action1 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource1)), + randomAction(name = "action2", template = randomTemplateScript("action2 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource2)), + randomAction(name = "action3", template = randomTemplateScript("action3 message"), destinationId = createDestination().id) + .copy(messageTemplate = randomTemplateScript(scriptSource3)) + ) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)) + ) + ) + + indexDoc(index, "", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results") + assertEquals(1, triggerResults.values.size) + + val expectedMessageContents = mapOf( + "action1" to Pair( + // First item in pair is INCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Timestamp: $testTime", + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf() + ), + "action2" to Pair( + // First item in pair is INCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf("Timestamp: $testTime") + ), + "action3" to Pair( + // First item in pair is INCLUDED content + listOf( + "Query ID: ${docQuery.id}", + "Query name: ${docQuery.name}", + ), + // Second item in pair is EXCLUDED content + listOf( + "Test field: us-west-2", + "Message: Test message", + "Timestamp: $testTime", + ) + ), + ) + val actionResults = triggerResults.values.first().objectMap("action_results").values.first().values + @Suppress("UNCHECKED_CAST") + actionResults.forEach { action -> + val messageContent = ((action as Map)["output"] as Map)["message"] as String + expectedMessageContents[action["name"]]!!.first.forEach { + assertTrue(messageContent.contains(it)) + } + expectedMessageContents[action["name"]]!!.second.forEach { + assertFalse(messageContent.contains(it)) + } + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index a56129850..f50d22ddb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -45,6 +45,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase import java.net.URLEncoder import java.time.Instant import java.time.ZonedDateTime @@ -53,6 +54,7 @@ import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.DAYS import java.time.temporal.ChronoUnit.MILLIS import java.time.temporal.ChronoUnit.MINUTES +import java.util.concurrent.TimeUnit import kotlin.collections.HashMap class MonitorRunnerServiceIT : AlertingRestTestCase() { @@ -1903,6 +1905,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { mutableMapOf(Pair(actionThrottleEnabled.id, 0), Pair(actionThrottleNotEnabled.id, 0)) ) assertEquals(notThrottledActionResults.size, 2) + // Save the lastExecutionTimes of the actions for the Alert to be compared later against // the next Monitor execution run previousAlertExecutionTime[it.id] = mutableMapOf() @@ -1949,6 +1952,88 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { } } + fun `test bucket-level monitor notification message includes sample docs per bucket`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedData( + testIndex, + listOf( + "test_value_1", + "test_value_1", + "test_value_2" + ) + ) + + val messageSource = "{{#ctx.newAlerts}}\n{{#sample_documents}}\n (docId={{_id}}) \n{{/sample_documents}}\n{{/ctx.newAlerts}}" + val bucket1DocIds = listOf("(docId=1)", "(docId=2)") + val bucket2DocIds = listOf("(docId=3)") + + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + val action = randomAction( + template = randomTemplateScript(source = messageSource), + destinationId = createDestination().id + ) + var trigger = randomBucketLevelTrigger(actions = listOf(action)) + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + + val output = entityAsMap(executeMonitor(monitor.id)) + // The 'events' in this case are the bucketKeys hashes representing the Alert events + val expectedEvents = setOf("test_value_1", "test_value_2") + + assertEquals(monitor.name, output["monitor_name"]) + for (triggerResult in output.objectMap("trigger_results").values) { + for (alertEvent in triggerResult.objectMap("action_results")) { + assertTrue(expectedEvents.contains(alertEvent.key)) + val actionResults = alertEvent.value.values as Collection> + for (actionResult in actionResults) { + val actionOutput = actionResult["output"] as Map + if (actionResult["name"] == action.name) { + when (alertEvent.key) { + "test_value_1" -> bucket1DocIds.forEach { docEntry -> + assertTrue( + "The notification message is missing docEntry $docEntry", + !actionOutput["message"].isNullOrEmpty() && actionOutput["message"]!!.contains(docEntry) + ) + } + "test_value_2" -> bucket2DocIds.forEach { docEntry -> + assertTrue( + "The notification message is missing docEntry $docEntry", + !actionOutput["message"].isNullOrEmpty() && actionOutput["message"]!!.contains(docEntry) + ) + } + } + } else { + fail("Unknown action: ${actionResult["name"]}") + } + } + } + } + } + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 21fb54995..37620ebbe 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -9,6 +9,7 @@ import junit.framework.TestCase.assertNull import org.apache.http.Header import org.apache.http.HttpEntity import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -795,3 +796,22 @@ fun randomChainedAlertTrigger( } else actions ) } + +fun randomAlertContext( + alert: Alert = randomAlert(), + associatedQueries: List? = (-1..2).random().takeIf { it != -1 }?.let { + (0..it).map { randomDocLevelQuery() } + }, + sampleDocs: List>? = (-1..2).random().takeIf { it != -1 }?.let { + (0..it).map { + // Using 'randomFinding' to mimic documents in an index. + randomFinding().asTemplateArg() + } + } +): AlertContext { + return AlertContext( + alert = alert, + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt new file mode 100644 index 000000000..08a7d14b1 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertContextTests.kt @@ -0,0 +1,396 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.randomAlertContext +import org.opensearch.alerting.randomDocLevelQuery +import org.opensearch.alerting.randomFinding +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.test.OpenSearchTestCase + +@Suppress("UNCHECKED_CAST") +class AlertContextTests : OpenSearchTestCase() { + + fun `test AlertContext asTemplateArg with null associatedQueries and null sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and 0 sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs!!.size}", + sampleDocs!!.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and 1 sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with null associatedQueries and multiple sampleDocs`() { + val associatedQueries: List? = null + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertNull("Template associated queries should be null", templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD]) + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and null sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and null sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and null sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List>? = null + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + assertNull("Template sample docs should be null", templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and 1 sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 0 associatedQueries and multiple sampleDocs`() { + val associatedQueries: List = listOf() + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and 0 sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List> = listOf() + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with 1 associatedQueries and 1 sampleDocs`() { + val associatedQueries: List = listOf(randomDocLevelQuery()) + val sampleDocs: List> = listOf(randomFinding().asTemplateArg()) + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + fun `test AlertContext asTemplateArg with multiple associatedQueries and multiple sampleDocs`() { + val associatedQueries: List = (0..2).map { randomDocLevelQuery() } + val sampleDocs: List> = (0..2).map { randomFinding().asTemplateArg() } + val alertContext: AlertContext = randomAlertContext( + associatedQueries = associatedQueries, + sampleDocs = sampleDocs + ) + + val templateArgs = alertContext.asTemplateArg() + + assertAlertIsEqual(alertContext = alertContext, templateArgs = templateArgs) + assertEquals( + "Template args associated queries should have size ${associatedQueries.size}", + associatedQueries.size, + (templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] as List).size + ) + assertEquals( + "Template associated queries do not match", + formatAssociatedQueries(alertContext), + templateArgs[AlertContext.ASSOCIATED_QUERIES_FIELD] + ) + + assertEquals( + "Template args sample docs should have size ${sampleDocs.size}", + sampleDocs.size, + (templateArgs[AlertContext.SAMPLE_DOCS_FIELD] as List>).size + ) + assertEquals("Template args sample docs do not match", alertContext.sampleDocs, templateArgs[AlertContext.SAMPLE_DOCS_FIELD]) + } + + private fun assertAlertIsEqual(alertContext: AlertContext, templateArgs: Map) { + assertEquals("Template args id does not match", alertContext.alert.id, templateArgs[Alert.ALERT_ID_FIELD]) + assertEquals("Template args version does not match", alertContext.alert.version, templateArgs[Alert.ALERT_VERSION_FIELD]) + assertEquals("Template args state does not match", alertContext.alert.state.toString(), templateArgs[Alert.STATE_FIELD]) + assertEquals("Template args error message does not match", alertContext.alert.errorMessage, templateArgs[Alert.ERROR_MESSAGE_FIELD]) + assertEquals("Template args acknowledged time does not match", null, templateArgs[Alert.ACKNOWLEDGED_TIME_FIELD]) + assertEquals("Template args end time does not", alertContext.alert.endTime?.toEpochMilli(), templateArgs[Alert.END_TIME_FIELD]) + assertEquals("Template args start time does not", alertContext.alert.startTime.toEpochMilli(), templateArgs[Alert.START_TIME_FIELD]) + assertEquals("Template args last notification time does not match", templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null) + assertEquals("Template args severity does not match", alertContext.alert.severity, templateArgs[Alert.SEVERITY_FIELD]) + } + + private fun formatAssociatedQueries(alertContext: AlertContext): List>? { + return alertContext.associatedQueries?.map { + mapOf( + DocLevelQuery.QUERY_ID_FIELD to it.id, + DocLevelQuery.NAME_FIELD to it.name, + DocLevelQuery.TAGS_FIELD to it.tags + ) + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt new file mode 100644 index 000000000..31dcb6591 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt @@ -0,0 +1,179 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.alerting.model.AlertContext +import org.opensearch.alerting.randomAction +import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomChainedAlertTrigger +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelTrigger +import org.opensearch.alerting.randomTemplateScript +import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.test.OpenSearchTestCase + +class AlertingUtilsTests : OpenSearchTestCase() { + fun `test parseSampleDocTags only returns expected tags`() { + val expectedDocSourceTags = (0..3).map { "field$it" } + val unexpectedDocSourceTags = ((expectedDocSourceTags.size + 1)..(expectedDocSourceTags.size + 5)) + .map { "field$it" } + + val unexpectedTagsScriptSource = unexpectedDocSourceTags.joinToString { field -> "$field = {{$field}}" } + val expectedTagsScriptSource = unexpectedTagsScriptSource + """ + ${unexpectedDocSourceTags.joinToString("\n") { field -> "$field = {{$field}}" }} + {{#alerts}} + {{#${AlertContext.SAMPLE_DOCS_FIELD}}} + ${expectedDocSourceTags.joinToString("\n") { field -> "$field = {{_source.$field}}" }} + {{/${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/alerts}} + """.trimIndent() + + // Action that prints doc source data + val trigger1 = randomDocumentLevelTrigger( + actions = listOf(randomAction(template = randomTemplateScript(source = expectedTagsScriptSource))) + ) + + // Action that does not print doc source data + val trigger2 = randomDocumentLevelTrigger( + actions = listOf(randomAction(template = randomTemplateScript(source = unexpectedTagsScriptSource))) + ) + + // No actions + val trigger3 = randomDocumentLevelTrigger(actions = listOf()) + + val tags = parseSampleDocTags(listOf(trigger1, trigger2, trigger3)) + + assertEquals(expectedDocSourceTags.size, tags.size) + expectedDocSourceTags.forEach { tag -> assertTrue(tags.contains(tag)) } + unexpectedDocSourceTags.forEach { tag -> assertFalse(tags.contains(tag)) } + } + + fun `test printsSampleDocData entire ctx tag returns TRUE`() { + val tag = "{{ctx}}" + val triggers = listOf( + randomBucketLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomDocumentLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData entire alerts tag returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = "{{ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}}" + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = "{{ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}}" + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData entire sample_docs tag returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + {{${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + {{${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData sample_docs iteration block returns TRUE`() { + val triggers = listOf( + randomBucketLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + "{{#${AlertContext.SAMPLE_DOCS_FIELD}}}" + {{_source.field}} + "{{/${AlertContext.SAMPLE_DOCS_FIELD}}}" + {{/ctx.${BucketLevelTriggerExecutionContext.NEW_ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ), + randomDocumentLevelTrigger( + actions = listOf( + randomAction( + template = randomTemplateScript( + source = """ + {{#ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + {{#${AlertContext.SAMPLE_DOCS_FIELD}}} + {{_source.field}} + {{/${AlertContext.SAMPLE_DOCS_FIELD}}} + {{/ctx.${DocumentLevelTriggerExecutionContext.ALERTS_FIELD}}} + """.trimIndent() + ) + ) + ) + ) + ) + + triggers.forEach { trigger -> assertTrue(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData unrelated tag returns FALSE`() { + val tag = "{{ctx.monitor.name}}" + val triggers = listOf( + randomBucketLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomDocumentLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } + } + + fun `test printsSampleDocData unsupported trigger types return FALSE`() { + val tag = "{{ctx}}" + val triggers = listOf( + randomQueryLevelTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))), + randomChainedAlertTrigger(actions = listOf(randomAction(template = randomTemplateScript(source = tag)))) + ) + + triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } + } +}