Commit

per document alerting in bucket level monitor

Signed-off-by: Surya Sashank Nistala <[email protected]>
eirsep committed Aug 8, 2023
1 parent ca28061 commit 276ed12
Showing 11 changed files with 358 additions and 76 deletions.
5 changes: 0 additions & 5 deletions alerting/build.gradle
@@ -489,9 +489,4 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
142 changes: 130 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
@@ -83,6 +84,26 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
workflow = workflow,
size = workflow.triggers.size * 2, // We expect only a single in-progress alert per trigger, so fetch two per trigger to detect duplicates
dataSources = dataSources
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
}
}

return workflow.triggers.associateWith { trigger ->
foundAlerts[trigger.id]?.firstOrNull()
}
}
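
For reference, a minimal standalone sketch of the association logic above, with hypothetical SimpleTrigger and SimpleAlert classes standing in for the plugin's Trigger and Alert models: found alerts are grouped by trigger id, duplicates are logged, and each trigger is mapped to at most one current alert.

data class SimpleTrigger(val id: String)
data class SimpleAlert(val id: String, val triggerId: String)

fun associateAlertsWithTriggers(
    triggers: List<SimpleTrigger>,
    foundAlerts: List<SimpleAlert>
): Map<SimpleTrigger, SimpleAlert?> {
    // Group previously found alerts by the trigger that produced them
    val alertsByTrigger = foundAlerts.groupBy { it.triggerId }
    // Normally only one in-progress alert should exist per trigger; warn otherwise
    alertsByTrigger.values.filter { it.size > 1 }
        .forEach { println("Found multiple alerts for the same trigger: $it") }
    // Every trigger gets its single current alert, or null if none exists yet
    return triggers.associateWith { alertsByTrigger[it.id]?.firstOrNull() }
}

fun main() {
    val triggers = listOf(SimpleTrigger("t1"), SimpleTrigger("t2"))
    val alerts = listOf(SimpleAlert("a1", "t1"))
    println(associateAlertsWithTriggers(triggers, alerts)) // t1 -> a1, t2 -> null
}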

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
@@ -257,18 +278,84 @@ class AlertService(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
associatedAlertIds: List<String>,
result: ChainedAlertTriggerRunResult,
alertError: AlertError? = null,
): Alert? {

val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)

else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results that do not exist on the current alert yet
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge the latest error (if any) into the current alert's error history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = Alert.State.ACTIVE
currentAlert.copy(
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = alertState,
errorMessage = alertError?.message,
schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}
}
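
The merge rules above (keep results for actions that did not run, bump the throttle count for throttled runs, refresh the execution time otherwise, and append results the alert has never seen) can be shown as a small standalone sketch; ActionRun and ActionOutcome below are hypothetical simplifications of the plugin's ActionRunResult and ActionExecutionResult types, not the real ones.

import java.time.Instant

data class ActionRun(val executionTime: Instant, val throttled: Boolean)
data class ActionOutcome(val actionId: String, val lastExecutionTime: Instant?, val throttledCount: Int = 0)

fun mergeActionResults(
    current: List<ActionOutcome>,
    latest: Map<String, ActionRun> // keyed by action id
): List<ActionOutcome> {
    val merged = current.map { outcome ->
        val run = latest[outcome.actionId]
        when {
            run == null -> outcome // the action did not run this execution; keep the old result
            run.throttled -> outcome.copy(throttledCount = outcome.throttledCount + 1)
            else -> outcome.copy(lastExecutionTime = run.executionTime)
        }
    }.toMutableList()
    // Append results for actions that have no entry on the existing alert yet
    val knownIds = current.map { it.actionId }.toSet()
    merged += latest.filterKeys { it !in knownIds }
        .map { (id, run) -> ActionOutcome(id, run.executionTime, if (run.throttled) 1 else 0) }
    return merged
}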

fun updateActionResultsForBucketLevelAlert(
@@ -762,6 +849,37 @@ class AlertService(
return searchResponse
}

/**
* Searches for Alerts in the workflow's alert index.
*
* @param workflow The Workflow to get Alerts for
* @param size The number of search hits (Alerts) to return
* @param dataSources The DataSources that define which alert index to search
*/
private suspend fun searchAlerts(
workflow: Workflow,
size: Int,
dataSources: DataSources,
): SearchResponse {
val workflowId = workflow.id
val alertIndex = dataSources.alertsIndex

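// Match alerts that belong to this workflow and have no associated monitor (empty monitor id)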
val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, ""))
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)

val searchRequest = SearchRequest(alertIndex)
.routing(workflowId)
.source(searchSourceBuilder)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
if (searchResponse.status() != RestStatus.OK) {
throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
return searchResponse
}
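
As a rough standalone illustration of how such a request could be assembled with the OpenSearch client classes: the index name and the workflow_id / monitor_id field literals below are assumptions standing in for the plugin's constants, so treat this as a sketch rather than the plugin's actual query.

import org.opensearch.action.search.SearchRequest
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder

fun buildWorkflowAlertSearchRequest(workflowId: String, alertIndex: String, size: Int): SearchRequest {
    // Match alerts of this workflow that carry no monitor id (i.e. workflow-level alerts)
    val query = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("workflow_id", workflowId))
        .must(QueryBuilders.termQuery("monitor_id", ""))
    val source = SearchSourceBuilder()
        .size(size)
        .query(query)
    // Route by workflow id so the search goes to the same shard the alerts were written with
    return SearchRequest(alertIndex)
        .routing(workflowId)
        .source(source)
}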

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
@@ -21,6 +21,7 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
@@ -51,6 +52,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors

object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
@@ -151,15 +153,17 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (triggerResults[trigger.id]?.error != null) continue
val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) {
logger.debug("Creating bucket level findings")
createFindings(
logger.debug("Creating bucket level monitor findings. MonitorId: ${monitor.id}")
createFindingsAndPerDocumentAlerts(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
executionId,
workflowRunContext,
trigger
)
} else {
emptyList()
@@ -351,14 +355,16 @@ object BucketLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

private suspend fun createFindings(
private suspend fun createFindingsAndPerDocumentAlerts(
triggerResult: BucketLevelTriggerRunResult,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?,
trigger: BucketLevelTrigger
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
@@ -417,7 +423,9 @@
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
val findings = createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
createPerDocumentAlerts(monitorCtx, findings, workflowRunContext, executionId, monitor, trigger)
return findings.stream().map { it.id }.collect(Collectors.toList())
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
@@ -426,20 +434,50 @@
return listOf()
}

private suspend fun createPerDocumentAlerts(
monitorCtx: MonitorRunnerExecutionContext,
findings: List<Finding>,
workflowRunContext: WorkflowRunContext?,
executionId: String,
monitor: Monitor,
trigger: BucketLevelTrigger,
) {
val currentTime = Instant.now()
val alerts = mutableListOf<Alert>()
findings.forEach {
val alert = Alert(
id = UUID.randomUUID().toString(),
monitor = monitor,
startTime = currentTime,
lastNotificationTime = currentTime,
state = Alert.State.ACTIVE,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = listOf(it.id),
relatedDocIds = it.relatedDocIds,
executionId = executionId,
workflowId = workflowRunContext?.workflowId ?: "",
severity = trigger.severity,
actionExecutionResults = listOf(), // todo
)
alerts.add(alert)
}
monitorCtx.alertService!!.saveAlerts(monitor.dataSources, alerts, monitorCtx.retryPolicy!!, false, monitor.id)
}
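
For reference, a simplified standalone sketch of the fan-out above, creating one active alert per finding; PerDocFinding and PerDocAlert are hypothetical stand-ins for the plugin's Finding and Alert models.

import java.time.Instant
import java.util.UUID

data class PerDocFinding(val id: String, val relatedDocIds: List<String>)
data class PerDocAlert(
    val id: String,
    val state: String,
    val startTime: Instant,
    val findingIds: List<String>,
    val relatedDocIds: List<String>,
    val executionId: String,
    val workflowId: String,
)

fun alertsForFindings(findings: List<PerDocFinding>, executionId: String, workflowId: String): List<PerDocAlert> {
    val now = Instant.now()
    return findings.map { finding ->
        PerDocAlert(
            id = UUID.randomUUID().toString(),
            state = "ACTIVE",
            startTime = now,
            findingIds = listOf(finding.id),        // exactly one finding per alert
            relatedDocIds = finding.relatedDocIds,  // the documents that fell into the triggered bucket
            executionId = executionId,
            workflowId = workflowId,
        )
    }
}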

private suspend fun createFindingPerIndex(
searchResponse: SearchResponse,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null
): List<String> {
workflowExecutionId: String? = null,
): List<Finding> {
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
val ids = docIdsByIndexName.getOrDefault(hit.index, mutableListOf())
ids.add(hit.id)
docIdsByIndexName[hit.index] = ids
}
val findings = mutableListOf<String>()
val findings = mutableListOf<Finding>()
var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
docIdsByIndexName.entries.forEach { it ->
run {
@@ -464,15 +502,15 @@
.routing(finding.id)
requestsToRetry.add(indexRequest)
}
findings.add(finding.id)
findings.add(finding)
}
}
if (requestsToRetry.isEmpty()) return listOf()
monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
val findingsBeingRetried = mutableListOf<Finding>()
requestsToRetry = mutableListOf()
val findingsBeingRetried = mutableListOf<Alert>()
bulkResponse.items.forEach { item ->
if (item.isFailed) {
if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
10 changes: 10 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
@@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
@@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) {
return result.triggered && !suppress
}

fun isChainedAlertTriggerActionable(
ctx: ChainedAlertTriggerExecutionContext,
result: ChainedAlertTriggerRunResult,
): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}
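
The same suppression rule in a tiny standalone form, with hypothetical simplified types: an acknowledged chained alert suppresses actions unless this run produced an error.

data class ChainedRunOutcome(val triggered: Boolean, val error: Exception? = null)

fun isChainedTriggerActionable(
    currentAlertState: String?,          // e.g. "ACTIVE", "ACKNOWLEDGED", or null if no alert exists yet
    result: ChainedRunOutcome,
    contextError: Exception?,
): Boolean {
    val suppress = currentAlertState == "ACKNOWLEDGED" && result.error == null && contextError == null
    return result.triggered && !suppress
}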

fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
@@ -6,6 +6,7 @@
package org.opensearch.alerting.script

import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.Workflow
import java.time.Instant
@@ -18,15 +19,17 @@ data class ChainedAlertTriggerExecutionContext(
val error: Exception? = null,
val trigger: ChainedAlertTrigger,
val alertGeneratingMonitors: Set<String>,
val monitorIdToAlertIdsMap: Map<String, Set<String>>
val monitorIdToAlertIdsMap: Map<String, Set<String>>,
val alert: Alert? = null
) {

constructor(
workflow: Workflow,
workflowRunResult: WorkflowRunResult,
trigger: ChainedAlertTrigger,
alertGeneratingMonitors: Set<String>,
monitorIdToAlertIdsMap: Map<String, Set<String>>
monitorIdToAlertIdsMap: Map<String, Set<String>>,
alert: Alert? = null
) :
this(
workflow,
Expand All @@ -36,7 +39,8 @@ data class ChainedAlertTriggerExecutionContext(
workflowRunResult.error,
trigger,
alertGeneratingMonitors,
monitorIdToAlertIdsMap
monitorIdToAlertIdsMap,
alert
)

/**
@@ -156,8 +156,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
}

fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX
else getAlertsRequest.alertIndex!!
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (getAlertsRequest.alertIndex.isNullOrEmpty() == false) {
alertIndex = getAlertsRequest.alertIndex!!
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN
else
alertIndex
}
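
A standalone sketch of the resolution rule above: use the index supplied in the request when present, and map the single default alert index to the all-alerts pattern so history indices are searched too. The constant values below are hypothetical placeholders, not necessarily the plugin's actual index names.

const val DEFAULT_ALERT_INDEX = ".opendistro-alerting-alerts"
const val ALL_ALERT_INDEX_PATTERN = ".opendistro-alerting-alert*"

fun resolveAlertsIndex(requestedIndex: String?): String {
    // Fall back to the wildcard pattern when no index was supplied in the request
    val index = if (requestedIndex.isNullOrEmpty()) ALL_ALERT_INDEX_PATTERN else requestedIndex
    // Widen the default alert index to the pattern so history indices are included
    return if (index == DEFAULT_ALERT_INDEX) ALL_ALERT_INDEX_PATTERN else index
}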

fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {