From ddea463f5295d692d775b7483c5cc10c9eaaddd1 Mon Sep 17 00:00:00 2001 From: hui lai Date: Sat, 31 Jan 2026 05:58:24 +0800 Subject: [PATCH] [fix](job) fix routine load task schedule stuck after create task fail (#60143) ### What problem does this PR solve? Fix the routine load task scheduler getting stuck when creating a task fails with an exception that is not a `UserException`. The `catch` block in `scheduleOneTask` now handles any `Exception`: it releases the BE slot and pauses the job with `CREATE_TASKS_ERR`. Example of a failure that the old `catch (UserException e)` did not cover: ``` 2026-01-21 18:46:11,938 WARN (Routine load task scheduler|52) [RoutineLoadTaskScheduler.process():117] Taking routine load task from queue has been interrupted java.lang.IllegalStateException at com.google.common.base.Preconditions.checkState(Preconditions.java:499) at org.apache.doris.analysis.SlotRef.getTableName(SlotRef.java:356) at org.apache.doris.rewrite.ExtractCommonFactorsRule.rewriteOrToIn(ExtractCommonFactorsRule.java:536) at org.apache.doris.rewrite.ExtractCommonFactorsRule.makeCompoundRemaining(ExtractCommonFactorsRule.java:459) at org.apache.doris.rewrite.ExtractCommonFactorsRule.extractCommonFactors(ExtractCommonFactorsRule.java:205) at org.apache.doris.rewrite.ExtractCommonFactorsRule.apply(ExtractCommonFactorsRule.java:80) at org.apache.doris.rewrite.ExprRewriter.applyRuleOnce(ExprRewriter.java:178) at org.apache.doris.rewrite.ExprRewriter.rewrite(ExprRewriter.java:171) at org.apache.doris.planner.FileLoadScanNode.initWhereExpr(FileLoadScanNode.java:171) at org.apache.doris.planner.FileLoadScanNode.initAndSetPrecedingFilter(FileLoadScanNode.java:144) at org.apache.doris.planner.FileLoadScanNode.initParamCreateContexts(FileLoadScanNode.java:134) at org.apache.doris.planner.FileLoadScanNode.init(FileLoadScanNode.java:125) at org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:307) at org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:116) at org.apache.doris.load.routineload.RoutineLoadJob.plan(RoutineLoadJob.java:1032) at org.apache.doris.load.routineload.KafkaTaskInfo.rePlan(KafkaTaskInfo.java:136) at org.apache.doris.load.routineload.KafkaTaskInfo.createRoutineLoadTask(KafkaTaskInfo.java:99) at 
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.scheduleOneTask(RoutineLoadTaskScheduler.java:193) at org.apache.doris.load.routineload.RoutineLoadTaskScheduler.process(RoutineLoadTaskScheduler.java:115) at org.apache.doris.load.routineload.RoutineLoadTaskScheduler.runAfterCatalogReady(RoutineLoadTaskScheduler.java:84) at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) at org.apache.doris.common.util.Daemon.run(Daemon.java:119) ``` --- .../routineload/RoutineLoadTaskScheduler.java | 7 +- ...outine_load_task_exception_recovery.groovy | 101 ++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 8be721fba585d6..4156410ef86a30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -195,6 +195,9 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc try { long startTime = System.currentTimeMillis(); tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask(); + if (DebugPointUtil.isEnable("FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception")) { + throw new RuntimeException("debug point: createRoutineLoadTask.exception"); + } if (LOG.isDebugEnabled()) { LOG.debug("create routine load task cost(ms): {}, job id: {}", (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId()); @@ -208,12 +211,12 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e.getMessage()), false); throw e; - } catch (UserException e) { + } 
catch (Exception e) { // set BE id to -1 to release the BE slot routineLoadTaskInfo.setBeId(-1); routineLoadManager.getJob(routineLoadTaskInfo.getJobId()) .updateState(JobState.PAUSED, - new ErrorReason(e.getErrorCode(), + new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR, "failed to create task: " + e.getMessage()), false); throw e; }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
new file mode 100644
index 00000000000000..a9cd82fd702b1d
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.junit.Assert
+
+suite("test_routine_load_task_exception_recovery", "nonConcurrent") { // job must pause when task creation throws, then load data once the fault is removed
+    def kafkaCsvTpoics = [ // Kafka topic names; element 0 is the job's topic and the list is passed to sendTestDataToKafka
+        "test_routine_load_task_exception_recovery",
+    ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) { // the whole test body is skipped when this check returns false
+        def runSql = { String q -> sql q } // closure wrapper around `sql`, used for polling and passed to waitForTaskFinish
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_task_exception_recovery"
+        def job = "test_task_exception_recovery"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try { // outer try: the job is stopped in the matching finally regardless of the outcome
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // Name of the debug point used to simulate an exception during createRoutineLoadTask; enabled and disabled on all FEs below
+            def injection = "FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"
+            try {
+                logger.info("---test task exception recovery: enable debug point to simulate exception---")
+                GetDebugPoint().enableDebugPointForAllFEs(injection)
+
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+
+                def maxWaitCount = 0 // number of polls performed so far
+                while (true) { // poll SHOW ROUTINE LOAD until the job is PAUSED with the expected message
+                    def res = runSql("show routine load for ${job}")
+                    def routineLoadState = res[0][8].toString() // column 8 of the first row is read as the job state
+                    def otherMsg = res[0][19].toString() // column 19 of the first row is read as the message checked below
+                    logger.info("Routine load state: ${routineLoadState}, error message: ${otherMsg}")
+                    if (routineLoadState == "PAUSED" && otherMsg.contains("failed to create task")) {
+                        break
+                    }
+                    Thread.sleep(1000) // wait one second between polls
+                    if (maxWaitCount++ > 60) { // give up after roughly a minute of polling
+                        Assert.fail("Routine load job did not pause as expected within timeout")
+                    }
+                }
+            } finally { // always disable the debug point, even if the wait above failed
+                GetDebugPoint().disableDebugPointForAllFEs(injection)
+            }
+
+            // With the debug point disabled, send more data and wait via waitForTaskFinish to verify
+            // that the routine load recovers and loads data (the job is not resumed explicitly here)
+            logger.info("---test task exception recovery: verify data loading after recovery---")
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+
+            // Verify data was loaded: the table must contain at least 2 rows
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("Row count: ${rowCount[0][0]}")
+            Assert.assertTrue("Expected at least 2 rows in table", rowCount[0][0] >= 2)
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}