diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index fd45621e09d298..18046c9beb2485 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1296,6 +1296,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int streaming_task_timeout_multiplier = 10; + @ConfField(mutable = true, masterOnly = true) + public static int streaming_cdc_light_rpc_timeout_sec = 90; + + @ConfField(mutable = true, masterOnly = true) + public static int streaming_cdc_heavy_rpc_timeout_sec = 600; + /** * the max timeout of get kafka meta. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index b1d28b2dcad978..e9bf49c1ec8b9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -60,6 +60,8 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * In PostgreSQL/MySQL, multi-table writes are performed by tasks that only make calls. @@ -134,9 +136,9 @@ private void sendWriteRequest() throws JobException { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; try { - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.error("Failed to send write records request, {}", result.getStatus().getErrorMsgs(0)); @@ -160,6 +162,11 @@ private void sendWriteRequest() throws JobException { throw new JobException("Failed to parse write records response: " + response); } throw new JobException("Failed to send write records request , error message: " + response); + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}", + taskId, getJobId(), backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_heavy_rpc_timeout_sec); + throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId); } catch (ExecutionException | InterruptedException ex) { log.error("Send write request failed: ", ex); throw new JobException(ex); @@ -331,20 +338,20 @@ public boolean isTimeout() { * such as a data quality error, and needs to expose it to the user. */ public String getTimeoutReason() { + if (runningBackendId <= 0) { + log.info("No running backend for task {}", runningBackendId); + return ""; + } + Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId); try { - if (runningBackendId <= 0) { - log.info("No running backend for task {}", runningBackendId); - return ""; - } - Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/getFailReason/" + getTaskId()) .build(); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to get task timeout reason, {}", result.getStatus().getErrorMsgs(0)); @@ -363,6 +370,11 @@ public String getTimeoutReason() { } catch (JsonProcessingException e) { log.warn("Failed to get task timeout reason, response: {}", response); } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/getFailReason jobId={} taskId={} backend={}:{} " + + "timeout_sec={}", + getJobId(), getTaskId(), backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_light_rpc_timeout_sec); } catch (ExecutionException | InterruptedException ex) { log.warn("Send get task fail reason request failed: ", ex); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 605de01bd2223c..3d73b2b057a2e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.rest.RestApiStatusCode; import org.apache.doris.job.cdc.DataSourceConfigKeys; @@ -64,6 +65,8 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Getter @@ -220,9 +223,9 @@ public void fetchRemoteMeta(Map properties) throws Exception { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; try { - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to get end offset from backend, {}", result.getStatus().getErrorMsgs(0)); @@ -246,6 +249,11 @@ public void fetchRemoteMeta(Map properties) throws Exception { log.warn("Failed to parse end offset response: {}", response); throw new JobException(response); } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/fetchEndOffset jobId={} backend={}:{} timeout_sec={}", + getJobId(), backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_light_rpc_timeout_sec); + throw new JobException("cdc_client RPC timeout: /api/fetchEndOffset jobId=" + getJobId()); } catch (ExecutionException | InterruptedException ex) { log.warn("Get end offset error: ", ex); throw new JobException(ex); @@ -306,9 +314,9 @@ private boolean compareOffset(Map offsetFirst, Map future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to compare offset , {}", result.getStatus().getErrorMsgs(0)); @@ -328,6 +336,11 @@ private boolean compareOffset(Map offsetFirst, Map requestTableSplits(String table) throws JobException TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; try { - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to get split from backend, {}", result.getStatus().getErrorMsgs(0)); @@ -572,6 +585,11 @@ private List requestTableSplits(String table) throws JobException log.warn("Failed to parse split response: {}", response); throw new JobException("Failed to parse split response: " + response); } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/fetchSplits jobId={} backend={}:{} table={} timeout_sec={}", + getJobId(), backend.getHost(), backend.getBrpcPort(), table, + Config.streaming_cdc_heavy_rpc_timeout_sec); + throw new JobException("cdc_client RPC timeout: /api/fetchSplits jobId=" + getJobId() + " table=" + table); } catch (ExecutionException | InterruptedException ex) { log.warn("Get splits error: ", ex); throw new JobException(ex); @@ -663,9 +681,9 @@ private void initSourceReader() throws JobException { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; try { - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0)); @@ -693,6 +711,11 @@ private void initSourceReader() throws JobException { log.warn("Failed to init {} source reader, {}", getJobId(), response); throw new JobException("Failed to init source reader, cause " + e.getMessage()); } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/initReader jobId={} backend={}:{} timeout_sec={}", + getJobId(), backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_heavy_rpc_timeout_sec); + throw new JobException("cdc_client RPC timeout: /api/initReader jobId=" + getJobId()); } catch (ExecutionException | InterruptedException ex) { log.warn("init source reader: ", ex); throw new JobException(ex); @@ -711,13 +734,17 @@ public void cleanMeta(Long jobId) throws JobException { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); InternalService.PRequestCdcClientResult result = null; try { - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec); + result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to close job {} source {}", jobId, result.getStatus().getErrorMsgs(0)); } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/close jobId={} backend={}:{} timeout_sec={}", + jobId, backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_light_rpc_timeout_sec); } catch (ExecutionException | InterruptedException ex) { log.warn("Close job error: ", ex); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java index 8e2e5eff9dffd5..749b592c775832 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java @@ -58,6 +58,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** @@ -203,9 +205,10 @@ private List> fetchTaskEndOffset(long taskId, List sca String rawResponse = null; try { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - Future future = - BackendServiceProxy.getInstance().requestCdcClient(address, request); - InternalService.PRequestCdcClientResult result = future.get(); + Future future = BackendServiceProxy.getInstance() + .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec); + InternalService.PRequestCdcClientResult result = + future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { log.warn("Failed to get task {} offset from BE {}: {}", taskId, @@ -221,6 +224,11 @@ private List> fetchTaskEndOffset(long taskId, List sca log.info("Fetched task {} offset from BE {}: {}", taskId, backend.getHost(), data); return data; } + } catch (TimeoutException te) { + log.warn("cdc_client RPC timeout api=/api/getTaskOffset jobId={} taskId={} backend={}:{} " + + "timeout_sec={}", + jobId, taskId, backend.getHost(), backend.getBrpcPort(), + Config.streaming_cdc_light_rpc_timeout_sec); } catch (Exception ex) { log.warn("Get task offset error for task {} from BE {}, raw response: {}", taskId, backend.getHost(), rawResponse, ex); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index fc3dac0c214764..456ec6a4620ce9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -217,6 +217,11 @@ public Future requestCdcClient( return stub.requestCdcClient(request); } + public Future requestCdcClient( + InternalService.PRequestCdcClientRequest request, int timeoutSec) { + return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).requestCdcClient(request); + } + public void shutdown() { ConnectivityState state = channel.getState(false); LOG.warn("shut down backend service client: {}, channel state: {}", address, state); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index fbdfb3cf223a70..2cd756ee4860ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -608,4 +608,15 @@ public Future requestCdcClient(TNetwork } return null; } + + public Future requestCdcClient(TNetworkAddress address, + InternalService.PRequestCdcClientRequest request, int timeoutSec) { + try { + final BackendServiceClient client = getProxy(address); + return client.requestCdcClient(request, timeoutSec); + } catch (Throwable e) { + LOG.warn("request cdc client failed, address={}:{}", address.getHostname(), address.getPort(), e); + } + return null; + } }