From b2900d06f3a30169776b6a3172f5635317063566 Mon Sep 17 00:00:00 2001 From: Wen Zhenghu Date: Wed, 1 Jul 2026 11:32:54 +0800 Subject: [PATCH] [feature](workload) Support remote scan bytes breaker in workload policy (#64649) ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: This PR adds a new workload policy condition `be_scan_bytes_from_remote_storage`, which allows Doris to cancel queries according to the amount of data read from remote storage by BE scan tasks. This is useful for limiting external table queries that read too much remote HDFS or object storage data. Implementation summary: - Add a new BE-side workload metric type in thrift for remote storage scan bytes. - Add FE workload policy parsing, validation, metadata mapping, and replay support for `be_scan_bytes_from_remote_storage`. - Add BE workload condition evaluation based on `io_context()->scan_bytes_from_remote_storage()`. - Add regression coverage using an existing Hive external `lineitem` table. ### Release note Support workload policy cancellation by BE remote storage scan bytes. ### Check List (For Author) - Test: - FE UT: passed - BE UT: passed - Regression test: passed, `test_workload_policy_remote_scan_bytes` - Manual test: verified existing workload policy behavior and new remote scan bytes cancellation on a deployed Doris instance - Behavior changed: Yes. Add a new workload policy condition `be_scan_bytes_from_remote_storage`. - Does this need documentation: Yes. The workload policy condition list should be updated. --- .../workload_condition.cpp | 15 +- .../workload_management/workload_condition.h | 33 +++- .../workload_sched_policy.cpp | 6 + .../runtime/workload_sched_policy_test.cpp | 34 ++++- .../WorkloadCondition.java | 3 + ...ConditionBeScanBytesFromRemoteStorage.java | 60 ++++++++ .../WorkloadMetricType.java | 3 +- .../WorkloadSchedPolicyMgr.java | 7 +- .../WorkloadSchedPolicyMgrTest.java | 46 ++++++ gensrc/thrift/BackendService.thrift | 4 +- ...t_workload_policy_remote_scan_bytes.groovy | 143 ++++++++++++++++++ 11 files changed, 347 insertions(+), 7 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java create mode 100644 regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp index 4c85a95ad1fd22..618168df69056f 100644 --- a/be/src/runtime/workload_management/workload_condition.cpp +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -56,6 +56,19 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) { return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); } +// remote scan bytes +WorkloadConditionScanBytesFromRemoteStorage::WorkloadConditionScanBytesFromRemoteStorage( + WorkloadCompareOperator op, std::string str_val) { + _op = op; + _scan_bytes_from_remote_storage = std::stol(str_val); +} + +bool WorkloadConditionScanBytesFromRemoteStorage::eval(std::string str_val) { + int64_t scan_bytes_from_remote_storage_args = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_from_remote_storage_args, + _scan_bytes_from_remote_storage); +} + // query memory WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val) { @@ -80,4 +93,4 @@ bool WorkloadConditionUsername::eval(std::string str_val) { return WorkloadCompareUtils::compare_string(_op, str_val, _username); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 1a8c9e8dc8b6bd..15ecc8d4b25471 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -23,7 +23,15 @@ namespace doris { -enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES, USERNAME }; +enum WorkloadMetricType { + QUERY_TIME, + SCAN_ROWS, + SCAN_BYTES, + // Extend the BE workload metric enum with remote scan bytes support. + SCAN_BYTES_FROM_REMOTE_STORAGE, + QUERY_MEMORY_BYTES, + USERNAME +}; class WorkloadCondition { public: @@ -90,6 +98,25 @@ class WorkloadConditionScanBytes : public WorkloadCondition { WorkloadCompareOperator _op; }; +class WorkloadConditionScanBytesFromRemoteStorage : public WorkloadCondition { +public: + WorkloadConditionScanBytesFromRemoteStorage(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE; + } + + std::string get_metric_string() override { return "scan_bytes_from_remote_storage"; } + + std::string get_metric_value_string() override { + return std::to_string(_scan_bytes_from_remote_storage); + } + +private: + int64_t _scan_bytes_from_remote_storage; + WorkloadCompareOperator _op; +}; + class WorkloadConditionQueryMemory : public WorkloadCondition { public: WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val); @@ -136,6 +163,8 @@ class WorkloadConditionFactory { return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique(op, str_val); + } else if (TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE == metric_name) { + return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) { return std::make_unique(op, str_val); } else if (TWorkloadMetricType::type::USERNAME == metric_name) { @@ -146,4 +175,4 @@ class WorkloadConditionFactory { } }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index e439c873ad2e3f..a3cbc003ac08a5 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -80,6 +80,12 @@ bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext* action_runtim val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes()); break; } + // Evaluate the remote read breaker against the existing IO context remote scan counter. + case WorkloadMetricType::SCAN_BYTES_FROM_REMOTE_STORAGE: { + val = std::to_string(action_runtime_ctx->resource_ctx->io_context() + ->scan_bytes_from_remote_storage()); + break; + } case WorkloadMetricType::SCAN_ROWS: { val = std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows()); break; diff --git a/be/test/runtime/workload_sched_policy_test.cpp b/be/test/runtime/workload_sched_policy_test.cpp index 719ce0f04ad9f1..311bf982685437 100644 --- a/be/test/runtime/workload_sched_policy_test.cpp +++ b/be/test/runtime/workload_sched_policy_test.cpp @@ -198,7 +198,39 @@ TEST_F(WorkloadSchedPolicyTest, one_policy_one_condition) { << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); } - // 5 check query be memory bytes + // 5 check remote scan bytes + { + std::shared_ptr policy = std::make_shared(); + std::vector> cond_ptr_list; + cond_ptr_list.push_back(create_workload_condition( + TWorkloadMetricType::type::BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + TCompareOperator::type::GREATER, "1000")); + std::vector> action_ptr_list; + action_ptr_list.push_back(create_workload_action(TWorkloadActionType::type::CANCEL_QUERY)); + std::set wg_id_set; + policy->init(0, "p1", 0, true, 0, wg_id_set, std::move(cond_ptr_list), + std::move(action_ptr_list)); + + // Updating total scan bytes alone must not satisfy the remote read condition. + WorkloadAction::RuntimeContext action_runtime_ctx = create_runtime_context(); + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes(1001); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " << action_runtime_ctx.resource_ctx->io_context()->scan_bytes(); + + // Updating remote scan bytes below the threshold must still miss. + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(999); + EXPECT_FALSE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage(); + + // Only the remote scan bytes counter should drive this metric to a match. + action_runtime_ctx.resource_ctx->io_context()->update_scan_bytes_from_remote_storage(2); + EXPECT_TRUE(policy->is_match(&action_runtime_ctx)) + << ": " + << action_runtime_ctx.resource_ctx->io_context()->scan_bytes_from_remote_storage(); + } + + // 6 check query be memory bytes { std::shared_ptr policy = std::make_shared(); std::vector> cond_ptr_list; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index c790a4013080d5..01dfae955dcb18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -37,6 +37,9 @@ static WorkloadCondition createWorkloadCondition(WorkloadConditionMeta cm) return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); + // Register the remote scan bytes condition so FE can parse and persist the new metric. + } else if (WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE.equals(cm.metricName)) { + return WorkloadConditionBeScanBytesFromRemoteStorage.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) { return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java new file mode 100644 index 00000000000000..3b4e004971ebed --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytesFromRemoteStorage.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.common.UserException; + +/** + * Workload condition for matching query remote scan bytes collected on BE. + */ +public class WorkloadConditionBeScanBytesFromRemoteStorage implements WorkloadCondition { + + private long value; + + private WorkloadConditionOperator op; + + public WorkloadConditionBeScanBytesFromRemoteStorage(WorkloadConditionOperator op, long value) { + this.op = op; + this.value = value; + } + + @Override + public boolean eval(String strValue) { + // Currently this metric is evaluated only on BE, so FE-side matching always returns false. + return false; + } + + public static WorkloadConditionBeScanBytesFromRemoteStorage createWorkloadCondition( + WorkloadConditionOperator op, String value) throws UserException { + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid remote scan bytes value: " + value + ", it requires >= 0"); + } + return new WorkloadConditionBeScanBytesFromRemoteStorage(op, longValue); + } + + @Override + public WorkloadMetricType getMetricType() { + return WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index 93e612a85c2ddd..5d83cb845f9491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,6 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES + // Keep the metric enum aligned with the workload policy metrics published to BE. + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, BE_SCAN_BYTES_FROM_REMOTE_STORAGE, QUERY_BE_MEMORY_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 3442c8a7789448..07d4288b6431f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -105,7 +105,9 @@ public WorkloadSchedPolicyMgr() { public static final ImmutableSet BE_METRIC_SET = new ImmutableSet.Builder().add(WorkloadMetricType.BE_SCAN_ROWS) - .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .add(WorkloadMetricType.BE_SCAN_BYTES) + // Treat remote scan bytes as a BE-only runtime metric. + .add(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE).add(WorkloadMetricType.QUERY_TIME) .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).add(WorkloadMetricType.USERNAME).build(); // used for convert fe type to thrift type @@ -114,6 +116,9 @@ public WorkloadSchedPolicyMgr() { .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES) + // Map the new FE metric enum to the appended thrift metric enum. + .put(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE) .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES) .put(WorkloadMetricType.USERNAME, TWorkloadMetricType.USERNAME).build(); public static final ImmutableMap ACTION_MAP diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java index 7461cba03d5a71..6d4b6e2e6c3a7e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgrTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.persist.EditLog; +import org.apache.doris.thrift.TWorkloadMetricType; import org.junit.After; import org.junit.Assert; @@ -343,4 +344,49 @@ public void testNonCloudModeRejectsLeadingDotInWorkloadGroup() { e.getMessage().contains("non-cloud mode")); } } + + @Test + public void testRemoteScanBytesMetricCanCreateBePolicy() throws UserException { + List conditionMetas = new ArrayList<>(); + // Verify the new metric string can be parsed into a BE-side workload condition. + conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100")); + List actionMetas = new ArrayList<>(); + actionMetas.add(new WorkloadActionMeta("cancel_query", "")); + + mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes", false, conditionMetas, actionMetas, null); + + Assert.assertTrue(WorkloadSchedPolicyMgr.BE_METRIC_SET.contains( + WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE)); + Assert.assertEquals(TWorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE, + WorkloadSchedPolicyMgr.METRIC_MAP.get(WorkloadMetricType.BE_SCAN_BYTES_FROM_REMOTE_STORAGE)); + } + + @Test + public void testRemoteScanBytesMetricRejectsNegativeValue() throws UserException { + try { + // Reject negative thresholds for the remote scan bytes breaker. + WorkloadCondition.createWorkloadCondition( + new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "-1")); + Assert.fail("Should throw exception for negative remote scan bytes value"); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("remote scan bytes")); + } + } + + @Test + public void testRemoteScanBytesMetricCanNotMixWithFeAction() throws UserException { + try { + List conditionMetas = new ArrayList<>(); + // Validate the new metric follows the existing BE-only action compatibility rules. + conditionMetas.add(new WorkloadConditionMeta("be_scan_bytes_from_remote_storage", ">", "100")); + List actionMetas = new ArrayList<>(); + actionMetas.add(new WorkloadActionMeta("set_session_variable", "workload_group=normal")); + + mgr.createWorkloadSchedPolicy("policy_remote_scan_bytes_with_fe_action", false, conditionMetas, + actionMetas, null); + Assert.fail("Should throw exception for remote scan bytes metric with FE action"); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains("action and metric must run in FE together or run in BE together")); + } + } } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 676f116f003bdb..faea4c46ceae78 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -297,7 +297,9 @@ enum TWorkloadMetricType { BE_SCAN_ROWS = 1, BE_SCAN_BYTES = 2, QUERY_BE_MEMORY_BYTES = 3, - USERNAME = 4 + USERNAME = 4, + // Append the new enum value to keep existing metric ids stable across versions. + BE_SCAN_BYTES_FROM_REMOTE_STORAGE = 5 } enum TCompareOperator { diff --git a/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy b/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy new file mode 100644 index 00000000000000..3b6cb46f1b3bba --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_workload_policy_remote_scan_bytes.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_workload_policy_remote_scan_bytes", "p0,external") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable Hive test.") + return + } + + String hivePrefix = "hive2" + setHivePrefix(hivePrefix) + + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalogName = "test_workload_policy_remote_scan_bytes" + String workloadGroupName = "test_remote_scan_bytes_wg" + String policyName = "test_remote_scan_bytes_policy" + String invalidPolicyName = "test_remote_scan_bytes_invalid" + + String forComputeGroupStr = "" + String currentCgName = "" + if (isCloudMode()) { + def clusters = sql "SHOW CLUSTERS" + assertTrue(!clusters.isEmpty()) + String validCluster = clusters[0][0] + currentCgName = "${validCluster}." + forComputeGroupStr = " for ${validCluster} " + } + + try { + sql """DROP WORKLOAD POLICY IF EXISTS ${policyName}""" + sql """DROP WORKLOAD POLICY IF EXISTS ${invalidPolicyName}""" + sql """DROP WORKLOAD GROUP IF EXISTS ${workloadGroupName} ${forComputeGroupStr}""" + sql """DROP CATALOG IF EXISTS ${catalogName}""" + + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', + 'hadoop.username' = 'hive', + 'ipc.client.fallback-to-simple-auth-allowed' = 'true' + ) + """ + + String lineitemDb = "tpch1_parquet" + try { + def tables = sql """SHOW TABLES FROM ${catalogName}.${lineitemDb} LIKE 'lineitem'""" + if (tables.isEmpty()) { + throw new IllegalStateException("${lineitemDb}.lineitem does not exist") + } + } catch (Throwable ignored) { + lineitemDb = "tpch1" + def tables = sql """SHOW TABLES FROM ${catalogName}.${lineitemDb} LIKE 'lineitem'""" + assertFalse(tables.isEmpty(), "${catalogName} does not contain tpch1_parquet.lineitem or tpch1.lineitem") + } + + sql """ + CREATE WORKLOAD GROUP ${workloadGroupName} ${forComputeGroupStr} + PROPERTIES ('max_cpu_percent' = '100') + """ + + test { + sql """ + CREATE WORKLOAD POLICY ${invalidPolicyName} + CONDITIONS(be_scan_bytes_from_remote_storage > -1) + ACTIONS(cancel_query) + PROPERTIES('enabled' = 'false') + """ + exception "invalid remote scan bytes value" + } + + sql """ + CREATE WORKLOAD POLICY ${policyName} + CONDITIONS(be_scan_bytes_from_remote_storage > 1) + ACTIONS(cancel_query) + PROPERTIES( + 'priority' = '100', + 'workload_group' = '${currentCgName}${workloadGroupName}' + ) + """ + + def policy = sql """ + SELECT name, condition, action, priority, enabled, workload_group + FROM information_schema.workload_policy + WHERE name = '${policyName}' + """ + assertEquals(1, policy.size()) + assertEquals(policyName, policy[0][0]) + assertTrue(policy[0][1].toString().contains("be_scan_bytes_from_remote_storage > 1")) + + // Wait for FE to publish the new BE-side policy before issuing the query. + Thread.sleep(15000) + + Throwable queryException = null + sql """SET workload_group = '${workloadGroupName}'""" + sql """SET enable_file_cache = false""" + sql """SET enable_sql_cache = false""" + try { + sql """ + SELECT SUM(SLEEP(1) + l_quantity) + FROM ( + SELECT l_quantity + FROM ${catalogName}.${lineitemDb}.lineitem + LIMIT 10 + ) s + """ + } catch (Throwable t) { + queryException = t + } + assertTrue(queryException != null, "query should be cancelled by remote scan bytes workload policy") + String msg = queryException.getMessage() + logger.info("Remote scan bytes workload policy cancel message: " + msg) + assertTrue(msg != null && msg.contains("cancelled by workload policy: ${policyName}"), + "unexpected cancel policy: " + msg) + assertTrue(msg.contains("scan_bytes_from_remote_storage"), + "remote scan bytes counter is missing from cancel message: " + msg) + } finally { + try { + sql """SET workload_group = ''""" + } catch (Throwable t) { + logger.info("ignore reset workload_group failure: " + t.getMessage()) + } + sql """DROP WORKLOAD POLICY IF EXISTS ${policyName}""" + sql """DROP WORKLOAD POLICY IF EXISTS ${invalidPolicyName}""" + sql """DROP WORKLOAD GROUP IF EXISTS ${workloadGroupName} ${forComputeGroupStr}""" + sql """DROP CATALOG IF EXISTS ${catalogName}""" + } +}