add per document alerting in bucket level monitor for security analytics #1081

Closed
wants to merge 1 commit into from
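In short, the change below teaches the bucket-level monitor runner to emit one alert per finding document (in addition to the existing per-bucket alerts) and lets chained-alert workflow triggers reuse an in-progress alert instead of always creating a new one. A minimal sketch of the per-document path, using names from the BucketLevelMonitorRunner diff below; treat it as an illustration of the approach, not the exact plugin API:

// One ACTIVE alert per finding produced by a bucket-level trigger (mirrors createPerDocumentAlerts below).
val now = Instant.now()
val perDocumentAlerts = mutableListOf<Alert>()
findings.forEach { finding ->
    perDocumentAlerts.add(
        Alert(
            id = UUID.randomUUID().toString(),
            monitor = monitor,
            startTime = now,
            lastNotificationTime = now,
            state = Alert.State.ACTIVE,
            schemaVersion = IndexUtils.alertIndexSchemaVersion,
            findingIds = listOf(finding.id),
            relatedDocIds = finding.relatedDocIds,
            executionId = executionId,
            workflowId = workflowRunContext?.workflowId ?: "",
            severity = trigger.severity,
            actionExecutionResults = listOf(),
        )
    )
}
// Persist them in the monitor's alerts index with the same retry policy and routing as the existing saveAlerts call.
monitorCtx.alertService!!.saveAlerts(monitor.dataSources, perDocumentAlerts, monitorCtx.retryPolicy!!, false, monitor.id)

Tying each alert to exactly one finding and its related document ids is what lets security analytics surface bucket-level matches as per-document alerts.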
5 changes: 0 additions & 5 deletions alerting/build.gradle
@@ -489,9 +489,4 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
142 changes: 130 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
@@ -83,6 +84,26 @@ class AlertService(

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlertsForWorkflow(workflow: Workflow, dataSources: DataSources): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
workflow = workflow,
size = workflow.triggers.size * 2, // We expect a single in-progress alert per trigger, so fetch two per trigger to detect duplicates
dataSources = dataSources
)

val foundAlerts = searchAlertsResponse.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
}
}

return workflow.triggers.associateWith { trigger ->
foundAlerts[trigger.id]?.firstOrNull()
}
}

suspend fun loadCurrentAlertsForQueryLevelMonitor(monitor: Monitor, workflowRunContext: WorkflowRunContext?): Map<Trigger, Alert?> {
val searchAlertsResponse: SearchResponse = searchAlerts(
monitor = monitor,
@@ -257,18 +278,84 @@ class AlertService(
ctx: ChainedAlertTriggerExecutionContext,
executionId: String,
workflow: Workflow,
associatedAlertIds: List<String>
): Alert {
return Alert(
startTime = Instant.now(),
lastNotificationTime = Instant.now(),
state = Alert.State.ACTIVE,
errorMessage = null, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
associatedAlertIds: List<String>,
result: ChainedAlertTriggerRunResult,
alertError: AlertError? = null,
): Alert? {

val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)

else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// Add action execution results that do not exist in the current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}

// Merge this execution's error into the current alert's error history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = Alert.State.ACTIVE
currentAlert.copy(
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(
startTime = currentTime,
lastNotificationTime = currentTime,
state = alertState,
errorMessage = alertError?.message, schemaVersion = -1,
chainedAlertTrigger = ctx.trigger,
executionId = executionId,
workflow = workflow,
associatedAlertIds = associatedAlertIds
)
}
}

fun updateActionResultsForBucketLevelAlert(
@@ -762,6 +849,37 @@ class AlertService(
return searchResponse
}

/**
* Searches for Alerts generated by the given workflow in its alerts index.
*
* @param workflow The Workflow to get Alerts for
* @param size The number of search hits (Alerts) to return
* @param dataSources The DataSources that determine which alerts index to search
*/
private suspend fun searchAlerts(
workflow: Workflow,
size: Int,
dataSources: DataSources,
): SearchResponse {
val workflowId = workflow.id
val alertIndex = dataSources.alertsIndex

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, "")) // workflow (chained) alerts are stored with an empty monitor id
val searchSourceBuilder = SearchSourceBuilder()
.size(size)
.query(queryBuilder)

val searchRequest = SearchRequest(alertIndex)
.routing(workflowId)
.source(searchSourceBuilder)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
if (searchResponse.status() != RestStatus.OK) {
throw (searchResponse.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}
return searchResponse
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -21,6 +21,7 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
@@ -51,6 +52,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors

object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
@@ -151,15 +153,17 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (triggerResults[trigger.id]?.error != null) continue
val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) {
logger.debug("Creating bucket level findings")
createFindings(
logger.debug("Creating bucket level monitor findings. MonitorId: ${monitor.id}")
createFindingsAndPerDocumentAlerts(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
executionId,
workflowRunContext,
trigger
)
} else {
emptyList()
@@ -351,14 +355,16 @@ object BucketLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

private suspend fun createFindings(
private suspend fun createFindingsAndPerDocumentAlerts(
triggerResult: BucketLevelTriggerRunResult,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?,
trigger: BucketLevelTrigger
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
@@ -417,7 +423,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
val findings = createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
createPerDocumentAlerts(monitorCtx, findings, workflowRunContext, executionId, monitor, trigger)
return findings.stream().map { it.id }.collect(Collectors.toList())
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
@@ -426,20 +434,50 @@
return listOf()
}

private suspend fun createPerDocumentAlerts(
monitorCtx: MonitorRunnerExecutionContext,
findings: List<Finding>,
workflowRunContext: WorkflowRunContext?,
executionId: String,
monitor: Monitor,
trigger: BucketLevelTrigger,
) {
val currentTime = Instant.now()
val alerts = mutableListOf<Alert>()
findings.forEach {
val alert = Alert(
id = UUID.randomUUID().toString(),
monitor = monitor,
startTime = currentTime,
lastNotificationTime = currentTime,
state = Alert.State.ACTIVE,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = listOf(it.id),
relatedDocIds = it.relatedDocIds,
executionId = executionId,
workflowId = workflowRunContext?.workflowId ?: "",
severity = trigger.severity,
actionExecutionResults = listOf(), // TODO
)
alerts.add(alert)
}
monitorCtx.alertService!!.saveAlerts(monitor.dataSources, alerts, monitorCtx.retryPolicy!!, false, monitor.id)
}

private suspend fun createFindingPerIndex(
searchResponse: SearchResponse,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null
): List<String> {
workflowExecutionId: String? = null,
): List<Finding> {
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
val ids = docIdsByIndexName.getOrDefault(hit.index, mutableListOf())
ids.add(hit.id)
docIdsByIndexName[hit.index] = ids
}
val findings = mutableListOf<String>()
val findings = mutableListOf<Finding>()
var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
docIdsByIndexName.entries.forEach { it ->
run {
@@ -464,15 +502,15 @@
.routing(finding.id)
requestsToRetry.add(indexRequest)
}
findings.add(finding.id)
findings.add(finding)
}
}
if (requestsToRetry.isEmpty()) return listOf()
monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
val findingsBeingRetried = mutableListOf<Finding>()
requestsToRetry = mutableListOf()
val findingsBeingRetried = mutableListOf<Alert>()
bulkResponse.items.forEach { item ->
if (item.isFailed) {
if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
10 changes: 10 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
@@ -12,6 +12,7 @@ import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
@@ -52,6 +53,15 @@ class TriggerService(val scriptService: ScriptService) {
return result.triggered && !suppress
}

fun isChainedAlertTriggerActionable(
ctx: ChainedAlertTriggerExecutionContext,
result: ChainedAlertTriggerRunResult,
): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
alerting/src/main/kotlin/org/opensearch/alerting/script/ChainedAlertTriggerExecutionContext.kt
@@ -6,6 +6,7 @@
package org.opensearch.alerting.script

import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
import org.opensearch.commons.alerting.model.Workflow
import java.time.Instant
@@ -18,15 +19,17 @@ data class ChainedAlertTriggerExecutionContext(
val error: Exception? = null,
val trigger: ChainedAlertTrigger,
val alertGeneratingMonitors: Set<String>,
val monitorIdToAlertIdsMap: Map<String, Set<String>>
val monitorIdToAlertIdsMap: Map<String, Set<String>>,
val alert: Alert? = null
) {

constructor(
workflow: Workflow,
workflowRunResult: WorkflowRunResult,
trigger: ChainedAlertTrigger,
alertGeneratingMonitors: Set<String>,
monitorIdToAlertIdsMap: Map<String, Set<String>>
monitorIdToAlertIdsMap: Map<String, Set<String>>,
alert: Alert? = null
) :
this(
workflow,
@@ -36,7 +39,8 @@ data class ChainedAlertTriggerExecutionContext(
workflowRunResult.error,
trigger,
alertGeneratingMonitors,
monitorIdToAlertIdsMap
monitorIdToAlertIdsMap,
alert
)

/**
alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAlertsAction.kt
@@ -156,8 +156,14 @@ class TransportGetWorkflowAlertsAction @Inject constructor(
}

fun resolveAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {
return if (getAlertsRequest.alertIndex.isNullOrEmpty()) AlertIndices.ALERT_INDEX
else getAlertsRequest.alertIndex!!
var alertIndex = AlertIndices.ALL_ALERT_INDEX_PATTERN
if (!getAlertsRequest.alertIndex.isNullOrEmpty()) {
alertIndex = getAlertsRequest.alertIndex!!
}
return if (alertIndex == AlertIndices.ALERT_INDEX)
AlertIndices.ALL_ALERT_INDEX_PATTERN
else
alertIndex
}

fun resolveAssociatedAlertsIndexName(getAlertsRequest: GetWorkflowAlertsRequest): String {