diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 0ce4829d9fe1ba..adc1d199a6ad1a 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -96,6 +96,8 @@ Status CloudRowsetBuilder::init() { Status CloudRowsetBuilder::check_tablet_version_count() { int64_t version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0); + DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version", + { version_count = INT_MAX; }); // TODO(plat1ko): load backoff algorithm int32_t max_version_config = cloud_tablet()->max_version_config(); if (version_count > max_version_config) { diff --git a/be/src/load/routine_load/data_consumer.cpp b/be/src/load/routine_load/data_consumer.cpp index c3732ffee394b0..71e0e85941e168 100644 --- a/be/src/load/routine_load/data_consumer.cpp +++ b/be/src/load/routine_load/data_consumer.cpp @@ -420,9 +420,9 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector& Status KafkaDataConsumer::get_latest_offsets_for_partitions( const std::vector& partition_ids, std::vector* offsets, int timeout) { - DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", { - // sleep 60s - std::this_thread::sleep_for(std::chrono::seconds(60)); + DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { + // sleep 61s + std::this_thread::sleep_for(std::chrono::seconds(61)); }); MonotonicStopWatch watch; watch.start(); @@ -456,6 +456,10 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions( Status KafkaDataConsumer::get_real_offsets_for_partitions( const std::vector& offset_flags, std::vector* offsets, int timeout) { + DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", { + // sleep 61s + std::this_thread::sleep_for(std::chrono::seconds(61)); + }); MonotonicStopWatch watch; watch.start(); for (const auto& entry : offset_flags) { diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy index 29fc336492b7c5..a68ced241d6893 100644 --- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy +++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy @@ -94,7 +94,7 @@ suite("test_black_list","nonConcurrent,p0") { PROPERTIES ("replication_allocation" = "tag.location.default: 1"); """ - def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout" + def inject = "KafkaDataConsumer.get_offsets_for_partitions.timeout" try { GetDebugPoint().enableDebugPointForAllBEs(inject) sql """ @@ -116,7 +116,7 @@ suite("test_black_list","nonConcurrent,p0") { log.info("routine load state: ${state[0][8].toString()}".toString()) log.info("reason of state changed: ${state[0][17].toString()}".toString()) log.info("other msg: ${state[0][19].toString()}".toString()) - if (state[0][17].toString().contains("failed to get latest partition offset") || state[0][19].toString().contains("failed to get latest partition offset")) { + if (state[0][17].toString().contains("Failed to get real offsets of kafka topic") || state[0][19].toString().contains("Failed to get real offsets of kafka topic")) { break } if (count >= 90) {