Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(routineLoadJob.getCloudCluster())
.stream()
.filter(Backend::isAlive)
.filter(Backend::isLoadAvailable)
.filter(backend -> !backend.isDecommissioned() && !backend.isDecommissioning())
.map(Backend::getId)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
if (backend != null && backend.isLoadAvailable()
&& !backend.isDecommissioned()
&& !backend.isDecommissioning()
&& !failedBeIds.contains(beId)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new primary predicate skips decommissioning BEs, but the fallback below still bypasses it when backendIds is empty. Lines 255-260 repopulate candidates from routineLoadManager.getBlacklist() with only backend != null, so a BE that was blacklisted by an earlier metadata retry and later enters decommissioning can still be selected here and receive BackendServiceProxy.getInfo once all primary candidates are filtered out. KinesisUtil has the same fallback at lines 112-115. Please apply the same load-available/non-decommissioned predicate to the blacklist fallback, or share candidate construction, before sending metadata RPCs.

&& !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
backendIds.add(beId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
if (backend != null && backend.isLoadAvailable()
&& !backend.isDecommissioned()
&& !backend.isDecommissioning()
&& !failedBeIds.contains(beId)
&& !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
backendIds.add(beId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ public static Backend selectBackend(String cloudCluster) throws JobException {
.getBackendsByClusterName(cloudCluster)
.stream()
.filter(Backend::isLoadAvailable)
.filter(backend -> !backend.isDecommissioned() && !backend.isDecommissioning())
.collect(Collectors.toList());
if (bes.isEmpty()) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE
}
List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+ ", decommission=" + be.isDecommissioned() + " }")
+ ", decommission=" + be.isDecommissioned()
+ ", decommissioning=" + be.isDecommissioning() + " }")
.collect(Collectors.toList());
throw new LoadException("No suitable backend " + ", backends = " + backendsInfo);
}
Expand Down Expand Up @@ -370,16 +371,13 @@ private Long getCachedBackend(String cluster, long tableId) {
}

private boolean isBackendAvailable(Backend backend, String cluster) {
if (backend == null || !backend.isAlive() || backend.isDecommissioned() || !backend.isLoadAvailable()) {
if (backend == null || !backend.isAlive() || backend.isDecommissioned() || backend.isDecommissioning()
Comment thread
liaoxin01 marked this conversation as resolved.
|| !backend.isLoadAvailable()) {
return false;
}
if (!Config.isCloudMode()) {
return true;
}
// for cloud mode
if (backend.isDecommissioning()) {
return false;
}
return cluster == null || cluster.equals(backend.getCloudClusterName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId,
public static Backend selectBackend(String clusterName) throws LoadException {
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
.stream().filter(Backend::isLoadAvailable)
.stream().filter(backend -> backend.isLoadAvailable() && !backend.isDecommissioned()
&& !backend.isDecommissioning())
.collect(Collectors.toList());

if (backends.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,15 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce
updateBeIdToMaxConcurrentTasks();
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
int previousBeIdleTaskNum = 0;
boolean previousBeAvailable = false;

// 1. Find if the given BE id has more than half of available slots
if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
// get the previousBackend info
Backend previousBackend = Env.getCurrentSystemInfo().getBackend(previousBeId);
// check previousBackend is not null && load available
if (previousBackend != null && previousBackend.isLoadAvailable()) {
previousBeAvailable = true;
if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
previousBeIdleTaskNum = 0;
} else if (beIdToConcurrentTasks.containsKey(previousBeId)) {
Expand Down Expand Up @@ -563,7 +565,7 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce
}
// 4. on the basis of selecting the maximum idle slot be,
// try to reuse the object cache as much as possible
if (previousBeIdleTaskNum == maxIdleSlotNum) {
if (previousBeAvailable && previousBeIdleTaskNum == maxIdleSlotNum) {
return previousBeId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still allows a saturated previous BE to be reused when the only idle capacity comes from a BE that the new availability filter excludes. The scheduler gate uses getClusterIdleSlotNum(), whose slot map is still built from getAllBackendIds(true) and therefore counts alive decommissioning BEs. After this PR, getAvailableBackendIds() excludes that draining BE, so with one saturated eligible previous BE and one idle decommissioning BE, the loop below leaves maxIdleSlotNum == 0 and resultBeId == -1, but this tie check returns the previous BE anyway and allocateTaskToBe() submits another task to that saturated backend. Please keep routine-load slot accounting aligned with the new eligible-backend predicate, and only reuse the previous BE when it has a positive idle slot.

}
return resultBeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class BeSelectionPolicy {
public boolean needScheduleAvailable = false;
public boolean needQueryAvailable = false;
public boolean needLoadAvailable = false;
public boolean needNonDecommissioned = false;
// Resource tag. Empty means no need to consider resource tag.
Comment thread
liaoxin01 marked this conversation as resolved.
public Set<Tag> resourceTags = Sets.newHashSet();
// storage medium. null means no need to consider storage medium.
Expand Down Expand Up @@ -86,6 +87,12 @@ public Builder needQueryAvailable() {

public Builder needLoadAvailable() {
policy.needLoadAvailable = true;
policy.needNonDecommissioned = true;
return this;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making needLoadAvailable() imply needNonDecommissioned(), the policy-backed load selectors now skip draining BEs, but the Kafka/Kinesis routine-load metadata proxy selectors still bypass this policy. KafkaUtil.getInfoRequest() and KinesisUtil.getInfoRequest() build candidates with backend.isLoadAvailable() && !backend.isDecommissioned() only; isLoadAvailable() does not check isDecommissioning(), and those helpers then send BackendServiceProxy.getInfo to the selected BE. So an alive decommissioning BE can still receive new routine-load Kafka/Kinesis metadata RPCs even though the PR moves the other load-selection paths away from draining BEs. Please add the same non-decommissioning predicate or a shared helper there, with focused coverage for decommissioning metadata-proxy candidates.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ac51c3d: KafkaUtil.getInfoRequest() and KinesisUtil.getInfoRequest() now also skip Backend.isDecommissioning() metadata-proxy candidates, matching the non-decommissioned load-selection behavior.

}

public Builder needNonDecommissioned() {
policy.needNonDecommissioned = true;
return this;
}

Expand Down Expand Up @@ -156,6 +163,7 @@ private boolean isMatch(Backend backend) {
if (needScheduleAvailable && !backend.isScheduleAvailable()
|| needQueryAvailable && !backend.isQueryAvailable()
|| needLoadAvailable && !backend.isLoadAvailable()
|| needNonDecommissioned && (backend.isDecommissioned() || backend.isDecommissioning())
Comment thread
liaoxin01 marked this conversation as resolved.
|| (!resourceTags.isEmpty() && !resourceTags.contains(backend.getLocationTag()))
Comment thread
liaoxin01 marked this conversation as resolved.
|| storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)
|| (requireAliveBe && !backend.isAlive())) {
Expand Down Expand Up @@ -231,8 +239,10 @@ public List<Backend> getCandidateBackends(Collection<Backend> backends) {

@Override
public String toString() {
return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s",
return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | nonDecommissioned=%s"
+ " | tags=%s | medium=%s",
preferComputeNode, needQueryAvailable, needLoadAvailable, needScheduleAvailable,
needNonDecommissioned,
resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class StreamLoadHandlerTest {
@Test
public void testSelectBackendSkipsDecommissioningBackend() throws Exception {
SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo();
Backend decommissioningBackend = createBackend(10001L, "127.0.0.1");
decommissioningBackend.setDecommissioning(true);
Backend selectedBackend = createBackend(10002L, "127.0.0.2");
CloudSystemInfoService systemInfoService =
new TestCloudSystemInfoService(Arrays.asList(decommissioningBackend, selectedBackend));

try {
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService);

Assert.assertEquals(selectedBackend.getId(), StreamLoadHandler.selectBackend("cluster0").getId());
} finally {
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService);
}
}

private Backend createBackend(long id, String host) {
Backend backend = new Backend(id, host, 9050);
backend.setAlive(true);
return backend;
}

private static class TestCloudSystemInfoService extends CloudSystemInfoService {
private final List<Backend> backends;

private TestCloudSystemInfoService(List<Backend> backends) {
this.backends = backends;
}

@Override
public List<Backend> getBackendsByClusterName(final String clusterName) {
return backends;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.routineload;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.load.CloudRoutineLoadManager;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RoutineLoadBackendSelectionTest {
@Test
public void testAvailableBeForTaskDoesNotReuseDecommissioningPreviousBeWhenEligibleBesAreSaturated()
throws UserException {
int originalMaxRoutineLoadTaskNumPerBe = Config.max_routine_load_task_num_per_be;
SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo();
Backend previousBackend = createBackend(10001L, "127.0.0.1");
previousBackend.setDecommissioning(true);
Backend eligibleBackend = createBackend(10002L, "127.0.0.2");
SystemInfoService systemInfoService = new SystemInfoService();
systemInfoService.addBackend(previousBackend);
systemInfoService.addBackend(eligibleBackend);

try {
Config.max_routine_load_task_num_per_be = 0;
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService);
RoutineLoadManager routineLoadManager = new TestRoutineLoadManager(
Collections.singletonList(eligibleBackend.getId()));

Assert.assertEquals(-1L, routineLoadManager.getAvailableBeForTask(1L, previousBackend.getId()));
} finally {
Config.max_routine_load_task_num_per_be = originalMaxRoutineLoadTaskNumPerBe;
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService);
}
}

@Test
public void testCloudAvailableBackendIdsSkipsLoadDisabledBackend() throws Exception {
SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo();
Backend loadDisabledBackend = createBackend(10001L, "127.0.0.1");
loadDisabledBackend.setLoadDisabled(true);
Backend selectedBackend = createBackend(10002L, "127.0.0.2");
CloudSystemInfoService systemInfoService =
new TestCloudSystemInfoService(Arrays.asList(loadDisabledBackend, selectedBackend));
RoutineLoadJob routineLoadJob = Mockito.mock(RoutineLoadJob.class);
Mockito.when(routineLoadJob.getCloudCluster()).thenReturn("cluster0");

try {
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService);
TestCloudRoutineLoadManager routineLoadManager = new TestCloudRoutineLoadManager(routineLoadJob);

Assert.assertEquals(Collections.singletonList(selectedBackend.getId()),
routineLoadManager.getAvailableBackendIdsForTest(1L));
} finally {
Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService);
}
}

private Backend createBackend(long id, String host) {
Backend backend = new Backend(id, host, 9050);
backend.setAlive(true);
return backend;
}

private static class TestRoutineLoadManager extends RoutineLoadManager {
private final List<Long> availableBackendIds;

private TestRoutineLoadManager(List<Long> availableBackendIds) {
this.availableBackendIds = availableBackendIds;
}

@Override
protected List<Long> getAvailableBackendIds(long jobId) {
return availableBackendIds;
}
}

private static class TestCloudRoutineLoadManager extends CloudRoutineLoadManager {
private final RoutineLoadJob routineLoadJob;

private TestCloudRoutineLoadManager(RoutineLoadJob routineLoadJob) {
this.routineLoadJob = routineLoadJob;
}

@Override
public RoutineLoadJob getJob(long jobId) {
return routineLoadJob;
}

private List<Long> getAvailableBackendIdsForTest(long jobId) throws LoadException {
return super.getAvailableBackendIds(jobId);
}
}

private static class TestCloudSystemInfoService extends CloudSystemInfoService {
private final List<Backend> backends;

private TestCloudSystemInfoService(List<Backend> backends) {
this.backends = backends;
}

@Override
public List<Backend> getBackendsByClusterName(final String clusterName) {
return backends;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,17 @@ public void testSelectBackendIdsByPolicy() throws Exception {
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size());

BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build();
Assert.assertTrue(policy5.toString().contains("nonDecommissioned=true"));
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size());
Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L));
Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L));
Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L));
Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size());
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L));
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L));
Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10004L));
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10003L));
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy5, 2).size());
be3.setDecommissioning(true);
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy5, 1).size());
be3.setDecommissioning(false);

// 5. set tags
// reset all be
Expand Down
Loading