Skip to content

Commit

Permalink
[Test] Use a stable UT instead of unstable IT(test_global_runtime_fil…
Browse files Browse the repository at this point in the history
…ter_olap_table_sink) (#44519)

Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson authored Apr 22, 2024
1 parent bb1ec64 commit 936cdd1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 353 deletions.
54 changes: 54 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package com.starrocks.qe;

import com.starrocks.common.FeConstants;
import com.starrocks.planner.OlapTableSink;
import com.starrocks.planner.RuntimeFilterDescription;
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.TPCDSPlanTest;
import com.starrocks.sql.plan.TPCDSPlanTestBase;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.After;
Expand All @@ -31,6 +33,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -150,4 +153,55 @@ public void testSubQueryExtractedFromQ5() throws Exception {
}
Assert.assertTrue(rfExists);
}

@Test
public void testOlapTableSinkAsGRFCoordinator() throws Exception {
FeConstants.runningUnitTest = true;
ConnectContext ctx = starRocksAssert.getCtx();
ctx.setExecutionId(new TUniqueId(0x33, 0x0));
ConnectContext.threadLocalInfo.set(ctx);
ctx.getSessionVariable().setParallelExecInstanceNum(8);
ctx.getSessionVariable().setEnablePipelineEngine(true);
setTPCDSFactor(1);

// make sure global runtime filter been push-downed to two fragments.
String sql = "insert into item \n" +
"select item.*\n" +
"from\n" +
" item inner join[shuffle] store_sales on store_sales.ss_item_sk = item.i_item_sk \n" +
" inner join [shuffle] date_dim dt on dt.d_date_sk = store_sales.ss_sold_date_sk\n" +
"where \n" +
" item.i_manufact_id = 128\n" +
" and dt.d_moy=11";
String plan = UtFrameUtils.getVerboseFragmentPlan(ctx, sql);
String[] ss = plan.split("\\n");
List<String> filterLines = Stream.of(ss).filter(s -> s.contains("filter_id =")).collect(Collectors.toList());
Assert.assertFalse(filterLines.isEmpty());
Assert.assertTrue(filterLines.stream().anyMatch(ln -> ln.contains("remote = true")));
ExecPlan execPlan = UtFrameUtils.getPlanAndFragment(ctx, sql).second;
DefaultCoordinator coord = new DefaultCoordinator.Factory().createQueryScheduler(
ctx, execPlan.getFragments(), execPlan.getScanNodes(), execPlan.getDescTbl().toThrift());
coord.prepareExec();

ExecutionFragment rootExecFragment = coord.getExecutionDAG().getFragmentsInPreorder().get(0);
Assert.assertTrue(rootExecFragment.getPlanFragment().getSink() instanceof OlapTableSink);
Assert.assertFalse(rootExecFragment.getRuntimeFilterParams().getRuntime_filter_builder_number().isEmpty());

Set<TNetworkAddress> grfCoordinators =
coord.getExecutionDAG().getFragmentsInPreorder().stream().flatMap(execFragment -> {
Map<Integer, RuntimeFilterDescription> buildRfFilters =
execFragment.getPlanFragment().getBuildRuntimeFilters();
if (buildRfFilters == null || buildRfFilters.isEmpty()) {
return Stream.empty();
} else {
return buildRfFilters.values().stream()
.filter(RuntimeFilterDescription::isHasRemoteTargets)
.flatMap(rf -> rf.toThrift().getRuntime_filter_merge_nodes().stream());
}
}).collect(Collectors.toSet());

Assert.assertEquals(grfCoordinators.size(), 1);
Assert.assertTrue(
grfCoordinators.contains(rootExecFragment.getInstances().get(0).getWorker().getBrpcAddress()));
}
}

This file was deleted.

Loading

0 comments on commit 936cdd1

Please sign in to comment.