diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java index 649e2fa9bb28ea..fa72f8c01050b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java @@ -17,6 +17,9 @@ package org.apache.doris.nereids.trees.plans.distribute.worker.job; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; @@ -25,16 +28,27 @@ import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TScanRangeParams; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** UnassignedScanSingleOlapTableJob */ public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob { + private static final Logger LOG = LogManager.getLogger(UnassignedScanSingleOlapTableJob.class); + private OlapScanNode olapScanNode; private final ScanWorkerSelector scanWorkerSelector; @@ -81,9 +95,136 @@ 
protected List insideMachineParallelization( // instance 5: olapScanNode1: ScanRanges([tablet_10007]) // ], // } + if (usePartitionParallelismForQueryCache(workerToScanRanges, distributeContext)) { + try { + // Best effort optimization for query cache: keep tablets in same partition + // on the same instance to reduce BE concurrency pressure. + List partitionInstances = insideMachineParallelizationByPartition(workerToScanRanges); + if (partitionInstances != null) { + return partitionInstances; + } + } catch (Exception e) { + LOG.warn("Failed to assign query cache instances by partition, fallback to default planning", + e); + } + } + return super.insideMachineParallelization(workerToScanRanges, inputJobs, distributeContext); } + private List insideMachineParallelizationByPartition( + Map workerToScanRanges) { + List selectedPartitionIds = new ArrayList<>(olapScanNode.getSelectedPartitionIds()); + Map tabletToPartitionId = buildTabletToPartitionId(selectedPartitionIds); + if (tabletToPartitionId.size() != olapScanNode.getScanTabletIds().size()) { + return null; + } + + ConnectContext context = statementContext.getConnectContext(); + List instances = new ArrayList<>(); + for (Map.Entry entry : workerToScanRanges.entrySet()) { + DistributedPlanWorker worker = entry.getKey(); + ScanSource scanSource = entry.getValue().scanSource; + if (!(scanSource instanceof DefaultScanSource)) { + return null; + } + + DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource; + ScanRanges scanRanges = defaultScanSource.scanNodeToScanRanges.get(olapScanNode); + if (scanRanges == null) { + return null; + } + if (scanRanges.params.isEmpty()) { + continue; + } + + Map partitionToScanRanges = splitScanRangesByPartition(scanRanges, tabletToPartitionId); + if (partitionToScanRanges == null) { + return null; + } + + // One partition on one BE maps to one instance. Different BEs may miss some partitions. 
+ for (Long partitionId : selectedPartitionIds) { + ScanRanges partitionScanRanges = partitionToScanRanges.remove(partitionId); + if (partitionScanRanges == null || partitionScanRanges.params.isEmpty()) { + continue; + } + instances.add(assignWorkerAndDataSources( + instances.size(), context.nextInstanceId(), worker, + new DefaultScanSource(ImmutableMap.of(olapScanNode, partitionScanRanges)))); + } + + if (!partitionToScanRanges.isEmpty()) { + return null; + } + } + return instances; + } + + private boolean usePartitionParallelismForQueryCache( + Map workerToScanRanges, + DistributeContext distributeContext) { + if (fragment.queryCacheParam == null || workerToScanRanges.isEmpty()) { + return false; + } + + ConnectContext context = statementContext.getConnectContext(); + if (context == null || useLocalShuffleToAddParallel(distributeContext)) { + return false; + } + + long totalTabletNum = olapScanNode.getScanTabletIds().size(); + int parallelPipelineTaskNum = Math.max( + context.getSessionVariable().getParallelExecInstanceNum( + olapScanNode.getScanContext().getClusterName()), 1); + long threshold = (long) parallelPipelineTaskNum * workerToScanRanges.size(); + return totalTabletNum > threshold; + } + + private Map buildTabletToPartitionId(List selectedPartitionIds) { + long selectedIndexId = olapScanNode.getSelectedIndexId(); + if (selectedIndexId == -1) { + selectedIndexId = olapScanNode.getOlapTable().getBaseIndexId(); + } + + Set scanTabletIds = new LinkedHashSet<>(olapScanNode.getScanTabletIds()); + Map tabletToPartitionId = new LinkedHashMap<>(scanTabletIds.size()); + for (Long partitionId : selectedPartitionIds) { + Partition partition = olapScanNode.getOlapTable().getPartition(partitionId); + if (partition == null) { + continue; + } + MaterializedIndex index = partition.getIndex(selectedIndexId); + if (index == null) { + continue; + } + for (Tablet tablet : index.getTablets()) { + long tabletId = tablet.getId(); + if (scanTabletIds.contains(tabletId)) { + 
tabletToPartitionId.put(tabletId, partitionId); + } + } + } + return tabletToPartitionId; + } + + private Map splitScanRangesByPartition( + ScanRanges scanRanges, Map tabletToPartitionId) { + Map partitionToScanRanges = new LinkedHashMap<>(); + for (int i = 0; i < scanRanges.params.size(); i++) { + TScanRangeParams scanRangeParams = scanRanges.params.get(i); + long tabletId = scanRangeParams.getScanRange().getPaloScanRange().getTabletId(); + Long partitionId = tabletToPartitionId.get(tabletId); + if (partitionId == null) { + return null; + } + partitionToScanRanges + .computeIfAbsent(partitionId, id -> new ScanRanges()) + .addScanRange(scanRangeParams, scanRanges.bytes.get(i)); + } + return partitionToScanRanges; + } + @Override protected List fillUpAssignedJobs(List assignedJobs, DistributedPlanWorkerManager workerManager, ListMultimap inputJobs) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java new file mode 100644 index 00000000000000..097e7930959bff --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java @@ -0,0 +1,380 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker.job; + +import org.apache.doris.catalog.LocalTablet; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.ScanContext; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.thrift.TPaloScanRange; +import org.apache.doris.thrift.TQueryCacheParam; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeParams; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class UnassignedScanSingleOlapTableJobTest { + @Test + public void testQueryCacheAssignByPartition() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + connectContext.setQueryId(new TUniqueId(1, 1)); + connectContext.getSessionVariable().parallelPipelineTaskNum = 1; + StatementContext statementContext = new StatementContext( + connectContext, new OriginStatement("select * from t", 0)); + connectContext.setStatementContext(statementContext); + + long partitionOne = 100L; + long partitionTwo = 200L; + long selectedIndexId = 10L; + Map tabletToPartition = ImmutableMap.of( + 1L, partitionOne, + 2L, partitionOne, + 3L, partitionOne, + 4L, partitionTwo, + 5L, partitionTwo, + 6L, partitionTwo + ); + + OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class); + OlapTable olapTable = Mockito.mock(OlapTable.class); + Mockito.when(olapScanNode.getSelectedPartitionIds()) + .thenReturn(Arrays.asList(partitionOne, partitionTwo)); + Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId); + Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable); + Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY); + Mockito.when(olapScanNode.getScanTabletIds()) + .thenReturn(new ArrayList<>(tabletToPartition.keySet())); + + Partition firstPartition = Mockito.mock(Partition.class); + MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class); + Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition); + Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex); + Mockito.when(firstIndex.getTablets()).thenReturn(ImmutableList.of( + tablet(1L), tablet(2L), tablet(3L) + )); + + Partition secondPartition = Mockito.mock(Partition.class); + MaterializedIndex 
secondIndex = Mockito.mock(MaterializedIndex.class); + Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition); + Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex); + Mockito.when(secondIndex.getTablets()).thenReturn(ImmutableList.of( + tablet(4L), tablet(5L), tablet(6L) + )); + + PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM); + fragment.queryCacheParam = new TQueryCacheParam(); + + DistributedPlanWorker worker1 = new TestWorker(1L, "be1"); + DistributedPlanWorker worker2 = new TestWorker(2L, "be2"); + Map workerToScanSources + = new LinkedHashMap<>(); + // Same partition tablets on one BE should be grouped into one instance. + workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L))))); + workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L))))); + + ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class); + Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket( + Mockito.eq(olapScanNode), Mockito.eq(connectContext) + )).thenReturn(workerToScanSources); + + UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob( + statementContext, + fragment, + olapScanNode, + ArrayListMultimap.create(), + scanWorkerSelector + ); + DistributeContext distributeContext = new DistributeContext( + Mockito.mock(DistributedPlanWorkerManager.class), + true + ); + + List assignedJobs = unassignedJob.computeAssignedJobs( + distributeContext, ArrayListMultimap.create()); + + Assertions.assertEquals(4, assignedJobs.size()); + + Map>> workerToInstanceTablets = new HashMap<>(); + for (AssignedJob assignedJob : assignedJobs) { + DefaultScanSource defaultScanSource = (DefaultScanSource) assignedJob.getScanSource(); + ScanRanges ranges = 
defaultScanSource.scanNodeToScanRanges.get(olapScanNode); + Set tabletIds = ranges.params.stream() + .map(param -> param.getScanRange().getPaloScanRange().getTabletId()) + .collect(Collectors.toCollection(HashSet::new)); + Set partitionIds = tabletIds.stream() + .map(tabletToPartition::get) + .collect(Collectors.toSet()); + + // Every instance must only contain tablets from one partition. + Assertions.assertEquals(1, partitionIds.size()); + + workerToInstanceTablets.computeIfAbsent( + assignedJob.getAssignedWorker().id(), k -> new HashSet<>() + ).add(tabletIds); + } + + Map>> expected = new HashMap<>(); + expected.put(1L, new HashSet<>(Arrays.asList( + new HashSet<>(Arrays.asList(1L, 2L)), + new HashSet<>(Arrays.asList(4L)) + ))); + expected.put(2L, new HashSet<>(Arrays.asList( + new HashSet<>(Arrays.asList(3L)), + new HashSet<>(Arrays.asList(5L, 6L)) + ))); + + // Different partitions are split into different instances on each BE. + Assertions.assertEquals(expected, workerToInstanceTablets); + } + + @Test + public void testQueryCacheFallbackToDefaultWhenPartitionMappingIncomplete() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + connectContext.setQueryId(new TUniqueId(2, 2)); + connectContext.getSessionVariable().parallelPipelineTaskNum = 1; + StatementContext statementContext = new StatementContext( + connectContext, new OriginStatement("select * from t", 0)); + connectContext.setStatementContext(statementContext); + + long partitionOne = 100L; + long selectedIndexId = 10L; + + OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class); + OlapTable olapTable = Mockito.mock(OlapTable.class); + // Intentionally miss partitionTwo to trigger fallback. 
+ Mockito.when(olapScanNode.getSelectedPartitionIds()) + .thenReturn(ImmutableList.of(partitionOne)); + Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId); + Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable); + Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY); + Mockito.when(olapScanNode.getScanTabletIds()) + .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L))); + + Partition firstPartition = Mockito.mock(Partition.class); + MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class); + Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition); + Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex); + Mockito.when(firstIndex.getTablets()) + .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), tablet(3L))); + + PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM); + fragment.queryCacheParam = new TQueryCacheParam(); + + DistributedPlanWorker worker1 = new TestWorker(1L, "be1"); + DistributedPlanWorker worker2 = new TestWorker(2L, "be2"); + Map workerToScanSources + = new LinkedHashMap<>(); + workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L))))); + workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L))))); + + ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class); + Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket( + Mockito.eq(olapScanNode), Mockito.eq(connectContext) + )).thenReturn(workerToScanSources); + + UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob( + statementContext, + fragment, + olapScanNode, + ArrayListMultimap.create(), + scanWorkerSelector + ); + + List assignedJobs = unassignedJob.computeAssignedJobs( + new 
DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true), + ArrayListMultimap.create()); + + // query cache default planning uses one instance per tablet. + Assertions.assertEquals(6, assignedJobs.size()); + } + + @Test + public void testNonQueryCacheUseDefaultPlanning() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + connectContext.setQueryId(new TUniqueId(3, 3)); + connectContext.getSessionVariable().parallelPipelineTaskNum = 1; + StatementContext statementContext = new StatementContext( + connectContext, new OriginStatement("select * from t", 0)); + connectContext.setStatementContext(statementContext); + + long partitionOne = 100L; + long partitionTwo = 200L; + long selectedIndexId = 10L; + + OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class); + OlapTable olapTable = Mockito.mock(OlapTable.class); + Mockito.when(olapScanNode.getSelectedPartitionIds()) + .thenReturn(Arrays.asList(partitionOne, partitionTwo)); + Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId); + Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable); + Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY); + Mockito.when(olapScanNode.getScanTabletIds()) + .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L))); + + Partition firstPartition = Mockito.mock(Partition.class); + MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class); + Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition); + Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex); + Mockito.when(firstIndex.getTablets()) + .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), tablet(3L))); + + Partition secondPartition = Mockito.mock(Partition.class); + MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class); + Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition); + 
Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex); + Mockito.when(secondIndex.getTablets()) + .thenReturn(ImmutableList.of(tablet(4L), tablet(5L), tablet(6L))); + + PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM); + // No query cache param, must use default planning. + fragment.setParallelExecNum(10); + + DistributedPlanWorker worker1 = new TestWorker(1L, "be1"); + DistributedPlanWorker worker2 = new TestWorker(2L, "be2"); + Map workerToScanSources + = new LinkedHashMap<>(); + workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L))))); + workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource( + ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L))))); + + ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class); + Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket( + Mockito.eq(olapScanNode), Mockito.eq(connectContext) + )).thenReturn(workerToScanSources); + + UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob( + statementContext, + fragment, + olapScanNode, + ArrayListMultimap.create(), + scanWorkerSelector + ); + + List assignedJobs = unassignedJob.computeAssignedJobs( + new DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true), + ArrayListMultimap.create()); + + // default planning splits by tablet count when parallelExecNum is large enough. + Assertions.assertEquals(6, assignedJobs.size()); + } + + private static Tablet tablet(long tabletId) { + return new LocalTablet(tabletId); + } + + private static ScanRanges scanRanges(long... 
tabletIds) { + ScanRanges scanRanges = new ScanRanges(); + for (long tabletId : tabletIds) { + TPaloScanRange paloScanRange = new TPaloScanRange(); + paloScanRange.setTabletId(tabletId); + TScanRange scanRange = new TScanRange(); + scanRange.setPaloScanRange(paloScanRange); + TScanRangeParams scanRangeParams = new TScanRangeParams(); + scanRangeParams.setScanRange(scanRange); + scanRanges.addScanRange(scanRangeParams, 1L); + } + return scanRanges; + } + + private static class TestWorker implements DistributedPlanWorker { + private final long id; + private final String address; + + private TestWorker(long id, String address) { + this.id = id; + this.address = address; + } + + @Override + public long getCatalogId() { + return 0; + } + + @Override + public long id() { + return id; + } + + @Override + public String address() { + return address; + } + + @Override + public String host() { + return address; + } + + @Override + public int port() { + return 0; + } + + @Override + public String brpcAddress() { + return address; + } + + @Override + public int brpcPort() { + return 0; + } + + @Override + public boolean available() { + return true; + } + } +} diff --git a/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out new file mode 100644 index 00000000000000..111f7a196fa64a --- /dev/null +++ b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this
+-- !partition_instance_query_result --
+/a 75
+/b 105
+/c 135
+/d 165
diff --git a/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
new file mode 100644
index 00000000000000..6d2281e3ef6589
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Checks that, with query cache enabled, the distributed plan of a partitioned scan groups
+// all tablets of a partition into a single scan instance, and that query results stay stable.
+suite("test_partition_instance_query_cache") {
+    def tableName = "test_partition_instance"
+    def querySql = """
+        SELECT
+            url,
+            SUM(cost) AS total_cost
+        FROM ${tableName}
+        WHERE dt >= '2026-01-01'
+            AND dt < '2026-01-15'
+        GROUP BY url
+    """
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_nereids_distribute_planner=true"
+    sql "set enable_query_cache=true"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_sql_cache=false"
+
+    // 3 partitions x 3 buckets = 9 tablets.
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            dt DATE,
+            user_id INT,
+            url STRING,
+            cost BIGINT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(dt, user_id)
+        PARTITION BY RANGE(dt)
+        (
+            PARTITION p20260101 VALUES LESS THAN ("2026-01-05"),
+            PARTITION p20260105 VALUES LESS THAN ("2026-01-10"),
+            PARTITION p20260110 VALUES LESS THAN ("2026-01-15")
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+        ('2026-01-01',1,'/a',10),
+        ('2026-01-01',2,'/b',20),
+        ('2026-01-02',3,'/c',30),
+        ('2026-01-03',4,'/d',40),
+
+        ('2026-01-06',1,'/a',15),
+        ('2026-01-06',2,'/b',25),
+        ('2026-01-07',3,'/c',35),
+        ('2026-01-08',4,'/d',45),
+
+        ('2026-01-11',1,'/a',50),
+        ('2026-01-11',2,'/b',60),
+        ('2026-01-12',3,'/c',70),
+        ('2026-01-13',4,'/d',80)
+    """
+
+    order_qt_partition_instance_query_result """
+        ${querySql}
+        ORDER BY url
+    """
+
+    // Order-insensitive representation of a result set: one "a|b|..." string per row, sorted.
+    def normalize = { rows ->
+        return rows.collect { row -> row.collect { col -> String.valueOf(col) }.join("|") }.sort()
+    }
+
+    // Repeated runs must return the same rows as the first run.
+    def baseline = normalize(sql(querySql))
+    for (int i = 0; i < 3; i++) {
+        assertEquals(baseline, normalize(sql(querySql)))
+    }
+
+    explain {
+        sql(querySql)
+        contains("DIGEST")
+    }
+
+    def distributedRows = sql("EXPLAIN DISTRIBUTED PLAN ${querySql}")
+    def distributedPlan = distributedRows.collect { it[0].toString() }.join("\n")
+    assertTrue(distributedPlan.contains("UnassignedScanSingleOlapTableJob"))
+
+    def partitionMatcher = (distributedPlan =~ /partitions=(\d+)\/(\d+)/)
+    assertTrue(partitionMatcher.find())
+    int partitionCount = Integer.parseInt(partitionMatcher.group(1))
+
+    int scanFragmentBegin = distributedPlan.indexOf("fragmentJob: UnassignedScanSingleOlapTableJob")
+    // indexOf returns -1 when the text is absent; index 0 is a valid match.
+    assertTrue(scanFragmentBegin >= 0)
+    def scanFragment = distributedPlan.substring(scanFragmentBegin)
+
+    // NOTE(review): the instance-count and one-instance-per-partition assertions below look like
+    // they assume a single backend scans every tablet of a partition - confirm for multi-BE runs.
+    int scanInstanceCount = (scanFragment =~ /StaticAssignedJob\(/).count
+    assertEquals(partitionCount, scanInstanceCount)
+
+    // Collect the tablets listed under each instance of the scan fragment.
+    def instanceToTablets = [:].withDefault { [] }
+    String currentInstance = null
+    scanFragment.eachLine { line ->
+        def instanceMatcher = (line =~ /instanceId:\s*([0-9a-f\-]+)/)
+        if (instanceMatcher.find()) {
+            currentInstance = instanceMatcher.group(1)
+            instanceToTablets[currentInstance] = []
+        }
+
+        def tabletMatcher = (line =~ /tablet\s+(\d+)/)
+        if (tabletMatcher.find() && currentInstance != null) {
+            instanceToTablets[currentInstance] << tabletMatcher.group(1)
+        }
+    }
+
+    assertEquals(partitionCount, instanceToTablets.size())
+    instanceToTablets.each { _, tablets ->
+        assertTrue(tablets.size() > 0)
+    }
+
+    def tabletToInstance = [:]
+    instanceToTablets.each { instanceId, tablets ->
+        tablets.each { tabletId ->
+            tabletToInstance[tabletId] = instanceId
+        }
+    }
+
+    // Every tablet of a partition must appear in the plan, and all under the same instance.
+    ["p20260101", "p20260105", "p20260110"].each { partitionName ->
+        def partitionTabletRows = sql("SHOW TABLETS FROM ${tableName} PARTITION(${partitionName})")
+        def partitionTabletIds = partitionTabletRows.collect { it[0].toString() }
+        assertTrue(partitionTabletIds.size() > 0)
+
+        partitionTabletIds.each { tabletId ->
+            assertTrue(tabletToInstance.containsKey(tabletId))
+        }
+
+        def partitionInstanceIds = partitionTabletIds.collect { tabletId -> tabletToInstance[tabletId] }.toSet()
+        assertEquals(1, partitionInstanceIds.size())
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}