Skip to content

Commit f2e548f

Browse files
authored
Merge pull request #143 from nevans/config_loaders_namespace
Config loaders namespace
2 parents bf7a283 + 7dd5918 commit f2e548f

File tree

7 files changed

+279
-62
lines changed

7 files changed

+279
-62
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,26 @@ returns. It can optionally implement a `#reset!` method, which will be invoked
191191
when the HUP signal is received, allowing the loader to flush its cache, or
192192
perform any other re-initialization.
193193

194+
You can reduce the frequency that your configuration loader is called by
195+
wrapping it with `Resque::Pool::ConfigLoaders::Throttled` and specifying a time
196+
(in seconds) to cache the previous value:
197+
198+
```ruby
199+
task "resque:pool:setup" do
200+
redis_loader = lambda do |env|
201+
worker_count = Redis.current.get("pool_workers_#{env}").to_i
202+
{ "queueA,queueB" => worker_count }
203+
end
204+
205+
# calls through to redis_loader at most once per 10 seconds
206+
Resque::Pool.config_loader = Resque::Pool::ConfigLoaders::Throttled.new(
207+
redis_loader, period: 10
208+
)
209+
end
210+
```
211+
212+
See [the spec](spec/config_loaders/throttled_spec.rb) for more details.
213+
194214
Zero-downtime code deploys
195215
--------------------------
196216

lib/resque/pool.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
require 'resque/pool/version'
55
require 'resque/pool/logging'
66
require 'resque/pool/pooled_worker'
7-
require 'resque/pool/file_or_hash_loader'
7+
require 'resque/pool/config_loaders/file_or_hash_loader'
88
require 'erb'
99
require 'fcntl'
1010
require 'yaml'
@@ -119,7 +119,7 @@ def self.create_configured
119119
def init_config(loader)
120120
case loader
121121
when String, Hash, nil
122-
@config_loader = FileOrHashLoader.new(loader)
122+
@config_loader = ConfigLoaders::FileOrHashLoader.new(loader)
123123
else
124124
@config_loader = loader
125125
end
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
module Resque
2+
class Pool
3+
module ConfigLoaders
4+
5+
class FileOrHashLoader
6+
def initialize(filename_or_hash=nil)
7+
case filename_or_hash
8+
when String, nil
9+
@filename = filename_or_hash
10+
when Hash
11+
@static_config = filename_or_hash.dup
12+
else
13+
raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}"
14+
end
15+
end
16+
17+
def call(environment)
18+
@config ||= load_config_from_file(environment)
19+
end
20+
21+
def reset!
22+
@config = nil
23+
end
24+
25+
private
26+
27+
def load_config_from_file(environment)
28+
if @static_config
29+
new_config = @static_config
30+
else
31+
filename = config_filename
32+
new_config = load_config filename
33+
end
34+
apply_environment new_config, environment
35+
end
36+
37+
def apply_environment(config, environment)
38+
environment and config[environment] and config.merge!(config[environment])
39+
config.delete_if {|key, value| value.is_a? Hash }
40+
end
41+
42+
def config_filename
43+
@filename || choose_config_file
44+
end
45+
46+
def load_config(filename)
47+
return {} unless filename
48+
YAML.load(ERB.new(IO.read(filename)).result)
49+
end
50+
51+
CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"]
52+
def choose_config_file
53+
if ENV["RESQUE_POOL_CONFIG"]
54+
ENV["RESQUE_POOL_CONFIG"]
55+
else
56+
CONFIG_FILES.detect { |f| File.exist?(f) }
57+
end
58+
end
59+
end
60+
61+
end
62+
end
63+
end
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
require "delegate"
2+
3+
module Resque
4+
class Pool
5+
6+
module ConfigLoaders
7+
8+
# Throttle the frequency of loading pool configuration
9+
# Defaults to call only once per 10 seconds.
10+
class Throttled < SimpleDelegator
11+
12+
def initialize(config_loader, period: 10, time_source: Time)
13+
super(config_loader)
14+
@period = period
15+
@resettable = config_loader.respond_to?(:reset!)
16+
@last_check = 0
17+
@time_source = time_source
18+
end
19+
20+
def call(env)
21+
# We do not need to cache per `env`, since the value of `env` will not
22+
# change during the life of the process.
23+
if (now > @last_check + @period)
24+
@cache = super
25+
@last_check = now
26+
end
27+
@cache
28+
end
29+
30+
def reset!
31+
@last_check = 0
32+
super if @resettable
33+
end
34+
35+
private
36+
37+
def now
38+
@time_source.now.to_f
39+
end
40+
end
41+
42+
end
43+
end
44+
end

