Skip to content

Commit

Permalink
set the cancelAfterTimeInterval parameter on SearchRequest object in … (
Browse files Browse the repository at this point in the history
#1366)

* set the cancelAfterTimeInterval parameter on SearchRequest object in all MonitorRunners

Signed-off-by: Riya Saxena <[email protected]>

* address the comments for pr 1366

Signed-off-by: Riya Saxena <[email protected]>

* address the comments for pr 1366

Signed-off-by: Riya Saxena <[email protected]>

* fix merge conflicts

* fix merge conflicts

Signed-off-by: Riya Saxena <[email protected]>

* fix merge conflicts

Signed-off-by: Riya Saxena <[email protected]>

---------

Signed-off-by: Riya Saxena <[email protected]>
Signed-off-by: Riya <[email protected]>
  • Loading branch information
riysaxen-amzn authored Apr 4, 2024
1 parent d7ee971 commit d2a590e
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -446,6 +448,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
Expand All @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
Expand Down Expand Up @@ -116,7 +118,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -881,7 +882,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
Expand Down Expand Up @@ -936,7 +939,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"$monitorInputIndices against query index $queryIndices"
)
var response: SearchResponse

try {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)

response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
Expand Down Expand Up @@ -153,6 +154,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
ALERT_BACKOFF_MILLIS.get(monitorCtx.settings),
ALERT_BACKOFF_COUNT.get(monitorCtx.settings)
)

monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count ->
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
Expand All @@ -169,6 +173,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
monitorCtx.cancelAfterTimeInterval = it
}
monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
monitorCtx.allowList = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor(
)
}
searchSourceBuilder.query(queryBuilder).trackTotalHits(true)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -172,6 +175,16 @@ inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
}
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
return givenInterval
}
return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes)
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)

settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down

0 comments on commit d2a590e

Please sign in to comment.