Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Change the job topology spreading rules (#1182)
Browse files Browse the repository at this point in the history
* Change the job topology spreading rules

* Address code review requests

* Address code review requests
  • Loading branch information
tbak authored Nov 19, 2021
1 parent 73a1d61 commit f19eb9c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,19 @@ public interface KubePodConfiguration {
@DefaultValue("NONE")
String getDisabledJobSpreadingPattern();

/**
* Skew adjustment factor (alpha) for spreading tasks of a job across nodes. The max skew is computed as
* floor(job_size / alpha), and the result is then clamped to the range [1, {@link #getJobSpreadingMaxSkew()}].
* Setting a value in the range (0, 1] effectively disables the spreading for jobs that are not larger than
* that upper bound (max skew >= job size). With a value <= 0 no max skew is computed from the job size,
* and no spreading constraint is added on its basis.
*/
@DefaultValue("3")
double getJobSpreadingSkewAlpha();

/**
* The upper bound for the max skew computed from the job size (see {@link #getJobSpreadingSkewAlpha()}).
* The max skew limits how unevenly tasks from a job can be distributed across nodes: roughly, how many more
* tasks of the job one node may run compared to the node running the fewest of them.
*/
@DefaultValue("48")
int getJobSpreadingMaxSkew();

/**
* @return the pod spec target region to use
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ private Optional<V1TopologySpreadConstraint> buildJobTopologySpreadConstraints(J
return Optional.empty();
}
int maxSkew = getJobMaxSkew(job);
if (maxSkew <= 0) {
return Optional.empty();
}

V1TopologySpreadConstraint nodeConstraint = new V1TopologySpreadConstraint()
.topologyKey(KubeConstants.NODE_LABEL_MACHINE_ID)
Expand Down Expand Up @@ -133,7 +136,7 @@ private boolean isJobSpreadingEnabled(Job<?> job) {
*
* @return the max skew (at least 1), or -1 if the max skew is not set, is invalid, or job spreading does not apply to the job
*/
private static int getJobMaxSkew(Job<?> job) {
private int getJobMaxSkew(Job<?> job) {
String maxSkewAttr = job.getJobDescriptor().getAttributes().get(JobAttributes.JOB_ATTRIBUTES_SPREADING_MAX_SKEW);
if (maxSkewAttr != null) {
try {
Expand All @@ -146,15 +149,20 @@ private static int getJobMaxSkew(Job<?> job) {
}

DisruptionBudgetPolicy policy = job.getJobDescriptor().getDisruptionBudget().getDisruptionBudgetPolicy();
// Job spreading is only relevant for jobs that care about the availability.
if (!(policy instanceof AvailabilityPercentageLimitDisruptionBudgetPolicy)) {
return 1;
return -1;
}
int jobSize = getJobDesiredSize(job);
if (jobSize <= 1) {
return 1;
return -1;
}
double alpha = configuration.getJobSpreadingSkewAlpha();
if (alpha <= 0) {
return -1;
}
AvailabilityPercentageLimitDisruptionBudgetPolicy availabilityPolicy = (AvailabilityPercentageLimitDisruptionBudgetPolicy) policy;
int maxSkew = (int) (jobSize * (100 - availabilityPolicy.getPercentageOfHealthyContainers()) / 100);
return Math.max(maxSkew, 1);
int skew = (int) (Math.floor(jobSize / alpha));
// The skew must be between 1 and the configured max skew.
return Math.max(1, Math.min(skew, configuration.getJobSpreadingMaxSkew()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class DefaultTopologyFactoryTest {
public void setUp() throws Exception {
topologyFactory = new DefaultTopologyFactory(configuration, features);
when(configuration.getDisabledJobSpreadingPattern()).thenReturn("NONE");
when(configuration.getJobSpreadingSkewAlpha()).thenReturn(3.0);
when(configuration.getJobSpreadingMaxSkew()).thenReturn(48);
when(features.isRelocationBinpackingEnabled()).thenReturn(true);
}

Expand Down Expand Up @@ -96,13 +98,12 @@ public void testBatchJobSpreading() {

@Test
public void testServiceJobSpreadingWithAvailabilityPercentageDisruptionBudget() {
// By default no job spreading
Job<ServiceJobExt> job = JobGenerator.serviceJobs(JobDescriptorGenerator.oneTaskServiceJobDescriptor()).getValue();
job = JobFunctions.changeServiceJobCapacity(job, Capacity.newBuilder().withDesired(100).withMax(100).build());
job = JobFunctions.changeDisruptionBudget(job, PERCENTAGE_OF_HEALTH_POLICY);
List<V1TopologySpreadConstraint> constraints = topologyFactory.buildTopologySpreadConstraints(job);
assertThat(constraints).hasSize(1);
assertThat(constraints.get(0).getMaxSkew()).isEqualTo(5);
assertThat(constraints.get(0).getMaxSkew()).isEqualTo(33);

// And now add zone constraint
job = JobFunctions.appendSoftConstraint(job, JobConstraints.ZONE_BALANCE, "true");
Expand All @@ -120,6 +121,8 @@ public void testServiceJobSpreadingWithAvailabilityPercentageDisruptionBudget()
@Test
public void testJobSpreadingDisabledConfiguration() {
Job<ServiceJobExt> job = JobGenerator.serviceJobs(JobDescriptorGenerator.oneTaskServiceJobDescriptor()).getValue();
job = JobFunctions.changeServiceJobCapacity(job, Capacity.newBuilder().withDesired(100).withMax(100).build());
job = JobFunctions.changeDisruptionBudget(job, PERCENTAGE_OF_HEALTH_POLICY);
assertThat(topologyFactory.buildTopologySpreadConstraints(job)).hasSize(1);

when(configuration.getDisabledJobSpreadingPattern()).thenReturn(".*");
Expand Down

0 comments on commit f19eb9c

Please sign in to comment.