diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 220062bd5799..a801e6311c0f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -74,6 +74,13 @@ public class MasterConfig implements Validator {
*/
private String masterRegistryPath;
+ /**
+ * Configuration for the master's task dispatch timeout check mechanism.
+ * This controls whether the system enforces a time limit for dispatching tasks to workers,
+ * and if so, how long to wait before marking a task as failed due to dispatch timeout.
+ */
+ private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+
@Override
public boolean supports(Class> clazz) {
return MasterConfig.class.isAssignableFrom(clazz);
@@ -97,6 +104,19 @@ public void validate(Object target, Errors errors) {
if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
}
+
+ // Validate task dispatch policy config
+ TaskDispatchPolicy configTaskDispatchPolicy = masterConfig.getTaskDispatchPolicy();
+ if (configTaskDispatchPolicy != null && configTaskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+ if (configTaskDispatchPolicy.getMaxTaskDispatchDuration() == null) {
+ errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+ "must be specified when dispatch timeout checker is enabled");
+ } else if (configTaskDispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) {
+ errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
+ "must be a positive duration (e.g., '2m', '5m', '30m')");
+ }
+ }
+
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
@@ -122,6 +142,7 @@ private void printConfig() {
"\n command-fetch-strategy: " + commandFetchStrategy +
"\n worker-load-balancer-configuration-properties: "
+ workerLoadBalancerConfigurationProperties +
+ "\n taskDispatchPolicy: " + taskDispatchPolicy +
"\n****************************Master Configuration**************************************";
log.info(config);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
new file mode 100644
index 000000000000..5be6a7041b3b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/TaskDispatchPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.config;
+
+import java.time.Duration;
+
+import lombok.Data;
+
+/**
+ * Configuration for the master's task dispatch policy.
+ *
+ * When enabled, tasks that remain in the dispatch queue longer than
+ * {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing.
+ */
+@Data
+public class TaskDispatchPolicy {
+
+ /**
+ * Indicates whether the dispatch timeout checking mechanism is enabled.
+ *
+ * If {@code true}, tasks exceeding the configured dispatch duration will be failed automatically.
+ */
+ private boolean dispatchTimeoutFailedEnabled = false;
+
+ /**
+ * The maximum allowed duration a task may wait in the dispatch queue before being assigned to a worker.
+ *
+ * Tasks that exceed this duration will be marked as failed.
+ *
+ * Examples: {@code "2m"}, {@code "5m"}, {@code "30m"}.
+ */
+ private Duration maxTaskDispatchDuration;
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
index f7566d3133b1..96331b2dcae9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
@@ -24,10 +24,13 @@
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.IWorkerLoadBalancer;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.task.executor.eventbus.ITaskExecutorLifecycleEventReporter;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorDispatchResponse;
@@ -55,18 +58,26 @@ public class PhysicalTaskExecutorClientDelegator implements ITaskExecutorClientD
@Autowired
private IWorkerLoadBalancer workerLoadBalancer;
+ @Autowired
+ private ClusterManager clusterManager;
+
@Override
public void dispatch(final ITaskExecutionRunnable taskExecutionRunnable) throws TaskDispatchException {
final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
final String taskName = taskExecutionContext.getTaskName();
+ final String workerGroup = taskExecutionContext.getWorkerGroup();
+
+ // workerGroup not exist
+ if (!clusterManager.getWorkerClusters().containsWorkerGroup(workerGroup)) {
+ throw new WorkerGroupNotFoundException(workerGroup);
+ }
+
+ // select an available worker from the worker group; throws NoAvailableWorkerException if none is available.
final String physicalTaskExecutorAddress = workerLoadBalancer
- .select(taskExecutionContext.getWorkerGroup())
+ .select(workerGroup)
.map(Host::of)
.map(Host::getAddress)
- .orElseThrow(() -> new TaskDispatchException(
- String.format("Cannot find the host to dispatch Task[id=%s, name=%s, workerGroup=%s]",
- taskExecutionContext.getTaskInstanceId(), taskName,
- taskExecutionContext.getWorkerGroup())));
+ .orElseThrow(() -> new NoAvailableWorkerException(workerGroup));
taskExecutionContext.setHost(physicalTaskExecutorAddress);
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index 28834e27e7e1..1c1e7f6664d1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -18,12 +18,17 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
+import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,11 +53,15 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
- public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
+ private final TaskDispatchPolicy taskDispatchPolicy;
+
+ public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient,
+ TaskDispatchPolicy taskDispatchPolicy) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
+ this.taskDispatchPolicy = taskDispatchPolicy;
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
}
@@ -84,23 +93,75 @@ public void run() {
}
private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
+ final int taskInstanceId = taskExecutionRunnable.getId();
+ final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
try {
- if (!waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
+ if (!waitingDispatchTaskIds.remove(taskInstanceId)) {
log.info(
"The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch",
- taskExecutionRunnable.getId());
+ taskInstanceId);
return;
}
taskExecutorClient.dispatch(taskExecutionRunnable);
- } catch (Exception e) {
+ } catch (Exception ex) {
+ if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
+ // Checks whether the given task has exceeded its allowed dispatch timeout.
+ // If a dispatch timeout occurs, the task will NOT be put back into the queue.
+ long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
+ long elapsed = System.currentTimeMillis() - taskExecutionContext.getFirstDispatchTime();
+ if (elapsed > timeoutMs) {
+ handleDispatchFailure(taskExecutionRunnable, ex, elapsed, timeoutMs);
+ return;
+ }
+ }
+
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
- long waitingTimeMills = Math.min(
+ long waitingTimeMillis = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
- dispatchTask(taskExecutionRunnable, waitingTimeMills);
- log.error("Dispatch Task: {} failed will retry after: {}/ms", taskExecutionRunnable.getId(),
- waitingTimeMills, e);
+ dispatchTask(taskExecutionRunnable, waitingTimeMillis);
+ log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId,
+ waitingTimeMillis, ex);
+ }
+ }
+
+ /**
+ * Marks the specified task as fatally failed due to an unrecoverable dispatch error,such as timeout or persistent client failure.
+ * Once this method is called, the task is considered permanently failed and will not be retried.
+ *
+ * @param taskExecutionRunnable the task to mark as fatally failed; must not be null
+ * @param ex the dispatch exception that triggered this failure handling; must not be null
+ * @param elapsed the time (in milliseconds) already spent attempting to dispatch the task
+ * @param timeoutMs the configured dispatch timeout threshold (in milliseconds)
+ */
+ private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
+ long elapsed, long timeoutMs) {
+ final String taskName = taskExecutionRunnable.getName();
+
+ log.warn("[DISPATCH_FAILED] taskName: {}, timed out after {} ms (limit: {} ms))", taskName, elapsed, timeoutMs);
+
+ if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) {
+ log.error("[DISPATCH_FAILED] taskName: {}, Worker group not found.", taskName, ex);
+ final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new Date())
+ .build();
+ taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+ } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) {
+ log.error("[DISPATCH_FAILED] taskName: {}, No available worker.", taskName, ex);
+ final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new Date())
+ .build();
+ taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+ } else {
+ log.error("[DISPATCH_FAILED] taskName: {}, Unexpected dispatch error.", taskName, ex);
+ final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
+ .taskExecutionRunnable(taskExecutionRunnable)
+ .endTime(new Date())
+ .build();
+ taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
index a85674c6f4d7..086fc5359ee4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
@@ -41,8 +42,11 @@ public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
private final ConcurrentHashMap workerGroupDispatcherMap;
- public WorkerGroupDispatcherCoordinator() {
+ private final MasterConfig masterConfig;
+
+ public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
workerGroupDispatcherMap = new ConcurrentHashMap<>();
+ this.masterConfig = masterConfig;
}
public void start() {
@@ -99,7 +103,8 @@ public void close() throws Exception {
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
- WorkerGroupDispatcher workerGroupDispatcher = new WorkerGroupDispatcher(wg, taskExecutorClient);
+ WorkerGroupDispatcher workerGroupDispatcher =
+ new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getTaskDispatchPolicy());
workerGroupDispatcher.start();
return workerGroupDispatcher;
});
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
new file mode 100644
index 000000000000..d51e2342cc2b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/dispatch/NoAvailableWorkerException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.exception.dispatch;
+
+public class NoAvailableWorkerException extends TaskDispatchException {
+
+ public NoAvailableWorkerException(String workerGroup) {
+ super("Cannot find available worker under worker group: " + workerGroup);
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
index 9103bc5075d8..892dfab065e0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/ExceptionUtils.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.utils;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
+
import org.springframework.dao.DataAccessResourceFailureException;
public class ExceptionUtils {
@@ -25,4 +28,11 @@ public static boolean isDatabaseConnectedFailedException(Throwable e) {
return e instanceof DataAccessResourceFailureException;
}
+ public static boolean isWorkerGroupNotFoundException(Throwable e) {
+ return e instanceof WorkerGroupNotFoundException;
+ }
+
+ public static boolean isNoAvailableWorkerException(Throwable e) {
+ return e instanceof NoAvailableWorkerException;
+ }
}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 39a0f4311ab7..3d5c6685140b 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -111,6 +111,11 @@ master:
# Master max concurrent workflow instances, when the master's workflow instance count exceeds this value, master server will be marked as busy.
max-concurrent-workflow-instances: 2147483647
worker-group-refresh-interval: 5m
+ # Task dispatch timeout check (currently disabled).
+ # When enabled, tasks not dispatched within this duration are marked as failed.
+ task-dispatch-policy:
+ dispatch-timeout-failed-enabled: false
+ max-task-dispatch-duration: 5m
command-fetch-strategy:
type: ID_SLOT_BASED
config:
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
index e2e96a96140a..8e838b724a21 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinatorTest.java
@@ -24,25 +24,33 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.test.util.ReflectionTestUtils;
@ExtendWith(MockitoExtension.class)
class WorkerGroupDispatcherCoordinatorTest {
- @InjectMocks
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
@Mock
private ITaskExecutorClient taskExecutorClient;
+ @BeforeEach
+ void setUp() {
+ MasterConfig masterConfig = new MasterConfig();
+ workerGroupDispatcherCoordinator = new WorkerGroupDispatcherCoordinator(masterConfig);
+ ReflectionTestUtils.setField(workerGroupDispatcherCoordinator, "taskExecutorClient", taskExecutorClient);
+ }
+
@Test
void addTaskToWorkerGroup_NewWorkerGroup_ShouldAddTask() {
String workerGroup = "newGroup";
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
index 77525cb1810e..4f1def750758 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java
@@ -19,21 +19,36 @@
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
+import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.NoAvailableWorkerException;
import org.apache.dolphinscheduler.server.master.exception.dispatch.TaskDispatchException;
+import org.apache.dolphinscheduler.server.master.exception.dispatch.WorkerGroupNotFoundException;
import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
@@ -46,7 +61,9 @@ class WorkerGroupDispatcherTest {
@BeforeEach
void setUp() {
taskExecutorClient = mock(ITaskExecutorClient.class);
- dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient);
+ final MasterConfig masterConfig = new MasterConfig();
+ dispatcher =
+ new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getTaskDispatchPolicy());
}
@Test
@@ -138,4 +155,276 @@ void dispatch_TaskDispatchFails_RetryLogicWorks() throws TaskDispatchException {
.untilAsserted(() -> verify(taskExecutorClient, times(2)).dispatch(taskExecutionRunnable));
}
+ @Test
+ void dispatchTask_WorkerGroupNotFound_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
+ // Given
+ ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("no worker group");
+ doThrow(ex).when(taskExecutorClient).dispatch(task);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(task, 0);
+
+ // When & Then
+ await().atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ // Ensure it's retrying
+ verify(taskExecutorClient, atLeast(2)).dispatch(task);
+
+ // Ensure NO event has been published during this time
+ WorkflowEventBus eventBus = task.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void dispatchTask_WorkerGroupNotFound_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
+ // Given
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
+ System.currentTimeMillis() - 500);
+
+ WorkerGroupNotFoundException ex = new WorkerGroupNotFoundException("worker group not found");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ // Then
+ await().atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
+ WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
+ verify(eventBus).publish(argThat(evt -> evt instanceof TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void dispatchTask_WorkerGroupNotFound_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
+ // Given: Dispatcher configured with a 5-minute timeout (enabled)
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ // Mock task with first dispatch time set to 100ms ago → well within timeout window
+ ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
+ System.currentTimeMillis() - 100);
+
+ // Use CountDownLatch to reliably detect actual dispatch invocation
+ CountDownLatch dispatchCalled = new CountDownLatch(1);
+
+ // Stub client to throw WorkerGroupNotFoundException and signal the latch
+ doAnswer(invocation -> {
+ dispatchCalled.countDown(); // Confirm dispatch was attempted
+ throw new WorkerGroupNotFoundException("Worker group 'TestGroup' does not exist");
+ }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ // When: Start dispatcher and dispatch the task
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ // Wait up to 1 second for the dispatch attempt to complete
+ boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
+ Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
+
+ // Then: Verify NO failure events are published because timeout has NOT been exceeded
+ WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ @Test
+ void dispatchTask_NoAvailableWorker_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
+ // Given
+ ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ NoAvailableWorkerException ex = new NoAvailableWorkerException("no worker");
+ doThrow(ex).when(taskExecutorClient).dispatch(task);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(task, 0);
+
+ // When & Then
+ await().atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ // Ensure it's retrying
+ verify(taskExecutorClient, atLeast(2)).dispatch(task);
+
+ // Ensure NO event has been published during this time
+ WorkflowEventBus eventBus = task.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void dispatchTask_NoAvailableWorker_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
+ // Given: enable timeout (200ms), task already waited 500ms
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable =
+ mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis() - 500);
+
+ NoAvailableWorkerException ex = new NoAvailableWorkerException("no worker");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ // Then
+ await().atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
+ WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
+ verify(eventBus).publish(argThat(evt -> evt instanceof TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void dispatchTask_NoAvailableWorker_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
+ // Given: Configure dispatcher with a 5-minute dispatch timeout (enabled)
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ // Mock task with first dispatch time set to 100ms ago → ensures it's NOT timed out yet
+ ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
+ System.currentTimeMillis() - 100);
+
+ // Use CountDownLatch to reliably detect when dispatch is actually invoked (avoids timing flakiness)
+ CountDownLatch dispatchCalled = new CountDownLatch(1);
+
+ // Stub the client to throw NoAvailableWorkerException on dispatch and signal the latch
+ doAnswer(invocation -> {
+ dispatchCalled.countDown(); // Signal that dispatch was attempted
+ throw new NoAvailableWorkerException("no worker");
+ }).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ // When: Start dispatcher and trigger task dispatch
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ // Wait up to 1 second for the dispatch attempt to occur (ensures async execution completes)
+ boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
+ Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
+
+ // Then: Verify NO failure events are published since timeout has NOT been exceeded
+ WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ @Test
+ void dispatchTask_GenericTaskDispatchException_TimeoutDisabled_ShouldKeepRetrying() throws TaskDispatchException {
+ // Given
+ ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(System.currentTimeMillis());
+ TaskDispatchException ex = new TaskDispatchException("generic dispatch error");
+ doThrow(ex).when(taskExecutorClient).dispatch(task);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(task, 0);
+
+ // When & Then
+ await().atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> {
+ // Ensure it's retrying
+ verify(taskExecutorClient, atLeast(2)).dispatch(task);
+
+ // Ensure NO event has been published during this time
+ WorkflowEventBus eventBus = task.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ });
+ }
+
+ @Test
+ void dispatchTask_GenericTaskDispatchException_TimeoutEnabledAndExceeded_ShouldPublishFailedEvent() throws TaskDispatchException {
+ // Given
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMillis(200));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ ITaskExecutionRunnable taskExecutionRunnable = mockTaskExecutionRunnableWithFirstDispatchTime(
+ System.currentTimeMillis() - 500);
+
+ TaskDispatchException ex = new TaskDispatchException("generic dispatch error");
+ doThrow(ex).when(taskExecutorClient).dispatch(taskExecutionRunnable);
+
+ dispatcher.start();
+ dispatcher.dispatchTask(taskExecutionRunnable, 0);
+
+ // Then
+ await().atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> {
+ verify(taskExecutorClient, times(1)).dispatch(taskExecutionRunnable);
+ WorkflowEventBus eventBus = taskExecutionRunnable.getWorkflowEventBus();
+ verify(eventBus).publish(argThat(evt -> evt instanceof TaskFailedLifecycleEvent &&
+ ((TaskFailedLifecycleEvent) evt).getTaskExecutionRunnable() == taskExecutionRunnable));
+ });
+ }
+
+ @Test
+ void dispatchTask_GenericTaskDispatchException_TimeoutEnabledButNotExceeded_ShouldNotPublishAnyFailureEvent() throws TaskDispatchException, InterruptedException {
+ // Given: Dispatcher configured with a 5-minute dispatch timeout (enabled)
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofMinutes(5));
+
+ dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, taskDispatchPolicy);
+
+ // Mock task with first dispatch time set to 100ms ago → well within timeout window
+ ITaskExecutionRunnable task = mockTaskExecutionRunnableWithFirstDispatchTime(
+ System.currentTimeMillis() - 100);
+
+ // Use CountDownLatch to reliably detect when dispatch is actually invoked
+ CountDownLatch dispatchCalled = new CountDownLatch(1);
+
+ // Stub client to throw a generic TaskDispatchException and signal the latch
+ doAnswer(invocation -> {
+ dispatchCalled.countDown(); // Confirm dispatch attempt occurred
+ throw new TaskDispatchException("Generic dispatch error");
+ }).when(taskExecutorClient).dispatch(task);
+
+ // When: Start dispatcher and trigger task dispatch
+ dispatcher.start();
+ dispatcher.dispatchTask(task, 0);
+
+ // Wait up to 1 second for the dispatch attempt to complete (handles async execution)
+ boolean dispatched = dispatchCalled.await(1000, TimeUnit.MILLISECONDS);
+ Assertions.assertTrue(dispatched, "Expected dispatch() to be called within 1 second");
+
+ // Then: Verify NO failure events are published because timeout has NOT been exceeded
+ WorkflowEventBus eventBus = task.getWorkflowEventBus();
+ verify(eventBus, never()).publish(any(TaskFailedLifecycleEvent.class));
+ }
+
+ private ITaskExecutionRunnable mockTaskExecutionRunnableWithFirstDispatchTime(long firstDispatchTime) {
+ ITaskExecutionRunnable taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
+ TaskInstance taskInstance = mock(TaskInstance.class);
+ WorkflowInstance workflowInstance = mock(WorkflowInstance.class);
+ WorkflowEventBus eventBus = mock(WorkflowEventBus.class);
+
+ TaskExecutionContext context = mock(TaskExecutionContext.class);
+ when(context.getFirstDispatchTime()).thenReturn(firstDispatchTime);
+
+ when(taskExecutionRunnable.getTaskInstance()).thenReturn(taskInstance);
+ when(taskExecutionRunnable.getWorkflowInstance()).thenReturn(workflowInstance);
+ when(taskExecutionRunnable.getWorkflowEventBus()).thenReturn(eventBus);
+ when(taskExecutionRunnable.getId()).thenReturn(ThreadLocalRandom.current().nextInt(1000, 9999));
+ when(taskExecutionRunnable.getTaskExecutionContext()).thenReturn(context);
+
+ return taskExecutionRunnable;
+ }
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index a2f0331f3483..dffd6e6c240a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -33,6 +33,7 @@
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
@@ -1435,4 +1436,92 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
});
masterContainer.assertAllResourceReleased();
}
+
+ @Test
+ @DisplayName("Test start a workflow whose task specifies a non-existent worker group when dispatch timeout is enabled")
+ public void testStartWorkflow_with_workerGroupNotFoundAndTimeoutEnabled() {
+ // Enable dispatch timeout to ensure tasks fail fast if worker group is missing
+ TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
+ taskDispatchPolicy.setDispatchTimeoutFailedEnabled(true);
+ taskDispatchPolicy.setMaxTaskDispatchDuration(Duration.ofSeconds(10));
+ this.masterConfig.setTaskDispatchPolicy(taskDispatchPolicy);
+
+ final String yaml = "/it/start/workflow_with_worker_group_not_found.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ // Observe the task over a reasonable period (e.g., 20 seconds)
+ // It should reach a fail state because:
+ // - workerGroup "workerGroupNotFound" does not exist
+ // - and timeout detection is ON → fallback failure mechanism
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.FAILURE));
+
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+ assertThat(taskInstance.getName()).isEqualTo("A");
+ assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
+ @Test
+ @DisplayName("Task with non-existent worker group remains running indefinitely when dispatch timeout is disabled")
+ public void testTaskStaysRunning_whenWorkerGroupNotFoundAndTimeoutDisabled() {
+ // Disable dispatch timeout: system will NOT auto-fail tasks that cannot be dispatched
+ TaskDispatchPolicy policy = new TaskDispatchPolicy();
+ policy.setDispatchTimeoutFailedEnabled(false);
+ this.masterConfig.setTaskDispatchPolicy(policy);
+
+ final String yaml = "/it/start/workflow_with_worker_group_not_found.yaml";
+ final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(new RunWorkflowCommandParam())
+ .build();
+
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ // Observe the task over a reasonable period (e.g., 20 seconds)
+ // It should NEVER reach a terminal state because:
+ // - workerGroup "workerGroupNotFound" does not exist
+ // - and timeout detection is OFF → no fallback failure mechanism
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ Assertions
+ .assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
+ .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION));
+
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(1)
+ .anySatisfy(taskInstance -> {
+ assertThat(taskInstance.getName()).isEqualTo("A");
+ assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.RUNNING_EXECUTION);
+ });
+ });
+
+ masterContainer.assertAllResourceReleased();
+ }
+
}
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml b/dolphinscheduler-master/src/test/resources/application.yaml
index 3aca1e3142eb..b03b815a5b8a 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -73,6 +73,11 @@ master:
cpu-usage-weight: 30
task-thread-pool-usage-weight: 30
worker-group-refresh-interval: 5m
+ # Task dispatch timeout check (currently disabled).
+ # When enabled, tasks not dispatched within this duration are marked as failed.
+ task-dispatch-policy:
+ dispatch-timeout-failed-enabled: false
+ max-task-dispatch-duration: 5m
command-fetch-strategy:
type: ID_SLOT_BASED
config:
diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
new file mode 100644
index 000000000000..a2d5b3240502
--- /dev/null
+++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_worker_group_not_found.yaml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_worker_group_not_found
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
+ workerGroup: workerGroupNotFound
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index a8f46435c1bb..e0fc55922640 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -126,6 +126,11 @@ public class TaskExecutionContext implements Serializable {
private boolean failover;
+ /**
+ * Timestamp (ms) when the task was first dispatched.
+ */
+ private final long firstDispatchTime = System.currentTimeMillis();
+
public int increaseDispatchFailTimes() {
return ++dispatchFailTimes;
}