Skip to content

Commit

Permalink
[UT] Use a stable UT instead of unstable IT(test_global_runtime_filte…
Browse files Browse the repository at this point in the history
…r_olap_table_sink) (backport #44519)

Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Apr 23, 2024
1 parent 3f77a0d commit 9359ff0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 361 deletions.
58 changes: 58 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/TPCDSCoordTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package com.starrocks.qe;

import com.starrocks.common.FeConstants;
import com.starrocks.planner.OlapTableSink;
import com.starrocks.planner.PlanFragmentId;
import com.starrocks.planner.RuntimeFilterDescription;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.TPCDSPlanTest;
import com.starrocks.sql.plan.TPCDSPlanTestBase;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.utframe.UtFrameUtils;
import org.junit.After;
Expand All @@ -32,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -155,4 +159,58 @@ public void testSubQueryExtractedFromQ5() throws Exception {
}
Assert.assertTrue(rfExists);
}

@Test
public void testOlapTableSinkAsGRFCoordinator() throws Exception {
FeConstants.runningUnitTest = true;
ConnectContext ctx = starRocksAssert.getCtx();
ctx.setExecutionId(new TUniqueId(0x33, 0x0));
ConnectContext.threadLocalInfo.set(ctx);
ctx.getSessionVariable().setParallelExecInstanceNum(8);
ctx.getSessionVariable().setEnablePipelineEngine(true);
setTPCDSFactor(1);

// make sure global runtime filter been push-downed to two fragments.
String sql = "insert into item \n" +
"select item.*\n" +
"from\n" +
" item inner join[shuffle] store_sales on store_sales.ss_item_sk = item.i_item_sk \n" +
" inner join [shuffle] date_dim dt on dt.d_date_sk = store_sales.ss_sold_date_sk\n" +
"where \n" +
" item.i_manufact_id = 128\n" +
" and dt.d_moy=11";
String plan = UtFrameUtils.getVerboseFragmentPlan(ctx, sql);
String[] ss = plan.split("\\n");
List<String> filterLines = Stream.of(ss).filter(s -> s.contains("filter_id =")).collect(Collectors.toList());
Assert.assertFalse(filterLines.isEmpty());
Assert.assertTrue(filterLines.stream().anyMatch(ln -> ln.contains("remote = true")));
ExecPlan execPlan = UtFrameUtils.getPlanAndFragment(ctx, sql).second;
Coordinator coord = new Coordinator(ctx, execPlan.getFragments(), execPlan.getScanNodes(),
execPlan.getDescTbl().toThrift());
coord.prepareExec();

PlanFragmentId rootFragmentId = coord.getFragments().get(0).getFragmentId();
CoordinatorPreprocessor.FragmentExecParams rootExecFragment =
coord.getFragmentExecParamsMap().get(rootFragmentId);
CoordinatorPreprocessor.FInstanceExecParam rootFInstance = rootExecFragment.instanceExecParams.get(0);
Assert.assertTrue(rootExecFragment.fragment.getSink() instanceof OlapTableSink);
Assert.assertTrue(rootFInstance.isRuntimeFilterCoordinator());
Assert.assertFalse(rootFInstance.runtimeFilterParams.getRuntime_filter_builder_number().isEmpty());

Set<TNetworkAddress> grfCoordinators =
coord.getFragmentExecParamsMap().values().stream().flatMap(execFragment -> {
Map<Integer, RuntimeFilterDescription> buildRfFilters =
execFragment.fragment.getBuildRuntimeFilters();
if (buildRfFilters == null || buildRfFilters.isEmpty()) {
return Stream.empty();
} else {
return buildRfFilters.values().stream()
.filter(RuntimeFilterDescription::isHasRemoteTargets)
.flatMap(rf -> rf.toThrift().getRuntime_filter_merge_nodes().stream());
}
}).collect(Collectors.toSet());

Assert.assertEquals(grfCoordinators.size(), 1);
Assert.assertTrue(grfCoordinators.contains(SystemInfoService.toBrpcHost(rootFInstance.getHost())));
}
}

This file was deleted.

Loading

0 comments on commit 9359ff0

Please sign in to comment.