From a87918cb02f3c5f67dc6ab105189500b52f39b92 Mon Sep 17 00:00:00 2001 From: val Date: Fri, 29 Apr 2016 10:59:02 +0200 Subject: [PATCH 01/15] Applied patch to send headers --- lib/logstash/outputs/stomp.rb | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 93c53c6..0460f87 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -27,6 +27,12 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # The vhost to use config :vhost, :validate => :string, :default => nil + # Custom headers to send with each message. Supports string expansion, meaning + # %{foo} values will expand to the field value. + # + # Example: headers => ["amq-msg-type", "text", "host", "%{host}"] + config :headers, :validate => :hash + # Enable debugging output? config :debug, :validate => :boolean, :default => false @@ -59,9 +65,15 @@ def register end # def register def receive(event) - - @logger.debug(["stomp sending event", { :host => @host, :event => event }]) - @client.send(event.sprintf(@destination), event.to_json) + headers = Hash.new + if @headers + @headers.each do |k,v| + headers[k] = event.sprintf(v) + end + end + + @logger.debug(["stomp sending event", { :host => @host, :event => event, :headers => headers }]) + @client.send(event.sprintf(@destination), event.to_json, headers) end # def receive end # class LogStash::Outputs::Stomp From 89952890b681745e3caad620681efc2d4709df82 Mon Sep 17 00:00:00 2001 From: val Date: Thu, 16 Jun 2016 11:20:12 +0200 Subject: [PATCH 02/15] #8: Not all messages get sent --- lib/logstash/outputs/stomp.rb | 41 ++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 0460f87..f81b194 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -33,6 +33,9 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # Example: headers => ["amq-msg-type", "text", "host", "%{host}"] config :headers, :validate => :hash + # Whether to send the messages in batch or one-by-one + config :batch, :validate => :boolean, :default => false + # Enable debugging output? config :debug, :validate => :boolean, :default => false @@ -63,17 +66,35 @@ def register connect end # def register - - def receive(event) - - headers = Hash.new - if @headers - @headers.each do |k,v| - headers[k] = event.sprintf(v) - end + + public + def do_close + @logger.warn("Disconnecting from stomp broker") + @client.disconnect if @client.connected? + end # def do_close + + def multi_receive(events) + + headers = Hash.new + if @headers + @headers.each do |k,v| + headers[k] = event.sprintf(v) end + end + + if @batch + @logger.debug(["stomp sending events in batch", { :host => @host, :events => events.length, :headers => headers }]) - @logger.debug(["stomp sending event", { :host => @host, :event => event, :headers => headers }]) - @client.send(event.sprintf(@destination), event.to_json, headers) + @client.transaction do |t| + events.each { |event| + t.send(event.sprintf(@destination), event.to_json, headers) + } + end + else + events.each { |event| + @logger.debug(["stomp sending event", { :host => @host, :event => event, :headers => headers }]) + @client.send(event.sprintf(@destination), event.to_json, headers) + } + end end # def receive end # class LogStash::Outputs::Stomp From 8da7fd1726f5929c765d2bd71afbe22e69015576 Mon Sep 17 00:00:00 2001 From: val Date: Thu, 16 Jun 2016 15:15:36 +0200 Subject: [PATCH 03/15] #8: Not all messages get sent --- lib/logstash/outputs/stomp.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index f81b194..4e7785d 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -68,10 +68,10 @@ def register end # def register public - def do_close + def close @logger.warn("Disconnecting from stomp broker") @client.disconnect if @client.connected? - end # def do_close + end # def close def multi_receive(events) From 50759851794a8453d7644edbbc23d6e51ca2f0f2 Mon Sep 17 00:00:00 2001 From: val Date: Thu, 16 Jun 2016 15:18:19 +0200 Subject: [PATCH 04/15] #8: Not all messages get sent (no need for batch mode) --- lib/logstash/outputs/stomp.rb | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 4e7785d..0e69bcb 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -33,9 +33,6 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # Example: headers => ["amq-msg-type", "text", "host", "%{host}"] config :headers, :validate => :hash - # Whether to send the messages in batch or one-by-one - config :batch, :validate => :boolean, :default => false - # Enable debugging output? config :debug, :validate => :boolean, :default => false @@ -82,18 +79,11 @@ def multi_receive(events) end end - if @batch - @logger.debug(["stomp sending events in batch", { :host => @host, :events => events.length, :headers => headers }]) + @logger.debug(["stomp sending events in batch", { :host => @host, :events => events.length, :headers => headers }]) - @client.transaction do |t| - events.each { |event| - t.send(event.sprintf(@destination), event.to_json, headers) - } - end - else + @client.transaction do |t| events.each { |event| - @logger.debug(["stomp sending event", { :host => @host, :event => event, :headers => headers }]) - @client.send(event.sprintf(@destination), event.to_json, headers) + t.send(event.sprintf(@destination), event.to_json, headers) } end end # def receive From bde026ae2548ecde94690c55a224f3617f710a56 Mon Sep 17 00:00:00 2001 From: val Date: Tue, 21 Jun 2016 10:49:59 +0200 Subject: [PATCH 05/15] #13: undefined local variable or method "event" --- lib/logstash/outputs/stomp.rb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 0e69bcb..ddc51a6 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -72,17 +72,17 @@ def close def multi_receive(events) - headers = Hash.new - if @headers - @headers.each do |k,v| - headers[k] = event.sprintf(v) - end - end - - @logger.debug(["stomp sending events in batch", { :host => @host, :events => events.length, :headers => headers }]) + @logger.debug(["stomp sending events in batch", { :host => @host, :events => events.length }]) @client.transaction do |t| events.each { |event| + headers = Hash.new + if @headers + @headers.each do |k,v| + headers[k] = event.sprintf(v) + end + end + t.send(event.sprintf(@destination), event.to_json, headers) } end From 9c60c799906b851f96506326df4414df72e5d599 Mon Sep 17 00:00:00 2001 From: val Date: Fri, 17 Feb 2017 10:03:24 +0100 Subject: [PATCH 06/15] Attempt to fix #16 (10-20% message loss) --- lib/logstash/outputs/stomp.rb | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 627f8c0..4e00242 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -60,19 +60,27 @@ def register @client.on_connection_closed { connect } - + + @done = false connect end # def register public def close @logger.warn("Disconnecting from stomp broker") - @client.disconnect if @client.connected? + Thread.pass until @done + @client.disconnect :receipt => 'disconnect-receipt-id' if @client.connected? end # def close - def multi_receive(events) + def done(inflight) + @done = inflight == 0 + puts "#{inflight} => @done = #{@done}" + end + def multi_receive(events) @logger.debug("stomp sending events in batch", { :host => @host, :events => events.length }) + inflight = events.length + done(inflight) @client.transaction do |t| events.each { |event| @@ -83,8 +91,11 @@ def multi_receive(events) end end - t.send(event.sprintf(@destination), event.to_json, headers) + t.send(event.sprintf(@destination), event.to_json, headers) do |r| + inflight -= 1 + done(inflight) + end } end - end # def receive + end # def multi_receive end # class LogStash::Outputs::Stomp From 7761ff7e490e8ec7deb7f522ef1b5a160f266532 Mon Sep 17 00:00:00 2001 From: val Date: Fri, 17 Feb 2017 10:06:49 +0100 Subject: [PATCH 07/15] Attempt to fix #16 (10-20% message loss) --- lib/logstash/outputs/stomp.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 4e00242..01919ad 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -74,7 +74,6 @@ def close def done(inflight) @done = inflight == 0 - puts "#{inflight} => @done = #{@done}" end def multi_receive(events) From 74427c43e871e1124368303d45c0bbf63d39ac9f Mon Sep 17 00:00:00 2001 From: val Date: Mon, 20 Feb 2017 17:16:24 +0100 Subject: [PATCH 08/15] New strategy with synchronous stomp library --- lib/logstash/outputs/stomp.rb | 48 +++++++++++++++-------------------- logstash-output-stomp.gemspec | 4 +-- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 01919ad..2e6e58c 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -36,11 +36,14 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # Enable debugging output? config :debug, :validate => :boolean, :default => false + # this output is thread-safe + concurrency :shared + private def connect begin - @client.connect - @logger.debug("Connected to stomp server") if @client.connected? + @client = Stomp::Client.new(@user, @password.value, @host, @port) + @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", :exception => e, :backtrace => e.backtrace) @@ -52,49 +55,38 @@ def connect public def register - require "onstomp" - @client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value) - @client.host = @vhost if @vhost - - # Handle disconnects - @client.on_connection_closed { - connect - } + require "stomp" - @done = false connect end # def register public def close @logger.warn("Disconnecting from stomp broker") - Thread.pass until @done - @client.disconnect :receipt => 'disconnect-receipt-id' if @client.connected? + @client.close end # def close - def done(inflight) - @done = inflight == 0 - end - def multi_receive(events) - @logger.debug("stomp sending events in batch", { :host => @host, :events => events.length }) - inflight = events.length - done(inflight) - @client.transaction do |t| - events.each { |event| - headers = Hash.new + tx_name = "tx-#{Random.rand(2**32..2**64-1)}" + @logger.debug("sending #{events.length} events in transaction #{tx_name}") + + begin + @client.begin tx_name + events.each do |event| + headers = Hash.new(:transaction => tx_name) if @headers @headers.each do |k,v| headers[k] = event.sprintf(v) end end - t.send(event.sprintf(@destination), event.to_json, headers) do |r| - inflight -= 1 - done(inflight) - end - } + @client.publish(event.sprintf(@destination), event.to_json, headers) + end + @client.commit tx_name + rescue Exception => exception + @logger.error("Error while sending #{events.length} events in transaction #{tx_name}", :error => exception) end + end # def multi_receive end # class LogStash::Outputs::Stomp diff --git a/logstash-output-stomp.gemspec b/logstash-output-stomp.gemspec index d146b92..d7fe3aa 100644 --- a/logstash-output-stomp.gemspec +++ b/logstash-output-stomp.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-stomp' - s.version = '3.0.4' + s.version = '3.0.5' s.licenses = ['Apache License (2.0)'] s.summary = "Send events to a stomp server" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency 'onstomp' + s.add_runtime_dependency 'stomp' s.add_development_dependency 'logstash-devutils' end From 67632d2021f3401bf1d1f45beceebaa82f8ae670 Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 11:33:17 +0100 Subject: [PATCH 09/15] Made the stomp connection reliable --- lib/logstash/outputs/stomp.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 2e6e58c..baaf7a8 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -42,7 +42,7 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base private def connect begin - @client = Stomp::Client.new(@user, @password.value, @host, @port) + @client = Stomp::Client.new(@user, @password.value, @host, @port, :reliable => true) @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", From e4b5635fb71f0514e773fae4e5b65280207a8df7 Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 11:41:25 +0100 Subject: [PATCH 10/15] Made the stomp connection reliable --- lib/logstash/outputs/stomp.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index baaf7a8..b6ba78a 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -42,7 +42,13 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base private def connect begin - @client = Stomp::Client.new(@user, @password.value, @host, @port, :reliable => true) + conn_hash = { :hosts => [ + {:login => @user, :passcode => @password.value, + :host => @host, :port => @port}, + ], + :reliable => true + } + @client = Stomp::Client.new(conn_hash) @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", From 4f158ae7b24dfd085fac094aae721e9c8d7aec53 Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 11:45:20 +0100 Subject: [PATCH 11/15] Made the stomp connection reliable --- lib/logstash/outputs/stomp.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index b6ba78a..9201b98 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -42,13 +42,7 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base private def connect begin - conn_hash = { :hosts => [ - {:login => @user, :passcode => @password.value, - :host => @host, :port => @port}, - ], - :reliable => true - } - @client = Stomp::Client.new(conn_hash) + @client = Stomp::Client.new(@user, @password.value, @host, @port, true) @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", From b6724bf4b383f6c143a38480f6d912b7d0407cdc Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 14:18:59 +0100 Subject: [PATCH 12/15] Made the stomp connection reliable --- lib/logstash/outputs/stomp.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 9201b98..2254d1b 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -42,7 +42,13 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base private def connect begin - @client = Stomp::Client.new(@user, @password.value, @host, @port, true) + params = { :reliable => true, + :max_reconnect_attempts => 3, + :hosts => [ { :login => @user, + :passcode => @password.value, + :host => @host, + :port => @port } ] } + @client = Stomp::Client.new(params) @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", From 679d280e48658997a26bff4e92dc079c5e30dd4a Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 15:11:28 +0100 Subject: [PATCH 13/15] Pinning the stomp dependency to min 1.4.3 because of memory leak in 1.4.2 --- logstash-output-stomp.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-stomp.gemspec b/logstash-output-stomp.gemspec index d7fe3aa..94bcddc 100644 --- a/logstash-output-stomp.gemspec +++ b/logstash-output-stomp.gemspec @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency 'stomp' + s.add_runtime_dependency 'stomp', '>= 1.4.3' s.add_development_dependency 'logstash-devutils' end From 3ef33402d39bb074730012b799583cc97f2c95ec Mon Sep 17 00:00:00 2001 From: val Date: Wed, 22 Feb 2017 15:15:19 +0100 Subject: [PATCH 14/15] Pinning the stomp dependency to min 1.4.3 because of memory leak in 1.4.2 --- logstash-output-stomp.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-stomp.gemspec b/logstash-output-stomp.gemspec index 94bcddc..1ab3313 100644 --- a/logstash-output-stomp.gemspec +++ b/logstash-output-stomp.gemspec @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency 'stomp', '>= 1.4.3' + s.add_runtime_dependency 'stomp', '~> 1.4.3' s.add_development_dependency 'logstash-devutils' end From 0cc56fad1deda6ec91aee5504b79f74cefc1d541 Mon Sep 17 00:00:00 2001 From: val Date: Thu, 23 Feb 2017 11:07:54 +0100 Subject: [PATCH 15/15] Added a custom logger --- lib/logstash/outputs/logger.rb | 296 +++++++++++++++++++++++++++++++++ lib/logstash/outputs/stomp.rb | 25 ++- 2 files changed, 306 insertions(+), 15 deletions(-) create mode 100644 lib/logstash/outputs/logger.rb diff --git a/lib/logstash/outputs/logger.rb b/lib/logstash/outputs/logger.rb new file mode 100644 index 0000000..d503aed --- /dev/null +++ b/lib/logstash/outputs/logger.rb @@ -0,0 +1,296 @@ +# encoding: utf-8 + +require "stomp" + +# == Example STOMP call back logger class. +# +# Optional callback methods: +# +# * on_connecting: connection starting +# * on_connected: successful connect +# * on_connectfail: unsuccessful connect (will usually be retried) +# * on_disconnect: successful disconnect +# +# * on_miscerr: on miscellaneous xmit/recv errors +# +# * on_publish: publish called +# * on_subscribe: subscribe called +# * on_unsubscribe: unsubscribe called +# +# * on_begin: begin called +# * on_ack: ack called +# * on_nack: nack called +# * on_commit: commit called +# * on_abort: abort called +# +# * on_receive: receive called and successful +# +# * on_ssl_connecting: SSL connection starting +# * on_ssl_connected: successful SSL connect +# * on_ssl_connectfail: unsuccessful SSL connect (will usually be retried) +# +# * on_hbread_fail: unsuccessful Heartbeat read +# * on_hbwrite_fail: unsuccessful Heartbeat write +# * on_hbfire: on any send or receive heartbeat +# +# All methods are optional, at the user's requirements. +# +# If a method is not provided, it is not called (of course.) +# +# IMPORTANT NOTE: in general, call back logging methods *SHOULD* not raise exceptions, +# otherwise the underlying STOMP connection may fail in mysterious ways. +# +# There are two useful exceptions to this rule for: +# +# * on_connectfail +# * on_ssl_connectfail +# +# These two methods can raise a Stomp::Errors::LoggerConnectionError. If this +# exception is raised, it is passed up the chain to the caller. +# +# Callback parameters: are a copy of the @parameters instance variable for +# the Stomp::Connection. +# +class StompLogger < Stomp::NullLogger + + # Initialize a new callback logger instance. + def initialize(logger = nil) + @log = logger || super() + @log.info("Logger initialization complete.") + end + + def marshal_dump + [] + end + + def marshal_load(array) + _init + end + + # Log connecting events + def on_connecting(parms) + begin + @log.debug "Connecting: #{info(parms)}" + rescue + @log.debug "Connecting oops" + end + end + + # Log connected events + def on_connected(parms) + begin + @log.debug "Connected: #{info(parms)}" + rescue + @log.debug "Connected oops" + end + end + + # Log connectfail events + def on_connectfail(parms) + begin + @log.debug "Connect Fail #{info(parms)}" + rescue + @log.debug "Connect Fail oops" + end +=begin + # An example LoggerConnectionError raise + @log.debug "Connect Fail, will raise" + raise Stomp::Error::LoggerConnectionError.new("quit from connect fail") +=end + end + + # Log disconnect events + def on_disconnect(parms) + begin + @log.debug "Disconnected #{info(parms)}" + rescue + @log.debug "Disconnected oops" + end + end + + # Log miscellaneous errors + def on_miscerr(parms, errstr) + begin + @log.debug "Miscellaneous Error #{info(parms)}" + @log.debug "Miscellaneous Error String #{errstr}" + rescue + @log.debug "Miscellaneous Error oops" + end + end + + # Log Subscribe + def on_subscribe(parms, headers) + begin + @log.debug "Subscribe Parms #{info(parms)}" + @log.debug "Subscribe Headers #{headers}" + rescue + @log.debug "Subscribe oops" + end + end + + # Log UnSubscribe + def on_unsubscribe(parms, headers) + begin + @log.debug "UnSubscribe Parms #{info(parms)}" + @log.debug "UnSubscribe Headers #{headers}" + rescue + @log.debug "UnSubscribe oops" + end + end + + # Log Publish + def on_publish(parms, message, headers) + begin + @log.debug "Publish Parms #{info(parms)}" + @log.debug "Publish Message #{message}" + @log.debug "Publish Headers #{headers}" + rescue + @log.debug "Publish oops" + end + end + + # Log Receive + def on_receive(parms, result) + begin + @log.debug "Receive Parms #{info(parms)}" + @log.debug "Receive Result #{result}" + rescue + @log.debug "Receive oops" + end + end + + # Log Begin + def on_begin(parms, headers) + begin + @log.debug "Begin Parms #{info(parms)}" + @log.debug "Begin Result #{headers}" + rescue + @log.debug "Begin oops" + end + end + + # Log Ack + def on_ack(parms, headers) + begin + @log.debug "Ack Parms #{info(parms)}" + @log.debug "Ack Result #{headers}" + rescue + @log.debug "Ack oops" + end + end + + # Log NAck + def on_nack(parms, headers) + begin + @log.debug "NAck Parms #{info(parms)}" + @log.debug "NAck Result #{headers}" + rescue + @log.debug "NAck oops" + end + end + + # Log Commit + def on_commit(parms, headers) + begin + @log.debug "Commit Parms #{info(parms)}" + @log.debug "Commit Result #{headers}" + rescue + @log.debug "Commit oops" + end + end + + # Log Abort + def on_abort(parms, headers) + begin + @log.debug "Abort Parms #{info(parms)}" + @log.debug "Abort Result #{headers}" + rescue + @log.debug "Abort oops" + end + end + + # Stomp 1.1+ - heart beat read (receive) failed. + def on_hbread_fail(parms, ticker_data = {}) + begin + @log.debug "Hbreadf Parms #{info(parms)}" + @log.debug "Hbreadf Result #{ticker_data.inspect}" + rescue + @log.debug "Hbreadf oops" + end + end + + # Stomp 1.1+ - heart beat send (transmit) failed. + def on_hbwrite_fail(parms, ticker_data = {}) + begin + @log.debug "Hbwritef Parms #{info(parms)}" + @log.debug "Hbwritef Result #{ticker_data.inspect}" + rescue + @log.debug "Hbwritef oops" + end + end + + # Log SSL connection start. + def on_ssl_connecting(parms) + begin + @log.debug "SSL Connecting Parms #{info(parms)}" + rescue + @log.debug "SSL Connecting oops" + end + end + + # Log a successful SSL connect. + def on_ssl_connected(parms) + begin + @log.debug "SSL Connected Parms #{info(parms)}" + rescue + @log.debug "SSL Connected oops" + end + end + + # Log an unsuccessful SSL connect. + def on_ssl_connectfail(parms) + begin + @log.debug "SSL Connect Fail Parms #{info(parms)}" + @log.debug "SSL Connect Fail Exception #{parms[:ssl_exception]}, #{parms[:ssl_exception].message}" + rescue + @log.debug "SSL Connect Fail oops" + end +=begin + # An example LoggerConnectionError raise + @log.debug "SSL Connect Fail, will raise" + raise Stomp::Error::LoggerConnectionError.new("quit from SSL connect") +=end + end + + # Log heart beat fires + def on_hbfire(parms, srind, firedata = {}) + begin + @log.debug "HeartBeat Fire Parms #{info(parms)}" + @log.debug "HeartBeat Fire Send/Receive #{srind}" + rescue + @log.debug "HeartBeat Fire oops" + end + end + + private + + # Example information extract. + def info(parms) + # + # Available in the parms Hash: + # parms[:cur_host] + # parms[:cur_port] + # parms[:cur_login] + # parms[:cur_passcode] + # parms[:cur_ssl] + # parms[:cur_recondelay] + # parms[:cur_parseto] + # parms[:cur_conattempts] + # parms[:openstat] + # + # For the on_ssl_connectfail callback these are also available: + # parms[:ssl_exception] + # + "Host: #{parms[:cur_host]}, Port: #{parms[:cur_port]}, Login: #{parms[:cur_login]}, Passcode: #{parms[:cur_passcode]}, ssl: #{parms[:cur_ssl]}" + end +end # of class diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 2254d1b..9439ddd 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/outputs/base" +require "logstash/outputs/logger" require "logstash/namespace" @@ -39,15 +40,17 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # this output is thread-safe concurrency :shared - private - def connect + public + def register + require "stomp" begin params = { :reliable => true, - :max_reconnect_attempts => 3, - :hosts => [ { :login => @user, - :passcode => @password.value, - :host => @host, - :port => @port } ] } + :max_reconnect_attempts => 3, + :logger => StompLogger.new(@logger), + :hosts => [ { :login => @user, + :passcode => @password.value, + :host => @host, + :port => @port } ] } @client = Stomp::Client.new(params) @logger.debug("Connected to stomp server") if @client.open? rescue => e @@ -56,14 +59,6 @@ def connect sleep 2 retry end - end - - - public - def register - require "stomp" - - connect end # def register public