diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt new file mode 100644 index 00000000..801edc47 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionType + +class DocLevelMonitorFanOutAction private constructor() : ActionType(NAME, ::DocLevelMonitorFanOutResponse) { + companion object { + val INSTANCE = DocLevelMonitorFanOutAction() + const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt new file mode 100644 index 00000000..fe5cfe29 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + val monitor: Monitor + val dryRun: Boolean + val monitorMetadata: MonitorMetadata + val executionId: String + val indexExecutionContext: IndexExecutionContext? + val shardIds: List + val concreteIndicesSeenSoFar: List + val workflowRunContext: WorkflowRunContext? + + constructor( + monitor: Monitor, + dryRun: Boolean, + monitorMetadata: MonitorMetadata, + executionId: String, + indexExecutionContext: IndexExecutionContext?, + shardIds: List, + concreteIndicesSeenSoFar: List, + workflowRunContext: WorkflowRunContext? + ) : super() { + this.monitor = monitor + this.dryRun = dryRun + this.monitorMetadata = monitorMetadata + this.executionId = executionId + this.indexExecutionContext = indexExecutionContext + this.shardIds = shardIds + this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar + this.workflowRunContext = workflowRunContext + require(false == shardIds.isEmpty()) { } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + monitor = Monitor.readFrom(sin)!!, + dryRun = sin.readBoolean(), + monitorMetadata = MonitorMetadata.readFrom(sin), + executionId = sin.readString(), + shardIds = sin.readList(::ShardId), + concreteIndicesSeenSoFar = sin.readStringList(), + workflowRunContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { null }, + indexExecutionContext = IndexExecutionContext(sin) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + monitor.writeTo(out) + out.writeBoolean(dryRun) + monitorMetadata.writeTo(out) + out.writeString(executionId) + out.writeCollection(shardIds) + out.writeStringCollection(concreteIndicesSeenSoFar) + out.writeBoolean(workflowRunContext != null) + workflowRunContext?.writeTo(out) + indexExecutionContext?.writeTo(out) + } + + override fun validate(): ActionRequestValidationException? { + var actionValidationException: ActionRequestValidationException? = null + if (shardIds.isEmpty()) { + actionValidationException = ActionRequestValidationException() + actionValidationException.addValidationError("shard_ids is null or empty") + } + return actionValidationException + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("monitor", monitor) + .field("dry_run", dryRun) + .field("execution_id", executionId) + .field("index_execution_context", indexExecutionContext) + .field("shard_ids", shardIds) + .field("concrete_indices", concreteIndicesSeenSoFar) + .field("workflow_run_context", workflowRunContext) + return builder.endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt new file mode 100644 index 00000000..6e5cde55 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.commons.alerting.model.InputRunResults +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { + val nodeId: String + val executionId: String + val monitorId: String + val lastRunContexts: MutableMap + val inputResults: InputRunResults + val triggerResults: Map + val exception: AlertingException? + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + nodeId = sin.readString(), + executionId = sin.readString(), + monitorId = sin.readString(), + lastRunContexts = sin.readMap()!! as MutableMap, + inputResults = InputRunResults.readFrom(sin), + triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)), + exception = sin.readException() + ) + + constructor( + nodeId: String, + executionId: String, + monitorId: String, + lastRunContexts: MutableMap, + inputResults: InputRunResults = InputRunResults(), // partial, + triggerResults: Map = mapOf(), + exception: AlertingException? = null + ) : super() { + this.nodeId = nodeId + this.executionId = executionId + this.monitorId = monitorId + this.lastRunContexts = lastRunContexts + this.inputResults = inputResults + this.triggerResults = triggerResults + this.exception = exception + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(nodeId) + out.writeString(executionId) + out.writeString(monitorId) + out.writeMap(lastRunContexts) + inputResults.writeTo(out) + out.writeMap( + triggerResults, + StreamOutput::writeString, + { stream, stats -> stats.writeTo(stream) } + ) + out.writeException(exception) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("node_id", nodeId) + .field("execution_id", executionId) + .field("monitor_id", monitorId) + .field("last_run_contexts", lastRunContexts) + .field("input_results", inputResults) + .field("trigger_results", triggerResults) + .field("exception", exception) + .endObject() + return builder + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt new file mode 100644 index 00000000..34328ca2 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class BucketLevelTriggerRunResult( + override var triggerName: String, + override var error: Exception? = null, + var aggregationResultBuckets: Map, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readException() as Exception?, // error + sin.readMap(StreamInput::readString, ::AggregationResultBucket), + sin.readMap() as MutableMap> + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .field(AGG_RESULT_BUCKETS, aggregationResultBuckets) + .field(ACTIONS_RESULTS, actionResultsMap as Map) + } + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeMap(aggregationResultBuckets, StreamOutput::writeString) { + valueOut: StreamOutput, aggResultBucket: AggregationResultBucket -> + aggResultBucket.writeTo(valueOut) + } + out.writeMap(actionResultsMap as Map) + } + + companion object { + const val AGG_RESULT_BUCKETS = "agg_result_buckets" + const val ACTIONS_RESULTS = "action_results" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return BucketLevelTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt new file mode 100644 index 00000000..015762cf --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ChainedAlertTriggerRunResult( + override var triggerName: String, + var triggered: Boolean, + override var error: Exception?, + var actionResults: MutableMap = mutableMapOf(), + val associatedAlertIds: Set +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + associatedAlertIds = sin.readStringList().toSet() + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggered", triggered) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + out.writeStringCollection(associatedAlertIds) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return ChainedAlertTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt new file mode 100644 index 00000000..d3af9be3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ClusterMetricsTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, + override var actionResults: MutableMap = mutableMapOf(), + var clusterTriggerResults: List = listOf() +) : QueryLevelTriggerRunResult( + triggerName = triggerName, + error = error, + triggered = triggered, + actionResults = actionResults +) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + clusterTriggerResults = sin.readList((ClusterTriggerResult)::readFrom) + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + builder + .field(TRIGGERED_FIELD, triggered) + .field(ACTION_RESULTS_FIELD, actionResults as Map) + .startArray(CLUSTER_RESULTS_FIELD) + clusterTriggerResults.forEach { it.toXContent(builder, params) } + return builder.endArray() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + clusterTriggerResults.forEach { it.writeTo(out) } + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + const val ACTION_RESULTS_FIELD = "action_results" + const val CLUSTER_RESULTS_FIELD = "cluster_results" + } + + data class ClusterTriggerResult( + val cluster: String, + val triggered: Boolean + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + cluster = sin.readString(), + triggered = sin.readBoolean() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(cluster) + .field(TRIGGERED_FIELD, triggered) + .endObject() + .endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(cluster) + out.writeBoolean(triggered) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterTriggerResult { + return ClusterTriggerResult(sin) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt new file mode 100644 index 00000000..1acb354b --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException + +data class DocumentLevelTriggerRunResult( + override var triggerName: String, + var triggeredDocs: List, + override var error: Exception?, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggeredDocs = sin.readStringList(), + actionResultsMap = readActionResults(sin) + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggeredDocs", triggeredDocs as List) + .field("action_results", actionResultsMap as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeStringCollection(triggeredDocs) + out.writeInt(actionResultsMap.size) + actionResultsMap.forEach { (alert, actionResults) -> + out.writeString(alert) + out.writeInt(actionResults.size) + actionResults.forEach { (id, result) -> + out.writeString(id) + result.writeTo(out) + } + } + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return DocumentLevelTriggerRunResult(sin) + } + + @JvmStatic + fun readActionResults(sin: StreamInput): MutableMap> { + val actionResultsMapReconstruct: MutableMap> = mutableMapOf() + val size = sin.readInt() + var idx = 0 + while (idx < size) { + val alert = sin.readString() + val actionResultsSize = sin.readInt() + val actionRunResultElem = mutableMapOf() + var i = 0 + while (i < actionResultsSize) { + val actionId = sin.readString() + val actionResult = ActionRunResult.readFrom(sin) + actionRunResultElem[actionId] = actionResult + ++i + } + actionResultsMapReconstruct[alert] = actionRunResultElem + ++idx + } + return actionResultsMapReconstruct + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt new file mode 100644 index 00000000..8872b525 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, // previous execution + val updatedLastRunContext: MutableMap, // without sequence numbers + val indexName: String, + val concreteIndexName: String, + val updatedIndexNames: List, + val concreteIndexNames: List, + val conflictingFields: List, + val docIds: List? = emptyList() +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + queries = sin.readList { DocLevelQuery(sin) }, + lastRunContext = sin.readMap() as MutableMap, + updatedLastRunContext = sin.readMap() as MutableMap, + indexName = sin.readString(), + concreteIndexName = sin.readString(), + updatedIndexNames = sin.readStringList(), + concreteIndexNames = sin.readStringList(), + conflictingFields = sin.readStringList(), + docIds = sin.readOptionalStringList() + ) + + override fun writeTo(out: StreamOutput?) { + out!!.writeCollection(queries) + out.writeMap(lastRunContext) + out.writeMap(updatedLastRunContext) + out.writeString(indexName) + out.writeString(concreteIndexName) + out.writeStringCollection(updatedIndexNames) + out.writeStringCollection(concreteIndexNames) + out.writeStringCollection(conflictingFields) + out.writeOptionalStringCollection(docIds) + } + + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + builder!!.startObject() + .field("queries", queries) + .field("last_run_context", lastRunContext) + .field("updated_last_run_context", updatedLastRunContext) + .field("index_name", indexName) + .field("concrete_index_name", concreteIndexName) + .field("udpated_index_names", updatedIndexNames) + .field("concrete_index_names", concreteIndexNames) + .field("conflicting_fields", conflictingFields) + .field("doc_ids", docIds) + .endObject() + return builder + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt new file mode 100644 index 00000000..a90f3cc3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.model.Monitor.Companion.NO_ID +import org.opensearch.commons.alerting.util.instant +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +data class MonitorMetadata( + val id: String, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + val monitorId: String, + val lastActionExecutionTimes: List, + val lastRunContext: Map, + // Maps (sourceIndex + monitorId) --> concreteQueryIndex + val sourceToQueryIndexMapping: MutableMap = mutableMapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + monitorId = sin.readString(), + lastActionExecutionTimes = sin.readList(ActionExecutionTime.Companion::readFrom), + lastRunContext = Monitor.suppressWarning(sin.readMap()), + sourceToQueryIndexMapping = sin.readMap() as MutableMap + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeString(monitorId) + out.writeCollection(lastActionExecutionTimes) + out.writeMap(lastRunContext) + out.writeMap(sourceToQueryIndexMapping as MutableMap) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) + builder.field(MONITOR_ID_FIELD, monitorId) + .field(LAST_ACTION_EXECUTION_FIELD, lastActionExecutionTimes.toTypedArray()) + if (lastRunContext.isNotEmpty()) builder.field(LAST_RUN_CONTEXT_FIELD, lastRunContext) + if (sourceToQueryIndexMapping.isNotEmpty()) { + builder.field(SOURCE_TO_QUERY_INDEX_MAP_FIELD, sourceToQueryIndexMapping as MutableMap) + } + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + companion object { + const val METADATA = "metadata" + const val MONITOR_ID_FIELD = "monitor_id" + const val LAST_ACTION_EXECUTION_FIELD = "last_action_execution_times" + const val LAST_RUN_CONTEXT_FIELD = "last_run_context" + const val SOURCE_TO_QUERY_INDEX_MAP_FIELD = "source_to_query_index_mapping" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): MonitorMetadata { + lateinit var monitorId: String + val lastActionExecutionTimes = mutableListOf() + var lastRunContext: Map = mapOf() + var sourceToQueryIndexMapping: MutableMap = mutableMapOf() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> monitorId = xcp.text() + LAST_ACTION_EXECUTION_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + lastActionExecutionTimes.add(ActionExecutionTime.parse(xcp)) + } + } + LAST_RUN_CONTEXT_FIELD -> lastRunContext = xcp.map() + SOURCE_TO_QUERY_INDEX_MAP_FIELD -> sourceToQueryIndexMapping = xcp.map() as MutableMap + } + } + + return MonitorMetadata( + if (id != NO_ID) id else "$monitorId-metadata", + seqNo = seqNo, + primaryTerm = primaryTerm, + monitorId = monitorId, + lastActionExecutionTimes = lastActionExecutionTimes, + lastRunContext = lastRunContext, + sourceToQueryIndexMapping = sourceToQueryIndexMapping + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): MonitorMetadata { + return MonitorMetadata(sin) + } + + /** workflowMetadataId is used as key for monitor metadata in the case when the workflow execution happens + so the monitor lastRunContext (in the case of doc level monitor) is not interfering with the monitor execution + WorkflowMetadataId will be either workflowId-metadata (when executing the workflow as it is scheduled) + or timestampWithUUID-metadata (when a workflow is executed in a dry-run mode) + In the case of temp workflow, doc level monitors must have lastRunContext created from scratch + That's why we are using workflowMetadataId - in order to ensure that the doc level monitor metadata is created from scratch + **/ + fun getId(monitor: Monitor, workflowMetadataId: String? = null): String { + return if (workflowMetadataId.isNullOrEmpty()) { "${monitor.id}-metadata" } + // WorkflowMetadataId already contains -metadata suffix + else { "$workflowMetadataId-${monitor.id}-metadata" } + } + } +} + +/** + * A value object containing action execution time. + */ +data class ActionExecutionTime( + val actionId: String, + val executionTime: Instant +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // actionId + sin.readInstant() // executionTime + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(ACTION_ID_FIELD, actionId) + .field(EXECUTION_TIME_FIELD, executionTime) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(actionId) + out.writeInstant(executionTime) + } + + companion object { + const val ACTION_ID_FIELD = "action_id" + const val EXECUTION_TIME_FIELD = "execution_time" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ActionExecutionTime { + lateinit var actionId: String + lateinit var executionTime: Instant + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ACTION_ID_FIELD -> actionId = xcp.text() + EXECUTION_TIME_FIELD -> executionTime = xcp.instant()!! + } + } + + return ActionExecutionTime( + actionId, + executionTime + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ActionExecutionTime { + return ActionExecutionTime(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt new file mode 100644 index 00000000..d403313b --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class MonitorRunResult( + val monitorName: String, + val periodStart: Instant, + val periodEnd: Instant, + val error: Exception? = null, + val inputResults: InputRunResults = InputRunResults(), + val triggerResults: Map = mapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readString(), // monitorName + sin.readInstant(), // periodStart + sin.readInstant(), // periodEnd + sin.readException(), // error + InputRunResults.readFrom(sin), // inputResults + suppressWarning(sin.readMap()) as Map // triggerResults + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("monitor_name", monitorName) + .optionalTimeField("period_start", periodStart) + .optionalTimeField("period_end", periodEnd) + .field("error", error?.message) + .field("input_results", inputResults) + .field("trigger_results", triggerResults) + .endObject() + } + + /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ + fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed running monitor:\n${error.userErrorMessage()}") + } + + if (inputResults.error != null) { + return AlertError(Instant.now(), "Failed fetching inputs:\n${inputResults.error.userErrorMessage()}") + } + return null + } + + fun scriptContextError(trigger: Trigger): Exception? { + return error ?: inputResults.error ?: triggerResults[trigger.id]?.error + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): MonitorRunResult { + return MonitorRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorName) + out.writeInstant(periodStart) + out.writeInstant(periodEnd) + out.writeException(error) + inputResults.writeTo(out) + out.writeMap(triggerResults) + } +} + +data class InputRunResults( + val results: List> = listOf(), + val error: Exception? = null, + val aggTriggersAfterKey: MutableMap? = null +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("results", results) + .field("error", error?.message) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeVInt(results.size) + for (map in results) { + out.writeMap(map) + } + out.writeException(error) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): InputRunResults { + val count = sin.readVInt() // count + val list = mutableListOf>() + for (i in 0 until count) { + list.add(suppressWarning(sin.readMap())) // result(map) + } + val error = sin.readException() // error + return InputRunResults(list, error) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } + + fun afterKeysPresent(): Boolean { + aggTriggersAfterKey?.forEach { + if (it.value.afterKey != null && !it.value.lastPage) { + return true + } + } + return false + } +} + +data class TriggerAfterKey(val afterKey: Map?, val lastPage: Boolean) + +data class ActionRunResult( + val actionId: String, + val actionName: String, + val output: Map, + val throttled: Boolean = false, + val executionTime: Instant? = null, + val error: Exception? = null +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // actionId + sin.readString(), // actionName + suppressWarning(sin.readMap()), // output + sin.readBoolean(), // throttled + sin.readOptionalInstant(), // executionTime + sin.readException() // error + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("id", actionId) + .field("name", actionName) + .field("output", output) + .field("throttled", throttled) + .optionalTimeField("executionTime", executionTime) + .field("error", error?.message) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(actionId) + out.writeString(actionName) + out.writeMap(output) + out.writeBoolean(throttled) + out.writeOptionalInstant(executionTime) + out.writeException(error) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ActionRunResult { + return ActionRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} + +private val logger = LogManager.getLogger(MonitorRunResult::class.java) + +/** Constructs an error message from an exception suitable for human consumption. */ +fun Throwable.userErrorMessage(): String { + return when { + this is ScriptException -> this.scriptStack.joinToString(separator = "\n", limit = 100) + this is OpenSearchException -> this.detailedMessage + this.message != null -> { + logger.info("Internal error: ${this.message}. See the opensearch.log for details", this) + this.message!! + } + else -> { + logger.info("Unknown Internal error. See the OpenSearch log for details.", this) + "Unknown Internal error. See the OpenSearch log for details." + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt new file mode 100644 index 00000000..101d0067 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +open class QueryLevelTriggerRunResult( + override var triggerName: String, + open var triggered: Boolean, + override var error: Exception?, + open var actionResults: MutableMap = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggered", triggered) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return QueryLevelTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt new file mode 100644 index 00000000..84efde39 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.time.Instant + +abstract class TriggerRunResult( + open var triggerName: String, + open var error: Exception? = null +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("name", triggerName) + + internalXContent(builder, params) + val msg = error?.message + + builder.field("error", msg) + .endObject() + return builder + } + + abstract fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder + + /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ + open fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(triggerName) + out.writeException(error) + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt new file mode 100644 index 00000000..48deaed6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant + +data class WorkflowMetadata( + val id: String, + val workflowId: String, + val monitorIds: List, + val latestRunTime: Instant, + val latestExecutionId: String +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + workflowId = sin.readString(), + monitorIds = sin.readStringList(), + latestRunTime = sin.readInstant(), + latestExecutionId = sin.readString() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(workflowId) + out.writeStringCollection(monitorIds) + out.writeInstant(latestRunTime) + out.writeString(latestExecutionId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) + builder.field(WORKFLOW_ID_FIELD, workflowId) + .field(MONITOR_IDS_FIELD, monitorIds) + .optionalTimeField(LATEST_RUN_TIME, latestRunTime) + .field(LATEST_EXECUTION_ID, latestExecutionId) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + companion object { + const val METADATA = "workflow_metadata" + const val WORKFLOW_ID_FIELD = "workflow_id" + const val MONITOR_IDS_FIELD = "monitor_ids" + const val LATEST_RUN_TIME = "latest_run_time" + const val LATEST_EXECUTION_ID = "latest_execution_id" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowMetadata { + lateinit var workflowId: String + var monitorIds = mutableListOf() + lateinit var latestRunTime: Instant + lateinit var latestExecutionId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + WORKFLOW_ID_FIELD -> workflowId = xcp.text() + MONITOR_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + monitorIds.add(xcp.text()) + } + } + LATEST_RUN_TIME -> latestRunTime = xcp.instant()!! + LATEST_EXECUTION_ID -> latestExecutionId = xcp.text() + } + } + return WorkflowMetadata( + id = "$workflowId-metadata", + workflowId = workflowId, + monitorIds = monitorIds, + latestRunTime = latestRunTime, + latestExecutionId = latestExecutionId + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowMetadata { + return WorkflowMetadata(sin) + } + + fun getId(workflowId: String? = null) = "$workflowId-metadata" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt new file mode 100644 index 00000000..d478315e --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder + +data class WorkflowRunContext( + // In case of dry run it's random generated id, while in other cases it's workflowId + val workflowId: String, + val workflowMetadataId: String, + val chainedMonitorId: String?, + val matchingDocIdsPerIndex: Map>, + val auditDelegateMonitorAlerts: Boolean +) : Writeable, ToXContentObject { + companion object { + fun readFrom(sin: StreamInput): WorkflowRunContext { + return WorkflowRunContext(sin) + } + } + + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readOptionalString(), + sin.readMap() as Map>, + sin.readBoolean() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowMetadataId) + out.writeOptionalString(chainedMonitorId) + out.writeMap(matchingDocIdsPerIndex) + out.writeBoolean(auditDelegateMonitorAlerts) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("workflow_id", workflowId) + .field("workflow_metadata_id", workflowMetadataId) + .field("chained_monitor_id", chainedMonitorId) + .field("matching_doc_ids_per_index", matchingDocIdsPerIndex) + .field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts) + .endObject() + return builder + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt new file mode 100644 index 00000000..1b5fe3d8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.lang.Exception +import java.time.Instant + +data class WorkflowRunResult( + val workflowId: String, + val workflowName: String, + val monitorRunResults: List> = mutableListOf(), + val executionStartTime: Instant, + var executionEndTime: Instant? = null, + val executionId: String, + val error: Exception? = null, + val triggerResults: Map = mapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + workflowName = sin.readString(), + monitorRunResults = sin.readList> { s: StreamInput -> MonitorRunResult.readFrom(s) }, + executionStartTime = sin.readInstant(), + executionEndTime = sin.readOptionalInstant(), + executionId = sin.readString(), + error = sin.readException(), + triggerResults = suppressWarning(sin.readMap()) as Map + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowName) + out.writeList(monitorRunResults) + out.writeInstant(executionStartTime) + out.writeOptionalInstant(executionEndTime) + out.writeString(executionId) + out.writeException(error) + out.writeMap(triggerResults) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field("execution_id", executionId) + builder.field("workflow_name", workflowName) + builder.field("workflow_id", workflowId) + builder.field("trigger_results", triggerResults) + builder.startArray("monitor_run_results") + for (monitorResult in monitorRunResults) { + monitorResult.toXContent(builder, ToXContent.EMPTY_PARAMS) + } + builder.endArray() + .field("execution_start_time", executionStartTime) + .field("execution_end_time", executionEndTime) + .field("error", error?.message) + .endObject() + return builder + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowRunResult { + return WorkflowRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt new file mode 100644 index 00000000..312758f0 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.core.common.Strings +import org.opensearch.core.rest.RestStatus +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indices.InvalidIndexNameException + +private val log = LogManager.getLogger(AlertingException::class.java) + +/** + * Converts into a user friendly message. + */ +class AlertingException(message: String, val status: RestStatus, val ex: Exception) : OpenSearchException(message, ex) { + + override fun status(): RestStatus { + return status + } + + companion object { + @JvmStatic + fun wrap(ex: Exception): OpenSearchException { + log.error("Alerting error: $ex") + + var friendlyMsg = "Unknown error" + var status = RestStatus.INTERNAL_SERVER_ERROR + when (ex) { + is IndexNotFoundException -> { + status = ex.status() + friendlyMsg = "Configured indices are not found: ${ex.index}" + } + is OpenSearchSecurityException -> { + status = ex.status() + friendlyMsg = "User doesn't have permissions to execute this action. Contact administrator." + } + is OpenSearchStatusException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is IllegalArgumentException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + is VersionConflictEngineException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is InvalidIndexNameException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + else -> { + if (!Strings.isNullOrEmpty(ex.message)) { + friendlyMsg = ex.message as String + } + } + } + // Wrapping the origin exception as runtime to avoid it being formatted. + // Currently, alerting-kibana is using `error.root_cause.reason` as text in the toast message. + // Below logic is to set friendly message to error.root_cause.reason. + return AlertingException(friendlyMsg, status, Exception("${ex.javaClass.name}: ${ex.message}")) + } + + @JvmStatic + fun merge(vararg ex: AlertingException): AlertingException { + var friendlyMsg = "" + var unwrappedExceptionMsg = "" + ex.forEach { + if (friendlyMsg != "") { + friendlyMsg += ", ${it.message}" + unwrappedExceptionMsg += ", ${it.ex.message}" + } else { + friendlyMsg = it.message.orEmpty() + unwrappedExceptionMsg = "${it.ex.message}" + } + } + return AlertingException(friendlyMsg, ex.first().status, Exception(unwrappedExceptionMsg)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index 5d018e8d..887e8430 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.util +import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.commons.authuser.User @@ -93,3 +94,11 @@ fun Monitor.isMonitorOfStandardType(): Boolean { val standardMonitorTypes = Monitor.MonitorType.values().map { it.value.uppercase(Locale.ROOT) }.toSet() return standardMonitorTypes.contains(this.monitorType.uppercase(Locale.ROOT)) } + +fun getBucketKeysHash(bucketKeys: List): String = bucketKeys.joinToString(separator = "#") + +/** + * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used + * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. + */ +fun AggregationResultBucket.getBucketKeysHash(): String = getBucketKeysHash(this.bucketKeys) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index d6a5e595..d4f1e4f4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -18,10 +18,12 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.commons.alerting.model.ActionExecutionResult +import org.opensearch.commons.alerting.model.ActionRunResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BaseAlert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -31,12 +33,16 @@ import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger +import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Input +import org.opensearch.commons.alerting.model.InputRunResults import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.NoOpTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Sequence @@ -50,6 +56,7 @@ import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope import org.opensearch.commons.alerting.model.action.Throttle +import org.opensearch.commons.alerting.util.getBucketKeysHash import org.opensearch.commons.alerting.util.string import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry @@ -657,3 +664,115 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< CorrelationAlert.CORRELATION_RULE_NAME to correlationAlert.correlationRuleName ) } + +fun randomInputRunResults(): InputRunResults { + return InputRunResults(listOf(), null) +} + +fun randomActionRunResult(): ActionRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", "val1")) + map.plus(Pair("key2", "val2")) + return ActionRunResult( + "1234", + "test-action", + map, + false, + Instant.now(), + null + ) +} + +fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return DocumentLevelTriggerRunResult( + "trigger-name", + mutableListOf(UUIDs.randomBase64UUID().toString()), + null, + mutableMapOf(Pair("alertId", map)) + ) +} +fun randomDocumentLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomDocumentLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + +fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + + val aggBucket1 = AggregationResultBucket( + "parent_bucket_path_1", + listOf("bucket_key_1"), + mapOf("k1" to "val1", "k2" to "val2") + ) + val aggBucket2 = AggregationResultBucket( + "parent_bucket_path_2", + listOf("bucket_key_2"), + mapOf("k1" to "val1", "k2" to "val2") + ) + + val actionResultsMap: MutableMap> = mutableMapOf() + actionResultsMap[aggBucket1.getBucketKeysHash()] = map + actionResultsMap[aggBucket2.getBucketKeysHash()] = map + + return BucketLevelTriggerRunResult( + "trigger-name", + null, + mapOf( + aggBucket1.getBucketKeysHash() to aggBucket1, + aggBucket2.getBucketKeysHash() to aggBucket2 + ), + actionResultsMap + ) +} + +fun randomBucketLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomBucketLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + +fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return QueryLevelTriggerRunResult("trigger-name", true, null, map) +} + +fun randomQueryLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomQueryLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt new file mode 100644 index 00000000..3188c7e3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.ActionExecutionTime +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.IntervalSchedule +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.randomDocumentLevelMonitor +import org.opensearch.commons.alerting.randomDocumentLevelTrigger +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.script.Script +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.UUID + +class DocLevelMonitorFanOutRequestTests : OpenSearchTestCase() { + + fun `test doc level monitor fan out request as stream`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) + assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId) + assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata) + assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) + assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) + assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt new file mode 100644 index 00000000..876360fd --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.InputRunResults +import org.opensearch.commons.alerting.randomDocumentLevelTriggerRunResult +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.test.OpenSearchTestCase + +class DocLevelMonitorFanOutResponseTests : OpenSearchTestCase() { + fun `test doc level monitor fan out response with errors as stream`() { + val docLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse( + "nodeid", + "eid", + "monitorId", + mutableMapOf("index" to mutableMapOf("1" to "1")), + InputRunResults(error = null), + mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult()) + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutResponse.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse(sin) + assertEquals(docLevelMonitorFanOutResponse.nodeId, newDocLevelMonitorFanOutResponse.nodeId) + assertEquals(docLevelMonitorFanOutResponse.executionId, newDocLevelMonitorFanOutResponse.executionId) + assertEquals(docLevelMonitorFanOutResponse.monitorId, newDocLevelMonitorFanOutResponse.monitorId) + assertEquals(docLevelMonitorFanOutResponse.lastRunContexts, newDocLevelMonitorFanOutResponse.lastRunContexts) + assertEquals(docLevelMonitorFanOutResponse.inputResults, newDocLevelMonitorFanOutResponse.inputResults) + assertEquals(docLevelMonitorFanOutResponse.triggerResults, newDocLevelMonitorFanOutResponse.triggerResults) + } + + fun `test doc level monitor fan out response as stream`() { + val workflow = DocLevelMonitorFanOutResponse( + "nodeid", + "eid", + "monitorId", + mapOf("index" to mapOf("1" to "1")) as MutableMap, + InputRunResults(), + mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult()) + ) + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = DocLevelMonitorFanOutResponse(sin) + assertEquals(workflow.nodeId, newWorkflow.nodeId) + assertEquals(workflow.executionId, newWorkflow.executionId) + assertEquals(workflow.monitorId, newWorkflow.monitorId) + assertEquals(workflow.lastRunContexts, newWorkflow.lastRunContexts) + assertEquals(workflow.inputResults, newWorkflow.inputResults) + assertEquals(workflow.triggerResults, newWorkflow.triggerResults) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 220806c1..4d6ab97c 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -1,19 +1,27 @@ package org.opensearch.commons.alerting.model +import org.junit.Assert import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import org.opensearch.common.UUIDs import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.Throttle import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy +import org.opensearch.commons.alerting.randomBucketLevelMonitorRunResult import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger +import org.opensearch.commons.alerting.randomInputRunResults import org.opensearch.commons.alerting.randomQueryLevelMonitor +import org.opensearch.commons.alerting.randomQueryLevelMonitorRunResult import org.opensearch.commons.alerting.randomQueryLevelTrigger +import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResult import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty @@ -21,6 +29,7 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant +import org.opensearch.test.OpenSearchTestCase import kotlin.test.assertTrue class WriteableTests { @@ -217,4 +226,120 @@ class WriteableTests { Assertions.assertEquals(createdTime, newComment.createdTime) Assertions.assertEquals(user, newComment.user) } + + fun `test actionrunresult as stream`() { + val actionRunResult = randomActionRunResult() + val out = BytesStreamOutput() + actionRunResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newActionRunResult = ActionRunResult(sin) + OpenSearchTestCase.assertEquals( + "Round tripping ActionRunResult doesn't work", + actionRunResult, + newActionRunResult + ) + } + + fun `test query-level triggerrunresult as stream`() { + val runResult = randomQueryLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = QueryLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals(runResult.triggerName, newRunResult.triggerName) + OpenSearchTestCase.assertEquals(runResult.triggered, newRunResult.triggered) + OpenSearchTestCase.assertEquals(runResult.error, newRunResult.error) + OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) + } + + fun `test bucket-level triggerrunresult as stream`() { + val runResult = randomBucketLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = BucketLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + + fun `test doc-level triggerrunresult as stream`() { + val runResult = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = DocumentLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + + fun `test inputrunresult as stream`() { + val runResult = randomInputRunResults() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = InputRunResults.readFrom(sin) + OpenSearchTestCase.assertEquals("Round tripping InputRunResults doesn't work", runResult, newRunResult) + } + + fun `test query-level monitorrunresult as stream`() { + val runResult = randomQueryLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + fun `test bucket-level monitorrunresult as stream`() { + val runResult = randomBucketLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test doc-level monitorrunresult as stream`() { + val runResult = randomDocumentLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test DocumentLevelTriggerRunResult as stream`() { + val workflow = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = DocumentLevelTriggerRunResult(sin) + Assert.assertEquals("Round tripping dltrr failed", newWorkflow, workflow) + } + + fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return DocumentLevelTriggerRunResult( + "trigger-name", + mutableListOf(UUIDs.randomBase64UUID().toString()), + null, + mutableMapOf(Pair("alertId", map)) + ) + } + + fun randomActionRunResult(): ActionRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", "val1")) + map.plus(Pair("key2", "val2")) + return ActionRunResult( + "1234", + "test-action", + map, + false, + Instant.now(), + null + ) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 113f4deb..5364116b 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -3,6 +3,7 @@ package org.opensearch.commons.alerting.model import org.junit.Assert.assertEquals import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import org.opensearch.common.xcontent.XContentFactory import org.opensearch.commons.alerting.builder import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy @@ -543,4 +544,18 @@ class XContentTests { val parsedComment = Comment.parse(parser(commentString), "123") Assertions.assertEquals(comment, parsedComment, "Round tripping Comment doesn't work") } + + @Test + fun `test MonitorMetadata`() { + val monitorMetadata = MonitorMetadata( + id = "monitorId-metadata", + monitorId = "monitorId", + lastActionExecutionTimes = emptyList(), + lastRunContext = emptyMap(), + sourceToQueryIndexMapping = mutableMapOf() + ) + val monitorMetadataString = monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() + val parsedMonitorMetadata = MonitorMetadata.parse(parser(monitorMetadataString)) + assertEquals("Round tripping MonitorMetadata doesn't work", monitorMetadata, parsedMonitorMetadata) + } }