lib/resque/pool/file_or_hash_loader.rb

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
require 'spec_helper'
2+
require 'resque/pool/config_loaders/throttled'
3+
4+
module Resque::Pool::ConfigLoaders
5+
6+
describe Throttled do
7+
let(:fake_time) { FakeTime.new 1445898807 }
8+
9+
it "returns the config returned by the wrapped config loader for given env" do
10+
wrapped_config = {
11+
"dev" => {"qA,qB" => 1},
12+
"prd" => {"qA,qB" => 4}
13+
}
14+
wrapped_loader = lambda {|env| wrapped_config[env] }
15+
throttle = Throttled.new(wrapped_loader)
16+
17+
throttle.call("prd").should eq({"qA,qB" => 4})
18+
end
19+
20+
it "does not call wrapped loader again until the default period of time has elapsed" do
21+
wrapped_loader = TestConfigLoader.new
22+
wrapped_loader.configuration = {"qA,qB" => 1}
23+
24+
throttle = Throttled.new(wrapped_loader, time_source: fake_time)
25+
first_call = throttle.call("prd")
26+
27+
new_config = {"qA,qB" => 22}
28+
wrapped_loader.configuration = new_config
29+
fake_time.advance_time 6
30+
# config changed, but not enough time elapsed
31+
32+
second_call = throttle.call("prd")
33+
34+
second_call.should eq(first_call)
35+
wrapped_loader.times_called.should == 1
36+
37+
fake_time.advance_time 6
38+
# now, enough time has elapsed to retrieve latest config
39+
40+
third_call = throttle.call("prd")
41+
42+
third_call.should_not eq(first_call)
43+
third_call.should eq(new_config)
44+
wrapped_loader.times_called.should == 2
45+
46+
# further calls continue to use cached value
47+
throttle.call("prd")
48+
throttle.call("prd")
49+
throttle.call("prd")
50+
wrapped_loader.times_called.should == 2
51+
end
52+
53+
it "can specify an alternate cache period" do
54+
config0 = {foo: 2, bar: 1}
55+
config1 = {bar: 3, baz: 9}
56+
config2 = {foo: 4, quux: 1}
57+
wrapped_loader = TestConfigLoader.new
58+
wrapped_loader.configuration = config0
59+
throttle = Throttled.new(
60+
wrapped_loader, period: 60, time_source: fake_time
61+
)
62+
throttle.call("prd").should eq(config0)
63+
wrapped_loader.configuration = config1
64+
fake_time.advance_time 59
65+
throttle.call("prd").should eq(config0)
66+
fake_time.advance_time 5
67+
throttle.call("prd").should eq(config1)
68+
wrapped_loader.configuration = config2
69+
fake_time.advance_time 59
70+
throttle.call("prd").should eq(config1)
71+
fake_time.advance_time 2
72+
throttle.call("prd").should eq(config2)
73+
end
74+
75+
it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do
76+
wrapped_loader = TestConfigLoader.new
77+
wrapped_loader.configuration = {"qA,qB" => 1}
78+
79+
throttle = Throttled.new(wrapped_loader, time_source: fake_time)
80+
throttle.call("prd")
81+
82+
new_config = {"qA,qB" => 22}
83+
wrapped_loader.configuration = new_config
84+
fake_time.advance_time 6
85+
# the 10 second period has *not* elapsed
86+
87+
throttle.reset!
88+
89+
second_call = throttle.call("prd")
90+
91+
second_call.should eq(new_config)
92+
wrapped_loader.times_called.should == 2
93+
end
94+
95+
it "delegates reset! to the wrapped_loader, when supported" do
96+
wrapped_loader = TestConfigLoader.new
97+
throttle = Throttled.new(wrapped_loader)
98+
99+
wrapped_loader.times_reset.should == 0
100+
throttle.reset!
101+
wrapped_loader.times_reset.should == 1
102+
end
103+
104+
it "does not delegate reset! to the wrapped_loader when it is not supported" do
105+
wrapped_loader = lambda {|env| Hash.new }
106+
throttle = Throttled.new(wrapped_loader)
107+
108+
expect {
109+
throttle.reset!
110+
}.to_not raise_error
111+
end
112+
113+
class TestConfigLoader
114+
attr_accessor :configuration
115+
attr_reader :times_called
116+
attr_reader :times_reset
117+
118+
def initialize
119+
@times_called = 0
120+
@times_reset = 0
121+
end
122+
123+
def call(env)
124+
@times_called += 1
125+
configuration
126+
end
127+
128+
def reset!
129+
@times_reset += 1
130+
end
131+
end
132+
133+
class FakeTime
134+
attr_reader :now
135+
136+
def initialize(start_time)
137+
@now = start_time
138+
end
139+
140+
def advance_time(seconds)
141+
@now += seconds
142+
end
143+
end
144+
145+
end
146+
147+
end

spec/resque_pool_spec.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ module Rails; end
248248
subject { Resque::Pool.create_configured }
249249

250250
it "created pools use config file and hash loading logic" do
251-
subject.config_loader.should be_instance_of Resque::Pool::FileOrHashLoader
251+
subject.config_loader.should be_instance_of(
252+
Resque::Pool::ConfigLoaders::FileOrHashLoader
253+
)
252254
end
253255
end
254256

0 commit comments

Comments
 (0)