diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java index 90dfcfe8613b3..060e39f33f8d5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java @@ -15,11 +15,13 @@ package com.starrocks.qe; import com.starrocks.common.FeConstants; +import com.starrocks.planner.OlapTableSink; import com.starrocks.planner.RuntimeFilterDescription; import com.starrocks.qe.scheduler.dag.ExecutionFragment; import com.starrocks.sql.plan.ExecPlan; import com.starrocks.sql.plan.TPCDSPlanTest; import com.starrocks.sql.plan.TPCDSPlanTestBase; +import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TUniqueId; import com.starrocks.utframe.UtFrameUtils; import org.junit.After; @@ -31,6 +33,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -153,4 +156,55 @@ public void testSubQueryExtractedFromQ5() throws Exception { } Assert.assertTrue(rfExists); } + + @Test + public void testOlapTableSinkAsGRFCoordinator() throws Exception { + FeConstants.runningUnitTest = true; + ConnectContext ctx = starRocksAssert.getCtx(); + ctx.setExecutionId(new TUniqueId(0x33, 0x0)); + ConnectContext.threadLocalInfo.set(ctx); + ctx.getSessionVariable().setParallelExecInstanceNum(8); + ctx.getSessionVariable().setEnablePipelineEngine(true); + setTPCDSFactor(1); + + // make sure global runtime filter been push-downed to two fragments. + String sql = "insert into item \n" + + "select item.*\n" + + "from\n" + + " item inner join[shuffle] store_sales on store_sales.ss_item_sk = item.i_item_sk \n" + + " inner join [shuffle] date_dim dt on dt.d_date_sk = store_sales.ss_sold_date_sk\n" + + "where \n" + + " item.i_manufact_id = 128\n" + + " and dt.d_moy=11"; + String plan = UtFrameUtils.getVerboseFragmentPlan(ctx, sql); + String[] ss = plan.split("\\n"); + List filterLines = Stream.of(ss).filter(s -> s.contains("filter_id =")).collect(Collectors.toList()); + Assert.assertFalse(filterLines.isEmpty()); + Assert.assertTrue(filterLines.stream().anyMatch(ln -> ln.contains("remote = true"))); + ExecPlan execPlan = UtFrameUtils.getPlanAndFragment(ctx, sql).second; + DefaultCoordinator coord = new DefaultCoordinator.Factory().createQueryScheduler( + ctx, execPlan.getFragments(), execPlan.getScanNodes(), execPlan.getDescTbl().toThrift()); + coord.prepareExec(); + + ExecutionFragment rootExecFragment = coord.getExecutionDAG().getFragmentsInPreorder().get(0); + Assert.assertTrue(rootExecFragment.getPlanFragment().getSink() instanceof OlapTableSink); + Assert.assertFalse(rootExecFragment.getRuntimeFilterParams().getRuntime_filter_builder_number().isEmpty()); + + Set grfCoordinators = + coord.getExecutionDAG().getFragmentsInPreorder().stream().flatMap(execFragment -> { + Map buildRfFilters = + execFragment.getPlanFragment().getBuildRuntimeFilters(); + if (buildRfFilters == null || buildRfFilters.isEmpty()) { + return Stream.empty(); + } else { + return buildRfFilters.values().stream() + .filter(RuntimeFilterDescription::isHasRemoteTargets) + .flatMap(rf -> rf.toThrift().getRuntime_filter_merge_nodes().stream()); + } + }).collect(Collectors.toSet()); + + Assert.assertEquals(grfCoordinators.size(), 1); + Assert.assertTrue( + grfCoordinators.contains(rootExecFragment.getInstances().get(0).getWorker().getBrpcAddress())); + } }