Skip to content

Commit

Permalink
chained alert behavior changes
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Aug 4, 2023
1 parent 09911d2 commit 68e7685
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 60 deletions.
142 changes: 130 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -83,6 +84,26 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
workflow = workflow,
size = workflow.triggers.size * 2, // We expect there to be only a single in-progress alert so fetch 2 to check
dataSources = dataSources
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")

Check warning on line 98 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L98

Added line #L98 was not covered by tests
}
}

return workflow.triggers.associateWith { trigger ->
foundAlerts[trigger.id]?.firstOrNull()
}
}

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
Expand Down Expand Up @@ -257,18 +278,84 @@ class AlertService(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
associatedAlertIds: List<String>,
result: ChainedAlertTriggerRunResult,
alertError: AlertError? = null,
): Alert? {

val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {

Check warning on line 297 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L294-L297

Added lines #L294 - L297 were not covered by tests
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1

Check warning on line 302 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L300-L302

Added lines #L300 - L302 were not covered by tests
)
)

else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))

Check warning on line 306 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L306

Added line #L306 was not covered by tests
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = Alert.State.ACTIVE
currentAlert.copy(
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
if (alertError == null) Alert.State.ACTIVE
else Alert.State.ERROR

Check warning on line 347 in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt#L347

Added line #L347 was not covered by tests
Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}
}

fun updateActionResultsForBucketLevelAlert(
Expand Down Expand Up @@ -758,6 +845,37 @@ class AlertService(
return searchResponse
}

/**
* Searches for Alerts in the monitor's alertIndex.
*
* @param monitorId The Monitor to get Alerts for
* @param size The number of search hits (Alerts) to return
*/
private suspend fun searchAlerts(
workflow: Workflow,
size: Int,
dataSources: DataSources,
): SearchResponse {
val workflowId = workflow.id
val alertIndex = dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, ""))
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)

val searchRequest = SearchRequest(alertIndex)
.routing(workflowId)
.source(searchSourceBuilder)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
if (searchResponse.status() != RestStatus.OK) {
throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
return searchResponse
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
Expand Down
10 changes: 10 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
Expand Down Expand Up @@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) {
return result.triggered && !suppress
}

fun isChainedAlertTriggerActionable(
ctx: ChainedAlertTriggerExecutionContext,
result: ChainedAlertTriggerRunResult,
): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.script

