Skip to content

Commit

Permalink
[Enhancement] Store default resource group in FE (#50673)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu authored Sep 5, 2024
1 parent 4645e61 commit fa1256b
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 125 deletions.
11 changes: 6 additions & 5 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1272,11 +1272,12 @@ CONF_mInt64(load_tablet_timeout_seconds, "60");

CONF_mBool(enable_pk_value_column_zonemap, "true");

// Used by default mv resource group
CONF_mDouble(default_mv_resource_group_memory_limit, "0.8");
CONF_mInt32(default_mv_resource_group_cpu_limit, "1");
CONF_mInt32(default_mv_resource_group_concurrency_limit, "0");
CONF_mDouble(default_mv_resource_group_spill_mem_limit_threshold, "0.8");
// Used by default mv resource group.
// These parameters are deprecated because now FE store and persist default_mv_wg.
CONF_Double(default_mv_resource_group_memory_limit, "0.8");
CONF_Int32(default_mv_resource_group_cpu_limit, "1");
CONF_Int32(default_mv_resource_group_concurrency_limit, "0");
CONF_Double(default_mv_resource_group_spill_mem_limit_threshold, "0.8");

// Max size of key columns size of primary key table, default value is 128 bytes
CONF_mInt32(primary_key_limit_size, "128");
Expand Down
28 changes: 0 additions & 28 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,34 +245,6 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
}
});

_config_callback.emplace("default_mv_resource_group_memory_limit", [&]() {
LOG(INFO) << "set default_mv_resource_group_memory_limit:"
<< config::default_mv_resource_group_memory_limit;
workgroup::DefaultWorkGroupInitialization default_wg_initializer;
auto default_mv_wg = default_wg_initializer.create_default_mv_workgroup();
workgroup::WorkGroupManager::instance()->add_workgroup(default_mv_wg);
});
_config_callback.emplace("default_mv_resource_group_cpu_limit", [&]() {
LOG(INFO) << "set default_mv_resource_group_cpu_limit:" << config::default_mv_resource_group_cpu_limit;
workgroup::DefaultWorkGroupInitialization default_wg_initializer;
auto default_mv_wg = default_wg_initializer.create_default_mv_workgroup();
workgroup::WorkGroupManager::instance()->add_workgroup(default_mv_wg);
});
_config_callback.emplace("default_mv_resource_group_concurrency_limit", [&]() {
LOG(INFO) << "set default_mv_resource_group_concurrency_limit:"
<< config::default_mv_resource_group_concurrency_limit;
workgroup::DefaultWorkGroupInitialization default_wg_initializer;
auto default_mv_wg = default_wg_initializer.create_default_mv_workgroup();
workgroup::WorkGroupManager::instance()->add_workgroup(default_mv_wg);
});
_config_callback.emplace("default_mv_resource_group_spill_mem_limit_threshold", [&]() {
LOG(INFO) << "set default_mv_resource_group_spill_mem_limit_threshold:"
<< config::default_mv_resource_group_spill_mem_limit_threshold;
workgroup::DefaultWorkGroupInitialization default_wg_initializer;
auto default_mv_wg = default_wg_initializer.create_default_mv_workgroup();
workgroup::WorkGroupManager::instance()->add_workgroup(default_mv_wg);
});

#ifdef USE_STAROS
_config_callback.emplace("starlet_cache_thread_num", [&]() {
if (staros::starlet::common::GFlagsUtils::UpdateFlagValue("cachemgr_threadpool_size", value).empty()) {
Expand Down
23 changes: 10 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/catalog/ResourceGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.catalog;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import com.starrocks.qe.ShowResultSetMetaData;
import com.starrocks.sql.analyzer.SemanticException;
Expand All @@ -25,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

Expand All @@ -51,20 +53,15 @@ public class ResourceGroup {
public static final String DEFAULT_MV_RESOURCE_GROUP_NAME = "default_mv_wg";
public static final String SPILL_MEM_LIMIT_THRESHOLD = "spill_mem_limit_threshold";

public static final long DEFAULT_WG_ID = 0;
public static final long DEFAULT_MV_WG_ID = 1;
public static final long DEFAULT_MV_VERSION = 1;
/**
* In the old version, DEFAULT_WG and DEFAULT_MV_WG are not saved and persisted in the FE, but are only created in each
* BE. Its ID is 0 and 1. To distinguish it from the old version, a new ID 2 and 3 is used here.
*/
public static final long DEFAULT_WG_ID = 2;
public static final long DEFAULT_MV_WG_ID = 3;

public static final ResourceGroup DEFAULT_WG = new ResourceGroup();
public static final ResourceGroup DEFAULT_MV_WG = new ResourceGroup();

static {
DEFAULT_WG.setId(DEFAULT_WG_ID);
DEFAULT_WG.setName(DEFAULT_RESOURCE_GROUP_NAME);

DEFAULT_MV_WG.setId(DEFAULT_MV_WG_ID);
DEFAULT_MV_WG.setName(DEFAULT_MV_RESOURCE_GROUP_NAME);
}
public static final Set<String> BUILTIN_WG_NAMES =
ImmutableSet.of(DEFAULT_RESOURCE_GROUP_NAME, DEFAULT_MV_RESOURCE_GROUP_NAME);

private static class ColumnMeta {
public ColumnMeta(Column column, BiFunction<ResourceGroup, ResourceGroupClassifier, String> valueSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.starrocks.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.annotations.SerializedName;
import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
Expand Down Expand Up @@ -56,7 +57,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -107,10 +108,10 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio
writeLock();
try {
ResourceGroup wg = stmt.getResourceGroup();
boolean needReplace = false;
if (resourceGroupMap.containsKey(wg.getName())) {
// create resource_group or replace <name> ...
if (stmt.isReplaceIfExists()) {
dropResourceGroupUnlocked(wg.getName());
needReplace = true;
} else if (!stmt.isIfNotExists()) {
throw new DdlException(String.format("RESOURCE_GROUP(%s) already exists", wg.getName()));
} else {
Expand All @@ -129,7 +130,8 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio
throw new DdlException("MV Resource Group not support classifiers.");
}

if (wg.getClassifiers() == null || wg.getClassifiers().isEmpty() &&
if ((wg.getClassifiers() == null || wg.getClassifiers().isEmpty()) &&
!ResourceGroup.BUILTIN_WG_NAMES.contains(wg.getName()) &&
!wg.getResourceGroupType().equals(TWorkGroupType.WG_MV)) {
throw new DdlException("This type Resource Group need define classifiers.");
}
Expand All @@ -141,7 +143,18 @@ public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlExceptio
BackendResourceStat.getInstance().getAvgNumHardwareCoresOfBe() - 1));
}

wg.setId(GlobalStateMgr.getCurrentState().getNextId());
if (needReplace) {
dropResourceGroupUnlocked(wg.getName());
}

if (ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME.equals(wg.getName())) {
wg.setId(ResourceGroup.DEFAULT_WG_ID);
} else if (ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME.equals(wg.getName())) {
wg.setId(ResourceGroup.DEFAULT_MV_WG_ID);
} else {
wg.setId(GlobalStateMgr.getCurrentState().getNextId());
}

wg.setVersion(wg.getId());
for (ResourceGroupClassifier classifier : wg.getClassifiers()) {
classifier.setResourceGroupId(wg.getId());
Expand All @@ -164,11 +177,9 @@ public List<List<String>> showResourceGroup(ShowResourceGroupStmt stmt) {

List<List<String>> rows;
if (stmt.getName() != null) {
rows = GlobalStateMgr.getCurrentState().getResourceGroupMgr().showOneResourceGroup(
stmt.getName(), stmt.isVerbose());
rows = showOneResourceGroup(stmt.getName(), stmt.isVerbose());
} else {
rows = GlobalStateMgr.getCurrentState().getResourceGroupMgr()
.showAllResourceGroups(ConnectContext.get(), stmt.isVerbose(), stmt.isListAll());
rows = showAllResourceGroups(ConnectContext.get(), stmt.isVerbose(), stmt.isListAll());
}
return rows;
}
Expand Down Expand Up @@ -309,23 +320,6 @@ public ResourceGroup getResourceGroup(long id) {
}
}

public ResourceGroup getResourceGroupIncludingDefault(long id) {
ResourceGroup group = getResourceGroup(id);
if (group != null) {
return group;
}

if (id == ResourceGroup.DEFAULT_WG_ID) {
return ResourceGroup.DEFAULT_WG;
}

if (id == ResourceGroup.DEFAULT_MV_WG_ID) {
return ResourceGroup.DEFAULT_MV_WG;
}

return null;
}

public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException {
writeLock();
try {
Expand All @@ -339,6 +333,7 @@ public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException
!(cmd instanceof AlterResourceGroupStmt.AlterProperties)) {
throw new DdlException("MV Resource Group not support classifiers.");
}

if (cmd instanceof AlterResourceGroupStmt.AddClassifiers) {
List<ResourceGroupClassifier> newAddedClassifiers = stmt.getNewAddedClassifiers();
for (ResourceGroupClassifier classifier : newAddedClassifiers) {
Expand Down Expand Up @@ -422,13 +417,8 @@ public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException
TWorkGroupType workGroupType = changedProperties.getResourceGroupType();
Preconditions.checkState(workGroupType == null);
} else if (cmd instanceof AlterResourceGroupStmt.DropClassifiers) {
Set<Long> classifierToDrop = stmt.getClassifiersToDrop().stream().collect(Collectors.toSet());
Iterator<ResourceGroupClassifier> classifierIterator = wg.getClassifiers().iterator();
while (classifierIterator.hasNext()) {
if (classifierToDrop.contains(classifierIterator.next().getId())) {
classifierIterator.remove();
}
}
Set<Long> classifierToDrop = new HashSet<>(stmt.getClassifiersToDrop());
wg.getClassifiers().removeIf(classifier -> classifierToDrop.contains(classifier.getId()));
for (Long classifierId : classifierToDrop) {
classifierMap.remove(classifierId);
}
Expand All @@ -439,6 +429,7 @@ public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException
}
classifierList.clear();
}

// only when changing properties, version is required to update. because changing classifiers needs not
// propagate to BE.
if (cmd instanceof AlterResourceGroupStmt.AlterProperties) {
Expand Down Expand Up @@ -634,6 +625,38 @@ public TWorkGroup chooseResourceGroup(ConnectContext ctx, ResourceGroupClassifie
}
}

public void createBuiltinResourceGroupsIfNotExist() {
try {
ResourceGroup defaultWg = getResourceGroup(ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME);
if (defaultWg != null) {
return;
}

final int avgCpuCores = BackendResourceStat.getInstance().getAvgNumHardwareCoresOfBe();

Map<String, String> defaultWgProperties = ImmutableMap.of(
ResourceGroup.CPU_WEIGHT, Integer.toString(avgCpuCores),
ResourceGroup.MEM_LIMIT, "1.0"
);
CreateResourceGroupStmt defaultWgStmt = new CreateResourceGroupStmt(ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME,
true, false, Collections.emptyList(), defaultWgProperties);
defaultWgStmt.analyze();
createResourceGroup(defaultWgStmt);

Map<String, String> defaultMvWgProperties = ImmutableMap.of(
ResourceGroup.CPU_WEIGHT, "1",
ResourceGroup.MEM_LIMIT, "0.8",
ResourceGroup.SPILL_MEM_LIMIT_THRESHOLD, "0.8"
);
CreateResourceGroupStmt defaultMvWgStmt = new CreateResourceGroupStmt(ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME,
true, false, Collections.emptyList(), defaultMvWgProperties);
defaultMvWgStmt.analyze();
createResourceGroup(defaultMvWgStmt);
} catch (Exception e) {
LOG.warn("failed to create builtin resource groups", e);
}
}

private static class SerializeData {
@SerializedName("WorkGroups")
public List<ResourceGroup> resourceGroups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.catalog.ResourceGroupClassifier;
import com.starrocks.catalog.ResourceGroupMgr;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.util.DebugUtil;
Expand Down Expand Up @@ -305,33 +306,31 @@ public static TWorkGroup prepareResourceGroup(ConnectContext connect, ResourceGr
if (connect == null || !connect.getSessionVariable().isEnableResourceGroup()) {
return null;
}
SessionVariable sessionVariable = connect.getSessionVariable();

final ResourceGroupMgr resourceGroupMgr = GlobalStateMgr.getCurrentState().getResourceGroupMgr();
final SessionVariable sessionVariable = connect.getSessionVariable();
TWorkGroup resourceGroup = null;

// 1. try to use the resource group specified by the variable
if (StringUtils.isNotEmpty(sessionVariable.getResourceGroup())) {
String rgName = sessionVariable.getResourceGroup();
resourceGroup = GlobalStateMgr.getCurrentState().getResourceGroupMgr().chooseResourceGroupByName(rgName);
if (rgName.equalsIgnoreCase(ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME)) {
ResourceGroup defaultMVResourceGroup = new ResourceGroup();
defaultMVResourceGroup.setId(ResourceGroup.DEFAULT_MV_WG_ID);
defaultMVResourceGroup.setName(ResourceGroup.DEFAULT_MV_RESOURCE_GROUP_NAME);
defaultMVResourceGroup.setVersion(ResourceGroup.DEFAULT_MV_VERSION);
resourceGroup = defaultMVResourceGroup.toThrift();
}
resourceGroup = resourceGroupMgr.chooseResourceGroupByName(rgName);
}

// 2. try to use the resource group specified by workgroup_id
long workgroupId = connect.getSessionVariable().getResourceGroupId();
if (resourceGroup == null && workgroupId > 0) {
resourceGroup = GlobalStateMgr.getCurrentState().getResourceGroupMgr().chooseResourceGroupByID(workgroupId);
resourceGroup = resourceGroupMgr.chooseResourceGroupByID(workgroupId);
}

// 3. if the specified resource group not exist try to use the default one
if (resourceGroup == null) {
Set<Long> dbIds = connect.getCurrentSqlDbIds();
resourceGroup = GlobalStateMgr.getCurrentState().getResourceGroupMgr().chooseResourceGroup(
connect, queryType, dbIds);
resourceGroup = resourceGroupMgr.chooseResourceGroup(connect, queryType, dbIds);
}

if (resourceGroup == null) {
resourceGroup = resourceGroupMgr.chooseResourceGroupByName(ResourceGroup.DEFAULT_RESOURCE_GROUP_NAME);
}

if (resourceGroup != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ private void transferToLeader() {
}

createBuiltinStorageVolume();
resourceGroupMgr.createBuiltinResourceGroupsIfNotExist();
keyMgr.initDefaultMasterKey();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import com.starrocks.sql.ast.DropHistogramStmt;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.DropRepositoryStmt;
import com.starrocks.sql.ast.DropResourceGroupStmt;
import com.starrocks.sql.ast.DropResourceStmt;
import com.starrocks.sql.ast.DropRoleStmt;
import com.starrocks.sql.ast.DropStatsStmt;
Expand Down Expand Up @@ -234,6 +235,12 @@ public Void visitAlterResourceGroupStatement(AlterResourceGroupStmt statement, C
return null;
}

@Override
public Void visitDropResourceGroupStatement(DropResourceGroupStmt statement, ConnectContext session) {
statement.analyze();
return null;
}

@Override
public Void visitAdminSetReplicaStatusStatement(AdminSetReplicaStatusStmt statement, ConnectContext session) {
AdminStmtAnalyzer.analyze(statement, session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public static void analyzeProperties(ResourceGroup resourceGroup, Map<String, St
} else {
memLimit = Double.parseDouble(value);
}
if (memLimit <= 0.0 || memLimit >= 1.0) {
throw new SemanticException("mem_limit should range from 0.00(exclude) to 1.00(exclude)");
if (memLimit <= 0.0 || memLimit > 1.0) {
throw new SemanticException("mem_limit should range from 0.00(exclude) to 1.00(include)");
}
resourceGroup.setMemLimit(memLimit);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public void analyze() {
"should be specified");
}
}

if (ResourceGroup.BUILTIN_WG_NAMES.contains(name) && !(cmd instanceof AlterProperties)) {
throw new SemanticException(String.format("cannot alter classifiers of builtin resource group [%s]", name));
}
}

public List<ResourceGroupClassifier> getNewAddedClassifiers() {
Expand Down
Loading

0 comments on commit fa1256b

Please sign in to comment.