diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index e8a2667..4c56007 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -161,9 +161,12 @@ def queue_event(msg, output_queue, channel=nil) @codec.decode(msg) do |event| decorate(event) event.set("[@metadata][redis_channel]", channel) if !channel.nil? + puts "enqueueing event #{event.to_json} onto queue #{output_queue.object_id}" if msg['c'] || msg['p'] output_queue << event + puts "event #{event.to_json} successfully enqueued, queue: #{output_queue.object_id}" if msg['c'] || msg['p'] end rescue => e # parse or event creation error + puts "couldn't queue event onto queue #{queue.object_id}" @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace); end end @@ -314,7 +317,9 @@ def channel_listener(output_queue) end on.message do |channel, message| + puts "received message #{message}" queue_event(message, output_queue, channel) + puts "successfully enqueued message #{message}" end on.unsubscribe do |channel, count| @@ -337,7 +342,9 @@ def pattern_channel_listener(output_queue) end on.pmessage do |pattern, channel, message| + puts "received message #{message}" queue_event(message, output_queue, channel) + puts "successfully enqueued message #{message}" end on.punsubscribe do |channel, count| diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index d070f13..71c31e1 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -69,7 +69,7 @@ def process(conf, event_count) end describe LogStash::Inputs::Redis do - let(:queue) { Queue.new } + #let(:queue) { Queue.new } let(:data_type) { 'list' } let(:batch_count) { 1 } @@ -141,6 +141,8 @@ def process(conf, event_count) context 'runtime for list data_type' do + let(:queue) { Queue.new } + before do subject.register allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true @@ -314,7 +316,7 @@ def process(conf, event_count) before { subject.register } - def run_it_thread(inst) + def run_it_thread(inst, queue) Thread.new(inst) do |subj| subj.run(queue) end @@ -324,16 +326,22 @@ def publish_thread(new_redis, prefix) Thread.new(new_redis, prefix) do |r, p| sleep 0.1 2.times do |i| - r.publish('foo', "#{p}#{i.next}") + puts "i is #{i}" + r.publish('foo', {"#{p}" => "#{i.next}"}.to_json) end end end - def close_thread(inst, rt) + def close_thread(inst, rt, queue) Thread.new(inst, rt) do |subj, runner| # block for the messages + puts "close_thread: queue: #{queue.object_id}, queue.size #{queue.size}" + puts "popping first event" e1 = queue.pop + puts "got event #{e1.to_hash}" + puts "popping second event" e2 = queue.pop + puts "got event #{e2.to_hash}" # put em back for the tests queue.push(e1) queue.push(e2) @@ -366,22 +374,32 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + puts "channel real redis: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}" + rt = run_it_thread(subject, queue) #simulate the other system thread + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'c').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" expect(queue.size).to eq(2) end it 'events had redis_channel' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + puts "channel: events had redis_channel: starting run thread queue: #{queue.object_id}, queue.size: #{queue.size}" + rt = run_it_thread(subject, queue) + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the other system thread publish_thread(subject.send(:new_redis_instance), 'c').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo') @@ -404,23 +422,33 @@ def close_thread(inst, rt) context 'real redis', :redis => true do it 'calling the run method, adds events to the queue' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + puts "pattern_channel: real redis: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}" + rt = run_it_thread(subject, queue) #simulate the other system thread + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" expect(queue.size).to eq(2) end it 'events had redis_channel' do + queue = Queue.new #simulate the input thread - rt = run_it_thread(subject) + puts "pattern_channel: redis_channel: starting run thread, queue: #{queue.object_id}, queue.size: #{queue.size}" + rt = run_it_thread(subject, queue) #simulate the other system thread + puts "starting publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" publish_thread(subject.send(:new_redis_instance), 'pc').join + puts "joined publish thread queue: #{queue.object_id}, queue.size: #{queue.size}" #simulate the pipeline thread - close_thread(subject, rt).join + close_thread(subject, rt, queue).join + puts "joined close thread queue: #{queue.object_id}, queue.size: #{queue.size}" e1 = queue.pop e2 = queue.pop expect(e1.get('[@metadata][redis_channel]')).to eq('foo')