From 4e846e2f5766d7831c02ff7ee94fec93cccd9bd5 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 29 Aug 2024 22:02:27 +0530 Subject: [PATCH] refactor node level threshold Signed-off-by: Kiran Prakash --- .../cancellation/DefaultTaskCancellation.java | 16 +++++++--- .../DefaultTaskSelectionStrategy.java | 11 ++----- .../DefaultTaskCancellationTests.java | 32 ++++++++++++------- .../DefaultTaskSelectionStrategyTests.java | 15 ++------- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java index d502ce0394c63..12e4fbc595e6d 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java @@ -16,6 +16,7 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.WorkloadManagementSettings; import java.util.ArrayList; import java.util.Collection; @@ -47,6 +48,7 @@ public class DefaultTaskCancellation { private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + protected final WorkloadManagementSettings workloadManagementSettings; protected final DefaultTaskSelectionStrategy defaultTaskSelectionStrategy; // a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object protected final Map queryGroupLevelResourceUsageViews; @@ -55,12 +57,14 @@ public class DefaultTaskCancellation { protected BooleanSupplier isNodeInDuress; public DefaultTaskCancellation( + WorkloadManagementSettings workloadManagementSettings, DefaultTaskSelectionStrategy defaultTaskSelectionStrategy, Map queryGroupLevelResourceUsageViews, Collection activeQueryGroups, Collection deletedQueryGroups, BooleanSupplier isNodeInDuress ) { + this.workloadManagementSettings = workloadManagementSettings; this.defaultTaskSelectionStrategy = defaultTaskSelectionStrategy; this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews; this.activeQueryGroups = activeQueryGroups; @@ -181,7 +185,7 @@ private List getTaskCancellations(QueryGroup queryGroup, Resou resourceType ); List taskCancellations = new ArrayList<>(); - for(Task task : selectedTasksToCancel) { + for (Task task : selectedTasksToCancel) { String cancellationReason = createCancellationReason(queryGroup, task, resourceType); taskCancellations.add(createTaskCancellation((CancellableTask) task, cancellationReason)); } @@ -213,7 +217,7 @@ protected List getTaskCancellationsForDeletedQueryGroup(QueryG queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks() ); List taskCancellations = new ArrayList<>(); - for(Task task : tasks) { + for (Task task : tasks) { String cancellationReason = "[Workload Management] Cancelling Task ID : " + task.getId() + " from QueryGroup ID : " @@ -235,12 +239,16 @@ private Long convertThresholdIntoLong(ResourceType resourceType, Double resource Long threshold = null; if (resourceType == ResourceType.MEMORY) { // Check if resource usage is breaching the threshold - threshold = (long) (resourceThresholdInPercentage * HEAP_SIZE_BYTES); + double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelMemoryCancellationThreshold() + * HEAP_SIZE_BYTES; + threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold); } else if (resourceType == ResourceType.CPU) { // Get the total CPU time of the process in milliseconds long cpuTotalTimeInMillis = ProcessProbe.getInstance().getProcessCpuTotalTime(); + double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelCpuCancellationThreshold() + * cpuTotalTimeInMillis; // Check if resource usage is breaching the threshold - threshold = (long) (resourceThresholdInPercentage * cpuTotalTimeInMillis); + threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold); } return threshold; } diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategy.java b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategy.java index 124873647c2e5..33b854ce5d760 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategy.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategy.java @@ -46,11 +46,7 @@ public Comparator sortingCondition() { * @return The list of selected tasks * @throws IllegalArgumentException If the limit is less than zero */ - public List selectTasksForCancellation( - List tasks, - long limit, - ResourceType resourceType - ) { + public List selectTasksForCancellation(List tasks, long limit, ResourceType resourceType) { if (limit < 0) { throw new IllegalArgumentException("limit has to be greater than zero"); } @@ -84,9 +80,6 @@ public List selectTasksForCancellation( * @return A list of {@link TaskCancellation} objects representing the tasks selected for cancellation. */ public List selectTasksFromDeletedQueryGroup(List tasks) { - return tasks - .stream() - .filter(task -> task instanceof CancellableTask) - .collect(Collectors.toList()); + return tasks.stream().filter(task -> task instanceof CancellableTask).collect(Collectors.toList()); } } diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java index a455478b27116..5c77c5c7f7a55 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java @@ -17,6 +17,7 @@ import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.wlm.QueryGroupLevelResourceUsageView; +import org.opensearch.wlm.WorkloadManagementSettings; import org.junit.Before; import java.util.Collection; @@ -39,13 +40,21 @@ public class DefaultTaskCancellationTests extends OpenSearchTestCase { private static class TestTaskCancellationImpl extends DefaultTaskCancellation { public TestTaskCancellationImpl( + WorkloadManagementSettings workloadManagementSettings, DefaultTaskSelectionStrategy defaultTaskSelectionStrategy, Map queryGroupLevelViews, Set activeQueryGroups, Set deletedQueryGroups, BooleanSupplier isNodeInDuress ) { - super(defaultTaskSelectionStrategy, queryGroupLevelViews, activeQueryGroups, deletedQueryGroups, isNodeInDuress); + super( + workloadManagementSettings, + defaultTaskSelectionStrategy, + queryGroupLevelViews, + activeQueryGroups, + deletedQueryGroups, + isNodeInDuress + ); } } @@ -53,13 +62,16 @@ public TestTaskCancellationImpl( private Set activeQueryGroups; private Set deletedQueryGroups; private DefaultTaskCancellation taskCancellation; + private WorkloadManagementSettings workloadManagementSettings; @Before public void setup() { + workloadManagementSettings = mock(WorkloadManagementSettings.class); queryGroupLevelViews = new HashMap<>(); activeQueryGroups = new HashSet<>(); deletedQueryGroups = new HashSet<>(); taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -156,6 +168,7 @@ public void testGetCancellableTasksFrom_returnsNoTasksWhenNotBreachingThreshold( QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock(resourceType, usage); queryGroupLevelViews.put(queryGroupId1, mockView); activeQueryGroups.add(queryGroup1); + when(workloadManagementSettings.getNodeLevelCpuCancellationThreshold()).thenReturn(0.90); List cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1); assertTrue(cancellableTasksFrom.isEmpty()); @@ -179,6 +192,7 @@ public void testGetCancellableTasksFrom_filtersQueryGroupCorrectly() { activeQueryGroups.add(queryGroup1); TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -208,6 +222,7 @@ public void testCancelTasks_cancelsGivenTasks() { activeQueryGroups.add(queryGroup1); TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -254,6 +269,7 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() { deletedQueryGroups.add(deletedQueryGroup); TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -310,6 +326,7 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN deletedQueryGroups.add(deletedQueryGroup); TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -365,6 +382,7 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() { Collections.addAll(activeQueryGroups, queryGroup1, queryGroup2); TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl( + workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelViews, activeQueryGroups, @@ -470,18 +488,10 @@ private QueryGroupLevelResourceUsageView createResourceUsageViewMock(ResourceTyp return mockView; } - private QueryGroupLevelResourceUsageView createResourceUsageViewMock( - ResourceType resourceType, - Long usage, - Collection ids - ) { + private QueryGroupLevelResourceUsageView createResourceUsageViewMock(ResourceType resourceType, Long usage, Collection ids) { QueryGroupLevelResourceUsageView mockView = mock(QueryGroupLevelResourceUsageView.class); when(mockView.getResourceUsageData()).thenReturn(Collections.singletonMap(resourceType, usage)); - when(mockView.getActiveTasks()).thenReturn( - ids.stream() - .map(this::getRandomSearchTask) - .collect(Collectors.toList()) - ); + when(mockView.getActiveTasks()).thenReturn(ids.stream().map(this::getRandomSearchTask).collect(Collectors.toList())); return mockView; } diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategyTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategyTests.java index 9649a5dea0bb7..fc2b7e42406cb 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategyTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskSelectionStrategyTests.java @@ -10,20 +10,17 @@ import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchTask; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.core.tasks.TaskId; import org.opensearch.core.tasks.resourcetracker.ResourceStats; import org.opensearch.core.tasks.resourcetracker.ResourceStatsType; import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric; import org.opensearch.search.ResourceType; import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; public class DefaultTaskSelectionStrategyTests extends OpenSearchTestCase { @@ -43,11 +40,7 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGrea long reduceBy = 50L; ResourceType resourceType = ResourceType.MEMORY; List tasks = getListOfTasks(thresholdInLong); - List selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation( - tasks, - reduceBy, - resourceType - ); + List selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType); assertFalse(selectedTasks.isEmpty()); assertTrue(tasksUsageMeetsThreshold(selectedTasks, reduceBy)); } @@ -72,11 +65,7 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsEqua long reduceBy = 0; ResourceType resourceType = ResourceType.MEMORY; List tasks = getListOfTasks(thresholdInLong); - List selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation( - tasks, - reduceBy, - resourceType - ); + List selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType); assertTrue(selectedTasks.isEmpty()); }