Skip to content

Commit

Permalink
Merge pull request #12 from eirsep/2.13-threat-intel
Browse files Browse the repository at this point in the history
backport PRs opensearch-project#1585 and opensearch-project#1587 to 2.13-threat-intel
  • Loading branch information
AWSHurneyt authored Jun 28, 2024
2 parents 407e11a + d9a9418 commit a584eb5
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand All @@ -27,6 +28,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Instant
Expand Down Expand Up @@ -64,12 +66,28 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
logger.info(monitorMetadata.lastRunContext.toMutableMap().toString())
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
val updatedLastRunContext = lastRunContext.toMutableMap()

val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput
val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput
var shards: Set<String> = mutableSetOf()
var concreteIndices = listOf<String>()

// Resolve all passed indices to concrete indices
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
lastRunContext.remove(ind)
}
}

try {
docLevelMonitorInput.indices.forEach { indexName ->
concreteIndices = IndexUtils.resolveAllIndices(
Expand All @@ -93,6 +111,39 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
}

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(
indexName,
monitorCtx.clusterService!!.state()
)
) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
}

concreteIndices.forEach {
val shardCount = getShardsCount(monitorCtx.clusterService!!, it)
for (i in 0 until shardCount) {
Expand All @@ -111,7 +162,7 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
val docLevelMonitorFanOutResponses = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.doFanOut(
monitorCtx.clusterService!!,
monitor,
monitorMetadata,
monitorMetadata.copy(lastRunContext = lastRunContext),
executionId,
concreteIndices,
workflowRunContext,
Expand All @@ -120,12 +171,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
nodeMap,
nodeShardAssignments
)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, lastRunContext)
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses)
val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses)
if (!isTempMonitor) {
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = lastRunContext),
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
Expand Down Expand Up @@ -216,17 +267,17 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {
if (fanOutResponse.lastRunContexts.contains(indexName)) {
(fanOutResponse.lastRunContexts[indexName] as Map<String, Any>).forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
val seq_no = it.value.toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
it.key != "shards_count" &&
it.key != "index" &&
seq_no != null &&
seq_no >= 0
) {
indexLastRunContext[it] = seq_no
indexLastRunContext[it.key] = seq_no
}
}
}
Expand Down Expand Up @@ -309,4 +360,29 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null)
}

private fun createdRecently(
monitor: Monitor,
periodStart: Instant,
periodEnd: Instant,
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
}
return updatedLastRunContext
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ public void onFailure(Exception e) {
);
};
} else {
String indices = restRequest.param("index", "index");
List<String> index = List.of(indices.split(","));
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput =
new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2);
BytesStreamOutput out2 = new BytesStreamOutput();
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList());
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, Action
SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin);


((Map<String, Object>) lastRunContext.get(index)).put("0", 0);
if (lastRunContext.containsKey(index)) {
((Map<String, Object>) lastRunContext.get(index)).put("2", 0);
}
if (docLevelMonitorInput.getIndices().size() > 1 && lastRunContext.containsKey(docLevelMonitorInput.getIndices().get(1))) {
((Map<String, Object>) lastRunContext.get(docLevelMonitorInput.getIndices().get(1))).put("4", 0);
}
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
.source(Map.of(sampleRemoteDocLevelMonitorInput.getA(), remoteMonitorTrigger.getA())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest, new ActionListener<>() {
Expand Down
Loading

0 comments on commit a584eb5

Please sign in to comment.