Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,19 @@ public void queryRetry(TUniqueId queryId) throws Exception {
UUID uuid;
int retryTime = Config.max_query_retry_time;
retryTime = retryTime <= 0 ? 1 : retryTime + 1;
boolean disableCloudVersionCacheOnRetry = false;
// If the query is an `outfile` statement,
// we execute it only once to avoid exporting redundant data.
if (parsedStmt instanceof Queriable) {
retryTime = ((Queriable) parsedStmt).hasOutFileClause() ? 1 : retryTime;
}
for (int i = 1; i <= retryTime; i++) {
try {
execute(queryId);
if (disableCloudVersionCacheOnRetry) {
executeWithVersionCacheDisabled(queryId);
} else {
execute(queryId);
}
return;
} catch (UserException e) {
if (!SystemInfoService.needRetryWithReplan(e.getMessage()) || i == retryTime) {
Expand All @@ -531,6 +536,7 @@ public void queryRetry(TUniqueId queryId) throws Exception {
if (this.coord != null && this.coord.isQueryCancelled()) {
throw e;
}
disableCloudVersionCacheOnRetry = shouldDisableCloudVersionCacheOnRetry(e.getMessage());
TUniqueId lastQueryId = queryId;
uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
Expand All @@ -552,6 +558,34 @@ public void queryRetry(TUniqueId queryId) throws Exception {
}
}

// Temporarily disable the cloud version cache for this single attempt so that the retry
// re-fetches the visible version from meta-service, then restore the user-set TTLs.
private void executeWithVersionCacheDisabled(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
long oldPartitionTtl = sessionVariable.cloudPartitionVersionCacheTtlMs;
long oldTableTtl = sessionVariable.cloudTableVersionCacheTtlMs;
try {
sessionVariable.cloudPartitionVersionCacheTtlMs = 0;
sessionVariable.cloudTableVersionCacheTtlMs = 0;
LOG.info("temporarily set {} from {} to 0 and {} from {} to 0 before retry. {}",
SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS, oldPartitionTtl,
SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS, oldTableTtl,
context.getQueryIdentifier());
execute(queryId);
} finally {
sessionVariable.cloudPartitionVersionCacheTtlMs = oldPartitionTtl;
sessionVariable.cloudTableVersionCacheTtlMs = oldTableTtl;
}
}

boolean shouldDisableCloudVersionCacheOnRetry(String errorMessage) {
return Config.isCloudMode()
&& errorMessage != null
&& errorMessage.contains(SystemInfoService.ERROR_E230)
&& (context.getSessionVariable().cloudPartitionVersionCacheTtlMs != 0
|| context.getSessionVariable().cloudTableVersionCacheTtlMs != 0);
}

public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,52 @@ public void testParseByNereidsSetsParsedStatementOnStatementContext() throws Exc
"ParsedStatement should be a LogicalPlanAdapter after parseByNereids(), but was: "
+ (parsedStatement == null ? "null" : parsedStatement.getClass().getName()));
}

@Test
public void testShouldDisableCloudVersionCacheOnRetryForE230() {
String originalCloudUniqueId = Config.cloud_unique_id;
String originalDeployMode = Config.deploy_mode;
long originalPartitionTtl = connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs;
long originalTableTtl = connectContext.getSessionVariable().cloudTableVersionCacheTtlMs;
try {
Config.cloud_unique_id = "test-cloud-id";
StmtExecutor executor = new StmtExecutor(connectContext, "select 1");

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = some other error"));
// null error message must not trigger the disable.
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(null));

// Non-cloud mode must never disable the version cache, even on E-230.
Config.cloud_unique_id = "";
Config.deploy_mode = "";
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));
Config.cloud_unique_id = "test-cloud-id";

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L;
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));

connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L;
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
"errCode = 2, detailMessage = E-230 versions are already compacted"));
} finally {
Config.cloud_unique_id = originalCloudUniqueId;
Config.deploy_mode = originalDeployMode;
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = originalPartitionTtl;
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = originalTableTtl;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,62 @@ suite("test_retry_e-230", 'docker') {
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)

def restoreTbl = 'test_retry_e_230_restore_tbl'
sql """ DROP TABLE IF EXISTS ${restoreTbl} """
sql """
CREATE TABLE ${restoreTbl} (
`k1` int(11) NULL,
`k2` int(11) NULL
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
for (def i = 1; i <= 3; i++) {
insert_sql """INSERT INTO ${restoreTbl} VALUES (${i}, ${100 * i})""", 1
}

sql """ set cloud_partition_version_cache_ttl_ms = 3600000 """
sql """ set cloud_table_version_cache_ttl_ms = 3600000 """
def row_cnt = sql """select count() from ${restoreTbl}"""
assertEquals(row_cnt[0][0], 3)

cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
insert_sql """INSERT INTO ${restoreTbl} VALUES (4, 400)""", 1

def futrue5 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}

begin = System.currentTimeMillis();
def futrue6 = thread {
def result = sql """select * from ${restoreTbl} order by k1"""
log.info("select result: {}", result)
}

futrue6.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost restore check : {}", cost)
futrue5.get()
assertTrue(cost > 4000 && cost < 100000)

def ttlRows = sql_return_maparray """
select
@@session.cloud_partition_version_cache_ttl_ms as partition_ttl,
@@session.cloud_table_version_cache_ttl_ms as table_ttl
"""
assertEquals("3600000", ttlRows[0].partition_ttl.toString())
assertEquals("3600000", ttlRows[0].table_ttl.toString())
row_cnt = sql """select count() from ${restoreTbl}"""
assertEquals(row_cnt[0][0], 4)
} finally {
sql """ set cloud_partition_version_cache_ttl_ms = DEFAULT """
sql """ set cloud_table_version_cache_ttl_ms = DEFAULT """
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
}
Expand Down
Loading