From 78c68b14529c759c89b7745390753207420ccf5a Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Tue, 1 Aug 2023 14:19:57 -0700 Subject: [PATCH] Resolve integ test issues with adding test dependency Signed-off-by: Ashish Agrawal --- alerting/build.gradle | 1 + .../alerting/MonitorDataSourcesIT.kt | 462 +++++++++--------- 2 files changed, 232 insertions(+), 231 deletions(-) diff --git a/alerting/build.gradle b/alerting/build.gradle index 581b866b3..fa9a9eff4 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -115,6 +115,7 @@ dependencies { api project(":alerting-core") implementation "com.github.seancfoley:ipaddress:5.3.3" + testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" testImplementation "org.mockito:mockito-core:${versions.mockito}" testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index f0c841a15..dce694cc7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -3350,237 +3350,237 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(auditStateAlerts2.size, 2) } -// fun `test execute with custom alerts and finding index with bucket and doc monitor when doc monitor is used in chained finding`() { -// val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1") -// val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "2") -// -// var docLevelMonitor = randomDocumentLevelMonitor( -// inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2))), -// triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), -// dataSources = DataSources( -// alertsIndex = "custom_alerts_index", -// findingsIndex = "custom_findings_index", -// findingsIndexPattern = "custom_findings_index-1" -// ) -// ) -// -// val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! -// -// val query = QueryBuilders.rangeQuery("test_strict_date_time") -// .gt("{{period_end}}||-10d") -// .lte("{{period_end}}") -// .format("epoch_millis") -// val compositeSources = listOf( -// TermsValuesSourceBuilder("test_field_1").field("test_field_1") -// ) -// val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) -// val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) -// // Bucket level monitor will reduce the size of matched doc ids on those that belong to a bucket that contains more than 1 document after term grouping -// val triggerScript = """ -// params.docCount > 1 -// """.trimIndent() -// -// var trigger = randomBucketLevelTrigger() -// trigger = trigger.copy( -// bucketSelector = BucketSelectorExtAggregationBuilder( -// name = trigger.id, -// bucketsPathsMap = mapOf("docCount" to "_count"), -// script = Script(triggerScript), -// parentBucketPath = "composite_agg", -// filter = null, -// ) -// ) -// -// val bucketLevelMonitorResponse = createMonitor( -// randomBucketLevelMonitor( -// inputs = listOf(input), -// enabled = false, -// triggers = listOf(trigger), -// dataSources = DataSources( -// findingsEnabled = true, -// alertsIndex = "custom_alerts_index", -// findingsIndex = "custom_findings_index", -// findingsIndexPattern = "custom_findings_index-1" -// ) -// ) -// )!! -// -// var docLevelMonitor1 = randomDocumentLevelMonitor( -// // Match the documents with test_field_1: test_value_3 -// inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))), -// triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), -// dataSources = DataSources( -// findingsEnabled = true, -// alertsIndex = "custom_alerts_index_1", -// findingsIndex = "custom_findings_index_1", -// findingsIndexPattern = "custom_findings_index_1-1" -// ) -// ) -// -// val docLevelMonitorResponse1 = createMonitor(docLevelMonitor1)!! -// -// val queryMonitorInput = SearchInput( -// indices = listOf(index), -// query = SearchSourceBuilder().query( -// QueryBuilders -// .rangeQuery("test_strict_date_time") -// .gt("{{period_end}}||-10d") -// .lte("{{period_end}}") -// .format("epoch_millis") -// ) -// ) -// val queryTriggerScript = """ -// return ctx.results[0].hits.hits.size() > 0 -// """.trimIndent() -// -// val queryLevelTrigger = randomQueryLevelTrigger(condition = Script(queryTriggerScript)) -// val queryMonitorResponse = -// createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! -// -// // 1. docMonitor (chainedFinding = null) 2. bucketMonitor (chainedFinding = docMonitor) 3. docMonitor (chainedFinding = bucketMonitor) 4. queryMonitor (chainedFinding = docMonitor 3) -// var workflow = randomWorkflow( -// monitorIds = listOf( -// docLevelMonitorResponse.id, -// bucketLevelMonitorResponse.id, -// docLevelMonitorResponse1.id, -// queryMonitorResponse.id -// ), -// auditDelegateMonitorAlerts = false -// ) -// val workflowResponse = upsertWorkflow(workflow)!! -// val workflowById = searchWorkflow(workflowResponse.id) -// assertNotNull(workflowById) -// -// // Creates 5 documents -// insertSampleTimeSerializedData( -// index, -// listOf( -// "test_value_1", -// "test_value_1", // adding duplicate to verify aggregation -// "test_value_2", -// "test_value_2", -// "test_value_3", -// "test_value_3" -// ) -// ) -// -// val workflowId = workflowResponse.id -// // 1. Doc level monitor should reduce the doc findings to 4 (3 - test_value_2, 4 - test_value_2, 5 - test_value_3, 6 - test_value_3) -// // 2. Bucket level monitor will match the fetch the docs from current findings execution, although it contains rules for matching documents which has test_value_2 and test value_3 -// val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! -// assertNotNull(executeWorkflowResponse) -// -// for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) { -// when (monitorRunResults.monitorName) { -// // Verify first doc level monitor execution, alerts and findings -// docLevelMonitorResponse.monitor.name -> { -// assertEquals(1, monitorRunResults.inputResults.results.size) -// val values = monitorRunResults.triggerResults.values -// assertEquals(1, values.size) -// @Suppress("UNCHECKED_CAST") -// val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult -// val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } -// val expectedTriggeredDocIds = listOf("3", "4", "5", "6") -// assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) -// -// val getAlertsResponse = -// assertAlerts( -// docLevelMonitorResponse.id, -// docLevelMonitorResponse.monitor.dataSources.alertsIndex, -// alertSize = 4, -// workflowId = workflowId -// ) -// assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 4) -// assertFindings( -// docLevelMonitorResponse.id, -// docLevelMonitorResponse.monitor.dataSources.findingsIndex, -// 4, -// 4, -// listOf("3", "4", "5", "6") -// ) -// } -// // Verify second bucket level monitor execution, alerts and findings -// bucketLevelMonitorResponse.monitor.name -> { -// val searchResult = monitorRunResults.inputResults.results.first() -// -// @Suppress("UNCHECKED_CAST") -// val buckets = -// searchResult -// .stringMap("aggregations")?.stringMap("composite_agg") -// ?.get("buckets") as List> -// assertEquals("Incorrect search result", 2, buckets.size) -// -// val getAlertsResponse = -// assertAlerts( -// bucketLevelMonitorResponse.id, -// bucketLevelMonitorResponse.monitor.dataSources.alertsIndex, -// alertSize = 2, -// workflowId -// ) -// assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) -// assertFindings( -// bucketLevelMonitorResponse.id, -// bucketLevelMonitorResponse.monitor.dataSources.findingsIndex, -// 1, -// 4, -// listOf("3", "4", "5", "6") -// ) -// } -// // Verify third doc level monitor execution, alerts and findings -// docLevelMonitorResponse1.monitor.name -> { -// assertEquals(1, monitorRunResults.inputResults.results.size) -// val values = monitorRunResults.triggerResults.values -// assertEquals(1, values.size) -// @Suppress("UNCHECKED_CAST") -// val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult -// val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } -// val expectedTriggeredDocIds = listOf("5", "6") -// assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) -// -// val getAlertsResponse = -// assertAlerts( -// docLevelMonitorResponse1.id, -// docLevelMonitorResponse1.monitor.dataSources.alertsIndex, -// alertSize = 2, -// workflowId -// ) -// assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse1.id, 2) -// assertFindings( -// docLevelMonitorResponse1.id, -// docLevelMonitorResponse1.monitor.dataSources.findingsIndex, -// 2, -// 2, -// listOf("5", "6") -// ) -// } -// // Verify fourth query level monitor execution -// queryMonitorResponse.monitor.name -> { -// assertEquals(1, monitorRunResults.inputResults.results.size) -// val values = monitorRunResults.triggerResults.values -// assertEquals(1, values.size) -// @Suppress("UNCHECKED_CAST") -// val totalHits = -// ( -// ( -// monitorRunResults.inputResults.results[0]["hits"] -// as kotlin.collections.Map -// )["total"] as kotlin.collections.Map -// )["value"] -// assertEquals(2, totalHits) -// @Suppress("UNCHECKED_CAST") -// val docIds = -// ( -// ( -// monitorRunResults.inputResults.results[0]["hits"] -// as kotlin.collections.Map -// )["hits"] as List> -// ) -// .map { it["_id"]!! } -// assertEquals(listOf("5", "6"), docIds.sorted()) -// } -// } -// } -// } + fun `test execute with custom alerts and finding index with bucket and doc monitor when doc monitor is used in chained finding`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1") + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "2") + + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2))), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), + dataSources = DataSources( + alertsIndex = "custom_alerts_index", + findingsIndex = "custom_findings_index", + findingsIndexPattern = "custom_findings_index-1" + ) + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = "custom_alerts_index", + findingsIndex = "custom_findings_index", + findingsIndexPattern = "custom_findings_index-1" + ) + ) + )!! + + var docLevelMonitor1 = randomDocumentLevelMonitor( + // Match the documents with test_field_1: test_value_3 + inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = "custom_alerts_index_1", + findingsIndex = "custom_findings_index_1", + findingsIndexPattern = "custom_findings_index_1-1" + ) + ) + + val docLevelMonitorResponse1 = createMonitor(docLevelMonitor1)!! + + val queryMonitorInput = SearchInput( + indices = listOf(index), + query = SearchSourceBuilder().query( + QueryBuilders + .rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + ) + ) + val queryTriggerScript = """ + return ctx.results[0].hits.hits.size() > 0 + """.trimIndent() + + val queryLevelTrigger = randomQueryLevelTrigger(condition = Script(queryTriggerScript)) + val queryMonitorResponse = + createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! + + // 1. docMonitor (chainedFinding = null) 2. bucketMonitor (chainedFinding = docMonitor) 3. docMonitor (chainedFinding = bucketMonitor) 4. queryMonitor (chainedFinding = docMonitor 3) + var workflow = randomWorkflow( + monitorIds = listOf( + docLevelMonitorResponse.id, + bucketLevelMonitorResponse.id, + docLevelMonitorResponse1.id, + queryMonitorResponse.id + ), + auditDelegateMonitorAlerts = false + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. Doc level monitor should reduce the doc findings to 4 (3 - test_value_2, 4 - test_value_2, 5 - test_value_3, 6 - test_value_3) + // 2. Bucket level monitor will match the fetch the docs from current findings execution, although it contains rules for matching documents which has test_value_2 and test value_3 + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) { + when (monitorRunResults.monitorName) { + // Verify first doc level monitor execution, alerts and findings + docLevelMonitorResponse.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult + val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } + val expectedTriggeredDocIds = listOf("3", "4", "5", "6") + assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) + + val getAlertsResponse = + assertAlerts( + docLevelMonitorResponse.id, + docLevelMonitorResponse.monitor.dataSources.alertsIndex, + alertSize = 4, + workflowId = workflowId + ) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 4) + assertFindings( + docLevelMonitorResponse.id, + docLevelMonitorResponse.monitor.dataSources.findingsIndex, + 4, + 4, + listOf("3", "4", "5", "6") + ) + } + // Verify second bucket level monitor execution, alerts and findings + bucketLevelMonitorResponse.monitor.name -> { + val searchResult = monitorRunResults.inputResults.results.first() + + @Suppress("UNCHECKED_CAST") + val buckets = + searchResult + .stringMap("aggregations")?.stringMap("composite_agg") + ?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + + val getAlertsResponse = + assertAlerts( + bucketLevelMonitorResponse.id, + bucketLevelMonitorResponse.monitor.dataSources.alertsIndex, + alertSize = 2, + workflowId + ) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) + assertFindings( + bucketLevelMonitorResponse.id, + bucketLevelMonitorResponse.monitor.dataSources.findingsIndex, + 1, + 4, + listOf("3", "4", "5", "6") + ) + } + // Verify third doc level monitor execution, alerts and findings + docLevelMonitorResponse1.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult + val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } + val expectedTriggeredDocIds = listOf("5", "6") + assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) + + val getAlertsResponse = + assertAlerts( + docLevelMonitorResponse1.id, + docLevelMonitorResponse1.monitor.dataSources.alertsIndex, + alertSize = 2, + workflowId + ) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse1.id, 2) + assertFindings( + docLevelMonitorResponse1.id, + docLevelMonitorResponse1.monitor.dataSources.findingsIndex, + 2, + 2, + listOf("5", "6") + ) + } + // Verify fourth query level monitor execution + queryMonitorResponse.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val totalHits = + ( + ( + monitorRunResults.inputResults.results[0]["hits"] + as kotlin.collections.Map + )["total"] as kotlin.collections.Map + )["value"] + assertEquals(2, totalHits) + @Suppress("UNCHECKED_CAST") + val docIds = + ( + ( + monitorRunResults.inputResults.results[0]["hits"] + as kotlin.collections.Map + )["hits"] as List> + ) + .map { it["_id"]!! } + assertEquals(listOf("5", "6"), docIds.sorted()) + } + } + } + } fun `test execute workflow input error`() { val docLevelInput = DocLevelMonitorInput(