Skip to content

Commit

Permalink
add support for RemoteDocLevelMonitorInput (#1564)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Jun 5, 2024
1 parent 8ef66c7 commit 47effcb
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand Down Expand Up @@ -185,8 +186,10 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
Expand All @@ -208,8 +211,10 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
Expand Down Expand Up @@ -64,7 +65,8 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput
val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput
var shards: Set<String> = mutableSetOf()
var concreteIndices = listOf<String>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -185,10 +187,15 @@ class TransportIndexMonitorAction @Inject constructor(
) {
val indices = mutableListOf<String>()
// todo: for doc level alerting: check if index is present before monitor is created.
val searchInputs = request.monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD || it.name() == DOC_LEVEL_INPUT_FIELD }
val searchInputs = request.monitor.inputs.filter {
it.name() == SearchInput.SEARCH_FIELD ||
it.name() == DOC_LEVEL_INPUT_FIELD ||
it.name() == REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
}
searchInputs.forEach {
val inputIndices = if (it.name() == SearchInput.SEARCH_FIELD) (it as SearchInput).indices
else (it as DocLevelMonitorInput).indices
else if (it.name() == DOC_LEVEL_INPUT_FIELD) (it as DocLevelMonitorInput).indices
else (it as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices
indices.addAll(inputIndices)
}
val updatedIndices = indices.map { index ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
Expand All @@ -22,6 +23,7 @@
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.action.Action;
import org.opensearch.commons.alerting.model.action.Throttle;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -206,6 +208,15 @@ public void onFailure(Exception e) {
);
};
} else {
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput =
new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2);
BytesStreamOutput out2 = new BytesStreamOutput();
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList());
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Monitor.NO_ID,
Monitor.NO_VERSION,
Expand All @@ -217,7 +228,7 @@ public void onFailure(Exception e) {
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_DOC_LEVEL_MONITOR,
null,
0,
List.of(new DocLevelMonitorInput("description", List.of("index"), emptyList())),
List.of(remoteDocLevelMonitorInput),
List.of(),
Map.of(),
new DataSources(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput;
import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest;
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse;
import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
import org.opensearch.commons.alerting.model.InputRunResults;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -55,30 +60,42 @@ public TransportRemoteDocLevelMonitorFanOutAction(

@Override
protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, ActionListener<DocLevelMonitorFanOutResponse> actionListener) {
Monitor monitor = request.getMonitor();
Map<String, Object> lastRunContext = request.getMonitorMetadata().getLastRunContext();
((Map<String, Object>) lastRunContext.get("index")).put("0", 0);
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse(
clusterService.localNode().getId(),
request.getExecutionId(),
monitor.getId(),
lastRunContext,
new InputRunResults(),
new HashMap<>(),
null
);
actionListener.onResponse(response);
}
try {
Monitor monitor = request.getMonitor();
Map<String, Object> lastRunContext = request.getMonitorMetadata().getLastRunContext();

@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
RemoteDocLevelMonitorInput input = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0);
BytesReference customInputSerialized = input.getInput();
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput = new SampleRemoteDocLevelMonitorInput(sin);
DocLevelMonitorInput docLevelMonitorInput = input.getDocLevelMonitorInput();
String index = docLevelMonitorInput.getIndices().get(0);

((Map<String, Object>) lastRunContext.get(index)).put("0", 0);
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
.source(sampleRemoteDocLevelMonitorInput.getB()).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse(
clusterService.localNode().getId(),
request.getExecutionId(),
monitor.getId(),
lastRunContext,
new InputRunResults(),
new HashMap<>(),
null
);
actionListener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
} catch (Exception ex) {
actionListener.onFailure(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.inputs;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Map;

public class SampleRemoteDocLevelMonitorInput implements Writeable {

private String a;

private Map<String, Object> b;

private int c;

public SampleRemoteDocLevelMonitorInput(String a, Map<String, Object> b, int c) {
this.a = a;
this.b = b;
this.c = c;
}

public SampleRemoteDocLevelMonitorInput(StreamInput sin) throws IOException {
this(
sin.readString(),
sin.readMap(),
sin.readInt()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(a);
out.writeMap(b);
out.writeInt(c);
}

public int getC() {
return c;
}

public Map<String, Object> getB() {
return b;
}

public String getA() {
return a;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc
LoggingDeprecationHandler.INSTANCE,
searchResponse.getEntity().getContent()
).map();
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1);
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("world") &&
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("world").toString().equals("1"));
return found.get();
} catch (IOException ex) {
return false;
Expand Down

0 comments on commit 47effcb

Please sign in to comment.