Skip to content

Commit

Permalink
changes to add support for remote monitors in alerting (#661)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored and jowg-amazon committed Jul 2, 2024
1 parent 413ae0f commit 0d658aa
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 21 deletions.
26 changes: 14 additions & 12 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.time.Instant
import java.util.Locale
import java.util.regex.Pattern

data class Monitor(
override val id: String = NO_ID,
Expand All @@ -34,7 +34,7 @@ data class Monitor(
override val enabledTime: Instant?,
// TODO: Check how this behaves during rolling upgrade/multi-version cluster
// Can read/write and parsing break if it's done from an old -> new version of the plugin?
val monitorType: MonitorType,
val monitorType: String,
val user: User?,
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<Input>,
Expand All @@ -56,13 +56,13 @@ data class Monitor(
require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." }
// Verify Trigger type based on Monitor type
when (monitorType) {
MonitorType.QUERY_LEVEL_MONITOR ->
MonitorType.QUERY_LEVEL_MONITOR.value ->
require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" }
MonitorType.BUCKET_LEVEL_MONITOR ->
MonitorType.BUCKET_LEVEL_MONITOR.value ->
require(trigger is BucketLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" }
MonitorType.CLUSTER_METRICS_MONITOR ->
MonitorType.CLUSTER_METRICS_MONITOR.value ->
require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" }
MonitorType.DOC_LEVEL_MONITOR ->
MonitorType.DOC_LEVEL_MONITOR.value ->
require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" }
}
}
Expand Down Expand Up @@ -94,7 +94,7 @@ data class Monitor(
schedule = Schedule.readFrom(sin),
lastUpdateTime = sin.readInstant(),
enabledTime = sin.readOptionalInstant(),
monitorType = sin.readEnum(MonitorType::class.java),
monitorType = sin.readString(),
user = if (sin.readBoolean()) {
User(sin)
} else {
Expand Down Expand Up @@ -179,7 +179,7 @@ data class Monitor(
schedule.writeTo(out)
out.writeInstant(lastUpdateTime)
out.writeOptionalInstant(enabledTime)
out.writeEnum(monitorType)
out.writeString(monitorType)
out.writeBoolean(user != null)
user?.writeTo(out)
out.writeInt(schemaVersion)
Expand Down Expand Up @@ -227,6 +227,7 @@ data class Monitor(
const val DATA_SOURCES_FIELD = "data_sources"
const val ENABLED_TIME_FIELD = "enabled_time"
const val OWNER_FIELD = "owner"
val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}")

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
// the different subclasses and creating circular dependencies
Expand Down Expand Up @@ -265,9 +266,10 @@ data class Monitor(
NAME_FIELD -> name = xcp.text()
MONITOR_TYPE_FIELD -> {
monitorType = xcp.text()
val allowedTypes = MonitorType.values().map { it.value }
if (!allowedTypes.contains(monitorType)) {
throw IllegalStateException("Monitor type should be one of $allowedTypes")
val matcher = MONITOR_TYPE_PATTERN.matcher(monitorType)
val find = matcher.matches()
if (!find) {
throw IllegalStateException("Monitor type should follow pattern ${MONITOR_TYPE_PATTERN.pattern()}")
}
}
USER_FIELD -> user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp)
Expand Down Expand Up @@ -325,7 +327,7 @@ data class Monitor(
requireNotNull(schedule) { "Monitor schedule is null" },
lastUpdateTime ?: Instant.now(),
enabledTime,
MonitorType.valueOf(monitorType.uppercase(Locale.ROOT)),
monitorType,
user,
schemaVersion,
inputs.toList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.time.Instant
import java.util.Locale

class IndexUtils {
companion object {
Expand Down Expand Up @@ -46,7 +47,9 @@ class IndexUtils {
}
}

fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.BUCKET_LEVEL_MONITOR
fun Monitor.isBucketLevelMonitor(): Boolean =
isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.BUCKET_LEVEL_MONITOR

fun XContentBuilder.optionalUserField(name: String, user: User?): XContentBuilder {
if (user == null) {
Expand Down Expand Up @@ -85,3 +88,8 @@ fun XContentParser.instant(): Instant? {
* Extension function for ES 6.3 and above that duplicates the ES 6.2 XContentBuilder.string() method.
*/
fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString()

fun Monitor.isMonitorOfStandardType(): Boolean {
val standardMonitorTypes = Monitor.MonitorType.values().map { it.value.uppercase(Locale.ROOT) }.toSet()
return standardMonitorTypes.contains(this.monitorType.uppercase(Locale.ROOT))
}
10 changes: 5 additions & 5 deletions src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fun randomQueryLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -100,7 +100,7 @@ fun randomQueryLevelMonitorWithoutUser(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -124,7 +124,7 @@ fun randomBucketLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -142,7 +142,7 @@ fun randomClusterMetricsMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -160,7 +160,7 @@ fun randomDocumentLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetMonitorResponseTests : OpenSearchTestCase() {
schedule = cronSchedule,
lastUpdateTime = Instant.now(),
enabledTime = Instant.now(),
monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR,
monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value,
user = randomUser(),
schemaVersion = 0,
inputs = mutableListOf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class IndexMonitorResponseTests {
schedule = cronSchedule,
lastUpdateTime = Instant.now(),
enabledTime = Instant.now(),
monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR,
monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value,
user = randomUser(),
schemaVersion = 0,
inputs = mutableListOf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class XContentTests {
""".trimIndent()
val parsedMonitor = Monitor.parse(parser(monitorString))
Assertions.assertEquals(
Monitor.MonitorType.QUERY_LEVEL_MONITOR,
Monitor.MonitorType.QUERY_LEVEL_MONITOR.value,
parsedMonitor.monitorType,
"Incorrect monitor type"
)
Expand Down

0 comments on commit 0d658aa

Please sign in to comment.