Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change disk circuit breaker to cluster settings #2634

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@

package org.opensearch.ml.breaker;

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;

import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.ml.common.exception.MLException;

/**
* A circuit breaker for disk usage.
*/
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
// TODO: make this value configurable as cluster setting
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<ByteSizeValue> {
private static final String ML_DISK_CB = "Disk Circuit Breaker";
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
private static final long GB = 1024 * 1024 * 1024;
private String diskDir;

public DiskCircuitBreaker(String diskDir) {
super(DEFAULT_DISK_SHORTAGE_THRESHOLD);
this.diskDir = diskDir;
}
public static final ByteSizeValue DEFAULT_DISK_SHORTAGE_THRESHOLD = new ByteSizeValue(5, ByteSizeUnit.GB);
private final File diskDir;

public DiskCircuitBreaker(long threshold, String diskDir) {
super(threshold);
public DiskCircuitBreaker(Settings settings, ClusterService clusterService, File diskDir) {
super(Optional.ofNullable(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.get(settings)).orElse(DEFAULT_DISK_SHORTAGE_THRESHOLD));
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD, super::setThreshold);
this.diskDir = diskDir;
}

Expand All @@ -42,7 +43,7 @@ public String getName() {
public boolean isOpen() {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
return new ByteSizeValue(diskDir.getFreeSpace(), ByteSizeUnit.BYTES).compareTo(getThreshold()) < 0; // in GB
});
} catch (PrivilegedActionException e) {
throw new MLException("Failed to run disk circuit breaker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.breaker;

import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -76,7 +77,7 @@ public MLCircuitBreakerService init(Path path) {
// Register memory circuit breaker
registerBreaker(BreakerName.MEMORY, new MemoryCircuitBreaker(this.settings, this.clusterService, this.jvmService));
log.info("Registered ML memory breaker.");
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(this.settings, this.clusterService, new File(path.toString())));
log.info("Registered ML disk breaker.");
// Register native memory circuit breaker, disabling due to unstability.
// registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD;

import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmService;
Expand All @@ -15,11 +17,9 @@
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
// TODO: make this value configurable as cluster setting
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;
private volatile Integer jvmHeapMemThreshold = 85;

public MemoryCircuitBreaker(JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
Expand All @@ -32,22 +32,23 @@ public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
}

public MemoryCircuitBreaker(Settings settings, ClusterService clusterService, JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
super(
Optional
.ofNullable(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings))
.map(Integer::shortValue)
.orElse(DEFAULT_JVM_HEAP_USAGE_THRESHOLD)
);
this.jvmService = jvmService;
this.jvmHeapMemThreshold = ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> jvmHeapMemThreshold = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

@Override
public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.jvmHeapMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return getThreshold() < 100 && jvmService.stats().getMem().getHeapUsedPercent() > getThreshold();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;

import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.os.OsService;
Expand All @@ -18,18 +20,22 @@ public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
private final OsService osService;
private volatile Integer nativeMemThreshold = 90;

public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
super(
Optional
.ofNullable(ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings))
.map(Integer::shortValue)
.orElse(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD)
);
this.osService = osService;
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
super(threshold.shortValue());
this.nativeMemThreshold = threshold;
this.osService = osService;
}

Expand All @@ -38,13 +44,8 @@ public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.nativeMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
return osService.stats().getMem().getUsedPercent() > getThreshold();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@

package org.opensearch.ml.breaker;

import lombok.Data;

/**
* An abstract class for all breakers with threshold.
* @param <T> data type of threshold
*/
@Data
public abstract class ThresholdCircuitBreaker<T> implements CircuitBreaker {

private T threshold;
private volatile T threshold;

public ThresholdCircuitBreaker(T threshold) {
this.threshold = threshold;
}

public T getThreshold() {
return threshold;
}

@Override
public abstract boolean isOpen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE,
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX,
MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD,
MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_EXCLUDE_NODE_NAMES,
MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.function.Function;

import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.ml.common.conversation.ConversationalIndexConstants;
import org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAProcessorConstants;

Expand Down Expand Up @@ -77,6 +79,14 @@ private MLCommonsSettings() {}
public static final Setting<Integer> ML_COMMONS_JVM_HEAP_MEM_THRESHOLD = Setting
.intSetting("plugins.ml_commons.jvm_heap_memory_threshold", 85, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<ByteSizeValue> ML_COMMONS_DISK_FREE_SPACE_THRESHOLD = Setting
.byteSizeSetting(
"plugins.ml_commons.disk_free_space_threshold",
new ByteSizeValue(5L, ByteSizeUnit.GB),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: another alternative could be ByteSizeValue instead of int. it has more fine-grained control.


public static final Setting<String> ML_COMMONS_EXCLUDE_NODE_NAMES = Setting
.simpleString("plugins.ml_commons.exclude_nodes._name", Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<Boolean> ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN = Setting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the license header seems not with right format

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */


package org.opensearch.ml.breaker;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;

import java.io.File;
import java.util.HashSet;
import java.util.List;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

public class DiskCircuitBreakerTests {
@Mock
ClusterService clusterService;

@Mock
File file;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
when(clusterService.getClusterSettings())
.thenReturn(new ClusterSettings(Settings.EMPTY, new HashSet<>(List.of(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD))));
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsHigherThanMinValue_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(4L, ByteSizeUnit.GB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn(5 * 1024 * 1024 * 1024L);
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsLessThanMinValue_breakerIsOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(5L, ByteSizeUnit.GB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn(4 * 1024 * 1024 * 1024L);
Assert.assertTrue(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceConfiguredToZero_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(0L, ByteSizeUnit.KB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn((long) (Math.random() * 1024 * 1024 * 1024 * 1024L));
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_getName() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.EMPTY, clusterService, file);
Assert.assertEquals("Disk Circuit Breaker", breaker.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.ml.breaker;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;

Expand Down Expand Up @@ -103,7 +104,9 @@ public void testInit() {
.build();
ClusterSettings clusterSettings = new ClusterSettings(
settings,
new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD))
new HashSet<>(
Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, ML_COMMONS_DISK_FREE_SPACE_THRESHOLD)
)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public void setupSettings() throws IOException {
String jsonEntity = "{\n"
+ " \"persistent\" : {\n"
+ " \"plugins.ml_commons.jvm_heap_memory_threshold\" : 100, \n"
+ " \"plugins.ml_commons.native_memory_threshold\" : 100 \n"
+ " \"plugins.ml_commons.native_memory_threshold\" : 100, \n"
+ " \"plugins.ml_commons.disk_free_space_threshold\" : 0 \n"
+ " }\n"
+ "}";
response = TestHelper
Expand Down
Loading