diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java index 399b33e32d4761..cee1a079424e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java @@ -54,7 +54,8 @@ protected List getAvailableBackendIds(long jobId) throws LoadException { return ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(routineLoadJob.getCloudCluster()) .stream() - .filter(Backend::isAlive) + .filter(Backend::isLoadAvailable) + .filter(backend -> !backend.isDecommissioned() && !backend.isDecommissioning()) .map(Backend::getId) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java index 00fd28c88daf4c..8562d0eaed3b46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java @@ -238,8 +238,7 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx List backendIds = new ArrayList<>(); for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) { Backend backend = Env.getCurrentSystemInfo().getBackend(beId); - if (backend != null && backend.isLoadAvailable() - && !backend.isDecommissioned() + if (isBackendAvailableForMetaRequest(backend) && !failedBeIds.contains(beId) && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) { backendIds.add(beId); @@ -255,9 +254,9 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx Map blacklist = Env.getCurrentEnv().getRoutineLoadManager().getBlacklist(); for (Long beId : blacklist.keySet()) { Backend backend = Env.getCurrentSystemInfo().getBackend(beId); - if (backend != null) { + if (isBackendAvailableForMetaRequest(backend) && !failedBeIds.contains(beId)) { backendIds.add(beId); - } else { + } else if (backend == null) { blacklist.remove(beId); LOG.warn("remove stale backend {} from routine load blacklist when getting kafka meta", beId); @@ -329,4 +328,9 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L); } } + + private static boolean isBackendAvailableForMetaRequest(Backend backend) { + return backend != null && backend.isLoadAvailable() + && !backend.isDecommissioned() && !backend.isDecommissioning(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java index 0e58633370ec83..e5769f8cf78d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kinesis/KinesisUtil.java @@ -101,16 +101,23 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx List backendIds = new ArrayList<>(); for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) { Backend backend = Env.getCurrentSystemInfo().getBackend(beId); - if (backend != null && backend.isLoadAvailable() - && !backend.isDecommissioned() + if (isBackendAvailableForMetaRequest(backend) && !failedBeIds.contains(beId) && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) { backendIds.add(beId); } } if (backendIds.isEmpty()) { - for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) { - backendIds.add(beId); + Map blacklist = Env.getCurrentEnv().getRoutineLoadManager().getBlacklist(); + for (Long beId : blacklist.keySet()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + if (isBackendAvailableForMetaRequest(backend) && !failedBeIds.contains(beId)) { + backendIds.add(beId); + } else if (backend == null) { + blacklist.remove(beId); + LOG.warn("remove stale backend {} from routine load blacklist when getting kinesis meta", + beId); + } } } if (backendIds.isEmpty()) { @@ -162,4 +169,9 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L); } } + + private static boolean isBackendAvailableForMetaRequest(Backend backend) { + return backend != null && backend.isLoadAvailable() + && !backend.isDecommissioned() && !backend.isDecommissioning(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index cff663e0142b68..56d66d05ceb2ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -279,6 +279,7 @@ public static Backend selectBackend(String cloudCluster) throws JobException { .getBackendsByClusterName(cloudCluster) .stream() .filter(Backend::isLoadAvailable) + .filter(backend -> !backend.isDecommissioned() && !backend.isDecommissioning()) .collect(Collectors.toList()); if (bes.isEmpty()) { throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index bea5cd03c7dba0..534d38a09eed58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -336,7 +336,8 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } List backendsInfo = backends.stream() .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() - + ", decommission=" + be.isDecommissioned() + " }") + + ", decommission=" + be.isDecommissioned() + + ", decommissioning=" + be.isDecommissioning() + " }") .collect(Collectors.toList()); throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); } @@ -370,16 +371,13 @@ private Long getCachedBackend(String cluster, long tableId) { } private boolean isBackendAvailable(Backend backend, String cluster) { - if (backend == null || !backend.isAlive() || backend.isDecommissioned() || !backend.isLoadAvailable()) { + if (backend == null || !backend.isAlive() || backend.isDecommissioned() || backend.isDecommissioning() + || !backend.isLoadAvailable()) { return false; } if (!Config.isCloudMode()) { return true; } - // for cloud mode - if (backend.isDecommissioning()) { - return false; - } return cluster == null || cluster.equals(backend.getCloudClusterName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java index 48a678e3b71e83..e4971a96b7bbc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java @@ -97,7 +97,8 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId, public static Backend selectBackend(String clusterName) throws LoadException { List backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(clusterName) - .stream().filter(Backend::isLoadAvailable) + .stream().filter(backend -> backend.isLoadAvailable() && !backend.isDecommissioned() + && !backend.isDecommissioning()) .collect(Collectors.toList()); if (backends.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 904fa207ac3918..df5615016bfa54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -146,8 +146,13 @@ public void removeMultiLoadTaskTxnIdToRoutineLoadJobId(long txnId) { } public void updateBeIdToMaxConcurrentTasks() { - beIdToMaxConcurrentTasks = Env.getCurrentSystemInfo().getAllBackendIds(true).stream().collect( - Collectors.toMap(beId -> beId, beId -> Config.max_routine_load_task_num_per_be)); + beIdToMaxConcurrentTasks = Env.getCurrentSystemInfo().getAllBackendIds(true).stream() + .filter(beId -> { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + return backend != null && backend.isLoadAvailable() + && !backend.isDecommissioned() && !backend.isDecommissioning(); + }) + .collect(Collectors.toMap(beId -> beId, beId -> Config.max_routine_load_task_num_per_be)); } // this is not real-time number @@ -519,6 +524,7 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce updateBeIdToMaxConcurrentTasks(); Map beIdToConcurrentTasks = getBeCurrentTasksNumMap(); int previousBeIdleTaskNum = 0; + boolean previousBeAvailable = false; // 1. Find if the given BE id has more than half of available slots if (previousBeId != -1L && availableBeIds.contains(previousBeId)) { @@ -526,6 +532,7 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce Backend previousBackend = Env.getCurrentSystemInfo().getBackend(previousBeId); // check previousBackend is not null && load available if (previousBackend != null && previousBackend.isLoadAvailable()) { + previousBeAvailable = true; if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) { previousBeIdleTaskNum = 0; } else if (beIdToConcurrentTasks.containsKey(previousBeId)) { @@ -534,7 +541,8 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce } else { previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId); } - if (previousBeIdleTaskNum == Config.max_routine_load_task_num_per_be) { + if (previousBeIdleTaskNum > 0 + && previousBeIdleTaskNum == Config.max_routine_load_task_num_per_be) { return previousBeId; } } @@ -563,7 +571,7 @@ public long getAvailableBeForTask(long jobId, long previousBeId) throws UserExce } // 4. on the basis of selecting the maximum idle slot be, // try to reuse the object cache as much as possible - if (previousBeIdleTaskNum == maxIdleSlotNum) { + if (previousBeAvailable && previousBeIdleTaskNum > 0 && previousBeIdleTaskNum == maxIdleSlotNum) { return previousBeId; } return resultBeId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 384b126aba0909..9699f8bab817d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -42,6 +42,7 @@ public class BeSelectionPolicy { public boolean needScheduleAvailable = false; public boolean needQueryAvailable = false; public boolean needLoadAvailable = false; + public boolean needNonDecommissioned = false; // Resource tag. Empty means no need to consider resource tag. public Set resourceTags = Sets.newHashSet(); // storage medium. null means no need to consider storage medium. @@ -86,6 +87,12 @@ public Builder needQueryAvailable() { public Builder needLoadAvailable() { policy.needLoadAvailable = true; + policy.needNonDecommissioned = true; + return this; + } + + public Builder needNonDecommissioned() { + policy.needNonDecommissioned = true; return this; } @@ -156,6 +163,7 @@ private boolean isMatch(Backend backend) { if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable() || needLoadAvailable && !backend.isLoadAvailable() + || needNonDecommissioned && (backend.isDecommissioned() || backend.isDecommissioning()) || (!resourceTags.isEmpty() && !resourceTags.contains(backend.getLocationTag())) || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium) || (requireAliveBe && !backend.isAlive())) { @@ -231,8 +239,10 @@ public List getCandidateBackends(Collection backends) { @Override public String toString() { - return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s", + return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | nonDecommissioned=%s" + + " | tags=%s | medium=%s", preferComputeNode, needQueryAvailable, needLoadAvailable, needScheduleAvailable, + needNonDecommissioned, resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/StreamLoadHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/StreamLoadHandlerTest.java new file mode 100644 index 00000000000000..20ecd88538c503 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/StreamLoadHandlerTest.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class StreamLoadHandlerTest { + @Test + public void testSelectBackendSkipsDecommissioningBackend() throws Exception { + SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo(); + Backend decommissioningBackend = createBackend(10001L, "127.0.0.1"); + decommissioningBackend.setDecommissioning(true); + Backend selectedBackend = createBackend(10002L, "127.0.0.2"); + CloudSystemInfoService systemInfoService = + new TestCloudSystemInfoService(Arrays.asList(decommissioningBackend, selectedBackend)); + + try { + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService); + + Assert.assertEquals(selectedBackend.getId(), StreamLoadHandler.selectBackend("cluster0").getId()); + } finally { + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService); + } + } + + private Backend createBackend(long id, String host) { + Backend backend = new Backend(id, host, 9050); + backend.setAlive(true); + return backend; + } + + private static class TestCloudSystemInfoService extends CloudSystemInfoService { + private final List backends; + + private TestCloudSystemInfoService(List backends) { + this.backends = backends; + } + + @Override + public List getBackendsByClusterName(final String clusterName) { + return backends; + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadBackendSelectionTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadBackendSelectionTest.java new file mode 100644 index 00000000000000..10d39a7cfeda66 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadBackendSelectionTest.java @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.load.CloudRoutineLoadManager; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RoutineLoadBackendSelectionTest { + @Test + public void testAvailableBeForTaskDoesNotReuseDecommissioningPreviousBeWhenEligibleBesAreSaturated() + throws UserException { + int originalMaxRoutineLoadTaskNumPerBe = Config.max_routine_load_task_num_per_be; + SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo(); + Backend previousBackend = createBackend(10001L, "127.0.0.1"); + previousBackend.setDecommissioning(true); + Backend eligibleBackend = createBackend(10002L, "127.0.0.2"); + SystemInfoService systemInfoService = new SystemInfoService(); + systemInfoService.addBackend(previousBackend); + systemInfoService.addBackend(eligibleBackend); + + try { + Config.max_routine_load_task_num_per_be = 0; + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService); + RoutineLoadManager routineLoadManager = new TestRoutineLoadManager( + Collections.singletonList(eligibleBackend.getId())); + + Assert.assertEquals(-1L, routineLoadManager.getAvailableBeForTask(1L, previousBackend.getId())); + } finally { + Config.max_routine_load_task_num_per_be = originalMaxRoutineLoadTaskNumPerBe; + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService); + } + } + + @Test + public void testCloudAvailableBackendIdsSkipsLoadDisabledBackend() throws Exception { + SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo(); + Backend loadDisabledBackend = createBackend(10001L, "127.0.0.1"); + loadDisabledBackend.setLoadDisabled(true); + Backend selectedBackend = createBackend(10002L, "127.0.0.2"); + CloudSystemInfoService systemInfoService = + new TestCloudSystemInfoService(Arrays.asList(loadDisabledBackend, selectedBackend)); + RoutineLoadJob routineLoadJob = Mockito.mock(RoutineLoadJob.class); + Mockito.when(routineLoadJob.getCloudCluster()).thenReturn("cluster0"); + + try { + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService); + TestCloudRoutineLoadManager routineLoadManager = new TestCloudRoutineLoadManager(routineLoadJob); + + Assert.assertEquals(Collections.singletonList(selectedBackend.getId()), + routineLoadManager.getAvailableBackendIdsForTest(1L)); + } finally { + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService); + } + } + + @Test + public void testAvailableBeForTaskDoesNotReuseSaturatedPreviousBe() throws Exception { + int originalMaxRoutineLoadTaskNumPerBe = Config.max_routine_load_task_num_per_be; + SystemInfoService originalSystemInfoService = Env.getCurrentSystemInfo(); + Backend previousBackend = createBackend(10001L, "127.0.0.1"); + Backend decommissioningBackend = createBackend(10002L, "127.0.0.2"); + decommissioningBackend.setDecommissioning(true); + SystemInfoService systemInfoService = new SystemInfoService(); + systemInfoService.addBackend(previousBackend); + systemInfoService.addBackend(decommissioningBackend); + RoutineLoadJob routineLoadJob = Mockito.mock(RoutineLoadJob.class); + Mockito.when(routineLoadJob.getState()).thenReturn(RoutineLoadJob.JobState.RUNNING); + Mockito.when(routineLoadJob.getBeCurrentTasksNumMap()) + .thenReturn(Collections.singletonMap(previousBackend.getId(), 1)); + Map idToRoutineLoadJob = new HashMap<>(); + idToRoutineLoadJob.put(1L, routineLoadJob); + + try { + Config.max_routine_load_task_num_per_be = 1; + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", systemInfoService); + RoutineLoadManager routineLoadManager = new TestRoutineLoadManager( + Collections.singletonList(previousBackend.getId())); + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + + routineLoadManager.updateBeIdToMaxConcurrentTasks(); + Assert.assertEquals(0, routineLoadManager.getClusterIdleSlotNum()); + Assert.assertEquals(-1L, routineLoadManager.getAvailableBeForTask(1L, previousBackend.getId())); + } finally { + Config.max_routine_load_task_num_per_be = originalMaxRoutineLoadTaskNumPerBe; + Deencapsulation.setField(Env.getCurrentEnv(), "systemInfo", originalSystemInfoService); + } + } + + private Backend createBackend(long id, String host) { + Backend backend = new Backend(id, host, 9050); + backend.setAlive(true); + return backend; + } + + private static class TestRoutineLoadManager extends RoutineLoadManager { + private final List availableBackendIds; + + private TestRoutineLoadManager(List availableBackendIds) { + this.availableBackendIds = availableBackendIds; + } + + @Override + protected List getAvailableBackendIds(long jobId) { + return availableBackendIds; + } + } + + private static class TestCloudRoutineLoadManager extends CloudRoutineLoadManager { + private final RoutineLoadJob routineLoadJob; + + private TestCloudRoutineLoadManager(RoutineLoadJob routineLoadJob) { + this.routineLoadJob = routineLoadJob; + } + + @Override + public RoutineLoadJob getJob(long jobId) { + return routineLoadJob; + } + + private List getAvailableBackendIdsForTest(long jobId) throws LoadException { + return super.getAvailableBackendIds(jobId); + } + } + + private static class TestCloudSystemInfoService extends CloudSystemInfoService { + private final List backends; + + private TestCloudSystemInfoService(List backends) { + this.backends = backends; + } + + @Override + public List getBackendsByClusterName(final String clusterName) { + return backends; + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 541b40be0c5e61..ff68df157872a4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -48,6 +48,7 @@ import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.transaction.GlobalTransactionMgrIface; import org.apache.doris.transaction.TxnStateCallbackFactory; @@ -77,6 +78,12 @@ private void mockSessionVariable(ConnectContext connectContext) { Mockito.when(connectContext.getSessionVariable()).thenReturn(VariableMgr.newSessionVariable()); } + private void mockAvailableBackend(long beId) { + Backend backend = new Backend(beId, "host" + beId, 9050); + backend.setAlive(true); + Mockito.when(systemInfoService.getBackend(beId)).thenReturn(backend); + } + @Test public void testCreateJobAuthDeny() { AccessControllerManager accessManager = Mockito.mock(AccessControllerManager.class); @@ -225,6 +232,8 @@ public void testGetMinTaskBeId() throws LoadException { beIds.add(2L); Mockito.when(systemInfoService.getAllBackendIds(true)).thenReturn(beIds); + mockAvailableBackend(1L); + mockAvailableBackend(2L); try (MockedStatic envStatic = Mockito.mockStatic(Env.class)) { envStatic.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); @@ -302,6 +311,8 @@ public void testGetTotalIdleTaskNum() { beIds.add(2L); Mockito.when(systemInfoService.getAllBackendIds(true)).thenReturn(beIds); + mockAvailableBackend(1L); + mockAvailableBackend(2L); try (MockedStatic envStatic = Mockito.mockStatic(Env.class)) { envStatic.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 033568017d90f3..f79a5a7020f9a9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -190,13 +190,17 @@ public void testSelectBackendIdsByPolicy() throws Exception { Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size()); BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + Assert.assertTrue(policy5.toString().contains("nonDecommissioned=true")); Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size()); Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L)); Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L)); Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L)); - Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size()); - Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L)); - Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L)); + Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10004L)); + Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10003L)); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy5, 2).size()); + be3.setDecommissioning(true); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy5, 1).size()); + be3.setDecommissioning(false); // 5. set tags // reset all be