Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc
try {
long startTime = System.currentTimeMillis();
tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
if (DebugPointUtil.isEnable("FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception")) {
throw new RuntimeException("debug point: createRoutineLoadTask.exception");
}
if (LOG.isDebugEnabled()) {
LOG.debug("create routine load task cost(ms): {}, job id: {}",
(System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
Expand All @@ -208,12 +211,12 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exc
new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e.getMessage()),
false);
throw e;
} catch (UserException e) {
} catch (Exception e) {
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
.updateState(JobState.PAUSED,
new ErrorReason(e.getErrorCode(),
new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
"failed to create task: " + e.getMessage()), false);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.util.RoutineLoadTestUtils
import org.junit.Assert

// Regression test: a routine load job must pause with a "failed to create task"
// reason when task creation throws inside the FE scheduler, and must be able to
// load data again once the failure injection is removed.
//
// The failure is injected through the FE debug point
// "FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception". The suite is
// tagged "nonConcurrent" and toggles that debug point on all FEs.
suite("test_routine_load_task_exception_recovery", "nonConcurrent") {
    def kafkaCsvTopics = [
        "test_routine_load_task_exception_recovery",
    ]

    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
        def runSql = { String q -> sql q }
        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)

        // Everything after the producer is created runs under try/finally so the
        // producer is always released, whether the test passes or fails.
        try {
            def tableName = "test_routine_load_task_exception_recovery"
            def job = "test_task_exception_recovery"
            sql """ DROP TABLE IF EXISTS ${tableName} """
            sql """
                CREATE TABLE IF NOT EXISTS ${tableName} (
                    `k1` int(20) NULL,
                    `k2` string NULL,
                    `v1` date NULL,
                    `v2` string NULL,
                    `v3` datetime NULL,
                    `v4` string NULL
                ) ENGINE=OLAP
                DUPLICATE KEY(`k1`)
                COMMENT 'OLAP'
                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
                PROPERTIES ("replication_allocation" = "tag.location.default: 1");
            """

            try {
                sql """
                    CREATE ROUTINE LOAD ${job} ON ${tableName}
                    COLUMNS TERMINATED BY ","
                    FROM KAFKA
                    (
                        "kafka_broker_list" = "${kafka_broker}",
                        "kafka_topic" = "${kafkaCsvTopics[0]}",
                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                    );
                """

                // Enable debug point to simulate exception during createRoutineLoadTask
                def injection = "FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"
                try {
                    logger.info("---test task exception recovery: enable debug point to simulate exception---")
                    GetDebugPoint().enableDebugPointForAllFEs(injection)

                    RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)

                    // Poll the job about once per second until it reports PAUSED with
                    // the expected reason; fail the test once the retry budget is spent.
                    def maxWaitCount = 0
                    while (true) {
                        def res = runSql("show routine load for ${job}")
                        // Columns 8 and 19 of SHOW ROUTINE LOAD are read as the job
                        // state and the error/other message respectively.
                        def routineLoadState = res[0][8].toString()
                        def otherMsg = res[0][19].toString()
                        logger.info("Routine load state: ${routineLoadState}, error message: ${otherMsg}")
                        if (routineLoadState == "PAUSED" && otherMsg.contains("failed to create task")) {
                            break
                        }
                        Thread.sleep(1000)
                        if (maxWaitCount++ > 60) {
                            Assert.fail("Routine load job did not pause as expected within timeout")
                        }
                    }
                } finally {
                    // Always remove the injection so the recovery phase below (and any
                    // later suite) is not affected by a leftover debug point.
                    GetDebugPoint().disableDebugPointForAllFEs(injection)
                }

                // After disabling the debug point, verify that the routine load can recover
                // and successfully load data
                logger.info("---test task exception recovery: verify data loading after recovery---")
                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTopics)
                // NOTE(review): presumably blocks until the job has finished at least one
                // task for this table -- confirm against RoutineLoadTestUtils.
                RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)

                // Verify data was loaded
                def rowCount = sql "select count(*) from ${tableName}"
                logger.info("Row count: ${rowCount[0][0]}")
                Assert.assertTrue("Expected at least 2 rows in table", rowCount[0][0] >= 2)
            } finally {
                sql "stop routine load for ${job}"
            }
        } finally {
            // Release the Kafka producer's resources; previously it was never closed.
            // NOTE(review): assumes createKafkaProducer returns a Kafka producer
            // exposing close() -- confirm against RoutineLoadTestUtils.
            producer.close()
        }
    }
}
Loading