From a599283c25115cdda791ed7a9b465565369d7579 Mon Sep 17 00:00:00 2001 From: tbak Date: Fri, 6 May 2022 14:11:27 -0700 Subject: [PATCH] Remove the V2 resource consumption endpoint (#1273) --- .../master/endpoint/v2/rest/JerseyModule.java | 1 - .../v2/rest/ResourceConsumptionEndpoint.java | 35 --- .../v2/rest/ResourceConsumptionResource.java | 51 ---- .../CapacityManagementAttributes.java | 28 -- .../CompositeResourceConsumption.java | 119 -------- .../service/management/ManagementModule.java | 6 - .../management/ResourceConsumption.java | 113 ------- .../management/ResourceConsumptionEvents.java | 72 ----- .../ResourceConsumptionService.java | 42 --- .../management/ResourceConsumptions.java | 226 -------------- .../DefaultResourceConsumptionService.java | 234 --------------- .../ResourceConsumptionEvaluator.java | 280 ------------------ .../internal/ResourceConsumptionLog.java | 125 -------- .../ResourceConsumptionServiceMetrics.java | 217 -------------- .../internal/ConsumptionModelGenerator.java | 208 ------------- .../ResourceConsumptionEvaluatorTest.java | 252 ---------------- .../internal/ResourceConsumptionLogTest.java | 73 ----- 17 files changed, 2082 deletions(-) delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionEndpoint.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionResource.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/CapacityManagementAttributes.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/CompositeResourceConsumption.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumption.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionEvents.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionService.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptions.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/DefaultResourceConsumptionService.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluator.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLog.java delete mode 100644 titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionServiceMetrics.java delete mode 100644 titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ConsumptionModelGenerator.java delete mode 100644 titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluatorTest.java delete mode 100644 titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLogTest.java diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/JerseyModule.java b/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/JerseyModule.java index a2825042c7..ad8817cf16 100644 --- a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/JerseyModule.java +++ b/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/JerseyModule.java @@ -80,7 +80,6 @@ public UnaryOperator getConfig() { // V2 resources config.getClasses().add(ApplicationSlaManagementResource.class); - config.getClasses().add(ResourceConsumptionResource.class); return config; }; diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionEndpoint.java b/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionEndpoint.java deleted file mode 100644 index 6deb62e8ff..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionEndpoint.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.endpoint.v2.rest; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -/** - * Internal REST API for exposing resource consumption information. - */ -@Path(ResourceConsumptionEndpoint.PATH_API_V2_RESOURCE_CONSUMPTION) -@Produces(MediaType.APPLICATION_JSON) -public interface ResourceConsumptionEndpoint { - String PATH_API_V2_RESOURCE_CONSUMPTION = "/api/v2/resource/consumption"; - - @GET - Response getSystemConsumption(); -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionResource.java b/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionResource.java deleted file mode 100644 index 090ed1608b..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/endpoint/v2/rest/ResourceConsumptionResource.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.endpoint.v2.rest; - -import java.util.Optional; -import javax.inject.Inject; -import javax.inject.Singleton; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; - -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumptionService; - -@Path(ResourceConsumptionEndpoint.PATH_API_V2_RESOURCE_CONSUMPTION) -@Produces(MediaType.APPLICATION_JSON) -@Singleton -public class ResourceConsumptionResource implements ResourceConsumptionEndpoint { - - private final ResourceConsumptionService service; - - @Inject - public ResourceConsumptionResource(ResourceConsumptionService service) { - this.service = service; - } - - @Override - public Response getSystemConsumption() { - Optional systemConsumption = service.getSystemConsumption(); - if (systemConsumption.isPresent()) { - return Response.ok(systemConsumption.get()).build(); - } - return Response.status(Status.SERVICE_UNAVAILABLE).build(); - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CapacityManagementAttributes.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CapacityManagementAttributes.java deleted file mode 100644 index 8802aa3980..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CapacityManagementAttributes.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2019 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -/** - * Collection of capacity management related attributes that can be associated with agent instance groups and agent instances. - */ -public final class CapacityManagementAttributes { - - /** - * Mark an instance group such that capacity management will ignore the instance group when doing its operations. - */ - public static final String IGNORE = "titus.capacityManagement.ignore"; -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CompositeResourceConsumption.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CompositeResourceConsumption.java deleted file mode 100644 index e2da7f4884..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/CompositeResourceConsumption.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -import java.util.Collections; -import java.util.Map; - -import com.netflix.titus.api.model.ResourceDimension; - -/** - */ -public class CompositeResourceConsumption extends ResourceConsumption { - - private final Map contributors; - private final ResourceDimension allowedConsumption; - private final boolean aboveLimit; - - public CompositeResourceConsumption(String consumerName, - ConsumptionLevel consumptionLevel, - ResourceDimension currentConsumption, - ResourceDimension maxConsumption, - ResourceDimension allowedConsumption, - Map attributes, - Map contributors, - boolean aboveLimit) { - super(consumerName, consumptionLevel, currentConsumption, maxConsumption, attributes); - this.contributors = contributors; - this.allowedConsumption = allowedConsumption; - this.aboveLimit = aboveLimit; - } - - public Map getContributors() { - return contributors; - } - - public ResourceDimension getAllowedConsumption() { - return allowedConsumption; - } - - public boolean isAboveLimit() { - return aboveLimit; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - CompositeResourceConsumption that = (CompositeResourceConsumption) o; - - if (aboveLimit != that.aboveLimit) { - return false; - } - if (contributors != null ? !contributors.equals(that.contributors) : that.contributors != null) { - return false; - } - return allowedConsumption != null ? allowedConsumption.equals(that.allowedConsumption) : that.allowedConsumption == null; - - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (contributors != null ? contributors.hashCode() : 0); - result = 31 * result + (allowedConsumption != null ? allowedConsumption.hashCode() : 0); - result = 31 * result + (aboveLimit ? 1 : 0); - return result; - } - - @Override - public String toString() { - return "CompositeResourceConsumption{" + - "consumerName='" + getConsumerName() + '\'' + - ", consumptionLevel=" + getConsumptionLevel() + - ", currentConsumption=" + getCurrentConsumption() + - ", maxConsumption=" + getMaxConsumption() + - ", attributes=" + getAttributes() + - "contributors=" + contributors + - ", allowedConsumption=" + allowedConsumption + - ", aboveLimit=" + aboveLimit + - '}'; - } - - public static CompositeResourceConsumption unused(String consumerName, - ConsumptionLevel consumptionLevel, - ResourceDimension allowedConsumption) { - return new CompositeResourceConsumption( - consumerName, - consumptionLevel, - ResourceDimension.empty(), - ResourceDimension.empty(), - allowedConsumption, - Collections.emptyMap(), - Collections.emptyMap(), - false - ); - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ManagementModule.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ManagementModule.java index 566bfb6975..2219996cbc 100644 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ManagementModule.java +++ b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ManagementModule.java @@ -21,8 +21,6 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.netflix.archaius.ConfigProxyFactory; -import com.netflix.titus.master.service.management.internal.DefaultResourceConsumptionService; -import com.netflix.titus.master.service.management.internal.ResourceConsumptionLog; import com.netflix.titus.master.service.management.kube.KubeApplicationSlaManagementService; public class ManagementModule extends AbstractModule { @@ -30,10 +28,6 @@ public class ManagementModule extends AbstractModule { @Override protected void configure() { bind(ApplicationSlaManagementService.class).to(KubeApplicationSlaManagementService.class); - - // Resource consumption monitoring - bind(ResourceConsumptionService.class).to(DefaultResourceConsumptionService.class).asEagerSingleton(); - bind(ResourceConsumptionLog.class).asEagerSingleton(); } @Provides diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumption.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumption.java deleted file mode 100644 index 5ad47325d5..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumption.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -import java.util.Map; - -import com.netflix.titus.api.model.ResourceDimension; - -/** - * Hierarchical resource consumption, which is system / tiers / capacity groups / applications. - */ -public class ResourceConsumption { - - public static final String SYSTEM_CONSUMER = "system"; - - public enum ConsumptionLevel {System, Tier, CapacityGroup, Application, InstanceType} - - private final String consumerName; - private final ConsumptionLevel consumptionLevel; - private final ResourceDimension currentConsumption; - private final ResourceDimension maxConsumption; - private final Map attributes; - - public ResourceConsumption(String consumerName, ConsumptionLevel consumptionLevel, ResourceDimension currentConsumption, ResourceDimension maxConsumption, Map attributes) { - this.consumerName = consumerName; - this.consumptionLevel = consumptionLevel; - this.currentConsumption = currentConsumption; - this.maxConsumption = maxConsumption; - this.attributes = attributes; - } - - public String getConsumerName() { - return consumerName; - } - - public ConsumptionLevel getConsumptionLevel() { - return consumptionLevel; - } - - public ResourceDimension getCurrentConsumption() { - return currentConsumption; - } - - public ResourceDimension getMaxConsumption() { - return maxConsumption; - } - - public Map getAttributes() { - return attributes; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ResourceConsumption that = (ResourceConsumption) o; - - if (consumerName != null ? !consumerName.equals(that.consumerName) : that.consumerName != null) { - return false; - } - if (consumptionLevel != that.consumptionLevel) { - return false; - } - if (currentConsumption != null ? !currentConsumption.equals(that.currentConsumption) : that.currentConsumption != null) { - return false; - } - if (maxConsumption != null ? !maxConsumption.equals(that.maxConsumption) : that.maxConsumption != null) { - return false; - } - return attributes != null ? attributes.equals(that.attributes) : that.attributes == null; - - } - - @Override - public int hashCode() { - int result = consumerName != null ? consumerName.hashCode() : 0; - result = 31 * result + (consumptionLevel != null ? consumptionLevel.hashCode() : 0); - result = 31 * result + (currentConsumption != null ? currentConsumption.hashCode() : 0); - result = 31 * result + (maxConsumption != null ? maxConsumption.hashCode() : 0); - result = 31 * result + (attributes != null ? attributes.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "ResourceConsumption{" + - "consumerName='" + consumerName + '\'' + - ", consumptionLevel=" + consumptionLevel + - ", currentConsumption=" + currentConsumption + - ", maxConsumption=" + maxConsumption + - ", attributes=" + attributes + - '}'; - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionEvents.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionEvents.java deleted file mode 100644 index 52cd70f19e..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionEvents.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -/** - */ -public class ResourceConsumptionEvents { - public abstract static class ResourceConsumptionEvent { - private final String capacityGroup; - private final long timestamp; - - public ResourceConsumptionEvent(String capacityGroup, long timestamp) { - this.capacityGroup = capacityGroup; - this.timestamp = timestamp; - } - - public String getCapacityGroup() { - return capacityGroup; - } - - public long getTimestamp() { - return timestamp; - } - } - - /** - * Objects of this class represent a resource utilization for a capacity category at a point in time. - */ - public static class CapacityGroupAllocationEvent extends ResourceConsumptionEvent { - - private final CompositeResourceConsumption capacityGroupConsumption; - - public CapacityGroupAllocationEvent(String capacityGroup, - long timestamp, - CompositeResourceConsumption capacityGroupConsumption) { - super(capacityGroup, timestamp); - this.capacityGroupConsumption = capacityGroupConsumption; - } - - public CompositeResourceConsumption getCapacityGroupConsumption() { - return capacityGroupConsumption; - } - } - - public static class CapacityGroupUndefinedEvent extends ResourceConsumptionEvent { - public CapacityGroupUndefinedEvent(String capacityGroup, - long timestamp) { - super(capacityGroup, timestamp); - } - } - - public static class CapacityGroupRemovedEvent extends ResourceConsumptionEvent { - public CapacityGroupRemovedEvent(String capacityGroup, - long timestamp) { - super(capacityGroup, timestamp); - } - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionService.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionService.java deleted file mode 100644 index e1df2e73ee..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptionService.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -import java.util.Optional; - -import rx.Observable; - -/** - * A service that observers resource utilization by jobs, and checks if configured SLAs are not violated. - */ -public interface ResourceConsumptionService { - - /** - * Returns current system-level resource consumption. The return object is a composite, that provides - * further details at tier, capacity group and application levels. - * - * @return non-null value if system consumption data are available - */ - Optional getSystemConsumption(); - - /** - * Returns an observable that emits an item for each capacity category with its current resource utilization. - * On subscribe emits a single item for each known category, followed by updates caused by either resource - * consumption change or SLA update. - */ - Observable resourceConsumptionEvents(); -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptions.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptions.java deleted file mode 100644 index a9d8aaadc4..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/ResourceConsumptions.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - -import com.google.common.base.Preconditions; -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.master.model.ResourceDimensions; - -import static java.util.Arrays.asList; - -public final class ResourceConsumptions { - - private ResourceConsumptions() { - } - - /** - * Find {@link ResourceConsumption} instance in the consumption hierarchy. - */ - public static Optional findConsumption(ResourceConsumption parent, String... path) { - ResourceConsumption current = parent; - for (String segment : path) { - if (!(current instanceof CompositeResourceConsumption)) { - return Optional.empty(); - } - current = ((CompositeResourceConsumption) current).getContributors().get(segment); - if (current == null) { - return Optional.empty(); - } - } - return Optional.of(current); - } - - /** - * Find all {@link ResourceConsumption} instances at a specific consumption level. - */ - public static Map groupBy(CompositeResourceConsumption parent, - ResourceConsumption.ConsumptionLevel level) { - Preconditions.checkArgument(parent.getConsumptionLevel().ordinal() <= level.ordinal()); - - if (parent.getConsumptionLevel() == level) { - return Collections.singletonMap(parent.getConsumerName(), parent); - } - - // If this is the Application level, the next one needs to be the InstanceType level, which is the last one - if (parent.getConsumptionLevel() == ResourceConsumption.ConsumptionLevel.Application) { - Preconditions.checkArgument(level == ResourceConsumption.ConsumptionLevel.InstanceType); - return parent.getContributors(); - } - - Map result = new HashMap<>(); - for (ResourceConsumption nested : parent.getContributors().values()) { - result.putAll(groupBy((CompositeResourceConsumption) nested, level)); - } - return result; - } - - /** - * Add a collection of {@link ResourceConsumption} values of the same kind (the same consumption name and level). - * - * @return aggregated value of the same type as the input - */ - public static C add(Collection consumptions) { - verifySameKind(consumptions); - - List consumptionList = new ArrayList<>(consumptions); - if (consumptionList.size() == 1) { - return consumptionList.get(0); - } - - ResourceDimension currentUsage = addCurrentConsumptions(consumptionList); - ResourceDimension maxUsage = addMaxConsumptions(consumptionList); - - Map mergedAttrs = mergeAttributesOf(consumptions); - - C first = consumptionList.get(0); - if (first instanceof CompositeResourceConsumption) { - Map mergedContributors = new HashMap<>(); - consumptionList.stream() - .map(CompositeResourceConsumption.class::cast) - .flatMap(c -> c.getContributors().entrySet().stream()) - .forEach(entry -> mergedContributors.compute(entry.getKey(), (name, current) -> - current == null ? entry.getValue() : ResourceConsumptions.add(current, entry.getValue()) - )); - - ResourceDimension allowedUsage = addAllowedConsumptions((Collection) consumptionList); - - return (C) new CompositeResourceConsumption( - first.getConsumerName(), - first.getConsumptionLevel(), - currentUsage, - maxUsage, - allowedUsage, - mergedAttrs, - mergedContributors, - !ResourceDimensions.isBigger(allowedUsage, maxUsage) - ); - } - - return (C) new ResourceConsumption( - first.getConsumerName(), - first.getConsumptionLevel(), - currentUsage, - maxUsage, - mergedAttrs - ); - } - - /** - * See {@link #add(Collection)} for description. - */ - public static ResourceConsumption add(ResourceConsumption... consumptions) { - return add(asList(consumptions)); - } - - /** - * Create a parent {@link CompositeResourceConsumption} instance from the provided consumption collection. - */ - public static CompositeResourceConsumption aggregate(String parentName, - ResourceConsumption.ConsumptionLevel parentLevel, - Collection consumptions, - ResourceDimension allowedUsage) { - - Map contributors = new HashMap<>(); - consumptions.forEach(c -> contributors.put(c.getConsumerName(), c)); - - ResourceDimension currentUsage = addCurrentConsumptions(consumptions); - ResourceDimension maxUsage = addMaxConsumptions(consumptions); - - Map mergedAttrs = mergeAttributesOf(consumptions); - - return new CompositeResourceConsumption( - parentName, - parentLevel, - currentUsage, - maxUsage, - allowedUsage, mergedAttrs, - contributors, - !ResourceDimensions.isBigger(allowedUsage, maxUsage) - ); - } - - public static CompositeResourceConsumption aggregate(String parentName, - ResourceConsumption.ConsumptionLevel parentLevel, - Collection consumptions) { - ResourceDimension allowedUsage = addAllowedConsumptions(consumptions); - return aggregate(parentName, parentLevel, consumptions, allowedUsage); - } - - public static ResourceDimension addCurrentConsumptions(Collection consumptions) { - List all = consumptions.stream().map(ResourceConsumption::getCurrentConsumption).collect(Collectors.toList()); - return ResourceDimensions.add(all); - } - - public static ResourceDimension addMaxConsumptions(Collection consumptions) { - List all = consumptions.stream().map(ResourceConsumption::getMaxConsumption).collect(Collectors.toList()); - return ResourceDimensions.add(all); - } - - public static ResourceDimension addAllowedConsumptions(Collection consumptions) { - List all = consumptions.stream().map(CompositeResourceConsumption::getAllowedConsumption).collect(Collectors.toList()); - return ResourceDimensions.add(all); - } - - public static Map mergeAttributes(Collection> attrCollection) { - Map result = new HashMap<>(); - attrCollection.stream().flatMap(ac -> ac.entrySet().stream()).forEach(entry -> { - Object effectiveValue = entry.getValue(); - if (result.containsKey(entry.getKey()) && effectiveValue instanceof Integer) { - try { - effectiveValue = ((Integer) effectiveValue).intValue() + ((Integer) result.get(entry.getKey())).intValue(); - } catch (Exception ignore) { - // This is best effort only - } - } - result.put(entry.getKey(), effectiveValue); - }); - return result; - } - - public static Map mergeAttributesOf(Collection attrCollection) { - return mergeAttributes(attrCollection.stream().map(ResourceConsumption::getAttributes).collect(Collectors.toList())); - } - - private static void verifySameKind(Collection consumptions) { - Preconditions.checkArgument(!consumptions.isEmpty(), "Empty argument list"); - - Iterator it = consumptions.iterator(); - C ref = it.next(); - - if (!it.hasNext()) { - return; - } - - while (it.hasNext()) { - C second = it.next(); - Preconditions.checkArgument(ref.getConsumerName().equals(second.getConsumerName()), - "Consumer names different %s != %s", ref.getConsumerName(), second.getConsumerName()); - Preconditions.checkArgument(ref.getConsumptionLevel().equals(second.getConsumptionLevel()), - "Consumption levels different %s != %s", ref.getConsumptionLevel(), second.getConsumptionLevel()); - } - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/DefaultResourceConsumptionService.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/DefaultResourceConsumptionService.java deleted file mode 100644 index 4195219287..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/DefaultResourceConsumptionService.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import javax.annotation.PreDestroy; -import javax.inject.Inject; -import javax.inject.Singleton; - -import com.google.common.annotations.VisibleForTesting; -import com.netflix.spectator.api.Registry; -import com.netflix.titus.api.jobmanager.service.V3JobOperations; -import com.netflix.titus.common.util.guice.ProxyType; -import com.netflix.titus.common.util.guice.annotation.Activator; -import com.netflix.titus.common.util.guice.annotation.ProxyConfiguration; -import com.netflix.titus.common.util.rx.ObservableExt; -import com.netflix.titus.master.MetricConstants; -import com.netflix.titus.master.service.management.ApplicationSlaManagementService; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption.ConsumptionLevel; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupAllocationEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupRemovedEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupUndefinedEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.ResourceConsumptionEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionService; -import com.netflix.titus.master.service.management.ResourceConsumptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Scheduler; -import rx.Subscription; -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -import static com.netflix.titus.common.util.CollectionsExt.copyAndRemove; - -/** - * Periodically checks for SLA violations, and a pre-configured interval. - * - * @TODO Improve this implementation by immediately reacting to state changes in the system. - */ -@Singleton -@ProxyConfiguration(types = ProxyType.ActiveGuard) -public class DefaultResourceConsumptionService implements ResourceConsumptionService { - - private static final Logger logger = LoggerFactory.getLogger(DefaultResourceConsumptionService.class); - - private static final String METRIC_CONSUMPTION = MetricConstants.METRIC_CAPACITY_MANAGEMENT + "consumption."; - - static final long UPDATE_INTERVAL_MS = 5000; - - private final Supplier evaluator; - private final Registry registry; - private final Scheduler.Worker worker; - - private ResourceConsumptionServiceMetrics metrics; - private Subscription subscription; - - private final PublishSubject eventsSubject = PublishSubject.create(); - - private volatile ConsumptionEvaluationResult latestEvaluation; - - @Inject - public DefaultResourceConsumptionService(ApplicationSlaManagementService applicationSlaManagementService, - V3JobOperations v3JobOperations, - Registry registry) { - this(ResourceConsumptionEvaluator.newEvaluator(applicationSlaManagementService, v3JobOperations), registry, Schedulers.computation()); - } - - @VisibleForTesting - DefaultResourceConsumptionService(Supplier evaluator, - Registry registry, - Scheduler scheduler) { - this.evaluator = evaluator; - this.registry = registry; - this.worker = scheduler.createWorker(); - } - - @Activator - public Observable enterActiveMode() { - logger.info("Entering active mode"); - this.metrics = new ResourceConsumptionServiceMetrics(registry.createId(METRIC_CONSUMPTION), registry); - this.subscription = worker.schedulePeriodically(this::updateInfo, 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS); - return Observable.empty(); - } - - @PreDestroy - public void shutdown() { - if (subscription != null) { - subscription.unsubscribe(); - } - worker.unsubscribe(); - eventsSubject.onCompleted(); - } - - @Override - public Optional getSystemConsumption() { - return latestEvaluation == null ? Optional.empty() : Optional.of(latestEvaluation.getSystemConsumption()); - } - - @Override - public Observable resourceConsumptionEvents() { - return eventsSubject.compose(ObservableExt.head(() -> { - if (latestEvaluation == null) { - return Collections.emptyList(); - } - - long now = worker.now(); - List allEvents = new ArrayList<>(); - - // Capacity group consumptions - Collection groupConsumptions = ResourceConsumptions.groupBy( - latestEvaluation.getSystemConsumption(), ConsumptionLevel.CapacityGroup - ).values(); - groupConsumptions - .forEach(consumption -> - allEvents.add(new CapacityGroupAllocationEvent(consumption.getConsumerName(), now, (CompositeResourceConsumption) consumption)) - ); - - // Undefined capacity groups - latestEvaluation.getUndefinedCapacityGroups() - .forEach(capacityGroup -> - allEvents.add(new CapacityGroupUndefinedEvent(capacityGroup, now)) - ); - - return allEvents; - })); - } - - private void updateInfo() { - try { - ConsumptionEvaluationResult evaluationResult = evaluator.get(); - metrics.update(evaluationResult); - - ConsumptionEvaluationResult oldEvaluation = latestEvaluation; - this.latestEvaluation = evaluationResult; - - if (eventsSubject.hasObservers()) { - notifyAboutRemovedCapacityGroups(oldEvaluation); - notifyAboutUndefinedCapacityGroups(oldEvaluation); - notifyAboutResourceConsumptionChange(oldEvaluation); - } - } catch (Exception e) { - logger.warn("Resource consumption update failure", e); - } - logger.debug("Resource consumption update finished"); - } - - private void notifyAboutRemovedCapacityGroups(ConsumptionEvaluationResult oldEvaluation) { - if (oldEvaluation != null) { - Set removed = copyAndRemove(oldEvaluation.getDefinedCapacityGroups(), latestEvaluation.getDefinedCapacityGroups()); - removed.forEach(category -> publishEvent(new CapacityGroupRemovedEvent(category, worker.now()))); - } - } - - private void notifyAboutResourceConsumptionChange(ConsumptionEvaluationResult oldEvaluation) { - Map newCapacityGroupConsumptions = ResourceConsumptions.groupBy( - latestEvaluation.getSystemConsumption(), ConsumptionLevel.CapacityGroup - ); - Map oldCapacityGroupConsumptions = oldEvaluation == null - ? Collections.emptyMap() - : ResourceConsumptions.groupBy(oldEvaluation.getSystemConsumption(), ConsumptionLevel.CapacityGroup); - - long now = worker.now(); - newCapacityGroupConsumptions.values().forEach(newConsumption -> { - ResourceConsumption previous = oldCapacityGroupConsumptions.get(newConsumption.getConsumerName()); - if (previous == null || !previous.equals(newConsumption)) { - publishEvent(new CapacityGroupAllocationEvent(newConsumption.getConsumerName(), now, - (CompositeResourceConsumption) newConsumption)); - } - }); - } - - private void notifyAboutUndefinedCapacityGroups(ConsumptionEvaluationResult oldEvaluation) { - long now = worker.now(); - Set toNotify = oldEvaluation == null - ? latestEvaluation.getUndefinedCapacityGroups() - : copyAndRemove(latestEvaluation.getUndefinedCapacityGroups(), oldEvaluation.getUndefinedCapacityGroups()); - toNotify.forEach(capacityGroup -> publishEvent(new CapacityGroupUndefinedEvent(capacityGroup, now))); - } - - private void publishEvent(ResourceConsumptionEvent event) { - eventsSubject.onNext(event); - } - - static class ConsumptionEvaluationResult { - private final Set definedCapacityGroups; - private final Set undefinedCapacityGroups; - private final CompositeResourceConsumption systemConsumption; - - ConsumptionEvaluationResult(Set definedCapacityGroups, - Set undefinedCapacityGroups, - CompositeResourceConsumption systemConsumption) { - this.definedCapacityGroups = definedCapacityGroups; - this.undefinedCapacityGroups = undefinedCapacityGroups; - this.systemConsumption = systemConsumption; - } - - public Set getDefinedCapacityGroups() { - return definedCapacityGroups; - } - - public Set getUndefinedCapacityGroups() { - return undefinedCapacityGroups; - } - - public CompositeResourceConsumption getSystemConsumption() { - return systemConsumption; - } - } -} \ No newline at end of file diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluator.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluator.java deleted file mode 100644 index 26b70ee402..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluator.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; -import com.netflix.titus.api.jobmanager.TaskAttributes; -import com.netflix.titus.api.jobmanager.model.job.ContainerResources; -import com.netflix.titus.api.jobmanager.model.job.Job; -import com.netflix.titus.api.jobmanager.model.job.JobFunctions; -import com.netflix.titus.api.jobmanager.model.job.Task; -import com.netflix.titus.api.jobmanager.model.job.TaskState; -import com.netflix.titus.api.jobmanager.model.job.ext.BatchJobExt; -import com.netflix.titus.api.jobmanager.model.job.ext.ServiceJobExt; -import com.netflix.titus.api.jobmanager.service.V3JobOperations; -import com.netflix.titus.api.model.ApplicationSLA; -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.api.model.Tier; -import com.netflix.titus.common.util.CollectionsExt; -import com.netflix.titus.common.util.Evaluators; -import com.netflix.titus.common.util.tuple.Pair; -import com.netflix.titus.master.model.ResourceDimensions; -import com.netflix.titus.master.service.management.ApplicationSlaManagementService; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption.ConsumptionLevel; -import com.netflix.titus.master.service.management.ResourceConsumptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.netflix.titus.common.util.CollectionsExt.copyAndRemove; -import static com.netflix.titus.master.service.management.ApplicationSlaManagementService.DEFAULT_APPLICATION; -import static com.netflix.titus.master.service.management.ResourceConsumption.SYSTEM_CONSUMER; -import static java.util.stream.Collectors.groupingBy; - -/** - * Computes current resource consumption. - */ -class ResourceConsumptionEvaluator { - - private static final Logger logger = LoggerFactory.getLogger(ResourceConsumptionEvaluator.class); - - private final V3JobOperations v3JobOperations; - private final Set definedCapacityGroups; - private final Map applicationSlaMap; - - private CompositeResourceConsumption systemConsumption; - private Set undefinedCapacityGroups; - - ResourceConsumptionEvaluator(ApplicationSlaManagementService applicationSlaManagementService, - V3JobOperations v3JobOperations) { - this.v3JobOperations = v3JobOperations; - Collection applicationSLAs = applicationSlaManagementService.getApplicationSLAs(); - this.definedCapacityGroups = applicationSLAs.stream().map(ApplicationSLA::getAppName).collect(Collectors.toSet()); - this.applicationSlaMap = applicationSLAs.stream().collect(Collectors.toMap(ApplicationSLA::getAppName, Function.identity())); - - Pair>, Set> allocationsByCapacityGroupPair = computeAllocationsByCapacityGroupAndAppName(); - this.systemConsumption = buildSystemConsumption(allocationsByCapacityGroupPair.getLeft()); - this.undefinedCapacityGroups = allocationsByCapacityGroupPair.getRight(); - } - - Set getDefinedCapacityGroups() { - return definedCapacityGroups; - } - - Set getUndefinedCapacityGroups() { - return undefinedCapacityGroups; - } - - public CompositeResourceConsumption getSystemConsumption() { - return systemConsumption; - } - - private CompositeResourceConsumption buildSystemConsumption(Map> capacityGroupConsumptionMap) { - // Capacity group level - Map> tierConsumptions = new HashMap<>(); - capacityGroupConsumptionMap.forEach((capacityGroup, appConsumptions) -> { - - ApplicationSLA sla = applicationSlaMap.get(capacityGroup); - ResourceDimension allowedConsumption = ResourceDimensions.multiply(sla.getResourceDimension(), sla.getInstanceCount()); - ResourceDimension maxConsumption = ResourceConsumptions.addMaxConsumptions(appConsumptions.values()); - - List> attrsList = appConsumptions.values().stream().map(ResourceConsumption::getAttributes).collect(Collectors.toList()); - CompositeResourceConsumption capacityGroupConsumption = new CompositeResourceConsumption( - capacityGroup, - ConsumptionLevel.CapacityGroup, - ResourceConsumptions.addCurrentConsumptions(appConsumptions.values()), - maxConsumption, - allowedConsumption, - ResourceConsumptions.mergeAttributes(attrsList), - appConsumptions, - !ResourceDimensions.isBigger(allowedConsumption, maxConsumption) - ); - tierConsumptions.computeIfAbsent(sla.getTier(), t -> new ArrayList<>()).add(capacityGroupConsumption); - }); - - // Tier level - List aggregatedTierConsumptions = new ArrayList<>(); - tierConsumptions.forEach((tier, consumptions) -> - aggregatedTierConsumptions.add(ResourceConsumptions.aggregate(tier.name(), ConsumptionLevel.Tier, consumptions)) - ); - - // System level - return ResourceConsumptions.aggregate(SYSTEM_CONSUMER, ConsumptionLevel.System, aggregatedTierConsumptions); - } - - /** - * @return capacityGroups -> apps -> instanceTypes -> consumption - */ - private Pair>, Set> computeAllocationsByCapacityGroupAndAppName() { - Map> consumptionMap = new HashMap<>(); - Set undefinedCapacityGroups = new HashSet<>(); - - v3JobOperations.getJobsAndTasks().forEach(jobsAndTasks -> { - Job job = jobsAndTasks.getLeft(); - List tasks = jobsAndTasks.getRight(); - List runningTasks = getRunningWorkers(tasks); - - ResourceDimension taskResources = perTaskResourceDimension(job); - String appName = Evaluators.getOrDefault(job.getJobDescriptor().getApplicationName(), DEFAULT_APPLICATION); - ResourceDimension currentConsumption = ResourceDimensions.multiply(taskResources, runningTasks.size()); - ResourceDimension maxConsumption = ResourceDimensions.multiply(taskResources, getMaxJobSize(job)); - - Map> tasksByInstanceType = tasks.stream().collect( - groupingBy(task -> task.getTaskContext() - .getOrDefault(TaskAttributes.TASK_ATTRIBUTES_AGENT_ITYPE, "unknown")) - ); - Map consumptionByInstanceType = CollectionsExt.mapValuesWithKeys( - tasksByInstanceType, - (instanceType, instanceTypeTasks) -> { - List runningInstanceTypeTasks = getRunningWorkers(instanceTypeTasks); - ResourceDimension instanceTypeConsumption = ResourceDimensions.multiply( - taskResources, - runningInstanceTypeTasks.size() - ); - return new ResourceConsumption( - instanceType, - ConsumptionLevel.InstanceType, - instanceTypeConsumption, - instanceTypeConsumption, // maxConsumption is not relevant at ConsumptionLevel.InstanceType - getWorkerStateMap(instanceTypeTasks) - ); - }, - HashMap::new - ); - ResourceConsumption jobConsumption = new CompositeResourceConsumption( - appName, - ConsumptionLevel.Application, - currentConsumption, - maxConsumption, - maxConsumption, // allowedConsumption is not relevant at ConsumptionLevel.Application - getWorkerStateMap(tasks), - consumptionByInstanceType, - false // we consider a job is always within its allowed usage since it can't go over its max - ); - - String capacityGroup = resolveCapacityGroup(undefinedCapacityGroups, job, appName); - updateConsumptionMap(appName, capacityGroup, jobConsumption, consumptionMap); - }); - - // Add unused capacity groups - copyAndRemove(definedCapacityGroups, consumptionMap.keySet()).forEach(capacityGroup -> - consumptionMap.put(capacityGroup, Collections.emptyMap()) - ); - - return Pair.of(consumptionMap, undefinedCapacityGroups); - } - - private void updateConsumptionMap(String applicationName, - String capacityGroup, - ResourceConsumption jobConsumption, - Map> consumptionMap) { - - Map capacityGroupAllocation = consumptionMap.computeIfAbsent(capacityGroup, k -> new HashMap<>()); - - String effectiveAppName = applicationName == null ? DEFAULT_APPLICATION : applicationName; - ResourceConsumption appAllocation = capacityGroupAllocation.get(effectiveAppName); - - if (appAllocation == null) { - capacityGroupAllocation.put(effectiveAppName, jobConsumption); - } else { - capacityGroupAllocation.put(effectiveAppName, ResourceConsumptions.add(appAllocation, jobConsumption)); - } - } - - private int getMaxJobSize(Job job) { - return JobFunctions.isServiceJob(job) - ? ((Job) job).getJobDescriptor().getExtensions().getCapacity().getMax() - : ((Job) job).getJobDescriptor().getExtensions().getSize(); - } - - private Map getWorkerStateMap(List tasks) { - Map tasksStates = newTaskStateMap(); - tasks.stream().map(task -> task.getStatus().getState()).forEach(taskState -> - tasksStates.put(taskState.name(), (int) tasksStates.get(taskState.name()) + 1) - ); - return tasksStates; - } - - private Map newTaskStateMap() { - Map tasksStates = new HashMap<>(); - for (TaskState state : TaskState.values()) { - tasksStates.put(state.name(), 0); - } - return tasksStates; - } - - private String resolveCapacityGroup(Set undefinedCapacityGroups, Job job, String appName) { - String capacityGroup = job.getJobDescriptor().getCapacityGroup(); - if (capacityGroup == null) { - if (appName != null && definedCapacityGroups.contains(appName)) { - capacityGroup = appName; - } - } - if (capacityGroup == null) { - capacityGroup = DEFAULT_APPLICATION; - } else if (!definedCapacityGroups.contains(capacityGroup)) { - undefinedCapacityGroups.add(capacityGroup); - capacityGroup = DEFAULT_APPLICATION; - } - return capacityGroup; - } - - static Supplier newEvaluator(ApplicationSlaManagementService applicationSlaManagementService, - V3JobOperations v3JobOperations) { - return () -> { - ResourceConsumptionEvaluator evaluator = new ResourceConsumptionEvaluator(applicationSlaManagementService, v3JobOperations); - return new DefaultResourceConsumptionService.ConsumptionEvaluationResult( - evaluator.getDefinedCapacityGroups(), - evaluator.getUndefinedCapacityGroups(), - evaluator.getSystemConsumption() - ); - }; - } - - /** - * @return resource dimensions per task as defined in the job descriptor. - */ - @VisibleForTesting - static ResourceDimension perTaskResourceDimension(Job job) { - ContainerResources containerResources = job.getJobDescriptor().getContainer().getContainerResources(); - - return new ResourceDimension( - containerResources.getCpu(), - containerResources.getGpu(), - containerResources.getMemoryMB(), - containerResources.getDiskMB(), - containerResources.getNetworkMbps(), - 0); - } - - private List getRunningWorkers(List tasks) { - return tasks.stream().filter(t -> TaskState.isRunning(t.getStatus().getState())).collect(Collectors.toList()); - } -} \ No newline at end of file diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLog.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLog.java deleted file mode 100644 index e854c1490d..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLog.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import javax.annotation.PreDestroy; -import javax.inject.Inject; -import javax.inject.Singleton; - -import com.netflix.titus.common.util.guice.annotation.Activator; -import com.netflix.titus.master.model.ResourceDimensions; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupAllocationEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupRemovedEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupUndefinedEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Subscription; - -/** - * A resource consumption state change event log. - */ -@Singleton -public class ResourceConsumptionLog { - - private static final Logger logger = LoggerFactory.getLogger(ResourceConsumptionLog.class); - - private final ResourceConsumptionService resourceConsumptionService; - private Subscription subscription; - - @Inject - public ResourceConsumptionLog(ResourceConsumptionService resourceConsumptionService) { - this.resourceConsumptionService = resourceConsumptionService; - } - - @Activator - public Observable enterActiveMode() { - logger.info("Activating resource consumption logging..."); - this.subscription = resourceConsumptionService.resourceConsumptionEvents().subscribe( - ResourceConsumptionLog::doLog, - e -> logger.error("Resource consumption log terminated with an error", e), - () -> logger.warn("Resource consumption log completed") - ); - return Observable.empty(); - } - - @PreDestroy - public void shutdown() { - if (subscription != null) { - subscription.unsubscribe(); - } - } - - /* Visible for testing */ - static String doLog(ResourceConsumptionEvents.ResourceConsumptionEvent event) { - if (event instanceof CapacityGroupAllocationEvent) { - CapacityGroupAllocationEvent changeEvent = (CapacityGroupAllocationEvent) event; - CompositeResourceConsumption consumption = changeEvent.getCapacityGroupConsumption(); - - StringBuilder sb = new StringBuilder("Resource consumption change: group="); - sb.append(consumption.getConsumerName()); - - if (consumption.isAboveLimit()) { - sb.append(" [above limit] "); - } else { - sb.append(" [below limit] "); - } - - sb.append("actual="); - ResourceDimensions.format(consumption.getCurrentConsumption(), sb); - sb.append(", max="); - ResourceDimensions.format(consumption.getMaxConsumption(), sb); - sb.append(", limit="); - ResourceDimensions.format(consumption.getAllowedConsumption(), sb); - - if (consumption.getAttributes().isEmpty()) { - sb.append(", attrs={}"); - } else { - sb.append(", attrs={"); - consumption.getAttributes().forEach((k, v) -> sb.append(k).append('=').append(v).append(',')); - sb.setCharAt(sb.length() - 1, '}'); - } - - String message = sb.toString(); - if (consumption.isAboveLimit()) { - logger.warn(message); - } else { - logger.info(message); - } - return message; - } - - if (event instanceof CapacityGroupUndefinedEvent) { - String message = "Capacity group not defined: group=" + event.getCapacityGroup(); - logger.warn(message); - return message; - } - - if (event instanceof CapacityGroupRemovedEvent) { - String message = "Capacity group no longer defined: group=" + event.getCapacityGroup(); - logger.info(message); - return message; - } - - String message = "Unrecognized resource consumption event type " + event.getClass(); - logger.error(message); - return message; - } -} diff --git a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionServiceMetrics.java b/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionServiceMetrics.java deleted file mode 100644 index a6ec96c636..0000000000 --- a/titus-server-master/src/main/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionServiceMetrics.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.base.Preconditions; -import com.netflix.spectator.api.Id; -import com.netflix.spectator.api.Registry; -import com.netflix.spectator.api.patterns.PolledMeter; -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.common.util.tuple.Pair; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption.ConsumptionLevel; - -import static com.netflix.titus.common.util.CollectionsExt.copyAndRemove; - -class ResourceConsumptionServiceMetrics { - - private final Id rootId; - private final Registry registry; - - private final Map, ApplicationMetrics> metricsByCapacityGroupAndApp = new HashMap<>(); - private final Map capacityGroupLimits = new HashMap<>(); - - private volatile long updateTimestamp; - - ResourceConsumptionServiceMetrics(Id rootId, Registry registry) { - this.rootId = rootId; - this.registry = registry; - this.updateTimestamp = registry.clock().wallTime(); - - registry.gauge( - registry.createId(rootId.name() + "updateDelay", rootId.tags()), - this, - self -> (registry.clock().wallTime() - updateTimestamp) - ); - } - - public void update(DefaultResourceConsumptionService.ConsumptionEvaluationResult evaluationResult) { - CompositeResourceConsumption systemConsumption = evaluationResult.getSystemConsumption(); - - Set> touchedApps = new HashSet<>(); - Set touchedGroups = new HashSet<>(); - - systemConsumption.getContributors().values().forEach(tierConsumption -> - updateTier((CompositeResourceConsumption) tierConsumption, touchedApps, touchedGroups) - ); - - // Remove no longer referenced items - copyAndRemove(metricsByCapacityGroupAndApp.keySet(), touchedApps).forEach(removed -> - metricsByCapacityGroupAndApp.remove(removed).reset() - ); - copyAndRemove(capacityGroupLimits.keySet(), touchedGroups).forEach(removed -> - capacityGroupLimits.remove(removed).reset() - ); - - updateTimestamp = registry.clock().wallTime(); - } - - private void updateTier(CompositeResourceConsumption tierConsumption, - Set> touchedApps, - Set touchedGroups) { - String tierName = tierConsumption.getConsumerName(); - Set capacityGroupNames = tierConsumption.getContributors().keySet(); - Collection capacityGroupConsumption = tierConsumption.getContributors().values(); - - // Process application level metrics - capacityGroupConsumption - .forEach(groupConsumption -> { - ((CompositeResourceConsumption) groupConsumption).getContributors().values() - .forEach(appConsumption -> { - touchedApps.add(updateApp(tierName, groupConsumption, appConsumption)); - }); - } - ); - - // Process capacity group level metrics - capacityGroupConsumption.forEach((groupConsumption) -> updateCapacityGroupLimit(tierName, (CompositeResourceConsumption) groupConsumption)); - touchedGroups.addAll(capacityGroupNames); - - } - - private Pair updateApp(String tierName, ResourceConsumption groupConsumption, ResourceConsumption appConsumption) { - Pair key = Pair.of(groupConsumption.getConsumerName(), appConsumption.getConsumerName()); - ApplicationMetrics metrics = metricsByCapacityGroupAndApp.get(key); - if (metrics == null) { - metrics = new ApplicationMetrics(tierName, groupConsumption.getConsumerName(), appConsumption.getConsumerName()); - metricsByCapacityGroupAndApp.put(key, metrics); - } - metrics.update(appConsumption); - - return key; - } - - private void updateCapacityGroupLimit(String tierName, CompositeResourceConsumption groupConsumption) { - String name = groupConsumption.getConsumerName(); - ResourceMetrics metrics = capacityGroupLimits.get(name); - if (metrics == null) { - metrics = new ResourceMetrics(registry.createId(rootId.name() + "limit", rootId.tags()) - .withTag("tier", tierName) - .withTag("capacityGroup", name)); - capacityGroupLimits.put(name, metrics); - } - metrics.update(groupConsumption.getAllowedConsumption()); - } - - private enum ResourceType {Cpu, Memory, Disk, Network, Gpu, OpportunisticCpu} - - private class ResourceMetrics { - private final Map usage; - - private ResourceMetrics(Id id) { - this.usage = initialize(id); - } - - private Map initialize(Id id) { - Map result = new EnumMap<>(ResourceType.class); - for (ResourceType rt : ResourceType.values()) { - result.put(rt, PolledMeter.using(registry) - .withId(id.withTag("resourceType", rt.name())) - .monitorValue(new AtomicLong())); - } - return result; - } - - private void update(ResourceDimension consumption) { - usage.get(ResourceType.Cpu).set((int) consumption.getCpu()); - usage.get(ResourceType.Memory).set(consumption.getMemoryMB()); - usage.get(ResourceType.Disk).set(consumption.getDiskMB()); - usage.get(ResourceType.Network).set(consumption.getNetworkMbs()); - usage.get(ResourceType.Gpu).set(consumption.getGpu()); - usage.get(ResourceType.OpportunisticCpu).set(consumption.getOpportunisticCpu()); - } - - private void reset() { - usage.values().forEach(g -> g.set(0)); - } - } - - private class ApplicationMetrics { - private final String tierName; - private final String capacityGroup; - private final String appName; - private final ResourceMetrics maxUsage; - private final Map actualUsageByInstanceType = new HashMap<>(); - - private ApplicationMetrics(String tierName, String capacityGroup, String appName) { - this.tierName = tierName; - this.capacityGroup = capacityGroup; - this.appName = appName; - this.maxUsage = new ResourceMetrics( - registry.createId(rootId.name() + "maxUsage", rootId.tags()) - .withTag("tier", tierName) - .withTag("capacityGroup", capacityGroup) - .withTag("applicationName", appName) - ); - } - - private ResourceMetrics buildInstanceTypeMetrics(String instanceType) { - return new ResourceMetrics( - registry.createId(rootId.name() + "actualUsage", rootId.tags()) - .withTag("tier", tierName) - .withTag("capacityGroup", capacityGroup) - .withTag("applicationName", appName) - .withTag("instanceType", instanceType) - ); - } - - private void update(ResourceConsumption appConsumption) { - Preconditions.checkArgument(ConsumptionLevel.Application.equals(appConsumption.getConsumptionLevel())); - maxUsage.update(appConsumption.getMaxConsumption()); - - Set instanceTypesInUse = new HashSet<>(); - Map consumptionByInstanceType = ((CompositeResourceConsumption) appConsumption).getContributors(); - consumptionByInstanceType.forEach((instanceType, consumption) -> { - instanceTypesInUse.add(instanceType); - actualUsageByInstanceType.computeIfAbsent(instanceType, this::buildInstanceTypeMetrics) - .update(consumption.getCurrentConsumption()); - }); - // clean up instance types not being used anymore, create a copy to avoid ConcurrentModificationException - List trackedInstanceTypes = new ArrayList<>(actualUsageByInstanceType.keySet()); - trackedInstanceTypes.stream() - .filter(i -> !instanceTypesInUse.contains(i)) - .forEach(toRemove -> actualUsageByInstanceType.remove(toRemove).reset()); - } - - private void reset() { - maxUsage.reset(); - actualUsageByInstanceType.values().forEach(ResourceMetrics::reset); - actualUsageByInstanceType.clear(); - } - } -} diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ConsumptionModelGenerator.java b/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ConsumptionModelGenerator.java deleted file mode 100644 index 4aa8c1737e..0000000000 --- a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ConsumptionModelGenerator.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import com.google.common.base.Preconditions; -import com.netflix.titus.api.model.ApplicationSLA; -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.api.model.Tier; -import com.netflix.titus.common.util.CollectionsExt; -import com.netflix.titus.master.model.ResourceDimensions; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption.ConsumptionLevel; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.ResourceConsumptionEvent; -import com.netflix.titus.master.service.management.ResourceConsumptions; -import com.netflix.titus.testkit.data.core.ApplicationSlaSample; - -/** - * A helper class to generate test data for capacity groups and resource consumption. - */ -class ConsumptionModelGenerator { - - static final ApplicationSLA CRITICAL_SLA_1 = ApplicationSlaSample.CriticalSmall.build(); - static final ApplicationSLA NOT_USED_SLA = ApplicationSlaSample.CriticalLarge.build(); - static final ApplicationSLA DEFAULT_SLA = ApplicationSlaSample.DefaultFlex.build(); - - private final Map capacityGroupMap = new HashMap<>(); - - private final Map> actualConsumptionByGroupAndApp = new HashMap<>(); - private final Map> maxConsumptionByApp = new HashMap<>(); - - public ConsumptionModelGenerator() { - capacityGroupMap.put(CRITICAL_SLA_1.getAppName(), CRITICAL_SLA_1); - capacityGroupMap.put(NOT_USED_SLA.getAppName(), NOT_USED_SLA); - capacityGroupMap.put(DEFAULT_SLA.getAppName(), DEFAULT_SLA); - } - - void addConsumption(String capacityGroup, String appName, ResourceDimension actualConsumption, ResourceDimension maxConsumption) { - addConsumption(capacityGroup, appName, actualConsumption, actualConsumptionByGroupAndApp); - addConsumption(capacityGroup, appName, maxConsumption, maxConsumptionByApp); - } - - private static void addConsumption(String capacityGroup, String appName, ResourceDimension consumption, Map> output) { - Map appsMap = output.computeIfAbsent(capacityGroup, k -> new HashMap<>()); - if (appsMap.get(appName) == null) { - appsMap.put(appName, consumption); - } else { - appsMap.put( - appName, - ResourceDimensions.add(appsMap.get(appName), consumption) - ); - } - } - - Map getCapacityGroupMap() { - return new HashMap<>(capacityGroupMap); - } - - Set getDefinedCapacityGroupNames() { - return capacityGroupMap.keySet(); - } - - void removeCapacityGroup(String capacityGroup) { - Preconditions.checkState(capacityGroupMap.containsKey(capacityGroup)); - capacityGroupMap.remove(capacityGroup); - } - - DefaultResourceConsumptionService.ConsumptionEvaluationResult getEvaluation() { - Map groupConsumptionMap = new HashMap<>(); - - Set definedCapacityGroups = new HashSet<>(capacityGroupMap.keySet()); - - // Used capacity groups - - Set capacityGroupNames = actualConsumptionByGroupAndApp.keySet(); - for (String capacityGroupName : capacityGroupNames) { - List appConsumptions = buildApplicationConsumptions(capacityGroupName); - - CompositeResourceConsumption groupConsumption = ResourceConsumptions.aggregate( - capacityGroupName, - ConsumptionLevel.CapacityGroup, - appConsumptions, - capacityGroupLimit(capacityGroupMap.getOrDefault(capacityGroupName, DEFAULT_SLA)) - ); - - groupConsumptionMap.put(capacityGroupName, groupConsumption); - } - - // Unused capacity groups - CollectionsExt.copyAndRemove(definedCapacityGroups, capacityGroupNames).forEach(capacityGroup -> { - ApplicationSLA sla = capacityGroupMap.getOrDefault(capacityGroup, DEFAULT_SLA); - ResourceDimension limit = capacityGroupLimit(sla); - groupConsumptionMap.put(capacityGroup, new CompositeResourceConsumption( - capacityGroup, - ConsumptionLevel.CapacityGroup, - ResourceDimension.empty(), - ResourceDimension.empty(), - limit, - Collections.emptyMap(), - Collections.emptyMap(), - false - )); - }); - - // Undefined capacity groups - Set undefinedCapacityGroups = CollectionsExt.copyAndRemove(capacityGroupNames, definedCapacityGroups); - - // Tier consumption - Map> tierCapacityGroups = groupConsumptionMap.values().stream() - .collect(Collectors.groupingBy(rc -> { - ApplicationSLA sla = capacityGroupMap.get(rc.getConsumerName()); - if (sla == null) { - sla = capacityGroupMap.get(DEFAULT_SLA.getAppName()); - } - return sla.getTier(); - })); - Map tierConsumptions = new HashMap<>(); - tierCapacityGroups.forEach((tier, consumptions) -> - tierConsumptions.put( - tier.name(), - ResourceConsumptions.aggregate( - tier.name(), - ConsumptionLevel.Tier, - consumptions - ) - )); - - // System consumption - CompositeResourceConsumption systemConsumption = ResourceConsumptions.aggregate( - ResourceConsumption.SYSTEM_CONSUMER, - ConsumptionLevel.System, - tierConsumptions.values() - ); - - return new DefaultResourceConsumptionService.ConsumptionEvaluationResult( - definedCapacityGroups, - undefinedCapacityGroups, - systemConsumption - ); - } - - private List buildApplicationConsumptions(String capacityGroupName) { - Map actual = actualConsumptionByGroupAndApp.get(capacityGroupName); - Map max = maxConsumptionByApp.get(capacityGroupName); - - List appConsumptions = new ArrayList<>(); - for (String appName : actual.keySet()) { - ResourceConsumption byInstanceType = new ResourceConsumption( - appName, - ConsumptionLevel.InstanceType, - actual.get(appName), - max.get(appName), - Collections.emptyMap() - ); - appConsumptions.add( - new CompositeResourceConsumption( - appName, - ConsumptionLevel.Application, - actual.get(appName), - max.get(appName), - max.get(appName), - Collections.emptyMap(), - Collections.singletonMap("itype.test", byInstanceType), - !ResourceDimensions.isBigger(max.get(appName), actual.get(appName)) - ) - ); - } - - return appConsumptions; - } - - static ResourceDimension capacityGroupLimit(ApplicationSLA sla) { - return ResourceDimensions.multiply(sla.getResourceDimension(), sla.getInstanceCount()); - } - - static Optional findEvent(List events, Class eventClass, String capacityGroup) { - for (ResourceConsumptionEvent event : events) { - if (eventClass.isAssignableFrom(event.getClass()) && event.getCapacityGroup().equals(capacityGroup)) { - return Optional.of((E) event); - } - } - return Optional.empty(); - } -} diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluatorTest.java b/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluatorTest.java deleted file mode 100644 index db8742c981..0000000000 --- a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionEvaluatorTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; - -import com.netflix.titus.api.jobmanager.model.job.ContainerResources; -import com.netflix.titus.api.jobmanager.model.job.Job; -import com.netflix.titus.api.jobmanager.model.job.JobDescriptor; -import com.netflix.titus.api.jobmanager.model.job.Task; -import com.netflix.titus.api.jobmanager.model.job.TaskState; -import com.netflix.titus.api.jobmanager.model.job.ext.BatchJobExt; -import com.netflix.titus.api.jobmanager.service.V3JobOperations; -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.api.model.Tier; -import com.netflix.titus.common.runtime.TitusRuntime; -import com.netflix.titus.common.runtime.TitusRuntimes; -import com.netflix.titus.common.util.tuple.Pair; -import com.netflix.titus.master.model.ResourceDimensions; -import com.netflix.titus.master.service.management.ApplicationSlaManagementService; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.testkit.model.job.JobComponentStub; -import com.netflix.titus.testkit.model.job.JobDescriptorGenerator; -import org.junit.Before; -import org.junit.Test; - -import static com.netflix.titus.master.service.management.ResourceConsumptions.findConsumption; -import static com.netflix.titus.master.service.management.internal.ResourceConsumptionEvaluator.perTaskResourceDimension; -import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ResourceConsumptionEvaluatorTest { - - private static final ContainerResources CONTAINER_RESOURCES = ContainerResources.newBuilder() - .withCpu(1) - .withMemoryMB(1024) - .withDiskMB(512) - .withNetworkMbps(128) - .build(); - - private final TitusRuntime titusRuntime = TitusRuntimes.test(); - - private final ApplicationSlaManagementService applicationSlaManagementService = mock(ApplicationSlaManagementService.class); - - private final V3JobOperations v3JobOperations = mock(V3JobOperations.class); - - private final JobComponentStub jobComponentStub = new JobComponentStub(titusRuntime); - - private final V3JobOperations jobOperations = jobComponentStub.getJobOperations(); - - @Before - public void setUp() throws Exception { - when(v3JobOperations.getJobsAndTasks()).then(invocation -> jobOperations.getJobsAndTasks()); - } - - @Test - public void testEvaluation() { - when(applicationSlaManagementService.getApplicationSLAs()).thenReturn(asList(ConsumptionModelGenerator.DEFAULT_SLA, ConsumptionModelGenerator.CRITICAL_SLA_1, ConsumptionModelGenerator.NOT_USED_SLA)); - - // Job with defined capacity group SLA - Pair> goodCapacity = newServiceJob( - "goodCapacityJob", - jd -> jd.toBuilder().withCapacityGroup(ConsumptionModelGenerator.CRITICAL_SLA_1.getAppName()).build() - ); - Job goodCapacityJob = goodCapacity.getLeft(); - - // Job without appName defined - Pair> noAppName = newServiceJob( - "badCapacityJob", - jd -> jd.toBuilder() - .withApplicationName("") - .withCapacityGroup(ConsumptionModelGenerator.DEFAULT_SLA.getAppName()) - .build() - ); - Job noAppNameJob = noAppName.getLeft(); - - // Job with capacity group for which SLA is not defined - Pair> badCapacity = newServiceJob( - "goodCapacityJob", - jd -> jd.toBuilder().withCapacityGroup("missingCapacityGroup").build() - ); - Job badCapacityJob = badCapacity.getLeft(); - - // Evaluate - ResourceConsumptionEvaluator evaluator = new ResourceConsumptionEvaluator(applicationSlaManagementService, v3JobOperations); - - Set undefined = evaluator.getUndefinedCapacityGroups(); - assertThat(undefined).contains("missingCapacityGroup"); - - CompositeResourceConsumption systemConsumption = evaluator.getSystemConsumption(); - Map tierConsumptions = systemConsumption.getContributors(); - assertThat(tierConsumptions).containsKeys(Tier.Critical.name(), Tier.Flex.name()); - - // Critical capacity group - CompositeResourceConsumption criticalConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Critical.name(), ConsumptionModelGenerator.CRITICAL_SLA_1.getAppName() - ).get(); - assertThat(criticalConsumption.getCurrentConsumption()).isEqualTo(perTaskResourceDimension(goodCapacityJob)); // We have single worker in Started state - - assertThat(criticalConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.CRITICAL_SLA_1)); - assertThat(criticalConsumption.isAboveLimit()).isTrue(); - - // Default capacity group - CompositeResourceConsumption defaultConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Flex.name(), ConsumptionModelGenerator.DEFAULT_SLA.getAppName() - ).get(); - assertThat(defaultConsumption.getCurrentConsumption()).isEqualTo(ResourceDimensions.add( - perTaskResourceDimension(noAppNameJob), - perTaskResourceDimension(badCapacityJob) - )); - - assertThat(defaultConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.DEFAULT_SLA)); - assertThat(defaultConsumption.isAboveLimit()).isFalse(); - - // Not used capacity group - CompositeResourceConsumption notUsedConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Critical.name(), ConsumptionModelGenerator.NOT_USED_SLA.getAppName() - ).get(); - assertThat(notUsedConsumption.getCurrentConsumption()).isEqualTo(ResourceDimension.empty()); - assertThat(notUsedConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.NOT_USED_SLA)); - assertThat(notUsedConsumption.isAboveLimit()).isFalse(); - } - - @SuppressWarnings("unchecked") - @Test - public void batchJobWithMultipleTasks() { - when(applicationSlaManagementService.getApplicationSLAs()).thenReturn(asList(ConsumptionModelGenerator.DEFAULT_SLA, ConsumptionModelGenerator.CRITICAL_SLA_1, ConsumptionModelGenerator.NOT_USED_SLA)); - - // Job with defined capacity group SLA - Job goodCapacityJob = newBatchJob( - "goodCapacityJob", - jd -> jd.toBuilder() - .withExtensions(jd.getExtensions().toBuilder().withSize(2).build()) - .withCapacityGroup(ConsumptionModelGenerator.CRITICAL_SLA_1.getAppName()) - .build() - ).getLeft(); - List goodCapacityTasks = jobComponentStub.getJobOperations().getTasks(goodCapacityJob.getId()); - - // Job without appName defined - Job noAppNameJob = newBatchJob( - "badCapacityJob", - jd -> jd.toBuilder() - .withApplicationName("") - .withExtensions(jd.getExtensions().toBuilder().withSize(2).build()) - .withCapacityGroup(ConsumptionModelGenerator.DEFAULT_SLA.getAppName()) - .build() - ).getLeft(); - List noAppNameTasks = jobComponentStub.getJobOperations().getTasks(noAppNameJob.getId()); - - // Job with capacity group for which SLA is not defined - Job badCapacityJob = newBatchJob( - "badCapacityJob", - jd -> jd.toBuilder() - .withExtensions(jd.getExtensions().toBuilder().withSize(2).build()) - .withCapacityGroup("missingCapacityGroup") - .build() - ).getLeft(); - List badCapacityTasks = jobComponentStub.getJobOperations().getTasks(badCapacityJob.getId()); - - // Evaluate - ResourceConsumptionEvaluator evaluator = new ResourceConsumptionEvaluator(applicationSlaManagementService, v3JobOperations); - - Set undefined = evaluator.getUndefinedCapacityGroups(); - assertThat(undefined).contains("missingCapacityGroup"); - - CompositeResourceConsumption systemConsumption = evaluator.getSystemConsumption(); - Map tierConsumptions = systemConsumption.getContributors(); - assertThat(tierConsumptions).containsKeys(Tier.Critical.name(), Tier.Flex.name()); - - // Critical capacity group - CompositeResourceConsumption criticalConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Critical.name(), ConsumptionModelGenerator.CRITICAL_SLA_1.getAppName() - ).get(); - assertThat(criticalConsumption.getCurrentConsumption()).isEqualTo(expectedCurrentConsumptionForBatchJob(goodCapacityJob, goodCapacityTasks)); - assertThat(criticalConsumption.getMaxConsumption()).isEqualTo(expectedMaxConsumptionForBatchJob(goodCapacityJob)); - assertThat(criticalConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.CRITICAL_SLA_1)); - assertThat(criticalConsumption.isAboveLimit()).isTrue(); - - // Default capacity group - CompositeResourceConsumption defaultConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Flex.name(), ConsumptionModelGenerator.DEFAULT_SLA.getAppName() - ).get(); - assertThat(defaultConsumption.getCurrentConsumption()).isEqualTo(ResourceDimensions.add( - expectedCurrentConsumptionForBatchJob(noAppNameJob, noAppNameTasks), - expectedCurrentConsumptionForBatchJob(badCapacityJob, badCapacityTasks) - )); - assertThat(defaultConsumption.getMaxConsumption()).isEqualTo(ResourceDimensions.add( - expectedMaxConsumptionForBatchJob(noAppNameJob), - expectedMaxConsumptionForBatchJob(badCapacityJob) - )); - assertThat(defaultConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.DEFAULT_SLA)); - assertThat(defaultConsumption.isAboveLimit()).isFalse(); - - // Not used capacity group - CompositeResourceConsumption notUsedConsumption = (CompositeResourceConsumption) findConsumption( - systemConsumption, Tier.Critical.name(), ConsumptionModelGenerator.NOT_USED_SLA.getAppName() - ).get(); - assertThat(notUsedConsumption.getCurrentConsumption()).isEqualTo(ResourceDimension.empty()); - assertThat(notUsedConsumption.getAllowedConsumption()).isEqualTo(ConsumptionModelGenerator.capacityGroupLimit(ConsumptionModelGenerator.NOT_USED_SLA)); - assertThat(notUsedConsumption.isAboveLimit()).isFalse(); - } - - private Pair> newServiceJob(String name, Function transformer) { - jobComponentStub.addJobTemplate(name, JobDescriptorGenerator.serviceJobDescriptors() - .map(jd -> jd.but(self -> self.getContainer().but(c -> CONTAINER_RESOURCES))) - .map(transformer::apply) - ); - return jobComponentStub.createJobAndTasks( - name, - (job, tasks) -> jobComponentStub.moveTaskToState(tasks.get(0), TaskState.Started) - ); - } - - private Pair> newBatchJob(String name, Function, JobDescriptor> transformer) { - jobComponentStub.addJobTemplate(name, JobDescriptorGenerator.batchJobDescriptors() - .map(jd -> jd.but(self -> self.getContainer().but(c -> CONTAINER_RESOURCES))) - .map(transformer::apply) - ); - return jobComponentStub.createJobAndTasks( - name, - (job, tasks) -> tasks.forEach(task -> jobComponentStub.moveTaskToState(task, TaskState.Started)) - ); - } - - private static ResourceDimension expectedCurrentConsumptionForBatchJob(Job job, List tasks) { - return ResourceDimensions.multiply(perTaskResourceDimension(job), job.getJobDescriptor().getExtensions().getSize()); - } - - private static ResourceDimension expectedMaxConsumptionForBatchJob(Job job) { - return ResourceDimensions.multiply(perTaskResourceDimension(job), job.getJobDescriptor().getExtensions().getSize()); - } -} \ No newline at end of file diff --git a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLogTest.java b/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLogTest.java deleted file mode 100644 index 7f871486d6..0000000000 --- a/titus-server-master/src/test/java/com/netflix/titus/master/service/management/internal/ResourceConsumptionLogTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.titus.master.service.management.internal; - -import java.util.Collections; - -import com.netflix.titus.api.model.ResourceDimension; -import com.netflix.titus.master.service.management.CompositeResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumption; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupAllocationEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupRemovedEvent; -import com.netflix.titus.master.service.management.ResourceConsumptionEvents.CapacityGroupUndefinedEvent; -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class ResourceConsumptionLogTest { - - @Test - public void testAllocationEventLogging() throws Exception { - CompositeResourceConsumption consumption = new CompositeResourceConsumption( - "myCapacityGroup", - ResourceConsumption.ConsumptionLevel.CapacityGroup, - new ResourceDimension(1, 0, 1, 10, 10, 0), // actual - new ResourceDimension(2, 0, 2, 20, 20, 0), // max - new ResourceDimension(3, 0, 3, 30, 30, 0), // limit - Collections.singletonMap("attrKey", "attrValue"), - Collections.emptyMap(), - false - ); - CapacityGroupAllocationEvent event = new CapacityGroupAllocationEvent( - "myCapacityGroup", - System.currentTimeMillis(), - consumption - ); - String result = ResourceConsumptionLog.doLog(event); - - String expected = "Resource consumption change: group=myCapacityGroup [below limit] actual=[cpu=1.0, memoryMB=1, diskMB=10, networkMbs=10, gpu=0, opportunisticCpu=0], max=[cpu=2.0, memoryMB=2, diskMB=20, networkMbs=20, gpu=0, opportunisticCpu=0], limit=[cpu=3.0, memoryMB=3, diskMB=30, networkMbs=30, gpu=0, opportunisticCpu=0], attrs={attrKey=attrValue}"; - assertThat(result).isEqualTo(expected); - } - - @Test - public void testGroupUndefinedEvent() throws Exception { - CapacityGroupUndefinedEvent event = new CapacityGroupUndefinedEvent("myCapacityGroup", System.currentTimeMillis()); - String result = ResourceConsumptionLog.doLog(event); - - String expected = "Capacity group not defined: group=myCapacityGroup"; - assertThat(result).isEqualTo(expected); - } - - @Test - public void testGroupRemovedEvent() throws Exception { - CapacityGroupRemovedEvent event = new CapacityGroupRemovedEvent("myCapacityGroup", System.currentTimeMillis()); - String result = ResourceConsumptionLog.doLog(event); - - String expected = "Capacity group no longer defined: group=myCapacityGroup"; - assertThat(result).isEqualTo(expected); - } -} \ No newline at end of file