Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Alerting Enhancements: Alerting Comments (Experimental) (#1561) #1572

Merged
merged 2 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -157,7 +158,7 @@ class AlertService(
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
val currentAlert = ctx.alertContext?.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
Expand Down Expand Up @@ -686,6 +687,8 @@ class AlertService(
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

val commentIdsToDelete = mutableListOf<String>()

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand Down Expand Up @@ -732,13 +735,22 @@ class AlertService(
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
// Only add completed alert to history index if history is enabled
IndexRequest(alertsHistoryIndex)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
} else {
// Otherwise, prepare the Alert's comments for deletion, and don't include
// a request to index the Alert to an Alert history index.
// The delete request can't be added to the list of DocWriteRequests because
// Comments are stored in aliased history indices, not a concrete Comments
// index like Alerts. A DeleteBy request will be used to delete Comments, instead
// of a regular Delete request
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
null
}
)
}
}
Expand All @@ -758,6 +770,9 @@ class AlertService(
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}

// delete all the comments of any Alerts that were deleted
CommentsUtils.deleteComments(client, commentIdsToDelete)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
Expand All @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
Expand All @@ -40,8 +42,10 @@ import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand All @@ -54,6 +58,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
Expand All @@ -68,8 +73,10 @@ import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
Expand Down Expand Up @@ -158,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"

@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
}
Expand All @@ -166,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var commentsIndices: CommentsIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
Expand Down Expand Up @@ -203,6 +212,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAction(),
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
)
}

Expand All @@ -229,6 +241,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_COMMENT_ACTION_TYPE, TransportIndexAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, TransportSearchAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
Expand Down Expand Up @@ -287,6 +302,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
Expand Down Expand Up @@ -315,6 +331,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler,
runner,
scheduledJobIndices,
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
lockService,
Expand Down Expand Up @@ -389,7 +406,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED,
AlertingSettings.ALERTING_COMMENTS_ENABLED,
AlertingSettings.COMMENTS_HISTORY_MAX_DOCS,
AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE,
AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD,
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
Expand All @@ -36,6 +38,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
Expand Down Expand Up @@ -274,6 +277,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// retrieve max Comments per Alert notification setting
val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -291,9 +297,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
val alertsToExecuteActionsForIds = alertsToExecuteActionsFor.map { it.id }
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
val alertContext = if (alertCategory != AlertCategory.NEW) {
AlertContext(alert = alert, comments = allAlertsComments[alert.id])
} else {
getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs, allAlertsComments[alert.id])
}

val actionCtx = getActionContextForAlertCategory(
alertCategory,
Expand Down Expand Up @@ -329,12 +344,28 @@ object BucketLevelMonitorRunner : MonitorRunner() {
continue
}

val alertsToExecuteActionsForIds = dedupedAlerts.map { it.id }
.plus(newAlerts.map { it.id })
.plus(completedAlerts.map { it.id })
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
dedupedAlerts = dedupedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
getAlertContext(
alert = it,
alertSampleDocs = alertSampleDocs,
alertComments = allAlertsComments[it.id]
)
},
completedAlerts = completedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
val actionResult = this.runAction(action, actionCtx, monitorCtx, monitor, dryrun)
Expand Down Expand Up @@ -537,17 +568,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
): BucketLevelTriggerExecutionContext {
return when (alertCategory) {
AlertCategory.DEDUPED ->
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = listOf(alertContext), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
AlertCategory.NEW ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
AlertCategory.COMPLETED ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext), error = error)
}
}

private fun getAlertContext(
alert: Alert,
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>,
alertComments: List<Comment>?
): AlertContext {
val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
Expand All @@ -561,7 +593,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
alert.monitorId,
alert.executionId
)
AlertContext(alert = alert, sampleDocs = listOf())
AlertContext(alert = alert, sampleDocs = listOf(), comments = alertComments)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ abstract class MonitorRunner {
dryrun: Boolean
): ActionRunResult {
return try {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
Expand Down
Loading
Loading