From 5eadbff0a2b607ba1fb739754ebad4c4c24d12ef Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 30 Jun 2026 21:38:03 +0800 Subject: [PATCH] [fix](fe) set cloud version_cache_ttl to 0 temporarily if retry a query with -230 (#63721) If a query gets an E-230 error and `cloud_partition_version_cache_ttl_ms` or `cloud_table_version_cache_ttl_ms` is not set to 0, this PR temporarily sets both session variables to 0 for the retry so that it fetches the newest version, and restores the original values afterwards. --- .../org/apache/doris/qe/StmtExecutor.java | 36 +++++++++++- .../org/apache/doris/qe/StmtExecutorTest.java | 48 ++++++++++++++++ .../query_retry/test_retry_e-230.groovy | 55 +++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6d98e97af42d84..4c75df883a8279 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -512,6 +512,7 @@ public void queryRetry(TUniqueId queryId) throws Exception { UUID uuid; int retryTime = Config.max_query_retry_time; retryTime = retryTime <= 0 ? 1 : retryTime + 1; + boolean disableCloudVersionCacheOnRetry = false; // If the query is an `outfile` statement, // we execute it only once to avoid exporting redundant data. 
if (parsedStmt instanceof Queriable) { @@ -519,7 +520,11 @@ public void queryRetry(TUniqueId queryId) throws Exception { } for (int i = 1; i <= retryTime; i++) { try { - execute(queryId); + if (disableCloudVersionCacheOnRetry) { + executeWithVersionCacheDisabled(queryId); + } else { + execute(queryId); + } return; } catch (UserException e) { if (!SystemInfoService.needRetryWithReplan(e.getMessage()) || i == retryTime) { @@ -531,6 +536,7 @@ public void queryRetry(TUniqueId queryId) throws Exception { if (this.coord != null && this.coord.isQueryCancelled()) { throw e; } + disableCloudVersionCacheOnRetry = shouldDisableCloudVersionCacheOnRetry(e.getMessage()); TUniqueId lastQueryId = queryId; uuid = UUID.randomUUID(); queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); @@ -552,6 +558,34 @@ public void queryRetry(TUniqueId queryId) throws Exception { } } + // Temporarily disable the cloud version cache for this single attempt so that the retry + // re-fetches the visible version from meta-service, then restore the user-set TTLs. + private void executeWithVersionCacheDisabled(TUniqueId queryId) throws Exception { + SessionVariable sessionVariable = context.getSessionVariable(); + long oldPartitionTtl = sessionVariable.cloudPartitionVersionCacheTtlMs; + long oldTableTtl = sessionVariable.cloudTableVersionCacheTtlMs; + try { + sessionVariable.cloudPartitionVersionCacheTtlMs = 0; + sessionVariable.cloudTableVersionCacheTtlMs = 0; + LOG.info("temporarily set {} from {} to 0 and {} from {} to 0 before retry. 
{}", + SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS, oldPartitionTtl, + SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS, oldTableTtl, + context.getQueryIdentifier()); + execute(queryId); + } finally { + sessionVariable.cloudPartitionVersionCacheTtlMs = oldPartitionTtl; + sessionVariable.cloudTableVersionCacheTtlMs = oldTableTtl; + } + } + + boolean shouldDisableCloudVersionCacheOnRetry(String errorMessage) { + return Config.isCloudMode() + && errorMessage != null + && errorMessage.contains(SystemInfoService.ERROR_E230) + && (context.getSessionVariable().cloudPartitionVersionCacheTtlMs != 0 + || context.getSessionVariable().cloudTableVersionCacheTtlMs != 0); + } + public void execute(TUniqueId queryId) throws Exception { SessionVariable sessionVariable = context.getSessionVariable(); if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index c778c965f6dd07..8d21e40282b307 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -339,4 +339,52 @@ public void testParseByNereidsSetsParsedStatementOnStatementContext() throws Exc "ParsedStatement should be a LogicalPlanAdapter after parseByNereids(), but was: " + (parsedStatement == null ? 
"null" : parsedStatement.getClass().getName())); } + + @Test + public void testShouldDisableCloudVersionCacheOnRetryForE230() { + String originalCloudUniqueId = Config.cloud_unique_id; + String originalDeployMode = Config.deploy_mode; + long originalPartitionTtl = connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs; + long originalTableTtl = connectContext.getSessionVariable().cloudTableVersionCacheTtlMs; + try { + Config.cloud_unique_id = "test-cloud-id"; + StmtExecutor executor = new StmtExecutor(connectContext, "select 1"); + + connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L; + connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L; + Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = E-230 versions are already compacted")); + Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = some other error")); + // null error message must not trigger the disable. + Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(null)); + + // Non-cloud mode must never disable the version cache, even on E-230. 
+ Config.cloud_unique_id = ""; + Config.deploy_mode = ""; + Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = E-230 versions are already compacted")); + Config.cloud_unique_id = "test-cloud-id"; + + connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L; + connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 1000L; + Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = E-230 versions are already compacted")); + + connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L; + connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L; + Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = E-230 versions are already compacted")); + + connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L; + connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = 0L; + Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry( + "errCode = 2, detailMessage = E-230 versions are already compacted")); + } finally { + Config.cloud_unique_id = originalCloudUniqueId; + Config.deploy_mode = originalDeployMode; + connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = originalPartitionTtl; + connectContext.getSessionVariable().cloudTableVersionCacheTtlMs = originalTableTtl; + } + } } diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy index b2ad94dd2a2b0c..8d87400e253fe8 100644 --- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy +++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy @@ -162,7 +162,62 @@ suite("test_retry_e-230", 'docker') { // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s assertTrue(cost > 4000 && cost < 100000) + def restoreTbl = 'test_retry_e_230_restore_tbl' + sql 
""" DROP TABLE IF EXISTS ${restoreTbl} """ + sql """ + CREATE TABLE ${restoreTbl} ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + for (def i = 1; i <= 3; i++) { + insert_sql """INSERT INTO ${restoreTbl} VALUES (${i}, ${100 * i})""", 1 + } + + sql """ set cloud_partition_version_cache_ttl_ms = 3600000 """ + sql """ set cloud_table_version_cache_ttl_ms = 3600000 """ + def row_cnt = sql """select count() from ${restoreTbl}""" + assertEquals(row_cnt[0][0], 3) + + cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) + insert_sql """INSERT INTO ${restoreTbl} VALUES (4, 400)""", 1 + + def futrue5 = thread { + Thread.sleep(4000) + cluster.clearBackendDebugPoints() + } + + begin = System.currentTimeMillis(); + def futrue6 = thread { + def result = sql """select * from ${restoreTbl} order by k1""" + log.info("select result: {}", result) + } + + futrue6.get() + cost = System.currentTimeMillis() - begin; + log.info("time cost restore check : {}", cost) + futrue5.get() + assertTrue(cost > 4000 && cost < 100000) + + def ttlRows = sql_return_maparray """ + select + @@session.cloud_partition_version_cache_ttl_ms as partition_ttl, + @@session.cloud_table_version_cache_ttl_ms as table_ttl + """ + assertEquals("3600000", ttlRows[0].partition_ttl.toString()) + assertEquals("3600000", ttlRows[0].table_ttl.toString()) + row_cnt = sql """select count() from ${restoreTbl}""" + assertEquals(row_cnt[0][0], 4) } finally { + sql """ set cloud_partition_version_cache_ttl_ms = DEFAULT """ + sql """ set cloud_table_version_cache_ttl_ms = DEFAULT """ cluster.clearFrontendDebugPoints() cluster.clearBackendDebugPoints() }