Skip to content

Commit 8200f4c

Browse files
authored
Add thread based delayed workers (#3887)
Co-authored-by: Johannes Haass <johannes.haass@sap.com>
1 parent 39939a5 commit 8200f4c

File tree

7 files changed

+296
-13
lines changed

7 files changed

+296
-13
lines changed

lib/cloud_controller/config_schemas/base/api_schema.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,8 @@ class ApiSchema < VCAP::Config
345345
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
346346
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
347347
optional(:diego_sync) => { timeout_in_seconds: Integer },
348-
optional(:priorities) => Hash
348+
optional(:priorities) => Hash,
349+
optional(:number_of_worker_threads) => Integer
349350
},
350351

351352
# perm settings no longer have any effect but are preserved here

lib/cloud_controller/config_schemas/base/worker_schema.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ class WorkerSchema < VCAP::Config
171171
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
172172
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
173173
optional(:diego_sync) => { timeout_in_seconds: Integer },
174-
optional(:priorities) => Hash
174+
optional(:priorities) => Hash,
175+
optional(:number_of_worker_threads) => Integer
175176
},
176177

177178
volume_services_enabled: bool,

lib/delayed_job/delayed_worker.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'delayed_job/threaded_worker'
2+
13
class CloudController::DelayedWorker
24
def initialize(options)
35
@queue_options = {
@@ -21,7 +23,8 @@ def start_working
2123
Delayed::Worker.max_attempts = 3
2224
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
2325
Delayed::Worker.logger = logger
24-
worker = Delayed::Worker.new(@queue_options)
26+
num_worker_threads = config.get(:jobs, :number_of_worker_threads)
27+
worker = num_worker_threads.nil? ? Delayed::Worker.new(@queue_options) : ThreadedWorker.new(num_worker_threads, @queue_options)
2528
worker.name = @queue_options[:worker_name]
2629
worker.start
2730
end

lib/delayed_job/threaded_worker.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
class ThreadedWorker < Delayed::Worker
2+
def initialize(num_threads, options={}, grace_period_seconds=30)
3+
super(options)
4+
@num_threads = num_threads
5+
@threads = []
6+
@unexpected_error = false
7+
@mutex = Mutex.new
8+
@grace_period_seconds = grace_period_seconds
9+
end
10+
11+
def start
12+
# add quit trap as in QuitTrap monkey patch
13+
trap('QUIT') do
14+
Thread.new { say 'Exiting...' }
15+
stop
16+
end
17+
18+
trap('TERM') do
19+
Thread.new { say 'Exiting...' }
20+
stop
21+
raise SignalException.new('TERM') if self.class.raise_signal_exceptions
22+
end
23+
24+
trap('INT') do
25+
Thread.new { say 'Exiting...' }
26+
stop
27+
raise SignalException.new('INT') if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term
28+
end
29+
30+
say "Starting threaded delayed worker with #{@num_threads} threads"
31+
32+
@num_threads.times do |thread_index|
33+
thread = Thread.new do
34+
Thread.current[:thread_name] = "thread:#{thread_index + 1}"
35+
threaded_start
36+
rescue Exception => e # rubocop:disable Lint/RescueException
37+
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
38+
@mutex.synchronize { @unexpected_error = true }
39+
stop
40+
end
41+
@mutex.synchronize do
42+
@threads << thread
43+
end
44+
end
45+
46+
@threads.each(&:join)
47+
ensure
48+
raise 'Unexpected error occurred in one of the worker threads' if @unexpected_error
49+
end
50+
51+
def name
52+
base_name = super
53+
thread_name = Thread.current[:thread_name]
54+
thread_name.nil? ? base_name : "#{base_name} #{thread_name}"
55+
end
56+
57+
def stop
58+
Thread.new do
59+
say 'Shutting down worker threads gracefully...'
60+
super
61+
62+
@threads.each do |t|
63+
Thread.new do
64+
t.join(@grace_period_seconds)
65+
if t.alive?
66+
say "Killing thread '#{t[:thread_name]}'"
67+
t.kill
68+
end
69+
end
70+
end.each(&:join) # Ensure all join threads complete
71+
end
72+
end
73+
74+
def threaded_start
75+
self.class.lifecycle.run_callbacks(:execute, self) do
76+
loop do
77+
self.class.lifecycle.run_callbacks(:loop, self) do
78+
@realtime = Benchmark.realtime do
79+
@result = work_off
80+
end
81+
end
82+
83+
count = @result[0] + @result[1]
84+
85+
if count.zero?
86+
if self.class.exit_on_complete
87+
say 'No more jobs available. Exiting'
88+
break
89+
elsif !stop?
90+
sleep(self.class.sleep_delay)
91+
reload!
92+
end
93+
else
94+
say sprintf("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last)
95+
end
96+
break if stop?
97+
end
98+
end
99+
end
100+
end
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
RSpec.describe 'delayed_job' do
22
describe 'version' do
33
it 'is not updated' do
4-
expect(Gem.loaded_specs['delayed_job'].version).to eq('4.1.11'), 'revisit monkey patch in lib/delayed_job/quit_trap.rb'
4+
expect(Gem.loaded_specs['delayed_job'].version).to eq('4.1.11'),
5+
'revisit monkey patch in lib/delayed_job/quit_trap.rb + review the changes related to lib/delayed_job/threaded_worker.rb'
56
end
67
end
78
end

spec/unit/lib/delayed_job/delayed_worker_spec.rb

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
RSpec.describe CloudController::DelayedWorker do
66
let(:options) { { queues: 'default', name: 'test_worker' } }
77
let(:environment) { instance_double(BackgroundJobEnvironment, setup_environment: nil) }
8-
let(:worker) { instance_double(Delayed::Worker, start: nil) }
8+
let(:delayed_worker) { instance_double(Delayed::Worker, start: nil) }
9+
let(:threaded_worker) { instance_double(ThreadedWorker, start: nil) }
910

1011
before do
1112
allow(RakeConfig).to receive(:config).and_return(TestConfig.config_instance)
1213
allow(BackgroundJobEnvironment).to receive(:new).with(anything).and_return(environment)
13-
allow(Delayed::Worker).to receive(:new).and_return(worker)
14-
allow(worker).to receive(:name=).with(anything)
14+
allow(Delayed::Worker).to receive(:new).and_return(delayed_worker)
15+
allow(delayed_worker).to receive(:name=).with(anything)
16+
allow(ThreadedWorker).to receive(:new).and_return(threaded_worker)
17+
allow(threaded_worker).to receive(:name=).with(anything)
1518
end
1619

1720
describe '#initialize' do
@@ -28,22 +31,36 @@
2831
end
2932

3033
describe '#start_working' do
31-
let(:worker_instance) { CloudController::DelayedWorker.new(options) }
34+
let(:cc_delayed_worker) { CloudController::DelayedWorker.new(options) }
3235

3336
it 'sets up the environment and starts the worker' do
34-
expect(environment).to receive(:setup_environment).with(anything)
35-
expect(worker).to receive(:name=).with('test_worker')
36-
expect(worker).to receive(:start)
37+
expect(environment).to receive(:setup_environment).with(nil)
38+
expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker)
39+
expect(delayed_worker).to receive(:name=).with('test_worker')
40+
expect(delayed_worker).to receive(:start)
3741

38-
worker_instance.start_working
42+
cc_delayed_worker.start_working
3943
end
4044

4145
it 'configures Delayed::Worker settings' do
42-
worker_instance.start_working
46+
cc_delayed_worker.start_working
4347

4448
expect(Delayed::Worker.destroy_failed_jobs).to be false
4549
expect(Delayed::Worker.max_attempts).to eq(3)
4650
expect(Delayed::Worker.max_run_time).to eq(14_401)
4751
end
52+
53+
context 'when the number of threads is specified' do
54+
before { TestConfig.config[:jobs].merge!(number_of_worker_threads: 7) }
55+
56+
it 'creates a ThreadedWorker with the specified number of threads' do
57+
expect(environment).to receive(:setup_environment).with(nil)
58+
expect(ThreadedWorker).to receive(:new).with(7, anything).and_return(threaded_worker)
59+
expect(threaded_worker).to receive(:name=).with('test_worker')
60+
expect(threaded_worker).to receive(:start)
61+
62+
cc_delayed_worker.start_working
63+
end
64+
end
4865
end
4966
end
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
require 'spec_helper'
2+
require 'delayed_job'
3+
require 'delayed_job/threaded_worker'
4+
5+
RSpec.describe ThreadedWorker do
6+
let(:num_threads) { 2 }
7+
let(:grace_period_seconds) { 2 }
8+
let(:worker) { ThreadedWorker.new(num_threads, {}, grace_period_seconds) }
9+
let(:worker_name) { 'instance_name' }
10+
11+
before { worker.name = worker_name }
12+
13+
describe '#initialize' do
14+
it 'sets up the thread count' do
15+
expect(worker.instance_variable_get(:@num_threads)).to eq(num_threads)
16+
end
17+
18+
it 'sets up the grace period' do
19+
expect(worker.instance_variable_get(:@grace_period_seconds)).to eq(grace_period_seconds)
20+
end
21+
22+
it 'sets up the grace period to 30 seconds by default' do
23+
worker = ThreadedWorker.new(num_threads)
24+
expect(worker.instance_variable_get(:@grace_period_seconds)).to eq(30)
25+
end
26+
end
27+
28+
describe '#start' do
29+
before do
30+
allow(worker).to receive(:threaded_start)
31+
end
32+
33+
it 'sets up signal traps for all signals' do
34+
expect(worker).to receive(:trap).with('TERM')
35+
expect(worker).to receive(:trap).with('INT')
36+
expect(worker).to receive(:trap).with('QUIT')
37+
worker.start
38+
end
39+
40+
it 'starts the specified number of threads' do
41+
expect(worker).to receive(:threaded_start).exactly(num_threads).times
42+
43+
expect(worker.instance_variable_get(:@threads).length).to eq(0)
44+
worker.start
45+
expect(worker.instance_variable_get(:@threads).length).to eq(num_threads)
46+
end
47+
48+
it 'logs the start and shutdown messages' do
49+
expect(worker).to receive(:say).with("Starting threaded delayed worker with #{num_threads} threads")
50+
worker.start
51+
end
52+
53+
it 'sets the thread_name variable for each thread' do
54+
worker.start
55+
worker.instance_variable_get(:@threads).each_with_index do |thread, index|
56+
expect(thread[:thread_name]).to eq("thread:#{index + 1}")
57+
end
58+
end
59+
60+
it 'logs the error and stops the worker when an unexpected error occurs' do
61+
allow(worker).to receive(:threaded_start).and_raise(StandardError.new('test error'))
62+
allow(worker).to receive(:stop)
63+
expect { worker.start }.to raise_error('Unexpected error occurred in one of the worker threads')
64+
expect(worker.instance_variable_get(:@unexpected_error)).to be true
65+
end
66+
end
67+
68+
describe '#name' do
69+
it 'returns the instance name if thread name is set' do
70+
allow(Thread.current).to receive(:[]).with(:thread_name).and_return('some-thread-name')
71+
expect(worker.name).to eq('instance_name some-thread-name')
72+
end
73+
74+
it 'returns the instance name if thread name is not set' do
75+
allow(Thread.current).to receive(:[]).with(:thread_name).and_return(nil)
76+
expect(worker.name).to eq(worker_name)
77+
end
78+
end
79+
80+
describe '#stop' do
81+
it 'logs the shutdown message' do
82+
queue = Queue.new
83+
allow(worker).to(receive(:say)) { |message| queue.push(message) }
84+
85+
worker.stop
86+
expect(queue.pop).to eq('Shutting down worker threads gracefully...')
87+
end
88+
89+
it 'sets the exit flag in the parent worker' do
90+
worker.stop
91+
sleep 0.1 until worker.instance_variable_defined?(:@exit)
92+
expect(worker.instance_variable_get(:@exit)).to be true
93+
end
94+
95+
it 'allows threads to finish their work without being killed prematurely' do
96+
allow(worker).to receive(:threaded_start) do
97+
sleep grace_period_seconds / 2 until worker.instance_variable_get(:@exit) == true
98+
end
99+
100+
worker_thread = Thread.new { worker.start }
101+
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
102+
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }
103+
104+
Thread.new { worker.stop }.join
105+
worker_thread.join
106+
worker.instance_variable_get(:@threads).each { |t| expect(t).not_to have_received(:kill) }
107+
end
108+
109+
it 'kills threads that exceed the grace period during shutdown' do
110+
allow(worker).to receive(:threaded_start) do
111+
sleep grace_period_seconds * 2 until worker.instance_variable_get(:@exit) == true
112+
end
113+
114+
worker_thread = Thread.new { worker.start }
115+
sleep 0.1 until worker.instance_variable_get(:@threads).length == num_threads && worker.instance_variable_get(:@threads).all?(&:alive?)
116+
worker.instance_variable_get(:@threads).each { |t| allow(t).to receive(:kill).and_call_original }
117+
118+
Thread.new { worker.stop }.join
119+
worker_thread.join
120+
expect(worker.instance_variable_get(:@threads)).to all(have_received(:kill))
121+
end
122+
end
123+
124+
describe '#threaded_start' do
125+
before do
126+
allow(worker).to receive(:work_off).and_return([5, 2])
127+
allow(worker).to receive(:sleep)
128+
allow(worker).to receive(:stop?).and_return(false, true)
129+
allow(worker).to receive(:reload!).and_call_original
130+
end
131+
132+
it 'runs the work_off loop twice' do
133+
worker.threaded_start
134+
expect(worker).to have_received(:work_off).twice
135+
end
136+
137+
it 'logs the number of jobs processed' do
138+
expect(worker).to receive(:say).with(%r{7 jobs processed at \d+\.\d+ j/s, 2 failed}).twice
139+
worker.threaded_start
140+
end
141+
142+
it 'reloads the worker if stop is not set' do
143+
allow(worker).to receive(:work_off).and_return([0, 0])
144+
worker.threaded_start
145+
expect(worker).to have_received(:reload!).once
146+
end
147+
148+
context 'when exit_on_complete is set' do
149+
before do
150+
allow(worker.class).to receive(:exit_on_complete).and_return(true)
151+
allow(worker).to receive(:work_off).and_return([0, 0])
152+
end
153+
154+
it 'exits the worker when no more jobs are available' do
155+
expect(worker).to receive(:say).with('No more jobs available. Exiting')
156+
worker.threaded_start
157+
end
158+
end
159+
end
160+
end

0 commit comments

Comments
 (0)