From 397ff55435694e5c85d32bd4a540e5d03b350960 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 22 Oct 2025 13:40:12 +0200 Subject: [PATCH 01/12] Add puts statements to find hanging location --- spec/inputs/redis_spec.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index d070f13..93b464e 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -367,11 +367,15 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread + puts "starting run thread" rt = run_it_thread(subject) #simulate the other system thread + puts "starting publish thread" publish_thread(subject.send(:new_redis_instance), 'c').join + puts "joined publish thread" #simulate the pipeline thread close_thread(subject, rt).join + puts "joined close thread" expect(queue.size).to eq(2) end @@ -405,11 +409,15 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread + puts "starting run thread" rt = run_it_thread(subject) #simulate the other system thread + puts "starting publish thread" publish_thread(subject.send(:new_redis_instance), 'pc').join + puts "joined publish thread" #simulate the pipeline thread close_thread(subject, rt).join + puts "joined close thread" expect(queue.size).to eq(2) end From ff5cadce1e06234491a86f4aef18618195c78521 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 22 Oct 2025 17:15:32 +0200 Subject: [PATCH 02/12] Print out queue size --- spec/inputs/redis_spec.rb | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 93b464e..b0693a4 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -332,6 +332,7 @@ def publish_thread(new_redis, prefix) def close_thread(inst, rt) Thread.new(inst, rt) do |subj, runner| # block for the messages + puts "close_thread: queue.size #{queue.size}" e1 = queue.pop e2 = queue.pop # put em back for the tests @@ -367,15 +368,15 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - puts "starting run thread" + puts "starting run thread queue.size: #{queue.size}" rt = run_it_thread(subject) #simulate the other system thread - puts "starting publish thread" + puts "starting publish thread queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'c').join - puts "joined publish thread" + puts "joined publish thread queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt).join - puts "joined close thread" + puts "joined close thread queue.size: #{queue.size}" expect(queue.size).to eq(2) end @@ -409,15 +410,15 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do #simulate the input thread - puts "starting run thread" + puts "starting run thread, queue.size: #{queue.size}" rt = run_it_thread(subject) #simulate the other system thread - puts "starting publish thread" + puts "starting publish thread queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join - puts "joined publish thread" + puts "joined publish thread queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt).join - puts "joined close thread" + puts "joined close thread queue.size: #{queue.size}" expect(queue.size).to eq(2) end From eb695b2db1a847dcb39e39664bae7d9563612f3b Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 11:17:54 +0200 Subject: [PATCH 03/12] Use local queue objects --- spec/inputs/redis_spec.rb | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index b0693a4..5944495 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -69,7 +69,7 @@ def process(conf, event_count) end describe LogStash::Inputs::Redis do - let(:queue) { Queue.new } + #let(:queue) { Queue.new } let(:data_type) { 'list' } let(:batch_count) { 1 } @@ -141,6 +141,8 @@ def process(conf, event_count) context 'runtime for list data_type' do + let(:queue) { Queue.new } + before do subject.register allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true @@ -314,7 +316,7 @@ def process(conf, event_count) before { subject.register } - def run_it_thread(inst) + def run_it_thread(inst, queue) Thread.new(inst) do |subj| subj.run(queue) end @@ -329,7 +331,7 @@ def publish_thread(new_redis, prefix) end end - def close_thread(inst, rt) + def close_thread(inst, rt, queue) Thread.new(inst, rt) do |subj, runner| # block for the messages puts "close_thread: queue.size #{queue.size}" @@ -367,26 +369,28 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do + queue = Queue.new #simulate the input thread puts "starting run thread queue.size: #{queue.size}" - rt = run_it_thread(subject) + rt = run_it_thread(subject, queue) #simulate the other system thread puts "starting publish thread queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'c').join puts "joined publish thread queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join puts "joined close thread queue.size: #{queue.size}" expect(queue.size).to eq(2) end it 'events had redis_channel' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + rt = run_it_thread(subject, queue) #simulate the other system thread publish_thread(subject.send(:new_redis_instance), 'c').join #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') @@ -409,27 +413,29 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do + queue = Queue.new #simulate the input thread puts "starting run thread, queue.size: #{queue.size}" - rt = run_it_thread(subject) + rt = run_it_thread(subject, queue) #simulate the other system thread puts "starting publish thread queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join puts "joined publish thread queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join puts "joined close thread queue.size: #{queue.size}" expect(queue.size).to eq(2) end it 'events had redis_channel' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + rt = run_it_thread(subject, queue) #simulate the other system thread publish_thread(subject.send(:new_redis_instance), 'pc').join #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') From 4895f63c3dd99bc6f9ab0c5d22145b90722bd8db Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 11:29:18 +0200 Subject: [PATCH 04/12] More tests that need a local queue object --- spec/inputs/redis_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 5944495..5be7636 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -32,6 +32,8 @@ def process(conf, event_count) describe "inputs/redis", :redis => true do + let(:queue) { Queue.new } + it "should read events from a list" do key = SecureRandom.hex event_count = 1000 + rand(50) From 580c1dd0098e29853d56d5f9d30ef0295b313251 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 11:44:16 +0200 Subject: [PATCH 05/12] Revert "More tests that need a local queue object" This reverts commit 4895f63c3dd99bc6f9ab0c5d22145b90722bd8db. --- spec/inputs/redis_spec.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 5be7636..5944495 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -32,8 +32,6 @@ def process(conf, event_count) describe "inputs/redis", :redis => true do - let(:queue) { Queue.new } - it "should read events from a list" do key = SecureRandom.hex event_count = 1000 + rand(50) From bc74c6a88e0ef09e8bc0ac79e99ed7ff04d38dbe Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 12:35:43 +0200 Subject: [PATCH 06/12] More debug statements --- spec/inputs/redis_spec.rb | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 5944495..c5bd142 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -334,8 +334,10 @@ def publish_thread(new_redis, prefix) def close_thread(inst, rt, queue) Thread.new(inst, rt) do |subj, runner| # block for the messages - puts "close_thread: queue.size #{queue.size}" + puts "close_thread: queue: #{queue.object_id}, queue.size #{queue.size}" + puts "popping first event" e1 = queue.pop + puts "popping second event" e2 = queue.pop # put em back for the tests queue.push(e1) @@ -371,26 +373,30 @@ def close_thread(inst, rt, queue) it 'calling the run method, adds events to the queue' do queue = Queue.new #simulate the input thread - puts "starting run thread queue.size: #{queue.size}" + puts "channel real redis: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}" rt = run_it_thread(subject, queue) #simulate the other system thread - puts "starting publish thread queue.size: #{queue.size}" + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'c').join - puts "joined publish thread queue.size: #{queue.size}" + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt, queue).join - puts "joined close thread queue.size: #{queue.size}" + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" expect(queue.size).to eq(2) end it 'events had redis_channel' do queue = Queue.new #simulate the input thread + puts "channel: events had redis_channel: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}" rt = run_it_thread(subject, queue) + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the other system thread publish_thread(subject.send(:new_redis_instance), 'c').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') @@ -415,15 +421,15 @@ def close_thread(inst, rt, queue) it 'calling the run method, adds events to the queue' do queue = Queue.new #simulate the input thread - puts "starting run thread, queue.size: #{queue.size}" + puts "pattern_channel: real redis: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}" rt = run_it_thread(subject, queue) #simulate the other system thread - puts "starting publish thread queue.size: #{queue.size}" + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join - puts "joined publish thread queue.size: #{queue.size}" + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt, queue).join - puts "joined close thread queue.size: #{queue.size}" + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" expect(queue.size).to eq(2) end @@ -431,11 +437,15 @@ def close_thread(inst, rt, queue) it 'events had redis_channel' do queue = Queue.new #simulate the input thread + puts "pattern_channel: redis_channel: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}" rt = run_it_thread(subject, queue) #simulate the other system thread + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') From bc25e873f586bf7c7c63a0e71617462b3c64119e Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 12:59:30 +0200 Subject: [PATCH 07/12] More debug puts --- spec/inputs/redis_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index c5bd142..dec01c4 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -337,8 +337,10 @@ def close_thread(inst, rt, queue) puts "close_thread: queue: #{queue.object_id}, queue.size #{queue.size}" puts "popping first event" e1 = queue.pop + puts "got event #{e1.object_id}" puts "popping second event" e2 = queue.pop + puts "got event #{e2.object_id}" # put em back for the tests queue.push(e1) queue.push(e2) From b33daf6ec717ca3978ecbaa27dc5ecc4d0c5fb7e Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 13:39:55 +0200 Subject: [PATCH 08/12] Try with json object --- spec/inputs/redis_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index dec01c4..ac382f3 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -326,7 +326,7 @@ def publish_thread(new_redis, prefix) Thread.new(new_redis, prefix) do |r, p| sleep 0.1 2.times do |i| - r.publish('foo', "#{p}#{i.next}") + r.publish('foo', {"#{p}#" => "{i.next}"}.to_json) end end end From e225d47196b84e1f7f2b2c24c595c844290b7dce Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 13:52:23 +0200 Subject: [PATCH 09/12] More debugging --- lib/logstash/inputs/redis.rb | 7 +++++++ spec/inputs/redis_spec.rb | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index e8a2667..837b9fc 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -161,9 +161,12 @@ def queue_event(msg, output_queue, channel=nil) @codec.decode(msg) do |event| decorate(event) event.set("[@metadata][redis_channel]", channel) if !channel.nil? + puts "queueing event #{event.object_id} onto queue #{output_queue.object_id}" output_queue << event + puts "event #{event.object_id} successfully enqueued, queue: #{output_queue.object_id}" end rescue => e # parse or event creation error + puts "couldn't queue event onto queue #{queue.object_id}" @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace); end end @@ -314,7 +317,9 @@ def channel_listener(output_queue) end on.message do |channel, message| + puts "received message #{message}" queue_event(message, output_queue, channel) + puts "successfully queued message #{message}" end on.unsubscribe do |channel, count| @@ -337,7 +342,9 @@ def pattern_channel_listener(output_queue) end on.pmessage do |pattern, channel, message| + puts "received message #{message}" queue_event(message, output_queue, channel) + puts "successfully queued message #{message}" end on.punsubscribe do |channel, count| diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index ac382f3..57bd3f3 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -326,7 +326,7 @@ def publish_thread(new_redis, prefix) Thread.new(new_redis, prefix) do |r, p| sleep 0.1 2.times do |i| - r.publish('foo', {"#{p}#" => "{i.next}"}.to_json) + r.publish('foo', {"#{p}" => "#{i.next}"}.to_json) end end end From 59ba446c053fa45a31a28aa0fdcaa10c1bd6b814 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 14:14:59 +0200 Subject: [PATCH 10/12] More debugging --- lib/logstash/inputs/redis.rb | 4 ++-- spec/inputs/redis_spec.rb | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 837b9fc..451520e 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -161,9 +161,9 @@ def queue_event(msg, output_queue, channel=nil) @codec.decode(msg) do |event| decorate(event) event.set("[@metadata][redis_channel]", channel) if !channel.nil? - puts "queueing event #{event.object_id} onto queue #{output_queue.object_id}" + puts "enqueueing event #{event.to_json} onto queue #{output_queue.object_id}" output_queue << event - puts "event #{event.object_id} successfully enqueued, queue: #{output_queue.object_id}" + puts "event #{event.to_json} successfully enqueued, queue: #{output_queue.object_id}" end rescue => e # parse or event creation error puts "couldn't queue event onto queue #{queue.object_id}" diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 57bd3f3..0b2df75 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -326,6 +326,7 @@ def publish_thread(new_redis, prefix) Thread.new(new_redis, prefix) do |r, p| sleep 0.1 2.times do |i| + puts "i is #{i}" r.publish('foo', {"#{p}" => "#{i.next}"}.to_json) end end From 30c99fa396ceed5d612c2f7eb06916cfd06408be Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 14:22:15 +0200 Subject: [PATCH 11/12] debugging --- lib/logstash/inputs/redis.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index 451520e..d002c0c 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -161,9 +161,9 @@ def queue_event(msg, output_queue, channel=nil) @codec.decode(msg) do |event| decorate(event) event.set("[@metadata][redis_channel]", channel) if !channel.nil? - puts "enqueueing event #{event.to_json} onto queue #{output_queue.object_id}" + puts "enqueueing event #{event.to_json} onto queue #{output_queue.object_id}" if msg['c'] || msg['p'] output_queue << event - puts "event #{event.to_json} successfully enqueued, queue: #{output_queue.object_id}" + puts "event #{event.to_json} successfully enqueued, queue: #{output_queue.object_id}" if msg['c'] || msg['p'] end rescue => e # parse or event creation error puts "couldn't queue event onto queue #{queue.object_id}" From 0b1e2264af5b2966f8b59f37f1d9a8c3889ac3c2 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 23 Oct 2025 14:36:47 +0200 Subject: [PATCH 12/12] debugging --- lib/logstash/inputs/redis.rb | 4 ++-- spec/inputs/redis_spec.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index d002c0c..4c56007 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -319,7 +319,7 @@ def channel_listener(output_queue) on.message do |channel, message| puts "received message #{message}" queue_event(message, output_queue, channel) - puts "successfully queued message #{message}" + puts "successfully enqueued message #{message}" end on.unsubscribe do |channel, count| @@ -344,7 +344,7 @@ def pattern_channel_listener(output_queue) on.pmessage do |pattern, channel, message| puts "received message #{message}" queue_event(message, output_queue, channel) - puts "successfully queued message #{message}" + puts "successfully enqueued message #{message}" end on.punsubscribe do |channel, count| diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index 0b2df75..71c31e1 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -338,10 +338,10 @@ def close_thread(inst, rt, queue) puts "close_thread: queue: #{queue.object_id}, queue.size #{queue.size}" puts "popping first event" e1 = queue.pop - puts "got event #{e1.object_id}" + puts "got event #{e1.to_hash}" puts "popping second event" e2 = queue.pop - puts "got event #{e2.object_id}" + puts "got event #{e2.to_hash}" # put em back for the tests queue.push(e1) queue.push(e2)