import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.Workflow
import java.time.Instant
Expand All @@ -18,15 +19,17 @@ data class ChainedAlertTriggerExecutionContext(
val error: Exception? = null,
val trigger: ChainedAlertTrigger,
val alertGeneratingMonitors: Set<String>,
val monitorIdToAlertIdsMap: Map<String, Set<String>>
val monitorIdToAlertIdsMap: Map<String, Set<String>>,
val alert: Alert? = null
) {

constructor(
workflow: Workflow,
workflowRunResult: WorkflowRunResult,
trigger: ChainedAlertTrigger,
alertGeneratingMonitors: Set<String>,
monitorIdToAlertIdsMap: Map<String, Set<String>>
monitorIdToAlertIdsMap: Map<String, Set<String>>,
alert: Alert? = null

Check warning on line 32 in alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt#L32

Added line #L32 was not covered by tests
) :
this(
workflow,
Expand All @@ -36,7 +39,8 @@ data class ChainedAlertTriggerExecutionContext(
workflowRunResult.error,
trigger,
alertGeneratingMonitors,
monitorIdToAlertIdsMap
monitorIdToAlertIdsMap,
alert
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
}

fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX
else getAlertsRequest.alertIndex!!
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex!!
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN

Check warning on line 165 in alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt#L165

Added line #L165 was not covered by tests
else
alertIndex
}

fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,36 +150,67 @@ object CompositeWorkflowRunner : WorkflowRunner() {
error = lastErrorDelegateRun,
triggerResults = triggerResults
)
if (dataSources != null) {
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources!!)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(dataSources)
monitorCtx.alertService!!.loadCurrentAlertsForWorkflow(workflow, dataSources)
} catch (e: Exception) {
logger.error("Failed to fetch current alerts for workflow", e)

Check warning on line 158 in alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt#L157-L158

Added lines #L157 - L158 were not covered by tests
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading alerts for workflow: $id", e)
return workflowRunResult.copy(error = e)

Check warning on line 162 in alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt#L161-L162

Added lines #L161 - L162 were not covered by tests
}
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources)
val updatedAlerts = mutableListOf<Alert>()
val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow)
for (trigger in workflow.triggers) {
val currentAlert = currentAlerts[trigger]
val caTrigger = trigger as ChainedAlertTrigger
val triggerCtx = ChainedAlertTriggerExecutionContext(
workflow = workflow,
workflowRunResult = workflowRunResult,
trigger = caTrigger,
alertGeneratingMonitors = monitorIdToAlertIdsMap.keys,
monitorIdToAlertIdsMap = monitorIdToAlertIdsMap,
alert = currentAlert
)
runChainedAlertTrigger(
monitorCtx,
workflow,
trigger,
executionId,
triggerCtx,
dryRun,
triggerResults,
updatedAlerts
)
}
if (!dryRun && workflow.id != Workflow.NO_ID && updatedAlerts.isNotEmpty()) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
updatedAlerts,
it,
routingId = workflow.id
)
runChainedAlertTrigger(dataSources, monitorCtx, workflow, trigger, executionId, triggerCtx, dryRun, triggerResults)
}
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()
)
}
} catch (e: Exception) {

Check warning on line 200 in alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt#L200

Added line #L200 was not covered by tests
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id
logger.error("Error loading current chained alerts for workflow: $id", e)
return WorkflowRunResult(
workflowId = workflow.id,
workflowName = workflow.name,
monitorRunResults = emptyList(),
executionStartTime = workflowExecutionStartTime,
executionEndTime = Instant.now(),
executionId = executionId,
error = AlertingException.wrap(e),
triggerResults = emptyMap()

Check warning on line 212 in alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

View check run for this annotation

Codecov / codecov/patch

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt#L203-L212

Added lines #L203 - L212 were not covered by tests
)
}
workflowRunResult.executionEndTime = Instant.now()

Expand Down Expand Up @@ -260,37 +291,30 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

private suspend fun runChainedAlertTrigger(
dataSources: DataSources,
monitorCtx: MonitorRunnerExecutionContext,
workflow: Workflow,
trigger: ChainedAlertTrigger,
executionId: String,
triggerCtx: ChainedAlertTriggerExecutionContext,
dryRun: Boolean,
triggerResults: MutableMap<String, ChainedAlertTriggerRunResult>,
updatedAlerts: MutableList<Alert>,
) {
val triggerRunResult = monitorCtx.triggerService!!.runChainedAlertTrigger(
workflow, trigger, triggerCtx.alertGeneratingMonitors, triggerCtx.monitorIdToAlertIdsMap
)
triggerResults[trigger.id] = triggerRunResult
if (triggerRunResult.triggered) {
if (monitorCtx.triggerService!!.isChainedAlertTriggerActionable(triggerCtx, triggerRunResult)) {
val actionCtx = triggerCtx
for (action in trigger.actions) {
triggerRunResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, workflow, dryRun)
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList()
)
if (!dryRun && workflow.id != Workflow.NO_ID) {
monitorCtx.retryPolicy?.let {
monitorCtx.alertService!!.saveAlerts(
dataSources,
listOf(alert),
it,
routingId = workflow.id
)
}
}
}
val alert = monitorCtx.alertService!!.composeChainedAlert(
triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList(), triggerRunResult
)
if (alert != null) {
updatedAlerts.add(alert)
}
}

Expand Down
Loading

0 comments on commit 68e7685

Please sign in to comment.