Skip to content

Commit

Permalink
only call getAliveComputeNodes once per query
Browse files Browse the repository at this point in the history
Signed-off-by: Connor Brennan <[email protected]>
  • Loading branch information
ctbrennan committed Oct 21, 2024
1 parent c46e2b9 commit 1236ee1
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 7 deletions.
31 changes: 24 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public class OlapScanNode extends ScanNode {

private long totalScanRangeBytes = 0;

// Set to true after it's confirmed at some point during the execution of this request that there is some living CN.
// Set just once per query.
private boolean alreadyFoundSomeLivingCn = false;

// Constructs node to scan given data files of table 'tbl'.
// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
Expand Down Expand Up @@ -510,6 +514,25 @@ public List<TScanRangeLocations> updateScanRangeLocations(List<TScanRangeLocatio
return newLocations;
}


private void checkSomeAliveComputeNode() throws ErrorReportException {
// Note that it's theoretically possible that there were some living CN earlier in this query's execution, and then
// they all died, but in that case, the problem this will be surfaced later anyway.
if (alreadyFoundSomeLivingCn) {
return;
}
// We prefer to call getAliveComputeNodes infrequently, as it can come to dominate the execution time of a query in the
// frontend if there are many calls per request (e.g. one per partition when there are many partitions).
if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
if (CollectionUtils.isEmpty(warehouseManager.getAliveComputeNodes(warehouseId))) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}
}
alreadyFoundSomeLivingCn = true;
}

public void addScanRangeLocations(Partition partition,
PhysicalPartition physicalPartition,
MaterializedIndex index,
Expand All @@ -527,13 +550,7 @@ public void addScanRangeLocations(Partition partition,
selectedPartitionNames.add(partition.getName());
selectedPartitionVersions.add(visibleVersion);

if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
if (CollectionUtils.isEmpty(warehouseManager.getAliveComputeNodes(warehouseId))) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}
}
checkSomeAliveComputeNode();
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
LOG.debug("{} tabletId={}", (logNum++), tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,83 @@ public RunMode getCurrentRunMode() {
Assert.assertEquals("No alive backend or compute node in warehouse null.", ex.getMessage());
}

@Test
public void testSelectWorkerGroupByWarehouseId_checkAliveNodesOnce(@Mocked WarehouseManager mockWarehouseMgr)
throws UserException {
Backend b1 = new Backend(10001L, "192.168.0.1", 9050);
b1.setBePort(9060);
b1.setAlive(false);
b1.setWarehouseId(WarehouseManager.DEFAULT_WAREHOUSE_ID);

new MockUp<NodeMgr>() {
@Mock
public SystemInfoService getClusterInfo() {
return systemInfo;
}
};

new MockUp<SystemInfoService>() {
@Mock
public ComputeNode getBackendOrComputeNode(long nodeId) {
return b1;
}
};

new MockUp<StarOSAgent>() {
@Mock
public List<Long> getWorkersByWorkerGroup(long workerGroupId) throws UserException {
if (workerGroupId == StarOSAgent.DEFAULT_WORKER_GROUP_ID) {
return Lists.newArrayList(b1.getId());
}
return Lists.newArrayList();
}
};

new MockUp<GlobalStateMgr>() {
@Mock
public NodeMgr getNodeMgr() {
return nodeMgr;
}

@Mock
public StarOSAgent getStarOSAgent() {
return starOSAgent;
}

@Mock
public WarehouseManager getWarehouseMgr() {
return mockWarehouseMgr;
}

};

ComputeNode livingCn = new ComputeNode();
livingCn.setAlive(true);
new Expectations() {
{
// This is the point of the test -- we only want to call this once even though we're calling
// addScanRangeLocations multiple times.
mockWarehouseMgr.getAliveComputeNodes(WarehouseManager.DEFAULT_WAREHOUSE_ID);
times = 1;
result = Lists.newArrayList(livingCn);
}
};
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};

OlapScanNode scanNode = newOlapScanNode();
Partition partition = new Partition(123, "aaa", null, null);
MaterializedIndex index = new MaterializedIndex(1, MaterializedIndex.IndexState.NORMAL);
scanNode.addScanRangeLocations(partition, partition, index, Collections.emptyList(), 1);
// Since this is the second call to addScanRangeLocations on the same OlapScanNode, we do not expect another call to
// getAliveComputeNodes.
scanNode.addScanRangeLocations(partition, partition, index, Collections.emptyList(), 1);
}

private OlapScanNode newOlapScanNode() {
TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
OlapTable table = new OlapTable();
Expand Down

0 comments on commit 1236ee1

Please sign in to comment.