Skip to content

Commit 1ddd39e

Browse files
committed
execute child taks in run_task_matrix in expected order
fixes
1 parent 5405a8a commit 1ddd39e

File tree

7 files changed

+58
-29
lines changed

7 files changed

+58
-29
lines changed

pkg/coordinator/tasks/run_task_matrix/task.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,25 +127,34 @@ func (t *Task) Execute(ctx context.Context) error {
127127

128128
t.taskCtx = taskCtx
129129

130-
var taskWaitChan chan bool
131-
if !t.config.RunConcurrent {
132-
taskWaitChan = make(chan bool, 1)
133-
}
130+
var currentTaskWaitChan, previousTaskWaitChan chan bool
134131

135132
// start child tasks
136133
for i := range t.tasks {
137134
taskWaitGroup.Add(1)
138135

136+
if !t.config.RunConcurrent {
137+
previousTaskWaitChan = currentTaskWaitChan
138+
currentTaskWaitChan = make(chan bool)
139+
}
140+
139141
t.taskIdxMap[t.tasks[i]] = i
140142

141-
go func(i int) {
143+
go func(i int, taskWaitChan, prevTaskWaitChan chan bool) {
142144
defer taskWaitGroup.Done()
143145

144-
if taskWaitChan != nil {
145-
taskWaitChan <- true
146-
defer func() {
147-
<-taskWaitChan
148-
}()
146+
if !t.config.RunConcurrent {
147+
if prevTaskWaitChan != nil {
148+
// wait for previous task to be executed
149+
select {
150+
case <-prevTaskWaitChan:
151+
case <-ctx.Done():
152+
return
153+
}
154+
}
155+
156+
// allow next task to run once this finishes
157+
defer close(taskWaitChan)
149158
}
150159

151160
task := t.tasks[i]
@@ -158,7 +167,7 @@ func (t *Task) Execute(ctx context.Context) error {
158167

159168
//nolint:errcheck // ignore
160169
t.ctx.Scheduler.ExecuteTask(taskCtx, task, t.watchChildTask)
161-
}(i)
170+
}(i, currentTaskWaitChan, previousTaskWaitChan)
162171
}
163172

164173
// watch result updates

pkg/coordinator/tasks/run_task_options/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ The `run_task_options` task is designed to execute a single task with configurab
88
- **`task`**:\
99
The task to be executed. This is defined following the standard task definition format.
1010

11+
- **`propagateResult`**:\
12+
This setting controls how the result of the child task influences the result of the `run_task_options` task. If set to `true`, any change in the result of the child task (success or failure) is immediately reflected in the result of the parent `run_task_options` task. If `false`, the child task's result is only propagated to the parent task after the child task has completed its execution.
13+
1114
- **`exitOnResult`**:\
1215
If set to `true`, the task will cancel the child task as soon as it sets a result, whether it is "success" or "failure." This option is useful for scenarios where immediate response to the child task's result is necessary.
1316

@@ -37,6 +40,7 @@ Default settings for the `run_task_options` task:
3740
- name: run_task_options
3841
config:
3942
task: null
43+
propagateResult: false
4044
exitOnResult: false
4145
invertResult: false
4246
expectFailure: false

pkg/coordinator/tasks/run_task_options/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
type Config struct {
1010
Task *helper.RawMessage `yaml:"task" json:"tasks"`
11+
PropagateResult bool `yaml:"propagateResult" json:"propagateResult"`
1112
ExitOnResult bool `yaml:"exitOnResult" json:"exitOnResult"`
1213
InvertResult bool `yaml:"invertResult" json:"invertResult"`
1314
ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"`

pkg/coordinator/tasks/run_task_options/task.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ var (
2020
)
2121

2222
type Task struct {
23-
ctx *types.TaskContext
24-
options *types.TaskOptions
25-
config Config
26-
logger logrus.FieldLogger
27-
task types.Task
23+
ctx *types.TaskContext
24+
options *types.TaskOptions
25+
config Config
26+
logger logrus.FieldLogger
27+
task types.Task
28+
taskResult types.TaskResult
2829
}
2930

3031
func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) {
@@ -86,6 +87,8 @@ func (t *Task) LoadConfig() error {
8687
}
8788

8889
func (t *Task) Execute(ctx context.Context) error {
90+
var taskErr error
91+
8992
retryCount := uint(0)
9093

9194
for {
@@ -106,37 +109,37 @@ func (t *Task) Execute(ctx context.Context) error {
106109
}
107110

108111
// execute task
109-
err = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) {
112+
taskErr = t.ctx.Scheduler.ExecuteTask(ctx, t.task, func(ctx context.Context, cancelFn context.CancelFunc, _ types.Task) {
110113
t.watchTaskResult(ctx, cancelFn)
111114
})
112115

113116
switch {
114117
case t.config.RetryOnFailure && retryCount < t.config.MaxRetryCount:
115-
if err != nil {
118+
if taskErr != nil {
116119
retryCount++
117120

118-
t.logger.Warnf("child task failed: %w (retrying)", err)
121+
t.logger.Warnf("child task failed: %w (retrying)", taskErr)
119122

120123
continue
121124
}
122125
case t.config.ExpectFailure:
123-
if err == nil {
126+
if taskErr == nil {
124127
return fmt.Errorf("child task succeeded, but should have failed")
125128
}
126129
case t.config.IgnoreFailure:
127-
if err != nil {
128-
t.logger.Warnf("child task failed: %w", err)
130+
if taskErr != nil {
131+
t.logger.Warnf("child task failed: %w", taskErr)
129132
}
130133
default:
131-
if err != nil {
132-
return fmt.Errorf("child task failed: %w", err)
134+
if taskErr != nil {
135+
return fmt.Errorf("child task failed: %w", taskErr)
133136
}
134137
}
135138

136139
break
137140
}
138141

139-
return nil
142+
return taskErr
140143
}
141144

142145
func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc) {
@@ -173,7 +176,9 @@ func (t *Task) watchTaskResult(ctx context.Context, cancelFn context.CancelFunc)
173176
}
174177
}
175178

176-
t.ctx.SetResult(taskResult)
179+
if t.config.PropagateResult {
180+
t.ctx.SetResult(taskResult)
181+
}
177182

178183
if t.config.ExitOnResult {
179184
cancelFn()

pkg/coordinator/tasks/run_tasks/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
## `run_tasks` Task
22

33
### Description
4-
The `run_tasks` task is designed for executing a series of tasks sequentially, ensuring each task is completed before starting the next. This setup is essential for tests requiring a specific order of task execution.
4+
The `run_tasks` task executes a series of specified tasks sequentially. This is particularly useful for scenarios where tasks need to be performed in a specific order, with the outcome of one potentially affecting the subsequent ones.
55

66
#### Task Behavior
77
- The task starts the child tasks one after the other in the order they are listed.
@@ -15,6 +15,9 @@ An important aspect of this task is that it cancels tasks once they return a res
1515
- **`tasks`**:\
1616
An array of tasks to be executed one after the other. Each task is defined according to the standard task structure.
1717

18+
- **`stopChildOnResult`**:\
19+
If set to `true`, each child task in the sequence is stopped as soon as it sets a result (either "success" or "failure"). This ensures that once a task has reached a outcome, it does not continue to run unnecessarily, allowing the next task in the sequence to commence.
20+
1821
- **`expectFailure`**:\
1922
If set to `true`, this option expects each task in the sequence to fail. The task sequence stops with a "failure" result if any task does not fail as expected.
2023

@@ -29,6 +32,7 @@ Default settings for the `run_tasks` task:
2932
- name: run_tasks
3033
config:
3134
tasks: []
35+
stopChildOnResult: true
3236
expectFailure: false
3337
continueOnFailure: false
3438
```

pkg/coordinator/tasks/run_tasks/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import (
88

99
type Config struct {
1010
Tasks []helper.RawMessage `yaml:"tasks" json:"tasks"`
11+
StopChildOnResult bool `yaml:"stopChildOnResult" json:"stopChildOnResult"`
1112
ExpectFailure bool `yaml:"expectFailure" json:"expectFailure"`
1213
ContinueOnFailure bool `yaml:"continueOnFailure" json:"continueOnFailure"`
1314
}
1415

1516
func DefaultConfig() Config {
1617
return Config{
17-
Tasks: []helper.RawMessage{},
18+
Tasks: []helper.RawMessage{},
19+
StopChildOnResult: true,
1820
}
1921
}
2022

pkg/coordinator/tasks/run_tasks/task.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ func (t *Task) LoadConfig() error {
105105

106106
func (t *Task) Execute(ctx context.Context) error {
107107
for i, task := range t.tasks {
108-
err := t.ctx.Scheduler.ExecuteTask(ctx, task, t.ctx.Scheduler.WatchTaskPass)
108+
err := t.ctx.Scheduler.ExecuteTask(ctx, task, func(ctx context.Context, cancelFn context.CancelFunc, task types.Task) {
109+
if t.config.StopChildOnResult {
110+
t.ctx.Scheduler.WatchTaskPass(ctx, cancelFn, task)
111+
}
112+
})
109113

110114
switch {
111115
case t.config.ExpectFailure:

0 commit comments

Comments
 (0)