diff --git a/README.md b/README.md index 46ded0f..b98e9de 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ This is NOT a fully featured CI pipeline solution. * [Badges](#badges) * [Components](#components) - * [prunner (this repository)](#prunner--this-repository-) + * [prunner (this repository)](#prunner-this-repository) * [prunner-ui](#prunner-ui) * [Flowpack.Prunner](#flowpackprunner) * [User guide](#user-guide) @@ -32,7 +32,10 @@ This is NOT a fully featured CI pipeline solution. * [Limiting concurrency](#limiting-concurrency) * [The wait list](#the-wait-list) * [Debounce jobs with a start delay](#debounce-jobs-with-a-start-delay) + * [Partitioned Wait Lists](#partitioned-wait-lists) * [Disabling fail-fast behavior](#disabling-fail-fast-behavior) + * [Error handling with on_error](#error-handling-with-on_error) + * [Important notes:](#important-notes) * [Configuring retention period](#configuring-retention-period) * [Handling of child processes](#handling-of-child-processes) * [Graceful shutdown](#graceful-shutdown) @@ -44,11 +47,11 @@ This is NOT a fully featured CI pipeline solution. * [Development](#development) * [Requirements](#requirements) * [Running locally](#running-locally) - * [IDE Setup (IntelliJ/GoLand)](#ide-setup--intellijgoland-) + * [IDE Setup (IntelliJ/GoLand)](#ide-setup-intellijgoland) * [Building for different operating systems.](#building-for-different-operating-systems) * [Running Tests](#running-tests) * [Memory Leak Debugging](#memory-leak-debugging) - * [Generate OpenAPI (Swagger) spec](#generate-openapi--swagger--spec) + * [Generate OpenAPI (Swagger) spec](#generate-openapi-swagger-spec) * [Releasing](#releasing) * [Security concept](#security-concept) * [License](#license) @@ -222,7 +225,8 @@ pipelines: To deactivate the queuing altogether, set `queue_limit: 0`. -Now, if the queue is limited, an error occurs when it is full and you try to add a new job. +Now, if the queue is limited (and default `queue_strategy: append` is configured), +an error occurs when it is full and you try to add a new job. Alternatively, you can also set `queue_strategy: replace` to replace the last job in the queue by the newly added one: @@ -257,6 +261,7 @@ in form of a zero or positive decimal value with a time unit ("ms", "s", "m", "h ```yaml pipelines: do_something: + # NOTE: to prevent starvation, use queue_limit >= 2x queue_limit: 1 queue_strategy: replace concurrency: 1 @@ -265,6 +270,52 @@ pipelines: tasks: # as usual ``` +NOTE: If you use `queue_limit: 1` and `start_delay`, you will run into **starvation** (=the job never starts) +if jobs are submitted quicker than `start_delay`. If you instead use `queue_limit: 2` or higher, you can +avoid this issue: Then, the 1st slot will always be worked on after `start_delay`, while the 2nd slot will +be replaced quickly. + +### Partitioned Wait Lists + +If you have a multi-tenant application, you might want to use **one wait-list per tenant** (e.g. for import jobs), +combined with global `concurrency` limits (depending on the globally available server resources). + +To enable this, do the following: + +- `queue_strategy: partitioned_replace`: Enabled partitioned wait list +- `queue_partition_limit: 1` (or higher): Configure the number of wait-list slots per tenant. The last slot gets replaced + when the wait-list is full. 
+- can be combined with arbitrary `start_delay` and `concurrency` as expected + +Full example: + +``` +pipelines: + do_something: + queue_strategy: partitioned_replace + # prevent starvation + queue_partition_limit: 2 + concurrency: 1 + # Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job) + start_delay: 10s + tasks: # as usual + +``` + +Additionally, when submitting a job, you need to specify the `queuePartition` argument: + +``` +POST /pipelines/schedule + +{ + "pipeline": "my_pipeline", + "variables": { + ... + }, + "queuePartition": "tenant_foo" +} +``` + ### Disabling fail-fast behavior diff --git a/definition/loader_test.go b/definition/loader_test.go index 30e1ae9..02f5016 100644 --- a/definition/loader_test.go +++ b/definition/loader_test.go @@ -23,3 +23,28 @@ func TestLoadRecursively_WithMissingDependency(t *testing.T) { _, err := LoadRecursively("../test/fixtures/missingDep.yml") require.EqualError(t, err, `loading ../test/fixtures/missingDep.yml: invalid pipeline definition "test_it": missing task "not_existing" referenced in depends_on of task "test"`) } + +func TestPartitionedWaitlist_OK(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist.yml") + require.NoError(t, err) +} + +func TestPartitionedWaitlist_err_0_partition_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`) +} + +func TestPartitionedWaitlist_err_no_partition_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`) +} + +func TestPartitionedWaitlist_err_queue_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_queue_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_queue_limit.yml: invalid pipeline definition "test_it": queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead`) +} + +func TestWaitlist_err_partitioned_queue_limit(t *testing.T) { + _, err := LoadRecursively("../test/fixtures/waitlist_err_partitioned_queue_limit.yml") + require.EqualError(t, err, `loading ../test/fixtures/waitlist_err_partitioned_queue_limit.yml: invalid pipeline definition "test_it": queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead`) +} diff --git a/definition/pipelines.go b/definition/pipelines.go index e5d3114..012393c 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -52,10 +52,15 @@ type OnErrorTaskDef struct { type PipelineDef struct { // Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1) Concurrency int `yaml:"concurrency"` - // QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil) + // QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil). 
Only allowed with queue_strategy=append|replace, not with partitioned_replace (there, use queue_partition_limit instead) QueueLimit *int `yaml:"queue_limit"` + + // QueuePartitionLimit is the number of slots for queueing jobs per partition, if queue_strategy=partitioned_replace is used. + QueuePartitionLimit *int `yaml:"queue_partition_limit"` + // QueueStrategy to use when adding jobs to the queue (defaults to append) QueueStrategy QueueStrategy `yaml:"queue_strategy"` + // StartDelay will delay the start of a job if the value is greater than zero (defaults to 0) StartDelay time.Duration `yaml:"start_delay"` @@ -100,6 +105,19 @@ func (d PipelineDef) validate() error { return errors.New("start_delay needs queue_limit > 0") } + if d.QueueStrategy == QueueStrategyPartitionedReplace { + if d.QueueLimit != nil { + return errors.New("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead") + } + if d.QueuePartitionLimit == nil || *d.QueuePartitionLimit < 1 { + return errors.New("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace") + } + } else { + if d.QueuePartitionLimit != nil { + return errors.New("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead") + } + } + for taskName, taskDef := range d.Tasks { for _, dependentTask := range taskDef.DependsOn { _, exists := d.Tasks[dependentTask] @@ -164,13 +182,21 @@ func (d PipelineDef) Equals(otherDef PipelineDef) bool { return true } +// QueueStrategy defines the behavior when jobs wait (=are queued) before pipeline execution. type QueueStrategy int const ( - // QueueStrategyAppend appends jobs to the queue until queue limit is reached + // QueueStrategyAppend appends jobs to the queue until the queue limit is reached (FIFO) QueueStrategyAppend QueueStrategy = 0 - // QueueStrategyReplace replaces pending jobs (with same variables) instead of appending to the queue + + // QueueStrategyReplace replaces the **LAST** pending job if the queue limit is reached. If the queue is not yet full, the job is appended. + // NOTE: if using queue_limit=1 + replace, this can lead to starvation if rapidly enqueuing jobs. If using queue_limit >= 2, this cannot happen anymore. + // (see 2025_08_14_partitioned_waitlist.md for detailed description) QueueStrategyReplace QueueStrategy = 1 + + // QueueStrategyPartitionedReplace implements the "partitioned waitlist" strategy, as explained in 2025_08_14_partitioned_waitlist.md. + // -> it replaces the **LAST** pending job of a given partition, if the partition is full (=queue_partition_limit). + QueueStrategyPartitionedReplace QueueStrategy = 2 ) func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -185,6 +211,8 @@ func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error { *s = QueueStrategyAppend case "replace": *s = QueueStrategyReplace + case "partitioned_replace": + *s = QueueStrategyPartitionedReplace default: return errors.Errorf("unknown queue strategy: %q", strategyName) } diff --git a/docs/2025_08_14_partitioned_waitlist.md b/docs/2025_08_14_partitioned_waitlist.md new file mode 100644 index 0000000..e8c34da --- /dev/null +++ b/docs/2025_08_14_partitioned_waitlist.md @@ -0,0 +1,111 @@ +# FEATURE: Partitioned Waitlists + + +# Problem description + +## Current state + +> **FROM README** +> +> By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the +> waitlist of the pipeline. 
+> +> However, you have some options to configure this as well: +> +> The waitlist can have a maximum size, denoted by `queue_limit`: +> +> ```yaml +> pipelines: +> do_something: +> queue_limit: 1 +> concurrency: 1 +> tasks: # as usual +> ``` +> +> To deactivate the queuing altogether, set `queue_limit: 0`. +> +> Now, if the queue is limited, an error occurs when it is full and you try to add a new job. +> +> Alternatively, you can also set `queue_strategy: replace` to replace the last job in the +> queue by the newly added one: + +## Current state -> Problem "Starvation" with "Debounce" ?? + +> Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job) + +``` +--> time (1 char = 1 s) + +# 1 waitlist slot, delay 10 s + +a____ _____ <-- here the job A is queued + A <-- here job A starts + + +# 1 waitlist slot, delay 10 s +# STARVATION CAN HAPPEN +a____ b____ c_____ _____ + C + + +# 2 waitlist slots, delay 10 s +# !!! PREVENTS STARVATION !!! +a____ b__c_ ______ _____ +[a_] [ab] A <-- here, a starts. NO STARVATION + [ac] + [c_] +``` + +SUMMARY: +- Starvation can only happen if waitlist size=1; if waitlist size=2 (or bigger) cannot happen because always the LAST job gets replaced. +- We cannot debounce immediately; so we ALWAYS wait at least for start_delay. (not a problem for us right now). + +## problem description + +In a project, we have 50 Neos instances, which use prunner for background tasks (some are short, some are long). + +Currently, we have 4 pipelines globally +- concurrency 1 +- concurrency 8 +- concurrency 4 +- concurrency 4 (import job) + - -> needs the global concurrency to limit the needed server resources + +Now, a new pipeline should be added for "irregular import jobs" triggered by webhooks. +- can happen very quickly after each other +- -> Pipeline should start after a certain amount of time (newer should override older pipelines) +- StartDelay combined with QueueStrategy "Replace" + - `waitList[len(waitList)-1] = job` -> *LAST* Element is replaced of wait list. +- -> GLOBAL replacement does not work, because each job has arguments (which are relevant, i.e. tenant ID). + +We still want to have a GLOBAL CONCURRENCY LIMIT (per pipeline), but a WAITLIST per instance. + + +## Solution Idea: + +we want to be able to SEGMENT the waitlist into different partitions. The `queue_strategy` and `queue_limit` should be per partition. +`concurrency` stays per pipeline (as before) + +``` +**LOGICAL VIEW** (Idea) +┌──────────────────────┐ ┌──────────────────────────────────────────────────┐ +│ Waitlist Instance 1 │ │ Pipeline (with concurrency 2) │ +├──────────────────────┤ │ │ +│ Waitlist Instance 2 │ -> ├──────────────────────────────────────────────────┤ +├──────────────────────┤ │ │ +│ Waitlist Instance 3 │ │ │ +└──────────────────────┘ └──────────────────────────────────────────────────┘ + + if job is *delayed*, + stays in wait list + for this duration + +``` + +Technically, we still have a SINGLE Wait List per pipeline, but the jobs can be partitioned by `waitlist_partition_id`. + +-> In this case, the `replace` strategy will replace the last element of the given partition. + +-> we create a new queueStrategy for the partitioned waitlist: `partitioned_replace` + +If we partition the waitlist, the waitlist can grow up to queue_limit * number of partitions. 
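+
+Illustrative sketch of this rule (not the actual prunner implementation; the trimmed-down `job` struct and `modifyWaitList` function below are hypothetical stand-ins for `PipelineJob` and the wait-list handling): append while the job's partition still has a free slot, otherwise cancel the *last* queued job of that partition and put the new job in its place. Other partitions are never touched.
+
+```go
+package main
+
+import "fmt"
+
+// job is a trimmed-down stand-in for prunner's PipelineJob.
+type job struct {
+	id        string
+	partition string
+	canceled  bool
+}
+
+// modifyWaitList sketches queue_strategy=partitioned_replace:
+// append while the partition has fewer than partitionLimit queued jobs,
+// otherwise cancel the last queued job of that partition and replace it.
+func modifyWaitList(waitList []*job, incoming *job, partitionLimit int) []*job {
+	var samePartition []*job
+	for _, j := range waitList {
+		if j.partition == incoming.partition {
+			samePartition = append(samePartition, j)
+		}
+	}
+	if len(samePartition) < partitionLimit {
+		return append(waitList, incoming) // partition not full -> append
+	}
+	last := samePartition[len(samePartition)-1]
+	last.canceled = true // replaced jobs are canceled, as with queue_strategy=replace
+	var result []*job
+	for _, j := range waitList {
+		if j != last {
+			result = append(result, j) // drop only the replaced job of this partition
+		}
+	}
+	return append(result, incoming)
+}
+
+func main() {
+	wl := []*job{{id: "a", partition: "1"}, {id: "b", partition: "2"}, {id: "c", partition: "1"}}
+	// partition "1" already holds a and c (= limit 2), so f replaces c; partition "2" is untouched.
+	wl = modifyWaitList(wl, &job{id: "f", partition: "1"}, 2)
+	for _, j := range wl {
+		fmt.Printf("%s (partition %s, canceled=%v)\n", j.id, j.partition, j.canceled)
+	}
+}
+```
+
+Running this keeps `a` and `b` queued and replaces `c`, because partition "1" is already at its limit.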
\ No newline at end of file diff --git a/helper/slice_utils/sliceUtils.go b/helper/slice_utils/sliceUtils.go new file mode 100644 index 0000000..9331cac --- /dev/null +++ b/helper/slice_utils/sliceUtils.go @@ -0,0 +1,11 @@ +package slice_utils + +func Filter[T any](s []T, p func(i T) bool) []T { + var result []T + for _, i := range s { + if p(i) { + result = append(result, i) + } + } + return result +} diff --git a/prunner.go b/prunner.go index 54531ab..75416dd 100644 --- a/prunner.go +++ b/prunner.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/Flowpack/prunner/helper/slice_utils" + "github.com/Flowpack/prunner/store" "github.com/apex/log" @@ -115,10 +117,13 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat type PipelineJob struct { ID uuid.UUID // Identifier of the pipeline (from the YAML file) - Pipeline string - Env map[string]string - Variables map[string]interface{} - StartDelay time.Duration + Pipeline string + // Identifier of the queue partition, if any (from definition) + queuePartition string + Env map[string]string + Variables map[string]interface{} + User string + StartDelay time.Duration Completed bool Canceled bool @@ -127,8 +132,7 @@ type PipelineJob struct { // Start is the actual start time of the job. Could be nil if not yet started. Start *time.Time // End is the actual end time of the job (can be nil if incomplete) - End *time.Time - User string + End *time.Time // Tasks is an in-memory representation with state of tasks, sorted by dependencies Tasks jobTasks LastError error @@ -144,6 +148,33 @@ func (j *PipelineJob) isRunning() bool { return j.Start != nil && !j.Completed && !j.Canceled } +// canBeStarted is true if: +// - job is delayed and the delay has passed +// - job is not yet running and not canceled +// - job is not delayed +// +// Only decides for itself, without outer knowledge (e.g. if the pipeline still has free spots or not) +func (j *PipelineJob) canBeStarted() bool { + if j.StartDelay > 0 && j.startTimer != nil { + // delay still running, we can't start this job. + // (startTimer set to nil in StartDelayedJob(), after the timer has finished) + return false + } + + if j.Start != nil { + // already started, we can't start this job. + return false + } + + if j.Canceled { + // cancelled, we can't start this job. + return false + } + + // job is not delayed; or job is delayed and the delay has passed + return true +} + func (r *PipelineRunner) initScheduler(j *PipelineJob) { // For correct cancellation of tasks a single task runner and scheduler per job is used @@ -190,92 +221,69 @@ type jobTask struct { type jobTasks []jobTask -type scheduleAction int - -const ( - scheduleActionStart scheduleAction = iota - scheduleActionQueue - scheduleActionQueueDelay - scheduleActionReplace - scheduleActionNoQueue - scheduleActionQueueFull -) - var errNoQueue = errors.New("concurrency exceeded and queueing disabled for pipeline") var errQueueFull = errors.New("concurrency exceeded and queue limit reached for pipeline") var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var ErrShuttingDown = errors.New("runner is shutting down") -// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it. -// "pipeline" is the pipeline ID from the YAML file. -// -// the returned PipelineJob is the individual execution context. 
-func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { - r.mx.Lock() - defer r.mx.Unlock() +type QueueStrategyImpl interface { + // first step in prunner.ScheduleAsync => check if we have capacity for this job + canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error - if r.isShuttingDown { - return nil, ErrShuttingDown - } - - pipelineDef, ok := r.defs.Pipelines[pipeline] - if !ok { - return nil, errors.Errorf("pipeline %q is not defined", pipeline) - } + // add the job to the waitlist, uses the queue_strategy to determine how + modifyWaitList(pipelineDef definition.PipelineDef, waitList []*PipelineJob, job *PipelineJob) []*PipelineJob +} - action := r.resolveScheduleAction(pipeline, false) +type QueueStrategyAppendImpl struct { +} - switch action { - case scheduleActionNoQueue: - return nil, errNoQueue - case scheduleActionQueueFull: - return nil, errQueueFull +func (q QueueStrategyAppendImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error { + if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { + // queue limit == 0 -> error -> NOTE: might be moved to config validation + return errNoQueue } - - id, err := uuid.NewV4() - if err != nil { - return nil, errors.Wrap(err, "generating job UUID") + if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { + // queue full + return errQueueFull } + return nil +} - defer r.requestPersist() - - job := &PipelineJob{ - ID: id, - Pipeline: pipeline, - Created: time.Now(), - Tasks: buildJobTasks(pipelineDef.Tasks), - Env: pipelineDef.Env, - Variables: opts.Variables, - User: opts.User, - StartDelay: pipelineDef.StartDelay, - } +func (q QueueStrategyAppendImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob { + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). + WithField("variables", job.Variables). + Debugf("Queued: added job to wait list") + return append(previousWaitList, job) +} - r.jobsByID[id] = job - r.jobsByPipeline[pipeline] = append(r.jobsByPipeline[pipeline], job) +type QueueStrategyReplaceImpl struct { +} - if job.StartDelay > 0 { - // A delayed job is a job on the wait list that is started by a function after a delay - job.startTimer = time.AfterFunc(job.StartDelay, func() { - r.StartDelayedJob(id) - }) +func (q QueueStrategyReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error { + if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { + // queue limit == 0 -> error -> NOTE: might be moved to config validation + return errNoQueue } + return nil +} - switch action { - case scheduleActionQueue: - r.waitListByPipeline[pipeline] = append(r.waitListByPipeline[pipeline], job) - +func (q QueueStrategyReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob { + if len(previousWaitList) < *pipelineDef.QueueLimit { + // waitlist nicht voll -> append log. WithField("component", "runner"). WithField("pipeline", job.Pipeline). WithField("jobID", job.ID). WithField("variables", job.Variables). 
Debugf("Queued: added job to wait list") - - return job, nil - case scheduleActionReplace: - waitList := r.waitListByPipeline[pipeline] - previousJob := waitList[len(waitList)-1] + return append(previousWaitList, job) + } else { + // waitlist voll -> replace + previousJob := previousWaitList[len(previousWaitList)-1] previousJob.Canceled = true if previousJob.startTimer != nil { log. @@ -285,6 +293,7 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip previousJob.startTimer.Stop() previousJob.startTimer = nil } + waitList := previousWaitList[:] waitList[len(waitList)-1] = job log. @@ -294,17 +303,127 @@ func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*Pip WithField("variables", job.Variables). Debugf("Queued: replaced job on wait list") - return job, nil + return waitList } +} + +type QueueStrategyPartitionedReplaceImpl struct { +} + +func (q QueueStrategyPartitionedReplaceImpl) canAcceptJob(pipelineDef definition.PipelineDef, waitList []*PipelineJob) error { + if pipelineDef.QueuePartitionLimit != nil && *pipelineDef.QueuePartitionLimit == 0 { + // queue limit == 0 -> error -> NOTE: might be moved to config validation + return errNoQueue + } + return nil +} - r.startJob(job) +func (q QueueStrategyPartitionedReplaceImpl) modifyWaitList(pipelineDef definition.PipelineDef, previousWaitList []*PipelineJob, job *PipelineJob) []*PipelineJob { + queuePartition := job.queuePartition + partitionedWaitList := slice_utils.Filter(previousWaitList, func(job *PipelineJob) bool { + return job.queuePartition == queuePartition + }) + + if len(partitionedWaitList) < *pipelineDef.QueuePartitionLimit { + // partitioned wait list not full -> append job + return append(previousWaitList, job) + } + + // partitioned wait list full -> replace partitioned job + previousJob := partitionedWaitList[len(partitionedWaitList)-1] + previousJob.Canceled = true + if previousJob.startTimer != nil { + log. + WithField("previousJobID", previousJob.ID). + Debugf("Stopped start timer of previous job") + // Stop timer and unset reference for clean up + previousJob.startTimer.Stop() + previousJob.startTimer = nil + } + + // remove the just cancelled job from the waitlist + waitList := slice_utils.Filter(previousWaitList, func(job *PipelineJob) bool { + return previousJob != job + }) log. WithField("component", "runner"). WithField("pipeline", job.Pipeline). WithField("jobID", job.ID). WithField("variables", job.Variables). - Debugf("Started: scheduled job execution") + Debugf("Queued: partitioned replaced job on wait list") + return append(waitList, job) +} + +// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it. +// "pipeline" is the pipeline ID from the YAML file. +// +// the returned PipelineJob is the individual execution context. 
+func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { + r.mx.Lock() + defer r.mx.Unlock() + + if r.isShuttingDown { + return nil, ErrShuttingDown + } + + pipelineDef, ok := r.defs.Pipelines[pipeline] + if !ok { + return nil, errors.Errorf("pipeline %q is not defined", pipeline) + } + + queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy) + err := queueStrategyImpl.canAcceptJob(pipelineDef, r.waitListByPipeline[pipeline]) + if err != nil { + return nil, err + } + + id, err := uuid.NewV4() + if err != nil { + return nil, errors.Wrap(err, "generating job UUID") + } + + defer r.requestPersist() + + job := &PipelineJob{ + ID: id, + Pipeline: pipeline, + Created: time.Now(), + Tasks: buildJobTasks(pipelineDef.Tasks), + Env: pipelineDef.Env, + Variables: opts.Variables, + User: opts.User, + queuePartition: opts.QueuePartition, + StartDelay: pipelineDef.StartDelay, + } + r.jobsByID[id] = job + r.jobsByPipeline[pipeline] = append(r.jobsByPipeline[pipeline], job) + + if pipelineDef.StartDelay > 0 { + // A delayed job is a job on the wait list that is started by a function after a delay + job.startTimer = time.AfterFunc(job.StartDelay, func() { + r.StartDelayedJob(id) + }) + } else { + // no delayed job + runningJobsCount := r.runningJobsCount(pipeline) + if runningJobsCount < pipelineDef.Concurrency { + // free capacity -> start job right now and finish. + r.startJob(job) + + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). + WithField("variables", job.Variables). + Debugf("Started: scheduled job execution") + + return job, nil + } + } + + // modify the waitlist + r.waitListByPipeline[pipeline] = queueStrategyImpl.modifyWaitList(pipelineDef, r.waitListByPipeline[pipeline], job) return job, nil } @@ -365,6 +484,7 @@ func buildPipelineGraph(id uuid.UUID, tasks jobTasks, vars map[string]interface{ return g, nil } +// ReadJob loads the job identified by id and calls process() on it synchronously. All protected by the global r.mx mutex. func (r *PipelineRunner) ReadJob(id uuid.UUID, process func(j *PipelineJob)) error { r.mx.RLock() defer r.mx.RUnlock() @@ -663,29 +783,35 @@ func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) { } func (r *PipelineRunner) startJobsOnWaitList(pipeline string) { + // Pipeline currently full? (all executors are working) -> abort + pipelineDef := r.defs.Pipelines[pipeline] + runningJobsCount := r.runningJobsCount(pipeline) + freeSlots := pipelineDef.Concurrency - runningJobsCount + if freeSlots <= 0 { + // no room to start any jobs right now + return + } + // Check wait list if another job is queued waitList := r.waitListByPipeline[pipeline] - // Schedule as many jobs as are schedulable (also process if the schedule action is start delay and check individual jobs if they can be started) - for len(waitList) > 0 && r.resolveDequeueJobAction(waitList[0]) == scheduleActionStart { - queuedJob := waitList[0] - // Queued job has a start delay timer set - wait for it to fire - if queuedJob.startTimer != nil { - // TODO We need to check if we rather need to skip only this job and continue to process other jobs on the queue - break + // Schedule as many jobs as are schedulable + newWaitList := make([]*PipelineJob, 0, len(waitList)) + for _, job := range waitList { + if job.canBeStarted() && freeSlots > 0 { + freeSlots-- + r.startJob(job) + log. + WithField("component", "runner"). + WithField("pipeline", job.Pipeline). + WithField("jobID", job.ID). 
+ Debugf("Dequeue: scheduled job execution") + } else { + newWaitList = append(newWaitList, job) } - - waitList = waitList[1:] - - r.startJob(queuedJob) - - log. - WithField("component", "runner"). - WithField("pipeline", queuedJob.Pipeline). - WithField("jobID", queuedJob.ID). - Debugf("Dequeue: scheduled job execution") } - r.waitListByPipeline[pipeline] = waitList + + r.waitListByPipeline[pipeline] = newWaitList } // IterateJobs calls process for each job in a read lock. @@ -748,59 +874,35 @@ func (r *PipelineRunner) runningJobsCount(pipeline string) int { return running } -func (r *PipelineRunner) resolveScheduleAction(pipeline string, ignoreStartDelay bool) scheduleAction { - pipelineDef := r.defs.Pipelines[pipeline] - - // If a start delay is set, we will always queue the job, otherwise we check if the number of running jobs - // exceed the maximum concurrency - runningJobsCount := r.runningJobsCount(pipeline) - if runningJobsCount >= pipelineDef.Concurrency || (pipelineDef.StartDelay > 0 && !ignoreStartDelay) { - // Check if jobs should be queued if concurrency factor is exceeded - if pipelineDef.QueueLimit != nil && *pipelineDef.QueueLimit == 0 { - return scheduleActionNoQueue - } - - // Check if a queued job on the wait list should be replaced depending on queue strategy - waitList := r.waitListByPipeline[pipeline] - if pipelineDef.QueueStrategy == definition.QueueStrategyReplace && len(waitList) > 0 { - return scheduleActionReplace - } - - // Error if there is a queue limit and the number of queued jobs exceeds the allowed queue limit - if pipelineDef.QueueLimit != nil && len(waitList) >= *pipelineDef.QueueLimit { - return scheduleActionQueueFull - } - - return scheduleActionQueue +func getQueueStrategyImpl(queueStrategy definition.QueueStrategy) QueueStrategyImpl { + switch queueStrategy { + case definition.QueueStrategyAppend: + return QueueStrategyAppendImpl{} + case definition.QueueStrategyReplace: + return QueueStrategyReplaceImpl{} + case definition.QueueStrategyPartitionedReplace: + return QueueStrategyPartitionedReplaceImpl{} } - return scheduleActionStart -} - -func (r *PipelineRunner) resolveDequeueJobAction(job *PipelineJob) scheduleAction { - // Start the job if it had a start delay but the timer finished - ignoreStartDelay := job.StartDelay > 0 && job.startTimer == nil - return r.resolveScheduleAction(job.Pipeline, ignoreStartDelay) + return nil } func (r *PipelineRunner) isSchedulable(pipeline string) bool { - action := r.resolveScheduleAction(pipeline, false) - switch action { - case scheduleActionReplace: - fallthrough - case scheduleActionQueue: - fallthrough - case scheduleActionStart: - return true - case scheduleActionQueueDelay: - return true + pipelineDef, ok := r.defs.Pipelines[pipeline] + if !ok { + return false } - return false + + queueStrategyImpl := getQueueStrategyImpl(pipelineDef.QueueStrategy) + err := queueStrategyImpl.canAcceptJob(pipelineDef, r.waitListByPipeline[pipeline]) + return err == nil } type ScheduleOpts struct { Variables map[string]interface{} User string + // for queue_strategy=partitioned_replace, the queue partition to use + QueuePartition string } func (r *PipelineRunner) initialLoadFromStore() error { diff --git a/prunner_test.go b/prunner_test.go index 83d2c89..baeab0f 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -914,7 +914,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi Pipelines: map[string]definition.PipelineDef{ "jobWithStartDelay": { Concurrency: 1, - StartDelay: 50 * 
time.Millisecond, + StartDelay: 100 * time.Millisecond, QueueLimit: intPtr(1), QueueStrategy: definition.QueueStrategyReplace, Tasks: map[string]definition.TaskDef{ @@ -958,6 +958,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi job2, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{}) require.NoError(t, err) + // original "job" should be canceled test.WaitForCondition(t, func() bool { var canceled bool _ = pRunner.ReadJob(jobID, func(j *PipelineJob) { @@ -966,6 +967,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi return canceled }, 1*time.Millisecond, "job is canceled") + // job2 is scheduled job2ID := job2.ID test.WaitForCondition(t, func() bool { var started bool @@ -975,6 +977,7 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi return !started }, 1*time.Millisecond, "job2 is not started (queued)") + // job2 is started after ~50 ms (which the test does not check for ^^) test.WaitForCondition(t, func() bool { var started bool _ = pRunner.ReadJob(job2ID, func(j *PipelineJob) { @@ -993,6 +996,180 @@ func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillQueueSi }, 1*time.Millisecond, "job2 is finished") } +func TestPipelineRunner_ScheduleAsync_WithStartDelay2QueueAndReplaceWillReplaceLastJob(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "jobWithStartDelay": { + Concurrency: 1, + StartDelay: 50 * time.Millisecond, + QueueLimit: intPtr(2), // !!! IMPORTANT for this testcase + QueueStrategy: definition.QueueStrategyReplace, + Tasks: map[string]definition.TaskDef{ + "echo": { + Script: []string{"echo Test"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + require.NoError(t, defs.Validate()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := test.NewMockStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + log.Debugf("Run task %s on job %s", tsk.Name, j.ID.String()) + return nil + }, + } + }, store, test.NewMockOutputStore()) + require.NoError(t, err) + + // job1 + job2 + job3 is appended + job1, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{}) + require.NoError(t, err) + + job2, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{}) + require.NoError(t, err) + + job3, err := pRunner.ScheduleAsync("jobWithStartDelay", ScheduleOpts{}) + require.NoError(t, err) + + // job2 should be canceled + test.WaitForCondition(t, func() bool { + var canceled bool + _ = pRunner.ReadJob(job2.ID, func(j *PipelineJob) { + canceled = j.Canceled + }) + return canceled + }, 1*time.Millisecond, "job2 was not canceled") + + // when job2 was canceled, job1 should still be not canceled, and job3 also should be in the queue. 
+ _ = pRunner.ReadJob(job1.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + }) + _ = pRunner.ReadJob(job2.ID, func(j *PipelineJob) { + assert.True(t, j.Canceled) + }) + _ = pRunner.ReadJob(job3.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + }) + + // we wait for job1 to be finished + test.WaitForCondition(t, func() bool { + var started bool + var running bool + + _ = pRunner.ReadJob(job1.ID, func(j *PipelineJob) { + started = j.Start != nil + running = j.isRunning() + }) + return started && !running + }, 1*time.Millisecond, "job1 did not finish") + + test.WaitForCondition(t, func() bool { + var started bool + var running bool + + _ = pRunner.ReadJob(job3.ID, func(j *PipelineJob) { + started = j.Start != nil + running = j.isRunning() + }) + return started && !running + }, 1*time.Millisecond, "job3 did not finish") +} + +func TestPipelineRunner_ScheduleAsync_PartitionedReplaceWorksAsExpected(t *testing.T) { + // QueuePartitionLimit = 2 + // startDelay = 50ms + // --time--> (time flows from left to right) + // a(1) means: "add job 'a' to partition 1" + // + // The testcase does the following -> quickly sends N jobs to the queue: + // a(1)..b(2)..c(1)..d(2)..e(2)..f(1) + // ^^^ a,b,c,d still exist (no replacement yet) + // ^^^ d replaced by e, others not replaced. + // ^^^ c was replaced + + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "withReplacePartition": { + Concurrency: 1, + StartDelay: 50 * time.Millisecond, + QueuePartitionLimit: intPtr(2), // !!! IMPORTANT for this testcase + QueueStrategy: definition.QueueStrategyPartitionedReplace, // !!! IMPORTANT for this testcase + Tasks: map[string]definition.TaskDef{ + "echo": { + Script: []string{"echo Test"}, + }, + }, + SourcePath: "fixtures", + }, + }, + } + require.NoError(t, defs.Validate()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := test.NewMockStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + return &test.MockRunner{ + OnRun: func(tsk *task.Task) error { + log.Debugf("Run task %s on job %s", tsk.Name, j.ID.String()) + return nil + }, + } + }, store, test.NewMockOutputStore()) + require.NoError(t, err) + + jobA, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "1"}) + require.NoError(t, err) + jobB, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + jobC, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "1"}) + require.NoError(t, err) + jobD, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + + // jobs A-D not yet started, not canceled. + _ = pRunner.ReadJob(jobA.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobB.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobC.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobD.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + + // push E(2) (replaces D(2)) + jobE, err := pRunner.ScheduleAsync("withReplacePartition", ScheduleOpts{QueuePartition: "2"}) + require.NoError(t, err) + _ = pRunner.ReadJob(jobD.ID, func(j *PipelineJob) { + assert.True(t, j.Canceled) // !! 
this changed now + assert.Nil(t, j.Start) + }) + _ = pRunner.ReadJob(jobE.ID, func(j *PipelineJob) { + assert.False(t, j.Canceled) + assert.Nil(t, j.Start) + }) + + // push F(1) (replaces C(1)) + // check that A,B,E,F were executed. +} + func TestPipelineRunner_ScheduleAsync_WithStartDelayNoQueueAndReplaceWillNotRunConcurrently(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ diff --git a/server/server.go b/server/server.go index 0fcdca7..75ac3c1 100644 --- a/server/server.go +++ b/server/server.go @@ -90,6 +90,10 @@ type pipelinesScheduleRequest struct { // Job variables // example: {"tag_name": "v1.17.4", "databases": ["mysql", "postgresql"]} Variables map[string]interface{} `json:"variables"` + + // for queue_strategy=partitioned_replace, the queue partition to use for this job + // example: "tenant1" + QueuePartition string `json:"queuePartition"` } } @@ -135,7 +139,7 @@ func (s *server) pipelinesSchedule(w http.ResponseWriter, r *http.Request) { return } - pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, User: user}) + pJob, err := s.pRunner.ScheduleAsync(in.Body.Pipeline, prunner.ScheduleOpts{Variables: in.Body.Variables, QueuePartition: in.Body.QueuePartition, User: user}) if err != nil { // TODO Send JSON error and include expected errors (see resolveScheduleAction) if errors.Is(err, prunner.ErrShuttingDown) { diff --git a/test/fixtures/partitioned_waitlist.yml b/test/fixtures/partitioned_waitlist.yml new file mode 100644 index 0000000..fef5537 --- /dev/null +++ b/test/fixtures/partitioned_waitlist.yml @@ -0,0 +1,11 @@ +pipelines: + test_it: + queue_strategy: partitioned_replace + # keep two tasks in the queue PER PARTITION. + queue_partition_limit: 2 + concurrency: 1 + + tasks: + test: + script: + - echo "Foo" diff --git a/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml b/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml new file mode 100644 index 0000000..fd8e87c --- /dev/null +++ b/test/fixtures/partitioned_waitlist_err_0_partition_limit.yml @@ -0,0 +1,10 @@ +pipelines: + test_it: + queue_strategy: partitioned_replace + queue_partition_limit: 0 # !!! 
ERROR
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml b/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml
new file mode 100644
index 0000000..46fd4d7
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist_err_no_partition_limit.yml
@@ -0,0 +1,10 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    # MISSING: queue_partition_limit: 2
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/partitioned_waitlist_err_queue_limit.yml b/test/fixtures/partitioned_waitlist_err_queue_limit.yml
new file mode 100644
index 0000000..4bce093
--- /dev/null
+++ b/test/fixtures/partitioned_waitlist_err_queue_limit.yml
@@ -0,0 +1,10 @@
+pipelines:
+  test_it:
+    queue_strategy: partitioned_replace
+    queue_limit: 2 # not allowed in partitioned_replace
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
diff --git a/test/fixtures/waitlist_err_partitioned_queue_limit.yml b/test/fixtures/waitlist_err_partitioned_queue_limit.yml
new file mode 100644
index 0000000..8f35b8a
--- /dev/null
+++ b/test/fixtures/waitlist_err_partitioned_queue_limit.yml
@@ -0,0 +1,11 @@
+pipelines:
+  test_it:
+    queue_strategy: replace
+    # error: not allowed for queue_strategy: replace
+    queue_partition_limit: 2
+    concurrency: 1
+
+    tasks:
+      test:
+        script:
+          - echo "Foo"
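
The four new `test/fixtures/*waitlist*.yml` files map one-to-one onto the validation rules added to `PipelineDef.validate()`. Condensed into a self-contained sketch (the `validateQueueConfig` helper is hypothetical, for illustration only; the error strings are the ones asserted in `definition/loader_test.go`):

```go
package main

import "fmt"

// validateQueueConfig condenses the queue-related validation introduced in this PR:
//   queue_strategy: partitioned_replace -> queue_partition_limit required (>= 1), queue_limit forbidden
//   queue_strategy: append | replace    -> queue_limit allowed, queue_partition_limit forbidden
func validateQueueConfig(strategy string, queueLimit, queuePartitionLimit *int) error {
	if strategy == "partitioned_replace" {
		if queueLimit != nil {
			return fmt.Errorf("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead")
		}
		if queuePartitionLimit == nil || *queuePartitionLimit < 1 {
			return fmt.Errorf("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace")
		}
		return nil
	}
	if queuePartitionLimit != nil {
		return fmt.Errorf("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead")
	}
	return nil
}

func main() {
	two := 2
	fmt.Println(validateQueueConfig("partitioned_replace", nil, &two)) // <nil> -> valid, like partitioned_waitlist.yml
	fmt.Println(validateQueueConfig("partitioned_replace", &two, nil)) // error -> partitioned_waitlist_err_queue_limit.yml
	fmt.Println(validateQueueConfig("replace", nil, &two))             // error -> waitlist_err_partitioned_queue_limit.yml
}
```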