Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1318,10 +1318,7 @@ protected void rolloverAndDeleteHistoryIndex(
}

private void handleResultIndexRolloverAndDelete(String indexAlias, Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
indexAlias,
getCustomResultIndexPattern(indexAlias)
);
RolloverRequest rolloverRequest = buildRolloverRequest(indexAlias, getCustomResultIndexPattern(indexAlias));

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
Expand All @@ -37,7 +39,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -574,54 +575,6 @@ protected void updateResultIndexSetting(String pipelineId, String flattenedResul
}));
}

private void handleFlattenResultIndexMappingUpdate(Config existingConfig, ActionListener<T> listener) {
if (config.getCustomResultIndexOrAlias() == null) {
return;
}
if (existingConfig.getFlattenResultIndexMapping()
&& !config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.info("Ingest pipeline deleted successfully for pipelineId: {}", pipelineId);
} else {
logger.error("Failed to delete ingest pipeline for pipelineId: {}", pipelineId);
listener
.onFailure(
new OpenSearchStatusException(
"Ingest pipeline deletion was not acknowledged for pipelineId: " + pipelineId,
RestStatus.INTERNAL_SERVER_ERROR
)
);
}
}

@Override
public void onFailure(Exception e) {
if (e instanceof OpenSearchStatusException && ((OpenSearchStatusException) e).status() == RestStatus.NOT_FOUND) {
logger.info("Ingest pipeline [{}] not found, skipping deletion.", pipelineId);
listener.onResponse(null);
} else {
logger.error("Error while deleting ingest pipeline for pipelineId: {}", pipelineId, e);
listener.onFailure(e);
}
}
});
} else if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
// we do not allow customers to enable this feature for existing running detectors.
listener.onFailure(new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST));
return;
}
}

protected void updateConfig(String id, boolean indexingDryRun, ActionListener<T> listener) {
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, id);
client
Expand Down Expand Up @@ -663,28 +616,104 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
// we do not allow customers to enable this feature for existing running detectors.
listener
.onFailure(
new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST)
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
breakingUIChange = true;
}
}
handleFlattenResultIndexMappingUpdate(existingConfig, listener);
ActionListener<Void> confirmBatchRunningListener;

ActionListener<Void> confirmBatchRunningListener = ActionListener
.wrap(
r -> searchConfigInputIndices(id, indexingDryRun, listener),
// can't update config if there is task running
listener::onFailure
);
// when unselecting flatten result index, need to clean up alias and ingest pipeline resources
if (existingConfig.getFlattenResultIndexMapping()
&& !config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
confirmBatchRunningListener = ActionListener
.wrap(
r -> getFlattenResultAliasIndex(existingConfig, listener, id, indexingDryRun),
// can't update config if there is task running
listener::onFailure
);

} else {
confirmBatchRunningListener = ActionListener
.wrap(
r -> searchConfigInputIndices(id, indexingDryRun, listener),
// can't update config if there is task running
listener::onFailure
);
}
handler.confirmBatchRunning(id, batchTasks, confirmBatchRunningListener);
} catch (Exception e) {
String message = "Failed to parse config " + id;
logger.error(message, e);
listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR));
}
}

private void getFlattenResultAliasIndex(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
String flattenResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(existingConfig.getCustomResultIndexOrAlias(), existingConfig.getId());
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(flattenResultIndexAlias);
client.admin().indices().getAliases(getAliasesRequest, ActionListener.wrap(getAliasesResponse -> {
Set<String> indices = getAliasesResponse.getAliases().keySet();
if (indices.isEmpty()) {
return;
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
}
String indexName = indices.iterator().next();
deleteAlias(indexName, flattenResultIndexAlias, existingConfig, listener, id, indexingDryRun);
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
}, exception -> listener.onFailure(exception)));
}

private void deleteAlias(
String indexName,
String aliasName,
Config existingConfig,
ActionListener<T> listener,
String id,
boolean indexingDryRun
) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(aliasName));
client
.admin()
.indices()
.aliases(
indicesAliasesRequest,
ActionListener
.wrap(
deleteAliasResponse -> deleteIngestPipeline(existingConfig, listener, id, indexingDryRun),
exception -> listener.onFailure(exception)
)
);
}

private void deleteIngestPipeline(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(existingConfig.getId());

client
.admin()
.cluster()
.deletePipeline(
new DeletePipelineRequest(pipelineId),
ActionListener
.wrap(
deleteIngestPipelineResponse -> searchConfigInputIndices(id, indexingDryRun, listener),
exception -> listener.onFailure(exception)
)
);
}

protected void validateAgainstExistingHCConfig(String configId, boolean indexingDryRun, ActionListener<T> listener) {
Expand Down
Loading