diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 168bb1372598eb..4583271a64b94d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -20,10 +20,13 @@
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.InternalErrorCode;
@@ -167,6 +170,10 @@ public class StreamingInsertJob extends AbstractJob
     private Map<String, String> convertedSourceProperties;
+    @Getter
+    @SerializedName("ccn")
+    private volatile String cloudCluster;
+
     // The sampling window starts at the beginning of the sampling window.
     // If the error rate exceeds `max_filter_ratio` within the window, the sampling fails.
     @Setter
@@ -238,8 +245,7 @@ private void initSourceJob() {
             }
             StreamingJobUtils.resolveAndValidateSource(
                     dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
-            this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType,
-                    getConvertedSourceProperties());
+            this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
             JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
             rdsOffsetProvider.splitChunks(createTbls);
         } catch (Exception ex) {
@@ -292,6 +298,7 @@ private void init() {
         this.jobProperties = new StreamingJobProperties(properties);
         jobProperties.validate();
         this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 1000;
+        resolveCloudCluster();
         // build time definition
         JobExecutionConfiguration execConfig = getJobConfig();
         TimerDefinition timerDefinition = new TimerDefinition();
@@ -305,13 +312,75 @@ private void init() {
         }
     }
 
+    private void resolveCloudCluster() throws AnalysisException {
+        String requested = validateComputeGroupProperty(properties);
+        if (requested != null) {
+            this.cloudCluster = requested;
+            return;
+        }
+        if (!Config.isCloudMode()) {
+            return;
+        }
+        if (ConnectContext.get() == null) {
+            throw new AnalysisException("compute_group must be specified when no active session is available");
+        }
+        String sessionCluster;
+        try {
+            sessionCluster = ConnectContext.get().getCloudCluster();
+        } catch (ComputeGroupException e) {
+            throw new AnalysisException("failed to resolve compute_group: " + e.getMessage());
+        }
+        if (StringUtils.isBlank(sessionCluster)) {
+            throw new AnalysisException("compute_group is required in cloud mode; "
+                    + "specify compute_group explicitly or bind a default cluster with USAGE");
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(sessionCluster);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        this.cloudCluster = sessionCluster;
+    }
+
+    // Returns the validated compute_group value, or null when the property is absent.
+    // Throws if the key is present but blank, if not in cloud mode, or if the user lacks USAGE on the cluster.
+    private String validateComputeGroupProperty(Map<String, String> props) throws AnalysisException {
+        if (props == null || !props.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            return null;
+        }
+        String value = props.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+        if (StringUtils.isBlank(value)) {
+            throw new AnalysisException("compute_group cannot be empty");
+        }
+        if (!Config.isCloudMode()) {
+            throw new AnalysisException("compute_group is only supported in cloud mode");
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(value);
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        return value;
+    }
+
+    private SourceOffsetProvider createOffsetProvider(Map<String, String> jdbcSourceProps) {
+        SourceOffsetProvider provider;
+        if (tvfType != null) {
+            provider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+        } else {
+            provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps);
+        }
+        provider.setCloudCluster(this.cloudCluster);
+        return provider;
+    }
+
     private void initInsertJob() {
         try {
             init();
             UnboundTVFRelation currentTvf = getCurrentTvf();
             this.tvfType = currentTvf.getFunctionName();
             this.originTvfProps = currentTvf.getProperties().getMap();
-            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+            this.offsetProvider = createOffsetProvider(sourceProperties);
             this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
             // Validate source-side resources (e.g. PG slot/publication ownership) once at job
             // creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
@@ -411,6 +480,8 @@ public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException,
             logParts.add("sql: " + encryptedSql);
         }
 
+        validateComputeGroupProperty(alterJobCommand.getProperties());
+
         // update properties
         if (!alterJobCommand.getProperties().isEmpty()) {
             modifyPropertiesInternal(alterJobCommand.getProperties());
@@ -547,7 +618,7 @@ protected AbstractStreamingTask createStreamingTask() throws JobException {
     private AbstractStreamingTask createStreamingMultiTblTask() throws JobException {
         return new StreamingMultiTblTask(getJobId(), Env.getCurrentEnv().getNextId(), dataSourceType,
                 offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobProperties,
-                getCreateUser());
+                getCreateUser(), cloudCluster);
     }
 
     private Map<String, String> getConvertedSourceProperties() throws JobException {
@@ -571,7 +642,8 @@ private Map<String, String> getProviderProps() throws JobException {
 
     protected AbstractStreamingTask createStreamingInsertTask() {
         return new StreamingInsertTask(getJobId(), Env.getCurrentEnv().getNextId(), getExecuteSql(),
-                offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(), getCreateUser());
+                offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(),
+                getCreateUser(), cloudCluster);
     }
 
     public void recordTasks(AbstractStreamingTask task) {
@@ -856,6 +928,10 @@ private void modifyPropertiesInternal(Map<String, String> inputProperties) throw
                 resetCloudProgress(offset);
             }
         }
+        if (inputProperties.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
+            this.cloudCluster = inputProperties.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
+            offsetProvider.setCloudCluster(this.cloudCluster);
+        }
         this.properties.putAll(inputProperties);
         this.jobProperties = new StreamingJobProperties(this.properties);
     }
@@ -1227,11 +1303,9 @@ public void replayOnVisible(TransactionState txnState) {
 
     @Override
     public void gsonPostProcess() throws IOException {
         if (offsetProvider == null) {
+            offsetProvider = createOffsetProvider(sourceProperties);
             if (tvfType != null) {
-                offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
                 offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
-            } else {
-                offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties);
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 54832eb6fb1bd4..df23f10724049f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.Util;
@@ -41,6 +42,7 @@
 
 import lombok.Getter;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,6 +60,7 @@ public class StreamingInsertTask extends AbstractStreamingTask {
     private ConnectContext ctx;
     private StreamingJobProperties jobProperties;
     private Map<String, String> originTvfProps;
+    private String cloudCluster;
     SourceOffsetProvider offsetProvider;
 
     public StreamingInsertTask(long jobId,
@@ -67,13 +70,15 @@ public StreamingInsertTask(long jobId,
                                String currentDb,
                                StreamingJobProperties jobProperties,
                                Map<String, String> originTvfProps,
-                               UserIdentity userIdentity) {
+                               UserIdentity userIdentity,
+                               String cloudCluster) {
         super(jobId, taskId, userIdentity);
         this.sql = sql;
         this.currentDb = currentDb;
         this.offsetProvider = offsetProvider;
         this.jobProperties = jobProperties;
         this.originTvfProps = originTvfProps;
+        this.cloudCluster = cloudCluster;
     }
 
     @Override
@@ -86,6 +91,10 @@ public void before() throws Exception {
         this.startTimeMs = System.currentTimeMillis();
         ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
         ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
+        // apply after session merge so compute_group wins over session.cloud_cluster
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            ctx.setCloudCluster(cloudCluster);
+        }
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index c7a814c4a5b772..3db3aecaf8b2ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -44,8 +44,9 @@ public class StreamingJobProperties implements JobProperties {
     public static final String SESSION_VAR_PREFIX = "session.";
     public static final String INTERNAL_KEY_PREFIX = "__";
     public static final String OFFSET_PROPERTY = "offset";
+    public static final String COMPUTE_GROUP_PROPERTY = "compute_group";
     public static final List<String> SUPPORT_STREAM_JOB_PROPS = Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY,
-            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY);
+            S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY, COMPUTE_GROUP_PROPERTY);
 
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
@@ -195,4 +196,8 @@ private Map<String, String> parseSessionVarMap() {
     public String getOffsetProperty() {
         return properties.get(OFFSET_PROPERTY);
     }
+
+    public String getComputeGroup() {
+        return properties.get(COMPUTE_GROUP_PROPERTY);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index b1d28b2dcad978..ac5efe51542f31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -75,6 +75,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
     private Map<String, String> targetProperties;
     private String targetDb;
     private StreamingJobProperties jobProperties;
+    private String cloudCluster;
     private long scannedRows = 0L;
     private long loadBytes = 0L;
     private long filteredRows = 0L;
@@ -90,7 +91,8 @@ public StreamingMultiTblTask(Long jobId,
                                  String targetDb,
                                  Map<String, String> targetProperties,
                                  StreamingJobProperties jobProperties,
-                                 UserIdentity userIdentity) {
+                                 UserIdentity userIdentity,
+                                 String cloudCluster) {
         super(jobId, taskId, userIdentity);
         this.dataSourceType = dataSourceType;
         this.offsetProvider = offsetProvider;
@@ -98,6 +100,7 @@ public StreamingMultiTblTask(Long jobId,
         this.targetProperties = targetProperties;
         this.jobProperties = jobProperties;
         this.targetDb = targetDb;
+        this.cloudCluster = cloudCluster;
         this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
     }
 
@@ -123,7 +126,7 @@ public void run() throws JobException {
     }
 
     private void sendWriteRequest() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
                 taskId, backend.getId(), backend.getHost(), runningOffset.toString());
         this.runningBackendId = backend.getId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49c718c59504c5..87eca253c46577 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -88,6 +88,12 @@ default void initOnCreate() throws JobException {}
      */
     void updateOffset(Offset offset);
 
+    /**
+     * Bind the compute group that should route FE-initiated RPCs.
+     * Default: no-op for providers that do not make BE RPCs.
+     */
+    default void setCloudCluster(String cloudCluster) {}
+
     /**
      * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index cfa2fe3273b6fd..3209b190538312 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -94,6 +94,8 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider
 
     volatile boolean hasMoreData = true;
 
+    transient volatile String cloudCluster;
+
     /**
      * No-arg constructor for subclass use.
      */
@@ -220,7 +222,7 @@ public void updateOffset(Offset offset) {
 
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(),
                 sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -306,7 +308,7 @@ public boolean hasMoreDataToConsume() {
 
     private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
             throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         CompareOffsetRequest requestParams = new CompareOffsetRequest(getJobId(), sourceType.name(),
                 sourceProperties, getFrontendAddress(), offsetFirst, offsetSecond);
 
@@ -549,7 +551,7 @@ private void saveChunkMeta(Map> tableSplits) throws
     }
 
     private List requestTableSplits(String table) throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(getJobId(), sourceType.name(),
                 sourceProperties, getFrontendAddress(), table);
 
@@ -664,7 +666,7 @@ public boolean hasReachedEnd() {
      * otherwise, conflicts will occur in multi-backends scenarios.
      */
     private void initSourceReader() throws JobException {
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(),
                 sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -712,7 +714,7 @@ private void initSourceReader() throws JobException {
 
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
-        Backend backend = StreamingJobUtils.selectBackend();
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
         JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(),
                 sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 438fb29417912e..56c9559699195d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -25,6 +25,8 @@
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.SmallFileMgr;
@@ -227,19 +229,29 @@ public static JdbcClient getJdbcClient(DataSourceType sourceType, Map
     }
 
-    public static Backend selectBackend() throws JobException {
-        Backend backend;
-        BeSelectionPolicy policy;
-
-        policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
+    public static Backend selectBackend(String cloudCluster) throws JobException {
+        if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
+            List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                    .getBackendsByClusterName(cloudCluster)
+                    .stream()
+                    .filter(Backend::isLoadAvailable)
+                    .collect(Collectors.toList());
+            if (bes.isEmpty()) {
+                throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+                        + ", compute_group: " + cloudCluster);
+            }
+            int idx = getLastSelectedBackendIndexAndUpdate();
+            return bes.get(Math.floorMod(idx, bes.size()));
+        }
+
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
+                .setEnableRoundRobin(true).needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
-
-        List<Long> backendIds;
-        backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
         if (backendIds.isEmpty()) {
             throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }
-        backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
         if (backend == null) {
             throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
new file mode 100644
index 00000000000000..7fee4e14f17b26
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_compute_group.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Coverage for the non-TVF (source-to-target) path: verifies compute_group is
+// rejected in non-cloud mode, persisted on CREATE JOB ... FROM MYSQL in cloud
+// mode, and that the bound job runs end-to-end, exercising JdbcSourceOffsetProvider
+// RPCs and StreamingMultiTblTask.sendWriteRequest. Lifecycle checks (empty /
+// invalid / ALTER / PAUSE) are covered by test_streaming_insert_job_compute_group.
+suite("test_streaming_mysql_job_compute_group",
+        "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_cg_job"
+    def currentDb = (sql "select database()")[0][0]
+    def tableName = "mysql_cg_normal1"
+    def mysqlDb = "test_cdc_cg_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableName} force"""
+
+    // Non-cloud mode: compute_group must be rejected regardless of MySQL availability
+    if (!isCloudMode()) {
+        test {
+            sql """CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "any_group")
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = "jdbc:mysql://127.0.0.1:3316",
+                    "driver_url" = "nop",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "nop",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableName}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )
+            """
+            exception "only supported in cloud mode"
+        }
+        return
+    }
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+    def clusterRows = sql "show clusters"
+    assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+    def cg = clusterRows.get(0).get(0)
+
+    connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+        sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+        sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableName}"""
+        sql """CREATE TABLE ${mysqlDb}.${tableName} (
+            `name` varchar(200) NOT NULL,
+            `age` int DEFAULT NULL,
+            PRIMARY KEY (`name`)
+        ) ENGINE=InnoDB"""
+        sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('A1', 1)"""
+        sql """INSERT INTO ${mysqlDb}.${tableName} (name, age) VALUES ('B1', 2)"""
+    }
+
+    try {
+        sql """CREATE JOB ${jobName}
+            PROPERTIES ("compute_group" = "${cg}")
+            ON STREAMING
+            FROM MYSQL (
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
"jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${tableName}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + def props = sql """select properties from jobs("type"="insert") where Name='${jobName}'""" + assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"") + + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'""" + cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 1 + }) + } catch (Exception ex) { + log.info("job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + def rows = (sql """SELECT count(*) FROM ${currentDb}.${tableName}""").get(0).get(0) as long + assertTrue(rows >= 2, "expected snapshot rows in target table") + } finally { + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${tableName} force""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy new file mode 100644 index 00000000000000..66168a66f7a60f --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group.groovy @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_compute_group") {
+    def tableName = "test_streaming_insert_job_cg_tbl"
+    def jobName = "test_streaming_insert_job_cg_job"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    def s3Source = """
+        SELECT * FROM S3(
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        )
+    """
+
+    // ---------- Non-cloud mode: compute_group must be rejected ----------
+    if (!isCloudMode()) {
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "any_group")
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "only supported in cloud mode"
+        }
+        return
+    }
+
+    // ---------- Cloud mode ----------
+    def clusterRows = sql "show clusters"
+    assert clusterRows.size() >= 1 : "cloud mode expects at least one cluster"
+    def cg = clusterRows.get(0).get(0)
+
+    try {
+        // 0) Empty compute_group -> CREATE rejected
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES ("compute_group" = "")
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "compute_group cannot be empty"
+        }
+
+        // 1) Invalid compute_group -> CREATE should fail
+        test {
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES (
+                    "s3.max_batch_files" = "1",
+                    "compute_group" = "__not_exist_cg__"
+                )
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+            exception "not exist"
+        }
+
+        // 2) Valid compute_group -> CREATE succeeds; properties reflect it
+        sql """
+            CREATE JOB ${jobName}
+            PROPERTIES (
+                "s3.max_batch_files" = "1",
+                "compute_group" = "${cg}"
+            )
+            ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+        """
+
+        def props = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+        log.info("job properties: " + props)
+        assert props.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        // Wait for at least one successful task so the cluster binding is exercised end-to-end
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+        } catch (Exception ex) {
+            log.info("job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // 3) ALTER without PAUSE -> rejected by upstream guard (Only PAUSED job can be altered)
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+            exception "Only PAUSED job can be altered"
+        }
+
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+            def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+            s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+        })
+
+        // 4) ALTER to non-existent cluster -> rejected; state + bound cg unchanged
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "__not_exist_cg__")"""
+            exception "not exist"
+        }
+        def afterBadAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterBadAlter.get(0).get(0) == "PAUSED"
+        assert afterBadAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+
+        // 5) ALTER with empty compute_group -> rejected; bound cg unchanged
+        test {
+            sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "")"""
+            exception "compute_group cannot be empty"
+        }
+        def afterEmptyAlter = sql """select properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterEmptyAlter.get(0).get(0).contains("\"compute_group\":\"${cg}\"")
+
+        // 6) ALTER to the same valid cluster -> succeeds, properties updated
+        sql """ALTER JOB ${jobName} PROPERTIES ("compute_group" = "${cg}")"""
+        def afterAlter = sql """select status, properties from jobs("type"="insert") where Name='${jobName}'"""
+        assert afterAlter.get(0).get(0) == "PAUSED"
+        assert afterAlter.get(0).get(1).contains("\"compute_group\":\"${cg}\"")
+    } finally {
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
new file mode 100644
index 00000000000000..70d99c4483ff58
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_compute_group_docker.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Docker-mode coverage for compute_group routing on StreamingInsertJob:
+// spins up a cloud cluster with two compute groups and asserts that the bound
+// compute_group actually steers BE traffic. Complements the non-docker suite
+// test_streaming_insert_job_compute_group, which covers property lifecycle only.
+suite("test_streaming_insert_job_compute_group_docker", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    def tableName = "test_sij_cg_docker_tbl"
+    def jobName = "test_sij_cg_docker_job"
+    def cgA = "compute_cluster"
+    def cgB = "sij_cg_b_docker"
+
+    docker(options) {
+        // default BE (index 0) lives in ${cgA}; the BE added below joins ${cgB}.
+        cluster.addBackend(1, cgB)
+        Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
+            def cgs = sql """SHOW COMPUTE GROUPS"""
+            cgs.size() == 2
+        })
+        log.info("compute groups: ${sql """SHOW COMPUTE GROUPS"""}")
+
+        def backends = cluster.getAllBackends().sort { it.backendId }
+        assertEquals(2, backends.size())
+        def beA = backends.get(0)
+        def beB = backends.get(1)
+        log.info("beA=${beA.host}:${beA.httpPort} (${cgA}) beB=${beB.host}:${beB.httpPort} (${cgB})")
+
+        sql """drop table if exists `${tableName}` force"""
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3;
+        """
+
+        def s3Source = """
+            SELECT * FROM S3(
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            )
+        """
+
+        try {
+            def aBefore = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bBefore = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase0 a=${aBefore} b=${bBefore}")
+
+            // Phase 1: bind to cgA, verify traffic stays on cgA
+            sql """
+                CREATE JOB ${jobName}
+                PROPERTIES (
+                    "s3.max_batch_files" = "1",
+                    "compute_group" = "${cgA}"
+                )
+                ON STREAMING DO INSERT INTO ${tableName} ${s3Source};
+            """
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 2
+            })
+
+            def aAfter1 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter1 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase1 a=${aAfter1} b=${bAfter1}")
+            assertTrue(aAfter1 > aBefore, "phase1 expects cgA load_rows to increase")
+            assertTrue(bAfter1 == bBefore, "phase1 expects cgB untouched")
+            def rows1 = (sql """SELECT count(*) FROM ${tableName}""").get(0).get(0) as long
+            assertTrue(rows1 > 0, "phase1 expects target table to receive rows")
+
+            // Phase 2: ALTER compute_group to cgB with reset offset, verify traffic switches
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+            sql """
+                ALTER JOB ${jobName} PROPERTIES (
+                    "compute_group" = "${cgB}",
+                    "offset" = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+                )
+            """
+            sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 4
+            })
+
+            def aAfter2 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter2 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase2 a=${aAfter2} b=${bAfter2}")
+            assertTrue(aAfter2 == aAfter1, "phase2 expects cgA unchanged")
+            assertTrue(bAfter2 > bAfter1, "phase2 expects cgB load_rows to increase")
+
+            // Phase 3: compute_group=cgA plus session.cloud_cluster=cgB; compute_group must win
+            sql """PAUSE JOB where jobname = '${jobName}'"""
+            Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                s.size() == 1 && 'PAUSED' == s.get(0).get(0)
+            })
+
+            sql """
+                ALTER JOB ${jobName} PROPERTIES (
+                    "compute_group" = "${cgA}",
+                    "session.cloud_cluster" = "${cgB}",
+                    "offset" = '{"fileName":"regression/load/data/anoexist56789.csv"}'
+                )
+            """
+            sql """RESUME JOB where jobname = '${jobName}'"""
+
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                cnt.size() == 1 && Integer.parseInt(cnt.get(0).get(0).toString()) >= 6
+            })
+
+            def aAfter3 = get_be_metric(beA.host, beA.httpPort, "load_rows") as long
+            def bAfter3 = get_be_metric(beB.host, beB.httpPort, "load_rows") as long
+            log.info("phase3 a=${aAfter3} b=${bAfter3}")
+            assertTrue(aAfter3 > aAfter2, "phase3 expects cgA to increase (compute_group wins)")
+            assertTrue(bAfter3 == bAfter2, "phase3 expects cgB untouched")
+        } finally {
+            sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+            sql """drop table if exists `${tableName}` force"""
+        }
+    }
+}