diff --git a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java index 161044d29..9b057ad45 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumMap; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,8 @@ import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * This class provides utility methods for various anomaly detection indices. */ @@ -122,6 +125,22 @@ public static String getResultMappings() throws IOException { return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE); } + /** + * Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true + * @return JSON mapping for the flattened result index. + * @throws IOException if the mapping file cannot be read. + */ + public static String getFlattenedResultMappings() throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + + Map mapping = objectMapper + .readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class); + + mapping.put("dynamic", true); + + return objectMapper.writeValueAsString(mapping); + } + /** * Get anomaly detector state index mapping json content. * diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java index 7b3f573ff..9f9b46f32 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java @@ -592,7 +592,7 @@ public static AnomalyDetector parse( case RESULT_INDEX_FIELD_TTL: customResultIndexTTL = onlyParseNumberValue(parser); break; - case FLATTEN_RESULT_INDEX_MAPPING: + case FLATTEN_CUSTOM_RESULT_INDEX: flattenResultIndexMapping = onlyParseBooleanValue(parser); break; case BREAKING_UI_CHANGE_TIME: diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java b/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java index 912396ebd..5f26f9244 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java @@ -25,9 +25,10 @@ public ADResultWriteRequest( String detectorId, RequestPriority priority, AnomalyResult result, - String resultIndex + String resultIndex, + String flattenResultIndex ) { - super(expirationEpochMs, detectorId, priority, result, resultIndex); + super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex); } public ADResultWriteRequest(StreamInput in) throws IOException { diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java b/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java index b57e99f1c..f9b4edc75 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java @@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest( String configId, RequestPriority priority, AnomalyResult result, - String resultIndex + String resultIndex, + String flattenResultIndex ) { - return new ADResultWriteRequest(expirationEpochMs, configId, priority, 
result, resultIndex); + return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex); } } diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java b/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java index cac437523..c84518890 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java @@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) { config.getId(), result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM, result, - config.getCustomResultIndexOrAlias() + config.getCustomResultIndexOrAlias(), + config.getFlattenResultIndexAlias() ) ); } diff --git a/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java index 2de6b07e3..ef451208b 100644 --- a/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java @@ -39,6 +39,8 @@ public class ADResultBulkTransportAction extends ResultBulkTransportAction { private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class); + private final ClusterService clusterService; + private final Client client; @Inject public ADResultBulkTransportAction( @@ -61,39 +63,77 @@ public ADResultBulkTransportAction( ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, ADResultBulkRequest::new ); + this.clusterService = clusterService; + this.client = client; clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it); clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it); } + /** + * Prepares a {@link BulkRequest} for indexing anomaly detection results. + * + * This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}. + * Each result is evaluated based on the current indexing pressure and result priority. If a flattened + * result index exists for the result, the result is also added to the flattened index. + * + * @param indexingPressurePercent the current percentage of indexing pressure. This value influences + * whether a result is indexed based on predefined thresholds and probabilities. + * @param request the {@link ADResultBulkRequest} containing anomaly detection results + * to be processed. + * @return a {@link BulkRequest} containing all results that are eligible for indexing. + * + *

<p><b>Behavior:</b></p>
+ * <ul>
+ *   <li>At or below the soft limit, every result in the request is indexed.</li>
+ *   <li>Between the soft limit and the hard limit, high-priority results are always indexed and other results are indexed with probability (1 - indexing pressure).</li>
+ *   <li>Above the hard limit, only high-priority results (non-zero anomaly grade or error results) are indexed.</li>
+ * </ul>
+ *
+ * <p><b>Indexing Pressure Thresholds:</b></p>
+ * <ul>
+ *   <li>Soft limit: configured via {@code AD_INDEX_PRESSURE_SOFT_LIMIT}.</li>
+ *   <li>Hard limit: configured via {@code AD_INDEX_PRESSURE_HARD_LIMIT}.</li>
+ * </ul>
+ * + * + * @see ADResultBulkRequest + * @see BulkRequest + * @see ADResultWriteRequest + */ @Override protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) { BulkRequest bulkRequest = new BulkRequest(); List results = request.getResults(); - if (indexingPressurePercent <= softLimit) { - for (ADResultWriteRequest resultWriteRequest : results) { - addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex()); + for (ADResultWriteRequest resultWriteRequest : results) { + AnomalyResult result = resultWriteRequest.getResult(); + String resultIndex = resultWriteRequest.getResultIndex(); + + if (shouldAddResult(indexingPressurePercent, result)) { + addResult(bulkRequest, result, resultIndex); + if (resultWriteRequest.getFlattenResultIndex() != null) { + addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex()); + } } + } + + return bulkRequest; + } + + private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) { + if (indexingPressurePercent <= softLimit) { + // Always add when below soft limit + return true; } else if (indexingPressurePercent <= hardLimit) { // exceed soft limit (60%) but smaller than hard limit (90%) float acceptProbability = 1 - indexingPressurePercent; - for (ADResultWriteRequest resultWriteRequest : results) { - AnomalyResult result = resultWriteRequest.getResult(); - if (result.isHighPriority() || random.nextFloat() < acceptProbability) { - addResult(bulkRequest, result, resultWriteRequest.getResultIndex()); - } - } + return result.isHighPriority() || random.nextFloat() < acceptProbability; } else { // if exceeding hard limit, only index non-zero grade or error result - for (ADResultWriteRequest resultWriteRequest : results) { - AnomalyResult result = resultWriteRequest.getResult(); - if (result.isHighPriority()) { - addResult(bulkRequest, result, resultWriteRequest.getResultIndex()); - } - } + return result.isHighPriority(); } - - return bulkRequest; } private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) { diff --git a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java index 83283468e..66915513b 100644 --- a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java @@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndexOrAlias() + config.getCustomResultIndexOrAlias(), + config.getFlattenResultIndexAlias() ); } diff --git a/src/main/java/org/opensearch/forecast/model/Forecaster.java b/src/main/java/org/opensearch/forecast/model/Forecaster.java index 756b5c4e0..5f0333cdb 100644 --- a/src/main/java/org/opensearch/forecast/model/Forecaster.java +++ b/src/main/java/org/opensearch/forecast/model/Forecaster.java @@ -437,7 +437,7 @@ public static Forecaster parse( case RESULT_INDEX_FIELD_TTL: customResultIndexTTL = parser.intValue(); break; - case FLATTEN_RESULT_INDEX_MAPPING: + case FLATTEN_CUSTOM_RESULT_INDEX: flattenResultIndexMapping = parser.booleanValue(); break; case BREAKING_UI_CHANGE_TIME: diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java index 
54c33f5bb..3929a5e5f 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java @@ -25,9 +25,10 @@ public ForecastResultWriteRequest( String forecasterId, RequestPriority priority, ForecastResult result, - String resultIndex + String resultIndex, + String flattenResultIndex ) { - super(expirationEpochMs, forecasterId, priority, result, resultIndex); + super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex); } public ForecastResultWriteRequest(StreamInput in) throws IOException { diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java index 7f991bcf6..7fba8ab03 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java @@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest( String configId, RequestPriority priority, ForecastResult result, - String resultIndex + String resultIndex, + String flattenResultIndex ) { - return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex); + return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex); } } diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java index 10f256278..07fc26462 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java @@ -83,7 +83,8 @@ public void saveResult(ForecastResult result, Config config) { config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndexOrAlias() + config.getCustomResultIndexOrAlias(), + config.getFlattenResultIndexAlias() ) ); } diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java index 6b3a09835..2b4d15899 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java @@ -76,7 +76,8 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndexOrAlias() + config.getCustomResultIndexOrAlias(), + config.getFlattenResultIndexAlias() ); } } diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 8ffed43b7..460be7d22 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -44,6 +44,7 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static String FAIL_TO_FIND_CONFIG_MSG = "Can't find config with id: "; public static final String CAN_NOT_CHANGE_CATEGORY_FIELD = "Can't change category field"; public static final String CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX = "Can't change custom result index"; + public static final String 
CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX = "Can't change flatten result index"; public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "Categorical field %s must be of type keyword or ip."; // Modifying message for FEATURE below may break the parseADValidationException method of ValidateAnomalyDetectorTransportAction public static final String FEATURE_INVALID_MSG_PREFIX = "Feature has an invalid query"; diff --git a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java index 6e47c8838..2606b1ee4 100644 --- a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java +++ b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java @@ -11,6 +11,7 @@ package org.opensearch.timeseries.indices; +import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry; @@ -89,6 +90,7 @@ import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -136,6 +138,7 @@ public abstract class IndexManagement & TimeSe private NamedXContentRegistry xContentRegistry; protected BiCheckedFunction configParser; protected String customResultIndexPrefix; + private final ObjectMapper objectMapper = new ObjectMapper(); protected class IndexState { // keep track of whether the mapping version is up-to-date @@ -272,6 +275,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc return Resources.toString(url, Charsets.UTF_8); } + public static String getScripts(String scriptFileRelativePath) throws IOException { + URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath); + return Resources.toString(url, Charsets.UTF_8); + } + protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) { request .settings( @@ -1008,6 +1016,45 @@ public void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu } } + /** + * creates flattened result index + * @param flattenedResultIndexAlias the flattened result index alias + * @param actionListener the action listener + */ + public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener actionListener) { + try { + String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias); + logger.info("Initializing flattened result index: {}", indexName); + + CreateIndexRequest request = new CreateIndexRequest(indexName) + .mapping(getFlattenedResultMappings(), XContentType.JSON) + .settings(settings); + + if (flattenedResultIndexAlias != null) { + request.alias(new Alias(flattenedResultIndexAlias)); + } + + choosePrimaryShards(request, false); + + adminClient.indices().create(request, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias); + actionListener.onResponse(response); + } else { + String errorMsg = "Index creation not acknowledged for index: " + indexName; + logger.error(errorMsg); + actionListener.onFailure(new IllegalStateException(errorMsg)); + } + }, exception -> { + logger.error("Failed to create flattened result index: {}", indexName, 
exception); + actionListener.onFailure(exception); + })); + } catch (Exception e) { + logger.error("Error while initializing flattened result index: {}", flattenedResultIndexAlias, e); + actionListener.onFailure(e); + } + } + public void validateCustomIndexForBackendJob( String resultIndexOrAlias, String securityLogId, @@ -1252,15 +1299,18 @@ protected void rolloverAndDeleteHistoryIndex( } // perform rollover and delete on found custom result index alias - candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex)); + candidateResultAliases.forEach(config -> { + handleResultIndexRolloverAndDelete(config.getCustomResultIndexOrAlias(), config, resultIndex); + if (config.getFlattenResultIndexMapping()) { + String flattenedResultIndexAlias = config.getFlattenResultIndexAlias(); + handleResultIndexRolloverAndDelete(flattenedResultIndexAlias, config, resultIndex); + } + }); }, e -> { logger.error("Failed to get configs with custom result index alias.", e); })); } - private void handleCustomResultIndex(Config config, IndexType resultIndex) { - RolloverRequest rolloverRequest = buildRolloverRequest( - config.getCustomResultIndexOrAlias(), - getCustomResultIndexPattern(config.getCustomResultIndexOrAlias()) - ); + private void handleResultIndexRolloverAndDelete(String indexAlias, Config config, IndexType resultIndex) { + RolloverRequest rolloverRequest = buildRolloverRequest(indexAlias, getCustomResultIndexPattern(indexAlias)); // add rollover conditions if found in config if (config.getCustomResultIndexMinAge() != null) { @@ -1272,9 +1322,9 @@ private void handleCustomResultIndex(Config config, IndexType resultIndex) { // perform rollover and delete on custom result index alias proceedWithRolloverAndDelete( - config.getCustomResultIndexOrAlias(), + indexAlias, rolloverRequest, - getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()), + getAllCustomResultIndexPattern(indexAlias), resultIndex, config.getCustomResultIndexTTL() ); diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index d61807528..9ff74c8ad 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -79,7 +79,7 @@ public abstract class Config implements Writeable, ToXContentObject { public static final String RESULT_INDEX_FIELD_MIN_SIZE = "result_index_min_size"; public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age"; public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl"; - public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping"; + public static final String FLATTEN_CUSTOM_RESULT_INDEX = "flatten_custom_result_index"; // Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices, // result index would force us to display results only from the most recent update. Otherwise, // the UI appear cluttered and unclear. 
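A minimal sketch (outside the patch, assuming the plugin classes above are on the classpath) of what getFlattenedResultMappings() produces: the standard anomaly-result mapping re-serialized with the top-level "dynamic" flag forced to true, which is what lets the ingest pipeline add flattened feature fields on the fly.

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.opensearch.ad.indices.ADIndexManagement;

public class FlattenedMappingSketch {
    public static void main(String[] args) throws Exception {
        // Same mapping file as the regular result index, with "dynamic": true set at the top level.
        String mappingJson = ADIndexManagement.getFlattenedResultMappings();
        Map<?, ?> mapping = new ObjectMapper().readValue(mappingJson, Map.class);
        if (!Boolean.TRUE.equals(mapping.get("dynamic"))) {
            throw new IllegalStateException("flattened result mapping should be dynamic");
        }
        System.out.println(mappingJson);
    }
}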
@@ -533,7 +533,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(RESULT_INDEX_FIELD_TTL, customResultIndexTTL); } if (flattenResultIndexMapping != null) { - builder.field(FLATTEN_RESULT_INDEX_MAPPING, flattenResultIndexMapping); + builder.field(FLATTEN_CUSTOM_RESULT_INDEX, flattenResultIndexMapping); } if (lastUIBreakingChangeTime != null) { builder.field(BREAKING_UI_CHANGE_TIME, lastUIBreakingChangeTime.toEpochMilli()); @@ -746,8 +746,22 @@ public Integer getCustomResultIndexTTL() { return customResultIndexTTL; } - public Boolean getFlattenResultIndexMapping() { - return flattenResultIndexMapping; + public boolean getFlattenResultIndexMapping() { + return flattenResultIndexMapping != null ? flattenResultIndexMapping : false; + } + + public String getFlattenResultIndexAlias() { + if (getFlattenResultIndexMapping()) { + return (getCustomResultIndexOrAlias() + "_flattened_" + getName()).toLowerCase(Locale.ROOT); + } + return null; + } + + public String getFlattenResultIndexIngestPipelineName() { + if (getFlattenResultIndexMapping()) { + return ("flatten_result_index_ingest_pipeline_" + getName()).toLowerCase(Locale.ROOT); + } + return null; } public Instant getLastBreakingUIChangeTime() { diff --git a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java index 6d5a069f1..070344324 100644 --- a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java +++ b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java @@ -22,22 +22,33 @@ public abstract class ResultWriteRequest ext private final ResultType result; // If resultIndex is null, result will be stored in default result index. private final String resultIndex; + private final String flattenResultIndex; - public ResultWriteRequest(long expirationEpochMs, String configId, RequestPriority priority, ResultType result, String resultIndex) { + public ResultWriteRequest( + long expirationEpochMs, + String configId, + RequestPriority priority, + ResultType result, + String resultIndex, + String flattenResultIndex + ) { super(expirationEpochMs, configId, priority); this.result = result; this.resultIndex = resultIndex; + this.flattenResultIndex = flattenResultIndex; } - public ResultWriteRequest(StreamInput in, Writeable.Reader resultReader) throws IOException { + public ResultWriteRequest(StreamInput in, Reader resultReader) throws IOException { this.result = resultReader.read(in); this.resultIndex = in.readOptionalString(); + this.flattenResultIndex = in.readOptionalString(); } @Override public void writeTo(StreamOutput out) throws IOException { result.writeTo(out); out.writeOptionalString(resultIndex); + out.writeOptionalString(flattenResultIndex); } public ResultType getResult() { @@ -47,4 +58,8 @@ public ResultType getResult() { public String getResultIndex() { return resultIndex; } + + public String getFlattenResultIndex() { + return flattenResultIndex; + } } diff --git a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java index 772062fef..06ea16149 100644 --- a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java +++ b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java @@ -199,7 +199,8 @@ private ActionListener> onGetConfig( id, resultToRetry.isHighPriority() ? 
RequestPriority.HIGH : RequestPriority.MEDIUM, resultToRetry, - config.getCustomResultIndexOrAlias() + config.getCustomResultIndexOrAlias(), + config.getFlattenResultIndexAlias() ) ); @@ -216,6 +217,7 @@ protected abstract ResultWriteRequestType createResultWriteRequest( String configId, RequestPriority priority, ResultType result, - String resultIndex + String resultIndex, + String flattenResultIndex ); } diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index 8cc9675a6..d5d6f76eb 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -8,6 +8,7 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG; import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED; +import static org.opensearch.timeseries.indices.IndexManagement.getScripts; import static org.opensearch.timeseries.util.ParseUtils.parseAggregators; import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery; @@ -25,10 +26,13 @@ import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsAction; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.ingest.DeletePipelineRequest; +import org.opensearch.action.ingest.PutPipelineRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.IndicesOptions; @@ -40,11 +44,14 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -67,6 +74,7 @@ import org.opensearch.timeseries.model.TimeSeriesTask; import org.opensearch.timeseries.model.ValidationAspect; import org.opensearch.timeseries.model.ValidationIssueType; +import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; import org.opensearch.timeseries.util.*; @@ -398,23 +406,176 @@ protected void validateTimeField(boolean indexingDryRun, ActionListener liste } /** - * Prepare for indexing a new config. 
- * @param indexingDryRun if this is dryrun for indexing; when validation, it is true; when create/update, it is false + * Prepares for indexing a new configuration. + * + * This method handles the preparation of indexing a configuration, either during validation (dry run) + * or for create/update operations. It supports both PUT and POST REST request methods. + * + * @param indexingDryRun indicates whether this is a dry run for indexing. + * If {@code true}, the operation performs validation without creating/updating the configuration. + * If {@code false}, the configuration is created or updated. + * @param listener the {@link ActionListener} to handle the response or failure of the operation. + * + *

<p><b>Behavior:</b></p>
+ * <ul>
+ *   <li>For {@code RestRequest.Method.PUT}: validates that the job is not already running before proceeding
+ * with updating the configuration. It updates the configuration and manages the result index mapping
+ * if necessary.</li>
+ *   <li>For {@code RestRequest.Method.POST}: creates a new configuration. If a custom result index or alias is specified:
+ *     <ul>
+ *       <li>If flattening of the result index mapping is enabled, it initializes a flattened result index,
+ * sets up an ingest pipeline, and updates the flattened result index settings to bind the ingest pipeline
+ * with the flattened result index, enabling the writing of flattened nested fields into the flattened result index.</li>
+ *       <li>If flattening is not enabled, the creation response is returned directly.</li>
+ *     </ul>
+ * If no custom result index or alias is specified, the creation response is returned directly.</li>
+ * </ul>
+ *
+ * <p><b>Notes:</b></p>
+ * <ul>
+ *   <li>If the configuration has a custom result index or alias and flattening is enabled,
+ * the flattened result index alias is the custom result index suffixed with the detector name in lowercase.</li>
+ *   <li>The ingest pipeline ID is uniquely generated from the detector name in lowercase.</li>
+ * </ul>
+ *
+ * <p><b>Exceptions:</b></p>
+ * <ul>
+ *   <li>If the {@code createConfigResponse} is of an unexpected type, which indicates the create config call has failed,
+ * an {@link IllegalStateException} is thrown.</li>
+ * </ul>
+ *
*/ protected void prepareConfigIndexing(boolean indexingDryRun, ActionListener listener) { if (method == RestRequest.Method.PUT) { - handler - .confirmJobRunning( - clusterService, - client, - id, - listener, - () -> updateConfig(id, indexingDryRun, listener), - xContentRegistry - ); + handlePutRequest(indexingDryRun, listener); } else { - createConfig(indexingDryRun, listener); + handlePostRequest(indexingDryRun, listener); + } + } + + private void handlePutRequest(boolean indexingDryRun, ActionListener listener) { + handler + .confirmJobRunning( + clusterService, + client, + id, + listener, + () -> { updateConfig(id, indexingDryRun, listener); }, + xContentRegistry + ); + } + + private void handlePostRequest(boolean indexingDryRun, ActionListener listener) { + createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> { + if (shouldHandleFlattening(indexingDryRun)) { + String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse); + String flattenedResultIndexAlias = config.getFlattenResultIndexAlias(); + + timeSeriesIndices + .initFlattenedResultIndex( + flattenedResultIndexAlias, + ActionListener + .wrap( + initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse), + listener::onFailure + ) + ); + } else { + listener.onResponse(createConfigResponse); + } + }, listener::onFailure)); + } + + private boolean shouldHandleFlattening(boolean indexingDryRun) { + Boolean flattenResultIndexMapping = config.getFlattenResultIndexMapping(); + + return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping); + } + + protected void setupIngestPipeline( + String flattenedResultIndexAlias, + String configId, + ActionListener listener, + T createConfigResponse + ) { + String pipelineId = config.getFlattenResultIndexIngestPipelineName(); + + try { + BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias); + + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON); + + client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> { + logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId); + bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse); + + }, exception -> { + logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception); + listener.onFailure(exception); + })); + + } catch (IOException e) { + logger.error("Exception while building ingest pipeline definition for pipeline ID: {}", pipelineId, e); + listener.onFailure(e); + } + } + + private BytesReference createPipelineDefinition(String indexName) throws IOException { + XContentBuilder pipelineBuilder = XContentFactory.jsonBuilder(); + pipelineBuilder.startObject(); + { + pipelineBuilder.field("description", "Ingest pipeline for flattening result index: " + indexName); + pipelineBuilder.startArray("processors"); + { + pipelineBuilder.startObject(); + { + pipelineBuilder.startObject("script"); + { + pipelineBuilder.field("lang", "painless"); + String flattenScript = getScripts(TimeSeriesSettings.FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS); + pipelineBuilder.field("source", flattenScript); + } + pipelineBuilder.endObject(); + } + pipelineBuilder.endObject(); + } + pipelineBuilder.endArray(); } + pipelineBuilder.endObject(); + return 
BytesReference.bytes(pipelineBuilder); + } + + private UpdateSettingsRequest buildUpdateSettingsRequest( + String flattenedResultIndexAlias, + String defaultPipelineName, + String configId + ) { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(); + updateSettingsRequest.indices(flattenedResultIndexAlias); + + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put("index.default_pipeline", defaultPipelineName); + + updateSettingsRequest.settings(settingsBuilder); + + return updateSettingsRequest; + } + + protected void bindIngestPipelineWithFlattenedResultIndex( + String pipelineId, + String configId, + String flattenedResultIndexAlias, + ActionListener listener, + T createConfigResponse + ) { + UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId); + + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> { + logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId); + listener.onResponse(createConfigResponse); + }, exception -> { + logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception); + listener.onFailure(exception); + })); } protected void updateConfig(String id, boolean indexingDryRun, ActionListener listener) { @@ -458,27 +619,92 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S ); return; } + if (!existingConfig.getFlattenResultIndexMapping() + && config.getFlattenResultIndexMapping() + && existingConfig.getCustomResultIndexOrAlias() != null) { + // customers can choose to use a flattened result index for newly created detectors and disable it for those detectors. + // however, since enabling the flattened result index creates additional resources and due to bwc concerns, + // we do not allow customers to enable this feature for existing running detectors. 
+ listener + .onFailure( + new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST) + ); + return; + } } else { if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields()) || !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) { breakingUIChange = true; } } + ActionListener confirmBatchRunningListener; - ActionListener confirmBatchRunningListener = ActionListener - .wrap( - r -> searchConfigInputIndices(id, indexingDryRun, listener), - // can't update config if there is task running - listener::onFailure - ); + // when unselecting flatten result index, need to clean up alias and ingest pipeline resources + if (existingConfig.getFlattenResultIndexMapping() + && !config.getFlattenResultIndexMapping() + && existingConfig.getCustomResultIndexOrAlias() != null) { + confirmBatchRunningListener = ActionListener + .wrap( + r -> unbindIngestPipelineWithFlattenedResultIndex(existingConfig, listener, id, indexingDryRun), + // can't update config if there is task running + listener::onFailure + ); + } else { + confirmBatchRunningListener = ActionListener + .wrap( + r -> searchConfigInputIndices(id, indexingDryRun, listener), + // can't update config if there is task running + listener::onFailure + ); + } handler.confirmBatchRunning(id, batchTasks, confirmBatchRunningListener); } catch (Exception e) { String message = "Failed to parse config " + id; logger.error(message, e); listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } + } + + private void unbindIngestPipelineWithFlattenedResultIndex( + Config existingConfig, + ActionListener listener, + String id, + boolean indexingDryRun + ) { + // The pipeline name _none specifies that the index does not have an ingest pipeline. 
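+ // Note: unbinding only sets index.default_pipeline to _none and then deletes the pipeline;
+ // the flattened index and any documents already written to it are left in place.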
+ UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest( + existingConfig.getFlattenResultIndexAlias(), + "_none", + existingConfig.getId() + ); + client + .admin() + .indices() + .updateSettings( + updateSettingsRequest, + ActionListener + .wrap( + updateSettingsResponse -> deleteIngestPipeline(existingConfig, listener, id, indexingDryRun), + exception -> listener.onFailure(exception) + ) + ); + } + private void deleteIngestPipeline(Config existingConfig, ActionListener listener, String id, boolean indexingDryRun) { + String pipelineId = existingConfig.getFlattenResultIndexIngestPipelineName(); + + client + .admin() + .cluster() + .deletePipeline( + new DeletePipelineRequest(pipelineId), + ActionListener + .wrap( + deleteIngestPipelineResponse -> searchConfigInputIndices(id, indexingDryRun, listener), + exception -> listener.onFailure(exception) + ) + ); } protected void validateAgainstExistingHCConfig(String configId, boolean indexingDryRun, ActionListener listener) { diff --git a/src/main/java/org/opensearch/timeseries/settings/TimeSeriesSettings.java b/src/main/java/org/opensearch/timeseries/settings/TimeSeriesSettings.java index 34bf7835f..d44d52f16 100644 --- a/src/main/java/org/opensearch/timeseries/settings/TimeSeriesSettings.java +++ b/src/main/java/org/opensearch/timeseries/settings/TimeSeriesSettings.java @@ -289,4 +289,6 @@ public class TimeSeriesSettings { // max entities to track per detector public static final int MAX_TRACKING_ENTITIES = 1000000; + + public static final String FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS = "scripts/flatten-custom-result-index-painless.txt"; } diff --git a/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java index 61efd6104..6177eaae9 100644 --- a/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java @@ -34,6 +34,7 @@ import org.opensearch.index.IndexingPressure; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.model.IndexableResult; import org.opensearch.timeseries.ratelimit.ResultWriteRequest; import org.opensearch.timeseries.util.BulkUtil; @@ -51,6 +52,7 @@ public abstract class ResultBulkTransportAction features, boolean useDateNanos) throws IOException { + return createIndexAndGetAnomalyDetector(indexName, features, useDateNanos, false); + } + + private AnomalyDetector createIndexAndGetAnomalyDetector( + String indexName, + List features, + boolean useDateNanos, + boolean useFlattenResultIndex + ) throws IOException { TestHelpers.createIndexWithTimeField(client(), indexName, TIME_FIELD, useDateNanos); String testIndexData = "{\"keyword-field\": \"field-1\", \"ip-field\": \"1.2.3.4\", \"timestamp\": 1}"; TestHelpers.ingestDataToIndex(client(), indexName, TestHelpers.toHttpEntity(testIndexData)); - AnomalyDetector detector = TestHelpers.randomAnomalyDetector(TIME_FIELD, indexName, features); + + AnomalyDetector detector = useFlattenResultIndex + ? 
TestHelpers.randomAnomalyDetectorWithFlattenResultIndex(TIME_FIELD, indexName, features) + : TestHelpers.randomAnomalyDetector(TIME_FIELD, indexName, features); + return detector; } @@ -180,6 +194,272 @@ public void testCreateAnomalyDetectorWithDuplicateName() throws Exception { ); } + public void testCreateAnomalyDetector_withFlattenedResultIndex() throws Exception { + AnomalyDetector detector = createIndexAndGetAnomalyDetector( + INDEX_NAME, + ImmutableList.of(TestHelpers.randomFeature("feature_bytes", "agg", true)), + false, + true + ); + + // Test behavior when AD is disabled + updateClusterSettings(ADEnabledSetting.AD_ENABLED, false); + Exception ex = expectThrows( + ResponseException.class, + () -> TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity(detector), + null + ) + ); + assertThat(ex.getMessage(), containsString(ADCommonMessages.DISABLED_ERR_MSG)); + + // Test behavior when AD is enabled + updateClusterSettings(ADEnabledSetting.AD_ENABLED, true); + Response response = TestHelpers + .makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null); + assertEquals("Create anomaly detector with flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response)); + + Map responseMap = entityAsMap(response); + String id = (String) responseMap.get("_id"); + int version = (int) responseMap.get("_version"); + assertNotEquals("Response is missing Id", AnomalyDetector.NO_ID, id); + assertTrue("Incorrect version", version > 0); + + // Ensure the flattened result index was created + String expectedFlattenedIndex = "opensearch-ad-plugin-result-test_flattened_detectorwithflattenresultindex"; + assertTrue("Alias for flattened result index does not exist", aliasExists(expectedFlattenedIndex)); + + // Start detector + String startDetectorEndpoint = String.format(Locale.ROOT, TestHelpers.AD_BASE_START_DETECTOR_URL, id); + TestHelpers.makeRequest(client(), "POST", startDetectorEndpoint, ImmutableMap.of(), (HttpEntity) null, null); + + // Wait for detector results, check every 1 second, max 60 seconds + boolean resultsAvailable = false; + int maxRetries = 60; + int retryIntervalMs = 1000; + + Map searchResults = null; + for (int attempt = 0; attempt < maxRetries; attempt++) { + Response searchAllResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_RESULT_URI + "/_search/" + expectedFlattenedIndex, + ImmutableMap.of(), + new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON), + null + ); + searchResults = entityAsMap(searchAllResponse); + List> hitsList = (List>) ((Map) searchResults.get("hits")).get("hits"); + + if (hitsList != null && !hitsList.isEmpty()) { + resultsAvailable = true; + break; + } + Thread.sleep(retryIntervalMs); + } + + assertTrue("No anomaly detection results found within timeout period", resultsAvailable); + + // Extract feature name and value from search results + Map firstHit = ((List>) ((Map) searchResults.get("hits")).get("hits")).get(0); + Map source = (Map) firstHit.get("_source"); + assertNotNull("Source should not be null", source); + assertTrue("Source should contain 'feature_data'", source.containsKey("feature_data")); + + List> featureDataList = (List>) source.get("feature_data"); + assertFalse("Feature data list should not be empty", featureDataList.isEmpty()); + + Map firstFeature = featureDataList.get(0); + String featureName = (String) 
firstFeature.get("feature_name"); + Double featureValue = ((Number) firstFeature.get("data")).doubleValue(); + + // Validate flattened result index mappings + Response getIndexResponse = TestHelpers.makeRequest(client(), "GET", expectedFlattenedIndex, ImmutableMap.of(), "", null); + Map flattenedResultIndex = entityAsMap(getIndexResponse); + + String indexKey = flattenedResultIndex.keySet().stream().findFirst().orElse(null); + Map indexDetails = (Map) flattenedResultIndex.get(indexKey); + Map mappings = (Map) indexDetails.get("mappings"); + + assertEquals("Dynamic field is not set to true", "true", mappings.get("dynamic").toString()); + + Map properties = (Map) mappings.get("properties"); + String expectedFieldKey = "feature_data_" + featureName; + assertTrue("Flattened field '" + expectedFieldKey + "' does not exist", properties.containsKey(expectedFieldKey)); + + // Search against flattened result index and validate value + Response searchFlattenResultIndexResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_RESULT_URI + "/_search/" + expectedFlattenedIndex, + ImmutableMap.of(), + new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON), + null + ); + Map flattenedResultIndexSearchResults = entityAsMap(searchFlattenResultIndexResponse); + Map flattenedResultIndexHitsMap = (Map) flattenedResultIndexSearchResults.get("hits"); + List> flattenedResultIndexHitsList = (List>) flattenedResultIndexHitsMap.get("hits"); + + Map flattenedResultIndexFirstHit = flattenedResultIndexHitsList.get(0); + Map flattenedResultIndexSource = (Map) flattenedResultIndexFirstHit.get("_source"); + + assertTrue( + "Flattened result index does not contain '" + expectedFieldKey + "'", + flattenedResultIndexSource.containsKey(expectedFieldKey) + ); + + assertEquals( + "Flattened field value is not correct", + featureValue, + ((Number) flattenedResultIndexSource.get(expectedFieldKey)).doubleValue(), + 0.0001 + ); + } + + public void testUpdateAnomalyDetector_disableFlattenResultIndex_shouldDeletePipeline() throws Exception { + AnomalyDetector detector = createIndexAndGetAnomalyDetector( + INDEX_NAME, + ImmutableList.of(TestHelpers.randomFeature("feature_bytes", "agg", true)), + false, + true + ); + + // test behavior when AD is enabled + updateClusterSettings(ADEnabledSetting.AD_ENABLED, true); + Response response = TestHelpers + .makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null); + assertEquals("Create anomaly detector with flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String id = (String) responseMap.get("_id"); + String expectedFlattenedIndex = "opensearch-ad-plugin-result-test_flattened_detectorwithflattenresultindex"; + String expectedPipelineId = "flatten_result_index_ingest_pipeline_detectorwithflattenresultindex"; + String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId); + Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null); + assertEquals( + "Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(), + 200, + getPipelineResponse.getStatusLine().getStatusCode() + ); + List features = detector.getFeatureAttributes(); + AnomalyDetector newDetector = new AnomalyDetector( + id, + detector.getVersion(), + detector.getName(), + 
detector.getDescription(), + detector.getTimeField(), + detector.getIndices(), + features, + detector.getFilterQuery(), + detector.getInterval(), + detector.getWindowDelay(), + detector.getShingleSize(), + detector.getUiMetadata(), + detector.getSchemaVersion(), + detector.getLastUpdateTime(), + null, + detector.getUser(), + detector.getCustomResultIndexOrAlias(), + TestHelpers.randomImputationOption(features), + randomIntBetween(1, 10000), + randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), + randomIntBetween(1, 1000), + null, + null, + null, + null, + false, + detector.getLastBreakingUIChangeTime() + ); + Response updateResponse = TestHelpers + .makeRequest( + client(), + "PUT", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true", + ImmutableMap.of(), + TestHelpers.toHttpEntity(newDetector), + null + ); + assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse)); + ResponseException responseException = expectThrows( + ResponseException.class, + () -> TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null) + ); + int statusCode = responseException.getResponse().getStatusLine().getStatusCode(); + assertEquals("Expected 404 response but got: " + statusCode, 404, statusCode); + } + + public void testUpdateAnomalyDetectorFlattenResultIndexField() throws Exception { + TestHelpers.createIndexWithTimeField(client(), INDEX_NAME, TIME_FIELD, false); + String testIndexData = "{\"keyword-field\": \"field-1\", \"ip-field\": \"1.2.3.4\", \"timestamp\": 1}"; + TestHelpers.ingestDataToIndex(client(), INDEX_NAME, TestHelpers.toHttpEntity(testIndexData)); + AnomalyDetector detector = TestHelpers + .randomDetector( + ImmutableList.of(TestHelpers.randomFeature("feature_bytes", "agg", true)), + INDEX_NAME, + 5, + TIME_FIELD, + null, + ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "test" + ); + Response response = TestHelpers + .makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null); + assertEquals("Create anomaly detector failed", RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String id = (String) responseMap.get("_id"); + List features = detector.getFeatureAttributes(); + long expectedFeatures = features.stream().filter(Feature::getEnabled).count(); + AnomalyDetector newDetector = new AnomalyDetector( + id, + null, + detector.getName(), + detector.getDescription(), + detector.getTimeField(), + detector.getIndices(), + features, + detector.getFilterQuery(), + detector.getInterval(), + detector.getWindowDelay(), + detector.getShingleSize(), + detector.getUiMetadata(), + detector.getSchemaVersion(), + detector.getLastUpdateTime(), + detector.getCategoryFields(), + detector.getUser(), + detector.getCustomResultIndexOrAlias(), + TestHelpers.randomImputationOption(features), + randomIntBetween(1, 10000), + randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), + randomIntBetween(1, 1000), + detector.getRules(), + null, + null, + null, + true, + detector.getLastBreakingUIChangeTime() + ); + + Exception ex = expectThrows( + ResponseException.class, + () -> TestHelpers + .makeRequest( + client(), + "PUT", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true", + ImmutableMap.of(), + TestHelpers.toHttpEntity(newDetector), + null + ) + ); + assertThat(ex.getMessage(), containsString(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX)); + } + public void testCreateAnomalyDetector() throws 
Exception { AnomalyDetector detector = createIndexAndGetAnomalyDetector(INDEX_NAME); updateClusterSettings(ADEnabledSetting.AD_ENABLED, false); diff --git a/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java b/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java index 02b5b31ac..ee920c5a1 100644 --- a/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java +++ b/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java @@ -50,6 +50,7 @@ public void setUp() throws Exception { detectorId, RequestPriority.MEDIUM, TestHelpers.randomAnomalyDetectResult(), + null, null ); request.add(resultWriteRequest); diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java index 0c8f45a63..b4bd580a0 100644 --- a/src/test/java/org/opensearch/timeseries/TestHelpers.java +++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java @@ -171,6 +171,7 @@ public class TestHelpers { public static final String AD_BASE_RESULT_URI = AD_BASE_DETECTORS_URI + "/results"; public static final String AD_BASE_PREVIEW_URI = AD_BASE_DETECTORS_URI + "/%s/_preview"; public static final String AD_BASE_STATS_URI = "/_plugins/_anomaly_detection/stats"; + public static final String AD_BASE_START_DETECTOR_URL = AD_BASE_DETECTORS_URI + "/%s/_start"; public static ImmutableSet HISTORICAL_ANALYSIS_RUNNING_STATS = ImmutableSet .of(TaskState.CREATED.name(), TaskState.INIT.name(), TaskState.RUNNING.name()); // Task may fail if memory circuit breaker triggered. @@ -511,6 +512,41 @@ public static AnomalyDetector randomAnomalyDetector(String timefield, String ind ); } + public static AnomalyDetector randomAnomalyDetectorWithFlattenResultIndex(String timefield, String indexName, List features) + throws IOException { + return new AnomalyDetector( + randomAlphaOfLength(10), + randomLong(), + "detectorWithFlattenResultIndex", + randomAlphaOfLength(30), + timefield, + ImmutableList.of(indexName.toLowerCase(Locale.ROOT)), + features, + randomQuery(), + randomIntervalTimeConfiguration(), + randomIntervalTimeConfiguration(), + randomIntBetween(1, TimeSeriesSettings.MAX_SHINGLE_SIZE), + null, + randomInt(), + Instant.now(), + null, + randomUser(), + ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "test", + TestHelpers.randomImputationOption(features), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), + randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), + randomIntBetween(1, 1000), + null, + null, + null, + null, + true, + Instant.now() + ); + } + public static AnomalyDetector randomAnomalyDetectorWithEmptyFeature() throws IOException { return new AnomalyDetector( randomAlphaOfLength(10), @@ -1017,6 +1053,7 @@ public static ADResultWriteRequest randomADResultWriteRequest(String detectorId, detectorId, RequestPriority.MEDIUM, randomHCADAnomalyDetectResult(score, grade), + null, null ); return resultWriteRequest; @@ -2243,7 +2280,7 @@ public static ForecastResultWriteRequest randomForecastResultWriteRequest() { ForecastResult result = randomForecastResult(forecasterId); String resultIndex = random.nextBoolean() ? 
randomAlphaOfLength(10) : null; // Randomly decide to set resultIndex or not - return new ForecastResultWriteRequest(expirationEpochMs, forecasterId, priority, result, resultIndex); + return new ForecastResultWriteRequest(expirationEpochMs, forecasterId, priority, result, resultIndex, null); } public static ForecastResult randomForecastResult(String forecasterId) {
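The integration tests above hard-code the flattened alias and pipeline id expected for a detector named detectorWithFlattenResultIndex. The following standalone sketch (not part of the patch) reproduces the naming convention implemented by Config.getFlattenResultIndexAlias() and Config.getFlattenResultIndexIngestPipelineName(); the literal values mirror the IT expectations.

import java.util.Locale;

public class FlattenNamingSketch {
    public static void main(String[] args) {
        String customResultIndexOrAlias = "opensearch-ad-plugin-result-test";
        String detectorName = "detectorWithFlattenResultIndex";

        // Config.getFlattenResultIndexAlias(): custom result index + "_flattened_" + detector name, lowercased
        String flattenedAlias = (customResultIndexOrAlias + "_flattened_" + detectorName).toLowerCase(Locale.ROOT);
        // Config.getFlattenResultIndexIngestPipelineName(): fixed prefix + detector name, lowercased
        String pipelineId = ("flatten_result_index_ingest_pipeline_" + detectorName).toLowerCase(Locale.ROOT);

        // Prints:
        // opensearch-ad-plugin-result-test_flattened_detectorwithflattenresultindex
        // flatten_result_index_ingest_pipeline_detectorwithflattenresultindex
        System.out.println(flattenedAlias);
        System.out.println(pipelineId);
    }
}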