From 14526b24b49379da68b4cfd57e9e3f4d68a9acef Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 11:17:35 +0200 Subject: [PATCH 01/13] FEATURE: definitions for partitioned waitlists (no implementation yet) --- definition/loader_test.go | 25 ++++ definition/pipelines.go | 34 +++++- docs/2025_08_14_partitioned_waitlist.md | 111 ++++++++++++++++++ test/fixtures/partitioned_waitlist.yml | 11 ++ ...itioned_waitlist_err_0_partition_limit.yml | 10 ++ ...tioned_waitlist_err_no_partition_limit.yml | 10 ++ .../partitioned_waitlist_err_queue_limit.yml | 10 ++ .../waitlist_err_partitioned_queue_limit.yml | 11 ++ 8 files changed, 219 insertions(+), 3 deletions(-) create mode 100644 docs/2025_08_14_partitioned_waitlist.md create mode 100644 test/fixtures/partitioned_waitlist.yml create mode 100644 test/fixtures/partitioned_waitlist_err_0_partition_limit.yml create mode 100644 test/fixtures/partitioned_waitlist_err_no_partition_limit.yml create mode 100644 test/fixtures/partitioned_waitlist_err_queue_limit.yml create mode 100644 test/fixtures/waitlist_err_partitioned_queue_limit.yml diff --git a/definition/loader_test.go b/definition/loader_test.go index 30e1ae9..02f5016 100644 --- a/definition/loader_test.go +++ b/definition/loader_test.go @@ -23,3 +23,28 @@ func TestLoadRecursively_WithMissingDependency(t *testing.T) { _, err := LoadRecursively("../test/fixtures/missingDep.yml") require.EqualError(t, err, `loading ../test/fixtures/missingDep.yml: invalid pipeline definition "test_it": missing task "not_existing" referenced in depends_on of task "test"`) } + +func TestPartitionedWaitlist_OK(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist.yml") + require.NoError(t, err) +} + +func TestPartitionedWaitlist_err_0_partition_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`) +} + +func TestPartitionedWaitlist_err_no_partition_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`) +} + +func TestPartitionedWaitlist_err_queue_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_queue_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_queue_limit.yml: invalid pipeline definition "test_it": queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead`) +} + +func TestWaitlist_err_partitioned_queue_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/waitlist_err_partitioned_queue_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/waitlist_err_partitioned_queue_limit.yml: invalid pipeline definition "test_it": queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead`) +} diff --git a/definition/pipelines.go b/definition/pipelines.go index e5d3114..012393c 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -52,10 +52,15 @@ type OnErrorTaskDef struct { type PipelineDef struct { // 
Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
 	Concurrency int `yaml:"concurrency"`
-	// QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil)
+	// QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil). Only allowed with queue_strategy=append|replace, not with partitioned_replace (there, use queue_partition_limit instead)
 	QueueLimit *int `yaml:"queue_limit"`
+
+	// QueuePartitionLimit is the number of slots for queueing jobs per partition, if queue_strategy=partitioned_replace is used.
+	QueuePartitionLimit *int `yaml:"queue_partition_limit"`
+
 	// QueueStrategy to use when adding jobs to the queue (defaults to append)
 	QueueStrategy QueueStrategy `yaml:"queue_strategy"`
+
 	// StartDelay will delay the start of a job if the value is greater than zero (defaults to 0)
 	StartDelay time.Duration `yaml:"start_delay"`
 
@@ -100,6 +105,19 @@ func (d PipelineDef) validate() error {
 		return errors.New("start_delay needs queue_limit > 0")
 	}
 
+	if d.QueueStrategy == QueueStrategyPartitionedReplace {
+		if d.QueueLimit != nil {
+			return errors.New("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead")
+		}
+		if d.QueuePartitionLimit == nil || *d.QueuePartitionLimit < 1 {
+			return errors.New("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace")
+		}
+	} else {
+		if d.QueuePartitionLimit != nil {
+			return errors.New("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead")
+		}
+	}
+
 	for taskName, taskDef := range d.Tasks {
 		for _, dependentTask := range taskDef.DependsOn {
 			_, exists := d.Tasks[dependentTask]
@@ -164,13 +182,21 @@ func (d PipelineDef) Equals(otherDef PipelineDef) bool {
 	return true
 }
 
+// QueueStrategy defines the behavior when jobs wait (=are queued) before pipeline execution.
 type QueueStrategy int
 
 const (
-	// QueueStrategyAppend appends jobs to the queue until queue limit is reached
+	// QueueStrategyAppend appends jobs to the queue until the queue limit is reached (FIFO)
 	QueueStrategyAppend QueueStrategy = 0
-	// QueueStrategyReplace replaces pending jobs (with same variables) instead of appending to the queue
+
+	// QueueStrategyReplace replaces the **LAST** pending job if the queue limit is reached. If the queue is not yet full, the job is appended.
+	// NOTE: with queue_limit=1 + replace, this can lead to starvation if jobs are enqueued more rapidly than they are started. With queue_limit >= 2, this cannot happen anymore.
+	// (see 2025_08_14_partitioned_waitlist.md for a detailed description)
 	QueueStrategyReplace QueueStrategy = 1
+
+	// QueueStrategyPartitionedReplace implements the "partitioned waitlist" strategy, as explained in 2025_08_14_partitioned_waitlist.md.
+	// -> it replaces the **LAST** pending job of a given partition, if the partition is full (=queue_partition_limit).
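+	// Each partition is limited separately by queue_partition_limit, which must be defined and >= 1 for this strategy.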
+	QueueStrategyPartitionedReplace QueueStrategy = 2
 )
 
 func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
@@ -185,6 +211,8 @@ func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
 		*s = QueueStrategyAppend
 	case "replace":
 		*s = QueueStrategyReplace
+	case "partitioned_replace":
+		*s = QueueStrategyPartitionedReplace
 	default:
 		return errors.Errorf("unknown queue strategy: %q", strategyName)
 	}
diff --git a/docs/2025_08_14_partitioned_waitlist.md b/docs/2025_08_14_partitioned_waitlist.md
new file mode 100644
index 0000000..e8c34da
--- /dev/null
+++ b/docs/2025_08_14_partitioned_waitlist.md
@@ -0,0 +1,111 @@
+# FEATURE: Partitioned Waitlists
+
+
+# Problem description
+
+## Current state
+
+> **FROM README**
+>
+> By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the
+> waitlist of the pipeline.
+>
+> However, you have some options to configure this as well:
+>
+> The waitlist can have a maximum size, denoted by `queue_limit`:
+>
+> ```yaml
+> pipelines:
+>   do_something:
+>     queue_limit: 1
+>     concurrency: 1
+>     tasks: # as usual
+> ```
+>
+> To deactivate the queuing altogether, set `queue_limit: 0`.
+>
+> Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
+>
+> Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
+> queue by the newly added one:
+
+## Current state -> can "debouncing" lead to starvation?
+
+> Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job)
+
+```
+--> time (1 char = 1 s)
+
+# 1 waitlist slot, delay 10 s
+
+a____ _____        <-- here the job A is queued
+           A       <-- here job A starts
+
+
+# 1 waitlist slot, delay 10 s
+# STARVATION CAN HAPPEN
+a____ b____ c_____ _____
+                        C
+
+
+# 2 waitlist slots, delay 10 s
+# !!! PREVENTS STARVATION !!!
+a____ b__c_ ______ _____
+[a_]  [ab]  A     <-- here, a starts. NO STARVATION
+      [ac]
+      [c_]
+```
+
+SUMMARY:
+- Starvation can only happen with waitlist size=1; with waitlist size=2 (or bigger) it cannot happen, because only the LAST job in the waitlist gets replaced.
+- We cannot debounce immediately; we ALWAYS wait at least for start_delay (not a problem for us right now).
+
+## Problem description in our project
+
+In a project, we have 50 Neos instances, which use prunner for background tasks (some are short, some are long).
+
+Currently, we have 4 pipelines globally:
+- concurrency 1
+- concurrency 8
+- concurrency 4
+- concurrency 4 (import job)
+  - -> the global concurrency is needed to limit the required server resources
+
+Now, a new pipeline should be added for "irregular import jobs" triggered by webhooks.
+- runs can be triggered in quick succession
+- -> the pipeline should only start after a certain amount of time (newer runs should override older queued ones)
+- StartDelay combined with QueueStrategy "Replace"
+  - `waitList[len(waitList)-1] = job` -> the *LAST* element of the wait list is replaced.
+- -> GLOBAL replacement does not work, because each job has arguments (which are relevant, e.g. the tenant ID).
+
+We still want to have a GLOBAL CONCURRENCY LIMIT (per pipeline), but a WAITLIST per instance.
+
+
+## Solution Idea:
+
+we want to be able to SEGMENT the waitlist into different partitions. The `queue_strategy` and `queue_limit` should be per partition, while `concurrency` stays per pipeline (as before).
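+
+A minimal sketch of the intended per-partition enqueue logic (hypothetical helper, not implemented yet; it assumes each queued job carries its partition identifier and a `Canceled` flag, names are placeholders and not final API):
+
+```go
+// enqueuePartitioned appends job to the wait list, or replaces the last
+// queued job of job's partition if that partition is already full.
+func enqueuePartitioned(waitList []*PipelineJob, job *PipelineJob, partitionLimit int) []*PipelineJob {
+	var samePartition []*PipelineJob
+	for _, j := range waitList {
+		if j.queuePartition == job.queuePartition {
+			samePartition = append(samePartition, j)
+		}
+	}
+	if len(samePartition) < partitionLimit {
+		// partition not full -> append, as with queue_strategy=append
+		return append(waitList, job)
+	}
+	// partition full -> cancel the last queued job of this partition and replace it
+	last := samePartition[len(samePartition)-1]
+	last.Canceled = true
+	newWaitList := make([]*PipelineJob, 0, len(waitList))
+	for _, j := range waitList {
+		if j != last {
+			newWaitList = append(newWaitList, j)
+		}
+	}
+	return append(newWaitList, job)
+}
+```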
+
+```
+**LOGICAL VIEW** (Idea)
+┌──────────────────────┐      ┌──────────────────────────────────────────────────┐
+│ Waitlist Instance 1  │      │ Pipeline (with concurrency 2)                    │
+├──────────────────────┤      │                                                  │
+│ Waitlist Instance 2  │  ->  ├──────────────────────────────────────────────────┤
+├──────────────────────┤      │                                                  │
+│ Waitlist Instance 3  │      │                                                  │
+└──────────────────────┘      └──────────────────────────────────────────────────┘
+
+   if job is *delayed*,
+   stays in wait list
+   for this duration
+
+```
+
+Technically, we still have a SINGLE wait list per pipeline, but the jobs can be partitioned by a partition identifier (working title: `waitlist_partition_id`).
+
+-> In this case, the `replace` strategy will replace the last element of the given partition.
+
+-> we create a new queueStrategy for the partitioned waitlist: `partitioned_replace`
+
+If we partition the waitlist, the overall waitlist can grow up to `queue_partition_limit` * number of partitions.
\ No newline at end of file
diff --git a/test/fixtures/partitioned_waitlist.yml b/test/fixtures/partitioned_waitlist.yml
new file mode 100644
index 0000000..fef5537
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist.yml
@@ -0,0 +1,11 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    # keep two jobs in the queue PER PARTITION.
+    queue_partition_limit: 2
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml b/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml
new file mode 100644
index 0000000..fd8e87c
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml
@@ -0,0 +1,10 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    queue_partition_limit: 0 # !!! ERROR
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml b/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml
new file mode 100644
index 0000000..46fd4d7
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml
@@ -0,0 +1,10 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    # MISSING: queue_partition_limit: 2
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/partitioned_waitlist_err_queue_limit.yml b/test/fixtures/partitioned_waitlist_err_queue_limit.yml
new file mode 100644
index 0000000..4bce093
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist_err_queue_limit.yml
@@ -0,0 +1,10 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    queue_limit: 2 # not allowed in partitioned_replace
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/waitlist_err_partitioned_queue_limit.yml b/test/fixtures/waitlist_err_partitioned_queue_limit.yml
new file mode 100644
index 0000000..8f35b8a
--- /dev/null
+++ b/test/fixtures/waitlist_err_partitioned_queue_limit.yml
@@ -0,0 +1,11 @@
+pipelines:
+  test_it:
+    queue_strategy: replace
+    # error: not allowed for queue_strategy: replace
+    queue_partition_limit: 2
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"

From 93772d7d1091f84b1b50e45446ab54b140803538 Mon Sep 17 00:00:00 2001
From: Sebastian Kurfuerst
Date: Thu, 14 Aug 2025 11:47:43 +0200
Subject: [PATCH 02/13] TASK: improve code docs around scheduleAction

---
 prunner.go      | 12 ++++++++++++
 prunner_test.go |  3 +++
 2 files changed, 15 insertions(+)

diff --git a/prunner.go b/prunner.go
index 54531ab..666dae3 100644
--- a/prunner.go
+++ b/prunner.go
@@ -193,11 
+193,22 @@ type jobTasks []jobTask type scheduleAction int const ( + // scheduleActionStart directly starts a job via PipelineRunner.startJob() scheduleActionStart scheduleAction = iota + + // scheduleActionQueue enqueues the job to the pipeline's waitlist scheduleActionQueue + + // TODO: never used, remove scheduleActionQueueDelay + + // scheduleActionReplace: replace the last job on the waitlist with this one scheduleActionReplace + + // scheduleActionNoQueue: error case, if queueing is not allowed (queue_limit=0) and a job is running scheduleActionNoQueue + + // scheduleActionQueueFull: error case, if queue_limit is reached (with append queue strategy) scheduleActionQueueFull ) @@ -365,6 +376,7 @@ func buildPipelineGraph(id uuid.UUID, tasks jobTasks, vars map[string]interface{ return g, nil } +// ReadJob loads the job identified by id and calls process() on it synchronously. All protected by the global r.mx mutex. func (r *PipelineRunner) ReadJob(id uuid.UUID, process func(j *PipelineJob)) error { r.mx.RLock() defer r.mx.RUnlock() diff --git a/prunner_test.go b/prunner_test.go index 83d2c89..0c72314 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -958,6 +958,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi job2, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{}) require.NoError(t, err) + // original "job" should be canceled test.WaitForCondition(t, func() bool { var canceled bool _ = pRunner.ReadJob(jobID, func(j *PipelineJob) { @@ -966,6 +967,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi return canceled }, 1*time.Millisecond, "job is canceled") + // job2 is scheduled job2ID := job2.ID test.WaitForCondition(t, func() bool { var started bool @@ -975,6 +977,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi return !started }, 1*time.Millisecond, "job2 is not started (queued)") + // job2 is started after ~50 ms (which the test does not check for ^^) test.WaitForCondition(t, func() bool { var started bool _ = pRunner.ReadJob(job2ID, func(j *PipelineJob) { From 981cb35efd9bae884ef8160fede0acbb97665199 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 11:48:49 +0200 Subject: [PATCH 03/13] REFACTOR: remove dead scheduleActionQueueDelay (unused, never written) --- prunner.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/prunner.go b/prunner.go index 666dae3..e36ad77 100644 --- a/prunner.go +++ b/prunner.go @@ -199,9 +199,6 @@ const ( // scheduleActionQueue enqueues the job to the pipeline's waitlist scheduleActionQueue - // TODO: never used, remove - scheduleActionQueueDelay - // scheduleActionReplace: replace the last job on the waitlist with this one scheduleActionReplace @@ -804,8 +801,6 @@ func (r *PipelineRunner) isSchedulable(pipeline string) bool { fallthrough case scheduleActionStart: return true - case scheduleActionQueueDelay: - return true } return false } From 69a14c2ccdf060c24b6c2bbc638a2834bd4a1300 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 11:56:23 +0200 Subject: [PATCH 04/13] REFACTOR: rename scheduleActionErr cases --- prunner.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/prunner.go b/prunner.go index e36ad77..73d1fa6 100644 --- a/prunner.go +++ b/prunner.go @@ -202,11 +202,11 @@ const ( // scheduleActionReplace: replace the last job on the waitlist with this one scheduleActionReplace - // scheduleActionNoQueue: error case, if queueing is not 
allowed (queue_limit=0) and a job is running - scheduleActionNoQueue + // scheduleActionErrNoQueue: error case, if queueing is not allowed (queue_limit=0) and a job is running + scheduleActionErrNoQueue - // scheduleActionQueueFull: error case, if queue_limit is reached (with append queue strategy) - scheduleActionQueueFull + // scheduleActionErrQueueFull: error case, if queue_limit is reached (with append queue strategy) + scheduleActionErrQueueFull ) var errNoQueue = errors.New("concurrency exceeded and queueing disabled for pipeline") @@ -235,9 +235,9 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip action := r.resolveScheduleAction(pipeline, false) switch action { - case scheduleActionNoQueue: + case scheduleActionErrNoQueue: return nil, errNoQueue - case scheduleActionQueueFull: + case scheduleActionErrQueueFull: return nil, errQueueFull } @@ -766,7 +766,7 @@ func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay if runningJobsCount >= pipelineDef.Concurrency || (pipelineDef.StartDelay > 0 && !ignoreStartDelay) { // Check if jobs should be queued if concurrency factor is exceeded if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { - return scheduleActionNoQueue + return scheduleActionErrNoQueue } // Check if a queued job on the wait list should be replaced depending on queue strategy @@ -777,7 +777,7 @@ func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay // Error if there is a queue limit and the number of queued jobs exceeds the allowed queue limit if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { - return scheduleActionQueueFull + return scheduleActionErrQueueFull } return scheduleActionQueue From 944f3b72c415b32881453e9c694543efdc3a7384 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 12:06:05 +0200 Subject: [PATCH 05/13] REFACTOR: make resolveScheduleAction easier to follow (no functional changes), in preparation of upcoming bugfixes and features --- prunner.go | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/prunner.go b/prunner.go index 73d1fa6..4a9fadb 100644 --- a/prunner.go +++ b/prunner.go @@ -763,27 +763,46 @@ func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay // If a start delay is set, we will always queue the job, otherwise we check if the number of running jobs // exceed the maximum concurrency runningJobsCount := r.runningJobsCount(pipeline) - if runningJobsCount >= pipelineDef.Concurrency || (pipelineDef.StartDelay > 0 && !ignoreStartDelay) { - // Check if jobs should be queued if concurrency factor is exceeded + if runningJobsCount < pipelineDef.Concurrency && (pipelineDef.StartDelay == 0 || ignoreStartDelay) { + // job can be started right now, because pipeline is not running at full capacity, and there is no start delay + // (no queue handling) + return scheduleActionStart + } + + // here, we start with waitlist logic. 
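+	// Depending on the strategy, the job is now appended to the waitlist, replaces the last queued job, or is rejected with an error.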
+ waitList := r.waitListByPipeline[pipeline] + + switch pipelineDef.QueueStrategy { + case definition.QueueStrategyAppend: if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { + // queue limit == 0 -> error -> NOTE: might be moved to config validation return scheduleActionErrNoQueue } + if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { + // queue full + return scheduleActionErrQueueFull + } - // Check if a queued job on the wait list should be replaced depending on queue strategy - waitList := r.waitListByPipeline[pipeline] - if pipelineDef.QueueStrategy == definition.QueueStrategyReplace && len(waitList) > 0 { - return scheduleActionReplace + // queue not full -> append to queue + return scheduleActionQueue + + case definition.QueueStrategyReplace: + if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { + // queue limit == 0 -> error -> NOTE: might be moved to config validation + return scheduleActionErrNoQueue } - // Error if there is a queue limit and the number of queued jobs exceeds the allowed queue limit - if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { - return scheduleActionErrQueueFull + if len(waitList) > 0 { + // queue full -> replace last element + return scheduleActionReplace } + // queue not full -> append to queue return scheduleActionQueue } - return scheduleActionStart + // TODO: THIS CASE SHOULD NEVER HAPPEN !!! + return scheduleActionErrQueueFull } func (r *PipelineRunner) resolveDequeueJobAction(job *PipelineJob) scheduleAction { From 87d979ed98631277a835979255b699787906ac7e Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 12:07:10 +0200 Subject: [PATCH 06/13] !!! BUGFIX: properly respect queue_limit>1 for queue_strategy=replace before this change, queue_strategy=replace behaved always like queue_limit=1, leading to starvation even if this was not intended by the user. --- prunner.go | 2 +- prunner_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/prunner.go b/prunner.go index 4a9fadb..c26108a 100644 --- a/prunner.go +++ b/prunner.go @@ -792,7 +792,7 @@ func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay return scheduleActionErrNoQueue } - if len(waitList) > 0 { + if len(waitList) >= *pipelineDef.QueueLimit { // queue full -> replace last element return scheduleActionReplace } diff --git a/prunner_test.go b/prunner_test.go index 0c72314..be0c436 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -996,6 +996,93 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi }, 1*time.Millisecond, "job2 is finished") } +func TestPipelineRunner_ScheduleAsync_WithStartDelay2QueueAndReplaceWillReplaceLastJob(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "jobWithStartDelay": { + Concurrency: 1, + StartDelay: 50 * time.Millisecond, + QueueLimit: intPtr(2), // !!! 
IMPORTANT for this testcase
+				QueueStrategy: definition.QueueStrategyReplace,
+				Tasks: map[string]definition.TaskDef{
+					"echo": {
+						Script: []string{"echo Test"},
+					},
+				},
+				SourcePath: "fixtures",
+			},
+		},
+	}
+	require.NoError(t, defs.Validate())
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	store := test.NewMockStore()
+	pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
+		return &test.MockRunner{
+			OnRun: func(tsk *task.Task) error {
+				log.Debugf("Run task %s on job %s", tsk.Name, j.ID.String())
+				return nil
+			},
+		}
+	}, store, test.NewMockOutputStore())
+	require.NoError(t, err)
+
+	// job1 + job2 + job3 are appended
+	job1, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{})
+	require.NoError(t, err)
+
+	job2, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{})
+	require.NoError(t, err)
+
+	job3, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{})
+	require.NoError(t, err)
+
+	// job2 should be canceled
+	test.WaitForCondition(t, func() bool {
+		var canceled bool
+		_ = pRunner.ReadJob(job2.ID, func(j *PipelineJob) {
+			canceled = j.Canceled
+		})
+		return canceled
+	}, 1*time.Millisecond, "job2 was not canceled")
+
+	// after job2 was canceled, job1 should still not be canceled, and job3 should also still be in the queue.
+	_ = pRunner.ReadJob(job1.ID, func(j *PipelineJob) {
+		assert.False(t, j.Canceled)
+	})
+	_ = pRunner.ReadJob(job2.ID, func(j *PipelineJob) {
+		assert.True(t, j.Canceled)
+	})
+	_ = pRunner.ReadJob(job3.ID, func(j *PipelineJob) {
+		assert.False(t, j.Canceled)
+	})
+
+	// we wait for job1 to be finished
+	test.WaitForCondition(t, func() bool {
+		var started bool
+		var running bool
+
+		_ = pRunner.ReadJob(job1.ID, func(j *PipelineJob) {
+			started = j.Start != nil
+			running = j.isRunning()
+		})
+		return started && !running
+	}, 1*time.Millisecond, "job1 did not finish")
+
+	test.WaitForCondition(t, func() bool {
+		var started bool
+		var running bool
+
+		_ = pRunner.ReadJob(job3.ID, func(j *PipelineJob) {
+			started = j.Start != nil
+			running = j.isRunning()
+		})
+		return started && !running
+	}, 1*time.Millisecond, "job3 did not finish")
+}
+
 func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunConcurrently(t *testing.T) {
 	var defs = &definition.PipelinesDef{
 		Pipelines: map[string]definition.PipelineDef{

From 56ef182430d8e822bd75777a4b901246f2def7a5 Mon Sep 17 00:00:00 2001
From: Sebastian Kurfuerst
Date: Thu, 14 Aug 2025 14:05:53 +0200
Subject: [PATCH 07/13] REFACTOR: scheduleAction is only used for the initial scheduling decision, not anymore for re-scheduling after a job's delay has finished or a job has completed

---
 prunner.go | 83 ++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 28 deletions(-)

diff --git a/prunner.go b/prunner.go
index c26108a..a68c5bc 100644
--- a/prunner.go
+++ b/prunner.go
@@ -144,6 +144,33 @@ func (j *PipelineJob) isRunning() bool {
 	return j.Start != nil && !j.Completed && !j.Canceled
 }
 
+// canBeStarted is true if the job is not yet started and not canceled, and
+// either has no start delay or its start delay has already passed.
+//
+// Only decides for itself, without outer knowledge (e.g. if the pipeline still has free spots or not)
+func (j *PipelineJob) canBeStarted() bool {
+	if j.StartDelay > 0 && j.startTimer != nil {
+		// delay still running, we can't start this job.
+ // (startTimer set to nil in StartDelayedJob(), after the timer has finished) + return false + } + + if j.Start != nil { + // already started, we can't start this job. + return false + } + + if j.Canceled { + // cancelled, we can't start this job. + return false + } + + // job is not delayed; or job is delayed and the delay has passed + return true +} + func (r *PipelineRunner) initScheduler(j *PipelineJob) { // For correct cancellation of tasks a single task runner and scheduler per job is used @@ -232,7 +259,7 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip return nil, errors.Errorf("pipeline %q is not defined", pipeline) } - action := r.resolveScheduleAction(pipeline, false) + action := r.resolveScheduleAction(pipeline) switch action { case scheduleActionErrNoQueue: @@ -672,29 +699,35 @@ func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) { } func (r *PipelineRunner) startJobsOnWaitList(pipeline string) { + // Pipeline currently full? (all executors are working) -> abort + pipelineDef := r.defs.Pipelines[pipeline] + runningJobsCount := r.runningJobsCount(pipeline) + freeSlots := pipelineDef.Concurrency - runningJobsCount + if freeSlots <= 0 { + // no room to start any jobs right now + return + } + // Check wait list if another job is queued waitList := r.waitListByPipeline[pipeline] - // Schedule as many jobs as are schedulable (also process if the schedule action is start delay and check individual jobs if they can be started) - for len(waitList) > 0 && r.resolveDequeueJobAction(waitList[0]) == scheduleActionStart { - queuedJob := waitList[0] - // Queued job has a start delay timer set - wait for it to fire - if queuedJob.startTimer != nil { - // TODO We need to check if we rather need to skip only this job and continue to process other jobs on the queue - break + // Schedule as many jobs as are schedulable + newWaitList := make([]*PipelineJob, 0, len(waitList)) + for _, job := range waitList { + if job.canBeStarted() && freeSlots > 0 { + freeSlots-- + r.startJob(job) + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). + Debugf("Dequeue: scheduled job execution") + } else { + newWaitList = append(newWaitList, job) } - - waitList = waitList[1:] - - r.startJob(queuedJob) - - log. - WithField("component", "runner"). - WithField("pipeline", queuedJob.Pipeline). - WithField("jobID", queuedJob.ID). - Debugf("Dequeue: scheduled job execution") } - r.waitListByPipeline[pipeline] = waitList + + r.waitListByPipeline[pipeline] = newWaitList } // IterateJobs calls process for each job in a read lock. 
@@ -757,13 +790,13 @@ func (r *PipelineRunner) runningJobsCount(pipeline string) int { return running } -func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay bool) scheduleAction { +func (r *PipelineRunner) resolveScheduleAction(pipeline string) scheduleAction { pipelineDef := r.defs.Pipelines[pipeline] // If a start delay is set, we will always queue the job, otherwise we check if the number of running jobs // exceed the maximum concurrency runningJobsCount := r.runningJobsCount(pipeline) - if runningJobsCount < pipelineDef.Concurrency && (pipelineDef.StartDelay == 0 || ignoreStartDelay) { + if runningJobsCount < pipelineDef.Concurrency && pipelineDef.StartDelay == 0 { // job can be started right now, because pipeline is not running at full capacity, and there is no start delay // (no queue handling) return scheduleActionStart @@ -805,14 +838,8 @@ func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay return scheduleActionErrQueueFull } -func (r *PipelineRunner) resolveDequeueJobAction(job *PipelineJob) scheduleAction { - // Start the job if it had a start delay but the timer finished - ignoreStartDelay := job.StartDelay > 0 && job.startTimer == nil - return r.resolveScheduleAction(job.Pipeline, ignoreStartDelay) -} - func (r *PipelineRunner) isSchedulable(pipeline string) bool { - action := r.resolveScheduleAction(pipeline, false) + action := r.resolveScheduleAction(pipeline) switch action { case scheduleActionReplace: fallthrough From 37c871d0c639db3f38793b568df288a722bc1361 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Thu, 14 Aug 2025 15:05:32 +0200 Subject: [PATCH 08/13] WIP --- helper/slice_utils/sliceUtils.go | 11 ++ prunner.go | 237 +++++++++++++++++-------------- prunner_test.go | 87 ++++++++++++ 3 files changed, 230 insertions(+), 105 deletions(-) create mode 100644 helper/slice_utils/sliceUtils.go diff --git a/helper/slice_utils/sliceUtils.go b/helper/slice_utils/sliceUtils.go new file mode 100644 index 0000000..9331cac --- /dev/null +++ b/helper/slice_utils/sliceUtils.go @@ -0,0 +1,11 @@ +package slice_utils + +func Filter[T any](s []T, p func(i T) bool) []T { + var result []T + for _, i := range s { + if p(i) { + result = append(result, i) + } + } + return result +} diff --git a/prunner.go b/prunner.go index a68c5bc..8dfe34e 100644 --- a/prunner.go +++ b/prunner.go @@ -115,10 +115,13 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat type PipelineJob struct { ID uuid.UUID // Identifier of the pipeline (from the YAML file) - Pipeline string - Env map[string]string - Variables map[string]interface{} - StartDelay time.Duration + Pipeline string + // Identifier of the queue partition, if any (from definition) + queuePartition string + Env map[string]string + Variables map[string]interface{} + User string + StartDelay time.Duration Completed bool Canceled bool @@ -127,8 +130,7 @@ type PipelineJob struct { // Start is the actual start time of the job. Could be nil if not yet started. 
Start *time.Time
 	// End is the actual end time of the job (can be nil if incomplete)
-	End *time.Time
-	User string
+	End *time.Time
 	// Tasks is an in-memory representation with state of tasks, sorted by dependencies
 	Tasks jobTasks
 	LastError error
@@ -242,6 +244,84 @@ var ErrJobNotFound = errors.New("job not found")
 var errJobAlreadyCompleted = errors.New("job is already completed")
 var ErrShuttingDown = errors.New("runner is shutting down")
 
+type QueueStrategyImpl interface {
+	kannEntgegengenommenWerden(def definition.PipelineDef, waitList []*PipelineJob) error
+
+	modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
+}
+
+type QueueStrategyAppendImpl struct {
+}
+
+func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
+	if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
+		// queue limit == 0 -> error -> NOTE: might be moved to config validation
+		return errNoQueue
+	}
+	if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit {
+		// queue full
+		return errQueueFull
+	}
+	return nil
+}
+
+func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
+	log.
+		WithField("component", "runner").
+		WithField("pipeline", job.Pipeline).
+		WithField("jobID", job.ID).
+		WithField("variables", job.Variables).
+		Debugf("Queued: added job to wait list")
+	return append(previousWaitList, job)
+}
+
+type QueueStrategyReplaceImpl struct {
+}
+
+func (q QueueStrategyReplaceImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
+	if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
+		// queue limit == 0 -> error -> NOTE: might be moved to config validation
+		return errNoQueue
+	}
+	return nil
+}
+
+func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
+	if len(previousWaitList) <= *pipelineDef.QueueLimit {
+		// waitlist not full -> append
+		log.
+			WithField("component", "runner").
+			WithField("pipeline", job.Pipeline).
+			WithField("jobID", job.ID).
+			WithField("variables", job.Variables).
+			Debugf("Queued: added job to wait list")
+		return append(previousWaitList, job)
+	} else {
+		// waitlist full -> replace
+		previousJob := previousWaitList[len(previousWaitList)-1]
+		previousJob.Canceled = true
+		if previousJob.startTimer != nil {
+			log.
+				WithField("previousJobID", previousJob.ID).
+				Debugf("Stopped start timer of previous job")
+			// Stop timer and unset reference for clean up
+			previousJob.startTimer.Stop()
+			previousJob.startTimer = nil
+		}
+		waitList := previousWaitList[:]
+		waitList[len(waitList)-1] = job
+
+		log.
+			WithField("component", "runner").
+			WithField("pipeline", job.Pipeline).
+			WithField("jobID", job.ID).
+			WithField("variables", job.Variables).
+			Debugf("Queued: replaced job on wait list")
+
+		return waitList
+	}
+}
+
 // ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
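// The job is either started immediately, queued on the wait list, or rejected with an error, depending on the pipeline's queue strategy.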
// @@ -259,13 +339,10 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip return nil, errors.Errorf("pipeline %q is not defined", pipeline) } - action := r.resolveScheduleAction(pipeline) - - switch action { - case scheduleActionErrNoQueue: - return nil, errNoQueue - case scheduleActionErrQueueFull: - return nil, errQueueFull + queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy) + err := queueStrategyImpl.kannEntgegengenommenWerden(pipelineDef, r.waitListByPipeline[pipeline]) + if err != nil { + return nil, err } id, err := uuid.NewV4() @@ -276,70 +353,46 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip defer r.requestPersist() job := &PipelineJob{ - ID: id, - Pipeline: pipeline, - Created: time.Now(), - Tasks: buildJobTasks(pipelineDef.Tasks), - Env: pipelineDef.Env, - Variables: opts.Variables, - User: opts.User, - StartDelay: pipelineDef.StartDelay, + ID: id, + Pipeline: pipeline, + Created: time.Now(), + Tasks: buildJobTasks(pipelineDef.Tasks), + Env: pipelineDef.Env, + Variables: opts.Variables, + User: opts.User, + queuePartition: opts.QueuePartition, + StartDelay: pipelineDef.StartDelay, } - r.jobsByID[id] = job r.jobsByPipeline[pipeline] = append(r.jobsByPipeline[pipeline], job) - if job.StartDelay > 0 { + if pipelineDef.StartDelay > 0 { // A delayed job is a job on the wait list that is started by a function after a delay job.startTimer = time.AfterFunc(job.StartDelay, func() { r.StartDelayedJob(id) }) - } - - switch action { - case scheduleActionQueue: - r.waitListByPipeline[pipeline] = append(r.waitListByPipeline[pipeline], job) - - log. - WithField("component", "runner"). - WithField("pipeline", job.Pipeline). - WithField("jobID", job.ID). - WithField("variables", job.Variables). - Debugf("Queued: added job to wait list") return job, nil - case scheduleActionReplace: - waitList := r.waitListByPipeline[pipeline] - previousJob := waitList[len(waitList)-1] - previousJob.Canceled = true - if previousJob.startTimer != nil { - log. - WithField("previousJobID", previousJob.ID). - Debugf("Stopped start timer of previous job") - // Stop timer and unset reference for clean up - previousJob.startTimer.Stop() - previousJob.startTimer = nil - } - waitList[len(waitList)-1] = job + } else { + // no delayed job + runningJobsCount := r.runningJobsCount(pipeline) + if runningJobsCount < pipelineDef.Concurrency { + // free capacity -> start job right now and finish. + r.startJob(job) - log. - WithField("component", "runner"). - WithField("pipeline", job.Pipeline). - WithField("jobID", job.ID). - WithField("variables", job.Variables). - Debugf("Queued: replaced job on wait list") + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). + WithField("variables", job.Variables). + Debugf("Started: scheduled job execution") - return job, nil + return job, nil + } } - r.startJob(job) - - log. - WithField("component", "runner"). - WithField("pipeline", job.Pipeline). - WithField("jobID", job.ID). - WithField("variables", job.Variables). 
- Debugf("Started: scheduled job execution") + // modify the waitlist + r.waitListByPipeline[pipeline] = queueStrategyImpl.modifyWaitList(pipelineDef, r.waitListByPipeline[pipeline], job) return job, nil } @@ -790,56 +843,28 @@ func (r *PipelineRunner) runningJobsCount(pipeline string) int { return running } -func (r *PipelineRunner) resolveScheduleAction(pipeline string) scheduleAction { - pipelineDef := r.defs.Pipelines[pipeline] - - // If a start delay is set, we will always queue the job, otherwise we check if the number of running jobs - // exceed the maximum concurrency - runningJobsCount := r.runningJobsCount(pipeline) - if runningJobsCount < pipelineDef.Concurrency && pipelineDef.StartDelay == 0 { - // job can be started right now, because pipeline is not running at full capacity, and there is no start delay - // (no queue handling) - return scheduleActionStart - } +// waitlistModifierFn modifies the wait list, if the job cannot be executed right away. +type waitlistModifierFn func(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob - // here, we start with waitlist logic. - waitList := r.waitListByPipeline[pipeline] +func waitlistAppendToQueue(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob { + return append(previousWaitlist, job) +} - switch pipelineDef.QueueStrategy { +func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyImpl { + switch queueStrategy { case definition.QueueStrategyAppend: - if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { - // queue limit == 0 -> error -> NOTE: might be moved to config validation - return scheduleActionErrNoQueue - } - if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { - // queue full - return scheduleActionErrQueueFull - } - - // queue not full -> append to queue - return scheduleActionQueue - + return QueueStrategyAppendImpl{} case definition.QueueStrategyReplace: - if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { - // queue limit == 0 -> error -> NOTE: might be moved to config validation - return scheduleActionErrNoQueue - } - - if len(waitList) >= *pipelineDef.QueueLimit { - // queue full -> replace last element - return scheduleActionReplace - } - - // queue not full -> append to queue - return scheduleActionQueue + return QueueStrategyReplaceImpl{} } - // TODO: THIS CASE SHOULD NEVER HAPPEN !!! - return scheduleActionErrQueueFull + return nil } func (r *PipelineRunner) isSchedulable(pipeline string) bool { - action := r.resolveScheduleAction(pipeline) + // TODO REPLACE ME!!! 
+ return true + /*action := r.resolveScheduleAction(pipeline) switch action { case scheduleActionReplace: fallthrough @@ -848,12 +873,14 @@ func (r *PipelineRunner) isSchedulable(pipeline string) bool { case scheduleActionStart: return true } - return false + return false*/ } type ScheduleOpts struct { Variables map[string]interface{} User string + // for queue_strategy=partitioned_replace, the queue partition to use + QueuePartition string } func (r *PipelineRunner) initialLoadFromStore() error { diff --git a/prunner_test.go b/prunner_test.go index be0c436..bc5a4f0 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -1083,6 +1083,93 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelay2QueueAndReplaceWillReplaceL }, 1*time.Millisecond, "job3 did not finish") } +func TestPipelineRunner_ScheduleAsync_PartitionedReplaceWorksAsExpected(t *testing.T) { + // QueuePartitionLimit = 2 + // startDelay = 50ms + // --time--> (time flows from left to right) + // a(1) means: "add job 'a' to partition 1" + // + // The testcase does the following -> quickly sends N jobs to the queue: + // a(1)..b(2)..c(1)..d(2)..e(2)..f(1) + // ^^^ a,b,c,d still exist (no replacement yet) + // ^^^ d replaced by e, others not replaced. + // ^^^ c was replaced + + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "withReplacePartition": { + Concurrency: 1, + StartDelay: 50 * time.Millisecond, + QueuePartitionLimit: intPtr(2), // !!! IMPORTANT for this testcase + QueueStrategy: definition.QueueStrategyPartitionedReplace, // !!! IMPORTANT for this testcase + Tasks: map[string]definition.TaskDef{ + "echo": { + Script: []string{"echo Test"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + require.NoError(t, defs.Validate()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := test.NewMockStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + log.Debugf("Run task %s on job %s", tsk.Name, j.ID.String()) + return nil + }, + } + }, store, test.NewMockOutputStore()) + require.NoError(t, err) + + jobA, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "1"}) + require.NoError(t, err) + jobB, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + jobC, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "1"}) + require.NoError(t, err) + jobD, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + + // jobs A-D not yet started, not canceled. + _ = pRunner.ReadJob(jobA.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobB.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobC.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobD.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + + // push E(2) (replaces D(2)) + jobE, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + _ = pRunner.ReadJob(jobD.ID, func(j *PipelineJob) { + assert.True(t, j.Canceled) // !! 
this changed now + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobE.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + + // push F(1) (replaces C(1)) + // check that A,B,E,F were executed. +} + func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunConcurrently(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ From 9f09c8e36b690b21f416354d05095c8552f87de1 Mon Sep 17 00:00:00 2001 From: Alexander Hesse Date: Thu, 14 Aug 2025 15:22:50 +0200 Subject: [PATCH 09/13] BUGFIX: proper check if waitlist in replace case is reached --- prunner.go | 36 ++++++++++-------------------------- prunner_test.go | 2 +- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/prunner.go b/prunner.go index 8dfe34e..c27160f 100644 --- a/prunner.go +++ b/prunner.go @@ -219,25 +219,6 @@ type jobTask struct { type jobTasks []jobTask -type scheduleAction int - -const ( - // scheduleActionStart directly starts a job via PipelineRunner.startJob() - scheduleActionStart scheduleAction = iota - - // scheduleActionQueue enqueues the job to the pipeline's waitlist - scheduleActionQueue - - // scheduleActionReplace: replace the last job on the waitlist with this one - scheduleActionReplace - - // scheduleActionErrNoQueue: error case, if queueing is not allowed (queue_limit=0) and a job is running - scheduleActionErrNoQueue - - // scheduleActionErrQueueFull: error case, if queue_limit is reached (with append queue strategy) - scheduleActionErrQueueFull -) - var errNoQueue = errors.New("concurrency exceeded and queueing disabled for pipeline") var errQueueFull = errors.New("concurrency exceeded and queue limit reached for pipeline") var ErrJobNotFound = errors.New("job not found") @@ -245,15 +226,17 @@ var errJobAlreadyCompleted = errors.New("job is already completed") var ErrShuttingDown = errors.New("runner is shutting down") type QueueStrategyImpl interface { - kannEntgegengenommenWerden(def definition.PipelineDef, waitList []*PipelineJob) error + // first step in prunner.ScheduleAsync => check if we have capacity for this job + canAcceptJob(def definition.PipelineDef, waitList []*PipelineJob) error + // add the job to the waitlist, uses the queue_strategy to determine how modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob } type QueueStrategyAppendImpl struct { } -func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error { +func (q QueueStrategyAppendImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error { if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { // queue limit == 0 -> error -> NOTE: might be moved to config validation return errNoQueue @@ -268,6 +251,7 @@ func (q QueueStrategyAppendImpl) kannEntgegengenommenWerden(pipelineDef definiti func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob { log. WithField("component", "runner"). + WithField("strategy", "append"). WithField("pipeline", job.Pipeline). WithField("jobID", job.ID). WithField("variables", job.Variables). 
@@ -278,7 +262,7 @@ type QueueStrategyReplaceImpl struct {
 }
 
-func (q QueueStrategyReplaceImpl) kannEntgegengenommenWerden(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
+func (q QueueStrategyReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
 	if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 {
 		// queue limit == 0 -> error -> NOTE: might be moved to config validation
 		return errNoQueue
@@ -287,10 +271,11 @@ func (q QueueStrategyReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDe
 }
 
 func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
-	if len(previousWaitList) <= *pipelineDef.QueueLimit {
+	if len(previousWaitList) < *pipelineDef.QueueLimit {
 		// waitlist not full -> append
 		log.
 			WithField("component", "runner").
+			WithField("strategy", "replace - waitlist not full -> no replace").
 			WithField("pipeline", job.Pipeline).
 			WithField("jobID", job.ID).
 			WithField("variables", job.Variables).
@@ -313,6 +298,7 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
 
 		log.
 			WithField("component", "runner").
+			WithField("strategy", "replace - waitlist was full -> replaced last job").
 			WithField("pipeline", job.Pipeline).
 			WithField("jobID", job.ID).
 			WithField("variables", job.Variables).
@@ -340,7 +326,7 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
 	}
 
 	queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy)
-	err := queueStrategyImpl.kannEntgegengenommenWerden(pipelineDef, r.waitListByPipeline[pipeline])
+	err := queueStrategyImpl.canAcceptJob(pipelineDef, r.waitListByPipeline[pipeline])
 	if err != nil {
 		return nil, err
 	}
@@ -371,8 +357,6 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip
 		job.startTimer = time.AfterFunc(job.StartDelay, func() {
 			r.StartDelayedJob(id)
 		})
-
-		return job, nil
 	} else {
 		// no delayed job
 		runningJobsCount := r.runningJobsCount(pipeline)
diff --git a/prunner_test.go b/prunner_test.go
index bc5a4f0..baeab0f 100644
--- a/prunner_test.go
+++ b/prunner_test.go
@@ -914,7 +914,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi
 		Pipelines: map[string]definition.PipelineDef{
 			"jobWithStartDelay": {
 				Concurrency: 1,
-				StartDelay: 50 * time.Millisecond,
+				StartDelay: 100 * time.Millisecond,
 				QueueLimit: intPtr(1),
 				QueueStrategy: definition.QueueStrategyReplace,
 				Tasks: map[string]definition.TaskDef{

From 211771b5ae9559db04083d8eae93b4383af47bd3 Mon Sep 17 00:00:00 2001
From: Alexander Hesse
Date: Thu, 14 Aug 2025 15:56:57 +0200
Subject: [PATCH 10/13] FEATURE: Add QueueStrategyPartitionedReplace

---
 prunner.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 54 insertions(+), 6 deletions(-)

diff --git a/prunner.go b/prunner.go
index c27160f..03f03b1 100644
--- a/prunner.go
+++ b/prunner.go
@@ -3,6 +3,7 @@ package prunner
 import (
 	"context"
 	"fmt"
+	"github.com/Flowpack/prunner/helper/slice_utils"
 	"io"
 	"sort"
 	"sync"
@@ -227,10 +228,10 @@ var ErrShuttingDown = errors.New("runner is shutting down")
 
 type QueueStrategyImpl interface {
 	// first step in prunner.ScheduleAsync => check if we have capacity for this job
-	canAcceptJob(def definition.PipelineDef, waitList []*PipelineJob) error
+	canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error
 
 	// add the job to the waitlist, uses the queue_strategy to determine how
-	modifyWaitList(def definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
+	modifyWaitList(pipelineDef definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob
 }
 
 type QueueStrategyAppendImpl struct {
@@ -248,10 +249,9 @@ func (q QueueStrategyAppendImpl) canAcceptJob(pipelineDef definition.PipelineDef
 	return nil
 }
 
-func (q QueueStrategyAppendImpl) modifyWaitList(def definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
+func (q QueueStrategyAppendImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
 	log.
 		WithField("component", "runner").
-		WithField("strategy", "append").
 		WithField("pipeline", job.Pipeline).
 		WithField("jobID", job.ID).
 		WithField("variables", job.Variables).
@@ -275,7 +275,6 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
 		// waitlist not full -> append
 		log.
 			WithField("component", "runner").
-			WithField("strategy", "replace - waitlist not full -> no replace").
 			WithField("pipeline", job.Pipeline).
 			WithField("jobID", job.ID).
 			WithField("variables", job.Variables).
@@ -298,7 +297,6 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
 		log.
 			WithField("component", "runner").
-			WithField("strategy", "replace - waitlist was full -> replaced last job").
 			WithField("pipeline", job.Pipeline).
 			WithField("jobID", job.ID).
 			WithField("variables", job.Variables).
@@ -308,6 +306,54 @@ func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.Pipeline
 	}
 }
 
+type QueueStrategyPartitionedReplaceImpl struct {
+}
+
+func (q QueueStrategyPartitionedReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error {
+	if pipelineDef.QueuePartitionLimit != nil && *pipelineDef.QueuePartitionLimit == 0 {
+		// queue_partition_limit == 0 -> error -> NOTE: might be moved to config validation
+		return errNoQueue
+	}
+	return nil
+}
+
+func (q QueueStrategyPartitionedReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob {
+	queuePartition := job.queuePartition
+	partitionedWaitList := slice_utils.Filter(previousWaitList, func(j *PipelineJob) bool {
+		return j.queuePartition == queuePartition
+	})
+
+	if len(partitionedWaitList) < *pipelineDef.QueuePartitionLimit {
+		// partitioned wait list not full -> append job
+		return append(previousWaitList, job)
+	}
+
+	// partitioned wait list full -> replace the last job of this partition
+	previousJob := partitionedWaitList[len(partitionedWaitList)-1]
+	previousJob.Canceled = true
+	if previousJob.startTimer != nil {
+		log.
+			WithField("previousJobID", previousJob.ID).
+			Debugf("Stopped start timer of previous job")
+		// Stop timer and unset reference for clean up
+		previousJob.startTimer.Stop()
+		previousJob.startTimer = nil
+	}
+
+	// remove the just-canceled job from the waitlist
+	waitList := slice_utils.Filter(previousWaitList, func(j *PipelineJob) bool {
+		return previousJob != j
+	})
+
+	log.
+		WithField("component", "runner").
+		WithField("pipeline", job.Pipeline).
+		WithField("jobID", job.ID).
+		WithField("variables", job.Variables).
+		Debugf("Queued: partitioned replaced job on wait list")
+	return append(waitList, job)
+}
+
 // ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
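// For queue_strategy=partitioned_replace, the wait-list partition of the new job is selected via ScheduleOpts.QueuePartition.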
// "pipeline" is the pipeline ID from the YAML file. // @@ -840,6 +886,8 @@ func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyI return QueueStrategyAppendImpl{} case definition.QueueStrategyReplace: return QueueStrategyReplaceImpl{} + case definition.QueueStrategyPartitionedReplace: + return QueueStrategyPartitionedReplaceImpl{} } return nil From a371849712aedb204ac2e343cd5bf034c6b17a6e Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 15 Aug 2025 13:25:45 +0200 Subject: [PATCH 11/13] FEATURE: expose QueuePartition via HTTP API --- server/server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 0fcdca7..75ac3c1 100644 --- a/server/server.go +++ b/server/server.go @@ -90,6 +90,10 @@ type pipelinesScheduleRequest struct { // Job variables // example: {"tag_name": "v1.17.4", "databases": ["mysql", "postgresql"]} Variables map[string]interface{} `json:"variables"` + + // for queue_strategy=partitioned_replace, the queue partition to use for this job + // example: "tenant1" + QueuePartition string `json:"queuePartition"` } } @@ -135,7 +139,7 @@ func (s *server) pipelinesSchedule(w http.ResponseWriter, r *http.Request) { return } - pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, User: user}) + pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, QueuePartition: in.Body.QueuePartition, User: user}) if err != nil { // TODO Send JSON error and include expected errors (see resolveScheduleAction) if errors.Is(err, prunner.ErrShuttingDown) { From dea9276ee6c8bed16a14e00a35c6b9ddf7648321 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 15 Aug 2025 13:25:55 +0200 Subject: [PATCH 12/13] TASK: partitioned_replace documented in README --- README.md | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 46ded0f..b98e9de 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ This is NOT a fully featured CI pipeline solution. * [Badges](#badges) * [Components](#components) - * [prunner (this repository)](#prunner--this-repository-) + * [prunner (this repository)](#prunner-this-repository) * [prunner-ui](#prunner-ui) * [Flowpack.Prunner](#flowpackprunner) * [User guide](#user-guide) @@ -32,7 +32,10 @@ This is NOT a fully featured CI pipeline solution. * [Limiting concurrency](#limiting-concurrency) * [The wait list](#the-wait-list) * [Debounce jobs with a start delay](#debounce-jobs-with-a-start-delay) + * [Partitioned Wait Lists](#partitioned-wait-lists) * [Disabling fail-fast behavior](#disabling-fail-fast-behavior) + * [Error handling with on_error](#error-handling-with-on_error) + * [Important notes:](#important-notes) * [Configuring retention period](#configuring-retention-period) * [Handling of child processes](#handling-of-child-processes) * [Graceful shutdown](#graceful-shutdown) @@ -44,11 +47,11 @@ This is NOT a fully featured CI pipeline solution. 
* [Development](#development)
   * [Requirements](#requirements)
   * [Running locally](#running-locally)
-  * [IDE Setup (IntelliJ/GoLand)](#ide-setup--intellijgoland-)
+  * [IDE Setup (IntelliJ/GoLand)](#ide-setup-intellijgoland)
   * [Building for different operating systems.](#building-for-different-operating-systems)
   * [Running Tests](#running-tests)
   * [Memory Leak Debugging](#memory-leak-debugging)
-  * [Generate OpenAPI (Swagger) spec](#generate-openapi--swagger--spec)
+  * [Generate OpenAPI (Swagger) spec](#generate-openapi-swagger-spec)
   * [Releasing](#releasing)
 * [Security concept](#security-concept)
 * [License](#license)
@@ -222,7 +225,8 @@ pipelines:
 
 To deactivate the queuing altogether, set `queue_limit: 0`.
 
-Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
+Now, if the queue is limited (and the default `queue_strategy: append` is configured),
+an error occurs when it is full and you try to add a new job.
 
 Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
 queue by the newly added one:
@@ -257,6 +261,7 @@ in form of a zero or positive decimal value with a time unit ("ms", "s", "m", "h
 ```yaml
 pipelines:
   do_something:
+    # NOTE: to prevent starvation, use queue_limit >= 2 (see the note below)
     queue_limit: 1
     queue_strategy: replace
    concurrency: 1
@@ -265,6 +270,52 @@ pipelines:
   tasks: # as usual
 ```
 
+NOTE: If you use `queue_limit: 1` and `start_delay`, you will run into **starvation** (=the job never starts)
+if jobs are submitted more quickly than the `start_delay` interval. If you instead use `queue_limit: 2` or higher,
+you avoid this issue: the job in the 1st slot always starts once its `start_delay` has passed, while only the
+job in the 2nd slot gets replaced.
+
+### Partitioned Wait Lists
+
+If you have a multi-tenant application, you might want to use **one wait-list per tenant** (e.g. for import jobs),
+combined with global `concurrency` limits (depending on the globally available server resources).
+
+To enable this, do the following:
+
+- `queue_strategy: partitioned_replace`: Enables the partitioned wait list
+- `queue_partition_limit: 1` (or higher): Configures the number of wait-list slots per tenant. The last slot gets replaced
+  when the wait-list is full.
+- Can be combined with `start_delay` and `concurrency` as usual
+
+Full example:
+
+```yaml
+pipelines:
+  do_something:
+    queue_strategy: partitioned_replace
+    # prevent starvation
+    queue_partition_limit: 2
+    concurrency: 1
+    # Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job)
+    start_delay: 10s
+    tasks: # as usual
+```
+
+Additionally, when submitting a job, you need to specify the `queuePartition` argument:
+
+```
+POST /pipelines/schedule
+
+{
+  "pipeline": "my_pipeline",
+  "variables": {
+    ...
+ }, + "queuePartition": "tenant_foo" +} +``` + ### Disabling fail-fast behavior From c39288fe8d69ab311e2e16a7664384c8e8e33bdf Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 15 Aug 2025 13:59:50 +0200 Subject: [PATCH 13/13] BUGFIX: fix implementation of isSchedulable --- prunner.go | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/prunner.go b/prunner.go index 03f03b1..75416dd 100644 --- a/prunner.go +++ b/prunner.go @@ -3,12 +3,13 @@ package prunner import ( "context" "fmt" - "github.com/Flowpack/prunner/helper/slice_utils" "io" "sort" "sync" "time" + "github.com/Flowpack/prunner/helper/slice_utils" + "github.com/Flowpack/prunner/store" "github.com/apex/log" @@ -873,13 +874,6 @@ func (r *PipelineRunner) runningJobsCount(pipeline string) int { return running } -// waitlistModifierFn modifies the wait list, if the job cannot be executed right away. -type waitlistModifierFn func(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob - -func waitlistAppendToQueue(previousWaitlist []*PipelineJob, job *PipelineJob) []*PipelineJob { - return append(previousWaitlist, job) -} - func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyImpl { switch queueStrategy { case definition.QueueStrategyAppend: @@ -894,18 +888,14 @@ func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyI } func (r *PipelineRunner) isSchedulable(pipeline string) bool { - // TODO REPLACE ME!!! - return true - /*action := r.resolveScheduleAction(pipeline) - switch action { - case scheduleActionReplace: - fallthrough - case scheduleActionQueue: - fallthrough - case scheduleActionStart: - return true - } - return false*/ + pipelineDef, ok := r.defs.Pipelines[pipeline] + if !ok { + return false + } + + queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy) + err := queueStrategyImpl.canAcceptJob(pipelineDef, r.waitListByPipeline[pipeline]) + return err == nil } type ScheduleOpts struct {