File: StreamingInsertJob.java
@@ -20,10 +20,13 @@
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
@@ -167,6 +170,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
// Converted form of sourceProperties; must be refreshed whenever sourceProperties changes.
private transient Map<String, String> convertedSourceProperties;

@Getter
@SerializedName("ccn")
private volatile String cloudCluster;

// Error-rate sampling window: errors are counted from the window's start time.
// If the error rate exceeds `max_filter_ratio` within the window, the check fails.
@Setter
@@ -238,8 +245,7 @@ private void initSourceJob() {
}
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType,
getConvertedSourceProperties());
this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
} catch (Exception ex) {
@@ -292,6 +298,7 @@ private void init() {
this.jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 1000;
resolveCloudCluster();
// build time definition
JobExecutionConfiguration execConfig = getJobConfig();
TimerDefinition timerDefinition = new TimerDefinition();
@@ -305,13 +312,75 @@ private void init() {
}
}

private void resolveCloudCluster() throws AnalysisException {
String requested = validateComputeGroupProperty(properties);
if (requested != null) {
this.cloudCluster = requested;
return;
}
if (!Config.isCloudMode()) {
return;
}
if (ConnectContext.get() == null) {
throw new AnalysisException("compute_group must be specified when no active session is available");
}
String sessionCluster;
try {
sessionCluster = ConnectContext.get().getCloudCluster();
} catch (ComputeGroupException e) {
throw new AnalysisException("failed to resolve compute_group: " + e.getMessage());
}
if (StringUtils.isBlank(sessionCluster)) {
throw new AnalysisException("compute_group is required in cloud mode; "
+ "specify compute_group explicitly or bind a default cluster with USAGE");
}
try {
((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(sessionCluster);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
this.cloudCluster = sessionCluster;
}

// returns the validated compute_group value, or null when the property is absent.
// throws if the key is present but blank, if the FE is not in cloud mode, or if the user lacks USAGE on the cluster.
private String validateComputeGroupProperty(Map<String, String> props) throws AnalysisException {
if (props == null || !props.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
return null;
}
String value = props.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
if (StringUtils.isBlank(value)) {
throw new AnalysisException("compute_group cannot be empty");
}
if (!Config.isCloudMode()) {
throw new AnalysisException("compute_group is only supported in cloud mode");
}
try {
((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(value);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
return value;
}

private SourceOffsetProvider createOffsetProvider(Map<String, String> jdbcSourceProps) {
SourceOffsetProvider provider;
if (tvfType != null) {
provider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
} else {
provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps);
}
provider.setCloudCluster(this.cloudCluster);
return provider;
}

private void initInsertJob() {
try {
init();
UnboundTVFRelation currentTvf = getCurrentTvf();
this.tvfType = currentTvf.getFunctionName();
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
this.offsetProvider = createOffsetProvider(sourceProperties);
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
// Validate source-side resources (e.g. PG slot/publication ownership) once at job
// creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
@@ -411,6 +480,8 @@ public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException,
logParts.add("sql: " + encryptedSql);
}

validateComputeGroupProperty(alterJobCommand.getProperties());

// update properties
if (!alterJobCommand.getProperties().isEmpty()) {
modifyPropertiesInternal(alterJobCommand.getProperties());
@@ -547,7 +618,7 @@ protected AbstractStreamingTask createStreamingTask() throws JobException {
private AbstractStreamingTask createStreamingMultiTblTask() throws JobException {
return new StreamingMultiTblTask(getJobId(), Env.getCurrentEnv().getNextId(), dataSourceType,
offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobProperties,
getCreateUser());
getCreateUser(), cloudCluster);
}

private Map<String, String> getConvertedSourceProperties() throws JobException {
@@ -571,7 +642,8 @@ private Map<String, String> getProviderProps() throws JobException {
protected AbstractStreamingTask createStreamingInsertTask() {
return new StreamingInsertTask(getJobId(), Env.getCurrentEnv().getNextId(),
getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(), getCreateUser());
offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(),
getCreateUser(), cloudCluster);
}

public void recordTasks(AbstractStreamingTask task) {
@@ -856,6 +928,10 @@ private void modifyPropertiesInternal(Map<String, String> inputProperties) throw
resetCloudProgress(offset);
}
}
if (inputProperties.containsKey(StreamingJobProperties.COMPUTE_GROUP_PROPERTY)) {
this.cloudCluster = inputProperties.get(StreamingJobProperties.COMPUTE_GROUP_PROPERTY);
offsetProvider.setCloudCluster(this.cloudCluster);
}
this.properties.putAll(inputProperties);
this.jobProperties = new StreamingJobProperties(this.properties);
}
@@ -1227,11 +1303,9 @@ public void replayOnVisible(TransactionState txnState) {
@Override
public void gsonPostProcess() throws IOException {
if (offsetProvider == null) {
offsetProvider = createOffsetProvider(sourceProperties);
if (tvfType != null) {
offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
} else {
offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, sourceProperties);
}
}

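Taken together, resolveCloudCluster gives the explicit compute_group property precedence over the session's bound cluster, and USAGE is checked on whichever value wins. A condensed standalone sketch of that precedence (method and parameter names are hypothetical, not part of the PR):

    // Hypothetical condensation of resolveCloudCluster / validateComputeGroupProperty:
    // the explicit property wins; otherwise, in cloud mode, fall back to the
    // session's bound cluster; a blank fallback is an error.
    static String resolveComputeGroup(String propertyValue, String sessionCluster,
                                      boolean cloudMode) {
        if (propertyValue != null && !propertyValue.isBlank()) {
            return propertyValue;                 // property takes precedence
        }
        if (!cloudMode) {
            return null;                          // non-cloud: nothing to bind
        }
        if (sessionCluster == null || sessionCluster.isBlank()) {
            throw new IllegalArgumentException("compute_group is required in cloud mode");
        }
        return sessionCluster;                    // fall back to the session cluster
    }
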
File: StreamingInsertTask.java
@@ -19,6 +19,7 @@

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.Util;
@@ -41,6 +42,7 @@

import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collections;
@@ -58,6 +60,7 @@ public class StreamingInsertTask extends AbstractStreamingTask {
private ConnectContext ctx;
private StreamingJobProperties jobProperties;
private Map<String, String> originTvfProps;
private String cloudCluster;
SourceOffsetProvider offsetProvider;

public StreamingInsertTask(long jobId,
@@ -67,13 +70,15 @@ public StreamingInsertTask(long jobId,
String currentDb,
StreamingJobProperties jobProperties,
Map<String, String> originTvfProps,
UserIdentity userIdentity) {
UserIdentity userIdentity,
String cloudCluster) {
super(jobId, taskId, userIdentity);
this.sql = sql;
this.currentDb = currentDb;
this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
this.originTvfProps = originTvfProps;
this.cloudCluster = cloudCluster;
}

@Override
@@ -86,6 +91,10 @@ public void before() throws Exception {
this.startTimeMs = System.currentTimeMillis();
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
// apply after session merge so compute_group wins over session.cloud_cluster
if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
ctx.setCloudCluster(cloudCluster);
}
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);

File: StreamingJobProperties.java
@@ -44,8 +44,9 @@ public class StreamingJobProperties implements JobProperties {
public static final String SESSION_VAR_PREFIX = "session.";
public static final String INTERNAL_KEY_PREFIX = "__";
public static final String OFFSET_PROPERTY = "offset";
public static final String COMPUTE_GROUP_PROPERTY = "compute_group";
public static final List<String> SUPPORT_STREAM_JOB_PROPS = Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY,
S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY);
S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY, OFFSET_PROPERTY, COMPUTE_GROUP_PROPERTY);

public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
@@ -195,4 +196,8 @@ private Map<String, String> parseSessionVarMap() {
public String getOffsetProperty() {
return properties.get(OFFSET_PROPERTY);
}

public String getComputeGroup() {
return properties.get(COMPUTE_GROUP_PROPERTY);
}
}
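With COMPUTE_GROUP_PROPERTY registered in SUPPORT_STREAM_JOB_PROPS, the value round-trips through the properties map like the other job settings. A minimal test-style sketch (the group name is made up; java.util.Map/HashMap assumed imported):

    Map<String, String> props = new HashMap<>();
    props.put(StreamingJobProperties.COMPUTE_GROUP_PROPERTY, "cg_example");  // hypothetical name
    StreamingJobProperties jobProps = new StreamingJobProperties(props);
    // getComputeGroup() is a plain read of the stored property
    assert "cg_example".equals(jobProps.getComputeGroup());
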
File: StreamingMultiTblTask.java
@@ -75,6 +75,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private Map<String, String> targetProperties;
private String targetDb;
private StreamingJobProperties jobProperties;
private String cloudCluster;
private long scannedRows = 0L;
private long loadBytes = 0L;
private long filteredRows = 0L;
@@ -90,14 +91,16 @@ public StreamingMultiTblTask(Long jobId,
String targetDb,
Map<String, String> targetProperties,
StreamingJobProperties jobProperties,
UserIdentity userIdentity) {
UserIdentity userIdentity,
String cloudCluster) {
super(jobId, taskId, userIdentity);
this.dataSourceType = dataSourceType;
this.offsetProvider = offsetProvider;
this.sourceProperties = sourceProperties;
this.targetProperties = targetProperties;
this.jobProperties = jobProperties;
this.targetDb = targetDb;
this.cloudCluster = cloudCluster;
this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
}

@@ -123,7 +126,7 @@ public void run() throws JobException {
}

private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
taskId, backend.getId(), backend.getHost(), runningOffset.toString());
this.runningBackendId = backend.getId();
File: SourceOffsetProvider.java
@@ -88,6 +88,12 @@ default void initOnCreate() throws JobException {}
*/
void updateOffset(Offset offset);

/**
* Bind the compute group used to route FE-initiated BE RPCs.
* Default: no-op for providers that do not make BE RPCs.
*/
default void setCloudCluster(String cloudCluster) {}

/**
* Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
*/
File: JdbcSourceOffsetProvider.java
@@ -94,6 +94,8 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {

volatile boolean hasMoreData = true;

transient volatile String cloudCluster;

/**
* No-arg constructor for subclass use.
*/
@@ -220,7 +222,7 @@ public void updateOffset(Offset offset) {

@Override
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -306,7 +308,7 @@ public boolean hasMoreDataToConsume() {

private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties,
getFrontendAddress(), offsetFirst, offsetSecond);
@@ -549,7 +551,7 @@ private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits) throws
}

private List<SnapshotSplit> requestTableSplits(String table) throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
FetchTableSplitsRequest requestParams =
new FetchTableSplitsRequest(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress(), table);
@@ -664,7 +666,7 @@ public boolean hasReachedEnd() {
* otherwise, conflicts will occur in multi-backend scenarios.
*/
private void initSourceReader() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -712,7 +714,7 @@ private void initSourceReader() throws JobException {
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend();
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
File: StreamingJobUtils.java
@@ -25,6 +25,8 @@
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.SmallFileMgr;
@@ -227,19 +229,29 @@ public static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, St
return JdbcClient.createJdbcClient(config);
}

public static Backend selectBackend() throws JobException {
Backend backend = null;
BeSelectionPolicy policy = null;
public static Backend selectBackend(String cloudCluster) throws JobException {
if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(cloudCluster)
.stream()
.filter(Backend::isLoadAvailable)
.collect(Collectors.toList());
if (bes.isEmpty()) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+ ", compute_group: " + cloudCluster);
}
int idx = getLastSelectedBackendIndexAndUpdate();
return bes.get(Math.floorMod(idx, bes.size()));
}

policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true).needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();

List<Long> backendIds;
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
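Both branches of selectBackend rotate through one shared counter; the cloud branch applies it with Math.floorMod, which, unlike the % operator, never yields a negative index, so the pick stays valid even after the int counter overflows and wraps. A self-contained sketch of the pattern (class and field names are hypothetical):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    final class RoundRobinPicker {
        // Shared increasing counter, in the spirit of
        // getLastSelectedBackendIndexAndUpdate() above.
        private static final AtomicInteger NEXT = new AtomicInteger();

        // floorMod keeps the index in [0, size) even when the counter
        // wraps past Integer.MAX_VALUE into negative values.
        static <T> T pick(List<T> candidates) {
            int idx = NEXT.getAndIncrement();
            return candidates.get(Math.floorMod(idx, candidates.size()));
        }
    }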