Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

jackiehanyang
Copy link
Collaborator

Description

wip - sending implementation out first.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang
Copy link
Collaborator Author

jackiehanyang commented Jan 24, 2025

resolved
non related IT test case failed:
REPRODUCE WITH: ./gradlew ':integTest' --tests "org.opensearch.ad.rest.AnomalyDetectorRestApiIT.testSearchTopAnomalyResultsWithCustomResultIndex" -Dtests.seed=764AE145CE6907EA -Dtests.security.manager=false -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=es-419 -Dtests.timezone=NZ -Druntime.java=21
org.opensearch.ad.rest.AnomalyDetectorRestApiIT > testSearchTopAnomalyResultsWithCustomResultIndex FAILED
org.opensearch.client.ResponseException: method [POST], host [http://127.0.0.1:44973/], URI [/_plugins/_anomaly_detection/detectors], status line [HTTP/1.1 500 Internal Server Error]
{"error":{"root_cause":[{"type":"status_exception","reason":"Fail to create detector"}],"type":"status_exception","reason":"Fail to create detector"},"status":500}
at __randomizedtesting.SeedInfo.seed([764AE145CE6907EA:A9D4336811E01C9A]:0)
at app//org.opensearch.client.RestClient.convertResponse(RestClient.java:501)
at app//org.opensearch.client.RestClient.performRequest(RestClient.java:384)
at app//org.opensearch.client.RestClient.performRequest(RestClient.java:359)
at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:231)
at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:204)
at app//org.opensearch.ad.AnomalyDetectorRestTestCase.createAnomalyDetector(AnomalyDetectorRestTestCase.java:133)
at app//org.opensearch.ad.rest.AnomalyDetectorRestApiIT.testSearchTopAnomalyResultsWithCustomResultIndex(AnomalyDetectorRestApiIT.java:2069)

@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 2 times, most recently from 2fd6bc2 to 8b12fd7 Compare January 27, 2025 07:22
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang
Copy link
Collaborator Author

