diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 5fac12ac8..6983669a2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -422,7 +422,7 @@ class AlertService( .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .opType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) .id(alert.id) val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) } @@ -472,7 +472,7 @@ class AlertService( } val bulkResponse: BulkResponse = client.suspendUntil { - bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL), it) } if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> @@ -534,7 +534,7 @@ class AlertService( } val bulkResponse: BulkResponse = client.suspendUntil { - bulk(BulkRequest().add(copyRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + bulk(BulkRequest().add(copyRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL), it) } if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> @@ -641,7 +641,7 @@ class AlertService( if (requestsToRetry.isEmpty()) return // Retry Bulk requests if there was any 429 response retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { - val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS } @@ -688,7 +688,7 @@ class AlertService( // If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in // relation to index request in the retried bulk request for when it eventually succeeds. retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { - val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } // TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }