Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private AnomalyDetectorSettings() {}
);

public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json";
public static final String FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results-flattened.json";
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
public static final String ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE = "mappings/anomaly-detection-state.json";
public static final String CHECKPOINT_INDEX_MAPPING_FILE = "mappings/anomaly-checkpoint.json";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -39,6 +40,8 @@
public class ADResultBulkTransportAction extends ResultBulkTransportAction<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest> {

private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class);
private final ClusterService clusterService;
private final Client client;

@Inject
public ADResultBulkTransportAction(
Expand All @@ -61,39 +64,82 @@ public ADResultBulkTransportAction(
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
ADResultBulkRequest::new
);
this.clusterService = clusterService;
this.client = client;
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);
}

/**
* Prepares a {@link BulkRequest} for indexing anomaly detection results.
*
* This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}.
* Each result is evaluated based on the current indexing pressure and result priority. If a flattened
* result index exists for the result, the result is also added to the flattened index.
*
* @param indexingPressurePercent the current percentage of indexing pressure. This value influences
* whether a result is indexed based on predefined thresholds and probabilities.
* @param request the {@link ADResultBulkRequest} containing anomaly detection results
* to be processed.
* @return a {@link BulkRequest} containing all results that are eligible for indexing.
*
* <p><b>Behavior:</b></p>
* <ul>
* <li>Results are added to the bulk request if the indexing pressure is within acceptable limits
* or the result has high priority.</li>
* <li>If a flattened result index exists for a result, it is added to the flattened index in addition
* to the primary index.</li>
* </ul>
*
* <p><b>Indexing Pressure Thresholds:</b></p>
* <ul>
* <li>Below the soft limit: All results are added.</li>
* <li>Between the soft limit and the hard limit: High-priority results are always added, and
* other results are added based on a probability that decreases with increasing pressure.</li>
* <li>Above the hard limit: Only high-priority results are added.</li>
* </ul>
*
* @see ADResultBulkRequest
* @see BulkRequest
* @see ADResultWriteRequest
*/
@Override
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
BulkRequest bulkRequest = new BulkRequest();
List<ADResultWriteRequest> results = request.getResults();

if (indexingPressurePercent <= softLimit) {
for (ADResultWriteRequest resultWriteRequest : results) {
addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex());
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
String resultIndex = resultWriteRequest.getResultIndex();

if (shouldAddResult(indexingPressurePercent, result)) {
addResult(bulkRequest, result, resultIndex);
addToFlattenedIndexIfExists(bulkRequest, result, resultIndex);
}
}

return bulkRequest;
}

private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) {
if (indexingPressurePercent <= softLimit) {
// Always add when below soft limit
return true;
} else if (indexingPressurePercent <= hardLimit) {
// exceed soft limit (60%) but smaller than hard limit (90%)
float acceptProbability = 1 - indexingPressurePercent;
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
if (result.isHighPriority() || random.nextFloat() < acceptProbability) {
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
}
}
return result.isHighPriority() || random.nextFloat() < acceptProbability;
} else {
// if exceeding hard limit, only index non-zero grade or error result
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
if (result.isHighPriority()) {
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
}
}
return result.isHighPriority();
}
}

return bulkRequest;
private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
String flattenedResultIndexName = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
if (clusterService.state().metadata().hasIndex(flattenedResultIndexName)) {
addResult(bulkRequest, result, flattenedResultIndexName);
}
}

private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand Down Expand Up @@ -272,6 +273,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
return Resources.toString(url, Charsets.UTF_8);
}

public static String getScripts(String scriptFileRelativePath) throws IOException {
URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
return Resources.toString(url, Charsets.UTF_8);
}

protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
request
.settings(
Expand Down Expand Up @@ -1008,6 +1014,44 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
}
}

/**
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
* creates flattened result index
* @param indexName the index name
* @param actionListener the action listener
* @throws IOException
*/
public void initFlattenedResultIndex(String indexName, ActionListener<CreateIndexResponse> actionListener) throws IOException {
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
.settings(settings);
choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {}", indexName);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
}

/**
* Get flattened result index mapping json content
* @return flattened result index mapping
* @throws IOException
*/
public String getFlattenedResultIndexMappings() throws IOException {
return getMappings(FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

public <T> void validateCustomIndexForBackendJob(
String resultIndexOrAlias,
String securityLogId,
Expand Down
Loading
Loading