diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java index f89ce749b06f..85c91f527283 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java @@ -58,6 +58,15 @@ public List getNormalizationTargets() { return normalizationTargets; } + @Override + public long getPlanSizeMb() { + long total = 0; + for (NormalizationTarget target : normalizationTargets) { + total += target.getRegionSizeMb(); + } + return total; + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java index 0e0d39d10b1c..de6db2cd554a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java @@ -34,4 +34,6 @@ enum PlanType { /** Returns the type of this plan */ PlanType getType(); + + long getPlanSizeMb(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java index 8890c2aba798..0a701cd3ad63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java @@ -19,8 +19,10 @@ import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab "hbase.normalizer.throughput.max_bytes_per_sec"; private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec + static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb"; + static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE; + private final MasterServices masterServices; private final RegionNormalizer regionNormalizer; private final RegionNormalizerWorkQueue workQueue; @@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab private final boolean defaultNormalizerTableLevel; private long splitPlanCount; private long mergePlanCount; + private final AtomicLong cumulativePlansSizeLimitMb; RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices, final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue workQueue) { @@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab this.mergePlanCount = 0; this.rateLimiter = loadRateLimiter(configuration); this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration); + this.cumulativePlansSizeLimitMb = new AtomicLong( + configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)); } private boolean extractDefaultNormalizerValue(final Configuration configuration) { @@ -96,9 +104,20 @@ public void deregisterChildren(ConfigurationManager manager) { } } + private static long logLongConfigurationUpdated(final String key, final long oldValue, + final long newValue) { + if (oldValue != newValue) { + LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue); + } + return newValue; + } + @Override public void onConfigurationChange(Configuration conf) { rateLimiter.setRate(loadRateLimit(conf)); + cumulativePlansSizeLimitMb.set( + logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(), + conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB))); } private static RateLimiter loadRateLimiter(final Configuration configuration) { @@ -207,7 +226,10 @@ private List calculatePlans(final TableName tableName) { return Collections.emptyList(); } - final List plans = regionNormalizer.computePlansForTable(tblDesc); + List plans = regionNormalizer.computePlansForTable(tblDesc); + + plans = truncateForSize(plans); + if (CollectionUtils.isEmpty(plans)) { LOG.debug("No normalization required for table {}.", tableName); return Collections.emptyList(); @@ -215,6 +237,33 @@ private List calculatePlans(final TableName tableName) { return plans; } + private List truncateForSize(List plans) { + if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) { + List maybeTruncatedPlans = new ArrayList<>(plans.size()); + long totalCumulativeSizeMb = 0; + long truncatedCumulativeSizeMb = 0; + for (NormalizationPlan plan : plans) { + totalCumulativeSizeMb += plan.getPlanSizeMb(); + if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) { + truncatedCumulativeSizeMb += plan.getPlanSizeMb(); + maybeTruncatedPlans.add(plan); + } + } + if (maybeTruncatedPlans.size() != plans.size()) { + LOG.debug( + "Truncating list of normalization plans that RegionNormalizerWorker will process " + + "because of {}. Original list had {} plan(s), new list has {} plan(s). " + + "Original list covered regions with cumulative size {} mb, " + + "new list covers regions with cumulative size {} mb.", + CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(), + totalCumulativeSizeMb, truncatedCumulativeSizeMb); + } + return maybeTruncatedPlans; + } else { + return plans; + } + } + private void submitPlans(final List plans) { // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java index ceb8851645d4..a0c296de88f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.master.normalizer; +import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY; +import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB; import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty; import java.time.Instant; @@ -229,6 +231,14 @@ public List computePlansForTable(final TableDescriptor tableD plans.addAll(mergePlans); } + if ( + normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB + ) { + // If we are going to truncate our list of plans, shuffle the split and merge plans together + // so that the merge plans, which are listed last, are not starved out. + shuffleNormalizationPlans(plans); + } + LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, " + "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount); return plans; @@ -464,6 +474,14 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerCo return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx); } + /** + * This very simple method exists so we can verify it was called in a unit test. Visible for + * testing. + */ + void shuffleNormalizationPlans(List plans) { + Collections.shuffle(plans); + } + private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue, final Object... args) { final boolean value = predicate.getAsBoolean(); @@ -484,6 +502,7 @@ private static final class NormalizerConfiguration { private final int mergeMinRegionCount; private final Period mergeMinRegionAge; private final long mergeMinRegionSizeMb; + private final long cumulativePlansSizeLimitMb; private NormalizerConfiguration() { conf = null; @@ -492,6 +511,7 @@ private NormalizerConfiguration() { mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT; mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS); mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB; + cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB; } private NormalizerConfiguration(final Configuration conf, @@ -502,6 +522,8 @@ private NormalizerConfiguration(final Configuration conf, mergeMinRegionCount = parseMergeMinRegionCount(conf); mergeMinRegionAge = parseMergeMinRegionAge(conf); mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf); + cumulativePlansSizeLimitMb = + conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB); logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(), splitEnabled); logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(), @@ -574,6 +596,10 @@ public long getMergeMinRegionSizeMb(NormalizeContext context) { } return mergeMinRegionSizeMb; } + + private long getCumulativePlansSizeLimitMb() { + return cumulativePlansSizeLimitMb; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java index 6068eccdea17..5cbddda1483f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java @@ -45,6 +45,11 @@ public NormalizationTarget getSplitTarget() { return splitTarget; } + @Override + public long getPlanSizeMb() { + return splitTarget.getRegionSizeMb(); + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java index 995b3c74312a..5e8d332107c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java @@ -204,6 +204,36 @@ public void testRateLimit() throws Exception { Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5))); } + @Test + public void testPlansSizeLimit() throws Exception { + final TableName tn = tableName.getTableName(); + final TableDescriptor tnDescriptor = + TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build(); + final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); + final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); + final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); + when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); + when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L); + when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L); + when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList( + new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder() + .addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(), + new SplitNormalizationPlan(splitRegionInfo, 1))); + + final Configuration conf = testingUtility.getConfiguration(); + conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5); + + final RegionNormalizerWorker worker = new RegionNormalizerWorker( + testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); + workerPool.submit(worker); + queue.put(tn); + + assertThatEventually("worker should process first split plan, but not second", + worker::getSplitPlanCount, comparesEqualTo(1L)); + assertThatEventually("worker should process merge plan", worker::getMergePlanCount, + comparesEqualTo(1L)); + } + /** * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until * the matcher succeeds or the timeout period of 30 seconds is exhausted. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java index 54d39a9dc00b..5dba036bb705 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.normalizer; import static java.lang.String.format; +import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY; import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS; import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY; import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY; @@ -30,13 +31,18 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Instant; @@ -607,6 +613,23 @@ public void testNormalizerCannotMergeNonAdjacentRegions() { assertThat(plans, empty()); } + @Test + public void testSizeLimitShufflesPlans() { + conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10); + final TableName tableName = name.getTableName(); + final List regionInfos = createRegionInfos(tableName, 4); + final Map regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3); + setupMocksForNormalizer(regionSizes, regionInfos); + when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L); + normalizer = spy(normalizer); + + assertTrue(normalizer.isSplitEnabled()); + assertTrue(normalizer.isMergeEnabled()); + List computedPlans = normalizer.computePlansForTable(tableDescriptor); + assertThat(computedPlans, hasSize(4)); + verify(normalizer, times(1)).shuffleNormalizationPlans(anyList()); + } + @SuppressWarnings("MockitoCast") private void setupMocksForNormalizer(Map regionSizes, List regionInfoList) {