diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index 5a5f22fb3..f2eace7e9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -27,6 +27,9 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.alerting.MonitorRunnerService.monitorCtx +import org.opensearch.alerting.WorkflowMetadataService import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.addFilter @@ -43,6 +46,7 @@ import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.isQueryLevelMonitor import org.opensearch.alerting.util.use +import org.opensearch.alerting.workflow.CompositeWorkflowRunner import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -372,6 +376,37 @@ class TransportIndexWorkflowAction @Inject constructor( ) return } + + val createdWorkflow = request.workflow.copy(id = indexResponse.id) + val executionId = CompositeWorkflowRunner.generateExecutionId(false, createdWorkflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = createdWorkflow, + skipIndex = false, + executionId = executionId + ) + + val delegates = (createdWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + + for (monitor in monitors) { + var (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata( + monitor = monitor, + createWithRunContext = true, + workflowMetadataId = workflowMetadata.id + ) + + if (created == false) { + log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") + } + + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor) + monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping) + } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(monitorMetadata, updating = true) + } actionListener.onResponse( IndexWorkflowResponse( indexResponse.id, indexResponse.version, indexResponse.seqNo, @@ -499,6 +534,33 @@ class TransportIndexWorkflowAction @Inject constructor( ) return } + + val updatedWorkflow = request.workflow.copy(id = indexResponse.id) + val executionId = CompositeWorkflowRunner.generateExecutionId(false, updatedWorkflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = updatedWorkflow, + skipIndex = false, + executionId = executionId + ) + + val delegates = (updatedWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + + for (monitor in monitors) { + val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata( + monitor = monitor, + createWithRunContext = true, + workflowMetadataId = workflowMetadata.id + ) + + if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor) + val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor) + updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping) + MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) + } + } actionListener.onResponse( IndexWorkflowResponse( indexResponse.id, indexResponse.version, indexResponse.seqNo, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 0dbf7eac2..cfed18c89 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -277,7 +277,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { } } - private fun generateExecutionId( + fun generateExecutionId( isTempWorkflow: Boolean, workflow: Workflow, ): String { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index e623dbb9a..2ae177b3c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -3150,8 +3150,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { searchWorkflowMetadata(id = workflowId) } catch (ex: java.lang.Exception) { exception = ex + assertTrue(exception is java.util.NoSuchElementException) } - assertTrue(exception is java.util.NoSuchElementException) } fun `test execute workflow with custom alerts and finding index with bucket and doc monitor bucket monitor used as chained finding`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 33ae93717..9cd2c5e26 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -1140,4 +1140,49 @@ class WorkflowRestApiIT : AlertingRestTestCase() { val acknowledged = acknowledgeChainedAlertsResponse["success"] as List Assert.assertEquals(acknowledged[0], alerts1[0]["id"]) } + + fun `test run workflow as scheduled job success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = false + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_field" : "us-west-2" + }""" + + indexDoc(index, "1", testDoc) + Thread.sleep(80000) + + val findings = searchFindings(monitor.copy(id = monitorResponse.id)) + assertEquals("Findings saved for test monitor", 1, findings.size) + } }