From e437bad65c3f22080ccb04f27622d9a2f5eab5fe Mon Sep 17 00:00:00 2001
From: 924060929
Date: Tue, 17 Mar 2026 20:16:40 +0800
Subject: [PATCH] [Fix](query cache) support partition-based instance
 parallelism (#60974)

### What problem does this PR solve?

When the total tablet count is much larger than the pipeline capacity,
one-tablet-per-instance planning creates excessive BE concurrency pressure
in query-cache workloads.

Trigger partition-based planning when:

    total_tablets > parallel_pipeline_task_num * participating_be_num

Before: instance_num ~= total_tablets
After:  instance_num ~= partitions_on_each_be

Per-BE planning example:

    BE1 tablets: p1[t1,t2], p2[t3]  -> instances: [p1:t1,t2], [p2:t3]
    BE2 tablets: p1[t4], p2[t5,t6]  -> instances: [p1:t4], [p2:t5,t6]

This keeps tablets from the same partition in one instance and separates
different partitions into different instances. If the partition mapping is
incomplete or partition planning fails, fall back to default planning for
correctness.

Tests:
- partition-based planning path
- fallback-to-default path (incomplete mapping)
- non-query-cache default planning path
---
 .../job/UnassignedScanSingleOlapTableJob.java |: 141 +++++++
 .../UnassignedScanSingleOlapTableJobTest.java | 380 ++++++++++++++++++
 .../test_partition_instance_query_cache.out   |   6 +
 ...test_partition_instance_query_cache.groovy | 152 +++++++
 4 files changed, 679 insertions(+)
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
 create mode 100644 regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
 create mode 100644 regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index 649e2fa9bb28ea..fa72f8c01050b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -25,16 +28,27 @@
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TScanRangeParams;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /** UnassignedScanSingleOlapTableJob */
 public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob {
+    private static final Logger LOG =
+            LogManager.getLogger(UnassignedScanSingleOlapTableJob.class);
+
     private OlapScanNode olapScanNode;
     private final ScanWorkerSelector scanWorkerSelector;
 
@@ -81,9 +95,136 @@ protected List<AssignedJob> insideMachineParallelization(
         //   instance 5: olapScanNode1: ScanRanges([tablet_10007])
         //   ],
         // }
+        if (usePartitionParallelismForQueryCache(workerToScanRanges, distributeContext)) {
+            try {
+                // Best-effort optimization for query cache: keep tablets of the same partition
+                // in the same instance to reduce BE concurrency pressure.
+                List<AssignedJob> partitionInstances = insideMachineParallelizationByPartition(workerToScanRanges);
+                if (partitionInstances != null) {
+                    return partitionInstances;
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to assign query cache instances by partition, falling back to default planning",
+                        e);
+            }
+        }
+
         return super.insideMachineParallelization(workerToScanRanges, inputJobs, distributeContext);
     }
 
+    private List<AssignedJob> insideMachineParallelizationByPartition(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges) {
+        List<Long> selectedPartitionIds = new ArrayList<>(olapScanNode.getSelectedPartitionIds());
+        Map<Long, Long> tabletToPartitionId = buildTabletToPartitionId(selectedPartitionIds);
+        if (tabletToPartitionId.size() != olapScanNode.getScanTabletIds().size()) {
+            return null;
+        }
+
+        ConnectContext context = statementContext.getConnectContext();
+        List<AssignedJob> instances = new ArrayList<>();
+        for (Map.Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
+            DistributedPlanWorker worker = entry.getKey();
+            ScanSource scanSource = entry.getValue().scanSource;
+            if (!(scanSource instanceof DefaultScanSource)) {
+                return null;
+            }
+
+            DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource;
+            ScanRanges scanRanges = defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+            if (scanRanges == null) {
+                return null;
+            }
+            if (scanRanges.params.isEmpty()) {
+                continue;
+            }
+
+            Map<Long, ScanRanges> partitionToScanRanges = splitScanRangesByPartition(scanRanges, tabletToPartitionId);
+            if (partitionToScanRanges == null) {
+                return null;
+            }
+
+            // One partition on one BE maps to one instance; a BE may hold no tablets for some partitions.
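+            // Iterate selectedPartitionIds rather than the map keys so instance order is
+            // deterministic; consumed entries are removed below, and any leftover entry
+            // signals an inconsistent mapping, which aborts and falls back to default planning.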
+            for (Long partitionId : selectedPartitionIds) {
+                ScanRanges partitionScanRanges = partitionToScanRanges.remove(partitionId);
+                if (partitionScanRanges == null || partitionScanRanges.params.isEmpty()) {
+                    continue;
+                }
+                instances.add(assignWorkerAndDataSources(
+                        instances.size(), context.nextInstanceId(), worker,
+                        new DefaultScanSource(ImmutableMap.of(olapScanNode, partitionScanRanges))));
+            }
+
+            if (!partitionToScanRanges.isEmpty()) {
+                return null;
+            }
+        }
+        return instances;
+    }
+
+    private boolean usePartitionParallelismForQueryCache(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
+            DistributeContext distributeContext) {
+        if (fragment.queryCacheParam == null || workerToScanRanges.isEmpty()) {
+            return false;
+        }
+
+        ConnectContext context = statementContext.getConnectContext();
+        if (context == null || useLocalShuffleToAddParallel(distributeContext)) {
+            return false;
+        }
+
+        long totalTabletNum = olapScanNode.getScanTabletIds().size();
+        int parallelPipelineTaskNum = Math.max(
+                context.getSessionVariable().getParallelExecInstanceNum(
+                        olapScanNode.getScanContext().getClusterName()), 1);
+        long threshold = (long) parallelPipelineTaskNum * workerToScanRanges.size();
+        return totalTabletNum > threshold;
+    }
+
+    private Map<Long, Long> buildTabletToPartitionId(List<Long> selectedPartitionIds) {
+        long selectedIndexId = olapScanNode.getSelectedIndexId();
+        if (selectedIndexId == -1) {
+            selectedIndexId = olapScanNode.getOlapTable().getBaseIndexId();
+        }
+
+        Set<Long> scanTabletIds = new LinkedHashSet<>(olapScanNode.getScanTabletIds());
+        Map<Long, Long> tabletToPartitionId = new LinkedHashMap<>(scanTabletIds.size());
+        for (Long partitionId : selectedPartitionIds) {
+            Partition partition = olapScanNode.getOlapTable().getPartition(partitionId);
+            if (partition == null) {
+                continue;
+            }
+            MaterializedIndex index = partition.getIndex(selectedIndexId);
+            if (index == null) {
+                continue;
+            }
+            for (Tablet tablet : index.getTablets()) {
+                long tabletId = tablet.getId();
+                if (scanTabletIds.contains(tabletId)) {
+                    tabletToPartitionId.put(tabletId, partitionId);
+                }
+            }
+        }
+        return tabletToPartitionId;
+    }
+
+    private Map<Long, ScanRanges> splitScanRangesByPartition(
+            ScanRanges scanRanges, Map<Long, Long> tabletToPartitionId) {
+        Map<Long, ScanRanges> partitionToScanRanges = new LinkedHashMap<>();
+        for (int i = 0; i < scanRanges.params.size(); i++) {
+            TScanRangeParams scanRangeParams = scanRanges.params.get(i);
+            long tabletId = scanRangeParams.getScanRange().getPaloScanRange().getTabletId();
+            Long partitionId = tabletToPartitionId.get(tabletId);
+            if (partitionId == null) {
+                return null;
+            }
+            partitionToScanRanges
+                    .computeIfAbsent(partitionId, id -> new ScanRanges())
+                    .addScanRange(scanRangeParams, scanRanges.bytes.get(i));
+        }
+        return partitionToScanRanges;
+    }
+
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
             DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
new file mode 100644
index 00000000000000..097e7930959bff
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.LocalTablet;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnassignedScanSingleOlapTableJobTest {
+    @Test
+    public void testQueryCacheAssignByPartition() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(1, 1));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+        Map<Long, Long> tabletToPartition = ImmutableMap.of(
+                1L, partitionOne,
+                2L, partitionOne,
+                3L, partitionOne,
+                4L, partitionTwo,
+                5L, partitionTwo,
+                6L, partitionTwo
+        );
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
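+        // Mocked catalog wiring: two partitions with three tablets each, so that
+        // buildTabletToPartitionId can resolve every scanned tablet to its owning partition.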
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(tabletToPartition.keySet()));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(1L), tablet(2L), tablet(3L)
+        ));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(4L), tablet(5L), tablet(6L)
+        ));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        // Tablets of the same partition on one BE should be grouped into one instance.
+        workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+        DistributeContext distributeContext = new DistributeContext(
+                Mockito.mock(DistributedPlanWorkerManager.class),
+                true
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                distributeContext, ArrayListMultimap.create());
+
+        Assertions.assertEquals(4, assignedJobs.size());
+
+        Map<Long, Set<Set<Long>>> workerToInstanceTablets = new HashMap<>();
+        for (AssignedJob assignedJob : assignedJobs) {
+            DefaultScanSource defaultScanSource = (DefaultScanSource) assignedJob.getScanSource();
+            ScanRanges ranges = defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+            Set<Long> tabletIds = ranges.params.stream()
+                    .map(param -> param.getScanRange().getPaloScanRange().getTabletId())
+                    .collect(Collectors.toCollection(HashSet::new));
+            Set<Long> partitionIds = tabletIds.stream()
+                    .map(tabletToPartition::get)
+                    .collect(Collectors.toSet());
+
+            // Every instance must contain tablets from only one partition.
+            Assertions.assertEquals(1, partitionIds.size());
+
+            workerToInstanceTablets.computeIfAbsent(
+                    assignedJob.getAssignedWorker().id(), k -> new HashSet<>()
+            ).add(tabletIds);
+        }
+
+        Map<Long, Set<Set<Long>>> expected = new HashMap<>();
+        expected.put(1L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(1L, 2L)),
+                new HashSet<>(Arrays.asList(4L))
+        )));
+        expected.put(2L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(3L)),
+                new HashSet<>(Arrays.asList(5L, 6L))
+        )));
+
+        // Different partitions are split into different instances on each BE.
+        Assertions.assertEquals(expected, workerToInstanceTablets);
+    }
+
+    @Test
+    public void testQueryCacheFallbackToDefaultWhenPartitionMappingIncomplete() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(2, 2));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        // Intentionally omit the second partition, so the tablet-to-partition mapping
+        // is incomplete and triggers the fallback.
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(ImmutableList.of(partitionOne));
+        Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), tablet(3L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // Query-cache default planning uses one instance per tablet.
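+        // 3 tablets on each of the 2 BEs -> 6 single-tablet instances.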
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    @Test
+    public void testNonQueryCacheUseDefaultPlanning() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(3, 3));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), tablet(3L)));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(4L), tablet(5L), tablet(6L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, DataPartition.RANDOM);
+        // No query cache param, so default planning must be used.
+        fragment.setParallelExecNum(10);
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // Default planning splits by tablet count when parallelExecNum is large enough.
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    private static Tablet tablet(long tabletId) {
+        return new LocalTablet(tabletId);
+    }
+
+    private static ScanRanges scanRanges(long... tabletIds) {
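+        // Build minimal thrift scan ranges: each tablet contributes one TScanRangeParams
+        // wrapping a TPaloScanRange, registered with a nominal size of one byte.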
+        ScanRanges scanRanges = new ScanRanges();
+        for (long tabletId : tabletIds) {
+            TPaloScanRange paloScanRange = new TPaloScanRange();
+            paloScanRange.setTabletId(tabletId);
+            TScanRange scanRange = new TScanRange();
+            scanRange.setPaloScanRange(paloScanRange);
+            TScanRangeParams scanRangeParams = new TScanRangeParams();
+            scanRangeParams.setScanRange(scanRange);
+            scanRanges.addScanRange(scanRangeParams, 1L);
+        }
+        return scanRanges;
+    }
+
+    private static class TestWorker implements DistributedPlanWorker {
+        private final long id;
+        private final String address;
+
+        private TestWorker(long id, String address) {
+            this.id = id;
+            this.address = address;
+        }
+
+        @Override
+        public long getCatalogId() {
+            return 0;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public String address() {
+            return address;
+        }
+
+        @Override
+        public String host() {
+            return address;
+        }
+
+        @Override
+        public int port() {
+            return 0;
+        }
+
+        @Override
+        public String brpcAddress() {
+            return address;
+        }
+
+        @Override
+        public int brpcPort() {
+            return 0;
+        }
+
+        @Override
+        public boolean available() {
+            return true;
+        }
+    }
+}
diff --git a/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
new file mode 100644
index 00000000000000..111f7a196fa64a
--- /dev/null
+++ b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !partition_instance_query_result --
+/a 75
+/b 105
+/c 135
+/d 165
diff --git a/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
new file mode 100644
index 00000000000000..6d2281e3ef6589
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
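+
+// Exercises the partition-based instance planning above: with query cache enabled and
+// 9 tablets (3 partitions x 3 buckets) exceeding parallel_pipeline_task_num (3) times the
+// participating BE count on a small cluster, each partition's tablets should be planned
+// into a single scan instance.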
+
+suite("test_partition_instance_query_cache") {
+    def tableName = "test_partition_instance"
+    def querySql = """
+        SELECT
+            url,
+            SUM(cost) AS total_cost
+        FROM ${tableName}
+        WHERE dt >= '2026-01-01'
+          AND dt < '2026-01-15'
+        GROUP BY url
+    """
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_nereids_distribute_planner=true"
+    sql "set enable_query_cache=true"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_sql_cache=false"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            dt DATE,
+            user_id INT,
+            url STRING,
+            cost BIGINT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(dt, user_id)
+        PARTITION BY RANGE(dt)
+        (
+            PARTITION p20260101 VALUES LESS THAN ("2026-01-05"),
+            PARTITION p20260105 VALUES LESS THAN ("2026-01-10"),
+            PARTITION p20260110 VALUES LESS THAN ("2026-01-15")
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+        ('2026-01-01',1,'/a',10),
+        ('2026-01-01',2,'/b',20),
+        ('2026-01-02',3,'/c',30),
+        ('2026-01-03',4,'/d',40),
+
+        ('2026-01-06',1,'/a',15),
+        ('2026-01-06',2,'/b',25),
+        ('2026-01-07',3,'/c',35),
+        ('2026-01-08',4,'/d',45),
+
+        ('2026-01-11',1,'/a',50),
+        ('2026-01-11',2,'/b',60),
+        ('2026-01-12',3,'/c',70),
+        ('2026-01-13',4,'/d',80)
+    """
+
+    order_qt_partition_instance_query_result """
+        ${querySql}
+        ORDER BY url
+    """
+
+    def normalize = { rows ->
+        return rows.collect { row -> row.collect { col -> String.valueOf(col) }.join("|") }.sort()
+    }
+
+    def baseline = normalize(sql(querySql))
+    for (int i = 0; i < 3; i++) {
+        assertEquals(baseline, normalize(sql(querySql)))
+    }
+
+    explain {
+        sql(querySql)
+        contains("DIGEST")
+    }
+
+    def distributedRows = sql("EXPLAIN DISTRIBUTED PLAN ${querySql}")
+    def distributedPlan = distributedRows.collect { it[0].toString() }.join("\n")
+    assertTrue(distributedPlan.contains("UnassignedScanSingleOlapTableJob"))
+
+    def partitionMatcher = (distributedPlan =~ /partitions=(\d+)\/(\d+)/)
+    assertTrue(partitionMatcher.find())
+    int partitionCount = Integer.parseInt(partitionMatcher.group(1))
+
+    int scanFragmentBegin = distributedPlan.indexOf("fragmentJob: UnassignedScanSingleOlapTableJob")
+    assertTrue(scanFragmentBegin > 0)
+    def scanFragment = distributedPlan.substring(scanFragmentBegin)
+
+    int scanInstanceCount = (scanFragment =~ /StaticAssignedJob\(/).count
+    assertEquals(partitionCount, scanInstanceCount)
+
+    def instanceToTablets = [:].withDefault { [] }
+    String currentInstance = null
+    scanFragment.eachLine { line ->
+        def instanceMatcher = (line =~ /instanceId:\s*([0-9a-f\-]+)/)
+        if (instanceMatcher.find()) {
+            currentInstance = instanceMatcher.group(1)
+            instanceToTablets[currentInstance] = []
+        }
+
+        def tabletMatcher = (line =~ /tablet\s+(\d+)/)
+        if (tabletMatcher.find() && currentInstance != null) {
+            instanceToTablets[currentInstance] << tabletMatcher.group(1)
+        }
+    }
+
+    assertEquals(partitionCount, instanceToTablets.size())
+    instanceToTablets.each { _, tablets ->
+        assertTrue(tablets.size() > 0)
+    }
+
+    def tabletToInstance = [:]
+    instanceToTablets.each { instanceId, tablets ->
+        tablets.each { tabletId ->
+            tabletToInstance[tabletId] = instanceId
+        }
+    }
+
+    ["p20260101", "p20260105", "p20260110"].each { partitionName ->
+        def partitionTabletRows = sql("SHOW TABLETS FROM ${tableName} PARTITION(${partitionName})")
+        def partitionTabletIds = partitionTabletRows.collect { it[0].toString() }
+        assertTrue(partitionTabletIds.size() > 0)
+
+        partitionTabletIds.each { tabletId ->
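+            // Every tablet of this partition must appear in some scan instance of the plan.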
+            assertTrue(tabletToInstance.containsKey(tabletId))
+        }
+
+        def partitionInstanceIds = partitionTabletIds.collect { tabletId -> tabletToInstance[tabletId] }.toSet()
+        assertEquals(1, partitionInstanceIds.size())
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}