Skip to content

Commit 8fb4174

Browse files
committed
Change TaskMetadata#timeout to Unix epoch integer
I thought timeout's meaning is changed in v0.9, now I can change its type.
1 parent fb0864f commit 8fb4174

File tree

7 files changed

+42
-31
lines changed

7 files changed

+42
-31
lines changed

lib/perfectqueue/task.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token)
7272
end
7373

7474
def heartbeat!(options={})
75-
@client.heartbeat(@task_token, options)
75+
self.timeout = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout))
7676
end
7777

7878
def finish!(options={})

lib/perfectqueue/task_metadata.rb

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,11 @@ def created_at
5050
end
5151

5252
def timeout
53-
if t = @attributes[:timeout]
54-
return Time.at(t)
55-
else
56-
return nil
57-
end
53+
@attributes[:timeout]
5854
end
5955

60-
def last_heartbeat
61-
@attributes[:timeout] || 0
56+
def timeout=(v)
57+
@attributes[:timeout] = v
6258
end
6359

6460
def finished?

lib/perfectqueue/task_monitor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def set_task(task, runner)
6060
@task = task
6161
}
6262
now = Time.now.to_i
63-
while @task && @task.last_heartbeat + @task_heartbeat_interval < now
63+
while @task && @task.timeout + @task_heartbeat_interval < now
6464
sleep 1
6565
end
6666
end
@@ -116,7 +116,7 @@ def run
116116
next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval
117117

118118
if @task
119-
next_task_heartbeat = @task.last_heartbeat + @task_heartbeat_interval
119+
next_task_heartbeat = @task.timeout + @task_heartbeat_interval
120120
next_time = [next_child_heartbeat, next_task_heartbeat].min
121121
else
122122
next_time = next_child_heartbeat
@@ -126,7 +126,7 @@ def run
126126
@cond.wait(next_wait) if next_wait > 0
127127

128128
now = Time.now.to_i
129-
if @task && @task.last_heartbeat + @task_heartbeat_interval <= now
129+
if @task && @task.timeout + @task_heartbeat_interval <= now
130130
task_heartbeat
131131
end
132132

@@ -145,7 +145,7 @@ def run
145145
private
146146
def task_heartbeat
147147
task = @task
148-
task.attributes[:timeout] = task.heartbeat!(last_heartbeat: task.last_heartbeat)
148+
task.heartbeat!
149149
rescue
150150
# finished, preempted, etc.
151151
kill_task($!)

spec/multiprocess/child_process_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
end
7474

7575
describe '#process' do
76-
let (:task){ double('task', key: double, last_heartbeat: Time.now.to_i) }
76+
let (:task){ double('task', key: double, timeout: Time.now.to_i) }
7777
before do
7878
expect(runner_insntace).to receive(:run)
7979
end

spec/rdb_compat_backend_spec.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,13 @@
242242
expect(ary[3]).to be_an_instance_of(AcquiredTask)
243243
expect(ary[4]).to be_an_instance_of(AcquiredTask)
244244

245-
now1 = Time.at(now + alive_time)
246-
expect(now1).to receive(:to_time).exactly(5).times.and_call_original
247-
db.list({}){|task| expect(task.timeout).to eq now1.to_time }
245+
now1 = now + alive_time
246+
i = 0
247+
db.list({}) do |task|
248+
expect(task.timeout).to eq now1
249+
i += 1
250+
end
251+
expect(i).to eq(5)
248252
end
249253
end
250254
end

spec/task_metadata_spec.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,19 @@
6060
end
6161

6262
describe 'timeout' do
63-
it 'returns a time of given timeout' do
63+
it 'returns given timeout' do
6464
epoch = 72
6565
tm = TaskMetadata.new(double, double, timeout: epoch)
66-
expect(tm.timeout).to eq(Time.at(epoch))
66+
expect(tm.timeout).to eq(epoch)
67+
end
68+
end
69+
70+
describe 'timeout=' do
71+
it 'sets timeout' do
72+
epoch = 72
73+
tm = TaskMetadata.new(double, double, timeout: 1)
74+
tm.timeout = epoch
75+
expect(tm.timeout).to eq(epoch)
6776
end
6877
end
6978
end

spec/task_monitor_spec.rb

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,17 @@
4343
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) }
4444
let (:err){ StandardError.new('heartbeat preempted') }
4545
let (:now){ Time.now.to_i }
46-
let (:task){ double('task', attributes: {}, last_heartbeat: now) }
46+
let (:task){ double('task', timeout: now) }
4747
let (:runner){ double('runner') }
48-
before do
49-
tm.set_task(task, double('runner'))
50-
end
51-
it 'calls kill_task($!) on heartbeat error' do
52-
allow(task).to receive(:heartbeat!){ raise err }
53-
expect(tm).to receive(:kill_task).with(err).exactly(:once)
54-
tm.__send__(:task_heartbeat)
48+
context 'timeout' do
49+
before do
50+
tm.set_task(task, double('runner'))
51+
end
52+
it 'calls kill_task($!) on heartbeat error' do
53+
allow(task).to receive(:heartbeat!){ raise err }
54+
expect(tm).to receive(:kill_task).with(err).exactly(:once)
55+
tm.__send__(:task_heartbeat)
56+
end
5557
end
5658
context 'normal' do
5759
before do
@@ -66,10 +68,10 @@
6668
it 'update timeout' do
6769
tasks = client.acquire(now: now-80)
6870
task = tasks[0]
69-
expect(task.last_heartbeat).to eq(now-80+config[:alive_time])
71+
expect(task.timeout).to eq(now-80+config[:alive_time])
7072
allow(Time).to receive(:now).and_return(now-50)
7173
tm.set_task(task, runner)
72-
expect(task.last_heartbeat).to eq(now-50+config[:alive_time])
74+
expect(task.timeout).to eq(now-50+config[:alive_time])
7375
end
7476
end
7577
context 'stolen' do
@@ -85,11 +87,11 @@
8587
it 'raise error' do
8688
tasks = client.acquire(now: now-80)
8789
task1 = tasks[0]
88-
expect(task1.timeout.to_i).to eq(now-80+config[:alive_time])
90+
expect(task1.timeout).to eq(now-80+config[:alive_time])
8991

9092
tasks = client.acquire(now: now-60)
9193
task2 = tasks[0]
92-
expect(task2.timeout.to_i).to eq(now-60+config[:alive_time])
94+
expect(task2.timeout).to eq(now-60+config[:alive_time])
9395

9496
allow(Time).to receive(:now).and_return(now-50)
9597
expect(runner).to receive(:kill)
@@ -109,7 +111,7 @@
109111
it 'raise error' do
110112
tasks = client.acquire(now: now-80)
111113
task1 = tasks[0]
112-
expect(task1.timeout.to_i).to eq(now-80+config[:alive_time])
114+
expect(task1.timeout).to eq(now-80+config[:alive_time])
113115

114116
allow(Time).to receive(:now).and_return(now-50)
115117
tm.set_task(task1, runner)

0 commit comments

Comments
 (0)