diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java index f1c64fb4f0d90..762fd2d8be60d 100644 --- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java +++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java @@ -11,6 +11,14 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.monitor.process.ProcessProbe; +import org.opensearch.search.ResourceType; +import org.opensearch.search.backpressure.settings.NodeDuressSettings; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; + +import java.util.EnumMap; /** * Main class to declare Workload Management related settings @@ -31,6 +39,9 @@ public class WorkloadManagementSettings { private Double nodeLevelMemoryRejectionThreshold; private Double nodeLevelCpuCancellationThreshold; private Double nodeLevelCpuRejectionThreshold; + private TimeValue queryGroupServiceRunIntervalInMillis; + private NodeDuressTrackers nodeDuressTrackers; + private Boolean queryGroupServiceEnabled; /** * Setting name for node level memory based rejection threshold for QueryGroup service @@ -84,6 +95,14 @@ public class WorkloadManagementSettings { Setting.Property.Dynamic, Setting.Property.NodeScope ); + public static final String QUERYGROUP_SERVICE_ENABLED_SETTING_NAME = "wlm.query_group.service.enabled"; + + public static final Setting QUERYGROUP_SERVICE_ENABLED_SETTING = Setting.boolSetting( + QUERYGROUP_SERVICE_ENABLED_SETTING_NAME, + false, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); /** * Setting name for Query Group Service run interval */ @@ -108,6 +127,9 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings); nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings); nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings); + queryGroupServiceRunIntervalInMillis = TimeValue.timeValueMillis(QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings)); + nodeDuressTrackers = setupNodeDuressTracker(settings, clusterSettings); + queryGroupServiceEnabled = QUERYGROUP_SERVICE_ENABLED_SETTING.get(settings); ensureRejectionThresholdIsLessThanCancellation( nodeLevelMemoryRejectionThreshold, @@ -128,6 +150,50 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold); } + /** + * Gets the interval at which the Query Group Service runs. + * + * @return the interval as a \`TimeValue\` object. + */ + public TimeValue getQueryGroupServiceRunInterval() { + return queryGroupServiceRunIntervalInMillis; + } + + /** + * Gets the \`NodeDuressTrackers\` instance which tracks the node duress state. + * + * @return the \`NodeDuressTrackers\` instance. + */ + public NodeDuressTrackers getNodeDuressTrackers() { + return nodeDuressTrackers; + } + + public Boolean queryGroupServiceEnabled() { + return queryGroupServiceEnabled; + } + + private NodeDuressTrackers setupNodeDuressTracker(Settings settings, ClusterSettings clusterSettings) { + NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); + return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) { + { + put( + ResourceType.CPU, + new NodeDuressTrackers.NodeDuressTracker( + () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(), + nodeDuressSettings::getNumSuccessiveBreaches + ) + ); + put( + ResourceType.MEMORY, + new NodeDuressTrackers.NodeDuressTracker( + () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(), + nodeDuressSettings::getNumSuccessiveBreaches + ) + ); + } + }); + } + /** * Method to get the node level memory based cancellation threshold * @return current node level memory based cancellation threshold