checking the failed bwc test...

Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 4 times, most recently from ac8f122 to 9ffc24f Compare January 27, 2025 18:59
src/main/java/org/opensearch/timeseries/model/Config.java Outdated Show resolved Hide resolved
.format(Locale.ROOT, "/opensearch-ad-plugin-result-test_flattened_%s", id.toLowerCase(Locale.ROOT));
// wait for the detector starts writing result
try {
Thread.sleep(60 * 1000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make our build slow down 1 minute. Can you periodically check for results and wait for 1 second if not?

Map<String, Object> flattenedResultIndex = entityAsMap(getIndexResponse);

String indexKey = flattenedResultIndex.keySet().stream().findFirst().orElse(null);
Map<String, Object> indexDetails = (Map<String, Object>) flattenedResultIndex.get(indexKey);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add checks for the concrete value? For example, in original result index, entity.name is foo and value is bar, you will see entity.name.foo: bar. Also, in original result index, feature data name is bytes and value is blah. In flattened result index, we see feature_data_feature_bytes: blah.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a concrete check for feature_data -

assertTrue("Flattened field 'feature_data_feature_bytes' does not exist", properties.containsKey("feature_data_feature_bytes"));

the painless script works the same for every nested fields, so if feature_data can be flattened, then entity fields can be flattened successfully as well. But I can certainly add more assertion checks here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add more assertion. containsKey may not be enough.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you address the latest comment?

Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 3 times, most recently from 4400f44 to b876a22 Compare January 28, 2025 00:07
Signed-off-by: Jackie Han <[email protected]>
@jackiehanyang jackiehanyang force-pushed the ingestPipelineFlatten branch 2 times, most recently from f1e21cf to ad3d99c Compare January 28, 2025 09:15
Signed-off-by: Jackie Han <[email protected]>
@kaituo
Copy link
Collaborator

kaituo commented Jan 28, 2025

bwc test failure is similar to opensearch-project/OpenSearch#15234

=== Standard error of node `node{::adBwcCluster#twoThirdsUpgradedClusterTask-0}` ===
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:393) [opensearch-2.19.0.jar:2.19.0]
?  	... 41 more
?  Caused by: java.lang.IllegalStateException: unexpected byte [0xac]
?  	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:596) ~[opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.common.io.stream.StreamInput.readOptionalBoolean(StreamInput.java:606) ~[opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.search.internal.ShardSearchRequest.<init>(ShardSearchRequest.java:255) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.RequestHandlerRegistry.newRequest(RequestHandlerRegistry.java:87) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.newRequest(NativeMessageHandler.java:316) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleRequest(NativeMessageHandler.java:271) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:146) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:120) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:112) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:768) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.forwardFragments(InboundBytesHandler.java:137) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.doHandleBytes(InboundBytesHandler.java:77) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:124) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:113) ~[opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
?  	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[?:?]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
?  	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[?:?]
?  	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) ~[?:?]
?  	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[?:?]
?  	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
?  	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
?  	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
? WARN ][r.suppressed             ] [adBwcCluster0-2] path: /_opendistro/_anomaly_detection/detectors/yHNUrJQBrgHv3CV3UwAC/_start, params: {detectorID=yHNUrJQBrgHv3CV3UwAC}
?  org.opensearch.OpenSearchStatusException: Fail to start detector
?  	at org.opensearch.timeseries.util.RestHandlerUtils.lambda$wrapRestActionListener$2(RestHandlerUtils.java:243) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.timeseries.rest.handler.IndexJobActionHandler.lambda$onGetJobForWrite$10(IndexJobActionHandler.java:303) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.timeseries.task.TaskManager.lambda$getAndExecuteOnLatestTasks$17(TaskManager.java:605) [opensearch-anomaly-detection-2.19.0.0.jar:2.19.0.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.support.TransportAction$1.onFailure(TransportAction.java:124) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.core.action.ActionListener$5.onFailure(ActionListener.java:277) [opensearch-core-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.raisePhaseFailure(AbstractSearchAsyncAction.java:802) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:775) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:395) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:815) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:548) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:316) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:104) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:75) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:766) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TransportService$9.handleException(TransportService.java:1741) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1527) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.lambda$handleException$5(NativeMessageHandler.java:454) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:343) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleException(NativeMessageHandler.java:452) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handlerResponseError(NativeMessageHandler.java:444) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:172) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:120) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:112) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:768) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.forwardFragments(InboundBytesHandler.java:137) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundBytesHandler.doHandleBytes(InboundBytesHandler.java:77) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:124) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:113) [opensearch-2.19.0.jar:2.19.0]
?  	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) [transport-netty4-client-2.19.0.jar:2.19.0]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) [netty-handler-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.117.Final.jar:4.1.117.Final]
?  	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.117.Final.jar:4.1.117.Final]
?  	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
?   ? last 40 non error or warning messages from /__w/anomaly-detection/anomaly-detection/build/testclusters/adBwcCluster0-2/logs/opensearch.stdout.log ?
? [2025-01-28T09:53:13,046][DEBUG][o.o.c.c.C.CoordinatorPublication] [adBwcCluster0-2] publishing version 63 to [PublicationTarget{discoveryNode={adBwcCluster0-0}{yxG8heptSiK2eK9S9Ks46A}{mFsI26s-QTuJOWcGDLRAjQ}{127.0.0.1}{127.0.0.1:37589}{dimr}{upgraded=true, testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}, PublicationTarget{discoveryNode={adBwcCluster0-1}{th_t3o4ySIqKWHrFfQg35g}{IePje0osQvilETYvk6-fFA}{127.0.0.1}{127.0.0.1:45277}{dimr}{upgraded=true, testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}, PublicationTarget{discoveryNode={adBwcCluster0-2}{jpEavNb_SKyKsfA6FVKT0A}{SalIDjFNQGC9PjW8hGFlLQ}{127.0.0.1}{127.0.0.1:33951}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}, state=NOT_STARTED, ackIsPending=true}]
? [2025-01-28T09:53:13,046][DEBUG][o.o.c.c.PublicationTransportHandler] [adBwcCluster0-2] received diff cluster state version [63] with uuid [_1E3aVONQ_6Q0Oj7pl9LVg], diff size [589]

Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
List<Feature> features = detector.getFeatureAttributes();
long expectedFeatures = features.stream().filter(Feature::getEnabled).count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we check equality of expectedFeatures?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you address this comment?

Signed-off-by: Jackie Han <[email protected]>
client.admin().indices().getAliases(getAliasesRequest, ActionListener.wrap(getAliasesResponse -> {
Set<String> indices = getAliasesResponse.getAliases().keySet();
if (indices.isEmpty()) {
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to return listener


private void getFlattenResultAliasIndex(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
String flattenResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(existingConfig.getCustomResultIndexOrAlias(), existingConfig.getId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existingConfig.getCustomResultIndexOrAlias can be an alias. Running get alias call on an alias can cause exception.

PUT my-data-stream

POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "my-data-stream",
        "alias": "my-alias"
      }
    }
  ]
}

GET my-alias/_alis

Result:

{
  "error" : "no handler found for uri [/my-alias/_alis] and method [GET]"
}

return;
}
String indexName = indices.iterator().next();
deleteAlias(indexName, flattenResultIndexAlias, existingConfig, listener, id, indexingDryRun);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to delete alias? Unselecting flatten option causes data deletion does not seem to be a good user experience.

.format(Locale.ROOT, "/opensearch-ad-plugin-result-test_flattened_%s", id.toLowerCase(Locale.ROOT));
// wait for the detector starts writing result
try {
Thread.sleep(60 * 1000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would still wait for 1 minute, right?

Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
List<Feature> features = detector.getFeatureAttributes();
long expectedFeatures = features.stream().filter(Feature::getEnabled).count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you address this comment?

Map<String, Object> flattenedResultIndex = entityAsMap(getIndexResponse);

String indexKey = flattenedResultIndex.keySet().stream().findFirst().orElse(null);
Map<String, Object> indexDetails = (Map<String, Object>) flattenedResultIndex.get(indexKey);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you address the latest comment?

Comment on lines +1058 to +1064
public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {
return indexOrAliasName + "_flattened_" + configId.toLowerCase(Locale.ROOT);
}

public String getFlattenResultIndexIngestPipelineId(String configId) {
return "flatten_result_index_ingest_pipeline" + configId.toLowerCase(Locale.ROOT);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest move these to Config and it is a fixed value when creating the config so we don't need to repeatedly reconstruct it.

Comment on lines 138 to 142
private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
if (clusterService.state().metadata().hasAlias(flattenedResultIndexAlias)) {
addResult(bulkRequest, result, flattenedResultIndexAlias);
}
Copy link
Collaborator

@kaituo kaituo Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because methods in ADResultBulkTransportAction are async, you cannot use async method like nodeStateManager.getConfig. The caller of ADResultBulkTransportAction.prepareBulkRequest does not expect async call and is synchronously waiting for the result. How about adding

ResultWriteRequest:
String flattenIndex;
public ResultWriteRequest(long expirationEpochMs, String configId, RequestPriority priority, ResultType result, String resultIndex, String flattenIndex) {
        super(expirationEpochMs, configId, priority);
        this.result = result;
        this.resultIndex = resultIndex;
        this.flattenIndex = flattenIndex;
    }
public String getFlattenIndex() {
     return flattenIndex;
}
and then in ADResultBulkTransportAction, you just check if getFlattenIndex() is a null or not.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants