Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Skip triggering analyze when the table has not been updated #52203

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2110,10 +2110,10 @@ public class Config extends ConfigBase {
public static long connector_table_query_trigger_analyze_small_table_rows = 10000000; // 10M

@ConfField(mutable = true)
public static long connector_table_query_trigger_analyze_small_table_interval = 6 * 60 * 60; // unit: second, default 6h
public static long connector_table_query_trigger_analyze_small_table_interval = 2 * 3600; // unit: second, default 2h

@ConfField(mutable = true)
public static long connector_table_query_trigger_analyze_large_table_interval = 24 * 60 * 60; // unit: second, default 24h
public static long connector_table_query_trigger_analyze_large_table_interval = 12 * 3600; // unit: second, default 12h

@ConfField(mutable = true)
public static int connector_table_query_trigger_analyze_max_running_task_num = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.ColumnStatsMeta;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.StatisticExecutor;
import com.starrocks.statistic.StatisticUtils;
import com.starrocks.statistic.StatisticsCollectJobFactory;
Expand All @@ -35,9 +36,11 @@
import org.apache.logging.log4j.Logger;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -48,7 +51,7 @@ public class ConnectorAnalyzeTask {
private final String catalogName;
private final Database db;
private final Table table;
private final Set<String> columns;
private Set<String> columns;

public ConnectorAnalyzeTask(Triple<String, Database, Table> tableTriple, Set<String> columns) {
this.catalogName = Preconditions.checkNotNull(tableTriple.getLeft());
Expand All @@ -69,16 +72,17 @@ public void removeColumns(Set<String> columns) {
this.columns.removeAll(columns);
}

private boolean skipAnalyzeUseLastUpdateTime(LocalDateTime lastUpdateTime) {
// do not know the table row count, use small table analyze interval
if (lastUpdateTime.plusSeconds(Config.connector_table_query_trigger_analyze_small_table_interval).
isAfter(LocalDateTime.now())) {
LOG.info("Table {}.{}.{} is already analyzed at {}, skip it", catalogName, db.getFullName(),
table.getName(), lastUpdateTime);
return true;
} else {
return false;
/**
 * Decides whether the given column still needs to be analyzed.
 *
 * @param column           column name, used only for logging
 * @param lastAnalyzedTime time the column's statistics were last collected; must be non-null
 * @return false when the table's last update is older than the last analyze
 *         (statistics are still fresh); true when the table changed afterwards
 *         or its update time cannot be determined
 */
private boolean needAnalyze(String column, LocalDateTime lastAnalyzedTime) {
    // Table-level last-modified timestamp; may be null when the connector cannot report it.
    LocalDateTime tableUpdateTime = StatisticUtils.getTableLastUpdateTime(table);
    // check table update time is after last analyzed time
    if (tableUpdateTime != null) {
        if (lastAnalyzedTime.isAfter(tableUpdateTime)) {
            // Statistics are newer than the last table change: skip re-analyzing this column.
            LOG.info("Table {}.{}.{} column {} last update time: {}, last analyzed time: {}, skip analyze",
                    catalogName, db.getFullName(), table.getName(), column, tableUpdateTime, lastAnalyzedTime);
            return false;
        }
    }
    // Unknown update time, or the table changed after the last analyze: analyze again (conservative default).
    return true;
}

public Optional<AnalyzeStatus> run() {
Expand All @@ -91,36 +95,51 @@ public Optional<AnalyzeStatus> run() {
.filter(status -> !status.getStatus().equals(StatsConstants.ScheduleStatus.FAILED))
.max(Comparator.comparing(ExternalAnalyzeStatus::getStartTime));
if (lastAnalyzedStatus.isPresent()) {
// Do not analyze the table if the last analyze status is PENDING or RUNNING
StatsConstants.ScheduleStatus lastScheduleStatus = lastAnalyzedStatus.get().getStatus();
if (lastScheduleStatus == StatsConstants.ScheduleStatus.PENDING ||
lastScheduleStatus == StatsConstants.ScheduleStatus.RUNNING) {
LOG.info("Table {}.{}.{} analyze status is {}, skip it", catalogName, db.getFullName(),
table.getName(), lastScheduleStatus);
return Optional.empty();
} else {
// analyze status is Finished
// check the analyzed columns
List<String> analyzedColumns = lastAnalyzedStatus.get().getColumns();
if (analyzedColumns == null || analyzedColumns.isEmpty()) {
// analyzed all columns in last analyzed time, check the update time
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
return Optional.empty();
}
} else {
Set<String> lastAnalyzedColumnsSet = new HashSet<>(analyzedColumns);
if (lastAnalyzedColumnsSet.containsAll(columns)) {
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
return Optional.empty();
}
}
}

ExternalBasicStatsMeta externalBasicStatsMeta = GlobalStateMgr.getCurrentState().getAnalyzeMgr().
getExternalTableBasicStatsMeta(catalogName, db.getFullName(), table.getName());
Optional<LocalDateTime> lastEarliestAnalyzedTime = Optional.empty();
if (externalBasicStatsMeta != null) {
Map<String, LocalDateTime> columnLastAnalyzedTime = externalBasicStatsMeta.getColumnStatsMetaMap().values()
.stream().collect(
Collectors.toMap(ColumnStatsMeta::getColumnName, ColumnStatsMeta::getUpdateTime));
Set<String> needAnalyzeColumns = new HashSet<>(columns);

for (String column : columns) {
if (columnLastAnalyzedTime.containsKey(column)) {
LocalDateTime lastAnalyzedTime = columnLastAnalyzedTime.get(column);
Preconditions.checkNotNull(lastAnalyzedTime, "Last analyzed time is null");
if (needAnalyze(column, lastAnalyzedTime)) {
// need analyze columns, compare the last analyzed time, get the earliest time
lastEarliestAnalyzedTime = lastEarliestAnalyzedTime.
map(localDateTime -> localDateTime.isAfter(lastAnalyzedTime) ? lastAnalyzedTime : localDateTime).
or(() -> Optional.of(lastAnalyzedTime));
} else {
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
columns.removeAll(lastAnalyzedColumnsSet);
}
needAnalyzeColumns.remove(column);
}
}
}
columns = needAnalyzeColumns;
}

if (columns.isEmpty()) {
LOG.info("Table {}.{}.{} columns {} are all up to date, skip analyze", catalogName, db.getFullName(),
table.getName(), columns.stream().map(Object::toString).collect(Collectors.joining(",")));
return Optional.empty();
}

Set<String> updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table,
lastEarliestAnalyzedTime.orElse(LocalDateTime.MIN));

// Init new analyze status
AnalyzeStatus analyzeStatus = new ExternalAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(),
catalogName, db.getOriginName(), table.getName(),
Expand All @@ -134,13 +153,14 @@ public Optional<AnalyzeStatus> run() {
ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext();
statsConnectCtx.setThreadLocalInfo();
try {
return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus));
return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus, updatedPartitions));
} finally {
ConnectContext.remove();
}
}

public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) {
public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus,
Set<String> updatedPartitions) {
List<String> columnNames = Lists.newArrayList(columns);
List<Type> columnTypes = columnNames.stream().map(col -> {
Column column = table.getColumn(col);
Expand All @@ -150,7 +170,7 @@ public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatu
StatisticExecutor statisticExecutor = new StatisticExecutor();
return statisticExecutor.collectStatistics(statsConnectCtx,
StatisticsCollectJobFactory.buildExternalStatisticsCollectJob(
catalogName, db, table, null,
catalogName, db, table, updatedPartitions == null ? null : new ArrayList<>(updatedPartitions),
columnNames, columnTypes,
StatsConstants.AnalyzeType.FULL,
StatsConstants.ScheduleType.ONCE, Maps.newHashMap()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.sql.optimizer.Utils.getLongFromDateTime;
Expand Down Expand Up @@ -225,6 +226,17 @@ public static LocalDateTime getTableLastUpdateTime(Table table) {
}
}

/**
 * Best-effort lookup of the partitions of {@code table} modified after {@code checkTime}.
 *
 * @param table     target table (any connector type)
 * @param checkTime cutoff; partitions updated after this time are returned
 * @return the updated partition names, or null when the connector type is not
 *         supported by ConnectorPartitionTraits — note: null, not an empty set,
 *         so callers can distinguish "unknown" from "nothing changed"
 */
public static Set<String> getUpdatedPartitionNames(Table table, LocalDateTime checkTime) {
    // get updated partitions
    Set<String> updatedPartitions = null;
    try {
        // 60 appears to be a staleness/slack second count passed to the traits lookup — TODO confirm semantics
        updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(checkTime, 60);
    } catch (Exception e) {
        // ConnectorPartitionTraits does not support every table type; deliberately
        // swallow and fall through with null (best-effort behavior).
    }
    return updatedPartitions;
}

public static LocalDateTime getPartitionLastUpdateTime(Partition partition) {
long time = partition.getVisibleVersionTime();
return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), Clock.systemDefaultZone().getZone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.connector.statistics.ConnectorTableColumnStats;
import com.starrocks.monitor.unit.ByteSizeUnit;
import com.starrocks.server.GlobalStateMgr;
Expand Down Expand Up @@ -310,12 +309,7 @@ private static void createExternalFullStatsJob(List<StatisticsCollectJob> allTab
Database db, Table table, List<String> columnNames,
List<Type> columnTypes) {
// get updated partitions
Set<String> updatedPartitions = null;
try {
updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(statisticsUpdateTime, 60);
} catch (Exception e) {
// ConnectorPartitionTraits do not support all type of table, ignore exception
}
Set<String> updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table, statisticsUpdateTime);
LOG.info("create external full statistics job for table: {}, partitions: {}",
table.getName(), updatedPartitions);
allTableJobMap.add(buildExternalStatisticsCollectJob(job.getCatalogName(), db, table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,26 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.plan.ConnectorPlanTestBase;
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.ColumnStatsMeta;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.ExternalFullStatisticsCollectJob;
import com.starrocks.statistic.StatisticUtils;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.utframe.UtFrameUtils;
import io.trino.hive.$internal.org.apache.commons.lang3.tuple.Triple;
import mockit.Mock;
import mockit.MockUp;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ConnectorAnalyzeTaskTest {
private static ConnectContext ctx;
Expand All @@ -46,6 +53,12 @@ public static void beforeClass() throws Exception {
ConnectorPlanTestBase.mockHiveCatalog(ctx);
}

@After
public void tearDown() {
    // Reset shared global analyze state between tests so the analyze-status and
    // external-stats metadata written by one test cannot leak into the next.
    GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeStatusMap().clear();
    GlobalStateMgr.getCurrentState().getAnalyzeMgr().getExternalBasicStatsMetaMap().clear();
}

@Test
public void testMergeTask() {
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
Expand Down Expand Up @@ -77,16 +90,86 @@ public void testTaskRun() {

new MockUp<ConnectorAnalyzeTask>() {
@Mock
private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) {
private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus,
Set<String> updatedPartitions) {
analyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH);
return analyzeStatus;
}
};

new MockUp<StatisticUtils>() {
@Mock
public LocalDateTime getTableLastUpdateTime(Table table) {
return LocalDateTime.now().minusDays(1);
}
};
// execute analyze when last analyze status is finish
externalAnalyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH);
// add ExternalBasicStatsMeta when analyze finish
ExternalBasicStatsMeta externalBasicStatsMeta = new ExternalBasicStatsMeta("hive0", "partitioned_db",
"orders", List.of("o_custkey", "o_orderstatus"), StatsConstants.AnalyzeType.FULL,
LocalDateTime.now(), Maps.newHashMap());
externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_custkey", StatsConstants.AnalyzeType.FULL,
LocalDateTime.now()));
externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_orderstatus", StatsConstants.AnalyzeType.FULL,
LocalDateTime.now()));

GlobalStateMgr.getCurrentState().getAnalyzeMgr().addExternalBasicStatsMeta(externalBasicStatsMeta);

result = task1.run();
Assert.assertTrue(result.isPresent());
Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus);
ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get();
Assert.assertEquals(List.of("o_orderkey"), externalAnalyzeStatusResult.getColumns());
}

