diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index f28b03292..0164aa00e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -374,6 +374,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // Clean up any queries created by the dry run monitor monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } + // TODO: Update the Document as part of the Trigger and return back the trigger action result return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults) } catch (e: Exception) { @@ -387,6 +388,17 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } finally { + if (monitor.deleteQueryIndexInEveryRun == true && + monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources) + ) { + val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) + if (!ack) { + logger.error( + "Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! " + + "for monitor ${monitor.id}" + ) + } + } val endTime = System.currentTimeMillis() totalTimeTakenStat = endTime - startTime logger.debug( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt index 8c6eb4b69..a0bccec0a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -95,57 +95,69 @@ object DeleteMonitorService : private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { try { - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + if (monitor.deleteQueryIndexInEveryRun == false) { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitor.id) + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitor.id) + ) ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it ) } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } } } + } else { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(monitor.dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") + } } } catch (e: Exception) { // we only log the error and don't fail the request because if monitor document has been deleted successfully, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 36e1d83f4..aad64f1e3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -537,7 +537,8 @@ class TransportIndexMonitorAction @Inject constructor( if ( request.monitor.isMonitorOfStandardType() && Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == - Monitor.MonitorType.DOC_LEVEL_MONITOR + Monitor.MonitorType.DOC_LEVEL_MONITOR && + request.monitor.deleteQueryIndexInEveryRun == false ) { indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } @@ -702,13 +703,22 @@ class TransportIndexMonitorAction @Inject constructor( Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR ) { updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) - client.suspendUntil { - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(currentMonitor.dataSources.queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) - .execute(it) + if (docLevelMonitorQueries.docLevelQueryIndexExists(currentMonitor.dataSources)) { + client.suspendUntil { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(currentMonitor.dataSources.queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) + .execute(it) + } + } + if (currentMonitor.deleteQueryIndexInEveryRun == false) { + indexDocLevelMonitorQueries( + request.monitor, + currentMonitor.id, + updatedMetadata, + request.refreshPolicy + ) } - indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) } actionListener.onResponse( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 60daaa4cc..d2be2c8c0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorRunnerService.monitorCtx @@ -181,6 +182,16 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } } + suspend fun deleteDocLevelQueryIndex(dataSources: DataSources): Boolean { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + return ack.isAcknowledged + } + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() return clusterState.metadata.hasAlias(dataSources.queryIndex) @@ -434,6 +445,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) ) indexRequests.add(indexRequest) + log.debug("query $query added for execution of monitor $monitorId on index $sourceIndex") } log.debug("bulk inserting percolate [${queries.size}] queries") if (indexRequests.isNotEmpty()) { @@ -478,7 +490,12 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ updatedProperties: MutableMap ): Pair { var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] - if (targetQueryIndex == null) { + if ( + targetQueryIndex == null || ( + targetQueryIndex != monitor.dataSources.queryIndex && + monitor.deleteQueryIndexInEveryRun == true + ) + ) { // queryIndex is alias which will always have only 1 backing index which is writeIndex // This is due to a fact that that _rollover API would maintain only single index under alias // if you don't add is_write_index setting when creating index initially diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 7d16cbabe..a4e5ad08f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -1195,7 +1195,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { dataSources = DataSources( queryIndex = customQueryIndex, queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), - ) + ), + owner = "alerting" ) try { createMonitor(monitor) @@ -2379,7 +2380,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), - triggers = listOf(trigger) + triggers = listOf(trigger), + dataSources = DataSources(), + owner = "alerting" ) // This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api val docPayload: StringBuilder = StringBuilder(100000) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt index 280074964..c2f6b5fd0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt @@ -166,6 +166,7 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { val indexName = "test_bwc_index" val bwcMonitorString = """ { + "owner": "alerting", "type": "monitor", "name": "test_bwc_monitor", "enabled": true, diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 2651c862e..311cc6d84 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -293,6 +293,9 @@ } } }, + "delete_query_index_in_every_run": { + "type": "boolean" + }, "ui_metadata": { "type": "object", "enabled": false diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 0340baa6b..a8f384cb6 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -94,6 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient "id", null)), trigger1Serialized)), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( @@ -154,6 +155,7 @@ public void onFailure(Exception e) { List.of(), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( @@ -237,6 +239,7 @@ public void onFailure(Exception e) { "id", null)), trigger1Serialized)), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(