Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Status CloudRowsetBuilder::init() {

Status CloudRowsetBuilder::check_tablet_version_count() {
int64_t version_count = cloud_tablet()->fetch_add_approximate_num_rowsets(0);
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
{ version_count = INT_MAX; });
// TODO(plat1ko): load backoff algorithm
int32_t max_version_config = cloud_tablet()->max_version_config();
if (version_count > max_version_config) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/load/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
int timeout) {
DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
// sleep 60s
std::this_thread::sleep_for(std::chrono::seconds(60));
DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
// sleep 61s
std::this_thread::sleep_for(std::chrono::seconds(61));
});
MonotonicStopWatch watch;
watch.start();
Expand Down Expand Up @@ -456,6 +456,10 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions(
Status KafkaDataConsumer::get_real_offsets_for_partitions(
const std::vector<PIntegerPair>& offset_flags, std::vector<PIntegerPair>* offsets,
int timeout) {
DBUG_EXECUTE_IF("KafkaDataConsumer.get_offsets_for_partitions.timeout", {
// sleep 61s
std::this_thread::sleep_for(std::chrono::seconds(61));
});
MonotonicStopWatch watch;
watch.start();
for (const auto& entry : offset_flags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ suite("test_black_list","nonConcurrent,p0") {
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
def inject = "KafkaDataConsumer.get_offsets_for_partitions.timeout"
try {
GetDebugPoint().enableDebugPointForAllBEs(inject)
sql """
Expand All @@ -116,7 +116,7 @@ suite("test_black_list","nonConcurrent,p0") {
log.info("routine load state: ${state[0][8].toString()}".toString())
log.info("reason of state changed: ${state[0][17].toString()}".toString())
log.info("other msg: ${state[0][19].toString()}".toString())
if (state[0][17].toString().contains("failed to get latest partition offset") || state[0][19].toString().contains("failed to get latest partition offset")) {
if (state[0][17].toString().contains("Failed to get real offsets of kafka topic") || state[0][19].toString().contains("Failed to get real offsets of kafka topic")) {
break
}
if (count >= 90) {
Expand Down
Loading