diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java new file mode 100644 index 0000000000000..6259a8ade8cbd --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java @@ -0,0 +1,461 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe.scheduler.dag; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.planner.ExchangeNode; +import com.starrocks.planner.JoinNode; +import com.starrocks.planner.OlapScanNode; +import com.starrocks.planner.PlanFragment; +import com.starrocks.planner.PlanFragmentId; +import com.starrocks.planner.PlanNode; +import com.starrocks.planner.PlanNodeId; +import com.starrocks.planner.RuntimeFilterDescription; +import com.starrocks.planner.ScanNode; +import com.starrocks.qe.ColocatedBackendSelector; +import com.starrocks.qe.CoordinatorPreprocessor; +import com.starrocks.qe.FragmentScanRangeAssignment; +import com.starrocks.qe.scheduler.ExplainBuilder; +import com.starrocks.thrift.TEsScanRange; +import com.starrocks.thrift.THdfsScanRange; +import com.starrocks.thrift.TInternalScanRange; +import com.starrocks.thrift.TPlanFragmentDestination; +import com.starrocks.thrift.TRuntimeFilterParams; +import com.starrocks.thrift.TScanRangeParams; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; + +import static com.starrocks.qe.scheduler.dag.FragmentInstance.ABSENT_DRIVER_SEQUENCE; + +/** + * An {@code ExecutionFragment} is a part of the {@link ExecutionDAG}, and it corresponds one-to-one with a {@link PlanFragment}. + * + *

The {@code ExecutionFragment} represents a collection of multiple parallel instances of a {@link PlanFragment}. + * It contains a {@link FragmentInstance} for each parallel instance. + * Additionally, it includes information about the source and sink, including assigned scan ranges + * and destinations to the downstream. + */ +public class ExecutionFragment { + private final ExecutionDAG executionDAG; + private final int fragmentIndex; + private final PlanFragment planFragment; + private final Map scanNodes; + + private final List destinations; + private final Map numSendersPerExchange; + + private final List instances; + + private final FragmentScanRangeAssignment scanRangeAssignment; + private ColocatedBackendSelector.Assignment colocatedAssignment = null; + + public static class BucketSeqAssignment { + public List bucketSeqToInstance; + public List bucketSeqToDriverSeq; + public List bucketSeqToPartition; + } + private BucketSeqAssignment cachedBucketSeqAssignment = null; + private boolean bucketSeqToInstanceForFilterIsSet = false; + + private final TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams(); + + private Boolean cachedIsColocated = null; + private Boolean cachedIsReplicated = null; + private Boolean cachedIsLocalBucketShuffleJoin = null; + + private boolean isRightOrFullBucketShuffle = false; + // used for phased schedule + private boolean isScheduled = false; + private boolean needReportFragmentFinish = false; + + public ExecutionFragment(ExecutionDAG executionDAG, PlanFragment planFragment, int fragmentIndex) { + this.executionDAG = executionDAG; + this.fragmentIndex = fragmentIndex; + this.planFragment = planFragment; + this.scanNodes = planFragment.collectScanNodes(); + + this.destinations = Lists.newArrayList(); + this.numSendersPerExchange = Maps.newHashMap(); + + this.instances = Lists.newArrayList(); + this.scanRangeAssignment = new FragmentScanRangeAssignment(); + } + + public String getExplainString() { + ExplainBuilder builder = new ExplainBuilder(); + builder.addValue(String.format("PLAN FRAGMENT %d(%s)", fragmentIndex, getFragmentId()), () -> { + builder.addValue("DOP", planFragment.getPipelineDop()); + builder.addValue("INSTANCES", () -> instances.forEach(instance -> instance.buildExplainString(builder))); + }); + return builder.build(); + } + + public int getFragmentIndex() { + return fragmentIndex; + } + + public PlanFragment getPlanFragment() { + return planFragment; + } + + public PlanFragmentId getFragmentId() { + return planFragment.getFragmentId(); + } + + public Collection getScanNodes() { + return scanNodes.values(); + } + + public ScanNode getScanNode(PlanNodeId scanId) { + return scanNodes.get(scanId); + } + + public ExecutionDAG getExecutionDAG() { + return executionDAG; + } + + public void setLayoutInfosForRuntimeFilters() { + if (bucketSeqToInstanceForFilterIsSet) { + return; + } + bucketSeqToInstanceForFilterIsSet = true; + + BucketSeqAssignment bucketSeqAssignment = getBucketSeqAssignment(); + for (RuntimeFilterDescription rf : planFragment.getBuildRuntimeFilters().values()) { + rf.setNumInstances(instances.size()); + rf.setNumDriversPerInstance(planFragment.getPipelineDop()); + + if (bucketSeqAssignment == null) { + return; + } + rf.setBucketSeqToInstance(bucketSeqAssignment.bucketSeqToInstance); + rf.setBucketSeqToDriverSeq(bucketSeqAssignment.bucketSeqToDriverSeq); + rf.setBucketSeqToPartition(bucketSeqAssignment.bucketSeqToPartition); + } + } + + public FragmentScanRangeAssignment getScanRangeAssignment() { + return scanRangeAssignment; + } + + public ColocatedBackendSelector.Assignment getColocatedAssignment() { + return colocatedAssignment; + } + + public ColocatedBackendSelector.Assignment getOrCreateColocatedAssignment(OlapScanNode scanNode) { + if (colocatedAssignment == null) { + final int numOlapScanNodes = scanNodes.values().stream().mapToInt(node -> node instanceof OlapScanNode ? 1 : 0).sum(); + colocatedAssignment = new ColocatedBackendSelector.Assignment(scanNode, numOlapScanNodes); + } + return colocatedAssignment; + } + + public int getBucketNum() { + if (colocatedAssignment == null) { + return 0; + } + return colocatedAssignment.getBucketNum(); + } + + public BucketSeqAssignment getBucketSeqAssignment() { + if (cachedBucketSeqAssignment != null) { + return cachedBucketSeqAssignment; + } + + if (colocatedAssignment == null) { + return null; + } + + int numBuckets = getBucketNum(); + Integer[] bucketSeqToInstance = new Integer[numBuckets]; + Integer[] bucketSeqToDriverSeq = new Integer[numBuckets]; + Integer[] bucketSeqToPartition = new Integer[numBuckets]; + // some buckets are pruned, so set the corresponding instance ordinal to BUCKET_ABSENT to indicate + // absence of buckets. + Arrays.fill(bucketSeqToInstance, CoordinatorPreprocessor.BUCKET_ABSENT); + Arrays.fill(bucketSeqToDriverSeq, CoordinatorPreprocessor.BUCKET_ABSENT); + Arrays.fill(bucketSeqToPartition, CoordinatorPreprocessor.BUCKET_ABSENT); + boolean assignBucketsAmongDrivers = + instances.stream().flatMap(instance -> instance.getBucketSeqToDriverSeq().values().stream()) + .noneMatch(driverSeq -> driverSeq.equals(ABSENT_DRIVER_SEQUENCE)); + + int nextPartition = 0; + for (FragmentInstance instance : instances) { + if (assignBucketsAmongDrivers) { + List> sortedBucketSeqToDriverSeq = + instance.getBucketSeqToDriverSeq().entrySet().stream() + .sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); + for (Map.Entry entry : sortedBucketSeqToDriverSeq) { + Integer bucketSeq = entry.getKey(); + Integer driverSeq = entry.getValue(); + Preconditions.checkState(bucketSeq < numBuckets, + "bucketSeq exceeds bucketNum in colocate Fragment"); + bucketSeqToInstance[bucketSeq] = instance.getIndexInFragment(); + bucketSeqToDriverSeq[bucketSeq] = driverSeq; + bucketSeqToPartition[bucketSeq] = nextPartition++; + } + } else { + for (Integer bucketSeq : instance.getBucketSeqs()) { + Preconditions.checkState(bucketSeq < numBuckets, + "bucketSeq exceeds bucketNum in colocate Fragment"); + bucketSeqToInstance[bucketSeq] = instance.getIndexInFragment(); + } + } + } + + cachedBucketSeqAssignment = new BucketSeqAssignment(); + cachedBucketSeqAssignment.bucketSeqToInstance = Arrays.asList(bucketSeqToInstance); + if (assignBucketsAmongDrivers) { + cachedBucketSeqAssignment.bucketSeqToDriverSeq = Arrays.asList(bucketSeqToDriverSeq); + cachedBucketSeqAssignment.bucketSeqToPartition = Arrays.asList(bucketSeqToPartition); + } + return cachedBucketSeqAssignment; + } + + public void addDestination(TPlanFragmentDestination destination) { + destinations.add(destination); + } + + public List getDestinations() { + return destinations; + } + + public int childrenSize() { + return planFragment.getChildren().size(); + } + + public ExecutionFragment getChild(int i) { + return executionDAG.getFragment(planFragment.getChild(i).getFragmentId()); + } + + public List getInstances() { + return instances; + } + + public void addInstance(FragmentInstance instance) { + instance.setIndexInFragment(instances.size()); + instances.add(instance); + } + + public void shuffleInstances(Random random) { + Collections.shuffle(instances, random); + for (int i = 0; i < instances.size(); i++) { + instances.get(i).setIndexInFragment(i); + } + } + + public PlanNode getLeftMostNode() { + PlanNode node = planFragment.getPlanRoot(); + while (!node.getChildren().isEmpty() && !(node instanceof ExchangeNode)) { + node = node.getChild(0); + } + return node; + } + + public Map getNumSendersPerExchange() { + return numSendersPerExchange; + } + + public TRuntimeFilterParams getRuntimeFilterParams() { + return runtimeFilterParams; + } + + public boolean isColocated() { + if (cachedIsColocated != null) { + return cachedIsColocated; + } + + cachedIsColocated = isColocated(planFragment.getPlanRoot()); + return cachedIsColocated; + } + + public boolean isReplicated() { + if (cachedIsReplicated != null) { + return cachedIsReplicated; + } + + cachedIsReplicated = isReplicated(planFragment.getPlanRoot()); + return cachedIsReplicated; + } + + public boolean isLocalBucketShuffleJoin() { + if (cachedIsLocalBucketShuffleJoin != null) { + return cachedIsLocalBucketShuffleJoin; + } + + cachedIsLocalBucketShuffleJoin = isLocalBucketShuffleJoin(planFragment.getPlanRoot()); + return cachedIsLocalBucketShuffleJoin; + } + + public boolean isRightOrFullBucketShuffle() { + isLocalBucketShuffleJoin(); // isRightOrFullBucketShuffle is calculated when calculating isBucketShuffleJoin. + return isRightOrFullBucketShuffle; + } + + // Append range information + // [tablet_id(version),tablet_id(version)] + public void appendScanRange(StringBuilder sb, List params) { + sb.append("range=["); + int idx = 0; + for (TScanRangeParams range : params) { + TInternalScanRange internalScanRange = range.getScan_range().getInternal_scan_range(); + if (internalScanRange != null) { + if (idx++ != 0) { + sb.append(","); + } + sb.append("{tid=").append(internalScanRange.getTablet_id()) + .append(",ver=").append(internalScanRange.getVersion()).append("}"); + } + TEsScanRange esScanRange = range.getScan_range().getEs_scan_range(); + if (esScanRange != null) { + sb.append("{ index=").append(esScanRange.getIndex()) + .append(", shardid=").append(esScanRange.getShard_id()) + .append("}"); + } + THdfsScanRange hdfsScanRange = range.getScan_range().getHdfs_scan_range(); + if (hdfsScanRange != null) { + sb.append("{relative_path=").append(hdfsScanRange.getRelative_path()) + .append(", offset=").append(hdfsScanRange.getOffset()) + .append(", length=").append(hdfsScanRange.getLength()) + .append("}"); + } + } + sb.append("]"); + } + + public void appendTo(StringBuilder sb) { + // append fragment + sb.append("{plan="); + planFragment.getPlanRoot().appendTrace(sb); + sb.append(",instance=["); + // append instance + for (int i = 0; i < instances.size(); ++i) { + if (i != 0) { + sb.append(","); + } + + FragmentInstance instance = instances.get(i); + + Map> scanRanges = + scanRangeAssignment.get(instance.getWorkerId()); + sb.append("{"); + sb.append("id=").append(DebugUtil.printId(instance.getInstanceId())); + sb.append(",host=").append(instance.getWorker().getAddress()); + if (scanRanges == null) { + sb.append("}"); + continue; + } + sb.append(",range=["); + int eIdx = 0; + for (Map.Entry> entry : scanRanges.entrySet()) { + if (eIdx++ != 0) { + sb.append(","); + } + sb.append("id").append(entry.getKey()).append(","); + appendScanRange(sb, entry.getValue()); + } + sb.append("]"); + sb.append("}"); + } + sb.append("]"); // end of instances + sb.append("}"); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + appendTo(sb); + return sb.toString(); + } + + private boolean isColocated(PlanNode root) { + if (root instanceof ExchangeNode) { + return false; + } + + if (root.isColocate()) { + return true; + } + + if (root.isReplicated()) { + // Only check left if node is replicate join + return isColocated(root.getChild(0)); + } else { + return root.getChildren().stream().anyMatch(this::isColocated); + } + } + + public boolean isRuntimeFilterCoordinator() { + return runtimeFilterParams.isSetRuntime_filter_builder_number(); + } + + private boolean isReplicated(PlanNode root) { + if (root instanceof ExchangeNode) { + return false; + } + + if (root.isReplicated()) { + return true; + } + + return root.getChildren().stream().anyMatch(PlanNode::isReplicated); + } + + private boolean isLocalBucketShuffleJoin(PlanNode root) { + if (root instanceof ExchangeNode) { + return false; + } + + boolean hasBucketShuffle = false; + if (root instanceof JoinNode) { + JoinNode joinNode = (JoinNode) root; + if (joinNode.isLocalHashBucket()) { + hasBucketShuffle = true; + isRightOrFullBucketShuffle |= joinNode.getJoinOp().isFullOuterJoin() || joinNode.getJoinOp().isRightJoin(); + } + } + + for (PlanNode child : root.getChildren()) { + hasBucketShuffle |= isLocalBucketShuffleJoin(child); + } + + return hasBucketShuffle; + } + + public boolean isScheduled() { + return isScheduled; + } + + public void setIsScheduled(boolean isScheduled) { + this.isScheduled = isScheduled; + } + + public boolean isNeedReportFragmentFinish() { + return needReportFragmentFinish; + } + public void setNeedReportFragmentFinish(boolean needReportFragmentFinish) { + this.needReportFragmentFinish = needReportFragmentFinish; + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/TestBucketShuffleRightJoin.java b/fe/fe-core/src/test/java/com/starrocks/qe/TestBucketShuffleRightJoin.java new file mode 100644 index 0000000000000..0bc5527676584 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/TestBucketShuffleRightJoin.java @@ -0,0 +1,88 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe; + +import com.starrocks.common.FeConstants; +import com.starrocks.common.Pair; +import com.starrocks.qe.scheduler.dag.ExecutionFragment; +import com.starrocks.sql.plan.ExecPlan; +import com.starrocks.statistic.StatsConstants; +import com.starrocks.utframe.StarRocksAssert; +import com.starrocks.utframe.UtFrameUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; + +import static com.starrocks.sql.optimizer.statistics.CachedStatisticStorageTest.DEFAULT_CREATE_TABLE_TEMPLATE; + +public class TestBucketShuffleRightJoin { + + private static StarRocksAssert starRocksAssert; + + @BeforeClass + public static void setUp() { + UtFrameUtils.createMinStarRocksCluster(); + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + FeConstants.runningUnitTest = true; + starRocksAssert = new StarRocksAssert(ctx); + try { + starRocksAssert.withDatabase(StatsConstants.STATISTICS_DB_NAME) + .useDatabase(StatsConstants.STATISTICS_DB_NAME) + .withTable(DEFAULT_CREATE_TABLE_TEMPLATE); + + starRocksAssert.withDatabase("test").useDatabase("test"); + String tableFmt = "CREATE TABLE `%s` (\n" + + " `c0` bigint(20) NOT NULL COMMENT \"\",\n" + + " `c1` bigint(20) NOT NULL COMMENT \"\",\n" + + " `c2` bigint(20) NOT NULL COMMENT \"\"\n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`c0`)\n" + + "COMMENT \"OLAP\"\n" + + "DISTRIBUTED BY HASH(`c0`) BUCKETS 9 \n" + + "PROPERTIES (\n" + + "\"compression\" = \"LZ4\",\n" + + "\"replication_num\" = \"1\"\n" + + ");"; + for (String tableName : Arrays.asList("t0", "t1", "t2")) { + String createTblSql = String.format(tableFmt, tableName); + starRocksAssert.withTable(createTblSql); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + FeConstants.runningUnitTest = true; + } + + @Test + public void test() throws Exception { + ConnectContext ctx = starRocksAssert.getCtx(); + String sql = "select t0.c0,t0.c1,t0.c2,t1.c2 as ab from " + + "(select * from t1 where c0 in (8)) t1 right outer join[bucket] " + + "(select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 " + + "left join[bucket] t2 on t2.c0 = t1.c0"; + Pair explainAndExecPlan = UtFrameUtils.getPlanAndFragment(ctx, sql); + String plan = explainAndExecPlan.first; + ExecPlan execPlan = explainAndExecPlan.second; + DefaultCoordinator coord = new DefaultCoordinator.Factory().createQueryScheduler( + ctx, execPlan.getFragments(), execPlan.getScanNodes(), execPlan.getDescTbl().toThrift()); + coord.prepareExec(); + + boolean bucketShuffleRightJoinPresent = coord.getExecutionDAG().getFragmentsInPreorder() + .stream().anyMatch(ExecutionFragment::isRightOrFullBucketShuffle); + Assert.assertTrue(plan, bucketShuffleRightJoinPresent); + } +} diff --git a/test/sql/test_bucket_shuffle_right_join/R/test_bucket_shuffle_right_join b/test/sql/test_bucket_shuffle_right_join/R/test_bucket_shuffle_right_join new file mode 100644 index 0000000000000..c79964b9999fc --- /dev/null +++ b/test/sql/test_bucket_shuffle_right_join/R/test_bucket_shuffle_right_join @@ -0,0 +1,270 @@ +-- name: test_bucket_shuffle_right_join + DROP TABLE if exists t0; +-- result: +-- !result + CREATE TABLE if not exists t0 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +-- result: +-- !result + DROP TABLE if exists t1; +-- result: +-- !result + CREATE TABLE if not exists t1 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +-- result: +-- !result + DROP TABLE if exists t2; +-- result: +-- !result + CREATE TABLE if not exists t2 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +-- result: +-- !result +INSERT INTO t0 +(c0, c1, c2) +VALUES +('0', '0', '0'), +('1', '1', '1'), +('2', '2', '2'), +('3', '3', '3'), +('4', '4', '4'), +('5', '5', '5'), +('6', '6', '6'), +('7', '7', '7'), +('8', '8', '8'), +('9', '9', '9'); +-- result: +-- !result +INSERT INTO t1 +(c0, c1, c2) +select * from t0; +-- result: +-- !result +INSERT INTO t2 +(c0, c1, c2) +select * from t0; +-- result: +-- !result + DROP TABLE if exists r; +-- result: +-- !result + CREATE TABLE if not exists r + ( + fp BIGINT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`fp` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`fp` ) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +-- result: +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (0)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (0)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (1)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (1)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (2)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (2)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (3)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (3)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (4)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (4)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (5)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (5)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (6)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (6)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (7)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (7)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (8)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (8)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result +truncate table r; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (9)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (9)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +-- result: +-- !result +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +-- result: +1 1 +-- !result \ No newline at end of file diff --git a/test/sql/test_bucket_shuffle_right_join/T/test_bucket_shuffle_right_join b/test/sql/test_bucket_shuffle_right_join/T/test_bucket_shuffle_right_join new file mode 100644 index 0000000000000..17952c884d13f --- /dev/null +++ b/test/sql/test_bucket_shuffle_right_join/T/test_bucket_shuffle_right_join @@ -0,0 +1,165 @@ +-- name: test_bucket_shuffle_right_join + DROP TABLE if exists t0; + + CREATE TABLE if not exists t0 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); + DROP TABLE if exists t1; + + CREATE TABLE if not exists t1 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); + DROP TABLE if exists t2; + + CREATE TABLE if not exists t2 + ( + c0 BIGINT NOT NULL, + c1 BIGINT NOT NULL, + c2 BIGINT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c0` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c0` ) BUCKETS 9 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +INSERT INTO t0 +(c0, c1, c2) +VALUES +('0', '0', '0'), +('1', '1', '1'), +('2', '2', '2'), +('3', '3', '3'), +('4', '4', '4'), +('5', '5', '5'), +('6', '6', '6'), +('7', '7', '7'), +('8', '8', '8'), +('9', '9', '9'); + +INSERT INTO t1 +(c0, c1, c2) +select * from t0; + +INSERT INTO t2 +(c0, c1, c2) +select * from t0; + + DROP TABLE if exists r; + + CREATE TABLE if not exists r + ( + fp BIGINT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`fp` ) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`fp` ) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "in_memory" = "false", + "storage_format" = "default" + ); +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (0)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (0)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (1)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (1)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (2)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (2)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (3)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (3)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (4)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (4)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (5)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (5)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (6)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (6)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (7)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (7)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (8)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (8)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r; +truncate table r; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (9)) t1 right outer join[bucket] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[bucket] t2 on t2.c0 = t1.c0) as t; +INSERT INTO r +(fp) +select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(c1,0))+murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(ab,0)))) as fingerprint from (select t0.c0,t0.c1,t0.c2,t1.c2 as ab from (select * from t1 where c0 in (9)) t1 right outer join[shuffle] (select if(murmur_hash3_32(c0)=0,c0,NULL) as c0,c1,c2 from t0) t0 on t0.c0 = t1.c0 left join[shuffle] t2 on t2.c0 = t1.c0) as t; +select assert_true(count(fp)=2), assert_true(count(distinct fp)=1) from r;