[DSIP-95][API] Complete the functionality of using dependencies in the complement data#18003
[DSIP-95][API] Complete the functionality of using dependencies in the complement data#18003det101 wants to merge 35 commits intoapache:devfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements downstream workflow triggering for complement/backfill runs in the API layer, adding support for “trigger dependent workflows” behavior and accompanying unit tests.
Changes:
- Implemented
doBackfillDependentWorkflowto fetch downstream workflow definitions and trigger backfill runs for them. - Added visited-code tracking intended to prevent self/cyclic triggering and duplicate downstream triggers.
- Added
BackfillWorkflowExecutorDelegateTestwith basic scenarios for downstream triggering and filtering.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java | Adds dependent workflow backfill triggering logic and wiring for lineage + workflow definition lookup. |
| dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java | Adds unit tests for the new dependent backfill triggering logic (single-hop scenarios). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
.../org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java
Outdated
Show resolved
Hide resolved
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
SbloodyS
left a comment
There was a problem hiding this comment.
You didn't fill in the content according to the PR template, please fix it.
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
.../org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java
Outdated
Show resolved
Hide resolved
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Show resolved
Hide resolved
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
ca11eb6 to
1841eaf
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java:119
- In parallel mode,
expectedParallelismNumbercan be 0 (the validator only rejects values < 0). If it is 0,splitDateTime(listDate, expectedParallelismNumber)will divide by zero and throwArithmeticException. Treat 0 the same as null (default tolistDate.size()), or explicitly guard against<= 0before callingsplitDateTime.
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
Integer expectedParallelismNumber = backfillParams.getExpectedParallelismNumber();
List<ZonedDateTime> listDate = backfillParams.getBackfillDateList();
if (expectedParallelismNumber != null) {
expectedParallelismNumber = Math.min(listDate.size(), expectedParallelismNumber);
} else {
expectedParallelismNumber = listDate.size();
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
| public void testDoParallelBackfillWorkflow_ShouldIsolateVisitedCodesAcrossChunks() { | ||
| long upstreamCode = 500L; | ||
| WorkflowDefinition upstreamWorkflow = | ||
| WorkflowDefinition.builder().code(upstreamCode).releaseState(ReleaseState.ONLINE).build(); | ||
| List<ZonedDateTime> dates = Arrays.asList( | ||
| ZonedDateTime.parse("2026-02-01T00:00:00Z"), | ||
| ZonedDateTime.parse("2026-02-02T00:00:00Z"), | ||
| ZonedDateTime.parse("2026-02-03T00:00:00Z")); | ||
| BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder() | ||
| .runMode(RunMode.RUN_MODE_PARALLEL) | ||
| .backfillDateList(dates) | ||
| .expectedParallelismNumber(2) | ||
| .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT) | ||
| .allLevelDependent(true) | ||
| .executionOrder(ExecutionOrder.ASC_ORDER) | ||
| .build(); | ||
| BackfillWorkflowDTO dto = BackfillWorkflowDTO.builder() | ||
| .workflowDefinition(upstreamWorkflow) | ||
| .backfillParams(params) | ||
| .build(); | ||
| Set<Long> baseVisitedCodes = new HashSet<>(Collections.singleton(upstreamCode)); | ||
| List<Set<Long>> visitedSnapshotPerChunk = new java.util.ArrayList<>(); | ||
|
|
||
| doAnswer(invocation -> { | ||
| Set<Long> chunkVisited = invocation.getArgument(2); | ||
| visitedSnapshotPerChunk.add(new HashSet<>(chunkVisited)); | ||
| chunkVisited.add(9000L + visitedSnapshotPerChunk.size()); | ||
| return null; | ||
| }).when(backfillWorkflowExecutorDelegate).doBackfillDependentWorkflowForTesting(any(), any(), any()); | ||
|
|
||
| List<Integer> result = backfillWorkflowExecutorDelegate.executeWithVisitedCodes(dto, baseVisitedCodes); | ||
|
|
There was a problem hiding this comment.
This test calls executeWithVisitedCodes, which will run the real doBackfillWorkflow and attempt to use registryClient / Clients.withService(IWorkflowControlClient) to contact a master. Since neither is mocked/stubbed in this test, it will fail with NPE or a ServiceException before exercising the visited-codes isolation assertions. Consider refactoring to unit-test the chunk visited-code cloning without invoking the master trigger, or add a test seam/mocking for the backfill trigger step.
.../org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Outdated
Show resolved
Hide resolved
...java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java
Show resolved
Hide resolved
SbloodyS
left a comment
There was a problem hiding this comment.
Please fill in the content according to the PR description template. Many parts of your code are written by ai, but there is no statement in PR.
done |
|
@caishunfeng Please help review |
@ruanwenjun Could you confirm whether “all downstream” here means only immediate dependents (same as today, just refactored), or all reachable downstream workflows (transitive closure)? I can go either way once I know which you want. |
Split downstream discovery from execution, add transitive resolution coverage, and keep parallel backfill triggering downstream dates by isolating visited state per chunk. Made-with: Cursor
I split the backfill dependent logic into resolve downstream + trigger downstream, and fixed the parallel case so downstream workflows no longer miss dates across chunks. @ruanwenjun |
| boolean filterOfflineWorkflow) { | ||
|
|
||
| Set<Long> resultCodes = new LinkedHashSet<>(); | ||
| Set<Long> expandedCodes = new HashSet<>(); |
There was a problem hiding this comment.
| Set<Long> expandedCodes = new HashSet<>(); | |
| Set<Long> visitedWorkflowCodes = new HashSet<>(); |
|


Was this PR generated or assisted by AI?
yes. The UT portion is generated by AI, Main functional code AI assisted
Purpose of the pull request
close #17748
Brief change log
The function of supplementing data supports dependency relationships: During the process of supplementing data, it can identify the dependency relationships of workflows and recursively pull up downstream dependent workflows
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md