-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.go
More file actions
143 lines (118 loc) · 3.25 KB
/
pool.go
File metadata and controls
143 lines (118 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package hypercache
import (
"sync"
)
// JobFunc is a unit of work that can be enqueued in a worker pool.
// A non-nil return value is forwarded on the pool's error channel.
type JobFunc func() error
// WorkerPool is a pool of workers that can execute jobs concurrently.
//
// Enqueue is safe to call concurrently with Shutdown. After Shutdown returns,
// further Enqueue calls silently drop the job — this prevents races during
// graceful cache shutdown where background loops (expiration, eviction) may
// still attempt to enqueue work after Stop() has begun.
type WorkerPool struct {
	// shutdownMu protects the closed flag and serializes Shutdown vs Enqueue
	// so Shutdown cannot close pool.jobs while an Enqueue is mid-send.
	// Enqueue takes RLock (concurrent senders allowed); Shutdown takes Lock.
	shutdownMu sync.RWMutex
	// closed is set (under shutdownMu) once Shutdown has begun; Enqueue
	// checks it to drop late jobs instead of sending on a closed channel.
	closed  bool
	// workers is the current worker-goroutine count, adjusted by Resize.
	workers int
	// jobs carries enqueued work; closed by Shutdown to drain workers.
	jobs    chan JobFunc
	// wg counts in-flight jobs (Add in Enqueue, Done in worker), letting
	// Shutdown wait for all accepted work to finish.
	wg      sync.WaitGroup
	// quit signals individual workers to exit; used by Resize to shrink.
	quit    chan struct{}
	// errorChan receives non-nil errors returned by jobs; see Errors().
	errorChan chan error
}
// NewWorkerPool builds a pool with the given number of worker
// goroutines and starts them immediately.
func NewWorkerPool(workers int) *WorkerPool {
	p := &WorkerPool{
		workers: workers,
		jobs:    make(chan JobFunc, workers),
		// quit is buffered so Resize can post several shrink signals
		// without blocking right away.
		quit:      make(chan struct{}, workers),
		errorChan: make(chan error, workers),
	}
	p.start()
	return p
}
// Enqueue submits a job to the pool. If the pool has already been shut
// down, the job is silently dropped (see the WorkerPool docstring).
func (pool *WorkerPool) Enqueue(job JobFunc) {
	pool.shutdownMu.RLock()
	if pool.closed {
		pool.shutdownMu.RUnlock()
		return
	}
	// Register the job before sending so Shutdown's wg.Wait observes it,
	// and keep the read lock across the send so Shutdown cannot close
	// pool.jobs underneath us.
	pool.wg.Add(1)
	pool.jobs <- job
	pool.shutdownMu.RUnlock()
}
// Shutdown stops the worker pool, waiting for every enqueued job to
// finish before returning. Calling it more than once is a no-op.
func (pool *WorkerPool) Shutdown() {
	pool.shutdownMu.Lock()
	alreadyClosed := pool.closed
	if !alreadyClosed {
		pool.closed = true
		// Closing jobs under the write lock guarantees no Enqueue is
		// mid-send on the channel.
		close(pool.jobs)
	}
	pool.shutdownMu.Unlock()
	if alreadyClosed {
		return
	}
	// Block until every job accepted by Enqueue has run.
	pool.wg.Wait()
	// Wake any workers still parked in their select loop.
	close(pool.quit)
	// No sends on errorChan can happen once wg.Wait has returned, so it
	// is safe to close it now.
	close(pool.errorChan)
}
// Errors exposes the pool's error channel; jobs that return a non-nil
// error have it delivered here. The channel is closed by Shutdown.
func (pool *WorkerPool) Errors() <-chan error {
	return pool.errorChan
}
// Resize changes the number of workers in the pool. Growing spawns
// additional worker goroutines; shrinking posts quit signals that
// workers pick up when they return to their select loop. Negative
// sizes are ignored, and Resize on a shut-down pool is a no-op.
//
// The shutdownMu write lock serializes Resize against concurrent
// Resize and Shutdown calls: without it, pool.workers is a data race,
// and shrinking after Shutdown would panic sending on the closed quit
// channel (growing would leak goroutines).
func (pool *WorkerPool) Resize(newSize int) {
	if newSize < 0 {
		return
	}
	pool.shutdownMu.Lock()
	defer pool.shutdownMu.Unlock()
	if pool.closed {
		// Pool already shut down; quit is (or will be) closed, so
		// resizing is meaningless and shrink-sends would panic.
		return
	}
	diff := newSize - pool.workers
	if diff == 0 {
		return
	}
	pool.workers = newSize
	if diff > 0 {
		// Increase the number of workers.
		for range diff {
			go pool.worker()
		}
	} else {
		// Decrease the number of workers: send exactly one quit signal
		// per worker to remove. quit is buffered, so this usually does
		// not block; if the buffer is full the send waits for a worker
		// to come back to its select and consume a signal.
		for range -diff {
			pool.quit <- struct{}{}
		}
	}
}
// start launches the pool's initial set of worker goroutines.
func (pool *WorkerPool) start() {
	n := pool.workers
	for i := 0; i < n; i++ {
		go pool.worker()
	}
}
// worker is the loop run by each worker goroutine. It executes jobs
// until the jobs channel is closed and drained (Shutdown) or a quit
// signal arrives (Resize shrinking the pool).
func (pool *WorkerPool) worker() {
	for {
		select {
		case fn, open := <-pool.jobs:
			if !open {
				// Shutdown closed jobs and the backlog is drained.
				return
			}
			// Report a non-nil error BEFORE calling wg.Done: Shutdown
			// closes errorChan only after wg.Wait returns, so this
			// ordering guarantees we never send on a closed channel.
			if err := fn(); err != nil {
				pool.errorChan <- err
			}
			pool.wg.Done()
		case <-pool.quit:
			return
		}
	}
}