@Test
public void testTaskRunWithTableUpdate() {
    // Exercises the skip-if-not-updated logic end to end:
    // 1) first run collects stats, 2) a stale table update time causes a skip,
    // 3) a fresh table update time triggers re-analysis.
    Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
            "partitioned_db", "orders");
    String tableUUID = table.getUUID();

    Triple<String, Database, Table> tableTriple = StatisticsUtils.getTableTripleByUUID(tableUUID);
    ConnectorAnalyzeTask task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
    // Stub out the actual statistics collection; we only care about scheduling decisions.
    new MockUp<ExternalFullStatisticsCollectJob>() {
        @Mock
        public void collect(ConnectContext context, AnalyzeStatus analyzeStatus) throws Exception {
            // do nothing
        }
    };

    // First run: no prior stats exist, so both columns should be analyzed
    // and their ColumnStatsMeta recorded.
    Optional<AnalyzeStatus> result = task1.run();
    Assert.assertTrue(result.isPresent());
    Map<String, ColumnStatsMeta> columnStatsMetaMap = GlobalStateMgr.getCurrentState().getAnalyzeMgr().
            getExternalTableBasicStatsMeta("hive0", "partitioned_db", "orders").getColumnStatsMetaMap();
    Assert.assertEquals(2, columnStatsMetaMap.size());
    Assert.assertTrue(columnStatsMetaMap.containsKey("o_orderkey"));
    Assert.assertTrue(columnStatsMetaMap.containsKey("o_custkey"));

    // Pretend the table was last updated a day ago — older than the stats just collected.
    new MockUp<StatisticUtils>() {
        @Mock
        public LocalDateTime getTableLastUpdateTime(Table table) {
            return LocalDateTime.now().minusDays(1);
        }
    };
    // table not update, skip analyze
    task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
    result = task1.run();
    Assert.assertTrue(result.isEmpty());

    // table update, analyze again
    // Now pretend the table was updated after the stats were collected.
    new MockUp<StatisticUtils>() {
        @Mock
        public LocalDateTime getTableLastUpdateTime(Table table) {
            return LocalDateTime.now().plusMinutes(10);
        }
    };

    task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
    result = task1.run();
    Assert.assertTrue(result.isPresent());
    Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus);
    ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get();
    // Both columns are re-analyzed because the table changed after the last collection.
    Assert.assertEquals(2, externalAnalyzeStatusResult.getColumns().size());
}
}
Loading