Skip to content

Commit 9a3be9e

Browse files
Throttle calls to a custom configuration loader
Many custom configuration loaders will retrieve the configuration from an external resource. `Resque::Pool` asks the loader for the latest configuration roughly once per second. You may want to reduce load on your external resource by caching the value, and only really fetching the latest configuration after a specific amount of time has passed. Instead of forcing each configuration loader author to re-write (and test) this logic, we provide `Resque::Pool::ConfigThrottle`. See the spec for full details of its behavior.
1 parent 8ad098e commit 9a3be9e

File tree

3 files changed

+174
-0
lines changed

3 files changed

+174
-0
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,24 @@ returns. It can optionally implement a `#reset!` method, which will be invoked
180180
when the HUP signal is received, allowing the loader to flush its cache, or
181181
perform any other re-initialization.
182182

183+
You can reduce the frequency that your configuration loader is called by
184+
wrapping it with `Resque::Pool::ConfigThrottle` and specifying a time (in seconds)
185+
to cache the previous value (see [the spec](spec/config_throttle_spec.rb) for
186+
more details):
187+
188+
```ruby
189+
task "resque:pool:setup" do
190+
redis_loader = lambda do |env|
191+
worker_count = Redis.current.get("pool_workers_#{env}").to_i
192+
{"queueA,queueB" => worker_count }
193+
end
194+
195+
# calls through to redis_loader at most once per 10 seconds
196+
Resque::Pool.config_loader = Resque::Pool::ConfigThrottle(10, redis_loader)
197+
end
198+
```
199+
200+
183201
Zero-downtime code deploys
184202
--------------------------
185203

lib/resque/pool/config_throttle.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
module Resque
2+
class Pool
3+
# Throttle the frequency of loading pool configuration
4+
class ConfigThrottle
5+
def initialize(period, config_loader, time_source: Time)
6+
@period = period
7+
@config_loader = config_loader
8+
@resettable = config_loader.respond_to?(:reset!)
9+
@last_check = 0
10+
@time_source = time_source
11+
end
12+
13+
def call(env)
14+
# We do not need to cache per `env`, since the value of `env` will not
15+
# change during the life of the process.
16+
if (now > @last_check + @period)
17+
@cache = @config_loader.call(env)
18+
@last_check = now
19+
end
20+
@cache
21+
end
22+
23+
def reset!
24+
@last_check = 0
25+
if @resettable
26+
@config_loader.reset!
27+
end
28+
end
29+
30+
def now
31+
@time_source.now.to_f
32+
end
33+
end
34+
end
35+
end

spec/config_throttle_spec.rb

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
require 'spec_helper'
2+
require 'resque/pool/config_throttle'
3+
4+
describe Resque::Pool::ConfigThrottle do
5+
let(:fake_time) { FakeTime.new 1445898807 }
6+
7+
it "returns the config returned by the wrapped config loader for given env" do
8+
wrapped_config = {
9+
"dev" => {"qA,qB" => 1},
10+
"prd" => {"qA,qB" => 4}
11+
}
12+
wrapped_loader = lambda {|env| wrapped_config[env] }
13+
throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader)
14+
15+
throttle.call("prd").should eq({"qA,qB" => 4})
16+
end
17+
18+
it "does not call wrapped loader again until the specified period of time has elapsed" do
19+
wrapped_loader = TestConfigLoader.new
20+
wrapped_loader.configuration = {"qA,qB" => 1}
21+
22+
throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time)
23+
first_call = throttle.call("prd")
24+
25+
new_config = {"qA,qB" => 22}
26+
wrapped_loader.configuration = new_config
27+
fake_time.advance_time 6
28+
# config changed, but not enough time elapsed
29+
30+
second_call = throttle.call("prd")
31+
32+
second_call.should eq(first_call)
33+
wrapped_loader.times_called.should == 1
34+
35+
fake_time.advance_time 6
36+
# now, enough time has elapsed to retrieve latest config
37+
38+
third_call = throttle.call("prd")
39+
40+
third_call.should_not eq(first_call)
41+
third_call.should eq(new_config)
42+
wrapped_loader.times_called.should == 2
43+
44+
# further calls continue to use cached value
45+
throttle.call("prd")
46+
throttle.call("prd")
47+
throttle.call("prd")
48+
wrapped_loader.times_called.should == 2
49+
end
50+
51+
it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do
52+
wrapped_loader = TestConfigLoader.new
53+
wrapped_loader.configuration = {"qA,qB" => 1}
54+
55+
throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time)
56+
first_call = throttle.call("prd")
57+
58+
new_config = {"qA,qB" => 22}
59+
wrapped_loader.configuration = new_config
60+
fake_time.advance_time 6
61+
# the 10 second period has *not* elapsed
62+
63+
throttle.reset!
64+
65+
second_call = throttle.call("prd")
66+
67+
second_call.should eq(new_config)
68+
wrapped_loader.times_called.should == 2
69+
end
70+
71+
it "delegates reset! to the wrapped_loader, when supported" do
72+
wrapped_loader = TestConfigLoader.new
73+
throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader)
74+
75+
wrapped_loader.times_reset.should == 0
76+
throttle.reset!
77+
wrapped_loader.times_reset.should == 1
78+
end
79+
80+
it "does not delegate reset! to the wrapped_loader when it is not supported" do
81+
wrapped_loader = lambda {|env| Hash.new }
82+
throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader)
83+
84+
expect {
85+
throttle.reset!
86+
}.to_not raise_error
87+
end
88+
89+
90+
class TestConfigLoader
91+
attr_accessor :configuration
92+
attr_reader :times_called
93+
attr_reader :times_reset
94+
95+
def initialize
96+
@times_called = 0
97+
@times_reset = 0
98+
end
99+
100+
def call(env)
101+
@times_called += 1
102+
configuration
103+
end
104+
105+
def reset!
106+
@times_reset += 1
107+
end
108+
end
109+
110+
class FakeTime
111+
attr_reader :now
112+
113+
def initialize(start_time)
114+
@now = start_time
115+
end
116+
117+
def advance_time(seconds)
118+
@now += seconds
119+
end
120+
end
121+
end

0 commit comments

Comments
 (0)