Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
Expand All @@ -25,16 +28,27 @@
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TScanRangeParams;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** UnassignedScanSingleOlapTableJob */
public class UnassignedScanSingleOlapTableJob extends AbstractUnassignedScanJob {
private static final Logger LOG = LogManager.getLogger(UnassignedScanSingleOlapTableJob.class);

private OlapScanNode olapScanNode;
private final ScanWorkerSelector scanWorkerSelector;

Expand Down Expand Up @@ -81,9 +95,136 @@ protected List<AssignedJob> insideMachineParallelization(
// instance 5: olapScanNode1: ScanRanges([tablet_10007])
// ],
// }
if (usePartitionParallelismForQueryCache(workerToScanRanges, distributeContext)) {
try {
// Best effort optimization for query cache: keep tablets in same partition
// on the same instance to reduce BE concurrency pressure.
List<AssignedJob> partitionInstances = insideMachineParallelizationByPartition(workerToScanRanges);
if (partitionInstances != null) {
return partitionInstances;
}
} catch (Exception e) {
LOG.warn("Failed to assign query cache instances by partition, fallback to default planning",
e);
}
}

return super.insideMachineParallelization(workerToScanRanges, inputJobs, distributeContext);
}

private List<AssignedJob> insideMachineParallelizationByPartition(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges) {
List<Long> selectedPartitionIds = new ArrayList<>(olapScanNode.getSelectedPartitionIds());
Map<Long, Long> tabletToPartitionId = buildTabletToPartitionId(selectedPartitionIds);
if (tabletToPartitionId.size() != olapScanNode.getScanTabletIds().size()) {
return null;
}

ConnectContext context = statementContext.getConnectContext();
List<AssignedJob> instances = new ArrayList<>();
for (Map.Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
DistributedPlanWorker worker = entry.getKey();
ScanSource scanSource = entry.getValue().scanSource;
if (!(scanSource instanceof DefaultScanSource)) {
return null;
}

DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource;
ScanRanges scanRanges = defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
if (scanRanges == null) {
return null;
}
if (scanRanges.params.isEmpty()) {
continue;
}

Map<Long, ScanRanges> partitionToScanRanges = splitScanRangesByPartition(scanRanges, tabletToPartitionId);
if (partitionToScanRanges == null) {
return null;
}

// One partition on one BE maps to one instance. Different BEs may miss some partitions.
for (Long partitionId : selectedPartitionIds) {
ScanRanges partitionScanRanges = partitionToScanRanges.remove(partitionId);
if (partitionScanRanges == null || partitionScanRanges.params.isEmpty()) {
continue;
}
instances.add(assignWorkerAndDataSources(
instances.size(), context.nextInstanceId(), worker,
new DefaultScanSource(ImmutableMap.of(olapScanNode, partitionScanRanges))));
}

if (!partitionToScanRanges.isEmpty()) {
return null;
}
}
return instances;
}

private boolean usePartitionParallelismForQueryCache(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
DistributeContext distributeContext) {
if (fragment.queryCacheParam == null || workerToScanRanges.isEmpty()) {
return false;
}

ConnectContext context = statementContext.getConnectContext();
if (context == null || useLocalShuffleToAddParallel(distributeContext)) {
return false;
}

long totalTabletNum = olapScanNode.getScanTabletIds().size();
int parallelPipelineTaskNum = Math.max(
context.getSessionVariable().getParallelExecInstanceNum(
olapScanNode.getScanContext().getClusterName()), 1);
long threshold = (long) parallelPipelineTaskNum * workerToScanRanges.size();
return totalTabletNum > threshold;
}

private Map<Long, Long> buildTabletToPartitionId(List<Long> selectedPartitionIds) {
long selectedIndexId = olapScanNode.getSelectedIndexId();
if (selectedIndexId == -1) {
selectedIndexId = olapScanNode.getOlapTable().getBaseIndexId();
}

Set<Long> scanTabletIds = new LinkedHashSet<>(olapScanNode.getScanTabletIds());
Map<Long, Long> tabletToPartitionId = new LinkedHashMap<>(scanTabletIds.size());
for (Long partitionId : selectedPartitionIds) {
Partition partition = olapScanNode.getOlapTable().getPartition(partitionId);
if (partition == null) {
continue;
}
MaterializedIndex index = partition.getIndex(selectedIndexId);
if (index == null) {
continue;
}
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
if (scanTabletIds.contains(tabletId)) {
tabletToPartitionId.put(tabletId, partitionId);
}
}
}
return tabletToPartitionId;
}

private Map<Long, ScanRanges> splitScanRangesByPartition(
ScanRanges scanRanges, Map<Long, Long> tabletToPartitionId) {
Map<Long, ScanRanges> partitionToScanRanges = new LinkedHashMap<>();
for (int i = 0; i < scanRanges.params.size(); i++) {
TScanRangeParams scanRangeParams = scanRanges.params.get(i);
long tabletId = scanRangeParams.getScanRange().getPaloScanRange().getTabletId();
Long partitionId = tabletToPartitionId.get(tabletId);
if (partitionId == null) {
return null;
}
partitionToScanRanges
.computeIfAbsent(partitionId, id -> new ScanRanges())
.addScanRange(scanRangeParams, scanRanges.bytes.get(i));
}
return partitionToScanRanges;
}

@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Loading
Loading